diff options
Diffstat (limited to 'flow.c')
-rw-r--r-- | flow.c | 243 |
1 files changed, 243 insertions, 0 deletions
@@ -19,6 +19,7 @@ #include "inany.h" #include "flow.h" #include "flow_table.h" +#include "repair.h" const char *flow_state_str[] = { [FLOW_STATE_FREE] = "FREE", @@ -52,6 +53,35 @@ const uint8_t flow_proto[] = { static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES, "flow_proto[] doesn't match enum flow_type"); +#define foreach_flow(i, flow, bound) \ + for ((i) = 0, (flow) = &flowtab[(i)]; \ + (i) < (bound); \ + (i)++, (flow) = &flowtab[(i)]) \ + if ((flow)->f.state == FLOW_STATE_FREE) \ + (i) += (flow)->free.n - 1; \ + else + +#define foreach_active_flow(i, flow, bound) \ + foreach_flow((i), (flow), (bound)) \ + if ((flow)->f.state != FLOW_STATE_ACTIVE) \ + /* NOLINTNEXTLINE(bugprone-branch-clone) */ \ + continue; \ + else + +#define foreach_tcp_flow(i, flow, bound) \ + foreach_active_flow((i), (flow), (bound)) \ + if ((flow)->f.type != FLOW_TCP) \ + /* NOLINTNEXTLINE(bugprone-branch-clone) */ \ + continue; \ + else + +#define foreach_established_tcp_flow(i, flow, bound) \ + foreach_tcp_flow((i), (flow), (bound)) \ + if (!tcp_flow_is_established(&(flow)->tcp)) \ + /* NOLINTNEXTLINE(bugprone-branch-clone) */ \ + continue; \ + else + /* Global Flow Table */ /** @@ -875,6 +905,219 @@ void flow_defer_handler(const struct ctx *c, const struct timespec *now) } /** + * flow_migrate_source_rollback() - Disable repair mode, return failure + * @c: Execution context + * @max_flow: Maximum index of affected flows + * @ret: Negative error code + * + * Return: @ret + */ +static int flow_migrate_source_rollback(struct ctx *c, unsigned max_flow, + int ret) +{ + union flow *flow; + unsigned i; + + debug("...roll back migration"); + + foreach_established_tcp_flow(i, flow, max_flow) + if (tcp_flow_repair_off(c, &flow->tcp)) + die("Failed to roll back TCP_REPAIR mode"); + + if (repair_flush(c)) + die("Failed to roll back TCP_REPAIR mode"); + + return ret; +} + +/** + * flow_migrate_repair_all() - Turn repair mode on or off for all flows + * @c: Execution context + * @enable: Switch repair mode on if set, off otherwise + * + * Return: 0 on success, negative error code on failure + */ +static int flow_migrate_repair_all(struct ctx *c, bool enable) +{ + union flow *flow; + unsigned i; + int rc; + + foreach_established_tcp_flow(i, flow, FLOW_MAX) { + if (enable) + rc = tcp_flow_repair_on(c, &flow->tcp); + else + rc = tcp_flow_repair_off(c, &flow->tcp); + + if (rc) { + debug("Can't %s repair mode: %s", + enable ? "enable" : "disable", strerror_(-rc)); + return flow_migrate_source_rollback(c, i, rc); + } + } + + if ((rc = repair_flush(c))) { + debug("Can't %s repair mode: %s", + enable ? "enable" : "disable", strerror_(-rc)); + return flow_migrate_source_rollback(c, i, rc); + } + + return 0; +} + +/** + * flow_migrate_source_pre() - Prepare flows for migration: enable repair mode + * @c: Execution context + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor (unused) + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_source_pre(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + int rc; + + (void)stage; + (void)fd; + + if ((rc = flow_migrate_repair_all(c, true))) + return -rc; + + return 0; +} + +/** + * flow_migrate_source() - Dump all the remaining information and send data + * @c: Execution context (unused) + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + uint32_t count = 0; + bool first = true; + union flow *flow; + unsigned i; + int rc; + + (void)c; + (void)stage; + + foreach_established_tcp_flow(i, flow, FLOW_MAX) + count++; + + count = htonl(count); + if (write_all_buf(fd, &count, sizeof(count))) { + rc = errno; + err_perror("Can't send flow count (%u)", ntohl(count)); + return flow_migrate_source_rollback(c, FLOW_MAX, rc); + } + + debug("Sending %u flows", ntohl(count)); + + /* Dump and send information that can be stored in the flow table. + * + * Limited rollback options here: if we fail to transfer any data (that + * is, on the first flow), undo everything and resume. Otherwise, the + * stream might now be inconsistent, and we might have closed listening + * TCP sockets, so just terminate. + */ + foreach_established_tcp_flow(i, flow, FLOW_MAX) { + rc = tcp_flow_migrate_source(fd, &flow->tcp); + if (rc) { + err("Can't send data, flow %u: %s", i, strerror_(-rc)); + if (!first) + die("Inconsistent migration state, exiting"); + + return flow_migrate_source_rollback(c, FLOW_MAX, -rc); + } + + first = false; + } + + /* And then "extended" data (including window data we saved previously): + * the target needs to set repair mode on sockets before it can set + * this stuff, but it needs sockets (and flows) for that. + * + * This also closes sockets so that the target can start connecting + * theirs: you can't sendmsg() to queues (using the socket) if the + * socket is not connected (EPIPE), not even in repair mode. And the + * target needs to restore queues now because we're sending the data. + * + * So, no rollback here, just try as hard as we can. Tolerate per-flow + * failures but not if the stream might be inconsistent (reported here + * as EIO). + */ + foreach_established_tcp_flow(i, flow, FLOW_MAX) { + rc = tcp_flow_migrate_source_ext(fd, i, &flow->tcp); + if (rc) { + err("Extended data for flow %u: %s", i, strerror_(-rc)); + + if (rc == -EIO) + die("Inconsistent migration state, exiting"); + } + } + + return 0; +} + +/** + * flow_migrate_target() - Receive flows and insert in flow table + * @c: Execution context + * @stage: Migration stage information (unused) + * @fd: Migration file descriptor + * + * Return: 0 on success, positive error code on failure + */ +int flow_migrate_target(struct ctx *c, const struct migrate_stage *stage, + int fd) +{ + uint32_t count; + unsigned i; + int rc; + + (void)stage; + + if (read_all_buf(fd, &count, sizeof(count))) + return errno; + + count = ntohl(count); + debug("Receiving %u flows", count); + + if ((rc = flow_migrate_repair_all(c, true))) + return -rc; + + repair_flush(c); + + /* TODO: flow header with type, instead? */ + for (i = 0; i < count; i++) { + rc = tcp_flow_migrate_target(c, fd); + if (rc) { + debug("Migration data failure at flow %u: %s, abort", + i, strerror_(-rc)); + return -rc; + } + } + + repair_flush(c); + + for (i = 0; i < count; i++) { + rc = tcp_flow_migrate_target_ext(c, flowtab + i, fd); + if (rc) { + debug("Migration data failure at flow %u: %s, abort", + i, strerror_(-rc)); + return -rc; + } + } + + return 0; +} + +/** * flow_init() - Initialise flow related data structures */ void flow_init(void) |