1 /*
2 * Copyright (c) 2009 Pawel Jakub Dawidek <pjd@FreeBSD.org>
3 * All rights reserved.
4 *
5 * Copyright (c) 2012 Spectra Logic Corporation. All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 */
28
29 #include <sys/cdefs.h>
30 __FBSDID("$FreeBSD$");
31
32 #include <sys/param.h>
33 #include <sys/ck.h>
34 #include <sys/epoch.h>
35 #include <sys/kernel.h>
36 #include <sys/kmem.h>
37 #include <sys/lock.h>
38 #include <sys/mutex.h>
39 #include <sys/queue.h>
40 #include <sys/taskq.h>
41 #include <sys/taskqueue.h>
42 #include <sys/zfs_context.h>
43
44 #if defined(__i386__) || defined(__amd64__) || defined(__aarch64__)
45 #include <machine/pcb.h>
46 #endif
47
48 #include <vm/uma.h>
49
50 #if __FreeBSD_version < 1201522
51 #define taskqueue_start_threads_in_proc(tqp, count, pri, proc, name, ...) \
52 taskqueue_start_threads(tqp, count, pri, name, __VA_ARGS__)
53 #endif
54
55 static uint_t taskq_tsd;
56 static uma_zone_t taskq_zone;
57
58 /*
59 * Global system-wide dynamic task queue available for all consumers. This
60 * taskq is not intended for long-running tasks; instead, a dedicated taskq
61 * should be created.
62 */
63 taskq_t *system_taskq = NULL;
64 taskq_t *system_delay_taskq = NULL;
65 taskq_t *dynamic_taskq = NULL;
66
67 proc_t *system_proc;
68
69 extern int uma_align_cache;
70
71 static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures");
72
73 static CK_LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl;
74 static unsigned long tqenthash;
75 static unsigned long tqenthashlock;
76 static struct sx *tqenthashtbl_lock;
77
78 static taskqid_t tqidnext;
79
80 #define TQIDHASH(tqid) (&tqenthashtbl[(tqid) & tqenthash])
81 #define TQIDHASHLOCK(tqid) (&tqenthashtbl_lock[((tqid) & tqenthashlock)])
82
83 #define TIMEOUT_TASK 1
84 #define NORMAL_TASK 2
85
86 static void
87 system_taskq_init(void *arg)
88 {
89 int i;
90
91 tsd_create(&taskq_tsd, NULL);
92 tqenthashtbl = hashinit(mp_ncpus * 8, M_TASKQ, &tqenthash);
93 tqenthashlock = (tqenthash + 1) / 8;
94 if (tqenthashlock > 0)
95 tqenthashlock--;
96 tqenthashtbl_lock =
97 malloc(sizeof (*tqenthashtbl_lock) * (tqenthashlock + 1),
98 M_TASKQ, M_WAITOK | M_ZERO);
99 for (i = 0; i < tqenthashlock + 1; i++)
100 sx_init_flags(&tqenthashtbl_lock[i], "tqenthash", SX_DUPOK);
101 taskq_zone = uma_zcreate("taskq_zone", sizeof (taskq_ent_t),
102 NULL, NULL, NULL, NULL,
103 UMA_ALIGN_CACHE, 0);
104 system_taskq = taskq_create("system_taskq", mp_ncpus, minclsyspri,
105 0, 0, 0);
106 system_delay_taskq = taskq_create("system_delay_taskq", mp_ncpus,
107 minclsyspri, 0, 0, 0);
108 }
109 SYSINIT(system_taskq_init, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_init,
110 NULL);
111
112 static void
113 system_taskq_fini(void *arg)
114 {
115 int i;
116
117 taskq_destroy(system_delay_taskq);
118 taskq_destroy(system_taskq);
119 uma_zdestroy(taskq_zone);
120 tsd_destroy(&taskq_tsd);
121 for (i = 0; i < tqenthashlock + 1; i++)
122 sx_destroy(&tqenthashtbl_lock[i]);
123 for (i = 0; i < tqenthash + 1; i++)
124 VERIFY(CK_LIST_EMPTY(&tqenthashtbl[i]));
125 free(tqenthashtbl_lock, M_TASKQ);
126 free(tqenthashtbl, M_TASKQ);
127 }
128 SYSUNINIT(system_taskq_fini, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_fini,
129 NULL);
130
131 #ifdef __LP64__
132 static taskqid_t
133 __taskq_genid(void)
134 {
135 taskqid_t tqid;
136
137 /*
138 * Assume a 64-bit counter will not wrap in practice.
139 */
140 tqid = atomic_add_64_nv(&tqidnext, 1);
141 VERIFY(tqid);
142 return (tqid);
143 }
144 #else
145 static taskqid_t
146 __taskq_genid(void)
147 {
148 taskqid_t tqid;
149
150 for (;;) {
151 tqid = atomic_add_32_nv(&tqidnext, 1);
152 if (__predict_true(tqid != 0))
153 break;
154 }
155 VERIFY(tqid);
156 return (tqid);
157 }
158 #endif
159
160 static taskq_ent_t *
161 taskq_lookup(taskqid_t tqid)
162 {
163 taskq_ent_t *ent = NULL;
164
165 sx_xlock(TQIDHASHLOCK(tqid));
166 CK_LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) {
167 if (ent->tqent_id == tqid)
168 break;
169 }
170 if (ent != NULL)
171 refcount_acquire(&ent->tqent_rc);
172 sx_xunlock(TQIDHASHLOCK(tqid));
173 return (ent);
174 }
175
176 static taskqid_t
177 taskq_insert(taskq_ent_t *ent)
178 {
179 taskqid_t tqid;
180
181 tqid = __taskq_genid();
182 ent->tqent_id = tqid;
183 ent->tqent_registered = B_TRUE;
184 sx_xlock(TQIDHASHLOCK(tqid));
185 CK_LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash);
186 sx_xunlock(TQIDHASHLOCK(tqid));
187 return (tqid);
188 }
189
190 static void
191 taskq_remove(taskq_ent_t *ent)
192 {
193 taskqid_t tqid = ent->tqent_id;
194
195 if (!ent->tqent_registered)
196 return;
197
198 sx_xlock(TQIDHASHLOCK(tqid));
199 CK_LIST_REMOVE(ent, tqent_hash);
200 sx_xunlock(TQIDHASHLOCK(tqid));
201 ent->tqent_registered = B_FALSE;
202 }
203
204 static void
205 taskq_tsd_set(void *context)
206 {
207 taskq_t *tq = context;
208
209 #if defined(__amd64__) || defined(__aarch64__)
210 if (context != NULL && tsd_get(taskq_tsd) == NULL)
211 fpu_kern_thread(FPU_KERN_NORMAL);
212 #endif
213 tsd_set(taskq_tsd, tq);
214 }
215
216 static taskq_t *
217 taskq_create_impl(const char *name, int nthreads, pri_t pri,
218 proc_t *proc __maybe_unused, uint_t flags)
219 {
220 taskq_t *tq;
221
222 if ((flags & TASKQ_THREADS_CPU_PCT) != 0)
223 nthreads = MAX((mp_ncpus * nthreads) / 100, 1);
224
225 tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
226 tq->tq_queue = taskqueue_create(name, M_WAITOK,
227 taskqueue_thread_enqueue, &tq->tq_queue);
228 taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
229 taskq_tsd_set, tq);
230 taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
231 taskq_tsd_set, NULL);
232 (void) taskqueue_start_threads_in_proc(&tq->tq_queue, nthreads, pri,
233 proc, "%s", name);
234
235 return ((taskq_t *)tq);
236 }
237
238 taskq_t *
239 taskq_create(const char *name, int nthreads, pri_t pri, int minalloc __unused,
240 int maxalloc __unused, uint_t flags)
241 {
242 return (taskq_create_impl(name, nthreads, pri, system_proc, flags));
243 }
244
245 taskq_t *
246 taskq_create_proc(const char *name, int nthreads, pri_t pri,
247 int minalloc __unused, int maxalloc __unused, proc_t *proc, uint_t flags)
248 {
249 return (taskq_create_impl(name, nthreads, pri, proc, flags));
250 }
251
252 void
253 taskq_destroy(taskq_t *tq)
254 {
255
256 taskqueue_free(tq->tq_queue);
257 kmem_free(tq, sizeof (*tq));
258 }
259
260 int
261 taskq_member(taskq_t *tq, kthread_t *thread)
262 {
263
264 return (taskqueue_member(tq->tq_queue, thread));
265 }
266
267 taskq_t *
268 taskq_of_curthread(void)
269 {
270 return (tsd_get(taskq_tsd));
271 }
272
273 static void
274 taskq_free(taskq_ent_t *task)
275 {
276 taskq_remove(task);
277 if (refcount_release(&task->tqent_rc))
278 uma_zfree(taskq_zone, task);
279 }
280
281 int
282 taskq_cancel_id(taskq_t *tq, taskqid_t tid)
283 {
284 uint32_t pend;
285 int rc;
286 taskq_ent_t *ent;
287
288 if (tid == 0)
289 return (0);
290
291 if ((ent = taskq_lookup(tid)) == NULL)
292 return (0);
293
294 ent->tqent_cancelled = B_TRUE;
295 if (ent->tqent_type == TIMEOUT_TASK) {
296 rc = taskqueue_cancel_timeout(tq->tq_queue,
297 &ent->tqent_timeout_task, &pend);
298 } else
299 rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend);
300 if (rc == EBUSY) {
301 taskqueue_drain(tq->tq_queue, &ent->tqent_task);
302 } else if (pend) {
303 /*
304 * Tasks normally free themselves when run, but here the task
305 * was cancelled so it did not free itself.
306 */
307 taskq_free(ent);
308 }
309 /* Free the extra reference we added with taskq_lookup. */
310 taskq_free(ent);
311 return (rc);
312 }
313
314 static void
315 taskq_run(void *arg, int pending __unused)
316 {
317 taskq_ent_t *task = arg;
318
319 if (!task->tqent_cancelled)
320 task->tqent_func(task->tqent_arg);
321 taskq_free(task);
322 }
323
324 taskqid_t
325 taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
326 uint_t flags, clock_t expire_time)
327 {
328 taskq_ent_t *task;
329 taskqid_t tqid;
330 clock_t timo;
331 int mflag;
332
333 timo = expire_time - ddi_get_lbolt();
334 if (timo <= 0)
335 return (taskq_dispatch(tq, func, arg, flags));
336
337 if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
338 mflag = M_WAITOK;
339 else
340 mflag = M_NOWAIT;
341
342 task = uma_zalloc(taskq_zone, mflag);
343 if (task == NULL)
344 return (0);
345 task->tqent_func = func;
346 task->tqent_arg = arg;
347 task->tqent_type = TIMEOUT_TASK;
348 task->tqent_cancelled = B_FALSE;
349 refcount_init(&task->tqent_rc, 1);
350 tqid = taskq_insert(task);
351 TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0,
352 taskq_run, task);
353
354 taskqueue_enqueue_timeout(tq->tq_queue, &task->tqent_timeout_task,
355 timo);
356 return (tqid);
357 }
358
359 taskqid_t
360 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
361 {
362 taskq_ent_t *task;
363 int mflag, prio;
364 taskqid_t tqid;
365
366 if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
367 mflag = M_WAITOK;
368 else
369 mflag = M_NOWAIT;
370 /*
371 * If TQ_FRONT is given, we want higher priority for this task, so it
372 * can go at the front of the queue.
373 */
374 prio = !!(flags & TQ_FRONT);
375
376 task = uma_zalloc(taskq_zone, mflag);
377 if (task == NULL)
378 return (0);
379 refcount_init(&task->tqent_rc, 1);
380 task->tqent_func = func;
381 task->tqent_arg = arg;
382 task->tqent_cancelled = B_FALSE;
383 task->tqent_type = NORMAL_TASK;
384 tqid = taskq_insert(task);
385 TASK_INIT(&task->tqent_task, prio, taskq_run, task);
386 taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
387 return (tqid);
388 }
389
390 static void
391 taskq_run_ent(void *arg, int pending __unused)
392 {
393 taskq_ent_t *task = arg;
394
395 task->tqent_func(task->tqent_arg);
396 }
397
398 void
399 taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags,
400 taskq_ent_t *task)
401 {
402 int prio;
403
404 /*
405 * If TQ_FRONT is given, we want higher priority for this task, so it
406 * can go at the front of the queue.
407 */
408 prio = !!(flags & TQ_FRONT);
409 task->tqent_cancelled = B_FALSE;
410 task->tqent_registered = B_FALSE;
411 task->tqent_id = 0;
412 task->tqent_func = func;
413 task->tqent_arg = arg;
414
415 TASK_INIT(&task->tqent_task, prio, taskq_run_ent, task);
416 taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
417 }
418
419 void
420 taskq_wait(taskq_t *tq)
421 {
422 taskqueue_quiesce(tq->tq_queue);
423 }
424
425 void
426 taskq_wait_id(taskq_t *tq, taskqid_t tid)
427 {
428 taskq_ent_t *ent;
429
430 if (tid == 0)
431 return;
432 if ((ent = taskq_lookup(tid)) == NULL)
433 return;
434
435 taskqueue_drain(tq->tq_queue, &ent->tqent_task);
436 taskq_free(ent);
437 }
438
439 void
440 taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused)
441 {
442 taskqueue_drain_all(tq->tq_queue);
443 }
444
445 int
446 taskq_empty_ent(taskq_ent_t *t)
447 {
448 return (t->tqent_task.ta_pending == 0);
449 }
Cache object: 1869a30a34232f5a6008cfa2626e6e33
|