Unable to connect to kafka broker even after the certificate is imported


I am using Kafka 2.2.1 and have a problem connecting to it over SSL from Python.

I have already imported my CA certificate (issued by a trusted authority) into the keystore used by all Kafka brokers:

keytool -import -trustcacerts -alias CARoot -file AC_PROD.txt -keystore keystore.jks

I then also imported my signed certificate into the truststore used by the Kafka brokers, under a new alias:

keytool -import -alias acdatalake -file CERT_PROD.pem -storetype JKS -keystore truststore.jks
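Note that these keytool commands only change the Java (JKS) stores read by the brokers. Python's ssl module does not read JKS files; it loads PEM files, which in the code below come from CA_PATH, CERT_PATH and KEY_PATH. The following is a minimal sketch for checking what kind of file a path points to, assuming the usual JKS magic bytes and PEM "BEGIN" marker; the helper names are hypothetical and not part of any library.

```python
# Hypothetical helpers: guess whether a certificate file is PEM or JKS.
JKS_MAGIC = bytes.fromhex("feedfeed")  # first four bytes of a JKS keystore
PEM_MARKER = b"-----BEGIN "            # start of any PEM block


def detect_store_format(data: bytes) -> str:
    """Return "jks", "pem", or "unknown" for the raw bytes of a file."""
    if data.startswith(JKS_MAGIC):
        return "jks"
    if PEM_MARKER in data:
        return "pem"
    return "unknown"


def detect_file_format(path: str) -> str:
    """Read a file from disk and classify its contents."""
    with open(path, "rb") as handle:
        return detect_store_format(handle.read())
```

For example, detect_file_format(os.getenv("CA_PATH")) should report "pem" for a file that ssl.create_default_context(cafile=...) can load.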

Then I tried to connect to the brokers from Python, using the code below:

import os
import ssl

import kafka

# Collect broker addresses from BOOTSRAP_SERVER_1 .. BOOTSRAP_SERVER_<n>.
# Default to "0" so a missing variable does not make int() fail on None.
bootstrap_server_num = int(os.getenv("BOOTSRAP_SERVER_NUM", "0"))
bootstrap_servers = []
for i in range(bootstrap_server_num):
    bootstrap_sev = os.getenv("BOOTSRAP_SERVER_" + str(i + 1))
    if bootstrap_sev is not None:
        bootstrap_servers.append(bootstrap_sev)

# For this question, bootstrap_servers can be treated as ["localhost:9094"] (the port is 9094)

caPath = os.getenv("CA_PATH")
certPath = os.getenv("CERT_PATH")
keyPath = os.getenv("KEY_PATH")

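# Create the SSL context, trusting the CA file at CA_PATH
ssl_context = ssl.create_default_context(cafile=caPath)
ssl_context.check_hostname = False

# Optional OpenSSL security level (not applied to ssl_context in this snippet)
seclevel = os.getenv("SSL_SECLEVEL")
if seclevel is None:
    seclevel = 'DEFAULT@SECLEVEL=1'

ssl_context.verify_mode = ssl.CERT_REQUIRED
# Client certificate and private key presented to the broker
ssl_context.load_cert_chain(certPath, keyPath)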

kafka_client = kafka.KafkaClient(         # 1 (line 175 in the source code): it tries to connect to the Kafka broker
    security_protocol=security_protocol,
    ssl_context=ssl_context,
    ssl_password=ssl_password,
    bootstrap_servers=bootstrap_servers,
)

admin_client = kafka.KafkaAdminClient(
    security_protocol=security_protocol,
    ssl_context=ssl_context,
    ssl_password=ssl_password,
    bootstrap_servers=bootstrap_servers,
)

It is exactly at this point (# 1) that things go wrong:

Traceback (most recent call last):
  File "app.py", line 175, in <module>
    kafka_client = kafka.KafkaClient(
  File "/home/python3_env/lib/python3.8/site-packages/kafka/client_async.py", line 244, in __init__
    self.config["api_version"] = self.check_version(timeout=check_timeout)
  File "/home/python3_env/lib/python3.8/site-packages/kafka/client_async.py", line 908, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config["bootstrap_topics_filter"]))
  File "/home/python3_env/lib/python3.8/site-packages/kafka/conn.py", line 1266, in check_version
    if not self.connect_blocking(timeout_at - time.time()):
  File "/home/python3_env/lib/python3.8/site-packages/kafka/conn.py", line 352, in connect_blocking
    self.connect()
  File "/home/python3_env/lib/python3.8/site-packages/kafka/conn.py", line 455, in connect
    if self._try_handshake():
  File "/home/python3_env/lib/python3.8/site-packages/kafka/conn.py", line 535, in _try_handshake
    self._sock.do_handshake()
  File "/opt/bitnami/python/lib/python3.8/ssl.py", line 1309, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1131)
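This error generally means that the chain sent by the broker ends in a self-signed root certificate that is not among the CAs the client trusts. In the code above, the client's trusted CAs come from the PEM file at CA_PATH, not from the brokers' JKS stores. The following is a minimal sketch for inspecting what the Python side actually trusts; the helper names are hypothetical, and it only relies on the standard ssl module.

```python
import ssl


def count_pem_certificates(pem_text: str) -> int:
    """Count the certificate blocks in a PEM bundle."""
    return pem_text.count("-----BEGIN CERTIFICATE-----")


def name_to_dict(name) -> dict:
    """Flatten the nested tuples ssl uses for subject/issuer into a dict."""
    return {key: value for rdn in name for key, value in rdn}


def loaded_ca_summary(context: ssl.SSLContext) -> list:
    """List (common name, is_self_issued) for each CA the context trusts.

    A certificate whose subject equals its issuer is self-issued,
    which is typically the case for a root CA.
    """
    summary = []
    for cert in context.get_ca_certs():
        subject = name_to_dict(cert.get("subject", ()))
        issuer = name_to_dict(cert.get("issuer", ()))
        summary.append((subject.get("commonName"), subject == issuer))
    return summary


# Usage, assuming CA_PATH points to a PEM bundle as in the code above:
# ctx = ssl.create_default_context(cafile=os.getenv("CA_PATH"))
# print(loaded_ca_summary(ctx))
```

If the root CA that signed the broker's certificate does not appear in this list, one likely fix is to add that root (and any intermediates) to the PEM file at CA_PATH.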

For reference, my Python version is 3.8.16.

One more thing: following the advice of an experienced developer, I also loaded all the CA certificates into the truststore, but it still doesn't work. Thanks in advance for any help!

I signed my certificate and put it, along with the CA, in the right place in the keystore/truststore used by the Kafka brokers. However, I still can't connect to the brokers using these certificates.
