Process file from a Pub/Sub message in Dataflow streaming


I want to deploy a streaming Dataflow job that listens to a Pub/Sub topic.

The Pub/Sub message content looks like this:

{
   "file_path": "gs://my_bucket_name/my_file.csv",
   "transformations": [
      {
         "column_name": "NAME",
         "transformation": "to_upper"
      },
      {
         "column_name": "SURNAME",
         "transformation": "to_lower"
      }
   ]
}
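For reference, this is roughly how such a message can be published (a minimal sketch using the google-cloud-pubsub client; the project and topic names are placeholders):

import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

message = {
    "file_path": "gs://my_bucket_name/my_file.csv",
    "transformations": [
        {"column_name": "NAME", "transformation": "to_upper"},
    ],
}

# Pub/Sub payloads are bytes; the 'ts' attribute matches the
# timestamp_attribute used by ReadFromPubSub below.
publisher.publish(topic_path, json.dumps(message).encode("utf-8"), ts="2021-01-01T00:00:00Z")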

I would like to process the file specified in the message (file_path) and apply the given transformations to each column of the CSV file.

I have tried several ways to achieve this, but none of them worked, and I am wondering whether this is impossible or I am just missing something.

  1. First try:
class ProcessMessage(beam.DoFn):

    def process(self, message):
        from apache_beam.pvalue import TaggedOutput
        try:
            file_path = message.get('file_path')
            yield TaggedOutput('file_path', file_path)
        except Exception as e:
            raise Exception(e)

with beam.Pipeline(options=pipeline_options) as p:
    file_path = (
        p | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=input_topic,timestamp_attribute='ts')
          | "Parse JSON" >> beam.Map(json.loads)
          | "Process Message" >> beam.ParDo(ProcessMessage).with_outputs('file_path')
    )
    file_content = (
        p
        | "Read file" >> beam.io.ReadFromText(file_path)
    )

This fails with: file_pattern must be of type string or ValueProvider; got <DoOutputsTuple main_tag=None tags=('file_path',) transform=<ParDo(PTransform) label=[ParDo(ProcessMessage)]> at 0x1441f9550> instead

  2. Second try -> read the file with a custom CSV reader and then return the content:
class ReadFile(beam.DoFn):

    def process(self, element):
        import csv
        import io as io_file

        from apache_beam import io

        file_path = element.get('file_path')

        reader = csv.DictReader(io_file.TextIOWrapper(
            io.filesystems.FileSystems.open(file_path),
            encoding='utf-8'),
            delimiter=';')

        for row in reader:
            yield row

with beam.Pipeline(options=pipeline_options) as p:

    message = (
        p | "Read from Pubsub" >> beam.io.ReadFromPubSub(
            topic=pipeline_config.get('input_topic'),
            timestamp_attribute='ts')
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process message" >> beam.ParDo(ProcessMessage())
    )

    file_content = (
        message
        | beam.ParDo(ReadFile())
        | beam.Map(print)
    )

This does not produce any error, but it also does not print the file lines.

I know this post is a bit on the long side, but I hope someone can help me.

Thanks!


2 Answers

Accepted Answer (Pav3k)

The first solution does not work because ReadFromText takes a string argument, for example a bucket path such as "gs://bucket/file". In your example you pass it a PCollection (the result of the previous PTransform), so it fails. Instead, you should use ReadAllFromText, which takes a PCollection as its input, i.e. the output of the previous PTransform.
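To illustrate the difference, here is a minimal sketch (the bucket paths are placeholders):

import apache_beam as beam

with beam.Pipeline() as p:
    # ReadFromText: the file pattern is a plain string that must be known
    # when the pipeline is constructed.
    static = p | "Static read" >> beam.io.ReadFromText("gs://bucket/file.csv")

    # ReadAllFromText: the file patterns arrive as elements of a PCollection,
    # so an upstream transform can produce them at runtime.
    paths = p | "Create paths" >> beam.Create(["gs://bucket/a.csv", "gs://bucket/b.csv"])
    dynamic = paths | "Dynamic read" >> beam.io.ReadAllFromText()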

Also your code would need to be modified a bit:

If a DoFn class returns only one type of output, there is no reason to use TaggedOutput, so let's just yield the values directly.

class ProcessMessage(beam.DoFn):

    def process(self, message):
        # A single output type, so yield the value directly.
        yield message.get('file_path')

Next, ReadAllFromText should be connected to the previous step of the pipeline, not to p.

file_content = (
    p
    | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=p.options.topic, timestamp_attribute='ts')
    | "Parse JSON" >> beam.Map(json.loads)
    | "Process Message" >> beam.ParDo(ProcessMessage())
    | "Read file" >> beam.io.ReadAllFromText()
)

Be aware that the file_content variable will be a PCollection of elements, where each element is a single row of your CSV file as a string. This makes it harder to apply transformations per column, because the first element will be the header row containing the column names, while the following elements will be plain data rows with no column names attached.
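For illustration, a minimal sketch of how each raw line could be parsed (the parse_csv_line helper is hypothetical; you would still need to pair the data rows with the header row yourself):

import csv
import io


def parse_csv_line(line, delimiter=';'):
    # Parse one raw CSV line into a list of column values.
    return next(csv.reader(io.StringIO(line), delimiter=delimiter))

# parsed_rows = file_content | "Parse rows" >> beam.Map(parse_csv_line)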

Your second try seems to be better for this:

class ApplyTransforms(beam.DoFn):

    def process(self, element):
        import csv
        import io

        file_path = element.get('file_path')
        transformations = element.get('transformations')

        with beam.io.gcsio.GcsIO().open(file_path) as file:
            reader = csv.DictReader(io.TextIOWrapper(file, encoding="utf-8"), delimiter=';')
            for row in reader:
                for transform in transformations:
                    col_name = transform.get("column_name")
                    transformation = transform.get("transformation")
                    # apply the transformation to row[col_name] here
                yield row

Something like this could work, but it is probably a better idea to split it into two classes: one for reading and another for applying the transformations :)

Answer (Juanan)

Thanks to @Pav3k's answer I was able to solve the problem. My code is now decoupled and looks like this:

import argparse
import json
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class MyMessage(typing.NamedTuple):
    # Simple way to propagate all the needed information from the Pub/Sub message.
    file_path: str
    transformations: dict


class ProcessMessage(beam.DoFn):

    def process(self, message):
        """
        Example of the Pub/Sub message
        {
            "file_path": "gs://my-bucket/file_to_process.csv",
            "transformations": {
                "col_1": "to_upper",
                "col_2": "to_lower"
            }
        }
        """
        yield MyMessage(file_path=message.get('file_path'), 
                        transformations=message.get('transformations'))


class ReadFile(beam.DoFn):

    def process(self, element: MyMessage):
        import csv
        import io as io_file

        from apache_beam import io

        reader = csv.DictReader(io_file.TextIOWrapper(
            io.filesystems.FileSystems.open(element.file_path),
            encoding='utf-8'),
            delimiter=';')

        for row in reader:
            # Yield both the row to process and the transformations.
            yield (row, element.transformations)


class Transform(beam.DoFn):

    def to_upper(self, value):
        return value.upper()

    def to_lower(self, value):
        return value.lower()

    def process(self, element):
        """
        Now I know the transformations for each element, so the work can be parallelized.
        """
        row = element[0]
        transformations = element[1]
        transformed_row = {}
        for key in transformations:
            value = row[key]
            transformation = transformations[key]
            transformed_row[key] = getattr(self, transformation)(value)
        yield transformed_row


def main(argv):

    parser = argparse.ArgumentParser()
    parser.add_argument("--topic_name", required=True)
    app_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:

        message = (
            p | "Read from Pubsub" >> beam.io.ReadFromPubSub(
                topic=app_args.topic_name,
                timestamp_attribute='ts')
            | "Parse JSON" >> beam.Map(json.loads)
            | "Process message" >> beam.ParDo(ProcessMessage())
        )

        file_content = (
            message
            | beam.ParDo(ReadFile())
            | beam.ParDo(Transform())
            | beam.Map(print)
        )
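For completeness, a minimal entry point to launch the pipeline (the --streaming flag in the comment is my assumption; streaming Dataflow jobs require it, but it was not shown in the original code):

import sys

if __name__ == "__main__":
    # Launched, for example, as:
    #   python pipeline.py --topic_name=projects/<project>/topics/<topic> \
    #       --runner=DataflowRunner --streaming ...
    main(sys.argv)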