diff options
Diffstat (limited to 'udp.c')
-rw-r--r-- | udp.c | 702 |
1 files changed, 481 insertions, 221 deletions
@@ -39,27 +39,30 @@ * could receive packets from multiple flows, so we use a hash table match to * find the specific flow for a datagram. * - * When a UDP flow is initiated from a listening socket we take a duplicate of - * the socket and store it in uflow->s[INISIDE]. This will last for the - * lifetime of the flow, even if the original listening socket is closed due to - * port auto-probing. The duplicate is used to deliver replies back to the - * originating side. + * Flow sockets + * ============ * - * Reply sockets - * ============= - * - * When a UDP flow targets a socket, we create a "reply" socket in + * When a UDP flow targets a socket, we create a "flow" socket in * uflow->s[TGTSIDE] both to deliver datagrams to the target side and receive * replies on the target side. This socket is both bound and connected and has - * EPOLL_TYPE_UDP_REPLY. The connect() means it will only receive datagrams + * EPOLL_TYPE_UDP. The connect() means it will only receive datagrams * associated with this flow, so the epoll reference directly points to the flow * and we don't need a hash lookup. * - * NOTE: it's possible that the reply socket could have a bound address - * overlapping with an unrelated listening socket. We assume datagrams for the - * flow will come to the reply socket in preference to a listening socket. The - * sample program doc/platform-requirements/reuseaddr-priority.c documents and - * tests that assumption. + * When a flow is initiated from a listening socket, we create a "flow" socket + * with the same bound address as the listening socket, but also connect()ed to + * the flow's peer. This is stored in uflow->s[INISIDE] and will last for the + * lifetime of the flow, even if the original listening socket is closed due to + * port auto-probing. The duplicate is used to deliver replies back to the + * originating side. + * + * NOTE: A flow socket can have a bound address overlapping with a listening + * socket. That will happen naturally for flows initiated from a socket, but is + * also possible (though unlikely) for tap initiated flows, depending on the + * source port. We assume datagrams for the flow will come to a connect()ed + * socket in preference to a listening socket. The sample program + * doc/platform-requirements/reuseaddr-priority.c documents and tests that + * assumption. * * "Spliced" flows * =============== @@ -71,8 +74,7 @@ * actually used; it doesn't make sense for datagrams and instead a pair of * recvmmsg() and sendmmsg() is used to forward the datagrams. * - * Note that a spliced flow will have *both* a duplicated listening socket and a - * reply socket (see above). + * Note that a spliced flow will have two flow sockets (see above). */ #include <sched.h> @@ -87,6 +89,8 @@ #include <netinet/in.h> #include <netinet/ip.h> #include <netinet/udp.h> +#include <netinet/ip_icmp.h> +#include <netinet/icmp6.h> #include <stdint.h> #include <stddef.h> #include <string.h> @@ -112,6 +116,14 @@ #include "udp_internal.h" #include "udp_vu.h" +#define UDP_MAX_FRAMES 32 /* max # of frames to receive at once */ + +/* Maximum UDP data to be returned in ICMP messages */ +#define ICMP4_MAX_DLEN 8 +#define ICMP6_MAX_DLEN (IPV6_MIN_MTU \ + - sizeof(struct udphdr) \ + - sizeof(struct ipv6hdr)) + /* "Spliced" sockets indexed by bound port (host order) */ static int udp_splice_ns [IP_VERSIONS][NUM_PORTS]; static int udp_splice_init[IP_VERSIONS][NUM_PORTS]; @@ -128,26 +140,31 @@ static struct ethhdr udp4_eth_hdr; static struct ethhdr udp6_eth_hdr; /** - * struct udp_meta_t - Pre-cooked headers and metadata for UDP packets + * struct udp_meta_t - Pre-cooked headers for UDP packets * @ip6h: Pre-filled IPv6 header (except for payload_len and addresses) * @ip4h: Pre-filled IPv4 header (except for tot_len and saddr) * @taph: Tap backend specific header - * @s_in: Source socket address, filled in by recvmmsg() - * @tosidx: sidx for the destination side of this datagram's flow */ static struct udp_meta_t { struct ipv6hdr ip6h; struct iphdr ip4h; struct tap_hdr taph; - - union sockaddr_inany s_in; - flow_sidx_t tosidx; } #ifdef __AVX2__ __attribute__ ((aligned(32))) #endif udp_meta[UDP_MAX_FRAMES]; +#define PKTINFO_SPACE \ + MAX(CMSG_SPACE(sizeof(struct in_pktinfo)), \ + CMSG_SPACE(sizeof(struct in6_pktinfo))) + +#define RECVERR_SPACE \ + MAX(CMSG_SPACE(sizeof(struct sock_extended_err) + \ + sizeof(struct sockaddr_in)), \ + CMSG_SPACE(sizeof(struct sock_extended_err) + \ + sizeof(struct sockaddr_in6))) + /** * enum udp_iov_idx - Indices for the buffers making up a single UDP frame * @UDP_IOV_TAP tap specific header @@ -224,8 +241,6 @@ static void udp_iov_init_one(const struct ctx *c, size_t i) tiov[UDP_IOV_TAP] = tap_hdr_iov(c, &meta->taph); tiov[UDP_IOV_PAYLOAD].iov_base = payload; - mh->msg_name = &meta->s_in; - mh->msg_namelen = sizeof(meta->s_in); mh->msg_iov = siov; mh->msg_iovlen = 1; } @@ -246,41 +261,6 @@ static void udp_iov_init(const struct ctx *c) } /** - * udp_splice_prepare() - Prepare one datagram for splicing - * @mmh: Receiving mmsghdr array - * @idx: Index of the datagram to prepare - */ -static void udp_splice_prepare(struct mmsghdr *mmh, unsigned idx) -{ - udp_mh_splice[idx].msg_hdr.msg_iov->iov_len = mmh[idx].msg_len; -} - -/** - * udp_splice_send() - Send a batch of datagrams from socket to socket - * @c: Execution context - * @start: Index of batch's first datagram in udp[46]_l2_buf - * @n: Number of datagrams in batch - * @src: Source port for datagram (target side) - * @dst: Destination port for datagrams (target side) - * @ref: epoll reference for origin socket - * @now: Timestamp - */ -static void udp_splice_send(const struct ctx *c, size_t start, size_t n, - flow_sidx_t tosidx) -{ - const struct flowside *toside = flowside_at_sidx(tosidx); - const struct udp_flow *uflow = udp_at_sidx(tosidx); - uint8_t topif = pif_at_sidx(tosidx); - int s = uflow->s[tosidx.sidei]; - socklen_t sl; - - pif_sockaddr(c, &udp_splice_to, &sl, topif, - &toside->eaddr, toside->eport); - - sendmmsg(s, udp_mh_splice + start, n, MSG_NOSIGNAL); -} - -/** * udp_update_hdr4() - Update headers for one IPv4 datagram * @ip4h: Pre-filled IPv4 header (except for tot_len and saddr) * @bp: Pointer to udp_payload_t to update @@ -403,27 +383,171 @@ static void udp_tap_prepare(const struct mmsghdr *mmh, } /** + * udp_send_tap_icmp4() - Construct and send ICMPv4 to local peer + * @c: Execution context + * @ee: Extended error descriptor + * @toside: Destination side of flow + * @saddr: Address of ICMP generating node + * @in: First bytes (max 8) of original UDP message body + * @dlen: Length of the read part of original UDP message body + */ +static void udp_send_tap_icmp4(const struct ctx *c, + const struct sock_extended_err *ee, + const struct flowside *toside, + struct in_addr saddr, + const void *in, size_t dlen) +{ + struct in_addr oaddr = toside->oaddr.v4mapped.a4; + struct in_addr eaddr = toside->eaddr.v4mapped.a4; + in_port_t eport = toside->eport; + in_port_t oport = toside->oport; + struct { + struct icmphdr icmp4h; + struct iphdr ip4h; + struct udphdr uh; + char data[ICMP4_MAX_DLEN]; + } __attribute__((packed, aligned(__alignof__(max_align_t)))) msg; + size_t msglen = sizeof(msg) - sizeof(msg.data) + dlen; + size_t l4len = dlen + sizeof(struct udphdr); + + ASSERT(dlen <= ICMP4_MAX_DLEN); + memset(&msg, 0, sizeof(msg)); + msg.icmp4h.type = ee->ee_type; + msg.icmp4h.code = ee->ee_code; + if (ee->ee_type == ICMP_DEST_UNREACH && ee->ee_code == ICMP_FRAG_NEEDED) + msg.icmp4h.un.frag.mtu = htons((uint16_t) ee->ee_info); + + /* Reconstruct the original headers as returned in the ICMP message */ + tap_push_ip4h(&msg.ip4h, eaddr, oaddr, l4len, IPPROTO_UDP); + tap_push_uh4(&msg.uh, eaddr, eport, oaddr, oport, in, dlen); + memcpy(&msg.data, in, dlen); + + tap_icmp4_send(c, saddr, eaddr, &msg, msglen); +} + + +/** + * udp_send_tap_icmp6() - Construct and send ICMPv6 to local peer + * @c: Execution context + * @ee: Extended error descriptor + * @toside: Destination side of flow + * @saddr: Address of ICMP generating node + * @in: First bytes (max 1232) of original UDP message body + * @dlen: Length of the read part of original UDP message body + * @flow: IPv6 flow identifier + */ +static void udp_send_tap_icmp6(const struct ctx *c, + const struct sock_extended_err *ee, + const struct flowside *toside, + const struct in6_addr *saddr, + void *in, size_t dlen, uint32_t flow) +{ + const struct in6_addr *oaddr = &toside->oaddr.a6; + const struct in6_addr *eaddr = &toside->eaddr.a6; + in_port_t eport = toside->eport; + in_port_t oport = toside->oport; + struct { + struct icmp6_hdr icmp6h; + struct ipv6hdr ip6h; + struct udphdr uh; + char data[ICMP6_MAX_DLEN]; + } __attribute__((packed, aligned(__alignof__(max_align_t)))) msg; + size_t msglen = sizeof(msg) - sizeof(msg.data) + dlen; + size_t l4len = dlen + sizeof(struct udphdr); + + ASSERT(dlen <= ICMP6_MAX_DLEN); + memset(&msg, 0, sizeof(msg)); + msg.icmp6h.icmp6_type = ee->ee_type; + msg.icmp6h.icmp6_code = ee->ee_code; + if (ee->ee_type == ICMP6_PACKET_TOO_BIG) + msg.icmp6h.icmp6_dataun.icmp6_un_data32[0] = htonl(ee->ee_info); + + /* Reconstruct the original headers as returned in the ICMP message */ + tap_push_ip6h(&msg.ip6h, eaddr, oaddr, l4len, IPPROTO_UDP, flow); + tap_push_uh6(&msg.uh, eaddr, eport, oaddr, oport, in, dlen); + memcpy(&msg.data, in, dlen); + + tap_icmp6_send(c, saddr, eaddr, &msg, msglen); +} + +/** + * udp_pktinfo() - Retrieve packet destination address from cmsg + * @msg: msghdr into which message has been received + * @dst: (Local) destination address of message in @msg (output) + * + * Return: 0 on success, -1 if the information was missing (@dst is set to + * inany_any6). + */ +static int udp_pktinfo(struct msghdr *msg, union inany_addr *dst) +{ + struct cmsghdr *hdr; + + for (hdr = CMSG_FIRSTHDR(msg); hdr; hdr = CMSG_NXTHDR(msg, hdr)) { + if (hdr->cmsg_level == IPPROTO_IP && + hdr->cmsg_type == IP_PKTINFO) { + const struct in_pktinfo *i4 = (void *)CMSG_DATA(hdr); + + *dst = inany_from_v4(i4->ipi_addr); + return 0; + } + + if (hdr->cmsg_level == IPPROTO_IPV6 && + hdr->cmsg_type == IPV6_PKTINFO) { + const struct in6_pktinfo *i6 = (void *)CMSG_DATA(hdr); + + dst->a6 = i6->ipi6_addr; + return 0; + } + } + + debug("Missing PKTINFO cmsg on datagram"); + *dst = inany_any6; + return -1; +} + +/** * udp_sock_recverr() - Receive and clear an error from a socket - * @s: Socket to receive from + * @c: Execution context + * @s: Socket to receive errors from + * @sidx: Flow and side of @s, or FLOW_SIDX_NONE if unknown + * @pif: Interface on which the error occurred + * (only used if @sidx == FLOW_SIDX_NONE) + * @port: Local port number of @s (only used if @sidx == FLOW_SIDX_NONE) * * Return: 1 if error received and processed, 0 if no more errors in queue, < 0 * if there was an error reading the queue * * #syscalls recvmsg */ -static int udp_sock_recverr(int s) +static int udp_sock_recverr(const struct ctx *c, int s, flow_sidx_t sidx, + uint8_t pif, in_port_t port) { + char buf[PKTINFO_SPACE + RECVERR_SPACE]; const struct sock_extended_err *ee; - const struct cmsghdr *hdr; - char buf[CMSG_SPACE(sizeof(*ee))]; + char data[ICMP6_MAX_DLEN]; + struct cmsghdr *hdr; + struct iovec iov = { + .iov_base = data, + .iov_len = sizeof(data) + }; + union sockaddr_inany src; struct msghdr mh = { - .msg_name = NULL, - .msg_namelen = 0, - .msg_iov = NULL, - .msg_iovlen = 0, + .msg_name = &src, + .msg_namelen = sizeof(src), + .msg_iov = &iov, + .msg_iovlen = 1, .msg_control = buf, .msg_controllen = sizeof(buf), }; + const struct flowside *fromside, *toside; + union inany_addr offender, otap; + char astr[INANY_ADDRSTRLEN]; + char sastr[SOCKADDR_STRLEN]; + const struct in_addr *o4; + in_port_t offender_port; + struct udp_flow *uflow; + uint8_t topif; + size_t dlen; ssize_t rc; rc = recvmsg(s, &mh, MSG_ERRQUEUE); @@ -440,33 +564,102 @@ static int udp_sock_recverr(int s) return -1; } - hdr = CMSG_FIRSTHDR(&mh); - if (!((hdr->cmsg_level == IPPROTO_IP && - hdr->cmsg_type == IP_RECVERR) || - (hdr->cmsg_level == IPPROTO_IPV6 && - hdr->cmsg_type == IPV6_RECVERR))) { - err("Unexpected cmsg reading error queue"); + for (hdr = CMSG_FIRSTHDR(&mh); hdr; hdr = CMSG_NXTHDR(&mh, hdr)) { + if ((hdr->cmsg_level == IPPROTO_IP && + hdr->cmsg_type == IP_RECVERR) || + (hdr->cmsg_level == IPPROTO_IPV6 && + hdr->cmsg_type == IPV6_RECVERR)) + break; + } + + if (!hdr) { + err("Missing RECVERR cmsg in error queue"); return -1; } ee = (const struct sock_extended_err *)CMSG_DATA(hdr); - /* TODO: When possible propagate and otherwise handle errors */ debug("%s error on UDP socket %i: %s", str_ee_origin(ee), s, strerror_(ee->ee_errno)); + if (!flow_sidx_valid(sidx)) { + /* No hint from the socket, determine flow from addresses */ + union inany_addr dst; + + if (udp_pktinfo(&mh, &dst) < 0) { + debug("Missing PKTINFO on UDP error"); + return 1; + } + + sidx = flow_lookup_sa(c, IPPROTO_UDP, pif, &src, &dst, port); + if (!flow_sidx_valid(sidx)) { + debug("Ignoring UDP error without flow"); + return 1; + } + } else { + pif = pif_at_sidx(sidx); + } + + uflow = udp_at_sidx(sidx); + ASSERT(uflow); + fromside = &uflow->f.side[sidx.sidei]; + toside = &uflow->f.side[!sidx.sidei]; + topif = uflow->f.pif[!sidx.sidei]; + dlen = rc; + + if (inany_from_sockaddr(&offender, &offender_port, + SO_EE_OFFENDER(ee)) < 0) + goto fail; + + if (pif != PIF_HOST || topif != PIF_TAP) + /* XXX Can we support any other cases? */ + goto fail; + + /* If the offender *is* the endpoint, make sure our translation is + * consistent with the flow's translation. This matters if the flow + * endpoint has a port specific translation (like --dns-match). + */ + if (inany_equals(&offender, &fromside->eaddr)) + otap = toside->oaddr; + else if (!nat_inbound(c, &offender, &otap)) + goto fail; + + if (hdr->cmsg_level == IPPROTO_IP && + (o4 = inany_v4(&otap)) && inany_v4(&toside->eaddr)) { + dlen = MIN(dlen, ICMP4_MAX_DLEN); + udp_send_tap_icmp4(c, ee, toside, *o4, data, dlen); + return 1; + } + + if (hdr->cmsg_level == IPPROTO_IPV6 && !inany_v4(&toside->eaddr)) { + udp_send_tap_icmp6(c, ee, toside, &otap.a6, data, dlen, + FLOW_IDX(uflow)); + return 1; + } + +fail: + flow_dbg(uflow, "Can't propagate %s error from %s %s to %s %s", + str_ee_origin(ee), + pif_name(pif), + sockaddr_ntop(SO_EE_OFFENDER(ee), sastr, sizeof(sastr)), + pif_name(topif), + inany_ntop(&toside->eaddr, astr, sizeof(astr))); return 1; } /** * udp_sock_errs() - Process errors on a socket * @c: Execution context - * @s: Socket to receive from - * @events: epoll events bitmap + * @s: Socket to receive errors from + * @sidx: Flow and side of @s, or FLOW_SIDX_NONE if unknown + * @pif: Interface on which the error occurred + * (only used if @sidx == FLOW_SIDX_NONE) + * @port: Local port number of @s (only used if @sidx == FLOW_SIDX_NONE) * * Return: Number of errors handled, or < 0 if we have an unrecoverable error */ -int udp_sock_errs(const struct ctx *c, int s, uint32_t events) +static int udp_sock_errs(const struct ctx *c, int s, flow_sidx_t sidx, + uint8_t pif, in_port_t port) { unsigned n_err = 0; socklen_t errlen; @@ -474,11 +667,8 @@ int udp_sock_errs(const struct ctx *c, int s, uint32_t events) ASSERT(!c->no_udp); - if (!(events & EPOLLERR)) - return 0; /* Nothing to do */ - /* Empty the error queue */ - while ((rc = udp_sock_recverr(s)) > 0) + while ((rc = udp_sock_recverr(c, s, sidx, pif, port)) > 0) n_err += rc; if (rc < 0) @@ -506,36 +696,61 @@ int udp_sock_errs(const struct ctx *c, int s, uint32_t events) } /** + * udp_peek_addr() - Get source address for next packet + * @s: Socket to get information from + * @src: Socket address (output) + * @dst: (Local) destination address (output) + * + * Return: 0 if no more packets, 1 on success, -ve error code on error + */ +static int udp_peek_addr(int s, union sockaddr_inany *src, + union inany_addr *dst) +{ + char sastr[SOCKADDR_STRLEN], dstr[INANY_ADDRSTRLEN]; + char cmsg[PKTINFO_SPACE]; + struct msghdr msg = { + .msg_name = src, + .msg_namelen = sizeof(*src), + .msg_control = cmsg, + .msg_controllen = sizeof(cmsg), + }; + int rc; + + rc = recvmsg(s, &msg, MSG_PEEK | MSG_DONTWAIT); + if (rc < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return 0; + return -errno; + } + + udp_pktinfo(&msg, dst); + + trace("Peeked UDP datagram: %s -> %s", + sockaddr_ntop(src, sastr, sizeof(sastr)), + inany_ntop(dst, dstr, sizeof(dstr))); + + return 1; +} + +/** * udp_sock_recv() - Receive datagrams from a socket * @c: Execution context * @s: Socket to receive from - * @events: epoll events bitmap - * @mmh mmsghdr array to receive into + * @mmh: mmsghdr array to receive into + * @n: Maximum number of datagrams to receive * * Return: Number of datagrams received * * #syscalls recvmmsg arm:recvmmsg_time64 i686:recvmmsg_time64 */ -static int udp_sock_recv(const struct ctx *c, int s, uint32_t events, - struct mmsghdr *mmh) +static int udp_sock_recv(const struct ctx *c, int s, struct mmsghdr *mmh, int n) { - /* For not entirely clear reasons (data locality?) pasta gets better - * throughput if we receive tap datagrams one at a atime. For small - * splice datagrams throughput is slightly better if we do batch, but - * it's slightly worse for large splice datagrams. Since we don't know - * before we receive whether we'll use tap or splice, always go one at a - * time for pasta mode. - */ - int n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES); - ASSERT(!c->no_udp); - if (!(events & EPOLLIN)) - return 0; - n = recvmmsg(s, mmh, n, 0, NULL); if (n < 0) { - err_perror("Error receiving datagrams"); + trace("Error receiving datagrams: %s", strerror_(errno)); + /* Bail out and let the EPOLLERR handler deal with it */ return 0; } @@ -543,78 +758,121 @@ static int udp_sock_recv(const struct ctx *c, int s, uint32_t events, } /** - * udp_buf_listen_sock_handler() - Handle new data from socket + * udp_sock_to_sock() - Forward datagrams from socket to socket * @c: Execution context - * @ref: epoll reference - * @events: epoll events bitmap - * @now: Current timestamp + * @from_s: Socket to receive datagrams from + * @n: Maximum number of datagrams to forward + * @tosidx: Flow & side to forward datagrams to * - * #syscalls recvmmsg + * #syscalls sendmmsg */ -static void udp_buf_listen_sock_handler(const struct ctx *c, - union epoll_ref ref, uint32_t events, - const struct timespec *now) +static void udp_sock_to_sock(const struct ctx *c, int from_s, int n, + flow_sidx_t tosidx) { - const socklen_t sasize = sizeof(udp_meta[0].s_in); - int n, i; + const struct flowside *toside = flowside_at_sidx(tosidx); + const struct udp_flow *uflow = udp_at_sidx(tosidx); + uint8_t topif = pif_at_sidx(tosidx); + int to_s = uflow->s[tosidx.sidei]; + socklen_t sl; + int i; - if (udp_sock_errs(c, ref.fd, events) < 0) { - err("UDP: Unrecoverable error on listening socket:" - " (%s port %hu)", pif_name(ref.udp.pif), ref.udp.port); - /* FIXME: what now? close/re-open socket? */ + if ((n = udp_sock_recv(c, from_s, udp_mh_recv, n)) <= 0) return; + + for (i = 0; i < n; i++) { + udp_mh_splice[i].msg_hdr.msg_iov->iov_len + = udp_mh_recv[i].msg_len; } - if ((n = udp_sock_recv(c, ref.fd, events, udp_mh_recv)) <= 0) + pif_sockaddr(c, &udp_splice_to, &sl, topif, + &toside->eaddr, toside->eport); + + sendmmsg(to_s, udp_mh_splice, n, MSG_NOSIGNAL); +} + +/** + * udp_buf_sock_to_tap() - Forward datagrams from socket to tap + * @c: Execution context + * @s: Socket to read data from + * @n: Maximum number of datagrams to forward + * @tosidx: Flow & side to forward data from @s to + */ +static void udp_buf_sock_to_tap(const struct ctx *c, int s, int n, + flow_sidx_t tosidx) +{ + const struct flowside *toside = flowside_at_sidx(tosidx); + int i; + + if ((n = udp_sock_recv(c, s, udp_mh_recv, n)) <= 0) return; - /* We divide datagrams into batches based on how we need to send them, - * determined by udp_meta[i].tosidx. To avoid either two passes through - * the array, or recalculating tosidx for a single entry, we have to - * populate it one entry *ahead* of the loop counter. - */ - udp_meta[0].tosidx = udp_flow_from_sock(c, ref, &udp_meta[0].s_in, now); - udp_mh_recv[0].msg_hdr.msg_namelen = sasize; - for (i = 0; i < n; ) { - flow_sidx_t batchsidx = udp_meta[i].tosidx; - uint8_t batchpif = pif_at_sidx(batchsidx); - int batchstart = i; - - do { - if (pif_is_socket(batchpif)) { - udp_splice_prepare(udp_mh_recv, i); - } else if (batchpif == PIF_TAP) { - udp_tap_prepare(udp_mh_recv, i, - flowside_at_sidx(batchsidx), - false); + for (i = 0; i < n; i++) + udp_tap_prepare(udp_mh_recv, i, toside, false); + + tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, n); +} + +/** + * udp_sock_fwd() - Forward datagrams from a possibly unconnected socket + * @c: Execution context + * @s: Socket to forward from + * @frompif: Interface to which @s belongs + * @port: Our (local) port number of @s + * @now: Current timestamp + */ +void udp_sock_fwd(const struct ctx *c, int s, uint8_t frompif, + in_port_t port, const struct timespec *now) +{ + union sockaddr_inany src; + union inany_addr dst; + int rc; + + while ((rc = udp_peek_addr(s, &src, &dst)) != 0) { + bool discard = false; + flow_sidx_t tosidx; + uint8_t topif; + + if (rc < 0) { + trace("Error peeking at socket address: %s", + strerror_(-rc)); + /* Clear errors & carry on */ + if (udp_sock_errs(c, s, FLOW_SIDX_NONE, + frompif, port) < 0) { + err( +"UDP: Unrecoverable error on listening socket: (%s port %hu)", + pif_name(frompif), port); + /* FIXME: what now? close/re-open socket? */ } + continue; + } + + tosidx = udp_flow_from_sock(c, frompif, &dst, port, &src, now); + topif = pif_at_sidx(tosidx); - if (++i >= n) - break; - - udp_meta[i].tosidx = udp_flow_from_sock(c, ref, - &udp_meta[i].s_in, - now); - udp_mh_recv[i].msg_hdr.msg_namelen = sasize; - } while (flow_sidx_eq(udp_meta[i].tosidx, batchsidx)); - - if (pif_is_socket(batchpif)) { - udp_splice_send(c, batchstart, i - batchstart, - batchsidx); - } else if (batchpif == PIF_TAP) { - tap_send_frames(c, &udp_l2_iov[batchstart][0], - UDP_NUM_IOVS, i - batchstart); - } else if (flow_sidx_valid(batchsidx)) { - flow_sidx_t fromsidx = flow_sidx_opposite(batchsidx); - struct udp_flow *uflow = udp_at_sidx(batchsidx); + if (pif_is_socket(topif)) { + udp_sock_to_sock(c, s, 1, tosidx); + } else if (topif == PIF_TAP) { + if (c->mode == MODE_VU) + udp_vu_sock_to_tap(c, s, 1, tosidx); + else + udp_buf_sock_to_tap(c, s, 1, tosidx); + } else if (flow_sidx_valid(tosidx)) { + struct udp_flow *uflow = udp_at_sidx(tosidx); flow_err(uflow, "No support for forwarding UDP from %s to %s", - pif_name(pif_at_sidx(fromsidx)), - pif_name(batchpif)); + pif_name(frompif), pif_name(topif)); + discard = true; } else { - debug("Discarding %d datagrams without flow", - i - batchstart); + debug("Discarding datagram without flow"); + discard = true; + } + + if (discard) { + struct msghdr msg = { 0 }; + + if (recvmsg(s, &msg, MSG_DONTWAIT) < 0) + debug_perror("Failed to discard datagram"); } } } @@ -630,87 +888,69 @@ void udp_listen_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t events, const struct timespec *now) { - if (c->mode == MODE_VU) { - udp_vu_listen_sock_handler(c, ref, events, now); - return; - } - - udp_buf_listen_sock_handler(c, ref, events, now); + if (events & (EPOLLERR | EPOLLIN)) + udp_sock_fwd(c, ref.fd, ref.udp.pif, ref.udp.port, now); } /** - * udp_buf_reply_sock_handler() - Handle new data from flow specific socket + * udp_sock_handler() - Handle new data from flow specific socket * @c: Execution context * @ref: epoll reference * @events: epoll events bitmap * @now: Current timestamp - * - * #syscalls recvmmsg */ -static void udp_buf_reply_sock_handler(const struct ctx *c, union epoll_ref ref, - uint32_t events, - const struct timespec *now) +void udp_sock_handler(const struct ctx *c, union epoll_ref ref, + uint32_t events, const struct timespec *now) { - flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside); - const struct flowside *toside = flowside_at_sidx(tosidx); struct udp_flow *uflow = udp_at_sidx(ref.flowside); - uint8_t topif = pif_at_sidx(tosidx); - int n, i, from_s; ASSERT(!c->no_udp && uflow); - from_s = uflow->s[ref.flowside.sidei]; - - if (udp_sock_errs(c, from_s, events) < 0) { - flow_err(uflow, "Unrecoverable error on reply socket"); - flow_err_details(uflow); - udp_flow_close(c, uflow); - return; - } - - if ((n = udp_sock_recv(c, from_s, events, udp_mh_recv)) <= 0) - return; - - flow_trace(uflow, "Received %d datagrams on reply socket", n); - uflow->ts = now->tv_sec; - - for (i = 0; i < n; i++) { - if (pif_is_socket(topif)) - udp_splice_prepare(udp_mh_recv, i); - else if (topif == PIF_TAP) - udp_tap_prepare(udp_mh_recv, i, toside, false); - /* Restore sockaddr length clobbered by recvmsg() */ - udp_mh_recv[i].msg_hdr.msg_namelen = sizeof(udp_meta[i].s_in); - } - - if (pif_is_socket(topif)) { - udp_splice_send(c, 0, n, tosidx); - } else if (topif == PIF_TAP) { - tap_send_frames(c, &udp_l2_iov[0][0], UDP_NUM_IOVS, n); - } else { - uint8_t frompif = pif_at_sidx(ref.flowside); - - flow_err(uflow, "No support for forwarding UDP from %s to %s", - pif_name(frompif), pif_name(topif)); + if (events & EPOLLERR) { + if (udp_sock_errs(c, ref.fd, ref.flowside, PIF_NONE, 0) < 0) { + flow_err(uflow, "Unrecoverable error on flow socket"); + goto fail; + } } -} -/** - * udp_reply_sock_handler() - Handle new data from flow specific socket - * @c: Execution context - * @ref: epoll reference - * @events: epoll events bitmap - * @now: Current timestamp - */ -void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref, - uint32_t events, const struct timespec *now) -{ - if (c->mode == MODE_VU) { - udp_vu_reply_sock_handler(c, ref, events, now); - return; + if (events & EPOLLIN) { + /* For not entirely clear reasons (data locality?) pasta gets + * better throughput if we receive tap datagrams one at a + * time. For small splice datagrams throughput is slightly + * better if we do batch, but it's slightly worse for large + * splice datagrams. Since we don't know the size before we + * receive, always go one at a time for pasta mode. + */ + size_t n = (c->mode == MODE_PASTA ? 1 : UDP_MAX_FRAMES); + flow_sidx_t tosidx = flow_sidx_opposite(ref.flowside); + uint8_t topif = pif_at_sidx(tosidx); + int s = ref.fd; + + flow_trace(uflow, "Received data on reply socket"); + uflow->ts = now->tv_sec; + + if (pif_is_socket(topif)) { + udp_sock_to_sock(c, ref.fd, n, tosidx); + } else if (topif == PIF_TAP) { + if (c->mode == MODE_VU) { + udp_vu_sock_to_tap(c, s, UDP_MAX_FRAMES, + tosidx); + } else { + udp_buf_sock_to_tap(c, s, n, tosidx); + } + } else { + flow_err(uflow, + "No support for forwarding UDP from %s to %s", + pif_name(pif_at_sidx(ref.flowside)), + pif_name(topif)); + goto fail; + } } + return; - udp_buf_reply_sock_handler(c, ref, events, now); +fail: + flow_err_details(uflow); + udp_flow_close(c, uflow); } /** @@ -720,6 +960,7 @@ void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref, * @af: Address family, AF_INET or AF_INET6 * @saddr: Source address * @daddr: Destination address + * @ttl: TTL or hop limit for packets to be sent in this call * @p: Pool of UDP packets, with UDP headers * @idx: Index of first packet to process * @now: Current timestamp @@ -730,7 +971,8 @@ void udp_reply_sock_handler(const struct ctx *c, union epoll_ref ref, */ int udp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af, const void *saddr, const void *daddr, - const struct pool *p, int idx, const struct timespec *now) + uint8_t ttl, const struct pool *p, int idx, + const struct timespec *now) { const struct flowside *toside; struct mmsghdr mm[UIO_MAXIOV]; @@ -778,7 +1020,7 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif, } toside = flowside_at_sidx(tosidx); - s = udp_at_sidx(tosidx)->s[tosidx.sidei]; + s = uflow->s[tosidx.sidei]; ASSERT(s >= 0); pif_sockaddr(c, &to_sa, &sl, topif, &toside->eaddr, toside->eport); @@ -809,6 +1051,24 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif, mm[i].msg_hdr.msg_controllen = 0; mm[i].msg_hdr.msg_flags = 0; + if (ttl != uflow->ttl[tosidx.sidei]) { + uflow->ttl[tosidx.sidei] = ttl; + if (af == AF_INET) { + if (setsockopt(s, IPPROTO_IP, IP_TTL, + &ttl, sizeof(ttl)) < 0) + flow_perror(uflow, + "setsockopt IP_TTL"); + } else { + /* IPv6 hop_limit cannot be only 1 byte */ + int hop_limit = ttl; + + if (setsockopt(s, SOL_IPV6, IPV6_UNICAST_HOPS, + &hop_limit, sizeof(hop_limit)) < 0) + flow_perror(uflow, + "setsockopt IPV6_UNICAST_HOPS"); + } + } + count++; } |