# Source code for larpix.io.serialport

'''
The serial port IO interface.

'''
from __future__ import absolute_import
import time
import os
import platform
import warnings

from larpix.io import IO
from larpix import Packet
import larpix.bitarrayhelper as bah

class SerialPort(IO):
    '''
    Wrapper for various serial port interfaces across platforms.

    Automatically loads correct driver based on the supplied port name:

        - ``'/dev/anything'`` ==> Linux ==> pySerial
        - ``'scan-ftdi'`` ==> MacOS ==> libFTDI

    A port name of ``'test'`` selects a fake serial port taken from the
    test suite, and ``'auto'`` (or ``None``) triggers a platform-based
    guess (see ``_guess_port``).

    '''
    # Guesses for default port name by platform
    _default_port_map = {
        # Used by _guess_port when platform.system() is not a key here
        'Default': ['/dev/ttyUSB2', '/dev/ttyUSB1'],
        'Linux': ['/dev/serial0', '/dev/ttyAMA0', '/dev/ttyUSB2',
                  '/dev/ttyUSB1', '/dev/ttyUSB0'],
        'Darwin': ['scan-ftdi'],  # OS X
    }
    # Shared data logger; installed by the module-level enable_logger()
    _logger = None
    # Framing bytes wrapped around every data packet on the wire
    start_byte = b'\x73'
    stop_byte = b'\x71'
    # Maximum number of bytes handed to a single write / read call
    max_write = 250
    # Size in bytes of one framed packet (start byte + payload + stop byte)
    fpga_packet_size = 10
    # empty_queue stops draining once this many bytes have been collected
    hwm = 10000

    def __init__(self, port=None, baudrate=1000000, timeout=0):
        '''
        Create the interface and its low-level serial connection.

        :param port: port name, ``'auto'``, ``'test'`` or ``None``
            (``None`` guesses a port for the current platform)
        :param baudrate: baud rate passed to the serial driver
        :param timeout: read timeout passed to pySerial

        '''
        super(SerialPort, self).__init__()
        if port is None:
            port = self._guess_port()
        self.port = port
        self.resolved_port = ''
        self.port_type = ''
        self.baudrate = baudrate
        self.timeout = timeout
        self.max_write = 250
        self.serial_com = None
        self._initialize_serial_com()
        # The logger is captured once, at construction time: instances
        # created before enable_logger() is called keep logger = None.
        self.logger = self._logger
        # Trailing bytes of a partially received packet, kept between
        # calls to empty_queue
        self.leftover_bytes = b''

    @staticmethod
    def _format_UART(packet):
        '''Frame ``packet.bytes()`` with the start and stop bytes.'''
        return (SerialPort.start_byte
                + packet.bytes()
                + SerialPort.stop_byte)

    @staticmethod
    def _parse_input(bytestream, leftover_bytes=b''):
        '''
        Split a raw bytestream into packets.

        ``leftover_bytes`` (from a previous call) is prepended to
        ``bytestream`` before parsing. A packet is recognised when a
        start byte is followed ``fpga_packet_size - 1`` bytes later by
        a stop byte; anything that does not fit that frame is skipped
        up to the next start byte.

        :returns: ``(packets, leftover)`` where ``leftover`` is the
            unparsed tail to pass into the next call

        '''
        packet_size = SerialPort.fpga_packet_size
        # Indexing a bytes object yields an int on Python 3 and a
        # 1-character str on Python 2; either form compares correctly
        # against bytestream[index] and is accepted by find().
        start_byte = SerialPort.start_byte[0]
        stop_byte = SerialPort.stop_byte[0]
        byte_packets = []
        bytestream = leftover_bytes + bytestream
        bytestream_len = len(bytestream)
        last_possible_start = bytestream_len - packet_size
        index = 0
        while index <= last_possible_start:
            if (bytestream[index] == start_byte
                    and bytestream[index+packet_size-1] == stop_byte):
                # Payload is everything between the two framing bytes
                payload = bytestream[index+1:index+packet_size-1]
                byte_packets.append(Packet(payload))
                index += packet_size
            else:
                # Throw out everything between here and the next start
                # byte. Note: start searching after the current byte in
                # case it's already a start byte
                index = bytestream.find(start_byte, index+1)
                if index == -1:
                    index = bytestream_len
        # index never exceeds bytestream_len here, so this slice is the
        # (possibly empty) unparsed tail
        return byte_packets, bytestream[index:]

    @staticmethod
    def format_bytestream(formatted_packets):
        '''
        Concatenate framed packets into chunks of at most ``max_write``
        bytes (using the class-level ``max_write``).

        The returned list always holds at least one bytestream; it is a
        single empty one when ``formatted_packets`` is empty.

        '''
        bytestreams = []
        current_bytestream = bytes()
        for packet in formatted_packets:
            if len(current_bytestream) + len(packet) > SerialPort.max_write:
                # Chunk is full: flush it and start a new one
                bytestreams.append(current_bytestream)
                current_bytestream = bytes()
            current_bytestream += packet
        bytestreams.append(current_bytestream)
        return bytestreams

    @classmethod
    def encode(cls, packets):
        '''
        Encodes a list of packets into a list of bytestream messages

        '''
        return [SerialPort._format_UART(packet) for packet in packets]

    @classmethod
    def decode(cls, msgs, leftover_bytes=b''):
        '''
        Decodes a list of serial port bytestreams to packets

        The leftover bytes of each message are carried into the parsing
        of the next one. Every decoded packet gets ``io_channel = 1``
        and ``io_group = 1``.

        :returns: ``(packets, leftover_bytes)``

        '''
        packets = []
        for msg in msgs:
            parsed, leftover_bytes = SerialPort._parse_input(
                msg, leftover_bytes)
            packets.extend(parsed)
        for packet in packets:
            packet.io_channel = 1
            packet.io_group = 1
        return packets, leftover_bytes

    @classmethod
    def is_valid_chip_key(cls, key):
        '''
        Valid chip keys must be strings formatted as:
        ``'<io_chain>-<chip_id>'``

        '''
        # Name the class explicitly: super(cls, cls) would resolve back
        # to this very method when called through a subclass and
        # recurse forever.
        if not super(SerialPort, cls).is_valid_chip_key(key):
            return False
        if not isinstance(key, str):
            return False
        parsed_key = key.split('-')
        if len(parsed_key) != 2:
            return False
        try:
            int(parsed_key[0])
            int(parsed_key[1])
        except ValueError:
            return False
        return True

    def send(self, packets):
        '''
        Format the packets as a bytestream and send it to the FPGA and on to
        the LArPix ASICs.

        '''
        packet_bytes = self.encode(packets)
        for bytestream in self.format_bytestream(packet_bytes):
            self._write(bytestream)

    def start_listening(self):
        '''
        Start listening for incoming LArPix data by opening the serial
        port.

        '''
        super(SerialPort, self).start_listening()
        self._open()

    def stop_listening(self):
        '''
        Stop listening for LArPix data by closing the serial port.

        '''
        super(SerialPort, self).stop_listening()
        self._close()

    def empty_queue(self):
        '''
        Empty the incoming data buffer and return ``(packets,
        bytestream)``.

        Reads ``max_write`` bytes at a time and stops at the first
        short read, or once ``hwm`` bytes have been collected.

        '''
        data_in = b''
        keep_reading = True
        while keep_reading:
            new_data = self._read(self.max_write)
            data_in += new_data
            # A full read suggests more data may be waiting
            keep_reading = (len(new_data) == self.max_write
                            and len(data_in) < self.hwm)
        packets, self.leftover_bytes = self.decode(
            [data_in], leftover_bytes=self.leftover_bytes)
        return (packets, data_in)

    def set_larpix_uart_clk_ratio(self, value):
        '''
        Sends a special command to modify the larpix uart clk ratio (how many
        clock cycles correspond to one bit). A value of 2 means 1 uart bit == 2
        clk cycles

        '''
        data_out = (
            b'c'  # start byte
            + b'\x00'  # address
            + bah.fromuint(value, 8, endian='big').tobytes()
            + b'\x00'*6  # unused
            + b'q'  # stop byte
        )
        self._write(data_out)

    def set_larpix_reset_cnt(self, value):
        '''
        Sends a special command to modify the length of the reset signal
        sent to the larpix chips. The reset will be held low for value + 1
        larpix clk rising edges

        '''
        data_out = (
            b'c'  # start byte
            + b'\x01'  # address
            + bah.fromuint(value, 8, endian='big').tobytes()
            + b'\x00'*6  # unused
            + b'q'  # stop byte
        )
        self._write(data_out)

    def larpix_reset(self):
        '''
        Sends a special command to issue a larpix reset pulse. Pulse
        length is set by set_larpix_reset_cnt().

        '''
        data_out = (
            b'c'  # start byte
            + b'\x02'  # address
            + b'\x00'*7  # unused
            + b'q'  # stop byte
        )
        self._write(data_out)

    def set_utility_pulse(self, pulse_len=None, pulse_rep=None):
        '''
        Sends a special command to set up the utility pulser. Pulse
        length is the number of larpix clk cycles the pulse is high, and
        pulse rep is the number of clk cycles until the next pulse.

        :raises RuntimeError: if neither ``pulse_len`` nor ``pulse_rep``
            is given

        '''
        if pulse_len is None and pulse_rep is None:
            raise RuntimeError('set either or both of pulse_len and pulse_rep')
        data_out = b''
        if pulse_len is not None:
            data_out += (
                b'c'  # start byte
                + b'\x03'  # address
                # -2 for proper register value -> clk cycles conv.;
                # [::-1] reverses the byte order of the 32-bit value
                + bah.fromuint(max(pulse_len-2, 0), 32,
                               endian='big').tobytes()[::-1]
                + b'\x00'*3  # unused
                + b'q'  # stop byte
            )
        if pulse_rep is not None:
            data_out += (
                b'c'  # start byte
                + b'\x04'  # address
                # -1 for proper register value -> clk cycles conv.
                + bah.fromuint(max(pulse_rep-1, 0), 32,
                               endian='big').tobytes()[::-1]
                + b'\x00'*3  # unused
                + b'q'  # stop byte
            )
        self._write(data_out)

    def enable_utility_pulse(self):
        '''
        Sends a special command to enable the utility pulser. Pulse
        characteristics can be set by set_utility_pulse().

        '''
        data_out = (
            b'c'  # start byte
            + b'\x05'  # address
            + b'\x01'  # enable
            + b'\x00'*6  # unused
            + b'q'  # stop byte
        )
        self._write(data_out)

    def disable_utility_pulse(self):
        '''
        Sends a special command to disable the utility pulser. Pulse
        characteristics can be set by set_utility_pulse().

        '''
        data_out = (
            b'c'  # start byte
            + b'\x05'  # address
            + b'\x00'  # disable
            + b'\x00'*6  # unused
            + b'q'  # stop byte
        )
        self._write(data_out)

    def reset(self):
        '''
        Sends a special command to reset FPGA and larpix.

        '''
        data_out = (
            b'c'  # start byte
            + b'\x06'  # address
            + b'\x00'*7  # unused
            + b'q'  # stop byte
        )
        self._write(data_out)

    @classmethod
    def _guess_port(cls):
        '''
        Guess at correct port name based on platform

        :raises OSError: if no candidate device is found

        '''
        platform_name = platform.system()
        if platform_name not in cls._default_port_map:
            platform_name = 'Default'
        osx_cmd = 'system_profiler SPUSBDataType | grep -C 7 FTDI | grep Serial'
        marker = 'Serial Number:'
        for default_dev in cls._default_port_map[platform_name]:
            if default_dev.startswith('/dev'):  # pyserial
                try:
                    os.stat(default_dev)  # raises OSError if missing
                except OSError:
                    continue
                return default_dev
            elif default_dev == 'scan-ftdi':
                if platform_name == 'Darwin':  # scan for pylibftdi on OS X
                    # Scan for FTDI devices; always close the pipe
                    pipe = os.popen(osx_cmd)
                    try:
                        result = pipe.read()
                    finally:
                        pipe.close()
                    idx = result.find(marker)
                    if idx == -1:
                        # No serial number in the output: nothing usable
                        continue
                    start = idx + len(marker)
                    dev_name = result[start:start+10].strip()
                    print('Autoscan found FTDI device: "%s"' % dev_name)
                    return dev_name
            else:  # assume pylibftdi
                return default_dev
        raise OSError('Cannot find serial device for platform: %s' %
                      platform_name)

    def _ready_port(self):
        '''Function handle.  Will be reset to appropriate method'''
        raise NotImplementedError('Serial port type has not been defined.')

    def _ready_port_pyserial(self):
        '''Ready a pyserial port'''
        if not self.serial_com.is_open:
            # Open, if necessary
            self.serial_com.open()

    def _ready_port_pylibftdi(self):
        '''Ready a pylibftdi port'''
        if self.serial_com.closed:
            # Open port
            self.serial_com.open()
        # Confirm baudrate (Required for OS X)
        self._confirm_baudrate()

    def _ready_port_test(self):
        '''Ready the fake test port (nothing to do)'''
        return True

    def _confirm_baudrate(self):
        '''Check and set the baud rate'''
        if self.serial_com.baudrate != self.baudrate:
            # Reset baudrate
            self.serial_com.baudrate = self.baudrate

    def _initialize_serial_com(self):
        '''
        Initialize the low-level serial com connection

        Resolves the port name and type, binds ``_ready_port`` to the
        matching implementation and creates ``serial_com``. Driver
        modules are imported lazily so only the one in use is required.

        :raises ValueError: if the port type is not recognised

        '''
        self.resolved_port = self._resolve_port_name()
        self.port_type = self._resolve_port_type()
        if self.port_type == 'pyserial':
            self._ready_port = self._ready_port_pyserial
            import serial
            self.serial_com = serial.Serial(self.resolved_port,
                                            baudrate=self.baudrate,
                                            timeout=self.timeout)
        elif self.port_type == 'pylibftdi':
            self._ready_port = self._ready_port_pylibftdi
            import pylibftdi
            self.serial_com = pylibftdi.Device(self.resolved_port)
        elif self.port_type == 'test':
            self._ready_port = self._ready_port_test
            import test.test_larpix as test_lib
            self.serial_com = test_lib.FakeSerialPort()
        else:
            raise ValueError('Port type must be either pyserial, pylibftdi, or test')

    def _resolve_port_name(self):
        '''
        Resolve the serial port name, based on user request

        :raises ValueError: if ``self.port`` is ``None``

        '''
        if self.port is None:
            # Must set port
            raise ValueError('You must choose a serial port for operation')
        if self.port == 'auto':
            # Try to guess the correct port
            return self._guess_port()
        return self.port

    def _resolve_port_type(self):
        '''
        Resolve the type of serial port, based on the name

        :raises ValueError: if the resolved port name is not a ``str``

        '''
        if isinstance(self.resolved_port, str):
            if self.resolved_port.startswith('/dev'):
                # Looks like a tty device.  Use pyserial.
                return 'pyserial'
            if self.resolved_port == 'test':
                # Testing port.  Don't use an external library
                return 'test'
            # Looks like a libftdi raw device.  Use pylibftdi.
            return 'pylibftdi'
        raise ValueError('Unknown port: %s' % self.port)

    def _open(self):
        '''Open the port'''
        self._ready_port()

    def _close(self):
        '''Close the port'''
        if self.serial_com is None:
            return
        self.serial_com.close()

    def _write(self, data):
        '''
        Write data to serial port

        The port is closed again afterwards unless ``self.is_listening``
        is set (that attribute is not assigned in this class; presumably
        it is maintained by the ``IO`` base class -- confirm there).

        '''
        self._ready_port()
        write_time = time.time()
        self.serial_com.write(data)
        if not self.is_listening:
            self._close()
        if self.logger:
            self.logger.record({'data_type':'write','data':data,'time':write_time})

    def _read(self, nbytes):
        '''Read up to ``nbytes`` of data from serial port'''
        self._ready_port()
        read_time = time.time()
        data = self.serial_com.read(nbytes)
        if self.logger:
            self.logger.record({'data_type':'read','data':data,'time':read_time})
        return data
