I'm writing a tool to test an asyncio-based server end-to-end. Initially I was going to spin up the server in one terminal window and run the test in a separate window, but then I realized that it should be possible to do both in one script. After all, I can do it with a concurrent.futures.ThreadPoolExecutor, but I'm struggling to convert the logic to use async def/await.
Here's a working example using the TPE:
import argparse
import socket
import concurrent.futures
import threading
import socketserver

class TCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print(f'Got data: {self.request.recv(1024).strip().decode()}')

def started_server(*, server):
    print('starting server')
    server.serve_forever()
    print('server thread closing')

def run_client(*, host, port, server):
    print('client started, attempting connection')
    with socket.create_connection((host, port), timeout=0.5) as conn:
        print('connected')
        conn.send(b'hello werld')
    print('closing server')
    server.shutdown()
    print('cancelled')

def test_the_server(*, host, port):
    ex = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    print('server a')
    quitter = threading.Event()
    server = socketserver.TCPServer((host, port), TCPHandler)
    a = ex.submit(started_server, server=server)
    b = ex.submit(run_client, host=host, port=port, server=server)
    print(a.result(), b.result())
    print('server b')

def do_it(): # Shia LeBeouf!
    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("-p", "--port", type=int, default=60025)
    args = parser.parse_args()
    exit(test_the_server(host=args.host, port=args.port))

if __name__ == "__main__":
    do_it()
How would I convert this to use an asyncio loop? I'm pretty sure that I need to run the server's asyncio loop in a thread, but so far every attempt has just ended up blocking, and other questions on SO have failed to provide a solution (or are outdated).
Here's an example of something that fails for me:
import asyncio
import argparse
import socket
import concurrent.futures
import threading
import socketserver

class EchoHandler(asyncio.Protocol):
    def data_received(self, data):
        print(f"Got this data: {data.decode()}")

async def run_server(*, server):
    print('starting server')
    server = await server
    async with server:
        print('start serving')
        await server.start_serving()
        print('waiting on close')
        await server.wait_closed()
    print('server coro closing')

def started_server(*, server):
    print('server thread started')
    asyncio.run(run_server(server=server))
    print('server thread finished')

def run_client(*, host, port, server):
    print('client started, attempting connection')
    with socket.create_connection((host, port), timeout=0.5) as conn:
        print('connected')
        conn.send(b'hello werld')
    print('closing server')
    server.close()
    print('cancelled')

async def fnord(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    print('got', message)

def test_the_server(*, host, port):
    ex = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    print('server a')
    quitter = threading.Event()
    #server = socketserver.TCPServer((host, port), TCPHandler)
    server = asyncio.start_server(fnord, host, port)
    a = ex.submit(started_server, server=server)
    b = ex.submit(run_client, host=host, port=port, server=server)
    print(a.result(), b.result())
    print('server b')

def do_it(): # Shia LeBeouf!
    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("-p", "--port", type=int, default=60025)
    args = parser.parse_args()
    exit(test_the_server(host=args.host, port=args.port))

if __name__ == "__main__":
    do_it()
I was hoping that https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server.wait_closed would be enough, so that calling server.close() from the other thread would shut down the server, but that doesn't appear to be the case. serve_forever behaves the same as the start_serving approach.
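Two things are worth keeping in mind with the threaded attempt: asyncio.start_server() returns a coroutine, so the Server object only exists after it has been awaited inside the loop, and asyncio objects are not thread-safe, so a request from another thread has to be handed to the loop with loop.call_soon_threadsafe(). The following is only a minimal sketch of that idea, not a definitive fix. The function name exercise_server, the shared dict, the threading.Event helpers, and the use of port 0 (let the OS pick a free port) are all assumptions made for illustration.

```python
import asyncio
import concurrent.futures
import socket
import threading


def exercise_server(host="127.0.0.1", port=0):
    """Hypothetical helper: run an asyncio server in one thread, a client in another."""
    received = []              # messages the server saw
    ready = threading.Event()  # set once the server is listening
    seen = threading.Event()   # set once the server has read the message
    shared = {}                # loop, stop event and port, filled in by the server thread

    async def handle(reader, writer):
        data = await reader.read(100)
        received.append(data.decode())
        seen.set()
        writer.close()
        await writer.wait_closed()

    async def serve():
        # Awaiting start_server() is what produces the Server object.
        server = await asyncio.start_server(handle, host, port)
        shared["loop"] = asyncio.get_running_loop()
        shared["stop"] = asyncio.Event()
        shared["port"] = server.sockets[0].getsockname()[1]
        ready.set()
        async with server:     # the server is closed when this block exits
            await shared["stop"].wait()

    def run_client():
        if not ready.wait(timeout=5):
            raise TimeoutError("server did not start")
        try:
            with socket.create_connection((host, shared["port"]), timeout=0.5) as conn:
                conn.sendall(b"hello werld")
            seen.wait(timeout=5)
        finally:
            # Hand the shutdown request to the loop's own thread.
            shared["loop"].call_soon_threadsafe(shared["stop"].set)

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as ex:
        a = ex.submit(lambda: asyncio.run(serve()))
        b = ex.submit(run_client)
        a.result()
        b.result()
    return received


if __name__ == "__main__":
    print(exercise_server())
```

The client waits for the seen event before asking for shutdown so that the server is not closed before it has read the message; without that, the outcome would depend on timing.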
As I mentioned in my comments, I'm not exactly sure what you're looking for, but I thought I'd come up with an example in which you run both a TCP server and a client in the same asyncio event loop. I augmented the examples for asyncio streams to make this example.
This example doesn't make for a great script, though, because the server will run forever until you press Ctrl+C. This example prints:
Here's an example using tasks directly (note that asyncio.gather turns the coroutines it is passed into tasks), which allows you to cancel a task if you have a reference to it. Everything is the same except for the change to main and the extra coroutine.
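A minimal sketch of the task-based approach, not the exact code from this answer: the server and the client share one event loop, serve_forever() runs as a task, and the task is cancelled once the client is done. The names handle_client and run_client, the echo behaviour, and the use of port 0 (let the OS pick a free port) are assumptions made for illustration.

```python
import asyncio


async def handle_client(reader, writer):
    # Echo whatever the client sent, then close the connection.
    data = await reader.read(100)
    print(f"server got: {data.decode()}")
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def run_client(host, port, message):
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(message.encode())
    await writer.drain()
    reply = await reader.read(100)
    writer.close()
    await writer.wait_closed()
    return reply.decode()


async def main(host="127.0.0.1", port=0):
    # start_server() begins listening as soon as it has been awaited.
    server = await asyncio.start_server(handle_client, host, port)
    actual_port = server.sockets[0].getsockname()[1]
    async with server:
        # Keep a reference to the task so it can be cancelled later.
        server_task = asyncio.create_task(server.serve_forever())
        reply = await run_client(host, actual_port, "hello werld")
        server_task.cancel()
        try:
            await server_task
        except asyncio.CancelledError:
            pass  # expected: the task was cancelled on purpose
    return reply


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because everything runs in one loop, nothing here crosses a thread boundary, and cancelling the task is enough to stop the server so the script exits on its own.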