I'm a first-time naive Beam user and have some questions. First I'll state the problem I'm trying to solve:
I have a BigQuery table with ~35 million rows of minutely timeseries data across ~100k people. My goal is to create a TensorFlow Dataset (TFDS) where each example is a 2D tensor of timeseries data, 30 minutes long by ~50 features wide. The windows overlap, advancing one minute at a time: for example, a user with data from 12:00-12:38 would yield 10 examples, each with 30 minutes of data, the first window spanning 12:00-12:29 and the last 12:09-12:38.
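To make the windowing concrete, here's a minimal sketch of the per-user sliding-window logic I mean, using a plain list of per-minute rows to stand in for the dataframe (the function name is just illustrative, not from my actual code):

```python
def sliding_windows(rows, window_len=30, stride=1):
    """Yield overlapping windows of `window_len` consecutive per-minute rows."""
    for start in range(0, len(rows) - window_len + 1, stride):
        yield rows[start:start + window_len]

# A user with data from 12:00 through 12:38 has 39 minutes of rows,
# which gives 39 - 30 + 1 = 10 overlapping 30-minute windows.
rows = list(range(39))
windows = list(sliding_windows(rows))
print(len(windows))  # 10
```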
I already have a TFDS DatasetBuilder that works serially, which I've modified to use the built-in TFDS Beam support. I did this by creating a list of Pandas dataframes (stored in combined_inputs below), one per user, passing that into beam.Create, and then applying a processing function (which does the windowing across an entire person's dataframe, plus imputation and formatting) via beam.FlatMap, like so:
pcollection = beam.Create(combined_inputs)
transform = beam.FlatMap(lambda data: self._generate_examples(data[0], data[1], data[2]))
My problem is that I receive a MemoryError when I try to run this:
write(payload)
MemoryError
My questions:
- I think I'm using Beam in a non-optimal way by passing in a list of Dataframes, correct?
- Would it be more efficient to serialize that list of DataFrames to a GCS bucket (via pickle or parquet), then pass a list of filenames into beam.Create? If yes, is there a built-in way to have Beam deserialize those dataframes and pass them into my FlatMap function? Or should I deserialize the dataframes within my FlatMap function?
- Or would it be more efficient to pass a list of userIDs to beam.Create and then query BigQuery per user within my FlatMap function and do all of the processing there?
- Or do something completely different that I'm not thinking of? I did read a little bit about the Beam BigQuery I/O connector, but that operates on a single row, and I want to operate on windows, so I think this isn't the right solution?
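For my second question, here's roughly the deserialize-inside-FlatMap pattern I have in mind, with Beam itself omitted: this is just the per-element function I'd hand to beam.FlatMap, using a local pickle file to stand in for a GCS object (all names are hypothetical):

```python
import os
import pickle
import tempfile

def generate_examples_from_file(path):
    """The body I'd pass to beam.FlatMap: one filename in, examples out."""
    with open(path, "rb") as f:
        user_data = pickle.load(f)  # one user's timeseries, deserialized here
    # ...windowing, imputation, and formatting would go here;
    # for this sketch, just yield the rows back out.
    for row in user_data:
        yield row

# Write one user's data to a temp file, as I'd write each user to GCS.
with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f:
    pickle.dump([1, 2, 3], f)
    path = f.name

examples = list(generate_examples_from_file(path))
print(examples)  # [1, 2, 3]
os.remove(path)
```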
Thanks so much!