Source code for mosaic.hdf5

# -*- coding: utf-8 -*-
"""HDF5 I/O

.. moduleauthor:: Konrad Hinsen

Any model implementing the Mosaic API can be written to a HDF5 file,
but will be implicitly converted to a :mod:`mosaic.array_model`
representation.  Data items read from a HDF5 file are always
represented by an :mod:`mosaic.array_model`.

The HDF5 storage layout is designed for clarity rather than
simplicity, the files should be as self-documenting as possible.

A common pattern for generating an HDF5 file is:

::

    with HDF5Store('molecule.h5', 'w') as writer:
       writer.store("universe", universe)
       writer.store("configuration1", configuration1)
       writer.store("configuration2", configuration2)

A common pattern for reading from an HDF5 file is:

::

    items = {}
    for id, data in HDF5Store('molecule.h5'):
       items[id] = data

"""

#-----------------------------------------------------------------------------
#       Copyright (C) 2013 The Mosaic Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file LICENSE.txt, distributed as part of this software.
#-----------------------------------------------------------------------------

import copy
import operator
import weakref

import h5py
import h5py.h5t
import h5py.h5d
import h5py.h5s
import h5py.version
import numpy as N

import mosaic.array_model
import mosaic.api as api
from mosaic.utility import MethodRegister
from mosaic.utility import ascii
from mosaic.utility import isstring
from mosaic.utility import py_str

import sys
del sys


if h5py.version.hdf5_version_tuple < (1, 8, 7):
    raise ValueError("HDF5 version >= 1.8.7 required")


def create_dataset(group, path, data):
    if isstring(data):
        ds = group.require_dataset(path, shape=(),
                                   dtype=h5py.special_dtype(vlen=bytes))
        ds[...] = data
    else:
        ds = group.require_dataset(path, shape=data.shape, dtype=data.dtype)
        if 0 not in data.shape:
            ds[...] = data
    return ds


