I am working with a large dataset (point speed measurements from GPS-equipped taxis) containing about 50,000,000 rows, together with the OSM road network (for which I also created the opposite direction of each directional link by reversing its geometry). I initially performed an sjoin_nearest to assign each point to the OSM link it belongs to.
However, sjoin_nearest sometimes returns more than one link for a point (either the same link in its two directions and/or different links with different osm ids that all satisfy the defined distance criterion). I therefore tried to define a refinement function based on angular differences: it compares the point orientation with the orientation of each candidate link and assigns each point to the correct link (final_osm_id and final_direction), based on the minimum angular difference.
The relevant code (for a sub-sample of the first 10,000 rows of my dataset) is provided below:
# Function to calculate angular difference for a given point and link orientation
def calculate_angular_difference(Point_Orientation, Link_Orientation):
    """Return the smallest angle, in degrees, between two orientations.

    Works element-wise, so the arguments may be scalars, NumPy arrays or
    pandas Series (or a mix of a Series and a scalar).

    Args:
        Point_Orientation: Heading of the point(s), in degrees.
        Link_Orientation: Heading of the link(s), in degrees.

    Returns:
        The absolute angular difference folded into the range [0, 180].
    """
    difference = np.abs(Point_Orientation - Link_Orientation) % 360
    # Headings wrap around at 360: 350 and 10 are 20 degrees apart, not 340.
    # Taking the shorter way round keeps "minimum difference" comparisons
    # meaningful for headings on either side of north.
    return np.minimum(difference, 360 - difference)
