about | git | code | bugs | lists | chat
path: root/flow.c
diff options
context:
space:
mode:
Diffstat (limited to 'flow.c')
-rw-r--r-- flow.c 180
1 files changed, 115 insertions, 65 deletions
diff --git a/flow.c b/flow.c
index 749c498..da5c813 100644
--- a/flow.c
+++ b/flow.c
@@ -81,7 +81,7 @@ static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES,
*
* Free cluster list
* flow_first_free gives the index of the first (lowest index) free cluster.
- * Each free cluster has the index of the next free cluster, or MAX_FLOW if
+ * Each free cluster has the index of the next free cluster, or FLOW_MAX if
* it is the last free cluster. Together these form a linked list of free
* clusters, in strictly increasing order of index.
*
@@ -396,18 +396,27 @@ const struct flowside *flow_initiate_af(union flow *flow, uint8_t pif,
* @flow: Flow to change state
* @pif: pif of the initiating side
* @ssa: Source socket address
+ * @daddr: Destination address (may be NULL)
* @dport: Destination port
*
* Return: pointer to the initiating flowside information
*/
struct flowside *flow_initiate_sa(union flow *flow, uint8_t pif,
const union sockaddr_inany *ssa,
+ const union inany_addr *daddr,
in_port_t dport)
{
struct flowside *ini = &flow->f.side[INISIDE];
- inany_from_sockaddr(&ini->eaddr, &ini->eport, ssa);
- if (inany_v4(&ini->eaddr))
+ if (inany_from_sockaddr(&ini->eaddr, &ini->eport, ssa) < 0) {
+ char str[SOCKADDR_STRLEN];
+
+ ASSERT_WITH_MSG(0, "Bad socket address %s",
+ sockaddr_ntop(ssa, str, sizeof(str)));
+ }
+ if (daddr)
+ ini->oaddr = *daddr;
+ else if (inany_v4(&ini->eaddr))
ini->oaddr = inany_any4;
else
ini->oaddr = inany_any6;
@@ -471,7 +480,9 @@ struct flowside *flow_target(const struct ctx *c, union flow *flow,
/**
* flow_set_type() - Set type and move to TYPED
* @flow: Flow to change state
- * @pif: pif of the initiating side
+ * @type: New flow type to assign
+ *
+ * Return: pointer to the modified flow structure.
*/
union flow *flow_set_type(union flow *flow, enum flow_type type)
{
@@ -751,19 +762,30 @@ flow_sidx_t flow_lookup_af(const struct ctx *c,
* @proto: Protocol of the flow (IP L4 protocol number)
* @pif: Interface of the flow
* @esa: Socket address of the endpoint
+ * @oaddr: Our address (may be NULL)
* @oport: Our port number
*
* Return: sidx of the matching flow & side, FLOW_SIDX_NONE if not found
*/
flow_sidx_t flow_lookup_sa(const struct ctx *c, uint8_t proto, uint8_t pif,
- const void *esa, in_port_t oport)
+ const void *esa,
+ const union inany_addr *oaddr, in_port_t oport)
{
struct flowside side = {
.oport = oport,
};
- inany_from_sockaddr(&side.eaddr, &side.eport, esa);
- if (inany_v4(&side.eaddr))
+ if (inany_from_sockaddr(&side.eaddr, &side.eport, esa) < 0) {
+ char str[SOCKADDR_STRLEN];
+
+ warn("Flow lookup on bad socket address %s",
+ sockaddr_ntop(esa, str, sizeof(str)));
+ return FLOW_SIDX_NONE;
+ }
+
+ if (oaddr)
+ side.oaddr = *oaddr;
+ else if (inany_v4(&side.eaddr))
side.oaddr = inany_any4;
else
side.oaddr = inany_any6;
@@ -780,6 +802,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
{
struct flow_free_cluster *free_head = NULL;
unsigned *last_next = &flow_first_free;
+ bool to_free[FLOW_MAX] = { 0 };
bool timer = false;
union flow *flow;
@@ -790,9 +813,44 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
ASSERT(!flow_new_entry); /* Incomplete flow at end of cycle */
- flow_foreach_slot(flow) {
+ /* Check which flows we might need to close first, but don't free them
+ * yet as it's not safe to do that in the middle of flow_foreach().
+ */
+ flow_foreach(flow) {
bool closed = false;
+ switch (flow->f.type) {
+ case FLOW_TYPE_NONE:
+ ASSERT(false);
+ break;
+ case FLOW_TCP:
+ closed = tcp_flow_defer(&flow->tcp);
+ break;
+ case FLOW_TCP_SPLICE:
+ closed = tcp_splice_flow_defer(&flow->tcp_splice);
+ if (!closed && timer)
+ tcp_splice_timer(c, &flow->tcp_splice);
+ break;
+ case FLOW_PING4:
+ case FLOW_PING6:
+ if (timer)
+ closed = icmp_ping_timer(c, &flow->ping, now);
+ break;
+ case FLOW_UDP:
+ closed = udp_flow_defer(c, &flow->udp, now);
+ if (!closed && timer)
+ closed = udp_flow_timer(c, &flow->udp, now);
+ break;
+ default:
+ /* Assume other flow types don't need any handling */
+ ;
+ }
+
+ to_free[FLOW_IDX(flow)] = closed;
+ }
+
+ /* Second step: actually free the flows */
+ flow_foreach_slot(flow) {
switch (flow->f.state) {
case FLOW_STATE_FREE: {
unsigned skip = flow->free.n;
@@ -825,60 +883,31 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
break;
case FLOW_STATE_ACTIVE:
- /* Nothing to do */
+ if (to_free[FLOW_IDX(flow)]) {
+ flow_set_state(&flow->f, FLOW_STATE_FREE);
+ memset(flow, 0, sizeof(*flow));
+
+ if (free_head) {
+ /* Add slot to current free cluster */
+ ASSERT(FLOW_IDX(flow) ==
+ FLOW_IDX(free_head) + free_head->n);
+ free_head->n++;
+ flow->free.n = flow->free.next = 0;
+ } else {
+ /* Create new free cluster */
+ free_head = &flow->free;
+ free_head->n = 1;
+ *last_next = FLOW_IDX(flow);
+ last_next = &free_head->next;
+ }
+ } else {
+ free_head = NULL;
+ }
break;
default:
ASSERT(false);
}
-
- switch (flow->f.type) {
- case FLOW_TYPE_NONE:
- ASSERT(false);
- break;
- case FLOW_TCP:
- closed = tcp_flow_defer(&flow->tcp);
- break;
- case FLOW_TCP_SPLICE:
- closed = tcp_splice_flow_defer(&flow->tcp_splice);
- if (!closed && timer)
- tcp_splice_timer(c, &flow->tcp_splice);
- break;
- case FLOW_PING4:
- case FLOW_PING6:
- if (timer)
- closed = icmp_ping_timer(c, &flow->ping, now);
- break;
- case FLOW_UDP:
- closed = udp_flow_defer(&flow->udp);
- if (!closed && timer)
- closed = udp_flow_timer(c, &flow->udp, now);
- break;
- default:
- /* Assume other flow types don't need any handling */
- ;
- }
-
- if (closed) {
- flow_set_state(&flow->f, FLOW_STATE_FREE);
- memset(flow, 0, sizeof(*flow));
-
- if (free_head) {
- /* Add slot to current free cluster */
- ASSERT(FLOW_IDX(flow) ==
- FLOW_IDX(free_head) + free_head->n);
- free_head->n++;
- flow->free.n = flow->free.next = 0;
- } else {
- /* Create new free cluster */
- free_head = &flow->free;
- free_head->n = 1;
- *last_next = FLOW_IDX(flow);
- last_next = &free_head->next;
- }
- } else {
- free_head = NULL;
- }
}
*last_next = FLOW_MAX;
@@ -912,6 +941,21 @@ static int flow_migrate_source_rollback(struct ctx *c, unsigned bound, int ret)
}
/**
+ * flow_migrate_need_repair() - Do we need to set repair mode for any flow?
+ *
+ * Return: true if repair mode is needed, false otherwise
+ */
+static bool flow_migrate_need_repair(void)
+{
+ union flow *flow;
+
+ foreach_established_tcp_flow(flow)
+ return true;
+
+ return false;
+}
+
+/**
* flow_migrate_repair_all() - Turn repair mode on or off for all flows
* @c: Execution context
* @enable: Switch repair mode on if set, off otherwise
@@ -966,6 +1010,9 @@ int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage,
(void)stage;
(void)fd;
+ if (flow_migrate_need_repair())
+ repair_wait(c);
+
if ((rc = flow_migrate_repair_all(c, true)))
return -rc;
@@ -1019,8 +1066,8 @@ int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage,
foreach_established_tcp_flow(flow) {
rc = tcp_flow_migrate_source(fd, &flow->tcp);
if (rc) {
- err("Can't send data, flow %u: %s", FLOW_IDX(flow),
- strerror_(-rc));
+ flow_err(flow, "Can't send data: %s",
+ strerror_(-rc));
if (!first)
die("Inconsistent migration state, exiting");
@@ -1046,8 +1093,8 @@ int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage,
foreach_established_tcp_flow(flow) {
rc = tcp_flow_migrate_source_ext(fd, &flow->tcp);
if (rc) {
- err("Extended data for flow %u: %s", FLOW_IDX(flow),
- strerror_(-rc));
+ flow_err(flow, "Can't send extended data: %s",
+ strerror_(-rc));
if (rc == -EIO)
die("Inconsistent migration state, exiting");
@@ -1083,6 +1130,9 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
if (!count)
return 0;
+ if ((rc = repair_wait(c)))
+ return -rc;
+
if ((rc = flow_migrate_repair_all(c, true)))
return -rc;
@@ -1092,8 +1142,8 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
for (i = 0; i < count; i++) {
rc = tcp_flow_migrate_target(c, fd);
if (rc) {
- debug("Migration data failure at flow %u: %s, abort",
- i, strerror_(-rc));
+ flow_dbg(FLOW(i), "Migration data failure, abort: %s",
+ strerror_(-rc));
return -rc;
}
}
@@ -1103,8 +1153,8 @@ int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
for (i = 0; i < count; i++) {
rc = tcp_flow_migrate_target_ext(c, &flowtab[i].tcp, fd);
if (rc) {
- debug("Migration data failure at flow %u: %s, abort",
- i, strerror_(-rc));
+ flow_dbg(FLOW(i), "Migration data failure, abort: %s",
+ strerror_(-rc));
return -rc;
}
}