The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */
    2 
    3 /*-
    4  * SPDX-License-Identifier: BSD-3-Clause
    5  *
    6  * Copyright (c) 2009, Sun Microsystems, Inc.
    7  * All rights reserved.
    8  *
    9  * Redistribution and use in source and binary forms, with or without 
   10  * modification, are permitted provided that the following conditions are met:
   11  * - Redistributions of source code must retain the above copyright notice, 
   12  *   this list of conditions and the following disclaimer.
   13  * - Redistributions in binary form must reproduce the above copyright notice, 
   14  *   this list of conditions and the following disclaimer in the documentation 
   15  *   and/or other materials provided with the distribution.
   16  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
   17  *   contributors may be used to endorse or promote products derived 
   18  *   from this software without specific prior written permission.
   19  * 
   20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
   21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
   22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
   23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
   24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
   25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
   26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
   27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
   29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
   30  * POSSIBILITY OF SUCH DAMAGE.
   31  */
   32 
   33 #if defined(LIBC_SCCS) && !defined(lint)
   34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
   35 static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
   36 #endif
   37 #include <sys/cdefs.h>
   38 __FBSDID("$FreeBSD$");
   39 
   40 /*
   41  * svc.c, Server-side remote procedure call interface.
   42  *
   43  * There are two sets of procedures here.  The xprt routines are
   44  * for handling transport handles.  The svc routines handle the
   45  * list of service routines.
   46  *
   47  * Copyright (C) 1984, Sun Microsystems, Inc.
   48  */
   49 
   50 #include <sys/param.h>
   51 #include <sys/lock.h>
   52 #include <sys/kernel.h>
   53 #include <sys/kthread.h>
   54 #include <sys/malloc.h>
   55 #include <sys/mbuf.h>
   56 #include <sys/mutex.h>
   57 #include <sys/proc.h>
   58 #include <sys/queue.h>
   59 #include <sys/socketvar.h>
   60 #include <sys/systm.h>
   61 #include <sys/smp.h>
   62 #include <sys/sx.h>
   63 #include <sys/ucred.h>
   64 
   65 #include <rpc/rpc.h>
   66 #include <rpc/rpcb_clnt.h>
   67 #include <rpc/replay.h>
   68 
   69 #include <rpc/rpc_com.h>
   70 
   71 #define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
   72 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
   73 
   74 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
   75     char *);
   76 static void svc_new_thread(SVCGROUP *grp);
   77 static void xprt_unregister_locked(SVCXPRT *xprt);
   78 static void svc_change_space_used(SVCPOOL *pool, long delta);
   79 static bool_t svc_request_space_available(SVCPOOL *pool);
   80 static void svcpool_cleanup(SVCPOOL *pool);
   81 
   82 /* ***************  SVCXPRT related stuff **************** */
   83 
   84 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
   85 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
   86 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
   87 
   88 SVCPOOL*
   89 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
   90 {
   91         SVCPOOL *pool;
   92         SVCGROUP *grp;
   93         int g;
   94 
   95         pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
   96         
   97         mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
   98         pool->sp_name = name;
   99         pool->sp_state = SVCPOOL_INIT;
  100         pool->sp_proc = NULL;
  101         TAILQ_INIT(&pool->sp_callouts);
  102         TAILQ_INIT(&pool->sp_lcallouts);
  103         pool->sp_minthreads = 1;
  104         pool->sp_maxthreads = 1;
  105         pool->sp_groupcount = 1;
  106         for (g = 0; g < SVC_MAXGROUPS; g++) {
  107                 grp = &pool->sp_groups[g];
  108                 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
  109                 grp->sg_pool = pool;
  110                 grp->sg_state = SVCPOOL_ACTIVE;
  111                 TAILQ_INIT(&grp->sg_xlist);
  112                 TAILQ_INIT(&grp->sg_active);
  113                 LIST_INIT(&grp->sg_idlethreads);
  114                 grp->sg_minthreads = 1;
  115                 grp->sg_maxthreads = 1;
  116         }
  117 
  118         /*
  119          * Don't use more than a quarter of mbuf clusters.  Nota bene:
  120          * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
  121          * on LP64 architectures, so cast to u_long to avoid undefined
  122          * behavior.  (ILP32 architectures cannot have nmbclusters
  123          * large enough to overflow for other reasons.)
  124          */
  125         pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
  126         pool->sp_space_low = (pool->sp_space_high / 3) * 2;
  127 
  128         sysctl_ctx_init(&pool->sp_sysctl);
  129         if (sysctl_base) {
  130                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  131                     "minthreads", CTLTYPE_INT | CTLFLAG_RW,
  132                     pool, 0, svcpool_minthread_sysctl, "I",
  133                     "Minimal number of threads");
  134                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  135                     "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
  136                     pool, 0, svcpool_maxthread_sysctl, "I",
  137                     "Maximal number of threads");
  138                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  139                     "threads", CTLTYPE_INT | CTLFLAG_RD,
  140                     pool, 0, svcpool_threads_sysctl, "I",
  141                     "Current number of threads");
  142                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  143                     "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
  144                     "Number of thread groups");
  145 
  146                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  147                     "request_space_used", CTLFLAG_RD,
  148                     &pool->sp_space_used,
  149                     "Space in parsed but not handled requests.");
  150 
  151                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  152                     "request_space_used_highest", CTLFLAG_RD,
  153                     &pool->sp_space_used_highest,
  154                     "Highest space used since reboot.");
  155 
  156                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  157                     "request_space_high", CTLFLAG_RW,
  158                     &pool->sp_space_high,
  159                     "Maximum space in parsed but not handled requests.");
  160 
  161                 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  162                     "request_space_low", CTLFLAG_RW,
  163                     &pool->sp_space_low,
  164                     "Low water mark for request space.");
  165 
  166                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  167                     "request_space_throttled", CTLFLAG_RD,
  168                     &pool->sp_space_throttled, 0,
  169                     "Whether nfs requests are currently throttled");
  170 
  171                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
  172                     "request_space_throttle_count", CTLFLAG_RD,
  173                     &pool->sp_space_throttle_count, 0,
  174                     "Count of times throttling based on request space has occurred");
  175         }
  176 
  177         return pool;
  178 }
  179 
  180 /*
  181  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
  182  * the pool data structures.
  183  */
  184 static void
  185 svcpool_cleanup(SVCPOOL *pool)
  186 {
  187         SVCGROUP *grp;
  188         SVCXPRT *xprt, *nxprt;
  189         struct svc_callout *s;
  190         struct svc_loss_callout *sl;
  191         struct svcxprt_list cleanup;
  192         int g;
  193 
  194         TAILQ_INIT(&cleanup);
  195 
  196         for (g = 0; g < SVC_MAXGROUPS; g++) {
  197                 grp = &pool->sp_groups[g];
  198                 mtx_lock(&grp->sg_lock);
  199                 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
  200                         xprt_unregister_locked(xprt);
  201                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
  202                 }
  203                 mtx_unlock(&grp->sg_lock);
  204         }
  205         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
  206                 if (xprt->xp_socket != NULL)
  207                         soshutdown(xprt->xp_socket, SHUT_WR);
  208                 SVC_RELEASE(xprt);
  209         }
  210 
  211         mtx_lock(&pool->sp_lock);
  212         while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
  213                 mtx_unlock(&pool->sp_lock);
  214                 svc_unreg(pool, s->sc_prog, s->sc_vers);
  215                 mtx_lock(&pool->sp_lock);
  216         }
  217         while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
  218                 mtx_unlock(&pool->sp_lock);
  219                 svc_loss_unreg(pool, sl->slc_dispatch);
  220                 mtx_lock(&pool->sp_lock);
  221         }
  222         mtx_unlock(&pool->sp_lock);
  223 }
  224 
  225 void
  226 svcpool_destroy(SVCPOOL *pool)
  227 {
  228         SVCGROUP *grp;
  229         int g;
  230 
  231         svcpool_cleanup(pool);
  232 
  233         for (g = 0; g < SVC_MAXGROUPS; g++) {
  234                 grp = &pool->sp_groups[g];
  235                 mtx_destroy(&grp->sg_lock);
  236         }
  237         mtx_destroy(&pool->sp_lock);
  238 
  239         if (pool->sp_rcache)
  240                 replay_freecache(pool->sp_rcache);
  241 
  242         sysctl_ctx_free(&pool->sp_sysctl);
  243         free(pool, M_RPC);
  244 }
  245 
  246 /*
  247  * Similar to svcpool_destroy(), except that it does not destroy the actual
  248  * data structures.  As such, "pool" may be used again.
  249  */
  250 void
  251 svcpool_close(SVCPOOL *pool)
  252 {
  253         SVCGROUP *grp;
  254         int g;
  255 
  256         svcpool_cleanup(pool);
  257 
  258         /* Now, initialize the pool's state for a fresh svc_run() call. */
  259         mtx_lock(&pool->sp_lock);
  260         pool->sp_state = SVCPOOL_INIT;
  261         mtx_unlock(&pool->sp_lock);
  262         for (g = 0; g < SVC_MAXGROUPS; g++) {
  263                 grp = &pool->sp_groups[g];
  264                 mtx_lock(&grp->sg_lock);
  265                 grp->sg_state = SVCPOOL_ACTIVE;
  266                 mtx_unlock(&grp->sg_lock);
  267         }
  268 }
  269 
  270 /*
  271  * Sysctl handler to get the present thread count on a pool
  272  */
  273 static int
  274 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
  275 {
  276         SVCPOOL *pool;
  277         int threads, error, g;
  278 
  279         pool = oidp->oid_arg1;
  280         threads = 0;
  281         mtx_lock(&pool->sp_lock);
  282         for (g = 0; g < pool->sp_groupcount; g++)
  283                 threads += pool->sp_groups[g].sg_threadcount;
  284         mtx_unlock(&pool->sp_lock);
  285         error = sysctl_handle_int(oidp, &threads, 0, req);
  286         return (error);
  287 }
  288 
  289 /*
  290  * Sysctl handler to set the minimum thread count on a pool
  291  */
  292 static int
  293 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
  294 {
  295         SVCPOOL *pool;
  296         int newminthreads, error, g;
  297 
  298         pool = oidp->oid_arg1;
  299         newminthreads = pool->sp_minthreads;
  300         error = sysctl_handle_int(oidp, &newminthreads, 0, req);
  301         if (error == 0 && newminthreads != pool->sp_minthreads) {
  302                 if (newminthreads > pool->sp_maxthreads)
  303                         return (EINVAL);
  304                 mtx_lock(&pool->sp_lock);
  305                 pool->sp_minthreads = newminthreads;
  306                 for (g = 0; g < pool->sp_groupcount; g++) {
  307                         pool->sp_groups[g].sg_minthreads = max(1,
  308                             pool->sp_minthreads / pool->sp_groupcount);
  309                 }
  310                 mtx_unlock(&pool->sp_lock);
  311         }
  312         return (error);
  313 }
  314 
  315 /*
  316  * Sysctl handler to set the maximum thread count on a pool
  317  */
  318 static int
  319 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
  320 {
  321         SVCPOOL *pool;
  322         int newmaxthreads, error, g;
  323 
  324         pool = oidp->oid_arg1;
  325         newmaxthreads = pool->sp_maxthreads;
  326         error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
  327         if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
  328                 if (newmaxthreads < pool->sp_minthreads)
  329                         return (EINVAL);
  330                 mtx_lock(&pool->sp_lock);
  331                 pool->sp_maxthreads = newmaxthreads;
  332                 for (g = 0; g < pool->sp_groupcount; g++) {
  333                         pool->sp_groups[g].sg_maxthreads = max(1,
  334                             pool->sp_maxthreads / pool->sp_groupcount);
  335                 }
  336                 mtx_unlock(&pool->sp_lock);
  337         }
  338         return (error);
  339 }
  340 
  341 /*
  342  * Activate a transport handle.
  343  */
  344 void
  345 xprt_register(SVCXPRT *xprt)
  346 {
  347         SVCPOOL *pool = xprt->xp_pool;
  348         SVCGROUP *grp;
  349         int g;
  350 
  351         SVC_ACQUIRE(xprt);
  352         g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
  353         xprt->xp_group = grp = &pool->sp_groups[g];
  354         mtx_lock(&grp->sg_lock);
  355         xprt->xp_registered = TRUE;
  356         xprt->xp_active = FALSE;
  357         TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
  358         mtx_unlock(&grp->sg_lock);
  359 }
  360 
  361 /*
  362  * De-activate a transport handle. Note: the locked version doesn't
  363  * release the transport - caller must do that after dropping the pool
  364  * lock.
  365  */
  366 static void
  367 xprt_unregister_locked(SVCXPRT *xprt)
  368 {
  369         SVCGROUP *grp = xprt->xp_group;
  370 
  371         mtx_assert(&grp->sg_lock, MA_OWNED);
  372         KASSERT(xprt->xp_registered == TRUE,
  373             ("xprt_unregister_locked: not registered"));
  374         xprt_inactive_locked(xprt);
  375         TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
  376         xprt->xp_registered = FALSE;
  377 }
  378 
  379 void
  380 xprt_unregister(SVCXPRT *xprt)
  381 {
  382         SVCGROUP *grp = xprt->xp_group;
  383 
  384         mtx_lock(&grp->sg_lock);
  385         if (xprt->xp_registered == FALSE) {
  386                 /* Already unregistered by another thread */
  387                 mtx_unlock(&grp->sg_lock);
  388                 return;
  389         }
  390         xprt_unregister_locked(xprt);
  391         mtx_unlock(&grp->sg_lock);
  392 
  393         if (xprt->xp_socket != NULL)
  394                 soshutdown(xprt->xp_socket, SHUT_WR);
  395         SVC_RELEASE(xprt);
  396 }
  397 
  398 /*
  399  * Attempt to assign a service thread to this transport.
  400  */
  401 static int
  402 xprt_assignthread(SVCXPRT *xprt)
  403 {
  404         SVCGROUP *grp = xprt->xp_group;
  405         SVCTHREAD *st;
  406 
  407         mtx_assert(&grp->sg_lock, MA_OWNED);
  408         st = LIST_FIRST(&grp->sg_idlethreads);
  409         if (st) {
  410                 LIST_REMOVE(st, st_ilink);
  411                 SVC_ACQUIRE(xprt);
  412                 xprt->xp_thread = st;
  413                 st->st_xprt = xprt;
  414                 cv_signal(&st->st_cond);
  415                 return (TRUE);
  416         } else {
  417                 /*
  418                  * See if we can create a new thread. The
  419                  * actual thread creation happens in
  420                  * svc_run_internal because our locking state
  421                  * is poorly defined (we are typically called
  422                  * from a socket upcall). Don't create more
  423                  * than one thread per second.
  424                  */
  425                 if (grp->sg_state == SVCPOOL_ACTIVE
  426                     && grp->sg_lastcreatetime < time_uptime
  427                     && grp->sg_threadcount < grp->sg_maxthreads) {
  428                         grp->sg_state = SVCPOOL_THREADWANTED;
  429                 }
  430         }
  431         return (FALSE);
  432 }
  433 
  434 void
  435 xprt_active(SVCXPRT *xprt)
  436 {
  437         SVCGROUP *grp = xprt->xp_group;
  438 
  439         mtx_lock(&grp->sg_lock);
  440 
  441         if (!xprt->xp_registered) {
  442                 /*
  443                  * Race with xprt_unregister - we lose.
  444                  */
  445                 mtx_unlock(&grp->sg_lock);
  446                 return;
  447         }
  448 
  449         if (!xprt->xp_active) {
  450                 xprt->xp_active = TRUE;
  451                 if (xprt->xp_thread == NULL) {
  452                         if (!svc_request_space_available(xprt->xp_pool) ||
  453                             !xprt_assignthread(xprt))
  454                                 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
  455                                     xp_alink);
  456                 }
  457         }
  458 
  459         mtx_unlock(&grp->sg_lock);
  460 }
  461 
  462 void
  463 xprt_inactive_locked(SVCXPRT *xprt)
  464 {
  465         SVCGROUP *grp = xprt->xp_group;
  466 
  467         mtx_assert(&grp->sg_lock, MA_OWNED);
  468         if (xprt->xp_active) {
  469                 if (xprt->xp_thread == NULL)
  470                         TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
  471                 xprt->xp_active = FALSE;
  472         }
  473 }
  474 
  475 void
  476 xprt_inactive(SVCXPRT *xprt)
  477 {
  478         SVCGROUP *grp = xprt->xp_group;
  479 
  480         mtx_lock(&grp->sg_lock);
  481         xprt_inactive_locked(xprt);
  482         mtx_unlock(&grp->sg_lock);
  483 }
  484 
  485 /*
  486  * Variant of xprt_inactive() for use only when sure that port is
  487  * assigned to thread. For example, within receive handlers.
  488  */
  489 void
  490 xprt_inactive_self(SVCXPRT *xprt)
  491 {
  492 
  493         KASSERT(xprt->xp_thread != NULL,
  494             ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
  495         xprt->xp_active = FALSE;
  496 }
  497 
  498 /*
  499  * Add a service program to the callout list.
  500  * The dispatch routine will be called when a rpc request for this
  501  * program number comes in.
  502  */
  503 bool_t
  504 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
  505     void (*dispatch)(struct svc_req *, SVCXPRT *),
  506     const struct netconfig *nconf)
  507 {
  508         SVCPOOL *pool = xprt->xp_pool;
  509         struct svc_callout *s;
  510         char *netid = NULL;
  511         int flag = 0;
  512 
  513 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
  514 
  515         if (xprt->xp_netid) {
  516                 netid = strdup(xprt->xp_netid, M_RPC);
  517                 flag = 1;
  518         } else if (nconf && nconf->nc_netid) {
  519                 netid = strdup(nconf->nc_netid, M_RPC);
  520                 flag = 1;
  521         } /* must have been created with svc_raw_create */
  522         if ((netid == NULL) && (flag == 1)) {
  523                 return (FALSE);
  524         }
  525 
  526         mtx_lock(&pool->sp_lock);
  527         if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
  528                 if (netid)
  529                         free(netid, M_RPC);
  530                 if (s->sc_dispatch == dispatch)
  531                         goto rpcb_it; /* he is registering another xptr */
  532                 mtx_unlock(&pool->sp_lock);
  533                 return (FALSE);
  534         }
  535         s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
  536         if (s == NULL) {
  537                 if (netid)
  538                         free(netid, M_RPC);
  539                 mtx_unlock(&pool->sp_lock);
  540                 return (FALSE);
  541         }
  542 
  543         s->sc_prog = prog;
  544         s->sc_vers = vers;
  545         s->sc_dispatch = dispatch;
  546         s->sc_netid = netid;
  547         TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
  548 
  549         if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
  550                 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
  551 
  552 rpcb_it:
  553         mtx_unlock(&pool->sp_lock);
  554         /* now register the information with the local binder service */
  555         if (nconf) {
  556                 bool_t dummy;
  557                 struct netconfig tnc;
  558                 struct netbuf nb;
  559                 tnc = *nconf;
  560                 nb.buf = &xprt->xp_ltaddr;
  561                 nb.len = xprt->xp_ltaddr.ss_len;
  562                 dummy = rpcb_set(prog, vers, &tnc, &nb);
  563                 return (dummy);
  564         }
  565         return (TRUE);
  566 }
  567 
  568 /*
  569  * Remove a service program from the callout list.
  570  */
  571 void
  572 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
  573 {
  574         struct svc_callout *s;
  575 
  576         /* unregister the information anyway */
  577         (void) rpcb_unset(prog, vers, NULL);
  578         mtx_lock(&pool->sp_lock);
  579         while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
  580                 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
  581                 if (s->sc_netid)
  582                         mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
  583                 mem_free(s, sizeof (struct svc_callout));
  584         }
  585         mtx_unlock(&pool->sp_lock);
  586 }
  587 
  588 /*
  589  * Add a service connection loss program to the callout list.
  590  * The dispatch routine will be called when some port in ths pool die.
  591  */
  592 bool_t
  593 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
  594 {
  595         SVCPOOL *pool = xprt->xp_pool;
  596         struct svc_loss_callout *s;
  597 
  598         mtx_lock(&pool->sp_lock);
  599         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
  600                 if (s->slc_dispatch == dispatch)
  601                         break;
  602         }
  603         if (s != NULL) {
  604                 mtx_unlock(&pool->sp_lock);
  605                 return (TRUE);
  606         }
  607         s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
  608         if (s == NULL) {
  609                 mtx_unlock(&pool->sp_lock);
  610                 return (FALSE);
  611         }
  612         s->slc_dispatch = dispatch;
  613         TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
  614         mtx_unlock(&pool->sp_lock);
  615         return (TRUE);
  616 }
  617 
  618 /*
  619  * Remove a service connection loss program from the callout list.
  620  */
  621 void
  622 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
  623 {
  624         struct svc_loss_callout *s;
  625 
  626         mtx_lock(&pool->sp_lock);
  627         TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
  628                 if (s->slc_dispatch == dispatch) {
  629                         TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
  630                         free(s, M_RPC);
  631                         break;
  632                 }
  633         }
  634         mtx_unlock(&pool->sp_lock);
  635 }
  636 
  637 /* ********************** CALLOUT list related stuff ************* */
  638 
  639 /*
  640  * Search the callout list for a program number, return the callout
  641  * struct.
  642  */
  643 static struct svc_callout *
  644 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
  645 {
  646         struct svc_callout *s;
  647 
  648         mtx_assert(&pool->sp_lock, MA_OWNED);
  649         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
  650                 if (s->sc_prog == prog && s->sc_vers == vers
  651                     && (netid == NULL || s->sc_netid == NULL ||
  652                         strcmp(netid, s->sc_netid) == 0))
  653                         break;
  654         }
  655 
  656         return (s);
  657 }
  658 
  659 /* ******************* REPLY GENERATION ROUTINES  ************ */
  660 
  661 static bool_t
  662 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
  663     struct mbuf *body)
  664 {
  665         SVCXPRT *xprt = rqstp->rq_xprt;
  666         bool_t ok;
  667 
  668         if (rqstp->rq_args) {
  669                 m_freem(rqstp->rq_args);
  670                 rqstp->rq_args = NULL;
  671         }
  672 
  673         if (xprt->xp_pool->sp_rcache)
  674                 replay_setreply(xprt->xp_pool->sp_rcache,
  675                     rply, svc_getrpccaller(rqstp), body);
  676 
  677         if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
  678                 return (FALSE);
  679 
  680         ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
  681         if (rqstp->rq_addr) {
  682                 free(rqstp->rq_addr, M_SONAME);
  683                 rqstp->rq_addr = NULL;
  684         }
  685 
  686         return (ok);
  687 }
  688 
  689 /*
  690  * Send a reply to an rpc request
  691  */
  692 bool_t
  693 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
  694 {
  695         struct rpc_msg rply; 
  696         struct mbuf *m;
  697         XDR xdrs;
  698         bool_t ok;
  699 
  700         rply.rm_xid = rqstp->rq_xid;
  701         rply.rm_direction = REPLY;  
  702         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
  703         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
  704         rply.acpted_rply.ar_stat = SUCCESS;
  705         rply.acpted_rply.ar_results.where = NULL;
  706         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
  707 
  708         m = m_getcl(M_WAITOK, MT_DATA, 0);
  709         xdrmbuf_create(&xdrs, m, XDR_ENCODE);
  710         ok = xdr_results(&xdrs, xdr_location);
  711         XDR_DESTROY(&xdrs);
  712 
  713         if (ok) {
  714                 return (svc_sendreply_common(rqstp, &rply, m));
  715         } else {
  716                 m_freem(m);
  717                 return (FALSE);
  718         }
  719 }
  720 
  721 bool_t
  722 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
  723 {
  724         struct rpc_msg rply; 
  725 
  726         rply.rm_xid = rqstp->rq_xid;
  727         rply.rm_direction = REPLY;  
  728         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
  729         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
  730         rply.acpted_rply.ar_stat = SUCCESS;
  731         rply.acpted_rply.ar_results.where = NULL;
  732         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
  733 
  734         return (svc_sendreply_common(rqstp, &rply, m));
  735 }
  736 
  737 /*
  738  * No procedure error reply
  739  */
  740 void
  741 svcerr_noproc(struct svc_req *rqstp)
  742 {
  743         SVCXPRT *xprt = rqstp->rq_xprt;
  744         struct rpc_msg rply;
  745 
  746         rply.rm_xid = rqstp->rq_xid;
  747         rply.rm_direction = REPLY;
  748         rply.rm_reply.rp_stat = MSG_ACCEPTED;
  749         rply.acpted_rply.ar_verf = rqstp->rq_verf;
  750         rply.acpted_rply.ar_stat = PROC_UNAVAIL;
  751 
  752         if (xprt->xp_pool->sp_rcache)
  753                 replay_setreply(xprt->xp_pool->sp_rcache,
  754                     &rply, svc_getrpccaller(rqstp), NULL);
  755 
  756         svc_sendreply_common(rqstp, &rply, NULL);
  757 }
  758 
  759 /*
  760  * Can't decode args error reply
  761  */
  762 void
  763 svcerr_decode(struct svc_req *rqstp)
  764 {
  765         SVCXPRT *xprt = rqstp->rq_xprt;
  766         struct rpc_msg rply; 
  767 
  768         rply.rm_xid = rqstp->rq_xid;
  769         rply.rm_direction = REPLY; 
  770         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
  771         rply.acpted_rply.ar_verf = rqstp->rq_verf;
  772         rply.acpted_rply.ar_stat = GARBAGE_ARGS;
  773 
  774         if (xprt->xp_pool->sp_rcache)
  775                 replay_setreply(xprt->xp_pool->sp_rcache,
  776                     &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
  777 
  778         svc_sendreply_common(rqstp, &rply, NULL);
  779 }
  780 
  781 /*
  782  * Some system error
  783  */
  784 void
  785 svcerr_systemerr(struct svc_req *rqstp)
  786 {
  787         SVCXPRT *xprt = rqstp->rq_xprt;
  788         struct rpc_msg rply; 
  789 
  790         rply.rm_xid = rqstp->rq_xid;
  791         rply.rm_direction = REPLY; 
  792         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
  793         rply.acpted_rply.ar_verf = rqstp->rq_verf;
  794         rply.acpted_rply.ar_stat = SYSTEM_ERR;
  795 
  796         if (xprt->xp_pool->sp_rcache)
  797                 replay_setreply(xprt->xp_pool->sp_rcache,
  798                     &rply, svc_getrpccaller(rqstp), NULL);
  799 
  800         svc_sendreply_common(rqstp, &rply, NULL);
  801 }
  802 
  803 /*
  804  * Authentication error reply
  805  */
  806 void
  807 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
  808 {
  809         SVCXPRT *xprt = rqstp->rq_xprt;
  810         struct rpc_msg rply;
  811 
  812         rply.rm_xid = rqstp->rq_xid;
  813         rply.rm_direction = REPLY;
  814         rply.rm_reply.rp_stat = MSG_DENIED;
  815         rply.rjcted_rply.rj_stat = AUTH_ERROR;
  816         rply.rjcted_rply.rj_why = why;
  817 
  818         if (xprt->xp_pool->sp_rcache)
  819                 replay_setreply(xprt->xp_pool->sp_rcache,
  820                     &rply, svc_getrpccaller(rqstp), NULL);
  821 
  822         svc_sendreply_common(rqstp, &rply, NULL);
  823 }
  824 
  825 /*
  826  * Auth too weak error reply
  827  */
  828 void
  829 svcerr_weakauth(struct svc_req *rqstp)
  830 {
  831 
  832         svcerr_auth(rqstp, AUTH_TOOWEAK);
  833 }
  834 
  835 /*
  836  * Program unavailable error reply
  837  */
  838 void 
  839 svcerr_noprog(struct svc_req *rqstp)
  840 {
  841         SVCXPRT *xprt = rqstp->rq_xprt;
  842         struct rpc_msg rply;  
  843 
  844         rply.rm_xid = rqstp->rq_xid;
  845         rply.rm_direction = REPLY;   
  846         rply.rm_reply.rp_stat = MSG_ACCEPTED;  
  847         rply.acpted_rply.ar_verf = rqstp->rq_verf;  
  848         rply.acpted_rply.ar_stat = PROG_UNAVAIL;
  849 
  850         if (xprt->xp_pool->sp_rcache)
  851                 replay_setreply(xprt->xp_pool->sp_rcache,
  852                     &rply, svc_getrpccaller(rqstp), NULL);
  853 
  854         svc_sendreply_common(rqstp, &rply, NULL);
  855 }
  856 
  857 /*
  858  * Program version mismatch error reply
  859  */
  860 void  
  861 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
  862 {
  863         SVCXPRT *xprt = rqstp->rq_xprt;
  864         struct rpc_msg rply;
  865 
  866         rply.rm_xid = rqstp->rq_xid;
  867         rply.rm_direction = REPLY;
  868         rply.rm_reply.rp_stat = MSG_ACCEPTED;
  869         rply.acpted_rply.ar_verf = rqstp->rq_verf;
  870         rply.acpted_rply.ar_stat = PROG_MISMATCH;
  871         rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
  872         rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
  873 
  874         if (xprt->xp_pool->sp_rcache)
  875                 replay_setreply(xprt->xp_pool->sp_rcache,
  876                     &rply, svc_getrpccaller(rqstp), NULL);
  877 
  878         svc_sendreply_common(rqstp, &rply, NULL);
  879 }
  880 
  881 /*
  882  * Allocate a new server transport structure. All fields are
  883  * initialized to zero and xp_p3 is initialized to point at an
  884  * extension structure to hold various flags and authentication
  885  * parameters.
  886  */
  887 SVCXPRT *
  888 svc_xprt_alloc(void)
  889 {
  890         SVCXPRT *xprt;
  891         SVCXPRT_EXT *ext;
  892 
  893         xprt = mem_alloc(sizeof(SVCXPRT));
  894         ext = mem_alloc(sizeof(SVCXPRT_EXT));
  895         xprt->xp_p3 = ext;
  896         refcount_init(&xprt->xp_refs, 1);
  897 
  898         return (xprt);
  899 }
  900 
  901 /*
  902  * Free a server transport structure.
  903  */
  904 void
  905 svc_xprt_free(SVCXPRT *xprt)
  906 {
  907 
  908         mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
  909         mem_free(xprt, sizeof(SVCXPRT));
  910 }
  911 
  912 /* ******************* SERVER INPUT STUFF ******************* */
  913 
  914 /*
  915  * Read RPC requests from a transport and queue them to be
  916  * executed. We handle authentication and replay cache replies here.
  917  * Actually dispatching the RPC is deferred till svc_executereq.
  918  */
  919 static enum xprt_stat
  920 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
  921 {
  922         SVCPOOL *pool = xprt->xp_pool;
  923         struct svc_req *r;
  924         struct rpc_msg msg;
  925         struct mbuf *args;
  926         struct svc_loss_callout *s;
  927         enum xprt_stat stat;
  928 
  929         /* now receive msgs from xprtprt (support batch calls) */
  930         r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
  931 
  932         msg.rm_call.cb_cred.oa_base = r->rq_credarea;
  933         msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
  934         r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
  935         if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
  936                 enum auth_stat why;
  937 
  938                 /*
  939                  * Handle replays and authenticate before queuing the
  940                  * request to be executed.
  941                  */
  942                 SVC_ACQUIRE(xprt);
  943                 r->rq_xprt = xprt;
  944                 if (pool->sp_rcache) {
  945                         struct rpc_msg repmsg;
  946                         struct mbuf *repbody;
  947                         enum replay_state rs;
  948                         rs = replay_find(pool->sp_rcache, &msg,
  949                             svc_getrpccaller(r), &repmsg, &repbody);
  950                         switch (rs) {
  951                         case RS_NEW:
  952                                 break;
  953                         case RS_DONE:
  954                                 SVC_REPLY(xprt, &repmsg, r->rq_addr,
  955                                     repbody, &r->rq_reply_seq);
  956                                 if (r->rq_addr) {
  957                                         free(r->rq_addr, M_SONAME);
  958                                         r->rq_addr = NULL;
  959                                 }
  960                                 m_freem(args);
  961                                 goto call_done;
  962 
  963                         default:
  964                                 m_freem(args);
  965                                 goto call_done;
  966                         }
  967                 }
  968 
  969                 r->rq_xid = msg.rm_xid;
  970                 r->rq_prog = msg.rm_call.cb_prog;
  971                 r->rq_vers = msg.rm_call.cb_vers;
  972                 r->rq_proc = msg.rm_call.cb_proc;
  973                 r->rq_size = sizeof(*r) + m_length(args, NULL);
  974                 r->rq_args = args;
  975                 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
  976                         /*
  977                          * RPCSEC_GSS uses this return code
  978                          * for requests that form part of its
  979                          * context establishment protocol and
  980                          * should not be dispatched to the
  981                          * application.
  982                          */
  983                         if (why != RPCSEC_GSS_NODISPATCH)
  984                                 svcerr_auth(r, why);
  985                         goto call_done;
  986                 }
  987 
  988                 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
  989                         svcerr_decode(r);
  990                         goto call_done;
  991                 }
  992 
  993                 /*
  994                  * Everything checks out, return request to caller.
  995                  */
  996                 *rqstp_ret = r;
  997                 r = NULL;
  998         }
  999 call_done:
 1000         if (r) {
 1001                 svc_freereq(r);
 1002                 r = NULL;
 1003         }
 1004         if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
 1005                 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
 1006                         (*s->slc_dispatch)(xprt);
 1007                 xprt_unregister(xprt);
 1008         }
 1009 
 1010         return (stat);
 1011 }
 1012 
 1013 static void
 1014 svc_executereq(struct svc_req *rqstp)
 1015 {
 1016         SVCXPRT *xprt = rqstp->rq_xprt;
 1017         SVCPOOL *pool = xprt->xp_pool;
 1018         int prog_found;
 1019         rpcvers_t low_vers;
 1020         rpcvers_t high_vers;
 1021         struct svc_callout *s;
 1022 
 1023         /* now match message with a registered service*/
 1024         prog_found = FALSE;
 1025         low_vers = (rpcvers_t) -1L;
 1026         high_vers = (rpcvers_t) 0L;
 1027         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
 1028                 if (s->sc_prog == rqstp->rq_prog) {
 1029                         if (s->sc_vers == rqstp->rq_vers) {
 1030                                 /*
 1031                                  * We hand ownership of r to the
 1032                                  * dispatch method - they must call
 1033                                  * svc_freereq.
 1034                                  */
 1035                                 (*s->sc_dispatch)(rqstp, xprt);
 1036                                 return;
 1037                         }  /* found correct version */
 1038                         prog_found = TRUE;
 1039                         if (s->sc_vers < low_vers)
 1040                                 low_vers = s->sc_vers;
 1041                         if (s->sc_vers > high_vers)
 1042                                 high_vers = s->sc_vers;
 1043                 }   /* found correct program */
 1044         }
 1045 
 1046         /*
 1047          * if we got here, the program or version
 1048          * is not served ...
 1049          */
 1050         if (prog_found)
 1051                 svcerr_progvers(rqstp, low_vers, high_vers);
 1052         else
 1053                 svcerr_noprog(rqstp);
 1054 
 1055         svc_freereq(rqstp);
 1056 }
 1057 
 1058 static void
 1059 svc_checkidle(SVCGROUP *grp)
 1060 {
 1061         SVCXPRT *xprt, *nxprt;
 1062         time_t timo;
 1063         struct svcxprt_list cleanup;
 1064 
 1065         TAILQ_INIT(&cleanup);
 1066         TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
 1067                 /*
 1068                  * Only some transports have idle timers. Don't time
 1069                  * something out which is just waking up.
 1070                  */
 1071                 if (!xprt->xp_idletimeout || xprt->xp_thread)
 1072                         continue;
 1073 
 1074                 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
 1075                 if (time_uptime > timo) {
 1076                         xprt_unregister_locked(xprt);
 1077                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
 1078                 }
 1079         }
 1080 
 1081         mtx_unlock(&grp->sg_lock);
 1082         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
 1083                 soshutdown(xprt->xp_socket, SHUT_WR);
 1084                 SVC_RELEASE(xprt);
 1085         }
 1086         mtx_lock(&grp->sg_lock);
 1087 }
 1088 
 1089 static void
 1090 svc_assign_waiting_sockets(SVCPOOL *pool)
 1091 {
 1092         SVCGROUP *grp;
 1093         SVCXPRT *xprt;
 1094         int g;
 1095 
 1096         for (g = 0; g < pool->sp_groupcount; g++) {
 1097                 grp = &pool->sp_groups[g];
 1098                 mtx_lock(&grp->sg_lock);
 1099                 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
 1100                         if (xprt_assignthread(xprt))
 1101                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
 1102                         else
 1103                                 break;
 1104                 }
 1105                 mtx_unlock(&grp->sg_lock);
 1106         }
 1107 }
 1108 
 1109 static void
 1110 svc_change_space_used(SVCPOOL *pool, long delta)
 1111 {
 1112         unsigned long value;
 1113 
 1114         value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
 1115         if (delta > 0) {
 1116                 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
 1117                         pool->sp_space_throttled = TRUE;
 1118                         pool->sp_space_throttle_count++;
 1119                 }
 1120                 if (value > pool->sp_space_used_highest)
 1121                         pool->sp_space_used_highest = value;
 1122         } else {
 1123                 if (value < pool->sp_space_low && pool->sp_space_throttled) {
 1124                         pool->sp_space_throttled = FALSE;
 1125                         svc_assign_waiting_sockets(pool);
 1126                 }
 1127         }
 1128 }
 1129 
 1130 static bool_t
 1131 svc_request_space_available(SVCPOOL *pool)
 1132 {
 1133 
 1134         if (pool->sp_space_throttled)
 1135                 return (FALSE);
 1136         return (TRUE);
 1137 }
 1138 
 1139 static void
 1140 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
 1141 {
 1142         SVCPOOL *pool = grp->sg_pool;
 1143         SVCTHREAD *st, *stpref;
 1144         SVCXPRT *xprt;
 1145         enum xprt_stat stat;
 1146         struct svc_req *rqstp;
 1147         struct proc *p;
 1148         long sz;
 1149         int error;
 1150 
 1151         st = mem_alloc(sizeof(*st));
 1152         mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
 1153         st->st_pool = pool;
 1154         st->st_xprt = NULL;
 1155         STAILQ_INIT(&st->st_reqs);
 1156         cv_init(&st->st_cond, "rpcsvc");
 1157 
 1158         mtx_lock(&grp->sg_lock);
 1159 
 1160         /*
 1161          * If we are a new thread which was spawned to cope with
 1162          * increased load, set the state back to SVCPOOL_ACTIVE.
 1163          */
 1164         if (grp->sg_state == SVCPOOL_THREADSTARTING)
 1165                 grp->sg_state = SVCPOOL_ACTIVE;
 1166 
 1167         while (grp->sg_state != SVCPOOL_CLOSING) {
 1168                 /*
 1169                  * Create new thread if requested.
 1170                  */
 1171                 if (grp->sg_state == SVCPOOL_THREADWANTED) {
 1172                         grp->sg_state = SVCPOOL_THREADSTARTING;
 1173                         grp->sg_lastcreatetime = time_uptime;
 1174                         mtx_unlock(&grp->sg_lock);
 1175                         svc_new_thread(grp);
 1176                         mtx_lock(&grp->sg_lock);
 1177                         continue;
 1178                 }
 1179 
 1180                 /*
 1181                  * Check for idle transports once per second.
 1182                  */
 1183                 if (time_uptime > grp->sg_lastidlecheck) {
 1184                         grp->sg_lastidlecheck = time_uptime;
 1185                         svc_checkidle(grp);
 1186                 }
 1187 
 1188                 xprt = st->st_xprt;
 1189                 if (!xprt) {
 1190                         /*
 1191                          * Enforce maxthreads count.
 1192                          */
 1193                         if (!ismaster && grp->sg_threadcount >
 1194                             grp->sg_maxthreads)
 1195                                 break;
 1196 
 1197                         /*
 1198                          * Before sleeping, see if we can find an
 1199                          * active transport which isn't being serviced
 1200                          * by a thread.
 1201                          */
 1202                         if (svc_request_space_available(pool) &&
 1203                             (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
 1204                                 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
 1205                                 SVC_ACQUIRE(xprt);
 1206                                 xprt->xp_thread = st;
 1207                                 st->st_xprt = xprt;
 1208                                 continue;
 1209                         }
 1210 
 1211                         LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
 1212                         if (ismaster || (!ismaster &&
 1213                             grp->sg_threadcount > grp->sg_minthreads))
 1214                                 error = cv_timedwait_sig(&st->st_cond,
 1215                                     &grp->sg_lock, 5 * hz);
 1216                         else
 1217                                 error = cv_wait_sig(&st->st_cond,
 1218                                     &grp->sg_lock);
 1219                         if (st->st_xprt == NULL)
 1220                                 LIST_REMOVE(st, st_ilink);
 1221 
 1222                         /*
 1223                          * Reduce worker thread count when idle.
 1224                          */
 1225                         if (error == EWOULDBLOCK) {
 1226                                 if (!ismaster
 1227                                     && (grp->sg_threadcount
 1228                                         > grp->sg_minthreads)
 1229                                         && !st->st_xprt)
 1230                                         break;
 1231                         } else if (error != 0) {
 1232                                 KASSERT(error == EINTR || error == ERESTART,
 1233                                     ("non-signal error %d", error));
 1234                                 mtx_unlock(&grp->sg_lock);
 1235                                 p = curproc;
 1236                                 PROC_LOCK(p);
 1237                                 if (P_SHOULDSTOP(p) ||
 1238                                     (p->p_flag & P_TOTAL_STOP) != 0) {
 1239                                         thread_suspend_check(0);
 1240                                         PROC_UNLOCK(p);
 1241                                         mtx_lock(&grp->sg_lock);
 1242                                 } else {
 1243                                         PROC_UNLOCK(p);
 1244                                         svc_exit(pool);
 1245                                         mtx_lock(&grp->sg_lock);
 1246                                         break;
 1247                                 }
 1248                         }
 1249                         continue;
 1250                 }
 1251                 mtx_unlock(&grp->sg_lock);
 1252 
 1253                 /*
 1254                  * Drain the transport socket and queue up any RPCs.
 1255                  */
 1256                 xprt->xp_lastactive = time_uptime;
 1257                 do {
 1258                         if (!svc_request_space_available(pool))
 1259                                 break;
 1260                         rqstp = NULL;
 1261                         stat = svc_getreq(xprt, &rqstp);
 1262                         if (rqstp) {
 1263                                 svc_change_space_used(pool, rqstp->rq_size);
 1264                                 /*
 1265                                  * See if the application has a preference
 1266                                  * for some other thread.
 1267                                  */
 1268                                 if (pool->sp_assign) {
 1269                                         stpref = pool->sp_assign(st, rqstp);
 1270                                         rqstp->rq_thread = stpref;
 1271                                         STAILQ_INSERT_TAIL(&stpref->st_reqs,
 1272                                             rqstp, rq_link);
 1273                                         mtx_unlock(&stpref->st_lock);
 1274                                         if (stpref != st)
 1275                                                 rqstp = NULL;
 1276                                 } else {
 1277                                         rqstp->rq_thread = st;
 1278                                         STAILQ_INSERT_TAIL(&st->st_reqs,
 1279                                             rqstp, rq_link);
 1280                                 }
 1281                         }
 1282                 } while (rqstp == NULL && stat == XPRT_MOREREQS
 1283                     && grp->sg_state != SVCPOOL_CLOSING);
 1284 
 1285                 /*
 1286                  * Move this transport to the end of the active list to
 1287                  * ensure fairness when multiple transports are active.
 1288                  * If this was the last queued request, svc_getreq will end
 1289                  * up calling xprt_inactive to remove from the active list.
 1290                  */
 1291                 mtx_lock(&grp->sg_lock);
 1292                 xprt->xp_thread = NULL;
 1293                 st->st_xprt = NULL;
 1294                 if (xprt->xp_active) {
 1295                         if (!svc_request_space_available(pool) ||
 1296                             !xprt_assignthread(xprt))
 1297                                 TAILQ_INSERT_TAIL(&grp->sg_active,
 1298                                     xprt, xp_alink);
 1299                 }
 1300                 mtx_unlock(&grp->sg_lock);
 1301                 SVC_RELEASE(xprt);
 1302 
 1303                 /*
 1304                  * Execute what we have queued.
 1305                  */
 1306                 mtx_lock(&st->st_lock);
 1307                 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
 1308                         STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
 1309                         mtx_unlock(&st->st_lock);
 1310                         sz = (long)rqstp->rq_size;
 1311                         svc_executereq(rqstp);
 1312                         svc_change_space_used(pool, -sz);
 1313                         mtx_lock(&st->st_lock);
 1314                 }
 1315                 mtx_unlock(&st->st_lock);
 1316                 mtx_lock(&grp->sg_lock);
 1317         }
 1318 
 1319         if (st->st_xprt) {
 1320                 xprt = st->st_xprt;
 1321                 st->st_xprt = NULL;
 1322                 SVC_RELEASE(xprt);
 1323         }
 1324         KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
 1325         mtx_destroy(&st->st_lock);
 1326         cv_destroy(&st->st_cond);
 1327         mem_free(st, sizeof(*st));
 1328 
 1329         grp->sg_threadcount--;
 1330         if (!ismaster)
 1331                 wakeup(grp);
 1332         mtx_unlock(&grp->sg_lock);
 1333 }
 1334 
 1335 static void
 1336 svc_thread_start(void *arg)
 1337 {
 1338 
 1339         svc_run_internal((SVCGROUP *) arg, FALSE);
 1340         kthread_exit();
 1341 }
 1342 
 1343 static void
 1344 svc_new_thread(SVCGROUP *grp)
 1345 {
 1346         SVCPOOL *pool = grp->sg_pool;
 1347         struct thread *td;
 1348 
 1349         mtx_lock(&grp->sg_lock);
 1350         grp->sg_threadcount++;
 1351         mtx_unlock(&grp->sg_lock);
 1352         kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
 1353             "%s: service", pool->sp_name);
 1354 }
 1355 
 1356 void
 1357 svc_run(SVCPOOL *pool)
 1358 {
 1359         int g, i;
 1360         struct proc *p;
 1361         struct thread *td;
 1362         SVCGROUP *grp;
 1363 
 1364         p = curproc;
 1365         td = curthread;
 1366         snprintf(td->td_name, sizeof(td->td_name),
 1367             "%s: master", pool->sp_name);
 1368         pool->sp_state = SVCPOOL_ACTIVE;
 1369         pool->sp_proc = p;
 1370 
 1371         /* Choose group count based on number of threads and CPUs. */
 1372         pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
 1373             min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
 1374         for (g = 0; g < pool->sp_groupcount; g++) {
 1375                 grp = &pool->sp_groups[g];
 1376                 grp->sg_minthreads = max(1,
 1377                     pool->sp_minthreads / pool->sp_groupcount);
 1378                 grp->sg_maxthreads = max(1,
 1379                     pool->sp_maxthreads / pool->sp_groupcount);
 1380                 grp->sg_lastcreatetime = time_uptime;
 1381         }
 1382 
 1383         /* Starting threads */
 1384         pool->sp_groups[0].sg_threadcount++;
 1385         for (g = 0; g < pool->sp_groupcount; g++) {
 1386                 grp = &pool->sp_groups[g];
 1387                 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
 1388                         svc_new_thread(grp);
 1389         }
 1390         svc_run_internal(&pool->sp_groups[0], TRUE);
 1391 
 1392         /* Waiting for threads to stop. */
 1393         for (g = 0; g < pool->sp_groupcount; g++) {
 1394                 grp = &pool->sp_groups[g];
 1395                 mtx_lock(&grp->sg_lock);
 1396                 while (grp->sg_threadcount > 0)
 1397                         msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
 1398                 mtx_unlock(&grp->sg_lock);
 1399         }
 1400 }
 1401 
 1402 void
 1403 svc_exit(SVCPOOL *pool)
 1404 {
 1405         SVCGROUP *grp;
 1406         SVCTHREAD *st;
 1407         int g;
 1408 
 1409         pool->sp_state = SVCPOOL_CLOSING;
 1410         for (g = 0; g < pool->sp_groupcount; g++) {
 1411                 grp = &pool->sp_groups[g];
 1412                 mtx_lock(&grp->sg_lock);
 1413                 if (grp->sg_state != SVCPOOL_CLOSING) {
 1414                         grp->sg_state = SVCPOOL_CLOSING;
 1415                         LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
 1416                                 cv_signal(&st->st_cond);
 1417                 }
 1418                 mtx_unlock(&grp->sg_lock);
 1419         }
 1420 }
 1421 
 1422 bool_t
 1423 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
 1424 {
 1425         struct mbuf *m;
 1426         XDR xdrs;
 1427         bool_t stat;
 1428 
 1429         m = rqstp->rq_args;
 1430         rqstp->rq_args = NULL;
 1431 
 1432         xdrmbuf_create(&xdrs, m, XDR_DECODE);
 1433         stat = xargs(&xdrs, args);
 1434         XDR_DESTROY(&xdrs);
 1435 
 1436         return (stat);
 1437 }
 1438 
 1439 bool_t
 1440 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
 1441 {
 1442         XDR xdrs;
 1443 
 1444         if (rqstp->rq_addr) {
 1445                 free(rqstp->rq_addr, M_SONAME);
 1446                 rqstp->rq_addr = NULL;
 1447         }
 1448 
 1449         xdrs.x_op = XDR_FREE;
 1450         return (xargs(&xdrs, args));
 1451 }
 1452 
 1453 void
 1454 svc_freereq(struct svc_req *rqstp)
 1455 {
 1456         SVCTHREAD *st;
 1457         SVCPOOL *pool;
 1458 
 1459         st = rqstp->rq_thread;
 1460         if (st) {
 1461                 pool = st->st_pool;
 1462                 if (pool->sp_done)
 1463                         pool->sp_done(st, rqstp);
 1464         }
 1465 
 1466         if (rqstp->rq_auth.svc_ah_ops)
 1467                 SVCAUTH_RELEASE(&rqstp->rq_auth);
 1468 
 1469         if (rqstp->rq_xprt) {
 1470                 SVC_RELEASE(rqstp->rq_xprt);
 1471         }
 1472 
 1473         if (rqstp->rq_addr)
 1474                 free(rqstp->rq_addr, M_SONAME);
 1475 
 1476         if (rqstp->rq_args)
 1477                 m_freem(rqstp->rq_args);
 1478 
 1479         free(rqstp, M_RPC);
 1480 }

Cache object: 8879d27ade3253cc051ac9666ea42a7e


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.