from pathlib import Path
from ewokscore import Task
from ewokscore.model import BaseInputModel, BaseOutputModel
from esrf_pathlib import ESRFPath
from pydantic import Field
[docs]
class TomoPath(ESRFPath, tomo=1, fallback_depth=0):  # type: ignore[call-arg]
    """``ESRFPath`` subclass used in this module to interpret tomo paths.

    The class adds no behaviour of its own; it only fixes the class keywords
    ``tomo=1`` and ``fallback_depth=0`` that are forwarded to ``ESRFPath``.

    NOTE(review): presumably ``tomo=1`` selects version 1 of the tomo path
    schema and ``fallback_depth=0`` disables parent-directory fallback when
    matching -- confirm against the ``esrf_pathlib`` documentation.
    """
def _create_symlink(
    target: Path, source: Path, overwrite: bool, target_is_directory: bool
) -> Path:
    """Create a symbolic link at ``target`` that points to ``source``.

    Missing parent directories of ``target`` are created first.

    :param target: Location of the symlink to create.
    :param source: Path the symlink points to. It is compared with
        ``target.resolve()``, so callers should pass an already resolved path.
    :param overwrite: When true, an existing file or symlink at ``target`` is
        removed and replaced. When false, an existing symlink that already
        resolves to ``source`` is kept as is and anything else is an error.
    :param target_is_directory: Forwarded to ``Path.symlink_to``.
    :returns: ``target``.
    :raises FileExistsError: If ``target`` exists and ``overwrite`` is false
        (unless it is already a symlink resolving to ``source``), or if
        ``target`` is a real directory, which is never removed.
    """
    target.parent.mkdir(parents=True, exist_ok=True)

    # ``exists()`` follows symlinks, so a dangling link is only detected by
    # ``is_symlink()``; both checks are needed to see every occupied location.
    is_link = target.is_symlink()
    if is_link or target.exists():
        if overwrite:
            # ``unlink()`` cannot remove a real directory and fails with a
            # platform-dependent error (IsADirectoryError or PermissionError).
            # Report the situation explicitly and leave the directory alone.
            if not is_link and target.is_dir():
                raise FileExistsError(
                    "Symlink target is an existing directory and cannot be "
                    f"overwritten: {target}"
                )
            target.unlink()
        elif is_link and target.resolve() == source:
            # The requested link is already in place: nothing to do.
            return target
        else:
            raise FileExistsError(
                f"Symlink target already exists and overwrite is False: {target}"
            )

    target.symlink_to(source, target_is_directory=target_is_directory)
    return target
[docs]
class LinkSlicesOutputModel(BaseOutputModel):
    # Output fields of the ``LinkSlices`` task: the input slice path passed
    # through unchanged, plus the two symlink locations. The link fields
    # default to ``None``, which ``LinkSlices.run`` keeps when it creates no
    # links.

    # Required field (no default value).
    reconstructed_slice_path: str = Field(
        default=...,
        description="Original reconstructed slice path (pass-through).",
    )

    # Optional: link inside the collection-level SLICES directory.
    slices_link_collection_file: str | None = Field(
        description="Symlink path under PROCESSED_DATA/<collection>/SLICES, if created.",
        default=None,
    )

    # Optional: link inside the top-level SLICES directory.
    slices_link_root_file: str | None = Field(
        description="Symlink path under PROCESSED_DATA/SLICES, if created.",
        default=None,
    )
[docs]
class LinkVolumesOutputModel(BaseOutputModel):
    # Output fields of the ``LinkVolumes`` task: the input volume path passed
    # through unchanged, plus the two symlink locations. The link fields
    # default to ``None``, which ``LinkVolumes.run`` keeps when it creates no
    # links.

    # Required field (no default value).
    reconstructed_volume_path: str = Field(
        default=...,
        description="Original reconstructed volume path (pass-through).",
    )

    # Optional: link inside the collection-level VOLUMES directory.
    volumes_link_collection_file: str | None = Field(
        description="Symlink path under PROCESSED_DATA/<collection>/VOLUMES, if created.",
        default=None,
    )

    # Optional: link inside the top-level VOLUMES directory.
    volumes_link_root_file: str | None = Field(
        description="Symlink path under PROCESSED_DATA/VOLUMES, if created.",
        default=None,
    )
