1 /*
2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 *
34 * NOTE! This file may be compiled for userland libraries as well as for
35 * the kernel.
36 */
37
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69
70 #include <sys/malloc.h>
/*
 * Malloc type for lwkt messages.  NOTE(review): nothing in the visible
 * part of this file allocates with M_LWKTMSG; presumably it is used by
 * other subsystems via an extern declaration -- confirm.
 */
MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72
73 /************************************************************************
74 * MESSAGE FUNCTIONS *
75 ************************************************************************/
76
77 static __inline int
78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg)
79 {
80 return port->mp_putport(port, msg);
81 }
82
83 static __inline int
84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
85 {
86 return port->mp_putport_oncpu(port, msg);
87 }
88
89 static __inline void
90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
91 {
92 KKASSERT(msg->ms_reply_port != NULL &&
93 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
94 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
95 }
96
97 static __inline void
98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
99 {
100 int error;
101
102 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
103 /*
104 * Target port opted to execute the message synchronously so
105 * queue the response.
106 */
107 lwkt_replymsg(msg, error);
108 }
109 }
110
111 static __inline void
112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg)
113 {
114 int error;
115
116 if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) {
117 /*
118 * Target port opted to execute the message synchronously so
119 * queue the response.
120 */
121 lwkt_replymsg(msg, error);
122 }
123 }
124
125 /*
126 * lwkt_sendmsg()
127 *
128 * Request asynchronous completion and call lwkt_beginmsg(). The
129 * target port can opt to execute the message synchronously or
130 * asynchronously and this function will automatically queue the
131 * response if the target executes the message synchronously.
132 *
133 * NOTE: The message is in an indeterminant state until this call
134 * returns. The caller should not mess with it (e.g. try to abort it)
135 * until then.
136 *
137 * NOTE: Do not use this function to forward a message as we might
138 * clobber ms_flags in a SMP race.
139 */
140 void
141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
142 {
143 _lwkt_sendmsg_prepare(port, msg);
144 _lwkt_sendmsg_start(port, msg);
145 }
146
147 void
148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
149 {
150 _lwkt_sendmsg_prepare(port, msg);
151 _lwkt_sendmsg_start_oncpu(port, msg);
152 }
153
154 void
155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
156 {
157 _lwkt_sendmsg_prepare(port, msg);
158 }
159
160 void
161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
162 {
163 _lwkt_sendmsg_start(port, msg);
164 }
165
166 /*
167 * lwkt_domsg()
168 *
169 * Request synchronous completion and call lwkt_beginmsg(). The
170 * target port can opt to execute the message synchronously or
171 * asynchronously and this function will automatically block and
172 * wait for a response if the target executes the message
173 * asynchronously.
174 *
175 * NOTE: Do not use this function to forward a message as we might
176 * clobber ms_flags in a SMP race.
177 */
178 int
179 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
180 {
181 int error;
182
183 KKASSERT(msg->ms_reply_port != NULL &&
184 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
185 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
186 msg->ms_flags |= MSGF_SYNC;
187 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
188 /*
189 * Target port opted to execute the message asynchronously so
190 * block and wait for a reply.
191 */
192 error = lwkt_waitmsg(msg, flags);
193 } else {
194 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
195 }
196 return(error);
197 }
198
199 /*
200 * lwkt_forwardmsg()
201 *
202 * Forward a message received on one port to another port.
203 */
204 int
205 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
206 {
207 int error;
208
209 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
210 if ((error = port->mp_putport(port, msg)) != EASYNC)
211 lwkt_replymsg(msg, error);
212 return(error);
213 }
214
215 /*
216 * lwkt_abortmsg()
217 *
218 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
219 * The caller must ensure that the message will not be both replied AND
220 * destroyed while the abort is in progress.
221 *
222 * This function issues a callback which might block!
223 */
224 void
225 lwkt_abortmsg(lwkt_msg_t msg)
226 {
227 /*
228 * A critical section protects us from reply IPIs on this cpu.
229 */
230 crit_enter();
231
232 /*
233 * Shortcut the operation if the message has already been returned.
234 * The callback typically constructs a lwkt_msg with the abort request,
235 * issues it synchronously, and waits for completion. The callback
236 * is not required to actually abort the message and the target port,
237 * upon receiving an abort request message generated by the callback
238 * should check whether the original message has already completed or
239 * not.
240 */
241 if (msg->ms_flags & MSGF_ABORTABLE) {
242 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
243 msg->ms_abortfn(msg);
244 }
245 crit_exit();
246 }
247
248 /************************************************************************
249 * PORT INITIALIZATION API *
250 ************************************************************************/
251
/*
 * Backend method implementations.  The lwkt_initport_*() functions below
 * install a selection of these into a port via _lwkt_initport().
 */

