Source code for fluidlab.daq.daqmx

"""
DAQmx (National Instruments, :mod:`fluidlab.instruments.daq.daqmx`)
===================================================================

.. todo:: DAQmx interface and drivers (using Comedi API?)...

Provides:

.. autofunction:: read_analog

.. autofunction:: write_analog

.. autofunction:: write_analog_end_task

.. autofunction:: measure_freq


"""

try:
    from collections.abc import Iterable
except ImportError:
    # Fallback for Python < 3.3, where the ABCs still lived in ``collections``
    from collections import Iterable
from numbers import Number
from platform import platform
import time

import numpy as np

import ctypes
import six

from PyDAQmx import Task, byref, float64, int32

from PyDAQmx import (
    DAQmx_Val_Cfg_Default,
    DAQmx_Val_RSE,
    DAQmx_Val_NRSE,
    DAQmx_Val_Diff,
    DAQmx_Val_Volts,
    DAQmx_AI_Coupling,
    DAQmx_Val_DC,
    DAQmx_Val_AC,
    DAQmx_Val_GND,
    DAQmx_Val_Rising,
    DAQmx_Val_FiniteSamps,
    DAQmx_Val_GroupByChannel,
    DAQmx_Val_Hz,
    DAQmx_Val_LowFreq1Ctr,
)

try:
    from PyDAQmx import DAQmx_Val_PseudoDiff
except ImportError:
    DAQmx_Val_PseudoDiff = None
    pass

from PyDAQmx.DAQmxFunctions import AttributeNotSupportedInTaskContextError

# Maps the coupling names accepted by the ``coupling_types`` argument of
# ``read_analog`` to the corresponding DAQmx coupling constants.
_coupling_values = {
    "DC": DAQmx_Val_DC,
    "AC": DAQmx_Val_AC,
    "GND": DAQmx_Val_GND,
}


def _parse_resource_names(resource_names):

    if isinstance(resource_names, str):
        if six.PY3 and isinstance(resource_names, str):
            resource_names = resource_names.encode("ascii")
        resource_names = [resource_names]
    elif isinstance(resource_names, Iterable):
        if six.PY3 and isinstance(resource_names[0], str):
            resource_names = [r.encode("ascii") for r in resource_names]
    else:
        raise ValueError("resource_names has to be a string or an iterable.")

    nb_resources = len(resource_names)

    return resource_names, nb_resources