# Refine assignment function
def refine_assignment(group):
    """Choose the final OSM link and direction for one point (debug version).

    ``group`` holds every candidate link row produced for a single point
    (the caller groups the joined table by its index). The function adds
    the columns ``final_osm_id`` and ``final_direction`` to ``group`` in
    place and returns it. Progress is printed at every step.

    Rules, checked in order:

    1. One row with a NaN direction: keep the row's own link.
    2. Both directions of one link present: keep the direction with the
       smallest ``Angular_Difference``.
    3. One row with a direction: look up the opposite direction in the
       module-level ``new_osm_links_3`` and keep whichever direction is
       closer to the point orientation. If the opposite direction does
       not exist there, keep the row's own link.
    4. Several osm ids: score every candidate row, plus the opposite
       direction of every directional candidate, and keep the best one.

    Args:
        group: DataFrame with the columns ``osm_id``, ``direction``,
            ``Point_Orientation``, ``Link_Orientation`` and
            ``Angular_Difference``.

    Returns:
        ``group`` with the two ``final_*`` columns added, or ``None`` when
        none of the rules applies.
    """
    print("Original Group:")
    print(group)

    # Rule 1: a single candidate without a direction needs no refinement.
    if len(group) == 1 and pd.isna(group['direction'].iloc[0]):
        print("Single row with NaN direction detected.")
        group['final_osm_id'] = group['osm_id']
        group['final_direction'] = group['direction']
        return group

    # Rule 2: both directions of the same directional link are candidates.
    if group['direction'].nunique() == 2 and group['osm_id'].nunique() == 1:
        print("Both directions of the same directional link present within the group.")
        min_difference = group['Angular_Difference'].min()
        is_best = (group['Angular_Difference'] == min_difference) & ~group['direction'].isna()
        min_difference_row = group.loc[is_best][['osm_id', 'direction']]
        print("Min Difference Row:")
        print(min_difference_row)
        group['final_osm_id'] = min_difference_row['osm_id'].values[0]
        group['final_direction'] = min_difference_row['direction'].values[0]
        return group

    # Rule 3: a single directional candidate; compare it with its opposite.
    if len(group) == 1 and pd.notna(group['direction'].iloc[0]):
        print("Directional link with one direction detected.")
        osm_id = group['osm_id'].iloc[0]
        current_direction = group['direction'].iloc[0]
        other_direction = 3 - current_direction  # Assuming directions are 1 and 2
        other_direction_row = new_osm_links_3[
            (new_osm_links_3['osm_id'] == osm_id) & (new_osm_links_3['direction'] == other_direction)
        ]

        # Default to the joined link so the final_* columns are always set,
        # even when the link has no opposite direction in new_osm_links_3.
        min_difference_row = group[['osm_id', 'direction']]
        if not other_direction_row.empty:
            print("Other direction row identified:")
            print(other_direction_row)
            original_direction_orientation = group['Link_Orientation'].values[0]
            other_direction_orientation = other_direction_row['Link_Orientation'].values[0]
            angular_difference_original = calculate_angular_difference(
                group['Point_Orientation'], original_direction_orientation
            )
            angular_difference_other = calculate_angular_difference(
                group['Point_Orientation'], other_direction_orientation
            )
            print("Angular Differences - Original Direction:")
            print(angular_difference_original)
            print("Angular Differences - Other Direction:")
            print(angular_difference_other)
            # Ties go to the direction that was actually joined.
            if angular_difference_original.min() > angular_difference_other.min():
                min_difference_row = other_direction_row[['osm_id', 'direction']]
            print("Min Difference Row:")
            print(min_difference_row)

        group['final_osm_id'] = min_difference_row['osm_id'].values[0]
        group['final_direction'] = min_difference_row['direction'].values[0]
        return group

    # Rule 4: candidates from several different links.
    if group['osm_id'].nunique() > 1:
        print("Multiple osm ids within the group. Calculating angular differences for each row.")
        # Candidates are collected in a plain list and turned into a
        # DataFrame once: DataFrame.append no longer exists in pandas 2.x
        # and copied the whole frame on every call.
        results = []
        for _, row in group.iterrows():
            results.append({
                'osm_id': row['osm_id'],
                'direction': row['direction'],
                'angular_difference': calculate_angular_difference(
                    row['Point_Orientation'], row['Link_Orientation']
                ),
            })

        # For every directional candidate, also score its opposite direction.
        directional_links = group[group['direction'].notna()]
        for _, directional_link in directional_links.iterrows():
            osm_id = directional_link['osm_id']
            current_direction = directional_link['direction']
            other_direction = 3 - current_direction
            other_direction_row = new_osm_links_3[
                (new_osm_links_3['osm_id'] == osm_id) & (new_osm_links_3['direction'] == other_direction)
            ]
            if other_direction_row.empty:
                continue
            print(f"Other direction row identified for link {osm_id}:")
            print(other_direction_row)
            # Read the orientation from this row instead of relying on a
            # variable left over from the previous loop.
            point_orientation = directional_link['Point_Orientation']
            original_direction_orientation = directional_link['Link_Orientation']
            other_direction_orientation = other_direction_row['Link_Orientation'].values[0]
            angular_difference_original = calculate_angular_difference(
                point_orientation, original_direction_orientation
            )
            angular_difference_other = calculate_angular_difference(
                point_orientation, other_direction_orientation
            )
            print(f"Angular Differences - Original Direction for link {osm_id} direction {current_direction}:")
            print(angular_difference_original)
            print(f"Angular Differences - Other Direction for link {osm_id} direction {other_direction}:")
            print(angular_difference_other)
            results.append({
                'osm_id': osm_id,
                'direction': current_direction,
                'angular_difference': angular_difference_original,
            })
            results.append({
                'osm_id': osm_id,
                'direction': other_direction,
                'angular_difference': angular_difference_other,
            })

        results_df = pd.DataFrame(results)
        best_label = results_df['angular_difference'].idxmin()
        min_difference_row = results_df.loc[best_label][['osm_id', 'direction']]
        print("Min Difference Row:")
        print(min_difference_row)
        group['final_osm_id'] = min_difference_row['osm_id']
        group['final_direction'] = min_difference_row['direction']
        return group

    # NOTE(review): no rule matched (e.g. several rows of one osm_id that do
    # not carry exactly two distinct directions). The function has always
    # returned None here - confirm how the caller should treat such groups.
    return None
