'''
This is an alternative to the ``larpix.format.hdf5format`` format that allows for
much faster conversion to file at the expense of human readability.

To use, pass a list of bytestring messages into the ``to_rawfile()`` method::

    msgs = [b'this is a test message', b'this is a different message']
    to_rawfile('raw.h5', msgs)

To access the data in the file, the inverse method ``from_rawfile()`` is used::

    rd = from_rawfile('raw.h5')
    rd['msgs'] # [b'this is a test message', b'this is a different message']

Messages may be received from multiple ``io_group`` sources. In this case, a
per-message header with ``io_group`` can be specified as a list of integers of
the same length as the ``msgs`` list and passed into the file at the same time::

    msgs = [b'message from 1', b'message from 2']
    io_groups = [1, 2]
    to_rawfile('raw.h5', msgs=msgs, msg_headers={'io_groups': io_groups})

    rd = from_rawfile('raw.h5')
    rd['msgs'] # [b'message from 1', b'message from 2']
    rd['msg_headers']['io_groups'] # [1, 2]
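
Because ``rd['msgs']`` and ``rd['msg_headers']['io_groups']`` are parallel lists,
messages can be regrouped by source after reading. The following is a minimal
sketch only: ``group_msgs_by_io_group`` is a hypothetical helper that is not part
of this module, and the ``rd`` dict is a hand-written stand-in for the value
returned by ``from_rawfile()``.

```python
from collections import defaultdict


def group_msgs_by_io_group(rd):
    """Group message bytestrings by io_group (hypothetical helper)."""
    grouped = defaultdict(list)
    # the two lists are parallel: entry i of each describes message i
    for io_group, msg in zip(rd['msg_headers']['io_groups'], rd['msgs']):
        grouped[int(io_group)].append(msg)
    return dict(grouped)


# stand-in for the dict returned by from_rawfile('raw.h5')
rd = {
    'msgs': [b'message from 1', b'message from 2', b'another from 1'],
    'msg_headers': {'io_groups': [1, 2, 1]},
}
grouped = group_msgs_by_io_group(rd)
```

Message order within each group follows the order in the file.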

File versioning
---------------

Some version validation is included with the file format through
the ``version`` and ``io_version`` file metadata. When creating a new file, a
file format version can be provided with the ``version`` keyword argument as
a string formatted ``'major.minor'``::

    to_rawfile('raw_v0_0.h5', version='0.0')

Subsequent writes to the file will only occur if the requested file version and
the existing file version are compatible. Incompatibility occurs if the major
version numbers differ or if the file's minor version number is less than the
requested minor version number::

    to_rawfile('raw_v0_0.h5', version='0.1') # fails due to minor version incompatibility
    to_rawfile('raw_v0_0.h5', version='1.0') # fails due to major version incompatibility

By default, the most recent file version is used.

On the file read side, a version number can be requested and the file will be
parsed assuming a specific version::

    from_rawfile('raw_v0_0.h5', version='0.0')
    from_rawfile('raw_v0_0.h5', version='0.1') # fails due to minor version incompatibility
    from_rawfile('raw_v0_0.h5', version='1.0') # fails due to major version incompatibility
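
The compatibility rule described above can be sketched as a small standalone
function. This is an illustration under stated assumptions, not the check this
module performs: ``versions_compatible`` is a hypothetical name, and it compares
the version parts as integers, which is an assumption about the intended
ordering of ``'major.minor'`` strings.

```python
def versions_compatible(file_version, requested_version):
    """Return True if a file at file_version can serve requested_version.

    Hypothetical helper: both arguments are 'major.minor' strings.
    """
    file_major, file_minor = (int(part) for part in file_version.split('.'))
    req_major, req_minor = (int(part) for part in requested_version.split('.'))
    # same major version, and the file is at least as new as the request
    return file_major == req_major and file_minor >= req_minor


same = versions_compatible('0.0', '0.0')
newer_minor_requested = versions_compatible('0.0', '0.1')
other_major_requested = versions_compatible('0.0', '1.0')
```

Comparing integers rather than strings keeps a minor version such as ``10``
ordered after ``9``.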

The ``io_version`` optional metadata marks the version of the io message format
that was used to encode the message bytestrings. If present as a keyword
argument when writing to the file, an ``AssertionError`` will be raised if
the io version is incompatible with the existing one stored in metadata::

    to_rawfile('raw_io_v0_0.h5', io_version='0.0')
    to_rawfile('raw_io_v0_0.h5', io_version='0.1') # fails due to minor version incompatibility
    to_rawfile('raw_io_v0_0.h5', io_version='1.0') # fails due to major version incompatibility

A similar mechanism occurs when requesting an io version when reading from the
file::

    from_rawfile('raw_io_v0_0.h5', io_version='0.1') # fails due to minor version incompatibility
    from_rawfile('raw_io_v0_0.h5', io_version='1.0') # fails due to major version incompatibility
    from_rawfile('raw_io_v0_0.h5', io_version='0.0')

I think it is worthwhile to further clarify the ``io_version`` and the file
``version``, as this might be confusing. In particular, you might be asking,
"What io versions are compatible with what file versions?" The ``rawhdf5format``
is a way of wrapping raw binary data into a format that only requires HDF5 to
parse. The file version represents this HDF5 structuring (the hdf5 dataset
formats, file metadata, what message header data is available), whereas the
``io_version`` represents the formatting of the binary data that the file
contains. So the answer to that question is: *all* file versions are compatible
with *all* io versions.

Converting to other file types
------------------------------

This format was created with a specific application in mind: providing a
temporary but fast file format for PACMAN messages. When used in this
case, you can convert to the standard ``larpix.format.hdf5format`` with::

    from larpix.format.pacman_msg_format import parse
    from larpix.format.hdf5format import to_file

    rd = from_rawfile('raw.h5')
    pkts = list()
    for io_group,msg in zip(rd['msg_headers']['io_groups'], rd['msgs']):
        pkts.extend(parse(msg, io_group=io_group))
    to_file('new_filename.h5', packet_list=pkts)

However, as always, the most efficient means of accessing the data is to operate
on the data itself, rather than converting between types.
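
One way to operate on the data directly is to read the file in fixed-size
chunks using the ``start`` and ``end`` arguments of ``from_rawfile()``. The
sketch below only computes the chunk boundaries: ``chunk_bounds`` is a
hypothetical helper that is not part of this module, and the chunk size is an
arbitrary choice.

```python
def chunk_bounds(total, chunk_size):
    """Yield (start, end) index pairs covering range(total) in order.

    Hypothetical helper: each pair can be passed as start/end to from_rawfile().
    """
    if chunk_size <= 0:
        raise ValueError('chunk_size must be positive')
    for start in range(0, total, chunk_size):
        yield start, min(start + chunk_size, total)


bounds = list(chunk_bounds(10, 4))

# Intended use with this module (not executed here):
# for start, end in chunk_bounds(len_rawfile('raw.h5'), 1000):
#     rd = from_rawfile('raw.h5', start=start, end=end)
#     ...  # process rd['msgs'] and rd['msg_headers']
```

The last chunk is truncated to the file length, so no row is read twice.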

Metadata (v0.0)
---------------

The group ``meta`` contains file metadata stored as attributes:

- ``created``: ``float``, unix timestamp since the 1970 epoch in seconds indicating when the file was first created

- ``modified``: ``float``, unix timestamp since the 1970 epoch in seconds indicating when the file was last written to

- ``version``: ``str``, file version, formatted as ``'major.minor'``

- ``io_version``: ``str``, optional version for message bytestring encoding, formatted as ``'major.minor'``

Datasets (v0.0)
---------------

The hdf5 format contains two datasets ``msgs`` and ``msg_headers``:

- ``msgs``: shape ``(N,)``; variable-length ``u1`` (unsigned 8-bit integer) arrays encoding each message bytestring

- ``msg_headers``: shape ``(N,)``; numpy structured array with fields:

    - ``'io_groups'``: ``u1`` (unsigned 8-bit integer) representing the ``io_group`` associated with each message

'''
import time
import warnings
import os
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE' # needed for error-free SWMR writer access
_file_read_reattempts = 100 # also needed to ignore inevitable reader errors for SWMR mode
import h5py
import numpy as np
#: Most up-to-date raw larpix hdf5 format version.
latest_version = '0.0'
#: Description of the datasets and their dtypes used in each version of the raw larpix hdf5 format.
#:
#: Structured as ``dataset_dtypes['<version>']['<dataset>'] = <dtype>``.
dataset_dtypes = {
    '0.0': {
        'msgs': h5py.vlen_dtype(np.dtype('u1')),
        'msg_headers': np.dtype([
            ('io_groups','u1')
        ])
    }
}
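def _store_msgs_v0_0(msgs, version):
    msg_dtype = np.dtype('u1')
    arr_dtype = dataset_dtypes[version]['msgs']
    msgs = list(msgs)
    # fill a 1D array element by element; building it from a list of
    # equal-length arrays would make numpy produce a 2D array instead
    arr = np.empty((len(msgs),), dtype=arr_dtype)
    for i, msg in enumerate(msgs):
        arr[i] = np.frombuffer(msg, dtype=msg_dtype)
    return arr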
def _store_msg_headers_v0_0(msg_headers, version):
    length = len(msg_headers['io_groups'])
    arr = np.zeros((length,), dtype=dataset_dtypes[version]['msg_headers'])
    for key in msg_headers:
        arr[key] = msg_headers[key]
    return arr

def _parse_msgs_v0_0(msgs, version):
    return [msg.tobytes() for msg in msgs]

def _parse_msg_headers_v0_0(msg_headers, version):
    rd = dict()
    for key in msg_headers.dtype.names:
        rd[key] = list(msg_headers[key].astype(int))
    return rd
def _store_msgs(msgs, version):
    '''
    A version-safe way to put messages into the dataset

    :param msgs: an iterable of PACMAN messages to convert
    :param version: version string
    :returns: a numpy array, 1 row for each msg

    '''
    return _store_msgs_v0_0(msgs, version)

def _store_msg_headers(msg_headers, version):
    '''
    A version-safe way to put message headers into the dataset

    :param msg_headers: a dict of iterable values for each field in ``dataset_dtypes[version]['msg_headers'].names`` to convert to a numpy array
    :param version: version string
    :returns: a numpy structured array, 1 row for each message

    '''
    return _store_msg_headers_v0_0(msg_headers, version)

def _parse_msgs(msgs, version):
    '''
    A version-safe conversion of the rows of the ``msgs`` dataset into PACMAN message byte strings

    :param msgs: an iterable of ``u1`` numpy arrays, 1 for each message
    :param version: version string
    :returns: list of PACMAN message byte strings, 1 for each row in data

    '''
    return _parse_msgs_v0_0(msgs, version)

def _parse_msg_headers(msg_headers, version):
    '''
    A version-safe conversion of message header numpy arrays into lists

    :param msg_headers: a structured array of the ``msg_headers`` dataset to convert to lists of values
    :param version: version string
    :returns: dict of lists, keyed by dtype fields, 1 entry in each list for each row in data

    '''
    return _parse_msg_headers_v0_0(msg_headers, version)
def to_rawfile(filename, msgs=None, version=None, msg_headers=None, io_version=None):
    '''
    Write a list of bytestring messages to an hdf5 file. If the file exists,
    the messages will be appended to the end of the dataset.

    :param filename: desired filename for the file to write or update
    :param msgs: iterable of variable-length bytestrings to write to the file. If ``None`` specified, will only create file and update metadata.
    :param version: a string of major.minor version desired. If ``None`` specified, will use the latest file format version (if new file) or version in file (if updating an existing file).
    :param msg_headers: a dict of iterables to associate with each message header. Iterables must be same length as ``msgs``. If ``None`` specified, will use a default value of ``0`` for each message. Keys are dtype field names specified in ``dataset_dtypes[version]['msg_headers'].names``. Raises ``RuntimeError`` if an unknown key is encountered.
    :param io_version: optional metadata to associate with file corresponding to the io format version of the bytestring messages. Raises ``AssertionError`` if version incompatibility encountered in an existing file.

    '''
    now = time.time()
    with h5py.File(filename, 'a', libver='latest') as f:
        if 'meta' not in f.keys():
            # new file
            version = latest_version if version is None else version
            f.create_group('meta')
            f['meta'].attrs['version'] = version
            f['meta'].attrs['created'] = now
            f['meta'].attrs['modified'] = now
            if io_version is not None:
                f['meta'].attrs['io_version'] = io_version

            # get current position in file
            curr_idx = 0

            # create datasets
            f.create_dataset('msgs', shape=(0,), maxshape=(None,), compression='gzip', dtype=dataset_dtypes[version]['msgs'])
            f.create_dataset('msg_headers', shape=(0,), maxshape=(None,), compression='gzip', dtype=dataset_dtypes[version]['msg_headers'])
            f.swmr_mode = True
        else:
            # existing file
            f.swmr_mode = True
            file_version = f['meta'].attrs['version']
            assert (file_version == version) or (version is None), \
                'Version mismatch! file: {}, requested: {}'.format(file_version, version)
            version = file_version

            # look up the stored io version first so that the error message
            # below cannot itself fail when the attribute is missing
            file_io_version = f['meta'].attrs['io_version'] if 'io_version' in f['meta'].attrs.keys() else None
            assert (io_version is None) or (
                file_io_version is not None
                and file_io_version.split('.')[0] == io_version.split('.')[0]
                and int(file_io_version.split('.')[-1]) >= int(io_version.split('.')[-1])
            ), 'IO version mismatch! file: {}, requested {}'.format(file_io_version, io_version)
            f['meta'].attrs['modified'] = now
        # update data
        if msgs is not None:
            # validate the provided headers (msg_headers may be None)
            headers = dict()
            for key in (msg_headers if msg_headers is not None else dict()):
                if key not in dataset_dtypes[version]['msg_headers'].names:
                    raise RuntimeError('Encountered unknown message header key {}'.format(key))
                headers[key] = msg_headers[key]
            # fill in a default of 0 for any header field not provided
            for key in dataset_dtypes[version]['msg_headers'].names:
                if key not in headers:
                    headers[key] = np.zeros(len(msgs))
                assert len(headers[key]) == len(msgs), 'Data length mismatch! msgs is length {}, but msg_headers field {} is length {}'.format(len(msgs),key,len(headers[key]))

            # resize datasets
            curr_idx = len(f['msgs'])
            f['msgs'].resize((curr_idx+len(msgs),))
            f['msg_headers'].resize((curr_idx+len(msgs),))

            # store in file (use the validated headers, including defaults)
            msgs_array = _store_msgs(
                msgs,
                version=version
                )
            msg_headers_array = _store_msg_headers(
                headers,
                version=version
                )
            f['msgs'][curr_idx:curr_idx + len(msgs_array)] = msgs_array
            f['msg_headers'][curr_idx:curr_idx + len(msg_headers_array)] = msg_headers_array

        # flush
        f['msgs'].flush()
        f['msg_headers'].flush()
def _synchronize(attempts, *dsets):
    if len(dsets) <= 1:
        return
    success = attempts == 0
    attempt = 1
    lengths = [0]*len(dsets)
    while (attempt <= attempts or attempts < 0) and not success:
        attempt += 1
        # refresh each dataset and check that all lengths agree
        for i,dset in enumerate(dsets):
            dset.id.refresh()
            lengths[i] = len(dset)
        if all([lengths[0] == length for length in lengths[1:]]):
            success = True
    if not success:
        raise RuntimeError('Could not achieve a stable file state after {} attempts!'.format(attempts))
def len_rawfile(filename, attempts=1):
    '''
    Check the total number of messages in a file

    :param filename: filename to check
    :param attempts: a parameter only relevant if file is being actively written to by another process, specifies number of refreshes to try if a synchronized state between the datasets is not achieved. A value less than ``0`` busy blocks until a synchronized state is achieved. A value greater than ``0`` tries to achieve synchronization a max of ``attempts`` before throwing a ``RuntimeError``. And a value of ``0`` does not attempt to synchronize (not recommended).
    :returns: ``int`` number of messages in file

    '''
    err = None
    for _ in range(_file_read_reattempts):
        try:
            with h5py.File(filename, 'r', swmr=True, libver='latest') as f:
                _synchronize(attempts, f['msgs'], f['msg_headers'])
                return len(f['msgs'])
        except OSError as e:
            if e.errno is None:
                warnings.warn(str(e) + '\ntrying again...', RuntimeWarning)
                err = e
            else:
                raise e
    raise err
def from_rawfile(filename, start=None, end=None, version=None, io_version=None, msg_headers_only=False, mask=None, attempts=1):
    '''
    Read a chunk of bytestring messages from an existing file

    :param filename: filename to read bytestrings from
    :param start: index for the start position when reading from the file (default = ``None``). If a value less than 0 is specified, index is relative to the end of the file. If ``None`` is specified, data is read from the start of the file. If a ``mask`` is specified, does nothing.
    :param end: index for the end position when reading from the file (default = ``None``). If a value less than 0 is specified, index is relative to the end of the file. If ``None`` is specified, data is read until the end of the file. If a ``mask`` is specified, does nothing.
    :param version: required version compatibility. If ``None`` specified, uses the version stored in the file metadata
    :param io_version: required io version compatibility. If ``None`` specified, does not check the ``io_version`` file metadata
    :param msg_headers_only: optional flag to only load header information and not message bytestrings (``'msgs'`` value in return dict will be ``None`` if ``msg_headers_only=True``)
    :param mask: boolean mask alternative to ``start`` and ``end`` chunk specification to indicate specific file rows to load. Boolean 1D array with length equal to ``len_rawfile(filename)``
    :param attempts: a parameter only relevant if file is being actively written to by another process, specifies number of refreshes to try if a synchronized state between the datasets is not achieved. A value less than ``0`` busy blocks until a synchronized state is achieved. A value greater than ``0`` tries to achieve synchronization a max of ``attempts`` before throwing a ``RuntimeError``. And a value of ``0`` does not attempt to synchronize (not recommended).
    :returns: ``dict`` with keys for ``'created'``, ``'modified'``, ``'version'``, and ``'io_version'`` metadata, along with ``'msgs'`` (a ``list`` of bytestring messages) and ``'msg_headers'`` (a dict with message header field name: ``list`` of message header field data, 1 per message)

    '''
    err = None
    for _ in range(_file_read_reattempts):
        try:
            with h5py.File(filename, 'r', swmr=True, libver='latest') as f:
                # fetch metadata
                created = f['meta'].attrs['created']
                modified = f['meta'].attrs['modified']

                # check file format version is compatible
                file_version = f['meta'].attrs['version']
                version_major, version_minor = file_version.split('.')
                if version is not None:
                    req_major, req_minor = version.split('.')
                    # compare minor versions numerically rather than as strings
                    assert version_major == req_major and int(version_minor) >= int(req_minor), \
                        'Incompatible version mismatch! file: {}, requested: {}'.format(file_version, version)
                    # parse assuming the requested (older or equal) minor version
                    version_minor = req_minor
                parse_version = '{}.{}'.format(version_major, version_minor)

                # check io format version is compatible
                file_io_version = f['meta'].attrs['io_version'] if 'io_version' in f['meta'].attrs.keys() else None
                if io_version is not None and file_io_version is not None:
                    io_version_major, io_version_minor = file_io_version.split('.')
                    assert io_version_major == io_version.split('.')[0] and int(io_version_minor) >= int(io_version.split('.')[-1]), \
                        'IO version mismatch! file: {}, requested {}'.format(file_io_version, io_version)

                # check to make sure that the msgs and headers dsets are synchronized
                _synchronize(attempts, f['msgs'], f['msg_headers'])

                # define chunk of data to load; local names keep the arguments
                # unchanged if the read has to be retried
                if mask is not None:
                    selection = mask
                else:
                    selection = slice(
                        int(start) if start is not None else 0,
                        int(end) if end is not None else len(f['msgs'])
                        )

                # get data from file
                msg_headers = _parse_msg_headers(f['msg_headers'][selection], parse_version)
                msgs = _parse_msgs(f['msgs'][selection], parse_version) if not msg_headers_only else None

                return dict(
                    created=created,
                    modified=modified,
                    version=parse_version,
                    io_version=file_io_version,
                    msgs=msgs,
                    msg_headers=msg_headers
                    )
        except OSError as e:
            if e.errno is None:
                warnings.warn(str(e) + '\ntrying again...', RuntimeWarning)
                err = e
            else:
                raise e
    raise err