aboutgitcodebugslistschat
path: root/tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcp.c')
-rw-r--r--tcp.c919
1 files changed, 919 insertions, 0 deletions
diff --git a/tcp.c b/tcp.c
index b978b30..98e1c6a 100644
--- a/tcp.c
+++ b/tcp.c
@@ -280,6 +280,7 @@
#include <stddef.h>
#include <string.h>
#include <sys/epoll.h>
+#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <sys/types.h>
@@ -287,6 +288,8 @@
#include <time.h>
#include <arpa/inet.h>
+#include <linux/sockios.h>
+
#include "checksum.h"
#include "util.h"
#include "iov.h"
@@ -299,6 +302,7 @@
#include "log.h"
#include "inany.h"
#include "flow.h"
+#include "repair.h"
#include "linux_dep.h"
#include "flow_table.h"
@@ -306,6 +310,21 @@
#include "tcp_buf.h"
#include "tcp_vu.h"
+#ifndef __USE_MISC
+/* From Linux UAPI, missing in netinet/tcp.h provided by musl */
+struct tcp_repair_opt {
+ __u32 opt_code;
+ __u32 opt_val;
+};
+
+enum {
+ TCP_NO_QUEUE,
+ TCP_RECV_QUEUE,
+ TCP_SEND_QUEUE,
+ TCP_QUEUES_NR,
+};
+#endif
+
/* MSS rounding: see SET_MSS() */
#define MSS_DEFAULT 536
#define WINDOW_DEFAULT 14600 /* RFC 6928 */
@@ -326,6 +345,19 @@
((conn)->events & (SOCK_FIN_RCVD | TAP_FIN_RCVD)))
#define CONN_HAS(conn, set) (((conn)->events & (set)) == (set))
+/* Buffers to migrate pending data from send and receive queues. No, they don't
+ * use memory if we don't use them. And we're going away after this, so splurge.
+ */
+#define TCP_MIGRATE_SND_QUEUE_MAX (64 << 20)
+#define TCP_MIGRATE_RCV_QUEUE_MAX (64 << 20)
+uint8_t tcp_migrate_snd_queue [TCP_MIGRATE_SND_QUEUE_MAX];
+uint8_t tcp_migrate_rcv_queue [TCP_MIGRATE_RCV_QUEUE_MAX];
+
+#define TCP_MIGRATE_RESTORE_CHUNK_MIN 1024 /* Try smaller when above this */
+
+/* "Extended" data (not stored in the flow table) for TCP flow migration */
+static struct tcp_tap_transfer_ext migrate_ext[FLOW_MAX];
+
static const char *tcp_event_str[] __attribute((__unused__)) = {
"SOCK_ACCEPTED", "TAP_SYN_RCVD", "ESTABLISHED", "TAP_SYN_ACK_SENT",
@@ -1468,6 +1500,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
conn->sock = s;
conn->timer = -1;
+ conn->listening_sock = -1;
conn_event(c, conn, TAP_SYN_RCVD);
conn->wnd_to_tap = WINDOW_DEFAULT;
@@ -1968,10 +2001,27 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
ack_due = 1;
if ((conn->events & TAP_FIN_RCVD) && !(conn->events & SOCK_FIN_SENT)) {
+ socklen_t sl;
+ struct tcp_info tinfo;
+
shutdown(conn->sock, SHUT_WR);
conn_event(c, conn, SOCK_FIN_SENT);
tcp_send_flag(c, conn, ACK);
ack_due = 0;
+
+ /* If we received a FIN, but the socket is in TCP_ESTABLISHED
+ * state, it must be a migrated socket. The kernel saw the FIN
+ * on the source socket, but not on the target socket.
+ *
+ * Approximate the effect of that FIN: as we're sending a FIN
+ * out ourselves, the socket is now in a state equivalent to
+ * LAST_ACK. Now that we sent the FIN out, close it with a RST.
+ */
+ sl = sizeof(tinfo);
+ getsockopt(conn->sock, SOL_TCP, TCP_INFO, &tinfo, &sl);
+ if (tinfo.tcpi_state == TCP_ESTABLISHED &&
+ conn->events & SOCK_FIN_RCVD)
+ goto reset;
}
if (ack_due)
@@ -2054,6 +2104,7 @@ static void tcp_tap_conn_from_sock(const struct ctx *c, union flow *flow,
void tcp_listen_handler(const struct ctx *c, union epoll_ref ref,
const struct timespec *now)
{
+ struct tcp_tap_conn *conn;
union sockaddr_inany sa;
socklen_t sl = sizeof(sa);
struct flowside *ini;
@@ -2069,6 +2120,9 @@ void tcp_listen_handler(const struct ctx *c, union epoll_ref ref,
if (s < 0)
goto cancel;
+ conn = (struct tcp_tap_conn *)flow;
+ conn->listening_sock = ref.fd;
+
tcp_sock_set_nodelay(s);
/* FIXME: If useful: when the listening port has a specific bound
@@ -2634,3 +2688,868 @@ void tcp_timer(struct ctx *c, const struct timespec *now)
if (c->mode == MODE_PASTA)
tcp_splice_refill(c);
}
+
+/**
+ * tcp_flow_is_established() - Was the connection established? Includes closing
+ * @conn: Pointer to the TCP connection structure
+ *
+ * Return: true if the connection was established, false otherwise
+ */
+bool tcp_flow_is_established(const struct tcp_tap_conn *conn)
+{
+ return conn->events & ESTABLISHED;
+}
+
+/**
+ * tcp_flow_repair_on() - Enable repair mode for a single TCP flow
+ * @c: Execution context
+ * @conn: Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_repair_on(struct ctx *c, const struct tcp_tap_conn *conn)
+{
+ int rc = 0;
+
+ if ((rc = repair_set(c, conn->sock, TCP_REPAIR_ON)))
+ err("Failed to set TCP_REPAIR");
+
+ return rc;
+}
+
+/**
+ * tcp_flow_repair_off() - Clear repair mode for a single TCP flow
+ * @c: Execution context
+ * @conn: Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_repair_off(struct ctx *c, const struct tcp_tap_conn *conn)
+{
+ int rc = 0;
+
+ if ((rc = repair_set(c, conn->sock, TCP_REPAIR_OFF)))
+ err("Failed to clear TCP_REPAIR");
+
+ return rc;
+}
+
+/**
+ * tcp_flow_dump_tinfo() - Dump window scale, tcpi_state, tcpi_options
+ * @c: Execution context
+ * @t: Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_dump_tinfo(int s, struct tcp_tap_transfer_ext *t)
+{
+ struct tcp_info tinfo;
+ socklen_t sl;
+
+ sl = sizeof(tinfo);
+ if (getsockopt(s, SOL_TCP, TCP_INFO, &tinfo, &sl)) {
+ int rc = -errno;
+ err_perror("Querying TCP_INFO, socket %i", s);
+ return rc;
+ }
+
+ t->snd_ws = tinfo.tcpi_snd_wscale;
+ t->rcv_ws = tinfo.tcpi_rcv_wscale;
+ t->tcpi_state = tinfo.tcpi_state;
+ t->tcpi_options = tinfo.tcpi_options;
+
+ return 0;
+}
+
+/**
+ * tcp_flow_dump_mss() - Dump MSS clamp (not current MSS) via TCP_MAXSEG
+ * @c: Execution context
+ * @t: Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_dump_mss(int s, struct tcp_tap_transfer_ext *t)
+{
+ socklen_t sl = sizeof(t->mss);
+
+ if (getsockopt(s, SOL_TCP, TCP_MAXSEG, &t->mss, &sl)) {
+ int rc = -errno;
+ err_perror("Getting MSS, socket %i", s);
+ return rc;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_dump_wnd() - Dump current tcp_repair_window parameters
+ * @c: Execution context
+ * @t: Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_dump_wnd(int s, struct tcp_tap_transfer_ext *t)
+{
+ struct tcp_repair_window wnd;
+ socklen_t sl = sizeof(wnd);
+
+ if (getsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, &sl)) {
+ int rc = -errno;
+ err_perror("Getting window repair data, socket %i", s);
+ return rc;
+ }
+
+ t->snd_wl1 = wnd.snd_wl1;
+ t->snd_wnd = wnd.snd_wnd;
+ t->max_window = wnd.max_window;
+ t->rcv_wnd = wnd.rcv_wnd;
+ t->rcv_wup = wnd.rcv_wup;
+
+ /* If we received a FIN, we also need to adjust window parameters.
+ *
+ * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state.
+ */
+ if (t->tcpi_state == TCP_CLOSE_WAIT || t->tcpi_state == TCP_LAST_ACK) {
+ t->rcv_wup--;
+ t->rcv_wnd++;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_repair_wnd() - Restore window parameters from extended data
+ * @c: Execution context
+ * @t: Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_repair_wnd(int s, const struct tcp_tap_transfer_ext *t)
+{
+ struct tcp_repair_window wnd;
+
+ wnd.snd_wl1 = t->snd_wl1;
+ wnd.snd_wnd = t->snd_wnd;
+ wnd.max_window = t->max_window;
+ wnd.rcv_wnd = t->rcv_wnd;
+ wnd.rcv_wup = t->rcv_wup;
+
+ if (setsockopt(s, IPPROTO_TCP, TCP_REPAIR_WINDOW, &wnd, sizeof(wnd))) {
+ int rc = -errno;
+ err_perror("Setting window data, socket %i", s);
+ return rc;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_select_queue() - Select queue (receive or send) for next operation
+ * @s: Socket
+ * @queue: TCP_RECV_QUEUE or TCP_SEND_QUEUE
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_select_queue(int s, int queue)
+{
+ if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &queue, sizeof(queue))) {
+ int rc = -errno;
+ err_perror("Selecting TCP_SEND_QUEUE, socket %i", s);
+ return rc;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_dump_sndqueue() - Dump send queue, length of sent and not sent data
+ * @s: Socket
+ * @t: Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ *
+ * #syscalls:vu ioctl
+ */
+static int tcp_flow_dump_sndqueue(int s, struct tcp_tap_transfer_ext *t)
+{
+ ssize_t rc;
+
+ if (ioctl(s, SIOCOUTQ, &t->sndq) < 0) {
+ rc = -errno;
+ err_perror("Getting send queue size, socket %i", s);
+ return rc;
+ }
+
+ if (ioctl(s, SIOCOUTQNSD, &t->notsent) < 0) {
+ rc = -errno;
+ err_perror("Getting not sent count, socket %i", s);
+ return rc;
+ }
+
+ /* If we sent a FIN, SIOCOUTQ and SIOCOUTQNSD are one greater than the
+ * actual pending queue length, because they are based on the sequence
+ * numbers, not directly on the buffer contents.
+ *
+ * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state.
+ */
+ if (t->tcpi_state == TCP_FIN_WAIT1 || t->tcpi_state == TCP_FIN_WAIT2 ||
+ t->tcpi_state == TCP_LAST_ACK || t->tcpi_state == TCP_CLOSING) {
+ if (t->sndq)
+ t->sndq--;
+ if (t->notsent)
+ t->notsent--;
+ }
+
+ if (t->notsent > t->sndq) {
+ err("Invalid notsent count socket %i, send: %u, not sent: %u",
+ s, t->sndq, t->notsent);
+ return -EINVAL;
+ }
+
+ if (t->sndq > TCP_MIGRATE_SND_QUEUE_MAX) {
+ err("Send queue too large to migrate socket %i: %u bytes",
+ s, t->sndq);
+ return -ENOBUFS;
+ }
+
+ rc = recv(s, tcp_migrate_snd_queue,
+ MIN(t->sndq, TCP_MIGRATE_SND_QUEUE_MAX), MSG_PEEK);
+ if (rc < 0) {
+ if (errno == EAGAIN) { /* EAGAIN means empty */
+ rc = 0;
+ } else {
+ rc = -errno;
+ err_perror("Can't read send queue, socket %i", s);
+ return rc;
+ }
+ }
+
+ if ((uint32_t)rc < t->sndq) {
+ err("Short read migrating send queue");
+ return -ENXIO;
+ }
+
+ t->notsent = MIN(t->notsent, t->sndq);
+
+ return 0;
+}
+
+/**
+ * tcp_flow_repair_queue() - Restore contents of a given (pre-selected) queue
+ * @s: Socket
+ * @len: Length of data to be restored
+ * @buf: Buffer with content of pending data queue
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_repair_queue(int s, size_t len, uint8_t *buf)
+{
+ size_t chunk = len;
+ uint8_t *p = buf;
+
+ while (len > 0) {
+ ssize_t rc = send(s, p, MIN(len, chunk), 0);
+
+ if (rc < 0) {
+ if ((errno == ENOBUFS || errno == ENOMEM) &&
+ chunk >= TCP_MIGRATE_RESTORE_CHUNK_MIN) {
+ chunk /= 2;
+ continue;
+ }
+
+ rc = -errno;
+ err_perror("Can't write queue, socket %i", s);
+ return rc;
+ }
+
+ len -= rc;
+ p += rc;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_dump_seq() - Dump current sequence of pre-selected queue
+ * @s: Socket
+ * @v: Sequence value, set on return
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_dump_seq(int s, uint32_t *v)
+{
+ socklen_t sl = sizeof(*v);
+
+ if (getsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, &sl)) {
+ int rc = -errno;
+ err_perror("Dumping sequence, socket %i", s);
+ return rc;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_repair_seq() - Restore sequence for pre-selected queue
+ * @s: Socket
+ * @v: Sequence value to be set
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_repair_seq(int s, const uint32_t *v)
+{
+ if (setsockopt(s, SOL_TCP, TCP_QUEUE_SEQ, v, sizeof(*v))) {
+ int rc = -errno;
+ err_perror("Setting sequence, socket %i", s);
+ return rc;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_dump_rcvqueue() - Dump receive queue and its length, seal/block it
+ * @s: Socket
+ * @t: Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ *
+ * #syscalls:vu ioctl
+ */
+static int tcp_flow_dump_rcvqueue(int s, struct tcp_tap_transfer_ext *t)
+{
+ ssize_t rc;
+
+ if (ioctl(s, SIOCINQ, &t->rcvq) < 0) {
+ rc = -errno;
+ err_perror("Get receive queue size, socket %i", s);
+ return rc;
+ }
+
+ /* If we received a FIN, SIOCINQ is one greater than the actual number
+ * of bytes on the queue, because it's based on the sequence number
+ * rather than directly on the buffer contents.
+ *
+ * This must be called after tcp_flow_dump_tinfo(), for t->tcpi_state.
+ */
+ if (t->rcvq &&
+ (t->tcpi_state == TCP_CLOSE_WAIT || t->tcpi_state == TCP_LAST_ACK))
+ t->rcvq--;
+
+ if (t->rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) {
+ err("Receive queue too large to migrate socket %i: %u bytes",
+ s, t->rcvq);
+ return -ENOBUFS;
+ }
+
+ rc = recv(s, tcp_migrate_rcv_queue, t->rcvq, MSG_PEEK);
+ if (rc < 0) {
+ if (errno == EAGAIN) { /* EAGAIN means empty */
+ rc = 0;
+ } else {
+ rc = -errno;
+ err_perror("Can't read receive queue for socket %i", s);
+ return rc;
+ }
+ }
+
+ if ((uint32_t)rc < t->rcvq) {
+ err("Short read migrating receive queue");
+ return -ENXIO;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_repair_opt() - Set repair "options" (MSS, scale, SACK, timestamps)
+ * @s: Socket
+ * @t: Extended migration data
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_repair_opt(int s, const struct tcp_tap_transfer_ext *t)
+{
+ const struct tcp_repair_opt opts[] = {
+ { TCPOPT_WINDOW, t->snd_ws + (t->rcv_ws << 16) },
+ { TCPOPT_MAXSEG, t->mss },
+ { TCPOPT_SACK_PERMITTED, 0 },
+ { TCPOPT_TIMESTAMP, 0 },
+ };
+ socklen_t sl;
+
+ sl = sizeof(opts[0]) * (2 +
+ !!(t->tcpi_options & TCPI_OPT_SACK) +
+ !!(t->tcpi_options & TCPI_OPT_TIMESTAMPS));
+
+ if (setsockopt(s, SOL_TCP, TCP_REPAIR_OPTIONS, opts, sl)) {
+ int rc = -errno;
+ err_perror("Setting repair options, socket %i", s);
+ return rc;
+ }
+
+ return 0;
+}
+
+/**
+ * tcp_flow_migrate_source() - Send data (flow table) for flow, close listening
+ * @fd: Descriptor for state migration
+ * @conn: Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_migrate_source(int fd, struct tcp_tap_conn *conn)
+{
+ struct tcp_tap_transfer t = {
+ .retrans = conn->retrans,
+ .ws_from_tap = conn->ws_from_tap,
+ .ws_to_tap = conn->ws_to_tap,
+ .events = conn->events,
+
+ .tap_mss = htonl(MSS_GET(conn)),
+
+ .sndbuf = htonl(conn->sndbuf),
+
+ .flags = conn->flags,
+ .seq_dup_ack_approx = conn->seq_dup_ack_approx,
+
+ .wnd_from_tap = htons(conn->wnd_from_tap),
+ .wnd_to_tap = htons(conn->wnd_to_tap),
+
+ .seq_to_tap = htonl(conn->seq_to_tap),
+ .seq_ack_from_tap = htonl(conn->seq_ack_from_tap),
+ .seq_from_tap = htonl(conn->seq_from_tap),
+ .seq_ack_to_tap = htonl(conn->seq_ack_to_tap),
+ .seq_init_from_tap = htonl(conn->seq_init_from_tap),
+ };
+
+ memcpy(&t.pif, conn->f.pif, sizeof(t.pif));
+ memcpy(&t.side, conn->f.side, sizeof(t.side));
+
+ if (write_all_buf(fd, &t, sizeof(t))) {
+ int rc = -errno;
+ err_perror("Can't write migration data, socket %i", conn->sock);
+ return rc;
+ }
+
+ if (conn->listening_sock != -1 && !fcntl(conn->listening_sock, F_GETFD))
+ close(conn->listening_sock);
+
+ return 0;
+}
+
+/**
+ * tcp_flow_migrate_source_ext() - Dump queues, close sockets, send final data
+ * @fd: Descriptor for state migration
+ * @fidx: Flow index
+ * @conn: Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative (not -EIO) on failure, -EIO on sending failure
+ */
+int tcp_flow_migrate_source_ext(int fd, int fidx,
+ const struct tcp_tap_conn *conn)
+{
+ uint32_t peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
+ struct tcp_tap_transfer_ext *t = &migrate_ext[fidx];
+ int s = conn->sock;
+ int rc;
+
+ /* Disable SO_PEEK_OFF, it will make accessing the queues in repair mode
+ * weird.
+ */
+ if (tcp_set_peek_offset(s, -1)) {
+ rc = -errno;
+ goto fail;
+ }
+
+ if ((rc = tcp_flow_dump_tinfo(s, t)))
+ goto fail;
+
+ if ((rc = tcp_flow_dump_mss(s, t)))
+ goto fail;
+
+ if ((rc = tcp_flow_dump_wnd(s, t)))
+ goto fail;
+
+ if ((rc = tcp_flow_select_queue(s, TCP_SEND_QUEUE)))
+ goto fail;
+
+ if ((rc = tcp_flow_dump_sndqueue(s, t)))
+ goto fail;
+
+ if ((rc = tcp_flow_dump_seq(s, &t->seq_snd)))
+ goto fail;
+
+ if ((rc = tcp_flow_select_queue(s, TCP_RECV_QUEUE)))
+ goto fail;
+
+ if ((rc = tcp_flow_dump_rcvqueue(s, t)))
+ goto fail;
+
+ if ((rc = tcp_flow_dump_seq(s, &t->seq_rcv)))
+ goto fail;
+
+ close(s);
+
+ /* Adjustments unrelated to FIN segments: sequence numbers we dumped are
+ * based on the end of the queues.
+ */
+ t->seq_rcv -= t->rcvq;
+ t->seq_snd -= t->sndq;
+
+ debug("Extended migration data, socket %i sequences send %u receive %u",
+ s, t->seq_snd, t->seq_rcv);
+ debug(" pending queues: send %u not sent %u receive %u",
+ t->sndq, t->notsent, t->rcvq);
+ debug(" window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u",
+ t->snd_wl1, t->snd_wnd, t->max_window, t->rcv_wnd, t->rcv_wup);
+ debug(" SO_PEEK_OFF %s offset=%"PRIu32,
+ peek_offset_cap ? "enabled" : "disabled", peek_offset);
+
+ /* Endianness fix-ups */
+ t->seq_snd = htonl(t->seq_snd);
+ t->seq_rcv = htonl(t->seq_rcv);
+ t->sndq = htonl(t->sndq);
+ t->notsent = htonl(t->notsent);
+ t->rcvq = htonl(t->rcvq);
+
+ t->snd_wl1 = htonl(t->snd_wl1);
+ t->snd_wnd = htonl(t->snd_wnd);
+ t->max_window = htonl(t->max_window);
+ t->rcv_wnd = htonl(t->rcv_wnd);
+ t->rcv_wup = htonl(t->rcv_wup);
+
+ if (write_all_buf(fd, t, sizeof(*t))) {
+ err_perror("Failed to write extended data, socket %i", s);
+ return -EIO;
+ }
+
+ if (write_all_buf(fd, tcp_migrate_snd_queue, ntohl(t->sndq))) {
+ err_perror("Failed to write send queue data, socket %i", s);
+ return -EIO;
+ }
+
+ if (write_all_buf(fd, tcp_migrate_rcv_queue, ntohl(t->rcvq))) {
+ err_perror("Failed to write receive queue data, socket %i", s);
+ return -EIO;
+ }
+
+ return 0;
+
+fail:
+ /* For any type of failure dumping data, write an invalid extended data
+ * descriptor that allows us to keep the stream in sync, but tells the
+ * target to skip the flow. If we fail to transfer data, that's fatal:
+ * return -EIO in that case (and only in that case).
+ */
+ t->tcpi_state = 0; /* Not defined: tell the target to skip this flow */
+
+ if (write_all_buf(fd, t, sizeof(*t))) {
+ err_perror("Failed to write extended data, socket %i", s);
+ return -EIO;
+ }
+
+ if (rc == -EIO) /* but not a migration data transfer failure */
+ return -ENODATA;
+
+ return rc;
+}
+
+/**
+ * tcp_flow_repair_socket() - Open and bind socket, request repair mode
+ * @c: Execution context
+ * @conn: Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+int tcp_flow_repair_socket(struct ctx *c, struct tcp_tap_conn *conn)
+{
+ sa_family_t af = CONN_V4(conn) ? AF_INET : AF_INET6;
+ const struct flowside *sockside = HOSTFLOW(conn);
+ union sockaddr_inany a;
+ socklen_t sl;
+ int s, rc;
+
+ pif_sockaddr(c, &a, &sl, PIF_HOST, &sockside->oaddr, sockside->oport);
+
+ if ((conn->sock = socket(af, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
+ IPPROTO_TCP)) < 0) {
+ rc = -errno;
+ err_perror("Failed to create socket for migrated flow");
+ return rc;
+ }
+ s = conn->sock;
+
+ if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &(int){ 1 }, sizeof(int)))
+ debug_perror("Setting SO_REUSEADDR on socket %i", s);
+
+ tcp_sock_set_nodelay(s);
+
+ if ((rc = bind(s, &a.sa, sizeof(a)))) {
+ err_perror("Failed to bind socket for migrated flow");
+ goto err;
+ }
+
+ if ((rc = tcp_flow_repair_on(c, conn)))
+ goto err;
+
+ return 0;
+
+err:
+ close(s);
+ conn->sock = -1;
+ return rc;
+}
+
+/**
+ * tcp_flow_repair_connect() - Connect socket in repair mode, then turn it off
+ * @c: Execution context
+ * @conn: Pointer to the TCP connection structure
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int tcp_flow_repair_connect(const struct ctx *c,
+ struct tcp_tap_conn *conn)
+{
+ const struct flowside *tgt = HOSTFLOW(conn);
+ int rc;
+
+ rc = flowside_connect(c, conn->sock, PIF_HOST, tgt);
+ if (rc) {
+ rc = -errno;
+ err_perror("Failed to connect migrated socket %i", conn->sock);
+ return rc;
+ }
+
+ conn->in_epoll = 0;
+ conn->timer = -1;
+ conn->listening_sock = -1;
+
+ return 0;
+}
+
+/**
+ * tcp_flow_migrate_target() - Receive data (flow table part) for flow, insert
+ * @c: Execution context
+ * @fd: Descriptor for state migration
+ *
+ * Return: 0 on success, negative on fatal failure, but 0 on single flow failure
+ */
+int tcp_flow_migrate_target(struct ctx *c, int fd)
+{
+ struct tcp_tap_transfer t;
+ struct tcp_tap_conn *conn;
+ union flow *flow;
+ int rc;
+
+ if (!(flow = flow_alloc())) {
+ err("Flow table full on migration target");
+ return 0;
+ }
+
+ if (read_all_buf(fd, &t, sizeof(t))) {
+ flow_alloc_cancel(flow);
+ err_perror("Failed to receive migration data");
+ return -errno;
+ }
+
+ flow->f.state = FLOW_STATE_TGT;
+ memcpy(&flow->f.pif, &t.pif, sizeof(flow->f.pif));
+ memcpy(&flow->f.side, &t.side, sizeof(flow->f.side));
+ conn = FLOW_SET_TYPE(flow, FLOW_TCP, tcp);
+
+ conn->retrans = t.retrans;
+ conn->ws_from_tap = t.ws_from_tap;
+ conn->ws_to_tap = t.ws_to_tap;
+ conn->events = t.events;
+
+ conn->sndbuf = htonl(t.sndbuf);
+
+ conn->flags = t.flags;
+ conn->seq_dup_ack_approx = t.seq_dup_ack_approx;
+
+ MSS_SET(conn, ntohl(t.tap_mss));
+
+ conn->wnd_from_tap = ntohs(t.wnd_from_tap);
+ conn->wnd_to_tap = ntohs(t.wnd_to_tap);
+
+ conn->seq_to_tap = ntohl(t.seq_to_tap);
+ conn->seq_ack_from_tap = ntohl(t.seq_ack_from_tap);
+ conn->seq_from_tap = ntohl(t.seq_from_tap);
+ conn->seq_ack_to_tap = ntohl(t.seq_ack_to_tap);
+ conn->seq_init_from_tap = ntohl(t.seq_init_from_tap);
+
+ if ((rc = tcp_flow_repair_socket(c, conn))) {
+ flow_err(flow, "Can't set up socket: %s, drop", strerror_(rc));
+ flow_alloc_cancel(flow);
+ return 0;
+ }
+
+ flow_hash_insert(c, TAP_SIDX(conn));
+ FLOW_ACTIVATE(conn);
+
+ return 0;
+}
+
+/**
+ * tcp_flow_migrate_target_ext() - Receive extended data for flow, set, connect
+ * @c: Execution context
+ * @flow: Existing flow for this connection data
+ * @fd: Descriptor for state migration
+ *
+ * Return: 0 on success, negative on fatal failure, but 0 on single flow failure
+ */
+int tcp_flow_migrate_target_ext(struct ctx *c, union flow *flow, int fd)
+{
+ struct tcp_tap_conn *conn = &flow->tcp;
+ uint32_t peek_offset = conn->seq_to_tap - conn->seq_ack_from_tap;
+ struct tcp_tap_transfer_ext t;
+ int s = conn->sock, rc;
+
+ if (read_all_buf(fd, &t, sizeof(t))) {
+ rc = -errno;
+ err_perror("Failed to read extended data for socket %i", s);
+ return rc;
+ }
+
+ if (!t.tcpi_state) { /* Source wants us to skip this flow */
+ flow_err(flow, "Dropping as requested by source");
+ goto fail;
+ }
+
+ /* Endianness fix-ups */
+ t.seq_snd = ntohl(t.seq_snd);
+ t.seq_rcv = ntohl(t.seq_rcv);
+ t.sndq = ntohl(t.sndq);
+ t.notsent = ntohl(t.notsent);
+ t.rcvq = ntohl(t.rcvq);
+
+ t.snd_wl1 = ntohl(t.snd_wl1);
+ t.snd_wnd = ntohl(t.snd_wnd);
+ t.max_window = ntohl(t.max_window);
+ t.rcv_wnd = ntohl(t.rcv_wnd);
+ t.rcv_wup = ntohl(t.rcv_wup);
+
+ debug("Extended migration data, socket %i sequences send %u receive %u",
+ s, t.seq_snd, t.seq_rcv);
+ debug(" pending queues: send %u not sent %u receive %u",
+ t.sndq, t.notsent, t.rcvq);
+ debug(" window: snd_wl1 %u snd_wnd %u max %u rcv_wnd %u rcv_wup %u",
+ t.snd_wl1, t.snd_wnd, t.max_window, t.rcv_wnd, t.rcv_wup);
+ debug(" SO_PEEK_OFF %s offset=%"PRIu32,
+ peek_offset_cap ? "enabled" : "disabled", peek_offset);
+
+ if (t.sndq > TCP_MIGRATE_SND_QUEUE_MAX || t.notsent > t.sndq ||
+ t.rcvq > TCP_MIGRATE_RCV_QUEUE_MAX) {
+ err("Bad queues socket %i, send: %u, not sent: %u, receive: %u",
+ s, t.sndq, t.notsent, t.rcvq);
+ return -EINVAL;
+ }
+
+ if (read_all_buf(fd, tcp_migrate_snd_queue, t.sndq)) {
+ rc = -errno;
+ err_perror("Failed to read send queue data, socket %i", s);
+ return rc;
+ }
+
+ if (read_all_buf(fd, tcp_migrate_rcv_queue, t.rcvq)) {
+ rc = -errno;
+ err_perror("Failed to read receive queue data, socket %i", s);
+ return rc;
+ }
+
+ if (tcp_flow_select_queue(s, TCP_SEND_QUEUE))
+ goto fail;
+
+ if (tcp_flow_repair_seq(s, &t.seq_snd))
+ goto fail;
+
+ if (tcp_flow_select_queue(s, TCP_RECV_QUEUE))
+ goto fail;
+
+ if (tcp_flow_repair_seq(s, &t.seq_rcv))
+ goto fail;
+
+ if (tcp_flow_repair_connect(c, conn))
+ goto fail;
+
+ if (tcp_flow_repair_queue(s, t.rcvq, tcp_migrate_rcv_queue))
+ goto fail;
+
+ if (tcp_flow_select_queue(s, TCP_SEND_QUEUE))
+ goto fail;
+
+ if (tcp_flow_repair_queue(s, t.sndq - t.notsent,
+ tcp_migrate_snd_queue))
+ goto fail;
+
+ if (tcp_flow_repair_opt(s, &t))
+ goto fail;
+
+ /* If we sent a FIN sent and it was acknowledged (TCP_FIN_WAIT2), don't
+ * send it out, because we already sent it for sure.
+ *
+ * Call shutdown(x, SHUT_WR) in repair mode, so that we move to
+ * FIN_WAIT_1 (tcp_shutdown()) without sending anything
+ * (goto in tcp_write_xmit()).
+ */
+ if (t.tcpi_state == TCP_FIN_WAIT2) {
+ int v;
+
+ v = TCP_SEND_QUEUE;
+ if (setsockopt(s, SOL_TCP, TCP_REPAIR_QUEUE, &v, sizeof(v)))
+ debug_perror("Selecting repair queue, socket %i", s);
+ else
+ shutdown(s, SHUT_WR);
+ }
+
+ if (tcp_flow_repair_wnd(s, &t))
+ goto fail;
+
+ tcp_flow_repair_off(c, conn);
+ repair_flush(c);
+
+ if (t.notsent) {
+ if (tcp_flow_repair_queue(s, t.notsent,
+ tcp_migrate_snd_queue +
+ (t.sndq - t.notsent))) {
+ /* This sometimes seems to fail for unclear reasons.
+ * Don't fail the whole migration, just reset the flow
+ * and carry on to the next one.
+ */
+ goto fail;
+ }
+ }
+
+ /* If we sent a FIN but it wasn't acknowledged yet (TCP_FIN_WAIT1), send
+ * it out, because we don't know if we already sent it.
+ *
+ * Call shutdown(x, SHUT_WR) *not* in repair mode, which moves us to
+ * TCP_FIN_WAIT1.
+ */
+ if (t.tcpi_state == TCP_FIN_WAIT1)
+ shutdown(s, SHUT_WR);
+
+ if (tcp_set_peek_offset(conn->sock, peek_offset))
+ goto fail;
+
+ tcp_send_flag(c, conn, ACK);
+ tcp_data_from_sock(c, conn);
+
+ if ((rc = tcp_epoll_ctl(c, conn))) {
+ debug("Failed to subscribe to epoll for migrated socket %i: %s",
+ conn->sock, strerror_(-rc));
+ goto fail;
+ }
+
+ return 0;
+
+fail:
+ tcp_flow_repair_off(c, conn);
+ repair_flush(c);
+
+ conn->flags = 0; /* Not waiting for ACK, don't schedule timer */
+ tcp_rst(c, conn);
+
+ return 0;
+}