Ports and Connections
Reactors may send and receive messages to or from other reactors using input and output ports. The ports of a reactor define its interface. Outputs of one reactor may be connected to the inputs of others, which gives a mechanism for composing programs from simple building blocks.
Simple Connections
Consider the following reactor.
class Counter(xronos.Reactor):
output = xronos.OutputPortDeclaration()
_timer = xronos.PeriodicTimerDeclaration(
period=datetime.timedelta(milliseconds=100)
)
def __init__(self) -> None:
super().__init__()
self._count = 0
@xronos.reaction
def count(self, interface):
interface.add_trigger(self._timer)
output_effect = interface.add_effect(self.output)
def handler() -> None:
self._count += 1
output_effect.set(self._count)
return handler
class Counter(xronos.Reactor):
output = xronos.OutputPortDeclaration[int]()
_timer = xronos.PeriodicTimerDeclaration(
period=datetime.timedelta(milliseconds=100)
)
def __init__(self) -> None:
super().__init__()
self._count = 0
@xronos.reaction
def count(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
interface.add_trigger(self._timer)
output_effect = interface.add_effect(self.output)
def handler() -> None:
self._count += 1
output_effect.set(self._count)
return handler
It implements a counting mechanism, similar to the Timed
reactor that we
considered before. In addition to the timer, it also declares an output port
called output
using an OutputPortDeclaration
. The increment
reaction is triggered by the timer. In addition to the trigger, the reaction
also declares an effect using add_effect()
.
This returns a PortEffect
object, which the handler may use to
write a value to the port using the set()
method.
Setting a value on the port notifies any reactions that are triggered by a connected port.
For instance, we can define a Printer
reactor that prints any object that it receives.
class Printer(xronos.Reactor):
input_ = xronos.InputPortDeclaration()
@xronos.reaction
def print(self, interface):
input_trigger = interface.add_trigger(self.input_)
return lambda: print(
f"{self.name} received {input_trigger.get()} at "
f"{self.get_time_since_startup()}"
)
class Printer(xronos.Reactor):
input_ = xronos.InputPortDeclaration[int]()
@xronos.reaction
def print(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
input_trigger = interface.add_trigger(self.input_)
return lambda: print(
f"{self.name} received {input_trigger.get()} at "
f"{self.get_time_since_startup()}"
)
This reactor uses an InputPortDeclaration
to declare an input
port called input_
. The reaction uses
add_trigger()
to declare the input port as its
trigger. The returned Trigger
object can be used to read the
received value using the get()
method.
Now we can write a main()
function that instantiates and connects both reactors.
def main():
env = xronos.Environment()
counter = env.create_reactor("counter", Counter)
printer = env.create_reactor("printer", Printer)
env.connect(counter.output, printer.input_)
env.execute()
if __name__ == "__main__":
main()
def main() -> None:
env = xronos.Environment()
counter = env.create_reactor("counter", Counter)
printer = env.create_reactor("printer", Printer)
env.connect(counter.output, printer.input_)
env.execute()
if __name__ == "__main__":
main()
Copy the two reactors and the main()
function into a new file called
ports.py
. Do not forget to add the required imports:
import datetime
import xronos
import datetime
from typing import Callable
import xronos
Executing the program yields output similar to the following:
$ python ports.py
printer received 1 at 0:00:00
printer received 2 at 0:00:00.100000
printer received 3 at 0:00:00.200000
printer received 4 at 0:00:00.300000
printer received 5 at 0:00:00.400000
printer received 6 at 0:00:00.500000
printer received 7 at 0:00:00.600000
printer received 8 at 0:00:00.700000
printer received 9 at 0:00:00.800000
...
This program will execute indefinitely as the timer will always produce new
events and we do not use request_shutdown()
. Use
Ctrl+C to abort the execution. If the program defines any
reactions to shutdown
, these will be invoked after the
termination signal is received.
Note that printer
receives the first message right at startup, and all
subsequent messages are spaced perfectly by the timer period of 100ms. As
mentioned in Periodic Timers, time does not advance while reactions execute. This
is also true for any messages sent via ports. The timestamp of any output
produced by a reaction is equal to the timestamp of its trigger.
Delayed Connections
Sometimes it is useful to delay messages so that they are processed at a later
point in time. The connect()
method accepts an
optional delay
argument. If it is set, the connection will output the message
on the receiving side precisely at get_time() + delay
.
For instance, we can make the following modification to delay all messages by 1s:
def main():
env = xronos.Environment()
counter = env.create_reactor("counter", Counter)
printer = env.create_reactor("printer", Printer)
env.connect(counter.output, printer.input_, delay=datetime.timedelta(seconds=1))
env.execute()
def main():
env = xronos.Environment()
counter = env.create_reactor("counter", Counter)
printer = env.create_reactor("printer", Printer)
env.connect(counter.output, printer.input_, delay=datetime.timedelta(seconds=1))
env.execute()
This produces the following output after waiting for 1s:
$ python ports.py
printer received 1 at 0:00:01
printer received 2 at 0:00:01.100000
printer received 3 at 0:00:01.200000
printer received 4 at 0:00:01.300000
printer received 5 at 0:00:01.400000
printer received 6 at 0:00:01.500000
printer received 7 at 0:00:01.600000
printer received 8 at 0:00:01.700000
printer received 9 at 0:00:01.800000
...
Multiple Inputs
Reactors may define an arbitrary number of input and output ports. While in
prior examples all reactions had a single trigger, it is also possible to
trigger reactions by multiple inputs. Consider the following Multiplier
reactor.
class Multiplier(xronos.Reactor):
factor1 = xronos.InputPortDeclaration()
factor2 = xronos.InputPortDeclaration()
product = xronos.OutputPortDeclaration()
@xronos.reaction
def multiply(self, interface):
factor1_trigger = interface.add_trigger(self.factor1)
factor2_trigger = interface.add_trigger(self.factor2)
product_effect = interface.add_effect(self.product)
def handler():
if factor1_trigger.is_present() and factor2_trigger.is_present():
product_effect.set(factor1_trigger.get() * factor2_trigger.get())
return handler
class Multiplier(xronos.Reactor):
factor1 = xronos.InputPortDeclaration[int]()
factor2 = xronos.InputPortDeclaration[int]()
product = xronos.OutputPortDeclaration[int]()
@xronos.reaction
def multiply(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
factor1_trigger = interface.add_trigger(self.factor1)
factor2_trigger = interface.add_trigger(self.factor2)
product_effect = interface.add_effect(self.product)
def handler() -> None:
if factor1_trigger.is_present() and factor2_trigger.is_present():
product_effect.set(factor1_trigger.get() * factor2_trigger.get())
return handler
It declares two input ports factor1
and factor2
as well as the output port
product
. The multiply
reaction declares both input ports as triggers.
Consequently, the reaction executes when either or both of the inputs receive a
message. The is_present()
method can be used to check
if the trigger was activated and carries a value.
Note
Calling get()
while is_present()
returns False
raises an AbsentError
.
If both triggers are present, the handler writes the product to the output port
using the corresponding effect. If only one of the triggers is present, then no
output is produced. Note that it is also possible to define other strategies, such
as assuming a default value or storing the last observed value on self
.
Now we have multiple options for connecting the Multiplier
reactor with the
Counter
and Printer
reactors. For instance, we can create the following pattern.
def main():
env = xronos.Environment()
counter = env.create_reactor("counter", Counter)
printer = env.create_reactor("printer", Printer)
multiplier = env.create_reactor("multiplier", Multiplier)
env.connect(counter.output, multiplier.factor1)
env.connect(counter.output, multiplier.factor2)
env.connect(multiplier.product, printer.input_)
env.execute()
def main() -> None:
env = xronos.Environment()
counter = env.create_reactor("counter", Counter)
printer = env.create_reactor("printer", Printer)
multiplier = env.create_reactor("multiplier", Multiplier)
env.connect(counter.output, multiplier.factor1)
env.connect(counter.output, multiplier.factor2)
env.connect(multiplier.product, printer.input_)
env.execute()
This connects counter.output
to inputs of multiplier
, so that the same
messages are delivered to both input ports. When executed, the program prints
square numbers.
$ python ports.py
printer received 1 at 0:00:00
printer received 4 at 0:00:00.100000
printer received 9 at 0:00:00.200000
printer received 16 at 0:00:00.300000
printer received 25 at 0:00:00.400000
printer received 36 at 0:00:00.500000
printer received 49 at 0:00:00.600000
printer received 64 at 0:00:00.700000
printer received 81 at 0:00:00.800000
...
We can also achieve the same behavior using two instances of counter
.
def main():
env = xronos.Environment()
counter1 = env.create_reactor("counter1", Counter)
counter2 = env.create_reactor("counter2", Counter)
printer = env.create_reactor("printer", Printer)
multiplier = env.create_reactor("multiplier", Multiplier)
env.connect(counter1.output, multiplier.factor1)
env.connect(counter2.output, multiplier.factor2)
env.connect(multiplier.product, printer.input_)
env.execute()
def main():
env = xronos.Environment()
counter1 = env.create_reactor("counter1", Counter)
counter2 = env.create_reactor("counter2", Counter)
printer = env.create_reactor("printer", Printer)
multiplier = env.create_reactor("multiplier", Multiplier)
env.connect(counter1.output, multiplier.factor1)
env.connect(counter2.output, multiplier.factor2)
env.connect(multiplier.product, printer.input_)
env.execute()
Independent of how the reactors are connected, the runtime analyzes the
connections as well as the declared triggers and effects of reactions to ensure
repeatable behavior. Concretely, the multiply
reaction does not need to wait
for inputs to arrive. The runtime ensures that any reactions that may have an
effect on multiply
’s triggers are executed first. Thus, the values and the
is_present
status of any trigger is guaranteed to be known when the handler
executes.
Finally, we may use delayed connections to offset the factors received by multiplier
.
def main():
env = xronos.Environment()
counter1 = env.create_reactor("counter1", Counter)
counter2 = env.create_reactor("counter2", Counter)
printer = env.create_reactor("printer", Printer)
multiplier = env.create_reactor("multiplier", Multiplier)
env.connect(counter1.output, multiplier.factor1)
env.connect(
counter2.output, multiplier.factor2, delay=datetime.timedelta(milliseconds=200)
)
env.connect(multiplier.product, printer.input_)
env.execute()
def main():
env = xronos.Environment()
counter1 = env.create_reactor("counter1", Counter)
counter2 = env.create_reactor("counter2", Counter)
printer = env.create_reactor("printer", Printer)
multiplier = env.create_reactor("multiplier", Multiplier)
env.connect(counter1.output, multiplier.factor1)
env.connect(
counter2.output, multiplier.factor2, delay=datetime.timedelta(milliseconds=200)
)
env.connect(multiplier.product, printer.input_)
env.execute()
This produces the following output.
$ python ports.py
printer received 3 at 0:00:00.200000
printer received 8 at 0:00:00.300000
printer received 15 at 0:00:00.400000
printer received 24 at 0:00:00.500000
printer received 35 at 0:00:00.600000
printer received 48 at 0:00:00.700000
printer received 63 at 0:00:00.800000
...