Source code for ewokstomo.tests.test_gallery

import shutil
from pathlib import Path

import pytest
import numpy as np
from PIL import Image
from tomoscan.esrf.scan.nxtomoscan import NXtomoScan

from ewokstomo.tasks.buildgallery import (
    BuildProjectionsGallery,
    BuildSlicesGallery,
    _save_to_gallery,
    BuildVolumeGallery,
    _auto_intensity_bounds,
    _projection_index_from_data_url,
    _normalize_projections_by_beam_intensity,
)

# Directory named "data" located next to this test module.
DATA_ROOT = Path(__file__).resolve().parent / "data"
# Collection folder name used as a path component when locating test scans.
SCAN_COLLECTION = "TestEwoksTomo"


def get_data_dir(scan_name: str) -> Path:
    """Return the processed-data directory of the scan called *scan_name*.

    The result is ``DATA_ROOT / "PROCESSED_DATA" / SCAN_COLLECTION / scan_name``.
    """
    collection_dir = DATA_ROOT.joinpath("PROCESSED_DATA", SCAN_COLLECTION)
    return collection_dir / scan_name
@pytest.fixture
def tmp_dataset_path(tmp_path) -> Path:
    """Provide a private copy of the ``TestEwoksTomo_0010`` dataset.

    The dataset is copied below ``tmp_path``; existing reduced darks/flats in
    the projections folder, the projections and slices galleries and the
    ``references`` folder are deleted from the copy, and ``ReduceDarkFlat`` is
    then executed on the copied NX file.

    Returns:
        Path of the copied dataset directory.
    """
    scan_name = "TestEwoksTomo_0010"
    dataset_dir = tmp_path / scan_name
    shutil.copytree(get_data_dir(scan_name), dataset_dir)

    projections = dataset_dir / "projections"

    # Remove any existing darks/flats stored next to the projections.
    for suffix_glob in ("*_darks.hdf5", "*_flats.hdf5"):
        for stale_file in projections.glob(suffix_glob):
            stale_file.unlink()

    # Remove previously generated galleries and reduced references.
    stale_dirs = (
        projections / "gallery",
        dataset_dir / "slices" / "gallery",
        dataset_dir / "references",
    )
    for stale_dir in stale_dirs:
        if stale_dir.exists():
            shutil.rmtree(stale_dir)

    # Generate fresh darks/flats.
    from ewokstomo.tasks.reducedarkflat import ReduceDarkFlat

    reduce_inputs = {
        "nx_path": str(projections / f"{scan_name}.nx"),
        "dark_reduction_method": "mean",
        "flat_reduction_method": "median",
        "overwrite": True,
        "return_info": False,
    }
    ReduceDarkFlat(inputs=reduce_inputs).execute()
    return dataset_dir
@pytest.fixture
def simple_image() -> np.ndarray:
    """Provide a 10x10 float array of 100 evenly spaced values from 0 to 255."""
    ramp = np.linspace(0, 255, num=100, dtype=float)
    return ramp.reshape((10, 10))
def test_auto_bounds_handles_negative_values():
    """Check ``_auto_intensity_bounds`` on data spanning negative and positive values.

    The returned bounds must match the 0.01 and 99.99 percentiles of the data
    and must straddle zero.
    """
    ramp = np.linspace(-1000.0, 1000.0, num=10000, dtype=float)
    data = ramp.reshape((100, 100))

    low, high = _auto_intensity_bounds(data)

    assert np.isclose(low, np.percentile(data, 0.01))
    assert np.isclose(high, np.percentile(data, 99.99))
    assert low < 0.0 < high
def test_projection_index_from_data_url():
    """Check the index extracted from the first projection URL of the test scan."""
    nx_file = get_data_dir("TestEwoksTomo_0010").joinpath(
        "projections", "TestEwoksTomo_0010.nx"
    )
    scan = NXtomoScan(str(nx_file), entry="entry0000")

    urls_by_angle = scan.get_proj_angle_url()
    first_data_url = next(iter(urls_by_angle.values()))

    assert _projection_index_from_data_url(first_data_url) == 4
