From 904b86ade7dba15f74cebde7d351920a76a82d2a Mon Sep 17 00:00:00 2001 From: Stefano Brivio Date: Sun, 19 Sep 2021 02:29:05 +0200 Subject: tcp: Rework window handling, timers, add SO_RCVLOWAT and pools for sockets/pipes This introduces a number of fundamental changes that would be quite messy to split. Summary: - advertised window scaling can be as big as we want, we just need to clamp window sizes to avoid exceeding the size of our "discard" buffer for unacknowledged data from socket - add macros to compare sequence numbers - force sending ACK to guest/tap on PSH segments, always in pasta mode, whenever we see an overlapping segment, or when we reach a given threshold compared to our window - we don't actually use recvmmsg() here, fix comments and label - introduce pools for pre-opened sockets and pipes, to decrease latency on new connections - set receiving and sending buffer sizes to the maximum allowed, kernel will clamp and round appropriately - defer clean-up of spliced and non-spliced connection to timer - in tcp_send_to_tap(), there's no need anymore to keep a large buffer, shrink it down to what we actually need - introduce SO_RCVLOWAT setting and activity tracking for spliced connections, to coalesce data moved by splice() calls as much as possible - as we now have a compacted connection table, there's no need to keep sparse bitmaps tracking connection activity -- simply go through active connections with a loop in the timer handler - always clamp the advertised window to half our sending buffer, too, to minimise retransmissions from the guest/tap - set TCP_QUICKACK for originating socket in spliced connections, there's no need to delay them - fix up timeout for unacknowledged data from socket Signed-off-by: Stefano Brivio --- tcp.c | 1175 ++++++++++++++++++++++++++++++++++++++++------------------------- tcp.h | 10 +- 2 files changed, 727 insertions(+), 458 deletions(-) diff --git a/tcp.c b/tcp.c index 91b82ee..830bf4a 100644 --- a/tcp.c +++ b/tcp.c @@ -66,20 +66,15 @@ * ------ * * To avoid the need for dynamic memory allocation, a maximum, reasonable amount - * of connections is defined by MAX_TAP_CONNS below (currently 1M, close to - * the maximum amount of file descriptors typically available to a process on - * Linux). - * - * While fragmentation and reassembly are not implemented, tracking of missing - * segments and retransmissions needs to be, thus data needs to linger on - * sockets as long as it's not acknowledged by the guest, and read using - * MSG_PEEK into a single, preallocated static buffer sized to the maximum - * supported window, 16MiB. This imposes a practical limitation on window - * scaling, that is, the maximum factor is 512. If a bigger window scaling - * factor is observed during connection establishment, connection is reset and - * reestablished by omitting the scaling factor in the SYN segment. This - * limitation only applies to the window scaling advertised by the guest, but - * if exceeded, no window scaling will be allowed at all toward either endpoint. + * of connections is defined by MAX_TAP_CONNS below (currently 128k). + * + * Data needs to linger on sockets as long as it's not acknowledged by the + * guest, and is read using MSG_PEEK into preallocated static buffers sized + * to the maximum supported window, 64MiB ("discard" buffer, for already-sent + * data) plus a number of maximum-MSS-sized buffers. This imposes a practical + * limitation on window scaling, that is, the maximum factor is 1024. Larger + * factors will be accepted, but resulting, larger values are never advertised + * to the other side, and not used while queueing data. * * * Ports @@ -245,8 +240,7 @@ * * @seq_init_from_tap: initial sequence number from tap * - * @tap_window: last window size received from tap, scaled - * @tcpi_acked_last: most recent value of tcpi_bytes_acked (TCP_INFO) + * @wnd_from_tap: last window size received from tap, scaled * * - from socket to tap: * - on new data from socket: @@ -255,7 +249,7 @@ * - starting at offset (@seq_to_tap - @seq_ack_from_tap) * - in MSS-sized segments * - increasing @seq_to_tap at each segment - * - up to window (until @seq_to_tap - @seq_ack_from_tap <= @tap_window) + * - up to window (until @seq_to_tap - @seq_ack_from_tap <= @wnd_from_tap) * - mark socket in bitmap for periodic ACK check, set @last_ts_to_tap * - on read error, send RST to tap, close socket * - on zero read, send FIN to tap, enter ESTABLISHED_SOCK_FIN @@ -271,25 +265,20 @@ * - periodically: * - if @seq_ack_from_tap < @seq_to_tap and the retransmission timer * (TODO: implement requirements from RFC 6298, currently 3s fixed) from - * @ts_sock elapsed, reset @seq_to_tap to @seq_ack_from_tap, and + * @ts_tap_from_ack elapsed, reset @seq_to_tap to @seq_ack_from_tap, and * resend data with the steps listed above * * - from tap to socket: * - on packet from tap: - * - set @ts_tap + * - set @ts_tap_ack * - set TCP_WINDOW_CLAMP from TCP header from tap * - check seq from header against @seq_from_tap, if data is missing, send * two ACKs with number @seq_ack_to_tap, discard packet * - otherwise queue data to socket, set @seq_from_tap to seq from header * plus payload length - * - query socket for TCP_INFO, on tcpi_bytes_acked > @tcpi_acked_last, - * set @tcpi_acked_last to tcpi_bytes_acked, set @seq_ack_to_tap - * to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and - * send ACK to tap - * - periodically: - * - query socket for TCP_INFO, on tcpi_bytes_acked > @tcpi_acked_last, - * set @tcpi_acked_last to tcpi_bytes_acked, set @seq_ack_to_tap - * to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and + * - in ESTABLISHED state, send ACK to tap as soon as we queue to the + * socket. In other states, query socket for TCP_INFO, set + * @seq_ack_to_tap to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and * send ACK to tap * * @@ -351,13 +340,13 @@ #define TCP_TAP_FRAMES 8 -#define PIPE_SIZE (1024 * 1024) +#define MAX_PIPE_SIZE (2 * 1024 * 1024) #define TCP_HASH_TABLE_LOAD 70 /* % */ #define TCP_HASH_TABLE_SIZE (MAX_TAP_CONNS * 100 / \ TCP_HASH_TABLE_LOAD) -#define MAX_WS 9 +#define MAX_WS 10 #define MAX_WINDOW (1 << (16 + (MAX_WS))) #define MSS_DEFAULT 536 #define WINDOW_DEFAULT 14600 /* RFC 6928 */ @@ -369,12 +358,21 @@ #define FIN_TIMEOUT 240000 #define LAST_ACK_TIMEOUT 240000 +#define TCP_SOCK_POOL_SIZE 256 +#define TCP_SOCK_POOL_TSH 128 /* Refill in ns if > x used */ +#define TCP_SPLICE_PIPE_POOL_SIZE 256 +#define REFILL_INTERVAL 1000 /* We need to include for tcpi_bytes_acked, instead of * , but that doesn't include a definition for SOL_TCP */ #define SOL_TCP IPPROTO_TCP +#define SEQ_LE(a, b) ((b) - (a) < MAX_WINDOW) +#define SEQ_LT(a, b) ((b) - (a) - 1 < MAX_WINDOW) +#define SEQ_GE(a, b) ((a) - (b) < MAX_WINDOW) +#define SEQ_GT(a, b) ((a) - (b) - 1 < MAX_WINDOW) + enum tcp_state { CLOSED = 0, TAP_SYN_SENT, @@ -409,7 +407,9 @@ static char *tcp_state_str[TCP_STATE_STR_SIZE] __attribute((__unused__)) = { #define RST (1 << 2) #define ACK (1 << 4) /* Flags for internal usage */ -#define ZERO_WINDOW (1 << 5) +#define UPDATE_WINDOW (1 << 5) +#define DUP_ACK (1 << 6) +#define FORCE_ACK (1 << 7) #define OPT_EOL 0 #define OPT_NOP 1 @@ -439,18 +439,21 @@ struct tcp_tap_conn; * @seq_ack_from_tap: Last ACK number received from tap * @seq_from_tap: Next sequence for packets from tap (not actually sent) * @seq_ack_to_tap: Last ACK number sent to tap + * @seq_dup_ack: Last duplicate ACK number sent to tap * @seq_init_from_tap: Initial sequence number from tap - * @tcpi_acked_last: Most recent value of tcpi_bytes_acked (TCP_INFO query) - * @ws_allowed: Window scaling allowed + * @seq_init_from_tap: Initial sequence number to tap + * @ws_tap: Window scaling factor from tap * @ws: Window scaling factor - * @tap_window: Last window size received from tap, scaled + * @wnd_from_tap: Last window size received from tap, scaled + * @wnd_to_tap: Socket-side sending window, advertised to tap * @window_clamped: Window was clamped on socket at least once - * @no_snd_wnd: Kernel won't report window (without commit 8f7baad7f035) - * @tcpi_acked_last: Most recent value of tcpi_snd_wnd (TCP_INFO query) - * @ts_sock: Last activity timestamp from socket for timeout purposes - * @ts_tap: Last activity timestamp from tap for timeout purposes - * @ts_ack_tap: Last ACK segment timestamp from tap for timeout purposes + * @ts_sock_act: Last activity timestamp from socket for timeout purposes + * @ts_tap_act: Last activity timestamp from tap for timeout purposes + * @ts_ack_from_tap: Last ACK segment timestamp from tap + * @ts_ack_to_tap: Last ACK segment timestamp to tap + * @tap_data_noack: Last unacked data to tap, set to { 0, 0 } on ACK * @mss_guest: Maximum segment size advertised by guest + * @events: epoll events currently enabled for socket */ struct tcp_tap_conn { struct tcp_tap_conn *next; @@ -473,23 +476,24 @@ struct tcp_tap_conn { uint32_t seq_ack_from_tap; uint32_t seq_from_tap; uint32_t seq_ack_to_tap; + uint32_t seq_dup_ack; uint32_t seq_init_from_tap; uint32_t seq_init_to_tap; - uint64_t tcpi_acked_last; - int ws_allowed; - int ws; - uint32_t tap_window; + uint16_t ws_tap; + uint16_t ws; + uint32_t wnd_from_tap; + uint32_t wnd_to_tap; int window_clamped; - int no_snd_wnd; - uint32_t tcpi_snd_wnd; + int snd_buf; - struct timespec ts_sock; - struct timespec ts_tap; - struct timespec ts_ack_tap; + struct timespec ts_sock_act; + struct timespec ts_tap_act; + struct timespec ts_ack_from_tap; + struct timespec ts_ack_to_tap; + struct timespec tap_data_noack; int mss_guest; - uint32_t sndbuf; uint32_t events; }; @@ -626,7 +630,7 @@ static int tcp6_l2_buf_mss_nr_set; static int tcp6_l2_buf_mss_tap; static int tcp6_l2_buf_mss_tap_nr_set; -/* recvmmsg()/sendmmsg() data for tap */ +/* recvmsg()/sendmsg() data for tap */ static struct iovec tcp4_l2_iov_sock [TCP_TAP_FRAMES + 1]; static struct iovec tcp6_l2_iov_sock [TCP_TAP_FRAMES + 1]; static char tcp_buf_discard [MAX_WINDOW]; @@ -647,8 +651,9 @@ static struct mmsghdr tcp_l2_mh_tap [TCP_TAP_FRAMES] = { /* sendmsg() to socket */ static struct iovec tcp_tap_iov [UIO_MAXIOV]; -/* Bitmap, activity monitoring needed for connection via tap */ -static uint8_t tcp_act[MAX_TAP_CONNS / 8] = { 0 }; +/* SO_RCVLOWAT set on source ([0]) or destination ([1]) socket, and activity */ +static uint8_t splice_rcvlowat_set[MAX_SPLICE_CONNS / 8][2]; +static uint8_t splice_rcvlowat_act[MAX_SPLICE_CONNS / 8][2]; /* TCP connections */ static struct tcp_tap_conn tt[MAX_TAP_CONNS]; @@ -657,6 +662,13 @@ static struct tcp_splice_conn ts[MAX_SPLICE_CONNS]; /* Table for lookup from remote address, local port, remote port */ static struct tcp_tap_conn *tt_hash[TCP_HASH_TABLE_SIZE]; +/* Pools for pre-opened sockets and pipes */ +static int splice_pipe_pool [TCP_SPLICE_PIPE_POOL_SIZE][2][2]; +static int init_sock_pool4 [TCP_SOCK_POOL_SIZE]; +static int init_sock_pool6 [TCP_SOCK_POOL_SIZE]; +static int ns_sock_pool4 [TCP_SOCK_POOL_SIZE]; +static int ns_sock_pool6 [TCP_SOCK_POOL_SIZE]; + /** * tcp_tap_state() - Set given TCP state for tap connection, report to stderr * @conn: Connection pointer @@ -681,6 +693,21 @@ static void tcp_splice_state(struct tcp_splice_conn *conn, enum tcp_state state) conn->state = state; } +/** + * tcp_sock_set_bufsize() - Set SO_RCVBUF and SO_SNDBUF to maximum values + * @s: Socket, can be -1 to avoid check in the caller + */ +static void tcp_sock_set_bufsize(int s) +{ + int v = INT_MAX / 2; /* Kernel clamps and rounds, no need to check */ + + if (s == -1) + return; + + setsockopt(s, SOL_SOCKET, SO_RCVBUF, &v, sizeof(v)); + setsockopt(s, SOL_SOCKET, SO_SNDBUF, &v, sizeof(v)); +} + /** * tcp_update_check_ip4() - Update IPv4 with variable parts from stored one * @buf: L2 packet buffer with final IPv4 header @@ -1077,7 +1104,6 @@ static void tcp_table_tap_compact(struct ctx *c, struct tcp_tap_conn *hole) uint32_t events; if ((hole - tt) == --c->tcp.tap_conn_count) { - bitmap_clear(tcp_act, hole - tt); debug("TCP: hash table compaction: index %i (%p) was max index", hole - tt, hole); return; @@ -1112,8 +1138,10 @@ static void tcp_tap_destroy(struct ctx *c, struct tcp_tap_conn *conn) epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->sock, NULL); tcp_tap_state(conn, CLOSED); close(conn->sock); - tcp_hash_remove(conn); - tcp_table_tap_compact(c, conn); + + /* Removal from hash table and connection table compaction deferred to + * timer. + */ } static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn); @@ -1123,51 +1151,34 @@ static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn); * @c: Execution context * @conn: Connection pointer * @flags: TCP flags to set - * @in: Payload buffer - * @len: Payload length + * @now: Current timestamp, can be NULL * * Return: negative error code on connection reset, 0 otherwise */ -static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, - int flags, char *in, int len) +static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, int flags, + struct timespec *now) { - uint32_t ack_offset = conn->seq_from_tap - conn->seq_ack_to_tap; - char buf[USHRT_MAX] = { 0 }, *data; + char buf[sizeof(struct tcphdr) + OPT_MSS_LEN + OPT_WS_LEN + 1] = { 0 }; + uint32_t prev_ack_to_tap = conn->seq_ack_to_tap; struct tcp_info info = { 0 }; - int ws = 0, err, ack_pending; socklen_t sl = sizeof(info); struct tcphdr *th; + char *data; - if (conn->state == ESTABLISHED && !ack_offset && !flags && - conn->tcpi_snd_wnd) { - err = 0; - info.tcpi_bytes_acked = conn->tcpi_acked_last; - info.tcpi_snd_wnd = conn->tcpi_snd_wnd; - info.tcpi_snd_wscale = conn->ws; - } else if (conn->no_snd_wnd && !(flags & SYN)) { - err = 0; - } else { - err = getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info, &sl); - if (err && !(flags & RST)) { - tcp_rst(c, conn); - return err; - } - - sl = sizeof(conn->sndbuf); - if (getsockopt(conn->sock, SOL_SOCKET, SO_SNDBUF, - &conn->sndbuf, &sl)) - conn->sndbuf = USHRT_MAX; + if (SEQ_GE(conn->seq_ack_to_tap, conn->seq_from_tap) && + !flags && conn->wnd_to_tap) + return 0; - info.tcpi_snd_wnd = MIN(info.tcpi_snd_wnd, - conn->sndbuf * 90 / 100); - conn->tcpi_snd_wnd = info.tcpi_snd_wnd; + if (getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info, &sl)) { + tcp_rst(c, conn); + return -ECONNRESET; } th = (struct tcphdr *)buf; data = (char *)(th + 1); th->doff = sizeof(*th) / 4; - if ((flags & SYN) && !err) { + if (flags & SYN) { /* Options: MSS, NOP and window scale if allowed (4-8 bytes) */ *data++ = OPT_MSS; *data++ = OPT_MSS_LEN; @@ -1175,72 +1186,42 @@ static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, data += OPT_MSS_LEN - 2; th->doff += OPT_MSS_LEN / 4; - /* Check if kernel includes commit: - * 8f7baad7f035 ("tcp: Add snd_wnd to TCP_INFO") - */ - conn->no_snd_wnd = !info.tcpi_snd_wnd; + if (!c->tcp.kernel_snd_wnd && info.tcpi_snd_wnd) + c->tcp.kernel_snd_wnd = 1; - if (conn->ws_allowed && (ws = info.tcpi_snd_wscale) && - !conn->no_snd_wnd) { - *data++ = OPT_NOP; + conn->ws = MIN(MAX_WS, info.tcpi_snd_wscale); - *data++ = OPT_WS; - *data++ = OPT_WS_LEN; - *data++ = ws; - - th->doff += (1 + OPT_WS_LEN) / 4; - } + *data++ = OPT_NOP; + *data++ = OPT_WS; + *data++ = OPT_WS_LEN; + *data++ = conn->ws; + th->doff += (1 + OPT_WS_LEN) / 4; /* RFC 793, 3.1: "[...] and the first data octet is ISN+1." */ th->seq = htonl(conn->seq_to_tap++); + + th->ack = !!(flags & ACK); } else { + th->ack = 1; th->seq = htonl(conn->seq_to_tap); - conn->seq_to_tap += len; } - if (flags & SYN) { - ack_pending = 0; - } else if (conn->state == ESTABLISHED || conn->no_snd_wnd) { - ack_pending = conn->seq_from_tap != conn->seq_ack_to_tap && - (conn->seq_from_tap - conn->seq_ack_to_tap) < - MAX_WINDOW; + if (conn->state > ESTABLISHED || (flags & (DUP_ACK | FORCE_ACK))) { + conn->seq_ack_to_tap = conn->seq_from_tap; } else { - ack_pending = info.tcpi_bytes_acked > conn->tcpi_acked_last; - } - - if (!err && (ack_pending || (flags & ACK) || len)) { - th->ack = 1; - - if (conn->no_snd_wnd) { - conn->seq_ack_to_tap = conn->seq_from_tap; - } else { - if (conn->state == ESTABLISHED) - conn->seq_ack_to_tap = conn->seq_from_tap; - else - conn->seq_ack_to_tap = info.tcpi_bytes_acked + - conn->seq_init_from_tap; + conn->seq_ack_to_tap = info.tcpi_bytes_acked + + conn->seq_init_from_tap; - conn->tcpi_acked_last = info.tcpi_bytes_acked; - } + if (SEQ_LT(conn->seq_ack_to_tap, prev_ack_to_tap)) + conn->seq_ack_to_tap = prev_ack_to_tap; + } - /* seq_ack_to_tap matching seq_from_tap means, in these states, - * that we shut the writing half down, but the FIN segment - * wasn't acknowledged yet. We sent the FIN for sure, so adjust - * the sequence number in that case. - */ - if ((conn->state == LAST_ACK || - conn->state == FIN_WAIT_1_SOCK_FIN || - conn->state == FIN_WAIT_1) && - conn->seq_ack_to_tap == conn->seq_from_tap) - th->ack_seq = htonl(conn->seq_ack_to_tap + 1); - else - th->ack_seq = htonl(conn->seq_ack_to_tap); - } else { - if (!len && !flags) - return 0; + if (!flags && + conn->seq_ack_to_tap == prev_ack_to_tap && + c->tcp.kernel_snd_wnd && conn->wnd_to_tap == info.tcpi_snd_wnd) + return 0; - th->ack = th->ack_seq = 0; - } + th->ack_seq = htonl(conn->seq_ack_to_tap); th->rst = !!(flags & RST); th->syn = !!(flags & SYN); @@ -1249,29 +1230,40 @@ static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, th->source = htons(conn->sock_port); th->dest = htons(conn->tap_port); - if (flags & ZERO_WINDOW) { - th->window = 0; - } else if (!err && !conn->no_snd_wnd) { + if (th->syn) { /* First value sent by receiver is not scaled */ - th->window = htons(info.tcpi_snd_wnd >> - (th->syn ? 0 : info.tcpi_snd_wscale)); + th->window = htons(conn->wnd_to_tap = WINDOW_DEFAULT); } else { - th->window = htons(WINDOW_DEFAULT); - } + if (c->tcp.kernel_snd_wnd) { + conn->wnd_to_tap = MIN(info.tcpi_snd_wnd, + conn->snd_buf); + } else { + conn->wnd_to_tap = conn->snd_buf; + } + conn->wnd_to_tap = MIN(conn->wnd_to_tap, MAX_WINDOW); - if (!th->window) - conn->tcpi_snd_wnd = 0; + th->window = htons(MIN(conn->wnd_to_tap >> conn->ws, + USHRT_MAX)); + } th->urg_ptr = 0; th->check = 0; - memcpy(data, in, len); + if (th->ack && now) + conn->ts_ack_to_tap = *now; - tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4 + len, + tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4, conn->seq_init_to_tap); - if (th->fin) + if (flags & DUP_ACK) { + tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4, + conn->seq_init_to_tap); + } + + if (th->fin) { + conn->tap_data_noack = *now; conn->seq_to_tap++; + } return 0; } @@ -1286,7 +1278,7 @@ static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn) if (conn->state == CLOSED) return; - tcp_send_to_tap(c, conn, RST, NULL, 0); + tcp_send_to_tap(c, conn, RST, NULL); tcp_tap_destroy(c, conn); } @@ -1302,37 +1294,39 @@ static void tcp_clamp_window(struct tcp_tap_conn *conn, struct tcphdr *th, int len, unsigned int window, int init) { if (init) { - conn->ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL); - conn->ws_allowed = conn->ws >= 0 && conn->ws <= MAX_WS; - conn->ws *= conn->ws_allowed; + int ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL); + + conn->ws_tap = ws; /* RFC 7323, 2.2: first value is not scaled. Also, don't clamp * yet, to avoid getting a zero scale just because we set a * small window now. */ - conn->tap_window = ntohs(th->window); + conn->wnd_from_tap = ntohs(th->window); conn->window_clamped = 0; } else { if (th) - window = ntohs(th->window) << conn->ws; + window = ntohs(th->window) << conn->ws_tap; else - window <<= conn->ws; + window <<= conn->ws_tap; + + window = MIN(MAX_WINDOW, window); if (conn->window_clamped) { - if (conn->tap_window == window) + if (conn->wnd_from_tap == window) return; /* Discard +/- 1% updates to spare some syscalls. */ - if ((window > conn->tap_window && - window * 99 / 100 < conn->tap_window) || - (window < conn->tap_window && - window * 101 / 100 > conn->tap_window)) { - conn->tap_window = window; + if ((window > conn->wnd_from_tap && + window * 99 / 100 < conn->wnd_from_tap) || + (window < conn->wnd_from_tap && + window * 101 / 100 > conn->wnd_from_tap)) { + conn->wnd_from_tap = window; return; } } - conn->tap_window = window; + conn->wnd_from_tap = window; if (window < 256) window = 256; setsockopt(conn->sock, SOL_TCP, TCP_WINDOW_CLAMP, @@ -1420,17 +1414,33 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr, union epoll_ref ref = { .proto = IPPROTO_TCP }; const struct sockaddr *sa; struct tcp_tap_conn *conn; + int i, s, *sock_pool_p; struct epoll_event ev; socklen_t sl; - int s; if (c->tcp.tap_conn_count >= MAX_TAP_CONNS) return; - ref.s = s = socket(af, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); + for (i = 0; i < TCP_SOCK_POOL_SIZE; i++) { + if (af == AF_INET6) + sock_pool_p = &init_sock_pool6[i]; + else + sock_pool_p = &init_sock_pool4[i]; + if ((ref.s = s = *sock_pool_p) > 0) { + *sock_pool_p = -1; + break; + } + } + + if (s < 0) + ref.s = s = socket(af, SOCK_STREAM | SOCK_NONBLOCK, + IPPROTO_TCP); + if (s < 0) return; + tcp_sock_set_bufsize(s); + if (af == AF_INET && addr4.sin_addr.s_addr == c->gw4) addr4.sin_addr.s_addr = htonl(INADDR_LOOPBACK); else if (af == AF_INET6 && !memcmp(addr, &c->gw6, sizeof(c->gw6))) @@ -1447,10 +1457,9 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr, conn = &tt[c->tcp.tap_conn_count++]; conn->sock = s; + conn->events = 0; - sl = sizeof(conn->sndbuf); - if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->sndbuf, &sl)) - conn->sndbuf = USHRT_MAX; + conn->wnd_to_tap = WINDOW_DEFAULT; conn->mss_guest = tcp_opt_get(th, len, OPT_MSS, NULL, NULL); if (conn->mss_guest < 0) @@ -1488,9 +1497,8 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr, conn->sock_port = ntohs(th->dest); conn->tap_port = ntohs(th->source); - conn->ts_sock = conn->ts_tap = conn->ts_ack_tap = *now; - - bitmap_set(tcp_act, conn - tt); + conn->ts_sock_act = conn->ts_tap_act = *now; + conn->ts_ack_to_tap = conn->ts_ack_from_tap = *now; conn->seq_init_from_tap = ntohl(th->seq); conn->seq_from_tap = conn->seq_init_from_tap + 1; @@ -1514,12 +1522,18 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr, } else { tcp_tap_state(conn, TAP_SYN_RCVD); - if (tcp_send_to_tap(c, conn, SYN | ACK, NULL, 0)) + if (tcp_send_to_tap(c, conn, SYN | ACK, now)) return; ev.events = EPOLLIN | EPOLLRDHUP; } + sl = sizeof(conn->snd_buf); + if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->snd_buf, &sl)) + conn->snd_buf = WINDOW_DEFAULT; + else + conn->snd_buf /= 2; + conn->events = ev.events; ref.tcp.index = conn - tt; ev.data.u64 = ref.u64; @@ -1542,10 +1556,22 @@ static void tcp_table_splice_compact(struct ctx *c, struct epoll_event ev_from; struct epoll_event ev_to; + hole->from_fin_sent = hole->to_fin_sent = 0; + hole->from_read = hole->from_written = 0; + hole->to_read = hole->to_written = 0; + + bitmap_clear(splice_rcvlowat_set[0], hole - ts); + bitmap_clear(splice_rcvlowat_set[1], hole - ts); + bitmap_clear(splice_rcvlowat_act[0], hole - ts); + bitmap_clear(splice_rcvlowat_act[1], hole - ts); + if ((hole - ts) == --c->tcp.splice_conn_count) return; move = &ts[c->tcp.splice_conn_count]; + if (move->state == CLOSED) + return; + memcpy(hole, move, sizeof(*hole)); move->state = CLOSED; move = hole; @@ -1558,8 +1584,8 @@ static void tcp_table_splice_compact(struct ctx *c, if (move->state == SPLICE_ACCEPTED) { ev_from.events = ev_to.events = 0; } else if (move->state == SPLICE_CONNECT) { - ev_from.events = EPOLLET | EPOLLRDHUP; - ev_to.events = EPOLLET | EPOLLOUT | EPOLLRDHUP; + ev_from.events = 0; + ev_to.events = EPOLLOUT; } else { ev_from.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; ev_to.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; @@ -1579,11 +1605,17 @@ static void tcp_table_splice_compact(struct ctx *c, */ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn) { + int epoll_del_done = 0; + switch (conn->state) { + case CLOSED: + epoll_del_done = 1; + /* Falls through */ case SPLICE_FIN_BOTH: case SPLICE_FIN_FROM: case SPLICE_FIN_TO: case SPLICE_ESTABLISHED: + /* Flushing might need to block: don't recycle them. */ if (conn->pipe_from_to[0] != -1) { close(conn->pipe_from_to[0]); conn->pipe_from_to[0] = -1; @@ -1598,18 +1630,17 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn) } /* Falls through */ case SPLICE_CONNECT: - epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL); - epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL); + if (!epoll_del_done) { + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL); + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL); + } close(conn->to); /* Falls through */ case SPLICE_ACCEPTED: close(conn->from); tcp_splice_state(conn, CLOSED); tcp_table_splice_compact(c, conn); - conn->from_fin_sent = conn->to_fin_sent = 0; - conn->from_read = conn->from_written = 0; - conn->to_read = conn->to_written = 0; - return; + break; default: return; } @@ -1622,21 +1653,15 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn) */ static void tcp_sock_consume(struct tcp_tap_conn *conn, uint32_t ack_seq) { - uint32_t to_ack; - - /* Implicitly take care of wrap-arounds */ - to_ack = ack_seq - conn->seq_ack_from_tap; - /* Simply ignore out-of-order ACKs: we already consumed the data we * needed from the buffer, and we won't rewind back to a lower ACK * sequence. */ - - if (to_ack > MAX_WINDOW) + if (SEQ_LE(ack_seq, conn->seq_ack_from_tap)) return; - if (to_ack) - recv(conn->sock, NULL, to_ack, MSG_DONTWAIT | MSG_TRUNC); + recv(conn->sock, NULL, ack_seq - conn->seq_ack_from_tap, + MSG_DONTWAIT | MSG_TRUNC); conn->seq_ack_from_tap = ack_seq; } @@ -1665,24 +1690,24 @@ static int tcp_data_from_sock(struct ctx *c, struct tcp_tap_conn *conn, already_sent = conn->seq_to_tap - conn->seq_ack_from_tap; - if (already_sent > MAX_WINDOW) { + if (SEQ_LT(already_sent, 0)) { /* RFC 761, section 2.1. */ seq_to_tap = conn->seq_to_tap = conn->seq_ack_from_tap; already_sent = 0; } - if (!conn->tap_window || already_sent >= conn->tap_window) { + if (!conn->wnd_from_tap || already_sent >= conn->wnd_from_tap) { tcp_tap_epoll_mask(c, conn, conn->events | EPOLLET); return 0; } - fill_bufs = DIV_ROUND_UP(conn->tap_window - already_sent, + fill_bufs = DIV_ROUND_UP(conn->wnd_from_tap - already_sent, conn->mss_guest); if (fill_bufs > TCP_TAP_FRAMES) { fill_bufs = TCP_TAP_FRAMES; iov_rem = 0; } else { - iov_rem = (conn->tap_window - already_sent) % conn->mss_guest; + iov_rem = (conn->wnd_from_tap - already_sent) % conn->mss_guest; } /* Adjust iovec length for recvmsg() based on what was set last time. */ @@ -1712,11 +1737,11 @@ static int tcp_data_from_sock(struct ctx *c, struct tcp_tap_conn *conn, tcp6_l2_mh_sock.msg_iovlen = fill_bufs + 1; /* Don't dequeue until acknowledged by guest. */ -recvmmsg: +recvmsg: len = recvmsg(s, v4 ? &tcp4_l2_mh_sock : &tcp6_l2_mh_sock, MSG_PEEK); if (len < 0) { if (errno == EINTR) - goto recvmmsg; + goto recvmsg; goto err; } @@ -1726,7 +1751,7 @@ recvmmsg: send = len - already_sent; if (send <= 0) { tcp_tap_epoll_mask(c, conn, conn->events | EPOLLET); - goto out_restore_iov; + goto out; } tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET); @@ -1762,32 +1787,22 @@ recvmmsg: iov_tap[send_bufs - 1].iov_len = mss_tap - conn->mss_guest + last_len; /* Likely, some new data was acked too. */ - if (conn->seq_from_tap != conn->seq_ack_to_tap || !conn->tcpi_snd_wnd) { - if (conn->no_snd_wnd) { + if (conn->seq_from_tap != conn->seq_ack_to_tap || !conn->wnd_to_tap) { + if (conn->state != ESTABLISHED || + getsockopt(s, SOL_TCP, TCP_INFO, &info, &sl)) { conn->seq_ack_to_tap = conn->seq_from_tap; } else { - if (getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info, - &sl)) - goto err; - - if (getsockopt(conn->sock, SOL_SOCKET, - SO_SNDBUF, &conn->sndbuf, &sl)) - conn->sndbuf = USHRT_MAX; + conn->seq_ack_to_tap = info.tcpi_bytes_acked + + conn->seq_init_from_tap; - info.tcpi_snd_wnd = MIN(info.tcpi_snd_wnd, - conn->sndbuf * 90 / 100); - - if (conn->state == ESTABLISHED) - conn->seq_ack_to_tap = conn->seq_from_tap; - else - conn->seq_ack_to_tap = info.tcpi_bytes_acked + - conn->seq_init_from_tap; - - conn->tcpi_acked_last = info.tcpi_bytes_acked; + if (c->tcp.kernel_snd_wnd) { + conn->wnd_to_tap = MIN(info.tcpi_snd_wnd, + conn->snd_buf); + } else { + conn->wnd_to_tap = conn->snd_buf; + } + conn->wnd_to_tap = MIN(conn->wnd_to_tap, MAX_WINDOW); } - } else { - info.tcpi_snd_wscale = conn->ws; - info.tcpi_snd_wnd = conn->tcpi_snd_wnd; } plen = conn->mss_guest; @@ -1815,29 +1830,22 @@ recvmmsg: b->th.dest = htons(conn->tap_port); b->th.seq = htonl(seq_to_tap); b->th.ack_seq = htonl(conn->seq_ack_to_tap); - - if (conn->no_snd_wnd) { - b->th.window = htons(WINDOW_DEFAULT); - } else { - b->th.window = htons(info.tcpi_snd_wnd >> - info.tcpi_snd_wscale); - conn->tcpi_snd_wnd = info.tcpi_snd_wnd; - } + b->th.window = htons(MIN(conn->wnd_to_tap >> conn->ws, + USHRT_MAX)); tcp_update_check_tcp4(b); - if (c->mode == MODE_PASTA) { - ip_len += sizeof(struct ethhdr); - write(c->fd_tap, &b->eh, ip_len); - pcap((char *)&b->eh, ip_len); - - conn->seq_to_tap += plen; + if (c->mode == MODE_PASST) { + b->vnet_len = htonl(sizeof(struct ethhdr) + + ip_len); + mh->msg_hdr.msg_iov = &tcp4_l2_iov_tap[i]; + seq_to_tap += plen; continue; } - b->vnet_len = htonl(sizeof(struct ethhdr) + ip_len); - - mh->msg_hdr.msg_iov = &tcp4_l2_iov_tap[i]; + ip_len += sizeof(struct ethhdr); + pcap((char *)&b->eh, ip_len); + ret = write(c->fd_tap, &b->eh, ip_len); } else { struct tcp6_l2_buf_t *b = &tcp6_l2_buf[i]; uint32_t flow = conn->seq_init_to_tap; @@ -1857,14 +1865,8 @@ recvmmsg: b->th.dest = htons(conn->tap_port); b->th.seq = htonl(seq_to_tap); b->th.ack_seq = htonl(conn->seq_ack_to_tap); - - if (conn->no_snd_wnd) { - b->th.window = htons(WINDOW_DEFAULT); - } else { - b->th.window = htons(info.tcpi_snd_wnd >> - info.tcpi_snd_wscale); - conn->tcpi_snd_wnd = info.tcpi_snd_wnd; - } + b->th.window = htons(MIN(conn->wnd_to_tap >> conn->ws, + USHRT_MAX)); memset(b->ip6h.flow_lbl, 0, 3); tcp_update_check_tcp6(b); @@ -1873,21 +1875,32 @@ recvmmsg: b->ip6h.flow_lbl[1] = (flow >> 8) & 0xff; b->ip6h.flow_lbl[2] = (flow >> 0) & 0xff; - if (c->mode == MODE_PASTA) { - ip_len += sizeof(struct ethhdr); - write(c->fd_tap, &b->eh, ip_len); - pcap((char *)&b->eh, ip_len); - - conn->seq_to_tap += plen; + if (c->mode == MODE_PASST) { + b->vnet_len = htonl(sizeof(struct ethhdr) + + ip_len); + mh->msg_hdr.msg_iov = &tcp6_l2_iov_tap[i]; + seq_to_tap += plen; continue; } - b->vnet_len = htonl(sizeof(struct ethhdr) + ip_len); + ip_len += sizeof(struct ethhdr); + pcap((char *)&b->eh, ip_len); + ret = write(c->fd_tap, &b->eh, ip_len); + } + + if (ret < ip_len) { + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return 0; - mh->msg_hdr.msg_iov = &tcp6_l2_iov_tap[i]; + tap_handler(c, EPOLLERR, now); + } + + i--; + continue; } - seq_to_tap += plen; + conn->seq_to_tap += plen; } if (c->mode == MODE_PASTA) @@ -1902,6 +1915,7 @@ sendmmsg: if (ret <= 0) goto out; + conn->tap_data_noack = *now; conn->seq_to_tap += conn->mss_guest * (ret - 1) + last_len; /* sendmmsg() indicates how many messages were sent at least partially. @@ -1925,6 +1939,8 @@ sendmmsg: *iov_base -= part_sent; } + conn->ts_ack_to_tap = *now; + pcapmm(tcp_l2_mh_tap, ret); goto out; @@ -1934,23 +1950,16 @@ err: tcp_rst(c, conn); ret = -errno; } - goto out_restore_iov; + goto out; zero_len: if (conn->state == ESTABLISHED_SOCK_FIN) { - uint8_t probe; - - if (!recv(conn->sock, &probe, 1, MSG_PEEK)) { - tcp_tap_epoll_mask(c, conn, EPOLLET); - tcp_send_to_tap(c, conn, FIN | ACK, NULL, 0); - tcp_tap_state(conn, ESTABLISHED_SOCK_FIN_SENT); - } + tcp_tap_epoll_mask(c, conn, EPOLLET); + tcp_send_to_tap(c, conn, FIN | ACK, now); + tcp_tap_state(conn, ESTABLISHED_SOCK_FIN_SENT); } out: - conn->ts_sock = *now; - -out_restore_iov: if (iov_rem) iov[fill_bufs - 1].iov_len = conn->mss_guest; if (send_bufs) @@ -1971,12 +1980,14 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, struct tap_l4_msg *msg, int count, struct timespec *now) { - int i, iov_i, ack = 0, fin = 0, retr = 0, keep = -1; + int i, iov_i, ack = 0, fin = 0, psh = 0, retr = 0, keep = -1; struct msghdr mh = { .msg_iov = tcp_tap_iov }; uint32_t max_ack_seq = conn->seq_ack_from_tap; + uint16_t max_ack_seq_wnd = conn->wnd_from_tap; uint32_t seq_from_tap = conn->seq_from_tap; - uint16_t max_ack_seq_wnd = WINDOW_DEFAULT; - ssize_t len; + int partial_send = 0; + uint16_t len; + ssize_t n; for (i = 0, iov_i = 0; i < count; i++) { uint32_t seq, seq_offset, ack_seq; @@ -2008,14 +2019,12 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, seq = ntohl(th->seq); ack_seq = ntohl(th->ack_seq); - if (!i) - max_ack_seq_wnd = ntohs(th->window); if (th->ack) { ack = 1; - if (ack_seq - conn->seq_ack_from_tap < MAX_WINDOW && - ack_seq - max_ack_seq < MAX_WINDOW) { + if (SEQ_GE(ack_seq, conn->seq_ack_from_tap) && + SEQ_GE(ack_seq, max_ack_seq)) { /* Fast re-transmit */ retr = !len && !th->fin && ack_seq == max_ack_seq && @@ -2029,6 +2038,9 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, if (th->fin) fin = 1; + if (th->psh) + psh = 1; + if (!len) continue; @@ -2046,19 +2058,22 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, * |--------| <-- len |--------| <-- len * '--------' <-- offset '-----| <- offset * ^ seq ^ seq - * (offset >= 0 i.e. < MAX_WINDOW, seq + len <= seq_from_tap) + * (offset >= 0, seq + len <= seq_from_tap) * * keep, look for another buffer, then go back, in this case: * , seq_from_tap * |--------| <-- len * '===' <-- offset * ^ seq - * (offset < 0 i.e. > MAX_WINDOW) + * (offset < 0) */ - if (seq_offset < MAX_WINDOW && seq + len <= seq_from_tap) + if (SEQ_GE(seq_offset, 0) && SEQ_LE(seq + len, seq_from_tap)) { + /* Force sending ACK, sender might have lost one */ + psh = 1; continue; + } - if (seq_offset > MAX_WINDOW) { + if (SEQ_LT(seq_offset, 0)) { if (keep == -1) keep = i; continue; @@ -2079,14 +2094,15 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, tcp_clamp_window(conn, NULL, 0, max_ack_seq_wnd, 0); if (ack) { - conn->ts_ack_tap = *now; + conn->ts_ack_from_tap = *now; + conn->tap_data_noack = ((struct timespec) { 0, 0 }); tcp_sock_consume(conn, max_ack_seq); } if (retr) { + conn->seq_ack_from_tap = max_ack_seq; conn->seq_to_tap = max_ack_seq; tcp_data_from_sock(c, conn, now); - return; } if (!iov_i) @@ -2094,30 +2110,41 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn, mh.msg_iovlen = iov_i; eintr: - len = sendmsg(conn->sock, &mh, MSG_DONTWAIT | MSG_NOSIGNAL); - if (len < 0) { + n = sendmsg(conn->sock, &mh, MSG_DONTWAIT | MSG_NOSIGNAL); + if (n < 0) { + if (errno == EPIPE) { + /* Here's the wrap, said the tap. + * In my pocket, said the socket. + * Then swiftly looked away and left. + */ + conn->seq_from_tap = seq_from_tap; + tcp_send_to_tap(c, conn, FORCE_ACK, now); + } + if (errno == EINTR) goto eintr; if (errno == EAGAIN || errno == EWOULDBLOCK) { - tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0); + tcp_send_to_tap(c, conn, UPDATE_WINDOW, now); return; } tcp_rst(c, conn); return; } - if (len < (seq_from_tap - conn->seq_from_tap)) { - conn->seq_from_tap += len; - tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0); - } else { - conn->seq_from_tap += len; + if (n < (seq_from_tap - conn->seq_from_tap)) { + partial_send = 1; + tcp_send_to_tap(c, conn, UPDATE_WINDOW, now); } + conn->seq_from_tap += n; + out: if (keep != -1) { - tcp_send_to_tap(c, conn, ACK, NULL, 0); - tcp_send_to_tap(c, conn, ACK, NULL, 0); + if (conn->seq_dup_ack != conn->seq_from_tap) { + conn->seq_dup_ack = conn->seq_from_tap; + tcp_send_to_tap(c, conn, DUP_ACK, now); + } return; } @@ -2127,19 +2154,27 @@ out: tcp_tap_state(conn, CLOSE_WAIT); } - if (!fin) { - tcp_send_to_tap(c, conn, 0, NULL, 0); - return; - } + if (fin && !partial_send) { + conn->seq_from_tap++; + + if (conn->state == ESTABLISHED) { + shutdown(conn->sock, SHUT_WR); + tcp_tap_state(conn, FIN_WAIT_1); + tcp_send_to_tap(c, conn, ACK, now); + } else if (conn->state == CLOSE_WAIT) { + shutdown(conn->sock, SHUT_WR); + tcp_tap_state(conn, LAST_ACK); + tcp_send_to_tap(c, conn, ACK, now); + } + } else { + int ack_to_tap = timespec_diff_ms(now, &conn->ts_ack_to_tap); + int ack_offset = conn->seq_from_tap - conn->seq_ack_to_tap; - if (conn->state == ESTABLISHED) { - shutdown(conn->sock, SHUT_WR); - tcp_tap_state(conn, FIN_WAIT_1); - tcp_send_to_tap(c, conn, ACK, NULL, 0); - } else if (conn->state == CLOSE_WAIT) { - shutdown(conn->sock, SHUT_WR); - tcp_tap_state(conn, LAST_ACK); - tcp_send_to_tap(c, conn, ACK, NULL, 0); + if (c->mode == MODE_PASTA || + psh || SEQ_GE(ack_offset, conn->wnd_to_tap / 2) || + ack_to_tap > ACK_INTERVAL) { + tcp_send_to_tap(c, conn, psh ? FORCE_ACK : 0, now); + } } } @@ -2170,16 +2205,16 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, if (th->rst) { tcp_tap_destroy(c, conn); - return 1; + return count; } - conn->ts_tap = *now; + conn->ts_tap_act = *now; switch (conn->state) { case SOCK_SYN_SENT: if (!th->syn || !th->ack) { tcp_rst(c, conn); - return 1; + return count; } tcp_clamp_window(conn, th, len, 0, 1); @@ -2198,17 +2233,6 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, conn->mss_guest); } - ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL); - if (ws > MAX_WS) { - if (tcp_send_to_tap(c, conn, RST, NULL, 0)) - return 1; - - conn->seq_to_tap = 0; - conn->ws_allowed = 0; - tcp_send_to_tap(c, conn, SYN, NULL, 0); - return 1; - } - /* info.tcpi_bytes_acked already includes one byte for SYN, but * not for incoming connections. */ @@ -2222,35 +2246,54 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, * dequeue waiting for SYN,ACK from tap -- check now. */ tcp_data_from_sock(c, conn, now); - tcp_send_to_tap(c, conn, 0, NULL, 0); + tcp_send_to_tap(c, conn, 0, now); tcp_tap_epoll_mask(c, conn, EPOLLIN | EPOLLRDHUP); break; case TAP_SYN_RCVD: if (th->fin) { + conn->seq_from_tap++; + shutdown(conn->sock, SHUT_WR); - tcp_send_to_tap(c, conn, ACK, NULL, 0); + tcp_send_to_tap(c, conn, ACK, now); tcp_tap_state(conn, FIN_WAIT_1); break; } if (!th->ack) { tcp_rst(c, conn); - return 1; + return count; } tcp_clamp_window(conn, th, len, 0, 0); tcp_tap_state(conn, ESTABLISHED); - break; + if (count == 1) + break; + + /* Falls through */ case ESTABLISHED: case ESTABLISHED_SOCK_FIN: case ESTABLISHED_SOCK_FIN_SENT: + tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET); + tcp_data_from_tap(c, conn, msg, count, now); + return count; case CLOSE_WAIT: case FIN_WAIT_1_SOCK_FIN: case FIN_WAIT_1: + if (th->ack) { + conn->tap_data_noack = ((struct timespec) { 0, 0 }); + conn->ts_ack_from_tap = *now; + } + + tcp_sock_consume(conn, ntohl(th->ack_seq)); + if (conn->state == FIN_WAIT_1_SOCK_FIN && + conn->seq_ack_from_tap == conn->seq_to_tap) { + tcp_tap_destroy(c, conn); + return count; + } + tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET); - tcp_data_from_tap(c, conn, msg, count, now); return count; case TAP_SYN_SENT: case LAST_ACK: @@ -2271,8 +2314,10 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr, * tcp_connect_finish() - Handle completion of connect() from EPOLLOUT event * @c: Execution context * @s: File descriptor number for socket + * @now: Current timestamp */ -static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn) +static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn, + struct timespec *now) { socklen_t sl; int so; @@ -2283,7 +2328,7 @@ static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn) return; } - if (tcp_send_to_tap(c, conn, SYN | ACK, NULL, 0)) + if (tcp_send_to_tap(c, conn, SYN | ACK, now)) return; /* Drop EPOLLOUT, only used to wait for connect() to complete */ @@ -2308,29 +2353,32 @@ static void tcp_splice_connect_finish(struct ctx *c, .tcp = { .splice = 1, .v6 = v6, .index = conn - ts } }; struct epoll_event ev_from, ev_to; + int i; - if (conn->state == SPLICE_CONNECT) { - socklen_t sl; - int so; + conn->pipe_from_to[0] = conn->pipe_to_from[0] = -1; + conn->pipe_from_to[1] = conn->pipe_to_from[1] = -1; + for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE; i++) { + if (splice_pipe_pool[i][0][0] > 0) { + SWAP(conn->pipe_from_to[0], splice_pipe_pool[i][0][0]); + SWAP(conn->pipe_from_to[1], splice_pipe_pool[i][0][1]); + + SWAP(conn->pipe_to_from[0], splice_pipe_pool[i][1][0]); + SWAP(conn->pipe_to_from[1], splice_pipe_pool[i][1][1]); + break; + } + } - sl = sizeof(so); - if (getsockopt(conn->to, SOL_SOCKET, SO_ERROR, &so, &sl) || - so) { + if (conn->pipe_from_to[0] <= 0) { + if (pipe2(conn->pipe_to_from, O_NONBLOCK) || + pipe2(conn->pipe_from_to, O_NONBLOCK)) { tcp_splice_destroy(c, conn); return; } - } - conn->pipe_from_to[0] = conn->pipe_to_from[0] = -1; - if (pipe2(conn->pipe_to_from, O_NONBLOCK) || - pipe2(conn->pipe_from_to, O_NONBLOCK)) { - tcp_splice_destroy(c, conn); - return; + fcntl(conn->pipe_from_to[0], F_SETPIPE_SZ, c->tcp.pipe_size); + fcntl(conn->pipe_to_from[0], F_SETPIPE_SZ, c->tcp.pipe_size); } - fcntl(conn->pipe_from_to[0], F_SETPIPE_SZ, PIPE_SIZE); - fcntl(conn->pipe_to_from[0], F_SETPIPE_SZ, PIPE_SIZE); - if (conn->state == SPLICE_CONNECT) { tcp_splice_state(conn, SPLICE_ESTABLISHED); @@ -2338,7 +2386,7 @@ static void tcp_splice_connect_finish(struct ctx *c, ev_from.data.u64 = ref_from.u64; ev_to.data.u64 = ref_to.u64; - epoll_ctl(c->epollfd, EPOLL_CTL_MOD, conn->from, &ev_from); + epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_from); epoll_ctl(c->epollfd, EPOLL_CTL_MOD, conn->to, &ev_to); } } @@ -2353,20 +2401,19 @@ static void tcp_splice_connect_finish(struct ctx *c, * Return: 0 for connect() succeeded or in progress, negative value on error */ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn, - int v6, in_port_t port) + int s, int v6, in_port_t port) { - int sock_conn = socket(v6 ? AF_INET6 : AF_INET, - SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); + int sock_conn = (s > 0) ? s : socket(v6 ? AF_INET6 : AF_INET, + SOCK_STREAM | SOCK_NONBLOCK, + IPPROTO_TCP); union epoll_ref ref_accept = { .proto = IPPROTO_TCP, .s = conn->from, .tcp = { .splice = 1, .v6 = v6, .index = conn - ts } }; union epoll_ref ref_conn = { .proto = IPPROTO_TCP, .s = sock_conn, .tcp = { .splice = 1, .v6 = v6, .index = conn - ts } }; - struct epoll_event ev_accept = { .events = EPOLLET, - .data.u64 = ref_accept.u64 }; - struct epoll_event ev_conn = { .events = EPOLLET, - .data.u64 = ref_conn.u64 }; + struct epoll_event ev_accept = { .data.u64 = ref_accept.u64 }; + struct epoll_event ev_conn = { .data.u64 = ref_conn.u64 }; struct sockaddr_in6 addr6 = { .sin6_family = AF_INET6, .sin6_port = htons(port), @@ -2381,11 +2428,11 @@ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn, socklen_t sl; int ret; - if (sock_conn < 0) - return -errno; - conn->to = sock_conn; + if (s <= 0) + tcp_sock_set_bufsize(sock_conn); + if (v6) { sa = (struct sockaddr *)&addr6; sl = sizeof(addr6); @@ -2402,16 +2449,17 @@ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn, } tcp_splice_state(conn, SPLICE_CONNECT); - ev_conn.events |= EPOLLOUT; + ev_conn.events = EPOLLOUT; } else { tcp_splice_state(conn, SPLICE_ESTABLISHED); tcp_splice_connect_finish(c, conn, v6); - ev_conn.events |= EPOLLIN; - ev_accept.events |= EPOLLIN; + ev_accept.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; + ev_conn.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP; + + epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_accept); } - epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_accept); epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->to, &ev_conn); return 0; @@ -2445,7 +2493,7 @@ static int tcp_splice_connect_ns(void *arg) a = (struct tcp_splice_connect_ns_arg *)arg; ns_enter(a->c->pasta_pid); - a->ret = tcp_splice_connect(a->c, a->conn, a->v6, a->port); + a->ret = tcp_splice_connect(a->c, a->conn, -1, a->v6, a->port); return 0; } @@ -2462,13 +2510,26 @@ static int tcp_splice_new(struct ctx *c, struct tcp_splice_conn *conn, int v6, in_port_t port) { struct tcp_splice_connect_ns_arg ns_arg = { c, conn, v6, port, 0 }; + int *sock_pool_p, i, s = -1; - if (bitmap_isset(c->tcp.port_to_tap, port)) { + if (bitmap_isset(c->tcp.port_to_tap, port)) + sock_pool_p = v6 ? ns_sock_pool6 : ns_sock_pool4; + else + sock_pool_p = v6 ? init_sock_pool6 : init_sock_pool4; + + for (i = 0; i < TCP_SOCK_POOL_SIZE; i++, sock_pool_p++) { + if ((s = *sock_pool_p) > 0) { + *sock_pool_p = -1; + break; + } + } + + if (s <= 0 && bitmap_isset(c->tcp.port_to_tap, port)) { NS_CALL(tcp_splice_connect_ns, &ns_arg); return ns_arg.ret; } - return tcp_splice_connect(c, conn, v6, port); + return tcp_splice_connect(c, conn, s, v6, port); } /** @@ -2500,10 +2561,6 @@ static void tcp_conn_from_sock(struct ctx *c, union epoll_ref ref, ref_conn.tcp.index = conn - tt; ref_conn.s = conn->sock = s; - sl = sizeof(conn->sndbuf); - if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->sndbuf, &sl)) - conn->sndbuf = USHRT_MAX; - if (ref.tcp.v6) { struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&sa; @@ -2551,19 +2608,24 @@ static void tcp_conn_from_sock(struct ctx *c, union epoll_ref ref, conn->seq_ack_from_tap = conn->seq_to_tap + 1; - conn->tap_window = WINDOW_DEFAULT; - conn->ws_allowed = 1; + conn->wnd_from_tap = WINDOW_DEFAULT; - conn->ts_sock = conn->ts_tap = conn->ts_ack_tap = *now; + conn->ts_sock_act = conn->ts_tap_act = *now; + conn->ts_ack_from_tap = conn->ts_ack_to_tap = *now; - bitmap_set(tcp_act, conn - tt); + tcp_send_to_tap(c, conn, SYN, now); conn->events = ev.events = EPOLLRDHUP; ev.data.u64 = ref_conn.u64; epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->sock, &ev); tcp_tap_state(conn, SOCK_SYN_SENT); - tcp_send_to_tap(c, conn, SYN, NULL, 0); + + sl = sizeof(conn->snd_buf); + if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->snd_buf, &sl)) + conn->snd_buf = WINDOW_DEFAULT; + else + conn->snd_buf /= 2; } /** @@ -2576,11 +2638,13 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref, uint32_t events) { int move_from, move_to, *pipes, eof, never_read; + uint8_t *rcvlowat_set, *rcvlowat_act; + uint64_t *seq_read, *seq_write; struct tcp_splice_conn *conn; struct epoll_event ev; if (ref.tcp.listen) { - int s; + int s, one = 1; if (c->tcp.splice_conn_count >= MAX_SPLICE_CONNS) return; @@ -2588,6 +2652,8 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref, if ((s = accept4(ref.s, NULL, NULL, SOCK_NONBLOCK)) < 0) return; + setsockopt(s, SOL_TCP, TCP_QUICKACK, &one, sizeof(one)); + conn = &ts[c->tcp.splice_conn_count++]; conn->from = s; tcp_splice_state(conn, SPLICE_ACCEPTED); @@ -2614,8 +2680,7 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref, if (conn->state == SPLICE_CONNECT) tcp_splice_connect_finish(c, conn, ref.tcp.v6); - - if (conn->state == SPLICE_ESTABLISHED) + else if (conn->state == SPLICE_ESTABLISHED) epoll_ctl(c->epollfd, EPOLL_CTL_MOD, ref.s, &ev); move_to = ref.s; @@ -2655,12 +2720,25 @@ swap: eof = 0; never_read = 1; + if (move_from == conn->from) { + seq_read = &conn->from_read; + seq_write = &conn->from_written; + rcvlowat_set = splice_rcvlowat_set[0]; + rcvlowat_act = splice_rcvlowat_act[0]; + } else { + seq_read = &conn->to_read; + seq_write = &conn->to_written; + rcvlowat_set = splice_rcvlowat_set[1]; + rcvlowat_act = splice_rcvlowat_act[1]; + } + + while (1) { - int retry_write = 1; + int retry_write = 0, more = 0; ssize_t read, to_write = 0, written; retry: - read = splice(move_from, NULL, pipes[1], NULL, PIPE_SIZE, + read = splice(move_from, NULL, pipes[1], NULL, c->tcp.pipe_size, SPLICE_F_MOVE); if (read < 0) { if (errno == EINTR) @@ -2669,35 +2747,46 @@ retry: if (errno != EAGAIN) goto close; - to_write = PIPE_SIZE; + to_write = c->tcp.pipe_size; } else if (!read) { eof = 1; - to_write = PIPE_SIZE; + to_write = c->tcp.pipe_size; } else { never_read = 0; to_write += read; - if (move_from == conn->from) - conn->from_read += read; - else - conn->to_read += read; + if (read >= (long)c->tcp.pipe_size * 90 / 100) + more = SPLICE_F_MORE; + + if (bitmap_isset(rcvlowat_set, conn - ts)) + bitmap_set(rcvlowat_act, conn - ts); } eintr: written = splice(pipes[0], NULL, move_to, NULL, to_write, - SPLICE_F_MOVE); + SPLICE_F_MOVE | more); - if (written > 0) { - if (move_from == conn->from) { - conn->from_written += written; - if (conn->from_read == conn->from_written) - break; - } else { - conn->to_written += written; - if (conn->to_read == conn->to_written) - break; + /* Most common case: skip updating counters. */ + if (read > 0 && read == written) { + if (read >= (long)c->tcp.pipe_size * 10 / 100) + continue; + + if (!bitmap_isset(rcvlowat_set, conn - ts) && + read > (long)c->tcp.pipe_size / 10) { + int lowat = c->tcp.pipe_size / 4; + + setsockopt(move_from, SOL_SOCKET, SO_RCVLOWAT, + &lowat, sizeof(lowat)); + + bitmap_set(rcvlowat_set, conn - ts); + bitmap_set(rcvlowat_act, conn - ts); } + + break; } + *seq_read += read > 0 ? read : 0; + *seq_write += written > 0 ? written : 0; + if (written < 0) { if (errno == EINTR) goto eintr; @@ -2716,9 +2805,9 @@ eintr: ev.data.u64 = ref.u64, epoll_ctl(c->epollfd, EPOLL_CTL_MOD, move_to, &ev); break; - } else if (never_read && written == PIPE_SIZE) { + } else if (never_read && written == (long)(c->tcp.pipe_size)) { goto retry; - } else if (!never_read &&written < to_write) { + } else if (!never_read && written < to_write) { to_write -= written; goto retry; } @@ -2727,11 +2816,10 @@ eintr: break; } - if (conn->state == SPLICE_FIN_BOTH || - (conn->state == SPLICE_FIN_FROM && move_from == conn->from) || - (conn->state == SPLICE_FIN_TO && move_from == conn->to)) { + if (*seq_read == *seq_write) { if (move_from == conn->from && - conn->from_read == conn->from_written) { + (conn->state == SPLICE_FIN_FROM || + conn->state == SPLICE_FIN_BOTH)) { if (!conn->from_fin_sent) { shutdown(conn->to, SHUT_WR); conn->from_fin_sent = 1; @@ -2746,7 +2834,8 @@ eintr: if (conn->to_fin_sent) goto close; } else if (move_from == conn->to && - conn->to_read == conn->to_written) { + (conn->state == SPLICE_FIN_TO || + conn->state == SPLICE_FIN_BOTH)) { if (!conn->to_fin_sent) { shutdown(conn->from, SHUT_WR); conn->to_fin_sent = 1; @@ -2778,7 +2867,9 @@ eintr: return; close: - tcp_splice_destroy(c, conn); + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL); + epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL); + conn->state = CLOSED; return; } @@ -2806,39 +2897,42 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events, conn = &tt[ref.tcp.index]; + conn->ts_sock_act = *now; + if (events & EPOLLERR) { if (conn->state != CLOSED) tcp_rst(c, conn); + return; } switch (conn->state) { case TAP_SYN_SENT: if (events & EPOLLOUT) - tcp_connect_finish(c, conn); + tcp_connect_finish(c, conn, now); else tcp_rst(c, conn); return; case ESTABLISHED_SOCK_FIN: case ESTABLISHED_SOCK_FIN_SENT: case ESTABLISHED: - tcp_data_from_sock(c, conn, now); if (events & EPOLLRDHUP) { if (conn->state == ESTABLISHED) tcp_tap_state(conn, ESTABLISHED_SOCK_FIN); - tcp_data_from_sock(c, conn, now); } + tcp_data_from_sock(c, conn, now); return; case LAST_ACK: - tcp_send_to_tap(c, conn, 0, NULL, 0); - if (conn->seq_ack_to_tap == conn->seq_from_tap + 1) + tcp_send_to_tap(c, conn, 0, now); + if (conn->seq_ack_to_tap == conn->seq_from_tap + 1 || + conn->seq_ack_to_tap == conn->seq_from_tap) tcp_tap_destroy(c, conn); return; case FIN_WAIT_1: if (events & EPOLLIN) tcp_data_from_sock(c, conn, now); if (events & EPOLLRDHUP) { - tcp_send_to_tap(c, conn, FIN | ACK, NULL, 0); + tcp_send_to_tap(c, conn, FIN | ACK, now); tcp_tap_state(conn, FIN_WAIT_1_SOCK_FIN); } return; @@ -2847,11 +2941,13 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events, if (events & EPOLLIN) tcp_data_from_sock(c, conn, now); if (events & EPOLLHUP) { - if ((conn->seq_ack_to_tap == conn->seq_from_tap + 1) && - (conn->seq_ack_from_tap == conn->seq_to_tap)) { + if ((conn->seq_ack_to_tap == conn->seq_from_tap + 1 || + conn->seq_ack_to_tap == conn->seq_from_tap) && + (conn->seq_ack_from_tap == conn->seq_to_tap - 1 || + conn->seq_ack_from_tap == conn->seq_to_tap)) { tcp_tap_destroy(c, conn); } else { - tcp_send_to_tap(c, conn, 0, NULL, 0); + tcp_send_to_tap(c, conn, ACK, now); } } return; @@ -2868,6 +2964,43 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events, } } +/** + * tcp_set_pipe_size() - Set usable pipe size, probe starting from MAX_PIPE_SIZE + * @c: Execution context + */ +static void tcp_set_pipe_size(struct ctx *c) +{ + int probe_pipe[TCP_SPLICE_PIPE_POOL_SIZE * 2][2], i, j; + + c->tcp.pipe_size = MAX_PIPE_SIZE; + +smaller: + for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE * 2; i++) { + if (pipe(probe_pipe[i])) { + i++; + break; + } + + if (fcntl(probe_pipe[i][0], F_SETPIPE_SZ, c->tcp.pipe_size) < 0) + break; + } + + for (j = i - 1; j >= 0; j--) { + close(probe_pipe[j][0]); + close(probe_pipe[j][1]); + } + + if (i == TCP_SPLICE_PIPE_POOL_SIZE * 2) + return; + + if (!(c->tcp.pipe_size /= 2)) { + c->tcp.pipe_size = MAX_PIPE_SIZE; + return; + } + + goto smaller; +} + /** * tcp_sock_init_ns() - Bind sockets in namespace for inbound connections * @arg: Execution context @@ -2879,6 +3012,7 @@ static int tcp_sock_init_ns(void *arg) union tcp_epoll_ref tref = { .listen = 1, .splice = 1 }; struct ctx *c = (struct ctx *)arg; in_port_t port; + int s; ns_enter(c->pasta_pid); @@ -2890,30 +3024,113 @@ static int tcp_sock_init_ns(void *arg) if (c->v4) { tref.v6 = 0; - sock_l4(c, AF_INET, IPPROTO_TCP, port, BIND_LOOPBACK, - tref.u32); + s = sock_l4(c, AF_INET, IPPROTO_TCP, port, + BIND_LOOPBACK, tref.u32); + tcp_sock_set_bufsize(s); } if (c->v6) { tref.v6 = 1; - sock_l4(c, AF_INET6, IPPROTO_TCP, port, BIND_LOOPBACK, - tref.u32); + s = sock_l4(c, AF_INET6, IPPROTO_TCP, port, + BIND_LOOPBACK, tref.u32); + tcp_sock_set_bufsize(s); } } return 0; } +/** + * tcp_splice_pipe_refill() - Refill pool of pre-opened pipes + * @c: Execution context + */ +static void tcp_splice_pipe_refill(struct ctx *c) +{ + int i; + + for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE; i++) { + if (splice_pipe_pool[i][0][0] > 0) + break; + if (pipe2(splice_pipe_pool[i][0], O_NONBLOCK)) + continue; + if (pipe2(splice_pipe_pool[i][1], O_NONBLOCK)) { + close(splice_pipe_pool[i][1][0]); + close(splice_pipe_pool[i][1][1]); + continue; + } + + fcntl(splice_pipe_pool[i][0][0], F_SETPIPE_SZ, + c->tcp.pipe_size); + fcntl(splice_pipe_pool[i][1][0], F_SETPIPE_SZ, + c->tcp.pipe_size); + } +} + +/** + * struct tcp_sock_refill_arg - Arguments for tcp_sock_refill() + * @c: Execution context + * @ns: Set to refill pool of sockets created in namespace + */ +struct tcp_sock_refill_arg { + struct ctx *c; + int ns; +}; + +/** + * tcp_sock_refill() - Refill pool of pre-opened sockets + * @arg: See @tcp_sock_refill_arg + * + * Return: 0 + */ +static int tcp_sock_refill(void *arg) +{ + struct tcp_sock_refill_arg *a = (struct tcp_sock_refill_arg *)arg; + int i, *p4, *p6, one = 1; + + if (a->ns) { + if (ns_enter(a->c->pasta_pid)) + return 0; + p4 = ns_sock_pool4; + p6 = ns_sock_pool6; + } else { + p4 = init_sock_pool4; + p6 = init_sock_pool6; + } + + for (i = 0; a->c->v4 && i < TCP_SOCK_POOL_SIZE; i++, p4++) { + if (*p4 > 0) { + break; + } + *p4 = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); + setsockopt(*p4, SOL_TCP, TCP_QUICKACK, &one, sizeof(one)); + tcp_sock_set_bufsize(*p4); + } + + for (i = 0; a->c->v6 && i < TCP_SOCK_POOL_SIZE; i++, p6++) { + if (*p6 > 0) { + break; + } + *p6 = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, + IPPROTO_TCP); + setsockopt(*p6, SOL_TCP, TCP_QUICKACK, &one, sizeof(one)); + tcp_sock_set_bufsize(*p6); + } + + return 0; +} + /** * tcp_sock_init() - Bind sockets for inbound connections, get key for sequence * @c: Execution context * * Return: 0 on success, -1 on failure */ -int tcp_sock_init(struct ctx *c) +int tcp_sock_init(struct ctx *c, struct timespec *now) { + struct tcp_sock_refill_arg refill_arg = { c, 0 }; union tcp_epoll_ref tref = { .listen = 1 }; in_port_t port; + int s; getrandom(&c->tcp.hash_secret, sizeof(c->tcp.hash_secret), GRND_RANDOM); @@ -2926,14 +3143,16 @@ int tcp_sock_init(struct ctx *c) tref.v6 = 0; tref.splice = 0; - sock_l4(c, AF_INET, IPPROTO_TCP, port, - c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY, - tref.u32); + s = sock_l4(c, AF_INET, IPPROTO_TCP, port, + c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY, + tref.u32); + tcp_sock_set_bufsize(s); if (c->mode == MODE_PASTA) { tref.splice = 1; - sock_l4(c, AF_INET, IPPROTO_TCP, port, - BIND_LOOPBACK, tref.u32); + s = sock_l4(c, AF_INET, IPPROTO_TCP, port, + BIND_LOOPBACK, tref.u32); + tcp_sock_set_bufsize(s); } } @@ -2941,14 +3160,16 @@ int tcp_sock_init(struct ctx *c) tref.v6 = 1; tref.splice = 0; - sock_l4(c, AF_INET6, IPPROTO_TCP, port, - c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY, - tref.u32); + s = sock_l4(c, AF_INET6, IPPROTO_TCP, port, + c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY, + tref.u32); + tcp_sock_set_bufsize(s); if (c->mode == MODE_PASTA) { tref.splice = 1; - sock_l4(c, AF_INET6, IPPROTO_TCP, port, - BIND_LOOPBACK, tref.u32); + s = sock_l4(c, AF_INET6, IPPROTO_TCP, port, + BIND_LOOPBACK, tref.u32); + tcp_sock_set_bufsize(s); } } } @@ -2959,9 +3180,18 @@ int tcp_sock_init(struct ctx *c) if (c->v6) tcp_sock6_iov_init(); - if (c->mode == MODE_PASTA) + c->tcp.refill_ts = *now; + tcp_sock_refill(&refill_arg); + + if (c->mode == MODE_PASTA) { + tcp_set_pipe_size(c); NS_CALL(tcp_sock_init_ns, c); + refill_arg.ns = 1; + NS_CALL(tcp_sock_refill, &refill_arg); + tcp_splice_pipe_refill(c); + } + return 0; } @@ -2974,72 +3204,72 @@ int tcp_sock_init(struct ctx *c) static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn, struct timespec *ts) { - int ack_tap_ms = timespec_diff_ms(ts, &conn->ts_ack_tap); - int sock_ms = timespec_diff_ms(ts, &conn->ts_sock); - int tap_ms = timespec_diff_ms(ts, &conn->ts_tap); + int ack_from_tap = timespec_diff_ms(ts, &conn->ts_ack_from_tap); + int ack_to_tap = timespec_diff_ms(ts, &conn->ts_ack_to_tap); + int sock_act = timespec_diff_ms(ts, &conn->ts_sock_act); + int tap_act = timespec_diff_ms(ts, &conn->ts_tap_act); + int tap_data_noack; + + if (memcmp(&conn->tap_data_noack, &((struct timespec){ 0, 0 }), + sizeof(struct timespec))) + tap_data_noack = 0; + else + tap_data_noack = timespec_diff_ms(ts, &conn->tap_data_noack); switch (conn->state) { + case CLOSED: + tcp_hash_remove(conn); + tcp_table_tap_compact(c, conn); + break; case SOCK_SYN_SENT: case TAP_SYN_RCVD: - if (ack_tap_ms > SYN_TIMEOUT) + if (ack_from_tap > SYN_TIMEOUT) tcp_rst(c, conn); break; case ESTABLISHED_SOCK_FIN_SENT: - if (ack_tap_ms > FIN_TIMEOUT) { + if (tap_data_noack > FIN_TIMEOUT) { tcp_rst(c, conn); break; } /* Falls through */ case ESTABLISHED: case ESTABLISHED_SOCK_FIN: - if (tap_ms > ACT_TIMEOUT && sock_ms > ACT_TIMEOUT) { + if (tap_act > ACT_TIMEOUT && sock_act > ACT_TIMEOUT) { tcp_rst(c, conn); break; } - if (conn->seq_to_tap == conn->seq_ack_from_tap && - conn->seq_from_tap == conn->seq_ack_to_tap) { - conn->ts_sock = *ts; - break; - } - - if (sock_ms > ACK_INTERVAL) { - if (conn->seq_from_tap > conn->seq_ack_to_tap) - tcp_send_to_tap(c, conn, 0, NULL, 0); - } + if (!conn->wnd_to_tap) + tcp_send_to_tap(c, conn, UPDATE_WINDOW, ts); + else if (ack_to_tap > ACK_INTERVAL) + tcp_send_to_tap(c, conn, 0, ts); - if (sock_ms - ack_tap_ms > ACK_TIMEOUT) { + if (tap_data_noack > ACK_TIMEOUT) { if (conn->seq_ack_from_tap < conn->seq_to_tap) { - if (sock_ms - ack_tap_ms > 10 * ACK_TIMEOUT) { + if (tap_data_noack > LAST_ACK_TIMEOUT) { tcp_rst(c, conn); break; } conn->seq_to_tap = conn->seq_ack_from_tap; - if (sock_ms > ACK_TIMEOUT) - tcp_data_from_sock(c, conn, ts); + tcp_data_from_sock(c, conn, ts); } } - - if (conn->seq_from_tap == conn->seq_ack_to_tap) - conn->ts_sock = *ts; - - if (!conn->tcpi_snd_wnd) - tcp_send_to_tap(c, conn, 0, NULL, 0); - break; case CLOSE_WAIT: case FIN_WAIT_1_SOCK_FIN: - if (sock_ms - ack_tap_ms > FIN_TIMEOUT) + if (tap_data_noack > FIN_TIMEOUT) tcp_rst(c, conn); break; case FIN_WAIT_1: - if (sock_ms > FIN_TIMEOUT) + if (sock_act > FIN_TIMEOUT) tcp_rst(c, conn); break; case LAST_ACK: - if (sock_ms > LAST_ACK_TIMEOUT) + if (sock_act > LAST_ACK_TIMEOUT) + tcp_rst(c, conn); + else if (tap_act > LAST_ACK_TIMEOUT) tcp_rst(c, conn); break; case TAP_SYN_SENT: @@ -3049,7 +3279,6 @@ static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn, case SPLICE_FIN_FROM: case SPLICE_FIN_TO: case SPLICE_FIN_BOTH: - case CLOSED: break; } } @@ -3059,19 +3288,53 @@ static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn, * @c: Execution context * @ts: Timestamp from caller */ -void tcp_timer(struct ctx *c, struct timespec *ts) +void tcp_timer(struct ctx *c, struct timespec *now) { - long *word = (long *)tcp_act, tmp; - unsigned int i; - int n; + struct tcp_sock_refill_arg refill_arg = { c, 0 }; + int i; + + if (timespec_diff_ms(now, &c->tcp.refill_ts) > REFILL_INTERVAL) { + tcp_sock_refill(&refill_arg); + if (c->mode == MODE_PASTA) { + refill_arg.ns = 1; + if ((c->v4 && ns_sock_pool4[TCP_SOCK_POOL_TSH] <= 0) || + (c->v6 && ns_sock_pool6[TCP_SOCK_POOL_TSH] <= 0)) + NS_CALL(tcp_sock_refill, &refill_arg); + + tcp_splice_pipe_refill(c); + } + } - for (i = 0; i < sizeof(tcp_act) / sizeof(long); i++, word++) { - tmp = *word; - while ((n = ffsl(tmp))) { - int index = i * sizeof(long) * 8 + n - 1; + for (i = c->tcp.tap_conn_count - 1; i >= 0; i--) + tcp_timer_one(c, tt + i, now); + + if (c->mode == MODE_PASTA) { + for (i = c->tcp.splice_conn_count - 1; i >= 0; i--) { + if ((ts + i)->state == CLOSED) { + tcp_splice_destroy(c, ts + i); + continue; + } + + if (bitmap_isset(splice_rcvlowat_set[0], i) && + !bitmap_isset(splice_rcvlowat_act[0], i)) { + int lowat = 1; + + setsockopt((ts + i)->from, SOL_SOCKET, + SO_RCVLOWAT, &lowat, sizeof(lowat)); + bitmap_clear(splice_rcvlowat_set[0], i); + } + + if (bitmap_isset(splice_rcvlowat_set[1], i) && + !bitmap_isset(splice_rcvlowat_act[1], i)) { + int lowat = 1; + + setsockopt((ts + i)->to, SOL_SOCKET, + SO_RCVLOWAT, &lowat, sizeof(lowat)); + bitmap_clear(splice_rcvlowat_set[1], i); + } - tmp &= ~(1UL << (n - 1)); - tcp_timer_one(c, &tt[index], ts); + bitmap_clear(splice_rcvlowat_act[0], i); + bitmap_clear(splice_rcvlowat_act[1], i); } } } diff --git a/tcp.h b/tcp.h index 359414c..ae983ed 100644 --- a/tcp.h +++ b/tcp.h @@ -11,8 +11,8 @@ struct ctx; void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events, struct timespec *now); int tcp_tap_handler(struct ctx *c, int af, void *addr, - struct tap_msg *msg, int count, struct timespec *now); -int tcp_sock_init(struct ctx *c); + struct tap_l4_msg *msg, int count, struct timespec *now); +int tcp_sock_init(struct ctx *c, struct timespec *now); void tcp_timer(struct ctx *c, struct timespec *ts); void tcp_update_l2_buf(unsigned char *eth_d, unsigned char *eth_s, uint32_t *ip_da); @@ -45,6 +45,9 @@ union tcp_epoll_ref { * @port_to_tap: Ports bound host-side, packets to tap or spliced * @port_to_init: Ports bound namespace-side, spliced to init * @timer_run: Timestamp of most recent timer run + * @kernel_snd_wnd: Kernel reports sending window (with commit 8f7baad7f035) + * @pipe_size: Size of pipes for spliced connections + * @refill_ts: Time of last refill operation for pools of sockets/pipes */ struct tcp_ctx { uint64_t hash_secret[2]; @@ -53,6 +56,9 @@ struct tcp_ctx { uint8_t port_to_tap [USHRT_MAX / 8]; uint8_t port_to_init [USHRT_MAX / 8]; struct timespec timer_run; + int kernel_snd_wnd; + size_t pipe_size; + struct timespec refill_ts; }; #endif /* TCP_H */ -- cgit v1.2.3