FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc.c
1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
2
3 /*-
4 * Copyright (c) 2009, Sun Microsystems, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * - Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * - Neither the name of Sun Microsystems, Inc. nor the names of its
15 * contributors may be used to endorse or promote products derived
16 * from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD$");
37
38 /*
39 * svc.c, Server-side remote procedure call interface.
40 *
41 * There are two sets of procedures here. The xprt routines are
42 * for handling transport handles. The svc routines handle the
43 * list of service routines.
44 *
45 * Copyright (C) 1984, Sun Microsystems, Inc.
46 */
47
48 #include <sys/param.h>
49 #include <sys/lock.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
53 #include <sys/mbuf.h>
54 #include <sys/mutex.h>
55 #include <sys/proc.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
59 #include <sys/smp.h>
60 #include <sys/sx.h>
61 #include <sys/ucred.h>
62
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66
67 #include <rpc/rpc_com.h>
68
69 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73 char *);
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 static void svcpool_cleanup(SVCPOOL *pool);
79
80 /* *************** SVCXPRT related stuff **************** */
81
82 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
84 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
85
86 SVCPOOL*
87 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
88 {
89 SVCPOOL *pool;
90 SVCGROUP *grp;
91 int g;
92
93 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
94
95 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
96 pool->sp_name = name;
97 pool->sp_state = SVCPOOL_INIT;
98 pool->sp_proc = NULL;
99 TAILQ_INIT(&pool->sp_callouts);
100 TAILQ_INIT(&pool->sp_lcallouts);
101 pool->sp_minthreads = 1;
102 pool->sp_maxthreads = 1;
103 pool->sp_groupcount = 1;
104 for (g = 0; g < SVC_MAXGROUPS; g++) {
105 grp = &pool->sp_groups[g];
106 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
107 grp->sg_pool = pool;
108 grp->sg_state = SVCPOOL_ACTIVE;
109 TAILQ_INIT(&grp->sg_xlist);
110 TAILQ_INIT(&grp->sg_active);
111 LIST_INIT(&grp->sg_idlethreads);
112 grp->sg_minthreads = 1;
113 grp->sg_maxthreads = 1;
114 }
115
116 /*
117 * Don't use more than a quarter of mbuf clusters. Nota bene:
118 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
119 * on LP64 architectures, so cast to u_long to avoid undefined
120 * behavior. (ILP32 architectures cannot have nmbclusters
121 * large enough to overflow for other reasons.)
122 */
123 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
124 pool->sp_space_low = (pool->sp_space_high / 3) * 2;
125
126 sysctl_ctx_init(&pool->sp_sysctl);
127 if (sysctl_base) {
128 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
129 "minthreads", CTLTYPE_INT | CTLFLAG_RW,
130 pool, 0, svcpool_minthread_sysctl, "I",
131 "Minimal number of threads");
132 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133 "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
134 pool, 0, svcpool_maxthread_sysctl, "I",
135 "Maximal number of threads");
136 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 "threads", CTLTYPE_INT | CTLFLAG_RD,
138 pool, 0, svcpool_threads_sysctl, "I",
139 "Current number of threads");
140 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
142 "Number of thread groups");
143
144 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145 "request_space_used", CTLFLAG_RD,
146 &pool->sp_space_used,
147 "Space in parsed but not handled requests.");
148
149 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150 "request_space_used_highest", CTLFLAG_RD,
151 &pool->sp_space_used_highest,
152 "Highest space used since reboot.");
153
154 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
155 "request_space_high", CTLFLAG_RW,
156 &pool->sp_space_high,
157 "Maximum space in parsed but not handled requests.");
158
159 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
160 "request_space_low", CTLFLAG_RW,
161 &pool->sp_space_low,
162 "Low water mark for request space.");
163
164 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
165 "request_space_throttled", CTLFLAG_RD,
166 &pool->sp_space_throttled, 0,
167 "Whether nfs requests are currently throttled");
168
169 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
170 "request_space_throttle_count", CTLFLAG_RD,
171 &pool->sp_space_throttle_count, 0,
172 "Count of times throttling based on request space has occurred");
173 }
174
175 return pool;
176 }
177
178 /*
179 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
180 * the pool data structures.
181 */
182 static void
183 svcpool_cleanup(SVCPOOL *pool)
184 {
185 SVCGROUP *grp;
186 SVCXPRT *xprt, *nxprt;
187 struct svc_callout *s;
188 struct svc_loss_callout *sl;
189 struct svcxprt_list cleanup;
190 int g;
191
192 TAILQ_INIT(&cleanup);
193
194 for (g = 0; g < SVC_MAXGROUPS; g++) {
195 grp = &pool->sp_groups[g];
196 mtx_lock(&grp->sg_lock);
197 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
198 xprt_unregister_locked(xprt);
199 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
200 }
201 mtx_unlock(&grp->sg_lock);
202 }
203 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
204 SVC_RELEASE(xprt);
205 }
206
207 mtx_lock(&pool->sp_lock);
208 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
209 mtx_unlock(&pool->sp_lock);
210 svc_unreg(pool, s->sc_prog, s->sc_vers);
211 mtx_lock(&pool->sp_lock);
212 }
213 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
214 mtx_unlock(&pool->sp_lock);
215 svc_loss_unreg(pool, sl->slc_dispatch);
216 mtx_lock(&pool->sp_lock);
217 }
218 mtx_unlock(&pool->sp_lock);
219 }
220
221 void
222 svcpool_destroy(SVCPOOL *pool)
223 {
224 SVCGROUP *grp;
225 int g;
226
227 svcpool_cleanup(pool);
228
229 for (g = 0; g < SVC_MAXGROUPS; g++) {
230 grp = &pool->sp_groups[g];
231 mtx_destroy(&grp->sg_lock);
232 }
233 mtx_destroy(&pool->sp_lock);
234
235 if (pool->sp_rcache)
236 replay_freecache(pool->sp_rcache);
237
238 sysctl_ctx_free(&pool->sp_sysctl);
239 free(pool, M_RPC);
240 }
241
242 /*
243 * Similar to svcpool_destroy(), except that it does not destroy the actual
244 * data structures. As such, "pool" may be used again.
245 */
246 void
247 svcpool_close(SVCPOOL *pool)
248 {
249 SVCGROUP *grp;
250 int g;
251
252 svcpool_cleanup(pool);
253
254 /* Now, initialize the pool's state for a fresh svc_run() call. */
255 mtx_lock(&pool->sp_lock);
256 pool->sp_state = SVCPOOL_INIT;
257 mtx_unlock(&pool->sp_lock);
258 for (g = 0; g < SVC_MAXGROUPS; g++) {
259 grp = &pool->sp_groups[g];
260 mtx_lock(&grp->sg_lock);
261 grp->sg_state = SVCPOOL_ACTIVE;
262 mtx_unlock(&grp->sg_lock);
263 }
264 }
265
266 /*
267 * Sysctl handler to get the present thread count on a pool
268 */
269 static int
270 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
271 {
272 SVCPOOL *pool;
273 int threads, error, g;
274
275 pool = oidp->oid_arg1;
276 threads = 0;
277 mtx_lock(&pool->sp_lock);
278 for (g = 0; g < pool->sp_groupcount; g++)
279 threads += pool->sp_groups[g].sg_threadcount;
280 mtx_unlock(&pool->sp_lock);
281 error = sysctl_handle_int(oidp, &threads, 0, req);
282 return (error);
283 }
284
285 /*
286 * Sysctl handler to set the minimum thread count on a pool
287 */
288 static int
289 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
290 {
291 SVCPOOL *pool;
292 int newminthreads, error, g;
293
294 pool = oidp->oid_arg1;
295 newminthreads = pool->sp_minthreads;
296 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
297 if (error == 0 && newminthreads != pool->sp_minthreads) {
298 if (newminthreads > pool->sp_maxthreads)
299 return (EINVAL);
300 mtx_lock(&pool->sp_lock);
301 pool->sp_minthreads = newminthreads;
302 for (g = 0; g < pool->sp_groupcount; g++) {
303 pool->sp_groups[g].sg_minthreads = max(1,
304 pool->sp_minthreads / pool->sp_groupcount);
305 }
306 mtx_unlock(&pool->sp_lock);
307 }
308 return (error);
309 }
310
311 /*
312 * Sysctl handler to set the maximum thread count on a pool
313 */
314 static int
315 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
316 {
317 SVCPOOL *pool;
318 int newmaxthreads, error, g;
319
320 pool = oidp->oid_arg1;
321 newmaxthreads = pool->sp_maxthreads;
322 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
323 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
324 if (newmaxthreads < pool->sp_minthreads)
325 return (EINVAL);
326 mtx_lock(&pool->sp_lock);
327 pool->sp_maxthreads = newmaxthreads;
328 for (g = 0; g < pool->sp_groupcount; g++) {
329 pool->sp_groups[g].sg_maxthreads = max(1,
330 pool->sp_maxthreads / pool->sp_groupcount);
331 }
332 mtx_unlock(&pool->sp_lock);
333 }
334 return (error);
335 }
336
337 /*
338 * Activate a transport handle.
339 */
340 void
341 xprt_register(SVCXPRT *xprt)
342 {
343 SVCPOOL *pool = xprt->xp_pool;
344 SVCGROUP *grp;
345 int g;
346
347 SVC_ACQUIRE(xprt);
348 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
349 xprt->xp_group = grp = &pool->sp_groups[g];
350 mtx_lock(&grp->sg_lock);
351 xprt->xp_registered = TRUE;
352 xprt->xp_active = FALSE;
353 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
354 mtx_unlock(&grp->sg_lock);
355 }
356
357 /*
358 * De-activate a transport handle. Note: the locked version doesn't
359 * release the transport - caller must do that after dropping the pool
360 * lock.
361 */
362 static void
363 xprt_unregister_locked(SVCXPRT *xprt)
364 {
365 SVCGROUP *grp = xprt->xp_group;
366
367 mtx_assert(&grp->sg_lock, MA_OWNED);
368 KASSERT(xprt->xp_registered == TRUE,
369 ("xprt_unregister_locked: not registered"));
370 xprt_inactive_locked(xprt);
371 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
372 xprt->xp_registered = FALSE;
373 }
374
375 void
376 xprt_unregister(SVCXPRT *xprt)
377 {
378 SVCGROUP *grp = xprt->xp_group;
379
380 mtx_lock(&grp->sg_lock);
381 if (xprt->xp_registered == FALSE) {
382 /* Already unregistered by another thread */
383 mtx_unlock(&grp->sg_lock);
384 return;
385 }
386 xprt_unregister_locked(xprt);
387 mtx_unlock(&grp->sg_lock);
388
389 SVC_RELEASE(xprt);
390 }
391
392 /*
393 * Attempt to assign a service thread to this transport.
394 */
395 static int
396 xprt_assignthread(SVCXPRT *xprt)
397 {
398 SVCGROUP *grp = xprt->xp_group;
399 SVCTHREAD *st;
400
401 mtx_assert(&grp->sg_lock, MA_OWNED);
402 st = LIST_FIRST(&grp->sg_idlethreads);
403 if (st) {
404 LIST_REMOVE(st, st_ilink);
405 SVC_ACQUIRE(xprt);
406 xprt->xp_thread = st;
407 st->st_xprt = xprt;
408 cv_signal(&st->st_cond);
409 return (TRUE);
410 } else {
411 /*
412 * See if we can create a new thread. The
413 * actual thread creation happens in
414 * svc_run_internal because our locking state
415 * is poorly defined (we are typically called
416 * from a socket upcall). Don't create more
417 * than one thread per second.
418 */
419 if (grp->sg_state == SVCPOOL_ACTIVE
420 && grp->sg_lastcreatetime < time_uptime
421 && grp->sg_threadcount < grp->sg_maxthreads) {
422 grp->sg_state = SVCPOOL_THREADWANTED;
423 }
424 }
425 return (FALSE);
426 }
427
428 void
429 xprt_active(SVCXPRT *xprt)
430 {
431 SVCGROUP *grp = xprt->xp_group;
432
433 mtx_lock(&grp->sg_lock);
434
435 if (!xprt->xp_registered) {
436 /*
437 * Race with xprt_unregister - we lose.
438 */
439 mtx_unlock(&grp->sg_lock);
440 return;
441 }
442
443 if (!xprt->xp_active) {
444 xprt->xp_active = TRUE;
445 if (xprt->xp_thread == NULL) {
446 if (!svc_request_space_available(xprt->xp_pool) ||
447 !xprt_assignthread(xprt))
448 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
449 xp_alink);
450 }
451 }
452
453 mtx_unlock(&grp->sg_lock);
454 }
455
456 void
457 xprt_inactive_locked(SVCXPRT *xprt)
458 {
459 SVCGROUP *grp = xprt->xp_group;
460
461 mtx_assert(&grp->sg_lock, MA_OWNED);
462 if (xprt->xp_active) {
463 if (xprt->xp_thread == NULL)
464 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
465 xprt->xp_active = FALSE;
466 }
467 }
468
469 void
470 xprt_inactive(SVCXPRT *xprt)
471 {
472 SVCGROUP *grp = xprt->xp_group;
473
474 mtx_lock(&grp->sg_lock);
475 xprt_inactive_locked(xprt);
476 mtx_unlock(&grp->sg_lock);
477 }
478
479 /*
480 * Variant of xprt_inactive() for use only when sure that port is
481 * assigned to thread. For example, withing receive handlers.
482 */
483 void
484 xprt_inactive_self(SVCXPRT *xprt)
485 {
486
487 KASSERT(xprt->xp_thread != NULL,
488 ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
489 xprt->xp_active = FALSE;
490 }
491
492 /*
493 * Add a service program to the callout list.
494 * The dispatch routine will be called when a rpc request for this
495 * program number comes in.
496 */
497 bool_t
498 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
499 void (*dispatch)(struct svc_req *, SVCXPRT *),
500 const struct netconfig *nconf)
501 {
502 SVCPOOL *pool = xprt->xp_pool;
503 struct svc_callout *s;
504 char *netid = NULL;
505 int flag = 0;
506
507 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
508
509 if (xprt->xp_netid) {
510 netid = strdup(xprt->xp_netid, M_RPC);
511 flag = 1;
512 } else if (nconf && nconf->nc_netid) {
513 netid = strdup(nconf->nc_netid, M_RPC);
514 flag = 1;
515 } /* must have been created with svc_raw_create */
516 if ((netid == NULL) && (flag == 1)) {
517 return (FALSE);
518 }
519
520 mtx_lock(&pool->sp_lock);
521 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
522 if (netid)
523 free(netid, M_RPC);
524 if (s->sc_dispatch == dispatch)
525 goto rpcb_it; /* he is registering another xptr */
526 mtx_unlock(&pool->sp_lock);
527 return (FALSE);
528 }
529 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
530 if (s == NULL) {
531 if (netid)
532 free(netid, M_RPC);
533 mtx_unlock(&pool->sp_lock);
534 return (FALSE);
535 }
536
537 s->sc_prog = prog;
538 s->sc_vers = vers;
539 s->sc_dispatch = dispatch;
540 s->sc_netid = netid;
541 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
542
543 if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
544 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
545
546 rpcb_it:
547 mtx_unlock(&pool->sp_lock);
548 /* now register the information with the local binder service */
549 if (nconf) {
550 bool_t dummy;
551 struct netconfig tnc;
552 struct netbuf nb;
553 tnc = *nconf;
554 nb.buf = &xprt->xp_ltaddr;
555 nb.len = xprt->xp_ltaddr.ss_len;
556 dummy = rpcb_set(prog, vers, &tnc, &nb);
557 return (dummy);
558 }
559 return (TRUE);
560 }
561
562 /*
563 * Remove a service program from the callout list.
564 */
565 void
566 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
567 {
568 struct svc_callout *s;
569
570 /* unregister the information anyway */
571 (void) rpcb_unset(prog, vers, NULL);
572 mtx_lock(&pool->sp_lock);
573 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
574 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
575 if (s->sc_netid)
576 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
577 mem_free(s, sizeof (struct svc_callout));
578 }
579 mtx_unlock(&pool->sp_lock);
580 }
581
582 /*
583 * Add a service connection loss program to the callout list.
584 * The dispatch routine will be called when some port in ths pool die.
585 */
586 bool_t
587 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
588 {
589 SVCPOOL *pool = xprt->xp_pool;
590 struct svc_loss_callout *s;
591
592 mtx_lock(&pool->sp_lock);
593 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
594 if (s->slc_dispatch == dispatch)
595 break;
596 }
597 if (s != NULL) {
598 mtx_unlock(&pool->sp_lock);
599 return (TRUE);
600 }
601 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
602 if (s == NULL) {
603 mtx_unlock(&pool->sp_lock);
604 return (FALSE);
605 }
606 s->slc_dispatch = dispatch;
607 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
608 mtx_unlock(&pool->sp_lock);
609 return (TRUE);
610 }
611
612 /*
613 * Remove a service connection loss program from the callout list.
614 */
615 void
616 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
617 {
618 struct svc_loss_callout *s;
619
620 mtx_lock(&pool->sp_lock);
621 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
622 if (s->slc_dispatch == dispatch) {
623 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
624 free(s, M_RPC);
625 break;
626 }
627 }
628 mtx_unlock(&pool->sp_lock);
629 }
630
631 /* ********************** CALLOUT list related stuff ************* */
632
633 /*
634 * Search the callout list for a program number, return the callout
635 * struct.
636 */
637 static struct svc_callout *
638 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
639 {
640 struct svc_callout *s;
641
642 mtx_assert(&pool->sp_lock, MA_OWNED);
643 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
644 if (s->sc_prog == prog && s->sc_vers == vers
645 && (netid == NULL || s->sc_netid == NULL ||
646 strcmp(netid, s->sc_netid) == 0))
647 break;
648 }
649
650 return (s);
651 }
652
653 /* ******************* REPLY GENERATION ROUTINES ************ */
654
655 static bool_t
656 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
657 struct mbuf *body)
658 {
659 SVCXPRT *xprt = rqstp->rq_xprt;
660 bool_t ok;
661
662 if (rqstp->rq_args) {
663 m_freem(rqstp->rq_args);
664 rqstp->rq_args = NULL;
665 }
666
667 if (xprt->xp_pool->sp_rcache)
668 replay_setreply(xprt->xp_pool->sp_rcache,
669 rply, svc_getrpccaller(rqstp), body);
670
671 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
672 return (FALSE);
673
674 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
675 if (rqstp->rq_addr) {
676 free(rqstp->rq_addr, M_SONAME);
677 rqstp->rq_addr = NULL;
678 }
679
680 return (ok);
681 }
682
683 /*
684 * Send a reply to an rpc request
685 */
686 bool_t
687 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
688 {
689 struct rpc_msg rply;
690 struct mbuf *m;
691 XDR xdrs;
692 bool_t ok;
693
694 rply.rm_xid = rqstp->rq_xid;
695 rply.rm_direction = REPLY;
696 rply.rm_reply.rp_stat = MSG_ACCEPTED;
697 rply.acpted_rply.ar_verf = rqstp->rq_verf;
698 rply.acpted_rply.ar_stat = SUCCESS;
699 rply.acpted_rply.ar_results.where = NULL;
700 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
701
702 m = m_getcl(M_WAITOK, MT_DATA, 0);
703 xdrmbuf_create(&xdrs, m, XDR_ENCODE);
704 ok = xdr_results(&xdrs, xdr_location);
705 XDR_DESTROY(&xdrs);
706
707 if (ok) {
708 return (svc_sendreply_common(rqstp, &rply, m));
709 } else {
710 m_freem(m);
711 return (FALSE);
712 }
713 }
714
715 bool_t
716 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
717 {
718 struct rpc_msg rply;
719
720 rply.rm_xid = rqstp->rq_xid;
721 rply.rm_direction = REPLY;
722 rply.rm_reply.rp_stat = MSG_ACCEPTED;
723 rply.acpted_rply.ar_verf = rqstp->rq_verf;
724 rply.acpted_rply.ar_stat = SUCCESS;
725 rply.acpted_rply.ar_results.where = NULL;
726 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
727
728 return (svc_sendreply_common(rqstp, &rply, m));
729 }
730
731 /*
732 * No procedure error reply
733 */
734 void
735 svcerr_noproc(struct svc_req *rqstp)
736 {
737 SVCXPRT *xprt = rqstp->rq_xprt;
738 struct rpc_msg rply;
739
740 rply.rm_xid = rqstp->rq_xid;
741 rply.rm_direction = REPLY;
742 rply.rm_reply.rp_stat = MSG_ACCEPTED;
743 rply.acpted_rply.ar_verf = rqstp->rq_verf;
744 rply.acpted_rply.ar_stat = PROC_UNAVAIL;
745
746 if (xprt->xp_pool->sp_rcache)
747 replay_setreply(xprt->xp_pool->sp_rcache,
748 &rply, svc_getrpccaller(rqstp), NULL);
749
750 svc_sendreply_common(rqstp, &rply, NULL);
751 }
752
753 /*
754 * Can't decode args error reply
755 */
756 void
757 svcerr_decode(struct svc_req *rqstp)
758 {
759 SVCXPRT *xprt = rqstp->rq_xprt;
760 struct rpc_msg rply;
761
762 rply.rm_xid = rqstp->rq_xid;
763 rply.rm_direction = REPLY;
764 rply.rm_reply.rp_stat = MSG_ACCEPTED;
765 rply.acpted_rply.ar_verf = rqstp->rq_verf;
766 rply.acpted_rply.ar_stat = GARBAGE_ARGS;
767
768 if (xprt->xp_pool->sp_rcache)
769 replay_setreply(xprt->xp_pool->sp_rcache,
770 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
771
772 svc_sendreply_common(rqstp, &rply, NULL);
773 }
774
775 /*
776 * Some system error
777 */
778 void
779 svcerr_systemerr(struct svc_req *rqstp)
780 {
781 SVCXPRT *xprt = rqstp->rq_xprt;
782 struct rpc_msg rply;
783
784 rply.rm_xid = rqstp->rq_xid;
785 rply.rm_direction = REPLY;
786 rply.rm_reply.rp_stat = MSG_ACCEPTED;
787 rply.acpted_rply.ar_verf = rqstp->rq_verf;
788 rply.acpted_rply.ar_stat = SYSTEM_ERR;
789
790 if (xprt->xp_pool->sp_rcache)
791 replay_setreply(xprt->xp_pool->sp_rcache,
792 &rply, svc_getrpccaller(rqstp), NULL);
793
794 svc_sendreply_common(rqstp, &rply, NULL);
795 }
796
797 /*
798 * Authentication error reply
799 */
800 void
801 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
802 {
803 SVCXPRT *xprt = rqstp->rq_xprt;
804 struct rpc_msg rply;
805
806 rply.rm_xid = rqstp->rq_xid;
807 rply.rm_direction = REPLY;
808 rply.rm_reply.rp_stat = MSG_DENIED;
809 rply.rjcted_rply.rj_stat = AUTH_ERROR;
810 rply.rjcted_rply.rj_why = why;
811
812 if (xprt->xp_pool->sp_rcache)
813 replay_setreply(xprt->xp_pool->sp_rcache,
814 &rply, svc_getrpccaller(rqstp), NULL);
815
816 svc_sendreply_common(rqstp, &rply, NULL);
817 }
818
819 /*
820 * Auth too weak error reply
821 */
822 void
823 svcerr_weakauth(struct svc_req *rqstp)
824 {
825
826 svcerr_auth(rqstp, AUTH_TOOWEAK);
827 }
828
829 /*
830 * Program unavailable error reply
831 */
832 void
833 svcerr_noprog(struct svc_req *rqstp)
834 {
835 SVCXPRT *xprt = rqstp->rq_xprt;
836 struct rpc_msg rply;
837
838 rply.rm_xid = rqstp->rq_xid;
839 rply.rm_direction = REPLY;
840 rply.rm_reply.rp_stat = MSG_ACCEPTED;
841 rply.acpted_rply.ar_verf = rqstp->rq_verf;
842 rply.acpted_rply.ar_stat = PROG_UNAVAIL;
843
844 if (xprt->xp_pool->sp_rcache)
845 replay_setreply(xprt->xp_pool->sp_rcache,
846 &rply, svc_getrpccaller(rqstp), NULL);
847
848 svc_sendreply_common(rqstp, &rply, NULL);
849 }
850
851 /*
852 * Program version mismatch error reply
853 */
854 void
855 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
856 {
857 SVCXPRT *xprt = rqstp->rq_xprt;
858 struct rpc_msg rply;
859
860 rply.rm_xid = rqstp->rq_xid;
861 rply.rm_direction = REPLY;
862 rply.rm_reply.rp_stat = MSG_ACCEPTED;
863 rply.acpted_rply.ar_verf = rqstp->rq_verf;
864 rply.acpted_rply.ar_stat = PROG_MISMATCH;
865 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
866 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
867
868 if (xprt->xp_pool->sp_rcache)
869 replay_setreply(xprt->xp_pool->sp_rcache,
870 &rply, svc_getrpccaller(rqstp), NULL);
871
872 svc_sendreply_common(rqstp, &rply, NULL);
873 }
874
875 /*
876 * Allocate a new server transport structure. All fields are
877 * initialized to zero and xp_p3 is initialized to point at an
878 * extension structure to hold various flags and authentication
879 * parameters.
880 */
881 SVCXPRT *
882 svc_xprt_alloc(void)
883 {
884 SVCXPRT *xprt;
885 SVCXPRT_EXT *ext;
886
887 xprt = mem_alloc(sizeof(SVCXPRT));
888 ext = mem_alloc(sizeof(SVCXPRT_EXT));
889 xprt->xp_p3 = ext;
890 refcount_init(&xprt->xp_refs, 1);
891
892 return (xprt);
893 }
894
895 /*
896 * Free a server transport structure.
897 */
898 void
899 svc_xprt_free(SVCXPRT *xprt)
900 {
901
902 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
903 mem_free(xprt, sizeof(SVCXPRT));
904 }
905
906 /* ******************* SERVER INPUT STUFF ******************* */
907
908 /*
909 * Read RPC requests from a transport and queue them to be
910 * executed. We handle authentication and replay cache replies here.
911 * Actually dispatching the RPC is deferred till svc_executereq.
912 */
913 static enum xprt_stat
914 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
915 {
916 SVCPOOL *pool = xprt->xp_pool;
917 struct svc_req *r;
918 struct rpc_msg msg;
919 struct mbuf *args;
920 struct svc_loss_callout *s;
921 enum xprt_stat stat;
922
923 /* now receive msgs from xprtprt (support batch calls) */
924 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
925
926 msg.rm_call.cb_cred.oa_base = r->rq_credarea;
927 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
928 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
929 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
930 enum auth_stat why;
931
932 /*
933 * Handle replays and authenticate before queuing the
934 * request to be executed.
935 */
936 SVC_ACQUIRE(xprt);
937 r->rq_xprt = xprt;
938 if (pool->sp_rcache) {
939 struct rpc_msg repmsg;
940 struct mbuf *repbody;
941 enum replay_state rs;
942 rs = replay_find(pool->sp_rcache, &msg,
943 svc_getrpccaller(r), &repmsg, &repbody);
944 switch (rs) {
945 case RS_NEW:
946 break;
947 case RS_DONE:
948 SVC_REPLY(xprt, &repmsg, r->rq_addr,
949 repbody, &r->rq_reply_seq);
950 if (r->rq_addr) {
951 free(r->rq_addr, M_SONAME);
952 r->rq_addr = NULL;
953 }
954 m_freem(args);
955 goto call_done;
956
957 default:
958 m_freem(args);
959 goto call_done;
960 }
961 }
962
963 r->rq_xid = msg.rm_xid;
964 r->rq_prog = msg.rm_call.cb_prog;
965 r->rq_vers = msg.rm_call.cb_vers;
966 r->rq_proc = msg.rm_call.cb_proc;
967 r->rq_size = sizeof(*r) + m_length(args, NULL);
968 r->rq_args = args;
969 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
970 /*
971 * RPCSEC_GSS uses this return code
972 * for requests that form part of its
973 * context establishment protocol and
974 * should not be dispatched to the
975 * application.
976 */
977 if (why != RPCSEC_GSS_NODISPATCH)
978 svcerr_auth(r, why);
979 goto call_done;
980 }
981
982 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
983 svcerr_decode(r);
984 goto call_done;
985 }
986
987 /*
988 * Everything checks out, return request to caller.
989 */
990 *rqstp_ret = r;
991 r = NULL;
992 }
993 call_done:
994 if (r) {
995 svc_freereq(r);
996 r = NULL;
997 }
998 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
999 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1000 (*s->slc_dispatch)(xprt);
1001 xprt_unregister(xprt);
1002 }
1003
1004 return (stat);
1005 }
1006
1007 static void
1008 svc_executereq(struct svc_req *rqstp)
1009 {
1010 SVCXPRT *xprt = rqstp->rq_xprt;
1011 SVCPOOL *pool = xprt->xp_pool;
1012 int prog_found;
1013 rpcvers_t low_vers;
1014 rpcvers_t high_vers;
1015 struct svc_callout *s;
1016
1017 /* now match message with a registered service*/
1018 prog_found = FALSE;
1019 low_vers = (rpcvers_t) -1L;
1020 high_vers = (rpcvers_t) 0L;
1021 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1022 if (s->sc_prog == rqstp->rq_prog) {
1023 if (s->sc_vers == rqstp->rq_vers) {
1024 /*
1025 * We hand ownership of r to the
1026 * dispatch method - they must call
1027 * svc_freereq.
1028 */
1029 (*s->sc_dispatch)(rqstp, xprt);
1030 return;
1031 } /* found correct version */
1032 prog_found = TRUE;
1033 if (s->sc_vers < low_vers)
1034 low_vers = s->sc_vers;
1035 if (s->sc_vers > high_vers)
1036 high_vers = s->sc_vers;
1037 } /* found correct program */
1038 }
1039
1040 /*
1041 * if we got here, the program or version
1042 * is not served ...
1043 */
1044 if (prog_found)
1045 svcerr_progvers(rqstp, low_vers, high_vers);
1046 else
1047 svcerr_noprog(rqstp);
1048
1049 svc_freereq(rqstp);
1050 }
1051
1052 static void
1053 svc_checkidle(SVCGROUP *grp)
1054 {
1055 SVCXPRT *xprt, *nxprt;
1056 time_t timo;
1057 struct svcxprt_list cleanup;
1058
1059 TAILQ_INIT(&cleanup);
1060 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1061 /*
1062 * Only some transports have idle timers. Don't time
1063 * something out which is just waking up.
1064 */
1065 if (!xprt->xp_idletimeout || xprt->xp_thread)
1066 continue;
1067
1068 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1069 if (time_uptime > timo) {
1070 xprt_unregister_locked(xprt);
1071 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1072 }
1073 }
1074
1075 mtx_unlock(&grp->sg_lock);
1076 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1077 SVC_RELEASE(xprt);
1078 }
1079 mtx_lock(&grp->sg_lock);
1080 }
1081
1082 static void
1083 svc_assign_waiting_sockets(SVCPOOL *pool)
1084 {
1085 SVCGROUP *grp;
1086 SVCXPRT *xprt;
1087 int g;
1088
1089 for (g = 0; g < pool->sp_groupcount; g++) {
1090 grp = &pool->sp_groups[g];
1091 mtx_lock(&grp->sg_lock);
1092 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1093 if (xprt_assignthread(xprt))
1094 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1095 else
1096 break;
1097 }
1098 mtx_unlock(&grp->sg_lock);
1099 }
1100 }
1101
1102 static void
1103 svc_change_space_used(SVCPOOL *pool, long delta)
1104 {
1105 unsigned long value;
1106
1107 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1108 if (delta > 0) {
1109 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1110 pool->sp_space_throttled = TRUE;
1111 pool->sp_space_throttle_count++;
1112 }
1113 if (value > pool->sp_space_used_highest)
1114 pool->sp_space_used_highest = value;
1115 } else {
1116 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1117 pool->sp_space_throttled = FALSE;
1118 svc_assign_waiting_sockets(pool);
1119 }
1120 }
1121 }
1122
1123 static bool_t
1124 svc_request_space_available(SVCPOOL *pool)
1125 {
1126
1127 if (pool->sp_space_throttled)
1128 return (FALSE);
1129 return (TRUE);
1130 }
1131
1132 static void
1133 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1134 {
1135 SVCPOOL *pool = grp->sg_pool;
1136 SVCTHREAD *st, *stpref;
1137 SVCXPRT *xprt;
1138 enum xprt_stat stat;
1139 struct svc_req *rqstp;
1140 struct proc *p;
1141 long sz;
1142 int error;
1143
1144 st = mem_alloc(sizeof(*st));
1145 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1146 st->st_pool = pool;
1147 st->st_xprt = NULL;
1148 STAILQ_INIT(&st->st_reqs);
1149 cv_init(&st->st_cond, "rpcsvc");
1150
1151 mtx_lock(&grp->sg_lock);
1152
1153 /*
1154 * If we are a new thread which was spawned to cope with
1155 * increased load, set the state back to SVCPOOL_ACTIVE.
1156 */
1157 if (grp->sg_state == SVCPOOL_THREADSTARTING)
1158 grp->sg_state = SVCPOOL_ACTIVE;
1159
1160 while (grp->sg_state != SVCPOOL_CLOSING) {
1161 /*
1162 * Create new thread if requested.
1163 */
1164 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1165 grp->sg_state = SVCPOOL_THREADSTARTING;
1166 grp->sg_lastcreatetime = time_uptime;
1167 mtx_unlock(&grp->sg_lock);
1168 svc_new_thread(grp);
1169 mtx_lock(&grp->sg_lock);
1170 continue;
1171 }
1172
1173 /*
1174 * Check for idle transports once per second.
1175 */
1176 if (time_uptime > grp->sg_lastidlecheck) {
1177 grp->sg_lastidlecheck = time_uptime;
1178 svc_checkidle(grp);
1179 }
1180
1181 xprt = st->st_xprt;
1182 if (!xprt) {
1183 /*
1184 * Enforce maxthreads count.
1185 */
1186 if (!ismaster && grp->sg_threadcount >
1187 grp->sg_maxthreads)
1188 break;
1189
1190 /*
1191 * Before sleeping, see if we can find an
1192 * active transport which isn't being serviced
1193 * by a thread.
1194 */
1195 if (svc_request_space_available(pool) &&
1196 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1197 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1198 SVC_ACQUIRE(xprt);
1199 xprt->xp_thread = st;
1200 st->st_xprt = xprt;
1201 continue;
1202 }
1203
1204 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1205 if (ismaster || (!ismaster &&
1206 grp->sg_threadcount > grp->sg_minthreads))
1207 error = cv_timedwait_sig(&st->st_cond,
1208 &grp->sg_lock, 5 * hz);
1209 else
1210 error = cv_wait_sig(&st->st_cond,
1211 &grp->sg_lock);
1212 if (st->st_xprt == NULL)
1213 LIST_REMOVE(st, st_ilink);
1214
1215 /*
1216 * Reduce worker thread count when idle.
1217 */
1218 if (error == EWOULDBLOCK) {
1219 if (!ismaster
1220 && (grp->sg_threadcount
1221 > grp->sg_minthreads)
1222 && !st->st_xprt)
1223 break;
1224 } else if (error != 0) {
1225 KASSERT(error == EINTR || error == ERESTART,
1226 ("non-signal error %d", error));
1227 mtx_unlock(&grp->sg_lock);
1228 p = curproc;
1229 PROC_LOCK(p);
1230 if (P_SHOULDSTOP(p) ||
1231 (p->p_flag & P_TOTAL_STOP) != 0) {
1232 thread_suspend_check(0);
1233 PROC_UNLOCK(p);
1234 mtx_lock(&grp->sg_lock);
1235 } else {
1236 PROC_UNLOCK(p);
1237 svc_exit(pool);
1238 mtx_lock(&grp->sg_lock);
1239 break;
1240 }
1241 }
1242 continue;
1243 }
1244 mtx_unlock(&grp->sg_lock);
1245
1246 /*
1247 * Drain the transport socket and queue up any RPCs.
1248 */
1249 xprt->xp_lastactive = time_uptime;
1250 do {
1251 if (!svc_request_space_available(pool))
1252 break;
1253 rqstp = NULL;
1254 stat = svc_getreq(xprt, &rqstp);
1255 if (rqstp) {
1256 svc_change_space_used(pool, rqstp->rq_size);
1257 /*
1258 * See if the application has a preference
1259 * for some other thread.
1260 */
1261 if (pool->sp_assign) {
1262 stpref = pool->sp_assign(st, rqstp);
1263 rqstp->rq_thread = stpref;
1264 STAILQ_INSERT_TAIL(&stpref->st_reqs,
1265 rqstp, rq_link);
1266 mtx_unlock(&stpref->st_lock);
1267 if (stpref != st)
1268 rqstp = NULL;
1269 } else {
1270 rqstp->rq_thread = st;
1271 STAILQ_INSERT_TAIL(&st->st_reqs,
1272 rqstp, rq_link);
1273 }
1274 }
1275 } while (rqstp == NULL && stat == XPRT_MOREREQS
1276 && grp->sg_state != SVCPOOL_CLOSING);
1277
1278 /*
1279 * Move this transport to the end of the active list to
1280 * ensure fairness when multiple transports are active.
1281 * If this was the last queued request, svc_getreq will end
1282 * up calling xprt_inactive to remove from the active list.
1283 */
1284 mtx_lock(&grp->sg_lock);
1285 xprt->xp_thread = NULL;
1286 st->st_xprt = NULL;
1287 if (xprt->xp_active) {
1288 if (!svc_request_space_available(pool) ||
1289 !xprt_assignthread(xprt))
1290 TAILQ_INSERT_TAIL(&grp->sg_active,
1291 xprt, xp_alink);
1292 }
1293 mtx_unlock(&grp->sg_lock);
1294 SVC_RELEASE(xprt);
1295
1296 /*
1297 * Execute what we have queued.
1298 */
1299 mtx_lock(&st->st_lock);
1300 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1301 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1302 mtx_unlock(&st->st_lock);
1303 sz = (long)rqstp->rq_size;
1304 svc_executereq(rqstp);
1305 svc_change_space_used(pool, -sz);
1306 mtx_lock(&st->st_lock);
1307 }
1308 mtx_unlock(&st->st_lock);
1309 mtx_lock(&grp->sg_lock);
1310 }
1311
1312 if (st->st_xprt) {
1313 xprt = st->st_xprt;
1314 st->st_xprt = NULL;
1315 SVC_RELEASE(xprt);
1316 }
1317 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1318 mtx_destroy(&st->st_lock);
1319 cv_destroy(&st->st_cond);
1320 mem_free(st, sizeof(*st));
1321
1322 grp->sg_threadcount--;
1323 if (!ismaster)
1324 wakeup(grp);
1325 mtx_unlock(&grp->sg_lock);
1326 }
1327
1328 static void
1329 svc_thread_start(void *arg)
1330 {
1331
1332 svc_run_internal((SVCGROUP *) arg, FALSE);
1333 kthread_exit();
1334 }
1335
1336 static void
1337 svc_new_thread(SVCGROUP *grp)
1338 {
1339 SVCPOOL *pool = grp->sg_pool;
1340 struct thread *td;
1341
1342 mtx_lock(&grp->sg_lock);
1343 grp->sg_threadcount++;
1344 mtx_unlock(&grp->sg_lock);
1345 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1346 "%s: service", pool->sp_name);
1347 }
1348
1349 void
1350 svc_run(SVCPOOL *pool)
1351 {
1352 int g, i;
1353 struct proc *p;
1354 struct thread *td;
1355 SVCGROUP *grp;
1356
1357 p = curproc;
1358 td = curthread;
1359 snprintf(td->td_name, sizeof(td->td_name),
1360 "%s: master", pool->sp_name);
1361 pool->sp_state = SVCPOOL_ACTIVE;
1362 pool->sp_proc = p;
1363
1364 /* Choose group count based on number of threads and CPUs. */
1365 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1366 min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1367 for (g = 0; g < pool->sp_groupcount; g++) {
1368 grp = &pool->sp_groups[g];
1369 grp->sg_minthreads = max(1,
1370 pool->sp_minthreads / pool->sp_groupcount);
1371 grp->sg_maxthreads = max(1,
1372 pool->sp_maxthreads / pool->sp_groupcount);
1373 grp->sg_lastcreatetime = time_uptime;
1374 }
1375
1376 /* Starting threads */
1377 pool->sp_groups[0].sg_threadcount++;
1378 for (g = 0; g < pool->sp_groupcount; g++) {
1379 grp = &pool->sp_groups[g];
1380 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1381 svc_new_thread(grp);
1382 }
1383 svc_run_internal(&pool->sp_groups[0], TRUE);
1384
1385 /* Waiting for threads to stop. */
1386 for (g = 0; g < pool->sp_groupcount; g++) {
1387 grp = &pool->sp_groups[g];
1388 mtx_lock(&grp->sg_lock);
1389 while (grp->sg_threadcount > 0)
1390 msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1391 mtx_unlock(&grp->sg_lock);
1392 }
1393 }
1394
1395 void
1396 svc_exit(SVCPOOL *pool)
1397 {
1398 SVCGROUP *grp;
1399 SVCTHREAD *st;
1400 int g;
1401
1402 pool->sp_state = SVCPOOL_CLOSING;
1403 for (g = 0; g < pool->sp_groupcount; g++) {
1404 grp = &pool->sp_groups[g];
1405 mtx_lock(&grp->sg_lock);
1406 if (grp->sg_state != SVCPOOL_CLOSING) {
1407 grp->sg_state = SVCPOOL_CLOSING;
1408 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1409 cv_signal(&st->st_cond);
1410 }
1411 mtx_unlock(&grp->sg_lock);
1412 }
1413 }
1414
1415 bool_t
1416 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1417 {
1418 struct mbuf *m;
1419 XDR xdrs;
1420 bool_t stat;
1421
1422 m = rqstp->rq_args;
1423 rqstp->rq_args = NULL;
1424
1425 xdrmbuf_create(&xdrs, m, XDR_DECODE);
1426 stat = xargs(&xdrs, args);
1427 XDR_DESTROY(&xdrs);
1428
1429 return (stat);
1430 }
1431
1432 bool_t
1433 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1434 {
1435 XDR xdrs;
1436
1437 if (rqstp->rq_addr) {
1438 free(rqstp->rq_addr, M_SONAME);
1439 rqstp->rq_addr = NULL;
1440 }
1441
1442 xdrs.x_op = XDR_FREE;
1443 return (xargs(&xdrs, args));
1444 }
1445
1446 void
1447 svc_freereq(struct svc_req *rqstp)
1448 {
1449 SVCTHREAD *st;
1450 SVCPOOL *pool;
1451
1452 st = rqstp->rq_thread;
1453 if (st) {
1454 pool = st->st_pool;
1455 if (pool->sp_done)
1456 pool->sp_done(st, rqstp);
1457 }
1458
1459 if (rqstp->rq_auth.svc_ah_ops)
1460 SVCAUTH_RELEASE(&rqstp->rq_auth);
1461
1462 if (rqstp->rq_xprt) {
1463 SVC_RELEASE(rqstp->rq_xprt);
1464 }
1465
1466 if (rqstp->rq_addr)
1467 free(rqstp->rq_addr, M_SONAME);
1468
1469 if (rqstp->rq_args)
1470 m_freem(rqstp->rq_args);
1471
1472 free(rqstp, M_RPC);
1473 }
Cache object: b85a972be1ac0c74f25f8ded598948a6
|