def test_normalize_projections_by_beam_intensity():
    """Check that each projection is rescaled by ``mean intensity / own intensity``."""
    frame = [[10.0, 20.0], [30.0, 40.0]]
    projections = np.array([frame, frame], dtype=np.float32)
    beam_intensities = np.array([2.0, 8.0], dtype=np.float32)

    result = _normalize_projections_by_beam_intensity(projections, beam_intensities)

    expected_mean = (2.0 + 8.0) / 2.0
    # Each projection is compared against its own expected scale factor.
    for index, intensity in enumerate((2.0, 8.0)):
        np.testing.assert_allclose(
            result[index], projections[index] * (expected_mean / intensity)
        )
def _assert_grayscale_png(img, name: str) -> None:
    """Assert that the opened image *img* (file *name*) is a grayscale PNG."""
    assert img.format == "PNG", f"{name} is not a valid PNG image"
    assert img.mode == "L", f"{name} is not grayscale"


@pytest.mark.order(5)
@pytest.mark.parametrize("Task", [BuildProjectionsGallery])
def test_buildgallery_task(Task, tmp_dataset_path):
    """Run the projections gallery task and validate the files it writes.

    The gallery directory below the task's ``processed_data_dir`` output must
    contain 5 small PNG images, 5 ``*_large.png`` images and a GIF named after
    the NX file. Small images must be 16x16 8-bit grayscale PNGs; large images
    must be grayscale PNGs whose largest dimension does not exceed 1000.

    Args:
        Task: gallery task class under test (parametrized).
        tmp_dataset_path: fixture giving the temporary dataset directory.
    """
    proj_dir = tmp_dataset_path / "projections"
    nx = proj_dir / "TestEwoksTomo_0010.nx"
    refs_dir = tmp_dataset_path / "references"
    dataset_name = nx.stem
    darks = refs_dir / f"{dataset_name}_darks.hdf5"
    flats = refs_dir / f"{dataset_name}_flats.hdf5"

    task = Task(
        inputs={
            "nx_path": str(nx),
            "reduced_darks_path": str(darks),
            "reduced_flats_path": str(flats),
            "output_format": "png",
        },
    )
    task.execute()

    gallery_dir = Path(task.outputs.processed_data_dir) / "gallery"
    assert gallery_dir.exists(), "Gallery directory does not exist"
    assert gallery_dir.is_dir(), "Gallery path is not a directory"

    # "*.png" also matches the large variants, so they are filtered out here.
    small_images = sorted(
        p for p in gallery_dir.glob("*.png") if not p.name.endswith("_large.png")
    )
    large_images = sorted(gallery_dir.glob("*_large.png"))
    assert len(small_images) == 5, f"Expected 5 small images, found {len(small_images)}"
    assert len(large_images) == 5, f"Expected 5 large images, found {len(large_images)}"

    gif_path = gallery_dir / f"{nx.stem}.gif"
    assert gif_path.exists(), "Projections GIF does not exist"
    with Image.open(gif_path) as gif:
        assert gif.format == "GIF", "Projections GIF is not a valid GIF image"
        assert gif.size == (16, 16), "Projections GIF is not 16x16"
        assert getattr(gif, "n_frames", 1) >= 1, "Projections GIF has too few frames"
        colors = gif.getcolors(maxcolors=256)
        assert colors is not None, "Projections GIF palette is missing"
        assert len(colors) <= 32, "Projections GIF exceeds 32 colors"

    for img_path in small_images:
        with Image.open(img_path) as img:
            _assert_grayscale_png(img, img_path.name)
            assert img.size == (16, 16), f"{img_path.name} not resized to 16x16"
            arr = np.array(img)
            assert arr.dtype == np.uint8, f"{img_path.name} not saved as 8-bit"
            # The message is a single literal: a string split across physical
            # lines is a syntax error.
            assert arr.max() <= 255 and arr.min() >= 0, (
                f"{img_path.name} has out-of-bound pixel values"
            )

    for img_path in large_images:
        with Image.open(img_path) as img:
            _assert_grayscale_png(img, img_path.name)
            assert img.size[0] > 0 and img.size[1] > 0, (
                f"{img_path.name} has invalid dimensions"
            )
            assert max(img.size) <= 1000, (
                f"{img_path.name} exceeds the maximum expected dimension"
            )
