aboutgitcodebugslistschat
path: root/udp.c
diff options
context:
space:
mode:
Diffstat (limited to 'udp.c')
-rw-r--r--udp.c702
1 files changed, 481 insertions, 221 deletions
diff --git a/udp.c b/udp.c
index 923cc38..65a52e0 100644
--- a/udp.c
+++ b/udp.c
@@ -39,27 +39,30 @@
* could receive packets from multiple flows, so we use a hash table match to
* find the specific flow for a datagram.
*
- * When a UDP flow is initiated from a listening socket we take a duplicate of
- * the socket and store it in uflow->s[INISIDE]. This will last for the
- * lifetime of the flow, even if the original listening socket is closed due to
- * port auto-probing. The duplicate is used to deliver replies back to the
- * originating side.
+ * Flow sockets
+ * ============
*
- * Reply sockets
- * =============
- *
- * When a UDP flow targets a socket, we create a "reply" socket in
+ * When a UDP flow targets a socket, we create a "flow" socket in
* uflow->s[TGTSIDE] both to deliver datagrams to the target side and receive
* replies on the target side. This socket is both bound and connected and has
- * EPOLL_TYPE_UDP_REPLY. The connect() means it will only receive datagrams
+ * EPOLL_TYPE_UDP. The connect() means it will only receive datagrams
* associated with this flow, so the epoll reference directly points to the flow
* and we don't need a hash lookup.
*
- * NOTE: it's possible that the reply socket could have a bound address
- * overlapping with an unrelated listening socket. We assume datagrams for the
- * flow will come to the reply socket in preference to a listening socket. The
- * sample program doc/platform-requirements/reuseaddr-priority.c documents and
- * tests that assumption.
+ * When a flow is initiated from a listening socket, we create a "flow" socket
+ * with the same bound address as the listening socket, but also connect()ed to
+ * the flow's peer. This is stored in uflow->s[INISIDE] and will last for the
+ * lifetime of the flow, even if the original listening socket is closed due to
+ * port auto-probing. The duplicate is used to deliver replies back to the
+ * originating side.
+ *
+ * NOTE: A flow socket can have a bound address overlapping with a listening
+ * socket. That will happen naturally for flows initiated from a socket, but is
+ * also possible (though unlikely) for tap initiated flows, depending on the
+ * source port. We assume datagrams for the flow will come to a connect()ed
+ * socket in preference to a listening socket. The sample program
+ * doc/platform-requirements/reuseaddr-priority.c documents and tests that
+ * assumption.
*
* "Spliced" flows
* ===============
@@ -71,8 +74,7 @@
* actually used; it doesn't make sense for datagrams and instead a pair of
* recvmmsg() and sendmmsg() is used to forward the datagrams.
*
- * Note that a spliced flow will have *both* a duplicated listening socket and a
- * reply socket (see above).
+ * Note that a spliced flow will have two flow sockets (see above).
*/
#include <sched.h>
@@ -87,6 +89,8 @@
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
+#include <netinet/ip_icmp.h>
+#include <netinet/icmp6.h>
#include <stdint.h>
#include <stddef.h>
#include <string.h>
@@ -112,6 +116,14 @@
#include "udp_internal.h"
#include "udp_vu.h"
+#define UDP_MAX_FRAMES 32 /* max # of frames to receive at once */
+
+/* Maximum UDP data to be returned in ICMP messages */
+#define ICMP4_MAX_DLEN 8
+#define ICMP6_MAX_DLEN (IPV6_MIN_MTU \
+ - sizeof(struct udphdr) \
+ - sizeof(struct ipv6hdr))
+
/* "Spliced" sockets indexed by bound port (host order) */
static int udp_splice_ns [IP_VERSIONS][NUM_PORTS];
static int udp_splice_init[IP_VERSIONS][NUM_PORTS];
@@ -128,26 +140,31 @@ static struct ethhdr udp4_eth_hdr;
static struct ethhdr udp6_eth_hdr;
/**
- * struct udp_meta_t - Pre-cooked headers and metadata for UDP packets
+ * struct udp_meta_t - Pre-cooked headers for UDP packets
* @ip6h: Pre-filled IPv6 header (except for payload_len and addresses)
* @ip4h: Pre-filled IPv4 header (except for tot_len and saddr)
* @taph: Tap backend specific header
- * @s_in: Source socket address, filled in by recvmmsg()
- * @tosidx: sidx for the destination side of this datagram's flow
*/
static struct udp_meta_t {
struct ipv6hdr ip6h;
struct iphdr ip4h;
struct tap_hdr taph;
-
- union sockaddr_inany s_in;
- flow_sidx_t tosidx;
}
#ifdef __AVX2__
__attribute__ ((aligned(32)))
#endif
udp_meta[UDP_MAX_FRAMES];
+#define PKTINFO_SPACE \
+ MAX(CMSG_SPACE(sizeof(struct in_pktinfo)), \
+ CMSG_SPACE(sizeof(struct in6_pktinfo)))
+
+#define RECVERR_SPACE \
+ MAX(CMSG_SPACE(sizeof(struct sock_extended_err) + \
+ sizeof(struct sockaddr_in)), \
+ CMSG_SPACE(sizeof(struct sock_extended_err) + \
+ sizeof(struct sockaddr_in6)))
+
/**
* enum udp_iov_idx - Indices for the buffers making up a single UDP frame
* @UDP_IOV_TAP tap specific header
@@ -224,8 +241,6 @@ static void udp_iov_init_one(const struct ctx *c, size_t i)
tiov[UDP_IOV_TAP] = tap_hdr_iov(c, &meta->taph);
tiov[UDP_IOV_PAYLOAD].iov_base = payload;
- mh->msg_name = &meta->s_in;
- mh->msg_namelen = sizeof(meta->s_in);
mh->msg_iov = siov;
mh->msg_iovlen = 1;
}
@@ -246,41 +261,6 @@ static void udp_iov_init(const struct ctx *c)
}
/**
- * udp_splice_prepare() - Prepare one datagram for splicing
- * @mmh: Receiving mmsghdr array
- * @idx: Index of the datagram to prepare
- */
-static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx)
-{
- udp_mh_splice[idx].msg_hdr.msg_iov->iov_len = mmh[idx].msg_len;
-}
-
-/**
- * udp_splice_send() - Send a batch of datagrams from socket to socket
- * @c: Execution context
- * @start: Index of batch's first datagram in udp[46]_l2_buf
- * @n: Number of datagrams in batch
- * @src: Source port for datagram (target side)
- * @dst: Destination port for datagrams (target side)
- * @ref: epoll reference for origin socket
- * @now: Timestamp
- */
-static void udp_splice_send(const struct ctx *c, size_t start, size_t n,
- flow_sidx_t tosidx)
-{
- const struct flowside *toside = flowside_at_sidx(tosidx);
- const struct udp_flow *uflow = udp_at_sidx(tosidx);
- uint8_t topif = pif_at_sidx(tosidx);
- int s = uflow->s[tosidx.sidei];
- socklen_t sl;
-
- pif_sockaddr(c, &udp_splice_to, &sl, topif,
- &toside->eaddr, toside->eport);
-
- sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL);
-}
-
-/**
* udp_update_hdr4() - Update headers for one IPv4 datagram
* @ip4h: Pre-filled IPv4 header (except for tot_len and saddr)
* @bp: Pointer to udp_payload_t to update
@@ -403,27 +383,171 @@ static void udp_tap_prepare(const struct mmsghdr *mmh,
}
/**
+ * udp_send_tap_icmp4() - Construct and send ICMPv4 to local peer
+ * @c: Execution context
+ * @ee: Extended error descriptor
+ * @toside: Destination side of flow
+ * @saddr: Address of ICMP generating node
+ * @in: First bytes (max 8) of original UDP message body
+ * @dlen: Length of the read part of original UDP message body
+ */
+static void udp_send_tap_icmp4(const struct ctx *c,
+ const struct sock_extended_err *ee,
+ const struct flowside *toside,
+ struct in_addr saddr,
+ const void *in, size_t dlen)
+{
+ struct in_addr oaddr = toside->oaddr.v4mapped.a4;
+ struct in_addr eaddr = toside->eaddr.v4mapped.a4;
+ in_port_t eport = toside->eport;
+ in_port_t oport = toside->oport;
+ struct {
+ struct icmphdr icmp4h;
+ struct iphdr ip4h;
+ struct udphdr uh;
+ char data[ICMP4_MAX_DLEN];
+ } __attribute__((packed, aligned(__alignof__(max_align_t)))) msg;
+ size_t msglen = sizeof(msg) - sizeof(msg.data) + dlen;
+ size_t l4len = dlen + sizeof(struct udphdr);
+
+ ASSERT(dlen <= ICMP4_MAX_DLEN);
+ memset(&msg, 0, sizeof(msg));
+ msg.icmp4h.type = ee->ee_type;
+ msg.icmp4h.code = ee->ee_code;
+ if (ee->ee_type == ICMP_DEST_UNREACH && ee->ee_code == ICMP_FRAG_NEEDED)
+ msg.icmp4h.un.frag.mtu = htons((uint16_t) ee->ee_info);
+
+ /* Reconstruct the original headers as returned in the ICMP message */
+ tap_push_ip4h(&msg.ip4h, eaddr, oaddr, l4len, IPPROTO_UDP);
+ tap_push_uh4(&msg.uh, eaddr, eport, oaddr, oport, in, dlen);
+ memcpy(&msg.data, in, dlen);
+
+ tap_icmp4_send(c, saddr, eaddr, &msg, msglen);
+}
+
+
+/**
+ * udp_send_tap_icmp6() - Construct and send ICMPv6 to local peer
+ * @c: Execution context
+ * @ee: Extended error descriptor
+ * @toside: Destination side of flow
+ * @saddr: Address of ICMP generating node
+ * @in: First bytes (max 1232) of original UDP message body
+ * @dlen: Length of the read part of original UDP message body
+ * @flow: IPv6 flow identifier
+ */
+static void udp_send_tap_icmp6(const struct ctx *c,
+ const struct sock_extended_err *ee,
+ const struct flowside *toside,
+ const struct in6_addr *saddr,
+ void *in, size_t dlen, uint32_t flow)
+{
+ const struct in6_addr *oaddr = &toside->oaddr.a6;
+ const struct in6_addr *eaddr = &toside->eaddr.a6;
+ in_port_t eport = toside->eport;
+ in_port_t oport = toside->oport;
+ struct {
+ struct icmp6_hdr icmp6h;
+ struct ipv6hdr ip6h;
+ struct udphdr uh;
+ char data[ICMP6_MAX_DLEN];
+ } __attribute__((packed, aligned(__alignof__(max_align_t)))) msg;
+ size_t msglen = sizeof(msg) - sizeof(msg.data) + dlen;
+ size_t l4len = dlen + sizeof(struct udphdr);
+
+ ASSERT(dlen <= ICMP6_MAX_DLEN);
+ memset(&msg, 0, sizeof(msg));
+ msg.icmp6h.icmp6_type = ee->ee_type;
+ msg.icmp6h.icmp6_code = ee->ee_code;
+ if (ee->ee_type == ICMP6_PACKET_TOO_BIG)
+ msg.icmp6h.icmp6_dataun.icmp6_un_data32[0] = htonl(ee->ee_info);
+
+ /* Reconstruct the original headers as returned in the ICMP message */
+ tap_push_ip6h(&msg.ip6h, eaddr, oaddr, l4len, IPPROTO_UDP, flow);
+ tap_push_uh6(&msg.uh, eaddr, eport, oaddr, oport, in, dlen);
+ memcpy(&msg.data, in, dlen);
+
+ tap_icmp6_send(c, saddr, eaddr, &msg, msglen);
+}
+
+/**
+ * udp_pktinfo() - Retrieve packet destination address from cmsg
+ * @msg: msghdr into which message has been received
+ * @dst: (Local) destination address of message in @msg (output)
+ *
+ * Return: 0 on success, -1 if the information was missing (@dst is set to
+ * inany_any6).
+ */
+static int udp_pktinfo(struct msghdr *msg, union inany_addr *dst)
+{
+ struct cmsghdr *hdr;
+
+ for (hdr = CMSG_FIRSTHDR(msg); hdr; hdr = CMSG_NXTHDR(msg, hdr)) {
+ if (hdr->cmsg_level == IPPROTO_IP &&
+ hdr->cmsg_type == IP_PKTINFO) {
+ const struct in_pktinfo *i4 = (void *)CMSG_DATA(hdr);
+
+ *dst = inany_from_v4(i4->ipi_addr);
+ return 0;
+ }
+
+ if (hdr->cmsg_level == IPPROTO_IPV6 &&
+ hdr->cmsg_type == IPV6_PKTINFO) {
+ const struct in6_pktinfo *i6 = (void *)CMSG_DATA(hdr);
+
+ dst->a6 = i6->ipi6_addr;
+ return 0;
+ }
+ }
+
+ debug("Missing PKTINFO cmsg on datagram");
+ *dst = inany_any6;
+ return -1;
+}
+
+/**
* udp_sock_recverr() - Receive and clear an error from a socket
- * @s: Socket to receive from
+ * @c: Execution context
+ * @s: Socket to receive errors from
+ * @sidx: Flow and side of @s, or FLOW_SIDX_NONE if unknown
+ * @pif: Interface on which the error occurred
+ * (only used if @sidx == FLOW_SIDX_NONE)
+ * @port: Local port number of @s (only used if @sidx == FLOW_SIDX_NONE)
*
* Return: 1 if error received and processed, 0 if no more errors in queue, < 0
* if there was an error reading the queue
*
* #syscalls recvmsg
*/
-static int udp_sock_recverr(int s)
+static int udp_sock_recverr(const struct ctx *c, int s, flow_sidx_t sidx,
+ uint8_t pif, in_port_t port)
{
+ char buf[PKTINFO_SPACE + RECVERR_SPACE];
const struct sock_extended_err *ee;
- const struct cmsghdr *hdr;
- char buf[CMSG_SPACE(sizeof(*ee))];
+ char data[ICMP6_MAX_DLEN];
+ struct cmsghdr *hdr;
+ struct iovec iov = {
+ .iov_base = data,
+ .iov_len = sizeof(data)
+ };
+ union sockaddr_inany src;
struct msghdr mh = {
- .msg_name = NULL,
- .msg_namelen = 0,
- .msg_iov = NULL,
- .msg_iovlen = 0,
+ .msg_name = &src,
+ .msg_namelen = sizeof(src),
+ .msg_iov = &iov,
+ .msg_iovlen = 1,
.msg_control = buf,
.msg_controllen = sizeof(buf),
};
+ const struct flowside *fromside, *toside;
+ union inany_addr offender, otap;
+ char astr[INANY_ADDRSTRLEN];
+ char sastr[SOCKADDR_STRLEN];
+ const struct in_addr *o4;
+ in_port_t offender_port;
+ struct udp_flow *uflow;
+ uint8_t topif;
+ size_t dlen;
ssize_t rc;
rc = recvmsg(s, &mh, MSG_ERRQUEUE);
@@ -440,33 +564,102 @@ static int udp_sock_recverr(int s)
return -1;
}
- hdr = CMSG_FIRSTHDR(&mh);
- if (!((hdr->cmsg_level == IPPROTO_IP &&
- hdr->cmsg_type == IP_RECVERR) ||
- (hdr->cmsg_level == IPPROTO_IPV6 &&
- hdr->cmsg_type == IPV6_RECVERR))) {
- err("Unexpected cmsg reading error queue");
+ for (hdr = CMSG_FIRSTHDR(&mh); hdr; hdr = CMSG_NXTHDR(&mh, hdr)) {
+ if ((hdr->cmsg_level == IPPROTO_IP &&
+ hdr->cmsg_type == IP_RECVERR) ||
+ (hdr->cmsg_level == IPPROTO_IPV6 &&
+ hdr->cmsg_type == IPV6_RECVERR))
+ break;
+ }
+
+ if (!hdr) {
+ err("Missing RECVERR cmsg in error queue");
return -1;
}
ee = (const struct sock_extended_err *)CMSG_DATA(hdr);
- /* TODO: When possible propagate and otherwise handle errors */
debug("%s error on UDP socket %i: %s",
str_ee_origin(ee), s, strerror_(ee->ee_errno));
+ if (!flow_sidx_valid(sidx)) {
+ /* No hint from the socket, determine flow from addresses */
+ union inany_addr dst;
+
+ if (udp_pktinfo(&mh, &dst) < 0) {
+ debug("Missing PKTINFO on UDP error");
+ return 1;
+ }
+
+ sidx = flow_lookup_sa(c, IPPROTO_UDP, pif, &src, &dst, port);
+ if (!flow_sidx_valid(sidx)) {
+ debug("Ignoring UDP error without flow");
+ return 1;
+ }
+ } else {
+ pif = pif_at_sidx(sidx);
+ }
+
+ uflow = udp_at_sidx(sidx);
+ ASSERT(uflow);
+ fromside = &uflow->f.side[sidx.sidei];
+ toside = &uflow->f.side[!sidx.sidei];
+ topif = uflow->f.pif[!sidx.sidei];
+ dlen = rc;
+
+ if (inany_from_sockaddr(&offender, &offender_port,
+ SO_EE_OFFENDER(ee)) < 0)
+ goto fail;
+
+ if (pif != PIF_HOST || topif != PIF_TAP)
+ /* XXX Can we support any other cases? */
+ goto fail;
+
+ /* If the offender *is* the endpoint, make sure our translation is
+ * consistent with the flow's translation. This matters if the flow
+ * endpoint has a port specific translation (like --dns-match).
+ */
+ if (inany_equals(&offender, &fromside->eaddr))
+ otap = toside->oaddr;
+ else if (!nat_inbound(c, &offender, &otap))
+ goto fail;
+
+ if (hdr->cmsg_level == IPPROTO_IP &&
+ (o4 = inany_v4(&otap)) && inany_v4(&toside->eaddr)) {
+ dlen = MIN(dlen, ICMP4_MAX_DLEN);
+ udp_send_tap_icmp4(c, ee, toside, *o4, data, dlen);
+ return 1;
+ }
+
+ if (hdr->cmsg_level == IPPROTO_IPV6 && !inany_v4(&toside->eaddr)) {
+ udp_send_tap_icmp6(c, ee, toside, &otap.a6, data, dlen,
+ FLOW_IDX(uflow));
+ return 1;
+ }
+
+fail:
+ flow_dbg(uflow, "Can't propagate %s error from %s %s to %s %s",
+ str_ee_origin(ee),
+ pif_name(pif),
+ sockaddr_ntop(SO_EE_OFFENDER(ee), sastr, sizeof(sastr)),
+ pif_name(topif),
+ inany_ntop(&toside->eaddr, astr, sizeof(astr)));
return 1;
}
/**
* udp_sock_errs() - Process errors on a socket
* @c: Execution context
- * @s: Socket to receive from
- * @events: epoll events bitmap
+ * @s: Socket to receive errors from
+ * @sidx: Flow and side of @s, or FLOW_SIDX_NONE if unknown
+ * @pif: Interface on which the error occurred
+ * (only used if @sidx == FLOW_SIDX_NONE)
+ * @port: Local port number of @s (only used if @sidx == FLOW_SIDX_NONE)
*
* Return: Number of errors handled, or < 0 if we have an unrecoverable error
*/
-int udp_sock_errs(const struct ctx *c, int s, uint32_t events)
+static int udp_sock_errs(const struct ctx *c, int s, flow_sidx_t sidx,
+ uint8_t pif, in_port_t port)
{
unsigned n_err = 0;
socklen_t errlen;
@@ -474,11 +667,8 @@ int udp_sock_errs(const struct ctx *c, int s, uint32_t events)
ASSERT(!c->no_udp);
- if (!(events & EPOLLERR))
- return 0; /* Nothing to do */
-
/* Empty the error queue */
- while ((rc = udp_sock_recverr(s)) > 0)
+ while ((rc = udp_sock_recverr(c, s, sidx, pif, port)) > 0)
n_err += rc;
if (rc < 0)
@@ -506,36 +696,61 @@ int udp_sock_errs(const struct ctx *c, int s, uint32_t events)
}
/**
+ * udp_peek_addr() - Get source address for next packet
+ * @s: Socket to get information from
+ * @src: Socket address (output)
+ * @dst: (Local) destination address (output)
+ *
+ * Return: 0 if no more packets, 1 on success, -ve error code on error
+ */
+static int udp_peek_addr(int s, union sockaddr_inany *src,
+ union inany_addr *dst)
+{
+ char sastr[SOCKADDR_STRLEN], dstr[INANY_ADDRSTRLEN];
+ char cmsg[PKTINFO_SPACE];
+ struct msghdr msg = {
+ .msg_name = src,
+ .msg_namelen = sizeof(*src),
+ .msg_control = cmsg,
+ .msg_controllen = sizeof(cmsg),
+ };
+ int rc;
+
+ rc = recvmsg(s, &msg, MSG_PEEK | MSG_DONTWAIT);
+ if (rc < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return 0;
+ return -errno;
+ }
+
+ udp_pktinfo(&msg, dst);
+
+ trace("Peeked UDP datagram: %s -> %s",
+ sockaddr_ntop(src, sastr, sizeof(sastr)),
+ inany_ntop(dst, dstr, sizeof(dstr)));
+
+ return 1;
+}
+
+/**
* udp_sock_recv() - Receive datagrams from a socket
* @c: Execution context
* @s: Socket to receive from
- * @events: epoll events bitmap
- * @mmh mmsghdr array to receive into
+ * @mmh: mmsghdr array to receive into
+ * @n: Maximum number of datagrams to receive
*
* Return: Number of datagrams received
*
* #syscalls recvmmsg arm:recvmmsg_time64 i686:recvmmsg_time64
*/
-static int udp_sock_recv(const struct ctx *c, int s, uint32_t events,
- struct mmsghdr *mmh)
+static int udp_sock_recv(const struct ctx *c, int s, struct mmsghdr *mmh, int n)
{
- /* For not entirely clear reasons (data locality?) pasta gets better
- * throughput if we receive tap datagrams one at a atime. For small
- * splice datagrams throughput is slightly better if we do batch, but
- * it's slightly worse for large splice datagrams. Since we don't know
- * before we receive whether we'll use tap or splice, always go one at a
- * time for pasta mode.
- */
- int n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES);
-
ASSERT(!c->no_udp);
- if (!(events & EPOLLIN))
- return 0;
-
n = recvmmsg(s, mmh, n, 0, NULL);
if (n < 0) {
- err_perror("Error receiving datagrams");
+ trace("Error receiving datagrams: %s", strerror_(errno));
+ /* Bail out and let the EPOLLERR handler deal with it */
return 0;
}
@@ -543,78 +758,121 @@ static int udp_sock_recv(const struct ctx *c, int s, uint32_t events,
}
/**
- * udp_buf_listen_sock_handler() - Handle new data from socket
+ * udp_sock_to_sock() - Forward datagrams from socket to socket
* @c: Execution context
- * @ref: epoll reference
- * @events: epoll events bitmap
- * @now: Current timestamp
+ * @from_s: Socket to receive datagrams from
+ * @n: Maximum number of datagrams to forward
+ * @tosidx: Flow & side to forward datagrams to
*
- * #syscalls recvmmsg
+ * #syscalls sendmmsg
*/
-static void udp_buf_listen_sock_handler(const struct ctx *c,
- union epoll_ref ref, uint32_t events,
- const struct timespec *now)
+static void udp_sock_to_sock(const struct ctx *c, int from_s, int n,
+ flow_sidx_t tosidx)
{
- const socklen_t sasize = sizeof(udp_meta[0].s_in);
- int n, i;
+ const struct flowside *toside = flowside_at_sidx(tosidx);
+ const struct udp_flow *uflow = udp_at_sidx(tosidx);
+ uint8_t topif = pif_at_sidx(tosidx);
+ int to_s = uflow->s[tosidx.sidei];
+ socklen_t sl;
+ int i;
- if (udp_sock_errs(c, ref.fd, events) < 0) {
- err("UDP: Unrecoverable error on listening socket:"
- " (%s port %hu)", pif_name(ref.udp.pif), ref.udp.port);
- /* FIXME: what now? close/re-open socket? */
+ if ((n = udp_sock_recv(c, from_s, udp_mh_recv, n)) <= 0)
return;
+
+ for (i = 0; i < n; i++) {
+ udp_mh_splice[i].msg_hdr.msg_iov->iov_len
+ = udp_mh_recv[i].msg_len;
}
- if ((n = udp_sock_recv(c, ref.fd, events, udp_mh_recv)) <= 0)
+ pif_sockaddr(c, &udp_splice_to, &sl, topif,
+ &toside->eaddr, toside->eport);
+
+ sendmmsg(to_s, udp_mh_splice, n, MSG_NOSIGNAL);
+}
+
+/**
+ * udp_buf_sock_to_tap() - Forward datagrams from socket to tap
+ * @c: Execution context
+ * @s: Socket to read data from
+ * @n: Maximum number of datagrams to forward
+ * @tosidx: Flow & side to forward data from @s to
+ */
+static void udp_buf_sock_to_tap(const struct ctx *c, int s, int n,
+ flow_sidx_t tosidx)
+{
+ const struct flowside *toside = flowside_at_sidx(tosidx);
+ int i;
+
+ if ((n = udp_sock_recv(c, s, udp_mh_recv, n)) <= 0)
return;
- /* We divide datagrams into batches based on how we need to send them,
- * determined by udp_meta[i].tosidx. To avoid either two passes through
- * the array, or recalculating tosidx for a single entry, we have to
- * populate it one entry *ahead* of the loop counter.
- */
- udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0].s_in, now);
- udp_mh_recv[0].msg_hdr.msg_namelen = sasize;
- for (i = 0; i < n; ) {
- flow_sidx_t batchsidx = udp_meta[i].tosidx;
- uint8_t batchpif = pif_at_sidx(batchsidx);
- int batchstart = i;
-
- do {
- if (pif_is_socket(batchpif)) {
- udp_splice_prepare(udp_mh_recv, i);
- } else if (batchpif == PIF_TAP) {
- udp_tap_prepare(udp_mh_recv, i,
- flowside_at_sidx(batchsidx),
- false);
+ for (i = 0; i < n; i++)
+ udp_tap_prepare(udp_mh_recv, i, toside, false);
+
+ tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, n);
+}
+
+/**
+ * udp_sock_fwd() - Forward datagrams from a possibly unconnected socket
+ * @c: Execution context
+ * @s: Socket to forward from
+ * @frompif: Interface to which @s belongs
+ * @port: Our (local) port number of @s
+ * @now: Current timestamp
+ */
+void udp_sock_fwd(const struct ctx *c, int s, uint8_t frompif,
+ in_port_t port, const struct timespec *now)
+{
+ union sockaddr_inany src;
+ union inany_addr dst;
+ int rc;
+
+ while ((rc = udp_peek_addr(s, &src, &dst)) != 0) {
+ bool discard = false;
+ flow_sidx_t tosidx;
+ uint8_t topif;
+
+ if (rc < 0) {
+ trace("Error peeking at socket address: %s",
+ strerror_(-rc));
+ /* Clear errors & carry on */
+ if (udp_sock_errs(c, s, FLOW_SIDX_NONE,
+ frompif, port) < 0) {
+ err(
+"UDP: Unrecoverable error on listening socket: (%s port %hu)",
+ pif_name(frompif), port);
+ /* FIXME: what now? close/re-open socket? */
}
+ continue;
+ }
+
+ tosidx = udp_flow_from_sock(c, frompif, &dst, port, &src, now);
+ topif = pif_at_sidx(tosidx);
- if (++i >= n)
- break;
-
- udp_meta[i].tosidx = udp_flow_from_sock(c, ref,
- &udp_meta[i].s_in,
- now);
- udp_mh_recv[i].msg_hdr.msg_namelen = sasize;
- } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx));
-
- if (pif_is_socket(batchpif)) {
- udp_splice_send(c, batchstart, i - batchstart,
- batchsidx);
- } else if (batchpif == PIF_TAP) {
- tap_send_frames(c, &udp_l2_iov[batchstart][0],
- UDP_NUM_IOVS, i - batchstart);
- } else if (flow_sidx_valid(batchsidx)) {
- flow_sidx_t fromsidx = flow_sidx_opposite(batchsidx);
- struct udp_flow *uflow = udp_at_sidx(batchsidx);
+ if (pif_is_socket(topif)) {
+ udp_sock_to_sock(c, s, 1, tosidx);
+ } else if (topif == PIF_TAP) {
+ if (c->mode == MODE_VU)
+ udp_vu_sock_to_tap(c, s, 1, tosidx);
+ else
+ udp_buf_sock_to_tap(c, s, 1, tosidx);
+ } else if (flow_sidx_valid(tosidx)) {
+ struct udp_flow *uflow = udp_at_sidx(tosidx);
flow_err(uflow,
"No support for forwarding UDP from %s to %s",
- pif_name(pif_at_sidx(fromsidx)),
- pif_name(batchpif));
+ pif_name(frompif), pif_name(topif));
+ discard = true;
} else {
- debug("Discarding %d datagrams without flow",
- i - batchstart);
+ debug("Discarding datagram without flow");
+ discard = true;
+ }
+
+ if (discard) {
+ struct msghdr msg = { 0 };
+
+ if (recvmsg(s, &msg, MSG_DONTWAIT) < 0)
+ debug_perror("Failed to discard datagram");
}
}
}
@@ -630,87 +888,69 @@ void udp_listen_sock_handler(const struct ctx *c,
union epoll_ref ref, uint32_t events,
const struct timespec *now)
{
- if (c->mode == MODE_VU) {
- udp_vu_listen_sock_handler(c, ref, events, now);
- return;
- }
-
- udp_buf_listen_sock_handler(c, ref, events, now);
+ if (events & (EPOLLERR | EPOLLIN))
+ udp_sock_fwd(c, ref.fd, ref.udp.pif, ref.udp.port, now);
}
/**
- * udp_buf_reply_sock_handler() - Handle new data from flow specific socket
+ * udp_sock_handler() - Handle new data from flow specific socket
* @c: Execution context
* @ref: epoll reference
* @events: epoll events bitmap
* @now: Current timestamp
- *
- * #syscalls recvmmsg
*/
-static void udp_buf_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
- uint32_t events,
- const struct timespec *now)
+void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
+ uint32_t events, const struct timespec *now)
{
- flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside);
- const struct flowside *toside = flowside_at_sidx(tosidx);
struct udp_flow *uflow = udp_at_sidx(ref.flowside);
- uint8_t topif = pif_at_sidx(tosidx);
- int n, i, from_s;
ASSERT(!c->no_udp && uflow);
- from_s = uflow->s[ref.flowside.sidei];
-
- if (udp_sock_errs(c, from_s, events) < 0) {
- flow_err(uflow, "Unrecoverable error on reply socket");
- flow_err_details(uflow);
- udp_flow_close(c, uflow);
- return;
- }
-
- if ((n = udp_sock_recv(c, from_s, events, udp_mh_recv)) <= 0)
- return;
-
- flow_trace(uflow, "Received %d datagrams on reply socket", n);
- uflow->ts = now->tv_sec;
-
- for (i = 0; i < n; i++) {
- if (pif_is_socket(topif))
- udp_splice_prepare(udp_mh_recv, i);
- else if (topif == PIF_TAP)
- udp_tap_prepare(udp_mh_recv, i, toside, false);
- /* Restore sockaddr length clobbered by recvmsg() */
- udp_mh_recv[i].msg_hdr.msg_namelen = sizeof(udp_meta[i].s_in);
- }
-
- if (pif_is_socket(topif)) {
- udp_splice_send(c, 0, n, tosidx);
- } else if (topif == PIF_TAP) {
- tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, n);
- } else {
- uint8_t frompif = pif_at_sidx(ref.flowside);
-
- flow_err(uflow, "No support for forwarding UDP from %s to %s",
- pif_name(frompif), pif_name(topif));
+ if (events & EPOLLERR) {
+ if (udp_sock_errs(c, ref.fd, ref.flowside, PIF_NONE, 0) < 0) {
+ flow_err(uflow, "Unrecoverable error on flow socket");
+ goto fail;
+ }
}
-}
-/**
- * udp_reply_sock_handler() - Handle new data from flow specific socket
- * @c: Execution context
- * @ref: epoll reference
- * @events: epoll events bitmap
- * @now: Current timestamp
- */
-void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
- uint32_t events, const struct timespec *now)
-{
- if (c->mode == MODE_VU) {
- udp_vu_reply_sock_handler(c, ref, events, now);
- return;
+ if (events & EPOLLIN) {
+ /* For not entirely clear reasons (data locality?) pasta gets
+ * better throughput if we receive tap datagrams one at a
+ * time. For small splice datagrams throughput is slightly
+ * better if we do batch, but it's slightly worse for large
+ * splice datagrams. Since we don't know the size before we
+ * receive, always go one at a time for pasta mode.
+ */
+ size_t n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES);
+ flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside);
+ uint8_t topif = pif_at_sidx(tosidx);
+ int s = ref.fd;
+
+ flow_trace(uflow, "Received data on reply socket");
+ uflow->ts = now->tv_sec;
+
+ if (pif_is_socket(topif)) {
+ udp_sock_to_sock(c, ref.fd, n, tosidx);
+ } else if (topif == PIF_TAP) {
+ if (c->mode == MODE_VU) {
+ udp_vu_sock_to_tap(c, s, UDP_MAX_FRAMES,
+ tosidx);
+ } else {
+ udp_buf_sock_to_tap(c, s, n, tosidx);
+ }
+ } else {
+ flow_err(uflow,
+ "No support for forwarding UDP from %s to %s",
+ pif_name(pif_at_sidx(ref.flowside)),
+ pif_name(topif));
+ goto fail;
+ }
}
+ return;
- udp_buf_reply_sock_handler(c, ref, events, now);
+fail:
+ flow_err_details(uflow);
+ udp_flow_close(c, uflow);
}
/**
@@ -720,6 +960,7 @@ void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
* @af: Address family, AF_INET or AF_INET6
* @saddr: Source address
* @daddr: Destination address
+ * @ttl: TTL or hop limit for packets to be sent in this call
* @p: Pool of UDP packets, with UDP headers
* @idx: Index of first packet to process
* @now: Current timestamp
@@ -730,7 +971,8 @@ void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref,
*/
int udp_tap_handler(const struct ctx *c, uint8_t pif,
sa_family_t af, const void *saddr, const void *daddr,
- const struct pool *p, int idx, const struct timespec *now)
+ uint8_t ttl, const struct pool *p, int idx,
+ const struct timespec *now)
{
const struct flowside *toside;
struct mmsghdr mm[UIO_MAXIOV];
@@ -778,7 +1020,7 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif,
}
toside = flowside_at_sidx(tosidx);
- s = udp_at_sidx(tosidx)->s[tosidx.sidei];
+ s = uflow->s[tosidx.sidei];
ASSERT(s >= 0);
pif_sockaddr(c, &to_sa, &sl, topif, &toside->eaddr, toside->eport);
@@ -809,6 +1051,24 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif,
mm[i].msg_hdr.msg_controllen = 0;
mm[i].msg_hdr.msg_flags = 0;
+ if (ttl != uflow->ttl[tosidx.sidei]) {
+ uflow->ttl[tosidx.sidei] = ttl;
+ if (af == AF_INET) {
+ if (setsockopt(s, IPPROTO_IP, IP_TTL,
+ &ttl, sizeof(ttl)) < 0)
+ flow_perror(uflow,
+ "setsockopt IP_TTL");
+ } else {
+ /* IPv6 hop_limit cannot be only 1 byte */
+ int hop_limit = ttl;
+
+ if (setsockopt(s, SOL_IPV6, IPV6_UNICAST_HOPS,
+ &hop_limit, sizeof(hop_limit)) < 0)
+ flow_perror(uflow,
+ "setsockopt IPV6_UNICAST_HOPS");
+ }
+ }
+
count++;
}