Source code for ewoksndreg.io.nexus
from itertools import takewhile
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union
import h5py
from silx.io import h5py_utils
from silx.io.url import DataUrl
[docs]
def common_h5_parent(h5names: List[str]) -> Tuple[str, List[str]]:
    """Split HDF5 names in a common parent name and names relative to that parent.

    The common parent is always a strict parent of every name: the last
    component of a name refers to the item itself and is never considered
    part of the common parent. As a result a relative name is never empty,
    also not when only one name is provided or when one of the names is
    the parent of another.

    :param h5names: Absolute HDF5 dataset or group names.
    :returns: Name of the common parent and the relative names with respect to that parent.
    :raises ValueError: when `h5names` is empty.
    """
    if not h5names:
        raise ValueError("at least one HDF5 name is required to find a common parent")
    split_h5names = [h5name.split("/") for h5name in h5names]
    # Only compare the parent components (everything but the last component)
    # so that the item name itself cannot end up in the common parent.
    parent_components = zip(*(parts[:-1] for parts in split_h5names))
    common_parent_groups = list(
        takewhile(lambda parts: all(p == parts[0] for p in parts), parent_components)
    )
    ncommon = len(common_parent_groups)
    parent_h5name = "/".join(split_h5names[0][:ncommon])
    rel_h5names = ["/".join(parts[ncommon:]) for parts in split_h5names]
    return parent_h5name, rel_h5names
[docs]
def find_nxdata_image_stacks(
    root_url: Union[str, DataUrl],
) -> Tuple[DataUrl, Dict[str, DataUrl]]:
    """Find the 3D datasets of all NXdata groups that have a 3D signal.

    The HDF5 item `root_url` refers to is inspected itself, as well as
    all groups below it.

    :param root_url: URL of the HDF5 item to search. The file root is used when the URL has no data path.
    :returns: URL of the common parent and dictionary that maps relative name w.r.t. common parent to URL.
    :raises RuntimeError: when no 3D dataset is found.
    """
    if not isinstance(root_url, DataUrl):
        root_url = DataUrl(root_url)
    filename = root_url.file_path()

    h5names = list()

    def is_image_stack(h5obj) -> bool:
        # Sub-groups, broken links (None) and missing items have no `ndim`.
        return isinstance(h5obj, h5py.Dataset) and h5obj.ndim == 3

    def collect(_, h5item):
        # Always returns None so that `visititems` keeps on visiting.
        if not isinstance(h5item, h5py.Group):
            return
        nx_class = h5item.attrs.get("NX_class")
        if nx_class != "NXdata":
            return
        signal = h5item.attrs.get("signal")
        if not signal:
            return
        # `get` returns None instead of raising when the signal does not exist.
        if not is_image_stack(h5item.get(signal)):
            return
        for h5child in h5item.values():
            if is_image_stack(h5child):
                h5names.append(h5child.name)

    with h5py_utils.File(filename) as fh:
        name = root_url.data_path() or "/"
        root = fh[name]
        # `visititems` only visits the items below the root, not the root itself.
        collect(root.name, root)
        if isinstance(root, h5py.Group):
            root.visititems(collect)

    if not h5names:
        raise RuntimeError(
            f"No NXdata groups found with 3D signals under {root_url.path()!r}"
        )

    parent_h5name, rel_h5names = common_h5_parent(h5names)
    image_stacks = {
        key: DataUrl(f"{filename}::{name}") for key, name in zip(rel_h5names, h5names)
    }
    common_parent_url = DataUrl(f"{filename}::{parent_h5name}")
    return common_parent_url, image_stacks
