Source code for larpix.io.fakeio

'''
A module for the FakeIO class.

'''
from __future__ import print_function
from collections import deque

from larpix.io import IO
from larpix import TimestampPacket

class FakeIO(IO):
    '''
    An IO stand-in that sends output to stdout (i.e. print) and reads
    input from a data member that can be set in advance.

    The queue is implemented as a ``collections.deque`` object. Data can
    be queued up in advance through repeated calls to
    ``queue.append()``. The first element of the queue will be passed on
    to the ``Controller.read`` method each time it is called. This is a
    true queue, i.e. first-in, first-out.

    The format for an element of the queue is a tuple:
    ``([list_of_Packets], b'corresponding bytes')``.

    Although meaningless in terms of the internal implementation,
    ``FakeIO`` objects contain an internal state determining whether the
    object is currently "listening," and will raise a ``RuntimeError``
    if ``empty_queue`` is called when the object is not listening.

    '''

    def __init__(self):
        super(FakeIO, self).__init__()
        # FIFO of pre-loaded ``(packets, bytestream)`` tuples handed out
        # one at a time by ``empty_queue``.
        self.queue = deque()
        # Record of every ``packets`` argument passed to ``send``, in
        # call order.
        self.sent = []

    @classmethod
    def encode(cls, packets):
        '''
        Placeholder for packet encoding

        :returns: ``packets``, unchanged

        '''
        return packets

    @classmethod
    def decode(cls, msgs):
        '''
        Placeholder for message decoding

        :returns: ``msgs``, unchanged

        '''
        return msgs

    @staticmethod
    def add_timestamps(packets, positions, timestamps=0):
        '''
        Insert timestamp packets into a list of packets in the given
        positions.

        Convenience method for modifying lists of packets to add to the
        FakeIO queue. Modifies the list in-place and returns ``None``.

        The positions are with respect to the indexes of the original
        list, so that the inserted element is just before the element
        that originally had that index. ``positions`` may be given in
        any order. For example, after

        >>> packets = [a, b, c, d]
        >>> FakeIO.add_timestamps(packets, [1, 3])

        ``packets`` is
        ``[a, TimestampPacket(...), b, c, TimestampPacket(...), d]``.

        If ``timestamps`` is a list, those timestamps will be used for
        the TimestampPackets, paired element-by-element with
        ``positions`` (extra entries on either side are ignored). If it
        is an int, it will be taken as the starting time, and each
        subsequent packet will be incremented by 1. A default starting
        time of 0 is assumed.

        :param packets: list of packets to modify in-place

        :param positions: indexes (into the original list) before which
            a timestamp packet is inserted

        :param timestamps: list of timestamps, or int starting time

        '''
        if isinstance(timestamps, int):
            timestamps = range(timestamps, timestamps + len(positions))
        insertions = list(zip(positions, timestamps))
        # Insert from the highest position downwards so that an
        # insertion never shifts an index that is still to be used.
        # Reversing first and then relying on the stable sort keeps the
        # historical ordering for duplicate positions and for
        # already-ascending input, while also making unsorted
        # ``positions`` honour the "original index" contract.
        insertions.reverse()
        insertions.sort(key=lambda pair: pair[0], reverse=True)
        for position, timestamp in insertions:
            packets.insert(position, TimestampPacket(timestamp))

    def send(self, packets):
        '''
        Print the packets to stdout and record them in ``self.sent``.

        :param packets: iterable of packets; each one is printed on its
            own line, then the ``packets`` object itself is appended to
            ``self.sent``

        '''
        for packet in packets:
            print(packet)
        self.sent.append(packets)

    def start_listening(self):
        '''
        Mock-up of beginning to listen for new packets.

        Delegates to the parent class implementation.

        '''
        super(FakeIO, self).start_listening()

    def stop_listening(self):
        '''
        Mock-up of no longer listening for new packets.

        Delegates to the parent class implementation.

        '''
        super(FakeIO, self).stop_listening()

    def empty_queue(self):
        '''
        Read and remove the next item from the internal queue and return
        it as if it were data that had just been read.

        :returns: the oldest queued element, or ``([], b'')`` if the
            queue is empty

        :raises RuntimeError: if the object is not currently listening

        '''
        if not self.is_listening:
            raise RuntimeError('Cannot empty queue when not'
                    ' listening')
        if self.queue:
            return self.queue.popleft()
        # Nothing queued: report an empty read.
        return [], b''