Using MirrorMaker2 on MSK Connect (2.7.1) the source connector is throwing the following error:
[Worker-0d8c5a576b5ef6e99] [2023-12-22 16:01:00,771] ERROR [msk-dev-conv-mm2-sourceconnector|task-1|offsets] WorkerSourceTask{id=msk-dev-conv-mm2-sourceconnector-1} Failed to flush, timed out while waiting for producer to flush outstanding 3 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:509)
[Worker-0d8c5a576b5ef6e99] [2023-12-22 16:01:00,771] ERROR [msk-dev-conv-mm2-sourceconnector|task-1|offsets] WorkerSourceTask{id=msk-dev-conv-mm2-sourceconnector-1} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)
Cluster settings:
allow.everyone.if.no.acl.found = false
auto.create.topics.enable = true
delete.topic.enable = true
log.cleaner.delete.retention.ms = 86400000
log.cleanup.policy = compact
log.retention.hours = -1
message.max.bytes = 5242940
min.insync.replicas = 2
unclean.leader.election.enable = false
Mirror Maker sourceconnector settings:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
errors.log.include.messages=false
replication.factor=3
source.cluster.ssl.truststore.location=${s3import:ca-central-1:msk-manual-msk-configurations/test.truststore.jks}
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsDebugCreds=true;
sync.topic.acls.enabled=false
tasks.max=16
source.cluster.alias=
sync.topic.configs.interval.seconds=20
target.cluster.security.protocol=SASL_SSL
replication.policy.separator=
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
refresh.groups.interval.seconds=20
refresh.topics.interval.seconds=20
offset-syncs.topic.replication.factor=3
ssl.protocol=TLS
consumer.group.id=mm2-dev
target.cluster.sasl.mechanism=AWS_MSK_IAM
topics=axis.*|ausmle-test
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
producer.enable.idempotence=true
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='123' password='123';
source.cluster.bootstrap.servers=10.0.0.1:19093,10.0.0.1:19094,10.0.0.1:19095
source.cluster.sasl.mechanism=SCRAM-SHA-512
target.cluster.alias=
target.cluster.bootstrap.servers=b-1.msk.123.c3.kafka.ca-central-1.amazonaws.com:9098,b-2.msk.123.c3.kafka.ca-central-1.amazonaws.com:9098,b-3.msk.123.c3.kafka.ca-central-1.amazonaws.com:9098
source.cluster.ssl.truststore.password=123
sync.topic.configs.enabled=true
source.cluster.security.protocol=SASL_SSL
source.cluster.ssl.endpoint.identification.algorithm=
The issue was due to attempting to mirror topics without keys when the target cluster had the cluster setting
log.cleanup.policy="compact"Despite setting
sync.topic.configs.enabled=truein the sourceConnector config, the source topic's cleanup policy is not synced to the target topic. So the target topic will be created with the cluster default cleanup policy of compact and when MirrorMaker attempts to flush the messages without keys it fails. This is probably related to https://issues.apache.org/jira/browse/KAFKA-9459Attempting to push a message without keys to a compacted topic using the commandline client yields the following error: