"""
Implementation of the base classes for the ORSO header.
"""
import datetime
import json
import os.path
import pathlib
import re
import sys
import warnings
from collections.abc import Mapping
from contextlib import contextmanager
from copy import deepcopy
from enum import Enum
from inspect import isclass
from typing import Any, Dict, Generator, List, Optional, TextIO, Tuple, Union
import numpy as np
import yaml
import yaml.constructor
# typing stuff introduced in python 3.8
try:
from typing import Literal, get_args, get_origin
except ImportError:
from .typing_backport import Literal, get_args, get_origin
from ..dataclasses import (_FIELD, _FIELD_INITVAR, _FIELDS, _HAS_DEFAULT_FACTORY, _POST_INIT_NAME, MISSING, _create_fn,
_field_init, _init_param, _set_new_attribute, dataclass, field, fields)
def _noop(self, *args, **kw):
pass
yaml.emitter.Emitter.process_tag = _noop
# make sure that datetime strings get loaded as str not datetime instances
yaml.constructor.SafeConstructor.yaml_constructors[
"tag:yaml.org,2002:timestamp"
] = yaml.constructor.SafeConstructor.yaml_constructors["tag:yaml.org,2002:str"]
def _custom_init_fn(fieldsarg, frozen, has_post_init, self_name, globals):
"""
_init_fn from dataclasses adapted to accept additional keywords.
See dataclasses source for comments on code.
"""
seen_default = False
for f in fieldsarg:
if f.init:
if not (f.default is MISSING and f.default_factory is MISSING):
seen_default = True
elif seen_default:
raise TypeError(f"non-default argument {f.name!r} " "follows default argument")
locals = {f"_type_{f.name}": f.type for f in fieldsarg}
locals.update({"MISSING": MISSING, "_HAS_DEFAULT_FACTORY": _HAS_DEFAULT_FACTORY})
body_lines = []
for f in fieldsarg:
line = _field_init(f, frozen, locals, self_name)
if line:
body_lines.append(line)
if has_post_init:
params_str = ",".join(f.name for f in fieldsarg if f._field_type is _FIELD_INITVAR)
body_lines.append(f"{self_name}.{_POST_INIT_NAME}({params_str})")
# processing of additional user keyword arguments
body_lines += ["for k,v in user_kwds.items():", " setattr(self, k, v)"]
return _create_fn(
"__init__",
[self_name] + [_init_param(f) for f in fieldsarg if f.init] + ["**user_kwds"],
body_lines,
locals=locals,
globals=globals,
return_type=None,
)
[docs]def orsodataclass(cls: type):
attrs = cls.__dict__
bases = cls.__bases__
if "__annotations__" in attrs and len([k for k in attrs["__annotations__"].keys() if not k.startswith("_")]) > 0:
# only applies to dataclass children of Header
# add optional comment attribute, needs to come last
attrs["__annotations__"]["comment"] = Optional[str]
setattr(cls, "comment", field(default=None))
# create the _orso_optional attribute
orso_optionals = []
for fname, ftype in attrs["__annotations__"].items():
if type(None) in get_args(ftype):
orso_optionals.append(fname)
for base in bases:
if hasattr(base, "_orso_optionals"):
orso_optionals += getattr(base, "_orso_optionals")
setattr(cls, "_orso_optionals", orso_optionals)
out = dataclass(cls, repr=False, init=False)
fieldsarg = getattr(out, _FIELDS)
# Generate custom __init__ method that allows arbitrary extra keyword arguments
has_post_init = hasattr(out, _POST_INIT_NAME)
# Include InitVars and regular fields (so, not ClassVars).
flds = [f for f in fieldsarg.values() if f._field_type in (_FIELD, _FIELD_INITVAR)]
init_fun = _custom_init_fn(flds, False, has_post_init, "self", globals())
_set_new_attribute(out, "__init__", init_fun)
return out
else:
return cls
[docs]class ORSOResolveError(ValueError):
pass
[docs]class OrsoDumper(yaml.SafeDumper):
[docs] def represent_data(self, data):
if hasattr(data, "yaml_representer"):
return data.yaml_representer(self)
elif isinstance(data, datetime.datetime):
value = data.isoformat("T")
return super().represent_scalar("tag:yaml.org,2002:timestamp", value)
elif np.isscalar(data) and hasattr(data, "item"):
# If data is a numpy scalar, convert to a python object
return super().represent_data(data.item())
else:
return super().represent_data(data)
unit_registry = None
[docs]@orsodataclass
class ErrorValue(Header):
"""
Information about errors on a value.
"""
error_value: float
error_type: Optional[Literal["uncertainty", "resolution"]] = None
value_is: Optional[Literal["sigma", "FWHM"]] = None
distribution: Optional[Literal["gaussian", "triangular", "uniform", "lorentzian"]] = None
yaml_representer = Header.yaml_representer_compact
@property
def sigma(self):
"""
Return value converted to standard deviation.
The conversion factors can be found in common statistics and experimental physics text books or derived
manually solving the variance definition integral.
(e.g. Dekking, Michel (2005).
A modern introduction to probability and statistics : understanding why and how.
Springer, London, UK:)
Values and some references available on Wikipedia, too.
"""
if self.value_is == "FWHM":
from math import log, sqrt
value = self.error_value
if self.distribution in ["gaussian", None]:
# Solving for the gaussian function = 0.5 yields:
return value / (2.0 * sqrt(2.0 * log(2.0)))
elif self.distribution == "triangular":
# See solution of integral e.g. https://math.stackexchange.com/questions/4271314/
# what-is-the-proof-for-variance-of-triangular-distribution/4273147#4273147
# setting c=0 and a=b=FWHM for the symmetric triangle around 0.
return value / sqrt(6.0)
elif self.distribution == "uniform":
# Variance is just the integral of x² from -0.5*FWHM to 0.5*FWHM => 1/12.
return value / sqrt(12.0)
elif self.distribution == "lorentzian":
raise ValueError("Lorentzian distribution does not have a sigma value")
else:
raise NotImplementedError(f"Unknown distribution {self.distribution}")
else:
# Value is already sigma
return self.error_value
[docs]@orsodataclass
class Value(Header):
"""
A value or list of values with an optional unit.
"""
magnitude: float
unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"})
error: Optional[ErrorValue] = None
yaml_representer = Header.yaml_representer_compact
def __repr__(self):
"""
Make representation more readability by removing names of default arguments.
"""
output = super().__repr__()
output = output.replace("magnitude=", "")
output = output.replace("unit=", "")
return output
[docs] def as_unit(self, output_unit):
"""
Returns the value as converted to the given unit.
"""
if output_unit == self.unit:
return self.magnitude
global unit_registry
if unit_registry is None:
import pint
unit_registry = pint.UnitRegistry()
val = self.magnitude * unit_registry(self.unit)
return val.to(output_unit).magnitude
[docs]@orsodataclass
class ComplexValue(Header):
"""
A value or list of values with an optional unit.
"""
real: float
imag: Optional[float] = None
unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"})
error: Optional[ErrorValue] = None
yaml_representer = Header.yaml_representer_compact
def __repr__(self):
"""
Make representation more readability by removing names of default arguments.
"""
output = super().__repr__()
output = output.replace("real=", "")
output = output.replace("imag=", "")
output = output.replace("unit=", "")
return output
[docs] def as_unit(self, output_unit):
"""
Returns the complex value as converted to the given unit.
"""
if self.imag is None:
value = self.real + 0j
else:
value = self.real + 1j * self.imag
if output_unit == self.unit:
return value
global unit_registry
if unit_registry is None:
import pint
unit_registry = pint.UnitRegistry()
val = value * unit_registry(self.unit)
return val.to(output_unit).magnitude
[docs]@orsodataclass
class ValueRange(Header):
"""
A range or list of ranges with mins, maxs, and an optional unit.
"""
min: float
max: float
unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"})
yaml_representer = Header.yaml_representer_compact
[docs] def as_unit(self, output_unit):
"""
Returns a (min, max) tuple of values as converted to the given unit.
"""
if output_unit == self.unit:
return (self.min, self.max)
global unit_registry
if unit_registry is None:
import pint
unit_registry = pint.UnitRegistry()
vmin = self.min * unit_registry(self.unit)
vmax = self.max * unit_registry(self.unit)
return (vmin.to(output_unit).magnitude, vmax.to(output_unit).magnitude)
[docs]@orsodataclass
class ValueVector(Header):
"""
A vector or list of vectors with an optional unit.
For vectors relating to the sample, such as polarisation,
the follow definitions are used.
:param x: is defined as parallel to the radiation beam, positive going
with the beam direction.
:param y: is defined from the other two based on the right hand rule.
:param z: is defined as normal to the sample surface, positive direction
in scattering direction.
:param unit: SI unit string.
"""
x: float
y: float
z: float
unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"})
error: Optional[ErrorValue] = None
yaml_representer = Header.yaml_representer_compact
[docs] def as_unit(self, output_unit):
"""
Returns a (x, y, z) tuple of values as converted to the given unit.
"""
if output_unit == self.unit:
return (self.x, self.y, self.z)
global unit_registry
if unit_registry is None:
import pint
unit_registry = pint.UnitRegistry()
vx = self.x * unit_registry(self.unit)
vy = self.y * unit_registry(self.unit)
vz = self.z * unit_registry(self.unit)
return (vx.to(output_unit).magnitude, vy.to(output_unit).magnitude, vz.to(output_unit).magnitude)
[docs]@orsodataclass
class Person(Header):
"""
Information about a person, including name, affilation(s), and contact
information.
"""
name: str
affiliation: str
contact: Optional[str] = field(default=None, metadata={"description": "Contact (email) address"})
[docs]@orsodataclass
class Column(Header):
"""
Information about a data column.
"""
name: str
unit: Optional[str] = field(default=None, metadata={"description": "SI unit string"})
physical_quantity: Optional[str] = field(
default=None, metadata={"physical_quantity": "A description of the column"}
)
yaml_representer = Header.yaml_representer_compact
[docs]@orsodataclass
class ErrorColumn(Header):
"""
Information about a data column.
"""
error_of: str
error_type: Optional[Literal["uncertainty", "resolution"]] = None
value_is: Optional[Literal["sigma", "FWHM"]] = None
distribution: Optional[Literal["gaussian", "triangular", "uniform", "lorentzian"]] = None
yaml_representer = Header.yaml_representer_compact
@property
def name(self):
"""
A convenience property to allow programs to get a valid name attribute for any column.
"""
return f"s{self.error_of}"
@property
def to_sigma(self):
"""
The multiplicative factor needed to convert a FWHM to sigma.
The conversion factors can be found in common statistics and experimental physics text books or derived
manually solving the variance definition integral.
(e.g. Dekking, Michel (2005).
A modern introduction to probability and statistics : understanding why and how.
Springer, London, UK:)
Values and some references available on Wikipedia, too.
"""
if self.value_is == "FWHM":
from math import log, sqrt
if self.distribution in ["gaussian", None]:
# Solving for the gaussian function = 0.5 yields:
return 1.0 / (2.0 * sqrt(2.0 * log(2.0)))
elif self.distribution == "triangular":
# See solution of integral e.g. https://math.stackexchange.com/questions/4271314/
# what-is-the-proof-for-variance-of-triangular-distribution/4273147#4273147
# setting c=0 and a=b=FWHM for the symmetric triangle around 0.
return 1.0 / sqrt(6.0)
elif self.distribution == "uniform":
# Variance is just the integral of x² from -0.5*FWHM to 0.5*FWHM => 1/12.
return 1.0 / sqrt(12.0)
elif self.distribution == "lorentzian":
raise ValueError("Lorentzian distribution does not have a sigma value")
else:
raise NotImplementedError(f"Unknown distribution {self.distribution}")
else:
# Value is already sigma
return 1.0
[docs]@orsodataclass
class File(Header):
"""
A file with file path and a last modified timestamp.
"""
file: str
timestamp: Optional[datetime.datetime] = field(
default=None,
metadata={
"description": "Last modified timestamp. If it's not specified,"
" then an attempt will be made to check on the file"
" itself"
},
)
def __post_init__(self):
"""
Assigns a timestamp for file creation if not defined.
"""
Header.__post_init__(self)
if self.timestamp is None:
fname = pathlib.Path(self.file)
if fname.exists():
self.timestamp = datetime.datetime.fromtimestamp(fname.stat().st_mtime)
[docs]class NotOrsoCompatibleFileError(ValueError):
pass
def _read_header_data(file: Union[TextIO, str], validate: bool = False) -> Tuple[List[dict], list, str]:
"""
Reads the header and data contained within an ORSO file, parsing it into
json dictionaries and numerical arrays.
Parameters
----------
:param file: File to read.
:param validate: Validates the file against the ORSO json schema.
Requires that the jsonschema package be installed.
:return: The tuple contains:
- First item is a list of json dicts containing the parsed yaml
header. This has to be processed further.
- Second item is a Python list containing numpy arrays holding the
reflectometry data in the file. It's contained in a list because
each of the datasets may have a different number of columns.
- Final item is the ORSO version string.
"""
with _possibly_open_file(file, "r") as fi:
header = []
# collection of the numerical arrays
data = []
_ds_lines = []
first_dataset = True
for i, line in enumerate(fi.readlines()):
if not line.startswith("#"):
# ignore empty lines
if line.strip() != "":
_ds_lines.append(line)
continue
if line.startswith("# data_set") and first_dataset:
header.append(line[1:])
first_dataset = False
elif line.startswith("# data_set") and not first_dataset:
# a new dataset is starting. Complete the previous dataset's
# numerical array and start collecting the numbers for this
# dataset
_d = np.array([np.fromstring(v, dtype=float, sep=" ") for v in _ds_lines])
data.append(_d)
_ds_lines = []
# append '---' to signify the start of a new yaml document
# Subsequent datasets get parsed into a separate dictionary,
# which can be used to synthesise new datasets from the first.
header.append("---\n")
header.append(line[1:])
else:
header.append(line[1:])
# append the last numerical array
_d = np.array([np.fromstring(v, dtype=float, sep=" ") for v in _ds_lines])
data.append(_d)
yml = "".join(header)
# first line of an ORSO file should have the magic string
pattern = re.compile(
r"^(# ORSO reflectivity data file \| ([0-9]+\.?[0-9]*|\.[0-9]+)"
r" standard \| YAML encoding \| https://www\.reflectometry\.org/)$"
)
if len(header) < 1 or not pattern.match(header[0].lstrip(" ")):
raise NotOrsoCompatibleFileError("First line does not appear to match that of an ORSO file")
version = re.findall(r"([0-9]+\.?[0-9]*|\.[0-9]+)+?", header[0])[0]
dcts = yaml.safe_load_all(yml)
# synthesise json dicts for each dataset from the first dataset, and
# updates to the yaml.
first_dct = next(dcts)
dct_list = [_nested_update(deepcopy(first_dct), dct) for dct in dcts]
dct_list.insert(0, first_dct)
if validate:
# requires jsonschema be installed
_validate_header_data(dct_list)
return dct_list, data, version
def _validate_header_data(dct_list: List[dict]):
"""
Checks whether a json dictionary corresponds to a valid ORSO header.
Obtain these dct_list by loading from _read_header_data first.
:param dct_list: Dicts corresponding to parsed yaml headers from the ORT file.
:raises ValueError: When the first four columns are not named correctly
(i.e. :code:`'Qz'`, :code:`'R'`, :code:`'sR'`, :code:`'sQz'`).
:raises ValueError: When the units for the :code:`'Qz'` column is not
:code:`'1/angstrom'`.
:raises ValueError: When the units for columns :code:`'Qz'` and
:code:`'sQz'` are not the same.
"""
import jsonschema
req_cols = ["Qz", "R", "sR", "sQz"]
acceptable_Qz_units = ["1/angstrom", "1/nm"]
pth = os.path.dirname(__file__)
schema_pth = os.path.join(pth, "schema", "refl_header.schema.json")
with open(schema_pth, "r") as f:
schema = json.load(f)
for dct in dct_list:
jsonschema.validate(dct, schema)
# Validate the column names. Ideally this is done with the jsonschema,
# but it's difficult to create a schema from the 'default' orsopy
# dataclasses that does that. It is possible but it requires extra
# column objects like Qz_column, R_column, etc.
cols = dct["columns"]
ncols = min(4, len(cols))
col_names = [col["name"] if "name" in col else "s" + col["error_of"] for col in cols]
units = [col.get("unit") for col in cols]
if col_names[:ncols] != req_cols[:ncols]:
raise ValueError("The first four columns should be named" " 'Qz', 'R', ['sR', ['sQz']]")
if units[0] not in acceptable_Qz_units:
raise ValueError("The Qz column must have units of '1/angstrom'" " or '1/nm'")
@contextmanager
def _possibly_open_file(f: Union[TextIO, str], mode: str = "wb") -> Generator[TextIO, None, None]:
"""
Context manager for files.
:param f: If `f` is a file, then yield the file. If `f` is a str then
open the file and yield the newly opened file. On leaving this
context manager the file is closed, if it was opened by this
context manager (i.e. `f` was a string).
:param modes: An optional string that specifies the mode in which
the file is opened.
:yields: On leaving the context manager the file is closed, if it
was opened by this context manager.
"""
close_file = False
if (hasattr(f, "read") and hasattr(f, "write")) or f is None:
g = f
else:
g = open(f, mode)
close_file = True
yield g
if close_file:
g.close()
def _todict(obj: Any, classkey: Any = None) -> dict:
"""
Recursively converts an object to a dict representation
https://stackoverflow.com/questions/1036409
Licenced under CC BY-SA 4.0
:param obj: Object to convert to dict representation.
:param classkey: Key to be assigned to class objects.
:return: Dictionary representation of object.
"""
if isinstance(obj, dict):
data = {}
for (k, v) in obj.items():
data[k] = _todict(v, classkey)
return data
elif isinstance(obj, Enum):
return obj.value
elif hasattr(obj, "_ast"):
return _todict(obj._ast())
elif hasattr(obj, "__iter__") and not isinstance(obj, str):
return [_todict(v, classkey) for v in obj]
elif hasattr(obj, "__dict__"):
data = dict(
[
(key, _todict(value, classkey))
for key, value in obj.__dict__.items()
if not callable(value) and not key.startswith("_")
]
)
if classkey is not None and hasattr(obj, "__class__"):
data[classkey] = obj.__class__.__name__
return data
else:
return obj
def _nested_update(d: dict, u: dict) -> dict:
"""
Nested dictionary update.
:param d: Dictionary to be updated.
:param u: Dictionary where updates should come from.
:return: Updated dictionary.
"""
for k, v in u.items():
if isinstance(d, Mapping):
if isinstance(v, Mapping):
r = _nested_update(d.get(k, {}), v)
d[k] = r
else:
d[k] = u[k]
else:
d = {k: u[k]}
return d
def _dict_diff(old: dict, new: dict) -> dict:
"""
Recursive find differences between two dictionaries.
:param old: Original dictionary.
:param new: New dictionary to find differences in.
:return: Dictionary containing values present in :py:attr:`new`
that are not in :py:attr:`old`.
"""
out = {}
for key, value in new.items():
if key in old:
if type(value) is dict:
diff = _dict_diff(old[key], value)
if diff == {}:
continue
else:
out[key] = diff
elif old[key] == value:
continue
else:
out[key] = value
else:
out[key] = value
for key in [ki for ki in old.keys() if ki not in new]:
out[key] = None
return out