I am currently learning how to use the concurrent_channel provided in boost asio. Unfortunately, there are no usage examples for concurrent_channel in boost asio, so I'm trying to write some demos myself for verification.
I have a couple of questions regarding concurrent_channel: 1. Does my test program have any undefined behavior? 2. What is the correct usage of concurrent_channel in asio?
#include <exception>
#include <string>
#include <thread>
#include <vector>
#include <boost/asio/error.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
namespace asio = boost::asio;
namespace sys = boost::system;
// asio's concurrent channel
using concurrent_channel_t =
asio::experimental::concurrent_channel<void(sys::error_code, int)>;
// An exception handler for coroutines that rethrows any exception thrown by
// the coroutine. We handle all known error cases with error_code's. If an
// exception is raised, it's something critical, e.g. out of memory.
// Re-raising it in the exception handler will case io_context::run()
// to throw and terminate the program.
static constexpr auto rethrow_handler = [](std::exception_ptr ex) {
if (ex) {
std::rethrow_exception(ex);
}
};
void send_func(int thrid, concurrent_channel_t& chan)
{
std::printf("thread %d: start to send data\n", thrid);
for (int i = 0; i < 100; ++i) {
if (!chan.try_send(sys::error_code{}, i)) {
chan.async_send(sys::error_code{}, 0, [thrid](sys::error_code ec) {
std::printf("thread %d: send data error: %s\n",
thrid,
ec.message().c_str());
});
}
}
std::printf("thread %d: send data done\n", thrid);
}
void receive_func(concurrent_channel_t& chan, asio::yield_context yield)
{
std::printf("start to receive data from channel\n");
while (true) {
sys::error_code ec;
auto n = chan.async_receive(yield[ec]);
if (ec) {
if (ec != asio::experimental::error::channel_closed) {
std::printf("receiving data error: %s\n", ec.message().c_str());
}
return;
}
std::printf("received data %d\n", n);
}
}
int main(int argc, char* argv[])
{
try {
int thread_count;
if (argc < 2) {
thread_count = 4;
} else {
thread_count = std::stoi(argv[1]);
}
asio::io_context ioctx;
concurrent_channel_t chan(ioctx.get_executor(), 4096);
// Launch a stackfull coroutine for receiving data from channel.
asio::spawn(
ioctx.get_executor(),
[&chan](asio::yield_context yield) mutable {
receive_func(chan, yield);
},
rethrow_handler);
// Start N threads for sending data to channel concurrently.
std::vector<std::thread> threads;
for (int i = 0; i < thread_count; ++i) {
threads.emplace_back(
[thrid = i, &chan]() { send_func(thrid, chan); });
}
// Start a new background thread for joining sending threads.
std::thread([&threads, &chan]() {
for (auto& th: threads) {
th.join();
}
chan.close();
}).detach();
// Run io_context endless.
ioctx.run();
return 0;
} catch (const std::exception& ex) {
std::printf("unexpected exception: %s\n", ex.what());
return 1;
}
}
No I don't see UB in your code.
What is "correct"? If you mean "typical", then I'd say it's more typical that channel writes also arise from asynchronous operations.
In that sense it's more typical to see a thread pool for any async work which then causes messages written to the channel.
In general, try to move away from focusing on (or thinking about) threads. Instead, focus on (asynchronous) process flows, that run on top of your IO service threads.
Here's my clarified version:
Live On Coliru
Prints something like: