Source code for fluidlab.objects.rotatingobjects

"""
Rotating objects (:mod:`fluidlab.objects.rotatingobjects`)
==========================================================

.. currentmodule:: fluidlab.objects.rotatingobjects

Provides:

.. autoclass:: RotatingObject
   :members:

.. autoclass:: InnerCylinderOldTC
   :members:

.. autoclass:: InnerCylinder
   :members:

.. autoclass:: RotatingTable
   :members:

.. autofunction:: create_rotating_objects_kepler

"""


from __future__ import division, print_function

import numpy as np
import numbers
import atexit
import os
import inspect
import glob
import time

from fluidlab.objects.boards import ObjectUsingBoard
from fluidlab.objects.boards import NIDAQBoard
from fluiddyn.util.timer import Timer
from fluiddyn.io import query
from fluiddyn.io import txt

import fluiddyn.output.figs as figs

from fluiddyn.util.daemons import DaemonThread

#  , DaemonProcess
# We can use either a deamon thread or a deamon process...
Daemon = DaemonThread
# Daemon = DaemonProcess


[docs]class RotatingObject(ObjectUsingBoard): name = "rotating object" rotation_rate_max = 2 * np.pi / 2.43 # (rad/s) voltage_max = 5.0 channel_class = 0 def __init__(self, rotation_rate, board=None, channel=None): if channel is not None: self.channel = channel else: self.channel = self.channel_class if board is None: board = NIDAQBoard() super().__init__(board=board) if hasattr(rotation_rate, "__call__"): self.rotation_rate_vs_t = rotation_rate self.rotation_rate = self.rotation_rate_vs_t(0.0) if not isinstance(self.rotation_rate, numbers.Number): raise ValueError( "If rotation_rate is a function, " "it should return a number." ) elif isinstance(rotation_rate, numbers.Number): self.rotation_rate = rotation_rate else: raise ValueError("rotation_rate should be a number or a function.") self.path_calib = os.path.join( os.path.dirname( os.path.abspath(inspect.getfile(inspect.currentframe())) ), "Calib_rotation_objects", ) if hasattr(self, "name"): self.path_calib = os.path.join( self.path_calib, self.name.capitalize().replace(" ", "_") ) if not os.path.exists(self.path_calib): os.makedirs(self.path_calib) # load all saved calibrations volts_old, periods_old = self.load_calibrations() if len(volts_old) > 0: cond = 1.0 / periods_old > 0 volts_old = volts_old[cond] periods_old = periods_old[cond] self.create_function_from_data(volts_old, periods_old) else: self.write("This rotating object should first be calibrated.") atexit.register(RotatingObject.stop, self) def set_rotation_rate(self, rotation_rate): self.rotation_rate = rotation_rate if self.board.works: self.write(f"{self.name:22s}; rot. rate: {rotation_rate}") voltage = self._voltage_from_rotation_rate(rotation_rate) if voltage < 0: voltage = 0 self._set_voltage(voltage) def _set_voltage(self, voltage): if self.board.works: self.board.out.set_voltage(voltage, channels=self.channel) def calibrate(self, voltage=3): self.write(f"calibration with voltage: {voltage:6.2f}") self._set_voltage(voltage) self.write("Please measure the period.") yes = False while not yes: yes = query.query_yes_no("Is it measure now?") self._set_voltage(0) period = query.query_number("What was the period?") self.write("Something else could be implemented here.") return period def calibrate_with_period(self, period=8): self.write(f"Calibrate with period: {period:6.2f}") voltage = self._voltage_from_rotation_rate(2 * np.pi / period) self.write(f"it should correspond to a voltage of {voltage:6.2f}") period = self.calibrate(voltage)
[docs] def load_calibrations(self): """Loads the data from the previous calibrations.""" volts, periods = [], [] cfiles = glob.glob(self.path_calib + "/volts_periods_*") cfiles = [cf for cf in cfiles if cf[-1] != "~"] if len(cfiles) == 0: return volts, periods for cf in cfiles: v, p = txt.quantities_from_txt_file(cf) volts.append(v) periods.append(p) volts = np.concatenate(volts) periods = np.concatenate(periods) return volts, periods
[docs] def plot_calibrations(self, volts=None, periods=None): """Plots the measurements of the saved calibrations.""" # load all saved calibrations volts_old, periods_old = self.load_calibrations() # plot figures = figs.Figures() fig = figures.new_figure( name_file="fig_calibration", fig_width_mm=190, fig_height_mm=150, size_axe=[0.13, 0.14, 0.83, 0.82], ) ax = fig.gca() ax.set_xlabel(r"$U$ (V)") ax.set_ylabel(r"$\Omega$ (rad/s)") # ax.set_xlim([0.95, 1.25]) Omegas_old = 2 * np.pi / periods_old if len(volts_old) > 2: ax.plot(volts_old, Omegas_old, "xg") cond = 1.0 / periods_old > 0 volts_old = volts_old[cond] periods_old = periods_old[cond] if len(volts_old) > 0: self.create_function_from_data(volts_old, periods_old) rr_for_plot = np.linspace(0.0, self.rotation_rate_max, 200) volts_for_plot = self._voltage_from_rotation_rate(rr_for_plot) ax.plot(volts_for_plot, rr_for_plot, "k-") if periods is not None and volts is not None: Omegas = 2 * np.pi / periods ax.plot(volts, Omegas, "xr") figs.show(block=True)
[docs] def create_function_from_data(self, volts, periods): """Creates a function from data.""" Omegas = 2 * np.pi / periods coeffs = np.polyfit(Omegas, volts, deg=2) self._voltage_from_rotation_rate = np.poly1d(coeffs)
# def _voltage_from_rotation_rate(self, rotation_rate): # return rotation_rate/self.rotation_rate_max*self.voltage_max def stop(self): if self.board is not False: self._set_voltage(0.0) def write(self, string): print(string)
[docs]class InnerCylinderOldTC(RotatingObject): rotation_rate_max = 2 * np.pi / 2.43 # (rad/s) voltage_max = 5.0 name = "Inner cylinder old TC"
[docs]class InnerCylinder(RotatingObject): rotation_rate_max = 1.18 # (rad/s) voltage_max = 5.0 name = "Inner cylinder new TC"
[docs]class RotatingTable(RotatingObject): rotation_rate_max = 2.5 # (rad/s) voltage_max = 5.0 name = "Rotating table" channel_class = 1
class DaemonRunningRotatingObject(Daemon): def __init__(self, rotating_object): super().__init__() self.ro = rotating_object def run(self): ro = self.ro ro.running = True ro.set_rotation_rate(ro.rotation_rate) if hasattr(ro, "rotation_rate_vs_t"): self._loop_time() def _loop_time(self, dt=0.2): ro = self.ro ro.write("start _loop_time " + self.ro.name) tstart = time.time() timer = Timer(dt) while self.keepgoing.value: tnow = time.time() t = tnow - tstart rr = ro.rotation_rate_vs_t(t) ro.set_rotation_rate(rr) timer.wait_tick() ro.write("exit loop") def stop(self): super().stop() def create_rotating_objects_pseudokepler(Omega_i, R_i, R_o, gamma): """Return two rotating objects related with the Kepler scaling law. Parameters ---------- Omega_i : number The rotation rate of the inner cylinder (in rad/s) R_i : number Radius of the inner cylinder. R_o : number Radius of the outer cylinder (same units as *R_i*). """ alpha_i = R_i / R_o mu_o = alpha_i ** gamma def Omega_o(t): return mu_o * Omega_i(t) inner_cylinder = InnerCylinder(rotation_rate=Omega_i) rotating_table = RotatingTable( rotation_rate=Omega_o, board=inner_cylinder.board ) return inner_cylinder, rotating_table
[docs]def create_rotating_objects_kepler(Omega_i, R_i, R_o): """Return two rotating objects related with the Kepler scaling law. Parameters ---------- Omega_i : number The rotation rate of the inner cylinder (in rad/s) R_i : number Radius of the inner cylinder. R_o : number Radius of the outer cylinder (same units as *R_i*). """ alpha_i = R_i / R_o mu_o = alpha_i ** (3 / 2) def Omega_o(t): return mu_o * Omega_i(t) inner_cylinder = InnerCylinder(rotation_rate=Omega_i) rotating_table = RotatingTable( rotation_rate=Omega_o, board=inner_cylinder.board ) return inner_cylinder, rotating_table
if __name__ == "__main__": import time inner_cylinder = InnerCylinder(rotation_rate=0) # def Omega_i(t): # Omega = 0.2 # time_rampe = 10 # t = t/time_rampe # if t < Omega: # ret = t*Omega # # elif t < 2*Omega: # # ret = Omega*(2-t) # else: # ret = 0 # # ret = Omega # return ret # R_i = 100 # R_o = 482/2 # rc, rt = create_rotating_objects_kepler(Omega_i, R_i, R_o) # ro.calibrate(1) # ro.calibrate(5) # ro.calibrate(4) # ro.calibrate_with_period(8) # ro.plot_calibrations() # daemon_rc = DaemonRunningRotatingObject(rc) # daemon_rt = DaemonRunningRotatingObject(rt) # daemon_rc.start() # daemon_rt.start() # time.sleep(100) # daemon_rc.stop() # daemon_rt.stop() # daemon.start() # time.sleep(22) # daemon.stop() # time.sleep(1) # daemon = DaemonRunningRotatingObject(ro) # daemon.start() # time.sleep(2)