From e0647ad80c63fcad6a9dc31541881fa02aeaac98 Mon Sep 17 00:00:00 2001 From: David Gibson Date: Thu, 18 Jul 2024 15:26:47 +1000 Subject: udp: Handle "spliced" datagrams with per-flow sockets When forwarding a datagram to a socket, we need to find a socket with a suitable local address to send it. Currently we keep track of such sockets in an array indexed by local port, but this can't properly handle cases where we have multiple local addresses in active use. For "spliced" (socket to socket) cases, improve this by instead opening a socket specifically for the target side of the flow. We connect() as well as bind()ing that socket, so that it will only receive the flow's reply packets, not anything else. We direct datagrams sent via that socket using the addresses from the flow table, effectively replacing bespoke addressing logic with the unified logic in fwd.c When we create the flow, we also take a duplicate of the originating socket, and use that to deliver reply datagrams back to the origin, again using addresses from the flow table entry. Signed-off-by: David Gibson Signed-off-by: Stefano Brivio --- udp.c | 436 +++++++++++++++++++++++++++--------------------------------------- 1 file changed, 175 insertions(+), 261 deletions(-) (limited to 'udp.c') diff --git a/udp.c b/udp.c index fdbe396..5543e61 100644 --- a/udp.c +++ b/udp.c @@ -35,7 +35,44 @@ * =================== * * UDP doesn't use listen(), but we consider long term sockets which are allowed - * to create new flows "listening" by analogy with TCP. + * to create new flows "listening" by analogy with TCP. This listening socket + * could receive packets from multiple flows, so we use a hash table match to + * find the specific flow for a datagram. + * + * When a UDP flow is initiated from a listening socket we take a duplicate of + * the socket and store it in uflow->s[INISIDE]. This will last for the + * lifetime of the flow, even if the original listening socket is closed due to + * port auto-probing. The duplicate is used to deliver replies back to the + * originating side. + * + * Reply sockets + * ============= + * + * When a UDP flow targets a socket, we create a "reply" socket in + * uflow->s[TGTSIDE] both to deliver datagrams to the target side and receive + * replies on the target side. This socket is both bound and connected and has + * EPOLL_TYPE_UDP_REPLY. The connect() means it will only receive datagrams + * associated with this flow, so the epoll reference directly points to the flow + * and we don't need a hash lookup. + * + * NOTE: it's possible that the reply socket could have a bound address + * overlapping with an unrelated listening socket. We assume datagrams for the + * flow will come to the reply socket in preference to a listening socket. The + * sample program doc/platform-requirements/reuseaddr-priority.c documents and + * tests that assumption. + * + * "Spliced" flows + * =============== + * + * In PASTA mode, L2-L4 translation is skipped for connections to ports bound + * between namespaces using the loopback interface, messages are directly + * transferred between L4 sockets instead. These are called spliced connections + * in analogy with the TCP implementation. The the splice() syscall isn't + * actually used; it doesn't make sense for datagrams and instead a pair of + * recvmmsg() and sendmmsg() is used to forward the datagrams. + * + * Note that a spliced flow will have *both* a duplicated listening socket and a + * reply socket (see above). * * Port tracking * ============= @@ -56,62 +93,6 @@ * * Packets are forwarded back and forth, by prepending and stripping UDP headers * in the obvious way, with no port translation. - * - * In PASTA mode, the L2-L4 translation is skipped for connections to ports - * bound between namespaces using the loopback interface, messages are directly - * transferred between L4 sockets instead. These are called spliced connections - * for consistency with the TCP implementation, but the splice() syscall isn't - * actually used as it wouldn't make sense for datagram-based connections: a - * pair of recvmmsg() and sendmmsg() deals with this case. - * - * The connection tracking for PASTA mode is slightly complicated by the absence - * of actual connections, see struct udp_splice_port, and these examples: - * - * - from init to namespace: - * - * - forward direction: 127.0.0.1:5000 -> 127.0.0.1:80 in init from socket s, - * with epoll reference: index = 80, splice = 1, orig = 1, ns = 0 - * - if udp_splice_ns[V4][5000].sock: - * - send packet to udp_splice_ns[V4][5000].sock, with destination port - * 80 - * - otherwise: - * - create new socket udp_splice_ns[V4][5000].sock - * - bind in namespace to 127.0.0.1:5000 - * - add to epoll with reference: index = 5000, splice = 1, orig = 0, - * ns = 1 - * - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with - * current time - * - * - reverse direction: 127.0.0.1:80 -> 127.0.0.1:5000 in namespace socket s, - * having epoll reference: index = 5000, splice = 1, orig = 0, ns = 1 - * - if udp_splice_init[V4][80].sock: - * - send to udp_splice_init[V4][80].sock, with destination port 5000 - * - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with - * current time - * - otherwise, discard - * - * - from namespace to init: - * - * - forward direction: 127.0.0.1:2000 -> 127.0.0.1:22 in namespace from - * socket s, with epoll reference: index = 22, splice = 1, orig = 1, ns = 1 - * - if udp4_splice_init[V4][2000].sock: - * - send packet to udp_splice_init[V4][2000].sock, with destination - * port 22 - * - otherwise: - * - create new socket udp_splice_init[V4][2000].sock - * - bind in init to 127.0.0.1:2000 - * - add to epoll with reference: index = 2000, splice = 1, orig = 0, - * ns = 0 - * - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with - * current time - * - * - reverse direction: 127.0.0.1:22 -> 127.0.0.1:2000 in init from socket s, - * having epoll reference: index = 2000, splice = 1, orig = 0, ns = 0 - * - if udp_splice_ns[V4][22].sock: - * - send to udp_splice_ns[V4][22].sock, with destination port 2000 - * - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with - * current time - * - otherwise, discard */ #include @@ -134,6 +115,7 @@ #include #include #include +#include #include #include "checksum.h" @@ -224,7 +206,6 @@ static struct ethhdr udp6_eth_hdr; * @ip4h: Pre-filled IPv4 header (except for tot_len and saddr) * @taph: Tap backend specific header * @s_in: Source socket address, filled in by recvmmsg() - * @splicesrc: Source port for splicing, or -1 if not spliceable * @tosidx: sidx for the destination side of this datagram's flow */ static struct udp_meta_t { @@ -233,7 +214,6 @@ static struct udp_meta_t { struct tap_hdr taph; union sockaddr_inany s_in; - int splicesrc; flow_sidx_t tosidx; } #ifdef __AVX2__ @@ -271,7 +251,6 @@ static struct mmsghdr udp_mh_splice [UDP_MAX_FRAMES]; /* IOVs for L2 frames */ static struct iovec udp_l2_iov [UDP_MAX_FRAMES][UDP_NUM_IOVS]; - /** * udp_portmap_clear() - Clear UDP port map before configuration */ @@ -384,140 +363,6 @@ static void udp_iov_init(const struct ctx *c) udp_iov_init_one(c, i); } -/** - * udp_splice_new() - Create and prepare socket for "spliced" binding - * @c: Execution context - * @v6: Set for IPv6 sockets - * @src: Source port of original connection, host order - * @ns: Does the splice originate in the ns or not - * - * Return: prepared socket, negative error code on failure - * - * #syscalls:pasta getsockname - */ -int udp_splice_new(const struct ctx *c, int v6, in_port_t src, bool ns) -{ - struct epoll_event ev = { .events = EPOLLIN | EPOLLRDHUP | EPOLLHUP }; - union epoll_ref ref = { .type = EPOLL_TYPE_UDP, - .udp = { .splice = true, .v6 = v6, .port = src } - }; - struct udp_splice_port *sp; - int act, s; - - if (ns) { - ref.udp.pif = PIF_SPLICE; - sp = &udp_splice_ns[v6 ? V6 : V4][src]; - act = UDP_ACT_SPLICE_NS; - } else { - ref.udp.pif = PIF_HOST; - sp = &udp_splice_init[v6 ? V6 : V4][src]; - act = UDP_ACT_SPLICE_INIT; - } - - s = socket(v6 ? AF_INET6 : AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, - IPPROTO_UDP); - - if (s > FD_REF_MAX) { - close(s); - return -EIO; - } - - if (s < 0) - return s; - - ref.fd = s; - - if (v6) { - struct sockaddr_in6 addr6 = { - .sin6_family = AF_INET6, - .sin6_port = htons(src), - .sin6_addr = IN6ADDR_LOOPBACK_INIT, - }; - if (bind(s, (struct sockaddr *)&addr6, sizeof(addr6))) - goto fail; - } else { - struct sockaddr_in addr4 = { - .sin_family = AF_INET, - .sin_port = htons(src), - .sin_addr = IN4ADDR_LOOPBACK_INIT, - }; - if (bind(s, (struct sockaddr *)&addr4, sizeof(addr4))) - goto fail; - } - - sp->sock = s; - bitmap_set(udp_act[v6 ? V6 : V4][act], src); - - ev.data.u64 = ref.u64; - epoll_ctl(c->epollfd, EPOLL_CTL_ADD, s, &ev); - return s; - -fail: - close(s); - return -1; -} - -/** - * struct udp_splice_new_ns_arg - Arguments for udp_splice_new_ns() - * @c: Execution context - * @v6: Set for IPv6 - * @src: Source port of originating datagram, host order - * @dst: Destination port of originating datagram, host order - * @s: Newly created socket or negative error code - */ -struct udp_splice_new_ns_arg { - const struct ctx *c; - int v6; - in_port_t src; - int s; -}; - -/** - * udp_splice_new_ns() - Enter namespace and call udp_splice_new() - * @arg: See struct udp_splice_new_ns_arg - * - * Return: 0 - */ -static int udp_splice_new_ns(void *arg) -{ - struct udp_splice_new_ns_arg *a; - - a = (struct udp_splice_new_ns_arg *)arg; - - ns_enter(a->c); - - a->s = udp_splice_new(a->c, a->v6, a->src, true); - - return 0; -} - -/** - * udp_mmh_splice_port() - Is source address of message suitable for splicing? - * @ref: epoll reference for incoming message's origin socket - * @mmh: mmsghdr of incoming message - * - * Return: if source address of message in @mmh refers to localhost (127.0.0.1 - * or ::1) its source port (host order), otherwise -1. - */ -static int udp_mmh_splice_port(union epoll_ref ref, const struct mmsghdr *mmh) -{ - const struct sockaddr_in6 *sa6 = mmh->msg_hdr.msg_name; - const struct sockaddr_in *sa4 = mmh->msg_hdr.msg_name; - - ASSERT(ref.type == EPOLL_TYPE_UDP); - - if (!ref.udp.splice) - return -1; - - if (ref.udp.v6 && IN6_IS_ADDR_LOOPBACK(&sa6->sin6_addr)) - return ntohs(sa6->sin6_port); - - if (!ref.udp.v6 && IN4_IS_ADDR_LOOPBACK(&sa4->sin_addr)) - return ntohs(sa4->sin_port); - - return -1; -} - /** * udp_at_sidx() - Get UDP specific flow at given sidx * @sidx: Flow and side to retrieve @@ -541,8 +386,20 @@ struct udp_flow *udp_at_sidx(flow_sidx_t sidx) * @c: Execution context * @uflow: UDP flow */ -static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow) +static void udp_flow_close(const struct ctx *c, struct udp_flow *uflow) { + if (uflow->s[INISIDE] >= 0) { + /* The listening socket needs to stay in epoll */ + close(uflow->s[INISIDE]); + uflow->s[INISIDE] = -1; + } + + if (uflow->s[TGTSIDE] >= 0) { + /* But the flow specific one needs to be removed */ + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, uflow->s[TGTSIDE], NULL); + close(uflow->s[TGTSIDE]); + uflow->s[TGTSIDE] = -1; + } flow_hash_remove(c, FLOW_SIDX(uflow, INISIDE)); } @@ -550,26 +407,92 @@ static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow) * udp_flow_new() - Common setup for a new UDP flow * @c: Execution context * @flow: Initiated flow + * @s_ini: Initiating socket (or -1) * @now: Timestamp * * Return: UDP specific flow, if successful, NULL on failure */ static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow, - const struct timespec *now) + int s_ini, const struct timespec *now) { const struct flowside *ini = &flow->f.side[INISIDE]; struct udp_flow *uflow = NULL; + const struct flowside *tgt; + uint8_t tgtpif; if (!inany_is_unicast(&ini->eaddr) || ini->eport == 0) { flow_trace(flow, "Invalid endpoint to initiate UDP flow"); goto cancel; } - if (!flow_target(c, flow, IPPROTO_UDP)) + if (!(tgt = flow_target(c, flow, IPPROTO_UDP))) goto cancel; + tgtpif = flow->f.pif[TGTSIDE]; uflow = FLOW_SET_TYPE(flow, FLOW_UDP, udp); uflow->ts = now->tv_sec; + uflow->s[INISIDE] = uflow->s[TGTSIDE] = -1; + + if (s_ini >= 0) { + /* When using auto port-scanning the listening port could go + * away, so we need to duplicate the socket + */ + uflow->s[INISIDE] = fcntl(s_ini, F_DUPFD_CLOEXEC, 0); + if (uflow->s[INISIDE] < 0) { + flow_err(uflow, + "Couldn't duplicate listening socket: %s", + strerror(errno)); + goto cancel; + } + } + + if (pif_is_socket(tgtpif)) { + struct mmsghdr discard[UIO_MAXIOV] = { 0 }; + union { + flow_sidx_t sidx; + uint32_t data; + } fref = { + .sidx = FLOW_SIDX(flow, TGTSIDE), + }; + int rc; + + uflow->s[TGTSIDE] = flowside_sock_l4(c, EPOLL_TYPE_UDP_REPLY, + tgtpif, tgt, fref.data); + if (uflow->s[TGTSIDE] < 0) { + flow_dbg(uflow, + "Couldn't open socket for spliced flow: %s", + strerror(errno)); + goto cancel; + } + + if (flowside_connect(c, uflow->s[TGTSIDE], tgtpif, tgt) < 0) { + flow_dbg(uflow, + "Couldn't connect flow socket: %s", + strerror(errno)); + goto cancel; + } + + /* It's possible, if unlikely, that we could receive some + * unrelated packets in between the bind() and connect() of this + * socket. For now we just discard these. We could consider + * trying to redirect these to an appropriate handler, if we + * need to. + */ + rc = recvmmsg(uflow->s[TGTSIDE], discard, ARRAY_SIZE(discard), + MSG_DONTWAIT, NULL); + if (rc >= ARRAY_SIZE(discard)) { + flow_dbg(uflow, + "Too many (%d) spurious reply datagrams", rc); + goto cancel; + } else if (rc > 0) { + flow_trace(uflow, + "Discarded %d spurious reply datagrams", rc); + } else if (errno != EAGAIN) { + flow_err(uflow, + "Unexpected error discarding datagrams: %s", + strerror(errno)); + } + } flow_hash_insert(c, FLOW_SIDX(uflow, INISIDE)); FLOW_ACTIVATE(uflow); @@ -581,7 +504,6 @@ cancel: udp_flow_close(c, uflow); flow_alloc_cancel(flow); return FLOW_SIDX_NONE; - } /** @@ -591,6 +513,8 @@ cancel: * @meta: Metadata buffer for the datagram * @now: Timestamp * + * #syscalls fcntl + * * Return: sidx for the destination side of the flow for this packet, or * FLOW_SIDX_NONE if we couldn't find or create a flow. */ @@ -624,7 +548,7 @@ static flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref, } flow_initiate_sa(flow, ref.udp.pif, &meta->s_in, ref.udp.port); - return udp_flow_new(c, flow, now); + return udp_flow_new(c, flow, ref.fd, now); } /** @@ -648,55 +572,16 @@ static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx) * @now: Timestamp */ static void udp_splice_send(const struct ctx *c, size_t start, size_t n, - in_port_t src, in_port_t dst, - union epoll_ref ref, - const struct timespec *now) + flow_sidx_t tosidx) { - int s; - - if (ref.udp.v6) { - udp_splice_to.sa6 = (struct sockaddr_in6) { - .sin6_family = AF_INET6, - .sin6_addr = in6addr_loopback, - .sin6_port = htons(dst), - }; - } else { - udp_splice_to.sa4 = (struct sockaddr_in) { - .sin_family = AF_INET, - .sin_addr = in4addr_loopback, - .sin_port = htons(dst), - }; - } - - if (ref.udp.pif == PIF_SPLICE) { - src += c->udp.fwd_in.rdelta[src]; - s = udp_splice_init[ref.udp.v6][src].sock; - if (s < 0 && ref.udp.orig) - s = udp_splice_new(c, ref.udp.v6, src, false); - - if (s < 0) - return; - - udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec; - udp_splice_init[ref.udp.v6][src].ts = now->tv_sec; - } else { - ASSERT(ref.udp.pif == PIF_HOST); - src += c->udp.fwd_out.rdelta[src]; - s = udp_splice_ns[ref.udp.v6][src].sock; - if (s < 0 && ref.udp.orig) { - struct udp_splice_new_ns_arg arg = { - c, ref.udp.v6, src, -1, - }; - - NS_CALL(udp_splice_new_ns, &arg); - s = arg.s; - } - if (s < 0) - return; + const struct flowside *toside = flowside_at_sidx(tosidx); + const struct udp_flow *uflow = udp_at_sidx(tosidx); + uint8_t topif = pif_at_sidx(tosidx); + int s = uflow->s[tosidx.sidei]; + socklen_t sl; - udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec; - udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec; - } + pif_sockaddr(c, &udp_splice_to, &sl, topif, + &toside->eaddr, toside->eport); sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL); } @@ -984,20 +869,18 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve dstport += c->udp.fwd_in.f.delta[dstport]; /* We divide datagrams into batches based on how we need to send them, - * determined by udp_meta[i].splicesrc and udp_meta[i].tosidx. To avoid - * either two passes through the array, or recalculating splicesrc and - * tosidxfor a single entry, we have to populate it one entry *ahead* of - * the loop counter. + * determined by udp_meta[i].tosidx. To avoid either two passes through + * the array, or recalculating tosidx for a single entry, we have to + * populate it one entry *ahead* of the loop counter. */ - udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv); udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0], now); for (i = 0; i < n; ) { flow_sidx_t batchsidx = udp_meta[i].tosidx; - int batchsrc = udp_meta[i].splicesrc; + uint8_t batchpif = pif_at_sidx(batchsidx); int batchstart = i; do { - if (batchsrc >= 0) { + if (pif_is_socket(batchpif)) { udp_splice_prepare(mmh_recv, i); } else { udp_tap_prepare(c, mmh_recv, i, dstport, @@ -1007,17 +890,14 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve if (++i >= n) break; - udp_meta[i].splicesrc = udp_mmh_splice_port(ref, - &mmh_recv[i]); udp_meta[i].tosidx = udp_flow_from_sock(c, ref, &udp_meta[i], now); - } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx) && - udp_meta[i].splicesrc == batchsrc); + } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx)); - if (batchsrc >= 0) { + if (pif_is_socket(batchpif)) { udp_splice_send(c, batchstart, i - batchstart, - batchsrc, dstport, ref, now); + batchsidx); } else { tap_send_frames(c, &udp_l2_iov[batchstart][0], UDP_NUM_IOVS, i - batchstart); @@ -1025,6 +905,40 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve } } +/** + * udp_reply_sock_handler() - Handle new data from flow specific socket + * @c: Execution context + * @ref: epoll reference + * @events: epoll events bitmap + * @now: Current timestamp + * + * #syscalls recvmmsg + */ +void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref, + uint32_t events, const struct timespec *now) +{ + const struct flowside *fromside = flowside_at_sidx(ref.flowside); + flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside); + struct udp_flow *uflow = udp_at_sidx(ref.flowside); + int from_s = uflow->s[ref.flowside.sidei]; + bool v6 = !inany_v4(&fromside->eaddr); + struct mmsghdr *mmh_recv = v6 ? udp6_mh_recv : udp4_mh_recv; + int n, i; + + ASSERT(!c->no_udp && uflow); + + if ((n = udp_sock_recv(c, from_s, events, mmh_recv)) <= 0) + return; + + flow_trace(uflow, "Received %d datagrams on reply socket", n); + uflow->ts = now->tv_sec; + + for (i = 0; i < n; i++) + udp_splice_prepare(mmh_recv, i); + + udp_splice_send(c, 0, n, tosidx); +} + /** * udp_tap_handler() - Handle packets from tap * @c: Execution context @@ -1419,8 +1333,8 @@ static int udp_port_rebind_outbound(void *arg) * * Return: true if the flow is ready to free, false otherwise */ -bool udp_flow_timer(const struct ctx *c, const struct udp_flow *uflow, - const struct timespec *now) +bool udp_flow_timer(const struct ctx *c, struct udp_flow *uflow, + const struct timespec *now) { if (now->tv_sec - uflow->ts <= UDP_CONN_TIMEOUT) return false; -- cgit v1.2.3