ksqldb with Apicurio - problem getting schema

18 Views Asked by At

I am trying to create a streams in ksqlDB from Kafka topics serialized with Avro. I am using Apicurio to handle the schemas. The topics are created with Debezium, which is monitoring changes in a MySQL database.

In my tests, I have managed to create the topics but when I try to create a stream using

CREATE STREAM customers (id int, first_name varchar, last_name VARCHAR, email VARCHAR) WITH (KAFKA_TOPIC='dbserver1.inventory.customers',VALUE_FORMAT='AVRO');

then I get the following error: Could not get latest schema for subject dbserver1.inventory.customers-value Caused by: Failed to discover artifact type from content.; error code: 0

I've created my stack with two docker-compose files:

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
    networks:
      - etl-streaming-network

  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
    networks:
      - etl-streaming-network

  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
    networks:
      - etl-streaming-network

  apicurio:
    image: apicurio/apicurio-registry-mem:2.2.5.Final
    ports:
     - 8080:8080
    networks:
      - etl-streaming-network

  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
     - apicurio
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - ENABLE_APICURIO_CONVERTERS=true
    networks:
      - etl-streaming-network

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:9092
    links:
      - kafka
    networks:
      - etl-streaming-network
      
     
networks:
  etl-streaming-network:
    driver: bridge

and

version: '2'
services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
      - apicurio
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_HOST_NAME: ksql-server
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://apicurio:8080/apis/ccompat/v6
    networks:
      - etl-streaming-network
    restart: unless-stopped

 
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    restart: unless-stopped
    networks:
      - etl-streaming-network

networks:
  etl-streaming-network:
    driver: bridge

The configuration of the Debezium connector is as follows:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "topic.prefix": "dbserver1",
        "database.include.list": "inventory",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true"
    }
}

I have very little experience with Kafka, so any help would be appreciated.

0

There are 0 best solutions below