[docs]
def nxdata_image_stacks_metadata(
    common_parent_url: Union[str, DataUrl],
    image_stacks: Dict[str, Union[str, DataUrl]],
    top_nx_class: str = "NXprocess",
    top_name: str = "align",
    output_root_url: Union[str, DataUrl, None] = None,
) -> Tuple[DataUrl, Dict[str, Any]]:
    """Build the metadata tree under which processed image stacks can be saved.

    The HDF5 attributes of the common parent and all its ancestors are copied,
    one ancestor group is renamed to `top_name` and the metadata of the NXdata
    group of each image stack is added.

    :param common_parent_url: URL to the common parent of all image stack URLs.
    :param image_stacks: URL to image stacks. The keys are the names relative to the common parent.
    :param top_nx_class: NX_class of the parent group which needs to be renamed to `top_name`.
                         The common parent itself is renamed when no parent has this class.
    :param top_name: New top HDF5 group name.
    :param output_root_url: output root URL. The file of `common_parent_url` is used when not provided.
    :returns: output root URL and HDF5/NeXus metadata relative to the file root following the Silx dictdump schema.
    """
    if not isinstance(common_parent_url, DataUrl):
        common_parent_url = DataUrl(common_parent_url)
    if output_root_url is not None and not isinstance(output_root_url, DataUrl):
        output_root_url = DataUrl(output_root_url)

    # For example:
    #  common_parent_name = "/entry/process/results"
    #  common_parent_groups = ["", "entry", "process", "results"]
    common_parent_name = common_parent_url.data_path()
    common_parent_groups = common_parent_name.split("/")

    filename = common_parent_url.file_path()
    with h5py_utils.File(filename) as fh:
        common_parent = fh[common_parent_name]

        # Classes of the common parent groups
        # For example
        #  nxclasses = ["NXroot", "NXentry", "NXprocess", "NXcollection"]
        parent = common_parent
        common_parent_group_attrs = [dict(parent.attrs)]
        while parent.name != "/":
            parent = parent.parent
            common_parent_group_attrs.append(dict(parent.attrs))
        # Collected from the common parent up to the root: reverse the order
        common_parent_group_attrs = common_parent_group_attrs[::-1]
        nxclasses = [attrs.get("NX_class") for attrs in common_parent_group_attrs]

        # Find the top level to replace it with a new name
        # For example
        #  common_parent_groups = ["", "entry", "align", "results"]
        if top_nx_class in nxclasses:
            top_level = nxclasses.index(top_nx_class)
        else:
            top_level = len(nxclasses) - 1
        original_top_name = common_parent_groups[top_level]
        common_parent_groups[top_level] = top_name
        nxclasses[top_level] = top_nx_class
        common_parent_group_attrs[top_level]["NX_class"] = top_nx_class
        if top_level > 0:
            # Keep the "default" attribute of the parent pointing to the renamed group
            top_parent_attrs = common_parent_group_attrs[top_level - 1]
            original_top_parent_default = top_parent_attrs.get("default")
            if original_top_name == original_top_parent_default:
                top_parent_attrs["default"] = top_name

        # Replace the common input groups with the requested common groups
        if output_root_url:
            output_root_file_path = output_root_url.file_path()
            if output_root_url.data_path():
                requested_common_parent_groups = output_root_url.data_path().split("/")
            else:
                requested_common_parent_groups = [""]
            nextra = len(requested_common_parent_groups) - len(common_parent_groups)
            if nextra <= 0:
                nrequested = len(requested_common_parent_groups)
                common_parent_groups[:nrequested] = requested_common_parent_groups
            else:
                common_parent_groups = requested_common_parent_groups
                # One dictionary per extra group: do not share a single instance
                common_parent_group_attrs += [
                    {"NX_class": "NXcollection"} for _ in range(nextra)
                ]
                nxclasses += ["NXcollection"] * nextra
        else:
            output_root_file_path = filename
        output_root_name = "/".join(common_parent_groups)

        # Metadata of the common parent groups
        output_metadata = {}
        common_output_metadata = output_metadata
        for name, attrs in zip(common_parent_groups, common_parent_group_attrs):
            if name:
                # An empty name refers to the current level (e.g. the file root)
                common_output_metadata[name] = dict()
                common_output_metadata = common_output_metadata[name]
            for key, value in attrs.items():
                common_output_metadata[f"@{key}"] = value

        # Metadata of the NXdata group(s)
        top_nxdata_is_annotated = False
        for dset_relname, dset_url in image_stacks.items():
            dset_parts = dset_relname.split("/")
            nxdata_is_common_parent = len(dset_parts) == 1
            if nxdata_is_common_parent:
                if top_nxdata_is_annotated:
                    # NXdata metadata is already read in a previous iteration
                    continue

                # Read NXdata metadata
                nxdata_metadata = _get_nxdata_metadata(fh, dset_url)
                common_output_metadata.update(nxdata_metadata)
                top_nxdata_is_annotated = True
            else:
                # Get the metadata of the parents
                parent_metadata = common_output_metadata
                for s in dset_parts[:-2]:
                    if s not in parent_metadata:
                        parent_metadata[s] = {"@NX_class": "NXcollection"}
                    parent_metadata = parent_metadata[s]

                # Look in the metadata tree (not in the HDF5 file) to find out
                # whether this NXdata group was handled already.
                name_in_nxdata = dset_parts[-2]
                nxdata_is_annotated = name_in_nxdata in parent_metadata
                if nxdata_is_annotated:
                    # NXdata metadata is already read in a previous iteration
                    continue

                # Read NXdata metadata
                nxdata_metadata = _get_nxdata_metadata(fh, dset_url)
                parent_metadata[name_in_nxdata] = nxdata_metadata

    output_root_url = DataUrl(f"{output_root_file_path}::{output_root_name}")
    return output_root_url, output_metadata
