The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/kern/kern_alq.c

Version: -  FREEBSD  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-2  -  FREEBSD-11-1  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-4  -  FREEBSD-10-3  -  FREEBSD-10-2  -  FREEBSD-10-1  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-3  -  FREEBSD-9-2  -  FREEBSD-9-1  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-4  -  FREEBSD-8-3  -  FREEBSD-8-2  -  FREEBSD-8-1  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-4  -  FREEBSD-7-3  -  FREEBSD-7-2  -  FREEBSD-7-1  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-4  -  FREEBSD-6-3  -  FREEBSD-6-2  -  FREEBSD-6-1  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-5  -  FREEBSD-5-4  -  FREEBSD-5-3  -  FREEBSD-5-2  -  FREEBSD-5-1  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  linux-2.6  -  linux-2.4.22  -  MK83  -  MK84  -  PLAN9  -  DFBSD  -  NETBSD  -  NETBSD5  -  NETBSD4  -  NETBSD3  -  NETBSD20  -  OPENBSD  -  xnu-517  -  xnu-792  -  xnu-792.6.70  -  xnu-1228  -  xnu-1456.1.26  -  xnu-1699.24.8  -  xnu-2050.18.24  -  OPENSOLARIS  -  minix-3-1-1 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
    3  * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
    4  * Copyright (c) 2009-2010, The FreeBSD Foundation
    5  * All rights reserved.
    6  *
    7  * Portions of this software were developed at the Centre for Advanced
    8  * Internet Architectures, Swinburne University of Technology, Melbourne,
    9  * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
   10  *
   11  * Redistribution and use in source and binary forms, with or without
   12  * modification, are permitted provided that the following conditions
   13  * are met:
   14  * 1. Redistributions of source code must retain the above copyright
   15  *    notice unmodified, this list of conditions, and the following
   16  *    disclaimer.
   17  * 2. Redistributions in binary form must reproduce the above copyright
   18  *    notice, this list of conditions and the following disclaimer in the
   19  *    documentation and/or other materials provided with the distribution.
   20  *
   21  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
   22  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   23  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
   24  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
   25  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
   26  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
   27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
   28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
   30  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   31  */
   32 
   33 #include <sys/cdefs.h>
   34 __FBSDID("$FreeBSD: stable/9/sys/kern/kern_alq.c 264365 2014-04-12 06:49:10Z dchagin $");
   35 
   36 #include "opt_mac.h"
   37 
   38 #include <sys/param.h>
   39 #include <sys/systm.h>
   40 #include <sys/kernel.h>
   41 #include <sys/kthread.h>
   42 #include <sys/lock.h>
   43 #include <sys/mount.h>
   44 #include <sys/mutex.h>
   45 #include <sys/namei.h>
   46 #include <sys/proc.h>
   47 #include <sys/vnode.h>
   48 #include <sys/alq.h>
   49 #include <sys/malloc.h>
   50 #include <sys/unistd.h>
   51 #include <sys/fcntl.h>
   52 #include <sys/eventhandler.h>
   53 
   54 #include <security/mac/mac_framework.h>
   55 
   56 /*
       * Async. Logging Queue
       *
       * Describes one log file: aq_entbuf is used as a ring of aq_buflen bytes
       * indexed by aq_writehead/aq_writetail, and aq_vp is the vnode the ring
       * is flushed to by alq_doio().  aq_mtx is the per-queue spin mutex taken
       * by ALQ_LOCK(); the list linkage fields are protected by ald_mtx.
       */
   57 struct alq {
   58         char    *aq_entbuf;             /* Buffer for stored entries */
   59         int     aq_entmax;              /* Max entries (0 unless opened by alq_open() with count > 0) */
   60         int     aq_entlen;              /* Entry length (fixed-length legacy mode) */
   61         int     aq_freebytes;           /* Bytes available in buffer; equals aq_buflen when empty */
   62         int     aq_buflen;              /* Total length of our buffer */
   63         int     aq_writehead;           /* Location for next write */
   64         int     aq_writetail;           /* Flush starts at this location */
   65         int     aq_wrapearly;           /* # bytes left blank at end of buf */
   66         int     aq_flags;               /* Queue flags (AQ_* bits) */
   67         int     aq_waiters;             /* Num threads waiting for resources
   68                                          * NB: Used as a wait channel so must
   69                                          * not be first field in the alq struct
                                               * (its address would otherwise equal
                                               * the alq pointer, which is itself
                                               * used as a separate wait channel).
   70                                          */
   71         struct  ale     aq_getpost;     /* ALE for use by get/post */
   72         struct mtx      aq_mtx;         /* Queue lock (initialised with MTX_SPIN) */
   73         struct vnode    *aq_vp;         /* Open vnode handle */
   74         struct ucred    *aq_cred;       /* Credentials passed at open time (held via crhold()) */
   75         LIST_ENTRY(alq) aq_act;         /* List of active queues */
   76         LIST_ENTRY(alq) aq_link;        /* List of all queues */
   77 };
   78 
      /* Bits stored in alq->aq_flags. */
   79 #define AQ_WANTED       0x0001          /* Wakeup sleeper when io is done */
   80 #define AQ_ACTIVE       0x0002          /* on the active list */
   81 #define AQ_FLUSHING     0x0004          /* doing IO */
   82 #define AQ_SHUTDOWN     0x0008          /* Queue no longer valid */
   83 #define AQ_ORDERED      0x0010          /* Queue enforces ordered writes */
   84 #define AQ_LEGACY       0x0020          /* Legacy queue (fixed length writes) */
   85 
      /* Acquire/release the per-queue spin mutex. */
   86 #define ALQ_LOCK(alq)   mtx_lock_spin(&(alq)->aq_mtx)
   87 #define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)
   88 
      /* True when the buffer is not entirely free, i.e. it holds unflushed bytes. */
   89 #define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
   90 
      /* malloc(9) type used for struct alq and its entry buffer. */
   91 static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
   92 
   93 /*
   94  * The ald_mtx protects the ald_queues list and the ald_active list.
       * It is also held wherever this file reads or writes ald_shutingdown.
   95  */
   96 static struct mtx ald_mtx;
   97 static LIST_HEAD(, alq) ald_queues;     /* every queue, linked via aq_link */
   98 static LIST_HEAD(, alq) ald_active;     /* queues awaiting a flush, linked via aq_act */
   99 static int ald_shutingdown = 0;         /* set by ald_shutdown(); makes ald_add()/ald_rem() fail */
  100 struct thread *ald_thread;              /* first thread of ald_proc, set by ald_daemon() */
  101 static struct proc *ald_proc;           /* daemon process; referenced from ald_kp */
  102 static eventhandler_tag alq_eventhandler_tag = NULL;    /* shutdown_pre_sync registration */
  103 
      /* Acquire/release the global ALQ daemon mutex. */
  104 #define ALD_LOCK()      mtx_lock(&ald_mtx)
  105 #define ALD_UNLOCK()    mtx_unlock(&ald_mtx)
  106 
  107 /* Daemon functions */
  108 static int ald_add(struct alq *);
  109 static int ald_rem(struct alq *);
  110 static void ald_startup(void *);
  111 static void ald_daemon(void);
  112 static void ald_shutdown(void *, int);
  113 static void ald_activate(struct alq *);
  114 static void ald_deactivate(struct alq *);
  115 
  116 /* Internal queue functions */
  117 static void alq_shutdown(struct alq *);
  118 static void alq_destroy(struct alq *);
  119 static int alq_doio(struct alq *);
  120 
  121 
  122 /*
  123  * Add a new queue to the global list.  Fail if we're shutting down.
  124  */
  125 static int
  126 ald_add(struct alq *alq)
  127 {
  128         int error;
  129 
  130         error = 0;
  131 
  132         ALD_LOCK();
  133         if (ald_shutingdown) {
  134                 error = EBUSY;
  135                 goto done;
  136         }
  137         LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
  138 done:
  139         ALD_UNLOCK();
  140         return (error);
  141 }
  142 
  143 /*
  144  * Remove a queue from the global list unless we're shutting down.  If so,
  145  * the ald will take care of cleaning up it's resources.
  146  */
  147 static int
  148 ald_rem(struct alq *alq)
  149 {
  150         int error;
  151 
  152         error = 0;
  153 
  154         ALD_LOCK();
  155         if (ald_shutingdown) {
  156                 error = EBUSY;
  157                 goto done;
  158         }
  159         LIST_REMOVE(alq, aq_link);
  160 done:
  161         ALD_UNLOCK();
  162         return (error);
  163 }
  164 
  165 /*
  166  * Put a queue on the active list.  This will schedule it for writing.
  167  */
  168 static void
  169 ald_activate(struct alq *alq)
  170 {
  171         LIST_INSERT_HEAD(&ald_active, alq, aq_act);
  172         wakeup(&ald_active);
  173 }
  174 
  175 static void
  176 ald_deactivate(struct alq *alq)
  177 {
  178         LIST_REMOVE(alq, aq_act);
  179         alq->aq_flags &= ~AQ_ACTIVE;
  180 }
  181 
  182 static void
  183 ald_startup(void *unused)
  184 {
  185         mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
  186         LIST_INIT(&ald_queues);
  187         LIST_INIT(&ald_active);
  188 }
  189 
  190 static void
  191 ald_daemon(void)
  192 {
  193         int needwakeup;
  194         struct alq *alq;
  195 
  196         ald_thread = FIRST_THREAD_IN_PROC(ald_proc);
  197 
  198         alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
  199             ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);
  200 
  201         ALD_LOCK();
  202 
  203         for (;;) {
  204                 while ((alq = LIST_FIRST(&ald_active)) == NULL &&
  205                     !ald_shutingdown)
  206                         mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
  207 
  208                 /* Don't shutdown until all active ALQs are flushed. */
  209                 if (ald_shutingdown && alq == NULL) {
  210                         ALD_UNLOCK();
  211                         break;
  212                 }
  213 
  214                 ALQ_LOCK(alq);
  215                 ald_deactivate(alq);
  216                 ALD_UNLOCK();
  217                 needwakeup = alq_doio(alq);
  218                 ALQ_UNLOCK(alq);
  219                 if (needwakeup)
  220                         wakeup_one(alq);
  221                 ALD_LOCK();
  222         }
  223 
  224         kproc_exit(0);
  225 }
  226 
  227 static void
  228 ald_shutdown(void *arg, int howto)
  229 {
  230         struct alq *alq;
  231 
  232         ALD_LOCK();
  233 
  234         /* Ensure no new queues can be created. */
  235         ald_shutingdown = 1;
  236 
  237         /* Shutdown all ALQs prior to terminating the ald_daemon. */
  238         while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
  239                 LIST_REMOVE(alq, aq_link);
  240                 ALD_UNLOCK();
  241                 alq_shutdown(alq);
  242                 ALD_LOCK();
  243         }
  244 
  245         /* At this point, all ALQs are flushed and shutdown. */
  246 
  247         /*
  248          * Wake ald_daemon so that it exits. It won't be able to do
  249          * anything until we mtx_sleep because we hold the ald_mtx.
  250          */
  251         wakeup(&ald_active);
  252 
  253         /* Wait for ald_daemon to exit. */
  254         mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);
  255 
  256         ALD_UNLOCK();
  257 }
  258 
  259 static void
  260 alq_shutdown(struct alq *alq)
  261 {
  262         ALQ_LOCK(alq);
  263 
  264         /* Stop any new writers. */
  265         alq->aq_flags |= AQ_SHUTDOWN;
  266 
  267         /*
  268          * If the ALQ isn't active but has unwritten data (possible if
  269          * the ALQ_NOACTIVATE flag has been used), explicitly activate the
  270          * ALQ here so that the pending data gets flushed by the ald_daemon.
  271          */
  272         if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
  273                 alq->aq_flags |= AQ_ACTIVE;
  274                 ALQ_UNLOCK(alq);
  275                 ALD_LOCK();
  276                 ald_activate(alq);
  277                 ALD_UNLOCK();
  278                 ALQ_LOCK(alq);
  279         }
  280 
  281         /* Drain IO */
  282         while (alq->aq_flags & AQ_ACTIVE) {
  283                 alq->aq_flags |= AQ_WANTED;
  284                 msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
  285         }
  286         ALQ_UNLOCK(alq);
  287 
  288         vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
  289             curthread);
  290         crfree(alq->aq_cred);
  291 }
  292 
  293 void
  294 alq_destroy(struct alq *alq)
  295 {
  296         /* Drain all pending IO. */
  297         alq_shutdown(alq);
  298 
  299         mtx_destroy(&alq->aq_mtx);
  300         free(alq->aq_entbuf, M_ALD);
  301         free(alq, M_ALD);
  302 }
  303 
  304 /*
  305  * Flush all pending data to disk.  This operation will block.
       *
       * Locking: entered with the queue lock (ALQ_LOCK) held.  The lock is
       * dropped while the vnode write is performed and re-taken afterwards, so
       * it is held again on return.
       *
       * The pending region runs from aq_writetail to aq_writehead and is
       * written with one iovec, or two when it wraps past the end of the
       * buffer.  Any bytes recorded in aq_wrapearly at the end of the buffer
       * are skipped rather than written.
       *
       * Returns 1 if AQ_WANTED was set (the flag is cleared here) so that the
       * caller can wake a sleeper, otherwise 0.
  306  */
  307 static int
  308 alq_doio(struct alq *alq)
  309 {
  310         struct thread *td;
  311         struct mount *mp;
  312         struct vnode *vp;
  313         struct uio auio;
  314         struct iovec aiov[2];
  315         int totlen;
  316         int iov;
  317         int vfslocked;
  318         int wrapearly;
  319 
  320         KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
  321 
  322         vp = alq->aq_vp;
  323         td = curthread;
  324         totlen = 0;
  325         iov = 1;
              /* Snapshot taken while locked; the lock is dropped for the write. */
  326         wrapearly = alq->aq_wrapearly;
  327 
  328         bzero(&aiov, sizeof(aiov));
  329         bzero(&auio, sizeof(auio));
  330 
  331         /* Start the write from the location of our buffer tail pointer. */
  332         aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;
  333 
  334         if (alq->aq_writetail < alq->aq_writehead) {
  335                 /* Buffer not wrapped. */
  336                 totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
  337         } else if (alq->aq_writehead == 0) {
  338                 /* Buffer not wrapped (special case to avoid an empty iov). */
  339                 totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
  340                     wrapearly;
  341         } else {
  342                 /*
  343                  * Buffer wrapped, requires 2 aiov entries:
  344                  * - first is from writetail to end of buffer
  345                  * - second is from start of buffer to writehead
  346                  */
  347                 aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
  348                     wrapearly;
  349                 iov++;
  350                 aiov[1].iov_base = alq->aq_entbuf;
  351                 aiov[1].iov_len =  alq->aq_writehead;
  352                 totlen = aiov[0].iov_len + aiov[1].iov_len;
  353         }
  354 
              /* Mark the flush as in progress before giving up the queue lock. */
  355         alq->aq_flags |= AQ_FLUSHING;
  356         ALQ_UNLOCK(alq);
  357 
  358         auio.uio_iov = &aiov[0];
  359         auio.uio_offset = 0;            /* the write below is issued with IO_APPEND */
  360         auio.uio_segflg = UIO_SYSSPACE;
  361         auio.uio_rw = UIO_WRITE;
  362         auio.uio_iovcnt = iov;
  363         auio.uio_resid = totlen;
  364         auio.uio_td = td;
  365 
  366         /*
  367          * Do all of the junk required to write now.
  368          */
  369         vfslocked = VFS_LOCK_GIANT(vp->v_mount);
  370         vn_start_write(vp, &mp, V_WAIT);
  371         vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
  372         /*
  373          * XXX: VOP_WRITE error checks are ignored.
               * With MAC compiled in, the write is skipped entirely when
               * mac_vnode_check_write() returns non-zero; the buffer accounting
               * below is updated regardless.
  374          */
  375 #ifdef MAC
  376         if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
  377 #endif
  378                 VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
  379         VOP_UNLOCK(vp, 0);
  380         vn_finished_write(mp);
  381         VFS_UNLOCK_GIANT(vfslocked);
  382 
  383         ALQ_LOCK(alq);
  384         alq->aq_flags &= ~AQ_FLUSHING;
  385 
  386         /* Adjust writetail as required, taking into account wrapping. */
  387         alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
  388             alq->aq_buflen;
  389         alq->aq_freebytes += totlen + wrapearly;
  390 
  391         /*
  392          * If we just flushed part of the buffer which wrapped, reset the
  393          * wrapearly indicator.
  394          */
  395         if (wrapearly)
  396                 alq->aq_wrapearly = 0;
  397 
  398         /*
  399          * If we just flushed the buffer completely, reset indexes to 0 to
  400          * minimise buffer wraps.
  401          * This is also required to ensure alq_getn() can't wedge itself.
  402          */
  403         if (!HAS_PENDING_DATA(alq))
  404                 alq->aq_writehead = alq->aq_writetail = 0;
  405 
  406         KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
  407             ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));
  408 
              /* Report (and consume) a pending wakeup request to the caller. */
  409         if (alq->aq_flags & AQ_WANTED) {
  410                 alq->aq_flags &= ~AQ_WANTED;
  411                 return (1);
  412         }
  413 
  414         return(0);
  415 }
  416 
      /*
       * Descriptor passed to kproc_start() by the SYSINIT below: a name string,
       * the ald_daemon() entry point and the address of ald_proc (presumably
       * where the new process pointer is stored -- confirm against kproc(9)).
       */
  417 static struct kproc_desc ald_kp = {
  418         "ALQ Daemon",
  419         ald_daemon,
  420         &ald_proc
  421 };
  422 
      /*
       * The daemon is started at SI_SUB_KTHREAD_IDLE; ald_startup(), which
       * initialises ald_mtx and the lists, is registered at SI_SUB_LOCK.
       */
  423 SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
  424 SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);
  425 
  426 
  427 /* User visible queue functions */
  428 
  429 /*
  430  * Create the queue data structure, allocate the buffer, and open the file.
  431  */
  432 
  433 int
  434 alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
  435     int size, int flags)
  436 {
  437         struct thread *td;
  438         struct nameidata nd;
  439         struct alq *alq;
  440         int oflags;
  441         int error;
  442         int vfslocked;
  443 
  444         KASSERT((size > 0), ("%s: size <= 0", __func__));
  445 
  446         *alqp = NULL;
  447         td = curthread;
  448 
  449         NDINIT(&nd, LOOKUP, NOFOLLOW | MPSAFE, UIO_SYSSPACE, file, td);
  450         oflags = FWRITE | O_NOFOLLOW | O_CREAT;
  451 
  452         error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
  453         if (error)
  454                 return (error);
  455 
  456         vfslocked = NDHASGIANT(&nd);
  457         NDFREE(&nd, NDF_ONLY_PNBUF);
  458         /* We just unlock so we hold a reference */
  459         VOP_UNLOCK(nd.ni_vp, 0);
  460         VFS_UNLOCK_GIANT(vfslocked);
  461 
  462         alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
  463         alq->aq_vp = nd.ni_vp;
  464         alq->aq_cred = crhold(cred);
  465 
  466         mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
  467 
  468         alq->aq_buflen = size;
  469         alq->aq_entmax = 0;
  470         alq->aq_entlen = 0;
  471 
  472         alq->aq_freebytes = alq->aq_buflen;
  473         alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
  474         alq->aq_writehead = alq->aq_writetail = 0;
  475         if (flags & ALQ_ORDERED)
  476                 alq->aq_flags |= AQ_ORDERED;
  477 
  478         if ((error = ald_add(alq)) != 0) {
  479                 alq_destroy(alq);
  480                 return (error);
  481         }
  482 
  483         *alqp = alq;
  484 
  485         return (0);
  486 }
  487 
  488 int
  489 alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
  490     int size, int count)
  491 {
  492         int ret;
  493 
  494         KASSERT((count >= 0), ("%s: count < 0", __func__));
  495 
  496         if (count > 0) {
  497                 if ((ret = alq_open_flags(alqp, file, cred, cmode,
  498                     size*count, 0)) == 0) {
  499                         (*alqp)->aq_flags |= AQ_LEGACY;
  500                         (*alqp)->aq_entmax = count;
  501                         (*alqp)->aq_entlen = size;
  502                 }
  503         } else
  504                 ret = alq_open_flags(alqp, file, cred, cmode, size, 0);
  505 
  506         return (ret);
  507 }
  508 
  509 
  510 /*
  511  * Copy a new entry of 'len' bytes from 'data' into the queue.  If the
  512  * operation would block, either wait or fail, depending on 'flags'.
       *
       * flags: with ALQ_NOWAIT the call fails instead of sleeping when there is
       * not enough free space; with ALQ_NOACTIVATE the queue is not scheduled
       * for flushing by this call.
       *
       * Returns 0 once the data has been copied, or EWOULDBLOCK if the write
       * could not be performed (see the conditions below) or the queue was shut
       * down while waiting.
  513  */
  514 int
  515 alq_writen(struct alq *alq, void *data, int len, int flags)
  516 {
  517         int activate, copy, ret;
  518         void *waitchan;
  519 
  520         KASSERT((len > 0 && len <= alq->aq_buflen),
  521             ("%s: len <= 0 || len > aq_buflen", __func__));
  522 
  523         activate = ret = 0;
  524         copy = len;
  525         waitchan = NULL;
  526 
  527         ALQ_LOCK(alq);
  528 
  529         /*
  530          * Fail to perform the write and return EWOULDBLOCK if:
  531          * - The message is larger than our underlying buffer.
  532          * - The ALQ is being shutdown.
  533          * - There is insufficient free space in our underlying buffer
  534          *   to accept the message and the user can't wait for space.
  535          * - There is insufficient free space in our underlying buffer
  536          *   to accept the message and the alq is inactive due to prior
  537          *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
  538          */
  539         if (len > alq->aq_buflen ||
  540             alq->aq_flags & AQ_SHUTDOWN ||
  541             (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
  542             HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
  543                 ALQ_UNLOCK(alq);
  544                 return (EWOULDBLOCK);
  545         }
  546 
  547         /*
  548          * If we want ordered writes and there is already at least one thread
  549          * waiting for resources to become available, sleep until we're woken.
  550          */
  551         if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
  552                 KASSERT(!(flags & ALQ_NOWAIT),
  553                     ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
  554                 alq->aq_waiters++;
  555                 msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
  556                 alq->aq_waiters--;
  557         }
  558 
  559         /*
  560          * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
  561          * enter while loop and sleep until we have enough free bytes (former)
  562          * or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will
  563          * be in this loop. Otherwise, multiple threads may be sleeping here
  564          * competing for ALQ resources.
  565          */
  566         while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
  567                 KASSERT(!(flags & ALQ_NOWAIT),
  568                     ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
  569                 alq->aq_flags |= AQ_WANTED;
  570                 alq->aq_waiters++;
                      /* Pass on a wakeup scheduled by the previous iteration. */
  571                 if (waitchan)
  572                         wakeup(waitchan);
  573                 msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
  574                 alq->aq_waiters--;
  575 
  576                 /*
  577                  * If we're the first thread to wake after an AQ_WANTED wakeup
  578                  * but there isn't enough free space for us, we're going to loop
  579                  * and sleep again. If there are other threads waiting in this
  580                  * loop, schedule a wakeup so that they can see if the space
  581                  * they require is available.
  582                  */
  583                 if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
  584                     alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
  585                         waitchan = alq;
  586                 else
  587                         waitchan = NULL;
  588         }
  589 
  590         /*
  591          * If there are waiters, we need to signal the waiting threads after we
  592          * complete our work. The alq ptr is used as a wait channel for threads
  593          * requiring resources to be freed up. In the AQ_ORDERED case, threads
  594          * are not allowed to concurrently compete for resources in the above
  595          * while loop, so we use a different wait channel in this case.
  596          */
  597         if (alq->aq_waiters > 0) {
  598                 if (alq->aq_flags & AQ_ORDERED)
  599                         waitchan = &alq->aq_waiters;
  600                 else
  601                         waitchan = alq;
  602         } else
  603                 waitchan = NULL;
  604 
  605         /* Bail if we're shutting down. */
  606         if (alq->aq_flags & AQ_SHUTDOWN) {
  607                 ret = EWOULDBLOCK;
  608                 goto unlock;
  609         }
  610 
  611         /*
  612          * If we need to wrap the buffer to accommodate the write,
  613          * we'll need 2 calls to bcopy.
  614          */
  615         if ((alq->aq_buflen - alq->aq_writehead) < len)
  616                 copy = alq->aq_buflen - alq->aq_writehead;
  617 
  618         /* Copy message (or part thereof if wrap required) to the buffer. */
  619         bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
  620         alq->aq_writehead += copy;
  621 
              /* The first copy may land exactly on the end of the buffer. */
  622         if (alq->aq_writehead >= alq->aq_buflen) {
  623                 KASSERT((alq->aq_writehead == alq->aq_buflen),
  624                     ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
  625                     __func__,
  626                     alq->aq_writehead,
  627                     alq->aq_buflen));
  628                 alq->aq_writehead = 0;
  629         }
  630 
  631         if (copy != len) {
  632                 /*
  633                  * Wrap the buffer by copying the remainder of our message
  634                  * to the start of the buffer and resetting aq_writehead.
  635                  */
  636                 bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
  637                 alq->aq_writehead = len - copy;
  638         }
  639 
  640         KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
  641             ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
  642 
  643         alq->aq_freebytes -= len;
  644 
              /*
               * Set AQ_ACTIVE while the queue lock is held; the insertion on the
               * active list is done after the lock is released (see below).
               */
  645         if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
  646                 alq->aq_flags |= AQ_ACTIVE;
  647                 activate = 1;
  648         }
  649 
  650         KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
  651 
  652 unlock:
  653         ALQ_UNLOCK(alq);
  654 
  655         if (activate) {
  656                 ALD_LOCK();
  657                 ald_activate(alq);
  658                 ALD_UNLOCK();
  659         }
  660 
  661         /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
  662         if (waitchan != NULL)
  663                 wakeup_one(waitchan);
  664 
  665         return (ret);
  666 }
  667 
  668 int
  669 alq_write(struct alq *alq, void *data, int flags)
  670 {
  671         /* Should only be called in fixed length message (legacy) mode. */
  672         KASSERT((alq->aq_flags & AQ_LEGACY),
  673             ("%s: fixed length write on variable length queue", __func__));
  674         return (alq_writen(alq, data, alq->aq_entlen, flags));
  675 }
  676 
  677 /*
  678  * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
  679  */
  680 struct ale *
  681 alq_getn(struct alq *alq, int len, int flags)
  682 {
  683         int contigbytes;
  684         void *waitchan;
  685 
  686         KASSERT((len > 0 && len <= alq->aq_buflen),
  687             ("%s: len <= 0 || len > alq->aq_buflen", __func__));
  688 
  689         waitchan = NULL;
  690 
  691         ALQ_LOCK(alq);
  692 
  693         /*
  694          * Determine the number of free contiguous bytes.
  695          * We ensure elsewhere that if aq_writehead == aq_writetail because
  696          * the buffer is empty, they will both be set to 0 and therefore
  697          * aq_freebytes == aq_buflen and is fully contiguous.
  698          * If they are equal and the buffer is not empty, aq_freebytes will
  699          * be 0 indicating the buffer is full.
  700          */
  701         if (alq->aq_writehead <= alq->aq_writetail)
  702                 contigbytes = alq->aq_freebytes;
  703         else {
  704                 contigbytes = alq->aq_buflen - alq->aq_writehead;
  705 
  706                 if (contigbytes < len) {
  707                         /*
  708                          * Insufficient space at end of buffer to handle a
  709                          * contiguous write. Wrap early if there's space at
  710                          * the beginning. This will leave a hole at the end
  711                          * of the buffer which we will have to skip over when
  712                          * flushing the buffer to disk.
  713                          */
  714                         if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
  715                                 /* Keep track of # bytes left blank. */
  716                                 alq->aq_wrapearly = contigbytes;
  717                                 /* Do the wrap and adjust counters. */
  718                                 contigbytes = alq->aq_freebytes =
  719                                     alq->aq_writetail;
  720                                 alq->aq_writehead = 0;
  721                         }
  722                 }
  723         }
  724 
  725         /*
  726          * Return a NULL ALE if:
  727          * - The message is larger than our underlying buffer.
  728          * - The ALQ is being shutdown.
  729          * - There is insufficient free space in our underlying buffer
  730          *   to accept the message and the user can't wait for space.
  731          * - There is insufficient free space in our underlying buffer
  732          *   to accept the message and the alq is inactive due to prior
  733          *   use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
  734          */
  735         if (len > alq->aq_buflen ||
  736             alq->aq_flags & AQ_SHUTDOWN ||
  737             (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
  738             HAS_PENDING_DATA(alq))) && contigbytes < len)) {
  739                 ALQ_UNLOCK(alq);
  740                 return (NULL);
  741         }
  742 
  743         /*
  744          * If we want ordered writes and there is already at least one thread
  745          * waiting for resources to become available, sleep until we're woken.
  746          */
  747         if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
  748                 KASSERT(!(flags & ALQ_NOWAIT),
  749                     ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
  750                 alq->aq_waiters++;
  751                 msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
  752                 alq->aq_waiters--;
  753         }
  754 
  755         /*
  756          * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either enter
  757          * while loop and sleep until we have enough contiguous free bytes
  758          * (former) or skip (latter). If AQ_ORDERED is set, only 1 thread at a
  759          * time will be in this loop. Otherwise, multiple threads may be
  760          * sleeping here competing for ALQ resources.
  761          */
  762         while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
  763                 KASSERT(!(flags & ALQ_NOWAIT),
  764                     ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
  765                 alq->aq_flags |= AQ_WANTED;
  766                 alq->aq_waiters++;
  767                 if (waitchan)
  768                         wakeup(waitchan);
  769                 msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
  770                 alq->aq_waiters--;
  771 
  772                 if (alq->aq_writehead <= alq->aq_writetail)
  773                         contigbytes = alq->aq_freebytes;
  774                 else
  775                         contigbytes = alq->aq_buflen - alq->aq_writehead;
  776 
  777                 /*
  778                  * If we're the first thread to wake after an AQ_WANTED wakeup
  779                  * but there isn't enough free space for us, we're going to loop
  780                  * and sleep again. If there are other threads waiting in this
  781                  * loop, schedule a wakeup so that they can see if the space
  782                  * they require is available.
  783                  */
  784                 if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
  785                     contigbytes < len && !(alq->aq_flags & AQ_WANTED))
  786                         waitchan = alq;
  787                 else
  788                         waitchan = NULL;
  789         }
  790 
  791         /*
  792          * If there are waiters, we need to signal the waiting threads after we
  793          * complete our work. The alq ptr is used as a wait channel for threads
  794          * requiring resources to be freed up. In the AQ_ORDERED case, threads
  795          * are not allowed to concurrently compete for resources in the above
  796          * while loop, so we use a different wait channel in this case.
  797          */
  798         if (alq->aq_waiters > 0) {
  799                 if (alq->aq_flags & AQ_ORDERED)
  800                         waitchan = &alq->aq_waiters;
  801                 else
  802                         waitchan = alq;
  803         } else
  804                 waitchan = NULL;
  805 
  806         /* Bail if we're shutting down. */
  807         if (alq->aq_flags & AQ_SHUTDOWN) {
  808                 ALQ_UNLOCK(alq);
  809                 if (waitchan != NULL)
  810                         wakeup_one(waitchan);
  811                 return (NULL);
  812         }
  813 
  814         /*
  815          * If we are here, we have a contiguous number of bytes >= len
  816          * available in our buffer starting at aq_writehead.
  817          */
  818         alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
  819         alq->aq_getpost.ae_bytesused = len;
  820 
  821         return (&alq->aq_getpost);
  822 }
  823 
  824 struct ale *
  825 alq_get(struct alq *alq, int flags)
  826 {
  827         /* Should only be called in fixed length message (legacy) mode. */
  828         KASSERT((alq->aq_flags & AQ_LEGACY),
  829             ("%s: fixed length get on variable length queue", __func__));
  830         return (alq_getn(alq, alq->aq_entlen, flags));
  831 }
  832 
  833 void
  834 alq_post_flags(struct alq *alq, struct ale *ale, int flags)
  835 {
  836         int activate;
  837         void *waitchan;
  838 
  839         activate = 0;
  840 
  841         if (ale->ae_bytesused > 0) {
  842                 if (!(alq->aq_flags & AQ_ACTIVE) &&
  843                     !(flags & ALQ_NOACTIVATE)) {
  844                         alq->aq_flags |= AQ_ACTIVE;
  845                         activate = 1;
  846                 }
  847 
  848                 alq->aq_writehead += ale->ae_bytesused;
  849                 alq->aq_freebytes -= ale->ae_bytesused;
  850 
  851                 /* Wrap aq_writehead if we filled to the end of the buffer. */
  852                 if (alq->aq_writehead == alq->aq_buflen)
  853                         alq->aq_writehead = 0;
  854 
  855                 KASSERT((alq->aq_writehead >= 0 &&
  856                     alq->aq_writehead < alq->aq_buflen),
  857                     ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
  858                     __func__));
  859 
  860                 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
  861         }
  862 
  863         /*
  864          * If there are waiters, we need to signal the waiting threads after we
  865          * complete our work. The alq ptr is used as a wait channel for threads
  866          * requiring resources to be freed up. In the AQ_ORDERED case, threads
  867          * are not allowed to concurrently compete for resources in the
  868          * alq_getn() while loop, so we use a different wait channel in this case.
  869          */
  870         if (alq->aq_waiters > 0) {
  871                 if (alq->aq_flags & AQ_ORDERED)
  872                         waitchan = &alq->aq_waiters;
  873                 else
  874                         waitchan = alq;
  875         } else
  876                 waitchan = NULL;
  877 
  878         ALQ_UNLOCK(alq);
  879 
  880         if (activate) {
  881                 ALD_LOCK();
  882                 ald_activate(alq);
  883                 ALD_UNLOCK();
  884         }
  885 
  886         /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
  887         if (waitchan != NULL)
  888                 wakeup_one(waitchan);
  889 }
  890 
  891 void
  892 alq_flush(struct alq *alq)
  893 {
  894         int needwakeup = 0;
  895 
  896         ALD_LOCK();
  897         ALQ_LOCK(alq);
  898 
  899         /*
  900          * Pull the lever iff there is data to flush and we're
  901          * not already in the middle of a flush operation.
  902          */
  903         if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
  904                 if (alq->aq_flags & AQ_ACTIVE)
  905                         ald_deactivate(alq);
  906 
  907                 ALD_UNLOCK();
  908                 needwakeup = alq_doio(alq);
  909         } else
  910                 ALD_UNLOCK();
  911 
  912         ALQ_UNLOCK(alq);
  913 
  914         if (needwakeup)
  915                 wakeup_one(alq);
  916 }
  917 
  918 /*
  919  * Flush remaining data, close the file and free all resources.
  920  */
  921 void
  922 alq_close(struct alq *alq)
  923 {
  924         /* Only flush and destroy alq if not already shutting down. */
  925         if (ald_rem(alq) == 0)
  926                 alq_destroy(alq);
  927 }
  928 
  929 static int
  930 alq_load_handler(module_t mod, int what, void *arg)
  931 {
  932         int ret;
  933         
  934         ret = 0;
  935 
  936         switch (what) {
  937         case MOD_LOAD:
  938         case MOD_SHUTDOWN:
  939                 break;
  940 
  941         case MOD_QUIESCE:
  942                 ALD_LOCK();
  943                 /* Only allow unload if there are no open queues. */
  944                 if (LIST_FIRST(&ald_queues) == NULL) {
  945                         ald_shutingdown = 1;
  946                         ALD_UNLOCK();
  947                         EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
  948                             alq_eventhandler_tag);
  949                         ald_shutdown(NULL, 0);
  950                         mtx_destroy(&ald_mtx);
  951                 } else {
  952                         ALD_UNLOCK();
  953                         ret = EBUSY;
  954                 }
  955                 break;
  956 
  957         case MOD_UNLOAD:
  958                 /* If MOD_QUIESCE failed we must fail here too. */
  959                 if (ald_shutingdown == 0)
  960                         ret = EBUSY;
  961                 break;
  962 
  963         default:
  964                 ret = EINVAL;
  965                 break;
  966         }
  967 
  968         return (ret);
  969 }
  970 
  971 static moduledata_t alq_mod =
  972 {
  973         "alq",
  974         alq_load_handler,
  975         NULL
  976 };
  977 
  978 DECLARE_MODULE(alq, alq_mod, SI_SUB_SMP, SI_ORDER_ANY);
  979 MODULE_VERSION(alq, 1);

Cache object: 8028c645b5135b3cf1551b37548e9993


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.