The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc_vc.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*      $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $  */
    2 
    3 /*-
    4  * Copyright (c) 2009, Sun Microsystems, Inc.
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without 
    8  * modification, are permitted provided that the following conditions are met:
    9  * - Redistributions of source code must retain the above copyright notice, 
   10  *   this list of conditions and the following disclaimer.
   11  * - Redistributions in binary form must reproduce the above copyright notice, 
   12  *   this list of conditions and the following disclaimer in the documentation 
   13  *   and/or other materials provided with the distribution.
   14  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
   15  *   contributors may be used to endorse or promote products derived 
   16  *   from this software without specific prior written permission.
   17  * 
   18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
   19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
   20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
   21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
   22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
   23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
   24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
   25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
   27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
   28  * POSSIBILITY OF SUCH DAMAGE.
   29  */
   30 
   31 #if defined(LIBC_SCCS) && !defined(lint)
   32 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
   33 static char *sccsid = "@(#)svc_tcp.c    2.2 88/08/01 4.0 RPCSRC";
   34 #endif
   35 #include <sys/cdefs.h>
   36 __FBSDID("$FreeBSD: releng/10.3/sys/rpc/svc_vc.c 287338 2015-09-01 01:01:35Z delphij $");
   37 
   38 /*
   39  * svc_vc.c, Server side for Connection Oriented based RPC. 
   40  *
   41  * Actually implements two flavors of transporter -
   42  * a tcp rendezvouser (a listener and connection establisher)
   43  * and a record/tcp stream.
   44  */
   45 
   46 #include <sys/param.h>
   47 #include <sys/lock.h>
   48 #include <sys/kernel.h>
   49 #include <sys/malloc.h>
   50 #include <sys/mbuf.h>
   51 #include <sys/mutex.h>
   52 #include <sys/proc.h>
   53 #include <sys/protosw.h>
   54 #include <sys/queue.h>
   55 #include <sys/socket.h>
   56 #include <sys/socketvar.h>
   57 #include <sys/sx.h>
   58 #include <sys/systm.h>
   59 #include <sys/uio.h>
   60 
   61 #include <net/vnet.h>
   62 
   63 #include <netinet/tcp.h>
   64 
   65 #include <rpc/rpc.h>
   66 
   67 #include <rpc/krpc.h>
   68 #include <rpc/rpc_com.h>
   69 
   70 #include <security/mac/mac_framework.h>
   71 
   72 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
   73     struct sockaddr **, struct mbuf **);
   74 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
   75 static void svc_vc_rendezvous_destroy(SVCXPRT *);
   76 static bool_t svc_vc_null(void);
   77 static void svc_vc_destroy(SVCXPRT *);
   78 static enum xprt_stat svc_vc_stat(SVCXPRT *);
   79 static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
   80 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
   81     struct sockaddr **, struct mbuf **);
   82 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
   83     struct sockaddr *, struct mbuf *, uint32_t *seq);
   84 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
   85 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
   86     void *in);
   87 static void svc_vc_backchannel_destroy(SVCXPRT *);
   88 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
   89 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
   90     struct sockaddr **, struct mbuf **);
   91 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
   92     struct sockaddr *, struct mbuf *, uint32_t *);
   93 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
   94     void *in);
   95 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
   96     struct sockaddr *raddr);
   97 static int svc_vc_accept(struct socket *head, struct socket **sop);
   98 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
   99 
  100 static struct xp_ops svc_vc_rendezvous_ops = {
  101         .xp_recv =      svc_vc_rendezvous_recv,
  102         .xp_stat =      svc_vc_rendezvous_stat,
  103         .xp_reply =     (bool_t (*)(SVCXPRT *, struct rpc_msg *,
  104                 struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
  105         .xp_destroy =   svc_vc_rendezvous_destroy,
  106         .xp_control =   svc_vc_rendezvous_control
  107 };
  108 
  109 static struct xp_ops svc_vc_ops = {
  110         .xp_recv =      svc_vc_recv,
  111         .xp_stat =      svc_vc_stat,
  112         .xp_ack =       svc_vc_ack,
  113         .xp_reply =     svc_vc_reply,
  114         .xp_destroy =   svc_vc_destroy,
  115         .xp_control =   svc_vc_control
  116 };
  117 
  118 static struct xp_ops svc_vc_backchannel_ops = {
  119         .xp_recv =      svc_vc_backchannel_recv,
  120         .xp_stat =      svc_vc_backchannel_stat,
  121         .xp_reply =     svc_vc_backchannel_reply,
  122         .xp_destroy =   svc_vc_backchannel_destroy,
  123         .xp_control =   svc_vc_backchannel_control
  124 };
  125 
  126 /*
  127  * Usage:
  128  *      xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
  129  *
  130  * Creates, registers, and returns a (rpc) tcp based transporter.
  131  * Once *xprt is initialized, it is registered as a transporter
  132  * see (svc.h, xprt_register).  This routine returns
  133  * a NULL if a problem occurred.
  134  *
  135  * The filedescriptor passed in is expected to refer to a bound, but
  136  * not yet connected socket.
  137  *
  138  * Since streams do buffered io similar to stdio, the caller can specify
  139  * how big the send and receive buffers are via the second and third parms;
  140  * 0 => use the system default.
  141  */
  142 SVCXPRT *
  143 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
  144     size_t recvsize)
  145 {
  146         SVCXPRT *xprt;
  147         struct sockaddr* sa;
  148         int error;
  149 
  150         SOCK_LOCK(so);
  151         if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) {
  152                 SOCK_UNLOCK(so);
  153                 CURVNET_SET(so->so_vnet);
  154                 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
  155                 CURVNET_RESTORE();
  156                 if (error)
  157                         return (NULL);
  158                 xprt = svc_vc_create_conn(pool, so, sa);
  159                 free(sa, M_SONAME);
  160                 return (xprt);
  161         }
  162         SOCK_UNLOCK(so);
  163 
  164         xprt = svc_xprt_alloc();
  165         sx_init(&xprt->xp_lock, "xprt->xp_lock");
  166         xprt->xp_pool = pool;
  167         xprt->xp_socket = so;
  168         xprt->xp_p1 = NULL;
  169         xprt->xp_p2 = NULL;
  170         xprt->xp_ops = &svc_vc_rendezvous_ops;
  171 
  172         CURVNET_SET(so->so_vnet);
  173         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
  174         CURVNET_RESTORE();
  175         if (error) {
  176                 goto cleanup_svc_vc_create;
  177         }
  178 
  179         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
  180         free(sa, M_SONAME);
  181 
  182         xprt_register(xprt);
  183 
  184         solisten(so, -1, curthread);
  185 
  186         SOCKBUF_LOCK(&so->so_rcv);
  187         xprt->xp_upcallset = 1;
  188         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
  189         SOCKBUF_UNLOCK(&so->so_rcv);
  190 
  191         return (xprt);
  192 cleanup_svc_vc_create:
  193         if (xprt) {
  194                 sx_destroy(&xprt->xp_lock);
  195                 svc_xprt_free(xprt);
  196         }
  197         return (NULL);
  198 }
  199 
  200 /*
  201  * Create a new transport for a socket optained via soaccept().
  202  */
  203 SVCXPRT *
  204 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
  205 {
  206         SVCXPRT *xprt = NULL;
  207         struct cf_conn *cd = NULL;
  208         struct sockaddr* sa = NULL;
  209         struct sockopt opt;
  210         int one = 1;
  211         int error;
  212 
  213         bzero(&opt, sizeof(struct sockopt));
  214         opt.sopt_dir = SOPT_SET;
  215         opt.sopt_level = SOL_SOCKET;
  216         opt.sopt_name = SO_KEEPALIVE;
  217         opt.sopt_val = &one;
  218         opt.sopt_valsize = sizeof(one);
  219         error = sosetopt(so, &opt);
  220         if (error) {
  221                 return (NULL);
  222         }
  223 
  224         if (so->so_proto->pr_protocol == IPPROTO_TCP) {
  225                 bzero(&opt, sizeof(struct sockopt));
  226                 opt.sopt_dir = SOPT_SET;
  227                 opt.sopt_level = IPPROTO_TCP;
  228                 opt.sopt_name = TCP_NODELAY;
  229                 opt.sopt_val = &one;
  230                 opt.sopt_valsize = sizeof(one);
  231                 error = sosetopt(so, &opt);
  232                 if (error) {
  233                         return (NULL);
  234                 }
  235         }
  236 
  237         cd = mem_alloc(sizeof(*cd));
  238         cd->strm_stat = XPRT_IDLE;
  239 
  240         xprt = svc_xprt_alloc();
  241         sx_init(&xprt->xp_lock, "xprt->xp_lock");
  242         xprt->xp_pool = pool;
  243         xprt->xp_socket = so;
  244         xprt->xp_p1 = cd;
  245         xprt->xp_p2 = NULL;
  246         xprt->xp_ops = &svc_vc_ops;
  247 
  248         /*
  249          * See http://www.connectathon.org/talks96/nfstcp.pdf - client
  250          * has a 5 minute timer, server has a 6 minute timer.
  251          */
  252         xprt->xp_idletimeout = 6 * 60;
  253 
  254         memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);
  255 
  256         CURVNET_SET(so->so_vnet);
  257         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
  258         CURVNET_RESTORE();
  259         if (error)
  260                 goto cleanup_svc_vc_create;
  261 
  262         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
  263         free(sa, M_SONAME);
  264 
  265         xprt_register(xprt);
  266 
  267         SOCKBUF_LOCK(&so->so_rcv);
  268         xprt->xp_upcallset = 1;
  269         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
  270         SOCKBUF_UNLOCK(&so->so_rcv);
  271 
  272         /*
  273          * Throw the transport into the active list in case it already
  274          * has some data buffered.
  275          */
  276         sx_xlock(&xprt->xp_lock);
  277         xprt_active(xprt);
  278         sx_xunlock(&xprt->xp_lock);
  279 
  280         return (xprt);
  281 cleanup_svc_vc_create:
  282         if (xprt) {
  283                 sx_destroy(&xprt->xp_lock);
  284                 svc_xprt_free(xprt);
  285         }
  286         if (cd)
  287                 mem_free(cd, sizeof(*cd));
  288         return (NULL);
  289 }
  290 
  291 /*
  292  * Create a new transport for a backchannel on a clnt_vc socket.
  293  */
  294 SVCXPRT *
  295 svc_vc_create_backchannel(SVCPOOL *pool)
  296 {
  297         SVCXPRT *xprt = NULL;
  298         struct cf_conn *cd = NULL;
  299 
  300         cd = mem_alloc(sizeof(*cd));
  301         cd->strm_stat = XPRT_IDLE;
  302 
  303         xprt = svc_xprt_alloc();
  304         sx_init(&xprt->xp_lock, "xprt->xp_lock");
  305         xprt->xp_pool = pool;
  306         xprt->xp_socket = NULL;
  307         xprt->xp_p1 = cd;
  308         xprt->xp_p2 = NULL;
  309         xprt->xp_ops = &svc_vc_backchannel_ops;
  310         return (xprt);
  311 }
  312 
  313 /*
  314  * This does all of the accept except the final call to soaccept. The
  315  * caller will call soaccept after dropping its locks (soaccept may
  316  * call malloc).
  317  */
  318 int
  319 svc_vc_accept(struct socket *head, struct socket **sop)
  320 {
  321         int error = 0;
  322         struct socket *so;
  323 
  324         if ((head->so_options & SO_ACCEPTCONN) == 0) {
  325                 error = EINVAL;
  326                 goto done;
  327         }
  328 #ifdef MAC
  329         error = mac_socket_check_accept(curthread->td_ucred, head);
  330         if (error != 0)
  331                 goto done;
  332 #endif
  333         ACCEPT_LOCK();
  334         if (TAILQ_EMPTY(&head->so_comp)) {
  335                 ACCEPT_UNLOCK();
  336                 error = EWOULDBLOCK;
  337                 goto done;
  338         }
  339         so = TAILQ_FIRST(&head->so_comp);
  340         KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
  341         KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));
  342 
  343         /*
  344          * Before changing the flags on the socket, we have to bump the
  345          * reference count.  Otherwise, if the protocol calls sofree(),
  346          * the socket will be released due to a zero refcount.
  347          * XXX might not need soref() since this is simpler than kern_accept.
  348          */
  349         SOCK_LOCK(so);                  /* soref() and so_state update */
  350         soref(so);                      /* file descriptor reference */
  351 
  352         TAILQ_REMOVE(&head->so_comp, so, so_list);
  353         head->so_qlen--;
  354         so->so_state |= (head->so_state & SS_NBIO);
  355         so->so_qstate &= ~SQ_COMP;
  356         so->so_head = NULL;
  357 
  358         SOCK_UNLOCK(so);
  359         ACCEPT_UNLOCK();
  360 
  361         *sop = so;
  362 
  363         /* connection has been removed from the listen queue */
  364         KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
  365 done:
  366         return (error);
  367 }
  368 
  369 /*ARGSUSED*/
  370 static bool_t
  371 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
  372     struct sockaddr **addrp, struct mbuf **mp)
  373 {
  374         struct socket *so = NULL;
  375         struct sockaddr *sa = NULL;
  376         int error;
  377         SVCXPRT *new_xprt;
  378 
  379         /*
  380          * The socket upcall calls xprt_active() which will eventually
  381          * cause the server to call us here. We attempt to accept a
  382          * connection from the socket and turn it into a new
  383          * transport. If the accept fails, we have drained all pending
  384          * connections so we call xprt_inactive().
  385          */
  386         sx_xlock(&xprt->xp_lock);
  387 
  388         error = svc_vc_accept(xprt->xp_socket, &so);
  389 
  390         if (error == EWOULDBLOCK) {
  391                 /*
  392                  * We must re-test for new connections after taking
  393                  * the lock to protect us in the case where a new
  394                  * connection arrives after our call to accept fails
  395                  * with EWOULDBLOCK.
  396                  */
  397                 ACCEPT_LOCK();
  398                 if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
  399                         xprt_inactive_self(xprt);
  400                 ACCEPT_UNLOCK();
  401                 sx_xunlock(&xprt->xp_lock);
  402                 return (FALSE);
  403         }
  404 
  405         if (error) {
  406                 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
  407                 if (xprt->xp_upcallset) {
  408                         xprt->xp_upcallset = 0;
  409                         soupcall_clear(xprt->xp_socket, SO_RCV);
  410                 }
  411                 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
  412                 xprt_inactive_self(xprt);
  413                 sx_xunlock(&xprt->xp_lock);
  414                 return (FALSE);
  415         }
  416 
  417         sx_xunlock(&xprt->xp_lock);
  418 
  419         sa = 0;
  420         error = soaccept(so, &sa);
  421 
  422         if (error) {
  423                 /*
  424                  * XXX not sure if I need to call sofree or soclose here.
  425                  */
  426                 if (sa)
  427                         free(sa, M_SONAME);
  428                 return (FALSE);
  429         }
  430 
  431         /*
  432          * svc_vc_create_conn will call xprt_register - we don't need
  433          * to do anything with the new connection except derefence it.
  434          */
  435         new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
  436         if (!new_xprt) {
  437                 soclose(so);
  438         } else {
  439                 SVC_RELEASE(new_xprt);
  440         }
  441 
  442         free(sa, M_SONAME);
  443 
  444         return (FALSE); /* there is never an rpc msg to be processed */
  445 }
  446 
  447 /*ARGSUSED*/
  448 static enum xprt_stat
  449 svc_vc_rendezvous_stat(SVCXPRT *xprt)
  450 {
  451 
  452         return (XPRT_IDLE);
  453 }
  454 
  455 static void
  456 svc_vc_destroy_common(SVCXPRT *xprt)
  457 {
  458         SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
  459         if (xprt->xp_upcallset) {
  460                 xprt->xp_upcallset = 0;
  461                 soupcall_clear(xprt->xp_socket, SO_RCV);
  462         }
  463         SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
  464 
  465         if (xprt->xp_socket)
  466                 (void)soclose(xprt->xp_socket);
  467 
  468         if (xprt->xp_netid)
  469                 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
  470         svc_xprt_free(xprt);
  471 }
  472 
  473 static void
  474 svc_vc_rendezvous_destroy(SVCXPRT *xprt)
  475 {
  476 
  477         svc_vc_destroy_common(xprt);
  478 }
  479 
  480 static void
  481 svc_vc_destroy(SVCXPRT *xprt)
  482 {
  483         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
  484 
  485         svc_vc_destroy_common(xprt);
  486 
  487         if (cd->mreq)
  488                 m_freem(cd->mreq);
  489         if (cd->mpending)
  490                 m_freem(cd->mpending);
  491         mem_free(cd, sizeof(*cd));
  492 }
  493 
  494 static void
  495 svc_vc_backchannel_destroy(SVCXPRT *xprt)
  496 {
  497         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
  498         struct mbuf *m, *m2;
  499 
  500         svc_xprt_free(xprt);
  501         m = cd->mreq;
  502         while (m != NULL) {
  503                 m2 = m;
  504                 m = m->m_nextpkt;
  505                 m_freem(m2);
  506         }
  507         mem_free(cd, sizeof(*cd));
  508 }
  509 
  510 /*ARGSUSED*/
  511 static bool_t
  512 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
  513 {
  514         return (FALSE);
  515 }
  516 
  517 static bool_t
  518 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
  519 {
  520 
  521         return (FALSE);
  522 }
  523 
  524 static bool_t
  525 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
  526 {
  527 
  528         return (FALSE);
  529 }
  530 
  531 static enum xprt_stat
  532 svc_vc_stat(SVCXPRT *xprt)
  533 {
  534         struct cf_conn *cd;
  535 
  536         cd = (struct cf_conn *)(xprt->xp_p1);
  537 
  538         if (cd->strm_stat == XPRT_DIED)
  539                 return (XPRT_DIED);
  540 
  541         if (cd->mreq != NULL && cd->resid == 0 && cd->eor)
  542                 return (XPRT_MOREREQS);
  543 
  544         if (soreadable(xprt->xp_socket))
  545                 return (XPRT_MOREREQS);
  546 
  547         return (XPRT_IDLE);
  548 }
  549 
  550 static bool_t
  551 svc_vc_ack(SVCXPRT *xprt, uint32_t *ack)
  552 {
  553 
  554         *ack = atomic_load_acq_32(&xprt->xp_snt_cnt);
  555         *ack -= xprt->xp_socket->so_snd.sb_cc;
  556         return (TRUE);
  557 }
  558 
  559 static enum xprt_stat
  560 svc_vc_backchannel_stat(SVCXPRT *xprt)
  561 {
  562         struct cf_conn *cd;
  563 
  564         cd = (struct cf_conn *)(xprt->xp_p1);
  565 
  566         if (cd->mreq != NULL)
  567                 return (XPRT_MOREREQS);
  568 
  569         return (XPRT_IDLE);
  570 }
  571 
  572 /*
  573  * If we have an mbuf chain in cd->mpending, try to parse a record from it,
  574  * leaving the result in cd->mreq. If we don't have a complete record, leave
  575  * the partial result in cd->mreq and try to read more from the socket.
  576  */
  577 static int
  578 svc_vc_process_pending(SVCXPRT *xprt)
  579 {
  580         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
  581         struct socket *so = xprt->xp_socket;
  582         struct mbuf *m;
  583 
  584         /*
  585          * If cd->resid is non-zero, we have part of the
  586          * record already, otherwise we are expecting a record
  587          * marker.
  588          */
  589         if (!cd->resid && cd->mpending) {
  590                 /*
  591                  * See if there is enough data buffered to
  592                  * make up a record marker. Make sure we can
  593                  * handle the case where the record marker is
  594                  * split across more than one mbuf.
  595                  */
  596                 size_t n = 0;
  597                 uint32_t header;
  598 
  599                 m = cd->mpending;
  600                 while (n < sizeof(uint32_t) && m) {
  601                         n += m->m_len;
  602                         m = m->m_next;
  603                 }
  604                 if (n < sizeof(uint32_t)) {
  605                         so->so_rcv.sb_lowat = sizeof(uint32_t) - n;
  606                         return (FALSE);
  607                 }
  608                 m_copydata(cd->mpending, 0, sizeof(header),
  609                     (char *)&header);
  610                 header = ntohl(header);
  611                 cd->eor = (header & 0x80000000) != 0;
  612                 cd->resid = header & 0x7fffffff;
  613                 m_adj(cd->mpending, sizeof(uint32_t));
  614         }
  615 
  616         /*
  617          * Start pulling off mbufs from cd->mpending
  618          * until we either have a complete record or
  619          * we run out of data. We use m_split to pull
  620          * data - it will pull as much as possible and
  621          * split the last mbuf if necessary.
  622          */
  623         while (cd->mpending && cd->resid) {
  624                 m = cd->mpending;
  625                 if (cd->mpending->m_next
  626                     || cd->mpending->m_len > cd->resid)
  627                         cd->mpending = m_split(cd->mpending,
  628                             cd->resid, M_WAITOK);
  629                 else
  630                         cd->mpending = NULL;
  631                 if (cd->mreq)
  632                         m_last(cd->mreq)->m_next = m;
  633                 else
  634                         cd->mreq = m;
  635                 while (m) {
  636                         cd->resid -= m->m_len;
  637                         m = m->m_next;
  638                 }
  639         }
  640 
  641         /*
  642          * Block receive upcalls if we have more data pending,
  643          * otherwise report our need.
  644          */
  645         if (cd->mpending)
  646                 so->so_rcv.sb_lowat = INT_MAX;
  647         else
  648                 so->so_rcv.sb_lowat =
  649                     imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2));
  650         return (TRUE);
  651 }
  652 
  653 static bool_t
  654 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
  655     struct sockaddr **addrp, struct mbuf **mp)
  656 {
              /*
               * Receive method for a connected transport.
               *
               * Holding xp_lock, this loops over three steps: parse
               * buffered data with svc_vc_process_pending(); if a
               * complete record is assembled, dispatch it; otherwise do a
               * non-blocking soreceive() and append the result to
               * cd->mpending.
               *
               * A complete record whose second word is REPLY is passed to
               * clnt_bck_svccall() when xp_p2 is set.  Any other record is
               * decoded with xdr_callmsg() into msg.
               *
               * Returns TRUE with *addrp set to NULL and *mp set to the
               * result of xdrmbuf_getall() when a call message was decoded.
               * Returns FALSE when the socket would block, on a receive
               * error or EOF (both mark the stream XPRT_DIED), or when
               * xdr_callmsg() fails.
               */
  657         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
  658         struct uio uio;
  659         struct mbuf *m;
  660         struct socket* so = xprt->xp_socket;
  661         XDR xdrs;
  662         int error, rcvflag;
  663         uint32_t xid_plus_direction[2];
  664 
  665         /*
  666          * Serialise access to the socket and our own record parsing
  667          * state.
  668          */
  669         sx_xlock(&xprt->xp_lock);
  670 
  671         for (;;) {
  672                 /* If we have no request ready, check pending queue. */
  673                 while (cd->mpending &&
  674                     (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) {
  675                         if (!svc_vc_process_pending(xprt))
  676                                 break;
  677                 }
  678 
  679                 /* Process and return complete request in cd->mreq. */
  680                 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) {
  681 
  682                         /*
  683                          * Now, check for a backchannel reply.
  684                          * The XID is in the first uint32_t of the reply
  685                          * and the message direction is the second one.
  686                          */
  687                         if ((cd->mreq->m_len >= sizeof(xid_plus_direction) ||
  688                             m_length(cd->mreq, NULL) >=
  689                             sizeof(xid_plus_direction)) &&
  690                             xprt->xp_p2 != NULL) {
  691                                 m_copydata(cd->mreq, 0,
  692                                     sizeof(xid_plus_direction),
  693                                     (char *)xid_plus_direction);
  694                                 xid_plus_direction[0] =
  695                                     ntohl(xid_plus_direction[0]);
  696                                 xid_plus_direction[1] =
  697                                     ntohl(xid_plus_direction[1]);
  698                                 /* Check message direction. */
  699                                 if (xid_plus_direction[1] == REPLY) {
                                              /*
                                               * NOTE(review): presumably
                                               * clnt_bck_svccall() takes over
                                               * the mbuf chain - confirm.  mreq
                                               * is cleared and the loop restarts
                                               * with the lock still held.
                                               */
  700                                         clnt_bck_svccall(xprt->xp_p2,
  701                                             cd->mreq,
  702                                             xid_plus_direction[0]);
  703                                         cd->mreq = NULL;
  704                                         continue;
  705                                 }
  706                         }
  707 
                              /*
                               * Wrap the record in an XDR decode stream and
                               * clear mreq so the next record can be
                               * assembled while this one is decoded.
                               */
  708                         xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
  709                         cd->mreq = NULL;
  710 
  711                         /* Check for next request in a pending queue. */
  712                         svc_vc_process_pending(xprt);
  713                         if (cd->mreq == NULL || cd->resid != 0) {
  714                                 SOCKBUF_LOCK(&so->so_rcv);
  715                                 if (!soreadable(so))
  716                                         xprt_inactive_self(xprt);
  717                                 SOCKBUF_UNLOCK(&so->so_rcv);
  718                         }
  719 
                              /* Decoding is done with xp_lock released. */
  720                         sx_xunlock(&xprt->xp_lock);
  721 
  722                         if (! xdr_callmsg(&xdrs, msg)) {
  723                                 XDR_DESTROY(&xdrs);
  724                                 return (FALSE);
  725                         }
  726 
  727                         *addrp = NULL;
  728                         *mp = xdrmbuf_getall(&xdrs);
  729                         XDR_DESTROY(&xdrs);
  730 
  731                         return (TRUE);
  732                 }
  733 
  734                 /*
  735                  * The socket upcall calls xprt_active() which will eventually
  736                  * cause the server to call us here. We attempt to
  737                  * read as much as possible from the socket and put
  738                  * the result in cd->mpending. If the read fails,
  739                  * we have drained both cd->mpending and the socket so
  740                  * we can call xprt_inactive().
  741                  */
                      /*
                       * Only uio_resid and uio_td are set before the call;
                       * the data comes back as an mbuf chain in m.
                       * NOTE(review): confirm soreceive() needs no other
                       * uio fields in this mode.
                       */
  742                 uio.uio_resid = 1000000000;
  743                 uio.uio_td = curthread;
  744                 m = NULL;
  745                 rcvflag = MSG_DONTWAIT;
  746                 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
  747 
  748                 if (error == EWOULDBLOCK) {
  749                         /*
  750                          * We must re-test for readability after
  751                          * taking the lock to protect us in the case
  752                          * where a new packet arrives on the socket
  753                          * after our call to soreceive fails with
  754                          * EWOULDBLOCK.
  755                          */
  756                         SOCKBUF_LOCK(&so->so_rcv);
  757                         if (!soreadable(so))
  758                                 xprt_inactive_self(xprt);
  759                         SOCKBUF_UNLOCK(&so->so_rcv);
  760                         sx_xunlock(&xprt->xp_lock);
  761                         return (FALSE);
  762                 }
  763 
  764                 if (error) {
                              /*
                               * Any other receive error: drop the upcall, go
                               * inactive and mark the stream dead.
                               */
  765                         SOCKBUF_LOCK(&so->so_rcv);
  766                         if (xprt->xp_upcallset) {
  767                                 xprt->xp_upcallset = 0;
  768                                 soupcall_clear(so, SO_RCV);
  769                         }
  770                         SOCKBUF_UNLOCK(&so->so_rcv);
  771                         xprt_inactive_self(xprt);
  772                         cd->strm_stat = XPRT_DIED;
  773                         sx_xunlock(&xprt->xp_lock);
  774                         return (FALSE);
  775                 }
  776 
  777                 if (!m) {
  778                         /*
  779                          * EOF - the other end has closed the socket.
  780                          */
  781                         xprt_inactive_self(xprt);
  782                         cd->strm_stat = XPRT_DIED;
  783                         sx_xunlock(&xprt->xp_lock);
  784                         return (FALSE);
  785                 }
  786 
                      /* Queue the new data behind whatever is still pending. */
  787                 if (cd->mpending)
  788                         m_last(cd->mpending)->m_next = m;
  789                 else
  790                         cd->mpending = m;
  791         }
  792 }
  793 
  794 static bool_t
  795 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
  796     struct sockaddr **addrp, struct mbuf **mp)
  797 {
  798         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
  799         struct ct_data *ct;
  800         struct mbuf *m;
  801         XDR xdrs;
  802 
  803         sx_xlock(&xprt->xp_lock);
  804         ct = (struct ct_data *)xprt->xp_p2;
  805         if (ct == NULL) {
  806                 sx_xunlock(&xprt->xp_lock);
  807                 return (FALSE);
  808         }
  809         mtx_lock(&ct->ct_lock);
  810         m = cd->mreq;
  811         if (m == NULL) {
  812                 xprt_inactive_self(xprt);
  813                 mtx_unlock(&ct->ct_lock);
  814                 sx_xunlock(&xprt->xp_lock);
  815                 return (FALSE);
  816         }
  817         cd->mreq = m->m_nextpkt;
  818         mtx_unlock(&ct->ct_lock);
  819         sx_xunlock(&xprt->xp_lock);
  820 
  821         xdrmbuf_create(&xdrs, m, XDR_DECODE);
  822         if (! xdr_callmsg(&xdrs, msg)) {
  823                 XDR_DESTROY(&xdrs);
  824                 return (FALSE);
  825         }
  826         *addrp = NULL;
  827         *mp = xdrmbuf_getall(&xdrs);
  828         XDR_DESTROY(&xdrs);
  829         return (TRUE);
  830 }
  831 
  832 static bool_t
  833 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
  834     struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
  835 {
  836         XDR xdrs;
  837         struct mbuf *mrep;
  838         bool_t stat = TRUE;
  839         int error, len;
  840 
  841         /*
  842          * Leave space for record mark.
  843          */
  844         mrep = m_gethdr(M_WAITOK, MT_DATA);
  845         mrep->m_data += sizeof(uint32_t);
  846 
  847         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
  848 
  849         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
  850             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
  851                 if (!xdr_replymsg(&xdrs, msg))
  852                         stat = FALSE;
  853                 else
  854                         xdrmbuf_append(&xdrs, m);
  855         } else {
  856                 stat = xdr_replymsg(&xdrs, msg);
  857         }
  858 
  859         if (stat) {
  860                 m_fixhdr(mrep);
  861 
  862                 /*
  863                  * Prepend a record marker containing the reply length.
  864                  */
  865                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
  866                 len = mrep->m_pkthdr.len;
  867                 *mtod(mrep, uint32_t *) =
  868                         htonl(0x80000000 | (len - sizeof(uint32_t)));
  869                 atomic_add_acq_32(&xprt->xp_snd_cnt, len);
  870                 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
  871                     0, curthread);
  872                 if (!error) {
  873                         atomic_add_rel_32(&xprt->xp_snt_cnt, len);
  874                         if (seq)
  875                                 *seq = xprt->xp_snd_cnt;
  876                         stat = TRUE;
  877                 } else
  878                         atomic_subtract_32(&xprt->xp_snd_cnt, len);
  879         } else {
  880                 m_freem(mrep);
  881         }
  882 
  883         XDR_DESTROY(&xdrs);
  884 
  885         return (stat);
  886 }
  887 
  888 static bool_t
  889 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
  890     struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
  891 {
  892         struct ct_data *ct;
  893         XDR xdrs;
  894         struct mbuf *mrep;
  895         bool_t stat = TRUE;
  896         int error;
  897 
  898         /*
  899          * Leave space for record mark.
  900          */
  901         mrep = m_gethdr(M_WAITOK, MT_DATA);
  902         mrep->m_data += sizeof(uint32_t);
  903 
  904         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
  905 
  906         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
  907             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
  908                 if (!xdr_replymsg(&xdrs, msg))
  909                         stat = FALSE;
  910                 else
  911                         xdrmbuf_append(&xdrs, m);
  912         } else {
  913                 stat = xdr_replymsg(&xdrs, msg);
  914         }
  915 
  916         if (stat) {
  917                 m_fixhdr(mrep);
  918 
  919                 /*
  920                  * Prepend a record marker containing the reply length.
  921                  */
  922                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
  923                 *mtod(mrep, uint32_t *) =
  924                         htonl(0x80000000 | (mrep->m_pkthdr.len
  925                                 - sizeof(uint32_t)));
  926                 sx_xlock(&xprt->xp_lock);
  927                 ct = (struct ct_data *)xprt->xp_p2;
  928                 if (ct != NULL)
  929                         error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
  930                             0, curthread);
  931                 else
  932                         error = EPIPE;
  933                 sx_xunlock(&xprt->xp_lock);
  934                 if (!error) {
  935                         stat = TRUE;
  936                 }
  937         } else {
  938                 m_freem(mrep);
  939         }
  940 
  941         XDR_DESTROY(&xdrs);
  942 
  943         return (stat);
  944 }
  945 
  946 static bool_t
  947 svc_vc_null()
  948 {
  949 
  950         return (FALSE);
  951 }
  952 
  953 static int
  954 svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
  955 {
  956         SVCXPRT *xprt = (SVCXPRT *) arg;
  957 
  958         if (soreadable(xprt->xp_socket))
  959                 xprt_active(xprt);
  960         return (SU_OK);
  961 }
  962 
  963 #if 0
  964 /*
  965  * Get the effective UID of the sending process. Used by rpcbind, keyserv
  966  * and rpc.yppasswdd on AF_LOCAL.
  967  */
  968 int
  969 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
  970         int sock, ret;
  971         gid_t egid;
  972         uid_t euid;
  973         struct sockaddr *sa;
  974 
  975         sock = transp->xp_fd;
  976         sa = (struct sockaddr *)transp->xp_rtaddr;
  977         if (sa->sa_family == AF_LOCAL) {
  978                 ret = getpeereid(sock, &euid, &egid);
  979                 if (ret == 0)
  980                         *uid = euid;
  981                 return (ret);
  982         } else
  983                 return (-1);
  984 }
  985 #endif

Cache object: f30b1e25ee757c8ab72bac026108f744


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.