FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc.c
1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
2
3 /*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California 94043
30 */
31
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
35 #endif
36 #include <sys/cdefs.h>
37 __FBSDID("$FreeBSD: src/sys/rpc/svc.c,v 1.3 2008/11/03 10:38:00 dfr Exp $");
38
39 /*
40 * svc.c, Server-side remote procedure call interface.
41 *
42 * There are two sets of procedures here. The xprt routines are
43 * for handling transport handles. The svc routines handle the
44 * list of service routines.
45 *
46 * Copyright (C) 1984, Sun Microsystems, Inc.
47 */
48
49 #include <sys/param.h>
50 #include <sys/lock.h>
51 #include <sys/kernel.h>
52 #include <sys/kthread.h>
53 #include <sys/malloc.h>
54 #include <sys/mbuf.h>
55 #include <sys/mutex.h>
56 #include <sys/proc.h>
57 #include <sys/queue.h>
58 #include <sys/socketvar.h>
59 #include <sys/systm.h>
60 #include <sys/ucred.h>
61
62 #include <rpc/rpc.h>
63 #include <rpc/rpcb_clnt.h>
64 #include <rpc/replay.h>
65
66 #include <rpc/rpc_com.h>
67
68 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
69 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
70
71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
72 char *);
73 static void svc_new_thread(SVCPOOL *pool);
74 static void xprt_unregister_locked(SVCXPRT *xprt);
75
76 /* *************** SVCXPRT related stuff **************** */
77
78 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
79 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
80
81 SVCPOOL*
82 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
83 {
84 SVCPOOL *pool;
85
86 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
87
88 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
89 pool->sp_name = name;
90 pool->sp_state = SVCPOOL_INIT;
91 pool->sp_proc = NULL;
92 TAILQ_INIT(&pool->sp_xlist);
93 TAILQ_INIT(&pool->sp_active);
94 TAILQ_INIT(&pool->sp_callouts);
95 LIST_INIT(&pool->sp_threads);
96 LIST_INIT(&pool->sp_idlethreads);
97 pool->sp_minthreads = 1;
98 pool->sp_maxthreads = 1;
99 pool->sp_threadcount = 0;
100
101 /*
102 * Don't use more than a quarter of mbuf clusters or more than
103 * 45Mb buffering requests.
104 */
105 pool->sp_space_high = nmbclusters * MCLBYTES / 4;
106 if (pool->sp_space_high > 45 << 20)
107 pool->sp_space_high = 45 << 20;
108 pool->sp_space_low = 2 * pool->sp_space_high / 3;
109
110 sysctl_ctx_init(&pool->sp_sysctl);
111 if (sysctl_base) {
112 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
113 "minthreads", CTLTYPE_INT | CTLFLAG_RW,
114 pool, 0, svcpool_minthread_sysctl, "I", "");
115 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
116 "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
117 pool, 0, svcpool_maxthread_sysctl, "I", "");
118 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
119 "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "");
120
121 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
122 "request_space_used", CTLFLAG_RD,
123 &pool->sp_space_used, 0,
124 "Space in parsed but not handled requests.");
125
126 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
127 "request_space_used_highest", CTLFLAG_RD,
128 &pool->sp_space_used_highest, 0,
129 "Highest space used since reboot.");
130
131 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132 "request_space_high", CTLFLAG_RW,
133 &pool->sp_space_high, 0,
134 "Maximum space in parsed but not handled requests.");
135
136 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 "request_space_low", CTLFLAG_RW,
138 &pool->sp_space_low, 0,
139 "Low water mark for request space.");
140
141 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142 "request_space_throttled", CTLFLAG_RD,
143 &pool->sp_space_throttled, 0,
144 "Whether nfs requests are currently throttled");
145
146 SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147 "request_space_throttle_count", CTLFLAG_RD,
148 &pool->sp_space_throttle_count, 0,
149 "Count of times throttling based on request space has occurred");
150 }
151
152 return pool;
153 }
154
155 void
156 svcpool_destroy(SVCPOOL *pool)
157 {
158 SVCXPRT *xprt, *nxprt;
159 struct svc_callout *s;
160 struct svcxprt_list cleanup;
161
162 TAILQ_INIT(&cleanup);
163 mtx_lock(&pool->sp_lock);
164
165 while (TAILQ_FIRST(&pool->sp_xlist)) {
166 xprt = TAILQ_FIRST(&pool->sp_xlist);
167 xprt_unregister_locked(xprt);
168 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
169 }
170
171 while (TAILQ_FIRST(&pool->sp_callouts)) {
172 s = TAILQ_FIRST(&pool->sp_callouts);
173 mtx_unlock(&pool->sp_lock);
174 svc_unreg(pool, s->sc_prog, s->sc_vers);
175 mtx_lock(&pool->sp_lock);
176 }
177
178 mtx_destroy(&pool->sp_lock);
179
180 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
181 SVC_RELEASE(xprt);
182 }
183
184 if (pool->sp_rcache)
185 replay_freecache(pool->sp_rcache);
186
187 sysctl_ctx_free(&pool->sp_sysctl);
188 free(pool, M_RPC);
189 }
190
191 static bool_t
192 svcpool_active(SVCPOOL *pool)
193 {
194 enum svcpool_state state = pool->sp_state;
195
196 if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
197 return (FALSE);
198 return (TRUE);
199 }
200
201 /*
202 * Sysctl handler to set the minimum thread count on a pool
203 */
204 static int
205 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
206 {
207 SVCPOOL *pool;
208 int newminthreads, error, n;
209
210 pool = oidp->oid_arg1;
211 newminthreads = pool->sp_minthreads;
212 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
213 if (error == 0 && newminthreads != pool->sp_minthreads) {
214 if (newminthreads > pool->sp_maxthreads)
215 return (EINVAL);
216 mtx_lock(&pool->sp_lock);
217 if (newminthreads > pool->sp_minthreads
218 && svcpool_active(pool)) {
219 /*
220 * If the pool is running and we are
221 * increasing, create some more threads now.
222 */
223 n = newminthreads - pool->sp_threadcount;
224 if (n > 0) {
225 mtx_unlock(&pool->sp_lock);
226 while (n--)
227 svc_new_thread(pool);
228 mtx_lock(&pool->sp_lock);
229 }
230 }
231 pool->sp_minthreads = newminthreads;
232 mtx_unlock(&pool->sp_lock);
233 }
234 return (error);
235 }
236
237 /*
238 * Sysctl handler to set the maximum thread count on a pool
239 */
240 static int
241 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
242 {
243 SVCPOOL *pool;
244 SVCTHREAD *st;
245 int newmaxthreads, error;
246
247 pool = oidp->oid_arg1;
248 newmaxthreads = pool->sp_maxthreads;
249 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
250 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
251 if (newmaxthreads < pool->sp_minthreads)
252 return (EINVAL);
253 mtx_lock(&pool->sp_lock);
254 if (newmaxthreads < pool->sp_maxthreads
255 && svcpool_active(pool)) {
256 /*
257 * If the pool is running and we are
258 * decreasing, wake up some idle threads to
259 * encourage them to exit.
260 */
261 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
262 cv_signal(&st->st_cond);
263 }
264 pool->sp_maxthreads = newmaxthreads;
265 mtx_unlock(&pool->sp_lock);
266 }
267 return (error);
268 }
269
270 /*
271 * Activate a transport handle.
272 */
273 void
274 xprt_register(SVCXPRT *xprt)
275 {
276 SVCPOOL *pool = xprt->xp_pool;
277
278 mtx_lock(&pool->sp_lock);
279 xprt->xp_registered = TRUE;
280 xprt->xp_active = FALSE;
281 TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
282 mtx_unlock(&pool->sp_lock);
283 }
284
285 /*
286 * De-activate a transport handle. Note: the locked version doesn't
287 * release the transport - caller must do that after dropping the pool
288 * lock.
289 */
290 static void
291 xprt_unregister_locked(SVCXPRT *xprt)
292 {
293 SVCPOOL *pool = xprt->xp_pool;
294
295 if (xprt->xp_active) {
296 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
297 xprt->xp_active = FALSE;
298 }
299 TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
300 xprt->xp_registered = FALSE;
301 }
302
303 void
304 xprt_unregister(SVCXPRT *xprt)
305 {
306 SVCPOOL *pool = xprt->xp_pool;
307
308 mtx_lock(&pool->sp_lock);
309 xprt_unregister_locked(xprt);
310 mtx_unlock(&pool->sp_lock);
311
312 SVC_RELEASE(xprt);
313 }
314
315 static void
316 xprt_assignthread(SVCXPRT *xprt)
317 {
318 SVCPOOL *pool = xprt->xp_pool;
319 SVCTHREAD *st;
320
321 /*
322 * Attempt to assign a service thread to this
323 * transport.
324 */
325 LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
326 if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
327 break;
328 }
329 if (st) {
330 SVC_ACQUIRE(xprt);
331 xprt->xp_thread = st;
332 st->st_xprt = xprt;
333 cv_signal(&st->st_cond);
334 } else {
335 /*
336 * See if we can create a new thread. The
337 * actual thread creation happens in
338 * svc_run_internal because our locking state
339 * is poorly defined (we are typically called
340 * from a socket upcall). Don't create more
341 * than one thread per second.
342 */
343 if (pool->sp_state == SVCPOOL_ACTIVE
344 && pool->sp_lastcreatetime < time_uptime
345 && pool->sp_threadcount < pool->sp_maxthreads) {
346 pool->sp_state = SVCPOOL_THREADWANTED;
347 }
348 }
349 }
350
351 void
352 xprt_active(SVCXPRT *xprt)
353 {
354 SVCPOOL *pool = xprt->xp_pool;
355
356 if (!xprt->xp_registered) {
357 /*
358 * Race with xprt_unregister - we lose.
359 */
360 return;
361 }
362
363 mtx_lock(&pool->sp_lock);
364
365 if (!xprt->xp_active) {
366 TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
367 xprt->xp_active = TRUE;
368 xprt_assignthread(xprt);
369 }
370
371 mtx_unlock(&pool->sp_lock);
372 }
373
374 void
375 xprt_inactive_locked(SVCXPRT *xprt)
376 {
377 SVCPOOL *pool = xprt->xp_pool;
378
379 if (xprt->xp_active) {
380 TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
381 xprt->xp_active = FALSE;
382 }
383 }
384
385 void
386 xprt_inactive(SVCXPRT *xprt)
387 {
388 SVCPOOL *pool = xprt->xp_pool;
389
390 mtx_lock(&pool->sp_lock);
391 xprt_inactive_locked(xprt);
392 mtx_unlock(&pool->sp_lock);
393 }
394
395 /*
396 * Add a service program to the callout list.
397 * The dispatch routine will be called when a rpc request for this
398 * program number comes in.
399 */
400 bool_t
401 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
402 void (*dispatch)(struct svc_req *, SVCXPRT *),
403 const struct netconfig *nconf)
404 {
405 SVCPOOL *pool = xprt->xp_pool;
406 struct svc_callout *s;
407 char *netid = NULL;
408 int flag = 0;
409
410 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
411
412 if (xprt->xp_netid) {
413 netid = strdup(xprt->xp_netid, M_RPC);
414 flag = 1;
415 } else if (nconf && nconf->nc_netid) {
416 netid = strdup(nconf->nc_netid, M_RPC);
417 flag = 1;
418 } /* must have been created with svc_raw_create */
419 if ((netid == NULL) && (flag == 1)) {
420 return (FALSE);
421 }
422
423 mtx_lock(&pool->sp_lock);
424 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
425 if (netid)
426 free(netid, M_RPC);
427 if (s->sc_dispatch == dispatch)
428 goto rpcb_it; /* he is registering another xptr */
429 mtx_unlock(&pool->sp_lock);
430 return (FALSE);
431 }
432 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
433 if (s == NULL) {
434 if (netid)
435 free(netid, M_RPC);
436 mtx_unlock(&pool->sp_lock);
437 return (FALSE);
438 }
439
440 s->sc_prog = prog;
441 s->sc_vers = vers;
442 s->sc_dispatch = dispatch;
443 s->sc_netid = netid;
444 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
445
446 if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
447 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
448
449 rpcb_it:
450 mtx_unlock(&pool->sp_lock);
451 /* now register the information with the local binder service */
452 if (nconf) {
453 bool_t dummy;
454 struct netconfig tnc;
455 struct netbuf nb;
456 tnc = *nconf;
457 nb.buf = &xprt->xp_ltaddr;
458 nb.len = xprt->xp_ltaddr.ss_len;
459 dummy = rpcb_set(prog, vers, &tnc, &nb);
460 return (dummy);
461 }
462 return (TRUE);
463 }
464
465 /*
466 * Remove a service program from the callout list.
467 */
468 void
469 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
470 {
471 struct svc_callout *s;
472
473 /* unregister the information anyway */
474 (void) rpcb_unset(prog, vers, NULL);
475 mtx_lock(&pool->sp_lock);
476 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
477 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
478 if (s->sc_netid)
479 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
480 mem_free(s, sizeof (struct svc_callout));
481 }
482 mtx_unlock(&pool->sp_lock);
483 }
484
485 /* ********************** CALLOUT list related stuff ************* */
486
487 /*
488 * Search the callout list for a program number, return the callout
489 * struct.
490 */
491 static struct svc_callout *
492 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
493 {
494 struct svc_callout *s;
495
496 mtx_assert(&pool->sp_lock, MA_OWNED);
497 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
498 if (s->sc_prog == prog && s->sc_vers == vers
499 && (netid == NULL || s->sc_netid == NULL ||
500 strcmp(netid, s->sc_netid) == 0))
501 break;
502 }
503
504 return (s);
505 }
506
507 /* ******************* REPLY GENERATION ROUTINES ************ */
508
509 static bool_t
510 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
511 struct mbuf *body)
512 {
513 SVCXPRT *xprt = rqstp->rq_xprt;
514 bool_t ok;
515
516 if (rqstp->rq_args) {
517 m_freem(rqstp->rq_args);
518 rqstp->rq_args = NULL;
519 }
520
521 if (xprt->xp_pool->sp_rcache)
522 replay_setreply(xprt->xp_pool->sp_rcache,
523 rply, svc_getrpccaller(rqstp), body);
524
525 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
526 return (FALSE);
527
528 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
529 if (rqstp->rq_addr) {
530 free(rqstp->rq_addr, M_SONAME);
531 rqstp->rq_addr = NULL;
532 }
533
534 return (ok);
535 }
536
537 /*
538 * Send a reply to an rpc request
539 */
540 bool_t
541 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
542 {
543 struct rpc_msg rply;
544 struct mbuf *m;
545 XDR xdrs;
546 bool_t ok;
547
548 rply.rm_xid = rqstp->rq_xid;
549 rply.rm_direction = REPLY;
550 rply.rm_reply.rp_stat = MSG_ACCEPTED;
551 rply.acpted_rply.ar_verf = rqstp->rq_verf;
552 rply.acpted_rply.ar_stat = SUCCESS;
553 rply.acpted_rply.ar_results.where = NULL;
554 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
555
556 MGET(m, M_WAIT, MT_DATA);
557 MCLGET(m, M_WAIT);
558 m->m_len = 0;
559 xdrmbuf_create(&xdrs, m, XDR_ENCODE);
560 ok = xdr_results(&xdrs, xdr_location);
561 XDR_DESTROY(&xdrs);
562
563 if (ok) {
564 return (svc_sendreply_common(rqstp, &rply, m));
565 } else {
566 m_freem(m);
567 return (FALSE);
568 }
569 }
570
571 bool_t
572 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
573 {
574 struct rpc_msg rply;
575
576 rply.rm_xid = rqstp->rq_xid;
577 rply.rm_direction = REPLY;
578 rply.rm_reply.rp_stat = MSG_ACCEPTED;
579 rply.acpted_rply.ar_verf = rqstp->rq_verf;
580 rply.acpted_rply.ar_stat = SUCCESS;
581 rply.acpted_rply.ar_results.where = NULL;
582 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
583
584 return (svc_sendreply_common(rqstp, &rply, m));
585 }
586
587 /*
588 * No procedure error reply
589 */
590 void
591 svcerr_noproc(struct svc_req *rqstp)
592 {
593 SVCXPRT *xprt = rqstp->rq_xprt;
594 struct rpc_msg rply;
595
596 rply.rm_xid = rqstp->rq_xid;
597 rply.rm_direction = REPLY;
598 rply.rm_reply.rp_stat = MSG_ACCEPTED;
599 rply.acpted_rply.ar_verf = rqstp->rq_verf;
600 rply.acpted_rply.ar_stat = PROC_UNAVAIL;
601
602 if (xprt->xp_pool->sp_rcache)
603 replay_setreply(xprt->xp_pool->sp_rcache,
604 &rply, svc_getrpccaller(rqstp), NULL);
605
606 svc_sendreply_common(rqstp, &rply, NULL);
607 }
608
609 /*
610 * Can't decode args error reply
611 */
612 void
613 svcerr_decode(struct svc_req *rqstp)
614 {
615 SVCXPRT *xprt = rqstp->rq_xprt;
616 struct rpc_msg rply;
617
618 rply.rm_xid = rqstp->rq_xid;
619 rply.rm_direction = REPLY;
620 rply.rm_reply.rp_stat = MSG_ACCEPTED;
621 rply.acpted_rply.ar_verf = rqstp->rq_verf;
622 rply.acpted_rply.ar_stat = GARBAGE_ARGS;
623
624 if (xprt->xp_pool->sp_rcache)
625 replay_setreply(xprt->xp_pool->sp_rcache,
626 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
627
628 svc_sendreply_common(rqstp, &rply, NULL);
629 }
630
631 /*
632 * Some system error
633 */
634 void
635 svcerr_systemerr(struct svc_req *rqstp)
636 {
637 SVCXPRT *xprt = rqstp->rq_xprt;
638 struct rpc_msg rply;
639
640 rply.rm_xid = rqstp->rq_xid;
641 rply.rm_direction = REPLY;
642 rply.rm_reply.rp_stat = MSG_ACCEPTED;
643 rply.acpted_rply.ar_verf = rqstp->rq_verf;
644 rply.acpted_rply.ar_stat = SYSTEM_ERR;
645
646 if (xprt->xp_pool->sp_rcache)
647 replay_setreply(xprt->xp_pool->sp_rcache,
648 &rply, svc_getrpccaller(rqstp), NULL);
649
650 svc_sendreply_common(rqstp, &rply, NULL);
651 }
652
653 /*
654 * Authentication error reply
655 */
656 void
657 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
658 {
659 SVCXPRT *xprt = rqstp->rq_xprt;
660 struct rpc_msg rply;
661
662 rply.rm_xid = rqstp->rq_xid;
663 rply.rm_direction = REPLY;
664 rply.rm_reply.rp_stat = MSG_DENIED;
665 rply.rjcted_rply.rj_stat = AUTH_ERROR;
666 rply.rjcted_rply.rj_why = why;
667
668 if (xprt->xp_pool->sp_rcache)
669 replay_setreply(xprt->xp_pool->sp_rcache,
670 &rply, svc_getrpccaller(rqstp), NULL);
671
672 svc_sendreply_common(rqstp, &rply, NULL);
673 }
674
675 /*
676 * Auth too weak error reply
677 */
678 void
679 svcerr_weakauth(struct svc_req *rqstp)
680 {
681
682 svcerr_auth(rqstp, AUTH_TOOWEAK);
683 }
684
685 /*
686 * Program unavailable error reply
687 */
688 void
689 svcerr_noprog(struct svc_req *rqstp)
690 {
691 SVCXPRT *xprt = rqstp->rq_xprt;
692 struct rpc_msg rply;
693
694 rply.rm_xid = rqstp->rq_xid;
695 rply.rm_direction = REPLY;
696 rply.rm_reply.rp_stat = MSG_ACCEPTED;
697 rply.acpted_rply.ar_verf = rqstp->rq_verf;
698 rply.acpted_rply.ar_stat = PROG_UNAVAIL;
699
700 if (xprt->xp_pool->sp_rcache)
701 replay_setreply(xprt->xp_pool->sp_rcache,
702 &rply, svc_getrpccaller(rqstp), NULL);
703
704 svc_sendreply_common(rqstp, &rply, NULL);
705 }
706
707 /*
708 * Program version mismatch error reply
709 */
710 void
711 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
712 {
713 SVCXPRT *xprt = rqstp->rq_xprt;
714 struct rpc_msg rply;
715
716 rply.rm_xid = rqstp->rq_xid;
717 rply.rm_direction = REPLY;
718 rply.rm_reply.rp_stat = MSG_ACCEPTED;
719 rply.acpted_rply.ar_verf = rqstp->rq_verf;
720 rply.acpted_rply.ar_stat = PROG_MISMATCH;
721 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
722 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
723
724 if (xprt->xp_pool->sp_rcache)
725 replay_setreply(xprt->xp_pool->sp_rcache,
726 &rply, svc_getrpccaller(rqstp), NULL);
727
728 svc_sendreply_common(rqstp, &rply, NULL);
729 }
730
731 /*
732 * Allocate a new server transport structure. All fields are
733 * initialized to zero and xp_p3 is initialized to point at an
734 * extension structure to hold various flags and authentication
735 * parameters.
736 */
737 SVCXPRT *
738 svc_xprt_alloc()
739 {
740 SVCXPRT *xprt;
741 SVCXPRT_EXT *ext;
742
743 xprt = mem_alloc(sizeof(SVCXPRT));
744 memset(xprt, 0, sizeof(SVCXPRT));
745 ext = mem_alloc(sizeof(SVCXPRT_EXT));
746 memset(ext, 0, sizeof(SVCXPRT_EXT));
747 xprt->xp_p3 = ext;
748 refcount_init(&xprt->xp_refs, 1);
749
750 return (xprt);
751 }
752
753 /*
754 * Free a server transport structure.
755 */
756 void
757 svc_xprt_free(xprt)
758 SVCXPRT *xprt;
759 {
760
761 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
762 mem_free(xprt, sizeof(SVCXPRT));
763 }
764
765 /* ******************* SERVER INPUT STUFF ******************* */
766
767 /*
768 * Read RPC requests from a transport and queue them to be
769 * executed. We handle authentication and replay cache replies here.
770 * Actually dispatching the RPC is deferred till svc_executereq.
771 */
772 static enum xprt_stat
773 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
774 {
775 SVCPOOL *pool = xprt->xp_pool;
776 struct svc_req *r;
777 struct rpc_msg msg;
778 struct mbuf *args;
779 enum xprt_stat stat;
780
781 /* now receive msgs from xprtprt (support batch calls) */
782 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
783
784 msg.rm_call.cb_cred.oa_base = r->rq_credarea;
785 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
786 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
787 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
788 enum auth_stat why;
789
790 /*
791 * Handle replays and authenticate before queuing the
792 * request to be executed.
793 */
794 SVC_ACQUIRE(xprt);
795 r->rq_xprt = xprt;
796 if (pool->sp_rcache) {
797 struct rpc_msg repmsg;
798 struct mbuf *repbody;
799 enum replay_state rs;
800 rs = replay_find(pool->sp_rcache, &msg,
801 svc_getrpccaller(r), &repmsg, &repbody);
802 switch (rs) {
803 case RS_NEW:
804 break;
805 case RS_DONE:
806 SVC_REPLY(xprt, &repmsg, r->rq_addr,
807 repbody);
808 if (r->rq_addr) {
809 free(r->rq_addr, M_SONAME);
810 r->rq_addr = NULL;
811 }
812 goto call_done;
813
814 default:
815 goto call_done;
816 }
817 }
818
819 r->rq_xid = msg.rm_xid;
820 r->rq_prog = msg.rm_call.cb_prog;
821 r->rq_vers = msg.rm_call.cb_vers;
822 r->rq_proc = msg.rm_call.cb_proc;
823 r->rq_size = sizeof(*r) + m_length(args, NULL);
824 r->rq_args = args;
825 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
826 /*
827 * RPCSEC_GSS uses this return code
828 * for requests that form part of its
829 * context establishment protocol and
830 * should not be dispatched to the
831 * application.
832 */
833 if (why != RPCSEC_GSS_NODISPATCH)
834 svcerr_auth(r, why);
835 goto call_done;
836 }
837
838 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
839 svcerr_decode(r);
840 goto call_done;
841 }
842
843 /*
844 * Everything checks out, return request to caller.
845 */
846 *rqstp_ret = r;
847 r = NULL;
848 }
849 call_done:
850 if (r) {
851 svc_freereq(r);
852 r = NULL;
853 }
854 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
855 xprt_unregister(xprt);
856 }
857
858 return (stat);
859 }
860
861 static void
862 svc_executereq(struct svc_req *rqstp)
863 {
864 SVCXPRT *xprt = rqstp->rq_xprt;
865 SVCPOOL *pool = xprt->xp_pool;
866 int prog_found;
867 rpcvers_t low_vers;
868 rpcvers_t high_vers;
869 struct svc_callout *s;
870
871 /* now match message with a registered service*/
872 prog_found = FALSE;
873 low_vers = (rpcvers_t) -1L;
874 high_vers = (rpcvers_t) 0L;
875 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
876 if (s->sc_prog == rqstp->rq_prog) {
877 if (s->sc_vers == rqstp->rq_vers) {
878 /*
879 * We hand ownership of r to the
880 * dispatch method - they must call
881 * svc_freereq.
882 */
883 (*s->sc_dispatch)(rqstp, xprt);
884 return;
885 } /* found correct version */
886 prog_found = TRUE;
887 if (s->sc_vers < low_vers)
888 low_vers = s->sc_vers;
889 if (s->sc_vers > high_vers)
890 high_vers = s->sc_vers;
891 } /* found correct program */
892 }
893
894 /*
895 * if we got here, the program or version
896 * is not served ...
897 */
898 if (prog_found)
899 svcerr_progvers(rqstp, low_vers, high_vers);
900 else
901 svcerr_noprog(rqstp);
902
903 svc_freereq(rqstp);
904 }
905
906 static void
907 svc_checkidle(SVCPOOL *pool)
908 {
909 SVCXPRT *xprt, *nxprt;
910 time_t timo;
911 struct svcxprt_list cleanup;
912
913 TAILQ_INIT(&cleanup);
914 TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
915 /*
916 * Only some transports have idle timers. Don't time
917 * something out which is just waking up.
918 */
919 if (!xprt->xp_idletimeout || xprt->xp_thread)
920 continue;
921
922 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
923 if (time_uptime > timo) {
924 xprt_unregister_locked(xprt);
925 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
926 }
927 }
928
929 mtx_unlock(&pool->sp_lock);
930 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
931 SVC_RELEASE(xprt);
932 }
933 mtx_lock(&pool->sp_lock);
934
935 }
936
937 static void
938 svc_assign_waiting_sockets(SVCPOOL *pool)
939 {
940 SVCXPRT *xprt;
941
942 TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
943 if (!xprt->xp_thread) {
944 xprt_assignthread(xprt);
945 }
946 }
947 }
948
949 static bool_t
950 svc_request_space_available(SVCPOOL *pool)
951 {
952
953 mtx_assert(&pool->sp_lock, MA_OWNED);
954
955 if (pool->sp_space_throttled) {
956 /*
957 * Below the low-water yet? If so, assign any waiting sockets.
958 */
959 if (pool->sp_space_used < pool->sp_space_low) {
960 pool->sp_space_throttled = FALSE;
961 svc_assign_waiting_sockets(pool);
962 return TRUE;
963 }
964
965 return FALSE;
966 } else {
967 if (pool->sp_space_used
968 >= pool->sp_space_high) {
969 pool->sp_space_throttled = TRUE;
970 pool->sp_space_throttle_count++;
971 return FALSE;
972 }
973
974 return TRUE;
975 }
976 }
977
978 static void
979 svc_run_internal(SVCPOOL *pool, bool_t ismaster)
980 {
981 SVCTHREAD *st, *stpref;
982 SVCXPRT *xprt;
983 enum xprt_stat stat;
984 struct svc_req *rqstp;
985 int error;
986
987 st = mem_alloc(sizeof(*st));
988 st->st_xprt = NULL;
989 STAILQ_INIT(&st->st_reqs);
990 cv_init(&st->st_cond, "rpcsvc");
991
992 mtx_lock(&pool->sp_lock);
993 LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
994
995 /*
996 * If we are a new thread which was spawned to cope with
997 * increased load, set the state back to SVCPOOL_ACTIVE.
998 */
999 if (pool->sp_state == SVCPOOL_THREADSTARTING)
1000 pool->sp_state = SVCPOOL_ACTIVE;
1001
1002 while (pool->sp_state != SVCPOOL_CLOSING) {
1003 /*
1004 * Check for idle transports once per second.
1005 */
1006 if (time_uptime > pool->sp_lastidlecheck) {
1007 pool->sp_lastidlecheck = time_uptime;
1008 svc_checkidle(pool);
1009 }
1010
1011 xprt = st->st_xprt;
1012 if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
1013 /*
1014 * Enforce maxthreads count.
1015 */
1016 if (pool->sp_threadcount > pool->sp_maxthreads)
1017 break;
1018
1019 /*
1020 * Before sleeping, see if we can find an
1021 * active transport which isn't being serviced
1022 * by a thread.
1023 */
1024 if (svc_request_space_available(pool)) {
1025 TAILQ_FOREACH(xprt, &pool->sp_active,
1026 xp_alink) {
1027 if (!xprt->xp_thread) {
1028 SVC_ACQUIRE(xprt);
1029 xprt->xp_thread = st;
1030 st->st_xprt = xprt;
1031 break;
1032 }
1033 }
1034 }
1035 if (st->st_xprt)
1036 continue;
1037
1038 LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
1039 error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
1040 5 * hz);
1041 LIST_REMOVE(st, st_ilink);
1042
1043 /*
1044 * Reduce worker thread count when idle.
1045 */
1046 if (error == EWOULDBLOCK) {
1047 if (!ismaster
1048 && (pool->sp_threadcount
1049 > pool->sp_minthreads)
1050 && !st->st_xprt
1051 && STAILQ_EMPTY(&st->st_reqs))
1052 break;
1053 }
1054 if (error == EWOULDBLOCK)
1055 continue;
1056 if (error) {
1057 if (pool->sp_state != SVCPOOL_CLOSING) {
1058 mtx_unlock(&pool->sp_lock);
1059 svc_exit(pool);
1060 mtx_lock(&pool->sp_lock);
1061 }
1062 break;
1063 }
1064
1065 if (pool->sp_state == SVCPOOL_THREADWANTED) {
1066 pool->sp_state = SVCPOOL_THREADSTARTING;
1067 pool->sp_lastcreatetime = time_uptime;
1068 mtx_unlock(&pool->sp_lock);
1069 svc_new_thread(pool);
1070 mtx_lock(&pool->sp_lock);
1071 }
1072 continue;
1073 }
1074
1075 if (xprt) {
1076 /*
1077 * Drain the transport socket and queue up any
1078 * RPCs.
1079 */
1080 xprt->xp_lastactive = time_uptime;
1081 stat = XPRT_IDLE;
1082 do {
1083 if (!svc_request_space_available(pool))
1084 break;
1085 rqstp = NULL;
1086 mtx_unlock(&pool->sp_lock);
1087 stat = svc_getreq(xprt, &rqstp);
1088 mtx_lock(&pool->sp_lock);
1089 if (rqstp) {
1090 /*
1091 * See if the application has
1092 * a preference for some other
1093 |