Source code for fluidlab.instruments.multiplexer.keithley_2700

"""keithley_2700
================

.. autoclass:: Keithley2700
   :members:
   :private-members:

"""

__all__ = ["Keithley2700"]

import numpy as np
import time


from fluiddyn.util.terminal_colors import cprint
from fluidlab.instruments.iec60488 import IEC60488
from fluidlab.instruments.features import SuperValue, BoolValue


[docs]class Keithley2700(IEC60488): """Driver for the multiplexer Keithley 2700 Series""" def __init__(self, interface=None): super().__init__(interface) self.Range = dict() self.NPLC = dict() def set_range(self, *, channelNumber=1, manualRange=False, rangeValue=None): if not manualRange and channelNumber in self.Range: del self.Range[channelNumber] elif manualRange: self.Range[channelNumber] = rangeValue
[docs] def set_nplc(self, *, channelNumber=1, nplcValue=1.0): """This function sets the integration time of the ADC. :param channelNumber: the channel for which to set the NPLC value :type channelNumber: int :param nplcValue: Integration time, expressed in terms of line frequency (PLC). It spans from 0.01 to 10. The default PLC is 1. :type nplcValue: float .. note:: Per the documentation, there is a relation between the integration time, the short name (fast, med, slow) and the effective number of digits ========== ==== ============= Short name NPLC Resolution ========== ==== ============= >Fast 0.01 3 1/2 digits Fast 0.1 5 1/2 digits Med 1.0 6 1/2 digits Slow 5.0 ========== ==== ============= """ nplcValue = float(nplcValue) self.NPLC[channelNumber] = nplcValue
[docs] def scan( self, channelList, functionName, samplesPerChan, sampleRate, verbose ): """Initiates a scan. This method is called by the features get method. :param channelList: channel number or iterable of channel numbers :type channelList: int or list :param functionName: measurement function to configure. Some possible values are VOLT:DC, VOLT:AC, FRES, RES, CURR:DC or CURR:AC. Refer to Keithley documentation for other functions. :type functionName: str :param samplesPerChan: Number of samples to be acquired on each channel. They are stored in the device buffer during acquisition (maximum 450000). Defaults to 1. :type samplesPerChan: int :param sampleRate: frequency of the internal clock used to trigger measurements. The instrument resolution is 1 ms. Defaults to 1 kHz (maximum frequency). :type sampleRate: float :param verbose: prints additionnal information for debugging purposes :type verbose: bool .. note:: If len(channelList) == 1 and samplesPerChan = 1, a one-shot measurement is performed, instead of a scan. """ # Make sure channelList is iterable try: channelList[0] except TypeError: channelList = [channelList] # Check Front/Rear switch if len(channelList) > 1 and self.front.get() == True: raise ValueError( "Cannot get several channels while Front/Read switch is Front" ) # Check number of points (max memory 450000 data points) if samplesPerChan * len(channelList) > 450_000: raise ValueError("Cannot request more than 450000 on Keithley 2700") # Check sampleRate if sampleRate: timeInterval = 1 / sampleRate if timeInterval < 1e-3: raise ValueError("The timer resolution is 1 ms") else: timeInterval = 1e-3 if channelList == [1]: # Lecture en face avant chan = 1 self.clear_status() self.interface.write("TRAC:CLE") # clear buffer self.interface.write( "INIT:CONT OFF" ) # disable continuous initialisation self.interface.write(f'SENS:FUNC "{functionName}"') # Set range if chan in self.Range: self.interface.write( "SENS:{func:}:RANG {rang:}".format( func=functionName, rang=self.Range[chan] ) ) else: self.interface.write(f"SENS:{functionName}:RANG:AUTO ON") # Set NPLC max_nplc = None if chan in self.NPLC: nplc = self.NPLC[chan] else: nplc = 1.0 # med (default value) if max_nplc is None or nplc > max_nplc: max_nplc = nplc self.interface.write(f"SENS:{functionName}:NPLC {nplc}") self.interface.write("FORM:ELEM READ,TST,CHAN") data = self.interface.query(f"READ?", time_delay=nplc / 50.0) start = time.monotonic() total_timeout = 5.0 + nplc / 50 while not data.endswith("\n"): # time_delay was insufficiant time.sleep(0.1) data += self.interface.read() if time.monotonic() - start > total_timeout: print("Timeout!") break # print(data) parsed = data.split(",") values = parsed[0] return (float(values),) else: ListeChan = "(@" + ",".join([str(c) for c in channelList]) + ")" self.clear_status() self.interface.write("TRAC:CLE") # clear buffer self.interface.write( "INIT:CONT OFF" ) # disable continuous initialisation # Set up the trigger subsystem if samplesPerChan > 1: self.interface.write("TRIG:SOUR TIM") self.interface.write(f"TRIG:TIM {timeInterval}") self.interface.write(f"TRIG:COUN {samplesPerChan}") self.interface.write("SAMP:COUN {:}".format(len(channelList))) # Measurement subsystem self.interface.write(f'SENS:FUNC "{functionName}", {ListeChan}') # Set range on specified channels for chan in channelList: if chan in self.Range: self.interface.write( "SENS:{func:}:RANG {rang:},(@{chan:})".format( func=functionName, rang=self.Range[chan], chan=chan ) ) else: self.interface.write( "SENS:{func:}:RANG:AUTO ON,(@{chan:})".format( func=functionName, chan=chan ) ) # Set NPLC max_nplc = None for chan in channelList: if chan in self.NPLC: nplc = self.NPLC[chan] else: nplc = 1.0 # med (default value) if max_nplc is None or nplc > max_nplc: max_nplc = nplc self.interface.write( "SENS:{func:}:NPLC {nplc:},(@{chan:})".format( func=functionName, nplc=nplc, chan=chan ) ) # Starts scan self.interface.write(f"ROUT:SCAN {ListeChan}") self.interface.write("ROUT:SCAN:TSO IMM") self.interface.write("ROUT:SCAN:LSEL INT") if samplesPerChan * len(channelList) > 1: self.interface.write("TRAC:CLE") # clear buffer self.interface.write( "TRAC:POIN {:}".format(samplesPerChan * len(channelList)) ) self.interface.write( "TRAC:NOT {:}".format(samplesPerChan * len(channelList) - 1) ) # notify on nth reading self.interface.write("TRAC:FEED SENS; FEED:CONT NEXT") # self.interface.write("TRIG:COUN 1") # self.interface.write("SAMP:COUN {:}".format(len(channelList))) self.interface.write("STAT:PRES") # Reset measure enable bits self.clear_status() # *CLS self.interface.write( "STAT:MEAS:ENAB 64" ) # Enable buffer bits B6 (buffer notify) (, 8, 9, 12, 13) self.event_status_enable_register.set(0) # *ESE 0 self.status_enable_register.set(1) # *SRE 1 self.interface.write("INIT:IMM") start_meas = time.monotonic() # self.wait_till_completion_of_operations() # *OPC if samplesPerChan > 1: tmo = 1000 * (samplesPerChan / sampleRate) * 1.5 else: tmo = 1000 * len(channelList) * max_nplc * (1 / 50) * 1.5 tmo *= 10 if tmo < 10e3: tmo = 10e3 # print("tmo =", tmo, "ms") try: self.interface.wait_for_srq(timeout=tmo) except: cprint.red("Error while waiting SRQ") else: cprint.green( "SRQ received after " f"{time.monotonic() - start_meas:.1f} seconds" ) # Unassert SRQ self.clear_status() # Query number of points in buffer npoints = int(self.interface.query("TRAC:POIN?")) print("npoints =", npoints) # Fetch data self.interface.write("FORM:ELEM READ,TST,CHAN") self.interface.write("TRAC:DATA?") data = "" start_fetch = time.monotonic() while True: try: data += self.interface.read() start_fetch = ( time.monotonic() ) # reset if something was returned except Exception: print("Timeout reading on interface") nread = len(data.split(",")) // 3 if nread == npoints: cprint.green("All datapoints read") break time.sleep(0.5) self.interface.write("TRAC:DATA?") if time.monotonic() - start_fetch > 15: cprint.red("Timeout fetching data") break else: # self.interface.write("TRIG:COUN {:}".format(samplesPerChan)) # self.interface.write("SAMP:COUN {:}".format(len(channelList))) self.interface.write("FORM:ELEM READ,TST,CHAN") data = self.interface.query("READ?") npoints = 1 self.interface.write(":ROUT:SCAN:LSEL NONE") # Parsing data # print(data.strip()) try: data = np.array([float(x) for x in data.split(",")]) except ValueError: print("K2700 returned:", data) raise # print(data.size//3, "values returned") if data.size // 3 != npoints: raise ValueError("Not all points were fetched") values = data[::3] timestamps = data[1::3] channels = data[2::3] if values.size != timestamps.size: raise ValueError("Error while parsing") if channels.size != timestamps.size: raise ValueError("Error while parsing") if samplesPerChan > 1: # returns timeStamp, value for each channelList retval = list() for channum, chan in enumerate(channelList): this_values = values[channum :: len(channelList)] this_timestamps = timestamps[channum :: len(channelList)] this_chans = channels[channum :: len(channelList)] if not (this_chans == chan).all(): raise ValueError("Error while parsing") retval.append(this_timestamps) retval.append(this_values) else: # returns values only retval = values return retval
class Keithley2700Value(SuperValue): """Custom :class:`Value` class for the Keithley 2700 features.""" def __init__(self, name, doc="", function_name=None): super().__init__(name, doc) self.function_name = function_name def _build_driver_class(self, Driver): name = self._name function_name = self.function_name setattr(Driver, name, self) def get( self, chanList=[1], samplesPerChan=1, sampleRate=None, verbose=None ): """Get """ + name if verbose is None: # default is verbose for acquisitions verbose = samplesPerChan > 1 result = self._driver.scan( chanList, function_name, samplesPerChan, sampleRate, verbose ) if len(result) == 1: result = result[0] return result self.get = get.__get__(self, self.__class__) def set(self, channel, value, warn=True): """Set """ + name # Makes sense for voltage on AO channels only if name == "vdc": self._driver.write_vdc(channel, value) else: raise ValueError("Specified value cannot be written") self.set = set.__get__(self, self.__class__) features = [ BoolValue( "front", doc="True if switch Front/Rear is on Front (read-only)", command_get=":SYST:FRSW?", ), Keithley2700Value("vdc", doc="DC voltage", function_name="VOLT:DC"), Keithley2700Value("vrms", doc="RMS voltage", function_name="VOLT:AC"), Keithley2700Value("ohm_4w", doc="4-wire resistance", function_name="FRES"), Keithley2700Value("ohm", doc="2-wire resistance", function_name="RES"), Keithley2700Value("idc", doc="DC current", function_name="CURR:DC"), Keithley2700Value("irms", doc="RMS current", function_name="CURR:AC"), ] Keithley2700._build_class_with_features(features) if __name__ == "__main__": from fluidlab.interfaces.gpib_inter import GPIBInterface with Keithley2700(GPIBInterface(0, 16)) as km: front = km.front.get() print("Front/Read switch:", front) if front: km.set_range(manualRange=False) km.set_nplc(nplcValue=10.0) v = km.vdc.get() print("v =", v) else: print("Single channel one-shot measurement") print(km.vdc.get(101)) print("Multiple channel one-shot measurement") R1, R2 = km.ohm.get([101, 102]) print(R1, R2) print("Single channel timeseries") ts, R = km.ohm.get(101, samplesPerChan=100, sampleRate=10.0) print("actual frame rate:", 1 / np.mean(ts[1:] - ts[:-1]), "Hz") import matplotlib.pyplot as plt plt.plot(ts, R, "o") plt.show()