Library Reactors

class xronos.lib.ConsoleInput(parser: Callable[[str], T])

A reactor that reads and parses lines of text from stdin.

It is derived from ExternalInput and provides it with a generator that reads lines of text from stdin, applies the provided parser function, and sends the results via the output port. If the parser function raises a ValueError, the input is discarded and the reactor continues reading inputs. If the parser raises a RequestShutdown exception, the reactor stops reading inputs and requests a shutdown.

A simple example parser function that parses integers and an exit keyword from the console input is given below:

def simple_parser(x: str) -> int:
    if x == "exit":
        raise ConsoleInput.RequestShutdown
    else:
        return int(x)

Parameters:

parser (Callable[[str], T]) – A function that parses the console input and returns the parsed value. Raise ValueError to discard the input, and raise RequestShutdown to shut down the program.
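The discard and shutdown rules described above can be illustrated without the library. The following is a minimal sketch in plain Python: RequestShutdown is a local stand-in for ConsoleInput.RequestShutdown, and parse_lines and bounded_int_parser are hypothetical helpers that only mimic the documented behavior. It is not how the reactor is implemented.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


class RequestShutdown(Exception):
    """Local stand-in for ConsoleInput.RequestShutdown (assumption)."""


def bounded_int_parser(line: str) -> int:
    """Parse an integer in [0, 100]; the keyword 'exit' requests a shutdown."""
    text = line.strip()
    if text == "exit":
        raise RequestShutdown
    value = int(text)  # raises ValueError on non-numeric input
    if not 0 <= value <= 100:
        raise ValueError(f"{value} is out of range")
    return value


def parse_lines(lines: Iterable[str], parser: Callable[[str], T]) -> Iterator[T]:
    """Hypothetical helper that mimics the documented discard/shutdown rules."""
    for line in lines:
        try:
            yield parser(line)
        except ValueError:
            continue  # invalid input is discarded, reading continues
        except RequestShutdown:
            return  # stop reading inputs


# "abc" and "250" are discarded; nothing after "exit" is read.
results = list(
    parse_lines(["7\n", "abc\n", "250\n", "42\n", "exit\n", "9\n"], bounded_int_parser)
)
```

Raising ValueError for out-of-range values reuses the discard path, so the parser can validate inputs as well as convert them.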

output

A port that forwards the parsed input.

Type:

xronos.OutputPort[T]

exception RequestShutdown

An exception that can be raised by the parser to request a shutdown.

class xronos.lib.ConstSource(value: TOutput, inhibit: bool = False)

A reactor that produces a constant value when triggered.

Parameters:
  • value (TOutput) – The value to output at each triggered event.

  • inhibit (bool) – Initialize with output inhibited.

class xronos.lib.ExternalInput(read_input: Generator[T])

A reactor that reads inputs from an external source.

This reactor can perform blocking I/O operations to read inputs from external sources such as files, network sockets, stdin, etc. The external inputs are scheduled on a xronos.PhysicalEvent and forwarded to the output port.

The reactor reads inputs using the provided read_input() generator. This generator should perform the necessary setup and teardown operations and yield the input values read from the external source.

A simple example generator is given below.

def read_input() -> Generator[str]:
    with open("input.txt") as f:
        yield from f

Parameters:

read_input (Generator[T]) – A generator that reads inputs from an external source and yields them. The generator is expected to perform the necessary setup and teardown operations. See the example above.
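When the external source needs explicit setup and teardown, a try/finally block inside the generator keeps both in one place. The sketch below is an illustration only: read_records, its record_size parameter and the fixed-size record format are hypothetical and not part of the library.

```python
from collections.abc import Generator


def read_records(path: str, record_size: int = 4) -> Generator[bytes, None, None]:
    """Yield fixed-size binary records from a file (hypothetical example)."""
    f = open(path, "rb")  # setup: acquire the external resource
    try:
        while True:
            record = f.read(record_size)
            if len(record) < record_size:
                break  # end of file, or a trailing partial record
            yield record
    finally:
        # teardown: runs when the generator is exhausted or closed early
        f.close()
```

Because the cleanup sits in a finally clause, the file is also closed when the consumer stops iterating early and calls close() on the generator.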

output

Emits the read inputs as events.

Type:

OutputPort[T]

exception xronos.lib.OutputDiscardedWarning(message: str | None, fqn: str | None, value: Any | None, timestamp: datetime | None)

Warning that output was discarded on an output port.

Parameters:
  • message (str | None) – the warning message.

  • fqn (str | None) – fully qualified name of the reactor element that produced the output.

  • value (Any | None) – value that was discarded.

  • timestamp (datetime | None) – the timestamp at which the value was discarded.

class xronos.lib.RampSource[Toutput](initial_value: TRealOutput = 0, successor: Callable[[TRealOutput], TRealOutput] | None = None, inhibit: bool = False)

A reactor that produces a ramp output when triggered.

If no arguments are provided, this source will produce an integer ramp starting at 0 and incrementing by 1 at each trigger.

Custom ramp behavior may be implemented by providing the successor function successor(TRealOutput) -> TRealOutput. This function should be stateless and reentrant to ensure thread safety.

Parameters:
  • initial_value (TRealOutput) – The initial value of the ramp.

  • successor (Callable[[TRealOutput], TRealOutput] | None) – Successor operator for the type.

  • inhibit (bool) – Initialize with output inhibited.
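A stateless successor depends only on its argument, so it can be checked in isolation before it is handed to the reactor. The sketch below is plain Python: step_by_half is an example successor, and preview_ramp is a hypothetical helper that assumes the first output equals initial_value and every later output is the successor of the previous one.

```python
from collections.abc import Callable


def step_by_half(x: float) -> float:
    """Stateless successor: the result depends only on the argument."""
    return x + 0.5


def preview_ramp(
    initial_value: float, successor: Callable[[float], float], n: int
) -> list[float]:
    """Hypothetical helper: the first n values such a ramp would take."""
    values = []
    current = initial_value
    for _ in range(n):
        values.append(current)
        current = successor(current)
    return values


preview = preview_ramp(0.0, step_by_half, 4)  # [0.0, 0.5, 1.0, 1.5]
```

A successor that keeps a counter in a global or closure variable would break this property, because its result would depend on how often it had been called before.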

class xronos.lib.SocketInput(host: str, port: int, mode: Mode, buffer_size: int = 1024, reconnect: bool = True, reconnect_interval: float = 0.5, reconnect_only_first_time: bool = False)

A reactor that reads inputs from a network socket.

This reactor opens a network socket and reads bytes from it. The bytes are forwarded to the output port. The reactor can be configured to open either a TCP server, a TCP client or a UDP listener. Deserialization of the data must be handled by the reactor connected to the output port.

In TCP client mode, if reconnect is set to True, the reactor will keep trying to connect to the server if it gets a ConnectionRefusedError. Once connected, it will read data until the server closes the connection, at which point it will try to reconnect, if reconnect is set to True. reconnect_interval is the interval between reconnection attempts. If reconnect_only_first_time is set to True, then reconnects are only performed the first time the connection is opened.

In TCP server mode, the reactor will open a server socket and accept incoming connections. It will read data from each client until the connection is closed. If multiple clients are connected, then the data from the different clients will appear interleaved on the single output port output.

In UDP listener mode, the reactor will open a socket and listen for incoming data coming from any client.

Parameters:
  • host (str) – The host to connect to or bind to.

  • port (int) – The port to connect to or bind to.

  • mode (SocketInput.Mode) – The mode of operation. It can be one of the following: “tcp_server”, “tcp_client” or “udp_listener”.

  • buffer_size (int) – The size of the buffer used to read data from the socket.

  • reconnect (bool) – Whether to reconnect to the server if the connection is closed. This is only applicable in TCP client mode.

  • reconnect_interval (float) – The interval in seconds between reconnection attempts. This is only applicable in TCP client mode.

  • reconnect_only_first_time (bool) – Whether to reconnect only the first time the connection is opened. This is only applicable in TCP client mode.

output

A port that forwards any bytes received on the socket.

Type:

xronos.OutputPort[T]
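Because the output carries raw bytes, a chunk read from a TCP stream may end in the middle of a message. A downstream reactor therefore typically needs some framing logic before it can deserialize. The sketch below assumes a hypothetical newline-delimited UTF-8 protocol; LineFramer is an illustrative helper in plain Python and not part of the library.

```python
class LineFramer:
    """Reassemble newline-delimited messages from arbitrary byte chunks."""

    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> list[str]:
        """Add a chunk and return every complete line it finishes."""
        self._buffer.extend(chunk)
        # Everything before the last newline is complete; the rest is kept.
        *complete, rest = bytes(self._buffer).split(b"\n")
        self._buffer = bytearray(rest)
        return [line.decode("utf-8") for line in complete]


framer = LineFramer()
messages = []
# Two messages arriving split across three chunks.
for chunk in (b"temp=2", b"1.5\nhum", b"=40\n"):
    messages.extend(framer.feed(chunk))
# messages == ["temp=21.5", "hum=40"]
```

A single framer like this only works for one sender. In TCP server mode with several clients, the interleaved data on the single output port cannot be separated this way.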

class Mode(*values)

class xronos.lib.StartupSource[Toutput](value: TOutput)

A reactor that produces a single output at startup.

Parameters:

value (TOutput) – The value to output at startup.

class xronos.lib.SuccessorSource(initial_value: TOutput, successor: Callable[[TOutput], TOutput], inhibit: bool = False)

A reactor that produces a successive output when triggered.

Custom successor behavior may be implemented by providing a successor function successor(TOutput) -> TOutput. This function should be stateless, reentrant and artifact-free to ensure thread safety.

Parameters:
  • initial_value (TOutput) – The initial value of the source.

  • successor (Callable[[TOutput], TOutput]) – Successor operator for the type.

  • inhibit (bool) – Initialize with output inhibited.
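A successor does not have to be numeric. When a sequence needs memory of earlier values, that memory can be carried in the value itself so that the function stays stateless. The sketch below uses a Fibonacci pair as a hypothetical example in plain Python; the loop only previews the values and does not involve the reactor.

```python
def fib_successor(pair: tuple[int, int]) -> tuple[int, int]:
    """Stateless successor over (current, next) Fibonacci pairs."""
    a, b = pair
    return (b, a + b)


# Preview the sequence by applying the successor repeatedly.
value = (0, 1)
seen = [value]
for _ in range(5):
    value = fib_successor(value)
    seen.append(value)
# seen == [(0, 1), (1, 1), (1, 2), (2, 3), (3, 5), (5, 8)]
```

Since the function reads nothing but its argument, calling it twice with the same pair always yields the same result.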

count() int

Number of times an output has been produced.

This count is reset if the reset port is triggered.

class xronos.lib.TimerSource[Toutput](period: timedelta, inhibit: bool = False)

Simple reactor that produces a pure (valueless) event from a periodic timer.

Parameters:
  • period (timedelta) – The interval at which the timer event is triggered.

  • inhibit (bool) – Initialize with output inhibited.