import time
import numpy as np
import h5py
from larpix.larpix import Packet
[docs]class HDF5Logger(object):
'''
The HDF5Logger is logger class for logging packets to an hdf5 file format.
:param filename: filename to store data (optional, default: ``None``)
:param buffer_length: how many data messages to hang on to before flushing
buffer to the file (optional, default: ``10000``)
'''
VERSION = '0.0'
header_keys = ['version','created']
data_desc_map = {
Packet: 'raw_packet'
}
data_desc = {
'raw_packet' : [
('record_timestamp','f8'),
('chip_key','S32'),
('type','i8'),
('chipid','i8'),
('parity','i1'),
('valid_parity','i1'),
('counter','i8'),
('channel','i8'),
('timestamp','i8'),
('adc_counts','i8'),
('fifo_half','i1'),
('fifo_full','i1'),
('register','i8'),
('value','i8')
]
}
def __init__(self, filename=None, buffer_length=10000):
self.filename = filename
self.datafile = None
self.buffer_length = buffer_length
self._buffer = dict([(dataset, []) for dataset in self.data_desc.keys()])
self._write_idx = dict([(dataset, 0) for dataset in self.data_desc.keys()])
self._is_enabled = False
self._is_open = False
def _default_filename(self, timestamp=None):
'''
Fetch the default filename based on a timestamp
:param timestamp: tuple or ``struct_time`` representing a timestamp
'''
log_prefix = 'datalog'
time_format = '%Y_%m_%d_%H_%M_%S_%Z'
if not timestamp:
log_specifier = time.strftime(time_format)
else:
log_specifier = time.strftime(time_format, timestamp)
log_postfix = '.h5'
return (log_prefix + '_' + log_specifier + '_' + log_postfix)
def _create_header(self):
'''
Create datafile header
'''
if not self.is_open():
return
if '_header' in self.datafile.keys():
return
header = self.datafile.create_group('_header')
for header_key in self.header_keys:
if header_key == 'version':
header.attrs[header_key] = self.VERSION
elif header_key == 'created':
header.attrs[header_key] = time.time()
def _create_datasets(self):
'''
Create any missing datasets in file according to ``data_desc``
'''
if not self.is_open():
return
for dataset_name in self.data_desc.keys():
if not dataset_name in self.datafile.keys():
self.datafile.create_dataset(dataset_name, (0,), maxshape=(None,),
dtype=self.data_desc[dataset_name])
[docs] @classmethod
def encode(cls, data, *args, **kwargs):
'''
Converts data object into a numpy mixed type array as described by
``data_desc``
'''
if not isinstance(data, (Packet)):
raise ValueError('h5_logger can only encode Packet objects')
if isinstance(data, Packet):
return cls.encode_packet(data, *args, **kwargs)
[docs] @classmethod
def encode_packet(cls, packet, timestamp=None, *args, **kwargs):
'''
Converts packets into numpy mixed typ array according to ``data_desc['raw_packet']``
'''
if not isinstance(packet, Packet):
raise ValueError('packet must be of type Packet')
dict_rep = packet.export()
data_list = []
for key, dtype in cls.data_desc['raw_packet']:
if key == 'record_timestamp':
if timestamp:
data_list += [timestamp]
else:
data_list += [-1]
elif key == 'chip_key':
data_list += [str(dict_rep['chip_key'])]
elif key in dict_rep:
if key == 'type':
data_list += [int(packet.packet_type.to01(), 2)]
else:
data_list += [dict_rep[key]]
else:
data_list += [-1]
return np.array(tuple(data_list),dtype=cls.data_desc['raw_packet'])
[docs] def record(self, data, timestamp=None, *args, **kwargs):
'''
Send the specified data to log file
.. note:: buffer is flushed after all ``data`` is placed in buffer, this
means that the buffer size will exceed the set value temporarily
:param data: list of data to be written to log
:param timestamp: unix timestamp to be associated with data
'''
if not self.is_enabled():
return
if not self.is_open():
self.open()
if not isinstance(data, list):
raise ValueError('data must be a list')
if not timestamp:
timestamp = time.time()
for data_obj in data:
dataset = self.data_desc_map[type(data_obj)]
self._buffer[dataset] += [self.encode(data_obj, timestamp=timestamp)]
if any([len(buff) > self.buffer_length for dataset, buff in self._buffer.items()]):
self.flush()
[docs] def is_enabled(self):
'''
Check if logger is enabled
'''
return self._is_enabled
[docs] def enable(self):
'''
Allow the logger to record data
'''
if self.is_enabled():
return
self._is_enabled = True
[docs] def disable(self):
'''
Stop the logger from recording data without closing file
'''
if not self.is_enabled():
return
self.flush()
self._is_enabled = False
[docs] def is_open(self):
'''
Check if logger is open
'''
return self._is_open
[docs] def open(self, enable=True):
'''
Open output file if it is not already. If files already exist then data
will be appended to the end of arrays.
:param enable: ``True`` if you want to enable the logger after opening
(Optional, default=``True``)
'''
if self.is_open():
return
if not self.filename:
self.filename = self._default_filename()
self.datafile = h5py.File(self.filename)
self._is_open = True
self._is_enabled = enable
self._create_header()
self._create_datasets()
for dataset in self.data_desc.keys():
self._write_idx[dataset] = self.datafile[dataset].shape[0]
[docs] def close(self):
'''
Close logger if it is not already
.. note:: This flushes any data in the buffer before closing
'''
if not self.is_open():
return
self.flush()
self.datafile.close()
self._is_open = False
self._is_enabled = False
[docs] def flush(self):
'''
Flushes any held data to the output file
'''
if not self.is_open():
return
for dataset in self.data_desc.keys():
if self._buffer[dataset]:
to_store = np.array(self._buffer[dataset])
new_entries = to_store.shape[0]
curr_idx = self._write_idx[dataset]
next_idx = curr_idx + new_entries
if next_idx >= self.datafile[dataset].shape[0]:
self.datafile[dataset].resize(next_idx, axis=0)
self.datafile[dataset][curr_idx:next_idx] = to_store
self._buffer[dataset] = []
self._write_idx[dataset] = next_idx