Source code for flexxros.widgets

from flexx import flx
from flexxros.node import ROSWidget

[docs]class ROSTopicPlotter(ROSWidget): """ flx.Widget subclass that subscribes to a topic and plots it using a flx.PlotWidget """ def init(self, topic, topic_type, key="data", yrange=(0, 100), nsamples=100): self.key = key self.nsamples = nsamples self.start_time = time() self.plot = self.mem_plot = flx.PlotWidget(flex=1, style='width: 400px; height: 220px;', xdata=[], yrange=yrange, ylabel=topic+"/"+key, xlabel="Time") self.subscribe(topic, topic_type, self._callback) def _callback(self, msg): #try: # Prepare plots times = list(self.plot.xdata) times.append(time() - self.start_time) times = times[-self.nsamples:] # cpu data values = list(self.plot.ydata) values.append(msg[self.key]) values = values[-self.nsamples:] self.plot.set_data(times, values)
#except BufferError: # print("Got buffer error!")
[docs]class ROSDynReconfigWidget(flx.Widget): """ flx.Widget subclass that dynamically creates a form to set parameters through dynamic reconfigure """ def init(self, server_name): self.is_init = False self.react = self.reaction(self._callback, "!root."+server_name.replace("/", "_")) with flx.GroupWidget(title=server_name): self.vbox = flx.FormLayout(flex=1) self.root.announce_reconfig(server_name) @flx.action def add_children(self, ev): print("Got add children!") if self.is_init: for c in ev: for child in self.vbox.children: print(child.title, child.text) if child.title == c: child.set_text(str(ev[c])) break return print("Event: ", ev) with self.vbox: for c in ev: if c in ["groups", "type", "source"]: continue l = flx.LineEdit(title=c, text=str(ev[c])) flx.Button(text="Set") flx.Widget(minsize=60) self.is_init = True def _callback(self, *events): print("Got new reconfig!") for ev in events: self.add_children(ev)
[docs]class ROSActionClientWidget(ROSWidget): """ flx.Widget subclass that presents a widget similar to the normal axclient.py """ def init(self, server_name, server_type): self.is_init = False self.server_name = server_name with flx.GroupWidget(title=server_name, flex=1): with flx.FormLayout(flex=1): self.arguments = flx.LineEdit(title="Args", text="") self.feedback = flx.LineEdit(title="Feedback", text="") self.result = flx.LineEdit(title="Result", text="") self.send_goal = flx.Button(text="Send goal") flx.Widget(minsize=40) self.reaction(self._prototype_callback, "!root."+server_name.replace("/", "_")+"_prototype") self.announce_action_client(server_name, server_type) @flx.reaction("send_goal.pointer_click") def _send_goal(self, *events): self.send_action_goal(self.server_name, self.arguments.text, self._feedback_callback, self._result_callback) self.feedback.set_text("Waiting...") self.result.set_text("Waiting...") def _prototype_callback(self, msg_string): self.arguments.set_text(msg_string.data) def _feedback_callback(self, msg): print("Got new feedback!") exclude = ["source", "type"] texts = [str(key) + ": " + str(value) for key, value in msg.items() if key not in exclude] self.feedback.set_text(", ".join(texts)) def _result_callback(self, msg): print("Got new result!") exclude = ["source", "type"] texts = [str(key) + ": " + str(value) for key, value in msg.items() if key not in exclude] self.result.set_text(", ".join(texts)) self.feedback.set_text("")