def read_analog(
    resource_names,
    terminal_config,
    volt_min,
    volt_max,
    samples_per_chan=1,
    sample_rate=1,
    coupling_types="DC",
    output_filename=None,
    verbose=False,
):
    """Read from the analog input subdevice.

    Parameters
    ----------

    resource_names: {str or iterable of str}
      Analog input identifier(s), e.g. 'Dev1/ai0'.

    terminal_config: {None, 'Diff', 'PseudoDiff', 'RSE', 'NRSE'}
      A type of configuration (applied to all terminals). None selects the
      DAQmx default configuration.

    volt_min : {number or iterable of numbers}
      Minima for the channels.

    volt_max : {number or iterable of numbers}
      Maxima for the channels.

    samples_per_chan: int
      Number of samples per channel to read.

    sample_rate: number
      Sample rate for all channels (Hz).

    coupling_types : {'DC', 'AC', 'GND', list of str}
      Type of coupling for each resource.

    output_filename: {None, str}
      Not implemented yet: anything else than None raises
      NotImplementedError.

    verbose: {False, boolean}
      If True, print more verbose messages.

    Returns
    -------

    numpy.ndarray
      Array of float64 reshaped to (number of resources, samples_per_chan).

    Raises
    ------

    NotImplementedError
      If ``output_filename`` is not None.

    ValueError
      If an argument is inconsistent (unknown terminal mode or coupling,
      wrong number of ranges, non-positive ``samples_per_chan`` or
      ``sample_rate``, 'PseudoDiff' requested while PyDAQmx does not
      provide it).

    """
    if output_filename is not None:
        raise NotImplementedError()

    # prepare resource_names
    resource_names, nb_resources = _parse_resource_names(resource_names)

    # human readable names for the messages (the resources may be bytes)
    labels = [
        name.decode("ascii", "replace") if isinstance(name, bytes) else str(name)
        for name in resource_names
    ]

    # prepare terminal_config
    if terminal_config is None:
        if verbose:
            print("DAQmx: Default terminal configuration will be used.")
        terminal_config = DAQmx_Val_Cfg_Default
    elif terminal_config == "RSE":
        if verbose:
            print("DAQmx: Referenced single-ended mode")
        terminal_config = DAQmx_Val_RSE
    elif terminal_config == "NRSE":
        if verbose:
            print("DAQmx: Non-referenced single-ended mode")
        terminal_config = DAQmx_Val_NRSE
    elif terminal_config == "Diff":
        if verbose:
            print("DAQmx: Differential mode")
        terminal_config = DAQmx_Val_Diff
    elif terminal_config == "PseudoDiff":
        # The constant is set to None at import time when PyDAQmx does not
        # provide it; fail here rather than hand None to the driver.
        if DAQmx_Val_PseudoDiff is None:
            raise ValueError(
                "DAQmx: Pseudodifferential mode is not available "
                "with the installed PyDAQmx"
            )
        if verbose:
            print("DAQmx: Pseudodifferential mode")
        terminal_config = DAQmx_Val_PseudoDiff
    else:
        raise ValueError("DAQmx: Unrecognized terminal mode")

    # prepare volt_min, volt_max
    if isinstance(volt_min, Number):
        volt_min = [volt_min] * nb_resources
    elif len(volt_min) != nb_resources:
        raise ValueError(
            "volt_min has to be a number or an iterable of the same length "
            "as resource_names"
        )
    if isinstance(volt_max, Number):
        volt_max = [volt_max] * nb_resources
    elif len(volt_max) != nb_resources:
        raise ValueError(
            "volt_max has to be a number or an iterable of the same length "
            "as resource_names"
        )

    # check samples_per_chan and sample_rate
    if not isinstance(samples_per_chan, int) or samples_per_chan <= 0:
        raise ValueError("samples_per_chan has to be a positive integer.")
    if not sample_rate > 0:
        # sample_rate is used as a divisor for the read timeout below
        raise ValueError("sample_rate has to be a positive number.")

    # prepare coupling_types
    if isinstance(coupling_types, str):
        coupling_types = [coupling_types] * nb_resources
    elif len(coupling_types) != nb_resources:
        raise ValueError(
            "coupling_types has to be a string or an iterable "
            "of the same length as resource_names"
        )
    for coupling in coupling_types:
        if coupling not in _coupling_values:
            raise ValueError(f"Bad value in coupling_types, got: {coupling}")

    if verbose:
        print("DAQmx: Create Task")

    task = Task()
    try:
        actual_volt_min = float64()
        actual_volt_max = float64()

        for ir, resource in enumerate(resource_names):
            if verbose:
                print(
                    "DAQmx: Create AI Voltage Chan ("
                    + labels[ir]
                    + " ["
                    + str(volt_min[ir])
                    + "V;"
                    + str(volt_max[ir])
                    + "V])"
                )
            task.CreateAIVoltageChan(
                resource,
                "",
                terminal_config,
                volt_min[ir],
                volt_max[ir],
                DAQmx_Val_Volts,
                None,
            )

        # Note: SetChanAttribute has to be called in a second loop because,
        # for a multi-board acquisition, DAQmx requires that all the
        # channels are added to the task before any parameter is changed.
        for ir, resource in enumerate(resource_names):
            # check volt range
            try:
                task.GetAIRngHigh(resource, byref(actual_volt_max))
                task.GetAIRngLow(resource, byref(actual_volt_min))
                actual_volt_available = True
            except AttributeError:
                actual_volt_available = False  # DAQmx Base

            if actual_volt_available:
                actual_vmin = actual_volt_min.value
                actual_vmax = actual_volt_max.value
                if actual_vmin != volt_min[ir] or actual_vmax != volt_max[ir]:
                    print(
                        "DAQmx: Actual range for "
                        + labels[ir]
                        + " is actually [{:6.2f} V, {:6.2f} V].".format(
                            actual_vmin, actual_vmax
                        )
                    )

            # set coupling
            coupling_name = coupling_types[ir]
            if verbose:
                print(
                    "DAQmx: Setting AI channel coupling ("
                    + labels[ir]
                    + "): "
                    + coupling_name
                )
            try:
                task.SetChanAttribute(
                    resource, DAQmx_AI_Coupling, _coupling_values[coupling_name]
                )
            except AttributeNotSupportedInTaskContextError:
                print("Coupling attribute not supported on this device")

        # configure clock and DMA input buffer
        if samples_per_chan > 1:
            if verbose:
                verbose_text = "DAQmx: Configure clock timing ("
                if samples_per_chan < 1000:
                    verbose_text += str(samples_per_chan) + " samp/chan @ "
                elif samples_per_chan < 1_000_000:
                    verbose_text += str(samples_per_chan / 1000) + " kSamp/chan @ "
                else:
                    verbose_text += (
                        str(samples_per_chan / 1_000_000) + " MSamp/chan @ "
                    )
                if sample_rate < 1000:
                    verbose_text += "%.2f Hz using OnboardClock)" % sample_rate
                elif sample_rate < 1_000_000:
                    verbose_text += "%.2f kHz using OnboardClock)" % (
                        sample_rate / 1000.0
                    )
                else:
                    verbose_text += "%.2f MHz using OnboardClock)" % (
                        sample_rate / 1e6
                    )
                print(verbose_text)
            task.CfgSampClkTiming(
                "OnboardClock",
                sample_rate,
                DAQmx_Val_Rising,
                DAQmx_Val_FiniteSamps,
                samples_per_chan,
            )

            if verbose:
                print("DAQmx: Configure DMA input buffer")
            task.CfgInputBuffer(samples_per_chan)

        # start task
        if verbose:
            if platform().startswith("Windows"):
                dateformat = "%A %d %B %Y - %X (%z)"
            else:
                dateformat = "%A %e %B %Y - %H:%M:%S (UTC%z)"
            starttime = time.time()
            starttime_str = time.strftime(dateformat, time.localtime(starttime))
            endtime = starttime + samples_per_chan / sample_rate
            endtime_str = time.strftime(dateformat, time.localtime(endtime))
            print("DAQmx: Starting acquisition: " + starttime_str)
            print(
                " Expected duration: %.2f min"
                % (samples_per_chan / (60.0 * sample_rate))
            )
            print(" Expected end time: " + endtime_str)

        task.StartTask()

        # read data
        # NOTE: the timeout is 10 times the nominal acquisition duration;
        # the factor 10 is an arbitrary safety margin.
        timeout = float(10 * samples_per_chan / sample_rate)
        buffer_size_in_samps = int(samples_per_chan * nb_resources)
        data = np.zeros((buffer_size_in_samps,), dtype=np.float64)
        samples_per_chan_read = int32()
        task.ReadAnalogF64(
            samples_per_chan,
            timeout,
            DAQmx_Val_GroupByChannel,
            data,
            buffer_size_in_samps,
            byref(samples_per_chan_read),
            None,
        )

        if verbose:
            print("DAQmx: %d samples read." % samples_per_chan_read.value)

        # data is grouped by channel: one row per resource
        return data.reshape([nb_resources, samples_per_chan])
    finally:
        # Release the hardware resources whatever happened above, so that a
        # failed acquisition does not keep the device reserved.
        task.ClearTask()
