From b770558c68e9b4d6a6bc9261976ace040b338509 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 29 Apr 2025 23:07:33 +1200 Subject: [PATCH] Hashes match now. --- CMakeLists.txt | 4 - cmake/FindxxHash.cmake | 38 - src/contrib/transwarp.hpp | 3744 +++++++++++++++++++++++++++++++++++++ src/servers.cpp | 9 +- src/utils/hash.cpp | 34 +- 5 files changed, 3768 insertions(+), 61 deletions(-) delete mode 100644 cmake/FindxxHash.cmake create mode 100644 src/contrib/transwarp.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 41d526b..942b0dc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,9 +32,6 @@ configure_file( # Set CMAKE_MODULE_PATH to include our custom find modules set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_CURRENT_SOURCE_DIR}/cmake) -# Find required packages -find_package(TBB REQUIRED) - # Auto-detect source files file(GLOB_RECURSE SOURCES "src/*.cpp") file(GLOB_RECURSE HEADERS "src/*.hpp") @@ -50,7 +47,6 @@ target_include_directories(dropshell PRIVATE # Link libraries target_link_libraries(dropshell PRIVATE - TBB::tbb ) # Install targets diff --git a/cmake/FindxxHash.cmake b/cmake/FindxxHash.cmake deleted file mode 100644 index 95457ad..0000000 --- a/cmake/FindxxHash.cmake +++ /dev/null @@ -1,38 +0,0 @@ -# Find xxHash library -# -# This sets the following variables: -# xxHash_FOUND - True if xxHash was found -# xxHash_INCLUDE_DIRS - xxHash include directories -# xxHash_LIBRARIES - xxHash libraries - -find_path(xxHash_INCLUDE_DIR - NAMES xxhash.h - PATHS - /usr/include - /usr/local/include - /opt/local/include - ${CMAKE_INSTALL_PREFIX}/include - PATH_SUFFIXES xxhash -) - -find_library(xxHash_LIBRARY - NAMES xxhash libxxhash - PATHS - /usr/lib - /usr/local/lib - /opt/local/lib - ${CMAKE_INSTALL_PREFIX}/lib -) - -include(FindPackageHandleStandardArgs) -find_package_handle_standard_args(xxHash - FOUND_VAR xxHash_FOUND - REQUIRED_VARS xxHash_LIBRARY xxHash_INCLUDE_DIR -) - -if(xxHash_FOUND) - set(xxHash_LIBRARIES ${xxHash_LIBRARY}) - set(xxHash_INCLUDE_DIRS ${xxHash_INCLUDE_DIR}) -endif() - -mark_as_advanced(xxHash_INCLUDE_DIR xxHash_LIBRARY) \ No newline at end of file diff --git a/src/contrib/transwarp.hpp b/src/contrib/transwarp.hpp new file mode 100644 index 0000000..92e62f1 --- /dev/null +++ b/src/contrib/transwarp.hpp @@ -0,0 +1,3744 @@ +/// @mainpage transwarp is a header-only C++ library for task concurrency +/// @details https://github.com/bloomen/transwarp +/// @version 2.2.3 +/// @copyright MIT http://www.opensource.org/licenses/mit-license.php +#pragma once +#include +#ifndef TRANSWARP_CPP11 +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifndef TRANSWARP_CPP11 +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#ifdef TRANSWARP_MINIMUM_TASK_SIZE + +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA +#define TRANSWARP_DISABLE_TASK_CUSTOM_DATA +#endif + +#ifndef TRANSWARP_DISABLE_TASK_NAME +#define TRANSWARP_DISABLE_TASK_NAME +#endif + +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY +#define TRANSWARP_DISABLE_TASK_PRIORITY +#endif + +#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT +#define TRANSWARP_DISABLE_TASK_REFCOUNT +#endif + +#ifndef TRANSWARP_DISABLE_TASK_TIME +#define TRANSWARP_DISABLE_TASK_TIME +#endif + +#endif + + +/// The transwarp namespace +namespace transwarp { + + +#ifdef TRANSWARP_CPP11 +/// A simple value class that optionally holds a string +class option_str { +public: + option_str() {} + + option_str(std::string str) + : str_(std::move(str)), + valid_(true) + {} + + // default copy/move semantics + option_str(const option_str&) = default; + option_str& operator=(const option_str&) = default; + option_str(option_str&&) = default; + option_str& operator=(option_str&&) = default; + + operator bool() const noexcept { + return valid_; + } + + const std::string& operator*() const noexcept { + return str_; + } + +private: + std::string str_; + bool valid_ = false; +}; + +/// Detail namespace for internal functionality only +namespace detail { + +/// Used to handle data storage for a type-erased object +class storage { +public: + virtual ~storage() = default; + virtual std::unique_ptr clone() const = 0; + virtual void destroy(void* data) const noexcept = 0; + virtual void copy(const void* src, void*& dest) const = 0; + virtual void move(void*& src, void*& dest) const noexcept = 0; +}; + +/// Used to handle data storage for a type-erased object +template +class storage_impl : public transwarp::detail::storage { +public: + std::unique_ptr clone() const override { + return std::unique_ptr(new storage_impl); + } + void destroy(void* data) const noexcept override { + delete reinterpret_cast(data); + } + void copy(const void* src, void*& dest) const override { + dest = new T(*reinterpret_cast(src)); + } + void move(void*& src, void*& dest) const noexcept override { + dest = src; + src = nullptr; + } +}; + +} // detail + +/// A simple value class that can hold any value +class any_data { +public: + any_data() + : data_(nullptr) + {} + + template + any_data(T&& value) + : storage_(new transwarp::detail::storage_impl::type>), + data_(new typename std::decay::type(std::forward(value))) + {} + + any_data(const any_data& other) + : storage_(other.storage_ ? other.storage_->clone() : nullptr) + { + if (other.data_) { + storage_->copy(other.data_, data_); + } else { + data_ = nullptr; + } + } + + any_data& operator=(const any_data& other) { + if (this != &other) { + if (storage_) { + storage_->destroy(data_); + } + storage_ = other.storage_ ? other.storage_->clone() : nullptr; + if (other.data_) { + storage_->copy(other.data_, data_); + } else { + data_ = nullptr; + } + } + return *this; + } + + any_data(any_data&& other) + : storage_(std::move(other.storage_)) + { + if (other.data_) { + storage_->move(other.data_, data_); + } else { + data_ = nullptr; + } + } + + any_data& operator=(any_data&& other) { + if (this != &other) { + if (storage_) { + storage_->destroy(data_); + } + storage_ = std::move(other.storage_); + if (other.data_) { + storage_->move(other.data_, data_); + } else { + data_ = nullptr; + } + } + return *this; + } + + ~any_data() { + if (data_) { + storage_->destroy(data_); + } + } + + bool has_value() const noexcept { + return data_ != nullptr; + } + + template + const T& get() const { + return *reinterpret_cast(data_); + } + +private: + std::unique_ptr storage_; + void* data_; +}; + +using str_view = const std::string&; +#else +using any_data = std::any; +using option_str = std::optional; +using str_view = std::string_view; +#endif + + +/// The possible task types +enum class task_type { + root, ///< The task has no parents + accept, ///< The task's functor accepts all parent futures + accept_any, ///< The task's functor accepts the first parent future that becomes ready + consume, ///< The task's functor consumes all parent results + consume_any, ///< The task's functor consumes the first parent result that becomes ready + wait, ///< The task's functor takes no arguments but waits for all parents to finish + wait_any, ///< The task's functor takes no arguments but waits for the first parent to finish +}; + + +/// Base class for exceptions +class transwarp_error : public std::runtime_error { +public: + explicit transwarp_error(const std::string& message) + : std::runtime_error{message} + {} +}; + + +/// Exception thrown when a task is canceled +class task_canceled : public transwarp::transwarp_error { +public: + explicit task_canceled(const std::string& task_repr) + : transwarp::transwarp_error{"Task canceled: " + task_repr} + {} +}; + + +/// Exception thrown when a task was destroyed prematurely +class task_destroyed : public transwarp::transwarp_error { +public: + explicit task_destroyed(const std::string& task_repr) + : transwarp::transwarp_error{"Task destroyed: " + task_repr} + {} +}; + + +/// Exception thrown when an invalid parameter was passed to a function +class invalid_parameter : public transwarp::transwarp_error { +public: + explicit invalid_parameter(const std::string& parameter) + : transwarp::transwarp_error{"Invalid parameter: " + parameter} + {} +}; + + +/// Exception thrown when a task is used in unintended ways +class control_error : public transwarp::transwarp_error { +public: + explicit control_error(const std::string& message) + : transwarp::transwarp_error{"Control error: " + message} + {} +}; + + +/// The root type. Used for tag dispatch +struct root_type : std::integral_constant {}; +constexpr transwarp::root_type root{}; ///< The root task tag + +/// The accept type. Used for tag dispatch +struct accept_type : std::integral_constant {}; +constexpr transwarp::accept_type accept{}; ///< The accept task tag + +/// The accept_any type. Used for tag dispatch +struct accept_any_type : std::integral_constant {}; +constexpr transwarp::accept_any_type accept_any{}; ///< The accept_any task tag + +/// The consume type. Used for tag dispatch +struct consume_type : std::integral_constant {}; +constexpr transwarp::consume_type consume{}; ///< The consume task tag + +/// The consume_any type. Used for tag dispatch +struct consume_any_type : std::integral_constant {}; +constexpr transwarp::consume_any_type consume_any{}; ///< The consume_any task tag + +/// The wait type. Used for tag dispatch +struct wait_type : std::integral_constant {}; +constexpr transwarp::wait_type wait{}; ///< The wait task tag + +/// The wait_any type. Used for tag dispatch +struct wait_any_type : std::integral_constant {}; +constexpr transwarp::wait_any_type wait_any{}; ///< The wait_any task tag + + +class itask; + + +/// Detail namespace for internal functionality only +namespace detail { + +struct visit_visitor; +struct unvisit_visitor; +struct final_visitor; +struct schedule_visitor; +struct parent_visitor; +struct decrement_refcount_functor; + +} // detail + + +/// The executor interface used to perform custom task execution +class executor { +public: + virtual ~executor() = default; + + /// Returns the name of the executor + virtual std::string name() const = 0; + + /// Runs a task which is wrapped by the given functor. The functor only + /// captures one shared pointer and can hence be copied at low cost. + /// task represents the task that the functor belongs to. + /// This function is only ever called on the thread of the caller to schedule(). + /// The implementer needs to ensure that this never throws exceptions + virtual void execute(const std::function& functor, transwarp::itask& task) = 0; +}; + + +/// The task events that can be subscribed to using the listener interface +enum class event_type { + before_scheduled, ///< Just before a task is scheduled (handle_event called on thread of caller to schedule()) + after_future_changed, ///< Just after the task's future was changed (handle_event called on thread that changed the task's future) + before_started, ///< Just before a task starts running (handle_event called on thread that task is run on) + before_invoked, ///< Just before a task's functor is invoked (handle_event called on thread that task is run on) + after_finished, ///< Just after a task has finished running (handle_event called on thread that task is run on) + after_canceled, ///< Just after a task was canceled (handle_event called on thread that task is run on) + after_satisfied, ///< Just after a task has satisfied all its children with results (handle_event called on thread where the last child is satisfied) + after_custom_data_set, ///< Just after custom data was assigned (handle_event called on thread that custom data was set on) + count, +}; + + +/// The listener interface to listen to events raised by tasks +class listener { +public: + virtual ~listener() = default; + + /// This may be called from arbitrary threads depending on the event type (see transwarp::event_type). + /// The implementer needs to ensure that this never throws exceptions. + virtual void handle_event(transwarp::event_type event, transwarp::itask& task) = 0; +}; + + +/// An edge between two tasks +class edge { +public: + edge(transwarp::itask& parent, transwarp::itask& child) noexcept + : parent_(&parent), child_(&child) + {} + + // default copy/move semantics + edge(const edge&) = default; + edge& operator=(const edge&) = default; + edge(edge&&) = default; + edge& operator=(edge&&) = default; + + /// Returns the parent task + const transwarp::itask& parent() const noexcept { + return *parent_; + } + + /// Returns the parent task + transwarp::itask& parent() noexcept { + return *parent_; + } + + /// Returns the child task + const transwarp::itask& child() const noexcept { + return *child_; + } + + /// Returns the child task + transwarp::itask& child() noexcept { + return *child_; + } + +private: + transwarp::itask* parent_; + transwarp::itask* child_; +}; + + +class timer; +class releaser; + +/// An interface for the task class +class itask : public std::enable_shared_from_this { +public: + virtual ~itask() = default; + + virtual void finalize() = 0; + virtual std::size_t id() const noexcept = 0; + virtual std::size_t level() const noexcept = 0; + virtual transwarp::task_type type() const noexcept = 0; + virtual const transwarp::option_str& name() const noexcept = 0; + virtual std::shared_ptr executor() const noexcept = 0; + virtual std::int64_t priority() const noexcept = 0; + virtual const transwarp::any_data& custom_data() const noexcept = 0; + virtual bool canceled() const noexcept = 0; + virtual std::int64_t avg_idletime_us() const noexcept = 0; + virtual std::int64_t avg_waittime_us() const noexcept = 0; + virtual std::int64_t avg_runtime_us() const noexcept = 0; + virtual void set_executor(std::shared_ptr executor) = 0; + virtual void set_executor_all(std::shared_ptr executor) = 0; + virtual void remove_executor() = 0; + virtual void remove_executor_all() = 0; + virtual void set_priority(std::int64_t priority) = 0; + virtual void set_priority_all(std::int64_t priority) = 0; + virtual void reset_priority() = 0; + virtual void reset_priority_all() = 0; + virtual void set_custom_data(transwarp::any_data custom_data) = 0; + virtual void set_custom_data_all(transwarp::any_data custom_data) = 0; + virtual void remove_custom_data() = 0; + virtual void remove_custom_data_all() = 0; + virtual void add_listener(std::shared_ptr listener) = 0; + virtual void add_listener(transwarp::event_type event, std::shared_ptr listener) = 0; + virtual void add_listener_all(std::shared_ptr listener) = 0; + virtual void add_listener_all(transwarp::event_type event, std::shared_ptr listener) = 0; + virtual void remove_listener(const std::shared_ptr& listener) = 0; + virtual void remove_listener(transwarp::event_type event, const std::shared_ptr& listener) = 0; + virtual void remove_listener_all(const std::shared_ptr& listener) = 0; + virtual void remove_listener_all(transwarp::event_type event, const std::shared_ptr& listener) = 0; + virtual void remove_listeners() = 0; + virtual void remove_listeners(transwarp::event_type event) = 0; + virtual void remove_listeners_all() = 0; + virtual void remove_listeners_all(transwarp::event_type event) = 0; + virtual void schedule() = 0; + virtual void schedule(transwarp::executor& executor) = 0; + virtual void schedule(bool reset) = 0; + virtual void schedule(transwarp::executor& executor, bool reset) = 0; + virtual void schedule_all() = 0; + virtual void schedule_all(transwarp::executor& executor) = 0; + virtual void schedule_all(bool reset_all) = 0; + virtual void schedule_all(transwarp::executor& executor, bool reset_all) = 0; + virtual void set_exception(std::exception_ptr exception) = 0; + virtual bool was_scheduled() const noexcept = 0; + virtual void wait() const = 0; + virtual bool is_ready() const = 0; + virtual bool has_result() const = 0; + virtual void reset() = 0; + virtual void reset_all() = 0; + virtual void cancel(bool enabled) noexcept = 0; + virtual void cancel_all(bool enabled) noexcept = 0; + virtual std::vector parents() const = 0; + virtual const std::vector& tasks() = 0; + virtual std::vector edges() = 0; + +protected: + virtual void schedule_impl(bool reset, transwarp::executor* executor=nullptr) = 0; + +private: + friend struct transwarp::detail::visit_visitor; + friend struct transwarp::detail::unvisit_visitor; + friend struct transwarp::detail::final_visitor; + friend struct transwarp::detail::schedule_visitor; + friend struct transwarp::detail::parent_visitor; + friend class transwarp::timer; + friend class transwarp::releaser; + friend struct transwarp::detail::decrement_refcount_functor; + + virtual void visit(const std::function& visitor) = 0; + virtual void unvisit() noexcept = 0; + virtual void set_id(std::size_t id) noexcept = 0; + virtual void set_level(std::size_t level) noexcept = 0; + virtual void set_type(transwarp::task_type type) noexcept = 0; + virtual void set_name(transwarp::option_str name) noexcept = 0; + virtual void set_avg_idletime_us(std::int64_t idletime) noexcept = 0; + virtual void set_avg_waittime_us(std::int64_t waittime) noexcept = 0; + virtual void set_avg_runtime_us(std::int64_t runtime) noexcept = 0; + virtual void increment_childcount() noexcept = 0; + virtual void decrement_refcount() = 0; + virtual void reset_future() = 0; +}; + + +/// String conversion for the task_type enumeration +inline +std::string to_string(const transwarp::task_type& type) { + switch (type) { + case transwarp::task_type::root: return "root"; + case transwarp::task_type::accept: return "accept"; + case transwarp::task_type::accept_any: return "accept_any"; + case transwarp::task_type::consume: return "consume"; + case transwarp::task_type::consume_any: return "consume_any"; + case transwarp::task_type::wait: return "wait"; + case transwarp::task_type::wait_any: return "wait_any"; + } + throw transwarp::invalid_parameter{"task type"}; +} + + +/// String conversion for the itask class +inline +std::string to_string(const transwarp::itask& task, transwarp::str_view separator="\n") { + std::string s; + s += '"'; + const transwarp::option_str& name = task.name(); + if (name) { + s += std::string{"<"} + *name + std::string{">"} + separator.data(); + } + s += transwarp::to_string(task.type()); + s += std::string{" id="} + std::to_string(task.id()); + s += std::string{" lev="} + std::to_string(task.level()); + const std::shared_ptr exec = task.executor(); + if (exec) { + s += separator.data() + std::string{"<"} + exec->name() + std::string{">"}; + } + const std::int64_t avg_idletime_us = task.avg_idletime_us(); + if (avg_idletime_us >= 0) { + s += separator.data() + std::string{"avg-idle-us="} + std::to_string(avg_idletime_us); + } + const std::int64_t avg_waittime_us = task.avg_waittime_us(); + if (avg_waittime_us >= 0) { + s += separator.data() + std::string{"avg-wait-us="} + std::to_string(avg_waittime_us); + } + const std::int64_t avg_runtime_us = task.avg_runtime_us(); + if (avg_runtime_us >= 0) { + s += separator.data() + std::string{"avg-run-us="} + std::to_string(avg_runtime_us); + } + return s + '"'; +} + + +/// String conversion for the edge class +inline +std::string to_string(const transwarp::edge& edge, transwarp::str_view separator="\n") { + return transwarp::to_string(edge.parent(), separator) + std::string{" -> "} + transwarp::to_string(edge.child(), separator); +} + + +/// Creates a dot-style string from the given edges +inline +std::string to_string(const std::vector& edges, transwarp::str_view separator="\n") { + std::string dot = std::string{"digraph {"} + separator.data(); + for (const transwarp::edge& edge : edges) { + dot += transwarp::to_string(edge, separator) + separator.data(); + } + dot += std::string{"}"}; + return dot; +} + + +/// Removes reference and const from a type +template +struct decay { + using type = typename std::remove_const::type>::type; +}; + + +/// Returns the result type of a std::shared_future +template +struct result { + using type = decltype(std::declval>().get()); +}; + + +/// Detail namespace for internal functionality only +namespace detail { + +/// Clones a task +template +std::shared_ptr clone_task(std::unordered_map, std::shared_ptr>& task_cache, const std::shared_ptr& t) { + const auto original_task = std::static_pointer_cast(t); + const auto task_cache_it = task_cache.find(original_task); + if (task_cache_it != task_cache.cend()) { + return std::static_pointer_cast(task_cache_it->second); + } else { + auto cloned_task = t->clone_impl(task_cache); + task_cache[original_task] = cloned_task; + return cloned_task; + } +} + +} // detail + + +/// The task class +template +class task : public transwarp::itask { +public: + using result_type = ResultType; + + virtual ~task() = default; + + std::shared_ptr clone() const { + std::unordered_map, std::shared_ptr> task_cache; + return clone_impl(task_cache); + } + + virtual void set_value(const typename transwarp::decay::type& value) = 0; + virtual void set_value(typename transwarp::decay::type&& value) = 0; + virtual std::shared_future future() const noexcept = 0; + virtual typename transwarp::result::type get() const = 0; + +private: + template + friend std::shared_ptr transwarp::detail::clone_task(std::unordered_map, std::shared_ptr>& task_cache, const std::shared_ptr& t); + + virtual std::shared_ptr> clone_impl(std::unordered_map, std::shared_ptr>& task_cache) const = 0; +}; + +/// The task class (reference result type) +template +class task : public transwarp::itask { +public: + using result_type = ResultType&; + + virtual ~task() = default; + + std::shared_ptr clone() const { + std::unordered_map, std::shared_ptr> task_cache; + return clone_impl(task_cache); + } + + virtual void set_value(typename transwarp::decay::type& value) = 0; + virtual std::shared_future future() const noexcept = 0; + virtual typename transwarp::result::type get() const = 0; + +private: + template + friend std::shared_ptr transwarp::detail::clone_task(std::unordered_map, std::shared_ptr>& task_cache, const std::shared_ptr& t); + + virtual std::shared_ptr> clone_impl(std::unordered_map, std::shared_ptr>& task_cache) const = 0; +}; + +/// The task class (void result type) +template<> +class task : public transwarp::itask { +public: + using result_type = void; + + virtual ~task() = default; + + std::shared_ptr clone() const { + std::unordered_map, std::shared_ptr> task_cache; + return clone_impl(task_cache); + } + + virtual void set_value() = 0; + virtual std::shared_future future() const noexcept = 0; + virtual result_type get() const = 0; + +private: + template + friend std::shared_ptr transwarp::detail::clone_task(std::unordered_map, std::shared_ptr>& task_cache, const std::shared_ptr& t); + + virtual std::shared_ptr> clone_impl(std::unordered_map, std::shared_ptr>& task_cache) const = 0; +}; + + +/// Detail namespace for internal functionality only +namespace detail { + +template +struct assign_task_if_impl; + +} // detail + + +/// A base class for a user-defined functor that needs access to the associated +/// task or a cancel point to stop a task while it's running +class functor { +public: + + virtual ~functor() = default; + +protected: + + /// The associated task (only to be called after the task was constructed) + const transwarp::itask& transwarp_task() const noexcept { + return *transwarp_task_; + } + + /// The associated task (only to be called after the task was constructed) + transwarp::itask& transwarp_task() noexcept { + return *transwarp_task_; + } + + /// If the associated task is canceled then this will throw transwarp::task_canceled + /// which will stop the task while it's running (only to be called after the task was constructed) + void transwarp_cancel_point() const { + if (transwarp_task_->canceled()) { + throw transwarp::task_canceled{std::to_string(transwarp_task_->id())}; + } + } + +private: + template + friend struct transwarp::detail::assign_task_if_impl; + + transwarp::itask* transwarp_task_{}; +}; + + +/// Detail namespace for internal functionality only +namespace detail { + + +/// A simple thread pool used to execute tasks in parallel +class thread_pool { +public: + + explicit thread_pool(const std::size_t n_threads, + std::function on_thread_started = nullptr) + : on_thread_started_{std::move(on_thread_started)} + { + if (n_threads == 0) { + throw transwarp::invalid_parameter{"number of threads"}; + } + for (std::size_t i = 0; i < n_threads; ++i) { + std::thread thread; + try { + thread = std::thread(&thread_pool::worker, this, i); + } catch (...) { + shutdown(); + throw; + } + try { + threads_.push_back(std::move(thread)); + } catch (...) { + shutdown(); + thread.join(); + throw; + } + } + } + + // delete copy/move semantics + thread_pool(const thread_pool&) = delete; + thread_pool& operator=(const thread_pool&) = delete; + thread_pool(thread_pool&&) = delete; + thread_pool& operator=(thread_pool&&) = delete; + + ~thread_pool() { + shutdown(); + } + + void push(const std::function& functor) { + { + std::lock_guard lock{mutex_}; + functors_.push(functor); + } + cond_var_.notify_one(); + } + +private: + + void worker(const std::size_t index) { + if (on_thread_started_) { + on_thread_started_(index); + } + for (;;) { + std::function functor; + { + std::unique_lock lock{mutex_}; + cond_var_.wait(lock, [this]{ + return done_ || !functors_.empty(); + }); + if (done_ && functors_.empty()) { + break; + } + functor = std::move(functors_.front()); + functors_.pop(); + } + functor(); + } + } + + void shutdown() { + { + std::lock_guard lock{mutex_}; + done_ = true; + } + cond_var_.notify_all(); + for (std::thread& thread : threads_) { + thread.join(); + } + threads_.clear(); + } + + bool done_ = false; + std::function on_thread_started_; + std::vector threads_; + std::queue> functors_; + std::condition_variable cond_var_; + std::mutex mutex_; +}; + + +#ifdef TRANSWARP_CPP11 +template struct indices {}; + +template struct construct_range; + +template +struct construct_range : construct_range {}; + +template +struct construct_range { + using type = transwarp::detail::indices; +}; + +template +struct index_range { + using type = typename transwarp::detail::construct_range::type; +}; + +template +void call_with_each_index(transwarp::detail::indices<>, Functor&&, Tuple&&) {} + +template +void call_with_each_index(transwarp::detail::indices, Functor&& f, Tuple&& t) { + f(std::get(t)); + transwarp::detail::call_with_each_index(transwarp::detail::indices{}, std::forward(f), std::forward(t)); +} +#endif + +template +void apply_to_each(Functor&& f, Tuple&& t) { +#ifdef TRANSWARP_CPP11 + constexpr std::size_t n = std::tuple_size::type>::value; + using index_t = typename transwarp::detail::index_range<0, n>::type; + transwarp::detail::call_with_each_index(index_t{}, std::forward(f), std::forward(t)); +#else + std::apply([&f](auto&&... arg){(..., std::forward(f)(std::forward(arg)));}, std::forward(t)); +#endif +} + +template +void apply_to_each(Functor&& f, const std::vector& v) { + std::for_each(v.begin(), v.end(), std::forward(f)); +} + +template +void apply_to_each(Functor&& f, std::vector& v) { + std::for_each(v.begin(), v.end(), std::forward(f)); +} + + +template +struct assign_futures_impl { + static void work(const std::tuple>...>& source, std::tuple...>& target) { + std::get(target) = std::get(source)->future(); + assign_futures_impl::work(source, target); + } +}; + +template +struct assign_futures_impl<-1, ParentResults...> { + static void work(const std::tuple>...>&, std::tuple...>&) {} +}; + +/// Returns the futures from the given tuple of tasks +template +std::tuple...> get_futures(const std::tuple>...>& input) { + std::tuple...> result; + assign_futures_impl(sizeof...(ParentResults)) - 1, ParentResults...>::work(input, result); + return result; +} + +/// Returns the futures from the given vector of tasks +template +std::vector> get_futures(const std::vector>>& input) { + std::vector> result; + result.reserve(input.size()); + for (const std::shared_ptr>& task : input) { + result.emplace_back(task->future()); + } + return result; +} + + +/// Runs the task with the given arguments, hence, invoking the task's functor +template +Result run_task(std::size_t task_id, const std::weak_ptr& task, Args&&... args) { + const std::shared_ptr t = task.lock(); + if (!t) { + throw transwarp::task_destroyed{std::to_string(task_id)}; + } + if (t->canceled()) { + throw transwarp::task_canceled{std::to_string(task_id)}; + } + t->raise_event(transwarp::event_type::before_invoked); + return (*t->functor_)(std::forward(args)...); +} + + +struct wait_for_all_functor { + template + void operator()(const T& p) const { + p->future().wait(); + } +}; + +/// Waits for all parents to finish +template +void wait_for_all(const std::tuple>...>& parents) { + transwarp::detail::apply_to_each(transwarp::detail::wait_for_all_functor{}, parents); +} + +/// Waits for all parents to finish +template +void wait_for_all(const std::vector>>& parents) { + transwarp::detail::apply_to_each(transwarp::detail::wait_for_all_functor{}, parents); +} + + +template +Parent wait_for_any_impl() { + return {}; +} + +template +Parent wait_for_any_impl(const std::shared_ptr>& parent, const std::shared_ptr>& ...parents) { + const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1)); + if (status == std::future_status::ready) { + return parent; + } + return transwarp::detail::wait_for_any_impl(parents...); +} + +/// Waits for the first parent to finish +template +Parent wait_for_any(const std::shared_ptr>& ...parents) { + for (;;) { + Parent parent = transwarp::detail::wait_for_any_impl(parents...); + if (parent) { + return parent; + } + } +} + + +/// Waits for the first parent to finish +template +std::shared_ptr> wait_for_any(const std::vector>>& parents) { + for (;;) { + for (const std::shared_ptr>& parent : parents) { + const std::future_status status = parent->future().wait_for(std::chrono::microseconds(1)); + if (status == std::future_status::ready) { + return parent; + } + } + } +} + + +template +struct cancel_all_but_one_functor { + explicit cancel_all_but_one_functor(const std::shared_ptr>& one) noexcept + : one_(one) {} + + template + void operator()(const T& parent) const { + if (one_ != parent) { + parent->cancel(true); + } + } + + const std::shared_ptr>& one_; +}; + +/// Cancels all tasks but one +template +void cancel_all_but_one(const std::shared_ptr>& one, const std::tuple>...>& parents) { + transwarp::detail::apply_to_each(transwarp::detail::cancel_all_but_one_functor{one}, parents); +} + +/// Cancels all tasks but one +template +void cancel_all_but_one(const std::shared_ptr>& one, const std::vector>>& parents) { + transwarp::detail::apply_to_each(transwarp::detail::cancel_all_but_one_functor{one}, parents); +} + + +struct decrement_refcount_functor { + template + void operator()(const T& task) const { + task->decrement_refcount(); + } +}; + +/// Decrements the refcount of all parents +template +void decrement_refcount(const std::tuple>...>& parents) { + transwarp::detail::apply_to_each(transwarp::detail::decrement_refcount_functor{}, parents); +} + +/// Decrements the refcount of all parents +template +void decrement_refcount(const std::vector>>& parents) { + transwarp::detail::apply_to_each(transwarp::detail::decrement_refcount_functor{}, parents); +} + + +template +struct call_impl { + template + static Result work(std::size_t task_id, const Task& task, const std::tuple>...>& parents) { + return call_impl(sizeof...(n)), total, n..., static_cast(sizeof...(n))>::template + work(task_id, task, parents); + } +}; + +template +struct call_impl_vector; + +template +struct call_impl { + template + static Result work(std::size_t task_id, const Task& task, const std::tuple>...>&) { + return transwarp::detail::run_task(task_id, task); + } +}; + +template<> +struct call_impl_vector { + template + static Result work(std::size_t task_id, const Task& task, const std::vector>>&) { + return transwarp::detail::run_task(task_id, task); + } +}; + +template +struct call_impl { + template + static Result work(std::size_t task_id, const Task& task, const std::tuple>...>& parents) { + transwarp::detail::wait_for_all(parents); + const std::tuple...> futures = transwarp::detail::get_futures(parents); + transwarp::detail::decrement_refcount(parents); + return transwarp::detail::run_task(task_id, task, std::get(futures)...); + } +}; + +template<> +struct call_impl_vector { + template + static Result work(std::size_t task_id, const Task& task, const std::vector>>& parents) { + transwarp::detail::wait_for_all(parents); + std::vector> futures = transwarp::detail::get_futures(parents); + transwarp::detail::decrement_refcount(parents); + return transwarp::detail::run_task(task_id, task, std::move(futures)); + } +}; + +template +struct call_impl { + template + static Result work(std::size_t task_id, const Task& task, const std::tuple>...>& parents) { + using parent_t = typename std::remove_reference(parents))>::type; // Use first type as reference + parent_t parent = transwarp::detail::wait_for_any(std::get(parents)...); + transwarp::detail::cancel_all_but_one(parent, parents); + auto future = parent->future(); + transwarp::detail::decrement_refcount(parents); + return transwarp::detail::run_task(task_id, task, std::move(future)); + } +}; + +template<> +struct call_impl_vector { + template + static Result work(std::size_t task_id, const Task& task, const std::vector>>& parents) { + std::shared_ptr> parent = transwarp::detail::wait_for_any(parents); + transwarp::detail::cancel_all_but_one(parent, parents); + auto future = parent->future(); + transwarp::detail::decrement_refcount(parents); + return transwarp::detail::run_task(task_id, task, std::move(future)); + } +}; + +template +struct call_impl { + template + static Result work(std::size_t task_id, const Task& task, const std::tuple>...>& parents) { + transwarp::detail::wait_for_all(parents); + const std::tuple...> futures = transwarp::detail::get_futures(parents); + transwarp::detail::decrement_refcount(parents); + return transwarp::detail::run_task(task_id, task, std::get(futures).get()...); + } +}; + +template<> +struct call_impl_vector { + template + static Result work(std::size_t task_id, const Task& task, const std::vector>>& parents) { + transwarp::detail::wait_for_all(parents); + const std::vector> futures = transwarp::detail::get_futures(parents); + transwarp::detail::decrement_refcount(parents); + std::vector results; + results.reserve(futures.size()); + for (const std::shared_future& future : futures) { + results.emplace_back(future.get()); + } + return transwarp::detail::run_task(task_id, task, std::move(results)); + } +}; + +template +struct call_impl { + template + static Result work(std::size_t task_id, const Task& task, const std::tuple>...>& parents) { + using parent_t = typename std::remove_reference(parents))>::type; /// Use first type as reference + parent_t parent = transwarp::detail::wait_for_any(std::get(parents)...); + transwarp::detail::cancel_all_but_one(parent, parents); + const auto future = parent->future(); + transwarp::detail::decrement_refcount(parents); + return transwarp::detail::run_task(task_id, task, future.get()); + } +}; + +template<> +struct call_impl_vector { + template + static Result work(std::size_t task_id, const Task& task, const std::vector>>& parents) { + std::shared_ptr> parent = transwarp::detail::wait_for_any(parents); + transwarp::detail::cancel_all_but_one(parent, parents); + const auto future = parent->future(); + transwarp::detail::decrement_refcount(parents); + return transwarp::detail::run_task(task_id, task, future.get()); + } +}; + +struct future_get_functor { + template + void operator()(const std::shared_future& f) const { + f.get(); + } +}; + +template +struct call_impl { + template + static Result work(std::size_t task_id, const Task& task, const std::tuple>...>& parents) { + transwarp::detail::wait_for_all(parents); + const std::tuple...> futures = transwarp::detail::get_futures(parents); + transwarp::detail::decrement_refcount(parents); + transwarp::detail::apply_to_each(transwarp::detail::future_get_functor{}, futures); // Ensures that exceptions are propagated + return transwarp::detail::run_task(task_id, task); + } +}; + +template<> +struct call_impl_vector { + template + static Result work(std::size_t task_id, const Task& task, const std::vector>>& parents) { + transwarp::detail::wait_for_all(parents); + const std::vector> futures = transwarp::detail::get_futures(parents); + transwarp::detail::decrement_refcount(parents); + transwarp::detail::apply_to_each(transwarp::detail::future_get_functor{}, futures); // Ensures that exceptions are propagated + return transwarp::detail::run_task(task_id, task); + } +}; + +template +struct call_impl { + template + static Result work(std::size_t task_id, const Task& task, const std::tuple>...>& parents) { + using parent_t = typename std::remove_reference(parents))>::type; // Use first type as reference + parent_t parent = transwarp::detail::wait_for_any(std::get(parents)...); + transwarp::detail::cancel_all_but_one(parent, parents); + const auto future = parent->future(); + transwarp::detail::decrement_refcount(parents); + future.get(); // Ensures that exceptions are propagated + return transwarp::detail::run_task(task_id, task); + } +}; + +template<> +struct call_impl_vector { + template + static Result work(std::size_t task_id, const Task& task, const std::vector>>& parents) { + std::shared_ptr> parent = transwarp::detail::wait_for_any(parents); + transwarp::detail::cancel_all_but_one(parent, parents); + const auto future = parent->future(); + transwarp::detail::decrement_refcount(parents); + future.get(); // Ensures that exceptions are propagated + return transwarp::detail::run_task(task_id, task); + } +}; + +/// Calls the functor of the given task with the results from the tuple of parents. +/// Throws transwarp::task_canceled if the task is canceled. +/// Throws transwarp::task_destroyed in case the task was destroyed prematurely. +template +Result call(std::size_t task_id, const Task& task, const std::tuple>...>& parents) { + constexpr std::size_t n = std::tuple_size...>>::value; + return transwarp::detail::call_impl(n)>::template + work(task_id, task, parents); +} + +/// Calls the functor of the given task with the results from the vector of parents. +/// Throws transwarp::task_canceled if the task is canceled. +/// Throws transwarp::task_destroyed in case the task was destroyed prematurely. +template +Result call(std::size_t task_id, const Task& task, const std::vector>>& parents) { + return transwarp::detail::call_impl_vector::template + work(task_id, task, parents); +} + + +template +struct call_with_each_functor { + explicit call_with_each_functor(const Functor& f) noexcept + : f_(f) {} + + template + void operator()(const T& task) const { + if (!task) { + throw transwarp::invalid_parameter{"task pointer"}; + } + f_(*task); + } + + const Functor& f_; +}; + +/// Calls the functor with every element in the tuple +template +void call_with_each(const Functor& f, const std::tuple>...>& t) { + transwarp::detail::apply_to_each(transwarp::detail::call_with_each_functor{f}, t); +} + +/// Calls the functor with every element in the vector +template +void call_with_each(const Functor& f, const std::vector>>& v) { + transwarp::detail::apply_to_each(transwarp::detail::call_with_each_functor{f}, v); +} + + +/// Sets level of a task and increments the child count +struct parent_visitor { + explicit parent_visitor(transwarp::itask& task) noexcept + : task_(task) {} + + void operator()(transwarp::itask& task) const { + if (task_.level() <= task.level()) { + // A child's level is always larger than any of its parents' levels + task_.set_level(task.level() + 1); + } + task.increment_childcount(); + } + + transwarp::itask& task_; +}; + +/// Applies final bookkeeping to the task and collects the task +struct final_visitor { + explicit final_visitor(std::vector& tasks) noexcept + : tasks_(tasks) {} + + void operator()(transwarp::itask& task) noexcept { + tasks_.push_back(&task); + task.set_id(id_++); + } + + std::vector& tasks_; + std::size_t id_ = 0; +}; + +/// Generates edges +struct edges_visitor { + explicit edges_visitor(std::vector& edges) noexcept + : edges_(edges) {} + + void operator()(transwarp::itask& task) { + for (transwarp::itask* parent : task.parents()) { + edges_.emplace_back(*parent, task); + } + } + + std::vector& edges_; +}; + +/// Schedules using the given executor +struct schedule_visitor { + schedule_visitor(bool reset, transwarp::executor* executor) noexcept + : reset_(reset), executor_(executor) {} + + void operator()(transwarp::itask& task) { + task.schedule_impl(reset_, executor_); + } + + bool reset_; + transwarp::executor* executor_; +}; + +/// Resets the given task +struct reset_visitor { + + void operator()(transwarp::itask& task) const { + task.reset(); + } +}; + +/// Cancels or resumes the given task +struct cancel_visitor { + explicit cancel_visitor(bool enabled) noexcept + : enabled_{enabled} {} + + void operator()(transwarp::itask& task) const noexcept { + task.cancel(enabled_); + } + + bool enabled_; +}; + +/// Assigns an executor to the given task +struct set_executor_visitor { + explicit set_executor_visitor(std::shared_ptr executor) noexcept + : executor_{std::move(executor)} {} + + void operator()(transwarp::itask& task) const noexcept { + task.set_executor(executor_); + } + + std::shared_ptr executor_; +}; + +/// Removes the executor from the given task +struct remove_executor_visitor { + + void operator()(transwarp::itask& task) const noexcept { + task.remove_executor(); + } +}; + +/// Assigns a priority to the given task +struct set_priority_visitor { + explicit set_priority_visitor(std::int64_t priority) noexcept + : priority_{priority} {} + + void operator()(transwarp::itask& task) const noexcept { + task.set_priority(priority_); + } + + std::int64_t priority_; +}; + +/// Resets the priority of the given task +struct reset_priority_visitor { + + void operator()(transwarp::itask& task) const noexcept { + task.reset_priority(); + } +}; + +/// Assigns custom data to the given task +struct set_custom_data_visitor { + explicit set_custom_data_visitor(transwarp::any_data custom_data) noexcept + : custom_data_{std::move(custom_data)} {} + + void operator()(transwarp::itask& task) const noexcept { + task.set_custom_data(custom_data_); + } + + transwarp::any_data custom_data_; +}; + +/// Removes custom data from the given task +struct remove_custom_data_visitor { + + void operator()(transwarp::itask& task) const noexcept { + task.remove_custom_data(); + } +}; + +/// Pushes the given task into the vector of tasks +struct push_task_visitor { + explicit push_task_visitor(std::vector& tasks) + : tasks_(tasks) {} + + void operator()(transwarp::itask& task) { + tasks_.push_back(&task); + } + + std::vector& tasks_; +}; + +/// Adds a new listener to the given task +struct add_listener_visitor { + explicit add_listener_visitor(std::shared_ptr listener) + : listener_(std::move(listener)) + {} + + void operator()(transwarp::itask& task) { + task.add_listener(listener_); + } + + std::shared_ptr listener_; +}; + +/// Adds a new listener per event type to the given task +struct add_listener_per_event_visitor { + add_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr listener) + : event_(event), listener_(std::move(listener)) + {} + + void operator()(transwarp::itask& task) { + task.add_listener(event_, listener_); + } + + transwarp::event_type event_; + std::shared_ptr listener_; +}; + +/// Removes a listener from the given task +struct remove_listener_visitor { + explicit remove_listener_visitor(std::shared_ptr listener) + : listener_(std::move(listener)) + {} + + void operator()(transwarp::itask& task) { + task.remove_listener(listener_); + } + + std::shared_ptr listener_; +}; + +/// Removes a listener per event type from the given task +struct remove_listener_per_event_visitor { + remove_listener_per_event_visitor(transwarp::event_type event, std::shared_ptr listener) + : event_(event), listener_(std::move(listener)) + {} + + void operator()(transwarp::itask& task) { + task.remove_listener(event_, listener_); + } + + transwarp::event_type event_; + std::shared_ptr listener_; +}; + +/// Removes all listeners from the given task +struct remove_listeners_visitor { + + void operator()(transwarp::itask& task) { + task.remove_listeners(); + } + +}; + +/// Removes all listeners per event type from the given task +struct remove_listeners_per_event_visitor { + explicit remove_listeners_per_event_visitor(transwarp::event_type event) + : event_(event) + {} + + void operator()(transwarp::itask& task) { + task.remove_listeners(event_); + } + + transwarp::event_type event_; +}; + +/// Visits the given task using the visitor given in the constructor +struct visit_visitor { + explicit visit_visitor(const std::function& visitor) noexcept + : visitor_(visitor) {} + + void operator()(transwarp::itask& task) const { + task.visit(visitor_); + } + + const std::function& visitor_; +}; + +/// Unvisits the given task +struct unvisit_visitor { + + void operator()(transwarp::itask& task) const noexcept { + task.unvisit(); + } +}; + +/// Determines the result type of the Functor dispatching on the task type +template +struct functor_result { + static_assert(std::is_same::value || + std::is_same::value || + std::is_same::value || + std::is_same::value || + std::is_same::value || + std::is_same::value || + std::is_same::value, + "Invalid task type, must be one of: root, accept, accept_any, consume, consume_any, wait, wait_any"); +}; + +template +struct functor_result { + static_assert(sizeof...(ParentResults) == 0, "A root task cannot have parent tasks"); + using type = decltype(std::declval()()); +}; + +template +struct functor_result { + static_assert(sizeof...(ParentResults) > 0, "An accept task must have at least one parent"); + using type = decltype(std::declval()(std::declval>()...)); +}; + +template +struct functor_result>>> { + using type = decltype(std::declval()(std::declval>>())); +}; + +template +struct functor_result { + static_assert(sizeof...(ParentResults) > 0, "An accept_any task must have at least one parent"); + using arg_t = typename std::tuple_element<0, std::tuple>::type; // Using first type as reference + using type = decltype(std::declval()(std::declval>())); +}; + +template +struct functor_result>>> { + using type = decltype(std::declval()(std::declval>())); +}; + +template +struct functor_result { + static_assert(sizeof...(ParentResults) > 0, "A consume task must have at least one parent"); + using type = decltype(std::declval()(std::declval()...)); +}; + +template +struct functor_result>>> { + using type = decltype(std::declval()(std::declval>())); +}; + +template +struct functor_result { + static_assert(sizeof...(ParentResults) > 0, "A consume_any task must have at least one parent"); + using arg_t = typename std::tuple_element<0, std::tuple>::type; // Using first type as reference + using type = decltype(std::declval()(std::declval())); +}; + +template +struct functor_result>>> { + using type = decltype(std::declval()(std::declval())); +}; + +template +struct functor_result { + static_assert(sizeof...(ParentResults) > 0, "A wait task must have at least one parent"); + using type = decltype(std::declval()()); +}; + +template +struct functor_result>>> { + using type = decltype(std::declval()()); +}; + +template +struct functor_result { + static_assert(sizeof...(ParentResults) > 0, "A wait_any task must have at least one parent"); + using type = decltype(std::declval()()); +}; + +template +struct functor_result>>> { + using type = decltype(std::declval()()); +}; + + +template +struct assign_task_if_impl; + +template<> +struct assign_task_if_impl { + template + void operator()(Functor&, transwarp::itask&) const noexcept {} +}; + +template<> +struct assign_task_if_impl { + template + void operator()(Functor& functor, transwarp::itask& task) const noexcept { + functor.transwarp_task_ = &task; + } +}; + +/// Assigns the task to the given functor if the functor is a subclass of transwarp::functor +template +void assign_task_if(Functor& functor, transwarp::itask& task) noexcept { + transwarp::detail::assign_task_if_impl::value>{}(functor, task); +} + + +/// Returns a ready future with the given value as its state +template +std::shared_future make_future_with_value(Value&& value) { + std::promise promise; + promise.set_value(std::forward(value)); + return promise.get_future(); +} + +/// Returns a ready future +inline +std::shared_future make_ready_future() { + std::promise promise; + promise.set_value(); + return promise.get_future(); +} + +/// Returns a ready future with the given exception as its state +template +std::shared_future make_future_with_exception(std::exception_ptr exception) { + if (!exception) { + throw transwarp::invalid_parameter{"exception pointer"}; + } + std::promise promise; + promise.set_exception(exception); + return promise.get_future(); +} + + +struct clone_task_functor { + explicit clone_task_functor(std::unordered_map, std::shared_ptr>& task_cache) noexcept + : task_cache_(task_cache) {} + + template + void operator()(T& t) { + t = transwarp::detail::clone_task(task_cache_, t); + } + + std::unordered_map, std::shared_ptr>& task_cache_; +}; + + +struct push_task_functor { + explicit push_task_functor(std::vector& tasks) noexcept + : tasks_(tasks) {} + + template + void operator()(T& t) { + tasks_.push_back(t.get()); + } + + std::vector& tasks_; +}; + + +/// Determines the type of the parents +template +struct parents { + using type = std::tuple>...>; + static std::size_t size(const type&) { + return std::tuple_size::value; + } + static type clone(std::unordered_map, std::shared_ptr>& task_cache, const type& obj) { + type cloned = obj; + transwarp::detail::apply_to_each(transwarp::detail::clone_task_functor{task_cache}, cloned); + return cloned; + } + static std::vector tasks(const type& parents) { + std::vector tasks; + transwarp::detail::apply_to_each(transwarp::detail::push_task_functor{tasks}, parents); + return tasks; + } +}; + +/// Determines the type of the parents. Specialization for vector parents +template +struct parents>>> { + using type = std::vector>>; + static std::size_t size(const type& obj) { + return obj.size(); + } + static type clone(std::unordered_map, std::shared_ptr>& task_cache, const type& obj) { + type cloned = obj; + transwarp::detail::apply_to_each(transwarp::detail::clone_task_functor{task_cache}, cloned); + return cloned; + } + static std::vector tasks(const type& parents) { + std::vector tasks; + transwarp::detail::apply_to_each(transwarp::detail::push_task_functor{tasks}, parents); + return tasks; + } +}; + + +template +class base_runner { +protected: + + template + void call(std::size_t task_id, + const std::weak_ptr& task, + const Parents& parents) { + promise_.set_value(transwarp::detail::call(task_id, task, parents)); + } + + std::promise promise_; +}; + +template +class base_runner { +protected: + + template + void call(std::size_t task_id, + const std::weak_ptr& task, + const Parents& parents) { + transwarp::detail::call(task_id, task, parents); + promise_.set_value(); + } + + std::promise promise_; +}; + +/// A callable to run a task given its parents +template +class runner : public transwarp::detail::base_runner { +public: + + runner(std::size_t task_id, + const std::weak_ptr& task, + const typename transwarp::decay::type& parents) + : task_id_(task_id), + task_(task), + parents_(parents) + {} + + std::future future() { + return this->promise_.get_future(); + } + + void operator()() { + if (const std::shared_ptr t = task_.lock()) { + t->raise_event(transwarp::event_type::before_started); + } + try { + this->call(task_id_, task_, parents_); + } catch (const transwarp::task_canceled&) { + this->promise_.set_exception(std::current_exception()); + if (const std::shared_ptr t = task_.lock()) { + t->raise_event(transwarp::event_type::after_canceled); + } + } catch (...) { + this->promise_.set_exception(std::current_exception()); + } + if (const std::shared_ptr t = task_.lock()) { + t->raise_event(transwarp::event_type::after_finished); + } + } + +private: + const std::size_t task_id_; + const std::weak_ptr task_; + const typename transwarp::decay::type parents_; +}; + + +/// A simple circular buffer (FIFO). +/// ValueType must support default construction. The buffer lets you push +/// new values onto the back and pop old values off the front. +template +class circular_buffer { +public: + + static_assert(std::is_default_constructible::value, "ValueType must be default constructible"); + + using value_type = ValueType; + + /// Constructs a circular buffer with a given fixed capacity + explicit + circular_buffer(std::size_t capacity) + : data_(capacity) + { + if (capacity < 1) { + throw transwarp::invalid_parameter{"capacity"}; + } + } + + // delete copy/move semantics + circular_buffer(const circular_buffer&) = delete; + circular_buffer& operator=(const circular_buffer&) = delete; + circular_buffer(circular_buffer&& other) = delete; + circular_buffer& operator=(circular_buffer&&) = delete; + + /// Pushes a new value onto the end of the buffer. If that exceeds the capacity + /// of the buffer then the oldest value gets dropped (the one at the front). + template::type, value_type>::value>::type> + void push(T&& value) { + data_[end_] = std::forward(value); + increment(); + } + + /// Returns the value at the front of the buffer (the oldest value). + /// This is undefined if the buffer is empty + const value_type& front() const { + return data_[front_]; + } + + /// Removes the value at the front of the buffer (the oldest value) + void pop() { + if (!empty()) { + data_[front_] = ValueType{}; + decrement(); + } + } + + /// Returns the capacity of the buffer + std::size_t capacity() const { + return data_.size(); + } + + /// Returns the number of populated values of the buffer. Its maximum value + /// equals the capacity of the buffer + std::size_t size() const { + return size_; + } + + /// Returns whether the buffer is empty + bool empty() const { + return size_ == 0; + } + + /// Returns whether the buffer is full + bool full() const { + return size_ == data_.size(); + } + + /// Swaps this buffer with the given buffer + void swap(circular_buffer& buffer) { + std::swap(end_, buffer.end_); + std::swap(front_, buffer.front_); + std::swap(size_, buffer.size_); + std::swap(data_, buffer.data_); + } + +private: + + void increment_or_wrap(std::size_t& value) const { + if (value == data_.size() - 1) { + value = 0; + } else { + ++value; + } + } + + void increment() { + increment_or_wrap(end_); + if (full()) { + increment_or_wrap(front_); + } else { + ++size_; + } + } + + void decrement() { + increment_or_wrap(front_); + --size_; + } + + std::size_t end_{}; + std::size_t front_{}; + std::size_t size_{}; + std::vector data_; +}; + + +class spinlock { +public: + + void lock() noexcept { + while (locked_.test_and_set(std::memory_order_acquire)); + } + + void unlock() noexcept { + locked_.clear(std::memory_order_release); + } + +private: + std::atomic_flag locked_ = ATOMIC_FLAG_INIT; +}; + + +} // detail + + +/// A functor not doing nothing +struct no_op_functor { + void operator()() const noexcept {} +}; + +/// An object to use in places where a no-op functor is required +constexpr no_op_functor no_op{}; + + +/// Executor for sequential execution. Runs functors sequentially on the same thread +class sequential : public transwarp::executor { +public: + + sequential() = default; + + // delete copy/move semantics + sequential(const sequential&) = delete; + sequential& operator=(const sequential&) = delete; + sequential(sequential&&) = delete; + sequential& operator=(sequential&&) = delete; + + /// Returns the name of the executor + std::string name() const override { + return "transwarp::sequential"; + } + + /// Runs the functor on the current thread + void execute(const std::function& functor, transwarp::itask&) override { + functor(); + } +}; + + +/// Executor for parallel execution. Uses a simple thread pool +class parallel : public transwarp::executor { +public: + + explicit parallel(const std::size_t n_threads, + std::function on_thread_started = nullptr) + : pool_{n_threads, std::move(on_thread_started)} + {} + + // delete copy/move semantics + parallel(const parallel&) = delete; + parallel& operator=(const parallel&) = delete; + parallel(parallel&&) = delete; + parallel& operator=(parallel&&) = delete; + + /// Returns the name of the executor + std::string name() const override { + return "transwarp::parallel"; + } + + /// Pushes the functor into the thread pool for asynchronous execution + void execute(const std::function& functor, transwarp::itask&) override { + pool_.push(functor); + } + +private: + transwarp::detail::thread_pool pool_; +}; + + +/// Detail namespace for internal functionality only +namespace detail { + +const transwarp::option_str nullopt_string; +const transwarp::any_data any_empty; + + +template +struct make_future_functor; + +template +struct make_future_functor { + template + void operator()(Future& future, const OtherFuture& other) const { + other.get(); + future = transwarp::detail::make_ready_future(); + } +}; + +template +struct make_future_functor { + template + void operator()(Future& future, const OtherFuture& other) const { + future = transwarp::detail::make_future_with_value(other.get()); + } +}; + + +/// Common task functionality shared across `task_impl` and `value_task` +template +class task_common : public transwarp::task { +public: + /// The result type of this task + using result_type = ResultType; + + /// The task's id + std::size_t id() const noexcept override { + return id_; + } + + /// The optional task name + const transwarp::option_str& name() const noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_NAME + return name_; +#else + return transwarp::detail::nullopt_string; +#endif + } + + /// The task priority (defaults to 0) + std::int64_t priority() const noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + return priority_; +#else + return 0; +#endif + } + + /// The custom task data (may not hold a value) + const transwarp::any_data& custom_data() const noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + return custom_data_; +#else + return transwarp::detail::any_empty; +#endif + } + + /// Sets a task priority (defaults to 0). transwarp will not directly use this. + /// This is only useful if something else is using the priority (e.g. a custom executor) + void set_priority(std::int64_t priority) override { +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + ensure_task_not_running(); + priority_ = priority; +#else + (void)priority; +#endif + } + + /// Resets the task priority to 0 + void reset_priority() override { +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + ensure_task_not_running(); + priority_ = 0; +#endif + } + + /// Assigns custom data to this task. transwarp will not directly use this. + /// This is only useful if something else is using this custom data (e.g. a custom executor) + void set_custom_data(transwarp::any_data custom_data) override { +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + ensure_task_not_running(); + if (!custom_data.has_value()) { + throw transwarp::invalid_parameter{"custom data"}; + } + custom_data_ = std::move(custom_data); + raise_event(transwarp::event_type::after_custom_data_set); +#else + (void)custom_data; +#endif + } + + /// Removes custom data from this task + void remove_custom_data() override { +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + ensure_task_not_running(); + custom_data_ = {}; + raise_event(transwarp::event_type::after_custom_data_set); +#endif + } + + /// Returns the future associated to the underlying execution + std::shared_future future() const noexcept override { + return future_; + } + + /// Adds a new listener for all event types + void add_listener(std::shared_ptr listener) override { + ensure_task_not_running(); + check_listener(listener); + ensure_listeners_object(); + for (int i=0; i(transwarp::event_type::count); ++i) { + (*listeners_)[static_cast(i)].push_back(listener); + } + } + + /// Adds a new listener for the given event type only + void add_listener(transwarp::event_type event, std::shared_ptr listener) override { + ensure_task_not_running(); + check_listener(listener); + ensure_listeners_object(); + (*listeners_)[event].push_back(std::move(listener)); + } + + /// Removes the listener for all event types + void remove_listener(const std::shared_ptr& listener) override { + ensure_task_not_running(); + check_listener(listener); + if (!listeners_) { + return; + } + for (int i=0; i(transwarp::event_type::count); ++i) { + auto listeners_pair = listeners_->find(static_cast(i)); + if (listeners_pair != listeners_->end()) { + std::vector>& l = listeners_pair->second; + l.erase(std::remove(l.begin(), l.end(), listener), l.end()); + } + } + } + + /// Removes the listener for the given event type only + void remove_listener(transwarp::event_type event, const std::shared_ptr& listener) override { + ensure_task_not_running(); + check_listener(listener); + if (!listeners_) { + return; + } + auto listeners_pair = listeners_->find(event); + if (listeners_pair != listeners_->end()) { + std::vector>& l = listeners_pair->second; + l.erase(std::remove(l.begin(), l.end(), listener), l.end()); + } + } + + /// Removes all listeners + void remove_listeners() override { + ensure_task_not_running(); + if (!listeners_) { + return; + } + listeners_->clear(); + } + + /// Removes all listeners for the given event type + void remove_listeners(transwarp::event_type event) override { + ensure_task_not_running(); + if (!listeners_) { + return; + } + auto listeners_pair = listeners_->find(event); + if (listeners_pair != listeners_->end()) { + listeners_pair->second.clear(); + } + } + +protected: + + using listeners_t = std::map>>; + using tasks_t = std::vector; + + /// Checks if the task is currently running and throws transwarp::control_error if it is + void ensure_task_not_running() const { + if (future_.valid() && future_.wait_for(std::chrono::seconds{0}) != std::future_status::ready) { + throw transwarp::control_error{"task currently running: " + transwarp::to_string(*this, " ")}; + } + } + + /// Raises the given event to all listeners + void raise_event(transwarp::event_type event) { + if (!listeners_) { + return; + } + auto listeners_pair = listeners_->find(event); + if (listeners_pair != listeners_->end()) { + for (const std::shared_ptr& listener : listeners_pair->second) { + listener->handle_event(event, *this); + } + } + } + + /// Check for non-null listener pointer + void check_listener(const std::shared_ptr& listener) const { + if (!listener) { + throw transwarp::invalid_parameter{"listener pointer"}; + } + } + + void ensure_listeners_object() { + if (!listeners_) { + listeners_.reset(new listeners_t); + } + } + + /// Assigns the given id + void set_id(std::size_t id) noexcept override { + id_ = id; + } + + /// Assigns the given name + void set_name(transwarp::option_str name) noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_NAME + name_ = std::move(name); +#else + (void)name; +#endif + } + + void copy_from(const task_common& task) { + id_ = task.id_; +#ifndef TRANSWARP_DISABLE_TASK_NAME + name_ = task.name_; +#endif +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + priority_ = task.priority_; +#endif +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + custom_data_ = task.custom_data_; +#endif + if (task.has_result()) { + try { + make_future_functor::value>{}(future_, task.future_); + } catch (...) { + future_ = transwarp::detail::make_future_with_exception(std::current_exception()); + } + } + visited_ = task.visited_; + if (task.listeners_) { + listeners_.reset(new listeners_t(*task.listeners_)); + } + } + + std::size_t id_ = 0; +#ifndef TRANSWARP_DISABLE_TASK_NAME + transwarp::option_str name_; +#endif +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + std::int64_t priority_ = 0; +#endif +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + transwarp::any_data custom_data_; +#endif + std::shared_future future_; + bool visited_ = false; + std::unique_ptr listeners_; + std::unique_ptr tasks_; +}; + + +/// The base task class that contains the functionality that can be used +/// with all result types (void and non-void). +template +class task_impl_base : public transwarp::detail::task_common { +public: + /// The task type + using task_type = TaskType; + + /// The result type of this task + using result_type = ResultType; + + /// Can be called to explicitly finalize this task making this task + /// the terminal task in the graph. This is also done implicitly when + /// calling, e.g., any of the *_all methods. It should normally not be + /// necessary to call this method directly + void finalize() override { + if (!this->tasks_) { + this->tasks_.reset(new typename transwarp::detail::task_common::tasks_t); + visit(transwarp::detail::final_visitor{*this->tasks_}); + unvisit(); + auto compare = [](const transwarp::itask* const l, const transwarp::itask* const r) { + const std::size_t l_level = l->level(); + const std::size_t l_id = l->id(); + const std::size_t r_level = r->level(); + const std::size_t r_id = r->id(); + return std::tie(l_level, l_id) < std::tie(r_level, r_id); + }; + std::sort(this->tasks_->begin(), this->tasks_->end(), compare); + } + } + + /// The task's level + std::size_t level() const noexcept override { + return level_; + } + + /// The task's type + transwarp::task_type type() const noexcept override { + return type_; + } + + /// The task's executor (may be null) + std::shared_ptr executor() const noexcept override { + return executor_; + } + + /// Returns whether the associated task is canceled + bool canceled() const noexcept override { + return canceled_.load(); + } + + /// Returns the average idletime in microseconds (-1 if never set) + std::int64_t avg_idletime_us() const noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_TIME + return avg_idletime_us_.load(); +#else + return -1; +#endif + } + + /// Returns the average waittime in microseconds (-1 if never set) + std::int64_t avg_waittime_us() const noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_TIME + return avg_waittime_us_.load(); +#else + return -1; +#endif + } + + /// Returns the average runtime in microseconds (-1 if never set) + std::int64_t avg_runtime_us() const noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_TIME + return avg_runtime_us_.load(); +#else + return -1; +#endif + } + + /// Assigns an executor to this task which takes precedence over + /// the executor provided in schedule() or schedule_all() + void set_executor(std::shared_ptr executor) override { + this->ensure_task_not_running(); + if (!executor) { + throw transwarp::invalid_parameter{"executor pointer"}; + } + executor_ = std::move(executor); + } + + /// Assigns an executor to all tasks which takes precedence over + /// the executor provided in schedule() or schedule_all() + void set_executor_all(std::shared_ptr executor) override { + this->ensure_task_not_running(); + transwarp::detail::set_executor_visitor visitor{std::move(executor)}; + visit_all(visitor); + } + + /// Removes the executor from this task + void remove_executor() override { + this->ensure_task_not_running(); + executor_.reset(); + } + + /// Removes the executor from all tasks + void remove_executor_all() override { + this->ensure_task_not_running(); + transwarp::detail::remove_executor_visitor visitor; + visit_all(visitor); + } + + /// Schedules this task for execution on the caller thread. + /// The task-specific executor gets precedence if it exists. + /// This overload will reset the underlying future. + void schedule() override { + this->ensure_task_not_running(); + this->schedule_impl(true); + } + + /// Schedules this task for execution on the caller thread. + /// The task-specific executor gets precedence if it exists. + /// reset denotes whether schedule should reset the underlying + /// future and schedule even if the future is already valid. + void schedule(bool reset) override { + this->ensure_task_not_running(); + this->schedule_impl(reset); + } + + /// Schedules this task for execution using the provided executor. + /// The task-specific executor gets precedence if it exists. + /// This overload will reset the underlying future. + void schedule(transwarp::executor& executor) override { + this->ensure_task_not_running(); + this->schedule_impl(true, &executor); + } + + /// Schedules this task for execution using the provided executor. + /// The task-specific executor gets precedence if it exists. + /// reset denotes whether schedule should reset the underlying + /// future and schedule even if the future is already valid. + void schedule(transwarp::executor& executor, bool reset) override { + this->ensure_task_not_running(); + this->schedule_impl(reset, &executor); + } + + /// Schedules all tasks in the graph for execution on the caller thread. + /// The task-specific executors get precedence if they exist. + /// This overload will reset the underlying futures. + void schedule_all() override { + this->ensure_task_not_running(); + schedule_all_impl(true); + } + + /// Schedules all tasks in the graph for execution using the provided executor. + /// The task-specific executors get precedence if they exist. + /// This overload will reset the underlying futures. + void schedule_all(transwarp::executor& executor) override { + this->ensure_task_not_running(); + schedule_all_impl(true, &executor); + } + + /// Schedules all tasks in the graph for execution on the caller thread. + /// The task-specific executors get precedence if they exist. + /// reset_all denotes whether schedule_all should reset the underlying + /// futures and schedule even if the futures are already present. + void schedule_all(bool reset_all) override { + this->ensure_task_not_running(); + schedule_all_impl(reset_all); + } + + /// Schedules all tasks in the graph for execution using the provided executor. + /// The task-specific executors get precedence if they exist. + /// reset_all denotes whether schedule_all should reset the underlying + /// futures and schedule even if the futures are already present. + void schedule_all(transwarp::executor& executor, bool reset_all) override { + this->ensure_task_not_running(); + schedule_all_impl(reset_all, &executor); + } + + /// Assigns an exception to this task. Scheduling will have no effect after an exception + /// has been set. Calling reset() will remove the exception and re-enable scheduling. + void set_exception(std::exception_ptr exception) override { + this->ensure_task_not_running(); + this->future_ = transwarp::detail::make_future_with_exception(exception); + schedule_mode_ = false; + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Returns whether the task was scheduled and not reset afterwards. + /// This means that the underlying future is valid + bool was_scheduled() const noexcept override { + return this->future_.valid(); + } + + /// Waits for the task to complete. Should only be called if was_scheduled() + /// is true, throws transwarp::control_error otherwise + void wait() const override { + ensure_task_was_scheduled(); + this->future_.wait(); + } + + /// Returns whether the task has finished processing. Should only be called + /// if was_scheduled() is true, throws transwarp::control_error otherwise + bool is_ready() const override { + ensure_task_was_scheduled(); + return this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + } + + /// Returns whether this task contains a result + bool has_result() const noexcept override { + return was_scheduled() && this->future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + } + + /// Resets this task + void reset() override { + this->ensure_task_not_running(); + this->future_ = std::shared_future{}; + cancel(false); + schedule_mode_ = true; +#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT + refcount_ = childcount_; +#endif + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Resets all tasks in the graph + void reset_all() override { + this->ensure_task_not_running(); + transwarp::detail::reset_visitor visitor; + visit_all(visitor); + } + + /// If enabled then this task is canceled which will + /// throw transwarp::task_canceled when retrieving the task result. + /// Passing false is equivalent to resume. + void cancel(bool enabled) noexcept override { + canceled_ = enabled; + } + + /// If enabled then all pending tasks in the graph are canceled which will + /// throw transwarp::task_canceled when retrieving the task result. + /// Passing false is equivalent to resume. + void cancel_all(bool enabled) noexcept override { + transwarp::detail::cancel_visitor visitor{enabled}; + visit_all(visitor); + } + + /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. + /// This is only useful if something else is using the priority (e.g. a custom executor) + void set_priority_all(std::int64_t priority) override { +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + this->ensure_task_not_running(); + transwarp::detail::set_priority_visitor visitor{priority}; + visit_all(visitor); +#else + (void)priority; +#endif + } + + /// Resets the priority of all tasks to 0 + void reset_priority_all() override { +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + this->ensure_task_not_running(); + transwarp::detail::reset_priority_visitor visitor; + visit_all(visitor); +#endif + } + + /// Assigns custom data to all tasks. transwarp will not directly use this. + /// This is only useful if something else is using this custom data (e.g. a custom executor) + void set_custom_data_all(transwarp::any_data custom_data) override { +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + this->ensure_task_not_running(); + transwarp::detail::set_custom_data_visitor visitor{std::move(custom_data)}; + visit_all(visitor); +#else + (void)custom_data; +#endif + } + + /// Removes custom data from all tasks + void remove_custom_data_all() override { +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + this->ensure_task_not_running(); + transwarp::detail::remove_custom_data_visitor visitor; + visit_all(visitor); +#endif + } + + /// Adds a new listener for all event types and for all parents + void add_listener_all(std::shared_ptr listener) override { + this->ensure_task_not_running(); + transwarp::detail::add_listener_visitor visitor{std::move(listener)}; + visit_all(visitor); + } + + /// Adds a new listener for the given event type only and for all parents + void add_listener_all(transwarp::event_type event, std::shared_ptr listener) override { + this->ensure_task_not_running(); + transwarp::detail::add_listener_per_event_visitor visitor{event, std::move(listener)}; + visit_all(visitor); + } + + /// Removes the listener for all event types and for all parents + void remove_listener_all(const std::shared_ptr& listener) override { + this->ensure_task_not_running(); + transwarp::detail::remove_listener_visitor visitor{std::move(listener)}; + visit_all(visitor); + } + + /// Removes the listener for the given event type only and for all parents + void remove_listener_all(transwarp::event_type event, const std::shared_ptr& listener) override { + this->ensure_task_not_running(); + transwarp::detail::remove_listener_per_event_visitor visitor{event, std::move(listener)}; + visit_all(visitor); + } + + /// Removes all listeners and for all parents + void remove_listeners_all() override { + this->ensure_task_not_running(); + transwarp::detail::remove_listeners_visitor visitor; + visit_all(visitor); + } + + /// Removes all listeners for the given event type and for all parents + void remove_listeners_all(transwarp::event_type event) override { + this->ensure_task_not_running(); + transwarp::detail::remove_listeners_per_event_visitor visitor{event}; + visit_all(visitor); + } + + /// Returns the task's parents (may be empty) + std::vector parents() const override { + return transwarp::detail::parents::tasks(parents_); + } + + /// Returns all tasks in the graph in breadth order + const std::vector& tasks() override { + finalize(); + return *this->tasks_; + } + + /// Returns all edges in the graph. This is mainly for visualizing + /// the tasks and their interdependencies. Pass the result into transwarp::to_string + /// to retrieve a dot-style graph representation for easy viewing. + std::vector edges() override { + std::vector edges; + transwarp::detail::edges_visitor visitor{edges}; + visit_all(visitor); + return edges; + } + +protected: + + task_impl_base() {} + + template + task_impl_base(F&& functor, std::shared_ptr>... parents) + : functor_(new Functor(std::forward(functor))), + parents_(std::move(parents)...) + { + init(); + } + + template + task_impl_base(F&& functor, std::vector>> parents) + : functor_(new Functor(std::forward(functor))), + parents_(std::move(parents)) + { + init(); + if (parents_.empty()) { + set_type(transwarp::task_type::root); + } + } + + void init() { + set_type(task_type::value); + transwarp::detail::assign_task_if(*functor_, *this); + transwarp::detail::call_with_each(transwarp::detail::parent_visitor{*this}, parents_); + } + + template + friend class transwarp::detail::runner; + + template + friend R transwarp::detail::run_task(std::size_t, const std::weak_ptr&, A&&...); + + /// Assigns the given level + void set_level(std::size_t level) noexcept override { + level_ = level; + } + + /// Assigns the given type + void set_type(transwarp::task_type type) noexcept override { + type_ = type; + } + + /// Assigns the given idletime + void set_avg_idletime_us(std::int64_t idletime) noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_TIME + avg_idletime_us_ = idletime; +#else + (void)idletime; +#endif + } + + /// Assigns the given waittime + void set_avg_waittime_us(std::int64_t waittime) noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_TIME + avg_waittime_us_ = waittime; +#else + (void)waittime; +#endif + } + + /// Assigns the given runtime + void set_avg_runtime_us(std::int64_t runtime) noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_TIME + avg_runtime_us_ = runtime; +#else + (void)runtime; +#endif + } + + /// Checks if the task was scheduled and throws transwarp::control_error if it's not + void ensure_task_was_scheduled() const { + if (!this->future_.valid()) { + throw transwarp::control_error{"task was not scheduled: " + transwarp::to_string(*this, " ")}; + } + } + + /// Schedules this task for execution using the provided executor. + /// The task-specific executor gets precedence if it exists. + /// Runs the task on the same thread as the caller if neither the global + /// nor the task-specific executor is found. + void schedule_impl(bool reset, transwarp::executor* executor=nullptr) override { + if (schedule_mode_ && (reset || !this->future_.valid())) { + if (reset) { + cancel(false); + } +#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT + refcount_ = childcount_; +#endif + std::weak_ptr self = std::static_pointer_cast(this->shared_from_this()); + using runner_t = transwarp::detail::runner; + std::shared_ptr runner = std::shared_ptr(new runner_t(this->id(), self, parents_)); + this->raise_event(transwarp::event_type::before_scheduled); + this->future_ = runner->future(); + this->raise_event(transwarp::event_type::after_future_changed); + if (this->executor_) { + this->executor_->execute([runner]{ (*runner)(); }, *this); + } else if (executor) { + executor->execute([runner]{ (*runner)(); }, *this); + } else { + (*runner)(); + } + } + } + + /// Schedules all tasks in the graph for execution using the provided executor. + /// The task-specific executors get precedence if they exist. + /// Runs tasks on the same thread as the caller if neither the global + /// nor a task-specific executor is found. + void schedule_all_impl(bool reset_all, transwarp::executor* executor=nullptr) { + transwarp::detail::schedule_visitor visitor{reset_all, executor}; + visit_all(visitor); + } + + /// Visits each task in a depth-first traversal + void visit(const std::function& visitor) override { + if (!this->visited_) { + transwarp::detail::call_with_each(transwarp::detail::visit_visitor{visitor}, parents_); + visitor(*this); + this->visited_ = true; + } + } + + /// Traverses through each task and marks them as not visited. + void unvisit() noexcept override { + if (this->visited_) { + this->visited_ = false; + transwarp::detail::call_with_each(transwarp::detail::unvisit_visitor{}, parents_); + } + } + + /// Visits all tasks + template + void visit_all(Visitor& visitor) { + finalize(); + for (transwarp::itask* t : *this->tasks_) { + visitor(*t); + } + } + + void increment_childcount() noexcept override { +#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT + ++childcount_; +#endif + } + + void decrement_refcount() override { +#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT + if (--refcount_ == 0) { + this->raise_event(transwarp::event_type::after_satisfied); + } +#endif + } + + void reset_future() override { + this->future_ = std::shared_future{}; + this->raise_event(transwarp::event_type::after_future_changed); + } + + std::size_t level_ = 0; + transwarp::task_type type_ = transwarp::task_type::root; + std::shared_ptr executor_; + std::atomic canceled_{false}; + bool schedule_mode_ = true; +#ifndef TRANSWARP_DISABLE_TASK_TIME + std::atomic avg_idletime_us_{-1}; + std::atomic avg_waittime_us_{-1}; + std::atomic avg_runtime_us_{-1}; +#endif + std::unique_ptr functor_; + typename transwarp::detail::parents::type parents_; +#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT + std::size_t childcount_ = 0; + std::atomic refcount_{0}; +#endif +}; + + +/// A task proxy +template +class task_impl_proxy : public transwarp::detail::task_impl_base { +public: + /// The task type + using task_type = TaskType; + + /// The result type of this task + using result_type = ResultType; + + /// Assigns a value to this task. Scheduling will have no effect after a value + /// has been set. Calling reset() will remove the value and re-enable scheduling. + void set_value(const typename transwarp::decay::type& value) override { + this->ensure_task_not_running(); + this->future_ = transwarp::detail::make_future_with_value(value); + this->schedule_mode_ = false; + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Assigns a value to this task. Scheduling will have no effect after a value + /// has been set. Calling reset() will remove the value and re-enable scheduling. + void set_value(typename transwarp::decay::type&& value) override { + this->ensure_task_not_running(); + this->future_ = transwarp::detail::make_future_with_value(std::move(value)); + this->schedule_mode_ = false; + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Returns the result of this task. Throws any exceptions that the underlying + /// functor throws. Should only be called if was_scheduled() is true, + /// throws transwarp::control_error otherwise + typename transwarp::result::type get() const override { + this->ensure_task_was_scheduled(); + return this->future_.get(); + } + +protected: + + task_impl_proxy() = default; + + template + task_impl_proxy(F&& functor, std::shared_ptr>... parents) + : transwarp::detail::task_impl_base(std::forward(functor), std::move(parents)...) + {} + + template + task_impl_proxy(F&& functor, std::vector>> parents) + : transwarp::detail::task_impl_base(std::forward(functor), std::move(parents)) + {} + +}; + +/// A task proxy for reference result type. +template +class task_impl_proxy : public transwarp::detail::task_impl_base { +public: + /// The task type + using task_type = TaskType; + + /// The result type of this task + using result_type = ResultType&; + + /// Assigns a value to this task. Scheduling will have no effect after a value + /// has been set. Calling reset() will remove the value and re-enable scheduling. + void set_value(typename transwarp::decay::type& value) override { + this->ensure_task_not_running(); + this->future_ = transwarp::detail::make_future_with_value(value); + this->schedule_mode_ = false; + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Returns the result of this task. Throws any exceptions that the underlying + /// functor throws. Should only be called if was_scheduled() is true, + /// throws transwarp::control_error otherwise + typename transwarp::result::type get() const override { + this->ensure_task_was_scheduled(); + return this->future_.get(); + } + +protected: + + task_impl_proxy() = default; + + template + task_impl_proxy(F&& functor, std::shared_ptr>... parents) + : transwarp::detail::task_impl_base(std::forward(functor), std::move(parents)...) + {} + + template + task_impl_proxy(F&& functor, std::vector>> parents) + : transwarp::detail::task_impl_base(std::forward(functor), std::move(parents)) + {} + +}; + +/// A task proxy for void result type. +template +class task_impl_proxy : public transwarp::detail::task_impl_base { +public: + /// The task type + using task_type = TaskType; + + /// The result type of this task + using result_type = void; + + /// Assigns a value to this task. Scheduling will have no effect after a call + /// to this. Calling reset() will reset this and re-enable scheduling. + void set_value() override { + this->ensure_task_not_running(); + this->future_ = transwarp::detail::make_ready_future(); + this->schedule_mode_ = false; + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Blocks until the task finishes. Throws any exceptions that the underlying + /// functor throws. Should only be called if was_scheduled() is true, + /// throws transwarp::control_error otherwise + void get() const override { + this->ensure_task_was_scheduled(); + this->future_.get(); + } + +protected: + + task_impl_proxy() = default; + + template + task_impl_proxy(F&& functor, std::shared_ptr>... parents) + : transwarp::detail::task_impl_base(std::forward(functor), std::move(parents)...) + {} + + template + task_impl_proxy(F&& functor, std::vector>> parents) + : transwarp::detail::task_impl_base(std::forward(functor), std::move(parents)) + {} + +}; + +} // detail + + +/// A task representing a piece of work given by functor and parent tasks. +/// By connecting tasks a directed acyclic graph is built. +/// Tasks should be created using the make_task factory functions. +template +class task_impl : public transwarp::detail::task_impl_proxy::type, TaskType, Functor, ParentResults...> { +public: + /// The task type + using task_type = TaskType; + + /// The result type of this task + using result_type = typename transwarp::detail::functor_result::type; + + /// A task is defined by functor and parent tasks. + /// Note: Don't use this constructor directly, use transwarp::make_task + template + task_impl(F&& functor, std::shared_ptr>... parents) + : transwarp::detail::task_impl_proxy(std::forward(functor), std::move(parents)...) + {} + + /// A task is defined by functor and parent tasks. + /// Note: Don't use this constructor directly, use transwarp::make_task + template + task_impl(F&& functor, std::vector>> parents) + : transwarp::detail::task_impl_proxy(std::forward(functor), std::move(parents)) + {} + + // delete copy/move semantics + task_impl(const task_impl&) = delete; + task_impl& operator=(const task_impl&) = delete; + task_impl(task_impl&&) = delete; + task_impl& operator=(task_impl&&) = delete; + + /// Gives this task a name and returns a ptr to itself + std::shared_ptr named(std::string name) { +#ifndef TRANSWARP_DISABLE_TASK_NAME +#ifdef TRANSWARP_CPP11 + this->set_name(transwarp::option_str{std::move(name)}); +#else + this->set_name(std::make_optional(std::move(name))); +#endif +#else + (void)name; +#endif + return std::static_pointer_cast(this->shared_from_this()); + } + + /// Creates a continuation to this task + template + auto then(TaskType_, Functor_&& functor) -> std::shared_ptr::type, result_type>> { + using task_t = transwarp::task_impl::type, result_type>; + return std::shared_ptr(new task_t(std::forward(functor), std::static_pointer_cast(this->shared_from_this()))); + } + + /// Clones this task and casts the result to a ptr to task_impl + std::shared_ptr clone_cast() const { + return std::static_pointer_cast(this->clone()); + } + +private: + + task_impl() = default; + + std::shared_ptr> clone_impl(std::unordered_map, std::shared_ptr>& task_cache) const override { + auto t = std::shared_ptr{new task_impl}; + t->copy_from(*this); + t->level_ = this->level_; + t->type_ = this->type_; + t->executor_ = this->executor_; + t->canceled_ = this->canceled_.load(); + t->schedule_mode_ = this->schedule_mode_; +#ifndef TRANSWARP_DISABLE_TASK_TIME + t->avg_idletime_us_ = this->avg_idletime_us_.load(); + t->avg_waittime_us_ = this->avg_waittime_us_.load(); + t->avg_runtime_us_ = this->avg_runtime_us_.load(); +#endif + t->functor_.reset(new Functor(*this->functor_)); + t->parents_ = transwarp::detail::parents::clone(task_cache, this->parents_); + t->executor_ = this->executor_; +#ifndef TRANSWARP_DISABLE_TASK_REFCOUNT + t->childcount_ = this->childcount_; +#endif + return t; + } + +}; + + +/// A value task that stores a single value and doesn't require scheduling. +/// Value tasks should be created using the make_value_task factory functions. +template +class value_task : public transwarp::detail::task_common { +public: + /// The task type + using task_type = transwarp::root_type; + + /// The result type of this task + using result_type = ResultType; + + /// A value task is defined by a given value. + /// Note: Don't use this constructor directly, use transwarp::make_value_task + template + value_task(T&& value) + { + this->future_ = transwarp::detail::make_future_with_value(std::forward(value)); + this->tasks_.reset(new typename transwarp::detail::task_common::tasks_t{this}); + } + + // delete copy/move semantics + value_task(const value_task&) = delete; + value_task& operator=(const value_task&) = delete; + value_task(value_task&&) = delete; + value_task& operator=(value_task&&) = delete; + + /// Gives this task a name and returns a ptr to itself + std::shared_ptr named(std::string name) { +#ifndef TRANSWARP_DISABLE_TASK_NAME +#ifdef TRANSWARP_CPP11 + this->set_name(transwarp::option_str{std::move(name)}); +#else + this->set_name(std::make_optional(std::move(name))); +#endif +#else + (void)name; +#endif + return std::static_pointer_cast(this->shared_from_this()); + } + + /// Creates a continuation to this task + template + auto then(TaskType_, Functor_&& functor) -> std::shared_ptr::type, result_type>> { + using task_t = transwarp::task_impl::type, result_type>; + return std::shared_ptr(new task_t(std::forward(functor), std::static_pointer_cast(this->shared_from_this()))); + } + + /// Clones this task and casts the result to a ptr to value_task + std::shared_ptr clone_cast() const { + return std::static_pointer_cast(this->clone()); + } + + /// Nothing to be done to finalize a value task + void finalize() override {} + + /// The task's level + std::size_t level() const noexcept override { + return 0; + } + + /// The task's type + transwarp::task_type type() const noexcept override { + return transwarp::task_type::root; + } + + /// Value tasks don't have executors as they don't run + std::shared_ptr executor() const noexcept override { + return nullptr; + } + + /// Value tasks cannot be canceled + bool canceled() const noexcept override { + return false; + } + + /// Returns -1 as value tasks don't run + std::int64_t avg_idletime_us() const noexcept override { + return -1; + } + + /// Returns -1 as value tasks don't run + std::int64_t avg_waittime_us() const noexcept override { + return -1; + } + + /// Returns -1 as value tasks don't run + std::int64_t avg_runtime_us() const noexcept override { + return -1; + } + + /// No-op because a value task never runs + void set_executor(std::shared_ptr) override {} + + /// No-op because a value task never runs and doesn't have parents + void set_executor_all(std::shared_ptr) override {} + + /// No-op because a value task never runs + void remove_executor() override {} + + /// No-op because a value task never runs and doesn't have parents + void remove_executor_all() override {} + + /// Sets a priority to all tasks (defaults to 0). transwarp will not directly use this. + /// This is only useful if something else is using the priority + void set_priority_all(std::int64_t priority) override { +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + this->set_priority(priority); +#else + (void)priority; +#endif + } + + /// Resets the priority of all tasks to 0 + void reset_priority_all() override { +#ifndef TRANSWARP_DISABLE_TASK_PRIORITY + this->reset_priority(); +#endif + } + + /// Assigns custom data to all tasks. transwarp will not directly use this. + /// This is only useful if something else is using this custom data + void set_custom_data_all(transwarp::any_data custom_data) override { +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + this->set_custom_data(std::move(custom_data)); +#else + (void)custom_data; +#endif + } + + /// Removes custom data from all tasks + void remove_custom_data_all() override { +#ifndef TRANSWARP_DISABLE_TASK_CUSTOM_DATA + this->remove_custom_data(); +#endif + } + + /// No-op because a value task never runs + void schedule() override {} + + /// No-op because a value task never runs + void schedule(transwarp::executor&) override {} + + /// No-op because a value task never runs + void schedule(bool) override {} + + /// No-op because a value task never runs + void schedule(transwarp::executor&, bool) override {} + + /// No-op because a value task never runs and doesn't have parents + void schedule_all() override {} + + /// No-op because a value task never runs and doesn't have parents + void schedule_all(transwarp::executor&) override {} + + /// No-op because a value task never runs and doesn't have parents + void schedule_all(bool) override {} + + /// No-op because a value task never runs and doesn't have parents + void schedule_all(transwarp::executor&, bool) override {} + + /// Assigns a value to this task + void set_value(const typename transwarp::decay::type& value) override { + this->future_ = transwarp::detail::make_future_with_value(value); + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Assigns a value to this task + void set_value(typename transwarp::decay::type&& value) override { + this->future_ = transwarp::detail::make_future_with_value(std::move(value)); + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Assigns an exception to this task + void set_exception(std::exception_ptr exception) override { + this->future_ = transwarp::detail::make_future_with_exception(exception); + this->raise_event(transwarp::event_type::after_future_changed); + } + + /// Returns the value of this task. Throws an exception if this task has an exception assigned to it + typename transwarp::result::type get() const override { + return this->future_.get(); + } + + /// Returns true because a value task is scheduled once on construction + bool was_scheduled() const noexcept override { + return true; + } + + /// No-op because a value task never runs + void wait() const override {} + + /// Returns true because a value task is always ready + bool is_ready() const override { + return true; + } + + /// Returns true because a value task always contains a result + bool has_result() const noexcept override { + return true; + } + + /// No-op because a value task never runs + void reset() override {} + + /// No-op because a value task never runs and doesn't have parents + void reset_all() override {} + + /// No-op because a value task never runs + void cancel(bool) noexcept override {} + + /// No-op because a value task never runs and doesn't have parents + void cancel_all(bool) noexcept override {} + + /// Adds a new listener for all event types and for all parents + void add_listener_all(std::shared_ptr listener) override { + this->add_listener(listener); + } + + /// Adds a new listener for the given event type only and for all parents + void add_listener_all(transwarp::event_type event, std::shared_ptr listener) override { + this->add_listener(event, listener); + } + + /// Removes the listener for all event types and for all parents + void remove_listener_all(const std::shared_ptr& listener) override { + this->remove_listener(listener); + } + + /// Removes the listener for the given event type only and for all parents + void remove_listener_all(transwarp::event_type event, const std::shared_ptr& listener) override { + this->remove_listener(event, listener); + } + + /// Removes all listeners and for all parents + void remove_listeners_all() override { + this->remove_listeners(); + } + + /// Removes all listeners for the given event type and for all parents + void remove_listeners_all(transwarp::event_type event) override { + this->remove_listeners(event); + } + + /// Empty because a value task doesn't have parents + std::vector parents() const override { + return {}; + } + + /// Returns all tasks in the graph in breadth order + const std::vector& tasks() override { + return *this->tasks_; + } + + /// Returns empty edges because a value task doesn't have parents + std::vector edges() override { + return {}; + } + +private: + + value_task() + { + this->tasks_.reset(new typename transwarp::detail::task_common::tasks_t{this}); + } + + std::shared_ptr> clone_impl(std::unordered_map, std::shared_ptr>&) const override { + auto t = std::shared_ptr(new value_task); + t->copy_from(*this); + return t; + } + + /// No-op as value tasks are always at level 0 + void set_level(std::size_t) noexcept override {} + + /// No-op as value tasks are always root tasks + void set_type(transwarp::task_type) noexcept override {} + + /// No-op as value tasks don't run + void set_avg_idletime_us(std::int64_t) noexcept override {} + + /// No-op as value tasks don't run + void set_avg_waittime_us(std::int64_t) noexcept override {} + + /// No-op as value tasks don't run + void set_avg_runtime_us(std::int64_t) noexcept override {} + + /// No-op because a value task never runs + void schedule_impl(bool, transwarp::executor*) override {} + + /// Visits this task + void visit(const std::function& visitor) override { + if (!this->visited_) { + visitor(*this); + this->visited_ = true; + } + } + + /// Marks this task as not visited + void unvisit() noexcept override { + this->visited_ = false; + } + + void increment_childcount() noexcept override {} + + void decrement_refcount() override {} + + void reset_future() override {} + +}; + + +/// A factory function to create a new task +template +auto make_task(TaskType, Functor&& functor, std::shared_ptr... parents) -> std::shared_ptr::type, typename Parents::result_type...>> { + using task_t = transwarp::task_impl::type, typename Parents::result_type...>; + return std::shared_ptr(new task_t(std::forward(functor), std::move(parents)...)); +} + + +/// A factory function to create a new task with vector parents +template +auto make_task(TaskType, Functor&& functor, std::vector parents) -> std::shared_ptr::type, std::vector>> { + using task_t = transwarp::task_impl::type, std::vector>; + return std::shared_ptr(new task_t(std::forward(functor), std::move(parents))); +} + + +/// A factory function to create a new value task +template +auto make_value_task(Value&& value) -> std::shared_ptr::type>> { + using task_t = transwarp::value_task::type>; + return std::shared_ptr(new task_t(std::forward(value))); +} + + +/// Casts from task interface to sub-class task +template +transwarp::task* cast(transwarp::itask* task) { + auto casted = dynamic_cast*>(task); + if (!casted) { + throw transwarp::invalid_parameter{"invalid cast"}; + } + return casted; +} + + +/// Casts from task interface to sub-class task +template +const transwarp::task* cast(const transwarp::itask* task) { + auto casted = dynamic_cast*>(task); + if (!casted) { + throw transwarp::invalid_parameter{"invalid cast"}; + } + return casted; +} + + +/// A function similar to std::for_each but returning a transwarp task for +/// deferred, possibly asynchronous execution. This function creates a graph +/// with std::distance(first, last) root tasks +template +auto for_each(InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr>>>> { + const auto distance = std::distance(first, last); + if (distance <= 0) { + throw transwarp::invalid_parameter{"first or last"}; + } + std::vector>> tasks; + tasks.reserve(static_cast(distance)); + for (; first != last; ++first) { + tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first]{ unary_op(*first); })); + } + return transwarp::make_task(transwarp::wait, transwarp::no_op, tasks); +} + +/// A function similar to std::for_each but returning a transwarp task for +/// deferred, possibly asynchronous execution. This function creates a graph +/// with std::distance(first, last) root tasks. +/// Overload for automatic scheduling by passing an executor. +template +auto for_each(transwarp::executor& executor, InputIt first, InputIt last, UnaryOperation unary_op) -> std::shared_ptr>>>> { + auto task = transwarp::for_each(first, last, unary_op); + task->schedule_all(executor); + return task; +} + + +/// A function similar to std::transform but returning a transwarp task for +/// deferred, possibly asynchronous execution. This function creates a graph +/// with std::distance(first1, last1) root tasks +template +auto transform(InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr>>>> { + const auto distance = std::distance(first1, last1); + if (distance <= 0) { + throw transwarp::invalid_parameter{"first1 or last1"}; + } + std::vector>> tasks; + tasks.reserve(static_cast(distance)); + for (; first1 != last1; ++first1, ++d_first) { + tasks.push_back(transwarp::make_task(transwarp::root, [unary_op,first1,d_first]{ *d_first = unary_op(*first1); })); + } + return transwarp::make_task(transwarp::wait, transwarp::no_op, tasks); +} + +/// A function similar to std::transform but returning a transwarp task for +/// deferred, possibly asynchronous execution. This function creates a graph +/// with std::distance(first1, last1) root tasks. +/// Overload for automatic scheduling by passing an executor. +template +auto transform(transwarp::executor& executor, InputIt first1, InputIt last1, OutputIt d_first, UnaryOperation unary_op) -> std::shared_ptr>>>> { + auto task = transwarp::transform(first1, last1, d_first, unary_op); + task->schedule_all(executor); + return task; +} + + +/// A task pool that allows running multiple instances of the same task in parallel. +template +class task_pool { +public: + + /// Constructs a task pool + task_pool(std::shared_ptr> task, + std::size_t minimum_size, + std::size_t maximum_size) + : task_(std::move(task)), + minimum_(minimum_size), + maximum_(maximum_size), + finished_(maximum_size) + { + if (minimum_ < 1) { + throw transwarp::invalid_parameter{"minimum size"}; + } + if (minimum_ > maximum_) { + throw transwarp::invalid_parameter{"minimum or maximum size"}; + } + task_->add_listener(transwarp::event_type::after_finished, listener_); + for (std::size_t i=0; iclone()); + } + } + + /// Constructs a task pool with reasonable defaults for minimum and maximum + explicit + task_pool(std::shared_ptr> task) + : task_pool(std::move(task), 32, 65536) + {} + + // delete copy/move semantics + task_pool(const task_pool&) = delete; + task_pool& operator=(const task_pool&) = delete; + task_pool(task_pool&&) = delete; + task_pool& operator=(task_pool&&) = delete; + + /// Returns the next idle task. + /// If there are no idle tasks then it will attempt to double the + /// pool size. If that fails then it will return a nullptr. On successful + /// retrieval of an idle task the function will mark that task as busy. + std::shared_ptr> next_task(bool maybe_resize=true) { + const transwarp::itask* finished_task{}; + { + std::lock_guard lock{spinlock_}; + if (!finished_.empty()) { + finished_task = finished_.front(); finished_.pop(); + } + } + + std::shared_ptr> task; + if (finished_task) { + task = busy_.find(finished_task)->second; + } else { + if (maybe_resize && idle_.empty()) { + resize(size() * 2); // double pool size + } + if (idle_.empty()) { + return nullptr; + } + task = idle_.front(); idle_.pop(); + busy_.emplace(task.get(), task); + } + + auto future = task->future(); + if (future.valid()) { + future.wait(); // will return immediately + } + return task; + } + + /// Just like next_task() but waits for a task to become available. + /// The returned task will always be a valid pointer + std::shared_ptr> wait_for_next_task(bool maybe_resize=true) { + for (;;) { + std::shared_ptr> g = next_task(maybe_resize); + if (g) { + return g; + } + } + } + + /// Returns the current total size of the pool (sum of idle and busy tasks) + std::size_t size() const { + return idle_.size() + busy_.size(); + } + + /// Returns the minimum size of the pool + std::size_t minimum_size() const { + return minimum_; + } + + /// Returns the maximum size of the pool + std::size_t maximum_size() const { + return maximum_; + } + + /// Returns the number of idle tasks in the pool + std::size_t idle_count() const { + std::lock_guard lock{spinlock_}; + return idle_.size() + finished_.size(); + } + + /// Returns the number of busy tasks in the pool + std::size_t busy_count() const { + std::lock_guard lock{spinlock_}; + return busy_.size() - finished_.size(); + } + + /// Resizes the task pool to the given new size if possible + void resize(std::size_t new_size) { + reclaim(); + if (new_size > size()) { // grow + const std::size_t count = new_size - size(); + for (std::size_t i=0; iclone()); + } + } else if (new_size < size()) { // shrink + const std::size_t count = size() - new_size; + for (std::size_t i=0; i lock{spinlock_}; + finished_.swap(finished); + } + while (!finished.empty()) { + const transwarp::itask* task = finished.front(); finished.pop(); + const auto it = busy_.find(task); + idle_.push(it->second); + busy_.erase(it); + } + } + +private: + + class finished_listener : public transwarp::listener { + public: + + explicit + finished_listener(task_pool& pool) + : pool_(pool) + {} + + // Called on a potentially high-priority thread + void handle_event(transwarp::event_type, transwarp::itask& task) override { + std::lock_guard lock{pool_.spinlock_}; + pool_.finished_.push(static_cast(&task)); + } + + private: + task_pool& pool_; + }; + + std::shared_ptr> task_; + std::size_t minimum_; + std::size_t maximum_; + mutable transwarp::detail::spinlock spinlock_; // protecting finished_ + transwarp::detail::circular_buffer finished_; + std::queue>> idle_; + std::unordered_map>> busy_; + std::shared_ptr listener_{new finished_listener(*this)}; +}; + + +/// A timer that tracks the average idle, wait, and run time of each task it listens to. +/// - idle = time between scheduling and starting the task (executor dependent) +/// - wait = time between starting and invoking the task's functor, i.e. wait for parent tasks to finish +/// - run = time between invoking and finishing the task's computations +class timer : public transwarp::listener { +public: + timer() = default; + + // delete copy/move semantics + timer(const timer&) = delete; + timer& operator=(const timer&) = delete; + timer(timer&&) = delete; + timer& operator=(timer&&) = delete; + + /// Performs the actual timing and populates the task's timing members + void handle_event(const transwarp::event_type event, transwarp::itask& task) override { + switch (event) { + case transwarp::event_type::before_scheduled: { + const std::chrono::time_point now = std::chrono::steady_clock::now(); + std::lock_guard lock{spinlock_}; + auto& track = tracks_[&task]; + track.startidle = now; + } + break; + case transwarp::event_type::before_started: { + const std::chrono::time_point now = std::chrono::steady_clock::now(); + track_idletime(task, now); + std::lock_guard lock{spinlock_}; + auto& track = tracks_[&task]; + track.startwait = now; + } + break; + case transwarp::event_type::after_canceled: { + const std::chrono::time_point now = std::chrono::steady_clock::now(); + track_waittime(task, now); + } + break; + case transwarp::event_type::before_invoked: { + const std::chrono::time_point now = std::chrono::steady_clock::now(); + track_waittime(task, now); + std::lock_guard lock{spinlock_}; + auto& track = tracks_[&task]; + track.running = true; + track.startrun = now; + } + break; + case transwarp::event_type::after_finished: { + const std::chrono::time_point now = std::chrono::steady_clock::now(); + track_runtime(task, now); + } + break; + default: break; + } + } + + /// Resets all timing information + void reset() { + std::lock_guard lock{spinlock_}; + tracks_.clear(); + } + +private: + + void track_idletime(transwarp::itask& task, const std::chrono::time_point& now) { + std::int64_t avg_idletime_us; + { + std::lock_guard lock{spinlock_}; + auto& track = tracks_[&task]; + track.idletime += std::chrono::duration_cast(now - track.startidle).count(); + ++track.idlecount; + avg_idletime_us = static_cast(track.idletime / track.idlecount); + } + task.set_avg_idletime_us(avg_idletime_us); + } + + void track_waittime(transwarp::itask& task, const std::chrono::time_point& now) { + std::int64_t avg_waittime_us; + { + std::lock_guard lock{spinlock_}; + auto& track = tracks_[&task]; + track.waittime += std::chrono::duration_cast(now - track.startwait).count(); + ++track.waitcount; + avg_waittime_us = static_cast(track.waittime / track.waitcount); + } + task.set_avg_waittime_us(avg_waittime_us); + } + + void track_runtime(transwarp::itask& task, const std::chrono::time_point& now) { + std::int64_t avg_runtime_us; + { + std::lock_guard lock{spinlock_}; + auto& track = tracks_[&task]; + if (!track.running) { + return; + } + track.running = false; + track.runtime += std::chrono::duration_cast(now - track.startrun).count(); + ++track.runcount; + avg_runtime_us = static_cast(track.runtime / track.runcount); + } + task.set_avg_runtime_us(avg_runtime_us); + } + + struct track { + bool running = false; + std::chrono::time_point startidle; + std::chrono::time_point startwait; + std::chrono::time_point startrun; + std::chrono::microseconds::rep idletime = 0; + std::chrono::microseconds::rep idlecount = 0; + std::chrono::microseconds::rep waittime = 0; + std::chrono::microseconds::rep waitcount = 0; + std::chrono::microseconds::rep runtime = 0; + std::chrono::microseconds::rep runcount = 0; + }; + + transwarp::detail::spinlock spinlock_; // protecting tracks_ + std::unordered_map tracks_; +}; + + +/// The releaser will release a task's future when the task's `after_satisfied` +/// event was received which happens when all children received the task's result. +/// The releaser should be used in cases where the task's result is only needed +/// for consumption by its children and can then be discarded. +class releaser : public transwarp::listener { +public: + releaser() = default; + + /// The executor gives control over where a task's future is released + explicit releaser(std::shared_ptr executor) + : executor_(std::move(executor)) + {} + + // delete copy/move semantics + releaser(const releaser&) = delete; + releaser& operator=(const releaser&) = delete; + releaser(releaser&&) = delete; + releaser& operator=(releaser&&) = delete; + + void handle_event(const transwarp::event_type event, transwarp::itask& task) override { + if (event == transwarp::event_type::after_satisfied) { + if (executor_) { + executor_->execute([&task]{ task.reset_future(); }, task); + } else { + task.reset_future(); + } + } + } + +private: + std::shared_ptr executor_; +}; + + +} // transwarp diff --git a/src/servers.cpp b/src/servers.cpp index f690ea5..3f9741e 100644 --- a/src/servers.cpp +++ b/src/servers.cpp @@ -7,10 +7,11 @@ #include "services.hpp" #include "config.hpp" #include "templates.hpp" +#include "contrib/transwarp.hpp" + #include #include #include -#include #include namespace dropshell { @@ -49,14 +50,15 @@ std::vector get_configured_servers() { return servers; } +// https://github.com/bloomen/transwarp?tab=readme-ov-file#range-functions void list_servers() { auto servers = get_configured_servers(); tableprint tp("All DropShell Servers"); tp.add_row({"Name", "Address", "Health", "Ports"}); - std::for_each(std::execution::par, servers.begin(), servers.end(), [&](const ServerInfo& server) { - + transwarp::parallel exec{servers.size()}; + auto task = transwarp::for_each(exec, servers.begin(), servers.end(), [&](const ServerInfo& server) { std::map status = service_runner::get_all_services_status(server.name); std::set ports_used; @@ -71,6 +73,7 @@ void list_servers() { tp.add_row({server.name, server.ssh_host, serviceticks, ports_used_str}); }); + task->wait(); tp.print(); } diff --git a/src/utils/hash.cpp b/src/utils/hash.cpp index 50e40f0..3fdeca7 100644 --- a/src/utils/hash.cpp +++ b/src/utils/hash.cpp @@ -11,20 +11,21 @@ namespace dropshell { uint64_t hash_file(const std::string &path) { // Create hash state - XXH3_state_t* const state = XXH3_createState(); + XXH64_state_t* const state = XXH64_createState(); if (state == nullptr) { std::cerr << "Failed to create hash state" << std::endl; return 0; } // Initialize state with seed 0 - XXH3_64bits_reset(state); + XXH64_hash_t const seed = 0; /* or any other value */ + if (XXH64_reset(state, seed) == XXH_ERROR) return 0; // Open file std::ifstream file(path, std::ios::binary); if (!file.is_open()) { std::cerr << "Failed to open file: " << path << std::endl; - XXH3_freeState(state); + XXH64_freeState(state); return 0; } @@ -32,40 +33,41 @@ uint64_t hash_file(const std::string &path) { const size_t buffer_size = 4096; char buffer[buffer_size]; while (file.read(buffer, buffer_size)) { - if (XXH3_64bits_update(state, buffer, file.gcount()) == XXH_ERROR) { + if (XXH64_update(state, buffer, file.gcount()) == XXH_ERROR) { std::cerr << "Failed to update hash" << std::endl; - XXH3_freeState(state); + XXH64_freeState(state); return 0; } } // Handle any remaining bytes if (file.gcount() > 0) { - if (XXH3_64bits_update(state, buffer, file.gcount()) == XXH_ERROR) { + if (XXH64_update(state, buffer, file.gcount()) == XXH_ERROR) { std::cerr << "Failed to update hash" << std::endl; - XXH3_freeState(state); + XXH64_freeState(state); return 0; } } // Get final hash - XXH64_hash_t hash = XXH3_64bits_digest(state); - XXH3_freeState(state); + XXH64_hash_t hash = XXH64_digest(state); + XXH64_freeState(state); return hash; } uint64_t hash_directory_recursive(const std::string &path) { // Create hash state - XXH3_state_t* const state = XXH3_createState(); + XXH64_state_t* const state = XXH64_createState(); if (state == nullptr) { std::cerr << "Failed to create hash state" << std::endl; return 0; } // Initialize state with seed 0 - if (XXH3_64bits_reset(state) == XXH_ERROR) { + XXH64_hash_t const seed = 0; /* or any other value */ + if (XXH64_reset(state, seed) == XXH_ERROR) { std::cerr << "Failed to reset hash state" << std::endl; - XXH3_freeState(state); + XXH64_freeState(state); return 0; } @@ -75,18 +77,18 @@ uint64_t hash_directory_recursive(const std::string &path) { if (entry.is_regular_file()) { // Get file hash XXH64_hash_t file_hash = hash_file(entry.path().string()); - XXH3_64bits_update(state, &file_hash, sizeof(file_hash)); + XXH64_update(state, &file_hash, sizeof(file_hash)); } } } catch (const std::filesystem::filesystem_error& e) { std::cerr << "Filesystem error: " << e.what() << std::endl; - XXH3_freeState(state); + XXH64_freeState(state); return 0; } // Get final hash - XXH64_hash_t hash = XXH3_64bits_digest(state); - XXH3_freeState(state); + XXH64_hash_t hash = XXH64_digest(state); + XXH64_freeState(state); return hash; }