Masking and averaging very large netCDF data with shapefiles in a Dask array: how to improve performance

I am trying to work with 40 years of global-scale hourly precipitation data from ERA5-Land. I want to mask the data using river catchment areas, compute the catchment average (the mean within each catchment) for every time step, and save one netCDF file per catchment. The following code works, but the total processing time is very slow.

First I make lists of all the data.

#create lists of ERA5 and catchment boundary files
ERA5_files = [os.path.join(dirpath,f) for (dirpath, dirnames, filenames) in os.walk("path_to_all_files") for f in filenames] # this is roughly 500 files. 

catchment_files = [os.path.join(dp, f) for dp, dn, filenames in os.walk("path_to_shapefiles") for f in filenames if os.path.splitext(f)[1] == '.shp'] # This is roughly 5000 shapefiles 

Then I read and combine the shapefiles into one GeoDataFrame, gdf.

gdfs = []
for f in catchment_files:
    gdf = gpd.read_file(f)
    gdfs.append(gdf)

gdf = gpd.GeoDataFrame(pd.concat(gdfs))
# remove failures
gdf = gdf[gdf['gauge_id'].isin(passed)]
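
(For reference, a quick check like the following can confirm that all catchment shapefiles already share the ERA5 CRS before concatenating; the EPSG:4326 value here is an assumption that matches the CRS written onto the ERA5 data further down.)

# optional sanity check: every shapefile should already be in EPSG:4326
for g in gdfs:
    assert g.crs is not None and g.crs.to_epsg() == 4326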

Then I write functions for processing the data.

# write functions

# This is applied as a preprocessing step in open_mfdataset to speed up processing
# and to switch the coordinate format to match the shapefiles.
def intconv(nc_stack):
    # scale precipitation and store it as unsigned 16-bit integers to reduce memory use
    nc_stack['tp'] = (nc_stack['tp'] * 100).astype('uint16')

    # switch ERA5 longitudes from 0..360 to -180..180 and sort
    lon_switch = nc_stack.assign_coords(lon=(((nc_stack.lon + 180) % 360) - 180)).sortby('lon')
    lon_switch = lon_switch.rio.set_spatial_dims("lon", "lat", inplace=True)
    lon_switch = lon_switch.rio.write_crs("EPSG:4326")

    return lon_switch

# This performs the masking procedure for an individual shapefile and writes the result to a netCDF.
def nc_clip_return(nc_stack, gdf, a):
    shape = gdf[gdf.gauge_id == a]
    nc_stack = nc_stack.load()  # pull the stack into memory before clipping
    da_clip = nc_stack.rio.clip(shape.geometry)

    # catchment mean for every time step, then save to netCDF
    grid_mean = da_clip.mean(dim=["lon", "lat"], skipna=True)
    name = '/save_path/' + a + '_0200_era5.nc'
    grid_mean.to_netcdf(name, mode='w', format='NETCDF4_CLASSIC')

Then I open a Dask client and read in a sub-sample of the data. This takes about 10 seconds, and the resulting (lazy) dataset is 3529 GiB (the total for all netCDF files would be about 7000 GiB).

from dask.distributed import Client

client = Client()
client

start = time.time()
test = xr.open_mfdataset(ERA5_files[0:200], chunks={"time": 12}, parallel=True, preprocess=intconv)
#test = xr.decode_cf(test)
end = time.time()
print(end - start)
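
(For reference, the size and chunk layout of the lazy dataset can be inspected without triggering any computation, e.g.:)

# total size in GiB and the dask chunk layout of the precipitation variable
print(test.nbytes / 2**30)
print(test['tp'].chunks)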

Then I loop through the shapefiles, mask and average the data, and save each result as a netCDF file. My estimates suggest this will take months to run for all shapefiles. Any recommendations on how to speed up this process / optimize my code?

# loop through gauge IDs, average over each catchment, and save as netCDF
for a in gdf.gauge_id.unique():
    start = time.time()
    test = test.rio.set_spatial_dims("lon", "lat", inplace=True)
    nc_clip_return(a=a, nc_stack=test, gdf=gdf)

    end = time.time()
    print(end - start)
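
(One untested idea I would like feedback on: cropping to each catchment's bounding box with rioxarray's clip_box before the polygon clip, and skipping the full .load(), so that far less data is touched per catchment. The sketch below is my assumption of how that would look, not something I have benchmarked.)

# untested sketch: bounding-box crop first, then clip to the catchment polygon
def nc_clip_bbox(nc_stack, gdf, a):
    shape = gdf[gdf.gauge_id == a]
    minx, miny, maxx, maxy = shape.total_bounds
    subset = nc_stack.rio.clip_box(minx=minx, miny=miny, maxx=maxx, maxy=maxy)
    da_clip = subset.rio.clip(shape.geometry)
    grid_mean = da_clip.mean(dim=["lon", "lat"], skipna=True)
    grid_mean.to_netcdf('/save_path/' + a + '_0200_era5.nc', mode='w', format='NETCDF4_CLASSIC')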
