Create new column in Dask DataFrame with specific value for each partition


I have two Dask DataFrames with the same number of partitions.

The first one has a few columns and a few rows in each partition (each partition being a Pandas DataFrame). The number of rows can differ from one partition to another, but the columns cannot. The second Dask DataFrame has exactly the same number of partitions, but each partition holds only one value. I'm trying to combine these two Dask DataFrames partition by partition, as shown below.

Dask DataFrame N°1 (toy example): [image not included]

Dask DataFrame N°2 (with only one value per partition, and therefore only one column): [image not included]

My aim is to obtain a Dask DataFrame like this: [image not included]
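Since the original images did not survive, here is a hypothetical stand-in for the setup in plain pandas. Each list element plays the role of one Dask partition; the column `x`, the index values and all numbers are made up, and only the column name `id_cycle` is taken from the accepted answer.

```python
import pandas as pd

# Hypothetical DataFrame N°1: partitions with different row counts, same columns
parts1 = [
    pd.DataFrame({"x": [1.0, 2.0, 3.0]}, index=[0, 1, 2]),  # 3 rows
    pd.DataFrame({"x": [4.0, 5.0]}, index=[3, 4]),          # 2 rows
]

# Hypothetical DataFrame N°2: one column, exactly one value per partition
parts2 = [
    pd.DataFrame({"id_cycle": [100]}, index=[0]),
    pd.DataFrame({"id_cycle": [200]}, index=[1]),
]

# Desired result: every row of partition i carries the single value of partition i
expected = [
    pd.DataFrame({"x": [1.0, 2.0, 3.0], "id_cycle": [100, 100, 100]}, index=[0, 1, 2]),
    pd.DataFrame({"x": [4.0, 5.0], "id_cycle": [200, 200]}, index=[3, 4]),
]
```

Note that the index of each single-value partition does not line up with the rows of the matching partition in DataFrame N°1, which is typical once data has been split into partitions.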

I tried `assign()` together with `map_partitions()`, but without success.

Do you have an idea how to do this efficiently?

Regards.

rmarion37

Answer by rmarion37 (accepted):

Thanks for your answer,

To fill all the rows of each partition, the key point in the delayed function is to force the value to be an integer (in my example). It's surely a Pandas quirk, not very clear in the documentation, but real!
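The behaviour described above can be reproduced in plain pandas. A likely explanation (offered here as an interpretation, not something stated in the answer) is index alignment: assigning a Series to a column matches rows by index label, so rows without a matching label get NaN, whereas a scalar is broadcast to every row. The partitions below are hypothetical.

```python
import pandas as pd

# Hypothetical partitions: partition1 has 3 rows, partition2 holds one value
partition1 = pd.DataFrame({"x": [1.0, 2.0, 3.0]}, index=[0, 1, 2])
partition2 = pd.DataFrame({"id_cycle": [7]}, index=[0])

# Assigning a Series aligns on the index: only label 0 exists in both,
# so rows 1 and 2 receive NaN
aligned = partition1.copy()
aligned["id_cycle"] = partition2["id_cycle"]

# Extracting the single value as a scalar first broadcasts it to every row
broadcast = partition1.assign(id_cycle=int(partition2.squeeze()))

print(aligned)
print(broadcast)
```

If the index of the single-value partition shared no label with the other partition, the aligned column would be entirely NaN.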

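import dask
from dask.dataframe import from_delayed

@dask.delayed
def process_partitions(partition1, partition2):
    # partition2 holds a single value: extract it as a plain int so that pandas
    # broadcasts it to every row of partition1 (assign() also avoids mutating
    # the input partition in place)
    return partition1.assign(id_cycle=int(partition2.squeeze()))

delayeds = []
for partition1, partition2 in zip(ddf1.partitions, ddf2.partitions):
    delayeds.append(process_partitions(partition1, partition2))

ddf3 = from_delayed(delayeds)
df = ddf3.compute()  # a pandas DataFrame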

Answer by SultanOrazbayev:

If you know for sure that the number of partitions and their order are the same for both dataframes, it's possible to create a custom procedure that processes these partitions in pairs:

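from dask import delayed
from dask.dataframe import from_delayed

@delayed
def process_partitions(partition1, partition2):
    # partition1 and partition2 arrive here as pandas objects.
    # For example, a cross merge (pandas >= 1.2) attaches the single row of
    # partition2 to every row of partition1; note that merge() resets the index.
    merged = partition1.merge(partition2, how="cross")
    return merged

# assume ddf1, ddf2 have the same number/order of partitions
# (zip() would silently drop the extra partitions otherwise)
assert ddf1.npartitions == ddf2.npartitions

delayeds = []
for partition1, partition2 in zip(ddf1.partitions, ddf2.partitions):
    delayeds.append(process_partitions(partition1, partition2))

ddf3 = from_delayed(delayeds)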