Source code for ewoksndreg.io.nexus

from itertools import takewhile
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union

import h5py
from silx.io import h5py_utils
from silx.io.url import DataUrl


def common_h5_parent(h5names: List[str]) -> Tuple[str, List[str]]:
    """Split absolute HDF5 names into a common parent and relative names.

    :param h5names: Absolute HDF5 dataset or group names.
    :returns: Name of the common parent and the relative names with respect to that parent.
        The last component of every name is always kept in its relative name, so a
        relative name is never empty for a non-empty input name. An empty input
        list yields ``("", [])``.
    """
    if not h5names:
        return "", []
    split_h5names = [h5name.split("/") for h5name in h5names]
    common_parent_groups = list(
        takewhile(lambda parts: all(p == parts[0] for p in parts), zip(*split_h5names))
    )
    # The last component of a name is the item itself, not one of its parents.
    # Without this cap a single name, duplicated names or a name that is a
    # prefix of another name would make the item its own "parent" and leave an
    # empty relative name.
    max_parent_depth = min(len(parts) for parts in split_h5names) - 1
    ncommon = min(len(common_parent_groups), max_parent_depth)
    parent_h5name = "/".join(split_h5names[0][:ncommon])
    rel_h5names = ["/".join(parts[ncommon:]) for parts in split_h5names]
    return parent_h5name, rel_h5names
def find_nxdata_image_stacks(
    root_url: Union[str, DataUrl],
) -> Tuple[DataUrl, Dict[str, DataUrl]]:
    """Collect the 3D datasets of all NXdata groups with a 3D signal below `root_url`.

    :param root_url: URL of the HDF5 group below which NXdata groups are searched.
        The file root is used when the URL has no data path.
    :returns: URL of the common parent and dictionary that maps relative name
        w.r.t. common parent to URL.
    :raises RuntimeError: when no 3D dataset is found in an NXdata group with a 3D signal.
    """
    if not isinstance(root_url, DataUrl):
        root_url = DataUrl(root_url)
    filename = root_url.file_path()

    with h5py_utils.File(filename) as fh:
        name = root_url.data_path() or "/"
        root = fh[name]
        h5names = list()

        def func(_, h5item):
            # Always return None: any other value would stop `visititems`.
            if not isinstance(h5item, h5py.Group):
                return
            if h5item.attrs.get("NX_class") != "NXdata":
                return
            signal = h5item.attrs.get("signal")
            if not signal:
                return
            # The signal attribute may refer to a field that does not exist
            # or that is not a dataset: such NXdata groups are skipped.
            h5signal = h5item.get(signal)
            if not isinstance(h5signal, h5py.Dataset) or h5signal.ndim != 3:
                return
            for h5child in h5item.values():
                # Children can be sub-groups or unresolved links, which have
                # no `ndim`: only datasets qualify as image stacks.
                if isinstance(h5child, h5py.Dataset) and h5child.ndim == 3:
                    h5names.append(h5child.name)

        _ = root.visititems(func)

    if not h5names:
        raise RuntimeError(
            f"No NXdata groups found with 3D signals under {root_url.path()!r}"
        )

    parent_h5name, rel_h5names = common_h5_parent(h5names)
    image_stacks = {
        key: DataUrl(f"{filename}::{h5name}")
        for key, h5name in zip(rel_h5names, h5names)
    }
    common_parent_url = DataUrl(f"{filename}::{parent_h5name}")
    return common_parent_url, image_stacks
