I want to deploy a streaming Dataflow job that listens to a Pub/Sub topic.
The Pub/Sub message content looks like this:
{
    "file_path": "gs://my_bucket_name/my_file.csv",
    "transformations": [
        {
            "column_name": "NAME",
            "transformation": "to_upper"
        },
        {
            "column_name": "SURNAME",
            "transformation": "to_lower"
        }
    ]
}
My problem is that I would like to process the file specified in the message (file_path) and apply the given transformation to each column of the CSV file.
I have tried several ways to achieve this, but none of them worked, and I am wondering whether this is not possible at all or whether I am missing something.
- First try:
class ProcessMessage(beam.DoFn):

    def process(self, message):
        from apache_beam.pvalue import TaggedOutput
        try:
            file_path = message.get('file_path')
            yield TaggedOutput('file_path', file_path)
        except Exception as e:
            raise Exception(e)

with beam.Pipeline(options=pipeline_options) as p:
    file_path = (
        p
        | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=input_topic, timestamp_attribute='ts')
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process Message" >> beam.ParDo(ProcessMessage).with_outputs('file_path')
    )

    file_content = (
        p
        | "Read file" >> beam.io.ReadFromText(file_path)
    )
This fails with:
file_pattern must be of type string or ValueProvider; got <DoOutputsTuple main_tag=None tags=('file_path',) transform=<ParDo(PTransform) label=[ParDo(ProcessMessage)]> at 0x1441f9550> instead
- Second try -> read the file with a custom CSV reader and then return the content:
class ReadFile(beam.DoFn):

    def process(self, element):
        import csv
        import io as io_file
        from apache_beam import io

        file_path = element.get('file_path')

        reader = csv.DictReader(
            io_file.TextIOWrapper(
                io.filesystems.FileSystems.open(file_path),
                encoding='utf-8'),
            delimiter=';')
        for row in reader:
            yield row
with beam.Pipeline(options=pipeline_options) as p:
    message = (
        p
        | "Read from Pubsub" >> beam.io.ReadFromPubSub(
            topic=pipeline_config.get('input_topic'),
            timestamp_attribute='ts')
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process message" >> beam.ParDo(ProcessMessage())
    )

    file_content = (
        message
        | beam.ParDo(ReadFile())
        | beam.Map(print)
    )
This does not produce any error, but it does not print the file lines either.
I know this post is a bit on the long side, but I hope someone can help me.
Thanks!
The first solution does not work because ReadFromText takes a string as its argument, for example a bucket path like "gs://bucket/file". In your example you pass it a PCollection (the result of the previous PTransform), so it will not work. Instead, you should use ReadAllFromText, which takes a PCollection as input, i.e. the result of the previous PTransform. Your code would also need to be modified a bit:
- If a DoFn class returns only one type of output, there is no reason to use TaggedOutput, so just return a regular iterator.
- Next, ReadAllFromText should be connected to the previous step of the pipeline, not to the top.
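Put together, the corrected first attempt could look something like this (a sketch only; input_topic and pipeline_options come from your own setup):

import json
import apache_beam as beam

class ProcessMessage(beam.DoFn):

    def process(self, message):
        # Only one output type, so a plain iterator replaces TaggedOutput.
        yield message.get('file_path')

with beam.Pipeline(options=pipeline_options) as p:
    file_content = (
        p
        | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic=input_topic, timestamp_attribute='ts')
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process Message" >> beam.ParDo(ProcessMessage())
        # ReadAllFromText takes a PCollection of file patterns as input,
        # so it is chained to the previous step instead of to p.
        | "Read file" >> beam.io.ReadAllFromText()
    )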
Be aware that the file_content variable will then be a PCollection of elements where each element is a single row of your CSV file as a string. Because of that, applying transformations per column becomes more complex: the first element will be the column names, and the following elements will be plain rows with no column names attached. Your second try seems better suited for this:
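Extending your ReadFile class so it applies the transformations while reading could look like the sketch below. The to_upper/to_lower handling is my assumption based on your message format:

class ReadFile(beam.DoFn):

    def process(self, element):
        import csv
        import io as io_file
        from apache_beam import io

        file_path = element.get('file_path')
        # Map each column name to its requested transformation.
        transformations = {
            t['column_name']: t['transformation']
            for t in element.get('transformations', [])
        }

        reader = csv.DictReader(
            io_file.TextIOWrapper(
                io.filesystems.FileSystems.open(file_path),
                encoding='utf-8'),
            delimiter=';')

        for row in reader:
            for column, transformation in transformations.items():
                if transformation == 'to_upper':
                    row[column] = row[column].upper()
                elif transformation == 'to_lower':
                    row[column] = row[column].lower()
            yield row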
Something like this could work, but it is probably a better idea to split it into two classes - one for reading, another for applying the transformations :)
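For example (again only a sketch; the class and step names are made up):

class ReadCsvRows(beam.DoFn):

    def process(self, element):
        import csv
        import io as io_file
        from apache_beam import io

        reader = csv.DictReader(
            io_file.TextIOWrapper(
                io.filesystems.FileSystems.open(element.get('file_path')),
                encoding='utf-8'),
            delimiter=';')
        for row in reader:
            # Pass the transformations along with each row so the next
            # DoFn knows what to apply.
            yield row, element.get('transformations', [])

class ApplyTransformations(beam.DoFn):

    def process(self, element):
        row, transformations = element
        for t in transformations:
            column = t['column_name']
            if t['transformation'] == 'to_upper':
                row[column] = row[column].upper()
            elif t['transformation'] == 'to_lower':
                row[column] = row[column].lower()
        yield row

file_content = (
    message
    | "Read rows" >> beam.ParDo(ReadCsvRows())
    | "Apply transformations" >> beam.ParDo(ApplyTransformations())
    | beam.Map(print)
)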