Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Subscription does not exist

390 Views Asked by At

I'm facing below exception when i try to subscribe to gcloud pubsub emulator from springboot application.

com.google.api.gax.rpc.NotFoundException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Subscription does not exist (resource=pullSubscriber)
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90) ~[gax-2.32.0.jar:2.32.0]
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41) ~[gax-2.32.0.jar:2.32.0]
        at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1.onFailure(StreamingSubscriberConnection.java:339) ~[google-cloud-pubsub-1.124.1.jar:1.124.1]
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84) ~[api-common-2.15.0.jar:2.15.0]
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1127) ~[guava-32.1.2-jre.jar:na]
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-32.1.2-jre.jar:na]
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1286) ~[guava-32.1.2-jre.jar:na]
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1055) ~[guava-32.1.2-jre.jar:na]
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:807) ~[guava-32.1.2-jre.jar:na]
        at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:92) ~[api-common-2.15.0.jar:2.15.0]
        at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:74) ~[api-common-2.15.0.jar:2.15.0]
        at com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:51) ~[api-common-2.15.0.jar:2.15.0]
        at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$StreamingPullResponseObserver.onError(StreamingSubscriberConnection.java:261) ~[google-cloud-pubsub-1.124.1.jar:1.124.1]
        at com.google.api.gax.tracing.TracedResponseObserver.onError(TracedResponseObserver.java:104) ~[gax-2.32.0.jar:2.32.0]
        at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:84) ~[gax-grpc-2.32.0.jar:2.32.0]
        at com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84) ~[gax-2.32.0.jar:2.32.0]
        at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:148) ~[gax-grpc-2.32.0.jar:2.32.0]
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) ~[grpc-core-1.56.1.jar:1.56.1]
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) ~[grpc-core-1.56.1.jar:1.56.1]
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) ~[grpc-core-1.56.1.jar:1.56.1]
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) ~[grpc-core-1.56.1.jar:1.56.1]
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.56.1.jar:1.56.1]
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[grpc-core-1.56.1.jar:1.56.1]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Below is my code where i'm trying to subscribe using subscriber name.


@SpringBootApplication
@EnableScheduling
@EnableCaching
public class CodenameHere {

    @Autowired
    PubSubTemplate pubSubTemplate;

    public static void main(String[] args) {
        SpringApplication.run(CodenameHere.class, args);
    }

    @Bean
    public MessageChannel inputMessageChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
            new PubSubInboundChannelAdapter(pubSubTemplate, "pullSubscriber");
        adapter.setOutputChannel(messageChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(String.class);
        return adapter;
    }

    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        System.out.println("Message arrived via an inbound channel adapter from pullSubscriber! Payload: " + payload);
        message.ack();
    }
}

Below is my application.properties

spring.cloud.gcp.core.enabled=true
spring.cloud.gcp.project-id=my-project

spring.cloud.gcp.pubsub.enabled=true
spring.cloud.gcp.pubsub.project-id=my-project
spring.cloud.gcp.pubsub.emulator-host=localhost:8085

I've started the emulator and created the topic and subscription and able to publish and subscribe using the same topic and subscriber name on python-pubsub sample (https://github.com/googleapis/python-pubsub).

I wonder what i'm missing?

0

There are 0 best solutions below