Ports and Connections

Reactors may send and receive messages to or from other reactors using input and output ports. The ports of a reactor define its interface. Outputs of one reactor may be connected to the inputs of others, which gives a mechanism for composing programs from simple building blocks.

Simple Connections

Consider the following reactor.

class Counter(xronos.Reactor):
    output = xronos.OutputPortDeclaration()
    _timer = xronos.PeriodicTimerDeclaration(
        period=datetime.timedelta(milliseconds=100)
    )

    def __init__(self) -> None:
        super().__init__()
        self._count = 0

    @xronos.reaction
    def count(self, interface):
        interface.add_trigger(self._timer)
        output_effect = interface.add_effect(self.output)

        def handler() -> None:
            self._count += 1
            output_effect.set(self._count)

        return handler

It implements a counting mechanism, similar to the Timed reactor that we considered before. In addition to the timer, it also declares an output port called output using an OutputPortDeclaration. The increment reaction is triggered by the timer. In addition to the trigger, the reaction also declares an effect using add_effect(). This returns a PortEffect object, which the handler may use to write a value to the port using the set() method. Setting a value on the port notifies any reactions that are triggered by a connected port.

For instance, we can define a Printer reactor that prints any object that it receives.

class Printer(xronos.Reactor):
    input_ = xronos.InputPortDeclaration()

    @xronos.reaction
    def print(self, interface):
        input_trigger = interface.add_trigger(self.input_)

        return lambda: print(
            f"{self.name} received {input_trigger.get()} at "
            f"{self.get_time_since_startup()}"
        )

This reactor uses an InputPortDeclaration to declare an input port called input_. The reaction uses add_trigger() to declare the input port as its trigger. The returned Trigger object can be used to read the received value using the get() method.

Now we can write a main() function that instantiates and connects both reactors.

def main():
    env = xronos.Environment()
    counter = env.create_reactor("counter", Counter)
    printer = env.create_reactor("printer", Printer)
    env.connect(counter.output, printer.input_)
    env.execute()


if __name__ == "__main__":
    main()

Copy the two reactors and the main() function into a new file called ports.py. Do not forget to add the required imports:

import datetime

import xronos

Executing the program yields output similar to the following:

$ python ports.py
printer received 1 at 0:00:00
printer received 2 at 0:00:00.100000
printer received 3 at 0:00:00.200000
printer received 4 at 0:00:00.300000
printer received 5 at 0:00:00.400000
printer received 6 at 0:00:00.500000
printer received 7 at 0:00:00.600000
printer received 8 at 0:00:00.700000
printer received 9 at 0:00:00.800000
...

This program will execute indefinitely as the timer will always produce new events and we do not use request_shutdown(). Use Ctrl+C to abort the execution. If the program defines any reactions to shutdown, these will be invoked after the termination signal is received.

Note that printer receives the first message right at startup, and all subsequent messages are spaced perfectly by the timer period of 100ms. As mentioned in Periodic Timers, time does not advance while reactions execute. This is also true for any messages sent via ports. The timestamp of any output produced by a reaction is equal to the timestamp of its trigger.

Delayed Connections

Sometimes it is useful to delay messages so that they are processed at a later point in time. The connect() method accepts an optional delay argument. If it is set, the connection will output the message on the receiving side precisely at get_time() + delay.

For instance, we can make the following modification to delay all messages by 1s:

def main():
    env = xronos.Environment()
    counter = env.create_reactor("counter", Counter)
    printer = env.create_reactor("printer", Printer)
    env.connect(counter.output, printer.input_, delay=datetime.timedelta(seconds=1))
    env.execute()

This produces the following output after waiting for 1s:

$ python ports.py
printer received 1 at 0:00:01
printer received 2 at 0:00:01.100000
printer received 3 at 0:00:01.200000
printer received 4 at 0:00:01.300000
printer received 5 at 0:00:01.400000
printer received 6 at 0:00:01.500000
printer received 7 at 0:00:01.600000
printer received 8 at 0:00:01.700000
printer received 9 at 0:00:01.800000
...

Multiple Inputs