def _get_nxdata_metadata(fh: h5py.File, dset_url: Union[str, DataUrl]) -> dict:
    """NXdata metadata includes all HDF5 attributes and axes field values.

    :param fh: open HDF5 file that contains the dataset.
    :param dset_url: URL of a dataset inside the NXdata group. Only the data path is used.
    :returns: attributes of the NXdata group (keys prefixed with "@") and the values of its axes.
    """
    if not isinstance(dset_url, DataUrl):
        dset_url = DataUrl(dset_url)
    nxdata = fh[dset_url.data_path()].parent
    nxdata_metadata = {f"@{k}": v for k, v in nxdata.attrs.items()}

    axes = nxdata.attrs.get("axes", [])
    if isinstance(axes, (str, bytes)):
        # A single axis name: do not iterate over its characters
        axes = [axes]
    for name in axes:
        if isinstance(name, bytes):
            name = name.decode()
        if not name or name == ".":
            # "." marks a dimension without an axis dataset
            continue
        nxdata_metadata[name] = nxdata[name][()]
    return nxdata_metadata
[docs]
def nx_annotate(
    treedict: Dict,
    h5item: Union[h5py.Group, h5py.Dataset, str, DataUrl],
    **open_options,
) -> None:
    """Merge a dictdump-style tree into an HDF5 item.

    Like dicttonx from Silx but groups and datasets are added recursively
    and attributes are modified.

    :param treedict: tree of groups, datasets and attributes ("@" keys).
    :param h5item: an open HDF5 item or a URL to the item. When a URL is
                   provided the file is opened for the duration of the call.
    :param open_options: options for opening the file when `h5item` is a URL.
                         The mode is "a" unless specified otherwise.
    """
    if isinstance(h5item, str):
        h5item = DataUrl(h5item)

    if not isinstance(h5item, DataUrl):
        # Already an open HDF5 item
        _dicttonx(treedict, h5item)
        return

    open_options.setdefault("mode", "a")
    with h5py_utils.File(h5item.file_path(), **open_options) as h5file:
        target = h5file[h5item.data_path() or "/"]
        _dicttonx(treedict, target)
def _dicttonx(treedict: Dict, h5item: Union[h5py.Group, h5py.Dataset]) -> None:
    """Recursively write `treedict` into `h5item`.

    * "@attr" keys set the attribute on `h5item` itself (existing values are replaced).
    * "child@attr" keys set the attribute on a child of `h5item`, only when the
      child does not have that attribute yet.
    * dictionary values are written in the group with that name, which is created when missing.
    * all other values are saved as datasets, unless an item with that name exists already.
    """
    # Attributes of children are applied last, after the children themselves
    # had the chance to be created.
    deferred = []
    for key, value in treedict.items():
        owner, separator, attribute = key.partition("@")
        if separator:
            if owner:
                deferred.append((owner, attribute, value))
            else:
                h5item.attrs[attribute] = value
            continue

        if isinstance(value, dict):
            _dicttonx(value, h5item.require_group(key))
            continue

        if key not in h5item:
            h5item[key] = value

    for owner, attribute, value in deferred:
        h5child = h5item[owner]
        if attribute in h5child.attrs:
            continue
        h5child.attrs[attribute] = value