diff options
Diffstat (limited to 'flow.c')
| -rw-r--r-- | flow.c | 123 |
1 file changed, 90 insertions, 33 deletions
@@ -20,6 +20,7 @@ #include "flow.h" #include "flow_table.h" #include "repair.h" +#include "epoll_ctl.h" const char *flow_state_str[] = { [FLOW_STATE_FREE] = "FREE", @@ -53,6 +54,16 @@ const uint8_t flow_proto[] = { static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES, "flow_proto[] doesn't match enum flow_type"); +static const enum epoll_type flow_epoll[] = { + [FLOW_TCP] = EPOLL_TYPE_TCP, + [FLOW_TCP_SPLICE] = EPOLL_TYPE_TCP_SPLICE, + [FLOW_PING4] = EPOLL_TYPE_PING, + [FLOW_PING6] = EPOLL_TYPE_PING, + [FLOW_UDP] = EPOLL_TYPE_UDP, +}; +static_assert(ARRAY_SIZE(flow_epoll) == FLOW_NUM_TYPES, + "flow_epoll[] doesn't match enum flow_type"); + #define foreach_established_tcp_flow(flow) \ flow_foreach_of_type((flow), FLOW_TCP) \ if (!tcp_flow_is_established(&(flow)->tcp)) \ @@ -116,7 +127,7 @@ static_assert(ARRAY_SIZE(flow_proto) == FLOW_NUM_TYPES, unsigned flow_first_free; union flow flowtab[FLOW_MAX]; static const union flow *flow_new_entry; /* = NULL */ -static int epoll_id_to_fd[EPOLLFD_ID_MAX]; +static int epoll_id_to_fd[EPOLLFD_ID_SIZE]; /* Hash table to index it */ #define FLOW_HASH_LOAD 70 /* % */ @@ -341,17 +352,6 @@ static void flow_set_state(struct flow_common *f, enum flow_state state) } /** - * flow_in_epoll() - Check if flow is registered with an epoll instance - * @f: Flow to check - * - * Return: true if flow is registered with epoll, false otherwise - */ -bool flow_in_epoll(const struct flow_common *f) -{ - return f->epollid != EPOLLFD_ID_INVALID; -} - -/** * flow_epollfd() - Get the epoll file descriptor for a flow * @f: Flow to query * @@ -359,8 +359,6 @@ bool flow_in_epoll(const struct flow_common *f) */ int flow_epollfd(const struct flow_common *f) { - ASSERT(f->epollid < EPOLLFD_ID_MAX); - return epoll_id_to_fd[f->epollid]; } @@ -371,18 +369,35 @@ int flow_epollfd(const struct flow_common *f) */ void flow_epollid_set(struct flow_common *f, int epollid) { - ASSERT(epollid < EPOLLFD_ID_MAX); + ASSERT(epollid < EPOLLFD_ID_SIZE); f->epollid = 
epollid; } /** - * flow_epollid_clear() - Clear the flow epoll id - * @f: Flow to update + * flow_epoll_set() - Add or modify epoll registration for a flow socket + * @f: Flow to register socket for + * @command: epoll_ctl() command: EPOLL_CTL_ADD or EPOLL_CTL_MOD + * @events: epoll events to watch for + * @fd: File descriptor to register + * @sidei: Side index of the flow + * + * Return: 0 on success, -1 on error (from epoll_ctl()) */ -void flow_epollid_clear(struct flow_common *f) +int flow_epoll_set(const struct flow_common *f, int command, uint32_t events, + int fd, unsigned int sidei) { - f->epollid = EPOLLFD_ID_INVALID; + struct epoll_event ev; + union epoll_ref ref; + + ref.fd = fd; + ref.type = flow_epoll[f->type]; + ref.flowside = flow_sidx(f, sidei); + + ev.events = events; + ev.data.u64 = ref.u64; + + return epoll_ctl(flow_epollfd(f), command, fd, &ev); } /** @@ -392,7 +407,7 @@ void flow_epollid_clear(struct flow_common *f) */ void flow_epollid_register(int epollid, int epollfd) { - ASSERT(epollid < EPOLLFD_ID_MAX); + ASSERT(epollid < EPOLLFD_ID_SIZE); epoll_id_to_fd[epollid] = epollfd; } @@ -477,17 +492,20 @@ struct flowside *flow_initiate_sa(union flow *flow, uint8_t pif, * flow_target() - Determine where flow should forward to, and move to TGT * @c: Execution context * @flow: Flow to forward + * @rule_hint: Index of relevant forwarding rule, or -1 if unknown * @proto: Protocol * * Return: pointer to the target flowside information */ struct flowside *flow_target(const struct ctx *c, union flow *flow, - uint8_t proto) + int rule_hint, uint8_t proto) { - char estr[INANY_ADDRSTRLEN], fstr[INANY_ADDRSTRLEN]; + char estr[INANY_ADDRSTRLEN], ostr[INANY_ADDRSTRLEN]; struct flow_common *f = &flow->f; const struct flowside *ini = &f->side[INISIDE]; struct flowside *tgt = &f->side[TGTSIDE]; + const struct fwd_rule *rule = NULL; + const struct fwd_ports *fwd; uint8_t tgtpif = PIF_NONE; ASSERT(flow_new_entry == flow && f->state == FLOW_STATE_INI); @@ -502,29 
+520,57 @@ struct flowside *flow_target(const struct ctx *c, union flow *flow, break; case PIF_SPLICE: - tgtpif = fwd_nat_from_splice(c, proto, ini, tgt); + if (proto == IPPROTO_TCP) + fwd = &c->tcp.fwd_out; + else if (proto == IPPROTO_UDP) + fwd = &c->udp.fwd_out; + else + goto nofwd; + + if (!(rule = fwd_rule_search(fwd, ini, rule_hint))) + goto norule; + + tgtpif = fwd_nat_from_splice(rule, proto, ini, tgt); break; case PIF_HOST: - tgtpif = fwd_nat_from_host(c, proto, ini, tgt); + if (proto == IPPROTO_TCP) + fwd = &c->tcp.fwd_in; + else if (proto == IPPROTO_UDP) + fwd = &c->udp.fwd_in; + else + goto nofwd; + + if (!(rule = fwd_rule_search(fwd, ini, rule_hint))) + goto norule; + + tgtpif = fwd_nat_from_host(c, rule, proto, ini, tgt); fwd_neigh_mac_get(c, &tgt->oaddr, f->tap_omac); break; - default: - flow_err(flow, "No rules to forward %s [%s]:%hu -> [%s]:%hu", - pif_name(f->pif[INISIDE]), - inany_ntop(&ini->eaddr, estr, sizeof(estr)), - ini->eport, - inany_ntop(&ini->oaddr, fstr, sizeof(fstr)), - ini->oport); + goto nofwd; } if (tgtpif == PIF_NONE) - return NULL; + goto nofwd; f->pif[TGTSIDE] = tgtpif; flow_set_state(f, FLOW_STATE_TGT); return tgt; + +norule: + /* This shouldn't happen, because if there's no rule for it we should + * have no listening socket that would let us get here + */ + flow_dbg(flow, "Missing forward rule"); + flow_log_details_(f, LOG_DEBUG, f->state); + +nofwd: + flow_err(flow, "No rules to forward %s %s [%s]:%hu -> [%s]:%hu", + pif_name(f->pif[INISIDE]), ipproto_name(proto), + inany_ntop(&ini->eaddr, estr, sizeof(estr)), ini->eport, + inany_ntop(&ini->oaddr, ostr, sizeof(ostr)), ini->oport); + return NULL; } /** @@ -600,7 +646,6 @@ union flow *flow_alloc(void) flow_new_entry = flow; memset(flow, 0, sizeof(*flow)); - flow_epollid_clear(&flow->f); flow_set_state(&flow->f, FLOW_STATE_NEW); return flow; @@ -978,6 +1023,9 @@ static int flow_migrate_source_rollback(struct ctx *c, unsigned bound, int ret) debug("...roll back migration"); + if 
(fwd_listen_sync(c, &c->tcp.fwd_in, PIF_HOST, IPPROTO_TCP) < 0) + die("Failed to re-establish listening sockets"); + foreach_established_tcp_flow(flow) { if (FLOW_IDX(flow) >= bound) break; @@ -1102,6 +1150,15 @@ int flow_migrate_source(struct ctx *c, const struct migrate_stage *stage, return flow_migrate_source_rollback(c, FLOW_MAX, rc); } + /* HACK: A local to local migrate will fail if the origin passt has the + * listening sockets still open when the destination passt tries to bind + * them. This does mean there's a window where we lost our listen()s, + * even if the migration is rolled back later. The only way to really + * fix that is to not allow local to local migration, which arguably we + * should (use namespaces for testing instead). */ + debug("Stop listen()s"); + fwd_listen_close(&c->tcp.fwd_in); + debug("Sending %u flows", ntohl(count)); if (!count) |
