Pubsub emulator publishing multiple messages instead of one

I have a weird issue on the publisher side that I can't resolve. Let me give a brief background of what I am doing.

I have a service written in Go. For some operations this service creates a Kubernetes job, which spins up a Python pod to do the processing and publish back the results.

I log the future.result() returned by the publish() function of the pubsub_v1 library, and I see logs like this. To keep it simple I configured things to send only 6 commands.

Published msg: 4
Published msg: 7
Published msg: 10
Published msg: 13
Published msg: 16
Published msg: 19

As can be seen above, the msgIds of the published commands increase by 3 each time.

In the Go service, I added logs that print the msgId and publish time, and I am seeing these logs:

Consumed msg_id: 4, publish_time: 2023-08-10 13:29:02.365 +0000 UTC
Consumed msg_id: 5, publish_time: 2023-08-10 13:29:02.638 +0000 UTC
Consumed msg_id: 7, publish_time: 2023-08-10 13:29:03.167 +0000 UTC
Consumed msg_id: 8, publish_time: 2023-08-10 13:29:03.312 +0000 UTC
Consumed msg_id: 10, publish_time: 2023-08-10 13:29:03.946 +0000 UTC
Consumed msg_id: 11, publish_time: 2023-08-10 13:29:04.107 +0000 UTC
Consumed msg_id: 13, publish_time: 2023-08-10 13:29:04.674 +0000 UTC
Consumed msg_id: 14, publish_time: 2023-08-10 13:29:04.827 +0000 UTC
Consumed msg_id: 16, publish_time: 2023-08-10 13:29:05.408 +0000 UTC
Consumed msg_id: 17, publish_time: 2023-08-10 13:29:05.61 +0000 UTC
Consumed msg_id: 19, publish_time: 2023-08-10 13:29:06.145 +0000 UTC
Consumed msg_id: 20, publish_time: 2023-08-10 13:29:06.294 +0000 UTC

As can be seen here, even though we are publishing 6 commands we are receiving double that. The msgIds that don't match the Python side carry corrupted data (the proto fields are missing, as are some extra attributes we add in our publisher library). The ones that match contain the full data I need.

Please note that this is my development environment in Kubernetes. All of the pods run in the same namespace and use the Pub/Sub emulator instead of the real thing (if that makes any difference).
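For context, the clients point at the emulator through the standard PUBSUB_EMULATOR_HOST environment variable; the host and port below are just illustrative of my setup:

import os

# When this variable is set, the google-cloud-pubsub clients connect to the
# emulator over an insecure channel instead of the real Pub/Sub service.
os.environ.setdefault("PUBSUB_EMULATOR_HOST", "pubsub-emulator:8085")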

The Python code basically sends SOAP requests to a 3rd party to get results and publishes commands. The pagination details are passed from the Go side via env variables.

for page in self._config.pages:
    try:
        data = self._get_results(page)

        if not data:
            continue

        self._publish(data, page, success=True)

    except Exception as e:
        self._publish(success=False)

I am overriding our library's publish method to play with some retry properties. From my research I understand this can happen because the Pub/Sub library retries internally, and there are cases where both the retried and the initial request succeed, with only the msg_id of the retried message being returned.

I will paste the publishing part below, trimming unnecessary business logic to keep things simple.

Here is how the publisher client is initialized:

self.publisher: Client = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(enable_message_ordering=True),
    batch_settings=pubsub_v1.types.BatchSettings(
        max_messages=100,
        max_bytes=1000000,
        max_latency=0.01,
    ),
)

We were using the default retry options; I added this inside my overridden function.

from google.api_core import exceptions as core_exceptions, retry

# ... some logic ...

# tried editing this object, removing predicates, increasing the initial value, etc.
retry_options = retry.Retry(
    initial=100,  # default 0.1
    maximum=60.0,
    multiplier=1.45,
    predicate=retry.if_exception_type(
        core_exceptions.Aborted,
        core_exceptions.Cancelled,
        core_exceptions.DeadlineExceeded,
        core_exceptions.InternalServerError,
        core_exceptions.ResourceExhausted,
        core_exceptions.ServiceUnavailable,
        core_exceptions.Unknown,
    ),
    deadline=600.0,
)

publish_future = self.publisher.publish(topic_path, data, retry=retry_options, **attributes)
if callback is not None:
    publish_future.add_done_callback(callback)
    return publish_future
else:
    result = publish_future.result()
    self._logger.log_info(f"Published msg: {result}")

The code snippets are heavily simplified. I can assure you that none of the publish logic is retried at the application level.

I want to note that the Python code works on its own without failure: I wrote a dummy subscriber and ran everything locally against the same Pub/Sub emulator via docker-compose, and if 5 messages are expected, 5 are received. The duplication only happens when it runs in Kubernetes.
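For reference, the dummy subscriber was roughly this shape (the project and subscription names here are placeholders):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Just log what arrives so duplicates and corrupted payloads are easy to spot.
    print(f"Received msg_id={message.message_id}, {len(message.data)} bytes")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
    try:
        streaming_pull_future.result(timeout=60)
    except TimeoutError:
        streaming_pull_future.cancel()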

I am using Python 3.11.1.

Versions of the Google-related libraries:

grpcio                   1.51.1
grpcio-health-checking   1.51.1
grpcio-reflection        1.51.1
grpcio-status            1.51.1
grpcio-tools             1.51.1
google-cloud-pubsub      2.16.0
google-api-core          2.11.1
google-auth              2.22.0
grpc-google-iam-v1       0.12.6

What I tried

  • Tried with one message to make sure the for loop isn't the problem. The same issue happened.
  • Decreased the limit per page to receive less data, thinking message size might be the problem. The same issue happened even with a single item.
  • Played with the initial value in the retry options, increasing it in case the initial RPC response timed out and a retry happened.
  • Removed all of the exceptions from the retry predicate to effectively cancel retries and see if that fixes it. It didn't (see the sketch after this list).
  • Increased the CPU request from 100m to 200m for the Python pod (I don't think the issue is related to that, so I didn't increase it much further; I can play with that).
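For the "remove all of the exceptions" experiment, the retry object looked roughly like this (an empty if_exception_type() predicate matches nothing, so the client should never retry the publish RPC):

from google.api_core import retry

# With no exception types listed, the predicate never matches, so the
# publish RPC should not be retried client-side at all.
no_retry = retry.Retry(predicate=retry.if_exception_type())

# passed the same way as before:
# publish_future = self.publisher.publish(topic_path, data, retry=no_retry, **attributes)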

Here are some of the resources I checked.

Issues with multiple messages being published to same pub sub topic

It states that one of the most typical reasons is a DEADLINE_EXCEEDED error, which occurs when the client does not receive a response from the server quickly enough. This can result in duplicates, as both the initial request and the retried request can ultimately succeed, and you only get back the message ID from the second request.

That seems similar to the problem I am seeing.

Pub/Sub doc on troubleshooting the issue: https://cloud.google.com/pubsub/docs/troubleshooting#publish-deadline-exceeded

I am wondering if anyone has an idea and can point me in the right direction on how to resolve this. What else can I try to debug this issue?

UPDATE

I just tried with empty data and received 6 msgs on the Go side. This suggests the data size is the issue here, causing a delay in publishing. How can I mitigate this? Increase the initial timeout even more? Increase CPU?
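One thing I am considering, following the troubleshooting doc above, is raising the per-call publish timeout alongside the retry deadline, roughly like this inside the overridden publish method (the values are guesses, not recommendations):

from google.api_core import exceptions as core_exceptions, retry

retry_options = retry.Retry(
    initial=5.0,
    maximum=60.0,
    multiplier=1.45,
    predicate=retry.if_exception_type(
        core_exceptions.DeadlineExceeded,
        core_exceptions.ServiceUnavailable,
    ),
    deadline=600.0,
)

# publish() also accepts a per-RPC timeout in addition to the retry policy.
publish_future = self.publisher.publish(
    topic_path, data, retry=retry_options, timeout=600.0, **attributes
)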

There is 1 answer below.

Answer from Wray27:

This is less of a solution and more of a workaround. I've had a similar issue where I use the emulator to pass a single message between two services in some tests.

I believe the emulator isn't receiving an ack after delivering a message on a push subscription when a single message is sent. This is hard to verify, as the emulator's logs only print when a message is published to the topic.

The reason I think so is that after setting ack_deadline_seconds on the subscription to 1 second, the message from the emulator was instantly retried even though the request was successful.

The workaround is to create the subscription at the start of every test and delete it at the end, and to set ack_deadline_seconds high enough (the default should be enough) so that no retrying occurs. I believe this is acceptable, as the emulator is used for testing purposes only.

See an example pytest.fixture below:

import os

import pytest
from google.cloud.pubsub_v1 import SubscriberClient, types


@pytest.fixture(autouse=True)
def subscription():
    """
    Recreates a push subscription for every test
    due to an emulator bug that retries when sending a single message
    """
    subscriber = SubscriberClient()
    topic = "dummy-topic"
    project = os.environ["GCP_PROJECT_ID"]
    topic_name = subscriber.topic_path(project, topic)
    url = "http://my-service:8080"
    subscription_name = subscriber.subscription_path(project, "thiswillbedeleted")
    push_config = types.PushConfig(
        push_endpoint=url,
        # NOTE: this next line creates a subscription with "noWrapper": { "writeMetadata": true }
        # but the emulator does not support this feature yet.
        # no_wrapper=types.PushConfig.NoWrapper(write_metadata=True)
    )

    with subscriber:
        subscription = subscriber.create_subscription(
            {
                "name": subscription_name,
                "topic": topic_name,
                "push_config": push_config,
                # no acknowledgement is received by the emulator - this value has
                # been set arbitrarily high so that requests aren't retried
                # (10 seconds is the default)
                "ack_deadline_seconds": 10,
            }
        )
        yield subscription
        subscriber.delete_subscription(subscription=subscription_name)