FreeBSD/Linux Kernel Cross Reference
sys/rpc/clnt_vc.c
1 /* $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $ */
2
3 /*
4 * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5 * unrestricted use provided that this legend is included on all tape
6 * media and as a part of the software program in whole or part. Users
7 * may copy or modify Sun RPC without charge, but are not authorized
8 * to license or distribute it to anyone else except as part of a product or
9 * program developed by the user.
10 *
11 * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12 * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13 * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14 *
15 * Sun RPC is provided with no support and without any obligation on the
16 * part of Sun Microsystems, Inc. to assist in its use, correction,
17 * modification or enhancement.
18 *
19 * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20 * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21 * OR ANY PART THEREOF.
22 *
23 * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24 * or profits or other special, indirect and consequential damages, even if
25 * Sun has been advised of the possibility of such damages.
26 *
27 * Sun Microsystems, Inc.
28 * 2550 Garcia Avenue
29 * Mountain View, California 94043
30 */
31
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)clnt_tcp.c 2.2 88/08/01 4.0 RPCSRC";
35 static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39
40 /*
41 * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42 *
43 * Copyright (C) 1984, Sun Microsystems, Inc.
44 *
45 * TCP based RPC supports 'batched calls'.
46 * A sequence of calls may be batched-up in a send buffer. The rpc call
47 * return immediately to the client even though the call was not necessarily
48 * sent. The batching occurs if the results' xdr routine is NULL (0) AND
49 * the rpc timeout value is zero (see clnt.h, rpc).
50 *
51 * Clients should NOT casually batch calls that in fact return results; that is,
52 * the server side should be aware that a call is batched and not produce any
53 * return message. Batched calls that produce many result messages can
54 * deadlock (netlock) the client and the server....
55 *
56 * Now go hang yourself.
57 */
58
59 #include <sys/param.h>
60 #include <sys/systm.h>
61 #include <sys/lock.h>
62 #include <sys/malloc.h>
63 #include <sys/mbuf.h>
64 #include <sys/mutex.h>
65 #include <sys/pcpu.h>
66 #include <sys/proc.h>
67 #include <sys/socket.h>
68 #include <sys/socketvar.h>
69 #include <sys/syslog.h>
70 #include <sys/time.h>
71 #include <sys/uio.h>
72
73 #include <rpc/rpc.h>
74 #include <rpc/rpc_com.h>
75
76 #define MCALL_MSG_SIZE 24
77
78 struct cmessage {
79 struct cmsghdr cmsg;
80 struct cmsgcred cmcred;
81 };
82
83 static enum clnt_stat clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
84 xdrproc_t, void *, struct timeval);
85 static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
86 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
87 static void clnt_vc_abort(CLIENT *);
88 static bool_t clnt_vc_control(CLIENT *, u_int, void *);
89 static void clnt_vc_destroy(CLIENT *);
90 static bool_t time_not_ok(struct timeval *);
91 static void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
92
93 static struct clnt_ops clnt_vc_ops = {
94 .cl_call = clnt_vc_call,
95 .cl_abort = clnt_vc_abort,
96 .cl_geterr = clnt_vc_geterr,
97 .cl_freeres = clnt_vc_freeres,
98 .cl_destroy = clnt_vc_destroy,
99 .cl_control = clnt_vc_control
100 };
101
102 /*
103 * A pending RPC request which awaits a reply.
104 */
105 struct ct_request {
106 TAILQ_ENTRY(ct_request) cr_link;
107 uint32_t cr_xid; /* XID of request */
108 struct mbuf *cr_mrep; /* reply received by upcall */
109 int cr_error; /* any error from upcall */
110 };
111
112 TAILQ_HEAD(ct_request_list, ct_request);
113
114 struct ct_data {
115 struct mtx ct_lock;
116 struct socket *ct_socket; /* connection socket */
117 bool_t ct_closeit; /* close it on destroy */
118 struct timeval ct_wait; /* wait interval in milliseconds */
119 struct sockaddr_storage ct_addr; /* remote addr */
120 struct rpc_err ct_error;
121 uint32_t ct_xid;
122 char ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
123 size_t ct_mpos; /* pos after marshal */
124 const char *ct_waitchan;
125 int ct_waitflag;
126 struct mbuf *ct_record; /* current reply record */
127 size_t ct_record_resid; /* how much left of reply to read */
128 bool_t ct_record_eor; /* true if reading last fragment */
129 struct ct_request_list ct_pending;
130 };
131
132 static const char clnt_vc_errstr[] = "%s : %s";
133 static const char clnt_vc_str[] = "clnt_vc_create";
134 static const char clnt_read_vc_str[] = "read_vc";
135 static const char __no_mem_str[] = "out of memory";
136
137 /*
138 * Create a client handle for a connection.
139 * Default options are set, which the user can change using clnt_control()'s.
140 * The rpc/vc package does buffering similar to stdio, so the client
141 * must pick send and receive buffer sizes, 0 => use the default.
142 * NB: fd is copied into a private area.
143 * NB: The rpch->cl_auth is set null authentication. Caller may wish to
144 * set this something more useful.
145 *
146 * fd should be an open socket
147 */
148 CLIENT *
149 clnt_vc_create(
150 struct socket *so, /* open file descriptor */
151 struct sockaddr *raddr, /* servers address */
152 const rpcprog_t prog, /* program number */
153 const rpcvers_t vers, /* version number */
154 size_t sendsz, /* buffer recv size */
155 size_t recvsz) /* buffer send size */
156 {
157 CLIENT *cl; /* client handle */
158 struct ct_data *ct = NULL; /* client handle */
159 struct timeval now;
160 struct rpc_msg call_msg;
161 static uint32_t disrupt;
162 struct __rpc_sockinfo si;
163 XDR xdrs;
164 int error;
165
166 if (disrupt == 0)
167 disrupt = (uint32_t)(long)raddr;
168
169 cl = (CLIENT *)mem_alloc(sizeof (*cl));
170 ct = (struct ct_data *)mem_alloc(sizeof (*ct));
171
172 mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
173
174 if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
175 error = soconnect(so, raddr, curthread);
176 if (error) {
177 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
178 rpc_createerr.cf_error.re_errno = error;
179 goto err;
180 }
181 }
182
183 if (!__rpc_socket2sockinfo(so, &si))
184 goto err;
185
186 ct->ct_closeit = FALSE;
187
188 /*
189 * Set up private data struct
190 */
191 ct->ct_socket = so;
192 ct->ct_wait.tv_sec = -1;
193 ct->ct_wait.tv_usec = -1;
194 memcpy(&ct->ct_addr, raddr, raddr->sa_len);
195
196 /*
197 * Initialize call message
198 */
199 getmicrotime(&now);
200 ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
201 call_msg.rm_xid = ct->ct_xid;
202 call_msg.rm_direction = CALL;
203 call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
204 call_msg.rm_call.cb_prog = (uint32_t)prog;
205 call_msg.rm_call.cb_vers = (uint32_t)vers;
206
207 /*
208 * pre-serialize the static part of the call msg and stash it away
209 */
210 xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
211 XDR_ENCODE);
212 if (! xdr_callhdr(&xdrs, &call_msg)) {
213 if (ct->ct_closeit) {
214 soclose(ct->ct_socket);
215 }
216 goto err;
217 }
218 ct->ct_mpos = XDR_GETPOS(&xdrs);
219 XDR_DESTROY(&xdrs);
220 ct->ct_waitchan = "rpcrecv";
221 ct->ct_waitflag = 0;
222
223 /*
224 * Create a client handle which uses xdrrec for serialization
225 * and authnone for authentication.
226 */
227 cl->cl_ops = &clnt_vc_ops;
228 cl->cl_private = ct;
229 cl->cl_auth = authnone_create();
230 sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
231 recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
232
233 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
234 ct->ct_socket->so_upcallarg = ct;
235 ct->ct_socket->so_upcall = clnt_vc_soupcall;
236 ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL;
237 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
238
239 ct->ct_record = NULL;
240 ct->ct_record_resid = 0;
241 TAILQ_INIT(&ct->ct_pending);
242 return (cl);
243
244 err:
245 if (cl) {
246 if (ct) {
247 mem_free(ct, sizeof (struct ct_data));
248 }
249 if (cl)
250 mem_free(cl, sizeof (CLIENT));
251 }
252 return ((CLIENT *)NULL);
253 }
254
255 static enum clnt_stat
256 clnt_vc_call(
257 CLIENT *cl,
258 rpcproc_t proc,
259 xdrproc_t xdr_args,
260 void *args_ptr,
261 xdrproc_t xdr_results,
262 void *results_ptr,
263 struct timeval utimeout)
264 {
265 struct ct_data *ct = (struct ct_data *) cl->cl_private;
266 XDR xdrs;
267 struct rpc_msg reply_msg;
268 bool_t ok;
269 int nrefreshes = 2; /* number of times to refresh cred */
270 struct timeval timeout;
271 uint32_t xid;
272 struct mbuf *mreq = NULL;
273 struct ct_request cr;
274 int error;
275
276 mtx_lock(&ct->ct_lock);
277
278 cr.cr_mrep = NULL;
279 cr.cr_error = 0;
280
281 if (ct->ct_wait.tv_usec == -1) {
282 timeout = utimeout; /* use supplied timeout */
283 } else {
284 timeout = ct->ct_wait; /* use default timeout */
285 }
286
287 call_again:
288 mtx_assert(&ct->ct_lock, MA_OWNED);
289
290 ct->ct_xid++;
291 xid = ct->ct_xid;
292
293 mtx_unlock(&ct->ct_lock);
294
295 /*
296 * Leave space to pre-pend the record mark.
297 */
298 MGETHDR(mreq, M_WAIT, MT_DATA);
299 MCLGET(mreq, M_WAIT);
300 mreq->m_len = 0;
301 mreq->m_data += sizeof(uint32_t);
302 m_append(mreq, ct->ct_mpos, ct->ct_mcallc);
303
304 /*
305 * The XID is the first thing in the request.
306 */
307 *mtod(mreq, uint32_t *) = htonl(xid);
308
309 xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
310
311 ct->ct_error.re_status = RPC_SUCCESS;
312
313 if ((! XDR_PUTINT32(&xdrs, &proc)) ||
314 (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
315 (! (*xdr_args)(&xdrs, args_ptr))) {
316 if (ct->ct_error.re_status == RPC_SUCCESS)
317 ct->ct_error.re_status = RPC_CANTENCODEARGS;
318 m_freem(mreq);
319 return (ct->ct_error.re_status);
320 }
321 m_fixhdr(mreq);
322
323 /*
324 * Prepend a record marker containing the packet length.
325 */
326 M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
327 *mtod(mreq, uint32_t *) =
328 htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
329
330 cr.cr_xid = xid;
331 mtx_lock(&ct->ct_lock);
332 TAILQ_INSERT_TAIL(&ct->ct_pending, &cr, cr_link);
333 mtx_unlock(&ct->ct_lock);
334
335 /*
336 * sosend consumes mreq.
337 */
338 error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
339 mreq = NULL;
340
341 reply_msg.acpted_rply.ar_verf = _null_auth;
342 reply_msg.acpted_rply.ar_results.where = results_ptr;
343 reply_msg.acpted_rply.ar_results.proc = xdr_results;
344
345 mtx_lock(&ct->ct_lock);
346
347 if (error) {
348 TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
349
350 ct->ct_error.re_errno = error;
351 ct->ct_error.re_status = RPC_CANTSEND;
352 goto out;
353 }
354
355 /*
356 * Check to see if we got an upcall while waiting for the
357 * lock. In both these cases, the request has been removed
358 * from ct->ct_pending.
359 */
360 if (cr.cr_error) {
361 ct->ct_error.re_errno = cr.cr_error;
362 ct->ct_error.re_status = RPC_CANTRECV;
363 goto out;
364 }
365 if (cr.cr_mrep) {
366 goto got_reply;
367 }
368
369 /*
370 * Hack to provide rpc-based message passing
371 */
372 if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
373 if (cr.cr_xid)
374 TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
375 ct->ct_error.re_status = RPC_TIMEDOUT;
376 goto out;
377 }
378
379 error = msleep(&cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
380 tvtohz(&timeout));
381
382 if (error) {
383 /*
384 * The sleep returned an error so our request is still
385 * on the list. Turn the error code into an
386 * appropriate client status.
387 */
388 if (cr.cr_xid)
389 TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
390 ct->ct_error.re_errno = error;
391 switch (error) {
392 case EINTR:
393 ct->ct_error.re_status = RPC_INTR;
394 break;
395 case EWOULDBLOCK:
396 ct->ct_error.re_status = RPC_TIMEDOUT;
397 break;
398 default:
399 ct->ct_error.re_status = RPC_CANTRECV;
400 }
401 goto out;
402 } else {
403 /*
404 * We were woken up by the upcall. If the
405 * upcall had a receive error, report that,
406 * otherwise we have a reply.
407 */
408 if (cr.cr_error) {
409 ct->ct_error.re_errno = cr.cr_error;
410 ct->ct_error.re_status = RPC_CANTRECV;
411 goto out;
412 }
413 }
414
415 got_reply:
416 /*
417 * Now decode and validate the response. We need to drop the
418 * lock since xdr_replymsg may end up sleeping in malloc.
419 */
420 mtx_unlock(&ct->ct_lock);
421
422 xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
423 ok = xdr_replymsg(&xdrs, &reply_msg);
424 XDR_DESTROY(&xdrs);
425 cr.cr_mrep = NULL;
426
427 mtx_lock(&ct->ct_lock);
428
429 if (ok) {
430 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
431 (reply_msg.acpted_rply.ar_stat == SUCCESS))
432 ct->ct_error.re_status = RPC_SUCCESS;
433 else
434 _seterr_reply(&reply_msg, &(ct->ct_error));
435
436 if (ct->ct_error.re_status == RPC_SUCCESS) {
437 if (! AUTH_VALIDATE(cl->cl_auth,
438 &reply_msg.acpted_rply.ar_verf)) {
439 ct->ct_error.re_status = RPC_AUTHERROR;
440 ct->ct_error.re_why = AUTH_INVALIDRESP;
441 }
442 if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
443 xdrs.x_op = XDR_FREE;
444 (void) xdr_opaque_auth(&xdrs,
445 &(reply_msg.acpted_rply.ar_verf));
446 }
447 } /* end successful completion */
448 /*
449 * If unsuccesful AND error is an authentication error
450 * then refresh credentials and try again, else break
451 */
452 else if (ct->ct_error.re_status == RPC_AUTHERROR)
453 /* maybe our credentials need to be refreshed ... */
454 if (nrefreshes > 0 &&
455 AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
456 nrefreshes--;
457 goto call_again;
458 }
459 /* end of unsuccessful completion */
460 } /* end of valid reply message */
461 else {
462 ct->ct_error.re_status = RPC_CANTDECODERES;
463 }
464 out:
465 mtx_assert(&ct->ct_lock, MA_OWNED);
466
467 if (mreq)
468 m_freem(mreq);
469 if (cr.cr_mrep)
470 m_freem(cr.cr_mrep);
471
472 mtx_unlock(&ct->ct_lock);
473 return (ct->ct_error.re_status);
474 }
475
476 static void
477 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
478 {
479 struct ct_data *ct = (struct ct_data *) cl->cl_private;
480
481 *errp = ct->ct_error;
482 }
483
484 static bool_t
485 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
486 {
487 XDR xdrs;
488 bool_t dummy;
489
490 xdrs.x_op = XDR_FREE;
491 dummy = (*xdr_res)(&xdrs, res_ptr);
492
493 return (dummy);
494 }
495
496 /*ARGSUSED*/
497 static void
498 clnt_vc_abort(CLIENT *cl)
499 {
500 }
501
502 static bool_t
503 clnt_vc_control(CLIENT *cl, u_int request, void *info)
504 {
505 struct ct_data *ct = (struct ct_data *)cl->cl_private;
506 void *infop = info;
507
508 mtx_lock(&ct->ct_lock);
509
510 switch (request) {
511 case CLSET_FD_CLOSE:
512 ct->ct_closeit = TRUE;
513 mtx_unlock(&ct->ct_lock);
514 return (TRUE);
515 case CLSET_FD_NCLOSE:
516 ct->ct_closeit = FALSE;
517 mtx_unlock(&ct->ct_lock);
518 return (TRUE);
519 default:
520 break;
521 }
522
523 /* for other requests which use info */
524 if (info == NULL) {
525 mtx_unlock(&ct->ct_lock);
526 return (FALSE);
527 }
528 switch (request) {
529 case CLSET_TIMEOUT:
530 if (time_not_ok((struct timeval *)info)) {
531 mtx_unlock(&ct->ct_lock);
532 return (FALSE);
533 }
534 ct->ct_wait = *(struct timeval *)infop;
535 break;
536 case CLGET_TIMEOUT:
537 *(struct timeval *)infop = ct->ct_wait;
538 break;
539 case CLGET_SERVER_ADDR:
540 (void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
541 break;
542 case CLGET_SVC_ADDR:
543 /*
544 * Slightly different semantics to userland - we use
545 * sockaddr instead of netbuf.
546 */
547 memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
548 break;
549 case CLSET_SVC_ADDR: /* set to new address */
550 mtx_unlock(&ct->ct_lock);
551 return (FALSE);
552 case CLGET_XID:
553 *(uint32_t *)info = ct->ct_xid;
554 break;
555 case CLSET_XID:
556 /* This will set the xid of the NEXT call */
557 /* decrement by 1 as clnt_vc_call() increments once */
558 ct->ct_xid = *(uint32_t *)info - 1;
559 break;
560 case CLGET_VERS:
561 /*
562 * This RELIES on the information that, in the call body,
563 * the version number field is the fifth field from the
564 * begining of the RPC header. MUST be changed if the
565 * call_struct is changed
566 */
567 *(uint32_t *)info =
568 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
569 4 * BYTES_PER_XDR_UNIT));
570 break;
571
572 case CLSET_VERS:
573 *(uint32_t *)(void *)(ct->ct_mcallc +
574 4 * BYTES_PER_XDR_UNIT) =
575 htonl(*(uint32_t *)info);
576 break;
577
578 case CLGET_PROG:
579 /*
580 * This RELIES on the information that, in the call body,
581 * the program number field is the fourth field from the
582 * begining of the RPC header. MUST be changed if the
583 * call_struct is changed
584 */
585 *(uint32_t *)info =
586 ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
587 3 * BYTES_PER_XDR_UNIT));
588 break;
589
590 case CLSET_PROG:
591 *(uint32_t *)(void *)(ct->ct_mcallc +
592 3 * BYTES_PER_XDR_UNIT) =
593 htonl(*(uint32_t *)info);
594 break;
595
596 case CLSET_WAITCHAN:
597 ct->ct_waitchan = *(const char **)info;
598 break;
599
600 case CLGET_WAITCHAN:
601 *(const char **) info = ct->ct_waitchan;
602 break;
603
604 case CLSET_INTERRUPTIBLE:
605 if (*(int *) info)
606 ct->ct_waitflag = PCATCH;
607 else
608 ct->ct_waitflag = 0;
609 break;
610
611 case CLGET_INTERRUPTIBLE:
612 if (ct->ct_waitflag)
613 *(int *) info = TRUE;
614 else
615 *(int *) info = FALSE;
616 break;
617
618 default:
619 mtx_unlock(&ct->ct_lock);
620 return (FALSE);
621 }
622
623 mtx_unlock(&ct->ct_lock);
624 return (TRUE);
625 }
626
627 static void
628 clnt_vc_destroy(CLIENT *cl)
629 {
630 struct ct_data *ct = (struct ct_data *) cl->cl_private;
631 struct socket *so = NULL;
632
633 mtx_lock(&ct->ct_lock);
634
635 if (ct->ct_socket) {
636 SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
637 ct->ct_socket->so_upcallarg = NULL;
638 ct->ct_socket->so_upcall = NULL;
639 ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
640 SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
641
642 KASSERT(!TAILQ_FIRST(&ct->ct_pending),
643 ("Destroying RPC client with pending RPC requests"));
644
645 if (ct->ct_closeit) {
646 so = ct->ct_socket;
647 }
648 }
649
650 mtx_unlock(&ct->ct_lock);
651
652 mtx_destroy(&ct->ct_lock);
653 if (so) {
654 soshutdown(so, SHUT_WR);
655 soclose(so);
656 }
657 mem_free(ct, sizeof(struct ct_data));
658 mem_free(cl, sizeof(CLIENT));
659 }
660
661 /*
662 * Make sure that the time is not garbage. -1 value is disallowed.
663 * Note this is different from time_not_ok in clnt_dg.c
664 */
665 static bool_t
666 time_not_ok(struct timeval *t)
667 {
668 return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
669 t->tv_usec <= -1 || t->tv_usec > 1000000);
670 }
671
672 void
673 clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
674 {
675 struct ct_data *ct = (struct ct_data *) arg;
676 struct uio uio;
677 struct mbuf *m;
678 struct ct_request *cr;
679 int error, rcvflag, foundreq;
680 uint32_t xid, header;
681
682 uio.uio_td = curthread;
683 do {
684 /*
685 * If ct_record_resid is zero, we are waiting for a
686 * record mark.
687 */
688 if (ct->ct_record_resid == 0) {
689 bool_t do_read;
690
691 /*
692 * Make sure there is either a whole record
693 * mark in the buffer or there is some other
694 * error condition
695 */
696 do_read = FALSE;
697 SOCKBUF_LOCK(&so->so_rcv);
698 if (so->so_rcv.sb_cc >= sizeof(uint32_t)
699 || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
700 || so->so_error)
701 do_read = TRUE;
702 SOCKBUF_UNLOCK(&so->so_rcv);
703
704 if (!do_read)
705 return;
706
707 uio.uio_resid = sizeof(uint32_t);
708 m = NULL;
709 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
710 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
711
712 if (error == EWOULDBLOCK)
713 break;
714
715 /*
716 * If there was an error, wake up all pending
717 * requests.
718 */
719 if (error || uio.uio_resid > 0) {
720 wakeup_all:
721 mtx_lock(&ct->ct_lock);
722 if (!error) {
723 /*
724 * We must have got EOF trying
725 * to read from the stream.
726 */
727 error = ECONNRESET;
728 }
729 ct->ct_error.re_status = RPC_CANTRECV;
730 ct->ct_error.re_errno = error;
731 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
732 cr->cr_error = error;
733 wakeup(cr);
734 }
735 TAILQ_INIT(&ct->ct_pending);
736 mtx_unlock(&ct->ct_lock);
737 break;
738 }
739 memcpy(&header, mtod(m, uint32_t *), sizeof(uint32_t));
740 header = ntohl(header);
741 ct->ct_record = NULL;
742 ct->ct_record_resid = header & 0x7fffffff;
743 ct->ct_record_eor = ((header & 0x80000000) != 0);
744 m_freem(m);
745 } else {
746 /*
747 * We have the record mark. Read as much as
748 * the socket has buffered up to the end of
749 * this record.
750 */
751 uio.uio_resid = ct->ct_record_resid;
752 m = NULL;
753 rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
754 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
755
756 if (error == EWOULDBLOCK)
757 break;
758
759 if (error || uio.uio_resid == ct->ct_record_resid)
760 goto wakeup_all;
761
762 /*
763 * If we have part of the record already,
764 * chain this bit onto the end.
765 */
766 if (ct->ct_record)
767 m_last(ct->ct_record)->m_next = m;
768 else
769 ct->ct_record = m;
770
771 ct->ct_record_resid = uio.uio_resid;
772
773 /*
774 * If we have the entire record, see if we can
775 * match it to a request.
776 */
777 if (ct->ct_record_resid == 0
778 && ct->ct_record_eor) {
779 /*
780 * The XID is in the first uint32_t of
781 * the reply.
782 */
783 ct->ct_record =
784 m_pullup(ct->ct_record, sizeof(xid));
785 if (!ct->ct_record)
786 break;
787 memcpy(&xid,
788 mtod(ct->ct_record, uint32_t *),
789 sizeof(uint32_t));
790 xid = ntohl(xid);
791
792 mtx_lock(&ct->ct_lock);
793 foundreq = 0;
794 TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
795 if (cr->cr_xid == xid) {
796 /*
797 * This one
798 * matches. We snip it
799 * out of the pending
800 * list and leave the
801 * reply mbuf in
802 * cr->cr_mrep. Set
803 * the XID to zero so
804 * that clnt_vc_call
805 * can know not to
806 * repeat the
807 * TAILQ_REMOVE.
808 */
809 TAILQ_REMOVE(&ct->ct_pending,
810 cr, cr_link);
811 cr->cr_xid = 0;
812 cr->cr_mrep = ct->ct_record;
813 cr->cr_error = 0;
814 foundreq = 1;
815 wakeup(cr);
816 break;
817 }
818 }
819 mtx_unlock(&ct->ct_lock);
820
821 if (!foundreq)
822 m_freem(ct->ct_record);
823 ct->ct_record = NULL;
824 }
825 }
826 } while (m);
827 }
Cache object: a3716da0bf1c4f6f940938aa70313013
|