Unable to Insert Data in GreenPlum using GPKAFKA


I am having trouble reading data from Kafka with gpkafka and inserting it into a Greenplum database. My target table has multiple columns of different data types, and the external table created by gpkafka is a replica of the target table. However, when pushing the data into the target table I get this error:

debug,rollback the batch 0 due to Failed to execute batch: pq: avro_import: only support single json column

The Greenplum database version I am using is 6.19.3. The details follow. How can I resolve this issue?

Kafka message in JSON format with an Avro schema:

{
    "key1": "try81",
    "col1": {
        "int": 1
    },
    "col2": {
        "string": "def"
    },
    "col3": {
        "string": "ghi"
    },
    "col4": {
        "string": "jkl"
    },
    "instance_id": {
        "string": "009"
    }
}
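
(For reference, the {"int": 1} and {"string": "..."} wrappers are the standard Avro JSON encoding for union values, which suggests the registered schema declares nullable fields, roughly like the sketch below. The actual schema in the registry may differ.)

{
    "type": "record",
    "name": "testgpkafka",
    "fields": [
        {"name": "key1", "type": "string"},
        {"name": "col1", "type": ["null", "int"]},
        {"name": "col2", "type": ["null", "string"]},
        {"name": "col3", "type": ["null", "string"]},
        {"name": "col4", "type": ["null", "string"]},
        {"name": "instance_id", "type": ["null", "string"]}
    ]
}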

The DDL of the table:

CREATE TABLE
    testgpkafka
    (
        key1 CHARACTER VARYING(5) NOT NULL,
        col1 INTEGER,
        col2 CHARACTER VARYING(55),
        col3 CHARACTER VARYING(19),
        col4 CHARACTER VARYING(13),
        instance_id CHARACTER VARYING(15),
        PRIMARY KEY (key1)
    );

The gpkafka.yaml file:

DATABASE: gpdb_dev
USER: --
PASSWORD: --
HOST: --
PORT: --
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.151.201:9092, 192.168.151.202:9092, 192.168.151.203:9092
        TOPIC: mcp_kafka_net_21.mcp.testgpkafka
        PARTITIONS: (0)
      COLUMNS:
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://192.168.151.201:8081   
   OUTPUT:
      SCHEMA: smi
      TABLE:  testgpkafka
      MODE:   insert

I have also tried FORMAT: json, and I get this error:

debug,rollback the batch 0 due to Failed to execute batch: pq: json_import: only support single json/jsonb/gp_jsonb column

The Kafka message was:

{
    "key1": "tr159",
    "col1": 1,
    "col2": "def",
    "col3": "ghi",
    "col4": "jkl",
    "instance_id": "009"
}

I have also tried wrapping the Kafka message in a single JSON field, but that did not work either.

Kafka message:

{
    "data": {
        "key1": "tr150",
        "col1": 1,
        "col2": "def",
        "col3": "ghi",
        "col4": "jkl",
        "instance_id": "009"
    }
}
1 Answer

Nikita Koposov:

Loading Avro directly into a multi-column table (insert mode) is a newer feature; using a MAPPING is more stable.
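
For context, the avro_import/json_import errors mean that, without a MAPPING, gpkafka can only load Avro or JSON data into a target table declared as a single json column. A minimal sketch of such a staging table (the table name here is hypothetical):

-- Hypothetical staging table that satisfies the "only support single json column"
-- requirement; rows would then have to be transformed into smi.testgpkafka separately.
CREATE TABLE smi.testgpkafka_stage (jdata json);

A MAPPING avoids that extra staging step by doing the transformation inside the load job.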

With a MAPPING, the configuration looks something like this:

DATABASE: gpdb_dev
USER: --
PASSWORD: --
HOST: --
PORT: --
VERSION: 2 # the VALUE/MAPPING blocks below use the version 2 config format
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: 192.168.151.201:9092, 192.168.151.202:9092, 192.168.151.203:9092
        TOPIC: mcp_kafka_net_21.mcp.testgpkafka
        PARTITIONS: (0)
      VALUE:
        COLUMNS:
          - NAME: jdata
            TYPE: gp_json # custom type from the dataflow EXTENSION in Greenplum; plain "json" also works but crashes on the null char "\u0000"
        FORMAT: avro
        AVRO_OPTION:
          SCHEMA_REGISTRY_ADDR: http://192.168.151.201:8081
   OUTPUT:
      SCHEMA: smi
      TABLE: testgpkafka
      MAPPING:
        - NAME: key1
          EXPRESSION: (jdata->>'key1')::varchar(5)
        - NAME: col1
          EXPRESSION: (jdata->'col1'->>'int')::integer
        - NAME: col2
          EXPRESSION: (jdata->'col2'->>'string')::varchar(55)
        - NAME: col3
          EXPRESSION: (jdata->'col3'->>'string')::varchar(19)
        - NAME: col4
          EXPRESSION: (jdata->'col4'->>'string')::varchar(13)
        - NAME: instance_id
          EXPRESSION: (jdata->'instance_id'->>'string')::varchar(15)
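
If you go with gp_json, the type has to exist in the database before the job runs. Assuming the extension is named exactly as in the comment above (unverified; adjust to your installation):

-- assumption: the "dataflow" extension mentioned in the comment provides gp_json/gp_jsonb
CREATE EXTENSION IF NOT EXISTS dataflow;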

Alternatively, for the plain JSON message (no Avro union wrappers), the MAPPING can skip the per-field extraction:

      MAPPING: 
        - NAME: key1
          EXPRESSION: (jdata->>'key1')::varchar(5)
        - NAME: col1
          EXPRESSION: (jdata->>'col1')::integer
        - NAME: col2
          EXPRESSION: (jdata->>'col2')::varchar(55) 
        - NAME: col3
          EXPRESSION: (jdata->>'col3')::varchar(19) 
        - NAME: col4
          EXPRESSION: (jdata->>'col4')::varchar(13) 
        - NAME: instance_id
          EXPRESSION: (jdata->>'instance_id')::varchar(15)

Also, your Kafka message does not look correct for this Avro schema.
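
After fixing the YAML, you can rerun the job with the standard gpkafka CLI (file name as used above) and then spot-check the target table from psql:

gpkafka load gpkafka.yaml

SELECT * FROM smi.testgpkafka LIMIT 10;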