[docs]
class LinkSlices(  # type: ignore[call-arg]
    Task,
    input_model=LinkSlicesInputModel,
    output_model=LinkSlicesOutputModel,
):
    """Create ESRF tomo symlinks for a reconstructed slice in SLICES directories.

    The task reads ``reconstructed_slice_path`` and ``overwrite`` from its
    inputs. The slice path is always passed through to the outputs; the two
    link outputs stay ``None`` unless the path is recognised as a ``tomo_v1``
    ``slices_file`` path.
    """

    def run(self):
        """Link the reconstructed slice file into both SLICES locations.

        :raises FileNotFoundError: If the reconstructed slice does not exist.
        :raises FileExistsError: Propagated from ``_create_symlink`` when a
            link location is already occupied and cannot be reused.
        """
        slice_file = Path(self.inputs.reconstructed_slice_path)
        if not slice_file.exists():
            raise FileNotFoundError(f"Reconstructed slice not found: {slice_file}")

        # Pass-through output plus "no link created" defaults.
        self.outputs.reconstructed_slice_path = str(slice_file)
        self.outputs.slices_link_collection_file = None
        self.outputs.slices_link_root_file = None

        parsed = TomoPath(str(slice_file))
        is_tomo_slice = (
            parsed.schema_name == "tomo_v1" and parsed.template_name == "slices_file"
        )
        if not is_tomo_slice:
            # Not a recognised tomo slice path: leave the link outputs empty.
            return

        replace_existing = self.inputs.overwrite
        link_destination = slice_file.resolve()

        # Each output field carries the name of the path template it is built
        # from. All link locations are computed before any link is created.
        link_locations = {
            field: Path(parsed.replace_fields(template_name=field))
            for field in ("slices_link_collection_file", "slices_link_root_file")
        }
        for field, location in link_locations.items():
            created = _create_symlink(
                target=location,
                source=link_destination,
                overwrite=replace_existing,
                target_is_directory=False,
            )
            setattr(self.outputs, field, str(created))
[docs]
class LinkVolumes(  # type: ignore[call-arg]
    Task,
    input_model=LinkVolumesInputModel,
    output_model=LinkVolumesOutputModel,
):
    """Create ESRF tomo symlinks for a reconstructed volume in VOLUMES directories.

    The task reads ``reconstructed_volume_path`` and ``overwrite`` from its
    inputs. The volume path is always passed through to the outputs; the two
    link outputs stay ``None`` unless the directory containing the volume is
    recognised as a ``tomo_v1`` ``volumes_file`` or
    ``volumes_custom_type_file`` path.
    """

    def run(self):
        """Link the directory holding the volume into both VOLUMES locations.

        The links point to the parent directory of the volume file, not to the
        file itself.

        :raises FileNotFoundError: If the reconstructed volume does not exist.
        :raises FileExistsError: Propagated from ``_create_symlink`` when a
            link location is already occupied and cannot be reused.
        """
        volume_file = Path(self.inputs.reconstructed_volume_path)
        if not volume_file.exists():
            raise FileNotFoundError(f"Reconstructed volume not found: {volume_file}")

        # Pass-through output plus "no link created" defaults.
        self.outputs.reconstructed_volume_path = str(volume_file)
        self.outputs.volumes_link_collection_file = None
        self.outputs.volumes_link_root_file = None

        # The path is parsed from the directory that contains the volume.
        container_dir = volume_file.parent
        parsed = TomoPath(str(container_dir))
        is_tomo_volume = parsed.schema_name == "tomo_v1" and parsed.template_name in (
            "volumes_file",
            "volumes_custom_type_file",
        )
        if not is_tomo_volume:
            # Not a recognised tomo volume path: leave the link outputs empty.
            return

        replace_existing = self.inputs.overwrite
        link_destination = container_dir.resolve()

        # Each output field carries the name of the path template it is built
        # from. All link locations are computed before any link is created.
        link_locations = {
            field: Path(parsed.replace_fields(template_name=field))
            for field in ("volumes_link_collection_file", "volumes_link_root_file")
        }
        for field, location in link_locations.items():
            created = _create_symlink(
                target=location,
                source=link_destination,
                overwrite=replace_existing,
                target_is_directory=True,
            )
            setattr(self.outputs, field, str(created))