[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ]

FreeBSD/Linux Kernel Cross Reference
sys/rpc/clnt_dg.c

Version: -  FREEBSD  -  FREEBSD7  -  FREEBSD70  -  FREEBSD6  -  FREEBSD63  -  FREEBSD62  -  FREEBSD61  -  FREEBSD60  -  FREEBSD5  -  FREEBSD55  -  FREEBSD54  -  FREEBSD53  -  FREEBSD52  -  FREEBSD51  -  FREEBSD50  -  FREEBSD4  -  FREEBSD3  -  FREEBSD22  -  linux-2.6  -  linux-2.4.22  -  MK83  -  MK84  -  PLAN9  -  DFBSD  -  NETBSD  -  NETBSD4  -  NETBSD3  -  NETBSD20  -  OPENBSD  -  xnu-517  -  xnu-792  -  xnu-792.6.70  -  xnu-1228  -  OPENSOLARIS  -  minix-3-1-1  -  TRUSTEDBSD-SEBSD  -  FREEBSD-LIBC  -  FREEBSD7-LIBC  -  FREEBSD6-LIBC  -  GLIBC27 
SearchContext: -  none  -  excerpts  -  bigexcerpts 

  1 /*      $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
  2 
  3 /*
  4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
  5  * unrestricted use provided that this legend is included on all tape
  6  * media and as a part of the software program in whole or part.  Users
  7  * may copy or modify Sun RPC without charge, but are not authorized
  8  * to license or distribute it to anyone else except as part of a product or
  9  * program developed by the user.
 10  * 
 11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
 12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
 13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
 14  * 
 15  * Sun RPC is provided with no support and without any obligation on the
 16  * part of Sun Microsystems, Inc. to assist in its use, correction,
 17  * modification or enhancement.
 18  * 
 19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
 20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
 21  * OR ANY PART THEREOF.
 22  * 
 23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
 24  * or profits or other special, indirect and consequential damages, even if
 25  * Sun has been advised of the possibility of such damages.
 26  * 
 27  * Sun Microsystems, Inc.
 28  * 2550 Garcia Avenue
 29  * Mountain View, California  94043
 30  */
 31 /*
 32  * Copyright (c) 1986-1991 by Sun Microsystems Inc. 
 33  */
 34 
 35 #if defined(LIBC_SCCS) && !defined(lint)
 36 #ident  "@(#)clnt_dg.c  1.23    94/04/22 SMI"
 37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
 38 #endif
 39 #include <sys/cdefs.h>
 40 __FBSDID("$FreeBSD: src/sys/rpc/clnt_dg.c,v 1.4 2008/11/03 10:38:00 dfr Exp $");
 41 
 42 /*
 43  * Implements a connectionless client side RPC.
 44  */
 45 
 46 #include <sys/param.h>
 47 #include <sys/systm.h>
 48 #include <sys/kernel.h>
 49 #include <sys/lock.h>
 50 #include <sys/malloc.h>
 51 #include <sys/mbuf.h>
 52 #include <sys/mutex.h>
 53 #include <sys/pcpu.h>
 54 #include <sys/proc.h>
 55 #include <sys/socket.h>
 56 #include <sys/socketvar.h>
 57 #include <sys/time.h>
 58 #include <sys/uio.h>
 59 
 60 #include <rpc/rpc.h>
 61 #include <rpc/rpc_com.h>
 62 
 63 
 64 #ifdef _FREEFALL_CONFIG
 65 /*
 66  * Disable RPC exponential back-off for FreeBSD.org systems.
 67  */
 68 #define RPC_MAX_BACKOFF         1 /* second */
 69 #else
 70 #define RPC_MAX_BACKOFF         30 /* seconds */
 71 #endif
 72 
 73 static bool_t time_not_ok(struct timeval *);
 74 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
 75     rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
 76 static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
 77 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
 78 static void clnt_dg_abort(CLIENT *);
 79 static bool_t clnt_dg_control(CLIENT *, u_int, void *);
 80 static void clnt_dg_close(CLIENT *);
 81 static void clnt_dg_destroy(CLIENT *);
 82 static void clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
 83 
 84 static struct clnt_ops clnt_dg_ops = {
 85         .cl_call =      clnt_dg_call,
 86         .cl_abort =     clnt_dg_abort,
 87         .cl_geterr =    clnt_dg_geterr,
 88         .cl_freeres =   clnt_dg_freeres,
 89         .cl_close =     clnt_dg_close,
 90         .cl_destroy =   clnt_dg_destroy,
 91         .cl_control =   clnt_dg_control
 92 };
 93 
 94 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
 95 
 96 /*
 97  * A pending RPC request which awaits a reply. Requests which have
 98  * received their reply will have cr_xid set to zero and cr_mrep to
 99  * the mbuf chain of the reply.
100  */
101 struct cu_request {
102         TAILQ_ENTRY(cu_request) cr_link;
103         CLIENT                  *cr_client;     /* owner */
104         uint32_t                cr_xid;         /* XID of request */
105         struct mbuf             *cr_mrep;       /* reply received by upcall */
106         int                     cr_error;       /* any error from upcall */
107         char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
108 };
109 
110 TAILQ_HEAD(cu_request_list, cu_request);
111 
/* Size of the buffer holding the pre-marshalled call header (cu_mcallc). */
#define	MCALL_MSG_SIZE	24
113 
114 /*
115  * This structure is pointed to by the socket's so_upcallarg
116  * member. It is separate from the client private data to facilitate
117  * multiple clients sharing the same socket. The cs_lock mutex is used
118  * to protect all fields of this structure, the socket's receive
119  * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
120  * structures is installed on the socket.
121  */
122 struct cu_socket {
123         struct mtx              cs_lock;
124         int                     cs_refs;        /* Count of clients */
125         struct cu_request_list  cs_pending;     /* Requests awaiting replies */
126 };
127 
128 /*
129  * Private data kept per client handle
130  */
131 struct cu_data {
132         int                     cu_threads;     /* # threads in clnt_vc_call */
133         bool_t                  cu_closing;     /* TRUE if we are closing */
134         bool_t                  cu_closed;      /* TRUE if we are closed */
135         struct socket           *cu_socket;     /* connection socket */
136         bool_t                  cu_closeit;     /* opened by library */
137         struct sockaddr_storage cu_raddr;       /* remote address */
138         int                     cu_rlen;
139         struct timeval          cu_wait;        /* retransmit interval */
140         struct timeval          cu_total;       /* total time for the call */
141         struct rpc_err          cu_error;
142         uint32_t                cu_xid;
143         char                    cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
144         size_t                  cu_mcalllen;
145         size_t                  cu_sendsz;      /* send size */
146         size_t                  cu_recvsz;      /* recv size */
147         int                     cu_async;
148         int                     cu_connect;     /* Use connect(). */
149         int                     cu_connected;   /* Have done connect(). */
150         const char              *cu_waitchan;
151         int                     cu_waitflag;
152         int                     cu_cwnd;        /* congestion window */
153         int                     cu_sent;        /* number of in-flight RPCs */
154         bool_t                  cu_cwnd_wait;
155 };
156 
/*
 * The congestion window (cu_cwnd) and the in-flight count (cu_sent) are
 * kept in fixed-point units where one outstanding request costs
 * CWNDSCALE, so a full MAXCWND window admits 32 requests in flight.
 */
