I have a following problem - I have a list of delayed objects after applying following code (see below):
When I am applying
ddf = dd.from_delayed(lazy_results_names)
instead of dask dataframe I recieve dask scalar. For some reason this method returns a scalar.
Can you please help me how can update the code so that instead I recieve a dataframe? I have tried reading documentation and Chat GPT, both don't help much.
import dask.dataframe as dd
import pandas as pd
import numpy as np
import dask.delayed
from dask import delayed, compute
from fuzzywuzzy import process
#first dask dataset
data_orbis = {'Name': ['Johanna', 'Chanti', 'Pamela', 'Connie', 'Cesar', 'Eldar', 'Chris', "Richard","Rechard","Carl","Carl"],'ID': ['100', "200","300","400","500","600","700", "800", "900", "999", "1100" ], 'Country': ["Germany", "Germany", "Czechia","Russia","Andorra","France","Germany","Germany","Germany", "Denmark", "Sweden" ], 'City': ["Mainz", "Mainz", "Brno", "Moscow", "Andorra","Paris", "Berlin", "Mainz", "Mannheim","Copenhagen", "Malmo" ]}
df = pd.DataFrame(data_orbis)
dask_df_orbis = dd.from_pandas(df, npartitions=4)
name_series_orbis = dask_df_orbis["Name"]
#second dask dataset
data_dealscan = {'Name': ['Andrey', 'Canti', 'Pamelo', 'Cannie', 'Cezar', 'Eltor', 'Chriss', "Richard", "Rechard","Carl","Carll"],'ID': ['np.nan', "np.nan","np.nan","np.nan","500","600","700", "800","900", "999", "np.nan"], 'Country': ["Germany", "Germany", "Czechia","Russia","Andorra","France","Germany","Germany","Germany","Denmark", "Sweden"], 'City': ["Mainz", "Mainz", "Brno", "Moscow", "Andorra","Paris", "Berlin", "Mainz", "Mannheim","Copenhagen", "Malmo" ]}
pandas_df_dealscan = pd.DataFrame(data_dealscan)
dask_df_dealscan = dd.from_pandas(pandas_df_dealscan, npartitions=4)
dask_series_dealscan_names = dask_df_dealscan['Name']
# Convert Dask Series to Dask list using delayed
@delayed(nout=100)
def series_to_list(series):
return series.tolist()
# Apply the delayed function to the Dask Series
dask_dealscan_names_list = series_to_list(dask_series_dealscan_names)
# Define a delayed function for matching
@dask.delayed
def match_names_with_country_and_city(name_dealscan, country, city, name_series_orbis):
best_match = process.extractOne(name_dealscan, name_series_orbis)
best_match = best_match[0] # Extracting the best match from the tuple
return best_match
# Create a list of delayed computations
lazy_results_names = []
for name_dealscan, country, city in zip(dask_dealscan_names_list, dask_df_dealscan['Country'], dask_df_dealscan['City']):
# Delay the computation of matching function
lazy_result_name = match_names_with_country_and_city(name_dealscan, country, city, name_series_orbis)
lazy_results_names.append(lazy_result_name)