FreeBSD/Linux Kernel Cross Reference
sys/ipc/ipc_mqueue.c
1 /*
2 * Mach Operating System
3 * Copyright (c) 1993,1992,1991,1990,1989 Carnegie Mellon University
4 * All Rights Reserved.
5 *
6 * Permission to use, copy, modify and distribute this software and its
7 * documentation is hereby granted, provided that both the copyright
8 * notice and this permission notice appear in all copies of the
9 * software, derivative works or modified versions, and any portions
10 * thereof, and that both notices appear in supporting documentation.
11 *
12 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
13 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
14 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
15 *
16 * Carnegie Mellon requests users of this software to return to
17 *
18 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
19 * School of Computer Science
20 * Carnegie Mellon University
21 * Pittsburgh PA 15213-3890
22 *
23 * any improvements or extensions that they make and grant Carnegie Mellon
24 * the rights to redistribute these changes.
25 */
26 /*
27 * HISTORY
28 * $Log: ipc_mqueue.c,v $
29 * Revision 2.22 93/11/17 16:58:43 dbg
30 * Fixed type of null continuation argument to thread_block.
31 * [93/06/03 dbg]
32 *
33 * Use proper type for continuations.
34 * [93/05/04 dbg]
35 *
36 * Added ANSI function prototypes. Cleaned up return type of
37 * ipc_kmsg result parameter from ipc_mqueue_receive by using union
38 * type defined in ipc/ipc_types.h.
39 * [93/03/29 dbg]
40 *
41 * Include kern/sched_prim.h instead of kern/ipc_sched.h.
42 * [93/01/26 dbg]
43 *
44 * Revision 2.21 92/08/03 17:34:47 jfriedl
45 * removed silly prototypes
46 * [92/08/02 jfriedl]
47 *
48 * Revision 2.20 92/05/21 17:10:42 jfriedl
49 * Made correct for when assert is off.
50 * Also some cleanup to quiet gcc warnings.
51 * [92/05/16 jfriedl]
52 *
53 * Revision 2.19 92/03/10 16:26:01 jsb
54 * Merged in norma branch changes as of NORMA_MK7:
55 * Picked up hack from dlb@osf.org to call norma_ipc_finish_receiving
56 * before ipc_kmsg_destroy. The real fix is to use uncopyout_to_network.
57 * [92/01/17 14:35:03 jsb]
58 * Removed spurious arguments to norma_ipc_send.
59 * Options and timeout will be handled here, not by norma_ipc_send.
60 * [91/12/26 19:51:59 jsb]
61 *
62 * Revision 2.18 92/01/03 20:13:05 dbg
63 * Removed THREAD_SHOULD_TERMINATE.
64 * [91/12/19 dbg]
65 *
66 * Revision 2.17 91/12/15 10:40:33 jsb
67 * Added norma_ipc_finish_receiving call to support large in-line msgs.
68 *
69 * Revision 2.16 91/12/14 14:27:10 jsb
70 * Removed ipc_fields.h hack.
71 *
72 * Revision 2.15 91/11/14 16:56:07 rpd
73 * Picked up mysterious norma changes.
74 * [91/11/14 rpd]
75 *
76 * Revision 2.14 91/08/28 11:13:34 jsb
77 * Added seqno argument to ipc_mqueue_receive.
78 * Also added seqno processing to ipc_mqueue_send, ipc_mqueue_move.
79 * [91/08/10 rpd]
80 * Fixed norma_ipc_handoff technology.
81 * Renamed clport things to norma_ipc things.
82 * [91/08/15 08:23:17 jsb]
83 *
84 * Revision 2.13 91/08/03 18:18:27 jsb
85 * Renamed replenish routine.
86 * [91/08/01 23:00:06 jsb]
87 *
88 * Removed obsolete include.
89 * Added option, timeout parameters to ipc_clport_send.
90 * [91/07/17 14:04:15 jsb]
91 *
92 * Revision 2.12 91/06/25 10:27:34 rpd
93 * Added some wait_result assertions.
94 * [91/05/30 rpd]
95 *
96 * Revision 2.11 91/06/17 15:46:18 jsb
97 * Renamed NORMA conditionals.
98 * [91/06/17 10:44:39 jsb]
99 *
100 * Revision 2.10 91/06/06 17:06:06 jsb
101 * Added call to ip_unlock after calling ipc_clport_send.
102 * Added support for clport handoff.
103 * [91/06/06 16:05:12 jsb]
104 *
105 * Revision 2.9 91/05/14 16:33:58 mrt
106 * Correcting copyright
107 *
108 * Revision 2.8 91/03/16 14:48:18 rpd
109 * Renamed ipc_thread_{go,will_wait,will_wait_with_timeout}
110 * to thread_{go,will_wait,will_wait_with_timeout}.
111 * Replaced ipc_thread_block with thread_block.
112 * [91/02/17 rpd]
113 *
114 * Revision 2.7 91/02/05 17:22:24 mrt
115 * Changed to new Mach copyright
116 * [91/02/01 15:46:33 mrt]
117 *
118 * Revision 2.6 91/01/08 15:14:35 rpd
119 * Changed continuation argument to (void (*)()).
120 * [90/12/18 rpd]
121 * Reorganized ipc_mqueue_receive.
122 * [90/11/22 rpd]
123 *
124 * Minor cleanup.
125 * [90/11/11 rpd]
126 *
127 * Revision 2.5 90/12/14 11:02:32 jsb
128 * Changed parameters in ipc_clport_send call.
129 * [90/12/13 21:20:13 jsb]
130 *
131 * Revision 2.4 90/11/05 14:29:04 rpd
132 * Use new io_reference and io_release.
133 * Use new ip_reference and ip_release.
134 * [90/10/29 rpd]
135 *
136 * Revision 2.3 90/09/28 16:54:58 jsb
137 * Added NORMA_IPC support.
138 * [90/09/28 14:03:24 jsb]
139 *
140 * Revision 2.2 90/06/02 14:50:39 rpd
141 * Created for new IPC.
142 * [90/03/26 20:57:06 rpd]
143 *
144 */
145 /*
146 * File: ipc/ipc_mqueue.c
147 * Author: Rich Draves
148 * Date: 1989
149 *
150 * Functions to manipulate IPC message queues.
151 */
152
153 #include <norma_ipc.h>
154
155 #include <mach/port.h>
156 #include <mach/message.h>
157 #include <kern/assert.h>
158 #include <kern/counters.h>
159 #include <kern/sched_prim.h>
160 #include <kern/ipc_kobject.h>
161 #include <ipc/ipc_mqueue.h>
162 #include <ipc/ipc_thread.h>
163 #include <ipc/ipc_kmsg.h>
164 #include <ipc/ipc_port.h>
165 #include <ipc/ipc_pset.h>
166 #include <ipc/ipc_space.h>
167 #include <ipc/ipc_marequest.h>
168
169
170
171 #if NORMA_IPC
172 extern ipc_mqueue_t norma_ipc_handoff_mqueue;
173 extern ipc_kmsg_t norma_ipc_handoff_msg;
174 extern mach_msg_size_t norma_ipc_handoff_max_size;
175 extern mach_msg_size_t norma_ipc_handoff_msg_size;
176 extern ipc_kmsg_t norma_ipc_kmsg_accept();
177 #endif /* NORMA_IPC */
178
179 /*
180 * Routine: ipc_mqueue_init
181 * Purpose:
182 * Initialize a newly-allocated message queue.
183 */
184
185 void
186 ipc_mqueue_init(
187 ipc_mqueue_t mqueue)
188 {
189 imq_lock_init(mqueue);
190 ipc_kmsg_queue_init(&mqueue->imq_messages);
191 ipc_thread_queue_init(&mqueue->imq_threads);
192 }
193
194 /*
195 * Routine: ipc_mqueue_move
196 * Purpose:
197 * Move messages from one queue (source) to another (dest).
198 * Only moves messages sent to the specified port.
199 * Conditions:
200 * Both queues must be locked.
201 * (This is sufficient to manipulate port->ip_seqno.)
202 */
203
void
ipc_mqueue_move(
	ipc_mqueue_t dest,
	ipc_mqueue_t source,
	ipc_mqueue_t port)	/* NOTE(review): declared ipc_port_t below */
{
	ipc_kmsg_queue_t oldq, newq;
	ipc_thread_queue_t blockedq;
	ipc_kmsg_t kmsg, next;
	ipc_thread_t th;

	oldq = &source->imq_messages;	/* messages migrate out of here */
	newq = &dest->imq_messages;	/* ... and into here */
	blockedq = &dest->imq_threads;	/* receivers already waiting on dest */

	/*
	 *	Walk source's message queue.  "next" is captured before
	 *	any removal so the iteration survives ipc_kmsg_rmqueue.
	 */
	for (kmsg = ipc_kmsg_queue_first(oldq);
	     kmsg != IKM_NULL; kmsg = next) {
		next = ipc_kmsg_queue_next(oldq, kmsg);

		/* only move messages sent to port */

		if (kmsg->ikm_header.msgh_remote_port != (mach_port_t) port)
			continue;

		ipc_kmsg_rmqueue(oldq, kmsg);

		/* before adding kmsg to newq, check for a blocked receiver */

		while ((th = ipc_thread_dequeue(blockedq)) != ITH_NULL) {
			assert(ipc_kmsg_queue_empty(newq));

			/*
			 *	Receiver is resumed unconditionally; if the
			 *	message is too large for it, it wakes with
			 *	MACH_RCV_TOO_LARGE and the required size,
			 *	and we try the next blocked receiver.
			 */
			thread_go(th);

			/* check if the receiver can handle the message */

			if (kmsg->ikm_header.msgh_size <= th->ith_msize) {
				/*
				 *	Hand the kmsg directly to the thread.
				 *	Both queues are locked (see routine
				 *	header), which per the stated
				 *	conditions suffices to advance
				 *	port->ip_seqno here.
				 */
				th->ith_state = MACH_MSG_SUCCESS;
				th->ith_kmsg = kmsg;
				th->ith_seqno = port->ip_seqno++;

				goto next_kmsg;
			}

			th->ith_state = MACH_RCV_TOO_LARGE;
			th->ith_msize = kmsg->ikm_header.msgh_size;
		}

		/* didn't find a receiver to handle the message */

		ipc_kmsg_enqueue(newq, kmsg);
	    next_kmsg:;
	}
}
257
258 /*
259 * Routine: ipc_mqueue_changed
260 * Purpose:
261 * Wake up receivers waiting in a message queue.
262 * Conditions:
263 * The message queue is locked.
264 */
265
266 void
267 ipc_mqueue_changed(
268 ipc_mqueue_t mqueue,
269 mach_msg_return_t mr)
270 {
271 ipc_thread_t th;
272
273 while ((th = ipc_thread_dequeue(&mqueue->imq_threads)) != ITH_NULL) {
274 th->ith_state = mr;
275 thread_go(th);
276 }
277 }
278
279 /*
280 * Routine: ipc_mqueue_send
281 * Purpose:
282 * Send a message to a port. The message holds a reference
283 * for the destination port in the msgh_remote_port field.
284 *
285 * If unsuccessful, the caller still has possession of
286 * the message and must do something with it. If successful,
287 * the message is queued, given to a receiver, destroyed,
288 * or handled directly by the kernel via mach_msg.
289 * Conditions:
290 * Nothing locked.
291 * Returns:
292 * MACH_MSG_SUCCESS The message was accepted.
293 * MACH_SEND_TIMED_OUT Caller still has message.
294 * MACH_SEND_INTERRUPTED Caller still has message.
295 */
296
mach_msg_return_t
ipc_mqueue_send(
	ipc_kmsg_t kmsg,
	mach_msg_option_t option,
	mach_msg_timeout_t time_out)
{
	ipc_port_t port;

	/* destination port ref travels inside the message header */
	port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
	assert(IP_VALID(port));

	ip_lock(port);

	if (port->ip_receiver == ipc_space_kernel) {
		ipc_kmsg_t reply;

		/*
		 * We can check ip_receiver == ipc_space_kernel
		 * before checking that the port is active because
		 * ipc_port_dealloc_kernel clears ip_receiver
		 * before destroying a kernel port.
		 */

		assert(ip_active(port));
		ip_unlock(port);

		/*
		 *	Kernel-destined messages are serviced inline;
		 *	any reply is sent without queue-limit blocking.
		 */
		reply = ipc_kobject_server(kmsg);
		if (reply != IKM_NULL)
			ipc_mqueue_send_always(reply);

		return MACH_MSG_SUCCESS;
	}

#if	NORMA_IPC
	/* proxy ports forward across the network instead of queueing */
	if (IP_NORMA_IS_PROXY(port)) {
		mach_msg_return_t mr;

		mr = norma_ipc_send(kmsg);
		ip_unlock(port);
		return mr;
	}
#endif	/* NORMA_IPC */

	/*
	 *	Flow-control loop: exits via break when the message may be
	 *	queued, or returns directly on dead port / timeout / interrupt.
	 *	Port stays locked across each iteration boundary.
	 */
	for (;;) {
		ipc_thread_t self;

		/*
		 *	Can't deliver to a dead port.
		 *	However, we can pretend it got sent
		 *	and was then immediately destroyed.
		 */

		if (!ip_active(port)) {
			/*
			 *	We can't let ipc_kmsg_destroy deallocate
			 *	the port right, because we might end up
			 *	in an infinite loop trying to deliver
			 *	a send-once notification.
			 */

			ip_release(port);
			ip_check_unlock(port);
			kmsg->ikm_header.msgh_remote_port = MACH_PORT_NULL;
#if	NORMA_IPC
			/* XXX until ipc_kmsg_destroy is fixed... */
			norma_ipc_finish_receiving(&kmsg);
#endif	/* NORMA_IPC */
			ipc_kmsg_destroy(kmsg);
			return MACH_MSG_SUCCESS;
		}

		/*
		 *	Don't block if:
		 *		1) We're under the queue limit.
		 *		2) Caller used the MACH_SEND_ALWAYS internal option.
		 *		3) Message is sent to a send-once right.
		 */

		if ((port->ip_msgcount < port->ip_qlimit) ||
		    (option & MACH_SEND_ALWAYS) ||
		    (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
						MACH_MSG_TYPE_PORT_SEND_ONCE))
			break;

		/* must block waiting for queue to clear */

		self = current_thread();

		if (option & MACH_SEND_TIMEOUT) {
			/* time_out == 0 means poll: fail immediately */
			if (time_out == 0) {
				ip_unlock(port);
				return MACH_SEND_TIMED_OUT;
			}

			thread_will_wait_with_timeout(self, time_out);
		} else
			thread_will_wait(self);

		ipc_thread_enqueue(&port->ip_blocked, self);
		self->ith_state = MACH_SEND_IN_PROGRESS;

		ip_unlock(port);
		counter(c_ipc_mqueue_send_block++);
		thread_block(CONTINUE_NULL);
		ip_lock(port);

		/* why did we wake up? */

		/*
		 *	MACH_MSG_SUCCESS means a receiver drained the queue
		 *	and removed us from ip_blocked (see the wakeup in
		 *	ipc_mqueue_receive); retry the whole delivery check.
		 */
		if (self->ith_state == MACH_MSG_SUCCESS)
			continue;
		assert(self->ith_state == MACH_SEND_IN_PROGRESS);

		/* take ourselves off blocked queue */

		ipc_thread_rmqueue(&port->ip_blocked, self);

		/*
		 *	Thread wakeup-reason field tells us why
		 *	the wait was interrupted.
		 */

		switch (self->ith_wait_result) {
		case THREAD_INTERRUPTED:
			/* send was interrupted - give up */

			ip_unlock(port);
			return MACH_SEND_INTERRUPTED;

		case THREAD_TIMED_OUT:
			/* timeout expired; next pass fails via time_out == 0 */

			assert(option & MACH_SEND_TIMEOUT);
			time_out = 0;
			break;

		case THREAD_RESTART:
		default:
#if MACH_ASSERT
			assert(!"ipc_mqueue_send");
#else
			panic("ipc_mqueue_send");
#endif
		}
	}

	if (kmsg->ikm_header.msgh_bits & MACH_MSGH_BITS_CIRCULAR) {
		ip_unlock(port);

		/* don't allow the creation of a circular loop */

#if	NORMA_IPC
		/* XXX until ipc_kmsg_destroy is fixed... */
		norma_ipc_finish_receiving(&kmsg);
#endif	/* NORMA_IPC */
		ipc_kmsg_destroy(kmsg);
		return MACH_MSG_SUCCESS;
	}

    {
	ipc_mqueue_t mqueue;
	ipc_pset_t pset;
	ipc_thread_t receiver;
	ipc_thread_queue_t receivers;

	port->ip_msgcount++;
	assert(port->ip_msgcount > 0);

	/* deliver via the port set's queue if the port is in one */
	pset = port->ip_pset;
	if (pset == IPS_NULL)
		mqueue = &port->ip_messages;
	else
		mqueue = &pset->ips_messages;

	imq_lock(mqueue);
	receivers = &mqueue->imq_threads;

	/*
	 * Can unlock the port now that the msg queue is locked
	 * and we know the port is active.  While the msg queue
	 * is locked, we have control of the kmsg, so the ref in
	 * it for the port is still good.  If the msg queue is in
	 * a set (dead or alive), then we're OK because the port
	 * is still a member of the set and the set won't go away
	 * until the port is taken out, which tries to lock the
	 * set's msg queue to remove the port's msgs.
	 */

	ip_unlock(port);

	/* check for a receiver for the message */

#if	NORMA_IPC
	/* fast-path handoff of the kmsg to a network receiver */
	if (mqueue == norma_ipc_handoff_mqueue) {
		norma_ipc_handoff_msg = kmsg;
		if (kmsg->ikm_header.msgh_size <= norma_ipc_handoff_max_size) {
			imq_unlock(mqueue);
			return MACH_MSG_SUCCESS;
		}
		norma_ipc_handoff_msg_size = kmsg->ikm_header.msgh_size;
	}
#endif	/* NORMA_IPC */

	for (;;) {
		receiver = ipc_thread_queue_first(receivers);
		if (receiver == ITH_NULL) {
			/* no receivers; queue kmsg */

			ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
			imq_unlock(mqueue);
			break;
		}

		ipc_thread_rmqueue_first_macro(receivers, receiver);
		assert(ipc_kmsg_queue_empty(&mqueue->imq_messages));

		if (kmsg->ikm_header.msgh_size <= receiver->ith_msize) {
			/* got a successful receiver */

			/*
			 *	Note: ip_seqno is advanced under the mqueue
			 *	lock (the port lock was dropped above); the
			 *	conditions on ipc_mqueue_move suggest the
			 *	mqueue lock protects ip_seqno.
			 */
			receiver->ith_state = MACH_MSG_SUCCESS;
			receiver->ith_kmsg = kmsg;
			receiver->ith_seqno = port->ip_seqno++;
			imq_unlock(mqueue);

			thread_go(receiver);
			break;
		}

		/* receiver's buffer too small: wake it with the size, retry */
		receiver->ith_state = MACH_RCV_TOO_LARGE;
		receiver->ith_msize = kmsg->ikm_header.msgh_size;
		thread_go(receiver);
	}
    }

	return MACH_MSG_SUCCESS;
}
532
533 /*
534 * Routine: ipc_mqueue_copyin
535 * Purpose:
536 * Convert a name in a space to a message queue.
537 * Conditions:
538 * Nothing locked. If successful, the message queue
539 * is returned locked and caller gets a ref for the object.
540 * This ref ensures the continued existence of the queue.
541 * Returns:
542 * MACH_MSG_SUCCESS Found a message queue.
543 * MACH_RCV_INVALID_NAME The space is dead.
544 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
545 * MACH_RCV_INVALID_NAME
546 * The denoted right is not receive or port set.
547 * MACH_RCV_IN_SET Receive right is a member of a set.
548 */
549
mach_msg_return_t
ipc_mqueue_copyin(
	ipc_space_t space,
	mach_port_t name,
	ipc_mqueue_t *mqueuep,
	ipc_object_t *objectp)
{
	ipc_entry_t entry;
	ipc_entry_bits_t bits;
	ipc_object_t object;
	ipc_mqueue_t mqueue;

	/* dead space: no names can be resolved */
	is_read_lock(space);
	if (!space->is_active) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	entry = ipc_entry_lookup(space, name);
	if (entry == IE_NULL) {
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	bits = entry->ie_bits;
	object = entry->ie_object;

	if (bits & MACH_PORT_TYPE_RECEIVE) {
		ipc_port_t port;
		ipc_pset_t pset;

		port = (ipc_port_t) object;
		assert(port != IP_NULL);

		/*
		 *	Lock the port before releasing the space lock,
		 *	so the entry can't change under us.
		 */
		ip_lock(port);
		assert(ip_active(port));
		assert(port->ip_receiver_name == name);
		assert(port->ip_receiver == space);
		is_read_unlock(space);

		pset = port->ip_pset;
		if (pset != IPS_NULL) {
			ips_lock(pset);
			if (ips_active(pset)) {
				/* a live set owns the port's messages */
				ips_unlock(pset);
				ip_unlock(port);
				return MACH_RCV_IN_SET;
			}

			/* dead set: detach the port and let the set go */
			ipc_pset_remove(pset, port);
			ips_check_unlock(pset);
			assert(port->ip_pset == IPS_NULL);
		}

		mqueue = &port->ip_messages;
	} else if (bits & MACH_PORT_TYPE_PORT_SET) {
		ipc_pset_t pset;

		pset = (ipc_pset_t) object;
		assert(pset != IPS_NULL);

		ips_lock(pset);
		assert(ips_active(pset));
		assert(pset->ips_local_name == name);
		is_read_unlock(space);

		mqueue = &pset->ips_messages;
	} else {
		/* name denotes some other right (send, dead name, ...) */
		is_read_unlock(space);
		return MACH_RCV_INVALID_NAME;
	}

	/*
	 *	At this point, the object is locked and active,
	 *	the space is unlocked, and mqueue is initialized.
	 */

	/* take the caller's ref, then trade the object lock for the
	   mqueue lock (queue returned locked, per routine header) */
	io_reference(object);
	imq_lock(mqueue);
	io_unlock(object);

	*objectp = object;
	*mqueuep = mqueue;
	return MACH_MSG_SUCCESS;
}
635
636 /*
637 * Routine: ipc_mqueue_receive
638 * Purpose:
639 * Receive a message from a message queue.
640 *
641 * If continuation is non-zero, then we might discard
642 * our kernel stack when we block. We will continue
643 * after unblocking by executing continuation.
644 *
645 * If resume is true, then we are resuming a receive
646 * operation after a blocked receive discarded our stack.
647 * Conditions:
648 * The message queue is locked; it is unlocked.
649 *
650 * Our caller must hold a reference for the port or port set
651 * to which this queue belongs, to keep the queue
652 * from being deallocated. Furthermore, the port or set
653 * must have been active when the queue was locked.
654 *
655 * The kmsg is returned with clean header fields
656 * and with the circular bit turned off.
657 * Returns:
658 * MACH_MSG_SUCCESS Message returned in kmsg_retp.
659 * MACH_RCV_TOO_LARGE Message size returned in kmsg_retp.
660 * MACH_RCV_TIMED_OUT No message obtained.
661 * MACH_RCV_INTERRUPTED No message obtained.
662 * MACH_RCV_PORT_DIED Port/set died; no message.
663 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
664 */
665
mach_msg_return_t
ipc_mqueue_receive(
	ipc_mqueue_t mqueue,
	mach_msg_option_t option,
	mach_msg_size_t max_size,
	mach_msg_timeout_t time_out,
	boolean_t resume,
	continuation_t continuation,
	union ipc_kmsg_return *kmsg_retp,
	mach_port_seqno_t *seqnop)
{
	ipc_port_t port;
	ipc_kmsg_t kmsg;
	mach_port_seqno_t seqno;

    {
	ipc_kmsg_queue_t kmsgs = &mqueue->imq_messages;
	ipc_thread_t self = current_thread();

	/*
	 *	Resuming after a stack-discarding block: jump straight
	 *	to the post-block wakeup analysis inside the loop.
	 */
	if (resume)
		goto after_thread_block;

	for (;;) {
		kmsg = ipc_kmsg_queue_first(kmsgs);
#if	NORMA_IPC
		/*
		 * It may be possible to make this work even when a timeout
		 * is specified.
		 *
		 * Netipc_replenish should be moved somewhere else.
		 */
		if (kmsg == IKM_NULL && ! (option & MACH_RCV_TIMEOUT)) {
			netipc_replenish(FALSE);
			kmsg_retp->kmsg = IKM_NULL;
			kmsg = norma_ipc_kmsg_accept(mqueue, max_size,
						     &kmsg_retp->msize);
			if (kmsg != IKM_NULL) {
				port = (ipc_port_t)
					kmsg->ikm_header.msgh_remote_port;
				seqno = port->ip_seqno++;
				break;
			}
			/* nonzero msize: network message didn't fit */
			if (kmsg_retp->msize) {
				imq_unlock(mqueue);
				return MACH_RCV_TOO_LARGE;
			}
		}
#endif	/* NORMA_IPC */
		if (kmsg != IKM_NULL) {
			/* check space requirements */

			if (kmsg->ikm_header.msgh_size > max_size) {
				/* report required size; msg stays queued */
				kmsg_retp->msize = kmsg->ikm_header.msgh_size;
				imq_unlock(mqueue);
				return MACH_RCV_TOO_LARGE;
			}

			/* dequeue it; seqno advanced under the mqueue lock */
			ipc_kmsg_rmqueue_first_macro(kmsgs, kmsg);
			port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
			seqno = port->ip_seqno++;
			break;
		}

		/* must block waiting for a message */

		if (option & MACH_RCV_TIMEOUT) {
			/* time_out == 0 means poll: fail immediately */
			if (time_out == 0) {
				imq_unlock(mqueue);
				return MACH_RCV_TIMED_OUT;
			}

			thread_will_wait_with_timeout(self, time_out);
		} else
			thread_will_wait(self);

		ipc_thread_enqueue_macro(&mqueue->imq_threads, self);
		self->ith_state = MACH_RCV_IN_PROGRESS;
		self->ith_msize = max_size;	/* advertised to senders */

		imq_unlock(mqueue);
		if (continuation != CONTINUE_NULL) {
			/* may discard our kernel stack while blocked */
			counter(c_ipc_mqueue_receive_block_user++);
			thread_block(continuation);
		} else {
			counter(c_ipc_mqueue_receive_block_kernel++);
			thread_block(CONTINUE_NULL);
		}
	    after_thread_block:
		imq_lock(mqueue);

		/* why did we wake up? */

		if (self->ith_state == MACH_MSG_SUCCESS) {
			/* pick up the message that was handed to us */

			kmsg = self->ith_kmsg;
			seqno = self->ith_seqno;
			port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
			break;
		}

		switch (self->ith_state) {
		case MACH_RCV_TOO_LARGE:
			/* pick up size of the too-large message */

			kmsg_retp->msize = self->ith_msize;
			/* fall-through */

		case MACH_RCV_PORT_DIED:
		case MACH_RCV_PORT_CHANGED:
			/* something bad happened to the port/set */

			imq_unlock(mqueue);
			return self->ith_state;

		case MACH_RCV_IN_PROGRESS:
			/*
			 *	Awakened for other than IPC completion.
			 *	Remove ourselves from the waiting queue,
			 *	then check the wakeup cause.
			 */

			ipc_thread_rmqueue(&mqueue->imq_threads, self);

			switch (self->ith_wait_result) {
			case THREAD_INTERRUPTED:
				/* receive was interrupted - give up */

				imq_unlock(mqueue);
				return MACH_RCV_INTERRUPTED;

			case THREAD_TIMED_OUT:
				/* timeout expired; next pass returns
				   MACH_RCV_TIMED_OUT via time_out == 0 */

				assert(option & MACH_RCV_TIMEOUT);
				time_out = 0;
				break;

			case THREAD_RESTART:
			default:
#if MACH_ASSERT
				assert(!"ipc_mqueue_receive");
#else
				panic("ipc_mqueue_receive");
#endif
			}
			break;

		default:
#if MACH_ASSERT
			assert(!"ipc_mqueue_receive: strange ith_state");
#else
			panic("ipc_mqueue_receive: strange ith_state");
#endif
		}
	}

	/* we have a kmsg; unlock the msg queue */

	imq_unlock(mqueue);
	assert(kmsg->ikm_header.msgh_size <= max_size);
    }

    {
	ipc_marequest_t marequest;

	/* tear down any msg-accepted request attached to the kmsg */
	marequest = kmsg->ikm_marequest;
	if (marequest != IMAR_NULL) {
		ipc_marequest_destroy(marequest);
		kmsg->ikm_marequest = IMAR_NULL;
	}
	assert((kmsg->ikm_header.msgh_bits & MACH_MSGH_BITS_CIRCULAR) == 0);

	assert(port == (ipc_port_t) kmsg->ikm_header.msgh_remote_port);
	ip_lock(port);

	if (ip_active(port)) {
		ipc_thread_queue_t senders;
		ipc_thread_t sender;

		assert(port->ip_msgcount > 0);
		port->ip_msgcount--;

		/*
		 *	We freed a queue slot; if a sender is blocked on
		 *	the queue limit, release the first one.  It will
		 *	see MACH_MSG_SUCCESS and retry its delivery
		 *	(see the blocking loop in ipc_mqueue_send).
		 */
		senders = &port->ip_blocked;
		sender = ipc_thread_queue_first(senders);

		if ((sender != ITH_NULL) &&
		    (port->ip_msgcount < port->ip_qlimit)) {
			ipc_thread_rmqueue(senders, sender);
			sender->ith_state = MACH_MSG_SUCCESS;
			thread_go(sender);
		}
	}

	ip_unlock(port);
    }

#if	NORMA_IPC
	/* complete transfer of large in-line network data */
	norma_ipc_finish_receiving(&kmsg);
#endif	/* NORMA_IPC */
	kmsg_retp->kmsg = kmsg;
	*seqnop = seqno;
	return MACH_MSG_SUCCESS;
}
Cache object: df726e4573fd9c683eca4e0bf4683afa
|