Kafka is sending messages to only one partition of a topic. I am using a KeyedSerializationSchema in FlinkKafkaProducer09 and passing an attribute from the event stream to be used for hash-based partition selection (I want all events with a given attribute value to land in the same partition every time). When I post messages with 50 different attribute values, I see all of them going into the same partition. I was expecting some form of load balancing based on Kafka's key-based partition selection.
DataStream<JsonObject> myEvents = ....;
FlinkKafkaProducer09<JsonObject> myProducer = new FlinkKafkaProducer09<>(
        myTopic,
        new myImplementationOfKeyedSerializationSchema("attributeNameToUseForPartition"),
        kafkaproperties);
myEvents.addSink(myProducer).setParallelism(1).name("mySink");
....
class myImplementationOfKeyedSerializationSchema implements KeyedSerializationSchema<JsonObject> {

    // Field holding the name of the JSON attribute to serialize as the record key.
    private final String messageKey;

    public myImplementationOfKeyedSerializationSchema(String messageKey) {
        this.messageKey = messageKey;
    }

    @Override
    public byte[] serializeKey(JsonObject event) {
        return event.get(messageKey).toString().getBytes();
    }

    @Override
    public byte[] serializeValue(JsonObject event) {
        return event.toString().getBytes();
    }

    @Override
    public String getTargetTopic(JsonObject event) {
        return null;
    }
}
I am unable to figure out why key-based partition selection is not happening. Flink version: 1.1.4
I haven't dug into the code to see exactly how Flink configures the Kafka producer when you don't provide an explicit partitioner. But it's often problematic to depend on default Kafka behavior when using Flink, as Flink overrides a lot of it. If I remember correctly, the legacy producers default to a fixed partitioner that pins each sink subtask to a single Kafka partition and ignores the serialized key entirely, which would explain why a sink with parallelism 1 writes everything to one partition.
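If you're stuck on the legacy API, one workaround is to pass a custom partitioner into the FlinkKafkaProducer09 constructor and hash the serialized key yourself. A rough sketch, with the caveat that I'm going from memory on the 1.1.x KafkaPartitioner signature and the four-argument constructor, and KeyHashPartitioner is a name I made up:

import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

// Hypothetical partitioner: events with the same serialized key always
// go to the same partition, and distinct keys spread across partitions.
class KeyHashPartitioner<T> extends KafkaPartitioner<T> {
    @Override
    public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
        // Mask the sign bit so the modulo always yields a valid partition index.
        return (java.util.Arrays.hashCode(serializedKey) & Integer.MAX_VALUE) % numPartitions;
    }
}

FlinkKafkaProducer09<JsonObject> myProducer = new FlinkKafkaProducer09<>(
        myTopic,
        new myImplementationOfKeyedSerializationSchema("attributeNameToUseForPartition"),
        kafkaproperties,
        new KeyHashPartitioner<JsonObject>());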
At least for the newer KafkaSink, you can specify the partitioner used by the KafkaRecordSerializationSchema, which you can set via the KafkaRecordSerializationSchemaBuilder.setPartitioner() method.
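For example, here's a rough sketch against a recent Flink (1.14+ with flink-connector-kafka); the broker address is a placeholder, and as far as I can tell, if you set a key serialization schema but no partitioner, the partition is left unset and Kafka's own key-hash partitioning applies, which is the behavior you were originally after:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// The key Kafka hashes when no explicit partitioner is set.
SerializationSchema<JsonObject> keySchema =
        e -> e.get("attributeNameToUseForPartition").toString().getBytes(StandardCharsets.UTF_8);
SerializationSchema<JsonObject> valueSchema =
        e -> e.toString().getBytes(StandardCharsets.UTF_8);

KafkaSink<JsonObject> sink = KafkaSink.<JsonObject>builder()
        .setBootstrapServers("localhost:9092") // placeholder broker address
        .setRecordSerializer(KafkaRecordSerializationSchema.<JsonObject>builder()
                .setTopic(myTopic)
                .setKeySerializationSchema(keySchema)
                .setValueSerializationSchema(valueSchema)
                // Or take explicit control with a FlinkKafkaPartitioner:
                // .setPartitioner(myFlinkKafkaPartitioner)
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

myEvents.sinkTo(sink);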