# Run the refinement on a 10,000-row sub-sample.
# Work on an independent copy so that adding a column below does not write
# into a slice of interm_result_1.
result_sample = interm_result_1.head(10000).copy()
# One vectorized call over the two columns gives the same values as a
# row-by-row apply(axis=1) without a Python-level call per row.
result_sample['Angular_Difference'] = calculate_angular_difference(
    result_sample['Point_Orientation'], result_sample['Link_Orientation']
)
# Each index value identifies one point; its candidate links form one group.
result_sample = result_sample.groupby(result_sample.index).progress_apply(refine_assignment)
print(result_sample)
print(result_sample[['lon', 'lat', 'osm_id', 'distance', 'direction', 'final_osm_id', 'final_direction']])
The function seems to run well, judging by the interim print statements. However, when I apply the function to the whole dataset using the code below, it always ends in a memory (RAM) crash after about 20% progress.
# Define the function to calculate angular difference
def calculate_angular_difference(Point_Orientation, Link_Orientation):
    """Return the smallest angle, in degrees, between two orientations.

    Works element-wise, so the arguments may be scalars, NumPy arrays or
    pandas Series (or a mix of a Series and a scalar).

    Args:
        Point_Orientation: Heading of the point(s), in degrees.
        Link_Orientation: Heading of the link(s), in degrees.

    Returns:
        The absolute angular difference folded into the range [0, 180].
    """
    raw_difference = np.abs(Point_Orientation - Link_Orientation) % 360
    # Headings wrap around at 360: 350 and 10 are 20 degrees apart, not 340.
    # Without this fold, a nearly parallel link on the other side of north
    # would look like the worst possible match.
    return np.minimum(raw_difference, 360 - raw_difference)
