FreeBSD/Linux Kernel Cross Reference
sys/kern/sys_pipe.c


    1 /*      $NetBSD: sys_pipe.c,v 1.158 2021/10/11 01:07:36 thorpej Exp $   */
    2 
    3 /*-
    4  * Copyright (c) 2003, 2007, 2008, 2009 The NetBSD Foundation, Inc.
    5  * All rights reserved.
    6  *
    7  * This code is derived from software contributed to The NetBSD Foundation
    8  * by Paul Kranenburg, and by Andrew Doran.
    9  *
   10  * Redistribution and use in source and binary forms, with or without
   11  * modification, are permitted provided that the following conditions
   12  * are met:
   13  * 1. Redistributions of source code must retain the above copyright
   14  *    notice, this list of conditions and the following disclaimer.
   15  * 2. Redistributions in binary form must reproduce the above copyright
   16  *    notice, this list of conditions and the following disclaimer in the
   17  *    documentation and/or other materials provided with the distribution.
   18  *
   19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
   20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
   21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
   22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
   23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
   24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
   25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
   26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
   27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
   28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   29  * POSSIBILITY OF SUCH DAMAGE.
   30  */
   31 
   32 /*
   33  * Copyright (c) 1996 John S. Dyson
   34  * All rights reserved.
   35  *
   36  * Redistribution and use in source and binary forms, with or without
   37  * modification, are permitted provided that the following conditions
   38  * are met:
   39  * 1. Redistributions of source code must retain the above copyright
   40  *    notice immediately at the beginning of the file, without modification,
   41  *    this list of conditions, and the following disclaimer.
   42  * 2. Redistributions in binary form must reproduce the above copyright
   43  *    notice, this list of conditions and the following disclaimer in the
   44  *    documentation and/or other materials provided with the distribution.
   45  * 3. Absolutely no warranty of function or purpose is made by the author
   46  *    John S. Dyson.
   47  * 4. Modifications may be freely made to this file if the above conditions
   48  *    are met.
   49  */
   50 
   51 /*
   52  * This file contains a high-performance replacement for the socket-based
   53  * pipe scheme originally used.  It does not support all features of
   54  * sockets, but it does everything that pipes normally do.
   55  *
   56  * This code has two modes of operation: a small write mode and a large
   57  * write mode.  The small write mode acts like conventional pipes with
   58  * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, the
   59  * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
   60  * and PIPE_SIZE in size, the data is mapped read-only into the kernel
   61  * address space using the UVM page loan facility, and the receiving
   62  * process copies it directly from the sending process's pages.
   63  *
   64  * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
   65  * happen for small transfers so that the system will not spend all of
   66  * its time context switching.  PIPE_SIZE is constrained by the
   67  * amount of kernel virtual memory.
   68  */
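
As a concrete illustration of the interface this file implements, here is a
minimal userland sketch (plain POSIX pipe(2); error handling abbreviated, and
the snippet is not part of the kernel source).  The sketches that follow later
on this page reuse the fds[] array created here.

        #include <err.h>
        #include <stdio.h>
        #include <unistd.h>

        int
        main(void)
        {
                int fds[2];
                char buf[32];
                ssize_t n;

                if (pipe(fds) == -1)            /* enters pipe1() below */
                        err(1, "pipe");
                /* fds[0] is the read end, fds[1] the write end. */
                if (write(fds[1], "hello", 5) != 5)
                        err(1, "write");
                if ((n = read(fds[0], buf, sizeof(buf))) == -1)
                        err(1, "read");
                printf("read %zd bytes\n", n);
                return 0;
        }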
   69 
   70 #include <sys/cdefs.h>
   71 __KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.158 2021/10/11 01:07:36 thorpej Exp $");
   72 
   73 #include <sys/param.h>
   74 #include <sys/systm.h>
   75 #include <sys/proc.h>
   76 #include <sys/fcntl.h>
   77 #include <sys/file.h>
   78 #include <sys/filedesc.h>
   79 #include <sys/filio.h>
   80 #include <sys/kernel.h>
   81 #include <sys/ttycom.h>
   82 #include <sys/stat.h>
   83 #include <sys/poll.h>
   84 #include <sys/signalvar.h>
   85 #include <sys/vnode.h>
   86 #include <sys/uio.h>
   87 #include <sys/select.h>
   88 #include <sys/mount.h>
   89 #include <sys/syscallargs.h>
   90 #include <sys/sysctl.h>
   91 #include <sys/kauth.h>
   92 #include <sys/atomic.h>
   93 #include <sys/pipe.h>
   94 
   95 static int      pipe_read(file_t *, off_t *, struct uio *, kauth_cred_t, int);
   96 static int      pipe_write(file_t *, off_t *, struct uio *, kauth_cred_t, int);
   97 static int      pipe_close(file_t *);
   98 static int      pipe_poll(file_t *, int);
   99 static int      pipe_kqfilter(file_t *, struct knote *);
  100 static int      pipe_stat(file_t *, struct stat *);
  101 static int      pipe_ioctl(file_t *, u_long, void *);
  102 static void     pipe_restart(file_t *);
  103 
  104 static const struct fileops pipeops = {
  105         .fo_name = "pipe",
  106         .fo_read = pipe_read,
  107         .fo_write = pipe_write,
  108         .fo_ioctl = pipe_ioctl,
  109         .fo_fcntl = fnullop_fcntl,
  110         .fo_poll = pipe_poll,
  111         .fo_stat = pipe_stat,
  112         .fo_close = pipe_close,
  113         .fo_kqfilter = pipe_kqfilter,
  114         .fo_restart = pipe_restart,
  115 };
  116 
  117 /*
  118  * Default pipe buffer size(s).  These can be fairly large now because
  119  * pipe space is pageable.  The pipe code will try to maintain locality
  120  * of reference for performance reasons, so small amounts of outstanding
  121  * I/O will not wipe the cache.
  122  */
  123 #define MINPIPESIZE     (PIPE_SIZE / 3)
  124 #define MAXPIPESIZE     (2 * PIPE_SIZE / 3)
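
For concreteness: assuming the usual PIPE_SIZE of 16384 bytes (the actual
value comes from sys/pipe.h and may differ), MINPIPESIZE evaluates to
16384 / 3 = 5461 bytes and MAXPIPESIZE to 2 * 16384 / 3 = 10922 bytes.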
  125 
  126 /*
  127  * Limit the number of "big" pipes
  128  */
  129 #define LIMITBIGPIPES   32
  130 static u_int    maxbigpipes = LIMITBIGPIPES;
  131 static u_int    nbigpipe = 0;
  132 
  133 /*
  134  * Amount of KVA consumed by pipe buffers.
  135  */
  136 static u_int    amountpipekva = 0;
  137 
  138 static void     pipeclose(struct pipe *);
  139 static void     pipe_free_kmem(struct pipe *);
  140 static int      pipe_create(struct pipe **, pool_cache_t);
  141 static int      pipelock(struct pipe *, bool);
  142 static inline void pipeunlock(struct pipe *);
  143 static void     pipeselwakeup(struct pipe *, struct pipe *, int);
  144 static int      pipespace(struct pipe *, int);
  145 static int      pipe_ctor(void *, void *, int);
  146 static void     pipe_dtor(void *, void *);
  147 
  148 static pool_cache_t     pipe_wr_cache;
  149 static pool_cache_t     pipe_rd_cache;
  150 
  151 void
  152 pipe_init(void)
  153 {
  154 
  155         /* Writer side is not automatically allocated KVA. */
  156         pipe_wr_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "pipewr",
  157             NULL, IPL_NONE, pipe_ctor, pipe_dtor, NULL);
  158         KASSERT(pipe_wr_cache != NULL);
  159 
  160         /* Reader side gets preallocated KVA. */
  161         pipe_rd_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "piperd",
  162             NULL, IPL_NONE, pipe_ctor, pipe_dtor, (void *)1);
  163         KASSERT(pipe_rd_cache != NULL);
  164 }
  165 
  166 static int
  167 pipe_ctor(void *arg, void *obj, int flags)
  168 {
  169         struct pipe *pipe;
  170         vaddr_t va;
  171 
  172         pipe = obj;
  173 
  174         memset(pipe, 0, sizeof(struct pipe));
  175         if (arg != NULL) {
  176                 /* Preallocate space. */
  177                 va = uvm_km_alloc(kernel_map, PIPE_SIZE, 0,
  178                     UVM_KMF_PAGEABLE | UVM_KMF_WAITVA);
  179                 KASSERT(va != 0);
  180                 pipe->pipe_kmem = va;
  181                 atomic_add_int(&amountpipekva, PIPE_SIZE);
  182         }
  183         cv_init(&pipe->pipe_rcv, "pipe_rd");
  184         cv_init(&pipe->pipe_wcv, "pipe_wr");
  185         cv_init(&pipe->pipe_draincv, "pipe_drn");
  186         cv_init(&pipe->pipe_lkcv, "pipe_lk");
  187         selinit(&pipe->pipe_sel);
  188         pipe->pipe_state = PIPE_SIGNALR;
  189 
  190         return 0;
  191 }
  192 
  193 static void
  194 pipe_dtor(void *arg, void *obj)
  195 {
  196         struct pipe *pipe;
  197 
  198         pipe = obj;
  199 
  200         cv_destroy(&pipe->pipe_rcv);
  201         cv_destroy(&pipe->pipe_wcv);
  202         cv_destroy(&pipe->pipe_draincv);
  203         cv_destroy(&pipe->pipe_lkcv);
  204         seldestroy(&pipe->pipe_sel);
  205         if (pipe->pipe_kmem != 0) {
  206                 uvm_km_free(kernel_map, pipe->pipe_kmem, PIPE_SIZE,
  207                     UVM_KMF_PAGEABLE);
  208                 atomic_add_int(&amountpipekva, -PIPE_SIZE);
  209         }
  210 }
  211 
  212 /*
  213  * The pipe(2) system call, for pipes of type DTYPE_PIPE.
  214  */
  215 int
  216 pipe1(struct lwp *l, int *fildes, int flags)
  217 {
  218         struct pipe *rpipe, *wpipe;
  219         file_t *rf, *wf;
  220         int fd, error;
  221         proc_t *p;
  222 
  223         if (flags & ~(O_CLOEXEC|O_NONBLOCK|O_NOSIGPIPE))
  224                 return EINVAL;
  225         p = curproc;
  226         rpipe = wpipe = NULL;
  227         if ((error = pipe_create(&rpipe, pipe_rd_cache)) ||
  228             (error = pipe_create(&wpipe, pipe_wr_cache))) {
  229                 goto free2;
  230         }
  231         rpipe->pipe_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
  232         wpipe->pipe_lock = rpipe->pipe_lock;
  233         mutex_obj_hold(wpipe->pipe_lock);
  234 
  235         error = fd_allocfile(&rf, &fd);
  236         if (error)
  237                 goto free2;
  238         fildes[0] = fd;
  239 
  240         error = fd_allocfile(&wf, &fd);
  241         if (error)
  242                 goto free3;
  243         fildes[1] = fd;
  244 
  245         rf->f_flag = FREAD | flags;
  246         rf->f_type = DTYPE_PIPE;
  247         rf->f_pipe = rpipe;
  248         rf->f_ops = &pipeops;
  249         fd_set_exclose(l, fildes[0], (flags & O_CLOEXEC) != 0);
  250 
  251         wf->f_flag = FWRITE | flags;
  252         wf->f_type = DTYPE_PIPE;
  253         wf->f_pipe = wpipe;
  254         wf->f_ops = &pipeops;
  255         fd_set_exclose(l, fildes[1], (flags & O_CLOEXEC) != 0);
  256 
  257         rpipe->pipe_peer = wpipe;
  258         wpipe->pipe_peer = rpipe;
  259 
  260         fd_affix(p, rf, fildes[0]);
  261         fd_affix(p, wf, fildes[1]);
  262         return (0);
  263 free3:
  264         fd_abort(p, rf, fildes[0]);
  265 free2:
  266         pipeclose(wpipe);
  267         pipeclose(rpipe);
  268 
  269         return (error);
  270 }
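
pipe1() accepts only O_CLOEXEC, O_NONBLOCK and O_NOSIGPIPE; anything else
fails with EINVAL.  A sketch of requesting flags atomically at creation time
with pipe2(2), which passes its flags argument down to pipe1() (a fragment
that slots into the first sketch's main()):

        #include <err.h>
        #include <fcntl.h>
        #include <unistd.h>

        int fds[2];

        /* O_APPEND is not in the accepted set, so pipe2(fds, O_APPEND)
         * would fail with EINVAL per the flag check in pipe1(). */
        if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == -1)
                err(1, "pipe2");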
  271 
  272 /*
  273  * Allocate KVA for the pipe circular buffer; the space is pageable.
  274  * This routine safely 'reallocs' the size of a pipe: if the
  275  * allocation fails, it retains the old buffer and returns
  276  * ENOMEM.
  277  */
  278 static int
  279 pipespace(struct pipe *pipe, int size)
  280 {
  281         void *buffer;
  282 
  283         /*
  284          * Allocate pageable virtual address space.  Physical memory is
  285          * allocated on demand.
  286          */
  287         if (size == PIPE_SIZE && pipe->pipe_kmem != 0) {
  288                 buffer = (void *)pipe->pipe_kmem;
  289         } else {
  290                 buffer = (void *)uvm_km_alloc(kernel_map, round_page(size),
  291                     0, UVM_KMF_PAGEABLE);
  292                 if (buffer == NULL)
  293                         return (ENOMEM);
  294                 atomic_add_int(&amountpipekva, size);
  295         }
  296 
  297         /* free old resources if we're resizing */
  298         pipe_free_kmem(pipe);
  299         pipe->pipe_buffer.buffer = buffer;
  300         pipe->pipe_buffer.size = size;
  301         pipe->pipe_buffer.in = 0;
  302         pipe->pipe_buffer.out = 0;
  303         pipe->pipe_buffer.cnt = 0;
  304         return (0);
  305 }
  306 
  307 /*
  308  * Initialize and allocate VM and memory for pipe.
  309  */
  310 static int
  311 pipe_create(struct pipe **pipep, pool_cache_t cache)
  312 {
  313         struct pipe *pipe;
  314         int error;
  315 
  316         pipe = pool_cache_get(cache, PR_WAITOK);
  317         KASSERT(pipe != NULL);
  318         *pipep = pipe;
  319         error = 0;
  320         getnanotime(&pipe->pipe_btime);
  321         pipe->pipe_atime = pipe->pipe_mtime = pipe->pipe_btime;
  322         pipe->pipe_lock = NULL;
  323         if (cache == pipe_rd_cache) {
  324                 error = pipespace(pipe, PIPE_SIZE);
  325         } else {
  326                 pipe->pipe_buffer.buffer = NULL;
  327                 pipe->pipe_buffer.size = 0;
  328                 pipe->pipe_buffer.in = 0;
  329                 pipe->pipe_buffer.out = 0;
  330                 pipe->pipe_buffer.cnt = 0;
  331         }
  332         return error;
  333 }
  334 
  335 /*
  336  * Lock a pipe for I/O, blocking other access.
  337  * Called with the pipe mutex held.
  338  */
  339 static int
  340 pipelock(struct pipe *pipe, bool catch_p)
  341 {
  342         int error;
  343 
  344         KASSERT(mutex_owned(pipe->pipe_lock));
  345 
  346         while (pipe->pipe_state & PIPE_LOCKFL) {
  347                 pipe->pipe_waiters++;
  348                 KASSERT(pipe->pipe_waiters != 0); /* just in case */
  349                 if (catch_p) {
  350                         error = cv_wait_sig(&pipe->pipe_lkcv, pipe->pipe_lock);
  351                         if (error != 0) {
  352                                 KASSERT(pipe->pipe_waiters > 0);
  353                                 pipe->pipe_waiters--;
  354                                 return error;
  355                         }
  356                 } else
  357                         cv_wait(&pipe->pipe_lkcv, pipe->pipe_lock);
  358                 KASSERT(pipe->pipe_waiters > 0);
  359                 pipe->pipe_waiters--;
  360         }
  361 
  362         pipe->pipe_state |= PIPE_LOCKFL;
  363 
  364         return 0;
  365 }
  366 
  367 /*
  368  * Unlock a pipe I/O lock.
  369  */
  370 static inline void
  371 pipeunlock(struct pipe *pipe)
  372 {
  373 
  374         KASSERT(pipe->pipe_state & PIPE_LOCKFL);
  375 
  376         pipe->pipe_state &= ~PIPE_LOCKFL;
  377         if (pipe->pipe_waiters > 0) {
  378                 cv_signal(&pipe->pipe_lkcv);
  379         }
  380 }
  381 
  382 /*
  383  * Select/poll wakeup.  This also sends SIGIO to the peer connected
  384  * to the 'sigp' side of the pipe.
  385  */
  386 static void
  387 pipeselwakeup(struct pipe *selp, struct pipe *sigp, int code)
  388 {
  389         int band;
  390 
  391         switch (code) {
  392         case POLL_IN:
  393                 band = POLLIN|POLLRDNORM;
  394                 break;
  395         case POLL_OUT:
  396                 band = POLLOUT|POLLWRNORM;
  397                 break;
  398         case POLL_HUP:
  399                 band = POLLHUP;
  400                 break;
  401         case POLL_ERR:
  402                 band = POLLERR;
  403                 break;
  404         default:
  405                 band = 0;
  406 #ifdef DIAGNOSTIC
  407                 printf("bad siginfo code %d in pipe notification.\n", code);
  408 #endif
  409                 break;
  410         }
  411 
  412         selnotify(&selp->pipe_sel, band, NOTE_SUBMIT);
  413 
  414         if (sigp == NULL || (sigp->pipe_state & PIPE_ASYNC) == 0)
  415                 return;
  416 
  417         fownsignal(sigp->pipe_pgid, SIGIO, code, band, selp);
  418 }
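
pipeselwakeup() only raises SIGIO when the signalled side has PIPE_ASYNC set
and an owner registered.  A sketch of arming that from userland, using the
FIOSETOWN and FIOASYNC ioctls handled by pipe_ioctl() later in this file
(fds[] as in the first sketch):

        #include <sys/ioctl.h>
        #include <err.h>
        #include <unistd.h>

        int on = 1;
        int owner = getpid();           /* recipient of SIGIO */

        if (ioctl(fds[0], FIOSETOWN, &owner) == -1 ||
            ioctl(fds[0], FIOASYNC, &on) == -1)
                err(1, "ioctl");
        /* The kernel may now deliver SIGIO via fownsignal(). */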
  419 
  420 static int
  421 pipe_read(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
  422     int flags)
  423 {
  424         struct pipe *rpipe = fp->f_pipe;
  425         struct pipebuf *bp = &rpipe->pipe_buffer;
  426         kmutex_t *lock = rpipe->pipe_lock;
  427         int error;
  428         size_t nread = 0;
  429         size_t size;
  430         size_t ocnt;
  431         unsigned int wakeup_state = 0;
  432 
  433         mutex_enter(lock);
  434         ++rpipe->pipe_busy;
  435         ocnt = bp->cnt;
  436 
  437 again:
  438         error = pipelock(rpipe, true);
  439         if (error)
  440                 goto unlocked_error;
  441 
  442         while (uio->uio_resid) {
  443                 /*
  444                  * Normal pipe buffer receive.
  445                  */
  446                 if (bp->cnt > 0) {
  447                         size = bp->size - bp->out;
  448                         if (size > bp->cnt)
  449                                 size = bp->cnt;
  450                         if (size > uio->uio_resid)
  451                                 size = uio->uio_resid;
  452 
  453                         mutex_exit(lock);
  454                         error = uiomove((char *)bp->buffer + bp->out, size, uio);
  455                         mutex_enter(lock);
  456                         if (error)
  457                                 break;
  458 
  459                         bp->out += size;
  460                         if (bp->out >= bp->size)
  461                                 bp->out = 0;
  462 
  463                         bp->cnt -= size;
  464 
  465                         /*
  466                          * If there is no more to read in the pipe, reset
  467                          * its pointers to the beginning.  This improves
  468                          * cache hit stats.
  469                          */
  470                         if (bp->cnt == 0) {
  471                                 bp->in = 0;
  472                                 bp->out = 0;
  473                         }
  474                         nread += size;
  475                         continue;
  476                 }
  477 
  478                 /*
  479                  * Break if some data was read.
  480                  */
  481                 if (nread > 0)
  482                         break;
  483 
  484                 /*
  485                  * Detect EOF condition.
  486                  * Read returns 0 on EOF, no need to set error.
  487                  */
  488                 if (rpipe->pipe_state & PIPE_EOF)
  489                         break;
  490 
  491                 /*
  492                  * Don't block on non-blocking I/O.
  493                  */
  494                 if (fp->f_flag & FNONBLOCK) {
  495                         error = EAGAIN;
  496                         break;
  497                 }
  498 
  499                 /*
  500                  * Unlock the pipe buffer for our remaining processing.
  501                  * We will either break out with an error or we will
  502                  * sleep and relock to loop.
  503                  */
  504                 pipeunlock(rpipe);
  505 
  506 #if 1   /* XXX (dsl) I'm sure these aren't needed here ... */
  507                 /*
  508                  * We want to read more, wake up select/poll.
  509                  */
  510                 pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);
  511 
  512                 /*
  513                  * If the "write-side" is blocked, wake it up now.
  514                  */
  515                 cv_broadcast(&rpipe->pipe_wcv);
  516 #endif
  517 
  518                 if (wakeup_state & PIPE_RESTART) {
  519                         error = ERESTART;
  520                         goto unlocked_error;
  521                 }
  522 
  523                 /* Now wait until the pipe is filled */
  524                 error = cv_wait_sig(&rpipe->pipe_rcv, lock);
  525                 if (error != 0)
  526                         goto unlocked_error;
  527                 wakeup_state = rpipe->pipe_state;
  528                 goto again;
  529         }
  530 
  531         if (error == 0)
  532                 getnanotime(&rpipe->pipe_atime);
  533         pipeunlock(rpipe);
  534 
  535 unlocked_error:
  536         --rpipe->pipe_busy;
  537         if (rpipe->pipe_busy == 0) {
  538                 rpipe->pipe_state &= ~PIPE_RESTART;
  539                 cv_broadcast(&rpipe->pipe_draincv);
  540         }
  541         if (bp->cnt < MINPIPESIZE) {
  542                 cv_broadcast(&rpipe->pipe_wcv);
  543         }
  544 
  545         /*
  546          * If anything was read off the buffer, signal the writer that it is
  547          * possible to write more data.  Also send the signal if this is the
  548          * first read since the last write.
  549          */
  550         if ((bp->size - bp->cnt) >= PIPE_BUF
  551             && (ocnt != bp->cnt || (rpipe->pipe_state & PIPE_SIGNALR))) {
  552                 pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);
  553                 rpipe->pipe_state &= ~PIPE_SIGNALR;
  554         }
  555 
  556         mutex_exit(lock);
  557         return (error);
  558 }
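
Two of the return paths above are worth seeing from userland: an empty pipe
with FNONBLOCK set fails with EAGAIN, and a pipe whose write side is gone
(PIPE_EOF) reads as end-of-file.  A fragment, assuming fds[] from the first
sketch was created with O_NONBLOCK:

        #include <errno.h>
        #include <unistd.h>

        char buf[64];
        ssize_t n = read(fds[0], buf, sizeof(buf));

        if (n == -1 && errno == EAGAIN) {
                /* Pipe empty and O_NONBLOCK set: try again later. */
        } else if (n == 0) {
                /* EOF: the write side has been closed. */
        }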
  559 
  560 static int
  561 pipe_write(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
  562     int flags)
  563 {
  564         struct pipe *wpipe, *rpipe;
  565         struct pipebuf *bp;
  566         kmutex_t *lock;
  567         int error;
  568         unsigned int wakeup_state = 0;
  569 
  570         /* We want to write to our peer */
  571         rpipe = fp->f_pipe;
  572         lock = rpipe->pipe_lock;
  573         error = 0;
  574 
  575         mutex_enter(lock);
  576         wpipe = rpipe->pipe_peer;
  577 
  578         /*
  579          * Detect loss of pipe read side, issue SIGPIPE if lost.
  580          */
  581         if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) != 0) {
  582                 mutex_exit(lock);
  583                 return EPIPE;
  584         }
  585         ++wpipe->pipe_busy;
  586 
  587         /* Acquire the long-term pipe lock */
  588         if ((error = pipelock(wpipe, true)) != 0) {
  589                 --wpipe->pipe_busy;
  590                 if (wpipe->pipe_busy == 0) {
  591                         wpipe->pipe_state &= ~PIPE_RESTART;
  592                         cv_broadcast(&wpipe->pipe_draincv);
  593                 }
  594                 mutex_exit(lock);
  595                 return (error);
  596         }
  597 
  598         bp = &wpipe->pipe_buffer;
  599 
  600         /*
  601          * If it is advantageous to resize the pipe buffer, do so.
  602          */
  603         if ((uio->uio_resid > PIPE_SIZE) &&
  604             (nbigpipe < maxbigpipes) &&
  605             (bp->size <= PIPE_SIZE) && (bp->cnt == 0)) {
  606 
  607                 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
  608                         atomic_inc_uint(&nbigpipe);
  609         }
  610 
  611         while (uio->uio_resid) {
  612                 size_t space;
  613 
  614                 space = bp->size - bp->cnt;
  615 
  616                 /* Writes of size <= PIPE_BUF must be atomic. */
  617                 if ((space < uio->uio_resid) && (uio->uio_resid <= PIPE_BUF))
  618                         space = 0;
  619 
  620                 if (space > 0) {
  621                         int size;       /* Transfer size */
  622                         int segsize;    /* first segment to transfer */
  623 
  624                         /*
  625                          * Transfer size is minimum of uio transfer
  626                          * and free space in pipe buffer.
  627                          */
  628                         if (space > uio->uio_resid)
  629                                 size = uio->uio_resid;
  630                         else
  631                                 size = space;
  632                         /*
  633                          * First segment to transfer is minimum of
  634                          * transfer size and contiguous space in
  635                          * pipe buffer.  If first segment to transfer
  636                          * is less than the transfer size, we've got
  637                          * a wraparound in the buffer.
  638                          */
  639                         segsize = bp->size - bp->in;
  640                         if (segsize > size)
  641                                 segsize = size;
  642 
  643                         /* Transfer first segment */
  644                         mutex_exit(lock);
  645                         error = uiomove((char *)bp->buffer + bp->in, segsize,
  646                             uio);
  647 
  648                         if (error == 0 && segsize < size) {
  649                                 /*
  650                                  * Transfer remaining part now, to
  651                                  * support atomic writes.  Wraparound
  652                                  * happened.
  653                                  */
  654                                 KASSERT(bp->in + segsize == bp->size);
  655                                 error = uiomove(bp->buffer,
  656                                     size - segsize, uio);
  657                         }
  658                         mutex_enter(lock);
  659                         if (error)
  660                                 break;
  661 
  662                         bp->in += size;
  663                         if (bp->in >= bp->size) {
  664                                 KASSERT(bp->in == size - segsize + bp->size);
  665                                 bp->in = size - segsize;
  666                         }
  667 
  668                         bp->cnt += size;
  669                         KASSERT(bp->cnt <= bp->size);
  670                         wakeup_state = 0;
  671                 } else {
  672                         /*
  673                          * If the "read-side" has been blocked, wake it up now.
  674                          */
  675                         cv_broadcast(&wpipe->pipe_rcv);
  676 
  677                         /*
  678                          * Don't block on non-blocking I/O.
  679                          */
  680                         if (fp->f_flag & FNONBLOCK) {
  681                                 error = EAGAIN;
  682                                 break;
  683                         }
  684 
  685                         /*
  686                          * We have no more space and have something to offer,
  687                          * wake up select/poll.
  688                          */
  689                         if (bp->cnt)
  690                                 pipeselwakeup(wpipe, wpipe, POLL_IN);
  691 
  692                         if (wakeup_state & PIPE_RESTART) {
  693                                 error = ERESTART;
  694                                 break;
  695                         }
  696 
  697                         /*
  698                          * If read side wants to go away, we just issue a signal
  699                          * to ourselves.
  700                          */
  701                         if (wpipe->pipe_state & PIPE_EOF) {
  702                                 error = EPIPE;
  703                                 break;
  704                         }
  705 
  706                         pipeunlock(wpipe);
  707                         error = cv_wait_sig(&wpipe->pipe_wcv, lock);
  708                         (void)pipelock(wpipe, false);
  709                         if (error != 0)
  710                                 break;
  711                         wakeup_state = wpipe->pipe_state;
  712                 }
  713         }
  714 
  715         --wpipe->pipe_busy;
  716         if (wpipe->pipe_busy == 0) {
  717                 wpipe->pipe_state &= ~PIPE_RESTART;
  718                 cv_broadcast(&wpipe->pipe_draincv);
  719         }
  720         if (bp->cnt > 0) {
  721                 cv_broadcast(&wpipe->pipe_rcv);
  722         }
  723 
  724         /*
  725          * Don't return EPIPE if I/O was successful
  726          */
  727         if (error == EPIPE && bp->cnt == 0 && uio->uio_resid == 0)
  728                 error = 0;
  729 
  730         if (error == 0)
  731                 getnanotime(&wpipe->pipe_mtime);
  732 
  733         /*
  734          * We have something to offer, wake up select/poll.
  735          * wmap->cnt is always 0 at this point (direct write
  736          * is only done synchronously), so check only wpipe->pipe_buffer.cnt.
  737          */
  738         if (bp->cnt)
  739                 pipeselwakeup(wpipe, wpipe, POLL_IN);
  740 
  741         /*
  742          * Arrange for next read(2) to do a signal.
  743          */
  744         wpipe->pipe_state |= PIPE_SIGNALR;
  745 
  746         pipeunlock(wpipe);
  747         mutex_exit(lock);
  748         return (error);
  749 }
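
A worked example of the wraparound arithmetic in the write loop above, under
assumed values: bp->size = 16384, bp->in = 16000, and enough free space for a
1000-byte write.  Then segsize = 16384 - 16000 = 384, so the first uiomove()
fills the tail of the buffer; the second uiomove() copies the remaining
616 bytes to the front.  bp->in += 1000 yields 17000, which exceeds bp->size,
so bp->in is reset to size - segsize = 616, exactly where the second copy
ended.  The KASSERT agrees: size - segsize + bp->size = 616 + 16384 = 17000.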
  750 
  751 /*
  752  * We implement a very minimal set of ioctls for compatibility with sockets.
  753  */
  754 int
  755 pipe_ioctl(file_t *fp, u_long cmd, void *data)
  756 {
  757         struct pipe *pipe = fp->f_pipe;
  758         kmutex_t *lock = pipe->pipe_lock;
  759 
  760         switch (cmd) {
  761 
  762         case FIONBIO:
  763                 return (0);
  764 
  765         case FIOASYNC:
  766                 mutex_enter(lock);
  767                 if (*(int *)data) {
  768                         pipe->pipe_state |= PIPE_ASYNC;
  769                 } else {
  770                         pipe->pipe_state &= ~PIPE_ASYNC;
  771                 }
  772                 mutex_exit(lock);
  773                 return (0);
  774 
  775         case FIONREAD:
  776                 mutex_enter(lock);
  777                 *(int *)data = pipe->pipe_buffer.cnt;
  778                 mutex_exit(lock);
  779                 return (0);
  780 
  781         case FIONWRITE:
  782                 /* Look at other side */
  783                 mutex_enter(lock);
  784                 pipe = pipe->pipe_peer;
  785                 if (pipe == NULL)
  786                         *(int *)data = 0;
  787                 else
  788                         *(int *)data = pipe->pipe_buffer.cnt;
  789                 mutex_exit(lock);
  790                 return (0);
  791 
  792         case FIONSPACE:
  793                 /* Look at other side */
  794                 mutex_enter(lock);
  795                 pipe = pipe->pipe_peer;
  796                 if (pipe == NULL)
  797                         *(int *)data = 0;
  798                 else
  799                         *(int *)data = pipe->pipe_buffer.size -
  800                             pipe->pipe_buffer.cnt;
  801                 mutex_exit(lock);
  802                 return (0);
  803 
  804         case TIOCSPGRP:
  805         case FIOSETOWN:
  806                 return fsetown(&pipe->pipe_pgid, cmd, data);
  807 
  808         case TIOCGPGRP:
  809         case FIOGETOWN:
  810                 return fgetown(pipe->pipe_pgid, cmd, data);
  811 
  812         }
  813         return (EPASSTHROUGH);
  814 }
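
A sketch of the buffer-occupancy ioctls from userland, matching the handlers
above: FIONREAD on the read end reports bytes buffered, FIONSPACE on the
write end reports room left in the buffer it writes to (fds[] as before):

        #include <sys/ioctl.h>
        #include <err.h>

        int nread, nspace;

        if (ioctl(fds[0], FIONREAD, &nread) == -1 ||
            ioctl(fds[1], FIONSPACE, &nspace) == -1)
                err(1, "ioctl");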
  815 
  816 int
  817 pipe_poll(file_t *fp, int events)
  818 {
  819         struct pipe *rpipe = fp->f_pipe;
  820         struct pipe *wpipe;
  821         int eof = 0;
  822         int revents = 0;
  823 
  824         mutex_enter(rpipe->pipe_lock);
  825         wpipe = rpipe->pipe_peer;
  826 
  827         if (events & (POLLIN | POLLRDNORM))
  828                 if ((rpipe->pipe_buffer.cnt > 0) ||
  829                     (rpipe->pipe_state & PIPE_EOF))
  830                         revents |= events & (POLLIN | POLLRDNORM);
  831 
  832         eof |= (rpipe->pipe_state & PIPE_EOF);
  833 
  834         if (wpipe == NULL)
  835                 revents |= events & (POLLOUT | POLLWRNORM);
  836         else {
  837                 if (events & (POLLOUT | POLLWRNORM))
  838                         if ((wpipe->pipe_state & PIPE_EOF) || (
  839                              (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
  840                                 revents |= events & (POLLOUT | POLLWRNORM);
  841 
  842                 eof |= (wpipe->pipe_state & PIPE_EOF);
  843         }
  844 
  845         if (wpipe == NULL || eof)
  846                 revents |= POLLHUP;
  847 
  848         if (revents == 0) {
  849                 if (events & (POLLIN | POLLRDNORM))
  850                         selrecord(curlwp, &rpipe->pipe_sel);
  851 
  852                 if (events & (POLLOUT | POLLWRNORM))
  853                         selrecord(curlwp, &wpipe->pipe_sel);
  854         }
  855         mutex_exit(rpipe->pipe_lock);
  856 
  857         return (revents);
  858 }
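
A sketch of driving pipe_poll() from userland with poll(2) (fds[] as before):

        #include <err.h>
        #include <poll.h>
        #include <stdio.h>

        struct pollfd pfd = { .fd = fds[0], .events = POLLIN };
        int n = poll(&pfd, 1, 1000);            /* wait up to one second */

        if (n == -1)
                err(1, "poll");
        if (n > 0 && (pfd.revents & POLLHUP))
                printf("write side closed\n"); /* PIPE_EOF above */
        if (n > 0 && (pfd.revents & POLLIN))
                printf("data ready\n");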
  859 
  860 static int
  861 pipe_stat(file_t *fp, struct stat *ub)
  862 {
  863         struct pipe *pipe = fp->f_pipe;
  864 
  865         mutex_enter(pipe->pipe_lock);
  866         memset(ub, 0, sizeof(*ub));
  867         ub->st_mode = S_IFIFO | S_IRUSR | S_IWUSR;
  868         ub->st_blksize = pipe->pipe_buffer.size;
  869         if (ub->st_blksize == 0 && pipe->pipe_peer)
  870                 ub->st_blksize = pipe->pipe_peer->pipe_buffer.size;
  871         ub->st_size = pipe->pipe_buffer.cnt;
  872         ub->st_blocks = (ub->st_size) ? 1 : 0;
  873         ub->st_atimespec = pipe->pipe_atime;
  874         ub->st_mtimespec = pipe->pipe_mtime;
  875         ub->st_ctimespec = ub->st_birthtimespec = pipe->pipe_btime;
  876         ub->st_uid = kauth_cred_geteuid(fp->f_cred);
  877         ub->st_gid = kauth_cred_getegid(fp->f_cred);
  878 
  879         /*
  880          * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
  881          * XXX (st_dev, st_ino) should be unique.
  882          */
  883         mutex_exit(pipe->pipe_lock);
  884         return 0;
  885 }
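
fstat(2) on a pipe end lands in pipe_stat(); as the code above shows, st_size
reports the bytes currently buffered.  A fragment (fds[] as before):

        #include <sys/stat.h>
        #include <err.h>
        #include <stdio.h>

        struct stat st;

        if (fstat(fds[0], &st) == -1)
                err(1, "fstat");
        if (S_ISFIFO(st.st_mode))
                printf("%lld bytes queued\n", (long long)st.st_size);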
  886 
  887 static int
  888 pipe_close(file_t *fp)
  889 {
  890         struct pipe *pipe = fp->f_pipe;
  891 
  892         fp->f_pipe = NULL;
  893         pipeclose(pipe);
  894         return (0);
  895 }
  896 
  897 static void
  898 pipe_restart(file_t *fp)
  899 {
  900         struct pipe *pipe = fp->f_pipe;
  901 
  902         /*
  903          * Unblock blocked reads/writes in order to allow close() to complete.
  904          * System calls return ERESTART so that the fd is revalidated.
  905          * (Partial writes return the transfer length.)
  906          */
  907         mutex_enter(pipe->pipe_lock);
  908         pipe->pipe_state |= PIPE_RESTART;
  909         /* Wake up both cvs; maybe we only need one, but maybe there are
  910          * other paths where a wakeup is needed, and it saves deciding which! */
  911         cv_broadcast(&pipe->pipe_rcv);
  912         cv_broadcast(&pipe->pipe_wcv);
  913         mutex_exit(pipe->pipe_lock);
  914 }
  915 
  916 static void
  917 pipe_free_kmem(struct pipe *pipe)
  918 {
  919 
  920         if (pipe->pipe_buffer.buffer != NULL) {
  921                 if (pipe->pipe_buffer.size > PIPE_SIZE) {
  922                         atomic_dec_uint(&nbigpipe);
  923                 }
  924                 if (pipe->pipe_buffer.buffer != (void *)pipe->pipe_kmem) {
  925                         uvm_km_free(kernel_map,
  926                             (vaddr_t)pipe->pipe_buffer.buffer,
  927                             pipe->pipe_buffer.size, UVM_KMF_PAGEABLE);
  928                         atomic_add_int(&amountpipekva,
  929                             -pipe->pipe_buffer.size);
  930                 }
  931                 pipe->pipe_buffer.buffer = NULL;
  932         }
  933 }
  934 
  935 /*
  936  * Shutdown the pipe.
  937  */
  938 static void
  939 pipeclose(struct pipe *pipe)
  940 {
  941         kmutex_t *lock;
  942         struct pipe *ppipe;
  943 
  944         if (pipe == NULL)
  945                 return;
  946 
  947         KASSERT(cv_is_valid(&pipe->pipe_rcv));
  948         KASSERT(cv_is_valid(&pipe->pipe_wcv));
  949         KASSERT(cv_is_valid(&pipe->pipe_draincv));
  950         KASSERT(cv_is_valid(&pipe->pipe_lkcv));
  951 
  952         lock = pipe->pipe_lock;
  953         if (lock == NULL)
  954                 /* Must have failed during create */
  955                 goto free_resources;
  956 
  957         mutex_enter(lock);
  958         pipeselwakeup(pipe, pipe, POLL_HUP);
  959 
  960         /*
  961          * If the other side is blocked, wake it up saying that
  962          * we want to close it down.
  963          */
  964         pipe->pipe_state |= PIPE_EOF;
  965         if (pipe->pipe_busy) {
  966                 while (pipe->pipe_busy) {
  967                         cv_broadcast(&pipe->pipe_wcv);
  968                         cv_wait_sig(&pipe->pipe_draincv, lock);
  969                 }
  970         }
  971 
  972         /*
  973          * Disconnect from peer.
  974          */
  975         if ((ppipe = pipe->pipe_peer) != NULL) {
  976                 pipeselwakeup(ppipe, ppipe, POLL_HUP);
  977                 ppipe->pipe_state |= PIPE_EOF;
  978                 cv_broadcast(&ppipe->pipe_rcv);
  979                 ppipe->pipe_peer = NULL;
  980         }
  981 
  982         /*
  983          * Any knote objects still left in the list are
  984  * the ones attached by the peer.  Since no one will
  985          * traverse this list, we just clear it.
  986          *
  987          * XXX Exposes select/kqueue internals.
  988          */
  989         SLIST_INIT(&pipe->pipe_sel.sel_klist);
  990 
  991         KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0);
  992         mutex_exit(lock);
  993         mutex_obj_free(lock);
  994 
  995         /*
  996          * Free resources.
  997          */
  998     free_resources:
  999         pipe->pipe_pgid = 0;
 1000         pipe->pipe_state = PIPE_SIGNALR;
 1001         pipe->pipe_peer = NULL;
 1002         pipe->pipe_lock = NULL;
 1003         pipe_free_kmem(pipe);
 1004         if (pipe->pipe_kmem != 0) {
 1005                 pool_cache_put(pipe_rd_cache, pipe);
 1006         } else {
 1007                 pool_cache_put(pipe_wr_cache, pipe);
 1008         }
 1009 }
 1010 
 1011 static void
 1012 filt_pipedetach(struct knote *kn)
 1013 {
 1014         struct pipe *pipe;
 1015         kmutex_t *lock;
 1016 
 1017         pipe = ((file_t *)kn->kn_obj)->f_pipe;
 1018         lock = pipe->pipe_lock;
 1019 
 1020         mutex_enter(lock);
 1021 
 1022         switch(kn->kn_filter) {
 1023         case EVFILT_WRITE:
 1024                 /* Need the peer structure, not our own. */
 1025                 pipe = pipe->pipe_peer;
 1026 
 1027                 /* If reader end already closed, just return. */
 1028                 if (pipe == NULL) {
 1029                         mutex_exit(lock);
 1030                         return;
 1031                 }
 1032 
 1033                 break;
 1034         default:
 1035                 /* Nothing to do. */
 1036                 break;
 1037         }
 1038 
 1039         KASSERT(kn->kn_hook == pipe);
 1040         selremove_knote(&pipe->pipe_sel, kn);
 1041         mutex_exit(lock);
 1042 }
 1043 
 1044 static int
 1045 filt_piperead(struct knote *kn, long hint)
 1046 {
 1047         struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
 1048         struct pipe *wpipe;
 1049         int rv;
 1050 
 1051         if ((hint & NOTE_SUBMIT) == 0) {
 1052                 mutex_enter(rpipe->pipe_lock);
 1053         }
 1054         wpipe = rpipe->pipe_peer;
 1055         kn->kn_data = rpipe->pipe_buffer.cnt;
 1056 
 1057         if ((rpipe->pipe_state & PIPE_EOF) ||
 1058             (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
 1059                 knote_set_eof(kn, 0);
 1060                 rv = 1;
 1061         } else {
 1062                 rv = kn->kn_data > 0;
 1063         }
 1064 
 1065         if ((hint & NOTE_SUBMIT) == 0) {
 1066                 mutex_exit(rpipe->pipe_lock);
 1067         }
 1068         return rv;
 1069 }
 1070 
 1071 static int
 1072 filt_pipewrite(struct knote *kn, long hint)
 1073 {
 1074         struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
 1075         struct pipe *wpipe;
 1076         int rv;
 1077 
 1078         if ((hint & NOTE_SUBMIT) == 0) {
 1079                 mutex_enter(rpipe->pipe_lock);
 1080         }
 1081         wpipe = rpipe->pipe_peer;
 1082 
 1083         if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
 1084                 kn->kn_data = 0;
 1085                 knote_set_eof(kn, 0);
 1086                 rv = 1;
 1087         } else {
 1088                 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
 1089                 rv = kn->kn_data >= PIPE_BUF;
 1090         }
 1091 
 1092         if ((hint & NOTE_SUBMIT) == 0) {
 1093                 mutex_exit(rpipe->pipe_lock);
 1094         }
 1095         return rv;
 1096 }
 1097 
 1098 static const struct filterops pipe_rfiltops = {
 1099         .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
 1100         .f_attach = NULL,
 1101         .f_detach = filt_pipedetach,
 1102         .f_event = filt_piperead,
 1103 };
 1104 
 1105 static const struct filterops pipe_wfiltops = {
 1106         .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
 1107         .f_attach = NULL,
 1108         .f_detach = filt_pipedetach,
 1109         .f_event = filt_pipewrite,
 1110 };
 1111 
 1112 static int
 1113 pipe_kqfilter(file_t *fp, struct knote *kn)
 1114 {
 1115         struct pipe *pipe;
 1116         kmutex_t *lock;
 1117 
 1118         pipe = ((file_t *)kn->kn_obj)->f_pipe;
 1119         lock = pipe->pipe_lock;
 1120 
 1121         mutex_enter(lock);
 1122 
 1123         switch (kn->kn_filter) {
 1124         case EVFILT_READ:
 1125                 kn->kn_fop = &pipe_rfiltops;
 1126                 break;
 1127         case EVFILT_WRITE:
 1128                 kn->kn_fop = &pipe_wfiltops;
 1129                 pipe = pipe->pipe_peer;
 1130                 if (pipe == NULL) {
 1131                         /* Other end of pipe has been closed. */
 1132                         mutex_exit(lock);
 1133                         return (EBADF);
 1134                 }
 1135                 break;
 1136         default:
 1137                 mutex_exit(lock);
 1138                 return (EINVAL);
 1139         }
 1140 
 1141         kn->kn_hook = pipe;
 1142         selrecord_knote(&pipe->pipe_sel, kn);
 1143         mutex_exit(lock);
 1144 
 1145         return (0);
 1146 }
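
A sketch of registering the read filter above through kqueue(2) (fds[] as
before); filt_piperead() then reports the readable byte count in ev.data and
sets EV_EOF once the write side closes:

        #include <sys/event.h>
        #include <err.h>

        struct kevent ev;
        int kq = kqueue();

        if (kq == -1)
                err(1, "kqueue");
        EV_SET(&ev, fds[0], EVFILT_READ, EV_ADD, 0, 0, 0);
        if (kevent(kq, &ev, 1, NULL, 0, NULL) == -1)
                err(1, "kevent");
        /* A later kevent(kq, NULL, 0, &ev, 1, NULL) returns once data
         * arrives; ev.data is the readable byte count, and EV_EOF is
         * set after the write side closes (see filt_piperead()). */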
 1147 
 1148 /*
 1149  * Handle pipe sysctls.
 1150  */
 1151 SYSCTL_SETUP(sysctl_kern_pipe_setup, "sysctl kern.pipe subtree setup")
 1152 {
 1153 
 1154         sysctl_createv(clog, 0, NULL, NULL,
 1155                        CTLFLAG_PERMANENT,
 1156                        CTLTYPE_NODE, "pipe",
 1157                        SYSCTL_DESCR("Pipe settings"),
 1158                        NULL, 0, NULL, 0,
 1159                        CTL_KERN, KERN_PIPE, CTL_EOL);
 1160 
 1161         sysctl_createv(clog, 0, NULL, NULL,
 1162                        CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
 1163                        CTLTYPE_INT, "maxbigpipes",
 1164                        SYSCTL_DESCR("Maximum number of \"big\" pipes"),
 1165                        NULL, 0, &maxbigpipes, 0,
 1166                        CTL_KERN, KERN_PIPE, KERN_PIPE_MAXBIGPIPES, CTL_EOL);
 1167         sysctl_createv(clog, 0, NULL, NULL,
 1168                        CTLFLAG_PERMANENT,
 1169                        CTLTYPE_INT, "nbigpipes",
 1170                        SYSCTL_DESCR("Number of \"big\" pipes"),
 1171                        NULL, 0, &nbigpipe, 0,
 1172                        CTL_KERN, KERN_PIPE, KERN_PIPE_NBIGPIPES, CTL_EOL);
 1173         sysctl_createv(clog, 0, NULL, NULL,
 1174                        CTLFLAG_PERMANENT,
 1175                        CTLTYPE_INT, "kvasize",
 1176                        SYSCTL_DESCR("Amount of kernel memory consumed by pipe "
 1177                                     "buffers"),
 1178                        NULL, 0, &amountpipekva, 0,
 1179                        CTL_KERN, KERN_PIPE, KERN_PIPE_KVASIZE, CTL_EOL);
 1180 }
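
The nodes created above appear as kern.pipe.*; a sketch of reading one of
them with sysctlbyname(3):

        #include <sys/sysctl.h>
        #include <err.h>
        #include <stdio.h>

        int max;
        size_t len = sizeof(max);

        if (sysctlbyname("kern.pipe.maxbigpipes", &max, &len, NULL, 0) == -1)
                err(1, "sysctlbyname");
        printf("at most %d big pipes\n", max);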
