FreeBSD/Linux Kernel Cross Reference
sys/kern/sys_pipe.c
1 /*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice immediately at the beginning of the file, without modification,
10 * this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 * John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 * are met.
18 *
19 * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.21.2.1 1999/09/05 08:15:17 peter Exp $
20 */
21
22 #ifndef OLD_PIPE
23
24 /*
25 * This file contains a high-performance replacement for the socket-based
26 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
27 * all features of sockets, but does do everything that pipes normally
28 * do.
29 */
30
31 /*
32 * This code has two modes of operation, a small write mode and a large
33 * write mode. The small write mode acts like conventional pipes with
34 * a kernel buffer. If the amount to be written is less than PIPE_MINDIRECT,
35 * the "normal" pipe buffering is done. If it is between PIPE_MINDIRECT
36 * and PIPE_SIZE in size, the writer's buffer is fully mapped and wired into
37 * the kernel, and the receiving process can copy the data directly from the
38 * pages of the sending process.
39 *
40 * If the sending process receives a signal, it is possible that it will
41 * go away, and certainly its address space can change, because control
42 * is returned to the user-mode side. In that case, the pipe code
43 * arranges to copy the buffer supplied by the user process to a pageable
44 * kernel buffer, and the receiving process will grab the data from the
45 * pageable kernel buffer. Since signals don't happen all that often,
46 * the copy operation is normally eliminated.
47 *
48 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
49 * happen for small transfers so that the system will not spend all of
50 * its time context switching. PIPE_SIZE is constrained by the
51 * amount of kernel virtual memory.
52 */
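/*
 * [Editor's addition -- illustrative sketch, not part of the original file.]
 * The mode selection described above reduces roughly to the predicate
 * below: small writes go through the kernel circular buffer, while large
 * blocking writes map and wire the user's pages instead.  The real test
 * in pipe_write() also checks the amount of pipe kva already in use.
 * The function name is hypothetical.
 */
#if 0
static __inline int
pipe_write_would_go_direct(size_t len, int f_flag)
{
	return (len >= PIPE_MINDIRECT &&	/* big enough to be worth mapping */
	    (f_flag & FNONBLOCK) == 0);		/* the direct path may sleep */
}
#endif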
53
54 #include <sys/param.h>
55 #include <sys/systm.h>
56 #include <sys/proc.h>
57 #include <sys/file.h>
58 #include <sys/protosw.h>
59 #include <sys/stat.h>
60 #include <sys/filedesc.h>
61 #include <sys/malloc.h>
62 #include <sys/ioctl.h>
63 #include <sys/stat.h>
64 #include <sys/select.h>
65 #include <sys/signalvar.h>
66 #include <sys/errno.h>
67 #include <sys/queue.h>
68 #include <sys/vmmeter.h>
69 #include <sys/kernel.h>
70 #include <sys/sysproto.h>
71 #include <sys/pipe.h>
72
73 #include <vm/vm.h>
74 #include <vm/vm_prot.h>
75 #include <vm/vm_param.h>
76 #include <vm/lock.h>
77 #include <vm/vm_object.h>
78 #include <vm/vm_kern.h>
79 #include <vm/vm_extern.h>
80 #include <vm/pmap.h>
81 #include <vm/vm_map.h>
82 #include <vm/vm_page.h>
83
84 /*
85 * Use this define if you want to disable *fancy* VM things. Expect an
86 * approx 30% decrease in transfer rate. This could be useful for
87 * NetBSD or OpenBSD.
88 */
89 /* #define PIPE_NODIRECT */
90
91 /*
92 * interfaces to the outside world
93 */
94 static int pipe_read __P((struct file *fp, struct uio *uio,
95 struct ucred *cred));
96 static int pipe_write __P((struct file *fp, struct uio *uio,
97 struct ucred *cred));
98 static int pipe_close __P((struct file *fp, struct proc *p));
99 static int pipe_select __P((struct file *fp, int which, struct proc *p));
100 static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));
101
102 static struct fileops pipeops =
103 { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };
104
105 /*
106 * Default pipe buffer size(s); this can be kind of large now because pipe
107 * space is pageable. The pipe code will try to maintain locality of
108 * reference for performance reasons, so small amounts of outstanding I/O
109 * will not wipe the cache.
110 */
111 #define MINPIPESIZE (PIPE_SIZE/3)
112 #define MAXPIPESIZE (2*PIPE_SIZE/3)
113
114 /*
115 * Maximum amount of kva for pipes -- this is kind of a soft limit, but
116 * is there so that on large systems we don't exhaust it.
117 */
118 #define MAXPIPEKVA (8*1024*1024)
119
120 /*
121 * Limit for direct transfers; we cannot, of course, limit
122 * the amount of kva for pipes in general, though.
123 */
124 #define LIMITPIPEKVA (16*1024*1024)
125
126 /*
127 * Limit the number of "big" pipes
128 */
129 #define LIMITBIGPIPES 32
130 int nbigpipe;
131
132 static int amountpipekva;
133
134 static void pipeclose __P((struct pipe *cpipe));
135 static void pipeinit __P((struct pipe *cpipe));
136 static __inline int pipelock __P((struct pipe *cpipe, int catch));
137 static __inline void pipeunlock __P((struct pipe *cpipe));
138 static __inline void pipeselwakeup __P((struct pipe *cpipe));
139 #ifndef PIPE_NODIRECT
140 static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
141 static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
142 static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
143 static void pipe_clone_write_buffer __P((struct pipe *wpipe));
144 #endif
145 static void pipespace __P((struct pipe *cpipe));
146
147 /*
148 * The pipe system call for the DTYPE_PIPE type of pipes
149 */
150
151 /* ARGSUSED */
152 int
153 pipe(p, uap, retval)
154 struct proc *p;
155 struct pipe_args /* {
156 int dummy;
157 } */ *uap;
158 int retval[];
159 {
160 register struct filedesc *fdp = p->p_fd;
161 struct file *rf, *wf;
162 struct pipe *rpipe, *wpipe;
163 int fd, error;
164
165 rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
166 pipeinit(rpipe);
167 rpipe->pipe_state |= PIPE_DIRECTOK;
168 wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
169 pipeinit(wpipe);
170 wpipe->pipe_state |= PIPE_DIRECTOK;
171
172 error = falloc(p, &rf, &fd);
173 if (error)
174 goto free2;
175 retval[0] = fd;
176 rf->f_flag = FREAD | FWRITE;
177 rf->f_type = DTYPE_PIPE;
178 rf->f_ops = &pipeops;
179 rf->f_data = (caddr_t)rpipe;
180 error = falloc(p, &wf, &fd);
181 if (error)
182 goto free3;
183 wf->f_flag = FREAD | FWRITE;
184 wf->f_type = DTYPE_PIPE;
185 wf->f_ops = &pipeops;
186 wf->f_data = (caddr_t)wpipe;
187 retval[1] = fd;
188
189 rpipe->pipe_peer = wpipe;
190 wpipe->pipe_peer = rpipe;
191
192 return (0);
193 free3:
194 ffree(rf);
195 fdp->fd_ofiles[retval[0]] = 0;
196 free2:
197 (void)pipeclose(wpipe);
198 (void)pipeclose(rpipe);
199 return (error);
200 }
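/*
 * [Editor's addition -- illustrative userland sketch, not part of the
 * original file.]  The system call above hands back two descriptors in
 * retval[]; by convention fd[0] is the read end and fd[1] the write end,
 * although both file entries are opened FREAD|FWRITE here.  The helper
 * name is hypothetical.
 */
#if 0
#include <unistd.h>

static int
pipe_roundtrip(void)
{
	int fd[2];
	char buf[5];

	if (pipe(fd) == -1)
		return (-1);
	(void)write(fd[1], "hello", sizeof(buf));	/* small write: buffered path */
	(void)read(fd[0], buf, sizeof(buf));		/* drains the kernel buffer */
	(void)close(fd[0]);
	(void)close(fd[1]);
	return (0);
}
#endif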
201
202 /*
203 * Allocate kva for pipe circular buffer, the space is pageable
204 */
205 static void
206 pipespace(cpipe)
207 struct pipe *cpipe;
208 {
209 int npages, error;
210
211 npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
212 /*
213 * Create an object; I don't like the idea of paging to/from
214 * kernel_object.
215 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
216 */
217 cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
218 cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);
219
220 /*
221 * Insert the object into the kernel map, and allocate kva for it.
222 * The map entry is, by default, pageable.
223 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
224 */
225 error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
226 (vm_offset_t *) &cpipe->pipe_buffer.buffer,
227 cpipe->pipe_buffer.size, 1,
228 VM_PROT_ALL, VM_PROT_ALL, 0);
229
230 if (error != KERN_SUCCESS)
231 panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
232 amountpipekva += cpipe->pipe_buffer.size;
233 }
234
235 /*
236 * initialize and allocate VM and memory for pipe
237 */
238 static void
239 pipeinit(cpipe)
240 struct pipe *cpipe;
241 {
242 int s;
243
244 cpipe->pipe_buffer.in = 0;
245 cpipe->pipe_buffer.out = 0;
246 cpipe->pipe_buffer.cnt = 0;
247 cpipe->pipe_buffer.size = PIPE_SIZE;
248
249 /* Buffer kva gets dynamically allocated */
250 cpipe->pipe_buffer.buffer = NULL;
251 /* cpipe->pipe_buffer.object = invalid */
252
253 cpipe->pipe_state = 0;
254 cpipe->pipe_peer = NULL;
255 cpipe->pipe_busy = 0;
256 s = splhigh();
257 cpipe->pipe_ctime = time;
258 cpipe->pipe_atime = time;
259 cpipe->pipe_mtime = time;
260 splx(s);
261 bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
262 cpipe->pipe_pgid = NO_PID;
263
264 #ifndef PIPE_NODIRECT
265 /*
266 * pipe data structure initializations to support direct pipe I/O
267 */
268 cpipe->pipe_map.cnt = 0;
269 cpipe->pipe_map.kva = 0;
270 cpipe->pipe_map.pos = 0;
271 cpipe->pipe_map.npages = 0;
272 /* cpipe->pipe_map.ms[] = invalid */
273 #endif
274 }
275
276
277 /*
278 * lock a pipe for I/O, blocking other access
279 */
280 static __inline int
281 pipelock(cpipe, catch)
282 struct pipe *cpipe;
283 int catch;
284 {
285 int error;
286 while (cpipe->pipe_state & PIPE_LOCK) {
287 cpipe->pipe_state |= PIPE_LWANT;
288 if (error = tsleep( cpipe,
289 catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
290 return error;
291 }
292 }
293 cpipe->pipe_state |= PIPE_LOCK;
294 return 0;
295 }
296
297 /*
298 * unlock a pipe I/O lock
299 */
300 static __inline void
301 pipeunlock(cpipe)
302 struct pipe *cpipe;
303 {
304 cpipe->pipe_state &= ~PIPE_LOCK;
305 if (cpipe->pipe_state & PIPE_LWANT) {
306 cpipe->pipe_state &= ~PIPE_LWANT;
307 wakeup(cpipe);
308 }
309 }
310
311 static __inline void
312 pipeselwakeup(cpipe)
313 struct pipe *cpipe;
314 {
315 struct proc *p;
316
317 if (cpipe->pipe_state & PIPE_SEL) {
318 cpipe->pipe_state &= ~PIPE_SEL;
319 selwakeup(&cpipe->pipe_sel);
320 }
321 if (cpipe->pipe_state & PIPE_ASYNC) {
322 if (cpipe->pipe_pgid < 0)
323 gsignal(-cpipe->pipe_pgid, SIGIO);
324 else if ((p = pfind(cpipe->pipe_pgid)) != NULL)
325 psignal(p, SIGIO);
326 }
327 }
328
329 /* ARGSUSED */
330 static int
331 pipe_read(fp, uio, cred)
332 struct file *fp;
333 struct uio *uio;
334 struct ucred *cred;
335 {
336
337 struct pipe *rpipe = (struct pipe *) fp->f_data;
338 int error = 0;
339 int nread = 0;
340 u_int size;
341
342 ++rpipe->pipe_busy;
343 while (uio->uio_resid) {
344 /*
345 * normal pipe buffer receive
346 */
347 if (rpipe->pipe_buffer.cnt > 0) {
348 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
349 if (size > rpipe->pipe_buffer.cnt)
350 size = rpipe->pipe_buffer.cnt;
351 if (size > (u_int) uio->uio_resid)
352 size = (u_int) uio->uio_resid;
353 if ((error = pipelock(rpipe,1)) == 0) {
354 error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
355 size, uio);
356 pipeunlock(rpipe);
357 }
358 if (error) {
359 break;
360 }
361 rpipe->pipe_buffer.out += size;
362 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
363 rpipe->pipe_buffer.out = 0;
364
365 rpipe->pipe_buffer.cnt -= size;
366 nread += size;
367 #ifndef PIPE_NODIRECT
368 /*
369 * Direct copy, bypassing a kernel buffer.
370 */
371 } else if ((size = rpipe->pipe_map.cnt) &&
372 (rpipe->pipe_state & PIPE_DIRECTW)) {
373 caddr_t va;
374 if (size > (u_int) uio->uio_resid)
375 size = (u_int) uio->uio_resid;
376 if ((error = pipelock(rpipe,1)) == 0) {
377 va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
378 error = uiomove(va, size, uio);
379 pipeunlock(rpipe);
380 }
381 if (error)
382 break;
383 nread += size;
384 rpipe->pipe_map.pos += size;
385 rpipe->pipe_map.cnt -= size;
386 if (rpipe->pipe_map.cnt == 0) {
387 rpipe->pipe_state &= ~PIPE_DIRECTW;
388 wakeup(rpipe);
389 }
390 #endif
391 } else {
392 /*
393 * detect EOF condition
394 */
395 if (rpipe->pipe_state & PIPE_EOF) {
396 /* XXX error = ? */
397 break;
398 }
399 /*
400 * If the "write-side" has been blocked, wake it up now.
401 */
402 if (rpipe->pipe_state & PIPE_WANTW) {
403 rpipe->pipe_state &= ~PIPE_WANTW;
404 wakeup(rpipe);
405 }
406 if (nread > 0)
407 break;
408
409 if (fp->f_flag & FNONBLOCK) {
410 error = EAGAIN;
411 break;
412 }
413
414 /*
415 * If there is no more to read in the pipe, reset
416 * its pointers to the beginning. This improves
417 * cache hit stats.
418 */
419
420 if ((error = pipelock(rpipe,1)) == 0) {
421 if (rpipe->pipe_buffer.cnt == 0) {
422 rpipe->pipe_buffer.in = 0;
423 rpipe->pipe_buffer.out = 0;
424 }
425 pipeunlock(rpipe);
426 } else {
427 break;
428 }
429
430 if (rpipe->pipe_state & PIPE_WANTW) {
431 rpipe->pipe_state &= ~PIPE_WANTW;
432 wakeup(rpipe);
433 }
434
435 rpipe->pipe_state |= PIPE_WANTR;
436 if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
437 break;
438 }
439 }
440 }
441
442 if (error == 0) {
443 int s = splhigh();
444 rpipe->pipe_atime = time;
445 splx(s);
446 }
447
448 --rpipe->pipe_busy;
449 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
450 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
451 wakeup(rpipe);
452 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
453 /*
454 * If there is no more to read in the pipe, reset
455 * its pointers to the beginning. This improves
456 * cache hit stats.
457 */
458 if (rpipe->pipe_buffer.cnt == 0) {
459 if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
460 rpipe->pipe_buffer.in = 0;
461 rpipe->pipe_buffer.out = 0;
462 pipeunlock(rpipe);
463 }
464 }
465
466 /*
467 * If the "write-side" has been blocked, wake it up now.
468 */
469 if (rpipe->pipe_state & PIPE_WANTW) {
470 rpipe->pipe_state &= ~PIPE_WANTW;
471 wakeup(rpipe);
472 }
473 }
474
475 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
476 pipeselwakeup(rpipe);
477
478 return error;
479 }
480
481 #ifndef PIPE_NODIRECT
482 /*
483 * Map the sending process's buffer into kernel space and wire it.
484 * This is similar to a physical write operation.
485 */
486 static int
487 pipe_build_write_buffer(wpipe, uio)
488 struct pipe *wpipe;
489 struct uio *uio;
490 {
491 u_int size;
492 int i;
493 vm_offset_t addr, endaddr, paddr;
494
495 size = (u_int) uio->uio_iov->iov_len;
496 if (size > wpipe->pipe_buffer.size)
497 size = wpipe->pipe_buffer.size;
498
499 endaddr = round_page(uio->uio_iov->iov_base + size);
500 for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
501 addr < endaddr;
502 addr += PAGE_SIZE, i+=1) {
503
504 vm_page_t m;
505
506 vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
507 paddr = pmap_kextract(addr);
508 if (!paddr) {
509 int j;
510 for(j=0;j<i;j++)
511 vm_page_unwire(wpipe->pipe_map.ms[j]);
512 return EFAULT;
513 }
514
515 m = PHYS_TO_VM_PAGE(paddr);
516 vm_page_wire(m);
517 wpipe->pipe_map.ms[i] = m;
518 }
519
520 /*
521 * set up the control block
522 */
523 wpipe->pipe_map.npages = i;
524 wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
525 wpipe->pipe_map.cnt = size;
526
527 /*
528 * and map the buffer
529 */
530 if (wpipe->pipe_map.kva == 0) {
531 /*
532 * We need to allocate space for an extra page because the
533 * address range is rarely page aligned and so can span an extra page.
534 */
535 wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
536 wpipe->pipe_buffer.size + PAGE_SIZE);
537 amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
538 }
539 pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
540 wpipe->pipe_map.npages);
541
542 /*
543 * and update the uio data
544 */
545
546 uio->uio_iov->iov_len -= size;
547 uio->uio_iov->iov_base += size;
548 if (uio->uio_iov->iov_len == 0)
549 uio->uio_iov++;
550 uio->uio_resid -= size;
551 uio->uio_offset += size;
552 return 0;
553 }
554
555 /*
556 * unmap and unwire the process buffer
557 */
558 static void
559 pipe_destroy_write_buffer(wpipe)
560 struct pipe *wpipe;
561 {
562 int i;
563 if (wpipe->pipe_map.kva) {
564 pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
565
566 if (amountpipekva > MAXPIPEKVA) {
567 vm_offset_t kva = wpipe->pipe_map.kva;
568 wpipe->pipe_map.kva = 0;
569 kmem_free(kernel_map, kva,
570 wpipe->pipe_buffer.size + PAGE_SIZE);
571 amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
572 }
573 }
574 for (i=0;i<wpipe->pipe_map.npages;i++)
575 vm_page_unwire(wpipe->pipe_map.ms[i]);
576 }
577
578 /*
579 * In the case of a signal, the writing process might go away. This
580 * code copies the data into the circular buffer so that the source
581 * pages can be freed without loss of data.
582 */
583 static void
584 pipe_clone_write_buffer(wpipe)
585 struct pipe *wpipe;
586 {
587 int size;
588 int pos;
589
590 size = wpipe->pipe_map.cnt;
591 pos = wpipe->pipe_map.pos;
592 bcopy((caddr_t) wpipe->pipe_map.kva+pos,
593 (caddr_t) wpipe->pipe_buffer.buffer,
594 size);
595
596 wpipe->pipe_buffer.in = size;
597 wpipe->pipe_buffer.out = 0;
598 wpipe->pipe_buffer.cnt = size;
599 wpipe->pipe_state &= ~PIPE_DIRECTW;
600
601 pipe_destroy_write_buffer(wpipe);
602 }
603
604 /*
605 * This implements the pipe buffer write mechanism. Note that only
606 * a direct write OR a normal pipe write can be pending at any given time.
607 * If there are any characters in the pipe buffer, the direct write will
608 * be deferred until the receiving process grabs all of the bytes from
609 * the pipe buffer. Then the direct mapping write is set up.
610 */
611 static int
612 pipe_direct_write(wpipe, uio)
613 struct pipe *wpipe;
614 struct uio *uio;
615 {
616 int error;
617 retry:
618 while (wpipe->pipe_state & PIPE_DIRECTW) {
619 if ( wpipe->pipe_state & PIPE_WANTR) {
620 wpipe->pipe_state &= ~PIPE_WANTR;
621 wakeup(wpipe);
622 }
623 wpipe->pipe_state |= PIPE_WANTW;
624 error = tsleep(wpipe,
625 PRIBIO|PCATCH, "pipdww", 0);
626 if (error)
627 goto error1;
628 if (wpipe->pipe_state & PIPE_EOF) {
629 error = EPIPE;
630 goto error1;
631 }
632 }
633 wpipe->pipe_map.cnt = 0; /* transfer not ready yet */
634 if (wpipe->pipe_buffer.cnt > 0) {
635 if ( wpipe->pipe_state & PIPE_WANTR) {
636 wpipe->pipe_state &= ~PIPE_WANTR;
637 wakeup(wpipe);
638 }
639
640 wpipe->pipe_state |= PIPE_WANTW;
641 error = tsleep(wpipe,
642 PRIBIO|PCATCH, "pipdwc", 0);
643 if (error)
644 goto error1;
645 if (wpipe->pipe_state & PIPE_EOF) {
646 error = EPIPE;
647 goto error1;
648 }
649 goto retry;
650 }
651
652 wpipe->pipe_state |= PIPE_DIRECTW;
653
654 error = pipe_build_write_buffer(wpipe, uio);
655 if (error) {
656 wpipe->pipe_state &= ~PIPE_DIRECTW;
657 goto error1;
658 }
659
660 error = 0;
661 while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
662 if (wpipe->pipe_state & PIPE_EOF) {
663 pipelock(wpipe, 0);
664 pipe_destroy_write_buffer(wpipe);
665 pipeunlock(wpipe);
666 pipeselwakeup(wpipe);
667 error = EPIPE;
668 goto error1;
669 }
670 if (wpipe->pipe_state & PIPE_WANTR) {
671 wpipe->pipe_state &= ~PIPE_WANTR;
672 wakeup(wpipe);
673 }
674 pipeselwakeup(wpipe);
675 error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
676 }
677
678 pipelock(wpipe,0);
679 if (wpipe->pipe_state & PIPE_DIRECTW) {
680 /*
681 * this bit of trickery substitutes a kernel buffer for
682 * the process that might be going away.
683 */
684 pipe_clone_write_buffer(wpipe);
685 } else {
686 pipe_destroy_write_buffer(wpipe);
687 }
688 pipeunlock(wpipe);
689 return error;
690
691 error1:
692 wakeup(wpipe);
693 return error;
694 }
695 #endif
696
697 static int
698 pipe_write(fp, uio, cred)
699 struct file *fp;
700 struct uio *uio;
701 struct ucred *cred;
702 {
703 int error = 0;
704 int orig_resid;
705
706 struct pipe *wpipe, *rpipe;
707
708 rpipe = (struct pipe *) fp->f_data;
709 wpipe = rpipe->pipe_peer;
710
711 /*
712 * detect loss of pipe read side, issue SIGPIPE if lost.
713 */
714 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
715 return EPIPE;
716 }
717
718 /*
719 * If it is advantageous to resize the pipe buffer, do
720 * so.
721 */
722 if ((uio->uio_resid > PIPE_SIZE) &&
723 (nbigpipe < LIMITBIGPIPES) &&
724 (wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
725 (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
726 (wpipe->pipe_buffer.cnt == 0)) {
727
728 if (wpipe->pipe_buffer.buffer) {
729 amountpipekva -= wpipe->pipe_buffer.size;
730 kmem_free(kernel_map,
731 (vm_offset_t)wpipe->pipe_buffer.buffer,
732 wpipe->pipe_buffer.size);
733 }
734
735 #ifndef PIPE_NODIRECT
736 if (wpipe->pipe_map.kva) {
737 amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
738 kmem_free(kernel_map,
739 wpipe->pipe_map.kva,
740 wpipe->pipe_buffer.size + PAGE_SIZE);
741 }
742 #endif
743
744 wpipe->pipe_buffer.in = 0;
745 wpipe->pipe_buffer.out = 0;
746 wpipe->pipe_buffer.cnt = 0;
747 wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
748 wpipe->pipe_buffer.buffer = NULL;
749 ++nbigpipe;
750
751 #ifndef PIPE_NODIRECT
752 wpipe->pipe_map.cnt = 0;
753 wpipe->pipe_map.kva = 0;
754 wpipe->pipe_map.pos = 0;
755 wpipe->pipe_map.npages = 0;
756 #endif
757
758 }
759
760
761 if( wpipe->pipe_buffer.buffer == NULL) {
762 if ((error = pipelock(wpipe,1)) == 0) {
763 pipespace(wpipe);
764 pipeunlock(wpipe);
765 } else {
766 return error;
767 }
768 }
769
770 ++wpipe->pipe_busy;
771 orig_resid = uio->uio_resid;
772 while (uio->uio_resid) {
773 int space;
774 #ifndef PIPE_NODIRECT
775 /*
776 * If the transfer is large, we can gain performance if
777 * we do process-to-process copies directly.
778 * If the write is non-blocking, we don't use the
779 * direct write mechanism.
780 */
781 if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
782 (fp->f_flag & FNONBLOCK) == 0 &&
783 (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA)) &&
784 (uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
785 error = pipe_direct_write( wpipe, uio);
786 if (error) {
787 break;
788 }
789 continue;
790 }
791 #endif
792
793 /*
794 * Pipe buffered writes cannot be coincident with
795 * direct writes. We wait until the currently executing
796 * direct write is completed before we start filling the
797 * pipe buffer.
798 */
799 retrywrite:
800 while (wpipe->pipe_state & PIPE_DIRECTW) {
801 if (wpipe->pipe_state & PIPE_WANTR) {
802 wpipe->pipe_state &= ~PIPE_WANTR;
803 wakeup(wpipe);
804 }
805 error = tsleep(wpipe,
806 PRIBIO|PCATCH, "pipbww", 0);
807 if (error)
808 break;
809 }
810
811 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
812
813 /* Writes of size <= PIPE_BUF must be atomic. */
814 /* XXX perhaps they need to be contiguous to be atomic? */
815 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
816 space = 0;
817
818 if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
819 /*
820 * This sets the maximum transfer to a segment of
821 * the buffer.
822 */
823 int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
824 /*
825 * space is the size left in the buffer
826 */
827 if (size > space)
828 size = space;
829 /*
830 * now limit it to the size of the uio transfer
831 */
832 if (size > uio->uio_resid)
833 size = uio->uio_resid;
834 if ((error = pipelock(wpipe,1)) == 0) {
835 /*
836 * It is possible for a direct write to
837 * slip in on us... handle it here...
838 */
839 if (wpipe->pipe_state & PIPE_DIRECTW) {
840 pipeunlock(wpipe);
841 goto retrywrite;
842 }
843 error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
844 size, uio);
845 pipeunlock(wpipe);
846 }
847 if (error)
848 break;
849
850 wpipe->pipe_buffer.in += size;
851 if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
852 wpipe->pipe_buffer.in = 0;
853
854 wpipe->pipe_buffer.cnt += size;
855 } else {
856 /*
857 * If the "read-side" has been blocked, wake it up now.
858 */
859 if (wpipe->pipe_state & PIPE_WANTR) {
860 wpipe->pipe_state &= ~PIPE_WANTR;
861 wakeup(wpipe);
862 }
863
864 /*
865 * don't block on non-blocking I/O
866 */
867 if (fp->f_flag & FNONBLOCK) {
868 error = EAGAIN;
869 break;
870 }
871
872 /*
873 * We have no more space and have something to offer,
874 * so wake up selects.
875 */
876 pipeselwakeup(wpipe);
877
878 wpipe->pipe_state |= PIPE_WANTW;
879 if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
880 break;
881 }
882 /*
883 * If read side wants to go away, we just issue a signal
884 * to ourselves.
885 */
886 if (wpipe->pipe_state & PIPE_EOF) {
887 error = EPIPE;
888 break;
889 }
890 }
891 }
892
893 --wpipe->pipe_busy;
894 if ((wpipe->pipe_busy == 0) &&
895 (wpipe->pipe_state & PIPE_WANT)) {
896 wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
897 wakeup(wpipe);
898 } else if (wpipe->pipe_buffer.cnt > 0) {
899 /*
900 * If we have put any characters in the buffer, we wake up
901 * the reader.
902 */
903 if (wpipe->pipe_state & PIPE_WANTR) {
904 wpipe->pipe_state &= ~PIPE_WANTR;
905 wakeup(wpipe);
906 }
907 }
908
909 /*
910 * Don't return EPIPE if I/O was successful
911 */
912 if ((wpipe->pipe_buffer.cnt == 0) &&
913 (uio->uio_resid == 0) &&
914 (error == EPIPE))
915 error = 0;
916
917 if (error == 0) {
918 int s = splhigh();
919 wpipe->pipe_mtime = time;
920 splx(s);
921 }
922 /*
923 * We have something to offer,
924 * wake up select.
925 */
926 if (wpipe->pipe_buffer.cnt)
927 pipeselwakeup(wpipe);
928
929 return error;
930 }
931
932 /*
933 * we implement a very minimal set of ioctls for compatibility with sockets.
934 */
935 int
936 pipe_ioctl(fp, cmd, data, p)
937 struct file *fp;
938 int cmd;
939 register caddr_t data;
940 struct proc *p;
941 {
942 register struct pipe *mpipe = (struct pipe *)fp->f_data;
943
944 switch (cmd) {
945
946 case FIONBIO:
947 return (0);
948
949 case FIOASYNC:
950 if (*(int *)data) {
951 mpipe->pipe_state |= PIPE_ASYNC;
952 } else {
953 mpipe->pipe_state &= ~PIPE_ASYNC;
954 }
955 return (0);
956
957 case FIONREAD:
958 if (mpipe->pipe_state & PIPE_DIRECTW)
959 *(int *)data = mpipe->pipe_map.cnt;
960 else
961 *(int *)data = mpipe->pipe_buffer.cnt;
962 return (0);
963
964 case TIOCSPGRP:
965 mpipe->pipe_pgid = *(int *)data;
966 return (0);
967
968 case TIOCGPGRP:
969 *(int *)data = mpipe->pipe_pgid;
970 return (0);
971
972 }
973 return (ENOTTY);
974 }
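/*
 * [Editor's addition -- illustrative userland sketch, not part of the
 * original file.]  FIONREAD, handled above, reports how many bytes a
 * read would return right now, whether they sit in the circular buffer
 * or in a pending direct-write mapping.  The helper name is hypothetical.
 */
#if 0
#include <sys/types.h>
#include <sys/ioctl.h>

static int
pipe_bytes_pending(int fd)
{
	int nbytes;

	if (ioctl(fd, FIONREAD, &nbytes) == -1)
		return (-1);
	return (nbytes);
}
#endif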
975
976 int
977 pipe_select(fp, which, p)
978 struct file *fp;
979 int which;
980 struct proc *p;
981 {
982 register struct pipe *rpipe = (struct pipe *)fp->f_data;
983 struct pipe *wpipe;
984
985 wpipe = rpipe->pipe_peer;
986 switch (which) {
987
988 case FREAD:
989 if ( (rpipe->pipe_state & PIPE_DIRECTW) ||
990 (rpipe->pipe_buffer.cnt > 0) ||
991 (rpipe->pipe_state & PIPE_EOF)) {
992 return (1);
993 }
994 selrecord(p, &rpipe->pipe_sel);
995 rpipe->pipe_state |= PIPE_SEL;
996 break;
997
998 case FWRITE:
999 if ((wpipe == NULL) ||
1000 (wpipe->pipe_state & PIPE_EOF) ||
1001 (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1002 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
1003 return (1);
1004 }
1005 selrecord(p, &wpipe->pipe_sel);
1006 wpipe->pipe_state |= PIPE_SEL;
1007 break;
1008
1009 case 0:
1010 if ((rpipe->pipe_state & PIPE_EOF) ||
1011 (wpipe == NULL) ||
1012 (wpipe->pipe_state & PIPE_EOF)) {
1013 return (1);
1014 }
1015
1016 selrecord(p, &rpipe->pipe_sel);
1017 rpipe->pipe_state |= PIPE_SEL;
1018 break;
1019 }
1020 return (0);
1021 }
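/*
 * [Editor's addition -- illustrative userland sketch, not part of the
 * original file.]  pipe_select() above reports the read end as ready
 * when buffered or direct-mapped data is pending, or when the peer has
 * gone away (PIPE_EOF).  The helper name is hypothetical.
 */
#if 0
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>

static int
pipe_wait_readable(int fd, struct timeval *tv)
{
	fd_set rfds;

	FD_ZERO(&rfds);
	FD_SET(fd, &rfds);
	/* returns 1 when fd is readable, 0 on timeout, -1 on error */
	return (select(fd + 1, &rfds, NULL, NULL, tv));
}
#endif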
1022
1023 int
1024 pipe_stat(pipe, ub)
1025 register struct pipe *pipe;
1026 register struct stat *ub;
1027 {
1028 bzero((caddr_t)ub, sizeof (*ub));
1029 ub->st_mode = S_IFIFO;
1030 ub->st_blksize = pipe->pipe_buffer.size;
1031 ub->st_size = pipe->pipe_buffer.cnt;
1032 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1033 TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
1034 TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
1035 TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
1036 /*
1037 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1038 * st_flags, st_gen.
1039 * XXX (st_dev, st_ino) should be unique.
1040 */
1041 return 0;
1042 }
1043
1044 /* ARGSUSED */
1045 static int
1046 pipe_close(fp, p)
1047 struct file *fp;
1048 struct proc *p;
1049 {
1050 struct pipe *cpipe = (struct pipe *)fp->f_data;
1051
1052 pipeclose(cpipe);
1053 fp->f_data = NULL;
1054 return 0;
1055 }
1056
1057 /*
1058 * shut down the pipe
1059 */
1060 static void
1061 pipeclose(cpipe)
1062 struct pipe *cpipe;
1063 {
1064 struct pipe *ppipe;
1065 if (cpipe) {
1066
1067 pipeselwakeup(cpipe);
1068
1069 /*
1070 * If the other side is blocked, wake it up saying that
1071 * we want to close it down.
1072 */
1073 while (cpipe->pipe_busy) {
1074 wakeup(cpipe);
1075 cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
1076 tsleep(cpipe, PRIBIO, "pipecl", 0);
1077 }
1078
1079 /*
1080 * Disconnect from peer
1081 */
1082 if (ppipe = cpipe->pipe_peer) {
1083 pipeselwakeup(ppipe);
1084
1085 ppipe->pipe_state |= PIPE_EOF;
1086 wakeup(ppipe);
1087 ppipe->pipe_peer = NULL;
1088 }
1089
1090 /*
1091 * free resources
1092 */
1093 if (cpipe->pipe_buffer.buffer) {
1094 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1095 --nbigpipe;
1096 amountpipekva -= cpipe->pipe_buffer.size;
1097 kmem_free(kernel_map,
1098 (vm_offset_t)cpipe->pipe_buffer.buffer,
1099 cpipe->pipe_buffer.size);
1100 }
1101 #ifndef PIPE_NODIRECT
1102 if (cpipe->pipe_map.kva) {
1103 amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1104 kmem_free(kernel_map,
1105 cpipe->pipe_map.kva,
1106 cpipe->pipe_buffer.size + PAGE_SIZE);
1107 }
1108 #endif
1109 free(cpipe, M_TEMP);
1110 }
1111 }
1112 #endif