/* Thread ports: used by lwkt_initport_thread(). */
static void *lwkt_thread_getport(lwkt_port_t port);
static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/* Spinlock-protected ports: used by lwkt_initport_spin(). */
static void *lwkt_spin_getport(lwkt_port_t port);
static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);

/* Serializer-protected ports: used by lwkt_initport_serialize(). */
static void *lwkt_serialize_getport(lwkt_port_t port);
static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Placeholder methods for operations a port type does not support
 * (the lwkt_panic_* names), plus the no-op reply method used by
 * lwkt_initport_replyonly_null().
 */
static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void *lwkt_panic_getport(lwkt_port_t port);
static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
281
282 /*
283 * Core port initialization (internal)
284 */
285 static __inline
286 void
287 _lwkt_initport(lwkt_port_t port,
288 void *(*gportfn)(lwkt_port_t),
289 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
290 int (*wmsgfn)(lwkt_msg_t, int),
291 void *(*wportfn)(lwkt_port_t, int),
292 void (*rportfn)(lwkt_port_t, lwkt_msg_t),
293 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
294 int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
295 {
296 bzero(port, sizeof(*port));
297 port->mp_cpuid = -1;
298 TAILQ_INIT(&port->mp_msgq);
299 TAILQ_INIT(&port->mp_msgq_prio);
300 port->mp_getport = gportfn;
301 port->mp_putport = pportfn;
302 port->mp_waitmsg = wmsgfn;
303 port->mp_waitport = wportfn;
304 port->mp_replyport = rportfn;
305 port->mp_dropmsg = dmsgfn;
306 port->mp_putport_oncpu = pportfn_oncpu;
307 }
308
309 /*
310 * Schedule the target thread. If the message flags contains MSGF_NORESCHED
311 * we tell the scheduler not to reschedule if td is at a higher priority.
312 *
313 * This routine is called even if the thread is already scheduled.
314 */
315 static __inline
316 void
317 _lwkt_schedule_msg(thread_t td, int flags)
318 {
319 lwkt_schedule(td);
320 }
321
322 /*
323 * lwkt_initport_thread()
324 *
325 * Initialize a port for use by a particular thread. The port may
326 * only be used by <td>.
327 */
328 void
329 lwkt_initport_thread(lwkt_port_t port, thread_t td)
330 {
331 _lwkt_initport(port,
332 lwkt_thread_getport,
333 lwkt_thread_putport,
334 lwkt_thread_waitmsg,
335 lwkt_thread_waitport,
336 lwkt_thread_replyport,
337 lwkt_thread_dropmsg,
338 lwkt_thread_putport);
339 port->mpu_td = td;
340 }
341
342 /*
343 * lwkt_initport_spin()
344 *
345 * Initialize a port for use with descriptors that might be accessed
346 * via multiple LWPs, processes, or threads. Has somewhat more
347 * overhead then thread ports.
348 */
349 void
350 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid)
351 {
352 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
353 int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t);
354
355 if (td == NULL)
356 dmsgfn = lwkt_panic_dropmsg;
357 else
358 dmsgfn = lwkt_spin_dropmsg;
359
360 if (fixed_cpuid)
361 pportfn_oncpu = lwkt_spin_putport_oncpu;
362 else
363 pportfn_oncpu = lwkt_panic_putport_oncpu;
364
365 _lwkt_initport(port,
366 lwkt_spin_getport,
367 lwkt_spin_putport,
368 lwkt_spin_waitmsg,
369 lwkt_spin_waitport,
370 lwkt_spin_replyport,
371 dmsgfn,
372 pportfn_oncpu);
373 spin_init(&port->mpu_spin);
374 port->mpu_td = td;
375 if (fixed_cpuid)
376 port->mp_cpuid = td->td_gd->gd_cpuid;
377 }
378
379 /*
380 * lwkt_initport_serialize()
381 *
382 * Initialize a port for use with descriptors that might be accessed
383 * via multiple LWPs, processes, or threads. Callers are assumed to
384 * have held the serializer (slz).
385 */
386 void
387 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
388 {
389 _lwkt_initport(port,
390 lwkt_serialize_getport,
391 lwkt_serialize_putport,
392 lwkt_serialize_waitmsg,
393 lwkt_serialize_waitport,
394 lwkt_serialize_replyport,
395 lwkt_panic_dropmsg,
396 lwkt_panic_putport_oncpu);
397 port->mpu_serialize = slz;
398 }
399
400 /*
401 * Similar to the standard initport, this function simply marks the message
402 * as being done and does not attempt to return it to an originating port.
403 */
404 void
405 lwkt_initport_replyonly_null(lwkt_port_t port)
406 {
407 _lwkt_initport(port,
408 lwkt_panic_getport,
409 lwkt_panic_putport,
410 lwkt_panic_waitmsg,
411 lwkt_panic_waitport,
412 lwkt_null_replyport,
413 lwkt_panic_dropmsg,
414 lwkt_panic_putport_oncpu);
415 }
416
417 /*
418 * Initialize a reply-only port, typically used as a message sink. Such
419 * ports can only be used as a reply port.
420 */
421 void
422 lwkt_initport_replyonly(lwkt_port_t port,
423 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
424 {
425 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
426 lwkt_panic_waitmsg, lwkt_panic_waitport,
427 rportfn, lwkt_panic_dropmsg,
428 lwkt_panic_putport_oncpu);
429 }
430
431 void
432 lwkt_initport_putonly(lwkt_port_t port,
433 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
434 {
435 _lwkt_initport(port, lwkt_panic_getport, pportfn,
436 lwkt_panic_waitmsg, lwkt_panic_waitport,
437 lwkt_panic_replyport, lwkt_panic_dropmsg,
438 lwkt_panic_putport_oncpu);
439 }
440
441 void
442 lwkt_initport_panic(lwkt_port_t port)
443 {
444 _lwkt_initport(port,
445 lwkt_panic_getport, lwkt_panic_putport,
446 lwkt_panic_waitmsg, lwkt_panic_waitport,
447 lwkt_panic_replyport, lwkt_panic_dropmsg,
448 lwkt_panic_putport_oncpu);
449 }
450
451 static __inline
452 void
453 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
454 {
455 lwkt_msg_queue *queue;
456
457 /*
458 * normal case, remove and return the message.
459 */
460 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
461 queue = &port->mp_msgq_prio;
462 else
463 queue = &port->mp_msgq;
464 TAILQ_REMOVE(queue, msg, ms_node);
465
466 /*
467 * atomic op needed for spin ports
468 */
469 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
470 }
471
472 static __inline
473 void
474 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
475 {
476 lwkt_msg_queue *queue;
477
478 /*
479 * atomic op needed for spin ports
480 */
481 atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
482 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
483 queue = &port->mp_msgq_prio;
484 else
485 queue = &port->mp_msgq;
486 TAILQ_INSERT_TAIL(queue, msg, ms_node);
487 }
488
489 static __inline
490 lwkt_msg_t
491 _lwkt_pollmsg(lwkt_port_t port)
492 {
493 lwkt_msg_t msg;
494
495 msg = TAILQ_FIRST(&port->mp_msgq_prio);
496 if (__predict_false(msg != NULL))
497 return msg;
498
499 /*
500 * Priority queue has no message, fallback to non-priority queue.
501 */
502 return TAILQ_FIRST(&port->mp_msgq);
503 }
504
505 static __inline
506 void
507 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
508 {
509 /*
510 * atomic op needed for spin ports
511 */
512 _lwkt_pushmsg(port, msg);
513 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
514 }
515
/************************************************************************
 *			THREAD PORT BACKEND				*
 ************************************************************************
 *
 * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
 * correct cpu before being enqueued to a port.  Note that this is fairly
 * optimal since scheduling would have had to do an IPI anyway if the
 * message were headed to a different cpu.
 */
526
/*
 * lwkt_thread_replyport_remote() - IPI half of lwkt_thread_replyport()
 *
 * Completes reply processing on the cpu of the thread that owns the reply
 * port (msg->ms_reply_port).  It is invoked through lwkt_send_ipiq().  If
 * the owning thread migrated after the IPI was sent, the IPI is forwarded
 * to the thread's current cpu.
 *
 * Synchronous (MSGF_SYNC) messages are only flagged REPLY|DONE.
 * Asynchronous ones are queued on the reply port.  In both cases the
 * owning thread is scheduled if it is waiting on the port.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        return;
    }

    /*
     * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /* snapshot handed to the scheduler, taken before the reply is published */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * Store fence before setting DONE, presumably so earlier stores
         * to the message (e.g. ms_error) are visible to the waiter first.
         */
        cpu_sfence();
        msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
        _lwkt_enqueue_reply(port, msg);
    }
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, flags);
}
564
/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.  Completion is carried out on the cpu of the
 * thread owning the reply port (port->mpu_td).  If that is the current
 * cpu, it happens directly inside a critical section, which protects us
 * from IPIs on this cpu.  Otherwise it is forwarded by IPI to
 * lwkt_thread_replyport_remote().
 *
 * The message must not be DONE, QUEUED or (under INVARIANTS) INTRANSIT.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         *
         * NOTE(review): an older remark here said no critical section is
         * required because the target thread is non-preemptive.  The
         * same-cpu path below does enter one around the flag update and
         * scheduling.
         */
        if (port->mpu_td->td_gd == mycpu) {
            crit_enter();
            flags = msg->ms_flags;
            /* store fence before DONE becomes visible to the waiter */
            cpu_sfence();
            msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, flags);
            crit_exit();
        } else {
#ifdef INVARIANTS
            atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
            /* REPLY is set now; DONE is set by the remote side */
            atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         *
         * A critical section is required to interlock the port queue.
         */
        if (port->mpu_td->td_gd == mycpu) {
            crit_enter();
            _lwkt_enqueue_reply(port, msg);
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
            crit_exit();
        } else {
#ifdef INVARIANTS
            atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
            atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
    }
}
629
630 /*
631 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
632 *
633 * This function could _only_ be used when caller is in the same thread
634 * as the message's target port owner thread.
635 */
636 static int
637 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
638 {
639 int error;
640
641 KASSERT(port->mpu_td == curthread,
642 ("message could only be dropped in the same thread "
643 "as the message target port thread"));
644 crit_enter_quick(port->mpu_td);
645 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
646 _lwkt_pullmsg(port, msg);
647 atomic_set_int(&msg->ms_flags, MSGF_DONE);
648 error = 0;
649 } else {
650 error = ENOENT;
651 }
652 crit_exit_quick(port->mpu_td);
653
654 return (error);
655 }
656
/*
 * lwkt_thread_putport_remote() - IPI half of lwkt_thread_putport()
 *
 * Runs via lwkt_send_ipiq() on the cpu the target port's owning thread was
 * on when the message was sent.  If that thread has since migrated, the
 * IPI is re-sent to its current cpu.  Once on the right cpu, the message
 * is queued on its target port (msg->ms_target_port) and the owning
 * thread is scheduled if it is waiting on the port.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY
 * (lwkt_thread_putport() asserts this before sending the IPI).
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
        return;
    }

    /*
     * An atomic op is needed on ms_flags vs originator.  Also
     * note that the originator might be using a different type
     * of msgport.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
}
694
695 static
696 int
697 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
698 {
699 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
700
701 msg->ms_target_port = port;
702 if (port->mpu_td->td_gd == mycpu) {
703 crit_enter();
704 _lwkt_pushmsg(port, msg);
705 if (port->mp_flags & MSGPORTF_WAITING)
706 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
707 crit_exit();
708 } else {
709 #ifdef INVARIANTS
710 /*
711 * Cleanup.
712 *
713 * An atomic op is needed on ms_flags vs originator. Also
714 * note that the originator might be using a different type
715 * of msgport.
716 */
717 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
718 #endif
719 lwkt_send_ipiq(port->mpu_td->td_gd,
720 (ipifunc1_t)lwkt_thread_putport_remote, msg);
721 }
722 return (EASYNC);
723 }
724
725 /*
726 * lwkt_thread_getport()
727 *
728 * Retrieve the next message from the port or NULL if no messages
729 * are ready.
730 */
731 static
732 void *
733 lwkt_thread_getport(lwkt_port_t port)
734 {
735 lwkt_msg_t msg;
736
737 KKASSERT(port->mpu_td == curthread);
738
739 crit_enter_quick(port->mpu_td);
740 if ((msg = _lwkt_pollmsg(port)) != NULL)
741 _lwkt_pullmsg(port, msg);
742 crit_exit_quick(port->mpu_td);
743 return(msg);
744 }
745
746 /*
747 * lwkt_thread_waitmsg()
748 *
749 * Wait for a particular message to be replied. We must be the only
750 * thread waiting on the message. The port must be owned by the
751 * caller.
752 */
753 static
754 int
755 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
756 {
757 thread_t td = curthread;
758
759 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
760 ("can't wait dropable message"));
761
762 if ((msg->ms_flags & MSGF_DONE) == 0) {
763 /*
764 * If the done bit was not set we have to block until it is.
765 */
766 lwkt_port_t port = msg->ms_reply_port;
767 int sentabort;
768
769 KKASSERT(port->mpu_td == td);
770 crit_enter_quick(td);
771 sentabort = 0;
772
773 while ((msg->ms_flags & MSGF_DONE) == 0) {
774 port->mp_flags |= MSGPORTF_WAITING; /* same cpu */
775 if (sentabort == 0) {
776 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
777 lwkt_abortmsg(msg);
778 }
779 } else {
780 lwkt_sleep("waitabt", 0);
781 }
782 port->mp_flags &= ~MSGPORTF_WAITING;
783 }
784 if (msg->ms_flags & MSGF_QUEUED)
785 _lwkt_pullmsg(port, msg);
786 crit_exit_quick(td);
787 } else {
788 /*
789 * If the done bit was set we only have to mess around with the
790 * message if it is queued on the reply port.
791 */
792 crit_enter_quick(td);
793 if (msg->ms_flags & MSGF_QUEUED) {
794 lwkt_port_t port = msg->ms_reply_port;
795 thread_t td __debugvar = curthread;
796
797 KKASSERT(port->mpu_td == td);
798 _lwkt_pullmsg(port, msg);
799 }
800 crit_exit_quick(td);
801 }
802 return(msg->ms_error);
803 }
804
805 /*
806 * lwkt_thread_waitport()
807 *
808 * Wait for a new message to be available on the port. We must be the
809 * the only thread waiting on the port. The port must be owned by caller.
810 */
811 static
812 void *
813 lwkt_thread_waitport(lwkt_port_t port, int flags)
814 {
815 thread_t td = curthread;
816 lwkt_msg_t msg;
817 int error;
818
819 KKASSERT(port->mpu_td == td);
820 crit_enter_quick(td);
821 while ((msg = _lwkt_pollmsg(port)) == NULL) {
822 port->mp_flags |= MSGPORTF_WAITING;
823 error = lwkt_sleep("waitport", flags);
824 port->mp_flags &= ~MSGPORTF_WAITING;
825 if (error)
826 goto done;
827 }
828 _lwkt_pullmsg(port, msg);
829 done:
830 crit_exit_quick(td);
831 return(msg);
832 }
833
/************************************************************************
 *			   SPIN PORT BACKEND				*
 ************************************************************************
 *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports, but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
 */
