From a45a7e97982acc7c9d00fddb0192fbbfcd2030d6 Mon Sep 17 00:00:00 2001 From: David Gibson Date: Thu, 18 Jul 2024 15:26:46 +1000 Subject: udp: Create flows for datagrams from originating sockets This implements the first steps of tracking UDP packets with the flow table rather than its own (buggy) set of port maps. Specifically we create flow table entries for datagrams received from a socket (PIF_HOST or PIF_SPLICE). When splitting datagrams from sockets into batches, we group by the flow as well as splicesrc. This may result in smaller batches, but makes things easier down the line. We can re-optimise this later if necessary. For now we don't do anything else with the flow, not even match reply packets to the same flow. Signed-off-by: David Gibson Signed-off-by: Stefano Brivio --- udp.c | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 165 insertions(+), 4 deletions(-) (limited to 'udp.c') diff --git a/udp.c b/udp.c index 150f970..fdbe396 100644 --- a/udp.c +++ b/udp.c @@ -15,6 +15,30 @@ /** * DOC: Theory of Operation * + * UDP Flows + * ========= + * + * UDP doesn't have true connections, but many protocols use a connection-like + * format. The flow is initiated by a client sending a datagram from a port of + * its choosing (usually ephemeral) to a specific port (usually well known) on a + * server. Both client and server address must be unicast. The server sends + * replies using the same addresses & ports with src/dest swapped. + * + * We track pseudo-connections of this type as flow table entries of type + * FLOW_UDP. We store the time of the last traffic on the flow in uflow->ts, + * and let the flow expire if there is no traffic for UDP_CONN_TIMEOUT seconds. + * + * NOTE: This won't handle multicast protocols, or some protocols with different + * port usage. We'll need specific logic if we want to handle those. + * + * "Listening" sockets + * =================== + * + * UDP doesn't use listen(), but we consider long term sockets which are allowed + * to create new flows "listening" by analogy with TCP. + * + * Port tracking + * ============= * * For UDP, a reduced version of port-based connection tracking is implemented * with two purposes: @@ -122,6 +146,7 @@ #include "tap.h" #include "pcap.h" #include "log.h" +#include "flow_table.h" #define UDP_CONN_TIMEOUT 180 /* s, timeout for ephemeral or local bind */ #define UDP_MAX_FRAMES 32 /* max # of frames to receive at once */ @@ -200,6 +225,7 @@ static struct ethhdr udp6_eth_hdr; * @taph: Tap backend specific header * @s_in: Source socket address, filled in by recvmmsg() * @splicesrc: Source port for splicing, or -1 if not spliceable + * @tosidx: sidx for the destination side of this datagram's flow */ static struct udp_meta_t { struct ipv6hdr ip6h; @@ -208,6 +234,7 @@ static struct udp_meta_t { union sockaddr_inany s_in; int splicesrc; + flow_sidx_t tosidx; } #ifdef __AVX2__ __attribute__ ((aligned(32))) @@ -491,6 +518,115 @@ static int udp_mmh_splice_port(union epoll_ref ref, const struct mmsghdr *mmh) return -1; } +/** + * udp_at_sidx() - Get UDP specific flow at given sidx + * @sidx: Flow and side to retrieve + * + * Return: UDP specific flow at @sidx, or NULL of @sidx is invalid. Asserts if + * the flow at @sidx is not FLOW_UDP. + */ +struct udp_flow *udp_at_sidx(flow_sidx_t sidx) +{ + union flow *flow = flow_at_sidx(sidx); + + if (!flow) + return NULL; + + ASSERT(flow->f.type == FLOW_UDP); + return &flow->udp; +} + +/* + * udp_flow_close() - Close and clean up UDP flow + * @c: Execution context + * @uflow: UDP flow + */ +static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow) +{ + flow_hash_remove(c, FLOW_SIDX(uflow, INISIDE)); +} + +/** + * udp_flow_new() - Common setup for a new UDP flow + * @c: Execution context + * @flow: Initiated flow + * @now: Timestamp + * + * Return: UDP specific flow, if successful, NULL on failure + */ +static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow, + const struct timespec *now) +{ + const struct flowside *ini = &flow->f.side[INISIDE]; + struct udp_flow *uflow = NULL; + + if (!inany_is_unicast(&ini->eaddr) || ini->eport == 0) { + flow_trace(flow, "Invalid endpoint to initiate UDP flow"); + goto cancel; + } + + if (!flow_target(c, flow, IPPROTO_UDP)) + goto cancel; + + uflow = FLOW_SET_TYPE(flow, FLOW_UDP, udp); + uflow->ts = now->tv_sec; + + flow_hash_insert(c, FLOW_SIDX(uflow, INISIDE)); + FLOW_ACTIVATE(uflow); + + return FLOW_SIDX(uflow, TGTSIDE); + +cancel: + if (uflow) + udp_flow_close(c, uflow); + flow_alloc_cancel(flow); + return FLOW_SIDX_NONE; + +} + +/** + * udp_flow_from_sock() - Find or create UDP flow for "listening" socket + * @c: Execution context + * @ref: epoll reference of the receiving socket + * @meta: Metadata buffer for the datagram + * @now: Timestamp + * + * Return: sidx for the destination side of the flow for this packet, or + * FLOW_SIDX_NONE if we couldn't find or create a flow. + */ +static flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref, + struct udp_meta_t *meta, + const struct timespec *now) +{ + struct udp_flow *uflow; + union flow *flow; + flow_sidx_t sidx; + + ASSERT(ref.type == EPOLL_TYPE_UDP); + + /* FIXME: Match reply packets to their flow as well */ + if (!ref.udp.orig) + return FLOW_SIDX_NONE; + + sidx = flow_lookup_sa(c, IPPROTO_UDP, ref.udp.pif, &meta->s_in, ref.udp.port); + if ((uflow = udp_at_sidx(sidx))) { + uflow->ts = now->tv_sec; + return flow_sidx_opposite(sidx); + } + + if (!(flow = flow_alloc())) { + char sastr[SOCKADDR_STRLEN]; + + debug("Couldn't allocate flow for UDP datagram from %s %s", + pif_name(ref.udp.pif), + sockaddr_ntop(&meta->s_in, sastr, sizeof(sastr))); + return FLOW_SIDX_NONE; + } + + flow_initiate_sa(flow, ref.udp.pif, &meta->s_in, ref.udp.port); + return udp_flow_new(c, flow, now); +} + /** * udp_splice_prepare() - Prepare one datagram for splicing * @mmh: Receiving mmsghdr array @@ -848,12 +984,15 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve dstport += c->udp.fwd_in.f.delta[dstport]; /* We divide datagrams into batches based on how we need to send them, - * determined by udp_meta[i].splicesrc. To avoid either two passes - * through the array, or recalculating splicesrc for a single entry, we - * have to populate it one entry *ahead* of the loop counter. + * determined by udp_meta[i].splicesrc and udp_meta[i].tosidx. To avoid + * either two passes through the array, or recalculating splicesrc and + * tosidxfor a single entry, we have to populate it one entry *ahead* of + * the loop counter. */ udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv); + udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0], now); for (i = 0; i < n; ) { + flow_sidx_t batchsidx = udp_meta[i].tosidx; int batchsrc = udp_meta[i].splicesrc; int batchstart = i; @@ -870,7 +1009,11 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve udp_meta[i].splicesrc = udp_mmh_splice_port(ref, &mmh_recv[i]); - } while (udp_meta[i].splicesrc == batchsrc); + udp_meta[i].tosidx = udp_flow_from_sock(c, ref, + &udp_meta[i], + now); + } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx) && + udp_meta[i].splicesrc == batchsrc); if (batchsrc >= 0) { udp_splice_send(c, batchstart, i - batchstart, @@ -1268,6 +1411,24 @@ static int udp_port_rebind_outbound(void *arg) return 0; } +/** + * udp_flow_timer() - Handler for timed events related to a given flow + * @c: Execution context + * @uflow: UDP flow + * @now: Current timestamp + * + * Return: true if the flow is ready to free, false otherwise + */ +bool udp_flow_timer(const struct ctx *c, const struct udp_flow *uflow, + const struct timespec *now) +{ + if (now->tv_sec - uflow->ts <= UDP_CONN_TIMEOUT) + return false; + + udp_flow_close(c, uflow); + return true; +} + /** * udp_timer() - Scan activity bitmaps for ports with associated timed events * @c: Execution context -- cgit v1.2.3