The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/rpc/clnt_dg.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*      $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
    2 
    3 /*
    4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
    5  * unrestricted use provided that this legend is included on all tape
    6  * media and as a part of the software program in whole or part.  Users
    7  * may copy or modify Sun RPC without charge, but are not authorized
    8  * to license or distribute it to anyone else except as part of a product or
    9  * program developed by the user.
   10  * 
   11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
   12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
   13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
   14  * 
   15  * Sun RPC is provided with no support and without any obligation on the
   16  * part of Sun Microsystems, Inc. to assist in its use, correction,
   17  * modification or enhancement.
   18  * 
   19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
   20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
   21  * OR ANY PART THEREOF.
   22  * 
   23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
   24  * or profits or other special, indirect and consequential damages, even if
   25  * Sun has been advised of the possibility of such damages.
   26  * 
   27  * Sun Microsystems, Inc.
   28  * 2550 Garcia Avenue
   29  * Mountain View, California  94043
   30  */
   31 /*
   32  * Copyright (c) 1986-1991 by Sun Microsystems Inc. 
   33  */
   34 
   35 #if defined(LIBC_SCCS) && !defined(lint)
   36 #ident  "@(#)clnt_dg.c  1.23    94/04/22 SMI"
   37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
   38 #endif
   39 #include <sys/cdefs.h>
   40 __FBSDID("$FreeBSD: releng/9.0/sys/rpc/clnt_dg.c 227631 2011-11-17 16:38:22Z rmacklem $");
   41 
   42 /*
   43  * Implements a connectionless client side RPC.
   44  */
   45 
   46 #include <sys/param.h>
   47 #include <sys/systm.h>
   48 #include <sys/kernel.h>
   49 #include <sys/lock.h>
   50 #include <sys/malloc.h>
   51 #include <sys/mbuf.h>
   52 #include <sys/mutex.h>
   53 #include <sys/pcpu.h>
   54 #include <sys/proc.h>
   55 #include <sys/socket.h>
   56 #include <sys/socketvar.h>
   57 #include <sys/time.h>
   58 #include <sys/uio.h>
   59 
   60 #include <net/vnet.h>
   61 
   62 #include <rpc/rpc.h>
   63 #include <rpc/rpc_com.h>
   64 
   65 
   66 #ifdef _FREEFALL_CONFIG
   67 /*
   68  * Disable RPC exponential back-off for FreeBSD.org systems.
   69  */
   70 #define RPC_MAX_BACKOFF         1 /* second */
   71 #else
   72 #define RPC_MAX_BACKOFF         30 /* seconds */
   73 #endif
   74 
   75 static bool_t time_not_ok(struct timeval *);
   76 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
   77     rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
   78 static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
   79 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
   80 static void clnt_dg_abort(CLIENT *);
   81 static bool_t clnt_dg_control(CLIENT *, u_int, void *);
   82 static void clnt_dg_close(CLIENT *);
   83 static void clnt_dg_destroy(CLIENT *);
   84 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
   85 
   86 static struct clnt_ops clnt_dg_ops = {  /* installed as cl_ops by clnt_dg_create() */
   87         .cl_call =      clnt_dg_call,
   88         .cl_abort =     clnt_dg_abort,
   89         .cl_geterr =    clnt_dg_geterr,
   90         .cl_freeres =   clnt_dg_freeres,
   91         .cl_close =     clnt_dg_close,
   92         .cl_destroy =   clnt_dg_destroy,
   93         .cl_control =   clnt_dg_control
   94 };
   95 
   96 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
   97 
   98 /*
   99  * A pending RPC request which awaits a reply. Requests which have
  100  * received their reply will have cr_xid set to zero and cr_mrep to
  101  * the mbuf chain of the reply.
  102  */
  103 struct cu_request {
  104         TAILQ_ENTRY(cu_request) cr_link;        /* linkage on cu_socket cs_pending */
  105         CLIENT                  *cr_client;     /* owner: client handle that issued it */
  106         uint32_t                cr_xid;         /* XID of request */
  107         struct mbuf             *cr_mrep;       /* reply received by upcall */
  108         int                     cr_error;       /* any error from upcall */
  109         char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
  110 };
  111 
  112 TAILQ_HEAD(cu_request_list, cu_request);
  113 
  114 #define MCALL_MSG_SIZE 24
  115 
  116 /*
  117  * This structure is pointed to by the socket buffer's sb_upcallarg
  118  * member. It is separate from the client private data to facilitate
  119  * multiple clients sharing the same socket. The cs_lock mutex is used
  120  * to protect all fields of this structure, the socket's receive
  121  * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
  122  * structures is installed on the socket.
  123  */
  124 struct cu_socket {
  125         struct mtx              cs_lock;        /* protects all fields of this structure */
  126         int                     cs_refs;        /* Count of clients sharing the socket */
  127         struct cu_request_list  cs_pending;     /* Requests awaiting replies */
  128         int                     cs_upcallrefs;  /* Refcnt of upcalls in prog.*/
  129 };
  130 
  131 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);
  132 
  133 /*
  134  * Private data kept per client handle
  135  */
  136 struct cu_data {
  137         int                     cu_threads;     /* # threads in clnt_dg_call */
  138         bool_t                  cu_closing;     /* TRUE if we are closing */
  139         bool_t                  cu_closed;      /* TRUE if we are closed */
  140         struct socket           *cu_socket;     /* connection socket */
  141         bool_t                  cu_closeit;     /* opened by library */
  142         struct sockaddr_storage cu_raddr;       /* remote address */
  143         int                     cu_rlen;        /* length of cu_raddr in use */
  144         struct timeval          cu_wait;        /* retransmit interval */
  145         struct timeval          cu_total;       /* total time for the call */
  146         struct rpc_err          cu_error;       /* error used when the call has no rpc_callextra */
  147         uint32_t                cu_xid;         /* last XID handed out; bumped per call */
  148         char                    cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
  149         size_t                  cu_mcalllen;    /* bytes of cu_mcallc in use */
  150         size_t                  cu_sendsz;      /* send size */
  151         size_t                  cu_recvsz;      /* recv size */
  152         int                     cu_async;
  153         int                     cu_connect;     /* Use connect(). */
  154         int                     cu_connected;   /* Have done connect(). */
  155         const char              *cu_waitchan;   /* wmesg passed to msleep() while awaiting a reply */
  156         int                     cu_waitflag;    /* priority/flags argument passed to msleep() */
  157         int                     cu_cwnd;        /* congestion window */
  158         int                     cu_sent;        /* in-flight RPCs, scaled by CWNDSCALE */
  159         bool_t                  cu_cwnd_wait;   /* TRUE if a thread sleeps for window space */
  160 };
  161 
  162 #define CWNDSCALE       256
  163 #define MAXCWND         (32 * CWNDSCALE)
  164 
  165 /*
  166  * Connection less client creation returns with client handle parameters.
  167  * Default options are set, which the user can change using clnt_control().
  168  * fd should be open and bound.
  169  * NB: The rpch->cl_auth is initialized to null authentication.
  170  *      Caller may wish to set this something more useful.
  171  *
  172  * sendsz and recvsz are the maximum allowable packet sizes that can be
  173  * sent and received. Normally they are the same, but they can be
  174  * changed to improve the program efficiency and buffer allocation.
  175  * If they are 0, use the transport default.
  176  *
  177  * If svcaddr is NULL, returns NULL.
  178  */
  179 CLIENT *
  180 clnt_dg_create(
  181         struct socket *so,
  182         struct sockaddr *svcaddr,       /* servers address */
  183         rpcprog_t program,              /* program number */
  184         rpcvers_t version,              /* version number */
  185         size_t sendsz,                  /* buffer recv size */
  186         size_t recvsz)                  /* buffer send size */
  187 {
  188         CLIENT *cl = NULL;              /* client handle */
  189         struct cu_data *cu = NULL;      /* private data */
  190         struct cu_socket *cs = NULL;
  191         struct sockbuf *sb;
  192         struct timeval now;
  193         struct rpc_msg call_msg;
  194         struct __rpc_sockinfo si;
  195         XDR xdrs;
  196         int error;
  197 
  198         if (svcaddr == NULL) {
  199                 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
  200                 return (NULL);
  201         }
  202 
  203         if (!__rpc_socket2sockinfo(so, &si)) {
  204                 rpc_createerr.cf_stat = RPC_TLIERROR;
  205                 rpc_createerr.cf_error.re_errno = 0;
  206                 return (NULL);
  207         }
  208 
  209         /*
  210          * Find the receive and the send size
  211          */
  212         sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
  213         recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
  214         if ((sendsz == 0) || (recvsz == 0)) {
  215                 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
  216                 rpc_createerr.cf_error.re_errno = 0;
  217                 return (NULL);
  218         }
  219 
  220         cl = mem_alloc(sizeof (CLIENT));
  221 
  222         /*
  223          * Should be multiple of 4 for XDR.
  224          */
  225         sendsz = ((sendsz + 3) / 4) * 4;
  226         recvsz = ((recvsz + 3) / 4) * 4;
  227         cu = mem_alloc(sizeof (*cu));
  228         cu->cu_threads = 0;
  229         cu->cu_closing = FALSE;
  230         cu->cu_closed = FALSE;
  231         (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
  232         cu->cu_rlen = svcaddr->sa_len;
  233         /* Other values can also be set through clnt_control() */
  234         cu->cu_wait.tv_sec = 3; /* heuristically chosen */
  235         cu->cu_wait.tv_usec = 0;
  236         cu->cu_total.tv_sec = -1;
  237         cu->cu_total.tv_usec = -1;
  238         cu->cu_sendsz = sendsz;
  239         cu->cu_recvsz = recvsz;
  240         cu->cu_async = FALSE;
  241         cu->cu_connect = FALSE;
  242         cu->cu_connected = FALSE;
  243         cu->cu_waitchan = "rpcrecv";
  244         cu->cu_waitflag = 0;
  245         cu->cu_cwnd = MAXCWND / 2;
  246         cu->cu_sent = 0;
  247         cu->cu_cwnd_wait = FALSE;
  248         (void) getmicrotime(&now);
  249         cu->cu_xid = __RPC_GETXID(&now);
  250         call_msg.rm_xid = cu->cu_xid;
  251         call_msg.rm_call.cb_prog = program;
  252         call_msg.rm_call.cb_vers = version;
  253         xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
  254         if (! xdr_callhdr(&xdrs, &call_msg)) {
  255                 rpc_createerr.cf_stat = RPC_CANTENCODEARGS;  /* XXX */
  256                 rpc_createerr.cf_error.re_errno = 0;
  257                 goto err2;
  258         }
  259         cu->cu_mcalllen = XDR_GETPOS(&xdrs);
  260 
  261         /*
  262          * By default, closeit is always FALSE. It is users responsibility
  263          * to do a close on it, else the user may use clnt_control
  264          * to let clnt_destroy do it for him/her.
  265          */
  266         cu->cu_closeit = FALSE;
  267         cu->cu_socket = so;
  268         error = soreserve(so, (u_long)sendsz, (u_long)recvsz);
  269         if (error != 0) {
  270                 rpc_createerr.cf_stat = RPC_FAILED;
  271                 rpc_createerr.cf_error.re_errno = error;
  272                 goto err2;
  273         }
  274 
  275         sb = &so->so_rcv;
  276         SOCKBUF_LOCK(&so->so_rcv);
  277 recheck_socket:
  278         if (sb->sb_upcall) {
  279                 if (sb->sb_upcall != clnt_dg_soupcall) {
  280                         SOCKBUF_UNLOCK(&so->so_rcv);
  281                         printf("clnt_dg_create(): socket already has an incompatible upcall\n");
  282                         goto err2;
  283                 }
  284                 cs = (struct cu_socket *) sb->sb_upcallarg;
  285                 mtx_lock(&cs->cs_lock);
  286                 cs->cs_refs++;
  287                 mtx_unlock(&cs->cs_lock);
  288         } else {
  289                 /*
  290                  * We are the first on this socket - allocate the
  291                  * structure and install it in the socket.
  292                  */
  293                 SOCKBUF_UNLOCK(&so->so_rcv);
  294                 cs = mem_alloc(sizeof(*cs));
  295                 SOCKBUF_LOCK(&so->so_rcv);
  296                 if (sb->sb_upcall) {
  297                         /*
  298                          * We have lost a race with some other client.
  299                          */
  300                         mem_free(cs, sizeof(*cs));
  301                         goto recheck_socket;
  302                 }
  303                 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
  304                 cs->cs_refs = 1;
  305                 cs->cs_upcallrefs = 0;
  306                 TAILQ_INIT(&cs->cs_pending);
  307                 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
  308         }
  309         SOCKBUF_UNLOCK(&so->so_rcv);
  310 
  311         cl->cl_refs = 1;
  312         cl->cl_ops = &clnt_dg_ops;
  313         cl->cl_private = (caddr_t)(void *)cu;
  314         cl->cl_auth = authnone_create();
  315         cl->cl_tp = NULL;
  316         cl->cl_netid = NULL;
  317         return (cl);
  318 err2:
  319         if (cl) {
  320                 mem_free(cl, sizeof (CLIENT));
  321                 if (cu)
  322                         mem_free(cu, sizeof (*cu));
  323         }
  324         return (NULL);
  325 }
  326 
  327 static enum clnt_stat
  328 clnt_dg_call(
  329         CLIENT          *cl,            /* client handle */
  330         struct rpc_callextra *ext,      /* call metadata */
  331         rpcproc_t       proc,           /* procedure number */
  332         struct mbuf     *args,          /* pointer to args */
  333         struct mbuf     **resultsp,     /* pointer to results */
  334         struct timeval  utimeout)       /* seconds to wait before giving up */
  335 {
  336         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  337         struct cu_socket *cs;
  338         struct rpc_timers *rt;
  339         AUTH *auth;
  340         struct rpc_err *errp;
  341         enum clnt_stat stat;
  342         XDR xdrs;
  343         struct rpc_msg reply_msg;
  344         bool_t ok;
  345         int retrans;                    /* number of re-transmits so far */
  346         int nrefreshes = 2;             /* number of times to refresh cred */
  347         struct timeval *tvp;
  348         int timeout;
  349         int retransmit_time;
  350         int next_sendtime, starttime, rtt, time_waited, tv = 0;
  351         struct sockaddr *sa;
  352         socklen_t salen;
  353         uint32_t xid = 0;
  354         struct mbuf *mreq = NULL, *results;
  355         struct cu_request *cr;
  356         int error;
  357 
  358         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  359         cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
  360 
  361         mtx_lock(&cs->cs_lock);
  362 
  363         if (cu->cu_closing || cu->cu_closed) {
  364                 mtx_unlock(&cs->cs_lock);
  365                 free(cr, M_RPC);
  366                 return (RPC_CANTSEND);
  367         }
  368         cu->cu_threads++;
  369 
  370         if (ext) {
  371                 auth = ext->rc_auth;
  372                 errp = &ext->rc_err;
  373         } else {
  374                 auth = cl->cl_auth;
  375                 errp = &cu->cu_error;
  376         }
  377 
  378         cr->cr_client = cl;
  379         cr->cr_mrep = NULL;
  380         cr->cr_error = 0;
  381 
  382         if (cu->cu_total.tv_usec == -1) {
  383                 tvp = &utimeout; /* use supplied timeout */
  384         } else {
  385                 tvp = &cu->cu_total; /* use default timeout */
  386         }
  387         if (tvp->tv_sec || tvp->tv_usec)
  388                 timeout = tvtohz(tvp);
  389         else
  390                 timeout = 0;
  391 
  392         if (cu->cu_connect && !cu->cu_connected) {
  393                 mtx_unlock(&cs->cs_lock);
  394                 error = soconnect(cu->cu_socket,
  395                     (struct sockaddr *)&cu->cu_raddr, curthread);
  396                 mtx_lock(&cs->cs_lock);
  397                 if (error) {
  398                         errp->re_errno = error;
  399                         errp->re_status = stat = RPC_CANTSEND;
  400                         goto out;
  401                 }
  402                 cu->cu_connected = 1;
  403         }
  404         if (cu->cu_connected) {
  405                 sa = NULL;
  406                 salen = 0;
  407         } else {
  408                 sa = (struct sockaddr *)&cu->cu_raddr;
  409                 salen = cu->cu_rlen;
  410         }
  411         time_waited = 0;
  412         retrans = 0;
  413         if (ext && ext->rc_timers) {
  414                 rt = ext->rc_timers;
  415                 if (!rt->rt_rtxcur)
  416                         rt->rt_rtxcur = tvtohz(&cu->cu_wait);
  417                 retransmit_time = next_sendtime = rt->rt_rtxcur;
  418         } else {
  419                 rt = NULL;
  420                 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
  421         }
  422 
  423         starttime = ticks;
  424 
  425 call_again:
  426         mtx_assert(&cs->cs_lock, MA_OWNED);
  427 
  428         cu->cu_xid++;
  429         xid = cu->cu_xid;
  430 
  431 send_again:
  432         mtx_unlock(&cs->cs_lock);
  433 
  434         MGETHDR(mreq, M_WAIT, MT_DATA);
  435         KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
  436         bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
  437         mreq->m_len = cu->cu_mcalllen;
  438 
  439         /*
  440          * The XID is the first thing in the request.
  441          */
  442         *mtod(mreq, uint32_t *) = htonl(xid);
  443 
  444         xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
  445 
  446         if (cu->cu_async == TRUE && args == NULL)
  447                 goto get_reply;
  448 
  449         if ((! XDR_PUTINT32(&xdrs, &proc)) ||
  450             (! AUTH_MARSHALL(auth, xid, &xdrs,
  451                 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
  452                 errp->re_status = stat = RPC_CANTENCODEARGS;
  453                 mtx_lock(&cs->cs_lock);
  454                 goto out;
  455         }
  456         mreq->m_pkthdr.len = m_length(mreq, NULL);
  457 
  458         cr->cr_xid = xid;
  459         mtx_lock(&cs->cs_lock);
  460 
  461         /*
  462          * Try to get a place in the congestion window.
  463          */
  464         while (cu->cu_sent >= cu->cu_cwnd) {
  465                 cu->cu_cwnd_wait = TRUE;
  466                 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
  467                     cu->cu_waitflag, "rpccwnd", 0);
  468                 if (error) {
  469                         errp->re_errno = error;
  470                         if (error == EINTR || error == ERESTART)
  471                                 errp->re_status = stat = RPC_INTR;
  472                         else
  473                                 errp->re_status = stat = RPC_CANTSEND;
  474                         goto out;
  475                 }
  476         }
  477         cu->cu_sent += CWNDSCALE;
  478 
  479         TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
  480         mtx_unlock(&cs->cs_lock);
  481 
  482         /*
  483          * sosend consumes mreq.
  484          */
  485         error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
  486         mreq = NULL;
  487 
  488         /*
  489          * sub-optimal code appears here because we have
  490          * some clock time to spare while the packets are in flight.
  491          * (We assume that this is actually only executed once.)
  492          */
  493         reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
  494         reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
  495         reply_msg.acpted_rply.ar_verf.oa_length = 0;
  496         reply_msg.acpted_rply.ar_results.where = NULL;
  497         reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
  498 
  499         mtx_lock(&cs->cs_lock);
  500         if (error) {
  501                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  502                 errp->re_errno = error;
  503                 errp->re_status = stat = RPC_CANTSEND;
  504                 cu->cu_sent -= CWNDSCALE;
  505                 if (cu->cu_cwnd_wait) {
  506                         cu->cu_cwnd_wait = FALSE;
  507                         wakeup(&cu->cu_cwnd_wait);
  508                 }
  509                 goto out;
  510         }
  511 
  512         /*
  513          * Check to see if we got an upcall while waiting for the
  514          * lock.
  515          */
  516         if (cr->cr_error) {
  517                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  518                 errp->re_errno = cr->cr_error;
  519                 errp->re_status = stat = RPC_CANTRECV;
  520                 cu->cu_sent -= CWNDSCALE;
  521                 if (cu->cu_cwnd_wait) {
  522                         cu->cu_cwnd_wait = FALSE;
  523                         wakeup(&cu->cu_cwnd_wait);
  524                 }
  525                 goto out;
  526         }
  527         if (cr->cr_mrep) {
  528                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  529                 cu->cu_sent -= CWNDSCALE;
  530                 if (cu->cu_cwnd_wait) {
  531                         cu->cu_cwnd_wait = FALSE;
  532                         wakeup(&cu->cu_cwnd_wait);
  533                 }
  534                 goto got_reply;
  535         }
  536 
  537         /*
  538          * Hack to provide rpc-based message passing
  539          */
  540         if (timeout == 0) {
  541                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  542                 errp->re_status = stat = RPC_TIMEDOUT;
  543                 cu->cu_sent -= CWNDSCALE;
  544                 if (cu->cu_cwnd_wait) {
  545                         cu->cu_cwnd_wait = FALSE;
  546                         wakeup(&cu->cu_cwnd_wait);
  547                 }
  548                 goto out;
  549         }
  550 
  551 get_reply:
  552         for (;;) {
  553                 /* Decide how long to wait. */
  554                 if (next_sendtime < timeout)
  555                         tv = next_sendtime;
  556                 else
  557                         tv = timeout;
  558                 tv -= time_waited;
  559 
  560                 if (tv > 0) {
  561                         if (cu->cu_closing || cu->cu_closed) {
  562                                 error = 0;
  563                                 cr->cr_error = ESHUTDOWN;
  564                         } else {
  565                                 error = msleep(cr, &cs->cs_lock,
  566                                     cu->cu_waitflag, cu->cu_waitchan, tv);
  567                         }
  568                 } else {
  569                         error = EWOULDBLOCK;
  570                 }
  571 
  572                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  573                 cu->cu_sent -= CWNDSCALE;
  574                 if (cu->cu_cwnd_wait) {
  575                         cu->cu_cwnd_wait = FALSE;
  576                         wakeup(&cu->cu_cwnd_wait);
  577                 }
  578 
  579                 if (!error) {
  580                         /*
  581                          * We were woken up by the upcall.  If the
  582                          * upcall had a receive error, report that,
  583                          * otherwise we have a reply.
  584                          */
  585                         if (cr->cr_error) {
  586                                 errp->re_errno = cr->cr_error;
  587                                 errp->re_status = stat = RPC_CANTRECV;
  588                                 goto out;
  589                         }
  590 
  591                         cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
  592                             + cu->cu_cwnd / 2) / cu->cu_cwnd;
  593                         if (cu->cu_cwnd > MAXCWND)
  594                                 cu->cu_cwnd = MAXCWND;
  595 
  596                         if (rt) {
  597                                 /*
  598                                  * Add one to the time since a tick
  599                                  * count of N means that the actual
  600                                  * time taken was somewhere between N
  601                                  * and N+1.
  602                                  */
  603                                 rtt = ticks - starttime + 1;
  604 
  605                                 /*
  606                                  * Update our estimate of the round
  607                                  * trip time using roughly the
  608                                  * algorithm described in RFC
  609                                  * 2988. Given an RTT sample R:
  610                                  *
  611                                  * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
  612                                  * SRTT = (1-alpha) * SRTT + alpha * R
  613                                  *
  614                                  * where alpha = 0.125 and beta = 0.25.
  615                                  *
  616                                  * The initial retransmit timeout is
  617                                  * SRTT + 4*RTTVAR and doubles on each
  618                                  * retransmision.
  619                                  */
  620                                 if (rt->rt_srtt == 0) {
  621                                         rt->rt_srtt = rtt;
  622                                         rt->rt_deviate = rtt / 2;
  623                                 } else {
  624                                         int32_t error = rtt - rt->rt_srtt;
  625                                         rt->rt_srtt += error / 8;
  626                                         error = abs(error) - rt->rt_deviate;
  627                                         rt->rt_deviate += error / 4;
  628                                 }
  629                                 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
  630                         }
  631 
  632                         break;
  633                 }
  634 
  635                 /*
  636                  * The sleep returned an error so our request is still
  637                  * on the list. If we got EWOULDBLOCK, we may want to
  638                  * re-send the request.
  639                  */
  640                 if (error != EWOULDBLOCK) {
  641                         errp->re_errno = error;
  642                         if (error == EINTR || error == ERESTART)
  643                                 errp->re_status = stat = RPC_INTR;
  644                         else
  645                                 errp->re_status = stat = RPC_CANTRECV;
  646                         goto out;
  647                 }
  648 
  649                 time_waited = ticks - starttime;
  650 
  651                 /* Check for timeout. */
  652                 if (time_waited > timeout) {
  653                         errp->re_errno = EWOULDBLOCK;
  654                         errp->re_status = stat = RPC_TIMEDOUT;
  655                         goto out;
  656                 }
  657 
  658                 /* Retransmit if necessary. */          
  659                 if (time_waited >= next_sendtime) {
  660                         cu->cu_cwnd /= 2;
  661                         if (cu->cu_cwnd < CWNDSCALE)
  662                                 cu->cu_cwnd = CWNDSCALE;
  663                         if (ext && ext->rc_feedback) {
  664                                 mtx_unlock(&cs->cs_lock);
  665                                 if (retrans == 0)
  666                                         ext->rc_feedback(FEEDBACK_REXMIT1,
  667                                             proc, ext->rc_feedback_arg);
  668                                 else
  669                                         ext->rc_feedback(FEEDBACK_REXMIT2,
  670                                             proc, ext->rc_feedback_arg);
  671                                 mtx_lock(&cs->cs_lock);
  672                         }
  673                         if (cu->cu_closing || cu->cu_closed) {
  674                                 errp->re_errno = ESHUTDOWN;
  675                                 errp->re_status = stat = RPC_CANTRECV;
  676                                 goto out;
  677                         }
  678                         retrans++;
  679                         /* update retransmit_time */
  680                         if (retransmit_time < RPC_MAX_BACKOFF * hz)
  681                                 retransmit_time = 2 * retransmit_time;
  682                         next_sendtime += retransmit_time;
  683                         goto send_again;
  684                 }
  685                 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
  686         }
  687 
  688 got_reply:
  689         /*
  690          * Now decode and validate the response. We need to drop the
  691          * lock since xdr_replymsg may end up sleeping in malloc.
  692          */
  693         mtx_unlock(&cs->cs_lock);
  694 
  695         if (ext && ext->rc_feedback)
  696                 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
  697 
  698         xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
  699         ok = xdr_replymsg(&xdrs, &reply_msg);
  700         cr->cr_mrep = NULL;
  701 
  702         if (ok) {
  703                 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
  704                     (reply_msg.acpted_rply.ar_stat == SUCCESS))
  705                         errp->re_status = stat = RPC_SUCCESS;
  706                 else
  707                         stat = _seterr_reply(&reply_msg, &(cu->cu_error));
  708 
  709                 if (errp->re_status == RPC_SUCCESS) {
  710                         results = xdrmbuf_getall(&xdrs);
  711                         if (! AUTH_VALIDATE(auth, xid,
  712                                 &reply_msg.acpted_rply.ar_verf,
  713                                 &results)) {
  714                                 errp->re_status = stat = RPC_AUTHERROR;
  715                                 errp->re_why = AUTH_INVALIDRESP;
  716                                 if (retrans &&
  717                                     auth->ah_cred.oa_flavor == RPCSEC_GSS) {
  718                                         /*
  719                                          * If we retransmitted, its
  720                                          * possible that we will
  721                                          * receive a reply for one of
  722                                          * the earlier transmissions
  723                                          * (which will use an older
  724                                          * RPCSEC_GSS sequence
  725                                          * number). In this case, just
  726                                          * go back and listen for a
  727                                          * new reply. We could keep a
  728                                          * record of all the seq
  729                                          * numbers we have transmitted
  730                                          * so far so that we could
  731                                          * accept a reply for any of
  732                                          * them here.
  733                                          */
  734                                         XDR_DESTROY(&xdrs);
  735                                         mtx_lock(&cs->cs_lock);
  736                                         TAILQ_INSERT_TAIL(&cs->cs_pending,
  737                                             cr, cr_link);
  738                                         cr->cr_mrep = NULL;
  739                                         goto get_reply;
  740                                 }
  741                         } else {
  742                                 *resultsp = results;
  743                         }
  744                 }               /* end successful completion */
  745                 /*
  746                  * If unsuccesful AND error is an authentication error
  747                  * then refresh credentials and try again, else break
  748                  */
  749                 else if (stat == RPC_AUTHERROR)
  750                         /* maybe our credentials need to be refreshed ... */
  751                         if (nrefreshes > 0 &&
  752                             AUTH_REFRESH(auth, &reply_msg)) {
  753                                 nrefreshes--;
  754                                 XDR_DESTROY(&xdrs);
  755                                 mtx_lock(&cs->cs_lock);
  756                                 goto call_again;
  757                         }
  758                 /* end of unsuccessful completion */
  759         }       /* end of valid reply message */
  760         else {
  761                 errp->re_status = stat = RPC_CANTDECODERES;
  762 
  763         }
  764         XDR_DESTROY(&xdrs);
  765         mtx_lock(&cs->cs_lock);
  766 out:
  767         mtx_assert(&cs->cs_lock, MA_OWNED);
  768 
  769         if (mreq)
  770                 m_freem(mreq);
  771         if (cr->cr_mrep)
  772                 m_freem(cr->cr_mrep);
  773 
  774         cu->cu_threads--;
  775         if (cu->cu_closing)
  776                 wakeup(cu);
  777                 
  778         mtx_unlock(&cs->cs_lock);
  779 
  780         if (auth && stat != RPC_SUCCESS)
  781                 AUTH_VALIDATE(auth, xid, NULL, NULL);
  782 
  783         free(cr, M_RPC);
  784 
  785         return (stat);
  786 }
  787 
  788 static void
  789 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
  790 {
  791         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  792 
  793         *errp = cu->cu_error;
  794 }
  795 
  796 static bool_t
  797 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
  798 {
  799         XDR xdrs;
  800         bool_t dummy;
  801 
  802         xdrs.x_op = XDR_FREE;
  803         dummy = (*xdr_res)(&xdrs, res_ptr);
  804 
  805         return (dummy);
  806 }
  807 
  808 /*ARGSUSED*/
  809 static void
  810 clnt_dg_abort(CLIENT *h)
  811 {
  812 }
  813 
  814 static bool_t
  815 clnt_dg_control(CLIENT *cl, u_int request, void *info)
  816 {
  817         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  818         struct cu_socket *cs;
  819         struct sockaddr *addr;
  820 
  821         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  822         mtx_lock(&cs->cs_lock);
  823 
  824         switch (request) {
  825         case CLSET_FD_CLOSE:
  826                 cu->cu_closeit = TRUE;
  827                 mtx_unlock(&cs->cs_lock);
  828                 return (TRUE);
  829         case CLSET_FD_NCLOSE:
  830                 cu->cu_closeit = FALSE;
  831                 mtx_unlock(&cs->cs_lock);
  832                 return (TRUE);
  833         }
  834 
  835         /* for other requests which use info */
  836         if (info == NULL) {
  837                 mtx_unlock(&cs->cs_lock);
  838                 return (FALSE);
  839         }
  840         switch (request) {
  841         case CLSET_TIMEOUT:
  842                 if (time_not_ok((struct timeval *)info)) {
  843                         mtx_unlock(&cs->cs_lock);
  844                         return (FALSE);
  845                 }
  846                 cu->cu_total = *(struct timeval *)info;
  847                 break;
  848         case CLGET_TIMEOUT:
  849                 *(struct timeval *)info = cu->cu_total;
  850                 break;
  851         case CLSET_RETRY_TIMEOUT:
  852                 if (time_not_ok((struct timeval *)info)) {
  853                         mtx_unlock(&cs->cs_lock);
  854                         return (FALSE);
  855                 }
  856                 cu->cu_wait = *(struct timeval *)info;
  857                 break;
  858         case CLGET_RETRY_TIMEOUT:
  859                 *(struct timeval *)info = cu->cu_wait;
  860                 break;
  861         case CLGET_SVC_ADDR:
  862                 /*
  863                  * Slightly different semantics to userland - we use
  864                  * sockaddr instead of netbuf.
  865                  */
  866                 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
  867                 break;
  868         case CLSET_SVC_ADDR:            /* set to new address */
  869                 addr = (struct sockaddr *)info;
  870                 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
  871                 break;
  872         case CLGET_XID:
  873                 *(uint32_t *)info = cu->cu_xid;
  874                 break;
  875 
  876         case CLSET_XID:
  877                 /* This will set the xid of the NEXT call */
  878                 /* decrement by 1 as clnt_dg_call() increments once */
  879                 cu->cu_xid = *(uint32_t *)info - 1;
  880                 break;
  881 
  882         case CLGET_VERS:
  883                 /*
  884                  * This RELIES on the information that, in the call body,
  885                  * the version number field is the fifth field from the
  886                  * begining of the RPC header. MUST be changed if the
  887                  * call_struct is changed
  888                  */
  889                 *(uint32_t *)info =
  890                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
  891                     4 * BYTES_PER_XDR_UNIT));
  892                 break;
  893 
  894         case CLSET_VERS:
  895                 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
  896                         = htonl(*(uint32_t *)info);
  897                 break;
  898 
  899         case CLGET_PROG:
  900                 /*
  901                  * This RELIES on the information that, in the call body,
  902                  * the program number field is the fourth field from the
  903                  * begining of the RPC header. MUST be changed if the
  904                  * call_struct is changed
  905                  */
  906                 *(uint32_t *)info =
  907                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
  908                     3 * BYTES_PER_XDR_UNIT));
  909                 break;
  910 
  911         case CLSET_PROG:
  912                 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
  913                         = htonl(*(uint32_t *)info);
  914                 break;
  915         case CLSET_ASYNC:
  916                 cu->cu_async = *(int *)info;
  917                 break;
  918         case CLSET_CONNECT:
  919                 cu->cu_connect = *(int *)info;
  920                 break;
  921         case CLSET_WAITCHAN:
  922                 cu->cu_waitchan = (const char *)info;
  923                 break;
  924         case CLGET_WAITCHAN:
  925                 *(const char **) info = cu->cu_waitchan;
  926                 break;
  927         case CLSET_INTERRUPTIBLE:
  928                 if (*(int *) info)
  929                         cu->cu_waitflag = PCATCH;
  930                 else
  931                         cu->cu_waitflag = 0;
  932                 break;
  933         case CLGET_INTERRUPTIBLE:
  934                 if (cu->cu_waitflag)
  935                         *(int *) info = TRUE;
  936                 else
  937                         *(int *) info = FALSE;
  938                 break;
  939         default:
  940                 mtx_unlock(&cs->cs_lock);
  941                 return (FALSE);
  942         }
  943         mtx_unlock(&cs->cs_lock);
  944         return (TRUE);
  945 }
  946 
  947 static void
  948 clnt_dg_close(CLIENT *cl)
  949 {
  950         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  951         struct cu_socket *cs;
  952         struct cu_request *cr;
  953 
  954         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  955         mtx_lock(&cs->cs_lock);
  956 
  957         if (cu->cu_closed) {
  958                 mtx_unlock(&cs->cs_lock);
  959                 return;
  960         }
  961 
  962         if (cu->cu_closing) {
  963                 while (cu->cu_closing)
  964                         msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
  965                 KASSERT(cu->cu_closed, ("client should be closed"));
  966                 mtx_unlock(&cs->cs_lock);
  967                 return;
  968         }
  969 
  970         /*
  971          * Abort any pending requests and wait until everyone
  972          * has finished with clnt_vc_call.
  973          */
  974         cu->cu_closing = TRUE;
  975         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
  976                 if (cr->cr_client == cl) {
  977                         cr->cr_xid = 0;
  978                         cr->cr_error = ESHUTDOWN;
  979                         wakeup(cr);
  980                 }
  981         }
  982 
  983         while (cu->cu_threads)
  984                 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
  985 
  986         cu->cu_closing = FALSE;
  987         cu->cu_closed = TRUE;
  988 
  989         mtx_unlock(&cs->cs_lock);
  990         wakeup(cu);
  991 }
  992 
  993 static void
  994 clnt_dg_destroy(CLIENT *cl)
  995 {
  996         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  997         struct cu_socket *cs;
  998         struct socket *so = NULL;
  999         bool_t lastsocketref;
 1000 
 1001         cs = cu->cu_socket->so_rcv.sb_upcallarg;
 1002         clnt_dg_close(cl);
 1003 
 1004         SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
 1005         mtx_lock(&cs->cs_lock);
 1006 
 1007         cs->cs_refs--;
 1008         if (cs->cs_refs == 0) {
 1009                 mtx_unlock(&cs->cs_lock);
 1010                 soupcall_clear(cu->cu_socket, SO_RCV);
 1011                 clnt_dg_upcallsdone(cu->cu_socket, cs);
 1012                 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
 1013                 mtx_destroy(&cs->cs_lock);
 1014                 mem_free(cs, sizeof(*cs));
 1015                 lastsocketref = TRUE;
 1016         } else {
 1017                 mtx_unlock(&cs->cs_lock);
 1018                 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
 1019                 lastsocketref = FALSE;
 1020         }
 1021 
 1022         if (cu->cu_closeit && lastsocketref) {
 1023                 so = cu->cu_socket;
 1024                 cu->cu_socket = NULL;
 1025         }
 1026 
 1027         if (so)
 1028                 soclose(so);
 1029 
 1030         if (cl->cl_netid && cl->cl_netid[0])
 1031                 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
 1032         if (cl->cl_tp && cl->cl_tp[0])
 1033                 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
 1034         mem_free(cu, sizeof (*cu));
 1035         mem_free(cl, sizeof (CLIENT));
 1036 }
 1037 
 1038 /*
 1039  * Make sure that the time is not garbage.  -1 value is allowed.
 1040  */
 1041 static bool_t
 1042 time_not_ok(struct timeval *t)
 1043 {
 1044         return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
 1045                 t->tv_usec < -1 || t->tv_usec > 1000000);
 1046 }
 1047 
 1048 int
 1049 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
 1050 {
 1051         struct cu_socket *cs = (struct cu_socket *) arg;
 1052         struct uio uio;
 1053         struct mbuf *m;
 1054         struct mbuf *control;
 1055         struct cu_request *cr;
 1056         int error, rcvflag, foundreq;
 1057         uint32_t xid;
 1058 
 1059         cs->cs_upcallrefs++;
 1060         uio.uio_resid = 1000000000;
 1061         uio.uio_td = curthread;
 1062         do {
 1063                 SOCKBUF_UNLOCK(&so->so_rcv);
 1064                 m = NULL;
 1065                 control = NULL;
 1066                 rcvflag = MSG_DONTWAIT;
 1067                 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
 1068                 if (control)
 1069                         m_freem(control);
 1070                 SOCKBUF_LOCK(&so->so_rcv);
 1071 
 1072                 if (error == EWOULDBLOCK)
 1073                         break;
 1074 
 1075                 /*
 1076                  * If there was an error, wake up all pending
 1077                  * requests.
 1078                  */
 1079                 if (error) {
 1080                         mtx_lock(&cs->cs_lock);
 1081                         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
 1082                                 cr->cr_xid = 0;
 1083                                 cr->cr_error = error;
 1084                                 wakeup(cr);
 1085                         }
 1086                         mtx_unlock(&cs->cs_lock);
 1087                         break;
 1088                 }
 1089 
 1090                 /*
 1091                  * The XID is in the first uint32_t of the reply.
 1092                  */
 1093                 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid)) {
 1094                         /*
 1095                          * Should never happen.
 1096                          */
 1097                         m_freem(m);
 1098                         continue;
 1099                 }
 1100 
 1101                 m_copydata(m, 0, sizeof(xid), (char *)&xid);
 1102                 xid = ntohl(xid);
 1103 
 1104                 /*
 1105                  * Attempt to match this reply with a pending request.
 1106                  */
 1107                 mtx_lock(&cs->cs_lock);
 1108                 foundreq = 0;
 1109                 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
 1110                         if (cr->cr_xid == xid) {
 1111                                 /*
 1112                                  * This one matches. We leave the
 1113                                  * reply mbuf in cr->cr_mrep. Set the
 1114                                  * XID to zero so that we will ignore
 1115                                  * any duplicated replies that arrive
 1116                                  * before clnt_dg_call removes it from
 1117                                  * the queue.
 1118                                  */
 1119                                 cr->cr_xid = 0;
 1120                                 cr->cr_mrep = m;
 1121                                 cr->cr_error = 0;
 1122                                 foundreq = 1;
 1123                                 wakeup(cr);
 1124                                 break;
 1125                         }
 1126                 }
 1127                 mtx_unlock(&cs->cs_lock);
 1128 
 1129                 /*
 1130                  * If we didn't find the matching request, just drop
 1131                  * it - its probably a repeated reply.
 1132                  */
 1133                 if (!foundreq)
 1134                         m_freem(m);
 1135         } while (m);
 1136         cs->cs_upcallrefs--;
 1137         if (cs->cs_upcallrefs < 0)
 1138                 panic("rpcdg upcall refcnt");
 1139         if (cs->cs_upcallrefs == 0)
 1140                 wakeup(&cs->cs_upcallrefs);
 1141         return (SU_OK);
 1142 }
 1143 
 1144 /*
 1145  * Wait for all upcalls in progress to complete.
 1146  */
 1147 static void
 1148 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
 1149 {
 1150 
 1151         SOCKBUF_LOCK_ASSERT(&so->so_rcv);
 1152 
 1153         while (cs->cs_upcallrefs > 0)
 1154                 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
 1155                     "rpcdgup", 0);
 1156 }

Cache object: 01ba6bbd4447398514594563f58ba048


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.