The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/rpc/clnt_dg.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*      $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
    2 
    3 /*
    4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
    5  * unrestricted use provided that this legend is included on all tape
    6  * media and as a part of the software program in whole or part.  Users
    7  * may copy or modify Sun RPC without charge, but are not authorized
    8  * to license or distribute it to anyone else except as part of a product or
    9  * program developed by the user.
   10  * 
   11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
   12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
   13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
   14  * 
   15  * Sun RPC is provided with no support and without any obligation on the
   16  * part of Sun Microsystems, Inc. to assist in its use, correction,
   17  * modification or enhancement.
   18  * 
   19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
   20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
   21  * OR ANY PART THEREOF.
   22  * 
   23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
   24  * or profits or other special, indirect and consequential damages, even if
   25  * Sun has been advised of the possibility of such damages.
   26  * 
   27  * Sun Microsystems, Inc.
   28  * 2550 Garcia Avenue
   29  * Mountain View, California  94043
   30  */
   31 /*
   32  * Copyright (c) 1986-1991 by Sun Microsystems Inc. 
   33  */
   34 
   35 #if defined(LIBC_SCCS) && !defined(lint)
   36 #ident  "@(#)clnt_dg.c  1.23    94/04/22 SMI"
   37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
   38 #endif
   39 #include <sys/cdefs.h>
   40 __FBSDID("$FreeBSD: releng/8.2/sys/rpc/clnt_dg.c 217617 2011-01-20 00:54:12Z rmacklem $");
   41 
   42 /*
   43  * Implements a connectionless client side RPC.
   44  */
   45 
   46 #include <sys/param.h>
   47 #include <sys/systm.h>
   48 #include <sys/kernel.h>
   49 #include <sys/lock.h>
   50 #include <sys/malloc.h>
   51 #include <sys/mbuf.h>
   52 #include <sys/mutex.h>
   53 #include <sys/pcpu.h>
   54 #include <sys/proc.h>
   55 #include <sys/socket.h>
   56 #include <sys/socketvar.h>
   57 #include <sys/time.h>
   58 #include <sys/uio.h>
   59 
   60 #include <net/vnet.h>
   61 
   62 #include <rpc/rpc.h>
   63 #include <rpc/rpc_com.h>
   64 
   65 
   66 #ifdef _FREEFALL_CONFIG
   67 /*
   68  * Disable RPC exponential back-off for FreeBSD.org systems.
   69  */
   70 #define RPC_MAX_BACKOFF         1 /* second */
   71 #else
   72 #define RPC_MAX_BACKOFF         30 /* seconds */
   73 #endif
   74 
   75 static bool_t time_not_ok(struct timeval *);
   76 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
   77     rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
   78 static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
   79 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
   80 static void clnt_dg_abort(CLIENT *);
   81 static bool_t clnt_dg_control(CLIENT *, u_int, void *);
   82 static void clnt_dg_close(CLIENT *);
   83 static void clnt_dg_destroy(CLIENT *);
   84 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
   85 
   86 static struct clnt_ops clnt_dg_ops = {
   87         .cl_call =      clnt_dg_call,
   88         .cl_abort =     clnt_dg_abort,
   89         .cl_geterr =    clnt_dg_geterr,
   90         .cl_freeres =   clnt_dg_freeres,
   91         .cl_close =     clnt_dg_close,
   92         .cl_destroy =   clnt_dg_destroy,
   93         .cl_control =   clnt_dg_control
   94 };
   95 
   96 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
   97 
   98 /*
   99  * A pending RPC request which awaits a reply. Requests which have
  100  * received their reply will have cr_xid set to zero and cr_mrep to
  101  * the mbuf chain of the reply.
  102  */
  103 struct cu_request {
  104         TAILQ_ENTRY(cu_request) cr_link;
  105         CLIENT                  *cr_client;     /* owner */
  106         uint32_t                cr_xid;         /* XID of request */
  107         struct mbuf             *cr_mrep;       /* reply received by upcall */
  108         int                     cr_error;       /* any error from upcall */
  109         char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
  110 };
  111 
  112 TAILQ_HEAD(cu_request_list, cu_request);
  113 
  114 #define MCALL_MSG_SIZE 24
  115 
  116 /*
  117  * This structure is pointed to by the socket buffer's sb_upcallarg
  118  * member. It is separate from the client private data to facilitate
  119  * multiple clients sharing the same socket. The cs_lock mutex is used
  120  * to protect all fields of this structure, the socket's receive
  121  * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
  122  * structures is installed on the socket.
  123  */
  124 struct cu_socket {
  125         struct mtx              cs_lock;
  126         int                     cs_refs;        /* Count of clients */
  127         struct cu_request_list  cs_pending;     /* Requests awaiting replies */
  128         int                     cs_upcallrefs;  /* Refcnt of upcalls in prog.*/
  129 };
  130 
  131 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);
  132 
  133 /*
  134  * Private data kept per client handle
  135  */
  136 struct cu_data {
  137         int                     cu_threads;     /* # threads in clnt_vc_call */
  138         bool_t                  cu_closing;     /* TRUE if we are closing */
  139         bool_t                  cu_closed;      /* TRUE if we are closed */
  140         struct socket           *cu_socket;     /* connection socket */
  141         bool_t                  cu_closeit;     /* opened by library */
  142         struct sockaddr_storage cu_raddr;       /* remote address */
  143         int                     cu_rlen;
  144         struct timeval          cu_wait;        /* retransmit interval */
  145         struct timeval          cu_total;       /* total time for the call */
  146         struct rpc_err          cu_error;
  147         uint32_t                cu_xid;
  148         char                    cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
  149         size_t                  cu_mcalllen;
  150         size_t                  cu_sendsz;      /* send size */
  151         size_t                  cu_recvsz;      /* recv size */
  152         int                     cu_async;
  153         int                     cu_connect;     /* Use connect(). */
  154         int                     cu_connected;   /* Have done connect(). */
  155         const char              *cu_waitchan;
  156         int                     cu_waitflag;
  157         int                     cu_cwnd;        /* congestion window */
  158         int                     cu_sent;        /* number of in-flight RPCs */
  159         bool_t                  cu_cwnd_wait;
  160 };
  161 
  162 #define CWNDSCALE       256
  163 #define MAXCWND         (32 * CWNDSCALE)
  164 
  165 /*
  166  * Connection less client creation returns with client handle parameters.
  167  * Default options are set, which the user can change using clnt_control().
  168  * fd should be open and bound.
  169  * NB: The rpch->cl_auth is initialized to null authentication.
  170  *      Caller may wish to set this something more useful.
  171  *
  172  * sendsz and recvsz are the maximum allowable packet sizes that can be
  173  * sent and received. Normally they are the same, but they can be
  174  * changed to improve the program efficiency and buffer allocation.
  175  * If they are 0, use the transport default.
  176  *
  177  * If svcaddr is NULL, returns NULL.
  178  */
  179 CLIENT *
  180 clnt_dg_create(
  181         struct socket *so,
  182         struct sockaddr *svcaddr,       /* servers address */
  183         rpcprog_t program,              /* program number */
  184         rpcvers_t version,              /* version number */
  185         size_t sendsz,                  /* buffer recv size */
  186         size_t recvsz)                  /* buffer send size */
  187 {
  188         CLIENT *cl = NULL;              /* client handle */
  189         struct cu_data *cu = NULL;      /* private data */
  190         struct cu_socket *cs = NULL;
  191         struct sockbuf *sb;
  192         struct timeval now;
  193         struct rpc_msg call_msg;
  194         struct __rpc_sockinfo si;
  195         XDR xdrs;
  196         int error;
  197 
  198         if (svcaddr == NULL) {
  199                 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
  200                 return (NULL);
  201         }
  202 
  203         CURVNET_SET(so->so_vnet);
  204         if (!__rpc_socket2sockinfo(so, &si)) {
  205                 rpc_createerr.cf_stat = RPC_TLIERROR;
  206                 rpc_createerr.cf_error.re_errno = 0;
  207                 CURVNET_RESTORE();
  208                 return (NULL);
  209         }
  210         CURVNET_RESTORE();
  211 
  212         /*
  213          * Find the receive and the send size
  214          */
  215         sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
  216         recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
  217         if ((sendsz == 0) || (recvsz == 0)) {
  218                 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
  219                 rpc_createerr.cf_error.re_errno = 0;
  220                 return (NULL);
  221         }
  222 
  223         cl = mem_alloc(sizeof (CLIENT));
  224 
  225         /*
  226          * Should be multiple of 4 for XDR.
  227          */
  228         sendsz = ((sendsz + 3) / 4) * 4;
  229         recvsz = ((recvsz + 3) / 4) * 4;
  230         cu = mem_alloc(sizeof (*cu));
  231         cu->cu_threads = 0;
  232         cu->cu_closing = FALSE;
  233         cu->cu_closed = FALSE;
  234         (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
  235         cu->cu_rlen = svcaddr->sa_len;
  236         /* Other values can also be set through clnt_control() */
  237         cu->cu_wait.tv_sec = 3; /* heuristically chosen */
  238         cu->cu_wait.tv_usec = 0;
  239         cu->cu_total.tv_sec = -1;
  240         cu->cu_total.tv_usec = -1;
  241         cu->cu_sendsz = sendsz;
  242         cu->cu_recvsz = recvsz;
  243         cu->cu_async = FALSE;
  244         cu->cu_connect = FALSE;
  245         cu->cu_connected = FALSE;
  246         cu->cu_waitchan = "rpcrecv";
  247         cu->cu_waitflag = 0;
  248         cu->cu_cwnd = MAXCWND / 2;
  249         cu->cu_sent = 0;
  250         cu->cu_cwnd_wait = FALSE;
  251         (void) getmicrotime(&now);
  252         cu->cu_xid = __RPC_GETXID(&now);
  253         call_msg.rm_xid = cu->cu_xid;
  254         call_msg.rm_call.cb_prog = program;
  255         call_msg.rm_call.cb_vers = version;
  256         xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
  257         if (! xdr_callhdr(&xdrs, &call_msg)) {
  258                 rpc_createerr.cf_stat = RPC_CANTENCODEARGS;  /* XXX */
  259                 rpc_createerr.cf_error.re_errno = 0;
  260                 goto err2;
  261         }
  262         cu->cu_mcalllen = XDR_GETPOS(&xdrs);
  263 
  264         /*
  265          * By default, closeit is always FALSE. It is users responsibility
  266          * to do a close on it, else the user may use clnt_control
  267          * to let clnt_destroy do it for him/her.
  268          */
  269         cu->cu_closeit = FALSE;
  270         cu->cu_socket = so;
  271         error = soreserve(so, (u_long)sendsz, (u_long)recvsz);
  272         if (error != 0) {
  273                 rpc_createerr.cf_stat = RPC_FAILED;
  274                 rpc_createerr.cf_error.re_errno = error;
  275                 goto err2;
  276         }
  277 
  278         sb = &so->so_rcv;
  279         SOCKBUF_LOCK(&so->so_rcv);
  280 recheck_socket:
  281         if (sb->sb_upcall) {
  282                 if (sb->sb_upcall != clnt_dg_soupcall) {
  283                         SOCKBUF_UNLOCK(&so->so_rcv);
  284                         printf("clnt_dg_create(): socket already has an incompatible upcall\n");
  285                         goto err2;
  286                 }
  287                 cs = (struct cu_socket *) sb->sb_upcallarg;
  288                 mtx_lock(&cs->cs_lock);
  289                 cs->cs_refs++;
  290                 mtx_unlock(&cs->cs_lock);
  291         } else {
  292                 /*
  293                  * We are the first on this socket - allocate the
  294                  * structure and install it in the socket.
  295                  */
  296                 SOCKBUF_UNLOCK(&so->so_rcv);
  297                 cs = mem_alloc(sizeof(*cs));
  298                 SOCKBUF_LOCK(&so->so_rcv);
  299                 if (sb->sb_upcall) {
  300                         /*
  301                          * We have lost a race with some other client.
  302                          */
  303                         mem_free(cs, sizeof(*cs));
  304                         goto recheck_socket;
  305                 }
  306                 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
  307                 cs->cs_refs = 1;
  308                 cs->cs_upcallrefs = 0;
  309                 TAILQ_INIT(&cs->cs_pending);
  310                 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
  311         }
  312         SOCKBUF_UNLOCK(&so->so_rcv);
  313 
  314         cl->cl_refs = 1;
  315         cl->cl_ops = &clnt_dg_ops;
  316         cl->cl_private = (caddr_t)(void *)cu;
  317         cl->cl_auth = authnone_create();
  318         cl->cl_tp = NULL;
  319         cl->cl_netid = NULL;
  320         return (cl);
  321 err2:
  322         if (cl) {
  323                 mem_free(cl, sizeof (CLIENT));
  324                 if (cu)
  325                         mem_free(cu, sizeof (*cu));
  326         }
  327         return (NULL);
  328 }
  329 
  330 static enum clnt_stat
  331 clnt_dg_call(
  332         CLIENT          *cl,            /* client handle */
  333         struct rpc_callextra *ext,      /* call metadata */
  334         rpcproc_t       proc,           /* procedure number */
  335         struct mbuf     *args,          /* pointer to args */
  336         struct mbuf     **resultsp,     /* pointer to results */
  337         struct timeval  utimeout)       /* seconds to wait before giving up */
  338 {
  339         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  340         struct cu_socket *cs;
  341         struct rpc_timers *rt;
  342         AUTH *auth;
  343         struct rpc_err *errp;
  344         enum clnt_stat stat;
  345         XDR xdrs;
  346         struct rpc_msg reply_msg;
  347         bool_t ok;
  348         int retrans;                    /* number of re-transmits so far */
  349         int nrefreshes = 2;             /* number of times to refresh cred */
  350         struct timeval *tvp;
  351         int timeout;
  352         int retransmit_time;
  353         int next_sendtime, starttime, rtt, time_waited, tv = 0;
  354         struct sockaddr *sa;
  355         socklen_t salen;
  356         uint32_t xid = 0;
  357         struct mbuf *mreq = NULL, *results;
  358         struct cu_request *cr;
  359         int error;
  360 
  361         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  362         cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
  363 
  364         mtx_lock(&cs->cs_lock);
  365 
  366         if (cu->cu_closing || cu->cu_closed) {
  367                 mtx_unlock(&cs->cs_lock);
  368                 free(cr, M_RPC);
  369                 return (RPC_CANTSEND);
  370         }
  371         cu->cu_threads++;
  372 
  373         if (ext) {
  374                 auth = ext->rc_auth;
  375                 errp = &ext->rc_err;
  376         } else {
  377                 auth = cl->cl_auth;
  378                 errp = &cu->cu_error;
  379         }
  380 
  381         cr->cr_client = cl;
  382         cr->cr_mrep = NULL;
  383         cr->cr_error = 0;
  384 
  385         if (cu->cu_total.tv_usec == -1) {
  386                 tvp = &utimeout; /* use supplied timeout */
  387         } else {
  388                 tvp = &cu->cu_total; /* use default timeout */
  389         }
  390         if (tvp->tv_sec || tvp->tv_usec)
  391                 timeout = tvtohz(tvp);
  392         else
  393                 timeout = 0;
  394 
  395         if (cu->cu_connect && !cu->cu_connected) {
  396                 mtx_unlock(&cs->cs_lock);
  397                 error = soconnect(cu->cu_socket,
  398                     (struct sockaddr *)&cu->cu_raddr, curthread);
  399                 mtx_lock(&cs->cs_lock);
  400                 if (error) {
  401                         errp->re_errno = error;
  402                         errp->re_status = stat = RPC_CANTSEND;
  403                         goto out;
  404                 }
  405                 cu->cu_connected = 1;
  406         }
  407         if (cu->cu_connected) {
  408                 sa = NULL;
  409                 salen = 0;
  410         } else {
  411                 sa = (struct sockaddr *)&cu->cu_raddr;
  412                 salen = cu->cu_rlen;
  413         }
  414         time_waited = 0;
  415         retrans = 0;
  416         if (ext && ext->rc_timers) {
  417                 rt = ext->rc_timers;
  418                 if (!rt->rt_rtxcur)
  419                         rt->rt_rtxcur = tvtohz(&cu->cu_wait);
  420                 retransmit_time = next_sendtime = rt->rt_rtxcur;
  421         } else {
  422                 rt = NULL;
  423                 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
  424         }
  425 
  426         starttime = ticks;
  427 
  428 call_again:
  429         mtx_assert(&cs->cs_lock, MA_OWNED);
  430 
  431         cu->cu_xid++;
  432         xid = cu->cu_xid;
  433 
  434 send_again:
  435         mtx_unlock(&cs->cs_lock);
  436 
  437         MGETHDR(mreq, M_WAIT, MT_DATA);
  438         KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
  439         bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
  440         mreq->m_len = cu->cu_mcalllen;
  441 
  442         /*
  443          * The XID is the first thing in the request.
  444          */
  445         *mtod(mreq, uint32_t *) = htonl(xid);
  446 
  447         xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
  448 
  449         if (cu->cu_async == TRUE && args == NULL)
  450                 goto get_reply;
  451 
  452         if ((! XDR_PUTINT32(&xdrs, &proc)) ||
  453             (! AUTH_MARSHALL(auth, xid, &xdrs,
  454                 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
  455                 errp->re_status = stat = RPC_CANTENCODEARGS;
  456                 mtx_lock(&cs->cs_lock);
  457                 goto out;
  458         }
  459         mreq->m_pkthdr.len = m_length(mreq, NULL);
  460 
  461         cr->cr_xid = xid;
  462         mtx_lock(&cs->cs_lock);
  463 
  464         /*
  465          * Try to get a place in the congestion window.
  466          */
  467         while (cu->cu_sent >= cu->cu_cwnd) {
  468                 cu->cu_cwnd_wait = TRUE;
  469                 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
  470                     cu->cu_waitflag, "rpccwnd", 0);
  471                 if (error) {
  472                         errp->re_errno = error;
  473                         errp->re_status = stat = RPC_CANTSEND;
  474                         goto out;
  475                 }
  476         }
  477         cu->cu_sent += CWNDSCALE;
  478 
  479         TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
  480         mtx_unlock(&cs->cs_lock);
  481 
  482         /*
  483          * sosend consumes mreq.
  484          */
  485         error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
  486         mreq = NULL;
  487 
  488         /*
  489          * sub-optimal code appears here because we have
  490          * some clock time to spare while the packets are in flight.
  491          * (We assume that this is actually only executed once.)
  492          */
  493         reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
  494         reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
  495         reply_msg.acpted_rply.ar_verf.oa_length = 0;
  496         reply_msg.acpted_rply.ar_results.where = NULL;
  497         reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
  498 
  499         mtx_lock(&cs->cs_lock);
  500         if (error) {
  501                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  502                 errp->re_errno = error;
  503                 errp->re_status = stat = RPC_CANTSEND;
  504                 cu->cu_sent -= CWNDSCALE;
  505                 if (cu->cu_cwnd_wait) {
  506                         cu->cu_cwnd_wait = FALSE;
  507                         wakeup(&cu->cu_cwnd_wait);
  508                 }
  509                 goto out;
  510         }
  511 
  512         /*
  513          * Check to see if we got an upcall while waiting for the
  514          * lock.
  515          */
  516         if (cr->cr_error) {
  517                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  518                 errp->re_errno = cr->cr_error;
  519                 errp->re_status = stat = RPC_CANTRECV;
  520                 cu->cu_sent -= CWNDSCALE;
  521                 if (cu->cu_cwnd_wait) {
  522                         cu->cu_cwnd_wait = FALSE;
  523                         wakeup(&cu->cu_cwnd_wait);
  524                 }
  525                 goto out;
  526         }
  527         if (cr->cr_mrep) {
  528                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  529                 cu->cu_sent -= CWNDSCALE;
  530                 if (cu->cu_cwnd_wait) {
  531                         cu->cu_cwnd_wait = FALSE;
  532                         wakeup(&cu->cu_cwnd_wait);
  533                 }
  534                 goto got_reply;
  535         }
  536 
  537         /*
  538          * Hack to provide rpc-based message passing
  539          */
  540         if (timeout == 0) {
  541                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  542                 errp->re_status = stat = RPC_TIMEDOUT;
  543                 cu->cu_sent -= CWNDSCALE;
  544                 if (cu->cu_cwnd_wait) {
  545                         cu->cu_cwnd_wait = FALSE;
  546                         wakeup(&cu->cu_cwnd_wait);
  547                 }
  548                 goto out;
  549         }
  550 
  551 get_reply:
  552         for (;;) {
  553                 /* Decide how long to wait. */
  554                 if (next_sendtime < timeout)
  555                         tv = next_sendtime;
  556                 else
  557                         tv = timeout;
  558                 tv -= time_waited;
  559 
  560                 if (tv > 0) {
  561                         if (cu->cu_closing || cu->cu_closed) {
  562                                 error = 0;
  563                                 cr->cr_error = ESHUTDOWN;
  564                         } else {
  565                                 error = msleep(cr, &cs->cs_lock,
  566                                     cu->cu_waitflag, cu->cu_waitchan, tv);
  567                         }
  568                 } else {
  569                         error = EWOULDBLOCK;
  570                 }
  571 
  572                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  573                 cu->cu_sent -= CWNDSCALE;
  574                 if (cu->cu_cwnd_wait) {
  575                         cu->cu_cwnd_wait = FALSE;
  576                         wakeup(&cu->cu_cwnd_wait);
  577                 }
  578 
  579                 if (!error) {
  580                         /*
  581                          * We were woken up by the upcall.  If the
  582                          * upcall had a receive error, report that,
  583                          * otherwise we have a reply.
  584                          */
  585                         if (cr->cr_error) {
  586                                 errp->re_errno = cr->cr_error;
  587                                 errp->re_status = stat = RPC_CANTRECV;
  588                                 goto out;
  589                         }
  590 
  591                         cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
  592                             + cu->cu_cwnd / 2) / cu->cu_cwnd;
  593                         if (cu->cu_cwnd > MAXCWND)
  594                                 cu->cu_cwnd = MAXCWND;
  595 
  596                         if (rt) {
  597                                 /*
  598                                  * Add one to the time since a tick
  599                                  * count of N means that the actual
  600                                  * time taken was somewhere between N
  601                                  * and N+1.
  602                                  */
  603                                 rtt = ticks - starttime + 1;
  604 
  605                                 /*
  606                                  * Update our estimate of the round
  607                                  * trip time using roughly the
  608                                  * algorithm described in RFC
  609                                  * 2988. Given an RTT sample R:
  610                                  *
  611                                  * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
  612                                  * SRTT = (1-alpha) * SRTT + alpha * R
  613                                  *
  614                                  * where alpha = 0.125 and beta = 0.25.
  615                                  *
  616                                  * The initial retransmit timeout is
  617                                  * SRTT + 4*RTTVAR and doubles on each
  618                                  * retransmision.
  619                                  */
  620                                 if (rt->rt_srtt == 0) {
  621                                         rt->rt_srtt = rtt;
  622                                         rt->rt_deviate = rtt / 2;
  623                                 } else {
  624                                         int32_t error = rtt - rt->rt_srtt;
  625                                         rt->rt_srtt += error / 8;
  626                                         error = abs(error) - rt->rt_deviate;
  627                                         rt->rt_deviate += error / 4;
  628                                 }
  629                                 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
  630                         }
  631 
  632                         break;
  633                 }
  634 
  635                 /*
  636                  * The sleep returned an error so our request is still
  637                  * on the list. If we got EWOULDBLOCK, we may want to
  638                  * re-send the request.
  639                  */
  640                 if (error != EWOULDBLOCK) {
  641                         errp->re_errno = error;
  642                         if (error == EINTR)
  643                                 errp->re_status = stat = RPC_INTR;
  644                         else
  645                                 errp->re_status = stat = RPC_CANTRECV;
  646                         goto out;
  647                 }
  648 
  649                 time_waited = ticks - starttime;
  650 
  651                 /* Check for timeout. */
  652                 if (time_waited > timeout) {
  653                         errp->re_errno = EWOULDBLOCK;
  654                         errp->re_status = stat = RPC_TIMEDOUT;
  655                         goto out;
  656                 }
  657 
  658                 /* Retransmit if necessary. */          
  659                 if (time_waited >= next_sendtime) {
  660                         cu->cu_cwnd /= 2;
  661                         if (cu->cu_cwnd < CWNDSCALE)
  662                                 cu->cu_cwnd = CWNDSCALE;
  663                         if (ext && ext->rc_feedback) {
  664                                 mtx_unlock(&cs->cs_lock);
  665                                 if (retrans == 0)
  666                                         ext->rc_feedback(FEEDBACK_REXMIT1,
  667                                             proc, ext->rc_feedback_arg);
  668                                 else
  669                                         ext->rc_feedback(FEEDBACK_REXMIT2,
  670                                             proc, ext->rc_feedback_arg);
  671                                 mtx_lock(&cs->cs_lock);
  672                         }
  673                         if (cu->cu_closing || cu->cu_closed) {
  674                                 errp->re_errno = ESHUTDOWN;
  675                                 errp->re_status = stat = RPC_CANTRECV;
  676                                 goto out;
  677                         }
  678                         retrans++;
  679                         /* update retransmit_time */
  680                         if (retransmit_time < RPC_MAX_BACKOFF * hz)
  681                                 retransmit_time = 2 * retransmit_time;
  682                         next_sendtime += retransmit_time;
  683                         goto send_again;
  684                 }
  685                 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
  686         }
  687 
  688 got_reply:
  689         /*
  690          * Now decode and validate the response. We need to drop the
  691          * lock since xdr_replymsg may end up sleeping in malloc.
  692          */
  693         mtx_unlock(&cs->cs_lock);
  694 
  695         if (ext && ext->rc_feedback)
  696                 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
  697 
  698         xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
  699         ok = xdr_replymsg(&xdrs, &reply_msg);
  700         cr->cr_mrep = NULL;
  701 
  702         if (ok) {
  703                 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
  704                     (reply_msg.acpted_rply.ar_stat == SUCCESS))
  705                         errp->re_status = stat = RPC_SUCCESS;
  706                 else
  707                         stat = _seterr_reply(&reply_msg, &(cu->cu_error));
  708 
  709                 if (errp->re_status == RPC_SUCCESS) {
  710                         results = xdrmbuf_getall(&xdrs);
  711                         if (! AUTH_VALIDATE(auth, xid,
  712                                 &reply_msg.acpted_rply.ar_verf,
  713                                 &results)) {
  714                                 errp->re_status = stat = RPC_AUTHERROR;
  715                                 errp->re_why = AUTH_INVALIDRESP;
  716                                 if (retrans &&
  717                                     auth->ah_cred.oa_flavor == RPCSEC_GSS) {
  718                                         /*
  719                                          * If we retransmitted, its
  720                                          * possible that we will
  721                                          * receive a reply for one of
  722                                          * the earlier transmissions
  723                                          * (which will use an older
  724                                          * RPCSEC_GSS sequence
  725                                          * number). In this case, just
  726                                          * go back and listen for a
  727                                          * new reply. We could keep a
  728                                          * record of all the seq
  729                                          * numbers we have transmitted
  730                                          * so far so that we could
  731                                          * accept a reply for any of
  732                                          * them here.
  733                                          */
  734                                         XDR_DESTROY(&xdrs);
  735                                         mtx_lock(&cs->cs_lock);
  736                                         TAILQ_INSERT_TAIL(&cs->cs_pending,
  737                                             cr, cr_link);
  738                                         cr->cr_mrep = NULL;
  739                                         goto get_reply;
  740                                 }
  741                         } else {
  742                                 *resultsp = results;
  743                         }
  744                 }               /* end successful completion */
  745                 /*
  746                  * If unsuccesful AND error is an authentication error
  747                  * then refresh credentials and try again, else break
  748                  */
  749                 else if (stat == RPC_AUTHERROR)
  750                         /* maybe our credentials need to be refreshed ... */
  751                         if (nrefreshes > 0 &&
  752                             AUTH_REFRESH(auth, &reply_msg)) {
  753                                 nrefreshes--;
  754                                 XDR_DESTROY(&xdrs);
  755                                 mtx_lock(&cs->cs_lock);
  756                                 goto call_again;
  757                         }
  758                 /* end of unsuccessful completion */
  759         }       /* end of valid reply message */
  760         else {
  761                 errp->re_status = stat = RPC_CANTDECODERES;
  762 
  763         }
  764         XDR_DESTROY(&xdrs);
  765         mtx_lock(&cs->cs_lock);
  766 out:
  767         mtx_assert(&cs->cs_lock, MA_OWNED);
  768 
  769         if (mreq)
  770                 m_freem(mreq);
  771         if (cr->cr_mrep)
  772                 m_freem(cr->cr_mrep);
  773 
  774         cu->cu_threads--;
  775         if (cu->cu_closing)
  776                 wakeup(cu);
  777                 
  778         mtx_unlock(&cs->cs_lock);
  779 
  780         if (auth && stat != RPC_SUCCESS)
  781                 AUTH_VALIDATE(auth, xid, NULL, NULL);
  782 
  783         free(cr, M_RPC);
  784 
  785         return (stat);
  786 }
  787 
  788 static void
  789 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
  790 {
  791         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  792 
  793         *errp = cu->cu_error;
  794 }
  795 
  796 static bool_t
  797 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
  798 {
  799         XDR xdrs;
  800         bool_t dummy;
  801 
  802         xdrs.x_op = XDR_FREE;
  803         dummy = (*xdr_res)(&xdrs, res_ptr);
  804 
  805         return (dummy);
  806 }
  807 
  808 /*ARGSUSED*/
  809 static void
  810 clnt_dg_abort(CLIENT *h)
  811 {
  812 }
  813 
  814 static bool_t
  815 clnt_dg_control(CLIENT *cl, u_int request, void *info)
  816 {
  817         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  818         struct cu_socket *cs;
  819         struct sockaddr *addr;
  820 
  821         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  822         mtx_lock(&cs->cs_lock);
  823 
  824         switch (request) {
  825         case CLSET_FD_CLOSE:
  826                 cu->cu_closeit = TRUE;
  827                 mtx_unlock(&cs->cs_lock);
  828                 return (TRUE);
  829         case CLSET_FD_NCLOSE:
  830                 cu->cu_closeit = FALSE;
  831                 mtx_unlock(&cs->cs_lock);
  832                 return (TRUE);
  833         }
  834 
  835         /* for other requests which use info */
  836         if (info == NULL) {
  837                 mtx_unlock(&cs->cs_lock);
  838                 return (FALSE);
  839         }
  840         switch (request) {
  841         case CLSET_TIMEOUT:
  842                 if (time_not_ok((struct timeval *)info)) {
  843                         mtx_unlock(&cs->cs_lock);
  844                         return (FALSE);
  845                 }
  846                 cu->cu_total = *(struct timeval *)info;
  847                 break;
  848         case CLGET_TIMEOUT:
  849                 *(struct timeval *)info = cu->cu_total;
  850                 break;
  851         case CLSET_RETRY_TIMEOUT:
  852                 if (time_not_ok((struct timeval *)info)) {
  853                         mtx_unlock(&cs->cs_lock);
  854                         return (FALSE);
  855                 }
  856                 cu->cu_wait = *(struct timeval *)info;
  857                 break;
  858         case CLGET_RETRY_TIMEOUT:
  859                 *(struct timeval *)info = cu->cu_wait;
  860                 break;
  861         case CLGET_SVC_ADDR:
  862                 /*
  863                  * Slightly different semantics to userland - we use
  864                  * sockaddr instead of netbuf.
  865                  */
  866                 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
  867                 break;
  868         case CLSET_SVC_ADDR:            /* set to new address */
  869                 addr = (struct sockaddr *)info;
  870                 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
  871                 break;
  872         case CLGET_XID:
  873                 *(uint32_t *)info = cu->cu_xid;
  874                 break;
  875 
  876         case CLSET_XID:
  877                 /* This will set the xid of the NEXT call */
  878                 /* decrement by 1 as clnt_dg_call() increments once */
  879                 cu->cu_xid = *(uint32_t *)info - 1;
  880                 break;
  881 
  882         case CLGET_VERS:
  883                 /*
  884                  * This RELIES on the information that, in the call body,
  885                  * the version number field is the fifth field from the
  886                  * begining of the RPC header. MUST be changed if the
  887                  * call_struct is changed
  888                  */
  889                 *(uint32_t *)info =
  890                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
  891                     4 * BYTES_PER_XDR_UNIT));
  892                 break;
  893 
  894         case CLSET_VERS:
  895                 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
  896                         = htonl(*(uint32_t *)info);
  897                 break;
  898 
  899         case CLGET_PROG:
  900                 /*
  901                  * This RELIES on the information that, in the call body,
  902                  * the program number field is the fourth field from the
  903                  * begining of the RPC header. MUST be changed if the
  904                  * call_struct is changed
  905                  */
  906                 *(uint32_t *)info =
  907                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
  908                     3 * BYTES_PER_XDR_UNIT));
  909                 break;
  910 
  911         case CLSET_PROG:
  912                 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
  913                         = htonl(*(uint32_t *)info);
  914                 break;
  915         case CLSET_ASYNC:
  916                 cu->cu_async = *(int *)info;
  917                 break;
  918         case CLSET_CONNECT:
  919                 cu->cu_connect = *(int *)info;
  920                 break;
  921         case CLSET_WAITCHAN:
  922                 cu->cu_waitchan = (const char *)info;
  923                 break;
  924         case CLGET_WAITCHAN:
  925                 *(const char **) info = cu->cu_waitchan;
  926                 break;
  927         case CLSET_INTERRUPTIBLE:
  928                 if (*(int *) info)
  929                         cu->cu_waitflag = PCATCH;
  930                 else
  931                         cu->cu_waitflag = 0;
  932                 break;
  933         case CLGET_INTERRUPTIBLE:
  934                 if (cu->cu_waitflag)
  935                         *(int *) info = TRUE;
  936                 else
  937                         *(int *) info = FALSE;
  938                 break;
  939         default:
  940                 mtx_unlock(&cs->cs_lock);
  941                 return (FALSE);
  942         }
  943         mtx_unlock(&cs->cs_lock);
  944         return (TRUE);
  945 }
  946 
  947 static void
  948 clnt_dg_close(CLIENT *cl)
  949 {
  950         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  951         struct cu_socket *cs;
  952         struct cu_request *cr;
  953 
  954         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  955         mtx_lock(&cs->cs_lock);
  956 
  957         if (cu->cu_closed) {
  958                 mtx_unlock(&cs->cs_lock);
  959                 return;
  960         }
  961 
  962         if (cu->cu_closing) {
  963                 while (cu->cu_closing)
  964                         msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
  965                 KASSERT(cu->cu_closed, ("client should be closed"));
  966                 mtx_unlock(&cs->cs_lock);
  967                 return;
  968         }
  969 
  970         /*
  971          * Abort any pending requests and wait until everyone
  972          * has finished with clnt_vc_call.
  973          */
  974         cu->cu_closing = TRUE;
  975         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
  976                 if (cr->cr_client == cl) {
  977                         cr->cr_xid = 0;
  978                         cr->cr_error = ESHUTDOWN;
  979                         wakeup(cr);
  980                 }
  981         }
  982 
  983         while (cu->cu_threads)
  984                 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
  985 
  986         cu->cu_closing = FALSE;
  987         cu->cu_closed = TRUE;
  988 
  989         mtx_unlock(&cs->cs_lock);
  990         wakeup(cu);
  991 }
  992 
  993 static void
  994 clnt_dg_destroy(CLIENT *cl)
  995 {
  996         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  997         struct cu_socket *cs;
  998         struct socket *so = NULL;
  999         bool_t lastsocketref;
 1000 
 1001         cs = cu->cu_socket->so_rcv.sb_upcallarg;
 1002         clnt_dg_close(cl);
 1003 
 1004         mtx_lock(&cs->cs_lock);
 1005 
 1006         cs->cs_refs--;
 1007         if (cs->cs_refs == 0) {
 1008                 mtx_unlock(&cs->cs_lock);
 1009                 SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
 1010                 soupcall_clear(cu->cu_socket, SO_RCV);
 1011                 clnt_dg_upcallsdone(cu->cu_socket, cs);
 1012                 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
 1013                 mtx_destroy(&cs->cs_lock);
 1014                 mem_free(cs, sizeof(*cs));
 1015                 lastsocketref = TRUE;
 1016         } else {
 1017                 mtx_unlock(&cs->cs_lock);
 1018                 lastsocketref = FALSE;
 1019         }
 1020 
 1021         if (cu->cu_closeit && lastsocketref) {
 1022                 so = cu->cu_socket;
 1023                 cu->cu_socket = NULL;
 1024         }
 1025 
 1026         if (so)
 1027                 soclose(so);
 1028 
 1029         if (cl->cl_netid && cl->cl_netid[0])
 1030                 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
 1031         if (cl->cl_tp && cl->cl_tp[0])
 1032                 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
 1033         mem_free(cu, sizeof (*cu));
 1034         mem_free(cl, sizeof (CLIENT));
 1035 }
 1036 
 1037 /*
 1038  * Make sure that the time is not garbage.  -1 value is allowed.
 1039  */
 1040 static bool_t
 1041 time_not_ok(struct timeval *t)
 1042 {
 1043         return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
 1044                 t->tv_usec < -1 || t->tv_usec > 1000000);
 1045 }
 1046 
 1047 int
 1048 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
 1049 {
 1050         struct cu_socket *cs = (struct cu_socket *) arg;
 1051         struct uio uio;
 1052         struct mbuf *m;
 1053         struct mbuf *control;
 1054         struct cu_request *cr;
 1055         int error, rcvflag, foundreq;
 1056         uint32_t xid;
 1057 
 1058         cs->cs_upcallrefs++;
 1059         uio.uio_resid = 1000000000;
 1060         uio.uio_td = curthread;
 1061         do {
 1062                 SOCKBUF_UNLOCK(&so->so_rcv);
 1063                 m = NULL;
 1064                 control = NULL;
 1065                 rcvflag = MSG_DONTWAIT;
 1066                 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
 1067                 if (control)
 1068                         m_freem(control);
 1069                 SOCKBUF_LOCK(&so->so_rcv);
 1070 
 1071                 if (error == EWOULDBLOCK)
 1072                         break;
 1073 
 1074                 /*
 1075                  * If there was an error, wake up all pending
 1076                  * requests.
 1077                  */
 1078                 if (error) {
 1079                         mtx_lock(&cs->cs_lock);
 1080                         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
 1081                                 cr->cr_xid = 0;
 1082                                 cr->cr_error = error;
 1083                                 wakeup(cr);
 1084                         }
 1085                         mtx_unlock(&cs->cs_lock);
 1086                         break;
 1087                 }
 1088 
 1089                 /*
 1090                  * The XID is in the first uint32_t of the reply.
 1091                  */
 1092                 if (m->m_len < sizeof(xid) && m_length(m, NULL) < sizeof(xid))
 1093                         /*
 1094                          * Should never happen.
 1095                          */
 1096                         continue;
 1097 
 1098                 m_copydata(m, 0, sizeof(xid), (char *)&xid);
 1099                 xid = ntohl(xid);
 1100 
 1101                 /*
 1102                  * Attempt to match this reply with a pending request.
 1103                  */
 1104                 mtx_lock(&cs->cs_lock);
 1105                 foundreq = 0;
 1106                 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
 1107                         if (cr->cr_xid == xid) {
 1108                                 /*
 1109                                  * This one matches. We leave the
 1110                                  * reply mbuf in cr->cr_mrep. Set the
 1111                                  * XID to zero so that we will ignore
 1112                                  * any duplicated replies that arrive
 1113                                  * before clnt_dg_call removes it from
 1114                                  * the queue.
 1115                                  */
 1116                                 cr->cr_xid = 0;
 1117                                 cr->cr_mrep = m;
 1118                                 cr->cr_error = 0;
 1119                                 foundreq = 1;
 1120                                 wakeup(cr);
 1121                                 break;
 1122                         }
 1123                 }
 1124                 mtx_unlock(&cs->cs_lock);
 1125 
 1126                 /*
 1127                  * If we didn't find the matching request, just drop
 1128                  * it - its probably a repeated reply.
 1129                  */
 1130                 if (!foundreq)
 1131                         m_freem(m);
 1132         } while (m);
 1133         cs->cs_upcallrefs--;
 1134         if (cs->cs_upcallrefs < 0)
 1135                 panic("rpcdg upcall refcnt");
 1136         if (cs->cs_upcallrefs == 0)
 1137                 wakeup(&cs->cs_upcallrefs);
 1138         return (SU_OK);
 1139 }
 1140 
 1141 /*
 1142  * Wait for all upcalls in progress to complete.
 1143  */
 1144 static void
 1145 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
 1146 {
 1147 
 1148         SOCKBUF_LOCK_ASSERT(&so->so_rcv);
 1149 
 1150         while (cs->cs_upcallrefs > 0)
 1151                 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
 1152                     "rpcdgup", 0);
 1153 }

Cache object: e7986153d11bf45cdc32fc25cc2d6679


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.