Timestamp of current time changes to 1970-01-20 after serialisation by SerializingProducer

I've been using the confluent-kafka[avro] (2.1.1) library to send the result of an AWS Lambda to our Kafka cluster with an Avro schema.

Serialisation against the schema works fine, except for the first field, "creationTime", which invariably ends up as something like 1970-01-20T13:34:12.378Z when received by Kafka (as visualised in AKHQ). The timestamp of the message itself is fine as long as I leave it at the default; if I try to use the same timestamp as in the schema, AKHQ shows the message as sent 58 years ago.

I have the problem in all our Kafka environments and I can reproduce it in my local dev environment.

I tried to debug the code, but I can't get any information after the serialisation. Here's what I'm sure of:

Content of the timestamp variable just before serialisation (float): 1690451888.45323
Time shown on the AKHQ message: 1970-01-20T13:34:11.888Z

Converted back, that AKHQ time gives 1686851 as a timestamp. I initially thought it was somehow truncated before serialisation, but it doesn't look like it.
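
For reference, this is roughly the comparison, as a minimal sketch using the two literal values from above:

from datetime import datetime, timezone

# What the variable held just before serialisation (seconds since the epoch)
sent = 1690451888.45323
print(datetime.fromtimestamp(sent, tz=timezone.utc))  # the time I expected to see

# What AKHQ displays, parsed back to an epoch value for comparison
received = datetime(1970, 1, 20, 13, 34, 11, 888000, tzinfo=timezone.utc)
print(received.timestamp())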

Here's how I build the timestamp that goes into the values:

from datetime import datetime
from dateutil import tz

timestamp = datetime.now(tz=tz.gettz(self.config.timezone)).timestamp()
values = {
    "creationTime": timestamp,
    "description": message,
    "eventTypeId": self.config.metric_name,
    "pgd": [],
    "contracts": [],
    "points": [],
    "objects": [],
}

My Kafka code:

"""This module contains everything necessary to send messages to kafka"""
import logging

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

from param_store_models import KafkaInfo

LOGGER = logging.getLogger(__name__)


class KafkaProducer:
    """Class used to send messages to kafka"""

    def __init__(self, schema: str, kafka_info: KafkaInfo):
        producer_ssm_conf = {
            "bootstrap.servers": kafka_info.bootstrap_servers,
            "security.protocol": kafka_info.security_protocol,
            "sasl.mechanism": kafka_info.sasl_mecanism,
            "sasl.username": kafka_info.sasl_username,
            "sasl.password": kafka_info.sasl_password,
        }
        registry_ssm_conf = {"url": kafka_info.schema_registry_url}

        serializer = AvroSerializer(
            SchemaRegistryClient(registry_ssm_conf), schema, conf={"auto.register.schemas": False}
        )

        producer_default_conf = {
            "value.serializer": serializer,
            "key.serializer": StringSerializer(),
            "enable.idempotence": "true",
            "max.in.flight.requests.per.connection": 1,
            "retries": 5,
            "acks": "all",
            "retry.backoff.ms": 500,
            "queue.buffering.max.ms": 500,
            "error_cb": self.delivery_report,
        }

        self.__serializing_producer = SerializingProducer({**producer_default_conf, **producer_ssm_conf})

    def produce(self, topic: str, key=None, value=None, timestamp=0):
        """Asynchronously produce message to a topic"""
        LOGGER.info(f"Produce message {value} to topic {topic}")
        self.__serializing_producer.produce(topic, key, value, on_delivery=self.delivery_report, timestamp=timestamp)

    def flush(self):
        """
        Flush messages and trigger callbacks
        :return: Number of messages still in queue.
        """
        LOGGER.debug("Flushing messages to kafka")
        return self.__serializing_producer.flush()

    @staticmethod
    def delivery_report(err, msg):
        """
        Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush().
        """
        if err:
            LOGGER.error(f"Kafka message delivery failed: {err}")
        else:
            LOGGER.info(f"Kafka message delivered to {msg.topic()} [{msg.partition()}]")

My Avro schema (partially redacted to hide customer info):

{
  "type": "record",
  "name": "EventRecord",
  "namespace": "com.event",
  "doc": "Schéma d'un évènement de supervision brut",
  "fields": [
    {
      "name": "creationTime",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "eventTypeId",
      "type": "string"
    },
    {
      "name": "internalProductId",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "description",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "contracts",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "points",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "objects",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

Received message on the topic (as shown in AKHQ):

{
  "creationTime": "1970-01-20T13:34:12.378Z",
  "eventTypeId": "test",
  "description": "Test",
  "pgd": [],
  "contracts": [],
  "points": [],
  "objects": []
}

Versions

confluent-kafka[avro]==2.1.1

Kafka broker: latest

Operating system: Windows (stock Kafka on Docker) / AWS ECS (Docker containers as well)

Best answer, by Scott:

Your timestamp variable is created like so:

timestamp = datetime.now(tz=tz.gettz(self.config.timezone)).timestamp()

The timestamp() function you call returns the number of seconds since the epoch. However, you are using the timestamp-millis logical type, which expects milliseconds since the epoch.
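
You can see the effect directly: reading a seconds value back as if it were milliseconds lands in January 1970 (a quick sketch using the value from the question):

from datetime import datetime, timezone

ts_seconds = 1690451888.45323  # what datetime.now(...).timestamp() returns

# timestamp-millis treats the stored long as milliseconds, so decoding this
# value as milliseconds puts it back near the start of the epoch:
print(datetime.fromtimestamp(ts_seconds / 1000, tz=timezone.utc))
# 1970-01-20 13:34:11.888453+00:00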

So the easiest solution is just to multiply your timestamp by 1000.
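
A sketch of that fix applied to your values dict ("Europe/Paris" stands in for self.config.timezone; the int() cast keeps the field a valid Avro long):

from datetime import datetime
from dateutil import tz

# seconds since the epoch, as in the question ("Europe/Paris" is a placeholder timezone)
timestamp = datetime.now(tz=tz.gettz("Europe/Paris")).timestamp()

values = {
    "creationTime": int(timestamp * 1000),  # milliseconds, as timestamp-millis expects
    # ... other fields as in the question
}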

Another option: most libraries support logical types, so you should be able to set creationTime to the datetime object itself and let the library figure out how many milliseconds it should be during serialization.
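
confluent-kafka's AvroSerializer uses fastavro under the hood, which handles the timestamp-millis logical type, so passing the datetime itself should also work (a sketch, same placeholder timezone):

from datetime import datetime
from dateutil import tz

values = {
    # fastavro converts a timezone-aware datetime to milliseconds for a
    # timestamp-millis field during serialization
    "creationTime": datetime.now(tz=tz.gettz("Europe/Paris")),  # placeholder timezone
    # ... other fields as in the question
}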