# -*- coding: utf-8 -*-
"""XML I/O
.. moduleauthor:: Konrad Hinsen
The XML representation of Mosaic data consists of a single element
``<mosaic version="x.y">...</mosaic>`` that can contain any number of
Mosaic data items, each of which is identified by a unique id.
The classes :class:`XMLWriter` and :class:`XMLReader` handle the
translation between Mosaic data items in memory that can reference
each other and XML elements that reference each other by id.
A common pattern for generating an XML file is:
::

    with XMLWriter('molecule.xml') as writer:
        writer.store("universe", universe)
        writer.store("configuration1", configuration1)
        writer.store("configuration2", configuration2)

A common pattern for reading from an XML file is:
::

    items = {}
    for id, data in XMLReader('molecule.xml'):
        items[id] = data

"""
#-----------------------------------------------------------------------------
# Copyright (C) 2013 The Mosaic Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file LICENSE.txt, distributed as part of this software.
#-----------------------------------------------------------------------------
# Prefer the C-accelerated ElementTree where it exists as a separate module
# and fall back to the pure-Python name otherwise.  A missing module raises
# ImportError (not IOError); xml.etree.cElementTree was removed in
# Python 3.9, so the fallback is the normal path on current interpreters.
try:
    import xml.etree.cElementTree as ET
except ImportError:
    import xml.etree.ElementTree as ET
import numpy as N
import immutable.np as IN
import mosaic.api as api
import mosaic.immutable_model as im
from mosaic.utility import MethodRegister
from mosaic.utility import uint_for_max_value
from mosaic.utility import isstring
from mosaic.utility import xml_encoding
# Number formatting
def n2s(x, dp=False):
    """Format a number as text for inclusion in an XML element.

    :param x: the number to format
    :type x: int, float, or a NumPy scalar
    :param dp: if True, format non-integers with 20 significant digits
               (used by the callers for float64 data), otherwise with 10
    :type dp: bool
    :returns: the text representation of x
    :rtype: str
    """
    # NumPy integer scalars (what iterating over an integer array yields)
    # are not instances of Python's int.  They must nevertheless be written
    # digit for digit: the "g" float format below would switch to exponent
    # notation and drop digits for values longer than its precision.
    if isinstance(x, (int, N.integer)):
        return str(x)
    fmt = "{0:.20g}" if dp else "{0:.10g}"
    return fmt.format(x)
# Data type formatting
# Single table relating the type names used in the XML representation to
# NumPy scalar types.  Both conversion functions are derived from it so
# that the writer and the reader can never disagree.  N.bool_ is used
# rather than N.bool because the latter name does not exist in every
# NumPy release.
_XML_TYPE_TABLE = (
    ("int8", N.int8),
    ("int16", N.int16),
    ("int32", N.int32),
    ("int64", N.int64),
    ("uint8", N.uint8),
    ("uint16", N.uint16),
    ("uint32", N.uint32),
    ("uint64", N.uint64),
    ("float32", N.float32),
    ("float64", N.float64),
    ("boolean", N.bool_),
)
_NAME_FOR_DTYPE = dict((N.dtype(scalar_type), name)
                       for name, scalar_type in _XML_TYPE_TABLE)
_SCALAR_TYPE_FOR_NAME = dict(_XML_TYPE_TABLE)


def t2s(t):
    """Return the XML type name for a NumPy dtype.

    :param t: the dtype of an array to be written
    :type t: numpy.dtype
    :returns: the type name used in XML attributes, e.g. ``"float64"``
    :rtype: str
    :raises KeyError: if the dtype has no XML representation
    """
    return _NAME_FOR_DTYPE[t]


def s2t(s):
    """Return the NumPy scalar type for an XML type name.

    :param s: a type name as produced by :func:`t2s`
    :type s: str
    :returns: the corresponding NumPy scalar type, e.g. ``numpy.float64``
    :raises KeyError: if the name is not a known type name
    """
    return _SCALAR_TYPE_FOR_NAME[s]
