The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/tools/tests/affinity/sets.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

#include <AvailabilityMacros.h>
#ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
#include </System/Library/Frameworks/System.framework/PrivateHeaders/mach/thread_policy.h>
#endif
#include <mach/mach.h>
#include <mach/mach_error.h>
#include <mach/mach_time.h>

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/sysctl.h>

#include <err.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
   16 
   17 /*
   18  * Sets is a multithreaded test/benchmarking program to evaluate
   19  * affinity set placement in Leopard.
   20  *
   21  * The picture here, for each set, is:
   22  *  
   23  *       free                   work
   24  *    -> queue --> producer --> queue --> consumer --
   25  *   |                                               |
   26  *    -----------------------------------------------
   27  *
   28  *       <------ "stage" -----> <------ "stage" ----->
   29  *
   30  * We spin off sets of production line threads (2 sets by default).
   31  * All threads of each line set the same affinity tag (when affinity is enabled).
   32  * By default there are 2 stage (worker) threads per production line.
   33  * A worker thread removes a buffer from an input queue, processes it and
   34  * queues it on an output queue.  By default the initial stage (producer)
   35  * writes every int in a buffer and the other (consumer) stages read every
   36  * int. By default the buffers are 1MB (256 pages) in size but this can be
   37  * overridden.  By default there are 2 buffers per set (again overridable).
   38  * Worker threads process (iterate over) 10000 buffers by default.
   39  *
   40  * With affinity enabled, each producer and consumer thread sets its affinity
   41  * to the set number, 1 .. N. So the threads of each set share an L2 cache.
   42  *
   43  * Buffer management uses pthread mutex/condition variables. A thread blocks
   44  * when no buffer is available on a queue and it is signaled when a buffer
   45  * is placed on an empty queue. Queues are TAILQs from <sys/queue.h>.
   46  * The queue management is centralized in a single routine: what queues to
   47  * use as input and output and what function to call for processing is
   48  * data-driven.
   49  */
   50   
   51 pthread_mutex_t funnel;
   52 pthread_cond_t  barrier;
   53 
   54 uint64_t        timer;
   55 int             threads;
   56 int             threads_ready = 0;
   57 
   58 int             iterations = 10000;
   59 boolean_t       affinity = FALSE;
   60 boolean_t       halting = FALSE;
   61 boolean_t       cache_config = FALSE;
   62 int             verbosity = 1;
   63 
   64 typedef struct work {
   65         TAILQ_ENTRY(work)       link;
   66         int                     *data;
   67 } work_t;
   68 
   69 /*
   70  * A work queue, complete with pthread objects for its management
   71  */
   72 typedef struct work_queue {
   73         pthread_mutex_t         mtx;
   74         pthread_cond_t          cnd;
   75         TAILQ_HEAD(, work)      queue;
   76         boolean_t               waiters;
   77 } work_queue_t;
   78 
   79 /* Worker functions take a integer array and size */
   80 typedef void (worker_fn_t)(int *, int); 
   81 
   82 /* This struct controls the function of a thread */
   83 typedef struct {
   84         int                     stagenum;
   85         char                    *name;
   86         worker_fn_t             *fn;
   87         work_queue_t            *input;         
   88         work_queue_t            *output;                
   89         struct line_info        *set;
   90         pthread_t               thread;
   91         work_queue_t            bufq;
   92 } stage_info_t;
   93 
   94 /* This defines a thread set */
   95 #define WORKERS_MAX 10
   96 typedef struct line_info {
   97         int                     setnum;
   98         int                     *data;
   99         int                     isize;
  100         stage_info_t            *stage[WORKERS_MAX];
  101 } line_info_t;
  102 
  103 #define DBG(x...) do {                          \
  104         if (verbosity > 1) {                    \
  105                 pthread_mutex_lock(&funnel);    \
  106                 printf(x);                      \
  107                 pthread_mutex_unlock(&funnel);  \
  108         }                                       \
  109 } while (0)
  110 
  111 #define mutter(x...) do {                       \
  112         if (verbosity > 0) {                    \
  113                 printf(x);                      \
  114         }                                       \
  115 } while (0)
  116 
  117 #define s_if_plural(x)  (((x) > 1) ? "s" : "")
  118 
  119 static void
  120 usage()
  121 {
  122         fprintf(stderr,
  123 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
  124                 "usage: sets [-a]   Turn affinity on (off)\n"
  125                 "            [-b B] Number of buffers per set/line (2)\n"
  126 #else
  127                 "usage: sets [-b B] Number of buffers per set/line (2)\n"
  128 #endif
  129                 "            [-c]   Configure for max cache performance\n"
  130                 "            [-h]   Print this\n"
  131                 "            [-i I] Number of items/buffers to process (1000)\n"
  132                 "            [-s S] Number of stages per set/line (2)\n"
  133                 "            [-t]   Halt for keyboard input to start\n"
  134                 "            [-p P] Number of pages per buffer (256=1MB)]\n"
  135                 "            [-w]   Consumer writes data\n"
  136                 "            [-v V] Level of verbosity 0..2 (1)\n"
  137                 "            [N]    Number of sets/lines (2)\n"
  138         );
  139         exit(1);
  140 }
  141 
  142 /* Trivial producer: write to each byte */
  143 void
  144 writer_fn(int *data, int isize)
  145 {
  146         int     i;
  147 
  148         for (i = 0; i < isize; i++) {
  149                 data[i] = i;
  150         }
  151 }
  152 
  153 /* Trivial consumer: read each byte */
  154 void
  155 reader_fn(int *data, int isize)
  156 {
  157         int     i;
  158         int     datum;
  159 
  160         for (i = 0; i < isize; i++) {
  161                 datum = data[i];
  162         }
  163 }
  164 
  165 /* Consumer reading and writing the buffer */
  166 void
  167 reader_writer_fn(int *data, int isize)
  168 {
  169         int     i;
  170 
  171         for (i = 0; i < isize; i++) {
  172                 data[i] += 1;
  173         }
  174 }
  175 
  176 /*
  177  * This is the central function for every thread.
  178  * For each invocation, its role is ets by (a pointer to) a stage_info_t.
  179  */
  180 void *
  181 manager_fn(void *arg)
  182 {
  183         stage_info_t                    *sp = (stage_info_t *) arg;
  184         line_info_t                     *lp = sp->set;
  185         kern_return_t                   ret;
  186         long                            iteration = 0;
  187 
  188         /*
  189          * If we're using affinity sets (we are by default)
  190          * set our tag to by our thread set number.
  191          */
  192 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
  193         thread_extended_policy_data_t   epolicy;
  194         thread_affinity_policy_data_t   policy;
  195 
  196         epolicy.timeshare = FALSE;
  197         ret = thread_policy_set(
  198                         mach_thread_self(), THREAD_EXTENDED_POLICY,
  199                         (thread_policy_t) &epolicy,
  200                         THREAD_EXTENDED_POLICY_COUNT);
  201         if (ret != KERN_SUCCESS)
  202                 printf("thread_policy_set(THREAD_EXTENDED_POLICY) returned %d\n", ret);
  203         
  204         if (affinity) {
  205                 policy.affinity_tag = lp->setnum;
  206                 ret = thread_policy_set(
  207                                 mach_thread_self(), THREAD_AFFINITY_POLICY,
  208                                 (thread_policy_t) &policy,
  209                                 THREAD_AFFINITY_POLICY_COUNT);
  210                 if (ret != KERN_SUCCESS)
  211                         printf("thread_policy_set(THREAD_AFFINITY_POLICY) returned %d\n", ret);
  212         }
  213 #endif
  214 
  215         DBG("Starting %s set: %d stage: %d\n", sp->name, lp->setnum, sp->stagenum);
  216 
  217         /*
  218          * Start barrier.
  219          * The tets thread to get here releases everyone and starts the timer.
  220          */
  221         pthread_mutex_lock(&funnel);
  222         threads_ready++;
  223         if (threads_ready == threads) {
  224                 pthread_mutex_unlock(&funnel);
  225                 if (halting) {
  226                         printf("  all threads ready for process %d, "
  227                                 "hit any key to start", getpid());
  228                         fflush(stdout);
  229                         (void) getchar();
  230                 }
  231                 pthread_cond_broadcast(&barrier);
  232                 timer = mach_absolute_time();
  233         } else {
  234                 pthread_cond_wait(&barrier, &funnel);
  235                 pthread_mutex_unlock(&funnel);
  236         }
  237 
  238         do {
  239                 int             i;
  240                 work_t          *workp;
  241 
  242                 /*
  243                  * Get a buffer from the input queue.
  244                  * Block if none.
  245                  */
  246                 pthread_mutex_lock(&sp->input->mtx);
  247                 while (1) {
  248                         workp = TAILQ_FIRST(&(sp->input->queue));
  249                         if (workp != NULL)
  250                                 break;
  251                         DBG("    %s[%d,%d] iteration %d waiting for buffer\n",
  252                                 sp->name, lp->setnum, sp->stagenum, iteration);
  253                         sp->input->waiters = TRUE;
  254                         pthread_cond_wait(&sp->input->cnd, &sp->input->mtx);
  255                         sp->input->waiters = FALSE;
  256                 }
  257                 TAILQ_REMOVE(&(sp->input->queue), workp, link);
  258                 pthread_mutex_unlock(&sp->input->mtx);
  259 
  260                 DBG("  %s[%d,%d] iteration %d work %p data %p\n",
  261                         sp->name, lp->setnum, sp->stagenum, iteration, workp, workp->data);
  262 
  263                 /* Do our stuff with the buffer */
  264                 (void) sp->fn(workp->data, lp->isize);
  265 
  266                 /*
  267                  * Place the buffer on the input queue.
  268                  * Signal  waiters if required.
  269                  */
  270                 pthread_mutex_lock(&sp->output->mtx);
  271                 TAILQ_INSERT_TAIL(&(sp->output->queue), workp, link);
  272                 if (sp->output->waiters) {
  273                         DBG("    %s[%d,%d] iteration %d signaling work\n",
  274                                 sp->name, lp->setnum, sp->stagenum, iteration);
  275                         pthread_cond_signal(&sp->output->cnd);
  276                 }
  277                 pthread_mutex_unlock(&sp->output->mtx);
  278         } while (++iteration < iterations);
  279 
  280         DBG("Ending %s[%d,%d]\n", sp->name, lp->setnum, sp->stagenum);
  281 
  282         return (void *) iteration;
  283 }
  284 
  285 static void
  286 auto_config(int npages, int *nbufs, int *nsets)
  287 {
  288         int     len;
  289         int     ncpu;
  290         int64_t cacheconfig[10];
  291         int64_t cachesize[10];
  292 
  293         mutter("Autoconfiguring...\n");
  294 
  295         len = sizeof(cacheconfig);
  296         if (sysctlbyname("hw.cacheconfig",
  297                          &cacheconfig[0], &len, NULL, 0) != 0) {
  298                 printf("Unable to get hw.cacheconfig, %d\n", errno);
  299                 exit(1);
  300         }
  301         len = sizeof(cachesize);
  302         if (sysctlbyname("hw.cachesize",
  303                          &cachesize[0],  &len, NULL, 0) != 0) {
  304                 printf("Unable to get hw.cachesize, %d\n", errno);
  305                 exit(1);
  306         }
  307 
  308         /*
  309          * Calculate number of buffers of size pages*4096 bytes
  310          * fit into 90% of an L2 cache.
  311          */
  312         *nbufs = cachesize[2] * 9 / (npages * 4096 * 10);
  313         mutter("  L2 cache %qd bytes: "
  314                 "using %d buffers of size %d bytes\n",
  315                 cachesize[2], *nbufs, (npages * 4096));
  316 
  317         /* 
  318          * Calcalute how many sets:
  319          */
  320         *nsets = cacheconfig[0]/cacheconfig[2];
  321         mutter("  %qd cpus; %qd cpus per L2 cache: using %d sets\n",
  322                 cacheconfig[0], cacheconfig[2], *nsets);
  323 }
  324 
  325 void (*producer_fnp)(int *data, int isize) = &writer_fn;
  326 void (*consumer_fnp)(int *data, int isize) = &reader_fn;
  327 
  328 int
  329 main(int argc, char *argv[])
  330 {
  331         int                     i;
  332         int                     j;
  333         int                     pages = 256; /* 1MB */
  334         int                     buffers = 2;
  335         int                     sets = 2;
  336         int                     stages = 2;
  337         int                     *status;
  338         line_info_t             *line_info;
  339         line_info_t             *lp;
  340         stage_info_t            *stage_info;
  341         stage_info_t            *sp;
  342         kern_return_t           ret;
  343         int                     c;
  344 
  345         /* Do switch parsing: */
  346         while ((c = getopt (argc, argv, "ab:chi:p:s:twv:")) != -1) {
  347                 switch (c) {
  348                 case 'a':
  349 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_5_AND_LATER
  350                         affinity = !affinity;
  351                         break;
  352 #else
  353                         usage();
  354 #endif
  355                 case 'b':
  356                         buffers = atoi(optarg);
  357                         break;
  358                 case 'c':
  359                         cache_config = TRUE;
  360                         break;
  361                 case 'i':
  362                         iterations = atoi(optarg);
  363                         break;
  364                 case 'p':
  365                         pages = atoi(optarg);
  366                         break;
  367                 case 's':
  368                         stages = atoi(optarg);
  369                         if (stages >= WORKERS_MAX)
  370                                 usage();
  371                         break;
  372                 case 't':
  373                         halting = TRUE;
  374                         break;
  375                 case 'w':
  376                         consumer_fnp = &reader_writer_fn;
  377                         break;
  378                 case 'v':
  379                         verbosity = atoi(optarg);
  380                         break;
  381                 case '?':
  382                 case 'h':
  383                 default:
  384                         usage();
  385                 }
  386         }
  387         argc -= optind; argv += optind;
  388         if (argc > 0)
  389                 sets = atoi(*argv);
  390 
  391         if (cache_config)
  392                 auto_config(pages, &buffers, &sets);
  393 
  394         pthread_mutex_init(&funnel, NULL);
  395         pthread_cond_init(&barrier, NULL);
  396 
  397         /*
  398          * Fire up the worker threads.
  399          */
  400         threads = sets * stages;
  401         mutter("Launching %d set%s of %d threads with %saffinity, "
  402                         "consumer reads%s data\n",
  403                 sets, s_if_plural(sets), stages, affinity? "": "no ",
  404                 (consumer_fnp == &reader_writer_fn)? " and writes" : "");
  405         if (pages < 256)
  406                 mutter("  %dkB bytes per buffer, ", pages * 4);
  407         else
  408                 mutter("  %dMB bytes per buffer, ", pages / 256);
  409         mutter("%d buffer%s per set ",
  410                 buffers, s_if_plural(buffers));
  411         if (buffers * pages < 256)
  412                 mutter("(total %dkB)\n", buffers * pages * 4);
  413         else
  414                 mutter("(total %dMB)\n", buffers * pages / 256);
  415         mutter("  processing %d buffer%s...\n",
  416                 iterations, s_if_plural(iterations));
  417         line_info = (line_info_t *) malloc(sets * sizeof(line_info_t));
  418         stage_info = (stage_info_t *) malloc(sets * stages * sizeof(stage_info_t));
  419         for (i = 0; i < sets; i++) {
  420                 work_t  *work_array;
  421 
  422                 lp = &line_info[i];
  423 
  424                 lp->setnum = i + 1;
  425                 lp->isize = pages * 4096 / sizeof(int);
  426                 lp->data = (int *) malloc(buffers * pages * 4096);
  427 
  428                 /* Set up the queue for the workers of this thread set: */
  429                 for (j = 0; j < stages; j++) {
  430                         sp = &stage_info[(i*stages) + j];
  431                         sp->stagenum = j;
  432                         sp->set = lp;
  433                         lp->stage[j] = sp;
  434                         pthread_mutex_init(&sp->bufq.mtx, NULL);
  435                         pthread_cond_init(&sp->bufq.cnd, NULL);
  436                         TAILQ_INIT(&sp->bufq.queue);
  437                         sp->bufq.waiters = FALSE;
  438                 }
  439 
  440                 /*
  441                  * Take a second pass through the stages
  442                  * to define what the workers are and to interconnect their input/outputs
  443                  */
  444                 for (j = 0; j < stages; j++) {
  445                         sp = lp->stage[j];
  446                         if (j == 0) {
  447                                 sp->fn = producer_fnp;
  448                                 sp->name = "producer";
  449                         } else {
  450                                 sp->fn = consumer_fnp;
  451                                 sp->name = "consumer";
  452                         }
  453                         sp->input = &lp->stage[j]->bufq;
  454                         sp->output = &lp->stage[(j + 1) % stages]->bufq;
  455                 }
  456 
  457                 /* Set up the buffers on the first worker of the set. */
  458                 work_array = (work_t *)  malloc(buffers * sizeof(work_t));
  459                 for (j = 0; j < buffers; j++) {
  460                         work_array[j].data = lp->data + (lp->isize * j);        
  461                         TAILQ_INSERT_TAIL(&lp->stage[0]->bufq.queue, &work_array[j], link);
  462                         DBG("  empty work item %p for set %d data %p\n",
  463                                 &work_array[j], i, work_array[j].data);
  464                 }
  465 
  466                 /* Create this set of threads */
  467                 for (j = 0; j < stages; j++) {
  468                         if (ret = pthread_create(&lp->stage[j]->thread, NULL,
  469                                         &manager_fn,
  470                                         (void *) lp->stage[j]))
  471                         err(1, "pthread_create %d,%d", i, j);
  472                 }
  473         }
  474 
  475         /*
  476          * We sit back anf wait for the slave to finish.
  477          */
  478         for (i = 0; i < sets; i++) {
  479                 lp = &line_info[i];
  480                 for (j = 0; j < stages; j++) {
  481                         if(ret = pthread_join(lp->stage[j]->thread, (void **)&status))
  482                             err(1, "pthread_join %d,%d", i, j);
  483                         DBG("Thread %d,%d status %d\n", i, j, status);
  484                 }
  485         }
  486 
  487         /*
  488          * See how long the work took.
  489          */
  490         timer = mach_absolute_time() - timer;
  491         timer = timer / 1000000ULL;
  492         printf("%d.%03d seconds elapsed.\n",
  493                 (int) (timer/1000ULL), (int) (timer % 1000ULL));
  494 
  495         return 0;
  496 }

Cache object: 4ce96801184a2830564917dcc43b8902


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.