Source code for arim.ray

"""
Ray tracing module

"""

import contextlib
import gc
import logging
import math
import os
import time
import warnings
from concurrent.futures import ThreadPoolExecutor

import numba
import numpy as np

from . import geometry as g
from . import settings as s
from .exceptions import ArimWarning, InvalidDimension
from .helpers import Cache, NoCache, chunk_array

use_parallel = os.environ.get("ARIM_USE_PARALLEL", not numba.core.config.IS_32BITS)


[docs] def find_minimum_times( time_1, time_2, dtype=None, dtype_indices=None, block_size=None, numthreads=None ): """ For i=1:n and j=1:p, out_min_times(i, j) := min_{k=1:m} time_1[i, k] + time_2[k, j] out_min_indices(i, j) := argmin_{k=1:m} time_1[i, k] + time_2[k, j] Parameters ---------- time_1 Shape: (n, m) time_2 Shape: (m, p) dtype dtype_indices Returns ------- out_min_times Shape: (n, p) out_min_indices Shape: (n, p) Notes ----- Memory usage: - duplicate 'time_1' if it not in C-order. - duplicate 'time_2' if it not in Fortran-order. """ assert time_1.ndim == 2 assert time_2.ndim == 2 try: n, m = time_1.shape m_, p = time_2.shape except ValueError: raise InvalidDimension("time_1 and time_2 must be 2d.") if m != m_: raise ValueError("Array shapes must be (n, m) and (m, p).") if dtype is None: dtype = np.result_type(time_1, time_2) if dtype_indices is None: dtype_indices = s.INT if block_size is None: block_size = s.BLOCK_SIZE_FIND_MIN_TIMES if numthreads is None: numthreads = s.NUMTHREADS out_min_times = np.full((n, p), np.inf, dtype=dtype) out_best_indices = np.full((n, p), -1, dtype=dtype_indices) # time_1 will be scanned row per row, time_2 column per column. # Force to use the most efficient order (~20 times speed-up between the best and worst case). time_1 = np.ascontiguousarray(time_1) time_2 = np.asfortranarray(time_2) # Chunk time_1 and time_2 such as each chunk contains roughly 'block_size' # floats. Chunks for 'time_1' are lines (only complete lines), chunks # for 'time_2' are columns (only complete columns). block_size_adj = math.ceil(block_size / m) futures = [] with ThreadPoolExecutor(max_workers=numthreads) as executor: for chunk1 in chunk_array((n, m), block_size_adj, axis=0): for chunk2 in chunk_array((m, p), block_size_adj, axis=1): chunk_res = (chunk1[0], chunk2[1]) futures.append( executor.submit( _find_minimum_times, time_1[chunk1], time_2[chunk2], out_min_times[chunk_res], out_best_indices[chunk_res], ) ) # Raise exceptions that happened, if any: for future in futures: future.result() return out_min_times, out_best_indices
@numba.jit(nopython=True, nogil=True, cache=True) def _find_minimum_times(time_1, time_2, out_min_times, out_best_indices): """ Parameters ---------- time_1 time_2 out_min_time out_best_indices Returns ------- """ n, m = time_1.shape m, p = time_2.shape for i in range(n): for j in range(p): for k in range(m): new_time = time_1[i, k] + time_2[k, j] if new_time < out_min_times[i, j]: out_min_times[i, j] = new_time out_best_indices[i, j] = k logger = logging.getLogger(__name__)
[docs] def ray_tracing_for_paths( paths_list, walls=None, turn_off_invalid_rays=False, convert_to_fortran_order=False ): """ Perform the ray tracing for different paths. Save the result in ``Path.rays``. Parameters ---------- paths_list : List[Path] convert_to_fortran_order Returns ------- None """ paths_list = tuple(paths_list) fermat_paths_tuple = tuple(FermatPath.from_path(path) for path in paths_list) fermat_solver = FermatSolver(fermat_paths_tuple) rays_dict = fermat_solver.solve() for path, fermat_path in zip(paths_list, fermat_paths_tuple): rays = rays_dict[fermat_path] suspicious_rays = rays.gone_through_extreme_points() num_suspicious_rays = suspicious_rays.sum() if num_suspicious_rays > 0: logger.warning( f"{num_suspicious_rays} rays of path {path.name} go through " "the interface limits. Extend limits." ) if turn_off_invalid_rays: rays_dict[fermat_path]._invalid_rays = rays.gone_through_interface( path.interfaces, walls ) if convert_to_fortran_order: old_rays_dict = rays_dict rays_dict = {k: v.to_fortran_order() for k, v in old_rays_dict.items()} # Save results in attribute path.rays: for path, fermat_path in zip(paths_list, fermat_paths_tuple): path.rays = rays_dict[fermat_path]
[docs] def ray_tracing( views_list, walls=None, turn_off_invalid_rays=False, convert_to_fortran_order=False ): """ Perform the ray tracing for different views. Save the result in ``Path.rays``. Parameters ---------- views : List[View] Returns ------- None """ # Ray tracing: paths_set = set(v.tx_path for v in views_list) | set(v.rx_path for v in views_list) return ray_tracing_for_paths( list(paths_set), walls=walls, turn_off_invalid_rays=turn_off_invalid_rays, convert_to_fortran_order=convert_to_fortran_order, )
@numba.jit(nopython=True, nogil=True, parallel=use_parallel) def _expand_rays(interior_indices, indices_new_interface, expanded_indices): """ Expand the rays by one interface knowing the beginning of the rays and the points the rays must go through at the last interface. A0, A1, ..., A(d+1) are (d+2) interfaces. n: number of points of interface A0 m: number of points of interface Ad p: number of points of interface A(d+1) Arrays layout must be contiguous. Output: out_ray Parameters ---------- interior_indices: *interior* indices of rays going from A(0) to A(d). Shape: (d, n, m) indices_new_interface: indices of the points of interface A(d) that the rays starting from A(0) cross to go to A(d+1). Shape: (n, p) expanded_indices: OUTPUT Shape (d+1, n, p) """ d, n, m = interior_indices.shape _, p = indices_new_interface.shape for i in numba.prange(n): for j in range(p): # get the point on interface A(d) to which the ray goes idx = indices_new_interface[i, j] # copy the head of ray for k in range(d): expanded_indices[k, i, j] = interior_indices[k, i, idx] # add the last point expanded_indices[d, i, j] = idx
[docs] class Rays: """ Rays(times, interior_indices, path) Store the rays between the first and last sets of points along a specific path. - n: number of points of the first set of points. - m: number of points of the last set of points. - d: number of interfaces along the path. We name A(1), A(2), ..., A(d) the d interfaces along the path. A ray passes A(1), A(2), ..., A(d) in this order. The ray (i, j) is defined as the ray starting in `A(1)[i]`` and arriving in ``A(d)[j]``. Parameters ---------- times : ndarray of floats [n x m] Shortest time between first and last set of points. ``times[i, j]`` is the total travel time for the ray (i, j). indices_interior : ndarray of floats [(d-2) x n x m] Indices of points through which each ray goes, excluding the first and last interfaces. ``indices[k-1, i, j]`` is the indice point of the ``k`` *interior* interface through which the ray (i,j) goes. fermat_path : FermatPath Sets of points crossed by the rays. order : None, 'C' or 'F' Force the order if the indices Attributes ---------- times indices_interior fermat_path indices : ndarray of floats [d x n x m] Indices of points through which each ray goes. For k=0:p, a ray starting from ``A(1)[i]`` and ending in ``A(d)[i]`` goes through the k-th interface at the point indexed by ``indices[k, i, j]``. By definition, ``indices[0, i, j] := i`` and ``indices[d-1, i, j] := j`` for all i and j. """ # __slots__ = [] def __init__( self, times, interior_indices, fermat_path, invalid_rays=None, order=None ): assert times.ndim == 2 assert interior_indices.ndim == 3 assert ( times.shape == interior_indices.shape[1:] == (len(fermat_path.points[0]), len(fermat_path.points[-1])) ) assert fermat_path.num_points_sets == interior_indices.shape[0] + 2 assert interior_indices.dtype.kind == "i" assert times.dtype.kind == "f" indices = self.make_indices(interior_indices, order=order) self._times = times self._indices = indices self._fermat_path = fermat_path self._invalid_rays = invalid_rays
[docs] @classmethod def make_rays_two_interfaces(cls, times, path, dtype_indices): """ Alternative constructor for Rays objects when there is only two interfaces, i.e. no interior interface. """ if path.num_points_sets != 2: raise ValueError( "This constructor works only for path with two interfaces. Use __init__ instead." ) n = len(path.points[0]) m = len(path.points[1]) interior_indices = np.zeros((0, n, m), dtype=dtype_indices) return cls(times, interior_indices, path)
@property def fermat_path(self): return self._fermat_path @property def times(self): return self._times @property def indices(self): return self._indices @property def interior_indices(self): return self.indices[1:-1, ...]
[docs] @staticmethod def make_indices(interior_indices, order=None): """ Parameters ---------- interior_indices : ndarray Shape (d, n, m) Returns ------- indices : ndarray Shape (n, m, d+2) such as: - indices[0, i, j] := i for all i, j - indices[-1, i, j] := j for all i, j - indices[k, i, j] := interior_indices[i, j, k+1] for all i, j and for k=1:(d-1) """ dm2, n, m = interior_indices.shape if order is None: if interior_indices.flags.c_contiguous: order = "C" elif interior_indices.flags.fortran: order = "F" else: order = "C" indices = np.zeros((dm2 + 2, n, m), dtype=interior_indices.dtype, order=order) indices[0, ...] = np.repeat(np.arange(n), m).reshape((n, m)) indices[-1, ...] = np.tile(np.arange(m), n).reshape((n, m)) indices[1:-1, ...] = interior_indices return indices
[docs] def get_coordinates(self, n_interface=None): """ Yields the coordinates of the rays of the n-th interface, as a tuple of three 2d ndarrays. Use numpy fancy indexing. Example ------- :: for (d, (x, y, z)) in enumerate(rays.get_coordinates()): # Coordinates at the d-th interface of the ray between ray A(1)[i] and # ray_A(d)[j]. x[i, j] y[i, j] z[i, j] """ if n_interface is None: for n_interface in range(self.fermat_path.num_points_sets): points = self.fermat_path.points[n_interface] indices = self.indices[n_interface, ...] x = points.x[indices] y = points.y[indices] z = points.z[indices] yield (x, y, z) else: points = self.fermat_path.points[n_interface] indices = self.indices[n_interface, ...] x = points.x[indices] y = points.y[indices] z = points.z[indices] yield (x, y, z)
[docs] def get_coordinates_one(self, start_index, end_index): """ Return the coordinates of one ray as ``Point``. This function is slow: use ``get_coordinates`` or a variant for treating a larger number of rays. """ indices = self.indices[:, start_index, end_index] num_points_sets = self.fermat_path.num_points_sets x = np.zeros(num_points_sets, s.FLOAT) y = np.zeros(num_points_sets, s.FLOAT) z = np.zeros(num_points_sets, s.FLOAT) for i, (points, j) in enumerate(zip(self.fermat_path.points, indices)): x[i] = points.x[j] y[i] = points.y[j] z[i] = points.z[j] return g.Points.from_xyz(x, y, z, "Ray")
[docs] def gone_through_extreme_points(self): """ Returns the rays which are going through at least one extreme point in the interfaces. These rays can be non physical, it is then safer to be conservative and remove them all. Extreme points are the first/last points (in indices) in the interfaces, except the first and last interfaces (respectively the points1 and the grid). Returns ------- out : ndarray of bool ``rays[i, j]`` is True if the rays starting from the i-th point of the first interface and going to the j-th point of the last interface is going through at least one extreme point through the middle interfaces. Order: same as attribute ``indices``. """ order = "F" if self.indices.flags.f_contiguous else "C" shape = self.indices.shape[1:] out = np.zeros(shape, order=order, dtype=bool) interior_indices = self.interior_indices middle_points = tuple(self.fermat_path.points)[1:-1] for d, points in enumerate(middle_points): np.logical_or(out, interior_indices[d, ...] == 0, out=out) np.logical_or(out, interior_indices[d, ...] == (len(points) - 1), out=out) return out
[docs] def gone_through_interface(self, path_interfaces, walls): """ Returns the rays which are going through at least one interface (i.e. in the line). Considering a convex geometry, these are rays which are physically blocked by the interface. Parameters ---------- path_interfaces : list[Interface] . walls : list[OrientedPoints] All of the walls in the geometry, passed in from ``examination_object.walls``. Returns ------- out : ndarray[bool] ``rays[i, j]`` is `True` if the ray starting from the i-th point of the first interface and going through the j-th point on the last interface is going through at least one other interface in the middle. """ order = "F" if self.indices.flags.f_contiguous else "C" shape = self.indices.shape[1:] out = np.zeros(shape, order=order, dtype=bool) if walls is not None: interface_names = [interface.points.name for interface in path_interfaces] loc_wrt_line = lambda a1, a2, y: ( (a2[0] - a1[0]) * (y[2] - a1[2]) - (y[0] - a1[0]) * (a2[2] - a1[2]) ) # Initialise but defined at end of first loop. last = None for d, coords in enumerate(self.get_coordinates()): coords = np.stack(coords) if d != 0: for name, wall in walls.items(): if (name in interface_names) or ( name.lower() == "frontwall" and "Probe" in interface_names ): continue wall = wall.points # Check if bounding boxes intersect. is_overlap = ( ( np.min((last[0, :, :], coords[0, :, :]), axis=0) <= np.max(wall[:, 0]) ) & ( np.max((last[0, :, :], coords[0, :, :]), axis=0) >= np.min(wall[:, 0]) ) & ( np.min((last[1, :, :], coords[1, :, :]), axis=0) <= np.max(wall[:, 1]) ) & ( np.max((last[1, :, :], coords[1, :, :]), axis=0) >= np.min(wall[:, 1]) ) & ( np.min((last[2, :, :], coords[2, :, :]), axis=0) <= np.max(wall[:, 2]) ) & ( np.max((last[2, :, :], coords[2, :, :]), axis=0) >= np.min(wall[:, 2]) ) ) # If any bboxes are overlapping, check the rest. if is_overlap.any(): # Prep to check if segments overlap for segment_start, segment_end in zip(wall[:-1], wall[1:]): whereis_last_wrt_wall = loc_wrt_line( segment_start, segment_end, last ) whereis_coords_wrt_wall = loc_wrt_line( segment_start, segment_end, coords ) whereis_wallmin_wrt_ray = loc_wrt_line( last, coords, segment_start ) whereis_wallmax_wrt_ray = loc_wrt_line( last, coords, segment_end ) ray_touches_or_crosses_wall = ( (whereis_last_wrt_wall < 0) ^ (whereis_coords_wrt_wall < 0) ) | ( ( np.abs(whereis_last_wrt_wall) < np.finfo(float).eps ) | ( np.abs(whereis_coords_wrt_wall) < np.finfo(float).eps ) ) wall_touches_or_crosses_ray = ( (whereis_wallmin_wrt_ray < 0) ^ (whereis_wallmax_wrt_ray < 0) ) | ( ( np.abs(whereis_wallmin_wrt_ray) < np.finfo(float).eps ) | ( np.abs(whereis_wallmax_wrt_ray) < np.finfo(float).eps ) ) is_intersection = ( is_overlap & ray_touches_or_crosses_wall & wall_touches_or_crosses_ray ) np.logical_or(out, is_intersection, out=out) # If no overlaps at all, return early as the above is quite slow. else: np.logical_or(out, is_overlap, out=out) last = coords return out
[docs] def to_fortran_order(self): """ Returns a Ray object with the . TFM objects except to have lookup times indexed by (grid_idx, probe_idx) whereas the arrays in this object are indexed by (probe_idx, grid_idx). By converting them to Fortran array, their transpose are C-contiguous and indexed as expected by TFM objects. Returns ------- Rays """ return self.__class__( np.asfortranarray(self.times), np.asfortranarray(self.interior_indices), self.fermat_path, self._invalid_rays, "F", )
[docs] @staticmethod def expand_rays(interior_indices, indices_new_interface): """ Expand the rays by one interface knowing the beginning of the rays and the points the rays must go through at the last interface. A0, A1, ..., A(d+1) are (d+2) interfaces. n: number of points of interface A0 m: number of points of interface Ad p: number of points of interface A(d+1) For more information on ``interior_indices``, see the documentation of ``Rays``. Parameters ---------- interior_indices: *interior* indices of rays going from A(0) to A(d). Shape: (d, n, m) indices_new_interface: indices of the points of interface A(d) that the rays starting from A(0) cross to go to A(d+1). Shape: (n, p) Returns ------- expanded_indices Shape (d+1, n, p) """ d, n, m = interior_indices.shape n_, p = indices_new_interface.shape if n != n_: raise ValueError("Inconsistent shapes") if d == 0: new_shape = (1, *indices_new_interface.shape) return indices_new_interface.reshape(new_shape) else: expanded_indices = np.empty((d + 1, n, p), dtype=interior_indices.dtype) _expand_rays(interior_indices, indices_new_interface, expanded_indices) return expanded_indices
[docs] def reverse(self, order="f"): """ Returns a new Rays object which corresponds to the reversed path. Parameters ---------- order : str Order of the arrays 'times' and 'indices'. Default: 'f' Returns ------- reversed_rays : Rays """ reversed_times = np.asarray(self.times.T, order=order) # Input x of shape (d, n, m) # Output y of shape(d, m, n) such as ``x[k, i, j] == y[d - k, j, i]`` reversed_indices = np.swapaxes(self.interior_indices, 1, 2) reversed_indices = reversed_indices[::-1, ...] reversed_indices = np.asarray(reversed_indices, order=order) reversed_path = self.fermat_path.reverse() return self.__class__(reversed_times, reversed_indices, reversed_path)
[docs] class FermatPath(tuple): """ FermatPath(points_and_speeds) This object contain the interface points through which the pass during the propagation and the speeds between the consecutive interfaces. This object should be used only for the internal plumbing of FermatSolver. This object can be obtained from a (smarter) :class:`Path` object via the class method :meth:`FermatPath.from_path`. A FermatPath must starts and ends with Points objects. Speeds (stored as float) and Points must alternate. Ex: FermatPath((points_1, speed_1_2, points_2, speed_2_3, points_3)) """ def __new__(cls, sequence): if len(sequence) % 2 == 0 or len(sequence) < 3: raise ValueError( f"{cls.__name__} expects a sequence of length odd and >= 5)" ) assert all(np.isfinite(sequence[1::2])), "nonfinite velocity" return super().__new__(cls, sequence)
[docs] @classmethod def from_path(cls, path): """ Create a FermatPath object from a (smarter) Path object. """ path_pieces = [] for interface, material, mode in zip( path.interfaces, path.materials, path.modes ): velocity = material.velocity(mode) path_pieces.append(interface.points) path_pieces.append(velocity) path_pieces.append(path.interfaces[-1].points) return cls(path_pieces)
def __repr__(self): return "{}({})".format( self.__class__.__name__, ", ".join([str(x) for x in self]) ) def __add__(self, tail): if self[-1] != tail[0]: raise ValueError("Cannot join two subpaths with different extremities.") return self.__class__((*self, *tail[1:]))
[docs] def reverse(self): return self.__class__(tuple(reversed(self)))
[docs] def split_head(self): """ Split a Path in two at the first interface: ``(points_1, speed_1_2, points_2)`` and ``(points_2, speed_2_3, ..., points_n)``. """ if len(self) < 5: raise ValueError("Not enough elements to split (min: 5)") head = self.__class__(self[:3]) tail = self.__class__(self[2:]) return head, tail
[docs] def split_queue(self): """ Split a Path in two at the last interface: ``(points_1, speed_1_2, ... points_n1)`` and ``(points_n1, speed, points_n)``. """ if len(self) < 5: raise ValueError("Not enough elements to split (min: 5)") head = self.__class__(self[:-2]) tail = self.__class__(self[-3:]) return head, tail
@property def points(self): """ Returns all the Points objects in Path as a tuple. """ return tuple(self[0::2]) @property def velocities(self): return tuple(self[1::2]) @property def num_points_sets(self): return len(self) // 2 + 1 @property def len_largest_interface(self): """ Excluse first and last dataset """ all_points = tuple(self.points) interfaces = all_points[1:-1] if not interfaces: return 0 else: return max([len(x) for x in interfaces])
[docs] class FermatSolver: """ Solver: take as input the interfaces, give as output the ray paths. General usage: instantiate object, then call method ``solve`` (or ``solve_no_clean`` to keep intermediary results). Results are stored in attributes ``res``. Parameters ---------- paths : set of FermatPath Paths which will be solved. Solving several paths at a time allows an efficient caching. dtype : numpy.dtype Datatype for times and distances. Optional, default: settings.FLOAT dtype_indices : numpy.dtype Datatype for indices. Optional, default: use the smallest unsigned integers that fits. Attributes ---------- res : dictionary Rays stored as ``Rays`` objects, indexed by the ``paths``. paths Cf. above. dtype Cf. above. dtype_indices Cf. above. cached_distance : dict Keys: tuple of Points (points1, points2). Values: euclidean distance between all points of 'points1' and all points of 'points2'. cached_result : dict Keys: Path. Values: _FermatSolverResult """ def __init__(self, fermat_paths_set, dtype=None, dtype_indices=None): if dtype is None: dtype = s.FLOAT if dtype_indices is None: dtype_indices = s.INT for path in fermat_paths_set: try: hash(path) except TypeError as e: raise TypeError("Path must be hashable.") from e self.dtype = dtype self.dtype_indices = dtype_indices self.clear_cache() self.res = {} self.paths = fermat_paths_set self.num_minimization = 0 self.num_euc_distance = 0
[docs] @classmethod def from_views(cls, views_list, dtype=None, dtype_indices=None): """ Create a FermatSolver from a list of views (alternative constructor). Parameters ---------- views : list of Views dtype : numpy.dtype or None dtype_indices : numpy.dtype or None Returns ------- """ paths = set( path for v in views_list for path in (v.tx_path.to_fermat_path(), v.rx_path.to_fermat_path()) ) return cls(paths, dtype=dtype, dtype_indices=dtype_indices)
[docs] def solve(self): """ Compute the rays for all paths and store them in ``self.res``. """ self.solve_no_clean() self.clear_cache() return self.res
[docs] def solve_no_clean(self): """ Compute the rays for all paths and store them in ``self.res``. """ tic = time.perf_counter() for path in self.paths: self.res[path] = self._solve(path) toc = time.perf_counter() logger.info(f"Ray tracing: solved all in {toc - tic:.3g}s") return self.res
def _solve(self, path): """ Returns the rays starting from the first interface and last interface of ``path``. This function is recursive. Intermediate results are stored in self.cached_result and self.cached_distance. Warning: it is not safe to call this with a Path not passed to __init__ because of possible overflows. Returns ------- res : Rays """ if path in self.cached_result: # Cache hits, hourray return self.cached_result[path] # Special case if we have only two (consecutive) boundaries: if len(path) == 3: return self.consecutive_times(path) # General case: compute by calling _solve() recursively: head, tail = path.split_queue() res_head = self._solve(head) res_tail = self._solve(tail) assert isinstance(res_head, Rays) assert isinstance(res_tail, Rays) self.num_minimization += 1 logger.debug(f"Ray tracing: solve for subpaths {str(head)} and {str(tail)}") times, indices_at_interface = find_minimum_times( res_head.times, res_tail.times, dtype=self.dtype, dtype_indices=self.dtype_indices, ) assert res_tail.fermat_path.num_points_sets == 2 indices = Rays.expand_rays(res_head.interior_indices, indices_at_interface) del indices_at_interface # no more useful res = Rays(times, indices, path) self.cached_result[path] = res return res
[docs] def clear_cache(self): self.cached_distance = Cache() self.cached_result = Cache() gc.collect() # force the garbage collector to delete unreferenced objects
[docs] def consecutive_times(self, path): """Computes the rays between two consecutive sets of points. This is straight forward: each ray is a straight line; ray lengths are obtained by taking the Euclidean distances between points. Cache the distance array in the two directions: points1 to points2, points 2 to points1. Returns a ``Rays`` object. """ points1, speed, points2 = path key = (points1, points2) try: distance = self.cached_distance[key] except KeyError: self.num_euc_distance += 1 distance = g.distance_pairwise(points1, points2, dtype=self.dtype) rkey = (points1, points2) self.cached_distance[key] = distance if key != rkey: # if points1 and points2 are the same! self.cached_distance[rkey] = distance.T return Rays.make_rays_two_interfaces(distance / speed, path, self.dtype_indices)
@numba.vectorize( ["float64(float64, float64)", "float32(float32, float32)"], nopython=True, target="parallel", ) def _signed_leg_angle(polar, azimuth): pi2 = np.pi / 2 if -pi2 < azimuth <= polar: return polar else: return -polar def _to_readonly(x): if isinstance(x, np.ndarray): x.flags.writeable = False return elif isinstance(x, g.Points): x.coords.flags.writeable = False return elif x is None: return else: try: xlist = list(x) except TypeError: raise TypeError(f"unhandled type: {type(x)}") else: for x in xlist: _to_readonly(x) def _cache_ray_geometry(user_func): """ Cache decorator companion for RayGeometry. Numpy arrays are set to read-only to avoid mistakes. Parameters ---------- user_func Returns ------- """ def wrapper(self, interface_idx, is_final=True): """ Parameters ---------- self : RayGeometry interface_idx : int is_final : bool Returns ------- """ actual_interface_idx = self._interface_indices[interface_idx] key = f"{user_func.__name__}:{actual_interface_idx}" try: # Cache hit res = self._cache[key] except KeyError: pass else: if is_final: # Promote to final result if necessary self._final_keys.add(key) return res # Cache miss: compute and save result res = user_func(self, interface_idx=interface_idx) # Save in cache: _to_readonly(res) self._cache[key] = res if is_final: self._final_keys.add(key) return res return wrapper
[docs] class RayGeometry: """ RayGeometry computes the leg sizes and various angles in rays. RayGeometry uses an internal cache during the computation which reduces the computational time. The cache has two levels: intermediate results and final results. """ def __init__(self, interfaces, rays, use_cache=True): """ Parameters ---------- interfaces : Sized of Interface rays : Rays use_cache : bool Default: True """ numinterfaces = len(interfaces) self.interfaces = interfaces self.rays = rays assert rays.fermat_path.points == tuple( i.points for i in interfaces ), "Inconsistent rays and interfaces" # self.legs = [] * path.numlegs # self.incoming_legs = [None] + [] * path.numlegs # self.outgoing_legs = [] * (path.numlegs - 1) + [None] # Used for caching via _cache_ray_geometry wrapper self._use_cache = use_cache self._cache = Cache() if use_cache else NoCache() self._final_keys = set() self._interface_indices = tuple(range(numinterfaces))
[docs] @classmethod def from_path(cls, path, use_cache=True): """ Parameters ---------- path : Path use_cache : bool Optional Returns ------- """ if path.rays is None: raise ValueError("Rays must be computed first.") return cls(path.interfaces, path.rays, use_cache=use_cache)
@property def numinterfaces(self): return len(self.interfaces) @property def numlegs(self): return len(self.interfaces) - 1
[docs] @contextlib.contextmanager def precompute(self): """ Context manager for cleaning intermediate results after execution. Example ------- >>> ray_geometry = RayGeometry.from_path(path) >>> with ray_geometry.precompute(): ... ray_geometry.inc_angle(1) ... ray_geometry.signed_inc_angle(1) ... # At this stage, the two results are stored in cache and the intermadiate ... # results are discarded. >>> ray_geometry.inc_angle(1) # fetch from cache """ if not self._use_cache: warnings.warn( "Caching is not enabled therefore precompute() will not work.", ArimWarning, stacklevel=2, ) yield self.clear_intermediate_results()
[docs] @_cache_ray_geometry def leg_points(self, interface_idx): """ Returns the coordinates (x, y, z) in the GCS of the points crossed by the rays at a given interface. ``points[i, j]`` contains the (x, y, z) coordinates of the points through which the ray (i, j) goes at the interface of index ``interface_idx``. Parameters ---------- interface_idx : int Returns ------- points : Points """ interfaces = self.interfaces points = interfaces[interface_idx].points legs_points = points.coords.take(self.rays.indices[interface_idx], axis=0) return g.aspoints(legs_points)
[docs] @_cache_ray_geometry def orientations_of_legs_points(self, interface_idx): """ ``orientations[i, j]`` is the basis (3x3 orthogonal matrix) attached to the point through which the ray (i, j) goes at the interface of index ``interface_idx``. Parameters ---------- interface_idx Returns ------- orientations : Points """ orientations_all_points = self.interfaces[interface_idx].orientations rays_indices = self.rays.indices[interface_idx] orientations = orientations_all_points.coords.take(rays_indices, axis=0) return g.aspoints(orientations)
[docs] def clear_intermediate_results(self): """ Clear cache containing intermediate (non-final) results. """ keys_to_flush = [key for key in self._cache if key not in self._final_keys] for key in keys_to_flush: self._cache.pop(key, None)
[docs] def clear_all_results(self): """ Clear the whole cache (intermediate and final results). """ self._cache = self._cache.__class__() self._final_keys = set()
[docs] @_cache_ray_geometry def inc_leg_size(self, interface_idx): """ Compute the size of the incoming leg. Gives the same result as :meth:`RayGeometry.inc_leg_radius` but does not rely on the spherical coordinates, therefore this method is faster when spherical coordinates are not required elsewhere. Parameters ---------- interface_idx Returns ------- """ if interface_idx == 0: return None legs_starts = self.leg_points(interface_idx - 1, is_final=False).coords legs_ends = self.leg_points(interface_idx, is_final=False).coords legs = g.Points(legs_starts - legs_ends) return legs.norm2()
[docs] @_cache_ray_geometry def inc_leg_cartesian(self, interface_idx): """ Returns the coordinates of the source points of the legs that arrive at the interface of index ``interface_idx``. The coordinates of each leg are given in the Cartesian coordinate system attached to the its end point (i.e. point on interface interface_idx). No incoming legs towards the first interface. Parameters ---------- interface_idx : int Returns ------- points : Points or None None for the first interface. """ if interface_idx == 0: return None legs_starts = self.leg_points(interface_idx - 1, is_final=False).coords legs_ends = self.leg_points(interface_idx, is_final=False).coords orientations = self.orientations_of_legs_points( interface_idx, is_final=False ).coords legs_local = g.from_gcs(legs_starts, orientations, legs_ends) return g.aspoints(legs_local)
[docs] @_cache_ray_geometry def inc_leg_radius(self, interface_idx): """ Returns the radius (length) of the leg incoming to interface interface_idx. Use spherical coordinate system attached to the end point of the leg. Parameters ---------- interface_idx : int Returns ------- ndarray """ cartesian = self.inc_leg_cartesian(interface_idx, is_final=False) if cartesian is None: return None return g.spherical_coordinates_r(cartesian.x, cartesian.y, cartesian.z)
[docs] @_cache_ray_geometry def inc_leg_polar(self, interface_idx): """ Returns the polar angle in [0, pi] of the leg incoming to interface interface_idx. Use spherical coordinate system attached to the end point of the leg. Parameters ---------- interface_idx : int Returns ------- ndarray """ cartesian = self.inc_leg_cartesian(interface_idx, is_final=False) if cartesian is None: return None radius = self.inc_leg_radius(interface_idx, is_final=False) return g.spherical_coordinates_theta(cartesian.z, radius)
[docs] @_cache_ray_geometry def inc_leg_azimuth(self, interface_idx): """ Returns the corrected polar angle in [0, pi] of the leg incoming to interface interface_idx. Corresponds to the polar angle if the normals of the interface are on the same side as the incoming legs, pi/2 minus polar angle otherwise. Use spherical coordinate system attached to the end point of the leg. Parameters ---------- interface_idx : int Returns ------- ndarray """ cartesian = self.inc_leg_cartesian(interface_idx, is_final=False) if cartesian is None: return None return g.spherical_coordinates_phi(cartesian.x, cartesian.y)
[docs] @_cache_ray_geometry def inc_angle(self, interface_idx): return self.inc_leg_polar(interface_idx, is_final=False)
[docs] @_cache_ray_geometry def signed_inc_angle(self, interface_idx): azimuth = self.inc_leg_azimuth(interface_idx, is_final=False) if azimuth is None: return None polar = self.inc_leg_polar(interface_idx, is_final=False) return _signed_leg_angle(polar, azimuth)
[docs] @_cache_ray_geometry def conventional_inc_angle(self, interface_idx): if interface_idx == 0: return None are_normals_on_inc_rays_side = self.interfaces[ interface_idx ].are_normals_on_inc_rays_side if are_normals_on_inc_rays_side is None: raise ValueError( "Attribute are_normals_on_inc_rays_side must be set for the" f"interface {interface_idx}." ) elif are_normals_on_inc_rays_side: return self.inc_leg_polar(interface_idx, is_final=False) else: # out = pi - theta out = self.inc_leg_polar(interface_idx, is_final=False).copy() out[...] *= -1 out[...] += np.pi return out
[docs] @_cache_ray_geometry def out_leg_cartesian(self, interface_idx): """ Returns the coordinates of the destination points of the legs that start from the interface of index ``interface_idx``. The coordinates of each leg are given in the Cartesian coordinate system attached to the its start point (i.e. point on interface interface_idx). No outgoing legs from the last interface. Parameters ---------- interface_idx : int Returns ------- points : Points or None None for the last interface. """ actual_interface_idx = self._interface_indices[interface_idx] if actual_interface_idx == (self.numinterfaces - 1): # No outgoing legs from the last interface return None legs_starts = self.leg_points(interface_idx, is_final=False).coords legs_ends = self.leg_points(interface_idx + 1, is_final=False).coords orientations = self.orientations_of_legs_points( interface_idx, is_final=False ).coords # Convert legs in the local coordinate systems. legs_local = g.from_gcs(legs_ends, orientations, legs_starts) return g.aspoints(legs_local)
[docs] @_cache_ray_geometry def out_leg_radius(self, interface_idx): """ Returns the radius (length) of the leg outgoing from interface interface_idx. Use spherical coordinate system attached to the end point of the leg. Parameters ---------- interface_idx : int Returns ------- ndarray """ cartesian = self.out_leg_cartesian(interface_idx, is_final=False) if cartesian is None: return None return g.spherical_coordinates_r(cartesian.x, cartesian.y, cartesian.z)
[docs] @_cache_ray_geometry def out_leg_polar(self, interface_idx): """ Returns the polar angle in [0, pi] of the leg outgoing from interface interface_idx. Use spherical coordinate system attached to the end point of the leg. Parameters ---------- interface_idx : int Returns ------- ndarray """ cartesian = self.out_leg_cartesian(interface_idx, is_final=False) if cartesian is None: return None radius = self.out_leg_radius(interface_idx, is_final=False) return g.spherical_coordinates_theta(cartesian.z, radius)
[docs] @_cache_ray_geometry def out_leg_azimuth(self, interface_idx): """ Returns the corrected polar angle in [0, pi] of the leg outgoing from interface interface_idx. Corresponds to the polar angle if the normals of the interface are on the same side as the outgoing legs, pi/2 minus polar angle otherwise. Use spherical coordinate system attached to the end point of the leg. Parameters ---------- interface_idx : int Returns ------- ndarray """ cartesian = self.out_leg_cartesian(interface_idx, is_final=False) if cartesian is None: return None return g.spherical_coordinates_phi(cartesian.x, cartesian.y)
[docs] @_cache_ray_geometry def out_angle(self, interface_idx): return self.out_leg_polar(interface_idx, is_final=False)
[docs] @_cache_ray_geometry def signed_out_angle(self, interface_idx): azimuth = self.out_leg_azimuth(interface_idx, is_final=False) if azimuth is None: return None polar = self.out_leg_polar(interface_idx, is_final=False) return _signed_leg_angle(polar, azimuth)
[docs] @_cache_ray_geometry def conventional_out_angle(self, interface_idx): actual_interface_idx = self._interface_indices[interface_idx] if actual_interface_idx == (self.numinterfaces - 1): return None are_normals_on_out_rays_side = self.interfaces[ interface_idx ].are_normals_on_out_rays_side if are_normals_on_out_rays_side is None: raise ValueError( "Attribute are_normals_on_out_rays_side must be set for the" f"interface {interface_idx}." ) elif are_normals_on_out_rays_side: return self.out_leg_polar(interface_idx, is_final=False) else: # out = pi - theta out = self.out_leg_polar(interface_idx, is_final=False).copy() out[...] *= -1 out[...] += np.pi return out