FreeBSD/Linux Kernel Cross Reference
sys/ip/rudp.c
1 /*
2 * Reliable User Datagram Protocol, currently only for IPv4.
3 * This protocol is compatible with UDP's packet format.
4 * It could be done over UDP if need be.
5 */
6 #include "u.h"
7 #include "../port/lib.h"
8 #include "mem.h"
9 #include "dat.h"
10 #include "fns.h"
11 #include "../port/error.h"
12
13 #include "ip.h"
14
15 #define DEBUG 0
16 #define DPRINT if(DEBUG)print
17
18 #define SEQDIFF(a,b) ( (a)>=(b)?\
19 (a)-(b):\
20 0xffffffffUL-((b)-(a)) )
21 #define INSEQ(a,start,end) ( (start)<=(end)?\
22 ((a)>(start)&&(a)<=(end)):\
23 ((a)>(start)||(a)<=(end)) )
24 #define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd)
25 #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
26
27 enum
28 {
29 UDP_PHDRSIZE = 12, /* pseudo header */
30 // UDP_HDRSIZE = 20, /* pseudo header + udp header */
31 UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */
32 UDP_IPHDR = 8, /* ip header */
33 IP_UDPPROTO = 254,
34 UDP_USEAD7 = 52, /* size of new ipv6 headers struct */
35
36 Rudprxms = 200,
37 Rudptickms = 50,
38 Rudpmaxxmit = 10,
39 Maxunacked = 100,
40 };
41
42 #define Hangupgen 0xffffffff /* used only in hangup messages */
43
44 typedef struct Udphdr Udphdr;
45 struct Udphdr
46 {
47 /* ip header */
48 uchar vihl; /* Version and header length */
49 uchar tos; /* Type of service */
50 uchar length[2]; /* packet length */
51 uchar id[2]; /* Identification */
52 uchar frag[2]; /* Fragment information */
53
54 /* pseudo header starts here */
55 uchar Unused;
56 uchar udpproto; /* Protocol */
57 uchar udpplen[2]; /* Header plus data length */
58 uchar udpsrc[4]; /* Ip source */
59 uchar udpdst[4]; /* Ip destination */
60
61 /* udp header */
62 uchar udpsport[2]; /* Source port */
63 uchar udpdport[2]; /* Destination port */
64 uchar udplen[2]; /* data length */
65 uchar udpcksum[2]; /* Checksum */
66 };
67
68 typedef struct Rudphdr Rudphdr;
69 struct Rudphdr
70 {
71 /* ip header */
72 uchar vihl; /* Version and header length */
73 uchar tos; /* Type of service */
74 uchar length[2]; /* packet length */
75 uchar id[2]; /* Identification */
76 uchar frag[2]; /* Fragment information */
77
78 /* pseudo header starts here */
79 uchar Unused;
80 uchar udpproto; /* Protocol */
81 uchar udpplen[2]; /* Header plus data length */
82 uchar udpsrc[4]; /* Ip source */
83 uchar udpdst[4]; /* Ip destination */
84
85 /* udp header */
86 uchar udpsport[2]; /* Source port */
87 uchar udpdport[2]; /* Destination port */
88 uchar udplen[2]; /* data length (includes rudp header) */
89 uchar udpcksum[2]; /* Checksum */
90
91 /* rudp header */
92 uchar relseq[4]; /* id of this packet (or 0) */
93 uchar relsgen[4]; /* generation/time stamp */
94 uchar relack[4]; /* packet being acked (or 0) */
95 uchar relagen[4]; /* generation/time stamp */
96 };
97
98
99 /*
100 * one state structure per destination
101 */
102 typedef struct Reliable Reliable;
103 struct Reliable
104 {
105 Ref;
106
107 Reliable *next;
108
109 uchar addr[IPaddrlen]; /* always V6 when put here */
110 ushort port;
111
112 Block *unacked; /* unacked msg list */
113 Block *unackedtail; /* and its tail */
114
115 int timeout; /* time since first unacked msg sent */
116 int xmits; /* number of times first unacked msg sent */
117
118 ulong sndseq; /* next packet to be sent */
119 ulong sndgen; /* and its generation */
120
121 ulong rcvseq; /* last packet received */
122 ulong rcvgen; /* and its generation */
123
124 ulong acksent; /* last ack sent */
125 ulong ackrcvd; /* last msg for which ack was rcvd */
126
127 /* flow control */
128 QLock lock;
129 Rendez vous;
130 int blocked;
131 };
132
133
134
135 /* MIB II counters */
136 typedef struct Rudpstats Rudpstats;
137 struct Rudpstats
138 {
139 ulong rudpInDatagrams;
140 ulong rudpNoPorts;
141 ulong rudpInErrors;
142 ulong rudpOutDatagrams;
143 };
144
145 typedef struct Rudppriv Rudppriv;
146 struct Rudppriv
147 {
148 Ipht ht;
149
150 /* MIB counters */
151 Rudpstats ustats;
152
153 /* non-MIB stats */
154 ulong csumerr; /* checksum errors */
155 ulong lenerr; /* short packet */
156 ulong rxmits; /* # of retransmissions */
157 ulong orders; /* # of out of order pkts */
158
159 /* keeping track of the ack kproc */
160 int ackprocstarted;
161 QLock apl;
162 };
163
164
165 static ulong generation = 0;
166 static Rendez rend;
167
168 /*
169 * protocol specific part of Conv
170 */
171 typedef struct Rudpcb Rudpcb;
172 struct Rudpcb
173 {
174 QLock;
175 uchar headers;
176 uchar randdrop;
177 Reliable *r;
178 };
179
180 /*
181 * local functions
182 */
183 void relsendack(Conv*, Reliable*, int);
184 int reliput(Conv*, Block*, uchar*, ushort);
185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
186 void relput(Reliable*);
187 void relforget(Conv *, uchar*, int, int);
188 void relackproc(void *);
189 void relackq(Reliable *, Block*);
190 void relhangup(Conv *, Reliable*);
191 void relrexmit(Conv *, Reliable*);
192 void relput(Reliable*);
193 void rudpkick(void *x);
194
195 static void
196 rudpstartackproc(Proto *rudp)
197 {
198 Rudppriv *rpriv;
199 char kpname[KNAMELEN];
200
201 rpriv = rudp->priv;
202 if(rpriv->ackprocstarted == 0){
203 qlock(&rpriv->apl);
204 if(rpriv->ackprocstarted == 0){
205 sprint(kpname, "#I%drudpack", rudp->f->dev);
206 kproc(kpname, relackproc, rudp);
207 rpriv->ackprocstarted = 1;
208 }
209 qunlock(&rpriv->apl);
210 }
211 }
212
213 static char*
214 rudpconnect(Conv *c, char **argv, int argc)
215 {
216 char *e;
217 Rudppriv *upriv;
218
219 upriv = c->p->priv;
220 rudpstartackproc(c->p);
221 e = Fsstdconnect(c, argv, argc);
222 Fsconnected(c, e);
223 iphtadd(&upriv->ht, c);
224
225 return e;
226 }
227
228
229 static int
230 rudpstate(Conv *c, char *state, int n)
231 {
232 Rudpcb *ucb;
233 Reliable *r;
234 int m;
235
236 m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
237 ucb = (Rudpcb*)c->ptcl;
238 qlock(ucb);
239 for(r = ucb->r; r; r = r->next)
240 m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
241 m += snprint(state+m, n-m, "\n");
242 qunlock(ucb);
243 return m;
244 }
245
246 static char*
247 rudpannounce(Conv *c, char** argv, int argc)
248 {
249 char *e;
250 Rudppriv *upriv;
251
252 upriv = c->p->priv;
253 rudpstartackproc(c->p);
254 e = Fsstdannounce(c, argv, argc);
255 if(e != nil)
256 return e;
257 Fsconnected(c, nil);
258 iphtadd(&upriv->ht, c);
259
260 return nil;
261 }
262
263 static void
264 rudpcreate(Conv *c)
265 {
266 c->rq = qopen(64*1024, Qmsg, 0, 0);
267 c->wq = qopen(64*1024, Qkick, rudpkick, c);
268 }
269
270 static void
271 rudpclose(Conv *c)
272 {
273 Rudpcb *ucb;
274 Reliable *r, *nr;
275 Rudppriv *upriv;
276
277 upriv = c->p->priv;
278 iphtrem(&upriv->ht, c);
279
280 /* force out any delayed acks */
281 ucb = (Rudpcb*)c->ptcl;
282 qlock(ucb);
283 for(r = ucb->r; r; r = r->next){
284 if(r->acksent != r->rcvseq)
285 relsendack(c, r, 0);
286 }
287 qunlock(ucb);
288
289 qclose(c->rq);
290 qclose(c->wq);
291 qclose(c->eq);
292 ipmove(c->laddr, IPnoaddr);
293 ipmove(c->raddr, IPnoaddr);
294 c->lport = 0;
295 c->rport = 0;
296
297 ucb->headers = 0;
298 ucb->randdrop = 0;
299 qlock(ucb);
300 for(r = ucb->r; r; r = nr){
301 if(r->acksent != r->rcvseq)
302 relsendack(c, r, 0);
303 nr = r->next;
304 relhangup(c, r);
305 relput(r);
306 }
307 ucb->r = 0;
308
309 qunlock(ucb);
310 }
311
312 /*
313 * randomly don't send packets
314 */
315 static void
316 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
317 {
318 Rudpcb *ucb;
319
320 ucb = (Rudpcb*)c->ptcl;
321 if(ucb->randdrop && nrand(100) < ucb->randdrop)
322 freeblist(bp);
323 else
324 ipoput4(f, bp, x, ttl, tos, nil);
325 }
326
327 int
328 flow(void *v)
329 {
330 Reliable *r = v;
331
332 return UNACKED(r) <= Maxunacked;
333 }
334
335 void
336 rudpkick(void *x)
337 {
338 Conv *c = x;
339 Udphdr *uh;
340 ushort rport;
341 uchar laddr[IPaddrlen], raddr[IPaddrlen];
342 Block *bp;
343 Rudpcb *ucb;
344 Rudphdr *rh;
345 Reliable *r;
346 int dlen, ptcllen;
347 Rudppriv *upriv;
348 Fs *f;
349
350 upriv = c->p->priv;
351 f = c->p->f;
352
353 netlog(c->p->f, Logrudp, "rudp: kick\n");
354 bp = qget(c->wq);
355 if(bp == nil)
356 return;
357
358 ucb = (Rudpcb*)c->ptcl;
359 switch(ucb->headers) {
360 case 7:
361 /* get user specified addresses */
362 bp = pullupblock(bp, UDP_USEAD7);
363 if(bp == nil)
364 return;
365 ipmove(raddr, bp->rp);
366 bp->rp += IPaddrlen;
367 ipmove(laddr, bp->rp);
368 bp->rp += IPaddrlen;
369 /* pick interface closest to dest */
370 if(ipforme(f, laddr) != Runi)
371 findlocalip(f, laddr, raddr);
372 bp->rp += IPaddrlen; /* Ignore ifc address */
373 rport = nhgets(bp->rp);
374 bp->rp += 2+2; /* Ignore local port */
375 break;
376 default:
377 ipmove(raddr, c->raddr);
378 ipmove(laddr, c->laddr);
379 rport = c->rport;
380 break;
381 }
382
383 dlen = blocklen(bp);
384
385 /* Make space to fit rudp & ip header */
386 bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
387 if(bp == nil)
388 return;
389
390 uh = (Udphdr *)(bp->rp);
391 uh->vihl = IP_VER4;
392
393 rh = (Rudphdr*)uh;
394
395 ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
396 uh->Unused = 0;
397 uh->udpproto = IP_UDPPROTO;
398 uh->frag[0] = 0;
399 uh->frag[1] = 0;
400 hnputs(uh->udpplen, ptcllen);
401 switch(ucb->headers){
402 case 7:
403 v6tov4(uh->udpdst, raddr);
404 hnputs(uh->udpdport, rport);
405 v6tov4(uh->udpsrc, laddr);
406 break;
407 default:
408 v6tov4(uh->udpdst, c->raddr);
409 hnputs(uh->udpdport, c->rport);
410 if(ipcmp(c->laddr, IPnoaddr) == 0)
411 findlocalip(f, c->laddr, c->raddr);
412 v6tov4(uh->udpsrc, c->laddr);
413 break;
414 }
415 hnputs(uh->udpsport, c->lport);
416 hnputs(uh->udplen, ptcllen);
417 uh->udpcksum[0] = 0;
418 uh->udpcksum[1] = 0;
419
420 qlock(ucb);
421 r = relstate(ucb, raddr, rport, "kick");
422 r->sndseq = NEXTSEQ(r->sndseq);
423 hnputl(rh->relseq, r->sndseq);
424 hnputl(rh->relsgen, r->sndgen);
425
426 hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
427 hnputl(rh->relagen, r->rcvgen);
428
429 if(r->rcvseq != r->acksent)
430 r->acksent = r->rcvseq;
431
432 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
433
434 relackq(r, bp);
435 qunlock(ucb);
436
437 upriv->ustats.rudpOutDatagrams++;
438
439 DPRINT("sent: %lud/%lud, %lud/%lud\n",
440 r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
441
442 doipoput(c, f, bp, 0, c->ttl, c->tos);
443
444 if(waserror()) {
445 relput(r);
446 qunlock(&r->lock);
447 nexterror();
448 }
449
450 /* flow control of sorts */
451 qlock(&r->lock);
452 if(UNACKED(r) > Maxunacked){
453 r->blocked = 1;
454 sleep(&r->vous, flow, r);
455 r->blocked = 0;
456 }
457
458 qunlock(&r->lock);
459 relput(r);
460 poperror();
461 }
462
463 void
464 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
465 {
466 int len, olen, ottl;
467 Udphdr *uh;
468 Conv *c;
469 Rudpcb *ucb;
470 uchar raddr[IPaddrlen], laddr[IPaddrlen];
471 ushort rport, lport;
472 Rudppriv *upriv;
473 Fs *f;
474 uchar *p;
475
476 upriv = rudp->priv;
477 f = rudp->f;
478
479 upriv->ustats.rudpInDatagrams++;
480
481 uh = (Udphdr*)(bp->rp);
482
483 /* Put back pseudo header for checksum
484 * (remember old values for icmpnoconv())
485 */
486 ottl = uh->Unused;
487 uh->Unused = 0;
488 len = nhgets(uh->udplen);
489 olen = nhgets(uh->udpplen);
490 hnputs(uh->udpplen, len);
491
492 v4tov6(raddr, uh->udpsrc);
493 v4tov6(laddr, uh->udpdst);
494 lport = nhgets(uh->udpdport);
495 rport = nhgets(uh->udpsport);
496
497 if(nhgets(uh->udpcksum)) {
498 if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
499 upriv->ustats.rudpInErrors++;
500 upriv->csumerr++;
501 netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
502 DPRINT("rudp: checksum error %I\n", raddr);
503 freeblist(bp);
504 return;
505 }
506 }
507
508 qlock(rudp);
509
510 c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
511 if(c == nil){
512 /* no conversation found */
513 upriv->ustats.rudpNoPorts++;
514 qunlock(rudp);
515 netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
516 laddr, lport);
517 uh->Unused = ottl;
518 hnputs(uh->udpplen, olen);
519 icmpnoconv(f, bp);
520 freeblist(bp);
521 return;
522 }
523 ucb = (Rudpcb*)c->ptcl;
524 qlock(ucb);
525 qunlock(rudp);
526
527 if(reliput(c, bp, raddr, rport) < 0){
528 qunlock(ucb);
529 freeb(bp);
530 return;
531 }
532
533 /*
534 * Trim the packet down to data size
535 */
536
537 len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
538 bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
539 if(bp == nil) {
540 netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
541 raddr, rport, laddr, lport);
542 DPRINT("rudp: len err %I.%d -> %I.%d\n",
543 raddr, rport, laddr, lport);
544 upriv->lenerr++;
545 return;
546 }
547
548 netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
549 raddr, rport, laddr, lport, len);
550
551 switch(ucb->headers){
552 case 7:
553 /* pass the src address */
554 bp = padblock(bp, UDP_USEAD7);
555 p = bp->rp;
556 ipmove(p, raddr); p += IPaddrlen;
557 ipmove(p, laddr); p += IPaddrlen;
558 ipmove(p, ifc->lifc->local); p += IPaddrlen;
559 hnputs(p, rport); p += 2;
560 hnputs(p, lport);
561 break;
562 default:
563 /* connection oriented rudp */
564 if(ipcmp(c->raddr, IPnoaddr) == 0){
565 /* save the src address in the conversation */
566 ipmove(c->raddr, raddr);
567 c->rport = rport;
568
569 /* reply with the same ip address (if not broadcast) */
570 if(ipforme(f, laddr) == Runi)
571 ipmove(c->laddr, laddr);
572 else
573 v4tov6(c->laddr, ifc->lifc->local);
574 }
575 break;
576 }
577 if(bp->next)
578 bp = concatblock(bp);
579
580 if(qfull(c->rq)) {
581 netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
582 laddr, lport);
583 freeblist(bp);
584 }
585 else
586 qpass(c->rq, bp);
587
588 qunlock(ucb);
589 }
590
591 static char *rudpunknown = "unknown rudp ctl request";
592
593 char*
594 rudpctl(Conv *c, char **f, int n)
595 {
596 Rudpcb *ucb;
597 uchar ip[IPaddrlen];
598 int x;
599
600 ucb = (Rudpcb*)c->ptcl;
601 if(n < 1)
602 return rudpunknown;
603
604 if(strcmp(f[0], "headers") == 0){
605 ucb->headers = 7; /* new headers format */
606 return nil;
607 } else if(strcmp(f[0], "hangup") == 0){
608 if(n < 3)
609 return "bad syntax";
610 if (parseip(ip, f[1]) == -1)
611 return Ebadip;
612 x = atoi(f[2]);
613 qlock(ucb);
614 relforget(c, ip, x, 1);
615 qunlock(ucb);
616 return nil;
617 } else if(strcmp(f[0], "randdrop") == 0){
618 x = 10; /* default is 10% */
619 if(n > 1)
620 x = atoi(f[1]);
621 if(x > 100 || x < 0)
622 return "illegal rudp drop rate";
623 ucb->randdrop = x;
624 return nil;
625 }
626 return rudpunknown;
627 }
628
629 void
630 rudpadvise(Proto *rudp, Block *bp, char *msg)
631 {
632 Udphdr *h;
633 uchar source[IPaddrlen], dest[IPaddrlen];
634 ushort psource, pdest;
635 Conv *s, **p;
636
637 h = (Udphdr*)(bp->rp);
638
639 v4tov6(dest, h->udpdst);
640 v4tov6(source, h->udpsrc);
641 psource = nhgets(h->udpsport);
642 pdest = nhgets(h->udpdport);
643
644 /* Look for a connection */
645 for(p = rudp->conv; *p; p++) {
646 s = *p;
647 if(s->rport == pdest)
648 if(s->lport == psource)
649 if(ipcmp(s->raddr, dest) == 0)
650 if(ipcmp(s->laddr, source) == 0){
651 qhangup(s->rq, msg);
652 qhangup(s->wq, msg);
653 break;
654 }
655 }
656 freeblist(bp);
657 }
658
659 int
660 rudpstats(Proto *rudp, char *buf, int len)
661 {
662 Rudppriv *upriv;
663
664 upriv = rudp->priv;
665 return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
666 upriv->ustats.rudpInDatagrams,
667 upriv->ustats.rudpNoPorts,
668 upriv->ustats.rudpInErrors,
669 upriv->ustats.rudpOutDatagrams,
670 upriv->rxmits,
671 upriv->orders);
672 }
673
674 void
675 rudpinit(Fs *fs)
676 {
677
678 Proto *rudp;
679
680 rudp = smalloc(sizeof(Proto));
681 rudp->priv = smalloc(sizeof(Rudppriv));
682 rudp->name = "rudp";
683 rudp->connect = rudpconnect;
684 rudp->announce = rudpannounce;
685 rudp->ctl = rudpctl;
686 rudp->state = rudpstate;
687 rudp->create = rudpcreate;
688 rudp->close = rudpclose;
689 rudp->rcv = rudpiput;
690 rudp->advise = rudpadvise;
691 rudp->stats = rudpstats;
692 rudp->ipproto = IP_UDPPROTO;
693 rudp->nc = 16;
694 rudp->ptclsize = sizeof(Rudpcb);
695
696 Fsproto(fs, rudp);
697 }
698
699 /*********************************************/
700 /* Here starts the reliable helper functions */
701 /*********************************************/
702 /*
703 * Enqueue a copy of an unacked block for possible retransmissions
704 */
705 void
706 relackq(Reliable *r, Block *bp)
707 {
708 Block *np;
709
710 np = copyblock(bp, blocklen(bp));
711 if(r->unacked)
712 r->unackedtail->list = np;
713 else {
714 /* restart timer */
715 r->timeout = 0;
716 r->xmits = 1;
717 r->unacked = np;
718 }
719 r->unackedtail = np;
720 np->list = nil;
721 }
722
723 /*
724 * retransmit unacked blocks
725 */
726 void
727 relackproc(void *a)
728 {
729 Rudpcb *ucb;
730 Proto *rudp;
731 Reliable *r;
732 Conv **s, *c;
733
734 rudp = (Proto *)a;
735
736 loop:
737 tsleep(&up->sleep, return0, 0, Rudptickms);
738
739 for(s = rudp->conv; *s; s++) {
740 c = *s;
741 ucb = (Rudpcb*)c->ptcl;
742 qlock(ucb);
743
744 for(r = ucb->r; r; r = r->next) {
745 if(r->unacked != nil){
746 r->timeout += Rudptickms;
747 if(r->timeout > Rudprxms*r->xmits)
748 relrexmit(c, r);
749 }
750 if(r->acksent != r->rcvseq)
751 relsendack(c, r, 0);
752 }
753 qunlock(ucb);
754 }
755 goto loop;
756 }
757
758 /*
759 * get the state record for a conversation
760 */
761 Reliable*
762 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
763 {
764 Reliable *r, **l;
765
766 l = &ucb->r;
767 for(r = *l; r; r = *l){
768 if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
769 port == r->port)
770 break;
771 l = &r->next;
772 }
773
774 /* no state for this addr/port, create some */
775 if(r == nil){
776 while(generation == 0)
777 generation = rand();
778
779 DPRINT("from %s new state %lud for %I!%ud\n",
780 from, generation, addr, port);
781
782 r = smalloc(sizeof(Reliable));
783 memmove(r->addr, addr, IPaddrlen);
784 r->port = port;
785 r->unacked = 0;
786 if(generation == Hangupgen)
787 generation++;
788 r->sndgen = generation++;
789 r->sndseq = 0;
790 r->ackrcvd = 0;
791 r->rcvgen = 0;
792 r->rcvseq = 0;
793 r->acksent = 0;
794 r->xmits = 0;
795 r->timeout = 0;
796 r->ref = 0;
797 incref(r); /* one reference for being in the list */
798
799 *l = r;
800 }
801
802 incref(r);
803 return r;
804 }
805
806 void
807 relput(Reliable *r)
808 {
809 if(decref(r) == 0)
810 free(r);
811 }
812
813 /*
814 * forget a Reliable state
815 */
816 void
817 relforget(Conv *c, uchar *ip, int port, int originator)
818 {
819 Rudpcb *ucb;
820 Reliable *r, **l;
821
822 ucb = (Rudpcb*)c->ptcl;
823
824 l = &ucb->r;
825 for(r = *l; r; r = *l){
826 if(ipcmp(ip, r->addr) == 0 && port == r->port){
827 *l = r->next;
828 if(originator)
829 relsendack(c, r, 1);
830 relhangup(c, r);
831 relput(r); /* remove from the list */
832 break;
833 }
834 l = &r->next;
835 }
836 }
837
838 /*
839 * process a rcvd reliable packet. return -1 if not to be passed to user process,
840 * 0 therwise.
841 *
842 * called with ucb locked.
843 */
844 int
845 reliput(Conv *c, Block *bp, uchar *addr, ushort port)
846 {
847 Block *nbp;
848 Rudpcb *ucb;
849 Rudppriv *upriv;
850 Udphdr *uh;
851 Reliable *r;
852 Rudphdr *rh;
853 ulong seq, ack, sgen, agen, ackreal;
854 int rv = -1;
855
856 /* get fields */
857 uh = (Udphdr*)(bp->rp);
858 rh = (Rudphdr*)uh;
859 seq = nhgetl(rh->relseq);
860 sgen = nhgetl(rh->relsgen);
861 ack = nhgetl(rh->relack);
862 agen = nhgetl(rh->relagen);
863
864 upriv = c->p->priv;
865 ucb = (Rudpcb*)c->ptcl;
866 r = relstate(ucb, addr, port, "input");
867
868 DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
869 seq, sgen, ack, agen, r->sndgen);
870
871 /* if acking an incorrect generation, ignore */
872 if(ack && agen != r->sndgen)
873 goto out;
874
875 /* Look for a hangup */
876 if(sgen == Hangupgen) {
877 if(agen == r->sndgen)
878 relforget(c, addr, port, 0);
879 goto out;
880 }
881
882 /* make sure we're not talking to a new remote side */
883 if(r->rcvgen != sgen){
884 if(seq != 0 && seq != 1)
885 goto out;
886
887 /* new connection */
888 if(r->rcvgen != 0){
889 DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
890 relhangup(c, r);
891 }
892 r->rcvgen = sgen;
893 }
894
895 /* dequeue acked packets */
896 if(ack && agen == r->sndgen){
897 ackreal = 0;
898 while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
899 nbp = r->unacked;
900 r->unacked = nbp->list;
901 DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
902 ack, agen, r->sndgen);
903 freeb(nbp);
904 r->ackrcvd = NEXTSEQ(r->ackrcvd);
905 ackreal = 1;
906 }
907
908 /* flow control */
909 if(UNACKED(r) < Maxunacked/8 && r->blocked)
910 wakeup(&r->vous);
911
912 /*
913 * retransmit next packet if the acked packet
914 * was transmitted more than once
915 */
916 if(ackreal && r->unacked != nil){
917 r->timeout = 0;
918 if(r->xmits > 1){
919 r->xmits = 1;
920 relrexmit(c, r);
921 }
922 }
923
924 }
925
926 /* no message or input queue full */
927 if(seq == 0 || qfull(c->rq))
928 goto out;
929
930 /* refuse out of order delivery */
931 if(seq != NEXTSEQ(r->rcvseq)){
932 relsendack(c, r, 0); /* tell him we got it already */
933 upriv->orders++;
934 DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
935 goto out;
936 }
937 r->rcvseq = seq;
938
939 rv = 0;
940 out:
941 relput(r);
942 return rv;
943 }
944
945 void
946 relsendack(Conv *c, Reliable *r, int hangup)
947 {
948 Udphdr *uh;
949 Block *bp;
950 Rudphdr *rh;
951 int ptcllen;
952 Fs *f;
953
954 bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
955 if(bp == nil)
956 return;
957 bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
958 f = c->p->f;
959 uh = (Udphdr *)(bp->rp);
960 uh->vihl = IP_VER4;
961 rh = (Rudphdr*)uh;
962
963 ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
964 uh->Unused = 0;
965 uh->udpproto = IP_UDPPROTO;
966 uh->frag[0] = 0;
967 uh->frag[1] = 0;
968 hnputs(uh->udpplen, ptcllen);
969
970 v6tov4(uh->udpdst, r->addr);
971 hnputs(uh->udpdport, r->port);
972 hnputs(uh->udpsport, c->lport);
973 if(ipcmp(c->laddr, IPnoaddr) == 0)
974 findlocalip(f, c->laddr, c->raddr);
975 v6tov4(uh->udpsrc, c->laddr);
976 hnputs(uh->udplen, ptcllen);
977
978 if(hangup)
979 hnputl(rh->relsgen, Hangupgen);
980 else
981 hnputl(rh->relsgen, r->sndgen);
982 hnputl(rh->relseq, 0);
983 hnputl(rh->relagen, r->rcvgen);
984 hnputl(rh->relack, r->rcvseq);
985
986 if(r->acksent < r->rcvseq)
987 r->acksent = r->rcvseq;
988
989 uh->udpcksum[0] = 0;
990 uh->udpcksum[1] = 0;
991 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
992
993 DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
994 doipoput(c, f, bp, 0, c->ttl, c->tos);
995 }
996
997
998 /*
999 * called with ucb locked (and c locked if user initiated close)
1000 */
1001 void
1002 relhangup(Conv *c, Reliable *r)
1003 {
1004 int n;
1005 Block *bp;
1006 char hup[ERRMAX];
1007
1008 n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
1009 qproduce(c->eq, hup, n);
1010
1011 /*
1012 * dump any unacked outgoing messages
1013 */
1014 for(bp = r->unacked; bp != nil; bp = r->unacked){
1015 r->unacked = bp->list;
1016 bp->list = nil;
1017 freeb(bp);
1018 }
1019
1020 r->rcvgen = 0;
1021 r->rcvseq = 0;
1022 r->acksent = 0;
1023 if(generation == Hangupgen)
1024 generation++;
1025 r->sndgen = generation++;
1026 r->sndseq = 0;
1027 r->ackrcvd = 0;
1028 r->xmits = 0;
1029 r->timeout = 0;
1030 wakeup(&r->vous);
1031 }
1032
1033 /*
1034 * called with ucb locked
1035 */
1036 void
1037 relrexmit(Conv *c, Reliable *r)
1038 {
1039 Rudppriv *upriv;
1040 Block *np;
1041 Fs *f;
1042
1043 upriv = c->p->priv;
1044 f = c->p->f;
1045 r->timeout = 0;
1046 if(r->xmits++ > Rudpmaxxmit){
1047 relhangup(c, r);
1048 return;
1049 }
1050
1051 upriv->rxmits++;
1052 np = copyblock(r->unacked, blocklen(r->unacked));
1053 DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
1054 doipoput(c, f, np, 0, c->ttl, c->tos);
1055 }
Cache object: d5ebc9b7e513eac7a25b90cd6df47d6d
|