#define	CWNDSCALE	256
#define	MAXCWND		(CWNDSCALE * 32)
159 
160 /*
161  * Connection less client creation returns with client handle parameters.
162  * Default options are set, which the user can change using clnt_control().
163  * fd should be open and bound.
164  * NB: The rpch->cl_auth is initialized to null authentication.
165  *      Caller may wish to set this something more useful.
166  *
167  * sendsz and recvsz are the maximum allowable packet sizes that can be
168  * sent and received. Normally they are the same, but they can be
169  * changed to improve the program efficiency and buffer allocation.
170  * If they are 0, use the transport default.
171  *
172  * If svcaddr is NULL, returns NULL.
173  */
174 CLIENT *
175 clnt_dg_create(
176         struct socket *so,
177         struct sockaddr *svcaddr,       /* servers address */
178         rpcprog_t program,              /* program number */
179         rpcvers_t version,              /* version number */
180         size_t sendsz,                  /* buffer recv size */
181         size_t recvsz)                  /* buffer send size */
182 {
183         CLIENT *cl = NULL;              /* client handle */
184         struct cu_data *cu = NULL;      /* private data */
185         struct cu_socket *cs = NULL;
186         struct timeval now;
187         struct rpc_msg call_msg;
188         struct __rpc_sockinfo si;
189         XDR xdrs;
190 
191         if (svcaddr == NULL) {
192                 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
193                 return (NULL);
194         }
195 
196         if (!__rpc_socket2sockinfo(so, &si)) {
197                 rpc_createerr.cf_stat = RPC_TLIERROR;
198                 rpc_createerr.cf_error.re_errno = 0;
199                 return (NULL);
200         }
201 
202         /*
203          * Find the receive and the send size
204          */
205         sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
206         recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
207         if ((sendsz == 0) || (recvsz == 0)) {
208                 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
209                 rpc_createerr.cf_error.re_errno = 0;
210                 return (NULL);
211         }
212 
213         cl = mem_alloc(sizeof (CLIENT));
214 
215         /*
216          * Should be multiple of 4 for XDR.
217          */
218         sendsz = ((sendsz + 3) / 4) * 4;
219         recvsz = ((recvsz + 3) / 4) * 4;
220         cu = mem_alloc(sizeof (*cu));
221         cu->cu_threads = 0;
222         cu->cu_closing = FALSE;
223         cu->cu_closed = FALSE;
224         (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
225         cu->cu_rlen = svcaddr->sa_len;
226         /* Other values can also be set through clnt_control() */
227         cu->cu_wait.tv_sec = 3; /* heuristically chosen */
228         cu->cu_wait.tv_usec = 0;
229         cu->cu_total.tv_sec = -1;
230         cu->cu_total.tv_usec = -1;
231         cu->cu_sendsz = sendsz;
232         cu->cu_recvsz = recvsz;
233         cu->cu_async = FALSE;
234         cu->cu_connect = FALSE;
235         cu->cu_connected = FALSE;
236         cu->cu_waitchan = "rpcrecv";
237         cu->cu_waitflag = 0;
238         cu->cu_cwnd = MAXCWND / 2;
239         cu->cu_sent = 0;
240         cu->cu_cwnd_wait = FALSE;
241         (void) getmicrotime(&now);
242         cu->cu_xid = __RPC_GETXID(&now);
243         call_msg.rm_xid = cu->cu_xid;
244         call_msg.rm_call.cb_prog = program;
245         call_msg.rm_call.cb_vers = version;
246         xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
247         if (! xdr_callhdr(&xdrs, &call_msg)) {
248                 rpc_createerr.cf_stat = RPC_CANTENCODEARGS;  /* XXX */
249                 rpc_createerr.cf_error.re_errno = 0;
250                 goto err2;
251         }
252         cu->cu_mcalllen = XDR_GETPOS(&xdrs);;
253 
254         /*
255          * By default, closeit is always FALSE. It is users responsibility
256          * to do a close on it, else the user may use clnt_control
257          * to let clnt_destroy do it for him/her.
258          */
259         cu->cu_closeit = FALSE;
260         cu->cu_socket = so;
261         soreserve(so, 256*1024, 256*1024);
262 
263         SOCKBUF_LOCK(&so->so_rcv);
264 recheck_socket:
265         if (so->so_upcall) {
266                 if (so->so_upcall != clnt_dg_soupcall) {
267                         SOCKBUF_UNLOCK(&so->so_rcv);
268                         printf("clnt_dg_create(): socket already has an incompatible upcall\n");
269                         goto err2;
270                 }
271                 cs = (struct cu_socket *) so->so_upcallarg;
272                 mtx_lock(&cs->cs_lock);
273                 cs->cs_refs++;
274                 mtx_unlock(&cs->cs_lock);
275         } else {
276                 /*
277                  * We are the first on this socket - allocate the
278                  * structure and install it in the socket.
279                  */
280                 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
281                 cs = mem_alloc(sizeof(*cs));
282                 SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
283                 if (so->so_upcall) {
284                         /*
285                          * We have lost a race with some other client.
286                          */
287                         mem_free(cs, sizeof(*cs));
288                         goto recheck_socket;
289                 }
290                 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
291                 cs->cs_refs = 1;
292                 TAILQ_INIT(&cs->cs_pending);
293                 so->so_upcallarg = cs;
294                 so->so_upcall = clnt_dg_soupcall;
295                 so->so_rcv.sb_flags |= SB_UPCALL;
296         }
297         SOCKBUF_UNLOCK(&so->so_rcv);
298 
299         cl->cl_refs = 1;
300         cl->cl_ops = &clnt_dg_ops;
301         cl->cl_private = (caddr_t)(void *)cu;
302         cl->cl_auth = authnone_create();
303         cl->cl_tp = NULL;
304         cl->cl_netid = NULL;
305         return (cl);
306 err2:
307         if (cl) {
308                 mem_free(cl, sizeof (CLIENT));
309                 if (cu)
310                         mem_free(cu, sizeof (*cu));
311         }
312         return (NULL);
313 }
314 
315 static enum clnt_stat
316 clnt_dg_call(
317         CLIENT          *cl,            /* client handle */
318         struct rpc_callextra *ext,      /* call metadata */
319         rpcproc_t       proc,           /* procedure number */
320         struct mbuf     *args,          /* pointer to args */
321         struct mbuf     **resultsp,     /* pointer to results */
322         struct timeval  utimeout)       /* seconds to wait before giving up */
323 {
324         struct cu_data *cu = (struct cu_data *)cl->cl_private;
325         struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
326         struct rpc_timers *rt;
327         AUTH *auth;
328         struct rpc_err *errp;
329         enum clnt_stat stat;
330         XDR xdrs;
331         struct rpc_msg reply_msg;
332         bool_t ok;
333         int retrans;                    /* number of re-transmits so far */
334         int nrefreshes = 2;             /* number of times to refresh cred */
335         struct timeval *tvp;
336         int timeout;
337         int retransmit_time;
338         int next_sendtime, starttime, rtt, time_waited, tv = 0;
339         struct sockaddr *sa;
340         socklen_t salen;
341         uint32_t xid = 0;
342         struct mbuf *mreq = NULL, *results;
343         struct cu_request *cr;
344         int error;
345 
346         cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
347 
348         mtx_lock(&cs->cs_lock);
349 
350         if (cu->cu_closing || cu->cu_closed) {
351                 mtx_unlock(&cs->cs_lock);
352                 free(cr, M_RPC);
353                 return (RPC_CANTSEND);
354         }
355         cu->cu_threads++;
356 
357         if (ext) {
358                 auth = ext->rc_auth;
359                 errp = &ext->rc_err;
360         } else {
361                 auth = cl->cl_auth;
362                 errp = &cu->cu_error;
363         }
364 
365         cr->cr_client = cl;
366         cr->cr_mrep = NULL;
367         cr->cr_error = 0;
368 
369         if (cu->cu_total.tv_usec == -1) {
370                 tvp = &utimeout; /* use supplied timeout */
371         } else {
372                 tvp = &cu->cu_total; /* use default timeout */
373         }
374         if (tvp->tv_sec || tvp->tv_usec)
375                 timeout = tvtohz(tvp);
376         else
377                 timeout = 0;
378 
379         if (cu->cu_connect && !cu->cu_connected) {
380                 mtx_unlock(&cs->cs_lock);
381                 error = soconnect(cu->cu_socket,
382                     (struct sockaddr *)&cu->cu_raddr, curthread);
383                 mtx_lock(&cs->cs_lock);
384                 if (error) {
385                         errp->re_errno = error;
386                         errp->re_status = stat = RPC_CANTSEND;
387                         goto out;
388                 }
389                 cu->cu_connected = 1;
390         }
391         if (cu->cu_connected) {
392                 sa = NULL;
393                 salen = 0;
394         } else {
395                 sa = (struct sockaddr *)&cu->cu_raddr;
396                 salen = cu->cu_rlen;
397         }
398         time_waited = 0;
399         retrans = 0;
400         if (ext && ext->rc_timers) {
401                 rt = ext->rc_timers;
402                 if (!rt->rt_rtxcur)
403                         rt->rt_rtxcur = tvtohz(&cu->cu_wait);
404                 retransmit_time = next_sendtime = rt->rt_rtxcur;
405         } else {
406                 rt = NULL;
407                 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
408         }
409 
410         starttime = ticks;
411 
412 call_again:
413         mtx_assert(&cs->cs_lock, MA_OWNED);
414 
415         cu->cu_xid++;
416         xid = cu->cu_xid;
417 
418 send_again:
419         mtx_unlock(&cs->cs_lock);
420 
421         MGETHDR(mreq, M_WAIT, MT_DATA);
422         KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
423         bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
424         mreq->m_len = cu->cu_mcalllen;
425 
426         /*
427          * The XID is the first thing in the request.
428          */
429         *mtod(mreq, uint32_t *) = htonl(xid);
430 
431         xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
432 
433         if (cu->cu_async == TRUE && args == NULL)
434                 goto get_reply;
435 
436         if ((! XDR_PUTINT32(&xdrs, &proc)) ||
437             (! AUTH_MARSHALL(auth, xid, &xdrs,
438                 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
439                 errp->re_status = stat = RPC_CANTENCODEARGS;
440                 mtx_lock(&cs->cs_lock);
441                 goto out;
442         }
443         mreq->m_pkthdr.len = m_length(mreq, NULL);
444 
445         cr->cr_xid = xid;
446         mtx_lock(&cs->cs_lock);
447 
448         /*
449          * Try to get a place in the congestion window.
450          */
451         while (cu->cu_sent >= cu->cu_cwnd) {
452                 cu->cu_cwnd_wait = TRUE;
453                 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
454                     cu->cu_waitflag, "rpccwnd", 0);
455                 if (error) {
456                         errp->re_errno = error;
457                         errp->re_status = stat = RPC_CANTSEND;
458                         goto out;
459                 }
460         }
461         cu->cu_sent += CWNDSCALE;
462 
463         TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
464         mtx_unlock(&cs->cs_lock);
465 
466         /*
467          * sosend consumes mreq.
468          */
469         error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
470         mreq = NULL;
471 
472         /*
473          * sub-optimal code appears here because we have
474          * some clock time to spare while the packets are in flight.
475          * (We assume that this is actually only executed once.)
476          */
477         reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
478         reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
479         reply_msg.acpted_rply.ar_verf.oa_length = 0;
480         reply_msg.acpted_rply.ar_results.where = NULL;
481         reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
482 
483         mtx_lock(&cs->cs_lock);
484         if (error) {
485                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
486                 errp->re_errno = error;
487                 errp->re_status = stat = RPC_CANTSEND;
488                 cu->cu_sent -= CWNDSCALE;
489                 if (cu->cu_cwnd_wait) {
490                         cu->cu_cwnd_wait = FALSE;
491                         wakeup(&cu->cu_cwnd_wait);
492                 }
493                 goto out;
494         }
495 
496         /*
497          * Check to see if we got an upcall while waiting for the
498          * lock.
499          */
500         if (cr->cr_error) {
501                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
502                 errp->re_errno = cr->cr_error;
503                 errp->re_status = stat = RPC_CANTRECV;
504                 cu->cu_sent -= CWNDSCALE;
505                 if (cu->cu_cwnd_wait) {
506                         cu->cu_cwnd_wait = FALSE;
507                         wakeup(&cu->cu_cwnd_wait);
508                 }
509                 goto out;
510         }
511         if (cr->cr_mrep) {
512                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
513                 cu->cu_sent -= CWNDSCALE;
514                 if (cu->cu_cwnd_wait) {
515                         cu->cu_cwnd_wait = FALSE;
516                         wakeup(&cu->cu_cwnd_wait);
517                 }
518                 goto got_reply;
519         }
520 
521         /*
522          * Hack to provide rpc-based message passing
523          */
524         if (timeout == 0) {
525                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
526                 errp->re_status = stat = RPC_TIMEDOUT;
527                 cu->cu_sent -= CWNDSCALE;
528                 if (cu->cu_cwnd_wait) {
529                         cu->cu_cwnd_wait = FALSE;
530                         wakeup(&cu->cu_cwnd_wait);
531                 }
532                 goto out;
533         }
534 
535 get_reply:
536         for (;;) {
537                 /* Decide how long to wait. */
538                 if (next_sendtime < timeout)
539                         tv = next_sendtime;
540                 else
541                         tv = timeout;
542                 tv -= time_waited;
543 
544                 if (tv > 0) {
545                         if (cu->cu_closing || cu->cu_closed)
546                                 error = 0;
547                         else
548                                 error = msleep(cr, &cs->cs_lock,
549                                     cu->cu_waitflag, cu->cu_waitchan, tv);
550                 } else {
551                         error = EWOULDBLOCK;
552                 }
553 
554                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
555                 cu->cu_sent -= CWNDSCALE;
556                 if (cu->cu_cwnd_wait) {
557                         cu->cu_cwnd_wait = FALSE;
558                         wakeup(&cu->cu_cwnd_wait);
559                 }
560 
561                 if (!error) {
562                         /*
563                          * We were woken up by the upcall.  If the
564                          * upcall had a receive error, report that,
565                          * otherwise we have a reply.
566                          */
567                         if (cr->cr_error) {
568                                 errp->re_errno = cr->cr_error;
569                                 errp->re_status = stat = RPC_CANTRECV;
570                                 goto out;
571                         }
572 
573                         cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
574                             + cu->cu_cwnd / 2) / cu->cu_cwnd;
575                         if (cu->cu_cwnd > MAXCWND)
576                                 cu->cu_cwnd = MAXCWND;
577 
578                         if (rt) {
579                                 /*
580                                  * Add one to the time since a tick
581                                  * count of N means that the actual
582                                  * time taken was somewhere between N
583                                  * and N+1.
584                                  */
585                                 rtt = ticks - starttime + 1;
586 
587                                 /*
588                                  * Update our estimate of the round
589                                  * trip time using roughly the
590                                  * algorithm described in RFC
591                                  * 2988. Given an RTT sample R:
592                                  *
593                                  * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
594                                  * SRTT = (1-alpha) * SRTT + alpha * R
595                                  *
596                                  * where alpha = 0.125 and beta = 0.25.
597                                  *
598                                  * The initial retransmit timeout is
599                                  * SRTT + 4*RTTVAR and doubles on each
600                                  * retransmision.
601                                  */
602                                 if (rt->rt_srtt == 0) {
603                                         rt->rt_srtt = rtt;
604                                         rt->rt_deviate = rtt / 2;
605                                 } else {
606                                         int32_t error = rtt - rt->rt_srtt;
607                                         rt->rt_srtt += error / 8;
608                                         error = abs(error) - rt->rt_deviate;
609                                         rt->rt_deviate += error / 4;
610                                 }
611                                 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
612                         }
613 
614                         break;
615                 }
616 
617                 /*
618                  * The sleep returned an error so our request is still
619                  * on the list. If we got EWOULDBLOCK, we may want to
620                  * re-send the request.
621                  */
622                 if (error != EWOULDBLOCK) {
623                         errp->re_errno = error;
624                         if (error == EINTR)
625                                 errp->re_status = stat = RPC_INTR;
626                         else
627                                 errp->re_status = stat = RPC_CANTRECV;
628                         goto out;
629                 }
630 
631                 time_waited = ticks - starttime;
632 
633                 /* Check for timeout. */
634                 if (time_waited > timeout) {
635                         errp->re_errno = EWOULDBLOCK;
636                         errp->re_status = stat = RPC_TIMEDOUT;
637                         goto out;
638                 }
639 
640                 /* Retransmit if necessary. */          
641                 if (time_waited >= next_sendtime) {
642                         cu->cu_cwnd /= 2;
643                         if (cu->cu_cwnd < CWNDSCALE)
644                                 cu->cu_cwnd = CWNDSCALE;
645                         if (ext && ext->rc_feedback) {
646                                 mtx_unlock(&cs->cs_lock);
647                                 if (retrans == 0)
648                                         ext->rc_feedback(FEEDBACK_REXMIT1,
649                                             proc, ext->rc_feedback_arg);
650                                 else
651                                         ext->rc_feedback(FEEDBACK_REXMIT2,
652                                             proc, ext->rc_feedback_arg);
653                                 mtx_lock(&cs->cs_lock);
654                         }
655                         if (cu->cu_closing || cu->cu_closed) {
656                                 errp->re_errno = ESHUTDOWN;
657                                 errp->re_status = stat = RPC_CANTRECV;
658                                 goto out;
659                         }
660                         retrans++;
661                         /* update retransmit_time */
662                         if (retransmit_time < RPC_MAX_BACKOFF * hz)
663                                 retransmit_time = 2 * retransmit_time;
664                         next_sendtime += retransmit_time;
665                         goto send_again;
666                 }
667                 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
668         }
669 
670 got_reply:
671         /*
672          * Now decode and validate the response. We need to drop the
673          * lock since xdr_replymsg may end up sleeping in malloc.
674          */
675         mtx_unlock(&cs->cs_lock);
676 
677         if (ext && ext->rc_feedback)
678                 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
679 
680         xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
681         ok = xdr_replymsg(&xdrs, &reply_msg);
682         cr->cr_mrep = NULL;
683 
684         if (ok) {
685                 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
686                     (reply_msg.acpted_rply.ar_stat == SUCCESS))
687                         errp->re_status = stat = RPC_SUCCESS;
688                 else
689                         stat = _seterr_reply(&reply_msg, &(cu->cu_error));
690 
691                 if (errp->re_status == RPC_SUCCESS) {
692                         results = xdrmbuf_getall(&xdrs);
693                         if (! AUTH_VALIDATE(auth, xid,
694                                 &reply_msg.acpted_rply.ar_verf,
695                                 &results)) {
696                                 errp->re_status = stat = RPC_AUTHERROR;
697                                 errp->re_why = AUTH_INVALIDRESP;
698                                 if (retrans &&
699                                     auth->ah_cred.oa_flavor == RPCSEC_GSS) {
700                                         /*
701                                          * If we retransmitted, its
702                                          * possible that we will
703                                          * receive a reply for one of
704                                          * the earlier transmissions
705                                          * (which will use an older
706                                          * RPCSEC_GSS sequence
707                                          * number). In this case, just
708                                          * go back and listen for a
709                                          * new reply. We could keep a
710                                          * record of all the seq
711                                          * numbers we have transmitted
712                                          * so far so that we could
713                                          * accept a reply for any of
714                                          * them here.
715                                          */
716                                         XDR_DESTROY(&xdrs);
717                                         mtx_lock(&cs->cs_lock);
718                                         TAILQ_INSERT_TAIL(&cs->cs_pending,
719                                             cr, cr_link);
720                                         cr->cr_mrep = NULL;
721                                         goto get_reply;
722                                 }
723                         } else {
724                                 *resultsp = results;
725                         }
726                 }               /* end successful completion */
727                 /*
728                  * If unsuccesful AND error is an authentication error
729                  * then refresh credentials and try again, else break
730                  */
731                 else if (stat == RPC_AUTHERROR)
732                         /* maybe our credentials need to be refreshed ... */
733                         if (nrefreshes > 0 &&
734                             AUTH_REFRESH(auth, &reply_msg)) {
735                                 nrefreshes--;
736                                 XDR_DESTROY(&xdrs);
737                                 mtx_lock(&cs->cs_lock);
738                                 goto call_again;
739                         }
740                 /* end of unsuccessful completion */
741         }       /* end of valid reply message */
742         else {
743                 errp->re_status = stat = RPC_CANTDECODERES;
744 
745         }
746         XDR_DESTROY(&xdrs);
747         mtx_lock(&cs->cs_lock);
748 out:
749         mtx_assert(&cs->cs_lock, MA_OWNED);
750 
751         if (mreq)
752                 m_freem(mreq);
753         if (cr->cr_mrep)
754                 m_freem(cr->cr_mrep);
755 
756         cu->cu_threads--;
757         if (cu->cu_closing)
758                 wakeup(cu);
759                 
760         mtx_unlock(&cs->cs_lock);
761 
762         if (auth && stat != RPC_SUCCESS)
763                 AUTH_VALIDATE(auth, xid, NULL, NULL);
764 
765         free(cr, M_RPC);
766 
767         return (stat);
768 }
769 
770 static void
771 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
772 {
773         struct cu_data *cu = (struct cu_data *)cl->cl_private;
774 
775         *errp = cu->cu_error;
776 }
777 
778 static bool_t
779 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
780 {
781         XDR xdrs;
782         bool_t dummy;
783 
784         xdrs.x_op = XDR_FREE;
785         dummy = (*xdr_res)(&xdrs, res_ptr);
786 
787         return (dummy);
788 }
789 
790 /*ARGSUSED*/
791 static void
792 clnt_dg_abort(CLIENT *h)
793 {
794 }
795 
796 static bool_t
797 clnt_dg_control(CLIENT *cl, u_int request, void *info)
798 {
799         struct cu_data *cu = (struct cu_data *)cl->cl_private;
800         struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
801         struct sockaddr *addr;
802 
803         mtx_lock(&cs->cs_lock);
804 
805         switch (request) {
806         case CLSET_FD_CLOSE:
807                 cu->cu_closeit = TRUE;
808                 mtx_unlock(&cs->cs_lock);
809                 return (TRUE);
810         case CLSET_FD_NCLOSE:
811                 cu->cu_closeit = FALSE;
812                 mtx_unlock(&cs->cs_lock);
813                 return (TRUE);
814         }
815 
816         /* for other requests which use info */
817         if (info == NULL) {
818                 mtx_unlock(&cs->cs_lock);
819                 return (FALSE);
820         }
821         switch (request) {
822         case CLSET_TIMEOUT:
823                 if (time_not_ok((struct timeval *)info)) {
824                         mtx_unlock(&cs->cs_lock);
825                         return (FALSE);
826                 }
827                 cu->cu_total = *(struct timeval *)info;
828                 break;
829         case CLGET_TIMEOUT:
830                 *(struct timeval *)info = cu->cu_total;
831                 break;
832         case CLSET_RETRY_TIMEOUT:
833                 if (time_not_ok((struct timeval *)info)) {
834                         mtx_unlock(&cs->cs_lock);
835                         return (FALSE);
836                 }
837                 cu->cu_wait = *(struct timeval *)info;
838                 break;
839         case CLGET_RETRY_TIMEOUT:
840                 *(struct timeval *)info = cu->cu_wait;
841                 break;
842         case CLGET_SVC_ADDR:
843                 /*
844                  * Slightly different semantics to userland - we use
845                  * sockaddr instead of netbuf.
846                  */
847                 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
848                 break;
849         case CLSET_SVC_ADDR:            /* set to new address */
850                 addr = (struct sockaddr *)info;
851                 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
852                 break;
853         case CLGET_XID:
854                 *(uint32_t *)info = cu->cu_xid;
855                 break;
856 
857         case CLSET_XID:
858                 /* This will set the xid of the NEXT call */
859                 /* decrement by 1 as clnt_dg_call() increments once */
860                 cu->cu_xid = *(uint32_t *)info - 1;
861                 break;
862 
863         case CLGET_VERS:
864                 /*
865                  * This RELIES on the information that, in the call body,
866                  * the version number field is the fifth field from the
867                  * begining of the RPC header. MUST be changed if the
868                  * call_struct is changed
869                  */
870                 *(uint32_t *)info =
871                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
872                     4 * BYTES_PER_XDR_UNIT));
873                 break;
874 
875         case CLSET_VERS:
876                 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
877                         = htonl(*(uint32_t *)info);
878                 break;
879 
880         case CLGET_PROG:
881                 /*
882                  * This RELIES on the information that, in the call body,
883                  * the program number field is the fourth field from the
884                  * begining of the RPC header. MUST be changed if the
885                  * call_struct is changed
886                  */
887                 *(uint32_t *)info =
888                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
889                     3 * BYTES_PER_XDR_UNIT));
890                 break;
891 
892         case CLSET_PROG:
893                 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
894                         = htonl(*(uint32_t *)info);
895                 break;
896         case CLSET_ASYNC:
897                 cu->cu_async = *(int *)info;
898                 break;
899         case CLSET_CONNECT:
900                 cu->cu_connect = *(int *)info;
901                 break;
902         case CLSET_WAITCHAN:
903                 cu->cu_waitchan = (const char *)info;
904                 break;
905         case CLGET_WAITCHAN:
906                 *(const char **) info = cu->cu_waitchan;
907                 break;
908         case CLSET_INTERRUPTIBLE:
909                 if (*(int *) info)
910                         cu->cu_waitflag = PCATCH;
911                 else
912                         cu->cu_waitflag = 0;
913                 break;
914         case CLGET_INTERRUPTIBLE:
915                 if (cu->cu_waitflag)
916                         *(int *) info = TRUE;
917                 else
918                         *(int *) info = FALSE;
919                 break;
920         default:
921                 mtx_unlock(&cs->cs_lock);
922                 return (FALSE);
923         }
924         mtx_unlock(&cs->cs_lock);
925         return (TRUE);
926 }
927 
928 static void
929 clnt_dg_close(CLIENT *cl)
930 {
931         struct cu_data *cu = (struct cu_data *)cl->cl_private;
932         struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
933         struct cu_request *cr;
934 
935         mtx_lock(&cs->cs_lock);
936 
937         if (cu->cu_closed) {
938                 mtx_unlock(&cs->cs_lock);
939                 return;
940         }
941 
942         if (cu->cu_closing) {
943                 while (cu->cu_closing)
944                         msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
945                 KASSERT(cu->cu_closed, ("client should be closed"));
946                 mtx_unlock(&cs->cs_lock);
947                 return;
948         }
949 
950         /*
951          * Abort any pending requests and wait until everyone
952          * has finished with clnt_vc_call.
953          */
954         cu->cu_closing = TRUE;
955         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
956                 if (cr->cr_client == cl) {
957                         cr->cr_xid = 0;
958                         cr->cr_error = ESHUTDOWN;
959                         wakeup(cr);
960                 }
961         }
962 
963         while (cu->cu_threads)
964                 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
965 
966         cu->cu_closing = FALSE;
967         cu->cu_closed = TRUE;
968 
969         mtx_unlock(&cs->cs_lock);
970         wakeup(cu);
971 }
972 
973 static void
974 clnt_dg_destroy(CLIENT *cl)
975 {
976         struct cu_data *cu = (struct cu_data *)cl->cl_private;
977         struct cu_socket *cs = (struct cu_socket *) cu->cu_socket->so_upcallarg;
978         struct socket *so = NULL;
979         bool_t lastsocketref;
980 
981         clnt_dg_close(cl);
982 
983         mtx_lock(&cs->cs_lock);
984 
985         cs->cs_refs--;
986         if (cs->cs_refs == 0) {
987                 mtx_destroy(&cs->cs_lock);
988                 SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
989                 cu->cu_socket->so_upcallarg = NULL;
990                 cu->cu_socket->so_upcall = NULL;
991                 cu->cu_socket->so_rcv.sb_flags &= ~SB_UPCALL;
992                 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
993                 mem_free(cs, sizeof(*cs));
994                 lastsocketref = TRUE;
995         } else {
996                 mtx_unlock(&cs->cs_lock);
997                 lastsocketref = FALSE;
998         }
999 
1000         if (cu->cu_closeit && lastsocketref) {
1001                 so = cu->cu_socket;
1002                 cu->cu_socket = NULL;
1003         }
1004 
1005         if (so)
1006                 soclose(so);
1007 
1008         if (cl->cl_netid && cl->cl_netid[0])
1009                 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
1010         if (cl->cl_tp && cl->cl_tp[0])
1011                 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
1012         mem_free(cu, sizeof (*cu));
1013         mem_free(cl, sizeof (CLIENT));
1014 }
1015 
1016 /*
1017  * Make sure that the time is not garbage.  -1 value is allowed.
1018  */
1019 static bool_t
1020 time_not_ok(struct timeval *t)
1021 {
1022         return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
1023                 t->tv_usec < -1 || t->tv_usec > 1000000);
1024 }
1025 
1026 void
1027 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
1028 {
1029         struct cu_socket *cs = (struct cu_socket *) arg;
1030         struct uio uio;
1031         struct mbuf *m;
1032         struct mbuf *control;
1033         struct cu_request *cr;
1034         int error, rcvflag, foundreq;
1035         uint32_t xid;
1036 
1037         uio.uio_resid = 1000000000;
1038         uio.uio_td = curthread;
1039         do {
1040                 m = NULL;
1041                 control = NULL;
1042                 rcvflag = MSG_DONTWAIT;
1043                 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
1044                 if (control)
1045                         m_freem(control);
1046 
1047                 if (error == EWOULDBLOCK)
1048                         break;
1049 
1050                 /*
1051                  * If there was an error, wake up all pending
1052                  * requests.
1053                  */
1054                 if (error) {
1055                         mtx_lock(&cs->cs_lock);
1056                         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
1057