850
851 static
852 void *
853 lwkt_spin_getport(lwkt_port_t port)
854 {
855 lwkt_msg_t msg;
856
857 spin_lock(&port->mpu_spin);
858 if ((msg = _lwkt_pollmsg(port)) != NULL)
859 _lwkt_pullmsg(port, msg);
860 spin_unlock(&port->mpu_spin);
861 return(msg);
862 }
863
864 static __inline int
865 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg)
866 {
867 int dowakeup;
868
869 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
870
871 msg->ms_target_port = port;
872 spin_lock(&port->mpu_spin);
873 _lwkt_pushmsg(port, msg);
874 dowakeup = 0;
875 if (port->mp_flags & MSGPORTF_WAITING) {
876 port->mp_flags &= ~MSGPORTF_WAITING;
877 dowakeup = 1;
878 }
879 spin_unlock(&port->mpu_spin);
880
881 return dowakeup;
882 }
883
884 static
885 int
886 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
887 {
888 if (lwkt_spin_putport_only(port, msg))
889 wakeup(port);
890 return (EASYNC);
891 }
892
893 static
894 int
895 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
896 {
897 KASSERT(port->mp_cpuid == mycpuid,
898 ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d",
899 port->mp_cpuid, mycpuid));
900 if (lwkt_spin_putport_only(port, msg))
901 wakeup_mycpu(port);
902 return (EASYNC);
903 }
904
/*
 * Spin port backend for waitmsg: block until msg is marked MSGF_DONE,
 * using the reply port's spinlock as the sleep interlock.
 *
 * Synchronous messages sleep on the message itself (flagged MSGF_WAITING).
 * Asynchronous ones sleep on the reply port (flagged MSGPORTF_WAITING).
 * With PCATCH in flags, the first interrupted sleep triggers a single
 * lwkt_abortmsg() (issued with the spinlock dropped), and the wait then
 * continues uninterruptibly.  If the reply was queued on the reply port,
 * it is removed before returning.
 *
 * Returns msg->ms_error.  If an abort was sent and the target answered
 * with EINTR, that error is replaced by the ssleep() error that triggered
 * the abort.
 */
