I have a list of items that I need to filter based on some conditions. I'm wondering whether Dask could do this filtering in parallel, as the list is very long (a few dozen million records).
Basically, what I need to do is this:
items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
    item_is_valid = True
    if item['type'] == 'cat':
        item_is_valid = False
    elif item['weight'] > 20:
        item_is_valid = False
    # ...
    # elif for n conditions
    return item_is_valid

items_filtered = [item for item in items if item_is_valid(item)]
With Dask, what I have managed to do so far is the following:
import dask

def item_is_valid_v2(item):
    """Return the whole item if it is valid, otherwise None."""
    item_is_valid = True
    if item['type'] == 'cat':
        item_is_valid = False
    elif item['weight'] > 20:
        item_is_valid = False
    # ...
    # elif for n conditions
    if item_is_valid:
        return item

results = []
for item in items:
    delayed = dask.delayed(item_is_valid_v2)(item)
    results.append(delayed)
results = dask.compute(*results)
However, the result I get contains a few None values, which then need to be filtered out somehow in a non-parallel way.
({'type': 'dog', 'weight': 10}, {'type': 'dog', 'weight': 20}, None, None)
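The extra pass I mean is just a plain list comprehension over the computed tuple, outside of the Dask graph (a sketch of the current workaround):

items_filtered = [r for r in results if r is not None]  # serial post-filtering step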
Perhaps the bag API will work for you. Here is a rough sketch (item_is_valid is the predicate from the question; from_sequence and the partition count are assumptions about how the data gets loaded):
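import dask.bag as db

# Put the items into a bag. For tens of millions of records you would more
# likely build the bag directly from the source files instead of from an
# in-memory list; from_sequence and npartitions=100 are just placeholders.
bag = db.from_sequence(items, npartitions=100)

# filter applies the predicate to every element, partition by partition and
# in parallel, and keeps only the items for which it returns True -- so no
# None values appear in the output.
result = bag.filter(item_is_valid)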
To check whether this is working, inspect the outcome of result.take(5), and if that is satisfactory:
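# materialise the filtered items as a plain list; alternatively write them
# out in parallel, e.g. with result.to_textfiles(...), if a list in memory
# is not what you need
items_filtered = result.compute()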