I am using the Python Pub/Sub Lite client (async version) to subscribe to a Pub/Sub Lite subscription. I intermittently get the error below:

    Traceback (most recent call last): 
File \"/usr/local/lib/python3.10/site-packages/grpc/_plugin_wrapping.py\", line 89, in __call__
 self._metadata_plugin( 
File \"/usr/local/lib/python3.10/site-packages/google/auth/transport/grpc.py\", line 101, in __call__
 callback(self._get_authorization_headers(context), None) 
File \"/usr/local/lib/python3.10/site-packages/google/auth/transport/grpc.py\", line 87, in _get_authorization_headers
 self._credentials.before_request( 
File \"/usr/local/lib/python3.10/site-packages/google/auth/credentials.py\", line 134, in before_request
 self.apply(headers) 
File \"/usr/local/lib/python3.10/site-packages/google/auth/credentials.py\", line 110, in apply
 _helpers.from_bytes(token or self.token) 
File \"/usr/local/lib/python3.10/site-packages/google/auth/_helpers.py\", line 130, in from_bytes 
raise ValueError(\"{0!r} could not be converted to unicode\".format(value))
ValueError: None could not be converted to unicode" 

I don't use the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to specify credentials; instead, I pass them explicitly as shown below (I don't want to write the credentials to a file on the AWS host):

import asyncio

from google.cloud.pubsublite.cloudpubsub import AsyncSubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
)
from google.oauth2 import service_account


class AsyncTimedIterable:
    """Wrap an async iterable so that each item fetch is bounded by a timeout.

    Every ``async for`` over an instance starts a fresh iterator over the
    wrapped iterable (via its ``__aiter__``).

    Args:
        iterable: Any object implementing ``__aiter__``.
        poll_timeout: Maximum time, in seconds, to wait for each item. The
            value is passed through ``int()`` before use, so fractional
            seconds are truncated.

    Raises:
        asyncio.TimeoutError: Propagated from ``asyncio.wait_for`` while
            iterating, when the next item does not arrive in time.
    """

    def __init__(self, iterable, poll_timeout=90):
        class AsyncTimedIterator:
            """Async iterator over ``iterable`` applying ``poll_timeout`` per item."""

            def __init__(self):
                self._iterator = iterable.__aiter__()

            def __aiter__(self):
                # An async iterator must itself be async-iterable, so that it
                # can be handed directly to ``async for``.
                return self

            async def __anext__(self):
                # A timeout surfaces as asyncio.TimeoutError straight from
                # wait_for; no catch-and-re-raise is needed.
                result = await asyncio.wait_for(
                    self._iterator.__anext__(), int(poll_timeout)
                )
                # NOTE: any falsy item (not only None) ends the iteration.
                # Kept as-is for backward compatibility.
                if not result:
                    raise StopAsyncIteration
                return result

        self._factory = AsyncTimedIterator

    def __aiter__(self):
        """Return a new timed iterator over the wrapped iterable."""
        return self._factory()


# Module-level placeholders read by async_receive_from_subscription().
# TODO add project info below
# Zone of the subscription; "region" and "zone" are placeholder values.
location = CloudZone(CloudRegion("region"), "zone")

# Path of the subscription to read from; the project number and subscription id
# are placeholder values.
subscription_path = SubscriptionPath("project_number", location, "subscription_id")

# TODO add service account details
# Service-account info passed to
# service_account.Credentials.from_service_account_info(); kept in memory
# instead of being read from a credentials file.
gcp_creds = {}


async def async_receive_from_subscription(per_partition_count=100):
    """Yield messages from ``subscription_path`` as they arrive.

    Credentials are built in memory from the module-level ``gcp_creds``
    mapping and handed to the subscriber client explicitly.

    Args:
        per_partition_count: Value used for ``messages_outstanding`` in the
            per-partition flow control settings. Must be >0.

    Yields:
        Each message produced by the subscriber's message iterator.

    Raises:
        asyncio.TimeoutError: If no message arrives within the 90-second
            poll timeout applied by ``AsyncTimedIterable``.
    """
    # Flow control pauses the message stream once either limit is reached for
    # a single-partition subscriber, whichever happens first.
    flow_control = FlowControlSettings(
        messages_outstanding=per_partition_count,
        # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
        bytes_outstanding=10 * 1024 * 1024,
    )
    credentials = service_account.Credentials.from_service_account_info(gcp_creds)

    async with AsyncSubscriberClient(credentials=credentials) as subscriber:
        stream = await subscriber.subscribe(
            subscription_path,
            per_partition_flow_control_settings=flow_control,
        )
        # Bound the wait for each message to 90 seconds.
        async for message in AsyncTimedIterable(stream, 90):
            yield message


async def main():
    """Print the ``data`` of every message received from the subscription."""
    received = async_receive_from_subscription(per_partition_count=100_000)
    async for msg in received:
        print(msg.data)


# Script entry point: run main() on a new asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())


When I went through the files in the stack trace, I saw the following code comment in `grpc/_plugin_wrapping.py`:

# The plugin may be invoked on a thread created by Core, which will not
# have the context propagated. This context is stored and installed in
# the thread invoking the plugin.

Is it because the credentials I set are not being sent to another thread when it is created?

0

There are 0 best solutions below