Source code for orsopy.fileio.base

"""
Implementation of the base classes for the ORSO header.
"""

import datetime
import json
import os.path
import pathlib
import re
import sys
import warnings

from collections.abc import Mapping
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum
from inspect import isclass
from typing import Any, Dict, Generator, List, Optional, TextIO, Tuple, Union

import numpy as np
import yaml
import yaml.constructor

# typing stuff introduced in python 3.8
try:
    from typing import Literal, get_args, get_origin
except ImportError:
    from .typing_backport import Literal, get_args, get_origin

from dataclasses import MISSING, dataclass, field, fields


def _noop(self, *args, **kw):
    pass


JSON_MIMETYPE = "application/json"

yaml.emitter.Emitter.process_tag = _noop

# make sure that datetime strings get loaded as str not datetime instances
yaml.constructor.SafeConstructor.yaml_constructors["tag:yaml.org,2002:timestamp"] = (
    yaml.constructor.SafeConstructor.yaml_constructors["tag:yaml.org,2002:str"]
)


[docs] class ORSOResolveError(ValueError): pass
[docs] class ORSOSchemaWarning(RuntimeWarning): pass
[docs] class OrsoDumper(yaml.SafeDumper):
[docs] def represent_data(self, data): if hasattr(data, "yaml_representer"): return data.yaml_representer(self) elif isinstance(data, datetime.datetime): value = data.isoformat("T") return super().represent_scalar("tag:yaml.org,2002:timestamp", value) elif np.isscalar(data) and hasattr(data, "item"): # If data is a numpy scalar, convert to a python object return super().represent_data(data.item()) else: return super().represent_data(data)
unit_registry = None
[docs] def get_unit_registry(): global unit_registry if unit_registry is None: import pint unit_registry = pint.UnitRegistry() # unit_registry.define("Angstrom = angstrom") # optional extra units return unit_registry
[docs] @dataclass class ErrorValue(Header): """ Information about errors on a value. """ error_value: float error_type: Optional[Literal["uncertainty", "resolution"]] = None value_is: Optional[Literal["sigma", "FWHM"]] = None distribution: Optional[Literal["gaussian", "triangular", "uniform", "lorentzian"]] = None yaml_representer = Header.yaml_representer_compact @property def sigma(self): """ Return value converted to standard deviation. The conversion factors can be found in common statistics and experimental physics text books or derived manually solving the variance definition integral. (e.g. Dekking, Michel (2005). A modern introduction to probability and statistics : understanding why and how. Springer, London, UK:) Values and some references available on Wikipedia, too. """ if self.value_is == "FWHM": from math import log, sqrt value = self.error_value if self.distribution in ["gaussian", None]: # Solving for the gaussian function = 0.5 yields: return value / (2.0 * sqrt(2.0 * log(2.0))) elif self.distribution == "triangular": # See solution of integral e.g. https://math.stackexchange.com/questions/4271314/ # what-is-the-proof-for-variance-of-triangular-distribution/4273147#4273147 # setting c=0 and a=b=FWHM for the symmetric triangle around 0. return value / sqrt(6.0) elif self.distribution == "uniform": # Variance is just the integral of x² from -0.5*FWHM to 0.5*FWHM => 1/12. return value / sqrt(12.0) elif self.distribution == "lorentzian": raise ValueError("Lorentzian distribution does not have a sigma value") else: raise NotImplementedError(f"Unknown distribution {self.distribution}") else: # Value is already sigma return self.error_value
[docs] @dataclass class Value(Header): """ A value or list of values with an optional unit. """ magnitude: float unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"}) error: Optional[ErrorValue] = None offset: Optional[float] = field( default=None, metadata={ "description": "The offset applied to a value (e.g. motor) to retrieve the reported (corrected) magnitude" }, ) yaml_representer = Header.yaml_representer_compact def __repr__(self): """ Make representation more readability by removing names of default arguments. """ output = super().__repr__() output = output.replace("magnitude=", "") output = output.replace("unit=", "") return output
[docs] def as_unit(self, output_unit): """ Returns the value as converted to the given unit. """ if output_unit == self.unit: return self.magnitude unit_registry = get_unit_registry() val = self.magnitude * unit_registry(self.unit) return val.to(output_unit).magnitude
[docs] @dataclass class ComplexValue(Header): """ A value or list of values with an optional unit. """ real: float imag: Optional[float] = None unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"}) error: Optional[ErrorValue] = None yaml_representer = Header.yaml_representer_compact def __repr__(self): """ Make representation more readability by removing names of default arguments. """ output = super().__repr__() output = output.replace("real=", "") output = output.replace("imag=", "") output = output.replace("unit=", "") return output
[docs] def as_unit(self, output_unit): """ Returns the complex value as converted to the given unit. """ if self.imag is None: value = self.real + 0j else: value = self.real + 1j * self.imag if output_unit == self.unit: return value unit_registry = get_unit_registry() val = value * unit_registry(self.unit) return val.to(output_unit).magnitude
[docs] @dataclass class ValueRange(Header): """ A range or list of ranges with mins, maxs, and an optional unit. """ min: float max: float unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"}) individual_magnitudes: Optional[List[float]] = field( default=None, metadata={ "description": "Can list each individual value " "that was present during the experiment, only for information." }, ) offset: Optional[float] = field( default=None, metadata={ "description": "The offset applied to a value (e.g. motor) to retrieve the reported (corrected) min/max" }, ) yaml_representer = Header.yaml_representer_compact
[docs] def as_unit(self, output_unit): """ Returns a (min, max) tuple of values as converted to the given unit. """ if output_unit == self.unit: return (self.min, self.max) unit_registry = get_unit_registry() vmin = self.min * unit_registry(self.unit) vmax = self.max * unit_registry(self.unit) return (vmin.to(output_unit).magnitude, vmax.to(output_unit).magnitude)
[docs] @dataclass class ValueVector(Header): """ A vector or list of vectors with an optional unit. For vectors relating to the sample, such as polarisation, the follow definitions are used. :param x: is defined as parallel to the radiation beam, positive going with the beam direction. :param y: is defined from the other two based on the right hand rule. :param z: is defined as normal to the sample surface, positive direction in scattering direction. :param unit: SI unit string. """ x: float y: float z: float unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"}) error: Optional[ErrorValue] = None yaml_representer = Header.yaml_representer_compact
[docs] def as_unit(self, output_unit): """ Returns a (x, y, z) tuple of values as converted to the given unit. """ if output_unit == self.unit: return (self.x, self.y, self.z) unit_registry = get_unit_registry() vx = self.x * unit_registry(self.unit) vy = self.y * unit_registry(self.unit) vz = self.z * unit_registry(self.unit) return (vx.to(output_unit).magnitude, vy.to(output_unit).magnitude, vz.to(output_unit).magnitude)
[docs] @dataclass class AlternatingField(Header): """ A physical field with regular variations as AC magnetic field. """ amplitude: Value frequency: Value phase: Optional[Value] = None
[docs] @dataclass class Person(Header): """ Information about a person, including name, affiliation(s), and contact information. """ name: str affiliation: str contact: Optional[str] = field(default=None, metadata={"description": "Contact (email) address"})
[docs] @dataclass class Column(Header): """ Information about a data column. """ name: str unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"}) physical_quantity: Optional[str] = field( default=None, metadata={ "description": "A description of the physical meaning for this column. " "For quantities defined by ORSO in header metadata the same name should be used." "(For example 'wavelength' or 'incident_angle' to indicate a column specifying " "those quantities on a point-by-point basis.)" "For canonical names of physical quantities see https://www.reflectometry.org/file_format/specification." }, ) flag_is: Optional[List[str]] = field( default=None, metadata={"description": "A list of items that a flag-value in this column stands for."} ) yaml_representer = Header.yaml_representer_compact
[docs] @dataclass class ErrorColumn(Header): """ Information about a data column. """ error_of: str error_type: Optional[Literal["uncertainty", "resolution"]] = None value_is: Optional[Literal["sigma", "FWHM"]] = None distribution: Optional[Literal["gaussian", "triangular", "uniform", "lorentzian"]] = None yaml_representer = Header.yaml_representer_compact @property def name(self): """ A convenience property to allow programs to get a valid name attribute for any column. """ return f"s{self.error_of}" @property def to_sigma(self): """ The multiplicative factor needed to convert a FWHM to sigma. The conversion factors can be found in common statistics and experimental physics text books or derived manually solving the variance definition integral. (e.g. Dekking, Michel (2005). A modern introduction to probability and statistics : understanding why and how. Springer, London, UK:) Values and some references available on Wikipedia, too. """ if self.value_is == "FWHM": from math import log, sqrt if self.distribution in ["gaussian", None]: # Solving for the gaussian function = 0.5 yields: return 1.0 / (2.0 * sqrt(2.0 * log(2.0))) elif self.distribution == "triangular": # See solution of integral e.g. https://math.stackexchange.com/questions/4271314/ # what-is-the-proof-for-variance-of-triangular-distribution/4273147#4273147 # setting c=0 and a=b=FWHM for the symmetric triangle around 0. return 1.0 / sqrt(6.0) elif self.distribution == "uniform": # Variance is just the integral of x² from -0.5*FWHM to 0.5*FWHM => 1/12. return 1.0 / sqrt(12.0) elif self.distribution == "lorentzian": raise ValueError("Lorentzian distribution does not have a sigma value") else: raise NotImplementedError(f"Unknown distribution {self.distribution}") else: # Value is already sigma return 1.0
[docs] @dataclass class ContentHash(Header): """ A hash of some content, using standard algorithms """ digest: str algorithm: Literal["sha1", "sha256", "sha384", "sha512", "sha3_256", "sha3_512"] @staticmethod def _get_digest_function(): import hashlib if hasattr(hashlib, "file_digest"): return hashlib.file_digest else: # pre python 3.11 this function did not exist BUF_SIZE = 65536 def file_digest(f, algorithm): digest = getattr(hashlib, algorithm)() while True: data = f.read(BUF_SIZE) if not data: return digest digest.update(data) return file_digest
[docs] @classmethod def from_file( cls, fname: Union[TextIO, str], algorithm: Literal["sha1", "sha256", "sha384", "sha512", "sha3_256", "sha3_512"] = "sha1", ): file_digest = cls._get_digest_function() with _possibly_open_file(fname, "rb") as f: digest = file_digest(f, algorithm) return ContentHash(digest=digest.hexdigest(), algorithm=algorithm)
[docs] def check_file(self, fname: Union[TextIO, str]): """ Check that a file has the same content hash. """ file_digest = self._get_digest_function() with _possibly_open_file(fname, "rb") as f: digest = file_digest(f, self.algorithm) return self.digest == digest.hexdigest()
[docs] @dataclass class File(Header): """ A file with file path and a last modified timestamp. """ file: str timestamp: Optional[datetime.datetime] = field( default=None, metadata={ "description": "Last modified timestamp. If it's not specified," " then an attempt will be made to check on the file" " itself" }, ) hash: Optional[ContentHash] = None def __post_init__(self): """ Assigns a timestamp for file creation if not defined. """ Header.__post_init__(self) if self.timestamp is None: fname = pathlib.Path(self.file) if fname.exists(): self.timestamp = datetime.datetime.fromtimestamp(fname.stat().st_mtime)
[docs] class NotOrsoCompatibleFileError(ValueError): pass
def _read_header_data(file: Union[TextIO, str], validate: bool = False) -> Tuple[List[dict], list, str]: """ Reads the header and data contained within an ORSO file, parsing it into json dictionaries and numerical arrays. Parameters ---------- :param file: File to read. :param validate: Validates the file against the ORSO json schema. Requires that the jsonschema package be installed. :return: The tuple contains: - First item is a list of json dicts containing the parsed yaml header. This has to be processed further. - Second item is a Python list containing numpy arrays holding the reflectometry data in the file. It's contained in a list because each of the datasets may have a different number of columns. - Final item is the ORSO version string. """ with _possibly_open_file(file, "r") as fi: header = [] # collection of the numerical arrays data = [] _ds_lines = [] first_dataset = True for i, line in enumerate(fi.readlines()): if not line.startswith("#"): # ignore empty lines if line.strip() != "": _ds_lines.append(line) continue if line.startswith("# data_set") and first_dataset: header.append(line[1:]) first_dataset = False elif line.startswith("# data_set") and not first_dataset: # a new dataset is starting. Complete the previous dataset's # numerical array and start collecting the numbers for this # dataset _d = np.array([np.fromstring(v, dtype=float, sep=" ") for v in _ds_lines]) data.append(_d) _ds_lines = [] # append '---' to signify the start of a new yaml document # Subsequent datasets get parsed into a separate dictionary, # which can be used to synthesise new datasets from the first. header.append("---\n") header.append(line[1:]) else: header.append(line[1:]) # append the last numerical array _d = np.array([np.fromstring(v, dtype=float, sep=" ") for v in _ds_lines]) data.append(_d) yml = "".join(header) # first line of an ORSO file should have the magic string pattern = re.compile( r"^(# ORSO reflectivity data file \| ([0-9]+\.?[0-9]*|\.[0-9]+)" r" standard \| YAML encoding \| https://www\.reflectometry\.org/)$" ) if len(header) < 1 or not pattern.match(header[0].lstrip(" ")): raise NotOrsoCompatibleFileError("First line does not appear to match that of an ORSO file") version = re.findall(r"([0-9]+\.?[0-9]*|\.[0-9]+)+?", header[0])[0] dcts = yaml.safe_load_all(yml) # synthesise json dicts for each dataset from the first dataset, and # updates to the yaml. first_dct = next(dcts) dct_list = [_nested_update(deepcopy(first_dct), dct) for dct in dcts] dct_list.insert(0, first_dct) if validate: # requires jsonschema be installed _validate_header_data(dct_list) return dct_list, data, version def _validate_header_data(dct_list: List[dict]): """ Checks whether a json dictionary corresponds to a valid ORSO header. Obtain these dct_list by loading from _read_header_data first. :param dct_list: Dicts corresponding to parsed yaml headers from the ORT file. :raises ValueError: When the first four columns are not named correctly (i.e. :code:`'Qz'`, :code:`'R'`, :code:`'sR'`, :code:`'sQz'`). :raises ValueError: When the units for the :code:`'Qz'` column is not :code:`'1/angstrom'`. :raises ValueError: When the units for columns :code:`'Qz'` and :code:`'sQz'` are not the same. """ import jsonschema vi = sys.version_info if vi.minor < 7: warnings.warn("Validation not possible with Python 3.6 with 2020-12 json schema", ORSOSchemaWarning) pth = os.path.dirname(__file__) schema_pth = os.path.join(pth, "schema", "refl_header.schema.json") with open(schema_pth, "r") as f: schema = json.load(f) for dct in dct_list: jsonschema.validate(dct, schema) @contextmanager def _possibly_open_file(f: Union[TextIO, str], mode: str = "wb") -> Generator[TextIO, None, None]: """ Context manager for files. :param f: If `f` is a file, then yield the file. If `f` is a str then open the file and yield the newly opened file. On leaving this context manager the file is closed, if it was opened by this context manager (i.e. `f` was a string). :param modes: An optional string that specifies the mode in which the file is opened. :yields: On leaving the context manager the file is closed, if it was opened by this context manager. """ close_file = False if (hasattr(f, "read") and hasattr(f, "write")) or f is None: g = f else: g = open(f, mode) close_file = True yield g if close_file: g.close() def _todict(obj: Any, classkey: Any = None) -> dict: """ Recursively converts an object to a dict representation https://stackoverflow.com/questions/1036409 Licenced under CC BY-SA 4.0 :param obj: Object to convert to dict representation. :param classkey: Key to be assigned to class objects. :return: Dictionary representation of object. """ if isinstance(obj, dict): data = {} for k, v in obj.items(): data[k] = _todict(v, classkey) return data elif isinstance(obj, Enum): return obj.value elif hasattr(obj, "_ast"): return _todict(obj._ast()) elif hasattr(obj, "__iter__") and not isinstance(obj, str): return [_todict(v, classkey) for v in obj] elif hasattr(obj, "__dict__"): data = dict( [ (key, _todict(value, classkey)) for key, value in obj.__dict__.items() if not callable(value) and not key.startswith("_") ] ) if classkey is not None and hasattr(obj, "__class__"): data[classkey] = obj.__class__.__name__ return data else: return obj
[docs] def json_datetime_trap(obj): if isinstance(obj, datetime.datetime): return obj.isoformat("T") return obj
[docs] def enum_trap(obj): if isinstance(obj, Enum): return obj.value return obj
[docs] def nexus_value_converter(obj): for converter in (json_datetime_trap, enum_trap): obj = converter(obj) return obj
def _nested_update(d: dict, u: dict) -> dict: """ Nested dictionary update. :param d: Dictionary to be updated. :param u: Dictionary where updates should come from. :return: Updated dictionary. """ for k, v in u.items(): if isinstance(d, Mapping): if isinstance(v, Mapping): r = _nested_update(d.get(k, {}), v) d[k] = r else: d[k] = u[k] return d def _dict_diff(old: dict, new: dict) -> dict: """ Recursive find differences between two dictionaries. :param old: Original dictionary. :param new: New dictionary to find differences in. :return: Dictionary containing values present in :py:attr:`new` that are not in :py:attr:`old`. """ out = {} for key, value in new.items(): if key in old: if type(value) is dict: diff = _dict_diff(old[key], value) if diff == {}: continue else: out[key] = diff elif old[key] == value: continue else: out[key] = value else: out[key] = value for key in [ki for ki in old.keys() if ki not in new]: out[key] = None return out