@pytest.mark.order(7)
@pytest.mark.parametrize("angle_step,expected_count", [(45, 9), (90, 5), (180, 3)])
def test_buildgallery_angles(angle_step, expected_count, tmp_dataset_path):
    """Check how many gallery images are produced for a given ``angle_step``.

    For each parametrized step, the projections gallery must hold
    ``expected_count`` small PNGs, the same number of ``*_large.png`` files
    and a GIF named after the NX file.
    """
    projections = tmp_dataset_path / "projections"
    nx = projections / "TestEwoksTomo_0010.nx"
    references = tmp_dataset_path / "references"

    # Start from an empty gallery so that only this run's files are counted.
    gallery_dir = projections / "gallery"
    if gallery_dir.exists():
        shutil.rmtree(gallery_dir)

    task_inputs = {
        "nx_path": str(nx),
        "reduced_darks_path": str(references / f"{nx.stem}_darks.hdf5"),
        "reduced_flats_path": str(references / f"{nx.stem}_flats.hdf5"),
        "angle_step": angle_step,
        "output_format": "png",
    }
    BuildProjectionsGallery(inputs=task_inputs).execute()

    small_images = [
        png for png in gallery_dir.glob("*.png") if not png.name.endswith("_large.png")
    ]
    large_images = list(gallery_dir.glob("*_large.png"))

    assert len(small_images) == expected_count, (
        f"Expected {expected_count} resized images, found {len(small_images)}"
    )
    assert len(large_images) == expected_count, (
        f"Expected {expected_count} large images, found {len(large_images)}"
    )

    gif_path = gallery_dir / f"{nx.stem}.gif"
    assert gif_path.exists(), "Projections GIF does not exist"
def _expected_slice_path(root: Path) -> Path: return ( root / "slices" / "TestEwoksTomo_0010slice_000008_plane_XY" / "TestEwoksTomo_0010slice_000008_plane_XY_00008.hdf5" )
@pytest.mark.order(11)
def test_buildslicesgallery_creates_one_image(tmp_dataset_path):
    """Run the slices gallery task once and validate the images it reports.

    The task outputs must point to a ``gallery`` folder below the processed
    data directory, to a 16x16 8-bit grayscale PNG named after the slice file,
    and a ``*_large`` companion whose largest dimension is at most 1000.
    """
    stale_gallery = tmp_dataset_path / "slices" / "gallery"
    if stale_gallery.exists():
        shutil.rmtree(stale_gallery)

    slice_path = _expected_slice_path(tmp_dataset_path)
    assert slice_path.exists(), f"Missing test input: {slice_path}"

    task_inputs = {
        "reconstructed_slice_path": str(slice_path),
        "output_format": "png",
        "overwrite": True,
        "image_size": 1000,
    }
    task = BuildSlicesGallery(inputs=task_inputs)
    task.execute()

    outputs = task.outputs
    processed_dir = Path(outputs.processed_data_dir)
    out_gallery = Path(outputs.gallery_path)
    out_image = Path(outputs.gallery_image_path)
    large_image = out_image.with_name(f"{out_image.stem}_large{out_image.suffix}")

    assert out_gallery == processed_dir / "gallery"
    assert out_gallery.exists() and out_gallery.is_dir()
    assert out_image.name == f"{slice_path.stem}.png"
    assert out_image.exists()
    assert large_image.exists()

    with Image.open(out_image) as preview:
        assert preview.format == "PNG"
        assert preview.mode == "L"
        assert preview.size == (16, 16)
        pixels = np.array(preview)
        assert pixels.dtype == np.uint8
        assert 0 <= pixels.min() <= 255 and 0 <= pixels.max() <= 255
        # More than one gray level means the preview is not a flat image.
        assert len(np.unique(pixels)) > 1

    with Image.open(large_image) as im_large:
        assert max(im_large.size) <= 1000, (
            "Large gallery image exceeds the maximum expected dimension"
        )
