FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc.c
1 /* $NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $ */
2
3 /*-
4 * SPDX-License-Identifier: BSD-3-Clause
5 *
6 * Copyright (c) 2009, Sun Microsystems, Inc.
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 * - Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * - Redistributions in binary form must reproduce the above copyright notice,
14 * this list of conditions and the following disclaimer in the documentation
15 * and/or other materials provided with the distribution.
16 * - Neither the name of Sun Microsystems, Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 */
32
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c 2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39
40 /*
41 * svc.c, Server-side remote procedure call interface.
42 *
43 * There are two sets of procedures here. The xprt routines are
44 * for handling transport handles. The svc routines handle the
45 * list of service routines.
46 *
47 * Copyright (C) 1984, Sun Microsystems, Inc.
48 */
49
50 #include <sys/param.h>
51 #include <sys/jail.h>
52 #include <sys/lock.h>
53 #include <sys/kernel.h>
54 #include <sys/kthread.h>
55 #include <sys/malloc.h>
56 #include <sys/mbuf.h>
57 #include <sys/mutex.h>
58 #include <sys/proc.h>
59 #include <sys/queue.h>
60 #include <sys/socketvar.h>
61 #include <sys/systm.h>
62 #include <sys/smp.h>
63 #include <sys/sx.h>
64 #include <sys/ucred.h>
65
66 #include <rpc/rpc.h>
67 #include <rpc/rpcb_clnt.h>
68 #include <rpc/replay.h>
69
70 #include <rpc/rpc_com.h>
71
72 #define SVC_VERSQUIET 0x0001 /* keep quiet about vers mismatch */
73 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
74
75 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
76 char *);
77 static void svc_new_thread(SVCGROUP *grp);
78 static void xprt_unregister_locked(SVCXPRT *xprt);
79 static void svc_change_space_used(SVCPOOL *pool, long delta);
80 static bool_t svc_request_space_available(SVCPOOL *pool);
81 static void svcpool_cleanup(SVCPOOL *pool);
82
83 /* *************** SVCXPRT related stuff **************** */
84
85 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
86 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
87 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
88
/*
 * Create and initialize an RPC service pool.  The pool starts in
 * SVCPOOL_INIT state with a group count of one, but all SVC_MAXGROUPS
 * group structures are initialized up front so the group count can be
 * raised later without extra setup.  If "sysctl_base" is non-NULL and
 * the calling thread is not jailed, tuning/monitoring sysctl nodes are
 * attached beneath it.  Returns the new pool (allocation sleeps, so
 * this cannot fail).
 */
SVCPOOL*
svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
{
	SVCPOOL *pool;
	SVCGROUP *grp;
	int g;

	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);

	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
	pool->sp_name = name;
	pool->sp_state = SVCPOOL_INIT;
	pool->sp_proc = NULL;
	TAILQ_INIT(&pool->sp_callouts);
	TAILQ_INIT(&pool->sp_lcallouts);
	pool->sp_minthreads = 1;
	pool->sp_maxthreads = 1;
	pool->sp_groupcount = 1;
	/* Initialize every group, not just the first sp_groupcount. */
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
		grp->sg_pool = pool;
		grp->sg_state = SVCPOOL_ACTIVE;
		TAILQ_INIT(&grp->sg_xlist);
		TAILQ_INIT(&grp->sg_active);
		LIST_INIT(&grp->sg_idlethreads);
		grp->sg_minthreads = 1;
		grp->sg_maxthreads = 1;
	}

	/*
	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
	 * on LP64 architectures, so cast to u_long to avoid undefined
	 * behavior.  (ILP32 architectures cannot have nmbclusters
	 * large enough to overflow for other reasons.)
	 */
	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
	/* Throttling, once engaged, releases below two thirds of the cap. */
	pool->sp_space_low = (pool->sp_space_high / 3) * 2;

	sysctl_ctx_init(&pool->sp_sysctl);
	if (!jailed(curthread->td_ucred) && sysctl_base) {
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_minthread_sysctl, "I",
		    "Minimal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_maxthread_sysctl, "I",
		    "Maximal number of threads");
		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
		    pool, 0, svcpool_threads_sysctl, "I",
		    "Current number of threads");
		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
		    "Number of thread groups");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used", CTLFLAG_RD,
		    &pool->sp_space_used,
		    "Space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_used_highest", CTLFLAG_RD,
		    &pool->sp_space_used_highest,
		    "Highest space used since reboot.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_high", CTLFLAG_RW,
		    &pool->sp_space_high,
		    "Maximum space in parsed but not handled requests.");

		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_low", CTLFLAG_RW,
		    &pool->sp_space_low,
		    "Low water mark for request space.");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttled", CTLFLAG_RD,
		    &pool->sp_space_throttled, 0,
		    "Whether nfs requests are currently throttled");

		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
		    "request_space_throttle_count", CTLFLAG_RD,
		    &pool->sp_space_throttle_count, 0,
		    "Count of times throttling based on request space has occurred");
	}

	return pool;
}
180
/*
 * Code common to svcpool_destroy() and svcpool_close(), which cleans up
 * the pool data structures: every registered transport is unregistered,
 * shut down for writing and released, and all service and connection
 * loss callouts are removed.
 */
