[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ]

FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc.c

Version: -  FREEBSD  -  FREEBSD7  -  FREEBSD70  -  FREEBSD6  -  FREEBSD63  -  FREEBSD62  -  FREEBSD61  -  FREEBSD60  -  FREEBSD5  -  FREEBSD55  -  FREEBSD54  -  FREEBSD53  -  FREEBSD52  -  FREEBSD51  -  FREEBSD50  -  FREEBSD4  -  FREEBSD3  -  FREEBSD22  -  linux-2.6  -  linux-2.4.22  -  MK83  -  MK84  -  PLAN9  -  DFBSD  -  NETBSD  -  NETBSD4  -  NETBSD3  -  NETBSD20  -  OPENBSD  -  xnu-517  -  xnu-792  -  xnu-792.6.70  -  xnu-1228  -  OPENSOLARIS  -  minix-3-1-1  -  TRUSTEDBSD-SEBSD  -  FREEBSD-LIBC  -  FREEBSD7-LIBC  -  FREEBSD6-LIBC  -  GLIBC27 
SearchContext: -  none  -  excerpts  -  bigexcerpts 

  1 /*      $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $        */
  2 
  3 /*
  4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
  5  * unrestricted use provided that this legend is included on all tape
  6  * media and as a part of the software program in whole or part.  Users
  7  * may copy or modify Sun RPC without charge, but are not authorized
  8  * to license or distribute it to anyone else except as part of a product or
  9  * program developed by the user.
 10  *
 11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 14  *
 15  * Sun RPC is provided with no support and without any obligation on the
 16  * part of Sun Microsystems, Inc. to assist in its use, correction,
 17  * modification or enhancement.
 18  *
 19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 21  * OR ANY PART THEREOF.
 22  *
 23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 24  * or profits or other special, indirect and consequential damages, even if
 25  * Sun has been advised of the possibility of such damages.
 26  *
 27  * Sun Microsystems, Inc.
 28  * 2550 Garcia Avenue
 29  * Mountain View, California  94043
 30  */
 31 
 32 #if defined(LIBC_SCCS) && !defined(lint)
 33 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
 34 static char *sccsid = "@(#)svc.c        2.4 88/08/11 4.0 RPCSRC";
 35 #endif
 36 #include <sys/cdefs.h>
 37 __FBSDID("$FreeBSD: src/sys/rpc/svc.c,v 1.3 2008/11/03 10:38:00 dfr Exp $");
 38 
 39 /*
 40  * svc.c, Server-side remote procedure call interface.
 41  *
 42  * There are two sets of procedures here.  The xprt routines are
 43  * for handling transport handles.  The svc routines handle the
 44  * list of service routines.
 45  *
 46  * Copyright (C) 1984, Sun Microsystems, Inc.
 47  */
 48 
 49 #include <sys/param.h>
 50 #include <sys/lock.h>
 51 #include <sys/kernel.h>
 52 #include <sys/kthread.h>
 53 #include <sys/malloc.h>
 54 #include <sys/mbuf.h>
 55 #include <sys/mutex.h>
 56 #include <sys/proc.h>
 57 #include <sys/queue.h>
 58 #include <sys/socketvar.h>
 59 #include <sys/systm.h>
 60 #include <sys/ucred.h>
 61 
 62 #include <rpc/rpc.h>
 63 #include <rpc/rpcb_clnt.h>
 64 #include <rpc/replay.h>
 65 
 66 #include <rpc/rpc_com.h>
 67 
 68 #define SVC_VERSQUIET 0x0001            /* keep quiet about vers mismatch */
 69 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
 70 
 71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
 72     char *);
 73 static void svc_new_thread(SVCPOOL *pool);
 74 static void xprt_unregister_locked(SVCXPRT *xprt);
 75 
 76 /* ***************  SVCXPRT related stuff **************** */
 77 
 78 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
 79 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
 80 
 81 SVCPOOL*
 82 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
 83 {
 84         SVCPOOL *pool;
 85 
 86         pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
 87         
 88         mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
 89         pool->sp_name = name;
 90         pool->sp_state = SVCPOOL_INIT;
 91         pool->sp_proc = NULL;
 92         TAILQ_INIT(&pool->sp_xlist);
 93         TAILQ_INIT(&pool->sp_active);
 94         TAILQ_INIT(&pool->sp_callouts);
 95         LIST_INIT(&pool->sp_threads);
 96         LIST_INIT(&pool->sp_idlethreads);
 97         pool->sp_minthreads = 1;
 98         pool->sp_maxthreads = 1;
 99         pool->sp_threadcount = 0;
100 
101         /*
102          * Don't use more than a quarter of mbuf clusters or more than
103          * 45Mb buffering requests.
104          */
105         pool->sp_space_high = nmbclusters * MCLBYTES / 4;
106         if (pool->sp_space_high > 45 << 20)
107                 pool->sp_space_high = 45 << 20;
108         pool->sp_space_low = 2 * pool->sp_space_high / 3;
109 
110         sysctl_ctx_init(&pool->sp_sysctl);
111         if (sysctl_base) {
112                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
113                     "minthreads", CTLTYPE_INT | CTLFLAG_RW,
114                     pool, 0, svcpool_minthread_sysctl, "I", "");
115                 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
116                     "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
117                     pool, 0, svcpool_maxthread_sysctl, "I", "");
118                 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
119                     "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
120 
121                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
122                     "request_space_used", CTLFLAG_RD,
123                     &pool->sp_space_used, 0,
124                     "Space in parsed but not handled requests.");
125 
126                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
127                     "request_space_used_highest", CTLFLAG_RD,
128                     &pool->sp_space_used_highest, 0,
129                     "Highest space used since reboot.");
130 
131                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132                     "request_space_high", CTLFLAG_RW,
133                     &pool->sp_space_high, 0,
134                     "Maximum space in parsed but not handled requests.");
135 
136                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137                     "request_space_low", CTLFLAG_RW,
138                     &pool->sp_space_low, 0,
139                     "Low water mark for request space.");
140 
141                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142                     "request_space_throttled", CTLFLAG_RD,
143                     &pool->sp_space_throttled, 0,
144                     "Whether nfs requests are currently throttled");
145 
146                 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147                     "request_space_throttle_count", CTLFLAG_RD,
148                     &pool->sp_space_throttle_count, 0,
149                     "Count of times throttling based on request space has occurred");
150         }
151 
152         return pool;
153 }
154 
155 void
156 svcpool_destroy(SVCPOOL *pool)
157 {
158         SVCXPRT *xprt, *nxprt;
159         struct svc_callout *s;
160         struct svcxprt_list cleanup;
161 
162         TAILQ_INIT(&cleanup);
163         mtx_lock(&pool->sp_lock);
164 
165         while (TAILQ_FIRST(&pool->sp_xlist)) {
166                 xprt = TAILQ_FIRST(&pool->sp_xlist);
167                 xprt_unregister_locked(xprt);
168                 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
169         }
170 
171         while (TAILQ_FIRST(&pool->sp_callouts)) {
172                 s = TAILQ_FIRST(&pool->sp_callouts);
173                 mtx_unlock(&pool->sp_lock);
174                 svc_unreg(pool, s->sc_prog, s->sc_vers);
175                 mtx_lock(&pool->sp_lock);
176         }
177 
178         mtx_destroy(&pool->sp_lock);
179 
180         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
181                 SVC_RELEASE(xprt);
182         }
183 
184         if (pool->sp_rcache)
185                 replay_freecache(pool->sp_rcache);
186 
187         sysctl_ctx_free(&pool->sp_sysctl);
188         free(pool, M_RPC);
189 }
190 
191 static bool_t
192 svcpool_active(SVCPOOL *pool)
193 {
194         enum svcpool_state state = pool->sp_state;
195 
196         if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
197                 return (FALSE);
198         return (TRUE);
199 }
200 
201 /*
202  * Sysctl handler to set the minimum thread count on a pool
203  */
204 static int
205 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
206 {
207         SVCPOOL *pool;
208         int newminthreads, error, n;
209 
210         pool = oidp->oid_arg1;
211         newminthreads = pool->sp_minthreads;
212         error = sysctl_handle_int(oidp, &newminthreads, 0, req);
213         if (error == 0 && newminthreads != pool->sp_minthreads) {
214                 if (newminthreads > pool->sp_maxthreads)
215                         return (EINVAL);
216                 mtx_lock(&pool->sp_lock);
217                 if (newminthreads > pool->sp_minthreads
218                     && svcpool_active(pool)) {
219                         /*
220                          * If the pool is running and we are
221                          * increasing, create some more threads now.
222                          */
223                         n = newminthreads - pool->sp_threadcount;
224                         if (n > 0) {
225                                 mtx_unlock(&pool->sp_lock);
226                                 while (n--)
227                                         svc_new_thread(pool);
228                                 mtx_lock(&pool->sp_lock);
229                         }
230                 }
231                 pool->sp_minthreads = newminthreads;
232                 mtx_unlock(&pool->sp_lock);
233         }
234         return (error);
235 }
236 
237 /*
238  * Sysctl handler to set the maximum thread count on a pool
239  */
240 static int
241 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
242 {
243         SVCPOOL *pool;
244         SVCTHREAD *st;
245         int newmaxthreads, error;
246 
247         pool = oidp->oid_arg1;
248         newmaxthreads = pool->sp_maxthreads;
249         error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
250         if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
251                 if (newmaxthreads < pool->sp_minthreads)
252                         return (EINVAL);
253                 mtx_lock(&pool->sp_lock);
254                 if (newmaxthreads < pool->sp_maxthreads
255                     && svcpool_active(pool)) {
256                         /*
257                          * If the pool is running and we are
258                          * decreasing, wake up some idle threads to
259                          * encourage them to exit.
260                          */
261                         LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
262                                 cv_signal(&st->st_cond);
263                 }
264                 pool->sp_maxthreads = newmaxthreads;
265                 mtx_unlock(&pool->sp_lock);
266         }
267         return (error);
268 }
269 
270 /*
271  * Activate a transport handle.
272  */
273 void
274 xprt_register(SVCXPRT *xprt)
275 {
276         SVCPOOL *pool = xprt->xp_pool;
277 
278         mtx_lock(&pool->sp_lock);
279         xprt->xp_registered = TRUE;
280         xprt->xp_active = FALSE;
281         TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
282         mtx_unlock(&pool->sp_lock);
283 }
284 
285 /*
286  * De-activate a transport handle. Note: the locked version doesn't
287  * release the transport - caller must do that after dropping the pool
288  * lock.
289  */
290 static void
291 xprt_unregister_locked(SVCXPRT *xprt)
292 {
293         SVCPOOL *pool = xprt->xp_pool;
294 
295         if (xprt->xp_active) {
296                 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
297                 xprt->xp_active = FALSE;
298         }
299         TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
300         xprt->xp_registered = FALSE;
301 }
302 
303 void
304 xprt_unregister(SVCXPRT *xprt)
305 {
306         SVCPOOL *pool = xprt->xp_pool;
307 
308         mtx_lock(&pool->sp_lock);
309         xprt_unregister_locked(xprt);
310         mtx_unlock(&pool->sp_lock);
311 
312         SVC_RELEASE(xprt);
313 }
314 
315 static void
316 xprt_assignthread(SVCXPRT *xprt)
317 {
318         SVCPOOL *pool = xprt->xp_pool;
319         SVCTHREAD *st;
320 
321         /*
322          * Attempt to assign a service thread to this
323          * transport.
324          */
325         LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
326                 if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
327                         break;
328         }
329         if (st) {
330                 SVC_ACQUIRE(xprt);
331                 xprt->xp_thread = st;
332                 st->st_xprt = xprt;
333                 cv_signal(&st->st_cond);
334         } else {
335                 /*
336                  * See if we can create a new thread. The
337                  * actual thread creation happens in
338                  * svc_run_internal because our locking state
339                  * is poorly defined (we are typically called
340                  * from a socket upcall). Don't create more
341                  * than one thread per second.
342                  */
343                 if (pool->sp_state == SVCPOOL_ACTIVE
344                     && pool->sp_lastcreatetime < time_uptime
345                     && pool->sp_threadcount < pool->sp_maxthreads) {
346                         pool->sp_state = SVCPOOL_THREADWANTED;
347                 }
348         }
349 }
350 
351 void
352 xprt_active(SVCXPRT *xprt)
353 {
354         SVCPOOL *pool = xprt->xp_pool;
355 
356         if (!xprt->xp_registered) {
357                 /*
358                  * Race with xprt_unregister - we lose.
359                  */
360                 return;
361         }
362 
363         mtx_lock(&pool->sp_lock);
364 
365         if (!xprt->xp_active) {
366                 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
367                 xprt->xp_active = TRUE;
368                 xprt_assignthread(xprt);
369         }
370 
371         mtx_unlock(&pool->sp_lock);
372 }
373 
374 void
375 xprt_inactive_locked(SVCXPRT *xprt)
376 {
377         SVCPOOL *pool = xprt->xp_pool;
378 
379         if (xprt->xp_active) {
380                 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
381                 xprt->xp_active = FALSE;
382         }
383 }
384 
385 void
386 xprt_inactive(SVCXPRT *xprt)
387 {
388         SVCPOOL *pool = xprt->xp_pool;
389 
390         mtx_lock(&pool->sp_lock);
391         xprt_inactive_locked(xprt);
392         mtx_unlock(&pool->sp_lock);
393 }
394 
395 /*
396  * Add a service program to the callout list.
397  * The dispatch routine will be called when a rpc request for this
398  * program number comes in.
399  */
400 bool_t
401 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
402     void (*dispatch)(struct svc_req *, SVCXPRT *),
403     const struct netconfig *nconf)
404 {
405         SVCPOOL *pool = xprt->xp_pool;
406         struct svc_callout *s;
407         char *netid = NULL;
408         int flag = 0;
409 
410 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
411 
412         if (xprt->xp_netid) {
413                 netid = strdup(xprt->xp_netid, M_RPC);
414                 flag = 1;
415         } else if (nconf && nconf->nc_netid) {
416                 netid = strdup(nconf->nc_netid, M_RPC);
417                 flag = 1;
418         } /* must have been created with svc_raw_create */
419         if ((netid == NULL) && (flag == 1)) {
420                 return (FALSE);
421         }
422 
423         mtx_lock(&pool->sp_lock);
424         if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
425                 if (netid)
426                         free(netid, M_RPC);
427                 if (s->sc_dispatch == dispatch)
428                         goto rpcb_it; /* he is registering another xptr */
429                 mtx_unlock(&pool->sp_lock);
430                 return (FALSE);
431         }
432         s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
433         if (s == NULL) {
434                 if (netid)
435                         free(netid, M_RPC);
436                 mtx_unlock(&pool->sp_lock);
437                 return (FALSE);
438         }
439 
440         s->sc_prog = prog;
441         s->sc_vers = vers;
442         s->sc_dispatch = dispatch;
443         s->sc_netid = netid;
444         TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
445 
446         if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
447                 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
448 
449 rpcb_it:
450         mtx_unlock(&pool->sp_lock);
451         /* now register the information with the local binder service */
452         if (nconf) {
453                 bool_t dummy;
454                 struct netconfig tnc;
455                 struct netbuf nb;
456                 tnc = *nconf;
457                 nb.buf = &xprt->xp_ltaddr;
458                 nb.len = xprt->xp_ltaddr.ss_len;
459                 dummy = rpcb_set(prog, vers, &tnc, &nb);
460                 return (dummy);
461         }
462         return (TRUE);
463 }
464 
465 /*
466  * Remove a service program from the callout list.
467  */
468 void
469 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
470 {
471         struct svc_callout *s;
472 
473         /* unregister the information anyway */
474         (void) rpcb_unset(prog, vers, NULL);
475         mtx_lock(&pool->sp_lock);
476         while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
477                 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
478                 if (s->sc_netid)
479                         mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
480                 mem_free(s, sizeof (struct svc_callout));
481         }
482         mtx_unlock(&pool->sp_lock);
483 }
484 
485 /* ********************** CALLOUT list related stuff ************* */
486 
487 /*
488  * Search the callout list for a program number, return the callout
489  * struct.
490  */
491 static struct svc_callout *
492 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
493 {
494         struct svc_callout *s;
495 
496         mtx_assert(&pool->sp_lock, MA_OWNED);
497         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
498                 if (s->sc_prog == prog && s->sc_vers == vers
499                     && (netid == NULL || s->sc_netid == NULL ||
500                         strcmp(netid, s->sc_netid) == 0))
501                         break;
502         }
503 
504         return (s);
505 }
506 
507 /* ******************* REPLY GENERATION ROUTINES  ************ */
508 
509 static bool_t
510 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
511     struct mbuf *body)
512 {
513         SVCXPRT *xprt = rqstp->rq_xprt;
514         bool_t ok;
515 
516         if (rqstp->rq_args) {
517                 m_freem(rqstp->rq_args);
518                 rqstp->rq_args = NULL;
519         }
520 
521         if (xprt->xp_pool->sp_rcache)
522                 replay_setreply(xprt->xp_pool->sp_rcache,
523                     rply, svc_getrpccaller(rqstp), body);
524 
525         if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
526                 return (FALSE);
527 
528         ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body); 
529         if (rqstp->rq_addr) {
530                 free(rqstp->rq_addr, M_SONAME);
531                 rqstp->rq_addr = NULL;
532         }
533 
534         return (ok);
535 }
536 
537 /*
538  * Send a reply to an rpc request
539  */
540 bool_t
541 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
542 {
543         struct rpc_msg rply; 
544         struct mbuf *m;
545         XDR xdrs;
546         bool_t ok;
547 
548         rply.rm_xid = rqstp->rq_xid;
549         rply.rm_direction = REPLY;  
550         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
551         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
552         rply.acpted_rply.ar_stat = SUCCESS;
553         rply.acpted_rply.ar_results.where = NULL;
554         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
555 
556         MGET(m, M_WAIT, MT_DATA);
557         MCLGET(m, M_WAIT);
558         m->m_len = 0;
559         xdrmbuf_create(&xdrs, m, XDR_ENCODE);
560         ok = xdr_results(&xdrs, xdr_location);
561         XDR_DESTROY(&xdrs);
562 
563         if (ok) {
564                 return (svc_sendreply_common(rqstp, &rply, m));
565         } else {
566                 m_freem(m);
567                 return (FALSE);
568         }
569 }
570 
571 bool_t
572 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
573 {
574         struct rpc_msg rply; 
575 
576         rply.rm_xid = rqstp->rq_xid;
577         rply.rm_direction = REPLY;  
578         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
579         rply.acpted_rply.ar_verf = rqstp->rq_verf; 
580         rply.acpted_rply.ar_stat = SUCCESS;
581         rply.acpted_rply.ar_results.where = NULL;
582         rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
583 
584         return (svc_sendreply_common(rqstp, &rply, m));
585 }
586 
587 /*
588  * No procedure error reply
589  */
590 void
591 svcerr_noproc(struct svc_req *rqstp)
592 {
593         SVCXPRT *xprt = rqstp->rq_xprt;
594         struct rpc_msg rply;
595 
596         rply.rm_xid = rqstp->rq_xid;
597         rply.rm_direction = REPLY;
598         rply.rm_reply.rp_stat = MSG_ACCEPTED;
599         rply.acpted_rply.ar_verf = rqstp->rq_verf;
600         rply.acpted_rply.ar_stat = PROC_UNAVAIL;
601 
602         if (xprt->xp_pool->sp_rcache)
603                 replay_setreply(xprt->xp_pool->sp_rcache,
604                     &rply, svc_getrpccaller(rqstp), NULL);
605 
606         svc_sendreply_common(rqstp, &rply, NULL);
607 }
608 
609 /*
610  * Can't decode args error reply
611  */
612 void
613 svcerr_decode(struct svc_req *rqstp)
614 {
615         SVCXPRT *xprt = rqstp->rq_xprt;
616         struct rpc_msg rply; 
617 
618         rply.rm_xid = rqstp->rq_xid;
619         rply.rm_direction = REPLY; 
620         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
621         rply.acpted_rply.ar_verf = rqstp->rq_verf;
622         rply.acpted_rply.ar_stat = GARBAGE_ARGS;
623 
624         if (xprt->xp_pool->sp_rcache)
625                 replay_setreply(xprt->xp_pool->sp_rcache,
626                     &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
627 
628         svc_sendreply_common(rqstp, &rply, NULL);
629 }
630 
631 /*
632  * Some system error
633  */
634 void
635 svcerr_systemerr(struct svc_req *rqstp)
636 {
637         SVCXPRT *xprt = rqstp->rq_xprt;
638         struct rpc_msg rply; 
639 
640         rply.rm_xid = rqstp->rq_xid;
641         rply.rm_direction = REPLY; 
642         rply.rm_reply.rp_stat = MSG_ACCEPTED; 
643         rply.acpted_rply.ar_verf = rqstp->rq_verf;
644         rply.acpted_rply.ar_stat = SYSTEM_ERR;
645 
646         if (xprt->xp_pool->sp_rcache)
647                 replay_setreply(xprt->xp_pool->sp_rcache,
648                     &rply, svc_getrpccaller(rqstp), NULL);
649 
650         svc_sendreply_common(rqstp, &rply, NULL);
651 }
652 
653 /*
654  * Authentication error reply
655  */
656 void
657 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
658 {
659         SVCXPRT *xprt = rqstp->rq_xprt;
660         struct rpc_msg rply;
661 
662         rply.rm_xid = rqstp->rq_xid;
663         rply.rm_direction = REPLY;
664         rply.rm_reply.rp_stat = MSG_DENIED;
665         rply.rjcted_rply.rj_stat = AUTH_ERROR;
666         rply.rjcted_rply.rj_why = why;
667 
668         if (xprt->xp_pool->sp_rcache)
669                 replay_setreply(xprt->xp_pool->sp_rcache,
670                     &rply, svc_getrpccaller(rqstp), NULL);
671 
672         svc_sendreply_common(rqstp, &rply, NULL);
673 }
674 
675 /*
676  * Auth too weak error reply
677  */
678 void
679 svcerr_weakauth(struct svc_req *rqstp)
680 {
681 
682         svcerr_auth(rqstp, AUTH_TOOWEAK);
683 }
684 
685 /*
686  * Program unavailable error reply
687  */
688 void 
689 svcerr_noprog(struct svc_req *rqstp)
690 {
691         SVCXPRT *xprt = rqstp->rq_xprt;
692         struct rpc_msg rply;  
693 
694         rply.rm_xid = rqstp->rq_xid;
695         rply.rm_direction = REPLY;   
696         rply.rm_reply.rp_stat = MSG_ACCEPTED;  
697         rply.acpted_rply.ar_verf = rqstp->rq_verf;  
698         rply.acpted_rply.ar_stat = PROG_UNAVAIL;
699 
700         if (xprt->xp_pool->sp_rcache)
701                 replay_setreply(xprt->xp_pool->sp_rcache,
702                     &rply, svc_getrpccaller(rqstp), NULL);
703 
704         svc_sendreply_common(rqstp, &rply, NULL);
705 }
706 
707 /*
708  * Program version mismatch error reply
709  */
710 void  
711 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
712 {
713         SVCXPRT *xprt = rqstp->rq_xprt;
714         struct rpc_msg rply;
715 
716         rply.rm_xid = rqstp->rq_xid;
717         rply.rm_direction = REPLY;
718         rply.rm_reply.rp_stat = MSG_ACCEPTED;
719         rply.acpted_rply.ar_verf = rqstp->rq_verf;
720         rply.acpted_rply.ar_stat = PROG_MISMATCH;
721         rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
722         rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
723 
724         if (xprt->xp_pool->sp_rcache)
725                 replay_setreply(xprt->xp_pool->sp_rcache,
726                     &rply, svc_getrpccaller(rqstp), NULL);
727 
728         svc_sendreply_common(rqstp, &rply, NULL);
729 }
730 
731 /*
732  * Allocate a new server transport structure. All fields are
733  * initialized to zero and xp_p3 is initialized to point at an
734  * extension structure to hold various flags and authentication
735  * parameters.
736  */
737 SVCXPRT *
738 svc_xprt_alloc()
739 {
740         SVCXPRT *xprt;
741         SVCXPRT_EXT *ext;
742 
743         xprt = mem_alloc(sizeof(SVCXPRT));
744         memset(xprt, 0, sizeof(SVCXPRT));
745         ext = mem_alloc(sizeof(SVCXPRT_EXT));
746         memset(ext, 0, sizeof(SVCXPRT_EXT));
747         xprt->xp_p3 = ext;
748         refcount_init(&xprt->xp_refs, 1);
749 
750         return (xprt);
751 }
752 
753 /*
754  * Free a server transport structure.
755  */
756 void
757 svc_xprt_free(xprt)
758         SVCXPRT *xprt;
759 {
760 
761         mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
762         mem_free(xprt, sizeof(SVCXPRT));
763 }
764 
765 /* ******************* SERVER INPUT STUFF ******************* */
766 
767 /*
768  * Read RPC requests from a transport and queue them to be
769  * executed. We handle authentication and replay cache replies here.
770  * Actually dispatching the RPC is deferred till svc_executereq.
771  */
772 static enum xprt_stat
773 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
774 {
775         SVCPOOL *pool = xprt->xp_pool;
776         struct svc_req *r;
777         struct rpc_msg msg;
778         struct mbuf *args;
779         enum xprt_stat stat;
780 
781         /* now receive msgs from xprtprt (support batch calls) */
782         r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
783 
784         msg.rm_call.cb_cred.oa_base = r->rq_credarea;
785         msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
786         r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
787         if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
788                 enum auth_stat why;
789 
790                 /*
791                  * Handle replays and authenticate before queuing the
792                  * request to be executed.
793                  */
794                 SVC_ACQUIRE(xprt);
795                 r->rq_xprt = xprt;
796                 if (pool->sp_rcache) {
797                         struct rpc_msg repmsg;
798                         struct mbuf *repbody;
799                         enum replay_state rs;
800                         rs = replay_find(pool->sp_rcache, &msg,
801                             svc_getrpccaller(r), &repmsg, &repbody);
802                         switch (rs) {
803                         case RS_NEW:
804                                 break;
805                         case RS_DONE:
806                                 SVC_REPLY(xprt, &repmsg, r->rq_addr,
807                                     repbody);
808                                 if (r->rq_addr) {
809                                         free(r->rq_addr, M_SONAME);
810                                         r->rq_addr = NULL;
811                                 }
812                                 goto call_done;
813 
814                         default:
815                                 goto call_done;
816                         }
817                 }
818 
819                 r->rq_xid = msg.rm_xid;
820                 r->rq_prog = msg.rm_call.cb_prog;
821                 r->rq_vers = msg.rm_call.cb_vers;
822                 r->rq_proc = msg.rm_call.cb_proc;
823                 r->rq_size = sizeof(*r) + m_length(args, NULL);
824                 r->rq_args = args;
825                 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
826                         /*
827                          * RPCSEC_GSS uses this return code
828                          * for requests that form part of its
829                          * context establishment protocol and
830                          * should not be dispatched to the
831                          * application.
832                          */
833                         if (why != RPCSEC_GSS_NODISPATCH)
834                                 svcerr_auth(r, why);
835                         goto call_done;
836                 }
837 
838                 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
839                         svcerr_decode(r);
840                         goto call_done;
841                 }
842 
843                 /*
844                  * Everything checks out, return request to caller.
845                  */
846                 *rqstp_ret = r;
847                 r = NULL;
848         }
849 call_done:
850         if (r) {
851                 svc_freereq(r);
852                 r = NULL;
853         }
854         if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
855                 xprt_unregister(xprt);
856         }
857 
858         return (stat);
859 }
860 
861 static void
862 svc_executereq(struct svc_req *rqstp)
863 {
864         SVCXPRT *xprt = rqstp->rq_xprt;
865         SVCPOOL *pool = xprt->xp_pool;
866         int prog_found;
867         rpcvers_t low_vers;
868         rpcvers_t high_vers;
869         struct svc_callout *s;
870 
871         /* now match message with a registered service*/
872         prog_found = FALSE;
873         low_vers = (rpcvers_t) -1L;
874         high_vers = (rpcvers_t) 0L;
875         TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
876                 if (s->sc_prog == rqstp->rq_prog) {
877                         if (s->sc_vers == rqstp->rq_vers) {
878                                 /*
879                                  * We hand ownership of r to the
880                                  * dispatch method - they must call
881                                  * svc_freereq.
882                                  */
883                                 (*s->sc_dispatch)(rqstp, xprt);
884                                 return;
885                         }  /* found correct version */
886                         prog_found = TRUE;
887                         if (s->sc_vers < low_vers)
888                                 low_vers = s->sc_vers;
889                         if (s->sc_vers > high_vers)
890                                 high_vers = s->sc_vers;
891                 }   /* found correct program */
892         }
893 
894         /*
895          * if we got here, the program or version
896          * is not served ...
897          */
898         if (prog_found)
899                 svcerr_progvers(rqstp, low_vers, high_vers);
900         else
901                 svcerr_noprog(rqstp);
902 
903         svc_freereq(rqstp);
904 }
905 
906 static void
907 svc_checkidle(SVCPOOL *pool)
908 {
909         SVCXPRT *xprt, *nxprt;
910         time_t timo;
911         struct svcxprt_list cleanup;
912 
913         TAILQ_INIT(&cleanup);
914         TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
915                 /*
916                  * Only some transports have idle timers. Don't time
917                  * something out which is just waking up.
918                  */
919                 if (!xprt->xp_idletimeout || xprt->xp_thread)
920                         continue;
921 
922                 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
923                 if (time_uptime > timo) {
924                         xprt_unregister_locked(xprt);
925                         TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
926                 }
927         }
928 
929         mtx_unlock(&pool->sp_lock);
930         TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
931                 SVC_RELEASE(xprt);
932         }
933         mtx_lock(&pool->sp_lock);
934 
935 }
936 
937 static void
938 svc_assign_waiting_sockets(SVCPOOL *pool)
939 {
940         SVCXPRT *xprt;
941 
942         TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
943                 if (!xprt->xp_thread) {
944                         xprt_assignthread(xprt);
945                 }
946         }
947 }
948 
949 static bool_t
950 svc_request_space_available(SVCPOOL *pool)
951 {
952 
953         mtx_assert(&pool->sp_lock, MA_OWNED);
954 
955         if (pool->sp_space_throttled) {
956                 /*
957                  * Below the low-water yet? If so, assign any waiting sockets.
958                  */
959                 if (pool->sp_space_used < pool->sp_space_low) {
960                         pool->sp_space_throttled = FALSE;
961                         svc_assign_waiting_sockets(pool);
962                         return TRUE;
963                 }
964                 
965                 return FALSE;
966         } else {
967                 if (pool->sp_space_used
968                     >= pool->sp_space_high) {
969                         pool->sp_space_throttled = TRUE;
970                         pool->sp_space_throttle_count++;
971                         return FALSE;
972                 }
973 
974                 return TRUE;
975         }
976 }
977 
978 static void
979 svc_run_internal(SVCPOOL *pool, bool_t ismaster)
980 {
981         SVCTHREAD *st, *stpref;
982         SVCXPRT *xprt;
983         enum xprt_stat stat;
984         struct svc_req *rqstp;
985         int error;
986 
987         st = mem_alloc(sizeof(*st));
988         st->st_xprt = NULL;
989         STAILQ_INIT(&st->st_reqs);
990         cv_init(&st->st_cond, "rpcsvc");
991 
992         mtx_lock(&pool->sp_lock);
993         LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
994 
995         /*
996          * If we are a new thread which was spawned to cope with
997          * increased load, set the state back to SVCPOOL_ACTIVE.
998          */
999         if (pool->sp_state == SVCPOOL_THREADSTARTING)
1000                 pool->sp_state = SVCPOOL_ACTIVE;
1001 
1002         while (pool->sp_state != SVCPOOL_CLOSING) {
1003                 /*
1004                  * Check for idle transports once per second.
1005                  */
1006                 if (time_uptime > pool->sp_lastidlecheck) {
1007                         pool->sp_lastidlecheck = time_uptime;
1008                         svc_checkidle(pool);
1009                 }
1010 
1011                 xprt = st->st_xprt;
1012                 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
1013                         /*
1014                          * Enforce maxthreads count.
1015                          */
1016                         if (pool->sp_threadcount > pool->sp_maxthreads)
1017                                 break;
1018 
1019                         /*
1020                          * Before sleeping, see if we can find an
1021                          * active transport which isn't being serviced
1022                          * by a thread.
1023                          */
1024                         if (svc_request_space_available(pool)) {
1025                                 TAILQ_FOREACH(xprt, &pool->sp_active,
1026                                     xp_alink) {
1027                                         if (!xprt->xp_thread) {
1028                                                 SVC_ACQUIRE(xprt);
1029                                                 xprt->xp_thread = st;
1030                                                 st->st_xprt = xprt;
1031                                                 break;
1032                                         }
1033                                 }
1034                         }
1035                         if (st->st_xprt)
1036                                 continue;
1037 
1038                         LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
1039                         error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
1040                                 5 * hz);
1041                         LIST_REMOVE(st, st_ilink);
1042 
1043                         /*
1044                          * Reduce worker thread count when idle.
1045                          */
1046                         if (error == EWOULDBLOCK) {
1047                                 if (!ismaster
1048                                     && (pool->sp_threadcount
1049                                         > pool->sp_minthreads)
1050                                         && !st->st_xprt
1051                                         && STAILQ_EMPTY(&st->st_reqs))
1052                                         break;
1053                         }
1054                         if (error == EWOULDBLOCK)
1055                                 continue;
1056                         if (error) {
1057                                 if (pool->sp_state != SVCPOOL_CLOSING) {
1058                                         mtx_unlock(&pool->sp_lock);
1059                                         svc_exit(pool);
1060                                         mtx_lock(&pool->sp_lock);
1061                                 }
1062                                 break;
1063                         }
1064 
1065                         if (pool->sp_state == SVCPOOL_THREADWANTED) {
1066                                 pool->sp_state = SVCPOOL_THREADSTARTING;
1067                                 pool->sp_lastcreatetime = time_uptime;
1068                                 mtx_unlock(&pool->sp_lock);
1069                                 svc_new_thread(pool);
1070                                 mtx_lock(&pool->sp_lock);
1071                         }
1072                         continue;
1073                 }
1074 
1075                 if (xprt) {
1076                         /*
1077                          * Drain the transport socket and queue up any
1078                          * RPCs.
1079                          */
1080                         xprt->xp_lastactive = time_uptime;
1081                         stat = XPRT_IDLE;
1082                         do {
1083                                 if (!svc_request_space_available(pool))
1084                                         break;
1085                                 rqstp = NULL;
1086                                 mtx_unlock(&pool->sp_lock);
1087                                 stat = svc_getreq(xprt, &rqstp);
1088                                 mtx_lock(&pool->sp_lock);
1089                                 if (rqstp) {
1090                                         /*
1091                                          * See if the application has
1092                                          * a preference for some other
1093