def test_buildslicesgallery_drops_dataset_name_from_absorption_preview(tmp_path):
    """Check the gallery file name computed for an absorption slice.

    For ``sample_dataset_absorption_xy_00008.hdf5`` the expected gallery file
    name is ``absorption_xy_00008.png``.
    """
    slices_dir = tmp_path / "sample_dataset" / "slices"
    slice_file = slices_dir / "sample_dataset_absorption_xy_00008.hdf5"

    task = BuildSlicesGallery(inputs={"reconstructed_slice_path": str(slice_file)})
    gallery_path = task.get_gallery_file_path(slices_dir / "gallery", slice_file, "png")

    assert Path(gallery_path).name == "absorption_xy_00008.png"
@pytest.mark.order(12)
def test_buildslicesgallery_overwrite_guard(tmp_dataset_path):
    """Check that a second run with ``overwrite=False`` raises ``RuntimeError``."""
    stale_gallery = tmp_dataset_path / "slices" / "gallery"
    if stale_gallery.exists():
        shutil.rmtree(stale_gallery)

    common_inputs = {
        "reconstructed_slice_path": str(_expected_slice_path(tmp_dataset_path)),
        "output_format": "png",
    }

    # First run creates the image.
    BuildSlicesGallery(inputs={**common_inputs, "overwrite": True}).execute()

    # Second run with overwrite disabled must raise.
    guarded_task = BuildSlicesGallery(inputs={**common_inputs, "overwrite": False})
    with pytest.raises(RuntimeError):
        guarded_task.execute()
@pytest.mark.order(13)
def test_buildslicesgallery_bounds_and_resize(tmp_dataset_path):
    """Run the slices gallery task with explicit ``bounds`` and check the output.

    The preview must be a 16x16 grayscale PNG with at least two gray levels
    spanning a range of 32 or more, and the ``*_large`` image must not exceed
    1000 in its largest dimension.
    """
    stale_gallery = tmp_dataset_path / "slices" / "gallery"
    if stale_gallery.exists():
        shutil.rmtree(stale_gallery)

    task_inputs = {
        "reconstructed_slice_path": str(_expected_slice_path(tmp_dataset_path)),
        "output_format": "png",
        "overwrite": True,
        "bounds": (50.0, 200.0),
        "image_size": 1000,
    }
    task = BuildSlicesGallery(inputs=task_inputs)
    task.execute()

    processed_dir = Path(task.outputs.processed_data_dir)
    assert task.get_gallery_dir(processed_dir) == str(processed_dir / "gallery")

    out_image = Path(task.outputs.gallery_image_path)
    large_image = out_image.with_name(f"{out_image.stem}_large{out_image.suffix}")
    assert out_image.exists()
    assert large_image.exists()

    with Image.open(out_image) as preview:
        assert preview.format == "PNG"
        assert preview.mode == "L"
        assert preview.size == (16, 16)
        pixels = np.array(preview)
        assert pixels.min() >= 0 and pixels.max() <= 255
        assert len(np.unique(pixels)) >= 2
        assert (pixels.max() - pixels.min()) >= 32

    with Image.open(large_image) as im_large:
        assert max(im_large.size) <= 1000, (
            "Large gallery image exceeds the maximum expected dimension"
        )
