mirror of
https://github.com/Relintai/rcpp_framework.git
synced 2024-11-14 04:57:21 +01:00
Removed coroutine.h.
This commit is contained in:
parent
d1196eb051
commit
18f1bc2496
@ -25,30 +25,9 @@
|
||||
#include <future>
|
||||
#include <memory>
|
||||
|
||||
#ifdef __cpp_impl_coroutine
|
||||
#include <drogon/utils/coroutine.h>
|
||||
#endif
|
||||
|
||||
namespace drogon {
|
||||
class HttpClient;
|
||||
using HttpClientPtr = std::shared_ptr<HttpClient>;
|
||||
#ifdef __cpp_impl_coroutine
|
||||
namespace internal {
|
||||
struct HttpRespAwaiter : public CallbackAwaiter<HttpResponsePtr> {
|
||||
HttpRespAwaiter(HttpClient *client, HttpRequestPtr req, double timeout) :
|
||||
client_(client), req_(std::move(req)), timeout_(timeout) {
|
||||
}
|
||||
|
||||
void await_suspend(std::coroutine_handle<> handle);
|
||||
|
||||
private:
|
||||
HttpClient *client_;
|
||||
HttpRequestPtr req_;
|
||||
double timeout_;
|
||||
};
|
||||
|
||||
} // namespace internal
|
||||
#endif
|
||||
|
||||
/// Asynchronous http client
|
||||
/**
|
||||
@ -133,25 +112,6 @@ public:
|
||||
return f.get();
|
||||
}
|
||||
|
||||
#ifdef __cpp_impl_coroutine
|
||||
/**
|
||||
* @brief Send a request via coroutines to the server and return an
|
||||
* awaiter what could be `co_await`-ed to retrieve the response
|
||||
* (HttpResponsePtr)
|
||||
*
|
||||
* @param req
|
||||
* @param timeout In seconds. If the response is not received within the
|
||||
* timeout, A `std::runtime_error` with the message "Timeout" is thrown.
|
||||
* The zero value by default disables the timeout.
|
||||
*
|
||||
* @return internal::HttpRespAwaiter. Await on it to get the response
|
||||
*/
|
||||
internal::HttpRespAwaiter sendRequestCoro(HttpRequestPtr req,
|
||||
double timeout = 0) {
|
||||
return internal::HttpRespAwaiter(this, std::move(req), timeout);
|
||||
}
|
||||
#endif
|
||||
|
||||
/// Set the pipelining depth, which is the number of requests that are not
|
||||
/// responding.
|
||||
/**
|
||||
@ -272,34 +232,4 @@ protected:
|
||||
HttpClient() = default;
|
||||
};
|
||||
|
||||
#ifdef __cpp_impl_coroutine
|
||||
inline void internal::HttpRespAwaiter::await_suspend(
|
||||
std::coroutine_handle<> handle) {
|
||||
assert(client_ != nullptr);
|
||||
assert(req_ != nullptr);
|
||||
client_->sendRequest(
|
||||
req_,
|
||||
[handle = std::move(handle), this](ReqResult result,
|
||||
const HttpResponsePtr &resp) {
|
||||
if (result == ReqResult::Ok)
|
||||
setValue(resp);
|
||||
else {
|
||||
std::string reason;
|
||||
if (result == ReqResult::BadResponse)
|
||||
reason = "BadResponse";
|
||||
else if (result == ReqResult::NetworkFailure)
|
||||
reason = "NetworkFailure";
|
||||
else if (result == ReqResult::BadServerAddress)
|
||||
reason = "BadServerAddress";
|
||||
else if (result == ReqResult::Timeout)
|
||||
reason = "Timeout";
|
||||
setException(
|
||||
std::make_exception_ptr(std::runtime_error(reason)));
|
||||
}
|
||||
handle.resume();
|
||||
},
|
||||
timeout_);
|
||||
}
|
||||
#endif
|
||||
|
||||
} // namespace drogon
|
||||
|
@ -18,9 +18,6 @@
|
||||
#include <http/HttpResponse.h>
|
||||
#include <http/HttpTypes.h>
|
||||
#include <drogon/WebSocketConnection.h>
|
||||
#ifdef __cpp_impl_coroutine
|
||||
#include <drogon/utils/coroutine.h>
|
||||
#endif
|
||||
#include <trantor/net/EventLoop.h>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
@ -32,23 +29,6 @@ using WebSocketClientPtr = std::shared_ptr<WebSocketClient>;
|
||||
using WebSocketRequestCallback = std::function<
|
||||
void(ReqResult, const HttpResponsePtr &, const WebSocketClientPtr &)>;
|
||||
|
||||
#ifdef __cpp_impl_coroutine
|
||||
namespace internal {
|
||||
struct WebSocketConnectionAwaiter : public CallbackAwaiter<HttpResponsePtr> {
|
||||
WebSocketConnectionAwaiter(WebSocketClient *client, HttpRequestPtr req) :
|
||||
client_(client), req_(std::move(req)) {
|
||||
}
|
||||
|
||||
void await_suspend(std::coroutine_handle<> handle);
|
||||
|
||||
private:
|
||||
WebSocketClient *client_;
|
||||
HttpRequestPtr req_;
|
||||
};
|
||||
|
||||
} // namespace internal
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief WebSocket client abstract class
|
||||
*
|
||||
@ -85,52 +65,6 @@ public:
|
||||
virtual void connectToServer(const HttpRequestPtr &request,
|
||||
const WebSocketRequestCallback &callback) = 0;
|
||||
|
||||
#ifdef __cpp_impl_coroutine
|
||||
/**
|
||||
* @brief Set messages handler. When a message is recieved from the server,
|
||||
* the callback is called.
|
||||
*
|
||||
* @param callback The function to call when a message is received.
|
||||
*/
|
||||
void setAsyncMessageHandler(
|
||||
const std::function<Task<>(std::string &&message,
|
||||
const WebSocketClientPtr &,
|
||||
const WebSocketMessageType &)> &callback) {
|
||||
setMessageHandler([callback](std::string &&message,
|
||||
const WebSocketClientPtr &client,
|
||||
const WebSocketMessageType &type) -> void {
|
||||
[callback](std::string &&message,
|
||||
const WebSocketClientPtr client,
|
||||
const WebSocketMessageType type) -> AsyncTask {
|
||||
co_await callback(std::move(message), client, type);
|
||||
}(std::move(message), client, type);
|
||||
});
|
||||
}
|
||||
|
||||
/// Set the connection closing handler. When the connection is established
|
||||
/// or closed, the @param callback is called with a bool parameter.
|
||||
|
||||
/**
|
||||
* @brief Set the connection closing handler. When the websocket connection
|
||||
* is closed, the callback is called
|
||||
*
|
||||
* @param callback The function to call when the connection is closed.
|
||||
*/
|
||||
void setAsyncConnectionClosedHandler(
|
||||
const std::function<Task<>(const WebSocketClientPtr &)> &callback) {
|
||||
setConnectionClosedHandler(
|
||||
[callback](const WebSocketClientPtr &client) {
|
||||
[=]() -> AsyncTask { co_await callback(client); }();
|
||||
});
|
||||
}
|
||||
|
||||
/// Connect to the server.
|
||||
internal::WebSocketConnectionAwaiter connectToServerCoro(
|
||||
const HttpRequestPtr &request) {
|
||||
return internal::WebSocketConnectionAwaiter(this, request);
|
||||
}
|
||||
#endif
|
||||
|
||||
/// Get the event loop of the client;
|
||||
virtual trantor::EventLoop *getLoop() = 0;
|
||||
|
||||
@ -194,33 +128,4 @@ public:
|
||||
virtual ~WebSocketClient() = default;
|
||||
};
|
||||
|
||||
#ifdef __cpp_impl_coroutine
|
||||
inline void internal::WebSocketConnectionAwaiter::await_suspend(
|
||||
std::coroutine_handle<> handle) {
|
||||
client_->connectToServer(req_,
|
||||
[this, handle](ReqResult result,
|
||||
const HttpResponsePtr &resp,
|
||||
const WebSocketClientPtr &) {
|
||||
if (result == ReqResult::Ok)
|
||||
setValue(resp);
|
||||
else {
|
||||
std::string reason;
|
||||
if (result == ReqResult::BadResponse)
|
||||
reason = "BadResponse";
|
||||
else if (result ==
|
||||
ReqResult::NetworkFailure)
|
||||
reason = "NetworkFailure";
|
||||
else if (result ==
|
||||
ReqResult::BadServerAddress)
|
||||
reason = "BadServerAddress";
|
||||
else if (result == ReqResult::Timeout)
|
||||
reason = "Timeout";
|
||||
setException(std::make_exception_ptr(
|
||||
std::runtime_error(reason)));
|
||||
}
|
||||
handle.resume();
|
||||
});
|
||||
}
|
||||
#endif
|
||||
|
||||
} // namespace drogon
|
||||
|
@ -19,10 +19,6 @@
|
||||
#include <tuple>
|
||||
#include <type_traits>
|
||||
|
||||
#ifdef __cpp_impl_coroutine
|
||||
#include <drogon/utils/coroutine.h>
|
||||
#endif
|
||||
|
||||
namespace drogon {
|
||||
class HttpRequest;
|
||||
class HttpResponse;
|
||||
@ -30,14 +26,11 @@ using HttpRequestPtr = std::shared_ptr<HttpRequest>;
|
||||
using HttpResponsePtr = std::shared_ptr<HttpResponse>;
|
||||
|
||||
namespace internal {
|
||||
#ifdef __cpp_impl_coroutine
|
||||
template <typename T>
|
||||
using resumable_type = is_resumable<T>;
|
||||
#else
|
||||
|
||||
|
||||
template <typename T>
|
||||
struct resumable_type : std::false_type {
|
||||
};
|
||||
#endif
|
||||
|
||||
template <typename>
|
||||
struct FunctionTraits;
|
||||
@ -102,41 +95,6 @@ struct FunctionTraits<
|
||||
using class_type = void;
|
||||
};
|
||||
|
||||
#ifdef __cpp_impl_coroutine
|
||||
template <typename... Arguments>
|
||||
struct FunctionTraits<
|
||||
AsyncTask (*)(HttpRequestPtr req,
|
||||
std::function<void(const HttpResponsePtr &)> callback,
|
||||
Arguments...)> : FunctionTraits<AsyncTask (*)(Arguments...)> {
|
||||
static const bool isHTTPFunction = true;
|
||||
static const bool isCoroutine = true;
|
||||
using class_type = void;
|
||||
using first_param_type = HttpRequestPtr;
|
||||
using return_type = AsyncTask;
|
||||
};
|
||||
template <typename... Arguments>
|
||||
struct FunctionTraits<
|
||||
Task<> (*)(HttpRequestPtr req,
|
||||
std::function<void(const HttpResponsePtr &)> callback,
|
||||
Arguments...)> : FunctionTraits<AsyncTask (*)(Arguments...)> {
|
||||
static const bool isHTTPFunction = true;
|
||||
static const bool isCoroutine = true;
|
||||
using class_type = void;
|
||||
using first_param_type = HttpRequestPtr;
|
||||
using return_type = Task<>;
|
||||
};
|
||||
template <typename... Arguments>
|
||||
struct FunctionTraits<Task<HttpResponsePtr> (*)(HttpRequestPtr req,
|
||||
Arguments...)>
|
||||
: FunctionTraits<AsyncTask (*)(Arguments...)> {
|
||||
static const bool isHTTPFunction = true;
|
||||
static const bool isCoroutine = true;
|
||||
using class_type = void;
|
||||
using first_param_type = HttpRequestPtr;
|
||||
using return_type = Task<HttpResponsePtr>;
|
||||
};
|
||||
#endif
|
||||
|
||||
template <typename ReturnType, typename... Arguments>
|
||||
struct FunctionTraits<
|
||||
ReturnType (*)(HttpRequestPtr &&req,
|
||||
|
@ -1,594 +0,0 @@
|
||||
/**
|
||||
*
|
||||
* @file coroutine.h
|
||||
* @author Martin Chang
|
||||
*
|
||||
* Copyright 2021, Martin Chang. All rights reserved.
|
||||
* https://github.com/an-tao/drogon
|
||||
* Use of this source code is governed by a MIT license
|
||||
* that can be found in the License file.
|
||||
*
|
||||
* Drogon
|
||||
*
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <drogon/utils/optional.h>
|
||||
#include <trantor/net/EventLoop.h>
|
||||
#include <trantor/utils/Logger.h>
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <condition_variable>
|
||||
#include <coroutine>
|
||||
#include <exception>
|
||||
#include <future>
|
||||
#include <mutex>
|
||||
#include <type_traits>
|
||||
|
||||
namespace drogon {
|
||||
namespace internal {
|
||||
template <typename T>
|
||||
auto getAwaiterImpl(T &&value) noexcept(
|
||||
noexcept(static_cast<T &&>(value).operator co_await()))
|
||||
-> decltype(static_cast<T &&>(value).operator co_await()) {
|
||||
return static_cast<T &&>(value).operator co_await();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
auto getAwaiterImpl(T &&value) noexcept(
|
||||
noexcept(operator co_await(static_cast<T &&>(value))))
|
||||
-> decltype(operator co_await(static_cast<T &&>(value))) {
|
||||
return operator co_await(static_cast<T &&>(value));
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
auto getAwaiter(T &&value) noexcept(
|
||||
noexcept(getAwaiterImpl(static_cast<T &&>(value))))
|
||||
-> decltype(getAwaiterImpl(static_cast<T &&>(value))) {
|
||||
return getAwaiterImpl(static_cast<T &&>(value));
|
||||
}
|
||||
|
||||
} // end namespace internal
|
||||
|
||||
template <typename T>
|
||||
struct await_result {
|
||||
using awaiter_t = decltype(internal::getAwaiter(std::declval<T>()));
|
||||
using type = decltype(std::declval<awaiter_t>().await_resume());
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
using await_result_t = typename await_result<T>::type;
|
||||
|
||||
template <typename T, typename = std::void_t<> >
|
||||
struct is_awaitable : std::false_type {
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct is_awaitable<
|
||||
T,
|
||||
std::void_t<decltype(internal::getAwaiter(std::declval<T>()))> >
|
||||
: std::true_type {
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
constexpr bool is_awaitable_v = is_awaitable<T>::value;
|
||||
|
||||
struct final_awaiter {
|
||||
bool await_ready() noexcept {
|
||||
return false;
|
||||
}
|
||||
template <typename T>
|
||||
auto await_suspend(std::coroutine_handle<T> handle) noexcept {
|
||||
return handle.promise().continuation_;
|
||||
}
|
||||
void await_resume() noexcept {
|
||||
}
|
||||
};
|
||||
|
||||
template <typename T = void>
|
||||
struct [[nodiscard]] Task {
|
||||
struct promise_type;
|
||||
using handle_type = std::coroutine_handle<promise_type>;
|
||||
|
||||
Task(handle_type h) :
|
||||
coro_(h) {
|
||||
}
|
||||
Task(const Task &) = delete;
|
||||
Task(Task &&other) {
|
||||
coro_ = other.coro_;
|
||||
other.coro_ = nullptr;
|
||||
}
|
||||
~Task() {
|
||||
if (coro_)
|
||||
coro_.destroy();
|
||||
}
|
||||
Task &operator=(const Task &) = delete;
|
||||
Task &operator=(Task &&other) {
|
||||
if (std::addressof(other) == this)
|
||||
return *this;
|
||||
if (coro_)
|
||||
coro_.destroy();
|
||||
|
||||
coro_ = other.coro_;
|
||||
other.coro_ = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
struct promise_type {
|
||||
Task<T> get_return_object() {
|
||||
return Task<T>{ handle_type::from_promise(*this) };
|
||||
}
|
||||
std::suspend_always initial_suspend() {
|
||||
return {};
|
||||
}
|
||||
void return_value(const T &v) {
|
||||
value = v;
|
||||
}
|
||||
void return_value(T &&v) {
|
||||
value = std::move(v);
|
||||
}
|
||||
|
||||
auto final_suspend() noexcept {
|
||||
return final_awaiter{};
|
||||
}
|
||||
|
||||
void unhandled_exception() {
|
||||
exception_ = std::current_exception();
|
||||
}
|
||||
|
||||
T &&result() && {
|
||||
if (exception_ != nullptr)
|
||||
std::rethrow_exception(exception_);
|
||||
assert(value.has_value() == true);
|
||||
return std::move(value.value());
|
||||
}
|
||||
|
||||
T &result() & {
|
||||
if (exception_ != nullptr)
|
||||
std::rethrow_exception(exception_);
|
||||
assert(value.has_value() == true);
|
||||
return value.value();
|
||||
}
|
||||
|
||||
void setContinuation(std::coroutine_handle<> handle) {
|
||||
continuation_ = handle;
|
||||
}
|
||||
|
||||
optional<T> value;
|
||||
std::exception_ptr exception_;
|
||||
std::coroutine_handle<> continuation_;
|
||||
};
|
||||
bool await_ready() const {
|
||||
return !coro_ || coro_.done();
|
||||
}
|
||||
std::coroutine_handle<> await_suspend(std::coroutine_handle<> awaiting) {
|
||||
coro_.promise().setContinuation(awaiting);
|
||||
return coro_;
|
||||
}
|
||||
|
||||
auto operator co_await() const &noexcept {
|
||||
struct awaiter {
|
||||
public:
|
||||
explicit awaiter(handle_type coro) :
|
||||
coro_(coro) {
|
||||
}
|
||||
bool await_ready() noexcept {
|
||||
return !coro_ || coro_.done();
|
||||
}
|
||||
auto await_suspend(std::coroutine_handle<> handle) noexcept {
|
||||
coro_.promise().setContinuation(handle);
|
||||
return coro_;
|
||||
}
|
||||
T await_resume() {
|
||||
auto &&v = coro_.promise().result();
|
||||
return std::move(v);
|
||||
}
|
||||
|
||||
private:
|
||||
handle_type coro_;
|
||||
};
|
||||
return awaiter(coro_);
|
||||
}
|
||||
|
||||
auto operator co_await() const &&noexcept {
|
||||
struct awaiter {
|
||||
public:
|
||||
explicit awaiter(handle_type coro) :
|
||||
coro_(coro) {
|
||||
}
|
||||
bool await_ready() noexcept {
|
||||
return !coro_ || coro_.done();
|
||||
}
|
||||
auto await_suspend(std::coroutine_handle<> handle) noexcept {
|
||||
coro_.promise().setContinuation(handle);
|
||||
return coro_;
|
||||
}
|
||||
T await_resume() {
|
||||
return std::move(coro_.promise().result());
|
||||
}
|
||||
|
||||
private:
|
||||
handle_type coro_;
|
||||
};
|
||||
return awaiter(coro_);
|
||||
}
|
||||
handle_type coro_;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct [[nodiscard]] Task<void> {
|
||||
struct promise_type;
|
||||
using handle_type = std::coroutine_handle<promise_type>;
|
||||
|
||||
Task(handle_type handle) :
|
||||
coro_(handle) {
|
||||
}
|
||||
Task(const Task &) = delete;
|
||||
Task(Task &&other) {
|
||||
coro_ = other.coro_;
|
||||
other.coro_ = nullptr;
|
||||
}
|
||||
~Task() {
|
||||
if (coro_)
|
||||
coro_.destroy();
|
||||
}
|
||||
Task &operator=(const Task &) = delete;
|
||||
Task &operator=(Task &&other) {
|
||||
if (std::addressof(other) == this)
|
||||
return *this;
|
||||
if (coro_)
|
||||
coro_.destroy();
|
||||
|
||||
coro_ = other.coro_;
|
||||
other.coro_ = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
struct promise_type {
|
||||
Task<> get_return_object() {
|
||||
return Task<>{ handle_type::from_promise(*this) };
|
||||
}
|
||||
std::suspend_always initial_suspend() {
|
||||
return {};
|
||||
}
|
||||
void return_void() {
|
||||
}
|
||||
auto final_suspend() noexcept {
|
||||
return final_awaiter{};
|
||||
}
|
||||
void unhandled_exception() {
|
||||
exception_ = std::current_exception();
|
||||
}
|
||||
void result() {
|
||||
if (exception_ != nullptr)
|
||||
std::rethrow_exception(exception_);
|
||||
}
|
||||
void setContinuation(std::coroutine_handle<> handle) {
|
||||
continuation_ = handle;
|
||||
}
|
||||
std::exception_ptr exception_;
|
||||
std::coroutine_handle<> continuation_;
|
||||
};
|
||||
bool await_ready() {
|
||||
return coro_.done();
|
||||
}
|
||||
std::coroutine_handle<> await_suspend(std::coroutine_handle<> awaiting) {
|
||||
coro_.promise().setContinuation(awaiting);
|
||||
return coro_;
|
||||
}
|
||||
auto operator co_await() const &noexcept {
|
||||
struct awaiter {
|
||||
public:
|
||||
explicit awaiter(handle_type coro) :
|
||||
coro_(coro) {
|
||||
}
|
||||
bool await_ready() noexcept {
|
||||
return !coro_ || coro_.done();
|
||||
}
|
||||
auto await_suspend(std::coroutine_handle<> handle) noexcept {
|
||||
coro_.promise().setContinuation(handle);
|
||||
return coro_;
|
||||
}
|
||||
auto await_resume() {
|
||||
coro_.promise().result();
|
||||
}
|
||||
|
||||
private:
|
||||
handle_type coro_;
|
||||
};
|
||||
return awaiter(coro_);
|
||||
}
|
||||
|
||||
auto operator co_await() const &&noexcept {
|
||||
struct awaiter {
|
||||
public:
|
||||
explicit awaiter(handle_type coro) :
|
||||
coro_(coro) {
|
||||
}
|
||||
bool await_ready() noexcept {
|
||||
return false;
|
||||
}
|
||||
auto await_suspend(std::coroutine_handle<> handle) noexcept {
|
||||
coro_.promise().setContinuation(handle);
|
||||
return coro_;
|
||||
}
|
||||
void await_resume() {
|
||||
coro_.promise().result();
|
||||
}
|
||||
|
||||
private:
|
||||
handle_type coro_;
|
||||
};
|
||||
return awaiter(coro_);
|
||||
}
|
||||
handle_type coro_;
|
||||
};
|
||||
|
||||
/// Fires a coroutine and doesn't force waiting nor deallocates upon promise
|
||||
/// destructs
|
||||
// NOTE: AsyncTask is designed to be not awaitable. And kills the entire process
|
||||
// if exception escaped.
|
||||
struct AsyncTask {
|
||||
struct promise_type;
|
||||
using handle_type = std::coroutine_handle<promise_type>;
|
||||
|
||||
AsyncTask() = default;
|
||||
|
||||
AsyncTask(handle_type h) :
|
||||
coro_(h) {
|
||||
}
|
||||
|
||||
AsyncTask(const AsyncTask &) = delete;
|
||||
|
||||
AsyncTask &operator=(const AsyncTask &) = delete;
|
||||
AsyncTask &operator=(AsyncTask &&other) {
|
||||
if (std::addressof(other) == this)
|
||||
return *this;
|
||||
|
||||
coro_ = other.coro_;
|
||||
other.coro_ = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
struct promise_type {
|
||||
std::coroutine_handle<> continuation_;
|
||||
|
||||
AsyncTask get_return_object() noexcept {
|
||||
return { std::coroutine_handle<promise_type>::from_promise(*this) };
|
||||
}
|
||||
|
||||
std::suspend_never initial_suspend() const noexcept {
|
||||
return {};
|
||||
}
|
||||
|
||||
void unhandled_exception() {
|
||||
LOG_FATAL << "Exception escaping AsyncTask.";
|
||||
std::terminate();
|
||||
}
|
||||
|
||||
void return_void() noexcept {
|
||||
}
|
||||
|
||||
void setContinuation(std::coroutine_handle<> handle) {
|
||||
continuation_ = handle;
|
||||
}
|
||||
|
||||
auto final_suspend() const noexcept {
|
||||
// Can't simply use suspend_never because we need symmetric transfer
|
||||
struct awaiter final {
|
||||
bool await_ready() const noexcept {
|
||||
return true;
|
||||
}
|
||||
|
||||
auto await_suspend(
|
||||
std::coroutine_handle<promise_type> coro) const noexcept {
|
||||
return coro.promise().continuation_;
|
||||
}
|
||||
|
||||
void await_resume() const noexcept {
|
||||
}
|
||||
};
|
||||
return awaiter{};
|
||||
}
|
||||
};
|
||||
bool await_ready() const noexcept {
|
||||
return coro_.done();
|
||||
}
|
||||
|
||||
void await_resume() const noexcept {
|
||||
}
|
||||
|
||||
auto await_suspend(std::coroutine_handle<> coroutine) noexcept {
|
||||
coro_.promise().setContinuation(coroutine);
|
||||
return coro_;
|
||||
}
|
||||
|
||||
handle_type coro_;
|
||||
};
|
||||
|
||||
/// Helper class that provides the infrastructure for turning callback into
|
||||
/// coroutines
|
||||
// The user is responsible to fill in `await_suspend()` and constructors.
|
||||
template <typename T = void>
|
||||
struct CallbackAwaiter {
|
||||
bool await_ready() noexcept {
|
||||
return false;
|
||||
}
|
||||
|
||||
const T &await_resume() const noexcept(false) {
|
||||
// await_resume() should always be called after co_await
|
||||
// (await_suspend()) is called. Therefore the value should always be set
|
||||
// (or there's an exception)
|
||||
assert(result_.has_value() == true || exception_ != nullptr);
|
||||
|
||||
if (exception_)
|
||||
std::rethrow_exception(exception_);
|
||||
return result_.value();
|
||||
}
|
||||
|
||||
private:
|
||||
// HACK: Not all desired types are default constructable. But we need the
|
||||
// entire struct to be constructed for awaiting. std::optional takes care of
|
||||
// that.
|
||||
optional<T> result_;
|
||||
std::exception_ptr exception_{ nullptr };
|
||||
|
||||
protected:
|
||||
void setException(const std::exception_ptr &e) {
|
||||
exception_ = e;
|
||||
}
|
||||
void setValue(const T &v) {
|
||||
result_.emplace(v);
|
||||
}
|
||||
void setValue(T &&v) {
|
||||
result_.emplace(std::move(v));
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct CallbackAwaiter<void> {
|
||||
bool await_ready() noexcept {
|
||||
return false;
|
||||
}
|
||||
|
||||
void await_resume() noexcept(false) {
|
||||
if (exception_)
|
||||
std::rethrow_exception(exception_);
|
||||
}
|
||||
|
||||
private:
|
||||
std::exception_ptr exception_{ nullptr };
|
||||
|
||||
protected:
|
||||
void setException(const std::exception_ptr &e) {
|
||||
exception_ = e;
|
||||
}
|
||||
};
|
||||
|
||||
// An ok implementation of sync_await. This allows one to call
|
||||
// coroutines and wait for the result from a function.
|
||||
template <typename Await>
|
||||
auto sync_wait(Await &&await) {
|
||||
static_assert(is_awaitable_v<std::decay_t<Await> >);
|
||||
using value_type = typename await_result<Await>::type;
|
||||
std::condition_variable cv;
|
||||
std::mutex mtx;
|
||||
std::atomic<bool> flag = false;
|
||||
std::exception_ptr exception_ptr;
|
||||
std::unique_lock lk(mtx);
|
||||
|
||||
if constexpr (std::is_same_v<value_type, void>) {
|
||||
auto task = [&]() -> AsyncTask {
|
||||
try {
|
||||
co_await await;
|
||||
} catch (...) {
|
||||
exception_ptr = std::current_exception();
|
||||
}
|
||||
std::unique_lock lk(mtx);
|
||||
flag = true;
|
||||
cv.notify_all();
|
||||
};
|
||||
|
||||
std::thread thr([&]() { task(); });
|
||||
cv.wait(lk, [&]() { return (bool)flag; });
|
||||
thr.join();
|
||||
if (exception_ptr)
|
||||
std::rethrow_exception(exception_ptr);
|
||||
} else {
|
||||
optional<value_type> value;
|
||||
auto task = [&]() -> AsyncTask {
|
||||
try {
|
||||
value = co_await await;
|
||||
} catch (...) {
|
||||
exception_ptr = std::current_exception();
|
||||
}
|
||||
std::unique_lock lk(mtx);
|
||||
flag = true;
|
||||
cv.notify_all();
|
||||
};
|
||||
|
||||
std::thread thr([&]() { task(); });
|
||||
cv.wait(lk, [&]() { return (bool)flag; });
|
||||
assert(value.has_value() == true || exception_ptr);
|
||||
thr.join();
|
||||
|
||||
if (exception_ptr)
|
||||
std::rethrow_exception(exception_ptr);
|
||||
|
||||
return std::move(value.value());
|
||||
}
|
||||
}
|
||||
|
||||
// Converts a task (or task like) promise into std::future for old-style async
|
||||
template <typename Await>
|
||||
inline auto co_future(Await &&await) noexcept
|
||||
-> std::future<await_result_t<Await> > {
|
||||
using Result = await_result_t<Await>;
|
||||
std::promise<Result> prom;
|
||||
auto fut = prom.get_future();
|
||||
[](std::promise<Result> prom, Await await) -> AsyncTask {
|
||||
try {
|
||||
if constexpr (std::is_void_v<Result>) {
|
||||
co_await std::move(await);
|
||||
prom.set_value();
|
||||
} else
|
||||
prom.set_value(co_await std::move(await));
|
||||
} catch (...) {
|
||||
prom.set_exception(std::current_exception());
|
||||
}
|
||||
}(std::move(prom), std::move(await));
|
||||
return fut;
|
||||
}
|
||||
|
||||
namespace internal {
|
||||
struct TimerAwaiter : CallbackAwaiter<void> {
|
||||
TimerAwaiter(trantor::EventLoop *loop,
|
||||
const std::chrono::duration<double> &delay) :
|
||||
loop_(loop), delay_(delay.count()) {
|
||||
}
|
||||
TimerAwaiter(trantor::EventLoop *loop, double delay) :
|
||||
loop_(loop), delay_(delay) {
|
||||
}
|
||||
void await_suspend(std::coroutine_handle<> handle) {
|
||||
loop_->runAfter(delay_, [handle]() { handle.resume(); });
|
||||
}
|
||||
|
||||
private:
|
||||
trantor::EventLoop *loop_;
|
||||
double delay_;
|
||||
};
|
||||
} // namespace internal
|
||||
|
||||
inline internal::TimerAwaiter sleepCoro(
|
||||
trantor::EventLoop *loop,
|
||||
const std::chrono::duration<double> &delay) noexcept {
|
||||
assert(loop);
|
||||
return internal::TimerAwaiter(loop, delay);
|
||||
}
|
||||
|
||||
inline internal::TimerAwaiter sleepCoro(trantor::EventLoop *loop,
|
||||
double delay) noexcept {
|
||||
assert(loop);
|
||||
return internal::TimerAwaiter(loop, delay);
|
||||
}
|
||||
|
||||
template <typename T, typename = std::void_t<> >
|
||||
struct is_resumable : std::false_type {
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
struct is_resumable<
|
||||
T,
|
||||
std::void_t<decltype(internal::getAwaiter(std::declval<T>()))> >
|
||||
: std::true_type {
|
||||
};
|
||||
|
||||
template <>
|
||||
struct is_resumable<AsyncTask, std::void_t<AsyncTask> > : std::true_type {
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
constexpr bool is_resumable_v = is_resumable<T>::value;
|
||||
|
||||
} // namespace drogon
|
@ -1,133 +0,0 @@
|
||||
#include <drogon/drogon_test.h>
|
||||
#include <drogon/utils/coroutine.h>
|
||||
|
||||
using namespace drogon;
|
||||
|
||||
namespace drogon::internal {
|
||||
struct SomeStruct {
|
||||
~SomeStruct() {
|
||||
beenDestructed = true;
|
||||
}
|
||||
static bool beenDestructed;
|
||||
};
|
||||
|
||||
bool SomeStruct::beenDestructed = false;
|
||||
|
||||
struct StructAwaiter : public CallbackAwaiter<std::shared_ptr<SomeStruct> > {
|
||||
void await_suspend(std::coroutine_handle<> handle) {
|
||||
setValue(std::make_shared<SomeStruct>());
|
||||
handle.resume();
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace drogon::internal
|
||||
|
||||
// Workarround limitation of macros
|
||||
template <typename T>
|
||||
using is_int = std::is_same<T, int>;
|
||||
template <typename T>
|
||||
using is_void = std::is_same<T, void>;
|
||||
|
||||
DROGON_TEST(CroutineBasics) {
|
||||
// Basic checks making sure coroutine works as expected
|
||||
STATIC_REQUIRE(is_awaitable_v<Task<> >);
|
||||
STATIC_REQUIRE(is_awaitable_v<Task<int> >);
|
||||
STATIC_REQUIRE(is_awaitable_v<Task<> >);
|
||||
STATIC_REQUIRE(is_awaitable_v<Task<int> >);
|
||||
STATIC_REQUIRE(is_int<await_result_t<Task<int> > >::value);
|
||||
STATIC_REQUIRE(is_void<await_result_t<Task<> > >::value);
|
||||
|
||||
// No, you cannot await AsyncTask. By design
|
||||
STATIC_REQUIRE(is_awaitable_v<AsyncTask> == false);
|
||||
|
||||
// AsyncTask should execute eagerly
|
||||
int m = 0;
|
||||
[&m]() -> AsyncTask {
|
||||
m = 1;
|
||||
co_return;
|
||||
}();
|
||||
REQUIRE(m == 1);
|
||||
|
||||
// Make sure sync_wait works
|
||||
CHECK(sync_wait([]() -> Task<int> { co_return 1; }()) == 1);
|
||||
|
||||
// make sure it does affect the outside world
|
||||
int n = 0;
|
||||
sync_wait([&]() -> Task<> {
|
||||
n = 1;
|
||||
co_return;
|
||||
}());
|
||||
CHECK(n == 1);
|
||||
|
||||
// Testing that exceptions can propergate through coroutines
|
||||
auto throw_in_task = [TEST_CTX]() -> Task<> {
|
||||
auto f = []() -> Task<> { throw std::runtime_error("test error"); };
|
||||
|
||||
CHECK_THROWS_AS(co_await f(), std::runtime_error);
|
||||
};
|
||||
sync_wait(throw_in_task());
|
||||
|
||||
// Test sync_wait propergrates exception
|
||||
auto throws = []() -> Task<> {
|
||||
throw std::runtime_error("bla");
|
||||
co_return;
|
||||
};
|
||||
CHECK_THROWS_AS(sync_wait(throws()), std::runtime_error);
|
||||
|
||||
// Test co_return non-copyable object works
|
||||
auto return_unique_ptr = [TEST_CTX]() -> Task<std::unique_ptr<int> > {
|
||||
co_return std::make_unique<int>(42);
|
||||
};
|
||||
CHECK(*sync_wait(return_unique_ptr()) == 42);
|
||||
|
||||
// Test co_awaiting non-copyable object works
|
||||
auto await_non_copyable = [TEST_CTX]() -> Task<> {
|
||||
auto return_unique_ptr = []() -> Task<std::unique_ptr<int> > {
|
||||
co_return std::make_unique<int>(123);
|
||||
};
|
||||
auto ptr = co_await return_unique_ptr();
|
||||
CHECK(*ptr == 123);
|
||||
};
|
||||
sync_wait(await_non_copyable());
|
||||
}
|
||||
|
||||
DROGON_TEST(CompilcatedCoroutineLifetime) {
|
||||
auto coro = []() -> Task<Task<std::string> > {
|
||||
auto coro2 = []() -> Task<std::string> {
|
||||
auto coro3 = []() -> Task<std::string> {
|
||||
co_return std::string("Hello, World!");
|
||||
};
|
||||
auto coro4 = [coro3 = std::move(coro3)]() -> Task<std::string> {
|
||||
auto coro5 = []() -> Task<> { co_return; };
|
||||
co_await coro5();
|
||||
co_return co_await coro3();
|
||||
};
|
||||
co_return co_await coro4();
|
||||
};
|
||||
|
||||
co_return coro2();
|
||||
};
|
||||
|
||||
auto task1 = coro();
|
||||
auto task2 = sync_wait(task1);
|
||||
std::string str = sync_wait(task2);
|
||||
|
||||
CHECK(str == "Hello, World!");
|
||||
}
|
||||
|
||||
DROGON_TEST(CoroutineDestruction) {
|
||||
// Test coroutine destruction
|
||||
auto destruct = []() -> Task<> {
|
||||
auto awaitStruct = []() -> Task<std::shared_ptr<internal::SomeStruct> > {
|
||||
co_return co_await internal::StructAwaiter();
|
||||
};
|
||||
|
||||
auto awaitNothing = [awaitStruct]() -> Task<> {
|
||||
co_await awaitStruct();
|
||||
};
|
||||
|
||||
co_await awaitNothing();
|
||||
};
|
||||
sync_wait(destruct());
|
||||
CHECK(internal::SomeStruct::beenDestructed == true);
|
||||
}
|
Loading…
Reference in New Issue
Block a user