static void
svcpool_cleanup(SVCPOOL *pool)
{
	SVCGROUP *grp;
	SVCXPRT *xprt, *nxprt;
	struct svc_callout *s;
	struct svc_loss_callout *sl;
	struct svcxprt_list cleanup;
	int g;

	TAILQ_INIT(&cleanup);

	/*
	 * Unregister transports under the group lock, but defer the
	 * socket shutdown and reference release until after the lock
	 * is dropped (SVC_RELEASE may free the transport).
	 */
	for (g = 0; g < SVC_MAXGROUPS; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
		mtx_unlock(&grp->sg_lock);
	}
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		if (xprt->xp_socket != NULL)
			soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}

	/*
	 * Drop the pool lock around each unregistration because
	 * svc_unreg()/svc_loss_unreg() take sp_lock themselves; the
	 * "take first, act, retake" loop keeps list traversal safe.
	 */
	mtx_lock(&pool->sp_lock);
	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_unreg(pool, s->sc_prog, s->sc_vers);
		mtx_lock(&pool->sp_lock);
	}
	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
		mtx_unlock(&pool->sp_lock);
		svc_loss_unreg(pool, sl->slc_dispatch);
		mtx_lock(&pool->sp_lock);
	}
	mtx_unlock(&pool->sp_lock);
}
225
226 void
227 svcpool_destroy(SVCPOOL *pool)
228 {
229 SVCGROUP *grp;
230 int g;
231
232 svcpool_cleanup(pool);
233
234 for (g = 0; g < SVC_MAXGROUPS; g++) {
235 grp = &pool->sp_groups[g];
236 mtx_destroy(&grp->sg_lock);
237 }
238 mtx_destroy(&pool->sp_lock);
239
240 if (pool->sp_rcache)
241 replay_freecache(pool->sp_rcache);
242
243 sysctl_ctx_free(&pool->sp_sysctl);
244 free(pool, M_RPC);
245 }
246
247 /*
248 * Similar to svcpool_destroy(), except that it does not destroy the actual
249 * data structures. As such, "pool" may be used again.
250 */
251 void
252 svcpool_close(SVCPOOL *pool)
253 {
254 SVCGROUP *grp;
255 int g;
256
257 svcpool_cleanup(pool);
258
259 /* Now, initialize the pool's state for a fresh svc_run() call. */
260 mtx_lock(&pool->sp_lock);
261 pool->sp_state = SVCPOOL_INIT;
262 mtx_unlock(&pool->sp_lock);
263 for (g = 0; g < SVC_MAXGROUPS; g++) {
264 grp = &pool->sp_groups[g];
265 mtx_lock(&grp->sg_lock);
266 grp->sg_state = SVCPOOL_ACTIVE;
267 mtx_unlock(&grp->sg_lock);
268 }
269 }
270
271 /*
272 * Sysctl handler to get the present thread count on a pool
273 */
274 static int
275 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
276 {
277 SVCPOOL *pool;
278 int threads, error, g;
279
280 pool = oidp->oid_arg1;
281 threads = 0;
282 mtx_lock(&pool->sp_lock);
283 for (g = 0; g < pool->sp_groupcount; g++)
284 threads += pool->sp_groups[g].sg_threadcount;
285 mtx_unlock(&pool->sp_lock);
286 error = sysctl_handle_int(oidp, &threads, 0, req);
287 return (error);
288 }
289
290 /*
291 * Sysctl handler to set the minimum thread count on a pool
292 */
293 static int
294 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
295 {
296 SVCPOOL *pool;
297 int newminthreads, error, g;
298
299 pool = oidp->oid_arg1;
300 newminthreads = pool->sp_minthreads;
301 error = sysctl_handle_int(oidp, &newminthreads, 0, req);
302 if (error == 0 && newminthreads != pool->sp_minthreads) {
303 if (newminthreads > pool->sp_maxthreads)
304 return (EINVAL);
305 mtx_lock(&pool->sp_lock);
306 pool->sp_minthreads = newminthreads;
307 for (g = 0; g < pool->sp_groupcount; g++) {
308 pool->sp_groups[g].sg_minthreads = max(1,
309 pool->sp_minthreads / pool->sp_groupcount);
310 }
311 mtx_unlock(&pool->sp_lock);
312 }
313 return (error);
314 }
315
316 /*
317 * Sysctl handler to set the maximum thread count on a pool
318 */
319 static int
320 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
321 {
322 SVCPOOL *pool;
323 int newmaxthreads, error, g;
324
325 pool = oidp->oid_arg1;
326 newmaxthreads = pool->sp_maxthreads;
327 error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
328 if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
329 if (newmaxthreads < pool->sp_minthreads)
330 return (EINVAL);
331 mtx_lock(&pool->sp_lock);
332 pool->sp_maxthreads = newmaxthreads;
333 for (g = 0; g < pool->sp_groupcount; g++) {
334 pool->sp_groups[g].sg_maxthreads = max(1,
335 pool->sp_maxthreads / pool->sp_groupcount);
336 }
337 mtx_unlock(&pool->sp_lock);
338 }
339 return (error);
340 }
341
342 /*
343 * Activate a transport handle.
344 */
345 void
346 xprt_register(SVCXPRT *xprt)
347 {
348 SVCPOOL *pool = xprt->xp_pool;
349 SVCGROUP *grp;
350 int g;
351
352 SVC_ACQUIRE(xprt);
353 g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
354 xprt->xp_group = grp = &pool->sp_groups[g];
355 mtx_lock(&grp->sg_lock);
356 xprt->xp_registered = TRUE;
357 xprt->xp_active = FALSE;
358 TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
359 mtx_unlock(&grp->sg_lock);
360 }
361
362 /*
363 * De-activate a transport handle. Note: the locked version doesn't
364 * release the transport - caller must do that after dropping the pool
365 * lock.
366 */
/*
 * De-activate and unregister a transport with the group lock held.
 * Note: this does not release the transport reference - the caller
 * must do that after dropping the group lock.
 */
