Diffstat (limited to 'flow.c')
-rw-r--r--  flow.c  198
1 file changed, 149 insertions, 49 deletions
diff --git a/flow.c b/flow.c
index d6650fc..5e94a7a 100644
--- a/flow.c
+++ b/flow.c
@@ -26,7 +26,59 @@ static_assert(ARRAY_SIZE(flow_type_str) == FLOW_NUM_TYPES,
"flow_type_str[] doesn't match enum flow_type");
/* Global Flow Table */
-unsigned flow_count;
+
+/**
+ * DOC: Theory of Operation - allocating and freeing flow entries
+ *
+ * Flows are entries in flowtab[]. We need to routinely scan the whole table to
+ * perform deferred bookkeeping tasks on active entries, and sparse empty slots
+ * waste time and worsen data locality. But keeping the table fully compact by
+ * moving entries on deletion is fiddly: it requires updating hash tables and
+ * the epoll references to flows. Instead, we implement the compromise described
+ * below.
+ *
+ * Free clusters
+ * A "free cluster" is a contiguous set of unused (FLOW_TYPE_NONE) entries in
+ * flowtab[]. The first entry in each cluster contains metadata ('free'
+ * field in union flow), specifically the number of entries in the cluster
+ * (free.n), and the index of the next free cluster (free.next). The entries
+ * in the cluster other than the first should have n == next == 0.
+ *
+ * Free cluster list
+ * flow_first_free gives the index of the first (lowest index) free cluster.
+ * Each free cluster has the index of the next free cluster, or FLOW_MAX if
+ * it is the last free cluster. Together these form a linked list of free
+ * clusters, in strictly increasing order of index.
+ *
+ * Allocating
+ * We always allocate a new flow at the lowest available index, i.e. the
+ * first entry of the first free cluster, which is at index flow_first_free.
+ * We update flow_first_free and the free cluster to maintain the invariants
+ * above (so the free cluster list is still in strictly increasing order).
+ *
+ * Freeing
+ * It's not possible to maintain the invariants above if we allow freeing of
+ * any entry at any time. So we only allow freeing in two cases.
+ *
+ * 1) flow_alloc_cancel() will free the most recent allocation. We can
+ * maintain the invariants because we know that allocation was made in the
+ * lowest available slot, and so will become the lowest index free slot again
+ * after cancellation.
+ *
+ * 2) Flows can be freed by returning true from the flow-type-specific
+ * deferred or timer function. These are called from flow_defer_handler(),
+ * which is already scanning the whole table in index order. We can use that
+ * to rebuild the free cluster list correctly, either merging freed entries
+ * into existing free clusters or creating new free clusters in the list for
+ * them.
+ *
+ * Scanning the table
+ * Theoretically, scanning the table requires FLOW_MAX iterations. However,
+ * when we encounter the start of a free cluster, we can immediately skip
+ * past it, meaning that in practice we only need (number of active
+ * connections) + (number of free clusters) iterations.
+ */
+
+unsigned flow_first_free;
union flow flowtab[FLOW_MAX];
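
The 'free' metadata referred to above lives in union flow, whose definition is in the flow table headers rather than in this patch. A minimal sketch of a shape consistent with the accesses made here (f.type, free.n, free.next) could look like the following; the real definitions also carry the per-protocol flow state, so treat this only as an illustration:

    /* Sketch only: the real definitions live in the flow headers and include
     * the per-protocol flow types as further union members.
     */
    #include <stdint.h>

    struct flow_common {
        uint8_t type;            /* enum flow_type, FLOW_TYPE_NONE if unused */
    };

    struct flow_free_cluster {
        struct flow_common f;    /* f.type == FLOW_TYPE_NONE */
        unsigned n;              /* Number of entries in this free cluster */
        unsigned next;           /* Index of next free cluster, or FLOW_MAX */
    };

    union flow {
        struct flow_common f;
        struct flow_free_cluster free;
        /* ...one member per concrete flow type (TCP, spliced TCP, ...) */
    };
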
/* Last time the flow timers ran */
@@ -57,10 +109,35 @@ void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
*/
union flow *flow_alloc(void)
{
- if (flow_count >= FLOW_MAX)
+ union flow *flow = &flowtab[flow_first_free];
+
+ if (flow_first_free >= FLOW_MAX)
return NULL;
- return &flowtab[flow_count++];
+ ASSERT(flow->f.type == FLOW_TYPE_NONE);
+ ASSERT(flow->free.n >= 1);
+ ASSERT(flow_first_free + flow->free.n <= FLOW_MAX);
+
+ if (flow->free.n > 1) {
+ union flow *next;
+
+ /* Use one entry from the cluster */
+ ASSERT(flow_first_free <= FLOW_MAX - 2);
+ next = &flowtab[++flow_first_free];
+
+ ASSERT(FLOW_IDX(next) < FLOW_MAX);
+ ASSERT(next->f.type == FLOW_TYPE_NONE);
+ ASSERT(next->free.n == 0);
+
+ next->free.n = flow->free.n - 1;
+ next->free.next = flow->free.next;
+ } else {
+ /* Use the entire cluster */
+ flow_first_free = flow->free.next;
+ }
+
+ memset(flow, 0, sizeof(*flow));
+ return flow;
}
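
As a rough illustration of the allocation path, assuming the sketched definitions above and a plain assert() standing in for the project's ASSERT(): starting from the single free cluster set up by flow_init() (added at the end of this patch), each allocation peels the first entry off the head cluster and moves flow_first_free up by one.

    /* Illustrative only; values follow the logic of flow_alloc() above */
    flow_init();                    /* flowtab[0]: n = FLOW_MAX, next = FLOW_MAX */
    assert(flow_first_free == 0);

    union flow *a = flow_alloc();   /* Takes index 0, splits the cluster */
    assert(FLOW_IDX(a) == 0 && flow_first_free == 1);
    assert(flowtab[1].free.n == FLOW_MAX - 1);
    assert(flowtab[1].free.next == FLOW_MAX);   /* Still the only cluster */

    union flow *b = flow_alloc();   /* Takes index 1 */
    assert(FLOW_IDX(b) == 1 && flow_first_free == 2);
    assert(flowtab[2].free.n == FLOW_MAX - 2);
    /* A real caller would set a->f.type / b->f.type before the next
     * flow_defer_handler() pass, so the entries count as in use.
     */
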
/**
@@ -71,48 +148,15 @@ union flow *flow_alloc(void)
*/
void flow_alloc_cancel(union flow *flow)
{
- ASSERT(FLOW_IDX(flow) == flow_count - 1);
- memset(flow, 0, sizeof(*flow));
- flow_count--;
-}
-
-/**
- * flow_table_compact() - Perform compaction on flow table
- * @c: Execution context
- * @hole: Pointer to recently closed flow
- */
-static void flow_table_compact(const struct ctx *c, union flow *hole)
-{
- union flow *from;
-
- if (FLOW_IDX(hole) == --flow_count) {
- debug("flow: table compaction: maximum index was %u (%p)",
- FLOW_IDX(hole), (void *)hole);
- memset(hole, 0, sizeof(*hole));
- return;
- }
-
- from = flowtab + flow_count;
- memcpy(hole, from, sizeof(*hole));
-
- switch (from->f.type) {
- case FLOW_TCP:
- tcp_tap_conn_update(c, &from->tcp, &hole->tcp);
- break;
- case FLOW_TCP_SPLICE:
- tcp_splice_conn_update(c, &hole->tcp_splice);
- break;
- default:
- die("Unexpected %s in tcp_table_compact()",
- FLOW_TYPE(&from->f));
- }
-
- debug("flow: table compaction (%s): old index %u, new index %u, "
- "from: %p, to: %p",
- FLOW_TYPE(&from->f), FLOW_IDX(from), FLOW_IDX(hole),
- (void *)from, (void *)hole);
-
- memset(from, 0, sizeof(*from));
+ ASSERT(flow_first_free > FLOW_IDX(flow));
+
+ flow->f.type = FLOW_TYPE_NONE;
+ /* Put it back in a length-1 free cluster; don't attempt to fully
+ * reverse flow_alloc()'s steps. This will get folded together the next
+ * time flow_defer_handler() runs anyway. */
+ flow->free.n = 1;
+ flow->free.next = flow_first_free;
+ flow_first_free = FLOW_IDX(flow);
}
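
A hypothetical caller that fails partway through setting up a flow would pair the two calls roughly like this (the real call sites are in the per-protocol code, and some_setup_step() is a placeholder, not a passt function):

    /* Hypothetical error path, for illustration only */
    union flow *flow = flow_alloc();

    if (!flow)
        return -1;                  /* Table full, no free entry left */

    if (some_setup_step() < 0) {    /* Placeholder for e.g. socket() failing */
        flow_alloc_cancel(flow);    /* Return the just-allocated slot */
        return -1;
    }

    flow->f.type = FLOW_TCP;        /* Slot now counts as in use */

Note that flow_alloc_cancel() pushes the slot back as a length-1 cluster at the head of the list rather than re-merging it with a following cluster; the list stays in strictly increasing index order because the cancelled slot necessarily has the lowest free index, and any adjacent clusters get coalesced by the next flow_defer_handler() pass.
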
/**
@@ -122,18 +166,46 @@ static void flow_table_compact(const struct ctx *c, union flow *hole)
*/
void flow_defer_handler(const struct ctx *c, const struct timespec *now)
{
+ struct flow_free_cluster *free_head = NULL;
+ unsigned *last_next = &flow_first_free;
bool timer = false;
- union flow *flow;
+ unsigned idx;
if (timespec_diff_ms(now, &flow_timer_run) >= FLOW_TIMER_INTERVAL) {
timer = true;
flow_timer_run = *now;
}
- for (flow = flowtab + flow_count - 1; flow >= flowtab; flow--) {
+ for (idx = 0; idx < FLOW_MAX; idx++) {
+ union flow *flow = &flowtab[idx];
bool closed = false;
+ if (flow->f.type == FLOW_TYPE_NONE) {
+ unsigned skip = flow->free.n;
+
+ /* First entry of a free cluster must have n >= 1 */
+ ASSERT(skip);
+
+ if (free_head) {
+ /* Merge into preceding free cluster */
+ free_head->n += flow->free.n;
+ flow->free.n = flow->free.next = 0;
+ } else {
+ /* New free cluster, add to chain */
+ free_head = &flow->free;
+ *last_next = idx;
+ last_next = &free_head->next;
+ }
+
+ /* Skip remaining empty entries */
+ idx += skip - 1;
+ continue;
+ }
+
switch (flow->f.type) {
+ case FLOW_TYPE_NONE:
+ ASSERT(false);
+ break;
case FLOW_TCP:
closed = tcp_flow_defer(flow);
break;
@@ -147,7 +219,35 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
;
}
- if (closed)
- flow_table_compact(c, flow);
+ if (closed) {
+ flow->f.type = FLOW_TYPE_NONE;
+
+ if (free_head) {
+ /* Add slot to current free cluster */
+ ASSERT(idx == FLOW_IDX(free_head) + free_head->n);
+ free_head->n++;
+ flow->free.n = flow->free.next = 0;
+ } else {
+ /* Create new free cluster */
+ free_head = &flow->free;
+ free_head->n = 1;
+ *last_next = idx;
+ last_next = &free_head->next;
+ }
+ } else {
+ free_head = NULL;
+ }
}
+
+ *last_next = FLOW_MAX;
+}
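
The idx += skip - 1 above, combined with the loop's own idx++, is what gives the (active entries) + (free clusters) iteration count promised in the theory-of-operation comment. A self-contained toy version of the same skip pattern, independent of the passt types (array contents are made up for the example):

    #include <stdio.h>

    int main(void)
    {
        /* 0 marks an active entry; a positive value in the first slot of a
         * free run gives the run length, like free.n above.
         */
        unsigned tab[10] = { 0, 0, 3, 0, 0, 0, 0, 2, 0, 0 };
        unsigned idx;

        for (idx = 0; idx < 10; idx++) {
            if (tab[idx]) {
                printf("skipping free run of %u starting at %u\n", tab[idx], idx);
                idx += tab[idx] - 1;    /* The loop's idx++ lands just past the run */
                continue;
            }
            printf("visiting active entry %u\n", idx);
        }
        return 0;       /* 7 iterations instead of 10: 5 active entries + 2 runs */
    }
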
+
+/**
+ * flow_init() - Initialise flow related data structures
+ */
+void flow_init(void)
+{
+ /* Initial state is a single free cluster containing the whole table */
+ flowtab[0].free.n = FLOW_MAX;
+ flowtab[0].free.next = FLOW_MAX;
}
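
Finally, a worked example of the rebuild performed by flow_defer_handler(), with made-up indices and a toy FLOW_MAX of 10: suppose flows occupy indices 0..3 and 6..7, free clusters sit at 4..5 and 8..9, and the flow at index 3 reports itself closed during the pass. The scan creates a new cluster at 3, merges the old cluster at 4..5 into it, and re-chains the cluster at 8..9 behind it:

    /* Illustrative state after that pass, assuming FLOW_MAX == 10 */
    assert(flow_first_free == 3);
    assert(flowtab[3].f.type == FLOW_TYPE_NONE);
    assert(flowtab[3].free.n == 3);             /* Covers entries 3..5 */
    assert(flowtab[3].free.next == 8);          /* Chained to the cluster at 8..9 */
    assert(flowtab[8].free.n == 2);
    assert(flowtab[8].free.next == FLOW_MAX);   /* End of the free cluster list */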