"""Interfaces with the instruments (:mod:`fluidlab.interfaces`)
===============================================================
Provides some classes:
.. autoclass:: PhysicalInterfaceType
:members:
:private-members:
.. autoclass:: Interface
:members:
:private-members:
.. autoclass:: QueryInterface
:members:
:private-members:
.. autoclass:: FalseInterface
:members:
:private-members:
Provides some functions:
.. autofunction:: set_default_interface
.. autofunction:: interface_from_string
Provides some modules:
.. autosummary::
:toctree:
gpib_inter
modbus_inter
serial_inter
socket_inter
usbtmc_inter
visa_inter
"""
from time import sleep, monotonic
import warnings
from enum import IntEnum
import sys
import ipaddress
[docs]class PhysicalInterfaceType(IntEnum):
GPIB = 0
Ethernet = 1
Serial = 2
Modbus = 3
default_interface = {
PhysicalInterfaceType.GPIB: "VISAInterface",
PhysicalInterfaceType.Ethernet: "SocketInterface",
PhysicalInterfaceType.Serial: "SerialInterface",
PhysicalInterfaceType.Modbus: "MinimalModbusInterface",
}
if sys.platform.startswith("linux"):
default_interface[PhysicalInterfaceType.GPIB] = "GPIBInterface"
[docs]def set_default_interface(interface_type, interface_classname):
"""
Select the default class interface for the given interface type.
By default, GPIB uses VISAInterface class on Windows (based on NI-VISA)
and GPIBInterface class on Linux (based on Linux-GPIB), Ethernet uses
SocketInterface class, Seriel uses SerialInterface class.
This means that GPIB instrument can be instantiated as
Instrument('GPIB0::1::INSTR')
instead of Instrument(VISAInterface('GPIB0::1::INSTR'))
or Instrument(GPIBInterface(0,1)),
provided the Instrument class defines Instrument.default_physical_interface
to PhysicalInterfaceType.GPIB.
The behavior can be changed by this function, i.e.
import fluidlab.interfaces as fi
fi.set_default_interface(fi.PhysicalInterface.GPIB, 'VISAInterface')
to force VISAInterface on Linux,
or
fi.set_default_interface(fi.PhysicalInterface.Ethernet, 'VISAInterface')
to use VISAInterface class instead of SocketInterface class for network
connected devices.
"""
default_interface[interface_type] = interface_classname
def interface_classname_from_string(
name, default_physical_interface=None, **kwargs
):
classname = None
physical_interface = None
if "GPIB" in name:
physical_interface = PhysicalInterfaceType.GPIB
elif "ASRL" in name:
physical_interface = PhysicalInterfaceType.Serial
classname = "VISAInterface"
elif isinstance(name, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
name = str(name)
physical_interface = PhysicalInterfaceType.Ethernet
elif name.startswith("/dev/tty"):
physical_interface = PhysicalInterfaceType.Serial
classname = "SerialInterface"
else:
try:
_ = ipaddress.ip_address(name)
physical_interface = (
PhysicalInterfaceType.Ethernet
if default_physical_interface is not PhysicalInterfaceType.Modbus
else PhysicalInterfaceType.Modbus
)
except ValueError:
pass
if physical_interface is None:
physical_interface = default_physical_interface
if classname is None:
classname = default_interface[physical_interface]
return classname, physical_interface
[docs]def interface_from_string(name, default_physical_interface=None, **kwargs):
"""
Infer physical_interface from name if possible, or use the default provided
physical_interface otherwise.
i.e. if the names contains the physical interface explicitely,
then we use it,
eg.
GPIB0::1::INSTR is GPIB physical interface
ASRL0::INSTR is a VISA Serial address
192.168.0.1 is a IP address, so Ethernet interface
"""
classname, physical_interface = interface_classname_from_string(
name, default_physical_interface
)
if classname == "VISAInterface":
from fluidlab.interfaces.visa_inter import VISAInterface
return VISAInterface(name)
if classname == "GPIBInterface":
from fluidlab.interfaces.gpib_inter import GPIBInterface
if name.endswith("::INSTR"):
name = name[:-7]
boardname, devnum = name.split("::")
if boardname.startswith("GPIB"):
boardname = boardname[4:]
board = int(boardname)
dev = int(devnum)
return GPIBInterface(board, dev)
if classname == "SocketInterface":
from fluidlab.interfaces.socket_inter import (
TCPSocketInterface,
UDPSocketInterface,
)
protocol = kwargs.pop("ethernet_protocol", "tcp")
if protocol == "tcp":
return TCPSocketInterface(name, **kwargs)
elif protocol == "udp":
return UDPSocketInterface(name, **kwargs)
else:
raise ValueError("Wrong ethernet_protocol. Should be tcp or udp")
if classname == "SerialInterface":
from fluidlab.interfaces.serial_inter import SerialInterface
return SerialInterface(name, **kwargs)
if classname == "MinimalModbusInterface":
from fluidlab.interfaces.modbus_inter import MinimalModbusInterface
return MinimalModbusInterface(name, **kwargs)
if classname == "PyModbusInterface":
from fluidlab.interfaces.modbus_inter import PyModbusInterface
return PyModbusInterface(name, **kwargs)
raise ValueError("Unknown interface type")
class InterfaceWarning(Warning):
pass
[docs]class Interface:
def _open(self):
# do the actual open (without testing self.opened)
raise NotImplementedError
def _close(self):
# do the actual close (without testing self.opened)
raise NotImplementedError
def __init__(self, **kwargs):
self.opened = False
def open(self):
if not self.opened:
self._open()
self.opened = True
self.opening_timestamp = monotonic()
else:
warnings.warn(
"open() called on already opened interface.", InterfaceWarning
)
def close(self):
if self.opened:
self._close()
self.opened = False
else:
warnings.warn(
"close() called on already closed interface.", InterfaceWarning
)
def __enter__(self):
self.open()
return self
def __exit__(self, type_, value, cb):
self.close()
[docs]class QueryInterface(Interface):
def _write(self, *args, **kwargs):
# do the actual write
raise NotImplementedError
def _read(self, *args, **kwargs):
# do the actual read
raise NotImplementedError
def write(self, *args, **kwargs):
if not self.opened:
warnings.warn(
"write() called on non-opened interface.", InterfaceWarning
)
self.open()
self._write(*args, **kwargs)
def read(self, *args, **kwargs):
if not self.opened:
warnings.warn(
"read() called on non-opened interface.", InterfaceWarning
)
self.open()
return self._read(*args, **kwargs)
def query(self, command, time_delay=0.1, **kwargs):
if hasattr(self, "_query"):
if not self.opened:
warnings.warn(
"query() called on non-opened interface.", InterfaceWarning
)
self.open()
return self._query(command, **kwargs)
else:
self.write(command)
sleep(time_delay)
return self.read(**kwargs)
[docs]class FalseInterface(QueryInterface):
"""
Dummy interface
"""
def _open(self):
pass
def _close(self):
pass
def _write(self, s):
print(s)
def _read(self):
print("just return 0 since it is a false Interface class.")
return 0