Source code for ewokstomo.tasks.utils
from blissdata.redis_engine.scan import ScanState
[docs]
def wait_for_scan_state(scan, desired_state: ScanState):
"""
Wait until a scan reaches the desired state (or a higher one).
Args:
scan: The scan object to monitor.
desired_state (ScanState): The state we want to wait for.
Returns:
The scan object in its final state.
"""
while scan.state < desired_state.value:
scan.update()
def _get_technique_from_nabu_config(nabu_config: dict[str, dict[str, str]]) -> str:
"""Extract the technique from the Nabu configuration dictionary."""
method = nabu_config.get("method")
technique = "absorption"
if isinstance(method, str):
if method in ("Paganin", "CTF"):
technique = "phase"
return technique
def _get_output_format_from_nabu_config(nabu_config: dict[str, dict[str, str]]) -> str:
"""Extract the output format from the Nabu configuration dictionary."""
output_format = "hdf5"
if "output" in nabu_config and "file_format" in nabu_config["output"]:
output_format = nabu_config["output"]["file_format"]
return output_format