Ports and Connections

Reactors may send and receive messages to or from other reactors using input and output ports. The ports of a reactor define its interface. Outputs of one reactor can be connected to the inputs of others, which gives a natural way to compose programs from simple building blocks.

Simple Connections

Consider the following reactor.

class Counter(xronos.Reactor):
    output = xronos.OutputPortDeclaration()
    _timer = xronos.PeriodicTimerDeclaration(
        period=datetime.timedelta(milliseconds=100)
    )

    def __init__(self) -> None:
        super().__init__()
        self._count = 0

    @xronos.reaction
    def count(self, interface):
        interface.add_trigger(self._timer)
        output_effect = interface.add_effect(self.output)

        def handler() -> None:
            self._count += 1
            output_effect.set(self._count)

        return handler

It implements a counting mechanism, similar to the Timed reactor from the Periodic Timers guide. In addition to the timer, it declares an output port called output using an OutputPortDeclaration. The increment reaction is triggered by the timer. Alongside the trigger, the reaction also declares an effect using add_effect(). This returns a PortEffect object, which the handler can use to write a value to the port via the set() method. Setting a value on the port notifies any reactions that are triggered by a connected port.

For instance, we can define a Printer reactor that prints whatever it receives.

class Printer(xronos.Reactor):
    input_ = xronos.InputPortDeclaration()

    @xronos.reaction
    def print(self, interface):
        input_trigger = interface.add_trigger(self.input_)

        return lambda: print(
            f"{self.name} received {input_trigger.get()} at "
            f"{self.get_time_since_startup()}"
        )

This reactor uses an InputPortDeclaration to declare an input port called input_. The reaction uses add_trigger() to declare the input port as its trigger. The returned Trigger object can then be used to read the received value via the get() method.

Now we can write a main() function that instantiates and connects both reactors.

def main():
    env = xronos.Environment()
    counter = env.create_reactor("counter", Counter)
    printer = env.create_reactor("printer", Printer)
    env.connect(counter.output, printer.input_)
    env.execute()


if __name__ == "__main__":
    main()

Copy the two reactors and the main() function into a new file called ports.py, and add the required imports at the top:

import datetime

import xronos

Running the program yields output similar to the following:

$ python ports.py
printer received 1 at 0:00:00
printer received 2 at 0:00:00.100000
printer received 3 at 0:00:00.200000
printer received 4 at 0:00:00.300000
printer received 5 at 0:00:00.400000
printer received 6 at 0:00:00.500000
printer received 7 at 0:00:00.600000
printer received 8 at 0:00:00.700000
printer received 9 at 0:00:00.800000
...

This program runs indefinitely — the timer keeps producing events and we never explicitly shut it down. Use Ctrl+C to stop it.

Notice that printer receives the first message right at startup, and all subsequent messages are spaced precisely by the timer period of 100ms. As mentioned in Periodic Timers, time doesn’t advance while reactions execute — and that applies to messages sent via ports too. The timestamp of any output produced by a reaction is equal to the timestamp of its trigger.

Delayed Connections

Sometimes it’s useful to delay messages so that they arrive at a later point in time. The connect() method accepts an optional delay argument. When set, the connection delivers the message on the receiving side precisely at get_time() + delay.

For instance, we can make the following change to delay all messages by 1s:

def main():
    env = xronos.Environment()
    counter = env.create_reactor("counter", Counter)
    printer = env.create_reactor("printer", Printer)
    env.connect(counter.output, printer.input_, delay=datetime.timedelta(seconds=1))
    env.execute()

This produces the following output after waiting 1s:

$ python ports.py
printer received 1 at 0:00:01
printer received 2 at 0:00:01.100000
printer received 3 at 0:00:01.200000
printer received 4 at 0:00:01.300000
printer received 5 at 0:00:01.400000
printer received 6 at 0:00:01.500000
printer received 7 at 0:00:01.600000
printer received 8 at 0:00:01.700000
printer received 9 at 0:00:01.800000
...

Multiple Inputs

