I want to ask if someone can tell me about, or even show me an example of, a Dataflow job template, preferably in Python, in which I can:
- Continuously read JSON data from a Pub/Sub topic
- Process this data / Enrich it with custom logic
- Load the data into an existing BigTable table
I've tried delving into the docs of all 3 products but I found myself in a rabbit hole of undocumented APIs.
I tried using Apache Beam in Python to at least get such a pipeline working, using, for example, this definition:
with beam.Pipeline(options=pipeline_options) as p:
    _ = (
        p
        | "Read from Pub/Sub"
        >> beam.io.ReadFromPubSub(
            subscription=pipeline_options.input_subscription
        )
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process message" >> beam.ParDo(ProcessMessage())
        | "Writing row object to BigTable"
        >> WriteToBigTable(
            project_id=pipeline_options.bigtable_project,
            instance_id=pipeline_options.bigtable_instance,
            table_id=pipeline_options.bigtable_table,
        )
    )
I am not sure the json.loads even works, and if it does, in what format the data reaches my "ProcessMessage" class, which I tried to make generic regardless of what keys come in, but it still fails with errors I can't understand:
class ProcessMessage(beam.DoFn):
    def process(self, message):
        from google.cloud.bigtable import row as row_
        import datetime

        bt_row = row_.DirectRow(row_key=message.get('id'))
        for k, v in message.items():
            bt_row.set_cell("default", k.encode(), str(v).encode(), datetime.datetime.now())
        yield bt_row
It's very unclear to me how to transform my JSON message, which might not be flat, streaming from Pub/Sub:
{
    "id": "12345",
    "somekey": "somevalue",
    "somekey2": ["some other value"]
}
into a Bigtable row where all the keys are dynamically turned into columns. I know Bigtable requires a unique row key, so I have an ID, but I have no idea how to specify it in the code.
Have you seen the Dataflow cookbook examples on GitHub?
- Reading from a Pub/Sub subscription with Python
- Writing to Bigtable with Python
Below is a sketch of an Apache Beam pipeline that reads from a Pub/Sub subscription and writes to Bigtable, using your input as an example:
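This is a minimal, untested sketch based on your snippet. The custom option names (input_subscription, bigtable_project, bigtable_instance, bigtable_table) and the "default" column family are taken from your code and assumed to already exist; adjust them to your setup.

import datetime
import json
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.bigtable import row as bt_row


class JsonToBigtableRow(beam.DoFn):
    """Turns a parsed JSON dict into a Bigtable DirectRow.

    Assumes every message has an 'id' field to use as the row key and
    writes every other key as a column in the 'default' column family.
    """

    def process(self, message):
        # Use the message id as the unique row key.
        direct_row = bt_row.DirectRow(row_key=str(message["id"]).encode())
        timestamp = datetime.datetime.utcnow()
        for key, value in message.items():
            # Non-flat values (lists, nested objects) are serialized back to
            # JSON text, since Bigtable cells only store bytes.
            if isinstance(value, (dict, list)):
                value = json.dumps(value)
            direct_row.set_cell(
                "default",            # column family (must already exist on the table)
                key.encode("utf-8"),  # column qualifier
                str(value).encode("utf-8"),
                timestamp,
            )
        yield direct_row


class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--input_subscription", required=True)
        parser.add_argument("--bigtable_project", required=True)
        parser.add_argument("--bigtable_instance", required=True)
        parser.add_argument("--bigtable_table", required=True)


def run():
    options = MyOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        _ = (
            p
            # ReadFromPubSub yields the raw message payload as bytes.
            | "Read from Pub/Sub"
            >> beam.io.ReadFromPubSub(subscription=options.input_subscription)
            # json.loads accepts bytes and produces a Python dict.
            | "Parse JSON" >> beam.Map(json.loads)
            | "Build Bigtable row" >> beam.ParDo(JsonToBigtableRow())
            | "Write to Bigtable"
            >> WriteToBigTable(
                project_id=options.bigtable_project,
                instance_id=options.bigtable_instance,
                table_id=options.bigtable_table,
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

You could run it on Dataflow with something along these lines (placeholders are yours to fill in):

python pipeline.py \
    --runner DataflowRunner \
    --project YOUR_PROJECT \
    --region YOUR_REGION \
    --temp_location gs://YOUR_BUCKET/temp \
    --input_subscription projects/YOUR_PROJECT/subscriptions/YOUR_SUB \
    --bigtable_project YOUR_PROJECT \
    --bigtable_instance YOUR_INSTANCE \
    --bigtable_table YOUR_TABLE

Note that WriteToBigTable expects a PCollection of DirectRow objects, which is why the DoFn yields a DirectRow rather than a dict; the row key answers your question about where the unique ID goes.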