aboutgitcodebugslistschat
diff options
context:
space:
mode:
-rw-r--r--tcp.c1175
-rw-r--r--tcp.h10
2 files changed, 727 insertions, 458 deletions
diff --git a/tcp.c b/tcp.c
index 91b82ee..830bf4a 100644
--- a/tcp.c
+++ b/tcp.c
@@ -66,20 +66,15 @@
* ------
*
* To avoid the need for dynamic memory allocation, a maximum, reasonable amount
- * of connections is defined by MAX_TAP_CONNS below (currently 1M, close to
- * the maximum amount of file descriptors typically available to a process on
- * Linux).
- *
- * While fragmentation and reassembly are not implemented, tracking of missing
- * segments and retransmissions needs to be, thus data needs to linger on
- * sockets as long as it's not acknowledged by the guest, and read using
- * MSG_PEEK into a single, preallocated static buffer sized to the maximum
- * supported window, 16MiB. This imposes a practical limitation on window
- * scaling, that is, the maximum factor is 512. If a bigger window scaling
- * factor is observed during connection establishment, connection is reset and
- * reestablished by omitting the scaling factor in the SYN segment. This
- * limitation only applies to the window scaling advertised by the guest, but
- * if exceeded, no window scaling will be allowed at all toward either endpoint.
+ * of connections is defined by MAX_TAP_CONNS below (currently 128k).
+ *
+ * Data needs to linger on sockets as long as it's not acknowledged by the
+ * guest, and is read using MSG_PEEK into preallocated static buffers sized
+ * to the maximum supported window, 64MiB ("discard" buffer, for already-sent
+ * data) plus a number of maximum-MSS-sized buffers. This imposes a practical
+ * limitation on window scaling, that is, the maximum factor is 1024. Larger
+ * factors will be accepted, but resulting, larger values are never advertised
+ * to the other side, and not used while queueing data.
*
*
* Ports
@@ -245,8 +240,7 @@
*
* @seq_init_from_tap: initial sequence number from tap
*
- * @tap_window: last window size received from tap, scaled
- * @tcpi_acked_last: most recent value of tcpi_bytes_acked (TCP_INFO)
+ * @wnd_from_tap: last window size received from tap, scaled
*
* - from socket to tap:
* - on new data from socket:
@@ -255,7 +249,7 @@
* - starting at offset (@seq_to_tap - @seq_ack_from_tap)
* - in MSS-sized segments
* - increasing @seq_to_tap at each segment
- * - up to window (until @seq_to_tap - @seq_ack_from_tap <= @tap_window)
+ * - up to window (until @seq_to_tap - @seq_ack_from_tap <= @wnd_from_tap)
* - mark socket in bitmap for periodic ACK check, set @last_ts_to_tap
* - on read error, send RST to tap, close socket
* - on zero read, send FIN to tap, enter ESTABLISHED_SOCK_FIN
@@ -271,25 +265,20 @@
* - periodically:
* - if @seq_ack_from_tap < @seq_to_tap and the retransmission timer
* (TODO: implement requirements from RFC 6298, currently 3s fixed) from
- * @ts_sock elapsed, reset @seq_to_tap to @seq_ack_from_tap, and
+ * @ts_ack_from_tap elapsed, reset @seq_to_tap to @seq_ack_from_tap, and
* resend data with the steps listed above
*
* - from tap to socket:
* - on packet from tap:
- * - set @ts_tap
+ * - set @ts_tap_act
* - set TCP_WINDOW_CLAMP from TCP header from tap
* - check seq from header against @seq_from_tap, if data is missing, send
* two ACKs with number @seq_ack_to_tap, discard packet
* - otherwise queue data to socket, set @seq_from_tap to seq from header
* plus payload length
- * - query socket for TCP_INFO, on tcpi_bytes_acked > @tcpi_acked_last,
- * set @tcpi_acked_last to tcpi_bytes_acked, set @seq_ack_to_tap
- * to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and
- * send ACK to tap
- * - periodically:
- * - query socket for TCP_INFO, on tcpi_bytes_acked > @tcpi_acked_last,
- * set @tcpi_acked_last to tcpi_bytes_acked, set @seq_ack_to_tap
- * to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and
+ * - in ESTABLISHED state, send ACK to tap as soon as we queue to the
+ * socket. In other states, query socket for TCP_INFO, set
+ * @seq_ack_to_tap to (tcpi_bytes_acked + @seq_init_from_tap) % 2^32 and
* send ACK to tap
*
*
@@ -351,13 +340,13 @@
#define TCP_TAP_FRAMES 8
-#define PIPE_SIZE (1024 * 1024)
+#define MAX_PIPE_SIZE (2 * 1024 * 1024)
#define TCP_HASH_TABLE_LOAD 70 /* % */
#define TCP_HASH_TABLE_SIZE (MAX_TAP_CONNS * 100 / \
TCP_HASH_TABLE_LOAD)
-#define MAX_WS 9
+#define MAX_WS 10
#define MAX_WINDOW (1 << (16 + (MAX_WS)))
#define MSS_DEFAULT 536
#define WINDOW_DEFAULT 14600 /* RFC 6928 */
@@ -369,12 +358,21 @@
#define FIN_TIMEOUT 240000
#define LAST_ACK_TIMEOUT 240000
+#define TCP_SOCK_POOL_SIZE 256
+#define TCP_SOCK_POOL_TSH 128 /* Refill in ns if > x used */
+#define TCP_SPLICE_PIPE_POOL_SIZE 256
+#define REFILL_INTERVAL 1000
/* We need to include <linux/tcp.h> for tcpi_bytes_acked, instead of
* <netinet/tcp.h>, but that doesn't include a definition for SOL_TCP
*/
#define SOL_TCP IPPROTO_TCP
+#define SEQ_LE(a, b) ((b) - (a) < MAX_WINDOW)
+#define SEQ_LT(a, b) ((b) - (a) - 1 < MAX_WINDOW)
+#define SEQ_GE(a, b) ((a) - (b) < MAX_WINDOW)
+#define SEQ_GT(a, b) ((a) - (b) - 1 < MAX_WINDOW)
+
enum tcp_state {
CLOSED = 0,
TAP_SYN_SENT,
@@ -409,7 +407,9 @@ static char *tcp_state_str[TCP_STATE_STR_SIZE] __attribute((__unused__)) = {
#define RST (1 << 2)
#define ACK (1 << 4)
/* Flags for internal usage */
-#define ZERO_WINDOW (1 << 5)
+#define UPDATE_WINDOW (1 << 5)
+#define DUP_ACK (1 << 6)
+#define FORCE_ACK (1 << 7)
#define OPT_EOL 0
#define OPT_NOP 1
@@ -439,18 +439,21 @@ struct tcp_tap_conn;
* @seq_ack_from_tap: Last ACK number received from tap
* @seq_from_tap: Next sequence for packets from tap (not actually sent)
* @seq_ack_to_tap: Last ACK number sent to tap
+ * @seq_dup_ack: Last duplicate ACK number sent to tap
* @seq_init_from_tap: Initial sequence number from tap
- * @tcpi_acked_last: Most recent value of tcpi_bytes_acked (TCP_INFO query)
- * @ws_allowed: Window scaling allowed
+ * @seq_init_to_tap: Initial sequence number to tap
+ * @ws_tap: Window scaling factor from tap
* @ws: Window scaling factor
- * @tap_window: Last window size received from tap, scaled
+ * @wnd_from_tap: Last window size received from tap, scaled
+ * @wnd_to_tap: Socket-side sending window, advertised to tap
* @window_clamped: Window was clamped on socket at least once
- * @no_snd_wnd: Kernel won't report window (without commit 8f7baad7f035)
- * @tcpi_acked_last: Most recent value of tcpi_snd_wnd (TCP_INFO query)
- * @ts_sock: Last activity timestamp from socket for timeout purposes
- * @ts_tap: Last activity timestamp from tap for timeout purposes
- * @ts_ack_tap: Last ACK segment timestamp from tap for timeout purposes
+ * @ts_sock_act: Last activity timestamp from socket for timeout purposes
+ * @ts_tap_act: Last activity timestamp from tap for timeout purposes
+ * @ts_ack_from_tap: Last ACK segment timestamp from tap
+ * @ts_ack_to_tap: Last ACK segment timestamp to tap
+ * @tap_data_noack: Last unacked data to tap, set to { 0, 0 } on ACK
* @mss_guest: Maximum segment size advertised by guest
+ * @events: epoll events currently enabled for socket
*/
struct tcp_tap_conn {
struct tcp_tap_conn *next;
@@ -473,23 +476,24 @@ struct tcp_tap_conn {
uint32_t seq_ack_from_tap;
uint32_t seq_from_tap;
uint32_t seq_ack_to_tap;
+ uint32_t seq_dup_ack;
uint32_t seq_init_from_tap;
uint32_t seq_init_to_tap;
- uint64_t tcpi_acked_last;
- int ws_allowed;
- int ws;
- uint32_t tap_window;
+ uint16_t ws_tap;
+ uint16_t ws;
+ uint32_t wnd_from_tap;
+ uint32_t wnd_to_tap;
int window_clamped;
- int no_snd_wnd;
- uint32_t tcpi_snd_wnd;
+ int snd_buf;
- struct timespec ts_sock;
- struct timespec ts_tap;
- struct timespec ts_ack_tap;
+ struct timespec ts_sock_act;
+ struct timespec ts_tap_act;
+ struct timespec ts_ack_from_tap;
+ struct timespec ts_ack_to_tap;
+ struct timespec tap_data_noack;
int mss_guest;
- uint32_t sndbuf;
uint32_t events;
};
@@ -626,7 +630,7 @@ static int tcp6_l2_buf_mss_nr_set;
static int tcp6_l2_buf_mss_tap;
static int tcp6_l2_buf_mss_tap_nr_set;
-/* recvmmsg()/sendmmsg() data for tap */
+/* recvmsg()/sendmsg() data for tap */
static struct iovec tcp4_l2_iov_sock [TCP_TAP_FRAMES + 1];
static struct iovec tcp6_l2_iov_sock [TCP_TAP_FRAMES + 1];
static char tcp_buf_discard [MAX_WINDOW];
@@ -647,8 +651,9 @@ static struct mmsghdr tcp_l2_mh_tap [TCP_TAP_FRAMES] = {
/* sendmsg() to socket */
static struct iovec tcp_tap_iov [UIO_MAXIOV];
-/* Bitmap, activity monitoring needed for connection via tap */
-static uint8_t tcp_act[MAX_TAP_CONNS / 8] = { 0 };
+/* SO_RCVLOWAT set on source ([0]) or destination ([1]) socket, and activity */
+static uint8_t splice_rcvlowat_set[MAX_SPLICE_CONNS / 8][2];
+static uint8_t splice_rcvlowat_act[MAX_SPLICE_CONNS / 8][2];
/* TCP connections */
static struct tcp_tap_conn tt[MAX_TAP_CONNS];
@@ -657,6 +662,13 @@ static struct tcp_splice_conn ts[MAX_SPLICE_CONNS];
/* Table for lookup from remote address, local port, remote port */
static struct tcp_tap_conn *tt_hash[TCP_HASH_TABLE_SIZE];
+/* Pools for pre-opened sockets and pipes */
+static int splice_pipe_pool [TCP_SPLICE_PIPE_POOL_SIZE][2][2];
+static int init_sock_pool4 [TCP_SOCK_POOL_SIZE];
+static int init_sock_pool6 [TCP_SOCK_POOL_SIZE];
+static int ns_sock_pool4 [TCP_SOCK_POOL_SIZE];
+static int ns_sock_pool6 [TCP_SOCK_POOL_SIZE];
+
/**
* tcp_tap_state() - Set given TCP state for tap connection, report to stderr
* @conn: Connection pointer
@@ -682,6 +694,21 @@ static void tcp_splice_state(struct tcp_splice_conn *conn, enum tcp_state state)
}
/**
+ * tcp_sock_set_bufsize() - Set SO_RCVBUF and SO_SNDBUF to maximum values
+ * @s: Socket, can be -1 to avoid check in the caller
+ */
+static void tcp_sock_set_bufsize(int s)
+{
+ int v = INT_MAX / 2; /* Kernel clamps and rounds, no need to check */
+
+ if (s == -1)
+ return;
+
+ setsockopt(s, SOL_SOCKET, SO_RCVBUF, &v, sizeof(v));
+ setsockopt(s, SOL_SOCKET, SO_SNDBUF, &v, sizeof(v));
+}
+
+/**
* tcp_update_check_ip4() - Update IPv4 with variable parts from stored one
* @buf: L2 packet buffer with final IPv4 header
*/
@@ -1077,7 +1104,6 @@ static void tcp_table_tap_compact(struct ctx *c, struct tcp_tap_conn *hole)
uint32_t events;
if ((hole - tt) == --c->tcp.tap_conn_count) {
- bitmap_clear(tcp_act, hole - tt);
debug("TCP: hash table compaction: index %i (%p) was max index",
hole - tt, hole);
return;
@@ -1112,8 +1138,10 @@ static void tcp_tap_destroy(struct ctx *c, struct tcp_tap_conn *conn)
epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->sock, NULL);
tcp_tap_state(conn, CLOSED);
close(conn->sock);
- tcp_hash_remove(conn);
- tcp_table_tap_compact(c, conn);
+
+ /* Removal from hash table and connection table compaction deferred to
+ * timer.
+ */
}
static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn);
@@ -1123,51 +1151,34 @@ static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn);
* @c: Execution context
* @conn: Connection pointer
* @flags: TCP flags to set
- * @in: Payload buffer
- * @len: Payload length
+ * @now: Current timestamp, can be NULL
*
* Return: negative error code on connection reset, 0 otherwise
*/
-static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn,
- int flags, char *in, int len)
+static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn, int flags,
+ struct timespec *now)
{
- uint32_t ack_offset = conn->seq_from_tap - conn->seq_ack_to_tap;
- char buf[USHRT_MAX] = { 0 }, *data;
+ char buf[sizeof(struct tcphdr) + OPT_MSS_LEN + OPT_WS_LEN + 1] = { 0 };
+ uint32_t prev_ack_to_tap = conn->seq_ack_to_tap;
struct tcp_info info = { 0 };
- int ws = 0, err, ack_pending;
socklen_t sl = sizeof(info);
struct tcphdr *th;
+ char *data;
- if (conn->state == ESTABLISHED && !ack_offset && !flags &&
- conn->tcpi_snd_wnd) {
- err = 0;
- info.tcpi_bytes_acked = conn->tcpi_acked_last;
- info.tcpi_snd_wnd = conn->tcpi_snd_wnd;
- info.tcpi_snd_wscale = conn->ws;
- } else if (conn->no_snd_wnd && !(flags & SYN)) {
- err = 0;
- } else {
- err = getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info, &sl);
- if (err && !(flags & RST)) {
- tcp_rst(c, conn);
- return err;
- }
-
- sl = sizeof(conn->sndbuf);
- if (getsockopt(conn->sock, SOL_SOCKET, SO_SNDBUF,
- &conn->sndbuf, &sl))
- conn->sndbuf = USHRT_MAX;
+ if (SEQ_GE(conn->seq_ack_to_tap, conn->seq_from_tap) &&
+ !flags && conn->wnd_to_tap)
+ return 0;
- info.tcpi_snd_wnd = MIN(info.tcpi_snd_wnd,
- conn->sndbuf * 90 / 100);
- conn->tcpi_snd_wnd = info.tcpi_snd_wnd;
+ if (getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info, &sl)) {
+ tcp_rst(c, conn);
+ return -ECONNRESET;
}
th = (struct tcphdr *)buf;
data = (char *)(th + 1);
th->doff = sizeof(*th) / 4;
- if ((flags & SYN) && !err) {
+ if (flags & SYN) {
/* Options: MSS, NOP and window scale if allowed (4-8 bytes) */
*data++ = OPT_MSS;
*data++ = OPT_MSS_LEN;
@@ -1175,72 +1186,42 @@ static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn,
data += OPT_MSS_LEN - 2;
th->doff += OPT_MSS_LEN / 4;
- /* Check if kernel includes commit:
- * 8f7baad7f035 ("tcp: Add snd_wnd to TCP_INFO")
- */
- conn->no_snd_wnd = !info.tcpi_snd_wnd;
+ if (!c->tcp.kernel_snd_wnd && info.tcpi_snd_wnd)
+ c->tcp.kernel_snd_wnd = 1;
- if (conn->ws_allowed && (ws = info.tcpi_snd_wscale) &&
- !conn->no_snd_wnd) {
- *data++ = OPT_NOP;
+ conn->ws = MIN(MAX_WS, info.tcpi_snd_wscale);
- *data++ = OPT_WS;
- *data++ = OPT_WS_LEN;
- *data++ = ws;
-
- th->doff += (1 + OPT_WS_LEN) / 4;
- }
+ *data++ = OPT_NOP;
+ *data++ = OPT_WS;
+ *data++ = OPT_WS_LEN;
+ *data++ = conn->ws;
+ th->doff += (1 + OPT_WS_LEN) / 4;
/* RFC 793, 3.1: "[...] and the first data octet is ISN+1." */
th->seq = htonl(conn->seq_to_tap++);
+
+ th->ack = !!(flags & ACK);
} else {
+ th->ack = 1;
th->seq = htonl(conn->seq_to_tap);
- conn->seq_to_tap += len;
}
- if (flags & SYN) {
- ack_pending = 0;
- } else if (conn->state == ESTABLISHED || conn->no_snd_wnd) {
- ack_pending = conn->seq_from_tap != conn->seq_ack_to_tap &&
- (conn->seq_from_tap - conn->seq_ack_to_tap) <
- MAX_WINDOW;
+ if (conn->state > ESTABLISHED || (flags & (DUP_ACK | FORCE_ACK))) {
+ conn->seq_ack_to_tap = conn->seq_from_tap;
} else {
- ack_pending = info.tcpi_bytes_acked > conn->tcpi_acked_last;
- }
-
- if (!err && (ack_pending || (flags & ACK) || len)) {
- th->ack = 1;
-
- if (conn->no_snd_wnd) {
- conn->seq_ack_to_tap = conn->seq_from_tap;
- } else {
- if (conn->state == ESTABLISHED)
- conn->seq_ack_to_tap = conn->seq_from_tap;
- else
- conn->seq_ack_to_tap = info.tcpi_bytes_acked +
- conn->seq_init_from_tap;
+ conn->seq_ack_to_tap = info.tcpi_bytes_acked +
+ conn->seq_init_from_tap;
- conn->tcpi_acked_last = info.tcpi_bytes_acked;
- }
+ if (SEQ_LT(conn->seq_ack_to_tap, prev_ack_to_tap))
+ conn->seq_ack_to_tap = prev_ack_to_tap;
+ }
- /* seq_ack_to_tap matching seq_from_tap means, in these states,
- * that we shut the writing half down, but the FIN segment
- * wasn't acknowledged yet. We sent the FIN for sure, so adjust
- * the sequence number in that case.
- */
- if ((conn->state == LAST_ACK ||
- conn->state == FIN_WAIT_1_SOCK_FIN ||
- conn->state == FIN_WAIT_1) &&
- conn->seq_ack_to_tap == conn->seq_from_tap)
- th->ack_seq = htonl(conn->seq_ack_to_tap + 1);
- else
- th->ack_seq = htonl(conn->seq_ack_to_tap);
- } else {
- if (!len && !flags)
- return 0;
+ if (!flags &&
+ conn->seq_ack_to_tap == prev_ack_to_tap &&
+ c->tcp.kernel_snd_wnd && conn->wnd_to_tap == info.tcpi_snd_wnd)
+ return 0;
- th->ack = th->ack_seq = 0;
- }
+ th->ack_seq = htonl(conn->seq_ack_to_tap);
th->rst = !!(flags & RST);
th->syn = !!(flags & SYN);
@@ -1249,29 +1230,40 @@ static int tcp_send_to_tap(struct ctx *c, struct tcp_tap_conn *conn,
th->source = htons(conn->sock_port);
th->dest = htons(conn->tap_port);
- if (flags & ZERO_WINDOW) {
- th->window = 0;
- } else if (!err && !conn->no_snd_wnd) {
+ if (th->syn) {
/* First value sent by receiver is not scaled */
- th->window = htons(info.tcpi_snd_wnd >>
- (th->syn ? 0 : info.tcpi_snd_wscale));
+ th->window = htons(conn->wnd_to_tap = WINDOW_DEFAULT);
} else {
- th->window = htons(WINDOW_DEFAULT);
- }
+ if (c->tcp.kernel_snd_wnd) {
+ conn->wnd_to_tap = MIN(info.tcpi_snd_wnd,
+ conn->snd_buf);
+ } else {
+ conn->wnd_to_tap = conn->snd_buf;
+ }
+ conn->wnd_to_tap = MIN(conn->wnd_to_tap, MAX_WINDOW);
- if (!th->window)
- conn->tcpi_snd_wnd = 0;
+ th->window = htons(MIN(conn->wnd_to_tap >> conn->ws,
+ USHRT_MAX));
+ }
th->urg_ptr = 0;
th->check = 0;
- memcpy(data, in, len);
+ if (th->ack && now)
+ conn->ts_ack_to_tap = *now;
- tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4 + len,
+ tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4,
conn->seq_init_to_tap);
- if (th->fin)
+ if (flags & DUP_ACK) {
+ tap_ip_send(c, &conn->a.a6, IPPROTO_TCP, buf, th->doff * 4,
+ conn->seq_init_to_tap);
+ }
+
+ if (th->fin) {
+ conn->tap_data_noack = *now;
conn->seq_to_tap++;
+ }
return 0;
}
@@ -1286,7 +1278,7 @@ static void tcp_rst(struct ctx *c, struct tcp_tap_conn *conn)
if (conn->state == CLOSED)
return;
- tcp_send_to_tap(c, conn, RST, NULL, 0);
+ tcp_send_to_tap(c, conn, RST, NULL);
tcp_tap_destroy(c, conn);
}
@@ -1302,37 +1294,39 @@ static void tcp_clamp_window(struct tcp_tap_conn *conn, struct tcphdr *th,
int len, unsigned int window, int init)
{
if (init) {
- conn->ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL);
- conn->ws_allowed = conn->ws >= 0 && conn->ws <= MAX_WS;
- conn->ws *= conn->ws_allowed;
+ int ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL);
+
+ conn->ws_tap = ws;
/* RFC 7323, 2.2: first value is not scaled. Also, don't clamp
* yet, to avoid getting a zero scale just because we set a
* small window now.
*/
- conn->tap_window = ntohs(th->window);
+ conn->wnd_from_tap = ntohs(th->window);
conn->window_clamped = 0;
} else {
if (th)
- window = ntohs(th->window) << conn->ws;
+ window = ntohs(th->window) << conn->ws_tap;
else
- window <<= conn->ws;
+ window <<= conn->ws_tap;
+
+ window = MIN(MAX_WINDOW, window);
if (conn->window_clamped) {
- if (conn->tap_window == window)
+ if (conn->wnd_from_tap == window)
return;
/* Discard +/- 1% updates to spare some syscalls. */
- if ((window > conn->tap_window &&
- window * 99 / 100 < conn->tap_window) ||
- (window < conn->tap_window &&
- window * 101 / 100 > conn->tap_window)) {
- conn->tap_window = window;
+ if ((window > conn->wnd_from_tap &&
+ window * 99 / 100 < conn->wnd_from_tap) ||
+ (window < conn->wnd_from_tap &&
+ window * 101 / 100 > conn->wnd_from_tap)) {
+ conn->wnd_from_tap = window;
return;
}
}
- conn->tap_window = window;
+ conn->wnd_from_tap = window;
if (window < 256)
window = 256;
setsockopt(conn->sock, SOL_TCP, TCP_WINDOW_CLAMP,
@@ -1420,17 +1414,33 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr,
union epoll_ref ref = { .proto = IPPROTO_TCP };
const struct sockaddr *sa;
struct tcp_tap_conn *conn;
+ int i, s, *sock_pool_p;
struct epoll_event ev;
socklen_t sl;
- int s;
if (c->tcp.tap_conn_count >= MAX_TAP_CONNS)
return;
- ref.s = s = socket(af, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
+ for (i = 0; i < TCP_SOCK_POOL_SIZE; i++) {
+ if (af == AF_INET6)
+ sock_pool_p = &init_sock_pool6[i];
+ else
+ sock_pool_p = &init_sock_pool4[i];
+ if ((ref.s = s = *sock_pool_p) > 0) {
+ *sock_pool_p = -1;
+ break;
+ }
+ }
+
+ if (s < 0)
+ ref.s = s = socket(af, SOCK_STREAM | SOCK_NONBLOCK,
+ IPPROTO_TCP);
+
if (s < 0)
return;
+ tcp_sock_set_bufsize(s);
+
if (af == AF_INET && addr4.sin_addr.s_addr == c->gw4)
addr4.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
else if (af == AF_INET6 && !memcmp(addr, &c->gw6, sizeof(c->gw6)))
@@ -1447,10 +1457,9 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr,
conn = &tt[c->tcp.tap_conn_count++];
conn->sock = s;
+ conn->events = 0;
- sl = sizeof(conn->sndbuf);
- if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->sndbuf, &sl))
- conn->sndbuf = USHRT_MAX;
+ conn->wnd_to_tap = WINDOW_DEFAULT;
conn->mss_guest = tcp_opt_get(th, len, OPT_MSS, NULL, NULL);
if (conn->mss_guest < 0)
@@ -1488,9 +1497,8 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr,
conn->sock_port = ntohs(th->dest);
conn->tap_port = ntohs(th->source);
- conn->ts_sock = conn->ts_tap = conn->ts_ack_tap = *now;
-
- bitmap_set(tcp_act, conn - tt);
+ conn->ts_sock_act = conn->ts_tap_act = *now;
+ conn->ts_ack_to_tap = conn->ts_ack_from_tap = *now;
conn->seq_init_from_tap = ntohl(th->seq);
conn->seq_from_tap = conn->seq_init_from_tap + 1;
@@ -1514,12 +1522,18 @@ static void tcp_conn_from_tap(struct ctx *c, int af, void *addr,
} else {
tcp_tap_state(conn, TAP_SYN_RCVD);
- if (tcp_send_to_tap(c, conn, SYN | ACK, NULL, 0))
+ if (tcp_send_to_tap(c, conn, SYN | ACK, now))
return;
ev.events = EPOLLIN | EPOLLRDHUP;
}
+ sl = sizeof(conn->snd_buf);
+ if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->snd_buf, &sl))
+ conn->snd_buf = WINDOW_DEFAULT;
+ else
+ conn->snd_buf /= 2;
+
conn->events = ev.events;
ref.tcp.index = conn - tt;
ev.data.u64 = ref.u64;
@@ -1542,10 +1556,22 @@ static void tcp_table_splice_compact(struct ctx *c,
struct epoll_event ev_from;
struct epoll_event ev_to;
+ hole->from_fin_sent = hole->to_fin_sent = 0;
+ hole->from_read = hole->from_written = 0;
+ hole->to_read = hole->to_written = 0;
+
+ bitmap_clear(splice_rcvlowat_set[0], hole - ts);
+ bitmap_clear(splice_rcvlowat_set[1], hole - ts);
+ bitmap_clear(splice_rcvlowat_act[0], hole - ts);
+ bitmap_clear(splice_rcvlowat_act[1], hole - ts);
+
if ((hole - ts) == --c->tcp.splice_conn_count)
return;
move = &ts[c->tcp.splice_conn_count];
+ if (move->state == CLOSED)
+ return;
+
memcpy(hole, move, sizeof(*hole));
move->state = CLOSED;
move = hole;
@@ -1558,8 +1584,8 @@ static void tcp_table_splice_compact(struct ctx *c,
if (move->state == SPLICE_ACCEPTED) {
ev_from.events = ev_to.events = 0;
} else if (move->state == SPLICE_CONNECT) {
- ev_from.events = EPOLLET | EPOLLRDHUP;
- ev_to.events = EPOLLET | EPOLLOUT | EPOLLRDHUP;
+ ev_from.events = 0;
+ ev_to.events = EPOLLOUT;
} else {
ev_from.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
ev_to.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
@@ -1579,11 +1605,17 @@ static void tcp_table_splice_compact(struct ctx *c,
*/
static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn)
{
+ int epoll_del_done = 0;
+
switch (conn->state) {
+ case CLOSED:
+ epoll_del_done = 1;
+ /* Falls through */
case SPLICE_FIN_BOTH:
case SPLICE_FIN_FROM:
case SPLICE_FIN_TO:
case SPLICE_ESTABLISHED:
+ /* Flushing might need to block: don't recycle them. */
if (conn->pipe_from_to[0] != -1) {
close(conn->pipe_from_to[0]);
conn->pipe_from_to[0] = -1;
@@ -1598,18 +1630,17 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn)
}
/* Falls through */
case SPLICE_CONNECT:
- epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL);
- epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL);
+ if (!epoll_del_done) {
+ epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL);
+ epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL);
+ }
close(conn->to);
/* Falls through */
case SPLICE_ACCEPTED:
close(conn->from);
tcp_splice_state(conn, CLOSED);
tcp_table_splice_compact(c, conn);
- conn->from_fin_sent = conn->to_fin_sent = 0;
- conn->from_read = conn->from_written = 0;
- conn->to_read = conn->to_written = 0;
- return;
+ break;
default:
return;
}
@@ -1622,21 +1653,15 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn)
*/
static void tcp_sock_consume(struct tcp_tap_conn *conn, uint32_t ack_seq)
{
- uint32_t to_ack;
-
- /* Implicitly take care of wrap-arounds */
- to_ack = ack_seq - conn->seq_ack_from_tap;
-
/* Simply ignore out-of-order ACKs: we already consumed the data we
* needed from the buffer, and we won't rewind back to a lower ACK
* sequence.
*/
-
- if (to_ack > MAX_WINDOW)
+ if (SEQ_LE(ack_seq, conn->seq_ack_from_tap))
return;
- if (to_ack)
- recv(conn->sock, NULL, to_ack, MSG_DONTWAIT | MSG_TRUNC);
+ recv(conn->sock, NULL, ack_seq - conn->seq_ack_from_tap,
+ MSG_DONTWAIT | MSG_TRUNC);
conn->seq_ack_from_tap = ack_seq;
}
@@ -1665,24 +1690,24 @@ static int tcp_data_from_sock(struct ctx *c, struct tcp_tap_conn *conn,
already_sent = conn->seq_to_tap - conn->seq_ack_from_tap;
- if (already_sent > MAX_WINDOW) {
+ if (SEQ_LT(already_sent, 0)) {
/* RFC 761, section 2.1. */
seq_to_tap = conn->seq_to_tap = conn->seq_ack_from_tap;
already_sent = 0;
}
- if (!conn->tap_window || already_sent >= conn->tap_window) {
+ if (!conn->wnd_from_tap || already_sent >= conn->wnd_from_tap) {
tcp_tap_epoll_mask(c, conn, conn->events | EPOLLET);
return 0;
}
- fill_bufs = DIV_ROUND_UP(conn->tap_window - already_sent,
+ fill_bufs = DIV_ROUND_UP(conn->wnd_from_tap - already_sent,
conn->mss_guest);
if (fill_bufs > TCP_TAP_FRAMES) {
fill_bufs = TCP_TAP_FRAMES;
iov_rem = 0;
} else {
- iov_rem = (conn->tap_window - already_sent) % conn->mss_guest;
+ iov_rem = (conn->wnd_from_tap - already_sent) % conn->mss_guest;
}
/* Adjust iovec length for recvmsg() based on what was set last time. */
@@ -1712,11 +1737,11 @@ static int tcp_data_from_sock(struct ctx *c, struct tcp_tap_conn *conn,
tcp6_l2_mh_sock.msg_iovlen = fill_bufs + 1;
/* Don't dequeue until acknowledged by guest. */
-recvmmsg:
+recvmsg:
len = recvmsg(s, v4 ? &tcp4_l2_mh_sock : &tcp6_l2_mh_sock, MSG_PEEK);
if (len < 0) {
if (errno == EINTR)
- goto recvmmsg;
+ goto recvmsg;
goto err;
}
@@ -1726,7 +1751,7 @@ recvmmsg:
send = len - already_sent;
if (send <= 0) {
tcp_tap_epoll_mask(c, conn, conn->events | EPOLLET);
- goto out_restore_iov;
+ goto out;
}
tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET);
@@ -1762,32 +1787,22 @@ recvmmsg:
iov_tap[send_bufs - 1].iov_len = mss_tap - conn->mss_guest + last_len;
/* Likely, some new data was acked too. */
- if (conn->seq_from_tap != conn->seq_ack_to_tap || !conn->tcpi_snd_wnd) {
- if (conn->no_snd_wnd) {
+ if (conn->seq_from_tap != conn->seq_ack_to_tap || !conn->wnd_to_tap) {
+ if (conn->state != ESTABLISHED ||
+ getsockopt(s, SOL_TCP, TCP_INFO, &info, &sl)) {
conn->seq_ack_to_tap = conn->seq_from_tap;
} else {
- if (getsockopt(conn->sock, SOL_TCP, TCP_INFO, &info,
- &sl))
- goto err;
-
- if (getsockopt(conn->sock, SOL_SOCKET,
- SO_SNDBUF, &conn->sndbuf, &sl))
- conn->sndbuf = USHRT_MAX;
+ conn->seq_ack_to_tap = info.tcpi_bytes_acked +
+ conn->seq_init_from_tap;
- info.tcpi_snd_wnd = MIN(info.tcpi_snd_wnd,
- conn->sndbuf * 90 / 100);
-
- if (conn->state == ESTABLISHED)
- conn->seq_ack_to_tap = conn->seq_from_tap;
- else
- conn->seq_ack_to_tap = info.tcpi_bytes_acked +
- conn->seq_init_from_tap;
-
- conn->tcpi_acked_last = info.tcpi_bytes_acked;
+ if (c->tcp.kernel_snd_wnd) {
+ conn->wnd_to_tap = MIN(info.tcpi_snd_wnd,
+ conn->snd_buf);
+ } else {
+ conn->wnd_to_tap = conn->snd_buf;
+ }
+ conn->wnd_to_tap = MIN(conn->wnd_to_tap, MAX_WINDOW);
}
- } else {
- info.tcpi_snd_wscale = conn->ws;
- info.tcpi_snd_wnd = conn->tcpi_snd_wnd;
}
plen = conn->mss_guest;
@@ -1815,29 +1830,22 @@ recvmmsg:
b->th.dest = htons(conn->tap_port);
b->th.seq = htonl(seq_to_tap);
b->th.ack_seq = htonl(conn->seq_ack_to_tap);
-
- if (conn->no_snd_wnd) {
- b->th.window = htons(WINDOW_DEFAULT);
- } else {
- b->th.window = htons(info.tcpi_snd_wnd >>
- info.tcpi_snd_wscale);
- conn->tcpi_snd_wnd = info.tcpi_snd_wnd;
- }
+ b->th.window = htons(MIN(conn->wnd_to_tap >> conn->ws,
+ USHRT_MAX));
tcp_update_check_tcp4(b);
- if (c->mode == MODE_PASTA) {
- ip_len += sizeof(struct ethhdr);
- write(c->fd_tap, &b->eh, ip_len);
- pcap((char *)&b->eh, ip_len);
-
- conn->seq_to_tap += plen;
+ if (c->mode == MODE_PASST) {
+ b->vnet_len = htonl(sizeof(struct ethhdr) +
+ ip_len);
+ mh->msg_hdr.msg_iov = &tcp4_l2_iov_tap[i];
+ seq_to_tap += plen;
continue;
}
- b->vnet_len = htonl(sizeof(struct ethhdr) + ip_len);
-
- mh->msg_hdr.msg_iov = &tcp4_l2_iov_tap[i];
+ ip_len += sizeof(struct ethhdr);
+ pcap((char *)&b->eh, ip_len);
+ ret = write(c->fd_tap, &b->eh, ip_len);
} else {
struct tcp6_l2_buf_t *b = &tcp6_l2_buf[i];
uint32_t flow = conn->seq_init_to_tap;
@@ -1857,14 +1865,8 @@ recvmmsg:
b->th.dest = htons(conn->tap_port);
b->th.seq = htonl(seq_to_tap);
b->th.ack_seq = htonl(conn->seq_ack_to_tap);
-
- if (conn->no_snd_wnd) {
- b->th.window = htons(WINDOW_DEFAULT);
- } else {
- b->th.window = htons(info.tcpi_snd_wnd >>
- info.tcpi_snd_wscale);
- conn->tcpi_snd_wnd = info.tcpi_snd_wnd;
- }
+ b->th.window = htons(MIN(conn->wnd_to_tap >> conn->ws,
+ USHRT_MAX));
memset(b->ip6h.flow_lbl, 0, 3);
tcp_update_check_tcp6(b);
@@ -1873,21 +1875,32 @@ recvmmsg:
b->ip6h.flow_lbl[1] = (flow >> 8) & 0xff;
b->ip6h.flow_lbl[2] = (flow >> 0) & 0xff;
- if (c->mode == MODE_PASTA) {
- ip_len += sizeof(struct ethhdr);
- write(c->fd_tap, &b->eh, ip_len);
- pcap((char *)&b->eh, ip_len);
-
- conn->seq_to_tap += plen;
+ if (c->mode == MODE_PASST) {
+ b->vnet_len = htonl(sizeof(struct ethhdr) +
+ ip_len);
+ mh->msg_hdr.msg_iov = &tcp6_l2_iov_tap[i];
+ seq_to_tap += plen;
continue;
}
- b->vnet_len = htonl(sizeof(struct ethhdr) + ip_len);
+ ip_len += sizeof(struct ethhdr);
+ pcap((char *)&b->eh, ip_len);
+ ret = write(c->fd_tap, &b->eh, ip_len);
+ }
+
+ if (ret < ip_len) {
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return 0;
- mh->msg_hdr.msg_iov = &tcp6_l2_iov_tap[i];
+ tap_handler(c, EPOLLERR, now);
+ }
+
+ i--;
+ continue;
}
- seq_to_tap += plen;
+ conn->seq_to_tap += plen;
}
if (c->mode == MODE_PASTA)
@@ -1902,6 +1915,7 @@ sendmmsg:
if (ret <= 0)
goto out;
+ conn->tap_data_noack = *now;
conn->seq_to_tap += conn->mss_guest * (ret - 1) + last_len;
/* sendmmsg() indicates how many messages were sent at least partially.
@@ -1925,6 +1939,8 @@ sendmmsg:
*iov_base -= part_sent;
}
+ conn->ts_ack_to_tap = *now;
+
pcapmm(tcp_l2_mh_tap, ret);
goto out;
@@ -1934,23 +1950,16 @@ err:
tcp_rst(c, conn);
ret = -errno;
}
- goto out_restore_iov;
+ goto out;
zero_len:
if (conn->state == ESTABLISHED_SOCK_FIN) {
- uint8_t probe;
-
- if (!recv(conn->sock, &probe, 1, MSG_PEEK)) {
- tcp_tap_epoll_mask(c, conn, EPOLLET);
- tcp_send_to_tap(c, conn, FIN | ACK, NULL, 0);
- tcp_tap_state(conn, ESTABLISHED_SOCK_FIN_SENT);
- }
+ tcp_tap_epoll_mask(c, conn, EPOLLET);
+ tcp_send_to_tap(c, conn, FIN | ACK, now);
+ tcp_tap_state(conn, ESTABLISHED_SOCK_FIN_SENT);
}
out:
- conn->ts_sock = *now;
-
-out_restore_iov:
if (iov_rem)
iov[fill_bufs - 1].iov_len = conn->mss_guest;
if (send_bufs)
@@ -1971,12 +1980,14 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn,
struct tap_l4_msg *msg, int count,
struct timespec *now)
{
- int i, iov_i, ack = 0, fin = 0, retr = 0, keep = -1;
+ int i, iov_i, ack = 0, fin = 0, psh = 0, retr = 0, keep = -1;
struct msghdr mh = { .msg_iov = tcp_tap_iov };
uint32_t max_ack_seq = conn->seq_ack_from_tap;
+ uint16_t max_ack_seq_wnd = conn->wnd_from_tap;
uint32_t seq_from_tap = conn->seq_from_tap;
- uint16_t max_ack_seq_wnd = WINDOW_DEFAULT;
- ssize_t len;
+ int partial_send = 0;
+ uint16_t len;
+ ssize_t n;
for (i = 0, iov_i = 0; i < count; i++) {
uint32_t seq, seq_offset, ack_seq;
@@ -2008,14 +2019,12 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn,
seq = ntohl(th->seq);
ack_seq = ntohl(th->ack_seq);
- if (!i)
- max_ack_seq_wnd = ntohs(th->window);
if (th->ack) {
ack = 1;
- if (ack_seq - conn->seq_ack_from_tap < MAX_WINDOW &&
- ack_seq - max_ack_seq < MAX_WINDOW) {
+ if (SEQ_GE(ack_seq, conn->seq_ack_from_tap) &&
+ SEQ_GE(ack_seq, max_ack_seq)) {
/* Fast re-transmit */
retr = !len && !th->fin &&
ack_seq == max_ack_seq &&
@@ -2029,6 +2038,9 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn,
if (th->fin)
fin = 1;
+ if (th->psh)
+ psh = 1;
+
if (!len)
continue;
@@ -2046,19 +2058,22 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn,
* |--------| <-- len |--------| <-- len
* '--------' <-- offset '-----| <- offset
* ^ seq ^ seq
- * (offset >= 0 i.e. < MAX_WINDOW, seq + len <= seq_from_tap)
+ * (offset >= 0, seq + len <= seq_from_tap)
*
* keep, look for another buffer, then go back, in this case:
* , seq_from_tap
* |--------| <-- len
* '===' <-- offset
* ^ seq
- * (offset < 0 i.e. > MAX_WINDOW)
+ * (offset < 0)
*/
- if (seq_offset < MAX_WINDOW && seq + len <= seq_from_tap)
+ if (SEQ_GE(seq_offset, 0) && SEQ_LE(seq + len, seq_from_tap)) {
+ /* Force sending ACK, sender might have lost one */
+ psh = 1;
continue;
+ }
- if (seq_offset > MAX_WINDOW) {
+ if (SEQ_LT(seq_offset, 0)) {
if (keep == -1)
keep = i;
continue;
@@ -2079,14 +2094,15 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn,
tcp_clamp_window(conn, NULL, 0, max_ack_seq_wnd, 0);
if (ack) {
- conn->ts_ack_tap = *now;
+ conn->ts_ack_from_tap = *now;
+ conn->tap_data_noack = ((struct timespec) { 0, 0 });
tcp_sock_consume(conn, max_ack_seq);
}
if (retr) {
+ conn->seq_ack_from_tap = max_ack_seq;
conn->seq_to_tap = max_ack_seq;
tcp_data_from_sock(c, conn, now);
- return;
}
if (!iov_i)
@@ -2094,30 +2110,41 @@ static void tcp_data_from_tap(struct ctx *c, struct tcp_tap_conn *conn,
mh.msg_iovlen = iov_i;
eintr:
- len = sendmsg(conn->sock, &mh, MSG_DONTWAIT | MSG_NOSIGNAL);
- if (len < 0) {
+ n = sendmsg(conn->sock, &mh, MSG_DONTWAIT | MSG_NOSIGNAL);
+ if (n < 0) {
+ if (errno == EPIPE) {
+ /* Here's the wrap, said the tap.
+ * In my pocket, said the socket.
+ * Then swiftly looked away and left.
+ */
+ conn->seq_from_tap = seq_from_tap;
+ tcp_send_to_tap(c, conn, FORCE_ACK, now);
+ }
+
if (errno == EINTR)
goto eintr;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
- tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0);
+ tcp_send_to_tap(c, conn, UPDATE_WINDOW, now);
return;
}
tcp_rst(c, conn);
return;
}
- if (len < (seq_from_tap - conn->seq_from_tap)) {
- conn->seq_from_tap += len;
- tcp_send_to_tap(c, conn, ZERO_WINDOW, NULL, 0);
- } else {
- conn->seq_from_tap += len;
+ if (n < (seq_from_tap - conn->seq_from_tap)) {
+ partial_send = 1;
+ tcp_send_to_tap(c, conn, UPDATE_WINDOW, now);
}
+ conn->seq_from_tap += n;
+
out:
if (keep != -1) {
- tcp_send_to_tap(c, conn, ACK, NULL, 0);
- tcp_send_to_tap(c, conn, ACK, NULL, 0);
+ if (conn->seq_dup_ack != conn->seq_from_tap) {
+ conn->seq_dup_ack = conn->seq_from_tap;
+ tcp_send_to_tap(c, conn, DUP_ACK, now);
+ }
return;
}
@@ -2127,19 +2154,27 @@ out:
tcp_tap_state(conn, CLOSE_WAIT);
}
- if (!fin) {
- tcp_send_to_tap(c, conn, 0, NULL, 0);
- return;
- }
+ if (fin && !partial_send) {
+ conn->seq_from_tap++;
+
+ if (conn->state == ESTABLISHED) {
+ shutdown(conn->sock, SHUT_WR);
+ tcp_tap_state(conn, FIN_WAIT_1);
+ tcp_send_to_tap(c, conn, ACK, now);
+ } else if (conn->state == CLOSE_WAIT) {
+ shutdown(conn->sock, SHUT_WR);
+ tcp_tap_state(conn, LAST_ACK);
+ tcp_send_to_tap(c, conn, ACK, now);
+ }
+ } else {
+ int ack_to_tap = timespec_diff_ms(now, &conn->ts_ack_to_tap);
+ int ack_offset = conn->seq_from_tap - conn->seq_ack_to_tap;
- if (conn->state == ESTABLISHED) {
- shutdown(conn->sock, SHUT_WR);
- tcp_tap_state(conn, FIN_WAIT_1);
- tcp_send_to_tap(c, conn, ACK, NULL, 0);
- } else if (conn->state == CLOSE_WAIT) {
- shutdown(conn->sock, SHUT_WR);
- tcp_tap_state(conn, LAST_ACK);
- tcp_send_to_tap(c, conn, ACK, NULL, 0);
+ if (c->mode == MODE_PASTA ||
+ psh || SEQ_GE(ack_offset, conn->wnd_to_tap / 2) ||
+ ack_to_tap > ACK_INTERVAL) {
+ tcp_send_to_tap(c, conn, psh ? FORCE_ACK : 0, now);
+ }
}
}
@@ -2170,16 +2205,16 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr,
if (th->rst) {
tcp_tap_destroy(c, conn);
- return 1;
+ return count;
}
- conn->ts_tap = *now;
+ conn->ts_tap_act = *now;
switch (conn->state) {
case SOCK_SYN_SENT:
if (!th->syn || !th->ack) {
tcp_rst(c, conn);
- return 1;
+ return count;
}
tcp_clamp_window(conn, th, len, 0, 1);
@@ -2198,17 +2233,6 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr,
conn->mss_guest);
}
- ws = tcp_opt_get(th, len, OPT_WS, NULL, NULL);
- if (ws > MAX_WS) {
- if (tcp_send_to_tap(c, conn, RST, NULL, 0))
- return 1;
-
- conn->seq_to_tap = 0;
- conn->ws_allowed = 0;
- tcp_send_to_tap(c, conn, SYN, NULL, 0);
- return 1;
- }
-
/* info.tcpi_bytes_acked already includes one byte for SYN, but
* not for incoming connections.
*/
@@ -2222,35 +2246,54 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr,
* dequeue waiting for SYN,ACK from tap -- check now.
*/
tcp_data_from_sock(c, conn, now);
- tcp_send_to_tap(c, conn, 0, NULL, 0);
+ tcp_send_to_tap(c, conn, 0, now);
tcp_tap_epoll_mask(c, conn, EPOLLIN | EPOLLRDHUP);
break;
case TAP_SYN_RCVD:
if (th->fin) {
+ conn->seq_from_tap++;
+
shutdown(conn->sock, SHUT_WR);
- tcp_send_to_tap(c, conn, ACK, NULL, 0);
+ tcp_send_to_tap(c, conn, ACK, now);
tcp_tap_state(conn, FIN_WAIT_1);
break;
}
if (!th->ack) {
tcp_rst(c, conn);
- return 1;
+ return count;
}
tcp_clamp_window(conn, th, len, 0, 0);
tcp_tap_state(conn, ESTABLISHED);
- break;
+ if (count == 1)
+ break;
+
+ /* Falls through */
case ESTABLISHED:
case ESTABLISHED_SOCK_FIN:
case ESTABLISHED_SOCK_FIN_SENT:
+ tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET);
+ tcp_data_from_tap(c, conn, msg, count, now);
+ return count;
case CLOSE_WAIT:
case FIN_WAIT_1_SOCK_FIN:
case FIN_WAIT_1:
+ if (th->ack) {
+ conn->tap_data_noack = ((struct timespec) { 0, 0 });
+ conn->ts_ack_from_tap = *now;
+ }
+
+ tcp_sock_consume(conn, ntohl(th->ack_seq));
+ if (conn->state == FIN_WAIT_1_SOCK_FIN &&
+ conn->seq_ack_from_tap == conn->seq_to_tap) {
+ tcp_tap_destroy(c, conn);
+ return count;
+ }
+
tcp_tap_epoll_mask(c, conn, conn->events & ~EPOLLET);
- tcp_data_from_tap(c, conn, msg, count, now);
return count;
case TAP_SYN_SENT:
case LAST_ACK:
@@ -2271,8 +2314,10 @@ int tcp_tap_handler(struct ctx *c, int af, void *addr,
* tcp_connect_finish() - Handle completion of connect() from EPOLLOUT event
* @c: Execution context
* @s: File descriptor number for socket
+ * @now: Current timestamp
*/
-static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn)
+static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn,
+ struct timespec *now)
{
socklen_t sl;
int so;
@@ -2283,7 +2328,7 @@ static void tcp_connect_finish(struct ctx *c, struct tcp_tap_conn *conn)
return;
}
- if (tcp_send_to_tap(c, conn, SYN | ACK, NULL, 0))
+ if (tcp_send_to_tap(c, conn, SYN | ACK, now))
return;
/* Drop EPOLLOUT, only used to wait for connect() to complete */
@@ -2308,29 +2353,32 @@ static void tcp_splice_connect_finish(struct ctx *c,
.tcp = { .splice = 1, .v6 = v6,
.index = conn - ts } };
struct epoll_event ev_from, ev_to;
+ int i;
- if (conn->state == SPLICE_CONNECT) {
- socklen_t sl;
- int so;
+ conn->pipe_from_to[0] = conn->pipe_to_from[0] = -1;
+ conn->pipe_from_to[1] = conn->pipe_to_from[1] = -1;
+ for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE; i++) {
+ if (splice_pipe_pool[i][0][0] > 0) {
+ SWAP(conn->pipe_from_to[0], splice_pipe_pool[i][0][0]);
+ SWAP(conn->pipe_from_to[1], splice_pipe_pool[i][0][1]);
+
+ SWAP(conn->pipe_to_from[0], splice_pipe_pool[i][1][0]);
+ SWAP(conn->pipe_to_from[1], splice_pipe_pool[i][1][1]);
+ break;
+ }
+ }
- sl = sizeof(so);
- if (getsockopt(conn->to, SOL_SOCKET, SO_ERROR, &so, &sl) ||
- so) {
+ if (conn->pipe_from_to[0] <= 0) {
+ if (pipe2(conn->pipe_to_from, O_NONBLOCK) ||
+ pipe2(conn->pipe_from_to, O_NONBLOCK)) {
tcp_splice_destroy(c, conn);
return;
}
- }
- conn->pipe_from_to[0] = conn->pipe_to_from[0] = -1;
- if (pipe2(conn->pipe_to_from, O_NONBLOCK) ||
- pipe2(conn->pipe_from_to, O_NONBLOCK)) {
- tcp_splice_destroy(c, conn);
- return;
+ fcntl(conn->pipe_from_to[0], F_SETPIPE_SZ, c->tcp.pipe_size);
+ fcntl(conn->pipe_to_from[0], F_SETPIPE_SZ, c->tcp.pipe_size);
}
- fcntl(conn->pipe_from_to[0], F_SETPIPE_SZ, PIPE_SIZE);
- fcntl(conn->pipe_to_from[0], F_SETPIPE_SZ, PIPE_SIZE);
-
if (conn->state == SPLICE_CONNECT) {
tcp_splice_state(conn, SPLICE_ESTABLISHED);
@@ -2338,7 +2386,7 @@ static void tcp_splice_connect_finish(struct ctx *c,
ev_from.data.u64 = ref_from.u64;
ev_to.data.u64 = ref_to.u64;
- epoll_ctl(c->epollfd, EPOLL_CTL_MOD, conn->from, &ev_from);
+ epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_from);
epoll_ctl(c->epollfd, EPOLL_CTL_MOD, conn->to, &ev_to);
}
}
@@ -2353,20 +2401,19 @@ static void tcp_splice_connect_finish(struct ctx *c,
* Return: 0 for connect() succeeded or in progress, negative value on error
*/
static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn,
- int v6, in_port_t port)
+ int s, int v6, in_port_t port)
{
- int sock_conn = socket(v6 ? AF_INET6 : AF_INET,
- SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
+ int sock_conn = (s > 0) ? s : socket(v6 ? AF_INET6 : AF_INET,
+ SOCK_STREAM | SOCK_NONBLOCK,
+ IPPROTO_TCP);
union epoll_ref ref_accept = { .proto = IPPROTO_TCP, .s = conn->from,
.tcp = { .splice = 1, .v6 = v6,
.index = conn - ts } };
union epoll_ref ref_conn = { .proto = IPPROTO_TCP, .s = sock_conn,
.tcp = { .splice = 1, .v6 = v6,
.index = conn - ts } };
- struct epoll_event ev_accept = { .events = EPOLLET,
- .data.u64 = ref_accept.u64 };
- struct epoll_event ev_conn = { .events = EPOLLET,
- .data.u64 = ref_conn.u64 };
+ struct epoll_event ev_accept = { .data.u64 = ref_accept.u64 };
+ struct epoll_event ev_conn = { .data.u64 = ref_conn.u64 };
struct sockaddr_in6 addr6 = {
.sin6_family = AF_INET6,
.sin6_port = htons(port),
@@ -2381,11 +2428,11 @@ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn,
socklen_t sl;
int ret;
- if (sock_conn < 0)
- return -errno;
-
conn->to = sock_conn;
+ if (s <= 0)
+ tcp_sock_set_bufsize(sock_conn);
+
if (v6) {
sa = (struct sockaddr *)&addr6;
sl = sizeof(addr6);
@@ -2402,16 +2449,17 @@ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn,
}
tcp_splice_state(conn, SPLICE_CONNECT);
- ev_conn.events |= EPOLLOUT;
+ ev_conn.events = EPOLLOUT;
} else {
tcp_splice_state(conn, SPLICE_ESTABLISHED);
tcp_splice_connect_finish(c, conn, v6);
- ev_conn.events |= EPOLLIN;
- ev_accept.events |= EPOLLIN;
+ ev_accept.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
+ ev_conn.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP;
+
+ epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_accept);
}
- epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->from, &ev_accept);
epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->to, &ev_conn);
return 0;
@@ -2445,7 +2493,7 @@ static int tcp_splice_connect_ns(void *arg)
a = (struct tcp_splice_connect_ns_arg *)arg;
ns_enter(a->c->pasta_pid);
- a->ret = tcp_splice_connect(a->c, a->conn, a->v6, a->port);
+ a->ret = tcp_splice_connect(a->c, a->conn, -1, a->v6, a->port);
return 0;
}
@@ -2462,13 +2510,26 @@ static int tcp_splice_new(struct ctx *c, struct tcp_splice_conn *conn,
int v6, in_port_t port)
{
struct tcp_splice_connect_ns_arg ns_arg = { c, conn, v6, port, 0 };
+ int *sock_pool_p, i, s = -1;
- if (bitmap_isset(c->tcp.port_to_tap, port)) {
+ if (bitmap_isset(c->tcp.port_to_tap, port))
+ sock_pool_p = v6 ? ns_sock_pool6 : ns_sock_pool4;
+ else
+ sock_pool_p = v6 ? init_sock_pool6 : init_sock_pool4;
+
+ for (i = 0; i < TCP_SOCK_POOL_SIZE; i++, sock_pool_p++) {
+ if ((s = *sock_pool_p) > 0) {
+ *sock_pool_p = -1;
+ break;
+ }
+ }
+
+ if (s <= 0 && bitmap_isset(c->tcp.port_to_tap, port)) {
NS_CALL(tcp_splice_connect_ns, &ns_arg);
return ns_arg.ret;
}
- return tcp_splice_connect(c, conn, v6, port);
+ return tcp_splice_connect(c, conn, s, v6, port);
}
/**
@@ -2500,10 +2561,6 @@ static void tcp_conn_from_sock(struct ctx *c, union epoll_ref ref,
ref_conn.tcp.index = conn - tt;
ref_conn.s = conn->sock = s;
- sl = sizeof(conn->sndbuf);
- if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->sndbuf, &sl))
- conn->sndbuf = USHRT_MAX;
-
if (ref.tcp.v6) {
struct sockaddr_in6 *sa6 = (struct sockaddr_in6 *)&sa;
@@ -2551,19 +2608,24 @@ static void tcp_conn_from_sock(struct ctx *c, union epoll_ref ref,
conn->seq_ack_from_tap = conn->seq_to_tap + 1;
- conn->tap_window = WINDOW_DEFAULT;
- conn->ws_allowed = 1;
+ conn->wnd_from_tap = WINDOW_DEFAULT;
- conn->ts_sock = conn->ts_tap = conn->ts_ack_tap = *now;
+ conn->ts_sock_act = conn->ts_tap_act = *now;
+ conn->ts_ack_from_tap = conn->ts_ack_to_tap = *now;
- bitmap_set(tcp_act, conn - tt);
+ tcp_send_to_tap(c, conn, SYN, now);
conn->events = ev.events = EPOLLRDHUP;
ev.data.u64 = ref_conn.u64;
epoll_ctl(c->epollfd, EPOLL_CTL_ADD, conn->sock, &ev);
tcp_tap_state(conn, SOCK_SYN_SENT);
- tcp_send_to_tap(c, conn, SYN, NULL, 0);
+
+ sl = sizeof(conn->snd_buf);
+ if (getsockopt(s, SOL_SOCKET, SO_SNDBUF, &conn->snd_buf, &sl))
+ conn->snd_buf = WINDOW_DEFAULT;
+ else
+ conn->snd_buf /= 2;
}
/**
@@ -2576,11 +2638,13 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref,
uint32_t events)
{
int move_from, move_to, *pipes, eof, never_read;
+ uint8_t *rcvlowat_set, *rcvlowat_act;
+ uint64_t *seq_read, *seq_write;
struct tcp_splice_conn *conn;
struct epoll_event ev;
if (ref.tcp.listen) {
- int s;
+ int s, one = 1;
if (c->tcp.splice_conn_count >= MAX_SPLICE_CONNS)
return;
@@ -2588,6 +2652,8 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref,
if ((s = accept4(ref.s, NULL, NULL, SOCK_NONBLOCK)) < 0)
return;
+ setsockopt(s, SOL_TCP, TCP_QUICKACK, &one, sizeof(one));
+
conn = &ts[c->tcp.splice_conn_count++];
conn->from = s;
tcp_splice_state(conn, SPLICE_ACCEPTED);
@@ -2614,8 +2680,7 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref,
if (conn->state == SPLICE_CONNECT)
tcp_splice_connect_finish(c, conn, ref.tcp.v6);
-
- if (conn->state == SPLICE_ESTABLISHED)
+ else if (conn->state == SPLICE_ESTABLISHED)
epoll_ctl(c->epollfd, EPOLL_CTL_MOD, ref.s, &ev);
move_to = ref.s;
@@ -2655,12 +2720,25 @@ swap:
eof = 0;
never_read = 1;
+ if (move_from == conn->from) {
+ seq_read = &conn->from_read;
+ seq_write = &conn->from_written;
+ rcvlowat_set = splice_rcvlowat_set[0];
+ rcvlowat_act = splice_rcvlowat_act[0];
+ } else {
+ seq_read = &conn->to_read;
+ seq_write = &conn->to_written;
+ rcvlowat_set = splice_rcvlowat_set[1];
+ rcvlowat_act = splice_rcvlowat_act[1];
+ }
+
+
while (1) {
- int retry_write = 1;
+ int retry_write = 0, more = 0;
ssize_t read, to_write = 0, written;
retry:
- read = splice(move_from, NULL, pipes[1], NULL, PIPE_SIZE,
+ read = splice(move_from, NULL, pipes[1], NULL, c->tcp.pipe_size,
SPLICE_F_MOVE);
if (read < 0) {
if (errno == EINTR)
@@ -2669,35 +2747,46 @@ retry:
if (errno != EAGAIN)
goto close;
- to_write = PIPE_SIZE;
+ to_write = c->tcp.pipe_size;
} else if (!read) {
eof = 1;
- to_write = PIPE_SIZE;
+ to_write = c->tcp.pipe_size;
} else {
never_read = 0;
to_write += read;
- if (move_from == conn->from)
- conn->from_read += read;
- else
- conn->to_read += read;
+ if (read >= (long)c->tcp.pipe_size * 90 / 100)
+ more = SPLICE_F_MORE;
+
+ if (bitmap_isset(rcvlowat_set, conn - ts))
+ bitmap_set(rcvlowat_act, conn - ts);
}
eintr:
written = splice(pipes[0], NULL, move_to, NULL, to_write,
- SPLICE_F_MOVE);
+ SPLICE_F_MOVE | more);
- if (written > 0) {
- if (move_from == conn->from) {
- conn->from_written += written;
- if (conn->from_read == conn->from_written)
- break;
- } else {
- conn->to_written += written;
- if (conn->to_read == conn->to_written)
- break;
+ /* Most common case: skip updating counters. */
+ if (read > 0 && read == written) {
+ if (read >= (long)c->tcp.pipe_size * 10 / 100)
+ continue;
+
+ if (!bitmap_isset(rcvlowat_set, conn - ts) &&
+ read > (long)c->tcp.pipe_size / 10) {
+ int lowat = c->tcp.pipe_size / 4;
+
+ setsockopt(move_from, SOL_SOCKET, SO_RCVLOWAT,
+ &lowat, sizeof(lowat));
+
+ bitmap_set(rcvlowat_set, conn - ts);
+ bitmap_set(rcvlowat_act, conn - ts);
}
+
+ break;
}
+ *seq_read += read > 0 ? read : 0;
+ *seq_write += written > 0 ? written : 0;
+
if (written < 0) {
if (errno == EINTR)
goto eintr;
@@ -2716,9 +2805,9 @@ eintr:
ev.data.u64 = ref.u64,
epoll_ctl(c->epollfd, EPOLL_CTL_MOD, move_to, &ev);
break;
- } else if (never_read && written == PIPE_SIZE) {
+ } else if (never_read && written == (long)(c->tcp.pipe_size)) {
goto retry;
- } else if (!never_read &&written < to_write) {
+ } else if (!never_read && written < to_write) {
to_write -= written;
goto retry;
}
@@ -2727,11 +2816,10 @@ eintr:
break;
}
- if (conn->state == SPLICE_FIN_BOTH ||
- (conn->state == SPLICE_FIN_FROM && move_from == conn->from) ||
- (conn->state == SPLICE_FIN_TO && move_from == conn->to)) {
+ if (*seq_read == *seq_write) {
if (move_from == conn->from &&
- conn->from_read == conn->from_written) {
+ (conn->state == SPLICE_FIN_FROM ||
+ conn->state == SPLICE_FIN_BOTH)) {
if (!conn->from_fin_sent) {
shutdown(conn->to, SHUT_WR);
conn->from_fin_sent = 1;
@@ -2746,7 +2834,8 @@ eintr:
if (conn->to_fin_sent)
goto close;
} else if (move_from == conn->to &&
- conn->to_read == conn->to_written) {
+ (conn->state == SPLICE_FIN_TO ||
+ conn->state == SPLICE_FIN_BOTH)) {
if (!conn->to_fin_sent) {
shutdown(conn->from, SHUT_WR);
conn->to_fin_sent = 1;
@@ -2778,7 +2867,9 @@ eintr:
return;
close:
- tcp_splice_destroy(c, conn);
+ epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->from, NULL);
+ epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->to, NULL);
+ conn->state = CLOSED;
return;
}
@@ -2806,39 +2897,42 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events,
conn = &tt[ref.tcp.index];
+ conn->ts_sock_act = *now;
+
if (events & EPOLLERR) {
if (conn->state != CLOSED)
tcp_rst(c, conn);
+
return;
}
switch (conn->state) {
case TAP_SYN_SENT:
if (events & EPOLLOUT)
- tcp_connect_finish(c, conn);
+ tcp_connect_finish(c, conn, now);
else
tcp_rst(c, conn);
return;
case ESTABLISHED_SOCK_FIN:
case ESTABLISHED_SOCK_FIN_SENT:
case ESTABLISHED:
- tcp_data_from_sock(c, conn, now);
if (events & EPOLLRDHUP) {
if (conn->state == ESTABLISHED)
tcp_tap_state(conn, ESTABLISHED_SOCK_FIN);
- tcp_data_from_sock(c, conn, now);
}
+ tcp_data_from_sock(c, conn, now);
return;
case LAST_ACK:
- tcp_send_to_tap(c, conn, 0, NULL, 0);
- if (conn->seq_ack_to_tap == conn->seq_from_tap + 1)
+ tcp_send_to_tap(c, conn, 0, now);
+ if (conn->seq_ack_to_tap == conn->seq_from_tap + 1 ||
+ conn->seq_ack_to_tap == conn->seq_from_tap)
tcp_tap_destroy(c, conn);
return;
case FIN_WAIT_1:
if (events & EPOLLIN)
tcp_data_from_sock(c, conn, now);
if (events & EPOLLRDHUP) {
- tcp_send_to_tap(c, conn, FIN | ACK, NULL, 0);
+ tcp_send_to_tap(c, conn, FIN | ACK, now);
tcp_tap_state(conn, FIN_WAIT_1_SOCK_FIN);
}
return;
@@ -2847,11 +2941,13 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events,
if (events & EPOLLIN)
tcp_data_from_sock(c, conn, now);
if (events & EPOLLHUP) {
- if ((conn->seq_ack_to_tap == conn->seq_from_tap + 1) &&
- (conn->seq_ack_from_tap == conn->seq_to_tap)) {
+ if ((conn->seq_ack_to_tap == conn->seq_from_tap + 1 ||
+ conn->seq_ack_to_tap == conn->seq_from_tap) &&
+ (conn->seq_ack_from_tap == conn->seq_to_tap - 1 ||
+ conn->seq_ack_from_tap == conn->seq_to_tap)) {
tcp_tap_destroy(c, conn);
} else {
- tcp_send_to_tap(c, conn, 0, NULL, 0);
+ tcp_send_to_tap(c, conn, ACK, now);
}
}
return;
@@ -2869,6 +2965,43 @@ void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events,
}
/**
+ * tcp_set_pipe_size() - Set usable pipe size, probe starting from MAX_PIPE_SIZE
+ * @c:		Execution context
+ *
+ * For each candidate size, open TCP_SPLICE_PIPE_POOL_SIZE * 2 probe pipes and
+ * try to set that size on every one of them. If any step fails, halve the
+ * candidate and try again. If no size can be set on all probe pipes, fall back
+ * to MAX_PIPE_SIZE. The result is stored in c->tcp.pipe_size.
+ */
+static void tcp_set_pipe_size(struct ctx *c)
+{
+	int probe_pipe[TCP_SPLICE_PIPE_POOL_SIZE * 2][2], i, j, failed;
+
+	for (c->tcp.pipe_size = MAX_PIPE_SIZE; c->tcp.pipe_size;
+	     c->tcp.pipe_size /= 2) {
+		failed = 0;
+
+		for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE * 2; i++) {
+			/* Nothing was opened in slot i: don't close it below */
+			if (pipe(probe_pipe[i])) {
+				failed = 1;
+				break;
+			}
+
+			if (fcntl(probe_pipe[i][0], F_SETPIPE_SZ,
+				  c->tcp.pipe_size) < 0) {
+				/* Slot i is open: include it in the cleanup */
+				i++;
+				failed = 1;
+				break;
+			}
+		}
+
+		/* Close exactly the probe pipes opened in this round */
+		for (j = i - 1; j >= 0; j--) {
+			close(probe_pipe[j][0]);
+			close(probe_pipe[j][1]);
+		}
+
+		if (!failed)
+			return;
+	}
+
+	/* No candidate worked for all probe pipes, keep the default */
+	c->tcp.pipe_size = MAX_PIPE_SIZE;
+}
+
+/**
* tcp_sock_init_ns() - Bind sockets in namespace for inbound connections
* @arg: Execution context
*
@@ -2879,6 +3012,7 @@ static int tcp_sock_init_ns(void *arg)
union tcp_epoll_ref tref = { .listen = 1, .splice = 1 };
struct ctx *c = (struct ctx *)arg;
in_port_t port;
+ int s;
ns_enter(c->pasta_pid);
@@ -2890,14 +3024,16 @@ static int tcp_sock_init_ns(void *arg)
if (c->v4) {
tref.v6 = 0;
- sock_l4(c, AF_INET, IPPROTO_TCP, port, BIND_LOOPBACK,
- tref.u32);
+ s = sock_l4(c, AF_INET, IPPROTO_TCP, port,
+ BIND_LOOPBACK, tref.u32);
+ tcp_sock_set_bufsize(s);
}
if (c->v6) {
tref.v6 = 1;
- sock_l4(c, AF_INET6, IPPROTO_TCP, port, BIND_LOOPBACK,
- tref.u32);
+ s = sock_l4(c, AF_INET6, IPPROTO_TCP, port,
+ BIND_LOOPBACK, tref.u32);
+ tcp_sock_set_bufsize(s);
}
}
@@ -2905,15 +3041,96 @@ static int tcp_sock_init_ns(void *arg)
}
/**
+ * tcp_splice_pipe_refill() - Refill pool of pre-opened pipes
+ * @c:		Execution context
+ *
+ * Walk the pool from the first slot and stop at the first slot that still
+ * holds a pipe pair. Each empty slot gets two non-blocking pipes, both sized
+ * to c->tcp.pipe_size. A slot is left empty if either pipe can't be opened.
+ */
+static void tcp_splice_pipe_refill(struct ctx *c)
+{
+	int i;
+
+	for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE; i++) {
+		if (splice_pipe_pool[i][0][0] > 0)
+			break;
+		if (pipe2(splice_pipe_pool[i][0], O_NONBLOCK))
+			continue;
+		if (pipe2(splice_pipe_pool[i][1], O_NONBLOCK)) {
+			/* Second pipe failed: release the first one, and mark
+			 * the slot as empty again, as slots are tested with
+			 * "> 0" on the first descriptor.
+			 */
+			close(splice_pipe_pool[i][0][0]);
+			close(splice_pipe_pool[i][0][1]);
+			splice_pipe_pool[i][0][0] = -1;
+			splice_pipe_pool[i][0][1] = -1;
+			continue;
+		}
+
+		fcntl(splice_pipe_pool[i][0][0], F_SETPIPE_SZ,
+		      c->tcp.pipe_size);
+		fcntl(splice_pipe_pool[i][1][0], F_SETPIPE_SZ,
+		      c->tcp.pipe_size);
+	}
+}
+
+/**
+ * struct tcp_sock_refill_arg - Arguments for tcp_sock_refill()
+ * @c:		Execution context
+ * @ns:		Non-zero: enter the namespace and refill the ns_sock_pool4 and
+ *		ns_sock_pool6 pools; zero: refill init_sock_pool4 and
+ *		init_sock_pool6
+ */
+struct tcp_sock_refill_arg {
+ struct ctx *c;
+ int ns;
+};
+
+/**
+ * tcp_sock_refill() - Refill pool of pre-opened sockets
+ * @arg:	See @tcp_sock_refill_arg
+ *
+ * Fill the IPv4 and IPv6 pools (for the enabled address families only) from
+ * the first slot, stopping at the first slot that still holds a socket. If
+ * @ns is set, the namespace is entered first and the namespace pools are
+ * filled instead; nothing is done if entering the namespace fails.
+ *
+ * Return: 0
+ */
+static int tcp_sock_refill(void *arg)
+{
+	struct tcp_sock_refill_arg *a = (struct tcp_sock_refill_arg *)arg;
+	int i, *p4, *p6, one = 1;
+
+	if (a->ns) {
+		if (ns_enter(a->c->pasta_pid))
+			return 0;
+		p4 = ns_sock_pool4;
+		p6 = ns_sock_pool6;
+	} else {
+		p4 = init_sock_pool4;
+		p6 = init_sock_pool6;
+	}
+
+	for (i = 0; a->c->v4 && i < TCP_SOCK_POOL_SIZE; i++, p4++) {
+		if (*p4 > 0)
+			break;
+
+		*p4 = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
+		/* Slot stays empty on failure, don't set options on it */
+		if (*p4 < 0)
+			continue;
+
+		setsockopt(*p4, SOL_TCP, TCP_QUICKACK, &one, sizeof(one));
+		tcp_sock_set_bufsize(*p4);
+	}
+
+	for (i = 0; a->c->v6 && i < TCP_SOCK_POOL_SIZE; i++, p6++) {
+		if (*p6 > 0)
+			break;
+
+		*p6 = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK,
+			     IPPROTO_TCP);
+		/* Slot stays empty on failure, don't set options on it */
+		if (*p6 < 0)
+			continue;
+
+		setsockopt(*p6, SOL_TCP, TCP_QUICKACK, &one, sizeof(one));
+		tcp_sock_set_bufsize(*p6);
+	}
+
+	return 0;
+}
+
+/**
* tcp_sock_init() - Bind sockets for inbound connections, get key for sequence
* @c: Execution context
*
* Return: 0 on success, -1 on failure
*/
-int tcp_sock_init(struct ctx *c)
+int tcp_sock_init(struct ctx *c, struct timespec *now)
{
+ struct tcp_sock_refill_arg refill_arg = { c, 0 };
union tcp_epoll_ref tref = { .listen = 1 };
in_port_t port;
+ int s;
getrandom(&c->tcp.hash_secret, sizeof(c->tcp.hash_secret), GRND_RANDOM);
@@ -2926,14 +3143,16 @@ int tcp_sock_init(struct ctx *c)
tref.v6 = 0;
tref.splice = 0;
- sock_l4(c, AF_INET, IPPROTO_TCP, port,
- c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY,
- tref.u32);
+ s = sock_l4(c, AF_INET, IPPROTO_TCP, port,
+ c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY,
+ tref.u32);
+ tcp_sock_set_bufsize(s);
if (c->mode == MODE_PASTA) {
tref.splice = 1;
- sock_l4(c, AF_INET, IPPROTO_TCP, port,
- BIND_LOOPBACK, tref.u32);
+ s = sock_l4(c, AF_INET, IPPROTO_TCP, port,
+ BIND_LOOPBACK, tref.u32);
+ tcp_sock_set_bufsize(s);
}
}
@@ -2941,14 +3160,16 @@ int tcp_sock_init(struct ctx *c)
tref.v6 = 1;
tref.splice = 0;
- sock_l4(c, AF_INET6, IPPROTO_TCP, port,
- c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY,
- tref.u32);
+ s = sock_l4(c, AF_INET6, IPPROTO_TCP, port,
+ c->mode == MODE_PASTA ? BIND_EXT : BIND_ANY,
+ tref.u32);
+ tcp_sock_set_bufsize(s);
if (c->mode == MODE_PASTA) {
tref.splice = 1;
- sock_l4(c, AF_INET6, IPPROTO_TCP, port,
- BIND_LOOPBACK, tref.u32);
+ s = sock_l4(c, AF_INET6, IPPROTO_TCP, port,
+ BIND_LOOPBACK, tref.u32);
+ tcp_sock_set_bufsize(s);
}
}
}
@@ -2959,9 +3180,18 @@ int tcp_sock_init(struct ctx *c)
if (c->v6)
tcp_sock6_iov_init();
- if (c->mode == MODE_PASTA)
+ c->tcp.refill_ts = *now;
+ tcp_sock_refill(&refill_arg);
+
+ if (c->mode == MODE_PASTA) {
+ tcp_set_pipe_size(c);
NS_CALL(tcp_sock_init_ns, c);
+ refill_arg.ns = 1;
+ NS_CALL(tcp_sock_refill, &refill_arg);
+ tcp_splice_pipe_refill(c);
+ }
+
return 0;
}
@@ -2974,72 +3204,72 @@ int tcp_sock_init(struct ctx *c)
static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn,
struct timespec *ts)
{
- int ack_tap_ms = timespec_diff_ms(ts, &conn->ts_ack_tap);
- int sock_ms = timespec_diff_ms(ts, &conn->ts_sock);
- int tap_ms = timespec_diff_ms(ts, &conn->ts_tap);
+ int ack_from_tap = timespec_diff_ms(ts, &conn->ts_ack_from_tap);
+ int ack_to_tap = timespec_diff_ms(ts, &conn->ts_ack_to_tap);
+ int sock_act = timespec_diff_ms(ts, &conn->ts_sock_act);
+ int tap_act = timespec_diff_ms(ts, &conn->ts_tap_act);
+ int tap_data_noack;
+
+ if (memcmp(&conn->tap_data_noack, &((struct timespec){ 0, 0 }),
+ sizeof(struct timespec)))
+ tap_data_noack = 0;
+ else
+ tap_data_noack = timespec_diff_ms(ts, &conn->tap_data_noack);
switch (conn->state) {
+ case CLOSED:
+ tcp_hash_remove(conn);
+ tcp_table_tap_compact(c, conn);
+ break;
case SOCK_SYN_SENT:
case TAP_SYN_RCVD:
- if (ack_tap_ms > SYN_TIMEOUT)
+ if (ack_from_tap > SYN_TIMEOUT)
tcp_rst(c, conn);
break;
case ESTABLISHED_SOCK_FIN_SENT:
- if (ack_tap_ms > FIN_TIMEOUT) {
+ if (tap_data_noack > FIN_TIMEOUT) {
tcp_rst(c, conn);
break;
}
/* Falls through */
case ESTABLISHED:
case ESTABLISHED_SOCK_FIN:
- if (tap_ms > ACT_TIMEOUT && sock_ms > ACT_TIMEOUT) {
+ if (tap_act > ACT_TIMEOUT && sock_act > ACT_TIMEOUT) {
tcp_rst(c, conn);
break;
}
- if (conn->seq_to_tap == conn->seq_ack_from_tap &&
- conn->seq_from_tap == conn->seq_ack_to_tap) {
- conn->ts_sock = *ts;
- break;
- }
-
- if (sock_ms > ACK_INTERVAL) {
- if (conn->seq_from_tap > conn->seq_ack_to_tap)
- tcp_send_to_tap(c, conn, 0, NULL, 0);
- }
+ if (!conn->wnd_to_tap)
+ tcp_send_to_tap(c, conn, UPDATE_WINDOW, ts);
+ else if (ack_to_tap > ACK_INTERVAL)
+ tcp_send_to_tap(c, conn, 0, ts);
- if (sock_ms - ack_tap_ms > ACK_TIMEOUT) {
+ if (tap_data_noack > ACK_TIMEOUT) {
if (conn->seq_ack_from_tap < conn->seq_to_tap) {
- if (sock_ms - ack_tap_ms > 10 * ACK_TIMEOUT) {
+ if (tap_data_noack > LAST_ACK_TIMEOUT) {
tcp_rst(c, conn);
break;
}
conn->seq_to_tap = conn->seq_ack_from_tap;
- if (sock_ms > ACK_TIMEOUT)
- tcp_data_from_sock(c, conn, ts);
+ tcp_data_from_sock(c, conn, ts);
}
}
-
- if (conn->seq_from_tap == conn->seq_ack_to_tap)
- conn->ts_sock = *ts;
-
- if (!conn->tcpi_snd_wnd)
- tcp_send_to_tap(c, conn, 0, NULL, 0);
-
break;
case CLOSE_WAIT:
case FIN_WAIT_1_SOCK_FIN:
- if (sock_ms - ack_tap_ms > FIN_TIMEOUT)
+ if (tap_data_noack > FIN_TIMEOUT)
tcp_rst(c, conn);
break;
case FIN_WAIT_1:
- if (sock_ms > FIN_TIMEOUT)
+ if (sock_act > FIN_TIMEOUT)
tcp_rst(c, conn);
break;
case LAST_ACK:
- if (sock_ms > LAST_ACK_TIMEOUT)
+ if (sock_act > LAST_ACK_TIMEOUT)
+ tcp_rst(c, conn);
+ else if (tap_act > LAST_ACK_TIMEOUT)
tcp_rst(c, conn);
break;
case TAP_SYN_SENT:
@@ -3049,7 +3279,6 @@ static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn,
case SPLICE_FIN_FROM:
case SPLICE_FIN_TO:
case SPLICE_FIN_BOTH:
- case CLOSED:
break;
}
}
@@ -3059,19 +3288,53 @@ static void tcp_timer_one(struct ctx *c, struct tcp_tap_conn *conn,
* @c: Execution context
* @ts: Timestamp from caller
*/
-void tcp_timer(struct ctx *c, struct timespec *ts)
+void tcp_timer(struct ctx *c, struct timespec *now)
{
- long *word = (long *)tcp_act, tmp;
- unsigned int i;
- int n;
+ struct tcp_sock_refill_arg refill_arg = { c, 0 };
+ int i;
+
+ if (timespec_diff_ms(now, &c->tcp.refill_ts) > REFILL_INTERVAL) {
+ tcp_sock_refill(&refill_arg);
+ if (c->mode == MODE_PASTA) {
+ refill_arg.ns = 1;
+ if ((c->v4 && ns_sock_pool4[TCP_SOCK_POOL_TSH] <= 0) ||
+ (c->v6 && ns_sock_pool6[TCP_SOCK_POOL_TSH] <= 0))
+ NS_CALL(tcp_sock_refill, &refill_arg);
+
+ tcp_splice_pipe_refill(c);
+ }
+ }
- for (i = 0; i < sizeof(tcp_act) / sizeof(long); i++, word++) {
- tmp = *word;
- while ((n = ffsl(tmp))) {
- int index = i * sizeof(long) * 8 + n - 1;
+ for (i = c->tcp.tap_conn_count - 1; i >= 0; i--)
+ tcp_timer_one(c, tt + i, now);
+
+ if (c->mode == MODE_PASTA) {
+ for (i = c->tcp.splice_conn_count - 1; i >= 0; i--) {
+ if ((ts + i)->state == CLOSED) {
+ tcp_splice_destroy(c, ts + i);
+ continue;
+ }
+
+ if (bitmap_isset(splice_rcvlowat_set[0], i) &&
+ !bitmap_isset(splice_rcvlowat_act[0], i)) {
+ int lowat = 1;
+
+ setsockopt((ts + i)->from, SOL_SOCKET,
+ SO_RCVLOWAT, &lowat, sizeof(lowat));
+ bitmap_clear(splice_rcvlowat_set[0], i);
+ }
+
+ if (bitmap_isset(splice_rcvlowat_set[1], i) &&
+ !bitmap_isset(splice_rcvlowat_act[1], i)) {
+ int lowat = 1;
+
+ setsockopt((ts + i)->to, SOL_SOCKET,
+ SO_RCVLOWAT, &lowat, sizeof(lowat));
+ bitmap_clear(splice_rcvlowat_set[1], i);
+ }
- tmp &= ~(1UL << (n - 1));
- tcp_timer_one(c, &tt[index], ts);
+ bitmap_clear(splice_rcvlowat_act[0], i);
+ bitmap_clear(splice_rcvlowat_act[1], i);
}
}
}
diff --git a/tcp.h b/tcp.h
index 359414c..ae983ed 100644
--- a/tcp.h
+++ b/tcp.h
@@ -11,8 +11,8 @@ struct ctx;
void tcp_sock_handler(struct ctx *c, union epoll_ref ref, uint32_t events,
struct timespec *now);
int tcp_tap_handler(struct ctx *c, int af, void *addr,
- struct tap_msg *msg, int count, struct timespec *now);
-int tcp_sock_init(struct ctx *c);
+ struct tap_l4_msg *msg, int count, struct timespec *now);
+int tcp_sock_init(struct ctx *c, struct timespec *now);
void tcp_timer(struct ctx *c, struct timespec *ts);
void tcp_update_l2_buf(unsigned char *eth_d, unsigned char *eth_s,
uint32_t *ip_da);
@@ -45,6 +45,9 @@ union tcp_epoll_ref {
* @port_to_tap: Ports bound host-side, packets to tap or spliced
* @port_to_init: Ports bound namespace-side, spliced to init
* @timer_run: Timestamp of most recent timer run
+ * @kernel_snd_wnd: Kernel reports sending window (with commit 8f7baad7f035)
+ * @pipe_size: Size of pipes for spliced connections
+ * @refill_ts: Time of last refill operation for pools of sockets/pipes
*/
struct tcp_ctx {
uint64_t hash_secret[2];
@@ -53,6 +56,9 @@ struct tcp_ctx {
uint8_t port_to_tap [USHRT_MAX / 8];
uint8_t port_to_init [USHRT_MAX / 8];
struct timespec timer_run;
+ int kernel_snd_wnd;
+ size_t pipe_size;
+ struct timespec refill_ts;
};
#endif /* TCP_H */