Dependency Cycles
When a reaction writes to a port that (directly or indirectly) triggers another reaction, the runtime must establish an ordering between them. It infers this ordering from two sources:
Port connections: if reaction A writes to a port connected to reaction B’s trigger, A must execute before B.
Reaction order: within the same reactor, reactions declared earlier take priority over reactions declared later at the same logical time.
A dependency cycle occurs when these two sources of ordering information
contradict each other, making it impossible to find a valid execution order.
The runtime detects this at startup and raises a ValidationError.
A range sensor example
Consider a Controller reactor that periodically polls a RangeSensor by
sending requests through a port and receiving distance readings back through
another port. The controller shuts down once the sensor reports an obstacle
within a safe threshold.
import datetime
import xronos
SAFE_DISTANCE_M = 1.0
INITIAL_DISTANCE_M = 5.0
DISTANCE_DECREMENT_M = 0.1
class RangeSensor(xronos.Reactor):
request = xronos.InputPortDeclaration()
response = xronos.OutputPortDeclaration()
def __init__(self):
super().__init__()
self._distance = INITIAL_DISTANCE_M
@xronos.reaction
def on_request(self, interface):
interface.add_trigger(self.request)
response_effect = interface.add_effect(self.response)
def handler():
self._distance = max(0.0, self._distance - DISTANCE_DECREMENT_M)
response_effect.set(self._distance)
return handler
class Controller(xronos.Reactor):
request = xronos.OutputPortDeclaration()
response = xronos.InputPortDeclaration()
_timer = xronos.PeriodicTimerDeclaration(
period=datetime.timedelta(milliseconds=100)
)
def __init__(self):
super().__init__()
self._obstacle_detected = False
@xronos.reaction
def handle_response(self, interface):
response_trigger = interface.add_trigger(self.response)
shutdown_effect = interface.add_effect(self.shutdown)
def handler():
distance = response_trigger.get()
print(f"{self.get_time_since_startup()}: range reading = {distance:.2f} m")
if distance < SAFE_DISTANCE_M and not self._obstacle_detected:
self._obstacle_detected = True
print(
f"{self.get_time_since_startup()}: WARNING — obstacle detected "
f"at {distance:.2f} m"
)
shutdown_effect.trigger_shutdown()
return handler
@xronos.reaction
def send_request(self, interface):
interface.add_trigger(self._timer)
request_effect = interface.add_effect(self.request)
return lambda: request_effect.set(True)
def main():
env = xronos.Environment()
controller = env.create_reactor("controller", Controller)
sensor = env.create_reactor("sensor", RangeSensor)
env.connect(controller.request, sensor.request)
env.connect(sensor.response, controller.response)
env.execute()
if __name__ == "__main__":
main()
import datetime
from typing import Callable
import xronos
SAFE_DISTANCE_M = 1.0
INITIAL_DISTANCE_M = 5.0
DISTANCE_DECREMENT_M = 0.1
class RangeSensor(xronos.Reactor):
request = xronos.InputPortDeclaration[bool]()
response = xronos.OutputPortDeclaration[float]()
def __init__(self) -> None:
super().__init__()
self._distance = INITIAL_DISTANCE_M
@xronos.reaction
def on_request(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
interface.add_trigger(self.request)
response_effect = interface.add_effect(self.response)
def handler() -> None:
self._distance = max(0.0, self._distance - DISTANCE_DECREMENT_M)
response_effect.set(self._distance)
return handler
class Controller(xronos.Reactor):
request = xronos.OutputPortDeclaration[bool]()
response = xronos.InputPortDeclaration[float]()
_timer = xronos.PeriodicTimerDeclaration(
period=datetime.timedelta(milliseconds=100)
)
def __init__(self) -> None:
super().__init__()
self._obstacle_detected = False
@xronos.reaction
def handle_response(
self, interface: xronos.ReactionInterface
) -> Callable[[], None]:
response_trigger = interface.add_trigger(self.response)
shutdown_effect = interface.add_effect(self.shutdown)
def handler() -> None:
distance = response_trigger.get()
print(f"{self.get_time_since_startup()}: range reading = {distance:.2f} m")
if distance < SAFE_DISTANCE_M and not self._obstacle_detected:
self._obstacle_detected = True
print(
f"{self.get_time_since_startup()}: WARNING — obstacle detected "
f"at {distance:.2f} m"
)
shutdown_effect.trigger_shutdown()
return handler
@xronos.reaction
def send_request(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
interface.add_trigger(self._timer)
request_effect = interface.add_effect(self.request)
return lambda: request_effect.set(True)
def main() -> None:
env = xronos.Environment()
controller = env.create_reactor("controller", Controller)
sensor = env.create_reactor("sensor", RangeSensor)
env.connect(controller.request, sensor.request)
env.connect(sensor.response, controller.response)
env.execute()
if __name__ == "__main__":
main()
RangeSensor exposes a request input and a response output. Each time it
receives a request it decrements the simulated distance and writes the new
value to response. Controller defines two reactions. handle_response is
triggered by response and reads the distance. send_request is triggered by
a periodic timer and writes to request.
Copy the program into a file called range_sensor.py and run it — it fails
immediately:
$ python range_sensor.py
[ERROR] There is a dependency cycle involving the following reactions:
- controller.handle_response
- sensor.on_request
- controller.send_request
The cycle arises as follows:
handle_responseis declared beforesend_requestinsideController, so the runtime infers thathandle_responsemust execute beforesend_requestat each timestamp.But
send_requestwrites tocontroller.request, which is connected tosensor.request, which triggerssensor.on_request, which writes tosensor.response, which triggershandle_response. Sosend_requestmust execute beforehandle_response.
These two constraints directly contradict each other.
Fix 1: Reorder the reactions
The simplest fix is to declare send_request before handle_response. The
runtime then infers that send_request runs first, which is consistent with
the data flow through the ports. This also reflects the causal order of the
interaction: the controller first sends a request, then handles the response.
Move the send_request reaction above handle_response in Controller:
class Controller(xronos.Reactor):
request = xronos.OutputPortDeclaration()
response = xronos.InputPortDeclaration()
_timer = xronos.PeriodicTimerDeclaration(
period=datetime.timedelta(milliseconds=100)
)
def __init__(self):
super().__init__()
self._obstacle_detected = False
@xronos.reaction
def send_request(self, interface):
interface.add_trigger(self._timer)
request_effect = interface.add_effect(self.request)
return lambda: request_effect.set(True)
@xronos.reaction
def handle_response(self, interface):
response_trigger = interface.add_trigger(self.response)
shutdown_effect = interface.add_effect(self.shutdown)
def handler():
distance = response_trigger.get()
print(f"{self.get_time_since_startup()}: range reading = {distance:.2f} m")
if distance < SAFE_DISTANCE_M and not self._obstacle_detected:
self._obstacle_detected = True
print(
f"{self.get_time_since_startup()}: WARNING — obstacle detected "
f"at {distance:.2f} m"
)
shutdown_effect.trigger_shutdown()
return handler
class Controller(xronos.Reactor):
request = xronos.OutputPortDeclaration[bool]()
response = xronos.InputPortDeclaration[float]()
_timer = xronos.PeriodicTimerDeclaration(
period=datetime.timedelta(milliseconds=100)
)
def __init__(self) -> None:
super().__init__()
self._obstacle_detected = False
@xronos.reaction
def send_request(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
interface.add_trigger(self._timer)
request_effect = interface.add_effect(self.request)
return lambda: request_effect.set(True)
@xronos.reaction
def handle_response(
self, interface: xronos.ReactionInterface
) -> Callable[[], None]:
response_trigger = interface.add_trigger(self.response)
shutdown_effect = interface.add_effect(self.shutdown)
def handler() -> None:
distance = response_trigger.get()
print(f"{self.get_time_since_startup()}: range reading = {distance:.2f} m")
if distance < SAFE_DISTANCE_M and not self._obstacle_detected:
self._obstacle_detected = True
print(
f"{self.get_time_since_startup()}: WARNING — obstacle detected "
f"at {distance:.2f} m"
)
shutdown_effect.trigger_shutdown()
return handler
The program now runs and prints distance readings until an obstacle is detected:
$ python range_sensor.py
0:00:00: range reading = 4.90 m
0:00:00.100000: range reading = 4.80 m
0:00:00.200000: range reading = 4.70 m
...
0:00:03.900000: range reading = 1.00 m
0:00:04: range reading = 0.90 m
0:00:04: WARNING — obstacle detected at 0.90 m
Fix 2: Introduce a delay
Reordering reactions is not always possible — for instance, if handle_response
genuinely needs to run before send_request for some reason, or if the cycle
spans multiple reactors in a way that makes reordering impractical. In those
cases, a delayed connection breaks the cycle.
A delayed connection does not deliver the message at the current logical time step. Instead it schedules it for a future time step. Since the message now arrives in a strictly later time step, there is no same-step ordering constraint between the sender and the receiver, and the cycle disappears.
The following program keeps the original reaction order from range_sensor_cycle.py
(with handle_response first) and instead adds a 1 ms delay to the connection
from controller.request to sensor.request:
def main():
env = xronos.Environment()
controller = env.create_reactor("controller", Controller)
sensor = env.create_reactor("sensor", RangeSensor)
env.connect(
controller.request,
sensor.request,
delay=datetime.timedelta(milliseconds=1),
)
env.connect(sensor.response, controller.response)
env.execute()
def main() -> None:
env = xronos.Environment()
controller = env.create_reactor("controller", Controller)
sensor = env.create_reactor("sensor", RangeSensor)
env.connect(
controller.request,
sensor.request,
delay=datetime.timedelta(milliseconds=1),
)
env.connect(sensor.response, controller.response)
env.execute()
The delay breaks the same-step dependency between send_request and
handle_response. The program runs correctly, though each reading now arrives
1 ms after the request was sent rather than at the same timestamp.
$ python range_sensor.py
0:00:00.001000: range reading = 4.90 m
0:00:00.101000: range reading = 4.80 m
0:00:00.201000: range reading = 4.70 m
...
0:00:03.901000: range reading = 1.00 m
0:00:04.001000: range reading = 0.90 m
0:00:04.001000: WARNING — obstacle detected at 0.90 m
Note
Reordering reactions is generally preferred when possible, because it preserves the zero-delay semantics of the connection. Delays change the timing of messages and should be chosen deliberately.
Fix 3: Decouple request and response in the sensor
Another approach is to break the cycle inside RangeSensor itself, without
changing the Controller reaction order or adding a connection delay. Instead
of responding to a request in the same reaction that receives it, the sensor
uses a ProgrammableTimer to schedule the response 1 ms later.
This separates the two concerns into two reactions:
send_response— triggered by the programmable timer — sends the distance reading.on_request— triggered by therequestport — updates the distance and schedules the timer.
send_response is declared before on_request. This ordering conveys that
the sensor always finishes delivering any pending response before it accepts a
new request at the same timestamp.
Note
send_response must be declared before on_request. If their order were
reversed, the runtime would infer that Controller first receives and then sends,
but also sensor first receives and then sends, creating a causality problem.
The Controller keeps the original declaration order from range_sensor_cycle.py
(handle_response before send_request) and the connections carry no delay:
class RangeSensor(xronos.Reactor):
request = xronos.InputPortDeclaration()
response = xronos.OutputPortDeclaration()
_send_timer = xronos.ProgrammableTimerDeclaration()
def __init__(self):
super().__init__()
self._distance = INITIAL_DISTANCE_M
@xronos.reaction
def send_response(self, interface):
interface.add_trigger(self._send_timer)
response_effect = interface.add_effect(self.response)
def handler():
response_effect.set(self._distance)
return handler
@xronos.reaction
def on_request(self, interface):
interface.add_trigger(self.request)
send_timer_effect = interface.add_effect(self._send_timer)
def handler():
self._distance = max(0.0, self._distance - DISTANCE_DECREMENT_M)
send_timer_effect.schedule(None, datetime.timedelta(milliseconds=1))
return handler
class RangeSensor(xronos.Reactor):
request = xronos.InputPortDeclaration[bool]()
response = xronos.OutputPortDeclaration[float]()
_send_timer = xronos.ProgrammableTimerDeclaration[None]()
def __init__(self) -> None:
super().__init__()
self._distance = INITIAL_DISTANCE_M
@xronos.reaction
def send_response(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
interface.add_trigger(self._send_timer)
response_effect = interface.add_effect(self.response)
def handler() -> None:
response_effect.set(self._distance)
return handler
@xronos.reaction
def on_request(self, interface: xronos.ReactionInterface) -> Callable[[], None]:
interface.add_trigger(self.request)
send_timer_effect = interface.add_effect(self._send_timer)
def handler() -> None:
self._distance = max(0.0, self._distance - DISTANCE_DECREMENT_M)
send_timer_effect.schedule(None, datetime.timedelta(milliseconds=1))
return handler
The cycle disappears because on_request no longer writes directly to
response. Its only effect is scheduling _send_timer, which fires at a
strictly later timestamp. The response therefore arrives at the controller in a
new timestamp, with no same-step ordering constraint between the two reactors.
$ python range_sensor.py
0:00:00.001000: range reading = 4.90 m
0:00:00.101000: range reading = 4.80 m
0:00:00.201000: range reading = 4.70 m
...
0:00:03.901000: range reading = 1.00 m
0:00:04.001000: range reading = 0.90 m
0:00:04.001000: WARNING — obstacle detected at 0.90 m


