How to stream data from Pub/Sub to Google Bigtable using Dataflow?


I want to ask if someone can tell me, or even show me an example, of a dataflow job template, preferably in Python, in which I can:

  1. Continuously read JSON data from a Pub/Sub topic
  2. Process this data / Enrich it with custom logic
  3. Load the data into an existing BigTable table

I've tried delving into the docs of all 3 products but I found myself in a rabbit hole of undocumented APIs.

I tried using Apache Beam in Python to at least get such a pipeline working, using, for example, this definition:

import json

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable

# pipeline_options is my custom PipelineOptions object exposing these fields
with beam.Pipeline(options=pipeline_options) as p:
    _ = (
        p
        | "Read from Pub/Sub"
        >> beam.io.ReadFromPubSub(
            subscription=pipeline_options.input_subscription
        )
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process message" >> beam.ParDo(ProcessMessage())
        | "Writing row object to BigTable"
        >> WriteToBigTable(
            project_id=pipeline_options.bigtable_project,
            instance_id=pipeline_options.bigtable_instance,
            table_id=pipeline_options.bigtable_table,
        )
    )

I am not even sure the json.loads step works, and if it does, I don't know in what format the element reaches my "ProcessMessage" class. I tried making the class generic regardless of which keys come in, but it still fails with errors I can't understand:

class ProcessMessage(beam.DoFn):

    def process(self, message):
        from google.cloud.bigtable import row as row_
        import datetime

        # Use the "id" field as the row key and write every key/value pair
        # as a cell in the "default" column family.
        bt_row = row_.DirectRow(row_key=message.get('id'))
        for k, v in message.items():
            bt_row.set_cell("default", k.encode(), str(v).encode(), datetime.datetime.now())
        yield bt_row
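
For reference, my understanding (an assumption based on the Beam docs, not something I have verified end to end) is that ReadFromPubSub without with_attributes=True emits the raw message payload as bytes, so after the "Parse JSON" step each element should simply be a Python dict:

import json

# Hypothetical payload, mirroring the example message below
payload = b'{"id": "12345", "somekey": "somevalue", "somekey2": ["some other value"]}'

element = json.loads(payload)  # json.loads accepts bytes in Python 3
print(type(element), element)  # <class 'dict'> {'id': '12345', ...}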

It's also unclear to me how I transform my JSON message (which might not be flat) coming from Pub/Sub:

{
   "id": "12345",
   "somekey": "somevalue",
   "somekey2": ["some other value"]
}

into a row in Bigtable, dynamically turning all of the keys into columns. I know Bigtable requires a unique row key, so I have the ID for that, but I have no idea how to specify it in the code.
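
Conceptually, this is roughly the mapping I'm hoping to express (just a sketch of my intent, not working pipeline code; the "cf" family name is made up, and I'm guessing nested values need to be serialized somehow, e.g. with json.dumps):

import json
from google.cloud.bigtable.row import DirectRow

def to_bigtable_row(message: dict) -> DirectRow:
    # Row key taken from the "id" field; every other key becomes a column.
    row = DirectRow(row_key=str(message["id"]).encode())
    for key, value in message.items():
        # Guess: non-flat values (lists, dicts) get serialized to JSON strings.
        cell_value = value if isinstance(value, str) else json.dumps(value)
        row.set_cell("cf", key.encode(), cell_value.encode())
    return row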

There is 1 answer below.

Rafael Adao (BEST ANSWER)

Have you seen the Dataflow cookbook examples on GitHub?

Below is code showing an Apache Beam pipeline that reads from a Pub/Sub subscription and writes to Bigtable, using your input as an example:

    import logging

    import apache_beam as beam
    from apache_beam.io import ReadFromPubSub
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.core import DoFn
    from google.cloud.bigtable.row import DirectRow
    from apache_beam.io.gcp.bigtableio import WriteToBigTable
    
    class ConvertToJson(beam.DoFn):
        def process(self, element):
            import json
            # Pub/Sub delivers the payload as bytes; json.loads turns it into a dict.
            yield json.loads(element)

    class MakeBigtableRow(DoFn):
        def process(self, element):
            # Use the "id" field as the row key; every key/value pair becomes a cell
            # in the "cf1" column family (which must already exist in the table).
            row = DirectRow(row_key=str(element['id']))
            for key, value in element.items():
                row.set_cell(
                    column_family_id='cf1',
                    column=key,
                    value=str(value)
                )
            yield row
    
    def run():
        class ReadPubSubOptions(PipelineOptions):
            @classmethod
            def _add_argparse_args(cls, parser):
                parser.add_argument(
                    "--subscription",
                    required=True,
                    help="PubSub subscription to read.",
                )
                parser.add_argument(
                    "--project_id",
                    required=True,
                    help="Project ID"
                )
                parser.add_argument(
                    "--instance_id",
                    required=True,
                    help="Cloud Bigtable instance ID"
                )
                parser.add_argument(
                    "--table_id",
                    required=True,
                    help="Cloud Bigtable table ID"
                )
        options = ReadPubSubOptions(streaming=True)
    
        with beam.Pipeline(options=options) as p:
            (
                p
                | "Read PubSub subscription"
                >> ReadFromPubSub(subscription=options.subscription)
                | "Convert to JSON" >> beam.ParDo(ConvertToJson())
                | 'Map to Bigtable Row' >> beam.ParDo(MakeBigtableRow())
                | "Write to BigTable" >> WriteToBigTable(
                    project_id=options.project_id,
                    instance_id=options.instance_id,
                    table_id=options.table_id
                )
                | beam.Map(logging.info)
            )
    
    if __name__ == "__main__":
        logging.getLogger().setLevel(logging.INFO)
        run()
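
One thing to watch out for: WriteToBigTable writes into an existing table, so the table and the cf1 column family have to be created beforehand. If you still need to create them, a minimal sketch using the google-cloud-bigtable admin client (the project, instance and table names below are placeholders) could look like this:

    from google.cloud import bigtable
    from google.cloud.bigtable import column_family

    # The admin client is required for table and column-family management.
    client = bigtable.Client(project="your-project-id", admin=True)
    instance = client.instance("your-instance-id")
    table = instance.table("your-table-id")

    if not table.exists():
        table.create()

    # Create the column family the pipeline writes to, keeping one version per cell.
    if "cf1" not in table.list_column_families():
        table.column_family("cf1", gc_rule=column_family.MaxVersionsGCRule(1)).create()

To run this on Dataflow rather than locally, pass the usual Dataflow options (--runner=DataflowRunner, --project, --region, --temp_location) alongside the --subscription, --project_id, --instance_id and --table_id arguments defined above.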

[Screenshot of the Bigtable UI showing the written row]