Speeding up Dask bag processing of a text file?


Hi, I have the following code:

import dask.bag as db

class Similarity(object):

    def __init__(self):
        self.bag = None

    @staticmethod
    def olap(item, ary):
        # each line looks like "index:sentence"; the score is the number
        # of words the two sentences have in common
        iix, item = item.strip().split(':')
        aix, ary = ary.strip().split(':')
        o = len(set(item.split(' ')) & set(ary.split(' ')))
        return o, int(aix), int(iix), item

    def process(self, file):
        self.bag = db.read_text(file)
        rv = []
        for c, i in enumerate(self.bag.take(100)):
            # for each of the first 100 lines: score it against every line,
            # drop the self-match, keep the best-overlapping line
            rv.append(self.bag.map(Similarity.olap, i).filter(lambda x: x[1] != x[2]).max().compute())
        return rv

For now I'm processing a ~10,000-line text file, which is a light load. It is simply one sentence per line; every sentence is split into words and compared with ALL the other lines in the file.
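For clarity, this is the same computation in plain Python (a hypothetical process_plain, far too slow to actually use, just to show the logic):

def process_plain(lines):
    rv = []
    for i in lines[:100]:
        # score line i against every line, drop the self-match, keep the best
        scores = (Similarity.olap(line, i) for line in lines)
        rv.append(max(t for t in scores if t[1] != t[2]))
    return rv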

The problem is that it is too SLOOOW... 100 steps take ~1 min 20 sec with all the CPUs working. At the same time, the score function itself is fast, ~2 microseconds per call:

In [171]: %timeit Similarity.olap('5:aaa bbb ccc dddd  ooooooooo ppppppppppp jee', '7:bbb aa ccc ddd ee uu oooo pppp')                                                       
2.09 µs ± 56.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

So do you have any tricks to help me SPEED it up?


1 answer below.

Answer from SultanOrazbayev:

Comparing 10**4 lines to each other is 10**8 comparisons, so it's not a very light computation. If your similarity metric is symmetric (which seems to be the case), then you can halve the time by comparing only one of every pair: knowing the similarity from A to B is sufficient to know the similarity from B to A.
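For example, in plain Python the pair-halving looks like this (a rough sketch; overlap here is a stand-in for your olap without the index bookkeeping):

from itertools import combinations

def overlap(a, b):
    # stand-in for the word-overlap part of olap
    return len(set(a.split()) & set(b.split()))

lines = ["aaa bbb ccc", "bbb ccc ddd", "aaa ddd eee"]

# combinations() yields each unordered pair (i, j) with i < j exactly once,
# so every pair of lines is scored a single time instead of twice
scores = {(i, j): overlap(a, b)
          for (i, a), (j, b) in combinations(enumerate(lines), 2)}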

In terms of the actual code, you have .compute() inside the loop, which slows things down because each iteration must wait for the previous computation to complete. Rough pseudocode for a faster way is as follows (it will probably need adjusting for your actual results):

# ensure this is in the imports along with the dask bag
import dask


# other code including the class definition skipped

    def process(self, file):
        self.bag = db.read_text(file)
        rv = []
        for c, i in enumerate(self.bag.take(100)):
            # no compute inside the loop: collect lazy results only
            rv.append(self.bag.map(Similarity.olap, i).filter(lambda x: x[1] != x[2]).max())

        # rv now contains lazy objects only, so compute them all at once;
        # dask.compute(*rv) returns a tuple with one result per lazy object
        rv = dask.compute(*rv)
        return rv
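To illustrate the pattern in isolation (a toy example, not your data): build up the lazy results first, then let a single compute call schedule the whole batch in one pass:

import dask
import dask.bag as db

b = db.from_sequence(range(10), npartitions=2)

# build several lazy aggregations without computing them one at a time
lazy = [b.map(lambda x, k=k: x * k).sum() for k in range(3)]

# one call runs the whole batch in a single scheduler pass
results = dask.compute(*lazy)
print(results)  # (0, 45, 90)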