Worker fixes by rjarry · Pull Request #175 · DPDK/grout · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Worker fixes #175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions modules/infra/api/gr_infra.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ struct gr_infra_stat {

#define GR_INFRA_MODULE 0xacdc

//! Interface events.
//!
//! Every identifier is produced by EVENT_TYPE() from GR_INFRA_MODULE and a
//! per-event number; interface events use the numbers 0x0000 through 0x0005.
//!
//! NOTE(review): the meaning of each event (e.g. POST_ADD presumably being
//! emitted after an interface was added) is inferred from its name only; the
//! code that emits these events is not visible here -- confirm before relying
//! on ordering or timing guarantees.
typedef enum {
IFACE_EVENT_UNKNOWN = EVENT_TYPE(GR_INFRA_MODULE, 0x0000),
IFACE_EVENT_POST_ADD = EVENT_TYPE(GR_INFRA_MODULE, 0x0001),
IFACE_EVENT_PRE_REMOVE = EVENT_TYPE(GR_INFRA_MODULE, 0x0002),
IFACE_EVENT_POST_RECONFIG = EVENT_TYPE(GR_INFRA_MODULE, 0x0003),
IFACE_EVENT_STATUS_UP = EVENT_TYPE(GR_INFRA_MODULE, 0x0004),
IFACE_EVENT_STATUS_DOWN = EVENT_TYPE(GR_INFRA_MODULE, 0x0005),
} iface_event_t;

// ifaces ///////////////////////////////////////////////////////////////////////
#define GR_INFRA_IFACE_ADD REQUEST_TYPE(GR_INFRA_MODULE, 0x0001)

