Source code for fluidlab.objects.traverse_lxm32

"""Traverses (:mod:`fluidlab.objects.traverse_lxm32`)
=====================================================

.. currentmodule:: fluidlab.objects.traverse_lxm32

Provides:

.. autoclass:: Traverse
   :members:
   :undoc-members:

- 192.168.28.11 - Variateur1
- 192.168.28.12 - Variateur2
- 192.168.28.13 - Variateur3
- 192.168.28.14 - Variateur4
- 192.168.28.19 - PC pilote Muriel

"""

from __future__ import division, print_function

from fluidlab.util.calcul_track import (
    make_track_sleep_1period_tbottom,
    concatenate,
)
from fluiddyn.io import query

import os
import sys
from time import time, sleep, ctime
from threading import Thread

import numpy as np

try:
    from u3 import U3
except ImportError:
    pass

from fluiddyn.util.timer import Timer
from fluiddyn.util import time_as_str

from fluidlab.objects.motors.lxm32_modbus import Motor
from fluidlab.util.util import make_ip_as_str

volt_no_movement = 2.5

path_dir = os.path.expanduser(r"~\.fluidlab")
if not os.path.exists(path_dir):
    os.makedirs(path_dir)


class TraverseError(ValueError):
    pass


def define_track_profilometer(
    z_max, z_min, v_up, v_down, acc, dacc, dt, t_exp, t_bottom, t_period=None
):

    times1, positions1, speeds1, t_total = make_track_sleep_1period_tbottom(
        z_max, z_min, v_up, v_down, acc, dacc, dt, t_bottom, t_period
    )
    nb_periods = int(round(t_exp / t_total, 0))
    times, speeds, positions = concatenate(
        times1, speeds1, positions1, nb_periods
    )

    return times, speeds, positions