static
int
lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));
    port = msg->ms_reply_port;

    if ((msg->ms_flags & MSGF_DONE) == 0) {
        sentabort = 0;
        spin_lock(&port->mpu_spin);
        while ((msg->ms_flags & MSGF_DONE) == 0) {
            void *won;

            /*
             * If message was sent synchronously from the beginning
             * the wakeup will be on the message structure, else it
             * will be on the port structure.
             *
             * ms_flags needs atomic op originator vs target MSGF_QUEUED
             */
            if (msg->ms_flags & MSGF_SYNC) {
                won = msg;
                atomic_set_int(&msg->ms_flags, MSGF_WAITING);
            } else {
                won = port;
                port->mp_flags |= MSGPORTF_WAITING;
            }

            /*
             * Only messages which support abort can be interrupted.
             * We must still wait for message completion regardless.
             */
            if ((flags & PCATCH) && sentabort == 0) {
                error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
                if (error) {
                    sentabort = error;
                    /* the abort callback may block; drop the spinlock */
                    spin_unlock(&port->mpu_spin);
                    lwkt_abortmsg(msg);
                    spin_lock(&port->mpu_spin);
                }
            } else {
                error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
            }
            /* see note at the top on the MSGPORTF_WAITING flag */
        }
        /*
         * Turn EINTR into ERESTART if the signal indicates.
         */
        if (sentabort && msg->ms_error == EINTR)
            msg->ms_error = sentabort;
        if (msg->ms_flags & MSGF_QUEUED)
            _lwkt_pullmsg(port, msg);
        spin_unlock(&port->mpu_spin);
    } else {
        spin_lock(&port->mpu_spin);
        if (msg->ms_flags & MSGF_QUEUED) {
            _lwkt_pullmsg(port, msg);
        }
        spin_unlock(&port->mpu_spin);
    }
    return(msg->ms_error);
}
972
973 static
974 void *
975 lwkt_spin_waitport(lwkt_port_t port, int flags)
976 {
977 lwkt_msg_t msg;
978 int error;
979
980 spin_lock(&port->mpu_spin);
981 while ((msg = _lwkt_pollmsg(port)) == NULL) {
982 port->mp_flags |= MSGPORTF_WAITING;
983 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
984 /* see note at the top on the MSGPORTF_WAITING flag */
985 if (error) {
986 spin_unlock(&port->mpu_spin);
987 return(NULL);
988 }
989 }
990 _lwkt_pullmsg(port, msg);
991 spin_unlock(&port->mpu_spin);
992 return(msg);
993 }
994
995 static
996 void
997 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
998 {
999 int dowakeup;
1000
1001 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1002
1003 if (msg->ms_flags & MSGF_SYNC) {
1004 /*
1005 * If a synchronous completion has been requested, just wakeup
1006 * the message without bothering to queue it to the target port.
1007 *
1008 * ms_flags protected by reply port spinlock
1009 */
1010 spin_lock(&port->mpu_spin);
1011 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1012 dowakeup = 0;
1013 if (msg->ms_flags & MSGF_WAITING) {
1014 msg->ms_flags &= ~MSGF_WAITING;
1015 dowakeup = 1;
1016 }
1017 spin_unlock(&port->mpu_spin);
1018 if (dowakeup)
1019 wakeup(msg);
1020 } else {
1021 /*
1022 * If an asynchronous completion has been requested the message
1023 * must be queued to the reply port.
1024 */
1025 spin_lock(&port->mpu_spin);
1026 _lwkt_enqueue_reply(port, msg);
1027 dowakeup = 0;
1028 if (port->mp_flags & MSGPORTF_WAITING) {
1029 port->mp_flags &= ~MSGPORTF_WAITING;
1030 dowakeup = 1;
1031 }
1032 spin_unlock(&port->mpu_spin);
1033 if (dowakeup)
1034 wakeup(port);
1035 }
1036 }
1037
1038 /*
1039 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
1040 *
1041 * This function could _only_ be used when caller is in the same thread
1042 * as the message's target port owner thread.
1043 */
1044 static int
1045 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1046 {
1047 int error;
1048
1049 KASSERT(port->mpu_td == curthread,
1050 ("message could only be dropped in the same thread "
1051 "as the message target port thread\n"));
1052 spin_lock(&port->mpu_spin);
1053 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
1054 _lwkt_pullmsg(port, msg);
1055 msg->ms_flags |= MSGF_DONE;
1056 error = 0;
1057 } else {
1058 error = ENOENT;
1059 }
1060 spin_unlock(&port->mpu_spin);
1061
1062 return (error);
1063 }
1064
/************************************************************************
 *			SERIALIZER PORT BACKEND				*
 ************************************************************************
 *
 * This backend uses a serializer to protect port access.  Callers are
 * assumed to hold the serializer.  This kind of port is usually created
 * by a network device driver along with _one_ lwkt thread to pipeline
 * operations which may temporarily release the serializer.
 *
 * The implementation is based on the SPIN PORT BACKEND.
 */
