from flexx import flx
from flexxros.node import ROSWidget
from flexxros.widgets import ROSTopicPlotter, ROSDynReconfigWidget, ROSActionClientWidget
[docs]class ActuatorBox(ROSWidget):
def init(self, name, actuator_type, setpoint_topic,
feedback_topic, cont_enable_topic,
cont_setpoint_topic, cont_min, cont_max,
is_angles=False):
self.setpoint_topic = setpoint_topic
self.cont_setpoint_topic = cont_setpoint_topic
self.cont_enable_topic = cont_enable_topic
self.is_angles = is_angles
with flx.GroupWidget(title=name, flex=1):
with flx.FormLayout(flex=1):
flx.Label(text="Actuator setpoint")
if is_angles:
self.set_slider = flx.Slider(title="0", min=-1.6, max=1.6)
self.set_slider2 = flx.Slider(title="0", min=-1.6, max=1.6)
else:
self.set_slider = flx.Slider(title="50", min=0, max=100, value=50)
flx.Label(text="Controller setpoint")
self.enable_cont = flx.CheckBox(title="Enabled")
self.enable_cont.set_checked(True)
self.cont_slider = flx.Slider(title="0", min=cont_min, max=cont_max, value=0)
flx.Widget(minsize=20)
self.announce_publish(setpoint_topic, actuator_type)
self.announce_publish(cont_setpoint_topic, "std_msgs/Float64")
self.announce_publish(cont_enable_topic, "std_msgs/Bool")
self.subscribe(setpoint_topic, actuator_type, self._setpoint_callback)
self.subscribe(cont_setpoint_topic, "std_msgs/Float64", self._cont_callback)
self.subscribe(cont_enable_topic, "std_msgs/Bool", self._enable_callback)
if is_angles:
self.reaction(self._setpoint_slider, "set_slider2.user_done")
@flx.reaction("set_slider.user_done")
def _setpoint_slider(self, *events):
if self.is_angles:
self.publish(self.setpoint_topic, {'weight_1_offset_radians': float(self.set_slider.value), 'weight_2_offset_radians': float(self.set_slider2.value)})
else:
self.publish(self.setpoint_topic, {'value': float(self.set_slider.value)})
def _setpoint_callback(self, msg):
if self.is_angles:
self.set_slider.set_title(str(int(msg.weight_1_offset_radians)))
self.set_slider.set_value(msg.weight_1_offset_radians)
self.set_slider2.set_title(str(int(msg.weight_2_offset_radians)))
self.set_slider2.set_value(msg.weight_2_offset_radians)
else:
self.set_slider.set_title(str(int(msg.value)))
self.set_slider.set_value(msg.value)
@flx.reaction("cont_slider.user_done")
def _cont_slider(self, *events):
self.publish(self.cont_setpoint_topic, {'data': float(self.cont_slider.value)})
def _cont_callback(self, msg):
self.cont_slider.set_title(str(int(msg.data)))
self.cont_slider.set_value(msg.data)
@flx.reaction("enable_cont.user_checked")
def _enable_check(self, *events):
self.publish(self.cont_enable_topic, {'data': bool(self.enable_cont.checked)})
def _enable_callback(self, msg):
self.enable_cont.set_checked(msg.data)
[docs]class SamPlots(flx.Widget):
def init(self):
with flx.HBox(flex=1):
with flx.VBox(flex=1):
self.plot1 = ROSTopicPlotter("/pitch_feedback", "std_msgs/Float64", "data", (-1.6, 1.6))
flx.Widget(flex=1)
self.plot2 = ROSTopicPlotter("/depth_feedback", "std_msgs/Float64", "data", (0, 6))
flx.Widget(flex=1)
self.plot3 = ROSTopicPlotter("/roll_feedback", "std_msgs/Float64", "data", (-1.6, 1.6))
with flx.VBox(flex=1):
self.plot4 = ROSTopicPlotter("/uavcan_lcg_command", "sam_msgs/PercentStamped", "value")
flx.Widget(flex=1)
self.plot5 = ROSTopicPlotter("/uavcan_vbs_command", "sam_msgs/PercentStamped", "value")
flx.Widget(flex=1)
self.plot6 = ROSTopicPlotter("/ros_to_uavcan_bridge_node/tcg_command1", "sam_msgs/BallastAngles", "weight_1_offset_radians", (-3.14, 3.14))
with flx.VBox(flex=1):
self.plot7 = ROSTopicPlotter("/uavcan_to_ros_bridge_node/lcg_feedback", "sam_msgs/PercentStamped", "value")
flx.Widget(flex=1)
self.plot8 = ROSTopicPlotter("/uavcan_to_ros_bridge_node/vbs_feedback", "sam_msgs/PercentStamped", "value")
flx.Widget(flex=1)
flx.Widget(minsize=220)
[docs]class SamActuatorBar(ROSWidget):
def init(self):
with flx.VBox(flex=0, minsize=300, style="background: #9d9;"):
lcg_actuator = ActuatorBox("Pitch - LCG", "sam_msgs/PercentStamped",
"/uavcan_lcg_command", "/uavcan_to_ros_bridge_node/lcg_feedback",
"/LCG_trim/pid_enable", "/pitch_setpoint", -1.6, 1.6)
vbs_actuator = ActuatorBox("Depth - VBS", "sam_msgs/PercentStamped",
"/uavcan_vbs_command", "/uavcan_to_ros_bridge_node/vbs_feedback",
"/VBS_depth/pid_enable", "/depth_setpoint", 0, 5)
tcg_actuator = ActuatorBox("Roll - TCG", "sam_msgs/BallastAngles",
"/ros_to_uavcan_bridge_node/tcg_command1", "",
"/TCG_pid/pid_enable", "/roll_setpoint", -1.6, 1.6, True)
self.leak_button = flx.Button(text="No leaks...", style="background: #008000;", disabled=True)
flx.Widget(flex=1)
self.startup_check = ROSActionClientWidget("/sam_startup_check", "sam_msgs/SystemsCheck")
self.abort_button = flx.Button(text="Abort", style="background: #ff6961;")
self.subscribe("/uavcan_leak", "sam_msgs/Leak", self.callback)
def callback(msg):
if msg.value:
self.leak_button.set_text("Leaking!")
self.leak_button.apply_style("background: #ff6961;")
@flx.reaction('abort_button.pointer_click')
def _publish_abort(self, *events):
self.publish("/abort", {})