def enable_logger(filename=None):
    '''
    Enable serial data logger

    Creates the shared ``SerialPort._logger`` from ``filename`` on
    first use, then enables it if it is not already enabled.

    '''
    logger = SerialPort._logger
    if logger is None:
        # Imported lazily so the datalogger module is only needed when
        # logging is actually requested
        from larpix.serial_helpers.datalogger import DataLogger
        logger = DataLogger(filename)
        SerialPort._logger = logger
    if logger.is_enabled():
        return
    logger.enable()
def disable_logger():
    '''
    Disable serial data logger

    Does nothing when no logger has been created yet.

    '''
    logger = SerialPort._logger
    if logger is None:
        return
    logger.disable()
def flush_logger():
    '''
    Flush serial data logger data to output file

    Does nothing when no logger has been created yet.

    '''
    logger = SerialPort._logger
    if logger is None:
        return
    logger.flush()
def _test_serial_loopback(port_name='auto', enable_logging=False):
    '''
    Write stream of integers to serial port.  Read back and see if
    loopback data is correct.

    Runs 10 iterations of 256 bytes each and prints the outcome of
    every iteration.

    :param port_name: port name handed to ``SerialPort``
    :param enable_logging: if true, call ``enable_logger()`` first
    :returns: number of iterations whose read-back data differed from
        what was written

    '''
    baudrate = 1000000
    timeout = 0.1
    if enable_logging:
        enable_logger()
    # Pass the settings to the constructor so they reach the underlying
    # driver; previously the timeout was defined but never used.
    serial_port = SerialPort(port_name, baudrate=baudrate, timeout=timeout)
    print(' serial baudrate:',serial_port.baudrate)
    test_length = 256
    n_errors = 0
    max_read_length = 8192
    serial_port._open()
    try:
        for iter_idx in range(10):
            first = iter_idx*10
            write_bits = bytearray(
                elem % 256 for elem in range(first, first+test_length))
            serial_port._write(write_bits)
            read_bits = b'' + serial_port._read(max_read_length)
            print("Testing:" + str([write_bits,]))
            # Compare the raw bytes: on Python 3 str(bytearray) and
            # str(bytes) differ even for identical content.
            if bytes(write_bits) != bytes(read_bits):
                print(" Error:")
                print(" wrote: ", str(write_bits))
                print(" read: ", str(read_bits))
                print(" read_bytes / wrote_bytes: %d / %d " % (len(read_bits),
                                                               test_length))
                n_errors += 1
            else:
                print(' OK')
    finally:
        # Release the port even if a write or read raised
        serial_port._close()
    return n_errors


if '__main__' == __name__:
    _test_serial_loopback()