Source code for neural_data_simulator.core.inputs

"""A collection of inputs that can be used by NDS."""
import abc
from dataclasses import dataclass
from dataclasses import field
from dataclasses import InitVar
import logging
import math
import time
from typing import List, Optional

import numpy as np
import pylsl

from neural_data_simulator.core.samples import Samples


[docs]class Input(abc.ABC): """Represents an input that can be used to consume data from. This can be an interface for a joystick, a behavior data generator, a data streamer that loads data from disk, etc. Each `read` should return all newly available data since the last `read` call. """
[docs] @abc.abstractmethod def read(self) -> Samples: """Read available data.""" pass
[docs] @abc.abstractmethod def connect(self) -> None: """Connect to input.""" pass
[docs] def disconnect(self) -> None: """Disconnect from input. The default implementation does nothing.""" pass
[docs]class SamplesInput(Input): """An input object based on :class:`neural_data_simulator.core.samples.Samples`. The underlying samples dataclass will have its timestamps modified to be in reference to when the first read was made from this class, simulating the appearance of data being collected in real-time. Alternatively, the function `set_reference_time_to_now` can be called prior to the first `read` of the data to use that as a reference time. A timer is synced between the reference time and the first timestamp in the input samples. Any calls to the `read` function will calculate the current time in reference to the synced timer and return the appropriate samples. """
[docs] def __init__(self, input_samples: Samples) -> None: """Initialize the SamplesInput class. Args: input_samples: Dataclass containing timestamps and behavior data. """ self._input_samples = input_samples self._logger = logging.getLogger(__name__) self._index_next_sample_to_read = 0 self._has_reference_time = False self._last_time_streamed: Optional[float] = None self._time_reference: Optional[float] = None
[docs] def set_reference_time_to_now(self): """Set current time as starting time for data stream.""" self._initialize_streaming_time() self._set_reference_time() self._has_reference_time = True
[docs] def read(self) -> Samples: """Get new samples from the time of last read. If first call to `read` samples will be read since the call to `set_reference_time_to_now`. If `set_reference_time_to_now` was not previously called, it will be called. Returns: :class:`neural_data_simulator.core.samples.Samples` dataclass with timestamps and data available since last `read` call. """ if not self._has_reference_time: self._logger.warning( "Reference time not calculated before first read call." " Calculating now." ) self.set_reference_time_to_now() index_current_sample = self._get_current_sample_index() output_samples = self._get_samples_from_last_read_to_index(index_current_sample) self._index_next_sample_to_read = index_current_sample + 1 return output_samples
def _get_current_sample_index(self) -> int: """Find sample index corresponding to current time. Get index of sample representing current time relative to input samples time. """ time_now = self._get_referenced_time() return int( np.searchsorted(self._input_samples.timestamps, time_now, side="right") - 1 ) def _get_samples_from_last_read_to_index(self, index: int) -> Samples: timestamps = self._input_samples.timestamps[ self._index_next_sample_to_read : index + 1 ] data = self._input_samples.data[self._index_next_sample_to_read : index + 1] return Samples(timestamps, data) def _initialize_streaming_time(self) -> None: self._last_time_streamed = time.time() def _set_reference_time(self) -> None: """Set reference time. Reference time is set as the time difference from `set_reference_time_to_now` call to first timestamp in the input samples. This correction is used to convert real time elapsed to time elapsed relative to the samples first timestamp. """ if self._last_time_streamed: self._time_reference = ( self._last_time_streamed - self._input_samples.timestamps[0] ) else: raise ValueError("Streaming time not initialized") def _get_referenced_time(self) -> float: """Get current time in the input samples reference.""" if self._time_reference: return time.time() - self._time_reference else: raise ValueError("Reference time not initialized")
[docs] def connect(self) -> None: """No action required during connect for this class.""" pass
[docs]@dataclass class StreamInfo: """Selected advertised properties of an LSL stream.""" name: str = field(init=False) """Name of the LSL stream.""" sample_rate: float = field(init=False) """Advertised sample rate of the LSL stream.""" channel_count: int = field(init=False) """Number of channels in the LSL stream.""" lsl_stream_info: InitVar[pylsl.stream_info] """pylsl stream info object.""" def __post_init__(self, lsl_stream_info: pylsl.stream_info): """Actually init members from stream info.""" self.name = lsl_stream_info.name() self.sample_rate = lsl_stream_info.nominal_srate() self.channel_count = lsl_stream_info.channel_count()
LSL_DTYPES = [ [], np.float32, np.float64, None, np.int32, np.int16, np.int8, np.int64, ]
[docs]class LSLInput(Input): """Represents an LSL Inlet stream for behavior data."""
[docs] def __init__( self, stream_name: str, connection_timeout: float = 60.0, resolve_streams_wait_time: float = 1.0, ): """Initialize LSLInput class. Args: stream_name: Name of the LSL stream to retrieve data from. connection_timeout: Maximum time for attempting a connection to an LSL input stream. resolve_streams_wait_time: Maximum waiting time to get the list of available streams. Should be bigger than 0.5 to ensure all streams are returned. """ self._stream_name = stream_name self._stream_info: Optional[pylsl.StreamInfo] = None self._inlet: Optional[pylsl.StreamInlet] = None self._connection_timeout = connection_timeout self._resolve_streams_wait_time = resolve_streams_wait_time self._logger = logging.getLogger(__name__)
[docs] def get_info(self) -> StreamInfo: """Get information about the LSL stream. If the stream is not connected, it will try to resolve the stream and return the information. Returns: LSL stream properties. Raises: ValueError: If the stream is not found. """ if self._inlet is not None: return StreamInfo(self._inlet.info()) results = pylsl.resolve_stream("name", self._stream_name) if (results is not None) and (len(results) > 0): return StreamInfo(results[0]) raise ValueError("Stream not found")
def _check_connection(self): if self._inlet is None: raise ConnectionError( "LSL StreamInlet is not connected, ensure you run connect before read." )
[docs] def read(self) -> Samples: """Read available data from the inlet as a samples. Returns: :class:`neural_data_simulator.core.samples.Samples` dataclass with timestamps and data read from the LSL StreamInlet. If no data is available, an empty Samples is returned. Raises: ValueError: LSL StreamInlet is not connected. `connect` should be called before `read`. """ self._check_connection() assert self._inlet is not None _, timestamps = self._inlet.pull_chunk( timeout=0.0, max_samples=self.buffer.shape[0], dest_obj=self.buffer ) if timestamps: data = self.buffer[: len(timestamps), :] return Samples(np.array(timestamps), np.array(data)) return Samples.empty_samples()
[docs] def set_connection_timeout(self, timeout: float) -> None: """Set the maximum time that the inlet search for the desired LSL stream. Args: timeout: Maximum time to wait in seconds. Raises: ValueError: if timeout equals or less than 0. """ if timeout > 0: self._connection_timeout = timeout else: raise ValueError(f"Timeout must be greater than 0, received {timeout}")
[docs] def connect(self): """Connect to the LSL Inlet stream.""" self._logger.info("Connecting to device...") self._stream_info = self._get_stream() # could verify # of channels and sampling rate if it exists in a config file self._logger.info(f"Found stream: '{self._stream_name}'") self._inlet = LSLInput._create_inlet(self._stream_info) self._logger.info(f"Connected to LSL input stream: '{self._stream_name}'") bufsize = ( 200 * math.ceil(self._stream_info.nominal_srate()), self._stream_info.channel_count(), ) self.buffer = np.empty( bufsize, dtype=LSL_DTYPES[self._stream_info.channel_format()] )
[docs] def disconnect(self): """Disconnect from the LSL Inlet stream.""" self._logger.info("Disconnecting LSL input stream...") del self._inlet self._inlet = None self._stream_info = None
def _get_stream(self) -> pylsl.StreamInfo: """Resolve the first LSL source stream with the provided stream name. Returns: pylsl.StreamInfo object of the first stream with the provided name. """ stream_infos = pylsl.resolve_byprop( "name", self._stream_name, timeout=self._connection_timeout ) if len(stream_infos) > 0: available_streams = pylsl.resolve_streams( wait_time=self._resolve_streams_wait_time ) streams_with_target_name = [ stream for stream in available_streams if stream.name() == self._stream_name ] if len(streams_with_target_name) > 1: raise Exception(f"Multiple streams with same name {self._stream_name}") return stream_infos[0] else: available_stream_names = [ info.name for info in self._get_available_streams() ] raise ConnectionError( f"Did not find a {self._stream_name} LSL stream.\n" f"The available streams are: {available_stream_names}" ) def _get_available_streams(self) -> List[StreamInfo]: stream_infos = pylsl.resolve_streams(wait_time=self._resolve_streams_wait_time) return [StreamInfo(lsl_stream_info) for lsl_stream_info in stream_infos] @staticmethod def _create_inlet(stream_info: pylsl.StreamInfo) -> pylsl.StreamInlet: """Create and return an LSL StreamInlet given a StreamInfo.""" return pylsl.StreamInlet( info=stream_info, processing_flags=pylsl.proc_clocksync | pylsl.proc_dejitter, )