Source code for ewoksndreg.io.nexus

from itertools import takewhile
from typing import Dict
from typing import List
from typing import Tuple
from typing import Union

import h5py
from silx.io import h5py_utils
from silx.io.url import DataUrl


def common_h5_parent(h5names: List[str]) -> Tuple[str, List[str]]:
    """Split HDF5 names into their deepest common parent and the remainders.

    Names are compared "/"-separated component by component, so a partially
    matching component (``"/a/bc"`` versus ``"/a/bd"``) is not part of the
    common parent.

    When all names are identical (this includes a single name) the common
    parent is the name itself and the relative names are empty strings.
    When the names only share the root, the common parent is ``""``.

    :param h5names: Absolute HDF5 dataset or group names.
    :returns: Name of the common parent and the relative names with respect
        to that parent, in the same order as `h5names`.
    :raises IndexError: when `h5names` is empty.
    """
    paths = [name.split("/") for name in h5names]

    # Count the leading components that are identical in every name.
    depth = 0
    for level in zip(*paths):
        if len(set(level)) != 1:
            break
        depth += 1

    reference = paths[0]
    parent = "/".join(reference[:depth])
    relatives = ["/".join(path[depth:]) for path in paths]
    return parent, relatives
def find_nxdata_image_stacks(
    root_url: Union[str, DataUrl],
) -> Tuple[DataUrl, Dict[str, DataUrl]]:
    """Find the 3D datasets of all NXdata groups that have a 3D signal.

    The group referred to by `root_url` and all groups below it are
    inspected. For every group with ``NX_class == "NXdata"`` whose ``signal``
    attribute refers to a 3D dataset, all 3D datasets directly inside that
    group are selected.

    :param root_url: URL of the HDF5 group from which to start searching.
        The root of the file is used when the URL has no data path.
    :returns: URL of the common parent and dictionary that maps relative name
        w.r.t. common parent to URL.
    :raises RuntimeError: when no 3D dataset is found.
    """
    if not isinstance(root_url, DataUrl):
        root_url = DataUrl(root_url)
    filename = root_url.file_path()

    h5names = []

    def collect_image_stacks(_, h5item):
        # Must always return None: any other value stops `visititems`.
        if not isinstance(h5item, h5py.Group):
            return
        if h5item.attrs.get("NX_class") != "NXdata":
            return
        signal = h5item.attrs.get("signal")
        if not signal:
            return
        # The signal attribute may refer to a missing field
        h5signal = h5item.get(signal)
        if not isinstance(h5signal, h5py.Dataset) or h5signal.ndim != 3:
            return
        for h5child in h5item.values():
            # Children can be sub-groups (or unresolved links) which have no `ndim`
            if isinstance(h5child, h5py.Dataset) and h5child.ndim == 3:
                h5names.append(h5child.name)

    with h5py_utils.File(filename) as fh:
        root = fh[root_url.data_path() or "/"]
        # `visititems` only visits the members of a group, not the group itself
        collect_image_stacks(None, root)
        root.visititems(collect_image_stacks)

    if not h5names:
        raise RuntimeError(
            f"No NXdata groups found with 3D signals under {root_url.path()!r}"
        )

    parent_h5name, rel_h5names = common_h5_parent(h5names)
    image_stacks = {
        rel_h5name: DataUrl(f"{filename}::{h5name}")
        for rel_h5name, h5name in zip(rel_h5names, h5names)
    }
    common_parent_url = DataUrl(f"{filename}::{parent_h5name}")
    return common_parent_url, image_stacks
