sys/kern/vfs_aio.c
1 /*-
2 * Copyright (c) 1997 John S. Dyson. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. John S. Dyson's name may not be used to endorse or promote products
10 * derived from this software without specific prior written permission.
11 *
12 * DISCLAIMER: This code isn't warranted to do anything useful. Anything
13 * bad that happens because of using this software isn't the responsibility
14 * of the author. This software is distributed AS-IS.
15 */
16
17 /*
18 * This file contains support for the POSIX 1003.1B AIO/LIO facility.
19 */
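
/*
 * For orientation, a minimal userland sketch of the interface this file
 * implements (illustrative only, not part of the kernel source): queue
 * one asynchronous read with aio_read(2), poll it with aio_error(2),
 * then reap the result (and release the kernel resources) with
 * aio_return(2):
 *
 *	#include <aio.h>
 *	#include <err.h>
 *	#include <errno.h>
 *	#include <fcntl.h>
 *	#include <stdio.h>
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	int
 *	main(void)
 *	{
 *		static char buf[4096];
 *		struct aiocb cb;
 *		int fd, error;
 *
 *		if ((fd = open("/COPYRIGHT", O_RDONLY)) == -1)
 *			err(1, "open");
 *		memset(&cb, 0, sizeof(cb));
 *		cb.aio_fildes = fd;
 *		cb.aio_buf = buf;
 *		cb.aio_nbytes = sizeof(buf);
 *		cb.aio_offset = 0;
 *		if (aio_read(&cb) == -1)
 *			err(1, "aio_read");
 *		while ((error = aio_error(&cb)) == EINPROGRESS)
 *			usleep(1000);	// aio_suspend(2) would block instead
 *		printf("read %zd bytes (error %d)\n", aio_return(&cb), error);
 *		close(fd);
 *		return (0);
 *	}
 */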
20
21 #include <sys/cdefs.h>
22 __FBSDID("$FreeBSD: releng/5.4/sys/kern/vfs_aio.c 145335 2005-04-20 19:11:07Z cvs2svn $");
23
24 #include <sys/param.h>
25 #include <sys/systm.h>
26 #include <sys/malloc.h>
27 #include <sys/bio.h>
28 #include <sys/buf.h>
29 #include <sys/eventhandler.h>
30 #include <sys/sysproto.h>
31 #include <sys/filedesc.h>
32 #include <sys/kernel.h>
33 #include <sys/module.h>
34 #include <sys/kthread.h>
35 #include <sys/fcntl.h>
36 #include <sys/file.h>
37 #include <sys/limits.h>
38 #include <sys/lock.h>
39 #include <sys/mutex.h>
40 #include <sys/unistd.h>
41 #include <sys/proc.h>
42 #include <sys/resourcevar.h>
43 #include <sys/signalvar.h>
44 #include <sys/protosw.h>
45 #include <sys/socketvar.h>
46 #include <sys/syscall.h>
47 #include <sys/sysent.h>
48 #include <sys/sysctl.h>
49 #include <sys/sx.h>
50 #include <sys/vnode.h>
51 #include <sys/conf.h>
52 #include <sys/event.h>
53
54 #include <posix4/posix4.h>
55 #include <vm/vm.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/uma.h>
60 #include <sys/aio.h>
61
62 #include "opt_vfs_aio.h"
63
64 NET_NEEDS_GIANT("aio");
65
66 /*
67 * Counter for allocating reference ids to new jobs. Wrapped to 1 on
68 * overflow.
69 */
70 static long jobrefid;
71
72 #define JOBST_NULL 0x0
73 #define JOBST_JOBQGLOBAL 0x2
74 #define JOBST_JOBRUNNING 0x3
75 #define JOBST_JOBFINISHED 0x4
76 #define JOBST_JOBQBUF 0x5
77 #define JOBST_JOBBFINISHED 0x6
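
/*
 * Rough job lifecycle, as implemented below: a queued request sits in
 * JOBST_JOBQGLOBAL until an aio daemon picks it up, is JOBST_JOBRUNNING
 * while the daemon performs the I/O, and ends up JOBST_JOBFINISHED.
 * Requests that take the direct physio path are queued as JOBST_JOBQBUF
 * and finish as JOBST_JOBBFINISHED instead.
 */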
78
79 #ifndef MAX_AIO_PER_PROC
80 #define MAX_AIO_PER_PROC 32
81 #endif
82
83 #ifndef MAX_AIO_QUEUE_PER_PROC
84 #define MAX_AIO_QUEUE_PER_PROC 256 /* Bigger than AIO_LISTIO_MAX */
85 #endif
86
87 #ifndef MAX_AIO_PROCS
88 #define MAX_AIO_PROCS 32
89 #endif
90
91 #ifndef MAX_AIO_QUEUE
92 #define MAX_AIO_QUEUE 1024 /* Bigger than AIO_LISTIO_MAX */
93 #endif
94
95 #ifndef TARGET_AIO_PROCS
96 #define TARGET_AIO_PROCS 4
97 #endif
98
99 #ifndef MAX_BUF_AIO
100 #define MAX_BUF_AIO 16
101 #endif
102
103 #ifndef AIOD_TIMEOUT_DEFAULT
104 #define AIOD_TIMEOUT_DEFAULT (10 * hz)
105 #endif
106
107 #ifndef AIOD_LIFETIME_DEFAULT
108 #define AIOD_LIFETIME_DEFAULT (30 * hz)
109 #endif
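
/*
 * Each default above is wrapped in #ifndef so it can be overridden at
 * build time through "opt_vfs_aio.h" (included above), which config(8)
 * generates from the kernel configuration.  Hypothetical configuration
 * lines (the option names must be declared in sys/conf/options for
 * config(8) to accept them):
 *
 *	options 	MAX_AIO_PROCS=64
 *	options 	TARGET_AIO_PROCS=8
 */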
110
111 SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0, "Async IO management");
112
113 static int max_aio_procs = MAX_AIO_PROCS;
114 SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs,
115 CTLFLAG_RW, &max_aio_procs, 0,
116     "Maximum number of kernel threads to use for handling async IO");
117
118 static int num_aio_procs = 0;
119 SYSCTL_INT(_vfs_aio, OID_AUTO, num_aio_procs,
120 CTLFLAG_RD, &num_aio_procs, 0,
121 "Number of presently active kernel threads for async IO");
122
123 /*
124 * The code will adjust the actual number of AIO processes towards this
125 * number when it gets a chance.
126 */
127 static int target_aio_procs = TARGET_AIO_PROCS;
128 SYSCTL_INT(_vfs_aio, OID_AUTO, target_aio_procs, CTLFLAG_RW, &target_aio_procs,
129 0, "Preferred number of ready kernel threads for async IO");
130
131 static int max_queue_count = MAX_AIO_QUEUE;
132 SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue, CTLFLAG_RW, &max_queue_count, 0,
133 "Maximum number of aio requests to queue, globally");
134
135 static int num_queue_count = 0;
136 SYSCTL_INT(_vfs_aio, OID_AUTO, num_queue_count, CTLFLAG_RD, &num_queue_count, 0,
137 "Number of queued aio requests");
138
139 static int num_buf_aio = 0;
140 SYSCTL_INT(_vfs_aio, OID_AUTO, num_buf_aio, CTLFLAG_RD, &num_buf_aio, 0,
141 "Number of aio requests presently handled by the buf subsystem");
142
143 /* Number of async I/O threads in the process of being started */
144 /* XXX This should be local to _aio_aqueue() */
145 static int num_aio_resv_start = 0;
146
147 static int aiod_timeout;
148 SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_timeout, CTLFLAG_RW, &aiod_timeout, 0,
149 "Timeout value for synchronous aio operations");
150
151 static int aiod_lifetime;
152 SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
153 "Maximum lifetime for idle aiod");
154
155 static int unloadable = 0;
156 SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
157 "Allow unload of aio (not recommended)");
158
159
160 static int max_aio_per_proc = MAX_AIO_PER_PROC;
161 SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
162 0, "Maximum active aio requests per process (stored in the process)");
163
164 static int max_aio_queue_per_proc = MAX_AIO_QUEUE_PER_PROC;
165 SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue_per_proc, CTLFLAG_RW,
166 &max_aio_queue_per_proc, 0,
167 "Maximum queued aio requests per process (stored in the process)");
168
169 static int max_buf_aio = MAX_BUF_AIO;
170 SYSCTL_INT(_vfs_aio, OID_AUTO, max_buf_aio, CTLFLAG_RW, &max_buf_aio, 0,
171 "Maximum buf aio requests per process (stored in the process)");
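
/*
 * The CTLFLAG_RW knobs above live under the vfs.aio node declared
 * earlier, so the same limits can also be tuned at run time with
 * sysctl(8), for example (illustrative values):
 *
 *	sysctl vfs.aio.max_aio_procs=64
 *	sysctl vfs.aio.target_aio_procs=8
 *	sysctl vfs.aio.max_aio_queue=2048
 */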
172
173 struct aiocblist {
174 TAILQ_ENTRY(aiocblist) list; /* List of jobs */
175 TAILQ_ENTRY(aiocblist) plist; /* List of jobs for proc */
176 int jobflags;
177 int jobstate;
178 int inputcharge;
179 int outputcharge;
180 struct callout_handle timeouthandle;
181 struct buf *bp; /* Buffer pointer */
182 struct proc *userproc; /* User process */ /* Not td! */
183 struct ucred *cred; /* Active credential when created */
184 struct file *fd_file; /* Pointer to file structure */
185 struct aio_liojob *lio; /* Optional lio job */
186 struct aiocb *uuaiocb; /* Pointer in userspace of aiocb */
187 struct knlist klist; /* list of knotes */
188 struct aiocb uaiocb; /* Kernel I/O control block */
189 };
190
191 /* jobflags */
192 #define AIOCBLIST_RUNDOWN 0x4
193 #define AIOCBLIST_DONE 0x10
194
195 /*
196 * AIO process info
197 */
198 #define AIOP_FREE 0x1 /* proc on free queue */
199 #define AIOP_SCHED 0x2 /* proc explicitly scheduled */
200
201 struct aiothreadlist {
202 int aiothreadflags; /* AIO proc flags */
203 TAILQ_ENTRY(aiothreadlist) list; /* List of processes */
204 struct thread *aiothread; /* The AIO thread */
205 };
206
207 /*
208 * data-structure for lio signal management
209 */
210 struct aio_liojob {
211 int lioj_flags;
212 int lioj_buffer_count;
213 int lioj_buffer_finished_count;
214 int lioj_queue_count;
215 int lioj_queue_finished_count;
216 struct sigevent lioj_signal; /* signal on all I/O done */
217 TAILQ_ENTRY(aio_liojob) lioj_list;
218 struct kaioinfo *lioj_ki;
219 };
220 #define LIOJ_SIGNAL 0x1 /* signal on all done (lio) */
221 #define LIOJ_SIGNAL_POSTED 0x2 /* signal has been posted */
222
223 /*
224 * per process aio data structure
225 */
226 struct kaioinfo {
227 int kaio_flags; /* per process kaio flags */
228 int kaio_maxactive_count; /* maximum number of AIOs */
229 int kaio_active_count; /* number of currently used AIOs */
230 int kaio_qallowed_count; /* maximum size of AIO queue */
231 int kaio_queue_count; /* size of AIO queue */
232 int kaio_ballowed_count; /* maximum number of buffers */
233 int kaio_queue_finished_count; /* number of daemon jobs finished */
234 int kaio_buffer_count; /* number of physio buffers */
235 int kaio_buffer_finished_count; /* count of I/O done */
236 struct proc *kaio_p; /* process that uses this kaio block */
237 TAILQ_HEAD(,aio_liojob) kaio_liojoblist; /* list of lio jobs */
238 TAILQ_HEAD(,aiocblist) kaio_jobqueue; /* job queue for process */
239 TAILQ_HEAD(,aiocblist) kaio_jobdone; /* done queue for process */
240 TAILQ_HEAD(,aiocblist) kaio_bufqueue; /* buffer job queue for process */
241 TAILQ_HEAD(,aiocblist) kaio_bufdone; /* buffer done queue for process */
242 TAILQ_HEAD(,aiocblist) kaio_sockqueue; /* queue for aios waiting on sockets */
243 };
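
/*
 * The counters above come in allowed/actual pairs: kaio_qallowed_count
 * caps kaio_queue_count (daemon-serviced jobs) and kaio_ballowed_count
 * caps kaio_buffer_count (physio jobs).  The limits are seeded from
 * max_aio_queue_per_proc, max_buf_aio and max_aio_per_proc in
 * aio_init_aioinfo() below.
 */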
244
245 #define KAIO_RUNDOWN 0x1 /* process is being run down */
246 #define KAIO_WAKEUP 0x2 /* wakeup process when there is a significant event */
247
248 static TAILQ_HEAD(,aiothreadlist) aio_activeproc; /* Active daemons */
249 static TAILQ_HEAD(,aiothreadlist) aio_freeproc; /* Idle daemons */
250 static TAILQ_HEAD(,aiocblist) aio_jobs; /* Async job list */
251 static TAILQ_HEAD(,aiocblist) aio_bufjobs; /* Phys I/O job list */
252
253 static void aio_init_aioinfo(struct proc *p);
254 static void aio_onceonly(void);
255 static int aio_free_entry(struct aiocblist *aiocbe);
256 static void aio_process(struct aiocblist *aiocbe);
257 static int aio_newproc(void);
258 static int aio_aqueue(struct thread *td, struct aiocb *job, int type);
259 static void aio_physwakeup(struct buf *bp);
260 static void aio_proc_rundown(void *arg, struct proc *p);
261 static int aio_fphysio(struct aiocblist *aiocbe);
262 static int aio_qphysio(struct proc *p, struct aiocblist *iocb);
263 static void aio_daemon(void *uproc);
264 static void aio_swake_cb(struct socket *, struct sockbuf *);
265 static int aio_unload(void);
266 static void process_signal(void *aioj);
267 static int filt_aioattach(struct knote *kn);
268 static void filt_aiodetach(struct knote *kn);
269 static int filt_aio(struct knote *kn, long hint);
270
271 /*
272 * Zones for:
273 * kaio Per process async io info
274 * aiop async io thread data
275 * aiocb async io jobs
276 * aiol list io job pointer - internal to aio_suspend XXX
277 * aiolio list io jobs
278 */
279 static uma_zone_t kaio_zone, aiop_zone, aiocb_zone, aiol_zone, aiolio_zone;
280
281 /* kqueue filters for aio */
282 static struct filterops aio_filtops =
283 { 0, filt_aioattach, filt_aiodetach, filt_aio };
284
285 static eventhandler_tag exit_tag, exec_tag;
286
287 /*
288 * Main operations function for use as a kernel module.
289 */
290 static int
291 aio_modload(struct module *module, int cmd, void *arg)
292 {
293 int error = 0;
294
295 switch (cmd) {
296 case MOD_LOAD:
297 aio_onceonly();
298 break;
299 case MOD_UNLOAD:
300 error = aio_unload();
301 break;
302 case MOD_SHUTDOWN:
303 break;
304 default:
305 error = EINVAL;
306 break;
307 }
308 return (error);
309 }
310
311 static moduledata_t aio_mod = {
312 "aio",
313 &aio_modload,
314 NULL
315 };
316
317 SYSCALL_MODULE_HELPER(aio_return);
318 SYSCALL_MODULE_HELPER(aio_suspend);
319 SYSCALL_MODULE_HELPER(aio_cancel);
320 SYSCALL_MODULE_HELPER(aio_error);
321 SYSCALL_MODULE_HELPER(aio_read);
322 SYSCALL_MODULE_HELPER(aio_write);
323 SYSCALL_MODULE_HELPER(aio_waitcomplete);
324 SYSCALL_MODULE_HELPER(lio_listio);
325
326 DECLARE_MODULE(aio, aio_mod,
327 SI_SUB_VFS, SI_ORDER_ANY);
328 MODULE_VERSION(aio, 1);
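
/*
 * When built as a module, the facility is loaded in the usual way, e.g.:
 *
 *	kldload aio
 *
 * Unloading is refused with EOPNOTSUPP unless the vfs.aio.unloadable
 * sysctl has been set; see aio_unload() below.
 */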
329
330 /*
331 * Startup initialization
332 */
333 static void
334 aio_onceonly(void)
335 {
336
337 /* XXX: should probably just use so->callback */
338 aio_swake = &aio_swake_cb;
339 exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
340 EVENTHANDLER_PRI_ANY);
341 exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown, NULL,
342 EVENTHANDLER_PRI_ANY);
343 kqueue_add_filteropts(EVFILT_AIO, &aio_filtops);
344 TAILQ_INIT(&aio_freeproc);
345 TAILQ_INIT(&aio_activeproc);
346 TAILQ_INIT(&aio_jobs);
347 TAILQ_INIT(&aio_bufjobs);
348 kaio_zone = uma_zcreate("AIO", sizeof(struct kaioinfo), NULL, NULL,
349 NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
350 aiop_zone = uma_zcreate("AIOP", sizeof(struct aiothreadlist), NULL,
351 NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
352 aiocb_zone = uma_zcreate("AIOCB", sizeof(struct aiocblist), NULL, NULL,
353 NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
354 aiol_zone = uma_zcreate("AIOL", AIO_LISTIO_MAX*sizeof(intptr_t) , NULL,
355 NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
356 aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aio_liojob), NULL,
357 NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
358 aiod_timeout = AIOD_TIMEOUT_DEFAULT;
359 aiod_lifetime = AIOD_LIFETIME_DEFAULT;
360 jobrefid = 1;
361 async_io_version = _POSIX_VERSION;
362 p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, AIO_LISTIO_MAX);
363 p31b_setcfg(CTL_P1003_1B_AIO_MAX, MAX_AIO_QUEUE);
364 p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, 0);
365 }
366
367 /*
368 * Callback for unload of AIO when used as a module.
369 */
370 static int
371 aio_unload(void)
372 {
373 int error;
374
375 /*
376 * XXX: no unloads by default, it's too dangerous.
377 * perhaps we could do it if locked out callers and then
378 * did an aio_proc_rundown() on each process.
379 */
380 if (!unloadable)
381 return (EOPNOTSUPP);
382
383 error = kqueue_del_filteropts(EVFILT_AIO);
384 if (error)
385 return error;
386
387 async_io_version = 0;
388 aio_swake = NULL;
389 EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
390 EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
391 p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
392 p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
393 p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
394 return (0);
395 }
396
397 /*
398 * Init the per-process aioinfo structure. The aioinfo limits are set
399 * per-process for user limit (resource) management.
400 */
401 static void
402 aio_init_aioinfo(struct proc *p)
403 {
404 struct kaioinfo *ki;
405
406 if (p->p_aioinfo == NULL) {
407 ki = uma_zalloc(kaio_zone, M_WAITOK);
408 p->p_aioinfo = ki;
409 ki->kaio_flags = 0;
410 ki->kaio_maxactive_count = max_aio_per_proc;
411 ki->kaio_active_count = 0;
412 ki->kaio_qallowed_count = max_aio_queue_per_proc;
413 ki->kaio_queue_count = 0;
414 ki->kaio_ballowed_count = max_buf_aio;
415 ki->kaio_buffer_count = 0;
416 ki->kaio_buffer_finished_count = 0;
417 ki->kaio_p = p;
418 TAILQ_INIT(&ki->kaio_jobdone);
419 TAILQ_INIT(&ki->kaio_jobqueue);
420 TAILQ_INIT(&ki->kaio_bufdone);
421 TAILQ_INIT(&ki->kaio_bufqueue);
422 TAILQ_INIT(&ki->kaio_liojoblist);
423 TAILQ_INIT(&ki->kaio_sockqueue);
424 }
425
426 while (num_aio_procs < target_aio_procs)
427 aio_newproc();
428 }
429
430 /*
431 * Free a job entry. Wait for completion if it is currently active, but don't
432 * delay forever. If we delay, we return a flag that says that we have to
433 * restart the queue scan.
434 */
435 static int
436 aio_free_entry(struct aiocblist *aiocbe)
437 {
438 struct kaioinfo *ki;
439 struct aio_liojob *lj;
440 struct proc *p;
441 int error;
442 int s;
443
444 if (aiocbe->jobstate == JOBST_NULL)
445 panic("aio_free_entry: freeing already free job");
446
447 p = aiocbe->userproc;
448 ki = p->p_aioinfo;
449 lj = aiocbe->lio;
450 if (ki == NULL)
451 panic("aio_free_entry: missing p->p_aioinfo");
452
453 while (aiocbe->jobstate == JOBST_JOBRUNNING) {
454 aiocbe->jobflags |= AIOCBLIST_RUNDOWN;
455 tsleep(aiocbe, PRIBIO, "jobwai", 0);
456 }
457 if (aiocbe->bp == NULL) {
458 if (ki->kaio_queue_count <= 0)
459 panic("aio_free_entry: process queue size <= 0");
460 if (num_queue_count <= 0)
461 panic("aio_free_entry: system wide queue size <= 0");
462
463 if (lj) {
464 lj->lioj_queue_count--;
465 if (aiocbe->jobflags & AIOCBLIST_DONE)
466 lj->lioj_queue_finished_count--;
467 }
468 ki->kaio_queue_count--;
469 if (aiocbe->jobflags & AIOCBLIST_DONE)
470 ki->kaio_queue_finished_count--;
471 num_queue_count--;
472 } else {
473 if (lj) {
474 lj->lioj_buffer_count--;
475 if (aiocbe->jobflags & AIOCBLIST_DONE)
476 lj->lioj_buffer_finished_count--;
477 }
478 if (aiocbe->jobflags & AIOCBLIST_DONE)
479 ki->kaio_buffer_finished_count--;
480 ki->kaio_buffer_count--;
481 num_buf_aio--;
482 }
483
484 /* aiocbe is going away, we need to destroy any knotes */
485 /* XXXKSE Note the thread here is used to eventually find the
486 * owning process again, but it is also used to do a fo_close
487 * and that requires the thread. (But does it require the
488 * OWNING thread, or maybe the running thread?)
489 * There is a semantic problem here...
490 */
491 knlist_delete(&aiocbe->klist, FIRST_THREAD_IN_PROC(p), 0); /* XXXKSE */
492
493 if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags & KAIO_RUNDOWN)
494 && ((ki->kaio_buffer_count == 0) && (ki->kaio_queue_count == 0)))) {
495 ki->kaio_flags &= ~KAIO_WAKEUP;
496 wakeup(p);
497 }
498
499 if (aiocbe->jobstate == JOBST_JOBQBUF) {
500 if ((error = aio_fphysio(aiocbe)) != 0)
501 return (error);
502 if (aiocbe->jobstate != JOBST_JOBBFINISHED)
503 panic("aio_free_entry: invalid physio finish-up state");
504 s = splbio();
505 TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist);
506 splx(s);
507 } else if (aiocbe->jobstate == JOBST_JOBQGLOBAL) {
508 s = splnet();
509 TAILQ_REMOVE(&aio_jobs, aiocbe, list);
510 TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
511 splx(s);
512 } else if (aiocbe->jobstate == JOBST_JOBFINISHED)
513 TAILQ_REMOVE(&ki->kaio_jobdone, aiocbe, plist);
514 else if (aiocbe->jobstate == JOBST_JOBBFINISHED) {
515 s = splbio();
516 TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist);
517 splx(s);
518 if (aiocbe->bp) {
519 vunmapbuf(aiocbe->bp);
520 relpbuf(aiocbe->bp, NULL);
521 aiocbe->bp = NULL;
522 }
523 }
524 if (lj && (lj->lioj_buffer_count == 0) && (lj->lioj_queue_count == 0)) {
525 TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
526 uma_zfree(aiolio_zone, lj);
527 }
528 aiocbe->jobstate = JOBST_NULL;
529 untimeout(process_signal, aiocbe, aiocbe->timeouthandle);
530 fdrop(aiocbe->fd_file, curthread);
531 crfree(aiocbe->cred);
532 uma_zfree(aiocb_zone, aiocbe);
533 return (0);
534 }
535
536 /*
537 * Rundown the jobs for a given process.
538 */
539 static void
540 aio_proc_rundown(void *arg, struct proc *p)
541 {
542 int s;
543 struct kaioinfo *ki;
544 struct aio_liojob *lj, *ljn;
545 struct aiocblist *aiocbe, *aiocbn;
546 struct file *fp;
547 struct socket *so;
548
549 ki = p->p_aioinfo;
550 if (ki == NULL)
551 return;
552
553 mtx_lock(&Giant);
554 ki->kaio_flags |= LIOJ_SIGNAL_POSTED;
555 while ((ki->kaio_active_count > 0) || (ki->kaio_buffer_count >
556 ki->kaio_buffer_finished_count)) {
557 ki->kaio_flags |= KAIO_RUNDOWN;
558 if (tsleep(p, PRIBIO, "kaiowt", aiod_timeout))
559 break;
560 }
561
562 /*
563 * Move any aio ops that are waiting on socket I/O to the normal job
564 * queues so they are cleaned up with any others.
565 */
566 s = splnet();
567 for (aiocbe = TAILQ_FIRST(&ki->kaio_sockqueue); aiocbe; aiocbe =
568 aiocbn) {
569 aiocbn = TAILQ_NEXT(aiocbe, plist);
570 fp = aiocbe->fd_file;
571 if (fp != NULL) {
572 so = fp->f_data;
573 TAILQ_REMOVE(&so->so_aiojobq, aiocbe, list);
574 if (TAILQ_EMPTY(&so->so_aiojobq)) {
575 SOCKBUF_LOCK(&so->so_snd);
576 so->so_snd.sb_flags &= ~SB_AIO;
577 SOCKBUF_UNLOCK(&so->so_snd);
578 SOCKBUF_LOCK(&so->so_rcv);
579 so->so_rcv.sb_flags &= ~SB_AIO;
580 SOCKBUF_UNLOCK(&so->so_rcv);
581 }
582 }
583 TAILQ_REMOVE(&ki->kaio_sockqueue, aiocbe, plist);
584 TAILQ_INSERT_HEAD(&aio_jobs, aiocbe, list);
585 TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, aiocbe, plist);
586 }
587 splx(s);
588
589 restart1:
590 for (aiocbe = TAILQ_FIRST(&ki->kaio_jobdone); aiocbe; aiocbe = aiocbn) {
591 aiocbn = TAILQ_NEXT(aiocbe, plist);
592 if (aio_free_entry(aiocbe))
593 goto restart1;
594 }
595
596 restart2:
597 for (aiocbe = TAILQ_FIRST(&ki->kaio_jobqueue); aiocbe; aiocbe =
598 aiocbn) {
599 aiocbn = TAILQ_NEXT(aiocbe, plist);
600 if (aio_free_entry(aiocbe))
601 goto restart2;
602 }
603
604 /*
605 * Note the use of many short splbio sections here, to avoid holding
606 * splbio across long chains of I/O. Probably unnecessary.
607 */
608 restart3:
609 s = splbio();
610 while (TAILQ_FIRST(&ki->kaio_bufqueue)) {
611 ki->kaio_flags |= KAIO_WAKEUP;
612 tsleep(p, PRIBIO, "aioprn", 0);
613 splx(s);
614 goto restart3;
615 }
616 splx(s);
617
618 restart4:
619 s = splbio();
620 for (aiocbe = TAILQ_FIRST(&ki->kaio_bufdone); aiocbe; aiocbe = aiocbn) {
621 aiocbn = TAILQ_NEXT(aiocbe, plist);
622 if (aio_free_entry(aiocbe)) {
623 splx(s);
624 goto restart4;
625 }
626 }
627 splx(s);
628
629 /*
630 * If we've slept, jobs might have moved from one queue to another.
631 * Retry rundown if we didn't manage to empty the queues.
632 */
633 if (TAILQ_FIRST(&ki->kaio_jobdone) != NULL ||
634 TAILQ_FIRST(&ki->kaio_jobqueue) != NULL ||
635 TAILQ_FIRST(&ki->kaio_bufqueue) != NULL ||
636 TAILQ_FIRST(&ki->kaio_bufdone) != NULL)
637 goto restart1;
638
639 for (lj = TAILQ_FIRST(&ki->kaio_liojoblist); lj; lj = ljn) {
640 ljn = TAILQ_NEXT(lj, lioj_list);
641 if ((lj->lioj_buffer_count == 0) && (lj->lioj_queue_count ==
642 0)) {
643 TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
644 uma_zfree(aiolio_zone, lj);
645 } else {
646 #ifdef DIAGNOSTIC
647 printf("LIO job not cleaned up: B:%d, BF:%d, Q:%d, "
648 "QF:%d\n", lj->lioj_buffer_count,
649 lj->lioj_buffer_finished_count,
650 lj->lioj_queue_count,
651 lj->lioj_queue_finished_count);
652 #endif
653 }
654 }
655
656 uma_zfree(kaio_zone, ki);
657 p->p_aioinfo = NULL;
658 mtx_unlock(&Giant);
659 }
660
661 /*
662 * Select a job to run (called by an AIO daemon).
663 */
664 static struct aiocblist *
665 aio_selectjob(struct aiothreadlist *aiop)
666 {
667 int s;
668 struct aiocblist *aiocbe;
669 struct kaioinfo *ki;
670 struct proc *userp;
671
672 s = splnet();
673 for (aiocbe = TAILQ_FIRST(&aio_jobs); aiocbe; aiocbe =
674 TAILQ_NEXT(aiocbe, list)) {
675 userp = aiocbe->userproc;
676 ki = userp->p_aioinfo;
677
678 if (ki->kaio_active_count < ki->kaio_maxactive_count) {
679 TAILQ_REMOVE(&aio_jobs, aiocbe, list);
680 splx(s);
681 return (aiocbe);
682 }
683 }
684 splx(s);
685
686 return (NULL);
687 }
688
689 /*
690 * The AIO processing activity. This is the code that does the I/O request for
691 * the non-physio version of the operations. The normal vn operations are used,
692 * and this code should work in all instances for every type of file, including
693 * pipes, sockets, fifos, and regular files.
694 */
695 static void
696 aio_process(struct aiocblist *aiocbe)
697 {
698 struct ucred *td_savedcred;
699 struct thread *td;
700 struct proc *mycp;
701 struct aiocb *cb;
702 struct file *fp;
703 struct uio auio;
704 struct iovec aiov;
705 int cnt;
706 int error;
707 int oublock_st, oublock_end;
708 int inblock_st, inblock_end;
709
710 td = curthread;
711 td_savedcred = td->td_ucred;
712 td->td_ucred = aiocbe->cred;
713 mycp = td->td_proc;
714 cb = &aiocbe->uaiocb;
715 fp = aiocbe->fd_file;
716
717 aiov.iov_base = (void *)(uintptr_t)cb->aio_buf;
718 aiov.iov_len = cb->aio_nbytes;
719
720 auio.uio_iov = &aiov;
721 auio.uio_iovcnt = 1;
722 auio.uio_offset = cb->aio_offset;
723 auio.uio_resid = cb->aio_nbytes;
724 cnt = cb->aio_nbytes;
725 auio.uio_segflg = UIO_USERSPACE;
726 auio.uio_td = td;
727
728 inblock_st = mycp->p_stats->p_ru.ru_inblock;
729 oublock_st = mycp->p_stats->p_ru.ru_oublock;
730 /*
731 * _aio_aqueue() acquires a reference to the file that is
732 * released in aio_free_entry().
733 */
734 if (cb->aio_lio_opcode == LIO_READ) {
735 auio.uio_rw = UIO_READ;
736 error = fo_read(fp, &auio, fp->f_cred, FOF_OFFSET, td);
737 } else {
738 auio.uio_rw = UIO_WRITE;
739 error = fo_write(fp, &auio, fp->f_cred, FOF_OFFSET, td);
740 }
741 inblock_end = mycp->p_stats->p_ru.ru_inblock;
742 oublock_end = mycp->p_stats->p_ru.ru_oublock;
743
744 aiocbe->inputcharge = inblock_end - inblock_st;
745 aiocbe->outputcharge = oublock_end - oublock_st;
746
747 if ((error) && (auio.uio_resid != cnt)) {
748 if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
749 error = 0;
750 if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
751 PROC_LOCK(aiocbe->userproc);
752 psignal(aiocbe->userproc, SIGPIPE);
753 PROC_UNLOCK(aiocbe->userproc);
754 }
755 }
756
757 cnt -= auio.uio_resid;
758 cb->_aiocb_private.error = error;
759 cb->_aiocb_private.status = cnt;
760 td->td_ucred = td_savedcred;
761 }
762
763 /*
764 * The AIO daemon, most of the actual work is done in aio_process,
765 * but the setup (and address space mgmt) is done in this routine.
766 */
767 static void
768 aio_daemon(void *uproc)
769 {
770 int s;
771 struct aio_liojob *lj;
772 struct aiocb *cb;
773 struct aiocblist *aiocbe;
774 struct aiothreadlist *aiop;
775 struct kaioinfo *ki;
776 struct proc *curcp, *mycp, *userp;
777 struct vmspace *myvm, *tmpvm;
778 struct thread *td = curthread;
779 struct pgrp *newpgrp;
780 struct session *newsess;
781
782 mtx_lock(&Giant);
783 /*
784 * Local copies of curproc (cp) and vmspace (myvm)
785 */
786 mycp = td->td_proc;
787 myvm = mycp->p_vmspace;
788
789 KASSERT(mycp->p_textvp == NULL, ("kthread has a textvp"));
790
791 /*
792 * Allocate and ready the aio control info. There is one aiop structure
793 * per daemon.
794 */
795 aiop = uma_zalloc(aiop_zone, M_WAITOK);
796 aiop->aiothread = td;
797 aiop->aiothreadflags |= AIOP_FREE;
798
799 s = splnet();
800
801 /*
802 * Place thread (lightweight process) onto the AIO free thread list.
803 */
804 if (TAILQ_EMPTY(&aio_freeproc))
805 wakeup(&aio_freeproc);
806 TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
807
808 splx(s);
809
810 /*
811 * Get rid of our current file descriptors. AIODs don't need any
812 * file descriptors, except as temporarily inherited from the client.
813 */
814 fdfree(td);
815
816 mtx_unlock(&Giant);
817 /* The daemon resides in its own pgrp. */
818 MALLOC(newpgrp, struct pgrp *, sizeof(struct pgrp), M_PGRP,
819 M_WAITOK | M_ZERO);
820 MALLOC(newsess, struct session *, sizeof(struct session), M_SESSION,
821 M_WAITOK | M_ZERO);
822
823 sx_xlock(&proctree_lock);
824 enterpgrp(mycp, mycp->p_pid, newpgrp, newsess);
825 sx_xunlock(&proctree_lock);
826 mtx_lock(&Giant);
827
828 /*
829 * Wakeup parent process. (Parent sleeps to keep from blasting away
830 * and creating too many daemons.)
831 */
832 wakeup(mycp);
833
834 for (;;) {
835 /*
836 * curcp is the current daemon process context.
837 * userp is the current user process context.
838 */
839 curcp = mycp;
840
841 /*
842 * Take daemon off of free queue
843 */
844 if (aiop->aiothreadflags & AIOP_FREE) {
845 s = splnet();
846 TAILQ_REMOVE(&aio_freeproc, aiop, list);
847 TAILQ_INSERT_TAIL(&aio_activeproc, aiop, list);
848 aiop->aiothreadflags &= ~AIOP_FREE;
849 splx(s);
850 }
851 aiop->aiothreadflags &= ~AIOP_SCHED;
852
853 /*
854 * Check for jobs.
855 */
856 while ((aiocbe = aio_selectjob(aiop)) != NULL) {
857 cb = &aiocbe->uaiocb;
858 userp = aiocbe->userproc;
859
860 aiocbe->jobstate = JOBST_JOBRUNNING;
861
862 /*
863 * Connect to process address space for user program.
864 */
865 if (userp != curcp) {
866 /*
867 * Save the current address space that we are
868 * connected to.
869 */
870 tmpvm = mycp->p_vmspace;
871
872 /*
873 * Point to the new user address space, and
874 * refer to it.
875 */
876 mycp->p_vmspace = userp->p_vmspace;
877 atomic_add_int(&mycp->p_vmspace->vm_refcnt, 1);
878
879 /* Activate the new mapping. */
880 pmap_activate(FIRST_THREAD_IN_PROC(mycp));
881
882 /*
883 * If the old address space wasn't the daemon's
884 * own address space, then we need to remove the
885 * daemon's reference from the other process
886 * that it was acting on behalf of.
887 */
888 if (tmpvm != myvm) {
889 vmspace_free(tmpvm);
890 }
891 curcp = userp;
892 }
893
894 ki = userp->p_aioinfo;
895 lj = aiocbe->lio;
896
897 /* Account for currently active jobs. */
898 ki->kaio_active_count++;
899
900 /* Do the I/O function. */
901 aio_process(aiocbe);
902
903 /* Decrement the active job count. */
904 ki->kaio_active_count--;
905
906 /*
907 * Increment the completion count for wakeup/signal
908 * comparisons.
909 */
910 aiocbe->jobflags |= AIOCBLIST_DONE;
911 ki->kaio_queue_finished_count++;
912 if (lj)
913 lj->lioj_queue_finished_count++;
914 if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags
915 & KAIO_RUNDOWN) && (ki->kaio_active_count == 0))) {
916 ki->kaio_flags &= ~KAIO_WAKEUP;
917 wakeup(userp);
918 }
919
920 s = splbio();
921 if (lj && (lj->lioj_flags &
922 (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) == LIOJ_SIGNAL) {
923 if ((lj->lioj_queue_finished_count ==
924 lj->lioj_queue_count) &&
925 (lj->lioj_buffer_finished_count ==
926 lj->lioj_buffer_count)) {
927 PROC_LOCK(userp);
928 psignal(userp,
929 lj->lioj_signal.sigev_signo);
930 PROC_UNLOCK(userp);
931 lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
932 }
933 }
934 splx(s);
935
936 aiocbe->jobstate = JOBST_JOBFINISHED;
937
938 s = splnet();
939 TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
940 TAILQ_INSERT_TAIL(&ki->kaio_jobdone, aiocbe, plist);
941 splx(s);
942 KNOTE_UNLOCKED(&aiocbe->klist, 0);
943
944 if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) {
945 wakeup(aiocbe);
946 aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
947 }
948
949 if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
950 PROC_LOCK(userp);
951 psignal(userp, cb->aio_sigevent.sigev_signo);
952 PROC_UNLOCK(userp);
953 }
954 }
955
956 /*
957 * Disconnect from user address space.
958 */
959 if (curcp != mycp) {
960 /* Get the user address space to disconnect from. */
961 tmpvm = mycp->p_vmspace;
962
963 /* Get original address space for daemon. */
964 mycp->p_vmspace = myvm;
965
966 /* Activate the daemon's address space. */
967 pmap_activate(FIRST_THREAD_IN_PROC(mycp));
968 #ifdef DIAGNOSTIC
969 if (tmpvm == myvm) {
970 printf("AIOD: vmspace problem -- %d\n",
971 mycp->p_pid);
972 }
973 #endif
974 /* Remove our vmspace reference. */
975 vmspace_free(tmpvm);
976
977 curcp = mycp;
978 }
979
980 /*
981 * If we are the first to be put onto the free queue, wakeup
982 * anyone waiting for a daemon.
983 */
984 s = splnet();
985 TAILQ_REMOVE(&aio_activeproc, aiop, list);
986 if (TAILQ_EMPTY(&aio_freeproc))
987 wakeup(&aio_freeproc);
988 TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
989 aiop->aiothreadflags |= AIOP_FREE;
990 splx(s);
991
992 /*
993 * If daemon is inactive for a long time, allow it to exit,
994 * thereby freeing resources.
995 */
996 if ((aiop->aiothreadflags & AIOP_SCHED) == 0 &&
997 tsleep(aiop->aiothread, PRIBIO, "aiordy", aiod_lifetime)) {
998 s = splnet();
999 if (TAILQ_EMPTY(&aio_jobs)) {
1000 if ((aiop->aiothreadflags & AIOP_FREE) &&
1001 (num_aio_procs > target_aio_procs)) {
1002 TAILQ_REMOVE(&aio_freeproc, aiop, list);
1003 splx(s);
1004 uma_zfree(aiop_zone, aiop);
1005 num_aio_procs--;
1006 #ifdef DIAGNOSTIC
1007 if (mycp->p_vmspace->vm_refcnt <= 1) {
1008 printf("AIOD: bad vm refcnt for"
1009 " exiting daemon: %d\n",
1010 mycp->p_vmspace->vm_refcnt);
1011 }
1012 #endif
1013 kthread_exit(0);
1014 }
1015 }
1016 splx(s);
1017 }
1018 }
1019 }
1020
1021 /*
1022 * Create a new AIO daemon. This is mostly a kernel-thread fork routine. The
1023 * AIO daemon modifies its environment itself.
1024 */
1025 static int
1026 aio_newproc(void)
1027 {
1028 int error;
1029 struct proc *p;
1030
1031 error = kthread_create(aio_daemon, curproc, &p, RFNOWAIT, 0, "aiod%d",
1032 num_aio_procs);
1033 if (error)
1034 return (error);
1035
1036 /*
1037 * Wait until daemon is started, but continue on just in case to
1038 * handle error conditions.
1039 */
1040 error = tsleep(p, PZERO, "aiosta", aiod_timeout);
1041
1042 num_aio_procs++;
1043
1044 return (error);
1045 }
1046
1047 /*
1048 * Try the high-performance, low-overhead physio method for eligible
1049 * VCHR devices. This method doesn't use an aio helper thread, and
1050 * thus has very low overhead.
1051 *
1052 * Assumes that the caller, _aio_aqueue(), has incremented the file
1053 * structure's reference count, preventing its deallocation for the
1054 * duration of this call.
1055 */
1056 static int
1057 aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
1058 {
1059 int error;
1060 struct aiocb *cb;
1061 struct file *fp;
1062 struct buf *bp;
1063 struct vnode *vp;
1064 struct kaioinfo *ki;
1065 struct aio_liojob *lj;
1066 int s;
1067 int notify;
1068
1069 cb = &aiocbe->uaiocb;
1070 fp = aiocbe->fd_file;
1071
1072 if (fp->f_type != DTYPE_VNODE)
1073 return (-1);
1074
1075 vp = fp->f_vnode;
1076
1077 /*
1078 * If it's not a disk, we don't want to return a positive error.
1079 * It causes the aio code to not fall through to try the thread
1080 * way when you're talking to a regular file.
1081 */
1082 if (!vn_isdisk(vp, &error)) {
1083 if (error == ENOTBLK)
1084 return (-1);
1085 else
1086 return (error);
1087 }
1088
1089 if (cb->aio_nbytes % vp->v_rdev->si_bsize_phys)
1090 return (-1);
1091
1092 if (cb->aio_nbytes >
1093 MAXPHYS - (((vm_offset_t) cb->aio_buf) & PAGE_MASK))
1094 return (-1);
1095
1096 ki = p->p_aioinfo;
1097 if (ki->kaio_buffer_count >= ki->kaio_ballowed_count)
1098 return (-1);
1099
1100 ki->kaio_buffer_count++;
1101
1102 lj = aiocbe->lio;
1103 if (lj)
1104 lj->lioj_buffer_count++;
1105
1106 /* Create and build a buffer header for a transfer. */
1107 bp = (struct buf *)getpbuf(NULL);
1108 BUF_KERNPROC(bp);
1109
1110 /*
1111 * Get a copy of the kva from the physical buffer.
1112 */
1113 bp->b_dev = vp->v_rdev;
1114 error = 0;
1115
1116 bp->b_bcount = cb->aio_nbytes;
1117 bp->b_bufsize = cb->aio_nbytes;
1118 bp->b_iodone = aio_physwakeup;
1119 bp->b_saveaddr = bp->b_data;
1120 bp->b_data = (void *)(uintptr_t)cb->aio_buf;
1121 bp->b_offset = cb->aio_offset;
1122 bp->b_iooffset = cb->aio_offset;
1123 bp->b_blkno = btodb(cb->aio_offset);
1124 bp->b_iocmd = cb->aio_lio_opcode == LIO_WRITE ? BIO_WRITE : BIO_READ;
1125
1126 /*
1127 * Bring buffer into kernel space.
1128 */
1129 if (vmapbuf(bp) < 0) {
1130 error = EFAULT;
1131 goto doerror;
1132 }
1133
1134 s = splbio();
1135 aiocbe->bp = bp;
1136 bp->b_caller1 = (void *)aiocbe;
1137 TAILQ_INSERT_TAIL(&aio_bufjobs, aiocbe, list);
1138 TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist);
1139 aiocbe->jobstate = JOBST_JOBQBUF;
1140 cb->_aiocb_private.status = cb->aio_nbytes;
1141 num_buf_aio++;
1142 bp->b_error = 0;
1143
1144 splx(s);
1145
1146 /* Perform transfer. */
1147 DEV_STRATEGY(bp);
1148
1149 notify = 0;
1150 s = splbio();
1151
1152 /*
1153 * If we had an error invoking the request, or an error in processing
1154 * the request before we have returned, we process it as an error in
1155 * transfer. Note that such an I/O error is not indicated immediately,
1156 * but is returned using the aio_error mechanism. In this case,
1157 * aio_suspend will return immediately.
1158 */
1159 if (bp->b_error || (bp->b_ioflags & BIO_ERROR)) {
1160 struct aiocb *job = aiocbe->uuaiocb;
1161
1162 aiocbe->uaiocb._aiocb_private.status = 0;
1163 suword(&job->_aiocb_private.status, 0);
1164 aiocbe->uaiocb._aiocb_private.error = bp->b_error;
1165 suword(&job->_aiocb_private.error, bp->b_error);
1166
1167 ki->kaio_buffer_finished_count++;
1168
1169 if (aiocbe->jobstate != JOBST_JOBBFINISHED) {
1170 aiocbe->jobstate = JOBST_JOBBFINISHED;
1171 aiocbe->jobflags |= AIOCBLIST_DONE;
1172 TAILQ_REMOVE(&aio_bufjobs, aiocbe, list);
1173 TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
1174 TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
1175 notify = 1;
1176 }
1177 }
1178 splx(s);
1179 if (notify)
1180 KNOTE_UNLOCKED(&aiocbe->klist, 0);
1181 return (0);
1182
1183 doerror:
1184 ki->kaio_buffer_count--;
1185 if (lj)
1186 lj->lioj_buffer_count--;
1187 aiocbe->bp = NULL;
1188 relpbuf(bp, NULL);
1189 return (error);
1190 }
1191
1192 /*
1193 * This waits/tests physio completion.
1194 */
1195 static int
1196 aio_fphysio(struct aiocblist *iocb)
1197 {
1198 int s;
1199 struct buf *bp;
1200 int error;
1201
1202 bp = iocb->bp;
1203
1204 s = splbio();
1205 while ((bp->b_flags & B_DONE) == 0) {
1206 if (tsleep(bp, PRIBIO, "physstr", aiod_timeout)) {
1207 if ((bp->b_flags & B_DONE) == 0) {
1208 splx(s);
1209 return (EINPROGRESS);
1210 } else
1211 break;
1212 }
1213 }
1214 splx(s);
1215
1216 /* Release mapping into kernel space. */
1217 vunmapbuf(bp);
1218 iocb->bp = 0;
1219
1220 error = 0;
1221
1222 /* Check for an error. */
1223 if (bp->b_ioflags & BIO_ERROR)
1224 error = bp->b_error;
1225
1226 relpbuf(bp, NULL);
1227 return (error);
1228 }
1229
1230 /*
1231 * Wake up aio requests that may be serviceable now.
1232 */
1233 static void
1234 aio_swake_cb(struct socket *so, struct sockbuf *sb)
1235 {
1236 struct aiocblist *cb,*cbn;
1237 struct proc *p;
1238 struct kaioinfo *ki = NULL;
1239 int opcode, wakecount = 0;
1240 struct aiothreadlist *aiop;
1241
1242 if (sb == &so->so_snd) {
1243 opcode = LIO_WRITE;
1244 SOCKBUF_LOCK(&so->so_snd);
1245 so->so_snd.sb_flags &= ~SB_AIO;
1246 SOCKBUF_UNLOCK(&so->so_snd);
1247 } else {
1248 opcode = LIO_READ;
1249 SOCKBUF_LOCK(&so->so_rcv);
1250 so->so_rcv.sb_flags &= ~SB_AIO;
1251 SOCKBUF_UNLOCK(&so->so_rcv);
1252 }
1253
1254 for (cb = TAILQ_FIRST(&so->so_aiojobq); cb; cb = cbn) {
1255 cbn = TAILQ_NEXT(cb, list);
1256 if (opcode == cb->uaiocb.aio_lio_opcode) {
1257 p = cb->userproc;
1258 ki = p->p_aioinfo;
1259 TAILQ_REMOVE(&so->so_aiojobq, cb, list);
1260 TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist);
1261 TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
1262 TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist);
1263 wakecount++;
1264 if (cb->jobstate != JOBST_JOBQGLOBAL)
1265 panic("invalid queue value");
1266 }
1267 }
1268
1269 while (wakecount--) {
1270 if ((aiop = TAILQ_FIRST(&aio_freeproc)) != 0) {
1271 TAILQ_REMOVE(&aio_freeproc, aiop, list);
1272 TAILQ_INSERT_TAIL(&aio_activeproc, aiop, list);
1273 aiop->aiothreadflags &= ~AIOP_FREE;
1274 wakeup(aiop->aiothread);
1275 }
1276 }
1277 }
1278
1279 /*
1280 * Queue a new AIO request. The choice between the threaded and the
1281 * direct physio (VCHR) technique is made in this code.
1282 */
1283 static int
1284 _aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj, int type)
1285 {
1286 struct proc *p = td->td_proc;
1287 struct filedesc *fdp;
1288 struct file *fp;
1289 unsigned int fd;
1290 struct socket *so;
1291 int s;
1292 int error;
1293 int opcode, user_opcode;
1294 struct aiocblist *aiocbe;
1295 struct aiothreadlist *aiop;
1296 struct kaioinfo *ki;
1297 struct kevent kev;
1298 struct kqueue *kq;
1299 struct file *kq_fp;
1300
1301 aiocbe = uma_zalloc(aiocb_zone, M_WAITOK);
1302 aiocbe->inputcharge = 0;
1303 aiocbe->outputcharge = 0;
1304 callout_handle_init(&aiocbe->timeouthandle);
1305 /* XXX - need a lock */
1306 knlist_init(&aiocbe->klist, NULL);
1307
1308 suword(&job->_aiocb_private.status, -1);
1309 suword(&job->_aiocb_private.error, 0);
1310 suword(&job->_aiocb_private.kernelinfo, -1);
1311
1312 error = copyin(job, &aiocbe->uaiocb, sizeof(aiocbe->uaiocb));
1313 if (error) {
1314 suword(&job->_aiocb_private.error, error);
1315 uma_zfree(aiocb_zone, aiocbe);
1316 return (error);
1317 }
1318 if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL &&
1319 !_SIG_VALID(aiocbe->uaiocb.aio_sigevent.sigev_signo)) {
1320 uma_zfree(aiocb_zone, aiocbe);
1321 return (EINVAL);
1322 }
1323
1324 /* Save userspace address of the job info. */
1325 aiocbe->uuaiocb = job;
1326
1327 /* Get the opcode. */
1328 user_opcode = aiocbe->uaiocb.aio_lio_opcode;
1329 if (type != LIO_NOP)
1330 aiocbe->uaiocb.aio_lio_opcode = type;
1331 opcode = aiocbe->uaiocb.aio_lio_opcode;
1332
1333 /* Get the fd info for process. */
1334 fdp = p->p_fd;
1335
1336 /*
1337 * Range check file descriptor.
1338 */
1339 FILEDESC_LOCK(fdp);
1340 fd = aiocbe->uaiocb.aio_fildes;
1341 if (fd >= fdp->fd_nfiles) {
1342 FILEDESC_UNLOCK(fdp);
1343 uma_zfree(aiocb_zone, aiocbe);
1344 if (type == 0)
1345 suword(&job->_aiocb_private.error, EBADF);
1346 return (EBADF);
1347 }
1348
1349 fp = aiocbe->fd_file = fdp->fd_ofiles[fd];
1350 if ((fp == NULL) ||
1351 ((opcode == LIO_WRITE) && ((fp->f_flag & FWRITE) == 0)) ||
1352 ((opcode == LIO_READ) && ((fp->f_flag & FREAD) == 0))) {
1353 FILEDESC_UNLOCK(fdp);
1354 uma_zfree(aiocb_zone, aiocbe);
1355 if (type == 0)
1356 suword(&job->_aiocb_private.error, EBADF);
1357 return (EBADF);
1358 }
1359 fhold(fp);
1360 FILEDESC_UNLOCK(fdp);
1361
1362 if (aiocbe->uaiocb.aio_offset == -1LL) {
1363 error = EINVAL;
1364 goto aqueue_fail;
1365 }
1366 error = suword(&job->_aiocb_private.kernelinfo, jobrefid);
1367 if (error) {
1368 error = EINVAL;
1369 goto aqueue_fail;
1370 }
1371 aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jobrefid;
1372 if (jobrefid == LONG_MAX)
1373 jobrefid = 1;
1374 else
1375 jobrefid++;
1376
1377 if (opcode == LIO_NOP) {
1378 fdrop(fp, td);
1379 uma_zfree(aiocb_zone, aiocbe);
1380 if (type == 0) {
1381 suword(&job->_aiocb_private.error, 0);
1382 suword(&job->_aiocb_private.status, 0);
1383 suword(&job->_aiocb_private.kernelinfo, 0);
1384 }
1385 return (0);
1386 }
1387 if ((opcode != LIO_READ) && (opcode != LIO_WRITE)) {
1388 if (type == 0)
1389 suword(&job->_aiocb_private.status, 0);
1390 error = EINVAL;
1391 goto aqueue_fail;
1392 }
1393
1394 if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
1395 kev.ident = aiocbe->uaiocb.aio_sigevent.sigev_notify_kqueue;
1396 kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sigval_ptr;
1397 }
1398 else {
1399 /*
1400 * This method for requesting kevent-based notification won't
1401 * work on the alpha, since we're passing in a pointer
1402 * via aio_lio_opcode, which is an int. Use the SIGEV_KEVENT-
1403 * based method instead.
1404 */
1405 if (user_opcode == LIO_NOP || user_opcode == LIO_READ ||
1406 user_opcode == LIO_WRITE)
1407 goto no_kqueue;
1408
1409 error = copyin((struct kevent *)(uintptr_t)user_opcode,
1410 &kev, sizeof(kev));
1411 if (error)
1412 goto aqueue_fail;
1413 }
1414 if ((u_int)kev.ident >= fdp->fd_nfiles ||
1415 (kq_fp = fdp->fd_ofiles[kev.ident]) == NULL ||
1416 (kq_fp->f_type != DTYPE_KQUEUE)) {
1417 error = EBADF;
1418 goto aqueue_fail;
1419 }
1420 kq = kq_fp->f_data;
1421 kev.ident = (uintptr_t)aiocbe->uuaiocb;
1422 kev.filter = EVFILT_AIO;
1423 kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
1424 kev.data = (intptr_t)aiocbe;
1425 error = kqueue_register(kq, &kev, td, 1);
1426 aqueue_fail:
1427 if (error) {
1428 fdrop(fp, td);
1429 uma_zfree(aiocb_zone, aiocbe);
1430 if (type == 0)
1431 suword(&job->_aiocb_private.error, error);
1432 goto done;
1433 }
1434 no_kqueue:
1435
1436 suword(&job->_aiocb_private.error, EINPROGRESS);
1437 aiocbe->uaiocb._aiocb_private.error = EINPROGRESS;
1438 aiocbe->userproc = p;
1439 aiocbe->cred = crhold(td->td_ucred);
1440 aiocbe->jobflags = 0;
1441 aiocbe->lio = lj;
1442 ki = p->p_aioinfo;
1443
1444 if (fp->f_type == DTYPE_SOCKET) {
1445 /*
1446 * Alternate queueing for socket ops: Reach down into the
1447 * descriptor to get the socket data. Then check to see if the
1448 * socket is ready to be read or written (based on the requested
1449 * operation).
1450 *
1451 * If it is not ready for io, then queue the aiocbe on the
1452 * socket, and set the flags so we get a call when sbnotify()
1453 * happens.
1454 */
1455 so = fp->f_data;
1456 s = splnet();
1457 if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
1458 LIO_WRITE) && (!sowriteable(so)))) {
1459 TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
1460 TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
1461 if (opcode == LIO_READ) {
1462 SOCKBUF_LOCK(&so->so_rcv);
1463 so->so_rcv.sb_flags |= SB_AIO;
1464 SOCKBUF_UNLOCK(&so->so_rcv);
1465 } else {
1466 SOCKBUF_LOCK(&so->so_snd);
1467 so->so_snd.sb_flags |= SB_AIO;
1468 SOCKBUF_UNLOCK(&so->so_snd);
1469 }
1470 aiocbe->jobstate = JOBST_JOBQGLOBAL; /* XXX */
1471 ki->kaio_queue_count++;
1472 num_queue_count++;
1473 splx(s);
1474 error = 0;
1475 goto done;
1476 }
1477 splx(s);
1478 }
1479
1480 if ((error = aio_qphysio(p, aiocbe)) == 0)
1481 goto done;
1482 if (error > 0) {
1483 suword(&job->_aiocb_private.status, 0);
1484 aiocbe->uaiocb._aiocb_private.error = error;
1485 suword(&job->_aiocb_private.error, error);
1486 goto done;
1487 }
1488
1489 /* No buffer for daemon I/O. */
1490 aiocbe->bp = NULL;
1491
1492 ki->kaio_queue_count++;
1493 if (lj)
1494 lj->lioj_queue_count++;
1495 s = splnet();
1496 TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
1497 TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list);
1498 splx(s);
1499 aiocbe->jobstate = JOBST_JOBQGLOBAL;
1500
1501 num_queue_count++;
1502 error = 0;
1503
1504 /*
1505 * If we don't have a free AIO process, and we are below our quota, then
1506 * start one. Otherwise, depend on the subsequent I/O completions to
1507 * pick up this job. If we don't successfully create the new process
1508 * (thread) due to resource issues, we return an error for now (EAGAIN),
1509 * which is likely not the correct thing to do.
1510 */
1511 s = splnet();
1512 retryproc:
1513 if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
1514 TAILQ_REMOVE(&aio_freeproc, aiop, list);
1515 TAILQ_INSERT_TAIL(&aio_activeproc, aiop, list);
1516 aiop->aiothreadflags &= ~AIOP_FREE;
1517 wakeup(aiop->aiothread);
1518 } else if (((num_aio_resv_start + num_aio_procs) < max_aio_procs) &&
1519 ((ki->kaio_active_count + num_aio_resv_start) <
1520 ki->kaio_maxactive_count)) {
1521 num_aio_resv_start++;
1522 if ((error = aio_newproc()) == 0) {
1523 num_aio_resv_start--;
1524 goto retryproc;
1525 }
1526 num_aio_resv_start--;
1527 }
1528 splx(s);
1529 done:
1530 return (error);
1531 }
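
/*
 * A userland sketch of the SIGEV_KEVENT notification path handled above
 * (illustrative only): the submitter names an existing kqueue in the
 * sigevent, and completion is later delivered as an EVFILT_AIO event
 * whose ident is the userspace aiocb pointer and whose udata is the
 * supplied sigval_ptr.
 *
 *	#include <sys/types.h>
 *	#include <sys/event.h>
 *	#include <aio.h>
 *	#include <err.h>
 *	#include <fcntl.h>
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	static char buf[512];
 *
 *	ssize_t
 *	kevent_aio_read(const char *path)
 *	{
 *		struct aiocb cb;
 *		struct kevent ev;
 *		int fd, kq;
 *
 *		if ((fd = open(path, O_RDONLY)) == -1)
 *			err(1, "open");
 *		if ((kq = kqueue()) == -1)
 *			err(1, "kqueue");
 *		memset(&cb, 0, sizeof(cb));
 *		cb.aio_fildes = fd;
 *		cb.aio_buf = buf;
 *		cb.aio_nbytes = sizeof(buf);
 *		cb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
 *		cb.aio_sigevent.sigev_notify_kqueue = kq;
 *		cb.aio_sigevent.sigev_value.sigval_ptr = &cb;
 *		if (aio_read(&cb) == -1)
 *			err(1, "aio_read");
 *		if (kevent(kq, NULL, 0, &ev, 1, NULL) == -1)
 *			err(1, "kevent");
 *		return (aio_return((struct aiocb *)ev.ident));
 *	}
 */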
1532
1533 /*
1534 * This routine queues an AIO request, checking for quotas.
1535 */
1536 static int
1537 aio_aqueue(struct thread *td, struct aiocb *job, int type)
1538 {
1539 struct proc *p = td->td_proc;
1540 struct kaioinfo *ki;
1541
1542 if (p->p_aioinfo == NULL)
1543 aio_init_aioinfo(p);
1544
1545 if (num_queue_count >= max_queue_count)
1546 return (EAGAIN);
1547
1548 ki = p->p_aioinfo;
1549 if (ki->kaio_queue_count >= ki->kaio_qallowed_count)
1550 return (EAGAIN);
1551
1552 return _aio_aqueue(td, job, NULL, type);
1553 }
1554
1555 /*
1556 * Support the aio_return system call; as a side effect, kernel resources are
1557 * released.
1558 */
1559 int
1560 aio_return(struct thread *td, struct aio_return_args *uap)
1561 {
1562 struct proc *p = td->td_proc;
1563 int s;
1564 long jobref;
1565 struct aiocblist *cb, *ncb;
1566 struct aiocb *ujob;
1567 struct kaioinfo *ki;
1568
1569 ujob = uap->aiocbp;
1570 jobref = fuword(&ujob->_aiocb_private.kernelinfo);
1571 if (jobref == -1 || jobref == 0)
1572 return (EINVAL);
1573
1574 ki = p->p_aioinfo;
1575 if (ki == NULL)
1576 return (EINVAL);
1577 TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
1578 if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo) ==
1579 jobref) {
1580 if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
1581 p->p_stats->p_ru.ru_oublock +=
1582 cb->outputcharge;
1583 cb->outputcharge = 0;
1584 } else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
1585 p->p_stats->p_ru.ru_inblock += cb->inputcharge;
1586 cb->inputcharge = 0;
1587 }
1588 goto done;
1589 }
1590 }
1591 s = splbio();
1592 for (cb = TAILQ_FIRST(&ki->kaio_bufdone); cb; cb = ncb) {
1593 ncb = TAILQ_NEXT(cb, plist);
1594 if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo)
1595 == jobref) {
1596 break;
1597 }
1598 }
1599 splx(s);
1600 done:
1601 if (cb != NULL) {
1602 if (ujob == cb->uuaiocb) {
1603 td->td_retval[0] =
1604 cb->uaiocb._aiocb_private.status;
1605 } else
1606 td->td_retval[0] = EFAULT;
1607 aio_free_entry(cb);
1608 return (0);
1609 }
1610 return (EINVAL);
1611 }
1612
1613 /*
1614 * Allow a process to wakeup when any of the I/O requests are completed.
1615 */
1616 int
1617 aio_suspend(struct thread *td, struct aio_suspend_args *uap)
1618 {
1619 struct proc *p = td->td_proc;
1620 struct timeval atv;
1621 struct timespec ts;
1622 struct aiocb *const *cbptr, *cbp;
1623 struct kaioinfo *ki;
1624 struct aiocblist *cb;
1625 int i;
1626 int njoblist;
1627 int error, s, timo;
1628 long *ijoblist;
1629 struct aiocb **ujoblist;
1630
1631 if (uap->nent < 0 || uap->nent > AIO_LISTIO_MAX)
1632 return (EINVAL);
1633
1634 timo = 0;
1635 if (uap->timeout) {
1636 /* Get timespec struct. */
1637 if ((error = copyin(uap->timeout, &ts, sizeof(ts))) != 0)
1638 return (error);
1639
1640 if (ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000)
1641 return (EINVAL);
1642
1643 TIMESPEC_TO_TIMEVAL(&atv, &ts);
1644 if (itimerfix(&atv))
1645 return (EINVAL);
1646 timo = tvtohz(&atv);
1647 }
1648
1649 ki = p->p_aioinfo;
1650 if (ki == NULL)
1651 return (EAGAIN);
1652
1653 njoblist = 0;
1654 ijoblist = uma_zalloc(aiol_zone, M_WAITOK);
1655 ujoblist = uma_zalloc(aiol_zone, M_WAITOK);
1656 cbptr = uap->aiocbp;
1657
1658 for (i = 0; i < uap->nent; i++) {
1659 cbp = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
1660 if (cbp == 0)
1661 continue;
1662 ujoblist[njoblist] = cbp;
1663 ijoblist[njoblist] = fuword(&cbp->_aiocb_private.kernelinfo);
1664 njoblist++;
1665 }
1666
1667 if (njoblist == 0) {
1668 uma_zfree(aiol_zone, ijoblist);
1669 uma_zfree(aiol_zone, ujoblist);
1670 return (0);
1671 }
1672
1673 error = 0;
1674 for (;;) {
1675 TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
1676 for (i = 0; i < njoblist; i++) {
1677 if (((intptr_t)
1678 cb->uaiocb._aiocb_private.kernelinfo) ==
1679 ijoblist[i]) {
1680 if (ujoblist[i] != cb->uuaiocb)
1681 error = EINVAL;
1682 uma_zfree(aiol_zone, ijoblist);
1683 uma_zfree(aiol_zone, ujoblist);
1684 return (error);
1685 }
1686 }
1687 }
1688
1689 s = splbio();
1690 for (cb = TAILQ_FIRST(&ki->kaio_bufdone); cb; cb =
1691 TAILQ_NEXT(cb, plist)) {
1692 for (i = 0; i < njoblist; i++) {
1693 if (((intptr_t)
1694 cb->uaiocb._aiocb_private.kernelinfo) ==
1695 ijoblist[i]) {
1696 splx(s);
1697 if (ujoblist[i] != cb->uuaiocb)
1698 error = EINVAL;
1699 uma_zfree(aiol_zone, ijoblist);
1700 uma_zfree(aiol_zone, ujoblist);
1701 return (error);
1702 }
1703 }
1704 }
1705
1706 ki->kaio_flags |= KAIO_WAKEUP;
1707 error = tsleep(p, PRIBIO | PCATCH, "aiospn", timo);
1708 splx(s);
1709
1710 if (error == ERESTART || error == EINTR) {
1711 uma_zfree(aiol_zone, ijoblist);
1712 uma_zfree(aiol_zone, ujoblist);
1713 return (EINTR);
1714 } else if (error == EWOULDBLOCK) {
1715 uma_zfree(aiol_zone, ijoblist);
1716 uma_zfree(aiol_zone, ujoblist);
1717 return (EAGAIN);
1718 }
1719 }
1720
1721 /* NOTREACHED */
1722 return (EINVAL);
1723 }
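
/*
 * Userland usage sketch for the syscall above (illustrative): wait up
 * to one second for either of two outstanding requests to complete.
 * aio_suspend() returns 0 once a listed request has completed, and -1
 * with errno set to EAGAIN on timeout (the EWOULDBLOCK mapping above).
 *
 *	#include <aio.h>
 *	#include <errno.h>
 *	#include <time.h>
 *
 *	int
 *	wait_for_either(struct aiocb *a, struct aiocb *b)
 *	{
 *		const struct aiocb *list[2] = { a, b };
 *		struct timespec ts = { 1, 0 };
 *
 *		if (aio_suspend(list, 2, &ts) == -1)
 *			return (errno);
 *		return (0);
 *	}
 */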
1724
1725 /*
1726 * aio_cancel cancels any non-physio aio operations not currently in
1727 * progress.
1728 */
1729 int
1730 aio_cancel(struct thread *td, struct aio_cancel_args *uap)
1731 {
1732 struct proc *p = td->td_proc;
1733 struct kaioinfo *ki;
1734 struct aiocblist *cbe, *cbn;
1735 struct file *fp;
1736 struct filedesc *fdp;
1737 struct socket *so;
1738 struct proc *po;
1739 int s,error;
1740 int cancelled=0;
1741 int notcancelled=0;
1742 struct vnode *vp;
1743
1744 fdp = p->p_fd;
1745 if ((u_int)uap->fd >= fdp->fd_nfiles ||
1746 (fp = fdp->fd_ofiles[uap->fd]) == NULL)
1747 return (EBADF);
1748
1749 if (fp->f_type == DTYPE_VNODE) {
1750 vp = fp->f_vnode;
1751
1752 if (vn_isdisk(vp,&error)) {
1753 td->td_retval[0] = AIO_NOTCANCELED;
1754 return (0);
1755 }
1756 } else if (fp->f_type == DTYPE_SOCKET) {
1757 so = fp->f_data;
1758
1759 s = splnet();
1760
1761 for (cbe = TAILQ_FIRST(&so->so_aiojobq); cbe; cbe = cbn) {
1762 cbn = TAILQ_NEXT(cbe, list);
1763 if ((uap->aiocbp == NULL) ||
1764 (uap->aiocbp == cbe->uuaiocb) ) {
1765 po = cbe->userproc;
1766 ki = po->p_aioinfo;
1767 TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
1768 TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
1769 TAILQ_INSERT_TAIL(&ki->kaio_jobdone, cbe, plist);
1770 if (ki->kaio_flags & KAIO_WAKEUP) {
1771 wakeup(po);
1772 }
1773 cbe->jobstate = JOBST_JOBFINISHED;
1774 cbe->uaiocb._aiocb_private.status=-1;
1775 cbe->uaiocb._aiocb_private.error=ECANCELED;
1776 cancelled++;
1777 /* XXX cancelled, knote? */
1778 if (cbe->uaiocb.aio_sigevent.sigev_notify ==
1779 SIGEV_SIGNAL) {
1780 PROC_LOCK(cbe->userproc);
1781 psignal(cbe->userproc, cbe->uaiocb.aio_sigevent.sigev_signo);
1782 PROC_UNLOCK(cbe->userproc);
1783 }
1784 if (uap->aiocbp)
1785 break;
1786 }
1787 }
1788 splx(s);
1789
1790 if ((cancelled) && (uap->aiocbp)) {
1791 td->td_retval[0] = AIO_CANCELED;
1792 return (0);
1793 }
1794 }
1795 ki=p->p_aioinfo;
1796 if (ki == NULL)
1797 goto done;
1798 s = splnet();
1799
1800 for (cbe = TAILQ_FIRST(&ki->kaio_jobqueue); cbe; cbe = cbn) {
1801 cbn = TAILQ_NEXT(cbe, plist);
1802
1803 if ((uap->fd == cbe->uaiocb.aio_fildes) &&
1804 ((uap->aiocbp == NULL ) ||
1805 (uap->aiocbp == cbe->uuaiocb))) {
1806
1807 if (cbe->jobstate == JOBST_JOBQGLOBAL) {
1808 TAILQ_REMOVE(&aio_jobs, cbe, list);
1809 TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
1810 TAILQ_INSERT_TAIL(&ki->kaio_jobdone, cbe,
1811 plist);
1812 cancelled++;
1813 ki->kaio_queue_finished_count++;
1814 cbe->jobstate = JOBST_JOBFINISHED;
1815 cbe->uaiocb._aiocb_private.status = -1;
1816 cbe->uaiocb._aiocb_private.error = ECANCELED;
1817 /* XXX cancelled, knote? */
1818 if (cbe->uaiocb.aio_sigevent.sigev_notify ==
1819 SIGEV_SIGNAL) {
1820 PROC_LOCK(cbe->userproc);
1821 psignal(cbe->userproc, cbe->uaiocb.aio_sigevent.sigev_signo);
1822 PROC_UNLOCK(cbe->userproc);
1823 }
1824 } else {
1825 notcancelled++;
1826 }
1827 }
1828 }
1829 splx(s);
1830 done:
1831 if (notcancelled) {
1832 td->td_retval[0] = AIO_NOTCANCELED;
1833 return (0);
1834 }
1835 if (cancelled) {
1836 td->td_retval[0] = AIO_CANCELED;
1837 return (0);
1838 }
1839 td->td_retval[0] = AIO_ALLDONE;
1840
1841 return (0);
1842 }
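
/*
 * Userland sketch for aio_cancel() above (illustrative; a NULL aiocb
 * pointer asks for every request on the descriptor).  As the code
 * shows, requests on a disk-backed vnode are reported as
 * AIO_NOTCANCELED, since physio requests cannot be recalled.
 *
 *	#include <aio.h>
 *	#include <stdio.h>
 *
 *	void
 *	cancel_pending(int fd, struct aiocb *cb)
 *	{
 *		switch (aio_cancel(fd, cb)) {
 *		case AIO_CANCELED:
 *			printf("cancelled\n");
 *			break;
 *		case AIO_NOTCANCELED:
 *			printf("still in progress; poll with aio_error()\n");
 *			break;
 *		case AIO_ALLDONE:
 *			printf("already completed\n");
 *			break;
 *		}
 *	}
 */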
1843
1844 /*
1845 * aio_error is implemented at the kernel level for compatibility purposes only.
1846 * For a user mode async implementation, it would be best to do it in a userland
1847 * subroutine.
1848 */
1849 int
1850 aio_error(struct thread *td, struct aio_error_args *uap)
1851 {
1852 struct proc *p = td->td_proc;
1853 int s;
1854 struct aiocblist *cb;
1855 struct kaioinfo *ki;
1856 long jobref;
1857
1858 ki = p->p_aioinfo;
1859 if (ki == NULL)
1860 return (EINVAL);
1861
1862 jobref = fuword(&uap->aiocbp->_aiocb_private.kernelinfo);
1863 if ((jobref == -1) || (jobref == 0))
1864 return (EINVAL);
1865
1866 TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
1867 if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1868 jobref) {
1869 td->td_retval[0] = cb->uaiocb._aiocb_private.error;
1870 return (0);
1871 }
1872 }
1873
1874 s = splnet();
1875
1876 for (cb = TAILQ_FIRST(&ki->kaio_jobqueue); cb; cb = TAILQ_NEXT(cb,
1877 plist)) {
1878 if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1879 jobref) {
1880 td->td_retval[0] = EINPROGRESS;
1881 splx(s);
1882 return (0);
1883 }
1884 }
1885
1886 for (cb = TAILQ_FIRST(&ki->kaio_sockqueue); cb; cb = TAILQ_NEXT(cb,
1887 plist)) {
1888 if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1889 jobref) {
1890 td->td_retval[0] = EINPROGRESS;
1891 splx(s);
1892 return (0);
1893 }
1894 }
1895 splx(s);
1896
1897 s = splbio();
1898 for (cb = TAILQ_FIRST(&ki->kaio_bufdone); cb; cb = TAILQ_NEXT(cb,
1899 plist)) {
1900 if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1901 jobref) {
1902 td->td_retval[0] = cb->uaiocb._aiocb_private.error;
1903 splx(s);
1904 return (0);
1905 }
1906 }
1907
1908 for (cb = TAILQ_FIRST(&ki->kaio_bufqueue); cb; cb = TAILQ_NEXT(cb,
1909 plist)) {
1910 if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1911 jobref) {
1912 td->td_retval[0] = EINPROGRESS;
1913 splx(s);
1914 return (0);
1915 }
1916 }
1917 splx(s);
1918
1919 #if (0)
1920 /*
1921 * Hack for lio.
1922 */
1923 status = fuword(&uap->aiocbp->_aiocb_private.status);
1924 if (status == -1)
1925 return fuword(&uap->aiocbp->_aiocb_private.error);
1926 #endif
1927 return (EINVAL);
1928 }
1929
1930 /* syscall - asynchronous read from a file (REALTIME) */
1931 int
1932 aio_read(struct thread *td, struct aio_read_args *uap)
1933 {
1934
1935 return aio_aqueue(td, uap->aiocbp, LIO_READ);
1936 }
1937
1938 /* syscall - asynchronous write to a file (REALTIME) */
1939 int
1940 aio_write(struct thread *td, struct aio_write_args *uap)
1941 {
1942
1943 return aio_aqueue(td, uap->aiocbp, LIO_WRITE);
1944 }
1945
1946 /* syscall - list directed I/O (REALTIME) */
1947 int
1948 lio_listio(struct thread *td, struct lio_listio_args *uap)
1949 {
1950 struct proc *p = td->td_proc;
1951 int nent, nentqueued;
1952 struct aiocb *iocb, * const *cbptr;
1953 struct aiocblist *cb;
1954 struct kaioinfo *ki;
1955 struct aio_liojob *lj;
1956 int error, runningcode;
1957 int nerror;
1958 int i;
1959 int s;
1960
1961 if ((uap->mode != LIO_NOWAIT) && (uap->mode != LIO_WAIT))
1962 return (EINVAL);
1963
1964 nent = uap->nent;
1965 if (nent < 0 || nent > AIO_LISTIO_MAX)
1966 return (EINVAL);
1967
1968 if (p->p_aioinfo == NULL)
1969 aio_init_aioinfo(p);
1970
1971 if ((nent + num_queue_count) > max_queue_count)
1972 return (EAGAIN);
1973
1974 ki = p->p_aioinfo;
1975 if ((nent + ki->kaio_queue_count) > ki->kaio_qallowed_count)
1976 return (EAGAIN);
1977
1978 lj = uma_zalloc(aiolio_zone, M_WAITOK);
1979 if (!lj)
1980 return (EAGAIN);
1981
1982 lj->lioj_flags = 0;
1983 lj->lioj_buffer_count = 0;
1984 lj->lioj_buffer_finished_count = 0;
1985 lj->lioj_queue_count = 0;
1986 lj->lioj_queue_finished_count = 0;
1987 lj->lioj_ki = ki;
1988
1989 /*
1990 * Setup signal.
1991 */
1992 if (uap->sig && (uap->mode == LIO_NOWAIT)) {
1993 error = copyin(uap->sig, &lj->lioj_signal,
1994 sizeof(lj->lioj_signal));
1995 if (error) {
1996 uma_zfree(aiolio_zone, lj);
1997 return (error);
1998 }
1999 if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
2000 uma_zfree(aiolio_zone, lj);
2001 return (EINVAL);
2002 }
2003 lj->lioj_flags |= LIOJ_SIGNAL;
2004 }
2005 TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
2006 /*
2007 * Get pointers to the list of I/O requests.
2008 */
2009 nerror = 0;
2010 nentqueued = 0;
2011 cbptr = uap->acb_list;
2012 for (i = 0; i < uap->nent; i++) {
2013 iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
2014 if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) {
2015 error = _aio_aqueue(td, iocb, lj, 0);
2016 if (error == 0)
2017 nentqueued++;
2018 else
2019 nerror++;
2020 }
2021 }
2022
2023 	/*
2024 	 * If we haven't queued any, then just return (XXX: arguably EIO).
2025 	 */
2026 	if (nentqueued == 0)
2027 		return (0);
2028
2029 /*
2030 * Calculate the appropriate error return.
2031 */
2032 runningcode = 0;
2033 if (nerror)
2034 runningcode = EIO;
2035
2036 if (uap->mode == LIO_WAIT) {
2037 int command, found, jobref;
2038
2039 for (;;) {
2040 found = 0;
2041 for (i = 0; i < uap->nent; i++) {
2042 /*
2043 * Fetch address of the control buf pointer in
2044 * user space.
2045 */
2046 iocb = (struct aiocb *)
2047 (intptr_t)fuword(&cbptr[i]);
2048 				if (((intptr_t)iocb == -1) ||
2049 				    ((intptr_t)iocb == 0))
2050 continue;
2051
2052 /*
2053 * Fetch the associated command from user space.
2054 */
2055 command = fuword(&iocb->aio_lio_opcode);
2056 if (command == LIO_NOP) {
2057 found++;
2058 continue;
2059 }
2060
2061 jobref =
2062 fuword(&iocb->_aiocb_private.kernelinfo);
2063
2064 TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
2065 if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo)
2066 == jobref) {
2067 						if (cb->uaiocb.aio_lio_opcode ==
2068 						    LIO_WRITE) {
2069 							p->p_stats->p_ru.ru_oublock +=
2070 							    cb->outputcharge;
2071 							cb->outputcharge = 0;
2072 						} else if (cb->uaiocb.aio_lio_opcode ==
2073 						    LIO_READ) {
2074 							p->p_stats->p_ru.ru_inblock +=
2075 							    cb->inputcharge;
2076 							cb->inputcharge = 0;
2077 						}
2079 found++;
2080 break;
2081 }
2082 }
2083
2084 s = splbio();
2085 TAILQ_FOREACH(cb, &ki->kaio_bufdone, plist) {
2086 if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo)
2087 == jobref) {
2088 found++;
2089 break;
2090 }
2091 }
2092 splx(s);
2093 }
2094
2095 /*
2096 * If all I/Os have been disposed of, then we can
2097 * return.
2098 */
2099 if (found == nentqueued)
2100 return (runningcode);
2101
2102 ki->kaio_flags |= KAIO_WAKEUP;
2103 error = tsleep(p, PRIBIO | PCATCH, "aiospn", 0);
2104
2105 if (error == EINTR)
2106 return (EINTR);
2107 else if (error == EWOULDBLOCK)
2108 return (EAGAIN);
2109 }
2110 }
2111
2112 return (runningcode);
2113 }
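
/*
 * Illustrative lio_listio(2) call (a sketch; setup abbreviated):
 *
 *	struct aiocb acb[2], *list[] = { &acb[0], &acb[1] };
 *
 *	... fill in aio_fildes/aio_buf/aio_nbytes and set aio_lio_opcode
 *	to LIO_READ or LIO_WRITE for each element ...
 *
 *	if (lio_listio(LIO_WAIT, list, 2, NULL) != 0)
 *		err(1, "lio_listio");
 *
 * With LIO_WAIT the loop above sleeps on "aiospn" and rescans
 * kaio_jobdone/kaio_bufdone until every queued request has completed.
 */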
2114
2115 /*
2116 * This is a weird hack so that we can post a signal. It is safe to do so from
2117 * a timeout routine, but *not* from an interrupt routine.
2118 */
2119 static void
2120 process_signal(void *aioj)
2121 {
2122 struct aiocblist *aiocbe = aioj;
2123 struct aio_liojob *lj = aiocbe->lio;
2124 struct aiocb *cb = &aiocbe->uaiocb;
2125
2126 if ((lj) && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL) &&
2127 (lj->lioj_queue_count == lj->lioj_queue_finished_count)) {
2128 PROC_LOCK(lj->lioj_ki->kaio_p);
2129 psignal(lj->lioj_ki->kaio_p, lj->lioj_signal.sigev_signo);
2130 PROC_UNLOCK(lj->lioj_ki->kaio_p);
2131 lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
2132 }
2133
2134 if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2135 PROC_LOCK(aiocbe->userproc);
2136 psignal(aiocbe->userproc, cb->aio_sigevent.sigev_signo);
2137 PROC_UNLOCK(aiocbe->userproc);
2138 }
2139 }
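
/*
 * process_signal() runs from a timeout(9) callout scheduled with a
 * zero-tick delay (see aio_physwakeup() below); that hop through the
 * callout wheel is what moves the psignal() calls out of the biodone
 * interrupt path.
 */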
2140
2141 /*
2142 * Interrupt handler for physio, performs the necessary process wakeups, and
2143 * signals.
2144 */
2145 static void
2146 aio_physwakeup(struct buf *bp)
2147 {
2148 struct aiocblist *aiocbe;
2149 struct proc *p;
2150 struct kaioinfo *ki;
2151 struct aio_liojob *lj;
2152
2153 wakeup(bp);
2154
2155 aiocbe = (struct aiocblist *)bp->b_caller1;
2156 if (aiocbe) {
2157 p = aiocbe->userproc;
2158
2159 aiocbe->jobstate = JOBST_JOBBFINISHED;
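		/*
		 * status was preset to the full request size when the
		 * buf was queued, so subtracting the residual leaves
		 * the number of bytes actually transferred.
		 */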
2160 aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
2161 aiocbe->uaiocb._aiocb_private.error = 0;
2162 aiocbe->jobflags |= AIOCBLIST_DONE;
2163
2164 if (bp->b_ioflags & BIO_ERROR)
2165 aiocbe->uaiocb._aiocb_private.error = bp->b_error;
2166
2167 lj = aiocbe->lio;
2168 if (lj) {
2169 lj->lioj_buffer_finished_count++;
2170
2171 /*
2172 * wakeup/signal if all of the interrupt jobs are done.
2173 */
2174 if (lj->lioj_buffer_finished_count ==
2175 lj->lioj_buffer_count) {
2176 /*
2177 * Post a signal if it is called for.
2178 */
2179 if ((lj->lioj_flags &
2180 (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) ==
2181 LIOJ_SIGNAL) {
2182 lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
2183 aiocbe->timeouthandle =
2184 timeout(process_signal,
2185 aiocbe, 0);
2186 }
2187 }
2188 }
2189
2190 ki = p->p_aioinfo;
2191 if (ki) {
2192 ki->kaio_buffer_finished_count++;
2193 TAILQ_REMOVE(&aio_bufjobs, aiocbe, list);
2194 TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
2195 TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
2196
2197 KNOTE_UNLOCKED(&aiocbe->klist, 0);
2198 /* Do the wakeup. */
2199 if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
2200 ki->kaio_flags &= ~KAIO_WAKEUP;
2201 wakeup(p);
2202 }
2203 }
2204
2205 if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL)
2206 aiocbe->timeouthandle =
2207 timeout(process_signal, aiocbe, 0);
2208 }
2209 }
2210
2211 /* syscall - wait for the next completion of an aio request */
2212 int
2213 aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
2214 {
2215 struct proc *p = td->td_proc;
2216 struct timeval atv;
2217 struct timespec ts;
2218 struct kaioinfo *ki;
2219 struct aiocblist *cb = NULL;
2220 int error, s, timo;
2221
2222 	suword(uap->aiocbp, (long)NULL);
2223
2224 timo = 0;
2225 if (uap->timeout) {
2226 /* Get timespec struct. */
2227 error = copyin(uap->timeout, &ts, sizeof(ts));
2228 if (error)
2229 return (error);
2230
2231 if ((ts.tv_nsec < 0) || (ts.tv_nsec >= 1000000000))
2232 return (EINVAL);
2233
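		/*
		 * Convert the timespec to a timeval and then to clock
		 * ticks; tvtohz() rounds up, so a short timeout still
		 * sleeps for at least one tick.
		 */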
2234 TIMESPEC_TO_TIMEVAL(&atv, &ts);
2235 if (itimerfix(&atv))
2236 return (EINVAL);
2237 timo = tvtohz(&atv);
2238 }
2239
2240 ki = p->p_aioinfo;
2241 if (ki == NULL)
2242 return (EAGAIN);
2243
2244 for (;;) {
2245 		if ((cb = TAILQ_FIRST(&ki->kaio_jobdone)) != NULL) {
2246 			suword(uap->aiocbp, (uintptr_t)cb->uuaiocb);
2247 			td->td_retval[0] = cb->uaiocb._aiocb_private.status;
2248 			error = cb->uaiocb._aiocb_private.error;
2249 			if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
2250 				p->p_stats->p_ru.ru_oublock += cb->outputcharge;
2251 				cb->outputcharge = 0;
2252 			} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
2253 				p->p_stats->p_ru.ru_inblock += cb->inputcharge;
2254 				cb->inputcharge = 0;
2255 			}
2256 			aio_free_entry(cb);	/* cb is invalid from here on. */
2257 			return (error);
2258 }
2259
2260 s = splbio();
2261 		if ((cb = TAILQ_FIRST(&ki->kaio_bufdone)) != NULL) {
2262 			splx(s);
2263 			suword(uap->aiocbp, (uintptr_t)cb->uuaiocb);
2264 			td->td_retval[0] = cb->uaiocb._aiocb_private.status;
2265 			error = cb->uaiocb._aiocb_private.error;
2266 			aio_free_entry(cb);	/* cb is invalid from here on. */
2267 			return (error);
2268 		}
2269 ki->kaio_flags |= KAIO_WAKEUP;
2270 error = tsleep(p, PRIBIO | PCATCH, "aiowc", timo);
2271 splx(s);
2272
2273 if (error == ERESTART)
2274 return (EINTR);
2275 		else if (error < 0)	/* XXX: tsleep() never returns < 0. */
2276 return (error);
2277 else if (error == EINTR)
2278 return (EINTR);
2279 else if (error == EWOULDBLOCK)
2280 return (EAGAIN);
2281 }
2282 }
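
/*
 * Illustrative usage of this FreeBSD-specific syscall (a sketch):
 *
 *	struct aiocb *done;
 *	ssize_t len;
 *
 *	len = aio_waitcomplete(&done, NULL);	(NULL blocks indefinitely)
 *	if (len == -1)
 *		err(1, "aio_waitcomplete");
 */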
2283
2284 /* kqueue attach function */
2285 static int
2286 filt_aioattach(struct knote *kn)
2287 {
2288 struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2289
2290 /*
2291 * The aiocbe pointer must be validated before using it, so
2292 * registration is restricted to the kernel; the user cannot
2293 * set EV_FLAG1.
2294 */
2295 if ((kn->kn_flags & EV_FLAG1) == 0)
2296 return (EPERM);
2297 kn->kn_flags &= ~EV_FLAG1;
2298
2299 knlist_add(&aiocbe->klist, kn, 0);
2300
2301 return (0);
2302 }
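
/*
 * Userland cannot attach EVFILT_AIO knotes directly (EV_FLAG1 is
 * kernel-only, per the check above); kqueue delivery is requested when
 * the job is queued, e.g. (illustrative, "kq" from kqueue(2)):
 *
 *	acb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
 *	acb.aio_sigevent.sigev_notify_kqueue = kq;
 *	acb.aio_sigevent.sigev_value.sival_ptr = &acb;
 */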
2303
2304 /* kqueue detach function */
2305 static void
2306 filt_aiodetach(struct knote *kn)
2307 {
2308 struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2309
2310 knlist_remove(&aiocbe->klist, kn, 0);
2311 }
2312
2313 /* kqueue filter function */
2314 /*ARGSUSED*/
2315 static int
2316 filt_aio(struct knote *kn, long hint)
2317 {
2318 struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2319
2320 kn->kn_data = aiocbe->uaiocb._aiocb_private.error;
2321 if (aiocbe->jobstate != JOBST_JOBFINISHED &&
2322 aiocbe->jobstate != JOBST_JOBBFINISHED)
2323 return (0);
2324 kn->kn_flags |= EV_EOF;
2325 return (1);
2326 }
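
/*
 * When this filter fires, kn_data carries the job's error status and
 * the returned event's ident is the userland aiocb pointer, so a
 * consumer can reap the result along these lines (illustrative):
 *
 *	struct kevent ev;
 *	ssize_t len;
 *
 *	if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1)
 *		len = aio_return((struct aiocb *)ev.ident);
 */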