I am working on a JUnit test in which a Spring KafkaTemplate sends a message to a single Kafka broker, and I want to simulate a network cut right when the broker sends its ACK back to the producer, just to force the producer to retry the delivery. I am trying Toxiproxy, but it seems it can only simulate network issues (latency, connection cuts) on the connection as a whole, with no regard to the type of signal going through, whereas my aim is to act only on the ACK and let every other signal pass. Does Toxiproxy provide a workaround to achieve this or, failing that, what other sniffing tools could help me?
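For context, I am relying on the producer's own retry mechanism to resend once the ACK goes missing; the relevant properties are registered next to the other dynamic properties in registerDynamicProperties() below, roughly like this (the concrete values are just what I am experimenting with, not meant to be definitive):

// registered inside registerDynamicProperties(), next to the bootstrap-servers entry
registry.add("spring.kafka.producer.acks", () -> "all");
registry.add("spring.kafka.producer.retries", () -> "5");
// short request timeout so the blocked in-flight request expires quickly and a resend is triggered
registry.add("spring.kafka.producer.properties.request.timeout.ms", () -> "2000");
registry.add("spring.kafka.producer.properties.delivery.timeout.ms", () -> "15000");
// idempotence must be off, otherwise the broker deduplicates the retry and no duplicate record shows up
registry.add("spring.kafka.producer.properties.enable.idempotence", () -> "false");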
Here is the code excerpt of my current attempt:
@Container
static ToxiproxyContainer toxiproxy =
        new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.7.0")
                .withNetwork(network);

@Container
static GenericContainer<?> zookeeper =
        new GenericContainer<>(DockerImageName.parse("zookeeper:3.8.0")) // "confluentinc/cp-zookeeper:latest"
                .withNetwork(network)
                .withNetworkAliases("zookeeper")
                .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));

@Container
public static KafkaContainer kafka =
        new ToxicKafkaWithExternalZookeeperContainer("confluentinc/cp-kafka:7.4.0", "broker-1", 9092, 29092)
                .withAdditionalListener(() -> String.format("%s:%s", toxiproxy.getHost(), toxiproxy.getMappedPort(8666)))
                .withNetwork(network)
                .withNetworkAliases("kafka")
                .dependsOn(toxiproxy, zookeeper)
                .withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
                .withEnv("KAFKA_BROKER_ID", "1")
                .withEnv("KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT_MS", "1000")
                .withEnv("KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS", "5000");
@DynamicPropertySource
static void registerDynamicProperties(DynamicPropertyRegistry registry) throws Exception {
    toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
    // the proxy listens on 8666 inside the Toxiproxy container and forwards to the broker's extra listener on the Docker network
    proxyKafka1 = toxiproxyClient.createProxy("kafka", "0.0.0.0:8666", "kafka:19092");
    // producer and consumer bootstrap through the proxy, so every toxic applies to their traffic
    registry.add("spring.kafka.bootstrap-servers",
            () -> "PLAINTEXT://%s:%d".formatted(toxiproxy.getHost(), toxiproxy.getMappedPort(8666)));
    registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
}
@Test
void whenSendingEventAndNotReceivingACK_RetryAndDuplicateMessageEntails() throws Exception {
    // Given
    var request = PublishEventRequestMother.createEventRequest();
    Consumer<String, Envelope<?>> consumer = consumerFactory.createConsumer();
    consumer.subscribe(Collections.singleton(topicName));

    // When: a bandwidth toxic with rate 0 blocks ALL broker->producer traffic, so the ACK never arrives
    proxyKafka1.toxics().bandwidth("CUT_CONNECTION_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0);
    // proxyKafka1.toxics().latency("CUT_CONNECTION_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0);
    kafkaTemplate.send(topicName, envelopeMapper.map(request, (Map<String, Object>) request.getResources()))
            .get(Long.parseLong(timeOutInSeconds), TimeUnit.SECONDS);
    // kafkaTemplate.send(topicName, envelopeMapper.map(request, (Map<String, Object>) request.getResources()));

    // Then: the producer retry should result in the same message being delivered twice
    ConsumerRecords<String, Envelope<?>> records = KafkaTestUtils.getRecords(consumer);
    assertEquals(2, records.count());
    records.forEach(singleRecord ->
            then(singleRecord.value()).extracting("payload").isEqualTo(request.getPayload()));
    proxyKafka1.toxics().get("CUT_CONNECTION_DOWNSTREAM").remove();
}
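For completeness, the collaborators referenced in the test (kafkaTemplate, consumerFactory, envelopeMapper, topicName, timeOutInSeconds) are plain injected beans and properties of the test class, roughly like this (simplified; the generics and property names are just how I happen to wire them and not relevant to the question):

// simplified field declarations of the test class
@Autowired
private KafkaTemplate<String, Envelope<?>> kafkaTemplate;
@Autowired
private ConsumerFactory<String, Envelope<?>> consumerFactory;
@Autowired
private EnvelopeMapper envelopeMapper;
@Value("${test.topic-name}")
private String topicName;
@Value("${test.send-timeout-seconds}")
private String timeOutInSeconds;
private static ToxiproxyClient toxiproxyClient;
private static Proxy proxyKafka1;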