FreeBSD/Linux Kernel Cross Reference
sys/port/devpipe.c
1 #include "u.h"
2 #include "../port/lib.h"
3 #include "mem.h"
4 #include "dat.h"
5 #include "fns.h"
6 #include "../port/error.h"
7
8 #include "netif.h"
9
10 typedef struct Pipe Pipe;
11 struct Pipe
12 {
13 QLock;
14 Pipe *next;
15 int ref;
16 ulong path;
17 Queue *q[2];
18 int qref[2];
19 };
20
21 struct
22 {
23 Lock;
24 ulong path;
25 } pipealloc;
26
27 enum
28 {
29 Qdir,
30 Qdata0,
31 Qdata1,
32 };
33
34 Dirtab pipedir[] =
35 {
36 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
37 "data", {Qdata0}, 0, 0600,
38 "data1", {Qdata1}, 0, 0600,
39 };
40 #define NPIPEDIR 3
41
42 static void
43 pipeinit(void)
44 {
45 if(conf.pipeqsize == 0){
46 if(conf.nmach > 1)
47 conf.pipeqsize = 256*1024;
48 else
49 conf.pipeqsize = 32*1024;
50 }
51 }
52
53 /*
54 * create a pipe, no streams are created until an open
55 */
56 static Chan*
57 pipeattach(char *spec)
58 {
59 Pipe *p;
60 Chan *c;
61
62 c = devattach('|', spec);
63 p = malloc(sizeof(Pipe));
64 if(p == 0)
65 exhausted("memory");
66 p->ref = 1;
67
68 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
69 if(p->q[0] == 0){
70 free(p);
71 exhausted("memory");
72 }
73 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
74 if(p->q[1] == 0){
75 free(p->q[0]);
76 free(p);
77 exhausted("memory");
78 }
79
80 lock(&pipealloc);
81 p->path = ++pipealloc.path;
82 unlock(&pipealloc);
83
84 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
85 c->aux = p;
86 c->dev = 0;
87 return c;
88 }
89
90 static int
91 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
92 {
93 Qid q;
94 int len;
95 Pipe *p;
96
97 if(i == DEVDOTDOT){
98 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
99 return 1;
100 }
101 i++; /* skip . */
102 if(tab==0 || i>=ntab)
103 return -1;
104
105 tab += i;
106 p = c->aux;
107 switch((ulong)tab->qid.path){
108 case Qdata0:
109 len = qlen(p->q[0]);
110 break;
111 case Qdata1:
112 len = qlen(p->q[1]);
113 break;
114 default:
115 len = tab->length;
116 break;
117 }
118 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
119 devdir(c, q, tab->name, len, eve, tab->perm, dp);
120 return 1;
121 }
122
123
124 static Walkqid*
125 pipewalk(Chan *c, Chan *nc, char **name, int nname)
126 {
127 Walkqid *wq;
128 Pipe *p;
129
130 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
131 if(wq != nil && wq->clone != nil && wq->clone != c){
132 p = c->aux;
133 qlock(p);
134 p->ref++;
135 if(c->flag & COPEN){
136 print("channel open in pipewalk\n");
137 switch(NETTYPE(c->qid.path)){
138 case Qdata0:
139 p->qref[0]++;
140 break;
141 case Qdata1:
142 p->qref[1]++;
143 break;
144 }
145 }
146 qunlock(p);
147 }
148 return wq;
149 }
150
151 static int
152 pipestat(Chan *c, uchar *db, int n)
153 {
154 Pipe *p;
155 Dir dir;
156
157 p = c->aux;
158
159 switch(NETTYPE(c->qid.path)){
160 case Qdir:
161 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
162 break;
163 case Qdata0:
164 devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
165 break;
166 case Qdata1:
167 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
168 break;
169 default:
170 panic("pipestat");
171 }
172 n = convD2M(&dir, db, n);
173 if(n < BIT16SZ)
174 error(Eshortstat);
175 return n;
176 }
177
178 /*
179 * if the stream doesn't exist, create it
180 */
181 static Chan*
182 pipeopen(Chan *c, int omode)
183 {
184 Pipe *p;
185
186 if(c->qid.type & QTDIR){
187 if(omode != OREAD)
188 error(Ebadarg);
189 c->mode = omode;
190 c->flag |= COPEN;
191 c->offset = 0;
192 return c;
193 }
194
195 p = c->aux;
196 qlock(p);
197 switch(NETTYPE(c->qid.path)){
198 case Qdata0:
199 p->qref[0]++;
200 break;
201 case Qdata1:
202 p->qref[1]++;
203 break;
204 }
205 qunlock(p);
206
207 c->mode = openmode(omode);
208 c->flag |= COPEN;
209 c->offset = 0;
210 c->iounit = qiomaxatomic;
211 return c;
212 }
213
214 static void
215 pipeclose(Chan *c)
216 {
217 Pipe *p;
218
219 p = c->aux;
220 qlock(p);
221
222 if(c->flag & COPEN){
223 /*
224 * closing either side hangs up the stream
225 */
226 switch(NETTYPE(c->qid.path)){
227 case Qdata0:
228 p->qref[0]--;
229 if(p->qref[0] == 0){
230 qhangup(p->q[1], 0);
231 qclose(p->q[0]);
232 }
233 break;
234 case Qdata1:
235 p->qref[1]--;
236 if(p->qref[1] == 0){
237 qhangup(p->q[0], 0);
238 qclose(p->q[1]);
239 }
240 break;
241 }
242 }
243
244
245 /*
246 * if both sides are closed, they are reusable
247 */
248 if(p->qref[0] == 0 && p->qref[1] == 0){
249 qreopen(p->q[0]);
250 qreopen(p->q[1]);
251 }
252
253 /*
254 * free the structure on last close
255 */
256 p->ref--;
257 if(p->ref == 0){
258 qunlock(p);
259 free(p->q[0]);
260 free(p->q[1]);
261 free(p);
262 } else
263 qunlock(p);
264 }
265
266 static long
267 piperead(Chan *c, void *va, long n, vlong)
268 {
269 Pipe *p;
270
271 p = c->aux;
272
273 switch(NETTYPE(c->qid.path)){
274 case Qdir:
275 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
276 case Qdata0:
277 return qread(p->q[0], va, n);
278 case Qdata1:
279 return qread(p->q[1], va, n);
280 default:
281 panic("piperead");
282 }
283 return -1; /* not reached */
284 }
285
286 static Block*
287 pipebread(Chan *c, long n, ulong offset)
288 {
289 Pipe *p;
290
291 p = c->aux;
292
293 switch(NETTYPE(c->qid.path)){
294 case Qdata0:
295 return qbread(p->q[0], n);
296 case Qdata1:
297 return qbread(p->q[1], n);
298 }
299
300 return devbread(c, n, offset);
301 }
302
303 /*
304 * a write to a closed pipe causes a note to be sent to
305 * the process.
306 */
307 static long
308 pipewrite(Chan *c, void *va, long n, vlong)
309 {
310 Pipe *p;
311
312 if(!islo())
313 print("pipewrite hi %#p\n", getcallerpc(&c));
314 if(waserror()) {
315 /* avoid notes when pipe is a mounted queue */
316 if((c->flag & CMSG) == 0)
317 postnote(up, 1, "sys: write on closed pipe", NUser);
318 nexterror();
319 }
320
321 p = c->aux;
322
323 switch(NETTYPE(c->qid.path)){
324 case Qdata0:
325 n = qwrite(p->q[1], va, n);
326 break;
327
328 case Qdata1:
329 n = qwrite(p->q[0], va, n);
330 break;
331
332 default:
333 panic("pipewrite");
334 }
335
336 poperror();
337 return n;
338 }
339
340 static long
341 pipebwrite(Chan *c, Block *bp, ulong)
342 {
343 long n;
344 Pipe *p;
345
346 if(waserror()) {
347 /* avoid notes when pipe is a mounted queue */
348 if((c->flag & CMSG) == 0)
349 postnote(up, 1, "sys: write on closed pipe", NUser);
350 nexterror();
351 }
352
353 p = c->aux;
354 switch(NETTYPE(c->qid.path)){
355 case Qdata0:
356 n = qbwrite(p->q[1], bp);
357 break;
358
359 case Qdata1:
360 n = qbwrite(p->q[0], bp);
361 break;
362
363 default:
364 n = 0;
365 panic("pipebwrite");
366 }
367
368 poperror();
369 return n;
370 }
371
372 Dev pipedevtab = {
373 '|',
374 "pipe",
375
376 devreset,
377 pipeinit,
378 devshutdown,
379 pipeattach,
380 pipewalk,
381 pipestat,
382 pipeopen,
383 devcreate,
384 pipeclose,
385 piperead,
386 pipebread,
387 pipewrite,
388 pipebwrite,
389 devremove,
390 devwstat,
391 };
Cache object: 0c337f391d11213911fd11b70c94003a
|