diff options
Diffstat (limited to 'tcp.c')
-rw-r--r-- | tcp.c | 919 |
1 files changed, 919 insertions, 0 deletions
@@ -280,6 +280,7 @@ #include <stddef.h> #include <string.h> #include <sys/epoll.h> +#include <sys/ioctl.h> #include <sys/socket.h> #include <sys/timerfd.h> #include <sys/types.h> @@ -287,6 +288,8 @@ #include <time.h> #include <arpa/inet.h> +#include <linux/sockios.h> + #include "checksum.h" #include "util.h" #include "iov.h" @@ -299,6 +302,7 @@ #include "log.h" #include "inany.h" #include "flow.h" +#include "repair.h" #include "linux_dep.h" #include "flow_table.h" @@ -306,6 +310,21 @@ #include "tcp_buf.h" #include "tcp_vu.h" +#ifndef __USE_MISC +/* From Linux UAPI, missing in netinet/tcp.h provided by musl */ +struct tcp_repair_opt { + __u32 opt_code; + __u32 opt_val; +}; + +enum { + TCP_NO_QUEUE, + TCP_RECV_QUEUE, + TCP_SEND_QUEUE, + TCP_QUEUES_NR, +}; +#endif + /* MSS rounding: see SET_MSS() */ #define MSS_DEFAULT 536 #define WINDOW_DEFAULT 14600 /* RFC 6928 */ @@ -326,6 +345,19 @@ ((conn)->events & (SOCK_FIN_RCVD | TAP_FIN_RCVD))) #define CONN_HAS(conn, set) (((conn)->events & (set)) == (set)) +/* Buffers to migrate pending data from send and receive queues. No, they don't + * use memory if we don't use them. And we're going away after this, so splurge. + */ +#define TCP_MIGRATE_SND_QUEUE_MAX (64 << 20) +#define TCP_MIGRATE_RCV_QUEUE_MAX (64 << 20) +uint8_t tcp_migrate_snd_queue [TCP_MIGRATE_SND_QUEUE_MAX]; +uint8_t tcp_migrate_rcv_queue [TCP_MIGRATE_RCV_QUEUE_MAX]; + +#define TCP_MIGRATE_RESTORE_CHUNK_MIN 1024 /* Try smaller when above this */ + +/* "Extended" data (not stored in the flow table) for TCP flow migration */ +static struct tcp_tap_transfer_ext migrate_ext[FLOW_MAX]; + static const char *tcp_event_str[] __attribute((__unused__)) = { "SOCK_ACCEPTED", "TAP_SYN_RCVD", "ESTABLISHED", "TAP_SYN_ACK_SENT", @@ -1468,6 +1500,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af, conn->sock = s; conn->timer = -1; + conn->listening_sock = -1; conn_event(c, conn, TAP_SYN_RCVD); conn->wnd_to_tap = WINDOW_DEFAULT; @@ -1968,10 +2001,27 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af, ack_due = 1; if ((conn->events & TAP_FIN_RCVD) && !(conn->events & SOCK_FIN_SENT)) { + socklen_t sl; + struct tcp_info tinfo; + shutdown(conn->sock, SHUT_WR); conn_event(c, conn, SOCK_FIN_SENT); tcp_send_flag(c, conn, ACK); ack_due = 0; + + /* If we received a FIN, but the socket is in TCP_ESTABLISHED + * state, it must be a migrated socket. The kernel saw the FIN + * on the source socket, but not on the target socket. + * + * Approximate the effect of that FIN: as we're sending a FIN + * out ourselves, the socket is now in a state equivalent to + * LAST_ACK. Now that we sent the FIN out, close it with a RST. + */ + sl = sizeof(tinfo); + getsockopt(conn->sock, SOL_TCP, TCP_INFO, &tinfo, &sl); + if (tinfo.tcpi_state == TCP_ESTABLISHED && + conn->events & SOCK_FIN_RCVD) + goto reset; } if (ack_due) @@ -2054,6 +2104,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow, void tcp_listen_handler(const struct ctx *c, union epoll_ref ref, const struct timespec *now) { + struct tcp_tap_conn *conn; union sockaddr_inany sa; socklen_t sl = sizeof(sa); struct flowside *ini; @@ -2069,6 +2120,9 @@ void tcp_listen_handler(const struct ctx *c, union epoll_ref ref, if (s < 0) goto cancel; + conn = (struct tcp_tap_conn *)flow; + conn->listening_sock = ref.fd; + tcp_sock_set_nodelay(s); /* FIXME: If useful: when the listening port has a specific bound @@ -2634,3 +2688,868 @@ void tcp_timer(struct ctx *c, const struct timespec *now) if (c->mode == MODE_PASTA) tcp_splice_refill(c); } + +/** + * tcp_flow_is_established() - Was the connection established? Includes closing + * @conn: Pointer to the TCP connection structure + * + * Return: true if the connection was established, false otherwise + */ +bool tcp_flow_is_established(const struct tcp_tap_conn *conn) +{ + return conn->events & ESTABLISHED; +} + +/** + * tcp_flow_repair_on() - Enable repair mode for a single TCP flow + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn) +{ + int rc = 0; + + if ((rc = repair_set(c, conn->sock, TCP_REPAIR_ON))) + err("Failed to set TCP_REPAIR"); + + return rc; +} + +/** + * tcp_flow_repair_off() - Clear repair mode for a single TCP flow + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn) +{ + int rc = 0; + + if ((rc = repair_set(c, conn->sock, TCP_REPAIR_OFF))) + err("Failed to clear TCP_REPAIR"); + + return rc; +} + +/** + * tcp_flow_dump_tinfo() - Dump window scale, tcpi_state, tcpi_options + * @c: Execution context + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_dump_tinfo(int s, struct tcp_tap_transfer_ext *t) +{ + struct tcp_info tinfo; + socklen_t sl; + + sl = sizeof(tinfo); + if (getsockopt(s, SOL_TCP, TCP_INFO, &tinfo, &sl)) { + int rc = -errno; + err_perror("Querying TCP_INFO, socket %i", s); + return rc; + } + + t->snd_ws = tinfo.tcpi_snd_wscale; + t->rcv_ws = tinfo.tcpi_rcv_wscale; + t->tcpi_state = tinfo.tcpi_state; + t->tcpi_options = tinfo.tcpi_options; + + return 0; +} + +/** + * tcp_flow_dump_mss() - Dump MSS clamp (not current MSS) via TCP_MAXSEG + * @c: Execution context + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_dump_mss(int s, struct tcp_tap_transfer_ext *t) +{ + socklen_t sl = sizeof(t->mss); + + if (getsockopt(s, SOL_TCP, TCP_MAXSEG, &t->mss, &sl)) { + int rc = -errno; + err_perror("Getting MSS, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_dump_wnd() - Dump current tcp_repair_window parameters + * @c: Execution context + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_dump_wnd(int s, struct tcp_tap_transfer_ext *t) +{ + struct tcp_repair_window wnd; + socklen_t sl = sizeof(wnd); + + if (getsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, &sl)) { + int rc = -errno; + err_perror("Getting window repair data, socket %i", s); + return rc; + } + + t->snd_wl1 = wnd.snd_wl1; + t->snd_wnd = wnd.snd_wnd; + t->max_window = wnd.max_window; + t->rcv_wnd = wnd.rcv_wnd; + t->rcv_wup = wnd.rcv_wup; + + /* If we received a FIN, we also need to adjust window parameters. + * + * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state. + */ + if (t->tcpi_state == TCP_CLOSE_WAIT || t->tcpi_state == TCP_LAST_ACK) { + t->rcv_wup--; + t->rcv_wnd++; + } + + return 0; +} + +/** + * tcp_flow_repair_wnd() - Restore window parameters from extended data + * @c: Execution context + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_repair_wnd(int s, const struct tcp_tap_transfer_ext *t) +{ + struct tcp_repair_window wnd; + + wnd.snd_wl1 = t->snd_wl1; + wnd.snd_wnd = t->snd_wnd; + wnd.max_window = t->max_window; + wnd.rcv_wnd = t->rcv_wnd; + wnd.rcv_wup = t->rcv_wup; + + if (setsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, sizeof(wnd))) { + int rc = -errno; + err_perror("Setting window data, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_select_queue() - Select queue (receive or send) for next operation + * @s: Socket + * @queue: TCP_RECV_QUEUE or TCP_SEND_QUEUE + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_select_queue(int s, int queue) +{ + if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &queue, sizeof(queue))) { + int rc = -errno; + err_perror("Selecting TCP_SEND_QUEUE, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_dump_sndqueue() - Dump send queue, length of sent and not sent data + * @s: Socket + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + * + * #syscalls:vu ioctl + */ +static int tcp_flow_dump_sndqueue(int s, struct tcp_tap_transfer_ext *t) +{ + ssize_t rc; + + if (ioctl(s, SIOCOUTQ, &t->sndq) < 0) { + rc = -errno; + err_perror("Getting send queue size, socket %i", s); + return rc; + } + + if (ioctl(s, SIOCOUTQNSD, &t->notsent) < 0) { + rc = -errno; + err_perror("Getting not sent count, socket %i", s); + return rc; + } + + /* If we sent a FIN, SIOCOUTQ and SIOCOUTQNSD are one greater than the + * actual pending queue length, because they are based on the sequence + * numbers, not directly on the buffer contents. + * + * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state. + */ + if (t->tcpi_state == TCP_FIN_WAIT1 || t->tcpi_state == TCP_FIN_WAIT2 || + t->tcpi_state == TCP_LAST_ACK || t->tcpi_state == TCP_CLOSING) { + if (t->sndq) + t->sndq--; + if (t->notsent) + t->notsent--; + } + + if (t->notsent > t->sndq) { + err("Invalid notsent count socket %i, send: %u, not sent: %u", + s, t->sndq, t->notsent); + return -EINVAL; + } + + if (t->sndq > TCP_MIGRATE_SND_QUEUE_MAX) { + err("Send queue too large to migrate socket %i: %u bytes", + s, t->sndq); + return -ENOBUFS; + } + + rc = recv(s, tcp_migrate_snd_queue, + MIN(t->sndq, TCP_MIGRATE_SND_QUEUE_MAX), MSG_PEEK); + if (rc < 0) { + if (errno == EAGAIN) { /* EAGAIN means empty */ + rc = 0; + } else { + rc = -errno; + err_perror("Can't read send queue, socket %i", s); + return rc; + } + } + + if ((uint32_t)rc < t->sndq) { + err("Short read migrating send queue"); + return -ENXIO; + } + + t->notsent = MIN(t->notsent, t->sndq); + + return 0; +} + +/** + * tcp_flow_repair_queue() - Restore contents of a given (pre-selected) queue + * @s: Socket + * @len: Length of data to be restored + * @buf: Buffer with content of pending data queue + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_repair_queue(int s, size_t len, uint8_t *buf) +{ + size_t chunk = len; + uint8_t *p = buf; + + while (len > 0) { + ssize_t rc = send(s, p, MIN(len, chunk), 0); + + if (rc < 0) { + if ((errno == ENOBUFS || errno == ENOMEM) && + chunk >= TCP_MIGRATE_RESTORE_CHUNK_MIN) { + chunk /= 2; + continue; + } + + rc = -errno; + err_perror("Can't write queue, socket %i", s); + return rc; + } + + len -= rc; + p += rc; + } + + return 0; +} + +/** + * tcp_flow_dump_seq() - Dump current sequence of pre-selected queue + * @s: Socket + * @v: Sequence value, set on return + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_dump_seq(int s, uint32_t *v) +{ + socklen_t sl = sizeof(*v); + + if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, &sl)) { + int rc = -errno; + err_perror("Dumping sequence, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_repair_seq() - Restore sequence for pre-selected queue + * @s: Socket + * @v: Sequence value to be set + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_repair_seq(int s, const uint32_t *v) +{ + if (setsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, sizeof(*v))) { + int rc = -errno; + err_perror("Setting sequence, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_dump_rcvqueue() - Dump receive queue and its length, seal/block it + * @s: Socket + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + * + * #syscalls:vu ioctl + */ +static int tcp_flow_dump_rcvqueue(int s, struct tcp_tap_transfer_ext *t) +{ + ssize_t rc; + + if (ioctl(s, SIOCINQ, &t->rcvq) < 0) { + rc = -errno; + err_perror("Get receive queue size, socket %i", s); + return rc; + } + + /* If we received a FIN, SIOCINQ is one greater than the actual number + * of bytes on the queue, because it's based on the sequence number + * rather than directly on the buffer contents. + * + * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state. + */ + if (t->rcvq && + (t->tcpi_state == TCP_CLOSE_WAIT || t->tcpi_state == TCP_LAST_ACK)) + t->rcvq--; + + if (t->rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) { + err("Receive queue too large to migrate socket %i: %u bytes", + s, t->rcvq); + return -ENOBUFS; + } + + rc = recv(s, tcp_migrate_rcv_queue, t->rcvq, MSG_PEEK); + if (rc < 0) { + if (errno == EAGAIN) { /* EAGAIN means empty */ + rc = 0; + } else { + rc = -errno; + err_perror("Can't read receive queue for socket %i", s); + return rc; + } + } + + if ((uint32_t)rc < t->rcvq) { + err("Short read migrating receive queue"); + return -ENXIO; + } + + return 0; +} + +/** + * tcp_flow_repair_opt() - Set repair "options" (MSS, scale, SACK, timestamps) + * @s: Socket + * @t: Extended migration data + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_opt(int s, const struct tcp_tap_transfer_ext *t) +{ + const struct tcp_repair_opt opts[] = { + { TCPOPT_WINDOW, t->snd_ws + (t->rcv_ws << 16) }, + { TCPOPT_MAXSEG, t->mss }, + { TCPOPT_SACK_PERMITTED, 0 }, + { TCPOPT_TIMESTAMP, 0 }, + }; + socklen_t sl; + + sl = sizeof(opts[0]) * (2 + + !!(t->tcpi_options & TCPI_OPT_SACK) + + !!(t->tcpi_options & TCPI_OPT_TIMESTAMPS)); + + if (setsockopt(s, SOL_TCP, TCP_REPAIR_OPTIONS, opts, sl)) { + int rc = -errno; + err_perror("Setting repair options, socket %i", s); + return rc; + } + + return 0; +} + +/** + * tcp_flow_migrate_source() - Send data (flow table) for flow, close listening + * @fd: Descriptor for state migration + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_migrate_source(int fd, struct tcp_tap_conn *conn) +{ + struct tcp_tap_transfer t = { + .retrans = conn->retrans, + .ws_from_tap = conn->ws_from_tap, + .ws_to_tap = conn->ws_to_tap, + .events = conn->events, + + .tap_mss = htonl(MSS_GET(conn)), + + .sndbuf = htonl(conn->sndbuf), + + .flags = conn->flags, + .seq_dup_ack_approx = conn->seq_dup_ack_approx, + + .wnd_from_tap = htons(conn->wnd_from_tap), + .wnd_to_tap = htons(conn->wnd_to_tap), + + .seq_to_tap = htonl(conn->seq_to_tap), + .seq_ack_from_tap = htonl(conn->seq_ack_from_tap), + .seq_from_tap = htonl(conn->seq_from_tap), + .seq_ack_to_tap = htonl(conn->seq_ack_to_tap), + .seq_init_from_tap = htonl(conn->seq_init_from_tap), + }; + + memcpy(&t.pif, conn->f.pif, sizeof(t.pif)); + memcpy(&t.side, conn->f.side, sizeof(t.side)); + + if (write_all_buf(fd, &t, sizeof(t))) { + int rc = -errno; + err_perror("Can't write migration data, socket %i", conn->sock); + return rc; + } + + if (conn->listening_sock != -1 && !fcntl(conn->listening_sock, F_GETFD)) + close(conn->listening_sock); + + return 0; +} + +/** + * tcp_flow_migrate_source_ext() - Dump queues, close sockets, send final data + * @fd: Descriptor for state migration + * @fidx: Flow index + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative (not -EIO) on failure, -EIO on sending failure + */ +int tcp_flow_migrate_source_ext(int fd, int fidx, + const struct tcp_tap_conn *conn) +{ + uint32_t peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap; + struct tcp_tap_transfer_ext *t = &migrate_ext[fidx]; + int s = conn->sock; + int rc; + + /* Disable SO_PEEK_OFF, it will make accessing the queues in repair mode + * weird. + */ + if (tcp_set_peek_offset(s, -1)) { + rc = -errno; + goto fail; + } + + if ((rc = tcp_flow_dump_tinfo(s, t))) + goto fail; + + if ((rc = tcp_flow_dump_mss(s, t))) + goto fail; + + if ((rc = tcp_flow_dump_wnd(s, t))) + goto fail; + + if ((rc = tcp_flow_select_queue(s, TCP_SEND_QUEUE))) + goto fail; + + if ((rc = tcp_flow_dump_sndqueue(s, t))) + goto fail; + + if ((rc = tcp_flow_dump_seq(s, &t->seq_snd))) + goto fail; + + if ((rc = tcp_flow_select_queue(s, TCP_RECV_QUEUE))) + goto fail; + + if ((rc = tcp_flow_dump_rcvqueue(s, t))) + goto fail; + + if ((rc = tcp_flow_dump_seq(s, &t->seq_rcv))) + goto fail; + + close(s); + + /* Adjustments unrelated to FIN segments: sequence numbers we dumped are + * based on the end of the queues. + */ + t->seq_rcv -= t->rcvq; + t->seq_snd -= t->sndq; + + debug("Extended migration data, socket %i sequences send %u receive %u", + s, t->seq_snd, t->seq_rcv); + debug(" pending queues: send %u not sent %u receive %u", + t->sndq, t->notsent, t->rcvq); + debug(" window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u", + t->snd_wl1, t->snd_wnd, t->max_window, t->rcv_wnd, t->rcv_wup); + debug(" SO_PEEK_OFF %s offset=%"PRIu32, + peek_offset_cap ? "enabled" : "disabled", peek_offset); + + /* Endianness fix-ups */ + t->seq_snd = htonl(t->seq_snd); + t->seq_rcv = htonl(t->seq_rcv); + t->sndq = htonl(t->sndq); + t->notsent = htonl(t->notsent); + t->rcvq = htonl(t->rcvq); + + t->snd_wl1 = htonl(t->snd_wl1); + t->snd_wnd = htonl(t->snd_wnd); + t->max_window = htonl(t->max_window); + t->rcv_wnd = htonl(t->rcv_wnd); + t->rcv_wup = htonl(t->rcv_wup); + + if (write_all_buf(fd, t, sizeof(*t))) { + err_perror("Failed to write extended data, socket %i", s); + return -EIO; + } + + if (write_all_buf(fd, tcp_migrate_snd_queue, ntohl(t->sndq))) { + err_perror("Failed to write send queue data, socket %i", s); + return -EIO; + } + + if (write_all_buf(fd, tcp_migrate_rcv_queue, ntohl(t->rcvq))) { + err_perror("Failed to write receive queue data, socket %i", s); + return -EIO; + } + + return 0; + +fail: + /* For any type of failure dumping data, write an invalid extended data + * descriptor that allows us to keep the stream in sync, but tells the + * target to skip the flow. If we fail to transfer data, that's fatal: + * return -EIO in that case (and only in that case). + */ + t->tcpi_state = 0; /* Not defined: tell the target to skip this flow */ + + if (write_all_buf(fd, t, sizeof(*t))) { + err_perror("Failed to write extended data, socket %i", s); + return -EIO; + } + + if (rc == -EIO) /* but not a migration data transfer failure */ + return -ENODATA; + + return rc; +} + +/** + * tcp_flow_repair_socket() - Open and bind socket, request repair mode + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +int tcp_flow_repair_socket(struct ctx *c, struct tcp_tap_conn *conn) +{ + sa_family_t af = CONN_V4(conn) ? AF_INET : AF_INET6; + const struct flowside *sockside = HOSTFLOW(conn); + union sockaddr_inany a; + socklen_t sl; + int s, rc; + + pif_sockaddr(c, &a, &sl, PIF_HOST, &sockside->oaddr, sockside->oport); + + if ((conn->sock = socket(af, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, + IPPROTO_TCP)) < 0) { + rc = -errno; + err_perror("Failed to create socket for migrated flow"); + return rc; + } + s = conn->sock; + + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int))) + debug_perror("Setting SO_REUSEADDR on socket %i", s); + + tcp_sock_set_nodelay(s); + + if ((rc = bind(s, &a.sa, sizeof(a)))) { + err_perror("Failed to bind socket for migrated flow"); + goto err; + } + + if ((rc = tcp_flow_repair_on(c, conn))) + goto err; + + return 0; + +err: + close(s); + conn->sock = -1; + return rc; +} + +/** + * tcp_flow_repair_connect() - Connect socket in repair mode, then turn it off + * @c: Execution context + * @conn: Pointer to the TCP connection structure + * + * Return: 0 on success, negative error code on failure + */ +static int tcp_flow_repair_connect(const struct ctx *c, + struct tcp_tap_conn *conn) +{ + const struct flowside *tgt = HOSTFLOW(conn); + int rc; + + rc = flowside_connect(c, conn->sock, PIF_HOST, tgt); + if (rc) { + rc = -errno; + err_perror("Failed to connect migrated socket %i", conn->sock); + return rc; + } + + conn->in_epoll = 0; + conn->timer = -1; + conn->listening_sock = -1; + + return 0; +} + +/** + * tcp_flow_migrate_target() - Receive data (flow table part) for flow, insert + * @c: Execution context + * @fd: Descriptor for state migration + * + * Return: 0 on success, negative on fatal failure, but 0 on single flow failure + */ +int tcp_flow_migrate_target(struct ctx *c, int fd) +{ + struct tcp_tap_transfer t; + struct tcp_tap_conn *conn; + union flow *flow; + int rc; + + if (!(flow = flow_alloc())) { + err("Flow table full on migration target"); + return 0; + } + + if (read_all_buf(fd, &t, sizeof(t))) { + flow_alloc_cancel(flow); + err_perror("Failed to receive migration data"); + return -errno; + } + + flow->f.state = FLOW_STATE_TGT; + memcpy(&flow->f.pif, &t.pif, sizeof(flow->f.pif)); + memcpy(&flow->f.side, &t.side, sizeof(flow->f.side)); + conn = FLOW_SET_TYPE(flow, FLOW_TCP, tcp); + + conn->retrans = t.retrans; + conn->ws_from_tap = t.ws_from_tap; + conn->ws_to_tap = t.ws_to_tap; + conn->events = t.events; + + conn->sndbuf = htonl(t.sndbuf); + + conn->flags = t.flags; + conn->seq_dup_ack_approx = t.seq_dup_ack_approx; + + MSS_SET(conn, ntohl(t.tap_mss)); + + conn->wnd_from_tap = ntohs(t.wnd_from_tap); + conn->wnd_to_tap = ntohs(t.wnd_to_tap); + + conn->seq_to_tap = ntohl(t.seq_to_tap); + conn->seq_ack_from_tap = ntohl(t.seq_ack_from_tap); + conn->seq_from_tap = ntohl(t.seq_from_tap); + conn->seq_ack_to_tap = ntohl(t.seq_ack_to_tap); + conn->seq_init_from_tap = ntohl(t.seq_init_from_tap); + + if ((rc = tcp_flow_repair_socket(c, conn))) { + flow_err(flow, "Can't set up socket: %s, drop", strerror_(rc)); + flow_alloc_cancel(flow); + return 0; + } + + flow_hash_insert(c, TAP_SIDX(conn)); + FLOW_ACTIVATE(conn); + + return 0; +} + +/** + * tcp_flow_migrate_target_ext() - Receive extended data for flow, set, connect + * @c: Execution context + * @flow: Existing flow for this connection data + * @fd: Descriptor for state migration + * + * Return: 0 on success, negative on fatal failure, but 0 on single flow failure + */ +int tcp_flow_migrate_target_ext(struct ctx *c, union flow *flow, int fd) +{ + struct tcp_tap_conn *conn = &flow->tcp; + uint32_t peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap; + struct tcp_tap_transfer_ext t; + int s = conn->sock, rc; + + if (read_all_buf(fd, &t, sizeof(t))) { + rc = -errno; + err_perror("Failed to read extended data for socket %i", s); + return rc; + } + + if (!t.tcpi_state) { /* Source wants us to skip this flow */ + flow_err(flow, "Dropping as requested by source"); + goto fail; + } + + /* Endianness fix-ups */ + t.seq_snd = ntohl(t.seq_snd); + t.seq_rcv = ntohl(t.seq_rcv); + t.sndq = ntohl(t.sndq); + t.notsent = ntohl(t.notsent); + t.rcvq = ntohl(t.rcvq); + + t.snd_wl1 = ntohl(t.snd_wl1); + t.snd_wnd = ntohl(t.snd_wnd); + t.max_window = ntohl(t.max_window); + t.rcv_wnd = ntohl(t.rcv_wnd); + t.rcv_wup = ntohl(t.rcv_wup); + + debug("Extended migration data, socket %i sequences send %u receive %u", + s, t.seq_snd, t.seq_rcv); + debug(" pending queues: send %u not sent %u receive %u", + t.sndq, t.notsent, t.rcvq); + debug(" window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u", + t.snd_wl1, t.snd_wnd, t.max_window, t.rcv_wnd, t.rcv_wup); + debug(" SO_PEEK_OFF %s offset=%"PRIu32, + peek_offset_cap ? "enabled" : "disabled", peek_offset); + + if (t.sndq > TCP_MIGRATE_SND_QUEUE_MAX || t.notsent > t.sndq || + t.rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) { + err("Bad queues socket %i, send: %u, not sent: %u, receive: %u", + s, t.sndq, t.notsent, t.rcvq); + return -EINVAL; + } + + if (read_all_buf(fd, tcp_migrate_snd_queue, t.sndq)) { + rc = -errno; + err_perror("Failed to read send queue data, socket %i", s); + return rc; + } + + if (read_all_buf(fd, tcp_migrate_rcv_queue, t.rcvq)) { + rc = -errno; + err_perror("Failed to read receive queue data, socket %i", s); + return rc; + } + + if (tcp_flow_select_queue(s, TCP_SEND_QUEUE)) + goto fail; + + if (tcp_flow_repair_seq(s, &t.seq_snd)) + goto fail; + + if (tcp_flow_select_queue(s, TCP_RECV_QUEUE)) + goto fail; + + if (tcp_flow_repair_seq(s, &t.seq_rcv)) + goto fail; + + if (tcp_flow_repair_connect(c, conn)) + goto fail; + + if (tcp_flow_repair_queue(s, t.rcvq, tcp_migrate_rcv_queue)) + goto fail; + + if (tcp_flow_select_queue(s, TCP_SEND_QUEUE)) + goto fail; + + if (tcp_flow_repair_queue(s, t.sndq - t.notsent, + tcp_migrate_snd_queue)) + goto fail; + + if (tcp_flow_repair_opt(s, &t)) + goto fail; + + /* If we sent a FIN sent and it was acknowledged (TCP_FIN_WAIT2), don't + * send it out, because we already sent it for sure. + * + * Call shutdown(x, SHUT_WR) in repair mode, so that we move to + * FIN_WAIT_1 (tcp_shutdown()) without sending anything + * (goto in tcp_write_xmit()). + */ + if (t.tcpi_state == TCP_FIN_WAIT2) { + int v; + + v = TCP_SEND_QUEUE; + if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, sizeof(v))) + debug_perror("Selecting repair queue, socket %i", s); + else + shutdown(s, SHUT_WR); + } + + if (tcp_flow_repair_wnd(s, &t)) + goto fail; + + tcp_flow_repair_off(c, conn); + repair_flush(c); + + if (t.notsent) { + if (tcp_flow_repair_queue(s, t.notsent, + tcp_migrate_snd_queue + + (t.sndq - t.notsent))) { + /* This sometimes seems to fail for unclear reasons. + * Don't fail the whole migration, just reset the flow + * and carry on to the next one. + */ + goto fail; + } + } + + /* If we sent a FIN but it wasn't acknowledged yet (TCP_FIN_WAIT1), send + * it out, because we don't know if we already sent it. + * + * Call shutdown(x, SHUT_WR) *not* in repair mode, which moves us to + * TCP_FIN_WAIT1. + */ + if (t.tcpi_state == TCP_FIN_WAIT1) + shutdown(s, SHUT_WR); + + if (tcp_set_peek_offset(conn->sock, peek_offset)) + goto fail; + + tcp_send_flag(c, conn, ACK); + tcp_data_from_sock(c, conn); + + if ((rc = tcp_epoll_ctl(c, conn))) { + debug("Failed to subscribe to epoll for migrated socket %i: %s", + conn->sock, strerror_(-rc)); + goto fail; + } + + return 0; + +fail: + tcp_flow_repair_off(c, conn); + repair_flush(c); + + conn->flags = 0; /* Not waiting for ACK, don't schedule timer */ + tcp_rst(c, conn); + + return 0; +} |