I'm running a PySpark job on a Dataproc cluster using Cloud Composer as the orchestration tool.
The job instantiates a `MyClass` object whose `bucket` attribute is defined like so:
```python
from google.cloud import storage

from .constants import GCS_BUCKET_NAME


class MyClass:
    bucket = storage.Client().bucket(GCS_BUCKET_NAME)

    def load_data(self, filepath: str, contents: str) -> None:
        blob = self.bucket.blob(filepath)
        blob.upload_from_string(contents)
```
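For context, the job then calls `load_data` roughly like this (a simplified sketch; the real file path and contents differ):

```python
# Illustrative usage only; the file path and contents are placeholders.
loader = MyClass()
loader.load_data("outputs/result.json", '{"status": "ok"}')  # this is the call that fails when run from Composer
```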
`GCS_BUCKET_NAME` is defined in a `constants.py` file:

```python
import os

GCS_BUCKET_NAME = os.getenv("DATA_BUCKET_NAME")
```
The `DATA_BUCKET_NAME` environment variable is set on the cluster nodes by an initialization script:

```bash
#!/bin/bash
DATA_BUCKET_NAME=$(/usr/share/google/get_metadata_value attributes/DATA_BUCKET_NAME)
echo "DATA_BUCKET_NAME=${DATA_BUCKET_NAME}" >> /etc/environment
```
When I SSH into the Dataproc cluster nodes, the environment variable is correctly defined.
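By "correctly defined" I mean that a quick check along these lines (illustrative, not the exact commands I ran) shows the expected value:

```bash
# On a cluster node, after the initialization script has run:
grep DATA_BUCKET_NAME /etc/environment   # shows DATA_BUCKET_NAME=<my bucket name>
echo "$DATA_BUCKET_NAME"                 # prints the bucket name in a fresh login shell
```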
But when the DAG runs in Cloud Composer, the job fails when the `load_data` function is called, with the following error:

```
ValueError: Cannot determine path without bucket name
```
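For reference, the DAG submits the PySpark job with something along these lines (an illustrative sketch rather than my exact DAG; the project, region, cluster name, and file URI are placeholders):

```python
# Illustrative sketch of the Composer DAG submitting the PySpark job.
# All ids, names, and URIs below are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

PYSPARK_JOB = {
    "reference": {"project_id": "my-project"},
    "placement": {"cluster_name": "my-dataproc-cluster"},
    "pyspark_job": {"main_python_file_uri": "gs://my-code-bucket/jobs/main.py"},
}

with DAG(
    dag_id="load_data_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    submit_pyspark = DataprocSubmitJobOperator(
        task_id="submit_pyspark_job",
        job=PYSPARK_JOB,
        region="europe-west1",
        project_id="my-project",
    )
```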
What I've tried so far, but none of these seem to work:

- use the `storage.Client().get_bucket` method instead of `storage.Client().bucket`
- specify the project id: `storage.Client().get_bucket(GCS_BUCKET_NAME, PROJECT_ID)`
- grant more storage permissions to the Dataproc service account (see the sketch after this list)
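For the last point, the kind of change I made looks roughly like this (illustrative; the project id, service account email, and role are placeholders for what I actually granted):

```bash
# Illustrative: granting broader GCS access to the service account used by the Dataproc VMs.
# Project id and service account email are placeholders.
gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:my-dataproc-sa@my-project.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
```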
Thanks for your help!