Environment:
- DataStax Enterprise 6.8.42 (Cassandra)
- Luna Streaming (Apache Pulsar version 2.7.2)
Background: My project captures data changes from Cassandra with the CDC agent, publishes them to Pulsar, and then applies a time-window function to them.
Problem Statement: I wrote a Java function that receives KeyValue messages (key and value are both AVRO), matching the latest topic schema. However, when I reviewed the function log, the incoming messages arrived as bytes, as shown below.
Error log:
2024-03-05T12:53:18,038+0000 [public/default/demo_sumbyId-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/demo_sumbyId:0] Uncaught exception in Java Instance java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"Key schemas or Value schemas are different schema type, from key schema type is AVRO and to key schema is BYTES, from value schema is AVRO and to value schema is BYTES","reqId":1870435977707154767, "remote":"Censored", "local":"Censored"}
Latest Schema:
{
  "version": 7,
  "schemaInfo": {
    "name": "data-exampleio.demo",
    "schema": {
      "key": {
        "name": "demo",
        "schema": {
          "type": "record",
          "name": "demo",
          "namespace": "exampleio",
          "doc": "Table exampleio.demo",
          "fields": [
            {
              "name": "id",
              "type": "int"
            }
          ]
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {}
      },
      "value": {
        "name": "demo",
        "schema": {
          "type": "record",
          "name": "demo",
          "namespace": "exampleio",
          "doc": "Table exampleio.demo",
          "fields": [
            {
              "name": "amount",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "cql_decimal",
                  "namespace": "",
                  "fields": [
                    {
                      "name": "bigint",
                      "type": "bytes"
                    },
                    {
                      "name": "scale",
                      "type": "int"
                    }
                  ],
                  "logicalType": "cql_decimal"
                }
              ]
            },
            {
              "name": "context",
              "type": [
                "null",
                "string"
              ]
            }
          ]
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {}
      }
    },
    "type": "KEY_VALUE",
    "timestamp": 1709620937104,
    "properties": {
      "key.schema.name": "demo",
      "key.schema.properties": "{}",
      "key.schema.type": "AVRO",
      "kv.encoding.type": "SEPARATED",
      "value.schema.name": "demo",
      "value.schema.properties": "{}",
      "value.schema.type": "AVRO"
    }
  }
}
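For reference, the cql_decimal record in the value schema splits a decimal into a bytes field (bigint) and an int field (scale). Below is a minimal Java sketch of turning that pair back into a BigDecimal. It assumes the bytes hold the unscaled value in big-endian two's-complement form (the layout java.math.BigInteger uses); that is an assumption, not something confirmed here. The class name, method name, and sample value are hypothetical.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Pulsar or the CDC agent.
final class CqlDecimalSketch {

    // Assumption: `bigint` carries the unscaled value as big-endian
    // two's-complement bytes, and `scale` is the decimal scale.
    static BigDecimal toBigDecimal(ByteBuffer bigint, int scale) {
        ByteBuffer copy = bigint.duplicate(); // leave the caller's position untouched
        byte[] unscaled = new byte[copy.remaining()];
        copy.get(unscaled);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        // Made-up sample: unscaled value 123456 (3 bytes) with scale 2.
        ByteBuffer sample = ByteBuffer.wrap(BigInteger.valueOf(123456).toByteArray());
        System.out.println(toBigDecimal(sample, 2)); // prints 1234.56
    }
}
```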
I also tried a generic function (Function<Object, Object>) that simply returns the incoming message, but the error persists. Interestingly, I cannot consume readable messages without the --st auto_consume option, even though the latest schema is registered.
With --st auto_consume:
key:[MA==], properties:[writetime=1709646681322536], content:{key={id=24}, value={amount={scale=2, bigint=java.nio.HeapByteBuffer[pos=0 lim=3 cap=3]}, context=exampleagain}}
Without --st auto_consume:
key:[Mg==], properties:[writetime=1709647768862639], content:??exampleagain
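The key printed in brackets looks like the Base64 form of the raw key bytes. Here is a small Java sketch that decodes it by hand, assuming the key is Avro binary for a record with a single int field (id), in which case the bytes are just one zigzag-encoded varint. The class and method names are hypothetical.

```java
import java.util.Base64;

// Hypothetical helper for inspecting the printed keys by hand.
final class AvroIntKeySketch {

    // Decodes one Avro `int`: a little-endian base-128 varint holding a
    // zigzag-encoded value (at most 5 bytes for a 32-bit int).
    static int decodeAvroInt(byte[] bytes) {
        int raw = 0;
        int shift = 0;
        for (byte b : bytes) {
            raw |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (raw >>> 1) ^ -(raw & 1); // undo zigzag
            }
            shift += 7;
            if (shift > 28) {
                break; // more than 5 bytes cannot be a 32-bit int
            }
        }
        throw new IllegalArgumentException("truncated or oversized varint");
    }

    // Assumption: the bracketed key in the consumer output is Base64 text.
    static int decodeBase64Key(String base64Key) {
        return decodeAvroInt(Base64.getDecoder().decode(base64Key));
    }

    public static void main(String[] args) {
        System.out.println(decodeBase64Key("MA==")); // prints 24
        System.out.println(decodeBase64Key("Mg==")); // prints 25
    }
}
```

Decoding MA== this way gives 24, which agrees with key={id=24} in the auto_consume output. If the assumption holds, the raw bytes are valid Avro and the unreadable content in the second output is simply the Avro-encoded value printed without a schema.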
Do you have any clue about the possible root cause of this issue?
Thanks!