Library Reactors
- class xronos.lib.ConsoleInput(parser: Callable[[str], T])
A reactor that reads and parses lines of text from stdin.
It is derived from
ExternalInput
and provides it a generator that read Reads lines of text from stdin, applies the provided parser function and sends the results via theoutput
port. If the parser function raises aValueError
, the input is discarded and the reactor will continue reading inputs. If the parser raises anRequestShutdown
exception, the reactor will stop reading inputs and request a shutdown.A simple example parser function that parses integers and a exit keyword from the console input is given below:
def simple_parser(x: str) -> int: if x == "exit": raise ConsoleInput.RequestShutdown else: return int(x)
- Parameters:
parser (Callable[[str], T]) – A function that parses the console input and returns the parsed value. Raise
ValueError
to discard the input, and raiseRequestShutdown
to shutdown the program.
- output
A port that forwards the parsed input.
- Type:
- exception RequestShutdown
An exception that can be raised by the parser to request a shutdown.
- class xronos.lib.ConstSource(value: TOutput, inhibit: bool = False)
A reactor that produces a constant value when triggered.
- Parameters:
value (TOutput) – The value to output at each triggered event.
inhibit (bool) – Initialize with output inhibited.
- class xronos.lib.ExternalInput(read_input: Generator[T])
A reactor that reads inputs from an external source.
This reactor can perform blocking I/O operations to read inputs from external sources such as files, network sockets, stdin, etc. The external inputs are scheduled on a
xronos.PhysicalEvent
and forwarded to the output port.The reactor reads inputs using the provided
read_input()
generator. This generator should perform the necessary setup and teardown operations and yield the input values read from the external source.Two simple example generators are given below.
def read_input() -> Generator[str]: with open("input.txt") as f: yield from f
def read_input() -> Generator[str]: f = open("input.txt") try: for line in f: yield line finally: f.close()
- Parameters:
read_input (Generator[T]) – A generator that reads inputs from an external source and yields them. The generator is expected to perform the necessary setup and teardown operations. See the example above.
- output
Emits the read inputs as events.
- Type:
OutputPort[T]
- exception xronos.lib.OutputDiscardedWarning(message: str | None, fqn: str | None, value: Any | None, timestamp: datetime | None)
Warning that output was discarded on an output port.
- class xronos.lib.RampSource[Toutput](initial_value: TRealOutput = 0, successor: Callable[[TRealOutput], TRealOutput] | None = None, inhibit: bool = False)
A reactor that produces a ramp output when triggered.
If no arguments are provided, this source will produce an integer ramp starting at 0 and incrementing by 1 at each trigger.
Custom ramp behavior may be implemented by providing the successor function successor(TRealOutput) -> TRealOutput. This method should be stateless and reentrant to ensure thread safety.
- class xronos.lib.SocketInput(host: str, port: int, mode: Mode, buffer_size: int = 1024, reconnect: bool = True, reconnect_interval: float = 0.5, reconnect_only_first_time: bool = False)
A reactor that reads inputs from a network socket.
This reactor opens a network socket and reads bytes from it. The bytes are forwarded to the output port. The reactor can be configured to open either a TCP server, a TCP client or a UDP listener. Deserialization of the data must be handled by the reactor connected to the output port.
In TCP client mode, if
reconnect
is set to True, the reactor will keep trying to connect to the server if it gets aConnectionRefusedError
. Once connected, it will read data until the server closes the connection, at which point it will try to reconnect, ifreconnect
is set to True.reconnect_interval
is the interval between reconnection attempts. Ifreconnect_only_first_time
is set to True, then reconnects are only performed the first time the connection is opened.In TCP server mode, the reactor will open a server socket and accept incoming connections. It will read data from each client until the connection is closed. If multiple clients are connected, then the data from the different clients will appear interleaved on the single output port
output
.In UDP listener mode, the reactor will open a socket and listen for incoming data coming from any client.
- Parameters:
host (str) – The host to connect to or bind to.
port (int) – The port to connect to or bind to.
mode (SocketInput.Mode) – The mode of operation. It can be one of the following: “tcp_server”, “tcp_client” or “udp_listener”.
buffer_size (int) – The size of the buffer used to read data from the socket.
reconnect (bool) – Whether to reconnect to the server if the connection is closed. This is only applicable in TCP client mode.
reconnect_interval (float) – The interval in seconds between reconnection attempts. This is only applicable in TCP client mode.
reconnect_only_first_time (bool) – Whether to reconnect only the first time the connection is opened. This is only applicable in TCP client mode.
- output
A port that forwards any bytes received on the socket.
- Type:
- class Mode(*values)
- class xronos.lib.StartupSource[Toutput](value: TOutput)
A reactor that produces a single output at startup.
- Parameters:
value (TOutput) – The value to output at startup
- class xronos.lib.SuccessorSource(initial_value: TOutput, successor: Callable[[TOutput], TOutput], inhibit: bool = False)
A reactor that produces a successive output when triggered.
Custom successor behavior may be implemented by providing a successor function successor(TOutput) -> TOutput. This method should be stateless, reentrant and artifact-free to ensure thread-safety.
- Parameters: