FreeBSD/Linux Kernel Cross Reference
sys/rpc/clnt_vc.c
1 /* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */
2
3 /*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California 94043
30 */
31
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)clnt_tcp.c 2.2 88/08/01 4.0 RPCSRC";
35 static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD: releng/7.3/sys/rpc/clnt_vc.c 181328 2008-08-05 10:35:51Z dfr $");
39
40 /*
41 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42 *
43 * Copyright (C) 1984, Sun Microsystems, Inc.
44 *
45 * TCP based RPC supports 'batched calls'.
46 * A sequence of calls may be batched-up in a send buffer. The rpc call
47 * return immediately to the client even though the call was not necessarily
48 * sent. The batching occurs if the results' xdr routine is NULL (0) AND
49 * the rpc timeout value is zero (see clnt.h, rpc).
50 *
51 * Clients should NOT casually batch calls that in fact return results; that is,
52 * the server side should be aware that a call is batched and not produce any
53 * return message. Batched calls that produce many result messages can
54 * deadlock (netlock) the client and the server....
55 *
56 * Now go hang yourself.
57 */
58
59 #include <sys/param.h>
60 #include <sys/systm.h>
61 #include <sys/lock.h>
62 #include <sys/malloc.h>
63 #include <sys/mbuf.h>
64 #include <sys/mutex.h>
65 #include <sys/pcpu.h>
66 #include <sys/proc.h>
67 #include <sys/socket.h>
68 #include <sys/socketvar.h>
69 #include <sys/syslog.h>
70 #include <sys/time.h>
71 #include <sys/uio.h>
72
73 #include <rpc/rpc.h>
74 #include <rpc/rpc_com.h>
75
76 #define MCALL_MSG_SIZE 24
77
78 struct cmessage {
79 struct cmsghdr cmsg;
80 struct cmsgcred cmcred;
81 };
82
83 static enum clnt_stat clnt_vc_call(CLIENT *, struct rpc_callextra *,
84 rpcproc_t, xdrproc_t, void *, xdrproc_t, void *, struct timeval);
85 static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
86 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
87 static void clnt_vc_abort(CLIENT *);
88 static bool_t clnt_vc_control(CLIENT *, u_int, void *);
89 static void clnt_vc_destroy(CLIENT *);
90 static bool_t time_not_ok(struct timeval *);
91 static void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
92
93 static struct clnt_ops clnt_vc_ops = {
94 .cl_call = clnt_vc_call,
95 .cl_abort = clnt_vc_abort,
96 .cl_geterr = clnt_vc_geterr,
97 .cl_freeres = clnt_vc_freeres,
98 .cl_destroy = clnt_vc_destroy,
99 .cl_control = clnt_vc_control
100 };
101
102 /*
103 * A pending RPC request which awaits a reply. Requests which have
104 * received their reply will have cr_xid set to zero and cr_mrep to
105 * the mbuf chain of the reply.
106 */
107 struct ct_request {
108 TAILQ_ENTRY(ct_request) cr_link;
109 uint32_t cr_xid; /* XID of request */
110 struct mbuf *cr_mrep; /* reply received by upcall */
111 int cr_error; /* any error from upcall */
112 };
113
114 TAILQ_HEAD(ct_request_list, ct_request);
115
116 struct ct_data {
117 struct mtx ct_lock;
118 int ct_threads; /* number of threads in clnt_vc_call */
119 bool_t ct_closing; /* TRUE if we are destroying client */
120 struct socket *ct_socket; /* connection socket */
121 bool_t ct_closeit; /* close it on destroy */
122 struct timeval ct_wait; /* wait interval in milliseconds */
123 struct sockaddr_storage ct_addr; /* remote addr */
124 struct rpc_err ct_error;
125 uint32_t ct_xid;
126 char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
127 size_t ct_mpos; /* pos after marshal */
128 const char *ct_waitchan;
129 int ct_waitflag;
130 struct mbuf *ct_record; /* current reply record */
131 size_t ct_record_resid; /* how much left of reply to read */
132 bool_t ct_record_eor; /* true if reading last fragment */
133 struct ct_request_list ct_pending;
134 };
135
136 static const char clnt_vc_errstr[] = "%s : %s";
137 static const char clnt_vc_str[] = "clnt_vc_create";
138 static const char clnt_read_vc_str[] = "read_vc";
139 static const char __no_mem_str[] = "out of memory";
140
141 /*
142 * Create a client handle for a connection.
143 * Default options are set, which the user can change using clnt_control()'s.
144 * The rpc/vc package does buffering similar to stdio, so the client
145 * must pick send and receive buffer sizes, 0 => use the default.
146 * NB: fd is copied into a private area.
147 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
148 * set this something more useful.
149 *
150 * fd should be an open socket
151 */
152 CLIENT *
153 clnt_vc_create(
154 struct socket *so, /* open file descriptor */
155 struct sockaddr *raddr, /* servers address */
156 const rpcprog_t prog, /* program number */
157 const rpcvers_t vers, /* version number */
158 size_t sendsz, /* buffer recv size */
159 size_t recvsz) /* buffer send size */
160 {
161 CLIENT *cl; /* client handle */
162 struct ct_data *ct = NULL; /* client handle */
163 struct timeval now;
164 struct rpc_msg call_msg;
165 static uint32_t disrupt;
166 struct __rpc_sockinfo si;
167 XDR xdrs;
168 int error, interrupted;
169
170 if (disrupt == 0)
171 disrupt = (uint32_t)(long)raddr;
172
173 cl = (CLIENT *)mem_alloc(sizeof (*cl));
174 ct = (struct ct_data *)mem_alloc(sizeof (*ct));
175
176 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
177 ct->ct_threads = 0;
178 ct->ct_closing = FALSE;
179
180 if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
181 error = soconnect(so, raddr, curthread);
182 SOCK_LOCK(so);
183 interrupted = 0;
184 while ((so->so_state & SS_ISCONNECTING)
185 && so->so_error == 0) {
186 error = msleep(&so->so_timeo, SOCK_MTX(so),
187 PSOCK | PCATCH, "connec", 0);
188 if (error) {
189 if (error == EINTR || error == ERESTART)
190 interrupted = 1;
191 break;
192 }
193 }
194 if (error == 0) {
195 error = so->so_error;
196 so->so_error = 0;
197 }
198 SOCK_UNLOCK(so);
199 if (error) {
200 if (!interrupted)
201 so->so_state &= ~SS_ISCONNECTING;
202 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
203 rpc_createerr.cf_error.re_errno = error;
204 goto err;
205 }
206 }
207
208 if (!__rpc_socket2sockinfo(so, &si))
209 goto err;
210
211 ct->ct_closeit = FALSE;
212
213 /*
214 * Set up private data struct
215 */
216 ct->ct_socket = so;
217 ct->ct_wait.tv_sec = -1;
218 ct->ct_wait.tv_usec = -1;
219 memcpy(&ct->ct_addr, raddr, raddr->sa_len);
220
221 /*
222 * Initialize call message
223 */
224 getmicrotime(&now);
225 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
226 call_msg.rm_xid = ct->ct_xid;
227 call_msg.rm_direction = CALL;
228 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
229 call_msg.rm_call.cb_prog = (uint32_t)prog;
230 call_msg.rm_call.cb_vers = (uint32_t)vers;
231
232 /*
233 * pre-serialize the static part of the call msg and stash it away
234 */
235 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
236 XDR_ENCODE);
237 if (! xdr_callhdr(&xdrs, &call_msg)) {
238 if (ct->ct_closeit) {
239 soclose(ct->ct_socket);
240 }
241 goto err;
242 }
243 ct->ct_mpos = XDR_GETPOS(&xdrs);
244 XDR_DESTROY(&xdrs);
245 ct->ct_waitchan = "rpcrecv";
246 ct->ct_waitflag = 0;
247
248 /*
249 * Create a client handle which uses xdrrec for serialization
250 * and authnone for authentication.
251 */
252 cl->cl_refs = 1;
253 cl->cl_ops = &clnt_vc_ops;
254 cl->cl_private = ct;
255 cl->cl_auth = authnone_create();
256 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
257 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
258
259 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
260 ct->ct_socket->so_upcallarg = ct;
261 ct->ct_socket->so_upcall = clnt_vc_soupcall;
262 ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL;
263 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
264
265 ct->ct_record = NULL;
266 ct->ct_record_resid = 0;
267 TAILQ_INIT(&ct->ct_pending);
268 return (cl);
269
270 err:
271 if (cl) {
272 if (ct) {
273 mem_free(ct, sizeof (struct ct_data));
274 }
275 if (cl)
276 mem_free(cl, sizeof (CLIENT));
277 }
278 return ((CLIENT *)NULL);
279 }
280
281 static enum clnt_stat
282 clnt_vc_call(
283 CLIENT *cl,
284 struct rpc_callextra *ext,
285 rpcproc_t proc,
286 xdrproc_t xdr_args,
287 void *args_ptr,
288 xdrproc_t xdr_results,
289 void *results_ptr,
290 struct timeval utimeout)
291 {
292 struct ct_data *ct = (struct ct_data *) cl->cl_private;
293 AUTH *auth;
294 XDR xdrs;
295 struct rpc_msg reply_msg;
296 bool_t ok;
297 int nrefreshes = 2; /* number of times to refresh cred */
298 struct timeval timeout;
299 uint32_t xid;
300 struct mbuf *mreq = NULL;
301 struct ct_request *cr;
302 int error;
303
304 cr = malloc(sizeof(struct ct_request), M_RPC, M_WAITOK);
305
306 mtx_lock(&ct->ct_lock);
307
308 if (ct->ct_closing) {
309 mtx_unlock(&ct->ct_lock);
310 free(cr, M_RPC);
311 return (RPC_CANTSEND);
312 }
313 ct->ct_threads++;
314
315 if (ext)
316 auth = ext->rc_auth;
317 else
318 auth = cl->cl_auth;
319
320 cr->cr_mrep = NULL;
321 cr->cr_error = 0;
322
323 if (ct->ct_wait.tv_usec == -1) {
324 timeout = utimeout; /* use supplied timeout */
325 } else {
326 timeout = ct->ct_wait; /* use default timeout */
327 }
328
329 call_again:
330 mtx_assert(&ct->ct_lock, MA_OWNED);
331
332 ct->ct_xid++;
333 xid = ct->ct_xid;
334
335 mtx_unlock(&ct->ct_lock);
336
337 /*
338 * Leave space to pre-pend the record mark.
339 */
340 MGETHDR(mreq, M_WAIT, MT_DATA);
341 MCLGET(mreq, M_WAIT);
342 mreq->m_len = 0;
343 mreq->m_data += sizeof(uint32_t);
344 m_append(mreq, ct->ct_mpos, ct->ct_mcallc);
345
346 /*
347 * The XID is the first thing in the request.
348 */
349 *mtod(mreq, uint32_t *) = htonl(xid);
350
351 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
352
353 ct->ct_error.re_status = RPC_SUCCESS;
354
355 if ((! XDR_PUTINT32(&xdrs, &proc)) ||
356 (! AUTH_MARSHALL(auth, &xdrs)) ||
357 (! (*xdr_args)(&xdrs, args_ptr))) {
358 if (ct->ct_error.re_status == RPC_SUCCESS)
359 ct->ct_error.re_status = RPC_CANTENCODEARGS;
360 mtx_lock(&ct->ct_lock);
361 goto out;
362 }
363 m_fixhdr(mreq);
364
365 /*
366 * Prepend a record marker containing the packet length.
367 */
368 M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
369 *mtod(mreq, uint32_t *) =
370 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
371
372 cr->cr_xid = xid;
373 mtx_lock(&ct->ct_lock);
374 TAILQ_INSERT_TAIL(&ct->ct_pending, cr, cr_link);
375 mtx_unlock(&ct->ct_lock);
376
377 /*
378 * sosend consumes mreq.
379 */
380 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
381 mreq = NULL;
382
383 reply_msg.acpted_rply.ar_verf = _null_auth;
384 reply_msg.acpted_rply.ar_results.where = results_ptr;
385 reply_msg.acpted_rply.ar_results.proc = xdr_results;
386
387 mtx_lock(&ct->ct_lock);
388 if (error) {
389 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
390 ct->ct_error.re_errno = error;
391 ct->ct_error.re_status = RPC_CANTSEND;
392 goto out;
393 }
394
395 /*
396 * Check to see if we got an upcall while waiting for the
397 * lock. In both these cases, the request has been removed
398 * from ct->ct_pending.
399 */
400 if (cr->cr_error) {
401 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
402 ct->ct_error.re_errno = cr->cr_error;
403 ct->ct_error.re_status = RPC_CANTRECV;
404 goto out;
405 }
406 if (cr->cr_mrep) {
407 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
408 goto got_reply;
409 }
410
411 /*
412 * Hack to provide rpc-based message passing
413 */
414 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
415 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
416 ct->ct_error.re_status = RPC_TIMEDOUT;
417 goto out;
418 }
419
420 error = msleep(cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
421 tvtohz(&timeout));
422
423 TAILQ_REMOVE(&ct->ct_pending, cr, cr_link);
424
425 if (error) {
426 /*
427 * The sleep returned an error so our request is still
428 * on the list. Turn the error code into an
429 * appropriate client status.
430 */
431 ct->ct_error.re_errno = error;
432 switch (error) {
433 case EINTR:
434 ct->ct_error.re_status = RPC_INTR;
435 break;
436 case EWOULDBLOCK:
437 ct->ct_error.re_status = RPC_TIMEDOUT;
438 break;
439 default:
440 ct->ct_error.re_status = RPC_CANTRECV;
441 }
442 goto out;
443 } else {
444 /*
445 * We were woken up by the upcall. If the
446 * upcall had a receive error, report that,
447 * otherwise we have a reply.
448 */
449 if (cr->cr_error) {
450 ct->ct_error.re_errno = cr->cr_error;
451 ct->ct_error.re_status = RPC_CANTRECV;
452 goto out;
453 }
454 }
455
456 got_reply:
457 /*
458 * Now decode and validate the response. We need to drop the
459 * lock since xdr_replymsg may end up sleeping in malloc.
460 */
461 mtx_unlock(&ct->ct_lock);
462
463 xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
464 ok = xdr_replymsg(&xdrs, &reply_msg);
465 XDR_DESTROY(&xdrs);
466 cr->cr_mrep = NULL;
467
468 mtx_lock(&ct->ct_lock);
469
470 if (ok) {
471 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
472 (reply_msg.acpted_rply.ar_stat == SUCCESS))
473 ct->ct_error.re_status = RPC_SUCCESS;
474 else
475 _seterr_reply(&reply_msg, &(ct->ct_error));
476
477 if (ct->ct_error.re_status == RPC_SUCCESS) {
478 if (! AUTH_VALIDATE(cl->cl_auth,
479 &reply_msg.acpted_rply.ar_verf)) {
480 ct->ct_error.re_status = RPC_AUTHERROR;
481 ct->ct_error.re_why = AUTH_INVALIDRESP;
482 }
483 if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
484 xdrs.x_op = XDR_FREE;
485 (void) xdr_opaque_auth(&xdrs,
486 &(reply_msg.acpted_rply.ar_verf));
487 }
488 } /* end successful completion */
489 /*
490 * If unsuccesful AND error is an authentication error
491 * then refresh credentials and try again, else break
492 */
493 else if (ct->ct_error.re_status == RPC_AUTHERROR)
494 /* maybe our credentials need to be refreshed ... */
495 if (nrefreshes > 0 &&
496 AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
497 nrefreshes--;
498 goto call_again;
499 }
500 /* end of unsuccessful completion */
501 } /* end of valid reply message */
502 else {
503 ct->ct_error.re_status = RPC_CANTDECODERES;
504 }
505 out:
506 mtx_assert(&ct->ct_lock, MA_OWNED);
507
508 if (mreq)
509 m_freem(mreq);
510 if (cr->cr_mrep)
511 m_freem(cr->cr_mrep);
512
513 ct->ct_threads--;
514 if (ct->ct_closing)
515 wakeup(ct);
516
517 mtx_unlock(&ct->ct_lock);
518
519 free(cr, M_RPC);
520
521 return (ct->ct_error.re_status);
522 }
523
524 static void
525 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
526 {
527 struct ct_data *ct = (struct ct_data *) cl->cl_private;
528
529 *errp = ct->ct_error;
530 }
531
532 static bool_t
533 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
534 {
535 XDR xdrs;
536 bool_t dummy;
537
538 xdrs.x_op = XDR_FREE;
539 dummy = (*xdr_res)(&xdrs, res_ptr);
540
541 return (dummy);
542 }
543
544 /*ARGSUSED*/
545 static void
546 clnt_vc_abort(CLIENT *cl)
547 {
548 }
549
550 static bool_t
551 clnt_vc_control(CLIENT *cl, u_int request, void *info)
552 {
553 struct ct_data *ct = (struct ct_data *)cl->cl_private;
554 void *infop = info;
555
556 mtx_lock(&ct->ct_lock);
557
558 switch (request) {
559 case CLSET_FD_CLOSE:
560 ct->ct_closeit = TRUE;
561 mtx_unlock(&ct->ct_lock);
562 return (TRUE);
563 case CLSET_FD_NCLOSE:
564 ct->ct_closeit = FALSE;
565 mtx_unlock(&ct->ct_lock);
566 return (TRUE);
567 default:
568 break;
569 }
570
571 /* for other requests which use info */
572 if (info == NULL) {
573 mtx_unlock(&ct->ct_lock);
574 return (FALSE);
575 }
576 switch (request) {
577 case CLSET_TIMEOUT:
578 if (time_not_ok((struct timeval *)info)) {
579 mtx_unlock(&ct->ct_lock);
580 return (FALSE);
581 }
582 ct->ct_wait = *(struct timeval *)infop;
583 break;
584 case CLGET_TIMEOUT:
585 *(struct timeval *)infop = ct->ct_wait;
586 break;
587 case CLGET_SERVER_ADDR:
588 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
589 break;
590 case CLGET_SVC_ADDR:
591 /*
592 * Slightly different semantics to userland - we use
593 * sockaddr instead of netbuf.
594 */
595 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
596 break;
597 case CLSET_SVC_ADDR: /* set to new address */
598 mtx_unlock(&ct->ct_lock);
599 return (FALSE);
600 case CLGET_XID:
601 *(uint32_t *)info = ct->ct_xid;
602 break;
603 case CLSET_XID:
604 /* This will set the xid of the NEXT call */
605 /* decrement by 1 as clnt_vc_call() increments once */
606 ct->ct_xid = *(uint32_t *)info - 1;
607 break;
608 case CLGET_VERS:
609 /*
610 * This RELIES on the information that, in the call body,
611 * the version number field is the fifth field from the
612 * begining of the RPC header. MUST be changed if the
613 * call_struct is changed
614 */
615 *(uint32_t *)info =
616 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
617 4 * BYTES_PER_XDR_UNIT));
618 break;
619
620 case CLSET_VERS:
621 *(uint32_t *)(void *)(ct->ct_mcallc +
622 4 * BYTES_PER_XDR_UNIT) =
623 htonl(*(uint32_t *)info);
624 break;
625
626 case CLGET_PROG:
627 /*
628 * This RELIES on the information that, in the call body,
629 * the program number field is the fourth field from the
630 * begining of the RPC header. MUST be changed if the
631 * call_struct is changed
632 */
633 *(uint32_t *)info =
634 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
635 3 * BYTES_PER_XDR_UNIT));
636 break;
637
638 case CLSET_PROG:
639 *(uint32_t *)(void *)(ct->ct_mcallc +
640 3 * BYTES_PER_XDR_UNIT) =
641 htonl(*(uint32_t *)info);
642 break;
643
644 case CLSET_WAITCHAN:
645 ct->ct_waitchan = *(const char **)info;
646 break;
647
648 case CLGET_WAITCHAN:
649 *(const char **) info = ct->ct_waitchan;
650 break;
651
652 case CLSET_INTERRUPTIBLE:
653 if (*(int *) info)
654 ct->ct_waitflag = PCATCH;
655 else
656 ct->ct_waitflag = 0;
657 break;
658
659 case CLGET_INTERRUPTIBLE:
660 if (ct->ct_waitflag)
661 *(int *) info = TRUE;
662 else
663 *(int *) info = FALSE;
664 break;
665
666 default:
667 mtx_unlock(&ct->ct_lock);
668 return (FALSE);
669 }
670
671 mtx_unlock(&ct->ct_lock);
672 return (TRUE);
673 }
674
675 static void
676 clnt_vc_destroy(CLIENT *cl)
677 {
678 struct ct_data *ct = (struct ct_data *) cl->cl_private;
679 struct ct_request *cr;
680 struct socket *so = NULL;
681
682 mtx_lock(&ct->ct_lock);
683
684 if (ct->ct_socket) {
685 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
686 ct->ct_socket->so_upcallarg = NULL;
687 ct->ct_socket->so_upcall = NULL;
688 ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
689 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
690
691 /*
692 * Abort any pending requests and wait until everyone
693 * has finished with clnt_vc_call.
694 */
695 ct->ct_closing = TRUE;
696 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
697 cr->cr_xid = 0;
698 cr->cr_error = ESHUTDOWN;
699 wakeup(cr);
700 }
701
702 while (ct->ct_threads)
703 msleep(ct, &ct->ct_lock, 0, "rpcclose", 0);
704
705 if (ct->ct_closeit) {
706 so = ct->ct_socket;
707 }
708 }
709
710 mtx_unlock(&ct->ct_lock);
711
712 mtx_destroy(&ct->ct_lock);
713 if (so) {
714 soshutdown(so, SHUT_WR);
715 soclose(so);
716 }
717 mem_free(ct, sizeof(struct ct_data));
718 mem_free(cl, sizeof(CLIENT));
719 }
720
721 /*
722 * Make sure that the time is not garbage. -1 value is disallowed.
723 * Note this is different from time_not_ok in clnt_dg.c
724 */
725 static bool_t
726 time_not_ok(struct timeval *t)
727 {
728 return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
729 t->tv_usec <= -1 || t->tv_usec > 1000000);
730 }
731
732 void
733 clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
734 {
735 struct ct_data *ct = (struct ct_data *) arg;
736 struct uio uio;
737 struct mbuf *m;
738 struct ct_request *cr;
739 int error, rcvflag, foundreq;
740 uint32_t xid, header;
741
742 uio.uio_td = curthread;
743 do {
744 /*
745 * If ct_record_resid is zero, we are waiting for a
746 * record mark.
747 */
748 if (ct->ct_record_resid == 0) {
749 bool_t do_read;
750
751 /*
752 * Make sure there is either a whole record
753 * mark in the buffer or there is some other
754 * error condition
755 */
756 do_read = FALSE;
757 SOCKBUF_LOCK(&so->so_rcv);
758 if (so->so_rcv.sb_cc >= sizeof(uint32_t)
759 || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
760 || so->so_error)
761 do_read = TRUE;
762 SOCKBUF_UNLOCK(&so->so_rcv);
763
764 if (!do_read)
765 return;
766
767 uio.uio_resid = sizeof(uint32_t);
768 m = NULL;
769 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
770 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
771
772 if (error == EWOULDBLOCK)
773 break;
774
775 /*
776 * If there was an error, wake up all pending
777 * requests.
778 */
779 if (error || uio.uio_resid > 0) {
780 wakeup_all:
781 mtx_lock(&ct->ct_lock);
782 if (!error) {
783 /*
784 * We must have got EOF trying
785 * to read from the stream.
786 */
787 error = ECONNRESET;
788 }
789 ct->ct_error.re_status = RPC_CANTRECV;
790 ct->ct_error.re_errno = error;
791 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
792 cr->cr_error = error;
793 wakeup(cr);
794 }
795 mtx_unlock(&ct->ct_lock);
796 break;
797 }
798 memcpy(&header, mtod(m, uint32_t *), sizeof(uint32_t));
799 header = ntohl(header);
800 ct->ct_record = NULL;
801 ct->ct_record_resid = header & 0x7fffffff;
802 ct->ct_record_eor = ((header & 0x80000000) != 0);
803 m_freem(m);
804 } else {
805 /*
806 * We have the record mark. Read as much as
807 * the socket has buffered up to the end of
808 * this record.
809 */
810 uio.uio_resid = ct->ct_record_resid;
811 m = NULL;
812 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
813 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
814
815 if (error == EWOULDBLOCK)
816 break;
817
818 if (error || uio.uio_resid == ct->ct_record_resid)
819 goto wakeup_all;
820
821 /*
822 * If we have part of the record already,
823 * chain this bit onto the end.
824 */
825 if (ct->ct_record)
826 m_last(ct->ct_record)->m_next = m;
827 else
828 ct->ct_record = m;
829
830 ct->ct_record_resid = uio.uio_resid;
831
832 /*
833 * If we have the entire record, see if we can
834 * match it to a request.
835 */
836 if (ct->ct_record_resid == 0
837 && ct->ct_record_eor) {
838 /*
839 * The XID is in the first uint32_t of
840 * the reply.
841 */
842 ct->ct_record =
843 m_pullup(ct->ct_record, sizeof(xid));
844 if (!ct->ct_record)
845 break;
846 memcpy(&xid,
847 mtod(ct->ct_record, uint32_t *),
848 sizeof(uint32_t));
849 xid = ntohl(xid);
850
851 mtx_lock(&ct->ct_lock);
852 foundreq = 0;
853 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
854 if (cr->cr_xid == xid) {
855 /*
856 * This one
857 * matches. We leave
858 * the reply mbuf in
859 * cr->cr_mrep. Set
860 * the XID to zero so
861 * that we will ignore
862 * any duplicaed
863 * replies.
864 */
865 cr->cr_xid = 0;
866 cr->cr_mrep = ct->ct_record;
867 cr->cr_error = 0;
868 foundreq = 1;
869 wakeup(cr);
870 break;
871 }
872 }
873 mtx_unlock(&ct->ct_lock);
874
875 if (!foundreq)
876 m_freem(ct->ct_record);
877 ct->ct_record = NULL;
878 }
879 }
880 } while (m);
881 }
Cache object: 313545972ff70bc23358705d4ab4f394
|