Latency from sequential yield calls in Python


I'm writing code to read a set of pandas.DataFrames stored in h5 files and iterate over their rows. The aim is to handle the dataset through a PyTorch IterableDataset, but I believe my question below is not exclusive to PyTorch.

Since reading each h5 file from disk takes a while, I have implemented the following logic:

  1. the code reads the first file from disk and pre-fetches the second file asynchronously
  2. once the first DataFrame has been read from disk, the code starts iterating over its rows (using `yield from`)
  3. once that iteration is done, the code submits an asynchronous read of the following file and starts yielding from the one it prefetched before.

Here is the code:

import glob
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import torch
from torch.utils.data import IterableDataset


def _load_next(file_list, index, device, labels, variables):
    # returns None once every file has been consumed
    if index >= len(file_list):
        return None

    thedata = pd.read_hdf(file_list[index], 'df')
    labels = torch.Tensor(thedata[labels].values).to(device)
    variables = torch.Tensor(thedata[variables].values).to(device)

    return index, (labels, variables)

class datasets(IterableDataset):
    def __init__(self, path, device, variables, labels):
        self.files = glob.glob(path)
        self.device = device
        self.variables = variables
        self.labels = labels
        self.restart()

    def restart(self):
        print("Re-starting iterator")
        # single worker thread used to prefetch the next file in the background
        self.executor = ThreadPoolExecutor(max_workers=1)
        # read the first file and submit prefetching of the following one
        self.file_index, self.current_data = _load_next(
            self.files, 0, self.device, self.labels, self.variables)
        self.prefetch = self.executor.submit(
            _load_next, self.files, self.file_index + 1, self.device,
            self.labels, self.variables)

    def __iter__(self):
        while True:
            yield from zip(self.current_data[0], self.current_data[1])
            result = self.prefetch.result()
            if result is None:
                self.executor.shutdown(wait=False)
                # raising StopIteration inside a generator is an error since Python 3.7,
                # so simply return to end the iteration
                return
            else:
                self.file_index, self.current_data = result
                self.prefetch = self.executor.submit(
                    _load_next, self.files, self.file_index + 1, self.device,
                    self.labels, self.variables)
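For completeness, this is roughly how I construct and consume the dataset (the glob pattern, column names, and device string below are just placeholders):

from torch.utils.data import DataLoader

ds = datasets("somewhere/*.h5", device="cuda", variables=["var_a", "var_b"], labels=["target"])
# batch_size=None because the dataset already yields individual samples
loader = DataLoader(ds, batch_size=None)
for label, features in loader:
    pass  # training step goes here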

The logic works well; however, each `yield from` call takes a few seconds, which introduces a (perhaps) unnecessary latency (the latency is actually longer than the time it takes to prefetch the following file). Is there a way to remove this latency, perhaps by running `yield from` asynchronously? Other ideas are of course welcome! Thanks in advance!
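By "asynchronously" I mean something along the lines of this generic sketch (not adapted to my code, and the helper names are made up): a background thread fills a bounded queue while the generator yields from it, so producing and consuming rows can overlap.

import queue
import threading

def _fill(samples, q):
    # producer: push every sample, then a sentinel marking the end of the stream
    for s in samples:
        q.put(s)
    q.put(None)

def async_yield(samples, maxsize=1024):
    # consumer side: yield items produced by the background thread
    q = queue.Queue(maxsize=maxsize)
    threading.Thread(target=_fill, args=(samples, q), daemon=True).start()
    while True:
        item = q.get()
        if item is None:
            break
        yield item

In my case `samples` would be the zip over the two tensors, but I'm not sure this is the right approach.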


1 Answer

Answer by spontiak:

Thanks to the feedback received in the discussion, I managed to optimize the code even further: the problem was not in the `yield` call itself, but in the fact that constructing the iterator over the `torch.Tensor` is slow, and that construction can be done asynchronously. The updated code is below.

This is considerably faster than the previous version, but I'm not fully satisfied: the neural-network training and the prefetching seem to be competing for the same resources (the training slows down until the prefetching is done). One possible mitigation is sketched after the code.

def _load_next(file_list, index, device, labels, variables):
    if index >= len(file_list):
        return None

    thedata = pd.read_hdf(file_list[index], 'df')
    # build the (slow) tensor iterators here, inside the background thread
    labels = torch.Tensor(thedata[labels].values).to(device).__iter__()
    variables = torch.Tensor(thedata[variables].values).to(device).__iter__()

    return index, (labels, variables)

class datasets(IterableDataset):
    def __init__(self, path, device, variables, labels):
        self.files = glob.glob(path)
        self.device = device
        self.variables = variables
        self.labels = labels
        self.restart()

    def restart(self):
        print("Re-starting iterator")
        # a fresh executor each epoch, since the previous one is shut down in __iter__
        self.executor = ThreadPoolExecutor(max_workers=1)
        # read the first file and submit prefetching of the following one
        self.file_index, self.current_data = _load_next(
            self.files, 0, self.device, self.labels, self.variables)
        self.prefetch = self.executor.submit(
            _load_next, self.files, self.file_index + 1, self.device,
            self.labels, self.variables)

    def __iter__(self):
        while True:
            yield from zip(self.current_data[0], self.current_data[1])
            result = self.prefetch.result()
            if result is None:
                self.executor.shutdown(wait=False)
                self.restart()  # leave the dataset ready for the next epoch
                break
            else:
                self.file_index, self.current_data = result
                self.prefetch = self.executor.submit(
                    _load_next, self.files, self.file_index + 1, self.device,
                    self.labels, self.variables)
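One thing I still want to try in order to reduce the contention is to lower the CPU work done inside the background thread. The sketch below assumes a CUDA device and I have not verified that it helps in my setup: `torch.from_numpy` avoids the extra copy that `torch.Tensor` makes (as long as no dtype conversion is needed), and moving a pinned CPU tensor with `non_blocking=True` lets the host-to-device copy overlap with other work.

import pandas as pd
import torch

def _load_next(file_list, index, device, labels, variables):
    if index >= len(file_list):
        return None

    thedata = pd.read_hdf(file_list[index], 'df')
    # from_numpy wraps the numpy array instead of copying it; .float() keeps the
    # same float32 dtype as torch.Tensor; pinned memory allows an asynchronous
    # transfer to the GPU with non_blocking=True
    labels = torch.from_numpy(thedata[labels].values).float().pin_memory()
    variables = torch.from_numpy(thedata[variables].values).float().pin_memory()

    labels = labels.to(device, non_blocking=True).__iter__()
    variables = variables.to(device, non_blocking=True).__iter__()

    return index, (labels, variables)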