Route `set/getFunctionResult` through the planner by csegarragonz · Pull Request #317 · faasm/faabric · GitHub
Route set/getFunctionResult through the planner #317


Merged

31 commits merged on Jun 21, 2023
Changes from all commits

Commits (31)
1dd9ebe
planner: route set/getFunctionResult through the planner
csegarragonz Jun 2, 2023
b17e00c
tasks: add wrapper script for tests
csegarragonz Jun 5, 2023
4aa7ba0
env: remove duplicated env. variables for the tests
csegarragonz Jun 5, 2023
9d4bb86
wip
csegarragonz Jun 5, 2023
06fbf2e
more wip
csegarragonz Jun 5, 2023
13dea5a
more wip
csegarragonz Jun 5, 2023
9558869
tests: more test work
csegarragonz Jun 7, 2023
4ff137e
nits: self-review
csegarragonz Jun 7, 2023
6959107
scheduler: fix race condition when setting/getting results
csegarragonz Jun 7, 2023
4ae825a
tests: more fixes
csegarragonz Jun 7, 2023
0569307
nits: self-review
csegarragonz Jun 7, 2023
9be4e83
planner: fix race condition where message result was set between rel…
csegarragonz Jun 7, 2023
6230b67
tests: fix failing test with dummy executor
csegarragonz Jun 7, 2023
5d9c4f8
nits: run clang-format
csegarragonz Jun 7, 2023
d13afae
dist-tests: fix dist-test compilation
csegarragonz Jun 7, 2023
f393f31
tsan: fix data races
csegarragonz Jun 8, 2023
3b454e4
tsan: fix more tsan issues in running/getting results
csegarragonz Jun 8, 2023
3614daa
scheduler: remove async waiting for messages
csegarragonz Jun 19, 2023
9bd10d0
nit: fix typo
csegarragonz Jun 19, 2023
8d41496
fixtures: remove scheduler fixture from exec-context test fixture
csegarragonz Jun 19, 2023
26a56ad
nit: remove commented-out line
csegarragonz Jun 19, 2023
cd4c13d
endpoint: add test to get exec graph
csegarragonz Jun 19, 2023
81c0bc1
tests: re-factor fixture names, and simplify base fixtures
csegarragonz Jun 19, 2023
115af3c
dist-tests: fix compilation after refactor
csegarragonz Jun 19, 2023
f417499
tests: remove commented-out code
csegarragonz Jun 20, 2023
7dfb9cb
scheduler: zero-out used-slots during reset
csegarragonz Jun 20, 2023
305f89b
tests: add scheduler fixture in ptp group fixture
csegarragonz Jun 20, 2023
851af24
nits: run clang-format
csegarragonz Jun 20, 2023
4697cff
nits: self-review
csegarragonz Jun 20, 2023
7d30792
tsan: fix thread race
csegarragonz Jun 20, 2023
37c7364
tsan: fix race when reading promise value
csegarragonz Jun 21, 2023
13 changes: 12 additions & 1 deletion include/faabric/planner/Planner.h
@@ -37,7 +37,7 @@ class Planner
bool flush(faabric::planner::FlushType flushType);

// ----------
// Host membership management
// Host membership public API
// ----------

std::vector<std::shared_ptr<Host>> getAvailableHosts();
@@ -47,6 +47,17 @@
// Best effort host removal. Don't fail if we can't
void removeHost(const Host& hostIn);

// ----------
// Request scheduling public API
// ----------

// Setters/getters for individual message results

void setMessageResult(std::shared_ptr<faabric::Message> msg);

std::shared_ptr<faabric::Message> getMessageResult(
std::shared_ptr<faabric::Message> msg);

private:
// There's a singleton instance of the planner running, but it must allow
// concurrent requests
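The two new public methods make the planner the single point for setting and querying message results. As a rough usage sketch (my own illustration, not code from this PR), assuming getPlanner() returns the planner singleton defined later in src/planner/Planner.cpp:

#include <faabric/planner/Planner.h>

#include <memory>

// Illustrative only: set a result for app 1, message 1337, then read it back.
// getMessageResult returns nullptr while the result is still unset.
void resultRoundTripSketch()
{
    auto msg = std::make_shared<faabric::Message>();
    msg->set_appid(1);
    msg->set_id(1337);
    msg->set_returnvalue(0);

    auto& planner = faabric::planner::getPlanner();
    planner.setMessageResult(msg);

    std::shared_ptr<faabric::Message> result = planner.getMessageResult(msg);
}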
5 changes: 5 additions & 0 deletions include/faabric/planner/PlannerApi.h
@@ -4,9 +4,14 @@ namespace faabric::planner {
enum PlannerCalls
{
NoPlanerCall = 0,
// Util
Ping = 1,
// Host-membership calls
GetAvailableHosts = 2,
RegisterHost = 3,
RemoveHost = 4,
// Scheduling calls
SetMessageResult = 8,
GetMessageResult = 9,
};
}
13 changes: 13 additions & 0 deletions include/faabric/planner/PlannerClient.h
@@ -34,11 +34,24 @@ class PlannerClient final : public faabric::transport::MessageEndpointClient

void ping();

// ------
// Host membership calls
// ------

std::vector<Host> getAvailableHosts();

// Registering a host returns the keep-alive timeout for heartbeats
int registerHost(std::shared_ptr<RegisterHostRequest> req);

void removeHost(std::shared_ptr<RemoveHostRequest> req);

// ------
// Scheduling calls
// ------

void setMessageResult(std::shared_ptr<faabric::Message> msg);

std::shared_ptr<faabric::Message> getMessageResult(
std::shared_ptr<faabric::Message> msg);
};
}
6 changes: 6 additions & 0 deletions include/faabric/planner/PlannerServer.h
@@ -27,6 +27,12 @@ class PlannerServer final : public faabric::transport::MessageEndpointServer
std::unique_ptr<google::protobuf::Message> recvRemoveHost(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvSetMessageResult(
std::span<const uint8_t> buffer);

std::unique_ptr<google::protobuf::Message> recvGetMessageResult(
std::span<const uint8_t> buffer);

private:
faabric::planner::Planner& planner;
};
10 changes: 10 additions & 0 deletions include/faabric/planner/PlannerState.h
@@ -1,6 +1,7 @@
#pragma once

#include <faabric/planner/planner.pb.h>
#include <faabric/proto/faabric.pb.h>

#include <map>

@@ -13,5 +14,14 @@ struct PlannerState
// We deliberately use the host's IP as unique key, but assign a unique host
// id for redundancy
std::map<std::string, std::shared_ptr<Host>> hostMap;

// Double-map holding the message results. The first key is the app id. For
// each app id, we keep a map from message id to the actual message result
std::map<int, std::map<int, std::shared_ptr<faabric::Message>>> appResults;

// Map holding the hosts that have registered interest in getting an app
// result
std::map<int, std::vector<std::string>> appResultWaiters;
};
}
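To make the shape of appResults concrete, here is a tiny standalone illustration (not from the PR) of the two-level lookup: the outer key is the app id, the inner key the message id.

#include <map>
#include <memory>
#include <string>

// Stand-in for faabric::Message, just to show the indexing pattern
struct FakeMessage
{
    int returnValue = 0;
    std::string outputData;
};

int main()
{
    std::map<int, std::map<int, std::shared_ptr<FakeMessage>>> appResults;

    // Record a result for app 1, message 1337
    appResults[1][1337] = std::make_shared<FakeMessage>();

    // Look up without inserting empty entries: check the app first, then
    // the message id within that app
    auto appIt = appResults.find(1);
    if (appIt != appResults.end()) {
        auto msgIt = appIt->second.find(1337);
        if (msgIt != appIt->second.end()) {
            std::shared_ptr<FakeMessage> result = msgIt->second;
            (void)result;
        }
    }

    return 0;
}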
3 changes: 2 additions & 1 deletion include/faabric/scheduler/FunctionCallApi.h
@@ -8,6 +8,7 @@ enum FunctionCalls
Flush = 2,
Unregister = 3,
GetResources = 4,
PendingMigrations = 5
PendingMigrations = 5,
SetMessageResult = 6,
};
}
5 changes: 5 additions & 0 deletions include/faabric/scheduler/FunctionCallClient.h
@@ -28,6 +28,9 @@
getPendingMigrationsRequests();
std::vector<std::pair<std::string, faabric::UnregisterRequest>>
getUnregisterRequests();

std::vector<std::pair<std::string, std::shared_ptr<faabric::Message>>>
getMessageResults();

void queueResourceResponse(const std::string& host,
faabric::HostResources& res);

@@ -50,5 +53,7 @@ class FunctionCallClient : public faabric::transport::MessageEndpointClient
void executeFunctions(std::shared_ptr<faabric::BatchExecuteRequest> req);

void unregister(faabric::UnregisterRequest& req);

void setMessageResult(std::shared_ptr<faabric::Message> msg);
};
}
2 changes: 2 additions & 0 deletions include/faabric/scheduler/FunctionCallServer.h
@@ -32,5 +32,7 @@ class FunctionCallServer final
void recvExecuteFunctions(std::span<const uint8_t> buffer);

void recvUnregister(std::span<const uint8_t> buffer);

void recvSetMessageResult(std::span<const uint8_t> buffer);
};
}
31 changes: 18 additions & 13 deletions include/faabric/scheduler/Scheduler.h
@@ -31,6 +31,8 @@ namespace faabric::scheduler {
typedef std::pair<std::shared_ptr<BatchExecuteRequest>,
std::shared_ptr<faabric::util::SchedulingDecision>>
InFlightPair;
typedef std::promise<std::shared_ptr<faabric::Message>> MessageResultPromise;
typedef std::shared_ptr<MessageResultPromise> MessageResultPromisePtr;

class Scheduler;

@@ -261,16 +263,17 @@ class Scheduler

void flushLocally();

// ----------------------------------
// Message results
// ----------------------------------
void setFunctionResult(faabric::Message& msg);

void setMessageResultLocally(std::shared_ptr<faabric::Message> msg);

faabric::Message getFunctionResult(const faabric::Message& msg,
int timeoutMs);

void getFunctionResultAsync(const faabric::Message& msg,
int timeoutMs,
asio::io_context& ioc,
asio::any_io_executor& executor,
std::function<void(faabric::Message&)> handler);
faabric::Message getFunctionResult(int appId, int msgId, int timeoutMs);

void setThreadResult(const faabric::Message& msg,
int32_t returnValue,
@@ -344,12 +347,12 @@
// ----------------------------------
// Exec graph
// ----------------------------------
void logChainedFunction(const faabric::Message& parentMessage,
void logChainedFunction(faabric::Message& parentMessage,
const faabric::Message& chainedMessage);

std::set<unsigned int> getChainedFunctions(unsigned int msgId);
std::set<unsigned int> getChainedFunctions(const faabric::Message& msg);

ExecGraph getFunctionExecGraph(unsigned int msgId);
ExecGraph getFunctionExecGraph(const faabric::Message& msg);

// ----------------------------------
// Function Migration
@@ -394,12 +397,14 @@
std::unordered_map<uint32_t, faabric::transport::Message>
threadResultMessages;

std::unordered_map<uint32_t, std::shared_ptr<MessageLocalResult>>
localResults;

std::unordered_map<std::string, std::set<std::string>> pushedSnapshotsMap;

std::mutex localResultsMutex;
// ---- Message results ----
std::unordered_map<uint32_t, MessageResultPromisePtr> plannerResults;
std::mutex plannerResultsMutex;
faabric::Message doGetFunctionResult(
std::shared_ptr<faabric::Message> msgPtr,
int timeoutMs);

// ---- Host resources and hosts ----
faabric::HostResources thisHostResources;
@@ -443,7 +448,7 @@ class Scheduler
std::vector<std::pair<std::string, faabric::Message>>
recordedMessagesShared;

ExecGraphNode getFunctionExecGraphNode(unsigned int msgId);
ExecGraphNode getFunctionExecGraphNode(int appId, int msgId);

// ---- Point-to-point ----
faabric::transport::PointToPointBroker& broker;
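The MessageResultPromise typedef and the plannerResults map suggest a promise-per-message scheme: the setter fulfils a promise keyed by message id, and the getter blocks on the matching future with a timeout. The following is a stripped-down sketch of that pattern (my own illustration, assuming a single waiter per message), not the scheduler's actual implementation.

#include <chrono>
#include <cstdint>
#include <future>
#include <memory>
#include <mutex>
#include <unordered_map>

// Stand-in for faabric::Message
struct FakeMessage
{
    uint32_t id = 0;
    int returnValue = 0;
};

using ResultPromise = std::promise<std::shared_ptr<FakeMessage>>;

std::mutex resultsMutex;
std::unordered_map<uint32_t, std::shared_ptr<ResultPromise>> results;

// Get (or lazily create) the promise for a message id
std::shared_ptr<ResultPromise> getPromise(uint32_t msgId)
{
    std::lock_guard<std::mutex> lock(resultsMutex);
    auto it = results.find(msgId);
    if (it == results.end()) {
        it = results.emplace(msgId, std::make_shared<ResultPromise>()).first;
    }
    return it->second;
}

// Called by whoever sets the result (e.g. on a callback from the planner)
void setResult(std::shared_ptr<FakeMessage> msg)
{
    getPromise(msg->id)->set_value(msg);
}

// Called by the single waiter; returns nullptr on timeout
std::shared_ptr<FakeMessage> waitForResult(uint32_t msgId, int timeoutMs)
{
    auto fut = getPromise(msgId)->get_future();
    if (fut.wait_for(std::chrono::milliseconds(timeoutMs)) ==
        std::future_status::timeout) {
        return nullptr;
    }
    return fut.get();
}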
28 changes: 9 additions & 19 deletions src/endpoint/FaabricEndpointHandler.cpp
@@ -65,7 +65,7 @@ void FaabricEndpointHandler::onRequest(
} else if (msg.isexecgraphrequest()) {
SPDLOG_DEBUG("Processing execution graph request");
faabric::scheduler::ExecGraph execGraph =
sched.getFunctionExecGraph(msg.id());
sched.getFunctionExecGraph(msg);
response.result(beast::http::status::ok);
response.body() = faabric::scheduler::execGraphToJson(execGraph);

@@ -92,7 +92,9 @@ void FaabricEndpointHandler::executeFunction(
size_t messageIndex)
{
faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();
faabric::Message& msg = *ber->mutable_messages(messageIndex);
// Deliberately make a copy here to avoid data races. The message in the BER
// is used for execution, while the copy is only used to wait on the result
faabric::Message msg = ber->messages(messageIndex);

if (msg.user().empty()) {
response.result(beast::http::status::bad_request);
@@ -127,27 +129,16 @@
// Await result on global bus (may have been executed on a different worker)
if (msg.isasync()) {
response.result(beast::http::status::ok);
response.body() = faabric::util::buildAsyncResponse(msg);
response.body() = faabric::util::messageToJson(msg);
return ctx.sendFunction(std::move(response));
}

// TODO: temporarily make this HTTP call block one server thread.
// Eventually, we will route all HTTP requests through the planner instead
// of the worker, so we will be able to remove this blocking call
SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr);
sch.getFunctionResultAsync(
msg,
conf.globalMessageTimeout,
ctx.ioc,
ctx.executor,
beast::bind_front_handler(&FaabricEndpointHandler::onFunctionResult,
this->shared_from_this(),
std::move(ctx),
std::move(response)));
}
auto result = sch.getFunctionResult(msg, conf.globalMessageTimeout);

void FaabricEndpointHandler::onFunctionResult(
HttpRequestContext&& ctx,
faabric::util::BeastHttpResponse&& response,
faabric::Message& result)
{
beast::http::status statusCode =
(result.returnvalue() == 0) ? beast::http::status::ok
: beast::http::status::internal_server_error;
@@ -159,5 +150,4 @@ void FaabricEndpointHandler::onFunctionResult(
response.body() = result.outputdata();
return ctx.sendFunction(std::move(response));
}

}
1 change: 1 addition & 0 deletions src/mpi/MpiWorld.cpp
@@ -116,6 +116,7 @@ void MpiWorld::create(faabric::Message& call, int newId, int newSize)
faabric::util::batchExecFactory(user, function, size - 1);
for (int i = 0; i < req->messages_size(); i++) {
faabric::Message& msg = req->mutable_messages()->at(i);
msg.set_appid(call.appid());
msg.set_ismpi(true);
msg.set_mpiworldid(id);
msg.set_mpirank(i + 1);
75 changes: 75 additions & 0 deletions src/planner/Planner.cpp
@@ -1,4 +1,5 @@
#include <faabric/planner/Planner.h>
#include <faabric/scheduler/Scheduler.h>
#include <faabric/util/clock.h>
#include <faabric/util/config.h>
#include <faabric/util/environment.h>
@@ -177,6 +178,80 @@ bool Planner::isHostExpired(std::shared_ptr<Host> host, long epochTimeMs)
return (epochTimeMs - host->registerts().epochms()) > hostTimeoutMs;
}

void Planner::setMessageResult(std::shared_ptr<faabric::Message> msg)
{
int appId = msg->appid();
int msgId = msg->id();

faabric::util::FullLock lock(plannerMx);

SPDLOG_INFO("Planner setting message result (id: {}) for {}:{}:{}",
msg->id(),
msg->appid(),
msg->groupid(),
msg->groupidx());

// Set the result
state.appResults[appId][msgId] = msg;

// Dispatch an async message to all hosts that are waiting
auto& sch = faabric::scheduler::getScheduler();
if (state.appResultWaiters.find(msgId) != state.appResultWaiters.end()) {
for (const auto& host : state.appResultWaiters[msgId]) {
SPDLOG_INFO("Sending result to waiting host: {}", host);
sch.getFunctionCallClient(host)->setMessageResult(msg);
}
}
}

std::shared_ptr<faabric::Message> Planner::getMessageResult(
std::shared_ptr<faabric::Message> msg)
{
int appId = msg->appid();
int msgId = msg->id();

{
faabric::util::SharedLock lock(plannerMx);

// We log these at debug (not error) level as they happen frequently
// when polling for results
if (state.appResults.find(appId) == state.appResults.end()) {
SPDLOG_DEBUG("App {} not registered in app results", appId);
} else if (state.appResults[appId].find(msgId) ==
state.appResults[appId].end()) {
SPDLOG_DEBUG("Msg {} not registered in app results (app id: {})",
msgId,
appId);
} else {
return state.appResults[appId][msgId];
}
}

// If we get here, the message result has not been set yet, so we register
// the calling host's interest, provided it has set a masterhost. The
// masterhost is set when dispatching a message within faabric, but not
// when sending an HTTP request
if (!msg->masterhost().empty()) {
faabric::util::FullLock lock(plannerMx);

// Check again if the result is not set, as it could have been set
// between releasing the shared lock and acquiring the full lock
if (state.appResults.contains(appId) &&
state.appResults[appId].contains(msgId)) {
return state.appResults[appId][msgId];
}

// The message result is definitely not set, so we add the host to the
// waiters list
SPDLOG_DEBUG("Adding host {} to the waiting list for message {}",
msg->masterhost(),
msgId);
state.appResultWaiters[msgId].push_back(msg->masterhost());
}

return nullptr;
}

Planner& getPlanner()
{
static Planner planner;
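The subtle part of getMessageResult above is the lock upgrade: the result is first read under a shared lock, and only if it is missing do we take the full lock, at which point the presence check must be repeated because the result may have been set in between. A distilled sketch of that pattern (illustration only, not code from the PR):

#include <map>
#include <memory>
#include <optional>
#include <shared_mutex>
#include <string>
#include <vector>

std::shared_mutex mx;
std::map<int, int> results;                      // msgId -> result value
std::map<int, std::vector<std::string>> waiters; // msgId -> waiting hosts

// Return the result if present; otherwise register the host as a waiter
std::optional<int> getOrRegisterWaiter(int msgId, const std::string& host)
{
    {
        // Fast path: read-only access under a shared lock
        std::shared_lock<std::shared_mutex> lock(mx);
        auto it = results.find(msgId);
        if (it != results.end()) {
            return it->second;
        }
    }

    // Slow path: we need to mutate the waiters list, so take the full lock
    std::unique_lock<std::shared_mutex> lock(mx);

    // Re-check: the result may have been set between releasing the shared
    // lock and acquiring the exclusive one
    auto it = results.find(msgId);
    if (it != results.end()) {
        return it->second;
    }

    waiters[msgId].push_back(host);
    return std::nullopt;
}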
19 changes: 19 additions & 0 deletions src/planner/PlannerClient.cpp
@@ -112,4 +112,23 @@ void PlannerClient::removeHost(std::shared_ptr<RemoveHostRequest> req)
faabric::EmptyResponse response;
syncSend(PlannerCalls::RemoveHost, req.get(), &response);
}

void PlannerClient::setMessageResult(std::shared_ptr<faabric::Message> msg)
{
faabric::EmptyResponse response;
syncSend(PlannerCalls::SetMessageResult, msg.get(), &response);
}

std::shared_ptr<faabric::Message> PlannerClient::getMessageResult(
std::shared_ptr<faabric::Message> msg)
{
faabric::Message responseMsg;
syncSend(PlannerCalls::GetMessageResult, msg.get(), &responseMsg);

if (responseMsg.id() == 0 && responseMsg.appid() == 0) {
return nullptr;
}

return std::make_shared<faabric::Message>(responseMsg);
}
}
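On the client side, getMessageResult maps the planner's "not set yet" reply (a message with both id and app id equal to zero) to nullptr. A caller that wants to poll rather than rely on the push notification could do something like the following sketch (illustrative only; the helper name and the 100 ms poll interval are my own choices):

#include <faabric/planner/PlannerClient.h>

#include <chrono>
#include <memory>
#include <thread>

// Poll the planner until the message result is available or we time out
std::shared_ptr<faabric::Message> pollForResult(
  faabric::planner::PlannerClient& cli,
  std::shared_ptr<faabric::Message> msg,
  int timeoutMs)
{
    auto deadline = std::chrono::steady_clock::now() +
                    std::chrono::milliseconds(timeoutMs);

    while (std::chrono::steady_clock::now() < deadline) {
        auto result = cli.getMessageResult(msg);
        if (result != nullptr) {
            return result;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    return nullptr;
}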