1076
1077 static
1078 void *
1079 lwkt_serialize_getport(lwkt_port_t port)
1080 {
1081 lwkt_msg_t msg;
1082
1083 ASSERT_SERIALIZED(port->mpu_serialize);
1084
1085 if ((msg = _lwkt_pollmsg(port)) != NULL)
1086 _lwkt_pullmsg(port, msg);
1087 return(msg);
1088 }
1089
1090 static
1091 int
1092 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1093 {
1094 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1095 ASSERT_SERIALIZED(port->mpu_serialize);
1096
1097 msg->ms_target_port = port;
1098 _lwkt_pushmsg(port, msg);
1099 if (port->mp_flags & MSGPORTF_WAITING) {
1100 port->mp_flags &= ~MSGPORTF_WAITING;
1101 wakeup(port);
1102 }
1103 return (EASYNC);
1104 }
1105
/*
 * Serializer port backend for waitmsg: block until msg is marked
 * MSGF_DONE, using the reply port's serializer as the sleep interlock
 * (zsleep).  The caller must hold that serializer.
 *
 * Synchronous messages sleep on the message itself.  Asynchronous ones
 * sleep on the reply port (flagged MSGPORTF_WAITING).  With PCATCH in
 * flags, the first interrupted sleep triggers a single lwkt_abortmsg()
 * (issued with the serializer released), and the wait then continues
 * uninterruptibly.  If the reply was queued on the reply port, it is
 * removed before returning.
 *
 * NOTE(review): unlike the spin backend, MSGF_WAITING is not set for
 * synchronous messages.  The matching wakeup(msg) is presumably issued
 * unconditionally by lwkt_serialize_replyport() -- confirm.
 *
 * Returns msg->ms_error.  If an abort was sent and the target answered
 * with EINTR, that error is replaced by the zsleep() error that triggered
 * the abort.
 */
