The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/ipc/ipc_mqueue.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /* 
    2  * Mach Operating System
    3  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
    4  * All Rights Reserved.
    5  * 
    6  * Permission to use, copy, modify and distribute this software and its
    7  * documentation is hereby granted, provided that both the copyright
    8  * notice and this permission notice appear in all copies of the
    9  * software, derivative works or modified versions, and any portions
   10  * thereof, and that both notices appear in supporting documentation.
   11  * 
   12  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
   13  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
   14  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
   15  * 
   16  * Carnegie Mellon requests users of this software to return to
   17  * 
   18  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
   19  *  School of Computer Science
   20  *  Carnegie Mellon University
   21  *  Pittsburgh PA 15213-3890
   22  * 
   23  * any improvements or extensions that they make and grant Carnegie Mellon
   24  * the rights to redistribute these changes.
   25  */
   26 /*
   27  * HISTORY
   28  * $Log:        ipc_mqueue.c,v $
   29  * Revision 2.21  92/08/03  17:34:47  jfriedl
   30  *      removed silly prototypes
   31  *      [92/08/02            jfriedl]
   32  * 
   33  * Revision 2.20  92/05/21  17:10:42  jfriedl
   34  *      Made correct for when assert is off.
   35  *      Also some cleanup to quiet gcc warnings.
   36  *      [92/05/16            jfriedl]
   37  * 
   38  * Revision 2.19  92/03/10  16:26:01  jsb
   39  *      Merged in norma branch changes as of NORMA_MK7:
   40  *      Picked up hack from dlb@osf.org to call norma_ipc_finish_receiving
   41  *      before ipc_kmsg_destroy. The real fix is to use uncopyout_to_network.
   42  *      [92/01/17  14:35:03  jsb]
   43  *      Removed spurious arguments to norma_ipc_send.
   44  *      Options and timeout will be handled here, not by norma_ipc_send.
   45  *      [91/12/26  19:51:59  jsb]
   46  * 
   47  * Revision 2.18  92/01/03  20:13:05  dbg
   48  *      Removed THREAD_SHOULD_TERMINATE.
   49  *      [91/12/19            dbg]
   50  * 
   51  * Revision 2.17  91/12/15  10:40:33  jsb
   52  *      Added norma_ipc_finish_receiving call to support large in-line msgs.
   53  * 
   54  * Revision 2.16  91/12/14  14:27:10  jsb
   55  *      Removed ipc_fields.h hack.
   56  * 
   57  * Revision 2.15  91/11/14  16:56:07  rpd
   58  *      Picked up mysterious norma changes.
   59  *      [91/11/14            rpd]
   60  * 
   61  * Revision 2.14  91/08/28  11:13:34  jsb
   62  *      Added seqno argument to ipc_mqueue_receive.
   63  *      Also added seqno processing to ipc_mqueue_send, ipc_mqueue_move.
   64  *      [91/08/10            rpd]
   65  *      Fixed norma_ipc_handoff technology.
   66  *      Renamed clport things to norma_ipc things.
   67  *      [91/08/15  08:23:17  jsb]
   68  * 
   69  * Revision 2.13  91/08/03  18:18:27  jsb
   70  *      Renamed replenish routine.
   71  *      [91/08/01  23:00:06  jsb]
   72  * 
   73  *      Removed obsolete include.
   74  *      Added option, timeout parameters to ipc_clport_send.
   75  *      [91/07/17  14:04:15  jsb]
   76  * 
   77  * Revision 2.12  91/06/25  10:27:34  rpd
   78  *      Added some wait_result assertions.
   79  *      [91/05/30            rpd]
   80  * 
   81  * Revision 2.11  91/06/17  15:46:18  jsb
   82  *      Renamed NORMA conditionals.
   83  *      [91/06/17  10:44:39  jsb]
   84  * 
   85  * Revision 2.10  91/06/06  17:06:06  jsb
   86  *      Added call to ip_unlock after calling ipc_clport_send.
   87  *      Added support for clport handoff.
   88  *      [91/06/06  16:05:12  jsb]
   89  * 
   90  * Revision 2.9  91/05/14  16:33:58  mrt
   91  *      Correcting copyright
   92  * 
   93  * Revision 2.8  91/03/16  14:48:18  rpd
   94  *      Renamed ipc_thread_{go,will_wait,will_wait_with_timeout}
   95  *      to thread_{go,will_wait,will_wait_with_timeout}.
   96  *      Replaced ipc_thread_block with thread_block.
   97  *      [91/02/17            rpd]
   98  * 
   99  * Revision 2.7  91/02/05  17:22:24  mrt
  100  *      Changed to new Mach copyright
  101  *      [91/02/01  15:46:33  mrt]
  102  * 
  103  * Revision 2.6  91/01/08  15:14:35  rpd
  104  *      Changed continuation argument to (void (*)()).
  105  *      [90/12/18            rpd]
  106  *      Reorganized ipc_mqueue_receive.
  107  *      [90/11/22            rpd]
  108  * 
  109  *      Minor cleanup.
  110  *      [90/11/11            rpd]
  111  * 
  112  * Revision 2.5  90/12/14  11:02:32  jsb
  113  *      Changed parameters in ipc_clport_send call.
  114  *      [90/12/13  21:20:13  jsb]
  115  * 
  116  * Revision 2.4  90/11/05  14:29:04  rpd
  117  *      Use new io_reference and io_release.
  118  *      Use new ip_reference and ip_release.
  119  *      [90/10/29            rpd]
  120  * 
  121  * Revision 2.3  90/09/28  16:54:58  jsb
  122  *      Added NORMA_IPC support.
  123  *      [90/09/28  14:03:24  jsb]
  124  * 
  125  * Revision 2.2  90/06/02  14:50:39  rpd
  126  *      Created for new IPC.
  127  *      [90/03/26  20:57:06  rpd]
  128  * 
  129  */
  130 /*
  131  *      File:   ipc/ipc_mqueue.c
  132  *      Author: Rich Draves
  133  *      Date:   1989
  134  *
  135  *      Functions to manipulate IPC message queues.
  136  */
  137 
  138 #include <norma_ipc.h>
  139 
  140 #include <mach/port.h>
  141 #include <mach/message.h>
  142 #include <kern/assert.h>
  143 #include <kern/counters.h>
  144 #include <kern/sched_prim.h>
  145 #include <kern/ipc_sched.h>
  146 #include <kern/ipc_kobject.h>
  147 #include <ipc/ipc_mqueue.h>
  148 #include <ipc/ipc_thread.h>
  149 #include <ipc/ipc_kmsg.h>
  150 #include <ipc/ipc_port.h>
  151 #include <ipc/ipc_pset.h>
  152 #include <ipc/ipc_space.h>
  153 #include <ipc/ipc_marequest.h>
  154 
  155 
  156 
  157 #if     NORMA_IPC
  158 extern ipc_mqueue_t     norma_ipc_handoff_mqueue;
  159 extern ipc_kmsg_t       norma_ipc_handoff_msg;
  160 extern mach_msg_size_t  norma_ipc_handoff_max_size;
  161 extern mach_msg_size_t  norma_ipc_handoff_msg_size;
  162 extern ipc_kmsg_t       norma_ipc_kmsg_accept();
  163 #endif  NORMA_IPC
  164 
  165 /*
  166  *      Routine:        ipc_mqueue_init
  167  *      Purpose:
  168  *              Initialize a newly-allocated message queue.
  169  */
  170 
  171 void
  172 ipc_mqueue_init(mqueue)
  173         ipc_mqueue_t mqueue;
  174 {
  175         imq_lock_init(mqueue);
  176         ipc_kmsg_queue_init(&mqueue->imq_messages);
  177         ipc_thread_queue_init(&mqueue->imq_threads);
  178 }
  179 
  180 /*
  181  *      Routine:        ipc_mqueue_move
  182  *      Purpose:
  183  *              Move messages from one queue (source) to another (dest).
  184  *              Only moves messages sent to the specified port.
  185  *      Conditions:
  186  *              Both queues must be locked.
  187  *              (This is sufficient to manipulate port->ip_seqno.)
  188  */
  189 
  190 void
  191 ipc_mqueue_move(dest, source, port)
  192         ipc_mqueue_t dest;
  193         ipc_mqueue_t source;
  194         ipc_port_t port;
  195 {
  196         ipc_kmsg_queue_t oldq, newq;
  197         ipc_thread_queue_t blockedq;
  198         ipc_kmsg_t kmsg, next;
  199         ipc_thread_t th;
  200 
  201         oldq = &source->imq_messages;
  202         newq = &dest->imq_messages;
  203         blockedq = &dest->imq_threads;
  204 
  205         for (kmsg = ipc_kmsg_queue_first(oldq);
  206              kmsg != IKM_NULL; kmsg = next) {
  207                 next = ipc_kmsg_queue_next(oldq, kmsg);
  208 
  209                 /* only move messages sent to port */
  210 
  211                 if (kmsg->ikm_header.msgh_remote_port != (mach_port_t) port)
  212                         continue;
  213 
  214                 ipc_kmsg_rmqueue(oldq, kmsg);
  215 
  216                 /* before adding kmsg to newq, check for a blocked receiver */
  217 
  218                 while ((th = ipc_thread_dequeue(blockedq)) != ITH_NULL) {
  219                         assert(ipc_kmsg_queue_empty(newq));
  220 
  221                         thread_go(th);
  222 
  223                         /* check if the receiver can handle the message */
  224 
  225                         if (kmsg->ikm_header.msgh_size <= th->ith_msize) {
  226                                 th->ith_state = MACH_MSG_SUCCESS;
  227                                 th->ith_kmsg = kmsg;
  228                                 th->ith_seqno = port->ip_seqno++;
  229 
  230                                 goto next_kmsg;
  231                         }
  232 
  233                         th->ith_state = MACH_RCV_TOO_LARGE;
  234                         th->ith_msize = kmsg->ikm_header.msgh_size;
  235                 }
  236 
  237                 /* didn't find a receiver to handle the message */
  238 
  239                 ipc_kmsg_enqueue(newq, kmsg);
  240             next_kmsg:;
  241         }
  242 }
  243 
  244 /*
  245  *      Routine:        ipc_mqueue_changed
  246  *      Purpose:
  247  *              Wake up receivers waiting in a message queue.
  248  *      Conditions:
  249  *              The message queue is locked.
  250  */
  251 
  252 void
  253 ipc_mqueue_changed(mqueue, mr)
  254         ipc_mqueue_t mqueue;
  255         mach_msg_return_t mr;
  256 {
  257         ipc_thread_t th;
  258 
  259         while ((th = ipc_thread_dequeue(&mqueue->imq_threads)) != ITH_NULL) {
  260                 th->ith_state = mr;
  261                 thread_go(th);
  262         }
  263 }
  264 
  265 /*
  266  *      Routine:        ipc_mqueue_send
  267  *      Purpose:
  268  *              Send a message to a port.  The message holds a reference
  269  *              for the destination port in the msgh_remote_port field.
  270  *
  271  *              If unsuccessful, the caller still has possession of
  272  *              the message and must do something with it.  If successful,
  273  *              the message is queued, given to a receiver, destroyed,
  274  *              or handled directly by the kernel via mach_msg.
  275  *      Conditions:
  276  *              Nothing locked.
  277  *      Returns:
  278  *              MACH_MSG_SUCCESS        The message was accepted.
  279  *              MACH_SEND_TIMED_OUT     Caller still has message.
  280  *              MACH_SEND_INTERRUPTED   Caller still has message.
  281  */
  282 
  283 mach_msg_return_t
  284 ipc_mqueue_send(kmsg, option, time_out)
  285         ipc_kmsg_t kmsg;
  286         mach_msg_option_t option;
  287         mach_msg_timeout_t time_out;
  288 {
  289         ipc_port_t port;
  290 
  291         port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
  292         assert(IP_VALID(port));
  293 
  294         ip_lock(port);
  295 
  296         if (port->ip_receiver == ipc_space_kernel) {
  297                 ipc_kmsg_t reply;
  298 
  299                 /*
  300                  *      We can check ip_receiver == ipc_space_kernel
  301                  *      before checking that the port is active because
  302                  *      ipc_port_dealloc_kernel clears ip_receiver
  303                  *      before destroying a kernel port.
  304                  */
  305 
  306                 assert(ip_active(port));
  307                 ip_unlock(port);
  308 
  309                 reply = ipc_kobject_server(kmsg);
  310                 if (reply != IKM_NULL)
  311                         ipc_mqueue_send_always(reply);
  312 
  313                 return MACH_MSG_SUCCESS;
  314         }
  315 
  316 #if     NORMA_IPC
  317         if (IP_NORMA_IS_PROXY(port)) {
  318                 mach_msg_return_t mr;
  319 
  320                 mr = norma_ipc_send(kmsg);
  321                 ip_unlock(port);
  322                 return mr;
  323         }
  324 #endif  NORMA_IPC
  325 
  326         for (;;) {
  327                 ipc_thread_t self;
  328 
  329                 /*
  330                  *      Can't deliver to a dead port.
  331                  *      However, we can pretend it got sent
  332                  *      and was then immediately destroyed.
  333                  */
  334 
  335                 if (!ip_active(port)) {
  336                         /*
  337                          *      We can't let ipc_kmsg_destroy deallocate
  338                          *      the port right, because we might end up
  339                          *      in an infinite loop trying to deliver
  340                          *      a send-once notification.
  341                          */
  342 
  343                         ip_release(port);
  344                         ip_check_unlock(port);
  345                         kmsg->ikm_header.msgh_remote_port = MACH_PORT_NULL;
  346 #if     NORMA_IPC
  347                         /* XXX until ipc_kmsg_destroy is fixed... */
  348                         norma_ipc_finish_receiving(&kmsg);
  349 #endif  NORMA_IPC
  350                         ipc_kmsg_destroy(kmsg);
  351                         return MACH_MSG_SUCCESS;
  352                 }
  353 
  354                 /*
  355                  *  Don't block if:
  356                  *      1) We're under the queue limit.
  357                  *      2) Caller used the MACH_SEND_ALWAYS internal option.
  358                  *      3) Message is sent to a send-once right.
  359                  */
  360 
  361                 if ((port->ip_msgcount < port->ip_qlimit) ||
  362                     (option & MACH_SEND_ALWAYS) ||
  363                     (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
  364                                                 MACH_MSG_TYPE_PORT_SEND_ONCE))
  365                         break;
  366 
  367                 /* must block waiting for queue to clear */
  368 
  369                 self = current_thread();
  370 
  371                 if (option & MACH_SEND_TIMEOUT) {
  372                         if (time_out == 0) {
  373                                 ip_unlock(port);
  374                                 return MACH_SEND_TIMED_OUT;
  375                         }
  376 
  377                         thread_will_wait_with_timeout(self, time_out);
  378                 } else
  379                         thread_will_wait(self);
  380 
  381                 ipc_thread_enqueue(&port->ip_blocked, self);
  382                 self->ith_state = MACH_SEND_IN_PROGRESS;
  383 
  384                 ip_unlock(port);
  385                 counter(c_ipc_mqueue_send_block++);
  386                 thread_block((void (*)()) 0);
  387                 ip_lock(port);
  388 
  389                 /* why did we wake up? */
  390 
  391                 if (self->ith_state == MACH_MSG_SUCCESS)
  392                         continue;
  393                 assert(self->ith_state == MACH_SEND_IN_PROGRESS);
  394 
  395                 /* take ourselves off blocked queue */
  396 
  397                 ipc_thread_rmqueue(&port->ip_blocked, self);
  398 
  399                 /*
  400                  *      Thread wakeup-reason field tells us why
  401                  *      the wait was interrupted.
  402                  */
  403 
  404                 switch (self->ith_wait_result) {
  405                     case THREAD_INTERRUPTED:
  406                         /* send was interrupted - give up */
  407 
  408                         ip_unlock(port);
  409                         return MACH_SEND_INTERRUPTED;
  410 
  411                     case THREAD_TIMED_OUT:
  412                         /* timeout expired */
  413 
  414                         assert(option & MACH_SEND_TIMEOUT);
  415                         time_out = 0;
  416                         break;
  417 
  418                     case THREAD_RESTART:
  419                     default:
  420 #if MACH_ASSERT
  421                         assert(!"ipc_mqueue_send");
  422 #else
  423                         panic("ipc_mqueue_send");
  424 #endif
  425                 }
  426         }
  427 
  428         if (kmsg->ikm_header.msgh_bits & MACH_MSGH_BITS_CIRCULAR) {
  429                 ip_unlock(port);
  430 
  431                 /* don't allow the creation of a circular loop */
  432 
  433 #if     NORMA_IPC
  434                 /* XXX until ipc_kmsg_destroy is fixed... */
  435                 norma_ipc_finish_receiving(&kmsg);
  436 #endif  NORMA_IPC
  437                 ipc_kmsg_destroy(kmsg);
  438                 return MACH_MSG_SUCCESS;
  439         }
  440 
  441     {
  442         ipc_mqueue_t mqueue;
  443         ipc_pset_t pset;
  444         ipc_thread_t receiver;
  445         ipc_thread_queue_t receivers;
  446 
  447         port->ip_msgcount++;
  448         assert(port->ip_msgcount > 0);
  449 
  450         pset = port->ip_pset;
  451         if (pset == IPS_NULL)
  452                 mqueue = &port->ip_messages;
  453         else
  454                 mqueue = &pset->ips_messages;
  455 
  456         imq_lock(mqueue);
  457         receivers = &mqueue->imq_threads;
  458 
  459         /*
  460          *      Can unlock the port now that the msg queue is locked
  461          *      and we know the port is active.  While the msg queue
  462          *      is locked, we have control of the kmsg, so the ref in
  463          *      it for the port is still good.  If the msg queue is in
  464          *      a set (dead or alive), then we're OK because the port
  465          *      is still a member of the set and the set won't go away
  466          *      until the port is taken out, which tries to lock the
  467          *      set's msg queue to remove the port's msgs.
  468          */
  469 
  470         ip_unlock(port);
  471 
  472         /* check for a receiver for the message */
  473 
  474 #if     NORMA_IPC
  475         if (mqueue == norma_ipc_handoff_mqueue) {
  476                 norma_ipc_handoff_msg = kmsg;
  477                 if (kmsg->ikm_header.msgh_size <= norma_ipc_handoff_max_size) {
  478                         imq_unlock(mqueue);
  479                         return MACH_MSG_SUCCESS;
  480                 }
  481                 norma_ipc_handoff_msg_size = kmsg->ikm_header.msgh_size;
  482         }
  483 #endif  NORMA_IPC
  484         for (;;) {
  485                 receiver = ipc_thread_queue_first(receivers);
  486                 if (receiver == ITH_NULL) {
  487                         /* no receivers; queue kmsg */
  488 
  489                         ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
  490                         imq_unlock(mqueue);
  491                         break;
  492                 }
  493 
  494                 ipc_thread_rmqueue_first_macro(receivers, receiver);
  495                 assert(ipc_kmsg_queue_empty(&mqueue->imq_messages));
  496 
  497                 if (kmsg->ikm_header.msgh_size <= receiver->ith_msize) {
  498                         /* got a successful receiver */
  499 
  500                         receiver->ith_state = MACH_MSG_SUCCESS;
  501                         receiver->ith_kmsg = kmsg;
  502                         receiver->ith_seqno = port->ip_seqno++;
  503                         imq_unlock(mqueue);
  504 
  505                         thread_go(receiver);
  506                         break;
  507                 }
  508 
  509                 receiver->ith_state = MACH_RCV_TOO_LARGE;
  510                 receiver->ith_msize = kmsg->ikm_header.msgh_size;
  511                 thread_go(receiver);
  512         }
  513     }
  514 
  515         return MACH_MSG_SUCCESS;
  516 }
  517 
  518 /*
  519  *      Routine:        ipc_mqueue_copyin
  520  *      Purpose:
  521  *              Convert a name in a space to a message queue.
  522  *      Conditions:
  523  *              Nothing locked.  If successful, the message queue
  524  *              is returned locked and caller gets a ref for the object.
  525  *              This ref ensures the continued existence of the queue.
  526  *      Returns:
  527  *              MACH_MSG_SUCCESS        Found a message queue.
  528  *              MACH_RCV_INVALID_NAME   The space is dead.
  529  *              MACH_RCV_INVALID_NAME   The name doesn't denote a right.
  530  *              MACH_RCV_INVALID_NAME
  531  *                      The denoted right is not receive or port set.
  532  *              MACH_RCV_IN_SET         Receive right is a member of a set.
  533  */
  534 
  535 mach_msg_return_t
  536 ipc_mqueue_copyin(space, name, mqueuep, objectp)
  537         ipc_space_t space;
  538         mach_port_t name;
  539         ipc_mqueue_t *mqueuep;
  540         ipc_object_t *objectp;
  541 {
  542         ipc_entry_t entry;
  543         ipc_entry_bits_t bits;
  544         ipc_object_t object;
  545         ipc_mqueue_t mqueue;
  546 
  547         is_read_lock(space);
  548         if (!space->is_active) {
  549                 is_read_unlock(space);
  550                 return MACH_RCV_INVALID_NAME;
  551         }
  552 
  553         entry = ipc_entry_lookup(space, name);
  554         if (entry == IE_NULL) {
  555                 is_read_unlock(space);
  556                 return MACH_RCV_INVALID_NAME;
  557         }
  558 
  559         bits = entry->ie_bits;
  560         object = entry->ie_object;
  561 
  562         if (bits & MACH_PORT_TYPE_RECEIVE) {
  563                 ipc_port_t port;
  564                 ipc_pset_t pset;
  565 
  566                 port = (ipc_port_t) object;
  567                 assert(port != IP_NULL);
  568 
  569                 ip_lock(port);
  570                 assert(ip_active(port));
  571                 assert(port->ip_receiver_name == name);
  572                 assert(port->ip_receiver == space);
  573                 is_read_unlock(space);
  574 
  575                 pset = port->ip_pset;
  576                 if (pset != IPS_NULL) {
  577                         ips_lock(pset);
  578                         if (ips_active(pset)) {
  579                                 ips_unlock(pset);
  580                                 ip_unlock(port);
  581                                 return MACH_RCV_IN_SET;
  582                         }
  583 
  584                         ipc_pset_remove(pset, port);
  585                         ips_check_unlock(pset);
  586                         assert(port->ip_pset == IPS_NULL);
  587                 }
  588 
  589                 mqueue = &port->ip_messages;
  590         } else if (bits & MACH_PORT_TYPE_PORT_SET) {
  591                 ipc_pset_t pset;
  592 
  593                 pset = (ipc_pset_t) object;
  594                 assert(pset != IPS_NULL);
  595 
  596                 ips_lock(pset);
  597                 assert(ips_active(pset));
  598                 assert(pset->ips_local_name == name);
  599                 is_read_unlock(space);
  600 
  601                 mqueue = &pset->ips_messages;
  602         } else {
  603                 is_read_unlock(space);
  604                 return MACH_RCV_INVALID_NAME;
  605         }
  606 
  607         /*
  608          *      At this point, the object is locked and active,
  609          *      the space is unlocked, and mqueue is initialized.
  610          */
  611 
  612         io_reference(object);
  613         imq_lock(mqueue);
  614         io_unlock(object);
  615 
  616         *objectp = object;
  617         *mqueuep = mqueue;
  618         return MACH_MSG_SUCCESS;
  619 }
  620 
  621 /*
  622  *      Routine:        ipc_mqueue_receive
  623  *      Purpose:
  624  *              Receive a message from a message queue.
  625  *
  626  *              If continuation is non-zero, then we might discard
  627  *              our kernel stack when we block.  We will continue
  628  *              after unblocking by executing continuation.
  629  *
  630  *              If resume is true, then we are resuming a receive
  631  *              operation after a blocked receive discarded our stack.
  632  *      Conditions:
  633  *              The message queue is locked; it is unlocked.
  634  *
  635  *              Our caller must hold a reference for the port or port set
  636  *              to which this queue belongs, to keep the queue
  637  *              from being deallocated.  Furthermore, the port or set
  638  *              must have been active when the queue was locked.
  639  *
  640  *              The kmsg is returned with clean header fields
  641  *              and with the circular bit turned off.
  642  *      Returns:
  643  *              MACH_MSG_SUCCESS        Message returned in kmsgp.
  644  *              MACH_RCV_TOO_LARGE      Message size returned in kmsgp.
  645  *              MACH_RCV_TIMED_OUT      No message obtained.
  646  *              MACH_RCV_INTERRUPTED    No message obtained.
  647  *              MACH_RCV_PORT_DIED      Port/set died; no message.
  648  *              MACH_RCV_PORT_CHANGED   Port moved into set; no msg.
  649  */
  650 
  651 mach_msg_return_t
  652 ipc_mqueue_receive(mqueue, option, max_size, time_out,
  653                    resume, continuation, kmsgp, seqnop)
  654         ipc_mqueue_t mqueue;
  655         mach_msg_option_t option;
  656         mach_msg_size_t max_size;
  657         mach_msg_timeout_t time_out;
  658         boolean_t resume;
  659         void (*continuation)();
  660         ipc_kmsg_t *kmsgp;
  661         mach_port_seqno_t *seqnop;
  662 {
  663         ipc_port_t port;
  664         ipc_kmsg_t kmsg;
  665         mach_port_seqno_t seqno;
  666 
  667     {
  668         ipc_kmsg_queue_t kmsgs = &mqueue->imq_messages;
  669         ipc_thread_t self = current_thread();
  670 
  671         if (resume)
  672                 goto after_thread_block;
  673 
  674         for (;;) {
  675                 kmsg = ipc_kmsg_queue_first(kmsgs);
  676 #if     NORMA_IPC
  677                 /*
  678                  * It may be possible to make this work even when a timeout
  679                  * is specified.
  680                  *
  681                  * Netipc_replenish should be moved somewhere else.
  682                  */
  683                 if (kmsg == IKM_NULL && ! (option & MACH_RCV_TIMEOUT)) {
  684                         netipc_replenish(FALSE);
  685                         *kmsgp = IKM_NULL;
  686                         kmsg = norma_ipc_kmsg_accept(mqueue, max_size,
  687                                                      (mach_msg_size_t *)kmsgp);
  688                         if (kmsg != IKM_NULL) {
  689                                 port = (ipc_port_t)
  690                                     kmsg->ikm_header.msgh_remote_port;
  691                                 seqno = port->ip_seqno++;
  692                                 break;
  693                         }
  694                         if (*kmsgp) {
  695                                 imq_unlock(mqueue);
  696                                 return MACH_RCV_TOO_LARGE;
  697                         }
  698                 }
  699 #endif  NORMA_IPC
  700                 if (kmsg != IKM_NULL) {
  701                         /* check space requirements */
  702 
  703                         if (kmsg->ikm_header.msgh_size > max_size) {
  704                                 * (mach_msg_size_t *) kmsgp =
  705                                         kmsg->ikm_header.msgh_size;
  706                                 imq_unlock(mqueue);
  707                                 return MACH_RCV_TOO_LARGE;
  708                         }
  709 
  710                         ipc_kmsg_rmqueue_first_macro(kmsgs, kmsg);
  711                         port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
  712                         seqno = port->ip_seqno++;
  713                         break;
  714                 }
  715 
  716                 /* must block waiting for a message */
  717 
  718                 if (option & MACH_RCV_TIMEOUT) {
  719                         if (time_out == 0) {
  720                                 imq_unlock(mqueue);
  721                                 return MACH_RCV_TIMED_OUT;
  722                         }
  723 
  724                         thread_will_wait_with_timeout(self, time_out);
  725                 } else
  726                         thread_will_wait(self);
  727 
  728                 ipc_thread_enqueue_macro(&mqueue->imq_threads, self);
  729                 self->ith_state = MACH_RCV_IN_PROGRESS;
  730                 self->ith_msize = max_size;
  731 
  732                 imq_unlock(mqueue);
  733                 if (continuation != (void (*)()) 0) {
  734                         counter(c_ipc_mqueue_receive_block_user++);
  735                         thread_block(continuation);
  736                 } else {
  737                         counter(c_ipc_mqueue_receive_block_kernel++);
  738                         thread_block((void (*)()) 0);
  739                 }
  740         after_thread_block:
  741                 imq_lock(mqueue);
  742 
  743                 /* why did we wake up? */
  744 
  745                 if (self->ith_state == MACH_MSG_SUCCESS) {
  746                         /* pick up the message that was handed to us */
  747 
  748                         kmsg = self->ith_kmsg;
  749                         seqno = self->ith_seqno;
  750                         port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
  751                         break;
  752                 }
  753 
  754                 switch (self->ith_state) {
  755                     case MACH_RCV_TOO_LARGE:
  756                         /* pick up size of the too-large message */
  757 
  758                         * (mach_msg_size_t *) kmsgp = self->ith_msize;
  759                         /* fall-through */
  760 
  761                     case MACH_RCV_PORT_DIED:
  762                     case MACH_RCV_PORT_CHANGED:
  763                         /* something bad happened to the port/set */
  764 
  765                         imq_unlock(mqueue);
  766                         return self->ith_state;
  767 
  768                     case MACH_RCV_IN_PROGRESS:
  769                         /*
  770                          *      Awakened for other than IPC completion.
  771                          *      Remove ourselves from the waiting queue,
  772                          *      then check the wakeup cause.
  773                          */
  774 
  775                         ipc_thread_rmqueue(&mqueue->imq_threads, self);
  776 
  777                         switch (self->ith_wait_result) {
  778                             case THREAD_INTERRUPTED:
  779                                 /* receive was interrupted - give up */
  780 
  781                                 imq_unlock(mqueue);
  782                                 return MACH_RCV_INTERRUPTED;
  783 
  784                             case THREAD_TIMED_OUT:
  785                                 /* timeout expired */
  786 
  787                                 assert(option & MACH_RCV_TIMEOUT);
  788                                 time_out = 0;
  789                                 break;
  790 
  791                             case THREAD_RESTART:
  792                             default:
  793 #if MACH_ASSERT
  794                                 assert(!"ipc_mqueue_receive");
  795 #else
  796                                 panic("ipc_mqueue_receive");
  797 #endif
  798                         }
  799                         break;
  800 
  801                     default:
  802 #if MACH_ASSERT
  803                         assert(!"ipc_mqueue_receive: strange ith_state");
  804 #else
  805                         panic("ipc_mqueue_receive: strange ith_state");
  806 #endif
  807                 }
  808         }
  809 
  810         /* we have a kmsg; unlock the msg queue */
  811 
  812         imq_unlock(mqueue);
  813         assert(kmsg->ikm_header.msgh_size <= max_size);
  814     }
  815 
  816     {
  817         ipc_marequest_t marequest;
  818 
  819         marequest = kmsg->ikm_marequest;
  820         if (marequest != IMAR_NULL) {
  821                 ipc_marequest_destroy(marequest);
  822                 kmsg->ikm_marequest = IMAR_NULL;
  823         }
  824         assert((kmsg->ikm_header.msgh_bits & MACH_MSGH_BITS_CIRCULAR) == 0);
  825 
  826         assert(port == (ipc_port_t) kmsg->ikm_header.msgh_remote_port);
  827         ip_lock(port);
  828 
  829         if (ip_active(port)) {
  830                 ipc_thread_queue_t senders;
  831                 ipc_thread_t sender;
  832 
  833                 assert(port->ip_msgcount > 0);
  834                 port->ip_msgcount--;
  835 
  836                 senders = &port->ip_blocked;
  837                 sender = ipc_thread_queue_first(senders);
  838 
  839                 if ((sender != ITH_NULL) &&
  840                     (port->ip_msgcount < port->ip_qlimit)) {
  841                         ipc_thread_rmqueue(senders, sender);
  842                         sender->ith_state = MACH_MSG_SUCCESS;
  843                         thread_go(sender);
  844                 }
  845         }
  846 
  847         ip_unlock(port);
  848     }
  849 
  850 #if     NORMA_IPC
  851         norma_ipc_finish_receiving(&kmsg);
  852 #endif  NORMA_IPC
  853         *kmsgp = kmsg;
  854         *seqnop = seqno;
  855         return MACH_MSG_SUCCESS;
  856 }

Cache object: 5bbd2449f959de96594beb9ee6eab0de


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.