I am using Kafka 2.2.1 and I have a problem connecting to it over SSL from Python.
I have already imported my CA certificate (issued by a trusted authority) into the keystore used by all Kafka brokers:
keytool -import -trustcacerts -alias CARoot -file AC_PROD.txt -keystore keystore.jks
I then imported my signed certificate into the truststore used by the Kafka brokers, under a new alias:
keytool -import -alias acdatalake -file CERT_PROD.pem -storetype JKS -keystore truststore.jks
Then I tried to connect to the brokers from Python, using the code below:
import os
import ssl

import kafka

# Number of brokers to read from the environment (treated as 0 if unset)
bootstrap_server_num = int(os.getenv("BOOTSRAP_SERVER_NUM", "0"))
bootstrap_servers = []
for i in range(bootstrap_server_num):
    name = "BOOTSRAP_SERVER_" + str(i + 1)
    bootstrap_sev = os.getenv(name)
    if bootstrap_sev is not None:
        bootstrap_servers.append(bootstrap_sev)
# You can consider bootstrap_servers to be ["localhost:9094"], i.e. the port is 9094
caPath = os.getenv("CA_PATH")
certPath = os.getenv("CERT_PATH")
keyPath = os.getenv("KEY_PATH")
# Create the SSL context
ssl_context = ssl.create_default_context(cafile=caPath)
ssl_context.check_hostname = False
seclevel = os.getenv("SSL_SECLEVEL")
if seclevel is None:
    seclevel = "DEFAULT@SECLEVEL=1"
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_cert_chain(certPath, keyPath)
kafka_client = kafka.KafkaClient(  # 1 (line 175 in source code): this is where it tries to connect to the Kafka broker
    security_protocol=security_protocol,
    ssl_context=ssl_context,
    ssl_password=ssl_password,
    bootstrap_servers=bootstrap_servers,
)
admin_client = kafka.KafkaAdminClient(
    security_protocol=security_protocol,
    ssl_context=ssl_context,
    ssl_password=ssl_password,
    bootstrap_servers=bootstrap_servers,
)
It is exactly at this point (# 1) that things go wrong:
Traceback (most recent call last):
  File "app.py", line 175, in <module>
    kafka_client = kafka.KafkaClient(
  File "/home/python3_env/lib/python3.8/site-packages/kafka/client_async.py", line 244, in __init__
    self.config["api_version"] = self.check_version(timeout=check_timeout)
  File "/home/python3_env/lib/python3.8/site-packages/kafka/client_async.py", line 908, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config["bootstrap_topics_filter"]))
  File "/home/python3_env/lib/python3.8/site-packages/kafka/conn.py", line 1266, in check_version
    if not self.connect_blocking(timeout_at - time.time()):
  File "/home/python3_env/lib/python3.8/site-packages/kafka/conn.py", line 352, in connect_blocking
    self.connect()
  File "/home/python3_env/lib/python3.8/site-packages/kafka/conn.py", line 455, in connect
    if self._try_handshake():
  File "/home/python3_env/lib/python3.8/site-packages/kafka/conn.py", line 535, in _try_handshake
    self._sock.do_handshake()
  File "/opt/bitnami/python/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1131)
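As far as I understand, this error means the chain sent by the broker ends in a self-signed certificate that the client-side context does not find among its trusted CAs. To check what the context really trusts, here is a minimal sketch (a diagnostic idea, not part of my application) that lists the CA certificates loaded from CA_PATH. The helper names format_name and describe_trusted_cas are hypothetical; only the standard ssl module is used.

```python
import os
import ssl


def format_name(name):
    """Flatten the nested RDN tuples used by the ssl module into one string."""
    return ", ".join(f"{key}={value}" for rdn in name for key, value in rdn)


def describe_trusted_cas(context):
    """Return (subject, issuer, is_self_signed) for each CA loaded in the context."""
    described = []
    for cert in context.get_ca_certs():
        subject = format_name(cert.get("subject", ()))
        issuer = format_name(cert.get("issuer", ()))
        # A root CA is self-signed: its subject and issuer are identical.
        described.append((subject, issuer, subject == issuer))
    return described


if __name__ == "__main__":
    # Assumption: CA_PATH points to the same PEM file used in the code above.
    ctx = ssl.create_default_context(cafile=os.getenv("CA_PATH"))
    for subject, issuer, self_signed in describe_trusted_cas(ctx):
        print(f"subject: {subject}")
        print(f"issuer:  {issuer}")
        print(f"self-signed: {self_signed}")
```

If no self-signed entry matching the root of the broker's chain shows up in this list, the file behind CA_PATH may be missing the root CA (for example, it may contain only an intermediate).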
For information, my Python version is 3.8.16.
One more thing: following the advice of an experienced developer, all the CA certificates were also loaded into the truststore, but it still doesn't work. Thanks in advance for any help!
To summarize: I signed my certificate and placed it, together with the CA, in the keystore/truststore that the Kafka brokers point to. However, I still can't connect to the Kafka brokers using these certificates.
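In case it helps with diagnosis, here is a minimal sketch of how the certificate presented by a broker could be fetched from Python, so that its issuer can be compared with the CA file. The helper names split_host_port and fetch_broker_certificate are hypothetical. ssl.get_server_certificate returns only the leaf certificate (not the whole chain) and does not verify it, and the call may fail if the broker requires client authentication.

```python
import ssl


def split_host_port(server):
    """Split a 'host:port' bootstrap entry into a (host, port) tuple."""
    host, _, port = server.rpartition(":")
    return host, int(port)


def fetch_broker_certificate(server):
    """Return the PEM-encoded leaf certificate presented by the broker.

    No verification is done here (ca_certs is left unset), so this works
    even when the chain is not trusted by the client.
    """
    return ssl.get_server_certificate(split_host_port(server))
```

For example, print(fetch_broker_certificate("localhost:9094")) would print the PEM text for the example address mentioned above, which can then be decoded with any certificate viewer.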