Yahboom Robot Arm Controller

Yahboom DOFBOT AI Vision Robotic Arm is 6 degree-of-freedom robotic arm controlled by a Jetson Nano. In this tutorial, you will see how a robotic arm controller can be implemented using xronos.

../_images/dofbot.jpg

Image of DOFBOT-6 from Yahboom Technology / retrieved from https://github.com/YahboomTechnology/dofbot-jetson_nano/blob/master/DOFBOT.jpg / No copyright or licensing information was provided

Prerequisites

To run the code provided in this tutorial you will need a Yahboom DOFOT robot arm which can be purchased here. The robot arm includes an NVIDIA Jetson Nano which must be configured with Ubuntu 22.04. Documentation for installing Ubuntu 22.04 is found here. Alternatively, the code should also work with Raspberry Pi but this has not been tested.

Cloning repo and installing dependencies

Note

This section assumes that you are logged onto the NVIDIA Jetson Nano on the Yahboom DOFBOT.

Begin by cloning the xronos repository and navigate to the robot arm controller example:

$ git clone https://github.com/xronos/xronos.git
$ cd xronos/examples/robot-arm

Set up a virtual environment and install xronos, see Installation for instructions.

Install the Python dependencies:

$ pip install -r requirements.txt

Overview

The following diagram shows the layout of the program.

Robot arm controller diagram

The ArmControl reactor is driven by a periodic timer with a 100 ms period and accepts new trajectories on its input port from the UserInterface reactor. When the final pose of a trajectory is reached, the ArmControl reactor notifies the UserInterface that it can accept a new trajectory.

Run the program and give the controller a trajectory:

$ python arm.py 
$ > moveto green yellow init

The robot should follow a trajectory from the initial pose to the green field, followed by the yellow field and back to the initial pose.

In the following, we will take a closer look at the two reactors.

UserInterface reactor

Open user_interface.py. The UserInterface reactor is an example of a generic reusable reactor. It declares four elements. Two ports, a programmable timer, and a physical event. A PhysicalEvent is useful for sending events to the xronos runtime from external contexts such as a thread. In the __init__ method we initialize class variables including a thread and a semaphore.

def __init__(
    self, blocking: bool, parser: Callable[[str], T | None | KeyboardInterrupt]
):
    super().__init__()
    self.blocking = blocking
    self.parser = parser
    self.stop_thread_event = threading.Event()
    self.thread = threading.Thread(
        target=self.run_user_input_thread, args=(self.stop_thread_event,)
    )
    self.semaphore = threading.Semaphore(value=0)

The thread is not started until the startup reaction. This ensures that the xronos runtime runs before the thread calls any of the xronos APIs.

