Source code for fluidlab.objects.probes

"""
Probes (:mod:`fluidlab.objects.probes`)
=======================================

.. currentmodule:: fluidlab.objects.probes

Provides:

.. autoclass:: ConductivityProbe
   :members:

.. autoclass:: MovingConductivityProbe
   :members:


"""
from __future__ import division, print_function

import numpy as np
import os
import inspect
import glob

from fluiddyn.io import txt

from fluiddyn.util import time_as_str
from fluiddyn.io import query
import fluiddyn.output.figs as figs

from fluidlab.objects.traverse import Traverse

from fluidlab.objects.boards import ObjectUsingBoard

from fluidlab.objects.pinchvalve import PinchValve, tube_as_opened_as_possible

# class TemperatureProbe(ObjectUsingBoard):
#     """A class handling the temperature probe."""
#     def __init__(self, board=None):

#         super().__init__(board=board)


[docs]class ConductivityProbe(ObjectUsingBoard): """Represent a conductivity probe. This is an example of modification. Parameters ---------- board : ??? Object representing a board. channel : number The channel index. """ def __init__( self, board=None, channel=1, sample_rate=1000, has_to_config_board=True, VALVE=True, ): super().__init__(board=board) self.channel = channel if has_to_config_board and board is not None and self.board is not False: self.board.ain.configure( sample_rate, channels=self.channel, range10V=True, bipolar=True, # False, gain_channels=1, differential=False, ) self.sample_rate = self.board.ain.freq_used if self.board is not None and self.board is not False: self.ind_channel = self.board.ain.channels.index(self.channel) if VALVE and self.board is not None and self.board is not False: self.valve = PinchValve(board, channel=0) else: self.valve = None self.path_calib = ( os.path.dirname( os.path.abspath(inspect.getfile(inspect.currentframe())) ) + "/Calib_conductivity_probe" ) # load all saved calibrations rhos_old, volts_old = self.load_calibrations() if len(rhos_old) > 0: self.create_function_from_data(rhos_old, volts_old)
[docs] def prepare_calibration(self, rho_min=1, rho_max=1.18, nb_solutions=6): """Gives indications to prepare a calibration.""" rhos = np.linspace(rho_min, rho_max, nb_solutions) print("Possible densities:") print("rhos:", rhos) print( """These solutions can be prepared by mixing the two first solutions with extreme densities with the following volume ratio: """ ) print("V_rho_max/V_tot :", (rhos - rho_min) / (rho_max - rho_min))
[docs] def calibrate(self, rhos, duration_1measure=4.0): r"""Calibrates the probe. Parameters ---------- rhos : array_like The density :math:`\rho` of the samples (in kg/l). duration_1measure : number The duration of one measurement (in s). Notes ----- :math:`\rho(C)` and :math:`U(C, T)`, where :math:`C` the concentration, :math:`U` the voltage and :math:`T` the temperature. """ sample_rate_old = self.sample_rate self.set_sample_rate(1000.0) voltages = np.empty(rhos.shape) for ir, rho in enumerate(rhos): # measure voltages answer = query.query( f"\nPut the probe in solution with rho = {rho}\n" + "Ready? [Y / no, cancel the calibration] " ) if answer.startswith("n"): print("Calibration cancelled...") self.set_sample_rate(sample_rate_old) return happy = False while not happy: volts = self.measure_volts(duration_1measure) with open(self.path_calib + "/temp", "w") as f: f.write(repr(volts)) volt = np.median(volts) voltages[ir] = volt print(f"solution rho: {rho} ; voltage: {volt}") happy = query.query_yes_no("Are you happy with this measurement?") # load all saved calibrations rhos_old, volts_old = self.load_calibrations() # plot figures = figs.Figures() fig = figures.new_figure( name_file="fig_calibration", fig_width_mm=190, fig_height_mm=150, size_axe=[0.13, 0.14, 0.83, 0.82], ) ax = fig.gca() ax.set_xlabel(r"$\rho$") ax.set_ylabel(r"$U$ (V)") ax.set_xlim([0.95, 1.25]) ax.plot(rhos, voltages, "or") if len(volts_old) > 2: ax.plot(rhos_old, volts_old, "xg") self.create_function_from_data(rhos_old, volts_old) volts_for_plot = np.linspace(0.0, 10.0, 200) ax.plot(self.rho_from_volt(volts_for_plot), volts_for_plot, "k-") print("rhos:\n", rhos) print("volts:\n", voltages) try: print( "rhos computed from the voltages " "and from previous calibrations:\n", self.rho_from_volt(voltages), ) except AttributeError: pass print( "Warning: bug with anaconda and non-blocking show().\n" "Close the windows to continue." ) figs.show(block=True) # save the measures done before if query.query_yes_no("Do you want to save these measurements?"): self.save_calibration(rhos, voltages) volts = np.concatenate([volts_old, voltages]) rhos = np.concatenate([rhos_old, rhos]) self.create_function_from_data(rhos, volts) self.set_sample_rate(sample_rate_old)
[docs] def plot_calibrations(self, rhos=None, volts=None, rho_real=None): """Plots the measurements of the saved calibrations.""" # load all saved calibrations rhos_old, volts_old = self.load_calibrations() # plot figures = figs.Figures() fig = figures.new_figure( name_file="fig_calibration", fig_width_mm=190, fig_height_mm=150, size_axe=[0.13, 0.14, 0.83, 0.82], ) ax = fig.gca() ax.set_xlabel(r"$\rho$") ax.set_ylabel(r"$U$ (V)") ax.set_xlim([0.95, 1.25]) if len(volts_old) > 2: ax.plot(rhos_old, volts_old, "xg") volts_for_plot = np.linspace(0.0, volts_old.max(), 200) ax.plot(self.rho_from_volt(volts_for_plot), volts_for_plot, "k-") if rhos is not None and volts is not None: ax.plot(rhos, volts, "xr") if rho_real is not None: ax.plot(rho_real, volts.mean(), "ob") figs.show(block=True)
[docs] def save_calibration(self, rhos, voltages): """Saves the results of a calibration.""" name_file = "rhos_voltages_" + time_as_str() print("Save the results of the calibration in file:", name_file) txt.save_quantities_in_txt_file( self.path_calib + "/" + name_file, (rhos, voltages) )
[docs] def load_calibrations(self): """Loads the data from the previous calibrations.""" rhos, volts = [], [] cfiles = glob.glob(self.path_calib + "/rhos_voltages_*") cfiles = [cf for cf in cfiles if cf[-1] != "~"] if len(cfiles) == 0: return rhos, volts for cf in cfiles: rho, volt = txt.quantities_from_txt_file(cf) rhos.append(rho) volts.append(volt) rhos = np.concatenate(rhos) volts = np.concatenate(volts) return rhos, volts
[docs] def create_function_from_data(self, rhos, volts): """Creates a function from data.""" coeffs = np.polyfit(volts, rhos, deg=3) self.rho_from_volt = np.poly1d(coeffs)
[docs] def set_sample_rate(self, sample_rate): """Sets the sample rate.""" self.board.ain.configure(sample_rate) self.sample_rate = self.board.ain.freq_used
[docs] def measure_volts( self, duration, sample_rate=None, return_time=False, verbose=False ): """Measure and return the times and voltages. Parameters ---------- duration : (in s) """ if sample_rate is not None and sample_rate != self.sample_rate: self.set_sample_rate(sample_rate) nb = abs(np.round(duration * self.sample_rate)) with tube_as_opened_as_possible(self.valve): volts = self.board.ain(nb) volts = volts[self.ind_channel].transpose() if verbose: print("volts:\n", volts) print("volts.mean():", volts.mean()) if return_time: ts = 1.0 / self.sample_rate * np.arange(1, nb + 1) return ts, volts else: return volts
[docs] def measure( self, duration, sample_rate=None, return_time=False, verbose=False ): """Measure and return the times and density. Parameters ---------- duration : number (in s) sample_rate : number (in Hz) """ if not hasattr(self, "rho_from_volt"): raise ValueError( """Since no data from previous calibrations has been found the function measure can not be used. The function measure_volts can be used instead.""" ) results = self.measure_volts( duration, sample_rate=sample_rate, verbose=verbose ) if return_time: ts, volts = results else: volts = results rhos = self.rho_from_volt(volts) if verbose: print("rhos:\n", rhos) print("rhos.mean():", rhos.mean()) if return_time: return ts, rhos else: return rhos
[docs] def test_measure(self, duration=2, hastoplot=True, rho_real=None): """Test the measurement.""" volts = self.measure_volts(duration=duration) rhos = self.rho_from_volt(volts) volt = volts.mean() rho = rhos.mean() print( f" rho_real: {rho_real:7.5f},\n" + "mean measurement;\n" + f" rho: {rho:7.5f}, volt: {volt:7.5f}\n" + "For the calibration file:\n" + f" {rho_real:7.5f} {volt:7.5f}" ) if hastoplot: self.plot_calibrations(rhos, volts, rho_real=rho_real)
[docs]class MovingConductivityProbe(ConductivityProbe, Traverse): """Represent a conductivity probe that can be deplaced by a traverse. Parameters ---------- (for the __init__ method) board : , optional Acquisition board. channel : {1, int}, optional Indice of the channel in the acquisition board. sample_rate : {100, number}, optional Sample rate of the probe. has_to_config_board : {True, bool}, optional Has to configure the board. position_start : {300., number}, optional Position when created (in mm). position_max : {None, number}, optional ??? Deltaz : {400, number}, optional Distance between extremal points. """ def __init__( self, board=None, channel=1, sample_rate=100, has_to_config_board=True, position_start=300.0, position_max=None, Deltaz=400.0, ): ConductivityProbe.__init__( self, board=board, channel=channel, sample_rate=sample_rate, has_to_config_board=has_to_config_board, ) Traverse.__init__( self, board=board, position_start=position_start, position_max=position_max, Deltaz=Deltaz, )
[docs] def move_measure( self, deltaz=100, speed=100, sample_rate=None, return_time=False ): """Move the probe while measuring. Parameters ---------- deltaz: number Distance to move (in mm). speed: number (in mm/s) sample_rate: number (in Hz) """ duration = abs(deltaz / speed) super().move(deltaz=deltaz, speed=speed) return super().measure( duration, sample_rate=sample_rate, return_time=return_time )
if __name__ == "__main__": # from fluiddyn.util.timer import Timer from fluidlab.objects.boards import PowerDAQBoard board = PowerDAQBoard() sprobe = MovingConductivityProbe(board=board) # sprobe.prepare_calibration(rho_max=1.18) # rhos = np.array([0.9987, # 1.0207, 1.0394, 1.0753, # 1.1108, 1.1315, 1.1504]) # sprobe.calibrate(rhos=rhos) # distance = 200. # sprobe.set_sample_rate(5) # timer = Timer(4) # for i in xrange(4): # print(sprobe.move_measure(deltaz=-distance, speed=300)) # time.sleep(0.2) # sprobe.move(deltaz=distance, speed=100, bloquing=True) # timer.wait_tick() # sprobe.test_measure(rho_real=1.097727) # time.sleep(2) # volts = sprobe.measure_volts(duration=4) # print(volts.mean()) # print(len(rhos)) # sprobe.plot_calibrations()