The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/rpc/svc_vc.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*      $NetBSD: svc_vc.c,v 1.7 2000/08/03 00:01:53 fvdl Exp $  */
    2 
    3 /*-
    4  * Copyright (c) 2009, Sun Microsystems, Inc.
    5  * All rights reserved.
    6  *
    7  * Redistribution and use in source and binary forms, with or without 
    8  * modification, are permitted provided that the following conditions are met:
    9  * - Redistributions of source code must retain the above copyright notice, 
   10  *   this list of conditions and the following disclaimer.
   11  * - Redistributions in binary form must reproduce the above copyright notice, 
   12  *   this list of conditions and the following disclaimer in the documentation 
   13  *   and/or other materials provided with the distribution.
   14  * - Neither the name of Sun Microsystems, Inc. nor the names of its 
   15  *   contributors may be used to endorse or promote products derived 
   16  *   from this software without specific prior written permission.
   17  * 
   18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
   19  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
   20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
   21  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
   22  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
   23  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
   24  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
   25  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
   26  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
   27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
   28  * POSSIBILITY OF SUCH DAMAGE.
   29  */
   30 
   31 #if defined(LIBC_SCCS) && !defined(lint)
   32 static char *sccsid2 = "@(#)svc_tcp.c 1.21 87/08/11 Copyr 1984 Sun Micro";
   33 static char *sccsid = "@(#)svc_tcp.c    2.2 88/08/01 4.0 RPCSRC";
   34 #endif
   35 #include <sys/cdefs.h>
   36 __FBSDID("$FreeBSD: releng/10.2/sys/rpc/svc_vc.c 281520 2015-04-14 09:58:10Z mav $");
   37 
   38 /*
   39  * svc_vc.c, Server side for Connection Oriented based RPC. 
   40  *
   41  * Actually implements two flavors of transporter -
   42  * a tcp rendezvouser (a listener and connection establisher)
   43  * and a record/tcp stream.
   44  */
   45 
   46 #include <sys/param.h>
   47 #include <sys/lock.h>
   48 #include <sys/kernel.h>
   49 #include <sys/malloc.h>
   50 #include <sys/mbuf.h>
   51 #include <sys/mutex.h>
   52 #include <sys/proc.h>
   53 #include <sys/protosw.h>
   54 #include <sys/queue.h>
   55 #include <sys/socket.h>
   56 #include <sys/socketvar.h>
   57 #include <sys/sx.h>
   58 #include <sys/systm.h>
   59 #include <sys/uio.h>
   60 
   61 #include <net/vnet.h>
   62 
   63 #include <netinet/tcp.h>
   64 
   65 #include <rpc/rpc.h>
   66 
   67 #include <rpc/krpc.h>
   68 #include <rpc/rpc_com.h>
   69 
   70 #include <security/mac/mac_framework.h>
   71 
      /*
       * Forward declarations for the transport methods and helpers used by
       * this file.  They are grouped by transport flavor: rendezvous
       * (listening socket), connected stream, and backchannel.
       *
       * Note that svc_vc_create_conn() and svc_vc_accept() are declared
       * static here while their definitions further down omit the keyword.
       */
   72 static bool_t svc_vc_rendezvous_recv(SVCXPRT *, struct rpc_msg *,
   73     struct sockaddr **, struct mbuf **);
   74 static enum xprt_stat svc_vc_rendezvous_stat(SVCXPRT *);
   75 static void svc_vc_rendezvous_destroy(SVCXPRT *);
   76 static bool_t svc_vc_null(void);
   77 static void svc_vc_destroy(SVCXPRT *);
   78 static enum xprt_stat svc_vc_stat(SVCXPRT *);
   79 static bool_t svc_vc_ack(SVCXPRT *, uint32_t *);
   80 static bool_t svc_vc_recv(SVCXPRT *, struct rpc_msg *,
   81     struct sockaddr **, struct mbuf **);
   82 static bool_t svc_vc_reply(SVCXPRT *, struct rpc_msg *,
   83     struct sockaddr *, struct mbuf *, uint32_t *seq);
   84 static bool_t svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in);
   85 static bool_t svc_vc_rendezvous_control (SVCXPRT *xprt, const u_int rq,
   86     void *in);
      /* Backchannel transport methods. */
   87 static void svc_vc_backchannel_destroy(SVCXPRT *);
   88 static enum xprt_stat svc_vc_backchannel_stat(SVCXPRT *);
   89 static bool_t svc_vc_backchannel_recv(SVCXPRT *, struct rpc_msg *,
   90     struct sockaddr **, struct mbuf **);
   91 static bool_t svc_vc_backchannel_reply(SVCXPRT *, struct rpc_msg *,
   92     struct sockaddr *, struct mbuf *, uint32_t *);
   93 static bool_t svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq,
   94     void *in);
      /* Internal helpers: transport construction, accept, socket upcall. */
   95 static SVCXPRT *svc_vc_create_conn(SVCPOOL *pool, struct socket *so,
   96     struct sockaddr *raddr);
   97 static int svc_vc_accept(struct socket *head, struct socket **sop);
   98 static int svc_vc_soupcall(struct socket *so, void *arg, int waitflag);
   99 
  100 static struct xp_ops svc_vc_rendezvous_ops = {
  101         .xp_recv =      svc_vc_rendezvous_recv,
  102         .xp_stat =      svc_vc_rendezvous_stat,
  103         .xp_reply =     (bool_t (*)(SVCXPRT *, struct rpc_msg *,
  104                 struct sockaddr *, struct mbuf *, uint32_t *))svc_vc_null,
  105         .xp_destroy =   svc_vc_rendezvous_destroy,
  106         .xp_control =   svc_vc_rendezvous_control
  107 };
  108 
  109 static struct xp_ops svc_vc_ops = {
  110         .xp_recv =      svc_vc_recv,
  111         .xp_stat =      svc_vc_stat,
  112         .xp_ack =       svc_vc_ack,
  113         .xp_reply =     svc_vc_reply,
  114         .xp_destroy =   svc_vc_destroy,
  115         .xp_control =   svc_vc_control
  116 };
  117 
  118 static struct xp_ops svc_vc_backchannel_ops = {
  119         .xp_recv =      svc_vc_backchannel_recv,
  120         .xp_stat =      svc_vc_backchannel_stat,
  121         .xp_reply =     svc_vc_backchannel_reply,
  122         .xp_destroy =   svc_vc_backchannel_destroy,
  123         .xp_control =   svc_vc_backchannel_control
  124 };
  125 
  126 /*
  127  * Usage:
  128  *      xprt = svc_vc_create(sock, send_buf_size, recv_buf_size);
  129  *
  130  * Creates, registers, and returns a (rpc) tcp based transporter.
  131  * Once *xprt is initialized, it is registered as a transporter
  132  * see (svc.h, xprt_register).  This routine returns
  133  * a NULL if a problem occurred.
  134  *
  135  * The filedescriptor passed in is expected to refer to a bound, but
  136  * not yet connected socket.
  137  *
  138  * Since streams do buffered io similar to stdio, the caller can specify
  139  * how big the send and receive buffers are via the second and third parms;
  140  * 0 => use the system default.
  141  */
  142 SVCXPRT *
  143 svc_vc_create(SVCPOOL *pool, struct socket *so, size_t sendsize,
  144     size_t recvsize)
  145 {
  146         SVCXPRT *xprt;
  147         struct sockaddr* sa;
  148         int error;
  149 
  150         SOCK_LOCK(so);
  151         if (so->so_state & (SS_ISCONNECTED|SS_ISDISCONNECTED)) {
  152                 SOCK_UNLOCK(so);
  153                 error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
  154                 if (error)
  155                         return (NULL);
  156                 xprt = svc_vc_create_conn(pool, so, sa);
  157                 free(sa, M_SONAME);
  158                 return (xprt);
  159         }
  160         SOCK_UNLOCK(so);
  161 
  162         xprt = svc_xprt_alloc();
  163         sx_init(&xprt->xp_lock, "xprt->xp_lock");
  164         xprt->xp_pool = pool;
  165         xprt->xp_socket = so;
  166         xprt->xp_p1 = NULL;
  167         xprt->xp_p2 = NULL;
  168         xprt->xp_ops = &svc_vc_rendezvous_ops;
  169 
  170         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
  171         if (error) {
  172                 goto cleanup_svc_vc_create;
  173         }
  174 
  175         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
  176         free(sa, M_SONAME);
  177 
  178         xprt_register(xprt);
  179 
  180         solisten(so, -1, curthread);
  181 
  182         SOCKBUF_LOCK(&so->so_rcv);
  183         xprt->xp_upcallset = 1;
  184         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
  185         SOCKBUF_UNLOCK(&so->so_rcv);
  186 
  187         return (xprt);
  188 cleanup_svc_vc_create:
  189         if (xprt) {
  190                 sx_destroy(&xprt->xp_lock);
  191                 svc_xprt_free(xprt);
  192         }
  193         return (NULL);
  194 }
  195 
  196 /*
  197  * Create a new transport for a socket optained via soaccept().
  198  */
  199 SVCXPRT *
  200 svc_vc_create_conn(SVCPOOL *pool, struct socket *so, struct sockaddr *raddr)
  201 {
  202         SVCXPRT *xprt = NULL;
  203         struct cf_conn *cd = NULL;
  204         struct sockaddr* sa = NULL;
  205         struct sockopt opt;
  206         int one = 1;
  207         int error;
  208 
  209         bzero(&opt, sizeof(struct sockopt));
  210         opt.sopt_dir = SOPT_SET;
  211         opt.sopt_level = SOL_SOCKET;
  212         opt.sopt_name = SO_KEEPALIVE;
  213         opt.sopt_val = &one;
  214         opt.sopt_valsize = sizeof(one);
  215         error = sosetopt(so, &opt);
  216         if (error) {
  217                 return (NULL);
  218         }
  219 
  220         if (so->so_proto->pr_protocol == IPPROTO_TCP) {
  221                 bzero(&opt, sizeof(struct sockopt));
  222                 opt.sopt_dir = SOPT_SET;
  223                 opt.sopt_level = IPPROTO_TCP;
  224                 opt.sopt_name = TCP_NODELAY;
  225                 opt.sopt_val = &one;
  226                 opt.sopt_valsize = sizeof(one);
  227                 error = sosetopt(so, &opt);
  228                 if (error) {
  229                         return (NULL);
  230                 }
  231         }
  232 
  233         cd = mem_alloc(sizeof(*cd));
  234         cd->strm_stat = XPRT_IDLE;
  235 
  236         xprt = svc_xprt_alloc();
  237         sx_init(&xprt->xp_lock, "xprt->xp_lock");
  238         xprt->xp_pool = pool;
  239         xprt->xp_socket = so;
  240         xprt->xp_p1 = cd;
  241         xprt->xp_p2 = NULL;
  242         xprt->xp_ops = &svc_vc_ops;
  243 
  244         /*
  245          * See http://www.connectathon.org/talks96/nfstcp.pdf - client
  246          * has a 5 minute timer, server has a 6 minute timer.
  247          */
  248         xprt->xp_idletimeout = 6 * 60;
  249 
  250         memcpy(&xprt->xp_rtaddr, raddr, raddr->sa_len);
  251 
  252         error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
  253         if (error)
  254                 goto cleanup_svc_vc_create;
  255 
  256         memcpy(&xprt->xp_ltaddr, sa, sa->sa_len);
  257         free(sa, M_SONAME);
  258 
  259         xprt_register(xprt);
  260 
  261         SOCKBUF_LOCK(&so->so_rcv);
  262         xprt->xp_upcallset = 1;
  263         soupcall_set(so, SO_RCV, svc_vc_soupcall, xprt);
  264         SOCKBUF_UNLOCK(&so->so_rcv);
  265 
  266         /*
  267          * Throw the transport into the active list in case it already
  268          * has some data buffered.
  269          */
  270         sx_xlock(&xprt->xp_lock);
  271         xprt_active(xprt);
  272         sx_xunlock(&xprt->xp_lock);
  273 
  274         return (xprt);
  275 cleanup_svc_vc_create:
  276         if (xprt) {
  277                 sx_destroy(&xprt->xp_lock);
  278                 svc_xprt_free(xprt);
  279         }
  280         if (cd)
  281                 mem_free(cd, sizeof(*cd));
  282         return (NULL);
  283 }
  284 
  285 /*
  286  * Create a new transport for a backchannel on a clnt_vc socket.
  287  */
  288 SVCXPRT *
  289 svc_vc_create_backchannel(SVCPOOL *pool)
  290 {
  291         SVCXPRT *xprt = NULL;
  292         struct cf_conn *cd = NULL;
  293 
  294         cd = mem_alloc(sizeof(*cd));
  295         cd->strm_stat = XPRT_IDLE;
  296 
  297         xprt = svc_xprt_alloc();
  298         sx_init(&xprt->xp_lock, "xprt->xp_lock");
  299         xprt->xp_pool = pool;
  300         xprt->xp_socket = NULL;
  301         xprt->xp_p1 = cd;
  302         xprt->xp_p2 = NULL;
  303         xprt->xp_ops = &svc_vc_backchannel_ops;
  304         return (xprt);
  305 }
  306 
  307 /*
  308  * This does all of the accept except the final call to soaccept. The
  309  * caller will call soaccept after dropping its locks (soaccept may
  310  * call malloc).
  311  */
  312 int
  313 svc_vc_accept(struct socket *head, struct socket **sop)
  314 {
  315         int error = 0;
  316         struct socket *so;
  317 
  318         if ((head->so_options & SO_ACCEPTCONN) == 0) {
  319                 error = EINVAL;
  320                 goto done;
  321         }
  322 #ifdef MAC
  323         error = mac_socket_check_accept(curthread->td_ucred, head);
  324         if (error != 0)
  325                 goto done;
  326 #endif
  327         ACCEPT_LOCK();
  328         if (TAILQ_EMPTY(&head->so_comp)) {
  329                 ACCEPT_UNLOCK();
  330                 error = EWOULDBLOCK;
  331                 goto done;
  332         }
  333         so = TAILQ_FIRST(&head->so_comp);
  334         KASSERT(!(so->so_qstate & SQ_INCOMP), ("svc_vc_accept: so SQ_INCOMP"));
  335         KASSERT(so->so_qstate & SQ_COMP, ("svc_vc_accept: so not SQ_COMP"));
  336 
  337         /*
  338          * Before changing the flags on the socket, we have to bump the
  339          * reference count.  Otherwise, if the protocol calls sofree(),
  340          * the socket will be released due to a zero refcount.
  341          * XXX might not need soref() since this is simpler than kern_accept.
  342          */
  343         SOCK_LOCK(so);                  /* soref() and so_state update */
  344         soref(so);                      /* file descriptor reference */
  345 
  346         TAILQ_REMOVE(&head->so_comp, so, so_list);
  347         head->so_qlen--;
  348         so->so_state |= (head->so_state & SS_NBIO);
  349         so->so_qstate &= ~SQ_COMP;
  350         so->so_head = NULL;
  351 
  352         SOCK_UNLOCK(so);
  353         ACCEPT_UNLOCK();
  354 
  355         *sop = so;
  356 
  357         /* connection has been removed from the listen queue */
  358         KNOTE_UNLOCKED(&head->so_rcv.sb_sel.si_note, 0);
  359 done:
  360         return (error);
  361 }
  362 
  363 /*ARGSUSED*/
  364 static bool_t
  365 svc_vc_rendezvous_recv(SVCXPRT *xprt, struct rpc_msg *msg,
  366     struct sockaddr **addrp, struct mbuf **mp)
  367 {
  368         struct socket *so = NULL;
  369         struct sockaddr *sa = NULL;
  370         int error;
  371         SVCXPRT *new_xprt;
  372 
  373         /*
  374          * The socket upcall calls xprt_active() which will eventually
  375          * cause the server to call us here. We attempt to accept a
  376          * connection from the socket and turn it into a new
  377          * transport. If the accept fails, we have drained all pending
  378          * connections so we call xprt_inactive().
  379          */
  380         sx_xlock(&xprt->xp_lock);
  381 
  382         error = svc_vc_accept(xprt->xp_socket, &so);
  383 
  384         if (error == EWOULDBLOCK) {
  385                 /*
  386                  * We must re-test for new connections after taking
  387                  * the lock to protect us in the case where a new
  388                  * connection arrives after our call to accept fails
  389                  * with EWOULDBLOCK.
  390                  */
  391                 ACCEPT_LOCK();
  392                 if (TAILQ_EMPTY(&xprt->xp_socket->so_comp))
  393                         xprt_inactive_self(xprt);
  394                 ACCEPT_UNLOCK();
  395                 sx_xunlock(&xprt->xp_lock);
  396                 return (FALSE);
  397         }
  398 
  399         if (error) {
  400                 SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
  401                 if (xprt->xp_upcallset) {
  402                         xprt->xp_upcallset = 0;
  403                         soupcall_clear(xprt->xp_socket, SO_RCV);
  404                 }
  405                 SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
  406                 xprt_inactive_self(xprt);
  407                 sx_xunlock(&xprt->xp_lock);
  408                 return (FALSE);
  409         }
  410 
  411         sx_xunlock(&xprt->xp_lock);
  412 
  413         sa = 0;
  414         error = soaccept(so, &sa);
  415 
  416         if (error) {
  417                 /*
  418                  * XXX not sure if I need to call sofree or soclose here.
  419                  */
  420                 if (sa)
  421                         free(sa, M_SONAME);
  422                 return (FALSE);
  423         }
  424 
  425         /*
  426          * svc_vc_create_conn will call xprt_register - we don't need
  427          * to do anything with the new connection except derefence it.
  428          */
  429         new_xprt = svc_vc_create_conn(xprt->xp_pool, so, sa);
  430         if (!new_xprt) {
  431                 soclose(so);
  432         } else {
  433                 SVC_RELEASE(new_xprt);
  434         }
  435 
  436         free(sa, M_SONAME);
  437 
  438         return (FALSE); /* there is never an rpc msg to be processed */
  439 }
  440 
  441 /*ARGSUSED*/
  442 static enum xprt_stat
  443 svc_vc_rendezvous_stat(SVCXPRT *xprt)
  444 {
  445 
  446         return (XPRT_IDLE);
  447 }
  448 
  449 static void
  450 svc_vc_destroy_common(SVCXPRT *xprt)
  451 {
  452         SOCKBUF_LOCK(&xprt->xp_socket->so_rcv);
  453         if (xprt->xp_upcallset) {
  454                 xprt->xp_upcallset = 0;
  455                 soupcall_clear(xprt->xp_socket, SO_RCV);
  456         }
  457         SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv);
  458 
  459         if (xprt->xp_socket)
  460                 (void)soclose(xprt->xp_socket);
  461 
  462         if (xprt->xp_netid)
  463                 (void) mem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
  464         svc_xprt_free(xprt);
  465 }
  466 
  467 static void
  468 svc_vc_rendezvous_destroy(SVCXPRT *xprt)
  469 {
  470 
  471         svc_vc_destroy_common(xprt);
  472 }
  473 
  474 static void
  475 svc_vc_destroy(SVCXPRT *xprt)
  476 {
  477         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
  478 
  479         svc_vc_destroy_common(xprt);
  480 
  481         if (cd->mreq)
  482                 m_freem(cd->mreq);
  483         if (cd->mpending)
  484                 m_freem(cd->mpending);
  485         mem_free(cd, sizeof(*cd));
  486 }
  487 
  488 static void
  489 svc_vc_backchannel_destroy(SVCXPRT *xprt)
  490 {
  491         struct cf_conn *cd = (struct cf_conn *)xprt->xp_p1;
  492         struct mbuf *m, *m2;
  493 
  494         svc_xprt_free(xprt);
  495         m = cd->mreq;
  496         while (m != NULL) {
  497                 m2 = m;
  498                 m = m->m_nextpkt;
  499                 m_freem(m2);
  500         }
  501         mem_free(cd, sizeof(*cd));
  502 }
  503 
  504 /*ARGSUSED*/
  505 static bool_t
  506 svc_vc_control(SVCXPRT *xprt, const u_int rq, void *in)
  507 {
  508         return (FALSE);
  509 }
  510 
  511 static bool_t
  512 svc_vc_rendezvous_control(SVCXPRT *xprt, const u_int rq, void *in)
  513 {
  514 
  515         return (FALSE);
  516 }
  517 
  518 static bool_t
  519 svc_vc_backchannel_control(SVCXPRT *xprt, const u_int rq, void *in)
  520 {
  521 
  522         return (FALSE);
  523 }
  524 
  525 static enum xprt_stat
  526 svc_vc_stat(SVCXPRT *xprt)
  527 {
  528         struct cf_conn *cd;
  529 
  530         cd = (struct cf_conn *)(xprt->xp_p1);
  531 
  532         if (cd->strm_stat == XPRT_DIED)
  533                 return (XPRT_DIED);
  534 
  535         if (cd->mreq != NULL && cd->resid == 0 && cd->eor)
  536                 return (XPRT_MOREREQS);
  537 
  538         if (soreadable(xprt->xp_socket))
  539                 return (XPRT_MOREREQS);
  540 
  541         return (XPRT_IDLE);
  542 }
  543 
  544 static bool_t
  545 svc_vc_ack(SVCXPRT *xprt, uint32_t *ack)
  546 {
  547 
  548         *ack = atomic_load_acq_32(&xprt->xp_snt_cnt);
  549         *ack -= xprt->xp_socket->so_snd.sb_cc;
  550         return (TRUE);
  551 }
  552 
  553 static enum xprt_stat
  554 svc_vc_backchannel_stat(SVCXPRT *xprt)
  555 {
  556         struct cf_conn *cd;
  557 
  558         cd = (struct cf_conn *)(xprt->xp_p1);
  559 
  560         if (cd->mreq != NULL)
  561                 return (XPRT_MOREREQS);
  562 
  563         return (XPRT_IDLE);
  564 }
  565 
  566 /*
  567  * If we have an mbuf chain in cd->mpending, try to parse a record from it,
  568  * leaving the result in cd->mreq. If we don't have a complete record, leave
  569  * the partial result in cd->mreq and try to read more from the socket.
       *
       * Besides moving data from cd->mpending to cd->mreq, this updates
       * cd->resid (bytes of the current fragment still missing), cd->eor
       * (taken from the top bit of the record marker) and the receive
       * low-water mark of the socket.
       *
       * Returns FALSE when a record marker is expected but fewer than four
       * bytes are buffered; in that case sb_lowat is set to the number of
       * bytes still missing.  Returns TRUE in every other case.
       *
       * Both call sites in svc_vc_recv() run with xprt->xp_lock held
       * exclusively.
  570  */
  571 static int
  572 svc_vc_process_pending(SVCXPRT *xprt)
  573 {
  574         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
  575         struct socket *so = xprt->xp_socket;
  576         struct mbuf *m;
  577 
  578         /*
  579          * If cd->resid is non-zero, we have part of the
  580          * record already, otherwise we are expecting a record
  581          * marker.
  582          */
  583         if (!cd->resid && cd->mpending) {
  584                 /*
  585                  * See if there is enough data buffered to
  586                  * make up a record marker. Make sure we can
  587                  * handle the case where the record marker is
  588                  * split across more than one mbuf.
  589                  */
  590                 size_t n = 0;
  591                 uint32_t header;
  592 
                      /* Count buffered bytes, stopping once four are seen. */
  593                 m = cd->mpending;
  594                 while (n < sizeof(uint32_t) && m) {
  595                         n += m->m_len;
  596                         m = m->m_next;
  597                 }
  598                 if (n < sizeof(uint32_t)) {
                              /* Ask to be woken once the rest arrives. */
  599                         so->so_rcv.sb_lowat = sizeof(uint32_t) - n;
  600                         return (FALSE);
  601                 }
                      /*
                       * Decode the marker: the top bit becomes cd->eor,
                       * the low 31 bits become cd->resid.  The marker is
                       * then trimmed from the front of the chain.
                       */
  602                 m_copydata(cd->mpending, 0, sizeof(header),
  603                     (char *)&header);
  604                 header = ntohl(header);
  605                 cd->eor = (header & 0x80000000) != 0;
  606                 cd->resid = header & 0x7fffffff;
  607                 m_adj(cd->mpending, sizeof(uint32_t));
  608         }
  609 
  610         /*
  611          * Start pulling off mbufs from cd->mpending
  612          * until we either have a complete record or
  613          * we run out of data. We use m_split to pull
  614          * data - it will pull as much as possible and
  615          * split the last mbuf if necessary.
  616          */
  617         while (cd->mpending && cd->resid) {
  618                 m = cd->mpending;
                      /*
                       * A single mbuf no longer than cd->resid is taken
                       * whole; anything else goes through m_split() and
                       * the remainder stays in cd->mpending.
                       */
  619                 if (cd->mpending->m_next
  620                     || cd->mpending->m_len > cd->resid)
  621                         cd->mpending = m_split(cd->mpending,
  622                             cd->resid, M_WAITOK);
  623                 else
  624                         cd->mpending = NULL;
                      /* Append the piece just taken to the request chain. */
  625                 if (cd->mreq)
  626                         m_last(cd->mreq)->m_next = m;
  627                 else
  628                         cd->mreq = m;
                      /*
                       * m still points at the first newly appended mbuf, so
                       * only the new data is subtracted from cd->resid.
                       */
  629                 while (m) {
  630                         cd->resid -= m->m_len;
  631                         m = m->m_next;
  632                 }
  633         }
  634 
  635         /*
  636          * Block receive upcalls if we have more data pending,
  637          * otherwise report our need.
  638          */
  639         if (cd->mpending)
  640                 so->so_rcv.sb_lowat = INT_MAX;
  641         else
                      /* At least 1, at most half of the buffer high-water mark. */
  642                 so->so_rcv.sb_lowat =
  643                     imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2));
  644         return (TRUE);
  645 }
  646 
      /*
       * Receive method for connected stream transports.
       *
       * Under xprt->xp_lock, alternates between parsing buffered data with
       * svc_vc_process_pending() and pulling more data from the socket with
       * a non-blocking soreceive(), until one of the following happens:
       *
       *  - A complete request is available (cd->mreq set, cd->resid zero,
       *    cd->eor set).  If xprt->xp_p2 is set and the second 32-bit word
       *    of the message is REPLY, the message is handed to
       *    clnt_bck_svccall() and the loop continues.  Otherwise the call
       *    header is decoded into *msg, *addrp is set to NULL, *mp receives
       *    the mbufs returned by xdrmbuf_getall() and TRUE is returned.
       *    If xdr_callmsg() fails, FALSE is returned instead.
       *
       *  - soreceive() fails with EWOULDBLOCK: the transport is marked
       *    inactive unless the socket became readable meanwhile, and FALSE
       *    is returned.
       *
       *  - soreceive() fails with another error, or returns no data (EOF):
       *    the transport is marked inactive, cd->strm_stat becomes
       *    XPRT_DIED and FALSE is returned.  For the error case the receive
       *    upcall is cleared as well.
       *
       * xp_lock is released on every return path; on the success path it is
       * dropped before the call header is decoded.
       */
  647 static bool_t
  648 svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg,
  649     struct sockaddr **addrp, struct mbuf **mp)
  650 {
  651         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
  652         struct uio uio;
  653         struct mbuf *m;
  654         struct socket* so = xprt->xp_socket;
  655         XDR xdrs;
  656         int error, rcvflag;
  657         uint32_t xid_plus_direction[2];
  658 
  659         /*
  660          * Serialise access to the socket and our own record parsing
  661          * state.
  662          */
  663         sx_xlock(&xprt->xp_lock);
  664 
  665         for (;;) {
  666                 /* If we have no request ready, check pending queue. */
  667                 while (cd->mpending &&
  668                     (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) {
  669                         if (!svc_vc_process_pending(xprt))
  670                                 break;
  671                 }
  672 
  673                 /* Process and return complete request in cd->mreq. */
  674                 if (cd->mreq != NULL && cd->resid == 0 && cd->eor) {
  675 
  676                         /*
  677                          * Now, check for a backchannel reply.
  678                          * The XID is in the first uint32_t of the reply
  679                          * and the message direction is the second one.
  680                          */
  681                         if ((cd->mreq->m_len >= sizeof(xid_plus_direction) ||
  682                             m_length(cd->mreq, NULL) >=
  683                             sizeof(xid_plus_direction)) &&
  684                             xprt->xp_p2 != NULL) {
  685                                 m_copydata(cd->mreq, 0,
  686                                     sizeof(xid_plus_direction),
  687                                     (char *)xid_plus_direction);
  688                                 xid_plus_direction[0] =
  689                                     ntohl(xid_plus_direction[0]);
  690                                 xid_plus_direction[1] =
  691                                     ntohl(xid_plus_direction[1]);
  692                                 /* Check message direction. */
  693                                 if (xid_plus_direction[1] == REPLY) {
                                              /*
                                               * The chain is passed on to
                                               * clnt_bck_svccall(); forget
                                               * it here and look for the
                                               * next message.
                                               */
  694                                         clnt_bck_svccall(xprt->xp_p2,
  695                                             cd->mreq,
  696                                             xid_plus_direction[0]);
  697                                         cd->mreq = NULL;
  698                                         continue;
  699                                 }
  700                         }
  701 
                              /* The XDR stream now refers to the request chain. */
  702                         xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE);
  703                         cd->mreq = NULL;
  704 
  705                         /* Check for next request in a pending queue. */
  706                         svc_vc_process_pending(xprt);
  707                         if (cd->mreq == NULL || cd->resid != 0) {
                                      /*
                                       * No further complete fragment is
                                       * buffered; go inactive unless the
                                       * socket has data to read.
                                       */
  708                                 SOCKBUF_LOCK(&so->so_rcv);
  709                                 if (!soreadable(so))
  710                                         xprt_inactive_self(xprt);
  711                                 SOCKBUF_UNLOCK(&so->so_rcv);
  712                         }
  713 
                              /* Decoding happens without xp_lock held. */
  714                         sx_xunlock(&xprt->xp_lock);
  715 
  716                         if (! xdr_callmsg(&xdrs, msg)) {
  717                                 XDR_DESTROY(&xdrs);
  718                                 return (FALSE);
  719                         }
  720 
  721                         *addrp = NULL;
  722                         *mp = xdrmbuf_getall(&xdrs);
  723                         XDR_DESTROY(&xdrs);
  724 
  725                         return (TRUE);
  726                 }
  727 
  728                 /*
  729                  * The socket upcall calls xprt_active() which will eventually
  730                  * cause the server to call us here. We attempt to
  731                  * read as much as possible from the socket and put
  732                  * the result in cd->mpending. If the read fails,
  733                  * we have drained both cd->mpending and the socket so
  734                  * we can call xprt_inactive().
  735                  */
                      /*
                       * Only uio_resid and uio_td are filled in before the
                       * call; the data comes back as an mbuf chain in m.
                       */
  736                 uio.uio_resid = 1000000000;
  737                 uio.uio_td = curthread;
  738                 m = NULL;
  739                 rcvflag = MSG_DONTWAIT;
  740                 error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
  741 
  742                 if (error == EWOULDBLOCK) {
  743                         /*
  744                          * We must re-test for readability after
  745                          * taking the lock to protect us in the case
  746                          * where a new packet arrives on the socket
  747                          * after our call to soreceive fails with
  748                          * EWOULDBLOCK.
  749                          */
  750                         SOCKBUF_LOCK(&so->so_rcv);
  751                         if (!soreadable(so))
  752                                 xprt_inactive_self(xprt);
  753                         SOCKBUF_UNLOCK(&so->so_rcv);
  754                         sx_xunlock(&xprt->xp_lock);
  755                         return (FALSE);
  756                 }
  757 
  758                 if (error) {
                              /*
                               * Any other error: stop upcalls, go inactive
                               * and mark the stream dead.
                               */
  759                         SOCKBUF_LOCK(&so->so_rcv);
  760                         if (xprt->xp_upcallset) {
  761                                 xprt->xp_upcallset = 0;
  762                                 soupcall_clear(so, SO_RCV);
  763                         }
  764                         SOCKBUF_UNLOCK(&so->so_rcv);
  765                         xprt_inactive_self(xprt);
  766                         cd->strm_stat = XPRT_DIED;
  767                         sx_xunlock(&xprt->xp_lock);
  768                         return (FALSE);
  769                 }
  770 
  771                 if (!m) {
  772                         /*
  773                          * EOF - the other end has closed the socket.
  774                          */
  775                         xprt_inactive_self(xprt);
  776                         cd->strm_stat = XPRT_DIED;
  777                         sx_xunlock(&xprt->xp_lock);
  778                         return (FALSE);
  779                 }
  780 
                      /* Queue the new data behind anything still pending. */
  781                 if (cd->mpending)
  782                         m_last(cd->mpending)->m_next = m;
  783                 else
  784                         cd->mpending = m;
  785         }
  786 }
  787 
  788 static bool_t
  789 svc_vc_backchannel_recv(SVCXPRT *xprt, struct rpc_msg *msg,
  790     struct sockaddr **addrp, struct mbuf **mp)
  791 {
  792         struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1;
  793         struct ct_data *ct;
  794         struct mbuf *m;
  795         XDR xdrs;
  796 
  797         sx_xlock(&xprt->xp_lock);
  798         ct = (struct ct_data *)xprt->xp_p2;
  799         if (ct == NULL) {
  800                 sx_xunlock(&xprt->xp_lock);
  801                 return (FALSE);
  802         }
  803         mtx_lock(&ct->ct_lock);
  804         m = cd->mreq;
  805         if (m == NULL) {
  806                 xprt_inactive_self(xprt);
  807                 mtx_unlock(&ct->ct_lock);
  808                 sx_xunlock(&xprt->xp_lock);
  809                 return (FALSE);
  810         }
  811         cd->mreq = m->m_nextpkt;
  812         mtx_unlock(&ct->ct_lock);
  813         sx_xunlock(&xprt->xp_lock);
  814 
  815         xdrmbuf_create(&xdrs, m, XDR_DECODE);
  816         if (! xdr_callmsg(&xdrs, msg)) {
  817                 XDR_DESTROY(&xdrs);
  818                 return (FALSE);
  819         }
  820         *addrp = NULL;
  821         *mp = xdrmbuf_getall(&xdrs);
  822         XDR_DESTROY(&xdrs);
  823         return (TRUE);
  824 }
  825 
  826 static bool_t
  827 svc_vc_reply(SVCXPRT *xprt, struct rpc_msg *msg,
  828     struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
  829 {
  830         XDR xdrs;
  831         struct mbuf *mrep;
  832         bool_t stat = TRUE;
  833         int error, len;
  834 
  835         /*
  836          * Leave space for record mark.
  837          */
  838         mrep = m_gethdr(M_WAITOK, MT_DATA);
  839         mrep->m_data += sizeof(uint32_t);
  840 
  841         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
  842 
  843         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
  844             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
  845                 if (!xdr_replymsg(&xdrs, msg))
  846                         stat = FALSE;
  847                 else
  848                         xdrmbuf_append(&xdrs, m);
  849         } else {
  850                 stat = xdr_replymsg(&xdrs, msg);
  851         }
  852 
  853         if (stat) {
  854                 m_fixhdr(mrep);
  855 
  856                 /*
  857                  * Prepend a record marker containing the reply length.
  858                  */
  859                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
  860                 len = mrep->m_pkthdr.len;
  861                 *mtod(mrep, uint32_t *) =
  862                         htonl(0x80000000 | (len - sizeof(uint32_t)));
  863                 atomic_add_acq_32(&xprt->xp_snd_cnt, len);
  864                 error = sosend(xprt->xp_socket, NULL, NULL, mrep, NULL,
  865                     0, curthread);
  866                 if (!error) {
  867                         atomic_add_rel_32(&xprt->xp_snt_cnt, len);
  868                         if (seq)
  869                                 *seq = xprt->xp_snd_cnt;
  870                         stat = TRUE;
  871                 } else
  872                         atomic_subtract_32(&xprt->xp_snd_cnt, len);
  873         } else {
  874                 m_freem(mrep);
  875         }
  876 
  877         XDR_DESTROY(&xdrs);
  878 
  879         return (stat);
  880 }
  881 
  882 static bool_t
  883 svc_vc_backchannel_reply(SVCXPRT *xprt, struct rpc_msg *msg,
  884     struct sockaddr *addr, struct mbuf *m, uint32_t *seq)
  885 {
  886         struct ct_data *ct;
  887         XDR xdrs;
  888         struct mbuf *mrep;
  889         bool_t stat = TRUE;
  890         int error;
  891 
  892         /*
  893          * Leave space for record mark.
  894          */
  895         mrep = m_gethdr(M_WAITOK, MT_DATA);
  896         mrep->m_data += sizeof(uint32_t);
  897 
  898         xdrmbuf_create(&xdrs, mrep, XDR_ENCODE);
  899 
  900         if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
  901             msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
  902                 if (!xdr_replymsg(&xdrs, msg))
  903                         stat = FALSE;
  904                 else
  905                         xdrmbuf_append(&xdrs, m);
  906         } else {
  907                 stat = xdr_replymsg(&xdrs, msg);
  908         }
  909 
  910         if (stat) {
  911                 m_fixhdr(mrep);
  912 
  913                 /*
  914                  * Prepend a record marker containing the reply length.
  915                  */
  916                 M_PREPEND(mrep, sizeof(uint32_t), M_WAITOK);
  917                 *mtod(mrep, uint32_t *) =
  918                         htonl(0x80000000 | (mrep->m_pkthdr.len
  919                                 - sizeof(uint32_t)));
  920                 sx_xlock(&xprt->xp_lock);
  921                 ct = (struct ct_data *)xprt->xp_p2;
  922                 if (ct != NULL)
  923                         error = sosend(ct->ct_socket, NULL, NULL, mrep, NULL,
  924                             0, curthread);
  925                 else
  926                         error = EPIPE;
  927                 sx_xunlock(&xprt->xp_lock);
  928                 if (!error) {
  929                         stat = TRUE;
  930                 }
  931         } else {
  932                 m_freem(mrep);
  933         }
  934 
  935         XDR_DESTROY(&xdrs);
  936 
  937         return (stat);
  938 }
  939 
  940 static bool_t
  941 svc_vc_null()
  942 {
  943 
  944         return (FALSE);
  945 }
  946 
  947 static int
  948 svc_vc_soupcall(struct socket *so, void *arg, int waitflag)
  949 {
  950         SVCXPRT *xprt = (SVCXPRT *) arg;
  951 
  952         if (soreadable(xprt->xp_socket))
  953                 xprt_active(xprt);
  954         return (SU_OK);
  955 }
  956 
  957 #if 0
  958 /*
  959  * Get the effective UID of the sending process. Used by rpcbind, keyserv
  960  * and rpc.yppasswdd on AF_LOCAL.
  961  */
  962 int
  963 __rpc_get_local_uid(SVCXPRT *transp, uid_t *uid) {
  964         int sock, ret;
  965         gid_t egid;
  966         uid_t euid;
  967         struct sockaddr *sa;
  968 
  969         sock = transp->xp_fd;
  970         sa = (struct sockaddr *)transp->xp_rtaddr;
  971         if (sa->sa_family == AF_LOCAL) {
  972                 ret = getpeereid(sock, &euid, &egid);
  973                 if (ret == 0)
  974                         *uid = euid;
  975                 return (ret);
  976         } else
  977                 return (-1);
  978 }
  979 #endif

Cache object: 5fc123f90888a91a7c51583eab57393c


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.