@xronos.reaction
def on_startup(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
    """Start the thread listening for console input."""
    interface.add_trigger(self.startup)

    def handler() -> None:
        # Print out the help message on startup.
        self.parser("help")
        self.thread.start()

    return handler

The thread reads a user command from stdin and schedules it via the physical event. The thread will then acquire a semaphore before reading the next command.

def run_user_input_thread(self, stop_event: threading.Event) -> None:
    """Listen for console input and schedule the next user input action.

    This function will run in a separate thread. After receiving an input
    it will block until the semaphore is released by the runtime.
    """
    while not stop_event.is_set():
        cmd = input("> ")
        self._user_input.trigger(cmd)
        self.semaphore.acquire()

The physical events encapsulating the commands from stdin are handled by a reaction that parses the commands with a supplied function and writes recognized commands to the output port. The UserInterface reactor can be configured as blocking, in which case it will wait for an event on its input port before releasing the semaphore and allowing the external thread to read more commands from the console. If it is non-blocking it will schedule the release of the semaphore at once.

@xronos.reaction
def on_user_input(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
    """Parse the user input and output any parsed command on the output port."""
    user_input_trigger = interface.add_trigger(self._user_input)
    output_effect = interface.add_effect(self.output)
    unblock_effect = interface.add_effect(self._unblock)

    def handler() -> None:
        cmd = self.parser(user_input_trigger.get())
        if isinstance(cmd, KeyboardInterrupt):
            self.request_shutdown()
        elif cmd:
            output_effect.set(cmd)

        if not cmd or not self.blocking:
            # If the user interface is non-blocking, schedule an unblock event.
            unblock_effect.schedule(value=None)

    return handler

ArmControl reactor

Open arm.py. The ArmControl reactor declares several reactor elements. An input port new_trajectory for accepting new trajectories from the user or a planner, a periodic timer _sample_timer for sampling the servos, a programmable timer _in_position for detecting changes to the servo states, and finally an output port trajectory_completed for notifying the user, or planner, that the trajectory was completed.

Upon receiving a new trajectory on the input port we update the internal state of ArmControl.

@xronos.reaction
def on_new_trajectory(
    self, interface: xronos.ReactionInterface
) -> Callable[[], None]:
    new_trajectory_trigger = interface.add_trigger(self.new_trajectory)

    def handler() -> None:
        self.trajectory = new_trajectory_trigger.get()
        next_pose = self.trajectory.next_pose()
        if self.verbose:
            print(f"Received trajectory, setting next_pose={next_pose}")
        if next_pose:
            self.goal = next_pose

    return handler

In response to timer events, the position of each servo is read through self.read_pose(). When a servo is within some small delta angle from its desired position, it is said to be in-position. The reaction schedules a programmable timer if in-position changes.

@xronos.reaction
def on_sample_timer(
    self, interface: xronos.ReactionInterface
) -> Callable[[], None]:
    interface.add_trigger(self._sample_timer)
    in_position_effect = interface.add_effect(self._in_position)

    # update position and schedule action if state of in_position changes
    def handler() -> None:
        self.read_pose()
        in_position = self.in_position()
        distance = Pose.delta(self.goal, self.position)
        if self.verbose:
            print(f"Distance: {distance}")

        if in_position != self.was_in_position:
            self.was_in_position = in_position
            in_position_effect.schedule(in_position)

    return handler

When in-position changes one out of three things can happen. If the robot is not in-position, a new command is sent to the servo actuators to drive the robot to in-position. If the robot is in-position, it has either arrived at an intermediate pose of its trajectory, in which case it updates the current goal to the next pose, or it has completed its trajectory, in which case it writes to the output port to notify the user.

@xronos.reaction
def on_in_position_change(
    self, interface: xronos.ReactionInterface
) -> Callable[[], None]:
    in_position_trigger = interface.add_trigger(self._in_position)
    trajectory_completed_effect = interface.add_effect(self.trajectory_completed)

    def handler() -> None:
        if not in_position_trigger.get():
            if self.verbose:
                print(f"Goal distance: {(Pose.delta(self.goal, self.position))}")

            self.goal.validate()
            for actuator, pos_deg in self.goal.positions.items():
                self.move_actuator(actuator, pos_deg)
        else:
            # Currrent pose goal reached. Check if more poses are in the trajectory.
            next_pose = self.trajectory.next_pose()
            if next_pose:
                self.goal = next_pose
            else:
                trajectory_completed_effect.set(True)

    return handler

Running the program

Run the application:

$ python arm.py

A command line interface is provided by the UserInterface reactor, type help for more info:

$ > help
CLI to robot arm controller. Commands: help, moveto [<green/red/blue/yellow/init> ...]. Exit with `exit` or Ctrl+C

To move the robot, provide a trajectory, e.g.:

$ > moveto red yellow green init

Telemetry

The Xronos Dashboard can provide invaluable runtime observability into the program. Bring it up on the Jetson by following the instructions in Dashboard. Then start the robot arm with telemetry enabled.

$ python arm.py --telemetry

There are now two options for viewing the dashboard. If you are within a desktop environment on the Jetson, either through a remote desktop, or by connecting a monitor and a keyboard directly to the Jetson, then you can simply point the web browser to localhost:3000 as explained in Dashboard. If you only have an SSH connection to the Jetson you can point the browser on your host computer to JETSON_IP_ADDR:3000 where JETSON_IP_ADDR is the IP address or hostname of the Jetson.

The trace view should look something like this.

Xronos Dashboard for robot arm

Observe that ui.on_user_input reaction executes and triggers arm_control.on_user_goal which updates the current goal pose. In the following execution of arm_control.on_sample_timer the change in in-position is detected, and an internal event is scheduled which triggers new commands to the servos.