# Define the function to refine assignment
def refine_assignment(group):
    """Choose the final OSM link and direction for one point.

    ``group`` holds every candidate link row produced for a single point
    (the caller groups the joined table by its index). The function adds
    the columns ``final_osm_id`` and ``final_direction`` to ``group`` in
    place and returns it.

    Rules, checked in order:

    1. One row with a NaN direction: keep the row's own link.
    2. Both directions of one link present: keep the direction with the
       smallest ``Angular_Difference``.
    3. One row with a direction: look up the opposite direction in the
       module-level ``new_osm_links_3`` and keep whichever direction is
       closer to the point orientation. If the opposite direction does
       not exist there, keep the row's own link.
    4. Several osm ids: score every candidate row, plus the opposite
       direction of every directional candidate, and keep the best one.

    Args:
        group: DataFrame with the columns ``osm_id``, ``direction``,
            ``Point_Orientation``, ``Link_Orientation`` and
            ``Angular_Difference``.

    Returns:
        ``group`` with the two ``final_*`` columns added, or ``None`` when
        none of the rules applies.
    """
    # Rule 1: a single candidate without a direction needs no refinement.
    if len(group) == 1 and pd.isna(group['direction'].iloc[0]):
        group['final_osm_id'] = group['osm_id']
        group['final_direction'] = group['direction']
        return group

    # Rule 2: both directions of the same directional link are candidates.
    if group['direction'].nunique() == 2 and group['osm_id'].nunique() == 1:
        min_difference = group['Angular_Difference'].min()
        is_best = (group['Angular_Difference'] == min_difference) & ~group['direction'].isna()
        min_difference_row = group.loc[is_best][['osm_id', 'direction']]
        group['final_osm_id'] = min_difference_row['osm_id'].values[0]
        group['final_direction'] = min_difference_row['direction'].values[0]
        return group

    # Rule 3: a single directional candidate; compare it with its opposite.
    if len(group) == 1 and pd.notna(group['direction'].iloc[0]):
        osm_id = group['osm_id'].iloc[0]
        current_direction = group['direction'].iloc[0]
        other_direction = 3 - current_direction  # Assuming directions are 1 and 2
        other_direction_row = new_osm_links_3[
            (new_osm_links_3['osm_id'] == osm_id) & (new_osm_links_3['direction'] == other_direction)
        ]

        # Default to the joined link so the final_* columns are always set,
        # even when the link has no opposite direction in new_osm_links_3.
        min_difference_row = group[['osm_id', 'direction']]
        if not other_direction_row.empty:
            original_direction_orientation = group['Link_Orientation'].values[0]
            other_direction_orientation = other_direction_row['Link_Orientation'].values[0]
            angular_difference_original = calculate_angular_difference(
                group['Point_Orientation'], original_direction_orientation
            )
            angular_difference_other = calculate_angular_difference(
                group['Point_Orientation'], other_direction_orientation
            )
            # Ties go to the direction that was actually joined.
            if angular_difference_original.min() > angular_difference_other.min():
                min_difference_row = other_direction_row[['osm_id', 'direction']]

        group['final_osm_id'] = min_difference_row['osm_id'].values[0]
        group['final_direction'] = min_difference_row['direction'].values[0]
        return group

    # Rule 4: candidates from several different links.
    if group['osm_id'].nunique() > 1:
        # Candidates are collected in a plain list and turned into a
        # DataFrame once: DataFrame.append no longer exists in pandas 2.x
        # and copied the whole frame on every call.
        results = []
        for _, row in group.iterrows():
            results.append({
                'osm_id': row['osm_id'],
                'direction': row['direction'],
                'angular_difference': calculate_angular_difference(
                    row['Point_Orientation'], row['Link_Orientation']
                ),
            })

        # For every directional candidate, also score its opposite direction.
        directional_links = group[group['direction'].notna()]
        for _, directional_link in directional_links.iterrows():
            osm_id = directional_link['osm_id']
            current_direction = directional_link['direction']
            other_direction = 3 - current_direction
            other_direction_row = new_osm_links_3[
                (new_osm_links_3['osm_id'] == osm_id) & (new_osm_links_3['direction'] == other_direction)
            ]
            if other_direction_row.empty:
                continue
            # Read the orientation from this row instead of relying on a
            # variable left over from the previous loop.
            point_orientation = directional_link['Point_Orientation']
            original_direction_orientation = directional_link['Link_Orientation']
            other_direction_orientation = other_direction_row['Link_Orientation'].values[0]
            results.append({
                'osm_id': osm_id,
                'direction': current_direction,
                'angular_difference': calculate_angular_difference(
                    point_orientation, original_direction_orientation
                ),
            })
            results.append({
                'osm_id': osm_id,
                'direction': other_direction,
                'angular_difference': calculate_angular_difference(
                    point_orientation, other_direction_orientation
                ),
            })

        results_df = pd.DataFrame(results)
        best_label = results_df['angular_difference'].idxmin()
        min_difference_row = results_df.loc[best_label][['osm_id', 'direction']]
        group['final_osm_id'] = min_difference_row['osm_id']
        group['final_direction'] = min_difference_row['direction']
        return group

    # NOTE(review): no rule matched (e.g. several rows of one osm_id that do
    # not carry exactly two distinct directions). The function has always
    # returned None here - confirm how the caller should treat such groups.
    return None
# interm_result_2 is a second name for the same DataFrame (no copy is made),
# so the column added below also appears on interm_result_1.
interm_result_2 = interm_result_1
# Run the refinement on the whole dataset.
# One vectorized call over the two columns gives the same values as a
# row-by-row apply(axis=1) without a Python-level call per row.
interm_result_2['Angular_Difference'] = calculate_angular_difference(
    interm_result_2['Point_Orientation'], interm_result_2['Link_Orientation']
)
# Each index value identifies one point; its candidate links form one group.
interm_result_2 = interm_result_2.groupby(interm_result_2.index).progress_apply(refine_assignment)
print(interm_result_2)
print(interm_result_2[['lon', 'lat', 'osm_id', 'distance', 'direction', 'final_osm_id', 'final_direction']])
Is there something that I can do or change in my code to avoid the RAM crash? I have 32 GB of RAM installed. I have also tried processing the data in chunks, but the total RAM usage still reaches about 99%. I am running the code in Spyder (Python 3.11).