### Install for MkDocs Documentation Build Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Install the necessary dependencies to build the documentation using MkDocs. ```bash python -m pip install -e .[dev-mkdocs] ``` -------------------------------- ### Hello World Example Source: https://frequenz-floss.github.io/frequenz-channels-python A basic example demonstrating sending and receiving a single message using an Anycast channel. ```python import asyncio from frequenz.channels import Anycast async def main() -> None: hello_channel = Anycast[str](name="hello-world-channel") sender = hello_channel.new_sender() receiver = hello_channel.new_receiver() await sender.send("Hello World!") message = await receiver.receive() print(message) asyncio.run(main()) ``` -------------------------------- ### Install for Local Development Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Install the project in editable mode for local development, including all dependencies. ```bash python -m pip install -e . ``` -------------------------------- ### Install and Run with Nox Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Install nox-aware dependencies and run checks using nox, which creates its own virtual environments. ```bash python -m pip install .[dev-noxfile] nox ``` -------------------------------- ### Basic Channel Usage Example Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Demonstrates creating tasks for sending and receiving messages using anycast channels. This example requires an asyncio event loop to run. ```python task_group.create_task(send("sender_2", acast.new_sender(), 20, 22)) task_group.create_task(recv("receiver_1", acast.new_receiver())) task_group.create_task(recv("receiver_2", acast.new_receiver())) asyncio.run(main()) ``` -------------------------------- ### Install Frequenz Channels using Pip Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/installation Install the Frequenz Channels package using pip within your activated virtual environment. Ensure pip is installed first. ```bash python3 -m pip install frequenz-channels ``` -------------------------------- ### Install with Development Dependencies Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Install the project in editable mode with all development dependencies like mypy, pylint, and pytest. ```bash python -m pip install -e .[dev] ``` -------------------------------- ### Hello World with Anycast Channel Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/quick-start A basic example demonstrating sending and receiving a string message using an Anycast channel. ```python import asyncio from frequenz.channels import Anycast async def main() -> None: hello_channel = Anycast[str](name="hello-world-channel") sender = hello_channel.new_sender() receiver = hello_channel.new_receiver() await sender.send("Hello World!") message = await receiver.receive() print(message) asyncio.run(main()) ``` -------------------------------- ### Verify Frequenz Channels Installation Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/installation Import the 'frequenz.channels' module in the Python interpreter to confirm that the installation was successful. ```python $ python3 Python 3.11.4 (main, Jun 7 2023, 10:13:09) [GCC 12.2.0] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import frequenz.channels >>> ``` -------------------------------- ### Example Cross-Architecture Dockerfile Name Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING An example illustrating the naming convention for a Dockerfile targeting an ARM64 architecture, Ubuntu 20.04 OS, and Python 3.11. ```text arm64-ubuntu-20.04-python-3.11.Dockerfile ``` -------------------------------- ### Initialize NopReceiver Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Initializes an instance of the NopReceiver. This is a basic setup step. ```python def __init__(self) -> None: """Initialize this instance.""" self._close_event: asyncio.Event = asyncio.Event() ``` -------------------------------- ### Serve Documentation with MkDocs Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Serve the project documentation locally for live preview during development. Changes to source files will be reflected live if installed with '-e'. ```bash mkdocs serve ``` -------------------------------- ### Install for Pytest Development Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Install runtime and test dependencies for manual pytest execution. This is also included in the general dev dependencies. ```bash python -m pip install .[dev-pytest] # included in .[dev] too # And for example pytest tests/test_*.py ``` -------------------------------- ### Install and Enable Direnv for Automatic Virtual Environment Management Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/installation Install the 'direnv' tool and configure it to automatically manage virtual environments for your project. This simplifies environment activation and deactivation. ```bash sudo apt install direnv # if you use Debian/Ubuntu mkdir my-channels-project cd my-channels-project echo "layout python python3" > .envrc direnv allow ``` -------------------------------- ### Check Python Version Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/installation Verify that Python version 3.11 or newer is installed. This is a prerequisite for installing Frequenz Channels. ```bash $ python3 --version Python 3.11.4 ``` -------------------------------- ### Example Usage of Pipe Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Demonstrates how to set up and use a Pipe to forward messages between two Broadcast channels using AsyncExitStack for resource management. ```python import asyncio from contextlib import closing, aclosing, AsyncExitStack from frequenz.channels import Broadcast, Pipe, Receiver async def main() -> None: # Channels, receivers and Pipe are in AsyncExitStack # to close and stop them at the end. async with AsyncExitStack() as stack: source_channel = await stack.enter_async_context( aclosing(Broadcast[int](name="source channel")) ) source_receiver = stack.enter_context(closing(source_channel.new_receiver())) forwarding_channel = await stack.enter_async_context( aclosing(Broadcast[int](name="forwarding channel")) ) await stack.enter_async_context( Pipe(source_receiver, forwarding_channel.new_sender()) ) receiver = stack.enter_context(closing(forwarding_channel.new_receiver())) source_sender = source_channel.new_sender() await source_sender.send(10) assert await receiver.receive() == 11 asyncio.run(main()) ``` -------------------------------- ### RelaySender Example Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Demonstrates how to use RelaySender to send a message to multiple channels simultaneously. Ensures messages are received by all connected receivers. ```python from frequenz.channels import Broadcast from frequenz.channels.experimental import RelaySender channel1: Broadcast[int] = Broadcast(name="channel1") channel2: Broadcast[int] = Broadcast(name="channel2") receiver1 = channel1.new_receiver() receiver2 = channel2.new_receiver() tee_sender = RelaySender(channel1.new_sender(), channel2.new_sender()) await tee_sender.send(5) assert await receiver1.receive() == 5 assert await receiver2.receive() == 5 ``` -------------------------------- ### Start Pipe Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Starts the pipe's operation if it is not already running. Creates a new asyncio task to run the pipe's internal logic. ```python async def start(self) -> None: """Start this pipe if it is not already running.""" if not self._task or self._task.done(): self._task = asyncio.create_task(self._run()) ``` -------------------------------- ### Timer with Auto-Start Disabled Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Illustrates creating a Timer with auto_start=False. The timer must be explicitly reset before it starts sending messages. ```python import asyncio from datetime import timedelta from frequenz.channels.timer import Timer async def main(): # Create a timer that sends messages every second, but does not start automatically timer = Timer(interval=timedelta(seconds=1), auto_start=False) # Trying to receive before reset will not work and will eventually time out or raise an error depending on the receive method used. # For demonstration, we'll just wait a bit to show it's not sending. print("Waiting for timer to start (it shouldn't yet)...") await asyncio.sleep(2) # Reset the timer to start it timer.reset() print("Timer reset and started.") # Now we can receive messages drift = await timer.receive() print(f"Received drift: {drift}") timer.stop() if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Timer Initialization and Usage Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Demonstrates creating a Timer that sends messages every second. The timer starts automatically. It shows how to receive messages and handle the timer stopping. ```python import asyncio from datetime import timedelta from frequenz.channels import ReceiverStoppedError from frequenz.channels.timer import Timer async def main(): # Create a timer that sends a message every second timer = Timer(interval=timedelta(seconds=1)) # Receive 5 messages from the timer for _ in range(5): try: drift = await timer.receive() print(f"Received drift: {drift}") except ReceiverStoppedError: print("Timer stopped.") break # Stop the timer timer.stop() print("Timer stopped explicitly.") # Trying to receive after stopping will raise ReceiverStoppedError try: await timer.receive() except ReceiverStoppedError: print("Attempted to receive from stopped timer, as expected.") # Resetting the timer allows it to be started again timer.reset() print("Timer reset.") drift = await timer.receive() print(f"Received drift after reset: {drift}") timer.stop() if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Comprehensive Frequenz Channels Showcase Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/quick-start An advanced example showcasing most of the library's main features, including multiple senders, receivers, control commands, and timers. ```python import asyncio from dataclasses import dataclass from datetime import timedelta from enum import Enum, auto from typing import assert_never from frequenz.channels import ( Anycast, Broadcast, Receiver, Sender, merge, select, selected_from, ) from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed class Command(Enum): PING = auto() STOP_SENDER = auto() class ReplyCommand(Enum): PONG = auto() @dataclass(frozen=True) class Reply: reply: ReplyCommand source: str async def send( sender: Sender[str], control_command: Receiver[Command], control_reply: Sender[Reply], ) -> None: """Send a counter value every second, until a stop command is received.""" print(f"{sender}: Starting") timer = Timer(timedelta(seconds=1.0), TriggerAllMissed()) counter = 0 async for selected in select(timer, control_command): if selected_from(selected, timer): print(f"{sender}: Sending {counter}") await sender.send(f"{sender}: {counter}") counter += 1 elif selected_from(selected, control_command): print(f"{sender}: Received command: {selected.message.name}") match selected.message: case Command.STOP_SENDER: print(f"{sender}: Stopping") break case Command.PING: print(f"{sender}: Ping received, reply with pong") await control_reply.send(Reply(ReplyCommand.PONG, str(sender))) case _ as unknown: assert_never(unknown) print(f"{sender}: Finished") async def receive( receivers: list[Receiver[str]], control_command: Receiver[Command], control_reply: Sender[Reply], ) -> None: """Receive data from multiple channels, until no more data is received for 2 seconds.""" print("receive: Starting") timer = Timer(timedelta(seconds=2.0), SkipMissedAndDrift()) print(f"{timer=}") merged = merge(*receivers) async for selected in select(merged, timer, control_command): if selected_from(selected, merged): message = selected.message print(f"receive: Received {message=}") timer.reset() print(f"{timer=}") elif selected_from(selected, control_command): print(f"receive: received command: {selected.message.name}") match selected.message: case Command.PING: print("receive: Ping received, reply with pong") await control_reply.send(Reply(ReplyCommand.PONG, "receive")) case Command.STOP_SENDER: pass # Ignore case _ as unknown: assert_never(unknown) elif selected_from(selected, timer): drift = selected.message print( f"receive: No data received for {timer.interval + drift} seconds, " "giving up" ) break print("receive: Finished") async def main() -> None: data_channel_1 = Anycast[str](name="data-channel-1") data_channel_2 = Anycast[str](name="data-channel-2") command_channel = Broadcast[Command](name="control-channel") # (1)! reply_channel = Anycast[Reply](name="reply-channel") async with asyncio.TaskGroup() as tasks: tasks.create_task( send( data_channel_1.new_sender(), command_channel.new_receiver(), reply_channel.new_sender(), ), name="send-channel-1", ) tasks.create_task( send( data_channel_2.new_sender(), command_channel.new_receiver(), reply_channel.new_sender(), ), name="send-channel-2", ) tasks.create_task( receive( [data_channel_1.new_receiver(), data_channel_2.new_receiver()], command_channel.new_receiver(), reply_channel.new_sender(), ), name="receive", ) control_sender = command_channel.new_sender() reply_receiver = reply_channel.new_receiver() # Send a ping command to all tasks and wait for the replies await control_sender.send(Command.PING) print(f"main: {await reply_receiver.receive()}") print(f"main: {await reply_receiver.receive()}") ``` -------------------------------- ### Timer with Missed Tick Policy Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Shows how to configure a Timer with a specific missed tick policy. This example uses 'accumulate' to show how missed ticks can be handled. ```python import asyncio from datetime import timedelta from frequenz.channels.timer import Timer, MissedTickPolicy async def main(): # Create a timer that sends messages every 0.1 seconds, with a policy to accumulate missed ticks # We will intentionally block the loop to cause missed ticks. timer = Timer(interval=timedelta(milliseconds=100), missed_tick_policy=MissedTickPolicy.accumulate) print("Timer started. Will block the loop shortly to cause missed ticks.") # Block the event loop for a short period to miss several ticks await asyncio.sleep(0.5) # This sleep is within the loop, so it doesn't block execution of other tasks # To actually miss ticks, we need to simulate a long-running synchronous operation # For this example, we'll just let it run and observe the drift. # A real scenario would involve a CPU-bound task or blocking I/O. # Receive a few messages to observe the drift and potential accumulation for i in range(5): drift = await timer.receive() print(f"Received message {i+1}: drift={drift}") # If the drift is large, it indicates missed ticks were accumulated. timer.stop() print("Timer stopped.") if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### GroupingLatestValueCache Example Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Demonstrates how to use GroupingLatestValueCache to store and retrieve the latest values from a channel, grouped by a key. Shows initialization, sending data, checking for keys, retrieving values, and deleting entries. ```python from frequenz.channels import Broadcast from frequenz.channels.experimental import GroupingLatestValueCache channel = Broadcast[tuple[int, str]](name="lvc_test") cache = GroupingLatestValueCache(channel.new_receiver(), key=lambda x: x[0]) sender = channel.new_sender() assert cache.get(6) is None assert 6 not in cache await sender.send((6, "twenty-six")) assert 6 in cache assert cache.get(6) == (6, "twenty-six") del cache[6] assert cache.get(6) is None assert 6 not in cache await cache.stop() ``` -------------------------------- ### frequenz.channels.timer.Timer.__init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Initializes a Timer instance. This constructor sets up the timer's interval, missed tick policy, and optional start-up configurations like auto-start, start delay, and initial tick behavior. ```APIDOC ## Timer.__init__ ### Description Initializes this timer. See the [class documentation][frequenz.channels.timer.Timer] for details. ### Method __init__ ### Parameters #### Arguments - **interval** (timedelta) - The time between timer ticks. Must be at least 1 microsecond. - **missed_tick_policy** (MissedTickPolicy) - The policy of the timer when it misses a tick. Commonly one of `TriggerAllMissed`, `SkipMissedAndResync`, `SkipMissedAndDrift` or a custom class deriving from `MissedTickPolicy`. See the documentation of the each class for more details. - **auto_start** (bool, optional) - Whether the timer should be started when the instance is created. Defaults to `True`. This can only be `True` if there is already a running loop or an explicit `loop` that is running was passed. - **start_delay** (timedelta, optional) - The delay before the timer should start. Defaults to `timedelta(0)`. If `auto_start` is `False`, an exception is raised. This has microseconds resolution, anything smaller than a microsecond means no delay. - **tick_at_start** (bool, optional) - When `True`, the timer will trigger immediately after starting, and then wait for the interval before triggering again. Defaults to `False`. When `False`, the timer will wait the interval before triggering for the first time. If `auto_start` is `False` and this is `True`, an exception is raised. If a `start_delay` is specified and this is `True`, the first trigger will be immediately after the `start_delay`. - **loop** (asyncio.AbstractEventLoop | None, optional) - The event loop to use to track time. If `None`, `asyncio.get_running_loop()` will be used. ### Raises - **RuntimeError**: If it was called without a loop and there is no running loop. - **ValueError**: If `interval` is not positive or is smaller than 1 microsecond; if `start_delay` is negative or `start_delay` was specified but `auto_start` is `False`. ``` -------------------------------- ### Timer Initialization with Auto-Start Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Initializes a Timer instance. If `auto_start` is True, the timer is reset immediately with the specified `start_delay` and `tick_at_start` behavior. Otherwise, it starts upon the first receiving method call. ```python timer = Timer(auto_start=True, start_delay=timedelta(seconds=5), tick_at_start=True) ``` -------------------------------- ### Pipe __aenter__ Method Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental The __aenter__ method starts the pipe and returns the pipe instance, intended for use within an async context manager. ```python async def __aenter__(self) -> Pipe[ChannelMessageT]: """Enter the runtime context.""" await self.start() return self ``` -------------------------------- ### Timer Initialization Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Initializes a Timer instance with specified interval, missed tick policy, and optional parameters for auto-start, start delay, tick-at-start behavior, and event loop. Use this to configure a timer for scheduled events. ```python def __init__( self, interval: timedelta, missed_tick_policy: MissedTickPolicy, /, *, auto_start: bool = True, start_delay: timedelta = timedelta(0), tick_at_start: bool = False, loop: asyncio.AbstractEventLoop | None = None, ) -> None: """Initialize this timer. See the [class documentation][frequenz.channels.timer.Timer] for details. Args: interval: The time between timer ticks. Must be at least 1 microsecond. missed_tick_policy: The policy of the timer when it misses a tick. Commonly one of `TriggerAllMissed`, `SkipMissedAndResync`, `SkipMissedAndDrift` or a custom class deriving from `MissedTickPolicy`. See the documentation of the each class for more details. auto_start: Whether the timer should be started when the instance is created. This can only be `True` if there is already a running loop or an explicit `loop` that is running was passed. start_delay: The delay before the timer should start. If `auto_start` is `False`, an exception is raised. This has microseconds resolution, anything smaller than a microsecond means no delay. tick_at_start: When `True`, the timer will trigger immediately after starting, and then wait for the interval before triggering again. When `False`, the timer will wait the interval before triggering for the first time. If `auto_start` is `False` and this is `True`, an exception is raised. If a `start_delay` is specified and this is `True`, the first trigger will be immediately after the `start_delay`. loop: The event loop to use to track time. If `None`, `asyncio.get_running_loop()` will be used. Raises: RuntimeError: If it was called without a loop and there is no running loop. ValueError: If `interval` is not positive or is smaller than 1 microsecond; if `start_delay` is negative or `start_delay` was specified but `auto_start` is `False`. """ if interval < timedelta(microseconds=1): raise ValueError( ``` -------------------------------- ### Multiple Senders and Receivers with Buffered Anycast Channel Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/channels/anycast Illustrates sending integers from multiple senders to multiple receivers using a buffered Anycast channel with a limit of 2. This example shows how senders can block when the buffer is full. It requires importing Anycast, Receiver, ReceiverStoppedError, and Sender. ```python import asyncio from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender async def send(name: str, sender: Sender[int], start: int, stop: int) -> None: for message in range(start, stop): print(f"{name} sending {message}") await sender.send(message) async def recv(name: str, receiver: Receiver[int]) -> None: try: async for message in receiver: print(f"{name} received {message}") await asyncio.sleep(0.1) # sleep (or work) with the data except ReceiverStoppedError: pass async def main() -> None: acast = Anycast[int](name="numbers", limit=2) async with asyncio.TaskGroup() as task_group: task_group.create_task(send("sender_1", acast.new_sender(), 10, 13)) task_group.create_task(send("sender_2", acast.new_sender(), 20, 22)) task_group.create_task(recv("receiver_1", acast.new_receiver())) task_group.create_task(recv("receiver_2", acast.new_receiver())) asyncio.run(main()) ``` -------------------------------- ### Watch for File Modifications Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/utilities/file-watchers This example demonstrates how to use FileWatcher to monitor a specific file for modifications. It creates the file, then asynchronously updates it, and prints the event details when a modification is detected. The FileWatcher is configured to only watch for the MODIFY event type. ```python import asyncio from frequenz.channels.file_watcher import EventType, FileWatcher PATH = "/tmp/test.txt" file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY]) async def update_file() -> None: await asyncio.sleep(1) with open(PATH, "w", encoding="utf-8") as file: file.write("Hello, world!") async def main() -> None: # Create file with open(PATH, "w", encoding="utf-8") as file: file.write("Hello, world!") async with asyncio.TaskGroup() as group: group.create_task(update_file()) async for event in file_watcher: print(f"File {event.path}: {event.type.name}") break asyncio.run(main()) ``` -------------------------------- ### Event Receiver Example Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/utilities/events Demonstrates using an Event receiver with select() to stop a loop after a certain condition is met. The loop exits after printing the first 5 numbers, triggered by the stop_event. ```python import asyncio from frequenz.channels import Anycast, select, selected_from from frequenz.channels.event import Event channel: Anycast[int] = Anycast(name="channel") receiver = channel.new_receiver() sender = channel.new_sender() stop_event = Event(name="stop") async def do_work() -> None: async for selected in select(receiver, stop_event): if selected_from(selected, receiver): print(selected.message) elif selected_from(selected, stop_event): print("Stop event triggered") stop_event.stop() break async def send_stuff() -> None: for i in range(10): if stop_event.is_stopped: break await asyncio.sleep(1) await sender.send(i) async def main() -> None: async with asyncio.TaskGroup() as task_group: task_group.create_task(do_work(), name="do_work") task_group.create_task(send_stuff(), name="send_stuff") await asyncio.sleep(5.5) stop_event.set() asyncio.run(main()) ``` -------------------------------- ### Build Documentation with MkDocs Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Build the project documentation, which will be written to the 'site/' directory. ```bash mkdocs build ``` -------------------------------- ### Create and Activate Virtual Environment Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/user-guide/installation Set up a new project directory and create a Python virtual environment. Activate the environment to isolate project dependencies. ```bash mkdir my-channels-project cd my-channels-project python3 -m venv .venv . .venv/bin/activate ``` -------------------------------- ### Deploy Multi-Version Documentation with Mike Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Deploy a specific version of the documentation using mike and set it as the default. This is for previewing multi-version documentation locally. ```bash mike deploy my-version mike set-default my-version mike serve ``` -------------------------------- ### __init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Initializes a new Frequenz channel with a specified name and buffer limit. ```APIDOC ## `__init__` ### Description Initializes this channel with a given name and an optional buffer limit. ### Parameters - **`name`** (`str`) - Required - The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel. - **`limit`** (`int`) - Optional - The size of the internal buffer in number of messages. If the buffer is full, then the senders will block until the receivers consume the messages in the buffer. Defaults to `10`. ``` -------------------------------- ### NopReceiver.__init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Initializes a new instance of the NopReceiver. ```APIDOC ## NopReceiver.__init__ ### Description Initializes a new instance of the NopReceiver. ### Method __init__ ### Parameters None ### Response None ``` -------------------------------- ### reset Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Resets the timer to start timing from the current moment, with options for an initial delay and immediate tick. If the timer was stopped or not started, it will begin running. This method requires an active event loop. ```APIDOC ## reset ### Description Reset the timer to start timing from now (plus an optional delay). If the timer was stopped, or not started yet, it will be started. This can only be called with a running loop. ### Method `reset( *, interval: timedelta | None = None, start_delay: timedelta = timedelta(0), tick_at_start: bool = False ) -> None` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **interval** (timedelta | None) - Optional - The new interval between ticks. If `None`, the current interval is kept. - **start_delay** (timedelta) - Optional - The delay before the timer should start. This has microseconds resolution, anything smaller than a microsecond means no delay. Defaults to `timedelta(0)`. - **tick_at_start** (bool) - Optional - When `True`, the timer will trigger immediately after starting, and then wait for the interval before triggering again. When `False`, the timer will wait the interval before triggering for the first time. If a `start_delay` is specified and this is `True`, the first trigger will be immediately after the `start_delay`. Defaults to `False`. ### Request Example ```json { "example": "Not applicable for this method." } ``` ### Response #### Success Response (200) None #### Response Example ```json { "example": "Not applicable for this method." } ``` ### Error Handling - `RuntimeError`: If it was called without a running loop. - `ValueError`: If `start_delay` is negative. ``` -------------------------------- ### Build Source and Binary Distribution Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/CONTRIBUTING Use this command to build the source and binary distribution of the project. ```bash python -m pip install build python -m build ``` -------------------------------- ### Timer Class Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Represents a receiver that sends messages at regular intervals. Timers start automatically by default but can be configured to start manually. They are designed to run within an asyncio event loop and handle timing drift. The timer's resolution is 1 microsecond. ```APIDOC ## Class Timer ### Description A receiver that sends a message regularly. `Timer`s are started by default after they are created. This can be disabled by using `auto_start=False` option when creating them. In this case, the timer will not be started until `reset()` is called. Receiving from the timer (either using `receive()` or using the async iterator interface) will also start the timer at that point. Timers need to be created in a context where a `asyncio` loop is already running. If no `loop` is specified, the current running loop is used. Timers can be stopped by calling `stop()`. A stopped timer will raise a `ReceiverStoppedError` or stop the async iteration as usual. Once a timer is explicitly stopped, it can only be started again by explicitly calling `reset()` (trying to receive from it or using the async iterator interface will keep failing). Timer messages are `timedelta`s containing the drift of the timer, i.e. the difference between when the timer should have triggered and the time when it actually triggered. ### Warning Even when the `asyncio` loop's monotonic clock is a `float`, timers use `int`s to represent time internally. The internal time is tracked in microseconds, so the timer resolution is 1 microsecond (`interval` must be at least 1 microsecond). If the timer is delayed too much, then it will behave according to the `missed_tick_policy`. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy. ``` -------------------------------- ### __init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Initializes a new BroadcastSender instance, linking it to a broadcast channel. ```APIDOC ## __init__ ### Description Initialize this sender. ### Method Signature `__init__(channel: Broadcast[_T]) -> None` ### Parameters #### Path Parameters - **channel** (Broadcast[_T]) - Required - A reference to the broadcast channel this sender belongs to. ``` -------------------------------- ### Get LatestValueCache String Value Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Returns the last value seen by the cache as a string. ```python def __str__(self) -> str: """Return the last value seen by this cache.""" return str(self._latest_value) ``` -------------------------------- ### Channel Initialization and Properties Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Demonstrates how to initialize a channel with a name and limit, and access its properties like name, closed status, and buffer limit. ```APIDOC ## Channel Class ### Description Represents a channel for communication between coroutines, with a limited buffer size. ### Methods #### `__init__(self, *, name: str, limit: int = 10)` Initializes a channel. * **Args**: * `name` (str): The name of the channel for logging and string representation. * `limit` (int, optional): The size of the internal buffer. Defaults to 10. #### Properties * **`name`** (str): The name of this channel. * **`is_closed`** (bool): Whether this channel is closed. Further operations will raise an exception. * **`limit`** (int): The maximum number of messages that can be stored in the channel's buffer. If the buffer is full, senders will block. ``` -------------------------------- ### get Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Retrieves the latest value that has been received by the cache. Raises ValueError if no value has been received yet. ```APIDOC ## get ### Description Return the latest value that has been received. ### Method GET ### Endpoint N/A (Instance method) ### Parameters None ### Request Example None ### Response #### Success Response (200) - **return value** (T_co) - The latest value that has been received. #### Error Response (400) - **ValueError**: If no value has been received yet or if the cache has been stopped. ``` -------------------------------- ### Get Channel Name Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Retrieves the name of the broadcast channel, used for logging and string representation. ```python @property def name(self) -> str: """The name of this channel. This is for logging purposes, and it will be shown in the string representation of this channel. """ return self._name ``` -------------------------------- ### __init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Initializes a new receiver instance, configuring its buffer size, overflow warnings, and connection to a broadcast channel. ```APIDOC ## __init__ ### Description Initialize this receiver. Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver. ### Method `__init__(channel: Broadcast[_T], /, *, name: str | None, limit: int, warn_on_overflow: bool)` ### Parameters - **channel** (`Broadcast[_T]`) - A reference to the Broadcast channel that this receiver belongs to. - **name** (`str | None`) - A name to identify the receiver in the logs. If `None` an `id(self)`-based name will be used. This is only for debugging purposes, it will be shown in the string representation of the receiver. - **limit** (`int`) - Number of messages the receiver can hold in its buffer. - **warn_on_overflow** (`bool`) - Whether to log a warning when the receiver's buffer is full and a message is dropped. ``` -------------------------------- ### __aiter__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Get an async iterator over the received messages. This method returns the receiver itself as it is already an async iterator. ```APIDOC ## __aiter__ ### Description Get an async iterator over the received messages. ### Returns - `Self`: This receiver, as it is already an async iterator. ### Source Code `src/frequenz/channels/_receiver.py` ``` -------------------------------- ### __init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Initialize the experimental receiver. It takes an optional underlying receiver. ```APIDOC ## __init__ ### Description Initialize this instance. ### Parameters - `receiver` (Receiver[ReceiverMessageT_co] | None): The underlying receiver, or `None` if there is no receiver. ### Source Code `src/frequenz/channels/experimental/_optional_receiver.py` ``` -------------------------------- ### Event Receiver String Representation Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/event Describes how to get the string representation of the Event receiver for debugging purposes. ```APIDOC ## Event Receiver String Representation ### `__repr__` - **Description**: Return a string representation of this event. ### `__str__` - **Description**: Return a string representation of this event. ``` -------------------------------- ### Timer Delay Tolerance Attribute Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Defines the maximum delay tolerated before a timer starts to drift. This is a timedelta object. ```python delay_tolerance: timedelta ``` -------------------------------- ### Get LatestValueCache Representation Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Returns a string representation of the cache, including its latest value, receiver, and unique ID. ```python def __repr__(self) -> str: """Return a string representation of this cache.""" return ( f"" ) ``` -------------------------------- ### Receive Message from Receiver Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Use `receive()` to get a message from the receiver. This method will raise `ReceiverStoppedError` or `ReceiverError` if there are issues. ```python 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 ``` async def receive(self) -> ReceiverMessageT_co: # noqa: DOC503 """Receive a message. Returns: The received message. Raises: ReceiverStoppedError: If there is some problem with the receiver. ReceiverError: If there is some problem with the receiver. """ try: received = await anext(self) except StopAsyncIteration as exc: # If we already had a cause and it was the receiver was stopped, # then reuse that error, as StopAsyncIteration is just an artifact # introduced by __anext__. if ( isinstance(exc.__cause__, ReceiverStoppedError) and exc.__cause__.receiver is self ): # This is a false positive, we are actually checking __cause__ is a # ReceiverStoppedError which is an exception. raise exc.__cause__ # pylint: disable=raising-non-exception raise ReceiverStoppedError(self) from exc return received ``` ``` -------------------------------- ### __init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Initializes a merger with a collection of receivers. It sets up pending tasks to fetch messages from each receiver. ```APIDOC ## __init__ ### Description Initialize this merger. ### Parameters - `*receivers` (Receiver[ReceiverMessageT_co]): The receivers to merge. - `name` (str | None): The name of the receiver. Used to create the string representation of the receiver. ``` -------------------------------- ### Get Latest Value from Cache Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Retrieves the latest received value. Raises ValueError if no value has been received or if the cache is stopped. ```python def get(self) -> T_co: """Return the latest value that has been received. This raises a `ValueError` if no value has been received yet. Use `has_value` to check whether a value has been received yet, before trying to access the value, to avoid the exception. Returns: The latest value that has been received. Raises: ValueError: If no value has been received yet. """ if isinstance(self._latest_value, _Sentinel): raise ValueError("No value has been received yet.") if self._stopped: raise ValueError("Cache has been stopped.") return self._latest_value ``` -------------------------------- ### FileWatcher.__init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/file_watcher Initializes a FileWatcher instance to monitor specified paths for file system events. ```APIDOC ## FileWatcher.__init__ ### Description Initializes this file watcher. ### Method __init__ ### Parameters #### Path Parameters - **paths** (list[Path | str]) - Required - The paths to watch for changes. - **event_types** (Iterable[EventType]) - Optional - The types of events to watch for. Defaults to watch for all event types. - **force_polling** (bool) - Optional - Whether to explicitly force file polling to check for changes. Note that even if set to False, file polling will still be used as a fallback when the underlying file system does not support event-based notifications. Defaults to True. - **polling_interval** (timedelta) - Optional - The interval to poll for changes. Only relevant if polling is enabled. Defaults to timedelta(seconds=1) ### Source Code ```python def __init__( self, paths: list[pathlib.Path | str], event_types: abc.Iterable[EventType] = frozenset(EventType), *, force_polling: bool = True, polling_interval: timedelta = timedelta(seconds=1), ) -> None: """Initialize this file watcher. Args: paths: The paths to watch for changes. event_types: The types of events to watch for. Defaults to watch for all event types. force_polling: Whether to explicitly force file polling to check for changes. Note that even if set to False, file polling will still be used as a fallback when the underlying file system does not support event-based notifications. polling_interval: The interval to poll for changes. Only relevant if polling is enabled. """ self.event_types: frozenset[EventType] = frozenset(event_types) """The types of events to watch for.""" self._stop_event: asyncio.Event = asyncio.Event() self._paths: list[pathlib.Path] = [ path if isinstance(path, pathlib.Path) else pathlib.Path(path) for path in paths ] self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch( *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events, force_polling=force_polling, poll_delay_ms=int(polling_interval.total_seconds() * 1_000), ) self._awatch_stopped_exc: Exception | None = None self._changes: set[FileChange] = set() ``` ``` -------------------------------- ### Get Number of Unconsumed Messages Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels Return the number of unconsumed messages currently in the broadcast receiver's internal queue. ```python def __len__(self) -> int: """Return the number of unconsumed messages in the broadcast receiver. Returns: Number of items in the receiver's internal queue. """ return len(self._q) ``` -------------------------------- ### Timer.__init__ Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Initializes a Timer instance with specified interval, missed tick policy, and optional startup configurations. It allows control over auto-starting, initial delay, and immediate triggering. ```APIDOC ## Timer.__init__ ### Description Initializes a Timer instance with specified interval, missed tick policy, and optional startup configurations. It allows control over auto-starting, initial delay, and immediate triggering. ### Method __init__ ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **interval** (timedelta) - Required - The time between timer ticks. Must be at least 1 microsecond. - **missed_tick_policy** (MissedTickPolicy) - Required - The policy of the timer when it misses a tick. Commonly one of `TriggerAllMissed`, `SkipMissedAndResync`, `SkipMissedAndDrift` or a custom class deriving from `MissedTickPolicy`. - **auto_start** (bool) - Optional - Whether the timer should be started when the instance is created. Defaults to `True`. - **start_delay** (timedelta) - Optional - The delay before the timer should start. Defaults to `timedelta(0)`. - **tick_at_start** (bool) - Optional - When `True`, the timer will trigger immediately after starting. Defaults to `False`. - **loop** (AbstractEventLoop | None) - Optional - The event loop to use to track time. Defaults to `None`. ### Request Example ```python from datetime import timedelta from frequenz.channels import Timer, MissedTickPolicy timer = Timer( interval=timedelta(seconds=1), missed_tick_policy=MissedTickPolicy.TriggerAllMissed, auto_start=True, start_delay=timedelta(seconds=5), tick_at_start=True, loop=None ) ``` ### Response #### Success Response (None) This method does not return a value. #### Response Example None ### Raises - **RuntimeError**: If it was called without a loop and there is no running loop. - **ValueError**: If `interval` is not positive or is smaller than 1 microsecond; if `start_delay` is negative or `start_delay` was specified but `auto_start` is `False`. ``` -------------------------------- ### Get Set of Keys Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/experimental Returns the set of keys for which values have been received. If no key function is provided, this will return an empty set. ```python keys() -> KeysView[HashableT] ``` ```python @override def keys(self) -> KeysView[HashableT]: """Return the set of keys for which values have been received. If no key function is provided, this will return an empty set. """ return self._latest_value_by_key.keys() ``` -------------------------------- ### Event Receiver Asynchronous Iteration Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/event Explains how to get an async iterator for received messages and how to await the next message in the iteration. ```APIDOC ## Event Receiver Asynchronous Iteration ### `__aiter__` - **Description**: Get an async iterator over the received messages. - **Returns**: `Self` - This receiver, as it is already an async iterator. ### `__anext__` `async` - **Description**: Await the next message in the async iteration over received messages. - **Returns**: `ReceiverMessageT_co` - The next received message. - **Raises**: - `StopAsyncIteration`: If the receiver stopped producing messages. - `ReceiverError`: If there is some problem with the receiver. ``` -------------------------------- ### Timer Initialization with Auto-Start Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels/timer Initializes a Timer instance. If `auto_start` is True, the timer is immediately reset with an optional `start_delay` and `tick_at_start` behavior. ```python if auto_start: self.reset(start_delay=start_delay, tick_at_start=tick_at_start) ``` -------------------------------- ### Send Numbers to a Receiver Source: https://frequenz-floss.github.io/frequenz-channels-python/latest/reference/frequenz/channels A basic example showing how to send integers from a single sender to a single receiver using an Anycast channel. ```python import asyncio from frequenz.channels import Anycast, Sender async def send(sender: Sender[int]) -> None: for message in range(3): print(f"sending {message}") await sender.send(message) async def main() -> None: channel = Anycast[int](name="numbers") sender = channel.new_sender() receiver = channel.new_receiver() async with asyncio.TaskGroup() as task_group: task_group.create_task(send(sender)) for _ in range(3): message = await receiver.receive() print(f"received {message}") await asyncio.sleep(0.1) # sleep (or work) with the data asyncio.run(main()) ```