static void
xprt_unregister_locked(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	KASSERT(xprt->xp_registered == TRUE,
	    ("xprt_unregister_locked: not registered"));
	/* Pull it off sg_active (if queued there) before delisting. */
	xprt_inactive_locked(xprt);
	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
	xprt->xp_registered = FALSE;
}
379
380 void
381 xprt_unregister(SVCXPRT *xprt)
382 {
383 SVCGROUP *grp = xprt->xp_group;
384
385 mtx_lock(&grp->sg_lock);
386 if (xprt->xp_registered == FALSE) {
387 /* Already unregistered by another thread */
388 mtx_unlock(&grp->sg_lock);
389 return;
390 }
391 xprt_unregister_locked(xprt);
392 mtx_unlock(&grp->sg_lock);
393
394 if (xprt->xp_socket != NULL)
395 soshutdown(xprt->xp_socket, SHUT_WR);
396 SVC_RELEASE(xprt);
397 }
398
399 /*
400 * Attempt to assign a service thread to this transport.
401 */
/*
 * Attempt to assign a service thread to this transport.  Called with
 * the group lock held.  Returns TRUE if an idle thread took ownership
 * of the transport (a transport reference is passed to it), FALSE
 * otherwise.  On FALSE, the group may be flagged SVCPOOL_THREADWANTED
 * so that svc_run_internal() spawns a new worker.
 */
static int
xprt_assignthread(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;
	SVCTHREAD *st;

	mtx_assert(&grp->sg_lock, MA_OWNED);
	st = LIST_FIRST(&grp->sg_idlethreads);
	if (st) {
		LIST_REMOVE(st, st_ilink);
		/* Reference for the thread; released when it is done. */
		SVC_ACQUIRE(xprt);
		xprt->xp_thread = st;
		st->st_xprt = xprt;
		cv_signal(&st->st_cond);
		return (TRUE);
	} else {
		/*
		 * See if we can create a new thread. The
		 * actual thread creation happens in
		 * svc_run_internal because our locking state
		 * is poorly defined (we are typically called
		 * from a socket upcall). Don't create more
		 * than one thread per second.
		 */
		if (grp->sg_state == SVCPOOL_ACTIVE
		    && grp->sg_lastcreatetime < time_uptime
		    && grp->sg_threadcount < grp->sg_maxthreads) {
			grp->sg_state = SVCPOOL_THREADWANTED;
		}
	}
	return (FALSE);
}
434
435 void
436 xprt_active(SVCXPRT *xprt)
437 {
438 SVCGROUP *grp = xprt->xp_group;
439
440 mtx_lock(&grp->sg_lock);
441
442 if (!xprt->xp_registered) {
443 /*
444 * Race with xprt_unregister - we lose.
445 */
446 mtx_unlock(&grp->sg_lock);
447 return;
448 }
449
450 if (!xprt->xp_active) {
451 xprt->xp_active = TRUE;
452 if (xprt->xp_thread == NULL) {
453 if (!svc_request_space_available(xprt->xp_pool) ||
454 !xprt_assignthread(xprt))
455 TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
456 xp_alink);
457 }
458 }
459
460 mtx_unlock(&grp->sg_lock);
461 }
462
463 void
464 xprt_inactive_locked(SVCXPRT *xprt)
465 {
466 SVCGROUP *grp = xprt->xp_group;
467
468 mtx_assert(&grp->sg_lock, MA_OWNED);
469 if (xprt->xp_active) {
470 if (xprt->xp_thread == NULL)
471 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
472 xprt->xp_active = FALSE;
473 }
474 }
475
/* Locked wrapper around xprt_inactive_locked(). */
void
xprt_inactive(SVCXPRT *xprt)
{
	SVCGROUP *grp = xprt->xp_group;

	mtx_lock(&grp->sg_lock);
	xprt_inactive_locked(xprt);
	mtx_unlock(&grp->sg_lock);
}
485
486 /*
487 * Variant of xprt_inactive() for use only when sure that port is
488 * assigned to thread. For example, within receive handlers.
489 */
/*
 * Variant of xprt_inactive() for use only when sure that port is
 * assigned to thread. For example, within receive handlers.  Since a
 * thread-owned transport is never on sg_active, the flag can be
 * cleared without taking the group lock.
 */
void
xprt_inactive_self(SVCXPRT *xprt)
{

	KASSERT(xprt->xp_thread != NULL,
	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
	xprt->xp_active = FALSE;
}
498
499 /*
500 * Add a service program to the callout list.
501 * The dispatch routine will be called when a rpc request for this
502 * program number comes in.
503 */
/*
 * Add a service program to the callout list.
 * The dispatch routine will be called when a rpc request for this
 * program number comes in.  Returns FALSE on duplicate registration
 * with a different dispatch function or on allocation failure; on
 * success, the program/version is also registered with the local
 * rpcbind service when "nconf" is given, and that result is returned.
 */
bool_t
svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
    void (*dispatch)(struct svc_req *, SVCXPRT *),
    const struct netconfig *nconf)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_callout *s;
	char *netid = NULL;
	int flag = 0;

	/* VARIABLES PROTECTED BY svc_lock: s, svc_head */

	/* Duplicate the netid, preferring the transport's own. */
	if (xprt->xp_netid) {
		netid = strdup(xprt->xp_netid, M_RPC);
		flag = 1;
	} else if (nconf && nconf->nc_netid) {
		netid = strdup(nconf->nc_netid, M_RPC);
		flag = 1;
	} /* must have been created with svc_raw_create */
	if ((netid == NULL) && (flag == 1)) {
		return (FALSE);
	}

	mtx_lock(&pool->sp_lock);
	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
		if (netid)
			free(netid, M_RPC);
		if (s->sc_dispatch == dispatch)
			goto rpcb_it; /* he is registering another xptr */
		/* Same prog/vers with a different dispatch: refuse. */
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}
	/* M_NOWAIT because the pool lock is held. */
	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
	if (s == NULL) {
		if (netid)
			free(netid, M_RPC);
		mtx_unlock(&pool->sp_lock);
		return (FALSE);
	}

	/* The callout takes ownership of "netid". */
	s->sc_prog = prog;
	s->sc_vers = vers;
	s->sc_dispatch = dispatch;
	s->sc_netid = netid;
	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);

	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);

rpcb_it:
	mtx_unlock(&pool->sp_lock);
	/* now register the information with the local binder service */
	if (nconf) {
		bool_t dummy;
		struct netconfig tnc;
		struct netbuf nb;
		tnc = *nconf;
		nb.buf = &xprt->xp_ltaddr;
		nb.len = xprt->xp_ltaddr.ss_len;
		dummy = rpcb_set(prog, vers, &tnc, &nb);
		return (dummy);
	}
	return (TRUE);
}
568
569 /*
570 * Remove a service program from the callout list.
571 */
572 void
573 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
574 {
575 struct svc_callout *s;
576
577 /* unregister the information anyway */
578 (void) rpcb_unset(prog, vers, NULL);
579 mtx_lock(&pool->sp_lock);
580 while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
581 TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
582 if (s->sc_netid)
583 mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
584 mem_free(s, sizeof (struct svc_callout));
585 }
586 mtx_unlock(&pool->sp_lock);
587 }
588
589 /*
590 * Add a service connection loss program to the callout list.
591 * The dispatch routine will be called when some port in ths pool die.
592 */
593 bool_t
594 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
595 {
596 SVCPOOL *pool = xprt->xp_pool;
597 struct svc_loss_callout *s;
598
599 mtx_lock(&pool->sp_lock);
600 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
601 if (s->slc_dispatch == dispatch)
602 break;
603 }
604 if (s != NULL) {
605 mtx_unlock(&pool->sp_lock);
606 return (TRUE);
607 }
608 s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
609 if (s == NULL) {
610 mtx_unlock(&pool->sp_lock);
611 return (FALSE);
612 }
613 s->slc_dispatch = dispatch;
614 TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
615 mtx_unlock(&pool->sp_lock);
616 return (TRUE);
617 }
618
619 /*
620 * Remove a service connection loss program from the callout list.
621 */
622 void
623 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
624 {
625 struct svc_loss_callout *s;
626
627 mtx_lock(&pool->sp_lock);
628 TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
629 if (s->slc_dispatch == dispatch) {
630 TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
631 free(s, M_RPC);
632 break;
633 }
634 }
635 mtx_unlock(&pool->sp_lock);
636 }
637
638 /* ********************** CALLOUT list related stuff ************* */
639
640 /*
641 * Search the callout list for a program number, return the callout
642 * struct.
643 */
644 static struct svc_callout *
645 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
646 {
647 struct svc_callout *s;
648
649 mtx_assert(&pool->sp_lock, MA_OWNED);
650 TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
651 if (s->sc_prog == prog && s->sc_vers == vers
652 && (netid == NULL || s->sc_netid == NULL ||
653 strcmp(netid, s->sc_netid) == 0))
654 break;
655 }
656
657 return (s);
658 }
659
660 /* ******************* REPLY GENERATION ROUTINES ************ */
661
/*
 * Common tail of all reply paths: free the request arguments, record
 * the reply in the replay cache (if enabled), wrap the body through
 * the auth layer and transmit it.  Takes ownership of "body" and of
 * rqstp->rq_addr.  Returns TRUE if the reply was sent successfully.
 */
static bool_t
svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
    struct mbuf *body)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	bool_t ok;

	/* The arguments are no longer needed once we reply. */
	if (rqstp->rq_args) {
		m_freem(rqstp->rq_args);
		rqstp->rq_args = NULL;
	}

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    rply, svc_getrpccaller(rqstp), body);

	/* E.g. RPCSEC_GSS may encrypt/checksum the reply body here. */
	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
		return (FALSE);

	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
	if (rqstp->rq_addr) {
		free(rqstp->rq_addr, M_SONAME);
		rqstp->rq_addr = NULL;
	}

	return (ok);
}
689
690 /*
691 * Send a reply to an rpc request
692 */
/*
 * Send a reply to an rpc request: XDR-encode the results into a fresh
 * mbuf cluster and hand it to svc_sendreply_common().  Returns FALSE
 * if encoding or transmission fails.
 */
bool_t
svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
{
	struct rpc_msg rply;
	struct mbuf *m;
	XDR xdrs;
	bool_t ok;

	/* Build a successful ACCEPTED reply header. */
	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	/* Encode the results into an mbuf-backed XDR stream. */
	m = m_getcl(M_WAITOK, MT_DATA, 0);
	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
	ok = xdr_results(&xdrs, xdr_location);
	XDR_DESTROY(&xdrs);

	if (ok) {
		/* svc_sendreply_common() takes ownership of m. */
		return (svc_sendreply_common(rqstp, &rply, m));
	} else {
		m_freem(m);
		return (FALSE);
	}
}
721
/*
 * Send a reply to an rpc request with the results already encoded in
 * an mbuf chain.  Ownership of "m" passes to svc_sendreply_common().
 */
bool_t
svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
{
	struct rpc_msg rply;

	/* Build a successful ACCEPTED reply header. */
	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SUCCESS;
	rply.acpted_rply.ar_results.where = NULL;
	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;

	return (svc_sendreply_common(rqstp, &rply, m));
}
737
738 /*
739 * No procedure error reply
740 */
/*
 * No procedure error reply: ACCEPTED with PROC_UNAVAIL, sent when the
 * requested procedure number is not served.
 */