def nxdata_image_stacks_metadata(
    common_parent_url: Union[str, DataUrl],
    image_stacks: Dict[str, Union[str, DataUrl]],
    top_nx_class: str = "NXprocess",
    top_name: str = "align",
    output_root_url: Union[str, DataUrl, None] = None,
) -> Tuple[DataUrl, Dict[str, Any]]:
    """Build the HDF5/NeXus metadata tree under which image stacks can be saved.

    The HDF5 attributes of the common parent and all its ancestors are copied,
    one group is renamed to `top_name` and the attributes and axes of the NXdata
    group of each image stack are added.

    :param common_parent_url: URL to the common parent of all image stack URLs.
    :param image_stacks: URL to image stacks. The keys are the dataset names
        relative to the common parent.
    :param top_nx_class: NX_class of the parent group which needs to be renamed to `top_name`.
        The common parent itself is renamed when none of the groups has this class.
    :param top_name: New top HDF5 group name.
    :param output_root_url: output root URL. The file of `common_parent_url` is used when omitted.
    :returns: output root URL and HDF5/NeXus metadata relative to the file root following
        the Silx dictdump schema.
    """
    if not isinstance(common_parent_url, DataUrl):
        common_parent_url = DataUrl(common_parent_url)
    if output_root_url is not None and not isinstance(output_root_url, DataUrl):
        output_root_url = DataUrl(output_root_url)

    # For example:
    #  common_parent_name = "/entry/process/results"
    #  common_parent_groups = ["", "entry", "process", "results"]
    common_parent_name = common_parent_url.data_path()
    common_parent_groups = common_parent_name.split("/")

    filename = common_parent_url.file_path()
    with h5py_utils.File(filename) as fh:
        common_parent = fh[common_parent_name]

        # Classes of the common parent groups
        # For example
        #  nxclasses = ["NXroot", "NXentry", "NXprocess", "NXcollection"]
        parent = common_parent
        common_parent_group_attrs = [dict(parent.attrs)]
        while parent.name != "/":
            parent = parent.parent
            common_parent_group_attrs.append(dict(parent.attrs))
        common_parent_group_attrs = common_parent_group_attrs[::-1]
        nxclasses = [attrs.get("NX_class") for attrs in common_parent_group_attrs]

        # Find the top level to replace it with a new name
        # For example
        #  common_parent_groups = ["", "entry", "align", "results"]
        if top_nx_class in nxclasses:
            top_level = nxclasses.index(top_nx_class)
        else:
            top_level = len(nxclasses) - 1
        original_top_name = common_parent_groups[top_level]
        common_parent_groups[top_level] = top_name
        nxclasses[top_level] = top_nx_class
        common_parent_group_attrs[top_level]["NX_class"] = top_nx_class
        if top_level > 0:
            # Keep the "default" attribute of the parent pointing to the renamed group
            top_parent_attrs = common_parent_group_attrs[top_level - 1]
            original_top_parent_default = top_parent_attrs.get("default")
            if original_top_name == original_top_parent_default:
                top_parent_attrs["default"] = top_name

        # Replace the common input groups with the requested common groups
        if output_root_url:
            output_root_file_path = output_root_url.file_path()
            if output_root_url.data_path():
                requested_common_parent_groups = output_root_url.data_path().split("/")
            else:
                requested_common_parent_groups = [""]
            nextra = len(requested_common_parent_groups) - len(common_parent_groups)
            if nextra <= 0:
                nrequested = len(requested_common_parent_groups)
                common_parent_groups[:nrequested] = requested_common_parent_groups
            else:
                common_parent_groups = requested_common_parent_groups
                # One dictionary per extra group so they do not alias each other
                common_parent_group_attrs += [
                    {"NX_class": "NXcollection"} for _ in range(nextra)
                ]
                nxclasses += ["NXcollection"] * nextra
        else:
            output_root_file_path = filename
        output_root_name = "/".join(common_parent_groups)

        # Metadata of the common parent groups
        output_metadata = {}
        common_output_metadata = output_metadata
        for name, attrs in zip(common_parent_groups, common_parent_group_attrs):
            if name:
                common_output_metadata[name] = dict()
                common_output_metadata = common_output_metadata[name]
            for key, value in attrs.items():
                common_output_metadata[f"@{key}"] = value

        # Metadata of the NXdata group(s)
        top_nxdata_is_annotated = False
        for dset_relname, dset_url in image_stacks.items():
            dset_parts = dset_relname.split("/")
            nxdata_is_common_parent = len(dset_parts) == 1
            if nxdata_is_common_parent:
                if top_nxdata_is_annotated:
                    # NXdata metadata is already read in a previous iteration
                    continue

                # Read NXdata metadata
                nxdata_metadata = _get_nxdata_metadata(fh, dset_url)
                common_output_metadata.update(nxdata_metadata)
                top_nxdata_is_annotated = True
            else:
                # Get the metadata of the parents
                parent_metadata = common_output_metadata
                for s in dset_parts[:-2]:
                    if s not in parent_metadata:
                        parent_metadata[s] = {"@NX_class": "NXcollection"}
                    parent_metadata = parent_metadata[s]

                name_in_nxdata = dset_parts[-2]
                # Look in the metadata collected so far, not in the HDF5 group
                # `parent` which is left over from walking up to the file root.
                nxdata_is_annotated = name_in_nxdata in parent_metadata
                if nxdata_is_annotated:
                    # NXdata metadata is already read in a previous iteration
                    continue

                # Read NXdata metadata
                nxdata_metadata = _get_nxdata_metadata(fh, dset_url)
                parent_metadata[name_in_nxdata] = nxdata_metadata

    output_root_url = DataUrl(f"{output_root_file_path}::{output_root_name}")
    return output_root_url, output_metadata
