aboutgitcodebugslistschat
diff options
context:
space:
mode:
-rw-r--r--epoll_type.h2
-rw-r--r--flow.c20
-rw-r--r--flow.h2
-rw-r--r--flow_table.h15
-rw-r--r--passt.c4
-rw-r--r--udp.c436
-rw-r--r--udp.h6
-rw-r--r--udp_flow.h4
-rw-r--r--util.c1
9 files changed, 226 insertions, 264 deletions
diff --git a/epoll_type.h b/epoll_type.h
index b6c0419..7a752ed 100644
--- a/epoll_type.h
+++ b/epoll_type.h
@@ -22,6 +22,8 @@ enum epoll_type {
EPOLL_TYPE_TCP_TIMER,
/* UDP sockets */
EPOLL_TYPE_UDP,
+ /* UDP socket for replies on a specific flow */
+ EPOLL_TYPE_UDP_REPLY,
/* ICMP/ICMPv6 ping sockets */
EPOLL_TYPE_PING,
/* inotify fd watching for end of netns (pasta) */
diff --git a/flow.c b/flow.c
index 4e337d4..d7d548d 100644
--- a/flow.c
+++ b/flow.c
@@ -237,6 +237,26 @@ int flowside_sock_l4(const struct ctx *c, enum epoll_type type, uint8_t pif,
}
}
+/** flowside_connect() - Connect a socket based on flowside
+ * @c: Execution context
+ * @s: Socket to connect
+ * @pif: Target pif
+ * @tgt: Target flowside
+ *
+ * Connect @s to the endpoint address and port from @tgt.
+ *
+ * Return: 0 on success, negative on error
+ */
+int flowside_connect(const struct ctx *c, int s,
+ uint8_t pif, const struct flowside *tgt)
+{
+ union sockaddr_inany sa;
+ socklen_t sl;
+
+ pif_sockaddr(c, &sa, &sl, pif, &tgt->eaddr, tgt->eport);
+ return connect(s, &sa.sa, sl);
+}
+
/** flow_log_ - Log flow-related message
* @f: flow the message is related to
* @pri: Log priority
diff --git a/flow.h b/flow.h
index 7866477..078fd60 100644
--- a/flow.h
+++ b/flow.h
@@ -168,6 +168,8 @@ static inline bool flowside_eq(const struct flowside *left,
int flowside_sock_l4(const struct ctx *c, enum epoll_type type, uint8_t pif,
const struct flowside *tgt, uint32_t data);
+int flowside_connect(const struct ctx *c, int s,
+ uint8_t pif, const struct flowside *tgt);
/**
* struct flow_common - Common fields for packet flows
diff --git a/flow_table.h b/flow_table.h
index df253be..a499e7b 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -100,6 +100,21 @@ static inline uint8_t pif_at_sidx(flow_sidx_t sidx)
return flow->f.pif[sidx.sidei];
}
+/** flowside_at_sidx() - Retrieve a specific flowside
+ * @sidx: Flow & side index
+ *
+ * Return: Flowside for the flow & side given by @sidx
+ */
+static inline const struct flowside *flowside_at_sidx(flow_sidx_t sidx)
+{
+ const union flow *flow = flow_at_sidx(sidx);
+
+ if (!flow)
+ return PIF_NONE;
+
+ return &flow->f.side[sidx.sidei];
+}
+
/** flow_sidx_opposite() - Get the other side of the same flow
* @sidx: Flow & side index
*
diff --git a/passt.c b/passt.c
index e4d45da..f9405be 100644
--- a/passt.c
+++ b/passt.c
@@ -67,6 +67,7 @@ char *epoll_type_str[] = {
[EPOLL_TYPE_TCP_LISTEN] = "listening TCP socket",
[EPOLL_TYPE_TCP_TIMER] = "TCP timer",
[EPOLL_TYPE_UDP] = "UDP socket",
+ [EPOLL_TYPE_UDP_REPLY] = "UDP reply socket",
[EPOLL_TYPE_PING] = "ICMP/ICMPv6 ping socket",
[EPOLL_TYPE_NSQUIT_INOTIFY] = "namespace inotify watch",
[EPOLL_TYPE_NSQUIT_TIMER] = "namespace timer watch",
@@ -349,6 +350,9 @@ loop:
case EPOLL_TYPE_UDP:
udp_buf_sock_handler(&c, ref, eventmask, &now);
break;
+ case EPOLL_TYPE_UDP_REPLY:
+ udp_reply_sock_handler(&c, ref, eventmask, &now);
+ break;
case EPOLL_TYPE_PING:
icmp_sock_handler(&c, ref);
break;
diff --git a/udp.c b/udp.c
index fdbe396..5543e61 100644
--- a/udp.c
+++ b/udp.c
@@ -35,7 +35,44 @@
* ===================
*
* UDP doesn't use listen(), but we consider long term sockets which are allowed
- * to create new flows "listening" by analogy with TCP.
+ * to create new flows "listening" by analogy with TCP. This listening socket
+ * could receive packets from multiple flows, so we use a hash table match to
+ * find the specific flow for a datagram.
+ *
+ * When a UDP flow is initiated from a listening socket we take a duplicate of
+ * the socket and store it in uflow->s[INISIDE]. This will last for the
+ * lifetime of the flow, even if the original listening socket is closed due to
+ * port auto-probing. The duplicate is used to deliver replies back to the
+ * originating side.
+ *
+ * Reply sockets
+ * =============
+ *
+ * When a UDP flow targets a socket, we create a "reply" socket in
+ * uflow->s[TGTSIDE] both to deliver datagrams to the target side and receive
+ * replies on the target side. This socket is both bound and connected and has
+ * EPOLL_TYPE_UDP_REPLY. The connect() means it will only receive datagrams
+ * associated with this flow, so the epoll reference directly points to the flow
+ * and we don't need a hash lookup.
+ *
+ * NOTE: it's possible that the reply socket could have a bound address
+ * overlapping with an unrelated listening socket. We assume datagrams for the
+ * flow will come to the reply socket in preference to a listening socket. The
+ * sample program doc/platform-requirements/reuseaddr-priority.c documents and
+ * tests that assumption.
+ *
+ * "Spliced" flows
+ * ===============
+ *
+ * In PASTA mode, L2-L4 translation is skipped for connections to ports bound
+ * between namespaces using the loopback interface, messages are directly
+ * transferred between L4 sockets instead. These are called spliced connections
+ * in analogy with the TCP implementation. The the splice() syscall isn't
+ * actually used; it doesn't make sense for datagrams and instead a pair of
+ * recvmmsg() and sendmmsg() is used to forward the datagrams.
+ *
+ * Note that a spliced flow will have *both* a duplicated listening socket and a
+ * reply socket (see above).
*
* Port tracking
* =============
@@ -56,62 +93,6 @@
*
* Packets are forwarded back and forth, by prepending and stripping UDP headers
* in the obvious way, with no port translation.
- *
- * In PASTA mode, the L2-L4 translation is skipped for connections to ports
- * bound between namespaces using the loopback interface, messages are directly
- * transferred between L4 sockets instead. These are called spliced connections
- * for consistency with the TCP implementation, but the splice() syscall isn't
- * actually used as it wouldn't make sense for datagram-based connections: a
- * pair of recvmmsg() and sendmmsg() deals with this case.
- *
- * The connection tracking for PASTA mode is slightly complicated by the absence
- * of actual connections, see struct udp_splice_port, and these examples:
- *
- * - from init to namespace:
- *
- * - forward direction: 127.0.0.1:5000 -> 127.0.0.1:80 in init from socket s,
- * with epoll reference: index = 80, splice = 1, orig = 1, ns = 0
- * - if udp_splice_ns[V4][5000].sock:
- * - send packet to udp_splice_ns[V4][5000].sock, with destination port
- * 80
- * - otherwise:
- * - create new socket udp_splice_ns[V4][5000].sock
- * - bind in namespace to 127.0.0.1:5000
- * - add to epoll with reference: index = 5000, splice = 1, orig = 0,
- * ns = 1
- * - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with
- * current time
- *
- * - reverse direction: 127.0.0.1:80 -> 127.0.0.1:5000 in namespace socket s,
- * having epoll reference: index = 5000, splice = 1, orig = 0, ns = 1
- * - if udp_splice_init[V4][80].sock:
- * - send to udp_splice_init[V4][80].sock, with destination port 5000
- * - update udp_splice_init[V4][80].ts and udp_splice_ns[V4][5000].ts with
- * current time
- * - otherwise, discard
- *
- * - from namespace to init:
- *
- * - forward direction: 127.0.0.1:2000 -> 127.0.0.1:22 in namespace from
- * socket s, with epoll reference: index = 22, splice = 1, orig = 1, ns = 1
- * - if udp4_splice_init[V4][2000].sock:
- * - send packet to udp_splice_init[V4][2000].sock, with destination
- * port 22
- * - otherwise:
- * - create new socket udp_splice_init[V4][2000].sock
- * - bind in init to 127.0.0.1:2000
- * - add to epoll with reference: index = 2000, splice = 1, orig = 0,
- * ns = 0
- * - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with
- * current time
- *
- * - reverse direction: 127.0.0.1:22 -> 127.0.0.1:2000 in init from socket s,
- * having epoll reference: index = 2000, splice = 1, orig = 0, ns = 0
- * - if udp_splice_ns[V4][22].sock:
- * - send to udp_splice_ns[V4][22].sock, with destination port 2000
- * - update udp_splice_ns[V4][22].ts and udp_splice_init[V4][2000].ts with
- * current time
- * - otherwise, discard
*/
#include <sched.h>
@@ -134,6 +115,7 @@
#include <sys/socket.h>
#include <sys/uio.h>
#include <time.h>
+#include <fcntl.h>
#include <linux/errqueue.h>
#include "checksum.h"
@@ -224,7 +206,6 @@ static struct ethhdr udp6_eth_hdr;
* @ip4h: Pre-filled IPv4 header (except for tot_len and saddr)
* @taph: Tap backend specific header
* @s_in: Source socket address, filled in by recvmmsg()
- * @splicesrc: Source port for splicing, or -1 if not spliceable
* @tosidx: sidx for the destination side of this datagram's flow
*/
static struct udp_meta_t {
@@ -233,7 +214,6 @@ static struct udp_meta_t {
struct tap_hdr taph;
union sockaddr_inany s_in;
- int splicesrc;
flow_sidx_t tosidx;
}
#ifdef __AVX2__
@@ -271,7 +251,6 @@ static struct mmsghdr udp_mh_splice [UDP_MAX_FRAMES];
/* IOVs for L2 frames */
static struct iovec udp_l2_iov [UDP_MAX_FRAMES][UDP_NUM_IOVS];
-
/**
* udp_portmap_clear() - Clear UDP port map before configuration
*/
@@ -385,140 +364,6 @@ static void udp_iov_init(const struct ctx *c)
}
/**
- * udp_splice_new() - Create and prepare socket for "spliced" binding
- * @c: Execution context
- * @v6: Set for IPv6 sockets
- * @src: Source port of original connection, host order
- * @ns: Does the splice originate in the ns or not
- *
- * Return: prepared socket, negative error code on failure
- *
- * #syscalls:pasta getsockname
- */
-int udp_splice_new(const struct ctx *c, int v6, in_port_t src, bool ns)
-{
- struct epoll_event ev = { .events = EPOLLIN | EPOLLRDHUP | EPOLLHUP };
- union epoll_ref ref = { .type = EPOLL_TYPE_UDP,
- .udp = { .splice = true, .v6 = v6, .port = src }
- };
- struct udp_splice_port *sp;
- int act, s;
-
- if (ns) {
- ref.udp.pif = PIF_SPLICE;
- sp = &udp_splice_ns[v6 ? V6 : V4][src];
- act = UDP_ACT_SPLICE_NS;
- } else {
- ref.udp.pif = PIF_HOST;
- sp = &udp_splice_init[v6 ? V6 : V4][src];
- act = UDP_ACT_SPLICE_INIT;
- }
-
- s = socket(v6 ? AF_INET6 : AF_INET, SOCK_DGRAM | SOCK_NONBLOCK,
- IPPROTO_UDP);
-
- if (s > FD_REF_MAX) {
- close(s);
- return -EIO;
- }
-
- if (s < 0)
- return s;
-
- ref.fd = s;
-
- if (v6) {
- struct sockaddr_in6 addr6 = {
- .sin6_family = AF_INET6,
- .sin6_port = htons(src),
- .sin6_addr = IN6ADDR_LOOPBACK_INIT,
- };
- if (bind(s, (struct sockaddr *)&addr6, sizeof(addr6)))
- goto fail;
- } else {
- struct sockaddr_in addr4 = {
- .sin_family = AF_INET,
- .sin_port = htons(src),
- .sin_addr = IN4ADDR_LOOPBACK_INIT,
- };
- if (bind(s, (struct sockaddr *)&addr4, sizeof(addr4)))
- goto fail;
- }
-
- sp->sock = s;
- bitmap_set(udp_act[v6 ? V6 : V4][act], src);
-
- ev.data.u64 = ref.u64;
- epoll_ctl(c->epollfd, EPOLL_CTL_ADD, s, &ev);
- return s;
-
-fail:
- close(s);
- return -1;
-}
-
-/**
- * struct udp_splice_new_ns_arg - Arguments for udp_splice_new_ns()
- * @c: Execution context
- * @v6: Set for IPv6
- * @src: Source port of originating datagram, host order
- * @dst: Destination port of originating datagram, host order
- * @s: Newly created socket or negative error code
- */
-struct udp_splice_new_ns_arg {
- const struct ctx *c;
- int v6;
- in_port_t src;
- int s;
-};
-
-/**
- * udp_splice_new_ns() - Enter namespace and call udp_splice_new()
- * @arg: See struct udp_splice_new_ns_arg
- *
- * Return: 0
- */
-static int udp_splice_new_ns(void *arg)
-{
- struct udp_splice_new_ns_arg *a;
-
- a = (struct udp_splice_new_ns_arg *)arg;
-
- ns_enter(a->c);
-
- a->s = udp_splice_new(a->c, a->v6, a->src, true);
-
- return 0;
-}
-
-/**
- * udp_mmh_splice_port() - Is source address of message suitable for splicing?
- * @ref: epoll reference for incoming message's origin socket
- * @mmh: mmsghdr of incoming message
- *
- * Return: if source address of message in @mmh refers to localhost (127.0.0.1
- * or ::1) its source port (host order), otherwise -1.
- */
-static int udp_mmh_splice_port(union epoll_ref ref, const struct mmsghdr *mmh)
-{
- const struct sockaddr_in6 *sa6 = mmh->msg_hdr.msg_name;
- const struct sockaddr_in *sa4 = mmh->msg_hdr.msg_name;
-
- ASSERT(ref.type == EPOLL_TYPE_UDP);
-
- if (!ref.udp.splice)
- return -1;
-
- if (ref.udp.v6 && IN6_IS_ADDR_LOOPBACK(&sa6->sin6_addr))
- return ntohs(sa6->sin6_port);
-
- if (!ref.udp.v6 && IN4_IS_ADDR_LOOPBACK(&sa4->sin_addr))
- return ntohs(sa4->sin_port);
-
- return -1;
-}
-
-/**
* udp_at_sidx() - Get UDP specific flow at given sidx
* @sidx: Flow and side to retrieve
*
@@ -541,8 +386,20 @@ struct udp_flow *udp_at_sidx(flow_sidx_t sidx)
* @c: Execution context
* @uflow: UDP flow
*/
-static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow)
+static void udp_flow_close(const struct ctx *c, struct udp_flow *uflow)
{
+ if (uflow->s[INISIDE] >= 0) {
+ /* The listening socket needs to stay in epoll */
+ close(uflow->s[INISIDE]);
+ uflow->s[INISIDE] = -1;
+ }
+
+ if (uflow->s[TGTSIDE] >= 0) {
+ /* But the flow specific one needs to be removed */
+ epoll_ctl(c->epollfd, EPOLL_CTL_DEL, uflow->s[TGTSIDE], NULL);
+ close(uflow->s[TGTSIDE]);
+ uflow->s[TGTSIDE] = -1;
+ }
flow_hash_remove(c, FLOW_SIDX(uflow, INISIDE));
}
@@ -550,26 +407,92 @@ static void udp_flow_close(const struct ctx *c, const struct udp_flow *uflow)
* udp_flow_new() - Common setup for a new UDP flow
* @c: Execution context
* @flow: Initiated flow
+ * @s_ini: Initiating socket (or -1)
* @now: Timestamp
*
* Return: UDP specific flow, if successful, NULL on failure
*/
static flow_sidx_t udp_flow_new(const struct ctx *c, union flow *flow,
- const struct timespec *now)
+ int s_ini, const struct timespec *now)
{
const struct flowside *ini = &flow->f.side[INISIDE];
struct udp_flow *uflow = NULL;
+ const struct flowside *tgt;
+ uint8_t tgtpif;
if (!inany_is_unicast(&ini->eaddr) || ini->eport == 0) {
flow_trace(flow, "Invalid endpoint to initiate UDP flow");
goto cancel;
}
- if (!flow_target(c, flow, IPPROTO_UDP))
+ if (!(tgt = flow_target(c, flow, IPPROTO_UDP)))
goto cancel;
+ tgtpif = flow->f.pif[TGTSIDE];
uflow = FLOW_SET_TYPE(flow, FLOW_UDP, udp);
uflow->ts = now->tv_sec;
+ uflow->s[INISIDE] = uflow->s[TGTSIDE] = -1;
+
+ if (s_ini >= 0) {
+ /* When using auto port-scanning the listening port could go
+ * away, so we need to duplicate the socket
+ */
+ uflow->s[INISIDE] = fcntl(s_ini, F_DUPFD_CLOEXEC, 0);
+ if (uflow->s[INISIDE] < 0) {
+ flow_err(uflow,
+ "Couldn't duplicate listening socket: %s",
+ strerror(errno));
+ goto cancel;
+ }
+ }
+
+ if (pif_is_socket(tgtpif)) {
+ struct mmsghdr discard[UIO_MAXIOV] = { 0 };
+ union {
+ flow_sidx_t sidx;
+ uint32_t data;
+ } fref = {
+ .sidx = FLOW_SIDX(flow, TGTSIDE),
+ };
+ int rc;
+
+ uflow->s[TGTSIDE] = flowside_sock_l4(c, EPOLL_TYPE_UDP_REPLY,
+ tgtpif, tgt, fref.data);
+ if (uflow->s[TGTSIDE] < 0) {
+ flow_dbg(uflow,
+ "Couldn't open socket for spliced flow: %s",
+ strerror(errno));
+ goto cancel;
+ }
+
+ if (flowside_connect(c, uflow->s[TGTSIDE], tgtpif, tgt) < 0) {
+ flow_dbg(uflow,
+ "Couldn't connect flow socket: %s",
+ strerror(errno));
+ goto cancel;
+ }
+
+ /* It's possible, if unlikely, that we could receive some
+ * unrelated packets in between the bind() and connect() of this
+ * socket. For now we just discard these. We could consider
+ * trying to redirect these to an appropriate handler, if we
+ * need to.
+ */
+ rc = recvmmsg(uflow->s[TGTSIDE], discard, ARRAY_SIZE(discard),
+ MSG_DONTWAIT, NULL);
+ if (rc >= ARRAY_SIZE(discard)) {
+ flow_dbg(uflow,
+ "Too many (%d) spurious reply datagrams", rc);
+ goto cancel;
+ } else if (rc > 0) {
+ flow_trace(uflow,
+ "Discarded %d spurious reply datagrams", rc);
+ } else if (errno != EAGAIN) {
+ flow_err(uflow,
+ "Unexpected error discarding datagrams: %s",
+ strerror(errno));
+ }
+ }
flow_hash_insert(c, FLOW_SIDX(uflow, INISIDE));
FLOW_ACTIVATE(uflow);
@@ -581,7 +504,6 @@ cancel:
udp_flow_close(c, uflow);
flow_alloc_cancel(flow);
return FLOW_SIDX_NONE;
-
}
/**
@@ -591,6 +513,8 @@ cancel:
* @meta: Metadata buffer for the datagram
* @now: Timestamp
*
+ * #syscalls fcntl
+ *
* Return: sidx for the destination side of the flow for this packet, or
* FLOW_SIDX_NONE if we couldn't find or create a flow.
*/
@@ -624,7 +548,7 @@ static flow_sidx_t udp_flow_from_sock(const struct ctx *c, union epoll_ref ref,
}
flow_initiate_sa(flow, ref.udp.pif, &meta->s_in, ref.udp.port);
- return udp_flow_new(c, flow, now);
+ return udp_flow_new(c, flow, ref.fd, now);
}
/**
@@ -648,55 +572,16 @@ static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx)
* @now: Timestamp
*/
static void udp_splice_send(const struct ctx *c, size_t start, size_t n,
- in_port_t src, in_port_t dst,
- union epoll_ref ref,
- const struct timespec *now)
+ flow_sidx_t tosidx)
{
- int s;
-
- if (ref.udp.v6) {
- udp_splice_to.sa6 = (struct sockaddr_in6) {
- .sin6_family = AF_INET6,
- .sin6_addr = in6addr_loopback,
- .sin6_port = htons(dst),
- };
- } else {
- udp_splice_to.sa4 = (struct sockaddr_in) {
- .sin_family = AF_INET,
- .sin_addr = in4addr_loopback,
- .sin_port = htons(dst),
- };
- }
-
- if (ref.udp.pif == PIF_SPLICE) {
- src += c->udp.fwd_in.rdelta[src];
- s = udp_splice_init[ref.udp.v6][src].sock;
- if (s < 0 && ref.udp.orig)
- s = udp_splice_new(c, ref.udp.v6, src, false);
-
- if (s < 0)
- return;
-
- udp_splice_ns[ref.udp.v6][dst].ts = now->tv_sec;
- udp_splice_init[ref.udp.v6][src].ts = now->tv_sec;
- } else {
- ASSERT(ref.udp.pif == PIF_HOST);
- src += c->udp.fwd_out.rdelta[src];
- s = udp_splice_ns[ref.udp.v6][src].sock;
- if (s < 0 && ref.udp.orig) {
- struct udp_splice_new_ns_arg arg = {
- c, ref.udp.v6, src, -1,
- };
-
- NS_CALL(udp_splice_new_ns, &arg);
- s = arg.s;
- }
- if (s < 0)
- return;
+ const struct flowside *toside = flowside_at_sidx(tosidx);
+ const struct udp_flow *uflow = udp_at_sidx(tosidx);
+ uint8_t topif = pif_at_sidx(tosidx);
+ int s = uflow->s[tosidx.sidei];
+ socklen_t sl;
- udp_splice_init[ref.udp.v6][dst].ts = now->tv_sec;
- udp_splice_ns[ref.udp.v6][src].ts = now->tv_sec;
- }
+ pif_sockaddr(c, &udp_splice_to, &sl, topif,
+ &toside->eaddr, toside->eport);
sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL);
}
@@ -984,20 +869,18 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
dstport += c->udp.fwd_in.f.delta[dstport];
/* We divide datagrams into batches based on how we need to send them,
- * determined by udp_meta[i].splicesrc and udp_meta[i].tosidx. To avoid
- * either two passes through the array, or recalculating splicesrc and
- * tosidxfor a single entry, we have to populate it one entry *ahead* of
- * the loop counter.
+ * determined by udp_meta[i].tosidx. To avoid either two passes through
+ * the array, or recalculating tosidx for a single entry, we have to
+ * populate it one entry *ahead* of the loop counter.
*/
- udp_meta[0].splicesrc = udp_mmh_splice_port(ref, mmh_recv);
udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0], now);
for (i = 0; i < n; ) {
flow_sidx_t batchsidx = udp_meta[i].tosidx;
- int batchsrc = udp_meta[i].splicesrc;
+ uint8_t batchpif = pif_at_sidx(batchsidx);
int batchstart = i;
do {
- if (batchsrc >= 0) {
+ if (pif_is_socket(batchpif)) {
udp_splice_prepare(mmh_recv, i);
} else {
udp_tap_prepare(c, mmh_recv, i, dstport,
@@ -1007,17 +890,14 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
if (++i >= n)
break;
- udp_meta[i].splicesrc = udp_mmh_splice_port(ref,
- &mmh_recv[i]);
udp_meta[i].tosidx = udp_flow_from_sock(c, ref,
&udp_meta[i],
now);
- } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx) &&
- udp_meta[i].splicesrc == batchsrc);
+ } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx));
- if (batchsrc >= 0) {
+ if (pif_is_socket(batchpif)) {
udp_splice_send(c, batchstart, i - batchstart,
- batchsrc, dstport, ref, now);
+ batchsidx);
} else {
tap_send_frames(c, &udp_l2_iov[batchstart][0],
UDP_NUM_IOVS, i - batchstart);
@@ -1026,6 +906,40 @@ void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t eve
}
/**
+ * udp_reply_sock_handler() - Handle new data from flow specific socket
+ * @c: Execution context
+ * @ref: epoll reference
+ * @events: epoll events bitmap
+ * @now: Current timestamp
+ *
+ * #syscalls recvmmsg
+ */
+void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
+ uint32_t events, const struct timespec *now)
+{
+ const struct flowside *fromside = flowside_at_sidx(ref.flowside);
+ flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside);
+ struct udp_flow *uflow = udp_at_sidx(ref.flowside);
+ int from_s = uflow->s[ref.flowside.sidei];
+ bool v6 = !inany_v4(&fromside->eaddr);
+ struct mmsghdr *mmh_recv = v6 ? udp6_mh_recv : udp4_mh_recv;
+ int n, i;
+
+ ASSERT(!c->no_udp && uflow);
+
+ if ((n = udp_sock_recv(c, from_s, events, mmh_recv)) <= 0)
+ return;
+
+ flow_trace(uflow, "Received %d datagrams on reply socket", n);
+ uflow->ts = now->tv_sec;
+
+ for (i = 0; i < n; i++)
+ udp_splice_prepare(mmh_recv, i);
+
+ udp_splice_send(c, 0, n, tosidx);
+}
+
+/**
* udp_tap_handler() - Handle packets from tap
* @c: Execution context
* @pif: pif on which the packet is arriving
@@ -1419,8 +1333,8 @@ static int udp_port_rebind_outbound(void *arg)
*
* Return: true if the flow is ready to free, false otherwise
*/
-bool udp_flow_timer(const struct ctx *c, const struct udp_flow *uflow,
- const struct timespec *now)
+bool udp_flow_timer(const struct ctx *c, struct udp_flow *uflow,
+ const struct timespec *now)
{
if (now->tv_sec - uflow->ts <= UDP_CONN_TIMEOUT)
return false;
diff --git a/udp.h b/udp.h
index 5865def..db5e546 100644
--- a/udp.h
+++ b/udp.h
@@ -9,8 +9,10 @@
#define UDP_TIMER_INTERVAL 1000 /* ms */
void udp_portmap_clear(void);
-void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t events,
- const struct timespec *now);
+void udp_buf_sock_handler(const struct ctx *c, union epoll_ref ref,
+ uint32_t events, const struct timespec *now);
+void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
+ uint32_t events, const struct timespec *now);
int udp_tap_handler(struct ctx *c, uint8_t pif, sa_family_t af,
const void *saddr, const void *daddr,
const struct pool *p, int idx, const struct timespec *now);
diff --git a/udp_flow.h b/udp_flow.h
index 18af9ac..e0736f8 100644
--- a/udp_flow.h
+++ b/udp_flow.h
@@ -11,15 +11,17 @@
* struct udp - Descriptor for a flow of UDP packets
* @f: Generic flow information
* @ts: Activity timestamp
+ * @s: Socket fd (or -1) for each side of the flow
*/
struct udp_flow {
/* Must be first element */
struct flow_common f;
time_t ts;
+ int s[SIDES];
};
-bool udp_flow_timer(const struct ctx *c, const struct udp_flow *uflow,
+bool udp_flow_timer(const struct ctx *c, struct udp_flow *uflow,
const struct timespec *now);
#endif /* UDP_FLOW_H */
diff --git a/util.c b/util.c
index 6b51fc5..8dc8ff7 100644
--- a/util.c
+++ b/util.c
@@ -62,6 +62,7 @@ int sock_l4_sa(const struct ctx *c, enum epoll_type type,
socktype = SOCK_STREAM | SOCK_NONBLOCK;
break;
case EPOLL_TYPE_UDP:
+ case EPOLL_TYPE_UDP_REPLY:
proto = IPPROTO_UDP;
socktype = SOCK_DGRAM | SOCK_NONBLOCK;
break;