openmp: implement real openmp fork/join model by csegarragonz · Pull Request #814 · faasm/faasm · GitHub

openmp: implement real openmp fork/join model #814


Merged: 20 commits, merged on Jan 16, 2024

Commits (20)
e088a09
openmp: implement real openmp fork/join model
csegarragonz Jan 8, 2024
3530576
openmp: new fork-join model works now
csegarragonz Jan 9, 2024
e51ca62
openmp: fix fork-join for teams of size 1
csegarragonz Jan 10, 2024
7f57ce6
gh: bump faabric version
csegarragonz Jan 10, 2024
a3fc460
nit: remove comment
csegarragonz Jan 10, 2024
ff98ea9
runner(microbench): set singlehost flag in the request
csegarragonz Jan 10, 2024
da1fd46
openmp: do not pre-load scheduling decision (we do not need to anymore)
csegarragonz Jan 10, 2024
54c5217
tests(openmp): set singlehost flag in the request
csegarragonz Jan 10, 2024
c316ab6
tests(openmp): disable exact error message text checking
csegarragonz Jan 10, 2024
c714c27
openmp: propagate thisThreadReq downstream
csegarragonz Jan 10, 2024
8c47550
openmp: adapt dirty tracking mechanism to new fork-join model
csegarragonz Jan 11, 2024
ae5610a
nit: refactor to singlehosthint
csegarragonz Jan 11, 2024
96c98d3
dist-tests: override the cpu count when we set the local slots
csegarragonz Jan 11, 2024
da0db39
tests: set singlehosthint appropriately
csegarragonz Jan 11, 2024
8796616
faasmctl: bump to version 0.25.0
csegarragonz Jan 16, 2024
c4e414b
threads: fix thread execution
csegarragonz Jan 16, 2024
7fc8f86
tests(openmp): set singlehosthint in nested omp test
csegarragonz Jan 16, 2024
6d12053
faabric: fix tsan error
csegarragonz Jan 16, 2024
2ba090d
faabric: bump dep after merge to main
csegarragonz Jan 16, 2024
041a0ba
nits: run clang format
csegarragonz Jan 16, 2024
Files changed
4 changes: 2 additions & 2 deletions .env
@@ -2,8 +2,8 @@ FAASM_VERSION=0.18.0
FAASM_CLI_IMAGE=faasm.azurecr.io/cli:0.18.0
FAASM_WORKER_IMAGE=faasm.azurecr.io/worker:0.18.0

FAABRIC_VERSION=0.12.0
FAABRIC_PLANNER_IMAGE=faasm.azurecr.io/planner:0.12.0
FAABRIC_VERSION=0.13.0
FAABRIC_PLANNER_IMAGE=faasm.azurecr.io/planner:0.13.0

CPP_VERSION=0.3.1
CPP_CLI_IMAGE=faasm.azurecr.io/cpp-sysroot:0.3.1
2 changes: 1 addition & 1 deletion .github/workflows/azure.yml
@@ -56,7 +56,7 @@ jobs:
./bin/inv_wrapper.sh cluster.credentials --name ${{ env.CLUSTER_NAME }}
working-directory: ${{ github.workspace }}/experiment-base
- name: "Install faasmctl"
run: source ./bin/workon.sh && pip3 install faasmctl==0.24.0
run: source ./bin/workon.sh && pip3 install faasmctl==0.25.0
working-directory: ${{ github.workspace }}/experiment-base
- name: "Deploy Faasm on k8s cluster"
run: source ./bin/workon.sh && faasmctl deploy.k8s --workers=4
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
@@ -547,7 +547,7 @@ jobs:
with:
submodules: true
- name: "Install faasmctl"
run: pip3 install faasmctl==0.24.0
run: pip3 install faasmctl==0.25.0
# Cache contains architecture-specific machine code
- name: "Get CPU model name"
run: echo "CPU_MODEL=$(./bin/print_cpu.sh)" >> $GITHUB_ENV
@@ -620,7 +620,7 @@
build-type: release
if: matrix.detached == false
- name: "Install faasmctl"
run: pip3 install faasmctl==0.24.0
run: pip3 install faasmctl==0.25.0
- name: "Fetch python's CPP submodulle"
run: git submodule update --init -f third-party/cpp
working-directory: ${{ github.workspace }}/clients/python
2 changes: 1 addition & 1 deletion deploy/k8s-common/planner.yml
@@ -11,7 +11,7 @@ metadata:
spec:
containers:
- name: planner
image: faasm.azurecr.io/planner:0.12.0
image: faasm.azurecr.io/planner:0.13.0
ports:
- containerPort: 8081
env:
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,6 +1,6 @@
black>=23.12.0
breathe>=4.35.0
faasmctl==0.24.0
faasmctl==0.25.0
flake8>=7.0.0
invoke>=2.0.0
myst_parser>=2.0.0
1 change: 1 addition & 0 deletions src/runner/MicrobenchRunner.cpp
@@ -63,6 +63,7 @@ int MicrobenchRunner::doRun(std::ofstream& outFs,

auto req = createBatchRequest(user, function, inputData);
faabric::Message msg = req->messages().at(0);
req->set_singlehosthint(true);

// Check files have been uploaded
storage::FileLoader& loader = storage::getFileLoader();
14 changes: 11 additions & 3 deletions src/wasm/WasmModule.cpp
@@ -520,10 +520,10 @@ int WasmModule::awaitPthreadCall(faabric::Message* msg, int pthreadPtr)
req->set_type(faabric::BatchExecuteRequest::THREADS);
req->set_subtype(wasm::ThreadRequestType::PTHREAD);

// In the local tests, we always set the single-host flag to avoid
// In the local tests, we always set the single-host hint to avoid
// having to synchronise snapshots
if (faabric::util::isTestMode()) {
req->set_singlehost(true);
req->set_singlehosthint(true);
}

for (int i = 0; i < nPthreadCalls; i++) {
@@ -547,8 +547,16 @@ int WasmModule::awaitPthreadCall(faabric::Message* msg, int pthreadPtr)
pthreadPtrsToChainedCalls.insert({ p.pthreadPtr, m.id() });
}

std::shared_ptr<faabric::util::SnapshotData> snap = nullptr;
if (!req->singlehosthint()) {
snap = executor->getMainThreadSnapshot(*msg, true);
}

// Execute the threads and await results
lastPthreadResults = executor->executeThreads(req, mergeRegions);
faabric::planner::getPlannerClient().callFunctions(req);
lastPthreadResults =
faabric::scheduler::getScheduler().awaitThreadResults(
req, 10 * faabric::util::getSystemConfig().boundTimeout);

// Empty the queue
queuedPthreadCalls.clear();
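The net effect of the hunk above: pthread batches are no longer run through the executor's own executeThreads() helper. Instead the module takes a main-thread snapshot only when the batch may leave the host (the single-host hint skips this), submits the batch to the planner, and then blocks on the per-thread results. A condensed sketch of that flow is below, using only the calls visible in the diff; the free-standing wrapper function and its signature are illustrative assumptions, not code from this PR.

// Sketch: dispatch a THREADS batch and wait for its results (hypothetical
// wrapper; the calls themselves are the ones used in awaitPthreadCall above)
void dispatchAndAwaitThreads(faabric::Message* msg,
                             faabric::scheduler::Executor* executor,
                             std::shared_ptr<faabric::BatchExecuteRequest> req)
{
    // Only snapshot the main thread when the batch may be scheduled on other
    // hosts; purely local runs set the single-host hint and skip this cost
    std::shared_ptr<faabric::util::SnapshotData> snap = nullptr;
    if (!req->singlehosthint()) {
        snap = executor->getMainThreadSnapshot(*msg, true);
    }

    // Hand the batch to the planner for scheduling, then block until every
    // thread in the request has reported a result
    faabric::planner::getPlannerClient().callFunctions(req);

    // results holds (message id, return value) pairs, matching the type
    // previously stored in lastPthreadResults
    auto results = faabric::scheduler::getScheduler().awaitThreadResults(
      req, 10 * faabric::util::getSystemConfig().boundTimeout);
}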
209 changes: 161 additions & 48 deletions src/wasm/openmp.cpp
@@ -102,15 +102,20 @@ void doOpenMPFork(int32_t loc,
{
OMP_FUNC_ARGS("__kmpc_fork_call {} {} {}", loc, nSharedVars, microTask);

// To replicate the fork behaviour, we create (n - 1) executors with thread
// semantics (i.e. sharing the same Faaslet). And instruct the calling
// (parent) executor to also execute the same micro task
auto* parentCall = &faabric::scheduler::ExecutorContext::get()->getMsg();
auto* parentModule = getExecutingModule();
auto parentReq =
faabric::scheduler::ExecutorContext::get()->getBatchRequest();
const auto parentStr = faabric::util::funcToString(*parentCall, false);
auto* parentExecutor =
faabric::scheduler::ExecutorContext::get()->getExecutor();

const std::string parentStr =
faabric::util::funcToString(*parentCall, false);

// Set up the next level
// OpenMP execution contexts are called levels, and they contain the
// thread-local information to execute the microTask (mostly private and
// shared variables)
std::shared_ptr<threads::Level> parentLevel = level;
auto nextLevel =
std::make_shared<threads::Level>(parentLevel->getMaxThreadsAtNextLevel());
@@ -126,43 +131,19 @@
throw std::runtime_error("Nested OpenMP support removed");
}

// Set up the chained calls
// Set up the chained calls with thread semantics
std::shared_ptr<faabric::BatchExecuteRequest> req =
faabric::util::batchExecFactory(
parentCall->user(), parentCall->function(), nextLevel->numThreads);
parentCall->user(), parentCall->function(), nextLevel->numThreads - 1);
req->set_type(faabric::BatchExecuteRequest::THREADS);
req->set_subtype(ThreadRequestType::OPENMP);
// TODO(thread-opt): we don't relate the calling message with the callee.
// This means that OpenMP messages could be sub-optimally scheduled
// We do not want to relate the caller message with the callee because
// the current planner implementation interprets the chained request as
// a SCALE_CHANGE request, and puts all threads (including the calling
// one) in the same group ID. This means that when we do omp barrier,
// we wait for an extra thread (the caller thread is not involved in the
// OMP computation!)
// faabric::util::updateBatchExecAppId(req, parentCall->appid());

// Preload the scheduling decisions in local test mode to avoid having to
// distribute the snapshots
auto& plannerCli = faabric::planner::getPlannerClient();
if (faabric::util::isTestMode()) {
SPDLOG_INFO(
"Pre-loading scheduling decision for single-host OMP sub-app: {}",
req->appid());

auto preloadDec =
std::make_shared<faabric::batch_scheduler::SchedulingDecision>(
req->appid(), req->groupid());
for (int i = 0; i < req->messages_size(); i++) {
preloadDec->addMessage(
faabric::util::getSystemConfig().endpointHost, 0, 0, i);
}
plannerCli.preloadSchedulingDecision(preloadDec);

req->set_singlehost(true);
}

// Add remote context
// Propagate the app ID to let the planner know that these are messages
// for the same app
faabric::util::updateBatchExecAppId(req, parentCall->appid());
// Propagate the single-host hint. The single host flag can be used to hint
// that we do not need to preemptively distribute snapshots
req->set_singlehosthint(parentReq->singlehosthint());
// Serialise the level so that it is available in the request
std::vector<uint8_t> serialisedLevel = nextLevel->serialise();
req->set_contextdata(serialisedLevel.data(), serialisedLevel.size());

@@ -175,33 +156,165 @@
m.set_funcptr(microTask);

// OpenMP thread number
int threadNum = nextLevel->getGlobalThreadNum(i);
int threadNum = nextLevel->getGlobalThreadNum(i + 1);
m.set_appidx(threadNum);

// Group setup for distributed coordination. Note that the group index
// is just within this function group, and not the global OpenMP
// thread number
m.set_groupidx(i);
m.set_groupidx(i + 1);
}

// Execute the threads
faabric::scheduler::Executor* executor =
faabric::scheduler::ExecutorContext::get()->getExecutor();
std::vector<std::pair<uint32_t, int>> results =
executor->executeThreads(req, parentModule->getMergeRegions());
// Do snapshotting if not on a single host. Note that, from the caller
// thread, we cannot know if the request is going to be single host or not.
// So, by default, we always take a snapshot. We can bypass this by setting
// the single host hint flag in the caller request
// TODO: ideally, we would first call the planner to know if the scheduling
// decision will be single host or not, and then have the threads wait for
// the snapshot if necessary (i.e. getOrAwaitSnapshot())
std::shared_ptr<faabric::util::SnapshotData> snap = nullptr;
if (!req->singlehosthint()) {
snap = parentExecutor->getMainThreadSnapshot(*parentCall, true);

// Get dirty regions since last batch of threads
std::span<uint8_t> memView = parentExecutor->getMemoryView();
faabric::util::getDirtyTracker()->stopTracking(memView);
faabric::util::getDirtyTracker()->stopThreadLocalTracking(memView);

// If this is the first batch, these dirty regions will be empty
std::vector<char> dirtyRegions =
faabric::util::getDirtyTracker()->getBothDirtyPages(memView);

// Apply changes to snapshot
snap->fillGapsWithBytewiseRegions();
std::vector<faabric::util::SnapshotDiff> updates =
snap->diffWithDirtyRegions(memView, dirtyRegions);

if (updates.empty()) {
SPDLOG_DEBUG(
"No updates to main thread snapshot for {} over {} pages",
parentStr,
dirtyRegions.size());
} else {
SPDLOG_DEBUG("Updating main thread snapshot for {} with {} diffs",
parentStr,
updates.size());
snap->applyDiffs(updates);
}

// Clear merge regions, not persisted between batches of threads
snap->clearMergeRegions();

// Now we have to add any merge regions we've been saving up for this
// next batch of threads
auto mergeRegions = parentModule->getMergeRegions();
for (const auto& mr : mergeRegions) {
snap->addMergeRegion(
mr.offset, mr.length, mr.dataType, mr.operation);
}
}

// Invoke all non-main threads
faabric::batch_scheduler::SchedulingDecision decision(req->appid(), 0);
if (req->messages_size() > 0) {
decision = faabric::planner::getPlannerClient().callFunctions(req);
} else {
// In a one-thread OpenMP loop, we manually create a communication
// group of size one
const std::string thisHost =
faabric::util::getSystemConfig().endpointHost;
decision.addMessage(thisHost, parentCall->id(), 0, 0);
faabric::transport::getPointToPointBroker()
.setUpLocalMappingsFromSchedulingDecision(decision);
}

// Invoke the main thread (number zero)
auto thisThreadReq = faabric::util::batchExecFactory(
parentCall->user(), parentCall->function(), 1);
thisThreadReq->set_type(faabric::BatchExecuteRequest::THREADS);
thisThreadReq->set_subtype(ThreadRequestType::OPENMP);
thisThreadReq->set_contextdata(serialisedLevel.data(),
serialisedLevel.size());
thisThreadReq->set_singlehost(parentReq->singlehost());
thisThreadReq->set_singlehosthint(parentReq->singlehosthint());
// Update the group and batch id for inter-thread communication
faabric::util::updateBatchExecAppId(thisThreadReq, parentCall->appid());
faabric::util::updateBatchExecGroupId(thisThreadReq, decision.groupId);
auto& m = thisThreadReq->mutable_messages()->at(0);
m.set_appidx(0);
m.set_groupidx(0);
m.set_funcptr(microTask);

// Finally, set the executor context, execute, and reset the context
faabric::scheduler::ExecutorContext::set(parentExecutor, thisThreadReq, 0);
if (!decision.isSingleHost()) {
faabric::util::getDirtyTracker()->startThreadLocalTracking(
parentExecutor->getMemoryView());
}
auto returnValue = parentModule->executeTask(0, 0, thisThreadReq);
faabric::scheduler::ExecutorContext::set(parentExecutor, parentReq, 0);

// Process and set thread result
if (returnValue != 0) {
SPDLOG_ERROR("OpenMP thread (0) failed, result {} on message {}",
thisThreadReq->messages(0).returnvalue(),
thisThreadReq->messages(0).id());
throw std::runtime_error("OpenMP threads failed");
}

for (auto [mid, res] : results) {
if (res != 0) {
SPDLOG_ERROR(
"OpenMP thread failed, result {} on message {}", res, mid);
// Wait for all other threads to finish
for (int i = 0; i < req->messages_size(); i++) {
uint32_t messageId = req->messages().at(i).id();

auto msgResult = faabric::planner::getPlannerClient().getMessageResult(
req->appid(),
messageId,
10 * faabric::util::getSystemConfig().boundTimeout);

if (msgResult.returnvalue() != 0) {
SPDLOG_ERROR("OpenMP thread ({}) failed, result {} on message {}",
i + 1,
msgResult.returnvalue(),
msgResult.id());
throw std::runtime_error("OpenMP threads failed");
}
}

// Perform snapshot updates if not on single host. Note that, here we know
// for sure that we must do dirty tracking, and are the last thread in
// the batch
if (!decision.isSingleHost()) {
// First, get all the thread local diffs for this thread
std::span<uint8_t> memView = parentExecutor->getMemoryView();
faabric::util::getDirtyTracker()->stopThreadLocalTracking(memView);
auto thisThreadDirtyRegions =
faabric::util::getDirtyTracker()->getThreadLocalDirtyPages(memView);

// Second, get the diffs for the batch request that has executed locally
auto diffs = parentExecutor->mergeDirtyRegions(*parentCall,
thisThreadDirtyRegions);
snap->queueDiffs(diffs);

// Write queued diffs (local and remote) to snapshot
int nWritten = snap->writeQueuedDiffs();

// Remap memory to snapshot if it's been updated
if (nWritten > 0) {
parentExecutor->setMemorySize(snap->getSize());
snap->mapToMemory(memView);
}

// Start tracking again
memView = parentExecutor->getMemoryView();
faabric::util::getDirtyTracker()->startTracking(memView);
faabric::util::getDirtyTracker()->startThreadLocalTracking(memView);
}

// Clear this module's merge regions
parentModule->clearMergeRegions();

// Reset parent level for next setting of threads
threads::setCurrentOpenMPLevel(parentLevel);
parentLevel->pushedThreads = -1;
}

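For context on the openmp.cpp changes: __kmpc_fork_call (and therefore doOpenMPFork) is the runtime entry point that clang-lowered OpenMP code reaches for a #pragma omp parallel region; the compiler outlines the region body into a microtask and hands it, together with the shared variables, to the runtime. A minimal guest-side program of the kind this fork/join path executes is sketched below; it is illustrative only and not taken from this PR.

#include <cstdio>
#include <omp.h>

int main()
{
    // clang outlines this parallel region into a microtask and lowers the
    // pragma to a __kmpc_fork_call, which doOpenMPFork intercepts: thread 0
    // runs in the calling Faaslet, and the remaining (n - 1) threads are
    // dispatched to the planner as a THREADS batch request
#pragma omp parallel num_threads(4)
    {
        printf("hello from thread %i of %i\n",
               omp_get_thread_num(),
               omp_get_num_threads());
    }

    return 0;
}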
1 change: 1 addition & 0 deletions tests/dist/fixtures.h
@@ -73,6 +73,7 @@ class DistTestsFixture
localResources->set_slots(nLocalSlots);
localResources->set_usedslots(nLocalUsedSlots);
sch.addHostToGlobalSet(getDistTestMasterIp(), localResources);
conf.overrideCpuCount = nLocalSlots;

auto remoteResources = std::make_shared<faabric::HostResources>();
remoteResources->set_slots(nRemoteSlots);
2 changes: 0 additions & 2 deletions tests/dist/threads/test_openmp.cpp
@@ -13,8 +13,6 @@ TEST_CASE_METHOD(DistTestsFixture,
"Test OpenMP across hosts",
"[threads][openmp]")
{
conf.overrideCpuCount = 6;

// TODO(wamr-omp)
if (faasmConf.wasmVm == "wamr") {
return;
2 changes: 0 additions & 2 deletions tests/dist/threads/test_pthreads.cpp
@@ -9,8 +9,6 @@ namespace tests {

TEST_CASE_METHOD(DistTestsFixture, "Test pthreads across hosts", "[scheduler]")
{
conf.overrideCpuCount = 6;

// TODO(wamr-omp)
if (faasmConf.wasmVm == "wamr") {
return;