I've been using the confluent-kafka[avro] (2.1.1) library to send the result of an AWS Lambda to our Kafka with an Avro schema.
The serialisation of the schema is fine, except for the first key, "creationTime", which is invariably set to something like 1970-01-20T13:34:12.378Z when received by Kafka (as visualised in AKHQ).
The timestamp of the message is fine as long as I leave it set to the default; if I try to use the same timestamp as in the schema, the message is shown in AKHQ as sent 58 years ago.
I have the problem with the Kafka clusters in all our environments, and I can reproduce it on my local dev env.
I tried to debug the code, but I can't get any info after the serialisation; here's what I'm sure of:
Timestamp var content just before serialisation (float): 1690451888.45323
Time received on the AKHQ message: 1970-01-20T13:34:11.888Z
After conversion, this time gives 1686851 as a timestamp.
I initially thought it was somehow truncated before serialisation, but it doesn't look like it.
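For reference, interpreting the float seconds value above as if it were milliseconds reproduces the time shown in AKHQ (a quick check in plain Python):

from datetime import datetime, timezone

# 1690451888.45323 seconds, read as milliseconds, lands about 19 days after the epoch
print(datetime.fromtimestamp(1690451888.45323 / 1000, tz=timezone.utc))
# 1970-01-20 13:34:11.888453+00:00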
Here's how I get my timestamp in the values:
timestamp = datetime.now(tz=tz.gettz(self.config.timezone)).timestamp()
values = {
    "creationTime": timestamp,
    "description": message,
    "eventTypeId": self.config.metric_name,
    "pgd": [],
    "contracts": [],
    "points": [],
    "objects": [],
}
My Kafka code:
"""This module contains everything necessary to send messages to kafka"""
import logging
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from param_store_models import KafkaInfo
LOGGER = logging.getLogger(__name__)
class KafkaProducer:
"""Class used to send messages to kafka"""
def __init__(self, schema: str, kafka_info: KafkaInfo):
producer_ssm_conf = {
"bootstrap.servers": kafka_info.bootstrap_servers,
"security.protocol": kafka_info.security_protocol,
"sasl.mechanism": kafka_info.sasl_mecanism,
"sasl.username": kafka_info.sasl_username,
"sasl.password": kafka_info.sasl_password,
}
registry_ssm_conf = {"url": kafka_info.schema_registry_url}
serializer = AvroSerializer(
SchemaRegistryClient(registry_ssm_conf), schema, conf={"auto.register.schemas": False}
)
producer_default_conf = {
"value.serializer": serializer,
"key.serializer": StringSerializer(),
"enable.idempotence": "true",
"max.in.flight.requests.per.connection": 1,
"retries": 5,
"acks": "all",
"retry.backoff.ms": 500,
"queue.buffering.max.ms": 500,
"error_cb": self.delivery_report,
}
self.__serializing_producer = SerializingProducer({**producer_default_conf, **producer_ssm_conf})
def produce(self, topic: str, key=None, value=None, timestamp=0):
"""Asynchronously produce message to a topic"""
LOGGER.info(f"Produce message {value} to topic {topic}")
self.__serializing_producer.produce(topic, key, value, on_delivery=self.delivery_report, timestamp=timestamp)
def flush(self):
"""
Flush messages and trigger callbacks
:return: Number of messages still in queue.
"""
LOGGER.debug("Flushing messages to kafka")
return self.__serializing_producer.flush()
@staticmethod
def delivery_report(err, msg):
"""
Called once for each message produced to indicate delivery result.
Triggered by poll() or flush().
"""
if err:
LOGGER.error(f"Kafka message delivery failed: {err}")
else:
LOGGER.info(f"Kafka message delivered to {msg.topic()} [{msg.partition()}]")
My Avro schema (partially redacted to hide customer info):
{
  "type": "record",
  "name": "EventRecord",
  "namespace": "com.event",
  "doc": "Schema of a raw supervision event",
  "fields": [
    {
      "name": "creationTime",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "eventTypeId",
      "type": "string"
    },
    {
      "name": "internalProductId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "description",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "contracts",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "points",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "objects",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}
Received message on the topic as shown in AKHQ:
{
  "creationTime": "1970-01-20T13:34:12.378Z",
  "eventTypeId": "test",
  "description": "Test",
  "pgd": [],
  "contracts": [],
  "points": [],
  "objects": []
}
Versions
confluent-kafka[avro]==2.1.1
Kafka broker = latest
Operating system: Windows (Kafka stack on Docker) / AWS ECS (Docker containers as well)
Your timestamp variable is created like so:

timestamp = datetime.now(tz=tz.gettz(self.config.timezone)).timestamp()

The timestamp() function you call returns the number of seconds since epoch. However, you are using the timestamp-millis logical type, which expects milliseconds since epoch. So the easiest solution is just to multiply your timestamp by 1000.

Another option: most libraries support logical types, so you should be able to set creationTime to the datetime object itself and the library will do the conversion to figure out how many milliseconds it should be for serialization.
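For example, either fix could look like the sketch below (the "Europe/Paris" string and the variable names are placeholders standing in for self.config.timezone and the values dict from the question):

from datetime import datetime
from dateutil import tz

now = datetime.now(tz=tz.gettz("Europe/Paris"))  # placeholder for self.config.timezone

# Option 1: convert seconds to the milliseconds expected by timestamp-millis
creation_time_ms = int(now.timestamp() * 1000)

# Option 2: pass the datetime object itself; fastavro (used by confluent-kafka's
# AvroSerializer) handles the conversion for the timestamp-millis logical type
creation_time_dt = now

values = {"creationTime": creation_time_ms}  # or creation_time_dt

# Note: the timestamp= argument of produce() is also expressed in milliseconds
# since epoch, which is why reusing the seconds value makes AKHQ show the
# message as sent in 1970.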