Please note: I have added two answers below, but in my opinion they are not "the whole story". Other answers are invited.
My level of Kafka knowledge is such that I understand how to build stateless microservices which operate using the consumer-producer transactions API. However, I do not understand how to extend this to stateful services.
To keep this question manageable, let us assume the question is limited to classes of services which are singletons, meaning that only one copy of a service process can be run at any one time.
Kafka consumers can operate as part of a consumer group, where multiple processes, each with their own consumer, process events from 1 or more topics. Let's exclude this class of problems from the question to keep matters simple.
In order to make the question more easily comprehensible, it would make sense to define a toy example problem. So let's do that -
Toy Example Problem
Events need to be consumed from a topic "input" and sent to a topic "output".
The input topic contains two types of events, "A" and "B". Each event has an associated timestamp and value.
This is what an example event of type "A" might look like:
{
    "type": "A",
    "timestamp": "2024-02-09 21:00:00",
    "value": 1
}
Our objective is to pair up events of type "A" and "B" and calculate the sum of the value fields.
Events of type "A" and "B" arrive in randomized order, meaning that there is no pre-determined ordering as to whether an event of type "A" will arrive before or after the corresponding* event of type "B". (*) Corresponding = the event with the same timestamp.
Some events may be missing, meaning that for some timestamp values only one event, or possibly even none, might be seen.
Additionally, timestamps may be out of order by some small amount. (Maybe a few minutes maximum.)
This is a good example problem because events are read individually, meaning that "A" and "B" arrive separately. This requires the process to maintain some state in memory. The fact that events for some timestamps may be missing, and that timestamps are slightly out of order rather than strictly ordered, means that a potentially large amount of state must be maintained in memory.
We can prevent this state from growing without bound by imposing an additional constraint: events older than 5 minutes expire, meaning that if an event of type "A" arrives, we wait up to 5 minutes for the corresponding "B" to arrive before discarding "A".
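To make the in-memory state concrete, here is a minimal Java sketch of what such a pairing buffer might look like (the names `PairingState` and `PendingEvent` are my own, not part of any API, and timestamp parsing is omitted): unmatched events are keyed by timestamp, and anything still unmatched after 5 minutes is dropped.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory buffer for unmatched events, keyed by timestamp.
class PairingState {
    record PendingEvent(String type, Instant timestamp, long value) {}

    private static final Duration EXPIRY = Duration.ofMinutes(5);
    private final Map<Instant, PendingEvent> pending = new HashMap<>();

    // Returns the summed value if this event completes an A/B pair, otherwise buffers it.
    Optional<Long> onEvent(PendingEvent e) {
        PendingEvent other = pending.get(e.timestamp());
        if (other != null && !other.type().equals(e.type())) {
            pending.remove(e.timestamp());
            return Optional.of(other.value() + e.value());
        }
        pending.put(e.timestamp(), e);
        return Optional.empty();
    }

    // Drop anything whose timestamp is older than the expiry window.
    void expire(Instant now) {
        Iterator<Map.Entry<Instant, PendingEvent>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            if (Duration.between(it.next().getKey(), now).compareTo(EXPIRY) > 0) {
                it.remove();
            }
        }
    }
}
```

It is exactly this `pending` map that has to survive a process restart, which is what the rest of the question is about.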
This makes it harder to answer the question of how to write the commit logic.
I don't actually know how to do it, but can offer some initial suggestions.
Suggestion 1:
- A possible approach might be to maintain state in some other storage, such as on disk or in MongoDB
- It is difficult to guarantee exactly-once processing when interfacing with some other type of system; I am not sure it can be done in general
- It might be better to try to leverage Kafka itself to help solve the problem.
Suggestion 2:
We could add another topic "state" to store the state of our service. The typical consume-produce loop would look like this:
Start transaction
Consume events from the "input" topic
Update internal state in memory
Produce events to "output" topic
Produce an event to the "state" topic which is a copy of the remaining internal state in memory
Commit and end transaction
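To make that loop concrete, here is a rough Java sketch under a few assumptions (string-serialized values, a single service instance with a fixed `transactional.id`, and hypothetical helpers `applyEvent` and `serializeState` which are not part of any Kafka API). The important part is that the output records, the state snapshot and the consumed offsets are all committed in a single transaction:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.*;

public class PairingService {
    public static void main(String[] args) {
        Properties cprops = new Properties();
        cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cprops.put(ConsumerConfig.GROUP_ID_CONFIG, "pairing-service");
        cprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        cprops.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties pprops = new Properties();
        pprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pprops.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "pairing-service-tx"); // fixed id, since the service is a singleton
        pprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pprops)) {
            consumer.subscribe(List.of("input"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) continue;

                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> rec : records) {
                        // Hypothetical helper: update the in-memory state, return completed pairs.
                        for (String out : applyEvent(rec.value())) {
                            producer.send(new ProducerRecord<>("output", out));
                        }
                        offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                                    new OffsetAndMetadata(rec.offset() + 1));
                    }
                    // One snapshot of the remaining in-memory state per poll batch.
                    producer.send(new ProducerRecord<>("state", serializeState()));
                    // The consumed offsets are committed inside the same transaction.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // Simplification: fatal errors such as ProducerFencedException would
                    // actually require closing the producer rather than aborting.
                    producer.abortTransaction();
                    // The in-memory state would also have to be rolled back or rebuilt here.
                }
            }
        }
    }

    // Hypothetical placeholders, not part of any Kafka API.
    static List<String> applyEvent(String json) { return List.of(); }
    static String serializeState() { return "{}"; }
}
```

Processing one poll batch per transaction already gives some of the batching effect mentioned below, since the state snapshot is written once per batch rather than once per event.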
The problem with this approach is that a large amount of data might be sent to the "state" topic. The service might become very slow if large volumes of mostly repeated data have to be sent over the network.
- It could perhaps be made more efficient by consuming and processing (for example) 100 events at a time, although this would increase input-to-output latency.
- The transaction would then span 100 events, and the "state" topic would only contain about 1% of the data volume compared to processing events individually.
This architecture still has problems, because the startup logic requires reading from the "state" topic until the final "state" record is reached. This is essentially a "seek to the end of the topic" operation, and I am not sure it can be performed consistently. (It might be that the only way to do it is to query the topic for its greatest offset before starting to read messages, and then check the offsets of the returned messages.) This is not a particularly good design and does not fit nicely with how Kafka is intended to be used.
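For what it's worth, the consumer API does allow reading just the tail of a topic via `endOffsets` and `seek`. A rough sketch of that startup step, assuming a single-partition "state" topic, might look like this (the back-off of 10 records is an arbitrary fudge of mine, because with a transactional producer the commit markers also occupy offsets, so the record at `endOffset - 1` is not necessarily a data record):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.List;

class StateRecovery {
    // Reads only the tail of a single-partition "state" topic and returns the value
    // of the last committed record seen, or null if the topic is empty.
    static String recoverLatestState(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("state", 0);
        consumer.assign(List.of(tp));

        long endOffset = consumer.endOffsets(List.of(tp)).get(tp);
        if (endOffset == 0) {
            return null; // nothing has ever been written
        }

        // Back up a few records rather than exactly one (see note above about markers).
        consumer.seek(tp, Math.max(0, endOffset - 10));

        String latest = null;
        while (consumer.position(tp) < endOffset) {
            for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
                latest = rec.value(); // last one wins
            }
        }
        return latest;
    }
}
```

This works, but it still feels like fighting the tool, which is part of why the approach is unsatisfying.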
I am also not sure how commit logic should work when including the "state" topic. Perhaps a "commit" can be performed as soon as a new "state" is produced to this topic.
Alternatively, we could build a "following consumer" architecture, whereby the process has both a producer and a consumer for the state topic. The consumer exists for no other reason than to "read back" produced events and perform commit operations. This becomes a messy and complicated architecture which would be easy to break.
Summary
In summary, I am not sure how to, or if it is possible to, write a stateful service which performs exactly-once processing with Kafka and the transactions API, even in the restricted case where that service is a singleton. (Only a single copy of it running at once.)
I believe the second suggestion will work - but it is very "un-Kafkaesque". It does not fit nicely with the usual way services are constructed around Kafka. Typically services read from one or more topics, produce data and then commit a transaction.
With the inclusion of a "state" topic things become weird because:
- We continually send out new data which makes previous data on the topic obsolete. We only ever care about the latest record on the topic, and yet we maintain a whole stream of data, because that is what a Kafka topic is.
- It is not clear how to commit efficiently in a way which does not lead to slow startup times caused by fetching out of date state data from the state topic.
This is not really a solid answer but I didn't want to add yet more lines of text into the question.
I wonder whether I am thinking about Stateful Services the wrong way.
There does not seem to be an obvious, "elegant" way of using Kafka to persist the aggregated internal state of a process in general. Suggestion 2 from above is as close as I have managed to get so far.
I have been trying to think about Stateful Services as things which accumulate data in memory, while avoiding putting that data anywhere else, such as in a database, on disk, or in big blocks of aggregated data sent back to Kafka.
I am starting to think that perhaps the only sensible conclusion is that if a process maintains large amounts of state in memory, then it also needs to maintain that state in a resilient data store, in case the process stops running and the data in memory evaporates.
This shifted my thinking towards how to consume a single event from Kafka and update the persisted copy of the aggregated in-memory data, rather than trying to commit to Kafka in some clever way that avoids needing an on-disk/in-database copy of the process data.
So perhaps, rather than worrying about transactions, I should instead be thinking about how to store the consumed offsets alongside the persisted state, so that the service knows which events it has already applied.
This way, even if the process dies before committing, it has a way of de-duplicating the previously read data if required.
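To illustrate this direction, here is a rough sketch built around a hypothetical `StateStore` interface (which could be backed by MongoDB, RocksDB, a file, etc.), assuming for simplicity a single-partition "input" topic: the aggregated state and the last applied offset are saved in one atomic write, and on restart the consumer seeks back to just after that offset, which is what makes re-reads de-duplicable. It deliberately ignores the producer side.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.List;

// Hypothetical interface: any store that can write the aggregated state and the
// last applied offset in a single atomic operation.
interface StateStore {
    void saveAtomically(String stateSnapshot, long lastAppliedOffset);
    long lastAppliedOffset(); // -1 if nothing has been stored yet
    String stateSnapshot();
}

class ExternallyCheckpointedConsumer {
    void run(KafkaConsumer<String, String> consumer, StateStore store) {
        TopicPartition tp = new TopicPartition("input", 0);
        consumer.assign(List.of(tp));
        // Resume just after the last event whose effect is already in the store.
        consumer.seek(tp, store.lastAppliedOffset() + 1);

        String state = store.stateSnapshot();
        while (true) {
            for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
                state = applyEvent(state, rec.value());    // hypothetical pure update
                store.saveAtomically(state, rec.offset()); // state + offset in one write
            }
        }
    }

    // Hypothetical placeholder for the pairing/aggregation logic.
    static String applyEvent(String state, String event) { return state; }
}
```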
Then on the producer side things become more tricky. Perhaps the transaction has to extend to producing as well.
I hope this makes some sense, it's nearly midnight in my TZ.