Source code for bluepyefe.recording

from __future__ import annotations

"""Recording class"""

"""
Copyright (c) 2022, EPFL/Blue Brain Project

 This file is part of BluePyEfe <https://github.com/BlueBrain/BluePyEfe>

 This library is free software; you can redistribute it and/or modify it under
 the terms of the GNU Lesser General Public License version 3.0 as published
 by the Free Software Foundation.

 This library is distributed in the hope that it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 details.

 You should have received a copy of the GNU Lesser General Public License
 along with this library; if not, write to the Free Software Foundation, Inc.,
 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
from abc import ABC, abstractmethod
import logging
import numpy
import efel
import matplotlib.pyplot as plt
from pathlib import Path

from .tools import to_ms, to_mV, to_nA, set_efel_settings

logger = logging.getLogger(__name__)


[docs] class Recording(ABC): """Contains the data related to an electrophysiological recording.""" def __init__(self, config_data, reader_data, protocol_name): """ Constructor Args: config_data (dict): metadata for the recording considered informed by the user. reader_data (dict): metadata for the recording considered returned by the recording reader. protocol_name (str): name of the protocol of the present recording. """ self.config_data = config_data self.reader_data = reader_data self.protocol_name = protocol_name if "filepath" in config_data: self.files = [config_data["filepath"]] elif "i_file" in config_data: self.files = [config_data["i_file"], config_data["v_file"]] else: self.files = [] self.id = reader_data.get("id", None) self.location = None self.efeatures = {} self.t = None self.current = None self.voltage = None self.amp = None self.hypamp = None self.repetition = None if len(reader_data): self.t, self.current, self.voltage, self.amp, self.hypamp = self.standardize_trace( config_data, reader_data ) self.export_attr = None self.auto_threshold = None self.peak_time = None @property def name(self): """Proxy that can be used to name the recording.""" if self.id: return self.id if "filepath" in self.config_data and self.config_data["filepath"] is not None: return str(Path(self.config_data["filepath"]).stem) if "v_file" in self.config_data: return str(Path(self.config_data["v_file"]).stem) return "" @property def time(self): """Alias of the time attribute""" return self.t @time.setter def time(self, value): """Setter for an alias of the time attribute""" self.t = value @property def spikecount(self) -> int | None: if self.peak_time is None: return None else: return len(self.peak_time)
[docs] def set_timing_ecode(self, name_timings, config_data): """Used by some of the children classes to check that the timing of the ecode is provided by the user and assign it to the correct attribute.""" for timing in name_timings: if timing in config_data and config_data[timing] is not None: setattr(self, timing, int(round(config_data[timing] / self.dt))) else: raise AttributeError( f"For protocol {self.protocol_name}, {timing} should" "be specified in the config (in ms)." )
[docs] def set_amplitudes_ecode(self, amp_name, config_data, reader_data, value): """Check in the user-provided data or reader-provided data if a given current amplitude is provided. If it isn't use the value computed from the current series""" unit = config_data.get("i_unit") or reader_data.get("i_unit") if unit is None: raise Exception(f"Current unit not configured for file {self.files}") if amp_name in config_data and config_data[amp_name] is not None: amp = to_nA(config_data[amp_name], unit) elif amp_name in reader_data and reader_data[amp_name] is not None: amp = to_nA(reader_data[amp_name], unit) else: amp = value setattr(self, amp_name, amp)
[docs] def index_to_ms(self, name_timing, time_series): """Used by some of the children classes to translate a timing attribute from index to ms.""" setattr(self, name_timing, time_series[int(round(getattr(self, name_timing)))])
def ms_to_index(self, timing): return int(round(timing / self.dt))
[docs] def get_params(self): """Returns the eCode parameters""" return {attr: getattr(self, attr) for attr in self.export_attr}
[docs] @abstractmethod def interpret(self): """Analyse a current array and extract from it the parameters needed to reconstruct the array""" pass
[docs] @abstractmethod def generate(self): """Generate the current array from the parameters of the ecode""" pass
@abstractmethod def compute_relative_amp(self, amp_threshold): """Divide all the amplitude in the stimuli by the spiking amplitude""" pass @abstractmethod def in_target(self, target, tolerance): """Returns a boolean. True if the amplitude of the eCode is close to target and False otherwise.""" pass
[docs] def standardize_trace(self, config_data, reader_data): """Standardize the units of the current and voltage times series. If some metadata are present both in the file itself and the file_metadata dictionary, the latter is used.""" # Create the time series t = numpy.arange(len(reader_data["voltage"])) # Give it the correct sampling rate if "dt" in config_data and config_data["dt"] is not None: t = t * config_data["dt"] elif "dt" in reader_data and reader_data["dt"] is not None: t = t * reader_data["dt"] else: raise Exception( "Sampling rate not configured for " "file {}".format(self.files) ) # Convert it to ms if "t_unit" in config_data and config_data["t_unit"] is not None: t = to_ms(t, config_data["t_unit"]) elif "t_unit" in reader_data and reader_data["t_unit"] is not None: t = to_ms(t, reader_data["t_unit"]) else: raise Exception( "Time unit not configured for " "file {}".format(self.files) ) # Convert current to nA # Determine the unit to use unit = config_data.get("i_unit") or reader_data.get("i_unit") if unit: current = to_nA(reader_data.get("current", 0), unit) # Set amp - prioritize amp in config_data amp_source = config_data if "amp" in config_data else reader_data amp = to_nA(amp_source.get("amp", 0), unit) if "amp" in amp_source else None # Set hypamp - prioritize hypamp in config_data hypamp_source = config_data if "hypamp" in config_data else reader_data hypamp = to_nA(hypamp_source.get("hypamp", 0), unit) if "hypamp" in hypamp_source else None else: raise Exception( "Current unit not configured for " "file {}".format(self.files) ) # Convert voltage to mV if "v_unit" in config_data and config_data["v_unit"] is not None: voltage = to_mV(reader_data["voltage"], config_data["v_unit"]) elif "v_unit" in reader_data and reader_data["v_unit"] is not None: voltage = to_mV(reader_data["voltage"], reader_data["v_unit"]) else: raise Exception( "Voltage unit not configured for " "file {}".format(self.files) ) # Offset membrane potential to known value if "v_corr" in config_data and config_data["v_corr"] is not None: voltage = ( voltage - numpy.median(voltage[:100]) + config_data["v_corr"] ) # Correct for the liquid junction potential # WARNING: the ljp is informed as a positive float but we substract it # from the voltage if "ljp" in config_data and config_data["ljp"] is not None: voltage = numpy.array(voltage) - config_data["ljp"] if "repetition" in reader_data: self.repetition = reader_data["repetition"] return t, current, voltage, amp, hypamp
[docs] def call_efel(self, efeatures, efel_settings=None): """ Calls efel to compute the wanted efeatures """ if efel_settings is None: efel_settings = {} settings = {"stimulus_current": self.amp} if "Threshold" not in efel_settings and self.auto_threshold is not None: logger.warning(f"Threshold was not provided and was automatically" f" set to {self.auto_threshold}") settings["Threshold"] = self.auto_threshold for setting in efel_settings: if setting not in ['stim_start', 'stim_end']: settings[setting] = efel_settings[setting] stim_start = efel_settings.get('stim_start', self.ton) stim_end = efel_settings.get('stim_end', self.toff) # Special case for SpikeRec if "multiple_decay_time_constant_after_stim" in efeatures: if hasattr(self, "multi_stim_start") and hasattr(self, "multi_stim_end"): settings["multi_stim_start"] = self.multi_stim_start settings["multi_stim_end"] = self.multi_stim_end elif "stim_start" in efel_settings and "stim_end" in efel_settings: settings["multi_stim_start"] = [stim_start] settings["multi_stim_end"] = [stim_end] set_efel_settings(settings) efel_trace = { "T": self.t, "V": self.voltage, 'stim_start': [stim_start], 'stim_end': [stim_end] } if self.current is not None: efel_trace["I"] = self.current try: return efel.getFeatureValues( [efel_trace], efeatures, raise_warnings=False ) except TypeError as e: if "Unknown feature name" in str(e): str_f = " ".join(efeatures) raise Exception("One of the following feature name does not " f"exist in eFEL: {str_f}")
[docs] def compute_efeatures( self, efeatures, efeature_names=None, efel_settings=None ): """Compute a set of efeatures for the present recording. Args: efeatures (list of str): name of the efeatures to extract from the recordings. efeature_names (list of str): Optional. Given name for the features. Can and should be used if the same feature is to be extracted several time on different sections of the same recording. efel_settings (dict): eFEL settings in the form {setting_name: setting_value}. """ if efeature_names is None: efeature_names = efeatures for i, f in enumerate(efeatures): if efeature_names[i] is None: efeature_names[i] = f efel_vals = self.call_efel(efeatures, efel_settings) for efeature_name, efeature in zip(efeature_names, efeatures): if efel_vals[0][efeature] is not None: value = [v for v in efel_vals[0][efeature] if v is not None] else: value = [] if len(value) == 0 or numpy.isinf(numpy.nanmean(value)): self.efeatures[efeature_name] = numpy.nan else: self.efeatures[efeature_name] = numpy.nanmean(value)
[docs] def compute_spikecount(self, efel_settings=None): """Compute the number of spikes in the trace""" if not efel_settings: efel_settings = {} tmp_settings = {'strict_stiminterval': True} if efel_settings is not None: tmp_settings.update(efel_settings) # If the setting Threshold is not provided, tries to find it if tmp_settings.get("Threshold", None) is None: tmp_settings["Threshold"] = self.auto_threshold efel_vals = self.call_efel(['peak_time'], tmp_settings) self.peak_time = efel_vals[0]['peak_time'] else: self.peak_time = self.call_efel(['peak_time'], tmp_settings)[0]['peak_time'] if self.spikecount == 0 and numpy.max(self.voltage) > tmp_settings["Threshold"]: logger.warning( f"No spikes were detected in recording {self.files} but the " "voltage goes higher than the spike detection threshold." )
[docs] def set_autothreshold(self, offset_voltage=20.) -> None: """Computes the threshold based on the input voltage sets it as an attribute.""" idx_ton = self.ms_to_index(self.ton) idx_toff = self.ms_to_index(self.toff) step_voltage = numpy.median(self.voltage[idx_ton:idx_toff]) base_voltage = numpy.median(self.voltage[:idx_ton]) if base_voltage > step_voltage: thresh = base_voltage + offset_voltage else: thresh = step_voltage + offset_voltage # The threshold cannot be lower than the base voltage (handles the case # where the step is hyperpolarizing) self.auto_threshold = numpy.clip(thresh, base_voltage + offset_voltage, 50.)
[docs] def in_target(self, target, tolerance, absolute_amplitude=False): """Returns a boolean. True if the amplitude of the eCode is close to target and False otherwise.""" effective_amp = self.amp if absolute_amplitude else self.amp_rel if numpy.abs(target - effective_amp) < tolerance: return True return False
[docs] def compute_relative_amp(self, amp_threshold): """Divide all the amplitude in the stimuli by the spiking amplitude""" self.amp_rel = 100.0 * self.amp / amp_threshold self.hypamp_rel = 100.0 * self.hypamp / amp_threshold
def get_plot_amplitude_title(self): return " ({:.01f}%)".format(self.amp_rel)
[docs] def plot( self, axis_current=None, axis_voltage=None, display_xlabel=True, display_ylabel=True ): """Plot the recording""" if axis_current is None or axis_voltage is None: _, axs = plt.subplots(nrows=2, ncols=1, figsize=[4.9, 4.8]) axis_current, axis_voltage = axs[0], axs[1] title = "Holding Amp = {:.03f} nA\nAmp = {:.03f} nA".format( self.hypamp, self.amp) if self.amp_rel is not None: title += self.get_plot_amplitude_title() if self.id is not None: title += "\nid: {}".format(self.id) if self.repetition is not None: title += "\nRepetition: {}".format(self.repetition) axis_current.set_title(title, size="x-small") gen_t, gen_i = self.generate() axis_current.plot(self.t, self.current, c="C0", lw=0.8) axis_current.plot(gen_t, gen_i, c="C1", ls="--", lw=0.8) axis_voltage.plot(self.t, self.voltage, c="C0", lw=0.8) if self.peak_time is not None: max_v = numpy.max(self.voltage) for pt in self.peak_time: axis_voltage.plot( [pt, pt], [max_v + 5, max_v + 15], c="C3", ls="-", lw=0.5 ) if self.auto_threshold is not None: axis_voltage.plot( [self.t[0], self.t[-1]], [self.auto_threshold, self.auto_threshold], c="black", ls="--", lw=0.5, alpha=0.8 ) if display_xlabel: axis_voltage.set_xlabel("Time (ms)") if display_ylabel: axis_current.set_ylabel("Current (nA)") axis_voltage.set_ylabel("Voltage (mV)") axis_current.tick_params(axis="both", which="major", labelsize=8) axis_current.tick_params(axis="both", which="minor", labelsize=6) axis_voltage.tick_params(axis="both", which="major", labelsize=8) axis_voltage.tick_params(axis="both", which="minor", labelsize=6) return axis_current, axis_voltage