### Initialize Frequenz SDK and Get Producer Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/producer Shows the complete setup for using the Producer, including initializing the microgrid with a specific resampling period and then obtaining a producer instance. ```python from datetime import timedelta from frequenz.sdk import microgrid from frequenz.sdk.timeseries import ResamplerConfig2 await microgrid.initialize( "grpc://127.0.0.1:50051", ResamplerConfig2(resampling_period=timedelta(seconds=1.0)) ) producer = microgrid.producer() ``` -------------------------------- ### Initial Configuration Send and File Watching Setup Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/config Starts by sending the initial configuration and then sets up a FileWatcher to monitor parent directories of configuration files for creation or modification events. ```python async def _run(self) -> None: """Monitor for and send configuration file updates. At startup, the Config Manager sends the current config so that it can be cache in the Broadcast channel and served to receivers even if there hasn't been any change to the config file itself. """ await self.send_config() parent_paths = {p.parent for p in self._config_paths} # FileWatcher can't watch for non-existing files, so we need to watch for the # parent directories instead just in case a configuration file doesn't exist yet # or it is deleted and recreated again. file_watcher = FileWatcher( paths=list(parent_paths), event_types={EventType.CREATE, EventType.MODIFY}, force_polling=self._force_polling, polling_interval=self._polling_interval, ) try: async for event in file_watcher: if not event.path.exists(): _logger.error( "[%s] Received event %s, but the watched path %s doesn't exist.", self.name, event, event.path, ) continue # Since we are watching the whole parent directories, we need to make # sure we only react to events related to the configuration files we # are interested in. # # pathlib.Path.samefile raises error if any path doesn't exist so we need to # make sure the paths exists before calling it. This could happen as it is not # required that all config files exist, only one is required but we don't know # which. if not any( event.path.samefile(p) for p in self._config_paths if p.exists() ): continue match event.type: case EventType.CREATE: _logger.info( "[%s] The configuration file %s was created, sending new config...", self.name, ``` -------------------------------- ### Initialize Microgrid and Get Grid Instance Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/grid Demonstrates how to initialize the microgrid and retrieve the Grid instance. This setup is required before interacting with grid-related functionalities. ```python from datetime import timedelta from frequenz.sdk import microgrid from frequenz.sdk.timeseries import ResamplerConfig2 await microgrid.initialize( "grpc://127.0.0.1:50051", ResamplerConfig2(resampling_period=timedelta(seconds=1)) ) grid = microgrid.grid() ``` -------------------------------- ### Initialize Microgrid and Get Consumer Instance Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/consumer This example demonstrates how to initialize the microgrid connection and obtain a Consumer instance. It sets up the resampling configuration for the timeseries data. Use this pattern to start interacting with consumer metrics. ```python from datetime import timedelta from frequenz.sdk import microgrid from frequenz.sdk.timeseries import ResamplerConfig2 await microgrid.initialize( "grpc://127.0.0.1:50051", ResamplerConfig2(resampling_period=timedelta(seconds=1.0)) ) consumer = microgrid.consumer() ``` -------------------------------- ### Install Frequenz SDK Source: https://frequenz-floss.github.io/frequenz-sdk-python Install the Frequenz SDK using pip within an activated virtual environment. Ensure pip is installed first. ```bash python3 -m pip install frequenz-sdk ``` -------------------------------- ### ConfigManager Start Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/config Starts the ConfigManager, including its configuration actor and optionally the logging actor. ```APIDOC ## ConfigManager.start ### Description Start this config manager. ### Method start ### Endpoint N/A (Method Call) ### Parameters None ### Request Example None ### Response None #### Success Response (200) None #### Response Example None ``` -------------------------------- ### start Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Starts the MovingWindow, initiating its background tasks for data processing and updates. ```APIDOC ## start ### Description Starts the MovingWindow. This method starts the MovingWindow tasks. ### Method Signature ```python start() -> None ``` ### Parameters This method does not accept any parameters. ### Returns - **None** ``` -------------------------------- ### start Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Starts the background service. This is an abstract method and must be implemented by subclasses. ```APIDOC ## start ### Description Start this background service. ### Method start ### Returns - **None**: This method does not return any value. ``` -------------------------------- ### Start ConfigManager Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/config Starts the ConfigManager's background services, including the configuration actor and optionally the logging actor. ```python @override def start(self) -> None: """Start this config manager.""" self.config_actor.start() if self.logging_actor: self.logging_actor.start() ``` -------------------------------- ### start Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/config Starts the configuration manager and any associated actors. This method initializes the manager and ensures that background services are running. ```APIDOC ## start ### Description Starts this config manager. This method initializes the configuration manager and its associated actors, ensuring all background services are operational. ### Method `start()` ### Parameters None ### Response None ### Raises None ``` -------------------------------- ### Install Project in Editable Mode Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Install the project in editable mode for local development, which also installs all necessary dependencies. ```bash python -m pip install -e . ``` -------------------------------- ### Install MkDocs Dependencies Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Install the necessary dependencies for building documentation with MkDocs, if not already included in the general 'dev' dependencies. ```bash python -m pip install -e .[dev-mkdocs] ``` -------------------------------- ### start Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Starts the actor. If the actor is already running, this method does nothing. ```APIDOC ## start ### Description Start this actor. If this actor is already running, this method does nothing. ### Method start ### Returns - **None** ``` -------------------------------- ### Install and Run with Nox Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Install the Nox development dependencies and use Nox to run tests and checks in isolated virtual environments. ```bash python -m pip install .[dev-noxfile] ``` ```bash nox ``` -------------------------------- ### Start Config Manager Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/config Starts the configuration manager and its logging actor if available. This method should be called to initialize the configuration services. ```python def start(self) -> None: """Start this config manager.""" self.config_actor.start() if self.logging_actor: self.logging_actor.start() ``` -------------------------------- ### MovingWindow.start Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Starts the MovingWindow tasks, including the resampler task if configured, and the main task for updating the window. ```APIDOC ## MovingWindow.start ### Description Starts the MovingWindow tasks. This method starts the MovingWindow tasks. ### Method start() ``` -------------------------------- ### start Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/formulas Start the per-phase and sub formulas. This method initiates the execution of the formula's components. ```APIDOC ## start ### Description Start the per-phase and sub formulas. ### Method Signature `start() -> None` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ``` -------------------------------- ### Start Actor in Background with `start()` Method Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/user-guide/actors Initiate an actor in the background using the `start()` method, which returns immediately. The actor runs until manually stopped or completion. Manual stopping is crucial to avoid resource leaks. ```python from frequenz.sdk.actor import Actor class MyActor(Actor): async def _run(self) -> None: print("Hello World!") actor = MyActor() actor.start() print("The actor is running") await actor.stop() ``` -------------------------------- ### Start MovingWindow Task Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Starts the MovingWindow's background tasks. If a resampler is configured, it will be set up before the main task begins execution. ```python def start(self) -> None: """Start the MovingWindow. This method starts the MovingWindow tasks. """ if self._resampler: self._configure_resampler() self._tasks.add(asyncio.create_task(self._run_impl(), name="update-window")) ``` -------------------------------- ### Install Project with Development Dependencies Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Install the project in editable mode along with all development dependencies like mypy, pylint, and pytest. ```bash python -m pip install -e .[dev] ``` -------------------------------- ### __aenter__ method Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/formulas Enters an asynchronous context, starting the background service. ```APIDOC ## __aenter__ `async` method ### Description Enter an async context. Start this background service. ### Returns - `Self`: This background service. ``` -------------------------------- ### Example Cross-Architecture Dockerfile Name Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING An example illustrating the naming convention for a Dockerfile targeting arm64 architecture, Ubuntu 20.04 OS, and Python 3.11. ```text arm64-ubuntu-20.04-python-3.11.Dockerfile ``` -------------------------------- ### Verify SDK Installation Source: https://frequenz-floss.github.io/frequenz-sdk-python Import the Frequenz SDK in the Python interpreter to confirm a successful installation. This checks if the SDK is accessible. ```python $ python3 Python 3.11.4 (main, Jun 7 2023, 10:13:09) [GCC 12.2.0] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import frequenz.sdk >>> ``` -------------------------------- ### Example config.toml file Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/config This TOML file defines the logging configuration, including the root logger level and specific logger configurations. ```toml [logging.root_logger] level = "INFO" [logging.loggers.power_dist] name = "frequenz.sdk.actor.power_distributing" level = "DEBUG" [logging.loggers.chan] name = "frequenz.channels" level = "DEBUG" ``` -------------------------------- ### Serve Documentation with MkDocs Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Serve the project documentation locally for live preview. Changes to source files will update the site automatically if installed with '-e'. ```bash mkdocs serve ``` -------------------------------- ### Get Ringbuffer Bounds for Operations Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Calculates the usable bounds of the ringbuffer for operations like average or min/max, based on start and end timestamps. It excludes the oldest or currently overwritten window. ```python def _get_buffer_bounds( self, start: datetime, end: datetime, ) -> tuple[int, int, int]: """ Get the bounds of the ringbuffer used for further operations. This method uses the given start and end timestamps to calculate the part of the ringbuffer that can be used for further operations, like average or min/max. Here we cut out the oldest window or the window that is currently overwritten in the MovingWindow such that it is not considered in any further operation. Args: start: The start timestamp of the window. end: The end timestamp of the window. Returns: The bounds of the to be used buffer and the window size. Raises: ValueError: If the start timestamp is after the end timestamp. """ window_size = self._timestamp_to_rel_index(end) - self._timestamp_to_rel_index( start ) if window_size <= 0: raise ValueError("Start timestamp must be before end timestamp") if window_size > self._period: raise ValueError( "The window size must be smaller or equal than the period." ) rel_pos = self._get_relative_positions(start, window_size) if window_size > self._moving_window.count_valid(): raise ValueError( "The window size must be smaller than the size of the `MovingWindow`" ) # shifted distance between the next incoming sample and the start of the window dist_to_start = rel_pos.next - rel_pos.start # get the start and end position inside the ringbuffer end_pos = ( self._timestamp_to_rel_index(self._buffer.time_bound_newest) + 1 ) - dist_to_start ``` -------------------------------- ### Application Configuration Management Example Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/user-guide/config Demonstrates how an application can manage its configuration and control child actors based on these settings. It includes starting/stopping actors and reacting to configuration updates. ```python import asyncio import dataclasses import logging import pathlib from collections.abc import Sequence from datetime import timedelta from typing import Sequence, assert_never from frequenz.sdk.actor import Actor from frequenz.sdk.config import ConfigManager, wait_for_first _logger = logging.getLogger(__name__) class MyActor(Actor): def __init__( self, config_manager: ConfigManager, /, *, config_key: str | Sequence[str] ) -> None: super().__init__() self._config_manager = config_manager self._config_key = config_key async def _run(self) -> None: ... @dataclasses.dataclass(frozen=True, kw_only=True) class AppConfig: enable_actor: bool = dataclasses.field( metadata={"metadata": {"description": "Whether to enable the actor"}}, ) class App: def __init__(self, *, config_paths: Sequence[pathlib.Path]): self._config_manager = ConfigManager(config_paths) self._config_receiver = self._config_manager.new_receiver("app", AppConfig) self._actor = MyActor(self._config_manager, config_key="actor") async def _update_config(self, config_update: AppConfig | Exception | None) -> None: match config_update: case AppConfig() as config: _logger.info("New configuration received, updating.") await self._reconfigure(config) case None: _logger.info("Configuration was unset, keeping the old configuration.") case Exception(): _logger.info("New configuration has errors, keeping the old configuration.") case unexpected: assert_never(unexpected) async def _reconfigure(self, config: AppConfig) -> None: if config.enable_actor: self._actor.start() else: await self._actor.stop() async def run(self) -> None: _logger.info("Starting App...") async with self._config_manager: await self._update_config( await wait_for_first(self._config_receiver, receiver_name="app") ) _logger.info("Waiting for configuration updates...") async for config_update in self._config_receiver: await self._reconfigure(config_update) if __name__ == "__main__": asyncio.run(App(config_paths="config.toml").run()) ``` -------------------------------- ### Fetch CHP Power Formula Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/logical_meter Retrieves a Formula for fetching CHP power production. This formula operates in the Passive Sign Convention (PSC) and starts automatically if not already running. Use `new_receiver` to get a receiver for this formula. ```python return self._formula_pool.from_power_formula( channel_key="chp_power", formula_str=connection_manager.get().component_graph.chp_formula(None), ) ``` -------------------------------- ### Build Documentation with MkDocs Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Build the project documentation, which will be written to the 'site/' directory. ```bash mkdocs build ``` -------------------------------- ### Example TOML Configuration File Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/user-guide/config A sample TOML file demonstrating how to structure configuration for an application and its associated actors, including boolean flags and integer settings. ```toml [app] enable_actor = true [actor] some_config = 10 [logging.root_logger] level = "DEBUG" ``` -------------------------------- ### Reset WallClockTimer Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Resets the timer to start timing from the current moment, optionally including alignment. This method also starts the timer if it was previously closed or not yet started. ```python def reset(self) -> None: """Reset the timer to start timing from now (plus an optional alignment). If the timer was closed, or not started yet, it will be started. """ self._closed = False self._update_next_tick_time() self._current_tick_info = None # We assume the clocks will behave similarly after the timer was reset, so we ``` -------------------------------- ### Simple Background Service Example Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Demonstrates how to implement a simple background service that prints the current time at a specified resolution. It shows usage both as an async context manager and with manual start/stop. ```python from typing_extensions import override import datetime import asyncio class Clock(BackgroundService): def __init__(self, resolution_s: float, *, name: str | None = None) -> None: super().__init__(name=name) self._resolution_s = resolution_s @override def start(self) -> None: self._tasks.add(asyncio.create_task(self._tick())) async def _tick(self) -> None: while True: await asyncio.sleep(self._resolution_s) print(datetime.datetime.now()) async def main() -> None: # As an async context manager async with Clock(resolution_s=1): await asyncio.sleep(5) # Manual start/stop (only use if necessary, as cleanup is more complicated) clock = Clock(resolution_s=1) clock.start() await asyncio.sleep(5) await clock.stop() asyncio.run(main()) ``` -------------------------------- ### Simple Background Service Example Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Demonstrates how to create and use a simple background service that prints the current time every second. It shows usage both as an async context manager and with manual start/stop. ```python from typing_extensions import override import datetime import asyncio class Clock(BackgroundService): def __init__(self, resolution_s: float, *, name: str | None = None) -> None: super().__init__(name=name) self._resolution_s = resolution_s @override def start(self) -> None: self._tasks.add(asyncio.create_task(self._tick())) async def _tick(self) -> None: while True: await asyncio.sleep(self._resolution_s) print(datetime.datetime.now()) async def main() -> None: # As an async context manager async with Clock(resolution_s=1): await asyncio.sleep(5) # Manual start/stop (only use if necessary, as cleanup is more complicated) clock = Clock(resolution_s=1) clock.start() await asyncio.sleep(5) await clock.stop() asyncio.run(main()) ``` -------------------------------- ### Formula Start Method Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/formulas Starts the formula evaluator and any sub-formulas it depends on. ```python @override def start(self) -> None: """Start the formula evaluator.""" for sub_formula in self._sub_formulas: sub_formula.start() self._evaluator.start() ``` -------------------------------- ### Initialize Microgrid and Get Producer Instance Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/producer This snippet demonstrates how to initialize the microgrid connection and obtain a `Producer` instance. It requires importing necessary modules and setting up a `ResamplerConfig2`. ```python from datetime import timedelta from frequenz.sdk import microgrid from frequenz.sdk.timeseries import ResamplerConfig2 await microgrid.initialize( "grpc://127.0.0.1:50051", ResamplerConfig2(resampling_period=timedelta(seconds=1.0)) ) producer = microgrid.producer() ``` -------------------------------- ### MovingWindow Initialization and Usage Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Demonstrates how to initialize a MovingWindow, feed it data, and retrieve data using time-based slicing to calculate the mean. ```APIDOC ## MovingWindow ### Description A data window that moves with the latest datapoints of a data stream. After initialization, the `MovingWindow` can be accessed by an integer index or a timestamp. A sub window can be accessed by using a slice of integers or timestamps. The window uses a ring buffer for storage and the first element is aligned to a fixed defined point in time. Modulo arithmetic is used to move the `align_to` timestamp into the latest window. Resampling can be configured via the `resampler` parameter. ### Example: Calculate the mean of a time interval ```python from datetime import datetime, timedelta, timezone import asyncio # Assuming Sample, Sender, Broadcast, and MovingWindow are imported # from frequenz.sdk.timeseries import Sample, Sender, Broadcast, MovingWindow async def send_mock_data(sender: Sender[Sample]) -> None: while True: await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0)) await asyncio.sleep(1.0) async def run() -> None: # Placeholder for actual channel and sender/receiver setup # resampled_data_channel = Broadcast[Sample](name="sample-data") # resampled_data_receiver = resampled_data_channel.new_receiver() # resampled_data_sender = resampled_data_channel.new_sender() # Mock sender and receiver for demonstration class MockSender: async def send(self, data): pass # Simulate sending data class MockReceiver: async def receive(self): # Simulate receiving data, returning None to stop for example await asyncio.sleep(0.1) # Small delay to prevent busy-waiting return None # Or return Sample data if needed for testing resampled_data_receiver = MockReceiver() resampled_data_sender = MockSender() send_task = asyncio.create_task(send_mock_data(resampled_data_sender)) async with MovingWindow( size=timedelta(seconds=5), resampled_data_recv=resampled_data_receiver, input_sampling_period=timedelta(seconds=1), ) as window: time_start = datetime.now(tz=timezone.utc) time_end = time_start + timedelta(seconds=5) # Wait for 5 seconds until the buffer is filled await asyncio.sleep(5.1) # Add a small buffer for async operations # Return a numpy array from the window # Note: Accessing window directly might require the window to be filled # The example assumes data has been sent and processed. # For a real scenario, you might need to ensure data is available. try: # This slicing might return an empty array if no data is present # or if the time range doesn't match the data in the window. array = window[time_start:time_end] if array.size > 0: mean = array.mean() print(f"Mean of the window: {mean}") else: print("Window is empty or time range invalid for current data.") except Exception as e: print(f"An error occurred: {e}") # asyncio.run(run()) ``` ### Parameters - **size** (timedelta | int) - Required - The size of the window. Can be a timedelta or an integer representing the number of samples. - **resampled_data_recv** (Receiver[Sample]) - Required - A receiver for the data samples to be stored in the window. - **input_sampling_period** (timedelta) - Required - The period of the input data samples. - **align_to** (datetime) - Optional - The timestamp to align the window to. Defaults to `datetime(1, 1, 1, tzinfo=timezone.utc)`. - **resampler** (ResamplerConfig | None) - Optional - Configuration for resampling. If `None`, no resampling is performed. ### Response - Returns a numpy ndarray when accessed via index or slice. ### Example: Create a polars data frame from a `MovingWindow` ```python from datetime import datetime, timedelta, timezone import asyncio # Assuming Sample, Sender, Broadcast, and MovingWindow are imported # from frequenz.sdk.timeseries import Sample, Sender, Broadcast, MovingWindow async def send_mock_data(sender: Sender[Sample]) -> None: while True: await sender.send(Sample(datetime.now(tz=timezone.utc), 10.0)) await asyncio.sleep(1.0) async def run() -> None: # Placeholder for actual channel and sender/receiver setup # resampled_data_channel = Broadcast[Sample](name="sample-data") # resampled_data_receiver = resampled_data_channel.new_receiver() # resampled_data_sender = resampled_data_channel.new_sender() # Mock sender and receiver for demonstration class MockSender: async def send(self, data): pass # Simulate sending data class MockReceiver: async def receive(self): await asyncio.sleep(0.1) return None resampled_data_receiver = MockReceiver() resampled_data_sender = MockSender() send_task = asyncio.create_task(send_mock_data(resampled_data_sender)) # Create a window that stores two days of data async with MovingWindow( size=timedelta(days=2), resampled_data_recv=resampled_data_receiver, input_sampling_period=timedelta(seconds=1), ) as window: # Wait for one full day until the buffer is filled await asyncio.sleep(60*60*24 + 1) # Add buffer for async time_start = datetime(2023, 1, 1, tzinfo=timezone.utc) time_end = datetime(2023, 1, 2, tzinfo=timezone.utc) # Accessing window data to create a polars DataFrame would typically involve # converting the numpy array obtained from slicing into a DataFrame. # Example: # import polars as pl # array = window[time_start:time_end] # df = pl.DataFrame({"timestamp": window.timestamps, "value": array}) # print(df) print("MovingWindow setup complete. Data can now be accessed or converted.") # asyncio.run(run()) ``` ``` -------------------------------- ### __init__ Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/config Initializes the configuration manager. It takes paths to TOML configuration files, an output sender, and optional parameters for naming, polling, and polling intervals. ```APIDOC ## __init__ ### Description Initializes the configuration manager. It takes paths to TOML configuration files, an output sender, and optional parameters for naming, polling, and polling intervals. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Parameters * **config_paths** (str | Path | Sequence[Path | str]) - Required - The paths to the TOML files with the configuration. Order matters, as the configuration will be read and updated in the order of the paths, so the last path will override the configuration set by the previous paths. Dict keys will be merged recursively, but other objects (like lists) will be replaced by the value in the last path. * **output** (Sender[Mapping[str, Any]]) - Required - The sender to send the configuration to. * **name** (str | None) - Optional - The name of the actor. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. Defaults to `None`. * **force_polling** (bool) - Optional - Whether to force file polling to check for changes. Defaults to `True`. * **polling_interval** (timedelta) - Optional - The interval to poll for changes. Only relevant if polling is enabled. Defaults to `timedelta(seconds=1)`. ### Raises * **ValueError** - If no configuration path is provided. ### Request Example None ### Response #### Success Response (200) None #### Response Example None ``` -------------------------------- ### __init__ Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/logical_meter Creates a `LogicalMeter` instance. Note: `LogicalMeter` instances are not meant to be created directly by users. Use the `microgrid.logical_meter` method for creating `LogicalMeter` instances. ```APIDOC ## __init__ ### Description Create a `LogicalMeter` instance. !!! note `LogicalMeter` instances are not meant to be created directly by users. Use the [`microgrid.logical_meter`][frequenz.sdk.microgrid.logical_meter] method for creating `LogicalMeter` instances. ### Parameters - `channel_registry` (ChannelRegistry) - A channel registry instance shared with the resampling actor. - `resampler_subscription_sender` (Sender[ComponentMetricRequest]) - A sender for sending metric requests to the resampling actor. ``` -------------------------------- ### Start Formula3Phase and Sub-formulas Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/formulas Starts the per-phase formulas, any sub-formulas, and the evaluator. This method should be called before receiving data. ```python @override def start(self) -> None: """Start the per-phase and sub formulas.""" for sub_formula in self._sub_formulas: sub_formula.start() self._formula_p1.start() self._formula_p2.start() self._formula_p3.start() self._evaluator.start() ``` -------------------------------- ### Initialize Microgrid and Fetch Power Data Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/logical_meter Initializes the microgrid connection and fetches power data using a logical meter. This example demonstrates setting up the connection, defining a formula to sum power from two components, and then iterating over the results to print the power value. ```python from datetime import timedelta from frequenz.sdk import microgrid from frequenz.sdk.timeseries import ResamplerConfig2 from frequenz.client.microgrid.metrics import Metric await microgrid.initialize( "grpc://microgrid.sandbox.api.frequenz.io:62060", ResamplerConfig2(resampling_period=timedelta(seconds=1)), ) logical_meter = ( microgrid.logical_meter() .start_formula("#1001 + #1002", Metric.AC_ACTIVE_POWER) .new_receiver() ) async for power in logical_meter: print(power.value) ``` -------------------------------- ### reset Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Resets the timer to start timing from the current moment. If the timer was closed or not yet started, this action will initiate it. ```APIDOC ## reset ### Description Resets the timer to start timing from the current moment. If the timer was closed or not yet started, this action will initiate it. ### Method `def reset() -> None` ### Parameters None ### Returns None ``` -------------------------------- ### Initialize ConfigManager with Multiple Files Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/user-guide/config Instantiate the ConfigManager by providing a list of TOML configuration file paths. The order matters, as later files override earlier ones. This is useful for setting base configurations and then applying specific overrides. ```python from frequenz.sdk.config import ConfigManager async with ConfigManager(["base-config.toml", "overrides.toml"]) as config_manager: ... ``` -------------------------------- ### __post_init__ Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Initializes and validates the configuration values for the Wall Clock Timer. ```APIDOC ## __post_init__ ## ### Description Check that config values are valid. ### Raises - **ValueError**: If any value is out of range. ``` -------------------------------- ### start Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/formulas Starts the formula evaluator and any associated sub-formulas. This method initiates the evaluation process and begins processing data. ```APIDOC ## start ### Description Start the formula evaluator. This method initiates the execution of the formula and any dependent sub-formulas. ### Method `start() -> None` ``` -------------------------------- ### Create and Activate Virtual Environment Source: https://frequenz-floss.github.io/frequenz-sdk-python Set up a new virtual environment for your project and activate it. This isolates project dependencies. ```bash mkdir my-sdk-project cd my-sdk-project python3 -m venv .venv . .venv/bin/activate ``` -------------------------------- ### Run the application script Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/tutorials/getting_started Execute your Frequenz SDK application from the command line. Ensure the script name matches your Python file. ```bash # Example usage python pv_optimization.py ``` -------------------------------- ### Serve Documentation with Mike Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Serve the documentation from the local 'gh-pages' branch using Mike. Note that live updates are not supported like with 'mkdocs serve'. ```bash mike serve ``` -------------------------------- ### Install Pytest Dependencies Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Install runtime and test dependencies to run pytest manually. This is also included in the general 'dev' dependencies. ```bash python -m pip install .[dev-pytest] ``` -------------------------------- ### MovingWindow Initialization Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries Initializes the MovingWindow, setting up the underlying ring buffer and starting a task to update it with new samples. The task continues until the channel receiver is closed. ```APIDOC ## MovingWindow.__init__ ### Description Initializes the MovingWindow. This method creates the underlying ring buffer and starts a new task that updates the ring buffer with new incoming samples. The task stops running only if the channel receiver is closed. ### Parameters #### Args - **size** (timedelta) - The time span of the moving window over which samples will be stored. - **resampled_data_recv** (Receiver[Sample[Quantity]]) - A receiver that delivers samples with a given sampling period. - **input_sampling_period** (timedelta) - The time interval between consecutive input samples. - **resampler_config** (ResamplerConfig | None) - The resampler configuration in case resampling is required. - **align_to** (datetime) - A timestamp that defines a point in time to which the window is aligned to modulo window size. - **name** (str | None) - The name of this moving window. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. ``` -------------------------------- ### Run and Await Actors Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Use this function to start and await the completion of one or more actors. It logs the starting process and handles exceptions or cancellations during execution. ```python async def run(*actors: Actor) -> None: """Await the completion of all actors. !!! info Please read the [`actor` module documentation][frequenz.sdk.actor] for more comprehensive guide on how to use and implement actors properly. Args: *actors: the actors to be awaited. """ _logger.info("Starting %s actor(s)...", len(actors)) for actor in actors: if actor.is_running: _logger.info("Actor %s: Already running, skipping start.", actor) else: _logger.info("Actor %s: Starting...", actor) actor.start() # Wait until all actors are done pending_tasks = {asyncio.create_task(a.wait(), name=str(a)) for a in actors} while pending_tasks: done_tasks, pending_tasks = await asyncio.wait( pending_tasks, return_when=asyncio.FIRST_COMPLETED ) # This should always be only one task, but we handle many for extra safety for task in done_tasks: # BackgroundService returns a BaseExceptionGroup containing multiple # exceptions. The 'task.result()' statement raises these exceptions, # and 'except*' is used to handle them as a group. If the task raises # multiple different exceptions, 'except*' will be invoked multiple times, # once for each exception group. try: task.result() except* asyncio.CancelledError: _logger.info("Actor %s: Cancelled while running.", task.get_name()) except* Exception: # pylint: disable=broad-exception-caught _logger.exception( "Actor %s: Raised an exception while running.", task.get_name(), ) else: _logger.info("Actor %s: Finished normally.", task.get_name()) _logger.info("All %s actor(s) finished.", len(actors)) ``` -------------------------------- ### Abstract Start Method for Background Service Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Defines the abstract method that must be implemented by subclasses to start the background service. This method is intended to be overridden. ```python @abc.abstractmethod def start(self) -> None: """Start this background service.""" ``` -------------------------------- ### Async context management for BackgroundService Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Provides asynchronous context management for starting and stopping the background service. `__aenter__` starts the service, and `__aexit__` stops it. ```python async def __aenter__(self) -> Self: """Enter an async context. Start this background service. Returns: This background service. """ self.start() return self ``` ```python async def __aexit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> None: """Exit an async context. Stop this background service. Args: exc_type: The type of the exception raised, if any. exc_val: The exception raised, if any. exc_tb: The traceback of the exception raised, if any. """ await self.stop() ``` -------------------------------- ### Initialize Microgrid and Get Consumer Power Receiver Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/consumer Demonstrates how to initialize the microgrid with a specific resampling configuration and then obtain a receiver for consumer power data. This is used to fetch power values from different points in the microgrid. ```python from datetime import timedelta from frequenz.sdk import microgrid from frequenz.sdk.timeseries import ResamplerConfig2 await microgrid.initialize( "grpc://127.0.0.1:50051", ResamplerConfig2(resampling_period=timedelta(seconds=1.0)) ) consumer = microgrid.consumer() # Get a receiver for a builtin formula consumer_power_recv = consumer.power.new_receiver() async for consumer_power_sample in consumer_power_recv: print(consumer_power_sample) ``` -------------------------------- ### Deploy Documentation to Fork with Mike Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Deploy documentation to your fork's GitHub Pages using Mike with the '--push' option and specifying your fork's remote name. ```bash mike deploy --push --remote your-fork-remote ``` -------------------------------- ### Window Closure Attribute Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Indicates which side of the resampling window is closed. Defaults to `WindowSide.RIGHT`, meaning the window is (start, end]. If `WindowSide.LEFT`, the window is [start, end). ```python closed: WindowSide = RIGHT ``` -------------------------------- ### Start Actor Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Starts an actor. If the actor is already running, this method has no effect. It resets the cancelled state and creates a new task for the actor's run loop. ```python def start(self) -> None: """Start this actor. If this actor is already running, this method does nothing. """ if self.is_running: return self._is_cancelled = False self._tasks.clear() self._tasks.add(asyncio.create_task(self._run_loop())) ``` -------------------------------- ### Get Producer Power Receiver Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/producer Demonstrates how to get a receiver for the producer's power formula and iterate over the received samples. This is useful for monitoring the gross production of active sites. ```python producer_power_recv = producer.power.new_receiver() async for producer_power_sample in producer_power_recv: print(producer_power_sample) ``` -------------------------------- ### Full Frequenz SDK Application Example Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/tutorials/getting_started This is the complete code for a basic Frequenz SDK application, including imports, initialization, and data processing logic. ```python import asyncio from datetime import timedelta from frequenz.sdk import microgrid from frequenz.sdk.timeseries import ResamplerConfig2 async def run() -> None: # This points to the default Frequenz microgrid sandbox server_url = "grpc://microgrid.sandbox.api.frequenz.io:62060" # Initialize the microgrid await microgrid.initialize( server_url, ResamplerConfig2(resampling_period=timedelta(seconds=1)), ) # Define your application logic here grid_meter = microgrid.grid().power.new_receiver() async for power in grid_meter: print(power.value) def main() -> None: asyncio.run(run()) if __name__ == "__main__": main() ``` -------------------------------- ### cancel Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Cancels the actor. A cancelled actor cannot be started again. ```APIDOC ## cancel ### Description Cancel actor. Cancelled actor can't be started again. ### Method cancel ### Parameters #### Path Parameters - **msg** (str | None) - Optional - The message to be passed to the tasks being cancelled. Defaults to None. ``` -------------------------------- ### Start Formula Execution Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/logical_meter Starts execution of a given formula. Formulas support component IDs (prefixed with '#'), constant values, and operators (+, -, *, /). Supported functions include COALESCE, MAX, and MIN. The metric parameter is used for fetching receivers from the resampling actor. ```python def start_formula( self, formula: str, metric: Metric, ) -> Formula[Quantity]: """Start execution of the given formula. Formulas can have Component IDs that are preceded by a pound symbol(`#`), constant values and these operators: `+`, `-`, `*`, `/`, `(`, `)`. These functions are also supported: `COALESCE`, `MAX`, `MIN`. For example, the input string: `#20 + #5` is a formula for adding metrics from two components with ids 20 and 5. A more detailed description of the formula syntax with examples can be found [here](https://github.com/frequenz-floss/frequenz-microgrid-formula-engine-rs?tab=readme-ov-file#formula-syntax-overview). Args: formula: formula to execute. metric: The metric to use when fetching receivers from the resampling actor. Returns: A Formula that applies the formula and streams values. """ return self._formula_pool.from_string(formula, metric) ``` -------------------------------- ### __init__ Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/actor Initializes a new Actor instance. ```APIDOC ## __init__ ### Parameters - **name** (`str | None`, optional, defaults to `None`): The name of this background service. ``` -------------------------------- ### power_distribution_results Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/battery_pool Get a receiver to receive power distribution results. ```APIDOC ## power_distribution_results `property` ### Description Get a receiver to receive power distribution results. ### Returns - **ReceiverFetcher[Result]**: A receiver that will stream power distribution results for the pool's set of components. ``` -------------------------------- ### frequenz.sdk.microgrid.connection_manager.initialize Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/microgrid/connection_manager Initializes the MicrogridApi. This asynchronous function should be called only once to establish the connection to the microgrid API server. ```APIDOC ## initialize `async` ### Description Initialize the MicrogridApi. This function should be called only once. ### Parameters #### Path Parameters - **server_url** (str) - Required - The location of the microgrid API server in the form of a URL. The following format is expected: `grpc://hostname{:port}{?ssl=ssl}`, where the port should be an int between `0` and `65535` (defaulting to `9090`) and ssl should be a boolean (defaulting to false). For example: `grpc://localhost:1090?ssl=true`. ``` -------------------------------- ### Build Project Distribution Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/CONTRIBUTING Use the 'build' package to create source and binary distributions of the project. ```bash python -m pip install build python -m build ``` -------------------------------- ### Get Background Service Name Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/formulas Returns the name of this background service. ```python name: str ``` -------------------------------- ### Get Grid Measuring Point Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/microgrid Retrieves the grid measuring point. ```python def grid() -> Grid: """Return the grid measuring point.""" return _get().grid() ``` -------------------------------- ### Initialize a Formula Instance Source: https://frequenz-floss.github.io/frequenz-sdk-python/latest/reference/frequenz/sdk/timeseries/formulas Use this to create a Formula instance. It requires a name, root AST node, and a creation method for output values. Optional sub-formulas and a metric fetcher can also be provided. ```python def __init__( # pylint: disable=too-many-arguments self, *, name: str, root: AstNode[QuantityT], create_method: Callable[[float], QuantityT], sub_formulas: list[Formula[QuantityT]] | None = None, metric_fetcher: ResampledStreamFetcher | None = None, ) -> None: """Create a `Formula` instance. Args: name: The name of the formula. root: The root node of the formula AST. create_method: A method to generate the output values with. If the formula is for generating power values, this would be `Power.from_watts`, for example. sub_formulas: Any sub-formulas that this formula depends on. metric_fetcher: An optional metric fetcher that needs to be started before the formula can be evaluated. """ BackgroundService.__init__(self) self._name: str = name self._root: AstNode[QuantityT] = root self._create_method: Callable[[float], QuantityT] = create_method self._sub_formulas: list[Formula[QuantityT]] = sub_formulas or [] self._channel: Broadcast[Sample[QuantityT]] = Broadcast( name=f"{self}", resend_latest=True, ) self._evaluator: FormulaEvaluatingActor[QuantityT] = FormulaEvaluatingActor( root=self._root, output_channel=self._channel, metric_fetcher=metric_fetcher, ) ```