Object Detection with YOLO

YOLO11 is a popular object detection model with a Python API. In this tutorial, you will see how to implement an object detection pipeline using xronos.

Clone the YOLO example and install dependencies

Begin by cloning the xronos repository and navigating to the YOLO example.

$ git clone https://github.com/xronos-inc/xronos
$ cd xronos/examples/YOLO

Set up a virtual environment and install xronos (see the Installation instructions).
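For example, a typical setup might look like this (a sketch; consult the Installation instructions for the authoritative steps):

$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install xronos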

Install the dependencies of the YOLO11 example.

$ apt-get install libgl1
$ pip install -r requirements.txt

Overview

Familiarize yourself with the main program in YOLO.py. After parsing the command-line arguments, the main() function assembles the reactor program.

def main(
    video_path: Optional[str] = None,
    no_display: bool = False,
    telemetry: bool = False,
) -> None:
    env = xronos.Environment()
    webCam = env.create_reactor(
        "WebCam", WebCam, frames_per_second=10, video_stream_path=video_path
    )
    dnn = env.create_reactor("DNN", DNN, model_path="yolo11n.pt")
    display = env.create_reactor("Display", Display, no_display=no_display)

    env.connect(webCam.frame, display.frame)
    env.connect(webCam.frame, dnn.frame)
    env.connect(dnn.result, display.dnn_result)

    if telemetry:
        print("enable telemetry")
        env.enable_telemetry()

    env.execute()

This creates the following reactor program.

[Diagram: the YOLO reactor program, with WebCam's frame port connected to both DNN and Display, and DNN's result port connected to Display]
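For reference, the command-line parsing that invokes main() might look like the following sketch. Only the --telemetry flag is confirmed by this tutorial; the --video and --no-display flag names are assumptions for illustration.

import argparse

if __name__ == "__main__":
    # Hypothetical parser; flag names other than --telemetry are assumptions.
    parser = argparse.ArgumentParser(description="Object detection with YOLO and xronos")
    parser.add_argument("--video", default=None, help="path to a video file (default: webcam)")
    parser.add_argument("--no-display", action="store_true", help="run without a display window")
    parser.add_argument("--telemetry", action="store_true", help="enable telemetry")
    args = parser.parse_args()
    main(video_path=args.video, no_display=args.no_display, telemetry=args.telemetry)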

In the following sections, we go through each of the three reactors.

WebCam reactor

Open WebCam.py. The WebCam reactor declares two reactor elements: a ProgrammableTimerDeclaration called _sample_webcam and an OutputPortDeclaration called frame. Programmable timers are objects that the program can use to schedule future events. Unlike periodic timers, which produce events at fixed, periodic timestamps, programmable timers can be scheduled at arbitrary intervals and are therefore more expressive. Reactors communicate by exchanging events via ports. In this case, WebCam can send events containing cv2 matrices.

class WebCam(xronos.Reactor):
    _sample_webcam = xronos.ProgrammableTimerDeclaration[None]()
    frame = xronos.OutputPortDeclaration[cv2.typing.MatLike]()
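To build intuition for programmable timers, here is a minimal sketch (not part of the example) of a reactor whose timer reschedules itself, where each handler invocation chooses its own delay. It uses only the xronos constructs that appear in this tutorial:

import datetime
from typing import Callable

import xronos


class Heartbeat(xronos.Reactor):
    # A programmable timer that carries no value.
    _beat = xronos.ProgrammableTimerDeclaration[None]()

    @xronos.reaction
    def on_startup(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
        interface.add_trigger(self.startup)
        beat_effect = interface.add_effect(self._beat)

        def handler() -> None:
            # Schedule the first event 100 ms after startup.
            beat_effect.schedule(value=None, delay=datetime.timedelta(milliseconds=100))

        return handler

    @xronos.reaction
    def on_beat(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
        interface.add_trigger(self._beat)
        beat_effect = interface.add_effect(self._beat)

        def handler() -> None:
            print("beat")
            # Unlike a periodic timer, each occurrence may pick a different delay.
            beat_effect.schedule(value=None, delay=datetime.timedelta(milliseconds=250))

        return handler

Such a reactor could be instantiated with env.create_reactor("Heartbeat", Heartbeat), just like the reactors in main() above. WebCam uses the same pattern to pace frame capture, as shown next.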

The __init__ method defines the sample_period attribute and opens a VideoCapture stream using cv2.

def __init__(
    self, frames_per_second: int, video_stream_path: str | None = None
) -> None:
    super().__init__()
    self.sample_period = datetime.timedelta(seconds=1.0 / frames_per_second)
    print(self.sample_period)

    if video_stream_path is not None:
        self.stream = cv2.VideoCapture(video_stream_path)
    else:
        self.stream = cv2.VideoCapture(0, cv2.CAP_ANY)

    if not self.stream.isOpened():
        print("ERROR: Could not open VideoCapture stream")
        sys.exit(1)

    self.stream.set(cv2.CAP_PROP_FPS, frames_per_second)

The reactor contains three reactions. on_startup is triggered by the startup event. The reaction uses add_effect() to declare an effect on the programmable timer _sample_webcam, which allows the reaction handler to schedule the first occurrence of the _sample_webcam event. Instead of invoking schedule() directly, the handler uses the convenience function reschedule_sample_webcam to schedule the first event. We will explain this function in more detail later.

@xronos.reaction
def on_startup(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
    interface.add_trigger(self.startup)
    sample_webcam_effect = interface.add_effect(self._sample_webcam)

    def handler() -> None:
        self.reschedule_sample_webcam(sample_webcam_effect)

    return handler

The on_sample_webcam reaction is triggered by the _sample_webcam event and may have an effect on the frame output port. The main task of the handler is to read a frame from the camera and forward it to the next reactor via the frame output port. A value is written to a port by calling set() on the declared port effect. If no frame could be read, the handler stops the program using request_shutdown(). Finally, the handler uses reschedule_sample_webcam to schedule the next occurrence of _sample_webcam, so that the reaction will trigger again in the future.

@xronos.reaction
def on_sample_webcam(
    self, interface: xronos.ReactionInterface
) -> Callable[[], None]:
    interface.add_trigger(self._sample_webcam)
    sample_webcam_effect = interface.add_effect(self._sample_webcam)
    frame_effect = interface.add_effect(self.frame)

    def handler() -> None:
        ret, _frame = self.stream.read()
        if ret:
            frame_effect.set(_frame)
        else:
            print("WARNING: Failed to read frames from videostream. Shutting down.")
            self.request_shutdown()
        self.reschedule_sample_webcam(sample_webcam_effect)

    return handler

The function reschedule_sample_webcam, which schedules new occurrences of the _sample_webcam event, implements a simple adaptive load-management strategy. It uses xronos.Reactor.get_lag() to inspect how closely events are processed with respect to real time. If the lag is greater than sample_period, a number of samples is skipped. Note that the event itself is declared to carry no value and is therefore scheduled with value=None.

def reschedule_sample_webcam(
    self, action: xronos.ProgrammableTimerEffect[None]
) -> None:
    next_sample_time = self.sample_period + self.sample_period * math.floor(
        self.get_lag() / self.sample_period
    )
    action.schedule(value=None, delay=next_sample_time)
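To make the computation concrete, here is a small worked example with illustrative numbers (a sample_period of 100 ms and a lag of 250 ms):

import datetime
import math

sample_period = datetime.timedelta(milliseconds=100)
lag = datetime.timedelta(milliseconds=250)  # illustrative value, as returned by get_lag()

# lag / sample_period = 2.5 and floor(2.5) = 2, so two extra periods are added:
next_sample_time = sample_period + sample_period * math.floor(lag / sample_period)
print(next_sample_time)  # 0:00:00.300000, i.e. the next two samples are skipped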

The on_shutdown reaction is triggered by the shutdown event. The handler frees resources and gracefully shuts down the camera stream. The shutdown handler is invoked even in response to an unhandled exception or Ctrl+C, so the video stream is always closed.

@xronos.reaction
def on_shutdown(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
    interface.add_trigger(self.shutdown)

    def handler() -> None:
        print("Cleaning up WebCam")
        if self.stream.isOpened():
            self.stream.release()

    return handler

DNN reactor

The DNN reactor is found in DNN.py. It declares an input port called frame and an output port called result.

class DNN(xronos.Reactor):
    frame = xronos.InputPortDeclaration[cv2.typing.MatLike]()
    result = xronos.OutputPortDeclaration[
        tuple[list[str], NDArray[np.float32], NDArray[np.float32]]
    ]()

The __init__ method checks whether there is a GPU with CUDA support and loads the YOLO model parameters using the ultralytics module.

def __init__(self, model_path: str) -> None:
    super().__init__()
    if torch.cuda.is_available():
        print("Using CUDA for inference")
        self.device = "cuda"
    else:
        print("Using CPU for inference")
        self.device = "cpu"
    self.model: ultralytics.YOLO = ultralytics.YOLO(model_path, verbose=False)  # type: ignore

The reactor defines a single reaction. It is triggered by the frame input port and declares the result output port as an effect. Upon receiving a frame, it runs inference with the YOLO model and extracts bounding boxes, confidence values, and class names. The result is written to the output port.

@xronos.reaction
def on_frame(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
    frame_trigger = interface.add_trigger(self.frame)
    result_effect = interface.add_effect(self.result)

    def handler() -> None:
        # Run the model on the frame
        results: ultralytics.engine.results.Results = self.model(  # type: ignore
            [frame_trigger.get()]
        )[0]
        # Extract names, bounding boxes and confidence scores from the results.
        boundingBoxes = results.boxes.xyxyn.cpu().numpy()
        confidences = results.boxes.conf.cpu().numpy()
        names = [results.names[x] for x in results.boxes.cls.cpu().numpy()]
        result_effect.set((names, confidences, boundingBoxes))

    return handler
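For intuition, the tuple written to result for a single detected object might look like this (values are purely illustrative; xyxyn returns box corners normalized to [0, 1]):

import numpy as np

names = ["person"]
confidences = np.array([0.87], dtype=np.float32)
boundingBoxes = np.array([[0.12, 0.08, 0.45, 0.93]], dtype=np.float32)  # x1, y1, x2, y2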

Display reactor

The Display reactor is found in Display.py. It declares two input ports, frame and dnn_result.

class Display(xronos.Reactor):
    # Display requires events frame and dnn_result to be simultaneous.
    frame = xronos.InputPortDeclaration[cv2.typing.MatLike]()
    dnn_result = xronos.InputPortDeclaration[
        tuple[list[str], NDArray[np.float32], NDArray[np.float32]]
    ]()

The __init__ method initializes a few attributes.

def __init__(
    self, confidence_threshold: float = 0.7, no_display: bool = False
) -> None:
    super().__init__()
    self.confidence_threshold = confidence_threshold
    self.label_color = LABEL_COLOR
    self.no_display = no_display

The reactor defines two reactions. on_frame is triggered by both input ports. This means that the reaction executes whenever a message is received on either input port. If both input ports receive a message with identical timestamps, the xronos runtime ensures that the reaction executes only once, with both inputs present. In this example, the messages always arrive simultaneously. The presence of an event can be checked using the is_present() method. The ability to align events on multiple input ports simplifies the application logic: the reaction handler can focus entirely on sensor fusion.

@xronos.reaction
def on_frame(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
    frame_trigger = interface.add_trigger(self.frame)
    dnn_result_trigger = interface.add_trigger(self.dnn_result)

    def handler() -> None:
        if not frame_trigger.is_present() or not dnn_result_trigger.is_present():
            print(
                "ERROR: Display requires events frame and dnn_result simultaneously"
            )
            return

        current_frame = frame_trigger.get()
        current_dnn_result = dnn_result_trigger.get()

        x_shape, y_shape = current_frame.shape[1], current_frame.shape[0]
        # Draw a bounding box and label for each detection in dnn_result
        for name, confidence, coordinate in zip(*current_dnn_result):
            if confidence > self.confidence_threshold:
                x1, y1 = int(coordinate[0] * x_shape), int(coordinate[1] * y_shape)  # type: ignore[index]
                x2, y2 = int(coordinate[2] * x_shape), int(coordinate[3] * y_shape)  # type: ignore[index]
                cv2.rectangle(
                    current_frame, (x1, y1), (x2, y2), self.label_color, 2
                )
                cv2.putText(
                    current_frame,
                    name,
                    (x1, y1),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.9,
                    self.label_color,
                    2,
                )

        if not self.no_display:
            # Overlay the exit hint before displaying the frame, so it is visible.
            cv2.putText(
                current_frame,
                "Exit with `q`",
                (0, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (0, 0, 0),
                2,
            )
            # Display the annotated frame.
            cv2.imshow("frame", current_frame)
            # Terminate when `q` is pressed.
            if cv2.waitKey(1) == ord("q"):
                self.request_shutdown()

    return handler

Finally, the on_shutdown reaction is triggered by the xronos.Reactor.shutdown event and closes the window.

@xronos.reaction
def on_shutdown(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
    interface.add_trigger(self.shutdown)

    def handler() -> None:
        print("Cleaning up Display")
        cv2.destroyAllWindows()

    return handler

Bringing it all together

Run the example application with verbose logging from ultralytics disabled:

$ YOLO_VERBOSE=False python YOLO.py

A window will appear displaying the video stream from your webcam and the object detections made by YOLO.

Optionally, you can append the --telemetry argument to visualize the program execution in the Dashboard.