/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/taskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

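/*
 * Example usage (an illustrative sketch only; the "mydriver" names and the
 * softc pointer 'sc' are hypothetical, not part of this file).  A driver
 * that wants a private servicing thread would do roughly:
 *
 *	static struct taskqueue *mydriver_tq;
 *	static struct task mydriver_task;
 *
 *	static void
 *	mydriver_task_fn(void *context, int pending)
 *	{
 *		(handle the deferred work; 'pending' counts how many
 *		 enqueues were coalesced since the last run)
 *	}
 *
 *	mydriver_tq = taskqueue_create("mydriver", M_NOWAIT,
 *	    taskqueue_thread_enqueue, &mydriver_tq);
 *	taskqueue_start_threads(&mydriver_tq, 1, PWAIT, "mydriver taskq");
 *	TASK_INIT(&mydriver_task, 0, mydriver_task_fn, sc);
 *	taskqueue_enqueue(mydriver_tq, &mydriver_task);
 *	...
 *	taskqueue_drain(mydriver_tq, &mydriver_task);
 *	taskqueue_free(mydriver_tq);
 */
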
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
static void	*taskqueue_giant_ih;
static void	*taskqueue_ih;
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct mtx taskqueue_queues_mutex;

struct taskqueue_busy {
	struct task	*tb_running;
	TAILQ_ENTRY(taskqueue_busy) tb_link;
};

struct taskqueue {
	STAILQ_ENTRY(taskqueue)	tq_link;
	STAILQ_HEAD(, task)	tq_queue;
	const char		*tq_name;
	taskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	TAILQ_HEAD(, taskqueue_busy) tq_active;
	struct mtx		tq_mutex;
	struct proc		**tq_pproc;
	int			tq_pcount;
	int			tq_spin;
	int			tq_flags;
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_PENDING	(1 << 2)

static void	taskqueue_run_locked(struct taskqueue *);

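/*
 * Acquire/release a queue's mutex using the flavor that matches how the
 * queue was created: a spin mutex for fast (interrupt-safe) queues, a
 * default sleep mutex otherwise.
 */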
static __inline void
TQ_LOCK(struct taskqueue *tq)
{
	if (tq->tq_spin)
		mtx_lock_spin(&tq->tq_mutex);
	else
		mtx_lock(&tq->tq_mutex);
}

static __inline void
TQ_UNLOCK(struct taskqueue *tq)
{
	if (tq->tq_spin)
		mtx_unlock_spin(&tq->tq_mutex);
	else
		mtx_unlock(&tq->tq_mutex);
}

static void	init_taskqueue_list(void *data);

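/*
 * Sleep on 'p' with the queue's mutex 'm' held, again choosing the
 * primitive that matches the mutex type; msleep_spin() takes no priority
 * argument, so 'pri' is only used for the sleep-mutex case.
 */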
static __inline int
TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
	if (tq->tq_spin)
		return (msleep_spin(p, m, wm, t));
	return (msleep(p, m, pri, wm, t));
}

static void
init_taskqueue_list(void *data __unused)
{

	mtx_init(&taskqueue_queues_mutex, "taskqueue list", NULL, MTX_DEF);
	STAILQ_INIT(&taskqueue_queues);
}
SYSINIT(taskqueue_list, SI_SUB_INTRINSIC, SI_ORDER_ANY, init_taskqueue_list,
    NULL);

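/*
 * Allocate and initialize a queue, then link it onto the global list so
 * taskqueue_find() can see it.  The caller supplies the enqueue callback
 * used to notify the consumer (software interrupt or thread) when work
 * arrives.
 */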
static struct taskqueue *
_taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context,
		 int mtxflags, const char *mtxname)
{
	struct taskqueue *queue;

	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
	if (queue == NULL)
		return (NULL);

	STAILQ_INIT(&queue->tq_queue);
	TAILQ_INIT(&queue->tq_active);
	queue->tq_name = name;
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	return (queue);
}

struct taskqueue *
taskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return (_taskqueue_create(name, mflags, enqueue, context,
	    MTX_DEF, "taskqueue"));
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
taskqueue_terminate(struct proc **pp, struct taskqueue *tq)
{

	while (tq->tq_pcount > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}

void
taskqueue_free(struct taskqueue *queue)
{

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
	mtx_unlock(&taskqueue_queues_mutex);

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_run_locked(queue);
	taskqueue_terminate(queue->tq_pproc, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_pproc, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}

/*
 * Look up a taskqueue by name.  Returns with the taskqueue locked, or
 * NULL if no queue with that name exists.
 */
struct taskqueue *
taskqueue_find(const char *name)
{
	struct taskqueue *queue;

	mtx_lock(&taskqueue_queues_mutex);
	STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
		if (strcmp(queue->tq_name, name) == 0) {
			TQ_LOCK(queue);
			mtx_unlock(&taskqueue_queues_mutex);
			return (queue);
		}
	}
	mtx_unlock(&taskqueue_queues_mutex);
	return (NULL);
}

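/*
 * Enqueue a task.  If the task is already pending, just bump its pending
 * count; otherwise insert it in priority order (highest ta_priority first)
 * and notify the consumer unless the queue is currently blocked.
 */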
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	TQ_LOCK(queue);

	/*
	 * Count multiple enqueues.
	 */
	if (task->ta_pending) {
		task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise the case when all tasks have the same priority.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (prev == NULL || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		prev = NULL;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins != NULL;
		     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev != NULL)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task,
			    ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	else
		queue->tq_flags |= TQ_FLAGS_PENDING;

	TQ_UNLOCK(queue);

	return (0);
}

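/*
 * Block a queue: tasks may still be enqueued, but the consumer will not
 * be notified until taskqueue_unblock() is called.
 */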
void
taskqueue_block(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

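/*
 * Unblock a queue and, if any enqueues arrived while it was blocked,
 * deliver the deferred notification now.
 */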
void
taskqueue_unblock(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (queue->tq_flags & TQ_FLAGS_PENDING) {
		queue->tq_flags &= ~TQ_FLAGS_PENDING;
		queue->tq_enqueue(queue->tq_context);
	}
	TQ_UNLOCK(queue);
}

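/*
 * Run all pending tasks; the queue lock is held on entry and exit but is
 * dropped around each handler invocation.  A taskqueue_busy record on
 * tq_active advertises which task is running so that task_is_running()
 * and taskqueue_drain() can find it while the lock is dropped.
 */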
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct taskqueue_busy tb;
	struct task *task;
	int pending;

	mtx_assert(&queue->tq_mutex, MA_OWNED);
	tb.tb_running = NULL;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

	while (STAILQ_FIRST(&queue->tq_queue) != NULL) {
		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		TQ_UNLOCK(queue);

		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		wakeup(task);
	}
	TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
}

void
taskqueue_run(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}

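/*
 * Return non-zero if 'task' is currently being executed by some consumer
 * of this queue.  The caller must hold the queue lock.
 */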
static int
task_is_running(struct taskqueue *queue, struct task *task)
{
	struct taskqueue_busy *tb;

	mtx_assert(&queue->tq_mutex, MA_OWNED);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == task)
			return (1);
	}
	return (0);
}

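/*
 * Wait until 'task' is neither pending on the queue nor running; each
 * completed handler wakes us via the wakeup(task) in
 * taskqueue_run_locked().
 */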
void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{
	if (queue->tq_spin) {		/* XXX */
		mtx_lock_spin(&queue->tq_mutex);
		while (task->ta_pending != 0 || task_is_running(queue, task))
			msleep_spin(task, &queue->tq_mutex, "-", 0);
		mtx_unlock_spin(&queue->tq_mutex);
	} else {
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

		mtx_lock(&queue->tq_mutex);
		while (task->ta_pending != 0 || task_is_running(queue, task))
			msleep(task, &queue->tq_mutex, PWAIT, "-", 0);
		mtx_unlock(&queue->tq_mutex);
	}
}

static void
taskqueue_swi_enqueue(void *context)
{
	swi_sched(taskqueue_ih, 0);
}

static void
taskqueue_swi_run(void *dummy)
{
	taskqueue_run(taskqueue_swi);
}

static void
taskqueue_swi_giant_enqueue(void *context)
{
	swi_sched(taskqueue_giant_ih, 0);
}

static void
taskqueue_swi_giant_run(void *dummy)
{
	taskqueue_run(taskqueue_swi_giant);
}

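/*
 * Create 'count' kernel threads to service the queue.  The threads are
 * created stopped (RFSTOPPED) so their priority can be set to 'pri'
 * before they are made runnable with sched_add().
 */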
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	struct taskqueue *tq;
	struct thread *td;
	char ktname[MAXCOMLEN + 1];
	int i, error;

	if (count <= 0)
		return (EINVAL);
	tq = *tqp;

	va_start(ap, name);
	vsnprintf(ktname, sizeof(ktname), name, ap);
	va_end(ap);

	tq->tq_pproc = malloc(sizeof(struct proc *) * count, M_TASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_pproc == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		/*
		 * Pass ktname via "%s" so a '%' in the expanded name is
		 * never interpreted as a format directive.
		 */
		if (count == 1)
			error = kthread_create(taskqueue_thread_loop, tqp,
			    &tq->tq_pproc[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_create(taskqueue_thread_loop, tqp,
			    &tq->tq_pproc[i], RFSTOPPED, 0, "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_create(%s): error %d\n",
			    __func__, ktname, error);
			tq->tq_pproc[i] = NULL;		/* paranoid */
		} else
			tq->tq_pcount++;
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_pproc[i] == NULL)
			continue;
		td = FIRST_THREAD_IN_PROC(tq->tq_pproc[i]);
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

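/*
 * Main loop for queue-servicing threads: run tasks while the queue is
 * active, sleeping when it is empty, then hand-shake with
 * taskqueue_terminate() on exit.
 */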
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		taskqueue_run_locked(tq);
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}

	/* rendezvous with thread that asked us to terminate */
	tq->tq_pcount--;
	wakeup_one(tq->tq_pproc);
	TQ_UNLOCK(tq);
	kthread_exit(0);
}

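/*
 * Enqueue callback for thread-backed queues: wake one servicing thread.
 * Called from taskqueue_enqueue() with the queue lock held.
 */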
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;

	mtx_assert(&tq->tq_mutex, MA_OWNED);
	wakeup_one(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
		 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
		     INTR_MPSAFE, &taskqueue_ih));

TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, 0,
		 swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
		     NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

TASKQUEUE_DEFINE_THREAD(thread);

struct taskqueue *
taskqueue_create_fast(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return (_taskqueue_create(name, mflags, enqueue, context,
	    MTX_SPIN, "fast_taskqueue"));
}

/* NB: for backwards compatibility */
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	return (taskqueue_enqueue(queue, task));
}

static void	*taskqueue_fast_ih;

static void
taskqueue_fast_enqueue(void *context)
{
	swi_sched(taskqueue_fast_ih, 0);
}

static void
taskqueue_fast_run(void *dummy)
{
	taskqueue_run(taskqueue_fast);
}

TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, 0,
	swi_add(NULL, "Fast task queue", taskqueue_fast_run, NULL,
	SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));

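/*
 * Return non-zero if 'td' is one of the threads servicing 'queue'.  This
 * assumes the queue has at least one started thread (tq_pcount > 0);
 * otherwise the scan below would never terminate.
 */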
int
taskqueue_member(struct taskqueue *queue, struct thread *td)
{
	int i, j, ret = 0;
	struct thread *ptd;

	TQ_LOCK(queue);
	for (i = 0, j = 0; ; i++) {
		if (queue->tq_pproc[i] == NULL)
			continue;
		ptd = FIRST_THREAD_IN_PROC(queue->tq_pproc[i]);
		/*
		 * In releng7 all kprocs have only one kthread, so there is
		 * no need to use FOREACH_THREAD_IN_PROC here.  If that
		 * changes at some point, only the first 'if' needs to move
		 * into the FOREACH_...; the second one can stay as it is.
		 */
		if (ptd == td) {
			ret = 1;
			break;
		}
		if (++j >= queue->tq_pcount)
			break;
	}
	TQ_UNLOCK(queue);
	return (ret);
}