Hierarchy

Reactors can be composed hierarchically: any reactor can create and contain other reactors. This lets you build programs from well-defined building blocks and bundle related reactors into reusable components.

Creating and connecting nested reactors

The Reactor.create_reactor() method, which works just like Environment.create_reactor(), creates a reactor that is owned by the calling reactor rather than by the environment. Sub-reactors and their internal connections are set up in the __init__ method of the container.

Consider the following example. Copy it into a new file called scaled_counter.py; the listing is complete and already includes the required imports.

import datetime

import xronos


class Counter(xronos.Reactor):
    output = xronos.OutputPortDeclaration()
    _timer = xronos.PeriodicTimerDeclaration(
        period=datetime.timedelta(milliseconds=100)
    )

    def __init__(self):
        super().__init__()
        self._count = 0

    @xronos.reaction
    def count(self, interface):
        interface.add_trigger(self._timer)
        output_effect = interface.add_effect(self.output)

        def handler():
            self._count += 1
            output_effect.set(self._count)

        return handler


class Printer(xronos.Reactor):
    input_ = xronos.InputPortDeclaration()

    @xronos.reaction
    def print(self, interface):
        input_trigger = interface.add_trigger(self.input_)

        return lambda: print(
            f"{self.name} received {input_trigger.get()} at "
            f"{self.get_time_since_startup()}"
        )


class Gain(xronos.Reactor):
    input_ = xronos.InputPortDeclaration()
    output = xronos.OutputPortDeclaration()

    def __init__(self, factor):
        super().__init__()
        self._factor = factor

    @xronos.reaction
    def scale(self, interface):
        input_trigger = interface.add_trigger(self.input_)
        output_effect = interface.add_effect(self.output)

        def handler():
            output_effect.set(input_trigger.get() * self._factor)

        return handler


class ScaledCounter(xronos.Reactor):
    output = xronos.OutputPortDeclaration()

    def __init__(self, factor):
        super().__init__()
        counter = self.create_reactor("counter", Counter)
        gain = self.create_reactor("gain", Gain, factor)
        self.connect(counter.output, gain.input_)
        self.connect(gain.output, self.output)


def main():
    env = xronos.Environment()
    scaled = env.create_reactor("scaled_counter", ScaledCounter, 3)
    printer = env.create_reactor("printer", Printer)
    env.connect(scaled.output, printer.input_)
    env.execute()


if __name__ == "__main__":
    main()

Gain is a simple reactor that multiplies every incoming value by a configurable factor. ScaledCounter is a container reactor that composes Counter and Gain into a single component. Its __init__ method calls create_reactor() to instantiate both as sub-reactors, then uses connect() to wire them together.

ScaledCounter also declares its own output port. Connecting gain.output to self.output passes values produced inside the container through to the outside world. A container’s input port can be forwarded inward in the same way. From the outside, ScaledCounter looks like any other reactor with an output port.

Running the program produces output similar to the following:

$ python scaled_counter.py
printer received 3 at 0:00:00
printer received 6 at 0:00:00.100000
printer received 9 at 0:00:00.200000
printer received 12 at 0:00:00.300000
printer received 15 at 0:00:00.400000
printer received 18 at 0:00:00.500000
printer received 21 at 0:00:00.600000
printer received 24 at 0:00:00.700000
printer received 27 at 0:00:00.800000
...

Use Ctrl+C to stop the program.
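The values and timestamps in this output follow directly from the wiring, and they can be reproduced with a small plain-Python model that does not use xronos at all. The sketch below assumes what the sample output suggests: the timer first fires at startup, and the elapsed time prints like a datetime.timedelta. The helper name expected_lines is hypothetical and is not part of the xronos API.

```python
import datetime


def expected_lines(factor, period, n, name="printer"):
    """Model the first n lines printed by the example (illustration only, no xronos)."""
    lines = []
    for tick in range(n):
        count = tick + 1          # Counter increments before writing its output
        value = count * factor    # Gain multiplies the count by the factor
        elapsed = tick * period   # assumption: the first timer event is at startup
        lines.append(f"{name} received {value} at {elapsed}")
    return lines


for line in expected_lines(3, datetime.timedelta(milliseconds=100), 4):
    print(line)
```

Changing the factor or the period in this model is a quick way to predict what the real program should print after the same change to ScaledCounter or Counter.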

Reactions accessing nested ports

Instead of (or in addition to) using connect(), a container reactor can define its own reactions that interact directly with its sub-reactors’ ports. This is useful when the container needs to apply logic that goes beyond simple forwarding.

Consider the following modification of ScaledCounter:

class ScaledCounter(xronos.Reactor):
    output = xronos.OutputPortDeclaration()

    def __init__(self, factor):
        super().__init__()
        counter = self.create_reactor("counter", Counter)
        self._gain = self.create_reactor("gain", Gain, factor)
        self.connect(counter.output, self._gain.input_)

    @xronos.reaction
    def on_gain(self, interface):
        gain_trigger = interface.add_trigger(self._gain.output)
        output_effect = interface.add_effect(self.output)

        def handler():
            value = gain_trigger.get()
            if value % 2 != 0:
                output_effect.set(value)

        return handler

ScaledCounter contains the same Counter and Gain sub-reactors as before and connects counter.output to the Gain input in __init__. The difference is that instead of connecting gain.output directly to self.output, it keeps the Gain sub-reactor in the instance attribute self._gain and defines an on_gain reaction that reads from it.

The reaction passes self._gain.output to add_trigger(), so it runs every time Gain produces a value. It also declares self.output as an effect via add_effect(), which gives the handler a PortEffect it can write to. The handler reads the scaled value and forwards it only when it is odd. With the factor of 3 passed in main(), every second scaled value is odd, so the output rate is halved.
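How much the odd-only filter lets through depends on the parity of the factor. The following plain-Python sketch models just the arithmetic of the handler, without xronos; the helper name forwarded_values is hypothetical and exists only for this illustration.

```python
def forwarded_values(factor, ticks):
    """Scaled values that pass the `value % 2 != 0` check for counts 1..ticks."""
    scaled = (count * factor for count in range(1, ticks + 1))
    return [value for value in scaled if value % 2 != 0]


# Odd factor: the product is odd only for odd counts, so every second value passes.
print(forwarded_values(3, 9))

# Even factor: every product is even, so the filter forwards nothing.
print(forwarded_values(4, 9))
```

With an even factor the reaction would still be triggered on every Gain output, but the handler would never write to self.output.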

Running this modified program produces output similar to the following:

$ python scaled_counter.py
printer received 3 at 0:00:00
printer received 9 at 0:00:00.200000
printer received 15 at 0:00:00.400000
printer received 21 at 0:00:00.600000
printer received 27 at 0:00:00.800000
...

Use Ctrl+C to stop the program.