FreeBSD/Linux Kernel Cross Reference
sys/ipc/ipc_mqueue.c
1 /*
2 * Mach Operating System
3 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
4 * All Rights Reserved.
5 *
6 * Permission to use, copy, modify and distribute this software and its
7 * documentation is hereby granted, provided that both the copyright
8 * notice and this permission notice appear in all copies of the
9 * software, derivative works or modified versions, and any portions
10 * thereof, and that both notices appear in supporting documentation.
11 *
12 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
13 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
14 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
15 *
16 * Carnegie Mellon requests users of this software to return to
17 *
18 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
19 * School of Computer Science
20 * Carnegie Mellon University
21 * Pittsburgh PA 15213-3890
22 *
23 * any improvements or extensions that they make and grant Carnegie Mellon
24 * the rights to redistribute these changes.
25 */
26 /*
27 * HISTORY
28 * $Log: ipc_mqueue.c,v $
29 * Revision 2.21 92/08/03 17:34:47 jfriedl
30 * removed silly prototypes
31 * [92/08/02 jfriedl]
32 *
33 * Revision 2.20 92/05/21 17:10:42 jfriedl
34 * Made correct for when assert is off.
35 * Also some cleanup to quiet gcc warnings.
36 * [92/05/16 jfriedl]
37 *
38 * Revision 2.19 92/03/10 16:26:01 jsb
39 * Merged in norma branch changes as of NORMA_MK7:
40 * Picked up hack from dlb@osf.org to call norma_ipc_finish_receiving
41 * before ipc_kmsg_destroy. The real fix is to use uncopyout_to_network.
42 * [92/01/17 14:35:03 jsb]
43 * Removed spurious arguments to norma_ipc_send.
44 * Options and timeout will be handled here, not by norma_ipc_send.
45 * [91/12/26 19:51:59 jsb]
46 *
47 * Revision 2.18 92/01/03 20:13:05 dbg
48 * Removed THREAD_SHOULD_TERMINATE.
49 * [91/12/19 dbg]
50 *
51 * Revision 2.17 91/12/15 10:40:33 jsb
52 * Added norma_ipc_finish_receiving call to support large in-line msgs.
53 *
54 * Revision 2.16 91/12/14 14:27:10 jsb
55 * Removed ipc_fields.h hack.
56 *
57 * Revision 2.15 91/11/14 16:56:07 rpd
58 * Picked up mysterious norma changes.
59 * [91/11/14 rpd]
60 *
61 * Revision 2.14 91/08/28 11:13:34 jsb
62 * Added seqno argument to ipc_mqueue_receive.
63 * Also added seqno processing to ipc_mqueue_send, ipc_mqueue_move.
64 * [91/08/10 rpd]
65 * Fixed norma_ipc_handoff technology.
66 * Renamed clport things to norma_ipc things.
67 * [91/08/15 08:23:17 jsb]
68 *
69 * Revision 2.13 91/08/03 18:18:27 jsb
70 * Renamed replenish routine.
71 * [91/08/01 23:00:06 jsb]
72 *
73 * Removed obsolete include.
74 * Added option, timeout parameters to ipc_clport_send.
75 * [91/07/17 14:04:15 jsb]
76 *
77 * Revision 2.12 91/06/25 10:27:34 rpd
78 * Added some wait_result assertions.
79 * [91/05/30 rpd]
80 *
81 * Revision 2.11 91/06/17 15:46:18 jsb
82 * Renamed NORMA conditionals.
83 * [91/06/17 10:44:39 jsb]
84 *
85 * Revision 2.10 91/06/06 17:06:06 jsb
86 * Added call to ip_unlock after calling ipc_clport_send.
87 * Added support for clport handoff.
88 * [91/06/06 16:05:12 jsb]
89 *
90 * Revision 2.9 91/05/14 16:33:58 mrt
91 * Correcting copyright
92 *
93 * Revision 2.8 91/03/16 14:48:18 rpd
94 * Renamed ipc_thread_{go,will_wait,will_wait_with_timeout}
95 * to thread_{go,will_wait,will_wait_with_timeout}.
96 * Replaced ipc_thread_block with thread_block.
97 * [91/02/17 rpd]
98 *
99 * Revision 2.7 91/02/05 17:22:24 mrt
100 * Changed to new Mach copyright
101 * [91/02/01 15:46:33 mrt]
102 *
103 * Revision 2.6 91/01/08 15:14:35 rpd
104 * Changed continuation argument to (void (*)()).
105 * [90/12/18 rpd]
106 * Reorganized ipc_mqueue_receive.
107 * [90/11/22 rpd]
108 *
109 * Minor cleanup.
110 * [90/11/11 rpd]
111 *
112 * Revision 2.5 90/12/14 11:02:32 jsb
113 * Changed parameters in ipc_clport_send call.
114 * [90/12/13 21:20:13 jsb]
115 *
116 * Revision 2.4 90/11/05 14:29:04 rpd
117 * Use new io_reference and io_release.
118 * Use new ip_reference and ip_release.
119 * [90/10/29 rpd]
120 *
121 * Revision 2.3 90/09/28 16:54:58 jsb
122 * Added NORMA_IPC support.
123 * [90/09/28 14:03:24 jsb]
124 *
125 * Revision 2.2 90/06/02 14:50:39 rpd
126 * Created for new IPC.
127 * [90/03/26 20:57:06 rpd]
128 *
129 */
130 /*
131 * File: ipc/ipc_mqueue.c
132 * Author: Rich Draves
133 * Date: 1989
134 *
135 * Functions to manipulate IPC message queues.
136 */
137
138 #include <norma_ipc.h>
139
140 #include <mach/port.h>
141 #include <mach/message.h>
142 #include <kern/assert.h>
143 #include <kern/counters.h>
144 #include <kern/sched_prim.h>
145 #include <kern/ipc_sched.h>
146 #include <kern/ipc_kobject.h>
147 #include <ipc/ipc_mqueue.h>
148 #include <ipc/ipc_thread.h>
149 #include <ipc/ipc_kmsg.h>
150 #include <ipc/ipc_port.h>
151 #include <ipc/ipc_pset.h>
152 #include <ipc/ipc_space.h>
153 #include <ipc/ipc_marequest.h>
154
155
156
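The queue object manipulated throughout this file is declared in <ipc/ipc_mqueue.h>. Judging
from the fields and macros used below (imq_lock_init, imq_lock/imq_unlock, imq_messages,
imq_threads), it looks roughly like the sketch that follows; this is an orientation aid, not
a copy of the header.

	typedef struct ipc_mqueue {
		decl_simple_lock_data(, imq_lock_data)	/* guards both queues */
		struct ipc_kmsg_queue	imq_messages;	/* kmsgs waiting to be received */
		struct ipc_thread_queue	imq_threads;	/* receivers blocked on the queue */
	} *ipc_mqueue_t;
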
157 #if NORMA_IPC
158 extern ipc_mqueue_t norma_ipc_handoff_mqueue;
159 extern ipc_kmsg_t norma_ipc_handoff_msg;
160 extern mach_msg_size_t norma_ipc_handoff_max_size;
161 extern mach_msg_size_t norma_ipc_handoff_msg_size;
162 extern ipc_kmsg_t norma_ipc_kmsg_accept();
163 #endif	/* NORMA_IPC */
164
165 /*
166 * Routine: ipc_mqueue_init
167 * Purpose:
168 * Initialize a newly-allocated message queue.
169 */
170
171 void
172 ipc_mqueue_init(mqueue)
173 ipc_mqueue_t mqueue;
174 {
175 imq_lock_init(mqueue);
176 ipc_kmsg_queue_init(&mqueue->imq_messages);
177 ipc_thread_queue_init(&mqueue->imq_threads);
178 }
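
Every port and port set embeds one of these queues. A minimal sketch of a caller, assuming
the queue lives in a freshly allocated port that is not yet visible to other threads; the
function name is illustrative only.

	/* Sketch: set up the queue embedded in a newly created port. */
	void
	example_port_queue_setup(port)
		ipc_port_t port;	/* freshly allocated, not yet visible */
	{
		ipc_mqueue_init(&port->ip_messages);
		/* imq_messages and imq_threads are now empty; imq_lock may be taken */
	}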
179
180 /*
181 * Routine: ipc_mqueue_move
182 * Purpose:
183 * Move messages from one queue (source) to another (dest).
184 * Only moves messages sent to the specified port.
185 * Conditions:
186 * Both queues must be locked.
187 * (This is sufficient to manipulate port->ip_seqno.)
188 */
189
190 void
191 ipc_mqueue_move(dest, source, port)
192 ipc_mqueue_t dest;
193 ipc_mqueue_t source;
194 ipc_port_t port;
195 {
196 ipc_kmsg_queue_t oldq, newq;
197 ipc_thread_queue_t blockedq;
198 ipc_kmsg_t kmsg, next;
199 ipc_thread_t th;
200
201 oldq = &source->imq_messages;
202 newq = &dest->imq_messages;
203 blockedq = &dest->imq_threads;
204
205 for (kmsg = ipc_kmsg_queue_first(oldq);
206 kmsg != IKM_NULL; kmsg = next) {
207 next = ipc_kmsg_queue_next(oldq, kmsg);
208
209 /* only move messages sent to port */
210
211 if (kmsg->ikm_header.msgh_remote_port != (mach_port_t) port)
212 continue;
213
214 ipc_kmsg_rmqueue(oldq, kmsg);
215
216 /* before adding kmsg to newq, check for a blocked receiver */
217
218 while ((th = ipc_thread_dequeue(blockedq)) != ITH_NULL) {
219 assert(ipc_kmsg_queue_empty(newq));
220
221 thread_go(th);
222
223 /* check if the receiver can handle the message */
224
225 if (kmsg->ikm_header.msgh_size <= th->ith_msize) {
226 th->ith_state = MACH_MSG_SUCCESS;
227 th->ith_kmsg = kmsg;
228 th->ith_seqno = port->ip_seqno++;
229
230 goto next_kmsg;
231 }
232
233 th->ith_state = MACH_RCV_TOO_LARGE;
234 th->ith_msize = kmsg->ikm_header.msgh_size;
235 }
236
237 /* didn't find a receiver to handle the message */
238
239 ipc_kmsg_enqueue(newq, kmsg);
240 next_kmsg:;
241 }
242 }
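
This routine is the primitive behind port-set membership changes: when a port leaves a set,
kmsgs addressed to that port must be pulled out of the set's shared queue and back into the
port's own queue. A sketch of that direction, under the locking required by the Conditions
above; the function name is illustrative, and the real membership-change callers are assumed
to live in the port-set code rather than here.

	/* Sketch: pull port's pending messages back out of its set's queue.
	 * Both queues must be locked, per the Conditions above. */
	void
	example_pull_from_set(pset, port)
		ipc_pset_t pset;
		ipc_port_t port;
	{
		imq_lock(&pset->ips_messages);
		imq_lock(&port->ip_messages);

		/* only kmsgs whose msgh_remote_port is this port are moved */
		ipc_mqueue_move(&port->ip_messages, &pset->ips_messages, port);

		imq_unlock(&port->ip_messages);
		imq_unlock(&pset->ips_messages);
	}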
243
244 /*
245 * Routine: ipc_mqueue_changed
246 * Purpose:
247 * Wake up receivers waiting in a message queue.
248 * Conditions:
249 * The message queue is locked.
250 */
251
252 void
253 ipc_mqueue_changed(mqueue, mr)
254 ipc_mqueue_t mqueue;
255 mach_msg_return_t mr;
256 {
257 ipc_thread_t th;
258
259 while ((th = ipc_thread_dequeue(&mqueue->imq_threads)) != ITH_NULL) {
260 th->ith_state = mr;
261 thread_go(th);
262 }
263 }
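
The mr argument becomes each woken receiver's return value, which lets teardown and
set-membership changes report MACH_RCV_PORT_DIED or MACH_RCV_PORT_CHANGED through
ipc_mqueue_receive. A sketch of the dead-port case; the function name is illustrative and
the real call sites are assumed to be in the port and port-set destruction paths.

	/* Sketch: a port being destroyed kicks out threads blocked in receive. */
	void
	example_wake_receivers_port_died(port)
		ipc_port_t port;
	{
		imq_lock(&port->ip_messages);
		ipc_mqueue_changed(&port->ip_messages, MACH_RCV_PORT_DIED);
		imq_unlock(&port->ip_messages);
	}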
264
265 /*
266 * Routine: ipc_mqueue_send
267 * Purpose:
268 * Send a message to a port. The message holds a reference
269 * for the destination port in the msgh_remote_port field.
270 *
271 * If unsuccessful, the caller still has possession of
272 * the message and must do something with it. If successful,
273 * the message is queued, given to a receiver, destroyed,
274 * or handled directly by the kernel via mach_msg.
275 * Conditions:
276 * Nothing locked.
277 * Returns:
278 * MACH_MSG_SUCCESS The message was accepted.
279 * MACH_SEND_TIMED_OUT Caller still has message.
280 * MACH_SEND_INTERRUPTED Caller still has message.
281 */
282
283 mach_msg_return_t
284 ipc_mqueue_send(kmsg, option, time_out)
285 ipc_kmsg_t kmsg;
286 mach_msg_option_t option;
287 mach_msg_timeout_t time_out;
288 {
289 ipc_port_t port;
290
291 port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
292 assert(IP_VALID(port));
293
294 ip_lock(port);
295
296 if (port->ip_receiver == ipc_space_kernel) {
297 ipc_kmsg_t reply;
298
299 /*
300 * We can check ip_receiver == ipc_space_kernel
301 * before checking that the port is active because
302 * ipc_port_dealloc_kernel clears ip_receiver
303 * before destroying a kernel port.
304 */
305
306 assert(ip_active(port));
307 ip_unlock(port);
308
309 reply = ipc_kobject_server(kmsg);
310 if (reply != IKM_NULL)
311 ipc_mqueue_send_always(reply);
312
313 return MACH_MSG_SUCCESS;
314 }
315
316 #if NORMA_IPC
317 if (IP_NORMA_IS_PROXY(port)) {
318 mach_msg_return_t mr;
319
320 mr = norma_ipc_send(kmsg);
321 ip_unlock(port);
322 return mr;
323 }
324 #endif	/* NORMA_IPC */
325
326 for (;;) {
327 ipc_thread_t self;
328
329 /*
330 * Can't deliver to a dead port.
331 * However, we can pretend it got sent
332 * and was then immediately destroyed.
333 */
334
335 if (!ip_active(port)) {
336 /*
337 * We can't let ipc_kmsg_destroy deallocate
338 * the port right, because we might end up
339 * in an infinite loop trying to deliver
340 * a send-once notification.
341 */
342
343 ip_release(port);
344 ip_check_unlock(port);
345 kmsg->ikm_header.msgh_remote_port = MACH_PORT_NULL;
346 #if NORMA_IPC
347 /* XXX until ipc_kmsg_destroy is fixed... */
348 norma_ipc_finish_receiving(&kmsg);
349 #endif	/* NORMA_IPC */
350 ipc_kmsg_destroy(kmsg);
351 return MACH_MSG_SUCCESS;
352 }
353
354 /*
355 * Don't block if:
356 * 1) We're under the queue limit.
357 * 2) Caller used the MACH_SEND_ALWAYS internal option.
358 * 3) Message is sent to a send-once right.
359 */
360
361 if ((port->ip_msgcount < port->ip_qlimit) ||
362 (option & MACH_SEND_ALWAYS) ||
363 (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
364 MACH_MSG_TYPE_PORT_SEND_ONCE))
365 break;
366
367 /* must block waiting for queue to clear */
368
369 self = current_thread();
370
371 if (option & MACH_SEND_TIMEOUT) {
372 if (time_out == 0) {
373 ip_unlock(port);
374 return MACH_SEND_TIMED_OUT;
375 }
376
377 thread_will_wait_with_timeout(self, time_out);
378 } else
379 thread_will_wait(self);
380
381 ipc_thread_enqueue(&port->ip_blocked, self);
382 self->ith_state = MACH_SEND_IN_PROGRESS;
383
384 ip_unlock(port);
385 counter(c_ipc_mqueue_send_block++);
386 thread_block((void (*)()) 0);
387 ip_lock(port);
388
389 /* why did we wake up? */
390
391 if (self->ith_state == MACH_MSG_SUCCESS)
392 continue;
393 assert(self->ith_state == MACH_SEND_IN_PROGRESS);
394
395 /* take ourselves off blocked queue */
396
397 ipc_thread_rmqueue(&port->ip_blocked, self);
398
399 /*
400 * Thread wakeup-reason field tells us why
401 * the wait was interrupted.
402 */
403
404 switch (self->ith_wait_result) {
405 case THREAD_INTERRUPTED:
406 /* send was interrupted - give up */
407
408 ip_unlock(port);
409 return MACH_SEND_INTERRUPTED;
410
411 case THREAD_TIMED_OUT:
412 /* timeout expired */
413
414 assert(option & MACH_SEND_TIMEOUT);
415 time_out = 0;
416 break;
417
418 case THREAD_RESTART:
419 default:
420 #if MACH_ASSERT
421 assert(!"ipc_mqueue_send");
422 #else
423 panic("ipc_mqueue_send");
424 #endif
425 }
426 }
427
428 if (kmsg->ikm_header.msgh_bits & MACH_MSGH_BITS_CIRCULAR) {
429 ip_unlock(port);
430
431 /* don't allow the creation of a circular loop */
432
433 #if NORMA_IPC
434 /* XXX until ipc_kmsg_destroy is fixed... */
435 norma_ipc_finish_receiving(&kmsg);
436 #endif	/* NORMA_IPC */
437 ipc_kmsg_destroy(kmsg);
438 return MACH_MSG_SUCCESS;
439 }
440
441 {
442 ipc_mqueue_t mqueue;
443 ipc_pset_t pset;
444 ipc_thread_t receiver;
445 ipc_thread_queue_t receivers;
446
447 port->ip_msgcount++;
448 assert(port->ip_msgcount > 0);
449
450 pset = port->ip_pset;
451 if (pset == IPS_NULL)
452 mqueue = &port->ip_messages;
453 else
454 mqueue = &pset->ips_messages;
455
456 imq_lock(mqueue);
457 receivers = &mqueue->imq_threads;
458
459 /*
460 * Can unlock the port now that the msg queue is locked
461 * and we know the port is active. While the msg queue
462 * is locked, we have control of the kmsg, so the ref in
463 * it for the port is still good. If the msg queue is in
464 * a set (dead or alive), then we're OK because the port
465 * is still a member of the set and the set won't go away
466 * until the port is taken out, which tries to lock the
467 * set's msg queue to remove the port's msgs.
468 */
469
470 ip_unlock(port);
471
472 /* check for a receiver for the message */
473
474 #if NORMA_IPC
475 if (mqueue == norma_ipc_handoff_mqueue) {
476 norma_ipc_handoff_msg = kmsg;
477 if (kmsg->ikm_header.msgh_size <= norma_ipc_handoff_max_size) {
478 imq_unlock(mqueue);
479 return MACH_MSG_SUCCESS;
480 }
481 norma_ipc_handoff_msg_size = kmsg->ikm_header.msgh_size;
482 }
483 #endif	/* NORMA_IPC */
484 for (;;) {
485 receiver = ipc_thread_queue_first(receivers);
486 if (receiver == ITH_NULL) {
487 /* no receivers; queue kmsg */
488
489 ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
490 imq_unlock(mqueue);
491 break;
492 }
493
494 ipc_thread_rmqueue_first_macro(receivers, receiver);
495 assert(ipc_kmsg_queue_empty(&mqueue->imq_messages));
496
497 if (kmsg->ikm_header.msgh_size <= receiver->ith_msize) {
498 /* got a successful receiver */
499
500 receiver->ith_state = MACH_MSG_SUCCESS;
501 receiver->ith_kmsg = kmsg;
502 receiver->ith_seqno = port->ip_seqno++;
503 imq_unlock(mqueue);
504
505 thread_go(receiver);
506 break;
507 }
508
509 receiver->ith_state = MACH_RCV_TOO_LARGE;
510 receiver->ith_msize = kmsg->ikm_header.msgh_size;
511 thread_go(receiver);
512 }
513 }
514
515 return MACH_MSG_SUCCESS;
516 }
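
From user space this path is reached through mach_msg() with MACH_SEND_MSG; adding
MACH_SEND_TIMEOUT is what turns a persistently full queue into MACH_SEND_TIMED_OUT instead
of an indefinite block in the loop above. A user-level sketch; the function name, message
id, and timeout are arbitrary, and the user-level header arrangement is assumed.

	#include <mach/port.h>
	#include <mach/message.h>

	/* Sketch: send a header-only message to a send right we hold,
	 * giving up after 100 ms if the destination queue stays full. */
	mach_msg_return_t
	example_send_with_timeout(dest)
		mach_port_t dest;
	{
		mach_msg_header_t msg;

		msg.msgh_bits = MACH_MSGH_BITS(MACH_MSG_TYPE_COPY_SEND, 0);
		msg.msgh_size = sizeof msg;
		msg.msgh_remote_port = dest;
		msg.msgh_local_port = MACH_PORT_NULL;
		msg.msgh_id = 0;		/* arbitrary */

		return mach_msg(&msg, MACH_SEND_MSG|MACH_SEND_TIMEOUT,
				sizeof msg, 0, MACH_PORT_NULL,
				100, MACH_PORT_NULL);	/* timeout is in milliseconds */
	}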
517
518 /*
519 * Routine: ipc_mqueue_copyin
520 * Purpose:
521 * Convert a name in a space to a message queue.
522 * Conditions:
523 * Nothing locked. If successful, the message queue
524 * is returned locked and caller gets a ref for the object.
525 * This ref ensures the continued existence of the queue.
526 * Returns:
527 * MACH_MSG_SUCCESS Found a message queue.
528 * MACH_RCV_INVALID_NAME The space is dead.
529 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
530 * MACH_RCV_INVALID_NAME
531 * The denoted right is not receive or port set.
532 * MACH_RCV_IN_SET Receive right is a member of a set.
533 */
534
535 mach_msg_return_t
536 ipc_mqueue_copyin(space, name, mqueuep, objectp)
537 ipc_space_t space;
538 mach_port_t name;
539 ipc_mqueue_t *mqueuep;
540 ipc_object_t *objectp;
541 {
542 ipc_entry_t entry;
543 ipc_entry_bits_t bits;
544 ipc_object_t object;
545 ipc_mqueue_t mqueue;
546
547 is_read_lock(space);
548 if (!space->is_active) {
549 is_read_unlock(space);
550 return MACH_RCV_INVALID_NAME;
551 }
552
553 entry = ipc_entry_lookup(space, name);
554 if (entry == IE_NULL) {
555 is_read_unlock(space);
556 return MACH_RCV_INVALID_NAME;
557 }
558
559 bits = entry->ie_bits;
560 object = entry->ie_object;
561
562 if (bits & MACH_PORT_TYPE_RECEIVE) {
563 ipc_port_t port;
564 ipc_pset_t pset;
565
566 port = (ipc_port_t) object;
567 assert(port != IP_NULL);
568
569 ip_lock(port);
570 assert(ip_active(port));
571 assert(port->ip_receiver_name == name);
572 assert(port->ip_receiver == space);
573 is_read_unlock(space);
574
575 pset = port->ip_pset;
576 if (pset != IPS_NULL) {
577 ips_lock(pset);
578 if (ips_active(pset)) {
579 ips_unlock(pset);
580 ip_unlock(port);
581 return MACH_RCV_IN_SET;
582 }
583
584 ipc_pset_remove(pset, port);
585 ips_check_unlock(pset);
586 assert(port->ip_pset == IPS_NULL);
587 }
588
589 mqueue = &port->ip_messages;
590 } else if (bits & MACH_PORT_TYPE_PORT_SET) {
591 ipc_pset_t pset;
592
593 pset = (ipc_pset_t) object;
594 assert(pset != IPS_NULL);
595
596 ips_lock(pset);
597 assert(ips_active(pset));
598 assert(pset->ips_local_name == name);
599 is_read_unlock(space);
600
601 mqueue = &pset->ips_messages;
602 } else {
603 is_read_unlock(space);
604 return MACH_RCV_INVALID_NAME;
605 }
606
607 /*
608 * At this point, the object is locked and active,
609 * the space is unlocked, and mqueue is initialized.
610 */
611
612 io_reference(object);
613 imq_lock(mqueue);
614 io_unlock(object);
615
616 *objectp = object;
617 *mqueuep = mqueue;
618 return MACH_MSG_SUCCESS;
619 }
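
Together with ipc_mqueue_receive below, this gives the shape of the kernel's receive path:
turn the caller's name into a locked queue plus an object reference, receive from the queue,
then drop the reference. A condensed sketch; the function name and the bare option and
timeout choices are illustrative, and ipc_object_release() (declared in ipc/ipc_object.h,
which this file does not include) is assumed to be the way the reference from copyin is
dropped.

	/* Sketch: name -> locked queue -> receive -> drop the object reference. */
	mach_msg_return_t
	example_receive_by_name(space, name, max_size, kmsgp, seqnop)
		ipc_space_t space;
		mach_port_t name;
		mach_msg_size_t max_size;
		ipc_kmsg_t *kmsgp;
		mach_port_seqno_t *seqnop;
	{
		ipc_object_t object;
		ipc_mqueue_t mqueue;
		mach_msg_return_t mr;

		mr = ipc_mqueue_copyin(space, name, &mqueue, &object);
		if (mr != MACH_MSG_SUCCESS)
			return mr;
		/* mqueue is locked and we hold a reference on object */

		mr = ipc_mqueue_receive(mqueue, MACH_MSG_OPTION_NONE, max_size,
					MACH_MSG_TIMEOUT_NONE, FALSE,
					(void (*)()) 0, kmsgp, seqnop);
		/* ipc_mqueue_receive unlocks mqueue before returning */

		ipc_object_release(object);	/* pairs with the ref from copyin */
		return mr;
	}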
620
621 /*
622 * Routine: ipc_mqueue_receive
623 * Purpose:
624 * Receive a message from a message queue.
625 *
626 * If continuation is non-zero, then we might discard
627 * our kernel stack when we block. We will continue
628 * after unblocking by executing continuation.
629 *
630 * If resume is true, then we are resuming a receive
631 * operation after a blocked receive discarded our stack.
632 * Conditions:
633 * The message queue is locked; it is unlocked.
634 *
635 * Our caller must hold a reference for the port or port set
636 * to which this queue belongs, to keep the queue
637 * from being deallocated. Furthermore, the port or set
638 * must have been active when the queue was locked.
639 *
640 * The kmsg is returned with clean header fields
641 * and with the circular bit turned off.
642 * Returns:
643 * MACH_MSG_SUCCESS Message returned in kmsgp.
644 * MACH_RCV_TOO_LARGE Message size returned in kmsgp.
645 * MACH_RCV_TIMED_OUT No message obtained.
646 * MACH_RCV_INTERRUPTED No message obtained.
647 * MACH_RCV_PORT_DIED Port/set died; no message.
648 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
649 */
650
651 mach_msg_return_t
652 ipc_mqueue_receive(mqueue, option, max_size, time_out,
653 resume, continuation, kmsgp, seqnop)
654 ipc_mqueue_t mqueue;
655 mach_msg_option_t option;
656 mach_msg_size_t max_size;
657 mach_msg_timeout_t time_out;
658 boolean_t resume;
659 void (*continuation)();
660 ipc_kmsg_t *kmsgp;
661 mach_port_seqno_t *seqnop;
662 {
663 ipc_port_t port;
664 ipc_kmsg_t kmsg;
665 mach_port_seqno_t seqno;
666
667 {
668 ipc_kmsg_queue_t kmsgs = &mqueue->imq_messages;
669 ipc_thread_t self = current_thread();
670
671 if (resume)
672 goto after_thread_block;
673
674 for (;;) {
675 kmsg = ipc_kmsg_queue_first(kmsgs);
676 #if NORMA_IPC
677 /*
678 * It may be possible to make this work even when a timeout
679 * is specified.
680 *
681 * Netipc_replenish should be moved somewhere else.
682 */
683 if (kmsg == IKM_NULL && ! (option & MACH_RCV_TIMEOUT)) {
684 netipc_replenish(FALSE);
685 *kmsgp = IKM_NULL;
686 kmsg = norma_ipc_kmsg_accept(mqueue, max_size,
687 (mach_msg_size_t *)kmsgp);
688 if (kmsg != IKM_NULL) {
689 port = (ipc_port_t)
690 kmsg->ikm_header.msgh_remote_port;
691 seqno = port->ip_seqno++;
692 break;
693 }
694 if (*kmsgp) {
695 imq_unlock(mqueue);
696 return MACH_RCV_TOO_LARGE;
697 }
698 }
699 #endif	/* NORMA_IPC */
700 if (kmsg != IKM_NULL) {
701 /* check space requirements */
702
703 if (kmsg->ikm_header.msgh_size > max_size) {
704 * (mach_msg_size_t *) kmsgp =
705 kmsg->ikm_header.msgh_size;
706 imq_unlock(mqueue);
707 return MACH_RCV_TOO_LARGE;
708 }
709
710 ipc_kmsg_rmqueue_first_macro(kmsgs, kmsg);
711 port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
712 seqno = port->ip_seqno++;
713 break;
714 }
715
716 /* must block waiting for a message */
717
718 if (option & MACH_RCV_TIMEOUT) {
719 if (time_out == 0) {
720 imq_unlock(mqueue);
721 return MACH_RCV_TIMED_OUT;
722 }
723
724 thread_will_wait_with_timeout(self, time_out);
725 } else
726 thread_will_wait(self);
727
728 ipc_thread_enqueue_macro(&mqueue->imq_threads, self);
729 self->ith_state = MACH_RCV_IN_PROGRESS;
730 self->ith_msize = max_size;
731
732 imq_unlock(mqueue);
733 if (continuation != (void (*)()) 0) {
734 counter(c_ipc_mqueue_receive_block_user++);
735 thread_block(continuation);
736 } else {
737 counter(c_ipc_mqueue_receive_block_kernel++);
738 thread_block((void (*)()) 0);
739 }
740 after_thread_block:
741 imq_lock(mqueue);
742
743 /* why did we wake up? */
744
745 if (self->ith_state == MACH_MSG_SUCCESS) {
746 /* pick up the message that was handed to us */
747
748 kmsg = self->ith_kmsg;
749 seqno = self->ith_seqno;
750 port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
751 break;
752 }
753
754 switch (self->ith_state) {
755 case MACH_RCV_TOO_LARGE:
756 /* pick up size of the too-large message */
757
758 * (mach_msg_size_t *) kmsgp = self->ith_msize;
759 /* fall-through */
760
761 case MACH_RCV_PORT_DIED:
762 case MACH_RCV_PORT_CHANGED:
763 /* something bad happened to the port/set */
764
765 imq_unlock(mqueue);
766 return self->ith_state;
767
768 case MACH_RCV_IN_PROGRESS:
769 /*
770 * Awakened for other than IPC completion.
771 * Remove ourselves from the waiting queue,
772 * then check the wakeup cause.
773 */
774
775 ipc_thread_rmqueue(&mqueue->imq_threads, self);
776
777 switch (self->ith_wait_result) {
778 case THREAD_INTERRUPTED:
779 /* receive was interrupted - give up */
780
781 imq_unlock(mqueue);
782 return MACH_RCV_INTERRUPTED;
783
784 case THREAD_TIMED_OUT:
785 /* timeout expired */
786
787 assert(option & MACH_RCV_TIMEOUT);
788 time_out = 0;
789 break;
790
791 case THREAD_RESTART:
792 default:
793 #if MACH_ASSERT
794 assert(!"ipc_mqueue_receive");
795 #else
796 panic("ipc_mqueue_receive");
797 #endif
798 }
799 break;
800
801 default:
802 #if MACH_ASSERT
803 assert(!"ipc_mqueue_receive: strange ith_state");
804 #else
805 panic("ipc_mqueue_receive: strange ith_state");
806 #endif
807 }
808 }
809
810 /* we have a kmsg; unlock the msg queue */
811
812 imq_unlock(mqueue);
813 assert(kmsg->ikm_header.msgh_size <= max_size);
814 }
815
816 {
817 ipc_marequest_t marequest;
818
819 marequest = kmsg->ikm_marequest;
820 if (marequest != IMAR_NULL) {
821 ipc_marequest_destroy(marequest);
822 kmsg->ikm_marequest = IMAR_NULL;
823 }
824 assert((kmsg->ikm_header.msgh_bits & MACH_MSGH_BITS_CIRCULAR) == 0);
825
826 assert(port == (ipc_port_t) kmsg->ikm_header.msgh_remote_port);
827 ip_lock(port);
828
829 if (ip_active(port)) {
830 ipc_thread_queue_t senders;
831 ipc_thread_t sender;
832
833 assert(port->ip_msgcount > 0);
834 port->ip_msgcount--;
835
836 senders = &port->ip_blocked;
837 sender = ipc_thread_queue_first(senders);
838
839 if ((sender != ITH_NULL) &&
840 (port->ip_msgcount < port->ip_qlimit)) {
841 ipc_thread_rmqueue(senders, sender);
842 sender->ith_state = MACH_MSG_SUCCESS;
843 thread_go(sender);
844 }
845 }
846
847 ip_unlock(port);
848 }
849
850 #if NORMA_IPC
851 norma_ipc_finish_receiving(&kmsg);
852 #endif	/* NORMA_IPC */
853 *kmsgp = kmsg;
854 *seqnop = seqno;
855 return MACH_MSG_SUCCESS;
856 }
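
The user-visible counterpart of this routine is a bounded receive, where the
MACH_RCV_TIMED_OUT and MACH_RCV_TOO_LARGE cases handled above come back as ordinary
mach_msg() return codes. A user-level sketch; the function name, in-line buffer size, and
timeout are arbitrary.

	#include <mach/port.h>
	#include <mach/message.h>

	/* Sketch: wait up to one second for a message on a receive right we
	 * hold; a message larger than the buffer is reported, not delivered. */
	mach_msg_return_t
	example_receive_with_timeout(rcv)
		mach_port_t rcv;
	{
		struct {
			mach_msg_header_t head;
			char body[256];		/* arbitrary in-line data space */
		} buffer;

		return mach_msg(&buffer.head, MACH_RCV_MSG|MACH_RCV_TIMEOUT,
				0, sizeof buffer, rcv,
				1000, MACH_PORT_NULL);	/* timeout is in milliseconds */
	}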