diff options
Diffstat (limited to 'udp.c')
-rw-r--r-- | udp.c | 436 |
1 files changed, 175 insertions, 261 deletions
@@ -35,7 +35,44 @@ * =================== * * UDP doesn't use listen(), but we consider long term sockets which are allowed - * to create new flows "listening" by analogy with TCP. + * to create new flows "listening" by analogy with TCP. This listening socket + * could receive packets from multiple flows, so we use a hash table match to + * find the specific flow for a datagram. + * + * When a UDP flow is initiated from a listening socket we take a duplicate of + * the socket and store it in uflow->s[INISIDE]. This will last for the + * lifetime of the flow, even if the original listening socket is closed due to + * port auto-probing. The duplicate is used to deliver replies back to the + * originating side. + * + * Reply sockets + * ============= + * + * When a UDP flow targets a socket, we create a "reply" socket in + * uflow->s[TGTSIDE] both to deliver datagrams to the target side and receive + * replies on the target side. This socket is both bound and connected and has + * EPOLL_TYPE_UDP_REPLY. The connect() means it will only receive datagrams + * associated with this flow, so the epoll reference directly points to the flow + * and we don't need a hash lookup. + * + * NOTE: it's possible that the reply socket could have a bound address + * overlapping with an unrelated listening socket. We assume datagrams for the + * flow will come to the reply socket in preference to a listening socket. The + * sample program doc/platform-requirements/reuseaddr-priority.c documents and + * tests that assumption. + * + * "Spliced" flows + * =============== + * + * In PASTA mode, L2-L4 translation is skipped for connections to ports bound + * between namespaces using the loopback interface, messages are directly + * transferred between L4 sockets instead. These are called spliced connections + * in analogy with the TCP implementation. The the splice() syscall isn't + * actually used; it doesn't make sense for datagrams and instead a pair of + * recvmmsg() and sendmmsg() is used to forward the datagrams. + * + * Note that a spliced flow will have *both* a duplicated listening socket and a + * reply socket (see above). * * Port tracking * ============= @@ -56,62 +93,6 @@ * * Packets are forwarded back and forth, by prepending and stripping UDP headers * in the obvious way, with no port translation. - * - * In PASTA mode, the L2-L4 translation is skipped for connections to ports - * bound between namespaces using the loopback interface, messages are directly - * transferred between L4 sockets instead. These are called spliced connections - * for consistency with the TCP implementation, but the splice() syscall isn't - * actually used as it wouldn't make sense for datagram-based connections: a - * pair of recvmmsg() and sendmmsg() deals with this case. - * - * The connection tracking for PASTA mode is slightly complicated by the absence - * of actual connections, see struct udp_splice_port, and these examples: - * - * - from init to namespace: - * - * - forward direction: 127.0.0.1:5000 -> 127.0.0.1:80 in init from socket s, - * with epoll reference: index = 80, splice = 1, orig = 1, ns = 0 - * - if udp_splice_ns[V4][5000].sock: - * - send packet to udp_splice_ns[V4][5000].sock, with destination port - * 80 - * - otherwise: - * - create new socket udp_splice_ns[V4][5000].sock - * - bind in namespace to 127.0.0.1:5000 - * - add to epoll with reference: index = 5000, splice = 1, orig = 0, - * ns = 1 - * - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with - * current time - * - * - reverse direction: 127.0.0.1:80 -> 127.0.0.1:5000 in namespace socket s, - * having epoll reference: index = 5000, splice = 1, orig = 0, ns = 1 - * - if udp_splice_init[V4][80].sock: - * - send to udp_splice_init[V4][80].sock, with destination port 5000 - * - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with - * current time - * - otherwise, discard - * - * - from namespace to init: - * - * - forward direction: 127.0.0.1:2000 -> 127.0.0.1:22 in namespace from - * socket s, with epoll reference: index = 22, splice = 1, orig = 1, ns = 1 - * - if udp4_splice_init[V4][2000].sock: - * - send packet to udp_splice_init[V4][2000].sock, with destination - * port 22 - * - otherwise: - * - create new socket udp_splice_init[V4][2000].sock - * - bind in init to 127.0.0.1:2000 - * - add to epoll with reference: index = 2000, splice = 1, orig = 0, - * ns = 0 - * - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with - * current time - * - * - reverse direction: 127.0.0.1:22 -> 127.0.0.1:2000 in init from socket s, - * having epoll reference: index = 2000, splice = 1, orig = 0, ns = 0 - * - if udp_splice_ns[V4][22].sock: - * - send to udp_splice_ns[V4][22].sock, with destination port 2000 - * - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with - * current time - * - otherwise, discard */ #include <sched.h> @@ -134,6 +115,7 @@ #include <sys/socket.h> #include <sys/uio.h> #include <time.h> +#include <fcntl.h> #include <linux/errqueue.h> #include "checksum.h" @@ -224,7 +206,6 @@ static struct ethhdr udp6_eth_hdr; * @ip4h: Pre-filled IPv4 header (except for tot_len and saddr) * @taph: Tap backend specific header * @s_in: Source socket address, filled in by recvmmsg() - * @splicesrc: Source port for splicing, or -1 if not spliceable * @tosidx: sidx for the destination side of this datagram's flow */ static struct udp_meta_t { @@ -233,7 +214,6 @@ static struct udp_meta_t { struct tap_hdr taph; union sockaddr_inany s_in; - int splicesrc; flow_sidx_t tosidx; } #ifdef __AVX2__ @@ -271,7 +251,6 @@ static struct mmsghdr udp_mh_splice [UDP_MAX_FRAMES]; /* IOVs for L2 frames */ static struct iovec udp_l2_iov [UDP_MAX_FRAMES][UDP_NUM_IOVS]; - /** * udp_portmap_clear() - Clear UDP port map before configuration */ @@ -385,140 +364,6 @@ static void udp_iov_init(const struct ctx *c) } /** - * udp_splice_new() - Create and prepare socket for "spliced" binding - * @c: Execution context - * @v6: Set for IPv6 sockets - * @src: Source port of original connection, host order - * @ns: Does the splice originate in the ns or not - * - * Return: prepared socket, negative error code on failure - * - * #syscalls:pasta getsockname - */ -int udp_splice_new(const struct ctx *c, int v6, in_port_t src, bool ns) -{ - struct epoll_event ev = { .events = EPOLLIN | EPOLLRDHUP | EPOLLHUP }; - union epoll_ref ref = { .type = EPOLL_TYPE_UDP, - .udp = { .splice = true, .v6 = v6, .port = src } - }; - struct udp_splice_port *sp; - int act, s; - - if (ns) { - ref.udp.pif = PIF_SPLICE; - sp = &udp_splice_ns[v6 ? V6 : V4][src]; - act = UDP_ACT_SPLICE_NS; - } else { - ref.udp.pif = PIF_HOST; - sp = &udp_splice_init[v6 ? V6 : V4][src]; - act = UDP_ACT_SPLICE_INIT; - } - - s = socket(v6 ? AF_INET6 : AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, - IPPROTO_UDP); - - if (s > FD_REF_MAX) { - close(s); - return -EIO; - } - - if (s < 0) - return s; - - ref.fd = s; - - if (v6) { - struct sockaddr_in6 addr6 = { - .sin6_family = AF_INET6, - .sin6_port = htons(src), - .sin6_addr = IN6ADDR_LOOPBACK_INIT, - }; - if (bind(s, (struct sockaddr *)&addr6, sizeof(addr6))) - goto fail; - } else { - struct sockaddr_in addr4 = { - .sin_family = AF_INET, - .sin_port = htons(src), - .sin_addr = IN4ADDR_LOOPBACK_INIT, - }; - if (bind(s, (struct sockaddr *)&addr4, sizeof(addr4))) - goto fail; - } - - sp->sock = s; - bitmap_set(udp_act[v6 ? V6 : V4][act], src); - - ev.data.u64 = ref.u64; - epoll_ctl(c->epollfd, EPOLL_CTL_ADD, s, &ev); - return s; - -fail: - close(s); - return -1; -} - -/** - * struct udp_splice_new_ns_arg - Arguments for udp_splice_new_ns() - * @c: Execution context - * @v6: Set for IPv6 - * @src: Source port of originating datagram, host order - * @dst: Destination port of originating datagram, host order - * @s: Newly created socket or negative error code - */ -struct udp_splice_new_ns_arg { - const struct ctx *c; - int v6; - in_port_t src; - int s; -}; - -/** - * udp_splice_new_ns() - Enter namespace and call udp_splice_new() - * @arg: See struct udp_splice_new_ns_arg - * - * Return: 0 - */ -static int udp_splice_new_ns(void *arg) -{ - struct udp_splice_new_ns_arg *a; - - a = (struct udp_splice_new_ns_arg *)arg; - - ns_enter(a->c); - - a->s = udp_splice_new(a->c, a->v6, a->src, true); - - return 0; -} - -/** - * udp_mmh_splice_port() - Is source address of message suitable for splicing? - * @ref: epoll reference for incoming message's origin socket - * @mmh: mmsghdr of incoming message - * - * Return: if source address of message in @mmh refers to localhost (127.0.0.1 - * or ::1) its source port (host order), otherwise -1. - */ -static int udp_mmh_splice_port(union epoll_ref ref, const struct mmsghdr *mmh) -{ - const struct sockaddr_in6 *sa6 = mmh->msg_hdr.msg_name; - const struct sockaddr_in *sa4 = mmh->msg_hdr.msg_name; - - ASSERT(ref.type == EPOLL_TYPE_UDP); - - if (!ref.udp.splice) - return -1; - - if (ref.udp.v6 && IN6_IS_ADDR_LOOPBACK(&sa6->sin6_addr)) - return ntohs(sa6->sin6_port); - - if (!ref.udp.v6 && IN4_IS_ADDR_LOOPBACK(&sa4->sin_addr)) - return ntohs(sa4->sin_port); - - return -1; -} - -/** * udp_at_sidx() - Get UDP specific flow at given sidx * @sidx: Flow and side to retrieve * @@ -541,8 +386,20 @@ struct udp_flow *udp_at_sidx(flow_sidx_t sidx) * @c: Execution context * @uflow: UDP flow */ -static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow) +static void udp_flow_close(const struct ctx *c, struct udp_flow *uflow) { + if (uflow->s[INISIDE] >= 0) { + /* The listening socket needs to stay in epoll */ + close(uflow->s[INISIDE]); + uflow->s[INISIDE] = -1; + } + + if (uflow->s[TGTSIDE] >= 0) { + /* But the flow specific one needs to be removed */ + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, uflow->s[TGTSIDE], NULL); + close(uflow->s[TGTSIDE]); + uflow->s[TGTSIDE] = -1; + } flow_hash_remove(c, FLOW_SIDX(uflow, INISIDE)); } @@ -550,26 +407,92 @@ static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow) * udp_flow_new() - Common setup for a new UDP flow * @c: Execution context * @flow: Initiated flow + * @s_ini: Initiating socket (or -1) * @now: Timestamp * * Return: UDP specific flow, if successful, NULL on failure */ static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow, - const struct timespec *now) + int s_ini, const struct timespec *now) { const struct flowside *ini = &flow->f.side[INISIDE]; struct udp_flow *uflow = NULL; + const struct flowside *tgt; + uint8_t tgtpif; if (!inany_is_unicast(&ini->eaddr) || ini->eport == 0) { flow_trace(flow, "Invalid endpoint to initiate UDP flow"); goto cancel; } - if (!flow_target(c, flow, IPPROTO_UDP)) + if (!(tgt = flow_target(c, flow, IPPROTO_UDP))) goto cancel; + tgtpif = flow->f.pif[TGTSIDE]; uflow = FLOW_SET_TYPE(flow, FLOW_UDP, udp); uflow->ts = now->tv_sec; + uflow->s[INISIDE] = uflow->s[TGTSIDE] = -1; + + if (s_ini >= 0) { + /* When using auto port-scanning the listening port could go + * away, so we need to duplicate the socket + */ + uflow->s[INISIDE] = fcntl(s_ini, F_DUPFD_CLOEXEC, 0); + if (uflow->s[INISIDE] < 0) { + flow_err(uflow, + "Couldn't duplicate listening socket: %s", + strerror(errno)); + goto cancel; + } + } + + if (pif_is_socket(tgtpif)) { + struct mmsghdr discard[UIO_MAXIOV] = { 0 }; + union { + flow_sidx_t sidx; + uint32_t data; + } fref = { + .sidx = FLOW_SIDX(flow, TGTSIDE), + }; + int rc; + + uflow->s[TGTSIDE] = flowside_sock_l4(c, EPOLL_TYPE_UDP_REPLY, + tgtpif, tgt, fref.data); + if (uflow->s[TGTSIDE] < 0) { + flow_dbg(uflow, + "Couldn't open socket for spliced flow: %s", + strerror(errno)); + goto cancel; + } + + if (flowside_connect(c, uflow->s[TGTSIDE], tgtpif, tgt) < 0) { + flow_dbg(uflow, + "Couldn't connect flow socket: %s", + strerror(errno)); + goto cancel; + } + + /* It's possible, if unlikely, that we could receive some + * unrelated packets in between the bind() and connect() of this + * socket. For now we just discard these. We could consider + * trying to redirect these to an appropriate handler, if we + * need to. + */ + rc = recvmmsg(uflow->s[TGTSIDE], discard, ARRAY_SIZE(discard), + MSG_DONTWAIT, NULL); + if (rc >= ARRAY_SIZE(discard)) { + flow_dbg(uflow, + "Too many (%d) spurious reply datagrams", rc); + goto cancel; + } else if (rc > 0) { + flow_trace(uflow, + "Discarded %d spurious reply datagrams", rc); + } else if (errno != EAGAIN) { + flow_err(uflow, + "Unexpected error discarding datagrams: %s", + strerror(errno)); + } + } flow_hash_insert(c, FLOW_SIDX(uflow, INISIDE)); FLOW_ACTIVATE(uflow); @@ -581,7 +504,6 @@ cancel: udp_flow_close(c, uflow); flow_alloc_cancel(flow); return FLOW_SIDX_NONE; - } /** @@ -591,6 +513,8 @@ cancel: * @meta: Metadata buffer for the datagram * @now: Timestamp * + * #syscalls fcntl + * * Return: sidx for the destination side of the flow for this packet, or * FLOW_SIDX_NONE if we couldn't find or create a flow. */ @@ -624,7 +548,7 @@ static flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref, } flow_initiate_sa(flow, ref.udp.pif, &meta->s_in, ref.udp.port); - return udp_flow_new(c, flow, now); + return udp_flow_new(c, flow, ref.fd, now); } /** @@ -648,55 +572,16 @@ static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx) * @now: Timestamp */ static void udp_splice_send(const struct ctx *c, size_t start, size_t n, - in_port_t src, in_port_t dst, - union epoll_ref ref, - const struct timespec *now) + flow_sidx_t tosidx) { - int s; - - if (ref.udp.v6) { - udp_splice_to.sa6 = (struct sockaddr_in6) { - .sin6_family = AF_INET6, - .sin6_addr = in6addr_loopback, - .sin6_port = htons(dst), - }; - } else { - udp_splice_to.sa4 = (struct sockaddr_in) { - .sin_family = AF_INET, - .sin_addr = in4addr_loopback, - .sin_port = htons(dst), - }; - } - - if (ref.udp.pif == PIF_SPLICE) { - src += c->udp.fwd_in.rdelta[src]; - s = udp_splice_init[ref.udp.v6][src].sock; - if (s < 0 && ref.udp.orig) - s = udp_splice_new(c, ref.udp.v6, src, false); - - if (s < 0) - return; - - udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec; - udp_splice_init[ref.udp.v6][src].ts = now->tv_sec; - } else { - ASSERT(ref.udp.pif == PIF_HOST); - src += c->udp.fwd_out.rdelta[src]; - s = udp_splice_ns[ref.udp.v6][src].sock; - if (s < 0 && ref.udp.orig) { - struct udp_splice_new_ns_arg arg = { - c, ref.udp.v6, src, -1, - }; - - NS_CALL(udp_splice_new_ns, &arg); - s = arg.s; - } - if (s < 0) - return; + const struct flowside *toside = flowside_at_sidx(tosidx); + const struct udp_flow *uflow = udp_at_sidx(tosidx); + uint8_t topif = pif_at_sidx(tosidx); + int s = uflow->s[tosidx.sidei]; + socklen_t sl; - udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec; - udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec; - } + pif_sockaddr(c, &udp_splice_to, &sl, topif, + &toside->eaddr, toside->eport); sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL); } @@ -984,20 +869,18 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve dstport += c->udp.fwd_in.f.delta[dstport]; /* We divide datagrams into batches based on how we need to send them, - * determined by udp_meta[i].splicesrc and udp_meta[i].tosidx. To avoid - * either two passes through the array, or recalculating splicesrc and - * tosidxfor a single entry, we have to populate it one entry *ahead* of - * the loop counter. + * determined by udp_meta[i].tosidx. To avoid either two passes through + * the array, or recalculating tosidx for a single entry, we have to + * populate it one entry *ahead* of the loop counter. */ - udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv); udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0], now); for (i = 0; i < n; ) { flow_sidx_t batchsidx = udp_meta[i].tosidx; - int batchsrc = udp_meta[i].splicesrc; + uint8_t batchpif = pif_at_sidx(batchsidx); int batchstart = i; do { - if (batchsrc >= 0) { + if (pif_is_socket(batchpif)) { udp_splice_prepare(mmh_recv, i); } else { udp_tap_prepare(c, mmh_recv, i, dstport, @@ -1007,17 +890,14 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve if (++i >= n) break; - udp_meta[i].splicesrc = udp_mmh_splice_port(ref, - &mmh_recv[i]); udp_meta[i].tosidx = udp_flow_from_sock(c, ref, &udp_meta[i], now); - } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx) && - udp_meta[i].splicesrc == batchsrc); + } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx)); - if (batchsrc >= 0) { + if (pif_is_socket(batchpif)) { udp_splice_send(c, batchstart, i - batchstart, - batchsrc, dstport, ref, now); + batchsidx); } else { tap_send_frames(c, &udp_l2_iov[batchstart][0], UDP_NUM_IOVS, i - batchstart); @@ -1026,6 +906,40 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve } /** + * udp_reply_sock_handler() - Handle new data from flow specific socket + * @c: Execution context + * @ref: epoll reference + * @events: epoll events bitmap + * @now: Current timestamp + * + * #syscalls recvmmsg + */ +void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref, + uint32_t events, const struct timespec *now) +{ + const struct flowside *fromside = flowside_at_sidx(ref.flowside); + flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside); + struct udp_flow *uflow = udp_at_sidx(ref.flowside); + int from_s = uflow->s[ref.flowside.sidei]; + bool v6 = !inany_v4(&fromside->eaddr); + struct mmsghdr *mmh_recv = v6 ? udp6_mh_recv : udp4_mh_recv; + int n, i; + + ASSERT(!c->no_udp && uflow); + + if ((n = udp_sock_recv(c, from_s, events, mmh_recv)) <= 0) + return; + + flow_trace(uflow, "Received %d datagrams on reply socket", n); + uflow->ts = now->tv_sec; + + for (i = 0; i < n; i++) + udp_splice_prepare(mmh_recv, i); + + udp_splice_send(c, 0, n, tosidx); +} + +/** * udp_tap_handler() - Handle packets from tap * @c: Execution context * @pif: pif on which the packet is arriving @@ -1419,8 +1333,8 @@ static int udp_port_rebind_outbound(void *arg) * * Return: true if the flow is ready to free, false otherwise */ -bool udp_flow_timer(const struct ctx *c, const struct udp_flow *uflow, - const struct timespec *now) +bool udp_flow_timer(const struct ctx *c, struct udp_flow *uflow, + const struct timespec *now) { if (now->tv_sec - uflow->ts <= UDP_CONN_TIMEOUT) return false; |