Diffstat (limited to 'flow.c')
-rw-r--r--	flow.c	198
1 file changed, 149 insertions, 49 deletions
@@ -26,7 +26,59 @@ static_assert(ARRAY_SIZE(flow_type_str) == FLOW_NUM_TYPES,
 	      "flow_type_str[] doesn't match enum flow_type");
 
 /* Global Flow Table */
-unsigned flow_count;
+
+/**
+ * DOC: Theory of Operation - allocating and freeing flow entries
+ *
+ * Flows are entries in flowtab[]. We need to routinely scan the whole table to
+ * perform deferred bookkeeping tasks on active entries, and sparse empty slots
+ * waste time and worsen data locality. But keeping the table fully compact by
+ * moving entries on deletion is fiddly: it requires updating hash tables and
+ * the epoll references to flows. Instead, we implement the compromise described
+ * below.
+ *
+ * Free clusters
+ *   A "free cluster" is a contiguous set of unused (FLOW_TYPE_NONE) entries in
+ *   flowtab[]. The first entry in each cluster contains metadata (the 'free'
+ *   field in union flow), specifically the number of entries in the cluster
+ *   (free.n) and the index of the next free cluster (free.next). The entries
+ *   in the cluster other than the first should have n == next == 0.
+ *
+ * Free cluster list
+ *   flow_first_free gives the index of the first (lowest index) free cluster.
+ *   Each free cluster has the index of the next free cluster, or FLOW_MAX if
+ *   it is the last free cluster. Together these form a linked list of free
+ *   clusters, in strictly increasing order of index.
+ *
+ * Allocating
+ *   We always allocate a new flow into the lowest available index, i.e. the
+ *   first entry of the first free cluster, that is, at index flow_first_free.
+ *   We update flow_first_free and the free cluster to maintain the invariants
+ *   above (so the free cluster list is still in strictly increasing order).
+ *
+ * Freeing
+ *   It's not possible to maintain the invariants above if we allow freeing of
+ *   any entry at any time, so we only allow freeing in two cases.
+ *
+ *   1) flow_alloc_cancel() will free the most recent allocation. We can
+ *   maintain the invariants because we know that allocation was made in the
+ *   lowest available slot, which will therefore become the lowest index free
+ *   slot again after cancellation.
+ *
+ *   2) Flows can be freed by returning true from the flow type specific
+ *   deferred or timer function. These are called from flow_defer_handler(),
+ *   which is already scanning the whole table in index order. We can use that
+ *   scan to rebuild the free cluster list correctly, either merging freed
+ *   entries into existing free clusters or creating new free clusters for
+ *   them in the list.
+ *
+ * Scanning the table
+ *   Theoretically, scanning the table requires FLOW_MAX iterations. However,
+ *   when we encounter the start of a free cluster, we can immediately skip
+ *   past it, meaning that in practice we only need (number of active
+ *   connections) + (number of free clusters) iterations.
+ */
+
+unsigned flow_first_free;
 union flow flowtab[FLOW_MAX];
 
 /* Last time the flow timers ran */
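As an illustration of the invariants above (an editor's sketch, not part of the patch: struct slot, tab[] and first_free are simplified stand-ins for union flow's 'free' field, flowtab[] and flow_first_free, and FLOW_MAX is shrunk to 8), the free cluster list can be modelled on a toy table:

/* Toy model of the free cluster list; illustration only */
#include <assert.h>
#include <stdio.h>

#define FLOW_MAX 8

struct slot {
	int used;	/* stands in for f.type != FLOW_TYPE_NONE */
	unsigned n;	/* cluster length; set in the first slot only */
	unsigned next;	/* index of the next free cluster, or FLOW_MAX */
};

static struct slot tab[FLOW_MAX];
static unsigned first_free;	/* models flow_first_free */

int main(void)
{
	unsigned i;

	/* Initial state, as in flow_init() below: one cluster spans the table */
	tab[0].n = FLOW_MAX;
	tab[0].next = FLOW_MAX;
	first_free = 0;

	/* Walk the free cluster list; cluster indices strictly increase */
	for (i = first_free; i < FLOW_MAX; i = tab[i].next) {
		assert(tab[i].n >= 1 && i + tab[i].n <= FLOW_MAX);
		printf("free cluster: indices %u..%u\n", i, i + tab[i].n - 1);
	}
	return 0;
}

Compiled and run, this prints "free cluster: indices 0..7": the single whole-table cluster that the new flow_init() sets up at the end of this patch.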
@@ -57,10 +109,35 @@ void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
  */
 union flow *flow_alloc(void)
 {
-	if (flow_count >= FLOW_MAX)
+	union flow *flow = &flowtab[flow_first_free];
+
+	if (flow_first_free >= FLOW_MAX)
 		return NULL;
 
-	return &flowtab[flow_count++];
+	ASSERT(flow->f.type == FLOW_TYPE_NONE);
+	ASSERT(flow->free.n >= 1);
+	ASSERT(flow_first_free + flow->free.n <= FLOW_MAX);
+
+	if (flow->free.n > 1) {
+		union flow *next;
+
+		/* Use one entry from the cluster */
+		ASSERT(flow_first_free <= FLOW_MAX - 2);
+		next = &flowtab[++flow_first_free];
+
+		ASSERT(FLOW_IDX(next) < FLOW_MAX);
+		ASSERT(next->f.type == FLOW_TYPE_NONE);
+		ASSERT(next->free.n == 0);
+
+		next->free.n = flow->free.n - 1;
+		next->free.next = flow->free.next;
+	} else {
+		/* Use the entire cluster */
+		flow_first_free = flow->free.next;
+	}
+
+	memset(flow, 0, sizeof(*flow));
+	return flow;
 }
 
 /**
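The two branches of the new flow_alloc() map onto the toy model as follows (again an illustrative sketch; model_alloc() is hypothetical and mirrors, rather than reproduces, the patch's code):

/* Allocation on the toy table: returns the lowest free index, keeping
 * the free cluster list in strictly increasing order (cf. flow_alloc()),
 * or FLOW_MAX if the table is full. */
static unsigned model_alloc(void)
{
	unsigned idx = first_free;

	if (idx >= FLOW_MAX)
		return FLOW_MAX;		/* table full */

	if (tab[idx].n > 1) {
		/* Use one entry: the cluster's tail becomes its head */
		tab[idx + 1].n = tab[idx].n - 1;
		tab[idx + 1].next = tab[idx].next;
		first_free = idx + 1;
	} else {
		/* Use the entire cluster: unlink it from the list */
		first_free = tab[idx].next;
	}

	tab[idx].used = 1;
	tab[idx].n = tab[idx].next = 0;
	return idx;
}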
@@ -71,48 +148,15 @@ union flow *flow_alloc(void)
  */
 void flow_alloc_cancel(union flow *flow)
 {
-	ASSERT(FLOW_IDX(flow) == flow_count - 1);
-	memset(flow, 0, sizeof(*flow));
-	flow_count--;
-}
-
-/**
- * flow_table_compact() - Perform compaction on flow table
- * @c:		Execution context
- * @hole:	Pointer to recently closed flow
- */
-static void flow_table_compact(const struct ctx *c, union flow *hole)
-{
-	union flow *from;
-
-	if (FLOW_IDX(hole) == --flow_count) {
-		debug("flow: table compaction: maximum index was %u (%p)",
-		      FLOW_IDX(hole), (void *)hole);
-		memset(hole, 0, sizeof(*hole));
-		return;
-	}
-
-	from = flowtab + flow_count;
-	memcpy(hole, from, sizeof(*hole));
-
-	switch (from->f.type) {
-	case FLOW_TCP:
-		tcp_tap_conn_update(c, &from->tcp, &hole->tcp);
-		break;
-	case FLOW_TCP_SPLICE:
-		tcp_splice_conn_update(c, &hole->tcp_splice);
-		break;
-	default:
-		die("Unexpected %s in tcp_table_compact()",
-		    FLOW_TYPE(&from->f));
-	}
-
-	debug("flow: table compaction (%s): old index %u, new index %u, "
-	      "from: %p, to: %p",
-	      FLOW_TYPE(&from->f), FLOW_IDX(from), FLOW_IDX(hole),
-	      (void *)from, (void *)hole);
-
-	memset(from, 0, sizeof(*from));
+	ASSERT(flow_first_free > FLOW_IDX(flow));
+
+	flow->f.type = FLOW_TYPE_NONE;
+	/* Put it back in a length 1 free cluster; don't attempt to fully
+	 * reverse flow_alloc()'s steps. This will get folded together the
+	 * next time flow_defer_handler() runs anyway. */
+	flow->free.n = 1;
+	flow->free.next = flow_first_free;
+	flow_first_free = FLOW_IDX(flow);
 }
 
 /**
@@ -122,18 +166,46 @@ static void flow_table_compact(const struct ctx *c, union flow *hole)
  */
 void flow_defer_handler(const struct ctx *c, const struct timespec *now)
 {
+	struct flow_free_cluster *free_head = NULL;
+	unsigned *last_next = &flow_first_free;
 	bool timer = false;
-	union flow *flow;
+	unsigned idx;
 
 	if (timespec_diff_ms(now, &flow_timer_run) >= FLOW_TIMER_INTERVAL) {
 		timer = true;
 		flow_timer_run = *now;
 	}
 
-	for (flow = flowtab + flow_count - 1; flow >= flowtab; flow--) {
+	for (idx = 0; idx < FLOW_MAX; idx++) {
+		union flow *flow = &flowtab[idx];
 		bool closed = false;
 
+		if (flow->f.type == FLOW_TYPE_NONE) {
+			unsigned skip = flow->free.n;
+
+			/* First entry of a free cluster must have n >= 1 */
+			ASSERT(skip);
+
+			if (free_head) {
+				/* Merge into preceding free cluster */
+				free_head->n += flow->free.n;
+				flow->free.n = flow->free.next = 0;
+			} else {
+				/* New free cluster, add to chain */
+				free_head = &flow->free;
+				*last_next = idx;
+				last_next = &free_head->next;
+			}
+
+			/* Skip remaining empty entries */
+			idx += skip - 1;
+			continue;
+		}
+
 		switch (flow->f.type) {
+		case FLOW_TYPE_NONE:
+			ASSERT(false);
+			break;
 		case FLOW_TCP:
 			closed = tcp_flow_defer(flow);
 			break;
@@ -147,7 +219,35 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
 			;
 		}
 
-		if (closed)
-			flow_table_compact(c, flow);
+		if (closed) {
+			flow->f.type = FLOW_TYPE_NONE;
+
+			if (free_head) {
+				/* Add slot to current free cluster */
+				ASSERT(idx == FLOW_IDX(free_head) + free_head->n);
+				free_head->n++;
+				flow->free.n = flow->free.next = 0;
+			} else {
+				/* Create new free cluster */
+				free_head = &flow->free;
+				free_head->n = 1;
+				*last_next = idx;
+				last_next = &free_head->next;
+			}
+		} else {
+			free_head = NULL;
+		}
 	}
+
+	*last_next = FLOW_MAX;
+}
+
+/**
+ * flow_init() - Initialise flow related data structures
+ */
+void flow_init(void)
+{
+	/* Initial state is a single free cluster containing the whole table */
+	flowtab[0].free.n = FLOW_MAX;
+	flowtab[0].free.next = FLOW_MAX;
 }
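flow_alloc_cancel() works because the cancelled entry is the most recent allocation, i.e. it sits just below flow_first_free; pushing it back as a length-1 cluster at the list head keeps indices strictly increasing. On the toy model (hypothetical model_cancel(), extending the sketches above):

/* Cancel the most recent model_alloc(): the slot becomes a length-1
 * cluster at the head of the list (cf. flow_alloc_cancel()). An
 * adjacent free cluster is deliberately not merged here; the rebuild
 * pass folds the clusters together later. */
static void model_cancel(unsigned idx)
{
	assert(first_free > idx);	/* must be the latest allocation */
	tab[idx].used = 0;
	tab[idx].n = 1;
	tab[idx].next = first_free;
	first_free = idx;
}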
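Finally, the rebuild performed by flow_defer_handler() can be sketched the same way. This simplified model_rebuild() visits every slot rather than jumping over free clusters by their recorded length, but it rebuilds the same list:

/* One in-order pass rebuilding the free cluster list, as the loop in
 * flow_defer_handler() does: runs of unused slots are merged into
 * clusters, which are chained in increasing index order via last_next. */
static void model_rebuild(void)
{
	unsigned *last_next = &first_free;	/* where to chain the next cluster */
	struct slot *head = NULL;		/* free cluster being grown, if any */
	unsigned idx;

	for (idx = 0; idx < FLOW_MAX; idx++) {
		struct slot *s = &tab[idx];

		if (s->used) {
			head = NULL;	/* an active slot ends the cluster */
			continue;
		}

		if (head) {
			head->n++;	/* merge into the preceding cluster */
			s->n = s->next = 0;
		} else {
			head = s;	/* start a new cluster and chain it */
			head->n = 1;
			*last_next = idx;
			last_next = &head->next;
		}
	}
	*last_next = FLOW_MAX;			/* terminate the list */
}

For example, after three model_alloc() calls, marking slot 1 unused (as a deferred close would) and running model_rebuild() leaves two clusters, {1} and {3..7}, with first_free == 1.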