result_core refactor by David-Haim · Pull Request #33 · David-Haim/concurrencpp · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

result_core refactor #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

# Others
.vs
CMakeSettings.json

# Specific directories
build/
Expand Down
7 changes: 5 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ set(concurrencpp_sources
source/executors/thread_executor.cpp
source/executors/thread_pool_executor.cpp
source/executors/worker_thread_executor.cpp
source/results/impl/consumer_context.cpp
source/results/impl/result_state.cpp
source/results/promises.cpp
source/results/result_core.cpp
source/runtime/runtime.cpp
source/threads/thread.cpp
source/timers/timer.cpp
Expand All @@ -43,13 +44,15 @@ set(concurrencpp_headers
include/concurrencpp/executors/thread_executor.h
include/concurrencpp/executors/thread_pool_executor.h
include/concurrencpp/executors/worker_thread_executor.h
include/concurrencpp/results/impl/consumer_context.h
include/concurrencpp/results/impl/producer_context.h
include/concurrencpp/results/impl/result_state.h
include/concurrencpp/results/constants.h
include/concurrencpp/results/executor_exception.h
include/concurrencpp/results/make_result.h
include/concurrencpp/results/promises.h
include/concurrencpp/results/result.h
include/concurrencpp/results/result_awaitable.h
include/concurrencpp/results/result_core.h
include/concurrencpp/results/result_fwd_declerations.h
include/concurrencpp/results/when_result.h
include/concurrencpp/runtime/constants.h
Expand Down
4 changes: 0 additions & 4 deletions include/concurrencpp/results/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ namespace concurrencpp::details::consts {

inline const char* k_result_resolve_via_executor_null_error_msg = "result::resolve_via() - given executor is null.";

inline const char* k_result_awaitable_error_msg = "concurrencpp::awaitable_type<type>::await_suspend() - awaitable is empty.";

inline const char* k_result_resolver_error_msg = "result_resolver<type>::await_suspend() - awaitable is empty.";

inline const char* k_executor_exception_error_msg = "concurrencpp::result - an exception was thrown while trying to enqueue result continuation.";

inline const char* k_make_exceptional_result_exception_null_error_msg = "make_exception_result() - given exception_ptr is null.";
Expand Down
2 changes: 1 addition & 1 deletion include/concurrencpp/results/executor_exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ namespace concurrencpp::errors {
};
} // namespace concurrencpp::errors

#endif
#endif
129 changes: 129 additions & 0 deletions include/concurrencpp/results/impl/consumer_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#ifndef CONCURRENCPP_CONSUMER_CONTEXT_H
#define CONCURRENCPP_CONSUMER_CONTEXT_H

#include <mutex>
#include <condition_variable>
#include <experimental/coroutine>

#include "concurrencpp/errors.h"

#include "concurrencpp/results/executor_exception.h"

namespace concurrencpp::details {
// Callable context that bundles an executor, a coroutine handle and a stored
// exception. consumer_context refers to it by raw pointer (see
// consumer_context::set_await_via_context). Only declarations are visible in
// this header; notes marked "presumably" are assumptions to confirm against
// the implementation file.
class await_context {

private:
// Executor passed to the constructor; presumably where the coroutine gets resumed -- TODO confirm.
std::shared_ptr<executor> m_executor;
// Presumably the handle supplied through set_coro_handle() -- TODO confirm.
std::experimental::coroutine_handle<> m_handle;
// NOTE(review): presumably captures an exception thrown by the executor while scheduling; confirm in the .cpp.
std::exception_ptr m_executor_exception;

public:
await_context() noexcept = default;
// NOTE(review): single-argument constructor is not `explicit`, so shared_ptr<executor> converts implicitly.
await_context(std::shared_ptr<executor> executor) noexcept;

void set_coro_handle(std::experimental::coroutine_handle<> coro_handle) noexcept;

// Invokes the context. Not declared noexcept; `capture_executor_exception` defaults to true and
// presumably selects between storing an executor failure and letting it propagate -- TODO confirm.
void operator()(bool capture_executor_exception = true);

// Not noexcept; presumably rethrows m_executor_exception when one is stored -- TODO confirm.
void throw_if_executor_threw() const;
};

// Blocking-wait helper composed of a mutex, a condition variable and a ready
// flag. Member functions are defined outside this header, so the behavioural
// notes below are assumptions to confirm against the implementation.
class wait_context {

private:
std::mutex m_lock;
std::condition_variable m_condition;
// Starts out false; presumably set by notify() and checked by the wait functions -- TODO confirm.
bool m_ready = false;

public:
// Presumably blocks the caller until notify() has been called -- TODO confirm.
void wait() noexcept;
// Timed variant taking a duration in milliseconds.
// NOTE(review): the bool presumably reports whether the context became ready before the timeout -- confirm.
bool wait_for(size_t milliseconds) noexcept;

void notify() noexcept;
};

// Abstract base for shared "when all" state. It exposes a single pure-virtual
// callback, on_result_ready(), and gives derived classes an atomic counter.
class when_all_state_base {

protected:
// NOTE(review): declared without an initializer. Before C++20 a default-constructed std::atomic
// holds an indeterminate value, so derived classes must store to it before any read -- confirm they do.
// Presumably counts results that are still pending -- TODO confirm.
std::atomic_size_t m_counter;

public:
// Virtual destructor so derived state can be destroyed through a base pointer
// (consumer_context stores it as std::shared_ptr<when_all_state_base>).
virtual ~when_all_state_base() noexcept = default;
// Callback implemented by derived classes; takes no arguments and must not throw.
virtual void on_result_ready() noexcept = 0;
};

// Abstract base for shared "when any" state. It exposes a pure-virtual
// callback that receives a size_t, and gives derived classes a fulfilled flag
// and a recursive mutex.
class when_any_state_base {

protected:
// Starts out false; presumably flipped once the first result completes -- TODO confirm.
std::atomic_bool m_fulfilled = false;
// NOTE(review): recursive rather than plain mutex; presumably on_result_ready() can be re-entered
// on the same thread while the lock is held -- confirm against the derived classes.
std::recursive_mutex m_lock;

public:
// Virtual destructor so derived state can be destroyed through a base pointer.
virtual ~when_any_state_base() noexcept = default;
// Callback implemented by derived classes; the size_t is presumably the index of the
// result that became ready (when_any_context stores such an index) -- TODO confirm.
virtual void on_result_ready(size_t) noexcept = 0;
};

// Callable pairing a shared when_any_state_base with an index. It is one of
// the alternatives stored inside consumer_context's union.
class when_any_context {

private:
std::shared_ptr<when_any_state_base> m_when_any_state;
size_t m_index;

public:
// Takes the shared state and the index this context stands for.
when_any_context(std::shared_ptr<when_any_state_base> when_any_state, size_t index) noexcept;

// Presumably forwards to m_when_any_state->on_result_ready(m_index) -- TODO confirm in the .cpp.
void operator()() const noexcept;
};

// Holds at most one consumer-side continuation in a hand-rolled tagged union.
// The setters below each correspond to one union alternative. The member
// functions are defined outside this header, so notes marked "presumably" are
// assumptions to confirm against the implementation file.
class consumer_context {

private:
// Tag naming the kind of consumer; presumably records which `storage` member is active -- TODO confirm.
enum class consumer_status { idle, await, await_via, wait, when_all, when_any };

// Raw union over every consumer alternative. It has non-trivial members, so it
// declares its own constructor and destructor and leaves lifetime management
// of the active member to the enclosing class.
union storage {
int idle;
std::experimental::coroutine_handle<> coro_handle;
// Non-owning raw pointer; the await_context must outlive its use here -- presumably owned by the awaiter, TODO confirm.
await_context* await_ctx;
std::shared_ptr<wait_context> wait_ctx;
std::shared_ptr<when_all_state_base> when_all_state;
when_any_context when_any_ctx;

// Placement-constructs a `type` in the storage occupied by `o`, forwarding `arguments` to its constructor.
template<class type, class... argument_type>
static void build(type& o, argument_type&&... arguments) noexcept {
new (std::addressof(o)) type(std::forward<argument_type>(arguments)...);
}

// Explicitly runs the destructor of `o` without releasing its storage.
template<class type>
static void destroy(type& o) noexcept {
o.~type();
}

// Starts with the trivial `idle` member active.
storage() noexcept : idle() {}
// Intentionally empty: the union cannot know which member is active, so the
// owner must call destroy() on the active member beforehand.
~storage() noexcept {}
};

private:
consumer_status m_status;
storage m_storage;

public:
consumer_context() noexcept;
consumer_context() noexcept;
~consumer_context() noexcept;

// Presumably destroys the active alternative and returns to the idle state -- TODO confirm.
void clear() noexcept;

// Takes a bare coroutine handle (matches the `coro_handle` union member).
void set_await_context(std::experimental::coroutine_handle<> coro_handle) noexcept;

// Takes a non-owning pointer to an await_context (matches the `await_ctx` union member).
void set_await_via_context(await_context* await_ctx) noexcept;

// Takes shared ownership of a wait_context (matches the `wait_ctx` union member).
void set_wait_context(std::shared_ptr<wait_context> wait_ctx) noexcept;

// Takes shared ownership of when-all state (matches the `when_all_state` union member).
void set_when_all_context(std::shared_ptr<when_all_state_base> when_all_state) noexcept;

// Takes when-any state plus an index -- the same arguments as when_any_context's constructor.
void set_when_any_context(std::shared_ptr<when_any_state_base> when_any_ctx, size_t index) noexcept;

// Presumably dispatches on m_status to run the stored continuation -- TODO confirm.
void operator()() noexcept;
};
} // namespace concurrencpp::details

#endif
164 changes: 164 additions & 0 deletions include/concurrencpp/results/impl/producer_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#ifndef CONCURRENCPP_PRODUCER_CONTEXT_H
#define CONCURRENCPP_PRODUCER_CONTEXT_H

#include "concurrencpp/results/result_fwd_declerations.h"

#include <optional>

#include <cassert>

namespace concurrencpp::details {
// Stores what a producer handed over for a result of type `type`: a value held
// in a std::optional, or an exception held in a std::exception_ptr.
// Holding neither maps to result_status::idle. Holding both is a logic error
// caught by the debug assertions below.
template<class type>
class producer_context {

   private:
    std::optional<type> m_result;
    std::exception_ptr m_exception;

    // Debug-only invariant: either nothing is stored, or exactly one of value/exception is.
    void assert_state() const noexcept {
        assert((!m_result.has_value() && !static_cast<bool>(m_exception)) || (m_result.has_value() != static_cast<bool>(m_exception)));
    }

   public:
    // Constructs the value in place from `arguments`.
    // Precondition (asserted): nothing has been stored yet.
    // May throw whatever the constructor of `type` throws.
    template<class... argument_types>
    void build_result(argument_types&&... arguments) {
        assert(!m_result.has_value());
        assert(!static_cast<bool>(m_exception));

        m_result.emplace(std::forward<argument_types>(arguments)...);
    }

    // Stores `exception` as the outcome.
    // Precondition (asserted): nothing has been stored yet.
    void build_exception(std::exception_ptr exception) noexcept {
        assert(!m_result.has_value());
        assert(!static_cast<bool>(m_exception));

        m_exception = exception;
    }

    // Reports value, exception or idle, checking for a value first.
    result_status status() const noexcept {
        assert_state();

        if (!m_result.has_value()) {
            return static_cast<bool>(m_exception) ? result_status::exception : result_status::idle;
        }

        return result_status::value;
    }

    // Moves the stored value out, or rethrows the stored exception.
    // Calling this while idle is a precondition violation (asserted in debug builds).
    type get() {
        assert_state();

        if (!m_result.has_value()) {
            assert(static_cast<bool>(m_exception));
            std::rethrow_exception(m_exception);
        }

        // has_value() was just checked, so unchecked access is equivalent to value().
        return std::move(*m_result);
    }
};

// Specialization for results that carry no value: a boolean flag stands in for
// "completed successfully", next to the usual exception slot.
// Neither set maps to result_status::idle. Both set is a logic error caught by
// the debug assertions below.
template<>
class producer_context<void> {

   private:
    std::exception_ptr m_exception;
    bool m_ready = false;

    // Debug-only invariant: either nothing is stored, or exactly one of flag/exception is set.
    void assert_state() const noexcept {
        assert((!m_ready && !static_cast<bool>(m_exception)) || (m_ready != static_cast<bool>(m_exception)));
    }

   public:
    // Marks the result as successfully completed.
    // Precondition (asserted): nothing has been stored yet.
    void build_result() noexcept {
        assert(!m_ready);
        assert(!static_cast<bool>(m_exception));

        m_ready = true;
    }

    // Stores `exception` as the outcome.
    // Precondition (asserted): nothing has been stored yet.
    void build_exception(std::exception_ptr exception) noexcept {
        assert(!m_ready);
        assert(!static_cast<bool>(m_exception));

        m_exception = exception;
    }

    // Reports value, exception or idle, checking the ready flag first.
    result_status status() const noexcept {
        assert_state();

        if (!m_ready) {
            return static_cast<bool>(m_exception) ? result_status::exception : result_status::idle;
        }

        return result_status::value;
    }

    // Returns normally when completed, otherwise rethrows the stored exception.
    // Calling this while idle is a precondition violation (asserted in debug builds).
    void get() const {
        assert_state();

        if (!m_ready) {
            assert(static_cast<bool>(m_exception));
            std::rethrow_exception(m_exception);
        }
    }
};

// Specialization for results that yield a reference: the referent's address is
// kept as a raw, non-owning pointer next to the usual exception slot.
// A null pointer with no exception maps to result_status::idle. A non-null
// pointer together with an exception is a logic error caught by assert_state().
template<class type>
class producer_context<type&> {

   private:
    type* m_pointer = nullptr;
    std::exception_ptr m_exception;

    // Debug-only invariant: either nothing is stored, or exactly one of
    // pointer/exception is set. The second operand compares with `!=`, matching
    // the other producer_context variants. The previous `||` made the whole
    // expression always true, so a pointer and an exception being set together
    // was never detected.
    void assert_state() const noexcept {
        assert((m_pointer == nullptr && !static_cast<bool>(m_exception)) || ((m_pointer != nullptr) != static_cast<bool>(m_exception)));
    }

   public:
    // Records the address of `reference`; the referent is not copied and must
    // outlive every later call to get().
    // Precondition (asserted): nothing has been stored yet.
    void build_result(type& reference) noexcept {
        assert(m_pointer == nullptr);
        assert(!static_cast<bool>(m_exception));
        m_pointer = std::addressof(reference);
    }

    // Stores `exception` as the outcome.
    // Precondition (asserted): nothing has been stored yet.
    void build_exception(std::exception_ptr exception) noexcept {
        assert(m_pointer == nullptr);
        assert(!static_cast<bool>(m_exception));
        m_exception = exception;
    }

    // Reports value, exception or idle, checking the pointer first.
    result_status status() const noexcept {
        assert_state();

        if (m_pointer != nullptr) {
            return result_status::value;
        }

        if (static_cast<bool>(m_exception)) {
            return result_status::exception;
        }

        return result_status::idle;
    }

    // Returns the stored reference, or rethrows the stored exception.
    // Calling this while idle is a precondition violation (asserted in debug builds).
    type& get() {
        assert_state();

        if (m_pointer != nullptr) {
            // Sanity check that the stored address is suitably aligned for `type`.
            assert(reinterpret_cast<size_t>(m_pointer) % alignof(type) == 0);
            return *m_pointer;
        }

        assert(static_cast<bool>(m_exception));
        std::rethrow_exception(m_exception);
    }
};
} // namespace concurrencpp::details

#endif
Loading
0