I want to use asyncio.start_server with SSL and client authentication. I can set it up correctly and it works (see example program), but when the client does not provide the correct certificate and the server refuses, I don't see any exception. I want to see the exception on both sides, client and server. Here is an admittedly long, but complete minimal runnable example:
#!/usr/bin/python
import asyncio
import ssl
from pathlib import Path
import sys
def assert_readable(path: Path, path_description) -> None:
""" no docstring """
try:
with path.open('rb'):
pass
except (IOError, PermissionError) as e:
raise IOError(f"{path_description} path is not a readable file. path='{str(path)}'") from e
def assert_readable_on_server_side(
own_cert_file: Path, own_privkey_file: Path, trusted_cert_file: Path):
""" no docstring """
_assert_readable_on_side("server", own_cert_file, own_privkey_file, trusted_cert_file)
def assert_readable_on_client_side(
own_cert_file: Path, own_privkey_file: Path, trusted_cert_file: Path):
""" no docstring """
_assert_readable_on_side("client", own_cert_file, own_privkey_file, trusted_cert_file)
def _assert_readable_on_side(
side: str, own_cert_file: Path, own_privkey_file: Path, trusted_cert_file: Path):
assert_readable(own_cert_file, f"{side} side own certificate")
assert_readable(own_privkey_file, f"{side} side private key")
assert_readable(trusted_cert_file, f"{side} side trusted certificate (=remote's certificate)")
def load_verify_locations(ssl_context: ssl.SSLContext, trusted_cert_file: Path) -> None:
""" no docstring """
try:
ssl_context.load_verify_locations(trusted_cert_file)
except Exception as e:
raise ssl.SSLError(
"error loading trusted certificate into the ssl context: " +
f"trusted_cert_file={str(trusted_cert_file)}") from e
def load_cert_chain(ssl_context: ssl.SSLContext, own_cert_file: Path, own_privkey_file: Path
) -> None:
""" no docstring """
try:
ssl_context.load_cert_chain(own_cert_file, own_privkey_file)
except Exception as e: # pylint: disable=broad-exception-caught
if "KEY_VALUES_MISMATCH" in str(e):
raise ssl.SSLError(
"ssl certificate and private key do not match: " +
f"own_cert_file={str(own_cert_file)} " +
f"own_privkey_file={str(own_privkey_file)}") from e
else:
raise ssl.SSLError(
"error loading own ssl certificate and private key: " +
f"own_cert_file={str(own_cert_file)}" +
f"own_privkey_file={str(own_privkey_file)}") from e
def asyncio_loop_exception_handler(loop, context):
""" no docstring """
print(asyncio_loop_exception_handler.__name__)
print(" ", loop)
print(" ", context)
def print_error_and_exit(error_msg):
""" no docstring """
print(f"error: {error_msg}")
print("usage examples:")
print(f" {sys.argv[0]} server server_certificate.crt server_private_key.key client_certificate.crt")
print(f" {sys.argv[0]} client client_certificate.crt client_private_key.key server_certificate.crt")
sys.exit(1)
async def handle_client(reader, writer):
""" no docstring """
print("client connected")
try:
writer.write(b"hello, this is a secure server. send 'quit' to close.\n")
await writer.drain()
while True:
data = await reader.read(100)
if not data:
print("read() didn't return any data")
break
data_decoded = data.decode()
if data_decoded.startswith('quit'):
print("quit received:", data_decoded)
break
print("rcvd, echoing:", data_decoded)
writer.write(data)
await writer.drain()
print("closing connection")
writer.close()
await writer.wait_closed()
except Exception as e: # pylint: disable=broad-exception-caught
print(f"exception caught in {handle_client.__name__}:", e)
async def server_main(own_cert_file: Path, own_privkey_file: Path, trusted_cert_file: Path):
""" no docstring """
print("loading credentials")
assert_readable_on_server_side(own_cert_file, own_privkey_file, trusted_cert_file)
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile=own_cert_file, keyfile=own_privkey_file)
ssl_context.load_verify_locations(cafile=trusted_cert_file)
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = False
print("setting exception handler")
loop = asyncio.get_running_loop()
loop.set_exception_handler(asyncio_loop_exception_handler)
print("starting server")
server = await asyncio.start_server(handle_client, 'localhost', 4711, ssl=ssl_context)
print("started")
async with server:
await server.serve_forever()
async def client_main(own_cert_file: Path, own_privkey_file: Path, trusted_cert_file: Path):
""" no docstring """
print("loading credentials")
assert_readable_on_client_side(own_cert_file, own_privkey_file, trusted_cert_file)
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
load_cert_chain(ssl_context, own_cert_file, own_privkey_file)
load_verify_locations(ssl_context, trusted_cert_file)
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = False
print("setting exception handler")
loop = asyncio.get_running_loop()
loop.set_exception_handler(asyncio_loop_exception_handler)
print("connecting")
try:
reader, writer = await asyncio.open_connection('localhost', 4711, ssl=ssl_context)
except Exception as e: # pylint: disable=broad-exception-caught
print(f"exception caught during await {asyncio.open_connection.__name__}()", e)
return
print("connected")
while True:
data = await reader.read(100)
print("rcvd:", data.decode())
message = input("send: ")
writer.write(message.encode())
await writer.drain()
if message == 'quit':
print('closing connection')
writer.close()
break
async def main():
""" no docstring """
if len(sys.argv) < 5:
print_error_and_exit("missing mandatory arguments")
side = sys.argv[1]
own_cert_file = Path(sys.argv[2])
own_privkey_file = Path(sys.argv[3])
trusted_cert_file = Path(sys.argv[4])
if side == "client":
await client_main(own_cert_file, own_privkey_file, trusted_cert_file)
elif side == "server":
await server_main(own_cert_file, own_privkey_file, trusted_cert_file)
else:
print_error_and_exit(f"expected 'client' or 'server' as first argument, but got: '{side}'")
try:
asyncio.run(main())
except KeyboardInterrupt as keyboard_interrupt:
print(type(keyboard_interrupt))
You can use this bash script to generate the required private keys and self signed certificates:
#!/bin/bash
generate_ssl_privkey_and_self_signed_cert() {
output_file_basename="$1"
owner="$2"
openssl req \
-newkey rsa:2048 \
-nodes \
-keyout "${output_file_basename}_private_key.key" \
-x509 \
-days 3660 \
-out "${output_file_basename}_certificate.crt" \
-subj "/CN=$owner"
}
generate_ssl_privkey_and_self_signed_cert "server" "server_hostname_or_ip_address"
generate_ssl_privkey_and_self_signed_cert "client" "client_hostname_or_ip_address"
In order to provoke a failing handshake, start the server with the wrong trusted certificate server_certificate.crt (instead of client_certificate.crt) as last parameter:
$ ./start_asyncio_side.py server server_certificate.crt server_private_key.key server_certificate.crt
loading credentials
setting exception handler
starting server
started
...
And then the client:
$ ./start_asyncio_side.py client client_certificate.crt client_private_key.key server_certificate.crt
loading credentials
setting exception handler
connecting
connected
rcvd:
send: dummy message
No exception is caught or printed, but also no connection takes place. For comparison, the server command for the working handshake is with trusting the client certificate, as follows (and the same client command as above):
$ ./start_asyncio_side.py server server_certificate.crt server_private_key.key client_certificate.crt
loading credentials
setting exception handler
starting server
started
client connected
rcvd, echoing: test
rcvd, echoing: test2
quit received: quit
closing connection
How to make the exception for the failed handshake getting printed?