Reactors may define an arbitrary number of input and output ports. While in prior examples all reactions had a single trigger, it is also possible to trigger reactions by multiple inputs. Consider the following Multiplier reactor.

class Multiplier(xronos.Reactor):
    factor1 = xronos.InputPortDeclaration()
    factor2 = xronos.InputPortDeclaration()
    product = xronos.OutputPortDeclaration()

    @xronos.reaction
    def multiply(self, interface):
        factor1_trigger = interface.add_trigger(self.factor1)
        factor2_trigger = interface.add_trigger(self.factor2)
        product_effect = interface.add_effect(self.product)

        def handler():
            if factor1_trigger.is_present() and factor2_trigger.is_present():
                product_effect.set(factor1_trigger.get() * factor2_trigger.get())

        return handler

It declares two input ports factor1 and factor2 as well as the output port product. The multiply reaction declares both input ports as triggers. Consequently, the reaction executes when either or both of the inputs receive a message. The is_present() method can be used to check if the trigger was activated and carries a value.

Note

Calling get() while is_present() returns False raises an AbsentError.

If both triggers are present, the handler writes the product to the output port using the corresponding effect. If only one of the triggers is present, then no output is produced. Note that it is also possible to define other strategies, such as assuming a default value or storing the last observed value on self.

Now we have multiple options for connecting the Multiplier reactor with the Counter and Printer reactors. For instance, we can create the following pattern.

def main():
    env = xronos.Environment()
    counter = env.create_reactor("counter", Counter)
    printer = env.create_reactor("printer", Printer)
    multiplier = env.create_reactor("multiplier", Multiplier)
    env.connect(counter.output, multiplier.factor1)
    env.connect(counter.output, multiplier.factor2)
    env.connect(multiplier.product, printer.input_)
    env.execute()

This connects counter.output to inputs of multiplier, so that the same messages are delivered to both input ports. When executed, the program prints square numbers.

$ python ports.py
printer received 1 at 0:00:00
printer received 4 at 0:00:00.100000
printer received 9 at 0:00:00.200000
printer received 16 at 0:00:00.300000
printer received 25 at 0:00:00.400000
printer received 36 at 0:00:00.500000
printer received 49 at 0:00:00.600000
printer received 64 at 0:00:00.700000
printer received 81 at 0:00:00.800000
...

We can also achieve the same behavior using two instances of counter.

def main():
    env = xronos.Environment()
    counter1 = env.create_reactor("counter1", Counter)
    counter2 = env.create_reactor("counter2", Counter)
    printer = env.create_reactor("printer", Printer)
    multiplier = env.create_reactor("multiplier", Multiplier)
    env.connect(counter1.output, multiplier.factor1)
    env.connect(counter2.output, multiplier.factor2)
    env.connect(multiplier.product, printer.input_)
    env.execute()

Independent of how the reactors are connected, the runtime analyzes the connections as well as the declared triggers and effects of reactions to ensure repeatable behavior. Concretely, the multiply reaction does not need to wait for inputs to arrive. The runtime ensures that any reactions that may have an effect on multiply’s triggers are executed first. Thus, the values and the is_present status of any trigger is guaranteed to be known when the handler executes.

Finally, we may use delayed connections to offset the factors received by multiplier.

def main():
    env = xronos.Environment()
    counter1 = env.create_reactor("counter1", Counter)
    counter2 = env.create_reactor("counter2", Counter)
    printer = env.create_reactor("printer", Printer)
    multiplier = env.create_reactor("multiplier", Multiplier)
    env.connect(counter1.output, multiplier.factor1)
    env.connect(
        counter2.output, multiplier.factor2, delay=datetime.timedelta(milliseconds=200)
    )
    env.connect(multiplier.product, printer.input_)
    env.execute()

This produces the following output.

$ python ports.py
printer received 3 at 0:00:00.200000
printer received 8 at 0:00:00.300000
printer received 15 at 0:00:00.400000
printer received 24 at 0:00:00.500000
printer received 35 at 0:00:00.600000
printer received 48 at 0:00:00.700000
printer received 63 at 0:00:00.800000
...