static
int
lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
        port = msg->ms_reply_port;

        ASSERT_SERIALIZED(port->mpu_serialize);

        sentabort = 0;
        while ((msg->ms_flags & MSGF_DONE) == 0) {
            void *won;

            /*
             * If message was sent synchronously from the beginning
             * the wakeup will be on the message structure, else it
             * will be on the port structure.
             */
            if (msg->ms_flags & MSGF_SYNC) {
                won = msg;
            } else {
                won = port;
                port->mp_flags |= MSGPORTF_WAITING;
            }

            /*
             * Only messages which support abort can be interrupted.
             * We must still wait for message completion regardless.
             */
            if ((flags & PCATCH) && sentabort == 0) {
                error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
                if (error) {
                    sentabort = error;
                    /* the abort callback may block; release the serializer */
                    lwkt_serialize_exit(port->mpu_serialize);
                    lwkt_abortmsg(msg);
                    lwkt_serialize_enter(port->mpu_serialize);
                }
            } else {
                error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
            }
            /*
             * MSGPORTF_WAITING is cleared by the waker, not here (same
             * rule as the spin port backend).
             */
        }
        /*
         * Turn EINTR into ERESTART if the signal indicates.
         */
        if (sentabort && msg->ms_error == EINTR)
            msg->ms_error = sentabort;
        if (msg->ms_flags & MSGF_QUEUED)
            _lwkt_pullmsg(port, msg);
    } else {
        if (msg->ms_flags & MSGF_QUEUED) {
            port = msg->ms_reply_port;

            ASSERT_SERIALIZED(port->mpu_serialize);
            _lwkt_pullmsg(port, msg);
        }
    }
    return(msg->ms_error);
}
1172
1173 static
1174 void *
1175 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1176 {
1177 lwkt_msg_t msg;
1178 int error;
1179
1180 ASSERT_SERIALIZED(port->mpu_serialize);
1181
1182 while ((msg = _lwkt_pollmsg(port)) == NULL) {
1183 port->mp_flags |= MSGPORTF_WAITING;
1184 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1185 /* see note at the top on the MSGPORTF_WAITING flag */
1186 if (error)
1187 return(NULL);
1188 }
1189 _lwkt_pullmsg(port, msg);
1190 return(msg);
1191 }
1192
1193 static
1194 void
1195 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1196 {
1197 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1198 ASSERT_SERIALIZED(port->mpu_serialize);
1199
1200 if (msg->ms_flags & MSGF_SYNC) {
1201 /*
1202 * If a synchronous completion has been requested, just wakeup
1203 * the message without bothering to queue it to the target port.
1204 *
1205 * (both sides synchronized via serialized reply port)
1206 */
1207 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1208 wakeup(msg);
1209 } else {
1210 /*
1211 * If an asynchronous completion has been requested the message
1212 * must be queued to the reply port.
1213 */
1214 _lwkt_enqueue_reply(port, msg);
1215 if (port->mp_flags & MSGPORTF_WAITING) {
1216 port->mp_flags &= ~MSGPORTF_WAITING;
1217 wakeup(port);
1218 }
1219 }
1220 }
1221
1222 /************************************************************************
1223 * PANIC AND SPECIAL PORT FUNCTIONS *
1224 ************************************************************************/
1225
1226 /*
1227 * You can point a port's reply vector at this function if you just want
1228 * the message marked done, without any queueing or signaling. This is
1229 * often used for structure-embedded messages.
1230 */
1231 static
1232 void
1233 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1234 {
1235 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1236 }
1237
1238 static
1239 void *
1240 lwkt_panic_getport(lwkt_port_t port)
1241 {
1242 panic("lwkt_getport() illegal on port %p", port);
1243 }
1244
1245 static
1246 int
1247 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1248 {
1249 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1250 }
1251
1252 static
1253 int
1254 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1255 {
1256 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1257 }
1258
1259 static
1260 void *
1261 lwkt_panic_waitport(lwkt_port_t port, int flags)
1262 {
1263 panic("port %p cannot be waited on", port);
1264 }
1265
1266 static
1267 void
1268 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1269 {
1270 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1271 }
1272
1273 static
1274 int
1275 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1276 {
1277 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1278 /* NOT REACHED */
1279 return (ENOENT);
1280 }
1281
1282 static
1283 int
1284 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
1285 {
1286 panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p",
1287 port, msg);
1288 /* NOT REACHED */
1289 return (ENOENT);
1290 }
Cache object: 71d647319385be869d5bd1c9e0dc088b
|