FreeBSD/Linux Kernel Cross Reference
sys/kern/kern_alq.c


/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
        char    *aq_entbuf;             /* Buffer for stored entries */
        int     aq_entmax;              /* Max entries */
        int     aq_entlen;              /* Entry length */
        int     aq_freebytes;           /* Bytes available in buffer */
        int     aq_buflen;              /* Total length of our buffer */
        int     aq_writehead;           /* Location for next write */
        int     aq_writetail;           /* Flush starts at this location */
        int     aq_wrapearly;           /* # bytes left blank at end of buf */
        int     aq_flags;               /* Queue flags */
        int     aq_waiters;             /* Num threads waiting for resources
                                         * NB: Used as a wait channel so must
                                         * not be first field in the alq struct
                                         */
        struct  ale     aq_getpost;     /* ALE for use by get/post */
        struct mtx      aq_mtx;         /* Queue lock */
        struct vnode    *aq_vp;         /* Open vnode handle */
        struct ucred    *aq_cred;       /* Credentials of the opening thread */
        LIST_ENTRY(alq) aq_act;         /* List of active queues */
        LIST_ENTRY(alq) aq_link;        /* List of all queues */
};

#define AQ_WANTED       0x0001          /* Wakeup sleeper when io is done */
#define AQ_ACTIVE       0x0002          /* on the active list */
#define AQ_FLUSHING     0x0004          /* doing IO */
#define AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
#define AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
#define AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */

#define ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)

#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)

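/*
 * Illustrative sketch (not part of the original file): the circular buffer
 * invariants assumed throughout this file, restated on plain integers.
 * ALQ_EXAMPLES is a hypothetical guard so this is never compiled in.
 */
#ifdef ALQ_EXAMPLES
static int
alq_example_contigbytes(int buflen, int writehead, int writetail,
    int freebytes)
{
        /*
         * An empty queue keeps writehead == writetail == 0, so all
         * freebytes (== buflen) are contiguous.  While the head trails
         * the tail, the free region is a single contiguous span;
         * otherwise only the bytes between the head and the physical
         * end of the buffer can accept a contiguous write.
         */
        if (writehead <= writetail)
                return (freebytes);
        return (buflen - writehead);
}
#endif /* ALQ_EXAMPLES */
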
static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;

#define ALD_LOCK()      mtx_lock(&ald_mtx)
#define ALD_UNLOCK()    mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_REMOVE(alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
        LIST_INSERT_HEAD(&ald_active, alq, aq_act);
        wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
        LIST_REMOVE(alq, aq_act);
        alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
        mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
        LIST_INIT(&ald_queues);
        LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
        int needwakeup;
        struct alq *alq;

        ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

        EVENTHANDLER_REGISTER(shutdown_pre_sync, ald_shutdown, NULL,
            SHUTDOWN_PRI_FIRST);

        ALD_LOCK();

        for (;;) {
                while ((alq = LIST_FIRST(&ald_active)) == NULL &&
                    !ald_shutingdown)
                        mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

                /* Don't shut down until all active ALQs are flushed. */
                if (ald_shutingdown && alq == NULL) {
                        ALD_UNLOCK();
                        break;
                }

                ALQ_LOCK(alq);
                ald_deactivate(alq);
                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
                ALQ_UNLOCK(alq);
                if (needwakeup)
                        wakeup_one(alq);
                ALD_LOCK();
        }

        kthread_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
        struct alq *alq;

        ALD_LOCK();

        /* Ensure no new queues can be created. */
        ald_shutingdown = 1;

        /* Shut down all ALQs prior to terminating the ald_daemon. */
        while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
                LIST_REMOVE(alq, aq_link);
                ALD_UNLOCK();
                alq_shutdown(alq);
                ALD_LOCK();
        }

        /* At this point, all ALQs are flushed and shut down. */

        /*
         * Wake ald_daemon so that it exits.  It won't be able to do
         * anything until we mtx_sleep and thereby release the ald_mtx.
         */
        wakeup(&ald_active);

        /* Wait for ald_daemon to exit. */
        mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

        ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
        ALQ_LOCK(alq);

        /* Stop any new writers. */
        alq->aq_flags |= AQ_SHUTDOWN;

        /*
         * If the ALQ isn't active but has unwritten data (possible if
         * the ALQ_NOACTIVATE flag has been used), explicitly activate the
         * ALQ here so that the pending data gets flushed by the ald_daemon.
         */
        if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
                alq->aq_flags |= AQ_ACTIVE;
                ALQ_UNLOCK(alq);
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
                ALQ_LOCK(alq);
        }

        /* Drain IO */
        while (alq->aq_flags & AQ_ACTIVE) {
                alq->aq_flags |= AQ_WANTED;
                ALQ_UNLOCK(alq);
                tsleep(alq, PWAIT, "aldclose", 0);
                ALQ_LOCK(alq);
        }
        ALQ_UNLOCK(alq);

        vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
            curthread);
        crfree(alq->aq_cred);
}

static void
alq_destroy(struct alq *alq)
{
        /* Drain all pending IO. */
        alq_shutdown(alq);

        mtx_destroy(&alq->aq_mtx);
        free(alq->aq_entbuf, M_ALD);
        free(alq, M_ALD);
}

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
        struct thread *td;
        struct mount *mp;
        struct vnode *vp;
        struct uio auio;
        struct iovec aiov[2];
        int totlen;
        int iov;
        int vfslocked;
        int wrapearly;

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

        vp = alq->aq_vp;
        td = curthread;
        totlen = 0;
        iov = 1;
        wrapearly = alq->aq_wrapearly;

        bzero(&aiov, sizeof(aiov));
        bzero(&auio, sizeof(auio));

        /* Start the write from the location of our buffer tail pointer. */
        aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

        if (alq->aq_writetail < alq->aq_writehead) {
                /* Buffer not wrapped. */
                totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
        } else if (alq->aq_writehead == 0) {
                /* Buffer not wrapped (special case to avoid an empty iov). */
                totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
        } else {
                /*
                 * Buffer wrapped, requires 2 aiov entries:
                 * - first is from writetail to end of buffer
                 * - second is from start of buffer to writehead
                 */
                aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
                iov++;
                aiov[1].iov_base = alq->aq_entbuf;
                aiov[1].iov_len = alq->aq_writehead;
                totlen = aiov[0].iov_len + aiov[1].iov_len;
        }

        alq->aq_flags |= AQ_FLUSHING;
        ALQ_UNLOCK(alq);

        auio.uio_iov = &aiov[0];
        auio.uio_offset = 0;
        auio.uio_segflg = UIO_SYSSPACE;
        auio.uio_rw = UIO_WRITE;
        auio.uio_iovcnt = iov;
        auio.uio_resid = totlen;
        auio.uio_td = td;

        /*
         * Do all of the junk required to write now.
         */
        vfslocked = VFS_LOCK_GIANT(vp->v_mount);
        vn_start_write(vp, &mp, V_WAIT);
        vn_lock(vp, LK_EXCLUSIVE | LK_RETRY, td);
        VOP_LEASE(vp, td, alq->aq_cred, LEASE_WRITE);
        /*
         * XXX: VOP_WRITE error checks are ignored.
         */
#ifdef MAC
        if (mac_check_vnode_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
                VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
        VOP_UNLOCK(vp, 0, td);
        vn_finished_write(mp);
        VFS_UNLOCK_GIANT(vfslocked);

        ALQ_LOCK(alq);
        alq->aq_flags &= ~AQ_FLUSHING;

        /* Adjust writetail as required, taking into account wrapping. */
        alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
            alq->aq_buflen;
        alq->aq_freebytes += totlen + wrapearly;

        /*
         * If we just flushed part of the buffer which wrapped, reset the
         * wrapearly indicator.
         */
        if (wrapearly)
                alq->aq_wrapearly = 0;

        /*
         * If we just flushed the buffer completely, reset indexes to 0 to
         * minimise buffer wraps.
         * This is also required to ensure alq_getn() can't wedge itself.
         */
        if (!HAS_PENDING_DATA(alq))
                alq->aq_writehead = alq->aq_writetail = 0;

        KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
            ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

        if (alq->aq_flags & AQ_WANTED) {
                alq->aq_flags &= ~AQ_WANTED;
                return (1);
        }

        return (0);
}
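
/*
 * Illustrative sketch (not part of the original file): the tail adjustment
 * alq_doio() performs after a flush.  With buflen = 100, writetail = 90,
 * writehead = 20 and wrapearly = 5, the flush writes iovecs of 5 bytes
 * (offsets 90..94, skipping the 5 blank bytes at 95..99) and 20 bytes
 * (offsets 0..19), so totlen = 25 and the tail lands back on the head:
 * (90 + 25 + 5) % 100 == 20.  ALQ_EXAMPLES is a hypothetical guard.
 */
#ifdef ALQ_EXAMPLES
static int
alq_example_advance_tail(int writetail, int totlen, int wrapearly,
    int buflen)
{
        /* Skip the flushed data plus any hole left blank by an early wrap. */
        return ((writetail + totlen + wrapearly) % buflen);
}
#endif /* ALQ_EXAMPLES */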

static struct kproc_desc ald_kp = {
        "ALQ Daemon",
        ald_daemon,
        &ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred,
    int cmode, int size, int flags)
{
        struct thread *td;
        struct nameidata nd;
        struct alq *alq;
        int oflags;
        int error;
        int vfslocked;

        KASSERT((size > 0), ("%s: size <= 0", __func__));

        *alqp = NULL;
        td = curthread;

        NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
        oflags = FWRITE | O_NOFOLLOW | O_CREAT;

        error = vn_open_cred(&nd, &oflags, cmode, cred, NULL);
        if (error)
                return (error);

        vfslocked = NDHASGIANT(&nd);
        NDFREE(&nd, NDF_ONLY_PNBUF);
        /* Just unlock; we keep a reference to the vnode. */
        VOP_UNLOCK(nd.ni_vp, 0, td);
        VFS_UNLOCK_GIANT(vfslocked);

        alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
        alq->aq_vp = nd.ni_vp;
        alq->aq_cred = crhold(cred);

        mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

        alq->aq_buflen = size;
        alq->aq_entmax = 0;
        alq->aq_entlen = 0;

        alq->aq_freebytes = alq->aq_buflen;
        alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
        alq->aq_writehead = alq->aq_writetail = 0;
        if (flags & ALQ_ORDERED)
                alq->aq_flags |= AQ_ORDERED;

        if ((error = ald_add(alq)) != 0) {
                alq_destroy(alq);
                return (error);
        }

        *alqp = alq;

        return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
        int ret;

        KASSERT((count >= 0), ("%s: count < 0", __func__));

        if (count > 0) {
                /* Only mark the queue legacy if the open succeeded. */
                if ((ret = alq_open_flags(alqp, file, cred, cmode,
                    size*count, 0)) == 0) {
                        (*alqp)->aq_flags |= AQ_LEGACY;
                        (*alqp)->aq_entmax = count;
                        (*alqp)->aq_entlen = size;
                }
        } else
                ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

        return (ret);
}
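
/*
 * Usage sketch (illustrative only, not part of the original file): open a
 * variable-length queue with ordered writes.  The path, cmode and buffer
 * size are arbitrary example values; ALQ_EXAMPLES is a hypothetical guard.
 */
#ifdef ALQ_EXAMPLES
static struct alq *example_alq;

static int
alq_example_open(void)
{
        /* 32KB ring buffer; the file is created 0600 if absent. */
        return (alq_open_flags(&example_alq, "/var/log/example.alq",
            curthread->td_ucred, 0600, 32768, ALQ_ORDERED));
}
#endif /* ALQ_EXAMPLES */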

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error, depending on whether ALQ_WAITOK or ALQ_NOWAIT
 * was set in flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
        int activate, copy, ret;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > aq_buflen", __func__));

        activate = ret = 0;
        copy = len;
        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Fail to perform the write and return EWOULDBLOCK if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shut down.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
                ALQ_UNLOCK(alq);
                return (EWOULDBLOCK);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
                alq->aq_waiters--;
        }

        /*
         * If ALQ_WAITOK was set and aq_freebytes < len, enter the while loop
         * and sleep until enough free bytes become available; otherwise
         * (aq_freebytes >= len) skip it.  If AQ_ORDERED is set, only 1 thread
         * at a time will be in this loop.  Otherwise, multiple threads may be
         * sleeping here competing for ALQ resources.
         */
        while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
                alq->aq_waiters--;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ret = EWOULDBLOCK;
                goto unlock;
        }

        /*
         * If we need to wrap the buffer to accommodate the write,
         * we'll need 2 calls to bcopy.
         */
        if ((alq->aq_buflen - alq->aq_writehead) < len)
                copy = alq->aq_buflen - alq->aq_writehead;

        /* Copy message (or part thereof if wrap required) to the buffer. */
        bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
        alq->aq_writehead += copy;

        if (alq->aq_writehead >= alq->aq_buflen) {
                KASSERT((alq->aq_writehead == alq->aq_buflen),
                    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
                    __func__,
                    alq->aq_writehead,
                    alq->aq_buflen));
                alq->aq_writehead = 0;
        }

        if (copy != len) {
                /*
                 * Wrap the buffer by copying the remainder of our message
                 * to the start of the buffer and resetting aq_writehead.
                 */
                bcopy(((uint8_t *)data) + copy, alq->aq_entbuf, len - copy);
                alq->aq_writehead = len - copy;
        }

        KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
            ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

        alq->aq_freebytes -= len;

        if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
                alq->aq_flags |= AQ_ACTIVE;
                activate = 1;
        }

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);

        return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length write on variable length queue", __func__));
        return (alq_writen(alq, data, alq->aq_entlen, flags));
}
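
/*
 * Usage sketch (illustrative only, not part of the original file): queue a
 * variable-length message, sleeping for buffer space if the ring is full.
 * ALQ_EXAMPLES is a hypothetical guard.
 */
#ifdef ALQ_EXAMPLES
static int
alq_example_writen(struct alq *alq)
{
        char msg[] = "hello, alq\n";

        /*
         * ALQ_WAITOK sleeps until buffer space becomes available;
         * ALQ_NOWAIT would return EWOULDBLOCK instead.
         */
        return (alq_writen(alq, msg, sizeof(msg) - 1, ALQ_WAITOK));
}
#endif /* ALQ_EXAMPLES */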

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
        int contigbytes;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > alq->aq_buflen", __func__));

        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Determine the number of free contiguous bytes.
         * We ensure elsewhere that if aq_writehead == aq_writetail because
         * the buffer is empty, they will both be set to 0 and therefore
         * aq_freebytes == aq_buflen and is fully contiguous.
         * If they are equal and the buffer is not empty, aq_freebytes will
         * be 0 indicating the buffer is full.
         */
        if (alq->aq_writehead <= alq->aq_writetail)
                contigbytes = alq->aq_freebytes;
        else {
                contigbytes = alq->aq_buflen - alq->aq_writehead;

                if (contigbytes < len) {
                        /*
                         * Insufficient space at end of buffer to handle a
                         * contiguous write. Wrap early if there's space at
                         * the beginning. This will leave a hole at the end
                         * of the buffer which we will have to skip over when
                         * flushing the buffer to disk.
                         */
                        if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
                                /* Keep track of # bytes left blank. */
                                alq->aq_wrapearly = contigbytes;
                                /* Do the wrap and adjust counters. */
                                contigbytes = alq->aq_freebytes =
                                    alq->aq_writetail;
                                alq->aq_writehead = 0;
                        }
                }
        }

        /*
         * Return a NULL ALE if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shut down.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && contigbytes < len)) {
                ALQ_UNLOCK(alq);
                return (NULL);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
                alq->aq_waiters--;
        }

        /*
         * If ALQ_WAITOK was set and contigbytes < len, enter the while loop
         * and sleep until enough contiguous free bytes become available;
         * otherwise (contigbytes >= len) skip it.  If AQ_ORDERED is set, only
         * 1 thread at a time will be in this loop.  Otherwise, multiple
         * threads may be sleeping here competing for ALQ resources.
         */
        while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
                alq->aq_waiters--;

                if (alq->aq_writehead <= alq->aq_writetail)
                        contigbytes = alq->aq_freebytes;
                else
                        contigbytes = alq->aq_buflen - alq->aq_writehead;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ALQ_UNLOCK(alq);
                if (waitchan != NULL)
                        wakeup_one(waitchan);
                return (NULL);
        }

        /*
         * If we are here, we have a contiguous number of bytes >= len
         * available in our buffer starting at aq_writehead.
         */
        alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
        alq->aq_getpost.ae_bytesused = len;

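        /*
         * NB: we return with the queue lock still held; the matching
         * alq_post_flags() call releases it after the caller has
         * populated the reserved bytes.
         */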
        return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length get on variable length queue", __func__));
        return (alq_getn(alq, alq->aq_entlen, flags));
}

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
        int activate;
        void *waitchan;

        activate = 0;

        if (ale->ae_bytesused > 0) {
                if (!(alq->aq_flags & AQ_ACTIVE) &&
                    !(flags & ALQ_NOACTIVATE)) {
                        alq->aq_flags |= AQ_ACTIVE;
                        activate = 1;
                }

                alq->aq_writehead += ale->ae_bytesused;
                alq->aq_freebytes -= ale->ae_bytesused;

                /* Wrap aq_writehead if we filled to the end of the buffer. */
                if (alq->aq_writehead == alq->aq_buflen)
                        alq->aq_writehead = 0;

                KASSERT((alq->aq_writehead >= 0 &&
                    alq->aq_writehead < alq->aq_buflen),
                    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
                    __func__));

                KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the
         * alq_getn() while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);
}
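
/*
 * Usage sketch (illustrative only, not part of the original file): the
 * zero-copy get/post pattern.  alq_getn() returns with the queue lock held;
 * alq_post_flags() drops it once the caller has filled the reserved region
 * in place.  ALQ_EXAMPLES is a hypothetical guard.
 */
#ifdef ALQ_EXAMPLES
static int
alq_example_getpost(struct alq *alq)
{
        struct ale *ale;

        /* Reserve 16 contiguous bytes directly in the ring buffer. */
        if ((ale = alq_getn(alq, 16, ALQ_NOWAIT)) == NULL)
                return (EWOULDBLOCK);

        /* Fill the entry in place; no intermediate bcopy required. */
        bzero(ale->ae_data, 16);

        /* Publish the entry and release the queue lock. */
        alq_post_flags(alq, ale, 0);

        return (0);
}
#endif /* ALQ_EXAMPLES */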

void
alq_flush(struct alq *alq)
{
        int needwakeup = 0;

        ALD_LOCK();
        ALQ_LOCK(alq);

        /*
         * Pull the lever iff there is data to flush and we're
         * not already in the middle of a flush operation.
         */
        if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
                if (alq->aq_flags & AQ_ACTIVE)
                        ald_deactivate(alq);

                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
        } else
                ALD_UNLOCK();

        ALQ_UNLOCK(alq);

        if (needwakeup)
                wakeup_one(alq);
}
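
/*
 * Usage sketch (illustrative only, not part of the original file): batch
 * writes with ALQ_NOACTIVATE so the ald_daemon isn't woken per message,
 * then push the accumulated data out with a single explicit flush.  Error
 * returns from alq_writen are ignored here for brevity; ALQ_EXAMPLES is a
 * hypothetical guard.
 */
#ifdef ALQ_EXAMPLES
static void
alq_example_batch(struct alq *alq)
{
        char rec[32];
        int i, len;

        for (i = 0; i < 8; i++) {
                len = snprintf(rec, sizeof(rec), "record %d\n", i);
                /* Deferred activation: data stays buffered for now. */
                alq_writen(alq, rec, len, ALQ_NOWAIT | ALQ_NOACTIVATE);
        }

        /* One I/O for the whole batch. */
        alq_flush(alq);
}
#endif /* ALQ_EXAMPLES */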

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
        /* Only flush and destroy alq if not already shutting down. */
        if (ald_rem(alq) == 0)
                alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
        int ret;

        ret = 0;

        switch (what) {
        case MOD_LOAD:
        case MOD_SHUTDOWN:
                break;

        case MOD_QUIESCE:
                ALD_LOCK();
                /* Only allow unload if there are no open queues. */
                if (LIST_FIRST(&ald_queues) == NULL) {
                        ald_shutingdown = 1;
                        ALD_UNLOCK();
                        ald_shutdown(NULL, 0);
                        mtx_destroy(&ald_mtx);
                } else {
                        ALD_UNLOCK();
                        ret = EBUSY;
                }
                break;

        case MOD_UNLOAD:
                /* If MOD_QUIESCE failed we must fail here too. */
                if (ald_shutingdown == 0)
                        ret = EBUSY;
                break;

        default:
                ret = EINVAL;
                break;
        }

        return (ret);
}

static moduledata_t alq_mod =
{
        "alq",
        alq_load_handler,
        NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);
