Source code for arim.im.das

"""
Delay-and-sum functions

Contains different implementation of delay-and-sum algorithm.

.. currentmodule:: arim.im.das

Examples
--------

::

    res = das.delay_and_sum(frame, focal_law, fillvalue=0.0)
    res = das.delay_and_sum(frame, focal_law, fillvalue=np.nan)
    res = das.delay_and_sum(frame, focal_law, interpolation="nearest")
    res = das.delay_and_sum(frame, focal_law, interpolation="nearest", aggregation="median")
    res = das.delay_and_sum(frame, focal_law, interpolation="nearest", aggregation=("huber", 1.5))
    res = das.delay_and_sum(frame, focal_law, interpolation="linear")
    res = das.delay_and_sum(frame, focal_law, interpolation=("lanczos", 3))
    res = das.delay_and_sum(frame, focal_law, interpolation=("lanczos", 3), aggregation="median")


Data structures
---------------

- ``lookup_times_tx``: ndarray of shape (numgridpoints, numtx)
- ``lookup_times_rx``: ndarray of shape (numgridpoints, numrx)
- ``amplitudes_tx``: ndarray of shape (numgridpoints, numtx)
- ``amplitudes_tx``: ndarray of shape (numgridpoints, numrx)
- ``amplitudes``: TxRxAmplitudes or ndarray (numgridpoints, numtimetraces) or None

"""

import logging
import math
import os

import numba
import numpy as np

from . import geomed, huber

logger = logging.getLogger(__name__)

use_parallel = os.environ.get("ARIM_USE_PARALLEL", not numba.core.config.IS_32BITS)


class NotImplementedTyping(NotImplementedError, TypeError):
    pass


def _check_shapes(frame, focal_law):
    numtimetraces = frame.numtimetraces
    numpoints, numtx = focal_law.lookup_times_tx.shape
    _, numrx = focal_law.lookup_times_rx.shape

    if focal_law.amplitudes is not None:
        assert focal_law.amplitudes.amplitudes_tx.shape == (numpoints, numtx)
        assert focal_law.amplitudes.amplitudes_rx.shape == (numpoints, numrx)
        assert focal_law.amplitudes.amplitudes_tx.flags.c_contiguous
        assert focal_law.amplitudes.amplitudes_rx.flags.c_contiguous

    assert frame.tx.shape == (numtimetraces,)
    assert frame.rx.shape == (numtimetraces,)

    assert focal_law.lookup_times_tx.flags.c_contiguous
    assert focal_law.lookup_times_rx.flags.c_contiguous
    assert frame.timetraces.flags.c_contiguous
    assert frame.tx.flags.c_contiguous
    assert frame.rx.flags.c_contiguous