def _get_nxdata_metadata(fh: h5py.File, dset_url: Union[str, DataUrl]) -> dict:
    """NXdata metadata includes all HDF5 attributes and axes field values.

    :param fh: Opened HDF5 file that contains the dataset.
    :param dset_url: URL of a dataset. Its parent group is used as the NXdata group.
    :returns: Attributes of the NXdata group with an "@" prefix and the values of the
        fields listed in the "axes" attribute.
    """
    if not isinstance(dset_url, DataUrl):
        dset_url = DataUrl(dset_url)
    nxdata = fh[dset_url.data_path()].parent
    nxdata_metadata = {f"@{k}": v for k, v in nxdata.attrs.items()}

    axes = nxdata.attrs.get("axes", [])
    if isinstance(axes, (str, bytes)):
        # A single axis can be stored as a scalar string. Iterating over it
        # would yield characters instead of one field name.
        axes = [axes]
    for name in axes:
        if isinstance(name, bytes):
            # Fixed-length HDF5 strings are read as bytes
            name = name.decode()
        if name == ".":
            # NeXus placeholder for a dimension without an axis field
            continue
        nxdata_metadata[name] = nxdata[name][()]
    return nxdata_metadata
def nx_annotate(
    treedict: Dict,
    h5item: Union[h5py.Group, h5py.Dataset, str, DataUrl],
    **open_options,
) -> None:
    """Like dicttonx from Silx but recursive addition of groups and datasets
    and modifying of attributes.

    :param treedict: Tree of groups, datasets and attributes ("@" in the key) to add.
    :param h5item: Opened HDF5 item or a URL to an HDF5 item. The file root is
        used when the URL has no data path.
    :param open_options: Options for opening the file when `h5item` is a URL.
        The mode is "a" unless specified otherwise.
    """
    url = DataUrl(h5item) if isinstance(h5item, str) else h5item
    if not isinstance(url, DataUrl):
        # Already an opened HDF5 item
        _dicttonx(treedict, url)
        return

    if "mode" not in open_options:
        open_options["mode"] = "a"
    with h5py_utils.File(url.file_path(), **open_options) as h5file:
        _dicttonx(treedict, h5file[url.data_path() or "/"])
def _dicttonx(treedict: Dict, h5item: Union[h5py.Group, h5py.Dataset]) -> None:
    """Recursively add the content of `treedict` to `h5item`.

    Keys are handled as follows:

    * "@attr": attribute of `h5item`, always (over)written.
    * "name@attr": attribute of the child "name", only written when the child
      does not have it yet. Done after all other keys are handled.
    * "name" with a dictionary value: group which is created when missing and
      filled recursively.
    * "name" with any other value: dataset which is only created when
      `h5item` has no child with that name.
    """
    deferred_attrs = dict()
    for key, value in treedict.items():
        if "@" in key:
            target, _, attr = key.partition("@")
            if not target:
                h5item.attrs[attr] = value
            else:
                # The child may be created by a later key
                deferred_attrs[(target, attr)] = value
            continue
        if isinstance(value, dict):
            _dicttonx(value, h5item.require_group(key))
            continue
        if key not in h5item:
            h5item[key] = value

    for (target, attr), value in deferred_attrs.items():
        existing_attrs = h5item[target].attrs
        if attr not in existing_attrs:
            existing_attrs[attr] = value