FreeBSD/Linux Kernel Cross Reference
sys/kern/kern_alq.c

/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
        char    *aq_entbuf;             /* Buffer for stored entries */
        int     aq_entmax;              /* Max entries */
        int     aq_entlen;              /* Entry length */
        int     aq_freebytes;           /* Bytes available in buffer */
        int     aq_buflen;              /* Total length of our buffer */
        int     aq_writehead;           /* Location for next write */
        int     aq_writetail;           /* Flush starts at this location */
        int     aq_wrapearly;           /* # bytes left blank at end of buf */
        int     aq_flags;               /* Queue flags */
        int     aq_waiters;             /* Num threads waiting for resources
                                         * NB: Used as a wait channel so must
                                         * not be first field in the alq struct
                                         */
        struct  ale     aq_getpost;     /* ALE for use by get/post */
        struct mtx      aq_mtx;         /* Queue lock */
        struct vnode    *aq_vp;         /* Open vnode handle */
        struct ucred    *aq_cred;       /* Credentials of the opening thread */
        LIST_ENTRY(alq) aq_act;         /* List of active queues */
        LIST_ENTRY(alq) aq_link;        /* List of all queues */
};

#define AQ_WANTED       0x0001          /* Wakeup sleeper when io is done */
#define AQ_ACTIVE       0x0002          /* on the active list */
#define AQ_FLUSHING     0x0004          /* doing IO */
#define AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
#define AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
#define AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */

#define ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)

#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
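
/*
 * Illustrative sketch (not part of the original file): the fields above
 * implement a circular buffer over aq_entbuf. aq_writehead is where the
 * next byte is stored, aq_writetail is where the next flush begins and
 * aq_freebytes counts the space left, so HAS_PENDING_DATA() just asks
 * whether any byte is not free. For a hypothetical 8-byte queue that has
 * buffered the 5-byte string "hello" and not yet flushed:
 *
 *      aq_buflen    = 8
 *      aq_writetail = 0        (next flush starts here)
 *      aq_writehead = 5        (next write lands here)
 *      aq_freebytes = 3        (8 buffer bytes - 5 pending bytes)
 *
 * Once alq_doio() flushes the 5 pending bytes, both indexes are reset to
 * 0 so that the free space is contiguous again.
 */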
   90 
static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;
static eventhandler_tag alq_eventhandler_tag = NULL;

#define ALD_LOCK()      mtx_lock(&ald_mtx)
#define ALD_UNLOCK()    mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}
  142 
/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_REMOVE(alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}
  164 
/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
        LIST_INSERT_HEAD(&ald_active, alq, aq_act);
        wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
        LIST_REMOVE(alq, aq_act);
        alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
        mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
        LIST_INIT(&ald_queues);
        LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
        int needwakeup;
        struct alq *alq;

        ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

        alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
            ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);

        ALD_LOCK();

        for (;;) {
                while ((alq = LIST_FIRST(&ald_active)) == NULL &&
                    !ald_shutingdown)
                        mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
  207 
                /* Don't shut down until all active ALQs are flushed. */
                if (ald_shutingdown && alq == NULL) {
                        ALD_UNLOCK();
                        break;
                }

                ALQ_LOCK(alq);
                ald_deactivate(alq);
                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
                ALQ_UNLOCK(alq);
                if (needwakeup)
                        wakeup_one(alq);
                ALD_LOCK();
        }

        kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
        struct alq *alq;

        ALD_LOCK();

        /* Ensure no new queues can be created. */
        ald_shutingdown = 1;
  236 
        /* Shut down all ALQs prior to terminating the ald_daemon. */
        while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
                LIST_REMOVE(alq, aq_link);
                ALD_UNLOCK();
                alq_shutdown(alq);
                ALD_LOCK();
        }
  244 
        /* At this point, all ALQs are flushed and shut down. */
  246 
        /*
         * Wake ald_daemon so that it exits. It won't be able to do
         * anything until we mtx_sleep because we hold the ald_mtx.
         */
        wakeup(&ald_active);

        /* Wait for ald_daemon to exit. */
        mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

        ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
        ALQ_LOCK(alq);

        /* Stop any new writers. */
        alq->aq_flags |= AQ_SHUTDOWN;

        /*
         * If the ALQ isn't active but has unwritten data (possible if
         * the ALQ_NOACTIVATE flag has been used), explicitly activate the
         * ALQ here so that the pending data gets flushed by the ald_daemon.
         */
        if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
                alq->aq_flags |= AQ_ACTIVE;
                ALQ_UNLOCK(alq);
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
                ALQ_LOCK(alq);
        }

        /* Drain IO */
        while (alq->aq_flags & AQ_ACTIVE) {
                alq->aq_flags |= AQ_WANTED;
                msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
        }
        ALQ_UNLOCK(alq);

        vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
            curthread);
        crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
        /* Drain all pending IO. */
        alq_shutdown(alq);

        mtx_destroy(&alq->aq_mtx);
        free(alq->aq_entbuf, M_ALD);
        free(alq, M_ALD);
}

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
        struct thread *td;
        struct mount *mp;
        struct vnode *vp;
        struct uio auio;
        struct iovec aiov[2];
        int totlen;
        int iov;
        int wrapearly;

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

        vp = alq->aq_vp;
        td = curthread;
        totlen = 0;
        iov = 1;
        wrapearly = alq->aq_wrapearly;

        bzero(&aiov, sizeof(aiov));
        bzero(&auio, sizeof(auio));

        /* Start the write from the location of our buffer tail pointer. */
        aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

        if (alq->aq_writetail < alq->aq_writehead) {
                /* Buffer not wrapped. */
                totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
        } else if (alq->aq_writehead == 0) {
                /* Buffer not wrapped (special case to avoid an empty iov). */
                totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
        } else {
                /*
                 * Buffer wrapped, requires 2 aiov entries:
                 * - first is from writetail to end of buffer
                 * - second is from start of buffer to writehead
                 */
                aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
                iov++;
                aiov[1].iov_base = alq->aq_entbuf;
                aiov[1].iov_len = alq->aq_writehead;
                totlen = aiov[0].iov_len + aiov[1].iov_len;
        }

        alq->aq_flags |= AQ_FLUSHING;
        ALQ_UNLOCK(alq);

        auio.uio_iov = &aiov[0];
        auio.uio_offset = 0;
        auio.uio_segflg = UIO_SYSSPACE;
        auio.uio_rw = UIO_WRITE;
        auio.uio_iovcnt = iov;
        auio.uio_resid = totlen;
        auio.uio_td = td;

        /*
         * Do all of the junk required to write now.
         */
        vn_start_write(vp, &mp, V_WAIT);
        vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
        /*
         * XXX: VOP_WRITE error checks are ignored.
         */
#ifdef MAC
        if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
                VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
        VOP_UNLOCK(vp, 0);
        vn_finished_write(mp);

        ALQ_LOCK(alq);
        alq->aq_flags &= ~AQ_FLUSHING;

        /* Adjust writetail as required, taking into account wrapping. */
        alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
            alq->aq_buflen;
        alq->aq_freebytes += totlen + wrapearly;

        /*
         * If we just flushed part of the buffer which wrapped, reset the
         * wrapearly indicator.
         */
        if (wrapearly)
                alq->aq_wrapearly = 0;

        /*
         * If we just flushed the buffer completely, reset indexes to 0 to
         * minimise buffer wraps.
         * This is also required to ensure alq_getn() can't wedge itself.
         */
        if (!HAS_PENDING_DATA(alq))
                alq->aq_writehead = alq->aq_writetail = 0;

        KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
            ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

        if (alq->aq_flags & AQ_WANTED) {
                alq->aq_flags &= ~AQ_WANTED;
                return (1);
        }

        return (0);
}
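
/*
 * Illustrative sketch (not part of the original file): a worked example of
 * the wrapped-buffer arithmetic above, assuming a hypothetical 100-byte
 * buffer. Suppose aq_writetail = 40 and a 20-byte entry could not fit in
 * the last 10 bytes, so alq_getn() wrapped early (aq_wrapearly = 10) and
 * the entry was posted at offset 0, leaving aq_writehead = 20:
 *
 *      aiov[0]: aq_entbuf + 40, iov_len = 100 - 40 - 10 = 50
 *      aiov[1]: aq_entbuf +  0, iov_len = 20
 *      totlen = 70
 *
 * After VOP_WRITE, aq_writetail = (40 + 70 + 10) % 100 = 20, which equals
 * aq_writehead, and aq_freebytes returns to 100, so the buffer is empty
 * and both indexes are reset to 0 above.
 */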
  413 
static struct kproc_desc ald_kp = {
        "ALQ Daemon",
        ald_daemon,
        &ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);

/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
        struct thread *td;
        struct nameidata nd;
        struct alq *alq;
        int oflags;
        int error;

        KASSERT((size > 0), ("%s: size <= 0", __func__));

        *alqp = NULL;
        td = curthread;

        NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file, td);
        oflags = FWRITE | O_NOFOLLOW | O_CREAT;

        error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
        if (error)
                return (error);

        NDFREE(&nd, NDF_ONLY_PNBUF);
        /* We just unlock so we hold a reference. */
        VOP_UNLOCK(nd.ni_vp, 0);

        alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
        alq->aq_vp = nd.ni_vp;
        alq->aq_cred = crhold(cred);

        mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

        alq->aq_buflen = size;
        alq->aq_entmax = 0;
        alq->aq_entlen = 0;

        alq->aq_freebytes = alq->aq_buflen;
        alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
        alq->aq_writehead = alq->aq_writetail = 0;
        if (flags & ALQ_ORDERED)
                alq->aq_flags |= AQ_ORDERED;

        if ((error = ald_add(alq)) != 0) {
                alq_destroy(alq);
                return (error);
        }

        *alqp = alq;

        return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
        int ret;

        KASSERT((count >= 0), ("%s: count < 0", __func__));

        if (count > 0) {
                if ((ret = alq_open_flags(alqp, file, cred, cmode,
                    size*count, 0)) == 0) {
                        (*alqp)->aq_flags |= AQ_LEGACY;
                        (*alqp)->aq_entmax = count;
                        (*alqp)->aq_entlen = size;
                }
        } else
                ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

        return (ret);
}
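
/*
 * Illustrative sketch (not part of the original file): how a hypothetical
 * consumer might open queues in both modes. The file names, sizes and the
 * 0600 mode are assumptions for the example; ALQ_ORDERED is the flag
 * handled in alq_open_flags() above.
 *
 *      struct alq *fixedq, *varq;
 *      int error;
 *
 *      error = alq_open(&fixedq, "/var/log/example_fixed.log",
 *          curthread->td_ucred, 0600, 80, 1024);
 *
 * opens a legacy queue with room for 1024 fixed-length 80-byte entries,
 * while
 *
 *      error = alq_open_flags(&varq, "/var/log/example_var.log",
 *          curthread->td_ucred, 0600, 65536, ALQ_ORDERED);
 *
 * opens a 64KB variable-length queue whose waiting writers are woken in
 * FIFO order.
 */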

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait for buffer space or fail with EWOULDBLOCK, depending on whether
 * ALQ_NOWAIT was passed in flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
        int activate, copy, ret;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > aq_buflen", __func__));

        activate = ret = 0;
        copy = len;
        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Fail to perform the write and return EWOULDBLOCK if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shut down.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
                ALQ_UNLOCK(alq);
                return (EWOULDBLOCK);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
                alq->aq_waiters--;
        }
  552 
        /*
         * Either ALQ_WAITOK was set and there are fewer than len free bytes,
         * in which case we enter the while loop and sleep until enough free
         * bytes are available, or there is already enough free space and the
         * loop is skipped. If AQ_ORDERED is set, only 1 thread at a time will
         * be in this loop. Otherwise, multiple threads may be sleeping here
         * competing for ALQ resources.
         */
        while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
                alq->aq_waiters--;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ret = EWOULDBLOCK;
                goto unlock;
        }

        /*
         * If we need to wrap the buffer to accommodate the write,
         * we'll need 2 calls to bcopy.
         */
        if ((alq->aq_buflen - alq->aq_writehead) < len)
                copy = alq->aq_buflen - alq->aq_writehead;

        /* Copy message (or part thereof if wrap required) to the buffer. */
        bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
        alq->aq_writehead += copy;

        if (alq->aq_writehead >= alq->aq_buflen) {
                KASSERT((alq->aq_writehead == alq->aq_buflen),
                    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
                    __func__,
                    alq->aq_writehead,
                    alq->aq_buflen));
                alq->aq_writehead = 0;
        }

        if (copy != len) {
                /*
                 * Wrap the buffer by copying the remainder of our message
                 * to the start of the buffer and resetting aq_writehead.
                 */
                bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
                alq->aq_writehead = len - copy;
        }

        KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
            ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

        alq->aq_freebytes -= len;

        if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
                alq->aq_flags |= AQ_ACTIVE;
                activate = 1;
        }

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);

        return (ret);
}
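
/*
 * Illustrative sketch (not part of the original file): a variable-length
 * write to the hypothetical varq opened above. ALQ_NOWAIT makes
 * alq_writen() return EWOULDBLOCK instead of sleeping when buffer space
 * is short, which suits callers that must not block; dropped is a
 * hypothetical loss counter.
 *
 *      char msg[64];
 *      int len, error;
 *
 *      len = snprintf(msg, sizeof(msg), "event %d\n", 42);
 *      error = alq_writen(varq, msg, len, ALQ_NOWAIT);
 *      if (error == EWOULDBLOCK)
 *              dropped++;
 */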
  661 
int
alq_write(struct alq *alq, void *data, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length write on variable length queue", __func__));
        return (alq_writen(alq, data, alq->aq_entlen, flags));
}
  670 
/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
        int contigbytes;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > alq->aq_buflen", __func__));

        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Determine the number of free contiguous bytes.
         * We ensure elsewhere that if aq_writehead == aq_writetail because
         * the buffer is empty, they will both be set to 0 and therefore
         * aq_freebytes == aq_buflen and is fully contiguous.
         * If they are equal and the buffer is not empty, aq_freebytes will
         * be 0 indicating the buffer is full.
         */
        if (alq->aq_writehead <= alq->aq_writetail)
                contigbytes = alq->aq_freebytes;
        else {
                contigbytes = alq->aq_buflen - alq->aq_writehead;

                if (contigbytes < len) {
                        /*
                         * Insufficient space at end of buffer to handle a
                         * contiguous write. Wrap early if there's space at
                         * the beginning. This will leave a hole at the end
                         * of the buffer which we will have to skip over when
                         * flushing the buffer to disk.
                         */
                        if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
                                /* Keep track of # bytes left blank. */
                                alq->aq_wrapearly = contigbytes;
                                /* Do the wrap and adjust counters. */
                                contigbytes = alq->aq_freebytes =
                                    alq->aq_writetail;
                                alq->aq_writehead = 0;
                        }
                }
        }

        /*
         * Return a NULL ALE if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shut down.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && contigbytes < len)) {
                ALQ_UNLOCK(alq);
                return (NULL);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
                alq->aq_waiters--;
        }
  748 
        /*
         * Either ALQ_WAITOK was set and there are fewer than len contiguous
         * free bytes, in which case we enter the while loop and sleep until
         * enough contiguous free bytes are available, or there is already
         * enough contiguous free space and the loop is skipped. If AQ_ORDERED
         * is set, only 1 thread at a time will be in this loop. Otherwise,
         * multiple threads may be sleeping here competing for ALQ resources.
         */
        while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
                alq->aq_waiters--;

                if (alq->aq_writehead <= alq->aq_writetail)
                        contigbytes = alq->aq_freebytes;
                else
                        contigbytes = alq->aq_buflen - alq->aq_writehead;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ALQ_UNLOCK(alq);
                if (waitchan != NULL)
                        wakeup_one(waitchan);
                return (NULL);
        }

        /*
         * If we are here, we have a contiguous number of bytes >= len
         * available in our buffer starting at aq_writehead.
         */
        alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
        alq->aq_getpost.ae_bytesused = len;

        return (&alq->aq_getpost);
}
  817 
struct ale *
alq_get(struct alq *alq, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length get on variable length queue", __func__));
        return (alq_getn(alq, alq->aq_entlen, flags));
}

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
        int activate;
        void *waitchan;

        activate = 0;

        if (ale->ae_bytesused > 0) {
                if (!(alq->aq_flags & AQ_ACTIVE) &&
                    !(flags & ALQ_NOACTIVATE)) {
                        alq->aq_flags |= AQ_ACTIVE;
                        activate = 1;
                }

                alq->aq_writehead += ale->ae_bytesused;
                alq->aq_freebytes -= ale->ae_bytesused;

                /* Wrap aq_writehead if we filled to the end of the buffer. */
                if (alq->aq_writehead == alq->aq_buflen)
                        alq->aq_writehead = 0;

                KASSERT((alq->aq_writehead >= 0 &&
                    alq->aq_writehead < alq->aq_buflen),
                    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
                    __func__));

                KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the
         * alq_getn() while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);
}
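
/*
 * Illustrative sketch (not part of the original file): the zero-copy
 * get/post cycle using the hypothetical varq from the earlier examples.
 * alq_getn() returns with the queue spin lock held, so the code between
 * get and post must not sleep.
 *
 *      static const char rec[] = "example record\n";
 *      struct ale *ale;
 *
 *      ale = alq_getn(varq, sizeof(rec) - 1, ALQ_WAITOK);
 *      if (ale != NULL) {
 *              bcopy(rec, ale->ae_data, sizeof(rec) - 1);
 *              alq_post_flags(varq, ale, 0);
 *      }
 *
 * alq_getn() pre-sets ae_bytesused to the requested length; whatever value
 * is there when alq_post_flags() runs determines how far aq_writehead
 * advances, and the queue lock is dropped on the way out.
 */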
  884 
void
alq_flush(struct alq *alq)
{
        int needwakeup = 0;

        ALD_LOCK();
        ALQ_LOCK(alq);

        /*
         * Pull the lever iff there is data to flush and we're
         * not already in the middle of a flush operation.
         */
        if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
                if (alq->aq_flags & AQ_ACTIVE)
                        ald_deactivate(alq);

                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
        } else
                ALD_UNLOCK();

        ALQ_UNLOCK(alq);

        if (needwakeup)
                wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
        /* Only flush and destroy alq if not already shutting down. */
        if (ald_rem(alq) == 0)
                alq_destroy(alq);
}
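
/*
 * Illustrative sketch (not part of the original file): teardown of the
 * hypothetical queues from the earlier examples. alq_close() flushes any
 * remaining data itself, so an explicit alq_flush() is only needed when
 * buffered records must reach the file while the queue stays open:
 *
 *      alq_flush(varq);        force pending records out now
 *      ...
 *      alq_close(varq);        flush remaining data, close file, free alq
 *      alq_close(fixedq);
 */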
  922 
static int
alq_load_handler(module_t mod, int what, void *arg)
{
        int ret;

        ret = 0;

        switch (what) {
        case MOD_LOAD:
        case MOD_SHUTDOWN:
                break;

        case MOD_QUIESCE:
                ALD_LOCK();
                /* Only allow unload if there are no open queues. */
                if (LIST_FIRST(&ald_queues) == NULL) {
                        ald_shutingdown = 1;
                        ALD_UNLOCK();
                        EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
                            alq_eventhandler_tag);
                        ald_shutdown(NULL, 0);
                        mtx_destroy(&ald_mtx);
                } else {
                        ALD_UNLOCK();
                        ret = EBUSY;
                }
                break;

        case MOD_UNLOAD:
                /* If MOD_QUIESCE failed we must fail here too. */
                if (ald_shutingdown == 0)
                        ret = EBUSY;
                break;

        default:
                ret = EINVAL;
                break;
        }

        return (ret);
}

static moduledata_t alq_mod =
{
        "alq",
        alq_load_handler,
        NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_LAST, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);
