How do I efficiently apply a function that uses a fairly large NetworkX graph to each row of a Dask series?


I am currently facing this problem:

I have a pandas (or Dask) DataFrame in which one column holds integers representing node indices in a given NetworkX graph. The graph itself is moderately large (~100 MB; it is an OSMnx street network). For each of these nodes, I need to run a local Dijkstra search with nx.dijkstra_predecessor_and_distance up to a given distance from the source node, then compare the visited nodes with the indices in another list and return the nodes that were reached.
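
To make the per-node step concrete: a Dijkstra search with a cutoff only keeps nodes whose shortest-path length from the source is at most the cutoff, so the visited set is exactly the "walkable" neighbourhood, and the stops reached are its intersection with the stop nodes. The stdlib-only sketch below illustrates that idea on a hypothetical adjacency dict (node -> list of (neighbour, length) pairs). It is not NetworkX's implementation, and the names bounded_dijkstra and reachable_stops are made up for illustration.

```python
import heapq
from typing import Dict, Hashable, Iterable, List, Set, Tuple

# Hypothetical adjacency format: node -> list of (neighbour, edge length in metres)
Graph = Dict[Hashable, List[Tuple[Hashable, float]]]


def bounded_dijkstra(graph: Graph, source: Hashable, cutoff: float) -> Dict[Hashable, float]:
    """Shortest distance from source to every node reachable within cutoff (inclusive)."""
    dist: Dict[Hashable, float] = {source: 0.0}
    heap: List[Tuple[float, int, Hashable]] = [(0.0, 0, source)]
    counter = 1  # tie-breaker so heap entries never compare the nodes themselves
    while heap:
        d, _, node = heapq.heappop(heap)
        if d > dist.get(node, float("inf")):
            continue  # stale entry: a shorter path to this node was already found
        for nbr, length in graph.get(node, []):
            nd = d + length
            if nd > cutoff:
                continue  # prune: beyond walking distance
            if nd < dist.get(nbr, float("inf")):
                dist[nbr] = nd
                heapq.heappush(heap, (nd, counter, nbr))
                counter += 1
    return dist


def reachable_stops(graph: Graph, source: Hashable, stop_nodes: Iterable[Hashable], cutoff: float) -> Set[Hashable]:
    """Intersect the nodes visited by the bounded search with the stop nodes."""
    visited = bounded_dijkstra(graph, source, cutoff)
    return set(visited) & set(stop_nodes)


if __name__ == "__main__":
    toy = {
        1: [(2, 400.0), (3, 1200.0)],
        2: [(1, 400.0), (4, 900.0)],
        3: [(1, 1200.0)],
        4: [(2, 900.0)],
    }
    print(reachable_stops(toy, source=1, stop_nodes=[2, 4], cutoff=1000))  # prints {2}
```

With a 1000 m cutoff, node 4 (1300 m away via node 2) is pruned, so only stop node 2 is reported.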

First, I defined a function that takes the integer value of each row:

import networkx as nx
import pandas as pd

def get_all_walkable_stopID(node_index, walk_network, stops: pd.DataFrame, max_walk=1500):
    """
    Given an OSMnx walk network, the index of one node and a pd.DataFrame of stops with coordinates
    (and a 'nearest_osmnx_node' column), return all stops within walking distance.
    """
    # pred is unused; distance maps every node reached within max_walk to its path length
    pred, distance = nx.dijkstra_predecessor_and_distance(walk_network, node_index, cutoff=max_walk, weight='length')
    # the inner join keeps only stops whose nearest node was reached; result is a df with a single column "distance"
    result = stops.join(pd.DataFrame.from_dict(distance, orient='index', columns=['distance']), on='nearest_osmnx_node', how='inner')[['distance']]

    return result.index.values.tolist()  # IDs of the stops within walking distance
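
A possibly lighter variant of get_all_walkable_stopID, assuming stops is indexed by stop ID and has the nearest_osmnx_node column used in the join: build a node-to-stops lookup once, and call nx.single_source_dijkstra_path_length, which returns only the distance dict (the unused predecessor dict is never built). This avoids creating a DataFrame and doing a join on every call. The helper names build_node_to_stops and walkable_stop_ids are hypothetical, and whether this is noticeably faster on the real network has not been measured.

```python
from collections import defaultdict
from typing import Dict, Hashable, List

import networkx as nx
import pandas as pd


def build_node_to_stops(stops: pd.DataFrame) -> Dict[Hashable, List[Hashable]]:
    """Hypothetical helper: map each OSM node ID to the stop IDs (stops.index) snapped to it."""
    lookup: Dict[Hashable, List[Hashable]] = defaultdict(list)
    for stop_id, node in stops["nearest_osmnx_node"].items():
        lookup[node].append(stop_id)
    return dict(lookup)


def walkable_stop_ids(
    node_index: Hashable,
    walk_network: nx.Graph,
    node_to_stops: Dict[Hashable, List[Hashable]],
    max_walk: float = 1500,
) -> List[Hashable]:
    """Hypothetical variant: stop IDs whose nearest node is within max_walk of node_index."""
    # {node: shortest 'length'-weighted distance} for every node with distance <= max_walk
    distance = nx.single_source_dijkstra_path_length(
        walk_network, node_index, cutoff=max_walk, weight="length"
    )
    # Plain dict lookups instead of building a DataFrame and joining on every call
    return [stop for node in distance for stop in node_to_stops.get(node, [])]
```

The lookup would be built once, e.g. node_to_stops = build_node_to_stops(stops), and then passed to every call in place of the stops DataFrame.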

As you can see, I pass walk_network to the function as an argument. This works well in plain pandas, but when I try

walkers_dask.start_nearest_osmnx_node.apply(
    get_all_walkable_stopID,
    args=(walk_network,stops),
    meta=('x','object')
).compute()

I get a long, cryptic CancelledError (as usual with Dask) saying that deserialization failed:

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa3 in position 0: invalid start byte
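
Because the distributed scheduler has to serialize task arguments on the client and deserialize them on the workers, one quick local check is whether (walk_network, stops) survive a pickle round trip at all, and how large the payload is. The helper below is a hypothetical sketch using only the standard pickle module; Dask itself may use cloudpickle or its own serializers, so a local success does not rule out a problem on the worker side.

```python
import pickle
from typing import Any


def pickle_roundtrip_size(obj: Any) -> int:
    """Hypothetical helper: pickle and unpickle obj locally, return the payload size in bytes.

    Raises whatever pickle raises if obj cannot be serialized or rebuilt.
    """
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    pickle.loads(payload)  # fails here if the object cannot be restored
    return len(payload)
```

For example, print(pickle_roundtrip_size((walk_network, stops))) would show roughly how many bytes get shipped with each task that carries these arguments.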

I get this error even if I change the function to return a simple 'test' string (and adjust the meta accordingly); but if I apply a lambda function with no extra arguments

walkers_dask.start_nearest_osmnx_node.apply(
    lambda x : 'test',
    meta=('x','string')
).compute()

the computation succeeds and returns a Series of 'test' as expected. So something must be going wrong with the argument passing, but I can't think of a better way to distribute this computation with Dask, and I have no idea how to fix it.

Any help is appreciated!
