Testing GCP PubSub with Testcontainer PubSub emulator

1.2k Views Asked by At

I'm trying to create an integration test with PubSub emulator based on the example from this GitHub repo which looks like

@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {
  private static final String PROJECT_ID = "test-project";

  @Container
  private static final PubSubEmulatorContainer pubsubEmulator =
      new PubSubEmulatorContainer(
          DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));

  @DynamicPropertySource
  static void emulatorProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
  }

  @BeforeAll
  static void setup() throws Exception {
    ManagedChannel channel =
        ManagedChannelBuilder.forTarget("dns:///" + pubsubEmulator.getEmulatorEndpoint())
            .usePlaintext()
            .build();
    TransportChannelProvider channelProvider =
        FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

    TopicAdminClient topicAdminClient =
        TopicAdminClient.create(
            TopicAdminSettings.newBuilder()
                .setCredentialsProvider(NoCredentialsProvider.create())
                .setTransportChannelProvider(channelProvider)
                .build());

    SubscriptionAdminClient subscriptionAdminClient =
        SubscriptionAdminClient.create(
            SubscriptionAdminSettings.newBuilder()
                .setTransportChannelProvider(channelProvider)
                .setCredentialsProvider(NoCredentialsProvider.create())
                .build());

    PubSubAdmin admin =
        new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);

    admin.createTopic("test-topic");
    admin.createSubscription("test-subscription", "test-topic");

    admin.close();
    channel.shutdown();
  }

  // By default, autoconfiguration will initialize application default credentials.
  // For testing purposes, don't use any credentials. Bootstrap w/ NoCredentialsProvider.
  @TestConfiguration
  static class PubSubEmulatorConfiguration {
    @Bean
    CredentialsProvider googleCredentials() {
      return NoCredentialsProvider.create();
    }
  }

  @Autowired PubSubSender sender;

  @Autowired PubSubSubscriberTemplate subscriberTemplate;
  @Autowired PubSubPublisherTemplate publisherTemplate;

  @Test
  void testSend() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = sender.send("hello!");

    List<AcknowledgeablePubsubMessage> msgs =
        await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

    assertEquals(1, msgs.size());
    assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
    assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

    for (AcknowledgeablePubsubMessage msg : msgs) {
      msg.ack();
    }
  }

  @Test
  void testWorker() throws ExecutionException, InterruptedException {
    ListenableFuture<String> future = publisherTemplate.publish("test-topic", "hi!");

    List<PubsubMessage> messages = Collections.synchronizedList(new LinkedList<>());
    PubSubWorker worker =
        new PubSubWorker(
            "test-subscription",
            subscriberTemplate,
            (msg) -> {
              messages.add(msg);
            });
    worker.start();

    await().until(() -> messages, not(empty()));
    assertEquals(1, messages.size());
    assertEquals(future.get(), messages.get(0).getMessageId());
    assertEquals("hi!", messages.get(0).getData().toStringUtf8());

    worker.stop();
  }

  @AfterEach
  void teardown() {
    // Drain any messages that are still in the subscription so that they don't interfere with
    // subsequent tests.
    await().until(() -> subscriberTemplate.pullAndAck("test-subscription", 1000, true), hasSize(0));
  }
}

all works fine for the above example but when I want to test my implementation as follows

    @Autowired
    private FunctionCatalog catalog;

    @Test
    void testSendB() throws ExecutionException, InterruptedException {
        Consumer<PubSubMessage> function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);
        var pubSubMessage = new PubSubMessage();
        pubSubMessage.setData(Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes()));
        function.accept(pubSubMessage);

        List<AcknowledgeablePubsubMessage> msgs =
                await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

        assertEquals(1, msgs.size());
        assertEquals(future.get(), msgs.get(0).getPubsubMessage().getMessageId());
        assertEquals("hello!", msgs.get(0).getPubsubMessage().getData().toStringUtf8());

        for (AcknowledgeablePubsubMessage msg : msgs) {
            msg.ack();
        }
    }

it will throw error:

java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).

where my service implementation uses Publisher instead of PubSubPublisherTemplate from the example:

    private final Publisher publisher;

    public void publishMessage(String message) {
        var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
        var pubsubApiMessage = getPubsubApiMessage(byteStr);

        try {
            publish(pubsubApiMessage);
        } catch (Exception e) {
            log.error("Error during event publishing: " + e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    private void publish(PubsubMessage pubsubApiMessage) throws Exception {
        publisher.publish(pubsubApiMessage).get();
    }

    private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
        return PubsubMessage.newBuilder()
                            .setData(byteStr)
                            .build();
    }

and works fine when deployed to GCP but not in this case of integration test using PubSub emulator.

1

There are 1 best solutions below

0
JackTheKnife On BEST ANSWER

It came up that the PubSub emulator requires its own test publisher which can be created as a bean in configuration. Example:

@Configuration
public class PubSubConfig {
   @Value("${gcp.pubsub.topic.name}")
   private String topicName;

   @Value("${gcp.project.id}")
   private String projected;
   
   @Value("${spring.cloud.gcp.pubsub.emulator-host}")
   private String host;
   
   private static final CredentialsProvider CREDENTIALS_PROVIDER = NoCredentialsProvider.create();

   @Bean
   public SubscriberStub testSubscriber(
         FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
      return GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder()
                                                             .setTransportChannelProvider(fixedTransportChannelProvider)
                                                             .setCredentialsProvider(CREDENTIALS_PROVIDER)
                                                             .build());
   }

   @Primary
   @Bean
   public Publisher testPublisher(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
      return Publisher.newBuilder(ProjectTopicName.of(projectId, topicName))
                      .setChannelProvider(fixedTransportChannelProvider)
                      .setCredentialsProvider(NoCredentialsProvider.create())
                      .build();
   }

   @Bean
   public TopicAdminClient getTopicAdminClient(
         FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
      return TopicAdminClient.create(TopicAdminSettings.newBuilder()
                                                       .setTransportChannelProvider(fixedTransportChannelProvider)
                                                       .setCredentialsProvider(CREDENTIALS_PROVIDER)
                                                       .build());
   }

   @Primary
   @Bean
   public FixedTransportChannelProvider getChannelProvider() {
      var channel = ManagedChannelBuilder.forTarget(host)
                                         .usePlaintext()
                                         .build();
      return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
   }

   @Bean
   public SubscriptionAdminClient createSubscriptionAdmin(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException {
      return SubscriptionAdminClient.create(SubscriptionAdminSettings.newBuilder()
                                                                     .setCredentialsProvider(
                                                                           NoCredentialsProvider.create())
                                                                     .setTransportChannelProvider(
                                                                           fixedTransportChannelProvider)
                                                                     .build());
   }

}