I'm trying to set up a method for interpolating points from a curvilinear grid to a regular grid. To do this, I'm trying to find out which cell of the curvilinear grid each point of the regular grid belongs to. The cells of the curvilinear grid are random quadrilaterals.
Grids are created as follows, for example, for an unfavorable case:
import numpy as np
import matplotlib.pyplot as plt
def circle(xn, yn):
"""Polar grid."""
R = (xn[-1, 0] - xn[0, 0]) / (2*np.pi)
return (yn + R)*np.sin(xn/R), (yn + R)*np.cos(xn/R)
_x= np.linspace(0, 0.1, 150)
_y = np.linspace(0, 0.1, 100)
# Curvilinear grid
up, vp = circle(np.meshgrid(_x, _y, indexing='ij'))
# Cartesian grid
ui = np.linspace(up.min(), up.max(), 300)
vi = np.linspace(vp.min(), vp.max(), 200)
I'm able to find the correspondence between each grid using a classic quadrilateral point-finding algorithm:
%%cython -f -c=-O2 -I./ --compile-args=-fopenmp --link-args=-fopenmp
cimport cython
from cython.parallel cimport prange
import numpy as np
cimport numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
cpdef neighbors(double[:, ::1] xp, double[:, ::1] yp, double xi, double yi):
"""Clockwise winding Polygon :
[[xp[ix, iy], yp[ix, iy]],
[xp[ix, iy+1], yp[ix, iy+1]],
[xp[ix+1, iy+1], yp[ix+1, iy+1]],
[xp[ix+1, iy], yp[ix+1, iy]]])
"""
cdef Py_ssize_t nx = xp.shape[0]
cdef Py_ssize_t ny = xp.shape[1]
cdef Py_ssize_t ix, iy = 0
cdef Py_ssize_t tag = 0
cdef double xmin, xmax, ymin, ymax
cdef double x1, x2, x3, x4, y1, y2, y3, y4
for ix in range(nx-1):
for iy in range(ny-1):
x1 = xp[ix, iy]
x2 = xp[ix, iy+1]
x3 = xp[ix+1, iy+1]
x4 = xp[ix+1, iy]
y1 = yp[ix, iy]
y2 = yp[ix, iy+1]
y3 = yp[ix+1, iy+1]
y4 = yp[ix+1, iy]
if xi - min(x1, x2, x3, x4) < 0.:
continue
if yi - min(y1, y2, y3, y4) < 0.:
continue
# Check that point (xi, yi) is on the same side of each edge of the quadrilateral
if not (xi - x1) * (y2 - y1) - (x2 - x1) * (yi - y1) > 0:
continue
if not (xi - x2) * (y3 - y2) - (x3 - x2) * (yi - y2) > 0:
continue
if not (xi - x3) * (y4 - y3) - (x4 - x3) * (yi - y3) > 0 :
continue
if not (xi - x4) * (y1 - y4) - (x1 - x4) * (yi - y4) > 0:
continue
tag = 1
break
if tag == 1:
break
if tag == 1:
return (ix, iy), (ix, iy+1), (ix+1, iy+1), (ix+1, iy)
return None
And it works:
fig, axes = plt.subplots(1, 1, figsize=(6, 6))
for i in np.arange(0, ui.shape[0]):
axes.axvline(ui[i], color='k', linewidth=0.1)
for i in np.arange(0, vi.shape[0]):
axes.axhline(vi[i], color='k', linewidth=0.1)
for i in np.arange(0, vp.shape[1]):
axes.plot(up[:, i], vp[:, i], 'k', linewidth=0.5)
for i in np.arange(0, up.shape[0]):
axes.plot(up[i, :], vp[i, :], 'k', linewidth=0.5)
for iu, iv in ((180, 120), (100, 90), (80, 40), (10, 10)):
axes.plot(ui[iu], vi[iv], 'ro')
idx = neighbors(up, vp, ui[iu], vi[iv])
if idx:
for _idx in idx:
axes.plot(up[_idx], vp[_idx], 'go')
plt.show()
The thing is, on my system, each point search takes an average of 15 us for this grid (7 ms for a 2000x2000 grid), i.e. for the search of 2000x2000 points, an estimated time of 28000 seconds (not to mention the 3d case :)).
I think I can save time by avoiding the search for points that don't exist in the curvilinear grid, but I can't find a simple indicator to skip the search for these points.
I think there's a better global method for this kind of research, but I'm not aware of it.
How could I speed up the things using Cython/numpy?
By using a simple dichotomy to target the x's, it is possible to save quite a lot of calculation time. On the other hand, the code presented below can lead to errors if x is not monotonic:
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
cpdef bint is_monotonic(double[:, ::1] v):
cdef Py_ssize_t i
for i in range(v.shape[0] - 1):
if (v[i, 1] > v[i, 0]) != (v[i+1, 1] > v[i+1, 0]):
return False
return True
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
cpdef (double, double) minmax(double[:] arr):
cdef double min = arr[0]
cdef double max = arr[0]
cdef Py_ssize_t i
cdef Py_ssize_t n = arr.shape[0]
for i in range(1, n):
if arr[i] < min:
min = arr[i]
elif arr[i] > max:
max = arr[i]
return min, max
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
cpdef (Py_ssize_t, Py_ssize_t) dichotomy(double[:, ::1] xp, double xi, Py_ssize_t offset=1):
cdef Py_ssize_t nx = xp.shape[0]
cdef Py_ssize_t istart = 0
cdef Py_ssize_t istop = nx - 1
cdef Py_ssize_t middle = (istart + istop) // 2
cdef double xmin1, xmin2, xmax1, xmax2
cdef double xmin, xmax
while (istop - istart) > 3:
middle = (istart + istop) // 2
xmin1, xmax1 = minmax(xp[istart, :])
xmin2, xmax2 = minmax(xp[middle, :])
xmin = min(xmin1, xmax1, xmin2, xmax2)
xmax = max(xmin1, xmax1, xmin2, xmax2)
if xmin <= xi <= xmax:
if middle - istart < 3:
return istart - offset, middle + offset
istop = middle
else:
istart = middle
xmin1, xmax1 = minmax(xp[middle, :])
xmin2, xmax2 = minmax(xp[istop, :])
xmin = min(xmin1, xmax1, xmin2, xmax2)
xmax = max(xmin1, xmax1, xmin2, xmax2)
if xmin <= xi <= xmax:
return middle - offset, istop + offset
return -1, -1
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
cpdef neighbor(double[:, ::1] xp, double[:, ::1] yp, double xi, double yi):
(...)
cdef Py_ssize_t istart = 0
cdef Py_ssize_t istop = nx - 2
if is_monotonic(xp):
istart, istop = dichotomy(xp, xi)
if istart == -1:
return None
for ix in range(istart, istop + 1):
for iy in range(ny-1):
(...)