Shared Memory Access Terminates Process in Python Multiprocessing

import functools
from multiprocessing import shared_memory

import numpy as np
import pandas as pd

_ORDER_SHARED_MEMORY_NAME: str = "OrderSharedMemory"
_TRADE_SHARED_MEMORY_NAME: str = "TradeSharedMemory"

@functools.lru_cache(maxsize=None)
def _get_dataframe_from_shared_memory(
    shared_memory_name: str,
    shape: tuple[int, ...],
    dtype: np.dtype
) -> pd.DataFrame:
    """
    Get DataFrame from shared memory

    Note: the cache for this function has no maximum size, and reusing the same shared memory name may cause inconsistency

    Params:
        shared_memory_name(str): name of shared memory
        shape(tuple[int, ...]): shape of shared memory
        dtype(numpy.dtype): data type of shared memory
    """
    
    # Get shared memory instance
    shared_mem = shared_memory.SharedMemory(name=shared_memory_name)

    # Create a structured NumPy array from the shared memory
    np_array = np.ndarray(shape, dtype=dtype, buffer=shared_mem.buf)

    # Convert the NumPy array to a DataFrame
    dataframe = pd.DataFrame(np_array)

    return dataframe

def _orderbook_job(
        cls: type["OrderBookHandler"],
        date: Date,
        security_code: str,
        orderbook_root_folder_path: str,
        orderbook_compression: str | None,
        order_shape: tuple[int, ...],
        trade_shape: tuple[int, ...],
        order_dtype: np.dtype,
        trade_dtype: np.dtype,
        include_snapshot_log: bool = False
    ):
        """
        Calculate and write orderbook based on shared memory data. This method is used as job in multiprocessing

        Params:
            date(utils.types.Date): specified date of tick data
            security_code(str): security code of stock
            orderbook_root_folder_path(str): root folder path for orderbook
            orderbook_compression(str): compression type for orderbook
            order_shared_memory_name(str): name of shared memory for order data
            trade_shared_memory_name(str): name of shared memory for trade data
            order_shape(tuple[int, ...]): shape of order data
            trade_shape(tuple[int, ...]): shape of trade data
            order_dtype(numpy.dtype): data type of order data
            trade_dtype(numpy.dtype): data type of trade data
            include_snapshot_log(bool): whether to include snapshot log
        """

        order_dataframe = _get_dataframe_from_shared_memory(
            _ORDER_SHARED_MEMORY_NAME,
            order_shape,
            order_dtype
        )

        trade_dataframe = _get_dataframe_from_shared_memory(
            _TRADE_SHARED_MEMORY_NAME,
            trade_shape,
            trade_dtype
        )

        orderbook_writer = cls.get_orderbook_writer(orderbook_root_folder_path, orderbook_compression)

        orderbook_writer.write_orderbook(
            date,
            security_code,
            order_dataframe,
            trade_dataframe,
            include_snapshot_log
        )

I have _orderbook_job as the function that runs as a job in multiprocessing. It works fine until it accesses np_array inside _get_dataframe_from_shared_memory. When I manually create a Process object that runs _orderbook_job, it suddenly halts at dataframe = pd.DataFrame(np_array). When I use the process pool as shown below, it strangely creates one extra process that does nothing (I tried interrupting with Ctrl-C, but no number of interrupts ever stops all the processes; this may not be relevant to my problem). I don't use any lock, since the data is intended to be read-only.

Here is the main process code.

        if (self.__multiprocessing):
            # Convert the DataFrame to a structured NumPy array
            order_np_array = order_dataframe.to_records(index=False)
            trade_np_array = trade_dataframe.to_records(index=False)

            # Create a shared memory block
            order_shared_memory = shared_memory.SharedMemory(
                name=_ORDER_SHARED_MEMORY_NAME,
                create=True, 
                size=order_np_array.nbytes
            )
            trade_shared_memory = shared_memory.SharedMemory(
                name=_TRADE_SHARED_MEMORY_NAME,
                create=True, 
                size=trade_np_array.nbytes
            )

            # Copy the NumPy array into the shared memory
            order_np_shared_array = np.ndarray(order_np_array.shape, dtype=order_np_array.dtype, buffer=order_shared_memory.buf)
            order_np_shared_array[:] = order_np_array[:]

            trade_np_shared_array = np.ndarray(trade_np_array.shape, dtype=trade_np_array.dtype, buffer=trade_shared_memory.buf)
            trade_np_shared_array[:] = trade_np_array[:]

            function_args: list[list[Any]] = [
                [
                    type(self),
                    date, 
                    code,
                    self.__orderbook_file_parser.get_root_folder(),
                    self.__orderbook_file_parser.get_compression(),
                    order_np_array.shape,
                    trade_np_array.shape,
                    order_np_array.dtype,
                    trade_np_array.dtype,
                    include_snapshot_log
                ] for code in security_codes
            ]

            self.__worker_pool.starmap(_orderbook_job, function_args)

            # Close the shared memory block
            order_shared_memory.close()
            trade_shared_memory.close()

I can assure you that the problem is not @functools.lru_cache(maxsize=None): it still occurs even when I remove the decorator. I also tried removing the shared memory access from the subprocess, and then it runs perfectly. The code also runs successfully in single-threaded mode, including the shared memory access.
