Source code for ewokstomo.tasks.energycalculation

from __future__ import annotations
import logging
from typing import Any, TYPE_CHECKING
import numpy as np
import xraylib
from ewokscore import Task
from xoppylib.sources.xoppy_bm_wiggler import xoppy_calc_bm
from xoppylib.sources.xoppy_bm_wiggler import xoppy_calc_wiggler_on_aperture

logger = logging.getLogger(__name__)

_NIST_ALIASES = {
    "air": "Air, Dry (near sea level)",
    "water": "Water, Liquid",
    "kapton": "Kapton Polyimide Film",
}
try:
    for n in xraylib.GetCompoundDataNISTList():
        _NIST_ALIASES[n] = n
        _NIST_ALIASES[n.lower()] = n
        _NIST_ALIASES[n.replace(" ", "")] = n
        _NIST_ALIASES[n.lower().replace(" ", "")] = n
except Exception as e:
    logger.debug(
        "Could not load NIST compound list from xraylib; using base aliases only: %s",
        e,
    )


def _is_number(x) -> bool:
    try:
        float(x)
        return True
    except Exception:
        return False


if TYPE_CHECKING:  # pragma: no cover - typing-only import
    pass

try:
    from ewokscore.missing_data import MissingData as _MissingDataRuntime  # type: ignore[assignment]
except Exception:  # pragma: no cover - fallback for optional dependency
    _MissingDataRuntime = None  # type: ignore[assignment]


def _is_missing_data(value: Any) -> bool:
    if value is None:
        return False
    if _MissingDataRuntime is not None and isinstance(value, _MissingDataRuntime):
        return True
    return value.__class__.__name__ == "MissingData"


def _canonical_compound_name(name: str) -> str | None:
    if not name:
        return None
    key1 = name.strip()
    key2 = key1.lower()
    key3 = key2.replace(" ", "")
    return _NIST_ALIASES.get(key1) or _NIST_ALIASES.get(key2) or _NIST_ALIASES.get(key3)


def _mu_over_rho_element(Z: int, E_keV: np.ndarray) -> np.ndarray:
    # cm^2/g
    return np.array([xraylib.CS_Total(Z, e) for e in E_keV], dtype=float)


def _mu_over_rho_compound_from_nist(
    cname: str, E_keV: np.ndarray
) -> tuple[np.ndarray, float | None]:
    """
    Return (mu_over_rho [cm^2/g], density [g/cm^3 or None]) for a NIST compound name.
    mu/ρ is computed as the mass-fraction weighted sum of elemental mu/ρ.
    """
    data = xraylib.GetCompoundDataNISTByName(cname)
    Zs = data["Elements"]
    w = data["massFractions"]
    mu_over_rho = np.zeros_like(E_keV, dtype=float)
    for Z, wf in zip(Zs, w):
        mu_over_rho += wf * _mu_over_rho_element(int(Z), E_keV)
    return mu_over_rho, float(data["density"])


def _resolve_mu_density(
    material: str, density_g_cm3: Any, E_keV: np.ndarray
) -> tuple[np.ndarray, float]:
    """
    Resolve mass attenuation (mu/ρ) and density for either an element or a NIST compound.
    - If density is '?', None or missing, we auto-fill:
        * elements: xraylib.ElementDensity(Z)
        * compounds: NIST density
    """
    try:
        Z = xraylib.SymbolToAtomicNumber(material)
    except Exception as e:
        logger.debug(
            "Material is not an element symbol (%r); trying compound paths: %s",
            material,
            e,
        )
    else:
        mu_over_rho = _mu_over_rho_element(Z, E_keV)
        if _is_number(density_g_cm3):
            rho = float(density_g_cm3)
        else:
            rho = float(xraylib.ElementDensity(Z))
        return mu_over_rho, rho

    cname = _canonical_compound_name(material)
    if cname is not None:
        mu_over_rho, rho_nist = _mu_over_rho_compound_from_nist(cname, E_keV)
        if _is_number(density_g_cm3):
            rho = float(density_g_cm3)
        elif rho_nist is not None:
            rho = float(rho_nist)
        else:
            raise ValueError(f"NIST density unavailable for compound '{cname}'")
        return mu_over_rho, rho
    try:
        comp = xraylib.CompoundParser(material)
        Zs = comp["Elements"]
        w = comp["massFractions"]
        mu_over_rho = np.zeros_like(E_keV, dtype=float)
        for Z, wf in zip(Zs, w):
            mu_over_rho += wf * _mu_over_rho_element(int(Z), E_keV)
        rho = float(density_g_cm3) if _is_number(density_g_cm3) else 1.0
        return mu_over_rho, rho
    except Exception as e:
        raise ValueError(f"Unknown material '{material}': {e}")


