The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/port/qio.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 #include        "u.h"
    2 #include        "../port/lib.h"
    3 #include        "mem.h"
    4 #include        "dat.h"
    5 #include        "fns.h"
    6 #include        "../port/error.h"
    7 
    8 static ulong padblockcnt;
    9 static ulong concatblockcnt;
   10 static ulong pullupblockcnt;
   11 static ulong copyblockcnt;
   12 static ulong consumecnt;
   13 static ulong producecnt;
   14 static ulong qcopycnt;
   15 
   16 static int debugging;
   17 
   18 #define QDEBUG  if(0)
   19 
   20 /*
   21  *  IO queues
   22  */
   23 typedef struct Queue    Queue;
   24 
   25 struct Queue
   26 {
   27         Lock;
   28 
   29         Block*  bfirst;         /* buffer */
   30         Block*  blast;
   31 
   32         int     len;            /* bytes allocated to queue */
   33         int     dlen;           /* data bytes in queue */
   34         int     limit;          /* max bytes in queue */
   35         int     inilim;         /* initial limit */
   36         int     state;
   37         int     noblock;        /* true if writes return immediately when q full */
   38         int     eof;            /* number of eofs read by user */
   39 
   40         void    (*kick)(void*); /* restart output */
   41         void    (*bypass)(void*, Block*);       /* bypass queue altogether */
   42         void*   arg;            /* argument to kick */
   43 
   44         QLock   rlock;          /* mutex for reading processes */
   45         Rendez  rr;             /* process waiting to read */
   46         QLock   wlock;          /* mutex for writing processes */
   47         Rendez  wr;             /* process waiting to write */
   48 
   49         char    err[ERRMAX];
   50 };
   51 
   52 enum
   53 {
   54         Maxatomic       = 64*1024,
   55 };
   56 
   57 uint    qiomaxatomic = Maxatomic;
   58 
   59 void
   60 ixsummary(void)
   61 {
   62         debugging ^= 1;
   63         iallocsummary();
   64         print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
   65                 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
   66         print("consume %lud, produce %lud, qcopy %lud\n",
   67                 consumecnt, producecnt, qcopycnt);
   68 }
   69 
   70 /*
   71  *  free a list of blocks
   72  */
   73 void
   74 freeblist(Block *b)
   75 {
   76         Block *next;
   77 
   78         for(; b != 0; b = next){
   79                 next = b->next;
   80                 if(b->ref == 1)
   81                         b->next = nil;
   82                 freeb(b);
   83         }
   84 }
   85 
   86 /*
   87  *  pad a block to the front (or the back if size is negative)
   88  */
   89 Block*
   90 padblock(Block *bp, int size)
   91 {
   92         int n;
   93         Block *nbp;
   94 
   95         QDEBUG checkb(bp, "padblock 1");
   96         if(size >= 0){
   97                 if(bp->rp - bp->base >= size){
   98                         bp->rp -= size;
   99                         return bp;
  100                 }
  101 
  102                 if(bp->next)
  103                         panic("padblock %#p", getcallerpc(&bp));
  104                 n = BLEN(bp);
  105                 padblockcnt++;
  106                 nbp = allocb(size+n);
  107                 nbp->rp += size;
  108                 nbp->wp = nbp->rp;
  109                 memmove(nbp->wp, bp->rp, n);
  110                 nbp->wp += n;
  111                 freeb(bp);
  112                 nbp->rp -= size;
  113         } else {
  114                 size = -size;
  115 
  116                 if(bp->next)
  117                         panic("padblock %#p", getcallerpc(&bp));
  118 
  119                 if(bp->lim - bp->wp >= size)
  120                         return bp;
  121 
  122                 n = BLEN(bp);
  123                 padblockcnt++;
  124                 nbp = allocb(size+n);
  125                 memmove(nbp->wp, bp->rp, n);
  126                 nbp->wp += n;
  127                 freeb(bp);
  128         }
  129         QDEBUG checkb(nbp, "padblock 1");
  130         return nbp;
  131 }
  132 
  133 /*
  134  *  return count of bytes in a string of blocks
  135  */
  136 int
  137 blocklen(Block *bp)
  138 {
  139         int len;
  140 
  141         len = 0;
  142         while(bp) {
  143                 len += BLEN(bp);
  144                 bp = bp->next;
  145         }
  146         return len;
  147 }
  148 
  149 /*
  150  * return count of space in blocks
  151  */
  152 int
  153 blockalloclen(Block *bp)
  154 {
  155         int len;
  156 
  157         len = 0;
  158         while(bp) {
  159                 len += BALLOC(bp);
  160                 bp = bp->next;
  161         }
  162         return len;
  163 }
  164 
  165 /*
  166  *  copy the  string of blocks into
  167  *  a single block and free the string
  168  */
  169 Block*
  170 concatblock(Block *bp)
  171 {
  172         int len;
  173         Block *nb, *f;
  174 
  175         if(bp->next == 0)
  176                 return bp;
  177 
  178         nb = allocb(blocklen(bp));
  179         for(f = bp; f; f = f->next) {
  180                 len = BLEN(f);
  181                 memmove(nb->wp, f->rp, len);
  182                 nb->wp += len;
  183         }
  184         concatblockcnt += BLEN(nb);
  185         freeblist(bp);
  186         QDEBUG checkb(nb, "concatblock 1");
  187         return nb;
  188 }
  189 
  190 /*
  191  *  make sure the first block has at least n bytes
  192  */
  193 Block*
  194 pullupblock(Block *bp, int n)
  195 {
  196         int i;
  197         Block *nbp;
  198 
  199         /*
  200          *  this should almost always be true, it's
  201          *  just to avoid every caller checking.
  202          */
  203         if(BLEN(bp) >= n)
  204                 return bp;
  205 
  206         /*
  207          *  if not enough room in the first block,
  208          *  add another to the front of the list.
  209          */
  210         if(bp->lim - bp->rp < n){
  211                 nbp = allocb(n);
  212                 nbp->next = bp;
  213                 bp = nbp;
  214         }
  215 
  216         /*
  217          *  copy bytes from the trailing blocks into the first
  218          */
  219         n -= BLEN(bp);
  220         while(nbp = bp->next){
  221                 i = BLEN(nbp);
  222                 if(i > n) {
  223                         memmove(bp->wp, nbp->rp, n);
  224                         pullupblockcnt++;
  225                         bp->wp += n;
  226                         nbp->rp += n;
  227                         QDEBUG checkb(bp, "pullupblock 1");
  228                         return bp;
  229                 } else {
  230                         /* shouldn't happen but why crash if it does */
  231                         if(i < 0){
  232                                 print("pullup negative length packet, called from %#p\n",
  233                                         getcallerpc(&bp));
  234                                 i = 0;
  235                         }
  236                         memmove(bp->wp, nbp->rp, i);
  237                         pullupblockcnt++;
  238                         bp->wp += i;
  239                         bp->next = nbp->next;
  240                         nbp->next = 0;
  241                         freeb(nbp);
  242                         n -= i;
  243                         if(n == 0){
  244                                 QDEBUG checkb(bp, "pullupblock 2");
  245                                 return bp;
  246                         }
  247                 }
  248         }
  249         freeb(bp);
  250         return 0;
  251 }
  252 
  253 /*
  254  *  make sure the first block has at least n bytes
  255  */
  256 Block*
  257 pullupqueue(Queue *q, int n)
  258 {
  259         Block *b;
  260 
  261         if(BLEN(q->bfirst) >= n)
  262                 return q->bfirst;
  263         q->bfirst = pullupblock(q->bfirst, n);
  264         for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
  265                 ;
  266         q->blast = b;
  267         return q->bfirst;
  268 }
  269 
  270 /*
  271  *  trim to len bytes starting at offset
  272  */
  273 Block *
  274 trimblock(Block *bp, int offset, int len)
  275 {
  276         ulong l;
  277         Block *nb, *startb;
  278 
  279         QDEBUG checkb(bp, "trimblock 1");
  280         if(blocklen(bp) < offset+len) {
  281                 freeblist(bp);
  282                 return nil;
  283         }
  284 
  285         while((l = BLEN(bp)) < offset) {
  286                 offset -= l;
  287                 nb = bp->next;
  288                 bp->next = nil;
  289                 freeb(bp);
  290                 bp = nb;
  291         }
  292 
  293         startb = bp;
  294         bp->rp += offset;
  295 
  296         while((l = BLEN(bp)) < len) {
  297                 len -= l;
  298                 bp = bp->next;
  299         }
  300 
  301         bp->wp -= (BLEN(bp) - len);
  302 
  303         if(bp->next) {
  304                 freeblist(bp->next);
  305                 bp->next = nil;
  306         }
  307 
  308         return startb;
  309 }
  310 
  311 /*
  312  *  copy 'count' bytes into a new block
  313  */
  314 Block*
  315 copyblock(Block *bp, int count)
  316 {
  317         int l;
  318         Block *nbp;
  319 
  320         QDEBUG checkb(bp, "copyblock 0");
  321         nbp = allocb(count);
  322         for(; count > 0 && bp != 0; bp = bp->next){
  323                 l = BLEN(bp);
  324                 if(l > count)
  325                         l = count;
  326                 memmove(nbp->wp, bp->rp, l);
  327                 nbp->wp += l;
  328                 count -= l;
  329         }
  330         if(count > 0){
  331                 memset(nbp->wp, 0, count);
  332                 nbp->wp += count;
  333         }
  334         copyblockcnt++;
  335         QDEBUG checkb(nbp, "copyblock 1");
  336 
  337         return nbp;
  338 }
  339 
  340 Block*
  341 adjustblock(Block* bp, int len)
  342 {
  343         int n;
  344         Block *nbp;
  345 
  346         if(len < 0){
  347                 freeb(bp);
  348                 return nil;
  349         }
  350 
  351         if(bp->rp+len > bp->lim){
  352                 nbp = copyblock(bp, len);
  353                 freeblist(bp);
  354                 QDEBUG checkb(nbp, "adjustblock 1");
  355 
  356                 return nbp;
  357         }
  358 
  359         n = BLEN(bp);
  360         if(len > n)
  361                 memset(bp->wp, 0, len-n);
  362         bp->wp = bp->rp+len;
  363         QDEBUG checkb(bp, "adjustblock 2");
  364 
  365         return bp;
  366 }
  367 
  368 
  369 /*
  370  *  throw away up to count bytes from a
  371  *  list of blocks.  Return count of bytes
  372  *  thrown away.
  373  */
  374 int
  375 pullblock(Block **bph, int count)
  376 {
  377         Block *bp;
  378         int n, bytes;
  379 
  380         bytes = 0;
  381         if(bph == nil)
  382                 return 0;
  383 
  384         while(*bph != nil && count != 0) {
  385                 bp = *bph;
  386                 n = BLEN(bp);
  387                 if(count < n)
  388                         n = count;
  389                 bytes += n;
  390                 count -= n;
  391                 bp->rp += n;
  392                 QDEBUG checkb(bp, "pullblock ");
  393                 if(BLEN(bp) == 0) {
  394                         *bph = bp->next;
  395                         bp->next = nil;
  396                         freeb(bp);
  397                 }
  398         }
  399         return bytes;
  400 }
  401 
  402 /*
  403  *  get next block from a queue, return null if nothing there
  404  */
  405 Block*
  406 qget(Queue *q)
  407 {
  408         int dowakeup;
  409         Block *b;
  410 
  411         /* sync with qwrite */
  412         ilock(q);
  413 
  414         b = q->bfirst;
  415         if(b == nil){
  416                 q->state |= Qstarve;
  417                 iunlock(q);
  418                 return nil;
  419         }
  420         q->bfirst = b->next;
  421         b->next = 0;
  422         q->len -= BALLOC(b);
  423         q->dlen -= BLEN(b);
  424         QDEBUG checkb(b, "qget");
  425 
  426         /* if writer flow controlled, restart */
  427         if((q->state & Qflow) && q->len < q->limit/2){
  428                 q->state &= ~Qflow;
  429                 dowakeup = 1;
  430         } else
  431                 dowakeup = 0;
  432 
  433         iunlock(q);
  434 
  435         if(dowakeup)
  436                 wakeup(&q->wr);
  437 
  438         return b;
  439 }
  440 
  441 /*
  442  *  throw away the next 'len' bytes in the queue
  443  */
  444 int
  445 qdiscard(Queue *q, int len)
  446 {
  447         Block *b;
  448         int dowakeup, n, sofar;
  449 
  450         ilock(q);
  451         for(sofar = 0; sofar < len; sofar += n){
  452                 b = q->bfirst;
  453                 if(b == nil)
  454                         break;
  455                 QDEBUG checkb(b, "qdiscard");
  456                 n = BLEN(b);
  457                 if(n <= len - sofar){
  458                         q->bfirst = b->next;
  459                         b->next = 0;
  460                         q->len -= BALLOC(b);
  461                         q->dlen -= BLEN(b);
  462                         freeb(b);
  463                 } else {
  464                         n = len - sofar;
  465                         b->rp += n;
  466                         q->dlen -= n;
  467                 }
  468         }
  469 
  470         /*
  471          *  if writer flow controlled, restart
  472          *
  473          *  This used to be
  474          *      q->len < q->limit/2
  475          *  but it slows down tcp too much for certain write sizes.
  476          *  I really don't understand it completely.  It may be
  477          *  due to the queue draining so fast that the transmission
  478          *  stalls waiting for the app to produce more data.  - presotto
  479          */
  480         if((q->state & Qflow) && q->len < q->limit){
  481                 q->state &= ~Qflow;
  482                 dowakeup = 1;
  483         } else
  484                 dowakeup = 0;
  485 
  486         iunlock(q);
  487 
  488         if(dowakeup)
  489                 wakeup(&q->wr);
  490 
  491         return sofar;
  492 }
  493 
  494 /*
  495  *  Interrupt level copy out of a queue, return # bytes copied.
  496  */
  497 int
  498 qconsume(Queue *q, void *vp, int len)
  499 {
  500         Block *b;
  501         int n, dowakeup;
  502         uchar *p = vp;
  503         Block *tofree = nil;
  504 
  505         /* sync with qwrite */
  506         ilock(q);
  507 
  508         for(;;) {
  509                 b = q->bfirst;
  510                 if(b == 0){
  511                         q->state |= Qstarve;
  512                         iunlock(q);
  513                         return -1;
  514                 }
  515                 QDEBUG checkb(b, "qconsume 1");
  516 
  517                 n = BLEN(b);
  518                 if(n > 0)
  519                         break;
  520                 q->bfirst = b->next;
  521                 q->len -= BALLOC(b);
  522 
  523                 /* remember to free this */
  524                 b->next = tofree;
  525                 tofree = b;
  526         };
  527 
  528         if(n < len)
  529                 len = n;
  530         memmove(p, b->rp, len);
  531         consumecnt += n;
  532         b->rp += len;
  533         q->dlen -= len;
  534 
  535         /* discard the block if we're done with it */
  536         if((q->state & Qmsg) || len == n){
  537                 q->bfirst = b->next;
  538                 b->next = 0;
  539                 q->len -= BALLOC(b);
  540                 q->dlen -= BLEN(b);
  541 
  542                 /* remember to free this */
  543                 b->next = tofree;
  544                 tofree = b;
  545         }
  546 
  547         /* if writer flow controlled, restart */
  548         if((q->state & Qflow) && q->len < q->limit/2){
  549                 q->state &= ~Qflow;
  550                 dowakeup = 1;
  551         } else
  552                 dowakeup = 0;
  553 
  554         iunlock(q);
  555 
  556         if(dowakeup)
  557                 wakeup(&q->wr);
  558 
  559         if(tofree != nil)
  560                 freeblist(tofree);
  561 
  562         return len;
  563 }
  564 
  565 int
  566 qpass(Queue *q, Block *b)
  567 {
  568         int dlen, len, dowakeup;
  569 
  570         /* sync with qread */
  571         dowakeup = 0;
  572         ilock(q);
  573         if(q->len >= q->limit){
  574                 freeblist(b);
  575                 iunlock(q);
  576                 return -1;
  577         }
  578         if(q->state & Qclosed){
  579                 len = BALLOC(b);
  580                 freeblist(b);
  581                 iunlock(q);
  582                 return len;
  583         }
  584 
  585         /* add buffer to queue */
  586         if(q->bfirst)
  587                 q->blast->next = b;
  588         else
  589                 q->bfirst = b;
  590         len = BALLOC(b);
  591         dlen = BLEN(b);
  592         QDEBUG checkb(b, "qpass");
  593         while(b->next){
  594                 b = b->next;
  595                 QDEBUG checkb(b, "qpass");
  596                 len += BALLOC(b);
  597                 dlen += BLEN(b);
  598         }
  599         q->blast = b;
  600         q->len += len;
  601         q->dlen += dlen;
  602 
  603         if(q->len >= q->limit/2)
  604                 q->state |= Qflow;
  605 
  606         if(q->state & Qstarve){
  607                 q->state &= ~Qstarve;
  608                 dowakeup = 1;
  609         }
  610         iunlock(q);
  611 
  612         if(dowakeup)
  613                 wakeup(&q->rr);
  614 
  615         return len;
  616 }
  617 
  618 int
  619 qpassnolim(Queue *q, Block *b)
  620 {
  621         int dlen, len, dowakeup;
  622 
  623         /* sync with qread */
  624         dowakeup = 0;
  625         ilock(q);
  626 
  627         if(q->state & Qclosed){
  628                 freeblist(b);
  629                 iunlock(q);
  630                 return BALLOC(b);
  631         }
  632 
  633         /* add buffer to queue */
  634         if(q->bfirst)
  635                 q->blast->next = b;
  636         else
  637                 q->bfirst = b;
  638         len = BALLOC(b);
  639         dlen = BLEN(b);
  640         QDEBUG checkb(b, "qpass");
  641         while(b->next){
  642                 b = b->next;
  643                 QDEBUG checkb(b, "qpass");
  644                 len += BALLOC(b);
  645                 dlen += BLEN(b);
  646         }
  647         q->blast = b;
  648         q->len += len;
  649         q->dlen += dlen;
  650 
  651         if(q->len >= q->limit/2)
  652                 q->state |= Qflow;
  653 
  654         if(q->state & Qstarve){
  655                 q->state &= ~Qstarve;
  656                 dowakeup = 1;
  657         }
  658         iunlock(q);
  659 
  660         if(dowakeup)
  661                 wakeup(&q->rr);
  662 
  663         return len;
  664 }
  665 
  666 /*
  667  *  if the allocated space is way out of line with the used
  668  *  space, reallocate to a smaller block
  669  */
  670 Block*
  671 packblock(Block *bp)
  672 {
  673         Block **l, *nbp;
  674         int n;
  675 
  676         for(l = &bp; *l; l = &(*l)->next){
  677                 nbp = *l;
  678                 n = BLEN(nbp);
  679                 if((n<<2) < BALLOC(nbp)){
  680                         *l = allocb(n);
  681                         memmove((*l)->wp, nbp->rp, n);
  682                         (*l)->wp += n;
  683                         (*l)->next = nbp->next;
  684                         freeb(nbp);
  685                 }
  686         }
  687 
  688         return bp;
  689 }
  690 
  691 int
  692 qproduce(Queue *q, void *vp, int len)
  693 {
  694         Block *b;
  695         int dowakeup;
  696         uchar *p = vp;
  697 
  698         /* sync with qread */
  699         dowakeup = 0;
  700         ilock(q);
  701 
  702         /* no waiting receivers, room in buffer? */
  703         if(q->len >= q->limit){
  704                 q->state |= Qflow;
  705                 iunlock(q);
  706                 return -1;
  707         }
  708 
  709         /* save in buffer */
  710         b = iallocb(len);
  711         if(b == 0){
  712                 iunlock(q);
  713                 return 0;
  714         }
  715         memmove(b->wp, p, len);
  716         producecnt += len;
  717         b->wp += len;
  718         if(q->bfirst)
  719                 q->blast->next = b;
  720         else
  721                 q->bfirst = b;
  722         q->blast = b;
  723         /* b->next = 0; done by iallocb() */
  724         q->len += BALLOC(b);
  725         q->dlen += BLEN(b);
  726         QDEBUG checkb(b, "qproduce");
  727 
  728         if(q->state & Qstarve){
  729                 q->state &= ~Qstarve;
  730                 dowakeup = 1;
  731         }
  732 
  733         if(q->len >= q->limit)
  734                 q->state |= Qflow;
  735         iunlock(q);
  736 
  737         if(dowakeup)
  738                 wakeup(&q->rr);
  739 
  740         return len;
  741 }
  742 
  743 /*
  744  *  copy from offset in the queue
  745  */
  746 Block*
  747 qcopy(Queue *q, int len, ulong offset)
  748 {
  749         int sofar;
  750         int n;
  751         Block *b, *nb;
  752         uchar *p;
  753 
  754         nb = allocb(len);
  755 
  756         ilock(q);
  757 
  758         /* go to offset */
  759         b = q->bfirst;
  760         for(sofar = 0; ; sofar += n){
  761                 if(b == nil){
  762                         iunlock(q);
  763                         return nb;
  764                 }
  765                 n = BLEN(b);
  766                 if(sofar + n > offset){
  767                         p = b->rp + offset - sofar;
  768                         n -= offset - sofar;
  769                         break;
  770                 }
  771                 QDEBUG checkb(b, "qcopy");
  772                 b = b->next;
  773         }
  774 
  775         /* copy bytes from there */
  776         for(sofar = 0; sofar < len;){
  777                 if(n > len - sofar)
  778                         n = len - sofar;
  779                 memmove(nb->wp, p, n);
  780                 qcopycnt += n;
  781                 sofar += n;
  782                 nb->wp += n;
  783                 b = b->next;
  784                 if(b == nil)
  785                         break;
  786                 n = BLEN(b);
  787                 p = b->rp;
  788         }
  789         iunlock(q);
  790 
  791         return nb;
  792 }
  793 
  794 /*
  795  *  called by non-interrupt code
  796  */
  797 Queue*
  798 qopen(int limit, int msg, void (*kick)(void*), void *arg)
  799 {
  800         Queue *q;
  801 
  802         q = malloc(sizeof(Queue));
  803         if(q == 0)
  804                 return 0;
  805 
  806         q->limit = q->inilim = limit;
  807         q->kick = kick;
  808         q->arg = arg;
  809         q->state = msg;
  810         
  811         q->state |= Qstarve;
  812         q->eof = 0;
  813         q->noblock = 0;
  814 
  815         return q;
  816 }
  817 
  818 /* open a queue to be bypassed */
  819 Queue*
  820 qbypass(void (*bypass)(void*, Block*), void *arg)
  821 {
  822         Queue *q;
  823 
  824         q = malloc(sizeof(Queue));
  825         if(q == 0)
  826                 return 0;
  827 
  828         q->limit = 0;
  829         q->arg = arg;
  830         q->bypass = bypass;
  831         q->state = 0;
  832 
  833         return q;
  834 }
  835 
  836 static int
  837 notempty(void *a)
  838 {
  839         Queue *q = a;
  840 
  841         return (q->state & Qclosed) || q->bfirst != 0;
  842 }
  843 
  844 /*
  845  *  wait for the queue to be non-empty or closed.
  846  *  called with q ilocked.
  847  */
  848 static int
  849 qwait(Queue *q)
  850 {
  851         /* wait for data */
  852         for(;;){
  853                 if(q->bfirst != nil)
  854                         break;
  855 
  856                 if(q->state & Qclosed){
  857                         if(++q->eof > 3)
  858                                 return -1;
  859                         if(*q->err && strcmp(q->err, Ehungup) != 0)
  860                                 return -1;
  861                         return 0;
  862                 }
  863 
  864                 q->state |= Qstarve;    /* flag requesting producer to wake me */
  865                 iunlock(q);
  866                 sleep(&q->rr, notempty, q);
  867                 ilock(q);
  868         }
  869         return 1;
  870 }
  871 
  872 /*
  873  * add a block list to a queue
  874  */
  875 void
  876 qaddlist(Queue *q, Block *b)
  877 {
  878         /* queue the block */
  879         if(q->bfirst)
  880                 q->blast->next = b;
  881         else
  882                 q->bfirst = b;
  883         q->len += blockalloclen(b);
  884         q->dlen += blocklen(b);
  885         while(b->next)
  886                 b = b->next;
  887         q->blast = b;
  888 }
  889 
  890 /*
  891  *  called with q ilocked
  892  */
  893 Block*
  894 qremove(Queue *q)
  895 {
  896         Block *b;
  897 
  898         b = q->bfirst;
  899         if(b == nil)
  900                 return nil;
  901         q->bfirst = b->next;
  902         b->next = nil;
  903         q->dlen -= BLEN(b);
  904         q->len -= BALLOC(b);
  905         QDEBUG checkb(b, "qremove");
  906         return b;
  907 }
  908 
  909 /*
  910  *  copy the contents of a string of blocks into
  911  *  memory.  emptied blocks are freed.  return
  912  *  pointer to first unconsumed block.
  913  */
  914 Block*
  915 bl2mem(uchar *p, Block *b, int n)
  916 {
  917         int i;
  918         Block *next;
  919 
  920         for(; b != nil; b = next){
  921                 i = BLEN(b);
  922                 if(i > n){
  923                         memmove(p, b->rp, n);
  924                         b->rp += n;
  925                         return b;
  926                 }
  927                 memmove(p, b->rp, i);
  928                 n -= i;
  929                 p += i;
  930                 b->rp += i;
  931                 next = b->next;
  932                 freeb(b);
  933         }
  934         return nil;
  935 }
  936 
  937 /*
  938  *  copy the contents of memory into a string of blocks.
  939  *  return nil on error.
  940  */
  941 Block*
  942 mem2bl(uchar *p, int len)
  943 {
  944         int n;
  945         Block *b, *first, **l;
  946 
  947         first = nil;
  948         l = &first;
  949         if(waserror()){
  950                 freeblist(first);
  951                 nexterror();
  952         }
  953         do {
  954                 n = len;
  955                 if(n > Maxatomic)
  956                         n = Maxatomic;
  957 
  958                 *l = b = allocb(n);
  959                 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
  960                 memmove(b->wp, p, n);
  961                 b->wp += n;
  962                 p += n;
  963                 len -= n;
  964                 l = &b->next;
  965         } while(len > 0);
  966         poperror();
  967 
  968         return first;
  969 }
  970 
  971 /*
  972  *  put a block back to the front of the queue
  973  *  called with q ilocked
  974  */
  975 void
  976 qputback(Queue *q, Block *b)
  977 {
  978         b->next = q->bfirst;
  979         if(q->bfirst == nil)
  980                 q->blast = b;
  981         q->bfirst = b;
  982         q->len += BALLOC(b);
  983         q->dlen += BLEN(b);
  984 }
  985 
  986 /*
  987  *  flow control, get producer going again
  988  *  called with q ilocked
  989  */
  990 static void
  991 qwakeup_iunlock(Queue *q)
  992 {
  993         int dowakeup = 0;
  994 
  995         /* if writer flow controlled, restart */
  996         if((q->state & Qflow) && q->len < q->limit/2){
  997                 q->state &= ~Qflow;
  998                 dowakeup = 1;
  999         }
 1000 
 1001         iunlock(q);
 1002 
 1003         /* wakeup flow controlled writers */
 1004         if(dowakeup){
 1005                 if(q->kick)
 1006                         q->kick(q->arg);
 1007                 wakeup(&q->wr);
 1008         }
 1009 }
 1010 
 1011 /*
 1012  *  get next block from a queue (up to a limit)
 1013  */
 1014 Block*
 1015 qbread(Queue *q, int len)
 1016 {
 1017         Block *b, *nb;
 1018         int n;
 1019 
 1020         qlock(&q->rlock);
 1021         if(waserror()){
 1022                 qunlock(&q->rlock);
 1023                 nexterror();
 1024         }
 1025 
 1026         ilock(q);
 1027         switch(qwait(q)){
 1028         case 0:
 1029                 /* queue closed */
 1030                 iunlock(q);
 1031                 qunlock(&q->rlock);
 1032                 poperror();
 1033                 return nil;
 1034         case -1:
 1035                 /* multiple reads on a closed queue */
 1036                 iunlock(q);
 1037                 error(q->err);
 1038         }
 1039 
 1040         /* if we get here, there's at least one block in the queue */
 1041         b = qremove(q);
 1042         n = BLEN(b);
 1043 
 1044         /* split block if it's too big and this is not a message queue */
 1045         nb = b;
 1046         if(n > len){
 1047                 if((q->state&Qmsg) == 0){
 1048                         n -= len;
 1049                         b = allocb(n);
 1050                         memmove(b->wp, nb->rp+len, n);
 1051                         b->wp += n;
 1052                         qputback(q, b);
 1053                 }
 1054                 nb->wp = nb->rp + len;
 1055         }
 1056 
 1057         /* restart producer */
 1058         qwakeup_iunlock(q);
 1059 
 1060         poperror();
 1061         qunlock(&q->rlock);
 1062         return nb;
 1063 }
 1064 
 1065 /*
 1066  *  read a queue.  if no data is queued, post a Block
 1067  *  and wait on its Rendez.
 1068  */
 1069 long
 1070 qread(Queue *q, void *vp, int len)
 1071 {
 1072         Block *b, *first, **l;
 1073         int m, n;
 1074 
 1075         qlock(&q->rlock);
 1076         if(waserror()){
 1077                 qunlock(&q->rlock);
 1078                 nexterror();
 1079         }
 1080 
 1081         ilock(q);
 1082 again:
 1083         switch(qwait(q)){
 1084         case 0:
 1085                 /* queue closed */
 1086                 iunlock(q);
 1087                 qunlock(&q->rlock);
 1088                 poperror();
 1089                 return 0;
 1090         case -1:
 1091                 /* multiple reads on a closed queue */
 1092                 iunlock(q);
 1093                 error(q->err);
 1094         }
 1095 
 1096         /* if we get here, there's at least one block in the queue */
 1097         if(q->state & Qcoalesce){
 1098                 /* when coalescing, 0 length blocks just go away */
 1099                 b = q->bfirst;
 1100                 if(BLEN(b) <= 0){
 1101                         freeb(qremove(q));
 1102                         goto again;
 1103                 }
 1104 
 1105                 /*  grab the first block plus as many
 1106                  *  following blocks as will completely
 1107                  *  fit in the read.
 1108                  */
 1109                 n = 0;
 1110                 l = &first;
 1111                 m = BLEN(b);
 1112                 for(;;) {
 1113                         *l = qremove(q);
 1114                         l = &b->next;
 1115                         n += m;
 1116 
 1117                         b = q->bfirst;
 1118                         if(b == nil)
 1119                                 break;
 1120                         m = BLEN(b);
 1121                         if(n+m > len)
 1122                                 break;
 1123                 }
 1124         } else {
 1125                 first = qremove(q);
 1126                 n = BLEN(first);
 1127         }
 1128 
 1129         /* copy to user space outside of the ilock */
 1130         iunlock(q);
 1131         b = bl2mem(vp, first, len);
 1132         ilock(q);
 1133 
 1134         /* take care of any left over partial block */
 1135         if(b != nil){
 1136                 n -= BLEN(b);
 1137                 if(q->state & Qmsg)
 1138                         freeb(b);
 1139                 else
 1140                         qputback(q, b);
 1141         }
 1142 
 1143         /* restart producer */
 1144         qwakeup_iunlock(q);
 1145 
 1146         poperror();
 1147         qunlock(&q->rlock);
 1148         return n;
 1149 }
 1150 
 1151 static int
 1152 qnotfull(void *a)
 1153 {
 1154         Queue *q = a;
 1155 
 1156         return q->len < q->limit || (q->state & Qclosed);
 1157 }
 1158 
 1159 ulong noblockcnt;
 1160 
 1161 /*
 1162  *  add a block to a queue obeying flow control
 1163  */
 1164 long
 1165 qbwrite(Queue *q, Block *b)
 1166 {
 1167         int n, dowakeup;
 1168         Proc *p;
 1169 
 1170         n = BLEN(b);
 1171 
 1172         if(q->bypass){
 1173                 (*q->bypass)(q->arg, b);
 1174                 return n;
 1175         }
 1176 
 1177         dowakeup = 0;
 1178         qlock(&q->wlock);
 1179         if(waserror()){
 1180                 if(b != nil)
 1181                         freeb(b);
 1182                 qunlock(&q->wlock);
 1183                 nexterror();
 1184         }
 1185 
 1186         ilock(q);
 1187 
 1188         /* give up if the queue is closed */
 1189         if(q->state & Qclosed){
 1190                 iunlock(q);
 1191                 error(q->err);
 1192         }
 1193 
 1194         /* if nonblocking, don't queue over the limit */
 1195         if(q->len >= q->limit){
 1196                 if(q->noblock){
 1197                         iunlock(q);
 1198                         freeb(b);
 1199                         noblockcnt += n;
 1200                         qunlock(&q->wlock);
 1201                         poperror();
 1202                         return n;
 1203                 }
 1204         }
 1205 
 1206         /* queue the block */
 1207         if(q->bfirst)
 1208                 q->blast->next = b;
 1209         else
 1210                 q->bfirst = b;
 1211         q->blast = b;
 1212         b->next = 0;
 1213         q->len += BALLOC(b);
 1214         q->dlen += n;
 1215         QDEBUG checkb(b, "qbwrite");
 1216         b = nil;
 1217 
 1218         /* make sure other end gets awakened */
 1219         if(q->state & Qstarve){
 1220                 q->state &= ~Qstarve;
 1221                 dowakeup = 1;
 1222         }
 1223         iunlock(q);
 1224 
 1225         /*  get output going again */
 1226         if(q->kick && (dowakeup || (q->state&Qkick)))
 1227                 q->kick(q->arg);
 1228 
 1229         /* wakeup anyone consuming at the other end */
 1230         if(dowakeup){
 1231                 p = wakeup(&q->rr);
 1232 
 1233                 /* if we just wokeup a higher priority process, let it run */
 1234                 if(p != nil && p->priority > up->priority)
 1235                         sched();
 1236         }
 1237 
 1238         /*
 1239          *  flow control, wait for queue to get below the limit
 1240          *  before allowing the process to continue and queue
 1241          *  more.  We do this here so that postnote can only
 1242          *  interrupt us after the data has been queued.  This
 1243          *  means that things like 9p flushes and ssl messages
 1244          *  will not be disrupted by software interrupts.
 1245          *
 1246          *  Note - this is moderately dangerous since a process
 1247          *  that keeps getting interrupted and rewriting will
 1248          *  queue infinite crud.
 1249          */
 1250         for(;;){
 1251                 if(q->noblock || qnotfull(q))
 1252                         break;
 1253 
 1254                 ilock(q);
 1255                 q->state |= Qflow;
 1256                 iunlock(q);
 1257                 sleep(&q->wr, qnotfull, q);
 1258         }
 1259         USED(b);
 1260 
 1261         qunlock(&q->wlock);
 1262         poperror();
 1263         return n;
 1264 }
 1265 
 1266 /*
 1267  *  write to a queue.  only Maxatomic bytes at a time is atomic.
 1268  */
 1269 int
 1270 qwrite(Queue *q, void *vp, int len)
 1271 {
 1272         int n, sofar;
 1273         Block *b;
 1274         uchar *p = vp;
 1275 
 1276         QDEBUG if(!islo())
 1277                 print("qwrite hi %#p\n", getcallerpc(&q));
 1278 
 1279         sofar = 0;
 1280         do {
 1281                 n = len-sofar;
 1282                 if(n > Maxatomic)
 1283                         n = Maxatomic;
 1284 
 1285                 b = allocb(n);
 1286                 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
 1287                 if(waserror()){
 1288                         freeb(b);
 1289                         nexterror();
 1290                 }
 1291                 memmove(b->wp, p+sofar, n);
 1292                 poperror();
 1293                 b->wp += n;
 1294 
 1295                 qbwrite(q, b);
 1296 
 1297                 sofar += n;
 1298         } while(sofar < len && (q->state & Qmsg) == 0);
 1299 
 1300         return len;
 1301 }
 1302 
 1303 /*
 1304  *  used by print() to write to a queue.  Since we may be splhi or not in
 1305  *  a process, don't qlock.
 1306  *
 1307  *  this routine merges adjacent blocks if block n+1 will fit into
 1308  *  the free space of block n.
 1309  */
 1310 int
 1311 qiwrite(Queue *q, void *vp, int len)
 1312 {
 1313         int n, sofar, dowakeup;
 1314         Block *b;
 1315         uchar *p = vp;
 1316 
 1317         dowakeup = 0;
 1318 
 1319         sofar = 0;
 1320         do {
 1321                 n = len-sofar;
 1322                 if(n > Maxatomic)
 1323                         n = Maxatomic;
 1324 
 1325                 b = iallocb(n);
 1326                 if(b == nil)
 1327                         break;
 1328                 memmove(b->wp, p+sofar, n);
 1329                 b->wp += n;
 1330 
 1331                 ilock(q);
 1332 
 1333                 /* we use an artificially high limit for kernel prints since anything
 1334                  * over the limit gets dropped
 1335                  */
 1336                 if(q->dlen >= 16*1024){
 1337                         iunlock(q);
 1338                         freeb(b);
 1339                         break;
 1340                 }
 1341 
 1342                 QDEBUG checkb(b, "qiwrite");
 1343                 if(q->bfirst)
 1344                         q->blast->next = b;
 1345                 else
 1346                         q->bfirst = b;
 1347                 q->blast = b;
 1348                 q->len += BALLOC(b);
 1349                 q->dlen += n;
 1350 
 1351                 if(q->state & Qstarve){
 1352                         q->state &= ~Qstarve;
 1353                         dowakeup = 1;
 1354                 }
 1355 
 1356                 iunlock(q);
 1357 
 1358                 if(dowakeup){
 1359                         if(q->kick)
 1360                                 q->kick(q->arg);
 1361                         wakeup(&q->rr);
 1362                 }
 1363 
 1364                 sofar += n;
 1365         } while(sofar < len && (q->state & Qmsg) == 0);
 1366 
 1367         return sofar;
 1368 }
 1369 
 1370 /*
 1371  *  be extremely careful when calling this,
 1372  *  as there is no reference accounting
 1373  */
 1374 void
 1375 qfree(Queue *q)
 1376 {
 1377         qclose(q);
 1378         free(q);
 1379 }
 1380 
 1381 /*
 1382  *  Mark a queue as closed.  No further IO is permitted.
 1383  *  All blocks are released.
 1384  */
 1385 void
 1386 qclose(Queue *q)
 1387 {
 1388         Block *bfirst;
 1389 
 1390         if(q == nil)
 1391                 return;
 1392 
 1393         /* mark it */
 1394         ilock(q);
 1395         q->state |= Qclosed;
 1396         q->state &= ~(Qflow|Qstarve);
 1397         strcpy(q->err, Ehungup);
 1398         bfirst = q->bfirst;
 1399         q->bfirst = 0;
 1400         q->len = 0;
 1401         q->dlen = 0;
 1402         q->noblock = 0;
 1403         iunlock(q);
 1404 
 1405         /* free queued blocks */
 1406         freeblist(bfirst);
 1407 
 1408         /* wake up readers/writers */
 1409         wakeup(&q->rr);
 1410         wakeup(&q->wr);
 1411 }
 1412 
 1413 /*
 1414  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
 1415  *  blocks.
 1416  */
 1417 void
 1418 qhangup(Queue *q, char *msg)
 1419 {
 1420         /* mark it */
 1421         ilock(q);
 1422         q->state |= Qclosed;
 1423         if(msg == 0 || *msg == 0)
 1424                 strcpy(q->err, Ehungup);
 1425         else
 1426                 strncpy(q->err, msg, ERRMAX-1);
 1427         iunlock(q);
 1428 
 1429         /* wake up readers/writers */
 1430         wakeup(&q->rr);
 1431         wakeup(&q->wr);
 1432 }
 1433 
 1434 /*
 1435  *  return non-zero if the q is hungup
 1436  */
 1437 int
 1438 qisclosed(Queue *q)
 1439 {
 1440         return q->state & Qclosed;
 1441 }
 1442 
 1443 /*
 1444  *  mark a queue as no longer hung up
 1445  */
 1446 void
 1447 qreopen(Queue *q)
 1448 {
 1449         ilock(q);
 1450         q->state &= ~Qclosed;
 1451         q->state |= Qstarve;
 1452         q->eof = 0;
 1453         q->limit = q->inilim;
 1454         iunlock(q);
 1455 }
 1456 
 1457 /*
 1458  *  return bytes queued
 1459  */
 1460 int
 1461 qlen(Queue *q)
 1462 {
 1463         return q->dlen;
 1464 }
 1465 
 1466 /*
 1467  * return space remaining before flow control
 1468  */
 1469 int
 1470 qwindow(Queue *q)
 1471 {
 1472         int l;
 1473 
 1474         l = q->limit - q->len;
 1475         if(l < 0)
 1476                 l = 0;
 1477         return l;
 1478 }
 1479 
 1480 /*
 1481  *  return true if we can read without blocking
 1482  */
 1483 int
 1484 qcanread(Queue *q)
 1485 {
 1486         return q->bfirst!=0;
 1487 }
 1488 
 1489 /*
 1490  *  change queue limit
 1491  */
 1492 void
 1493 qsetlimit(Queue *q, int limit)
 1494 {
 1495         q->limit = limit;
 1496 }
 1497 
 1498 /*
 1499  *  set blocking/nonblocking
 1500  */
 1501 void
 1502 qnoblock(Queue *q, int onoff)
 1503 {
 1504         q->noblock = onoff;
 1505 }
 1506 
 1507 /*
 1508  *  flush the output queue
 1509  */
 1510 void
 1511 qflush(Queue *q)
 1512 {
 1513         Block *bfirst;
 1514 
 1515         /* mark it */
 1516         ilock(q);
 1517         bfirst = q->bfirst;
 1518         q->bfirst = 0;
 1519         q->len = 0;
 1520         q->dlen = 0;
 1521         iunlock(q);
 1522 
 1523         /* free queued blocks */
 1524         freeblist(bfirst);
 1525 
 1526         /* wake up readers/writers */
 1527         wakeup(&q->wr);
 1528 }
 1529 
 1530 int
 1531 qfull(Queue *q)
 1532 {
 1533         return q->state & Qflow;
 1534 }
 1535 
 1536 int
 1537 qstate(Queue *q)
 1538 {
 1539         return q->state;
 1540 }

Cache object: 7194e5f191f93ddbd03aebf112216268


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.