# Fixed names of the shared-memory blocks: the parent process creates them
# with create=True and worker processes attach to them by these same names.
_ORDER_SHARED_MEMORY_NAME: str = "OrderSharedMemory"
_TRADE_SHARED_MEMORY_NAME: str = "TradeSharedMemory"
@functools.lru_cache(maxsize=None)
def _get_dataframe_from_shared_memory(
shared_memory_name: str,
shape: tuple[int, ...],
dtype: np.dtype
) -> pd.DataFrame:
"""
Get DataFrame from shared memory
Note: cache used for this function has no maximum size and shared_memory with the same name might caused inconsistency
Params:
shared_memory_name(str): name of shared memory
shape(tuple[int, ...]): shape of shared memory
dtype(numpy.dtype): data type of shared memory
"""
# Get shared memory instance
shared_mem = shared_memory.SharedMemory(name=shared_memory_name)
# Create a structured NumPy array from the shared memory
np_array = np.ndarray(shape, dtype=dtype, buffer=shared_mem.buf)
# Convert the NumPy array to a DataFrame
dataframe = pd.DataFrame(np_array)
return dataframe
def _orderbook_job(
    cls: type["OrderBookHandler"],
    date: Date,
    security_code: str,
    orderbook_root_folder_path: str,
    orderbook_compression: str | None,
    order_shape: tuple[int, ...],
    trade_shape: tuple[int, ...],
    order_dtype: np.dtype,
    trade_dtype: np.dtype,
    include_snapshot_log: bool = False
) -> None:
    """
    Calculate and write an orderbook based on shared-memory data.
    This function is used as a job in multiprocessing; it attaches to the
    shared-memory blocks named by the module-level constants
    _ORDER_SHARED_MEMORY_NAME and _TRADE_SHARED_MEMORY_NAME, so the parent
    process must have created blocks under those names before dispatching.
    Params:
        cls(type[OrderBookHandler]): handler class providing get_orderbook_writer
        date(utils.types.Date): specified date of tick data
        security_code(str): security code of stock
        orderbook_root_folder_path(str): root folder path for orderbook
        orderbook_compression(str | None): compression type for orderbook, or None
        order_shape(tuple[int, ...]): shape of order data
        trade_shape(tuple[int, ...]): shape of trade data
        order_dtype(numpy.dtype): data type of order data
        trade_dtype(numpy.dtype): data type of trade data
        include_snapshot_log(bool): whether to include snapshot log
    """
    # Reconstruct both inputs from the well-known shared-memory blocks
    order_dataframe = _get_dataframe_from_shared_memory(
        _ORDER_SHARED_MEMORY_NAME,
        order_shape,
        order_dtype
    )
    trade_dataframe = _get_dataframe_from_shared_memory(
        _TRADE_SHARED_MEMORY_NAME,
        trade_shape,
        trade_dtype
    )
    # Delegate the actual calculation and serialization to the writer
    orderbook_writer = cls.get_orderbook_writer(orderbook_root_folder_path, orderbook_compression)
    orderbook_writer.write_orderbook(
        date,
        security_code,
        order_dataframe,
        trade_dataframe,
        include_snapshot_log
    )
I have _orderbook_job as a function that serves as the job in multiprocessing. The function works fine until I try to access np_array in _get_dataframe_from_shared_memory. When manually creating a Process object that runs _orderbook_job, it suddenly halts at dataframe = pd.DataFrame(np_array). When using a process pool as above, it strangely creates one extra process that does nothing (I tried interrupting with Control-C, but it requires endless interrupts and I am never able to interrupt all the processes; this might not be relevant to my problem). I don't use any lock, as the data is intended to be read-only.
Here is the main process code.
if (self.__multiprocessing):
# Convert the DataFrame to a structured NumPy array
order_np_array = order_dataframe.to_records(index=False)
trade_np_array = trade_dataframe.to_records(index=False)
# Create a shared memory block
order_shared_memory = shared_memory.SharedMemory(
name=_ORDER_SHARED_MEMORY_NAME,
create=True,
size=order_np_array.nbytes
)
trade_shared_memory = shared_memory.SharedMemory(
name=_TRADE_SHARED_MEMORY_NAME,
create=True,
size=trade_np_array.nbytes
)
# Copy the NumPy array into the shared memory
order_np_shared_array = np.ndarray(order_np_array.shape, dtype=order_np_array.dtype, buffer=order_shared_memory.buf)
order_np_shared_array[:] = order_np_array[:]
trade_np_shared_array = np.ndarray(trade_np_array.shape, dtype=trade_np_array.dtype, buffer=trade_shared_memory.buf)
trade_np_shared_array[:] = trade_np_array[:]
function_args: list[list[Any]] = [
[
type(self),
date,
code,
self.__orderbook_file_parser.get_root_folder(),
self.__orderbook_file_parser.get_compression(),
order_np_array.shape,
trade_np_array.shape,
order_np_array.dtype,
trade_np_array.dtype,
include_snaphot_log
] for code in security_codes
]
self.__worker_pool.starmap(_orderbook_job, function_args)
# Close the shared memory block
order_shared_memory.close()
trade_shared_memory.close()
I can assure you that the problem is not caused by using @functools.lru_cache(maxsize=None); it still occurs even when I remove the decorator. I also tried removing the shared-memory access from the subprocess, and it ran perfectly. The code also runs in single-threaded mode with successful shared-memory access.