def write_analog(
    resource_names,
    sample_rate=1,
    volt_min=-10.0,
    volt_max=10.0,
    signals=None,
    blocking=True,
    verbose=False,
):
    """Write to the analog output subdevice.

    Parameters
    ----------

    resource_names: {str or iterable of str}
      Analog output identifier(s), e.g. 'Dev1/ao0'.

    sample_rate: number
      Frequency rate for all channels (Hz).

    volt_min: {number or iterable of numbers}
      Minima for the channels.

    volt_max: {number or iterable of numbers}
      Maxima for the channels.

    signals: {scalar, list, tuple or numpy.ndarray}
      The signal(s) to be output. A scalar is written as a single value.
      A sequence or array of dimension 1 holds the samples of one channel;
      an array of dimension 2 holds one row per channel.

    blocking: bool
      Specifies whether to wait until the task is done before returning.
      If blocking is False, then a task object is returned. To stop the
      task, use the :func:`write_analog_end_task` function.

    verbose: {False, boolean}
      If True, print more verbose messages.

    Returns
    -------

    {None, PyDAQmx.Task}
      None if ``blocking`` is True (the task is stopped and cleared before
      returning), else the running task.

    Raises
    ------

    ValueError
      If an argument is inconsistent (wrong number of ranges, missing,
      empty or badly shaped ``signals``, non-positive ``sample_rate``).

    """
    # prepare resource_names
    resource_names, nb_resources = _parse_resource_names(resource_names)

    # human readable names for the messages (the resources may be bytes,
    # which cannot be concatenated with str)
    labels = [
        name.decode("ascii", "replace") if isinstance(name, bytes) else str(name)
        for name in resource_names
    ]

    # prepare volt_min, volt_max
    if isinstance(volt_min, Number):
        volt_min = [volt_min] * nb_resources
    elif len(volt_min) != nb_resources:
        raise ValueError(
            "volt_min has to be a number or an iterable of the same length "
            "as resource_names"
        )
    if isinstance(volt_max, Number):
        volt_max = [volt_max] * nb_resources
    elif len(volt_max) != nb_resources:
        raise ValueError(
            "volt_max has to be a number or an iterable of the same length "
            "as resource_names"
        )

    if not sample_rate > 0:
        # sample_rate is used as a divisor for the durations below
        raise ValueError("sample_rate has to be a positive number.")

    # prepare signals
    if signals is None:
        raise ValueError("signals has to be a scalar or an array of dimension 1 or 2.")
    if isinstance(signals, (list, tuple, np.ndarray)):
        # lists and tuples have no ``ndim``; the driver needs float64 data
        signals = np.asarray(signals, dtype=np.float64)
        if signals.ndim == 1:
            nb_samps_per_chan = len(signals)
        elif signals.ndim == 2:
            nb_samps_per_chan = signals.shape[1]
        else:
            raise ValueError(
                "signals has to be a scalar or an array of dimension 1 or 2."
            )
        if nb_samps_per_chan < 1:
            raise ValueError("signals has to contain at least one sample.")
        if signals.size == 1:
            # a one-element array is written through the scalar call
            signals = float(signals.reshape(-1)[0])
    else:
        nb_samps_per_chan = 1
    scalar_write = not isinstance(signals, np.ndarray)

    # create task
    if verbose:
        print("DAQmx: Create Task")

    task = Task()
    keep_task = False
    try:
        # create AO channels
        for ir, resource in enumerate(resource_names):
            if verbose:
                print(
                    "DAQmx: Create AO Voltage Chan ("
                    + labels[ir]
                    + " ["
                    + str(volt_min[ir])
                    + "V;"
                    + str(volt_max[ir])
                    + "V])"
                )
            task.CreateAOVoltageChan(
                resource, "", volt_min[ir], volt_max[ir], DAQmx_Val_Volts, None
            )

        # configure clock
        if nb_samps_per_chan > 1:
            if verbose:
                verbose_text = "DAQmx: Configure clock timing ("
                if nb_samps_per_chan < 1000:
                    verbose_text += str(nb_samps_per_chan) + " samp/chan @ "
                elif nb_samps_per_chan < 1_000_000:
                    verbose_text += str(nb_samps_per_chan / 1000) + " kSamp/chan @ "
                else:
                    verbose_text += (
                        str(nb_samps_per_chan / 1_000_000) + " MSamp/chan @ "
                    )
                if sample_rate < 1000:
                    verbose_text += "%.2f Hz using OnboardClock)" % sample_rate
                elif sample_rate < 1_000_000:
                    verbose_text += "%.2f kHz using OnboardClock)" % (
                        sample_rate / 1000.0
                    )
                else:
                    verbose_text += "%.2f MHz using OnboardClock)" % (
                        sample_rate / 1e6
                    )
                print(verbose_text)
            task.CfgSampClkTiming(
                "OnboardClock",
                sample_rate,
                DAQmx_Val_Rising,
                DAQmx_Val_FiniteSamps,
                nb_samps_per_chan,
            )

        # write data
        written = int32()
        if scalar_write:
            # 1, 10.0 == autostart + timeout 10 s
            task.WriteAnalogScalarF64(1, 10.0, signals, None)
            print(" Write voltage: " + ("%.2f Volts" % signals))
        else:
            # A single sample per channel is written on demand with
            # autostart, like the scalar call above; buffered signals are
            # written without autostart (timeout 10 s in both cases).
            auto_start = 1 if nb_samps_per_chan == 1 else 0
            task.WriteAnalogF64(
                nb_samps_per_chan,
                auto_start,
                10.0,
                DAQmx_Val_GroupByChannel,
                signals.ravel(),
                byref(written),
                None,
            )

        # start task
        if verbose:
            if platform().startswith("Windows"):
                dateformat = "%A %d %B %Y - %X (%z)"
            else:
                dateformat = "%A %e %B %Y - %H:%M:%S (UTC%z)"
            starttime = time.time()
            starttime_str = time.strftime(dateformat, time.localtime(starttime))
            endtime = starttime + nb_samps_per_chan / sample_rate
            endtime_str = time.strftime(dateformat, time.localtime(endtime))
            print("DAQmx: Starting write Task: " + starttime_str)
            print(
                " Expected duration: %.2f min"
                % (nb_samps_per_chan / (60.0 * sample_rate))
            )
            print(" Expected end time: " + endtime_str)

        task.StartTask()

        if blocking:
            task.WaitUntilTaskDone(1.1 * nb_samps_per_chan / sample_rate)
            task.StopTask()
        else:
            if verbose:
                print("DAQmx write done")
            # the caller becomes responsible for ending the task
            keep_task = True
    finally:
        # Release the hardware resources unless the running task is handed
        # over to the caller (non-blocking mode without error).
        if not keep_task:
            task.ClearTask()

    if keep_task:
        return task
    return None