void
svcerr_noproc(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROC_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
759
760 /*
761 * Can't decode args error reply
762 */
/*
 * Can't decode args error reply: ACCEPTED with GARBAGE_ARGS, sent when
 * the request arguments fail to decode/unwrap.
 */
void
svcerr_decode(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = GARBAGE_ARGS;

	/*
	 * NOTE(review): this passes xp_rtaddr directly where the other
	 * svcerr_* routines use svc_getrpccaller(rqstp) - presumably
	 * equivalent here; confirm before unifying.
	 */
	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
781
782 /*
783 * Some system error
784 */
/*
 * Some system error: ACCEPTED with SYSTEM_ERR, for server-side
 * failures unrelated to the request itself.
 */
void
svcerr_systemerr(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = SYSTEM_ERR;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
803
804 /*
805 * Authentication error reply
806 */
/*
 * Authentication error reply: DENIED with AUTH_ERROR and the specific
 * rejection reason in "why".
 */
void
svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_DENIED;
	rply.rjcted_rply.rj_stat = AUTH_ERROR;
	rply.rjcted_rply.rj_why = why;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
825
826 /*
827 * Auth too weak error reply
828 */
/* Auth too weak error reply: convenience wrapper for AUTH_TOOWEAK. */
void
svcerr_weakauth(struct svc_req *rqstp)
{

	svcerr_auth(rqstp, AUTH_TOOWEAK);
}
835
836 /*
837 * Program unavailable error reply
838 */
/*
 * Program unavailable error reply: ACCEPTED with PROG_UNAVAIL, sent
 * when the requested program number is not served at all.
 */
void
svcerr_noprog(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_UNAVAIL;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
857
858 /*
859 * Program version mismatch error reply
860 */
/*
 * Program version mismatch error reply: ACCEPTED with PROG_MISMATCH,
 * advertising the lowest and highest versions actually served.
 */
void
svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	struct rpc_msg rply;

	rply.rm_xid = rqstp->rq_xid;
	rply.rm_direction = REPLY;
	rply.rm_reply.rp_stat = MSG_ACCEPTED;
	rply.acpted_rply.ar_verf = rqstp->rq_verf;
	rply.acpted_rply.ar_stat = PROG_MISMATCH;
	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;

	if (xprt->xp_pool->sp_rcache)
		replay_setreply(xprt->xp_pool->sp_rcache,
		    &rply, svc_getrpccaller(rqstp), NULL);

	svc_sendreply_common(rqstp, &rply, NULL);
}
881
882 /*
883 * Allocate a new server transport structure. All fields are
884 * initialized to zero and xp_p3 is initialized to point at an
885 * extension structure to hold various flags and authentication
886 * parameters.
887 */
888 SVCXPRT *
889 svc_xprt_alloc(void)
890 {
891 SVCXPRT *xprt;
892 SVCXPRT_EXT *ext;
893
894 xprt = mem_alloc(sizeof(SVCXPRT));
895 ext = mem_alloc(sizeof(SVCXPRT_EXT));
896 xprt->xp_p3 = ext;
897 refcount_init(&xprt->xp_refs, 1);
898
899 return (xprt);
900 }
901
902 /*
903 * Free a server transport structure.
904 */
/*
 * Free a server transport structure together with its extension
 * record and group id list.
 */
void
svc_xprt_free(SVCXPRT *xprt)
{

	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
	/* The size argument is ignored, so 0 is ok. */
	mem_free(xprt->xp_gidp, 0);
	mem_free(xprt, sizeof(SVCXPRT));
}
914
915 /* ******************* SERVER INPUT STUFF ******************* */
916
917 /*
918 * Read RPC requests from a transport and queue them to be
919 * executed. We handle authentication and replay cache replies here.
920 * Actually dispatching the RPC is deferred till svc_executereq.
921 */
/*
 * Read RPC requests from a transport and queue them to be
 * executed. We handle authentication and replay cache replies here.
 * Actually dispatching the RPC is deferred till svc_executereq.
 * On success *rqstp_ret receives the parsed request (ownership passes
 * to the caller); otherwise the request is freed here.  The return
 * value is the transport status; on XPRT_DIED the connection loss
 * callouts are invoked and the transport is unregistered.
 */
static enum xprt_stat
svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
{
	SVCPOOL *pool = xprt->xp_pool;
	struct svc_req *r;
	struct rpc_msg msg;
	struct mbuf *args;
	struct svc_loss_callout *s;
	enum xprt_stat stat;

	/* now receive msgs from xprtprt (support batch calls) */
	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);

	/* Carve the credential scratch area into cred, verf and client
	 * credential slots of MAX_AUTH_BYTES each. */
	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
		enum auth_stat why;

		/*
		 * Handle replays and authenticate before queuing the
		 * request to be executed.
		 */
		SVC_ACQUIRE(xprt);
		r->rq_xprt = xprt;
		if (pool->sp_rcache) {
			struct rpc_msg repmsg;
			struct mbuf *repbody;
			enum replay_state rs;
			rs = replay_find(pool->sp_rcache, &msg,
			    svc_getrpccaller(r), &repmsg, &repbody);
			switch (rs) {
			case RS_NEW:
				break;
			case RS_DONE:
				/* Retransmit the cached reply verbatim. */
				SVC_REPLY(xprt, &repmsg, r->rq_addr,
				    repbody, &r->rq_reply_seq);
				if (r->rq_addr) {
					free(r->rq_addr, M_SONAME);
					r->rq_addr = NULL;
				}
				m_freem(args);
				goto call_done;

			default:
				/* Replay still in progress: drop silently. */
				m_freem(args);
				goto call_done;
			}
		}

		r->rq_xid = msg.rm_xid;
		r->rq_prog = msg.rm_call.cb_prog;
		r->rq_vers = msg.rm_call.cb_vers;
		r->rq_proc = msg.rm_call.cb_proc;
		/* Accounting size used for request-space throttling. */
		r->rq_size = sizeof(*r) + m_length(args, NULL);
		r->rq_args = args;
		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
			/*
			 * RPCSEC_GSS uses this return code
			 * for requests that form part of its
			 * context establishment protocol and
			 * should not be dispatched to the
			 * application.
			 */
			if (why != RPCSEC_GSS_NODISPATCH)
				svcerr_auth(r, why);
			goto call_done;
		}

		/* E.g. RPCSEC_GSS may decrypt/verify the arguments here. */
		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
			svcerr_decode(r);
			goto call_done;
		}

		/*
		 * Everything checks out, return request to caller.
		 */
		*rqstp_ret = r;
		r = NULL;
	}
