FreeBSD/Linux Kernel Cross Reference
sys/port/qio.c
1 #include "u.h"
2 #include "../port/lib.h"
3 #include "mem.h"
4 #include "dat.h"
5 #include "fns.h"
6 #include "../port/error.h"
7
8 static ulong padblockcnt;
9 static ulong concatblockcnt;
10 static ulong pullupblockcnt;
11 static ulong copyblockcnt;
12 static ulong consumecnt;
13 static ulong producecnt;
14 static ulong qcopycnt;
15
16 static int debugging;
17
18 #define QDEBUG if(0)
19
20 /*
21 * IO queues
22 */
23 typedef struct Queue Queue;
24
25 struct Queue
26 {
27 Lock;
28
29 Block* bfirst; /* buffer */
30 Block* blast;
31
32 int len; /* bytes allocated to queue */
33 int dlen; /* data bytes in queue */
34 int limit; /* max bytes in queue */
35 int inilim; /* initial limit */
36 int state;
37 int noblock; /* true if writes return immediately when q full */
38 int eof; /* number of eofs read by user */
39
40 void (*kick)(void*); /* restart output */
41 void (*bypass)(void*, Block*); /* bypass queue altogether */
42 void* arg; /* argument to kick */
43
44 QLock rlock; /* mutex for reading processes */
45 Rendez rr; /* process waiting to read */
46 QLock wlock; /* mutex for writing processes */
47 Rendez wr; /* process waiting to write */
48
49 char err[ERRMAX];
50 };
51
52 enum
53 {
54 Maxatomic = 64*1024,
55 };
56
57 uint qiomaxatomic = Maxatomic;
58
59 void
60 ixsummary(void)
61 {
62 debugging ^= 1;
63 iallocsummary();
64 print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
65 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
66 print("consume %lud, produce %lud, qcopy %lud\n",
67 consumecnt, producecnt, qcopycnt);
68 }
69
70 /*
71 * free a list of blocks
72 */
73 void
74 freeblist(Block *b)
75 {
76 Block *next;
77
78 for(; b != 0; b = next){
79 next = b->next;
80 if(b->ref == 1)
81 b->next = nil;
82 freeb(b);
83 }
84 }
85
86 /*
87 * pad a block to the front (or the back if size is negative)
88 */
89 Block*
90 padblock(Block *bp, int size)
91 {
92 int n;
93 Block *nbp;
94
95 QDEBUG checkb(bp, "padblock 1");
96 if(size >= 0){
97 if(bp->rp - bp->base >= size){
98 bp->rp -= size;
99 return bp;
100 }
101
102 if(bp->next)
103 panic("padblock %#p", getcallerpc(&bp));
104 n = BLEN(bp);
105 padblockcnt++;
106 nbp = allocb(size+n);
107 nbp->rp += size;
108 nbp->wp = nbp->rp;
109 memmove(nbp->wp, bp->rp, n);
110 nbp->wp += n;
111 freeb(bp);
112 nbp->rp -= size;
113 } else {
114 size = -size;
115
116 if(bp->next)
117 panic("padblock %#p", getcallerpc(&bp));
118
119 if(bp->lim - bp->wp >= size)
120 return bp;
121
122 n = BLEN(bp);
123 padblockcnt++;
124 nbp = allocb(size+n);
125 memmove(nbp->wp, bp->rp, n);
126 nbp->wp += n;
127 freeb(bp);
128 }
129 QDEBUG checkb(nbp, "padblock 1");
130 return nbp;
131 }
132
133 /*
134 * return count of bytes in a string of blocks
135 */
136 int
137 blocklen(Block *bp)
138 {
139 int len;
140
141 len = 0;
142 while(bp) {
143 len += BLEN(bp);
144 bp = bp->next;
145 }
146 return len;
147 }
148
149 /*
150 * return count of space in blocks
151 */
152 int
153 blockalloclen(Block *bp)
154 {
155 int len;
156
157 len = 0;
158 while(bp) {
159 len += BALLOC(bp);
160 bp = bp->next;
161 }
162 return len;
163 }
164
165 /*
166 * copy the string of blocks into
167 * a single block and free the string
168 */
169 Block*
170 concatblock(Block *bp)
171 {
172 int len;
173 Block *nb, *f;
174
175 if(bp->next == 0)
176 return bp;
177
178 nb = allocb(blocklen(bp));
179 for(f = bp; f; f = f->next) {
180 len = BLEN(f);
181 memmove(nb->wp, f->rp, len);
182 nb->wp += len;
183 }
184 concatblockcnt += BLEN(nb);
185 freeblist(bp);
186 QDEBUG checkb(nb, "concatblock 1");
187 return nb;
188 }
189
190 /*
191 * make sure the first block has at least n bytes
192 */
193 Block*
194 pullupblock(Block *bp, int n)
195 {
196 int i;
197 Block *nbp;
198
199 /*
200 * this should almost always be true, it's
201 * just to avoid every caller checking.
202 */
203 if(BLEN(bp) >= n)
204 return bp;
205
206 /*
207 * if not enough room in the first block,
208 * add another to the front of the list.
209 */
210 if(bp->lim - bp->rp < n){
211 nbp = allocb(n);
212 nbp->next = bp;
213 bp = nbp;
214 }
215
216 /*
217 * copy bytes from the trailing blocks into the first
218 */
219 n -= BLEN(bp);
220 while(nbp = bp->next){
221 i = BLEN(nbp);
222 if(i > n) {
223 memmove(bp->wp, nbp->rp, n);
224 pullupblockcnt++;
225 bp->wp += n;
226 nbp->rp += n;
227 QDEBUG checkb(bp, "pullupblock 1");
228 return bp;
229 } else {
230 /* shouldn't happen but why crash if it does */
231 if(i < 0){
232 print("pullup negative length packet, called from %#p\n",
233 getcallerpc(&bp));
234 i = 0;
235 }
236 memmove(bp->wp, nbp->rp, i);
237 pullupblockcnt++;
238 bp->wp += i;
239 bp->next = nbp->next;
240 nbp->next = 0;
241 freeb(nbp);
242 n -= i;
243 if(n == 0){
244 QDEBUG checkb(bp, "pullupblock 2");
245 return bp;
246 }
247 }
248 }
249 freeb(bp);
250 return 0;
251 }
252
253 /*
254 * make sure the first block has at least n bytes
255 */
256 Block*
257 pullupqueue(Queue *q, int n)
258 {
259 Block *b;
260
261 if(BLEN(q->bfirst) >= n)
262 return q->bfirst;
263 q->bfirst = pullupblock(q->bfirst, n);
264 for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
265 ;
266 q->blast = b;
267 return q->bfirst;
268 }
269
270 /*
271 * trim to len bytes starting at offset
272 */
273 Block *
274 trimblock(Block *bp, int offset, int len)
275 {
276 ulong l;
277 Block *nb, *startb;
278
279 QDEBUG checkb(bp, "trimblock 1");
280 if(blocklen(bp) < offset+len) {
281 freeblist(bp);
282 return nil;
283 }
284
285 while((l = BLEN(bp)) < offset) {
286 offset -= l;
287 nb = bp->next;
288 bp->next = nil;
289 freeb(bp);
290 bp = nb;
291 }
292
293 startb = bp;
294 bp->rp += offset;
295
296 while((l = BLEN(bp)) < len) {
297 len -= l;
298 bp = bp->next;
299 }
300
301 bp->wp -= (BLEN(bp) - len);
302
303 if(bp->next) {
304 freeblist(bp->next);
305 bp->next = nil;
306 }
307
308 return startb;
309 }
310
311 /*
312 * copy 'count' bytes into a new block
313 */
314 Block*
315 copyblock(Block *bp, int count)
316 {
317 int l;
318 Block *nbp;
319
320 QDEBUG checkb(bp, "copyblock 0");
321 nbp = allocb(count);
322 for(; count > 0 && bp != 0; bp = bp->next){
323 l = BLEN(bp);
324 if(l > count)
325 l = count;
326 memmove(nbp->wp, bp->rp, l);
327 nbp->wp += l;
328 count -= l;
329 }
330 if(count > 0){
331 memset(nbp->wp, 0, count);
332 nbp->wp += count;
333 }
334 copyblockcnt++;
335 QDEBUG checkb(nbp, "copyblock 1");
336
337 return nbp;
338 }
339
340 Block*
341 adjustblock(Block* bp, int len)
342 {
343 int n;
344 Block *nbp;
345
346 if(len < 0){
347 freeb(bp);
348 return nil;
349 }
350
351 if(bp->rp+len > bp->lim){
352 nbp = copyblock(bp, len);
353 freeblist(bp);
354 QDEBUG checkb(nbp, "adjustblock 1");
355
356 return nbp;
357 }
358
359 n = BLEN(bp);
360 if(len > n)
361 memset(bp->wp, 0, len-n);
362 bp->wp = bp->rp+len;
363 QDEBUG checkb(bp, "adjustblock 2");
364
365 return bp;
366 }
367
368
369 /*
370 * throw away up to count bytes from a
371 * list of blocks. Return count of bytes
372 * thrown away.
373 */
374 int
375 pullblock(Block **bph, int count)
376 {
377 Block *bp;
378 int n, bytes;
379
380 bytes = 0;
381 if(bph == nil)
382 return 0;
383
384 while(*bph != nil && count != 0) {
385 bp = *bph;
386 n = BLEN(bp);
387 if(count < n)
388 n = count;
389 bytes += n;
390 count -= n;
391 bp->rp += n;
392 QDEBUG checkb(bp, "pullblock ");
393 if(BLEN(bp) == 0) {
394 *bph = bp->next;
395 bp->next = nil;
396 freeb(bp);
397 }
398 }
399 return bytes;
400 }
401
402 /*
403 * get next block from a queue, return null if nothing there
404 */
405 Block*
406 qget(Queue *q)
407 {
408 int dowakeup;
409 Block *b;
410
411 /* sync with qwrite */
412 ilock(q);
413
414 b = q->bfirst;
415 if(b == nil){
416 q->state |= Qstarve;
417 iunlock(q);
418 return nil;
419 }
420 q->bfirst = b->next;
421 b->next = 0;
422 q->len -= BALLOC(b);
423 q->dlen -= BLEN(b);
424 QDEBUG checkb(b, "qget");
425
426 /* if writer flow controlled, restart */
427 if((q->state & Qflow) && q->len < q->limit/2){
428 q->state &= ~Qflow;
429 dowakeup = 1;
430 } else
431 dowakeup = 0;
432
433 iunlock(q);
434
435 if(dowakeup)
436 wakeup(&q->wr);
437
438 return b;
439 }
440
441 /*
442 * throw away the next 'len' bytes in the queue
443 */
444 int
445 qdiscard(Queue *q, int len)
446 {
447 Block *b;
448 int dowakeup, n, sofar;
449
450 ilock(q);
451 for(sofar = 0; sofar < len; sofar += n){
452 b = q->bfirst;
453 if(b == nil)
454 break;
455 QDEBUG checkb(b, "qdiscard");
456 n = BLEN(b);
457 if(n <= len - sofar){
458 q->bfirst = b->next;
459 b->next = 0;
460 q->len -= BALLOC(b);
461 q->dlen -= BLEN(b);
462 freeb(b);
463 } else {
464 n = len - sofar;
465 b->rp += n;
466 q->dlen -= n;
467 }
468 }
469
470 /*
471 * if writer flow controlled, restart
472 *
473 * This used to be
474 * q->len < q->limit/2
475 * but it slows down tcp too much for certain write sizes.
476 * I really don't understand it completely. It may be
477 * due to the queue draining so fast that the transmission
478 * stalls waiting for the app to produce more data. - presotto
479 */
480 if((q->state & Qflow) && q->len < q->limit){
481 q->state &= ~Qflow;
482 dowakeup = 1;
483 } else
484 dowakeup = 0;
485
486 iunlock(q);
487
488 if(dowakeup)
489 wakeup(&q->wr);
490
491 return sofar;
492 }
493
494 /*
495 * Interrupt level copy out of a queue, return # bytes copied.
496 */
497 int
498 qconsume(Queue *q, void *vp, int len)
499 {
500 Block *b;
501 int n, dowakeup;
502 uchar *p = vp;
503 Block *tofree = nil;
504
505 /* sync with qwrite */
506 ilock(q);
507
508 for(;;) {
509 b = q->bfirst;
510 if(b == 0){
511 q->state |= Qstarve;
512 iunlock(q);
513 return -1;
514 }
515 QDEBUG checkb(b, "qconsume 1");
516
517 n = BLEN(b);
518 if(n > 0)
519 break;
520 q->bfirst = b->next;
521 q->len -= BALLOC(b);
522
523 /* remember to free this */
524 b->next = tofree;
525 tofree = b;
526 };
527
528 if(n < len)
529 len = n;
530 memmove(p, b->rp, len);
531 consumecnt += n;
532 b->rp += len;
533 q->dlen -= len;
534
535 /* discard the block if we're done with it */
536 if((q->state & Qmsg) || len == n){
537 q->bfirst = b->next;
538 b->next = 0;
539 q->len -= BALLOC(b);
540 q->dlen -= BLEN(b);
541
542 /* remember to free this */
543 b->next = tofree;
544 tofree = b;
545 }
546
547 /* if writer flow controlled, restart */
548 if((q->state & Qflow) && q->len < q->limit/2){
549 q->state &= ~Qflow;
550 dowakeup = 1;
551 } else
552 dowakeup = 0;
553
554 iunlock(q);
555
556 if(dowakeup)
557 wakeup(&q->wr);
558
559 if(tofree != nil)
560 freeblist(tofree);
561
562 return len;
563 }
564
565 int
566 qpass(Queue *q, Block *b)
567 {
568 int dlen, len, dowakeup;
569
570 /* sync with qread */
571 dowakeup = 0;
572 ilock(q);
573 if(q->len >= q->limit){
574 freeblist(b);
575 iunlock(q);
576 return -1;
577 }
578 if(q->state & Qclosed){
579 len = BALLOC(b);
580 freeblist(b);
581 iunlock(q);
582 return len;
583 }
584
585 /* add buffer to queue */
586 if(q->bfirst)
587 q->blast->next = b;
588 else
589 q->bfirst = b;
590 len = BALLOC(b);
591 dlen = BLEN(b);
592 QDEBUG checkb(b, "qpass");
593 while(b->next){
594 b = b->next;
595 QDEBUG checkb(b, "qpass");
596 len += BALLOC(b);
597 dlen += BLEN(b);
598 }
599 q->blast = b;
600 q->len += len;
601 q->dlen += dlen;
602
603 if(q->len >= q->limit/2)
604 q->state |= Qflow;
605
606 if(q->state & Qstarve){
607 q->state &= ~Qstarve;
608 dowakeup = 1;
609 }
610 iunlock(q);
611
612 if(dowakeup)
613 wakeup(&q->rr);
614
615 return len;
616 }
617
618 int
619 qpassnolim(Queue *q, Block *b)
620 {
621 int dlen, len, dowakeup;
622
623 /* sync with qread */
624 dowakeup = 0;
625 ilock(q);
626
627 if(q->state & Qclosed){
628 freeblist(b);
629 iunlock(q);
630 return BALLOC(b);
631 }
632
633 /* add buffer to queue */
634 if(q->bfirst)
635 q->blast->next = b;
636 else
637 q->bfirst = b;
638 len = BALLOC(b);
639 dlen = BLEN(b);
640 QDEBUG checkb(b, "qpass");
641 while(b->next){
642 b = b->next;
643 QDEBUG checkb(b, "qpass");
644 len += BALLOC(b);
645 dlen += BLEN(b);
646 }
647 q->blast = b;
648 q->len += len;
649 q->dlen += dlen;
650
651 if(q->len >= q->limit/2)
652 q->state |= Qflow;
653
654 if(q->state & Qstarve){
655 q->state &= ~Qstarve;
656 dowakeup = 1;
657 }
658 iunlock(q);
659
660 if(dowakeup)
661 wakeup(&q->rr);
662
663 return len;
664 }
665
666 /*
667 * if the allocated space is way out of line with the used
668 * space, reallocate to a smaller block
669 */
670 Block*
671 packblock(Block *bp)
672 {
673 Block **l, *nbp;
674 int n;
675
676 for(l = &bp; *l; l = &(*l)->next){
677 nbp = *l;
678 n = BLEN(nbp);
679 if((n<<2) < BALLOC(nbp)){
680 *l = allocb(n);
681 memmove((*l)->wp, nbp->rp, n);
682 (*l)->wp += n;
683 (*l)->next = nbp->next;
684 freeb(nbp);
685 }
686 }
687
688 return bp;
689 }
690
691 int
692 qproduce(Queue *q, void *vp, int len)
693 {
694 Block *b;
695 int dowakeup;
696 uchar *p = vp;
697
698 /* sync with qread */
699 dowakeup = 0;
700 ilock(q);
701
702 /* no waiting receivers, room in buffer? */
703 if(q->len >= q->limit){
704 q->state |= Qflow;
705 iunlock(q);
706 return -1;
707 }
708
709 /* save in buffer */
710 b = iallocb(len);
711 if(b == 0){
712 iunlock(q);
713 return 0;
714 }
715 memmove(b->wp, p, len);
716 producecnt += len;
717 b->wp += len;
718 if(q->bfirst)
719 q->blast->next = b;
720 else
721 q->bfirst = b;
722 q->blast = b;
723 /* b->next = 0; done by iallocb() */
724 q->len += BALLOC(b);
725 q->dlen += BLEN(b);
726 QDEBUG checkb(b, "qproduce");
727
728 if(q->state & Qstarve){
729 q->state &= ~Qstarve;
730 dowakeup = 1;
731 }
732
733 if(q->len >= q->limit)
734 q->state |= Qflow;
735 iunlock(q);
736
737 if(dowakeup)
738 wakeup(&q->rr);
739
740 return len;
741 }
742
743 /*
744 * copy from offset in the queue
745 */
746 Block*
747 qcopy(Queue *q, int len, ulong offset)
748 {
749 int sofar;
750 int n;
751 Block *b, *nb;
752 uchar *p;
753
754 nb = allocb(len);
755
756 ilock(q);
757
758 /* go to offset */
759 b = q->bfirst;
760 for(sofar = 0; ; sofar += n){
761 if(b == nil){
762 iunlock(q);
763 return nb;
764 }
765 n = BLEN(b);
766 if(sofar + n > offset){
767 p = b->rp + offset - sofar;
768 n -= offset - sofar;
769 break;
770 }
771 QDEBUG checkb(b, "qcopy");
772 b = b->next;
773 }
774
775 /* copy bytes from there */
776 for(sofar = 0; sofar < len;){
777 if(n > len - sofar)
778 n = len - sofar;
779 memmove(nb->wp, p, n);
780 qcopycnt += n;
781 sofar += n;
782 nb->wp += n;
783 b = b->next;
784 if(b == nil)
785 break;
786 n = BLEN(b);
787 p = b->rp;
788 }
789 iunlock(q);
790
791 return nb;
792 }
793
794 /*
795 * called by non-interrupt code
796 */
797 Queue*
798 qopen(int limit, int msg, void (*kick)(void*), void *arg)
799 {
800 Queue *q;
801
802 q = malloc(sizeof(Queue));
803 if(q == 0)
804 return 0;
805
806 q->limit = q->inilim = limit;
807 q->kick = kick;
808 q->arg = arg;
809 q->state = msg;
810
811 q->state |= Qstarve;
812 q->eof = 0;
813 q->noblock = 0;
814
815 return q;
816 }
817
818 /* open a queue to be bypassed */
819 Queue*
820 qbypass(void (*bypass)(void*, Block*), void *arg)
821 {
822 Queue *q;
823
824 q = malloc(sizeof(Queue));
825 if(q == 0)
826 return 0;
827
828 q->limit = 0;
829 q->arg = arg;
830 q->bypass = bypass;
831 q->state = 0;
832
833 return q;
834 }
835
836 static int
837 notempty(void *a)
838 {
839 Queue *q = a;
840
841 return (q->state & Qclosed) || q->bfirst != 0;
842 }
843
844 /*
845 * wait for the queue to be non-empty or closed.
846 * called with q ilocked.
847 */
848 static int
849 qwait(Queue *q)
850 {
851 /* wait for data */
852 for(;;){
853 if(q->bfirst != nil)
854 break;
855
856 if(q->state & Qclosed){
857 if(++q->eof > 3)
858 return -1;
859 if(*q->err && strcmp(q->err, Ehungup) != 0)
860 return -1;
861 return 0;
862 }
863
864 q->state |= Qstarve; /* flag requesting producer to wake me */
865 iunlock(q);
866 sleep(&q->rr, notempty, q);
867 ilock(q);
868 }
869 return 1;
870 }
871
872 /*
873 * add a block list to a queue
874 */
875 void
876 qaddlist(Queue *q, Block *b)
877 {
878 /* queue the block */
879 if(q->bfirst)
880 q->blast->next = b;
881 else
882 q->bfirst = b;
883 q->len += blockalloclen(b);
884 q->dlen += blocklen(b);
885 while(b->next)
886 b = b->next;
887 q->blast = b;
888 }
889
890 /*
891 * called with q ilocked
892 */
893 Block*
894 qremove(Queue *q)
895 {
896 Block *b;
897
898 b = q->bfirst;
899 if(b == nil)
900 return nil;
901 q->bfirst = b->next;
902 b->next = nil;
903 q->dlen -= BLEN(b);
904 q->len -= BALLOC(b);
905 QDEBUG checkb(b, "qremove");
906 return b;
907 }
908
909 /*
910 * copy the contents of a string of blocks into
911 * memory. emptied blocks are freed. return
912 * pointer to first unconsumed block.
913 */
914 Block*
915 bl2mem(uchar *p, Block *b, int n)
916 {
917 int i;
918 Block *next;
919
920 for(; b != nil; b = next){
921 i = BLEN(b);
922 if(i > n){
923 memmove(p, b->rp, n);
924 b->rp += n;
925 return b;
926 }
927 memmove(p, b->rp, i);
928 n -= i;
929 p += i;
930 b->rp += i;
931 next = b->next;
932 freeb(b);
933 }
934 return nil;
935 }
936
937 /*
938 * copy the contents of memory into a string of blocks.
939 * return nil on error.
940 */
941 Block*
942 mem2bl(uchar *p, int len)
943 {
944 int n;
945 Block *b, *first, **l;
946
947 first = nil;
948 l = &first;
949 if(waserror()){
950 freeblist(first);
951 nexterror();
952 }
953 do {
954 n = len;
955 if(n > Maxatomic)
956 n = Maxatomic;
957
958 *l = b = allocb(n);
959 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
960 memmove(b->wp, p, n);
961 b->wp += n;
962 p += n;
963 len -= n;
964 l = &b->next;
965 } while(len > 0);
966 poperror();
967
968 return first;
969 }
970
971 /*
972 * put a block back to the front of the queue
973 * called with q ilocked
974 */
975 void
976 qputback(Queue *q, Block *b)
977 {
978 b->next = q->bfirst;
979 if(q->bfirst == nil)
980 q->blast = b;
981 q->bfirst = b;
982 q->len += BALLOC(b);
983 q->dlen += BLEN(b);
984 }
985
986 /*
987 * flow control, get producer going again
988 * called with q ilocked
989 */
990 static void
991 qwakeup_iunlock(Queue *q)
992 {
993 int dowakeup = 0;
994
995 /* if writer flow controlled, restart */
996 if((q->state & Qflow) && q->len < q->limit/2){
997 q->state &= ~Qflow;
998 dowakeup = 1;
999 }
1000
1001 iunlock(q);
1002
1003 /* wakeup flow controlled writers */
1004 if(dowakeup){
1005 if(q->kick)
1006 q->kick(q->arg);
1007 wakeup(&q->wr);
1008 }
1009 }
1010
1011 /*
1012 * get next block from a queue (up to a limit)
1013 */
1014 Block*
1015 qbread(Queue *q, int len)
1016 {
1017 Block *b, *nb;
1018 int n;
1019
1020 qlock(&q->rlock);
1021 if(waserror()){
1022 qunlock(&q->rlock);
1023 nexterror();
1024 }
1025
1026 ilock(q);
1027 switch(qwait(q)){
1028 case 0:
1029 /* queue closed */
1030 iunlock(q);
1031 qunlock(&q->rlock);
1032 poperror();
1033 return nil;
1034 case -1:
1035 /* multiple reads on a closed queue */
1036 iunlock(q);
1037 error(q->err);
1038 }
1039
1040 /* if we get here, there's at least one block in the queue */
1041 b = qremove(q);
1042 n = BLEN(b);
1043
1044 /* split block if it's too big and this is not a message queue */
1045 nb = b;
1046 if(n > len){
1047 if((q->state&Qmsg) == 0){
1048 n -= len;
1049 b = allocb(n);
1050 memmove(b->wp, nb->rp+len, n);
1051 b->wp += n;
1052 qputback(q, b);
1053 }
1054 nb->wp = nb->rp + len;
1055 }
1056
1057 /* restart producer */
1058 qwakeup_iunlock(q);
1059
1060 poperror();
1061 qunlock(&q->rlock);
1062 return nb;
1063 }
1064
1065 /*
1066 * read a queue. if no data is queued, post a Block
1067 * and wait on its Rendez.
1068 */
1069 long
1070 qread(Queue *q, void *vp, int len)
1071 {
1072 Block *b, *first, **l;
1073 int m, n;
1074
1075 qlock(&q->rlock);
1076 if(waserror()){
1077 qunlock(&q->rlock);
1078 nexterror();
1079 }
1080
1081 ilock(q);
1082 again:
1083 switch(qwait(q)){
1084 case 0:
1085 /* queue closed */
1086 iunlock(q);
1087 qunlock(&q->rlock);
1088 poperror();
1089 return 0;
1090 case -1:
1091 /* multiple reads on a closed queue */
1092 iunlock(q);
1093 error(q->err);
1094 }
1095
1096 /* if we get here, there's at least one block in the queue */
1097 if(q->state & Qcoalesce){
1098 /* when coalescing, 0 length blocks just go away */
1099 b = q->bfirst;
1100 if(BLEN(b) <= 0){
1101 freeb(qremove(q));
1102 goto again;
1103 }
1104
1105 /* grab the first block plus as many
1106 * following blocks as will completely
1107 * fit in the read.
1108 */
1109 n = 0;
1110 l = &first;
1111 m = BLEN(b);
1112 for(;;) {
1113 *l = qremove(q);
1114 l = &b->next;
1115 n += m;
1116
1117 b = q->bfirst;
1118 if(b == nil)
1119 break;
1120 m = BLEN(b);
1121 if(n+m > len)
1122 break;
1123 }
1124 } else {
1125 first = qremove(q);
1126 n = BLEN(first);
1127 }
1128
1129 /* copy to user space outside of the ilock */
1130 iunlock(q);
1131 b = bl2mem(vp, first, len);
1132 ilock(q);
1133
1134 /* take care of any left over partial block */
1135 if(b != nil){
1136 n -= BLEN(b);
1137 if(q->state & Qmsg)
1138 freeb(b);
1139 else
1140 qputback(q, b);
1141 }
1142
1143 /* restart producer */
1144 qwakeup_iunlock(q);
1145
1146 poperror();
1147 qunlock(&q->rlock);
1148 return n;
1149 }
1150
1151 static int
1152 qnotfull(void *a)
1153 {
1154 Queue *q = a;
1155
1156 return q->len < q->limit || (q->state & Qclosed);
1157 }
1158
1159 ulong noblockcnt;
1160
1161 /*
1162 * add a block to a queue obeying flow control
1163 */
1164 long
1165 qbwrite(Queue *q, Block *b)
1166 {
1167 int n, dowakeup;
1168 Proc *p;
1169
1170 n = BLEN(b);
1171
1172 if(q->bypass){
1173 (*q->bypass)(q->arg, b);
1174 return n;
1175 }
1176
1177 dowakeup = 0;
1178 qlock(&q->wlock);
1179 if(waserror()){
1180 if(b != nil)
1181 freeb(b);
1182 qunlock(&q->wlock);
1183 nexterror();
1184 }
1185
1186 ilock(q);
1187
1188 /* give up if the queue is closed */
1189 if(q->state & Qclosed){
1190 iunlock(q);
1191 error(q->err);
1192 }
1193
1194 /* if nonblocking, don't queue over the limit */
1195 if(q->len >= q->limit){
1196 if(q->noblock){
1197 iunlock(q);
1198 freeb(b);
1199 noblockcnt += n;
1200 qunlock(&q->wlock);
1201 poperror();
1202 return n;
1203 }
1204 }
1205
1206 /* queue the block */
1207 if(q->bfirst)
1208 q->blast->next = b;
1209 else
1210 q->bfirst = b;
1211 q->blast = b;
1212 b->next = 0;
1213 q->len += BALLOC(b);
1214 q->dlen += n;
1215 QDEBUG checkb(b, "qbwrite");
1216 b = nil;
1217
1218 /* make sure other end gets awakened */
1219 if(q->state & Qstarve){
1220 q->state &= ~Qstarve;
1221 dowakeup = 1;
1222 }
1223 iunlock(q);
1224
1225 /* get output going again */
1226 if(q->kick && (dowakeup || (q->state&Qkick)))
1227 q->kick(q->arg);
1228
1229 /* wakeup anyone consuming at the other end */
1230 if(dowakeup){
1231 p = wakeup(&q->rr);
1232
1233 /* if we just wokeup a higher priority process, let it run */
1234 if(p != nil && p->priority > up->priority)
1235 sched();
1236 }
1237
1238 /*
1239 * flow control, wait for queue to get below the limit
1240 * before allowing the process to continue and queue
1241 * more. We do this here so that postnote can only
1242 * interrupt us after the data has been queued. This
1243 * means that things like 9p flushes and ssl messages
1244 * will not be disrupted by software interrupts.
1245 *
1246 * Note - this is moderately dangerous since a process
1247 * that keeps getting interrupted and rewriting will
1248 * queue infinite crud.
1249 */
1250 for(;;){
1251 if(q->noblock || qnotfull(q))
1252 break;
1253
1254 ilock(q);
1255 q->state |= Qflow;
1256 iunlock(q);
1257 sleep(&q->wr, qnotfull, q);
1258 }
1259 USED(b);
1260
1261 qunlock(&q->wlock);
1262 poperror();
1263 return n;
1264 }
1265
1266 /*
1267 * write to a queue. only Maxatomic bytes at a time is atomic.
1268 */
1269 int
1270 qwrite(Queue *q, void *vp, int len)
1271 {
1272 int n, sofar;
1273 Block *b;
1274 uchar *p = vp;
1275
1276 QDEBUG if(!islo())
1277 print("qwrite hi %#p\n", getcallerpc(&q));
1278
1279 sofar = 0;
1280 do {
1281 n = len-sofar;
1282 if(n > Maxatomic)
1283 n = Maxatomic;
1284
1285 b = allocb(n);
1286 setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
1287 if(waserror()){
1288 freeb(b);
1289 nexterror();
1290 }
1291 memmove(b->wp, p+sofar, n);
1292 poperror();
1293 b->wp += n;
1294
1295 qbwrite(q, b);
1296
1297 sofar += n;
1298 } while(sofar < len && (q->state & Qmsg) == 0);
1299
1300 return len;
1301 }
1302
1303 /*
1304 * used by print() to write to a queue. Since we may be splhi or not in
1305 * a process, don't qlock.
1306 *
1307 * this routine merges adjacent blocks if block n+1 will fit into
1308 * the free space of block n.
1309 */
1310 int
1311 qiwrite(Queue *q, void *vp, int len)
1312 {
1313 int n, sofar, dowakeup;
1314 Block *b;
1315 uchar *p = vp;
1316
1317 dowakeup = 0;
1318
1319 sofar = 0;
1320 do {
1321 n = len-sofar;
1322 if(n > Maxatomic)
1323 n = Maxatomic;
1324
1325 b = iallocb(n);
1326 if(b == nil)
1327 break;
1328 memmove(b->wp, p+sofar, n);
1329 b->wp += n;
1330
1331 ilock(q);
1332
1333 /* we use an artificially high limit for kernel prints since anything
1334 * over the limit gets dropped
1335 */
1336 if(q->dlen >= 16*1024){
1337 iunlock(q);
1338 freeb(b);
1339 break;
1340 }
1341
1342 QDEBUG checkb(b, "qiwrite");
1343 if(q->bfirst)
1344 q->blast->next = b;
1345 else
1346 q->bfirst = b;
1347 q->blast = b;
1348 q->len += BALLOC(b);
1349 q->dlen += n;
1350
1351 if(q->state & Qstarve){
1352 q->state &= ~Qstarve;
1353 dowakeup = 1;
1354 }
1355
1356 iunlock(q);
1357
1358 if(dowakeup){
1359 if(q->kick)
1360 q->kick(q->arg);
1361 wakeup(&q->rr);
1362 }
1363
1364 sofar += n;
1365 } while(sofar < len && (q->state & Qmsg) == 0);
1366
1367 return sofar;
1368 }
1369
1370 /*
1371 * be extremely careful when calling this,
1372 * as there is no reference accounting
1373 */
1374 void
1375 qfree(Queue *q)
1376 {
1377 qclose(q);
1378 free(q);
1379 }
1380
1381 /*
1382 * Mark a queue as closed. No further IO is permitted.
1383 * All blocks are released.
1384 */
1385 void
1386 qclose(Queue *q)
1387 {
1388 Block *bfirst;
1389
1390 if(q == nil)
1391 return;
1392
1393 /* mark it */
1394 ilock(q);
1395 q->state |= Qclosed;
1396 q->state &= ~(Qflow|Qstarve);
1397 strcpy(q->err, Ehungup);
1398 bfirst = q->bfirst;
1399 q->bfirst = 0;
1400 q->len = 0;
1401 q->dlen = 0;
1402 q->noblock = 0;
1403 iunlock(q);
1404
1405 /* free queued blocks */
1406 freeblist(bfirst);
1407
1408 /* wake up readers/writers */
1409 wakeup(&q->rr);
1410 wakeup(&q->wr);
1411 }
1412
1413 /*
1414 * Mark a queue as closed. Wakeup any readers. Don't remove queued
1415 * blocks.
1416 */
1417 void
1418 qhangup(Queue *q, char *msg)
1419 {
1420 /* mark it */
1421 ilock(q);
1422 q->state |= Qclosed;
1423 if(msg == 0 || *msg == 0)
1424 strcpy(q->err, Ehungup);
1425 else
1426 strncpy(q->err, msg, ERRMAX-1);
1427 iunlock(q);
1428
1429 /* wake up readers/writers */
1430 wakeup(&q->rr);
1431 wakeup(&q->wr);
1432 }
1433
1434 /*
1435 * return non-zero if the q is hungup
1436 */
1437 int
1438 qisclosed(Queue *q)
1439 {
1440 return q->state & Qclosed;
1441 }
1442
1443 /*
1444 * mark a queue as no longer hung up
1445 */
1446 void
1447 qreopen(Queue *q)
1448 {
1449 ilock(q);
1450 q->state &= ~Qclosed;
1451 q->state |= Qstarve;
1452 q->eof = 0;
1453 q->limit = q->inilim;
1454 iunlock(q);
1455 }
1456
1457 /*
1458 * return bytes queued
1459 */
1460 int
1461 qlen(Queue *q)
1462 {
1463 return q->dlen;
1464 }
1465
1466 /*
1467 * return space remaining before flow control
1468 */
1469 int
1470 qwindow(Queue *q)
1471 {
1472 int l;
1473
1474 l = q->limit - q->len;
1475 if(l < 0)
1476 l = 0;
1477 return l;
1478 }
1479
1480 /*
1481 * return true if we can read without blocking
1482 */
1483 int
1484 qcanread(Queue *q)
1485 {
1486 return q->bfirst!=0;
1487 }
1488
1489 /*
1490 * change queue limit
1491 */
1492 void
1493 qsetlimit(Queue *q, int limit)
1494 {
1495 q->limit = limit;
1496 }
1497
1498 /*
1499 * set blocking/nonblocking
1500 */
1501 void
1502 qnoblock(Queue *q, int onoff)
1503 {
1504 q->noblock = onoff;
1505 }
1506
1507 /*
1508 * flush the output queue
1509 */
1510 void
1511 qflush(Queue *q)
1512 {
1513 Block *bfirst;
1514
1515 /* mark it */
1516 ilock(q);
1517 bfirst = q->bfirst;
1518 q->bfirst = 0;
1519 q->len = 0;
1520 q->dlen = 0;
1521 iunlock(q);
1522
1523 /* free queued blocks */
1524 freeblist(bfirst);
1525
1526 /* wake up readers/writers */
1527 wakeup(&q->wr);
1528 }
1529
1530 int
1531 qfull(Queue *q)
1532 {
1533 return q->state & Qflow;
1534 }
1535
1536 int
1537 qstate(Queue *q)
1538 {
1539 return q->state;
1540 }
Cache object: 7194e5f191f93ddbd03aebf112216268
|