The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System, Second Edition

[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/rpc/clnt_dg.c

Version: -  FREEBSD  -  FREEBSD10  -  FREEBSD9  -  FREEBSD92  -  FREEBSD91  -  FREEBSD90  -  FREEBSD8  -  FREEBSD82  -  FREEBSD81  -  FREEBSD80  -  FREEBSD7  -  FREEBSD74  -  FREEBSD73  -  FREEBSD72  -  FREEBSD71  -  FREEBSD70  -  FREEBSD6  -  FREEBSD64  -  FREEBSD63  -  FREEBSD62  -  FREEBSD61  -  FREEBSD60  -  FREEBSD5  -  FREEBSD55  -  FREEBSD54  -  FREEBSD53  -  FREEBSD52  -  FREEBSD51  -  FREEBSD50  -  FREEBSD4  -  FREEBSD3  -  FREEBSD22  -  linux-2.6  -  linux-2.4.22  -  MK83  -  MK84  -  PLAN9  -  DFBSD  -  NETBSD  -  NETBSD5  -  NETBSD4  -  NETBSD3  -  NETBSD20  -  OPENBSD  -  xnu-517  -  xnu-792  -  xnu-792.6.70  -  xnu-1228  -  xnu-1456.1.26  -  xnu-1699.24.8  -  xnu-2050.18.24  -  OPENSOLARIS  -  minix-3-1-1 
SearchContext: -  none  -  3  -  10 

    1 /*      $NetBSD: clnt_dg.c,v 1.4 2000/07/14 08:40:41 fvdl Exp $ */
    2 
    3 /*
    4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
    5  * unrestricted use provided that this legend is included on all tape
    6  * media and as a part of the software program in whole or part.  Users
    7  * may copy or modify Sun RPC without charge, but are not authorized
    8  * to license or distribute it to anyone else except as part of a product or
    9  * program developed by the user.
   10  * 
   11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
   12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
   13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
   14  * 
   15  * Sun RPC is provided with no support and without any obligation on the
   16  * part of Sun Microsystems, Inc. to assist in its use, correction,
   17  * modification or enhancement.
   18  * 
   19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
   20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
   21  * OR ANY PART THEREOF.
   22  * 
   23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
   24  * or profits or other special, indirect and consequential damages, even if
   25  * Sun has been advised of the possibility of such damages.
   26  * 
   27  * Sun Microsystems, Inc.
   28  * 2550 Garcia Avenue
   29  * Mountain View, California  94043
   30  */
   31 /*
   32  * Copyright (c) 1986-1991 by Sun Microsystems Inc. 
   33  */
   34 
   35 #if defined(LIBC_SCCS) && !defined(lint)
   36 #ident  "@(#)clnt_dg.c  1.23    94/04/22 SMI"
   37 static char sccsid[] = "@(#)clnt_dg.c 1.19 89/03/16 Copyr 1988 Sun Micro";
   38 #endif
   39 #include <sys/cdefs.h>
   40 __FBSDID("$FreeBSD: releng/8.1/sys/rpc/clnt_dg.c 203786 2010-02-11 18:34:06Z mjacob $");
   41 
   42 /*
   43  * Implements a connectionless client side RPC.
   44  */
   45 
   46 #include <sys/param.h>
   47 #include <sys/systm.h>
   48 #include <sys/kernel.h>
   49 #include <sys/lock.h>
   50 #include <sys/malloc.h>
   51 #include <sys/mbuf.h>
   52 #include <sys/mutex.h>
   53 #include <sys/pcpu.h>
   54 #include <sys/proc.h>
   55 #include <sys/socket.h>
   56 #include <sys/socketvar.h>
   57 #include <sys/time.h>
   58 #include <sys/uio.h>
   59 
   60 #include <net/vnet.h>
   61 
   62 #include <rpc/rpc.h>
   63 #include <rpc/rpc_com.h>
   64 
   65 
   66 #ifdef _FREEFALL_CONFIG
   67 /*
   68  * Disable RPC exponential back-off for FreeBSD.org systems.
   69  */
   70 #define RPC_MAX_BACKOFF         1 /* second */
   71 #else
   72 #define RPC_MAX_BACKOFF         30 /* seconds */
   73 #endif
   74 
   75 static bool_t time_not_ok(struct timeval *);
   76 static enum clnt_stat clnt_dg_call(CLIENT *, struct rpc_callextra *,
   77     rpcproc_t, struct mbuf *, struct mbuf **, struct timeval);
   78 static void clnt_dg_geterr(CLIENT *, struct rpc_err *);
   79 static bool_t clnt_dg_freeres(CLIENT *, xdrproc_t, void *);
   80 static void clnt_dg_abort(CLIENT *);
   81 static bool_t clnt_dg_control(CLIENT *, u_int, void *);
   82 static void clnt_dg_close(CLIENT *);
   83 static void clnt_dg_destroy(CLIENT *);
   84 static int clnt_dg_soupcall(struct socket *so, void *arg, int waitflag);
   85 
   86 static struct clnt_ops clnt_dg_ops = {
   87         .cl_call =      clnt_dg_call,
   88         .cl_abort =     clnt_dg_abort,
   89         .cl_geterr =    clnt_dg_geterr,
   90         .cl_freeres =   clnt_dg_freeres,
   91         .cl_close =     clnt_dg_close,
   92         .cl_destroy =   clnt_dg_destroy,
   93         .cl_control =   clnt_dg_control
   94 };
   95 
   96 static const char mem_err_clnt_dg[] = "clnt_dg_create: out of memory";
   97 
   98 /*
   99  * A pending RPC request which awaits a reply. Requests which have
  100  * received their reply will have cr_xid set to zero and cr_mrep to
  101  * the mbuf chain of the reply.
  102  */
  103 struct cu_request {
  104         TAILQ_ENTRY(cu_request) cr_link;
  105         CLIENT                  *cr_client;     /* owner */
  106         uint32_t                cr_xid;         /* XID of request */
  107         struct mbuf             *cr_mrep;       /* reply received by upcall */
  108         int                     cr_error;       /* any error from upcall */
  109         char                    cr_verf[MAX_AUTH_BYTES]; /* reply verf */
  110 };
  111 
  112 TAILQ_HEAD(cu_request_list, cu_request);
  113 
  114 #define MCALL_MSG_SIZE 24
  115 
  116 /*
  117  * This structure is pointed to by the socket buffer's sb_upcallarg
  118  * member. It is separate from the client private data to facilitate
  119  * multiple clients sharing the same socket. The cs_lock mutex is used
  120  * to protect all fields of this structure, the socket's receive
  121  * buffer SOCKBUF_LOCK is used to ensure that exactly one of these
  122  * structures is installed on the socket.
  123  */
  124 struct cu_socket {
  125         struct mtx              cs_lock;
  126         int                     cs_refs;        /* Count of clients */
  127         struct cu_request_list  cs_pending;     /* Requests awaiting replies */
  128         int                     cs_upcallrefs;  /* Refcnt of upcalls in prog.*/
  129 };
  130 
  131 static void clnt_dg_upcallsdone(struct socket *, struct cu_socket *);
  132 
  133 /*
  134  * Private data kept per client handle
  135  */
  136 struct cu_data {
  137         int                     cu_threads;     /* # threads in clnt_vc_call */
  138         bool_t                  cu_closing;     /* TRUE if we are closing */
  139         bool_t                  cu_closed;      /* TRUE if we are closed */
  140         struct socket           *cu_socket;     /* connection socket */
  141         bool_t                  cu_closeit;     /* opened by library */
  142         struct sockaddr_storage cu_raddr;       /* remote address */
  143         int                     cu_rlen;
  144         struct timeval          cu_wait;        /* retransmit interval */
  145         struct timeval          cu_total;       /* total time for the call */
  146         struct rpc_err          cu_error;
  147         uint32_t                cu_xid;
  148         char                    cu_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
  149         size_t                  cu_mcalllen;
  150         size_t                  cu_sendsz;      /* send size */
  151         size_t                  cu_recvsz;      /* recv size */
  152         int                     cu_async;
  153         int                     cu_connect;     /* Use connect(). */
  154         int                     cu_connected;   /* Have done connect(). */
  155         const char              *cu_waitchan;
  156         int                     cu_waitflag;
  157         int                     cu_cwnd;        /* congestion window */
  158         int                     cu_sent;        /* number of in-flight RPCs */
  159         bool_t                  cu_cwnd_wait;
  160 };
  161 
  162 #define CWNDSCALE       256
  163 #define MAXCWND         (32 * CWNDSCALE)
  164 
  165 /*
  166  * Connection less client creation returns with client handle parameters.
  167  * Default options are set, which the user can change using clnt_control().
  168  * fd should be open and bound.
  169  * NB: The rpch->cl_auth is initialized to null authentication.
  170  *      Caller may wish to set this something more useful.
  171  *
  172  * sendsz and recvsz are the maximum allowable packet sizes that can be
  173  * sent and received. Normally they are the same, but they can be
  174  * changed to improve the program efficiency and buffer allocation.
  175  * If they are 0, use the transport default.
  176  *
  177  * If svcaddr is NULL, returns NULL.
  178  */
  179 CLIENT *
  180 clnt_dg_create(
  181         struct socket *so,
  182         struct sockaddr *svcaddr,       /* servers address */
  183         rpcprog_t program,              /* program number */
  184         rpcvers_t version,              /* version number */
  185         size_t sendsz,                  /* buffer recv size */
  186         size_t recvsz)                  /* buffer send size */
  187 {
  188         CLIENT *cl = NULL;              /* client handle */
  189         struct cu_data *cu = NULL;      /* private data */
  190         struct cu_socket *cs = NULL;
  191         struct sockbuf *sb;
  192         struct timeval now;
  193         struct rpc_msg call_msg;
  194         struct __rpc_sockinfo si;
  195         XDR xdrs;
  196 
  197         if (svcaddr == NULL) {
  198                 rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
  199                 return (NULL);
  200         }
  201 
  202         CURVNET_SET(so->so_vnet);
  203         if (!__rpc_socket2sockinfo(so, &si)) {
  204                 rpc_createerr.cf_stat = RPC_TLIERROR;
  205                 rpc_createerr.cf_error.re_errno = 0;
  206                 CURVNET_RESTORE();
  207                 return (NULL);
  208         }
  209         CURVNET_RESTORE();
  210 
  211         /*
  212          * Find the receive and the send size
  213          */
  214         sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
  215         recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
  216         if ((sendsz == 0) || (recvsz == 0)) {
  217                 rpc_createerr.cf_stat = RPC_TLIERROR; /* XXX */
  218                 rpc_createerr.cf_error.re_errno = 0;
  219                 return (NULL);
  220         }
  221 
  222         cl = mem_alloc(sizeof (CLIENT));
  223 
  224         /*
  225          * Should be multiple of 4 for XDR.
  226          */
  227         sendsz = ((sendsz + 3) / 4) * 4;
  228         recvsz = ((recvsz + 3) / 4) * 4;
  229         cu = mem_alloc(sizeof (*cu));
  230         cu->cu_threads = 0;
  231         cu->cu_closing = FALSE;
  232         cu->cu_closed = FALSE;
  233         (void) memcpy(&cu->cu_raddr, svcaddr, (size_t)svcaddr->sa_len);
  234         cu->cu_rlen = svcaddr->sa_len;
  235         /* Other values can also be set through clnt_control() */
  236         cu->cu_wait.tv_sec = 3; /* heuristically chosen */
  237         cu->cu_wait.tv_usec = 0;
  238         cu->cu_total.tv_sec = -1;
  239         cu->cu_total.tv_usec = -1;
  240         cu->cu_sendsz = sendsz;
  241         cu->cu_recvsz = recvsz;
  242         cu->cu_async = FALSE;
  243         cu->cu_connect = FALSE;
  244         cu->cu_connected = FALSE;
  245         cu->cu_waitchan = "rpcrecv";
  246         cu->cu_waitflag = 0;
  247         cu->cu_cwnd = MAXCWND / 2;
  248         cu->cu_sent = 0;
  249         cu->cu_cwnd_wait = FALSE;
  250         (void) getmicrotime(&now);
  251         cu->cu_xid = __RPC_GETXID(&now);
  252         call_msg.rm_xid = cu->cu_xid;
  253         call_msg.rm_call.cb_prog = program;
  254         call_msg.rm_call.cb_vers = version;
  255         xdrmem_create(&xdrs, cu->cu_mcallc, MCALL_MSG_SIZE, XDR_ENCODE);
  256         if (! xdr_callhdr(&xdrs, &call_msg)) {
  257                 rpc_createerr.cf_stat = RPC_CANTENCODEARGS;  /* XXX */
  258                 rpc_createerr.cf_error.re_errno = 0;
  259                 goto err2;
  260         }
  261         cu->cu_mcalllen = XDR_GETPOS(&xdrs);
  262 
  263         /*
  264          * By default, closeit is always FALSE. It is users responsibility
  265          * to do a close on it, else the user may use clnt_control
  266          * to let clnt_destroy do it for him/her.
  267          */
  268         cu->cu_closeit = FALSE;
  269         cu->cu_socket = so;
  270         soreserve(so, 256*1024, 256*1024);
  271 
  272         sb = &so->so_rcv;
  273         SOCKBUF_LOCK(&so->so_rcv);
  274 recheck_socket:
  275         if (sb->sb_upcall) {
  276                 if (sb->sb_upcall != clnt_dg_soupcall) {
  277                         SOCKBUF_UNLOCK(&so->so_rcv);
  278                         printf("clnt_dg_create(): socket already has an incompatible upcall\n");
  279                         goto err2;
  280                 }
  281                 cs = (struct cu_socket *) sb->sb_upcallarg;
  282                 mtx_lock(&cs->cs_lock);
  283                 cs->cs_refs++;
  284                 mtx_unlock(&cs->cs_lock);
  285         } else {
  286                 /*
  287                  * We are the first on this socket - allocate the
  288                  * structure and install it in the socket.
  289                  */
  290                 SOCKBUF_UNLOCK(&so->so_rcv);
  291                 cs = mem_alloc(sizeof(*cs));
  292                 SOCKBUF_LOCK(&so->so_rcv);
  293                 if (sb->sb_upcall) {
  294                         /*
  295                          * We have lost a race with some other client.
  296                          */
  297                         mem_free(cs, sizeof(*cs));
  298                         goto recheck_socket;
  299                 }
  300                 mtx_init(&cs->cs_lock, "cs->cs_lock", NULL, MTX_DEF);
  301                 cs->cs_refs = 1;
  302                 cs->cs_upcallrefs = 0;
  303                 TAILQ_INIT(&cs->cs_pending);
  304                 soupcall_set(so, SO_RCV, clnt_dg_soupcall, cs);
  305         }
  306         SOCKBUF_UNLOCK(&so->so_rcv);
  307 
  308         cl->cl_refs = 1;
  309         cl->cl_ops = &clnt_dg_ops;
  310         cl->cl_private = (caddr_t)(void *)cu;
  311         cl->cl_auth = authnone_create();
  312         cl->cl_tp = NULL;
  313         cl->cl_netid = NULL;
  314         return (cl);
  315 err2:
  316         if (cl) {
  317                 mem_free(cl, sizeof (CLIENT));
  318                 if (cu)
  319                         mem_free(cu, sizeof (*cu));
  320         }
  321         return (NULL);
  322 }
  323 
  324 static enum clnt_stat
  325 clnt_dg_call(
  326         CLIENT          *cl,            /* client handle */
  327         struct rpc_callextra *ext,      /* call metadata */
  328         rpcproc_t       proc,           /* procedure number */
  329         struct mbuf     *args,          /* pointer to args */
  330         struct mbuf     **resultsp,     /* pointer to results */
  331         struct timeval  utimeout)       /* seconds to wait before giving up */
  332 {
  333         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  334         struct cu_socket *cs;
  335         struct rpc_timers *rt;
  336         AUTH *auth;
  337         struct rpc_err *errp;
  338         enum clnt_stat stat;
  339         XDR xdrs;
  340         struct rpc_msg reply_msg;
  341         bool_t ok;
  342         int retrans;                    /* number of re-transmits so far */
  343         int nrefreshes = 2;             /* number of times to refresh cred */
  344         struct timeval *tvp;
  345         int timeout;
  346         int retransmit_time;
  347         int next_sendtime, starttime, rtt, time_waited, tv = 0;
  348         struct sockaddr *sa;
  349         socklen_t salen;
  350         uint32_t xid = 0;
  351         struct mbuf *mreq = NULL, *results;
  352         struct cu_request *cr;
  353         int error;
  354 
  355         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  356         cr = malloc(sizeof(struct cu_request), M_RPC, M_WAITOK);
  357 
  358         mtx_lock(&cs->cs_lock);
  359 
  360         if (cu->cu_closing || cu->cu_closed) {
  361                 mtx_unlock(&cs->cs_lock);
  362                 free(cr, M_RPC);
  363                 return (RPC_CANTSEND);
  364         }
  365         cu->cu_threads++;
  366 
  367         if (ext) {
  368                 auth = ext->rc_auth;
  369                 errp = &ext->rc_err;
  370         } else {
  371                 auth = cl->cl_auth;
  372                 errp = &cu->cu_error;
  373         }
  374 
  375         cr->cr_client = cl;
  376         cr->cr_mrep = NULL;
  377         cr->cr_error = 0;
  378 
  379         if (cu->cu_total.tv_usec == -1) {
  380                 tvp = &utimeout; /* use supplied timeout */
  381         } else {
  382                 tvp = &cu->cu_total; /* use default timeout */
  383         }
  384         if (tvp->tv_sec || tvp->tv_usec)
  385                 timeout = tvtohz(tvp);
  386         else
  387                 timeout = 0;
  388 
  389         if (cu->cu_connect && !cu->cu_connected) {
  390                 mtx_unlock(&cs->cs_lock);
  391                 error = soconnect(cu->cu_socket,
  392                     (struct sockaddr *)&cu->cu_raddr, curthread);
  393                 mtx_lock(&cs->cs_lock);
  394                 if (error) {
  395                         errp->re_errno = error;
  396                         errp->re_status = stat = RPC_CANTSEND;
  397                         goto out;
  398                 }
  399                 cu->cu_connected = 1;
  400         }
  401         if (cu->cu_connected) {
  402                 sa = NULL;
  403                 salen = 0;
  404         } else {
  405                 sa = (struct sockaddr *)&cu->cu_raddr;
  406                 salen = cu->cu_rlen;
  407         }
  408         time_waited = 0;
  409         retrans = 0;
  410         if (ext && ext->rc_timers) {
  411                 rt = ext->rc_timers;
  412                 if (!rt->rt_rtxcur)
  413                         rt->rt_rtxcur = tvtohz(&cu->cu_wait);
  414                 retransmit_time = next_sendtime = rt->rt_rtxcur;
  415         } else {
  416                 rt = NULL;
  417                 retransmit_time = next_sendtime = tvtohz(&cu->cu_wait);
  418         }
  419 
  420         starttime = ticks;
  421 
  422 call_again:
  423         mtx_assert(&cs->cs_lock, MA_OWNED);
  424 
  425         cu->cu_xid++;
  426         xid = cu->cu_xid;
  427 
  428 send_again:
  429         mtx_unlock(&cs->cs_lock);
  430 
  431         MGETHDR(mreq, M_WAIT, MT_DATA);
  432         KASSERT(cu->cu_mcalllen <= MHLEN, ("RPC header too big"));
  433         bcopy(cu->cu_mcallc, mreq->m_data, cu->cu_mcalllen);
  434         mreq->m_len = cu->cu_mcalllen;
  435 
  436         /*
  437          * The XID is the first thing in the request.
  438          */
  439         *mtod(mreq, uint32_t *) = htonl(xid);
  440 
  441         xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
  442 
  443         if (cu->cu_async == TRUE && args == NULL)
  444                 goto get_reply;
  445 
  446         if ((! XDR_PUTINT32(&xdrs, &proc)) ||
  447             (! AUTH_MARSHALL(auth, xid, &xdrs,
  448                 m_copym(args, 0, M_COPYALL, M_WAITOK)))) {
  449                 errp->re_status = stat = RPC_CANTENCODEARGS;
  450                 mtx_lock(&cs->cs_lock);
  451                 goto out;
  452         }
  453         mreq->m_pkthdr.len = m_length(mreq, NULL);
  454 
  455         cr->cr_xid = xid;
  456         mtx_lock(&cs->cs_lock);
  457 
  458         /*
  459          * Try to get a place in the congestion window.
  460          */
  461         while (cu->cu_sent >= cu->cu_cwnd) {
  462                 cu->cu_cwnd_wait = TRUE;
  463                 error = msleep(&cu->cu_cwnd_wait, &cs->cs_lock,
  464                     cu->cu_waitflag, "rpccwnd", 0);
  465                 if (error) {
  466                         errp->re_errno = error;
  467                         errp->re_status = stat = RPC_CANTSEND;
  468                         goto out;
  469                 }
  470         }
  471         cu->cu_sent += CWNDSCALE;
  472 
  473         TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
  474         mtx_unlock(&cs->cs_lock);
  475 
  476         /*
  477          * sosend consumes mreq.
  478          */
  479         error = sosend(cu->cu_socket, sa, NULL, mreq, NULL, 0, curthread);
  480         mreq = NULL;
  481 
  482         /*
  483          * sub-optimal code appears here because we have
  484          * some clock time to spare while the packets are in flight.
  485          * (We assume that this is actually only executed once.)
  486          */
  487         reply_msg.acpted_rply.ar_verf.oa_flavor = AUTH_NULL;
  488         reply_msg.acpted_rply.ar_verf.oa_base = cr->cr_verf;
  489         reply_msg.acpted_rply.ar_verf.oa_length = 0;
  490         reply_msg.acpted_rply.ar_results.where = NULL;
  491         reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
  492 
  493         mtx_lock(&cs->cs_lock);
  494         if (error) {
  495                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  496                 errp->re_errno = error;
  497                 errp->re_status = stat = RPC_CANTSEND;
  498                 cu->cu_sent -= CWNDSCALE;
  499                 if (cu->cu_cwnd_wait) {
  500                         cu->cu_cwnd_wait = FALSE;
  501                         wakeup(&cu->cu_cwnd_wait);
  502                 }
  503                 goto out;
  504         }
  505 
  506         /*
  507          * Check to see if we got an upcall while waiting for the
  508          * lock.
  509          */
  510         if (cr->cr_error) {
  511                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  512                 errp->re_errno = cr->cr_error;
  513                 errp->re_status = stat = RPC_CANTRECV;
  514                 cu->cu_sent -= CWNDSCALE;
  515                 if (cu->cu_cwnd_wait) {
  516                         cu->cu_cwnd_wait = FALSE;
  517                         wakeup(&cu->cu_cwnd_wait);
  518                 }
  519                 goto out;
  520         }
  521         if (cr->cr_mrep) {
  522                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  523                 cu->cu_sent -= CWNDSCALE;
  524                 if (cu->cu_cwnd_wait) {
  525                         cu->cu_cwnd_wait = FALSE;
  526                         wakeup(&cu->cu_cwnd_wait);
  527                 }
  528                 goto got_reply;
  529         }
  530 
  531         /*
  532          * Hack to provide rpc-based message passing
  533          */
  534         if (timeout == 0) {
  535                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  536                 errp->re_status = stat = RPC_TIMEDOUT;
  537                 cu->cu_sent -= CWNDSCALE;
  538                 if (cu->cu_cwnd_wait) {
  539                         cu->cu_cwnd_wait = FALSE;
  540                         wakeup(&cu->cu_cwnd_wait);
  541                 }
  542                 goto out;
  543         }
  544 
  545 get_reply:
  546         for (;;) {
  547                 /* Decide how long to wait. */
  548                 if (next_sendtime < timeout)
  549                         tv = next_sendtime;
  550                 else
  551                         tv = timeout;
  552                 tv -= time_waited;
  553 
  554                 if (tv > 0) {
  555                         if (cu->cu_closing || cu->cu_closed) {
  556                                 error = 0;
  557                                 cr->cr_error = ESHUTDOWN;
  558                         } else {
  559                                 error = msleep(cr, &cs->cs_lock,
  560                                     cu->cu_waitflag, cu->cu_waitchan, tv);
  561                         }
  562                 } else {
  563                         error = EWOULDBLOCK;
  564                 }
  565 
  566                 TAILQ_REMOVE(&cs->cs_pending, cr, cr_link);
  567                 cu->cu_sent -= CWNDSCALE;
  568                 if (cu->cu_cwnd_wait) {
  569                         cu->cu_cwnd_wait = FALSE;
  570                         wakeup(&cu->cu_cwnd_wait);
  571                 }
  572 
  573                 if (!error) {
  574                         /*
  575                          * We were woken up by the upcall.  If the
  576                          * upcall had a receive error, report that,
  577                          * otherwise we have a reply.
  578                          */
  579                         if (cr->cr_error) {
  580                                 errp->re_errno = cr->cr_error;
  581                                 errp->re_status = stat = RPC_CANTRECV;
  582                                 goto out;
  583                         }
  584 
  585                         cu->cu_cwnd += (CWNDSCALE * CWNDSCALE
  586                             + cu->cu_cwnd / 2) / cu->cu_cwnd;
  587                         if (cu->cu_cwnd > MAXCWND)
  588                                 cu->cu_cwnd = MAXCWND;
  589 
  590                         if (rt) {
  591                                 /*
  592                                  * Add one to the time since a tick
  593                                  * count of N means that the actual
  594                                  * time taken was somewhere between N
  595                                  * and N+1.
  596                                  */
  597                                 rtt = ticks - starttime + 1;
  598 
  599                                 /*
  600                                  * Update our estimate of the round
  601                                  * trip time using roughly the
  602                                  * algorithm described in RFC
  603                                  * 2988. Given an RTT sample R:
  604                                  *
  605                                  * RTTVAR = (1-beta) * RTTVAR + beta * |SRTT-R|
  606                                  * SRTT = (1-alpha) * SRTT + alpha * R
  607                                  *
  608                                  * where alpha = 0.125 and beta = 0.25.
  609                                  *
  610                                  * The initial retransmit timeout is
  611                                  * SRTT + 4*RTTVAR and doubles on each
  612                                  * retransmision.
  613                                  */
  614                                 if (rt->rt_srtt == 0) {
  615                                         rt->rt_srtt = rtt;
  616                                         rt->rt_deviate = rtt / 2;
  617                                 } else {
  618                                         int32_t error = rtt - rt->rt_srtt;
  619                                         rt->rt_srtt += error / 8;
  620                                         error = abs(error) - rt->rt_deviate;
  621                                         rt->rt_deviate += error / 4;
  622                                 }
  623                                 rt->rt_rtxcur = rt->rt_srtt + 4*rt->rt_deviate;
  624                         }
  625 
  626                         break;
  627                 }
  628 
  629                 /*
  630                  * The sleep returned an error so our request is still
  631                  * on the list. If we got EWOULDBLOCK, we may want to
  632                  * re-send the request.
  633                  */
  634                 if (error != EWOULDBLOCK) {
  635                         errp->re_errno = error;
  636                         if (error == EINTR)
  637                                 errp->re_status = stat = RPC_INTR;
  638                         else
  639                                 errp->re_status = stat = RPC_CANTRECV;
  640                         goto out;
  641                 }
  642 
  643                 time_waited = ticks - starttime;
  644 
  645                 /* Check for timeout. */
  646                 if (time_waited > timeout) {
  647                         errp->re_errno = EWOULDBLOCK;
  648                         errp->re_status = stat = RPC_TIMEDOUT;
  649                         goto out;
  650                 }
  651 
  652                 /* Retransmit if necessary. */          
  653                 if (time_waited >= next_sendtime) {
  654                         cu->cu_cwnd /= 2;
  655                         if (cu->cu_cwnd < CWNDSCALE)
  656                                 cu->cu_cwnd = CWNDSCALE;
  657                         if (ext && ext->rc_feedback) {
  658                                 mtx_unlock(&cs->cs_lock);
  659                                 if (retrans == 0)
  660                                         ext->rc_feedback(FEEDBACK_REXMIT1,
  661                                             proc, ext->rc_feedback_arg);
  662                                 else
  663                                         ext->rc_feedback(FEEDBACK_REXMIT2,
  664                                             proc, ext->rc_feedback_arg);
  665                                 mtx_lock(&cs->cs_lock);
  666                         }
  667                         if (cu->cu_closing || cu->cu_closed) {
  668                                 errp->re_errno = ESHUTDOWN;
  669                                 errp->re_status = stat = RPC_CANTRECV;
  670                                 goto out;
  671                         }
  672                         retrans++;
  673                         /* update retransmit_time */
  674                         if (retransmit_time < RPC_MAX_BACKOFF * hz)
  675                                 retransmit_time = 2 * retransmit_time;
  676                         next_sendtime += retransmit_time;
  677                         goto send_again;
  678                 }
  679                 TAILQ_INSERT_TAIL(&cs->cs_pending, cr, cr_link);
  680         }
  681 
  682 got_reply:
  683         /*
  684          * Now decode and validate the response. We need to drop the
  685          * lock since xdr_replymsg may end up sleeping in malloc.
  686          */
  687         mtx_unlock(&cs->cs_lock);
  688 
  689         if (ext && ext->rc_feedback)
  690                 ext->rc_feedback(FEEDBACK_OK, proc, ext->rc_feedback_arg);
  691 
  692         xdrmbuf_create(&xdrs, cr->cr_mrep, XDR_DECODE);
  693         ok = xdr_replymsg(&xdrs, &reply_msg);
  694         cr->cr_mrep = NULL;
  695 
  696         if (ok) {
  697                 if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
  698                     (reply_msg.acpted_rply.ar_stat == SUCCESS))
  699                         errp->re_status = stat = RPC_SUCCESS;
  700                 else
  701                         stat = _seterr_reply(&reply_msg, &(cu->cu_error));
  702 
  703                 if (errp->re_status == RPC_SUCCESS) {
  704                         results = xdrmbuf_getall(&xdrs);
  705                         if (! AUTH_VALIDATE(auth, xid,
  706                                 &reply_msg.acpted_rply.ar_verf,
  707                                 &results)) {
  708                                 errp->re_status = stat = RPC_AUTHERROR;
  709                                 errp->re_why = AUTH_INVALIDRESP;
  710                                 if (retrans &&
  711                                     auth->ah_cred.oa_flavor == RPCSEC_GSS) {
  712                                         /*
  713                                          * If we retransmitted, its
  714                                          * possible that we will
  715                                          * receive a reply for one of
  716                                          * the earlier transmissions
  717                                          * (which will use an older
  718                                          * RPCSEC_GSS sequence
  719                                          * number). In this case, just
  720                                          * go back and listen for a
  721                                          * new reply. We could keep a
  722                                          * record of all the seq
  723                                          * numbers we have transmitted
  724                                          * so far so that we could
  725                                          * accept a reply for any of
  726                                          * them here.
  727                                          */
  728                                         XDR_DESTROY(&xdrs);
  729                                         mtx_lock(&cs->cs_lock);
  730                                         TAILQ_INSERT_TAIL(&cs->cs_pending,
  731                                             cr, cr_link);
  732                                         cr->cr_mrep = NULL;
  733                                         goto get_reply;
  734                                 }
  735                         } else {
  736                                 *resultsp = results;
  737                         }
  738                 }               /* end successful completion */
  739                 /*
  740                  * If unsuccesful AND error is an authentication error
  741                  * then refresh credentials and try again, else break
  742                  */
  743                 else if (stat == RPC_AUTHERROR)
  744                         /* maybe our credentials need to be refreshed ... */
  745                         if (nrefreshes > 0 &&
  746                             AUTH_REFRESH(auth, &reply_msg)) {
  747                                 nrefreshes--;
  748                                 XDR_DESTROY(&xdrs);
  749                                 mtx_lock(&cs->cs_lock);
  750                                 goto call_again;
  751                         }
  752                 /* end of unsuccessful completion */
  753         }       /* end of valid reply message */
  754         else {
  755                 errp->re_status = stat = RPC_CANTDECODERES;
  756 
  757         }
  758         XDR_DESTROY(&xdrs);
  759         mtx_lock(&cs->cs_lock);
  760 out:
  761         mtx_assert(&cs->cs_lock, MA_OWNED);
  762 
  763         if (mreq)
  764                 m_freem(mreq);
  765         if (cr->cr_mrep)
  766                 m_freem(cr->cr_mrep);
  767 
  768         cu->cu_threads--;
  769         if (cu->cu_closing)
  770                 wakeup(cu);
  771                 
  772         mtx_unlock(&cs->cs_lock);
  773 
  774         if (auth && stat != RPC_SUCCESS)
  775                 AUTH_VALIDATE(auth, xid, NULL, NULL);
  776 
  777         free(cr, M_RPC);
  778 
  779         return (stat);
  780 }
  781 
  782 static void
  783 clnt_dg_geterr(CLIENT *cl, struct rpc_err *errp)
  784 {
  785         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  786 
  787         *errp = cu->cu_error;
  788 }
  789 
  790 static bool_t
  791 clnt_dg_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
  792 {
  793         XDR xdrs;
  794         bool_t dummy;
  795 
  796         xdrs.x_op = XDR_FREE;
  797         dummy = (*xdr_res)(&xdrs, res_ptr);
  798 
  799         return (dummy);
  800 }
  801 
  802 /*ARGSUSED*/
  803 static void
  804 clnt_dg_abort(CLIENT *h)
  805 {
  806 }
  807 
  808 static bool_t
  809 clnt_dg_control(CLIENT *cl, u_int request, void *info)
  810 {
  811         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  812         struct cu_socket *cs;
  813         struct sockaddr *addr;
  814 
  815         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  816         mtx_lock(&cs->cs_lock);
  817 
  818         switch (request) {
  819         case CLSET_FD_CLOSE:
  820                 cu->cu_closeit = TRUE;
  821                 mtx_unlock(&cs->cs_lock);
  822                 return (TRUE);
  823         case CLSET_FD_NCLOSE:
  824                 cu->cu_closeit = FALSE;
  825                 mtx_unlock(&cs->cs_lock);
  826                 return (TRUE);
  827         }
  828 
  829         /* for other requests which use info */
  830         if (info == NULL) {
  831                 mtx_unlock(&cs->cs_lock);
  832                 return (FALSE);
  833         }
  834         switch (request) {
  835         case CLSET_TIMEOUT:
  836                 if (time_not_ok((struct timeval *)info)) {
  837                         mtx_unlock(&cs->cs_lock);
  838                         return (FALSE);
  839                 }
  840                 cu->cu_total = *(struct timeval *)info;
  841                 break;
  842         case CLGET_TIMEOUT:
  843                 *(struct timeval *)info = cu->cu_total;
  844                 break;
  845         case CLSET_RETRY_TIMEOUT:
  846                 if (time_not_ok((struct timeval *)info)) {
  847                         mtx_unlock(&cs->cs_lock);
  848                         return (FALSE);
  849                 }
  850                 cu->cu_wait = *(struct timeval *)info;
  851                 break;
  852         case CLGET_RETRY_TIMEOUT:
  853                 *(struct timeval *)info = cu->cu_wait;
  854                 break;
  855         case CLGET_SVC_ADDR:
  856                 /*
  857                  * Slightly different semantics to userland - we use
  858                  * sockaddr instead of netbuf.
  859                  */
  860                 memcpy(info, &cu->cu_raddr, cu->cu_raddr.ss_len);
  861                 break;
  862         case CLSET_SVC_ADDR:            /* set to new address */
  863                 addr = (struct sockaddr *)info;
  864                 (void) memcpy(&cu->cu_raddr, addr, addr->sa_len);
  865                 break;
  866         case CLGET_XID:
  867                 *(uint32_t *)info = cu->cu_xid;
  868                 break;
  869 
  870         case CLSET_XID:
  871                 /* This will set the xid of the NEXT call */
  872                 /* decrement by 1 as clnt_dg_call() increments once */
  873                 cu->cu_xid = *(uint32_t *)info - 1;
  874                 break;
  875 
  876         case CLGET_VERS:
  877                 /*
  878                  * This RELIES on the information that, in the call body,
  879                  * the version number field is the fifth field from the
  880                  * begining of the RPC header. MUST be changed if the
  881                  * call_struct is changed
  882                  */
  883                 *(uint32_t *)info =
  884                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
  885                     4 * BYTES_PER_XDR_UNIT));
  886                 break;
  887 
  888         case CLSET_VERS:
  889                 *(uint32_t *)(void *)(cu->cu_mcallc + 4 * BYTES_PER_XDR_UNIT)
  890                         = htonl(*(uint32_t *)info);
  891                 break;
  892 
  893         case CLGET_PROG:
  894                 /*
  895                  * This RELIES on the information that, in the call body,
  896                  * the program number field is the fourth field from the
  897                  * begining of the RPC header. MUST be changed if the
  898                  * call_struct is changed
  899                  */
  900                 *(uint32_t *)info =
  901                     ntohl(*(uint32_t *)(void *)(cu->cu_mcallc +
  902                     3 * BYTES_PER_XDR_UNIT));
  903                 break;
  904 
  905         case CLSET_PROG:
  906                 *(uint32_t *)(void *)(cu->cu_mcallc + 3 * BYTES_PER_XDR_UNIT)
  907                         = htonl(*(uint32_t *)info);
  908                 break;
  909         case CLSET_ASYNC:
  910                 cu->cu_async = *(int *)info;
  911                 break;
  912         case CLSET_CONNECT:
  913                 cu->cu_connect = *(int *)info;
  914                 break;
  915         case CLSET_WAITCHAN:
  916                 cu->cu_waitchan = (const char *)info;
  917                 break;
  918         case CLGET_WAITCHAN:
  919                 *(const char **) info = cu->cu_waitchan;
  920                 break;
  921         case CLSET_INTERRUPTIBLE:
  922                 if (*(int *) info)
  923                         cu->cu_waitflag = PCATCH;
  924                 else
  925                         cu->cu_waitflag = 0;
  926                 break;
  927         case CLGET_INTERRUPTIBLE:
  928                 if (cu->cu_waitflag)
  929                         *(int *) info = TRUE;
  930                 else
  931                         *(int *) info = FALSE;
  932                 break;
  933         default:
  934                 mtx_unlock(&cs->cs_lock);
  935                 return (FALSE);
  936         }
  937         mtx_unlock(&cs->cs_lock);
  938         return (TRUE);
  939 }
  940 
  941 static void
  942 clnt_dg_close(CLIENT *cl)
  943 {
  944         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  945         struct cu_socket *cs;
  946         struct cu_request *cr;
  947 
  948         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  949         mtx_lock(&cs->cs_lock);
  950 
  951         if (cu->cu_closed) {
  952                 mtx_unlock(&cs->cs_lock);
  953                 return;
  954         }
  955 
  956         if (cu->cu_closing) {
  957                 while (cu->cu_closing)
  958                         msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
  959                 KASSERT(cu->cu_closed, ("client should be closed"));
  960                 mtx_unlock(&cs->cs_lock);
  961                 return;
  962         }
  963 
  964         /*
  965          * Abort any pending requests and wait until everyone
  966          * has finished with clnt_vc_call.
  967          */
  968         cu->cu_closing = TRUE;
  969         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
  970                 if (cr->cr_client == cl) {
  971                         cr->cr_xid = 0;
  972                         cr->cr_error = ESHUTDOWN;
  973                         wakeup(cr);
  974                 }
  975         }
  976 
  977         while (cu->cu_threads)
  978                 msleep(cu, &cs->cs_lock, 0, "rpcclose", 0);
  979 
  980         cu->cu_closing = FALSE;
  981         cu->cu_closed = TRUE;
  982 
  983         mtx_unlock(&cs->cs_lock);
  984         wakeup(cu);
  985 }
  986 
  987 static void
  988 clnt_dg_destroy(CLIENT *cl)
  989 {
  990         struct cu_data *cu = (struct cu_data *)cl->cl_private;
  991         struct cu_socket *cs;
  992         struct socket *so = NULL;
  993         bool_t lastsocketref;
  994 
  995         cs = cu->cu_socket->so_rcv.sb_upcallarg;
  996         clnt_dg_close(cl);
  997 
  998         mtx_lock(&cs->cs_lock);
  999 
 1000         cs->cs_refs--;
 1001         if (cs->cs_refs == 0) {
 1002                 mtx_unlock(&cs->cs_lock);
 1003                 SOCKBUF_LOCK(&cu->cu_socket->so_rcv);
 1004                 soupcall_clear(cu->cu_socket, SO_RCV);
 1005                 clnt_dg_upcallsdone(cu->cu_socket, cs);
 1006                 SOCKBUF_UNLOCK(&cu->cu_socket->so_rcv);
 1007                 mtx_destroy(&cs->cs_lock);
 1008                 mem_free(cs, sizeof(*cs));
 1009                 lastsocketref = TRUE;
 1010         } else {
 1011                 mtx_unlock(&cs->cs_lock);
 1012                 lastsocketref = FALSE;
 1013         }
 1014 
 1015         if (cu->cu_closeit && lastsocketref) {
 1016                 so = cu->cu_socket;
 1017                 cu->cu_socket = NULL;
 1018         }
 1019 
 1020         if (so)
 1021                 soclose(so);
 1022 
 1023         if (cl->cl_netid && cl->cl_netid[0])
 1024                 mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
 1025         if (cl->cl_tp && cl->cl_tp[0])
 1026                 mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
 1027         mem_free(cu, sizeof (*cu));
 1028         mem_free(cl, sizeof (CLIENT));
 1029 }
 1030 
 1031 /*
 1032  * Make sure that the time is not garbage.  -1 value is allowed.
 1033  */
 1034 static bool_t
 1035 time_not_ok(struct timeval *t)
 1036 {
 1037         return (t->tv_sec < -1 || t->tv_sec > 100000000 ||
 1038                 t->tv_usec < -1 || t->tv_usec > 1000000);
 1039 }
 1040 
 1041 int
 1042 clnt_dg_soupcall(struct socket *so, void *arg, int waitflag)
 1043 {
 1044         struct cu_socket *cs = (struct cu_socket *) arg;
 1045         struct uio uio;
 1046         struct mbuf *m;
 1047         struct mbuf *control;
 1048         struct cu_request *cr;
 1049         int error, rcvflag, foundreq;
 1050         uint32_t xid;
 1051 
 1052         cs->cs_upcallrefs++;
 1053         uio.uio_resid = 1000000000;
 1054         uio.uio_td = curthread;
 1055         do {
 1056                 SOCKBUF_UNLOCK(&so->so_rcv);
 1057                 m = NULL;
 1058                 control = NULL;
 1059                 rcvflag = MSG_DONTWAIT;
 1060                 error = soreceive(so, NULL, &uio, &m, &control, &rcvflag);
 1061                 if (control)
 1062                         m_freem(control);
 1063                 SOCKBUF_LOCK(&so->so_rcv);
 1064 
 1065                 if (error == EWOULDBLOCK)
 1066                         break;
 1067 
 1068                 /*
 1069                  * If there was an error, wake up all pending
 1070                  * requests.
 1071                  */
 1072                 if (error) {
 1073                         mtx_lock(&cs->cs_lock);
 1074                         TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
 1075                                 cr->cr_xid = 0;
 1076                                 cr->cr_error = error;
 1077                                 wakeup(cr);
 1078                         }
 1079                         mtx_unlock(&cs->cs_lock);
 1080                         break;
 1081                 }
 1082 
 1083                 /*
 1084                  * The XID is in the first uint32_t of the reply.
 1085                  */
 1086                 if (m->m_len < sizeof(xid))
 1087                         m = m_pullup(m, sizeof(xid));
 1088                 if (!m)
 1089                         /*
 1090                          * Should never happen.
 1091                          */
 1092                         continue;
 1093 
 1094                 xid = ntohl(*mtod(m, uint32_t *));
 1095 
 1096                 /*
 1097                  * Attempt to match this reply with a pending request.
 1098                  */
 1099                 mtx_lock(&cs->cs_lock);
 1100                 foundreq = 0;
 1101                 TAILQ_FOREACH(cr, &cs->cs_pending, cr_link) {
 1102                         if (cr->cr_xid == xid) {
 1103                                 /*
 1104                                  * This one matches. We leave the
 1105                                  * reply mbuf in cr->cr_mrep. Set the
 1106                                  * XID to zero so that we will ignore
 1107                                  * any duplicated replies that arrive
 1108                                  * before clnt_dg_call removes it from
 1109                                  * the queue.
 1110                                  */
 1111                                 cr->cr_xid = 0;
 1112                                 cr->cr_mrep = m;
 1113                                 cr->cr_error = 0;
 1114                                 foundreq = 1;
 1115                                 wakeup(cr);
 1116                                 break;
 1117                         }
 1118                 }
 1119                 mtx_unlock(&cs->cs_lock);
 1120 
 1121                 /*
 1122                  * If we didn't find the matching request, just drop
 1123                  * it - its probably a repeated reply.
 1124                  */
 1125                 if (!foundreq)
 1126                         m_freem(m);
 1127         } while (m);
 1128         cs->cs_upcallrefs--;
 1129         if (cs->cs_upcallrefs < 0)
 1130                 panic("rpcdg upcall refcnt");
 1131         if (cs->cs_upcallrefs == 0)
 1132                 wakeup(&cs->cs_upcallrefs);
 1133         return (SU_OK);
 1134 }
 1135 
 1136 /*
 1137  * Wait for all upcalls in progress to complete.
 1138  */
 1139 static void
 1140 clnt_dg_upcallsdone(struct socket *so, struct cu_socket *cs)
 1141 {
 1142 
 1143         SOCKBUF_LOCK_ASSERT(&so->so_rcv);
 1144 
 1145         while (cs->cs_upcallrefs > 0)
 1146                 (void) msleep(&cs->cs_upcallrefs, SOCKBUF_MTX(&so->so_rcv), 0,
 1147                     "rpcdgup", 0);
 1148 }

Cache object: 84b17017530357567f4b449136d9ea43


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.