I am using Apache Beam to load data into Datastore. I am using the imports below, but I cannot find a way to specify the database ID of the Datastore database. I am able to do this with the Python google-cloud-datastore SDK, but with Apache Beam I am not able to specify the database ID.
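For reference, here is a minimal sketch of how I can pass the database ID with the google-cloud-datastore client (the project ID, database ID, kind name, and sample property are just placeholders):

from google.cloud import datastore

# The standalone Datastore client accepts a database argument,
# which lets me write to a non-default database.
client = datastore.Client(project='<project>', database='<database>')

# Build an entity under a placeholder kind and write it.
key = client.key('Kind_name')
entity = datastore.Entity(key=key)
entity.update({'name': 'example'})
client.put(entity)

With Apache Beam, however, I only see a project (and namespace) option: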
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore
from apache_beam.io.gcp.datastore.v1new.types import Entity, Key
import random

# Define your pipeline options
pipeline_options = beam.options.pipeline_options.PipelineOptions(
    project='<>',  # Specify your project ID
    region='<>'
)
# Properties to be excluded from indexing
key_tuple = ('name',)
class PrintElement(beam.DoFn):
    def process(self, element):
        print(element)
        yield element

class CreateEntity(beam.DoFn):
    def process(self, element):
        # python sdk offers database param
        # client = datastore.Client(project='<>',database='<>')
        key = Key(path_elements=['Kind_name', random.randint(1, 1000000)], project='<project>')
        entity = Entity(key=key, exclude_from_indexes=key_tuple)
        entity.set_properties(element)
        yield entity
# Create a pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
    # Read data from BigQuery
    data_from_bq = (pipeline
                    | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
                        query='select * from table',
                        use_standard_sql=True,
                        gcs_location="gs://<bucket name>/<folder name>/")
                    )

    entities = (data_from_bq
                | 'Prepare Data for Datastore' >> beam.ParDo(CreateEntity())
                )

    entities | "Print elements" >> beam.ParDo(PrintElement())

    # Write to Datastore
    entities | 'Write to Datastore' >> WriteToDatastore(
        project='gcp-project'
    )
I am able to read from BigQuery and print the elements; I am not facing any errors and the pipeline completes successfully.

1.) How can I specify the database ID when writing to Datastore?