diff options
Diffstat (limited to 'flow.c')
-rw-r--r-- | flow.c | 1039 |
1 files changed, 937 insertions, 102 deletions
@@ -5,9 +5,11 @@ * Tracking for logical "flows" of packets. */ +#include <errno.h> #include <stdint.h> #include <stdio.h> #include <unistd.h> +#include <sched.h> #include <string.h> #include "util.h" @@ -17,6 +19,18 @@ #include "inany.h" #include "flow.h" #include "flow_table.h" +#include "repair.h" + +const char *flow_state_str[] = { + [FLOW_STATE_FREE] = "FREE", + [FLOW_STATE_NEW] = "NEW", + [FLOW_STATE_INI] = "INI", + [FLOW_STATE_TGT] = "TGT", + [FLOW_STATE_TYPED] = "TYPED", + [FLOW_STATE_ACTIVE] = "ACTIVE", +}; +static_assert(ARRAY_SIZE(flow_state_str) == FLOW_NUM_STATES, + "flow_state_str[] doesn't match enum flow_state"); const char *flow_type_str[] = { [FLOW_TYPE_NONE] = "<none>", @@ -24,6 +38,7 @@ const char *flow_type_str[] = { [FLOW_TCP_SPLICE] = "TCP connection (spliced)", [FLOW_PING4] = "ICMP ping sequence", [FLOW_PING6] = "ICMPv6 ping sequence", + [FLOW_UDP] = "UDP flow", }; static_assert(ARRAY_SIZE(flow_type_str) == FLOW_NUM_TYPES, "flow_type_str[] doesn't match enum flow_type"); @@ -33,51 +48,19 @@ const uint8_t flow_proto[] = { [FLOW_TCP_SPLICE] = IPPROTO_TCP, [FLOW_PING4] = IPPROTO_ICMP, [FLOW_PING6] = IPPROTO_ICMPV6, + [FLOW_UDP] = IPPROTO_UDP, }; static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES, "flow_proto[] doesn't match enum flow_type"); -/* Global Flow Table */ +#define foreach_established_tcp_flow(flow) \ + flow_foreach_of_type((flow), FLOW_TCP) \ + if (!tcp_flow_is_established(&(flow)->tcp)) \ + /* NOLINTNEXTLINE(bugprone-branch-clone) */ \ + continue; \ + else -/** - * DOC: Theory of Operation - flow entry life cycle - * - * An individual flow table entry moves through these logical states, usually in - * this order. - * - * FREE - Part of the general pool of free flow table entries - * Operations: - * - flow_alloc() finds an entry and moves it to ALLOC state - * - * ALLOC - A tentatively allocated entry - * Operations: - * - flow_alloc_cancel() returns the entry to FREE state - * - FLOW_START() set the entry's type and moves to START state - * Caveats: - * - It's not safe to write fields in the flow entry - * - It's not safe to allocate further entries with flow_alloc() - * - It's not safe to return to the main epoll loop (use FLOW_START() - * to move to START state before doing so) - * - It's not safe to use flow_*() logging functions - * - * START - An entry being prepared by flow type specific code - * Operations: - * - Flow type specific fields may be accessed - * - flow_*() logging functions - * - flow_alloc_cancel() returns the entry to FREE state - * Caveats: - * - Returning to the main epoll loop or allocating another entry - * with flow_alloc() implicitly moves the entry to ACTIVE state. - * - * ACTIVE - An active flow entry managed by flow type specific code - * Operations: - * - Flow type specific fields may be accessed - * - flow_*() logging functions - * - Flow may be expired by returning 'true' from flow type specific - * deferred or timer handler. This will return it to FREE state. - * Caveats: - * - It's not safe to call flow_alloc_cancel() - */ +/* Global Flow Table */ /** * DOC: Theory of Operation - allocating and freeing flow entries @@ -98,7 +81,7 @@ static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES, * * Free cluster list * flow_first_free gives the index of the first (lowest index) free cluster. - * Each free cluster has the index of the next free cluster, or MAX_FLOW if + * Each free cluster has the index of the next free cluster, or FLOW_MAX if * it is the last free cluster. Together these form a linked list of free * clusters, in strictly increasing order of index. * @@ -132,18 +115,167 @@ static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES, unsigned flow_first_free; union flow flowtab[FLOW_MAX]; +static const union flow *flow_new_entry; /* = NULL */ + +/* Hash table to index it */ +#define FLOW_HASH_LOAD 70 /* % */ +#define FLOW_HASH_SIZE ((2 * FLOW_MAX * 100 / FLOW_HASH_LOAD)) + +/* Table for lookup from flowside information */ +static flow_sidx_t flow_hashtab[FLOW_HASH_SIZE]; + +static_assert(ARRAY_SIZE(flow_hashtab) >= 2 * FLOW_MAX, +"Safe linear probing requires hash table with more entries than the number of sides in the flow table"); /* Last time the flow timers ran */ static struct timespec flow_timer_run; +/** flowside_from_af() - Initialise flowside from addresses + * @side: flowside to initialise + * @af: Address family (AF_INET or AF_INET6) + * @eaddr: Endpoint address (pointer to in_addr or in6_addr) + * @eport: Endpoint port + * @oaddr: Our address (pointer to in_addr or in6_addr) + * @oport: Our port + */ +static void flowside_from_af(struct flowside *side, sa_family_t af, + const void *eaddr, in_port_t eport, + const void *oaddr, in_port_t oport) +{ + if (oaddr) + inany_from_af(&side->oaddr, af, oaddr); + else + side->oaddr = inany_any6; + side->oport = oport; + + if (eaddr) + inany_from_af(&side->eaddr, af, eaddr); + else + side->eaddr = inany_any6; + side->eport = eport; +} + +/** + * struct flowside_sock_args - Parameters for flowside_sock_splice() + * @c: Execution context + * @fd: Filled in with new socket fd + * @err: Filled in with errno if something failed + * @type: Socket epoll type + * @sa: Socket address + * @sl: Length of @sa + * @data: epoll reference data + */ +struct flowside_sock_args { + const struct ctx *c; + int fd; + int err; + enum epoll_type type; + const struct sockaddr *sa; + socklen_t sl; + const char *path; + uint32_t data; +}; + +/** flowside_sock_splice() - Create and bind socket for PIF_SPLICE based on flowside + * @arg: Argument as a struct flowside_sock_args + * + * Return: 0 + */ +static int flowside_sock_splice(void *arg) +{ + struct flowside_sock_args *a = arg; + + ns_enter(a->c); + + a->fd = sock_l4_sa(a->c, a->type, a->sa, a->sl, NULL, + a->sa->sa_family == AF_INET6, a->data); + a->err = errno; + + return 0; +} + +/** flowside_sock_l4() - Create and bind socket based on flowside + * @c: Execution context + * @type: Socket epoll type + * @pif: Interface for this socket + * @tgt: Target flowside + * @data: epoll reference portion for protocol handlers + * + * Return: socket fd of protocol @proto bound to our address and port from @tgt + * (if specified). + */ +int flowside_sock_l4(const struct ctx *c, enum epoll_type type, uint8_t pif, + const struct flowside *tgt, uint32_t data) +{ + const char *ifname = NULL; + union sockaddr_inany sa; + socklen_t sl; + + ASSERT(pif_is_socket(pif)); + + pif_sockaddr(c, &sa, &sl, pif, &tgt->oaddr, tgt->oport); + + switch (pif) { + case PIF_HOST: + if (inany_is_loopback(&tgt->oaddr)) + ifname = NULL; + else if (sa.sa_family == AF_INET) + ifname = c->ip4.ifname_out; + else if (sa.sa_family == AF_INET6) + ifname = c->ip6.ifname_out; + + return sock_l4_sa(c, type, &sa, sl, ifname, + sa.sa_family == AF_INET6, data); + + case PIF_SPLICE: { + struct flowside_sock_args args = { + .c = c, .type = type, + .sa = &sa.sa, .sl = sl, .data = data, + }; + NS_CALL(flowside_sock_splice, &args); + errno = args.err; + return args.fd; + } + + default: + /* If we add new socket pifs, they'll need to be implemented + * here + */ + ASSERT(0); + } +} + +/** flowside_connect() - Connect a socket based on flowside + * @c: Execution context + * @s: Socket to connect + * @pif: Target pif + * @tgt: Target flowside + * + * Connect @s to the endpoint address and port from @tgt. + * + * Return: 0 on success, negative on error + */ +int flowside_connect(const struct ctx *c, int s, + uint8_t pif, const struct flowside *tgt) +{ + union sockaddr_inany sa; + socklen_t sl; + + pif_sockaddr(c, &sa, &sl, pif, &tgt->eaddr, tgt->eport); + return connect(s, &sa.sa, sl); +} + /** flow_log_ - Log flow-related message * @f: flow the message is related to + * @newline: Append newline at the end of the message, if missing * @pri: Log priority * @fmt: Format string * @...: printf-arguments */ -void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...) +void flow_log_(const struct flow_common *f, bool newline, int pri, + const char *fmt, ...) { + const char *type_or_state; char msg[BUFSIZ]; va_list args; @@ -151,40 +283,232 @@ void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...) (void)vsnprintf(msg, sizeof(msg), fmt, args); va_end(args); - logmsg(pri, "Flow %u (%s): %s", flow_idx(f), FLOW_TYPE(f), msg); + /* Show type if it's set, otherwise the state */ + if (f->state < FLOW_STATE_TYPED) + type_or_state = FLOW_STATE(f); + else + type_or_state = FLOW_TYPE(f); + + logmsg(newline, false, pri, + "Flow %u (%s): %s", flow_idx(f), type_or_state, msg); +} + +/** flow_log_details_() - Log the details of a flow + * @f: flow to log + * @pri: Log priority + * @state: State to log details according to + * + * Logs the details of the flow: endpoints, interfaces, type etc. + */ +void flow_log_details_(const struct flow_common *f, int pri, + enum flow_state state) +{ + char estr0[INANY_ADDRSTRLEN], fstr0[INANY_ADDRSTRLEN]; + char estr1[INANY_ADDRSTRLEN], fstr1[INANY_ADDRSTRLEN]; + const struct flowside *ini = &f->side[INISIDE]; + const struct flowside *tgt = &f->side[TGTSIDE]; + + if (state >= FLOW_STATE_TGT) + flow_log_(f, true, pri, + "%s [%s]:%hu -> [%s]:%hu => %s [%s]:%hu -> [%s]:%hu", + pif_name(f->pif[INISIDE]), + inany_ntop(&ini->eaddr, estr0, sizeof(estr0)), + ini->eport, + inany_ntop(&ini->oaddr, fstr0, sizeof(fstr0)), + ini->oport, + pif_name(f->pif[TGTSIDE]), + inany_ntop(&tgt->oaddr, fstr1, sizeof(fstr1)), + tgt->oport, + inany_ntop(&tgt->eaddr, estr1, sizeof(estr1)), + tgt->eport); + else if (state >= FLOW_STATE_INI) + flow_log_(f, true, pri, "%s [%s]:%hu -> [%s]:%hu => ?", + pif_name(f->pif[INISIDE]), + inany_ntop(&ini->eaddr, estr0, sizeof(estr0)), + ini->eport, + inany_ntop(&ini->oaddr, fstr0, sizeof(fstr0)), + ini->oport); +} + +/** + * flow_set_state() - Change flow's state + * @f: Flow changing state + * @state: New state + */ +static void flow_set_state(struct flow_common *f, enum flow_state state) +{ + uint8_t oldstate = f->state; + + ASSERT(state < FLOW_NUM_STATES); + ASSERT(oldstate < FLOW_NUM_STATES); + + f->state = state; + flow_log_(f, true, LOG_DEBUG, "%s -> %s", flow_state_str[oldstate], + FLOW_STATE(f)); + + flow_log_details_(f, LOG_DEBUG, MAX(state, oldstate)); +} + +/** + * flow_initiate_() - Move flow to INI, setting pif[INISIDE] + * @flow: Flow to change state + * @pif: pif of the initiating side + */ +static void flow_initiate_(union flow *flow, uint8_t pif) +{ + struct flow_common *f = &flow->f; + + ASSERT(pif != PIF_NONE); + ASSERT(flow_new_entry == flow && f->state == FLOW_STATE_NEW); + ASSERT(f->type == FLOW_TYPE_NONE); + ASSERT(f->pif[INISIDE] == PIF_NONE && f->pif[TGTSIDE] == PIF_NONE); + + f->pif[INISIDE] = pif; + flow_set_state(f, FLOW_STATE_INI); +} + +/** + * flow_initiate_af() - Move flow to INI, setting INISIDE details + * @flow: Flow to change state + * @pif: pif of the initiating side + * @af: Address family of @saddr and @daddr + * @saddr: Source address (pointer to in_addr or in6_addr) + * @sport: Endpoint port + * @daddr: Destination address (pointer to in_addr or in6_addr) + * @dport: Destination port + * + * Return: pointer to the initiating flowside information + */ +const struct flowside *flow_initiate_af(union flow *flow, uint8_t pif, + sa_family_t af, + const void *saddr, in_port_t sport, + const void *daddr, in_port_t dport) +{ + struct flowside *ini = &flow->f.side[INISIDE]; + + flowside_from_af(ini, af, saddr, sport, daddr, dport); + flow_initiate_(flow, pif); + return ini; +} + +/** + * flow_initiate_sa() - Move flow to INI, setting INISIDE details + * @flow: Flow to change state + * @pif: pif of the initiating side + * @ssa: Source socket address + * @daddr: Destination address (may be NULL) + * @dport: Destination port + * + * Return: pointer to the initiating flowside information + */ +struct flowside *flow_initiate_sa(union flow *flow, uint8_t pif, + const union sockaddr_inany *ssa, + const union inany_addr *daddr, + in_port_t dport) +{ + struct flowside *ini = &flow->f.side[INISIDE]; + + if (inany_from_sockaddr(&ini->eaddr, &ini->eport, ssa) < 0) { + char str[SOCKADDR_STRLEN]; + + ASSERT_WITH_MSG(0, "Bad socket address %s", + sockaddr_ntop(ssa, str, sizeof(str))); + } + if (daddr) + ini->oaddr = *daddr; + else if (inany_v4(&ini->eaddr)) + ini->oaddr = inany_any4; + else + ini->oaddr = inany_any6; + ini->oport = dport; + flow_initiate_(flow, pif); + return ini; } /** - * flow_start() - Set flow type for new flow and log - * @flow: Flow to set type for - * @type: Type for new flow - * @iniside: Which side initiated the new flow + * flow_target() - Determine where flow should forward to, and move to TGT + * @c: Execution context + * @flow: Flow to forward + * @proto: Protocol * - * Return: @flow + * Return: pointer to the target flowside information + */ +struct flowside *flow_target(const struct ctx *c, union flow *flow, + uint8_t proto) +{ + char estr[INANY_ADDRSTRLEN], fstr[INANY_ADDRSTRLEN]; + struct flow_common *f = &flow->f; + const struct flowside *ini = &f->side[INISIDE]; + struct flowside *tgt = &f->side[TGTSIDE]; + uint8_t tgtpif = PIF_NONE; + + ASSERT(flow_new_entry == flow && f->state == FLOW_STATE_INI); + ASSERT(f->type == FLOW_TYPE_NONE); + ASSERT(f->pif[INISIDE] != PIF_NONE && f->pif[TGTSIDE] == PIF_NONE); + ASSERT(flow->f.state == FLOW_STATE_INI); + + switch (f->pif[INISIDE]) { + case PIF_TAP: + tgtpif = fwd_nat_from_tap(c, proto, ini, tgt); + break; + + case PIF_SPLICE: + tgtpif = fwd_nat_from_splice(c, proto, ini, tgt); + break; + + case PIF_HOST: + tgtpif = fwd_nat_from_host(c, proto, ini, tgt); + break; + + default: + flow_err(flow, "No rules to forward %s [%s]:%hu -> [%s]:%hu", + pif_name(f->pif[INISIDE]), + inany_ntop(&ini->eaddr, estr, sizeof(estr)), + ini->eport, + inany_ntop(&ini->oaddr, fstr, sizeof(fstr)), + ini->oport); + } + + if (tgtpif == PIF_NONE) + return NULL; + + f->pif[TGTSIDE] = tgtpif; + flow_set_state(f, FLOW_STATE_TGT); + return tgt; +} + +/** + * flow_set_type() - Set type and move to TYPED + * @flow: Flow to change state + * @type: New flow type to assign * - * Should be called before setting any flow type specific fields in the flow - * table entry. + * Return: pointer to the modified flow structure. */ -union flow *flow_start(union flow *flow, enum flow_type type, - unsigned iniside) +union flow *flow_set_type(union flow *flow, enum flow_type type) { - (void)iniside; - flow->f.type = type; - flow_dbg(flow, "START %s", flow_type_str[flow->f.type]); + struct flow_common *f = &flow->f; + + ASSERT(type != FLOW_TYPE_NONE); + ASSERT(flow_new_entry == flow && f->state == FLOW_STATE_TGT); + ASSERT(f->type == FLOW_TYPE_NONE); + ASSERT(f->pif[INISIDE] != PIF_NONE && f->pif[TGTSIDE] != PIF_NONE); + + f->type = type; + flow_set_state(f, FLOW_STATE_TYPED); return flow; } /** - * flow_end() - Clear flow type for finished flow and log - * @flow: Flow to clear + * flow_activate() - Move flow to ACTIVE + * @f: Flow to change state */ -static void flow_end(union flow *flow) +void flow_activate(struct flow_common *f) { - if (flow->f.type == FLOW_TYPE_NONE) - return; /* Nothing to do */ + ASSERT(&flow_new_entry->f == f && f->state == FLOW_STATE_TYPED); + ASSERT(f->pif[INISIDE] != PIF_NONE && f->pif[TGTSIDE] != PIF_NONE); - flow_dbg(flow, "END %s", flow_type_str[flow->f.type]); - flow->f.type = FLOW_TYPE_NONE; + flow_set_state(f, FLOW_STATE_ACTIVE); + flow_new_entry = NULL; } /** @@ -196,9 +520,12 @@ union flow *flow_alloc(void) { union flow *flow = &flowtab[flow_first_free]; + ASSERT(!flow_new_entry); + if (flow_first_free >= FLOW_MAX) return NULL; + ASSERT(flow->f.state == FLOW_STATE_FREE); ASSERT(flow->f.type == FLOW_TYPE_NONE); ASSERT(flow->free.n >= 1); ASSERT(flow_first_free + flow->free.n <= FLOW_MAX); @@ -221,7 +548,10 @@ union flow *flow_alloc(void) flow_first_free = flow->free.next; } + flow_new_entry = flow; memset(flow, 0, sizeof(*flow)); + flow_set_state(&flow->f, FLOW_STATE_NEW); + return flow; } @@ -233,15 +563,234 @@ union flow *flow_alloc(void) */ void flow_alloc_cancel(union flow *flow) { + ASSERT(flow_new_entry == flow); + ASSERT(flow->f.state == FLOW_STATE_NEW || + flow->f.state == FLOW_STATE_INI || + flow->f.state == FLOW_STATE_TGT || + flow->f.state == FLOW_STATE_TYPED); ASSERT(flow_first_free > FLOW_IDX(flow)); - flow_end(flow); + flow_set_state(&flow->f, FLOW_STATE_FREE); + memset(flow, 0, sizeof(*flow)); + /* Put it back in a length 1 free cluster, don't attempt to fully * reverse flow_alloc()s steps. This will get folded together the next * time flow_defer_handler runs anyway() */ flow->free.n = 1; flow->free.next = flow_first_free; flow_first_free = FLOW_IDX(flow); + flow_new_entry = NULL; +} + +/** + * flow_hash() - Calculate hash value for one side of a flow + * @c: Execution context + * @proto: Protocol of this flow (IP L4 protocol number) + * @pif: pif of the side to hash + * @side: Flowside (must not have unspecified parts) + * + * Return: hash value + */ +static uint64_t flow_hash(const struct ctx *c, uint8_t proto, uint8_t pif, + const struct flowside *side) +{ + struct siphash_state state = SIPHASH_INIT(c->hash_secret); + + inany_siphash_feed(&state, &side->oaddr); + inany_siphash_feed(&state, &side->eaddr); + + return siphash_final(&state, 38, (uint64_t)proto << 40 | + (uint64_t)pif << 32 | + (uint64_t)side->oport << 16 | + (uint64_t)side->eport); +} + +/** + * flow_sidx_hash() - Calculate hash value for given side of a given flow + * @c: Execution context + * @sidx: Flow & side index to get hash for + * + * Return: hash value, of the flow & side represented by @sidx + */ +static uint64_t flow_sidx_hash(const struct ctx *c, flow_sidx_t sidx) +{ + const struct flow_common *f = &flow_at_sidx(sidx)->f; + const struct flowside *side = &f->side[sidx.sidei]; + uint8_t pif = f->pif[sidx.sidei]; + + ASSERT(pif != PIF_NONE); + return flow_hash(c, FLOW_PROTO(f), pif, side); +} + +/** + * flow_hash_probe_() - Find hash bucket for a flow, given hash + * @hash: Raw hash value for flow & side + * @sidx: Flow and side to find bucket for + * + * Return: If @sidx is in the hash table, its current bucket, otherwise a + * suitable free bucket for it. + */ +static inline unsigned flow_hash_probe_(uint64_t hash, flow_sidx_t sidx) +{ + unsigned b = hash % FLOW_HASH_SIZE; + + /* Linear probing */ + while (flow_sidx_valid(flow_hashtab[b]) && + !flow_sidx_eq(flow_hashtab[b], sidx)) + b = mod_sub(b, 1, FLOW_HASH_SIZE); + + return b; +} + +/** + * flow_hash_probe() - Find hash bucket for a flow + * @c: Execution context + * @sidx: Flow and side to find bucket for + * + * Return: If @sidx is in the hash table, its current bucket, otherwise a + * suitable free bucket for it. + */ +static inline unsigned flow_hash_probe(const struct ctx *c, flow_sidx_t sidx) +{ + return flow_hash_probe_(flow_sidx_hash(c, sidx), sidx); +} + +/** + * flow_hash_insert() - Insert side of a flow into into hash table + * @c: Execution context + * @sidx: Flow & side index + * + * Return: raw (un-modded) hash value of side of flow + */ +uint64_t flow_hash_insert(const struct ctx *c, flow_sidx_t sidx) +{ + uint64_t hash = flow_sidx_hash(c, sidx); + unsigned b = flow_hash_probe_(hash, sidx); + + flow_hashtab[b] = sidx; + flow_dbg(flow_at_sidx(sidx), "Side %u hash table insert: bucket: %u", + sidx.sidei, b); + + return hash; +} + +/** + * flow_hash_remove() - Drop side of a flow from the hash table + * @c: Execution context + * @sidx: Side of flow to remove + */ +void flow_hash_remove(const struct ctx *c, flow_sidx_t sidx) +{ + unsigned b = flow_hash_probe(c, sidx), s; + + if (!flow_sidx_valid(flow_hashtab[b])) + return; /* Redundant remove */ + + flow_dbg(flow_at_sidx(sidx), "Side %u hash table remove: bucket: %u", + sidx.sidei, b); + + /* Scan the remainder of the cluster */ + for (s = mod_sub(b, 1, FLOW_HASH_SIZE); + flow_sidx_valid(flow_hashtab[s]); + s = mod_sub(s, 1, FLOW_HASH_SIZE)) { + unsigned h = flow_sidx_hash(c, flow_hashtab[s]) % FLOW_HASH_SIZE; + + if (!mod_between(h, s, b, FLOW_HASH_SIZE)) { + /* flow_hashtab[s] can live in flow_hashtab[b]'s slot */ + debug("hash table remove: shuffle %u -> %u", s, b); + flow_hashtab[b] = flow_hashtab[s]; + b = s; + } + } + + flow_hashtab[b] = FLOW_SIDX_NONE; +} + +/** + * flowside_lookup() - Look for a matching flowside in the flow table + * @c: Execution context + * @proto: Protocol of the flow (IP L4 protocol number) + * @pif: pif to look for in the table + * @side: Flowside to look for in the table + * + * Return: sidx of the matching flow & side, FLOW_SIDX_NONE if not found + */ +static flow_sidx_t flowside_lookup(const struct ctx *c, uint8_t proto, + uint8_t pif, const struct flowside *side) +{ + flow_sidx_t sidx; + union flow *flow; + unsigned b; + + b = flow_hash(c, proto, pif, side) % FLOW_HASH_SIZE; + while ((sidx = flow_hashtab[b], flow = flow_at_sidx(sidx)) && + !(FLOW_PROTO(&flow->f) == proto && + flow->f.pif[sidx.sidei] == pif && + flowside_eq(&flow->f.side[sidx.sidei], side))) + b = mod_sub(b, 1, FLOW_HASH_SIZE); + + return flow_hashtab[b]; +} + +/** + * flow_lookup_af() - Look up a flow given addressing information + * @c: Execution context + * @proto: Protocol of the flow (IP L4 protocol number) + * @pif: Interface of the flow + * @af: Address family, AF_INET or AF_INET6 + * @eaddr: Guest side endpoint address (guest local address) + * @oaddr: Our guest side address (guest remote address) + * @eport: Guest side endpoint port (guest local port) + * @oport: Our guest side port (guest remote port) + * + * Return: sidx of the matching flow & side, FLOW_SIDX_NONE if not found + */ +flow_sidx_t flow_lookup_af(const struct ctx *c, + uint8_t proto, uint8_t pif, sa_family_t af, + const void *eaddr, const void *oaddr, + in_port_t eport, in_port_t oport) +{ + struct flowside side; + + flowside_from_af(&side, af, eaddr, eport, oaddr, oport); + return flowside_lookup(c, proto, pif, &side); +} + +/** + * flow_lookup_sa() - Look up a flow given an endpoint socket address + * @c: Execution context + * @proto: Protocol of the flow (IP L4 protocol number) + * @pif: Interface of the flow + * @esa: Socket address of the endpoint + * @oaddr: Our address (may be NULL) + * @oport: Our port number + * + * Return: sidx of the matching flow & side, FLOW_SIDX_NONE if not found + */ +flow_sidx_t flow_lookup_sa(const struct ctx *c, uint8_t proto, uint8_t pif, + const void *esa, + const union inany_addr *oaddr, in_port_t oport) +{ + struct flowside side = { + .oport = oport, + }; + + if (inany_from_sockaddr(&side.eaddr, &side.eport, esa) < 0) { + char str[SOCKADDR_STRLEN]; + + warn("Flow lookup on bad socket address %s", + sockaddr_ntop(esa, str, sizeof(str))); + return FLOW_SIDX_NONE; + } + + if (oaddr) + side.oaddr = *oaddr; + else if (inany_v4(&side.eaddr)) + side.oaddr = inany_any4; + else + side.oaddr = inany_any6; + + return flowside_lookup(c, proto, pif, &side); } /** @@ -253,79 +802,111 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now) { struct flow_free_cluster *free_head = NULL; unsigned *last_next = &flow_first_free; + bool to_free[FLOW_MAX] = { 0 }; bool timer = false; - unsigned idx; + union flow *flow; if (timespec_diff_ms(now, &flow_timer_run) >= FLOW_TIMER_INTERVAL) { timer = true; flow_timer_run = *now; } - for (idx = 0; idx < FLOW_MAX; idx++) { - union flow *flow = &flowtab[idx]; - bool closed = false; - - if (flow->f.type == FLOW_TYPE_NONE) { - unsigned skip = flow->free.n; - - /* First entry of a free cluster must have n >= 1 */ - ASSERT(skip); - - if (free_head) { - /* Merge into preceding free cluster */ - free_head->n += flow->free.n; - flow->free.n = flow->free.next = 0; - } else { - /* New free cluster, add to chain */ - free_head = &flow->free; - *last_next = idx; - last_next = &free_head->next; - } + ASSERT(!flow_new_entry); /* Incomplete flow at end of cycle */ - /* Skip remaining empty entries */ - idx += skip - 1; - continue; - } + /* Check which flows we might need to close first, but don't free them + * yet as it's not safe to do that in the middle of flow_foreach(). + */ + flow_foreach(flow) { + bool closed = false; switch (flow->f.type) { case FLOW_TYPE_NONE: ASSERT(false); break; case FLOW_TCP: - closed = tcp_flow_defer(flow); + closed = tcp_flow_defer(&flow->tcp); break; case FLOW_TCP_SPLICE: - closed = tcp_splice_flow_defer(flow); + closed = tcp_splice_flow_defer(&flow->tcp_splice); if (!closed && timer) - tcp_splice_timer(c, flow); + tcp_splice_timer(c, &flow->tcp_splice); break; case FLOW_PING4: case FLOW_PING6: if (timer) - closed = icmp_ping_timer(c, flow, now); + closed = icmp_ping_timer(c, &flow->ping, now); + break; + case FLOW_UDP: + closed = udp_flow_defer(c, &flow->udp, now); + if (!closed && timer) + closed = udp_flow_timer(c, &flow->udp, now); break; default: /* Assume other flow types don't need any handling */ ; } - if (closed) { - flow_end(flow); + to_free[FLOW_IDX(flow)] = closed; + } + + /* Second step: actually free the flows */ + flow_foreach_slot(flow) { + switch (flow->f.state) { + case FLOW_STATE_FREE: { + unsigned skip = flow->free.n; + + /* First entry of a free cluster must have n >= 1 */ + ASSERT(skip); if (free_head) { - /* Add slot to current free cluster */ - ASSERT(idx == FLOW_IDX(free_head) + free_head->n); - free_head->n++; + /* Merge into preceding free cluster */ + free_head->n += flow->free.n; flow->free.n = flow->free.next = 0; } else { - /* Create new free cluster */ + /* New free cluster, add to chain */ free_head = &flow->free; - free_head->n = 1; - *last_next = idx; + *last_next = FLOW_IDX(flow); last_next = &free_head->next; } - } else { - free_head = NULL; + + /* Skip remaining empty entries */ + flow += skip - 1; + continue; + } + + case FLOW_STATE_NEW: + case FLOW_STATE_INI: + case FLOW_STATE_TGT: + case FLOW_STATE_TYPED: + /* Incomplete flow at end of cycle */ + ASSERT(false); + break; + + case FLOW_STATE_ACTIVE: + if (to_free[FLOW_IDX(flow)]) { + flow_set_state(&flow->f, FLOW_STATE_FREE); + memset(flow, 0, sizeof(*flow)); + + if (free_head) { + /* Add slot to current free cluster */ + ASSERT(FLOW_IDX(flow) == + FLOW_IDX(free_head) + free_head->n); + free_head->n++; + flow->free.n = flow->free.next = 0; + } else { + /* Create new free cluster */ + free_head = &flow->free; + free_head->n = 1; + *last_next = FLOW_IDX(flow); + last_next = &free_head->next; + } + } else { + free_head = NULL; + } + break; + + default: + ASSERT(false); } } @@ -333,11 +914,265 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now) } /** + * flow_migrate_source_rollback() - Disable repair mode, return failure + * @c: Execution context + * @bound: No need to roll back flow indices >= @bound + * @ret: Negative error code + * + * Return: @ret + */ +static int flow_migrate_source_rollback(struct ctx *c, unsigned bound, int ret) +{ + union flow *flow; + + debug("...roll back migration"); + + foreach_established_tcp_flow(flow) { + if (FLOW_IDX(flow) >= bound) + break; + if (tcp_flow_repair_off(c, &flow->tcp)) + die("Failed to roll back TCP_REPAIR mode"); + } + + if (repair_flush(c)) + die("Failed to roll back TCP_REPAIR mode"); + + return ret; +} + +/** + * flow_migrate_need_repair() - Do we need to set repair mode for any flow? + * + * Return: true if repair mode is needed, false otherwise + */ +static bool flow_migrate_need_repair(void) +{ + union flow *flow; + + foreach_established_tcp_flow(flow) + return true; + + return false; +} + +/** + * flow_migrate_repair_all() - Turn repair mode on or off for all flows + * @c: Execution context + * @enable: Switch repair mode on if set, off otherwise + * + * Return: 0 on success, negative error code on failure + */ +static int flow_migrate_repair_all(struct ctx *c, bool enable) +{ + union flow *flow; + int rc; + + /* If we don't have a repair helper, there's nothing we can do */ + if (c->fd_repair < 0) + return 0; + + foreach_established_tcp_flow(flow) { + if (enable) + rc = tcp_flow_repair_on(c, &flow->tcp); + else + rc = tcp_flow_repair_off(c, &flow->tcp); + + if (rc) { + debug("Can't %s repair mode: %s", + enable ? "enable" : "disable", strerror_(-rc)); + return flow_migrate_source_rollback(c, FLOW_IDX(flow), + rc); + } + } + + if ((rc = repair_flush(c))) { + debug("Can't %s repair mode: %s", + enable ? "enable" : "disable", strerror_(-rc)); + return flow_migrate_source_rollback(c, FLOW_IDX(flow), rc); + } + + return 0; +} + +/** + * flow_migrate_source_pre() - Prepare flows for migration: enable repair mode + * @c: Execution context + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor (unused) + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + int rc; + + (void)stage; + (void)fd; + + if (flow_migrate_need_repair()) + repair_wait(c); + + if ((rc = flow_migrate_repair_all(c, true))) + return -rc; + + return 0; +} + +/** + * flow_migrate_source() - Dump all the remaining information and send data + * @c: Execution context (unused) + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + uint32_t count = 0; + bool first = true; + union flow *flow; + int rc; + + (void)c; + (void)stage; + + /* If we don't have a repair helper, we can't migrate TCP flows */ + if (c->fd_repair >= 0) { + foreach_established_tcp_flow(flow) + count++; + } + + count = htonl(count); + if (write_all_buf(fd, &count, sizeof(count))) { + rc = errno; + err_perror("Can't send flow count (%u)", ntohl(count)); + return flow_migrate_source_rollback(c, FLOW_MAX, rc); + } + + debug("Sending %u flows", ntohl(count)); + + if (!count) + return 0; + + /* Dump and send information that can be stored in the flow table. + * + * Limited rollback options here: if we fail to transfer any data (that + * is, on the first flow), undo everything and resume. Otherwise, the + * stream might now be inconsistent, and we might have closed listening + * TCP sockets, so just terminate. + */ + foreach_established_tcp_flow(flow) { + rc = tcp_flow_migrate_source(fd, &flow->tcp); + if (rc) { + flow_err(flow, "Can't send data: %s", + strerror_(-rc)); + if (!first) + die("Inconsistent migration state, exiting"); + + return flow_migrate_source_rollback(c, FLOW_MAX, -rc); + } + + first = false; + } + + /* And then "extended" data (including window data we saved previously): + * the target needs to set repair mode on sockets before it can set + * this stuff, but it needs sockets (and flows) for that. + * + * This also closes sockets so that the target can start connecting + * theirs: you can't sendmsg() to queues (using the socket) if the + * socket is not connected (EPIPE), not even in repair mode. And the + * target needs to restore queues now because we're sending the data. + * + * So, no rollback here, just try as hard as we can. Tolerate per-flow + * failures but not if the stream might be inconsistent (reported here + * as EIO). + */ + foreach_established_tcp_flow(flow) { + rc = tcp_flow_migrate_source_ext(fd, &flow->tcp); + if (rc) { + flow_err(flow, "Can't send extended data: %s", + strerror_(-rc)); + + if (rc == -EIO) + die("Inconsistent migration state, exiting"); + } + } + + return 0; +} + +/** + * flow_migrate_target() - Receive flows and insert in flow table + * @c: Execution context + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + uint32_t count; + unsigned i; + int rc; + + (void)stage; + + if (read_all_buf(fd, &count, sizeof(count))) + return errno; + + count = ntohl(count); + debug("Receiving %u flows", count); + + if (!count) + return 0; + + if ((rc = repair_wait(c))) + return -rc; + + if ((rc = flow_migrate_repair_all(c, true))) + return -rc; + + repair_flush(c); + + /* TODO: flow header with type, instead? */ + for (i = 0; i < count; i++) { + rc = tcp_flow_migrate_target(c, fd); + if (rc) { + flow_dbg(FLOW(i), "Migration data failure, abort: %s", + strerror_(-rc)); + return -rc; + } + } + + repair_flush(c); + + for (i = 0; i < count; i++) { + rc = tcp_flow_migrate_target_ext(c, &flowtab[i].tcp, fd); + if (rc) { + flow_dbg(FLOW(i), "Migration data failure, abort: %s", + strerror_(-rc)); + return -rc; + } + } + + return 0; +} + +/** * flow_init() - Initialise flow related data structures */ void flow_init(void) { + unsigned b; + /* Initial state is a single free cluster containing the whole table */ flowtab[0].free.n = FLOW_MAX; flowtab[0].free.next = FLOW_MAX; + + for (b = 0; b < FLOW_HASH_SIZE; b++) + flow_hashtab[b] = FLOW_SIDX_NONE; } |