[docs]class HDF5Store(object): """I/O handler for HDF5 files This class handles the translation of references between Mosaic data items in memory and HDF5 object references in a file. When writing data, references are allowed only to data items that have been written earlier using the same HDF5Store instance. An HDF5Store can be used like a file, in which case it must be closed after the last ``store`` or ``retrieve`` operation, or as a context manager in a with-statement. For reading, it also behaves like an iterator yielding ``(name, data_item)`` pairs. """ def __init__(self, hdf5_group_or_filename, file_mode=None): """ :param hdf5_group_or_filename: a group in an open HDF5 file, or a string interpreted as a file name :type hdf5_group_or_filename: str or ``h5py.Group`` :param file_mode: ``'r'`` or ``'w'``, used only with a filename argument :type file_mode: str """ if isstring(hdf5_group_or_filename): if file_mode is None: file_mode = 'r' self.root = h5py.File(hdf5_group_or_filename, file_mode) self._close = True else: api.validate_value(file_mode, [None], "file_mode") self.root = hdf5_group_or_filename self._close = False self._path_map = weakref.WeakKeyDictionary() self._data_map = weakref.WeakValueDictionary() self._factory = mosaic.array_model.Factory() def close(self): if self._close: self.root.file.close() self.root = None def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False def flush(self): self.root.file.flush() storage_handler = MethodRegister()
[docs] def store(self, path, data): """ :param path: a HDF5 path, relative to the group used by the HDF5Store :type path: str :param data: a Mosaic data item :type data: :class:`mosaic.api.MosaicDataItem` """ api.validate_type(path, str, "path") if path[0] == "/": raise ValueError("absolute paths not allowed") api.validate_type(data, api.MosaicDataItem, "data") handler = self.storage_handler[type(data)] if handler is None: raise TypeError("Storage of %s not yet implemented" % str(type(data))) handler(self, path, data) self._register_data_item(path, data)
[docs] def retrieve(self, path_or_node): """ :param path_or_node: a HDF5 path, relative to the group used by the HDF5Store, or an HDF5 node :type path_or_node: str or h5py.Node :returns: a Mosaic data item :rtype: :class:`mosaic.api.MosaicDataItem` """ if isinstance(path_or_node, h5py.h5r.Reference): path_or_node = self.root[path_or_node] if isinstance(path_or_node, str): path = path_or_node else: # Group, Dataset, or classes with the same interface path = path_or_node.name if path[0] == '/': if path.startswith(self.root.name): path = path[len(self.root.name):] if path[0] == '/': path = path[1:] else: raise ValueError("path name not inside root group") data = self._get_data(path) if data is not None: return data node = self.root[path] label = py_str(self._read_stamp(node)) if label not in self._node_types: raise ValueError("Undefined node type %s" % label) handler = self.storage_handler[label] if handler is None: raise ValueError("Retrieval of node type %s not yet implemented" % label) data = handler(self, node) self._register_data_item(path, data) return data
_node_types = ['universe', 'configuration', 'property', 'label', 'selection'] def __iter__(self): # First pass: universes for name in self.root: node = self.root[name] if node.attrs.get('DATA_MODEL', None) == ascii('MOSAIC') \ and node.attrs['MOSAIC_DATA_TYPE'] == ascii('universe'): yield name, self.retrieve(node) # Second pass: anything else for name in self.root: node = self.root[name] if node.attrs.get('DATA_MODEL', None) == ascii('MOSAIC') \ and node.attrs['MOSAIC_DATA_TYPE'] != ascii('universe'): yield name, self.retrieve(node) def recursive_iterator(self, group=None): if group is None: group = self.root # First pass: universes for name in group: node = group[name] if node.attrs.get('DATA_MODEL', None) == ascii('MOSAIC') \ and node.attrs['MOSAIC_DATA_TYPE'] == ascii('universe'): yield name, self.retrieve(node) if isinstance(node, h5py.Group): for gname, gnode in self.recursive_iterator(node): yield name+'/'+gname, gnode # Second pass: anything else for name in group: node = group[name] if node.attrs.get('DATA_MODEL', None) == ascii('MOSAIC') \ and node.attrs['MOSAIC_DATA_TYPE'] != ascii('universe'): yield name, self.retrieve(node) if isinstance(node, h5py.Group): for gname, gnode in self.recursive_iterator(node): yield name+'/'+gname, gnode def _stamp(self, node, label): node.attrs['DATA_MODEL'] = ascii('MOSAIC') node.attrs['DATA_MODEL_MAJOR_VERSION'] = api.MOSAIC_VERSION[0] node.attrs['DATA_MODEL_MINOR_VERSION'] = api.MOSAIC_VERSION[1] node.attrs['MOSAIC_DATA_TYPE'] = ascii(label) def _read_stamp(self, node): if node.attrs['DATA_MODEL'] != ascii('MOSAIC'): raise ValueError("Node %s not an Mosaic data item" % str(node)) version = (node.attrs['DATA_MODEL_MAJOR_VERSION'], node.attrs['DATA_MODEL_MINOR_VERSION']) if version[0] > api.MOSAIC_VERSION[0] \ or version[1] > api.MOSAIC_VERSION[1]: raise ValueError("HDF5 data is for version %d.%d, " "software is version %d.%d" % (version + api.MOSAIC_VERSION)) return node.attrs['MOSAIC_DATA_TYPE'] def _register_data_item(self, path, data_item): self._data_map[path] = data_item self._path_map[data_item] = path def _get_path(self, data_item): return self._path_map.get(data_item, None) def _get_data(self, path): return self._data_map.get(path, None) # # Store data in HDF5 file # @storage_handler(api.MosaicUniverse) def _store_universe(self, path, universe): group = self.root.require_group(path) try: # ActivePapers support group.mark_as_data_item() except AttributeError: pass self._stamp(group, 'universe') # Convert the universe to an array_model universe = self._factory(universe) # Create HDF5 datasets create_dataset(group, 'cell_shape', data=universe.cell_shape) create_dataset(group, 'symmetry_transformations', data=N.array(universe._symmetry_transformations)) create_dataset(group, 'convention', data=universe.convention) create_dataset(group, 'fragments', data=universe._fragment_array) create_dataset(group, 'atoms', data=universe._atom_array) create_dataset(group, 'bonds', data=universe._bond_array) create_dataset(group, 'molecules', data=universe._molecule_array) if len(universe._polymer_array) > 0: create_dataset(group, 'polymers', data=universe._polymer_array) symbols = universe._symbols symbol_table = group.require_dataset('symbols', shape=(len(symbols),), dtype=h5py.special_dtype(vlen=bytes)) for i, s in enumerate(symbols): symbol_table[i] = ascii(s) @storage_handler(api.MosaicConfiguration) def _store_configuration(self, path, configuration): universe = configuration.universe universe_path = self._get_path(universe) if universe_path is None: raise IOError("universe must be stored first") group = self.root.require_group(path) try: # ActivePapers support group.mark_as_data_item() except AttributeError: pass self._stamp(group, 'configuration') group.attrs['universe'] = self.root[universe_path].ref ncellparams = N.multiply.reduce(universe.cell_parameter_array_shape) if ncellparams > 0: create_dataset(group, 'cell_parameters', data=configuration.cell_parameters) arr = N.ascontiguousarray(configuration.positions) positions = group.require_dataset('positions', shape=(len(arr),), dtype='(3,)f%d' % arr.dtype.itemsize) # There doesn't seem to be any way to write this array # using high-level operations, so we use the low-level access. mtype = h5py.h5t.py_create(positions.id.dtype) mspace = h5py.h5s.create_simple(positions.shape) fspace = positions.id.get_space() positions.id.write(mspace, fspace, arr, mtype) @storage_handler(api.MosaicProperty) def _store_property(self, path, property): universe = property.universe universe_path = self._get_path(universe) if universe_path is None: raise IOError("universe must be stored first") element_shape = property.data.shape[1:] if element_shape: dtype = N.dtype((property.data.dtype, element_shape)) else: dtype = property.data.dtype arr = N.ascontiguousarray(property.data) ds = self.root.require_dataset(path, shape=(len(arr),), dtype=dtype) self._stamp(ds, 'property') ds.attrs['universe'] = self.root[universe_path].ref ds.attrs['name'] = ascii(property.name) ds.attrs['units'] = ascii(property.units) ds.attrs['property_type'] = ascii(property.type) # There doesn't seem to be any way to write this array # using high-level operations, so we use the low-level access. mtype = h5py.h5t.py_create(ds.id.dtype) mspace = h5py.h5s.create_simple(ds.shape) fspace = ds.id.get_space() ds.id.write(mspace, fspace, arr, mtype) @storage_handler(api.MosaicLabel) def _store_label(self, path, label): universe = label.universe label = self._factory(label) universe_path = self._get_path(universe) if universe_path is None: raise IOError("universe must be stored first") ds = create_dataset(self.root, path, data=label._string_array) self._stamp(ds, 'label') ds.attrs['universe'] = self.root[universe_path].ref ds.attrs['name'] = ascii(label.name) ds.attrs['label_type'] = ascii(label.type) @storage_handler(api.MosaicSelection) def _store_selection(self, path, selection): universe = selection.universe universe_path = self._get_path(universe) if universe_path is None: raise IOError("universe must be stored first") ds = create_dataset(self.root, path, N.ascontiguousarray(selection.indices)) ds.attrs['universe'] = self.root[universe_path].ref ds.attrs['selection_type'] = ascii(selection.type) self._stamp(ds, 'selection') # # Retrieve data from HDF5 file # @storage_handler('universe') def _retrieve_universe(self, node): # Work around a restriction/bug in h5py that prevents reading # arrays of length zero. if node['symmetry_transformations'].shape[0] == 0: st = N.zeros((0,), dtype=node['symmetry_transformations'].dtype) else: st = node['symmetry_transformations'][...] return mosaic.array_model.Universe.from_arrays( node['atoms'][...], node['bonds'][...], node['fragments'][...], node['polymers'][...] if 'polymers' in node else None, node['molecules'][...], tuple(py_str(s) for s in node['symbols'][...]), st, py_str(node['cell_shape'][()]), py_str(node['convention'][()])) @storage_handler('configuration') def _retrieve_configuration(self, node): universe = self.retrieve(node.attrs['universe']) positions = node['positions'][...] if 'cell_parameters' in node: ds = node['cell_parameters'] cell_parameters = N.empty(ds.shape, ds.dtype) cell_parameters[...] = ds[...] else: assert N.product(universe.cell_parameter_array_shape) == 0 cell_parameters = N.empty(universe.cell_parameter_array_shape, positions.dtype) return mosaic.array_model.Configuration(universe, positions, cell_parameters) @storage_handler('property') def _retrieve_property(self, node): universe = self.retrieve(node.attrs['universe']) data = node[...] name = py_str(node.attrs['name']) units = py_str(node.attrs['units']) property_type = py_str(node.attrs['property_type']) return mosaic.array_model.Property(universe, name, units, data, property_type) @storage_handler('label') def _retrieve_label(self, node): universe = self.retrieve(node.attrs['universe']) data = node[...] name = py_str(node.attrs['name']) label_type = py_str(node.attrs['label_type']) return mosaic.array_model.Label.from_arrays(universe, name, data, label_type) @storage_handler('selection') def _retrieve_selection(self, node): universe = self.retrieve(node.attrs['universe']) indices = node[...] selection_type = py_str(node.attrs['selection_type']) return mosaic.array_model.Selection(universe, indices, selection_type)