aboutgitcodebugslistschat
path: root/flow.c
diff options
context:
space:
mode:
Diffstat (limited to 'flow.c')
-rw-r--r--flow.c288
1 files changed, 273 insertions, 15 deletions
diff --git a/flow.c b/flow.c
index a6fe6d1..5e64b79 100644
--- a/flow.c
+++ b/flow.c
@@ -19,6 +19,7 @@
#include "inany.h"
#include "flow.h"
#include "flow_table.h"
+#include "repair.h"
const char *flow_state_str[] = {
[FLOW_STATE_FREE] = "FREE",
@@ -52,6 +53,13 @@ const uint8_t flow_proto[] = {
static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES,
"flow_proto[] doesn't match enum flow_type");
+#define foreach_established_tcp_flow(flow) \
+ flow_foreach_of_type((flow), FLOW_TCP) /* loop header over FLOW_TCP entries */ \
+ if (!tcp_flow_is_established(&(flow)->tcp)) \
+ /* NOLINTNEXTLINE(bugprone-branch-clone) */ \
+ continue; /* filter: skip TCP flows that are not established */ \
+ else /* the single statement (or block) after the macro is the loop body */
+
/* Global Flow Table */
/**
@@ -259,11 +267,13 @@ int flowside_connect(const struct ctx *c, int s,
/** flow_log_ - Log flow-related message
* @f: flow the message is related to
+ * @newline: Append newline at the end of the message, if missing
* @pri: Log priority
* @fmt: Format string
* @...: printf-arguments
*/
-void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
+void flow_log_(const struct flow_common *f, bool newline, int pri,
+ const char *fmt, ...)
{
const char *type_or_state;
char msg[BUFSIZ];
@@ -279,7 +289,7 @@ void flow_log_(const struct flow_common *f, int pri, const char *fmt, ...)
else
type_or_state = FLOW_TYPE(f);
- logmsg(true, false, pri,
+ logmsg(newline, false, pri,
"Flow %u (%s): %s", flow_idx(f), type_or_state, msg);
}
@@ -299,7 +309,7 @@ void flow_log_details_(const struct flow_common *f, int pri,
const struct flowside *tgt = &f->side[TGTSIDE];
if (state >= FLOW_STATE_TGT)
- flow_log_(f, pri,
+ flow_log_(f, true, pri,
"%s [%s]:%hu -> [%s]:%hu => %s [%s]:%hu -> [%s]:%hu",
pif_name(f->pif[INISIDE]),
inany_ntop(&ini->eaddr, estr0, sizeof(estr0)),
@@ -312,7 +322,7 @@ void flow_log_details_(const struct flow_common *f, int pri,
inany_ntop(&tgt->eaddr, estr1, sizeof(estr1)),
tgt->eport);
else if (state >= FLOW_STATE_INI)
- flow_log_(f, pri, "%s [%s]:%hu -> [%s]:%hu => ?",
+ flow_log_(f, true, pri, "%s [%s]:%hu -> [%s]:%hu => ?",
pif_name(f->pif[INISIDE]),
inany_ntop(&ini->eaddr, estr0, sizeof(estr0)),
ini->eport,
@@ -333,7 +343,7 @@ static void flow_set_state(struct flow_common *f, enum flow_state state)
ASSERT(oldstate < FLOW_NUM_STATES);
f->state = state;
- flow_log_(f, LOG_DEBUG, "%s -> %s", flow_state_str[oldstate],
+ flow_log_(f, true, LOG_DEBUG, "%s -> %s", flow_state_str[oldstate],
FLOW_STATE(f));
flow_log_details_(f, LOG_DEBUG, MAX(state, oldstate));
@@ -390,9 +400,9 @@ const struct flowside *flow_initiate_af(union flow *flow, uint8_t pif,
*
* Return: pointer to the initiating flowside information
*/
-const struct flowside *flow_initiate_sa(union flow *flow, uint8_t pif,
- const union sockaddr_inany *ssa,
- in_port_t dport)
+struct flowside *flow_initiate_sa(union flow *flow, uint8_t pif,
+ const union sockaddr_inany *ssa,
+ in_port_t dport)
{
struct flowside *ini = &flow->f.side[INISIDE];
@@ -771,7 +781,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
struct flow_free_cluster *free_head = NULL;
unsigned *last_next = &flow_first_free;
bool timer = false;
- unsigned idx;
+ union flow *flow;
if (timespec_diff_ms(now, &flow_timer_run) >= FLOW_TIMER_INTERVAL) {
timer = true;
@@ -780,8 +790,7 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
ASSERT(!flow_new_entry); /* Incomplete flow at end of cycle */
- for (idx = 0; idx < FLOW_MAX; idx++) {
- union flow *flow = &flowtab[idx];
+ flow_foreach_slot(flow) {
bool closed = false;
switch (flow->f.state) {
@@ -798,12 +807,12 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
} else {
/* New free cluster, add to chain */
free_head = &flow->free;
- *last_next = idx;
+ *last_next = FLOW_IDX(flow);
last_next = &free_head->next;
}
/* Skip remaining empty entries */
- idx += skip - 1;
+ flow += skip - 1;
continue;
}
@@ -856,14 +865,15 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
if (free_head) {
/* Add slot to current free cluster */
- ASSERT(idx == FLOW_IDX(free_head) + free_head->n);
+ ASSERT(FLOW_IDX(flow) ==
+ FLOW_IDX(free_head) + free_head->n);
free_head->n++;
flow->free.n = flow->free.next = 0;
} else {
/* Create new free cluster */
free_head = &flow->free;
free_head->n = 1;
- *last_next = idx;
+ *last_next = FLOW_IDX(flow);
last_next = &free_head->next;
}
} else {
@@ -875,6 +885,254 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now)
}
/**
+ * flow_migrate_source_rollback() - Disable repair mode, return failure
+ * @c: Execution context
+ * @bound: No need to roll back flow indices >= @bound
+ * @ret: Error code, handed back unchanged (callers in this file pass
+ *		both negative codes and positive errno values)
+ *
+ * Calls tcp_flow_repair_off() for every established TCP flow with an index
+ * below @bound, then repair_flush(). If any of these calls reports a failure,
+ * die() is called with a "Failed to roll back TCP_REPAIR mode" message.
+ *
+ * Return: @ret
+ */
+static int flow_migrate_source_rollback(struct ctx *c, unsigned bound, int ret)
+{
+ union flow *flow;
+
+ debug("...roll back migration");
+
+ foreach_established_tcp_flow(flow) {
+ if (FLOW_IDX(flow) >= bound)
+ break; /* assumes flows are visited in increasing index order */
+ if (tcp_flow_repair_off(c, &flow->tcp))
+ die("Failed to roll back TCP_REPAIR mode");
+ }
+
+ if (repair_flush(c))
+ die("Failed to roll back TCP_REPAIR mode");
+
+ return ret;
+}
+
+/**
+ * flow_migrate_need_repair() - Do we need to set repair mode for any flow?
+ *
+ * Repair mode is needed as soon as the iteration finds one established TCP
+ * flow: the loop body returns on the first match.
+ *
+ * Return: true if repair mode is needed, false otherwise
+ */
+static bool flow_migrate_need_repair(void)
+{
+ union flow *flow;
+
+ foreach_established_tcp_flow(flow)
+ return true; /* one established TCP flow is enough */
+
+ return false;
+}
+
+/**
+ * flow_migrate_repair_all() - Turn repair mode on or off for all flows
+ * @c: Execution context
+ * @enable: Switch repair mode on if set, off otherwise
+ *
+ * Only established TCP flows are touched. Nothing is done, and 0 is returned,
+ * if @c->fd_repair is negative. If switching a flow or the final
+ * repair_flush() fails, flow_migrate_source_rollback() is called with the
+ * current flow index as bound, whatever the value of @enable, and the error
+ * code is returned.
+ *
+ * NOTE(review): on repair_flush() failure, FLOW_IDX(flow) is evaluated after
+ * the loop has run to completion. Confirm that flow_foreach_of_type() leaves
+ * the iterator at a value FLOW_IDX() accepts (presumably one past the last
+ * slot, so that every flow is rolled back).
+ *
+ * Return: 0 on success, negative error code on failure
+ */
+static int flow_migrate_repair_all(struct ctx *c, bool enable)
+{
+ union flow *flow;
+ int rc;
+
+ /* If we don't have a repair helper, there's nothing we can do */
+ if (c->fd_repair < 0)
+ return 0;
+
+ foreach_established_tcp_flow(flow) {
+ if (enable)
+ rc = tcp_flow_repair_on(c, &flow->tcp);
+ else
+ rc = tcp_flow_repair_off(c, &flow->tcp);
+
+ if (rc) {
+ debug("Can't %s repair mode: %s",
+ enable ? "enable" : "disable", strerror_(-rc));
+ return flow_migrate_source_rollback(c, FLOW_IDX(flow),
+ rc);
+ }
+ }
+
+ if ((rc = repair_flush(c))) {
+ debug("Can't %s repair mode: %s",
+ enable ? "enable" : "disable", strerror_(-rc));
+ return flow_migrate_source_rollback(c, FLOW_IDX(flow), rc);
+ }
+
+ return 0;
+}
+
+/**
+ * flow_migrate_source_pre() - Prepare flows for migration: enable repair mode
+ * @c: Execution context
+ * @stage: Migration stage information (unused)
+ * @fd: Migration file descriptor (unused)
+ *
+ * repair_wait() is called only if at least one established TCP flow exists.
+ * NOTE(review): its outcome, if it reports one, is not checked here.
+ *
+ * Return: 0 on success, positive error code on failure
+ */
+int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage,
+ int fd)
+{
+ int rc;
+
+ (void)stage;
+ (void)fd;
+
+ if (flow_migrate_need_repair())
+ repair_wait(c);
+
+ if ((rc = flow_migrate_repair_all(c, true)))
+ return -rc; /* negative code from the helper, positive for our caller */
+
+ return 0;
+}
+
+/**
+ * flow_migrate_source() - Dump all the remaining information and send data
+ * @c: Execution context
+ * @stage: Migration stage information (unused)
+ * @fd: Migration file descriptor
+ *
+ * The data written to @fd is, in order: a 32-bit count of established TCP
+ * flows in network byte order (zero if @c->fd_repair is negative), then one
+ * tcp_flow_migrate_source() record per flow, then one
+ * tcp_flow_migrate_source_ext() record per flow.
+ *
+ * Return: 0 on success, positive error code on failure
+ */
+int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage,
+ int fd)
+{
+ uint32_t count = 0;
+ bool first = true;
+ union flow *flow;
+ int rc;
+
+ (void)c; /* NOTE(review): looks redundant, @c is used below */
+ (void)stage;
+
+ /* If we don't have a repair helper, we can't migrate TCP flows */
+ if (c->fd_repair >= 0) {
+ foreach_established_tcp_flow(flow)
+ count++;
+ }
+
+ count = htonl(count); /* from here on, count is in network byte order */
+ if (write_all_buf(fd, &count, sizeof(count))) {
+ rc = errno; /* saved before logging can overwrite it */
+ err_perror("Can't send flow count (%u)", ntohl(count));
+ return flow_migrate_source_rollback(c, FLOW_MAX, rc);
+ }
+
+ debug("Sending %u flows", ntohl(count));
+
+ if (!count) /* zero is zero in either byte order */
+ return 0;
+
+ /* Dump and send information that can be stored in the flow table.
+ *
+ * Limited rollback options here: if we fail to transfer any data (that
+ * is, on the first flow), undo everything and resume. Otherwise, the
+ * stream might now be inconsistent, and we might have closed listening
+ * TCP sockets, so just terminate.
+ */
+ foreach_established_tcp_flow(flow) {
+ rc = tcp_flow_migrate_source(fd, &flow->tcp);
+ if (rc) {
+ err("Can't send data, flow %u: %s", FLOW_IDX(flow),
+ strerror_(-rc));
+ if (!first)
+ die("Inconsistent migration state, exiting");
+
+ return flow_migrate_source_rollback(c, FLOW_MAX, -rc);
+ }
+
+ first = false;
+ }
+
+ /* And then "extended" data (including window data we saved previously):
+ * the target needs to set repair mode on sockets before it can set
+ * this stuff, but it needs sockets (and flows) for that.
+ *
+ * This also closes sockets so that the target can start connecting
+ * theirs: you can't sendmsg() to queues (using the socket) if the
+ * socket is not connected (EPIPE), not even in repair mode. And the
+ * target needs to restore queues now because we're sending the data.
+ *
+ * So, no rollback here, just try as hard as we can. Tolerate per-flow
+ * failures but not if the stream might be inconsistent (reported here
+ * as EIO).
+ */
+ foreach_established_tcp_flow(flow) {
+ rc = tcp_flow_migrate_source_ext(fd, &flow->tcp);
+ if (rc) {
+ err("Extended data for flow %u: %s", FLOW_IDX(flow),
+ strerror_(-rc));
+
+ if (rc == -EIO)
+ die("Inconsistent migration state, exiting");
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * flow_migrate_target() - Receive flows and insert in flow table
+ * @c: Execution context
+ * @stage: Migration stage information (unused)
+ * @fd: Migration file descriptor
+ *
+ * Reads a 32-bit flow count in network byte order from @fd, then calls
+ * tcp_flow_migrate_target() once per flow, and finally
+ * tcp_flow_migrate_target_ext() once per flow, on flowtab[0] to
+ * flowtab[count - 1].
+ *
+ * NOTE(review): the count comes from the migration stream and is not checked
+ * against the size of the flow table before it is used to index flowtab[].
+ * NOTE(review): the second loop presumes that the flows just received occupy
+ * the first count slots of the table: confirm that the table is empty when
+ * this runs.
+ * NOTE(review): the result of both repair_flush() calls is ignored here,
+ * whereas flow_migrate_repair_all() checks it.
+ *
+ * Return: 0 on success, positive error code on failure
+ */
+int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage,
+ int fd)
+{
+ uint32_t count;
+ unsigned i;
+ int rc;
+
+ (void)stage;
+
+ if (read_all_buf(fd, &count, sizeof(count)))
+ return errno;
+
+ count = ntohl(count); /* sent in network byte order */
+ debug("Receiving %u flows", count);
+
+ if (!count)
+ return 0;
+
+ repair_wait(c);
+
+ if ((rc = flow_migrate_repair_all(c, true)))
+ return -rc;
+
+ repair_flush(c);
+
+ /* TODO: flow header with type, instead? */
+ for (i = 0; i < count; i++) {
+ rc = tcp_flow_migrate_target(c, fd);
+ if (rc) {
+ debug("Migration data failure at flow %u: %s, abort",
+ i, strerror_(-rc));
+ return -rc;
+ }
+ }
+
+ repair_flush(c);
+
+ for (i = 0; i < count; i++) {
+ rc = tcp_flow_migrate_target_ext(c, &flowtab[i].tcp, fd);
+ if (rc) {
+ debug("Migration data failure at flow %u: %s, abort",
+ i, strerror_(-rc));
+ return -rc;
+ }
+ }
+
+ return 0;
+}
+
+/**
* flow_init() - Initialise flow related data structures
*/
void flow_init(void)