# Source code for larpix.packet.packet_v1

from bitarray import bitarray
import struct

from .. import bitarrayhelper as bah
from ..key import Key

class Packet_v1(object):
    r'''
    A single 54-bit LArPix UART data packet.

    LArPix Packet objects have attributes for inspecting and modifying
    the contents of the packet.

    Internally, packets are represented as an array of bits, and the
    different attributes use Python "properties" to seamlessly convert
    between the bits representation and a more intuitive integer
    representation. The bits representation can be inspected with the
    ``bits`` attribute.

    Packet objects do not restrict you from adjusting an attribute for
    an inappropriate packet type. For example, you can create a data
    packet and then set ``packet.register_address = 5``. This will
    adjust the packet bits corresponding to a configuration packet's
    "register\_address" region, which is probably not what you want for
    your data packet.

    Packets have a parity bit which enforces odd parity, i.e. the sum of
    all the individual bits in a packet must be an odd number. The
    parity bit can be accessed as above using the ``parity_bit_value``
    attribute. The correct parity bit can be computed using
    ``compute_parity()``, and the validity of a packet's parity can be
    checked using ``has_valid_parity()``. When constructing a new
    packet, the correct parity bit can be assigned using
    ``assign_parity()``.

    Individual packets can be printed to show a human-readable
    interpretation of the packet contents. The printed version adjusts
    its output based on the packet type, so a data packet will show the
    data word, timestamp, etc., while a configuration packet will show
    the register address and register data.

    '''
    asic_version = 1
    size = 54
    num_bytes = 7
    # These ranges are reversed from the bit addresses given in the
    # LArPix datasheet because BitArray indexing is big-endian but we
    # transmit data little-endian-ly. E.g.:
    # >>> x = BitArray('0b00')
    # >>> x[0:] = bin(2)  # ('0b10')
    # >>> x[0]  # returns True (1)
    # Another way to think of it is BitArray indexing reads the
    # bitstream from 0:N but all the LArPix datasheet indexing goes from
    # N:0.
    packet_type_bits = slice(52, 54)
    chipid_bits = slice(44, 52)
    parity_bit = 0
    parity_calc_bits = slice(1, 54)
    channel_id_bits = slice(37, 44)
    timestamp_bits = slice(13, 37)
    dataword_bits = slice(3, 13)
    fifo_half_bit = 2
    fifo_full_bit = 1
    register_address_bits = slice(36, 44)
    register_data_bits = slice(28, 36)
    config_unused_bits = slice(1, 28)
    test_counter_bits_11_0 = slice(1, 13)
    test_counter_bits_15_12 = slice(40, 44)

    DATA_PACKET = bitarray('00')
    TEST_PACKET = bitarray('01')
    CONFIG_WRITE_PACKET = bitarray('10')
    CONFIG_READ_PACKET = bitarray('11')
    # Two zero bits that pad the 54 packet bits out to 7 whole bytes.
    _bit_padding = bitarray('00')

    # Keys used by ``export()`` whose names differ from the attribute
    # they are read from; ``from_dict()`` uses this to write them back.
    _dict_key_to_attr = {
        'type': 'packet_type',
        'register': 'register_address',
        'value': 'register_data',
        'adc_counts': 'dataword',
        'parity': 'parity_bit_value',
        'counter': 'test_counter',
        'channel': 'channel_id',
        'fifo_half': 'fifo_half_flag',
        'fifo_full': 'fifo_full_flag',
    }

    def __init__(self, bytestream=None):
        '''
        Create a packet, optionally from its 7-byte wire representation.

        :param bytestream: ``None`` for an all-zero packet, or a
            sequence of exactly ``num_bytes`` bytes as produced by
            ``bytes()``.

        :raises ValueError: if ``bytestream`` has the wrong length.

        '''
        if bytestream is None:
            self.bits = bitarray(self.size)
            self.bits.setall(False)
            return
        if len(bytestream) != self.num_bytes:
            raise ValueError('Invalid number of bytes: %s' %
                             len(bytestream))
        # Parse the bytestream. Remember that bytestream[0] goes at
        # the 'end' of the BitArray
        reversed_bytestream = bytestream[::-1]
        self.bits = bitarray()
        self.bits.frombytes(reversed_bytestream)
        # Get rid of the padding (now at the beginning of the
        # bitstream because of the reverse order)
        self.bits = self.bits[len(self._bit_padding):]

    def __eq__(self, other):
        '''Packets are equal when their bit contents are equal.'''
        if not hasattr(other, 'bits'):
            # Let Python try the reflected comparison instead of
            # failing with AttributeError on unrelated objects.
            return NotImplemented
        return self.bits == other.bits

    def __ne__(self, other):
        return not (self == other)

    def __str__(self):
        '''Human-readable summary whose fields depend on the packet type.'''
        string = '[ '
        string += 'Chip key: {} | '.format(self.chip_key)
        if hasattr(self, 'direction'):
            string += 'Direction: {}'.format(self.direction)
            string += ' | '
        ptype = self.packet_type
        if ptype == Packet_v1.TEST_PACKET:
            string += 'Test | '
            string += 'Counter: %d | ' % self.test_counter
        elif ptype == Packet_v1.DATA_PACKET:
            string += 'Data | '
            string += 'Channel: %d | ' % self.channel_id
            string += 'Timestamp: %d | ' % self.timestamp
            string += 'ADC data: %d | ' % self.dataword
            string += 'FIFO Half: %s | ' % bool(self.fifo_half_flag)
            string += 'FIFO Full: %s | ' % bool(self.fifo_full_flag)
        elif ptype in (Packet_v1.CONFIG_READ_PACKET,
                       Packet_v1.CONFIG_WRITE_PACKET):
            if ptype == Packet_v1.CONFIG_READ_PACKET:
                string += 'Config read | '
            else:
                string += 'Config write | '
            string += 'Register: %d | ' % self.register_address
            # NOTE(review): '% d' (space flag) pads non-negative values
            # with a leading space; kept as-is because the printed
            # format may be relied upon elsewhere.
            string += 'Value: % d | ' % self.register_data
        # Insert the chip id right after the chip key field.
        first_splitter = string.find('|')
        string = (string[:first_splitter] + '| Chip: %d ' % self.chipid +
                  string[first_splitter:])
        string += ('Parity: %d (valid: %s) ]' %
                   (self.parity_bit_value, self.has_valid_parity()))
        return string

    def __repr__(self):
        return 'Packet_v1(' + str(self.bytes()) + ')'

    def bytes(self):
        '''
        Construct the bytes that make up the packet.

        Byte 0 is the first byte that would be sent out and contains the
        first 8 bits of the packet (i.e. packet type and part of the
        chip ID).

        *Note*: The internal bits representation of the packet has a
        different endian-ness compared to the output of this method.

        '''
        # Here's the only other place we have to deal with the
        # endianness issue by reversing the order
        padded_output = self._bit_padding + self.bits
        bytes_output = padded_output.tobytes()
        return bytes_output[::-1]

    def export(self):
        '''Return a dict representation of this Packet.'''
        type_map = {
            self.TEST_PACKET.to01(): 'test',
            self.DATA_PACKET.to01(): 'data',
            self.CONFIG_WRITE_PACKET.to01(): 'config write',
            self.CONFIG_READ_PACKET.to01(): 'config read',
        }
        d = {}
        d['asic_version'] = self.asic_version
        d['chip_key'] = str(self.chip_key) if self.chip_key else None
        d['bits'] = self.bits.to01()
        d['type_str'] = type_map[self.packet_type.to01()]
        d['type'] = bah.touint(self.packet_type)
        d['chipid'] = self.chipid
        d['parity'] = self.parity_bit_value
        d['valid_parity'] = self.has_valid_parity()
        ptype = self.packet_type
        if ptype == Packet_v1.TEST_PACKET:
            d['counter'] = self.test_counter
        elif ptype == Packet_v1.DATA_PACKET:
            d['channel'] = self.channel_id
            d['timestamp'] = self.timestamp
            d['adc_counts'] = self.dataword
            d['fifo_half'] = bool(self.fifo_half_flag)
            d['fifo_full'] = bool(self.fifo_full_flag)
        elif ptype in (Packet_v1.CONFIG_READ_PACKET,
                       Packet_v1.CONFIG_WRITE_PACKET):
            d['register'] = self.register_address
            d['value'] = self.register_data
        return d

    def from_dict(self, d):
        '''
        Inverse of export - modify packet based on dict

        Entries are applied in the dict's iteration order. The
        ``'type_str'`` and ``'valid_parity'`` entries are ignored,
        ``'bits'`` replaces the whole bit array, and any key that is not
        one of the export names is assigned to the attribute of the same
        name.

        :param d: dict with (a subset of) the keys produced by
            ``export()``; ``'asic_version'`` is required.

        :raises ValueError: if the asic version does not match this
            class or ``'type'`` is not a known packet type.

        '''
        if d['asic_version'] != self.asic_version:
            raise ValueError(
                'invalid asic version {}'.format(d['asic_version']))
        if 'type' in d:
            valid_types = [bah.touint(packet_type) for packet_type in (
                Packet_v1.DATA_PACKET, Packet_v1.TEST_PACKET,
                Packet_v1.CONFIG_WRITE_PACKET,
                Packet_v1.CONFIG_READ_PACKET)]
            if d['type'] not in valid_types:
                raise ValueError('invalid packet type for Packet_v1')
        for key, value in d.items():
            if key in ('type_str', 'valid_parity'):
                # derived values, nothing to store
                continue
            if key == 'bits':
                self.bits = bitarray(value)
            else:
                setattr(self, self._dict_key_to_attr.get(key, key), value)

    @property
    def chip_key(self):
        '''
        The ``Key`` built from io group, io channel and chip id, or
        ``None`` if the io group or io channel is unset. The key is
        cached until one of its components changes.
        '''
        if hasattr(self, '_chip_key'):
            return self._chip_key
        if self.io_group is None or self.io_channel is None:
            return None
        self._chip_key = Key(self.io_group, self.io_channel, self.chipid)
        return self._chip_key

    @chip_key.setter
    def chip_key(self, value):
        # remove cached key
        if hasattr(self, '_chip_key'):
            del self._chip_key
        if value is None:
            self.io_channel = None
            self.io_group = None
            return
        if isinstance(value, Key):
            self.io_group = value.io_group
            self.io_channel = value.io_channel
            self.chipid = value.chip_id
            return
        # try again by casting as a Key
        self.chip_key = Key(value)

    @property
    def io_group(self):
        '''The io group of this packet, or ``None`` if unset.'''
        # Guard on the attribute that is actually returned (the guard
        # previously looked at ``_io_channel``).
        if hasattr(self, '_io_group'):
            return self._io_group
        return None

    @io_group.setter
    def io_group(self, value):
        if hasattr(self, '_chip_key'):
            # remove cached key
            del self._chip_key
        if value is None:
            if hasattr(self, '_io_group'):
                del self._io_group
            return
        # no value validation!
        self._io_group = value

    @property
    def io_channel(self):
        '''The io channel of this packet, or ``None`` if unset.'''
        if hasattr(self, '_io_channel'):
            return self._io_channel
        return None

    @io_channel.setter
    def io_channel(self, value):
        if hasattr(self, '_chip_key'):
            # remove cached key
            del self._chip_key
        if value is None:
            if hasattr(self, '_io_channel'):
                del self._io_channel
            return
        # no value validation!
        self._io_channel = value

    @property
    def packet_type(self):
        '''The packet type bits, returned as a slice of ``bits``.'''
        return self.bits[self.packet_type_bits]

    @packet_type.setter
    def packet_type(self, value):
        self.bits[self.packet_type_bits] = bah.fromuint(
            value, self.packet_type_bits)

    @property
    def chipid(self):
        return bah.touint(self.bits[self.chipid_bits])

    @chipid.setter
    def chipid(self, value):
        if hasattr(self, '_chip_key'):
            # remove cached key
            del self._chip_key
        self.bits[self.chipid_bits] = bah.fromuint(value, self.chipid_bits)

    @property
    def chip_id(self):
        # additional handle to match Packet_v2 name
        return self.chipid

    @chip_id.setter
    def chip_id(self, value):
        self.chipid = value

    @property
    def parity_bit_value(self):
        return int(self.bits[self.parity_bit])

    @parity_bit_value.setter
    def parity_bit_value(self, value):
        self.bits[self.parity_bit] = bool(value)

    def compute_parity(self):
        '''
        Return the parity bit (0 or 1) that gives the packet odd parity,
        computed over every bit except the parity bit itself.
        '''
        return 1 - (self.bits[self.parity_calc_bits].count(True) % 2)

    def assign_parity(self):
        '''Set the parity bit to the value from ``compute_parity()``.'''
        self.parity_bit_value = self.compute_parity()

    def has_valid_parity(self):
        '''Return whether the stored parity bit matches the computed one.'''
        return self.parity_bit_value == self.compute_parity()

    @property
    def channel_id(self):
        return bah.touint(self.bits[self.channel_id_bits])

    @channel_id.setter
    def channel_id(self, value):
        self.bits[self.channel_id_bits] = bah.fromuint(
            value, self.channel_id_bits)

    @property
    def timestamp(self):
        return bah.touint(self.bits[self.timestamp_bits])

    @timestamp.setter
    def timestamp(self, value):
        self.bits[self.timestamp_bits] = bah.fromuint(
            value, self.timestamp_bits)

    @property
    def dataword(self):
        '''The data word with its lowest bit cleared (always even).'''
        ostensible_value = bah.touint(self.bits[self.dataword_bits])
        # TODO fix in LArPix v2
        return ostensible_value - (ostensible_value % 2)

    @dataword.setter
    def dataword(self, value):
        self.bits[self.dataword_bits] = bah.fromuint(
            value, self.dataword_bits)

    @property
    def fifo_half_flag(self):
        return int(self.bits[self.fifo_half_bit])

    @fifo_half_flag.setter
    def fifo_half_flag(self, value):
        self.bits[self.fifo_half_bit] = bool(value)

    @property
    def fifo_full_flag(self):
        return int(self.bits[self.fifo_full_bit])

    @fifo_full_flag.setter
    def fifo_full_flag(self, value):
        self.bits[self.fifo_full_bit] = bool(value)

    @property
    def register_address(self):
        return bah.touint(self.bits[self.register_address_bits])

    @register_address.setter
    def register_address(self, value):
        self.bits[self.register_address_bits] = bah.fromuint(
            value, self.register_address_bits)

    @property
    def register_data(self):
        return bah.touint(self.bits[self.register_data_bits])

    @register_data.setter
    def register_data(self, value):
        self.bits[self.register_data_bits] = bah.fromuint(
            value, self.register_data_bits)

    @property
    def test_counter(self):
        '''
        The 16-bit test counter, stored in two separate regions of the
        packet: bits 15-12 and bits 11-0.
        '''
        return bah.touint(self.bits[self.test_counter_bits_15_12] +
                          self.bits[self.test_counter_bits_11_0])

    @test_counter.setter
    def test_counter(self, value):
        allbits = bah.fromuint(value, 16)
        # The first 4 bits of the 16-bit value go to the 15-12 region,
        # the remaining 12 bits to the 11-0 region.
        self.bits[self.test_counter_bits_15_12] = (
            bah.fromuint(allbits[:4], self.test_counter_bits_15_12))
        self.bits[self.test_counter_bits_11_0] = (
            bah.fromuint(allbits[4:], self.test_counter_bits_11_0))