Multithreaded python script using 1 common bloom filter


My task is to use the largest bloom filter that fits on a machine. I chose `from pybloom_live import BloomFilter` and managed to create a filter occupying about 49% of RAM (saving it to disk needs roughly the same amount of memory again). I then wrote a couple of loops to check data against the filter. The script works as intended, but it uses only about 10% of the CPU.
So I started experimenting with multiprocessing. I tried different approaches; very often each process creates its own copy of the bloom filter and the script crashes. Slowly I narrowed the problem down: I need several processor cores to use the same bloom filter without making copies of it. After various experiments I managed to avoid the extra copies, but now a different error comes up.

Can someone tell me how to make several processes use one bloom filter, ideally so that access to it stays fast?

def load_bloom_filter(file_path):
    try:
        # open with a context manager so the file handle is always closed
        with open(file_path, 'rb') as f:
            bloom_filter = BloomFilter.fromfile(f)
        if bloom_filter is None:
            print("[-] error load")
        else:
            print(f"[+] load successful {file_path}, size: {len(bloom_filter)}")
        return bloom_filter
    except Exception as e:
        print(f"[-] error: {e}")
        return None
        

loaded_bloom_filter = load_bloom_filter(input_bloom_file_path1)
manager = multiprocessing.Manager()
bloom_filter = manager.Value(BloomFilter, id(loaded_bloom_filter))

# inside the function that checks a value against the bloom filter:
if calculated_value_txt in bloom_filter.value:
    save_value(calculated_value_txt, save_file1_, save_file2_)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
    pool.starmap(process_task, task_params)
  File "C:\Users\Root\AppData\Local\Programs\Python\Python310\lib\multiprocessing\pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "C:\Users\Root\AppData\Local\Programs\Python\Python310\lib\multiprocessing\pool.py", line 771, in get
    raise self._value
TypeError: argument of type 'int' is not iterable

There is 1 solution below

YOGENDRA SONI

I think the problem is that you are storing id(loaded_bloom_filter) in your manager.

Keep loaded_bloom_filter itself in manager.Value, not its id:

bloom_filter = manager.Value(BloomFilter, loaded_bloom_filter)

You can use the code below to verify:

from pybloom_live import BloomFilter
import multiprocessing

def load_bloom_filter(file_path):
    try:
        # open with a context manager so the file handle is always closed
        with open(file_path, 'rb') as f:
            bloom_filter = BloomFilter.fromfile(f)
        if bloom_filter is None:
            print("[-] error load")
        else:
            print(f"[+] load successful {file_path}, size: {len(bloom_filter)}")
        return bloom_filter
    except Exception as e:
        print(f"[-] error: {e}")
        return None


if __name__ == '__main__':
    bf = BloomFilter(10)
    bf.add(1)
    bf.add(2)
    bf.add(3)
    input_bloom_file_path1 = 'bf'
    with open(input_bloom_file_path1, 'wb') as f:
        bf.tofile(f)
    calculated_value_txt_list = [1, 5, 3]
    loaded_bloom_filter = load_bloom_filter(input_bloom_file_path1)
    manager = multiprocessing.Manager()
    bloom_filter = manager.Value(BloomFilter, loaded_bloom_filter)
    for calculated_value_txt in calculated_value_txt_list:
        if calculated_value_txt in bloom_filter.value:
            print('found', calculated_value_txt)
        else:
            print('not found', calculated_value_txt)

output:

[+] load successful bf, size: 3
found 1
not found 5
found 3