The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/contrib/zstd/lib/common/pool.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*
    2  * Copyright (c) Yann Collet, Facebook, Inc.
    3  * All rights reserved.
    4  *
    5  * This source code is licensed under both the BSD-style license (found in the
    6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
    7  * in the COPYING file in the root directory of this source tree).
    8  * You may select, at your option, one of the above-listed licenses.
    9  */
   10 
   11 
   12 /* ======   Dependencies   ======= */
   13 #include "zstd_deps.h" /* size_t */
   14 #include "debug.h"     /* assert */
   15 #include "zstd_internal.h"  /* ZSTD_customMalloc, ZSTD_customFree */
   16 #include "pool.h"
   17 
   18 /* ======   Compiler specifics   ====== */
   19 #if defined(_MSC_VER)
   20 #  pragma warning(disable : 4204)        /* disable: C4204: non-constant aggregate initializer */
   21 #endif
   22 
   23 
   24 #ifdef ZSTD_MULTITHREAD
   25 
   26 #include "threading.h"   /* pthread adaptation */
   27 
   28 /* A job is a function and an opaque argument */
   29 typedef struct POOL_job_s {
   30     POOL_function function;
   31     void *opaque;
   32 } POOL_job;
   33 
   34 struct POOL_ctx_s {
   35     ZSTD_customMem customMem;
   36     /* Keep track of the threads */
   37     ZSTD_pthread_t* threads;
   38     size_t threadCapacity;
   39     size_t threadLimit;
   40 
   41     /* The queue is a circular buffer */
   42     POOL_job *queue;
   43     size_t queueHead;
   44     size_t queueTail;
   45     size_t queueSize;
   46 
   47     /* The number of threads working on jobs */
   48     size_t numThreadsBusy;
   49     /* Indicates if the queue is empty */
   50     int queueEmpty;
   51 
   52     /* The mutex protects the queue */
   53     ZSTD_pthread_mutex_t queueMutex;
   54     /* Condition variable for pushers to wait on when the queue is full */
   55     ZSTD_pthread_cond_t queuePushCond;
   56     /* Condition variables for poppers to wait on when the queue is empty */
   57     ZSTD_pthread_cond_t queuePopCond;
   58     /* Indicates if the queue is shutting down */
   59     int shutdown;
   60 };
   61 
   62 /* POOL_thread() :
   63  * Work thread for the thread pool.
   64  * Waits for jobs and executes them.
   65  * @returns : NULL on failure else non-null.
   66  */
   67 static void* POOL_thread(void* opaque) {
   68     POOL_ctx* const ctx = (POOL_ctx*)opaque;
   69     if (!ctx) { return NULL; }
   70     for (;;) {
   71         /* Lock the mutex and wait for a non-empty queue or until shutdown */
   72         ZSTD_pthread_mutex_lock(&ctx->queueMutex);
   73 
   74         while ( ctx->queueEmpty
   75             || (ctx->numThreadsBusy >= ctx->threadLimit) ) {
   76             if (ctx->shutdown) {
   77                 /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit),
   78                  * a few threads will be shutdown while !queueEmpty,
   79                  * but enough threads will remain active to finish the queue */
   80                 ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
   81                 return opaque;
   82             }
   83             ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
   84         }
   85         /* Pop a job off the queue */
   86         {   POOL_job const job = ctx->queue[ctx->queueHead];
   87             ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize;
   88             ctx->numThreadsBusy++;
   89             ctx->queueEmpty = (ctx->queueHead == ctx->queueTail);
   90             /* Unlock the mutex, signal a pusher, and run the job */
   91             ZSTD_pthread_cond_signal(&ctx->queuePushCond);
   92             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
   93 
   94             job.function(job.opaque);
   95 
   96             /* If the intended queue size was 0, signal after finishing job */
   97             ZSTD_pthread_mutex_lock(&ctx->queueMutex);
   98             ctx->numThreadsBusy--;
   99             if (ctx->queueSize == 1) {
  100                 ZSTD_pthread_cond_signal(&ctx->queuePushCond);
  101             }
  102             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  103         }
  104     }  /* for (;;) */
  105     assert(0);  /* Unreachable */
  106 }
  107 
  108 /* ZSTD_createThreadPool() : public access point */
  109 POOL_ctx* ZSTD_createThreadPool(size_t numThreads) {
  110     return POOL_create (numThreads, 0);
  111 }
  112 
  113 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
  114     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
  115 }
  116 
  117 POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
  118                                ZSTD_customMem customMem)
  119 {
  120     POOL_ctx* ctx;
  121     /* Check parameters */
  122     if (!numThreads) { return NULL; }
  123     /* Allocate the context and zero initialize */
  124     ctx = (POOL_ctx*)ZSTD_customCalloc(sizeof(POOL_ctx), customMem);
  125     if (!ctx) { return NULL; }
  126     /* Initialize the job queue.
  127      * It needs one extra space since one space is wasted to differentiate
  128      * empty and full queues.
  129      */
  130     ctx->queueSize = queueSize + 1;
  131     ctx->queue = (POOL_job*)ZSTD_customMalloc(ctx->queueSize * sizeof(POOL_job), customMem);
  132     ctx->queueHead = 0;
  133     ctx->queueTail = 0;
  134     ctx->numThreadsBusy = 0;
  135     ctx->queueEmpty = 1;
  136     {
  137         int error = 0;
  138         error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
  139         error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
  140         error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
  141         if (error) { POOL_free(ctx); return NULL; }
  142     }
  143     ctx->shutdown = 0;
  144     /* Allocate space for the thread handles */
  145     ctx->threads = (ZSTD_pthread_t*)ZSTD_customMalloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
  146     ctx->threadCapacity = 0;
  147     ctx->customMem = customMem;
  148     /* Check for errors */
  149     if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; }
  150     /* Initialize the threads */
  151     {   size_t i;
  152         for (i = 0; i < numThreads; ++i) {
  153             if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
  154                 ctx->threadCapacity = i;
  155                 POOL_free(ctx);
  156                 return NULL;
  157         }   }
  158         ctx->threadCapacity = numThreads;
  159         ctx->threadLimit = numThreads;
  160     }
  161     return ctx;
  162 }
  163 
  164 /*! POOL_join() :
  165     Shutdown the queue, wake any sleeping threads, and join all of the threads.
  166 */
  167 static void POOL_join(POOL_ctx* ctx) {
  168     /* Shut down the queue */
  169     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  170     ctx->shutdown = 1;
  171     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  172     /* Wake up sleeping threads */
  173     ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
  174     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
  175     /* Join all of the threads */
  176     {   size_t i;
  177         for (i = 0; i < ctx->threadCapacity; ++i) {
  178             ZSTD_pthread_join(ctx->threads[i], NULL);  /* note : could fail */
  179     }   }
  180 }
  181 
  182 void POOL_free(POOL_ctx *ctx) {
  183     if (!ctx) { return; }
  184     POOL_join(ctx);
  185     ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
  186     ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
  187     ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
  188     ZSTD_customFree(ctx->queue, ctx->customMem);
  189     ZSTD_customFree(ctx->threads, ctx->customMem);
  190     ZSTD_customFree(ctx, ctx->customMem);
  191 }
  192 
  193 void ZSTD_freeThreadPool (ZSTD_threadPool* pool) {
  194   POOL_free (pool);
  195 }
  196 
  197 size_t POOL_sizeof(const POOL_ctx* ctx) {
  198     if (ctx==NULL) return 0;  /* supports sizeof NULL */
  199     return sizeof(*ctx)
  200         + ctx->queueSize * sizeof(POOL_job)
  201         + ctx->threadCapacity * sizeof(ZSTD_pthread_t);
  202 }
  203 
  204 
  205 /* @return : 0 on success, 1 on error */
  206 static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
  207 {
  208     if (numThreads <= ctx->threadCapacity) {
  209         if (!numThreads) return 1;
  210         ctx->threadLimit = numThreads;
  211         return 0;
  212     }
  213     /* numThreads > threadCapacity */
  214     {   ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_customMalloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
  215         if (!threadPool) return 1;
  216         /* replace existing thread pool */
  217         ZSTD_memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool));
  218         ZSTD_customFree(ctx->threads, ctx->customMem);
  219         ctx->threads = threadPool;
  220         /* Initialize additional threads */
  221         {   size_t threadId;
  222             for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
  223                 if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) {
  224                     ctx->threadCapacity = threadId;
  225                     return 1;
  226             }   }
  227     }   }
  228     /* successfully expanded */
  229     ctx->threadCapacity = numThreads;
  230     ctx->threadLimit = numThreads;
  231     return 0;
  232 }
  233 
  234 /* @return : 0 on success, 1 on error */
  235 int POOL_resize(POOL_ctx* ctx, size_t numThreads)
  236 {
  237     int result;
  238     if (ctx==NULL) return 1;
  239     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  240     result = POOL_resize_internal(ctx, numThreads);
  241     ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
  242     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  243     return result;
  244 }
  245 
  246 /**
  247  * Returns 1 if the queue is full and 0 otherwise.
  248  *
  249  * When queueSize is 1 (pool was created with an intended queueSize of 0),
  250  * then a queue is empty if there is a thread free _and_ no job is waiting.
  251  */
  252 static int isQueueFull(POOL_ctx const* ctx) {
  253     if (ctx->queueSize > 1) {
  254         return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
  255     } else {
  256         return (ctx->numThreadsBusy == ctx->threadLimit) ||
  257                !ctx->queueEmpty;
  258     }
  259 }
  260 
  261 
  262 static void
  263 POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque)
  264 {
  265     POOL_job const job = {function, opaque};
  266     assert(ctx != NULL);
  267     if (ctx->shutdown) return;
  268 
  269     ctx->queueEmpty = 0;
  270     ctx->queue[ctx->queueTail] = job;
  271     ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
  272     ZSTD_pthread_cond_signal(&ctx->queuePopCond);
  273 }
  274 
  275 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque)
  276 {
  277     assert(ctx != NULL);
  278     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  279     /* Wait until there is space in the queue for the new job */
  280     while (isQueueFull(ctx) && (!ctx->shutdown)) {
  281         ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
  282     }
  283     POOL_add_internal(ctx, function, opaque);
  284     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  285 }
  286 
  287 
  288 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque)
  289 {
  290     assert(ctx != NULL);
  291     ZSTD_pthread_mutex_lock(&ctx->queueMutex);
  292     if (isQueueFull(ctx)) {
  293         ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  294         return 0;
  295     }
  296     POOL_add_internal(ctx, function, opaque);
  297     ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
  298     return 1;
  299 }
  300 
  301 
  302 #else  /* ZSTD_MULTITHREAD  not defined */
  303 
  304 /* ========================== */
  305 /* No multi-threading support */
  306 /* ========================== */
  307 
  308 
  309 /* We don't need any data, but if it is empty, malloc() might return NULL. */
  310 struct POOL_ctx_s {
  311     int dummy;
  312 };
  313 static POOL_ctx g_poolCtx;
  314 
  315 POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) {
  316     return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem);
  317 }
  318 
  319 POOL_ctx*
  320 POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem)
  321 {
  322     (void)numThreads;
  323     (void)queueSize;
  324     (void)customMem;
  325     return &g_poolCtx;
  326 }
  327 
  328 void POOL_free(POOL_ctx* ctx) {
  329     assert(!ctx || ctx == &g_poolCtx);
  330     (void)ctx;
  331 }
  332 
  333 int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
  334     (void)ctx; (void)numThreads;
  335     return 0;
  336 }
  337 
  338 void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
  339     (void)ctx;
  340     function(opaque);
  341 }
  342 
  343 int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) {
  344     (void)ctx;
  345     function(opaque);
  346     return 1;
  347 }
  348 
  349 size_t POOL_sizeof(const POOL_ctx* ctx) {
  350     if (ctx==NULL) return 0;  /* supports sizeof NULL */
  351     assert(ctx == &g_poolCtx);
  352     return sizeof(*ctx);
  353 }
  354 
  355 #endif  /* ZSTD_MULTITHREAD */

Cache object: 31d3aedbad45e45cfcfdc894d7c6d42f


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.