"""pfeiffer_maxigauge
=====================
.. autoclass:: PfeifferMaxiGauge
:members:
:private-members:
"""
from functools import reduce
# from time import sleep
import numpy as np
from fluidlab.instruments.drivers import Driver
from fluidlab.interfaces import PhysicalInterfaceType
from fluidlab.instruments.features import Value
from fluiddyn.util.terminal_colors import cprint
__all__ = ["PfeifferMaxiGauge"]
codes = {
"ETX": b"\x03", # End of text (reset the interface)
"CR": b"\x0D", # Carriage return (go to beginning of line)
"LF": b"\x0A", # Line feed (advance by one line)
"ENQ": b"\x05", # Enquiry (request data transmission)
"ACK": b"\x06", # Acknowledge (positive report signal)
"NAK": b"\x15", # Negative acknowledge (negative report signal)
"ESC": b"\x1B",
} # Escape
class PfeifferMaxiGaugeException(Exception):
pass
class PfeifferMaxiGaugeValue(Value):
def __init__(self, name, doc="", mnemonic=b""):
super().__init__(
name,
doc,
command_set=None,
command_get=None,
check_instrument_value=False,
pause_instrument=0.5,
channel_argument=False,
)
self.mnemonic = mnemonic
def set(self, value):
self._driver.transmission(self.mnemonic, value)
def get(self, parameter=b""):
return self._driver.reception(self.mnemonic, parameter)
class PfeifferMaxiGaugePressureValue(PfeifferMaxiGaugeValue):
def get(self, sensor):
if isinstance(sensor, int):
sensor = str(sensor).encode("ascii")
elif isinstance(sensor, (list, tuple, np.ndarray)):
return [self.get(sen) for sen in sensor]
msg = super().get(parameter=sensor)
p = msg.split(b",")
status = int(p[0])
value = float(p[1])
if status == 3:
cprint.red("Sensor error on channel " + sensor.decode("ascii"))
elif status == 4:
cprint.red("Sensor is off on channel " + sensor.decode("ascii"))
elif status == 5:
cprint.red("No sensor on channel " + sensor.decode("ascii"))
elif status == 6:
cprint.red(
"Identification error on channel " + sensor.decode("ascii")
)
return value
class PfeifferMaxiGaugeOnOffValue(PfeifferMaxiGaugeValue):
def set(self, booleans):
if len(booleans) != 6:
raise ValueError("6 booleans are expected")
msg = reduce(
lambda a, b: a + b, [b",2" if b else b",1" for b in booleans]
)
try:
super().set(msg)
except PfeifferMaxiGaugeException:
pass
def get(self):
msg = super().get(parameter=b",0,0,0,0,0,0")
return [s == b"1" for s in msg]
[docs]class PfeifferMaxiGauge(Driver):
default_physical_interface = PhysicalInterfaceType.Serial
default_inter_params = {
"baudrate": 9600,
"bytesize": 8,
"parity": "N",
"stopbits": 1,
"timeout": 1.0,
"xonxoff": False,
"rtscts": False,
"dsrdtr": False,
}
def __init__(self, serialPort, debug=False):
super().__init__(serialPort)
self.debug = debug
def __enter__(self):
super().__enter__()
self.clear_interface()
print("Pfeiffer MaxiGauge:", self.program_version().decode("ascii"))
long_description = {
b"TPR/PCR": "Pirani Gauge or Pirani Capacitance Gauge",
b"IKR9": "Cold cathode to 1e-9 mbar",
b"IKR11": "Cold cathode to 1e-11 mbar",
b"PKR": "FullRange CC",
b"APR/CMR": "Linear sensor",
b"IMR": "Pirani / High Pressure",
b"PBR": "FullRange BA",
b"no Sensor": "No sensor",
b"no Ident": "No identification",
}
for i, sensor in enumerate(self.sensor_id()):
try:
desc = "(" + long_description[sensor] + ")"
except KeyError:
desc = ""
print(i + 1, sensor.decode("ascii"), desc)
return self
def clear_interface(self):
if self.debug:
print("-> ETX")
self.interface.write(codes["ETX"])
[docs] def transmission(self, mnemonics, parameters=b""):
"""
Transmission protocol:
-> Mnemonics [and parameters] <CR>[<LF>]
<- <ACK><CR><LF>
"""
msg = self.interface.query(mnemonics + parameters + b"\r")
if self.debug:
print("->", mnemonics + parameters)
ack_positif = codes["ACK"]
ack_negatif = codes["NAK"]
if msg == ack_positif:
if self.debug:
print("<- ACK")
elif msg == ack_negatif:
if self.debug:
print("<- NAK")
raise PfeifferMaxiGaugeException("Negative report signal received.")
else:
if self.debug:
if len(msg) == 1:
print("<- " + ("0x%x" % ord(msg)))
else:
print("<- " + msg)
raise PfeifferMaxiGaugeException(
'Unknown return value: "' + msg.decode("ascii") + '"'
)
[docs] def reception(self, mnemonics, parameters=b""):
"""
Reception protocol:
-> Mnemonics [and parameters] <CR>[<LF>]
<- <ACK><CR><LF>
-> <ENQ>
<- Measurement values or parameters <CR><LF>
"""
self.transmission(mnemonics, parameters)
if self.debug:
print("-> ENQ")
data = self.interface.query(codes["ENQ"])
if self.debug:
print("<-", data)
return data
def error_status(self):
msg = self.reception(b"ERR")
p = msg.split(b",")
status_1 = int(p[0])
status_2 = int(p[1])
if status_1 & 1:
print("Watchdog has responded")
elif status_1 & 2:
print("Task fail error")
elif status_1 & 4:
print("IDCX idle error")
elif status_1 & 8:
print("Stack overflow error")
elif status_1 & 16:
print("EPROM error")
elif status_1 & 32:
print("RAM error")
elif status_1 & 64:
print("EEPROM error")
elif status_1 & 128:
print("Key error")
elif status_1 & 4096:
print("Syntax error")
elif status_1 & 8192:
print("Inadmissible parameter")
elif status_1 & 16384:
print("No hardware")
elif status_1 & 32768:
print("Fatal error")
for i in range(6):
if status_2 & (1 << i):
print("Sensor {:d}: Measurement error".format(i + 1))
if status_2 & (1 << (i + 9)):
print("Sensor {:d}: Identification error".format(i + 1))
if status_1 == 0 and status_2 == 0:
print("No error")
def program_version(self):
return self.reception(b"PNR")
def sensor_id(self):
msg = self.reception(b"TID")
data = msg.split(b",")
return data
features = [
PfeifferMaxiGaugeOnOffValue(
"onoff", "Switch ON/OFF sensors", mnemonic=b"SEN"
),
PfeifferMaxiGaugePressureValue(
"pressure", "Pressure measurement", mnemonic=b"PR"
),
]
PfeifferMaxiGauge._build_class_with_features(features)
if __name__ == "__main__":
gauge = PfeifferMaxiGauge("COM1", debug=False)