class XMLStore(object):
    """Common base of the XML reader and writer.

    Maintains a two-way mapping between XML ids and the data items
    they identify, and implements the context manager protocol by
    calling :meth:`close` on exit.
    """

    def __init__(self):
        # data item -> XML id
        self._id_map = {}
        # XML id -> data item
        self._data_map = {}

    def _register_data_item(self, xml_id, data_item):
        """Record that data_item is identified by xml_id."""
        self._data_map[xml_id] = data_item
        self._id_map[data_item] = xml_id

    def _get_id(self, data_item):
        """Return the XML id of data_item, or None if it is not registered."""
        return self._id_map.get(data_item, None)

    def _get_data(self, xml_id):
        """Return the data item for xml_id, or None if the id is unknown."""
        return self._data_map.get(xml_id, None)

    def close(self):
        """Release the id/data caches.

        Defined here so that the context manager protocol below also
        works on the base class; subclasses override it to finish their
        I/O as well.
        """
        self._id_map = None
        self._data_map = None

    # Make XMLStores work as context managers

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Never suppress an exception raised inside the with-block
        return False
class XMLWriter(XMLStore):
    """Output handler for XML files

    This class handles the translation of references between Mosaic
    data items in memory to XML references by id. References are
    allowed only to data items that have been written earlier using
    the same XMLWriter instance.

    An XMLWriter can be used like a file, in which case it must be
    closed after the last ``store`` operation, or as a context manager
    in a with-statement.
    """

    def __init__(self, xml_file):
        """
        :param xml_file: a writeable file object, or a string
                         interpreted as a file name
        :type xml_file:  str or file-like
        """
        XMLStore.__init__(self)
        if isstring(xml_file):
            # file name given: open the file and close it at the end
            self.file = open(xml_file, 'w')
            self._close_file = True
        else:
            # assume it is a file-like object, don't close it at the end
            self.file = xml_file
            self._close_file = False
        # Keep a set of already used ids in order to check they remain unique
        self._xml_ids = set()
        # Start with the XML declaration
        self.file.write('<?xml version="1.0" encoding="utf-8"?>')
        # Open the top-level element
        self.file.write('<mosaic version="%d.%d">' % api.MOSAIC_VERSION)

    def close(self):
        """Write the closing tag and release the output file.

        Calling ``close`` more than once is harmless, so an explicit
        ``close()`` inside a with-statement does not make the
        with-statement fail on exit.
        """
        if self.file is None:
            # Already closed
            return
        try:
            # Close the top-level element
            self.file.write("</mosaic>")
        finally:
            # Close the underlying file if we own it, even when the
            # final write failed
            if self._close_file:
                self.file.close()
            # No more output after this
            self.file = None
            # Clear cache
            self._id_map = None
            self._data_map = None

    storage_handler = MethodRegister()

    def store(self, xml_id, data):
        """
        :param xml_id: the id of the XML element representing the data item
        :type xml_id:  str
        :param data:   a Mosaic data item
        :type data:    :class:`mosaic.api.MosaicDataItem`
        :raises ValueError: if xml_id has already been used with this writer
        :raises TypeError: if there is no storage handler for the type of data
        """
        api.validate_type(xml_id, str, "xml_id")
        api.validate_type(data, api.MosaicDataItem, "data")
        if xml_id in self._xml_ids:
            raise ValueError("XML ID %s has already been used" % xml_id)
        handler = self.storage_handler[type(data)]
        if handler is None:
            raise TypeError("Storage of %s not yet implemented"
                            % str(type(data)))
        handler(self, xml_id, data)
        # Register only after the element has been written successfully
        self._register_data_item(xml_id, data)
        self._xml_ids.add(xml_id)

    def _write(self, element):
        """Serialize one element to the output file."""
        ET.ElementTree(element).write(self.file,
                                      encoding=xml_encoding)

    #
    # Storage handlers
    #

    @storage_handler(api.MosaicUniverse)
    def _store_universe(self, xml_id, universe):
        """Write a ``universe`` element for a universe."""
        el = ET.Element('universe')
        el.set("id", xml_id)
        el.set("cell_shape", universe.cell_shape)
        el.set("convention", universe.convention)
        st = universe.symmetry_transformations
        if len(st) > 0:
            el_st = ET.SubElement(el, 'symmetry_transformations')
            for rot, trans in st:
                el_st.append(self._symmetry_el(rot, trans))
        el_molecules = ET.SubElement(el, 'molecules')
        for fragment, count in universe.molecules:
            el_m = ET.Element('molecule', count=str(count))
            el_m.append(self._fragment_element(fragment))
            el_molecules.append(el_m)
        self._write(el)

    def _symmetry_el(self, rot, trans):
        """Return a ``transformation`` element for a rotation/translation
        pair, each written as a flat list of numbers."""
        el = ET.Element('transformation')
        el_rot = ET.SubElement(el, 'rotation')
        el_rot.text = ' '.join(n2s(x) for x in rot.flat)
        el_trans = ET.SubElement(el, 'translation')
        el_trans.text = ' '.join(n2s(x) for x in trans.flat)
        return el

    def _fragment_element(self, fragment):
        """Return a ``fragment`` element for a fragment, recursing into
        its sub-fragments."""
        el = ET.Element('fragment',
                        label=fragment.label,
                        species=fragment.species)
        if fragment.is_polymer:
            el.set('polymer_type', fragment.polymer_type)
        if len(fragment.fragments) > 0:
            el_fragments = ET.SubElement(el, 'fragments')
            for f in fragment.fragments:
                el_fragments.append(self._fragment_element(f))
        if len(fragment.atoms) > 0:
            el_atoms = ET.SubElement(el, 'atoms')
            for a in fragment.atoms:
                el_atom = ET.Element('atom',
                                     label=a.label,
                                     type=a.type,
                                     name=a.name)
                # nsites is written only when it differs from 1
                if a.number_of_sites != 1:
                    el_atom.set('nsites', str(a.number_of_sites))
                el_atoms.append(el_atom)
        if len(fragment.bonds) > 0:
            el_bonds = ET.SubElement(el, 'bonds')
            for a1, a2, order in fragment.bonds:
                el_bonds.append(ET.Element('bond',
                                           atoms=a1 + ' ' + a2,
                                           order=order))
        return el

    @storage_handler(api.MosaicConfiguration)
    def _store_configuration(self, xml_id, configuration):
        """Write a ``configuration`` element.

        :raises IOError: if the universe of the configuration has not
                         been stored with this writer before
        """
        universe = configuration.universe
        universe_id = self._get_id(universe)
        if universe_id is None:
            raise IOError("universe must be stored first")
        el = ET.Element('configuration')
        el.set("id", xml_id)
        el.append(ET.Element('universe', ref=universe_id))
        float_type = t2s(configuration.positions.dtype)
        # float64 data are written with more significant digits
        dp = float_type == "float64"
        # Empty cell parameter arrays are left out altogether.
        # (N.prod rather than N.product: the latter no longer exists
        # in NumPy 2.)
        if N.prod(configuration.cell_parameters.shape) != 0:
            shape_str = ' '.join(str(x)
                                 for x in configuration.cell_parameters.shape)
            el_cp = ET.Element('cell_parameters', shape=shape_str)
            el_cp.text = ' '.join(n2s(p, dp)
                                  for p in configuration.cell_parameters.flat)
            el.append(el_cp)
        el_pos = ET.Element('positions', type=float_type)
        el_pos.text = ' '.join(' '.join(n2s(x, dp) for x in p)
                               for p in configuration.positions)
        el.append(el_pos)
        self._write(el)

    @storage_handler(api.MosaicProperty)
    def _store_property(self, xml_id, property):
        """Write a ``<type>_property`` element.

        :raises IOError: if the universe of the property has not
                         been stored with this writer before
        """
        universe = property.universe
        universe_id = self._get_id(universe)
        if universe_id is None:
            raise IOError("universe must be stored first")
        el = ET.Element(property.type + '_property')
        el.set("id", xml_id)
        el.set("name", property.name)
        el.set("units", property.units)
        el.append(ET.Element('universe', ref=universe_id))
        shape_str = ' '.join(str(x) for x in property.element_shape)
        el_data = ET.Element('data',
                             shape=shape_str,
                             type=t2s(property.data.dtype))
        # float64 data are written with more significant digits
        dp = property.data.dtype == N.dtype(N.float64)
        el_data.text = ' '.join(' '.join(n2s(x, dp) for x in v.flat)
                                for v in property.data)
        el.append(el_data)
        self._write(el)

    @storage_handler(api.MosaicLabel)
    def _store_label(self, xml_id, label):
        """Write a ``<type>_label`` element.

        :raises IOError: if the universe of the label has not
                         been stored with this writer before
        """
        universe = label.universe
        universe_id = self._get_id(universe)
        if universe_id is None:
            raise IOError("universe must be stored first")
        el = ET.Element(label.type + '_label')
        el.set("id", xml_id)
        el.set("name", label.name)
        el.append(ET.Element('universe', ref=universe_id))
        el_strings = ET.Element("strings")
        el_strings.text = ' '.join(label.strings)
        el.append(el_strings)
        self._write(el)

    @storage_handler(api.MosaicSelection)
    def _store_selection(self, xml_id, selection):
        """Write a ``<type>_selection`` element.

        :raises IOError: if the universe of the selection has not
                         been stored with this writer before
        """
        universe = selection.universe
        universe_id = self._get_id(universe)
        if universe_id is None:
            raise IOError("universe must be stored first")
        el = ET.Element(selection.type + '_selection')
        el.set("id", xml_id)
        el.append(ET.Element('universe', ref=universe_id))
        el_indices = ET.Element("indices")
        el_indices.text = ' '.join(str(i) for i in selection.indices)
        el.append(el_indices)
        self._write(el)
