1. Will generate exceptions that cannot be caught
  2.   boost::asio::detail::cancellation_handler_base::call[virtual] == 0xFFFFFFFFFFFFFFFF。
        /// Emits the signal and causes invocation of the slot's handler, if any.
        void emit(cancellation_type_t type)
        {
          if (handler_)
            handler_->call(type);
        }
    

I can't find where the problem is anymore, the exception cannot be caught

ServerDome:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <cstdio>

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using boost::asio::thread_pool;
using std::chrono::steady_clock;
using boost::asio::as_tuple;
using boost::asio::buffer;
using boost::asio::experimental::channel;
using boost::asio::io_context;
using boost::asio::steady_timer;

namespace this_coro = boost::asio::this_coro;
using ssl_socket = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>;
constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable);
using namespace std::literals::chrono_literals;
using namespace boost::asio::experimental::awaitable_operators;

awaitable<void> echo(tcp::socket socket)
{
    steady_timer timer(co_await this_coro::executor);
    try
    {
        char data[2000];
        for (;;)
        {
            timer.expires_after(2ms);
            auto result1 = co_await(async_read(socket, buffer(data, 2000), use_nothrow_awaitable) || timer.async_wait(use_nothrow_awaitable));
            if (result1.index() == 1) {
                //std::cout << "time out." << std::endl;
            }
            else {
                auto [e,n] = std::get<0>(result1);
                if (!e)
                {
                    if (n)
                        co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
                }
                else {
                    std::cout << e.message() << std::endl;
                }
            }

        }
    }
    catch (std::exception& e)
    {
        std::printf("echo Exception: %s\n", e.what());
    }
}

awaitable<void> listener()
{
    auto executor = co_await this_coro::executor;
    tcp::acceptor acceptor(executor, { tcp::v4(), 5555 });
    for (;;)
    {
        tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
        co_spawn(executor, echo(std::move(socket)), detached);
    }
}

int main(int argc, char* argv[])
{
    try
    {
        thread_pool pol(8);
        boost::asio::signal_set signals(pol, SIGINT, SIGTERM);
        signals.async_wait([&](auto, auto) { pol.stop(); });

        co_spawn(pol, listener(), detached);

        pol.wait();
    }
    catch (std::exception& e)
    {
        std::printf("Exception: %s\n", e.what());
    }
}

ClientDome:

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <cstdio>

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using std::chrono::steady_clock;
using boost::asio::thread_pool;
using boost::asio::as_tuple;
using boost::asio::buffer;
using boost::asio::experimental::channel;
using boost::asio::io_context;

namespace this_coro = boost::asio::this_coro;
constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable);
using namespace boost::asio::experimental::awaitable_operators;
using namespace std::literals::chrono_literals;
thread_pool io_context(3);

awaitable<void> timeout(steady_clock::duration duration)
{
    steady_timer timer(co_await this_coro::executor);
    timer.expires_after(duration);
    co_await timer.async_wait(use_nothrow_awaitable);
}

awaitable<void> echo3(tcp::socket socket)
{
    try
    {
        char data[4024];
        for (;;)
        {
            co_await timeout(std::chrono::milliseconds(2));
            co_await async_write(socket, boost::asio::buffer(data, 2000), use_awaitable);
        }
    }
    catch (const std::exception& e)
    {
        std::printf("echo2 Exception: %s\n", e.what());
    }
}

awaitable<void> listener(tcp::socket socket)
{
    try
    {
        auto executor = co_await this_coro::executor;

        auto listen_endpoint =
            *tcp::resolver(socket.get_executor()).resolve("127.0.0.1", std::to_string(5555),
                tcp::resolver::passive).begin();
        co_await socket.async_connect(listen_endpoint, use_awaitable);
        co_spawn(executor, echo3(std::move(socket)), detached);
    }
    catch (const std::exception& e)
    {
        std::printf("connect Exception: %s\n", e.what());
    }
}

int main()
{
    try
    {
        
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&](auto, auto) { io_context.stop(); });

        for (size_t i = 0; i < 3; i++)
        {
            co_spawn(io_context, listener(std::move(tcp::socket(io_context))), detached);
        }

        io_context.wait();
    }
    catch (std::exception& e)
    {
        std::printf("main Exception: %s\n", e.what());
    }
}

I expect the program to run without errors and exceptions

1

There are 1 best solutions below

1
sehe On BEST ANSWER

There are a two main things:

Strands

I noticed sparsely occuring SEGV in deadline_timer_service. I noticed that you have a multi-threaded server context,but didn't guard the connections (echo sessions with strand). To be completely honest, I didn't intuitively think you would require one, but I'm thinking that the parallel_group that implements the || awaitable operator uses a shared cancellation mechanism that may not be threadsafe. In that case, a strand might be required:

asio::awaitable<void> listener() {
    for (tcp::acceptor acc(co_await this_coro::executor, {{}, 5555});;) {
        auto s = make_strand(acc.get_executor());
        co_spawn(                                              //
            s,                                                 //
            echo(co_await acc.async_accept(s, use_awaitable)), //
            asio::detached);
    }
}

With that the SEGV goes away.

Deadlock

