How To Run Kafka Camel Connectors On Amazon MSK


Context: I followed this link on setting up AWS MSK and testing a producer and a consumer, and the setup works correctly: I can send and receive messages between two separate EC2 instances that use the same Kafka cluster (my MSK cluster). Now I would like to build a data pipeline all the way from Azure Event Hubs to AWS Kinesis Firehose, of the form:

Azure Eventhub -> Eventhub-to-Kafka Camel Connector -> AWS MSK -> Kafka-to-Kinesis-Firehose Camel Connector -> AWS Kinesis Firehose

I was able to do this successfully without MSK (using a regular self-managed Kafka cluster), but for unstated reasons I now need to use MSK, and I can't get it working.

Problem: When I try to start the two Camel connectors against AWS MSK, both of them fail with errors.

These are the two connectors in question:

  1. Kafka to AWS Kinesis Firehose connector (sink: consumes from Kafka)
  2. Azure Event Hubs to Kafka connector (source: produces to Kafka)

Goal: Get these connectors working with MSK, as they did when they were connected directly to Kafka.

Here is the issue for Firehose:

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
com.amazonaws.services.kinesisfirehose.model.AmazonKinesisFirehoseException: The security token included in the request is invalid

Here is the one for Azure:

[2021-05-04 14:09:56,848] WARN Load balancing for event processor failed - If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403, "<?xml version="1.0" encoding="utf-8"?><Error><Code>AuthorizationFailure</Code><Message>This request is not authorized to perform this operation.
Time:2021-05-04T14:09:56.7148317Z</Message></Error>" (com.azure.messaging.eventhubs.PartitionBasedLoadBalancer:344)
[2021-05-04 14:09:56,858] Error was received while reading the incoming data. The connection will be closed. (reactor.netty.channel.ChannelOperationsHandler:319)
java.lang.NoSuchMethodError: org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createExchange(Z)Lorg/apache/camel/Exchange;
        at org.apache.camel.component.azure.eventhubs.EventHubsConsumer.createAzureEventHubExchange(EventHubsConsumer.java:93)

Accepted answer (OneCricketeer):

MSK doesn't offer Kafka Connect as a service. You'll need to run Kafka Connect yourself, either on your own computer or on other AWS compute resources, and then install the Camel connector plugins into it.
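As a rough illustration of what running Connect yourself involves, the sketch below renders a distributed-mode worker config (connect-distributed.properties) for a worker hosted outside MSK. The property keys are standard Kafka Connect worker settings; the broker address, group id, topic names and plugin directory are hypothetical placeholders. The plugin directory is where the unpacked Camel connector archives would be copied.

```python
"""Sketch: render a Kafka Connect distributed-worker config for a
self-hosted worker (e.g. on EC2) that talks to an MSK cluster.

All concrete values (broker string, group id, topic names, plugin dir)
are hypothetical placeholders, not taken from the question.
"""
from typing import Dict


def build_worker_config(bootstrap_servers: str, plugin_dir: str) -> Dict[str, str]:
    return {
        # MSK broker list (hypothetical value supplied by the caller)
        "bootstrap.servers": bootstrap_servers,
        # Workers that share this group.id form one Connect cluster
        "group.id": "camel-connect-cluster",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        # Internal topics where distributed mode stores offsets, configs, status
        "offset.storage.topic": "connect-offsets",
        "config.storage.topic": "connect-configs",
        "status.storage.topic": "connect-status",
        # Directory holding the unpacked Camel connector plugins
        "plugin.path": plugin_dir,
    }


def render_properties(config: Dict[str, str]) -> str:
    # One key=value pair per line, in insertion order
    return "\n".join(f"{key}={value}" for key, value in config.items()) + "\n"


if __name__ == "__main__":
    cfg = build_worker_config(
        "b-1.example.kafka.us-east-1.amazonaws.com:9092",  # hypothetical broker
        "/opt/kafka/plugins",  # hypothetical plugin directory
    )
    print(render_properties(cfg), end="")
```

The rendered file would then be passed to bin/connect-distributed.sh from the Kafka distribution installed on that machine, and the connectors are submitted to the running worker afterwards.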

Answer (floating_hammer):

Kafka Connect is a framework that works with any Kafka distribution (MSK, open-source Kafka, or others). It is bundled with open-source Kafka, but it does not include the Camel connectors; you have to install those yourself.

As a best practice, never run Kafka Connect on the same servers as your broker nodes. They share binaries, so tuning one can cause unintended issues on the other. Also, Kafka Connect workers are just applications, and you would not run your Kafka producer or consumer applications on the broker nodes either. So create one or more EC2 instances and deploy Kafka Connect there.

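Regarding TLS: if you are enabling client-side TLS authentication, you need to use the TLS bootstrap brokers (bootstrap_brokers_tls in Terraform, or BootstrapBrokerStringTls in the output of aws kafka get-bootstrap-brokers).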
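One detail that is easy to miss here: a Kafka Connect worker's own security settings are not applied to the clients its connectors create. Source connectors (such as the Event Hubs one) use producer.-prefixed settings, and sink connectors (such as the Firehose one) use consumer.-prefixed settings. The sketch below builds TLS settings for the worker and both prefixes. It assumes the JVM's default truststore already trusts the MSK broker certificates; the broker string and keystore path are hypothetical, and the keystore entries are only needed for TLS client authentication.

```python
"""Sketch: TLS settings for a Kafka Connect worker talking to MSK.

Assumptions: the JVM default truststore trusts the MSK certificates;
broker string and keystore path are hypothetical placeholders.
"""
from typing import Dict, Optional

# "" = the worker's own clients, "producer." = clients of source
# connectors, "consumer." = clients of sink connectors.
CLIENT_PREFIXES = ("", "producer.", "consumer.")


def tls_settings(tls_bootstrap: str,
                 keystore_path: Optional[str] = None,
                 keystore_password: Optional[str] = None) -> Dict[str, str]:
    client = {"security.protocol": "SSL"}
    if keystore_path is not None:
        # Only needed when MSK requires TLS client authentication
        client["ssl.keystore.location"] = keystore_path
        client["ssl.keystore.password"] = keystore_password or ""
    # bootstrap.servers is set once at worker level
    settings = {"bootstrap.servers": tls_bootstrap}
    for prefix in CLIENT_PREFIXES:
        for key, value in client.items():
            settings[prefix + key] = value
    return settings


if __name__ == "__main__":
    for key, value in tls_settings(
        "b-1.example.kafka.us-east-1.amazonaws.com:9094",  # hypothetical TLS broker
        "/etc/kafka/client.keystore.jks",  # hypothetical keystore
        "changeit",  # hypothetical password
    ).items():
        print(f"{key}={value}")
```

These lines would be merged into the worker properties file; forgetting the producer. and consumer. copies typically leaves the connectors trying to reach the TLS listener in plaintext.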