Reactors can define any number of input and output ports. While prior examples had reactions with a single trigger, it’s also possible to trigger a reaction from multiple inputs. Consider the following Multiplier reactor.

class Multiplier(xronos.Reactor):
    factor1 = xronos.InputPortDeclaration()
    factor2 = xronos.InputPortDeclaration()
    product = xronos.OutputPortDeclaration()

    @xronos.reaction
    def multiply(self, interface):
        factor1_trigger = interface.add_trigger(self.factor1)
        factor2_trigger = interface.add_trigger(self.factor2)
        product_effect = interface.add_effect(self.product)

        def handler():
            if factor1_trigger.is_present() and factor2_trigger.is_present():
                product_effect.set(factor1_trigger.get() * factor2_trigger.get())

        return handler

It declares two input ports, factor1 and factor2, as well as the output port product. The multiply reaction declares both input ports as triggers, so it runs whenever either or both inputs receive a message. The is_present() method tells you whether a particular trigger was activated and carries a value.

Note

Calling get() while is_present() returns False raises an AbsentError.

If both triggers are present, the handler writes their product to the output port. If only one is present, no output is produced. Of course, other strategies are possible too — for instance, assuming a default value or storing the last observed value in self.

Now we have a few options for connecting the Multiplier with the Counter and Printer reactors. Here’s one pattern that works nicely.

def main():
    env = xronos.Environment()
    counter = env.create_reactor("counter", Counter)
    printer = env.create_reactor("printer", Printer)
    multiplier = env.create_reactor("multiplier", Multiplier)
    env.connect(counter.output, multiplier.factor1)
    env.connect(counter.output, multiplier.factor2)
    env.connect(multiplier.product, printer.input_)
    env.execute()

This connects counter.output to both inputs of multiplier, delivering the same messages to both ports. When executed, the program prints square numbers.

$ python ports.py
printer received 1 at 0:00:00
printer received 4 at 0:00:00.100000
printer received 9 at 0:00:00.200000
printer received 16 at 0:00:00.300000
printer received 25 at 0:00:00.400000
printer received 36 at 0:00:00.500000
printer received 49 at 0:00:00.600000
printer received 64 at 0:00:00.700000
printer received 81 at 0:00:00.800000
...

You can achieve the same result using two separate counter instances.

def main():
    env = xronos.Environment()
    counter1 = env.create_reactor("counter1", Counter)
    counter2 = env.create_reactor("counter2", Counter)
    printer = env.create_reactor("printer", Printer)
    multiplier = env.create_reactor("multiplier", Multiplier)
    env.connect(counter1.output, multiplier.factor1)
    env.connect(counter2.output, multiplier.factor2)
    env.connect(multiplier.product, printer.input_)
    env.execute()

Regardless of how the reactors are connected, the runtime analyzes the connections and the declared triggers and effects of each reaction to ensure repeatable behavior. Concretely, the multiply reaction doesn’t need to wait for inputs to arrive — the runtime guarantees that any reactions which could affect multiply’s triggers have already executed. This means the value and is_present status of every trigger is always known when the handler runs.

We can also use delayed connections to offset the factors received by multiplier.

def main():
    env = xronos.Environment()
    counter1 = env.create_reactor("counter1", Counter)
    counter2 = env.create_reactor("counter2", Counter)
    printer = env.create_reactor("printer", Printer)
    multiplier = env.create_reactor("multiplier", Multiplier)
    env.connect(counter1.output, multiplier.factor1)
    env.connect(
        counter2.output, multiplier.factor2, delay=datetime.timedelta(milliseconds=200)
    )
    env.connect(multiplier.product, printer.input_)
    env.execute()

This produces the following output.

$ python ports.py
printer received 3 at 0:00:00.200000
printer received 8 at 0:00:00.300000
printer received 15 at 0:00:00.400000
printer received 24 at 0:00:00.500000
printer received 35 at 0:00:00.600000
printer received 48 at 0:00:00.700000
printer received 63 at 0:00:00.800000
...