From c413ff9954ef4e1ebff13ab5e2d1b82b0703abf1 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 16 Mar 2022 14:58:47 +0000 Subject: [PATCH 1/3] Test for locking and updating state --- include/faabric/state/InMemoryStateKeyValue.h | 8 +-- include/faabric/state/RedisStateKeyValue.h | 8 +-- include/faabric/state/StateClient.h | 2 + include/faabric/state/StateServer.h | 2 + src/state/StateClient.cpp | 22 +++++++ src/state/StateServer.cpp | 20 +++--- tests/test/state/test_state_server.cpp | 63 +++++++++++++++++++ 7 files changed, 107 insertions(+), 18 deletions(-) diff --git a/include/faabric/state/InMemoryStateKeyValue.h b/include/faabric/state/InMemoryStateKeyValue.h index 856b6d3c5..fe552fabd 100644 --- a/include/faabric/state/InMemoryStateKeyValue.h +++ b/include/faabric/state/InMemoryStateKeyValue.h @@ -53,6 +53,10 @@ class InMemoryStateKeyValue final : public StateKeyValue AppendedInMemoryState& getAppendedValue(uint idx); + void lockGlobal() override; + + void unlockGlobal() override; + private: const std::string thisIP; const std::string masterIP; @@ -64,10 +68,6 @@ class InMemoryStateKeyValue final : public StateKeyValue std::vector appendedData; - void lockGlobal() override; - - void unlockGlobal() override; - void pullFromRemote() override; void pullChunkFromRemote(long offset, size_t length) override; diff --git a/include/faabric/state/RedisStateKeyValue.h b/include/faabric/state/RedisStateKeyValue.h index 2a09ba7ff..a034a59a3 100644 --- a/include/faabric/state/RedisStateKeyValue.h +++ b/include/faabric/state/RedisStateKeyValue.h @@ -24,15 +24,15 @@ class RedisStateKeyValue final : public StateKeyValue static void clearAll(bool global); + void lockGlobal() override; + + void unlockGlobal() override; + private: const std::string joinedKey; uint32_t lastRemoteLockId = 0; - void lockGlobal() override; - - void unlockGlobal() override; - void pullFromRemote() override; void pullChunkFromRemote(long offset, size_t length) override; diff --git 
a/include/faabric/state/StateClient.h b/include/faabric/state/StateClient.h index fff3e2b4a..e2a51857d 100644 --- a/include/faabric/state/StateClient.h +++ b/include/faabric/state/StateClient.h @@ -40,5 +40,7 @@ class StateClient : public faabric::transport::MessageEndpointClient void sendStateRequest(faabric::state::StateCalls header, const uint8_t* data, int length); + + void logRequest(const std::string &op); }; } diff --git a/include/faabric/state/StateServer.h b/include/faabric/state/StateServer.h index ebd62760a..5480778df 100644 --- a/include/faabric/state/StateServer.h +++ b/include/faabric/state/StateServer.h @@ -13,6 +13,8 @@ class StateServer final : public faabric::transport::MessageEndpointServer private: State& state; + void logOperation(const std::string &op); + void doAsyncRecv(int header, const uint8_t* buffer, size_t bufferSize) override; diff --git a/src/state/StateClient.cpp b/src/state/StateClient.cpp index 66e81524d..55541425d 100644 --- a/src/state/StateClient.cpp +++ b/src/state/StateClient.cpp @@ -5,6 +5,7 @@ #include namespace faabric::state { + StateClient::StateClient(const std::string& userIn, const std::string& keyIn, const std::string& hostIn) @@ -15,6 +16,11 @@ StateClient::StateClient(const std::string& userIn, , key(keyIn) {} +void StateClient::logRequest(const std::string& op) +{ + SPDLOG_TRACE("Requesting {} on {}/{} at {}", op, user, key, host); +} + void StateClient::sendStateRequest(faabric::state::StateCalls header, const uint8_t* data, int length) @@ -33,6 +39,8 @@ void StateClient::sendStateRequest(faabric::state::StateCalls header, void StateClient::pushChunks(const std::vector& chunks) { + logRequest("push-chunks"); + for (const auto& chunk : chunks) { faabric::StatePart stateChunk; stateChunk.set_user(user); @@ -48,6 +56,8 @@ void StateClient::pushChunks(const std::vector& chunks) void StateClient::pullChunks(const std::vector& chunks, uint8_t* bufferStart) { + logRequest("pull-chunks"); + for (const auto& chunk : 
chunks) { // Prepare request faabric::StateChunkRequest request; @@ -69,11 +79,14 @@ void StateClient::pullChunks(const std::vector& chunks, void StateClient::append(const uint8_t* data, size_t length) { + logRequest("append"); sendStateRequest(faabric::state::StateCalls::Append, data, length); } void StateClient::pullAppended(uint8_t* buffer, size_t length, long nValues) { + logRequest("pull-appended"); + // Prepare request faabric::StateAppendedRequest request; request.set_user(user); @@ -101,11 +114,14 @@ void StateClient::pullAppended(uint8_t* buffer, size_t length, long nValues) void StateClient::clearAppended() { + logRequest("clear-appended"); sendStateRequest(faabric::state::StateCalls::ClearAppended, nullptr, 0); } size_t StateClient::stateSize() { + logRequest("state-size"); + faabric::StateRequest request; request.set_user(user); request.set_key(key); @@ -118,16 +134,22 @@ size_t StateClient::stateSize() void StateClient::deleteState() { + logRequest("delete"); + sendStateRequest(faabric::state::StateCalls::Delete, nullptr, 0); } void StateClient::lock() { + logRequest("lock"); + sendStateRequest(faabric::state::StateCalls::Lock, nullptr, 0); } void StateClient::unlock() { + logRequest("unlock"); + sendStateRequest(faabric::state::StateCalls::Unlock, nullptr, 0); } } diff --git a/src/state/StateServer.cpp b/src/state/StateServer.cpp index 7439a09d9..c1dfedb75 100644 --- a/src/state/StateServer.cpp +++ b/src/state/StateServer.cpp @@ -73,7 +73,7 @@ std::unique_ptr StateServer::recvSize( PARSE_MSG(faabric::StateRequest, buffer, bufferSize) // Prepare the response - SPDLOG_TRACE("Size {}/{}", msg.user(), msg.key()); + SPDLOG_TRACE("Received size {}/{}", msg.user(), msg.key()); KV_FROM_REQUEST(msg) auto response = std::make_unique(); response->set_user(kv->user); @@ -89,7 +89,7 @@ std::unique_ptr StateServer::recvPull( { PARSE_MSG(faabric::StateChunkRequest, buffer, bufferSize) - SPDLOG_TRACE("Pull {}/{} ({}->{})", + SPDLOG_TRACE("Received pull {}/{} 
({}->{})", msg.user(), msg.key(), msg.offset(), @@ -118,7 +118,7 @@ std::unique_ptr StateServer::recvPush( PARSE_MSG(faabric::StatePart, buffer, bufferSize) // Update the KV store - SPDLOG_TRACE("Push {}/{} ({}->{})", + SPDLOG_TRACE("Received push {}/{} ({}->{})", msg.user(), msg.key(), msg.offset(), @@ -155,7 +155,7 @@ std::unique_ptr StateServer::recvPullAppended( PARSE_MSG(faabric::StateAppendedRequest, buffer, bufferSize) // Prepare response - SPDLOG_TRACE("Pull appended {}/{}", msg.user(), msg.key()); + SPDLOG_TRACE("Received pull-appended {}/{}", msg.user(), msg.key()); KV_FROM_REQUEST(msg) auto response = std::make_unique(); @@ -178,7 +178,7 @@ std::unique_ptr StateServer::recvDelete( PARSE_MSG(faabric::StateRequest, buffer, bufferSize) // Delete value - SPDLOG_TRACE("Delete {}/{}", msg.user(), msg.key()); + SPDLOG_TRACE("Received delete {}/{}", msg.user(), msg.key()); state.deleteKV(msg.user(), msg.key()); auto response = std::make_unique(); @@ -192,7 +192,7 @@ std::unique_ptr StateServer::recvClearAppended( PARSE_MSG(faabric::StateRequest, buffer, bufferSize) // Perform operation - SPDLOG_TRACE("Clear appended {}/{}", msg.user(), msg.key()); + SPDLOG_TRACE("Received clear-appended {}/{}", msg.user(), msg.key()); KV_FROM_REQUEST(msg) kv->clearAppended(); @@ -207,9 +207,9 @@ std::unique_ptr StateServer::recvLock( PARSE_MSG(faabric::StateRequest, buffer, bufferSize) // Perform operation - SPDLOG_TRACE("Lock {}/{}", msg.user(), msg.key()); + SPDLOG_TRACE("Received lock {}/{}", msg.user(), msg.key()); KV_FROM_REQUEST(msg) - kv->lockWrite(); + kv->lockGlobal(); auto response = std::make_unique(); return response; @@ -222,9 +222,9 @@ std::unique_ptr StateServer::recvUnlock( PARSE_MSG(faabric::StateRequest, buffer, bufferSize) // Perform operation - SPDLOG_TRACE("Unlock {}/{}", msg.user(), msg.key()); + SPDLOG_TRACE("Received unlock {}/{}", msg.user(), msg.key()); KV_FROM_REQUEST(msg) - kv->unlockWrite(); + kv->unlockGlobal(); auto response = std::make_unique(); 
return response; diff --git a/tests/test/state/test_state_server.cpp b/tests/test/state/test_state_server.cpp index 4e66d5837..24ab2949b 100644 --- a/tests/test/state/test_state_server.cpp +++ b/tests/test/state/test_state_server.cpp @@ -216,4 +216,67 @@ TEST_CASE_METHOD(SimpleStateServerTestFixture, std::vector actual(kv->get(), kv->get() + dataA.size()); REQUIRE(actual == dataB); } + +TEST_CASE_METHOD(SimpleStateServerTestFixture, + "Test locking state remotely", + "[state]") +{ + int nThreads = 20; + std::string user = "dummy"; + std::string key = "lock-check"; + size_t stateSize = sizeof(int); + + int initialValue = 20; + int increment = 10; + int nLoops = 5; + + // Prepare the initial KV + std::shared_ptr kv = state.getKV(user, key, stateSize); + kv->set(BYTES(&initialValue)); + + std::vector threads; + threads.reserve(20); + + for (int i = 0; i < nThreads; i++) { + threads.emplace_back([user, key, stateSize, increment, nLoops] { + State& state = getGlobalState(); + std::shared_ptr kv = + state.getKV(user, key, stateSize); + uint8_t* rawPtr = kv->get(); + + // Create a client + StateClient client(user, key, DEFAULT_STATE_HOST); + + for (int j = 0; j < nLoops; j++) { + // Lock + client.lock(); + + // Pull the data + StateChunk pullChunk(0, stateSize, rawPtr); + client.pullChunks({ pullChunk }, rawPtr); + + // Read the value and update + int currentValue = faabric::util::unalignedRead(rawPtr); + currentValue += increment; + kv->set(BYTES(&currentValue)); + + StateChunk pushChunk(0, stateSize, rawPtr); + client.pushChunks({ pushChunk }); + + // Unlock + client.unlock(); + } + }); + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + int expectedValue = initialValue + (nThreads * increment); + int actualValue = faabric::util::unalignedRead(kv->get()); + REQUIRE(actualValue == expectedValue); +} } From a9300bc3c7a724b5f6d5a5c5cf1cc10063f529dd Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 16 Mar 2022 16:34:44 +0000 Subject: [PATCH 
2/3] Removing global lock/ unlock --- include/faabric/state/InMemoryStateKeyValue.h | 6 -- include/faabric/state/RedisStateKeyValue.h | 4 -- include/faabric/state/StateKeyValue.h | 4 -- src/state/InMemoryStateKeyValue.cpp | 20 ------ src/state/RedisStateKeyValue.cpp | 15 ----- src/state/StateServer.cpp | 30 --------- tests/test/state/test_state_server.cpp | 63 ------------------- 7 files changed, 142 deletions(-) diff --git a/include/faabric/state/InMemoryStateKeyValue.h b/include/faabric/state/InMemoryStateKeyValue.h index fe552fabd..6d8b1077a 100644 --- a/include/faabric/state/InMemoryStateKeyValue.h +++ b/include/faabric/state/InMemoryStateKeyValue.h @@ -53,10 +53,6 @@ class InMemoryStateKeyValue final : public StateKeyValue AppendedInMemoryState& getAppendedValue(uint idx); - void lockGlobal() override; - - void unlockGlobal() override; - private: const std::string thisIP; const std::string masterIP; @@ -64,8 +60,6 @@ class InMemoryStateKeyValue final : public StateKeyValue InMemoryStateRegistry& stateRegistry; - std::shared_mutex globalLock; - std::vector appendedData; void pullFromRemote() override; diff --git a/include/faabric/state/RedisStateKeyValue.h b/include/faabric/state/RedisStateKeyValue.h index a034a59a3..f91e7ffad 100644 --- a/include/faabric/state/RedisStateKeyValue.h +++ b/include/faabric/state/RedisStateKeyValue.h @@ -24,10 +24,6 @@ class RedisStateKeyValue final : public StateKeyValue static void clearAll(bool global); - void lockGlobal() override; - - void unlockGlobal() override; - private: const std::string joinedKey; diff --git a/include/faabric/state/StateKeyValue.h b/include/faabric/state/StateKeyValue.h index 86b2cc46d..c5f640372 100644 --- a/include/faabric/state/StateKeyValue.h +++ b/include/faabric/state/StateKeyValue.h @@ -105,10 +105,6 @@ class StateKeyValue void pushFull(); - virtual void lockGlobal() = 0; - - virtual void unlockGlobal() = 0; - protected: std::shared_mutex valueMutex; diff --git 
a/src/state/InMemoryStateKeyValue.cpp b/src/state/InMemoryStateKeyValue.cpp index a503528bc..688d5e8d5 100644 --- a/src/state/InMemoryStateKeyValue.cpp +++ b/src/state/InMemoryStateKeyValue.cpp @@ -90,26 +90,6 @@ bool InMemoryStateKeyValue::isMaster() // Normal state key-value API // ---------------------------------------- -void InMemoryStateKeyValue::lockGlobal() -{ - if (status == InMemoryStateKeyStatus::MASTER) { - globalLock.lock(); - } else { - StateClient cli(user, key, masterIP); - cli.lock(); - } -} - -void InMemoryStateKeyValue::unlockGlobal() -{ - if (status == InMemoryStateKeyStatus::MASTER) { - globalLock.unlock(); - } else { - StateClient cli(user, key, masterIP); - cli.unlock(); - } -} - void InMemoryStateKeyValue::pullFromRemote() { if (status == InMemoryStateKeyStatus::MASTER) { diff --git a/src/state/RedisStateKeyValue.cpp b/src/state/RedisStateKeyValue.cpp index 06891891f..045a83ad3 100644 --- a/src/state/RedisStateKeyValue.cpp +++ b/src/state/RedisStateKeyValue.cpp @@ -47,21 +47,6 @@ void RedisStateKeyValue::clearAll(bool global) } } -// TODO - the remote locking here is quite primitive since we ignore the fact -// threads can run on the same machine. Redis is also aware of scheduling and so -// we could optimise this. 
-void RedisStateKeyValue::lockGlobal() -{ - faabric::util::FullLock lock(valueMutex); - lastRemoteLockId = waitOnRedisRemoteLock(joinedKey); -} - -void RedisStateKeyValue::unlockGlobal() -{ - faabric::util::FullLock lock(valueMutex); - redis::Redis::getState().releaseLock(joinedKey, lastRemoteLockId); -} - void RedisStateKeyValue::pullFromRemote() { PROF_START(statePull) diff --git a/src/state/StateServer.cpp b/src/state/StateServer.cpp index c1dfedb75..4a770439d 100644 --- a/src/state/StateServer.cpp +++ b/src/state/StateServer.cpp @@ -199,34 +199,4 @@ std::unique_ptr StateServer::recvClearAppended( auto response = std::make_unique(); return response; } - -std::unique_ptr StateServer::recvLock( - const uint8_t* buffer, - size_t bufferSize) -{ - PARSE_MSG(faabric::StateRequest, buffer, bufferSize) - - // Perform operation - SPDLOG_TRACE("Received lock {}/{}", msg.user(), msg.key()); - KV_FROM_REQUEST(msg) - kv->lockGlobal(); - - auto response = std::make_unique(); - return response; -} - -std::unique_ptr StateServer::recvUnlock( - const uint8_t* buffer, - size_t bufferSize) -{ - PARSE_MSG(faabric::StateRequest, buffer, bufferSize) - - // Perform operation - SPDLOG_TRACE("Received unlock {}/{}", msg.user(), msg.key()); - KV_FROM_REQUEST(msg) - kv->unlockGlobal(); - - auto response = std::make_unique(); - return response; -} } diff --git a/tests/test/state/test_state_server.cpp b/tests/test/state/test_state_server.cpp index 24ab2949b..4e66d5837 100644 --- a/tests/test/state/test_state_server.cpp +++ b/tests/test/state/test_state_server.cpp @@ -216,67 +216,4 @@ TEST_CASE_METHOD(SimpleStateServerTestFixture, std::vector actual(kv->get(), kv->get() + dataA.size()); REQUIRE(actual == dataB); } - -TEST_CASE_METHOD(SimpleStateServerTestFixture, - "Test locking state remotely", - "[state]") -{ - int nThreads = 20; - std::string user = "dummy"; - std::string key = "lock-check"; - size_t stateSize = sizeof(int); - - int initialValue = 20; - int increment = 10; - int nLoops = 
5; - - // Prepare the initial KV - std::shared_ptr kv = state.getKV(user, key, stateSize); - kv->set(BYTES(&initialValue)); - - std::vector threads; - threads.reserve(20); - - for (int i = 0; i < nThreads; i++) { - threads.emplace_back([user, key, stateSize, increment, nLoops] { - State& state = getGlobalState(); - std::shared_ptr kv = - state.getKV(user, key, stateSize); - uint8_t* rawPtr = kv->get(); - - // Create a client - StateClient client(user, key, DEFAULT_STATE_HOST); - - for (int j = 0; j < nLoops; j++) { - // Lock - client.lock(); - - // Pull the data - StateChunk pullChunk(0, stateSize, rawPtr); - client.pullChunks({ pullChunk }, rawPtr); - - // Read the value and update - int currentValue = faabric::util::unalignedRead(rawPtr); - currentValue += increment; - kv->set(BYTES(&currentValue)); - - StateChunk pushChunk(0, stateSize, rawPtr); - client.pushChunks({ pushChunk }); - - // Unlock - client.unlock(); - } - }); - } - - for (auto& t : threads) { - if (t.joinable()) { - t.join(); - } - } - - int expectedValue = initialValue + (nThreads * increment); - int actualValue = faabric::util::unalignedRead(kv->get()); - REQUIRE(actualValue == expectedValue); -} } From 50bb22fe232e8393b5cf9b8499eea31ce74fc1c5 Mon Sep 17 00:00:00 2001 From: Simon Shillaker Date: Wed, 16 Mar 2022 16:37:53 +0000 Subject: [PATCH 3/3] Remove global locking --- include/faabric/state/RedisStateKeyValue.h | 2 -- include/faabric/state/State.h | 4 +--- include/faabric/state/StateClient.h | 2 +- include/faabric/state/StateServer.h | 8 +------- src/state/StateClient.cpp | 14 -------------- src/state/StateServer.cpp | 6 ------ 6 files changed, 3 insertions(+), 33 deletions(-) diff --git a/include/faabric/state/RedisStateKeyValue.h b/include/faabric/state/RedisStateKeyValue.h index f91e7ffad..bfdee9fa5 100644 --- a/include/faabric/state/RedisStateKeyValue.h +++ b/include/faabric/state/RedisStateKeyValue.h @@ -27,8 +27,6 @@ class RedisStateKeyValue final : public StateKeyValue private: const 
std::string joinedKey; - uint32_t lastRemoteLockId = 0; - void pullFromRemote() override; void pullChunkFromRemote(long offset, size_t length) override; diff --git a/include/faabric/state/State.h b/include/faabric/state/State.h index 246c4f392..7868eca56 100644 --- a/include/faabric/state/State.h +++ b/include/faabric/state/State.h @@ -17,9 +17,7 @@ enum StateCalls Append = 4, ClearAppended = 5, PullAppended = 6, - Lock = 7, - Unlock = 8, - Delete = 9, + Delete = 7, }; class State diff --git a/include/faabric/state/StateClient.h b/include/faabric/state/StateClient.h index e2a51857d..ac07b7a4d 100644 --- a/include/faabric/state/StateClient.h +++ b/include/faabric/state/StateClient.h @@ -41,6 +41,6 @@ class StateClient : public faabric::transport::MessageEndpointClient const uint8_t* data, int length); - void logRequest(const std::string &op); + void logRequest(const std::string& op); }; } diff --git a/include/faabric/state/StateServer.h b/include/faabric/state/StateServer.h index 5480778df..d528281ae 100644 --- a/include/faabric/state/StateServer.h +++ b/include/faabric/state/StateServer.h @@ -13,7 +13,7 @@ class StateServer final : public faabric::transport::MessageEndpointServer private: State& state; - void logOperation(const std::string &op); + void logOperation(const std::string& op); void doAsyncRecv(int header, const uint8_t* buffer, @@ -46,11 +46,5 @@ class StateServer final : public faabric::transport::MessageEndpointServer std::unique_ptr recvDelete(const uint8_t* buffer, size_t bufferSize); - - std::unique_ptr recvLock(const uint8_t* buffer, - size_t bufferSize); - - std::unique_ptr recvUnlock(const uint8_t* buffer, - size_t bufferSize); }; } diff --git a/src/state/StateClient.cpp b/src/state/StateClient.cpp index 55541425d..80fe1ede0 100644 --- a/src/state/StateClient.cpp +++ b/src/state/StateClient.cpp @@ -138,18 +138,4 @@ void StateClient::deleteState() sendStateRequest(faabric::state::StateCalls::Delete, nullptr, 0); } - -void StateClient::lock() -{ - 
logRequest("lock"); - - sendStateRequest(faabric::state::StateCalls::Lock, nullptr, 0); -} - -void StateClient::unlock() -{ - logRequest("unlock"); - - sendStateRequest(faabric::state::StateCalls::Unlock, nullptr, 0); -} } diff --git a/src/state/StateServer.cpp b/src/state/StateServer.cpp index 4a770439d..d3848d828 100644 --- a/src/state/StateServer.cpp +++ b/src/state/StateServer.cpp @@ -50,12 +50,6 @@ StateServer::doSyncRecv(int header, const uint8_t* buffer, size_t bufferSize) case faabric::state::StateCalls::PullAppended: { return recvPullAppended(buffer, bufferSize); } - case faabric::state::StateCalls::Lock: { - return recvLock(buffer, bufferSize); - } - case faabric::state::StateCalls::Unlock: { - return recvUnlock(buffer, bufferSize); - } case faabric::state::StateCalls::Delete: { return recvDelete(buffer, bufferSize); }