From e0647ad80c63fcad6a9dc31541881fa02aeaac98 Mon Sep 17 00:00:00 2001 From: David Gibson Date: Thu, 18 Jul 2024 15:26:47 +1000 Subject: udp: Handle "spliced" datagrams with per-flow sockets When forwarding a datagram to a socket, we need to find a socket with a suitable local address to send it. Currently we keep track of such sockets in an array indexed by local port, but this can't properly handle cases where we have multiple local addresses in active use. For "spliced" (socket to socket) cases, improve this by instead opening a socket specifically for the target side of the flow. We connect() as well as bind()ing that socket, so that it will only receive the flow's reply packets, not anything else. We direct datagrams sent via that socket using the addresses from the flow table, effectively replacing bespoke addressing logic with the unified logic in fwd.c When we create the flow, we also take a duplicate of the originating socket, and use that to deliver reply datagrams back to the origin, again using addresses from the flow table entry. Signed-off-by: David Gibson Signed-off-by: Stefano Brivio --- epoll_type.h | 2 + flow.c | 20 +++ flow.h | 2 + flow_table.h | 15 ++ passt.c | 4 + udp.c | 436 ++++++++++++++++++++++++----------------------------------- udp.h | 6 +- udp_flow.h | 4 +- util.c | 1 + 9 files changed, 226 insertions(+), 264 deletions(-) diff --git a/epoll_type.h b/epoll_type.h index b6c0419..7a752ed 100644 --- a/epoll_type.h +++ b/epoll_type.h @@ -22,6 +22,8 @@ enum epoll_type { EPOLL_TYPE_TCP_TIMER, /* UDP sockets */ EPOLL_TYPE_UDP, + /* UDP socket for replies on a specific flow */ + EPOLL_TYPE_UDP_REPLY, /* ICMP/ICMPv6 ping sockets */ EPOLL_TYPE_PING, /* inotify fd watching for end of netns (pasta) */ diff --git a/flow.c b/flow.c index 4e337d4..d7d548d 100644 --- a/flow.c +++ b/flow.c @@ -237,6 +237,26 @@ int flowside_sock_l4(const struct ctx *c, enum epoll_type type, uint8_t pif, } } +/** flowside_connect() - Connect a socket based on flowside + * @c: Execution context + * @s: Socket to connect + * @pif: Target pif + * @tgt: Target flowside + * + * Connect @s to the endpoint address and port from @tgt. + * + * Return: 0 on success, negative on error + */ +int flowside_connect(const struct ctx *c, int s, + uint8_t pif, const struct flowside *tgt) +{ + union sockaddr_inany sa; + socklen_t sl; + + pif_sockaddr(c, &sa, &sl, pif, &tgt->eaddr, tgt->eport); + return connect(s, &sa.sa, sl); +} + /** flow_log_ - Log flow-related message * @f: flow the message is related to * @pri: Log priority diff --git a/flow.h b/flow.h index 7866477..078fd60 100644 --- a/flow.h +++ b/flow.h @@ -168,6 +168,8 @@ static inline bool flowside_eq(const struct flowside *left, int flowside_sock_l4(const struct ctx *c, enum epoll_type type, uint8_t pif, const struct flowside *tgt, uint32_t data); +int flowside_connect(const struct ctx *c, int s, + uint8_t pif, const struct flowside *tgt); /** * struct flow_common - Common fields for packet flows diff --git a/flow_table.h b/flow_table.h index df253be..a499e7b 100644 --- a/flow_table.h +++ b/flow_table.h @@ -100,6 +100,21 @@ static inline uint8_t pif_at_sidx(flow_sidx_t sidx) return flow->f.pif[sidx.sidei]; } +/** flowside_at_sidx() - Retrieve a specific flowside + * @sidx: Flow & side index + * + * Return: Flowside for the flow & side given by @sidx + */ +static inline const struct flowside *flowside_at_sidx(flow_sidx_t sidx) +{ + const union flow *flow = flow_at_sidx(sidx); + + if (!flow) + return PIF_NONE; + + return &flow->f.side[sidx.sidei]; +} + /** flow_sidx_opposite() - Get the other side of the same flow * @sidx: Flow & side index * diff --git a/passt.c b/passt.c index e4d45da..f9405be 100644 --- a/passt.c +++ b/passt.c @@ -67,6 +67,7 @@ char *epoll_type_str[] = { [EPOLL_TYPE_TCP_LISTEN] = "listening TCP socket", [EPOLL_TYPE_TCP_TIMER] = "TCP timer", [EPOLL_TYPE_UDP] = "UDP socket", + [EPOLL_TYPE_UDP_REPLY] = "UDP reply socket", [EPOLL_TYPE_PING] = "ICMP/ICMPv6 ping socket", [EPOLL_TYPE_NSQUIT_INOTIFY] = "namespace inotify watch", [EPOLL_TYPE_NSQUIT_TIMER] = "namespace timer watch", @@ -349,6 +350,9 @@ loop: case EPOLL_TYPE_UDP: udp_buf_sock_handler(&c, ref, eventmask, &now); break; + case EPOLL_TYPE_UDP_REPLY: + udp_reply_sock_handler(&c, ref, eventmask, &now); + break; case EPOLL_TYPE_PING: icmp_sock_handler(&c, ref); break; diff --git a/udp.c b/udp.c index fdbe396..5543e61 100644 --- a/udp.c +++ b/udp.c @@ -35,7 +35,44 @@ * =================== * * UDP doesn't use listen(), but we consider long term sockets which are allowed - * to create new flows "listening" by analogy with TCP. + * to create new flows "listening" by analogy with TCP. This listening socket + * could receive packets from multiple flows, so we use a hash table match to + * find the specific flow for a datagram. + * + * When a UDP flow is initiated from a listening socket we take a duplicate of + * the socket and store it in uflow->s[INISIDE]. This will last for the + * lifetime of the flow, even if the original listening socket is closed due to + * port auto-probing. The duplicate is used to deliver replies back to the + * originating side. + * + * Reply sockets + * ============= + * + * When a UDP flow targets a socket, we create a "reply" socket in + * uflow->s[TGTSIDE] both to deliver datagrams to the target side and receive + * replies on the target side. This socket is both bound and connected and has + * EPOLL_TYPE_UDP_REPLY. The connect() means it will only receive datagrams + * associated with this flow, so the epoll reference directly points to the flow + * and we don't need a hash lookup. + * + * NOTE: it's possible that the reply socket could have a bound address + * overlapping with an unrelated listening socket. We assume datagrams for the + * flow will come to the reply socket in preference to a listening socket. The + * sample program doc/platform-requirements/reuseaddr-priority.c documents and + * tests that assumption. + * + * "Spliced" flows + * =============== + * + * In PASTA mode, L2-L4 translation is skipped for connections to ports bound + * between namespaces using the loopback interface, messages are directly + * transferred between L4 sockets instead. These are called spliced connections + * in analogy with the TCP implementation. The the splice() syscall isn't + * actually used; it doesn't make sense for datagrams and instead a pair of + * recvmmsg() and sendmmsg() is used to forward the datagrams. + * + * Note that a spliced flow will have *both* a duplicated listening socket and a + * reply socket (see above). * * Port tracking * ============= @@ -56,62 +93,6 @@ * * Packets are forwarded back and forth, by prepending and stripping UDP headers * in the obvious way, with no port translation. - * - * In PASTA mode, the L2-L4 translation is skipped for connections to ports - * bound between namespaces using the loopback interface, messages are directly - * transferred between L4 sockets instead. These are called spliced connections - * for consistency with the TCP implementation, but the splice() syscall isn't - * actually used as it wouldn't make sense for datagram-based connections: a - * pair of recvmmsg() and sendmmsg() deals with this case. - * - * The connection tracking for PASTA mode is slightly complicated by the absence - * of actual connections, see struct udp_splice_port, and these examples: - * - * - from init to namespace: - * - * - forward direction: 127.0.0.1:5000 -> 127.0.0.1:80 in init from socket s, - * with epoll reference: index = 80, splice = 1, orig = 1, ns = 0 - * - if udp_splice_ns[V4][5000].sock: - * - send packet to udp_splice_ns[V4][5000].sock, with destination port - * 80 - * - otherwise: - * - create new socket udp_splice_ns[V4][5000].sock - * - bind in namespace to 127.0.0.1:5000 - * - add to epoll with reference: index = 5000, splice = 1, orig = 0, - * ns = 1 - * - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with - * current time - * - * - reverse direction: 127.0.0.1:80 -> 127.0.0.1:5000 in namespace socket s, - * having epoll reference: index = 5000, splice = 1, orig = 0, ns = 1 - * - if udp_splice_init[V4][80].sock: - * - send to udp_splice_init[V4][80].sock, with destination port 5000 - * - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with - * current time - * - otherwise, discard - * - * - from namespace to init: - * - * - forward direction: 127.0.0.1:2000 -> 127.0.0.1:22 in namespace from - * socket s, with epoll reference: index = 22, splice = 1, orig = 1, ns = 1 - * - if udp4_splice_init[V4][2000].sock: - * - send packet to udp_splice_init[V4][2000].sock, with destination - * port 22 - * - otherwise: - * - create new socket udp_splice_init[V4][2000].sock - * - bind in init to 127.0.0.1:2000 - * - add to epoll with reference: index = 2000, splice = 1, orig = 0, - * ns = 0 - * - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with - * current time - * - * - reverse direction: 127.0.0.1:22 -> 127.0.0.1:2000 in init from socket s, - * having epoll reference: index = 2000, splice = 1, orig = 0, ns = 0 - * - if udp_splice_ns[V4][22].sock: - * - send to udp_splice_ns[V4][22].sock, with destination port 2000 - * - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with - * current time - * - otherwise, discard */ #include @@ -134,6 +115,7 @@ #include #include #include +#include #include #include "checksum.h" @@ -224,7 +206,6 @@ static struct ethhdr udp6_eth_hdr; * @ip4h: Pre-filled IPv4 header (except for tot_len and saddr) * @taph: Tap backend specific header * @s_in: Source socket address, filled in by recvmmsg() - * @splicesrc: Source port for splicing, or -1 if not spliceable * @tosidx: sidx for the destination side of this datagram's flow */ static struct udp_meta_t { @@ -233,7 +214,6 @@ static struct udp_meta_t { struct tap_hdr taph; union sockaddr_inany s_in; - int splicesrc; flow_sidx_t tosidx; } #ifdef __AVX2__ @@ -271,7 +251,6 @@ static struct mmsghdr udp_mh_splice [UDP_MAX_FRAMES]; /* IOVs for L2 frames */ static struct iovec udp_l2_iov [UDP_MAX_FRAMES][UDP_NUM_IOVS]; - /** * udp_portmap_clear() - Clear UDP port map before configuration */ @@ -384,140 +363,6 @@ static void udp_iov_init(const struct ctx *c) udp_iov_init_one(c, i); } -/** - * udp_splice_new() - Create and prepare socket for "spliced" binding - * @c: Execution context - * @v6: Set for IPv6 sockets - * @src: Source port of original connection, host order - * @ns: Does the splice originate in the ns or not - * - * Return: prepared socket, negative error code on failure - * - * #syscalls:pasta getsockname - */ -int udp_splice_new(const struct ctx *c, int v6, in_port_t src, bool ns) -{ - struct epoll_event ev = { .events = EPOLLIN | EPOLLRDHUP | EPOLLHUP }; - union epoll_ref ref = { .type = EPOLL_TYPE_UDP, - .udp = { .splice = true, .v6 = v6, .port = src } - }; - struct udp_splice_port *sp; - int act, s; - - if (ns) { - ref.udp.pif = PIF_SPLICE; - sp = &udp_splice_ns[v6 ? V6 : V4][src]; - act = UDP_ACT_SPLICE_NS; - } else { - ref.udp.pif = PIF_HOST; - sp = &udp_splice_init[v6 ? V6 : V4][src]; - act = UDP_ACT_SPLICE_INIT; - } - - s = socket(v6 ? AF_INET6 : AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, - IPPROTO_UDP); - - if (s > FD_REF_MAX) { - close(s); - return -EIO; - } - - if (s < 0) - return s; - - ref.fd = s; - - if (v6) { - struct sockaddr_in6 addr6 = { - .sin6_family = AF_INET6, - .sin6_port = htons(src), - .sin6_addr = IN6ADDR_LOOPBACK_INIT, - }; - if (bind(s, (struct sockaddr *)&addr6, sizeof(addr6))) - goto fail; - } else { - struct sockaddr_in addr4 = { - .sin_family = AF_INET, - .sin_port = htons(src), - .sin_addr = IN4ADDR_LOOPBACK_INIT, - }; - if (bind(s, (struct sockaddr *)&addr4, sizeof(addr4))) - goto fail; - } - - sp->sock = s; - bitmap_set(udp_act[v6 ? V6 : V4][act], src); - - ev.data.u64 = ref.u64; - epoll_ctl(c->epollfd, EPOLL_CTL_ADD, s, &ev); - return s; - -fail: - close(s); - return -1; -} - -/** - * struct udp_splice_new_ns_arg - Arguments for udp_splice_new_ns() - * @c: Execution context - * @v6: Set for IPv6 - * @src: Source port of originating datagram, host order - * @dst: Destination port of originating datagram, host order - * @s: Newly created socket or negative error code - */ -struct udp_splice_new_ns_arg { - const struct ctx *c; - int v6; - in_port_t src; - int s; -}; - -/** - * udp_splice_new_ns() - Enter namespace and call udp_splice_new() - * @arg: See struct udp_splice_new_ns_arg - * - * Return: 0 - */ -static int udp_splice_new_ns(void *arg) -{ - struct udp_splice_new_ns_arg *a; - - a = (struct udp_splice_new_ns_arg *)arg; - - ns_enter(a->c); - - a->s = udp_splice_new(a->c, a->v6, a->src, true); - - return 0; -} - -/** - * udp_mmh_splice_port() - Is source address of message suitable for splicing? - * @ref: epoll reference for incoming message's origin socket - * @mmh: mmsghdr of incoming message - * - * Return: if source address of message in @mmh refers to localhost (127.0.0.1 - * or ::1) its source port (host order), otherwise -1. - */ -static int udp_mmh_splice_port(union epoll_ref ref, const struct mmsghdr *mmh) -{ - const struct sockaddr_in6 *sa6 = mmh->msg_hdr.msg_name; - const struct sockaddr_in *sa4 = mmh->msg_hdr.msg_name; - - ASSERT(ref.type == EPOLL_TYPE_UDP); - - if (!ref.udp.splice) - return -1; - - if (ref.udp.v6 && IN6_IS_ADDR_LOOPBACK(&sa6->sin6_addr)) - return ntohs(sa6->sin6_port); - - if (!ref.udp.v6 && IN4_IS_ADDR_LOOPBACK(&sa4->sin_addr)) - return ntohs(sa4->sin_port); - - return -1; -} - /** * udp_at_sidx() - Get UDP specific flow at given sidx * @sidx: Flow and side to retrieve @@ -541,8 +386,20 @@ struct udp_flow *udp_at_sidx(flow_sidx_t sidx) * @c: Execution context * @uflow: UDP flow */ -static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow) +static void udp_flow_close(const struct ctx *c, struct udp_flow *uflow) { + if (uflow->s[INISIDE] >= 0) { + /* The listening socket needs to stay in epoll */ + close(uflow->s[INISIDE]); + uflow->s[INISIDE] = -1; + } + + if (uflow->s[TGTSIDE] >= 0) { + /* But the flow specific one needs to be removed */ + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, uflow->s[TGTSIDE], NULL); + close(uflow->s[TGTSIDE]); + uflow->s[TGTSIDE] = -1; + } flow_hash_remove(c, FLOW_SIDX(uflow, INISIDE)); } @@ -550,26 +407,92 @@ static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow) * udp_flow_new() - Common setup for a new UDP flow * @c: Execution context * @flow: Initiated flow + * @s_ini: Initiating socket (or -1) * @now: Timestamp * * Return: UDP specific flow, if successful, NULL on failure */ static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow, - const struct timespec *now) + int s_ini, const struct timespec *now) { const struct flowside *ini = &flow->f.side[INISIDE]; struct udp_flow *uflow = NULL; + const struct flowside *tgt; + uint8_t tgtpif; if (!inany_is_unicast(&ini->eaddr) || ini->eport == 0) { flow_trace(flow, "Invalid endpoint to initiate UDP flow"); goto cancel; } - if (!flow_target(c, flow, IPPROTO_UDP)) + if (!(tgt = flow_target(c, flow, IPPROTO_UDP))) goto cancel; + tgtpif = flow->f.pif[TGTSIDE]; uflow = FLOW_SET_TYPE(flow, FLOW_UDP, udp); uflow->ts = now->tv_sec; + uflow->s[INISIDE] = uflow->s[TGTSIDE] = -1; + + if (s_ini >= 0) { + /* When using auto port-scanning the listening port could go + * away, so we need to duplicate the socket + */ + uflow->s[INISIDE] = fcntl(s_ini, F_DUPFD_CLOEXEC, 0); + if (uflow->s[INISIDE] < 0) { + flow_err(uflow, + "Couldn't duplicate listening socket: %s", + strerror(errno)); + goto cancel; + } + } + + if (pif_is_socket(tgtpif)) { + struct mmsghdr discard[UIO_MAXIOV] = { 0 }; + union { + flow_sidx_t sidx; + uint32_t data; + } fref = { + .sidx = FLOW_SIDX(flow, TGTSIDE), + }; + int rc; + + uflow->s[TGTSIDE] = flowside_sock_l4(c, EPOLL_TYPE_UDP_REPLY, + tgtpif, tgt, fref.data); + if (uflow->s[TGTSIDE] < 0) { + flow_dbg(uflow, + "Couldn't open socket for spliced flow: %s", + strerror(errno)); + goto cancel; + } + + if (flowside_connect(c, uflow->s[TGTSIDE], tgtpif, tgt) < 0) { + flow_dbg(uflow, + "Couldn't connect flow socket: %s", + strerror(errno)); + goto cancel; + } + + /* It's possible, if unlikely, that we could receive some + * unrelated packets in between the bind() and connect() of this + * socket. For now we just discard these. We could consider + * trying to redirect these to an appropriate handler, if we + * need to. + */ + rc = recvmmsg(uflow->s[TGTSIDE], discard, ARRAY_SIZE(discard), + MSG_DONTWAIT, NULL); + if (rc >= ARRAY_SIZE(discard)) { + flow_dbg(uflow, + "Too many (%d) spurious reply datagrams", rc); + goto cancel; + } else if (rc > 0) { + flow_trace(uflow, + "Discarded %d spurious reply datagrams", rc); + } else if (errno != EAGAIN) { + flow_err(uflow, + "Unexpected error discarding datagrams: %s", + strerror(errno)); + } + } flow_hash_insert(c, FLOW_SIDX(uflow, INISIDE)); FLOW_ACTIVATE(uflow); @@ -581,7 +504,6 @@ cancel: udp_flow_close(c, uflow); flow_alloc_cancel(flow); return FLOW_SIDX_NONE; - } /** @@ -591,6 +513,8 @@ cancel: * @meta: Metadata buffer for the datagram * @now: Timestamp * + * #syscalls fcntl + * * Return: sidx for the destination side of the flow for this packet, or * FLOW_SIDX_NONE if we couldn't find or create a flow. */ @@ -624,7 +548,7 @@ static flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref, } flow_initiate_sa(flow, ref.udp.pif, &meta->s_in, ref.udp.port); - return udp_flow_new(c, flow, now); + return udp_flow_new(c, flow, ref.fd, now); } /** @@ -648,55 +572,16 @@ static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx) * @now: Timestamp */ static void udp_splice_send(const struct ctx *c, size_t start, size_t n, - in_port_t src, in_port_t dst, - union epoll_ref ref, - const struct timespec *now) + flow_sidx_t tosidx) { - int s; - - if (ref.udp.v6) { - udp_splice_to.sa6 = (struct sockaddr_in6) { - .sin6_family = AF_INET6, - .sin6_addr = in6addr_loopback, - .sin6_port = htons(dst), - }; - } else { - udp_splice_to.sa4 = (struct sockaddr_in) { - .sin_family = AF_INET, - .sin_addr = in4addr_loopback, - .sin_port = htons(dst), - }; - } - - if (ref.udp.pif == PIF_SPLICE) { - src += c->udp.fwd_in.rdelta[src]; - s = udp_splice_init[ref.udp.v6][src].sock; - if (s < 0 && ref.udp.orig) - s = udp_splice_new(c, ref.udp.v6, src, false); - - if (s < 0) - return; - - udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec; - udp_splice_init[ref.udp.v6][src].ts = now->tv_sec; - } else { - ASSERT(ref.udp.pif == PIF_HOST); - src += c->udp.fwd_out.rdelta[src]; - s = udp_splice_ns[ref.udp.v6][src].sock; - if (s < 0 && ref.udp.orig) { - struct udp_splice_new_ns_arg arg = { - c, ref.udp.v6, src, -1, - }; - - NS_CALL(udp_splice_new_ns, &arg); - s = arg.s; - } - if (s < 0) - return; + const struct flowside *toside = flowside_at_sidx(tosidx); + const struct udp_flow *uflow = udp_at_sidx(tosidx); + uint8_t topif = pif_at_sidx(tosidx); + int s = uflow->s[tosidx.sidei]; + socklen_t sl; - udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec; - udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec; - } + pif_sockaddr(c, &udp_splice_to, &sl, topif, + &toside->eaddr, toside->eport); sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL); } @@ -984,20 +869,18 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve dstport += c->udp.fwd_in.f.delta[dstport]; /* We divide datagrams into batches based on how we need to send them, - * determined by udp_meta[i].splicesrc and udp_meta[i].tosidx. To avoid - * either two passes through the array, or recalculating splicesrc and - * tosidxfor a single entry, we have to populate it one entry *ahead* of - * the loop counter. + * determined by udp_meta[i].tosidx. To avoid either two passes through + * the array, or recalculating tosidx for a single entry, we have to + * populate it one entry *ahead* of the loop counter. */ - udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv); udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0], now); for (i = 0; i < n; ) { flow_sidx_t batchsidx = udp_meta[i].tosidx; - int batchsrc = udp_meta[i].splicesrc; + uint8_t batchpif = pif_at_sidx(batchsidx); int batchstart = i; do { - if (batchsrc >= 0) { + if (pif_is_socket(batchpif)) { udp_splice_prepare(mmh_recv, i); } else { udp_tap_prepare(c, mmh_recv, i, dstport, @@ -1007,17 +890,14 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve if (++i >= n) break; - udp_meta[i].splicesrc = udp_mmh_splice_port(ref, - &mmh_recv[i]); udp_meta[i].tosidx = udp_flow_from_sock(c, ref, &udp_meta[i], now); - } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx) && - udp_meta[i].splicesrc == batchsrc); + } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx)); - if (batchsrc >= 0) { + if (pif_is_socket(batchpif)) { udp_splice_send(c, batchstart, i - batchstart, - batchsrc, dstport, ref, now); + batchsidx); } else { tap_send_frames(c, &udp_l2_iov[batchstart][0], UDP_NUM_IOVS, i - batchstart); @@ -1025,6 +905,40 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve } } +/** + * udp_reply_sock_handler() - Handle new data from flow specific socket + * @c: Execution context + * @ref: epoll reference + * @events: epoll events bitmap + * @now: Current timestamp + * + * #syscalls recvmmsg + */ +void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref, + uint32_t events, const struct timespec *now) +{ + const struct flowside *fromside = flowside_at_sidx(ref.flowside); + flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside); + struct udp_flow *uflow = udp_at_sidx(ref.flowside); + int from_s = uflow->s[ref.flowside.sidei]; + bool v6 = !inany_v4(&fromside->eaddr); + struct mmsghdr *mmh_recv = v6 ? udp6_mh_recv : udp4_mh_recv; + int n, i; + + ASSERT(!c->no_udp && uflow); + + if ((n = udp_sock_recv(c, from_s, events, mmh_recv)) <= 0) + return; + + flow_trace(uflow, "Received %d datagrams on reply socket", n); + uflow->ts = now->tv_sec; + + for (i = 0; i < n; i++) + udp_splice_prepare(mmh_recv, i); + + udp_splice_send(c, 0, n, tosidx); +} + /** * udp_tap_handler() - Handle packets from tap * @c: Execution context @@ -1419,8 +1333,8 @@ static int udp_port_rebind_outbound(void *arg) * * Return: true if the flow is ready to free, false otherwise */ -bool udp_flow_timer(const struct ctx *c, const struct udp_flow *uflow, - const struct timespec *now) +bool udp_flow_timer(const struct ctx *c, struct udp_flow *uflow, + const struct timespec *now) { if (now->tv_sec - uflow->ts <= UDP_CONN_TIMEOUT) return false; diff --git a/udp.h b/udp.h index 5865def..db5e546 100644 --- a/udp.h +++ b/udp.h @@ -9,8 +9,10 @@ #define UDP_TIMER_INTERVAL 1000 /* ms */ void udp_portmap_clear(void); -void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t events, - const struct timespec *now); +void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, + uint32_t events, const struct timespec *now); +void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref, + uint32_t events, const struct timespec *now); int udp_tap_handler(struct ctx *c, uint8_t pif, sa_family_t af, const void *saddr, const void *daddr, const struct pool *p, int idx, const struct timespec *now); diff --git a/udp_flow.h b/udp_flow.h index 18af9ac..e0736f8 100644 --- a/udp_flow.h +++ b/udp_flow.h @@ -11,15 +11,17 @@ * struct udp - Descriptor for a flow of UDP packets * @f: Generic flow information * @ts: Activity timestamp + * @s: Socket fd (or -1) for each side of the flow */ struct udp_flow { /* Must be first element */ struct flow_common f; time_t ts; + int s[SIDES]; }; -bool udp_flow_timer(const struct ctx *c, const struct udp_flow *uflow, +bool udp_flow_timer(const struct ctx *c, struct udp_flow *uflow, const struct timespec *now); #endif /* UDP_FLOW_H */ diff --git a/util.c b/util.c index 6b51fc5..8dc8ff7 100644 --- a/util.c +++ b/util.c @@ -62,6 +62,7 @@ int sock_l4_sa(const struct ctx *c, enum epoll_type type, socktype = SOCK_STREAM | SOCK_NONBLOCK; break; case EPOLL_TYPE_UDP: + case EPOLL_TYPE_UDP_REPLY: proto = IPPROTO_UDP; socktype = SOCK_DGRAM | SOCK_NONBLOCK; break; -- cgit v1.2.3