@pytest.mark.order(14)
def test_buildvolumegallery_creates_nine_images(tmp_path):
    """Build a gallery from a small multi-frame TIFF volume and check its files.

    A synthetic 8x10x12 ``uint16`` volume is written as a multi-frame TIFF.
    The task must report 9 distinct previews (3 each for ``xy``, ``xz`` and
    ``yz``), each with a ``*_large`` companion, inside a ``gallery`` folder
    next to the volume, with names starting with ``absorption_``.
    """
    volume_dir = tmp_path.joinpath(
        "visitor",
        "ma1234",
        "id00",
        "20251201",
        "PROCESSED_DATA",
        "TestEwoksTomo",
        "TestEwoksTomo_0010",
        "volumes",
        "TestEwoksTomo_0010_absorption_16Bit_tiff",
    )
    volume_dir.mkdir(parents=True, exist_ok=True)
    volume_path = volume_dir / "TestEwoksTomo_0010.tiff"

    volume = np.arange(8 * 10 * 12, dtype=np.uint16).reshape(8, 10, 12)
    # One TIFF frame per plane along the first axis.
    first_frame, *other_frames = [Image.fromarray(plane) for plane in volume]
    first_frame.save(
        volume_path, save_all=True, append_images=other_frames, compression=None
    )

    task_inputs = {
        "reconstructed_volume_path": str(volume_path),
        "output_format": "png",
        "overwrite": True,
        "image_size": 1000,
    }
    task = BuildVolumeGallery(inputs=task_inputs)
    task.execute()

    gallery_dir = Path(task.outputs.gallery_path)
    image_paths = [Path(entry) for entry in task.outputs.gallery_image_paths]
    large_paths = [
        preview.with_name(f"{preview.stem}_large{preview.suffix}")
        for preview in image_paths
    ]

    assert gallery_dir.exists() and gallery_dir.is_dir()
    assert len(image_paths) == 9
    assert len(set(image_paths)) == 9
    assert all(preview.exists() for preview in image_paths)
    assert all(large.exists() for large in large_paths)

    for direction in ("xy", "xz", "yz"):
        direction_images = [
            preview for preview in image_paths if f"_{direction}_" in preview.name
        ]
        assert len(direction_images) == 3, (
            f"Expected 3 images for direction {direction}, got {len(direction_images)}"
        )

    assert gallery_dir == volume_dir / "gallery"

    gallery_previews = sorted(
        png for png in gallery_dir.glob("*.png") if not png.name.endswith("_large.png")
    )
    assert len(gallery_previews) == 9

    for preview in image_paths:
        name = preview.stem
        assert name.startswith("absorption_")
        assert not name.startswith("TestEwoksTomo_0010_")
        assert name.count("_") >= 2
@pytest.mark.order(15)
def test_buildvolumegallery_overwrite_guard(tmp_path):
    """Check that rebuilding a volume gallery with ``overwrite=False`` raises.

    A seeded random 8x10x12 ``uint16`` volume is written as a multi-frame
    TIFF; the first run (``overwrite=True``) creates the gallery and the
    second run (``overwrite=False``) must raise ``RuntimeError``.
    """
    volume_dir = tmp_path.joinpath(
        "visitor",
        "ma1234",
        "id00",
        "20251201",
        "PROCESSED_DATA",
        "TestEwoksTomo",
        "TestEwoksTomo_0010",
        "volumes",
        "TestEwoksTomo_0010_absorption_16Bit_tiff",
    )
    volume_dir.mkdir(parents=True, exist_ok=True)
    volume_path = volume_dir / "TestEwoksTomo_0010.tiff"

    rng = np.random.RandomState(0)
    volume = rng.randint(0, 65535, size=(8, 10, 12)).astype(np.uint16)
    # One TIFF frame per plane along the first axis.
    first_frame, *other_frames = [Image.fromarray(plane) for plane in volume]
    first_frame.save(
        volume_path, save_all=True, append_images=other_frames, compression=None
    )

    common_inputs = {
        "reconstructed_volume_path": str(volume_path),
        "output_format": "png",
    }

    # First run creates the gallery images.
    BuildVolumeGallery(inputs={**common_inputs, "overwrite": True}).execute()

    # Second run with overwrite disabled must raise.
    guarded_task = BuildVolumeGallery(inputs={**common_inputs, "overwrite": False})
    with pytest.raises(RuntimeError):
        guarded_task.execute()