FreeBSD/Linux Kernel Cross Reference
sys/contrib/openzfs/module/zstd/lib/common/pool.c


/*
 * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */


/* ======   Dependencies   ======= */
#include <stddef.h>    /* size_t */
#include "debug.h"     /* assert */
#include "zstd_internal.h"  /* ZSTD_malloc, ZSTD_free */
#include "pool.h"

/* ======   Compiler specifics   ====== */
#if defined(_MSC_VER)
#  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
#endif


#ifdef ZSTD_MULTITHREAD

#include "threading.h"   /* pthread adaptation */

/* A job is a function and an opaque argument */
typedef struct POOL_job_s {
    POOL_function function;
    void *opaque;
} POOL_job;

struct POOL_ctx_s {
    ZSTD_customMem customMem;
    /* Keep track of the threads */
    ZSTD_pthread_t* threads;
    size_t threadCapacity;
    size_t threadLimit;

    /* The queue is a circular buffer */
    POOL_job *queue;
    size_t queueHead;
    size_t queueTail;
    size_t queueSize;

    /* The number of threads working on jobs */
    size_t numThreadsBusy;
    /* Indicates if the queue is empty */
    int queueEmpty;

    /* The mutex protects the queue */
    ZSTD_pthread_mutex_t queueMutex;
    /* Condition variable for pushers to wait on when the queue is full */
    ZSTD_pthread_cond_t queuePushCond;
    /* Condition variables for poppers to wait on when the queue is empty */
    ZSTD_pthread_cond_t queuePopCond;
    /* Indicates if the queue is shutting down */
    int shutdown;
};
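
/*
 * Queue invariants (derived from the code below) :
 * - queue[] is a circular buffer of queueSize slots, where queueSize is the
 *   requested queue size + 1; one slot is deliberately left unused so that
 *   (queueTail + 1) % queueSize == queueHead means "full" while
 *   queueHead == queueTail means "empty" (see isQueueFull()).
 * - Jobs are pushed at queueTail and popped at queueHead, both advancing
 *   modulo queueSize.
 * - When the pool is created with an intended queue size of 0 (queueSize == 1),
 *   queueHead and queueTail are always equal, so the queueEmpty flag and
 *   numThreadsBusy decide whether a pusher must wait instead.
 */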

/* POOL_thread() :
 * Work thread for the thread pool.
 * Waits for jobs and executes them.
 * @returns : NULL on failure else non-null.
 */
static void* POOL_thread(void* opaque) {
    POOL_ctx* const ctx = (POOL_ctx*)opaque;
    if (!ctx) { return NULL; }
    for (;;) {
        /* Lock the mutex and wait for a non-empty queue or until shutdown */
        ZSTD_pthread_mutex_lock(&ctx->queueMutex);

        while ( ctx->queueEmpty
            || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
            if (ctx->shutdown) {
                /* even if !queueEmpty (possible if numThreadsBusy >= threadLimit),
                 * a few threads will be shut down while !queueEmpty,
                 * but enough threads will remain active to finish the queue */
                ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
                return opaque;
            }
            ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
        }
        /* Pop a job off the queue */
        {   POOL_job const job = ctx->queue[ctx->queueHead];
            ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
            ctx->numThreadsBusy++;
            ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
            /* Unlock the mutex, signal a pusher, and run the job */
            ZSTD_pthread_cond_signal(&ctx->queuePushCond);
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);

            job.function(job.opaque);

            /* If the intended queue size was 0, signal after finishing job */
            ZSTD_pthread_mutex_lock(&ctx->queueMutex);
            ctx->numThreadsBusy--;
            if (ctx->queueSize == 1) {
                ZSTD_pthread_cond_signal(&ctx->queuePushCond);
            }
            ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
        }
    }  /* for (;;) */
    assert(0);  /* Unreachable */
}

POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
    return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}

POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
                               ZSTD_customMem customMem) {
    POOL_ctx* ctx;
    /* Check parameters */
    if (!numThreads) { return NULL; }
    /* Allocate the context and zero initialize */
    ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem);
    if (!ctx) { return NULL; }
    /* Initialize the job queue.
     * It needs one extra space since one space is wasted to differentiate
     * empty and full queues.
     */
    ctx->queueSize = queueSize + 1;
    ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem);
    ctx->queueHead = 0;
    ctx->queueTail = 0;
    ctx->numThreadsBusy = 0;
    ctx->queueEmpty = 1;
    {
        int error = 0;
        error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
        error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
        error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
        if (error) { POOL_free(ctx); return NULL; }
    }
    ctx->shutdown = 0;
    /* Allocate space for the thread handles */
    ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
    ctx->threadCapacity = 0;
    ctx->customMem = customMem;
    /* Check for errors */
    if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
    /* Initialize the threads */
    {   size_t i;
        for (i = 0; i < numThreads; ++i) {
            if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
                ctx->threadCapacity = i;
                POOL_free(ctx);
                return NULL;
        }   }
        ctx->threadCapacity = numThreads;
        ctx->threadLimit = numThreads;
    }
    return ctx;
}

/*! POOL_join() :
    Shut down the queue, wake any sleeping threads, and join all of the threads.
*/
static void POOL_join(POOL_ctx* ctx) {
    /* Shut down the queue */
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    ctx->shutdown = 1;
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    /* Wake up sleeping threads */
    ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
    ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
    /* Join all of the threads */
    {   size_t i;
        for (i = 0; i < ctx->threadCapacity; ++i) {
            ZSTD_pthread_join(ctx->threads[i], NULL);  /* note : could fail */
    }   }
}

void POOL_free(POOL_ctx *ctx) {
    if (!ctx) { return; }
    POOL_join(ctx);
    ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
    ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
    ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
    ZSTD_free(ctx->queue, ctx->customMem);
    ZSTD_free(ctx->threads, ctx->customMem);
    ZSTD_free(ctx, ctx->customMem);
}



size_t POOL_sizeof(POOL_ctx *ctx) {
    if (ctx==NULL) return 0;  /* supports sizeof NULL */
    return sizeof(*ctx)
        + ctx->queueSize * sizeof(POOL_job)
        + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
}


/* @return : 0 on success, 1 on error */
static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
    if (numThreads <= ctx->threadCapacity) {
        if (!numThreads) return 1;
        ctx->threadLimit = numThreads;
        return 0;
    }
    /* numThreads > threadCapacity */
    {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
        if (!threadPool) return 1;
        /* replace existing thread pool */
        memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
        ZSTD_free(ctx->threads, ctx->customMem);
        ctx->threads = threadPool;
        /* Initialize additional threads */
        {   size_t threadId;
            for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
                if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
                    ctx->threadCapacity = threadId;
                    return 1;
            }   }
    }   }
    /* successfully expanded */
    ctx->threadCapacity = numThreads;
    ctx->threadLimit = numThreads;
    return 0;
}

/* @return : 0 on success, 1 on error */
int POOL_resize(POOL_ctx* ctx, size_t numThreads)
{
    int result;
    if (ctx==NULL) return 1;
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    result = POOL_resize_internal(ctx, numThreads);
    ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    return result;
}
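
/* Note (cf. POOL_resize_internal above) : shrinking the pool only lowers
 * threadLimit; the existing worker threads are kept (idle threads simply wait
 * in POOL_thread) and are reused if the pool is grown again later. Growing
 * beyond threadCapacity reallocates the handle array and spawns the extra
 * threads. */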

/**
 * Returns 1 if the queue is full and 0 otherwise.
 *
 * When queueSize is 1 (pool was created with an intended queueSize of 0),
 * then a queue is empty if there is a thread free _and_ no job is waiting.
 */
static int isQueueFull(POOL_ctx const* ctx) {
    if (ctx->queueSize > 1) {
        return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
    } else {
        return (ctx->numThreadsBusy == ctx->threadLimit) ||
               !ctx->queueEmpty;
    }
}


static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
{
    POOL_job const job = {function, opaque};
    assert(ctx != NULL);
    if (ctx->shutdown) return;

    ctx->queueEmpty = 0;
    ctx->queue[ctx->queueTail] = job;
    ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
    ZSTD_pthread_cond_signal(&ctx->queuePopCond);
}

void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    /* Wait until there is space in the queue for the new job */
    while (isQueueFull(ctx) && (!ctx->shutdown)) {
        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
}


int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
{
    assert(ctx != NULL);
    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
    if (isQueueFull(ctx)) {
        ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
        return 0;
    }
    POOL_add_internal(ctx, function, opaque);
    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
    return 1;
}


#else  /* ZSTD_MULTITHREAD not defined */

/* ========================== */
/* No multi-threading support */
/* ========================== */


/* We don't need any data, but if it is empty, malloc() might return NULL. */
struct POOL_ctx_s {
    int dummy;
};
static POOL_ctx g_ctx;

POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
    return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
}

POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) {
    (void)numThreads;
    (void)queueSize;
    (void)customMem;
    return &g_ctx;
}

void POOL_free(POOL_ctx* ctx) {
    assert(!ctx || ctx == &g_ctx);
    (void)ctx;
}

int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
    (void)ctx; (void)numThreads;
    return 0;
}

void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);
}

int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
    (void)ctx;
    function(opaque);
    return 1;
}

size_t POOL_sizeof(POOL_ctx* ctx) {
    if (ctx==NULL) return 0;  /* supports sizeof NULL */
    assert(ctx == &g_ctx);
    return sizeof(*ctx);
}

#endif  /* ZSTD_MULTITHREAD */
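
For reference, below is a minimal usage sketch of the pool API implemented above. It assumes the POOL_function type declared in pool.h takes a single void* argument (as implied by the call job.function(job.opaque)), and it is illustrative only, not part of pool.c:

#include <stdio.h>
#include "pool.h"   /* POOL_create, POOL_add, POOL_tryAdd, POOL_free */

/* A job body : receives the opaque pointer that was passed to POOL_add() */
static void printJob(void* opaque)
{
    printf("running job %d\n", *(const int*)opaque);
}

static void poolExample(void)
{
    int args[4] = { 0, 1, 2, 3 };
    /* 2 worker threads, room for 4 pending jobs in the queue */
    POOL_ctx* const pool = POOL_create(2, 4);
    if (pool == NULL) return;
    {   int i;
        for (i = 0; i < 4; ++i) {
            /* POOL_add() blocks while the queue is full;
             * POOL_tryAdd() would return 0 instead of waiting */
            POOL_add(pool, printJob, &args[i]);
    }   }
    /* POOL_free() shuts the queue down and joins every worker thread;
     * per the comment in POOL_thread(), enough threads stay active to
     * drain any jobs still queued before the workers exit. */
    POOL_free(pool);
}

In the single-threaded build (ZSTD_MULTITHREAD not defined), the same calls work but each job simply runs synchronously inside POOL_add()/POOL_tryAdd().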



This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.