def write_analog_end_task(task, timeout=0.0):
    """End a writing task that has been created with blocking=False.

    The task is always stopped and cleared, even if waiting for its
    completion fails (for example because the timeout expires); in that
    case the error raised while waiting is propagated afterwards.

    Parameters
    ----------

    task : PyDAQmx.Task
      The task to end.

    timeout : number
      Time (in s) to wait before stopping the task if it is not done.

    """
    try:
        task.WaitUntilTaskDone(timeout)
    finally:
        # Stop and clear unconditionally so the device is released; the
        # nested ``finally`` guarantees the clear even if the stop fails.
        try:
            task.StopTask()
        finally:
            task.ClearTask()
def measure_freq(resource_name, freq_min=1, freq_max=1000, timeout=10):
    """Measure a frequency with a counter input.

    Parameters
    ----------

    resource_name: str
      Counter identifier, e.g. 'Dev1/ctr0'.

    freq_min : number
      The minimum frequency (Hz) that you expect to measure.

    freq_max : number
      The maximum frequency (Hz) that you expect to measure.

    timeout : number
      Time (in s) to wait for the measurement.

    Returns
    -------

    float
      The value read from the counter (the channel is created with Hz as
      unit).

    """
    # create task
    task = Task()
    try:
        # it seems that this argument is actually not used with the method
        # DAQmx_Val_LowFreq1Ctr.
        measure_time = 0.5

        task.CreateCIFreqChan(
            resource_name,
            "",
            freq_min,
            freq_max,
            DAQmx_Val_Hz,
            DAQmx_Val_Rising,
            DAQmx_Val_LowFreq1Ctr,
            measure_time,
            1,
            "",
        )

        task.StartTask()

        result = float64()
        task.ReadCounterScalarF64(timeout, byref(result), None)
        return result.value
    finally:
        # Release the counter whatever happened above.
        task.ClearTask()
if __name__ == "__main__":
    # Manual hardware check: the examples below need a DAQmx device named
    # 'dev1'.  ``signals`` has to be passed by keyword: the third positional
    # parameter of ``write_analog`` is ``volt_min``.

    # data = read_analog(
    #     resource_names='dev1/ai0',
    #     terminal_config='Diff',
    #     volt_min=-10,
    #     volt_max=10,
    #     samples_per_chan=10,
    #     sample_rate=10,
    #     coupling_types='DC')

    # data = read_analog(
    #     resource_names=['dev1/ai{}'.format(ic) for ic in range(4)],
    #     terminal_config='Diff',
    #     volt_min=-10,
    #     volt_max=10,
    #     samples_per_chan=10,
    #     sample_rate=10,
    #     coupling_types='DC')

    # signals = np.cos(np.linspace(0, 2*np.pi, 100))
    # write_analog('dev1/ao0', 10, signals=signals, blocking=True)

    # two channels: one row of samples per channel
    signals = np.cos(np.linspace(0, 2 * np.pi, 100))
    signals = np.vstack((signals, signals + 2))

    write_analog(
        [f"dev1/ao{i}" for i in (0, 2)],
        sample_rate=10,
        signals=signals,
        blocking=True,
    )