FreeBSD/Linux Kernel Cross Reference
sys/fs/aio.c


    1 /*
    2  *      An async IO implementation for Linux
    3  *      Written by Benjamin LaHaise <bcrl@kvack.org>
    4  *
    5  *      Implements an efficient asynchronous io interface.
    6  *
    7  *      Copyright 2000, 2001, 2002 Red Hat, Inc.  All Rights Reserved.
    8  *
    9  *      See ../COPYING for licensing terms.
   10  */
   11 #include <linux/kernel.h>
   12 #include <linux/init.h>
   13 #include <linux/errno.h>
   14 #include <linux/time.h>
   15 #include <linux/aio_abi.h>
   16 #include <linux/export.h>
   17 #include <linux/syscalls.h>
   18 #include <linux/backing-dev.h>
   19 #include <linux/uio.h>
   20 
   21 #define DEBUG 0
   22 
   23 #include <linux/sched.h>
   24 #include <linux/fs.h>
   25 #include <linux/file.h>
   26 #include <linux/mm.h>
   27 #include <linux/mman.h>
   28 #include <linux/mmu_context.h>
   29 #include <linux/slab.h>
   30 #include <linux/timer.h>
   31 #include <linux/aio.h>
   32 #include <linux/highmem.h>
   33 #include <linux/workqueue.h>
   34 #include <linux/security.h>
   35 #include <linux/eventfd.h>
   36 #include <linux/blkdev.h>
   37 #include <linux/compat.h>
   38 
   39 #include <asm/kmap_types.h>
   40 #include <asm/uaccess.h>
   41 
   42 #if DEBUG > 1
   43 #define dprintk         printk
   44 #else
   45 #define dprintk(x...)   do { ; } while (0)
   46 #endif
   47 
   48 /*------ sysctl variables----*/
   49 static DEFINE_SPINLOCK(aio_nr_lock);
   50 unsigned long aio_nr;           /* current system wide number of aio requests */
   51 unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
   52 /*----end sysctl variables---*/
   53 
   54 static struct kmem_cache        *kiocb_cachep;
   55 static struct kmem_cache        *kioctx_cachep;
   56 
   57 static struct workqueue_struct *aio_wq;
   58 
   59 static void aio_kick_handler(struct work_struct *);
   60 static void aio_queue_work(struct kioctx *);
   61 
   62 /* aio_setup
   63  *      Creates the slab caches used by the aio routines; panics on
   64  *      failure since this is done early in the boot sequence.
   65  */
   66 static int __init aio_setup(void)
   67 {
   68         kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
   69         kioctx_cachep = KMEM_CACHE(kioctx, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
   70 
   71         aio_wq = alloc_workqueue("aio", 0, 1);  /* used to limit concurrency */
   72         BUG_ON(!aio_wq);
   73 
   74         pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
   75 
   76         return 0;
   77 }
   78 __initcall(aio_setup);
   79 
   80 static void aio_free_ring(struct kioctx *ctx)
   81 {
   82         struct aio_ring_info *info = &ctx->ring_info;
   83         long i;
   84 
   85         for (i=0; i<info->nr_pages; i++)
   86                 put_page(info->ring_pages[i]);
   87 
   88         if (info->mmap_size) {
   89                 BUG_ON(ctx->mm != current->mm);
   90                 vm_munmap(info->mmap_base, info->mmap_size);
   91         }
   92 
   93         if (info->ring_pages && info->ring_pages != info->internal_pages)
   94                 kfree(info->ring_pages);
   95         info->ring_pages = NULL;
   96         info->nr = 0;
   97 }
   98 
   99 static int aio_setup_ring(struct kioctx *ctx)
  100 {
  101         struct aio_ring *ring;
  102         struct aio_ring_info *info = &ctx->ring_info;
  103         unsigned nr_events = ctx->max_reqs;
  104         unsigned long size;
  105         int nr_pages;
  106 
  107         /* Compensate for the ring buffer's head/tail overlap entry */
  108         nr_events += 2; /* 1 is required, 2 for good luck */
  109 
  110         size = sizeof(struct aio_ring);
  111         size += sizeof(struct io_event) * nr_events;
  112         nr_pages = (size + PAGE_SIZE-1) >> PAGE_SHIFT;
  113 
  114         if (nr_pages < 0)
  115                 return -EINVAL;
  116 
  117         nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
  118 
  119         info->nr = 0;
  120         info->ring_pages = info->internal_pages;
  121         if (nr_pages > AIO_RING_PAGES) {
  122                 info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
  123                 if (!info->ring_pages)
  124                         return -ENOMEM;
  125         }
  126 
  127         info->mmap_size = nr_pages * PAGE_SIZE;
  128         dprintk("attempting mmap of %lu bytes\n", info->mmap_size);
  129         down_write(&ctx->mm->mmap_sem);
  130         info->mmap_base = do_mmap_pgoff(NULL, 0, info->mmap_size, 
  131                                         PROT_READ|PROT_WRITE,
  132                                         MAP_ANONYMOUS|MAP_PRIVATE, 0);
  133         if (IS_ERR((void *)info->mmap_base)) {
  134                 up_write(&ctx->mm->mmap_sem);
  135                 info->mmap_size = 0;
  136                 aio_free_ring(ctx);
  137                 return -EAGAIN;
  138         }
  139 
  140         dprintk("mmap address: 0x%08lx\n", info->mmap_base);
  141         info->nr_pages = get_user_pages(current, ctx->mm,
  142                                         info->mmap_base, nr_pages, 
  143                                         1, 0, info->ring_pages, NULL);
  144         up_write(&ctx->mm->mmap_sem);
  145 
  146         if (unlikely(info->nr_pages != nr_pages)) {
  147                 aio_free_ring(ctx);
  148                 return -EAGAIN;
  149         }
  150 
  151         ctx->user_id = info->mmap_base;
  152 
  153         info->nr = nr_events;           /* trusted copy */
  154 
  155         ring = kmap_atomic(info->ring_pages[0]);
  156         ring->nr = nr_events;   /* user copy */
  157         ring->id = ctx->user_id;
  158         ring->head = ring->tail = 0;
  159         ring->magic = AIO_RING_MAGIC;
  160         ring->compat_features = AIO_RING_COMPAT_FEATURES;
  161         ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
  162         ring->header_length = sizeof(struct aio_ring);
  163         kunmap_atomic(ring);
  164 
  165         return 0;
  166 }
  167 
  168 
  169 /* aio_ring_event: returns a pointer to the event at the given index from
  170  * kmap_atomic().  Release the pointer with put_aio_ring_event();
  171  */
  172 #define AIO_EVENTS_PER_PAGE     (PAGE_SIZE / sizeof(struct io_event))
  173 #define AIO_EVENTS_FIRST_PAGE   ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
  174 #define AIO_EVENTS_OFFSET       (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
  175 
  176 #define aio_ring_event(info, nr) ({                                     \
  177         unsigned pos = (nr) + AIO_EVENTS_OFFSET;                        \
  178         struct io_event *__event;                                       \
  179         __event = kmap_atomic(                                          \
  180                         (info)->ring_pages[pos / AIO_EVENTS_PER_PAGE]); \
  181         __event += pos % AIO_EVENTS_PER_PAGE;                           \
  182         __event;                                                        \
  183 })
  184 
  185 #define put_aio_ring_event(event) do {          \
  186         struct io_event *__event = (event);     \
  187         (void)__event;                          \
  188         kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
  189 } while(0)
  190 
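/*
 * Editor's note (not part of the original source): a worked example of the
 * index math above, assuming a 4096-byte PAGE_SIZE, a 32-byte struct io_event
 * and a 32-byte struct aio_ring header:
 *
 *      AIO_EVENTS_PER_PAGE   = 4096 / 32        = 128
 *      AIO_EVENTS_FIRST_PAGE = (4096 - 32) / 32 = 127
 *      AIO_EVENTS_OFFSET     = 128 - 127        = 1
 *
 * So event nr maps to pos = nr + 1, page = pos / 128, slot = pos % 128:
 * event 0 lands in slot 1 of page 0 (slot 0 holds the ring header), event 126
 * is the last event on page 0, and event 127 starts page 1.
 */
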
  191 static void ctx_rcu_free(struct rcu_head *head)
  192 {
  193         struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
  194         kmem_cache_free(kioctx_cachep, ctx);
  195 }
  196 
  197 /* __put_ioctx
  198  *      Called when the last user of an aio context has gone away,
  199  *      and the struct needs to be freed.
  200  */
  201 static void __put_ioctx(struct kioctx *ctx)
  202 {
  203         unsigned nr_events = ctx->max_reqs;
  204         BUG_ON(ctx->reqs_active);
  205 
  206         cancel_delayed_work_sync(&ctx->wq);
  207         aio_free_ring(ctx);
  208         mmdrop(ctx->mm);
  209         ctx->mm = NULL;
  210         if (nr_events) {
  211                 spin_lock(&aio_nr_lock);
  212                 BUG_ON(aio_nr - nr_events > aio_nr);
  213                 aio_nr -= nr_events;
  214                 spin_unlock(&aio_nr_lock);
  215         }
  216         pr_debug("__put_ioctx: freeing %p\n", ctx);
  217         call_rcu(&ctx->rcu_head, ctx_rcu_free);
  218 }
  219 
  220 static inline int try_get_ioctx(struct kioctx *kioctx)
  221 {
  222         return atomic_inc_not_zero(&kioctx->users);
  223 }
  224 
  225 static inline void put_ioctx(struct kioctx *kioctx)
  226 {
  227         BUG_ON(atomic_read(&kioctx->users) <= 0);
  228         if (unlikely(atomic_dec_and_test(&kioctx->users)))
  229                 __put_ioctx(kioctx);
  230 }
  231 
  232 /* ioctx_alloc
  233  *      Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
  234  */
  235 static struct kioctx *ioctx_alloc(unsigned nr_events)
  236 {
  237         struct mm_struct *mm;
  238         struct kioctx *ctx;
  239         int err = -ENOMEM;
  240 
  241         /* Prevent overflows */
  242         if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
  243             (nr_events > (0x10000000U / sizeof(struct kiocb)))) {
  244                 pr_debug("ENOMEM: nr_events too high\n");
  245                 return ERR_PTR(-EINVAL);
  246         }
  247 
  248         if (!nr_events || (unsigned long)nr_events > aio_max_nr)
  249                 return ERR_PTR(-EAGAIN);
  250 
  251         ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
  252         if (!ctx)
  253                 return ERR_PTR(-ENOMEM);
  254 
  255         ctx->max_reqs = nr_events;
  256         mm = ctx->mm = current->mm;
  257         atomic_inc(&mm->mm_count);
  258 
  259         atomic_set(&ctx->users, 2);
  260         spin_lock_init(&ctx->ctx_lock);
  261         spin_lock_init(&ctx->ring_info.ring_lock);
  262         init_waitqueue_head(&ctx->wait);
  263 
  264         INIT_LIST_HEAD(&ctx->active_reqs);
  265         INIT_LIST_HEAD(&ctx->run_list);
  266         INIT_DELAYED_WORK(&ctx->wq, aio_kick_handler);
  267 
  268         if (aio_setup_ring(ctx) < 0)
  269                 goto out_freectx;
  270 
  271         /* limit the number of system wide aios */
  272         spin_lock(&aio_nr_lock);
  273         if (aio_nr + nr_events > aio_max_nr ||
  274             aio_nr + nr_events < aio_nr) {
  275                 spin_unlock(&aio_nr_lock);
  276                 goto out_cleanup;
  277         }
  278         aio_nr += ctx->max_reqs;
  279         spin_unlock(&aio_nr_lock);
  280 
  281         /* now link into global list. */
  282         spin_lock(&mm->ioctx_lock);
  283         hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
  284         spin_unlock(&mm->ioctx_lock);
  285 
  286         dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
  287                 ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
  288         return ctx;
  289 
  290 out_cleanup:
  291         err = -EAGAIN;
  292         aio_free_ring(ctx);
  293 out_freectx:
  294         mmdrop(mm);
  295         kmem_cache_free(kioctx_cachep, ctx);
  296         dprintk("aio: error allocating ioctx %d\n", err);
  297         return ERR_PTR(err);
  298 }
  299 
  300 /* kill_ctx
  301  *      Cancels all outstanding aio requests on an aio context.  Used 
  302  *      when the processes owning a context have all exited to encourage 
  303  *      the rapid destruction of the kioctx.
  304  */
  305 static void kill_ctx(struct kioctx *ctx)
  306 {
  307         int (*cancel)(struct kiocb *, struct io_event *);
  308         struct task_struct *tsk = current;
  309         DECLARE_WAITQUEUE(wait, tsk);
  310         struct io_event res;
  311 
  312         spin_lock_irq(&ctx->ctx_lock);
  313         ctx->dead = 1;
  314         while (!list_empty(&ctx->active_reqs)) {
  315                 struct list_head *pos = ctx->active_reqs.next;
  316                 struct kiocb *iocb = list_kiocb(pos);
  317                 list_del_init(&iocb->ki_list);
  318                 cancel = iocb->ki_cancel;
  319                 kiocbSetCancelled(iocb);
  320                 if (cancel) {
  321                         iocb->ki_users++;
  322                         spin_unlock_irq(&ctx->ctx_lock);
  323                         cancel(iocb, &res);
  324                         spin_lock_irq(&ctx->ctx_lock);
  325                 }
  326         }
  327 
  328         if (!ctx->reqs_active)
  329                 goto out;
  330 
  331         add_wait_queue(&ctx->wait, &wait);
  332         set_task_state(tsk, TASK_UNINTERRUPTIBLE);
  333         while (ctx->reqs_active) {
  334                 spin_unlock_irq(&ctx->ctx_lock);
  335                 io_schedule();
  336                 set_task_state(tsk, TASK_UNINTERRUPTIBLE);
  337                 spin_lock_irq(&ctx->ctx_lock);
  338         }
  339         __set_task_state(tsk, TASK_RUNNING);
  340         remove_wait_queue(&ctx->wait, &wait);
  341 
  342 out:
  343         spin_unlock_irq(&ctx->ctx_lock);
  344 }
  345 
  346 /* wait_on_sync_kiocb:
  347  *      Waits on the given sync kiocb to complete.
  348  */
  349 ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
  350 {
  351         while (iocb->ki_users) {
  352                 set_current_state(TASK_UNINTERRUPTIBLE);
  353                 if (!iocb->ki_users)
  354                         break;
  355                 io_schedule();
  356         }
  357         __set_current_state(TASK_RUNNING);
  358         return iocb->ki_user_data;
  359 }
  360 EXPORT_SYMBOL(wait_on_sync_kiocb);
  361 
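/*
 * Editor's sketch (not part of the original source): roughly how a caller
 * such as do_sync_read() in fs/read_write.c of this era drives a synchronous
 * kiocb and uses wait_on_sync_kiocb() above.  Simplified; the -EIOCBRETRY
 * handling and error paths are elided, and example_sync_read() is a
 * placeholder name.
 */
static ssize_t example_sync_read(struct file *filp, char __user *buf,
				 size_t len, loff_t *ppos)
{
	struct iovec iov = { .iov_base = buf, .iov_len = len };
	struct kiocb kiocb;
	ssize_t ret;

	init_sync_kiocb(&kiocb, filp);		/* mark it as a sync kiocb */
	kiocb.ki_pos = *ppos;
	kiocb.ki_left = len;
	kiocb.ki_nbytes = len;

	ret = filp->f_op->aio_read(&kiocb, &iov, 1, kiocb.ki_pos);
	if (ret == -EIOCBQUEUED)
		ret = wait_on_sync_kiocb(&kiocb);	/* sleep until aio_complete() */
	*ppos = kiocb.ki_pos;
	return ret;
}
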
  362 /* exit_aio: called when the last user of mm goes away.  At this point, 
  363  * there is no way for any new requests to be submitted or any of the 
  364  * io_* syscalls to be called on the context.  However, there may be 
  365  * outstanding requests which hold references to the context; as they 
  366  * go away, they will call put_ioctx and release any pinned memory
  367  * associated with the request (held via struct page * references).
  368  */
  369 void exit_aio(struct mm_struct *mm)
  370 {
  371         struct kioctx *ctx;
  372 
  373         while (!hlist_empty(&mm->ioctx_list)) {
  374                 ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
  375                 hlist_del_rcu(&ctx->list);
  376 
  377                 kill_ctx(ctx);
  378 
  379                 if (1 != atomic_read(&ctx->users))
  380                         printk(KERN_DEBUG
  381                                 "exit_aio:ioctx still alive: %d %d %d\n",
  382                                 atomic_read(&ctx->users), ctx->dead,
  383                                 ctx->reqs_active);
  384                 /*
  385                  * We don't need to bother with munmap() here -
  386                  * exit_mmap(mm) is coming and it'll unmap everything.
  387                  * Since aio_free_ring() uses non-zero ->mmap_size
  388                  * as indicator that it needs to unmap the area,
  389                  * just set it to 0; aio_free_ring() is the only
  390                  * place that uses ->mmap_size, so it's safe.
  391                  * That way we get all munmap done to current->mm -
  392                  * all other callers have ctx->mm == current->mm.
  393                  */
  394                 ctx->ring_info.mmap_size = 0;
  395                 put_ioctx(ctx);
  396         }
  397 }
  398 
  399 /* aio_get_req
  400  *      Allocate a slot for an aio request.  Increments the users count
  401  * of the kioctx so that the kioctx stays around until all requests are
  402  * complete.  Returns NULL if no requests are free.
  403  *
  404  * Returns with kiocb->users set to 2.  The io submit code path holds
  405  * an extra reference while submitting the i/o.
  406  * This prevents races between the aio code path referencing the
  407  * req (after submitting it) and aio_complete() freeing the req.
  408  */
  409 static struct kiocb *__aio_get_req(struct kioctx *ctx)
  410 {
  411         struct kiocb *req = NULL;
  412 
  413         req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
  414         if (unlikely(!req))
  415                 return NULL;
  416 
  417         req->ki_flags = 0;
  418         req->ki_users = 2;
  419         req->ki_key = 0;
  420         req->ki_ctx = ctx;
  421         req->ki_cancel = NULL;
  422         req->ki_retry = NULL;
  423         req->ki_dtor = NULL;
  424         req->private = NULL;
  425         req->ki_iovec = NULL;
  426         INIT_LIST_HEAD(&req->ki_run_list);
  427         req->ki_eventfd = NULL;
  428 
  429         return req;
  430 }
  431 
  432 /*
  433  * struct kiocb's are allocated in batches to reduce the number of
  434  * times the ctx lock is acquired and released.
  435  */
  436 #define KIOCB_BATCH_SIZE        32L
  437 struct kiocb_batch {
  438         struct list_head head;
  439         long count; /* number of requests left to allocate */
  440 };
  441 
  442 static void kiocb_batch_init(struct kiocb_batch *batch, long total)
  443 {
  444         INIT_LIST_HEAD(&batch->head);
  445         batch->count = total;
  446 }
  447 
  448 static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
  449 {
  450         struct kiocb *req, *n;
  451 
  452         if (list_empty(&batch->head))
  453                 return;
  454 
  455         spin_lock_irq(&ctx->ctx_lock);
  456         list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
  457                 list_del(&req->ki_batch);
  458                 list_del(&req->ki_list);
  459                 kmem_cache_free(kiocb_cachep, req);
  460                 ctx->reqs_active--;
  461         }
  462         if (unlikely(!ctx->reqs_active && ctx->dead))
  463                 wake_up_all(&ctx->wait);
  464         spin_unlock_irq(&ctx->ctx_lock);
  465 }
  466 
  467 /*
  468  * Allocate a batch of kiocbs.  This avoids taking and dropping the
  469  * context lock a lot during setup.
  470  */
  471 static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
  472 {
  473         unsigned short allocated, to_alloc;
  474         long avail;
  475         struct kiocb *req, *n;
  476         struct aio_ring *ring;
  477 
  478         to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
  479         for (allocated = 0; allocated < to_alloc; allocated++) {
  480                 req = __aio_get_req(ctx);
  481                 if (!req)
  482                         /* allocation failed, go with what we've got */
  483                         break;
  484                 list_add(&req->ki_batch, &batch->head);
  485         }
  486 
  487         if (allocated == 0)
  488                 goto out;
  489 
  490         spin_lock_irq(&ctx->ctx_lock);
  491         ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
  492 
  493         avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
  494         BUG_ON(avail < 0);
  495         if (avail < allocated) {
  496                 /* Trim back the number of requests. */
  497                 list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
  498                         list_del(&req->ki_batch);
  499                         kmem_cache_free(kiocb_cachep, req);
  500                         if (--allocated <= avail)
  501                                 break;
  502                 }
  503         }
  504 
  505         batch->count -= allocated;
  506         list_for_each_entry(req, &batch->head, ki_batch) {
  507                 list_add(&req->ki_list, &ctx->active_reqs);
  508                 ctx->reqs_active++;
  509         }
  510 
  511         kunmap_atomic(ring);
  512         spin_unlock_irq(&ctx->ctx_lock);
  513 
  514 out:
  515         return allocated;
  516 }
  517 
  518 static inline struct kiocb *aio_get_req(struct kioctx *ctx,
  519                                         struct kiocb_batch *batch)
  520 {
  521         struct kiocb *req;
  522 
  523         if (list_empty(&batch->head))
  524                 if (kiocb_batch_refill(ctx, batch) == 0)
  525                         return NULL;
  526         req = list_first_entry(&batch->head, struct kiocb, ki_batch);
  527         list_del(&req->ki_batch);
  528         return req;
  529 }
  530 
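/*
 * Editor's sketch (not part of the original source): the intended usage
 * pattern for kiocb batches, roughly as the io_submit() path later in this
 * file drives it.  example_submit_many() is a placeholder name.
 */
static long example_submit_many(struct kioctx *ctx, long nr)
{
	struct kiocb_batch batch;
	struct kiocb *req;
	long i;

	kiocb_batch_init(&batch, nr);		/* plan for up to nr requests */

	for (i = 0; i < nr; i++) {
		req = aio_get_req(ctx, &batch);	/* refills the batch as needed */
		if (unlikely(!req))
			break;
		/* ... set up req and hand it off, as io_submit_one() does ... */
	}

	kiocb_batch_free(ctx, &batch);		/* release any unused kiocbs */
	return i;
}
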
  531 static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
  532 {
  533         assert_spin_locked(&ctx->ctx_lock);
  534 
  535         if (req->ki_eventfd != NULL)
  536                 eventfd_ctx_put(req->ki_eventfd);
  537         if (req->ki_dtor)
  538                 req->ki_dtor(req);
  539         if (req->ki_iovec != &req->ki_inline_vec)
  540                 kfree(req->ki_iovec);
  541         kmem_cache_free(kiocb_cachep, req);
  542         ctx->reqs_active--;
  543 
  544         if (unlikely(!ctx->reqs_active && ctx->dead))
  545                 wake_up_all(&ctx->wait);
  546 }
  547 
  548 /* __aio_put_req
  549  *      Returns true if this put was the last user of the request.
  550  */
  551 static int __aio_put_req(struct kioctx *ctx, struct kiocb *req)
  552 {
  553         dprintk(KERN_DEBUG "aio_put(%p): f_count=%ld\n",
  554                 req, atomic_long_read(&req->ki_filp->f_count));
  555 
  556         assert_spin_locked(&ctx->ctx_lock);
  557 
  558         req->ki_users--;
  559         BUG_ON(req->ki_users < 0);
  560         if (likely(req->ki_users))
  561                 return 0;
  562         list_del(&req->ki_list);                /* remove from active_reqs */
  563         req->ki_cancel = NULL;
  564         req->ki_retry = NULL;
  565 
  566         fput(req->ki_filp);
  567         req->ki_filp = NULL;
  568         really_put_req(ctx, req);
  569         return 1;
  570 }
  571 
  572 /* aio_put_req
  573  *      Returns true if this put was the last user of the kiocb,
  574  *      false if the request is still in use.
  575  */
  576 int aio_put_req(struct kiocb *req)
  577 {
  578         struct kioctx *ctx = req->ki_ctx;
  579         int ret;
  580         spin_lock_irq(&ctx->ctx_lock);
  581         ret = __aio_put_req(ctx, req);
  582         spin_unlock_irq(&ctx->ctx_lock);
  583         return ret;
  584 }
  585 EXPORT_SYMBOL(aio_put_req);
  586 
  587 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
  588 {
  589         struct mm_struct *mm = current->mm;
  590         struct kioctx *ctx, *ret = NULL;
  591         struct hlist_node *n;
  592 
  593         rcu_read_lock();
  594 
  595         hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
  596                 /*
  597                  * RCU protects us against accessing freed memory but
  598                  * we have to be careful not to get a reference when the
  599                  * reference count already dropped to 0 (ctx->dead test
  600                  * is unreliable because of races).
  601                  */
  602                 if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
  603                         ret = ctx;
  604                         break;
  605                 }
  606         }
  607 
  608         rcu_read_unlock();
  609         return ret;
  610 }
  611 
  612 /*
  613  * Queue up a kiocb to be retried. Assumes that the kiocb
  614  * has already been marked as kicked, and places it on
  615  * the retry run list for the corresponding ioctx, if it
  616  * isn't already queued. Returns 1 if it actually queued
  617  * the kiocb (to tell the caller to activate the work
  618  * queue to process it), or 0, if it found that it was
  619  * already queued.
  620  */
  621 static inline int __queue_kicked_iocb(struct kiocb *iocb)
  622 {
  623         struct kioctx *ctx = iocb->ki_ctx;
  624 
  625         assert_spin_locked(&ctx->ctx_lock);
  626 
  627         if (list_empty(&iocb->ki_run_list)) {
  628                 list_add_tail(&iocb->ki_run_list,
  629                         &ctx->run_list);
  630                 return 1;
  631         }
  632         return 0;
  633 }
  634 
  635 /* aio_run_iocb
  636  *      This is the core aio execution routine. It is
  637  *      invoked both for initial i/o submission and
  638  *      subsequent retries via the aio_kick_handler.
  639  *      Expects to be invoked with iocb->ki_ctx->lock
  640  *      already held. The lock is released and reacquired
  641  *      as needed during processing.
  642  *
  643  * Calls the iocb retry method (already setup for the
  644  * iocb on initial submission) for operation specific
  645  * handling, but takes care of most of the common retry
  646  * execution details for a given iocb. The retry method
  647  * needs to be non-blocking as far as possible, to avoid
  648  * holding up other iocbs waiting to be serviced by the
  649  * retry kernel thread.
  650  *
  651  * The trickier parts in this code have to do with
  652  * ensuring that only one retry instance is in progress
  653  * for a given iocb at any time. Providing that guarantee
  654  * simplifies the coding of individual aio operations as
  655  * it avoids various potential races.
  656  */
  657 static ssize_t aio_run_iocb(struct kiocb *iocb)
  658 {
  659         struct kioctx   *ctx = iocb->ki_ctx;
  660         ssize_t (*retry)(struct kiocb *);
  661         ssize_t ret;
  662 
  663         if (!(retry = iocb->ki_retry)) {
  664                 printk("aio_run_iocb: iocb->ki_retry = NULL\n");
  665                 return 0;
  666         }
  667 
  668         /*
  669          * We don't want the next retry iteration for this
  670          * operation to start until this one has returned and
  671          * updated the iocb state. However, wait_queue functions
  672          * can trigger a kick_iocb from interrupt context in the
  673          * meantime, indicating that data is available for the next
  674          * iteration. We want to remember that and enable the
  675          * next retry iteration _after_ we are through with
  676          * this one.
  677          *
  678          * So, in order to be able to register a "kick", but
  679          * prevent it from being queued now, we clear the kick
  680          * flag, but make the kick code *think* that the iocb is
  681          * still on the run list until we are actually done.
  682          * When we are done with this iteration, we check if
  683          * the iocb was kicked in the meantime and if so, queue
  684          * it up afresh.
  685          */
  686 
  687         kiocbClearKicked(iocb);
  688 
  689         /*
  690          * This is so that aio_complete knows it doesn't need to
  691          * pull the iocb off the run list (We can't just call
  692          * INIT_LIST_HEAD because we don't want a kick_iocb to
  693          * queue this on the run list yet)
  694          */
  695         iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
  696         spin_unlock_irq(&ctx->ctx_lock);
  697 
  698         /* Quit retrying if the i/o has been cancelled */
  699         if (kiocbIsCancelled(iocb)) {
  700                 ret = -EINTR;
  701                 aio_complete(iocb, ret, 0);
  702                 /* must not access the iocb after this */
  703                 goto out;
  704         }
  705 
  706         /*
  707          * Now we are all set to call the retry method in async
  708          * context.
  709          */
  710         ret = retry(iocb);
  711 
  712         if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
  713                 /*
  714                  * There's no easy way to restart the syscall since other AIO's
  715                  * may be already running. Just fail this IO with EINTR.
  716                  */
  717                 if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
  718                              ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
  719                         ret = -EINTR;
  720                 aio_complete(iocb, ret, 0);
  721         }
  722 out:
  723         spin_lock_irq(&ctx->ctx_lock);
  724 
  725         if (-EIOCBRETRY == ret) {
  726                 /*
  727                  * OK, now that we are done with this iteration
  728                  * and know that there is more left to go,
  729                  * this is where we let go so that a subsequent
  730                  * "kick" can start the next iteration
  731                  */
  732 
  733                 /* will make __queue_kicked_iocb succeed from here on */
  734                 INIT_LIST_HEAD(&iocb->ki_run_list);
  735                 /* we must queue the next iteration ourselves, if it
  736                  * has already been kicked */
  737                 if (kiocbIsKicked(iocb)) {
  738                         __queue_kicked_iocb(iocb);
  739 
  740                         /*
  741                          * __queue_kicked_iocb will always return 1 here, because
  742                          * iocb->ki_run_list is empty at this point so it should
  743                          * be safe to unconditionally queue the context into the
  744                          * work queue.
  745                          */
  746                         aio_queue_work(ctx);
  747                 }
  748         }
  749         return ret;
  750 }
  751 
  752 /*
  753  * __aio_run_iocbs:
  754  *      Process all pending retries queued on the ioctx
  755  *      run list.
  756  * Assumes it is operating within the aio issuer's mm
  757  * context.
  758  */
  759 static int __aio_run_iocbs(struct kioctx *ctx)
  760 {
  761         struct kiocb *iocb;
  762         struct list_head run_list;
  763 
  764         assert_spin_locked(&ctx->ctx_lock);
  765 
  766         list_replace_init(&ctx->run_list, &run_list);
  767         while (!list_empty(&run_list)) {
  768                 iocb = list_entry(run_list.next, struct kiocb,
  769                         ki_run_list);
  770                 list_del(&iocb->ki_run_list);
  771                 /*
  772                  * Hold an extra reference while retrying i/o.
  773                  */
  774                 iocb->ki_users++;       /* grab extra reference */
  775                 aio_run_iocb(iocb);
  776                 __aio_put_req(ctx, iocb);
  777         }
  778         if (!list_empty(&ctx->run_list))
  779                 return 1;
  780         return 0;
  781 }
  782 
  783 static void aio_queue_work(struct kioctx * ctx)
  784 {
  785         unsigned long timeout;
  786         /*
  787          * if someone is waiting, get the work started right
  788          * away, otherwise, use a longer delay
  789          */
  790         smp_mb();
  791         if (waitqueue_active(&ctx->wait))
  792                 timeout = 1;
  793         else
  794                 timeout = HZ/10;
  795         queue_delayed_work(aio_wq, &ctx->wq, timeout);
  796 }
  797 
  798 /*
  799  * aio_run_all_iocbs:
  800  *      Process all pending retries queued on the ioctx
  801  *      run list, and keep running them until the list
  802  *      stays empty.
  803  * Assumes it is operating within the aio issuer's mm context.
  804  */
  805 static inline void aio_run_all_iocbs(struct kioctx *ctx)
  806 {
  807         spin_lock_irq(&ctx->ctx_lock);
  808         while (__aio_run_iocbs(ctx))
  809                 ;
  810         spin_unlock_irq(&ctx->ctx_lock);
  811 }
  812 
  813 /*
  814  * aio_kick_handler:
  815  *      Work queue handler triggered to process pending
  816  *      retries on an ioctx. Takes on the aio issuer's
  817  *      mm context before running the iocbs, so that
  818  *      copy_xxx_user operates on the issuer's address
  819  *      space.
  820  * Run on aiod's context.
  821  */
  822 static void aio_kick_handler(struct work_struct *work)
  823 {
  824         struct kioctx *ctx = container_of(work, struct kioctx, wq.work);
  825         mm_segment_t oldfs = get_fs();
  826         struct mm_struct *mm;
  827         int requeue;
  828 
  829         set_fs(USER_DS);
  830         use_mm(ctx->mm);
  831         spin_lock_irq(&ctx->ctx_lock);
  832         requeue = __aio_run_iocbs(ctx);
  833         mm = ctx->mm;
  834         spin_unlock_irq(&ctx->ctx_lock);
  835         unuse_mm(mm);
  836         set_fs(oldfs);
  837         /*
  838          * we're in a worker thread already; no point using non-zero delay
  839          */
  840         if (requeue)
  841                 queue_delayed_work(aio_wq, &ctx->wq, 0);
  842 }
  843 
  844 
  845 /*
  846  * Called by kick_iocb to queue the kiocb for retry
  847  * and if required activate the aio work queue to process
  848  * it
  849  */
  850 static void try_queue_kicked_iocb(struct kiocb *iocb)
  851 {
  852         struct kioctx   *ctx = iocb->ki_ctx;
  853         unsigned long flags;
  854         int run = 0;
  855 
  856         spin_lock_irqsave(&ctx->ctx_lock, flags);
  857         /* set this inside the lock so that we can't race with aio_run_iocb()
  858          * testing it and putting the iocb on the run list under the lock */
  859         if (!kiocbTryKick(iocb))
  860                 run = __queue_kicked_iocb(iocb);
  861         spin_unlock_irqrestore(&ctx->ctx_lock, flags);
  862         if (run)
  863                 aio_queue_work(ctx);
  864 }
  865 
  866 /*
  867  * kick_iocb:
  868  *      Called typically from a wait queue callback context
  869  *      to trigger a retry of the iocb.
  870  *      The retry is usually executed by aio workqueue
  871  *      threads (See aio_kick_handler).
  872  */
  873 void kick_iocb(struct kiocb *iocb)
  874 {
  875         /* sync iocbs are easy: they can only ever be executing from a 
  876          * single context. */
  877         if (is_sync_kiocb(iocb)) {
  878                 kiocbSetKicked(iocb);
  879                 wake_up_process(iocb->ki_obj.tsk);
  880                 return;
  881         }
  882 
  883         try_queue_kicked_iocb(iocb);
  884 }
  885 EXPORT_SYMBOL(kick_iocb);
  886 
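/*
 * Editor's sketch (not part of the original source): a hypothetical wait
 * queue callback of the kind the comment above describes.  The driver is
 * assumed to have stored its kiocb in wait->private when it queued itself;
 * when data becomes ready, the callback kicks the iocb so aio_kick_handler()
 * re-runs its ki_retry method.
 */
static int example_wake_function(wait_queue_t *wait, unsigned mode,
				 int sync, void *key)
{
	struct kiocb *iocb = wait->private;

	kick_iocb(iocb);	/* schedule a retry on the aio workqueue */
	return 1;
}
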
  887 /* aio_complete
  888  *      Called when the io request on the given iocb is complete.
  889  *      Returns true if this is the last user of the request.  The 
  890  *      only other user of the request can be the cancellation code.
  891  */
  892 int aio_complete(struct kiocb *iocb, long res, long res2)
  893 {
  894         struct kioctx   *ctx = iocb->ki_ctx;
  895         struct aio_ring_info    *info;
  896         struct aio_ring *ring;
  897         struct io_event *event;
  898         unsigned long   flags;
  899         unsigned long   tail;
  900         int             ret;
  901 
  902         /*
  903          * Special case handling for sync iocbs:
  904          *  - events go directly into the iocb for fast handling
  905          *  - the sync task with the iocb in its stack holds the single iocb
  906          *    ref, no other paths have a way to get another ref
  907          *  - the sync task helpfully left a reference to itself in the iocb
  908          */
  909         if (is_sync_kiocb(iocb)) {
  910                 BUG_ON(iocb->ki_users != 1);
  911                 iocb->ki_user_data = res;
  912                 iocb->ki_users = 0;
  913                 wake_up_process(iocb->ki_obj.tsk);
  914                 return 1;
  915         }
  916 
  917         info = &ctx->ring_info;
  918 
  919         /* add a completion event to the ring buffer.
  920          * must be done holding ctx->ctx_lock to prevent
  921          * other code from messing with the tail
  922          * pointer since we might be called from irq
  923          * context.
  924          */
  925         spin_lock_irqsave(&ctx->ctx_lock, flags);
  926 
  927         if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
  928                 list_del_init(&iocb->ki_run_list);
  929 
  930         /*
  931          * cancelled requests don't get events, userland was given one
  932          * when the event got cancelled.
  933          */
  934         if (kiocbIsCancelled(iocb))
  935                 goto put_rq;
  936 
  937         ring = kmap_atomic(info->ring_pages[0]);
  938 
  939         tail = info->tail;
  940         event = aio_ring_event(info, tail);
  941         if (++tail >= info->nr)
  942                 tail = 0;
  943 
  944         event->obj = (u64)(unsigned long)iocb->ki_obj.user;
  945         event->data = iocb->ki_user_data;
  946         event->res = res;
  947         event->res2 = res2;
  948 
  949         dprintk("aio_complete: %p[%lu]: %p: %p %Lx %lx %lx\n",
  950                 ctx, tail, iocb, iocb->ki_obj.user, iocb->ki_user_data,
  951                 res, res2);
  952 
  953         /* after flagging the request as done, we
  954          * must never even look at it again
  955          */
  956         smp_wmb();      /* make event visible before updating tail */
  957 
  958         info->tail = tail;
  959         ring->tail = tail;
  960 
  961         put_aio_ring_event(event);
  962         kunmap_atomic(ring);
  963 
  964         pr_debug("added to ring %p at [%lu]\n", iocb, tail);
  965 
  966         /*
  967          * Check if the user asked us to deliver the result through an
  968          * eventfd. The eventfd_signal() function is safe to be called
  969          * from IRQ context.
  970          */
  971         if (iocb->ki_eventfd != NULL)
  972                 eventfd_signal(iocb->ki_eventfd, 1);
  973 
  974 put_rq:
  975         /* everything turned out well, dispose of the aiocb. */
  976         ret = __aio_put_req(ctx, iocb);
  977 
  978         /*
  979          * We have to order our ring_info tail store above and test
  980          * of the wait list below outside the wait lock.  This is
  981          * like in wake_up_bit() where clearing a bit has to be
  982          * ordered with the unlocked test.
  983          */
  984         smp_mb();
  985 
  986         if (waitqueue_active(&ctx->wait))
  987                 wake_up(&ctx->wait);
  988 
  989         spin_unlock_irqrestore(&ctx->ctx_lock, flags);
  990         return ret;
  991 }
  992 EXPORT_SYMBOL(aio_complete);
  993 
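/*
 * Editor's sketch (not part of the original source): the contract a driver's
 * ->aio_read()/->aio_write() follows when completing asynchronously.  The
 * example_* helpers are hypothetical.
 */
static ssize_t example_aio_read(struct kiocb *iocb, const struct iovec *iov,
				unsigned long nr_segs, loff_t pos)
{
	if (example_start_io(iocb, iov, nr_segs, pos))	/* hypothetical: start the hardware */
		return -EIO;
	return -EIOCBQUEUED;	/* request is in flight; the aio core keeps the iocb */
}

/* later, typically from the device's completion/interrupt path */
static void example_io_done(struct kiocb *iocb, long bytes_done)
{
	aio_complete(iocb, bytes_done, 0);	/* post the io_event, drop a reference */
}
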
  994 /* aio_read_evt
  995  *      Pull an event off of the ioctx's event ring.  Returns the number of 
  996  *      events fetched (0 or 1 ;-)
  997  *      FIXME: make this use cmpxchg.
  998  *      TODO: make the ringbuffer user mmap()able (requires FIXME).
  999  */
 1000 static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
 1001 {
 1002         struct aio_ring_info *info = &ioctx->ring_info;
 1003         struct aio_ring *ring;
 1004         unsigned long head;
 1005         int ret = 0;
 1006 
 1007         ring = kmap_atomic(info->ring_pages[0]);
 1008         dprintk("in aio_read_evt h%lu t%lu m%lu\n",
 1009                  (unsigned long)ring->head, (unsigned long)ring->tail,
 1010                  (unsigned long)ring->nr);
 1011 
 1012         if (ring->head == ring->tail)
 1013                 goto out;
 1014 
 1015         spin_lock(&info->ring_lock);
 1016 
 1017         head = ring->head % info->nr;
 1018         if (head != ring->tail) {
 1019                 struct io_event *evp = aio_ring_event(info, head);
 1020                 *ent = *evp;
 1021                 head = (head + 1) % info->nr;
 1022                 smp_mb(); /* finish reading the event before updating the head */
 1023                 ring->head = head;
 1024                 ret = 1;
 1025                 put_aio_ring_event(evp);
 1026         }
 1027         spin_unlock(&info->ring_lock);
 1028 
 1029 out:
 1030         kunmap_atomic(ring);
 1031         dprintk("leaving aio_read_evt: %d  h%lu t%lu\n", ret,
 1032                  (unsigned long)ring->head, (unsigned long)ring->tail);
 1033         return ret;
 1034 }
 1035 
 1036 struct aio_timeout {
 1037         struct timer_list       timer;
 1038         int                     timed_out;
 1039         struct task_struct      *p;
 1040 };
 1041 
 1042 static void timeout_func(unsigned long data)
 1043 {
 1044         struct aio_timeout *to = (struct aio_timeout *)data;
 1045 
 1046         to->timed_out = 1;
 1047         wake_up_process(to->p);
 1048 }
 1049 
 1050 static inline void init_timeout(struct aio_timeout *to)
 1051 {
 1052         setup_timer_on_stack(&to->timer, timeout_func, (unsigned long) to);
 1053         to->timed_out = 0;
 1054         to->p = current;
 1055 }
 1056 
 1057 static inline void set_timeout(long start_jiffies, struct aio_timeout *to,
 1058                                const struct timespec *ts)
 1059 {
 1060         to->timer.expires = start_jiffies + timespec_to_jiffies(ts);
 1061         if (time_after(to->timer.expires, jiffies))
 1062                 add_timer(&to->timer);
 1063         else
 1064                 to->timed_out = 1;
 1065 }
 1066 
 1067 static inline void clear_timeout(struct aio_timeout *to)
 1068 {
 1069         del_singleshot_timer_sync(&to->timer);
 1070 }
 1071 
 1072 static int read_events(struct kioctx *ctx,
 1073                         long min_nr, long nr,
 1074                         struct io_event __user *event,
 1075                         struct timespec __user *timeout)
 1076 {
 1077         long                    start_jiffies = jiffies;
 1078         struct task_struct      *tsk = current;
 1079         DECLARE_WAITQUEUE(wait, tsk);
 1080         int                     ret;
 1081         int                     i = 0;
 1082         struct io_event         ent;
 1083         struct aio_timeout      to;
 1084         int                     retry = 0;
 1085 
 1086         /* needed to zero any padding within an entry (there shouldn't be 
 1087          * any, but C is fun!)
 1088          */
 1089         memset(&ent, 0, sizeof(ent));
 1090 retry:
 1091         ret = 0;
 1092         while (likely(i < nr)) {
 1093                 ret = aio_read_evt(ctx, &ent);
 1094                 if (unlikely(ret <= 0))
 1095                         break;
 1096 
 1097                 dprintk("read event: %Lx %Lx %Lx %Lx\n",
 1098                         ent.data, ent.obj, ent.res, ent.res2);
 1099 
 1100                 /* Could we split the check in two? */
 1101                 ret = -EFAULT;
 1102                 if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
 1103                         dprintk("aio: lost an event due to EFAULT.\n");
 1104                         break;
 1105                 }
 1106                 ret = 0;
 1107 
 1108                 /* Good, event copied to userland, update counts. */
 1109                 event ++;
 1110                 i ++;
 1111         }
 1112 
 1113         if (min_nr <= i)
 1114                 return i;
 1115         if (ret)
 1116                 return ret;
 1117 
 1118         /* End fast path */
 1119 
 1120         /* racy check, but it gets redone */
 1121         if (!retry && unlikely(!list_empty(&ctx->run_list))) {
 1122                 retry = 1;
 1123                 aio_run_all_iocbs(ctx);
 1124                 goto retry;
 1125         }
 1126 
 1127         init_timeout(&to);
 1128         if (timeout) {
 1129                 struct timespec ts;
 1130                 ret = -EFAULT;
 1131                 if (unlikely(copy_from_user(&ts, timeout, sizeof(ts))))
 1132                         goto out;
 1133 
 1134                 set_timeout(start_jiffies, &to, &ts);
 1135         }
 1136 
 1137         while (likely(i < nr)) {
 1138                 add_wait_queue_exclusive(&ctx->wait, &wait);
 1139                 do {
 1140                         set_task_state(tsk, TASK_INTERRUPTIBLE);
 1141                         ret = aio_read_evt(ctx, &ent);
 1142                         if (ret)
 1143                                 break;
 1144                         if (min_nr <= i)
 1145                                 break;
 1146                         if (unlikely(ctx->dead)) {
 1147                                 ret = -EINVAL;
 1148                                 break;
 1149                         }
 1150                         if (to.timed_out)       /* Only check after read evt */
 1151                                 break;
 1152                         /* Try to only show up in io wait if there are ops
 1153                          *  in flight */
 1154                         if (ctx->reqs_active)
 1155                                 io_schedule();
 1156                         else
 1157                                 schedule();
 1158                         if (signal_pending(tsk)) {
 1159                                 ret = -EINTR;
 1160                                 break;
 1161                         }
 1162                         /*ret = aio_read_evt(ctx, &ent);*/
 1163                 } while (1) ;
 1164 
 1165                 set_task_state(tsk, TASK_RUNNING);
 1166                 remove_wait_queue(&ctx->wait, &wait);
 1167 
 1168                 if (unlikely(ret <= 0))
 1169                         break;
 1170 
 1171                 ret = -EFAULT;
 1172                 if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {
 1173                         dprintk("aio: lost an event due to EFAULT.\n");
 1174                         break;
 1175                 }
 1176 
 1177                 /* Good, event copied to userland, update counts. */
 1178                 event ++;
 1179                 i ++;
 1180         }
 1181 
 1182         if (timeout)
 1183                 clear_timeout(&to);
 1184 out:
 1185         destroy_timer_on_stack(&to.timer);
 1186         return i ? i : ret;
 1187 }
 1188 
 1189 /* Take an ioctx and remove it from the list of ioctx's.  Protects 
 1190  * against races with itself via ->dead.
 1191  */
 1192 static void io_destroy(struct kioctx *ioctx)
 1193 {
 1194         struct mm_struct *mm = current->mm;
 1195         int was_dead;
 1196 
 1197         /* delete the entry from the list if someone else hasn't already */
 1198         spin_lock(&mm->ioctx_lock);
 1199         was_dead = ioctx->dead;
 1200         ioctx->dead = 1;
 1201         hlist_del_rcu(&ioctx->list);
 1202         spin_unlock(&mm->ioctx_lock);
 1203 
 1204         dprintk("aio_release(%p)\n", ioctx);
 1205         if (likely(!was_dead))
 1206                 put_ioctx(ioctx);       /* twice for the list */
 1207 
 1208         kill_ctx(ioctx);
 1209 
 1210         /*
 1211          * Wake up any waiters.  The setting of ctx->dead must be seen
 1212          * by other CPUs at this point.  Right now, we rely on the
 1213          * locking done by the above calls to ensure this consistency.
 1214          */
 1215         wake_up_all(&ioctx->wait);
 1216 }
 1217 
 1218 /* sys_io_setup:
 1219  *      Create an aio_context capable of receiving at least nr_events.
 1220  *      ctxp must not point to an aio_context that already exists, and
 1221  *      must be initialized to 0 prior to the call.  On successful
 1222  *      creation of the aio_context, *ctxp is filled in with the resulting 
 1223  *      handle.  May fail with -EINVAL if *ctxp is not initialized,
 1224  *      or if the specified nr_events exceeds internal limits.  May fail
 1225  *      with -EAGAIN if the specified nr_events exceeds the user's limit 
 1226  *      of available events.  May fail with -ENOMEM if insufficient kernel
 1227  *      resources are available.  May fail with -EFAULT if an invalid
 1228  *      pointer is passed for ctxp.  Will fail with -ENOSYS if not
 1229  *      implemented.
 1230  */
 1231 SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 1232 {
 1233         struct kioctx *ioctx = NULL;
 1234         unsigned long ctx;
 1235         long ret;
 1236 
 1237         ret = get_user(ctx, ctxp);
 1238         if (unlikely(ret))
 1239                 goto out;
 1240 
 1241         ret = -EINVAL;
 1242         if (unlikely(ctx || nr_events == 0)) {
 1243                 pr_debug("EINVAL: io_setup: ctx %lu nr_events %u\n",
 1244                          ctx, nr_events);
 1245                 goto out;
 1246         }
 1247 
 1248         ioctx = ioctx_alloc(nr_events);
 1249         ret = PTR_ERR(ioctx);
 1250         if (!IS_ERR(ioctx)) {
 1251                 ret = put_user(ioctx->user_id, ctxp);
 1252                 if (ret)
 1253                         io_destroy(ioctx);
 1254                 put_ioctx(ioctx);
 1255         }
 1256 
 1257 out:
 1258         return ret;
 1259 }
 1260 
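/*
 * Editor's sketch (not part of the kernel source): minimal userspace use of
 * the syscalls implemented in this file, issued directly via syscall(2)
 * rather than through libaio.  Error handling is elided; the file name is
 * arbitrary.
 */
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <stdio.h>

int main(void)
{
	aio_context_t ctx = 0;			/* must be zero before io_setup() */
	struct iocb cb, *cbs[1] = { &cb };
	struct io_event ev;
	static char buf[4096];
	int fd = open("/etc/hostname", O_RDONLY);

	syscall(SYS_io_setup, 128, &ctx);	/* room for 128 in-flight events */

	memset(&cb, 0, sizeof(cb));
	cb.aio_lio_opcode = IOCB_CMD_PREAD;
	cb.aio_fildes     = fd;
	cb.aio_buf        = (unsigned long)buf;
	cb.aio_nbytes     = sizeof(buf);
	cb.aio_offset     = 0;

	syscall(SYS_io_submit, ctx, 1, cbs);			/* queue one read */
	syscall(SYS_io_getevents, ctx, 1, 1, &ev, NULL);	/* wait for it */
	printf("read %lld bytes\n", (long long)ev.res);

	syscall(SYS_io_destroy, ctx);
	return 0;
}
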
 1261 /* sys_io_destroy:
 1262  *      Destroy the aio_context specified.  May cancel any outstanding 
 1263  *      AIOs and block on completion.  Will fail with -ENOSYS if not
 1264  *      implemented.  May fail with -EINVAL if the context pointed to
 1265  *      is invalid.
 1266  */
 1267 SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
 1268 {
 1269         struct kioctx *ioctx = lookup_ioctx(ctx);
 1270         if (likely(NULL != ioctx)) {
 1271                 io_destroy(ioctx);
 1272                 put_ioctx(ioctx);
 1273                 return 0;
 1274         }
 1275         pr_debug("EINVAL: io_destroy: invalid context id\n");
 1276         return -EINVAL;
 1277 }
 1278 
 1279 static void aio_advance_iovec(struct kiocb *iocb, ssize_t ret)
 1280 {
 1281         struct iovec *iov = &iocb->ki_iovec[iocb->ki_cur_seg];
 1282 
 1283         BUG_ON(ret <= 0);
 1284 
 1285         while (iocb->ki_cur_seg < iocb->ki_nr_segs && ret > 0) {
 1286                 ssize_t this = min((ssize_t)iov->iov_len, ret);
 1287                 iov->iov_base += this;
 1288                 iov->iov_len -= this;
 1289                 iocb->ki_left -= this;
 1290                 ret -= this;
 1291                 if (iov->iov_len == 0) {
 1292                         iocb->ki_cur_seg++;
 1293                         iov++;
 1294                 }
 1295         }
 1296 
 1297         /* the caller should not have done more io than what fit in
 1298          * the remaining iovecs */
 1299         BUG_ON(ret > 0 && iocb->ki_left == 0);
 1300 }
 1301 
 1302 static ssize_t aio_rw_vect_retry(struct kiocb *iocb)
 1303 {
 1304         struct file *file = iocb->ki_filp;
 1305         struct address_space *mapping = file->f_mapping;
 1306         struct inode *inode = mapping->host;
 1307         ssize_t (*rw_op)(struct kiocb *, const struct iovec *,
 1308                          unsigned long, loff_t);
 1309         ssize_t ret = 0;
 1310         unsigned short opcode;
 1311 
 1312         if ((iocb->ki_opcode == IOCB_CMD_PREADV) ||
 1313                 (iocb->ki_opcode == IOCB_CMD_PREAD)) {
 1314                 rw_op = file->f_op->aio_read;
 1315                 opcode = IOCB_CMD_PREADV;
 1316         } else {
 1317                 rw_op = file->f_op->aio_write;
 1318                 opcode = IOCB_CMD_PWRITEV;
 1319         }
 1320 
 1321         /* This matches the pread()/pwrite() logic */
 1322         if (iocb->ki_pos < 0)
 1323                 return -EINVAL;
 1324 
 1325         do {
 1326                 ret = rw_op(iocb, &iocb->ki_iovec[iocb->ki_cur_seg],
 1327                             iocb->ki_nr_segs - iocb->ki_cur_seg,
 1328                             iocb->ki_pos);
 1329                 if (ret > 0)
 1330                         aio_advance_iovec(iocb, ret);
 1331 
 1332         /* retry all partial writes.  retry partial reads as long as it's a
 1333          * regular file. */
 1334         } while (ret > 0 && iocb->ki_left > 0 &&
 1335                  (opcode == IOCB_CMD_PWRITEV ||
 1336                   (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))));
 1337 
 1338         /* This means we must have transferred all that we could */
 1339         /* No need to retry anymore */
 1340         if ((ret == 0) || (iocb->ki_left == 0))
 1341                 ret = iocb->ki_nbytes - iocb->ki_left;
 1342 
 1343         /* If we managed to write some out we return that, rather than
 1344          * the eventual error. */
 1345         if (opcode == IOCB_CMD_PWRITEV
 1346             && ret < 0 && ret != -EIOCBQUEUED && ret != -EIOCBRETRY
 1347             && iocb->ki_nbytes - iocb->ki_left)
 1348                 ret = iocb->ki_nbytes - iocb->ki_left;
 1349 
 1350         return ret;
 1351 }
 1352 
 1353 static ssize_t aio_fdsync(struct kiocb *iocb)
 1354 {
 1355         struct file *file = iocb->ki_filp;
 1356         ssize_t ret = -EINVAL;
 1357 
 1358         if (file->f_op->aio_fsync)
 1359                 ret = file->f_op->aio_fsync(iocb, 1);
 1360         return ret;
 1361 }
 1362 
 1363 static ssize_t aio_fsync(struct kiocb *iocb)
 1364 {
 1365         struct file *file = iocb->ki_filp;
 1366         ssize_t ret = -EINVAL;
 1367 
 1368         if (file->f_op->aio_fsync)
 1369                 ret = file->f_op->aio_fsync(iocb, 0);
 1370         return ret;
 1371 }
 1372 
 1373 static ssize_t aio_setup_vectored_rw(int type, struct kiocb *kiocb, bool compat)
 1374 {
 1375         ssize_t ret;
 1376 
 1377 #ifdef CONFIG_COMPAT
 1378         if (compat)
 1379                 ret = compat_rw_copy_check_uvector(type,
 1380                                 (struct compat_iovec __user *)kiocb->ki_buf,
 1381                                 kiocb->ki_nbytes, 1, &kiocb->ki_inline_vec,
 1382                                 &kiocb->ki_iovec);
 1383         else
 1384 #endif
 1385                 ret = rw_copy_check_uvector(type,
 1386                                 (struct iovec __user *)kiocb->ki_buf,
 1387                                 kiocb->ki_nbytes, 1, &kiocb->ki_inline_vec,
 1388                                 &kiocb->ki_iovec);
 1389         if (ret < 0)
 1390                 goto out;
 1391 
 1392         ret = rw_verify_area(type, kiocb->ki_filp, &kiocb->ki_pos, ret);
 1393         if (ret < 0)
 1394                 goto out;
 1395 
 1396         kiocb->ki_nr_segs = kiocb->ki_nbytes;
 1397         kiocb->ki_cur_seg = 0;
 1398         /* ki_nbytes/left now reflect bytes instead of segs */
 1399         kiocb->ki_nbytes = ret;
 1400         kiocb->ki_left = ret;
 1401 
 1402         ret = 0;
 1403 out:
 1404         return ret;
 1405 }
 1406 
 1407 static ssize_t aio_setup_single_vector(int type, struct file * file, struct kiocb *kiocb)
 1408 {
 1409         int bytes;
 1410 
 1411         bytes = rw_verify_area(type, file, &kiocb->ki_pos, kiocb->ki_left);
 1412         if (bytes < 0)
 1413                 return bytes;
 1414 
 1415         kiocb->ki_iovec = &kiocb->ki_inline_vec;
 1416         kiocb->ki_iovec->iov_base = kiocb->ki_buf;
 1417         kiocb->ki_iovec->iov_len = bytes;
 1418         kiocb->ki_nr_segs = 1;
 1419         kiocb->ki_cur_seg = 0;
 1420         return 0;
 1421 }
 1422 
 1423 /*
 1424  * aio_setup_iocb:
 1425  *      Performs the initial checks and aio retry method
 1426  *      setup for the kiocb at the time of io submission.
 1427  */
 1428 static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
 1429 {
 1430         struct file *file = kiocb->ki_filp;
 1431         ssize_t ret = 0;
 1432 
 1433         switch (kiocb->ki_opcode) {
 1434         case IOCB_CMD_PREAD:
 1435                 ret = -EBADF;
 1436                 if (unlikely(!(file->f_mode & FMODE_READ)))
 1437                         break;
 1438                 ret = -EFAULT;
 1439                 if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf,
 1440                         kiocb->ki_left)))
 1441                         break;
 1442                 ret = aio_setup_single_vector(READ, file, kiocb);
 1443                 if (ret)
 1444                         break;
 1445                 ret = -EINVAL;
 1446                 if (file->f_op->aio_read)
 1447                         kiocb->ki_retry = aio_rw_vect_retry;
 1448                 break;
 1449         case IOCB_CMD_PWRITE:
 1450                 ret = -EBADF;
 1451                 if (unlikely(!(file->f_mode & FMODE_WRITE)))
 1452                         break;
 1453                 ret = -EFAULT;
 1454                 if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf,
 1455                         kiocb->ki_left)))
 1456                         break;
 1457                 ret = aio_setup_single_vector(WRITE, file, kiocb);
 1458                 if (ret)
 1459                         break;
 1460                 ret = -EINVAL;
 1461                 if (file->f_op->aio_write)
 1462                         kiocb->ki_retry = aio_rw_vect_retry;
 1463                 break;
 1464         case IOCB_CMD_PREADV:
 1465                 ret = -EBADF;
 1466                 if (unlikely(!(file->f_mode & FMODE_READ)))
 1467                         break;
 1468                 ret = aio_setup_vectored_rw(READ, kiocb, compat);
 1469                 if (ret)
 1470                         break;
 1471                 ret = -EINVAL;
 1472                 if (file->f_op->aio_read)
 1473                         kiocb->ki_retry = aio_rw_vect_retry;
 1474                 break;
 1475         case IOCB_CMD_PWRITEV:
 1476                 ret = -EBADF;
 1477                 if (unlikely(!(file->f_mode & FMODE_WRITE)))
 1478                         break;
 1479                 ret = aio_setup_vectored_rw(WRITE, kiocb, compat);
 1480                 if (ret)
 1481                         break;
 1482                 ret = -EINVAL;
 1483                 if (file->f_op->aio_write)
 1484                         kiocb->ki_retry = aio_rw_vect_retry;
 1485                 break;
 1486         case IOCB_CMD_FDSYNC:
 1487                 ret = -EINVAL;
 1488                 if (file->f_op->aio_fsync)
 1489                         kiocb->ki_retry = aio_fdsync;
 1490                 break;
 1491         case IOCB_CMD_FSYNC:
 1492                 ret = -EINVAL;
 1493                 if (file->f_op->aio_fsync)
 1494                         kiocb->ki_retry = aio_fsync;
 1495                 break;
 1496         default:
 1497                 dprintk("EINVAL: io_submit: no operation provided\n");
 1498                 ret = -EINVAL;
 1499         }
 1500 
 1501         if (!kiocb->ki_retry)
 1502                 return ret;
 1503 
 1504         return 0;
 1505 }
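
/*
 * Illustrative sketch, not part of fs/aio.c: the per-opcode checks above
 * surface to userspace through io_submit()'s errno, e.g. a PREAD against a
 * descriptor opened O_WRONLY is rejected with EBADF before any I/O starts.
 * The path and helper name are hypothetical; raw syscalls stand in for libaio.
 */
#include <errno.h>
#include <fcntl.h>
#include <linux/aio_abi.h>
#include <stdio.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

static void demo_ebadf(aio_context_t ctx)
{
        int fd = open("/tmp/aio-demo", O_WRONLY | O_CREAT, 0600);
        char buf[64];
        struct iocb cb, *cbs[1] = { &cb };

        memset(&cb, 0, sizeof(cb));
        cb.aio_fildes = fd;
        cb.aio_lio_opcode = IOCB_CMD_PREAD;     /* read on a write-only fd */
        cb.aio_buf = (unsigned long)buf;
        cb.aio_nbytes = sizeof(buf);

        if (syscall(__NR_io_submit, ctx, 1, cbs) < 0)
                perror("io_submit");            /* expected: Bad file descriptor */
        close(fd);
}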
 1506 
 1507 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 1508                          struct iocb *iocb, struct kiocb_batch *batch,
 1509                          bool compat)
 1510 {
 1511         struct kiocb *req;
 1512         struct file *file;
 1513         ssize_t ret;
 1514 
 1515         /* enforce forwards compatibility on users */
 1516         if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2)) {
 1517                 pr_debug("EINVAL: io_submit: reserve field set\n");
 1518                 return -EINVAL;
 1519         }
 1520 
 1521         /* prevent overflows */
 1522         if (unlikely(
 1523             (iocb->aio_buf != (unsigned long)iocb->aio_buf) ||
 1524             (iocb->aio_nbytes != (size_t)iocb->aio_nbytes) ||
 1525             ((ssize_t)iocb->aio_nbytes < 0)
 1526            )) {
 1527                 pr_debug("EINVAL: io_submit: overflow check\n");
 1528                 return -EINVAL;
 1529         }
 1530 
 1531         file = fget(iocb->aio_fildes);
 1532         if (unlikely(!file))
 1533                 return -EBADF;
 1534 
 1535         req = aio_get_req(ctx, batch);  /* returns with 2 references to req */
 1536         if (unlikely(!req)) {
 1537                 fput(file);
 1538                 return -EAGAIN;
 1539         }
 1540         req->ki_filp = file;
 1541         if (iocb->aio_flags & IOCB_FLAG_RESFD) {
 1542                 /*
 1543                  * If the IOCB_FLAG_RESFD flag of aio_flags is set, get an
 1544                  * instance of the file* now. The file descriptor must be
 1545                  * an eventfd() fd, and will be signaled for each completed
 1546                  * event using the eventfd_signal() function.
 1547                  */
 1548                 req->ki_eventfd = eventfd_ctx_fdget((int) iocb->aio_resfd);
 1549                 if (IS_ERR(req->ki_eventfd)) {
 1550                         ret = PTR_ERR(req->ki_eventfd);
 1551                         req->ki_eventfd = NULL;
 1552                         goto out_put_req;
 1553                 }
 1554         }
 1555 
 1556         ret = put_user(req->ki_key, &user_iocb->aio_key);
 1557         if (unlikely(ret)) {
 1558                 dprintk("EFAULT: aio_key\n");
 1559                 goto out_put_req;
 1560         }
 1561 
 1562         req->ki_obj.user = user_iocb;
 1563         req->ki_user_data = iocb->aio_data;
 1564         req->ki_pos = iocb->aio_offset;
 1565 
 1566         req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
 1567         req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
 1568         req->ki_opcode = iocb->aio_lio_opcode;
 1569 
 1570         ret = aio_setup_iocb(req, compat);
 1571 
 1572         if (ret)
 1573                 goto out_put_req;
 1574 
 1575         spin_lock_irq(&ctx->ctx_lock);
 1576         /*
 1577          * We could have raced with io_destroy() and are currently holding a
 1578          * reference to ctx which should be destroyed. We cannot submit IO
 1579          * since ctx gets freed as soon as io_submit() puts its reference.  The
 1580          * check here is reliable: io_destroy() sets ctx->dead before waiting
 1581          * for outstanding IO and the barrier between these two is realized by
 1582          * unlock of mm->ioctx_lock and lock of ctx->ctx_lock.  Analogously we
 1583          * increment ctx->reqs_active before checking for ctx->dead and the
 1584          * barrier is realized by unlock and lock of ctx->ctx_lock. Thus if we
 1585          * don't see ctx->dead set here, io_destroy() waits for our IO to
 1586          * finish.
 1587          */
 1588         if (ctx->dead) {
 1589                 spin_unlock_irq(&ctx->ctx_lock);
 1590                 ret = -EINVAL;
 1591                 goto out_put_req;
 1592         }
 1593         aio_run_iocb(req);
 1594         if (!list_empty(&ctx->run_list)) {
 1595                 /* drain the run list */
 1596                 while (__aio_run_iocbs(ctx))
 1597                         ;
 1598         }
 1599         spin_unlock_irq(&ctx->ctx_lock);
 1600 
 1601         aio_put_req(req);       /* drop extra ref to req */
 1602         return 0;
 1603 
 1604 out_put_req:
 1605         aio_put_req(req);       /* drop extra ref to req */
 1606         aio_put_req(req);       /* drop i/o ref to req */
 1607         return ret;
 1608 }
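
/*
 * Illustrative sketch, not part of fs/aio.c: the IOCB_FLAG_RESFD branch in
 * io_submit_one() above lets completions be signalled on an eventfd, so the
 * same descriptor can be multiplexed with poll()/epoll.  Error handling is
 * omitted and the helper name is hypothetical.
 */
#include <linux/aio_abi.h>
#include <stdint.h>
#include <string.h>
#include <sys/eventfd.h>
#include <sys/syscall.h>
#include <unistd.h>

static void submit_with_resfd(aio_context_t ctx, int fd, void *buf, size_t len)
{
        int efd = eventfd(0, 0);
        struct iocb cb, *cbs[1] = { &cb };
        uint64_t completions;

        memset(&cb, 0, sizeof(cb));
        cb.aio_fildes = fd;
        cb.aio_lio_opcode = IOCB_CMD_PREAD;
        cb.aio_buf = (unsigned long)buf;
        cb.aio_nbytes = len;
        cb.aio_flags = IOCB_FLAG_RESFD;         /* signal the eventfd on completion */
        cb.aio_resfd = efd;

        syscall(__NR_io_submit, ctx, 1, cbs);
        read(efd, &completions, sizeof(completions));   /* blocks until >= 1 completion */
        close(efd);
}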
 1609 
 1610 long do_io_submit(aio_context_t ctx_id, long nr,
 1611                   struct iocb __user *__user *iocbpp, bool compat)
 1612 {
 1613         struct kioctx *ctx;
 1614         long ret = 0;
 1615         int i = 0;
 1616         struct blk_plug plug;
 1617         struct kiocb_batch batch;
 1618 
 1619         if (unlikely(nr < 0))
 1620                 return -EINVAL;
 1621 
 1622         if (unlikely(nr > LONG_MAX/sizeof(*iocbpp)))
 1623                 nr = LONG_MAX/sizeof(*iocbpp);
 1624 
 1625         if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
 1626                 return -EFAULT;
 1627 
 1628         ctx = lookup_ioctx(ctx_id);
 1629         if (unlikely(!ctx)) {
 1630                 pr_debug("EINVAL: io_submit: invalid context id\n");
 1631                 return -EINVAL;
 1632         }
 1633 
 1634         kiocb_batch_init(&batch, nr);
 1635 
 1636         blk_start_plug(&plug);
 1637 
 1638         /*
 1639          * AKPM: should this return a partial result if some of the IOs were
 1640          * successfully submitted?
 1641          */
 1642         for (i=0; i<nr; i++) {
 1643                 struct iocb __user *user_iocb;
 1644                 struct iocb tmp;
 1645 
 1646                 if (unlikely(__get_user(user_iocb, iocbpp + i))) {
 1647                         ret = -EFAULT;
 1648                         break;
 1649                 }
 1650 
 1651                 if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {
 1652                         ret = -EFAULT;
 1653                         break;
 1654                 }
 1655 
 1656                 ret = io_submit_one(ctx, user_iocb, &tmp, &batch, compat);
 1657                 if (ret)
 1658                         break;
 1659         }
 1660         blk_finish_plug(&plug);
 1661 
 1662         kiocb_batch_free(ctx, &batch);
 1663         put_ioctx(ctx);
 1664         return i ? i : ret;
 1665 }
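
/*
 * Illustrative sketch, not part of fs/aio.c: because do_io_submit() returns
 * i once at least one iocb was accepted, and only the error code when the
 * very first iocb fails, callers have to treat a short count as a partial
 * submission and retry the remainder.  The helper name is hypothetical and
 * raw syscalls stand in for libaio.
 */
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <unistd.h>

static long submit_all(aio_context_t ctx, long nr, struct iocb **cbs)
{
        long done = 0;

        while (done < nr) {
                long ret = syscall(__NR_io_submit, ctx, nr - done, cbs + done);
                if (ret < 0)
                        return done ? done : -1;        /* errno holds the failure */
                done += ret;                            /* short count: resubmit the rest */
        }
        return done;
}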
 1666 
 1667 /* sys_io_submit:
 1668  *      Queue the nr iocbs pointed to by iocbpp for processing.  Returns
 1669  *      the number of iocbs queued.  May return -EINVAL if the aio_context
 1670  *      specified by ctx_id is invalid, if nr is < 0, if the iocb at
 1671  *      *iocbpp[0] is not properly initialized, or if the operation specified
 1672  *      is invalid for the file descriptor in the iocb.  May fail with
 1673  *      -EFAULT if any of the data structures point to invalid data.  May
 1674  *      fail with -EBADF if the file descriptor specified in the first
 1675  *      iocb is invalid.  May fail with -EAGAIN if insufficient resources
 1676  *      are available to queue any iocbs.  Will return 0 if nr is 0.  Will
 1677  *      fail with -ENOSYS if not implemented.
 1678  */
 1679 SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
 1680                 struct iocb __user * __user *, iocbpp)
 1681 {
 1682         return do_io_submit(ctx_id, nr, iocbpp, 0);
 1683 }
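
/*
 * Illustrative sketch, not part of fs/aio.c: a minimal end-to-end user of the
 * syscall documented above -- set up a context, submit one write, reap the
 * completion, tear the context down.  Raw syscalls stand in for libaio, the
 * path is hypothetical and most error handling is omitted.
 */
#include <fcntl.h>
#include <linux/aio_abi.h>
#include <stdio.h>
#include <string.h>
#include <sys/syscall.h>
#include <unistd.h>

int main(void)
{
        aio_context_t ctx = 0;
        struct iocb cb, *cbs[1] = { &cb };
        struct io_event ev;
        static const char msg[] = "hello aio\n";
        int fd = open("/tmp/aio-demo", O_WRONLY | O_CREAT | O_TRUNC, 0600);

        if (fd < 0 || syscall(__NR_io_setup, 8, &ctx) < 0)
                return 1;

        memset(&cb, 0, sizeof(cb));
        cb.aio_fildes = fd;
        cb.aio_lio_opcode = IOCB_CMD_PWRITE;
        cb.aio_buf = (unsigned long)msg;
        cb.aio_nbytes = sizeof(msg) - 1;
        cb.aio_offset = 0;

        if (syscall(__NR_io_submit, ctx, 1, cbs) == 1 &&
            syscall(__NR_io_getevents, ctx, 1, 1, &ev, NULL) == 1)
                printf("write completed: res=%lld\n", (long long)ev.res);

        syscall(__NR_io_destroy, ctx);
        close(fd);
        return 0;
}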
 1684 
 1685 /* lookup_kiocb
 1686  *      Finds a given iocb for cancellation.
 1687  */
 1688 static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
 1689                                   u32 key)
 1690 {
 1691         struct list_head *pos;
 1692 
 1693         assert_spin_locked(&ctx->ctx_lock);
 1694 
 1695         /* TODO: use a hash or array, this sucks. */
 1696         list_for_each(pos, &ctx->active_reqs) {
 1697                 struct kiocb *kiocb = list_kiocb(pos);
 1698                 if (kiocb->ki_obj.user == iocb && kiocb->ki_key == key)
 1699                         return kiocb;
 1700         }
 1701         return NULL;
 1702 }
 1703 
 1704 /* sys_io_cancel:
 1705  *      Attempts to cancel an iocb previously passed to io_submit.  If
 1706  *      the operation is successfully cancelled, the resulting event is
 1707  *      copied into the memory pointed to by result without being placed
 1708  *      into the completion queue and 0 is returned.  May fail with
 1709  *      -EFAULT if any of the data structures pointed to are invalid.
 1710  *      May fail with -EINVAL if the aio_context specified by ctx_id is
 1711  *      invalid.  May fail with -EAGAIN if the iocb specified was not
 1712  *      cancelled.  Will fail with -ENOSYS if not implemented.
 1713  */
 1714 SYSCALL_DEFINE3(io_cancel, aio_context_t, ctx_id, struct iocb __user *, iocb,
 1715                 struct io_event __user *, result)
 1716 {
 1717         int (*cancel)(struct kiocb *iocb, struct io_event *res);
 1718         struct kioctx *ctx;
 1719         struct kiocb *kiocb;
 1720         u32 key;
 1721         int ret;
 1722 
 1723         ret = get_user(key, &iocb->aio_key);
 1724         if (unlikely(ret))
 1725                 return -EFAULT;
 1726 
 1727         ctx = lookup_ioctx(ctx_id);
 1728         if (unlikely(!ctx))
 1729                 return -EINVAL;
 1730 
 1731         spin_lock_irq(&ctx->ctx_lock);
 1732         ret = -EAGAIN;
 1733         kiocb = lookup_kiocb(ctx, iocb, key);
 1734         if (kiocb && kiocb->ki_cancel) {
 1735                 cancel = kiocb->ki_cancel;
 1736                 kiocb->ki_users ++;
 1737                 kiocbSetCancelled(kiocb);
 1738         } else
 1739                 cancel = NULL;
 1740         spin_unlock_irq(&ctx->ctx_lock);
 1741 
 1742         if (NULL != cancel) {
 1743                 struct io_event tmp;
 1744                 pr_debug("calling cancel\n");
 1745                 memset(&tmp, 0, sizeof(tmp));
 1746                 tmp.obj = (u64)(unsigned long)kiocb->ki_obj.user;
 1747                 tmp.data = kiocb->ki_user_data;
 1748                 ret = cancel(kiocb, &tmp);
 1749                 if (!ret) {
 1750                         /* Cancellation succeeded -- copy the result
 1751                          * into the user's buffer.
 1752                          */
 1753                         if (copy_to_user(result, &tmp, sizeof(tmp)))
 1754                                 ret = -EFAULT;
 1755                 }
 1756         } else
 1757                 ret = -EINVAL;
 1758 
 1759         put_ioctx(ctx);
 1760 
 1761         return ret;
 1762 }
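
/*
 * Illustrative sketch, not part of fs/aio.c: cancellation only succeeds when
 * the kiocb still carries a ki_cancel method, so for ordinary file I/O
 * io_cancel() usually comes back with EAGAIN and the request completes (or
 * fails) normally via io_getevents().  The helper name is hypothetical.
 */
#include <errno.h>
#include <linux/aio_abi.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <unistd.h>

static void try_cancel(aio_context_t ctx, struct iocb *cb)
{
        struct io_event ev;

        if (syscall(__NR_io_cancel, ctx, cb, &ev) == 0)
                printf("cancelled: res=%lld\n", (long long)ev.res);
        else if (errno == EAGAIN)
                printf("not cancelled; reap it with io_getevents()\n");
        else
                perror("io_cancel");
}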
 1763 
 1764 /* io_getevents:
 1765  *      Attempts to read at least min_nr events and up to nr events from
 1766  *      the completion queue for the aio_context specified by ctx_id. If
 1767  *      it succeeds, the number of read events is returned. May fail with
 1768  *      -EINVAL if ctx_id is invalid, if min_nr is out of range, if nr is
 1769  *      out of range, or if timeout is out of range.  May fail with -EFAULT
 1770  *      if any of the memory specified is invalid.  May return 0 or
 1771  *      < min_nr if the timeout specified by timeout has elapsed
 1772  *      before sufficient events are available, where timeout == NULL
 1773  *      specifies an infinite timeout. Note that the timeout pointed to by
 1774  *      timeout is relative and will be updated if not NULL and the
 1775  *      operation blocks. Will fail with -ENOSYS if not implemented.
 1776  */
 1777 SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
 1778                 long, min_nr,
 1779                 long, nr,
 1780                 struct io_event __user *, events,
 1781                 struct timespec __user *, timeout)
 1782 {
 1783         struct kioctx *ioctx = lookup_ioctx(ctx_id);
 1784         long ret = -EINVAL;
 1785 
 1786         if (likely(ioctx)) {
 1787                 if (likely(min_nr <= nr && min_nr >= 0))
 1788                         ret = read_events(ioctx, min_nr, nr, events, timeout);
 1789                 put_ioctx(ioctx);
 1790         }
 1791 
 1792         asmlinkage_protect(5, ret, ctx_id, min_nr, nr, events, timeout);
 1793         return ret;
 1794 }
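
/*
 * Illustrative sketch, not part of fs/aio.c: min_nr/nr bound how many events
 * the call waits for and how many it will copy out, and the timespec is a
 * relative timeout that the kernel may write back after a blocking wait, so
 * it is reinitialised on every call here.  The helper name is hypothetical.
 */
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

static long wait_for_events(aio_context_t ctx, struct io_event *evs, long max)
{
        struct timespec ts = { .tv_sec = 1, .tv_nsec = 0 };     /* relative, reset each call */

        /* Wait up to 1s; return as soon as at least one event is ready,
         * copying out at most 'max' events. */
        return syscall(__NR_io_getevents, ctx, 1, max, evs, &ts);
}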