[docs]class Traverse: def __init__(self, ip_modbus=None, const_position=1.062, offset_abs=None): self.motor = Motor(ip_modbus=ip_modbus, disable_limit_switches=False) self.movement_allowed = True self._const_position = const_position self.ip_modbus = ip_modbus sleep(0.2) if self.motor.state == "Fault (9)": self.motor.fault_reset() sleep(0.1) self.enable() if self.motor.state == "Ready To Switch (4)": self.enable() sleep(0.1) self._offset_abs = offset_abs self.u3 = U3() self._use_u3 = False self.u3.writeRegister(5000, volt_no_movement) # Load calibration* try: self._coef_meter_per_rot = self._load_calibration() except IOError: self._coef_meter_per_rot = 1e6 print( "Calibration file doesn't exist. Use function self.calibrate()." )
[docs] def get_absolute_position(self): # coeff1 = 1.0273e6 # Value chosen by hand coeff1 = 1e6 # Variateur 1, 2 et 3 # coeff2 = 8e5 if self._offset_abs: abs_pos = self.motor.get_position_actual() / coeff1 - self._offset_abs else: abs_pos = self.motor.get_position_actual() / coeff1 return abs_pos
[docs] def get_relative_position(self): return self._const_position - self.get_absolute_position()
[docs] def set_relative_position(self, pos): self._const_position = pos + self.get_absolute_position()
# _coef_meter_per_rot = 2.65e-4 # _coef_meter_per_rot = 9.7303e-7 # _coef_meter_per_rot = 9.729128113524125e-7 # _coef_meter_per_rot = 9.994783e-7
[docs] def close(self): self.motor.close()
[docs] def stop_all(self): self.movement_allowed = False self.motor.stop_rotation()
[docs] def enable(self): self.movement_allowed = 1 self.motor.enable()
[docs] def disable(self): self.movement_allowed = 0 self.motor.disable()
[docs] def get_state(self): return self.motor.get_state()
[docs] def is_displacement_possible(self, displacement, position=None): upper_limit = 1.2 lower_limit = 0.01 if position is None: position = self.get_absolute_position() new_position = position - displacement if new_position > upper_limit or new_position < lower_limit: raise TraverseError( "Travelled distance too long (absolute position: {}).".format( new_position ) ) return True
[docs] def calibrate(self, delta_t, rpm, save=False): """Function to compute _coef_meter_per_rot. Parameters ---------- delta_t : float Time of the translation (s). rpm : int Rotation speed of the motor (rotation per minute, can be negative). save : bool Save calibration file Notes ----- The motor turns during delta_t at rpm. With the position sensor, this function can evaluate the translation speed of the carriage. Normally, _coef_meter_per_rot has to be constant whatever the value of rpm. But for small rotation speed, it can be false. """ self.motor.set_target_rotation_rate(rpm) sleep(2) l1 = self.get_absolute_position() sleep(delta_t) l2 = self.get_absolute_position() self.motor.set_target_rotation_rate(0) travelled_length = abs(l2 - l1) speed = travelled_length / delta_t coef_meter_per_rot = speed / abs(rpm) print( "rpm: {}\nspeed: {}\n_coef_meter_per_rot: {:.4e}".format( abs(rpm), speed, coef_meter_per_rot ) ) # Save calibration file if save: path = os.path.join( path_dir, make_ip_as_str(self.ip_modbus) + "calibration.txt" ) with open(path, "w") as f: f.write(f"coef_meter_per_rot = {coef_meter_per_rot}") # Use the calibration coefficient self._coef_meter_per_rot = coef_meter_per_rot return coef_meter_per_rot
def _load_calibration(self): """Loads the calibration of the traverse. Returns ------- _coef_meter_per_rot : float Value of the calibration """ path = os.path.join( path_dir, make_ip_as_str(self.ip_modbus) + "calibration.txt" ) with open(path, "r") as f: coef_meter_per_rot = float(f.readlines()[0].split("=")[1]) return coef_meter_per_rot
[docs] def set_target_speed(self, speed): self.motor.set_target_rotation_rate(-speed / self._coef_meter_per_rot)
[docs] def move(self, displacement, speed=0.05): """Move the carriage. Parameters ---------- displacement : float Distance in meter. speed : float Velocity of displacement in m/s (positive) """ self.is_displacement_possible(displacement) direction = -np.sign(displacement) time_theo = abs(displacement / speed) d0 = self.get_relative_position() self.motor.set_target_rotation_rate( direction * speed / self._coef_meter_per_rot ) tstart = time() while time() - tstart < time_theo: sleep(0.02) if not self.movement_allowed: break self.motor.set_target_rotation_rate(0) d1 = self.get_relative_position() print( "Travelled distance : {}\tError : {}".format( abs(d1 - d0), abs(abs(d1 - d0) - abs(displacement)) ) )
[docs] def go_to(self, position_to_go, speed=0.05, relative=True): """ move the platform from your position to the absolute position. Parameters ---------- position_to_go : float Position in m. speed : float Translation velocity (m/s, positive). absolute : {True, bool} Absolute or relative position. """ print( "go_to({}, speed={}, relative={})".format( position_to_go, speed, relative ) ) if relative: get_pos = self.get_relative_position else: get_pos = self.get_absolute_position current_position = get_pos() displacement = position_to_go - current_position self.move(displacement, speed) new_pos = get_pos() error_pos = abs(new_pos - position_to_go) if ( error_pos > 0.005 and abs(new_pos - current_position) > 0.5 * abs(displacement) and self.movement_allowed ): self.go_to(position_to_go, relative=relative)
[docs] def make_profiles( self, z_min, z_max, speed_down, speed_up=None, period=None, nb_periods=None, ): if speed_up is None: speed_up = speed_down distance = z_max - z_min if distance < 0: raise ValueError("z_max < z_min") if period is None: period = distance * (speed_down + speed_up) elif distance * (speed_down + speed_up) + 2 > period: raise ValueError("distance * (speed_down + speed_up) > period.") self.go_to(z_max, speed_up) i_period = 0 timer = Timer(period) while True: if ( not self.movement_allowed or nb_periods is not None and i_period >= nb_periods ): break print(f"profile number {i_period}") print("go down") self.go_to(z_min, speed_down) print("go up") self.go_to(z_max, speed_up) i_period += 1 if not self.movement_allowed: break timer.wait_tick()
[docs] def record_positions(self, duration, dtime=0.2): path = os.path.join( path_dir, "positions_traverse_{}.txt".format(time_as_str()) ) with open(path, "w") as f: timer = Timer(dtime) f.write( "# time start (since Epoch): {:.4f} s ({})\n".format( timer.tstart, ctime(timer.tstart) ) + "# time, position\n" ) t_loop = 0.0 while t_loop < duration: pos = self.get_relative_position() t = time() f.write("{:.4f},{:.4f}\n".format(t - timer.tstart, pos)) f.flush() t_loop = timer.wait_tick()
[docs] def record_positions_async(self, duration, dtime=0.2): thread = Thread(target=self.record_positions, args=(duration, dtime)) thread.start() return thread
[docs] def make_profiles_from_track(self, times, speeds, positions, save=False): speeds = abs(speeds) speeds[speeds == 0] = speeds[1] # Go to starting position it = 0 pos_start = positions[0] print("****** Go to starting position ******") traverse.go_to(pos_start, speed=0.05, relative=True) if save: pat_to_save = os.path.join(path_dir, "profilometers_motion.txt") f = open(path_to_save, "w") f.write( "it = {} ; t = {} ; z = {} ; v = {} ; ".format( it, times[0], positions[0], speeds[0] ) ) # Follow track and track register for it, t in enumerate(times[1:-1]): it += 1 print("it = ", it) pos_to_go = float(positions[it]) speed = float(speeds[it]) traverse.go_to(pos_to_go, speed, relative=True) if save: f.write( "it = {} ; t = {} ; z = {} ; v = {} ; ".format( it, times[it], positions[it], speeds[it] ) ) if save: f.close()
[docs] def follow_track( self, times, speeds, positions, fact_multiplicatif_accel=1, coeff_smooth=2.0, periodic=True, use_u3=False, ): if use_u3: self.u3.writeRegister(5000, volt_no_movement) dt = timestep = times[1] - times[0] self.times = times self.speeds = speeds self.positions = positions rotation_rates = self.rotation_rates = ( self.speeds / self._coef_meter_per_rot ) # to decrease discretization error speeds[:-1] = (speeds[:-1] + speeds[1:]) / 2 eps = 0.001 # an error of eps m in position is accepted error_v = 0.0001 x_measured = [] v_sent = [] t_sent = [] acc = abs((rotation_rates[1] - rotation_rates[0])) / timestep or 600 self.motor.set_acceleration(acc * fact_multiplicatif_accel) t_start = time() path_out = os.path.join( path_dir, time_as_str() + "_track_profiler_" + self.ip_modbus + ".txt" ) f = open(path_out, "w") f.write( "# track_profiler " + self.ip_modbus + f"\n# time since epock: {t_start}" + "\n# time(s), position (m)\n" ) f.flush() timer = Timer(dt) v = speeds[0] t = time() - t_start x = self.get_relative_position() x_measured.append(x) v_sent.append(v) t_sent.append(time() - t_start) self.set_target_speed(v) f.write(f"{t:.4f},{x:.4f}\n") f.flush() timer.wait_tick() for it, t in enumerate(times[1:-1]): it += 1 v_theo = speeds[it] t = time() - t_start x = self.get_relative_position() if not self.movement_allowed: print("not self.movement_allowed") self.motor.set_target_rotation_rate(0) self.motor.run_quick_stop() raise TraverseError("not self.movement_allowed.") error = abs(positions[it] - x) if error > 0.02: print( "Large error position carriage! error = " "{}:\n".format(error) + "STOP traverse " + self.ip_modbus ) self.set_target_speed(0) self.motor.run_quick_stop() return v_should_go = (positions[it + 1] - x) / timestep if use_u3: if abs(v_theo) <= error_v: value = volt_no_movement elif v_theo < -error_v: value = 5.0 else: value = 0.0 self.u3.writeRegister(5000, value) if error >= eps: v_target = (coeff_smooth * v_theo + v_should_go) / ( coeff_smooth + 1 ) else: v_target = v_theo if error > 5 * eps: print( ( "Warning ({}): error = {:4.3f} m, " "v_target:{:4.3f} m/s" ).format(self.ip_modbus, error, v_target) ) sys.stdout.flush() acc = ( abs(rotation_rates[it + 1] - rotation_rates[it - 1]) / (2 * timestep) or 600 ) self.motor.set_acceleration(acc * fact_multiplicatif_accel) t_sent.append(time() - t_start) self.set_target_speed(v_target) x_measured.append(x) v_sent.append(v_target) f.write(f"{t:.4f},{x:.4f}\n") f.flush() if self.motor.state != "Operation Enabled (6)": print("Error: motor not enable, " + self.ip_modbus) self.set_target_speed(0) self.motor.run_quick_stop() return if it % 10 == 0: print( "traverse " + self.ip_modbus + f", it = {it}, t = {t:.4f} s" ) sys.stdout.flush() timer.wait_tick() t = time() - t_start x = self.get_relative_position() if periodic: pos_end = positions[0] else: pos_end = positions[-1] v = (pos_end - x) / timestep x_measured.append(x) v_sent.append(v) t_sent.append(time() - t_start) self.set_target_speed(v) f.write(f"{t:.4f},{x:.4f}\n") f.flush() f.close() timer.wait_tick() self.motor.set_acceleration(1000) self.motor.set_target_rotation_rate(0) if use_u3: self.u3.writeRegister(5000, volt_no_movement) self.x_measured = np.array(x_measured) self.v_sent = np.array(v_sent) self.t_sent = np.array(t_sent) return self.x_measured
class Traverses: def __init__(self, ip_addresses=None, const_positions=None, offset_abs=None): if ip_addresses is None: ip_addresses = ["192.168.28.11", "192.168.28.12", "192.168.28.13"] if const_positions is None: const_positions = [1.046_875] * 3 if offset_abs is None: offset_abs = [0.0] * 3 self.ip_addresses = ip_addresses self.movement_allowed = True self.traverses = [] for ind_traverse, ip in enumerate(ip_addresses): traverse = Traverse( ip_modbus=ip, const_position=const_positions[ind_traverse], offset_abs=offset_abs[ind_traverse], ) self.traverses.append(traverse) self.__dict__["traverse" + str(ind_traverse)] = traverse try: self.u3 = U3() self._use_u3 = True self.u3.writeRegister(5000, volt_no_movement) except NameError: self._use_u3 = False def close(self): for traverse in self.traverses: traverse.close() def stop_all(self): self.movement_allowed = False for traverse in self.traverses: traverse.stop_all() def enable(self): self.movement_allowed = True for traverse in self.traverses: traverse.enable() def disable(self): for traverse in self.traverses: traverse.disable() def get_relative_position(self): return self.traverse0.get_relative_position() def go_to(self, position_to_go, speed=0.05, relative=True): """ Go to a position. Parameters ---------- position_to_go : float Position in m. speed : float Translation velocity (m/s, positive). relative : {True, bool} Absolute or relative position. """ if self._use_u3: if relative: displacement = position_to_go - self.get_relative_position() sign = np.sign(displacement) else: raise NotImplementedError if sign > 0: value = 5.0 else: value = 0.0 self.u3.writeRegister(5000, value) threads = [] for traverse in self.traverses: thread = Thread( target=traverse.go_to, args=(position_to_go,), kwargs={"speed": speed, "relative": relative}, ) threads.append(thread) for thread in threads: thread.start() for thread in threads: thread.join() self.u3.writeRegister(5000, volt_no_movement) def make_profiles( self, z_min, z_max, speed_down, speed_up=None, period=None, nb_periods=None, ): if speed_up is None: speed_up = speed_down distance = z_max - z_min if distance < 0: raise ValueError("z_max < z_min") if period is None: period = distance * (speed_down + speed_up) elif distance * (speed_down + speed_up) + 2 > period: raise ValueError("distance * (speed_down + speed_up) > period.") self.go_to(z_max, speed_up) path = os.path.join( path_dir, "make_profiles_{}.txt".format(time_as_str()) ) with open(path, "w") as f: f.write( time_as_str(2) + "\nmake_profiles(z_min=" "{}, z_max={}, speed_down={},\n".format(z_min, z_max, speed_down) + f" speed_up={speed_up},\n" + " period={}, nb_periods={})\n".format( period, nb_periods ) ) i_period = 0 timer = Timer(period) while True: if ( not self.movement_allowed or nb_periods is not None and i_period >= nb_periods ): break print(f"profile number {i_period}") print("go down") self.go_to(z_min, speed_down) print("go up") self.go_to(z_max, speed_up) i_period += 1 if ( not self.movement_allowed or nb_periods is not None and i_period >= nb_periods ): break timer.wait_tick() def make_profiles_from_track(self, times, speeds, positions, save=False): speeds = abs(speeds) speeds[speeds == 0] = speeds[1] # Go to starting position it = 0 pos_start = positions[0] print("****** Go to starting position ******") self.go_to(pos_start, speed=0.05, relative=True) if save: pat_to_save = os.path.join(path_dir, "profilometers_motion.txt") f = open(path_to_save, "w") f.write( "it = {} ; t = {} ; z = {} ; v = {} ; ".format( it, times[0], positions[0], speeds[0] ) ) # Follow track and track register for it, t in enumerate(times[1:-1]): it += 1 print("it = ", it) pos_to_go = float(positions[it]) speed = float(speeds[it]) self.go_to(pos_to_go, speed, relative=True) if save: f.write( "it = {} ; t = {} ; z = {} ; v = {} ; ".format( it, times[it], positions[it], speeds[it] ) ) if save: f.close() def record_positions(self, duration, dtime=0.2): path = os.path.join( path_dir, "positions_traverses_{}.txt".format(time_as_str()) ) nb_traverses = len(self.traverses) positions = np.empty([nb_traverses]) times = np.empty_like(positions) with open(path, "w") as f: timer = Timer(dtime) f.write( "# time start (since Epoch): {:.4f} s ({})\n".format( timer.tstart, ctime(timer.tstart) ) + f"# {nb_traverses} traverse(s)\n" + "# time, position\n" ) t_loop = 0.0 while t_loop < duration: for it, traverse in enumerate(self.traverses): positions[it] = traverse.get_relative_position() times[it] = time() times -= timer.tstart s = "" for it, t in enumerate(times): if s != "": s += "," pos = positions[it] s += f"{t:.4f},{pos:.4f}" f.write(s + f"\n") t_loop = timer.wait_tick() def record_positions_async(self, duration, dtime=0.2): thread = create_thread_record_positions(duration, dtime) thread.start() return thread def create_thread_record_positions(self, duration, dtime=0.2): thread = Thread(target=self.record_positions, args=(duration, dtime)) return thread def run_profiles( self, times, speeds, positions, relative=True, thread_record=None ): eps = 0.001 for traverse in self.traverses: pos = traverse.get_relative_position() if abs(pos - positions[0]) > eps: print("First run traverses.go_to({}, 0.01)".format(positions[0])) return threads = [] if thread_record is not None: threads.append(thread_record) # The master driver is also driven the 0/5 volts output signal thread = Thread( target=self.traverses[0].follow_track, args=(times, speeds, positions), kwargs={"use_u3": True}, ) threads.append(thread) for traverse in self.traverses[1:]: thread = Thread( target=traverse.follow_track, args=(times, speeds, positions) ) threads.append(thread) for thread in threads: thread.start() for thread in threads: thread.join() print("threads = ", threads) if __name__ == "__main__": ip_addresses = ["192.168.28.11"] traverses = Traverses(ip_addresses=ip_addresses) # traverses = Traverses() # traverse0 = traverses.traverse0 # traverse1 = traverses.traverse1 # traverse = Traverse(ip_modbus='192.168.28.13') # Parameters z_max = 0.77 z_min = 0.02 coef = 2 v_up = 0.1 / coef v_down = 0.07 / coef accel = 0.05 / coef dacc = 0.05 / coef dt = 0.25 * coef t_exp = 200.0 t_bottom = 5.0 * coef t_period = 60.0 * coef profile = True if profile: times, speeds, positions = define_track_profilometer( z_max, z_min, v_up, v_down, accel, dacc, dt, t_exp, t_bottom, t_period ) import matplotlib.pyplot as plt fig = plt.figure() ax = fig.gca() ax.set_title("Position Vs Time") ax.set_xlabel("time (s)") ax.set_ylabel("z (m)") ax.plot(times, positions, "x") plt.show(block=True) answer = query.query("\n Is the expected track? [y/n] ") if answer.startswith("n"): print("track cancelled") pass else: print("running track") traverses.run_profiles(times, speeds, positions)