[docs] def delay_and_sum_numba( frame, focal_law, fillvalue=0.0, interpolation="nearest", aggregation="mean", result=None, ): """ Delay-and-sum function for non-uniform amplitudes Parameters ---------- frame : Frame focal_law : FocalLaw fillvalue : float Default: 0. interpolation : str Interpolation of timetraces between samples. "linear", "nearest" aggregation : str Only "mean" supported. result : ndarray Write on it if provided. Returns ------- result : ndarray (numpoints, ) """ # numtimetraces = frame.numtimetraces numpoints, _ = focal_law.lookup_times_tx.shape _check_shapes(frame, focal_law) weighted_timetraces = focal_law.weigh_timetraces(frame.timetraces) dtype_float, dtype_amp, dtype_data = _infer_datatypes( weighted_timetraces, focal_law, result ) aggregation = aggregation.lower() if aggregation != "mean": raise NotImplementedError if result is None: result = np.full((numpoints,), 0, dtype=dtype_data) assert result.shape == (numpoints,) if interpolation.lower() == "nearest": delay_and_sum_function = _delay_and_sum_amplitudes_nearest elif interpolation.lower() == "linear": delay_and_sum_function = _delay_and_sum_amplitudes_linear else: raise ValueError("invalid 'interpolation' argument") logger.debug(f"Delay-and-sum function: {delay_and_sum_function.__name__}") delay_and_sum_function( weighted_timetraces, frame.tx, frame.rx, focal_law.lookup_times_tx, focal_law.lookup_times_rx, focal_law.amplitudes.amplitudes_tx, focal_law.amplitudes.amplitudes_rx, frame.time.step, frame.time.start, fillvalue, result, ) return result
def _infer_datatypes( timetraces, focal_law, result, dtype_float=None, dtype_amp=None, dtype_data=None ): """ Returns ------- dtype_float: datatype of times only values (time) dtype_amp: datatype of TFM amplitudes dtype_data: datatype that suits both the input timetraces and the results """ if dtype_float is None: dtype_float = np.result_type( focal_law.lookup_times_tx, focal_law.lookup_times_rx ) if dtype_amp is None: if focal_law.amplitudes is None: has_amp = False dtype_amp = None else: has_amp = True dtype_amp = focal_law.amplitudes.dtype if dtype_data is None: if has_amp: data_arrays = [timetraces, dtype_amp] else: data_arrays = [timetraces] if result is not None: data_arrays.append(result) dtype_data = np.result_type(*data_arrays) return dtype_float, dtype_amp, dtype_data @numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_amplitudes_nearest( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, amplitudes_tx, amplitudes_rx, dt, t0, fillvalue, result, ): """ Numba implementation of the delay and sum algorithm, using nearest time point match. Parameters ---------- weighted_timetraces : ndarray [numtimetraces x numsamples] lookup_times_tx : ndarray [numpoints x numtx] Times of flight (floats) between the transmitters and the grid points. lookup_times_rx : ndarray [numpoints x numrx] Times of flight (floats) between the grid points and the receivers. amplitudes_tx : ndarray [numpoints x numtx] amplitudes_rx : ndarray [numpoints x numrx] result : ndarray [numpoints] Result. tx, rx : ndarray [numtimetraces] Mapping between the timetraces and the transmitter/receiver. Values: integers in [0, numtx[ and [0, numrx] respectively Returns ------- None """ numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): res_tmp = 0.0 for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index = round((lookup_time - t0) / dt) if lookup_index < 0 or lookup_index >= numsamples: res_tmp += fillvalue else: res_tmp += ( amplitudes_tx[point, tx[scan]] * amplitudes_rx[point, rx[scan]] * weighted_timetraces[scan, lookup_index] ) result[point] = res_tmp / numtimetraces @numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_amplitudes_linear( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, amplitudes_tx, amplitudes_rx, dt, t0, fillvalue, result, ): """ Numba implementation of the delay and sum algorithm, using linear interpolation for time point. Parameters ---------- weighted_timetraces : ndarray [numtimetraces x numsamples] lookup_times_tx : ndarray [numpoints x numtx] Times of flight (floats) between the transmitters and the grid points. lookup_times_rx : ndarray [numpoints x numrx] Times of flight (floats) between the grid points and the receivers. amplitudes_tx : ndarray [numpoints x numtx] amplitudes_rx : ndarray [numpoints x numrx] result : ndarray [numpoints] Result. tx, rx : ndarray [numtimetraces] Mapping between the timetraces and the transmitter/receiver. Values: integers in [0, numtx[ and [0, numrx] respectively Returns ------- None """ numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): res_tmp = 0.0 for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) loc1 = (lookup_time - t0) / dt lookup_index = int(loc1) frac1 = loc1 - lookup_index lookup_index1 = lookup_index + 1 if lookup_index < 0 or lookup_index1 >= numsamples: res_tmp += fillvalue else: lscanVal = weighted_timetraces[scan, lookup_index] lscanVal1 = weighted_timetraces[scan, lookup_index1] lscanUseVal = lscanVal + frac1 * (lscanVal1 - lscanVal) res_tmp += ( amplitudes_tx[point, tx[scan]] * amplitudes_rx[point, rx[scan]] * lscanUseVal ) result[point] = res_tmp / numtimetraces @numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_amplitudes_linear( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, amplitudes_tx, amplitudes_rx, dt, t0, fillvalue, result, ): """ Numba implementation of the delay and sum algorithm, using linear interpolation for time point. Parameters ---------- weighted_timetraces : ndarray [numtimetraces x numsamples] lookup_times_tx : ndarray [numpoints x numtx] Times of flight (floats) between the transmitters and the grid points. lookup_times_rx : ndarray [numpoints x numrx] Times of flight (floats) between the grid points and the receivers. amplitudes_tx : ndarray [numpoints x numtx] amplitudes_rx : ndarray [numpoints x numrx] result : ndarray [numpoints] Result. tx, rx : ndarray [numtimetraces] Mapping between the timetraces and the transmitter/receiver. Values: integers in [0, numtx[ and [0, numrx] respectively Returns ------- None """ numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): res_tmp = 0.0 for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) loc1 = (lookup_time - t0) / dt lookup_index = int(loc1) frac1 = loc1 - lookup_index lookup_index1 = lookup_index + 1 if lookup_index < 0 or lookup_index1 >= numsamples: res_tmp += fillvalue else: lscanVal = weighted_timetraces[scan, lookup_index] lscanVal1 = weighted_timetraces[scan, lookup_index1] lscanUseVal = lscanVal + frac1 * (lscanVal1 - lscanVal) res_tmp += ( amplitudes_tx[point, tx[scan]] * amplitudes_rx[point, rx[scan]] * lscanUseVal ) result[point] = res_tmp / numtimetraces
[docs] def delay_and_sum_numba_noamp( frame, focal_law, fillvalue=0.0, interpolation="nearest", aggregation="mean", result=None, ): """ Delay and sum with uniform amplitudes Parameters ---------- frame focal_law fillvalue interpolation : str Interpolation of timetraces between samples. 'linear', 'nearest', ('lanczos', a) result Returns ------- result : ndarray (numpoints, ) """ numpoints, _ = focal_law.lookup_times_tx.shape _check_shapes(frame, focal_law) weighted_timetraces = focal_law.weigh_timetraces(frame.timetraces) dtype_float, dtype_amp, dtype_data = _infer_datatypes( weighted_timetraces, focal_law, result ) if result is None: result = np.full((numpoints,), 0, dtype=dtype_data) assert result.shape == (numpoints,) invdt = 1 / frame.time.step t0 = frame.time.start if isinstance(interpolation, str): interpolation_name = interpolation.lower() interpolation_args = () else: interpolation_name = interpolation[0].lower() interpolation_args = interpolation[1:] if isinstance(aggregation, str): aggregation_name = aggregation.lower() aggregation_args = () else: aggregation_name = aggregation[0].lower() aggregation_args = aggregation[1:] if aggregation_name == "mean": if interpolation_name == "nearest": das_func = _delay_and_sum_noamp assert len(interpolation_args) == 0 elif interpolation_name == "linear": das_func = _delay_and_sum_noamp_linear assert len(interpolation_args) == 0 elif interpolation_name == "lanczos": das_func = _delay_and_sum_noamp_lanczos assert len(interpolation_args) == 1 else: raise ValueError("invalid interpolation") elif aggregation_name == "median": if dtype_data != np.complex128: raise NotImplementedTyping if interpolation_name == "lanczos": das_func = _delay_and_sum_noamp_median_lanczos assert len(interpolation_args) == 1 elif interpolation_name == "nearest": das_func = _delay_and_sum_noamp_median_nearest assert len(interpolation_args) == 0 else: raise NotImplementedError elif aggregation_name == "huber": if dtype_data != np.complex128: raise NotImplementedTyping if interpolation_name == "lanczos": das_func = _delay_and_sum_noamp_huber_lanczos assert len(interpolation_args) == 1 else: raise NotImplementedError das_func( weighted_timetraces, frame.tx, frame.rx, focal_law.lookup_times_tx, focal_law.lookup_times_rx, invdt, t0, fillvalue, *interpolation_args, *aggregation_args, result, ) return result
# todo: add cache=True if it becomes compatible with parallel=True (numba) @numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_noamp( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, invdt, t0, fillvalue, result, ): # Mean aggregation, nearest interpolation numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): res_tmp = 0.0 for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index = round((lookup_time - t0) * invdt) if lookup_index < 0 or lookup_index >= numsamples: res_tmp += fillvalue else: res_tmp += weighted_timetraces[scan, lookup_index] result[point] = res_tmp / numtimetraces @numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_noamp_median_nearest( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, invdt, t0, fillvalue, result, ): numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): datapoints = np.empty(numtimetraces, weighted_timetraces.dtype) for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index = round((lookup_time - t0) * invdt) if lookup_index < 0 or lookup_index >= numsamples: datapoints[scan] = fillvalue else: datapoints[scan] = weighted_timetraces[scan, lookup_index] # I don't know how to statically cast complex64 to float32, and # complex128 to float64 :( # Have to impose dtype meanwhile. res, _ = geomed.geomed(datapoints.view(np.float64).reshape((numtimetraces, 2))) result[point] = res.view(np.complex128)[0] @numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_noamp_linear( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, invdt, t0, fillvalue, result, ): numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): res_tmp = 0.0 for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index_exact = (lookup_time - t0) * invdt lookup_index_left = int(lookup_index_exact) lookup_index_right = lookup_index_left + 1 frac = lookup_index_exact - lookup_index_left if lookup_index_left < 0 or lookup_index_right >= numsamples: res_tmp += fillvalue else: scan_val_left = weighted_timetraces[scan, lookup_index_left] scan_val_right = weighted_timetraces[scan, lookup_index_right] res_tmp += (1 - frac) * scan_val_left + frac * scan_val_right result[point] = res_tmp / numtimetraces
[docs] @numba.jit(nopython=True, cache=True, fastmath=True) def sinc(x): if x == 0: return 1.0 else: return math.sin(math.pi * x) / (math.pi * x)
[docs] @numba.jit(nopython=True, cache=True, fastmath=True) def lanczos_interpolation(t, x, a): i_min = math.floor(t) - a + 1 i_max = math.floor(t) + a + 1 # +1 because of how range() works n = len(x) out = 0.0 for i in range(i_min, i_max): out += x[i % n] * sinc(t - i) * sinc((t - i) / a) return out
@numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_noamp_lanczos( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, invdt, t0, fillvalue, a, result, ): """ Delay and sum with Lanczos interpolation with factor 'a' of the timetraces. https://en.wikipedia.org/wiki/Lanczos_resampling """ numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): res_tmp = 0.0 for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index = (lookup_time - t0) * invdt if lookup_index < 0 or lookup_index >= numsamples: res_tmp += fillvalue else: res_tmp += lanczos_interpolation( lookup_index, weighted_timetraces[scan], a ) result[point] = res_tmp / numtimetraces @numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_noamp_median_lanczos( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, invdt, t0, fillvalue, a, result, ): numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): datapoints = np.empty(numtimetraces, weighted_timetraces.dtype) for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index = (lookup_time - t0) * invdt if lookup_index < 0 or lookup_index >= numsamples: datapoints[scan] = fillvalue else: datapoints[scan] = lanczos_interpolation( lookup_index, weighted_timetraces[scan], a ) # I don't know how to statically cast complex64 to float32, and # complex128 to float64 :( # Have to impose dtype meanwhile. res, _ = geomed.geomed(datapoints.view(np.float64).reshape((numtimetraces, 2))) result[point] = res.view(np.complex128)[0] @numba.jit(nopython=True, nogil=True, parallel=use_parallel, fastmath=True) def _delay_and_sum_noamp_huber_lanczos( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, invdt, t0, fillvalue, a, tau, result, ): numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in numba.prange(numpoints): datapoints = np.empty(numtimetraces, weighted_timetraces.dtype) for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index = (lookup_time - t0) * invdt if lookup_index < 0 or lookup_index >= numsamples: datapoints[scan] = fillvalue else: datapoints[scan] = lanczos_interpolation( lookup_index, weighted_timetraces[scan], a ) # I don't know how to statically cast complex64 to float32, and # complex128 to float64 :( # Have to impose dtype meanwhile. res, _ = huber.huber_m_estimate( datapoints.view(np.float64).reshape((numtimetraces, 2)), tau ) result[point] = res.view(np.complex128)[0] @numba.jit(nopython=True, nogil=True, cache=True, fastmath=True) def _general_delay_and_sum_nearest( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, amplitudes, dt, t0, fillvalue, result, ): """ Numba implementation of the delay and sum algorithm using nearest interpolation for time point. Amplitudes are defined per timetrace instead of per element. This function is therefore more general but more memory-hungry. One amplitude per timetrace. Parameters ---------- weighted_timetraces : ndarray [numtimetraces x numsamples] lookup_times_tx : ndarray [numpoints x numtx] Times of flight (floats) between the transmitters and the grid points. lookup_times_rx : ndarray [numpoints x numrx] Times of flight (floats) between the grid points and the receivers. amplitudes : ndarray [numpoints x numtimetraces] result : ndarray [numpoints] Result. tx, rx : ndarray [numtimetraces] Mapping between the timetraces and the transmitter/receiver. Values: integers in [0, numtx[ and [0, numrx] Returns ------- None """ numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in range(numpoints): if np.isnan(result[point]): continue for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index = int((lookup_time - t0) / dt + 0.5) if lookup_index < 0 or lookup_index >= numsamples: result[point] += fillvalue else: result[point] += ( amplitudes[point, scan] * weighted_timetraces[scan, lookup_index] ) result[point] /= numtimetraces @numba.jit(nopython=True, nogil=True, cache=True, fastmath=True) def _general_delay_and_sum_linear( weighted_timetraces, tx, rx, lookup_times_tx, lookup_times_rx, amplitudes, dt, t0, fillvalue, result, ): """ Numba implementation of the delay and sum algorithm, using linear interpolation for time point. Amplitudes are defined per timetrace instead of per element. This function is therefore more general but more memory-hungry. One amplitude per timetrace. Parameters ---------- weighted_timetraces : ndarray [numtimetraces x numsamples] lookup_times_tx : ndarray [numpoints x numtx] Times of flight (floats) between the transmitters and the grid points. lookup_times_rx : ndarray [numpoints x numrx] Times of flight (floats) between the grid points and the receivers. amplitudes : ndarray [numpoints x numtimetraces] result : ndarray [numpoints] Result. tx, rx : ndarray [numtimetraces] Mapping between the timetraces and the transmitter/receiver. Values: integers in [0, numtx[ and [0, numrx] Returns ------- None """ numtimetraces, numsamples = weighted_timetraces.shape numpoints, _ = lookup_times_tx.shape for point in range(numpoints): if np.isnan(result[point]): continue for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) loc1 = (lookup_time - t0) / dt lookup_index = int(loc1) frac1 = loc1 - lookup_index lookup_index1 = lookup_index + 1 if lookup_index < 0 or lookup_index1 >= numsamples: result[point] += fillvalue else: lscanVal = weighted_timetraces[scan, lookup_index] lscanVal1 = weighted_timetraces[scan, lookup_index1] lscanUseVal = lscanVal + frac1 * (lscanVal1 - lscanVal) result[point] += amplitudes[point, scan] * lscanUseVal result[point] /= numtimetraces
[docs] def delay_and_sum_naive( frame, focal_law, fillvalue=0.0, result=None, interpolate_position="nearest" ): """ Pure-Python implementation of delay and sum. This is a very slow implementation, use for test only. """ numtimetraces = frame.numtimetraces numpoints, _ = focal_law.lookup_times_tx.shape from . import tfm assert isinstance(focal_law.amplitudes, tfm.TxRxAmplitudes) _check_shapes(frame, focal_law) weighted_timetraces = focal_law.weigh_timetraces(frame.timetraces) _, _, dtype_data = _infer_datatypes(weighted_timetraces, focal_law, result) if result is None: result = np.full((numpoints,), 0, dtype=dtype_data) assert result.shape == (numpoints,) if interpolate_position != "nearest": raise NotImplementedError lookup_times_tx = focal_law.lookup_times_tx lookup_times_rx = focal_law.lookup_times_rx amplitudes_tx = focal_law.amplitudes.amplitudes_tx amplitudes_rx = focal_law.amplitudes.amplitudes_rx timetrace_weights = focal_law.timetrace_weights if timetrace_weights is None: timetrace_weights = np.ones(numtimetraces) timetraces = frame.timetraces tx = frame.tx rx = frame.rx t0 = frame.time.start dt = frame.time.step numsamples = len(frame.time) for point in range(numpoints): if np.isnan(result[point]): continue for scan in range(numtimetraces): lookup_time = ( lookup_times_tx[point, tx[scan]] + lookup_times_rx[point, rx[scan]] ) lookup_index = int(round((lookup_time - t0) / dt)) if lookup_index < 0 or lookup_index >= numsamples: result[point] += fillvalue else: result[point] += ( timetrace_weights[scan] * amplitudes_tx[point, tx[scan]] * amplitudes_rx[point, rx[scan]] * timetraces[scan, lookup_index] ) result[point] /= numtimetraces return result
[docs] def delay_and_sum(frame, focal_law, *args, **kwargs): """ Delay-and-sum timetraces .. warning: Not all combinations of parameters are implemented. Parameters ---------- frame focal_law fillvalue : float Value to use outside of time limits of the timetrace. interpolation : str "nearest", "linear", "lanczos" aggregation : str "mean", "median" Returns ------- result : ndarray (numpoints, ) See Also -------- :func:`delay_and_sum_numba` :func:`delay_and_sum_numba_noamp` """ from . import tfm if isinstance(focal_law.amplitudes, tfm.TxRxAmplitudes): return delay_and_sum_numba(frame, focal_law, *args, **kwargs) elif focal_law.amplitudes is None: return delay_and_sum_numba_noamp(frame, focal_law, *args, **kwargs) else: raise NotImplementedError