Source code for larpix.io.pacman_io

import itertools
import zmq
import bidict
import time
from collections import defaultdict
import multiprocessing
import sys
if sys.version_info[0] >= 3:
    from queue import Empty
else:
    from Queue import Empty
import os

from larpix.io import IO
from larpix.configs import load
import larpix.format.pacman_msg_format as pacman_msg_format
import larpix.format.rawhdf5format as rawhdf5format
from larpix import Packet_v2

[docs]class PACMAN_IO(IO): ''' The PACMAN_IO object interfaces with a network of PACMAN boards each running a pacman-cmdserver and pacman-dataserver. This object handles the ZMQ messaging protocol to send and receive formatted messages to/from the PACMAN boards. If you want more info on how messages are formatted, see ``larpix.format.pacman_msg_format``. The PACMAN_IO object has five flags for optimizing communications which you may or may not want to enable: - ``group_packets_by_io_group`` - ``interleave_packets_by_io_channel`` - ``double_send_packets`` - ``enable_raw_file_writing`` - ``disable_packet_parsing`` To enable each option set the flag to ``True``; to disable, set to ``False``. - The ``group_packets_by_io_group`` option is enabled by default and assembles the packets sent in each call to ``send`` into as few messages as possible destined for a single io group. E.g. sending three packets (2 for ``io_group=1``, 1 for ``io_group=2``) will combine the packets for ``io_group=1`` into a single message to transfer over the network. This reduces overhead associated with network latency and allows large data transfers to happen much faster. - The ``interleave_packets_by_io_channel`` option is enabled by default and interleaves packets within a given message to each io_channel on a given io_group. E.g. 3 packets destined for ``io_channel=1``, ``io_channel=1``, and ``io_channel=2`` will be reordered to ``io_channel=1``, ``io_channel=2``, and ``io_channel=1``. The order of the packets is preserved for each io_channel. This increases the data throughput by about a factor of N, where N is the number of io channels in the message. - The ``double_send_packets`` option is disabled by default and duplicates each packet sent to the PACMAN by a call to ``send()``. This is potentially useful for working around the 512 bug when you need to insure that a packet reaches a chip, but you don't care about introducing extra packets into the system (i.e. when configuring chips). - The ``enable_raw_file_writing`` option will directly dump data to a larpix raw hdf5 formatted file. This is used as a more performant means of logging data (see ``larpix.format.rawhdf5format``). The data file name can be accessed or changed via the ``raw_filename`` attribute, or can be set when creating the ``PACMAN_IO`` object with the ``raw_directory`` and ``raw_filename`` keyword args. - The ``disable_packet_parsing`` option will skip converting PACMAN messages into ``larpix.packet`` types. Thus if ``disable_packet_parsing=True``, every call to ``empty_queue`` will return ``[], b''``. Typically used in conjunction with ``enable_raw_file_writing``, this allows the PACMAN_IO class to read data much faster. ''' default_filepath = 'io/pacman.json' default_raw_filename_fmt = 'raw_%Y_%m_%d_%H_%M_%S_%Z.h5' max_msg_length = 2**16-1 cmdserver_port = '5555' dataserver_port = '5556' _valid_config_classes = ['PACMAN_IO'] group_packets_by_io_group = True interleave_packets_by_io_channel = True double_send_packets = False enable_raw_file_writing = False disable_packet_parsing = False _base_ctrl_reg = 0x10 _clk_ctrl_reg = 0x1010 _sw_reset_cycles_reg = 0x1014 _channel_offset = 0x2000 _channel_size = 0x1000 _uart_clock_ratio_offset = 0x10 _vddd_dac_reg = 0x24001 _vdda_dac_reg = 0x24011 _vddd_adc_reg = 0x24032 _vdda_adc_reg = 0x24042 _iddd_adc_reg = 0x24031 _idda_adc_reg = 0x24041 _vplus_adc_reg = 0x24022 _iplus_adc_reg = 0x24021 _adc2mv = lambda _,x: ((x >> 16) >> 3) * 4 _adc2ma = lambda _,x: ((x >> 16) - (x >> 31) * 65535) * 500 * 0.01 def __init__(self, config_filepath=None, hwm=20000, relaxed=True, timeout=-1, raw_directory='./', raw_filename=None): super(PACMAN_IO, self).__init__() self.load(config_filepath) self.context = zmq.Context() self.senders = bidict.bidict() self.receivers = bidict.bidict() for address in self._io_group_table.inv: self.senders[address] = self.context.socket(zmq.REQ) self.receivers[address] = self.context.socket(zmq.SUB) self.hwm = hwm for receiver in self.receivers.values(): receiver.set_hwm(self.hwm) receiver.setsockopt(zmq.CONNECT_TIMEOUT,max(timeout,0)) receiver.setsockopt(zmq.LINGER,0) receiver.setsockopt(zmq.RCVTIMEO,timeout) for sender in self.senders.values(): if relaxed: sender.setsockopt(zmq.REQ_RELAXED,True) sender.setsockopt(zmq.LINGER,0) sender.setsockopt(zmq.CONNECT_TIMEOUT,max(timeout,0)) sender.setsockopt(zmq.RCVTIMEO,timeout) sender.setsockopt(zmq.SNDTIMEO,timeout) for address in self._io_group_table.inv: send_address = 'tcp://' + address + ':' + self.cmdserver_port receive_address = 'tcp://' + address + ':' + self.dataserver_port self.senders[address].connect(send_address) self.receivers[address].connect(receive_address) self._sender_replies = defaultdict(list) self.poller = zmq.Poller() for receiver in self.receivers.values(): self.poller.register(receiver, zmq.POLLIN) self._raw_file_queue = multiprocessing.Queue() self.raw_filename = os.path.join( raw_directory, raw_filename if raw_filename is not None \ else time.strftime(self.default_raw_filename_fmt) ) self._launch_raw_file_worker()
[docs] def send(self, packets): ''' Sends a request message to PACMAN boards to send designated packets. ''' msg_packets = list() # group packets into messages destined for a single io group (otherwise 1pkt = 1msg) if self.group_packets_by_io_group: grouped_packets = self._group_by_attr(packets, 'io_group') for io_group, packets in grouped_packets.items(): msg_packets.append(packets) else: for packet in packets: msg_packets.append([packet]) # interleave across io group channels if self.interleave_packets_by_io_channel and self.group_packets_by_io_group: interleaved_msg_packets = list() for packets in msg_packets: interleaved_msg_packets.append(self._interleave_by_attr(packets, 'io_channel')) msg_packets = interleaved_msg_packets # double up sent packets to help avoid 512 bug if self.double_send_packets: doubled_msg_packets = list() for packets in msg_packets: for _ in range(2): doubled_msg_packets.append(packets) msg_packets = doubled_msg_packets # convert packets to messages resp_addresses = list() for packets in msg_packets: io_group = packets[0].io_group for i in range(0, len(packets), self.max_msg_length): #for packet in packets: print(packet) msg_len = min(len(packets)-i, self.max_msg_length) msg = pacman_msg_format.format(packets[i:i+msg_len], msg_type='REQ') address = self._io_group_table[io_group] self.senders[address].send(msg) self._sender_replies[address].append(self.senders[address].recv())
[docs] def start_listening(self): ''' Start keeping msgs from data server ''' if self.is_listening: raise RuntimeError('Already listening') super(PACMAN_IO, self).start_listening() for receiver in self.receivers.values(): receiver.setsockopt(zmq.SUBSCRIBE, b'')
[docs] def stop_listening(self): ''' Stop keeping msgs from data server ''' if not self.is_listening: raise RuntimeError('Already not listening') super(PACMAN_IO, self).stop_listening() for receiver in self.receivers.values(): receiver.setsockopt(zmq.UNSUBSCRIBE, b'')
@staticmethod def _group_by_attr(packets, attr): ''' Groups packets by the specified attribute returns a dict of attr_value: [<packets w/ attr=attr_value>] ''' groupings = defaultdict(list) for packet in packets: groupings[getattr(packet,attr)].append(packet) return groupings @staticmethod def _interleave_by_attr(packets, attr): ''' Interleaves packets by the specified attribute, e.g. by io channel:: in_packets = [<ch 1>, <ch 1>, ..., <ch 2>, <ch 2>, ..] _interleave_by_attr(in_packets, 'io_channel') # returns [<ch 1>, <ch 2>, <ch 1>, <ch 2>, ... , <ch 1>, <ch 1>] ''' groupings = PACMAN_IO._group_by_attr(packets, attr) zipped_packets = itertools.zip_longest(*groupings.values(), fillvalue=None) interleaved = list() for row in zipped_packets: for packet in row: if packet is not None: interleaved.append(packet) return interleaved
[docs] def empty_queue(self): ''' Fetch and parse waiting packets on pacman data socket returns tuple of list of packets, full bytestream of all messages ''' packets = [] address_list = list() bytestream_list = list() bytestream = b'' n_recv = 0 while self.poller.poll(0) and n_recv < self.hwm: events = dict(self.poller.poll(0)) for socket, n_events in events.items(): for _ in range(n_events): message = socket.recv() n_recv += 1 bytestream_list += [message] address_list += [self.receivers.inv[socket]] if not self.disable_packet_parsing: for message, address in zip(bytestream_list, address_list): packets += pacman_msg_format.parse(message, io_group=self._io_group_table.inv[address]) bytestream = b''.join(bytestream_list) if self.enable_raw_file_writing: self._raw_file_queue.put((bytestream_list, [self._io_group_table.inv[address] for address in address_list])) if not self._raw_file_worker.is_alive(): self._launch_raw_file_worker() return packets,bytestream
[docs] def cleanup(self): ''' Close the ZMQ objects to prevent a memory leak. This method is only required if you plan on instantiating a new ``PACMAN_IO`` object. ''' for address in self.senders.keys(): self.senders[address].close(linger=0) self.receivers[address].close(linger=0) self.context.term()
@staticmethod def _to_raw_file(queue_, filename, timeout=1, max_msgs=100000): start_time = time.time() while (time.time() < start_time + timeout or not queue_.empty()): # wait for data try: msgs, io_groups = queue_.get(timeout=timeout) except Empty: continue # buffer data while len(msgs) < max_msgs: try: new_msgs, new_io_groups = queue_.get(False) msgs.extend(new_msgs) io_groups.extend(new_io_groups) except Empty: break # write to file if len(msgs): rawhdf5format.to_rawfile(filename, msgs=msgs, msg_headers={'io_groups': io_groups}, io_version=pacman_msg_format.latest_version) start_time = time.time() def _launch_raw_file_worker(self): self._raw_file_worker = multiprocessing.Process(target=self._to_raw_file, args=(self._raw_file_queue, self.raw_filename)) self._raw_file_worker.start()
[docs] def join(self): ''' Wait for raw file worker to finish ''' self._raw_file_worker.join()
@property def raw_filename(self): return self._raw_filename @raw_filename.setter def raw_filename(self,value): if hasattr(self,'_raw_filename') \ and value != self._raw_filename \ and self._raw_file_worker.is_alive(): self.join() self._raw_filename = value
[docs] def set_reg(self, reg, val, io_group=None): ''' Set a 32-bit register in the pacman PL ''' if io_group is None: return dict([(io_group,self.set_reg(reg, val, io_group=io_group)) for io_group in self._io_group_table]) msg = pacman_msg_format.format_msg('REQ',[('WRITE',reg,val)]) addr = self._io_group_table[io_group] self.senders[addr].send(msg) self._sender_replies[addr].append(self.senders[addr].recv())
[docs] def get_reg(self, reg, io_group=None): ''' Read a 32-bit register from the pacman PL If no ``io_group`` is specified, returns a ``dict`` of ``io_group, reg_value`` else returns reg_value ''' if io_group is None: return dict([(io_group, self.get_reg(reg, io_group=io_group)) for io_group in self._io_group_table]) msg = pacman_msg_format.format_msg('REQ',[('READ',reg,0)]) addr = self._io_group_table[io_group] self.senders[addr].send(msg) self._sender_replies[addr].append(self.senders[addr].recv()) msg_data = pacman_msg_format.parse_msg(self._sender_replies[addr][-1]) if msg_data[1][0][0] == 'READ': return msg_data[1][0][-1] raise RuntimeError('Error received from server')
[docs] def ping(self, io_group=None): ''' Send a ping message If no ``io_group`` is specified, returns a ``dict`` of ``io_group, response`` else returns response ''' if io_group is None: return dict([(io_group, self.ping(io_group=io_group)) for io_group in self._io_group_table]) msg = pacman_msg_format.format_msg('REQ',[('PING',)]) addr = self._io_group_table[io_group] try: self.senders[addr].send(msg) self._sender_replies[addr].append(self.senders[addr].recv()) msg_data = pacman_msg_format.parse_msg(self._sender_replies[addr][-1]) if msg_data[1][0][0] == 'PONG': return True except zmq.ZMQError as e: print('IO error on {}: {}'.format(io_group,e)) return False
[docs] def get_vddd(self, io_group=None): ''' Gets PACMAN VDDD voltage Returns VDDD and IDDD values from the built-in ADC as a tuple of mV and mA respectively ''' if io_group is None: return dict([(io_group, self.get_vddd(io_group=io_group)) for io_group in self._io_group_table]) mv = self._adc2mv(self.get_reg(self._vddd_adc_reg, io_group=io_group)) ma = self._adc2ma(self.get_reg(self._iddd_adc_reg, io_group=io_group)) return mv, ma
[docs] def set_vddd(self, vddd_dac=0xD5A3, io_group=None, settling_time=0.1): ''' Sets PACMAN VDDD voltage If no ``vddd_dac`` value is specified, sets VDDD to default of ~1.8V Returns the resulting VDDD and IDDD values from the built-in ADC as a tuple of mV and mA respectively ''' if io_group is None: return dict([(io_group, self.set_vddd(vddd_dac, io_group=io_group)) for io_group in self._io_group_table]) self.set_reg(self._vddd_dac_reg, vddd_dac, io_group=io_group) if settling_time: time.sleep(settling_time) return self.get_vddd(io_group=io_group)
[docs] def get_vdda(self, io_group=None): ''' Gets PACMAN VDDA voltage Returns VDDA and IDDA values from the built-in ADC as a tuple of mV and mA respectively ''' if io_group is None: return dict([(io_group, self.get_vdda(io_group=io_group)) for io_group in self._io_group_table]) mv = self._adc2mv(self.get_reg(self._vdda_adc_reg, io_group=io_group)) ma = self._adc2ma(self.get_reg(self._idda_adc_reg, io_group=io_group)) return mv, ma
[docs] def set_vdda(self, vdda_dac=0xD5A3, io_group=None, settling_time=0.1): ''' Sets PACMAN VDDA voltage If no ``vdda_dac`` value is specified, sets VDDA to default of ~1.8V Returns the resulting VDDA and IDDA values from the built-in ADC as a tuple of mV and mA respectively ''' if io_group is None: return dict([(io_group, self.set_vdda(vdda_dac, io_group=io_group)) for io_group in self._io_group_table]) self.set_reg(self._vdda_dac_reg, vdda_dac, io_group=io_group) if settling_time: time.sleep(settling_time) return self.get_vdda(io_group=io_group)
[docs] def get_vplus(self, io_group=None): ''' Gets PACMAN Vplus voltage Returns Vplus and Iplus values from the built-in ADC as a tuple of mV and mA respectively ''' if io_group is None: return dict([(io_group, self.get_vplus(io_group=io_group)) for io_group in self._io_group_table]) mv = self._adc2mv(self.get_reg(self._vplus_adc_reg, io_group=io_group)) ma = self._adc2ma(self.get_reg(self._iplus_adc_reg, io_group=io_group)) return mv, ma
[docs] def enable_tile(self, tile_indices=None, io_group=None): ''' Enables the specified pixel tile(s) (first tile is index=0, second tile is index=1, ...). Returns the value of the new tile enable mask ''' if io_group is None: return dict([(io_group, self.enable_tile(tile_indices=tile_indices, io_group=io_group)) for io_group in self._io_group_table]) if tile_indices is None: tile_indices = list(range(8)) elif isinstance(tile_indices,int): tile_indices = [tile_indices] val = self.get_reg(self._base_ctrl_reg, io_group=io_group) for idx in tile_indices: val = val | (1 << idx) self.set_reg(self._base_ctrl_reg, val, io_group=io_group) return (self.get_reg(self._base_ctrl_reg, io_group=io_group) & 0xFF)
[docs] def disable_tile(self, tile_indices=None, io_group=None): ''' Disables the specified pixel tile(s) (first tile is index=0, second tile is index=1, ...). Returns the value of the new tile enable mask ''' if io_group is None: return dict([(io_group, self.disable_tile(tile_indices=tile_indices, io_group=io_group)) for io_group in self._io_group_table]) if tile_indices is None: tile_indices = list(range(8)) elif isinstance(tile_indices,int): tile_indices = [tile_indices] val = self.get_reg(self._base_ctrl_reg, io_group=io_group) for idx in tile_indices: val = val & (0xFFFFFFFF & ~(1 << idx)) self.set_reg(self._base_ctrl_reg, val, io_group=io_group) return (self.get_reg(self._base_ctrl_reg, io_group=io_group) & 0xFF)
[docs] def set_uart_clock_ratio(self, channel, ratio, io_group=None): ''' Sets PACMAN UART clock speed relative to the larpix master clock for the specified channel For a nominal 10MHz clock, a ratio value of 4 results in a 2.5MHz UART clock. Returns the value of the UART clock register that was set ''' if io_group is None: return dict([(io_group, self.set_uart_clock_ratio(channel, ratio, io_group=io_group)) for io_group in self._io_group_table]) reg = self._channel_size*channel + self._uart_clock_ratio_offset + self._channel_offset self.set_reg(reg, ratio, io_group=io_group) return self.get_reg(reg, io_group=io_group)
[docs] def reset_larpix(self, length=256, io_group=None): ''' Issues a reset of the specified length (in larpix MCLK cycles). If no ``length`` specified, issue a hard reset. Returns the value of the clock/reset control register after the reset ''' if io_group is None: return dict([(io_group, self.reset_larpix(length, io_group=io_group)) for io_group in self._io_group_table]) # set reset cycles self.set_reg(self._sw_reset_cycles_reg, length, io_group=io_group) # toggle reset bit clk_ctrl = self.get_reg(self._clk_ctrl_reg, io_group=io_group) self.set_reg(self._clk_ctrl_reg, clk_ctrl|4, io_group=io_group) self.set_reg(self._clk_ctrl_reg, clk_ctrl, io_group=io_group) return self.get_reg(self._clk_ctrl_reg, io_group=io_group)