I'm trying to create an integration test with PubSub emulator based on the example from this GitHub repo which looks like
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {
private static final String PROJECT_ID = "test-project";
@Container
private static final PubSubEmulatorContainer pubsubEmulator =
new PubSubEmulatorContainer(
DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));
@DynamicPropertySource
static void emulatorProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
}
@BeforeAll
static void setup() throws Exception {
ManagedChannel channel =
ManagedChannelBuilder.forTarget("dns:///" + pubsubEmulator.getEmulatorEndpoint())
.usePlaintext()
.build();
TransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
TopicAdminClient topicAdminClient =
TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(channelProvider)
.build());
SubscriptionAdminClient subscriptionAdminClient =
SubscriptionAdminClient.create(
SubscriptionAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build());
PubSubAdmin admin =
new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);
admin.createTopic("test-topic");
admin.createSubscription("test-subscription", "test-topic");
admin.close();
channel.shutdown();
}
// By default, autoconfiguration will initialize application default credentials.
// For testing purposes, don't use any credentials. Bootstrap w/ NoCredentialsProvider.
@TestConfiguration
static class PubSubEmulatorConfiguration {
@Bean
CredentialsProvider googleCredentials() {
return NoCredentialsProvider.create();
}
}
@Autowired PubSubSender sender;
@Autowired PubSubSubscriberTemplate subscriberTemplate;
@Autowired PubSubPublisherTemplate publisherTemplate;
@Test
void testSend() throws ExecutionException, InterruptedException {
ListenableFuture<String> future = sender.send("hello!");
List<AcknowledgeablePubsubMessage> msgs =
await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));
assertEquals(1, msgs.size());
assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());
for (AcknowledgeablePubsubMessage msg : msgs) {
msg.ack();
}
}
@Test
void testWorker() throws ExecutionException, InterruptedException {
ListenableFuture<String> future = publisherTemplate.publish("test-topic", "hi!");
List<PubsubMessage> messages = Collections.synchronizedList(new LinkedList<>());
PubSubWorker worker =
new PubSubWorker(
"test-subscription",
subscriberTemplate,
(msg) -> {
messages.add(msg);
});
worker.start();
await().until(() -> messages, not(empty()));
assertEquals(1, messages.size());
assertEquals(future.get(), messages.get(0).getMessageId());
assertEquals("hi!", messages.get(0).getData().toStringUtf8());
worker.stop();
}
@AfterEach
void teardown() {
// Drain any messages that are still in the subscription so that they don't interfere with
// subsequent tests.
await().until(() -> subscriberTemplate.pullAndAck("test-subscription", 1000, true), hasSize(0));
}
}
all works fine for the above example but when I want to test my implementation as follows
@Autowired
private FunctionCatalog catalog;
@Test
void testSendB() throws ExecutionException, InterruptedException {
Consumer<PubSubMessage> function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);
var pubSubMessage = new PubSubMessage();
pubSubMessage.setData(Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes()));
function.accept(pubSubMessage);
List<AcknowledgeablePubsubMessage> msgs =
await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));
assertEquals(1, msgs.size());
assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());
for (AcknowledgeablePubsubMessage msg : msgs) {
msg.ack();
}
}
it will throw error:
java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).
where my service implementation uses Publisher instead of PubSubPublisherTemplate from the example:
private final Publisher publisher;
public void publishMessage(String message) {
var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
var pubsubApiMessage = getPubsubApiMessage(byteStr);
try {
publish(pubsubApiMessage);
} catch (Exception e) {
log.error("Error during event publishing: " + e.getMessage(), e);
throw new RuntimeException(e);
}
}
private void publish(PubsubMessage pubsubApiMessage) throws Exception {
publisher.publish(pubsubApiMessage).get();
}
private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
return PubsubMessage.newBuilder()
.setData(byteStr)
.build();
}
and works fine when deployed to GCP but not in this case of integration test using PubSub emulator.
It came up that the PubSub emulator requires its own test publisher which can be created as a bean in configuration. Example: