aboutgitcodebugslistschat
path: root/tcp_splice.c
diff options
context:
space:
mode:
Diffstat (limited to 'tcp_splice.c')
-rw-r--r--tcp_splice.c216
1 files changed, 111 insertions, 105 deletions
diff --git a/tcp_splice.c b/tcp_splice.c
index bcafd33..0095740 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -51,7 +51,7 @@
#define MAX_PIPE_SIZE (2UL * 1024 * 1024)
#define TCP_SPLICE_MAX_CONNS (128 * 1024)
#define TCP_SPLICE_PIPE_POOL_SIZE 16
-#define REFILL_INTERVAL 1000 /* ms, refill pool of pipes */
+#define TCP_SPLICE_CONN_PRESSURE 30 /* % of splice_conn_count */
#define TCP_SPLICE_FILE_PRESSURE 30 /* % of c->nofile */
/* From tcp.c */
@@ -83,24 +83,24 @@ struct tcp_splice_conn {
int pipe_b_a[2];
uint8_t events;
-#define SPLICE_CLOSED 0
-#define SPLICE_CONNECT BIT(0)
-#define SPLICE_ESTABLISHED BIT(1)
-#define SPLICE_A_OUT_WAIT BIT(2)
-#define SPLICE_B_OUT_WAIT BIT(3)
-#define SPLICE_A_FIN_RCVD BIT(4)
-#define SPLICE_B_FIN_RCVD BIT(5)
-#define SPLICE_A_FIN_SENT BIT(6)
-#define SPLICE_B_FIN_SENT BIT(7)
+#define CLOSED 0
+#define CONNECT BIT(0)
+#define ESTABLISHED BIT(1)
+#define A_OUT_WAIT BIT(2)
+#define B_OUT_WAIT BIT(3)
+#define A_FIN_RCVD BIT(4)
+#define B_FIN_RCVD BIT(5)
+#define A_FIN_SENT BIT(6)
+#define B_FIN_SENT BIT(7)
uint8_t flags;
-#define SPLICE_V6 BIT(0)
-#define SPLICE_IN_EPOLL BIT(1)
-#define SPLICE_RCVLOWAT_SET_A BIT(2)
-#define SPLICE_RCVLOWAT_SET_B BIT(3)
-#define SPLICE_RCVLOWAT_ACT_A BIT(4)
-#define SPLICE_RCVLOWAT_ACT_B BIT(5)
-#define SPLICE_CLOSING BIT(6)
+#define SOCK_V6 BIT(0)
+#define IN_EPOLL BIT(1)
+#define RCVLOWAT_SET_A BIT(2)
+#define RCVLOWAT_SET_B BIT(3)
+#define RCVLOWAT_ACT_A BIT(4)
+#define RCVLOWAT_ACT_B BIT(5)
+#define CLOSING BIT(6)
uint64_t a_read;
uint64_t a_written;
@@ -108,7 +108,7 @@ struct tcp_splice_conn {
uint64_t b_written;
};
-#define CONN_V6(x) (x->flags & SPLICE_V6)
+#define CONN_V6(x) (x->flags & SOCK_V6)
#define CONN_V4(x) (!CONN_V6(x))
#define CONN_HAS(conn, set) ((conn->events & (set)) == (set))
#define CONN(index) (tc + (index))
@@ -118,15 +118,13 @@ static struct tcp_splice_conn tc[TCP_SPLICE_MAX_CONNS];
/* Display strings for connection events */
static const char *tcp_splice_event_str[] __attribute((__unused__)) = {
- "SPLICE_CONNECT", "SPLICE_ESTABLISHED",
- "SPLICE_A_OUT_WAIT", "SPLICE_B_OUT_WAIT",
- "SPLICE_A_FIN_RCVD", "SPLICE_B_FIN_RCVD",
- "SPLICE_A_FIN_SENT", "SPLICE_B_FIN_SENT",
+ "CONNECT", "ESTABLISHED", "A_OUT_WAIT", "B_OUT_WAIT",
+ "A_FIN_RCVD", "B_FIN_RCVD", "A_FIN_SENT", "B_FIN_SENT",
};
/* Display strings for connection flags */
static const char *tcp_splice_flag_str[] __attribute((__unused__)) = {
- "V6", "IN_EPOLL", "RCVLOWAT_SET_A", "RCVLOWAT_SET_B",
+ "SOCK_V6", "IN_EPOLL", "RCVLOWAT_SET_A", "RCVLOWAT_SET_B",
"RCVLOWAT_ACT_A", "RCVLOWAT_ACT_B", "CLOSING",
};
@@ -141,23 +139,27 @@ static void tcp_splice_conn_epoll_events(uint16_t events,
{
*a = *b = 0;
- if (events & SPLICE_CLOSED)
+ if (events & CLOSED)
return;
- if (events & SPLICE_ESTABLISHED)
- *a = *b = EPOLLIN | EPOLLRDHUP;
- else if (events & SPLICE_CONNECT)
+ if (events & ESTABLISHED) {
+ if (!(events & B_FIN_SENT))
+ *a = EPOLLIN | EPOLLRDHUP;
+ if (!(events & A_FIN_SENT))
+ *b = EPOLLIN | EPOLLRDHUP;
+ } else if (events & CONNECT) {
*b = EPOLLOUT;
+ }
- *a |= (events & SPLICE_A_OUT_WAIT) ? EPOLLOUT : 0;
- *b |= (events & SPLICE_B_OUT_WAIT) ? EPOLLOUT : 0;
+ *a |= (events & A_OUT_WAIT) ? EPOLLOUT : 0;
+ *b |= (events & B_OUT_WAIT) ? EPOLLOUT : 0;
}
static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn);
static int tcp_splice_epoll_ctl(struct ctx *c, struct tcp_splice_conn *conn);
/**
- * conn_flag_do() - Set/unset given flag, log, update epoll on SPLICE_CLOSING
+ * conn_flag_do() - Set/unset given flag, log, update epoll on CLOSING flag
* @c: Execution context
* @conn: Connection pointer
* @flag: Flag to set, or ~flag to unset
@@ -181,7 +183,7 @@ static void conn_flag_do(struct ctx *c, struct tcp_splice_conn *conn,
tcp_splice_flag_str[fls(flag)]);
}
- if (flag == SPLICE_CLOSING)
+ if (flag == CLOSING)
tcp_splice_epoll_ctl(c, conn);
}
@@ -201,7 +203,7 @@ static void conn_flag_do(struct ctx *c, struct tcp_splice_conn *conn,
*/
static int tcp_splice_epoll_ctl(struct ctx *c, struct tcp_splice_conn *conn)
{
- int m = (conn->flags & SPLICE_IN_EPOLL) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
+ int m = (conn->flags & IN_EPOLL) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
union epoll_ref ref_a = { .r.proto = IPPROTO_TCP, .r.s = conn->a,
.r.p.tcp.tcp.splice = 1,
.r.p.tcp.tcp.index = conn - tc,
@@ -214,15 +216,8 @@ static int tcp_splice_epoll_ctl(struct ctx *c, struct tcp_splice_conn *conn)
struct epoll_event ev_b = { .data.u64 = ref_b.u64 };
uint32_t events_a, events_b;
- if (conn->flags & SPLICE_CLOSING) {
- if (conn->flags & SPLICE_IN_EPOLL)
- epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->a, &ev_a);
-
- if (conn->events & SPLICE_CONNECT)
- epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->b, &ev_b);
-
- return 0;
- }
+ if (conn->flags & CLOSING)
+ goto delete;
tcp_splice_conn_epoll_events(conn->events, &events_a, &events_b);
ev_a.events = events_a;
@@ -230,13 +225,13 @@ static int tcp_splice_epoll_ctl(struct ctx *c, struct tcp_splice_conn *conn)
if (epoll_ctl(c->epollfd, m, conn->a, &ev_a) ||
epoll_ctl(c->epollfd, m, conn->b, &ev_b))
- goto err;
+ goto delete;
- conn->flags |= SPLICE_IN_EPOLL; /* No need to log this */
+ conn->flags |= IN_EPOLL; /* No need to log this */
return 0;
-err:
+delete:
epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->a, &ev_a);
epoll_ctl(c->epollfd, EPOLL_CTL_DEL, conn->b, &ev_b);
return -errno;
@@ -251,12 +246,6 @@ err:
static void conn_event_do(struct ctx *c, struct tcp_splice_conn *conn,
unsigned long event)
{
- if (event == SPLICE_CLOSED) {
- conn->events = SPLICE_CLOSED;
- debug("TCP (spliced): index %i, CLOSED", conn - tc);
- return;
- }
-
if (event & (event - 1)) {
if (!(conn->events & ~event))
return;
@@ -274,7 +263,7 @@ static void conn_event_do(struct ctx *c, struct tcp_splice_conn *conn,
}
if (tcp_splice_epoll_ctl(c, conn))
- conn_flag(c, conn, SPLICE_CLOSING);
+ conn_flag(c, conn, CLOSING);
}
#define conn_event(c, conn, event) \
@@ -304,22 +293,25 @@ static void tcp_table_splice_compact(struct ctx *c,
memcpy(hole, move, sizeof(*hole));
move->a = move->b = -1;
- move->flags = move->events = 0;
move->a_read = move->a_written = move->b_read = move->b_written = 0;
+ move->pipe_a_b[0] = move->pipe_a_b[1] = -1;
+ move->pipe_b_a[0] = move->pipe_b_a[1] = -1;
+ move->flags = move->events = 0;
debug("TCP (spliced): index %i moved to %i", move - tc, hole - tc);
+ tcp_splice_epoll_ctl(c, hole);
if (tcp_splice_epoll_ctl(c, hole))
- conn_flag(c, hole, SPLICE_CLOSING);
+ conn_flag(c, hole, CLOSING);
}
/**
- * tcp_splice_destroy() - Close spliced connection and pipes, drop from epoll
+ * tcp_splice_destroy() - Close spliced connection and pipes, clear
* @c: Execution context
* @conn: Connection pointer
*/
static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn)
{
- if (conn->events & SPLICE_ESTABLISHED) {
+ if (conn->events & ESTABLISHED) {
/* Flushing might need to block: don't recycle them. */
if (conn->pipe_a_b[0] != -1) {
close(conn->pipe_a_b[0]);
@@ -333,18 +325,19 @@ static void tcp_splice_destroy(struct ctx *c, struct tcp_splice_conn *conn)
}
}
- if (conn->events & SPLICE_CONNECT) {
+ if (conn->events & CONNECT) {
close(conn->b);
conn->b = -1;
}
- conn_event(c, conn, SPLICE_CLOSED);
-
close(conn->a);
conn->a = -1;
- conn->flags = 0;
conn->a_read = conn->a_written = conn->b_read = conn->b_written = 0;
+ conn->events = CLOSED;
+ conn->flags = 0;
+ debug("TCP (spliced): index %i, CLOSED", conn - tc);
+
tcp_table_splice_compact(c, conn);
}
@@ -364,7 +357,7 @@ static int tcp_splice_connect_finish(struct ctx *c,
conn->pipe_a_b[1] = conn->pipe_b_a[1] = -1;
for (i = 0; i < TCP_SPLICE_PIPE_POOL_SIZE; i++) {
- if (splice_pipe_pool[i][0][0] > 0) {
+ if (splice_pipe_pool[i][0][0] >= 0) {
SWAP(conn->pipe_a_b[0], splice_pipe_pool[i][0][0]);
SWAP(conn->pipe_a_b[1], splice_pipe_pool[i][0][1]);
@@ -377,7 +370,7 @@ static int tcp_splice_connect_finish(struct ctx *c,
if (conn->pipe_a_b[0] < 0) {
if (pipe2(conn->pipe_a_b, O_NONBLOCK) ||
pipe2(conn->pipe_b_a, O_NONBLOCK)) {
- conn_flag(c, conn, SPLICE_CLOSING);
+ conn_flag(c, conn, CLOSING);
return -EIO;
}
@@ -385,8 +378,8 @@ static int tcp_splice_connect_finish(struct ctx *c,
fcntl(conn->pipe_b_a[0], F_SETPIPE_SZ, c->tcp.pipe_size);
}
- if (!(conn->events & SPLICE_ESTABLISHED))
- conn_event(c, conn, SPLICE_ESTABLISHED);
+ if (!(conn->events & ESTABLISHED))
+ conn_event(c, conn, ESTABLISHED);
return 0;
}
@@ -450,9 +443,9 @@ static int tcp_splice_connect(struct ctx *c, struct tcp_splice_conn *conn,
close(sock_conn);
return ret;
}
- conn_event(c, conn, SPLICE_CONNECT);
+ conn_event(c, conn, CONNECT);
} else {
- conn_event(c, conn, SPLICE_ESTABLISHED);
+ conn_event(c, conn, ESTABLISHED);
return tcp_splice_connect_finish(c, conn);
}
@@ -575,20 +568,23 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref,
conn = CONN(c->tcp.splice_conn_count++);
conn->a = s;
- conn->flags = ref.r.p.tcp.tcp.v6 ? SPLICE_V6 : 0;
+ conn->flags = ref.r.p.tcp.tcp.v6 ? SOCK_V6 : 0;
if (tcp_splice_new(c, conn, ref.r.p.tcp.tcp.index))
- conn_flag(c, conn, SPLICE_CLOSING);
+ conn_flag(c, conn, CLOSING);
return;
}
conn = CONN(ref.r.p.tcp.tcp.index);
- if (events & EPOLLERR || events & EPOLLHUP)
+ if (conn->events == CLOSED)
+ return;
+
+ if (events & EPOLLERR)
goto close;
- if (conn->events == SPLICE_CONNECT) {
+ if (conn->events == CONNECT) {
if (!(events & EPOLLOUT))
goto close;
if (tcp_splice_connect_finish(c, conn))
@@ -597,9 +593,9 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref,
if (events & EPOLLOUT) {
if (ref.r.s == conn->a)
- conn_event(c, conn, ~SPLICE_A_OUT_WAIT);
+ conn_event(c, conn, ~A_OUT_WAIT);
else
- conn_event(c, conn, ~SPLICE_B_OUT_WAIT);
+ conn_event(c, conn, ~B_OUT_WAIT);
tcp_splice_dir(conn, ref.r.s, 1, &from, &to, &pipes);
} else {
@@ -608,9 +604,16 @@ void tcp_sock_handler_splice(struct ctx *c, union epoll_ref ref,
if (events & EPOLLRDHUP) {
if (ref.r.s == conn->a)
- conn_event(c, conn, SPLICE_A_FIN_RCVD);
+ conn_event(c, conn, A_FIN_RCVD);
+ else
+ conn_event(c, conn, B_FIN_RCVD);
+ }
+
+ if (events & EPOLLHUP) {
+ if (ref.r.s == conn->a)
+ conn_event(c, conn, A_FIN_SENT); /* Fake, but implied */
else
- conn_event(c, conn, SPLICE_B_FIN_RCVD);
+ conn_event(c, conn, B_FIN_SENT);
}
swap:
@@ -620,13 +623,13 @@ swap:
if (from == conn->a) {
seq_read = &conn->a_read;
seq_write = &conn->a_written;
- lowat_set_flag = SPLICE_RCVLOWAT_SET_A;
- lowat_act_flag = SPLICE_RCVLOWAT_ACT_A;
+ lowat_set_flag = RCVLOWAT_SET_A;
+ lowat_act_flag = RCVLOWAT_ACT_A;
} else {
seq_read = &conn->b_read;
seq_write = &conn->b_written;
- lowat_set_flag = SPLICE_RCVLOWAT_SET_B;
- lowat_act_flag = SPLICE_RCVLOWAT_ACT_B;
+ lowat_set_flag = RCVLOWAT_SET_B;
+ lowat_act_flag = RCVLOWAT_ACT_B;
}
while (1) {
@@ -636,6 +639,7 @@ swap:
retry:
readlen = splice(from, NULL, pipes[1], NULL, c->tcp.pipe_size,
SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
+ trace("TCP (spliced): %li from read-side call", readlen);
if (readlen < 0) {
if (errno == EINTR)
goto retry;
@@ -660,6 +664,8 @@ retry:
eintr:
written = splice(pipes[0], NULL, to, NULL, to_write,
SPLICE_F_MOVE | more | SPLICE_F_NONBLOCK);
+ trace("TCP (spliced): %li from write-side call (passed %lu)",
+ written, to_write);
/* Most common case: skip updating counters. */
if (readlen > 0 && readlen == written) {
@@ -697,9 +703,9 @@ eintr:
goto retry;
if (to == conn->a)
- conn_event(c, conn, SPLICE_A_OUT_WAIT);
+ conn_event(c, conn, A_OUT_WAIT);
else
- conn_event(c, conn, SPLICE_B_OUT_WAIT);
+ conn_event(c, conn, B_OUT_WAIT);
break;
}
@@ -715,23 +721,21 @@ eintr:
break;
}
- if ( (conn->events & SPLICE_A_FIN_RCVD) &&
- !(conn->events & SPLICE_B_FIN_SENT)) {
- if (*seq_read == *seq_write) {
+ if ((conn->events & A_FIN_RCVD) && !(conn->events & B_FIN_SENT)) {
+ if (*seq_read == *seq_write && eof) {
shutdown(conn->b, SHUT_WR);
- conn_event(c, conn, SPLICE_B_FIN_SENT);
+ conn_event(c, conn, B_FIN_SENT);
}
}
- if ( (conn->events & SPLICE_B_FIN_RCVD) &&
- !(conn->events & SPLICE_A_FIN_SENT)) {
- if (*seq_read == *seq_write) {
+ if ((conn->events & B_FIN_RCVD) && !(conn->events & A_FIN_SENT)) {
+ if (*seq_read == *seq_write && eof) {
shutdown(conn->a, SHUT_WR);
- conn_event(c, conn, SPLICE_A_FIN_SENT);
+ conn_event(c, conn, A_FIN_SENT);
}
}
- if (CONN_HAS(conn, SPLICE_A_FIN_SENT | SPLICE_B_FIN_SENT))
+ if (CONN_HAS(conn, A_FIN_SENT | B_FIN_SENT))
goto close;
if ((events & (EPOLLIN | EPOLLOUT)) == (EPOLLIN | EPOLLOUT)) {
@@ -746,10 +750,13 @@ eintr:
goto swap;
}
+ if (events & EPOLLHUP)
+ goto close;
+
return;
close:
- conn_flag(c, conn, SPLICE_CLOSING);
+ conn_flag(c, conn, CLOSING);
}
/**
@@ -829,38 +836,36 @@ void tcp_splice_init(struct ctx *c)
/**
* tcp_splice_timer() - Timer for spliced connections
* @c: Execution context
- * @now: Current timestamp
*/
-void tcp_splice_timer(struct ctx *c, struct timespec *now)
+void tcp_splice_timer(struct ctx *c)
{
struct tcp_splice_conn *conn;
for (conn = CONN(c->tcp.splice_conn_count - 1); conn >= tc; conn--) {
- if (conn->flags & SPLICE_CLOSING) {
+ if (conn->flags & CLOSING) {
tcp_splice_destroy(c, conn);
- continue;
+ return;
}
- if ( (conn->flags & SPLICE_RCVLOWAT_SET_A) &&
- !(conn->flags & SPLICE_RCVLOWAT_ACT_A)) {
+ if ( (conn->flags & RCVLOWAT_SET_A) &&
+ !(conn->flags & RCVLOWAT_ACT_A)) {
setsockopt(conn->a, SOL_SOCKET, SO_RCVLOWAT,
&((int){ 1 }), sizeof(int));
- conn_flag(c, conn, ~SPLICE_RCVLOWAT_SET_A);
+ conn_flag(c, conn, ~RCVLOWAT_SET_A);
}
- if ( (conn->flags & SPLICE_RCVLOWAT_SET_B) &&
- !(conn->flags & SPLICE_RCVLOWAT_ACT_B)) {
+ if ( (conn->flags & RCVLOWAT_SET_B) &&
+ !(conn->flags & RCVLOWAT_ACT_B)) {
setsockopt(conn->b, SOL_SOCKET, SO_RCVLOWAT,
&((int){ 1 }), sizeof(int));
- conn_flag(c, conn, ~SPLICE_RCVLOWAT_SET_B);
+ conn_flag(c, conn, ~RCVLOWAT_SET_B);
}
- conn_flag(c, conn, ~SPLICE_RCVLOWAT_ACT_A);
- conn_flag(c, conn, ~SPLICE_RCVLOWAT_ACT_B);
+ conn_flag(c, conn, ~RCVLOWAT_ACT_A);
+ conn_flag(c, conn, ~RCVLOWAT_ACT_B);
}
- if (timespec_diff_ms(now, &c->tcp.refill_ts) > REFILL_INTERVAL)
- tcp_splice_pipe_refill(c);
+ tcp_splice_pipe_refill(c);
}
/**
@@ -869,14 +874,15 @@ void tcp_splice_timer(struct ctx *c, struct timespec *now)
*/
void tcp_splice_defer_handler(struct ctx *c)
{
+ int max_conns = c->tcp.conn_count / 100 * TCP_SPLICE_CONN_PRESSURE;
int max_files = c->nofile / 100 * TCP_SPLICE_FILE_PRESSURE;
struct tcp_splice_conn *conn;
- if (c->tcp.splice_conn_count * 6 < max_files)
+ if (c->tcp.splice_conn_count < MIN(max_files / 6, max_conns))
return;
for (conn = CONN(c->tcp.splice_conn_count - 1); conn >= tc; conn--) {
- if (conn->flags & SPLICE_CLOSING)
+ if (conn->flags & CLOSING)
tcp_splice_destroy(c, conn);
}
}