diff options
-rw-r--r-- | contrib/selinux/passt.te | 4 | ||||
-rw-r--r-- | flow.c | 243 | ||||
-rw-r--r-- | flow.h | 8 | ||||
-rw-r--r-- | migrate.c | 10 | ||||
-rw-r--r-- | passt.c | 6 | ||||
-rw-r--r-- | repair.c | 1 | ||||
-rw-r--r-- | tcp.c | 919 | ||||
-rw-r--r-- | tcp_conn.h | 103 |
8 files changed, 1288 insertions, 6 deletions
diff --git a/contrib/selinux/passt.te b/contrib/selinux/passt.te index fc1320d..f595079 100644 --- a/contrib/selinux/passt.te +++ b/contrib/selinux/passt.te @@ -45,7 +45,7 @@ require { type net_conf_t; type proc_net_t; type node_t; - class tcp_socket { create accept listen name_bind name_connect }; + class tcp_socket { create accept listen name_bind name_connect getattr }; class udp_socket { create accept listen }; class icmp_socket { bind create name_bind node_bind setopt read write }; class sock_file { create unlink write }; @@ -129,7 +129,7 @@ corenet_udp_sendrecv_all_ports(passt_t) allow passt_t node_t:icmp_socket { name_bind node_bind }; allow passt_t port_t:icmp_socket name_bind; -allow passt_t self:tcp_socket { create getopt setopt connect bind listen accept shutdown read write }; +allow passt_t self:tcp_socket { create getopt setopt connect bind listen accept shutdown read write getattr }; allow passt_t self:udp_socket { create getopt setopt connect bind read write }; allow passt_t self:icmp_socket { bind create setopt read write }; @@ -19,6 +19,7 @@ #include "inany.h" #include "flow.h" #include "flow_table.h" +#include "repair.h" const char *flow_state_str[] = { [FLOW_STATE_FREE] = "FREE", @@ -52,6 +53,35 @@ const uint8_t flow_proto[] = { static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES, "flow_proto[] doesn't match enum flow_type"); +#define foreach_flow(i, flow, bound) \ + for ((i) = 0, (flow) = &flowtab[(i)]; \ + (i) < (bound); \ + (i)++, (flow) = &flowtab[(i)]) \ + if ((flow)->f.state == FLOW_STATE_FREE) \ + (i) += (flow)->free.n - 1; \ + else + +#define foreach_active_flow(i, flow, bound) \ + foreach_flow((i), (flow), (bound)) \ + if ((flow)->f.state != FLOW_STATE_ACTIVE) \ + /* NOLINTNEXTLINE(bugprone-branch-clone) */ \ + continue; \ + else + +#define foreach_tcp_flow(i, flow, bound) \ + foreach_active_flow((i), (flow), (bound)) \ + if ((flow)->f.type != FLOW_TCP) \ + /* NOLINTNEXTLINE(bugprone-branch-clone) */ \ + continue; \ + else + +#define foreach_established_tcp_flow(i, flow, bound) \ + foreach_tcp_flow((i), (flow), (bound)) \ + if (!tcp_flow_is_established(&(flow)->tcp)) \ + /* NOLINTNEXTLINE(bugprone-branch-clone) */ \ + continue; \ + else + /* Global Flow Table */ /** @@ -875,6 +905,219 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now) } /** + * flow_migrate_source_rollback() - Disable repair mode, return failure + * @c: Execution context + * @max_flow: Maximum index of affected flows + * @ret: Negative error code + * + * Return: @ret + */ +static int flow_migrate_source_rollback(struct ctx *c, unsigned max_flow, + int ret) +{ + union flow *flow; + unsigned i; + + debug("...roll back migration"); + + foreach_established_tcp_flow(i, flow, max_flow) + if (tcp_flow_repair_off(c, &flow->tcp)) + die("Failed to roll back TCP_REPAIR mode"); + + if (repair_flush(c)) + die("Failed to roll back TCP_REPAIR mode"); + + return ret; +} + +/** + * flow_migrate_repair_all() - Turn repair mode on or off for all flows + * @c: Execution context + * @enable: Switch repair mode on if set, off otherwise + * + * Return: 0 on success, negative error code on failure + */ +static int flow_migrate_repair_all(struct ctx *c, bool enable) +{ + union flow *flow; + unsigned i; + int rc; + + foreach_established_tcp_flow(i, flow, FLOW_MAX) { + if (enable) + rc = tcp_flow_repair_on(c, &flow->tcp); + else + rc = tcp_flow_repair_off(c, &flow->tcp); + + if (rc) { + debug("Can't %s repair mode: %s", + enable ? "enable" : "disable", strerror_(-rc)); + return flow_migrate_source_rollback(c, i, rc); + } + } + + if ((rc = repair_flush(c))) { + debug("Can't %s repair mode: %s", + enable ? "enable" : "disable", strerror_(-rc)); + return flow_migrate_source_rollback(c, i, rc); + } + + return 0; +} + +/** + * flow_migrate_source_pre() - Prepare flows for migration: enable repair mode + * @c: Execution context + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor (unused) + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + int rc; + + (void)stage; + (void)fd; + + if ((rc = flow_migrate_repair_all(c, true))) + return -rc; + + return 0; +} + +/** + * flow_migrate_source() - Dump all the remaining information and send data + * @c: Execution context (unused) + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + uint32_t count = 0; + bool first = true; + union flow *flow; + unsigned i; + int rc; + + (void)c; + (void)stage; + + foreach_established_tcp_flow(i, flow, FLOW_MAX) + count++; + + count = htonl(count); + if (write_all_buf(fd, &count, sizeof(count))) { + rc = errno; + err_perror("Can't send flow count (%u)", ntohl(count)); + return flow_migrate_source_rollback(c, FLOW_MAX, rc); + } + + debug("Sending %u flows", ntohl(count)); + + /* Dump and send information that can be stored in the flow table. + * + * Limited rollback options here: if we fail to transfer any data (that + * is, on the first flow), undo everything and resume. Otherwise, the + * stream might now be inconsistent, and we might have closed listening + * TCP sockets, so just terminate. + */ + foreach_established_tcp_flow(i, flow, FLOW_MAX) { + rc = tcp_flow_migrate_source(fd, &flow->tcp); + if (rc) { + err("Can't send data, flow %u: %s", i, strerror_(-rc)); + if (!first) + die("Inconsistent migration state, exiting"); + + return flow_migrate_source_rollback(c, FLOW_MAX, -rc); + } + + first = false; + } + + /* And then "extended" data (including window data we saved previously): + * the target needs to set repair mode on sockets before it can set + * this stuff, but it needs sockets (and flows) for that. + * + * This also closes sockets so that the target can start connecting + * theirs: you can't sendmsg() to queues (using the socket) if the + * socket is not connected (EPIPE), not even in repair mode. And the + * target needs to restore queues now because we're sending the data. + * + * So, no rollback here, just try as hard as we can. Tolerate per-flow + * failures but not if the stream might be inconsistent (reported here + * as EIO). + */ + foreach_established_tcp_flow(i, flow, FLOW_MAX) { + rc = tcp_flow_migrate_source_ext(fd, i, &flow->tcp); + if (rc) { + err("Extended data for flow %u: %s", i, strerror_(-rc)); + + if (rc == -EIO) + die("Inconsistent migration state, exiting"); + } + } + + return 0; +} + +/** + * flow_migrate_target() - Receive flows and insert in flow table + * @c: Execution context + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + uint32_t count; + unsigned i; + int rc; + + (void)stage; + + if (read_all_buf(fd, &count, sizeof(count))) + return errno; + + count = ntohl(count); + debug("Receiving %u flows", count); + + if ((rc = flow_migrate_repair_all(c, true))) + return -rc; + + repair_flush(c); + + /* TODO: flow header with type, instead? */ + for (i = 0; i < count; i++) { + rc = tcp_flow_migrate_target(c, fd); + if (rc) { + debug("Migration data failure at flow %u: %s, abort", + i, strerror_(-rc)); + return -rc; + } + } + + repair_flush(c); + + for (i = 0; i < count; i++) { + rc = tcp_flow_migrate_target_ext(c, flowtab + i, fd); + if (rc) { + debug("Migration data failure at flow %u: %s, abort", + i, strerror_(-rc)); + return -rc; + } + } + + return 0; +} + +/** * flow_init() - Initialise flow related data structures */ void flow_init(void) @@ -249,6 +249,14 @@ union flow; void flow_init(void); void flow_defer_handler(const struct ctx *c, const struct timespec *now); +int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage, + int fd); +int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage, + int fd); +int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage, + int fd); +int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage, + int fd); void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...) __attribute__((format(printf, 3, 4))); @@ -103,6 +103,16 @@ static const struct migrate_stage stages_v1[] = { .source = seen_addrs_source_v1, .target = seen_addrs_target_v1, }, + { + .name = "prepare flows", + .source = flow_migrate_source_pre, + .target = NULL, + }, + { + .name = "transfer flows", + .source = flow_migrate_source, + .target = flow_migrate_target, + }, { 0 }, }; @@ -223,9 +223,6 @@ int main(int argc, char **argv) if (sigaction(SIGCHLD, &sa, NULL)) die_perror("Couldn't install signal handlers"); - if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) - die_perror("Couldn't set disposition for SIGPIPE"); - c.mode = MODE_PASTA; } else if (strstr(name, "passt")) { c.mode = MODE_PASST; @@ -233,6 +230,9 @@ int main(int argc, char **argv) _exit(EXIT_FAILURE); } + if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) + die_perror("Couldn't set disposition for SIGPIPE"); + madvise(pkt_buf, TAP_BUF_BYTES, MADV_HUGEPAGE); c.epollfd = epoll_create1(EPOLL_CLOEXEC); @@ -202,7 +202,6 @@ int repair_flush(struct ctx *c) * * Return: 0 on success, negative error code on failure */ -/* cppcheck-suppress unusedFunction */ int repair_set(struct ctx *c, int s, int cmd) { int rc; @@ -280,6 +280,7 @@ #include <stddef.h> #include <string.h> #include <sys/epoll.h> +#include <sys/ioctl.h> #include <sys/socket.h> #include <sys/timerfd.h> #include <sys/types.h> @@ -287,6 +288,8 @@ #include <time.h> #include <arpa/inet.h> +#include <linux/sockios.h> + #include "checksum.h" #include "util.h" #include "iov.h" @@ -299,6 +302,7 @@ #include "log.h" #include "inany.h" #include "flow.h" +#include "repair.h" #include "linux_dep.h" #include "flow_table.h" @@ -306,6 +310,21 @@ #include "tcp_buf.h" #include "tcp_vu.h" +#ifndef __USE_MISC +/* From Linux UAPI, missing in netinet/tcp.h provided by musl */ +struct tcp_repair_opt { + __u32 opt_code; + __u32 opt_val; +}; + +enum { + TCP_NO_QUEUE, + TCP_RECV_QUEUE, + TCP_SEND_QUEUE, + TCP_QUEUES_NR, +}; +#endif + /* MSS rounding: see SET_MSS() */ #define MSS_DEFAULT 536 #define WINDOW_DEFAULT 14600 /* RFC 6928 */ @@ -326,6 +345,19 @@ ((conn)->events & (SOCK_FIN_RCVD | TAP_FIN_RCVD))) #define CONN_HAS(conn, set) (((conn)->events & (set)) == (set)) +/* Buffers to migrate pending data from send and receive queues. No, they don't + * use memory if we don't use them. And we're going away after this, so splurge. + */ +#define TCP_MIGRATE_SND_QUEUE_MAX (64 << 20) +#define TCP_MIGRATE_RCV_QUEUE_MAX (64 << 20) +uint8_t tcp_migrate_snd_queue [TCP_MIGRATE_SND_QUEUE_MAX]; +uint8_t tcp_migrate_rcv_queue [TCP_MIGRATE_RCV_QUEUE_MAX]; + +#define TCP_MIGRATE_RESTORE_CHUNK_MIN 1024 /* Try smaller when above this */ + +/* "Extended" data (not stored in the flow table) for TCP flow migration */ +static struct tcp_tap_transfer_ext migrate_ext[FLOW_MAX]; + static const char *tcp_event_str[] __attribute((__unused__)) = { "SOCK_ACCEPTED", "TAP_SYN_RCVD", "ESTABLISHED", "TAP_SYN_ACK_SENT", @@ -1468,6 +1500,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af, conn->sock = s; conn->timer = -1; + conn->listening_sock = -1; conn_event(c, conn, TAP_SYN_RCVD); conn->wnd_to_tap = WINDOW_DEFAULT; @@ -1968,10 +2001,27 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af, ack_due = 1; if ((conn->events & TAP_FIN_RCVD) && !(conn->events & SOCK_FIN_SENT)) { + socklen_t sl; + struct tcp_info tinfo; + shutdown(conn->sock, SHUT_WR); conn_event(c, conn, SOCK_FIN_SENT); tcp_send_flag(c, conn, ACK); ack_due = 0; + + /* If we received a FIN, but the socket is in TCP_ESTABLISHED + * state, it must be a migrated socket. The kernel saw the FIN + * on the source socket, but not on the target socket. + * + * Approximate the effect of that FIN: as we're sending a FIN + * out ourselves, the socket is now in a state equivalent to + * LAST_ACK. Now that we sent the FIN out, close it with a RST. + */ + sl = sizeof(tinfo); + getsockopt(conn->sock, SOL_TCP, TCP_INFO, &tinfo, &sl); + if (tinfo.tcpi_state == TCP_ESTABLISHED && + conn->events & SOCK_FIN_RCVD) + goto reset; } if (ack_due) @@ -2054,6 +2104,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow, void tcp_listen_handler(const struct ctx *c, union epoll_ref ref, const struct timespec *now) { + struct tcp_tap_conn *conn; union sockaddr_inany sa; socklen_t sl = sizeof(sa); struct flowside *ini; @@ -2069,6 +2120,9 @@ void tcp_listen_handler(const struct ctx *c, union epoll_ref ref, if (s < 0) goto cancel; + conn = (struct tcp_tap_conn *)flow; + conn->listening_sock = ref.fd; + tcp_sock_set_nodelay(s); /* FIXME: If useful: when the listening port has a specific bound @@ -2634,3 +2688,868 @@ void tcp_timer(struct ctx *c, const struct timespec *now) if (c->mode == MODE_PASTA) tcp_splice_refill(c); } + +/** + * tcp_flow_is_established() - Was the connection established? Includes closing + * @conn: Pointer to the TCP connection structure + * + * Return: true if the connection was established, false otherwise + */ +bool tcp_flow_is_established(const struct tcp_tap_conn *conn) +{ + return conn->events & ESTABLISHED; +} + +/** + * tcp_flow_repair_on() - Enable repair mode for a single TCP flow + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn) +{ + int rc = 0; + + if ((rc = repair_set(c, conn->sock, TCP_REPAIR_ON))) + err("Failed to set TCP_REPAIR"); + + return rc; +} + +/** + * tcp_flow_repair_off() - Clear repair mode for a single TCP flow + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn) +{ + int rc = 0; + + if ((rc = repair_set(c, conn->sock, TCP_REPAIR_OFF))) + err("Failed to clear TCP_REPAIR"); + + return rc; +} + +/** + * tcp_flow_dump_tinfo() - Dump window scale, tcpi_state, tcpi_options + * @c: Execution context + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_dump_tinfo(int s, struct tcp_tap_transfer_ext *t) +{ + struct tcp_info tinfo; + socklen_t sl; + + sl = sizeof(tinfo); + if (getsockopt(s, SOL_TCP, TCP_INFO, &tinfo, &sl)) { + int rc = -errno; + err_perror("Querying TCP_INFO, socket %i", s); + return rc; + } + + t->snd_ws = tinfo.tcpi_snd_wscale; + t->rcv_ws = tinfo.tcpi_rcv_wscale; + t->tcpi_state = tinfo.tcpi_state; + t->tcpi_options = tinfo.tcpi_options; + + return 0; +} + +/** + * tcp_flow_dump_mss() - Dump MSS clamp (not current MSS) via TCP_MAXSEG + * @c: Execution context + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_dump_mss(int s, struct tcp_tap_transfer_ext *t) +{ + socklen_t sl = sizeof(t->mss); + + if (getsockopt(s, SOL_TCP, TCP_MAXSEG, &t->mss, &sl)) { + int rc = -errno; + err_perror("Getting MSS, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_dump_wnd() - Dump current tcp_repair_window parameters + * @c: Execution context + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_dump_wnd(int s, struct tcp_tap_transfer_ext *t) +{ + struct tcp_repair_window wnd; + socklen_t sl = sizeof(wnd); + + if (getsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, &sl)) { + int rc = -errno; + err_perror("Getting window repair data, socket %i", s); + return rc; + } + + t->snd_wl1 = wnd.snd_wl1; + t->snd_wnd = wnd.snd_wnd; + t->max_window = wnd.max_window; + t->rcv_wnd = wnd.rcv_wnd; + t->rcv_wup = wnd.rcv_wup; + + /* If we received a FIN, we also need to adjust window parameters. + * + * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state. + */ + if (t->tcpi_state == TCP_CLOSE_WAIT || t->tcpi_state == TCP_LAST_ACK) { + t->rcv_wup--; + t->rcv_wnd++; + } + + return 0; +} + +/** + * tcp_flow_repair_wnd() - Restore window parameters from extended data + * @c: Execution context + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_repair_wnd(int s, const struct tcp_tap_transfer_ext *t) +{ + struct tcp_repair_window wnd; + + wnd.snd_wl1 = t->snd_wl1; + wnd.snd_wnd = t->snd_wnd; + wnd.max_window = t->max_window; + wnd.rcv_wnd = t->rcv_wnd; + wnd.rcv_wup = t->rcv_wup; + + if (setsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, sizeof(wnd))) { + int rc = -errno; + err_perror("Setting window data, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_select_queue() - Select queue (receive or send) for next operation + * @s: Socket + * @queue: TCP_RECV_QUEUE or TCP_SEND_QUEUE + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_select_queue(int s, int queue) +{ + if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &queue, sizeof(queue))) { + int rc = -errno; + err_perror("Selecting TCP_SEND_QUEUE, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_dump_sndqueue() - Dump send queue, length of sent and not sent data + * @s: Socket + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + * + * #syscalls:vu ioctl + */ +static int tcp_flow_dump_sndqueue(int s, struct tcp_tap_transfer_ext *t) +{ + ssize_t rc; + + if (ioctl(s, SIOCOUTQ, &t->sndq) < 0) { + rc = -errno; + err_perror("Getting send queue size, socket %i", s); + return rc; + } + + if (ioctl(s, SIOCOUTQNSD, &t->notsent) < 0) { + rc = -errno; + err_perror("Getting not sent count, socket %i", s); + return rc; + } + + /* If we sent a FIN, SIOCOUTQ and SIOCOUTQNSD are one greater than the + * actual pending queue length, because they are based on the sequence + * numbers, not directly on the buffer contents. + * + * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state. + */ + if (t->tcpi_state == TCP_FIN_WAIT1 || t->tcpi_state == TCP_FIN_WAIT2 || + t->tcpi_state == TCP_LAST_ACK || t->tcpi_state == TCP_CLOSING) { + if (t->sndq) + t->sndq--; + if (t->notsent) + t->notsent--; + } + + if (t->notsent > t->sndq) { + err("Invalid notsent count socket %i, send: %u, not sent: %u", + s, t->sndq, t->notsent); + return -EINVAL; + } + + if (t->sndq > TCP_MIGRATE_SND_QUEUE_MAX) { + err("Send queue too large to migrate socket %i: %u bytes", + s, t->sndq); + return -ENOBUFS; + } + + rc = recv(s, tcp_migrate_snd_queue, + MIN(t->sndq, TCP_MIGRATE_SND_QUEUE_MAX), MSG_PEEK); + if (rc < 0) { + if (errno == EAGAIN) { /* EAGAIN means empty */ + rc = 0; + } else { + rc = -errno; + err_perror("Can't read send queue, socket %i", s); + return rc; + } + } + + if ((uint32_t)rc < t->sndq) { + err("Short read migrating send queue"); + return -ENXIO; + } + + t->notsent = MIN(t->notsent, t->sndq); + + return 0; +} + +/** + * tcp_flow_repair_queue() - Restore contents of a given (pre-selected) queue + * @s: Socket + * @len: Length of data to be restored + * @buf: Buffer with content of pending data queue + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_repair_queue(int s, size_t len, uint8_t *buf) +{ + size_t chunk = len; + uint8_t *p = buf; + + while (len > 0) { + ssize_t rc = send(s, p, MIN(len, chunk), 0); + + if (rc < 0) { + if ((errno == ENOBUFS || errno == ENOMEM) && + chunk >= TCP_MIGRATE_RESTORE_CHUNK_MIN) { + chunk /= 2; + continue; + } + + rc = -errno; + err_perror("Can't write queue, socket %i", s); + return rc; + } + + len -= rc; + p += rc; + } + + return 0; +} + +/** + * tcp_flow_dump_seq() - Dump current sequence of pre-selected queue + * @s: Socket + * @v: Sequence value, set on return + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_dump_seq(int s, uint32_t *v) +{ + socklen_t sl = sizeof(*v); + + if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, &sl)) { + int rc = -errno; + err_perror("Dumping sequence, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_repair_seq() - Restore sequence for pre-selected queue + * @s: Socket + * @v: Sequence value to be set + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_repair_seq(int s, const uint32_t *v) +{ + if (setsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, sizeof(*v))) { + int rc = -errno; + err_perror("Setting sequence, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_dump_rcvqueue() - Dump receive queue and its length, seal/block it + * @s: Socket + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + * + * #syscalls:vu ioctl + */ +static int tcp_flow_dump_rcvqueue(int s, struct tcp_tap_transfer_ext *t) +{ + ssize_t rc; + + if (ioctl(s, SIOCINQ, &t->rcvq) < 0) { + rc = -errno; + err_perror("Get receive queue size, socket %i", s); + return rc; + } + + /* If we received a FIN, SIOCINQ is one greater than the actual number + * of bytes on the queue, because it's based on the sequence number + * rather than directly on the buffer contents. + * + * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state. + */ + if (t->rcvq && + (t->tcpi_state == TCP_CLOSE_WAIT || t->tcpi_state == TCP_LAST_ACK)) + t->rcvq--; + + if (t->rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) { + err("Receive queue too large to migrate socket %i: %u bytes", + s, t->rcvq); + return -ENOBUFS; + } + + rc = recv(s, tcp_migrate_rcv_queue, t->rcvq, MSG_PEEK); + if (rc < 0) { + if (errno == EAGAIN) { /* EAGAIN means empty */ + rc = 0; + } else { + rc = -errno; + err_perror("Can't read receive queue for socket %i", s); + return rc; + } + } + + if ((uint32_t)rc < t->rcvq) { + err("Short read migrating receive queue"); + return -ENXIO; + } + + return 0; +} + +/** + * tcp_flow_repair_opt() - Set repair "options" (MSS, scale, SACK, timestamps) + * @s: Socket + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_opt(int s, const struct tcp_tap_transfer_ext *t) +{ + const struct tcp_repair_opt opts[] = { + { TCPOPT_WINDOW, t->snd_ws + (t->rcv_ws << 16) }, + { TCPOPT_MAXSEG, t->mss }, + { TCPOPT_SACK_PERMITTED, 0 }, + { TCPOPT_TIMESTAMP, 0 }, + }; + socklen_t sl; + + sl = sizeof(opts[0]) * (2 + + !!(t->tcpi_options & TCPI_OPT_SACK) + + !!(t->tcpi_options & TCPI_OPT_TIMESTAMPS)); + + if (setsockopt(s, SOL_TCP, TCP_REPAIR_OPTIONS, opts, sl)) { + int rc = -errno; + err_perror("Setting repair options, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_migrate_source() - Send data (flow table) for flow, close listening + * @fd: Descriptor for state migration + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_migrate_source(int fd, struct tcp_tap_conn *conn) +{ + struct tcp_tap_transfer t = { + .retrans = conn->retrans, + .ws_from_tap = conn->ws_from_tap, + .ws_to_tap = conn->ws_to_tap, + .events = conn->events, + + .tap_mss = htonl(MSS_GET(conn)), + + .sndbuf = htonl(conn->sndbuf), + + .flags = conn->flags, + .seq_dup_ack_approx = conn->seq_dup_ack_approx, + + .wnd_from_tap = htons(conn->wnd_from_tap), + .wnd_to_tap = htons(conn->wnd_to_tap), + + .seq_to_tap = htonl(conn->seq_to_tap), + .seq_ack_from_tap = htonl(conn->seq_ack_from_tap), + .seq_from_tap = htonl(conn->seq_from_tap), + .seq_ack_to_tap = htonl(conn->seq_ack_to_tap), + .seq_init_from_tap = htonl(conn->seq_init_from_tap), + }; + + memcpy(&t.pif, conn->f.pif, sizeof(t.pif)); + memcpy(&t.side, conn->f.side, sizeof(t.side)); + + if (write_all_buf(fd, &t, sizeof(t))) { + int rc = -errno; + err_perror("Can't write migration data, socket %i", conn->sock); + return rc; + } + + if (conn->listening_sock != -1 && !fcntl(conn->listening_sock, F_GETFD)) + close(conn->listening_sock); + + return 0; +} + +/** + * tcp_flow_migrate_source_ext() - Dump queues, close sockets, send final data + * @fd: Descriptor for state migration + * @fidx: Flow index + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative (not -EIO) on failure, -EIO on sending failure + */ +int tcp_flow_migrate_source_ext(int fd, int fidx, + const struct tcp_tap_conn *conn) +{ + uint32_t peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap; + struct tcp_tap_transfer_ext *t = &migrate_ext[fidx]; + int s = conn->sock; + int rc; + + /* Disable SO_PEEK_OFF, it will make accessing the queues in repair mode + * weird. + */ + if (tcp_set_peek_offset(s, -1)) { + rc = -errno; + goto fail; + } + + if ((rc = tcp_flow_dump_tinfo(s, t))) + goto fail; + + if ((rc = tcp_flow_dump_mss(s, t))) + goto fail; + + if ((rc = tcp_flow_dump_wnd(s, t))) + goto fail; + + if ((rc = tcp_flow_select_queue(s, TCP_SEND_QUEUE))) + goto fail; + + if ((rc = tcp_flow_dump_sndqueue(s, t))) + goto fail; + + if ((rc = tcp_flow_dump_seq(s, &t->seq_snd))) + goto fail; + + if ((rc = tcp_flow_select_queue(s, TCP_RECV_QUEUE))) + goto fail; + + if ((rc = tcp_flow_dump_rcvqueue(s, t))) + goto fail; + + if ((rc = tcp_flow_dump_seq(s, &t->seq_rcv))) + goto fail; + + close(s); + + /* Adjustments unrelated to FIN segments: sequence numbers we dumped are + * based on the end of the queues. + */ + t->seq_rcv -= t->rcvq; + t->seq_snd -= t->sndq; + + debug("Extended migration data, socket %i sequences send %u receive %u", + s, t->seq_snd, t->seq_rcv); + debug(" pending queues: send %u not sent %u receive %u", + t->sndq, t->notsent, t->rcvq); + debug(" window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u", + t->snd_wl1, t->snd_wnd, t->max_window, t->rcv_wnd, t->rcv_wup); + debug(" SO_PEEK_OFF %s offset=%"PRIu32, + peek_offset_cap ? "enabled" : "disabled", peek_offset); + + /* Endianness fix-ups */ + t->seq_snd = htonl(t->seq_snd); + t->seq_rcv = htonl(t->seq_rcv); + t->sndq = htonl(t->sndq); + t->notsent = htonl(t->notsent); + t->rcvq = htonl(t->rcvq); + + t->snd_wl1 = htonl(t->snd_wl1); + t->snd_wnd = htonl(t->snd_wnd); + t->max_window = htonl(t->max_window); + t->rcv_wnd = htonl(t->rcv_wnd); + t->rcv_wup = htonl(t->rcv_wup); + + if (write_all_buf(fd, t, sizeof(*t))) { + err_perror("Failed to write extended data, socket %i", s); + return -EIO; + } + + if (write_all_buf(fd, tcp_migrate_snd_queue, ntohl(t->sndq))) { + err_perror("Failed to write send queue data, socket %i", s); + return -EIO; + } + + if (write_all_buf(fd, tcp_migrate_rcv_queue, ntohl(t->rcvq))) { + err_perror("Failed to write receive queue data, socket %i", s); + return -EIO; + } + + return 0; + +fail: + /* For any type of failure dumping data, write an invalid extended data + * descriptor that allows us to keep the stream in sync, but tells the + * target to skip the flow. If we fail to transfer data, that's fatal: + * return -EIO in that case (and only in that case). + */ + t->tcpi_state = 0; /* Not defined: tell the target to skip this flow */ + + if (write_all_buf(fd, t, sizeof(*t))) { + err_perror("Failed to write extended data, socket %i", s); + return -EIO; + } + + if (rc == -EIO) /* but not a migration data transfer failure */ + return -ENODATA; + + return rc; +} + +/** + * tcp_flow_repair_socket() - Open and bind socket, request repair mode + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_socket(struct ctx *c, struct tcp_tap_conn *conn) +{ + sa_family_t af = CONN_V4(conn) ? AF_INET : AF_INET6; + const struct flowside *sockside = HOSTFLOW(conn); + union sockaddr_inany a; + socklen_t sl; + int s, rc; + + pif_sockaddr(c, &a, &sl, PIF_HOST, &sockside->oaddr, sockside->oport); + + if ((conn->sock = socket(af, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, + IPPROTO_TCP)) < 0) { + rc = -errno; + err_perror("Failed to create socket for migrated flow"); + return rc; + } + s = conn->sock; + + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int))) + debug_perror("Setting SO_REUSEADDR on socket %i", s); + + tcp_sock_set_nodelay(s); + + if ((rc = bind(s, &a.sa, sizeof(a)))) { + err_perror("Failed to bind socket for migrated flow"); + goto err; + } + + if ((rc = tcp_flow_repair_on(c, conn))) + goto err; + + return 0; + +err: + close(s); + conn->sock = -1; + return rc; +} + +/** + * tcp_flow_repair_connect() - Connect socket in repair mode, then turn it off + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_repair_connect(const struct ctx *c, + struct tcp_tap_conn *conn) +{ + const struct flowside *tgt = HOSTFLOW(conn); + int rc; + + rc = flowside_connect(c, conn->sock, PIF_HOST, tgt); + if (rc) { + rc = -errno; + err_perror("Failed to connect migrated socket %i", conn->sock); + return rc; + } + + conn->in_epoll = 0; + conn->timer = -1; + conn->listening_sock = -1; + + return 0; +} + +/** + * tcp_flow_migrate_target() - Receive data (flow table part) for flow, insert + * @c: Execution context + * @fd: Descriptor for state migration + * + * Return: 0 on success, negative on fatal failure, but 0 on single flow failure + */ +int tcp_flow_migrate_target(struct ctx *c, int fd) +{ + struct tcp_tap_transfer t; + struct tcp_tap_conn *conn; + union flow *flow; + int rc; + + if (!(flow = flow_alloc())) { + err("Flow table full on migration target"); + return 0; + } + + if (read_all_buf(fd, &t, sizeof(t))) { + flow_alloc_cancel(flow); + err_perror("Failed to receive migration data"); + return -errno; + } + + flow->f.state = FLOW_STATE_TGT; + memcpy(&flow->f.pif, &t.pif, sizeof(flow->f.pif)); + memcpy(&flow->f.side, &t.side, sizeof(flow->f.side)); + conn = FLOW_SET_TYPE(flow, FLOW_TCP, tcp); + + conn->retrans = t.retrans; + conn->ws_from_tap = t.ws_from_tap; + conn->ws_to_tap = t.ws_to_tap; + conn->events = t.events; + + conn->sndbuf = htonl(t.sndbuf); + + conn->flags = t.flags; + conn->seq_dup_ack_approx = t.seq_dup_ack_approx; + + MSS_SET(conn, ntohl(t.tap_mss)); + + conn->wnd_from_tap = ntohs(t.wnd_from_tap); + conn->wnd_to_tap = ntohs(t.wnd_to_tap); + + conn->seq_to_tap = ntohl(t.seq_to_tap); + conn->seq_ack_from_tap = ntohl(t.seq_ack_from_tap); + conn->seq_from_tap = ntohl(t.seq_from_tap); + conn->seq_ack_to_tap = ntohl(t.seq_ack_to_tap); + conn->seq_init_from_tap = ntohl(t.seq_init_from_tap); + + if ((rc = tcp_flow_repair_socket(c, conn))) { + flow_err(flow, "Can't set up socket: %s, drop", strerror_(rc)); + flow_alloc_cancel(flow); + return 0; + } + + flow_hash_insert(c, TAP_SIDX(conn)); + FLOW_ACTIVATE(conn); + + return 0; +} + +/** + * tcp_flow_migrate_target_ext() - Receive extended data for flow, set, connect + * @c: Execution context + * @flow: Existing flow for this connection data + * @fd: Descriptor for state migration + * + * Return: 0 on success, negative on fatal failure, but 0 on single flow failure + */ +int tcp_flow_migrate_target_ext(struct ctx *c, union flow *flow, int fd) +{ + struct tcp_tap_conn *conn = &flow->tcp; + uint32_t peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap; + struct tcp_tap_transfer_ext t; + int s = conn->sock, rc; + + if (read_all_buf(fd, &t, sizeof(t))) { + rc = -errno; + err_perror("Failed to read extended data for socket %i", s); + return rc; + } + + if (!t.tcpi_state) { /* Source wants us to skip this flow */ + flow_err(flow, "Dropping as requested by source"); + goto fail; + } + + /* Endianness fix-ups */ + t.seq_snd = ntohl(t.seq_snd); + t.seq_rcv = ntohl(t.seq_rcv); + t.sndq = ntohl(t.sndq); + t.notsent = ntohl(t.notsent); + t.rcvq = ntohl(t.rcvq); + + t.snd_wl1 = ntohl(t.snd_wl1); + t.snd_wnd = ntohl(t.snd_wnd); + t.max_window = ntohl(t.max_window); + t.rcv_wnd = ntohl(t.rcv_wnd); + t.rcv_wup = ntohl(t.rcv_wup); + + debug("Extended migration data, socket %i sequences send %u receive %u", + s, t.seq_snd, t.seq_rcv); + debug(" pending queues: send %u not sent %u receive %u", + t.sndq, t.notsent, t.rcvq); + debug(" window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u", + t.snd_wl1, t.snd_wnd, t.max_window, t.rcv_wnd, t.rcv_wup); + debug(" SO_PEEK_OFF %s offset=%"PRIu32, + peek_offset_cap ? "enabled" : "disabled", peek_offset); + + if (t.sndq > TCP_MIGRATE_SND_QUEUE_MAX || t.notsent > t.sndq || + t.rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) { + err("Bad queues socket %i, send: %u, not sent: %u, receive: %u", + s, t.sndq, t.notsent, t.rcvq); + return -EINVAL; + } + + if (read_all_buf(fd, tcp_migrate_snd_queue, t.sndq)) { + rc = -errno; + err_perror("Failed to read send queue data, socket %i", s); + return rc; + } + + if (read_all_buf(fd, tcp_migrate_rcv_queue, t.rcvq)) { + rc = -errno; + err_perror("Failed to read receive queue data, socket %i", s); + return rc; + } + + if (tcp_flow_select_queue(s, TCP_SEND_QUEUE)) + goto fail; + + if (tcp_flow_repair_seq(s, &t.seq_snd)) + goto fail; + + if (tcp_flow_select_queue(s, TCP_RECV_QUEUE)) + goto fail; + + if (tcp_flow_repair_seq(s, &t.seq_rcv)) + goto fail; + + if (tcp_flow_repair_connect(c, conn)) + goto fail; + + if (tcp_flow_repair_queue(s, t.rcvq, tcp_migrate_rcv_queue)) + goto fail; + + if (tcp_flow_select_queue(s, TCP_SEND_QUEUE)) + goto fail; + + if (tcp_flow_repair_queue(s, t.sndq - t.notsent, + tcp_migrate_snd_queue)) + goto fail; + + if (tcp_flow_repair_opt(s, &t)) + goto fail; + + /* If we sent a FIN sent and it was acknowledged (TCP_FIN_WAIT2), don't + * send it out, because we already sent it for sure. + * + * Call shutdown(x, SHUT_WR) in repair mode, so that we move to + * FIN_WAIT_1 (tcp_shutdown()) without sending anything + * (goto in tcp_write_xmit()). + */ + if (t.tcpi_state == TCP_FIN_WAIT2) { + int v; + + v = TCP_SEND_QUEUE; + if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, sizeof(v))) + debug_perror("Selecting repair queue, socket %i", s); + else + shutdown(s, SHUT_WR); + } + + if (tcp_flow_repair_wnd(s, &t)) + goto fail; + + tcp_flow_repair_off(c, conn); + repair_flush(c); + + if (t.notsent) { + if (tcp_flow_repair_queue(s, t.notsent, + tcp_migrate_snd_queue + + (t.sndq - t.notsent))) { + /* This sometimes seems to fail for unclear reasons. + * Don't fail the whole migration, just reset the flow + * and carry on to the next one. + */ + goto fail; + } + } + + /* If we sent a FIN but it wasn't acknowledged yet (TCP_FIN_WAIT1), send + * it out, because we don't know if we already sent it. + * + * Call shutdown(x, SHUT_WR) *not* in repair mode, which moves us to + * TCP_FIN_WAIT1. + */ + if (t.tcpi_state == TCP_FIN_WAIT1) + shutdown(s, SHUT_WR); + + if (tcp_set_peek_offset(conn->sock, peek_offset)) + goto fail; + + tcp_send_flag(c, conn, ACK); + tcp_data_from_sock(c, conn); + + if ((rc = tcp_epoll_ctl(c, conn))) { + debug("Failed to subscribe to epoll for migrated socket %i: %s", + conn->sock, strerror_(-rc)); + goto fail; + } + + return 0; + +fail: + tcp_flow_repair_off(c, conn); + repair_flush(c); + + conn->flags = 0; /* Not waiting for ACK, don't schedule timer */ + tcp_rst(c, conn); + + return 0; +} @@ -19,6 +19,7 @@ * @tap_mss: MSS advertised by tap/guest, rounded to 2 ^ TCP_MSS_BITS * @sock: Socket descriptor number * @events: Connection events, implying connection states + * @listening_sock: Listening socket this socket was accept()ed from, or -1 * @timer: timerfd descriptor for timeout events * @flags: Connection flags representing internal attributes * @sndbuf: Sending buffer in kernel, rounded to 2 ^ SNDBUF_BITS @@ -68,6 +69,7 @@ struct tcp_tap_conn { #define CONN_STATE_BITS /* Setting these clears other flags */ \ (SOCK_ACCEPTED | TAP_SYN_RCVD | ESTABLISHED) + int listening_sock; int timer :FD_REF_BITS; @@ -97,6 +99,93 @@ struct tcp_tap_conn { }; /** + * struct tcp_tap_transfer - Migrated TCP data, flow table part, network order + * @pif: Interfaces for each side of the flow + * @side: Addresses and ports for each side of the flow + * @retrans: Number of retransmissions occurred due to ACK_TIMEOUT + * @ws_from_tap: Window scaling factor advertised from tap/guest + * @ws_to_tap: Window scaling factor advertised to tap/guest + * @events: Connection events, implying connection states + * @tap_mss: MSS advertised by tap/guest, rounded to 2 ^ TCP_MSS_BITS + * @sndbuf: Sending buffer in kernel, rounded to 2 ^ SNDBUF_BITS + * @flags: Connection flags representing internal attributes + * @seq_dup_ack_approx: Last duplicate ACK number sent to tap + * @wnd_from_tap: Last window size from tap, unscaled (as received) + * @wnd_to_tap: Sending window advertised to tap, unscaled (as sent) + * @seq_to_tap: Next sequence for packets to tap + * @seq_ack_from_tap: Last ACK number received from tap + * @seq_from_tap: Next sequence for packets from tap (not actually sent) + * @seq_ack_to_tap: Last ACK number sent to tap + * @seq_init_from_tap: Initial sequence number from tap +*/ +struct tcp_tap_transfer { + uint8_t pif[SIDES]; + struct flowside side[SIDES]; + + uint8_t retrans; + uint8_t ws_from_tap; + uint8_t ws_to_tap; + uint8_t events; + + uint32_t tap_mss; + + uint32_t sndbuf; + + uint8_t flags; + uint8_t seq_dup_ack_approx; + + uint16_t wnd_from_tap; + uint16_t wnd_to_tap; + + uint32_t seq_to_tap; + uint32_t seq_ack_from_tap; + uint32_t seq_from_tap; + uint32_t seq_ack_to_tap; + uint32_t seq_init_from_tap; +} __attribute__((packed, aligned(__alignof__(uint32_t)))); + +/** + * struct tcp_tap_transfer_ext - Migrated TCP data, outside flow, network order + * @seq_snd: Socket-side send sequence + * @seq_rcv: Socket-side receive sequence + * @sndq: Length of pending send queue (unacknowledged / not sent) + * @notsent: Part of pending send queue that wasn't sent out yet + * @rcvq: Length of pending receive queue + * @mss: Socket-side MSS clamp + * @snd_wl1: Next sequence used in window probe (next sequence - 1) + * @snd_wnd: Socket-side sending window + * @max_window: Window clamp + * @rcv_wnd: Socket-side receive window + * @rcv_wup: rcv_nxt on last window update sent + * @snd_ws: Window scaling factor, send + * @rcv_ws: Window scaling factor, receive + * @tcpi_state: Connection state in TCP_INFO style (enum, tcp_states.h) + * @tcpi_options: TCPI_OPT_* constants (timestamps, selective ACK) + */ +struct tcp_tap_transfer_ext { + uint32_t seq_snd; + uint32_t seq_rcv; + + uint32_t sndq; + uint32_t notsent; + uint32_t rcvq; + + uint32_t mss; + + /* We can't just use struct tcp_repair_window: we need network order */ + uint32_t snd_wl1; + uint32_t snd_wnd; + uint32_t max_window; + uint32_t rcv_wnd; + uint32_t rcv_wup; + + uint8_t snd_ws; + uint8_t rcv_ws; + uint8_t tcpi_state; + uint8_t tcpi_options; +} __attribute__((packed, aligned(__alignof__(uint32_t)))); + +/** * struct tcp_splice_conn - Descriptor for a spliced TCP connection * @f: Generic flow information * @s: File descriptor for sockets @@ -140,6 +229,20 @@ extern int init_sock_pool4 [TCP_SOCK_POOL_SIZE]; extern int init_sock_pool6 [TCP_SOCK_POOL_SIZE]; bool tcp_flow_defer(const struct tcp_tap_conn *conn); + +int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn); +int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn); + +int tcp_flow_migrate_shrink_window(int fidx, const struct tcp_tap_conn *conn); +int tcp_flow_migrate_source(int fd, struct tcp_tap_conn *conn); +int tcp_flow_migrate_source_ext(int fd, int fidx, + const struct tcp_tap_conn *conn); + +int tcp_flow_migrate_target(struct ctx *c, int fd); +int tcp_flow_migrate_target_ext(struct ctx *c, union flow *flow, int fd); + +bool tcp_flow_is_established(const struct tcp_tap_conn *conn); + bool tcp_splice_flow_defer(struct tcp_splice_conn *conn); void tcp_splice_timer(const struct ctx *c, struct tcp_splice_conn *conn); int tcp_conn_pool_sock(int pool[]); |