Fix scheduler issues on NUMA servers by Bouncner · Pull Request #2610 · hyrise/hyrise · GitHub

Fix scheduler issues on NUMA servers #2610

Merged
merged 28 commits into master from martin/fix/numa_nodes on Oct 8, 2023

Changes from all commits
28 commits
97cdbc9
Jo
Bouncner Aug 24, 2023
ffbc337
blub
Bouncner Aug 24, 2023
85a7cde
Jojo
Bouncner Aug 24, 2023
4c3dc44
Working
Bouncner Aug 24, 2023
63d80ab
Narf
Bouncner Aug 24, 2023
741cfdd
Update src/lib/scheduler/node_queue_scheduler.cpp
dey4ss Aug 24, 2023
b70b061
Update src/lib/scheduler/node_queue_scheduler.cpp
dey4ss Aug 24, 2023
9888446
Update node_queue_scheduler.cpp
Bouncner Aug 24, 2023
434a77e
Update hyriseServer_test.py
Bouncner Aug 25, 2023
5024c4a
add scheduler shutdown test with spinning task
dey4ss Aug 25, 2023
14d26c5
next timeout
dey4ss Aug 26, 2023
1952cb3
increase scheduler timeout, fix test
dey4ss Aug 26, 2023
e58fc40
Remove stress test for now. We cannot wait sufficiently long.
Bouncner Sep 19, 2023
fa08b65
correct use of shutdown delay
dey4ss Sep 19, 2023
adb333e
Update hyriseServer_test.py
Bouncner Sep 20, 2023
e6b4274
Update hyriseServer_test.py
Bouncner Sep 20, 2023
05ab60d
Update hyriseServer_test.py
Bouncner Sep 20, 2023
4e59e96
Update node_queue_scheduler.cpp
Bouncner Sep 20, 2023
482d8b2
Update node_queue_scheduler.cpp
Bouncner Sep 20, 2023
47fa090
Update node_queue_scheduler.cpp
Bouncner Sep 20, 2023
505bd4b
Update node_queue_scheduler.cpp
Bouncner Sep 22, 2023
2d89020
Update node_queue_scheduler.cpp
Bouncner Sep 25, 2023
42bda4f
Intermediate
Bouncner Oct 3, 2023
f6facfa
Merge, merge, merge
Bouncner Oct 4, 2023
a97db92
Comment
Bouncner Oct 5, 2023
9355763
Merge branch 'master' into martin/fix/numa_nodes
dey4ss Oct 6, 2023
9668796
Merge branch 'master' into martin/fix/numa_nodes
dey4ss Oct 6, 2023
eb20bac
Move.
Bouncner Oct 6, 2023
110 changes: 84 additions & 26 deletions src/lib/scheduler/node_queue_scheduler.cpp
@@ -1,9 +1,13 @@
#include "node_queue_scheduler.hpp"

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

@@ -12,6 +16,7 @@
#include "job_task.hpp"
#include "shutdown_task.hpp"
#include "task_queue.hpp"
#include "types.hpp"
#include "uid_allocator.hpp"
#include "utils/assert.hpp"
#include "worker.hpp"
@@ -35,25 +40,32 @@ void NodeQueueScheduler::begin() {

_workers.reserve(Hyrise::get().topology.num_cpus());
_node_count = Hyrise::get().topology.nodes().size();
_queues.reserve(_node_count);
_queues.resize(_node_count);
_workers_per_node.reserve(_node_count);

for (auto node_id = NodeID{0}; node_id < Hyrise::get().topology.nodes().size(); ++node_id) {
auto queue = std::make_shared<TaskQueue>(node_id);

_queues.emplace_back(queue);

const auto& topology_node = Hyrise::get().topology.nodes()[node_id];

for (const auto& topology_cpu : topology_node.cpus) {
_workers.emplace_back(
std::make_shared<Worker>(queue, WorkerID{_worker_id_allocator->allocate()}, topology_cpu.cpu_id));
}

// Tracked per node as core restrictions can lead to unbalanced core counts.
_workers_per_node.emplace_back(topology_node.cpus.size());

// Only create queues for nodes with CPUs assigned. Otherwise, no workers are active on these nodes and we might
// add tasks to these queues that can never be directly pulled and must be stolen by other nodes' workers. As
// ShutdownTasks are not stealable, placing tasks on nodes without workers can lead to failing shutdowns.
if (!topology_node.cpus.empty()) {
_active_nodes.push_back(node_id);
auto queue = std::make_shared<TaskQueue>(node_id);
_queues[node_id] = queue;

for (const auto& topology_cpu : topology_node.cpus) {
// TODO(anybody): Place queues on the actual NUMA node once we have NUMA-aware allocators.
_workers.emplace_back(
std::make_shared<Worker>(queue, WorkerID{_worker_id_allocator->allocate()}, topology_cpu.cpu_id));
}
}
}

Assert(!_active_nodes.empty(), "None of the system nodes has active workers.");
_active = true;

for (auto& worker : _workers) {
@@ -63,7 +75,24 @@ void NodeQueueScheduler::begin() {
}

void NodeQueueScheduler::wait_for_all_tasks() {
// To check if the system is still processing incoming jobs, we store the previous task count and loop-wait until no
// new jobs are created anymore.
auto previous_task_count = TaskID::base_type{_task_counter.load()};

auto progressless_loop_count = size_t{0};
auto previous_finished_task_count = TaskID::base_type{0};

while (true) {
const auto current_task_count = _task_counter.load();
if (current_task_count > previous_task_count) {
// System is still processing new tasks (can happen when, e.g., currently running tasks schedule new tasks):
// loop-wait until task counter is stable (this check can still fail in edge cases, but we make the simple
// assumption that nobody calls wait_for_all_tasks() if there is still significant processing ongoing).
previous_task_count = current_task_count;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

auto num_finished_tasks = uint64_t{0};
for (const auto& worker : _workers) {
num_finished_tasks += worker->num_finished_tasks();
@@ -73,18 +102,43 @@
break;
}

// Ensure we do not wait forever for tasks that cannot be processed or are stuck. We currently wait 1 hour (3600
// seconds). This wait time allows us to run TPC-H with scale factor 1000 and two cores without issues, which we
// consider acceptable right now. If large scale factors or slower data access paths (e.g., data on secondary
// storage) become relevant, the current mechanism and general query processing probably need to be re-evaluated
// (e.g., ensure operators split their work into smaller tasks).
if (progressless_loop_count >= 360'000) {
const auto remaining_task_count = _task_counter - num_finished_tasks;
auto message = std::stringstream{};
// We waited for 1 h (360'000 * 10 ms).
message << "Timeout: no progress while waiting for all scheduled tasks to be processed. " << remaining_task_count
<< " task(s) still remaining without progress for 1 h now, quitting.";
Fail(message.str());
}

if (previous_finished_task_count == num_finished_tasks) {
++progressless_loop_count;
} else {
previous_finished_task_count = num_finished_tasks;
progressless_loop_count = 0;
}

std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

for (auto& queue : _queues) {
for (const auto& queue : _queues) {
if (!queue) {
continue;
}

auto queue_check_runs = size_t{0};
while (!queue->empty()) {
// The following assert checks that we are not looping forever. The empty() check can be inaccurate for
// concurrent queues when many tiny tasks have been scheduled (see MergeSort scheduler test). When this assert is
// triggered in other situations, there have probably been new tasks added after wait_for_all_tasks() was called.
Assert(queue_check_runs < 1'000, "Queue is not empty but all registered tasks have already been processed.");
Assert(queue_check_runs < 6'000, "Queue is not empty but all registered tasks have already been processed.");

std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
++queue_check_runs;
}
}
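
As a quick sanity check of the constants used above: one 10 ms sleep per progressless iteration and a limit of 360'000 iterations multiply out to exactly one hour. A minimal standalone sketch of that arithmetic (the constant names are illustrative, not taken from the codebase):

#include <chrono>
#include <cstdint>

// Illustrative constants mirroring the values used in wait_for_all_tasks():
// one 10 ms sleep per check and up to 360'000 checks without progress.
constexpr auto kSleepInterval = std::chrono::milliseconds{10};
constexpr auto kProgresslessLoopLimit = std::int64_t{360'000};

// 360'000 * 10 ms == 3'600'000 ms == 3'600 s == 1 h.
static_assert(kProgresslessLoopLimit * kSleepInterval == std::chrono::hours{1},
              "The progressless-loop timeout should equal one hour.");
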
@@ -112,8 +166,8 @@ void NodeQueueScheduler::finish() {

auto check_runs = size_t{0};
while (_active_worker_count.load() > 0) {
Assert(check_runs < 1'000, "Timeout: not all shut down tasks have been processed.");
std::this_thread::sleep_for(std::chrono::milliseconds(1));
Assert(check_runs < 3'000, "Timeout: not all shut down tasks have been processed.");
std::this_thread::sleep_for(std::chrono::milliseconds(10));
++check_runs;
}

@@ -162,9 +216,11 @@ void NodeQueueScheduler::schedule(std::shared_ptr<AbstractTask> task, NodeID pre
}

NodeID NodeQueueScheduler::determine_queue_id(const NodeID preferred_node_id) const {
const auto active_node_count = _active_nodes.size();

// Early out: no need to check for preferred node or other queues, if there is only a single node queue.
if (_node_count == 1) {
return NodeID{0};
if (active_node_count == 1) {
return _active_nodes[0];
}

if (preferred_node_id != CURRENT_NODE_ID) {
@@ -177,19 +233,21 @@ NodeID NodeQueueScheduler::determine_queue_id(const NodeID preferred_node_id) co
return worker->queue()->node_id();
}

// Initial min values with Node 0.
auto min_load_node_id = NodeID{0};
auto min_load = _queues[0]->estimate_load();
// Initialize minimal values with first active node.
auto min_load_node_id = _active_nodes[0];
auto min_load = _queues[min_load_node_id]->estimate_load();

// When the current load of node 0 is small (less tasks than threads on first node), do not check other queues.
if (min_load < _workers_per_node[0]) {
return NodeID{0};
// When the load of the initial node is small (fewer tasks than threads on that node), do not check other queues.
if (min_load < _workers_per_node[min_load_node_id]) {
return min_load_node_id;
}

for (auto node_id = NodeID{1}; node_id < _node_count; ++node_id) {
// Check remaining nodes.
for (auto node_id_offset = size_t{1}; node_id_offset < active_node_count; ++node_id_offset) {
const auto node_id = _active_nodes[node_id_offset];
const auto queue_load = _queues[node_id]->estimate_load();
if (queue_load < min_load) {
min_load_node_id = node_id;
min_load_node_id = node_id;
min_load = queue_load;
}
}
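
Condensed into a standalone function, the node-selection logic above looks roughly like the following sketch. The containers stand in for the scheduler's queues, per-queue load estimates, and per-node worker counts; this is a simplified restatement, not the PR's exact code:

#include <cstddef>
#include <cstdint>
#include <vector>

using NodeID = std::uint32_t;

NodeID pick_least_loaded_node(const std::vector<NodeID>& active_nodes,
                              const std::vector<std::size_t>& queue_loads,
                              const std::vector<std::size_t>& workers_per_node) {
  // Early out: with a single active node, there is nothing to compare.
  if (active_nodes.size() == 1) {
    return active_nodes[0];
  }

  // Start with the first *active* node rather than node 0, which might have no
  // workers at all (e.g., when cores are restricted to a subset of NUMA nodes).
  auto min_load_node_id = active_nodes[0];
  auto min_load = queue_loads[min_load_node_id];

  // If the first active node is not saturated, skip scanning the other queues.
  if (min_load < workers_per_node[min_load_node_id]) {
    return min_load_node_id;
  }

  for (auto offset = std::size_t{1}; offset < active_nodes.size(); ++offset) {
    const auto node_id = active_nodes[offset];
    if (queue_loads[node_id] < min_load) {
      min_load_node_id = node_id;
      min_load = queue_loads[node_id];
    }
  }
  return min_load_node_id;
}
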
@@ -208,7 +266,7 @@ void NodeQueueScheduler::_group_tasks(const std::vector<std::shared_ptr<Abstract
auto round_robin_counter = 0;
auto common_node_id = std::optional<NodeID>{};

std::vector<std::shared_ptr<AbstractTask>> grouped_tasks(NUM_GROUPS);
auto grouped_tasks = std::vector<std::shared_ptr<AbstractTask>>(NUM_GROUPS);
for (const auto& task : tasks) {
if (!task->predecessors().empty() || !task->successors().empty() || dynamic_cast<ShutdownTask*>(&*task)) {
// Do not group tasks that either have predecessors/successors or are ShutdownTasks.
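
The _group_tasks() hunk above distributes independent tasks round-robin over a fixed number of groups and leaves tasks with predecessors, successors, or ShutdownTask type ungrouped. A self-contained sketch of that pattern (the is_groupable predicate is a placeholder for those checks):

#include <cstddef>
#include <vector>

// Distribute groupable items round-robin into num_groups buckets. Items that
// the predicate rejects (in the scheduler: tasks with predecessors/successors
// or ShutdownTasks) are left out and would be scheduled individually.
template <typename T, typename Predicate>
std::vector<std::vector<T>> group_round_robin(const std::vector<T>& items,
                                              const std::size_t num_groups,
                                              Predicate is_groupable) {
  auto groups = std::vector<std::vector<T>>(num_groups);
  if (num_groups == 0) {
    return groups;
  }

  auto round_robin_counter = std::size_t{0};
  for (const auto& item : items) {
    if (!is_groupable(item)) {
      continue;
    }
    groups[round_robin_counter % num_groups].push_back(item);
    ++round_robin_counter;
  }
  return groups;
}
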
1 change: 1 addition & 0 deletions src/lib/scheduler/node_queue_scheduler.hpp
@@ -98,6 +98,7 @@ class NodeQueueScheduler : public AbstractScheduler {
std::shared_ptr<UidAllocator> _worker_id_allocator;
std::vector<std::shared_ptr<TaskQueue>> _queues;
std::vector<std::shared_ptr<Worker>> _workers;
std::vector<NodeID> _active_nodes;

std::atomic_bool _active{false};
std::atomic_int64_t _active_worker_count{0};
2 changes: 1 addition & 1 deletion src/lib/scheduler/worker.cpp
@@ -80,7 +80,7 @@ void Worker::_work(const AllowSleep allow_sleep) {
if (!task) {
// Simple work stealing without explicitly transferring data between nodes.
for (const auto& queue : Hyrise::get().scheduler()->queues()) {
if (queue == _queue) {
if (!queue || queue == _queue) {
continue;
}

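
Because _queues is now sized for all topology nodes but only populated for nodes that actually have workers, every consumer of the vector has to tolerate null entries, which is what the added check in the work-stealing loop does. A simplified sketch of that shape (Task, TaskQueue, and steal() are stubs, not Hyrise's real interfaces):

#include <memory>
#include <vector>

struct Task {};

struct TaskQueue {
  // Stub for the real queue's steal operation; returns nullptr if empty.
  std::shared_ptr<Task> steal() { return nullptr; }
};

// Try to steal a task from the other nodes' queues. Slots belonging to nodes
// without workers stay nullptr and must be skipped before dereferencing.
std::shared_ptr<Task> steal_from_other_queues(
    const std::vector<std::shared_ptr<TaskQueue>>& queues,
    const std::shared_ptr<TaskQueue>& own_queue) {
  for (const auto& queue : queues) {
    if (!queue || queue == own_queue) {
      continue;
    }
    if (auto task = queue->steal()) {
      return task;
    }
  }
  return nullptr;
}
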