/* FreeBSD/Linux Kernel Cross Reference: sys/net/mp_ring.c */
1 /*-
2 * Copyright (c) 2014 Chelsio Communications, Inc.
3 * All rights reserved.
4 * Written by: Navdeep Parhar <np@FreeBSD.org>
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 * SUCH DAMAGE.
26 */
27
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30
31 #include <sys/types.h>
32 #include <sys/param.h>
33 #include <sys/systm.h>
34 #include <sys/counter.h>
35 #include <sys/lock.h>
36 #include <sys/mutex.h>
37 #include <sys/malloc.h>
38 #include <machine/cpu.h>
39
40 #if defined(__i386__)
41 #define atomic_cmpset_acq_64 atomic_cmpset_64
42 #define atomic_cmpset_rel_64 atomic_cmpset_64
43 #endif
44
45 #include <net/mp_ring.h>
46
/*
 * The entire ring state packed into one 64-bit word so that it can be read
 * and updated with a single (atomic) access.
 *
 * pidx_head: first slot not yet reserved by any producer.
 * pidx_tail: slots before this index hold fully written items.
 * cidx:      next slot the consumer will drain.
 * flags:     consumer status (IDLE/BUSY/STALLED/ABDICATED below).
 */
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};
56
/* Values for ring_state.flags: the status of the (single) consumer. */
enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};
64
65 static inline uint16_t
66 space_available(struct ifmp_ring *r, union ring_state s)
67 {
68 uint16_t x = r->size - 1;
69
70 if (s.cidx == s.pidx_head)
71 return (x);
72 else if (s.cidx > s.pidx_head)
73 return (s.cidx - s.pidx_head - 1);
74 else
75 return (x - s.pidx_head + s.cidx);
76 }
77
78 static inline uint16_t
79 increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
80 {
81 int x = r->size - idx;
82
83 MPASS(x > 0);
84 return (x > n ? idx + n : n - x);
85 }
86
87 /* Consumer is about to update the ring's state to s */
88 static inline uint16_t
89 state_to_flags(union ring_state s, int abdicate)
90 {
91
92 if (s.cidx == s.pidx_tail)
93 return (IDLE);
94 else if (abdicate && s.pidx_tail != s.pidx_head)
95 return (ABDICATED);
96
97 return (BUSY);
98 }
99
100 #ifdef MP_RING_NO_64BIT_ATOMICS
/*
 * Lock-based variant of the ring drain, used when 64-bit atomics are not
 * available (MP_RING_NO_64BIT_ATOMICS).  Both call sites hold r->lock
 * across this call.  'os' must have flags == BUSY and pending work
 * (cidx != pidx_tail).  'prev' is the flags value the ring had before it
 * was marked BUSY (drives the starts/restarts/stalls counters), and
 * 'budget' caps the total number of items drained in this invocation.
 */
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/*
			 * Out of resources: publish the new cidx with the
			 * STALLED flag so a later ifmp_ring_check_drainage()
			 * or enqueue can restart the consumer.
			 */
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * In this variant r->state is only ever updated with r->lock
		 * held, so any pidx change we pick up here is already visible
		 * along with the items behind it (no acquire atomic needed).
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
171 #else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 *
 * os.flags must be BUSY (this thread owns the consumer role).  'prev' is the
 * flags value the ring had before it was marked BUSY; it drives the starts/
 * restarts/stalls counters.  'budget' caps the total number of items drained
 * before the consumer abdicates to another thread.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/*
			 * Out of resources: publish the new cidx with the
			 * STALLED flag so a later ifmp_ring_check_drainage()
			 * or enqueue can restart the consumer.  The critical
			 * section keeps us from being preempted mid-CAS-loop.
			 */
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
252 #endif
253
254 int
255 ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
256 mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
257 {
258 struct ifmp_ring *r;
259
260 /* All idx are 16b so size can be 65536 at most */
261 if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
262 can_drain == NULL)
263 return (EINVAL);
264 *pr = NULL;
265 flags &= M_NOWAIT | M_WAITOK;
266 MPASS(flags != 0);
267
268 r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
269 if (r == NULL)
270 return (ENOMEM);
271 r->size = size;
272 r->cookie = cookie;
273 r->mt = mt;
274 r->drain = drain;
275 r->can_drain = can_drain;
276 r->enqueues = counter_u64_alloc(flags);
277 r->drops = counter_u64_alloc(flags);
278 r->starts = counter_u64_alloc(flags);
279 r->stalls = counter_u64_alloc(flags);
280 r->restarts = counter_u64_alloc(flags);
281 r->abdications = counter_u64_alloc(flags);
282 if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
283 r->stalls == NULL || r->restarts == NULL ||
284 r->abdications == NULL) {
285 ifmp_ring_free(r);
286 return (ENOMEM);
287 }
288
289 *pr = r;
290 #ifdef MP_RING_NO_64BIT_ATOMICS
291 mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
292 #endif
293 return (0);
294 }
295
296 void
297 ifmp_ring_free(struct ifmp_ring *r)
298 {
299
300 if (r == NULL)
301 return;
302
303 if (r->enqueues != NULL)
304 counter_u64_free(r->enqueues);
305 if (r->drops != NULL)
306 counter_u64_free(r->drops);
307 if (r->starts != NULL)
308 counter_u64_free(r->starts);
309 if (r->stalls != NULL)
310 counter_u64_free(r->stalls);
311 if (r->restarts != NULL)
312 counter_u64_free(r->restarts);
313 if (r->abdications != NULL)
314 counter_u64_free(r->abdications);
315
316 free(r, r->mt);
317 }
318
319 /*
320 * Enqueue n items and maybe drain the ring for some time.
321 *
322 * Returns an errno.
323 */
324 #ifdef MP_RING_NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		/* Not enough room: drop all n items and report ENOBUFS. */
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 * (With r->lock held across the whole enqueue this should already
	 * be the case on entry.)
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  In this lock-based variant r->lock
	 * (not a release atomic) guarantees the items are visible to any
	 * thread that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_locked(r, ns, os.flags, budget);

	mtx_unlock(&r->lock);
	return (0);
}
393
394 #else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			/* Not enough room: drop all n items, ENOBUFS. */
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		/*
		 * The critical section entered here is deliberately held
		 * across the item copy below (note: no critical_exit() on the
		 * success path) so this thread cannot be preempted while other
		 * producers spin-wait on our pidx_tail update.
		 */
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 * This variant never drains the ring itself: if the ring was IDLE it
	 * is marked ABDICATED so a later ifmp_ring_check_drainage() call
	 * performs the drain.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	return (0);
}
462 #endif
463
/*
 * Restart a stalled or abdicated consumer if there is work pending.  No-op
 * unless the ring is STALLED or ABDICATED with no producer mid-enqueue, and
 * (for STALLED) the can_drain callback reports progress is possible again.
 */
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// Require work to be available
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef MP_RING_NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		/* Raced with another thread; it owns the ring now. */
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
501
502 void
503 ifmp_ring_reset_stats(struct ifmp_ring *r)
504 {
505
506 counter_u64_zero(r->enqueues);
507 counter_u64_zero(r->drops);
508 counter_u64_zero(r->starts);
509 counter_u64_zero(r->stalls);
510 counter_u64_zero(r->restarts);
511 counter_u64_zero(r->abdications);
512 }
513
514 int
515 ifmp_ring_is_idle(struct ifmp_ring *r)
516 {
517 union ring_state s;
518
519 s.state = r->state;
520 if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
521 s.flags == IDLE)
522 return (1);
523
524 return (0);
525 }
526
527 int
528 ifmp_ring_is_stalled(struct ifmp_ring *r)
529 {
530 union ring_state s;
531
532 s.state = r->state;
533 if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
534 return (1);
535
536 return (0);
537 }
/* Cache object: bbc21bd2debba48516dbbe3d95531f79 */