-rw-r--r--  flow.c        198
-rw-r--r--  flow.h          1
-rw-r--r--  flow_table.h   16
-rw-r--r--  passt.c         2
-rw-r--r--  tcp.c          23
-rw-r--r--  tcp_conn.h      3
-rw-r--r--  tcp_splice.c   11
7 files changed, 167 insertions(+), 87 deletions(-)
diff --git a/flow.c b/flow.c
index d6650fc..5e94a7a 100644
--- a/flow.c
+++ b/flow.c
@@ -26,7 +26,59 @@ static_assert(ARRAY_SIZE(flow_type_str) == FLOW_NUM_TYPES,
"flow_type_str[] doesn't match enum flow_type");
/* Global Flow Table */
-unsigned flow_count;
+
+/**
+ * DOC: Theory of Operation - allocating and freeing flow entries
+ *
+ * Flows are entries in flowtab[]. We need to routinely scan the whole table to
+ * perform deferred bookkeeping tasks on active entries, and sparse empty slots
+ * waste time and worsen data locality. But keeping the table fully compact by
+ * moving entries on deletion is fiddly: it requires updating hash tables and
+ * the epoll references to flows. Instead, we implement the compromise described
+ * below.
+ *
+ * Free clusters
+ * A "free cluster" is a contiguous set of unused (FLOW_TYPE_NONE) entries in
+ * flowtab[]. The first entry in each cluster contains metadata ('free'
+ * field in union flow), specifically the number of entries in the cluster
+ * (free.n), and the index of the next free cluster (free.next). The entries
+ * in the cluster other than the first should have n == next == 0.
+ *
+ * Free cluster list
+ * flow_first_free gives the index of the first (lowest index) free cluster.
+ * Each free cluster has the index of the next free cluster, or FLOW_MAX if
+ * it is the last free cluster. Together these form a linked list of free
+ * clusters, in strictly increasing order of index.
+ *
+ * Allocating
+ * We always allocate a new flow into the lowest available index: the
+ * first entry of the first free cluster, i.e. at index flow_first_free.
+ * We update flow_first_free and the free cluster to maintain the invariants
+ * above (so the free cluster list is still in strictly increasing order).
+ *
+ * Freeing
+ * It's not possible to maintain the invariants above if we allow freeing of
+ * any entry at any time. So we only allow freeing in two cases.
+ *
+ * 1) flow_alloc_cancel() will free the most recent allocation. We can
+ * maintain the invariants because we know that allocation was made in the
+ * lowest available slot, and so it will become the lowest-index free slot
+ * again after cancellation.
+ *
+ * 2) Flows can be freed by returning true from the flow-type-specific
+ * deferred or timer function. These are called from flow_defer_handler(),
+ * which is already scanning the whole table in index order. We can use that
+ * scan to rebuild the free cluster list correctly, either merging freed
+ * entries into existing free clusters or creating new free clusters for them.
+ *
+ * Scanning the table
+ * Theoretically, scanning the table requires FLOW_MAX iterations. However,
+ * when we encounter the start of a free cluster, we can immediately skip
+ * past it, meaning that in practice we only need (number of active
+ * connections) + (number of free clusters) iterations.
+ */
+
+unsigned flow_first_free;
union flow flowtab[FLOW_MAX];
/* Last time the flow timers ran */
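
For illustration, the invariants above can be captured in a small self-contained model (a sketch only, not part of this patch; slot, tab[] and first_free are simplified stand-ins for union flow, flowtab[] and flow_first_free):

    #include <assert.h>
    #include <stdbool.h>

    #define FLOW_MAX 128

    struct slot {
            bool used;      /* stands in for f.type != FLOW_TYPE_NONE */
            unsigned n;     /* cluster length, set in first slot only */
            unsigned next;  /* index of next free cluster, or FLOW_MAX */
    };

    static struct slot tab[FLOW_MAX];
    static unsigned first_free;

    /* Check the free cluster list invariants described above: clusters
     * cover only unused slots, don't overlap, and are linked in strictly
     * increasing order of index.
     */
    static void check_invariants(void)
    {
            unsigned idx = first_free;

            while (idx < FLOW_MAX) {
                    const struct slot *cl = &tab[idx];
                    unsigned i;

                    assert(!cl->used && cl->n >= 1);
                    assert(idx + cl->n <= FLOW_MAX);
                    /* >= rather than >: adjacent but unmerged clusters
                     * can exist transiently after a cancelled
                     * allocation */
                    assert(cl->next >= idx + cl->n);

                    for (i = 1; i < cl->n; i++)
                            assert(!tab[idx + i].used && !tab[idx + i].n);

                    idx = cl->next;
            }
    }

    int main(void)
    {
            /* Initial state: one free cluster covering the whole table */
            tab[0].n = FLOW_MAX;
            tab[0].next = FLOW_MAX;

            check_invariants();
            return 0;
    }
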
@@ -57,10 +109,35 @@ void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
*/
union flow *flow_alloc(void)
{
- if (flow_count >= FLOW_MAX)
+ union flow *flow = &flowtab[flow_first_free];
+
+ if (flow_first_free >= FLOW_MAX)
return NULL;
- return &flowtab[flow_count++];
+ ASSERT(flow->f.type == FLOW_TYPE_NONE);
+ ASSERT(flow->free.n >= 1);
+ ASSERT(flow_first_free + flow->free.n <= FLOW_MAX);
+
+ if (flow->free.n > 1) {
+ union flow *next;
+
+ /* Use one entry from the cluster */
+ ASSERT(flow_first_free <= FLOW_MAX - 2);
+ next = &flowtab[++flow_first_free];
+
+ ASSERT(FLOW_IDX(next) < FLOW_MAX);
+ ASSERT(next->f.type == FLOW_TYPE_NONE);
+ ASSERT(next->free.n == 0);
+
+ next->free.n = flow->free.n - 1;
+ next->free.next = flow->free.next;
+ } else {
+ /* Use the entire cluster */
+ flow_first_free = flow->free.next;
+ }
+
+ memset(flow, 0, sizeof(*flow));
+ return flow;
}
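
As a worked example of the two branches above, assume a hypothetical table with FLOW_MAX == 8, freshly initialised by flow_init():

    /*
     * Fresh table: one free cluster covering all entries.
     *
     *   index:       0  1  2  3  4  5  6  7
     *   free.n:      8  0  0  0  0  0  0  0
     *   free.next:   8  0  0  0  0  0  0  0   (8 == FLOW_MAX, end of list)
     *   flow_first_free == 0
     *
     * flow_alloc() takes the free.n > 1 branch: entry 0 is handed out
     * and the cluster metadata moves one slot up.
     *
     *   index:       0  1  2  3  4  5  6  7
     *   free.n:      -  7  0  0  0  0  0  0
     *   free.next:   -  8  0  0  0  0  0  0
     *   flow_first_free == 1
     *
     * After six more allocations only entry 7 is free (free.n == 1), so
     * the next flow_alloc() takes the "use the entire cluster" branch:
     * flow_first_free becomes free.next == FLOW_MAX, and further calls
     * return NULL until an entry is freed.
     */
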
/**
@@ -71,48 +148,15 @@ union flow *flow_alloc(void)
*/
void flow_alloc_cancel(union flow *flow)
{
- ASSERT(FLOW_IDX(flow) == flow_count - 1);
- memset(flow, 0, sizeof(*flow));
- flow_count--;
-}
-
-/**
- * flow_table_compact() - Perform compaction on flow table
- * @c: Execution context
- * @hole: Pointer to recently closed flow
- */
-static void flow_table_compact(const struct ctx *c, union flow *hole)
-{
- union flow *from;
-
- if (FLOW_IDX(hole) == --flow_count) {
- debug("flow: table compaction: maximum index was %u (%p)",
- FLOW_IDX(hole), (void *)hole);
- memset(hole, 0, sizeof(*hole));
- return;
- }
-
- from = flowtab + flow_count;
- memcpy(hole, from, sizeof(*hole));
-
- switch (from->f.type) {
- case FLOW_TCP:
- tcp_tap_conn_update(c, &from->tcp, &hole->tcp);
- break;
- case FLOW_TCP_SPLICE:
- tcp_splice_conn_update(c, &hole->tcp_splice);
- break;
- default:
- die("Unexpected %s in tcp_table_compact()",
- FLOW_TYPE(&from->f));
- }
-
- debug("flow: table compaction (%s): old index %u, new index %u, "
- "from: %p, to: %p",
- FLOW_TYPE(&from->f), FLOW_IDX(from), FLOW_IDX(hole),
- (void *)from, (void *)hole);
-
- memset(from, 0, sizeof(*from));
+ ASSERT(flow_first_free > FLOW_IDX(flow));
+
+ flow->f.type = FLOW_TYPE_NONE;
+ /* Put it back in a length 1 free cluster; don't attempt to fully
+ * reverse flow_alloc()'s steps. This will get folded together the
+ * next time flow_defer_handler() runs anyway. */
+ flow->free.n = 1;
+ flow->free.next = flow_first_free;
+ flow_first_free = FLOW_IDX(flow);
}
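
A sketch of the intended calling pattern (hypothetical caller, not from this patch; some_setup_step() is a placeholder for whatever can fail during flow setup):

    static struct tcp_tap_conn *conn_new(const struct ctx *c, int sock)
    {
            union flow *flow = flow_alloc();

            if (!flow)
                    return NULL;            /* flow table is full */

            flow->f.type = FLOW_TCP;

            if (some_setup_step(c, &flow->tcp, sock)) {
                    /* Nothing else references the flow yet and it's the
                     * most recent allocation, so cancelling is safe.
                     */
                    flow_alloc_cancel(flow);
                    return NULL;
            }

            return &flow->tcp;
    }
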
/**
@@ -122,18 +166,46 @@ static void flow_table_compact(const struct ctx *c, union flow *hole)
*/
void flow_defer_handler(const struct ctx *c, const struct timespec *now)
{
+ struct flow_free_cluster *free_head = NULL;
+ unsigned *last_next = &flow_first_free;
bool timer = false;
- union flow *flow;
+ unsigned idx;
if (timespec_diff_ms(now, &flow_timer_run) >= FLOW_TIMER_INTERVAL) {
timer = true;
flow_timer_run = *now;
}
- for (flow = flowtab + flow_count - 1; flow >= flowtab; flow--) {
+ for (idx = 0; idx < FLOW_MAX; idx++) {
+ union flow *flow = &flowtab[idx];
bool closed = false;
+ if (flow->f.type == FLOW_TYPE_NONE) {
+ unsigned skip = flow->free.n;
+
+ /* First entry of a free cluster must have n >= 1 */
+ ASSERT(skip);
+
+ if (free_head) {
+ /* Merge into preceding free cluster */
+ free_head->n += flow->free.n;
+ flow->free.n = flow->free.next = 0;
+ } else {
+ /* New free cluster, add to chain */
+ free_head = &flow->free;
+ *last_next = idx;
+ last_next = &free_head->next;
+ }
+
+ /* Skip remaining empty entries */
+ idx += skip - 1;
+ continue;
+ }
+
switch (flow->f.type) {
+ case FLOW_TYPE_NONE:
+ ASSERT(false);
+ break;
case FLOW_TCP:
closed = tcp_flow_defer(flow);
break;
@@ -147,7 +219,35 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
;
}
- if (closed)
- flow_table_compact(c, flow);
+ if (closed) {
+ flow->f.type = FLOW_TYPE_NONE;
+
+ if (free_head) {
+ /* Add slot to current free cluster */
+ ASSERT(idx == FLOW_IDX(free_head) + free_head->n);
+ free_head->n++;
+ flow->free.n = flow->free.next = 0;
+ } else {
+ /* Create new free cluster */
+ free_head = &flow->free;
+ free_head->n = 1;
+ *last_next = idx;
+ last_next = &free_head->next;
+ }
+ } else {
+ free_head = NULL;
+ }
}
+
+ *last_next = FLOW_MAX;
+}
+
+/**
+ * flow_init() - Initialise flow related data structures
+ */
+void flow_init(void)
+{
+ /* Initial state is a single free cluster containing the whole table */
+ flowtab[0].free.n = FLOW_MAX;
+ flowtab[0].free.next = FLOW_MAX;
}
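
The skip over free.n entries in flow_defer_handler() is what makes full-table scans cheap in practice: only (number of active flows) + (number of free clusters) iterations are needed. Any other walker can reuse the pattern; for instance, a hypothetical helper counting active flows (a sketch, not part of this patch):

    static unsigned flow_count_active(void)
    {
            unsigned idx, count = 0;

            for (idx = 0; idx < FLOW_MAX; ) {
                    const union flow *flow = &flowtab[idx];

                    if (flow->f.type == FLOW_TYPE_NONE) {
                            idx += flow->free.n;    /* skip whole cluster */
                    } else {
                            count++;
                            idx++;
                    }
            }

            return count;
    }
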
diff --git a/flow.h b/flow.h
index 8064f0e..48a0ab4 100644
--- a/flow.h
+++ b/flow.h
@@ -68,6 +68,7 @@ static inline bool flow_sidx_eq(flow_sidx_t a, flow_sidx_t b)
union flow;
+void flow_init(void);
void flow_defer_handler(const struct ctx *c, const struct timespec *now);
void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
diff --git a/flow_table.h b/flow_table.h
index 2773a2b..eecf884 100644
--- a/flow_table.h
+++ b/flow_table.h
@@ -10,6 +10,19 @@
#include "tcp_conn.h"
/**
+ * struct flow_free_cluster - Information about a cluster of free entries
+ * @f: Generic flow information
+ * @n: Number of entries in the free cluster (including this one)
+ * @next: Index of next free cluster
+ */
+struct flow_free_cluster {
+ /* Must be first element */
+ struct flow_common f;
+ unsigned n;
+ unsigned next;
+};
+
+/**
* union flow - Descriptor for a logical packet flow (e.g. connection)
* @f: Fields common between all variants
* @tcp: Fields for non-spliced TCP connections
@@ -17,12 +30,13 @@
*/
union flow {
struct flow_common f;
+ struct flow_free_cluster free;
struct tcp_tap_conn tcp;
struct tcp_splice_conn tcp_splice;
};
/* Global Flow Table */
-extern unsigned flow_count;
+extern unsigned flow_first_free;
extern union flow flowtab[];
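
Note that struct flow_free_cluster relies on struct flow_common being its first member, so that f.type reads the same through every union flow variant. A compile-time check along these lines could enforce the layout (a suggested sketch, not part of this patch):

    #include <assert.h>
    #include <stddef.h>

    static_assert(offsetof(struct flow_free_cluster, f) == 0,
                  "struct flow_common must be first in flow_free_cluster");
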
diff --git a/passt.c b/passt.c
index 71bea8f..d315438 100644
--- a/passt.c
+++ b/passt.c
@@ -285,6 +285,8 @@ int main(int argc, char **argv)
clock_gettime(CLOCK_MONOTONIC, &now);
+ flow_init();
+
if ((!c.no_udp && udp_init(&c)) || (!c.no_tcp && tcp_init(&c)))
exit(EXIT_FAILURE);
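
The placement of the call matters: flowtab[] is zero-initialised static storage, so until flow_init() writes the initial whole-table free cluster, the table is not in a valid state for the allocator. Hence flow_init() runs before tcp_init() and udp_init(), ahead of any code path that can allocate a flow. As a comment sketch (reasoning only, not patch code):

    /* Without flow_init(), the zeroed table would look like this to
     * flow_alloc():
     *
     *   flow_first_free == 0, flowtab[0].f.type == FLOW_TYPE_NONE,
     *   flowtab[0].free.n == 0  ->  ASSERT(flow->free.n >= 1) fails
     */
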
diff --git a/tcp.c b/tcp.c
index ee2c3af..905d26f 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1252,29 +1252,6 @@ static void tcp_hash_remove(const struct ctx *c,
}
/**
- * tcp_tap_conn_update() - Update tcp_tap_conn when being moved in the table
- * @c: Execution context
- * @old: Old location of tcp_tap_conn
- * @new: New location of tcp_tap_conn
- */
-void tcp_tap_conn_update(const struct ctx *c, struct tcp_tap_conn *old,
- struct tcp_tap_conn *new)
-
-{
- unsigned b = tcp_hash_probe(c, old);
-
- if (!flow_at_sidx(tc_hash[b]))
- return; /* Not in hash table, nothing to update */
-
- tc_hash[b] = FLOW_SIDX(new, TAPSIDE);
-
- debug("TCP: hash table update: old index %u, new index %u, sock %i, "
- "bucket: %u", FLOW_IDX(old), FLOW_IDX(new), new->sock, b);
-
- tcp_epoll_ctl(c, new);
-}
-
-/**
* tcp_hash_lookup() - Look up connection given remote address and ports
* @c: Execution context
* @af: Address family, AF_INET or AF_INET6
diff --git a/tcp_conn.h b/tcp_conn.h
index 636224e..a5f5cfe 100644
--- a/tcp_conn.h
+++ b/tcp_conn.h
@@ -155,9 +155,6 @@ struct tcp_splice_conn {
extern int init_sock_pool4 [TCP_SOCK_POOL_SIZE];
extern int init_sock_pool6 [TCP_SOCK_POOL_SIZE];
-void tcp_tap_conn_update(const struct ctx *c, struct tcp_tap_conn *old,
- struct tcp_tap_conn *new);
-void tcp_splice_conn_update(const struct ctx *c, struct tcp_splice_conn *new);
bool tcp_flow_defer(union flow *flow);
bool tcp_splice_flow_defer(union flow *flow);
void tcp_splice_timer(const struct ctx *c, union flow *flow);
diff --git a/tcp_splice.c b/tcp_splice.c
index daef7de..26d3206 100644
--- a/tcp_splice.c
+++ b/tcp_splice.c
@@ -232,17 +232,6 @@ static void conn_event_do(const struct ctx *c, struct tcp_splice_conn *conn,
/**
- * tcp_splice_conn_update() - Update tcp_splice_conn when being moved in the table
- * @c: Execution context
- * @new: New location of tcp_splice_conn
- */
-void tcp_splice_conn_update(const struct ctx *c, struct tcp_splice_conn *new)
-{
- if (tcp_splice_epoll_ctl(c, new))
- conn_flag(c, new, CLOSING);
-}
-
-/**
* tcp_splice_flow_defer() - Deferred per-flow handling (clean up closed)
* @flow: Flow table entry for this connection
*