FreeBSD/Linux Kernel Cross Reference
sys/ipc/ipc_mqueue.c


/*
 * Mach Operating System
 * Copyright (c) 1993,1992,1991,1990,1989 Carnegie Mellon University
 * All Rights Reserved.
 *
 * Permission to use, copy, modify and distribute this software and its
 * documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie Mellon
 * the rights to redistribute these changes.
 */
/*
 * HISTORY
 * $Log:        ipc_mqueue.c,v $
 * Revision 2.22  93/11/17  16:58:43  dbg
 *      Fixed type of null continuation argument to thread_block.
 *      [93/06/03            dbg]
 *
 *      Use proper type for continuations.
 *      [93/05/04            dbg]
 *
 *      Added ANSI function prototypes.  Cleaned up return type of
 *      ipc_kmsg result parameter from ipc_mqueue_receive by using union
 *      type defined in ipc/ipc_types.h.
 *      [93/03/29            dbg]
 *
 *      Include kern/sched_prim.h instead of kern/ipc_sched.h.
 *      [93/01/26            dbg]
 *
 * Revision 2.21  92/08/03  17:34:47  jfriedl
 *      removed silly prototypes
 *      [92/08/02            jfriedl]
 *
 * Revision 2.20  92/05/21  17:10:42  jfriedl
 *      Made correct for when assert is off.
 *      Also some cleanup to quiet gcc warnings.
 *      [92/05/16            jfriedl]
 *
 * Revision 2.19  92/03/10  16:26:01  jsb
 *      Merged in norma branch changes as of NORMA_MK7:
 *      Picked up hack from dlb@osf.org to call norma_ipc_finish_receiving
 *      before ipc_kmsg_destroy. The real fix is to use uncopyout_to_network.
 *      [92/01/17  14:35:03  jsb]
 *      Removed spurious arguments to norma_ipc_send.
 *      Options and timeout will be handled here, not by norma_ipc_send.
 *      [91/12/26  19:51:59  jsb]
 *
 * Revision 2.18  92/01/03  20:13:05  dbg
 *      Removed THREAD_SHOULD_TERMINATE.
 *      [91/12/19            dbg]
 *
 * Revision 2.17  91/12/15  10:40:33  jsb
 *      Added norma_ipc_finish_receiving call to support large in-line msgs.
 *
 * Revision 2.16  91/12/14  14:27:10  jsb
 *      Removed ipc_fields.h hack.
 *
 * Revision 2.15  91/11/14  16:56:07  rpd
 *      Picked up mysterious norma changes.
 *      [91/11/14            rpd]
 *
 * Revision 2.14  91/08/28  11:13:34  jsb
 *      Added seqno argument to ipc_mqueue_receive.
 *      Also added seqno processing to ipc_mqueue_send, ipc_mqueue_move.
 *      [91/08/10            rpd]
 *      Fixed norma_ipc_handoff technology.
 *      Renamed clport things to norma_ipc things.
 *      [91/08/15  08:23:17  jsb]
 *
 * Revision 2.13  91/08/03  18:18:27  jsb
 *      Renamed replenish routine.
 *      [91/08/01  23:00:06  jsb]
 *
 *      Removed obsolete include.
 *      Added option, timeout parameters to ipc_clport_send.
 *      [91/07/17  14:04:15  jsb]
 *
 * Revision 2.12  91/06/25  10:27:34  rpd
 *      Added some wait_result assertions.
 *      [91/05/30            rpd]
 *
 * Revision 2.11  91/06/17  15:46:18  jsb
 *      Renamed NORMA conditionals.
 *      [91/06/17  10:44:39  jsb]
 *
 * Revision 2.10  91/06/06  17:06:06  jsb
 *      Added call to ip_unlock after calling ipc_clport_send.
 *      Added support for clport handoff.
 *      [91/06/06  16:05:12  jsb]
 *
 * Revision 2.9  91/05/14  16:33:58  mrt
 *      Correcting copyright
 *
 * Revision 2.8  91/03/16  14:48:18  rpd
 *      Renamed ipc_thread_{go,will_wait,will_wait_with_timeout}
 *      to thread_{go,will_wait,will_wait_with_timeout}.
 *      Replaced ipc_thread_block with thread_block.
 *      [91/02/17            rpd]
 *
 * Revision 2.7  91/02/05  17:22:24  mrt
 *      Changed to new Mach copyright
 *      [91/02/01  15:46:33  mrt]
 *
 * Revision 2.6  91/01/08  15:14:35  rpd
 *      Changed continuation argument to (void (*)()).
 *      [90/12/18            rpd]
 *      Reorganized ipc_mqueue_receive.
 *      [90/11/22            rpd]
 *
 *      Minor cleanup.
 *      [90/11/11            rpd]
 *
 * Revision 2.5  90/12/14  11:02:32  jsb
 *      Changed parameters in ipc_clport_send call.
 *      [90/12/13  21:20:13  jsb]
 *
 * Revision 2.4  90/11/05  14:29:04  rpd
 *      Use new io_reference and io_release.
 *      Use new ip_reference and ip_release.
 *      [90/10/29            rpd]
 *
 * Revision 2.3  90/09/28  16:54:58  jsb
 *      Added NORMA_IPC support.
 *      [90/09/28  14:03:24  jsb]
 *
 * Revision 2.2  90/06/02  14:50:39  rpd
 *      Created for new IPC.
 *      [90/03/26  20:57:06  rpd]
 *
 */
/*
 *      File:   ipc/ipc_mqueue.c
 *      Author: Rich Draves
 *      Date:   1989
 *
 *      Functions to manipulate IPC message queues.
 */

#include <norma_ipc.h>

#include <mach/port.h>
#include <mach/message.h>
#include <kern/assert.h>
#include <kern/counters.h>
#include <kern/sched_prim.h>
#include <kern/ipc_kobject.h>
#include <ipc/ipc_mqueue.h>
#include <ipc/ipc_thread.h>
#include <ipc/ipc_kmsg.h>
#include <ipc/ipc_port.h>
#include <ipc/ipc_pset.h>
#include <ipc/ipc_space.h>
#include <ipc/ipc_marequest.h>



#if     NORMA_IPC
extern ipc_mqueue_t     norma_ipc_handoff_mqueue;
extern ipc_kmsg_t       norma_ipc_handoff_msg;
extern mach_msg_size_t  norma_ipc_handoff_max_size;
extern mach_msg_size_t  norma_ipc_handoff_msg_size;
extern ipc_kmsg_t       norma_ipc_kmsg_accept();
#endif  /* NORMA_IPC */

/*
 *      Routine:        ipc_mqueue_init
 *      Purpose:
 *              Initialize a newly-allocated message queue.
 */

void
ipc_mqueue_init(
        ipc_mqueue_t mqueue)
{
        imq_lock_init(mqueue);
        ipc_kmsg_queue_init(&mqueue->imq_messages);
        ipc_thread_queue_init(&mqueue->imq_threads);
}

/*
 *      Routine:        ipc_mqueue_move
 *      Purpose:
 *              Move messages from one queue (source) to another (dest).
 *              Only moves messages sent to the specified port.
 *      Conditions:
 *              Both queues must be locked.
 *              (This is sufficient to manipulate port->ip_seqno.)
 */

void
ipc_mqueue_move(
        ipc_mqueue_t    dest,
        ipc_mqueue_t    source,
        ipc_port_t      port)
{
        ipc_kmsg_queue_t oldq, newq;
        ipc_thread_queue_t blockedq;
        ipc_kmsg_t kmsg, next;
        ipc_thread_t th;

        oldq = &source->imq_messages;
        newq = &dest->imq_messages;
        blockedq = &dest->imq_threads;

        for (kmsg = ipc_kmsg_queue_first(oldq);
             kmsg != IKM_NULL; kmsg = next) {
                next = ipc_kmsg_queue_next(oldq, kmsg);

                /* only move messages sent to port */

                if (kmsg->ikm_header.msgh_remote_port != (mach_port_t) port)
                        continue;

                ipc_kmsg_rmqueue(oldq, kmsg);

                /* before adding kmsg to newq, check for a blocked receiver */

                while ((th = ipc_thread_dequeue(blockedq)) != ITH_NULL) {
                        assert(ipc_kmsg_queue_empty(newq));

                        thread_go(th);

                        /* check if the receiver can handle the message */

                        if (kmsg->ikm_header.msgh_size <= th->ith_msize) {
                                th->ith_state = MACH_MSG_SUCCESS;
                                th->ith_kmsg = kmsg;
                                th->ith_seqno = port->ip_seqno++;

                                goto next_kmsg;
                        }

                        th->ith_state = MACH_RCV_TOO_LARGE;
                        th->ith_msize = kmsg->ikm_header.msgh_size;
                }

                /* didn't find a receiver to handle the message */

                ipc_kmsg_enqueue(newq, kmsg);
            next_kmsg:;
        }
}

/*
 *      Routine:        ipc_mqueue_changed
 *      Purpose:
 *              Wake up receivers waiting in a message queue.
 *      Conditions:
 *              The message queue is locked.
 */

void
ipc_mqueue_changed(
        ipc_mqueue_t mqueue,
        mach_msg_return_t mr)
{
        ipc_thread_t th;

        while ((th = ipc_thread_dequeue(&mqueue->imq_threads)) != ITH_NULL) {
                th->ith_state = mr;
                thread_go(th);
        }
}

/*
 *      Routine:        ipc_mqueue_send
 *      Purpose:
 *              Send a message to a port.  The message holds a reference
 *              for the destination port in the msgh_remote_port field.
 *
 *              If unsuccessful, the caller still has possession of
 *              the message and must do something with it.  If successful,
 *              the message is queued, given to a receiver, destroyed,
 *              or handled directly by the kernel via mach_msg.
 *      Conditions:
 *              Nothing locked.
 *      Returns:
 *              MACH_MSG_SUCCESS        The message was accepted.
 *              MACH_SEND_TIMED_OUT     Caller still has message.
 *              MACH_SEND_INTERRUPTED   Caller still has message.
 */

mach_msg_return_t
ipc_mqueue_send(
        ipc_kmsg_t         kmsg,
        mach_msg_option_t  option,
        mach_msg_timeout_t time_out)
{
        ipc_port_t port;

        port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
        assert(IP_VALID(port));

        ip_lock(port);

        if (port->ip_receiver == ipc_space_kernel) {
                ipc_kmsg_t reply;

                /*
                 *      We can check ip_receiver == ipc_space_kernel
                 *      before checking that the port is active because
                 *      ipc_port_dealloc_kernel clears ip_receiver
                 *      before destroying a kernel port.
                 */

                assert(ip_active(port));
                ip_unlock(port);

                reply = ipc_kobject_server(kmsg);
                if (reply != IKM_NULL)
                        ipc_mqueue_send_always(reply);

                return MACH_MSG_SUCCESS;
        }

#if     NORMA_IPC
        if (IP_NORMA_IS_PROXY(port)) {
                mach_msg_return_t mr;

                mr = norma_ipc_send(kmsg);
                ip_unlock(port);
                return mr;
        }
#endif  /* NORMA_IPC */

        for (;;) {
                ipc_thread_t self;

                /*
                 *      Can't deliver to a dead port.
                 *      However, we can pretend it got sent
                 *      and was then immediately destroyed.
                 */

                if (!ip_active(port)) {
                        /*
                         *      We can't let ipc_kmsg_destroy deallocate
                         *      the port right, because we might end up
                         *      in an infinite loop trying to deliver
                         *      a send-once notification.
                         */

                        ip_release(port);
                        ip_check_unlock(port);
                        kmsg->ikm_header.msgh_remote_port = MACH_PORT_NULL;
#if     NORMA_IPC
                        /* XXX until ipc_kmsg_destroy is fixed... */
                        norma_ipc_finish_receiving(&kmsg);
#endif  /* NORMA_IPC */
                        ipc_kmsg_destroy(kmsg);
                        return MACH_MSG_SUCCESS;
                }

                /*
                 *  Don't block if:
                 *      1) We're under the queue limit.
                 *      2) Caller used the MACH_SEND_ALWAYS internal option.
                 *      3) Message is sent to a send-once right.
                 */

                if ((port->ip_msgcount < port->ip_qlimit) ||
                    (option & MACH_SEND_ALWAYS) ||
                    (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header.msgh_bits) ==
                                                MACH_MSG_TYPE_PORT_SEND_ONCE))
                        break;

                /* must block waiting for queue to clear */

                self = current_thread();

                if (option & MACH_SEND_TIMEOUT) {
                        if (time_out == 0) {
                                ip_unlock(port);
                                return MACH_SEND_TIMED_OUT;
                        }

                        thread_will_wait_with_timeout(self, time_out);
                } else
                        thread_will_wait(self);

                ipc_thread_enqueue(&port->ip_blocked, self);
                self->ith_state = MACH_SEND_IN_PROGRESS;

                ip_unlock(port);
                counter(c_ipc_mqueue_send_block++);
                thread_block(CONTINUE_NULL);
                ip_lock(port);

                /* why did we wake up? */

                if (self->ith_state == MACH_MSG_SUCCESS)
                        continue;
                assert(self->ith_state == MACH_SEND_IN_PROGRESS);

                /* take ourselves off blocked queue */

                ipc_thread_rmqueue(&port->ip_blocked, self);

                /*
                 *      Thread wakeup-reason field tells us why
                 *      the wait was interrupted.
                 */

                switch (self->ith_wait_result) {
                    case THREAD_INTERRUPTED:
                        /* send was interrupted - give up */

                        ip_unlock(port);
                        return MACH_SEND_INTERRUPTED;

                    case THREAD_TIMED_OUT:
                        /* timeout expired */

                        assert(option & MACH_SEND_TIMEOUT);
                        time_out = 0;
                        break;

                    case THREAD_RESTART:
                    default:
#if MACH_ASSERT
                        assert(!"ipc_mqueue_send");
#else
                        panic("ipc_mqueue_send");
#endif
                }
        }

        if (kmsg->ikm_header.msgh_bits & MACH_MSGH_BITS_CIRCULAR) {
                ip_unlock(port);

                /* don't allow the creation of a circular loop */

#if     NORMA_IPC
                /* XXX until ipc_kmsg_destroy is fixed... */
                norma_ipc_finish_receiving(&kmsg);
#endif  /* NORMA_IPC */
                ipc_kmsg_destroy(kmsg);
                return MACH_MSG_SUCCESS;
        }

    {
        ipc_mqueue_t mqueue;
        ipc_pset_t pset;
        ipc_thread_t receiver;
        ipc_thread_queue_t receivers;

        port->ip_msgcount++;
        assert(port->ip_msgcount > 0);

        pset = port->ip_pset;
        if (pset == IPS_NULL)
                mqueue = &port->ip_messages;
        else
                mqueue = &pset->ips_messages;

        imq_lock(mqueue);
        receivers = &mqueue->imq_threads;

        /*
         *      Can unlock the port now that the msg queue is locked
         *      and we know the port is active.  While the msg queue
         *      is locked, we have control of the kmsg, so the ref in
         *      it for the port is still good.  If the msg queue is in
         *      a set (dead or alive), then we're OK because the port
         *      is still a member of the set and the set won't go away
         *      until the port is taken out, which tries to lock the
         *      set's msg queue to remove the port's msgs.
         */

        ip_unlock(port);

        /* check for a receiver for the message */

#if     NORMA_IPC
        if (mqueue == norma_ipc_handoff_mqueue) {
                norma_ipc_handoff_msg = kmsg;
                if (kmsg->ikm_header.msgh_size <= norma_ipc_handoff_max_size) {
                        imq_unlock(mqueue);
                        return MACH_MSG_SUCCESS;
                }
                norma_ipc_handoff_msg_size = kmsg->ikm_header.msgh_size;
        }
#endif  /* NORMA_IPC */

        for (;;) {
                receiver = ipc_thread_queue_first(receivers);
                if (receiver == ITH_NULL) {
                        /* no receivers; queue kmsg */

                        ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
                        imq_unlock(mqueue);
                        break;
                }

                ipc_thread_rmqueue_first_macro(receivers, receiver);
                assert(ipc_kmsg_queue_empty(&mqueue->imq_messages));

                if (kmsg->ikm_header.msgh_size <= receiver->ith_msize) {
                        /* got a successful receiver */

                        receiver->ith_state = MACH_MSG_SUCCESS;
                        receiver->ith_kmsg = kmsg;
                        receiver->ith_seqno = port->ip_seqno++;
                        imq_unlock(mqueue);

                        thread_go(receiver);
                        break;
                }

                receiver->ith_state = MACH_RCV_TOO_LARGE;
                receiver->ith_msize = kmsg->ikm_header.msgh_size;
                thread_go(receiver);
        }
    }

        return MACH_MSG_SUCCESS;
}

/*
 *      Routine:        ipc_mqueue_copyin
 *      Purpose:
 *              Convert a name in a space to a message queue.
 *      Conditions:
 *              Nothing locked.  If successful, the message queue
 *              is returned locked and caller gets a ref for the object.
 *              This ref ensures the continued existence of the queue.
 *      Returns:
 *              MACH_MSG_SUCCESS        Found a message queue.
 *              MACH_RCV_INVALID_NAME   The space is dead.
 *              MACH_RCV_INVALID_NAME   The name doesn't denote a right.
 *              MACH_RCV_INVALID_NAME
 *                      The denoted right is not receive or port set.
 *              MACH_RCV_IN_SET         Receive right is a member of a set.
 */

mach_msg_return_t
ipc_mqueue_copyin(
        ipc_space_t space,
        mach_port_t name,
        ipc_mqueue_t *mqueuep,
        ipc_object_t *objectp)
{
        ipc_entry_t entry;
        ipc_entry_bits_t bits;
        ipc_object_t object;
        ipc_mqueue_t mqueue;

        is_read_lock(space);
        if (!space->is_active) {
                is_read_unlock(space);
                return MACH_RCV_INVALID_NAME;
        }

        entry = ipc_entry_lookup(space, name);
        if (entry == IE_NULL) {
                is_read_unlock(space);
                return MACH_RCV_INVALID_NAME;
        }

        bits = entry->ie_bits;
        object = entry->ie_object;

        if (bits & MACH_PORT_TYPE_RECEIVE) {
                ipc_port_t port;
                ipc_pset_t pset;

                port = (ipc_port_t) object;
                assert(port != IP_NULL);

                ip_lock(port);
                assert(ip_active(port));
                assert(port->ip_receiver_name == name);
                assert(port->ip_receiver == space);
                is_read_unlock(space);

                pset = port->ip_pset;
                if (pset != IPS_NULL) {
                        ips_lock(pset);
                        if (ips_active(pset)) {
                                ips_unlock(pset);
                                ip_unlock(port);
                                return MACH_RCV_IN_SET;
                        }

                        ipc_pset_remove(pset, port);
                        ips_check_unlock(pset);
                        assert(port->ip_pset == IPS_NULL);
                }

                mqueue = &port->ip_messages;
        } else if (bits & MACH_PORT_TYPE_PORT_SET) {
                ipc_pset_t pset;

                pset = (ipc_pset_t) object;
                assert(pset != IPS_NULL);

                ips_lock(pset);
                assert(ips_active(pset));
                assert(pset->ips_local_name == name);
                is_read_unlock(space);

                mqueue = &pset->ips_messages;
        } else {
                is_read_unlock(space);
                return MACH_RCV_INVALID_NAME;
        }

        /*
         *      At this point, the object is locked and active,
         *      the space is unlocked, and mqueue is initialized.
         */

        io_reference(object);
        imq_lock(mqueue);
        io_unlock(object);

        *objectp = object;
        *mqueuep = mqueue;
        return MACH_MSG_SUCCESS;
}

/*
 *      Routine:        ipc_mqueue_receive
 *      Purpose:
 *              Receive a message from a message queue.
 *
 *              If continuation is non-zero, then we might discard
 *              our kernel stack when we block.  We will continue
 *              after unblocking by executing continuation.
 *
 *              If resume is true, then we are resuming a receive
 *              operation after a blocked receive discarded our stack.
 *      Conditions:
 *              The message queue is locked; it is unlocked.
 *
 *              Our caller must hold a reference for the port or port set
 *              to which this queue belongs, to keep the queue
 *              from being deallocated.  Furthermore, the port or set
 *              must have been active when the queue was locked.
 *
 *              The kmsg is returned with clean header fields
 *              and with the circular bit turned off.
 *      Returns:
 *              MACH_MSG_SUCCESS        Message returned in kmsg_retp.
 *              MACH_RCV_TOO_LARGE      Message size returned in kmsg_retp.
 *              MACH_RCV_TIMED_OUT      No message obtained.
 *              MACH_RCV_INTERRUPTED    No message obtained.
 *              MACH_RCV_PORT_DIED      Port/set died; no message.
 *              MACH_RCV_PORT_CHANGED   Port moved into set; no msg.
 */

mach_msg_return_t
ipc_mqueue_receive(
        ipc_mqueue_t            mqueue,
        mach_msg_option_t       option,
        mach_msg_size_t         max_size,
        mach_msg_timeout_t      time_out,
        boolean_t               resume,
        continuation_t          continuation,
        union ipc_kmsg_return   *kmsg_retp,
        mach_port_seqno_t       *seqnop)
{
        ipc_port_t port;
        ipc_kmsg_t kmsg;
        mach_port_seqno_t seqno;

    {
        ipc_kmsg_queue_t kmsgs = &mqueue->imq_messages;
        ipc_thread_t self = current_thread();

        if (resume)
                goto after_thread_block;

        for (;;) {
                kmsg = ipc_kmsg_queue_first(kmsgs);
#if     NORMA_IPC
                /*
                 * It may be possible to make this work even when a timeout
                 * is specified.
                 *
                 * Netipc_replenish should be moved somewhere else.
                 */
                if (kmsg == IKM_NULL && ! (option & MACH_RCV_TIMEOUT)) {
                        netipc_replenish(FALSE);
                        kmsg_retp->kmsg = IKM_NULL;
                        kmsg = norma_ipc_kmsg_accept(mqueue, max_size,
                                                     &kmsg_retp->msize);
                        if (kmsg != IKM_NULL) {
                                port = (ipc_port_t)
                                    kmsg->ikm_header.msgh_remote_port;
                                seqno = port->ip_seqno++;
                                break;
                        }
                        if (kmsg_retp->msize) {
                                imq_unlock(mqueue);
                                return MACH_RCV_TOO_LARGE;
                        }
                }
#endif  /* NORMA_IPC */
                if (kmsg != IKM_NULL) {
                        /* check space requirements */

                        if (kmsg->ikm_header.msgh_size > max_size) {
                                kmsg_retp->msize = kmsg->ikm_header.msgh_size;
                                imq_unlock(mqueue);
                                return MACH_RCV_TOO_LARGE;
                        }

                        ipc_kmsg_rmqueue_first_macro(kmsgs, kmsg);
                        port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
                        seqno = port->ip_seqno++;
                        break;
                }

                /* must block waiting for a message */

                if (option & MACH_RCV_TIMEOUT) {
                        if (time_out == 0) {
                                imq_unlock(mqueue);
                                return MACH_RCV_TIMED_OUT;
                        }

                        thread_will_wait_with_timeout(self, time_out);
                } else
                        thread_will_wait(self);

                ipc_thread_enqueue_macro(&mqueue->imq_threads, self);
                self->ith_state = MACH_RCV_IN_PROGRESS;
                self->ith_msize = max_size;

                imq_unlock(mqueue);
                if (continuation != CONTINUE_NULL) {
                        counter(c_ipc_mqueue_receive_block_user++);
                        thread_block(continuation);
                } else {
                        counter(c_ipc_mqueue_receive_block_kernel++);
                        thread_block(CONTINUE_NULL);
                }
        after_thread_block:
                imq_lock(mqueue);

                /* why did we wake up? */

                if (self->ith_state == MACH_MSG_SUCCESS) {
                        /* pick up the message that was handed to us */

                        kmsg = self->ith_kmsg;
                        seqno = self->ith_seqno;
                        port = (ipc_port_t) kmsg->ikm_header.msgh_remote_port;
                        break;
                }

                switch (self->ith_state) {
                    case MACH_RCV_TOO_LARGE:
                        /* pick up size of the too-large message */

                        kmsg_retp->msize = self->ith_msize;
                        /* fall-through */

                    case MACH_RCV_PORT_DIED:
                    case MACH_RCV_PORT_CHANGED:
                        /* something bad happened to the port/set */

                        imq_unlock(mqueue);
                        return self->ith_state;

                    case MACH_RCV_IN_PROGRESS:
                        /*
                         *      Awakened for other than IPC completion.
                         *      Remove ourselves from the waiting queue,
                         *      then check the wakeup cause.
                         */

                        ipc_thread_rmqueue(&mqueue->imq_threads, self);

                        switch (self->ith_wait_result) {
                            case THREAD_INTERRUPTED:
                                /* receive was interrupted - give up */

                                imq_unlock(mqueue);
                                return MACH_RCV_INTERRUPTED;

                            case THREAD_TIMED_OUT:
                                /* timeout expired */

                                assert(option & MACH_RCV_TIMEOUT);
                                time_out = 0;
                                break;

                            case THREAD_RESTART:
                            default:
#if MACH_ASSERT
                                assert(!"ipc_mqueue_receive");
#else
                                panic("ipc_mqueue_receive");
#endif
                        }
                        break;

                    default:
#if MACH_ASSERT
                        assert(!"ipc_mqueue_receive: strange ith_state");
#else
                        panic("ipc_mqueue_receive: strange ith_state");
#endif
                }
        }

        /* we have a kmsg; unlock the msg queue */

        imq_unlock(mqueue);
        assert(kmsg->ikm_header.msgh_size <= max_size);
    }

    {
        ipc_marequest_t marequest;

        marequest = kmsg->ikm_marequest;
        if (marequest != IMAR_NULL) {
                ipc_marequest_destroy(marequest);
                kmsg->ikm_marequest = IMAR_NULL;
        }
        assert((kmsg->ikm_header.msgh_bits & MACH_MSGH_BITS_CIRCULAR) == 0);

        assert(port == (ipc_port_t) kmsg->ikm_header.msgh_remote_port);
        ip_lock(port);

        if (ip_active(port)) {
                ipc_thread_queue_t senders;
                ipc_thread_t sender;

                assert(port->ip_msgcount > 0);
                port->ip_msgcount--;

                senders = &port->ip_blocked;
                sender = ipc_thread_queue_first(senders);

                if ((sender != ITH_NULL) &&
                    (port->ip_msgcount < port->ip_qlimit)) {
                        ipc_thread_rmqueue(senders, sender);
                        sender->ith_state = MACH_MSG_SUCCESS;
                        thread_go(sender);
                }
        }

        ip_unlock(port);
    }

#if     NORMA_IPC
        norma_ipc_finish_receiving(&kmsg);
#endif  /* NORMA_IPC */
        kmsg_retp->kmsg = kmsg;
        *seqnop = seqno;
        return MACH_MSG_SUCCESS;
}
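
The receiver hand-off loop at the end of ipc_mqueue_send (and the matching loop in ipc_mqueue_move) can be hard to follow among the locking calls. The following is a minimal single-threaded user-space sketch of that logic only, not kernel code. Every name in it (sketch_queue, sketch_receiver, sketch_msg, sketch_wait, sketch_send, the SK_* states) is hypothetical and merely stands in for the imq_threads/imq_messages queues and the ith_state, ith_msize, ith_kmsg and ith_seqno fields. Locking, thread_go and real blocking are left out on purpose.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for MACH_RCV_IN_PROGRESS, MACH_MSG_SUCCESS
 * and MACH_RCV_TOO_LARGE. */
enum sketch_state { SK_IN_PROGRESS, SK_SUCCESS, SK_TOO_LARGE };

struct sketch_msg {
        size_t size;                    /* plays the role of msgh_size */
        struct sketch_msg *next;
};

struct sketch_receiver {
        size_t max_size;                /* plays the role of ith_msize */
        enum sketch_state state;        /* plays the role of ith_state */
        struct sketch_msg *msg;         /* plays the role of ith_kmsg */
        unsigned seqno;                 /* plays the role of ith_seqno */
        struct sketch_receiver *next;
};

struct sketch_queue {
        struct sketch_msg *msg_head, *msg_tail;         /* queued messages */
        struct sketch_receiver *rcv_head, *rcv_tail;    /* waiting receivers */
        unsigned next_seqno;            /* plays the role of ip_seqno */
};

/* Record a receiver as waiting, with the largest size it can accept. */
void
sketch_wait(struct sketch_queue *q, struct sketch_receiver *r, size_t max_size)
{
        r->max_size = max_size;
        r->state = SK_IN_PROGRESS;
        r->msg = NULL;
        r->seqno = 0;
        r->next = NULL;
        if (q->rcv_tail != NULL)
                q->rcv_tail->next = r;
        else
                q->rcv_head = r;
        q->rcv_tail = r;
}

/*
 * Offer a message to the waiting receivers in FIFO order.
 * A receiver whose buffer is too small is dequeued and told the
 * required size; the first receiver that fits gets the message.
 * Returns 1 if the message was handed off, 0 if it was queued.
 */
int
sketch_send(struct sketch_queue *q, struct sketch_msg *m)
{
        struct sketch_receiver *r;

        while ((r = q->rcv_head) != NULL) {
                q->rcv_head = r->next;
                if (q->rcv_head == NULL)
                        q->rcv_tail = NULL;
                r->next = NULL;

                if (m->size <= r->max_size) {
                        r->state = SK_SUCCESS;
                        r->msg = m;
                        r->seqno = q->next_seqno++;
                        return 1;
                }

                r->state = SK_TOO_LARGE;
                r->max_size = m->size;
        }

        /* no receiver could take it; append to the message queue */
        m->next = NULL;
        if (q->msg_tail != NULL)
                q->msg_tail->next = m;
        else
                q->msg_head = m;
        q->msg_tail = m;
        return 0;
}
```

Note that in this sketch, as in the listing, a receiver that is too small is still removed from the waiting queue and woken with the required size, so it can retry with a larger buffer rather than silently staying blocked.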

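
The queue-limit handling is split between two routines in the listing: ipc_mqueue_send decides whether a sender may proceed without blocking, and the tail of ipc_mqueue_receive decrements the count and wakes one blocked sender. A small sketch of just that bookkeeping follows. It is an illustration under assumptions, not the kernel's data structures: sk_port, sk_may_send_now, sk_message_received and SK_SEND_ALWAYS are hypothetical names, and the blocked-sender queue is reduced to a counter.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag standing in for the MACH_SEND_ALWAYS option. */
#define SK_SEND_ALWAYS 0x1u

struct sk_port {
        unsigned msgcount;      /* plays the role of ip_msgcount */
        unsigned qlimit;        /* plays the role of ip_qlimit */
        unsigned blocked;       /* number of waiting senders (ip_blocked) */
};

/*
 * Mirrors the "Don't block if" test: under the queue limit,
 * the always-send option is set, or the target is a send-once right.
 */
bool
sk_may_send_now(const struct sk_port *p, unsigned options, bool send_once)
{
        return p->msgcount < p->qlimit ||
            (options & SK_SEND_ALWAYS) != 0 ||
            send_once;
}

/*
 * Called after a receiver has taken one message.  Returns true if
 * one blocked sender should be woken so it can re-check the limit.
 */
bool
sk_message_received(struct sk_port *p)
{
        assert(p->msgcount > 0);
        p->msgcount--;

        if (p->blocked > 0 && p->msgcount < p->qlimit) {
                p->blocked--;
                return true;
        }
        return false;
}
```

Only one sender is woken per received message, which matches the listing: the woken sender loops back and re-evaluates the limit itself instead of being guaranteed a slot.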

