diff options
Diffstat (limited to 'flow.c')
-rw-r--r-- | flow.c | 180 |
1 files changed, 115 insertions, 65 deletions
@@ -81,7 +81,7 @@ static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES, * * Free cluster list * flow_first_free gives the index of the first (lowest index) free cluster. - * Each free cluster has the index of the next free cluster, or MAX_FLOW if + * Each free cluster has the index of the next free cluster, or FLOW_MAX if * it is the last free cluster. Together these form a linked list of free * clusters, in strictly increasing order of index. * @@ -396,18 +396,27 @@ const struct flowside *flow_initiate_af(union flow *flow, uint8_t pif, * @flow: Flow to change state * @pif: pif of the initiating side * @ssa: Source socket address + * @daddr: Destination address (may be NULL) * @dport: Destination port * * Return: pointer to the initiating flowside information */ struct flowside *flow_initiate_sa(union flow *flow, uint8_t pif, const union sockaddr_inany *ssa, + const union inany_addr *daddr, in_port_t dport) { struct flowside *ini = &flow->f.side[INISIDE]; - inany_from_sockaddr(&ini->eaddr, &ini->eport, ssa); - if (inany_v4(&ini->eaddr)) + if (inany_from_sockaddr(&ini->eaddr, &ini->eport, ssa) < 0) { + char str[SOCKADDR_STRLEN]; + + ASSERT_WITH_MSG(0, "Bad socket address %s", + sockaddr_ntop(ssa, str, sizeof(str))); + } + if (daddr) + ini->oaddr = *daddr; + else if (inany_v4(&ini->eaddr)) ini->oaddr = inany_any4; else ini->oaddr = inany_any6; @@ -471,7 +480,9 @@ struct flowside *flow_target(const struct ctx *c, union flow *flow, /** * flow_set_type() - Set type and move to TYPED * @flow: Flow to change state - * @pif: pif of the initiating side + * @type: New flow type to assign + * + * Return: pointer to the modified flow structure. */ union flow *flow_set_type(union flow *flow, enum flow_type type) { @@ -751,19 +762,30 @@ flow_sidx_t flow_lookup_af(const struct ctx *c, * @proto: Protocol of the flow (IP L4 protocol number) * @pif: Interface of the flow * @esa: Socket address of the endpoint + * @oaddr: Our address (may be NULL) * @oport: Our port number * * Return: sidx of the matching flow & side, FLOW_SIDX_NONE if not found */ flow_sidx_t flow_lookup_sa(const struct ctx *c, uint8_t proto, uint8_t pif, - const void *esa, in_port_t oport) + const void *esa, + const union inany_addr *oaddr, in_port_t oport) { struct flowside side = { .oport = oport, }; - inany_from_sockaddr(&side.eaddr, &side.eport, esa); - if (inany_v4(&side.eaddr)) + if (inany_from_sockaddr(&side.eaddr, &side.eport, esa) < 0) { + char str[SOCKADDR_STRLEN]; + + warn("Flow lookup on bad socket address %s", + sockaddr_ntop(esa, str, sizeof(str))); + return FLOW_SIDX_NONE; + } + + if (oaddr) + side.oaddr = *oaddr; + else if (inany_v4(&side.eaddr)) side.oaddr = inany_any4; else side.oaddr = inany_any6; @@ -780,6 +802,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now) { struct flow_free_cluster *free_head = NULL; unsigned *last_next = &flow_first_free; + bool to_free[FLOW_MAX] = { 0 }; bool timer = false; union flow *flow; @@ -790,9 +813,44 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now) ASSERT(!flow_new_entry); /* Incomplete flow at end of cycle */ - flow_foreach_slot(flow) { + /* Check which flows we might need to close first, but don't free them + * yet as it's not safe to do that in the middle of flow_foreach(). + */ + flow_foreach(flow) { bool closed = false; + switch (flow->f.type) { + case FLOW_TYPE_NONE: + ASSERT(false); + break; + case FLOW_TCP: + closed = tcp_flow_defer(&flow->tcp); + break; + case FLOW_TCP_SPLICE: + closed = tcp_splice_flow_defer(&flow->tcp_splice); + if (!closed && timer) + tcp_splice_timer(c, &flow->tcp_splice); + break; + case FLOW_PING4: + case FLOW_PING6: + if (timer) + closed = icmp_ping_timer(c, &flow->ping, now); + break; + case FLOW_UDP: + closed = udp_flow_defer(c, &flow->udp, now); + if (!closed && timer) + closed = udp_flow_timer(c, &flow->udp, now); + break; + default: + /* Assume other flow types don't need any handling */ + ; + } + + to_free[FLOW_IDX(flow)] = closed; + } + + /* Second step: actually free the flows */ + flow_foreach_slot(flow) { switch (flow->f.state) { case FLOW_STATE_FREE: { unsigned skip = flow->free.n; @@ -825,60 +883,31 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now) break; case FLOW_STATE_ACTIVE: - /* Nothing to do */ + if (to_free[FLOW_IDX(flow)]) { + flow_set_state(&flow->f, FLOW_STATE_FREE); + memset(flow, 0, sizeof(*flow)); + + if (free_head) { + /* Add slot to current free cluster */ + ASSERT(FLOW_IDX(flow) == + FLOW_IDX(free_head) + free_head->n); + free_head->n++; + flow->free.n = flow->free.next = 0; + } else { + /* Create new free cluster */ + free_head = &flow->free; + free_head->n = 1; + *last_next = FLOW_IDX(flow); + last_next = &free_head->next; + } + } else { + free_head = NULL; + } break; default: ASSERT(false); } - - switch (flow->f.type) { - case FLOW_TYPE_NONE: - ASSERT(false); - break; - case FLOW_TCP: - closed = tcp_flow_defer(&flow->tcp); - break; - case FLOW_TCP_SPLICE: - closed = tcp_splice_flow_defer(&flow->tcp_splice); - if (!closed && timer) - tcp_splice_timer(c, &flow->tcp_splice); - break; - case FLOW_PING4: - case FLOW_PING6: - if (timer) - closed = icmp_ping_timer(c, &flow->ping, now); - break; - case FLOW_UDP: - closed = udp_flow_defer(&flow->udp); - if (!closed && timer) - closed = udp_flow_timer(c, &flow->udp, now); - break; - default: - /* Assume other flow types don't need any handling */ - ; - } - - if (closed) { - flow_set_state(&flow->f, FLOW_STATE_FREE); - memset(flow, 0, sizeof(*flow)); - - if (free_head) { - /* Add slot to current free cluster */ - ASSERT(FLOW_IDX(flow) == - FLOW_IDX(free_head) + free_head->n); - free_head->n++; - flow->free.n = flow->free.next = 0; - } else { - /* Create new free cluster */ - free_head = &flow->free; - free_head->n = 1; - *last_next = FLOW_IDX(flow); - last_next = &free_head->next; - } - } else { - free_head = NULL; - } } *last_next = FLOW_MAX; @@ -912,6 +941,21 @@ static int flow_migrate_source_rollback(struct ctx *c, unsigned bound, int ret) } /** + * flow_migrate_need_repair() - Do we need to set repair mode for any flow? + * + * Return: true if repair mode is needed, false otherwise + */ +static bool flow_migrate_need_repair(void) +{ + union flow *flow; + + foreach_established_tcp_flow(flow) + return true; + + return false; +} + +/** * flow_migrate_repair_all() - Turn repair mode on or off for all flows * @c: Execution context * @enable: Switch repair mode on if set, off otherwise @@ -966,6 +1010,9 @@ int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage, (void)stage; (void)fd; + if (flow_migrate_need_repair()) + repair_wait(c); + if ((rc = flow_migrate_repair_all(c, true))) return -rc; @@ -1019,8 +1066,8 @@ int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage, foreach_established_tcp_flow(flow) { rc = tcp_flow_migrate_source(fd, &flow->tcp); if (rc) { - err("Can't send data, flow %u: %s", FLOW_IDX(flow), - strerror_(-rc)); + flow_err(flow, "Can't send data: %s", + strerror_(-rc)); if (!first) die("Inconsistent migration state, exiting"); @@ -1046,8 +1093,8 @@ int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage, foreach_established_tcp_flow(flow) { rc = tcp_flow_migrate_source_ext(fd, &flow->tcp); if (rc) { - err("Extended data for flow %u: %s", FLOW_IDX(flow), - strerror_(-rc)); + flow_err(flow, "Can't send extended data: %s", + strerror_(-rc)); if (rc == -EIO) die("Inconsistent migration state, exiting"); @@ -1083,6 +1130,9 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage, if (!count) return 0; + if ((rc = repair_wait(c))) + return -rc; + if ((rc = flow_migrate_repair_all(c, true))) return -rc; @@ -1092,8 +1142,8 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage, for (i = 0; i < count; i++) { rc = tcp_flow_migrate_target(c, fd); if (rc) { - debug("Migration data failure at flow %u: %s, abort", - i, strerror_(-rc)); + flow_dbg(FLOW(i), "Migration data failure, abort: %s", + strerror_(-rc)); return -rc; } } @@ -1103,8 +1153,8 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage, for (i = 0; i < count; i++) { rc = tcp_flow_migrate_target_ext(c, &flowtab[i].tcp, fd); if (rc) { - debug("Migration data failure at flow %u: %s, abort", - i, strerror_(-rc)); + flow_dbg(FLOW(i), "Migration data failure, abort: %s", + strerror_(-rc)); return -rc; } } |