Kafka Connect MQTT Connector Source: Error Creating Source Connector with HiveMQ Testcontainer - URI Configuration Issue

39 Views Asked by At

I am attempting to consume data from HiveMQ to Kafka using Kafka Connect's MQTT Kafka Connector Source. I have successfully added the MQTT Kafka Connector plugin, and it appears to have loaded correctly, as I can see it listed in the Kafka Connect Connector Plugins URL as shown below.

[{"class":"io.confluent.connect.mqtt.MqttSinkConnector","type":"sink","version":"1.7.1"},{"class":"io.confluent.connect.mqtt.MqttSourceConnector","type":"source","version":"1.7.1"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"7.5.0-ccs"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"7.5.0-ccs"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"7.5.0-ccs"}]

However, when trying to create the connector using the URI provided by HiveMQ, the Connector Create REST API is throwing the below error.

Below is a relevant code snippet:

HTTP/1.1 400 Bad Request
Date: Sat, 27 Jan 2024 15:26:00 GMT
Content-Type: application/json
Content-Length: 323
Server: Jetty(9.4.51.v20230217)

{
    "error_code": 400,
    "message": "Connector configuration is invalid and contains the following 1 error(s):\nUnable to connect to server (32103) - java.net.ConnectException: Connection refused (Connection refused)\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}

HIVEMQ_INTERNAL_PORT = "1883";

  network = Network.newNetwork();
  hivemqCe = new HiveMQContainer(DockerImageName.parse("hivemq/hivemq-ce").withTag("2021.3"))
                .withNetwork(network)
                .withNetworkAliases(HIVEMQ_NETWORK_ALIAS)
                .withExposedPorts(HIVEMQ_INTERNAL_PORT)
                .withLogLevel(Level.DEBUG);

Below is the code for getting HiveMQ uri with network alias:

private static String getHiveMqUri(){
        return format("tcp://%s:%s",HIVEMQ_NETWORK_ALIAS,hivemqCe.getMappedPort(HIVEMQ_INTERNAL_PORT));
    }

Below is the code for getting HiveMQ uri with container ip:

private static String getHiveMqUri(){
        return format("tcp://%s:%s",hivemqCe.getContainerIpAddress(),hivemqCe.getMappedPort(HIVEMQ_INTERNAL_PORT));
    }

Both the uri throws same error while creating the connector.

I attempted to create the connector using two different URIs—one with an alias and the other with the container IP. However, neither was recognized, resulting in the same 400 error from the connector creation API. I anticipated that using the network alias in the URI would successfully create the connector. I need assistance in confirming whether this URI is correct and resolving this issue.

Below is the post API config which I use for creating the connector,

 Map<String, Object> configMap = new HashMap<>();
        configMap.put("connector.class", "io.confluent.connect.mqtt.MqttSourceConnector");
        configMap.put("mqtt.server.uri", getHiveMqUri());
        configMap.put("mqtt.topics","mqtt-source-1");
        configMap.put("kafka.topic", "kafka-source-1");
        configMap.put("mqtt.qos", "2");
        configMap.put("confluent.topic.bootstrap.servers", getInternalKafkaBoostrapUrl());
        configMap.put("confluent.topic.replication.factor", "1");
0

There are 0 best solutions below