def _transmission(
    material: str, thickness_mm: float, density_g_cm3: Any, energy_eV: np.ndarray
) -> np.ndarray:
    if thickness_mm <= 0:
        return np.ones_like(energy_eV, dtype=float)
    E_keV = np.asarray(energy_eV, dtype=float) / 1e3
    mu_over_rho, rho = _resolve_mu_density(
        material, density_g_cm3, E_keV
    )  # cm^2/g, g/cm^3
    mu = mu_over_rho * rho  # cm^-1
    t_cm = float(thickness_mm) / 10.0
    return np.exp(-mu * t_cm)


[docs] class ComputeBMSpectrum( # type: ignore[call-arg] Task, input_names=[ "TYPE_CALC", "VER_DIV", "MACHINE_NAME", "RB_CHOICE", "MACHINE_R_M", "BFIELD_T", "BEAM_ENERGY_GEV", "CURRENT_A", "HOR_DIV_MRAD", "PHOT_ENERGY_MIN", "PHOT_ENERGY_MAX", "NPOINTS", "LOG_CHOICE", "PSI_MRAD_PLOT", "PSI_MIN", "PSI_MAX", "PSI_NPOINTS", "FILE_DUMP", ], output_names=["energy_eV", "flux", "spectral_power", "cumulated_power"], ): """ Compute a bending-magnet (BM) spectrum using XOPPY's ``xoppy_calc_bm``. Inputs (units): - TYPE_CALC (int): must be 0. - VER_DIV (int): vertical divergence model, {0, 2}. - MACHINE_NAME (str), RB_CHOICE (int), MACHINE_R_M (m). - BFIELD_T (T), BEAM_ENERGY_GEV (GeV), CURRENT_A (A), HOR_DIV_MRAD (mrad). - PHOT_ENERGY_MIN / PHOT_ENERGY_MAX (eV), NPOINTS (int), LOG_CHOICE (0/1). - PSI_* (mrad) and PSI_NPOINTS (int), FILE_DUMP (bool). Outputs: - energy_eV (eV), sorted ascending. - flux (phot/s/0.1%bw) as returned by XOPPY. - spectral_power (W/eV). - cumulated_power (W). """
[docs] def run(self): TYPE_CALC = getattr(self.inputs, "TYPE_CALC", 0) VER_DIV = getattr(self.inputs, "VER_DIV", 0) MACHINE_NAME = getattr(self.inputs, "MACHINE_NAME", "ESRF bending magnet") RB_CHOICE = getattr(self.inputs, "RB_CHOICE", 0) MACHINE_R_M = getattr(self.inputs, "MACHINE_R_M", 25.0) BFIELD_T = getattr(self.inputs, "BFIELD_T", 0.8) BEAM_ENERGY_GEV = getattr(self.inputs, "BEAM_ENERGY_GEV", 6.0) CURRENT_A = getattr(self.inputs, "CURRENT_A", 0.2) HOR_DIV_MRAD = getattr(self.inputs, "HOR_DIV_MRAD", 1.0) PHOT_ENERGY_MIN = getattr(self.inputs, "PHOT_ENERGY_MIN", 100.0) PHOT_ENERGY_MAX = getattr(self.inputs, "PHOT_ENERGY_MAX", 200000.0) NPOINTS = getattr(self.inputs, "NPOINTS", 500) LOG_CHOICE = getattr(self.inputs, "LOG_CHOICE", 1) PSI_MRAD_PLOT = getattr(self.inputs, "PSI_MRAD_PLOT", 1.0) PSI_MIN = getattr(self.inputs, "PSI_MIN", -1.0) PSI_MAX = getattr(self.inputs, "PSI_MAX", 1.0) PSI_NPOINTS = getattr(self.inputs, "PSI_NPOINTS", 500) FILE_DUMP = getattr(self.inputs, "FILE_DUMP", False) a6_T, fm, a, energy_eV = xoppy_calc_bm( TYPE_CALC=TYPE_CALC, MACHINE_NAME=MACHINE_NAME, RB_CHOICE=RB_CHOICE, MACHINE_R_M=MACHINE_R_M, BFIELD_T=BFIELD_T, BEAM_ENERGY_GEV=BEAM_ENERGY_GEV, CURRENT_A=CURRENT_A, HOR_DIV_MRAD=HOR_DIV_MRAD, VER_DIV=VER_DIV, PHOT_ENERGY_MIN=PHOT_ENERGY_MIN, PHOT_ENERGY_MAX=PHOT_ENERGY_MAX, NPOINTS=NPOINTS, LOG_CHOICE=LOG_CHOICE, PSI_MRAD_PLOT=PSI_MRAD_PLOT, PSI_MIN=PSI_MIN, PSI_MAX=PSI_MAX, PSI_NPOINTS=PSI_NPOINTS, FILE_DUMP=FILE_DUMP, ) if TYPE_CALC != 0 or VER_DIV not in (0, 2): raise ValueError( "ComputeBMSpectrum expects TYPE_CALC=0 and VER_DIV in {0,2}" ) flux = a6_T[:, 5] spectral_power = a6_T[:, 6] cum_power = a6_T[:, 7] order = np.argsort(energy_eV) self.outputs.energy_eV = energy_eV[order] self.outputs.flux = flux[order] self.outputs.spectral_power = spectral_power[order] self.outputs.cumulated_power = cum_power[order]
[docs] class ComputeWigglerSpectrum( # type: ignore[call-arg] Task, input_names=[ "PHOT_ENERGY_MIN", "PHOT_ENERGY_MAX", "NPOINTS", "ENERGY", "CURRENT", "FIELD", "NPERIODS", "ULAMBDA", "K", "NTRAJPOINTS", "FILE", "SLIT_FLAG", "SLIT_D", "SLIT_NY", "SLIT_WIDTH_H_MM", "SLIT_HEIGHT_V_MM", "SLIT_CENTER_H_MM", "SLIT_CENTER_V_MM", "SHIFT_X_FLAG", "SHIFT_X_VALUE", "SHIFT_BETAX_FLAG", "SHIFT_BETAX_VALUE", "TRAJ_RESAMPLING_FACTOR", "SLIT_POINTS_FACTOR", "LOG_CHOICE", ], output_names=["energy_eV", "flux", "spectral_power", "cumulated_power"], ): """ Compute a wiggler spectrum on an aperture using ``xoppy_calc_wiggler_on_aperture``. Inputs (units): - PHOT_ENERGY_MIN / PHOT_ENERGY_MAX (eV), NPOINTS (int). - ENERGY (GeV), CURRENT (mA). - FIELD, NPERIODS (int), ULAMBDA (m), K (–), NTRAJPOINTS (int), FILE (str). - SLIT_*: distances in m/mm per name; flags as ints. - SHIFT_X_VALUE (m), SHIFT_BETAX_VALUE (rad). - TRAJ_RESAMPLING_FACTOR, SLIT_POINTS_FACTOR (floats), LOG_CHOICE (0/1). Outputs: - energy_eV (eV), sorted ascending. - flux (phot/s/0.1%bw) as returned by XOPPY. - spectral_power (W/eV). - cumulated_power (W). """ def _get(self, name: str, default: Any): return getattr(self.inputs, name, default)
[docs] def run(self): energy, flux, sp, cum, *_ = xoppy_calc_wiggler_on_aperture( FIELD=int(self._get("FIELD", 1)), NPERIODS=int(self._get("NPERIODS", 1)), ULAMBDA=float(self._get("ULAMBDA", 0.15)), # m K=float(self._get("K", 22.591)), ENERGY=float(self._get("ENERGY", 6.0)), # GeV PHOT_ENERGY_MIN=float(self._get("PHOT_ENERGY_MIN", 100.0)), PHOT_ENERGY_MAX=float(self._get("PHOT_ENERGY_MAX", 4e5)), NPOINTS=int(self._get("NPOINTS", 2000)), NTRAJPOINTS=int(self._get("NTRAJPOINTS", 101)), CURRENT=float(self._get("CURRENT", 200.0)), # mA FILE=str(self._get("FILE", "")), SLIT_FLAG=int(self._get("SLIT_FLAG", 1)), SLIT_D=float(self._get("SLIT_D", 56.5)), # m SLIT_NY=int(self._get("SLIT_NY", 101)), SLIT_WIDTH_H_MM=float(self._get("SLIT_WIDTH_H_MM", 10.0)), SLIT_HEIGHT_V_MM=float(self._get("SLIT_HEIGHT_V_MM", 5.0)), SLIT_CENTER_H_MM=float(self._get("SLIT_CENTER_H_MM", 0.0)), SLIT_CENTER_V_MM=float(self._get("SLIT_CENTER_V_MM", 0.0)), SHIFT_X_FLAG=int(self._get("SHIFT_X_FLAG", 1)), SHIFT_X_VALUE=float(self._get("SHIFT_X_VALUE", -0.002385)), # m SHIFT_BETAX_FLAG=int(self._get("SHIFT_BETAX_FLAG", 5)), SHIFT_BETAX_VALUE=float(self._get("SHIFT_BETAX_VALUE", 0.005)), TRAJ_RESAMPLING_FACTOR=float(self._get("TRAJ_RESAMPLING_FACTOR", 10000.0)), SLIT_POINTS_FACTOR=float(self._get("SLIT_POINTS_FACTOR", 3.0)), ) energy = np.asarray(energy, dtype=float) order = np.argsort(energy) self.outputs.energy_eV = energy[order] self.outputs.flux = np.asarray(flux, dtype=float)[order] self.outputs.spectral_power = np.asarray(sp, dtype=float)[order] self.outputs.cumulated_power = np.asarray(cum, dtype=float)[order]
[docs] class ApplyAttenuators( # type: ignore[call-arg] Task, input_names=["energy_eV", "spectral_power", "attenuators"], optional_input_names=["order", "flux"], output_names=[ "energy_eV", "attenuated_spectral_power", "transmission", "attenuated_flux", ], ): """ Apply a stack of attenuators to the source spectrum (and optionally flux). Inputs ------ energy_eV : numpy.ndarray Photon energy grid in electron-volts. spectral_power : numpy.ndarray Power spectrum (W/eV). attenuators : dict[str, dict] Mapping where each value contains ``material``, ``thickness_mm`` and optional ``density_g_cm3``. ``material`` accepts element symbols (e.g. ``"Al"``), NIST aliases (e.g. ``"kapton"``) or chemical formulae parsable by :mod:`xraylib`. order : list[str], optional Explicit stacking order of the attenuator keys. Defaults to the dictionary insertion order. flux : numpy.ndarray, optional Source flux array (phot/s/0.1%bw) matching ``energy_eV``. Outputs ------- energy_eV : numpy.ndarray Same energy grid passed through. transmission : numpy.ndarray Cumulative transmission of the attenuator stack. attenuated_spectral_power : numpy.ndarray ``spectral_power`` multiplied by ``transmission``. attenuated_flux : numpy.ndarray | None ``flux`` multiplied by ``transmission`` when provided, otherwise ``None``. """
[docs] def run(self): energy_eV = self.inputs.energy_eV sp_in = self.inputs.spectral_power.copy() flux_in = getattr(self.inputs, "flux", None) if _is_missing_data(flux_in): flux_in = None attenuators: dict[str, dict[str, Any]] = dict(self.inputs.attenuators) # order of stacking if hasattr(self.inputs, "order") and self.inputs.order: keys: list[str] = list(self.inputs.order) else: keys = list(attenuators.keys()) transmission = np.ones_like(energy_eV, dtype=float) for key in keys: a = attenuators[key] T = _transmission( str(a["material"]), float(a["thickness_mm"]), float(a["density_g_cm3"]), energy_eV, ) transmission *= T sp_out = sp_in * transmission flux_out = flux_in * transmission if flux_in is not None else None self.outputs.energy_eV = energy_eV self.outputs.attenuated_spectral_power = sp_out self.outputs.transmission = transmission self.outputs.attenuated_flux = flux_out
[docs] class SpectrumStats( # type: ignore[call-arg] Task, input_names=["energy_eV", "attenuated_flux"], output_names=[ "mean_energy_eV", "mean_idx", "pic_energy_eV", "pic_idx", ], ): """ Stats on an attenuated spectrum. Inputs - energy_eV: array-like (eV) - attenuated_flux : array-like (ph/s/0.1%bw) Outputs - mean_energy_eV: Flux-weighted mean using bin weights: weights = flux * ΔE / (0.001 * E). - mean_idx (int): Index of energy_eV closest to mean_energy_eV (−1 if N/A). - pic_energy_eV: energy at which flux * ΔE / (0.001 * E) is maximal (NaN if N/A). - pic_idx (int): Index of that maximum (−1 if N/A). """
[docs] def run(self): energy = np.asarray(self.inputs.energy_eV, dtype=float) flux = np.asarray(self.inputs.attenuated_flux, dtype=float) if not (energy.shape == flux.shape): raise ValueError("energy_eV and attenuated_flux must have identical shapes") mask = np.isfinite(energy) & np.isfinite(flux) & (energy > 0) if not mask.any(): self.outputs.mean_energy_eV = float("nan") self.outputs.mean_idx = -1 self.outputs.pic_energy_eV = float("nan") self.outputs.pic_idx = -1 return idx_masked = np.flatnonzero(mask) e = energy[mask] f = flux[mask] order = np.argsort(e) e_sorted = e[order] f_sorted = f[order] idx_sorted = idx_masked[order] if e_sorted.size == 1: dE = np.array([1.0], dtype=float) else: dE = np.empty_like(e_sorted) dE[1:-1] = 0.5 * (e_sorted[2:] - e_sorted[:-2]) dE[0] = e_sorted[1] - e_sorted[0] dE[-1] = e_sorted[-1] - e_sorted[-2] dE = np.clip(dE, 0.0, None) weights = f_sorted * dE / (1e-3 * e_sorted) wsum = float(np.sum(weights)) if wsum > 0.0: mean_energy_eV = float(np.sum(e_sorted * weights) / wsum) mean_idx = int(np.abs(energy - mean_energy_eV).argmin()) else: mean_energy_eV = float("nan") mean_idx = -1 if weights.size > 0: rel = int(np.argmax(weights)) pic_idx = int(idx_sorted[rel]) pic_energy_eV = float(energy[pic_idx]) else: pic_idx = -1 pic_energy_eV = float("nan") self.outputs.mean_energy_eV = mean_energy_eV self.outputs.mean_idx = mean_idx self.outputs.pic_energy_eV = pic_energy_eV self.outputs.pic_idx = pic_idx