FreeBSD/Linux Kernel Cross Reference
sys/kern/sys_pipe.c
1 /*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice immediately at the beginning of the file, without modification,
10 * this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 * John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 * are met.
18 *
19 * $FreeBSD$
20 */
21
22 /*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29 /*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode. The small write mode acts like conventional pipes with
32 * a kernel buffer. If the buffer is less than PIPE_MINDIRECT, then the
33 * "normal" pipe buffering is done. If the buffer is between PIPE_MINDIRECT
34 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
35 * the receiving process can copy it directly from the pages in the sending
36 * process.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned to the user-mode side. In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process, to a pageable
42 * kernel buffer, and the receiving process will grab the data from the
43 * pageable kernel buffer. Since signals don't happen all that often,
44 * the copy operation is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching. PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
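/*
 * Editorial sketch (guarded out of the build): the mode split described
 * above is invisible to callers; both transfers below go through the
 * same write(2).  The 64k figure is only an assumed "large" size --
 * the real threshold is the kernel's PIPE_MINDIRECT -- and a blocking
 * descriptor is required, since O_NONBLOCK forces the buffered path
 * (see pipe_write() below).
 */
#if 0
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static void
write_both_modes(int wfd)
{
	char small[128];		/* copied into the kernel pipe buffer */
	char *large = malloc(65536);	/* candidate for the direct, page-wiring path */

	memset(small, 'a', sizeof(small));
	memset(large, 'b', 65536);
	(void)write(wfd, small, sizeof(small));
	(void)write(wfd, large, 65536);
	free(large);
}
#endif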
51
52 #include <sys/param.h>
53 #include <sys/systm.h>
54 #include <sys/proc.h>
55 #include <sys/fcntl.h>
56 #include <sys/file.h>
57 #include <sys/filedesc.h>
58 #include <sys/filio.h>
59 #include <sys/ttycom.h>
60 #include <sys/stat.h>
61 #include <sys/poll.h>
62 #include <sys/select.h>
63 #include <sys/signalvar.h>
64 #include <sys/sysproto.h>
65 #include <sys/pipe.h>
66 #include <sys/uio.h>
67
68 #include <vm/vm.h>
69 #include <vm/vm_prot.h>
70 #include <vm/vm_param.h>
71 #include <sys/lock.h>
72 #include <vm/vm_object.h>
73 #include <vm/vm_kern.h>
74 #include <vm/vm_extern.h>
75 #include <vm/pmap.h>
76 #include <vm/vm_map.h>
77 #include <vm/vm_page.h>
78 #include <vm/vm_zone.h>
79
80 /*
81 * Use this define if you want to disable *fancy* VM things. Expect an
82 * approx 30% decrease in transfer rate. This could be useful for
83 * NetBSD or OpenBSD.
84 */
85 /* #define PIPE_NODIRECT */
86
87 /*
88 * interfaces to the outside world
89 */
90 static int pipe_read __P((struct file *fp, struct uio *uio,
91 struct ucred *cred, int flags));
92 static int pipe_write __P((struct file *fp, struct uio *uio,
93 struct ucred *cred, int flags));
94 static int pipe_close __P((struct file *fp, struct proc *p));
95 static int pipe_poll __P((struct file *fp, int events, struct ucred *cred,
96 struct proc *p));
97 static int pipe_ioctl __P((struct file *fp, u_long cmd, caddr_t data, struct proc *p));
98
99 static struct fileops pipeops =
100 { pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_close };
101
102 /*
103 * Default pipe buffer size(s); this can be kind of large now because pipe
104 * space is pageable. The pipe code will try to maintain locality of
105 * reference for performance reasons, so small amounts of outstanding I/O
106 * will not wipe the cache.
107 */
108 #define MINPIPESIZE (PIPE_SIZE/3)
109 #define MAXPIPESIZE (2*PIPE_SIZE/3)
110
111 /*
112 * Maximum amount of kva for pipes -- this is kind of a soft limit, but
113 * it is there so that, on large systems, we don't exhaust it.
114 */
115 #define MAXPIPEKVA (8*1024*1024)
116
117 /*
118 * Limit for direct transfers; we cannot, of course, limit
119 * the amount of kva for pipes in general, though.
120 */
121 #define LIMITPIPEKVA (16*1024*1024)
122
123 /*
124 * Limit the number of "big" pipes
125 */
126 #define LIMITBIGPIPES 32
127 static int nbigpipe;
128
129 static int amountpipekva;
130
131 static void pipeclose __P((struct pipe *cpipe));
132 static void pipeinit __P((struct pipe *cpipe));
133 static __inline int pipelock __P((struct pipe *cpipe, int catch));
134 static __inline void pipeunlock __P((struct pipe *cpipe));
135 static __inline void pipeselwakeup __P((struct pipe *cpipe));
136 #ifndef PIPE_NODIRECT
137 static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
138 static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
139 static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
140 static void pipe_clone_write_buffer __P((struct pipe *wpipe));
141 #endif
142 static void pipespace __P((struct pipe *cpipe));
143
144 static vm_zone_t pipe_zone;
145
146 /*
147 * The pipe system call for the DTYPE_PIPE type of pipes
148 */
149
150 /* ARGSUSED */
151 int
152 pipe(p, uap)
153 struct proc *p;
154 struct pipe_args /* {
155 int dummy;
156 } */ *uap;
157 {
158 register struct filedesc *fdp = p->p_fd;
159 struct file *rf, *wf;
160 struct pipe *rpipe, *wpipe;
161 int fd, error;
162
163 if (pipe_zone == NULL)
164 pipe_zone = zinit("PIPE", sizeof (struct pipe), 0, 0, 4);
165
166 rpipe = zalloc(pipe_zone);
167 pipeinit(rpipe);
168 rpipe->pipe_state |= PIPE_DIRECTOK;
169 wpipe = zalloc(pipe_zone);
170 pipeinit(wpipe);
171 wpipe->pipe_state |= PIPE_DIRECTOK;
172
173 error = falloc(p, &rf, &fd);
174 if (error)
175 goto free2;
176 p->p_retval[0] = fd;
177 rf->f_flag = FREAD | FWRITE;
178 rf->f_type = DTYPE_PIPE;
179 rf->f_ops = &pipeops;
180 rf->f_data = (caddr_t)rpipe;
181 error = falloc(p, &wf, &fd);
182 if (error)
183 goto free3;
184 wf->f_flag = FREAD | FWRITE;
185 wf->f_type = DTYPE_PIPE;
186 wf->f_ops = &pipeops;
187 wf->f_data = (caddr_t)wpipe;
188 p->p_retval[1] = fd;
189
190 rpipe->pipe_peer = wpipe;
191 wpipe->pipe_peer = rpipe;
192
193 return (0);
194 free3:
195 ffree(rf);
196 fdp->fd_ofiles[p->p_retval[0]] = 0;
197 free2:
198 (void)pipeclose(wpipe);
199 (void)pipeclose(rpipe);
200 return (error);
201 }
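/*
 * Userland sketch (guarded out of the build): the conventional use of
 * the descriptor pair created above.  Note that this implementation
 * sets FREAD | FWRITE on both files, so FreeBSD pipes are full-duplex;
 * the read-end/write-end split below is only the portable convention.
 */
#if 0
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int fd[2];
	char buf[32];
	ssize_t n;

	if (pipe(fd) == -1)
		return (1);
	if (fork() == 0) {		/* child: writer */
		close(fd[0]);
		write(fd[1], "hello", 5);
		_exit(0);
	}
	close(fd[1]);			/* parent: reader */
	n = read(fd[0], buf, sizeof(buf));
	if (n > 0)
		printf("read: %.*s\n", (int)n, buf);
	return (0);
}
#endif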
202
203 /*
204 * Allocate kva for the pipe circular buffer; the space is pageable.
205 */
206 static void
207 pipespace(cpipe)
208 struct pipe *cpipe;
209 {
210 int npages, error;
211
212 npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
213 /*
214 * Create an object, I don't like the idea of paging to/from
215 * kernel_object.
216 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
217 */
218 cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
219 cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);
220
221 /*
222 * Insert the object into the kernel map, and allocate kva for it.
223 * The map entry is, by default, pageable.
224 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
225 */
226 error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
227 (vm_offset_t *) &cpipe->pipe_buffer.buffer,
228 cpipe->pipe_buffer.size, 1,
229 VM_PROT_ALL, VM_PROT_ALL, 0);
230
231 if (error != KERN_SUCCESS)
232 panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
233 amountpipekva += cpipe->pipe_buffer.size;
234 }
235
236 /*
237 * initialize and allocate VM and memory for pipe
238 */
239 static void
240 pipeinit(cpipe)
241 struct pipe *cpipe;
242 {
243
244 cpipe->pipe_buffer.in = 0;
245 cpipe->pipe_buffer.out = 0;
246 cpipe->pipe_buffer.cnt = 0;
247 cpipe->pipe_buffer.size = PIPE_SIZE;
248
249 /* Buffer kva gets dynamically allocated */
250 cpipe->pipe_buffer.buffer = NULL;
251 /* cpipe->pipe_buffer.object = invalid */
252
253 cpipe->pipe_state = 0;
254 cpipe->pipe_peer = NULL;
255 cpipe->pipe_busy = 0;
256 getnanotime(&cpipe->pipe_ctime);
257 cpipe->pipe_atime = cpipe->pipe_ctime;
258 cpipe->pipe_mtime = cpipe->pipe_ctime;
259 bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
260
261 #ifndef PIPE_NODIRECT
262 /*
263 * pipe data structure initializations to support direct pipe I/O
264 */
265 cpipe->pipe_map.cnt = 0;
266 cpipe->pipe_map.kva = 0;
267 cpipe->pipe_map.pos = 0;
268 cpipe->pipe_map.npages = 0;
269 /* cpipe->pipe_map.ms[] = invalid */
270 #endif
271 }
272
273
274 /*
275 * lock a pipe for I/O, blocking other access
276 */
277 static __inline int
278 pipelock(cpipe, catch)
279 struct pipe *cpipe;
280 int catch;
281 {
282 int error;
283 while (cpipe->pipe_state & PIPE_LOCK) {
284 cpipe->pipe_state |= PIPE_LWANT;
285 if ((error = tsleep(cpipe,
286 catch ? (PRIBIO|PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
287 return error;
288 }
289 }
290 cpipe->pipe_state |= PIPE_LOCK;
291 return 0;
292 }
293
294 /*
295 * unlock a pipe I/O lock
296 */
297 static __inline void
298 pipeunlock(cpipe)
299 struct pipe *cpipe;
300 {
301 cpipe->pipe_state &= ~PIPE_LOCK;
302 if (cpipe->pipe_state & PIPE_LWANT) {
303 cpipe->pipe_state &= ~PIPE_LWANT;
304 wakeup(cpipe);
305 }
306 }
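/*
 * Editorial sketch (guarded out of the build): the calling pattern for
 * the I/O lock above, as pipe_read() and pipe_write() use it below.
 * With catch != 0 the tsleep() in pipelock() is interruptible, so a
 * nonzero return must be propagated without touching the buffer.
 */
#if 0
static int
example_locked_op(struct pipe *cpipe)
{
	int error;

	if ((error = pipelock(cpipe, 1)) != 0)
		return (error);		/* interrupted by a signal */
	/* ... exclusive access to cpipe->pipe_buffer here ... */
	pipeunlock(cpipe);		/* wakes any PIPE_LWANT sleeper */
	return (0);
}
#endif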
307
308 static __inline void
309 pipeselwakeup(cpipe)
310 struct pipe *cpipe;
311 {
312 if (cpipe->pipe_state & PIPE_SEL) {
313 cpipe->pipe_state &= ~PIPE_SEL;
314 selwakeup(&cpipe->pipe_sel);
315 }
316 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
317 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
318 }
319
320 /* ARGSUSED */
321 static int
322 pipe_read(fp, uio, cred, flags)
323 struct file *fp;
324 struct uio *uio;
325 struct ucred *cred;
326 int flags;
327 {
328
329 struct pipe *rpipe = (struct pipe *) fp->f_data;
330 int error;
331 int nread = 0;
332 u_int size;
333
334 ++rpipe->pipe_busy;
335 error = pipelock(rpipe, 1);
336 if (error)
337 goto unlocked_error;
338
339 while (uio->uio_resid) {
340 /*
341 * normal pipe buffer receive
342 */
343 if (rpipe->pipe_buffer.cnt > 0) {
344 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
345 if (size > rpipe->pipe_buffer.cnt)
346 size = rpipe->pipe_buffer.cnt;
347 if (size > (u_int) uio->uio_resid)
348 size = (u_int) uio->uio_resid;
349
350 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
351 size, uio);
352 if (error) {
353 break;
354 }
355 rpipe->pipe_buffer.out += size;
356 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
357 rpipe->pipe_buffer.out = 0;
358
359 rpipe->pipe_buffer.cnt -= size;
360
361 /*
362 * If there is no more to read in the pipe, reset
363 * its pointers to the beginning. This improves
364 * cache hit stats.
365 */
366 if (rpipe->pipe_buffer.cnt == 0) {
367 rpipe->pipe_buffer.in = 0;
368 rpipe->pipe_buffer.out = 0;
369 }
370 nread += size;
371 #ifndef PIPE_NODIRECT
372 /*
373 * Direct copy, bypassing a kernel buffer.
374 */
375 } else if ((size = rpipe->pipe_map.cnt) &&
376 (rpipe->pipe_state & PIPE_DIRECTW)) {
377 caddr_t va;
378 if (size > (u_int) uio->uio_resid)
379 size = (u_int) uio->uio_resid;
380
381 va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
382 error = uiomove(va, size, uio);
383 if (error)
384 break;
385 nread += size;
386 rpipe->pipe_map.pos += size;
387 rpipe->pipe_map.cnt -= size;
388 if (rpipe->pipe_map.cnt == 0) {
389 rpipe->pipe_state &= ~PIPE_DIRECTW;
390 wakeup(rpipe);
391 }
392 #endif
393 } else {
394 /*
395 * detect EOF condition
396 */
397 if (rpipe->pipe_state & PIPE_EOF) {
398 /* XXX error = ? */
399 break;
400 }
401
402 /*
403 * If the "write-side" has been blocked, wake it up now.
404 */
405 if (rpipe->pipe_state & PIPE_WANTW) {
406 rpipe->pipe_state &= ~PIPE_WANTW;
407 wakeup(rpipe);
408 }
409
410 /*
411 * Break if some data was read.
412 */
413 if (nread > 0)
414 break;
415
416 /*
417 * Unlock the pipe buffer for our remaining processing. We
418 * will either break out with an error or we will sleep and
419 * relock to loop.
420 */
421 pipeunlock(rpipe);
422
423 /*
424 * Handle non-blocking mode operation or
425 * wait for more data.
426 */
427 if (fp->f_flag & FNONBLOCK)
428 error = EAGAIN;
429 else {
430 rpipe->pipe_state |= PIPE_WANTR;
431 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
432 error = pipelock(rpipe, 1);
433 }
434 if (error)
435 goto unlocked_error;
436 }
437 }
438 pipeunlock(rpipe);
439
440 if (error == 0)
441 getnanotime(&rpipe->pipe_atime);
442 unlocked_error:
443 --rpipe->pipe_busy;
444
445 /*
446 * PIPE_WANT processing only makes sense if pipe_busy is 0.
447 */
448 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
449 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
450 wakeup(rpipe);
451 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
452 /*
453 * Handle write blocking hysteresis.
454 */
455 if (rpipe->pipe_state & PIPE_WANTW) {
456 rpipe->pipe_state &= ~PIPE_WANTW;
457 wakeup(rpipe);
458 }
459 }
460
461 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
462 pipeselwakeup(rpipe);
463
464 return error;
465 }
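/*
 * Userland sketch (guarded out of the build): how the results produced
 * above surface through read(2) on a descriptor set O_NONBLOCK --
 * EAGAIN for an empty pipe with a live writer, 0 once PIPE_EOF is set.
 */
#if 0
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

static void
drain(int rfd)
{
	char buf[512];
	ssize_t n;

	(void)fcntl(rfd, F_SETFL, O_NONBLOCK);
	for (;;) {
		n = read(rfd, buf, sizeof(buf));
		if (n > 0)
			continue;	/* got buffered (or direct-mapped) data */
		if (n == 0)
			break;		/* writer closed: EOF */
		if (errno != EAGAIN)
			break;		/* real error */
		break;			/* empty, writer still open */
	}
}
#endif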
466
467 #ifndef PIPE_NODIRECT
468 /*
469 * Map the sending process's buffer into kernel space and wire it.
470 * This is similar to a physical write operation.
471 */
472 static int
473 pipe_build_write_buffer(wpipe, uio)
474 struct pipe *wpipe;
475 struct uio *uio;
476 {
477 u_int size;
478 int i;
479 vm_offset_t addr, endaddr, paddr;
480
481 size = (u_int) uio->uio_iov->iov_len;
482 if (size > wpipe->pipe_buffer.size)
483 size = wpipe->pipe_buffer.size;
484
485 endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
486 for (i = 0, addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
487 addr < endaddr;
488 addr += PAGE_SIZE, i++) {
489 
490 vm_page_t m;
491 
492 vm_fault_quick((caddr_t)addr, VM_PROT_READ);
493 paddr = pmap_kextract(addr);
494 if (!paddr) {
495 int j;
496 for (j = 0; j < i; j++)
497 vm_page_unwire(wpipe->pipe_map.ms[j], 1);
498 return EFAULT;
499 }
500
501 m = PHYS_TO_VM_PAGE(paddr);
502 vm_page_wire(m);
503 wpipe->pipe_map.ms[i] = m;
504 }
505
506 /*
507 * set up the control block
508 */
509 wpipe->pipe_map.npages = i;
510 wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
511 wpipe->pipe_map.cnt = size;
512
513 /*
514 * and map the buffer
515 */
516 if (wpipe->pipe_map.kva == 0) {
517 /*
518 * We need to allocate space for an extra page because the
519 * address range might (will) span pages at times.
520 */
521 wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
522 wpipe->pipe_buffer.size + PAGE_SIZE);
523 amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
524 }
525 pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
526 wpipe->pipe_map.npages);
527
528 /*
529 * and update the uio data
530 */
531
532 uio->uio_iov->iov_len -= size;
533 uio->uio_iov->iov_base += size;
534 if (uio->uio_iov->iov_len == 0)
535 uio->uio_iov++;
536 uio->uio_resid -= size;
537 uio->uio_offset += size;
538 return 0;
539 }
540
541 /*
542 * unmap and unwire the process buffer
543 */
544 static void
545 pipe_destroy_write_buffer(wpipe)
546 struct pipe *wpipe;
547 {
548 int i;
549 if (wpipe->pipe_map.kva) {
550 pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
551
552 if (amountpipekva > MAXPIPEKVA) {
553 vm_offset_t kva = wpipe->pipe_map.kva;
554 wpipe->pipe_map.kva = 0;
555 kmem_free(kernel_map, kva,
556 wpipe->pipe_buffer.size + PAGE_SIZE);
557 amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
558 }
559 }
560 for (i = 0; i < wpipe->pipe_map.npages; i++)
561 vm_page_unwire(wpipe->pipe_map.ms[i], 1);
562 }
563
564 /*
565 * In the case of a signal, the writing process might go away. This
566 * code copies the data into the circular buffer so that the source
567 * pages can be freed without loss of data.
568 */
569 static void
570 pipe_clone_write_buffer(wpipe)
571 struct pipe *wpipe;
572 {
573 int size;
574 int pos;
575
576 size = wpipe->pipe_map.cnt;
577 pos = wpipe->pipe_map.pos;
578 bcopy((caddr_t) wpipe->pipe_map.kva+pos,
579 (caddr_t) wpipe->pipe_buffer.buffer,
580 size);
581
582 wpipe->pipe_buffer.in = size;
583 wpipe->pipe_buffer.out = 0;
584 wpipe->pipe_buffer.cnt = size;
585 wpipe->pipe_state &= ~PIPE_DIRECTW;
586
587 pipe_destroy_write_buffer(wpipe);
588 }
589
590 /*
591 * This implements the pipe buffer write mechanism. Note that only
592 * a direct write OR a normal pipe write can be pending at any given time.
593 * If there are any characters in the pipe buffer, the direct write will
594 * be deferred until the receiving process grabs all of the bytes from
595 * the pipe buffer. Then the direct mapping write is set-up.
596 */
597 static int
598 pipe_direct_write(wpipe, uio)
599 struct pipe *wpipe;
600 struct uio *uio;
601 {
602 int error;
603 retry:
604 while (wpipe->pipe_state & PIPE_DIRECTW) {
605 if (wpipe->pipe_state & PIPE_WANTR) {
606 wpipe->pipe_state &= ~PIPE_WANTR;
607 wakeup(wpipe);
608 }
609 wpipe->pipe_state |= PIPE_WANTW;
610 error = tsleep(wpipe,
611 PRIBIO|PCATCH, "pipdww", 0);
612 if (error)
613 goto error1;
614 if (wpipe->pipe_state & PIPE_EOF) {
615 error = EPIPE;
616 goto error1;
617 }
618 }
619 wpipe->pipe_map.cnt = 0; /* transfer not ready yet */
620 if (wpipe->pipe_buffer.cnt > 0) {
621 if (wpipe->pipe_state & PIPE_WANTR) {
622 wpipe->pipe_state &= ~PIPE_WANTR;
623 wakeup(wpipe);
624 }
625
626 wpipe->pipe_state |= PIPE_WANTW;
627 error = tsleep(wpipe,
628 PRIBIO|PCATCH, "pipdwc", 0);
629 if (error)
630 goto error1;
631 if (wpipe->pipe_state & PIPE_EOF) {
632 error = EPIPE;
633 goto error1;
634 }
635 goto retry;
636 }
637
638 wpipe->pipe_state |= PIPE_DIRECTW;
639
640 error = pipe_build_write_buffer(wpipe, uio);
641 if (error) {
642 wpipe->pipe_state &= ~PIPE_DIRECTW;
643 goto error1;
644 }
645
646 error = 0;
647 while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
648 if (wpipe->pipe_state & PIPE_EOF) {
649 pipelock(wpipe, 0);
650 pipe_destroy_write_buffer(wpipe);
651 pipeunlock(wpipe);
652 pipeselwakeup(wpipe);
653 error = EPIPE;
654 goto error1;
655 }
656 if (wpipe->pipe_state & PIPE_WANTR) {
657 wpipe->pipe_state &= ~PIPE_WANTR;
658 wakeup(wpipe);
659 }
660 pipeselwakeup(wpipe);
661 error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
662 }
663
664 pipelock(wpipe, 0);
665 if (wpipe->pipe_state & PIPE_DIRECTW) {
666 /*
667 * this bit of trickery substitutes a kernel buffer for
668 * the process that might be going away.
669 */
670 pipe_clone_write_buffer(wpipe);
671 } else {
672 pipe_destroy_write_buffer(wpipe);
673 }
674 pipeunlock(wpipe);
675 return error;
676
677 error1:
678 wakeup(wpipe);
679 return error;
680 }
681 #endif
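/*
 * Userland sketch (guarded out of the build): the interruption case the
 * clone path above exists for.  If a signal arrives while a direct
 * write is parked in the kernel, pipe_clone_write_buffer() snapshots
 * the wired pages into the pageable buffer, so whatever the reader
 * eventually sees is consistent even if this process goes away.
 */
#if 0
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static void
on_alarm(int sig)
{
	(void)sig;			/* just interrupt the sleep */
}

static void
interruptible_writer(int wfd)
{
	char *big = malloc(131072);

	memset(big, 'x', 131072);
	signal(SIGALRM, on_alarm);
	alarm(1);			/* may fire while write(2) is blocked */
	(void)write(wfd, big, 131072);	/* a short count is possible; bytes
					 * already accepted remain intact */
	free(big);
}
#endif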
682
683 static int
684 pipe_write(fp, uio, cred, flags)
685 struct file *fp;
686 struct uio *uio;
687 struct ucred *cred;
688 int flags;
689 {
690 int error = 0;
691 int orig_resid;
692
693 struct pipe *wpipe, *rpipe;
694
695 rpipe = (struct pipe *) fp->f_data;
696 wpipe = rpipe->pipe_peer;
697
698 /*
699 * detect loss of pipe read side; returning EPIPE lets the caller issue SIGPIPE.
700 */
701 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
702 return EPIPE;
703 }
704
705 /*
706 * If it is advantageous to resize the pipe buffer, do
707 * so.
708 */
709 if ((uio->uio_resid > PIPE_SIZE) &&
710 (nbigpipe < LIMITBIGPIPES) &&
711 (wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
712 (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
713 (wpipe->pipe_buffer.cnt == 0)) {
714
715 if (wpipe->pipe_buffer.buffer) {
716 amountpipekva -= wpipe->pipe_buffer.size;
717 kmem_free(kernel_map,
718 (vm_offset_t)wpipe->pipe_buffer.buffer,
719 wpipe->pipe_buffer.size);
720 }
721
722 #ifndef PIPE_NODIRECT
723 if (wpipe->pipe_map.kva) {
724 amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
725 kmem_free(kernel_map,
726 wpipe->pipe_map.kva,
727 wpipe->pipe_buffer.size + PAGE_SIZE);
728 }
729 #endif
730
731 wpipe->pipe_buffer.in = 0;
732 wpipe->pipe_buffer.out = 0;
733 wpipe->pipe_buffer.cnt = 0;
734 wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
735 wpipe->pipe_buffer.buffer = NULL;
736 ++nbigpipe;
737
738 #ifndef PIPE_NODIRECT
739 wpipe->pipe_map.cnt = 0;
740 wpipe->pipe_map.kva = 0;
741 wpipe->pipe_map.pos = 0;
742 wpipe->pipe_map.npages = 0;
743 #endif
744
745 }
746
747
748 if (wpipe->pipe_buffer.buffer == NULL) {
749 if ((error = pipelock(wpipe, 1)) == 0) {
750 pipespace(wpipe);
751 pipeunlock(wpipe);
752 } else {
753 return error;
754 }
755 }
756
757 ++wpipe->pipe_busy;
758 orig_resid = uio->uio_resid;
759 while (uio->uio_resid) {
760 int space;
761 #ifndef PIPE_NODIRECT
762 /*
763 * If the transfer is large, we can gain performance if
764 * we do process-to-process copies directly.
765 * If the write is non-blocking, we don't use the
766 * direct write mechanism.
767 */
768 if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
769 (fp->f_flag & FNONBLOCK) == 0 &&
770 (wpipe->pipe_map.kva ||
771 (amountpipekva < LIMITPIPEKVA))) {
772 error = pipe_direct_write(wpipe, uio);
773 if (error) {
774 break;
775 }
776 continue;
777 }
778 #endif
779
780 /*
781 * Pipe buffered writes cannot be coincident with
782 * direct writes. We wait until the currently executing
783 * direct write is completed before we start filling the
784 * pipe buffer.
785 */
786 retrywrite:
787 while (wpipe->pipe_state & PIPE_DIRECTW) {
788 if (wpipe->pipe_state & PIPE_WANTR) {
789 wpipe->pipe_state &= ~PIPE_WANTR;
790 wakeup(wpipe);
791 }
792 error = tsleep(wpipe, PRIBIO|PCATCH, "pipbww", 0);
793 if (wpipe->pipe_state & PIPE_EOF)
794 break;
795 if (error)
796 break;
797 }
798 if (wpipe->pipe_state & PIPE_EOF) {
799 error = EPIPE;
800 break;
801 }
802
803 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
804
805 /* Writes of size <= PIPE_BUF must be atomic. */
806 /* XXX perhaps they need to be contiguous to be atomic? */
807 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
808 space = 0;
809
810 if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
811 /*
812 * This sets the maximum transfer to one contiguous
813 * segment of the buffer.
814 */
815 int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
816 /*
817 * space is the size left in the buffer
818 */
819 if (size > space)
820 size = space;
821 /*
822 * now limit it to the size of the uio transfer
823 */
824 if (size > uio->uio_resid)
825 size = uio->uio_resid;
826 if ((error = pipelock(wpipe, 1)) == 0) {
827 /*
828 * It is possible for a direct write to
829 * slip in on us... handle it here...
830 */
831 if (wpipe->pipe_state & PIPE_DIRECTW) {
832 pipeunlock(wpipe);
833 goto retrywrite;
834 }
835 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
836 size, uio);
837 pipeunlock(wpipe);
838 }
839 if (error)
840 break;
841
842 wpipe->pipe_buffer.in += size;
843 if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
844 wpipe->pipe_buffer.in = 0;
845
846 wpipe->pipe_buffer.cnt += size;
847 } else {
848 /*
849 * If the "read-side" has been blocked, wake it up now.
850 */
851 if (wpipe->pipe_state & PIPE_WANTR) {
852 wpipe->pipe_state &= ~PIPE_WANTR;
853 wakeup(wpipe);
854 }
855
856 /*
857 * don't block on non-blocking I/O
858 */
859 if (fp->f_flag & FNONBLOCK) {
860 error = EAGAIN;
861 break;
862 }
863
864 /*
865 * We have no more space and have something to offer,
866 * wake up select/poll.
867 */
868 pipeselwakeup(wpipe);
869
870 wpipe->pipe_state |= PIPE_WANTW;
871 if ((error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) != 0) {
872 break;
873 }
874 /*
875 * If read side wants to go away, we just issue a signal
876 * to ourselves.
877 */
878 if (wpipe->pipe_state & PIPE_EOF) {
879 error = EPIPE;
880 break;
881 }
882 }
883 }
884
885 --wpipe->pipe_busy;
886 if ((wpipe->pipe_busy == 0) &&
887 (wpipe->pipe_state & PIPE_WANT)) {
888 wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
889 wakeup(wpipe);
890 } else if (wpipe->pipe_buffer.cnt > 0) {
891 /*
892 * If we have put any characters in the buffer, we wake up
893 * the reader.
894 */
895 if (wpipe->pipe_state & PIPE_WANTR) {
896 wpipe->pipe_state &= ~PIPE_WANTR;
897 wakeup(wpipe);
898 }
899 }
900
901 /*
902 * Don't return EPIPE if I/O was successful
903 */
904 if ((wpipe->pipe_buffer.cnt == 0) &&
905 (uio->uio_resid == 0) &&
906 (error == EPIPE))
907 error = 0;
908
909 if (error == 0)
910 getnanotime(&wpipe->pipe_mtime);
911
912 /*
913 * We have something to offer,
914 * wake up select/poll.
915 */
916 if (wpipe->pipe_buffer.cnt)
917 pipeselwakeup(wpipe);
918
919 return error;
920 }
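/*
 * Userland sketch (guarded out of the build): the atomicity rule
 * enforced above.  A write of at most PIPE_BUF bytes either fits in
 * the buffer whole or blocks; it is never interleaved with another
 * writer's data, which is what makes multi-writer log pipes safe.
 */
#if 0
#include <limits.h>			/* PIPE_BUF */
#include <string.h>
#include <unistd.h>

static void
log_record(int wfd, const char *msg)
{
	size_t len = strlen(msg);

	if (len > PIPE_BUF)
		len = PIPE_BUF;		/* stay under the atomicity limit */
	(void)write(wfd, msg, len);	/* all-or-nothing with other writers */
}
#endif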
921
922 /*
923 * we implement a very minimal set of ioctls for compatibility with sockets.
924 */
925 int
926 pipe_ioctl(fp, cmd, data, p)
927 struct file *fp;
928 u_long cmd;
929 register caddr_t data;
930 struct proc *p;
931 {
932 register struct pipe *mpipe = (struct pipe *)fp->f_data;
933
934 switch (cmd) {
935
936 case FIONBIO:
937 return (0);
938
939 case FIOASYNC:
940 if (*(int *)data) {
941 mpipe->pipe_state |= PIPE_ASYNC;
942 } else {
943 mpipe->pipe_state &= ~PIPE_ASYNC;
944 }
945 return (0);
946
947 case FIONREAD:
948 if (mpipe->pipe_state & PIPE_DIRECTW)
949 *(int *)data = mpipe->pipe_map.cnt;
950 else
951 *(int *)data = mpipe->pipe_buffer.cnt;
952 return (0);
953
954 case FIOSETOWN:
955 return (fsetown(*(int *)data, &mpipe->pipe_sigio));
956
957 case FIOGETOWN:
958 *(int *)data = fgetown(mpipe->pipe_sigio);
959 return (0);
960
961 /* This is deprecated, FIOSETOWN should be used instead. */
962 case TIOCSPGRP:
963 return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
964
965 /* This is deprecated, FIOGETOWN should be used instead. */
966 case TIOCGPGRP:
967 *(int *)data = -fgetown(mpipe->pipe_sigio);
968 return (0);
969
970 }
971 return (ENOTTY);
972 }
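/*
 * Userland sketch (guarded out of the build): the FIONREAD and FIOASYNC
 * cases handled above, from the caller's side.
 */
#if 0
#include <sys/ioctl.h>
#include <sys/filio.h>
#include <stdio.h>

static void
query_pipe(int rfd)
{
	int pending, on = 1;

	if (ioctl(rfd, FIONREAD, &pending) == 0)
		printf("%d bytes readable\n", pending);
	(void)ioctl(rfd, FIOASYNC, &on);	/* ask for SIGIO delivery */
}
#endif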
973
974 int
975 pipe_poll(fp, events, cred, p)
976 struct file *fp;
977 int events;
978 struct ucred *cred;
979 struct proc *p;
980 {
981 register struct pipe *rpipe = (struct pipe *)fp->f_data;
982 struct pipe *wpipe;
983 int revents = 0;
984
985 wpipe = rpipe->pipe_peer;
986 if (events & (POLLIN | POLLRDNORM))
987 if ((rpipe->pipe_state & PIPE_DIRECTW) ||
988 (rpipe->pipe_buffer.cnt > 0) ||
989 (rpipe->pipe_state & PIPE_EOF))
990 revents |= events & (POLLIN | POLLRDNORM);
991
992 if (events & (POLLOUT | POLLWRNORM))
993 if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
994 (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
995 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
996 revents |= events & (POLLOUT | POLLWRNORM);
997
998 if ((rpipe->pipe_state & PIPE_EOF) ||
999 (wpipe == NULL) ||
1000 (wpipe->pipe_state & PIPE_EOF))
1001 revents |= POLLHUP;
1002
1003 if (revents == 0) {
1004 if (events & (POLLIN | POLLRDNORM)) {
1005 selrecord(p, &rpipe->pipe_sel);
1006 rpipe->pipe_state |= PIPE_SEL;
1007 }
1008
1009 if (events & (POLLOUT | POLLWRNORM)) {
1010 selrecord(p, &wpipe->pipe_sel);
1011 wpipe->pipe_state |= PIPE_SEL;
1012 }
1013 }
1014
1015 return (revents);
1016 }
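/*
 * Userland sketch (guarded out of the build): how the revents computed
 * above look to a poll(2) caller watching the read side.
 */
#if 0
#include <poll.h>

static int
wait_readable(int rfd)
{
	struct pollfd pfd;

	pfd.fd = rfd;
	pfd.events = POLLIN;
	if (poll(&pfd, 1, -1) < 0)
		return (-1);
	if (pfd.revents & POLLHUP)
		return (0);			/* peer closed: PIPE_EOF above */
	return ((pfd.revents & POLLIN) != 0);	/* buffered or direct data ready */
}
#endif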
1017
1018 int
1019 pipe_stat(pipe, ub)
1020 register struct pipe *pipe;
1021 register struct stat *ub;
1022 {
1023 bzero((caddr_t)ub, sizeof (*ub));
1024 ub->st_mode = S_IFIFO;
1025 ub->st_blksize = pipe->pipe_buffer.size;
1026 ub->st_size = pipe->pipe_buffer.cnt;
1027 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1028 ub->st_atimespec = pipe->pipe_atime;
1029 ub->st_mtimespec = pipe->pipe_mtime;
1030 ub->st_ctimespec = pipe->pipe_ctime;
1031 /*
1032 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1033 * st_flags, st_gen.
1034 * XXX (st_dev, st_ino) should be unique.
1035 */
1036 return 0;
1037 }
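/*
 * Userland sketch (guarded out of the build): fstat(2) on a pipe lands
 * in pipe_stat() above; st_size reports the bytes currently buffered.
 */
#if 0
#include <sys/stat.h>
#include <stdio.h>

static void
show_pending(int rfd)
{
	struct stat st;

	if (fstat(rfd, &st) == 0 && S_ISFIFO(st.st_mode))
		printf("%lld bytes pending\n", (long long)st.st_size);
}
#endif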
1038
1039 /* ARGSUSED */
1040 static int
1041 pipe_close(fp, p)
1042 struct file *fp;
1043 struct proc *p;
1044 {
1045 struct pipe *cpipe = (struct pipe *)fp->f_data;
1046
1047 funsetown(cpipe->pipe_sigio);
1048 pipeclose(cpipe);
1049 fp->f_data = NULL;
1050 return 0;
1051 }
1052
1053 /*
1054 * shut down the pipe
1055 */
1056 static void
1057 pipeclose(cpipe)
1058 struct pipe *cpipe;
1059 {
1060 struct pipe *ppipe;
1061 if (cpipe) {
1062
1063 pipeselwakeup(cpipe);
1064
1065 /*
1066 * If the other side is blocked, wake it up saying that
1067 * we want to close it down.
1068 */
1069 while (cpipe->pipe_busy) {
1070 wakeup(cpipe);
1071 cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
1072 tsleep(cpipe, PRIBIO, "pipecl", 0);
1073 }
1074
1075 /*
1076 * Disconnect from peer
1077 */
1078 if ((ppipe = cpipe->pipe_peer) != NULL) {
1079 pipeselwakeup(ppipe);
1080
1081 ppipe->pipe_state |= PIPE_EOF;
1082 wakeup(ppipe);
1083 ppipe->pipe_peer = NULL;
1084 }
1085
1086 /*
1087 * free resources
1088 */
1089 if (cpipe->pipe_buffer.buffer) {
1090 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1091 --nbigpipe;
1092 amountpipekva -= cpipe->pipe_buffer.size;
1093 kmem_free(kernel_map,
1094 (vm_offset_t)cpipe->pipe_buffer.buffer,
1095 cpipe->pipe_buffer.size);
1096 }
1097 #ifndef PIPE_NODIRECT
1098 if (cpipe->pipe_map.kva) {
1099 amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1100 kmem_free(kernel_map,
1101 cpipe->pipe_map.kva,
1102 cpipe->pipe_buffer.size + PAGE_SIZE);
1103 }
1104 #endif
1105 zfree(pipe_zone, cpipe);
1106 }
1107 }
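/*
 * Userland sketch (guarded out of the build): the shutdown handshake
 * above as seen from outside -- once the read side is closed, PIPE_EOF
 * is set on the peer and the next write fails with EPIPE (and SIGPIPE,
 * unless ignored).
 */
#if 0
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

static void
demo_epipe(void)
{
	int fd[2];

	signal(SIGPIPE, SIG_IGN);	/* take EPIPE instead of dying */
	if (pipe(fd) == -1)
		return;
	close(fd[0]);			/* reader goes away */
	if (write(fd[1], "x", 1) == -1 && errno == EPIPE)
		fprintf(stderr, "EPIPE as expected\n");
	close(fd[1]);
}
#endif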