Expand Down Expand Up @@ -174,7 +184,7 @@ struct gr_infra_iface_set_req {

// struct gr_infra_iface_set_resp { };

// iface rxqs ///////////////////////////////////////////////////////////////////
// port rxqs ///////////////////////////////////////////////////////////////////
#define GR_INFRA_RXQ_LIST REQUEST_TYPE(GR_INFRA_MODULE, 0x0010)

// struct gr_infra_rxq_list_req { };
Expand Down Expand Up @@ -268,20 +278,5 @@ struct gr_infra_packet_trace_set_req {
#define GR_INFRA_PACKET_LOG_CLEAR REQUEST_TYPE(GR_INFRA_MODULE, 0x0044)
// struct gr_infra_packet_log_clear_req { };
// struct gr_infra_packet_log_clear_resp { };
//
typedef enum {
IFACE_EVENT_UNKNOWN = EVENT_TYPE(GR_INFRA_MODULE, 0x0000),
IFACE_EVENT_POST_ADD = EVENT_TYPE(GR_INFRA_MODULE, 0x0001),
IFACE_EVENT_PRE_REMOVE = EVENT_TYPE(GR_INFRA_MODULE, 0x0002),
IFACE_EVENT_POST_RECONFIG = EVENT_TYPE(GR_INFRA_MODULE, 0x0003),
IFACE_EVENT_STATUS_UP = EVENT_TYPE(GR_INFRA_MODULE, 0x0004),
IFACE_EVENT_STATUS_DOWN = EVENT_TYPE(GR_INFRA_MODULE, 0x0005),
} iface_event_t;

typedef enum {
NEXTHOP_EVENT_NEW = EVENT_TYPE(GR_INFRA_MODULE, 0x0100),
NEXTHOP_EVENT_DELETE = EVENT_TYPE(GR_INFRA_MODULE, 0x0101),
NEXTHOP_EVENT_UPDATE = EVENT_TYPE(GR_INFRA_MODULE, 0x0102),
} nexthop_event_t;

#endif
7 changes: 7 additions & 0 deletions modules/infra/api/gr_nexthop.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ struct gr_nexthop {
clock_t last_reply; //!< timestamp when last update was received
};

//! Nexthop events.
//!
//! Every identifier is produced by EVENT_TYPE() from GR_INFRA_MODULE and a
//! per-event number; nexthop events use the numbers 0x0100 through 0x0102.
//!
//! NOTE(review): the meaning of each event (creation, deletion, update of a
//! nexthop) is inferred from its name only; the emitting code is not visible
//! here -- confirm against the callers.
typedef enum {
NEXTHOP_EVENT_NEW = EVENT_TYPE(GR_INFRA_MODULE, 0x0100),
NEXTHOP_EVENT_DELETE = EVENT_TYPE(GR_INFRA_MODULE, 0x0101),
NEXTHOP_EVENT_UPDATE = EVENT_TYPE(GR_INFRA_MODULE, 0x0102),
} nexthop_event_t;

#define gr_nh_flags_foreach(f, flags) \
for (gr_nh_flags_t __i = 0, f = GR_BIT16(0); __i < sizeof(gr_nh_flags_t) * CHAR_BIT; \
f = GR_BIT16(++__i)) \
Expand Down
3 changes: 2 additions & 1 deletion modules/infra/control/control_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ static void control_output_init(struct event_base *ev_base) {
if (ctrlout_ev == NULL)
ABORT("event_new() failed");

pthread_create(&thread_id, &attr, cond_wait_to_event, NULL);
if (pthread_create(&thread_id, &attr, cond_wait_to_event, NULL))
ABORT("pthread_create() failed");
}

static void control_output_fini(struct event_base *) {
Expand Down
45 changes: 26 additions & 19 deletions modules/infra/control/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -229,31 +229,38 @@ static int worker_graph_new(struct worker *worker, uint8_t index) {
return errno_set(-ret);
}

int worker_graph_reload_all(void) {
struct worker *worker;
unsigned next;
int worker_graph_reload(struct worker *worker) {
unsigned next = !atomic_load(&worker->cur_config);
int ret;

STAILQ_FOREACH (worker, &workers, next) {
next = !atomic_load(&worker->cur_config);
if ((ret = worker_graph_new(worker, next)) < 0)
return errno_log(-ret, "worker_graph_new");

if ((ret = worker_graph_new(worker, next)) < 0)
return errno_log(-ret, "worker_graph_new");
// wait for datapath worker to pickup the config update
atomic_store(&worker->next_config, next);
while (atomic_load(&worker->cur_config) != next)
usleep(500);

// wait for datapath worker to pickup the config update
atomic_store_explicit(&worker->next_config, next, memory_order_release);
while (atomic_load_explicit(&worker->cur_config, memory_order_acquire) != next)
usleep(500);
// free old config
next = !next;

// free old config
next = !next;
if (worker->graph[next] != NULL) {
node_data_reset(worker->graph[next]->name);
if ((ret = rte_graph_destroy(worker->graph[next]->id)) < 0)
errno_log(-ret, "rte_graph_destroy");
worker->graph[next] = NULL;
}

if (worker->graph[next] != NULL) {
node_data_reset(worker->graph[next]->name);
if ((ret = rte_graph_destroy(worker->graph[next]->id)) < 0)
errno_log(-ret, "rte_graph_destroy");
worker->graph[next] = NULL;
}
return 0;
}

int worker_graph_reload_all(void) {
struct worker *worker;
int ret;

STAILQ_FOREACH (worker, &workers, next) {
if ((ret = worker_graph_reload(worker)) < 0)
return ret;
}

return 0;
Expand Down
1 change: 1 addition & 0 deletions modules/infra/control/graph_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <gr_worker.h>

int worker_graph_reload(struct worker *);
int worker_graph_reload_all(void);
void worker_graph_free(struct worker *);

Expand Down
27 changes: 17 additions & 10 deletions modules/infra/control/port.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,14 @@ static struct rte_eth_conf default_port_config = {
};

static void port_queue_assign(struct iface_info_port *p) {
int socket_id = SOCKET_ID_ANY;
struct worker *worker, *default_worker = NULL;
// XXX: can we assume there will never be more than 64 rxqs per port?
int socket_id = SOCKET_ID_ANY;
uint64_t rxq_ids = 0;
uint16_t txq = 0;

// XXX: can we assume there will never be more than 64 rxqs per port?
assert(p->n_rxq <= 64);

if (numa_available() != -1)
socket_id = rte_eth_dev_socket_id(p->port_id);

Expand All @@ -92,37 +94,42 @@ static void port_queue_assign(struct iface_info_port *p) {
.queue_id = txq,
.enabled = false,
};
// remove the existing txq assigned to this worker, if any
for (size_t i = 0; i < gr_vec_len(worker->txqs); i++) {
if (worker->txqs[i].port_id == p->port_id) {
// ensure no duplicates
gr_vec_del_swap(worker->txqs, i);
i--;
}
}
// assign one txq to every worker
// assign one (dedicated) txq to every worker
gr_vec_add(worker->txqs, tx_qmap);
txq++;

// walk through all assigned rxqs for this worker
for (size_t i = 0; i < gr_vec_len(worker->rxqs); i++) {
struct queue_map *qmap = &worker->rxqs[i];
if (qmap->port_id == p->port_id) {
if (qmap->queue_id < p->n_rxq) {
// rxq already assigned to a worker
rxq_ids |= 1 << qmap->queue_id;
// memorize that this rxq is already assigned
rxq_ids |= GR_BIT64(qmap->queue_id);
} else {
// remove extraneous rxq
// number of rxqs was reduced for this port
// remove the now invalid rxq from this worker
gr_vec_del_swap(worker->rxqs, i);
i--;
}
}
}
// get a handle on the default worker for unassigned rxqs
if (socket_id == SOCKET_ID_ANY || socket_id == numa_node_of_cpu(worker->cpu_id)) {
default_worker = worker;
}
}

// assign unassigned rxqs to the default worker, if any
assert(default_worker != NULL);
for (uint16_t rxq = 0; rxq < p->n_rxq; rxq++) {
if (rxq_ids & (1 << rxq))
if (rxq_ids & GR_BIT64(rxq))
continue;
struct queue_map rx_qmap = {
.port_id = p->port_id,
Expand All @@ -134,10 +141,10 @@ static void port_queue_assign(struct iface_info_port *p) {
}

static int port_configure(struct iface_info_port *p) {
int socket_id = SOCKET_ID_ANY;
struct rte_eth_conf conf = default_port_config;
uint16_t rxq_size, txq_size;
int socket_id = SOCKET_ID_ANY;
struct rte_eth_dev_info info;
uint16_t rxq_size, txq_size;
uint32_t mbuf_count;
int ret;

Expand Down
28 changes: 16 additions & 12 deletions modules/infra/control/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

#include <event2/event.h>
#include <numa.h>
#include <rte_atomic.h>
#include <rte_build_config.h>
#include <rte_common.h>
#include <rte_ethdev.h>
Expand Down Expand Up @@ -68,7 +67,7 @@ int worker_create(unsigned cpu_id) {
STAILQ_INSERT_TAIL(&workers, worker, next);

// wait until thread has initialized lcore_id
while (!atomic_load_explicit(&worker->started, memory_order_acquire))
while (!atomic_load(&worker->started))
usleep(500);

pthread_attr_destroy(&attr);
Expand All @@ -84,7 +83,7 @@ int worker_destroy(unsigned cpu_id) {

STAILQ_REMOVE(&workers, worker, worker, next);

atomic_store_explicit(&worker->shutdown, true, memory_order_release);
atomic_store(&worker->shutdown, true);
pthread_join(worker->thread, NULL);
worker_graph_free(worker);
gr_vec_free(worker->rxqs);
Expand Down Expand Up @@ -217,8 +216,8 @@ int port_plug(uint16_t port_id) {

int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) {
struct worker *src_worker, *dst_worker;
bool num_workers_changed;
struct queue_map *qmap;
bool reconfig;
int ret;

if (cpu_id == rte_get_main_lcore())
Expand All @@ -239,7 +238,7 @@ int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) {
}
return errno_set(ENODEV);
move:
reconfig = false;
num_workers_changed = false;

// prepare destination worker
dst_worker = worker_find(cpu_id);
Expand All @@ -248,7 +247,7 @@ int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) {
if ((ret = worker_create(cpu_id)) < 0)
return ret;
dst_worker = worker_find(cpu_id);
reconfig = true;
num_workers_changed = true;
}
if (dst_worker == NULL)
return errno_set(errno);
Expand All @@ -266,20 +265,23 @@ int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) {
if (gr_vec_len(src_worker->rxqs) == 0) {
if ((ret = worker_destroy(src_worker->cpu_id)) < 0)
return ret;
reconfig = true;
num_workers_changed = true;
} else {
// ensure source worker has released the rxq
if ((ret = worker_graph_reload(src_worker)) < 0)
return ret;
}

// assign to dst_worker *before* reconfiguring ports
// to avoid the dangling rxq to be assigned twice
// now it is safe to assign rxq to dst_worker
struct queue_map rx_qmap = {
.port_id = port_id,
.queue_id = rxq_id,
.enabled = true,
};
gr_vec_add(dst_worker->rxqs, rx_qmap);

if (reconfig) {
// number of workers changed, adjust number of tx queues
if (num_workers_changed) {
// adjust number of tx queues
struct gr_iface_info_port p = {0};
struct iface *iface = NULL;

Expand All @@ -294,9 +296,11 @@ int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) {
if (ret < 0)
return ret;
}
// all workers were reloaded already
return 0;
}

return worker_graph_reload_all();
return worker_graph_reload(dst_worker);
}

static int lcore_usage_cb(unsigned int lcore_id, struct rte_lcore_usage *usage) {
Expand Down
2 changes: 2 additions & 0 deletions modules/infra/control/worker_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct iface *iface_next(gr_iface_type_t /*type_id*/, const struct iface *prev)
return NULL;
}

mock_func(int, worker_graph_reload(struct worker *));
mock_func(int, worker_graph_reload_all(void));
mock_func(void, worker_graph_free(struct worker *));
mock_func(void *, gr_datapath_loop(void *));
Expand Down Expand Up @@ -179,6 +180,7 @@ static int teardown(void **) {

static void common_mocks(void) {
will_return_maybe(worker_graph_free, 0);
will_return_maybe(worker_graph_reload, 0);
will_return_maybe(worker_graph_reload_all, 0);
will_return_maybe(__wrap_pthread_create, 0);
will_return_maybe(__wrap_pthread_join, 0);
Expand Down
11 changes: 5 additions & 6 deletions modules/infra/datapath/main_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <gr_module.h>
#include <gr_worker.h>

#include <rte_atomic.h>
#include <rte_common.h>
#include <rte_eal.h>
#include <rte_errno.h>
Expand Down Expand Up @@ -151,15 +150,15 @@ void *gr_datapath_loop(void *priv) {
static_assert(atomic_is_lock_free(&w->shutdown));
static_assert(atomic_is_lock_free(&w->cur_config));
static_assert(atomic_is_lock_free(&w->stats_reset));
atomic_store_explicit(&w->started, true, memory_order_release);
atomic_store(&w->started, true);

reconfig:
if (w->shutdown)
if (atomic_load(&w->shutdown))
goto shutdown;

cur = atomic_load_explicit(&w->next_config, memory_order_acquire);
cur = atomic_load(&w->next_config);
graph = w->graph[cur];
atomic_store_explicit(&w->cur_config, cur, memory_order_release);
atomic_store(&w->cur_config, cur);

if (graph == NULL) {
usleep(1000);
Expand Down Expand Up @@ -188,7 +187,7 @@ void *gr_datapath_loop(void *priv) {
rte_graph_cluster_stats_get(ctx.stats, false);
timestamp_tmp = rte_rdtsc();
cycles = timestamp_tmp - timestamp;
max_sleep_us = atomic_load_explicit(&w->max_sleep_us, memory_order_relaxed);
max_sleep_us = atomic_load(&w->max_sleep_us);
if (ctx.last_count == 0 && max_sleep_us > 0) {
sleep = sleep == max_sleep_us ? sleep : (sleep + 1);
usleep(sleep);
Expand Down
0