I noticed client and server stop progressing after a while. After counting lines I noticed it would freeze up after the same amount of messages transferred each time: ~7800x2000 bytes. That's about 5GiB per socket.

I figured that the kernel might just block writes when the read buffer is at maximum size, because the client never reads the echoed packets in the client.

So, I added that. The neat thing to do would be to co_spawn a full-duplex coroutine, but since we know that all packets are echoed 1:1, I did the simple thing and put the read inline with the write loop.

Effectively we replace the sleep(2ms) with a "read indefinitely" (here it's important that the data buffer has a higher capacity than the maximum expected reply) limited to 2ms:

co_await (async_read(socket, asio::buffer(data), use_awaitable) || timeout(2ms));

Side note: now that we used parallel groups in the client code as well, I made sure to upgrade to strand executors for the coro as well.

Combined Test Program

With that change, all is well, and the server/client kept running without failures, tripping UBSan/ASan or deadlocking.

This program (potentially) combines client and server, time limited to 30s only for COLIRU. It logs frequent messages once-in-a-thousand times:

Live On Coliru ²

#include <boost/asio.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <set>

namespace asio = boost::asio;
using asio::use_awaitable;
using asio::ip::tcp;

namespace this_coro          = asio::this_coro;
constexpr auto nothrow_await = asio::as_tuple(use_awaitable);
using namespace asio::experimental::awaitable_operators;
using namespace std::literals::chrono_literals;

void once_in(size_t n, auto&& action) { // helper for reduced frequency logging
    static std::atomic_size_t counter_ = 0;
    if ((++counter_ % n) == 0) {
        if constexpr (std::is_invocable_v<decltype(action), size_t>)
            std::move(action)(counter_);
        else
            std::move(action)();
    }
}

asio::awaitable<void> timeout(auto duration) {
    asio::steady_timer timer(co_await this_coro::executor);
    timer.expires_after(duration);
    co_await timer.async_wait(nothrow_await);
}

asio::awaitable<void> server_session(tcp::socket socket) {
    try {
        for (std::array<char, 2000> data;;) {
            if (auto r = co_await (async_read(socket, asio::buffer(data), nothrow_await) || timeout(2ms));
                r.index() == 1) {
                once_in(1000, [&] { std::cout << "server_session time out." << std::endl; });
            } else {
                auto [e, n] = std::get<0>(r);
                if (!e) {
                    once_in(1000, [&, n = n] {
                        std::cout << "server_session writing " << n << " bytes to "
                                  << socket.remote_endpoint() << std::endl;
                    });
                    if (n)
                        co_await async_write(socket, asio::buffer(data, n), use_awaitable);
                } else {
                    std::cout << e.message() << std::endl;
                }
            }
        }
    } catch (boost::system::system_error const& se) {
        std::cout << "server_session Exception: " << se.code().message() << std::endl;
    } catch (std::exception const& e) {
        std::cout << "server_session Exception: " << e.what() << std::endl;
    }

    std::cout << "server_session closed" << std::endl;
}

asio::awaitable<void> listener(uint16_t port) {
    for (tcp::acceptor acc(co_await this_coro::executor, {{}, port});;) {
        auto s = make_strand(acc.get_executor());
        co_spawn(                                                        //
            s,                                                           //
            server_session(co_await acc.async_accept(s, use_awaitable)), //
            asio::detached);
    }
}

asio::awaitable<void> client_session(uint16_t port) {
    try {
        tcp::socket socket(co_await this_coro::executor);
        co_await socket.async_connect({{}, port}, use_awaitable);

        for (std::array<char, 4024> data{0};;) {
            co_await (async_read(socket, asio::buffer(data), use_awaitable) || timeout(2ms));
            auto w = co_await async_write(socket, asio::buffer(data, 2000 /*SEHE?!*/), use_awaitable);

            once_in(1000, [&](size_t counter) {
                std::cout << "#" << counter << " wrote " << w << " bytes from " << socket.local_endpoint()
                          << std::endl;
            });
        }
    } catch (boost::system::system_error const& se) {
        std::cout << "client_session Exception: " << se.code().message() << std::endl;
    } catch (std::exception const& e) {
        std::cout << "client_session Exception: " << e.what() << std::endl;
    }

    std::cout << "client_session closed" << std::endl;
}

int main(int argc, char** argv) {
    auto flags = std::set<std::string_view>(argv + 1, argv + argc);
    bool server = flags.contains("server");
    bool client = flags.contains("client");

    asio::thread_pool pool(server ? 8 : 3);
    try {
        asio::signal_set signals(pool, SIGINT, SIGTERM);
        signals.async_wait([&](auto, auto) { pool.stop(); });

        if (server) {
            co_spawn(pool, listener(5555), asio::detached);
        }

        if (client) {
            co_spawn(make_strand(pool), client_session(5555), asio::detached);
            co_spawn(make_strand(pool), client_session(5555), asio::detached);
            co_spawn(make_strand(pool), client_session(5555), asio::detached);
        }

        std::this_thread::sleep_for(30s); // time limited for COLIRU
    } catch (std::exception const& e) {
        std::cout << "main Exception: " << e.what() << std::endl;
    }
}

Local demo for clarity:

enter image description here

² also using experimental::as_tuple for COLIRU