call_done:
	/* Any request not handed to the caller is freed here. */
	if (r) {
		svc_freereq(r);
		r = NULL;
	}
	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
		/*
		 * NOTE(review): sp_lcallouts is walked here without
		 * sp_lock - presumably safe because loss callouts are
		 * effectively static once svc_run() starts; confirm.
		 */
		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
			(*s->slc_dispatch)(xprt);
		xprt_unregister(xprt);
	}

	return (stat);
}
1015
/*
 * Dispatch a parsed request to the registered service routine matching
 * its program and version.  If the program exists but the version does
 * not, reply PROG_MISMATCH with the served version range; if the
 * program is unknown, reply PROG_UNAVAIL.  Ownership of rqstp passes
 * to the dispatch function on success, otherwise it is freed here.
 */
static void
svc_executereq(struct svc_req *rqstp)
{
	SVCXPRT *xprt = rqstp->rq_xprt;
	SVCPOOL *pool = xprt->xp_pool;
	int prog_found;
	rpcvers_t low_vers;
	rpcvers_t high_vers;
	struct svc_callout *s;

	/* now match message with a registered service*/
	prog_found = FALSE;
	/* Start the range inverted so any served version narrows it. */
	low_vers = (rpcvers_t) -1L;
	high_vers = (rpcvers_t) 0L;
	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
		if (s->sc_prog == rqstp->rq_prog) {
			if (s->sc_vers == rqstp->rq_vers) {
				/*
				 * We hand ownership of r to the
				 * dispatch method - they must call
				 * svc_freereq.
				 */
				(*s->sc_dispatch)(rqstp, xprt);
				return;
			} /* found correct version */
			prog_found = TRUE;
			if (s->sc_vers < low_vers)
				low_vers = s->sc_vers;
			if (s->sc_vers > high_vers)
				high_vers = s->sc_vers;
		} /* found correct program */
	}

	/*
	 * if we got here, the program or version
	 * is not served ...
	 */
	if (prog_found)
		svcerr_progvers(rqstp, low_vers, high_vers);
	else
		svcerr_noprog(rqstp);

	svc_freereq(rqstp);
}
1060
/*
 * Unregister transports whose idle timeout has expired.  Called with
 * the group lock held; the lock is dropped while the collected
 * transports are shut down and released, then reacquired before
 * returning.
 */
static void
svc_checkidle(SVCGROUP *grp)
{
	SVCXPRT *xprt, *nxprt;
	time_t timo;
	struct svcxprt_list cleanup;

	TAILQ_INIT(&cleanup);
	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
		/*
		 * Only some transports have idle timers. Don't time
		 * something out which is just waking up.
		 */
		if (!xprt->xp_idletimeout || xprt->xp_thread)
			continue;

		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
		if (time_uptime > timo) {
			xprt_unregister_locked(xprt);
			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
		}
	}

	/* Shutdown/release outside the lock; SVC_RELEASE may free. */
	mtx_unlock(&grp->sg_lock);
	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
		soshutdown(xprt->xp_socket, SHUT_WR);
		SVC_RELEASE(xprt);
	}
	mtx_lock(&grp->sg_lock);
}
1091
1092 static void
1093 svc_assign_waiting_sockets(SVCPOOL *pool)
1094 {
1095 SVCGROUP *grp;
1096 SVCXPRT *xprt;
1097 int g;
1098
1099 for (g = 0; g < pool->sp_groupcount; g++) {
1100 grp = &pool->sp_groups[g];
1101 mtx_lock(&grp->sg_lock);
1102 while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1103 if (xprt_assignthread(xprt))
1104 TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1105 else
1106 break;
1107 }
1108 mtx_unlock(&grp->sg_lock);
1109 }
1110 }
1111
/*
 * Adjust the pool's accounted request space by "delta" (positive when
 * a request is queued, negative when handled) and flip the throttle
 * flag when crossing the high/low water marks.  The counter itself is
 * updated atomically; the throttle flag and highwater bookkeeping are
 * not locked, so they are best-effort under concurrent updates.
 */
static void
svc_change_space_used(SVCPOOL *pool, long delta)
{
	unsigned long value;

	/* atomic_fetchadd returns the old value; add delta for the new. */
	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
	if (delta > 0) {
		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
			pool->sp_space_throttled = TRUE;
			pool->sp_space_throttle_count++;
		}
		if (value > pool->sp_space_used_highest)
			pool->sp_space_used_highest = value;
	} else {
		if (value < pool->sp_space_low && pool->sp_space_throttled) {
			pool->sp_space_throttled = FALSE;
			/* Resume transports parked while throttled. */
			svc_assign_waiting_sockets(pool);
		}
	}
}
1132
1133 static bool_t
1134 svc_request_space_available(SVCPOOL *pool)
1135 {
1136
1137 if (pool->sp_space_throttled)
1138 return (FALSE);
1139 return (TRUE);
1140 }
1141
/*
 * Main body of a service thread — both the master thread that called
 * svc_run() (ismaster == TRUE) and dynamically created workers.
 * Repeatedly claims an active transport, drains RPC requests from it,
 * and executes them, until the group enters SVCPOOL_CLOSING.
 */
static void
svc_run_internal(SVCGROUP *grp, bool_t ismaster)
{
	SVCPOOL *pool = grp->sg_pool;
	SVCTHREAD *st, *stpref;
	SVCXPRT *xprt;
	enum xprt_stat stat;
	struct svc_req *rqstp;
	struct proc *p;
	long sz;
	int error;

	/* Per-thread state: its lock, queued requests, and wakeup condvar. */
	st = mem_alloc(sizeof(*st));
	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
	st->st_pool = pool;
	st->st_xprt = NULL;
	STAILQ_INIT(&st->st_reqs);
	cv_init(&st->st_cond, "rpcsvc");

	mtx_lock(&grp->sg_lock);

	/*
	 * If we are a new thread which was spawned to cope with
	 * increased load, set the state back to SVCPOOL_ACTIVE.
	 */
	if (grp->sg_state == SVCPOOL_THREADSTARTING)
		grp->sg_state = SVCPOOL_ACTIVE;

	while (grp->sg_state != SVCPOOL_CLOSING) {
		/*
		 * Create new thread if requested.
		 */
		if (grp->sg_state == SVCPOOL_THREADWANTED) {
			grp->sg_state = SVCPOOL_THREADSTARTING;
			grp->sg_lastcreatetime = time_uptime;
			mtx_unlock(&grp->sg_lock);
			svc_new_thread(grp);
			mtx_lock(&grp->sg_lock);
			continue;
		}

		/*
		 * Check for idle transports once per second.
		 */
		if (time_uptime > grp->sg_lastidlecheck) {
			grp->sg_lastidlecheck = time_uptime;
			svc_checkidle(grp);
		}

		xprt = st->st_xprt;
		if (!xprt) {
			/*
			 * Enforce maxthreads count.
			 */
			if (!ismaster && grp->sg_threadcount >
			    grp->sg_maxthreads)
				break;

			/*
			 * Before sleeping, see if we can find an
			 * active transport which isn't being serviced
			 * by a thread.
			 */
			if (svc_request_space_available(pool) &&
			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
				SVC_ACQUIRE(xprt);
				xprt->xp_thread = st;
				st->st_xprt = xprt;
				continue;
			}

			/*
			 * No work: sleep on our condvar.  The master, and
			 * workers above the group minimum, use a 5 second
			 * timeout so surplus threads can retire below.
			 */
			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
			if (ismaster || (!ismaster &&
			    grp->sg_threadcount > grp->sg_minthreads))
				error = cv_timedwait_sig(&st->st_cond,
				    &grp->sg_lock, 5 * hz);
			else
				error = cv_wait_sig(&st->st_cond,
				    &grp->sg_lock);
			/*
			 * NOTE(review): if st_xprt was set while we slept,
			 * the waker has apparently already unlinked us from
			 * the idle list — only unlink ourselves otherwise.
			 */
			if (st->st_xprt == NULL)
				LIST_REMOVE(st, st_ilink);

			/*
			 * Reduce worker thread count when idle.
			 */
			if (error == EWOULDBLOCK) {
				if (!ismaster
				    && (grp->sg_threadcount
					> grp->sg_minthreads)
				    && !st->st_xprt)
					break;
			} else if (error != 0) {
				/*
				 * Interrupted by a signal: either suspend
				 * with the rest of the process, or shut the
				 * whole pool down and exit the loop.
				 */
				KASSERT(error == EINTR || error == ERESTART,
				    ("non-signal error %d", error));
				mtx_unlock(&grp->sg_lock);
				p = curproc;
				PROC_LOCK(p);
				if (P_SHOULDSTOP(p) ||
				    (p->p_flag & P_TOTAL_STOP) != 0) {
					thread_suspend_check(0);
					PROC_UNLOCK(p);
					mtx_lock(&grp->sg_lock);
				} else {
					PROC_UNLOCK(p);
					svc_exit(pool);
					mtx_lock(&grp->sg_lock);
					break;
				}
			}
			continue;
		}
		mtx_unlock(&grp->sg_lock);

		/*
		 * Drain the transport socket and queue up any RPCs.
		 */
		xprt->xp_lastactive = time_uptime;
		do {
			if (!svc_request_space_available(pool))
				break;
			rqstp = NULL;
			stat = svc_getreq(xprt, &rqstp);
			if (rqstp) {
				svc_change_space_used(pool, rqstp->rq_size);
				/*
				 * See if the application has a preference
				 * for some other thread.
				 */
				if (pool->sp_assign) {
					stpref = pool->sp_assign(st, rqstp);
					rqstp->rq_thread = stpref;
					STAILQ_INSERT_TAIL(&stpref->st_reqs,
					    rqstp, rq_link);
					mtx_unlock(&stpref->st_lock);
					/*
					 * If the request went to another
					 * thread, clear rqstp so the loop
					 * keeps draining this socket.
					 */
					if (stpref != st)
						rqstp = NULL;
				} else {
					rqstp->rq_thread = st;
					STAILQ_INSERT_TAIL(&st->st_reqs,
					    rqstp, rq_link);
				}
			}
		} while (rqstp == NULL && stat == XPRT_MOREREQS
		    && grp->sg_state != SVCPOOL_CLOSING);

		/*
		 * Move this transport to the end of the active list to
		 * ensure fairness when multiple transports are active.
		 * If this was the last queued request, svc_getreq will end
		 * up calling xprt_inactive to remove from the active list.
		 */
		mtx_lock(&grp->sg_lock);
		xprt->xp_thread = NULL;
		st->st_xprt = NULL;
		if (xprt->xp_active) {
			if (!svc_request_space_available(pool) ||
			    !xprt_assignthread(xprt))
				TAILQ_INSERT_TAIL(&grp->sg_active,
				    xprt, xp_alink);
		}
		mtx_unlock(&grp->sg_lock);
		SVC_RELEASE(xprt);

		/*
		 * Execute what we have queued.
		 */
		mtx_lock(&st->st_lock);
		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
			mtx_unlock(&st->st_lock);
			/* Save the size: rqstp is freed by execution. */
			sz = (long)rqstp->rq_size;
			svc_executereq(rqstp);
			svc_change_space_used(pool, -sz);
			mtx_lock(&st->st_lock);
		}
		mtx_unlock(&st->st_lock);
		mtx_lock(&grp->sg_lock);
	}

	/* Drop any transport still assigned to us before tearing down. */
	if (st->st_xprt) {
		xprt = st->st_xprt;
		st->st_xprt = NULL;
		SVC_RELEASE(xprt);
	}
	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
	mtx_destroy(&st->st_lock);
	cv_destroy(&st->st_cond);
	mem_free(st, sizeof(*st));

	/* Tell svc_run() (sleeping on grp) that a worker has retired. */
	grp->sg_threadcount--;
	if (!ismaster)
		wakeup(grp);
	mtx_unlock(&grp->sg_lock);
}
1337
1338 static void
1339 svc_thread_start(void *arg)
1340 {
1341
1342 svc_run_internal((SVCGROUP *) arg, FALSE);
1343 kthread_exit();
1344 }
1345
1346 static void
1347 svc_new_thread(SVCGROUP *grp)
1348 {
1349 SVCPOOL *pool = grp->sg_pool;
1350 struct thread *td;
1351
1352 mtx_lock(&grp->sg_lock);
1353 grp->sg_threadcount++;
1354 mtx_unlock(&grp->sg_lock);
1355 kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1356 "%s: service", pool->sp_name);
1357 }
1358
/*
 * Run an RPC service pool.  The calling thread becomes the master
 * service thread of group 0; additional kernel threads are created to
 * bring every group up to its minimum thread count.  Does not return
 * until every group's thread count has dropped to zero (normally
 * after svc_exit() has put the pool into the closing state).
 */
