Context
Following cythonisation (see original SO post) of a function that replaces a filtering operation on an extremely large pandas.DataFrame (100,000,000 rows), I now need to vectorise the operation and parallelise it.
The original Python version compares values in the df columns (Z, T, M) against scalar bounds (Z1, Z2, T1, T2, M1, M2):
# Python
# df: pandas.DataFrame
# Z, T, M: columns of df, float64 values
# I: column of df, uint32 values
# Z1, Z2, T1, T2, M1, M2: float64 values
res = df.query(
    f"{Z1} < Z < {Z2} and {T1} < T < {T2} and {M1} < M < {M2}",
    engine="numexpr",
)[["T", "I", "M"]]
The Cython solution, with thanks to @Nick ODell (note: I is uint32, so the memoryviews below use cnp.uint32_t to match; the loop variable i is declared explicitly):
import numpy as np
import pandas as pd
cimport numpy as cnp
cimport cython
cnp.import_array()

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef filter_df_cy_no_pd(df, double Z1, double Z2, double T1, double T2, double M1, double M2):
    # Get the NumPy array backing each column
    cdef cnp.float64_t[::1] Z = df['Z'].values
    cdef cnp.float64_t[::1] T = df['T'].values
    cdef cnp.float64_t[::1] M = df['M'].values
    cdef cnp.uint32_t[::1] I = df['I'].values

    # Output buffers, trimmed to out_idx after the loop
    cdef cnp.float64_t[::1] T_out = np.empty_like(T)
    cdef cnp.uint32_t[::1] I_out = np.empty_like(I)
    cdef cnp.float64_t[::1] M_out = np.empty_like(M)

    cdef Py_ssize_t n = Z.shape[0]
    cdef Py_ssize_t i
    cdef bint in_range = 0
    cdef Py_ssize_t out_idx = 0

    for i in range(n):
        in_range = (
            (Z1 < Z[i]) & (Z[i] < Z2) &
            (T1 < T[i]) & (T[i] < T2) &
            (M1 < M[i]) & (M[i] < M2)
        )
        if in_range:
            T_out[out_idx] = T[i]
            I_out[out_idx] = I[i]
            M_out[out_idx] = M[i]
            out_idx += 1

    T_out_arr = np.asarray(T_out)[:out_idx]
    I_out_arr = np.asarray(I_out)[:out_idx]
    M_out_arr = np.asarray(M_out)[:out_idx]

    return pd.DataFrame({
        'T': T_out_arr,
        'I': I_out_arr,
        'M': M_out_arr,
    }, copy=False)
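For reference, this working cpdef version is called straight from Python with the DataFrame and plain float bounds, e.g. (assuming it lives in the module.pyx mentioned below):
from module import filter_df_cy_no_pd

res = filter_df_cy_no_pd(df, Z1, Z2, T1, T2, M1, M2)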
Problem
My attempts to parallelise the function have been in vain, all failing at compile time.
Attempts
- The df parameter has been replaced with:
cnp.uint32_t[::1] I
cnp.float64_t[::1] Z
cnp.float64_t[::1] T
cnp.float64_t[::1] M
- The Z1, Z2, T1, T2, M1, M2 parameters are now cnp.float64_t[::1].
- These parameters are now pandas.Series created in Python and supplied to the function call as numpy.ndarray via the .to_numpy() method.
- I modified the Cython function definition from cpdef to cdef, along with adding noexcept nogil.
- for i in range(n) changed to for i in prange(n, nogil=True), along with including from cython.parallel import prange at the top of module.pyx.
- The function return is now T_out_arr, I_out_arr, M_out_arr.
- I call this function from an outer cpdef function under the context manager:
cdef Py_ssize_t p, r
r = Z1.shape[0]
cdef object[:, :] output = np.empty((r, 3), dtype=object)
# `3` is due to the 3 arrays returned from the inner cdef `filter_cy()` function
with nogil:
    for p in range(r):
        output[p, :] = filter_cy(
            # args...
        )
Errors
Some of the numerous errors include:
Function with Python return type cannot be declared nogil
Calling gil-requiring function not allowed without gil
Accessing Python attribute not allowed without gil
Cannot read reduction variable in loop body
I understand that most of these problems stem from the presence of Python objects inside the inner cdef function: a noexcept nogil function cannot build or return NumPy arrays, and out_idx += 1 inside prange makes Cython treat out_idx as a reduction variable, which can then no longer be read in the loop body. But I'm finding it tricky to implement the alternatives. I don't know if it would be better to parallelise the outer loop within which the inner cdef function is called, but ideally I need to speed up the inner filtering operation, as that is the bottleneck.
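One alternative I have been sketching (untested, and the name filter_df_cy_prange is just mine) splits the work into two passes to avoid the shared out_idx counter entirely: a parallel prange pass in which each iteration writes only its own slot of a mask, then a serial compaction pass with the GIL held:
import numpy as np
import pandas as pd
cimport numpy as cnp
cimport cython
from cython.parallel import prange
cnp.import_array()

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef filter_df_cy_prange(df, double Z1, double Z2, double T1, double T2, double M1, double M2):
    cdef cnp.float64_t[::1] Z = df['Z'].values
    cdef cnp.float64_t[::1] T = df['T'].values
    cdef cnp.float64_t[::1] M = df['M'].values
    cdef cnp.uint32_t[::1] I = df['I'].values
    cdef Py_ssize_t n = Z.shape[0]
    cdef cnp.uint8_t[::1] mask = np.empty(n, dtype=np.uint8)
    cdef Py_ssize_t i

    # Pass 1 (parallel, nogil): each iteration writes only mask[i],
    # so no counter is shared between threads.
    for i in prange(n, nogil=True):
        mask[i] = (
            (Z1 < Z[i]) & (Z[i] < Z2) &
            (T1 < T[i]) & (T[i] < T2) &
            (M1 < M[i]) & (M[i] < M2)
        )

    # Pass 2 (serial, GIL held): compact with NumPy boolean indexing.
    keep = np.asarray(mask).view(np.bool_)
    return pd.DataFrame({
        'T': np.asarray(T)[keep],
        'I': np.asarray(I)[keep],
        'M': np.asarray(M)[keep],
    }, copy=False)
This would need OpenMP enabled at compile time (e.g. extra_compile_args=['-fopenmp'] with GCC), and the second pass costs an extra read over the mask, so I don't know whether it actually beats the serial single-pass version in practice.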
Moreover, I haven't yet attempted a binary search implementation in the inner function. My rough plan was: sort each numpy.ndarray in the Python caller, binary-search each sorted column in the inner function for the indices of the matches, intersect those index sets, and then use the resulting indices to pull the values in the outer function. I don't know if that's the best approach (or how to actually engineer it).
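To make that concrete, this is roughly the prototype I had in mind, in pure NumPy before any porting to Cython (untested; filter_searchsorted is a placeholder name):
import numpy as np

def filter_searchsorted(Z, T, M, I, Z1, Z2, T1, T2, M1, M2):
    # Argsort each filter column once (reusable across repeated queries).
    zi, ti, mi = np.argsort(Z), np.argsort(T), np.argsort(M)

    # Binary-search the open interval in each sorted column:
    # side='right' on the lower bound and side='left' on the upper bound
    # give strict inequalities, matching lo < col < hi.
    z_hits = zi[np.searchsorted(Z, Z1, side='right', sorter=zi):
                np.searchsorted(Z, Z2, side='left', sorter=zi)]
    t_hits = ti[np.searchsorted(T, T1, side='right', sorter=ti):
                np.searchsorted(T, T2, side='left', sorter=ti)]
    m_hits = mi[np.searchsorted(M, M1, side='right', sorter=mi):
                np.searchsorted(M, M2, side='left', sorter=mi)]

    # Intersect the row indices that satisfy all three range conditions.
    idx = np.intersect1d(np.intersect1d(z_hits, t_hits, assume_unique=True),
                         m_hits, assume_unique=True)
    return T[idx], I[idx], M[idx]
The three argsorts are O(n log n) each, so I would only expect this to pay off if the sort order can be computed once up front and reused across many queries on the same 100,000,000-row frame.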
I appreciate any help offered.