Add a new UDP socket option, IP_SUBSET, which allows a socket to match only a subset of the packets associated with its specific binding and/or connection state. When combined with SO_REUSEPORT, this allows applications to balance the workload for a binding over multiple sockets, which will presumably be used by multiple threads. This moves the work of load balancing from the application to the kernel, and avoids high levels of contention on a single socket receive buffer lock.

Consumers of the socket option specify three parameters: a strategy, the number of sockets over which to balance the work, and the instance a particular socket represents. The current patch defines three strategies:

IP_SUBSET_STRATEGY_DISABLED
	No subsetting.

IP_SUBSET_STRATEGY_FLOW
	Maintain ordering within flows having identical IP/port tuples by assigning all datagrams from the same flow to the same socket.

IP_SUBSET_STRATEGY_RANDOM
	Disregard ordering by allowing datagrams to be (effectively) randomly distributed over the set of sockets.

These offer a spectrum of ordering (and hence synchronization) requirements from strong (total ordering to one socket) to weak (no ordering and multiple sockets), depending on how weak an ordering guarantee the application can tolerate. Most applications will find that flow ordering is adequate, and some (such as DNS) may find random ordering is adequate.
Typical consumer code will do something like the following:

	bzero(&sin, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_len = sizeof(sin);
	sin.sin_addr.s_addr = htonl(INADDR_ANY);
	sin.sin_port = htons(5000);

	bzero(&is, sizeof(is));
	is.is_strategy = IP_SUBSET_STRATEGY_FLOW;
	is.is_count = SOCK_ARRAY_LEN;

	optval = 1;
	for (i = 0; i < SOCK_ARRAY_LEN; i++) {
		sock_array[i] = socket(PF_INET, SOCK_DGRAM, 0);
		if (sock_array[i] < 0)
			err(-1, "socket %d", i);
		if (setsockopt(sock_array[i], SOL_SOCKET, SO_REUSEPORT,
		    &optval, sizeof(optval)) < 0)
			err(-1, "socket %d setsockopt SO_REUSEPORT", i);
		if (bind(sock_array[i], (struct sockaddr *)&sin,
		    sizeof(sin)) < 0)
			err(-1, "socket %d bind", i);
		is.is_member = i;
		if (setsockopt(sock_array[i], IPPROTO_UDP, IP_SUBSET, &is,
		    sizeof(is)) < 0)
			err(-1, "socket %d setsockopt IP_SUBSET", i);
	}

This will distribute the work to SOCK_ARRAY_LEN sockets, but maintain flow ordering by delivering datagrams for the same flow to the same socket. Where available, the RSS or other hash provided by the input device will be used; otherwise we calculate our own flow identifier using a poorly chosen hash.
Index: kern/kern_mbuf.c =================================================================== --- kern/kern_mbuf.c (revision 186233) +++ kern/kern_mbuf.c (working copy) @@ -421,6 +421,7 @@ m->m_pkthdr.csum_data = 0; m->m_pkthdr.tso_segsz = 0; m->m_pkthdr.ether_vtag = 0; + m->m_pkthdr.flowid = 0; SLIST_INIT(&m->m_pkthdr.tags); #ifdef MAC /* If the label init fails, fail the alloc */ @@ -645,6 +646,7 @@ m->m_pkthdr.csum_data = 0; m->m_pkthdr.tso_segsz = 0; m->m_pkthdr.ether_vtag = 0; + m->m_pkthdr.flowid = 0; SLIST_INIT(&m->m_pkthdr.tags); #ifdef MAC /* If the label init fails, fail the alloc */ Index: netinet/udp_var.h =================================================================== --- netinet/udp_var.h (revision 186233) +++ netinet/udp_var.h (working copy) @@ -106,6 +106,7 @@ extern int udp_log_in_vain; void udp_ctlinput(int, struct sockaddr *, void *); +int udp_ctloutput(struct socket *so, struct sockopt *sopt); void udp_init(void); void udp_input(struct mbuf *, int); struct inpcb *udp_notify(struct inpcb *inp, int errno); Index: netinet/in.h =================================================================== --- netinet/in.h (revision 186233) +++ netinet/in.h (working copy) @@ -486,6 +486,19 @@ #define MCAST_BLOCK_SOURCE 84 /* block a source */ #define MCAST_UNBLOCK_SOURCE 85 /* unblock a source */ +/* Binding subsets. 
*/ +#define IP_SUBSET 86 /* get/set binding subset */ + +struct ip_subset { + u_int is_strategy; + u_int is_count; + u_int is_member; +}; + +#define IP_SUBSET_STRATEGY_DISABLED 0 +#define IP_SUBSET_STRATEGY_FLOW 1 +#define IP_SUBSET_STRATEGY_RANDOM 2 + /* * Defaults and limits for options */ Index: netinet/in_pcb.c =================================================================== --- netinet/in_pcb.c (revision 186233) +++ netinet/in_pcb.c (working copy) @@ -204,6 +204,7 @@ inp->inp_socket = so; inp->inp_cred = crhold(so->so_cred); inp->inp_inc.inc_fibnum = so->so_fibnum; + inp->inp_subset_strategy = IP_SUBSET_STRATEGY_DISABLED; #ifdef MAC error = mac_inpcb_init(inp, M_NOWAIT); if (error != 0) @@ -1284,12 +1285,66 @@ #undef INP_LOOKUP_MAPPED_PCB_COST /* + * Implement various subsetting strategies: determine whether a particular + * inpcb, implementing a particular strategy, matches the passed tuple or + * not. + */ +static int +in_subset_match(struct inpcb *inp, struct in_addr faddr, u_short fport, + struct in_addr laddr, u_short lport, u_short ip_id, u_int32_t flowid) +{ + + switch (inp->inp_subset_strategy) { + case IP_SUBSET_STRATEGY_FLOW: + if (flowid != 0) { + if ((flowid % inp->inp_subset_count) == + inp->inp_subset_member) + return (1); + } else { + /* + * XXXRW: This hash is not the hash that you are + * looking for. + */ + if (((faddr.s_addr ^ laddr.s_addr ^ fport ^ lport) % + inp->inp_subset_count) == inp->inp_subset_member) + return (1); + } + return (0); + + case IP_SUBSET_STRATEGY_RANDOM: + /* + * We include the flow in the hash as well in case we find + * that remote endpoints are seriously bad at the IP ID + * entropy thing. + * + * XXXRW: This hash is also not the hash that you are looking + * for. 
+ */ + if (flowid != 0) { + if (((flowid ^ ip_id) % inp->inp_subset_count) == + inp->inp_subset_member) + return (1); + } else { + if (((faddr.s_addr ^ laddr.s_addr ^ fport ^ lport ^ + ip_id) % inp->inp_subset_count) == + inp->inp_subset_member) + return (1); + } + return (0); + + default: + panic("in_subset_match: strategy %d", + inp->inp_subset_strategy); + } +} + +/* * Lookup PCB in hash list. */ struct inpcb * -in_pcblookup_hash(struct inpcbinfo *pcbinfo, struct in_addr faddr, - u_int fport_arg, struct in_addr laddr, u_int lport_arg, int wildcard, - struct ifnet *ifp) +in_pcblookup_hash_full(struct inpcbinfo *pcbinfo, struct in_addr faddr, + u_int fport_arg, struct in_addr laddr, u_int lport_arg, u_short ip_id, + u_int32_t flowid, int wildcard, struct ifnet *ifp) { struct inpcbhead *head; struct inpcb *inp, *tmpinp; @@ -1309,20 +1364,25 @@ if ((inp->inp_vflag & INP_IPV4) == 0) continue; #endif - if (inp->inp_faddr.s_addr == faddr.s_addr && - inp->inp_laddr.s_addr == laddr.s_addr && - inp->inp_fport == fport && - inp->inp_lport == lport) { - /* - * XXX We should be able to directly return - * the inp here, without any checks. - * Well unless both bound with SO_REUSEPORT? - */ - if (jailed(inp->inp_cred)) - return (inp); - if (tmpinp == NULL) - tmpinp = inp; - } + if (inp->inp_faddr.s_addr != faddr.s_addr || + inp->inp_laddr.s_addr != laddr.s_addr || + inp->inp_fport != fport || + inp->inp_lport != lport) + continue; + if (inp->inp_subset_strategy != IP_SUBSET_STRATEGY_DISABLED + && !in_subset_match(inp, faddr, fport, laddr, lport, + ip_id, flowid)) + continue; + + /* + * XXX We should be able to directly return + * the inp here, without any checks. + * Well unless both bound with SO_REUSEPORT? 
+ */ + if (jailed(inp->inp_cred)) + return (inp); + if (tmpinp == NULL) + tmpinp = inp; } if (tmpinp != NULL) return (tmpinp); @@ -1372,6 +1432,12 @@ continue; } + if (inp->inp_subset_strategy != + IP_SUBSET_STRATEGY_DISABLED && + !in_subset_match(inp, faddr, fport, laddr, lport, + ip_id, flowid)) + continue; + if (inp->inp_laddr.s_addr == laddr.s_addr) { if (injail) return (inp); @@ -1405,6 +1471,16 @@ return (NULL); } +struct inpcb * +in_pcblookup_hash(struct inpcbinfo *pcbinfo, struct in_addr faddr, + u_int fport_arg, struct in_addr laddr, u_int lport_arg, int wildcard, + struct ifnet *ifp) +{ + + return (in_pcblookup_hash_full(pcbinfo, faddr, fport_arg, laddr, + lport_arg, 0, 0, wildcard, ifp)); +} + /* * Insert PCB onto various hash lists. */ Index: netinet/in_pcb.h =================================================================== --- netinet/in_pcb.h (revision 186233) +++ netinet/in_pcb.h (working copy) @@ -199,6 +199,9 @@ } inp_depend6; LIST_ENTRY(inpcb) inp_portlist; /* (i/p) */ struct inpcbport *inp_phd; /* (i/p) head of this list */ + u_int inp_subset_strategy; + u_int inp_subset_count; + u_int inp_subset_member; #define inp_zero_size offsetof(struct inpcb, inp_gencnt) inp_gen_t inp_gencnt; /* (c) generation count */ struct rwlock inp_lock; @@ -493,6 +496,11 @@ struct inpcb * in_pcblookup_hash(struct inpcbinfo *, struct in_addr, u_int, struct in_addr, u_int, int, struct ifnet *); +struct inpcb * + in_pcblookup_hash_full(struct inpcbinfo *pcbinfo, + struct in_addr faddr, u_int fport_arg, struct in_addr laddr, + u_int lport_arg, u_short ip_id, u_int32_t flowid, int wildcard, + struct ifnet *ifp); void in_pcbnotifyall(struct inpcbinfo *pcbinfo, struct in_addr, int, struct inpcb *(*)(struct inpcb *, int)); void in_pcbref(struct inpcb *); Index: netinet/in_proto.c =================================================================== --- netinet/in_proto.c (revision 186233) +++ netinet/in_proto.c (working copy) @@ -124,7 +124,7 @@ .pr_flags = PR_ATOMIC|PR_ADDR, 
.pr_input = udp_input, .pr_ctlinput = udp_ctlinput, - .pr_ctloutput = ip_ctloutput, + .pr_ctloutput = udp_ctloutput, .pr_init = udp_init, .pr_usrreqs = &udp_usrreqs }, Index: netinet/udp_usrreq.c =================================================================== --- netinet/udp_usrreq.c (revision 186233) +++ netinet/udp_usrreq.c (working copy) @@ -526,8 +526,8 @@ /* * Locate pcb for datagram. */ - inp = in_pcblookup_hash(&V_udbinfo, ip->ip_src, uh->uh_sport, - ip->ip_dst, uh->uh_dport, 1, ifp); + inp = in_pcblookup_hash_full(&V_udbinfo, ip->ip_src, uh->uh_sport, + ip->ip_dst, uh->uh_dport, ip->ip_id, m->m_pkthdr.flowid, 1, ifp); if (inp == NULL) { if (udp_log_in_vain) { char buf[4*sizeof "123"]; @@ -621,6 +621,9 @@ * * XXX: We never get this from ICMP, otherwise it makes an excellent * DoS attack on machines with many connections. + * + * XXXRW: With subsetting, we should deliver this to all matching + * connections for the specific tuple. */ if (cmd == PRC_HOSTDEAD) ip = NULL; @@ -644,6 +647,67 @@ udp_notify); } +int +udp_ctloutput(struct socket *so, struct sockopt *sopt) +{ + INIT_VNET_INET(so->so_vnet); + struct ip_subset is; + struct inpcb *inp; + int error; + + inp = sotoinpcb(so); + KASSERT(inp != NULL, ("udp_ctloutput: inp == NULL")); + + if (sopt->sopt_level != IPPROTO_UDP) + return (ip_ctloutput(so, sopt)); + + switch (sopt->sopt_dir) { + case SOPT_GET: + switch (sopt->sopt_name) { + case IP_SUBSET: + bzero(&is, sizeof(is)); + INP_RLOCK(inp); + is.is_strategy = inp->inp_subset_strategy; + is.is_count = inp->inp_subset_count; + is.is_member = inp->inp_subset_member; + INP_RUNLOCK(inp); + return (sooptcopyout(sopt, &is, sizeof(is))); + } + break; + + case SOPT_SET: + switch (sopt->sopt_name) { + case IP_SUBSET: + error = sooptcopyin(sopt, &is, sizeof(is), + sizeof(is)); + if (error) + return (error); + switch (is.is_strategy) { + case IP_SUBSET_STRATEGY_DISABLED: + break; + + case IP_SUBSET_STRATEGY_FLOW: + case IP_SUBSET_STRATEGY_RANDOM: + if (is.is_count 
== 0 || + is.is_member >= is.is_count) + return (EINVAL); + break; + + default: + return (EINVAL); + } + INP_WLOCK(inp); + inp->inp_subset_strategy = is.is_strategy; + inp->inp_subset_count = is.is_count; + inp->inp_subset_member = is.is_member; + INP_WUNLOCK(inp); + return (0); + } + break; + } + return (ENOPROTOOPT); +} + static int udp_pcblist(SYSCTL_HANDLER_ARGS) { @@ -758,6 +822,11 @@ error = SYSCTL_IN(req, addrs, sizeof(addrs)); if (error) return (error); + + /* + * XXXRW: with IP subsetting, potentially more than one socket may + * match, so we just return the cred for the first one. + */ INP_INFO_RLOCK(&V_udbinfo); inp = in_pcblookup_hash(&V_udbinfo, addrs[1].sin_addr, addrs[1].sin_port, addrs[0].sin_addr, addrs[0].sin_port, 1, NULL);