FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc.c
1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
2
3 /*-
4 * Copyright (c) 2009, Sun Microsystems, Inc.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * - Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * - Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * - Neither the name of Sun Microsystems, Inc. nor the names of its
15 * contributors may be used to endorse or promote products derived
16 * from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #if defined(LIBC_SCCS) && !defined(lint)
32 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
33 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
34 #endif
35 #include <sys/cdefs.h>
36 __FBSDID("$FreeBSD: releng/11.2/sys/rpc/svc.c 331722 2018-03-29 02:50:57Z eadler $");
37
38 /*
39 * svc.c, Server-side remote procedure call interface.
40 *
41 * There are two sets of procedures here. The xprt routines are
42 * for handling transport handles. The svc routines handle the
43 * list of service routines.
44 *
45 * Copyright (C) 1984, Sun Microsystems, Inc.
46 */
47
48 #include <sys/param.h>
49 #include <sys/lock.h>
50 #include <sys/kernel.h>
51 #include <sys/kthread.h>
52 #include <sys/malloc.h>
53 #include <sys/mbuf.h>
54 #include <sys/mutex.h>
55 #include <sys/proc.h>
56 #include <sys/queue.h>
57 #include <sys/socketvar.h>
58 #include <sys/systm.h>
59 #include <sys/smp.h>
60 #include <sys/sx.h>
61 #include <sys/ucred.h>
62
63 #include <rpc/rpc.h>
64 #include <rpc/rpcb_clnt.h>
65 #include <rpc/replay.h>
66
67 #include <rpc/rpc_com.h>
68
69 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
70 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
71
72 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
73 char *);
74 static void svc_new_thread(SVCGROUP *grp);
75 static void xprt_unregister_locked(SVCXPRT *xprt);
76 static void svc_change_space_used(SVCPOOL *pool, long delta);
77 static bool_t svc_request_space_available(SVCPOOL *pool);
78 static void svcpool_cleanup(SVCPOOL *pool);
79
80 /* *************** SVCXPRT related stuff **************** */
81
82 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
83 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
84 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
85
86 SVCPOOL*
87 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
88 {
89 SVCPOOL *pool;
90 SVCGROUP *grp;
91 int g;
92
93 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
94
95 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
96 pool->sp_name = name;
97 pool->sp_state = SVCPOOL_INIT;
98 pool->sp_proc = NULL;
99 TAILQ_INIT(&pool->sp_callouts);
100 TAILQ_INIT(&pool->sp_lcallouts);
101 pool->sp_minthreads = 1;
102 pool->sp_maxthreads = 1;
103 pool->sp_groupcount = 1;
104 for (g = 0; g < SVC_MAXGROUPS; g++) {
105 grp = &pool->sp_groups[g];
106 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
107 grp->sg_pool = pool;
108 grp->sg_state = SVCPOOL_ACTIVE;
109 TAILQ_INIT(&grp->sg_xlist);
110 TAILQ_INIT(&grp->sg_active);
111 LIST_INIT(&grp->sg_idlethreads);
112 grp->sg_minthreads = 1;
113 grp->sg_maxthreads = 1;
114 }
115
116 /*
117 * Don't use more than a quarter of mbuf clusters. Nota bene:
118 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
119 * on LP64 architectures, so cast to u_long to avoid undefined
120 * behavior. (ILP32 architectures cannot have nmbclusters
121 * large enough to overflow for other reasons.)
122 */
123 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
124 pool->sp_space_low = (pool->sp_space_high / 3) * 2;
125
126 sysctl_ctx_init(&pool->sp_sysctl);
127 if (sysctl_base) {
128 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
129 "minthreads", CTLTYPE_INT | CTLFLAG_RW,
130 pool, 0, svcpool_minthread_sysctl, "I",
131 "Minimal number of threads");
132 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
133 "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
134 pool, 0, svcpool_maxthread_sysctl, "I",
135 "Maximal number of threads");
136 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 "threads", CTLTYPE_INT | CTLFLAG_RD,
138 pool, 0, svcpool_threads_sysctl, "I",
139 "Current number of threads");
140 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
141 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
142 "Number of thread groups");
143
144 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
145 "request_space_used", CTLFLAG_RD,
146 &pool->sp_space_used,
147 "Space in parsed but not handled requests.");
148
149 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
150 "request_space_used_highest", CTLFLAG_RD,
151 &pool->sp_space_used_highest,
152 "Highest space used since reboot.");
153
154 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
155 "request_space_high", CTLFLAG_RW,
156 &pool->sp_space_high,
157 "Maximum space in parsed but not handled requests.");
158
159 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
160 "request_space_low", CTLFLAG_RW,
161 &pool->sp_space_low,
162 "Low water mark for request space.");
163
164 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
165 "request_space_throttled", CTLFLAG_RD,
166 &pool->sp_space_throttled, 0,
167 "Whether nfs requests are currently throttled");
168
169 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
170 "request_space_throttle_count", CTLFLAG_RD,
171 &pool->sp_space_throttle_count, 0,
172 "Count of times throttling based on request space has occurred");
173 }
174
175 return pool;
176 }
177
178 /*
179 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
180 * the pool data structures.
181 */
182 static void
183 svcpool_cleanup(SVCPOOL *pool)
184 {
185 SVCGROUP *grp;
186 SVCXPRT *xprt, *nxprt;
187 struct svc_callout *s;
188 struct svc_loss_callout *sl;
189 struct svcxprt_list cleanup;
190 int g;
191
192 TAILQ_INIT(&cleanup);
193
194 for (g = 0; g < SVC_MAXGROUPS; g++) {
195 grp = &pool->sp_groups[g];
196 mtx_lock(&grp->sg_lock);
197 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
198 xprt_unregister_locked(xprt);
199 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
200 }
201 mtx_unlock(&grp->sg_lock);
202 }
203 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
204 SVC_RELEASE(xprt);
205 }
206
207 mtx_lock(&pool->sp_lock);
208 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
209 mtx_unlock(&pool->sp_lock);
210 svc_unreg(pool, s->sc_prog, s->sc_vers);
211 mtx_lock(&pool->sp_lock);
212 }
213 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
214 mtx_unlock(&pool->sp_lock);
215 svc_loss_unreg(pool, sl->slc_dispatch);
216 mtx_lock(&pool->sp_lock);
217 }
218 mtx_unlock(&pool->sp_lock);
219 }
220
221 void
222 svcpool_destroy(SVCPOOL *pool)
223 {
224 SVCGROUP *grp;
225 int g;
226
227 svcpool_cleanup(pool);
228
229 for (g = 0; g < SVC_MAXGROUPS; g++) {
230 grp = &pool->sp_groups[g];
231 mtx_destroy(&grp->sg_lock);
232 }
233 mtx_destroy(&pool->sp_lock);
234
235 if (pool->sp_rcache)
236 replay_freecache(pool->sp_rcache);
237
238 sysctl_ctx_free(&pool->sp_sysctl);
239 free(pool, M_RPC);
240 }
241
242 /*
243 * Similar to svcpool_destroy(), except that it does not destroy the actual
244 * data structures. As such, "pool" may be used again.
245 */
246 void
247 svcpool_close(SVCPOOL *pool)
248 {
249 SVCGROUP *grp;
250 int g;
251
252 svcpool_cleanup(pool);
253
254 /* Now, initialize the pool's state for a fresh svc_run() call. */
255 mtx_lock(&pool->sp_lock);
256 pool->sp_state = SVCPOOL_INIT;
257 mtx_unlock(&pool->sp_lock);
258 for (g = 0; g < SVC_MAXGROUPS; g++) {
259 grp = &pool->sp_groups[g];
260 mtx_lock(&grp->sg_lock);
261 grp->sg_state = SVCPOOL_ACTIVE;
262 mtx_unlock(&grp->sg_lock);
263 }
264 }
265
266 /*
267 * Sysctl handler to get the present thread count on a pool
268 */
269 static int
270 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
271 {
272 SVCPOOL *pool;
273 int threads, error, g;
274
275 pool = oidp->oid_arg1;
276 threads = 0;
277 mtx_lock(&pool->sp_lock);
278 for (g = 0; g < pool->sp_groupcount; g++)
279 threads += pool->sp_groups[g].sg_threadcount;
280 mtx_unlock(&pool->sp_lock);
281 error = sysctl_handle_int(oidp, &threads, 0, req);
282 return (error);
283 }
284
285 /*
286 * Sysctl handler to set the minimum thread count on a pool
287 */
288 static int
289 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
290 {
291 SVCPOOL *pool;
292 int newminthreads, error, g;
293
294 pool = oidp->oid_arg1;
295 newminthreads = pool->sp_minthreads;
296 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
297 if (error == 0 && newminthreads != pool->sp_minthreads) {
298 if (newminthreads > pool->sp_maxthreads)
299 return (EINVAL);
300 mtx_lock(&pool->sp_lock);
301 pool->sp_minthreads = newminthreads;
302 for (g = 0; g < pool->sp_groupcount; g++) {
303 pool->sp_groups[g].sg_minthreads = max(1,
304 pool->sp_minthreads / pool->sp_groupcount);
305 }
306 mtx_unlock(&pool->sp_lock);
307 }
308 return (error);
309 }
310
311 /*
312 * Sysctl handler to set the maximum thread count on a pool
313 */
314 static int
315 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
316 {
317 SVCPOOL *pool;
318 int newmaxthreads, error, g;
319
320 pool = oidp->oid_arg1;
321 newmaxthreads = pool->sp_maxthreads;
322 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
323 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
324 if (newmaxthreads < pool->sp_minthreads)
325 return (EINVAL);
326 mtx_lock(&pool->sp_lock);
327 pool->sp_maxthreads = newmaxthreads;
328 for (g = 0; g < pool->sp_groupcount; g++) {
329 pool->sp_groups[g].sg_maxthreads = max(1,
330 pool->sp_maxthreads / pool->sp_groupcount);
331 }
332 mtx_unlock(&pool->sp_lock);
333 }
334 return (error);
335 }
336
337 /*
338 * Activate a transport handle.
339 */
340 void
341 xprt_register(SVCXPRT *xprt)
342 {
343 SVCPOOL *pool = xprt->xp_pool;
344 SVCGROUP *grp;
345 int g;
346
347 SVC_ACQUIRE(xprt);
348 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
349 xprt->xp_group = grp = &pool->sp_groups[g];
350 mtx_lock(&grp->sg_lock);
351 xprt->xp_registered = TRUE;
352 xprt->xp_active = FALSE;
353 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
354 mtx_unlock(&grp->sg_lock);
355 }
356
357 /*
358 * De-activate a transport handle. Note: the locked version doesn't
359 * release the transport - caller must do that after dropping the pool
360 * lock.
361 */
362 static void
363 xprt_unregister_locked(SVCXPRT *xprt)
364 {
365 SVCGROUP *grp = xprt->xp_group;
366
367 mtx_assert(&grp->sg_lock, MA_OWNED);
368 KASSERT(xprt->xp_registered == TRUE,
369 ("xprt_unregister_locked: not registered"));
370 xprt_inactive_locked(xprt);
371 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
372 xprt->xp_registered = FALSE;
373 }
374
375 void
376 xprt_unregister(SVCXPRT *xprt)
377 {
378 SVCGROUP *grp = xprt->xp_group;
379
380 mtx_lock(&grp->sg_lock);
381 if (xprt->xp_registered == FALSE) {
382 /* Already unregistered by another thread */
383 mtx_unlock(&grp->sg_lock);
384 return;
385 }
386 xprt_unregister_locked(xprt);
387 mtx_unlock(&grp->sg_lock);
388
389 SVC_RELEASE(xprt);
390 }
391
392 /*
393 * Attempt to assign a service thread to this transport.
394 */
395 static int
396 xprt_assignthread(SVCXPRT *xprt)
397 {
398 SVCGROUP *grp = xprt->xp_group;
399 SVCTHREAD *st;
400
401 mtx_assert(&grp->sg_lock, MA_OWNED);
402 st = LIST_FIRST(&grp->sg_idlethreads);
403 if (st) {
404 LIST_REMOVE(st, st_ilink);
405 SVC_ACQUIRE(xprt);
406 xprt->xp_thread = st;
407 st->st_xprt = xprt;
408 cv_signal(&st->st_cond);
409 return (TRUE);
410 } else {
411 /*
412 * See if we can create a new thread. The
413 * actual thread creation happens in
414 * svc_run_internal because our locking state
415 * is poorly defined (we are typically called
416 * from a socket upcall). Don't create more
417 * than one thread per second.
418 */
419 if (grp->sg_state == SVCPOOL_ACTIVE
420 && grp->sg_lastcreatetime < time_uptime
421 && grp->sg_threadcount < grp->sg_maxthreads) {
422 grp->sg_state = SVCPOOL_THREADWANTED;
423 }
424 }
425 return (FALSE);
426 }
427
428 void
429 xprt_active(SVCXPRT *xprt)
430 {
431 SVCGROUP *grp = xprt->xp_group;
432
433 mtx_lock(&grp->sg_lock);
434
435 if (!xprt->xp_registered) {
436 /*
437 * Race with xprt_unregister - we lose.
438 */
439 mtx_unlock(&grp->sg_lock);
440 return;
441 }
442
443 if (!xprt->xp_active) {
444 xprt->xp_active = TRUE;
445 if (xprt->xp_thread == NULL) {
446 if (!svc_request_space_available(xprt->xp_pool) ||
447 !xprt_assignthread(xprt))
448 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
449 xp_alink);
450 }
451 }
452
453 mtx_unlock(&grp->sg_lock);
454 }
455
456 void
457 xprt_inactive_locked(SVCXPRT *xprt)
458 {
459 SVCGROUP *grp = xprt->xp_group;
460
461 mtx_assert(&grp->sg_lock, MA_OWNED);
462 if (xprt->xp_active) {
463 if (xprt->xp_thread == NULL)
464 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
465 xprt->xp_active = FALSE;
466 }
467 }
468
469 void
470 xprt_inactive(SVCXPRT *xprt)
471 {
472 SVCGROUP *grp = xprt->xp_group;
473
474 mtx_lock(&grp->sg_lock);
475 xprt_inactive_locked(xprt);
476 mtx_unlock(&grp->sg_lock);
477 }
478
479 /*
480 * Variant of xprt_inactive() for use only when sure that port is
481 * assigned to thread. For example, within receive handlers.
482 */
483 void
484 xprt_inactive_self(SVCXPRT *xprt)
485 {
486
487 KASSERT(xprt->xp_thread != NULL,
488 ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
489 xprt->xp_active = FALSE;
490 }
491
492 /*
493 * Add a service program to the callout list.
494 * The dispatch routine will be called when a rpc request for this
495 * program number comes in.
496 */
497 bool_t
498 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
499 void (*dispatch)(struct svc_req *, SVCXPRT *),
500 const struct netconfig *nconf)
501 {
502 SVCPOOL *pool = xprt->xp_pool;
503 struct svc_callout *s;
504 char *netid = NULL;
505 int flag = 0;
506
507 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
508
509 if (xprt->xp_netid) {
510 netid = strdup(xprt->xp_netid, M_RPC);
511 flag = 1;
512 } else if (nconf && nconf->nc_netid) {
513 netid = strdup(nconf->nc_netid, M_RPC);
514 flag = 1;
515 } /* must have been created with svc_raw_create */
516 if ((netid == NULL) && (flag == 1)) {
517 return (FALSE);
518 }
519
520 mtx_lock(&pool->sp_lock);
521 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
522 if (netid)
523 free(netid, M_RPC);
524 if (s->sc_dispatch == dispatch)
525 goto rpcb_it; /* he is registering another xptr */
526 mtx_unlock(&pool->sp_lock);
527 return (FALSE);
528 }
529 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
530 if (s == NULL) {
531 if (netid)
532 free(netid, M_RPC);
533 mtx_unlock(&pool->sp_lock);
534 return (FALSE);
535 }
536
537 s->sc_prog = prog;
538 s->sc_vers = vers;
539 s->sc_dispatch = dispatch;
540 s->sc_netid = netid;
541 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
542
543 if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
544 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
545
546 rpcb_it:
547 mtx_unlock(&pool->sp_lock);
548 /* now register the information with the local binder service */
549 if (nconf) {
550 bool_t dummy;
551 struct netconfig tnc;
552 struct netbuf nb;
553 tnc = *nconf;
554 nb.buf = &xprt->xp_ltaddr;
555 nb.len = xprt->xp_ltaddr.ss_len;
556 dummy = rpcb_set(prog, vers, &tnc, &nb);
557 return (dummy);
558 }
559 return (TRUE);
560 }
561
562 /*
563 * Remove a service program from the callout list.
564 */
565 void
566 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
567 {
568 struct svc_callout *s;
569
570 /* unregister the information anyway */
571 (void) rpcb_unset(prog, vers, NULL);
572 mtx_lock(&pool->sp_lock);
573 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
574 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
575 if (s->sc_netid)
576 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
577 mem_free(s, sizeof (struct svc_callout));
578 }
579 mtx_unlock(&pool->sp_lock);
580 }
581
582 /*
583 * Add a service connection loss program to the callout list.
584 * The dispatch routine will be called when some port in ths pool die.
585 */
586 bool_t
587 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
588 {
589 SVCPOOL *pool = xprt->xp_pool;
590 struct svc_loss_callout *s;
591
592 mtx_lock(&pool->sp_lock);
593 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
594 if (s->slc_dispatch == dispatch)
595 break;
596 }
597 if (s != NULL) {
598 mtx_unlock(&pool->sp_lock);
599 return (TRUE);
600 }
601 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
602 if (s == NULL) {
603 mtx_unlock(&pool->sp_lock);
604 return (FALSE);
605 }
606 s->slc_dispatch = dispatch;
607 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
608 mtx_unlock(&pool->sp_lock);
609 return (TRUE);
610 }
611
612 /*
613 * Remove a service connection loss program from the callout list.
614 */
615 void
616 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
617 {
618 struct svc_loss_callout *s;
619
620 mtx_lock(&pool->sp_lock);
621 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
622 if (s->slc_dispatch == dispatch) {
623 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
624 free(s, M_RPC);
625 break;
626 }
627 }
628 mtx_unlock(&pool->sp_lock);
629 }
630
631 /* ********************** CALLOUT list related stuff ************* */
632
633 /*
634 * Search the callout list for a program number, return the callout
635 * struct.
636 */
637 static struct svc_callout *
638 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
639 {
640 struct svc_callout *s;
641
642 mtx_assert(&pool->sp_lock, MA_OWNED);
643 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
644 if (s->sc_prog == prog && s->sc_vers == vers
645 && (netid == NULL || s->sc_netid == NULL ||
646 strcmp(netid, s->sc_netid) == 0))
647 break;
648 }
649
650 return (s);
651 }
652
653 /* ******************* REPLY GENERATION ROUTINES ************ */
654
655 static bool_t
656 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
657 struct mbuf *body)
658 {
659 SVCXPRT *xprt = rqstp->rq_xprt;
660 bool_t ok;
661
662 if (rqstp->rq_args) {
663 m_freem(rqstp->rq_args);
664 rqstp->rq_args = NULL;
665 }
666
667 if (xprt->xp_pool->sp_rcache)
668 replay_setreply(xprt->xp_pool->sp_rcache,
669 rply, svc_getrpccaller(rqstp), body);
670
671 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
672 return (FALSE);
673
674 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
675 if (rqstp->rq_addr) {
676 free(rqstp->rq_addr, M_SONAME);
677 rqstp->rq_addr = NULL;
678 }
679
680 return (ok);
681 }
682
683 /*
684 * Send a reply to an rpc request
685 */
686 bool_t
687 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
688 {
689 struct rpc_msg rply;
690 struct mbuf *m;
691 XDR xdrs;
692 bool_t ok;
693
694 rply.rm_xid = rqstp->rq_xid;
695 rply.rm_direction = REPLY;
696 rply.rm_reply.rp_stat = MSG_ACCEPTED;
697 rply.acpted_rply.ar_verf = rqstp->rq_verf;
698 rply.acpted_rply.ar_stat = SUCCESS;
699 rply.acpted_rply.ar_results.where = NULL;
700 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
701
702 m = m_getcl(M_WAITOK, MT_DATA, 0);
703 xdrmbuf_create(&xdrs, m, XDR_ENCODE);
704 ok = xdr_results(&xdrs, xdr_location);
705 XDR_DESTROY(&xdrs);
706
707 if (ok) {
708 return (svc_sendreply_common(rqstp, &rply, m));
709 } else {
710 m_freem(m);
711 return (FALSE);
712 }
713 }
714
715 bool_t
716 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
717 {
718 struct rpc_msg rply;
719
720 rply.rm_xid = rqstp->rq_xid;
721 rply.rm_direction = REPLY;
722 rply.rm_reply.rp_stat = MSG_ACCEPTED;
723 rply.acpted_rply.ar_verf = rqstp->rq_verf;
724 rply.acpted_rply.ar_stat = SUCCESS;
725 rply.acpted_rply.ar_results.where = NULL;
726 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
727
728 return (svc_sendreply_common(rqstp, &rply, m));
729 }
730
731 /*
732 * No procedure error reply
733 */
734 void
735 svcerr_noproc(struct svc_req *rqstp)
736 {
737 SVCXPRT *xprt = rqstp->rq_xprt;
738 struct rpc_msg rply;
739
740 rply.rm_xid = rqstp->rq_xid;
741 rply.rm_direction = REPLY;
742 rply.rm_reply.rp_stat = MSG_ACCEPTED;
743 rply.acpted_rply.ar_verf = rqstp->rq_verf;
744 rply.acpted_rply.ar_stat = PROC_UNAVAIL;
745
746 if (xprt->xp_pool->sp_rcache)
747 replay_setreply(xprt->xp_pool->sp_rcache,
748 &rply, svc_getrpccaller(rqstp), NULL);
749
750 svc_sendreply_common(rqstp, &rply, NULL);
751 }
752
753 /*
754 * Can't decode args error reply
755 */
756 void
757 svcerr_decode(struct svc_req *rqstp)
758 {
759 SVCXPRT *xprt = rqstp->rq_xprt;
760 struct rpc_msg rply;
761
762 rply.rm_xid = rqstp->rq_xid;
763 rply.rm_direction = REPLY;
764 rply.rm_reply.rp_stat = MSG_ACCEPTED;
765 rply.acpted_rply.ar_verf = rqstp->rq_verf;
766 rply.acpted_rply.ar_stat = GARBAGE_ARGS;
767
768 if (xprt->xp_pool->sp_rcache)
769 replay_setreply(xprt->xp_pool->sp_rcache,
770 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
771
772 svc_sendreply_common(rqstp, &rply, NULL);
773 }
774
775 /*
776 * Some system error
777 */
778 void
779 svcerr_systemerr(struct svc_req *rqstp)
780 {
781 SVCXPRT *xprt = rqstp->rq_xprt;
782 struct rpc_msg rply;
783
784 rply.rm_xid = rqstp->rq_xid;
785 rply.rm_direction = REPLY;
786 rply.rm_reply.rp_stat = MSG_ACCEPTED;
787 rply.acpted_rply.ar_verf = rqstp->rq_verf;
788 rply.acpted_rply.ar_stat = SYSTEM_ERR;
789
790 if (xprt->xp_pool->sp_rcache)
791 replay_setreply(xprt->xp_pool->sp_rcache,
792 &rply, svc_getrpccaller(rqstp), NULL);
793
794 svc_sendreply_common(rqstp, &rply, NULL);
795 }
796
797 /*
798 * Authentication error reply
799 */
800 void
801 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
802 {
803 SVCXPRT *xprt = rqstp->rq_xprt;
804 struct rpc_msg rply;
805
806 rply.rm_xid = rqstp->rq_xid;
807 rply.rm_direction = REPLY;
808 rply.rm_reply.rp_stat = MSG_DENIED;
809 rply.rjcted_rply.rj_stat = AUTH_ERROR;
810 rply.rjcted_rply.rj_why = why;
811
812 if (xprt->xp_pool->sp_rcache)
813 replay_setreply(xprt->xp_pool->sp_rcache,
814 &rply, svc_getrpccaller(rqstp), NULL);
815
816 svc_sendreply_common(rqstp, &rply, NULL);
817 }
818
819 /*
820 * Auth too weak error reply
821 */
822 void
823 svcerr_weakauth(struct svc_req *rqstp)
824 {
825
826 svcerr_auth(rqstp, AUTH_TOOWEAK);
827 }
828
829 /*
830 * Program unavailable error reply
831 */
832 void
833 svcerr_noprog(struct svc_req *rqstp)
834 {
835 SVCXPRT *xprt = rqstp->rq_xprt;
836 struct rpc_msg rply;
837
838 rply.rm_xid = rqstp->rq_xid;
839 rply.rm_direction = REPLY;
840 rply.rm_reply.rp_stat = MSG_ACCEPTED;
841 rply.acpted_rply.ar_verf = rqstp->rq_verf;
842 rply.acpted_rply.ar_stat = PROG_UNAVAIL;
843
844 if (xprt->xp_pool->sp_rcache)
845 replay_setreply(xprt->xp_pool->sp_rcache,
846 &rply, svc_getrpccaller(rqstp), NULL);
847
848 svc_sendreply_common(rqstp, &rply, NULL);
849 }
850
851 /*
852 * Program version mismatch error reply
853 */
854 void
855 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
856 {
857 SVCXPRT *xprt = rqstp->rq_xprt;
858 struct rpc_msg rply;
859
860 rply.rm_xid = rqstp->rq_xid;
861 rply.rm_direction = REPLY;
862 rply.rm_reply.rp_stat = MSG_ACCEPTED;
863 rply.acpted_rply.ar_verf = rqstp->rq_verf;
864 rply.acpted_rply.ar_stat = PROG_MISMATCH;
865 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
866 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
867
868 if (xprt->xp_pool->sp_rcache)
869 replay_setreply(xprt->xp_pool->sp_rcache,
870 &rply, svc_getrpccaller(rqstp), NULL);
871
872 svc_sendreply_common(rqstp, &rply, NULL);
873 }
874
875 /*
876 * Allocate a new server transport structure. All fields are
877 * initialized to zero and xp_p3 is initialized to point at an
878 * extension structure to hold various flags and authentication
879 * parameters.
880 */
881 SVCXPRT *
882 svc_xprt_alloc(void)
883 {
884 SVCXPRT *xprt;
885 SVCXPRT_EXT *ext;
886
887 xprt = mem_alloc(sizeof(SVCXPRT));
888 ext = mem_alloc(sizeof(SVCXPRT_EXT));
889 xprt->xp_p3 = ext;
890 refcount_init(&xprt->xp_refs, 1);
891
892 return (xprt);
893 }
894
895 /*
896 * Free a server transport structure.
897 */
898 void
899 svc_xprt_free(SVCXPRT *xprt)
900 {
901
902 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
903 mem_free(xprt, sizeof(SVCXPRT));
904 }
905
906 /* ******************* SERVER INPUT STUFF ******************* */
907
908 /*
909 * Read RPC requests from a transport and queue them to be
910 * executed. We handle authentication and replay cache replies here.
911 * Actually dispatching the RPC is deferred till svc_executereq.
912 */
913 static enum xprt_stat
914 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
915 {
916 SVCPOOL *pool = xprt->xp_pool;
917 struct svc_req *r;
918 struct rpc_msg msg;
919 struct mbuf *args;
920 struct svc_loss_callout *s;
921 enum xprt_stat stat;
922
923 /* now receive msgs from xprtprt (support batch calls) */
924 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
925
926 msg.rm_call.cb_cred.oa_base = r->rq_credarea;
927 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
928 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
929 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
930 enum auth_stat why;
931
932 /*
933 * Handle replays and authenticate before queuing the
934 * request to be executed.
935 */
936 SVC_ACQUIRE(xprt);
937 r->rq_xprt = xprt;
938 if (pool->sp_rcache) {
939 struct rpc_msg repmsg;
940 struct mbuf *repbody;
941 enum replay_state rs;
942 rs = replay_find(pool->sp_rcache, &msg,
943 svc_getrpccaller(r), &repmsg, &repbody);
944 switch (rs) {
945 case RS_NEW:
946 break;
947 case RS_DONE:
948 SVC_REPLY(xprt, &repmsg, r->rq_addr,
949 repbody, &r->rq_reply_seq);
950 if (r->rq_addr) {
951 free(r->rq_addr, M_SONAME);
952 r->rq_addr = NULL;
953 }
954 m_freem(args);
955 goto call_done;
956
957 default:
958 m_freem(args);
959 goto call_done;
960 }
961 }
962
963 r->rq_xid = msg.rm_xid;
964 r->rq_prog = msg.rm_call.cb_prog;
965 r->rq_vers = msg.rm_call.cb_vers;
966 r->rq_proc = msg.rm_call.cb_proc;
967 r->rq_size = sizeof(*r) + m_length(args, NULL);
968 r->rq_args = args;
969 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
970 /*
971 * RPCSEC_GSS uses this return code
972 * for requests that form part of its
973 * context establishment protocol and
974 * should not be dispatched to the
975 * application.
976 */
977 if (why != RPCSEC_GSS_NODISPATCH)
978 svcerr_auth(r, why);
979 goto call_done;
980 }
981
982 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
983 svcerr_decode(r);
984 goto call_done;
985 }
986
987 /*
988 * Everything checks out, return request to caller.
989 */
990 *rqstp_ret = r;
991 r = NULL;
992 }
993 call_done:
994 if (r) {
995 svc_freereq(r);
996 r = NULL;
997 }
998 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
999 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1000 (*s->slc_dispatch)(xprt);
1001 xprt_unregister(xprt);
1002 }
1003
1004 return (stat);
1005 }
1006
1007 static void
1008 svc_executereq(struct svc_req *rqstp)
1009 {
1010 SVCXPRT *xprt = rqstp->rq_xprt;
1011 SVCPOOL *pool = xprt->xp_pool;
1012 int prog_found;
1013 rpcvers_t low_vers;
1014 rpcvers_t high_vers;
1015 struct svc_callout *s;
1016
1017 /* now match message with a registered service*/
1018 prog_found = FALSE;
1019 low_vers = (rpcvers_t) -1L;
1020 high_vers = (rpcvers_t) 0L;
1021 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1022 if (s->sc_prog == rqstp->rq_prog) {
1023 if (s->sc_vers == rqstp->rq_vers) {
1024 /*
1025 * We hand ownership of r to the
1026 * dispatch method - they must call
1027 * svc_freereq.
1028 */
1029 (*s->sc_dispatch)(rqstp, xprt);
1030 return;
1031 } /* found correct version */
1032 prog_found = TRUE;
1033 if (s->sc_vers < low_vers)
1034 low_vers = s->sc_vers;
1035 if (s->sc_vers > high_vers)
1036 high_vers = s->sc_vers;
1037 } /* found correct program */
1038 }
1039
1040 /*
1041 * if we got here, the program or version
1042 * is not served ...
1043 */
1044 if (prog_found)
1045 svcerr_progvers(rqstp, low_vers, high_vers);
1046 else
1047 svcerr_noprog(rqstp);
1048
1049 svc_freereq(rqstp);
1050 }
1051
1052 static void
1053 svc_checkidle(SVCGROUP *grp)
1054 {
1055 SVCXPRT *xprt, *nxprt;
1056 time_t timo;
1057 struct svcxprt_list cleanup;
1058
1059 TAILQ_INIT(&cleanup);
1060 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1061 /*
1062 * Only some transports have idle timers. Don't time
1063 * something out which is just waking up.
1064 */
1065 if (!xprt->xp_idletimeout || xprt->xp_thread)
1066 continue;
1067
1068 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1069 if (time_uptime > timo) {
1070 xprt_unregister_locked(xprt);
1071 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1072 }
1073 }
1074
1075 mtx_unlock(&grp->sg_lock);
1076 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1077 SVC_RELEASE(xprt);
1078 }
1079 mtx_lock(&grp->sg_lock);
1080 }
1081
1082 static void
1083 svc_assign_waiting_sockets(SVCPOOL *pool)
1084 {
1085 SVCGROUP *grp;
1086 SVCXPRT *xprt;
1087 int g;
1088
1089 for (g = 0; g < pool->sp_groupcount; g++) {
1090 grp = &pool->sp_groups[g];
1091 mtx_lock(&grp->sg_lock);
1092 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1093 if (xprt_assignthread(xprt))
1094 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1095 else
1096 break;
1097 }
1098 mtx_unlock(&grp->sg_lock);
1099 }
1100 }
1101
1102 static void
1103 svc_change_space_used(SVCPOOL *pool, long delta)
1104 {
1105 unsigned long value;
1106
1107 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1108 if (delta > 0) {
1109 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1110 pool->sp_space_throttled = TRUE;
1111 pool->sp_space_throttle_count++;
1112 }
1113 if (value > pool->sp_space_used_highest)
1114 pool->sp_space_used_highest = value;
1115 } else {
1116 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1117 pool->sp_space_throttled = FALSE;
1118 svc_assign_waiting_sockets(pool);
1119 }
1120 }
1121 }
1122
1123 static bool_t
1124 svc_request_space_available(SVCPOOL *pool)
1125 {
1126
1127 if (pool->sp_space_throttled)
1128 return (FALSE);
1129 return (TRUE);
1130 }
1131
1132 static void
1133 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1134 {
1135 SVCPOOL *pool = grp->sg_pool;
1136 SVCTHREAD *st, *stpref;
1137 SVCXPRT *xprt;
1138 enum xprt_stat stat;
1139 struct svc_req *rqstp;
1140 struct proc *p;
1141 long sz;
1142 int error;
1143
1144 st = mem_alloc(sizeof(*st));
1145 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1146 st->st_pool = pool;
1147 st->st_xprt = NULL;
1148 STAILQ_INIT(&st->st_reqs);
1149 cv_init(&st->st_cond, "rpcsvc");
1150
1151 mtx_lock(&grp->sg_lock);
1152
1153 /*
1154 * If we are a new thread which was spawned to cope with
1155 * increased load, set the state back to SVCPOOL_ACTIVE.
1156 */
1157 if (grp->sg_state == SVCPOOL_THREADSTARTING)
1158 grp->sg_state = SVCPOOL_ACTIVE;
1159
1160 while (grp->sg_state != SVCPOOL_CLOSING) {
1161 /*
1162 * Create new thread if requested.
1163 */
1164 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1165 grp->sg_state = SVCPOOL_THREADSTARTING;
1166 grp->sg_lastcreatetime = time_uptime;
1167 mtx_unlock(&grp->sg_lock);
1168 svc_new_thread(grp);
1169 mtx_lock(&grp->sg_lock);
1170 continue;
1171 }
1172
1173 /*
1174 * Check for idle transports once per second.
1175 */
1176 if (time_uptime > grp->sg_lastidlecheck) {
1177 grp->sg_lastidlecheck = time_uptime;
1178 svc_checkidle(grp);
1179 }
1180
1181 xprt = st->st_xprt;
1182 if (!xprt) {
1183 /*
1184 * Enforce maxthreads count.
1185 */
1186 if (grp->sg_threadcount > grp->sg_maxthreads)
1187 break;
1188
1189 /*
1190 * Before sleeping, see if we can find an
1191 * active transport which isn't being serviced
1192 * by a thread.
1193 */
1194 if (svc_request_space_available(pool) &&
1195 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1196 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1197 SVC_ACQUIRE(xprt);
1198 xprt->xp_thread = st;
1199 st->st_xprt = xprt;
1200 continue;
1201 }
1202
1203 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1204 if (ismaster || (!ismaster &&
1205 grp->sg_threadcount > grp->sg_minthreads))
1206 error = cv_timedwait_sig(&st->st_cond,
1207 &grp->sg_lock, 5 * hz);
1208 else
1209 error = cv_wait_sig(&st->st_cond,
1210 &grp->sg_lock);
1211 if (st->st_xprt == NULL)
1212 LIST_REMOVE(st, st_ilink);
1213
1214 /*
1215 * Reduce worker thread count when idle.
1216 */
1217 if (error == EWOULDBLOCK) {
1218 if (!ismaster
1219 && (grp->sg_threadcount
1220 > grp->sg_minthreads)
1221 && !st->st_xprt)
1222 break;
1223 } else if (error != 0) {
1224 KASSERT(error == EINTR || error == ERESTART,
1225 ("non-signal error %d", error));
1226 mtx_unlock(&grp->sg_lock);
1227 p = curproc;
1228 PROC_LOCK(p);
1229 if (P_SHOULDSTOP(p) ||
1230 (p->p_flag & P_TOTAL_STOP) != 0) {
1231 thread_suspend_check(0);
1232 PROC_UNLOCK(p);
1233 mtx_lock(&grp->sg_lock);
1234 } else {
1235 PROC_UNLOCK(p);
1236 svc_exit(pool);
1237 mtx_lock(&grp->sg_lock);
1238 break;
1239 }
1240 }
1241 continue;
1242 }
1243 mtx_unlock(&grp->sg_lock);
1244
1245 /*
1246 * Drain the transport socket and queue up any RPCs.
1247 */
1248 xprt->xp_lastactive = time_uptime;
1249 do {
1250 if (!svc_request_space_available(pool))
1251 break;
1252 rqstp = NULL;
1253 stat = svc_getreq(xprt, &rqstp);
1254 if (rqstp) {
1255 svc_change_space_used(pool, rqstp->rq_size);
1256 /*
1257 * See if the application has a preference
1258 * for some other thread.
1259 */
1260 if (pool->sp_assign) {
1261 stpref = pool->sp_assign(st, rqstp);
1262 rqstp->rq_thread = stpref;
1263 STAILQ_INSERT_TAIL(&stpref->st_reqs,
1264 rqstp, rq_link);
1265 mtx_unlock(&stpref->st_lock);
1266 if (stpref != st)
1267 rqstp = NULL;
1268 } else {
1269 rqstp->rq_thread = st;
1270 STAILQ_INSERT_TAIL(&st->st_reqs,
1271 rqstp, rq_link);
1272 }
1273 }
1274 } while (rqstp == NULL && stat == XPRT_MOREREQS
1275 && grp->sg_state != SVCPOOL_CLOSING);
1276
1277 /*
1278 * Move this transport to the end of the active list to
1279 * ensure fairness when multiple transports are active.
1280 * If this was the last queued request, svc_getreq will end
1281 * up calling xprt_inactive to remove from the active list.
1282 */
1283 mtx_lock(&grp->sg_lock);
1284 xprt->xp_thread = NULL;
1285 st->st_xprt = NULL;
1286 if (xprt->xp_active) {
1287 if (!svc_request_space_available(pool) ||
1288 !xprt_assignthread(xprt))
1289 TAILQ_INSERT_TAIL(&grp->sg_active,
1290 xprt, xp_alink);
1291 }
1292 mtx_unlock(&grp->sg_lock);
1293 SVC_RELEASE(xprt);
1294
1295 /*
1296 * Execute what we have queued.
1297 */
1298 mtx_lock(&st->st_lock);
1299 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1300 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1301 mtx_unlock(&st->st_lock);
1302 sz = (long)rqstp->rq_size;
1303 svc_executereq(rqstp);
1304 svc_change_space_used(pool, -sz);
1305 mtx_lock(&st->st_lock);
1306 }
1307 mtx_unlock(&st->st_lock);
1308 mtx_lock(&grp->sg_lock);
1309 }
1310
1311 if (st->st_xprt) {
1312 xprt = st->st_xprt;
1313 st->st_xprt = NULL;
1314 SVC_RELEASE(xprt);
1315 }
1316 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1317 mtx_destroy(&st->st_lock);
1318 cv_destroy(&st->st_cond);
1319 mem_free(st, sizeof(*st));
1320
1321 grp->sg_threadcount--;
1322 if (!ismaster)
1323 wakeup(grp);
1324 mtx_unlock(&grp->sg_lock);
1325 }
1326
1327 static void
1328 svc_thread_start(void *arg)
1329 {
1330
1331 svc_run_internal((SVCGROUP *) arg, FALSE);
1332 kthread_exit();
1333 }
1334
1335 static void
1336 svc_new_thread(SVCGROUP *grp)
1337 {
1338 SVCPOOL *pool = grp->sg_pool;
1339 struct thread *td;
1340
1341 mtx_lock(&grp->sg_lock);
1342 grp->sg_threadcount++;
1343 mtx_unlock(&grp->sg_lock);
1344 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1345 "%s: service", pool->sp_name);
1346 }
1347
1348 void
1349 svc_run(SVCPOOL *pool)
1350 {
1351 int g, i;
1352 struct proc *p;
1353 struct thread *td;
1354 SVCGROUP *grp;
1355
1356 p = curproc;
1357 td = curthread;
1358 snprintf(td->td_name, sizeof(td->td_name),
1359 "%s: master", pool->sp_name);
1360 pool->sp_state = SVCPOOL_ACTIVE;
1361 pool->sp_proc = p;
1362
1363 /* Choose group count based on number of threads and CPUs. */
1364 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1365 min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1366 for (g = 0; g < pool->sp_groupcount; g++) {
1367 grp = &pool->sp_groups[g];
1368 grp->sg_minthreads = max(1,
1369 pool->sp_minthreads / pool->sp_groupcount);
1370 grp->sg_maxthreads = max(1,
1371 pool->sp_maxthreads / pool->sp_groupcount);
1372 grp->sg_lastcreatetime = time_uptime;
1373 }
1374
1375 /* Starting threads */
1376 pool->sp_groups[0].sg_threadcount++;
1377 for (g = 0; g < pool->sp_groupcount; g++) {
1378 grp = &pool->sp_groups[g];
1379 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1380 svc_new_thread(grp);
1381 }
1382 svc_run_internal(&pool->sp_groups[0], TRUE);
1383
1384 /* Waiting for threads to stop. */
1385 for (g = 0; g < pool->sp_groupcount; g++) {
1386 grp = &pool->sp_groups[g];
1387 mtx_lock(&grp->sg_lock);
1388 while (grp->sg_threadcount > 0)
1389 msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1390 mtx_unlock(&grp->sg_lock);
1391 }
1392 }
1393
1394 void
1395 svc_exit(SVCPOOL *pool)
1396 {
1397 SVCGROUP *grp;
1398 SVCTHREAD *st;
1399 int g;
1400
1401 pool->sp_state = SVCPOOL_CLOSING;
1402 for (g = 0; g < pool->sp_groupcount; g++) {
1403 grp = &pool->sp_groups[g];
1404 mtx_lock(&grp->sg_lock);
1405 if (grp->sg_state != SVCPOOL_CLOSING) {
1406 grp->sg_state = SVCPOOL_CLOSING;
1407 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1408 cv_signal(&st->st_cond);
1409 }
1410 mtx_unlock(&grp->sg_lock);
1411 }
1412 }
1413
1414 bool_t
1415 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1416 {
1417 struct mbuf *m;
1418 XDR xdrs;
1419 bool_t stat;
1420
1421 m = rqstp->rq_args;
1422 rqstp->rq_args = NULL;
1423
1424 xdrmbuf_create(&xdrs, m, XDR_DECODE);
1425 stat = xargs(&xdrs, args);
1426 XDR_DESTROY(&xdrs);
1427
1428 return (stat);
1429 }
1430
1431 bool_t
1432 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1433 {
1434 XDR xdrs;
1435
1436 if (rqstp->rq_addr) {
1437 free(rqstp->rq_addr, M_SONAME);
1438 rqstp->rq_addr = NULL;
1439 }
1440
1441 xdrs.x_op = XDR_FREE;
1442 return (xargs(&xdrs, args));
1443 }
1444
1445 void
1446 svc_freereq(struct svc_req *rqstp)
1447 {
1448 SVCTHREAD *st;
1449 SVCPOOL *pool;
1450
1451 st = rqstp->rq_thread;
1452 if (st) {
1453 pool = st->st_pool;
1454 if (pool->sp_done)
1455 pool->sp_done(st, rqstp);
1456 }
1457
1458 if (rqstp->rq_auth.svc_ah_ops)
1459 SVCAUTH_RELEASE(&rqstp->rq_auth);
1460
1461 if (rqstp->rq_xprt) {
1462 SVC_RELEASE(rqstp->rq_xprt);
1463 }
1464
1465 if (rqstp->rq_addr)
1466 free(rqstp->rq_addr, M_SONAME);
1467
1468 if (rqstp->rq_args)
1469 m_freem(rqstp->rq_args);
1470
1471 free(rqstp, M_RPC);
1472 }
Cache object: f5c568a390dd41f06727ee26062433d7
|