FreeBSD/Linux Kernel Cross Reference
sys/kern/kern_alq.c


/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: releng/9.0/sys/kern/kern_alq.c 207223 2010-04-26 13:48:22Z lstewart $");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
        char    *aq_entbuf;             /* Buffer for stored entries */
        int     aq_entmax;              /* Max entries */
        int     aq_entlen;              /* Entry length */
        int     aq_freebytes;           /* Bytes available in buffer */
        int     aq_buflen;              /* Total length of our buffer */
        int     aq_writehead;           /* Location for next write */
        int     aq_writetail;           /* Flush starts at this location */
        int     aq_wrapearly;           /* # bytes left blank at end of buf */
        int     aq_flags;               /* Queue flags */
        int     aq_waiters;             /* Num threads waiting for resources
                                         * NB: Used as a wait channel so must
                                         * not be first field in the alq struct
                                         */
        struct  ale     aq_getpost;     /* ALE for use by get/post */
        struct mtx      aq_mtx;         /* Queue lock */
        struct vnode    *aq_vp;         /* Open vnode handle */
        struct ucred    *aq_cred;       /* Credentials of the opening thread */
        LIST_ENTRY(alq) aq_act;         /* List of active queues */
        LIST_ENTRY(alq) aq_link;        /* List of all queues */
};

#define AQ_WANTED       0x0001          /* Wakeup sleeper when io is done */
#define AQ_ACTIVE       0x0002          /* on the active list */
#define AQ_FLUSHING     0x0004          /* doing IO */
#define AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
#define AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
#define AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */

#define ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)

#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
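
/*
 * Illustrative note (added commentary, not in the original source):
 * aq_entbuf is a circular buffer. With, say, aq_buflen = 8,
 * aq_writetail = 6 and aq_writehead = 2, the pending region wraps:
 * offsets 6-7 plus 0-1 hold unflushed data, aq_freebytes = 4 and
 * HAS_PENDING_DATA() is true. When the buffer drains completely, both
 * indexes are reset to 0 so that aq_freebytes == aq_buflen again.
 */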
   90 
   91 static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
   92 
   93 /*
   94  * The ald_mtx protects the ald_queues list and the ald_active list.
   95  */
   96 static struct mtx ald_mtx;
   97 static LIST_HEAD(, alq) ald_queues;
   98 static LIST_HEAD(, alq) ald_active;
   99 static int ald_shutingdown = 0;
  100 struct thread *ald_thread;
  101 static struct proc *ald_proc;
  102 
  103 #define ALD_LOCK()      mtx_lock(&ald_mtx)
  104 #define ALD_UNLOCK()    mtx_unlock(&ald_mtx)
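
/*
 * Descriptive note (added commentary, not in the original source): the
 * lock order implied by the code below is ald_mtx (a default sleep
 * mutex) before aq_mtx (a spin mutex). Paths that would need them in
 * the opposite order, e.g. alq_shutdown() activating a queue, drop
 * aq_mtx before taking ald_mtx. Nothing may sleep while holding aq_mtx.
 */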

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_REMOVE(alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
        LIST_INSERT_HEAD(&ald_active, alq, aq_act);
        wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
        LIST_REMOVE(alq, aq_act);
        alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
        mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
        LIST_INIT(&ald_queues);
        LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
        int needwakeup;
        struct alq *alq;

        ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

        EVENTHANDLER_REGISTER(shutdown_pre_sync, ald_shutdown, NULL,
            SHUTDOWN_PRI_FIRST);

        ALD_LOCK();

        for (;;) {
                while ((alq = LIST_FIRST(&ald_active)) == NULL &&
                    !ald_shutingdown)
                        mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

                /* Don't shutdown until all active ALQs are flushed. */
                if (ald_shutingdown && alq == NULL) {
                        ALD_UNLOCK();
                        break;
                }

                ALQ_LOCK(alq);
                ald_deactivate(alq);
                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
                ALQ_UNLOCK(alq);
                if (needwakeup)
                        wakeup_one(alq);
                ALD_LOCK();
        }

        kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
        struct alq *alq;

        ALD_LOCK();

        /* Ensure no new queues can be created. */
        ald_shutingdown = 1;

        /* Shutdown all ALQs prior to terminating the ald_daemon. */
        while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
                LIST_REMOVE(alq, aq_link);
                ALD_UNLOCK();
                alq_shutdown(alq);
                ALD_LOCK();
        }

        /* At this point, all ALQs are flushed and shutdown. */

        /*
         * Wake ald_daemon so that it exits. It won't be able to do
         * anything until we drop the ald_mtx in the mtx_sleep() call
         * below, because we currently hold it.
         */
        wakeup(&ald_active);

        /* Wait for ald_daemon to exit. */
        mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

        ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
        ALQ_LOCK(alq);

        /* Stop any new writers. */
        alq->aq_flags |= AQ_SHUTDOWN;

        /*
         * If the ALQ isn't active but has unwritten data (possible if
         * the ALQ_NOACTIVATE flag has been used), explicitly activate the
         * ALQ here so that the pending data gets flushed by the ald_daemon.
         */
        if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
                alq->aq_flags |= AQ_ACTIVE;
                ALQ_UNLOCK(alq);
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
                ALQ_LOCK(alq);
        }

        /* Drain IO */
        while (alq->aq_flags & AQ_ACTIVE) {
                alq->aq_flags |= AQ_WANTED;
                msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
        }
        ALQ_UNLOCK(alq);

        vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
            curthread);
        crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
        /* Drain all pending IO. */
        alq_shutdown(alq);

        mtx_destroy(&alq->aq_mtx);
        free(alq->aq_entbuf, M_ALD);
        free(alq, M_ALD);
}

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
        struct thread *td;
        struct mount *mp;
        struct vnode *vp;
        struct uio auio;
        struct iovec aiov[2];
        int totlen;
        int iov;
        int vfslocked;
        int wrapearly;

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

        vp = alq->aq_vp;
        td = curthread;
        totlen = 0;
        iov = 1;
        wrapearly = alq->aq_wrapearly;

        bzero(&aiov, sizeof(aiov));
        bzero(&auio, sizeof(auio));

        /* Start the write from the location of our buffer tail pointer. */
        aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

        if (alq->aq_writetail < alq->aq_writehead) {
                /* Buffer not wrapped. */
                totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
        } else if (alq->aq_writehead == 0) {
                /* Buffer not wrapped (special case to avoid an empty iov). */
                totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
        } else {
                /*
                 * Buffer wrapped, requires 2 aiov entries:
                 * - first is from writetail to end of buffer
                 * - second is from start of buffer to writehead
                 */
                aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
                iov++;
                aiov[1].iov_base = alq->aq_entbuf;
                aiov[1].iov_len = alq->aq_writehead;
                totlen = aiov[0].iov_len + aiov[1].iov_len;
        }
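
        /*
         * Worked example (added commentary, not in the original source):
         * with aq_buflen = 10, aq_writetail = 6, aq_writehead = 3 and
         * aq_wrapearly = 1, the first iovec covers offsets 6-8
         * (10 - 6 - 1 = 3 bytes) and the second covers offsets 0-2, so
         * totlen = 6. The writetail adjustment further below then yields
         * (6 + 6 + 1) % 10 = 3 == aq_writehead, the buffer is empty, and
         * both indexes are reset to 0.
         */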

        alq->aq_flags |= AQ_FLUSHING;
        ALQ_UNLOCK(alq);

        auio.uio_iov = &aiov[0];
        auio.uio_offset = 0;
        auio.uio_segflg = UIO_SYSSPACE;
        auio.uio_rw = UIO_WRITE;
        auio.uio_iovcnt = iov;
        auio.uio_resid = totlen;
        auio.uio_td = td;

        /*
         * Do all of the junk required to write now.
         */
        vfslocked = VFS_LOCK_GIANT(vp->v_mount);
        vn_start_write(vp, &mp, V_WAIT);
        vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
        /*
         * XXX: VOP_WRITE error checks are ignored.
         */
#ifdef MAC
        if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
                VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
        VOP_UNLOCK(vp, 0);
        vn_finished_write(mp);
        VFS_UNLOCK_GIANT(vfslocked);

        ALQ_LOCK(alq);
        alq->aq_flags &= ~AQ_FLUSHING;

        /* Adjust writetail as required, taking into account wrapping. */
        alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
            alq->aq_buflen;
        alq->aq_freebytes += totlen + wrapearly;

        /*
         * If we just flushed part of the buffer which wrapped, reset the
         * wrapearly indicator.
         */
        if (wrapearly)
                alq->aq_wrapearly = 0;

        /*
         * If we just flushed the buffer completely, reset indexes to 0 to
         * minimise buffer wraps.
         * This is also required to ensure alq_getn() can't wedge itself.
         */
        if (!HAS_PENDING_DATA(alq))
                alq->aq_writehead = alq->aq_writetail = 0;

        KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
            ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

        if (alq->aq_flags & AQ_WANTED) {
                alq->aq_flags &= ~AQ_WANTED;
                return (1);
        }

        return (0);
}

static struct kproc_desc ald_kp = {
        "ALQ Daemon",
        ald_daemon,
        &ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
        struct thread *td;
        struct nameidata nd;
        struct alq *alq;
        int oflags;
        int error;
        int vfslocked;

        KASSERT((size > 0), ("%s: size <= 0", __func__));

        *alqp = NULL;
        td = curthread;

        NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
        oflags = FWRITE | O_NOFOLLOW | O_CREAT;

        error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
        if (error)
                return (error);

        vfslocked = NDHASGIANT(&nd);
        NDFREE(&nd, NDF_ONLY_PNBUF);
        /* We just unlock; the reference on the vnode from vn_open_cred() is kept. */
        VOP_UNLOCK(nd.ni_vp, 0);
        VFS_UNLOCK_GIANT(vfslocked);

        alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
        alq->aq_vp = nd.ni_vp;
        alq->aq_cred = crhold(cred);

        mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

        alq->aq_buflen = size;
        alq->aq_entmax = 0;
        alq->aq_entlen = 0;

        alq->aq_freebytes = alq->aq_buflen;
        alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
        alq->aq_writehead = alq->aq_writetail = 0;
        if (flags & ALQ_ORDERED)
                alq->aq_flags |= AQ_ORDERED;

        if ((error = ald_add(alq)) != 0) {
                alq_destroy(alq);
                return (error);
        }

        *alqp = alq;

        return (0);
}
int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
        int ret;

        KASSERT((count >= 0), ("%s: count < 0", __func__));

        if (count > 0) {
                /* Only set up the legacy params if the open succeeded;
                 * *alqp is NULL on failure and must not be dereferenced. */
                if ((ret = alq_open_flags(alqp, file, cred, cmode,
                    size*count, 0)) == 0) {
                        (*alqp)->aq_flags |= AQ_LEGACY;
                        (*alqp)->aq_entmax = count;
                        (*alqp)->aq_entlen = size;
                }
        } else
                ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

        return (ret);
}
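
/*
 * Illustrative usage sketch (added commentary, not in the original
 * source; the path, buffer size and record below are hypothetical): a
 * consumer of the variable-length API might do the following from a
 * sleepable kernel context.
 *
 *      struct alq *alq;
 *      int error;
 *
 *      error = alq_open_flags(&alq, "/var/log/example.alq",
 *          curthread->td_ucred, 0600, 16 * 1024, 0);
 *      if (error == 0) {
 *              alq_writen(alq, "a variable length record\n", 25,
 *                  ALQ_WAITOK);
 *              alq_flush(alq);
 *              alq_close(alq);
 *      }
 *
 * alq_flush() forces any pending data out immediately; alq_close()
 * flushes the remainder and releases all queue resources.
 */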

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error, depending on whether flags contains ALQ_WAITOK
 * or ALQ_NOWAIT.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
        int activate, copy, ret;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > aq_buflen", __func__));

        activate = ret = 0;
        copy = len;
        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Fail to perform the write and return EWOULDBLOCK if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
                ALQ_UNLOCK(alq);
                return (EWOULDBLOCK);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
                alq->aq_waiters--;
        }

        /*
         * If aq_freebytes >= len we skip this loop; otherwise the caller is
         * able to wait (the checks above ruled out ALQ_NOWAIT), so sleep
         * until enough free bytes become available. If AQ_ORDERED is set,
         * only 1 thread at a time will be in this loop. Otherwise, multiple
         * threads may be sleeping here competing for ALQ resources.
         */
        while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
                alq->aq_waiters--;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ret = EWOULDBLOCK;
                goto unlock;
        }

        /*
         * If we need to wrap the buffer to accommodate the write,
         * we'll need 2 calls to bcopy.
         */
        if ((alq->aq_buflen - alq->aq_writehead) < len)
                copy = alq->aq_buflen - alq->aq_writehead;

        /* Copy message (or part thereof if wrap required) to the buffer. */
        bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
        alq->aq_writehead += copy;

        if (alq->aq_writehead >= alq->aq_buflen) {
                KASSERT((alq->aq_writehead == alq->aq_buflen),
                    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
                    __func__,
                    alq->aq_writehead,
                    alq->aq_buflen));
                alq->aq_writehead = 0;
        }

        if (copy != len) {
                /*
                 * Wrap the buffer by copying the remainder of our message
                 * to the start of the buffer and resetting aq_writehead.
                 */
                bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
                alq->aq_writehead = len - copy;
        }
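
        /*
         * Example (added commentary, not in the original source): with
         * aq_buflen = 8, aq_writehead = 6 and len = 5, the first bcopy
         * above places 2 bytes at offsets 6-7 (wrapping aq_writehead to
         * 0) and the second places the remaining 3 bytes at offsets 0-2,
         * leaving aq_writehead = 3.
         */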

        KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
            ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

        alq->aq_freebytes -= len;

        if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
                alq->aq_flags |= AQ_ACTIVE;
                activate = 1;
        }

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);

        return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length write on variable length queue", __func__));
        return (alq_writen(alq, data, alq->aq_entlen, flags));
}

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
        int contigbytes;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > alq->aq_buflen", __func__));

        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Determine the number of free contiguous bytes.
         * We ensure elsewhere that if aq_writehead == aq_writetail because
         * the buffer is empty, they will both be set to 0 and therefore
         * aq_freebytes == aq_buflen and is fully contiguous.
         * If they are equal and the buffer is not empty, aq_freebytes will
         * be 0 indicating the buffer is full.
         */
        if (alq->aq_writehead <= alq->aq_writetail)
                contigbytes = alq->aq_freebytes;
        else {
                contigbytes = alq->aq_buflen - alq->aq_writehead;

                if (contigbytes < len) {
                        /*
                         * Insufficient space at end of buffer to handle a
                         * contiguous write. Wrap early if there's space at
                         * the beginning. This will leave a hole at the end
                         * of the buffer which we will have to skip over when
                         * flushing the buffer to disk.
                         */
                        if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
                                /* Keep track of # bytes left blank. */
                                alq->aq_wrapearly = contigbytes;
                                /* Do the wrap and adjust counters. */
                                contigbytes = alq->aq_freebytes =
                                    alq->aq_writetail;
                                alq->aq_writehead = 0;
                        }
                }
        }

        /*
         * Return a NULL ALE if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && contigbytes < len)) {
                ALQ_UNLOCK(alq);
                return (NULL);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
                alq->aq_waiters--;
        }

        /*
         * If contigbytes >= len we skip this loop; otherwise the caller is
         * able to wait (the check above ruled out ALQ_NOWAIT), so sleep
         * until enough contiguous free bytes become available. If AQ_ORDERED
         * is set, only 1 thread at a time will be in this loop. Otherwise,
         * multiple threads may be sleeping here competing for ALQ resources.
         */
        while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
                alq->aq_waiters--;

                if (alq->aq_writehead <= alq->aq_writetail)
                        contigbytes = alq->aq_freebytes;
                else
                        contigbytes = alq->aq_buflen - alq->aq_writehead;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ALQ_UNLOCK(alq);
                if (waitchan != NULL)
                        wakeup_one(waitchan);
                return (NULL);
        }

        /*
         * If we are here, we have a contiguous number of bytes >= len
         * available in our buffer starting at aq_writehead.
         * NB: we return with the queue lock held; alq_post_flags() drops it.
         */
        alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
        alq->aq_getpost.ae_bytesused = len;

        return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length get on variable length queue", __func__));
        return (alq_getn(alq, alq->aq_entlen, flags));
}
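
/*
 * Illustrative get/post sketch (added commentary, not in the original
 * source; "src" and "len" are hypothetical): reserve buffer space, fill
 * it in place and commit, avoiding the bcopy performed by alq_writen().
 * The queue spin lock is held between the two calls, so the fill step
 * must not sleep.
 *
 *      struct ale *ale;
 *
 *      if ((ale = alq_getn(alq, len, ALQ_NOWAIT)) != NULL) {
 *              bcopy(src, ale->ae_data, len);
 *              alq_post_flags(alq, ale, 0);
 *      }
 */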

/*
 * Commit an ALE previously obtained via alq_getn()/alq_get().
 * Called with the queue lock held from alq_getn(); drops it.
 */
void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
        int activate;
        void *waitchan;

        activate = 0;

        if (ale->ae_bytesused > 0) {
                if (!(alq->aq_flags & AQ_ACTIVE) &&
                    !(flags & ALQ_NOACTIVATE)) {
                        alq->aq_flags |= AQ_ACTIVE;
                        activate = 1;
                }

                alq->aq_writehead += ale->ae_bytesused;
                alq->aq_freebytes -= ale->ae_bytesused;

                /* Wrap aq_writehead if we filled to the end of the buffer. */
                if (alq->aq_writehead == alq->aq_buflen)
                        alq->aq_writehead = 0;

                KASSERT((alq->aq_writehead >= 0 &&
                    alq->aq_writehead < alq->aq_buflen),
                    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
                    __func__));

                KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the
         * alq_getn() while loop, so we use a different wait channel in this
         * case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);
}

void
alq_flush(struct alq *alq)
{
        int needwakeup = 0;

        ALD_LOCK();
        ALQ_LOCK(alq);

        /*
         * Pull the lever iff there is data to flush and we're
         * not already in the middle of a flush operation.
         */
        if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
                if (alq->aq_flags & AQ_ACTIVE)
                        ald_deactivate(alq);

                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
        } else
                ALD_UNLOCK();

        ALQ_UNLOCK(alq);

        if (needwakeup)
                wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
        /* Only flush and destroy alq if not already shutting down. */
        if (ald_rem(alq) == 0)
                alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
        int ret;

        ret = 0;

        switch (what) {
        case MOD_LOAD:
        case MOD_SHUTDOWN:
                break;

        case MOD_QUIESCE:
                ALD_LOCK();
                /* Only allow unload if there are no open queues. */
                if (LIST_FIRST(&ald_queues) == NULL) {
                        ald_shutingdown = 1;
                        ALD_UNLOCK();
                        ald_shutdown(NULL, 0);
                        mtx_destroy(&ald_mtx);
                } else {
                        ALD_UNLOCK();
                        ret = EBUSY;
                }
                break;

        case MOD_UNLOAD:
                /* If MOD_QUIESCE failed we must fail here too. */
                if (ald_shutingdown == 0)
                        ret = EBUSY;
                break;

        default:
                ret = EINVAL;
                break;
        }

        return (ret);
}

static moduledata_t alq_mod =
{
        "alq",
        alq_load_handler,
        NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);
