Source code for fluidlab.instruments.features

"""Features for defining drivers (:mod:`fluidlab.instruments.features`)
=======================================================================

.. todo:: Work on the documentation of :mod:`fluidlab.instruments.features`.

Provides:

.. autoclass:: Feature
   :members:
   :private-members:

.. autoclass:: WriteCommand
   :members:
   :private-members:

.. autoclass:: QueryCommand
   :members:
   :private-members:

.. autoclass:: Value
   :members:
   :private-members:

.. autoclass:: BoolValue
   :members:
   :private-members:

.. autoclass:: StringValue
   :members:
   :private-members:

.. autoclass:: NumberValue
   :members:
   :private-members:

.. autoclass:: IntValue
   :members:
   :private-members:

.. autoclass:: FloatValue
   :members:
   :private-members:

.. autoclass:: RegisterValue
   :members:
   :private-members:

"""
import warnings
import time


def custom_formatwarning(message, category, filename, lineno, line=None):
    return f"{filename}:{lineno}: {category.__name__}: {message}\n"


warnings.formatwarning = custom_formatwarning
warnings.simplefilter("always", UserWarning)
# warnings.simplefilter('always', Warning)
# warnings.simplefilter('always')


[docs]class Feature: def __init__(self, name, doc=""): self._name = name self.__doc__ = doc def __repr__(self): C = type(self) return "<" + C.__name__ + " object>"
[docs]class WriteCommand(Feature): def __init__(self, name, doc="", command_str=""): super().__init__(name, doc) self.command_str = command_str
[docs] def _build_driver_class(self, Driver): """Add a "write function" to the driver class""" command_str = self.command_str def func(self): self._interface.write(command_str) func.__doc__ = self.__doc__ setattr(Driver, self._name, func)
[docs]class QueryCommand(Feature): def __init__(self, name, doc="", command_str="", parse_result=None): super().__init__(name, doc) self.command_str = command_str self.parse_result = parse_result
[docs] def _build_driver_class(self, Driver): """Add a "query function" to the driver class""" command_str = self.command_str parse_result = self.parse_result if parse_result is None: def func(self): return self._interface.query(command_str) else: def func(self): return parse_result(self._interface.query(command_str)) func.__doc__ = self.__doc__ setattr(Driver, self._name, func)
class SuperValue(Feature): def _build_driver_class(self, Driver): name = self._name setattr(Driver, name, self)
[docs]class Value(SuperValue): _fmt = "{}" def __init__( self, name, doc="", command_set=None, command_get=None, check_instrument_value=True, pause_instrument=0.0, channel_argument=False, possible_values=None, possible_channels=None, default_channel=0, ): super().__init__(name, doc) self.command_set = command_set self.check_instrument_value_after_set = check_instrument_value self.possible_channels = possible_channels self.possible_values = possible_values self.default_channel = default_channel self.pause_instrument = pause_instrument # certains appareils plantent si on leur parle sans faire de pauses self.channel_argument = channel_argument # if true, then get() and set() needs a channel argument # in that case, command_get and command_set strings are provided # with python string format arguments {channel} and {value} if ( command_get is None and command_set is not None and not isinstance(command_set, int) ): try: command_get = command_set + "?" except TypeError: print(command_set) print(self.__class__) print(name) raise self.command_get = command_get def _check_value(self, value): pass def _check_instrument_value(self, value): pass def _convert_from_str(self, value): return value.strip() def _convert_as_str(self, value): return self._fmt.format(value)
[docs] def get(self, channel=None): """Get the value from the instrument. Optional argument 'channel' is used for multichannel instrument. Then command_get should include '{channel:}' """ if isinstance(channel, list) or isinstance(channel, tuple): return [self.get(c) for c in channel] if channel is None: channel = self.default_channel if self.possible_channels is not None and channel not in self.possible_channels: raise ValueError(f"Wrong channel. Must be in {str(self.possible_channels):}.") if self.pause_instrument > 0: time.sleep(self.pause_instrument) command = self.command_get if self.channel_argument: command = command.format(channel=channel) if self.pause_instrument > 0: result = self._convert_from_str( self._interface.query(command, time_delay=self.pause_instrument) ) else: result = self._convert_from_str(self._interface.query(command)) self._check_value(result) return result
[docs] def set(self, value, channel=None): """Set the value in the instrument. Optional argument 'channel' is used for multichannel instrument. Then command_set argument should include '{channel:}' and '{value:}' """ if self.pause_instrument > 0: time.sleep(self.pause_instrument) self._check_value(value) if channel is None: channel = self.default_channel if self.possible_channels is not None and channel not in self.possible_channels: raise ValueError(f"Wrong channel. Must be in {str(self.possible_channels):}.") if self.possible_values is not None and value not in self.possible_values: raise ValueError(f"Wrong value. Must be in {str(self.possible_values):}.") if self.channel_argument: # here we don't call _convert_as_str to allow the user to choose # the desired format in the command_set string command = self.command_set.format( channel=channel, value=self._convert_as_str(value) ) else: command = self.command_set + " " + self._convert_as_str(value) self._interface.write(command) if self.check_instrument_value_after_set: self._check_instrument_value(value)
class ReadOnlyBoolValue(Value): def get(self): return self._interface.read_readonlybool(self._adress)
[docs]class BoolValue(Value): def __init__( self, name, doc="", command_set=None, command_get=None, check_instrument_value=True, pause_instrument=0.0, channel_argument=False, true_string="1", false_string="0", possible_channels=None, default_channel=0, ): super().__init__( name, doc, command_set, command_get, check_instrument_value, pause_instrument, channel_argument, possible_values={True, False}, possible_channels=possible_channels, default_channel=default_channel, ) self.true_string = true_string self.false_string = false_string def _convert_from_str(self, value): value = value.strip() if value == self.false_string: return False else: return True def _convert_as_str(self, value): if value: return self.true_string else: return self.false_string def _check_instrument_value(self, value): instr_value = self.get() if instr_value != value: msg = ( self._name + " could not be set to " + str(value) + " and was set to " + str(instr_value) + " instead" ) warnings.warn(msg, UserWarning)
[docs]class StringValue(Value): def __init__( self, name, doc="", command_set=None, command_get=None, valid_values=None, check_instrument_value=True, pause_instrument=0.0, channel_argument=False, possible_values=None, possible_channels=None, default_channel=0, ): super().__init__( name, doc, command_set=command_set, command_get=command_get, check_instrument_value=check_instrument_value, pause_instrument=pause_instrument, channel_argument=channel_argument, possible_values=possible_values, possible_channels=possible_channels, default_channel=default_channel, ) self.valid_values = valid_values def _check_value(self, value): value = value.lower() if self.valid_values is not None and value not in self.valid_values: raise ValueError( 'Value "{}" not in valid_values, which is equal to {}'.format( value, repr(self.valid_values) ) ) def _check_instrument_value(self, value): value = value.lower() instr_value = self.get().lower() if not (value.startswith(instr_value)): msg = ( self._name + " could not be set to " + str(value) + " and was set to " + str(instr_value) + " instead" ) warnings.warn(msg, UserWarning)
class ReadOnlyInt16Value(Value): def get(self): return self._interface.read_readonlyint16(self._adress) class Int16Value(Value): def get(self): return self._interface.read_int16(self._adress) def set(self, value, signed=False): self._interface.write_int16(self._adress, value, signed) class DecimalInt16Value(Int16Value): def __init__(self, name, doc="", address=0, number_of_decimals=0): self._number_of_decimals = number_of_decimals super().__init__(name, doc, address) def get(self): raw_value = super().get() if self._number_of_decimals == 0: return raw_value else: return float(raw_value) / 10 ** self._number_of_decimals def set(self, value, check=True, signed=False): """Set the Value to value. If check, checks that the value was properly set. """ if self._number_of_decimals == 0: raw_int = int(value) else: raw_int = int(value * 10 ** self._number_of_decimals) super().set(raw_int, signed) if check: self._check_value(value) def _check_value(self, value): """After a value is set, checks the instrument value and sends a warning if it doesn't match.""" instr_value = self.get() if instr_value != value: msg = ( "Value {} could not be set to {} and was set " "to {} instead" ).format(self._name, value, instr_value) warnings.warn(msg, UserWarning) class Int16StringValue(SuperValue): def __init__(self, name, doc="", int_dict=None, adress=0): self._adress = adress self._int_dict = int_dict string_dict = {} for k in int_dict: string_dict[int_dict[k]] = k self._string_dict = string_dict super().__init__(name, doc) def get(self): return self._int_dict[self._interface.read_int16(self._adress)] def set(self, string): self._interface.write_int16(self._adress, self._string_dict[string]) class ReadOnlyFloat32Value(Value): def get(self): return self._interface.read_readonlyfloat32(self._adress) class Float32Value(Value): def get(self): return self._interface.read_float32(self._adress) def set(self, value): self._interface.write_float32(self._adress, value)
[docs]class NumberValue(Value): def __init__( self, name, doc="", command_set=None, command_get=None, limits=None, check_instrument_value=True, pause_instrument=0.0, channel_argument=False, possible_values=None, possible_channels=None, default_channel=0, ): super().__init__( name, doc, command_set=command_set, command_get=command_get, check_instrument_value=check_instrument_value, pause_instrument=pause_instrument, channel_argument=channel_argument, possible_values=possible_values, possible_channels=possible_channels, default_channel=default_channel, ) if limits is not None and len(limits) != 2: raise ValueError("limits have to be a sequence of length 2 or None") self.limits = limits def _check_value(self, value): if self.limits is None: return lim_min = self.limits[0] lim_max = self.limits[1] if lim_min is not None and value < lim_min: raise ValueError( f"Value ({value}) is smaller than lim_min ({lim_min})" ) if lim_max is not None and value > lim_max: raise ValueError( f"Value ({value}) is larger than lim_max ({lim_max})" ) def _check_instrument_value(self, value): instr_value = self.get() if abs(instr_value - value) > 0.001 * abs(value): msg = ( self._name + " could not be set to " + str(value) + " and was set to " + str(instr_value) + " instead" ) warnings.warn(msg, UserWarning)
[docs]class FloatValue(NumberValue): _fmt = "{:f}" def _convert_from_str(self, value): return float(value)
class FloatScientificValue(NumberValue): _fmt = "{:.5e}" def _convert_from_str(self, value): return float(value)
[docs]class IntValue(NumberValue): _fmt = "{:d}" def _convert_from_str(self, value): return int(value)
[docs]class RegisterValue(NumberValue): def __init__( self, name, doc="", command_set=None, command_get=None, keys=None, default_value=0, check_instrument_value=True, pause_instrument=0.0, channel_argument=False, possible_values=None, possible_channels=None, default_channel=0, ): if keys is None: raise ValueError("Keys has to contain the keys of the register.") self.keys = keys self.nb_bits = len(keys) limits = (0, 2 ** self.nb_bits) super().__init__( name, doc, command_set=command_set, command_get=command_get, limits=limits, check_instrument_value=check_instrument_value, pause_instrument=pause_instrument, channel_argument=channel_argument, possible_values=possible_values, possible_channels=possible_channels, default_channel=default_channel, ) if isinstance(default_value, int): self.default_value = self.compute_dict_from_number(default_value) elif isinstance(default_value, dict): for k in default_value.keys(): if k not in keys: raise ValueError(f"key {k} not in keys") for k in keys: default_value.setdefault(k, False) self.default_value = default_value else: raise ValueError("default_value has to be an int or a dict.") def _build_driver_class(self, Driver): name = self._name setattr(Driver, name, self)
[docs] def get_as_number(self): """Get the register as number.""" value = self._interface.query(self.command_get) self._check_value(value) return value
[docs] def get(self): """Get the register as dictionary.""" number = self.get_as_number() return self.compute_dict_from_number(number)
[docs] def set(self, value): """Set the register. Parameters ---------- value : {dict, int} The value as a dictionnary or an integer. """ if isinstance(value, dict): value = self.compute_number_from_dict(value) self._check_value(value) self._interface.write(self.command_set + f" {value}")
def compute_number_from_dict(self, d): for k in d.keys(): if k not in self.keys: raise ValueError(f"key {k} not in keys") self._complete_dict_with_default(d) number = 0 for i, k in enumerate(self.keys): if d[k]: number += 2 ** i return number def compute_dict_from_number(self, number): s = bin(number)[2:].zfill(self.nb_bits) d = {} for i, k in enumerate(self.keys): d[k] = s[-1 - i] == "1" return d def _complete_dict_with_default(self, d): for k, v in self.default_value.items(): d.setdefault(k, v)