class XMLReader(XMLStore):
    """Input handler for XML files

    This class handles the translation of references by id in an XML
    file to in-memory references between data items.

    An XMLReader is used as an iterator over ``(id, data_item)`` pairs.

    The current implementation does very little validation, it largely
    assumes the XML input to be correct.
    """

    def __init__(self, xml_file):
        """
        :param xml_file: a file object, or a string
                         interpreted as a file name
        :type xml_file:  str or file-like
        """
        XMLStore.__init__(self)
        if isstring(xml_file):
            # file name given: open file and close it at the end
            self.file = open(xml_file, 'r')
            self._close_file = True
        else:
            # assume it is a file-like object, don't close at the end
            self.file = xml_file
            self._close_file = False

    def close(self):
        """Release the input file.

        Calling ``close`` more than once is harmless.
        """
        if self.file is None:
            # Already closed
            return
        if self._close_file:
            self.file.close()
        # No more reading after this
        self.file = None
        # Clear cache
        self._id_map = None
        self._data_map = None

    def __iter__(self):
        """Iterate over the ``(id, data_item)`` pairs in the file.

        :raises ValueError: if the file is for a newer Mosaic version
                            than this software, or if it contains an
                            element of unknown type that has an id
        """
        # "start" events are requested only for the root element: its
        # attributes are available as soon as its start tag has been
        # seen, so the version is checked before any data item is
        # yielded rather than at the very end of the file.
        for event, el in ET.iterparse(self.file, events=('start', 'end')):
            if el.tag == 'mosaic':
                if event == 'start':
                    self._check_version(el.get("version"))
                continue
            if event != 'end':
                # The contents of an element are complete only at its end
                continue
            xml_id = el.get("id")
            if xml_id is not None:
                handler = self.data_handler[el.tag]
                if handler is None:
                    raise ValueError("Unknown element type %s" % el.tag)
                data = handler(self, el)
                # Free the memory used by the element once it is converted
                el.clear()
                self._register_data_item(xml_id, data)
                yield xml_id, data

    def _check_version(self, version_string):
        """Raise ValueError if version_string ("major.minor") denotes a
        version newer than api.MOSAIC_VERSION."""
        version = tuple(int(s) for s in version_string.split('.'))
        supported = api.MOSAIC_VERSION
        # The minor version matters only when the major versions are equal
        if version[0] > supported[0] \
           or (version[0] == supported[0] and version[1] > supported[1]):
            raise ValueError("XML data is for version %d.%d, "
                             "software is version %d.%d"
                             % (version + api.MOSAIC_VERSION))

    data_handler = MethodRegister()

    @data_handler("universe")
    def _read_universe(self, el):
        """Convert a ``universe`` element.

        An element with a ``ref`` attribute is a reference to a
        universe read earlier; the registered data item (None if the
        id is unknown) is returned in that case.
        """
        ref = el.get('ref', None)
        if ref is not None:
            return self._get_data(ref)
        molecules = \
            tuple(self._parse_molecule(el_m)
                  for el_m in el.iter('molecule'))
        symmetry_transformations = \
            tuple((self._array((3, 3), N.float64,
                               el_t.find('rotation').text),
                   self._array((3,), N.float64,
                               el_t.find('translation').text))
                  for el_t in el.iter('transformation'))
        return im.universe(el.get('cell_shape'),
                           molecules,
                           symmetry_transformations,
                           el.get('convention'))

    def _parse_molecule(self, el):
        """Return ``(fragment, label, count)`` for a ``molecule`` element."""
        label, fragment = self._parse_fragment_tree(el.find('fragment'))
        count = int(el.get('count'))
        return fragment, label, count

    def _parse_fragment_tree(self, el):
        """Return ``(label, fragment)`` for a ``fragment`` element,
        recursing into its sub-fragments.

        :raises ValueError: if a polymer fragment contains atoms
        """
        atom_classes = {"dummy": im.dummy,
                        "unknown": im.unknown,
                        "element": im.element}
        fragments = []
        for el_fragments in el.findall('fragments'):
            for el_f in el_fragments:
                fragments.append(self._parse_fragment_tree(el_f))
        atoms = []
        for el_atoms in el.findall('atoms'):
            for el_a in el_atoms:
                descr = atom_classes[el_a.get('type')](el_a.get('name'))
                # nsites defaults to 1 when the attribute is absent
                atoms.append((el_a.get('label'),
                              im.atom(descr, int(el_a.get('nsites', '1')))))
        bonds = []
        for el_bonds in el.findall('bonds'):
            for el_b in el_bonds:
                a1, a2 = el_b.get('atoms').split()
                bonds.append((a1, a2, el_b.get('order')))
        label = el.get('label')
        species = el.get('species')
        polymer_type = el.get('polymer_type', None)
        if polymer_type is None:
            return label, im.fragment(species, fragments, atoms, bonds)
        # im.polymer takes no atoms argument, so atoms in the input
        # would be dropped silently; report them instead.
        if len(atoms) != 0:
            raise ValueError("polymer fragment %s must not contain atoms"
                             % label)
        return label, im.polymer(species, fragments, bonds, polymer_type)

    @data_handler("configuration")
    def _read_configuration(self, el):
        """Convert a ``configuration`` element."""
        universe = self._read_universe(el.find('universe'))
        el_pos = el.find('positions')
        dtype = s2t(el_pos.get('type'))
        pos = self._property((3,), dtype, el_pos.text)
        el_cp = el.find('cell_parameters')
        if el_cp is None:
            cp = None
        else:
            cp = self._array(el_cp.get('shape'), dtype, el_cp.text)
        return im.Configuration(universe, pos, cp)

    @data_handler("atom_property")
    @data_handler("site_property")
    @data_handler("template_atom_property")
    @data_handler("template_site_property")
    def _read_property(self, el):
        """Convert a ``<type>_property`` element."""
        universe = self._read_universe(el.find('universe'))
        ptype = el.tag[:-9]  # strip off '_property'
        klass = {"atom": im.AtomProperty,
                 "site": im.SiteProperty,
                 "template_atom": im.TemplateAtomProperty,
                 "template_site": im.TemplateSiteProperty}[ptype]
        el_d = el.find('data')
        data = self._property(el_d.get('shape'),
                              s2t(el_d.get('type')),
                              el_d.text)
        return klass(universe, el.get('name'), el.get('units'), data)

    @data_handler("atom_label")
    @data_handler("site_label")
    @data_handler("template_atom_label")
    @data_handler("template_site_label")
    def _read_label(self, el):
        """Convert a ``<type>_label`` element."""
        universe = self._read_universe(el.find('universe'))
        ptype = el.tag[:-6]  # strip off '_label'
        klass = {"atom": im.AtomLabel,
                 "site": im.SiteLabel,
                 "template_atom": im.TemplateAtomLabel,
                 "template_site": im.TemplateSiteLabel}[ptype]
        el_strings = el.find('strings')
        # The text of an empty element is None
        strings = tuple((el_strings.text or '').split())
        return klass(universe, el.get('name'), strings)

    @data_handler("atom_selection")
    @data_handler("site_selection")
    @data_handler("template_atom_selection")
    @data_handler("template_site_selection")
    def _read_selection(self, el):
        """Convert a ``<type>_selection`` element."""
        universe = self._read_universe(el.find('universe'))
        ptype = el.tag[:-10]  # strip off '_selection'
        klass = {"atom": im.AtomSelection,
                 "site": im.SiteSelection,
                 "template_atom": im.TemplateAtomSelection,
                 "template_site": im.TemplateSiteSelection}[ptype]
        el_d = el.find('indices')
        # The text of an empty element is None
        indices = [int(s) for s in (el_d.text or '').split()]
        # max() fails on an empty list; use 0 as the largest index then.
        # NOTE(review): presumably uint_for_max_value(0) yields the
        # smallest unsigned type and empty selections are acceptable to
        # the selection classes -- confirm.
        largest = max(indices) if indices else 0
        indices = IN.array(indices, uint_for_max_value(largest))
        return klass(universe, indices)

    # Parse array data

    def _parse_values(self, dtype, text):
        """Return a flat array of type dtype from whitespace-separated
        numbers in text (None is treated as empty text)."""
        tokens = (text or '').split()
        if N.dtype(dtype).kind == 'b':
            # bool() of any non-empty string is True, so boolean tokens
            # need explicit parsing; "1" and "true" (any case) are true.
            values = [s.lower() in ('1', 'true') for s in tokens]
        else:
            values = [dtype(s) for s in tokens]
        # Pass dtype explicitly so that empty input gets the right type too
        return IN.array(values, dtype)

    def _array(self, shape, dtype, text):
        """Return an array of the given shape parsed from text.

        :param shape: the array shape, as a tuple or as a string of
                      whitespace-separated integers
        """
        if isstring(shape):
            shape = tuple(int(s) for s in shape.split())
        return self._parse_values(dtype, text).reshape(shape)

    def _property(self, el_shape, dtype, text):
        """Return an array of shape ``(n_entries,) + el_shape`` parsed
        from text, where n_entries follows from the number of values.

        :param el_shape: the shape of one entry, as a tuple or as a
                         string of whitespace-separated integers
        :raises ValueError: if the number of values is not a multiple
                            of the number of values per entry
        """
        data = self._parse_values(dtype, text)
        if isstring(el_shape):
            el_shape = tuple(int(s) for s in el_shape.split())
        # An integer product is required: the default product of an
        # empty shape is the float 1.0, which reshape() does not accept.
        n_els = int(N.prod(el_shape, dtype=int))
        if len(data) % n_els != 0:
            raise ValueError("number of values (%d) is not a multiple of "
                             "the number of values per entry (%d)"
                             % (len(data), n_els))
        n_entries = len(data) // n_els
        return data.reshape((n_entries,) + el_shape)