Setting:
We receive gzipped CSV files into HDFS on an hourly basis (roughly 1k+ files per 24h). The files are organized in a folder structure /data/<year>/<month>/<day>/<hour>/<unique_id>.csv.gz.
Our ETL process (a Spark application) runs once per day. During that pipeline, we are
- reading all files for the day (step 1),
- applying some transformations (step 2),
- repartitioning the whole dataset by hour and writing the results back to HDFS, yielding 24 Avro files per day (step 3); see the sketch below.
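A simplified sketch of the daily job. Column names (e.g. event_ts), the example day, and the output path are placeholders, and writing Avro assumes the spark-avro package is on the classpath:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.hour

val spark = SparkSession.builder().appName("daily-etl").getOrCreate()
import spark.implicits._

val day = "2021/06/01" // example day, normally passed in as a job parameter

// Step 1: read all gzipped CSV files for the day (one task per file).
val raw = spark.read
  .option("header", "true")
  .csv(s"/data/$day/*/*.csv.gz")

// Step 2: transformations (heavily simplified); derive the hour from a
// timestamp column, assumed here to be called "event_ts".
val transformed = raw.withColumn("hour", hour($"event_ts"))

// Step 3: repartition by hour and write one Avro file per hour.
transformed
  .repartition(24, $"hour")
  .write
  .format("avro")
  .partitionBy("hour")
  .save(s"/output/$day")
```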
Observation:
When monitoring the Spark job, I can see a lot of shuffle activity that involves transferring a huge amount of data over the network, especially between steps 2 and 3. In steps 1 and 2, a single task is created per file and scheduled on the executor node that gives the best locality level (PROCESS_LOCAL). In step 3, we have one task per hour (24 tasks in total), each writing a single Avro file back to HDFS.
The reason for these large shuffles is that the input CSV files for a given hour are physically located on multiple different cluster nodes in HDFS. After the read/transform steps, all records for that hour need to be sent to the single executor that runs the write task for the hour in step 3.
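A small sketch of how one could verify this spread, using the Hadoop FileSystem API (the hour folder below is just an example path):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Print which datanodes hold the blocks of each CSV file in one hour folder.
val fs = FileSystem.get(new Configuration())
val hourDir = new Path("/data/2021/06/01/13") // example hour folder

fs.listStatus(hourDir)
  .filter(_.getPath.getName.endsWith(".csv.gz"))
  .foreach { status =>
    val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
    val hosts  = blocks.flatMap(_.getHosts).distinct.sorted
    println(s"${status.getPath.getName} -> ${hosts.mkString(", ")}")
  }
```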
Idea for optimization:
The idea is to somehow physically co-locate all raw CSV files/blocks for the same hour on the same node. This would not get rid of the shuffle itself, but it would turn most shuffle reads/writes into local operations on the executor nodes and minimize network traffic. It is also worth mentioning that network bandwidth is a very limited resource in our cluster, so the Spark application currently spends most of its time shuffling data around.
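To make the idea concrete, the kind of thing we have in mind on the upload side is sketched below as a hypothetical re-upload of one hour's file with a placement hint. It assumes the DistributedFileSystem.create overload with a favored-nodes argument can be used this way and that the hint is actually honored; we have not verified either in practice, and the hostname, port, and paths are placeholders:

```scala
import java.net.InetSocketAddress
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.io.IOUtils

val conf = new Configuration()
val fs   = FileSystem.get(conf).asInstanceOf[DistributedFileSystem]

// Hypothetical target: keep all files of one hour on the same datanode.
// "datanode-07" and port 9866 (default Hadoop 3 data transfer port) are assumptions.
val favored = Array(new InetSocketAddress("datanode-07", 9866))

val src = new Path("/staging/some_unique_id.csv.gz")            // placeholder source
val dst = new Path("/data/2021/06/01/13/some_unique_id.csv.gz") // placeholder target

val out = fs.create(
  dst,
  FsPermission.getFileDefault,
  true,                                     // overwrite
  conf.getInt("io.file.buffer.size", 4096),
  fs.getDefaultReplication(dst),
  fs.getDefaultBlockSize(dst),
  null,                                     // no Progressable
  favored                                   // placement hint, best effort only
)

val in = fs.open(src)
IOUtils.copyBytes(in, out, conf, true)      // copies the bytes and closes both streams
```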
Questions:
Is there any possibility to influence/control the physical location of a file in HDFS during the upload process, or perhaps via a separate script that runs on a cron schedule?
Are there any other options to optimize/streamline this process?