FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc.c
1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
2
3 /*-
4 * SPDX-License-Identifier: BSD-3-Clause
5 *
6 * Copyright (c) 2009, Sun Microsystems, Inc.
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 * - Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * - Redistributions in binary form must reproduce the above copyright notice,
14 * this list of conditions and the following disclaimer in the documentation
15 * and/or other materials provided with the distribution.
16 * - Neither the name of Sun Microsystems, Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 */
32
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39
40 /*
41 * svc.c, Server-side remote procedure call interface.
42 *
43 * There are two sets of procedures here. The xprt routines are
44 * for handling transport handles. The svc routines handle the
45 * list of service routines.
46 *
47 * Copyright (C) 1984, Sun Microsystems, Inc.
48 */
49
50 #include <sys/param.h>
51 #include <sys/lock.h>
52 #include <sys/kernel.h>
53 #include <sys/kthread.h>
54 #include <sys/malloc.h>
55 #include <sys/mbuf.h>
56 #include <sys/mutex.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/socketvar.h>
60 #include <sys/systm.h>
61 #include <sys/smp.h>
62 #include <sys/sx.h>
63 #include <sys/ucred.h>
64
65 #include <rpc/rpc.h>
66 #include <rpc/rpcb_clnt.h>
67 #include <rpc/replay.h>
68
69 #include <rpc/rpc_com.h>
70
71 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
72 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
73
74 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
75 char *);
76 static void svc_new_thread(SVCGROUP *grp);
77 static void xprt_unregister_locked(SVCXPRT *xprt);
78 static void svc_change_space_used(SVCPOOL *pool, long delta);
79 static bool_t svc_request_space_available(SVCPOOL *pool);
80 static void svcpool_cleanup(SVCPOOL *pool);
81
82 /* *************** SVCXPRT related stuff **************** */
83
84 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
85 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
86 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
87
88 SVCPOOL*
89 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
90 {
91 SVCPOOL *pool;
92 SVCGROUP *grp;
93 int g;
94
95 pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
96
97 mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
98 pool->sp_name = name;
99 pool->sp_state = SVCPOOL_INIT;
100 pool->sp_proc = NULL;
101 TAILQ_INIT(&pool->sp_callouts);
102 TAILQ_INIT(&pool->sp_lcallouts);
103 pool->sp_minthreads = 1;
104 pool->sp_maxthreads = 1;
105 pool->sp_groupcount = 1;
106 for (g = 0; g < SVC_MAXGROUPS; g++) {
107 grp = &pool->sp_groups[g];
108 mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
109 grp->sg_pool = pool;
110 grp->sg_state = SVCPOOL_ACTIVE;
111 TAILQ_INIT(&grp->sg_xlist);
112 TAILQ_INIT(&grp->sg_active);
113 LIST_INIT(&grp->sg_idlethreads);
114 grp->sg_minthreads = 1;
115 grp->sg_maxthreads = 1;
116 }
117
118 /*
119 * Don't use more than a quarter of mbuf clusters. Nota bene:
120 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
121 * on LP64 architectures, so cast to u_long to avoid undefined
122 * behavior. (ILP32 architectures cannot have nmbclusters
123 * large enough to overflow for other reasons.)
124 */
125 pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
126 pool->sp_space_low = (pool->sp_space_high / 3) * 2;
127
128 sysctl_ctx_init(&pool->sp_sysctl);
129 if (sysctl_base) {
130 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
131 "minthreads", CTLTYPE_INT | CTLFLAG_RW,
132 pool, 0, svcpool_minthread_sysctl, "I",
133 "Minimal number of threads");
134 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
135 "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
136 pool, 0, svcpool_maxthread_sysctl, "I",
137 "Maximal number of threads");
138 SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
139 "threads", CTLTYPE_INT | CTLFLAG_RD,
140 pool, 0, svcpool_threads_sysctl, "I",
141 "Current number of threads");
142 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
143 "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
144 "Number of thread groups");
145
146 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147 "request_space_used", CTLFLAG_RD,
148 &pool->sp_space_used,
149 "Space in parsed but not handled requests.");
150
151 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
152 "request_space_used_highest", CTLFLAG_RD,
153 &pool->sp_space_used_highest,
154 "Highest space used since reboot.");
155
156 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
157 "request_space_high", CTLFLAG_RW,
158 &pool->sp_space_high,
159 "Maximum space in parsed but not handled requests.");
160
161 SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
162 "request_space_low", CTLFLAG_RW,
163 &pool->sp_space_low,
164 "Low water mark for request space.");
165
166 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
167 "request_space_throttled", CTLFLAG_RD,
168 &pool->sp_space_throttled, 0,
169 "Whether nfs requests are currently throttled");
170
171 SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
172 "request_space_throttle_count", CTLFLAG_RD,
173 &pool->sp_space_throttle_count, 0,
174 "Count of times throttling based on request space has occurred");
175 }
176
177 return pool;
178 }
179
180 /*
181 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
182 * the pool data structures.
183 */
184 static void
185 svcpool_cleanup(SVCPOOL *pool)
186 {
187 SVCGROUP *grp;
188 SVCXPRT *xprt, *nxprt;
189 struct svc_callout *s;
190 struct svc_loss_callout *sl;
191 struct svcxprt_list cleanup;
192 int g;
193
194 TAILQ_INIT(&cleanup);
195
196 for (g = 0; g < SVC_MAXGROUPS; g++) {
197 grp = &pool->sp_groups[g];
198 mtx_lock(&grp->sg_lock);
199 while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
200 xprt_unregister_locked(xprt);
201 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
202 }
203 mtx_unlock(&grp->sg_lock);
204 }
205 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
206 if (xprt->xp_socket != NULL)
207 soshutdown(xprt->xp_socket, SHUT_WR);
208 SVC_RELEASE(xprt);
209 }
210
211 mtx_lock(&pool->sp_lock);
212 while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
213 mtx_unlock(&pool->sp_lock);
214 svc_unreg(pool, s->sc_prog, s->sc_vers);
215 mtx_lock(&pool->sp_lock);
216 }
217 while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
218 mtx_unlock(&pool->sp_lock);
219 svc_loss_unreg(pool, sl->slc_dispatch);
220 mtx_lock(&pool->sp_lock);
221 }
222 mtx_unlock(&pool->sp_lock);
223 }
224
225 void
226 svcpool_destroy(SVCPOOL *pool)
227 {
228 SVCGROUP *grp;
229 int g;
230
231 svcpool_cleanup(pool);
232
233 for (g = 0; g < SVC_MAXGROUPS; g++) {
234 grp = &pool->sp_groups[g];
235 mtx_destroy(&grp->sg_lock);
236 }
237 mtx_destroy(&pool->sp_lock);
238
239 if (pool->sp_rcache)
240 replay_freecache(pool->sp_rcache);
241
242 sysctl_ctx_free(&pool->sp_sysctl);
243 free(pool, M_RPC);
244 }
245
246 /*
247 * Similar to svcpool_destroy(), except that it does not destroy the actual
248 * data structures. As such, "pool" may be used again.
249 */
250 void
251 svcpool_close(SVCPOOL *pool)
252 {
253 SVCGROUP *grp;
254 int g;
255
256 svcpool_cleanup(pool);
257
258 /* Now, initialize the pool's state for a fresh svc_run() call. */
259 mtx_lock(&pool->sp_lock);
260 pool->sp_state = SVCPOOL_INIT;
261 mtx_unlock(&pool->sp_lock);
262 for (g = 0; g < SVC_MAXGROUPS; g++) {
263 grp = &pool->sp_groups[g];
264 mtx_lock(&grp->sg_lock);
265 grp->sg_state = SVCPOOL_ACTIVE;
266 mtx_unlock(&grp->sg_lock);
267 }
268 }
269
270 /*
271 * Sysctl handler to get the present thread count on a pool
272 */
273 static int
274 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
275 {
276 SVCPOOL *pool;
277 int threads, error, g;
278
279 pool = oidp->oid_arg1;
280 threads = 0;
281 mtx_lock(&pool->sp_lock);
282 for (g = 0; g < pool->sp_groupcount; g++)
283 threads += pool->sp_groups[g].sg_threadcount;
284 mtx_unlock(&pool->sp_lock);
285 error = sysctl_handle_int(oidp, &threads, 0, req);
286 return (error);
287 }
288
289 /*
290 * Sysctl handler to set the minimum thread count on a pool
291 */
292 static int
293 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
294 {
295 SVCPOOL *pool;
296 int newminthreads, error, g;
297
298 pool = oidp->oid_arg1;
299 newminthreads = pool->sp_minthreads;
300 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
301 if (error == 0 && newminthreads != pool->sp_minthreads) {
302 if (newminthreads > pool->sp_maxthreads)
303 return (EINVAL);
304 mtx_lock(&pool->sp_lock);
305 pool->sp_minthreads = newminthreads;
306 for (g = 0; g < pool->sp_groupcount; g++) {
307 pool->sp_groups[g].sg_minthreads = max(1,
308 pool->sp_minthreads / pool->sp_groupcount);
309 }
310 mtx_unlock(&pool->sp_lock);
311 }
312 return (error);
313 }
314
315 /*
316 * Sysctl handler to set the maximum thread count on a pool
317 */
318 static int
319 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
320 {
321 SVCPOOL *pool;
322 int newmaxthreads, error, g;
323
324 pool = oidp->oid_arg1;
325 newmaxthreads = pool->sp_maxthreads;
326 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
327 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
328 if (newmaxthreads < pool->sp_minthreads)
329 return (EINVAL);
330 mtx_lock(&pool->sp_lock);
331 pool->sp_maxthreads = newmaxthreads;
332 for (g = 0; g < pool->sp_groupcount; g++) {
333 pool->sp_groups[g].sg_maxthreads = max(1,
334 pool->sp_maxthreads / pool->sp_groupcount);
335 }
336 mtx_unlock(&pool->sp_lock);
337 }
338 return (error);
339 }
340
341 /*
342 * Activate a transport handle.
343 */
344 void
345 xprt_register(SVCXPRT *xprt)
346 {
347 SVCPOOL *pool = xprt->xp_pool;
348 SVCGROUP *grp;
349 int g;
350
351 SVC_ACQUIRE(xprt);
352 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
353 xprt->xp_group = grp = &pool->sp_groups[g];
354 mtx_lock(&grp->sg_lock);
355 xprt->xp_registered = TRUE;
356 xprt->xp_active = FALSE;
357 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
358 mtx_unlock(&grp->sg_lock);
359 }
360
361 /*
362 * De-activate a transport handle. Note: the locked version doesn't
363 * release the transport - caller must do that after dropping the pool
364 * lock.
365 */
366 static void
367 xprt_unregister_locked(SVCXPRT *xprt)
368 {
369 SVCGROUP *grp = xprt->xp_group;
370
371 mtx_assert(&grp->sg_lock, MA_OWNED);
372 KASSERT(xprt->xp_registered == TRUE,
373 ("xprt_unregister_locked: not registered"));
374 xprt_inactive_locked(xprt);
375 TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
376 xprt->xp_registered = FALSE;
377 }
378
379 void
380 xprt_unregister(SVCXPRT *xprt)
381 {
382 SVCGROUP *grp = xprt->xp_group;
383
384 mtx_lock(&grp->sg_lock);
385 if (xprt->xp_registered == FALSE) {
386 /* Already unregistered by another thread */
387 mtx_unlock(&grp->sg_lock);
388 return;
389 }
390 xprt_unregister_locked(xprt);
391 mtx_unlock(&grp->sg_lock);
392
393 if (xprt->xp_socket != NULL)
394 soshutdown(xprt->xp_socket, SHUT_WR);
395 SVC_RELEASE(xprt);
396 }
397
398 /*
399 * Attempt to assign a service thread to this transport.
400 */
401 static int
402 xprt_assignthread(SVCXPRT *xprt)
403 {
404 SVCGROUP *grp = xprt->xp_group;
405 SVCTHREAD *st;
406
407 mtx_assert(&grp->sg_lock, MA_OWNED);
408 st = LIST_FIRST(&grp->sg_idlethreads);
409 if (st) {
410 LIST_REMOVE(st, st_ilink);
411 SVC_ACQUIRE(xprt);
412 xprt->xp_thread = st;
413 st->st_xprt = xprt;
414 cv_signal(&st->st_cond);
415 return (TRUE);
416 } else {
417 /*
418 * See if we can create a new thread. The
419 * actual thread creation happens in
420 * svc_run_internal because our locking state
421 * is poorly defined (we are typically called
422 * from a socket upcall). Don't create more
423 * than one thread per second.
424 */
425 if (grp->sg_state == SVCPOOL_ACTIVE
426 && grp->sg_lastcreatetime < time_uptime
427 && grp->sg_threadcount < grp->sg_maxthreads) {
428 grp->sg_state = SVCPOOL_THREADWANTED;
429 }
430 }
431 return (FALSE);
432 }
433
434 void
435 xprt_active(SVCXPRT *xprt)
436 {
437 SVCGROUP *grp = xprt->xp_group;
438
439 mtx_lock(&grp->sg_lock);
440
441 if (!xprt->xp_registered) {
442 /*
443 * Race with xprt_unregister - we lose.
444 */
445 mtx_unlock(&grp->sg_lock);
446 return;
447 }
448
449 if (!xprt->xp_active) {
450 xprt->xp_active = TRUE;
451 if (xprt->xp_thread == NULL) {
452 if (!svc_request_space_available(xprt->xp_pool) ||
453 !xprt_assignthread(xprt))
454 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
455 xp_alink);
456 }
457 }
458
459 mtx_unlock(&grp->sg_lock);
460 }
461
462 void
463 xprt_inactive_locked(SVCXPRT *xprt)
464 {
465 SVCGROUP *grp = xprt->xp_group;
466
467 mtx_assert(&grp->sg_lock, MA_OWNED);
468 if (xprt->xp_active) {
469 if (xprt->xp_thread == NULL)
470 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
471 xprt->xp_active = FALSE;
472 }
473 }
474
475 void
476 xprt_inactive(SVCXPRT *xprt)
477 {
478 SVCGROUP *grp = xprt->xp_group;
479
480 mtx_lock(&grp->sg_lock);
481 xprt_inactive_locked(xprt);
482 mtx_unlock(&grp->sg_lock);
483 }
484
485 /*
486 * Variant of xprt_inactive() for use only when sure that port is
487 * assigned to thread. For example, within receive handlers.
488 */
489 void
490 xprt_inactive_self(SVCXPRT *xprt)
491 {
492
493 KASSERT(xprt->xp_thread != NULL,
494 ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
495 xprt->xp_active = FALSE;
496 }
497
498 /*
499 * Add a service program to the callout list.
500 * The dispatch routine will be called when a rpc request for this
501 * program number comes in.
502 */
503 bool_t
504 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
505 void (*dispatch)(struct svc_req *, SVCXPRT *),
506 const struct netconfig *nconf)
507 {
508 SVCPOOL *pool = xprt->xp_pool;
509 struct svc_callout *s;
510 char *netid = NULL;
511 int flag = 0;
512
513 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
514
515 if (xprt->xp_netid) {
516 netid = strdup(xprt->xp_netid, M_RPC);
517 flag = 1;
518 } else if (nconf && nconf->nc_netid) {
519 netid = strdup(nconf->nc_netid, M_RPC);
520 flag = 1;
521 } /* must have been created with svc_raw_create */
522 if ((netid == NULL) && (flag == 1)) {
523 return (FALSE);
524 }
525
526 mtx_lock(&pool->sp_lock);
527 if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
528 if (netid)
529 free(netid, M_RPC);
530 if (s->sc_dispatch == dispatch)
531 goto rpcb_it; /* he is registering another xptr */
532 mtx_unlock(&pool->sp_lock);
533 return (FALSE);
534 }
535 s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
536 if (s == NULL) {
537 if (netid)
538 free(netid, M_RPC);
539 mtx_unlock(&pool->sp_lock);
540 return (FALSE);
541 }
542
543 s->sc_prog = prog;
544 s->sc_vers = vers;
545 s->sc_dispatch = dispatch;
546 s->sc_netid = netid;
547 TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
548
549 if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
550 ((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
551
552 rpcb_it:
553 mtx_unlock(&pool->sp_lock);
554 /* now register the information with the local binder service */
555 if (nconf) {
556 bool_t dummy;
557 struct netconfig tnc;
558 struct netbuf nb;
559 tnc = *nconf;
560 nb.buf = &xprt->xp_ltaddr;
561 nb.len = xprt->xp_ltaddr.ss_len;
562 dummy = rpcb_set(prog, vers, &tnc, &nb);
563 return (dummy);
564 }
565 return (TRUE);
566 }
567
568 /*
569 * Remove a service program from the callout list.
570 */
571 void
572 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
573 {
574 struct svc_callout *s;
575
576 /* unregister the information anyway */
577 (void) rpcb_unset(prog, vers, NULL);
578 mtx_lock(&pool->sp_lock);
579 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
580 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
581 if (s->sc_netid)
582 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
583 mem_free(s, sizeof (struct svc_callout));
584 }
585 mtx_unlock(&pool->sp_lock);
586 }
587
588 /*
589 * Add a service connection loss program to the callout list.
590 * The dispatch routine will be called when some port in ths pool die.
591 */
592 bool_t
593 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
594 {
595 SVCPOOL *pool = xprt->xp_pool;
596 struct svc_loss_callout *s;
597
598 mtx_lock(&pool->sp_lock);
599 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
600 if (s->slc_dispatch == dispatch)
601 break;
602 }
603 if (s != NULL) {
604 mtx_unlock(&pool->sp_lock);
605 return (TRUE);
606 }
607 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
608 if (s == NULL) {
609 mtx_unlock(&pool->sp_lock);
610 return (FALSE);
611 }
612 s->slc_dispatch = dispatch;
613 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
614 mtx_unlock(&pool->sp_lock);
615 return (TRUE);
616 }
617
618 /*
619 * Remove a service connection loss program from the callout list.
620 */
621 void
622 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
623 {
624 struct svc_loss_callout *s;
625
626 mtx_lock(&pool->sp_lock);
627 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
628 if (s->slc_dispatch == dispatch) {
629 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
630 free(s, M_RPC);
631 break;
632 }
633 }
634 mtx_unlock(&pool->sp_lock);
635 }
636
637 /* ********************** CALLOUT list related stuff ************* */
638
639 /*
640 * Search the callout list for a program number, return the callout
641 * struct.
642 */
643 static struct svc_callout *
644 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
645 {
646 struct svc_callout *s;
647
648 mtx_assert(&pool->sp_lock, MA_OWNED);
649 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
650 if (s->sc_prog == prog && s->sc_vers == vers
651 && (netid == NULL || s->sc_netid == NULL ||
652 strcmp(netid, s->sc_netid) == 0))
653 break;
654 }
655
656 return (s);
657 }
658
659 /* ******************* REPLY GENERATION ROUTINES ************ */
660
661 static bool_t
662 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
663 struct mbuf *body)
664 {
665 SVCXPRT *xprt = rqstp->rq_xprt;
666 bool_t ok;
667
668 if (rqstp->rq_args) {
669 m_freem(rqstp->rq_args);
670 rqstp->rq_args = NULL;
671 }
672
673 if (xprt->xp_pool->sp_rcache)
674 replay_setreply(xprt->xp_pool->sp_rcache,
675 rply, svc_getrpccaller(rqstp), body);
676
677 if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
678 return (FALSE);
679
680 ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
681 if (rqstp->rq_addr) {
682 free(rqstp->rq_addr, M_SONAME);
683 rqstp->rq_addr = NULL;
684 }
685
686 return (ok);
687 }
688
689 /*
690 * Send a reply to an rpc request
691 */
692 bool_t
693 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
694 {
695 struct rpc_msg rply;
696 struct mbuf *m;
697 XDR xdrs;
698 bool_t ok;
699
700 rply.rm_xid = rqstp->rq_xid;
701 rply.rm_direction = REPLY;
702 rply.rm_reply.rp_stat = MSG_ACCEPTED;
703 rply.acpted_rply.ar_verf = rqstp->rq_verf;
704 rply.acpted_rply.ar_stat = SUCCESS;
705 rply.acpted_rply.ar_results.where = NULL;
706 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
707
708 m = m_getcl(M_WAITOK, MT_DATA, 0);
709 xdrmbuf_create(&xdrs, m, XDR_ENCODE);
710 ok = xdr_results(&xdrs, xdr_location);
711 XDR_DESTROY(&xdrs);
712
713 if (ok) {
714 return (svc_sendreply_common(rqstp, &rply, m));
715 } else {
716 m_freem(m);
717 return (FALSE);
718 }
719 }
720
721 bool_t
722 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
723 {
724 struct rpc_msg rply;
725
726 rply.rm_xid = rqstp->rq_xid;
727 rply.rm_direction = REPLY;
728 rply.rm_reply.rp_stat = MSG_ACCEPTED;
729 rply.acpted_rply.ar_verf = rqstp->rq_verf;
730 rply.acpted_rply.ar_stat = SUCCESS;
731 rply.acpted_rply.ar_results.where = NULL;
732 rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
733
734 return (svc_sendreply_common(rqstp, &rply, m));
735 }
736
737 /*
738 * No procedure error reply
739 */
740 void
741 svcerr_noproc(struct svc_req *rqstp)
742 {
743 SVCXPRT *xprt = rqstp->rq_xprt;
744 struct rpc_msg rply;
745
746 rply.rm_xid = rqstp->rq_xid;
747 rply.rm_direction = REPLY;
748 rply.rm_reply.rp_stat = MSG_ACCEPTED;
749 rply.acpted_rply.ar_verf = rqstp->rq_verf;
750 rply.acpted_rply.ar_stat = PROC_UNAVAIL;
751
752 if (xprt->xp_pool->sp_rcache)
753 replay_setreply(xprt->xp_pool->sp_rcache,
754 &rply, svc_getrpccaller(rqstp), NULL);
755
756 svc_sendreply_common(rqstp, &rply, NULL);
757 }
758
759 /*
760 * Can't decode args error reply
761 */
762 void
763 svcerr_decode(struct svc_req *rqstp)
764 {
765 SVCXPRT *xprt = rqstp->rq_xprt;
766 struct rpc_msg rply;
767
768 rply.rm_xid = rqstp->rq_xid;
769 rply.rm_direction = REPLY;
770 rply.rm_reply.rp_stat = MSG_ACCEPTED;
771 rply.acpted_rply.ar_verf = rqstp->rq_verf;
772 rply.acpted_rply.ar_stat = GARBAGE_ARGS;
773
774 if (xprt->xp_pool->sp_rcache)
775 replay_setreply(xprt->xp_pool->sp_rcache,
776 &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
777
778 svc_sendreply_common(rqstp, &rply, NULL);
779 }
780
781 /*
782 * Some system error
783 */
784 void
785 svcerr_systemerr(struct svc_req *rqstp)
786 {
787 SVCXPRT *xprt = rqstp->rq_xprt;
788 struct rpc_msg rply;
789
790 rply.rm_xid = rqstp->rq_xid;
791 rply.rm_direction = REPLY;
792 rply.rm_reply.rp_stat = MSG_ACCEPTED;
793 rply.acpted_rply.ar_verf = rqstp->rq_verf;
794 rply.acpted_rply.ar_stat = SYSTEM_ERR;
795
796 if (xprt->xp_pool->sp_rcache)
797 replay_setreply(xprt->xp_pool->sp_rcache,
798 &rply, svc_getrpccaller(rqstp), NULL);
799
800 svc_sendreply_common(rqstp, &rply, NULL);
801 }
802
803 /*
804 * Authentication error reply
805 */
806 void
807 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
808 {
809 SVCXPRT *xprt = rqstp->rq_xprt;
810 struct rpc_msg rply;
811
812 rply.rm_xid = rqstp->rq_xid;
813 rply.rm_direction = REPLY;
814 rply.rm_reply.rp_stat = MSG_DENIED;
815 rply.rjcted_rply.rj_stat = AUTH_ERROR;
816 rply.rjcted_rply.rj_why = why;
817
818 if (xprt->xp_pool->sp_rcache)
819 replay_setreply(xprt->xp_pool->sp_rcache,
820 &rply, svc_getrpccaller(rqstp), NULL);
821
822 svc_sendreply_common(rqstp, &rply, NULL);
823 }
824
825 /*
826 * Auth too weak error reply
827 */
828 void
829 svcerr_weakauth(struct svc_req *rqstp)
830 {
831
832 svcerr_auth(rqstp, AUTH_TOOWEAK);
833 }
834
835 /*
836 * Program unavailable error reply
837 */
838 void
839 svcerr_noprog(struct svc_req *rqstp)
840 {
841 SVCXPRT *xprt = rqstp->rq_xprt;
842 struct rpc_msg rply;
843
844 rply.rm_xid = rqstp->rq_xid;
845 rply.rm_direction = REPLY;
846 rply.rm_reply.rp_stat = MSG_ACCEPTED;
847 rply.acpted_rply.ar_verf = rqstp->rq_verf;
848 rply.acpted_rply.ar_stat = PROG_UNAVAIL;
849
850 if (xprt->xp_pool->sp_rcache)
851 replay_setreply(xprt->xp_pool->sp_rcache,
852 &rply, svc_getrpccaller(rqstp), NULL);
853
854 svc_sendreply_common(rqstp, &rply, NULL);
855 }
856
857 /*
858 * Program version mismatch error reply
859 */
860 void
861 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
862 {
863 SVCXPRT *xprt = rqstp->rq_xprt;
864 struct rpc_msg rply;
865
866 rply.rm_xid = rqstp->rq_xid;
867 rply.rm_direction = REPLY;
868 rply.rm_reply.rp_stat = MSG_ACCEPTED;
869 rply.acpted_rply.ar_verf = rqstp->rq_verf;
870 rply.acpted_rply.ar_stat = PROG_MISMATCH;
871 rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
872 rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
873
874 if (xprt->xp_pool->sp_rcache)
875 replay_setreply(xprt->xp_pool->sp_rcache,
876 &rply, svc_getrpccaller(rqstp), NULL);
877
878 svc_sendreply_common(rqstp, &rply, NULL);
879 }
880
881 /*
882 * Allocate a new server transport structure. All fields are
883 * initialized to zero and xp_p3 is initialized to point at an
884 * extension structure to hold various flags and authentication
885 * parameters.
886 */
887 SVCXPRT *
888 svc_xprt_alloc(void)
889 {
890 SVCXPRT *xprt;
891 SVCXPRT_EXT *ext;
892
893 xprt = mem_alloc(sizeof(SVCXPRT));
894 ext = mem_alloc(sizeof(SVCXPRT_EXT));
895 xprt->xp_p3 = ext;
896 refcount_init(&xprt->xp_refs, 1);
897
898 return (xprt);
899 }
900
901 /*
902 * Free a server transport structure.
903 */
904 void
905 svc_xprt_free(SVCXPRT *xprt)
906 {
907
908 mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
909 mem_free(xprt, sizeof(SVCXPRT));
910 }
911
912 /* ******************* SERVER INPUT STUFF ******************* */
913
914 /*
915 * Read RPC requests from a transport and queue them to be
916 * executed. We handle authentication and replay cache replies here.
917 * Actually dispatching the RPC is deferred till svc_executereq.
918 */
919 static enum xprt_stat
920 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
921 {
922 SVCPOOL *pool = xprt->xp_pool;
923 struct svc_req *r;
924 struct rpc_msg msg;
925 struct mbuf *args;
926 struct svc_loss_callout *s;
927 enum xprt_stat stat;
928
929 /* now receive msgs from xprtprt (support batch calls) */
930 r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
931
932 msg.rm_call.cb_cred.oa_base = r->rq_credarea;
933 msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
934 r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
935 if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
936 enum auth_stat why;
937
938 /*
939 * Handle replays and authenticate before queuing the
940 * request to be executed.
941 */
942 SVC_ACQUIRE(xprt);
943 r->rq_xprt = xprt;
944 if (pool->sp_rcache) {
945 struct rpc_msg repmsg;
946 struct mbuf *repbody;
947 enum replay_state rs;
948 rs = replay_find(pool->sp_rcache, &msg,
949 svc_getrpccaller(r), &repmsg, &repbody);
950 switch (rs) {
951 case RS_NEW:
952 break;
953 case RS_DONE:
954 SVC_REPLY(xprt, &repmsg, r->rq_addr,
955 repbody, &r->rq_reply_seq);
956 if (r->rq_addr) {
957 free(r->rq_addr, M_SONAME);
958 r->rq_addr = NULL;
959 }
960 m_freem(args);
961 goto call_done;
962
963 default:
964 m_freem(args);
965 goto call_done;
966 }
967 }
968
969 r->rq_xid = msg.rm_xid;
970 r->rq_prog = msg.rm_call.cb_prog;
971 r->rq_vers = msg.rm_call.cb_vers;
972 r->rq_proc = msg.rm_call.cb_proc;
973 r->rq_size = sizeof(*r) + m_length(args, NULL);
974 r->rq_args = args;
975 if ((why = _authenticate(r, &msg)) != AUTH_OK) {
976 /*
977 * RPCSEC_GSS uses this return code
978 * for requests that form part of its
979 * context establishment protocol and
980 * should not be dispatched to the
981 * application.
982 */
983 if (why != RPCSEC_GSS_NODISPATCH)
984 svcerr_auth(r, why);
985 goto call_done;
986 }
987
988 if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
989 svcerr_decode(r);
990 goto call_done;
991 }
992
993 /*
994 * Everything checks out, return request to caller.
995 */
996 *rqstp_ret = r;
997 r = NULL;
998 }
999 call_done:
1000 if (r) {
1001 svc_freereq(r);
1002 r = NULL;
1003 }
1004 if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1005 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1006 (*s->slc_dispatch)(xprt);
1007 xprt_unregister(xprt);
1008 }
1009
1010 return (stat);
1011 }
1012
1013 static void
1014 svc_executereq(struct svc_req *rqstp)
1015 {
1016 SVCXPRT *xprt = rqstp->rq_xprt;
1017 SVCPOOL *pool = xprt->xp_pool;
1018 int prog_found;
1019 rpcvers_t low_vers;
1020 rpcvers_t high_vers;
1021 struct svc_callout *s;
1022
1023 /* now match message with a registered service*/
1024 prog_found = FALSE;
1025 low_vers = (rpcvers_t) -1L;
1026 high_vers = (rpcvers_t) 0L;
1027 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1028 if (s->sc_prog == rqstp->rq_prog) {
1029 if (s->sc_vers == rqstp->rq_vers) {
1030 /*
1031 * We hand ownership of r to the
1032 * dispatch method - they must call
1033 * svc_freereq.
1034 */
1035 (*s->sc_dispatch)(rqstp, xprt);
1036 return;
1037 } /* found correct version */
1038 prog_found = TRUE;
1039 if (s->sc_vers < low_vers)
1040 low_vers = s->sc_vers;
1041 if (s->sc_vers > high_vers)
1042 high_vers = s->sc_vers;
1043 } /* found correct program */
1044 }
1045
1046 /*
1047 * if we got here, the program or version
1048 * is not served ...
1049 */
1050 if (prog_found)
1051 svcerr_progvers(rqstp, low_vers, high_vers);
1052 else
1053 svcerr_noprog(rqstp);
1054
1055 svc_freereq(rqstp);
1056 }
1057
1058 static void
1059 svc_checkidle(SVCGROUP *grp)
1060 {
1061 SVCXPRT *xprt, *nxprt;
1062 time_t timo;
1063 struct svcxprt_list cleanup;
1064
1065 TAILQ_INIT(&cleanup);
1066 TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1067 /*
1068 * Only some transports have idle timers. Don't time
1069 * something out which is just waking up.
1070 */
1071 if (!xprt->xp_idletimeout || xprt->xp_thread)
1072 continue;
1073
1074 timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1075 if (time_uptime > timo) {
1076 xprt_unregister_locked(xprt);
1077 TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1078 }
1079 }
1080
1081 mtx_unlock(&grp->sg_lock);
1082 TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1083 soshutdown(xprt->xp_socket, SHUT_WR);
1084 SVC_RELEASE(xprt);
1085 }
1086 mtx_lock(&grp->sg_lock);
1087 }
1088
1089 static void
1090 svc_assign_waiting_sockets(SVCPOOL *pool)
1091 {
1092 SVCGROUP *grp;
1093 SVCXPRT *xprt;
1094 int g;
1095
1096 for (g = 0; g < pool->sp_groupcount; g++) {
1097 grp = &pool->sp_groups[g];
1098 mtx_lock(&grp->sg_lock);
1099 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1100 if (xprt_assignthread(xprt))
1101 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1102 else
1103 break;
1104 }
1105 mtx_unlock(&grp->sg_lock);
1106 }
1107 }
1108
1109 static void
1110 svc_change_space_used(SVCPOOL *pool, long delta)
1111 {
1112 unsigned long value;
1113
1114 value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1115 if (delta > 0) {
1116 if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1117 pool->sp_space_throttled = TRUE;
1118 pool->sp_space_throttle_count++;
1119 }
1120 if (value > pool->sp_space_used_highest)
1121 pool->sp_space_used_highest = value;
1122 } else {
1123 if (value < pool->sp_space_low && pool->sp_space_throttled) {
1124 pool->sp_space_throttled = FALSE;
1125 svc_assign_waiting_sockets(pool);
1126 }
1127 }
1128 }
1129
1130 static bool_t
1131 svc_request_space_available(SVCPOOL *pool)
1132 {
1133
1134 if (pool->sp_space_throttled)
1135 return (FALSE);
1136 return (TRUE);
1137 }
1138
1139 static void
1140 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1141 {
1142 SVCPOOL *pool = grp->sg_pool;
1143 SVCTHREAD *st, *stpref;
1144 SVCXPRT *xprt;
1145 enum xprt_stat stat;
1146 struct svc_req *rqstp;
1147 struct proc *p;
1148 long sz;
1149 int error;
1150
1151 st = mem_alloc(sizeof(*st));
1152 mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1153 st->st_pool = pool;
1154 st->st_xprt = NULL;
1155 STAILQ_INIT(&st->st_reqs);
1156 cv_init(&st->st_cond, "rpcsvc");
1157
1158 mtx_lock(&grp->sg_lock);
1159
1160 /*
1161 * If we are a new thread which was spawned to cope with
1162 * increased load, set the state back to SVCPOOL_ACTIVE.
1163 */
1164 if (grp->sg_state == SVCPOOL_THREADSTARTING)
1165 grp->sg_state = SVCPOOL_ACTIVE;
1166
1167 while (grp->sg_state != SVCPOOL_CLOSING) {
1168 /*
1169 * Create new thread if requested.
1170 */
1171 if (grp->sg_state == SVCPOOL_THREADWANTED) {
1172 grp->sg_state = SVCPOOL_THREADSTARTING;
1173 grp->sg_lastcreatetime = time_uptime;
1174 mtx_unlock(&grp->sg_lock);
1175 svc_new_thread(grp);
1176 mtx_lock(&grp->sg_lock);
1177 continue;
1178 }
1179
1180 /*
1181 * Check for idle transports once per second.
1182 */
1183 if (time_uptime > grp->sg_lastidlecheck) {
1184 grp->sg_lastidlecheck = time_uptime;
1185 svc_checkidle(grp);
1186 }
1187
1188 xprt = st->st_xprt;
1189 if (!xprt) {
1190 /*
1191 * Enforce maxthreads count.
1192 */
1193 if (!ismaster && grp->sg_threadcount >
1194 grp->sg_maxthreads)
1195 break;
1196
1197 /*
1198 * Before sleeping, see if we can find an
1199 * active transport which isn't being serviced
1200 * by a thread.
1201 */
1202 if (svc_request_space_available(pool) &&
1203 (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1204 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1205 SVC_ACQUIRE(xprt);
1206 xprt->xp_thread = st;
1207 st->st_xprt = xprt;
1208 continue;
1209 }
1210
1211 LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1212 if (ismaster || (!ismaster &&
1213 grp->sg_threadcount > grp->sg_minthreads))
1214 error = cv_timedwait_sig(&st->st_cond,
1215 &grp->sg_lock, 5 * hz);
1216 else
1217 error = cv_wait_sig(&st->st_cond,
1218 &grp->sg_lock);
1219 if (st->st_xprt == NULL)
1220 LIST_REMOVE(st, st_ilink);
1221
1222 /*
1223 * Reduce worker thread count when idle.
1224 */
1225 if (error == EWOULDBLOCK) {
1226 if (!ismaster
1227 && (grp->sg_threadcount
1228 > grp->sg_minthreads)
1229 && !st->st_xprt)
1230 break;
1231 } else if (error != 0) {
1232 KASSERT(error == EINTR || error == ERESTART,
1233 ("non-signal error %d", error));
1234 mtx_unlock(&grp->sg_lock);
1235 p = curproc;
1236 PROC_LOCK(p);
1237 if (P_SHOULDSTOP(p) ||
1238 (p->p_flag & P_TOTAL_STOP) != 0) {
1239 thread_suspend_check(0);
1240 PROC_UNLOCK(p);
1241 mtx_lock(&grp->sg_lock);
1242 } else {
1243 PROC_UNLOCK(p);
1244 svc_exit(pool);
1245 mtx_lock(&grp->sg_lock);
1246 break;
1247 }
1248 }
1249 continue;
1250 }
1251 mtx_unlock(&grp->sg_lock);
1252
1253 /*
1254 * Drain the transport socket and queue up any RPCs.
1255 */
1256 xprt->xp_lastactive = time_uptime;
1257 do {
1258 if (!svc_request_space_available(pool))
1259 break;
1260 rqstp = NULL;
1261 stat = svc_getreq(xprt, &rqstp);
1262 if (rqstp) {
1263 svc_change_space_used(pool, rqstp->rq_size);
1264 /*
1265 * See if the application has a preference
1266 * for some other thread.
1267 */
1268 if (pool->sp_assign) {
1269 stpref = pool->sp_assign(st, rqstp);
1270 rqstp->rq_thread = stpref;
1271 STAILQ_INSERT_TAIL(&stpref->st_reqs,
1272 rqstp, rq_link);
1273 mtx_unlock(&stpref->st_lock);
1274 if (stpref != st)
1275 rqstp = NULL;
1276 } else {
1277 rqstp->rq_thread = st;
1278 STAILQ_INSERT_TAIL(&st->st_reqs,
1279 rqstp, rq_link);
1280 }
1281 }
1282 } while (rqstp == NULL && stat == XPRT_MOREREQS
1283 && grp->sg_state != SVCPOOL_CLOSING);
1284
1285 /*
1286 * Move this transport to the end of the active list to
1287 * ensure fairness when multiple transports are active.
1288 * If this was the last queued request, svc_getreq will end
1289 * up calling xprt_inactive to remove from the active list.
1290 */
1291 mtx_lock(&grp->sg_lock);
1292 xprt->xp_thread = NULL;
1293 st->st_xprt = NULL;
1294 if (xprt->xp_active) {
1295 if (!svc_request_space_available(pool) ||
1296 !xprt_assignthread(xprt))
1297 TAILQ_INSERT_TAIL(&grp->sg_active,
1298 xprt, xp_alink);
1299 }
1300 mtx_unlock(&grp->sg_lock);
1301 SVC_RELEASE(xprt);
1302
1303 /*
1304 * Execute what we have queued.
1305 */
1306 mtx_lock(&st->st_lock);
1307 while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1308 STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1309 mtx_unlock(&st->st_lock);
1310 sz = (long)rqstp->rq_size;
1311 svc_executereq(rqstp);
1312 svc_change_space_used(pool, -sz);
1313 mtx_lock(&st->st_lock);
1314 }
1315 mtx_unlock(&st->st_lock);
1316 mtx_lock(&grp->sg_lock);
1317 }
1318
1319 if (st->st_xprt) {
1320 xprt = st->st_xprt;
1321 st->st_xprt = NULL;
1322 SVC_RELEASE(xprt);
1323 }
1324 KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1325 mtx_destroy(&st->st_lock);
1326 cv_destroy(&st->st_cond);
1327 mem_free(st, sizeof(*st));
1328
1329 grp->sg_threadcount--;
1330 if (!ismaster)
1331 wakeup(grp);
1332 mtx_unlock(&grp->sg_lock);
1333 }
1334
1335 static void
1336 svc_thread_start(void *arg)
1337 {
1338
1339 svc_run_internal((SVCGROUP *) arg, FALSE);
1340 kthread_exit();
1341 }
1342
1343 static void
1344 svc_new_thread(SVCGROUP *grp)
1345 {
1346 SVCPOOL *pool = grp->sg_pool;
1347 struct thread *td;
1348
1349 mtx_lock(&grp->sg_lock);
1350 grp->sg_threadcount++;
1351 mtx_unlock(&grp->sg_lock);
1352 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1353 "%s: service", pool->sp_name);
1354 }
1355
1356 void
1357 svc_run(SVCPOOL *pool)
1358 {
1359 int g, i;
1360 struct proc *p;
1361 struct thread *td;
1362 SVCGROUP *grp;
1363
1364 p = curproc;
1365 td = curthread;
1366 snprintf(td->td_name, sizeof(td->td_name),
1367 "%s: master", pool->sp_name);
1368 pool->sp_state = SVCPOOL_ACTIVE;
1369 pool->sp_proc = p;
1370
1371 /* Choose group count based on number of threads and CPUs. */
1372 pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1373 min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1374 for (g = 0; g < pool->sp_groupcount; g++) {
1375 grp = &pool->sp_groups[g];
1376 grp->sg_minthreads = max(1,
1377 pool->sp_minthreads / pool->sp_groupcount);
1378 grp->sg_maxthreads = max(1,
1379 pool->sp_maxthreads / pool->sp_groupcount);
1380 grp->sg_lastcreatetime = time_uptime;
1381 }
1382
1383 /* Starting threads */
1384 pool->sp_groups[0].sg_threadcount++;
1385 for (g = 0; g < pool->sp_groupcount; g++) {
1386 grp = &pool->sp_groups[g];
1387 for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1388 svc_new_thread(grp);
1389 }
1390 svc_run_internal(&pool->sp_groups[0], TRUE);
1391
1392 /* Waiting for threads to stop. */
1393 for (g = 0; g < pool->sp_groupcount; g++) {
1394 grp = &pool->sp_groups[g];
1395 mtx_lock(&grp->sg_lock);
1396 while (grp->sg_threadcount > 0)
1397 msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1398 mtx_unlock(&grp->sg_lock);
1399 }
1400 }
1401
1402 void
1403 svc_exit(SVCPOOL *pool)
1404 {
1405 SVCGROUP *grp;
1406 SVCTHREAD *st;
1407 int g;
1408
1409 pool->sp_state = SVCPOOL_CLOSING;
1410 for (g = 0; g < pool->sp_groupcount; g++) {
1411 grp = &pool->sp_groups[g];
1412 mtx_lock(&grp->sg_lock);
1413 if (grp->sg_state != SVCPOOL_CLOSING) {
1414 grp->sg_state = SVCPOOL_CLOSING;
1415 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1416 cv_signal(&st->st_cond);
1417 }
1418 mtx_unlock(&grp->sg_lock);
1419 }
1420 }
1421
1422 bool_t
1423 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1424 {
1425 struct mbuf *m;
1426 XDR xdrs;
1427 bool_t stat;
1428
1429 m = rqstp->rq_args;
1430 rqstp->rq_args = NULL;
1431
1432 xdrmbuf_create(&xdrs, m, XDR_DECODE);
1433 stat = xargs(&xdrs, args);
1434 XDR_DESTROY(&xdrs);
1435
1436 return (stat);
1437 }
1438
1439 bool_t
1440 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1441 {
1442 XDR xdrs;
1443
1444 if (rqstp->rq_addr) {
1445 free(rqstp->rq_addr, M_SONAME);
1446 rqstp->rq_addr = NULL;
1447 }
1448
1449 xdrs.x_op = XDR_FREE;
1450 return (xargs(&xdrs, args));
1451 }
1452
1453 void
1454 svc_freereq(struct svc_req *rqstp)
1455 {
1456 SVCTHREAD *st;
1457 SVCPOOL *pool;
1458
1459 st = rqstp->rq_thread;
1460 if (st) {
1461 pool = st->st_pool;
1462 if (pool->sp_done)
1463 pool->sp_done(st, rqstp);
1464 }
1465
1466 if (rqstp->rq_auth.svc_ah_ops)
1467 SVCAUTH_RELEASE(&rqstp->rq_auth);
1468
1469 if (rqstp->rq_xprt) {
1470 SVC_RELEASE(rqstp->rq_xprt);
1471 }
1472
1473 if (rqstp->rq_addr)
1474 free(rqstp->rq_addr, M_SONAME);
1475
1476 if (rqstp->rq_args)
1477 m_freem(rqstp->rq_args);
1478
1479 free(rqstp, M_RPC);
1480 }
Cache object: 8879d27ade3253cc051ac9666ea42a7e
|