'''
A module to control the LArPix chip.
'''
from __future__ import absolute_import
import time
from bitarray import bitarray
import larpix.bitarrayhelper as bah
import json
import os
import errno
import re
import platform
import math
import larpix.configs as configs
[docs]class Chip(object):
'''
Represents one LArPix chip and helps with configuration and packet
generation.
'''
num_channels = 32
def __init__(self, chip_id, io_chain):
self.chip_id = chip_id
self.io_chain = io_chain
self.data_to_send = []
self.config = Configuration()
self.reads = []
self.new_reads_index = 0
def __str__(self):
return 'Chip (id: %d, chain: %d)' % (self.chip_id, self.io_chain)
def __repr__(self):
return 'Chip(%d, %d)' % (self.chip_id, self.io_chain)
[docs] def get_configuration_packets(self, packet_type, registers=None):
if registers is None:
registers = range(Configuration.num_registers)
conf = self.config
packets = []
packet_register_data = conf.all_data()
for i, data in enumerate(packet_register_data):
if i not in registers:
continue
packet = Packet()
packet.packet_type = packet_type
packet.chipid = self.chip_id
packet.register_address = i
if packet_type == Packet.CONFIG_WRITE_PACKET:
packet.register_data = data
else:
packet.register_data = 0
packet.assign_parity()
packets.append(packet)
return packets
[docs] def sync_configuration(self, index=-1):
'''
Adjust self.config to match whatever config read packets are in
self.reads[index].
Defaults to the most recently read PacketCollection. Later
packets in the list will overwrite earlier packets. The
``index`` parameter could be a slice.
'''
updates = {}
if isinstance(index, slice):
for collection in self.reads[index]:
for packet in collection:
if packet.packet_type == Packet.CONFIG_READ_PACKET:
updates[packet.register_address] = packet.register_data
else:
for packet in self.reads[index]:
if packet.packet_type == Packet.CONFIG_READ_PACKET:
updates[packet.register_address] = packet.register_data
self.config.from_dict_registers(updates)
[docs] def export_reads(self, only_new_reads=True):
'''
Return a dict of the packets this Chip has received.
If ``only_new_reads`` is ``True`` (default), then only the
packets since the last time this method was called will be in
the dict. Otherwise, all of the packets stored in ``self.reads``
will be in the dict.
'''
data = {}
data['chipid'] = self.chip_id
data['io_chain'] = self.io_chain
if only_new_reads:
packets = self.reads[self.new_reads_index:]
else:
packets = self.reads
data['packets'] = list(map(lambda x:x.export(), packets))
self.new_reads_index = len(self.reads)
return data
[docs]class Smart_List(list): #vb
def __init__(self, values, low, high):
if not (type(values) == list or type(values) == Smart_List):
raise ValueError("Smart_List is not list")
if any([value > high or value < low for value in values]):
raise ValueError("value out of bounds")
list.__init__(self, values)
self.low = low
self.high = high
def __setitem__(self, key, value):
if isinstance(key, int):
if value > self.high or value < self.low:
raise ValueError("value out of bounds")
list.__setitem__(self, key, value)
else:
for num in value:
if num > self.high or num < self.low:
raise ValueError("value out of bounds")
list.__setitem__(self, key, value)
[docs]class Configuration(object):
'''
Represents the desired configuration state of a LArPix chip.
'''
fpga_packet_size = 10
num_registers = 63
pixel_trim_threshold_addresses = list(range(0, 32))
global_threshold_address = 32
csa_gain_and_bypasses_address = 33
csa_bypass_select_addresses = list(range(34, 38))
csa_monitor_select_addresses = list(range(38, 42))
csa_testpulse_enable_addresses = list(range(42, 46))
csa_testpulse_dac_amplitude_address = 46
test_mode_xtrig_reset_diag_address = 47
sample_cycles_address = 48
test_burst_length_addresses = [49, 50]
adc_burst_length_address = 51
channel_mask_addresses = list(range(52, 56))
external_trigger_mask_addresses = list(range(56, 60))
reset_cycles_addresses = [60, 61, 62]
register_names = ['pixel_trim_thresholds',
'global_threshold',
'csa_gain',
'csa_bypass',
'internal_bypass',
'csa_bypass_select',
'csa_monitor_select',
'csa_testpulse_enable',
'csa_testpulse_dac_amplitude',
'test_mode',
'cross_trigger_mode',
'periodic_reset',
'fifo_diagnostic',
'sample_cycles',
'test_burst_length',
'adc_burst_length',
'channel_mask',
'external_trigger_mask',
'reset_cycles']
TEST_OFF = 0x0
TEST_UART = 0x1
TEST_FIFO = 0x2
def __init__(self):
# Actual setup
self.load('default.json')
# Annoying things we have to do because the configuration
# register follows complex semantics:
# The following dicts/lists specify how to translate a register
# address into a sensible update to the Configuration object.
# Simple registers are just the value stored in the register.
self._simple_registers = {
32: 'global_threshold',
46: 'csa_testpulse_dac_amplitude',
48: 'sample_cycles',
51: 'adc_burst_length',
}
# These registers need the attribute extracted from the register
# data.
self._complex_modify_data = {
33: [('csa_gain', lambda data:data % 2),
('csa_bypass', lambda data:(data//2) % 2),
('internal_bypass', lambda data:(data//8) % 2)],
47: [('test_mode', lambda data:data % 4),
('cross_trigger_mode', lambda data:(data//4) % 2),
('periodic_reset', lambda data:(data//8) % 2),
('fifo_diagnostic', lambda data:(data//16) % 2)]
}
# These registers combine the register data with the existing
# attribute value to get the new attribute value.
self._complex_modify_attr = {
49: ('test_burst_length', lambda val,data:(val//256)*256+data),
50: ('test_burst_length', lambda val,data:(val%256)+data*256),
60: ('reset_cycles', lambda val,data:(val//256)*256+data),
61: ('reset_cycles',
lambda val,data:(val//0x10000)*0x10000+data*256+val%256),
62: ('reset_cycles', lambda val,data:(val%0x10000)+data*0x10000)
}
# These registers store 32 bits over 4 registers each, and those
# 32 bits correspond to entries in a 32-entry list.
self._complex_array_spec = [
(range(34, 38), 'csa_bypass_select'),
(range(38, 42), 'csa_monitor_select'),
(range(42, 46), 'csa_testpulse_enable'),
(range(52, 56), 'channel_mask'),
(range(56, 60), 'external_trigger_mask')]
self._complex_array = {}
for addresses, label in self._complex_array_spec:
for i, address in enumerate(addresses):
self._complex_array[address] = (label, i)
# These registers each correspond to an entry in an array
self._trim_registers = list(range(32))
def __setattr__(self, name, value):
'''
Default setattr behavior occurs if name is in ``register_names``, is "private"
or is a known attribute
Otherwise raises an attribute error
'''
if not (name in self.register_names or name[0] == '_' or hasattr(self, name)):
raise AttributeError('%s is not a known register' % name)
return super(Configuration, self).__setattr__(name, value)
def __eq__(self, other):
'''
Returns true if all fields match
'''
return all([getattr(self, register_name) == getattr(other, register_name)
for register_name in self.register_names])
def __str__(self):
'''
Converts configuration to a nicely formatted json string
'''
d = self.to_dict()
l = ['\"{}\": {}'.format(key,value) for key,value in d.items()]
return '{\n ' + ',\n '.join(l) + '\n}'
[docs] def compare(self, config):
'''
Returns a dict containing pairs of each differently valued register
Pair order is (self, other)
'''
d = {}
for register_name in self.register_names:
if getattr(self, register_name) != getattr(config, register_name):
d[register_name] = (getattr(self, register_name), getattr(config,
register_name))
# Attempt to simplify some of the long values (array values)
for (name, (self_value, config_value)) in d.items():
if (name in (label for _, label in self._complex_array_spec)
or name == 'pixel_trim_thresholds'):
different_values = []
for ch, (val, config_val) in enumerate(zip(self_value, config_value)):
if val != config_val:
different_values.append(({'channel': ch, 'value': val},
{'channel': ch, 'value': config_val}))
if len(different_values) < 5:
d[name] = different_values
else:
pass
return d
[docs] def get_nondefault_registers(self):
return self.compare(Configuration())
@property
def pixel_trim_thresholds(self):
return self._pixel_trim_thresholds
@pixel_trim_thresholds.setter
def pixel_trim_thresholds(self, values):
low = 0
high = 31
if not (type(values) == list or type(values) == Smart_List):
raise ValueError("pixel_trim_threshold is not list")
if not len(values) == Chip.num_channels:
raise ValueError("pixel_trim_threshold length is not %d" % Chip.num_channels)
if not all(type(value) == int for value in values):
raise ValueError("pixel_trim_threshold is not int")
if any(value > high or value < low for value in values):
raise ValueError("pixel_trim_threshold out of bounds")
self._pixel_trim_thresholds = Smart_List(values, low, high)
@property
def global_threshold(self):
return self._global_threshold
@global_threshold.setter
def global_threshold(self, value):
if not type(value) == int:
raise ValueError("global_threshold is not int")
if value > 255 or value < 0:
raise ValueError("global_threshold out of bounds")
self._global_threshold = value
@property
def csa_gain(self):
return self._csa_gain
@csa_gain.setter
def csa_gain(self, value):
if not type(value) == int:
raise ValueError("csa_gain is not int")
if value > 1 or value < 0:
raise ValueError("csa_gain out of bounds")
self._csa_gain = value
@property
def csa_bypass(self):
return self._csa_bypass
@csa_bypass.setter
def csa_bypass(self, value):
if not type(value) == int:
raise ValueError("csa_bypass is not int")
if value > 1 or value < 0:
raise ValueError("csa_bypass out of bounds")
self._csa_bypass = value
@property
def internal_bypass(self):
return self._internal_bypass
@internal_bypass.setter
def internal_bypass(self, value):
if not type(value) == int:
raise ValueError("internal_bypass is not int")
if value > 1 or value < 0:
raise ValueError("internal_bypass out of bounds")
self._internal_bypass = value
@property
def csa_bypass_select(self):
return self._csa_bypass_select
@csa_bypass_select.setter
def csa_bypass_select(self, values):
low = 0
high = 1
if not (type(values) == list or type(values) == Smart_List):
raise ValueError("csa_bypass_select is not list")
if not len(values) == Chip.num_channels:
raise ValueError("csa_bypass_select length is not %d" % Chip.num_channels)
if not all(type(value) == int for value in values):
raise ValueError("csa_bypass_select is not int")
if any(value > high or value < low for value in values):
raise ValueError("csa_bypass_select out of bounds")
self._csa_bypass_select = Smart_List(values, low, high)
@property
def csa_monitor_select(self):
return self._csa_monitor_select
@csa_monitor_select.setter
def csa_monitor_select(self, values):
low = 0
high = 1
if not (type(values) == list or type(values) == Smart_List):
raise ValueError("csa_monitor_select is not list")
if not len(values) == Chip.num_channels:
raise ValueError("csa_monitor_select length is not %d" % Chip.num_channels)
if not all(type(value) == int for value in values):
raise ValueError("csa_monitor_select is not int")
if any(value > high or value < low for value in values):
raise ValueError("csa_monitor_select out of bounds")
self._csa_monitor_select = Smart_List(values, low, high)
@property
def csa_testpulse_enable(self):
return self._csa_testpulse_enable
@csa_testpulse_enable.setter
def csa_testpulse_enable(self, values):
if not type(values) == list:
raise ValueError("csa_testpulse_enable is not list")
if not len(values) == Chip.num_channels:
raise ValueError("csa_testpulse_enable length is not %d" % Chip.num_channels)
if not all(type(value) == int for value in values):
raise ValueError("csa_testpulse_enable is not int")
if any(value > 1 or value < 0 for value in values):
raise ValueError("csa_testpulse_enable out of bounds")
self._csa_testpulse_enable = values
@property
def csa_testpulse_dac_amplitude(self):
return self._csa_testpulse_dac_amplitude
@csa_testpulse_dac_amplitude.setter
def csa_testpulse_dac_amplitude(self, value):
if not type(value) == int:
raise ValueError("csa_testpulse_dac_amplitude is not int")
if value > 255 or value < 0:
raise ValueError("csa_testpulse_dac_amplitude out of bounds")
self._csa_testpulse_dac_amplitude = value
@property
def test_mode(self):
return self._test_mode
@test_mode.setter
def test_mode(self, value):
if not type(value) == int:
raise ValueError("test_mode is not int")
valid_values = [Configuration.TEST_OFF, Configuration.TEST_UART,
Configuration.TEST_FIFO]
if not value in valid_values:
raise ValueError("test_mode is not valid")
self._test_mode = value
@property
def cross_trigger_mode(self):
return self._cross_trigger_mode
@cross_trigger_mode.setter
def cross_trigger_mode(self, value):
if not type(value) == int:
raise ValueError("cross_trigger_mode is not int")
if value > 1 or value < 0:
raise ValueError("cross_trigger_mode out of bounds")
self._cross_trigger_mode = value
@property
def periodic_reset(self):
return self._periodic_reset
@periodic_reset.setter
def periodic_reset(self, value):
if not type(value) == int:
raise ValueError("periodic_reset is not int")
if value > 1 or value < 0:
raise ValueError("periodic_reset out of bounds")
self._periodic_reset = value
@property
def fifo_diagnostic(self):
return self._fifo_diagnostic
@fifo_diagnostic.setter
def fifo_diagnostic(self, value):
if not type(value) == int:
raise ValueError("fifo_diagnostic is not int")
if value > 1 or value < 0:
raise ValueError("fifo_diagnostic out of bounds")
self._fifo_diagnostic = value
@property
def sample_cycles(self):
return self._sample_cycles
@sample_cycles.setter
def sample_cycles(self, value):
if not type(value) == int:
raise ValueError("sample_cycles is not int")
if value > 255 or value < 0:
raise ValueError("sample_cycles out of bounds")
self._sample_cycles = value
@property
def test_burst_length(self):
return self._test_burst_length
@test_burst_length.setter
def test_burst_length(self, value):
if not type(value) == int:
raise ValueError("test_burst_length is not int")
if value > 65535 or value < 0:
raise ValueError("test_burst_length out of bounds")
self._test_burst_length = value
@property
def adc_burst_length(self):
return self._adc_burst_length
@adc_burst_length.setter
def adc_burst_length(self, value):
if not type(value) == int:
raise ValueError("adc_burst_length is not int")
if value > 255 or value < 0:
raise ValueError("adc_burst_length out of bounds")
self._adc_burst_length = value
@property
def channel_mask(self):
return self._channel_mask
@channel_mask.setter
def channel_mask(self, values):
if not type(values) == list:
raise ValueError("channel_mask is not list")
if not len(values) == Chip.num_channels:
raise ValueError("channel_mask length is not %d" % Chip.num_channels)
if not all(type(value) == int for value in values):
raise ValueError("channel_mask is not int")
if any(value > 1 or value < 0 for value in values):
raise ValueError("channel_mask out of bounds")
self._channel_mask = values
@property
def external_trigger_mask(self):
return self._external_trigger_mask
@external_trigger_mask.setter
def external_trigger_mask(self, values):
if not type(values) == list:
raise ValueError("external_trigger_mask is not list")
if not len(values) == Chip.num_channels:
raise ValueError("external_trigger_mask length is not %d" % Chip.num_channels)
if not all(type(value) == int for value in values):
raise ValueError("external_trigger_mask is not int")
if any(value > 1 or value < 0 for value in values):
raise ValueError("external_trigger_mask out of bounds")
self._external_trigger_mask = values
@property
def reset_cycles(self):
return self._reset_cycles
@reset_cycles.setter
def reset_cycles(self, value):
if not type(value) == int:
raise ValueError("reset_cycles is not int")
if value > 16777215 or value < 0:
raise ValueError("reset_cycles out of bounds")
self._reset_cycles = value
[docs] def enable_channels(self, list_of_channels=None):
if list_of_channels is None:
list_of_channels = range(Chip.num_channels)
for channel in list_of_channels:
self.channel_mask[channel] = 0
[docs] def disable_channels(self, list_of_channels=None):
if list_of_channels is None:
list_of_channels = range(Chip.num_channels)
for channel in list_of_channels:
self.channel_mask[channel] = 1
[docs] def enable_normal_operation(self):
#TODO Ask Dan what this means
# Load configuration for a normal physics run
pass
[docs] def enable_external_trigger(self, list_of_channels=None):
if list_of_channels is None:
list_of_channels = range(Chip.num_channels)
for channel in list_of_channels:
self.external_trigger_mask[channel] = 0
[docs] def disable_external_trigger(self, list_of_channels=None):
if list_of_channels is None:
list_of_channels = range(Chip.num_channels)
for channel in list_of_channels:
self.external_trigger_mask[channel] = 1
[docs] def enable_testpulse(self, list_of_channels=None):
if list_of_channels is None:
list_of_channels = range(Chip.num_channels)
for channel in list_of_channels:
self.csa_testpulse_enable[channel] = 0
[docs] def disable_testpulse(self, list_of_channels=None):
if list_of_channels is None:
list_of_channels = range(Chip.num_channels)
for channel in list_of_channels:
self.csa_testpulse_enable[channel] = 1
[docs] def enable_analog_monitor(self, channel):
self.csa_monitor_select[channel] = 1
[docs] def disable_analog_monitor(self):
self.csa_monitor_select = [0] * Chip.num_channels
[docs] def all_data(self):
bits = []
num_channels = Chip.num_channels
for channel in range(num_channels):
bits.append(self.trim_threshold_data(channel))
bits.append(self.global_threshold_data())
bits.append(self.csa_gain_and_bypasses_data())
for chunk in range(4):
bits.append(self.csa_bypass_select_data(chunk))
for chunk in range(4):
bits.append(self.csa_monitor_select_data(chunk))
for chunk in range(4):
bits.append(self.csa_testpulse_enable_data(chunk))
bits.append(self.csa_testpulse_dac_amplitude_data())
bits.append(self.test_mode_xtrig_reset_diag_data())
bits.append(self.sample_cycles_data())
bits.append(self.test_burst_length_data(0))
bits.append(self.test_burst_length_data(1))
bits.append(self.adc_burst_length_data())
for chunk in range(4):
bits.append(self.channel_mask_data(chunk))
for chunk in range(4):
bits.append(self.external_trigger_mask_data(chunk))
bits.append(self.reset_cycles_data(0))
bits.append(self.reset_cycles_data(1))
bits.append(self.reset_cycles_data(2))
return bits
[docs] def trim_threshold_data(self, channel):
return bah.fromuint(self.pixel_trim_thresholds[channel], 8)
[docs] def global_threshold_data(self):
return bah.fromuint(self.global_threshold, 8)
[docs] def csa_gain_and_bypasses_data(self):
return bitarray('0000') + [self.internal_bypass, 0,
self.csa_bypass, self.csa_gain]
[docs] def csa_bypass_select_data(self, chunk):
if chunk == 0:
return bitarray(self.csa_bypass_select[7::-1])
else:
high_bit = (chunk + 1) * 8 - 1
low_bit = chunk * 8 - 1
return bitarray(self.csa_bypass_select[high_bit:low_bit:-1])
#TODO
[docs] def csa_monitor_select_data(self, chunk):
if chunk == 0:
return bitarray(self.csa_monitor_select[7::-1])
else:
high_bit = (chunk + 1) * 8 - 1
low_bit = chunk * 8 - 1
return bitarray(self.csa_monitor_select[high_bit:low_bit:-1])
[docs] def csa_testpulse_enable_data(self, chunk):
if chunk == 0:
return bitarray(self.csa_testpulse_enable[7::-1])
else:
high_bit = (chunk + 1) * 8 - 1
low_bit = chunk * 8 - 1
return bitarray(self.csa_testpulse_enable[high_bit:low_bit:-1])
[docs] def csa_testpulse_dac_amplitude_data(self):
return bah.fromuint(self.csa_testpulse_dac_amplitude, 8)
[docs] def test_mode_xtrig_reset_diag_data(self):
toReturn = bitarray([0, 0, 0, self.fifo_diagnostic,
self.periodic_reset,
self.cross_trigger_mode])
toReturn.extend(bah.fromuint(self.test_mode, 2))
return toReturn
[docs] def sample_cycles_data(self):
return bah.fromuint(self.sample_cycles, 8)
[docs] def test_burst_length_data(self, chunk):
bits = bah.fromuint(self.test_burst_length, 16)
if chunk == 0:
return bits[8:]
elif chunk == 1:
return bits[:8]
[docs] def adc_burst_length_data(self):
return bah.fromuint(self.adc_burst_length, 8)
[docs] def channel_mask_data(self, chunk):
if chunk == 0:
return bitarray(self.channel_mask[7::-1])
else:
high_bit = (chunk + 1) * 8 - 1
low_bit = chunk * 8 - 1
return bitarray(self.channel_mask[high_bit:low_bit:-1])
[docs] def external_trigger_mask_data(self, chunk):
if chunk == 0:
return bitarray(self.external_trigger_mask[7::-1])
else:
high_bit = (chunk + 1) * 8 - 1
low_bit = chunk * 8 - 1
return bitarray(self.external_trigger_mask[high_bit:low_bit:-1])
[docs] def reset_cycles_data(self, chunk):
bits = bah.fromuint(self.reset_cycles, 24)
if chunk == 0:
return bits[16:]
elif chunk == 1:
return bits[8:16]
elif chunk == 2:
return bits[:8]
[docs] def to_dict(self):
d = {}
for register_name in self.register_names:
d[register_name] = getattr(self, register_name)
return d
[docs] def from_dict(self, d):
for register_name in self.register_names:
if register_name in d:
setattr(self, register_name, d[register_name])
[docs] def from_dict_registers(self, d):
'''
Load in the configuration specified by a dict of (register,
value) pairs.
'''
def bits_to_array(data):
bits = bah.fromuint(data, 8)
return [int(bit) for bit in bits][::-1]
for address, value in d.items():
if address in self._simple_registers:
setattr(self, self._simple_registers[address], value)
elif address in self._complex_modify_data:
attributes = self._complex_modify_data[address]
for name, extract in attributes:
setattr(self, name, extract(value))
elif address in self._complex_modify_attr:
name, combine = self._complex_modify_attr[address]
current_value = getattr(self, name)
setattr(self, name, combine(current_value, value))
elif address in self._complex_array:
name, index = self._complex_array[address]
affected = slice(index*8, (index+1)*8)
attr_list = getattr(self, name)
attr_list[affected] = bits_to_array(value)
elif address in self._trim_registers:
self.pixel_trim_thresholds[address] = value
return #phew
[docs] def write(self, filename, force=False, append=False):
if os.path.isfile(filename):
if not force:
raise IOError(errno.EEXIST,
'File %s exists. Use force=True to overwrite'
% filename)
with open(filename, 'w+') as outfile:
outfile.write(str(self))
return 0
[docs] def load(self, filename):
data = configs.load(filename)
self.from_dict(data)
[docs]class Controller(object):
'''
Controls a collection of LArPix Chip objects.
Properties and attributes:
- ``chips``: the ``Chip`` objects that the controller controls
- ``all_chip``: all possible ``Chip`` objects (considering there are
a finite number of chip IDs), initialized on object construction
- ``port``: the path to the serial port, i.e. "/dev/(whatever)"
(default: ``None`` [will attempt to auto-find correct port])
- ``timeout``: the timeout used for serial commands, in seconds.
This can be changed between calls to the read and write commands.
(default: ``1``)
- ``reads``: list of all the PacketCollections that have been sent
back to this controller. PacketCollections are created by
``run``, ``write_configuration``, and ``read_configuration``, but
not by any of the ``serial_*`` methods.
- ``use_all_chips``: if ``True``, look up chip objects in
``self.all_chips``, else look up in ``self.chips`` (default:
``False``)
'''
start_byte = b'\x73'
stop_byte = b'\x71'
def __init__(self, port=None, timeout=1):
self.chips = []
self.all_chips = self._init_chips()
self.use_all_chips = False
self.reads = []
self.nreads = 0
self.port = port
if self.port is None:
self.port = SerialPort.guess_port()
self.baudrate = 1000000
self.timeout = timeout
self.max_write = 250
self._test_mode = False
self._serial = SerialPort(port=self.port,
baudrate=self.baudrate,
timeout=self.timeout)
def _init_chips(self, nchips = 256, iochain = 0):
'''
Return all possible chips.
'''
return [Chip(i, iochain) for i in range(256)]
[docs] def get_chip(self, chip_id, io_chain):
if self.use_all_chips:
chip_list = self.all_chips
else:
chip_list = self.chips
for chip in chip_list:
if chip.chip_id == chip_id and chip.io_chain == io_chain:
return chip
raise ValueError('Could not find chip (%d, %d) (using all_chips'
'? %s)' % (chip_id, io_chain, self.use_all_chips))
#def serial_flush(self):
# with self._serial(self.port, baudrate=self.baudrate,
# timeout=self.timeout) as serial:
# serial.reset_output_buffer()
# serial.reset_input_buffer()
[docs] def serial_close(self):
if not self._serial is None:
self._serial.close()
return
[docs] def serial_read(self, timelimit):
data_in = b''
start = time.time()
close_port = False
if not self._serial._keep_open:
close_port = True
self._serial._keep_open = True
try:
while time.time() - start < timelimit:
stream = self._serial.read(self.max_write)
if len(stream) > 0:
data_in += stream
except Exception as e:
if getattr(self, '_read_tries_left', None) is None:
self._read_tries_left = 3
self.serial_read(timelimit)
elif self._read_tries_left > 0:
self._read_tries_left -= 1
self.serial_read(timelimit)
else:
del self._read_tries_left
raise
if close_port:
self.serial_close()
self._serial._keep_open = False
return data_in
[docs] def serial_write(self, bytestreams):
for bytestream in bytestreams:
self._serial.write(bytestream)
[docs] def serial_write_read(self, bytestreams, timelimit):
data_in = b''
close_port = False
if not self._serial._keep_open:
close_port = True
self._serial._keep_open = True
start = time.time()
# First do a fast write-read loop until everything is
# written out, then just read
self._serial.timeout = 0 # Return whatever's already waiting
for bytestream in bytestreams:
self._serial.write(bytestream)
stream = self._serial.read(self.max_write)
if len(stream) > 0:
data_in += stream
self._serial.timeout = self.timeout
while time.time() - start < timelimit:
stream = self._serial.read(self.max_write)
if len(stream) > 0:
data_in += stream
if close_port:
self.serial_close()
self._serial._keep_open = False
return data_in
[docs] def write_configuration(self, chip, registers=None, write_read=0,
message=None):
if registers is None:
registers = list(range(Configuration.num_registers))
elif isinstance(registers, int):
registers = [registers]
else:
pass
if message is None:
message = 'configuration write'
else:
message = 'configuration write: ' + message
bytestreams = self.get_configuration_bytestreams(chip,
Packet.CONFIG_WRITE_PACKET, registers)
if write_read == 0:
self.serial_write(bytestreams)
else:
miso_bytestream = self.serial_write_read(bytestreams,
timelimit=write_read)
packets = self.parse_input(miso_bytestream)
self.store_packets(packets, miso_bytestream, message)
[docs] def read_configuration(self, chip, registers=None, timeout=1,
message=None):
if registers is None:
registers = list(range(Configuration.num_registers))
elif isinstance(registers, int):
registers = [registers]
else:
pass
if message is None:
message = 'configuration read'
else:
message = 'configuration read: ' + message
bytestreams = self.get_configuration_bytestreams(chip,
Packet.CONFIG_READ_PACKET, registers)
data = self.serial_write_read(bytestreams, timeout)
packets = self.parse_input(data)
self.store_packets(packets, data, message)
[docs] def multi_write_configuration(self, chip_reg_pairs, write_read=0,
message=None):
'''
Send multiple write configuration commands at once.
``chip_reg_pairs`` should be a list/iterable whose elements are
an valid arguments to ``Controller.write_configuration``,
excluding the ``write_read`` argument. Just like in the single
``Controller.write_configuration``, setting ``write_read > 0`` will
have the controller read data during and after it writes, for
however many seconds are specified.
Examples:
These first 2 are equivalent and write the full configurations
>>> controller.multi_write_configuration([chip1, chip2, ...])
>>> controller.multi_write_configuration([(chip1, None), chip2, ...])
These 2 write the specified registers for the specified chips
in the specified order
>>> controller.multi_write_configuration([(chip1, 1), (chip2, 2), ...])
>>> controller.multi_write_configuration([(chip1, range(10)), chip2, ...])
'''
if message is None:
message = 'multi configuration write'
else:
message = 'multi configuration write: ' + message
final_bytestream = []
for chip_reg_pair in chip_reg_pairs:
if isinstance(chip_reg_pair, Chip):
chip_reg_pair = (chip_reg_pair, None)
chip, registers = chip_reg_pair
if registers is None:
registers = list(range(Configuration.num_registers))
elif isinstance(registers, int):
registers = [registers]
else:
pass
bytestreams = self.get_configuration_bytestreams(chip,
Packet.CONFIG_WRITE_PACKET, registers)
final_bytestream += bytestreams
final_bytestream = self.format_bytestream(final_bytestream)
if write_read == 0:
self.serial_write(final_bytestream)
else:
miso_bytestream = self.serial_write_read(final_bytestream,
timelimit=write_read)
packets = self.parse_input(miso_bytestream)
self.store_packets(packets, miso_bytestream, message)
[docs] def multi_read_configuration(self, chip_reg_pairs, timeout=1,
message=None):
'''
Send multiple read configuration commands at once.
``chip_reg_pairs`` should be a list/iterable whose elements are
Chip objects (to read entire configuration) or (chip, registers)
tuples to read only the specified register(s). Registers could
be ``None`` (i.e. all), an ``int`` for that register only, or an
iterable of ``int``s.
Examples:
These first 2 are equivalent and read the full configurations
>>> controller.multi_read_configuration([chip1, chip2, ...])
>>> controller.multi_read_configuration([(chip1, None), chip2, ...])
These 2 read the specified registers for the specified chips
in the specified order
>>> controller.multi_read_configuration([(chip1, 1), (chip2, 2), ...])
>>> controller.multi_read_configuration([(chip1, range(10)), chip2, ...])
'''
if message is None:
message = 'multi configuration read'
else:
message = 'multi configuration read: ' + message
final_bytestream = []
for chip_reg_pair in chip_reg_pairs:
if isinstance(chip_reg_pair, Chip):
chip_reg_pair = (chip_reg_pair, None)
chip, registers = chip_reg_pair
if registers is None:
registers = list(range(Configuration.num_registers))
elif isinstance(registers, int):
registers = [registers]
else:
pass
bytestreams = self.get_configuration_bytestreams(chip,
Packet.CONFIG_READ_PACKET, registers)
final_bytestream += bytestreams
mosi_bytestream = self.format_bytestream(final_bytestream)
miso_bytestream = self.serial_write_read(mosi_bytestream,
timelimit=timeout)
packets = self.parse_input(miso_bytestream)
self.store_packets(packets, miso_bytestream, message)
[docs] def get_configuration_bytestreams(self, chip, packet_type, registers):
# The configuration must be sent one register at a time
configuration_packets = \
chip.get_configuration_packets(packet_type, registers);
formatted_packets = [self.format_UART(chip, p) for p in
configuration_packets]
bytestreams = self.format_bytestream(formatted_packets)
return bytestreams
[docs] def run(self, timelimit, message):
data = self.serial_read(timelimit)
packets = self.parse_input(data)
self.store_packets(packets, data, message)
[docs] def run_testpulse(self, list_of_channels):
return
[docs] def run_fifo_test(self):
return
[docs] def run_analog_monitor_test(self):
return
[docs] def verify_configuration(self, chip_id=None, io_chain=0):
'''
Read chip configuration from specified chip and return a bool that is True if the
read chip configuration matches the current configuration stored in chip instance
Also returns a dict containing the values of registers that are different
(read register, stored register)
'''
return_value = True
different_fields = {}
if chip_id is None:
for chip in self.chips:
match, chip_fields = self.verify_configuration(chip_id=chip.chip_id,
io_chain=io_chain)
if not match:
different_fields[chip.chip_id] = chip_fields
return_value = False
else:
chip = self.get_chip(chip_id, io_chain)
self.read_configuration(chip,timeout=0.1)
configuration_data = {}
for packet in self.reads[-1]:
if (packet.packet_type == Packet.CONFIG_READ_PACKET and
packet.chipid == chip_id):
configuration_data[packet.register_address] = packet.register_data
chip_configuration = Configuration()
chip_configuration.from_dict_registers(configuration_data)
if not chip_configuration == chip.config:
return_value = False
different_fields = chip_configuration.compare(chip.config)
return (return_value, different_fields)
[docs] def read_channel_pedestal(self, chip_id, channel, io_chain=0, run_time=0.1):
'''
Set channel threshold to 0 and report back on the recieved adcs from channel
Returns mean, rms, and packet collection
'''
chip = self.get_chip(chip_id, io_chain)
# Store previous state
prev_channel_mask = chip.config.channel_mask
prev_global_threshold = chip.config.global_threshold
prev_pixel_trim_thresholds = chip.config.pixel_trim_thresholds
# Set new configuration
self.disable(chip_id=chip_id)
self.enable(chip_id=chip_id, channel_list=[channel])
chip.config.global_threshold = 0
chip.config.pixel_trim_thresholds = [31]*32
chip.config.pixel_trim_thresholds[channel] = 0
self.write_configuration(chip, Configuration.channel_mask_addresses +
Configuration.pixel_trim_threshold_addresses +
[Configuration.global_threshold_address])
self.run(0.1,'clear buffer')
# Collect data
self.run(run_time,'read_channel_pedestal_c%d_ch%d' % (chip_id, channel))
self.disable(chip_id=chip_id)
adcs = self.reads[-2].extract('adc_counts', chipid=chip_id, channel=channel)
mean = 0
rms = 0
if len(adcs) > 0:
mean = float(sum(adcs)) / len(adcs)
rms = math.sqrt(float(sum([adc**2 for adc in adcs]))/len(adcs) - mean**2)
else:
print('No packets received from chip %d, channel %d' % (chip_id, channel))
# Restore previous state
chip.config.channel_mask = prev_channel_mask
chip.config.global_threshold = prev_global_threshold
chip.config.pixel_trim_thresholds = prev_pixel_trim_thresholds
self.write_configuration(chip, Configuration.channel_mask_addresses +
Configuration.pixel_trim_threshold_addresses +
[Configuration.global_threshold_address])
self.run(2,'clear buffer')
return (adcs, mean, rms)
[docs] def enable_analog_monitor(self, chip_id, channel, io_chain=0):
'''
Enable the analog monitor on a single channel on the specified chip.
Note: If monitoring a different chip, call disable_analog_monitor first to ensure
that the monitor to that chip is disconnected.
'''
chip = self.get_chip(chip_id, io_chain)
chip.config.disable_analog_monitor()
chip.config.enable_analog_monitor(channel)
self.write_configuration(chip, Configuration.csa_monitor_select_addresses)
return
[docs] def disable_analog_monitor(self, chip_id=None, channel=None, io_chain=0):
'''
Disable the analog monitor for a specified chip and channel, if none are specified
disable the analog monitor for all chips in self.chips and all channels
'''
if chip_id is None:
for chip in self.chips:
self.disable_analog_monitor(chip_id=chip.chip_id, channel=channel,
io_chain=io_chain)
elif channel is None:
for channel in range(32):
self.disable_analog_monitor(chip_id=chip_id, channel=channel,
io_chain=io_chain)
else:
chip = self.get_chip(chip_id, io_chain)
chip.config.disable_analog_monitor()
self.write_configuration(chip, Configuration.csa_monitor_select_addresses)
return
[docs] def enable_testpulse(self, chip_id, channel_list, io_chain=0, start_dac=255):
'''
Prepare chip for pulsing - enable testpulser and set a starting dac value for
specified chip/channel
'''
chip = self.get_chip(chip_id, io_chain)
chip.config.disable_testpulse()
chip.config.enable_testpulse(channel_list)
chip.config.csa_testpulse_dac_amplitude = start_dac
self.write_configuration(chip, Configuration.csa_testpulse_enable_addresses +
[Configuration.csa_testpulse_dac_amplitude_address])
return
[docs] def issue_testpulse(self, chip_id, pulse_dac, min_dac=0, io_chain=0):
'''
Reduce the testpulser dac by pulse_dac and write_read to chip for 0.1s
'''
chip = self.get_chip(chip_id, io_chain)
chip.config.csa_testpulse_dac_amplitude -= pulse_dac
if chip.config.csa_testpulse_dac_amplitude < min_dac:
raise ValueError('Minimum DAC exceeded')
self.write_configuration(chip, [Configuration.csa_testpulse_dac_amplitude_address],
write_read=0.1)
return self.reads[-1]
[docs] def disable_testpulse(self, chip_id=None, channel_list=range(32), io_chain=0):
'''
Disable testpulser for specified chip/channels. If none specified, disable for
all chips/channels
'''
if chip_id is None:
for chip in self.chips:
self.disable_testpulse(chip_id=chip.chip_id, channel_list=channel_list,
io_chain=io_chain)
else:
chip = self.get_chip(chip_id, io_chain)
chip.config.disable_testpulse(channel_list)
self.write_configuration(chip, Configuration.csa_testpulse_enable_addresses)
return
[docs] def disable(self, chip_id=None, channel_list=range(32), io_chain=0):
'''
Update channel mask to disable specified chips/channels. If none specified,
disable all chips/channels
'''
if chip_id is None:
for chip in self.chips:
self.disable(chip_id=chip.chip_id, channel_list=channel_list,
io_chain=io_chain)
else:
chip = self.get_chip(chip_id, io_chain)
chip.config.disable_channels(channel_list)
self.write_configuration(chip, Configuration.channel_mask_addresses)
[docs] def enable(self, chip_id=None, channel_list=range(32), io_chain=0):
'''
Update channel mask to enable specified chips/channels. If none specified,
enable all chips/channels
'''
if chip_id is None:
for chip in self.chips:
self.enable(chip_id=chip.chip_id, channel_list=channel_list,
io_chain=io_chain)
else:
chip = self.get_chip(chip_id, io_chain)
chip.config.enable_channels(channel_list)
self.write_configuration(chip, Configuration.channel_mask_addresses)
[docs] def store_packets(self, packets, data, message):
'''
Store the packets in ``self`` and in ``self.chips``
'''
new_packets = PacketCollection(packets, data, message)
new_packets.read_id = self.nreads
self.nreads += 1
self.reads.append(new_packets)
#self.sort_packets(new_packets)
[docs] def sort_packets(self, collection):
'''
Sort the packets in ``collection`` into each chip in
``self.all_chips`` (if ``self.use_all_chips``) or ``self.chips``
(otherwise).
'''
by_chipid = collection.by_chipid()
io_chain = 0
for chip_id in by_chipid.keys():
if chip_id in [x.chip_id for x in self.chips]:
chip = self.get_chip(chip_id, io_chain)
chip.reads.append(by_chipid[chip_id])
elif not self._test_mode:
print('Warning chip id %d not in chips.' % chip_id)
[docs] def save_output(self, filename, message):
'''Save the data read by each chip to the specified file.'''
data = {}
data['reads'] = [collection.to_dict() for collection in self.reads]
data['chips'] = [repr(chip) for chip in self.chips]
data['message'] = message
with open(filename, 'w') as outfile:
json.dump(data, outfile, indent=4,
separators=(',',':'), sort_keys=True)
[docs] def load(self, filename):
'''
Load the data in filename into the controller.
Overwrites all data inside the controller!
'''
self.__init__(self.port)
with open(filename, 'r') as infile:
data = json.load(infile)
chip_regexp = re.compile(r'Chip\((\d+), ?(\d+)\)')
for chip_description in data['chips']:
parsed_chip = chip_regexp.match(chip_description)
chip_id = int(parsed_chip.group(1))
io_chain = int(parsed_chip.group(2))
self.chips.append(Chip(chip_id, io_chain))
for read in data['reads']:
collection = PacketCollection([])
collection.from_dict(read)
self.reads.append(collection)
self.sort_packets(collection)
return data['message']
[docs]class Packet(object):
'''
A single 54-bit LArPix UART data packet.
'''
size = 54
num_bytes = 7
# These ranges are reversed from the bit addresses given in the
# LArPix datasheet because BitArray indexing is big-endian but we
# transmit data little-endian-ly. E.g.:
# >>> x = BitArray('0b00')
# >>> x[0:] = bin(2) # ('0b10')
# >>> x[0] # returns True (1)
# Another way to think of it is BitArray indexing reads the
# bitstream from 0:N but all the LArPix datasheet indexing goes from
# N:0.
packet_type_bits = slice(52, 54)
chipid_bits = slice(44, 52)
parity_bit = 0
parity_calc_bits = slice(1, 54)
channel_id_bits = slice(37, 44)
timestamp_bits = slice(13, 37)
dataword_bits = slice(3, 13)
fifo_half_bit = 2
fifo_full_bit = 1
register_address_bits = slice(36, 44)
register_data_bits = slice(28, 36)
config_unused_bits = slice(1, 28)
test_counter_bits_11_0 = slice(1, 13)
test_counter_bits_15_12 = slice(40, 44)
DATA_PACKET = bitarray('00')
TEST_PACKET = bitarray('01')
CONFIG_WRITE_PACKET = bitarray('10')
CONFIG_READ_PACKET = bitarray('11')
_bit_padding = bitarray('00')
def __init__(self, bytestream=None):
if bytestream is None:
self.bits = bitarray(Packet.size)
self.bits.setall(False)
return
elif len(bytestream) == Packet.num_bytes:
# Parse the bytestream. Remember that bytestream[0] goes at
# the 'end' of the BitArray
reversed_bytestream = bytestream[::-1]
self.bits = bitarray()
self.bits.frombytes(reversed_bytestream)
# Get rid of the padding (now at the beginning of the
# bitstream because of the reverse order)
self.bits.pop(0)
self.bits.pop(0)
else:
raise ValueError('Invalid number of bytes: %s' %
len(bytestream))
def __eq__(self, other):
return self.bits == other.bits
def __ne__(self, other):
return not (self == other)
def __str__(self):
string = '[ '
ptype = self.packet_type
if ptype == Packet.TEST_PACKET:
string += 'Test | '
string += 'Counter: %d | ' % self.test_counter
elif ptype == Packet.DATA_PACKET:
string += 'Data | '
string += 'Channel: %d | ' % self.channel_id
string += 'Timestamp: %d | ' % self.timestamp
string += 'ADC data: %d | ' % self.dataword
string += 'FIFO Half: %s | ' % bool(self.fifo_half_flag)
string += 'FIFO Full: %s | ' % bool(self.fifo_full_flag)
elif (ptype == Packet.CONFIG_READ_PACKET or ptype ==
Packet.CONFIG_WRITE_PACKET):
if ptype == Packet.CONFIG_READ_PACKET:
string += 'Config read | '
else:
string += 'Config write | '
string += 'Register: %d | ' % self.register_address
string += 'Value: % d | ' % self.register_data
first_splitter = string.find('|')
string = (string[:first_splitter] + '| Chip: %d ' % self.chipid +
string[first_splitter:])
string += ('Parity: %d (valid: %s) ]' %
(self.parity_bit_value, self.has_valid_parity()))
return string
def __repr__(self):
return 'Packet(' + str(self.bytes()) + ')'
[docs] def bytes(self):
# Here's the only other place we have to deal with the
# endianness issue by reversing the order
padded_output = self._bit_padding + self.bits
bytes_output = padded_output.tobytes()
return bytes_output[::-1]
[docs] def export(self):
'''Return a dict representation of this Packet.'''
type_map = {
self.TEST_PACKET.to01(): 'test',
self.DATA_PACKET.to01(): 'data',
self.CONFIG_WRITE_PACKET.to01(): 'config write',
self.CONFIG_READ_PACKET.to01(): 'config read'
}
d = {}
d['bits'] = self.bits.to01()
d['type'] = type_map[self.packet_type.to01()]
d['chipid'] = self.chipid
d['parity'] = self.parity_bit_value
d['valid_parity'] = self.has_valid_parity()
ptype = self.packet_type
if ptype == Packet.TEST_PACKET:
d['counter'] = self.test_counter
elif ptype == Packet.DATA_PACKET:
d['channel'] = self.channel_id
d['timestamp'] = self.timestamp
d['adc_counts'] = self.dataword
d['fifo_half'] = bool(self.fifo_half_flag)
d['fifo_full'] = bool(self.fifo_full_flag)
elif (ptype == Packet.CONFIG_READ_PACKET or ptype ==
Packet.CONFIG_WRITE_PACKET):
d['register'] = self.register_address
d['value'] = self.register_data
return d
@property
def packet_type(self):
return self.bits[Packet.packet_type_bits]
@packet_type.setter
def packet_type(self, value):
self.bits[Packet.packet_type_bits] = bah.fromuint(value,
Packet.packet_type_bits)
@property
def chipid(self):
return bah.touint(self.bits[Packet.chipid_bits])
@chipid.setter
def chipid(self, value):
self.bits[Packet.chipid_bits] = bah.fromuint(value,
Packet.chipid_bits)
@property
def parity_bit_value(self):
return int(self.bits[Packet.parity_bit])
@parity_bit_value.setter
def parity_bit_value(self, value):
self.bits[Packet.parity_bit] = bool(value)
[docs] def compute_parity(self):
return 1 - (self.bits[Packet.parity_calc_bits].count(True) % 2)
[docs] def assign_parity(self):
self.parity_bit_value = self.compute_parity()
[docs] def has_valid_parity(self):
return self.parity_bit_value == self.compute_parity()
@property
def channel_id(self):
return bah.touint(self.bits[Packet.channel_id_bits])
@channel_id.setter
def channel_id(self, value):
self.bits[Packet.channel_id_bits] = bah.fromuint(value,
Packet.channel_id_bits)
@property
def timestamp(self):
return bah.touint(self.bits[Packet.timestamp_bits])
@timestamp.setter
def timestamp(self, value):
self.bits[Packet.timestamp_bits] = bah.fromuint(value,
Packet.timestamp_bits)
@property
def dataword(self):
ostensible_value = bah.touint(self.bits[Packet.dataword_bits])
# TODO fix in LArPix v2
return ostensible_value - (ostensible_value % 2)
@dataword.setter
def dataword(self, value):
self.bits[Packet.dataword_bits] = bah.fromuint(value,
Packet.dataword_bits)
@property
def fifo_half_flag(self):
return int(self.bits[Packet.fifo_half_bit])
@fifo_half_flag.setter
def fifo_half_flag(self, value):
self.bits[Packet.fifo_half_bit] = bool(value)
@property
def fifo_full_flag(self):
return int(self.bits[Packet.fifo_full_bit])
@fifo_full_flag.setter
def fifo_full_flag(self, value):
self.bits[Packet.fifo_full_bit] = bool(value)
@property
def register_address(self):
return bah.touint(self.bits[Packet.register_address_bits])
@register_address.setter
def register_address(self, value):
self.bits[Packet.register_address_bits] = bah.fromuint(value,
Packet.register_address_bits)
@property
def register_data(self):
return bah.touint(self.bits[Packet.register_data_bits])
@register_data.setter
def register_data(self, value):
self.bits[Packet.register_data_bits] = bah.fromuint(value,
Packet.register_data_bits)
@property
def test_counter(self):
return bah.touint(self.bits[Packet.test_counter_bits_15_12] +
self.bits[Packet.test_counter_bits_11_0])
@test_counter.setter
def test_counter(self, value):
allbits = bah.fromuint(value, 16)
self.bits[Packet.test_counter_bits_15_12] = (
bah.fromuint(allbits[:4], Packet.test_counter_bits_15_12))
self.bits[Packet.test_counter_bits_11_0] = (
bah.fromuint(allbits[4:], Packet.test_counter_bits_11_0))
[docs]class PacketCollection(object):
'''
Represents a group of packets that were sent to or received from
LArPix.
Index into the PacketCollection as if it were a list:
>>> collection[0]
Packet(b'\x07\x00\x00\x00\x00\x00\x00')
>>> first_ten = collection[:10]
>>> len(first_ten)
10
>>> type(first_ten)
larpix.larpix.PacketCollection
>>> first_ten.message
'my packets | subset slice(None, 10, None)'
To view the bits representation, add 'bits' to the index:
>>> collection[0, 'bits']
'00000000 00000000 00000000 00000000 00000000 00000000 000111'
>>> bits_format_first_10 = collection[:10, 'bits']
>>> type(bits_format_first_10[0])
str
'''
def __init__(self, packets, bytestream=None, message='',
read_id=None, skipped=None):
self.packets = packets
self.bytestream = bytestream
self.skipped = skipped
self.message = message
self.read_id = read_id
self.parent = None
def __eq__(self, other):
'''
Return True if the packets, message and bytestream compare equal.
'''
return (self.packets == other.packets and
self.message == other.message and
self.bytestream == other.bytestream)
def __repr__(self):
return '<%s with %d packets, read_id %d, "%s">' % (self.__class__.__name__,
len(self.packets), self.read_id, self.message)
def __str__(self):
if len(self.packets) < 20:
return '\n'.join(str(packet) for packet in self.packets)
else:
beginning = '\n'.join(str(packet) for packet in self.packets[:10])
middle = '\n'.join([' .', ' . omitted %d packets' %
(len(self.packets)-20), ' .'])
end = '\n'.join(str(packet) for packet in self.packets[-10:])
return '\n'.join([beginning, middle, end])
def __len__(self):
return len(self.packets)
def __getitem__(self, key):
'''
Get the specified item(s).
If key is an int, return the packet object at that index in
self.packets.
If key is a slice, return a PacketCollection with the specified
packets, and with a message inherited from self.message.
If key is (slice or int, 'str'), use the behavior as if setting
key = key[0].
If key is (int, 'bits'), return a string representation of the
bits of the specified packet, as determined by
self._bits_getitem.
If key is (slice, 'bits'), return a list of string
representations of the packets specified by the slice.
'''
if isinstance(key, slice):
items = PacketCollection([p for p in self.packets[key]])
items.message = '%s | subset %s' % (self.message, key)
items.parent = self
items.read_id = self.read_id
return items
elif isinstance(key, tuple):
if key[1] == 'bits':
return self._bits_getitem(key[0])
elif key[1] == 'str':
return self[key[0]]
else:
return self.packets[key]
def _bits_getitem(self, key):
'''
Replace each packet with a string of the packet bits grouped 8
bits at a time.
'''
if isinstance(key, slice):
return [' '.join(p.bits.to01()[i:i+8] for i in
range(0, Packet.size, 8)) for p in self.packets[key]]
else:
return ' '.join(self.packets[key].bits.to01()[i:i+8] for i in
range(0, Packet.size, 8))
[docs] def to_dict(self):
'''
Export the information in this PacketCollection to a dict.
'''
d = {}
d['packets'] = [packet.export() for packet in self.packets]
d['id'] = id(self)
d['parent'] = 'None' if self.parent is None else id(self.parent)
d['message'] = str(self.message)
d['read_id'] = 'None' if self.read_id is None else self.read_id
d['bytestream'] = ('None' if self.bytestream is None else
self.bytestream.decode('raw_unicode_escape'))
return d
[docs] def from_dict(self, d):
'''
Load the information in the dict into this PacketCollection.
'''
self.message = d['message']
self.read_id = d['read_id']
self.bytestream = d['bytestream'].encode('raw_unicode_escape')
self.parent = None
self.packets = []
for p in d['packets']:
bits = p['bits']
packet = Packet()
packet.bits = bitarray(bits)
self.packets.append(packet)
[docs] def origin(self):
'''
Return the original PacketCollection that this PacketCollection
derives from.
'''
child = self
parent = self.parent
max_generations = 100 # to prevent infinite loops
i = 0
while parent is not None and i < max_generations:
# Move up the family tree one generation
child = parent
parent = parent.parent
i += 1
if parent is None:
return child
else:
raise ValueError('Reached limit on generations: %d' %
max_generations)
[docs] def with_chipid(self, chipid):
'''
Return packets with the specified chip ID.
'''
return [packet for packet in self.packets if packet.chipid == chipid]
[docs] def by_chipid(self):
'''
Return a dict of { chipid: PacketCollection }.
'''
chip_groups = {}
for packet in self.packets:
# append packet to list if list exists, else append to empty
# list as a default
chip_groups.setdefault(packet.chipid, []).append(packet)
to_return = {}
for chipid in chip_groups:
new_collection = PacketCollection(chip_groups[chipid])
new_collection.message = self.message + ' | chip %s' % chipid
new_collection.read_id = self.read_id
new_collection.parent = self
to_return[chipid] = new_collection
return to_return
[docs]class SerialPort(object):
'''Wrapper for various serial port interfaces across platforms'''
# Guesses for default port name by platform
_default_port_map = {
'Default':['/dev/ttyUSB2','/dev/ttyUSB1'], # Same as Linux
'Linux':['/dev/ttyAMA0', '/dev/ttyUSB2','/dev/ttyUSB1',
'/dev/ttyUSB0'], # Linux
'Darwin':['scan-ftdi',], # OS X
}
_logger = None
def __init__(self, port=None, baudrate=9600, timeout=None):
self.port = port
self.resolved_port = ''
self.port_type = ''
self.baudrate = baudrate
self.timeout = timeout
self._keep_open = False
self.serial_com = None
self._initialize_serial_com()
self.logger = None
if not (self._logger is None):
self.logger = self._logger
return
[docs] @classmethod
def guess_port(cls):
'''Guess at correct port name based on platform'''
platform_default = 'Default'
platform_name = platform.system()
if platform_name not in cls._default_port_map:
platform_name = platform_default
default_devs = cls._default_port_map[platform_name]
osx_cmd = 'system_profiler SPUSBDataType | grep -C 7 FTDI | grep Serial'
for default_dev in default_devs:
if default_dev.startswith('/dev'): # pyserial
try:
if os.stat(default_dev):
return default_dev
except OSError:
continue
elif default_dev == 'scan-ftdi':
if platform_name == 'Darwin': # scan for pylibftdi on OS X
# Scan for FTDI devices
result = os.popen(osx_cmd).read()
if len(result) > 0:
idx = result.find('Serial Number:')
dev_name = result[idx+14:idx+24].strip()
print('Autoscan found FTDI device: "%s"' % dev_name)
return dev_name
elif not default_dev.startswith('/dev'): # assume pylibftdi
return default_dev
raise OSError('Cannot find serial device for platform: %s' %
platform_name)
def _ready_port(self):
'''Function handle. Will be reset to appropriate method'''
raise NotImplementedError('Serial port type has not been defined.')
def _ready_port_pyserial(self):
'''Ready a pyserial port'''
if self.serial_com is None:
# Create serial port
import serial
self.serial_com = serial.Serial(self.resolved_port,
baudrate=self.baudrate,
timeout=self.timeout)
if not self.serial_com.is_open:
# Open, if necessary
self.serial_com.open()
return
def _ready_port_pylibftdi(self):
'''Ready a pylibftdi port'''
if self.serial_com is None:
# Construct serial port
import pylibftdi
self.serial_com = pylibftdi.Device(self.resolved_port)
if self.serial_com.closed:
# Open port
self.serial_com.open()
# Confirm baudrate (Required for OS X)
self._confirm_baudrate()
return
def _ready_port_test(self):
if self.serial_com is None:
# Get FakeSerialPort from testing module
import test.test_larpix as test_lib
self.serial_com = test_lib.FakeSerialPort()
return
def _confirm_baudrate(self):
'''Check and set the baud rate'''
if self.serial_com.baudrate != self.baudrate:
# Reset baudrate
self.serial_com.baudrate = self.baudrate
return
def _initialize_serial_com(self):
'''Initialize the low-level serial com connection'''
self.resolved_port = self._resolve_port_name()
self.port_type = self._resolve_port_type()
if self.port_type is 'pyserial':
self._ready_port = self._ready_port_pyserial
self._keep_open = False
elif self.port_type is 'pylibftdi':
self._ready_port = self._ready_port_pylibftdi
self._keep_open = True
elif self.port_type is 'test':
self._ready_port = self._ready_port_test
self._keep_open = True
else:
raise ValueError('Port type must be either pyserial, pylibftdi, or test')
return
def _resolve_port_name(self):
'''Resolve the serial port name, based on user request'''
if self.port is None:
# Must set port
raise ValueError('You must choose a serial port for operation')
# FIXME: incorporate auto-scan feature
if self.port is 'auto':
# Try to guess the correct port
return self.guess_port()
# FIXME: incorporate list option?
#elif isinstance(self.port, list):
# # Try to determine best choice from list
# for port_name in list:
# if self._port_exists(port_name):
# return port_name
return self.port
def _resolve_port_type(self):
'''Resolve the type of serial port, based on the name'''
if self.resolved_port.startswith('/dev'):
# Looks like a tty device. Use pyserial.
return 'pyserial'
elif self.resolved_port is 'test':
# Testing port. Don't use an external library
return 'test'
elif not self.resolved_port.startswith('/dev'):
# Looks like a libftdi raw device. Use pylibftdi.
return 'pylibftdi'
raise ValueError('Unknown port: %s' % self.port)
[docs] def open(self):
'''Open the port'''
self._ready_port()
return
[docs] def close(self):
'''Close the port'''
if self.serial_com is None: return
self.serial_com.close()
[docs] def write(self, data):
'''Write data to serial port'''
self._ready_port()
write_time = time.time()
self.serial_com.write(data)
if self.logger:
self.logger.record({'data_type':'write','data':data,'time':write_time})
if not self._keep_open:
self.close()
return
[docs] def read(self, nbytes):
'''Read data from serial port'''
self._ready_port()
read_time = time.time()
data = self.serial_com.read(nbytes)
if self.logger:
self.logger.record({'data_type':'read','data':data,'time':read_time})
if not self._keep_open:
self.close()
return data
[docs]def enable_logger(filename=None):
'''Enable serial data logger'''
if SerialPort._logger is None:
from larpix.datalogger import DataLogger
SerialPort._logger = DataLogger(filename)
if not SerialPort._logger.is_enabled():
SerialPort._logger.enable()
return
[docs]def disable_logger():
'''Disable serial data logger'''
if SerialPort._logger is not None:
SerialPort._logger.disable()
return
[docs]def flush_logger():
'''Flush serial data logger data to output file'''
if SerialPort._logger is not None:
SerialPort._logger.flush()
return