"""
Forward model of the inspection of a solid block in immersion
Calculating the frequency-domain **scatterer** responses:
- :func:`scat_unshifted_transfer_functions`: base function, no time-shift
- :func:`singlefreq_scat_transfer_functions`
- :func:`multifreq_scat_transfer_functions`
Calculating the frequency-domain **wall** responses:
- :func:`wall_unshifted_transfer_functions`: base function, no time-shift
- :func:`singlefreq_wall_transfer_functions`
- :func:`multifreq_wall_transfer_functions`
Boilerplate::
import arim.models.block_in_immersion as bim
probe_p = probe.to_oriented_points()
frontwall = \
arim.geometry.points_1d_wall_z(xmin, xmax, z_frontwall, numpoints)
backwall = \
arim.geometry.points_1d_wall_z(xmin, xmax, z_backwall, numpoints)
grid = arim.geometry.Grid(xmin, xmax, ymin, ymax, zmin, zmax, pixel_size)
grid_p = grid.to_oriented_points()
exam_obj = arim.BlockInImmersion(block_material, couplant_material,
frontwall, backwall)
views = bim.make_views(examination_object, probe_p,
grid_p, max_number_of_reflection=1,
tfm_unique_only=False)
Scattering precomputation
=========================
In the computation of the model, there are two options for scattering. The
first option is to pass functions, which are called for each pair of incident
and scatterer angles. The evaluation time per angle pair is in general almost
constant.
The second option is to pass scattering matrices, which are the function
outputs for a grid of incident and scatterer angle. Missing angles are obtained
by linear interpolation. The second option suffers from a loss of accuracy if
the number of angles used for evaluation is too small. The total evaluation
time is the sum of the precomputation time and the interpolation.
For a small number of angles to evaluate, passing the functions (option 1) is
often the most computationally efficient. For a large amount of angles to
evaluate, precomputing the scattering matrices (option 2) is often more
computationally efficient.
"""
import logging
import warnings
from collections import OrderedDict, namedtuple
from itertools import product
import numpy as np
from .. import core as c
from .. import helpers, model, ray, signal
from ..ray import RayGeometry
from .helpers import make_views_from_paths
logger = logging.getLogger(__name__)
_RayWeightsCommon = namedtuple(
"_RayWeightsCommon",
["couplant", "numgridpoints", "wavelength_in_couplant", "wavelengths_in_block"],
)
def _init_ray_weights(path, frequency, probe_element_width, use_directivity):
if path.rays is None:
raise ValueError("Ray tracing must have been performed first.")
couplant, block = path.materials[:2]
numgridpoints = len(path.interfaces[-1].points)
if use_directivity and probe_element_width is None:
raise ValueError("probe_element_width must be provided to compute directivity")
wavelength_in_couplant = couplant.longitudinal_vel / frequency
if block.transverse_vel is None:
wavelengths_in_block = dict(
[(c.Mode.L, block.longitudinal_vel / frequency), (c.Mode.T, float("nan"))]
)
else:
wavelengths_in_block = dict(
[
(c.Mode.L, block.longitudinal_vel / frequency),
(c.Mode.T, block.transverse_vel / frequency),
]
)
return _RayWeightsCommon(
couplant, numgridpoints, wavelength_in_couplant, wavelengths_in_block
)
[docs]
def tx_ray_weights(
path,
ray_geometry,
frequency,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
turn_off_invalid_rays=False,
):
"""
Coefficients Q_i(r, omega) in forward model.
Parameters
----------
path : Path
ray_geometry : arim.ray.RayGeometry
frequency : float
probe_element_width : float or None
Mandatory if use_directivity is True
use_directivity : bool
Default True
use_beamspread : bool
Default True
use_transrefl : bool
Default: True
use_attenuation : bool
Default: True
turn_off_invalid_rays : bool
Default: False
Returns
-------
weights : ndarray
Shape (numelements, numgridpoints)
weights_dict : dict[str, ndarray]
Components of the ray weights: beamspread, directivity, transmission-reflection, attenuation
"""
d = _init_ray_weights(path, frequency, probe_element_width, use_directivity)
weights_dict = dict()
one = np.ones((len(path.interfaces[0].points), d.numgridpoints), order="F")
if use_directivity:
weights_dict[
"directivity"
] = model.directivity_2d_rectangular_in_fluid_for_path(
ray_geometry, probe_element_width, d.wavelength_in_couplant
)
else:
weights_dict["directivity"] = one
if use_transrefl:
weights_dict["transrefl"] = model.transmission_reflection_for_path(
path, ray_geometry, unit="displacement"
)
else:
weights_dict["transrefl"] = one
if use_beamspread:
weights_dict["beamspread"] = model.beamspread_2d_for_path(ray_geometry)
else:
weights_dict["beamspread"] = one
if use_attenuation:
weights_dict["attenuation"] = model.material_attenuation_for_path(
path, ray_geometry, frequency
)
else:
weights_dict["attenuation"] = one
if turn_off_invalid_rays:
weights_dict["invalid"] = (~path.rays._invalid_rays).astype(float)
else:
weights_dict["invalid"] = one
weights = (
weights_dict["directivity"]
* weights_dict["transrefl"]
* weights_dict["beamspread"]
* weights_dict["attenuation"]
* weights_dict["invalid"]
)
return weights, weights_dict
[docs]
def rx_ray_weights(
path,
ray_geometry,
frequency,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
turn_off_invalid_rays=False,
):
"""
Coefficients Q'_i(r, omega) in forward model.
Parameters
----------
path : Path
ray_geometry : arim.ray.RayGeometry
frequency : float
probe_element_width : float or None
Mandatory if use_directivity is True
use_directivity : bool
Default True
use_beamspread : bool
Default True
use_transrefl : bool
Default: True
use_attenuation : bool
Default: True
turn_off_invalid_rays : bool
Default False
Returns
-------
weights : ndarray
Shape (numelements, numgridpoints)
weights_dict : dict[str, ndarray]
Components of the ray weights: beamspread, directivity, transmission-reflection, attenuation
"""
d = _init_ray_weights(path, frequency, probe_element_width, use_directivity)
weights_dict = dict()
one = np.ones((len(path.interfaces[0].points), d.numgridpoints), order="F")
if use_directivity:
weights_dict[
"directivity"
] = model.directivity_2d_rectangular_in_fluid_for_path(
ray_geometry, probe_element_width, d.wavelength_in_couplant
)
else:
weights_dict["directivity"] = one
if use_transrefl:
weights_dict["transrefl"] = model.reverse_transmission_reflection_for_path(
path, ray_geometry, unit="displacement"
)
else:
weights_dict["transrefl"] = one
if use_beamspread:
weights_dict["beamspread"] = model.reverse_beamspread_2d_for_path(ray_geometry)
else:
weights_dict["beamspread"] = one
if use_attenuation:
weights_dict["attenuation"] = model.material_attenuation_for_path(
path, ray_geometry, frequency
)
else:
weights_dict["attenuation"] = one
if turn_off_invalid_rays:
weights_dict["invalid"] = (~path.rays._invalid_rays).astype(float)
else:
weights_dict["invalid"] = one
# the coefficient accounts for the normalisation convention of the scattering in Bristol's literature
scat_normalisation = np.sqrt(d.wavelengths_in_block[path.modes[-1]])
weights = (
weights_dict["directivity"]
* weights_dict["transrefl"]
* weights_dict["beamspread"]
* weights_dict["attenuation"]
* weights_dict["invalid"]
)
weights *= scat_normalisation
return weights, weights_dict
[docs]
def ray_weights_for_views(
views,
frequency,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
turn_off_invalid_rays=False,
save_debug=False,
):
"""
Compute coefficients Q_i(r, omega) and Q'_j(r, omega) from the forward model for
all views.
NB: do not compute the scattering.
Internally use :func:`tx_ray_weights` and :func:`rx_way_weights`.
Parameters
----------
views : dict[Views]
frequency : float
probe_element_width : float
use_directivity : bool
use_beamspread : bool
use_transrefl : bool
use_attenuation : bool
turn_off_invalid_rays : bool
save_debug : bool
Returns
-------
RayWeights
"""
tx_ray_weights_dict = {}
rx_ray_weights_dict = {}
if save_debug:
tx_ray_weights_debug_dict = {}
rx_ray_weights_debug_dict = {}
else:
tx_ray_weights_debug_dict = None
rx_ray_weights_debug_dict = None
scat_angle_dict = {}
all_tx_paths = {view.tx_path for view in views.values()}
all_rx_paths = {view.rx_path for view in views.values()}
all_paths = all_tx_paths | all_rx_paths
model_options = dict(
frequency=frequency,
probe_element_width=probe_element_width,
use_beamspread=use_beamspread,
use_directivity=use_directivity,
use_transrefl=use_transrefl,
use_attenuation=use_attenuation,
turn_off_invalid_rays=turn_off_invalid_rays,
)
# By proceeding this way, geometrical computations can be reused for both
# tx and rx path.
for path in all_paths:
ray_geometry = RayGeometry.from_path(path)
scat_angle_dict[path] = ray_geometry.signed_inc_angle(-1)
scat_angle_dict[path].flags.writeable = False
if path in all_tx_paths:
ray_weights, ray_weights_debug = tx_ray_weights(
path, ray_geometry, **model_options
)
ray_weights.flags.writeable = False
tx_ray_weights_dict[path] = ray_weights
if save_debug:
tx_ray_weights_debug_dict[path] = ray_weights_debug
del ray_weights, ray_weights_debug
if path in all_rx_paths:
ray_weights, ray_weights_debug = rx_ray_weights(
path, ray_geometry, **model_options
)
ray_weights.flags.writeable = False
rx_ray_weights_dict[path] = ray_weights
if save_debug:
rx_ray_weights_debug_dict[path] = ray_weights_debug
del ray_weights, ray_weights_debug
return model.RayWeights(
tx_ray_weights_dict,
rx_ray_weights_dict,
tx_ray_weights_debug_dict,
rx_ray_weights_debug_dict,
scat_angle_dict,
)
[docs]
def frontwall_path(
couplant_material,
block_material,
probe_points,
probe_orientations,
frontwall_points,
frontwall_orientations,
):
"""
Probe -> couplant -> frontwall -> couplant -> probe
Parameters
----------
couplant_material
block_material
probe_points
probe_orientations
frontwall_points
frontwall_orientations
Returns
-------
Path
"""
probe_start = c.Interface(
probe_points, probe_orientations, are_normals_on_out_rays_side=True
)
probe_end = c.Interface(
probe_points, probe_orientations, are_normals_on_inc_rays_side=True
)
frontwall_ext_refl = c.Interface(
frontwall_points,
frontwall_orientations,
"fluid_solid",
"reflection",
reflection_against=block_material,
are_normals_on_inc_rays_side=False,
are_normals_on_out_rays_side=False,
)
return c.Path(
interfaces=(probe_start, frontwall_ext_refl, probe_end),
materials=(couplant_material, couplant_material),
modes=(c.Mode.L, c.Mode.L),
name="Frontwall",
)
[docs]
def backwall_paths(
couplant_material,
block_material,
probe_oriented_points,
frontwall,
backwall,
max_number_of_reflection=1,
):
"""
Make backwall paths
when max_backwall_refl == 1
Probe -> couplant -> frontwall -> block (L or T) -> backwall -> block (L or T) -> frontwall -> couplant -> probe
(additional) when max_backwall_refl == 2
Probe -> couplant -> frontwall -> block (L or T) -> backwall -> block (L or T) -> frontwall
-> block (L or T) -> backwall -> block (L or T) -> frontwall -> couplant -> probe
(additional) when max_backwall_refl == 3
Probe -> couplant -> frontwall -> block (L or T) -> backwall -> block (L or T) -> frontwall
-> block (L or T) -> backwall -> block (L or T) -> frontwall
-> block (L or T) -> backwall -> block (L or T) -> frontwall -> couplant -> probe
Parameters
----------
couplant_material : Material
block_material : Material
probe_oriented_points : OrientedPoints
frontwall: OrientedPoints
backwall: OrientedPoints
max_number_of_reflection : int
Number of internal reflections. Default: 1.
Returns
-------
OrderedDict of Path
Keys: LL, LT, TL, TT
(additional) LLLL, LLLT, LLTL, LLTT, LTLT, LTTL LTTT, TLLT, TLTT, TTTT
(additional, ...refl=3) left to user to work out...
"""
if max_number_of_reflection > 3:
msg = "The maximum number of backwall reflections exceeds coding limit (3)"
raise ValueError(msg)
paths = OrderedDict()
if max_number_of_reflection == 0:
return paths
probe_start = c.Interface(*probe_oriented_points, are_normals_on_out_rays_side=True)
frontwall_couplant_to_block = c.Interface(
*frontwall,
"fluid_solid",
"transmission",
are_normals_on_inc_rays_side=False,
are_normals_on_out_rays_side=True,
)
backwall_refl = c.Interface(
*backwall,
"solid_fluid",
"reflection",
reflection_against=couplant_material,
are_normals_on_inc_rays_side=False,
are_normals_on_out_rays_side=False,
)
frontwall_block_to_couplant = c.Interface(
*frontwall,
"solid_fluid",
"transmission",
are_normals_on_inc_rays_side=True,
are_normals_on_out_rays_side=False,
)
frontwall_refl = c.Interface(
*frontwall,
"solid_fluid",
"reflection",
reflection_against=couplant_material,
are_normals_on_inc_rays_side=True,
are_normals_on_out_rays_side=True,
)
probe_end = c.Interface(*probe_oriented_points, are_normals_on_inc_rays_side=True)
for mode1 in (c.Mode.L, c.Mode.T):
for mode2 in (c.Mode.L, c.Mode.T):
key = mode1.key() + mode2.key()
paths[key] = c.Path(
interfaces=(
probe_start,
frontwall_couplant_to_block,
backwall_refl,
frontwall_block_to_couplant,
probe_end,
),
materials=(
couplant_material,
block_material,
block_material,
couplant_material,
),
modes=(c.Mode.L, mode1, mode2, c.Mode.L),
name="Backwall " + key,
)
if max_number_of_reflection == 1:
return paths
for mode1 in (c.Mode.L, c.Mode.T):
for mode2 in (c.Mode.L, c.Mode.T):
for mode3 in (c.Mode.L, c.Mode.T):
for mode4 in (c.Mode.L, c.Mode.T):
key = mode1.key() + mode2.key() + mode3.key() + mode4.key()
paths[key] = c.Path(
interfaces=(
probe_start,
frontwall_couplant_to_block,
backwall_refl,
frontwall_refl,
backwall_refl,
frontwall_block_to_couplant,
probe_end,
),
materials=(
couplant_material,
block_material,
block_material,
block_material,
block_material,
couplant_material,
),
modes=(c.Mode.L, mode1, mode2, mode3, mode4, c.Mode.L),
name="Backwall " + key,
)
if max_number_of_reflection == 2:
return paths
for mode1 in (c.Mode.L, c.Mode.T):
for mode2 in (c.Mode.L, c.Mode.T):
for mode3 in (c.Mode.L, c.Mode.T):
for mode4 in (c.Mode.L, c.Mode.T):
for mode5 in (c.Mode.L, c.Mode.T):
for mode6 in (c.Mode.L, c.Mode.T):
key = (
mode1.key()
+ mode2.key()
+ mode3.key()
+ mode4.key()
+ mode5.key()
+ mode6.key()
)
paths[key] = c.Path(
interfaces=(
probe_start,
frontwall_couplant_to_block,
backwall_refl,
frontwall_refl,
backwall_refl,
frontwall_refl,
backwall_refl,
frontwall_block_to_couplant,
probe_end,
),
materials=(
couplant_material,
block_material,
block_material,
block_material,
block_material,
block_material,
block_material,
couplant_material,
),
modes=(
c.Mode.L,
mode1,
mode2,
mode3,
mode4,
mode5,
mode6,
c.Mode.L,
),
name="Backwall " + key,
)
return paths
[docs]
def backwall_paths2(
couplant_material,
block_material,
probe_oriented_points,
frontwall,
backwall,
max_backwall_refl=1,
):
warnings.warn("Deprecated, use backwall_paths() instead", DeprecationWarning)
return backwall_paths(
couplant_material,
block_material,
probe_oriented_points,
frontwall,
backwall,
max_number_of_reflection=max_backwall_refl,
)
[docs]
def ray_weights_for_wall(
path,
frequency,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
turn_off_invalid_rays=False,
walls=None,
):
"""
Compute model coefficients for wall echoes.
Parameters
----------
path
frequency
probe_element_width
use_directivity
use_beamspread
use_transrefl
use_attenuation
turn_off_invalid_rays
walls
Returns
-------
weights : ndarray
Shape (numelements, numelements)
weights_dict : dict[str, ndarray]
Components of the ray weights: beamspread, directivity, transmission-reflection, attenuation
"""
# perform ray tracing if needed
if path.rays is None:
ray.ray_tracing_for_paths(
[path], turn_off_invalid_rays=turn_off_invalid_rays, walls=walls
)
ray_geometry = RayGeometry.from_path(path)
d = _init_ray_weights(path, frequency, probe_element_width, use_directivity)
weights_dict = dict()
one = np.ones((len(path.interfaces[0].points), d.numgridpoints), order="F")
if use_directivity:
directivity = model.directivity_2d_rectangular_in_fluid_for_path(
ray_geometry, probe_element_width, d.wavelength_in_couplant
)
weights_dict["directivity"] = directivity * directivity.T
else:
weights_dict["directivity"] = one
if use_transrefl:
weights_dict["transrefl"] = model.transmission_reflection_for_path(
path, ray_geometry, unit="displacement"
)
else:
weights_dict["transrefl"] = one
if use_beamspread:
weights_dict["beamspread"] = model.beamspread_2d_for_path(ray_geometry)
else:
weights_dict["beamspread"] = one
if use_attenuation:
weights_dict["attenuation"] = model.material_attenuation_for_path(
path, ray_geometry, frequency
)
else:
weights_dict["attenuation"] = one
if turn_off_invalid_rays:
weights_dict["invalid"] = (~path.rays._invalid_rays).astype(float)
else:
weights_dict["invalid"] = one
weights = (
weights_dict["directivity"]
* weights_dict["transrefl"]
* weights_dict["beamspread"]
* weights_dict["attenuation"]
* weights_dict["invalid"]
)
return weights, weights_dict
[docs]
def make_interfaces(
couplant_material,
probe_oriented_points,
frontwall,
grid_oriented_points,
reflecting_walls=None,
):
"""
Construct Interface objects for the case of a solid block in immersion
(couplant is liquid).
The interfaces are for rays starting from the probe and arriving in the
grid. There must be a frontwall interface to allow liquid-to-solid
transmission. Additional walls can be included to allow solid-against-
liquid reflection.
Assumes that walls are provided in the order that the ray reflects. For
example, if no reflections:
`walls = None`
If 1 reflection from the backwall:
`walls = [backwall]`
If 2 reflections from the backwall and then the frontwall:
`walls = [backwall, frontwall]`
Note that in this final example, the frontwall must be provided in `walls`
as well as in the input variable.
Assumes all that walls have orientation facing into the solid.
Parameters
----------
couplant_material: Material
probe_oriented_points : OrientedPoints
frontwall: OrientedPoints
grid_oriented_points: OrientedPoints
reflecting_walls: OrderedDict[OrientedPoints] or None
Returns
-------
interface_dict : dict[Interface]
Keys: probe, frontwall_trans, grid, wall_name_1 (optional), ...
"""
interface_dict = OrderedDict()
interface_dict["probe"] = c.Interface(
*probe_oriented_points, are_normals_on_out_rays_side=True
)
interface_dict["grid"] = c.Interface(
*grid_oriented_points, are_normals_on_inc_rays_side=True
)
# Need both transmission and reflection for frontwall
interface_dict["frontwall_trans"] = c.Interface(
*frontwall,
kind="fluid_solid",
transmission_reflection="transmission",
reflection_against=None,
are_normals_on_inc_rays_side=False,
are_normals_on_out_rays_side=True,
)
if reflecting_walls is not None:
for name, wall in reflecting_walls.items():
name = name.lower() + "_refl"
interface_dict[name] = c.Interface(
*wall,
kind="solid_fluid",
transmission_reflection="reflection",
reflection_against=couplant_material,
are_normals_on_inc_rays_side=True,
are_normals_on_out_rays_side=True,
)
return interface_dict
[docs]
def make_paths(
block_material,
couplant_material,
interface_dict,
max_number_of_reflection=1,
):
"""
Creates all iterations of paths up with max_number_of_reflection. Path
names are defined as the wave modes of the ray in transmit convention,
separated by the wall which is skipped from. If 1 reflection is allowed
from a wall "backwall", then the paths returned will be named "L", "T",
"L backwall L", "L backwall T", "T backwall L", "T backwall T".
Paths are returned in transmit convention: for the path XY, X is the mode
before reflection against the backwall and Y is the mode after reflection.
The path XY in transmit convention is the path YX in receive convention.
Parameters
----------
block_material : Material
couplant_material : Material
interface_dict : dict[Interface]
max_number_of_reflection : int
Default: 1.
Returns
-------
paths : OrderedDict
"""
paths = OrderedDict()
if max_number_of_reflection > 2:
raise NotImplementedError
if max_number_of_reflection < 0:
raise ValueError
probe = interface_dict["probe"]
frontwall = interface_dict["frontwall_trans"]
grid = interface_dict["grid"]
wall_dict = OrderedDict(
(key, val)
for key, val in interface_dict.items()
if key not in ["probe", "grid", "frontwall_trans"]
)
wall_names = list(wall_dict.keys())
if (max_number_of_reflection > 0 and len(wall_names) == 0) or (
max_number_of_reflection > 1 and len(wall_names) < 2
):
raise ValueError("Not enough walls to reflect from.")
mode_names = ("L", "T")
modes = (c.Mode.longitudinal, c.Mode.transverse)
for no_reflections in range(max_number_of_reflection + 1):
# For this number of reflections, make all the combinations of paths.
path_idxs_up_to_refl = list(product(range(2), repeat=no_reflections + 1))
for path_idxs in path_idxs_up_to_refl:
# For each path with this number of reflections.
path_name = "" # Current convention does not include frontwall transmission in path name, so start with empty string.
path_modes = [c.Mode.longitudinal]
path_interfaces = [probe, frontwall]
path_materials = [couplant_material]
for i, mode in enumerate(path_idxs):
if i == 0:
path_name += mode_names[mode]
else:
# Have to settle on a naming convention.
# Published work to date (~2023) joins wave modes and assumes a constant wall (typically back wall).
# New `Path` method `longname` splices wall names into the mode names to indicate which wall was skipped from. Preserve simple naming convention for path dict keys.
# If multiple paths with the same modes but different wall skips are needed, they will need to be stored in different dicts. Edit this string if this is inconvenient.
path_name += "{}".format(mode_names[mode])
path_interfaces.append(wall_dict[wall_names[i - 1]])
path_modes.append(modes[mode])
path_materials.append(block_material)
path_interfaces.append(grid)
paths[path_name] = c.Path(
interfaces=path_interfaces,
materials=path_materials,
modes=path_modes,
name=path_name,
)
return paths
[docs]
def make_views(
examination_object,
probe_oriented_points,
scatterers_oriented_points,
walls_for_imaging=None,
tfm_unique_only=False,
):
"""
Make views for the measurement model of a block in immersion (scatterers response
only).
Parameters
----------
examination_object : arim.core.BlockInImmersion
probe_oriented_points : OrientedPoints
scatterers_oriented_points : OrientedPoints
walls_for_imaging : list[str]
Keys of the walls in examination_object.walls which will be used to reflected
from when imaging. Must be provided in the order that they are reflected
from on the transmit path. The front wall transmission should not be provided
in this list, it is assumed to exist in the examination object because this
is an immersion configuration. Subsequent reflections from the front wall
may be included. The length of this list will be used as the max number of
reflections. The default is None, i.e. no reflections.
tfm_unique_only : bool
Default False. If True, returns only the views that give *different* imaging
results with TFM (AB-CD and DC-BA give the same imaging result).
Returns
-------
views: OrderedDict[Views]
"""
try:
couplant = examination_object.couplant_material
block = examination_object.block_material
frontwall = None
walls = OrderedDict()
if walls_for_imaging is None:
walls_for_imaging = []
frontwall = examination_object.walls["Frontwall"]
for name in walls_for_imaging:
walls[name] = examination_object.walls[name]
max_number_of_reflection = len(walls_for_imaging)
except AttributeError as e:
raise ValueError("Examination object should be a BlockInImmersion") from e
interfaces = make_interfaces(
couplant,
probe_oriented_points,
frontwall,
scatterers_oriented_points,
walls,
)
paths = make_paths(block, couplant, interfaces, max_number_of_reflection)
return make_views_from_paths(paths, tfm_unique_only)
[docs]
def scat_unshifted_transfer_functions(
views,
tx,
rx,
freq_array,
scat_obj,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
scat_angle=0.0,
numangles_for_scat_precomp=0,
first_nonzero_freq_idx=None,
turn_off_invalid_rays=False,
):
"""
Compute unshifted transfer functions for scatterer echoes (multi-frequency model).
Returns ``H_ij(omega) = Q_i(omega) Q'_j(omega) S(omega, theta_i, theta_j)``
Output spectra uses the *math* Fourier convention (not the acoustics one).
Parameters
----------
views : Dict[Views]
tx : ndarray
Shape: (numtimetraces, )
rx : ndarray
Shape: (numtimetraces, )
freq_array : ndarray or float
Shape: (numfreq, )
scat_obj : arim.scat.Scattering2d
probe_element_width : float or None
use_directivity : bool
use_beamspread : bool
use_transrefl : bool
use_attenuation : bool
scat_angle : float
numangles_for_scat_precomp : int
Number of angles in [-pi, pi] for scattering precomputation.
0 to disable. See module documentation.
first_nonzero_freq_idx : int or None
Default: assumes first freq is zero, except if only one freq is given.
turn_off_invalid_rays : bool
Yields
------
partial_transfer_function_f : ndarray
Shape: (numscatterers, numtimetraces, numfreq). Complex. Contribution for one view.
delays : ndarray
Shape: (numscatterers, numtimetraces). Float. Contribution for one view.
See Also
--------
:func:`arim.signal.timeshift_spectra`
"""
freq_array = np.atleast_1d(freq_array)
numfreq = len(freq_array)
if first_nonzero_freq_idx is None:
if numfreq == 1:
# assume the freq is nonzero
first_nonzero_freq_idx = 0
else:
# assume only the first freq is zero, as returned by fftfreq
first_nonzero_freq_idx = 1
nonzero_freq_array = freq_array[first_nonzero_freq_idx:]
# Precompute all ray weights
ray_weights_allfreq = []
with helpers.timeit("Computation of ray weights", logger):
for frequency in nonzero_freq_array:
# logger.debug(f'ray weight freq={frequency}')
ray_weights = ray_weights_for_views(
views,
frequency=frequency,
probe_element_width=probe_element_width,
use_beamspread=use_beamspread,
use_directivity=use_directivity,
use_transrefl=use_transrefl,
use_attenuation=use_attenuation,
turn_off_invalid_rays=turn_off_invalid_rays,
)
ray_weights_allfreq.append(ray_weights)
# (Pre)compute scattering
from ..scat import ScatFromData
scat_keys_to_compute = set(view.scat_key() for view in views.values())
# model_amplitudes_factory is way faster with the scattering is given as matrices instead of functions.
# If matrices can be computed cheaply, it's worth it.
if isinstance(scat_obj, ScatFromData):
with helpers.timeit("Scattering", logger):
scat_matrices = scat_obj.as_multi_freq_matrices(
nonzero_freq_array, scat_obj.numangles, to_compute=scat_keys_to_compute
)
elif numangles_for_scat_precomp > 0:
with helpers.timeit("Scattering", logger):
scat_matrices = scat_obj.as_multi_freq_matrices(
nonzero_freq_array,
numangles_for_scat_precomp,
to_compute=scat_keys_to_compute,
)
else:
scat_matrices = None
numtimetraces = len(tx)
for view in views.values():
logger.info(f"Transfer function for scatterers in view {view.name}")
numscatterers = view.tx_path.rays.times.shape[1]
partial_transfer_function_f = np.zeros(
(numscatterers, numtimetraces, numfreq), complex
)
# shape: (numscatterers, numtimetraces)
delays = np.ascontiguousarray(
(
np.take(view.tx_path.rays.times, tx, axis=0)
+ np.take(view.rx_path.rays.times, rx, axis=0)
).T
)
for freq_idx, frequency in enumerate(nonzero_freq_array):
freq_idx2 = first_nonzero_freq_idx + freq_idx
if scat_matrices:
scattering = {key: mat[freq_idx] for key, mat in scat_matrices.items()}
else:
scattering = scat_obj.as_angles_funcs(frequency)
ray_weights = ray_weights_allfreq[freq_idx]
# compute Q_i Q'_j S_ij
# shape: (numscatterers, numtimetraces, )
model_coefficients = model.model_amplitudes_factory(
tx, rx, view, ray_weights, scattering, scat_angle=scat_angle
)[...]
np.conj(model_coefficients, out=model_coefficients)
partial_transfer_function_f[..., freq_idx2] = model_coefficients
yield partial_transfer_function_f, delays
[docs]
def wall_unshifted_transfer_functions(
wall_paths,
tx,
rx,
freq_array,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
first_nonzero_freq_idx=None,
turn_off_invalid_rays=False,
walls=None,
):
"""Compute unshifted transfer functions for walls echoes.
Output spectra uses the *math* Fourier convention (not the acoustics one).
Parameters
----------
wall_paths : Dict[arim.Path]
tx : ndarray
Shape: (numtimetraces, )
rx : ndarray
Shape: (numtimetraces, )
freq_array : ndarray or float
Shape: (numfreq, )
probe_element_width : [type], optional
[description] (the default is None, which [default_description])
probe_element_width : float or None
use_directivity : bool
use_beamspread : bool
use_transrefl : bool
use_attenuation : bool
first_nonzero_freq_idx : int or None
Default: assumes first freq is zero, except if only one freq is given.
turn_off_invalid_rays : bool
walls : list[OrientedPoints]
Yields
------
partial_transfer_function_f : ndarray
Shape: (numtimetraces, numfreq). Complex. Contribution for one wall path.
delays : ndarray
Shape: (numtimetraces). Float. Contribution for wall path.
"""
freq_array = np.atleast_1d(freq_array)
numfreq = len(freq_array)
if first_nonzero_freq_idx is None:
if numfreq == 1:
# assume the freq is nonzero
first_nonzero_freq_idx = 0
else:
# assume only the first freq is zero, as returned by fftfreq
first_nonzero_freq_idx = 1
nonzero_freq_array = freq_array[first_nonzero_freq_idx:]
for pathname, path in wall_paths.items():
logger.info(f"Transfer function for wall {pathname}")
partial_transfer_function_f = np.zeros((len(tx), len(freq_array)), complex)
for freq_idx, frequency in enumerate(nonzero_freq_array):
freq_idx2 = first_nonzero_freq_idx + freq_idx
# shape: (numelements, numelements)
ray_weights, _ = ray_weights_for_wall(
path,
frequency=frequency,
probe_element_width=probe_element_width,
use_beamspread=use_beamspread,
use_directivity=use_directivity,
use_transrefl=use_transrefl,
use_attenuation=use_attenuation,
turn_off_invalid_rays=turn_off_invalid_rays,
walls=walls,
)
# Fancy indexing:
partial_transfer_function_f[:, freq_idx2] = ray_weights[tx, rx].conj()
delays = path.rays.times[tx, rx]
yield partial_transfer_function_f, delays
[docs]
def singlefreq_scat_transfer_functions(
views,
tx,
rx,
frequency,
freq_array,
scat_obj,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
scat_angle=0.0,
numangles_for_scat_precomp=0,
turn_off_invalid_rays=False,
):
"""
Compute transfer functions for scatterer echoes (single-frequency model).
Output spectra uses the *math* Fourier convention (not the acoustics one).
Parameters
----------
views : Dict[Views]
tx : ndarray
Shape: (numtimetraces, )
rx : ndarray
Shape: (numtimetraces, )
frequency : float
freq_array : ndarray
Shape: (numfreq, )
scat_obj : arim.scat.Scattering2d
probe_element_width : float or None
use_directivity : bool
use_beamspread : bool
use_transrefl : bool
use_attenuation : bool
scat_angle : float
numangles_for_scat_precomp : int
Number of angles in [-pi, pi] for scattering precomputation.
0 to disable. See module documentation.
turn_off_invalid_rays : bool
Yields
------
viewname : str
Key of `views`
partial_transfer_function_f : ndarray
Shape: (numtimetraces, numfreq). Complex. Contribution for one view.
Notes
-----
Legacy function, superseeded by :func:`scat_unshifted_transfer_functions`
and :func:`arim.signal.timeshift_spectra`.
"""
unshifted_tfs = scat_unshifted_transfer_functions(
views,
tx,
rx,
frequency,
scat_obj,
probe_element_width=probe_element_width,
use_directivity=use_directivity,
use_beamspread=use_beamspread,
use_transrefl=use_transrefl,
use_attenuation=use_attenuation,
scat_angle=scat_angle,
numangles_for_scat_precomp=numangles_for_scat_precomp,
turn_off_invalid_rays=turn_off_invalid_rays,
)
for viewname, (unshifted_tf, delays) in zip(views.keys(), unshifted_tfs):
# shape (numscatterers, numtimetraces, numfreq)
tf = signal.timeshift_spectra(unshifted_tf, delays, freq_array)
# lazy tf.sum(axis=0):
if tf.shape[0] == 1:
tf = tf[0]
else:
tf = tf.sum(axis=0)
yield viewname, tf
[docs]
def singlefreq_wall_transfer_functions(
wall_paths,
tx,
rx,
frequency,
freq_array,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
turn_off_invalid_rays=False,
walls=None,
):
"""
Compute transfer functions for wall echoes (single-frequency model).
Parameters
----------
wall_paths : Dict[Path]
tx : ndarray
Shape: (numtimetraces, )
rx : ndarray
Shape: (numtimetraces, )
frequency : float
Frequency at which the model runs.
freq_array : ndarray
Shape: (numfreq, ). First freq is assumed to be zero.
probe_element_width : float or None
use_directivity : bool
use_beamspread : bool
use_transrefl : bool
use_attenuation : bool
turn_off_invalid_rays : bool
walls : OrderedDict[OrientedPoints]
Yields
------
pathname : str
Key of `wall_paths`
partial_transfer_function_f : ndarray
Shape: (numtimetraces, numfreq). Complex. Contribution for one path.
Notes
-----
Legacy function, superseeded by :func:`wall_unshifted_transfer_functions`
and :func:`arim.signal.timeshift_spectra`.
"""
unshifted_tfs = wall_unshifted_transfer_functions(
wall_paths,
tx,
rx,
frequency,
probe_element_width=probe_element_width,
use_directivity=use_directivity,
use_beamspread=use_beamspread,
use_transrefl=use_transrefl,
use_attenuation=use_attenuation,
turn_off_invalid_rays=turn_off_invalid_rays,
walls=walls,
)
for pathname, (unshifted_tf, delays) in zip(wall_paths.keys(), unshifted_tfs):
tf = signal.timeshift_spectra(unshifted_tf, delays, freq_array)
yield pathname, tf
[docs]
def multifreq_scat_transfer_functions(
views,
tx,
rx,
freq_array,
scat_obj,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
scat_angle=0.0,
numangles_for_scat_precomp=0,
turn_off_invalid_rays=False,
):
"""
Compute transfer functions for scatterer echoes (multi-frequency model).
Output spectra uses the *math* Fourier convention (not the acoustics one).
Parameters
----------
views : Dict[Views]
tx : ndarray
Shape: (numtimetraces, )
rx : ndarray
Shape: (numtimetraces, )
freq_array : ndarray
Shape: (numfreq, )
scat_obj : arim.scat.Scattering2d
probe_element_width : float or None
use_directivity : bool
use_beamspread : bool
use_transrefl : bool
use_attenuation : bool
scat_angle : float
numangles_for_scat_precomp : int
Number of angles in [-pi, pi] for scattering precomputation.
0 to disable. See module documentation.
turn_off_invalid_rays : bool
Yields
------
viewname : str
Key of `views`
partial_transfer_function_f : ndarray
Shape: (numtimetraces, numfreq). Complex. Contribution for one view.
Notes
-----
Legacy function, superseeded by :func:`scat_unshifted_transfer_functions`
and :func:`arim.signal.timeshift_spectra`.
"""
unshifted_tfs = scat_unshifted_transfer_functions(
views,
tx,
rx,
freq_array,
scat_obj,
probe_element_width=probe_element_width,
use_directivity=use_directivity,
use_beamspread=use_beamspread,
use_transrefl=use_transrefl,
use_attenuation=use_attenuation,
scat_angle=scat_angle,
numangles_for_scat_precomp=numangles_for_scat_precomp,
turn_off_invalid_rays=turn_off_invalid_rays,
)
for viewname, (unshifted_tf, delays) in zip(views.keys(), unshifted_tfs):
# shape (numscatterers, numtimetraces, numfreq)
tf = signal.timeshift_spectra(unshifted_tf, delays, freq_array)
# lazy tf.sum(axis=0):
if tf.shape[0] == 1:
tf = tf[0]
else:
tf = tf.sum(axis=0)
yield viewname, tf
[docs]
def multifreq_wall_transfer_functions(
wall_paths,
tx,
rx,
freq_array,
probe_element_width=None,
use_directivity=True,
use_beamspread=True,
use_transrefl=True,
use_attenuation=True,
turn_off_invalid_rays=False,
walls=None,
):
"""
Compute transfer functions for scatterer echoes (multi-frequency model).
Parameters
----------
wall_paths : Dict[Path]
tx : ndarray
Shape: (numtimetraces, )
rx : ndarray
Shape: (numtimetraces, )
freq_array : ndarray
Shape: (numfreq, ). First freq is assumed to be zero.
probe_element_width : float or None
use_directivity : bool
use_beamspread : bool
use_transrefl : bool
use_attenuation : bool
turn_off_invalid_rays : bool
walls : OrderedDict[OrientedPoints]
Yields
------
pathname : str
Key of `wall_paths`
partial_transfer_function_f : ndarray
Shape: (numtimetraces, numfreq). Complex. Contribution for one path.
Notes
-----
Legacy function, superseeded by :func:`wall_unshifted_transfer_functions`
and :func:`arim.signal.timeshift_spectra`.
"""
unshifted_tfs = wall_unshifted_transfer_functions(
wall_paths,
tx,
rx,
freq_array,
probe_element_width=probe_element_width,
use_directivity=use_directivity,
use_beamspread=use_beamspread,
use_transrefl=use_transrefl,
use_attenuation=use_attenuation,
turn_off_invalid_rays=turn_off_invalid_rays,
walls=walls,
)
for pathname, (unshifted_tf, delays) in zip(wall_paths.keys(), unshifted_tfs):
tf = signal.timeshift_spectra(unshifted_tf, delays, freq_array)
yield pathname, tf