/*
 * Mach Operating System
 * Copyright (c) 1992 Carnegie Mellon University
 * All Rights Reserved.
 *
 * Permission to use, copy, modify and distribute this software and its
 * documentation is hereby granted, provided that both the copyright
 * notice and this permission notice appear in all copies of the
 * software, derivative works or modified versions, and any portions
 * thereof, and that both notices appear in supporting documentation.
 *
 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
 * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
 *
 * Carnegie Mellon requests users of this software to return to
 *
 *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
 *  School of Computer Science
 *  Carnegie Mellon University
 *  Pittsburgh PA 15213-3890
 *
 * any improvements or extensions that they make and grant Carnegie Mellon
 * the rights to redistribute these changes.
 */
/*
 * HISTORY
 * $Log:	ipc_unreliable.c,v $
 * Revision 2.2  92/03/10  16:28:31  jsb
 * 	Merged in norma branch changes as of NORMA_MK7.
 * 	[92/03/09  12:50:36  jsb]
 *
 * Revision 2.1.2.3  92/02/21  11:25:08  jsb
 * 	Reduced netipc_ticks to something more reasonable.
 * 	[92/02/20  14:01:35  jsb]
 *
 * Revision 2.1.2.2  92/02/18  19:16:47  jeffreyh
 * 	[intel] added callhere debugging stuff for iPSC.
 * 	[92/02/13  13:07:35  jeffreyh]
 *
 * Revision 2.1.2.1  92/01/21  21:53:02  jsb
 * 	Use softclock timeouts instead of call from clock_interrupt.
 * 	[92/01/17  12:20:09  jsb]
 *
 * 	Added netipc_checksum flag for use under NETIPC_CHECKSUM conditional.
 * 	Never panic on checksum failure. Enter send queue when remembering
 * 	nack when netipc_sending is true. Forget nack when receiving ack.
 * 	Added netipc_receiving flag.
 * 	[92/01/16  22:27:47  jsb]
 *
 * 	Added conditionalized checksumming code. Eliminated redundant and
 * 	unused pcs_last_unacked field. Added pcs_nacked field to avoid
 * 	throwing away nacks when netipc_sending is true. Removed panics
 * 	on obsolete seqids. Eliminated ancient obsolete comments.
 * 	Added netipc_start_receive call, which takes account of ipc_ether.c
 * 	now modifying vec[i].size to indicate actual received amounts.
 * 	[92/01/14  21:53:20  jsb]
 *
 * 	Removed netipc_packet definitions and references. Changes for new
 * 	interface with norma/ipc_output.c (look there for explanation).
 * 	[92/01/13  20:21:54  jsb]
 *
 * 	Changed to use pcs structures and queue macros.
 * 	Added netipc_pcs_print. De-linted.
 * 	[92/01/13  10:19:19  jsb]
 *
 * 	First checkin. Simple protocol for unreliable networks.
 * 	[92/01/11  17:39:37  jsb]
 *
 * 	First checkin. Contains functions moved from norma/ipc_net.c.
 * 	[92/01/10  20:47:49  jsb]
 *
 */
/*
 *	File:	norma/ipc_unreliable.c
 *	Author:	Joseph S. Barrera III
 *	Date:	1990
 *
 *	Functions to support ipc between nodes in a single Mach cluster.
 */

#include <kern/queue.h>
#include <kern/time_out.h>
#include <norma/ipc_net.h>

#define	CTL_NONE	0L
#define	CTL_ACK		1L
#define	CTL_NACK	2L
#define	CTL_SYNC	3L
#define	CTL_QUENCH	4L	/* not used yet */

#define	MAX_NUM_NODES	256	/* XXX */

int netipc_self_stopped;
struct netipc_hdr netipc_recv_hdr;

extern int Noise6;
extern int Noise7;

extern void netipc_recv_ack_with_status();

boolean_t netipc_receiving = FALSE;
struct netvec netvec_r[2];

vm_page_t netipc_recv_page;
vm_page_t netipc_fallback_page = VM_PAGE_NULL;
extern vm_page_t netipc_page_grab();

int c_netipc_stop = 0;
int c_netipc_unstop = 0;
int c_netipc_old_recv = 0;

#define	MAX_WINDOW_SIZE	1

#define	WX	(MAX_WINDOW_SIZE + 1)
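
/*
 * Illustrative sketch (not compiled): state for unacked packets lives
 * in pcs_unacked_packetid, indexed by seqid % WX.  With
 * MAX_WINDOW_SIZE == 1, consecutive seqids simply alternate between
 * the two slots; a slot holds the packetid of an unacked packet and
 * is reset to 0 by netipc_recv_ack when the ack arrives.
 */
#if 0
	/* seqid 1 -> slot 1, seqid 2 -> slot 0, seqid 3 -> slot 1, ... */
	if (pcs->pcs_unacked_packetid[seqid % WX] == 0) {
		pcs->pcs_unacked_packetid[seqid % WX] = packetid;
	}
#endif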

/* XXX must make sure seqids never wrap to 0 */

extern unsigned long node_incarnation;

/*
 * Protocol control structure
 * (struct pcb is already in use)
 */
typedef struct pcs *pcs_t;
#define	PCS_NULL	((pcs_t) 0)
struct pcs {
	unsigned long	pcs_remote;
	unsigned long	pcs_last_received;
	unsigned long	pcs_last_sent;
	unsigned long	pcs_nacked;
	unsigned long	pcs_unacked_packetid[WX];
	unsigned long	pcs_incarnation;
	unsigned long	pcs_new_incarnation;
	unsigned long	pcs_ctl;
	unsigned long	pcs_ctl_seqid;
	kern_return_t	pcs_ctl_status;
	unsigned long	pcs_ctl_data;
	int		pcs_ticks;
	queue_chain_t	pcs_timer;
	queue_chain_t	pcs_unack;
	queue_chain_t	pcs_send;
};

queue_head_t netipc_timers;
queue_head_t netipc_unacks;
queue_head_t netipc_sends;

struct pcs netipc_pcs[MAX_NUM_NODES];

/*
 * Counters and such for debugging
 */
int c_netipc_timeout = 0;
int c_netipc_retry_k = 0;
int c_netipc_retry_m = 0;
int c_netipc_retry_p = 0;
int c_netipc_retry_o = 0;

struct netipc_hdr send_hdr_a;
unsigned long send_data_a = 0xabcd9876;

struct netvec netvec_a[2];

boolean_t netipc_sending;

#if	NETIPC_CHECKSUM
int netipc_checksum = 1;
int netipc_checksum_print = 1;

#define	longword_aligned(x)	((((unsigned long)(x)) & (sizeof(long)-1)) == 0)

unsigned long
netipc_compute_checksum(vec, count)
	register struct netvec *vec;
	unsigned int count;
{
	int i;
	register unsigned long checksum = 0;
	register unsigned long *data;
	register int j;

	if (! netipc_checksum) {
		return 0;
	}
	for (i = 0; i < count; i++) {
		data = (unsigned long *) DEVTOKV(vec[i].addr);
		assert(longword_aligned((unsigned long) data));
		assert(longword_aligned(vec[i].size));
		for (j = vec[i].size / sizeof(*data) - 1; j >= 0; j--) {
			checksum += data[j];
		}
	}
	return checksum;
}
#endif	/* NETIPC_CHECKSUM */
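
/*
 * Usage sketch (not compiled): both sender and receiver zero the
 * checksum field in the header before summing, so the stored checksum
 * never participates in its own computation; the receiver then
 * compares the recomputed sum against the one from the wire.
 */
#if 0
	hdr->checksum = 0;
	hdr->checksum = netipc_compute_checksum(vec, count);	/* sender */

	checksum = hdr->checksum;				/* receiver */
	hdr->checksum = 0;
	if (netipc_compute_checksum(netvec_r, 2) != checksum) {
		/* drop the packet and rearm the receive */
	}
#endif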

#if	mips
#include <mips/mips_cpu.h>
/*
 * XXX
 * This works for write-through caches, albeit slowly.
 * Need a much better solution.
 */
vm_offset_t
phystokv(phys)
	vm_offset_t phys;
{
	return PHYS_TO_K1SEG(phys);	/* uncached */
}
#endif

/*
 * Use queue_chain_t to record whether we are already on the queue.
 */
#define	queue_untable(head, elt, type, field)\
{\
	queue_remove(head, elt, type, field);\
	(elt)->field.prev = &((elt)->field);\
}
#define	queue_tabled(q)	((q)->prev != (q))
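
/*
 * Usage sketch (not compiled): queue_init leaves the chain pointing at
 * itself, so queue_tabled is FALSE; queue_enter links the element into
 * a list, making queue_tabled TRUE; queue_untable removes it and
 * restores the self-pointing prev, so queue_tabled is FALSE again.
 */
#if 0
	queue_init(&pcs->pcs_send);	/* prev == self: not tabled */
	queue_enter(&netipc_sends, pcs, pcs_t, pcs_send);
	assert(queue_tabled(&pcs->pcs_send));
	queue_untable(&netipc_sends, pcs, pcs_t, pcs_send);
	assert(! queue_tabled(&pcs->pcs_send));
#endif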

netipc_cleanup_send_state(pcs)
	pcs_t pcs;
{
	int i;

	/*
	 * Clean up connection state.
	 */
	pcs->pcs_last_sent = 0;
	for (i = 0; i < WX; i++) {
		pcs->pcs_unacked_packetid[i] = 0;
	}
	if (queue_tabled(&pcs->pcs_timer)) {
		queue_untable(&netipc_timers, pcs, pcs_t, pcs_timer);
	}
	if (queue_tabled(&pcs->pcs_send)) {
		queue_untable(&netipc_sends, pcs, pcs_t, pcs_send);
	}
	if (queue_tabled(&pcs->pcs_unack)) {
		queue_untable(&netipc_unacks, pcs, pcs_t, pcs_unack);
	}

	/*
	 * XXX
	 * This should be called by the norma ipc layer.
	 */
	netipc_cleanup_incarnation_complete(pcs->pcs_remote);
}

/*
 * A timer for a remote node is reset to 0 and placed on the timer
 * queue when a message is sent to that node.  Every netipc_ticks
 * softclock ticks, the timer value is incremented.  When it reaches
 * a certain value (currently 2), a sync message is sent to see
 * whether we should retransmit.
 *
 * The timer SHOULD be set to 0 when the message is acknowledged.
 */

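/*
 * Lifecycle sketch (not compiled) of the timer described above:
 */
#if 0
	/* on send (netipc_send_with_timeout): arm the timer */
	if (! queue_tabled(&pcs->pcs_timer)) {
		pcs->pcs_ticks = 0;
		queue_enter(&netipc_timers, pcs, pcs_t, pcs_timer);
	}

	/* every netipc_ticks softclock ticks (_netipc_timeout_intr): */
	if (pcs->pcs_ticks++ != 0) {
		/* a full interval elapsed with no ack: probe with a sync */
		netipc_queue_ctl(pcs, CTL_SYNC, seqid, KERN_SUCCESS, 0L);
	}
#endif
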
int
netipc_unacked_seqid(pcs)
	register pcs_t pcs;
{
	register unsigned long seqid;

	seqid = pcs->pcs_last_sent - (MAX_WINDOW_SIZE - 1);
	if ((long) seqid < 0) {
		seqid = 0;
	}
	for (; seqid <= pcs->pcs_last_sent; seqid++) {
		if (pcs->pcs_unacked_packetid[seqid % WX]) {
			return seqid;
		}
	}
	return 0;
}

extern int netipc_timeout_intr();

struct timer_elt netipc_timer_elt;

int netipc_ticks = 5;

netipc_set_timeout()
{
	netipc_timer_elt.fcn = netipc_timeout_intr;
	netipc_timer_elt.param = 0;
	set_timeout(&netipc_timer_elt, netipc_ticks);
}

_netipc_timeout_intr()
{
	register pcs_t pcs;
	register unsigned long seqid;

#if	iPSC386 || iPSC860
	netipc_called_here(__FILE__, __LINE__, "_netipc_timeout_intr (enter)");
#endif	/* iPSC386 || iPSC860 */
	queue_iterate(&netipc_timers, pcs, pcs_t, pcs_timer) {
		if (pcs->pcs_ticks++ == 0) {
#if	iPSC386 || iPSC860
			netipc_called_here(__FILE__, __LINE__, "{pcs->pcs_ticks++ == 0}");
#endif	/* iPSC386 || iPSC860 */
			continue;
		}
		assert(pcs->pcs_nacked == 0L);
		seqid = netipc_unacked_seqid(pcs);
		if (seqid == 0) {
			queue_untable(&netipc_timers, pcs, pcs_t, pcs_timer);
#if	iPSC386 || iPSC860
			netipc_called_here(__FILE__, __LINE__, "{seqid == 0}");
#endif	/* iPSC386 || iPSC860 */
			continue;
		}

		/*
		 * Something has timed out. Send a sync.
		 * XXX (for just first seqid? How about a bitmask?)
		 * XXX add exponential backoff here, perhaps
		 */
		c_netipc_timeout++;
#if	iPSC386 || iPSC860
		netipc_called_here(__FILE__, __LINE__, "{c_netipc_timeout++}");
#endif	/* iPSC386 || iPSC860 */
		printf2("timeout %d\n", pcs->pcs_remote);
		netipc_queue_ctl(pcs, CTL_SYNC, seqid, KERN_SUCCESS, 0L);
	}
	netipc_set_timeout();
#if	iPSC386 || iPSC860
	netipc_called_here(__FILE__, __LINE__, "_netipc_timeout_intr (leave)");
#endif	/* iPSC386 || iPSC860 */
}

/*
 * Should not panic, since a bad seqid is the sender's fault.
 */
boolean_t
netipc_obsolete_seqid(pcs, seqid)
	register pcs_t pcs;
	unsigned long seqid;
{
	if (seqid > pcs->pcs_last_sent) {
		printf("premature seqid %d > %d\n", seqid, pcs->pcs_last_sent);
		return TRUE;
	}
	if (seqid <= pcs->pcs_last_sent - MAX_WINDOW_SIZE) {
		printf5("obsolete seqid %d <= %d\n",
			seqid, pcs->pcs_last_sent - MAX_WINDOW_SIZE);
		return TRUE;
	}
	if (pcs->pcs_unacked_packetid[seqid % WX] == 0) {
		printf5("seqid %d already acked\n", seqid);
		return TRUE;
	}
	return FALSE;
}

/*
 * Called with interrupts blocked, from _netipc_recv_intr,
 * when a nack is received.
 * We will be stopped waiting for an ack; resending does not change this.
 */
netipc_recv_nack(pcs, seqid)
	register pcs_t pcs;
	unsigned long seqid;
{
	assert(netipc_locked());

	/*
	 * Ignore obsolete nacks.
	 */
	if (netipc_obsolete_seqid(pcs, seqid)) {
		return;
	}

	/*
	 * Even if we cannot retransmit right away, remember the nack
	 * so that we don't send another sync. I have seen sync-nack
	 * loops under certain conditions when nacks are simply dropped.
	 * We also remove ourselves from the timeout queue so that
	 * the timeout routine doesn't have to check for already-nacked
	 * packets. We have to add ourselves to the send queue because
	 * it is _netipc_send_intr that is now responsible for
	 * retransmitting the nacked packet.
	 */
	if (netipc_sending) {
		pcs->pcs_nacked = seqid;
		queue_untable(&netipc_timers, pcs, pcs_t, pcs_timer);
		if (! queue_tabled(&pcs->pcs_send)) {
			queue_enter(&netipc_sends, pcs, pcs_t, pcs_send);
		}
		return;
	}

	/*
	 * We can retransmit now, so do so.
	 */
	assert(pcs->pcs_unacked_packetid[seqid % WX]);
	netipc_send_old(pcs->pcs_unacked_packetid[seqid % WX], seqid);
}

netipc_recv_ack(pcs, seqid, status, data)
	register pcs_t pcs;
	unsigned long seqid;
	kern_return_t status;
	unsigned long data;
{
	unsigned long packetid;

	assert(netipc_locked());
	/*
	 * Ignore obsolete acks.
	 */
	if (netipc_obsolete_seqid(pcs, seqid)) {
		return;
	}

	/*
	 * Acknowledge the seqid, possibly freeing the kmsg.
	 * Forget any recorded nack.
	 *
	 * XXX
	 * Should we cancel the timer? It's not really necessary.
	 * If we do, we should check to see whether there are
	 * any other unacked packets, and only cancel the timer
	 * if there aren't.
	 */
	packetid = pcs->pcs_unacked_packetid[seqid % WX];
	pcs->pcs_unacked_packetid[seqid % WX] = 0;
	if (pcs->pcs_nacked == seqid) {
		pcs->pcs_nacked = 0L;
	}

	/*
	 * Pass acknowledgement to upper level.
	 */
	netipc_recv_ack_with_status(packetid, seqid, status, data);
}

/*
 * Called by upper level to indicate that it has something to send,
 * and that we should make an upcall when we can perform that send.
 */
void
netipc_start(remote)
	unsigned long remote;
{
	register pcs_t pcs = &netipc_pcs[remote];

#if	iPSC386 || iPSC860
	netipc_called_here(__FILE__, __LINE__, "netipc_start (enter)");
#endif	/* iPSC386 || iPSC860 */
	assert(netipc_locked());
	if (! netipc_sending) {
		boolean_t sending;
#if	iPSC386 || iPSC860
		netipc_called_here(__FILE__, __LINE__, "{netipc_sending == FALSE}");
#endif	/* iPSC386 || iPSC860 */
		sending = netipc_send_new(remote, pcs->pcs_last_sent + 1);
		assert(sending);
	} else if (! queue_tabled(&pcs->pcs_send)) {
#if	iPSC386 || iPSC860
		netipc_called_here(__FILE__, __LINE__, "{queue_tabled==0}");
#endif	/* iPSC386 || iPSC860 */
		queue_enter(&netipc_sends, pcs, pcs_t, pcs_send);
	}
#if	iPSC386 || iPSC860
	else {
		netipc_called_here(__FILE__, __LINE__, "{neither!!}");
	}
	netipc_called_here(__FILE__, __LINE__, "netipc_start (leave)");
#endif	/* iPSC386 || iPSC860 */
}
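
/*
 * Contract sketch (not compiled; the enqueueing step is handled in
 * norma/ipc_output.c and is only assumed here): the upper level queues
 * work for `remote' and then calls netipc_start with the netipc lock
 * held.  The upcall, netipc_send_new, happens either immediately (if
 * the net is idle) or later from _netipc_send_intr.
 */
#if 0
	netipc_thread_lock();
	/* ... queue a kmsg for `remote' in the output layer ... */
	netipc_start(remote);
	netipc_thread_unlock();
#endif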

Xnetipc_next_seqid(remote)
	unsigned long remote;
{
	pcs_t pcs = &netipc_pcs[remote];

	return pcs->pcs_last_sent + 1;
}

/*
 * XXX Still need to worry about running out of copy objects
 */

norma_kmsg_put(kmsg)
	ipc_kmsg_t kmsg;
{
	netipc_thread_lock();
	netipc_page_put(kmsg->ikm_page);
	netipc_thread_unlock();
}

netipc_cleanup_receive_state(pcs)
	register pcs_t pcs;
{
	pcs->pcs_last_received = 0;
}

/*
 * Called with interrupts blocked when a page becomes available.
 * Replaces the current dummy page with the new page, so that
 * any incoming page will be valid.
 * Note that with dma, a receive may currently be in progress.
 *
 * This has the bonus of saving a retransmit if we find a page
 * quickly enough. I don't know how often this will happen.
 *
 * We shouldn't transfer everything at once, since we might
 * immediately stop again.
 *
 * XXX
 * Why don't we simply pass in a page instead of having this bogus assertion
 * assert(page != NULL) ?
 */
netipc_self_unstop()
{
	register pcs_t pcs;

	assert(netipc_locked());
	c_netipc_unstop++;
	assert(netipc_self_stopped);
	netipc_fallback_page = netipc_page_grab();
	assert(netipc_fallback_page != VM_PAGE_NULL);
	netipc_self_stopped = FALSE;

	queue_iterate(&netipc_unacks, pcs, pcs_t, pcs_unack) {
		netipc_queue_ctl(pcs, CTL_NACK, pcs->pcs_last_received,
				 KERN_SUCCESS, 0L);
		queue_untable(&netipc_unacks, pcs, pcs_t, pcs_unack);
	}
}

int netipc_drop_freq = 0;
int netipc_drop_counter = 0;

/*
 * A general comment about acknowledgements and seqids, since
 * I've screwed this up in the past...
 *
 * If seqid > pcs->pcs_last_received + 1:
 *	Something is very wrong, since we are using stop-and-wait.
 *	The sender sent a packet before receiving an ack for the
 *	preceding packet.
 *
 * If seqid == pcs->pcs_last_received + 1:
 *	This is a packet we've not processed before.
 *	Pass it up to the ipc code, and ack it (now or later).
 *
 * If seqid == pcs->pcs_last_received:
 *	We've seen this before and acked it, but the ack
 *	might have been lost. Ack it again.
 *
 * If seqid < pcs->pcs_last_received:
 *	We don't need to ack this, because we know (from having
 *	received later packets) that the sender has already
 *	received an ack for this packet.
 *
 * Again... this code needs to be modified to deal with window sizes > 1.
 */
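
/*
 * The cases above as a compact sketch (not compiled; deliver_and_ack
 * and ack_again are hypothetical names for the actions performed by
 * the code below):
 */
#if 0
	if (seqid == pcs->pcs_last_received + 1)
		deliver_and_ack();	/* new packet */
	else if (seqid == pcs->pcs_last_received)
		ack_again();		/* duplicate; our ack may have been lost */
	else if (seqid < pcs->pcs_last_received)
		;			/* old; sender has already seen an ack */
	else
		panic("stop-and-wait violated");
#endif
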
_netipc_recv_intr()
{
	register struct netipc_hdr *hdr = &netipc_recv_hdr;
	register pcs_t pcs;
	register unsigned long seqid = hdr->seqid;
	register unsigned long type = hdr->type;
	register unsigned long incarnation = hdr->incarnation;
	register unsigned long ctl = hdr->ctl;
	register unsigned long ctl_seqid = hdr->ctl_seqid;
#if	NETIPC_CHECKSUM
	unsigned long checksum;
#endif	/* NETIPC_CHECKSUM */

#if	iPSC386 || iPSC860
	netipc_called_here(__FILE__, __LINE__, "_netipc_recv_intr");
#endif	/* iPSC386 || iPSC860 */
	assert(netipc_locked());
	assert(netipc_receiving);
	netipc_receiving = FALSE;

	/*
	 * Netipc_drop_freq > 0 enables debugging code.
	 */
	if (netipc_drop_freq) {
		if (++netipc_drop_counter >= netipc_drop_freq) {
			/*
			 * Reset counter, drop packet, and rearm.
			 */
			netipc_drop_counter = 0;
			netipc_start_receive();
			return;
		}
	}

#if	NETIPC_CHECKSUM
	checksum = hdr->checksum;
	hdr->checksum = 0;
	hdr->checksum = netipc_compute_checksum(netvec_r, 2);
	if (hdr->checksum != checksum) {
		if (netipc_checksum_print) {
			extern int Noise0;
			int noise0 = Noise0;
			Noise0 = 1;
			netipc_print('r', type, hdr->remote, ctl, seqid,
				     ctl_seqid, hdr->ctl_status,
				     hdr->ctl_data, incarnation);
			Noise0 = noise0;
			printf("_netipc_recv_intr: checksum 0x%x != 0x%x!\n",
			       hdr->checksum, checksum);
		}
		netipc_start_receive();
		return;
	}
#endif	/* NETIPC_CHECKSUM */

	/*
	 * XXX
	 * This isn't safe, but we'll fix it when we install a dynamic
	 * pcs table instead of a fixed size array.
	 */
	pcs = &netipc_pcs[hdr->remote];

	/*
	 * Print packet if so desired.
	 */
	netipc_print('r', type, hdr->remote, ctl, seqid, ctl_seqid,
		     hdr->ctl_status, hdr->ctl_data, incarnation);

	/*
	 * Check incarnation of sender against what we thought it was.
	 * This lets us detect old packets as well as newly rebooted senders.
	 * The incarnation value always increases with each reboot.
	 */
	if (hdr->incarnation != pcs->pcs_incarnation) {
		if (hdr->incarnation < pcs->pcs_incarnation) {
			/*
			 * This is an old packet from a previous incarnation.
			 * We should ignore it.
			 */
			netipc_start_receive();
			return;
		} else if (pcs->pcs_incarnation == 0L) {
			/*
			 * This is the first packet we have ever received
			 * from this node. If it looks like a first packet
			 * (an implicit connection open), then remember the
			 * incarnation number; otherwise, tell the sender our
			 * new incarnation number, which should force him
			 * to do a netipc_cleanup_incarnation.
			 *
			 * Valid first packets are:
			 *	non-control messages with seqid = 1
			 *	control messages with ctl_seqid = 1
			 * Any others???
			 */
			assert(pcs->pcs_last_received == 0L);
			if (((type == NETIPC_TYPE_KMSG ||
			      type == NETIPC_TYPE_PAGE) && seqid == 1L) ||
			    (type == NETIPC_TYPE_CTL && ctl_seqid == 1L)) {
				/*
				 * A valid first packet.
				 */
				pcs->pcs_incarnation = incarnation;
			} else {
				/*
				 * Probably left over from a previous
				 * incarnation. Use a dummy ctl to
				 * tell the sender our new incarnation.
				 */
				netipc_queue_ctl(pcs, CTL_NONE, seqid,
						 KERN_SUCCESS, 0L);
				netipc_start_receive();
				return;
			}
		} else {
			/*
			 * This is a packet from a new incarnation.
			 * We don't change the incarnation number or process
			 * any packets until we have finished cleaning up
			 * anything that depended on the previous incarnation.
			 */
			assert(incarnation > pcs->pcs_incarnation);
			if (pcs->pcs_new_incarnation == 0L) {
				pcs->pcs_new_incarnation = incarnation;
				netipc_cleanup_incarnation(pcs);
			}
			netipc_start_receive();
			return;
		}
	}
	assert(incarnation == pcs->pcs_incarnation);

	/*
	 * Check type of packet.
	 * Discard or acknowledge old packets.
	 */
	if (type == NETIPC_TYPE_KMSG || type == NETIPC_TYPE_PAGE) {
		/*
		 * If this is an old packet, then discard or acknowledge.
		 */
		if (seqid <= pcs->pcs_last_received) {
			/*
			 * We have seen this packet before...
			 * but we might still need to reack it.
			 */
			c_netipc_old_recv++;
			if (seqid == pcs->pcs_last_received) {
				/*
				 * The sender may not yet have received an ack.
				 * Send the ack immediately.
				 * Should we use ackdelay logic here?
				 */
				netipc_queue_ctl(pcs, CTL_ACK, seqid,
						 KERN_SUCCESS, 0L);
			}
			/*
			 * Rearm with same buffer, and return.
			 */
			netipc_start_receive();
			return;
		}
	} else if (type != NETIPC_TYPE_CTL) {
		printf("netipc_recv_intr: bad type 0x%x\n", type);
		netipc_start_receive();
		return;
	}

	/*
	 * Process incoming ctl, if any.
	 */
	if (ctl == CTL_NONE) {
		/* nothing to do */
	} else if (ctl == CTL_SYNC) {
		printf2("synch %d\n", ctl_seqid);
		if (ctl_seqid < pcs->pcs_last_received) {
			/* already acked, since sender sent newer packet. */
		} else if (ctl_seqid > pcs->pcs_last_received) {
			/* not yet seen; nack it. */
			/* use pending unacks? */
			assert(ctl_seqid == pcs->pcs_last_received + 1);
			if (ctl_seqid != pcs->pcs_last_received + 1) {
				printf("X: %d != %d\n",
				       ctl_seqid, pcs->pcs_last_received + 1);
			}
			netipc_queue_ctl(pcs, CTL_NACK, ctl_seqid,
					 KERN_SUCCESS, 0L);
		} else {
			/* may not be acked; ack it. */
			/* use ackdelay? */
			assert(ctl_seqid == pcs->pcs_last_received);
			netipc_queue_ctl(pcs, CTL_ACK, ctl_seqid,
					 KERN_SUCCESS, 0L);
		}
	} else if (ctl == CTL_ACK) {
		netipc_recv_ack(pcs, ctl_seqid, hdr->ctl_status,
				hdr->ctl_data);
	} else if (ctl == CTL_NACK) {
		netipc_recv_nack(pcs, ctl_seqid);
	} else {
		printf("%d: ctl?%d %d\n", node_self(), ctl, hdr->remote);
	}

	/*
	 * If this was just a ctl, then restart receive in same buffer.
	 */
	if (type == NETIPC_TYPE_CTL) {
		netipc_start_receive();
		return;
	}

	/*
	 * If we are stopped, set up a nack (since this was more than
	 * just a ctl) and restart receive in same buffer.
	 */
	if (netipc_self_stopped) {
		if (! queue_tabled(&pcs->pcs_unack)) {
			queue_enter(&netipc_unacks, pcs, pcs_t, pcs_unack);
		}
		netipc_start_receive();
		return;
	}

	/*
	 * At this point:
	 *	This is a previously unseen packet
	 *	We have room to keep it
	 *	It is either a kmsg or a page
	 *
	 * Deliver message according to its type.
	 */
	assert(pcs->pcs_last_received == seqid - 1);
	assert(! netipc_self_stopped);
	assert(type == NETIPC_TYPE_KMSG || type == NETIPC_TYPE_PAGE);
	if (type == NETIPC_TYPE_KMSG) {
		register ipc_kmsg_t kmsg;

		/*
		 * This is a kmsg. Kmsgs are word aligned,
		 * and contain their own length.
		 */
		kmsg = (ipc_kmsg_t) VPTOKV(netipc_recv_page);
		kmsg->ikm_size = IKM_SIZE_NORMA;
		kmsg->ikm_marequest = IMAR_NULL;
		kmsg->ikm_page = netipc_recv_page;
		printf6("deliver kmsg: remote=%d msgh_id=%d dest=%x\n",
			hdr->remote,
			kmsg->ikm_header.msgh_id,
			kmsg->ikm_header.msgh_remote_port);
		norma_deliver_kmsg(kmsg, hdr->remote);
	} else {
		/*
		 * This is out-of-line data, or out-of-line ports,
		 * or more of a bigger-than-page-size kmsg.
		 */
		printf6("deliver_page: remote=%d\n", hdr->remote);
		norma_deliver_page(netipc_recv_page,
				   hdr->pg.pg_msgh_offset,
				   hdr->remote,
				   hdr->pg.pg_page_first,
				   hdr->pg.pg_page_last,
				   hdr->pg.pg_copy_last,
				   hdr->pg.pg_copy_size,
				   hdr->pg.pg_copy_offset);
	}
}

norma_ipc_ack(status, data)
	kern_return_t status;
	unsigned long data;
{
	register int remote = netipc_recv_hdr.remote;
	register unsigned long seqid = netipc_recv_hdr.seqid;
	register pcs_t pcs = &netipc_pcs[remote];

	assert(netipc_locked());
	/*
	 * Send acknowledgements.
	 * Eventually, we should wait and try to piggyback these ctls.
	 * This could lead to deadlock if we aren't tricky.
	 * We should for example send acks as soon as we are idle.
	 */
	pcs->pcs_last_received = seqid;
	netipc_queue_ctl(pcs, CTL_ACK, seqid, status, data);

	/*
	 * Start a new receive.
	 * If status is success, the buffer is being kept,
	 * so allocate a new one.
	 * XXX should we have done this before the deliver calls above?
	 */
	if (status == KERN_SUCCESS) {
		netipc_recv_hdr.type = NETIPC_TYPE_INVALID;
		netipc_recv_page = netipc_page_grab();
		if (netipc_recv_page == VM_PAGE_NULL) {
			c_netipc_stop++;
			netipc_self_stopped = TRUE;
			netipc_recv_page = netipc_fallback_page;
		}
		netvec_r[1].addr = VPTODEV(netipc_recv_page);
	}
	netipc_start_receive();
}

/*
 * Drop the packet; pretend that we never saw it.
 * Start a new receive.
 *
 * In a reliable interconnect module, we would register a nack here.
 */
norma_ipc_drop()
{
	netipc_start_receive();
}

/*
 * Called with interrupts blocked, e.g. from netipc_self_unstop,
 * _netipc_timeout_intr, and _netipc_recv_intr. Ctl may be any of the
 * CTL_ values. If we are currently sending, record the ctl in the pcs
 * and have _netipc_send_intr do the send when the current send
 * completes.
 *
 * The pending-ctl mechanism should be generalized to allow for
 * arbitrary pending send operations, so that no one needs to spin on
 * netipc_sending.
 */
netipc_queue_ctl(pcs, ctl, ctl_seqid, ctl_status, ctl_data)
	register pcs_t pcs;
	unsigned long ctl;
	unsigned long ctl_seqid;
	kern_return_t ctl_status;
	unsigned long ctl_data;
{
	assert(netipc_locked());

	/*
	 * If the net is busy sending, let _netipc_send_intr send the ctl.
	 */
	if (netipc_sending) {
		/*
		 * We may blow away a preceding ctl, which is unfortunate
		 * but not fatal.
		 */
		pcs->pcs_ctl = ctl;
		pcs->pcs_ctl_seqid = ctl_seqid;
		pcs->pcs_ctl_status = ctl_status;
		pcs->pcs_ctl_data = ctl_data;
		if (! queue_tabled(&pcs->pcs_send)) {
			queue_enter(&netipc_sends, pcs, pcs_t, pcs_send);
		}
		return;
	}

	/*
	 * Fill in send_hdr_a.
	 */
	send_hdr_a.ctl = ctl;
	send_hdr_a.ctl_seqid = ctl_seqid;
	send_hdr_a.ctl_status = ctl_status;
	send_hdr_a.ctl_data = ctl_data;

	/*
	 * Start the send.
	 */
	netipc_send_ctl(pcs->pcs_remote);
}
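
/*
 * One possible shape of the generalization suggested above (a sketch
 * only, nothing below is implemented here): queue per-node pending
 * operations for _netipc_send_intr to run at send-complete time, so
 * that no caller ever needs to spin on netipc_sending.
 */
#if 0
struct pending_send {
	queue_chain_t	ps_chain;	/* linked on a per-node queue */
	void		(*ps_send)();	/* called from _netipc_send_intr */
	unsigned long	ps_arg;
};
#endif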

/*
 * Net send interrupt routine: called when a send completes.
 * If there is something else to send (currently, only ctls),
 * send it; otherwise, set netipc_sending FALSE.
 */
_netipc_send_intr()
{
	register pcs_t pcs;

	assert(netipc_locked());
	assert(netipc_sending);
	netipc_sending = FALSE;
#if	iPSC386 || iPSC860
	netipc_called_here(__FILE__, __LINE__, "_netipc_send_intr");
#endif	/* iPSC386 || iPSC860 */

	/*
	 * Scan the pending list, looking for something to send.
	 * If something is on the list but doesn't belong, remove it.
	 */
	queue_iterate(&netipc_sends, pcs, pcs_t, pcs_send) {
		if (pcs->pcs_ctl != CTL_NONE) {
			/*
			 * There is a ctl to send; send it.
			 */
			send_hdr_a.ctl = pcs->pcs_ctl;
			send_hdr_a.ctl_seqid = pcs->pcs_ctl_seqid;
			send_hdr_a.ctl_status = pcs->pcs_ctl_status;
			send_hdr_a.ctl_data = pcs->pcs_ctl_data;
			pcs->pcs_ctl = CTL_NONE;
			netipc_send_ctl(pcs->pcs_remote);
			return;
		}

		/*
		 * There may be something to retransmit.
		 */
		if (pcs->pcs_nacked != 0L) {
			register unsigned long seqid = pcs->pcs_nacked;
			pcs->pcs_nacked = 0L;
			assert(pcs->pcs_unacked_packetid[seqid % WX]);
			netipc_send_old(pcs->pcs_unacked_packetid[seqid % WX],
					seqid);
			return;
		}

		/*
		 * There may be something new to send.
		 */
		if (netipc_send_new(pcs->pcs_remote, pcs->pcs_last_sent + 1)) {
			return;
		}

		/*
		 * Nothing to send -- remove from queue.
		 */
		queue_untable(&netipc_sends, pcs, pcs_t, pcs_send);
	}
}

netipc_protocol_init()
{
	register unsigned long remote;
	register pcs_t pcs;

	/*
	 * Initialize pcs structs, queues, and enable timer.
	 */
	for (remote = 0; remote < MAX_NUM_NODES; remote++) {
		pcs = &netipc_pcs[remote];
		pcs->pcs_remote = remote;
		queue_init(&pcs->pcs_timer);
		queue_init(&pcs->pcs_unack);
		queue_init(&pcs->pcs_send);
	}
	queue_init(&netipc_timers);
	queue_init(&netipc_unacks);
	queue_init(&netipc_sends);
	netipc_set_timeout();

	/*
	 * Initialize netipc_recv_hdr and netvec_r
	 */
	netipc_recv_hdr.type = NETIPC_TYPE_INVALID;
	netvec_r[0].addr = KVTODEV(&netipc_recv_hdr);
	netvec_r[0].size = sizeof(struct netipc_hdr);
	netvec_r[1].size = PAGE_SIZE;

	/*
	 * Initialize send_hdr_a and netvec_a
	 */
	send_hdr_a.type = NETIPC_TYPE_CTL;
	send_hdr_a.remote = node_self();
	netvec_a[0].addr = KVTODEV(&send_hdr_a);
	netvec_a[0].size = sizeof(struct netipc_hdr);
	netvec_a[1].addr = KVTODEV(&send_data_a);
	netvec_a[1].size = sizeof(unsigned long);

	/*
	 * Start receiving.
	 */
	netipc_recv_page = netipc_fallback_page = vm_page_grab();
	assert(netipc_recv_page != VM_PAGE_NULL);
	netipc_self_stopped = TRUE;
	netvec_r[1].addr = VPTODEV(netipc_recv_page);
	netipc_start_receive();
}

netipc_start_receive()
{
	assert(! netipc_receiving);
	netipc_receiving = TRUE;
	/*
	 * Reset the vector sizes each time, since the network driver
	 * (e.g. ipc_ether.c) may modify vec[i].size to indicate the
	 * amount actually received.
	 */
	netvec_r[0].size = sizeof(struct netipc_hdr);
	netvec_r[1].size = PAGE_SIZE;
	netipc_recv(netvec_r, 2);
}

netipc_init()
{
	netipc_network_init();
	netipc_output_init();
	netipc_protocol_init();
}

netipc_cleanup_incarnation(pcs)
	register pcs_t pcs;
{
	printf1("netipc_cleanup_incarnation(%d)\n", pcs->pcs_remote);

	/*
	 * Clean up connection state and higher-level ipc state.
	 */
	netipc_cleanup_send_state(pcs);
	netipc_cleanup_receive_state(pcs);
	netipc_cleanup_ipc_state(pcs->pcs_remote);
}

#if	666
/*
 * This should be in the norma ipc layer and should do something real.
 */
netipc_cleanup_ipc_state(remote)
	unsigned long remote;
{
	/* XXX */
	netipc_cleanup_incarnation_complete(remote);
}
#endif

netipc_cleanup_incarnation_complete(remote)
	unsigned long remote;
{
	register pcs_t pcs = &netipc_pcs[remote];

	pcs->pcs_incarnation = pcs->pcs_new_incarnation;
	pcs->pcs_new_incarnation = 0;
}

netipc_send_ctl(remote)
	unsigned long remote;
{
	register struct netipc_hdr *hdr = &send_hdr_a;

	assert(netipc_locked());
	assert(! netipc_sending);
	netipc_sending = TRUE;

	hdr->incarnation = node_incarnation;
	if (remote == node_self()) {
		panic("netipc_send_ctl: sending to node_self!\n");
	}
	netipc_print('s', NETIPC_TYPE_CTL, remote, hdr->ctl, 0L,
		     hdr->ctl_seqid, hdr->ctl_status, hdr->ctl_data,
		     node_incarnation);
#if	NETIPC_CHECKSUM
	hdr->checksum = 0;
	hdr->checksum = netipc_compute_checksum(netvec_a, 2);
#endif	/* NETIPC_CHECKSUM */
	netipc_send(remote, netvec_a, 2);
}

netipc_send_with_timeout(remote, vec, count, packetid, seqid)
	unsigned long remote;
	register struct netvec *vec;
	unsigned int count;
	unsigned long packetid;
	unsigned long seqid;
{
	struct netipc_hdr *hdr = (struct netipc_hdr *) DEVTOKV(vec[0].addr);
	register unsigned long type = hdr->type;
	register unsigned long ctl;
	register unsigned long ctl_seqid = hdr->ctl_seqid;
	register kern_return_t ctl_status = hdr->ctl_status;
	register unsigned long ctl_data = hdr->ctl_data;
	register pcs_t pcs = &netipc_pcs[remote];

#if	iPSC386 || iPSC860
	netipc_called_here(__FILE__, __LINE__, "netipc_send_with_timeout");
#endif	/* iPSC386 || iPSC860 */
	assert(netipc_locked());
	assert(! netipc_sending);
	assert(seqid == hdr->seqid);
	netipc_sending = TRUE;

	ctl = hdr->ctl = CTL_NONE;

	hdr->incarnation = node_incarnation;
	while (pcs->pcs_new_incarnation != 0L) {
		/*
		 * Shouldn't get this far!
		 */
		panic("netipc_send_with_timeout: incarnation clean!!!\n");
	}
	if (pcs->pcs_remote == node_self()) {
		panic("netipc_send_with_timeout: sending to node_self!\n");
	}
	netipc_print('s', type, pcs->pcs_remote, ctl, seqid, ctl_seqid,
		     ctl_status, ctl_data, node_incarnation);
	if (pcs->pcs_unacked_packetid[seqid % WX] == 0) {
		pcs->pcs_unacked_packetid[seqid % WX] = packetid;
	}
	if (seqid > pcs->pcs_last_sent) {
		pcs->pcs_last_sent = seqid;
	}
	if (! queue_tabled(&pcs->pcs_timer)) {
		pcs->pcs_ticks = 0;
		queue_enter(&netipc_timers, pcs, pcs_t, pcs_timer);
	}
#if	NETIPC_CHECKSUM
	hdr->checksum = 0;
	hdr->checksum = netipc_compute_checksum(vec, count);
#endif	/* NETIPC_CHECKSUM */
	netipc_send(pcs->pcs_remote, vec, count);
}

netipc_print(c, type, remote, ctl, seqid, ctl_seqid, ctl_status, ctl_data,
	     incarnation)
	char c;
	unsigned long type;
	unsigned long remote;
	unsigned long ctl;
	unsigned long seqid;
	unsigned long ctl_seqid;
	kern_return_t ctl_status;
	unsigned long ctl_data;
	unsigned long incarnation;
{
	char *ts;
	char *cs;
	extern int Noise0;

	assert(netipc_locked());
	if (Noise0 == 2) {
		printf("%c", c);
		return;
	}
	if (Noise0 == 0) {
		return;
	}
	if (type == NETIPC_TYPE_KMSG) {
		ts = "kmsg";
	} else if (type == NETIPC_TYPE_PAGE) {
		ts = "page";
	} else if (type == NETIPC_TYPE_CTL) {
		ts = "ctrl";
		seqid = 0;
	} else {
		ts = "????";
	}
	if (ctl == CTL_NONE) {
		cs = "none";
		ctl_seqid = 0;
	} else if (ctl == CTL_ACK) {
		cs = "ack ";
	} else if (ctl == CTL_NACK) {
		cs = "nack";
	} else if (ctl == CTL_SYNC) {
		cs = "sync";
	} else if (ctl == CTL_QUENCH) {
		cs = "qnch";
	} else {
		cs = "????";
	}
	printf("%c remote=%d type=%s.%04d ctl=%s.%04d kr=%2d data=%2d %10d\n",
	       c, remote, ts, seqid, cs, ctl_seqid, ctl_status, ctl_data,
	       incarnation);
}

#include <mach_kdb.h>
#if	MACH_KDB

#define	printf	kdbprintf

/*
 *	Routine:	netipc_pcs_print
 *	Purpose:
 *		Pretty-print a netipc protocol control structure for ddb.
 */

netipc_pcs_print(pcs)
	pcs_t pcs;
{
	extern int indent;
	int i;

	if ((unsigned int) pcs < MAX_NUM_NODES) {
		pcs = &netipc_pcs[(unsigned int) pcs];
	}

	printf("netipc pcs 0x%x\n", pcs);

	indent += 2;

	iprintf("remote=%d", pcs->pcs_remote);
	printf(", last_received=%d", pcs->pcs_last_received);
	printf(", last_sent=%d", pcs->pcs_last_sent);
	printf(", nacked=%d\n", pcs->pcs_nacked);

	iprintf("unacked_packetid[0..%d]={", WX - 1);
	for (i = 0; i < WX - 1; i++) {
		printf("0x%x, ", pcs->pcs_unacked_packetid[i]);
	}
	printf("0x%x}\n", pcs->pcs_unacked_packetid[WX - 1]);

	iprintf("incarnation=%d", pcs->pcs_incarnation);
	printf(", new_incarnation=%d\n", pcs->pcs_new_incarnation);

	iprintf("ctl=%d", pcs->pcs_ctl);
	switch ((int) pcs->pcs_ctl) {
	    case CTL_NONE:
		printf("[none]");
		break;

	    case CTL_ACK:
		printf("[ack]");
		break;

	    case CTL_NACK:
		printf("[nack]");
		break;

	    case CTL_SYNC:
		printf("[sync]");
		break;

	    case CTL_QUENCH:
		printf("[quench]");
		break;

	    default:
		printf("[bad type]");
		break;
	}
	printf(", ctl_seqid=%d", pcs->pcs_ctl_seqid);
	printf(", ctl_status=%ld", pcs->pcs_ctl_status);
	printf(", ctl_data=%ld\n", pcs->pcs_ctl_data);

	iprintf("ticks=%d", pcs->pcs_ticks);
	printf(", timer=[%x,%x t=%d]",
	       pcs->pcs_timer.prev,
	       pcs->pcs_timer.next,
	       queue_tabled(&pcs->pcs_timer));
	printf(", unack=[%x,%x t=%d]",
	       pcs->pcs_unack.prev,
	       pcs->pcs_unack.next,
	       queue_tabled(&pcs->pcs_unack));
	printf(", send=[%x,%x t=%d]",
	       pcs->pcs_send.prev,
	       pcs->pcs_send.next,
	       queue_tabled(&pcs->pcs_send));
	printf("\n");

	indent -= 2;
}
#endif	/* MACH_KDB */