FreeBSD/Linux Kernel Cross Reference
sys/kern/kern_alq.c


/*-
 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
 * Copyright (c) 2009-2010, The FreeBSD Foundation
 * All rights reserved.
 *
 * Portions of this software were developed at the Centre for Advanced
 * Internet Architectures, Swinburne University of Technology, Melbourne,
 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: releng/9.2/sys/kern/kern_alq.c 252323 2013-06-28 02:38:33Z lstewart $");

#include "opt_mac.h"

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/mount.h>
#include <sys/mutex.h>
#include <sys/namei.h>
#include <sys/proc.h>
#include <sys/vnode.h>
#include <sys/alq.h>
#include <sys/malloc.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/eventhandler.h>

#include <security/mac/mac_framework.h>

/* Async. Logging Queue */
struct alq {
        char    *aq_entbuf;             /* Buffer for stored entries */
        int     aq_entmax;              /* Max entries */
        int     aq_entlen;              /* Entry length */
        int     aq_freebytes;           /* Bytes available in buffer */
        int     aq_buflen;              /* Total length of our buffer */
        int     aq_writehead;           /* Location for next write */
        int     aq_writetail;           /* Flush starts at this location */
        int     aq_wrapearly;           /* # bytes left blank at end of buf */
        int     aq_flags;               /* Queue flags */
        int     aq_waiters;             /* Num threads waiting for resources
                                         * NB: Used as a wait channel so must
                                         * not be first field in the alq struct
                                         */
        struct  ale     aq_getpost;     /* ALE for use by get/post */
        struct mtx      aq_mtx;         /* Queue lock */
        struct vnode    *aq_vp;         /* Open vnode handle */
        struct ucred    *aq_cred;       /* Credentials of the opening thread */
        LIST_ENTRY(alq) aq_act;         /* List of active queues */
        LIST_ENTRY(alq) aq_link;        /* List of all queues */
};

#define AQ_WANTED       0x0001          /* Wakeup sleeper when io is done */
#define AQ_ACTIVE       0x0002          /* On the active list */
#define AQ_FLUSHING     0x0004          /* Doing IO */
#define AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
#define AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
#define AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */

#define ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)

#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
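
/*
 * Illustrative note (added commentary, not from the original source): the
 * entry buffer is used as a ring.  For example, with aq_buflen = 8,
 * aq_writetail = 6 and aq_writehead = 2, pending data occupies bytes 6-7
 * and 0-1, so aq_freebytes = 4 and HAS_PENDING_DATA() is true.  A fully
 * drained queue resets aq_writehead = aq_writetail = 0 (see alq_doio()),
 * so an empty queue always satisfies aq_freebytes == aq_buflen, which is
 * the invariant HAS_PENDING_DATA() relies on.
 */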

static MALLOC_DEFINE(M_ALD, "ALD", "ALD");

/*
 * The ald_mtx protects the ald_queues list and the ald_active list.
 */
static struct mtx ald_mtx;
static LIST_HEAD(, alq) ald_queues;
static LIST_HEAD(, alq) ald_active;
static int ald_shutingdown = 0;
struct thread *ald_thread;
static struct proc *ald_proc;
static eventhandler_tag alq_eventhandler_tag = NULL;

#define ALD_LOCK()      mtx_lock(&ald_mtx)
#define ALD_UNLOCK()    mtx_unlock(&ald_mtx)

/* Daemon functions */
static int ald_add(struct alq *);
static int ald_rem(struct alq *);
static void ald_startup(void *);
static void ald_daemon(void);
static void ald_shutdown(void *, int);
static void ald_activate(struct alq *);
static void ald_deactivate(struct alq *);

/* Internal queue functions */
static void alq_shutdown(struct alq *);
static void alq_destroy(struct alq *);
static int alq_doio(struct alq *);

/*
 * Add a new queue to the global list.  Fail if we're shutting down.
 */
static int
ald_add(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Remove a queue from the global list unless we're shutting down.  If so,
 * the ald will take care of cleaning up its resources.
 */
static int
ald_rem(struct alq *alq)
{
        int error;

        error = 0;

        ALD_LOCK();
        if (ald_shutingdown) {
                error = EBUSY;
                goto done;
        }
        LIST_REMOVE(alq, aq_link);
done:
        ALD_UNLOCK();
        return (error);
}

/*
 * Put a queue on the active list.  This will schedule it for writing.
 */
static void
ald_activate(struct alq *alq)
{
        LIST_INSERT_HEAD(&ald_active, alq, aq_act);
        wakeup(&ald_active);
}

static void
ald_deactivate(struct alq *alq)
{
        LIST_REMOVE(alq, aq_act);
        alq->aq_flags &= ~AQ_ACTIVE;
}

static void
ald_startup(void *unused)
{
        mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
        LIST_INIT(&ald_queues);
        LIST_INIT(&ald_active);
}

static void
ald_daemon(void)
{
        int needwakeup;
        struct alq *alq;

        ald_thread = FIRST_THREAD_IN_PROC(ald_proc);

        alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
            ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);

        ALD_LOCK();

        for (;;) {
                while ((alq = LIST_FIRST(&ald_active)) == NULL &&
                    !ald_shutingdown)
                        mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);

                /* Don't shutdown until all active ALQs are flushed. */
                if (ald_shutingdown && alq == NULL) {
                        ALD_UNLOCK();
                        break;
                }

                ALQ_LOCK(alq);
                ald_deactivate(alq);
                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
                ALQ_UNLOCK(alq);
                if (needwakeup)
                        wakeup_one(alq);
                ALD_LOCK();
        }

        kproc_exit(0);
}

static void
ald_shutdown(void *arg, int howto)
{
        struct alq *alq;

        ALD_LOCK();

        /* Ensure no new queues can be created. */
        ald_shutingdown = 1;

        /* Shutdown all ALQs prior to terminating the ald_daemon. */
        while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
                LIST_REMOVE(alq, aq_link);
                ALD_UNLOCK();
                alq_shutdown(alq);
                ALD_LOCK();
        }

        /* At this point, all ALQs are flushed and shutdown. */

        /*
         * Wake ald_daemon so that it exits. It won't be able to do
         * anything until the mtx_sleep below drops the ald_mtx that we
         * currently hold.
         */
        wakeup(&ald_active);

        /* Wait for ald_daemon to exit. */
        mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);

        ALD_UNLOCK();
}

static void
alq_shutdown(struct alq *alq)
{
        ALQ_LOCK(alq);

        /* Stop any new writers. */
        alq->aq_flags |= AQ_SHUTDOWN;

        /*
         * If the ALQ isn't active but has unwritten data (possible if
         * the ALQ_NOACTIVATE flag has been used), explicitly activate the
         * ALQ here so that the pending data gets flushed by the ald_daemon.
         */
        if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
                alq->aq_flags |= AQ_ACTIVE;
                ALQ_UNLOCK(alq);
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
                ALQ_LOCK(alq);
        }

        /* Drain IO */
        while (alq->aq_flags & AQ_ACTIVE) {
                alq->aq_flags |= AQ_WANTED;
                msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
        }
        ALQ_UNLOCK(alq);

        vn_close(alq->aq_vp, FWRITE, alq->aq_cred, curthread);
        crfree(alq->aq_cred);
}

void
alq_destroy(struct alq *alq)
{
        /* Drain all pending IO. */
        alq_shutdown(alq);

        mtx_destroy(&alq->aq_mtx);
        free(alq->aq_entbuf, M_ALD);
        free(alq, M_ALD);
}

/*
 * Flush all pending data to disk.  This operation will block.
 */
static int
alq_doio(struct alq *alq)
{
        struct thread *td;
        struct mount *mp;
        struct vnode *vp;
        struct uio auio;
        struct iovec aiov[2];
        int totlen;
        int iov;
        int vfslocked;
        int wrapearly;

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

        vp = alq->aq_vp;
        td = curthread;
        totlen = 0;
        iov = 1;
        wrapearly = alq->aq_wrapearly;

        bzero(&aiov, sizeof(aiov));
        bzero(&auio, sizeof(auio));

        /* Start the write from the location of our buffer tail pointer. */
        aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;

        if (alq->aq_writetail < alq->aq_writehead) {
                /* Buffer not wrapped. */
                totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
        } else if (alq->aq_writehead == 0) {
                /* Buffer not wrapped (special case to avoid an empty iov). */
                totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
        } else {
                /*
                 * Buffer wrapped, requires 2 aiov entries:
                 * - first is from writetail to end of buffer
                 * - second is from start of buffer to writehead
                 */
                aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
                    wrapearly;
                iov++;
                aiov[1].iov_base = alq->aq_entbuf;
                aiov[1].iov_len = alq->aq_writehead;
                totlen = aiov[0].iov_len + aiov[1].iov_len;
        }
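
        /*
         * Worked example (added commentary, not from the original source):
         * with aq_buflen = 8, aq_writetail = 5, aq_writehead = 2 and
         * aq_wrapearly = 1, the wrapped case above yields
         * aiov[0] = { buf+5, 2 } (byte 7 was left blank by an early wrap
         * in alq_getn()) and aiov[1] = { buf, 2 }, so totlen = 4 bytes are
         * written in a single VOP_WRITE below.
         */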

        alq->aq_flags |= AQ_FLUSHING;
        ALQ_UNLOCK(alq);

        auio.uio_iov = &aiov[0];
        auio.uio_offset = 0;
        auio.uio_segflg = UIO_SYSSPACE;
        auio.uio_rw = UIO_WRITE;
        auio.uio_iovcnt = iov;
        auio.uio_resid = totlen;
        auio.uio_td = td;

        /*
         * Do all of the junk required to write now.
         */
        vfslocked = VFS_LOCK_GIANT(vp->v_mount);
        vn_start_write(vp, &mp, V_WAIT);
        vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
        /*
         * XXX: VOP_WRITE error checks are ignored.
         */
#ifdef MAC
        if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
#endif
                VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
        VOP_UNLOCK(vp, 0);
        vn_finished_write(mp);
        VFS_UNLOCK_GIANT(vfslocked);

        ALQ_LOCK(alq);
        alq->aq_flags &= ~AQ_FLUSHING;

        /* Adjust writetail as required, taking into account wrapping. */
        alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
            alq->aq_buflen;
        alq->aq_freebytes += totlen + wrapearly;

        /*
         * If we just flushed part of the buffer which wrapped, reset the
         * wrapearly indicator.
         */
        if (wrapearly)
                alq->aq_wrapearly = 0;

        /*
         * If we just flushed the buffer completely, reset indexes to 0 to
         * minimise buffer wraps.
         * This is also required to ensure alq_getn() can't wedge itself.
         */
        if (!HAS_PENDING_DATA(alq))
                alq->aq_writehead = alq->aq_writetail = 0;

        KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
            ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));

        if (alq->aq_flags & AQ_WANTED) {
                alq->aq_flags &= ~AQ_WANTED;
                return (1);
        }

        return (0);
}

static struct kproc_desc ald_kp = {
        "ALQ Daemon",
        ald_daemon,
        &ald_proc
};

SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);


/* User visible queue functions */

/*
 * Create the queue data structure, allocate the buffer, and open the file.
 */
int
alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int flags)
{
        struct thread *td;
        struct nameidata nd;
        struct alq *alq;
        int oflags;
        int error;
        int vfslocked;

        KASSERT((size > 0), ("%s: size <= 0", __func__));

        *alqp = NULL;
        td = curthread;

        NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
        oflags = FWRITE | O_NOFOLLOW | O_CREAT;

        error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
        if (error)
                return (error);

        vfslocked = NDHASGIANT(&nd);
        NDFREE(&nd, NDF_ONLY_PNBUF);
        /* We just unlock so we hold a reference. */
        VOP_UNLOCK(nd.ni_vp, 0);
        VFS_UNLOCK_GIANT(vfslocked);

        alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
        alq->aq_vp = nd.ni_vp;
        alq->aq_cred = crhold(cred);

        mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);

        alq->aq_buflen = size;
        alq->aq_entmax = 0;
        alq->aq_entlen = 0;

        alq->aq_freebytes = alq->aq_buflen;
        alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
        alq->aq_writehead = alq->aq_writetail = 0;
        if (flags & ALQ_ORDERED)
                alq->aq_flags |= AQ_ORDERED;

        if ((error = ald_add(alq)) != 0) {
                alq_destroy(alq);
                return (error);
        }

        *alqp = alq;

        return (0);
}

int
alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
    int size, int count)
{
        int ret;

        KASSERT((count >= 0), ("%s: count < 0", __func__));

        if (count > 0) {
                /*
                 * Only configure the legacy fixed-length parameters on
                 * success; *alqp is NULL if the open failed.
                 */
                if ((ret = alq_open_flags(alqp, file, cred, cmode,
                    size*count, 0)) == 0) {
                        (*alqp)->aq_flags |= AQ_LEGACY;
                        (*alqp)->aq_entmax = count;
                        (*alqp)->aq_entlen = size;
                }
        } else
                ret = alq_open_flags(alqp, file, cred, cmode, size, 0);

        return (ret);
}
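
/*
 * Example (illustrative sketch, not part of the original file): a minimal
 * consumer of the variable-length ALQ API.  The queue pointer, file path
 * and buffer size below are hypothetical; a real caller would choose a
 * buffer size suited to its logging rate and check the return value.
 */
#if 0
static struct alq *example_alq;

static int
example_alq_setup(void)
{
        /* 64KB ring buffer, variable-length writes, no special flags. */
        return (alq_open_flags(&example_alq, "/var/log/example.alq",
            curthread->td_ucred, 0600, 64 * 1024, 0));
}
#endif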

/*
 * Copy a new entry into the queue.  If the operation would block, either
 * wait or return an error, depending on whether ALQ_WAITOK or ALQ_NOWAIT
 * was set in flags.
 */
int
alq_writen(struct alq *alq, void *data, int len, int flags)
{
        int activate, copy, ret;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > aq_buflen", __func__));

        activate = ret = 0;
        copy = len;
        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Fail to perform the write and return EWOULDBLOCK if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
                ALQ_UNLOCK(alq);
                return (EWOULDBLOCK);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
                alq->aq_waiters--;
        }

        /*
         * If ALQ_WAITOK was set and there still aren't len free bytes,
         * enter the while loop and sleep until enough free bytes become
         * available; otherwise fall straight through. If AQ_ORDERED is
         * set, only one thread at a time will be in this loop. Otherwise,
         * multiple threads may sleep here competing for ALQ resources.
         */
        while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
                alq->aq_waiters--;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ret = EWOULDBLOCK;
                goto unlock;
        }

        /*
         * If we need to wrap the buffer to accommodate the write,
         * we'll need 2 calls to bcopy.
         */
        if ((alq->aq_buflen - alq->aq_writehead) < len)
                copy = alq->aq_buflen - alq->aq_writehead;

        /* Copy message (or part thereof if wrap required) to the buffer. */
        bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
        alq->aq_writehead += copy;

        if (alq->aq_writehead >= alq->aq_buflen) {
                KASSERT((alq->aq_writehead == alq->aq_buflen),
                    ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
                    __func__,
                    alq->aq_writehead,
                    alq->aq_buflen));
                alq->aq_writehead = 0;
        }

        if (copy != len) {
                /*
                 * Wrap the buffer by copying the remainder of our message
                 * to the start of the buffer and resetting aq_writehead.
                 */
                bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
                alq->aq_writehead = len - copy;
        }

        KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
            ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));

        alq->aq_freebytes -= len;

        if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
                alq->aq_flags |= AQ_ACTIVE;
                activate = 1;
        }

        KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));

unlock:
        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);

        return (ret);
}

int
alq_write(struct alq *alq, void *data, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length write on variable length queue", __func__));
        return (alq_writen(alq, data, alq->aq_entlen, flags));
}
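
/*
 * Example (illustrative sketch, not part of the original file): logging a
 * variable-length message through the queue from the earlier sketch.
 * ALQ_NOWAIT makes alq_writen() return EWOULDBLOCK rather than sleep when
 * the buffer is full; a caller that may sleep would pass ALQ_WAITOK.
 */
#if 0
static void
example_alq_log(const char *msg, int len)
{
        if (alq_writen(example_alq, __DECONST(void *, msg), len,
            ALQ_NOWAIT) == EWOULDBLOCK) {
                /* Buffer full or queue shutting down; message was dropped. */
        }
}
#endif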

/*
 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
 */
struct ale *
alq_getn(struct alq *alq, int len, int flags)
{
        int contigbytes;
        void *waitchan;

        KASSERT((len > 0 && len <= alq->aq_buflen),
            ("%s: len <= 0 || len > alq->aq_buflen", __func__));

        waitchan = NULL;

        ALQ_LOCK(alq);

        /*
         * Determine the number of free contiguous bytes.
         * We ensure elsewhere that if aq_writehead == aq_writetail because
         * the buffer is empty, they will both be set to 0 and therefore
         * aq_freebytes == aq_buflen and is fully contiguous.
         * If they are equal and the buffer is not empty, aq_freebytes will
         * be 0 indicating the buffer is full.
         */
        if (alq->aq_writehead <= alq->aq_writetail)
                contigbytes = alq->aq_freebytes;
        else {
                contigbytes = alq->aq_buflen - alq->aq_writehead;

                if (contigbytes < len) {
                        /*
                         * Insufficient space at end of buffer to handle a
                         * contiguous write. Wrap early if there's space at
                         * the beginning. This will leave a hole at the end
                         * of the buffer which we will have to skip over when
                         * flushing the buffer to disk.
                         */
                        if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
                                /* Keep track of # bytes left blank. */
                                alq->aq_wrapearly = contigbytes;
                                /* Do the wrap and adjust counters. */
                                contigbytes = alq->aq_freebytes =
                                    alq->aq_writetail;
                                alq->aq_writehead = 0;
                        }
                }
        }

        /*
         * Return a NULL ALE if:
         * - The message is larger than our underlying buffer.
         * - The ALQ is being shutdown.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the user can't wait for space.
         * - There is insufficient free space in our underlying buffer
         *   to accept the message and the alq is inactive due to prior
         *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
         */
        if (len > alq->aq_buflen ||
            alq->aq_flags & AQ_SHUTDOWN ||
            (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
            HAS_PENDING_DATA(alq))) && contigbytes < len)) {
                ALQ_UNLOCK(alq);
                return (NULL);
        }

        /*
         * If we want ordered writes and there is already at least one thread
         * waiting for resources to become available, sleep until we're woken.
         */
        if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_waiters++;
                msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
                alq->aq_waiters--;
        }

        /*
         * If ALQ_WAITOK was set and there still aren't len contiguous free
         * bytes, enter the while loop and sleep until enough contiguous
         * free bytes become available; otherwise fall straight through. If
         * AQ_ORDERED is set, only one thread at a time will be in this
         * loop. Otherwise, multiple threads may sleep here competing for
         * ALQ resources.
         */
        while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
                KASSERT(!(flags & ALQ_NOWAIT),
                    ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
                alq->aq_flags |= AQ_WANTED;
                alq->aq_waiters++;
                if (waitchan)
                        wakeup(waitchan);
                msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
                alq->aq_waiters--;

                if (alq->aq_writehead <= alq->aq_writetail)
                        contigbytes = alq->aq_freebytes;
                else
                        contigbytes = alq->aq_buflen - alq->aq_writehead;

                /*
                 * If we're the first thread to wake after an AQ_WANTED wakeup
                 * but there isn't enough free space for us, we're going to loop
                 * and sleep again. If there are other threads waiting in this
                 * loop, schedule a wakeup so that they can see if the space
                 * they require is available.
                 */
                if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
                    contigbytes < len && !(alq->aq_flags & AQ_WANTED))
                        waitchan = alq;
                else
                        waitchan = NULL;
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the above
         * while loop, so we use a different wait channel in this case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        /* Bail if we're shutting down. */
        if (alq->aq_flags & AQ_SHUTDOWN) {
                ALQ_UNLOCK(alq);
                if (waitchan != NULL)
                        wakeup_one(waitchan);
                return (NULL);
        }

        /*
         * If we are here, we have a contiguous number of bytes >= len
         * available in our buffer starting at aq_writehead.
         */
        alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
        alq->aq_getpost.ae_bytesused = len;

        return (&alq->aq_getpost);
}

struct ale *
alq_get(struct alq *alq, int flags)
{
        /* Should only be called in fixed length message (legacy) mode. */
        KASSERT((alq->aq_flags & AQ_LEGACY),
            ("%s: fixed length get on variable length queue", __func__));
        return (alq_getn(alq, alq->aq_entlen, flags));
}

void
alq_post_flags(struct alq *alq, struct ale *ale, int flags)
{
        int activate;
        void *waitchan;

        activate = 0;

        if (ale->ae_bytesused > 0) {
                if (!(alq->aq_flags & AQ_ACTIVE) &&
                    !(flags & ALQ_NOACTIVATE)) {
                        alq->aq_flags |= AQ_ACTIVE;
                        activate = 1;
                }

                alq->aq_writehead += ale->ae_bytesused;
                alq->aq_freebytes -= ale->ae_bytesused;

                /* Wrap aq_writehead if we filled to the end of the buffer. */
                if (alq->aq_writehead == alq->aq_buflen)
                        alq->aq_writehead = 0;

                KASSERT((alq->aq_writehead >= 0 &&
                    alq->aq_writehead < alq->aq_buflen),
                    ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
                    __func__));

                KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
        }

        /*
         * If there are waiters, we need to signal the waiting threads after we
         * complete our work. The alq ptr is used as a wait channel for threads
         * requiring resources to be freed up. In the AQ_ORDERED case, threads
         * are not allowed to concurrently compete for resources in the
         * alq_getn() while loop, so we use a different wait channel in this
         * case.
         */
        if (alq->aq_waiters > 0) {
                if (alq->aq_flags & AQ_ORDERED)
                        waitchan = &alq->aq_waiters;
                else
                        waitchan = alq;
        } else
                waitchan = NULL;

        ALQ_UNLOCK(alq);

        if (activate) {
                ALD_LOCK();
                ald_activate(alq);
                ALD_UNLOCK();
        }

        /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
        if (waitchan != NULL)
                wakeup_one(waitchan);
}
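
/*
 * Example (illustrative sketch, not part of the original file): the
 * zero-copy get/post path.  A successful alq_getn() returns with the
 * queue's spin lock held and the matching alq_post_flags() call releases
 * it, so the caller must fill ae_data without sleeping in between.
 */
#if 0
static void
example_alq_log_zerocopy(const char *msg, int len)
{
        struct ale *ale;

        if ((ale = alq_getn(example_alq, len, ALQ_NOWAIT)) != NULL) {
                bcopy(msg, ale->ae_data, len);
                alq_post_flags(example_alq, ale, 0);
        }
}
#endif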

void
alq_flush(struct alq *alq)
{
        int needwakeup = 0;

        ALD_LOCK();
        ALQ_LOCK(alq);

        /*
         * Pull the lever iff there is data to flush and we're
         * not already in the middle of a flush operation.
         */
        if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
                if (alq->aq_flags & AQ_ACTIVE)
                        ald_deactivate(alq);

                ALD_UNLOCK();
                needwakeup = alq_doio(alq);
        } else
                ALD_UNLOCK();

        ALQ_UNLOCK(alq);

        if (needwakeup)
                wakeup_one(alq);
}

/*
 * Flush remaining data, close the file and free all resources.
 */
void
alq_close(struct alq *alq)
{
        /* Only flush and destroy alq if not already shutting down. */
        if (ald_rem(alq) == 0)
                alq_destroy(alq);
}
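
/*
 * Example (illustrative sketch, not part of the original file): tearing
 * down the queue from the earlier sketches.  alq_close() flushes any
 * remaining data itself, so the explicit alq_flush() is only needed when
 * the queue should stay open afterwards.
 */
#if 0
static void
example_alq_teardown(void)
{
        alq_flush(example_alq);
        alq_close(example_alq);
        example_alq = NULL;
}
#endif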

static int
alq_load_handler(module_t mod, int what, void *arg)
{
        int ret;

        ret = 0;

        switch (what) {
        case MOD_LOAD:
        case MOD_SHUTDOWN:
                break;

        case MOD_QUIESCE:
                ALD_LOCK();
                /* Only allow unload if there are no open queues. */
                if (LIST_FIRST(&ald_queues) == NULL) {
                        ald_shutingdown = 1;
                        ALD_UNLOCK();
                        EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
                            alq_eventhandler_tag);
                        ald_shutdown(NULL, 0);
                        mtx_destroy(&ald_mtx);
                } else {
                        ALD_UNLOCK();
                        ret = EBUSY;
                }
                break;

        case MOD_UNLOAD:
                /* If MOD_QUIESCE failed we must fail here too. */
                if (ald_shutingdown == 0)
                        ret = EBUSY;
                break;

        default:
                ret = EINVAL;
                break;
        }

        return (ret);
}

static moduledata_t alq_mod =
{
        "alq",
        alq_load_handler,
        NULL
};

DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
MODULE_VERSION(alq, 1);