def nxdata_image_stacks_metadata(
    common_parent_url: Union[str, DataUrl],
    image_stacks: Dict[str, Union[str, DataUrl]],
    top_nx_class: str = "NXprocess",
    top_name: str = "align",
    output_root_url: Union[str, DataUrl, None] = None,
) -> Tuple[DataUrl, dict]:
    """Build the HDF5/NeXus metadata needed to save processed image stacks
    in a group hierarchy that mirrors the input hierarchy.

    The HDF5 attributes of the common parent and all its ancestors are
    copied. One group in that chain (the first one with NX_class
    `top_nx_class` or else the common parent itself) is renamed to `top_name`
    and gets NX_class `top_nx_class`. The metadata of each NXdata group that
    contains an image stack is added once, regardless of how many image
    stacks it contains.

    :param common_parent_url: URL to the common parent of all image stack URLs.
    :param image_stacks: Maps the image stack names relative to the common
        parent to the image stack URLs.
    :param top_nx_class: NX_class of the parent group which needs to be renamed to `top_name`.
    :param top_name: New top HDF5 group name.
    :param output_root_url: output root URL. Its group names replace the
        leading common parent group names. When not provided, the output file
        is the input file.
    :returns: output root URL and HDF5/NeXus metadata relative to the file root following the Silx dictdump schema.
    """
    if not isinstance(common_parent_url, DataUrl):
        common_parent_url = DataUrl(common_parent_url)
    if output_root_url is not None and not isinstance(output_root_url, DataUrl):
        output_root_url = DataUrl(output_root_url)

    # For example:
    #  common_parent_name = "/entry/process/results"
    #  common_parent_groups = ["", "entry", "process", "results"]
    common_parent_name = common_parent_url.data_path()
    common_parent_groups = common_parent_name.split("/")

    filename = common_parent_url.file_path()
    with h5py_utils.File(filename) as fh:
        # Attributes of the common parent groups, ordered from the file root
        # down to the common parent
        h5group = fh[common_parent_name]
        common_parent_group_attrs = [dict(h5group.attrs)]
        while h5group.name != "/":
            h5group = h5group.parent
            common_parent_group_attrs.append(dict(h5group.attrs))
        common_parent_group_attrs = common_parent_group_attrs[::-1]

        # Classes of the common parent groups
        # For example
        #  nxclasses = ["NXroot", "NXentry", "NXprocess", "NXcollection"]
        nxclasses = [attrs.get("NX_class") for attrs in common_parent_group_attrs]

        # Find the top level to replace it with a new name
        # For example
        #  common_parent_groups = ["", "entry", "align", "results"]
        if top_nx_class in nxclasses:
            top_level = nxclasses.index(top_nx_class)
        else:
            top_level = len(nxclasses) - 1
        original_top_name = common_parent_groups[top_level]
        common_parent_groups[top_level] = top_name
        common_parent_group_attrs[top_level]["NX_class"] = top_nx_class
        if top_level > 0:
            # Keep the "default" attribute of the parent pointing to the
            # renamed group
            top_parent_attrs = common_parent_group_attrs[top_level - 1]
            original_top_parent_default = top_parent_attrs.get("default")
            if original_top_name == original_top_parent_default:
                top_parent_attrs["default"] = top_name

        # Replace the common input groups with the requested common groups
        if output_root_url:
            output_root_file_path = output_root_url.file_path()
            if output_root_url.data_path():
                requested_common_parent_groups = output_root_url.data_path().split("/")
            else:
                requested_common_parent_groups = [""]
            nextra = len(requested_common_parent_groups) - len(common_parent_groups)
            if nextra <= 0:
                nrequested = len(requested_common_parent_groups)
                common_parent_groups[:nrequested] = requested_common_parent_groups
            else:
                common_parent_groups = requested_common_parent_groups
                # One dictionary per extra group so they do not share state
                common_parent_group_attrs += [
                    {"NX_class": "NXcollection"} for _ in range(nextra)
                ]
        else:
            output_root_file_path = filename
        output_root_name = "/".join(common_parent_groups)

        # Metadata of the common parent groups
        output_metadata = {}
        common_output_metadata = output_metadata
        for name, attrs in zip(common_parent_groups, common_parent_group_attrs):
            if name:
                common_output_metadata[name] = dict()
                common_output_metadata = common_output_metadata[name]
            for key, value in attrs.items():
                common_output_metadata[f"@{key}"] = value

        # Metadata of the NXdata group(s)
        annotated_nxdata = set()
        for dset_relname, dset_url in image_stacks.items():
            # Name of the NXdata group relative to the common parent.
            # Empty when the NXdata group is the common parent itself.
            nxdata_parts = tuple(dset_relname.split("/")[:-1])
            if nxdata_parts in annotated_nxdata:
                # NXdata metadata is already read in a previous iteration
                continue
            annotated_nxdata.add(nxdata_parts)

            # Read NXdata metadata
            nxdata_metadata = _get_nxdata_metadata(fh, dset_url)

            if not nxdata_parts:
                common_output_metadata.update(nxdata_metadata)
                continue

            # Get the metadata of the parents
            parent_metadata = common_output_metadata
            for s in nxdata_parts[:-1]:
                if s not in parent_metadata:
                    parent_metadata[s] = {"@NX_class": "NXcollection"}
                parent_metadata = parent_metadata[s]

            nxdata_name = nxdata_parts[-1]
            existing_metadata = parent_metadata.get(nxdata_name)
            if isinstance(existing_metadata, dict):
                # Placeholder created for a nested group: keep its content
                existing_metadata.update(nxdata_metadata)
            else:
                parent_metadata[nxdata_name] = nxdata_metadata

    output_root_url = DataUrl(f"{output_root_file_path}::{output_root_name}")
    return output_root_url, output_metadata
