I am trying to write a general async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) async initiating function.
The goal is to wrap arbitrary, blocking, third-party functions in a thread, and provide an asio-based interface.
It's not perfect yet (for instance, I know I need to post the completion handlers on the executor instead of running them in the thread), but I feel quite close.
I have three issues and questions:
Why does the program stop before all completion handlers have run? I shouldn't need a work guard, since the async operation is ongoing, right? EDIT: I'm mistaken. The non-callback handlers aren't being called at all, as evidenced by putting a sleep_for(1s) after the run() call. So my question is instead, why not?
Is this code violating some asio principle? It seems like something that would be fairly common to want to do, but I find very few examples of people doing similar things.
(bonus) I want to swap
std::threadwithconcurrency::task<void>. The problem is then that I can't use a move-only type in the lambda capture. I triedself = make_shared<remove_reference_t<Self>>(move(self)), but this caused the three handlers to just printstr:without the args. I believe this has something to do with the fact that theSelftype (really aasio::detail::compose_op) contains a moved-in copy of theimpl. So when I go to print, I'm using the old moved-from version. Anyone have any insight why that might be the case?
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>
#include "asio.hpp"
template <typename Fn, typename... Args>
struct async_task_impl {
std::decay_t<Fn> fn_;
std::tuple<std::decay_t<Args>...> args_;
async_task_impl(Fn&& fn, Args&&... args)
: fn_(std::forward<Fn>(fn)), args_(std::forward<Args>(args)...) {}
template <typename Self>
auto operator()(Self& self) {
// @todo: use concurrency::create_task
auto t =
std::thread([me = *this, // copy impl into thread
self = std::move(self) // move composed_op into thread?
]() mutable {
try {
std::apply(me.fn_, me.args_);
self.complete({});
} catch (std::exception& e) {
self.complete(std::current_exception());
}
});
t.detach();
}
};
// runs some blocking task on its own thread and wraps it in asio
template <typename Executor, typename Token, typename Fn, typename... Args>
auto async_task(Executor& executor, Token&& token, Fn&& func, Args&&... args) {
return asio::async_compose<Token, void(std::exception_ptr)>(
async_task_impl(std::forward<Fn>(func), std::forward<Args>(args)...),
token, executor);
}
Test code: Godbolt
void slow_print(std::string str) {
static std::mutex m;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
{
std::unique_lock lk(m);
std::cout << "slow_print: " << str << "\n";
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
int main() {
try {
asio::io_context ctx;
using namespace std::string_literals;
async_task(
ctx, [](std::exception_ptr) { std::cout << "callback done\n"; },
slow_print, "callback"s);
asio::co_spawn(
ctx,
[&]() -> asio::awaitable<void> {
co_await async_task(ctx, asio::use_awaitable, slow_print, "coro"s);
},
asio::detached);
auto f = std::async(std::launch::async, [&] {
auto fut = async_task(ctx, asio::use_future, slow_print, "future"s);
fut.get();
});
ctx.run();
} catch (std::exception& e) {
std::cout << e.what() << "\n";
}
return 0;
}
SHORT ANSWERS
I have no direct idea, your own Godbolt link seems to the premise, and so does this slightly embellished example: https://godbolt.org/z/WMKa4sqaE See below for some notes about the changes.
Maybe. See below.
Yes. The docs have a very similar example: "To see this in practice, let's use a detached thread to adapt a synchronous operation into an asynchronous one"
Beast has some helpers in their code base (stable_operation_base or something, from the top of my head). Also see this blog post by Richard Hodges which creates a
shared_composed_opfrom acomposed_opthat afford reference stability of the standard operation implementation.LONG ANSWERS
Yes. Resumed coroutines are not work - it's only when they suspend they usually enqueue a an operation with a completion handler to resume.
This is already the case with non-c++20 stackful coros, as Tanner has made very explicitly clear on occasion:
What's worse: when you interact with more than one IO object they may be associated with different execution context, so you might need to track work on multiple executors.
The good news is that Asio (Chris) knew about this, which is why the signature of
async_composetakes a list ofIoObjectsOrExecutors:The composed operation specialized on your callable type will effective use
boost::asio::prefer(ex, execution::outstanding_work.tracked)on all of the associated executors.So as long as the composed operation (
self) stays around, there should be work.Services Are Not IO Objects Or Executors
You pass the service ("execution context") itself instead of an executor. When passing executors, prefer to pass by value.
Then What Went Wrong?
Again, I don't really know as I didn't exactly reproduce your complaints.
However, keep in mind the semantics of completion. In simplified pseudo-code,
complete()does:In other words, don't expect the work guards to stick past completion. In fact the order is pretty central to the allocation guarantees of the library.
(More) Reliable Debug Output
In C++ use
std::flush(orstd::endl) if you want output to appear. Otherwise you might just be confused about output timing. This is frequently a source of confusion when printing stuff from completion handlers in Asio.For maximum insight, I'll introduce a variadic trace function that also timestamps each trace:
Side Note
use_futureI don't get what you tried to achieve with the
std::asyncversion. As it stands you're demonstrating whystd::asynchas been a bad design.If you are looking to demonstrate Asio's future support, I'd write:
Now, to avoid interfering with the service because the future will block, I'd suggest running the service in the background instead:
Of course, you can invert the situation by introducing a thread for the blocking wait:
Double Moves
In your task implementation, you both move
selfand copy*this. Howevercompose_opaggregates yourasync_task_impl(as theimpl_member), so there is a timing link between those. As far as I know the evaluation order or lambda captures in unspecified.I'd suggest avoiding the unnecessary copy:
Or indeed, going for syntactic sugar:
To make it even more elegant, just pass the
selfas a mutable argument instead of capturing it (this may not work withconcurrency::create_taskof course):Perfect Storage vs. Perfect Forwarding
Another place where you are not 100% clear about the forwarding intent is in the
async_task_implconstructor.Args...is already in non-deduced context there, soArgs&&...mandates rvalues. This might be why you used""s-literals?There are several ways to fix
Either you can let the compiler do its job:
If you feel that's a pessimization (does your code-base use expensive non-move-aware argument types?), the simplest is to make the construct an independent template:
I would probably go all-the-way and be explicit about the decay moment using a deduction guide. The best part is you no longer require a constructor at all:
Full Demo
Combining all the above:
Live On Coliru Live On Compiler Explorer
Prints