void
svc_run(SVCPOOL *pool)
{
	int g, i;
	struct proc *p;
	struct thread *td;
	SVCGROUP *grp;

	p = curproc;
	td = curthread;
	/* Rename the current thread so it is identifiable as the master. */
	snprintf(td->td_name, sizeof(td->td_name),
	    "%s: master", pool->sp_name);
	pool->sp_state = SVCPOOL_ACTIVE;
	pool->sp_proc = p;

	/* Choose group count based on number of threads and CPUs. */
	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		/* Split the pool-wide thread limits evenly across groups. */
		grp->sg_minthreads = max(1,
		    pool->sp_minthreads / pool->sp_groupcount);
		grp->sg_maxthreads = max(1,
		    pool->sp_maxthreads / pool->sp_groupcount);
		grp->sg_lastcreatetime = time_uptime;
	}

	/* Starting threads */
	/* The calling thread itself counts as group 0's first thread. */
	pool->sp_groups[0].sg_threadcount++;
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
			svc_new_thread(grp);
	}
	svc_run_internal(&pool->sp_groups[0], TRUE);

	/* Waiting for threads to stop. */
	for (g = 0; g < pool->sp_groupcount; g++) {
		grp = &pool->sp_groups[g];
		mtx_lock(&grp->sg_lock);
		while (grp->sg_threadcount > 0)
			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
		mtx_unlock(&grp->sg_lock);
	}
}
1404
1405 void
1406 svc_exit(SVCPOOL *pool)
1407 {
1408 SVCGROUP *grp;
1409 SVCTHREAD *st;
1410 int g;
1411
1412 pool->sp_state = SVCPOOL_CLOSING;
1413 for (g = 0; g < pool->sp_groupcount; g++) {
1414 grp = &pool->sp_groups[g];
1415 mtx_lock(&grp->sg_lock);
1416 if (grp->sg_state != SVCPOOL_CLOSING) {
1417 grp->sg_state = SVCPOOL_CLOSING;
1418 LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1419 cv_signal(&st->st_cond);
1420 }
1421 mtx_unlock(&grp->sg_lock);
1422 }
1423 }
1424
1425 bool_t
1426 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1427 {
1428 struct mbuf *m;
1429 XDR xdrs;
1430 bool_t stat;
1431
1432 m = rqstp->rq_args;
1433 rqstp->rq_args = NULL;
1434
1435 xdrmbuf_create(&xdrs, m, XDR_DECODE);
1436 stat = xargs(&xdrs, args);
1437 XDR_DESTROY(&xdrs);
1438
1439 return (stat);
1440 }
1441
1442 bool_t
1443 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1444 {
1445 XDR xdrs;
1446
1447 if (rqstp->rq_addr) {
1448 free(rqstp->rq_addr, M_SONAME);
1449 rqstp->rq_addr = NULL;
1450 }
1451
1452 xdrs.x_op = XDR_FREE;
1453 return (xargs(&xdrs, args));
1454 }
1455
1456 void
1457 svc_freereq(struct svc_req *rqstp)
1458 {
1459 SVCTHREAD *st;
1460 SVCPOOL *pool;
1461
1462 st = rqstp->rq_thread;
1463 if (st) {
1464 pool = st->st_pool;
1465 if (pool->sp_done)
1466 pool->sp_done(st, rqstp);
1467 }
1468
1469 if (rqstp->rq_auth.svc_ah_ops)
1470 SVCAUTH_RELEASE(&rqstp->rq_auth);
1471
1472 if (rqstp->rq_xprt) {
1473 SVC_RELEASE(rqstp->rq_xprt);
1474 }
1475
1476 if (rqstp->rq_addr)
1477 free(rqstp->rq_addr, M_SONAME);
1478
1479 if (rqstp->rq_args)
1480 m_freem(rqstp->rq_args);
1481
1482 free(rqstp, M_RPC);
1483 }
Cache object: 702da0d958d40db14df49edbbd89ee5c
|