The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/tools/tests/affinity/pool.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 #include <AvailabilityMacros.h>
    2 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
    3 #include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
    4 #endif
    5 #include <mach/mach.h>
    6 #include <mach/mach_error.h>
    7 #include <mach/mach_time.h>
    8 #include <pthread.h>
    9 #include <sys/queue.h>
   10 #include <stdio.h>
   11 #include <stdlib.h>
   12 #include <string.h>
   13 #include <unistd.h>
   14 #include <err.h>
   15 
   16 /*
   17  * Pool is another multithreaded test/benchmarking program to evaluate
   18  * affinity set placement in Leopard.
   19  *
   20  * The basic picture is:
   21  *
   22  *                  -> producer --                 -> consumer --
   23  *       free     /                \    work     /                \
   24  *    -> queue --      ...          --> queue --                   --
   25  *   |            \                /             \                /  |
   26  *   |              -> producer --                 -> consumer --    |
   27  *    ---------------------------------------------------------------
   28  *
   29  *       <---------- "stage" ---------> <---------- "stage" --------->
   30  *
   31  * There is a series of work stages. Each stage has an input and an output
   32  * queue and multiple threads. The first stage is the producer and subsequent
   33  * stages are consumers. By default there are 2 stages. There are N producer
   34  * and M consumer threads. There are B buffers per producer thread circulating
   35  * through the system.
   36  *
   37  * When affinity is enabled, each producer thread is tagged with an affinity tag
   38  * 1 .. N - so each runs on a different L2 cache. When a buffer is queued to
   39  * the work queue it is tagged with this affinity. When a consumer dequeues a
   40  * work item, it sets its affinity to this tag. Hence consumer threads migrate
   41  * to the same affinity set where the data was produced.
   42  *
   43  * Buffer management uses pthread mutex/condition variables. A thread blocks
   44  * when no buffer is available on a queue and it is signaled when a buffer
   45  * is placed on an empty queue. Queues are tailqs a la <sys/queue.h>.
   46  * The queue management is centralized in a single routine: what queues to
   47  * use as input and output and what function to call for processing is
   48  * data-driven.
   49  */
   50   
   51 pthread_mutex_t funnel;         /* guards threads_ready and serializes DBG() output */
   52 pthread_cond_t  barrier;        /* start barrier the worker threads wait on */
   53 
   54 uint64_t        timer;          /* mach_absolute_time() at start; main() turns it into elapsed time */
   55 int             threads;        /* total number of worker threads, computed in main() */
   56 int             threads_ready = 0;      /* workers that have reached the start barrier */
   57 
   58 int             iterations = 10000;     /* -i: initial work_todo of every stage */
   59 boolean_t       affinity = FALSE;       /* -a: toggles use of affinity tags */
   60 boolean_t       halting = FALSE;        /* -t: wait for a keypress before releasing the barrier */
   61 int             verbosity = 1;          /* -v: mutter() prints when > 0, DBG() when > 1 */
   62 
   63 typedef struct work {           /* one buffer circulating through the stage queues */
   64         TAILQ_ENTRY(work)       link;   /* linkage on a work_queue_t's tailq */
   65         int                     *data;  /* buffer handed to the stage's worker function */
   66         int                     isize;  /* length of data, in ints */
   67         int                     tag;    /* setnum of the producer that last handled it */
   68         int                     number; /* producer's work_todo value when it took the item */
   69 } work_t;
   70 
   71 /*
   72  * A work queue, complete with pthread objects for its management
   73  */
   74 typedef struct work_queue {
   75         pthread_mutex_t         mtx;    /* protects queue and waiters */
   76         pthread_cond_t          cnd;    /* signaled when an item is queued while waiters != 0 */
   77         TAILQ_HEAD(, work)      queue;  /* FIFO of work_t items */
   78         unsigned int            waiters;        /* threads currently blocked on cnd */
   79 } work_queue_t;
   80 
   81 /* Worker functions take an integer array and its length in ints */
   82 typedef void (worker_fn_t)(int *, int); 
   83 
   84 /* This struct controls the function of a stage */
   85 #define WORKERS_MAX 10  /* main() rejects -s values >= this */
   86 typedef struct {
   87         int                     stagenum;       /* 0 is the producer stage */
   88         char                    *name;          /* "producer" or "consumer", used in DBG() output */
   89         worker_fn_t             *fn;            /* applied to every buffer this stage dequeues */
   90         work_queue_t            *input;         /* queue buffers are taken from (main() points it at bufq) */
   91         work_queue_t            *output;        /* next stage's bufq; the last stage wraps to stage 0 */
   92         work_queue_t            bufq;           /* this stage's own queue */
   93         int                     work_todo;      /* dequeues left for this stage; starts at iterations */
   94 } stage_info_t;
   95 
   96 /* This defines a worker thread */
   97 typedef struct worker_info {
   98         int                     setnum; /* 1-based index within the stage; producers use it as affinity tag */
   99         stage_info_t            *stage; /* stage this thread works for */
  100         pthread_t               thread; /* handle filled in by pthread_create() in main() */
  101 } worker_info_t;
  102 
  103 #define DBG(x...) do {                          \
  104         if (verbosity > 1) {                    \
  105                 pthread_mutex_lock(&funnel);    \
  106                 printf(x);                      \
  107                 pthread_mutex_unlock(&funnel);  \
  108         }                                       \
  109 } while (0)
  110 
  111 #define mutter(x...) do {                       \
  112         if (verbosity > 0) {                    \
  113                 printf(x);                      \
  114         }                                       \
  115 } while (0)
  116 
  117 #define s_if_plural(x)  (((x) > 1) ? "s" : "")
  118 
  119 static void
  120 usage()
  121 {
  122         fprintf(stderr,
  123 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
  124                 "usage: pool [-a]    Turn affinity on (off)\n"
  125                 "            [-b B]  Number of buffers per producer (2)\n"
  126 #else
  127                 "usage: pool [-b B]  Number of buffers per producer (2)\n"
  128 #endif
  129                 "            [-i I]  Number of buffers to produce (10000)\n"
  130                 "            [-s S]  Number of stages (2)\n"
  131                 "            [-p P]  Number of pages per buffer (256=1MB)]\n"
  132                 "            [-w]    Consumer writes data\n"
  133                 "            [-v V]  Verbosity level 0..2 (1)\n"
  134                 "            [N [M]] Number of producer and consumers (2)\n"
  135         );
  136         exit(1);
  137 }
  138 
  139 /* Trivial producer: write to each byte */
  140 void
  141 writer_fn(int *data, int isize)
  142 {
  143         int     i;
  144 
  145         for (i = 0; i < isize; i++) {
  146                 data[i] = i;
  147         }
  148 }
  149 
  150 /* Trivial consumer: read each byte */
  151 void
  152 reader_fn(int *data, int isize)
  153 {
  154         int     i;
  155         int     datum;
  156 
  157         for (i = 0; i < isize; i++) {
  158                 datum = data[i];
  159         }
  160 }
  161 
  162 /* Consumer reading and writing the buffer */
  163 void
  164 reader_writer_fn(int *data, int isize)
  165 {
  166         int     i;
  167 
  168         for (i = 0; i < isize; i++) {
  169                 data[i] += 1;
  170         }
  171 }
  172 
  173 void
  174 affinity_set(int tag)
  175 {
  176 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
  177         kern_return_t                   ret;
  178         thread_affinity_policy_data_t   policy;
  179         if (affinity) {
  180                 policy.affinity_tag = tag;
  181                 ret = thread_policy_set(
  182                                 mach_thread_self(), THREAD_AFFINITY_POLICY,
  183                                 (thread_policy_t) &policy,
  184                                 THREAD_AFFINITY_POLICY_COUNT);
  185                 if (ret != KERN_SUCCESS)
  186                         printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
  187         }
  188 #endif
  189 }
  190 
  191 /*
  192  * This is the central function for every thread.
  193  * For each invocation, its role is ets by (a pointer to) a stage_info_t.
  194  */
  195 void *
  196 manager_fn(void *arg)
  197 {
  198         worker_info_t   *wp = (worker_info_t *) arg;
  199         stage_info_t    *sp = wp->stage;
  200         boolean_t       is_producer = (sp->stagenum == 0);
  201         long            iteration = 0;
  202         int             current_tag = 0;
  203 
  204 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
  205         kern_return_t                   ret;
  206         thread_extended_policy_data_t   epolicy;
  207         epolicy.timeshare = FALSE;
  208         ret = thread_policy_set(
  209                         mach_thread_self(), THREAD_EXTENDED_POLICY,
  210                         (thread_policy_t) &epolicy,
  211                         THREAD_EXTENDED_POLICY_COUNT);
  212         if (ret != KERN_SUCCESS)
  213                 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
  214         
  215 #endif
  216         /*
  217          * If we're using affinity sets and we're a producer
  218          * set our tag to by our thread set number.
  219          */
  220         if (affinity && is_producer) {
  221                 affinity_set(wp->setnum);
  222                 current_tag = wp->setnum;
  223         }
  224 
  225         DBG("Starting %s %d, stage: %d\n", sp->name, wp->setnum, sp->stagenum);
  226 
  227         /*
  228          * Start barrier.
  229          * The tets thread to get here releases everyone and starts the timer.
  230          */
  231         pthread_mutex_lock(&funnel);
  232         threads_ready++;
  233         if (threads_ready == threads) {
  234                 pthread_mutex_unlock(&funnel);
  235                 if (halting) {
  236                         printf("  all threads ready for process %d, "
  237                                 "hit any key to start", getpid());
  238                         fflush(stdout);
  239                         (void) getchar();
  240                 }
  241                 pthread_cond_broadcast(&barrier);
  242                 timer = mach_absolute_time();
  243         } else {
  244                 pthread_cond_wait(&barrier, &funnel);
  245                 pthread_mutex_unlock(&funnel);
  246         }
  247 
  248         do {
  249                 work_t          *workp;
  250 
  251                 /*
  252                  * Get a buffer from the input queue.
  253                  * Block if none.
  254                  * Quit if all work done.
  255                  */
  256                 pthread_mutex_lock(&sp->input->mtx);
  257                 while (1) {
  258                         if (sp->work_todo == 0) {
  259                                 pthread_mutex_unlock(&sp->input->mtx);
  260                                 goto out;
  261                         }
  262                         workp = TAILQ_FIRST(&(sp->input->queue));
  263                         if (workp != NULL)
  264                                 break;
  265                         DBG("    %s[%d,%d] todo %d waiting for buffer\n",
  266                                 sp->name, wp->setnum, sp->stagenum, sp->work_todo);
  267                         sp->input->waiters++;
  268                         pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
  269                         sp->input->waiters--;
  270                 }
  271                 TAILQ_REMOVE(&(sp->input->queue), workp, link);
  272                 iteration = sp->work_todo--;
  273                 pthread_mutex_unlock(&sp->input->mtx);
  274 
  275                 if (is_producer) {
  276                         workp->number = iteration;
  277                         workp->tag = wp->setnum;
  278                 } else {
  279                         if (affinity && current_tag != workp->tag) {
  280                                 affinity_set(workp->tag);
  281                                 current_tag = workp->tag;
  282                         }
  283                 }
  284 
  285                 DBG("  %s[%d,%d] todo %d work %p data %p\n",
  286                         sp->name, wp->setnum, sp->stagenum, iteration, workp, workp->data);
  287 
  288                 /* Do our stuff with the buffer */
  289                 (void) sp->fn(workp->data, workp->isize);
  290 
  291                 /*
  292                  * Place the buffer on the input queue of the next stage.
  293                  * Signal waiters if required.
  294                  */
  295                 pthread_mutex_lock(&sp->output->mtx);
  296                 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
  297                 if (sp->output->waiters) {
  298                         DBG("    %s[%d,%d] todo %d signaling work\n",
  299                                 sp->name, wp->setnum, sp->stagenum, iteration);
  300                         pthread_cond_signal(&sp->output->cnd);
  301                 }
  302                 pthread_mutex_unlock(&sp->output->mtx);
  303 
  304         } while (1);
  305 
  306 out:
  307         pthread_cond_broadcast(&sp->output->cnd);
  308 
  309         DBG("Ending %s[%d,%d]\n", sp->name, wp->setnum, sp->stagenum);
  310 
  311         return (void *) iteration;
  312 }
  313 
  314 void (*producer_fnp)(int *data, int isize) = &writer_fn;
  315 void (*consumer_fnp)(int *data, int isize) = &reader_fn;
  316 
  317 int
  318 main(int argc, char *argv[])
  319 {
  320         int                     i;
  321         int                     j;
  322         int                     k;
  323         int                     pages = 256; /* 1MB */
  324         int                     buffers = 2;
  325         int                     producers = 2;
  326         int                     consumers = 2;
  327         int                     stages = 2;
  328         int                     *status;
  329         stage_info_t            *stage_info;
  330         stage_info_t            *sp;
  331         worker_info_t           *worker_info;
  332         worker_info_t           *wp;
  333         kern_return_t           ret;
  334         int                     c;
  335 
  336         /* Do switch parsing: */
  337         while ((c = getopt (argc, argv, "ab:i:p:s:twv:")) != -1) {
  338                 switch (c) {
  339                 case 'a':
  340 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
  341                         affinity = !affinity;
  342                         break;
  343 #else
  344                         usage();
  345 #endif
  346                 case 'b':
  347                         buffers = atoi(optarg);
  348                         break;
  349                 case 'i':
  350                         iterations = atoi(optarg);
  351                         break;
  352                 case 'p':
  353                         pages = atoi(optarg);
  354                         break;
  355                 case 's':
  356                         stages = atoi(optarg);
  357                         if (stages >= WORKERS_MAX)
  358                                 usage();
  359                         break;
  360                 case 't':
  361                         halting = TRUE;
  362                         break;
  363                 case 'w':
  364                         consumer_fnp = &reader_writer_fn;
  365                         break;
  366                 case 'v':
  367                         verbosity = atoi(optarg);
  368                         break;
  369                 case 'h':
  370                 case '?':
  371                 default:
  372                         usage();
  373                 }
  374         }
  375         argc -= optind; argv += optind;
  376         if (argc > 0)
  377                 producers = atoi(*argv);
  378         argc--; argv++;
  379         if (argc > 0)
  380                 consumers = atoi(*argv);
  381         
  382         pthread_mutex_init(&funnel, NULL);
  383         pthread_cond_init(&barrier, NULL);
  384 
  385         /*
  386          * Fire up the worker threads.
  387          */
  388         threads = consumers * (stages - 1) + producers;
  389         mutter("Launching %d producer%s with %d stage%s of %d consumer%s\n"
  390                 "  with %saffinity, consumer reads%s data\n",
  391                 producers, s_if_plural(producers),
  392                 stages - 1, s_if_plural(stages - 1),
  393                 consumers, s_if_plural(consumers),
  394                 affinity? "": "no ",
  395                 (consumer_fnp == &reader_writer_fn)? " and writes" : "");
  396         if (pages < 256)
  397                 mutter("  %dkB bytes per buffer, ", pages * 4);
  398         else
  399                 mutter("  %dMB bytes per buffer, ", pages / 256);
  400         mutter("%d buffer%s per producer ",
  401                 buffers, s_if_plural(buffers));
  402         if (buffers * pages < 256)
  403                 mutter("(total %dkB)\n", buffers * pages * 4);
  404         else
  405                 mutter("(total %dMB)\n", buffers * pages / 256);
  406         mutter("  processing %d buffer%s...\n",
  407                 iterations, s_if_plural(iterations));
  408 
  409         stage_info = (stage_info_t *) malloc(stages * sizeof(stage_info_t));
  410         worker_info = (worker_info_t *) malloc(threads * sizeof(worker_info_t));
  411 
  412         /* Set up the queue for the workers of this thread set: */
  413         for (i = 0; i < stages; i++) {
  414                 sp = &stage_info[i];
  415                 sp->stagenum = i;
  416                 pthread_mutex_init(&sp->bufq.mtx, NULL);
  417                 pthread_cond_init(&sp->bufq.cnd, NULL);
  418                 TAILQ_INIT(&sp->bufq.queue);
  419                 sp->bufq.waiters = 0;
  420                 if (i == 0) {
  421                         sp->fn = producer_fnp;
  422                         sp->name = "producer";
  423                 } else {
  424                         sp->fn = consumer_fnp;
  425                         sp->name = "consumer";
  426                 }
  427                 sp->input = &sp->bufq;
  428                 sp->output = &stage_info[(i + 1) % stages].bufq;
  429                 stage_info[i].work_todo = iterations;
  430         }
  431  
  432         /* Create the producers */
  433         for (i = 0; i < producers; i++) {
  434                 work_t  *work_array;
  435                 int     *data;
  436                 int     isize;
  437 
  438                 isize = pages * 4096 / sizeof(int);
  439                 data = (int *) malloc(buffers * pages * 4096);
  440 
  441                 /* Set up the empty work buffers */
  442                 work_array = (work_t *)  malloc(buffers * sizeof(work_t));
  443                 for (j = 0; j < buffers; j++) {
  444                         work_array[j].data = data + (isize * j);        
  445                         work_array[j].isize = isize;
  446                         work_array[j].tag = 0;
  447                         TAILQ_INSERT_TAIL(&stage_info[0].bufq.queue, &work_array[j], link);
  448                         DBG("  empty work item %p for data %p\n",
  449                                 &work_array[j], work_array[j].data);
  450                 }
  451                 wp = &worker_info[i];
  452                 wp->setnum = i + 1;
  453                 wp->stage = &stage_info[0];
  454                 if (ret = pthread_create(&wp->thread,
  455                                          NULL,
  456                                          &manager_fn,
  457                                          (void *) wp))
  458                         err(1, "pthread_create %d,%d", 0, i);
  459         }
  460 
  461         /* Create consumers */
  462         for (i = 1; i < stages; i++) {
  463                 for (j = 0; j < consumers; j++) {
  464                         wp = &worker_info[producers + (consumers*(i-1)) + j];
  465                         wp->setnum = j + 1;
  466                         wp->stage = &stage_info[i];
  467                         if (ret = pthread_create(&wp->thread,
  468                                                 NULL,
  469                                                 &manager_fn,
  470                                                 (void *) wp))
  471                                 err(1, "pthread_create %d,%d", i, j);
  472                 }
  473         }
  474 
  475         /*
  476          * We sit back anf wait for the slaves to finish.
  477          */
  478         for (k = 0; k < threads; k++) {
  479                 int     i;
  480                 int     j;
  481 
  482                 wp = &worker_info[k];
  483                 if (k < producers) {
  484                         i = 0;
  485                         j = k;
  486                 } else {
  487                         i = (k - producers) / consumers;
  488                         j = (k - producers) % consumers;
  489                 }
  490                 if(ret = pthread_join(wp->thread, (void **)&status))
  491                     err(1, "pthread_join %d,%d", i, j);
  492                 DBG("Thread %d,%d status %d\n", i, j, status);
  493         }
  494 
  495         /*
  496          * See how long the work took.
  497          */
  498         timer = mach_absolute_time() - timer;
  499         timer = timer / 1000000ULL;
  500         printf("%d.%03d seconds elapsed.\n",
  501                 (int) (timer/1000ULL), (int) (timer % 1000ULL));
  502 
  503         return 0;
  504 }

Cache object: a194433879b8d180710b28ed166a2c37


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.