def _get_nxdata_metadata(fh: h5py.File, dset_url: Union[str, DataUrl]) -> dict:
    """NXdata metadata includes all HDF5 attributes and axes field values.

    :param fh: Opened HDF5 file which contains the dataset.
    :param dset_url: URL of a dataset inside the NXdata group. Only the data
        path of the URL is used.
    :returns: Attributes of the parent group of the dataset (keys prefixed
        with "@") and the values of the fields listed in its "axes" attribute.
    :raises KeyError: when an axis listed in "axes" does not exist.
    """
    if not isinstance(dset_url, DataUrl):
        dset_url = DataUrl(dset_url)
    nxdata = fh[dset_url.data_path()].parent
    nxdata_metadata = {f"@{k}": v for k, v in nxdata.attrs.items()}

    axes = nxdata.attrs.get("axes", [])
    if isinstance(axes, (str, bytes)):
        # A single axis can be stored as a scalar string: do not iterate
        # over its characters
        axes = [axes]
    for name in axes:
        if isinstance(name, bytes):
            name = name.decode()
        if not name or name == ".":
            # "." is the NeXus placeholder for a dimension without axis
            continue
        if name in nxdata_metadata:
            # The same axis can span several dimensions: read it only once
            continue
        nxdata_metadata[name] = nxdata[name][()]
    return nxdata_metadata
def nx_annotate(
    treedict: Dict,
    h5item: Union[h5py.Group, h5py.Dataset, str, DataUrl],
    **open_options,
) -> None:
    """Like dicttonx from Silx but recursive addition of groups and datasets
    and modifying of attributes.

    :param treedict: Groups, datasets and attributes to be added to `h5item`.
    :param h5item: Opened HDF5 item or the URL of an HDF5 item. In case of a
        URL the file is opened for the duration of the call and the file root
        is used when the URL has no data path.
    :param open_options: Options for opening the file when `h5item` is a URL.
        The "mode" is "a" unless specified.
    """
    url = DataUrl(h5item) if isinstance(h5item, str) else h5item

    if not isinstance(url, DataUrl):
        # Already an opened HDF5 item
        _dicttonx(treedict, url)
        return

    open_options.setdefault("mode", "a")
    with h5py_utils.File(url.file_path(), **open_options) as fh:
        _dicttonx(treedict, fh[url.data_path() or "/"])
def _dicttonx(treedict: Dict, h5item: Union[h5py.Group, h5py.Dataset]) -> None:
    """Recursively add the content of `treedict` to `h5item`.

    Keys are interpreted as follows:

    * ``"@attr"``: attribute of `h5item`, always (over)written.
    * ``"child@attr"``: attribute of a child of `h5item`, only written when
      the child does not have that attribute yet.
    * key with a dictionary value: sub-group, created when missing.
    * any other key: dataset, only created when `h5item` has no item with
      that name yet.

    :param treedict: Groups, datasets and attributes to be added.
    :param h5item: Opened HDF5 item that receives the content.
    """
    # Attributes of children are set last: the child may be created by a
    # key which comes later in the dictionary.
    pending_child_attrs = []

    for key, value in treedict.items():
        if "@" in key:
            owner, _, attribute = key.partition("@")
            if not owner:
                h5item.attrs[attribute] = value
            else:
                pending_child_attrs.append((owner, attribute, value))
            continue

        if isinstance(value, dict):
            _dicttonx(value, h5item.require_group(key))
            continue

        if key not in h5item:
            h5item[key] = value

    for owner, attribute, value in pending_child_attrs:
        owner_attrs = h5item[owner].attrs
        if attribute not in owner_attrs:
            owner_attrs[attribute] = value