FreeBSD/Linux Kernel Cross Reference
sys/kern/kern_alq.c

/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: releng/10.0/sys/kern/kern_alq.c 251838 2013-06-17 09:49:07Z lstewart $");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
        char    *aq_entbuf;             /* Buffer for stored entries */
        int     aq_entmax;              /* Max entries */
        int     aq_entlen;              /* Entry length */
        int     aq_freebytes;           /* Bytes available in buffer */
        int     aq_buflen;              /* Total length of our buffer */
        int     aq_writehead;           /* Location for next write */
        int     aq_writetail;           /* Flush starts at this location */
        int     aq_wrapearly;           /* # bytes left blank at end of buf */
        int     aq_flags;               /* Queue flags */
        int     aq_waiters;             /* Num threads waiting for resources
                                         * NB: Used as a wait channel so must
                                         * not be first field in the alq struct
                                         */
        struct  ale     aq_getpost;     /* ALE for use by get/post */
        struct mtx      aq_mtx;         /* Queue lock */
        struct vnode    *aq_vp;         /* Open vnode handle */
        struct ucred    *aq_cred;       /* Credentials of the opening thread */
        LIST_ENTRY(alq) aq_act;         /* List of active queues */
        LIST_ENTRY(alq) aq_link;        /* List of all queues */
};

#define AQ_WANTED       0x0001          /* Wakeup sleeper when io is done */
#define AQ_ACTIVE       0x0002          /* On the active list */
#define AQ_FLUSHING     0x0004          /* Doing IO */
#define AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
#define AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
#define AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */

#define ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)

#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
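/*
 * Example of the invariant behind HAS_PENDING_DATA(): with aq_buflen = 4096,
 * a 100 byte write leaves aq_freebytes at 3996, so the macro is true until
 * alq_doio() flushes the data and credits those 100 bytes (plus any
 * aq_wrapearly hole) back to aq_freebytes.
 */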

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;
static eventhandler_tag alq_eventhandler_tag = NULL;

#define ALD_LOCK()      mtx_lock(&ald_mtx)
#define ALD_UNLOCK()    mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_REMOVE(alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
        LIST_INSERT_HEAD(&ald_active, alq, aq_act);
        wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
        LIST_REMOVE(alq, aq_act);
        alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
        mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
        LIST_INIT(&ald_queues);
        LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
        int needwakeup;
        struct alq *alq;

        ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

        alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
            ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);

        ALD_LOCK();

        for (;;) {
                while ((alq = LIST_FIRST(&ald_active)) == NULL &&
                    !ald_shutingdown)
                        mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

                /* Don't shutdown until all active ALQs are flushed. */
                if (ald_shutingdown && alq == NULL) {
                        ALD_UNLOCK();
                        break;
                }

                ALQ_LOCK(alq);
                ald_deactivate(alq);
                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
                ALQ_UNLOCK(alq);
                if (needwakeup)
                        wakeup_one(alq);
                ALD_LOCK();
        }

        kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
        struct alq *alq;

        ALD_LOCK();

        /* Ensure no new queues can be created. */
        ald_shutingdown = 1;

        /* Shutdown all ALQs prior to terminating the ald_daemon. */
        while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
                LIST_REMOVE(alq, aq_link);
                ALD_UNLOCK();
                alq_shutdown(alq);
                ALD_LOCK();
        }

        /* At this point, all ALQs are flushed and shutdown. */

        /*
         * Wake ald_daemon so that it exits. It won't be able to do
         * anything until we call mtx_sleep because we hold the ald_mtx.
         */
        wakeup(&ald_active);

        /* Wait for ald_daemon to exit. */
        mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

        ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
        ALQ_LOCK(alq);

        /* Stop any new writers. */
        alq->aq_flags |= AQ_SHUTDOWN;

        /*
         * If the ALQ isn't active but has unwritten data (possible if
         * the ALQ_NOACTIVATE flag has been used), explicitly activate the
         * ALQ here so that the pending data gets flushed by the ald_daemon.
         */
        if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
                alq->aq_flags |= AQ_ACTIVE;
                ALQ_UNLOCK(alq);
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
                ALQ_LOCK(alq);
        }

        /* Drain IO */
        while (alq->aq_flags & AQ_ACTIVE) {
                alq->aq_flags |= AQ_WANTED;
                msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
        }
        ALQ_UNLOCK(alq);

        vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
            curthread);
        crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
        /* Drain all pending IO. */
        alq_shutdown(alq);

        mtx_destroy(&alq->aq_mtx);
        free(alq->aq_entbuf, M_ALD);
        free(alq, M_ALD);
}

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
        struct thread *td;
        struct mount *mp;
        struct vnode *vp;
        struct uio auio;
        struct iovec aiov[2];
        int totlen;
        int iov;
        int wrapearly;

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

        vp = alq->aq_vp;
        td = curthread;
        totlen = 0;
        iov = 1;
        wrapearly = alq->aq_wrapearly;

        bzero(&aiov, sizeof(aiov));
        bzero(&auio, sizeof(auio));

        /* Start the write from the location of our buffer tail pointer. */
        aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

        if (alq->aq_writetail < alq->aq_writehead) {
                /* Buffer not wrapped. */
                totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
        } else if (alq->aq_writehead == 0) {
                /* Buffer not wrapped (special case to avoid an empty iov). */
                totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
        } else {
                /*
                 * Buffer wrapped, requires 2 aiov entries:
                 * - first is from writetail to end of buffer
                 * - second is from start of buffer to writehead
                 */
                aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
                iov++;
                aiov[1].iov_base = alq->aq_entbuf;
                aiov[1].iov_len = alq->aq_writehead;
                totlen = aiov[0].iov_len + aiov[1].iov_len;
        }

        alq->aq_flags |= AQ_FLUSHING;
        ALQ_UNLOCK(alq);

        auio.uio_iov = &aiov[0];
        auio.uio_offset = 0;
        auio.uio_segflg = UIO_SYSSPACE;
        auio.uio_rw = UIO_WRITE;
        auio.uio_iovcnt = iov;
        auio.uio_resid = totlen;
        auio.uio_td = td;

        /*
         * Do all of the junk required to write now.
         */
        vn_start_write(vp, &mp, V_WAIT);
        vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
        /*
         * XXX: VOP_WRITE error checks are ignored.
         */
#ifdef MAC
        if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
                VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
        VOP_UNLOCK(vp, 0);
        vn_finished_write(mp);

        ALQ_LOCK(alq);
        alq->aq_flags &= ~AQ_FLUSHING;

        /* Adjust writetail as required, taking into account wrapping. */
        alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
            alq->aq_buflen;
        alq->aq_freebytes += totlen + wrapearly;

        /*
         * If we just flushed part of the buffer which wrapped, reset the
         * wrapearly indicator.
         */
        if (wrapearly)
                alq->aq_wrapearly = 0;

        /*
         * If we just flushed the buffer completely, reset indexes to 0 to
         * minimise buffer wraps.
         * This is also required to ensure alq_getn() can't wedge itself.
         */
        if (!HAS_PENDING_DATA(alq))
                alq->aq_writehead = alq->aq_writetail = 0;

        KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
            ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

        if (alq->aq_flags & AQ_WANTED) {
                alq->aq_flags &= ~AQ_WANTED;
                return (1);
        }

        return (0);
}
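
/*
 * Worked example of the wrapped case above (values are illustrative only):
 * with aq_buflen = 4096, aq_writetail = 4000, aq_writehead = 200 and
 * aq_wrapearly = 16, alq_doio() issues two iovecs: 80 bytes from offset 4000
 * (4096 - 4000 - 16) and 200 bytes from offset 0, so totlen = 280.
 * Afterwards aq_writetail = (4000 + 280 + 16) % 4096 = 200 == aq_writehead,
 * the queue is empty, and both indexes are reset to 0.
 */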

static struct kproc_desc ald_kp = {
        "ALQ Daemon",
        ald_daemon,
        &ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
        struct thread *td;
        struct nameidata nd;
        struct alq *alq;
        int oflags;
        int error;

        KASSERT((size > 0), ("%s: size <= 0", __func__));

        *alqp = NULL;
        td = curthread;

        NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td);
        oflags = FWRITE | O_NOFOLLOW | O_CREAT;

        error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
        if (error)
                return (error);

        NDFREE(&nd, NDF_ONLY_PNBUF);
        /* We just unlock so we hold a reference. */
        VOP_UNLOCK(nd.ni_vp, 0);

        alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
        alq->aq_vp = nd.ni_vp;
        alq->aq_cred = crhold(cred);

        mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

        alq->aq_buflen = size;
        alq->aq_entmax = 0;
        alq->aq_entlen = 0;

        alq->aq_freebytes = alq->aq_buflen;
        alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
        alq->aq_writehead = alq->aq_writetail = 0;
        if (flags & ALQ_ORDERED)
                alq->aq_flags |= AQ_ORDERED;

        if ((error = ald_add(alq)) != 0) {
                alq_destroy(alq);
                return (error);
        }

        *alqp = alq;

        return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
        int ret;

        KASSERT((count >= 0), ("%s: count < 0", __func__));

        if (count > 0) {
                ret = alq_open_flags(alqp, file, cred, cmode, size*count, 0);
                /* Only touch the queue if the open actually succeeded. */
                if (ret == 0) {
                        (*alqp)->aq_flags |= AQ_LEGACY;
                        (*alqp)->aq_entmax = count;
                        (*alqp)->aq_entlen = size;
                }
        } else
                ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

        return (ret);
}
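
/*
 * A minimal usage sketch (not part of the original file): open a legacy
 * fixed-length queue of 512 128-byte entries, log one record and tear the
 * queue down. The file path, record layout and function below are
 * hypothetical; error handling is abbreviated. Guarded by #if 0 so it is
 * never compiled into the kernel.
 */
#if 0
struct example_rec {
        uint64_t        er_seq;
        char            er_msg[120];
};

static void
alq_open_example(void)
{
        struct alq *alq;
        struct example_rec rec = { .er_seq = 1, .er_msg = "hello" };

        /* Create/open the file with the calling thread's credentials. */
        if (alq_open(&alq, "/var/log/example.alq", curthread->td_ucred,
            0600, sizeof(struct example_rec), 512) != 0)
                return;

        /* Queue the record; the ald daemon flushes it to disk later. */
        alq_write(alq, &rec, ALQ_WAITOK);

        /* Flush remaining data, close the file and free all resources. */
        alq_close(alq);
}
#endif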

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error depending on whether the ALQ_NOWAIT flag is set.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
        int activate, copy, ret;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > aq_buflen", __func__));

        activate = ret = 0;
        copy = len;
        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Fail to perform the write and return EWOULDBLOCK if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
                ALQ_UNLOCK(alq);
                return (EWOULDBLOCK);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
                alq->aq_waiters--;
        }

        /*
         * At this point either aq_freebytes < len (with ALQ_WAITOK set), in
         * which case we enter the while loop and sleep until enough free
         * bytes are available, or aq_freebytes >= len and the loop is
         * skipped. If AQ_ORDERED is set, only 1 thread at a time will be in
         * this loop. Otherwise, multiple threads may be sleeping here
         * competing for ALQ resources.
         */
        while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
                alq->aq_waiters--;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ret = EWOULDBLOCK;
                goto unlock;
        }

        /*
         * If we need to wrap the buffer to accommodate the write,
         * we'll need 2 calls to bcopy.
         */
        if ((alq->aq_buflen - alq->aq_writehead) < len)
                copy = alq->aq_buflen - alq->aq_writehead;

        /* Copy message (or part thereof if wrap required) to the buffer. */
        bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
        alq->aq_writehead += copy;

        if (alq->aq_writehead >= alq->aq_buflen) {
                KASSERT((alq->aq_writehead == alq->aq_buflen),
                    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
                    __func__,
                    alq->aq_writehead,
                    alq->aq_buflen));
                alq->aq_writehead = 0;
        }

        if (copy != len) {
                /*
                 * Wrap the buffer by copying the remainder of our message
                 * to the start of the buffer and resetting aq_writehead.
                 */
                bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
                alq->aq_writehead = len - copy;
        }

        KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
            ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

        alq->aq_freebytes -= len;

        if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
                alq->aq_flags |= AQ_ACTIVE;
                activate = 1;
        }

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);

        return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length write on variable length queue", __func__));
        return (alq_writen(alq, data, alq->aq_entlen, flags));
}
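
/*
 * A variable-length sketch (not part of the original file): a hypothetical
 * caller logs a formatted line into a variable-length queue created with
 * alq_open_flags(), dropping the record rather than sleeping if the buffer
 * is momentarily full. Guarded by #if 0 so it is never compiled in.
 */
#if 0
static unsigned long example_drops;     /* hypothetical drop counter */

static void
alq_writen_example(struct alq *alq, int value)
{
        char line[64];
        int len;

        len = snprintf(line, sizeof(line), "value=%d\n", value);

        /* ALQ_NOWAIT: return EWOULDBLOCK instead of sleeping for space. */
        if (alq_writen(alq, line, len, ALQ_NOWAIT) == EWOULDBLOCK)
                example_drops++;
}
#endif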

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
        int contigbytes;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > alq->aq_buflen", __func__));

        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Determine the number of free contiguous bytes.
         * We ensure elsewhere that if aq_writehead == aq_writetail because
         * the buffer is empty, they will both be set to 0 and therefore
         * aq_freebytes == aq_buflen and is fully contiguous.
         * If they are equal and the buffer is not empty, aq_freebytes will
         * be 0 indicating the buffer is full.
         */
        if (alq->aq_writehead <= alq->aq_writetail)
                contigbytes = alq->aq_freebytes;
        else {
                contigbytes = alq->aq_buflen - alq->aq_writehead;

                if (contigbytes < len) {
                        /*
                         * Insufficient space at end of buffer to handle a
                         * contiguous write. Wrap early if there's space at
                         * the beginning. This will leave a hole at the end
                         * of the buffer which we will have to skip over when
                         * flushing the buffer to disk.
                         */
                        if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
                                /* Keep track of # bytes left blank. */
                                alq->aq_wrapearly = contigbytes;
                                /* Do the wrap and adjust counters. */
                                contigbytes = alq->aq_freebytes =
                                    alq->aq_writetail;
                                alq->aq_writehead = 0;
                        }
                }
        }

        /*
         * Return a NULL ALE if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && contigbytes < len)) {
                ALQ_UNLOCK(alq);
                return (NULL);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
                alq->aq_waiters--;
        }

        /*
         * At this point either contigbytes < len (with ALQ_WAITOK set), in
         * which case we enter the while loop and sleep until enough
         * contiguous free bytes are available, or contigbytes >= len and the
         * loop is skipped. If AQ_ORDERED is set, only 1 thread at a time
         * will be in this loop. Otherwise, multiple threads may be sleeping
         * here competing for ALQ resources.
         */
        while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
                alq->aq_waiters--;

                if (alq->aq_writehead <= alq->aq_writetail)
                        contigbytes = alq->aq_freebytes;
                else
                        contigbytes = alq->aq_buflen - alq->aq_writehead;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ALQ_UNLOCK(alq);
                if (waitchan != NULL)
                        wakeup_one(waitchan);
                return (NULL);
        }

        /*
         * If we are here, we have a contiguous number of bytes >= len
         * available in our buffer starting at aq_writehead.
         */
        alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
        alq->aq_getpost.ae_bytesused = len;

        return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length get on variable length queue", __func__));
        return (alq_getn(alq, alq->aq_entlen, flags));
}

/*
 * Mark the ALE returned by alq_getn() as consumed and release the queue
 * lock that alq_getn() returned with held.
 */
void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
        int activate;
        void *waitchan;

        activate = 0;

        if (ale->ae_bytesused > 0) {
                if (!(alq->aq_flags & AQ_ACTIVE) &&
                    !(flags & ALQ_NOACTIVATE)) {
                        alq->aq_flags |= AQ_ACTIVE;
                        activate = 1;
                }

                alq->aq_writehead += ale->ae_bytesused;
                alq->aq_freebytes -= ale->ae_bytesused;

                /* Wrap aq_writehead if we filled to the end of the buffer. */
                if (alq->aq_writehead == alq->aq_buflen)
                        alq->aq_writehead = 0;

                KASSERT((alq->aq_writehead >= 0 &&
                    alq->aq_writehead < alq->aq_buflen),
                    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
                    __func__));

                KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the
         * alq_getn() while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);
}
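
/*
 * A zero-copy sketch (not part of the original file): reserve buffer space
 * with alq_getn(), format directly into it and publish with
 * alq_post_flags(). Note that alq_getn() returns with the queue lock held,
 * so no blocking work may be done before alq_post_flags() drops it.
 * Guarded by #if 0 so it is never compiled in.
 */
#if 0
static void
alq_getn_example(struct alq *alq, const char *msg)
{
        struct ale *ale;
        int len;

        len = strlen(msg);

        /* Reserve len contiguous bytes; NULL means no space right now. */
        if ((ale = alq_getn(alq, len, ALQ_NOWAIT)) == NULL)
                return;

        /* Fill the reserved region directly, avoiding an extra bcopy. */
        memcpy(ale->ae_data, msg, len);

        /* Publish the entry and release the lock taken by alq_getn(). */
        alq_post_flags(alq, ale, 0);
}
#endif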

void
alq_flush(struct alq *alq)
{
        int needwakeup = 0;

        ALD_LOCK();
        ALQ_LOCK(alq);

        /*
         * Pull the lever iff there is data to flush and we're
         * not already in the middle of a flush operation.
         */
        if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
                if (alq->aq_flags & AQ_ACTIVE)
                        ald_deactivate(alq);

                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
        } else
                ALD_UNLOCK();

        ALQ_UNLOCK(alq);

        if (needwakeup)
                wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
        /* Only flush and destroy alq if not already shutting down. */
        if (ald_rem(alq) == 0)
                alq_destroy(alq);
}

static int
alq_load_handler(module_t mod, int what, void *arg)
{
        int ret;

        ret = 0;

        switch (what) {
        case MOD_LOAD:
        case MOD_SHUTDOWN:
                break;

        case MOD_QUIESCE:
                ALD_LOCK();
                /* Only allow unload if there are no open queues. */
                if (LIST_FIRST(&ald_queues) == NULL) {
                        ald_shutingdown = 1;
                        ALD_UNLOCK();
                        EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
                            alq_eventhandler_tag);
                        ald_shutdown(NULL, 0);
                        mtx_destroy(&ald_mtx);
                } else {
                        ALD_UNLOCK();
                        ret = EBUSY;
                }
                break;

        case MOD_UNLOAD:
                /* If MOD_QUIESCE failed we must fail here too. */
                if (ald_shutingdown == 0)
                        ret = EBUSY;
                break;

        default:
                ret = EINVAL;
                break;
        }

        return (ret);
}

static moduledata_t alq_mod =
{
        "alq",
        alq_load_handler,
        NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);
