### Install launchdarkly-eventsource (Sync)

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/README.md

Install the library for synchronous use with urllib3. This is the base installation.

```bash
pip install launchdarkly-eventsource
```

--------------------------------

### AsyncSSEClient start() Method Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncSSEClient.md

Shows how to manually start the SSE stream connection using the start() method and handle potential connection errors like HTTPStatusError.

```python
client = AsyncSSEClient("https://events.example.com/stream")
try:
    await client.start()
    print("Connected")
except HTTPStatusError as e:
    print(f"HTTP {e.status}: {e.headers}")
```

--------------------------------

### AsyncSSEClient Constructor Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncSSEClient.md

Demonstrates creating an AsyncSSEClient instance for simple HTTP connections and advanced configurations with custom headers and retry strategies. Ensure the 'async' extra is installed.
```python
import asyncio
import logging

import aiohttp  # required for aiohttp.ClientTimeout below
from ld_eventsource import AsyncSSEClient
from ld_eventsource.config import AsyncConnectStrategy, ErrorStrategy

async def main():
    # Simple HTTP connection
    async with AsyncSSEClient("https://events.example.com/stream") as client:
        async for event in client.events:
            print(f"Event: {event.data}")

    # Advanced configuration
    async with AsyncSSEClient(
        connect=AsyncConnectStrategy.http(
            url="https://events.example.com/stream",
            headers={"Authorization": "Bearer token123"},
            aiohttp_request_options={"timeout": aiohttp.ClientTimeout(total=None, sock_read=30)}
        ),
        initial_retry_delay=2,
        error_strategy=ErrorStrategy.continue_with_max_attempts(5),
        logger=logging.getLogger("my_app.sse")
    ) as client:
        async for event in client.events:
            print(event.data)

asyncio.run(main())
```

--------------------------------

### Install Async Support

Source: https://github.com/launchdarkly/python-eventsource/blob/main/README.md

Install the package with the optional 'async' extra to enable asynchronous client support.

```bash
pip install launchdarkly-eventsource[async]
```

--------------------------------

### Complete SSEClient Configuration Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/INDEX.md

A comprehensive example demonstrating how to configure and use the synchronous SSEClient with custom connection, retry, error, and logging settings.
```python
from ld_eventsource import SSEClient
from ld_eventsource.config import (
    ConnectStrategy,
    ErrorStrategy,
    RetryDelayStrategy
)
import logging

logger = logging.getLogger("my_app.sse")

client = SSEClient(
    # Connection
    connect=ConnectStrategy.http(
        url="https://api.example.com/events",
        headers={"Authorization": "Bearer token"},
        urllib3_request_options={"timeout": 30},
        query_params=lambda: {"session": get_session_id()}
    ),

    # Retry on failure
    initial_retry_delay=1,
    retry_delay_strategy=RetryDelayStrategy.default(
        max_delay=120,
        backoff_multiplier=1.5,
        jitter_multiplier=0.5
    ),

    # Error handling
    error_strategy=ErrorStrategy.continue_with_max_attempts(5),
    retry_delay_reset_threshold=60,

    # Resume from saved position
    last_event_id=load_saved_id(),

    # Logging
    logger=logger
)

# Use it
for event in client.events:
    process(event)
    save_id(client.last_event_id)
```

--------------------------------

### Install Runtime and Test Requirements

Source: https://github.com/launchdarkly/python-eventsource/blob/main/CONTRIBUTING.md

Installs the necessary dependencies for running and testing the project using Poetry.

```shell
poetry install
eval $(poetry env activate)
```

--------------------------------

### start()

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Attempts to start the SSE stream. Handles connection retries based on the configured error strategy.

```APIDOC
## start()

```python
def start(self) -> None
```

Attempts to start the stream if it is not already active. If the connection fails and the error strategy allows, this method will sleep (based on the retry delay) and retry until successful or the strategy indicates failure.
**Behavior:**
- If the stream is already active, returns immediately
- If a retry delay is configured, sleeps before attempting connection
- On first connection failure, raises an exception unless the error strategy continues
- On subsequent failures, behavior depends on the configured error strategy

**Raises:**

| Exception | Condition |
|-----------|-----------|
| `HTTPStatusError` | HTTP response status is 4xx, 5xx, or 204 (unless error_strategy continues) |
| `HTTPContentTypeError` | HTTP response content type is not `text/event-stream` (unless error_strategy continues) |
| `Exception` | Network or connection error (unless error_strategy continues) |

**Example:**

```python
client = SSEClient("https://events.example.com/stream")
try:
    client.start()
    print("Connected successfully")
except HTTPStatusError as e:
    print(f"HTTP error {e.status}: {e.headers}")
except Exception as e:
    print(f"Connection failed: {e}")
```
```

--------------------------------

### Install Optional Async Dependencies

Source: https://github.com/launchdarkly/python-eventsource/blob/main/CONTRIBUTING.md

Installs additional dependencies required for using the asynchronous features of the project.

```shell
poetry install --extras async
```

--------------------------------

### Start Constructor

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Indicates a successful connection to an SSE stream. A Start action is emitted after the first successful connection and after each reconnection.

```python
Start(headers: Optional[Headers] = None)
```

--------------------------------

### start()

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncSSEClient.md

Attempts to start the Server-Sent Events stream if it is not already active. It handles retries based on configured strategies.

```APIDOC
## start()

### Description
Attempts to start the stream if it is not already active.
Asynchronously sleeps and retries based on the configured strategies.

### Method
`async def start(self) -> None`

### Behavior
- If the stream is already active, returns immediately
- Sleeps for the retry delay if needed
- Retries based on the error strategy on failure
- Does not return a value; check `last_event_id` to verify connection success

### Raises
- `HTTPStatusError`: If HTTP response status is 4xx, 5xx, or 204 (unless error_strategy continues)
- `HTTPContentTypeError`: If HTTP response content type is not `text/event-stream`
- `Exception`: For network or connection errors

### Example
```python
client = AsyncSSEClient("https://events.example.com/stream")
try:
    await client.start()
    print("Connected")
except HTTPStatusError as e:
    print(f"HTTP {e.status}: {e.headers}")
```
```

--------------------------------

### Starting SSEClient and Handling Connection Errors

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Shows how to start the SSEClient and handle potential connection errors like HTTP status errors or network issues using a try-except block.

```python
client = SSEClient("https://events.example.com/stream")
try:
    client.start()
    print("Connected successfully")
except HTTPStatusError as e:
    print(f"HTTP error {e.status}: {e.headers}")
except Exception as e:
    print(f"Connection failed: {e}")
```

--------------------------------

### Start Event Properties

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/INDEX.md

Lists the properties of a Start event, primarily the HTTP headers received upon connection establishment.

```python
start.headers  # HTTP headers (Optional[Headers])
```

--------------------------------

### Basic HTTP Connection Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ConnectStrategy.md

Demonstrates a basic HTTP connection to an SSE stream using the default ConnectStrategy.http() method.
```python
from ld_eventsource import SSEClient
from ld_eventsource.config import ConnectStrategy

# Basic HTTP connection
client = SSEClient(ConnectStrategy.http("https://api.example.com/events"))
```

--------------------------------

### SSEClient Constructor and Basic Usage

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Demonstrates creating an SSEClient with default settings and starting the stream. Also shows custom HTTP options and reading events.

```python
import logging
from ld_eventsource import SSEClient

# Simple HTTP connection with default settings
client = SSEClient("https://my-server.com/events")
client.start()

# Custom HTTP options
from ld_eventsource.config import ConnectStrategy, ErrorStrategy

client = SSEClient(
    connect=ConnectStrategy.http(
        url="https://my-server.com/events",
        headers={"Authorization": "Bearer token123"},
        urllib3_request_options={"timeout": 30}
    ),
    initial_retry_delay=2,
    error_strategy=ErrorStrategy.continue_with_max_attempts(max_attempts=5),
    logger=logging.getLogger("my_app.sse")
)
client.start()

# Read events
for event in client.events:
    print(f"Event: {event.event}, Data: {event.data}")
```

--------------------------------

### Handling Start and Event Actions

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Connect to an SSE stream and process 'Start' and 'Event' actions. This snippet demonstrates how to check for connection status and extract event data.
```python
from ld_eventsource import SSEClient
from ld_eventsource.actions import Start, Event

client = SSEClient("https://events.example.com/stream")

for action in client.all:
    if isinstance(action, Start):
        print("Connected!")
        if action.headers:
            rate_limit = action.headers.get('X-RateLimit-Remaining')
            if rate_limit:
                print(f"Rate limit remaining: {rate_limit}")
    elif isinstance(action, Event):
        print(f"Event: {action.data}")
```

--------------------------------

### SSEClient Constructor

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Creates an SSEClient instance. The client connects when `start()` is called or when events are read.

```APIDOC
## SSEClient Constructor

### SSEClient()

Creates a client instance in an inactive state. The client does not attempt to connect until you call `start()` or read from the `events` or `all` properties.

**Parameters:**

| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| connect | `Union[str, ConnectStrategy]` | Yes | - | Either a URL string for HTTP connections, or a `ConnectStrategy` instance for custom connection handling |
| initial_retry_delay | `float` | No | `1` | Initial delay before reconnecting after a failure, in seconds |
| retry_delay_strategy | `Optional[RetryDelayStrategy]` | No | `None` | Custom retry delay strategy; uses `RetryDelayStrategy.default()` if not specified |
| retry_delay_reset_threshold | `float` | No | `60` | Minimum connection duration (seconds) before the retry delay strategy resets to initial state |
| error_strategy | `Optional[ErrorStrategy]` | No | `None` | Custom error handling strategy; uses `ErrorStrategy.always_fail()` if not specified |
| last_event_id | `Optional[str]` | No | `None` | Initial value for `Last-Event-Id` header; sent to server to support resuming interrupted streams |
| logger | `Optional[logging.Logger]` | No | `None` | Logger instance for debug output; a no-op logger is created if not specified |

**Example:**

```python
import logging
from ld_eventsource import SSEClient

# Simple HTTP connection with default settings
client = SSEClient("https://my-server.com/events")
client.start()

# Custom HTTP options
from ld_eventsource.config import ConnectStrategy, ErrorStrategy

client = SSEClient(
    connect=ConnectStrategy.http(
        url="https://my-server.com/events",
        headers={"Authorization": "Bearer token123"},
        urllib3_request_options={"timeout": 30}
    ),
    initial_retry_delay=2,
    error_strategy=ErrorStrategy.continue_with_max_attempts(max_attempts=5),
    logger=logging.getLogger("my_app.sse")
)
client.start()

# Read events
for event in client.events:
    print(f"Event: {event.event}, Data: {event.data}")
```
```

--------------------------------

### Start

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Indicates that SSEClient has successfully connected to a stream. A Start action is emitted after the first successful connection, and again after each reconnection following a failure.

```APIDOC
## Start

### Description
Indicates that `SSEClient` has successfully connected to a stream. A `Start` action is only available from the `all` property. It is emitted after the first successful connection, and again after each reconnection following a failure.

### Constructor
```python
Start(headers: Optional[Headers] = None)
```

### Parameters
#### Path Parameters
* None

#### Query Parameters
* None

#### Request Body
* None

### Properties
#### headers
- **headers** (Optional[Headers]) - The HTTP response headers from the stream connection, if available. Header name lookups are case-insensitive per RFC 7230. Returns `None` for non-HTTP connections or if headers are not available.
### Example
*No example provided in source.*
```

--------------------------------

### Dynamic Query Parameters Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ConnectStrategy.md

Example usage of the DynamicQueryParams type alias, demonstrating how to create a strategy with a callable that adds a timestamp to the query parameters.

```python
import time

from ld_eventsource.config import ConnectStrategy

def add_timestamp():
    return {"ts": str(int(time.time()))}

strategy = ConnectStrategy.http(
    url="https://api.example.com/events",
    query_params=add_timestamp
)
```

--------------------------------

### Comment Usage Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Shows how to check if an action received from the 'all' property is a Comment and print its content.

```python
from ld_eventsource import SSEClient
from ld_eventsource.actions import Comment

client = SSEClient("https://events.example.com/stream")

for action in client.all:
    if isinstance(action, Comment):
        print(f"Comment: {action.comment}")
```

--------------------------------

### Minimal Synchronous SSE Client Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/README.md

Demonstrates the basic usage of SSEClient to connect to an event stream and print received events. This is suitable for synchronous applications.

```python
from ld_eventsource import SSEClient

client = SSEClient("https://api.example.com/events")
for event in client.events:
    print(f"{event.event}: {event.data}")
```

--------------------------------

### Minimal Asynchronous SSE Client Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/README.md

Shows how to use AsyncSSEClient for asynchronous applications with asyncio. It connects to an event stream and processes events using async for.
```python
import asyncio
from ld_eventsource import AsyncSSEClient

async def main():
    async with AsyncSSEClient("https://api.example.com/events") as client:
        async for event in client.events:
            print(f"{event.event}: {event.data}")

asyncio.run(main())
```

--------------------------------

### Immutable Strategy Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/EXTENSIBILITY.md

Demonstrates the best practice of keeping custom strategies immutable. Return a new instance of the strategy with updated state instead of modifying the current instance.

```python
from ld_eventsource.config import ErrorStrategy

# Good: Create new instance
class MyStrategy(ErrorStrategy):
    def __init__(self, count=0):
        self.count = count

    def apply(self, exception):
        # Return NEW strategy with updated state
        next_strategy = MyStrategy(count=self.count + 1)
        return (ErrorStrategy.CONTINUE, next_strategy)

# Avoid: Modifying self
class BadStrategy(ErrorStrategy):
    def __init__(self):
        self.count = 0

    def apply(self, exception):
        self.count += 1  # Don't do this!
        return (ErrorStrategy.CONTINUE, self)
```

--------------------------------

### Event Usage Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Demonstrates how to iterate through events from an SSEClient and access their properties like event type, data, and IDs.

```python
from ld_eventsource import SSEClient

client = SSEClient("https://events.example.com/stream")

for event in client.events:
    print(f"Type: {event.event}")
    print(f"Data: {event.data}")
    if event.id:
        print(f"Event ID: {event.id}")
    print(f"Last ID in stream: {event.last_event_id}")
    print("---")
```

--------------------------------

### AsyncSSEClient Constructor

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncSSEClient.md

Creates an async client instance for Server-Sent Events. The client does not connect until explicitly started or events are read.
```APIDOC
## AsyncSSEClient()

### Description
Creates an async client instance in an inactive state. The client does not attempt to connect until you call `await start()` or read from the `events` or `all` properties.

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Constructor Signature
```python
AsyncSSEClient(
    connect: Union[str, AsyncConnectStrategy],
    initial_retry_delay: float = 1,
    retry_delay_strategy: Optional[RetryDelayStrategy] = None,
    retry_delay_reset_threshold: float = 60,
    error_strategy: Optional[ErrorStrategy] = None,
    last_event_id: Optional[str] = None,
    logger: Optional[logging.Logger] = None
)
```

#### Parameters
- **connect** (`Union[str, AsyncConnectStrategy]`) - Required - Either a URL string for HTTP connections via aiohttp, or an `AsyncConnectStrategy` instance
- **initial_retry_delay** (`float`) - Optional - Initial delay before reconnecting after a failure, in seconds. Default: `1`
- **retry_delay_strategy** (`Optional[RetryDelayStrategy]`) - Optional - Custom retry delay strategy; uses `RetryDelayStrategy.default()` if not specified. Default: `None`
- **retry_delay_reset_threshold** (`float`) - Optional - Minimum connection duration (seconds) before the retry delay strategy resets. Default: `60`
- **error_strategy** (`Optional[ErrorStrategy]`) - Optional - Custom error handling strategy; uses `ErrorStrategy.always_fail()` if not specified. Default: `None`
- **last_event_id** (`Optional[str]`) - Optional - Initial value for `Last-Event-Id` header for resuming streams. Default: `None`
- **logger** (`Optional[logging.Logger]`) - Optional - Logger instance for debug output. Default: `None`

### Example
```python
import asyncio
import logging

import aiohttp  # required for aiohttp.ClientTimeout below
from ld_eventsource import AsyncSSEClient
from ld_eventsource.config import AsyncConnectStrategy, ErrorStrategy

async def main():
    # Simple HTTP connection
    async with AsyncSSEClient("https://events.example.com/stream") as client:
        async for event in client.events:
            print(f"Event: {event.data}")

    # Advanced configuration
    async with AsyncSSEClient(
        connect=AsyncConnectStrategy.http(
            url="https://events.example.com/stream",
            headers={"Authorization": "Bearer token123"},
            aiohttp_request_options={"timeout": aiohttp.ClientTimeout(total=None, sock_read=30)}
        ),
        initial_retry_delay=2,
        error_strategy=ErrorStrategy.continue_with_max_attempts(5),
        logger=logging.getLogger("my_app.sse")
    ) as client:
        async for event in client.events:
            print(event.data)

asyncio.run(main())
```
```

--------------------------------

### Start Headers Property

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Provides access to the HTTP response headers from the stream connection. Header lookups are case-insensitive.

```python
@property
def headers(self) -> Optional[Headers]
```

--------------------------------

### Importing Action Types

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Shows the correct import paths for various action types (Action, Event, Comment, Start, Fault) from the ld_eventsource.actions module.

```python
from ld_eventsource.actions import Action, Event, Comment, Start, Fault
```

--------------------------------

### Custom File Stream Strategy Implementation

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ConnectStrategy.md

Provides a custom ConnectStrategy for reading Server-Sent Events from a local file. This example defines a FileStreamStrategy and a corresponding FileStreamClient.
```python
import logging
from typing import Optional

from ld_eventsource import SSEClient
from ld_eventsource.config import ConnectStrategy, ConnectionClient, ConnectionResult

class FileStreamStrategy(ConnectStrategy):
    def __init__(self, file_path):
        self.file_path = file_path

    def create_client(self, logger: logging.Logger) -> ConnectionClient:
        return FileStreamClient(self.file_path, logger)

class FileStreamClient(ConnectionClient):
    def __init__(self, file_path, logger):
        self.file_path = file_path
        self.logger = logger

    def connect(self, last_event_id: Optional[str]) -> ConnectionResult:
        self.logger.info(f"Opening file {self.file_path}")
        # Read the whole file up front; the stream yields it as a single chunk
        with open(self.file_path, 'rb') as f:
            stream_data = f.read()

        def chunk_iterator():
            yield stream_data

        return ConnectionResult(
            stream=chunk_iterator(),
            closer=None,
            headers=None
        )

# Usage
client = SSEClient(FileStreamStrategy("events.txt"))
for event in client.events:
    print(event.data)
```

--------------------------------

### AsyncSSEClient with Custom Retry and Error Strategies

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/configuration.md

This example configures custom retry delay and error handling strategies for the AsyncSSEClient. It demonstrates setting maximum delay, backoff multiplier, jitter, and a time-limited error continuation.
```python
import asyncio
from ld_eventsource import AsyncSSEClient
from ld_eventsource.config import RetryDelayStrategy, ErrorStrategy

async def main():
    async with AsyncSSEClient(
        # The constructor parameter is named `connect`; it accepts a URL string
        connect="https://events.example.com/stream",
        initial_retry_delay=1,
        retry_delay_strategy=RetryDelayStrategy.default(
            max_delay=60,
            backoff_multiplier=2,
            jitter_multiplier=0.5
        ),
        error_strategy=ErrorStrategy.continue_with_time_limit(max_time=300)
    ) as client:
        async for event in client.events:
            print(event.data)

asyncio.run(main())
```

--------------------------------

### Using Dynamic Query Parameters with SSEClient

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/types.md

Example of configuring an SSEClient with dynamic query parameters using a callable that adds a timestamp and session ID to the URL. This is useful for tracking or session management.

```python
import time
from ld_eventsource import SSEClient
from ld_eventsource.config import ConnectStrategy

def add_timestamp_and_session():
    return {
        "timestamp": str(int(time.time())),
        "session_id": get_session_id()
    }

client = SSEClient(
    ConnectStrategy.http(
        url="https://events.example.com/stream",
        query_params=add_timestamp_and_session
    )
)

for event in client.events:
    print(event.data)
```

--------------------------------

### Import Actions and Events

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/INDEX.md

Import classes related to actions and events, including generic Action, Event, Comment, Start, and Fault types.

```python
# Actions and events
from ld_eventsource.actions import (
    Action, Event, Comment, Start, Fault
)
```

--------------------------------

### Dynamic Query Parameters

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ConnectStrategy.md

Example of providing a callable to dynamically generate query parameters for each connection attempt. This allows for parameters like timestamps to be updated per request.
```python
import time

from ld_eventsource import SSEClient
from ld_eventsource.config import ConnectStrategy

def dynamic_params():
    return {"timestamp": str(int(time.time()))}

client = SSEClient(
    ConnectStrategy.http(
        url="https://api.example.com/events",
        query_params=dynamic_params
    )
)
```

--------------------------------

### AsyncSSEClient close() Method Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncSSEClient.md

Illustrates how to properly close the AsyncSSEClient and release resources, typically used in a finally block to ensure closure after processing events.

```python
client = AsyncSSEClient("https://events.example.com/stream")
try:
    async for event in client.events:
        print(event.data)
finally:
    await client.close()
```

--------------------------------

### Reading All Actions with Error Handling (Sync)

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Iterate through all possible actions (Event, Start, Fault) from an SSE stream using synchronous reading. Includes basic error reporting for Fault actions.

```python
from ld_eventsource import SSEClient
from ld_eventsource.actions import Event, Start, Fault

client = SSEClient("https://events.example.com/stream")

for action in client.all:
    if isinstance(action, Event):
        print(f"Event: {action.data}")
    elif isinstance(action, Start):
        print("Connected")
    elif isinstance(action, Fault):
        if action.error:
            print(f"Error: {action.error}")
        else:
            print("Stream ended")
```

--------------------------------

### Accessing HTTP Headers from SSEClient

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/types.md

Demonstrates how to access and retrieve HTTP headers from a successful connection's Start action using the Headers type alias. Header lookups are case-insensitive.
```python
from ld_eventsource import SSEClient
from ld_eventsource.actions import Start

client = SSEClient("https://events.example.com/stream")

for action in client.all:
    if isinstance(action, Start):
        headers = action.headers
        if headers:
            # Case-insensitive access
            content_type = headers.get('content-type')
            rate_limit = headers.get('X-RateLimit-Remaining')
            server = headers.get('server')
```

--------------------------------

### Minimal Configuration for AsyncSSEClient

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/configuration.md

This snippet shows the most basic way to instantiate and use the AsyncSSEClient with a given URL. It's suitable for simple stream consumption.

```python
import asyncio
from ld_eventsource import AsyncSSEClient

async def main():
    async with AsyncSSEClient("https://events.example.com/stream") as client:
        async for event in client.events:
            print(event.data)

asyncio.run(main())
```

--------------------------------

### Import Configuration Strategies

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/INDEX.md

Import various configuration strategies for connecting, handling errors, and managing retry delays in both synchronous and asynchronous modes.

```python
# Config strategies
from ld_eventsource.config import (
    ConnectStrategy, AsyncConnectStrategy,
    ErrorStrategy, RetryDelayStrategy,
    ConnectionClient, ConnectionResult,
    AsyncConnectionClient, AsyncConnectionResult
)
```

--------------------------------

### Iterate Over All SSE Actions

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Access all notifications from the stream, including events, comments, connection notifications, and errors. Automatically starts or restarts the stream if not active. Yields Start on connection, Event/Comment for data, Fault on errors, and Fault(None) before normal closure.
```python
client = SSEClient("https://events.example.com/stream")

for action in client.all:
    if isinstance(action, Event):
        print(f"Event: {action.event} = {action.data}")
    elif isinstance(action, Start):
        print(f"Connected with headers: {action.headers}")
    elif isinstance(action, Fault):
        if action.error:
            print(f"Error: {action.error}")
        else:
            print("Stream ended normally")
    elif isinstance(action, Comment):
        print(f"Comment: {action.comment}")
```

--------------------------------

### interrupt()

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Stops the current SSE stream connection if active. The client can be restarted by calling `start()` or reading events.

```APIDOC
## interrupt()

```python
def interrupt(self) -> None
```

Stops the stream connection if currently active, without permanently closing the client. The client will automatically attempt to reconnect if you continue reading events or call `start()` again. This differs from `close()`, which prevents all future reconnection.

**Example:**

```python
client = SSEClient("https://events.example.com/stream")

for event in client.events:
    if event.data == "STOP":
        client.interrupt()  # Connection closes but can be restarted
        break
```
```

--------------------------------

### Import Configuration Strategies and Classes

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/README.md

Import various configuration-related classes and strategies for managing connection and error handling.
```python
from ld_eventsource.config import (
    ConnectStrategy, AsyncConnectStrategy,
    ErrorStrategy, RetryDelayStrategy,
    ConnectionClient, ConnectionResult,
    AsyncConnectionClient, AsyncConnectionResult
)
```

--------------------------------

### Using Optional and Union Types for SSEClient Parameters

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/types.md

Illustrates how to use Optional and Union types when initializing SSEClient, showing that parameters like 'connect', 'last_event_id', and 'logger' can accept multiple types or be None.

```python
from typing import Optional
from ld_eventsource import SSEClient

client: SSEClient = SSEClient(
    connect="https://events.example.com",  # Can be str or ConnectStrategy
    last_event_id=None,                    # Can be str or None
    logger=None                            # Can be logging.Logger or None
)
```

--------------------------------

### HTTP Connection with Custom Headers

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ConnectStrategy.md

Shows how to establish an HTTP connection with custom headers, such as Authorization and User-Agent. This is useful for authentication and identifying the client.

```python
client = SSEClient(
    ConnectStrategy.http(
        url="https://api.example.com/events",
        headers={
            "Authorization": "Bearer my_token",
            "User-Agent": "MyApp/1.0"
        }
    )
)
```

--------------------------------

### SSEClient.all

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/README.md

An iterator that yields all `Action` objects from the stream, including `Event`, `Comment`, `Start`, and `Fault` types. Useful for comprehensive stream monitoring.

```APIDOC
## SSEClient.all

### Description
Provides an iterator that yields all `Action` objects from the Server-Sent Events stream. This includes `Event` objects, `Comment` lines, `Start` connection notifications, and `Fault` error notifications. It is useful for monitoring all types of stream activity.
### Usage
```python
for action in client.all:
    if isinstance(action, Event):
        print(f"Event data: {action.data}")
    elif isinstance(action, Fault):
        print(f"Error occurred: {action.error}")
```
```

--------------------------------

### Basic SSEClient Usage with RedisStreamStrategy

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/EXTENSIBILITY.md

Demonstrates the basic usage of SSEClient with a RedisStreamStrategy to consume events.

```python
strategy = RedisStreamStrategy(
    redis_url="redis://localhost:6379",
    stream_key="events"
)

client = SSEClient(strategy)
for event in client.events:
    print(event.data)
```

--------------------------------

### Configure Error Strategy with Reset Behavior

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ErrorStrategy.md

Demonstrates setting up an `SSEClient` with a maximum number of retries per session and a reset threshold for the retry delay. This allows strategies like `continue_with_max_attempts` to restart their counting after a successful connection period.

```python
from ld_eventsource import SSEClient
from ld_eventsource.config import ErrorStrategy

# Allow 5 retries per successful connection session
client = SSEClient(
    # The constructor parameter is named `connect`; it accepts a URL string
    connect="https://api.example.com/events",
    initial_retry_delay=1,
    retry_delay_reset_threshold=60,  # Reset after 1 minute connected
    error_strategy=ErrorStrategy.continue_with_max_attempts(5)
)

# If a connection succeeds and lasts 60+ seconds, then fails,
```

--------------------------------

### AsyncSSEClient.all

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/README.md

An asynchronous iterator that yields all `Action` objects from the stream, including `Event`, `Comment`, `Start`, and `Fault` types. Suitable for comprehensive async stream monitoring.

```APIDOC
## AsyncSSEClient.all

### Description
Provides an asynchronous iterator that yields all `Action` objects from the Server-Sent Events stream.
This includes `Event`, `Comment`, `Start`, and `Fault` types, making it suitable for comprehensive monitoring in `asyncio` applications.

### Usage
```python
async for action in client.all:
    if isinstance(action, Event):
        print(f"Async event data: {action.data}")
    elif isinstance(action, Fault):
        print(f"Async error occurred: {action.error}")
```
```

--------------------------------

### Async SSE Client with Custom Strategy

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncConnectStrategy.md

Demonstrates how to use a custom asynchronous strategy with AsyncSSEClient to stream events. The custom strategy yields data chunks asynchronously.

```python
async def stream_source():
    # Yield chunks asynchronously
    yield b"data: hello\n\n"
    await asyncio.sleep(1)
    yield b"data: world\n\n"

async def main():
    client = AsyncSSEClient(CustomAsyncStrategy(stream_source))
    async for event in client.events:
        print(event.data)

asyncio.run(main())
```

--------------------------------

### connect()

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ConnectStrategy.md

Attempts to establish a connection to a stream. This method is crucial for initiating the event stream and can raise various exceptions if the connection fails.

```APIDOC
## connect()

### Description
Attempts to establish a connection to a stream. Raises an exception if unsuccessful.

### Method Signature
```python
def connect(self, last_event_id: Optional[str]) -> ConnectionResult
```

### Parameters
#### Path Parameters
- **last_event_id** (Optional[str]) - Required - The current `Last-Event-Id` value (should be sent to server for stream resumption)

### Returns
A `ConnectionResult` containing the stream iterator and headers.
### Raises
- **HTTPStatusError**: HTTP response status is 4xx, 5xx, or 204
- **HTTPContentTypeError**: HTTP response `Content-Type` is not `text/event-stream`
- **Exception**: Network or I/O error
```

--------------------------------

### Import Core SSEClient and AsyncSSEClient

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/INDEX.md

Import the main synchronous and asynchronous Server-Sent Events client classes from the ld_eventsource package.

```python
# Main package
from ld_eventsource import SSEClient, AsyncSSEClient
```

--------------------------------

### AsyncSSEClient interrupt() Method Example

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncSSEClient.md

Demonstrates interrupting the SSE stream connection temporarily using the interrupt() method, allowing for automatic reconnection on subsequent reads.

```python
client = AsyncSSEClient("https://events.example.com/stream")
async for event in client.events:
    if should_disconnect:
        await client.interrupt()
        break
```

--------------------------------

### Minimal SSEClient Configuration (HTTP)

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/configuration.md

Basic configuration for SSEClient using only a URL for HTTP connections. No custom headers or advanced options are specified.

```python
from ld_eventsource import SSEClient

client = SSEClient("https://events.example.com/stream")
```

--------------------------------

### Event Actions

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/INDEX.md

Defines the structure of different types of messages that can be received from an SSE stream, including regular events, comments, connection start messages, and fault/error messages.

```APIDOC
## Event Actions

Represents the different types of data that can be received from an SSE stream.

### Event
- **Description**: Represents a standard data payload from the server.
- **Attributes**:
  - `event` (str): The type of the event (defaults to "message").
  - `data` (str): The payload of the event.
  - `id` (Optional[str]): The ID of this specific event.
  - `last_event_id` (Optional[str]): The ID of the last event received in the stream.

### Comment
- **Description**: Represents a comment line from the server (lines starting with `:`).
- **Attributes**:
  - `comment` (str): The content of the comment.

### Start
- **Description**: Represents the initial connection establishment message.
- **Attributes**:
  - `headers` (Optional[Headers]): HTTP headers received upon connection.

### Fault
- **Description**: Represents an error or the end of the stream due to a fault.
- **Attributes**:
  - `error` (Optional[Exception]): The exception that occurred, if any.
  - `headers` (Optional[Headers]): HTTP headers received at the time of the fault, if available.
```

--------------------------------

### Configure Async HTTP Connection Strategy

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/configuration.md

Use AsyncConnectStrategy.http for asynchronous HTTP connections. Configure the URL, headers, and aiohttp specific request options such as timeout.

```python
from ld_eventsource.config import AsyncConnectStrategy
import aiohttp

strategy = AsyncConnectStrategy.http(
    url="https://events.example.com/stream",
    headers={"Authorization": "Bearer token"},
    aiohttp_request_options={
        "timeout": aiohttp.ClientTimeout(total=None, sock_read=30)
    }
)
```

--------------------------------

### SSEClient Configuration with Stream Resumption

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/configuration.md

Configures SSEClient to resume an interrupted stream using the `last_event_id` parameter. The example shows saving and loading the last event ID.
```python
from ld_eventsource import SSEClient

# load_last_event_id() and save_last_event_id() are application-defined
# helpers (not part of the library).

# Load saved event ID from persistent storage
saved_id = load_last_event_id()

client = SSEClient(
    connect="https://events.example.com/stream",
    last_event_id=saved_id
)

for event in client.events:
    print(event.data)
    # Save current position
    save_last_event_id(client.last_event_id)
```

--------------------------------

### Handle Rate Limiting with HTTPStatusError

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/errors.md

Example of catching HTTPStatusError and specifically handling rate limiting (429) or server errors (5xx). Requires importing SSEClient and HTTPStatusError.

```python
from ld_eventsource import SSEClient
from ld_eventsource.errors import HTTPStatusError

client = SSEClient("https://events.example.com/stream")

try:
    for event in client.events:
        print(event.data)
except HTTPStatusError as e:
    if e.status == 429:  # Too Many Requests
        print(f"Rate limited. Retry-After: {e.headers.get('Retry-After')}")
    elif e.status >= 500:
        print("Server error, will retry")
    else:
        print(f"HTTP {e.status}: {e}")
```

--------------------------------

### Run Unit Tests

Source: https://github.com/launchdarkly/python-eventsource/blob/main/CONTRIBUTING.md

Executes all unit tests for the project.

```shell
make test
```

--------------------------------

### Iterate Over SSE Events Only

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Access only Event objects from the stream, filtering out comments, connection notifications, and errors. Automatically starts or restarts the stream if not active. Skips non-Event actions.
```python
client = SSEClient("https://events.example.com/stream")

for event in client.events:
    print(f"Type: {event.event}")
    print(f"Data: {event.data}")
    if event.id:
        print(f"ID: {event.id}")
```

--------------------------------

### Async HTTP Connection Strategy with Dynamic Query Parameters

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncConnectStrategy.md

Sets up an asynchronous HTTP connection strategy that dynamically adds query parameters to each request. A callable function is provided to generate these parameters, such as a timestamp.

```python
import time

from ld_eventsource.config import AsyncConnectStrategy

def add_timestamp():
    # Called for each request, so every connection gets a fresh timestamp
    return {"ts": str(int(time.time()))}

strategy = AsyncConnectStrategy.http(
    url="https://api.example.com/events",
    query_params=add_timestamp
)
```

--------------------------------

### AsyncSSEClient.all

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncSSEClient.md

Provides an async iterable for all notifications from the stream, including events, comments, connection notifications, and errors. It yields instances of Action subclasses: Event, Comment, Start, or Fault.

```APIDOC
## AsyncSSEClient.all

### Description
An async iterable of all notifications from the stream, including events, comments, connection notifications, and errors. Returns instances of `Action` subclasses: `Event`, `Comment`, `Start`, or `Fault`.
### Behavior
- Automatically starts or restarts the stream if not already active
- `Start` action contains HTTP response headers
- `Event` or `Comment` for received data
- `Fault` on errors (contains exception details)
- Yields `Fault(None)` before stopping if stream closes normally

### Returns
AsyncIterable[Action]
```

--------------------------------

### Reading Only Events from SSE Stream

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/Actions.md

Efficiently read only event data from an SSE stream, ignoring other action types like Start or Fault. This is useful when only event payloads are of interest.

```python
from ld_eventsource import SSEClient

client = SSEClient("https://events.example.com/stream")

for event in client.events:
    print(f"{event.event}: {event.data}")
```

--------------------------------

### Import SSEClient and AsyncSSEClient

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/README.md

Import the main client classes from the ld_eventsource package for both synchronous and asynchronous event stream handling.

```python
from ld_eventsource import SSEClient, AsyncSSEClient
```

--------------------------------

### Logging in Custom Strategy

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/EXTENSIBILITY.md

Example of incorporating logging within a custom error strategy. This helps in debugging by providing insights into the strategy's decision-making process during stream events.
```python
import logging
from ld_eventsource.config import ErrorStrategy

class MyStrategy(ErrorStrategy):
    def __init__(self, logger=None):
        self.logger = logger or logging.getLogger(__name__)

    def apply(self, exception):
        if exception is None:
            self.logger.debug("Stream ended normally")
        else:
            self.logger.warning(f"Stream error: {exception}")
        return (ErrorStrategy.CONTINUE, self)
```

--------------------------------

### AsyncConnectionClient.connect()

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncConnectStrategy.md

Asynchronously attempts to establish a connection to a stream. Raises an exception if unsuccessful.

```APIDOC
## AsyncConnectionClient.connect()

### Description
Asynchronously attempts to establish a connection to a stream. Raises an exception if unsuccessful.

### Method
`async def connect(self, last_event_id: Optional[str]) -> AsyncConnectionResult`

### Parameters
#### Path Parameters
- **last_event_id** (`Optional[str]`) - Optional - The current `Last-Event-Id` value (sent to server for stream resumption)

### Returns
An `AsyncConnectionResult` containing the stream iterator and headers.

### Raises
- `HTTPStatusError`: HTTP response status is 4xx, 5xx, or 204
- `HTTPContentTypeError`: HTTP response `Content-Type` is not `text/event-stream`
- `Exception`: Network or I/O error
```

--------------------------------

### Async SSE Client with Client-Managed Session

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/AsyncConnectStrategy.md

Initializes an AsyncSSEClient where the client itself manages the creation and closure of the underlying aiohttp session. This is the default behavior when no explicit session is provided.
```python
client = AsyncSSEClient(
    AsyncConnectStrategy.http("https://api.example.com/events")
)
```

--------------------------------

### Custom Redis Stream Connection Strategy

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/EXTENSIBILITY.md

Implement a custom ConnectStrategy to read SSE from a Redis stream. This example defines a RedisStreamStrategy and RedisStreamClient to consume messages from a specified Redis stream key.

```python
import redis
from ld_eventsource.config import ConnectStrategy, ConnectionClient, ConnectionResult
from ld_eventsource import SSEClient
from typing import Optional
import logging

class RedisStreamStrategy(ConnectStrategy):
    """Read SSE events from a Redis stream."""

    def __init__(self, redis_url: str, stream_key: str):
        self.redis_url = redis_url
        self.stream_key = stream_key

    def create_client(self, logger: logging.Logger) -> ConnectionClient:
        return RedisStreamClient(self.redis_url, self.stream_key, logger)

class RedisStreamClient(ConnectionClient):
    def __init__(self, redis_url: str, stream_key: str, logger: logging.Logger):
        self.redis_url = redis_url
        self.stream_key = stream_key
        self.logger = logger
        self.client = None

    def connect(self, last_event_id: Optional[str]) -> ConnectionResult:
        """Connect to Redis and stream events."""
        self.logger.info(f"Connecting to Redis stream: {self.stream_key}")
        self.client = redis.from_url(self.redis_url)

        # Plain (synchronous) generator: the synchronous SSEClient iterates
        # over byte chunks, so this must not be an async generator.
        def redis_chunk_iterator():
            """Yield events from Redis stream."""
            last_id = last_event_id or '0'
            while True:
                # Read new entries from stream
                entries = self.client.xread(
                    {self.stream_key: last_id},
                    block=1000,
                    count=10
                )
                if entries:
                    for stream_key, messages in entries:
                        for msg_id, data in messages:
                            # Convert to SSE format
                            event_line = b''
                            for field, value in data.items():
                                if field == b'event':
                                    event_line += b'event: ' + value + b'\n'
                                elif field == b'id':
                                    event_line += b'id: ' + value + b'\n'
                                    last_id = value.decode()
                                else:
                                    event_line += b'data: ' + value + b'\n'
                            event_line += b'\n'
                            yield event_line

        def redis_closer():
            """Close Redis connection."""
            if self.client:
                self.client.close()

        return ConnectionResult(
            stream=redis_chunk_iterator(),
            closer=redis_closer,
            headers=None
        )

    def close(self):
        """Clean up resources."""
        if self.client:
            self.client.close()
```

--------------------------------

### Custom Fibonacci Backoff Strategy

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/RetryDelayStrategy.md

Implement a custom retry delay strategy using the Fibonacci sequence. This example shows how to define a new class inheriting from RetryDelayStrategy and override the apply method.

```python
from ld_eventsource import SSEClient
from ld_eventsource.config import RetryDelayStrategy

class FibonacciBackoffStrategy(RetryDelayStrategy):
    """Fibonacci sequence backoff: 1, 2, 3, 5, 8, 13... (capped at max_delay)"""

    def __init__(self, fib_prev=0, fib_curr=1, max_delay=60):
        self.fib_prev = fib_prev
        self.fib_curr = fib_curr
        self.max_delay = max_delay

    def apply(self, base_delay):
        # base_delay is ignored; the delay comes from the sequence itself.
        # Compute next Fibonacci number
        next_fib = min(self.fib_prev + self.fib_curr, self.max_delay)

        # Create strategy for next retry
        next_strategy = FibonacciBackoffStrategy(
            fib_prev=self.fib_curr,
            fib_curr=next_fib,
            max_delay=self.max_delay
        )

        return (float(next_fib), next_strategy)

# Usage
client = SSEClient(
    connect="https://api.example.com/events",
    initial_retry_delay=1,
    retry_delay_strategy=FibonacciBackoffStrategy()
)
```

--------------------------------

### Run Linter and Type Checker

Source: https://github.com/launchdarkly/python-eventsource/blob/main/CONTRIBUTING.md

Executes the linter and checks type hints for the project.

```shell
make lint
```

--------------------------------

### Async Session Management with aiohttp

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/configuration.md

Use this snippet for async applications that need to manage aiohttp sessions.
It demonstrates how to initialize and close multiple AsyncSSEClient instances within a single session context.

```python
import aiohttp
from ld_eventsource import AsyncSSEClient
from ld_eventsource.config import AsyncConnectStrategy

async def main():
    async with aiohttp.ClientSession() as session:
        client1 = AsyncSSEClient(
            AsyncConnectStrategy.http(url1, session=session)
        )
        client2 = AsyncSSEClient(
            AsyncConnectStrategy.http(url2, session=session)
        )

        # Use clients...

        await client1.close()
        await client2.close()
    # Session is closed automatically
```

--------------------------------

### Access Last Event ID

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Get the ID of the last received event. This property is updated when an event with an 'id:' field is received and can be set during client initialization to resume a stream from a specific position.

```python
# Resume from a saved position
saved_id = "12345"
client = SSEClient(
    "https://events.example.com/stream",
    last_event_id=saved_id
)

# Access the current ID
for event in client.events:
    print(f"Current last_event_id: {client.last_event_id}")
    save_id_to_db(client.last_event_id)
```

--------------------------------

### Create Default HTTP ConnectStrategy

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ConnectStrategy.md

Creates the default HTTP implementation using urllib3 for making HTTP requests to an SSE endpoint. Use this for standard HTTP/S connections.
```python
@staticmethod
def http(
    url: str,
    headers: Optional[dict] = None,
    pool: Optional[PoolManager] = None,
    urllib3_request_options: Optional[dict] = None,
    query_params: Optional[DynamicQueryParams] = None
) -> ConnectStrategy:
```

--------------------------------

### Get Next Retry Delay

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/SSEClient.md

Retrieve the retry delay in seconds for the next reconnection attempt if the stream has failed. Initially 0, this value indicates the sleep duration before reconnecting after an error or closure.

```python
client = SSEClient("https://events.example.com/stream")
try:
    client.start()
except Exception as e:
    print(f"Connection failed, will retry in {client.next_retry_delay}s")
```

--------------------------------

### Implement Custom Error Strategy Class

Source: https://github.com/launchdarkly/python-eventsource/blob/main/_autodocs/api-reference/ErrorStrategy.md

Subclass `ErrorStrategy` to create a reusable error handling strategy. This example implements a strategy that retries specific HTTP status codes up to a maximum number of attempts.
```python
from ld_eventsource import SSEClient
from ld_eventsource.config import ErrorStrategy
from ld_eventsource.errors import HTTPStatusError

class RetryableHTTPErrorStrategy(ErrorStrategy):
    def __init__(self, retryable_statuses=None, max_retries=3):
        self.retryable_statuses = retryable_statuses or {429, 503}
        self.max_retries = max_retries
        self.retry_count = 0

    def apply(self, exception):
        if exception is None:
            # Stream ended normally
            return (ErrorStrategy.FAIL, self)

        if not isinstance(exception, HTTPStatusError):
            # Non-HTTP errors always fail
            return (ErrorStrategy.FAIL, self)

        if exception.status in self.retryable_statuses:
            if self.retry_count < self.max_retries:
                # Retry with incremented counter
                next_strategy = RetryableHTTPErrorStrategy(
                    self.retryable_statuses,
                    self.max_retries
                )
                next_strategy.retry_count = self.retry_count + 1
                return (ErrorStrategy.CONTINUE, next_strategy)
            else:
                return (ErrorStrategy.FAIL, self)

        # Non-retryable status
        return (ErrorStrategy.FAIL, self)

# Usage
client = SSEClient(
    connect="https://api.example.com/events",
    error_strategy=RetryableHTTPErrorStrategy()
)
```
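--------------------------------

### Tracing the Retry Counter Across apply() Calls (Illustrative Sketch)

The custom error strategy above returns an `(action, next_strategy)` pair from `apply()`, and the retry counter only advances because each call hands back a new strategy object. The sketch below traces that hand-off in isolation. It deliberately does not import `ld_eventsource`: `CONTINUE`, `FAIL`, `StubHTTPStatusError`, `RetryableStatusStrategy`, and `drive()` are hypothetical stand-ins written for this illustration, and the way `drive()` feeds errors to the strategy is an assumption about a client loop, not the library's actual implementation.

```python
from dataclasses import dataclass, replace
from typing import List, Optional, Tuple

# Stand-ins for ErrorStrategy.CONTINUE / ErrorStrategy.FAIL (assumed for this sketch only).
CONTINUE = "continue"
FAIL = "fail"


class StubHTTPStatusError(Exception):
    """Hypothetical stand-in for HTTPStatusError: carries only a status code."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


@dataclass(frozen=True)
class RetryableStatusStrategy:
    """Same decision rules as the example above, with the counter held as immutable state."""

    retryable_statuses: frozenset = frozenset({429, 503})
    max_retries: int = 3
    retry_count: int = 0

    def apply(self, exception: Optional[Exception]) -> Tuple[str, "RetryableStatusStrategy"]:
        # A normal end of stream (None) and non-HTTP errors both stop the client.
        if not isinstance(exception, StubHTTPStatusError):
            return (FAIL, self)
        # Non-retryable status.
        if exception.status not in self.retryable_statuses:
            return (FAIL, self)
        # Retry budget already spent.
        if self.retry_count >= self.max_retries:
            return (FAIL, self)
        # Hand back a new strategy with the counter advanced by one.
        return (CONTINUE, replace(self, retry_count=self.retry_count + 1))


def drive(strategy: RetryableStatusStrategy, errors: List[Optional[Exception]]) -> List[str]:
    """Feed errors to the strategy one at a time, stopping at the first FAIL."""
    outcomes = []
    for error in errors:
        # Always continue with the strategy returned by the previous call.
        action, strategy = strategy.apply(error)
        outcomes.append(action)
        if action == FAIL:
            break
    return outcomes


if __name__ == "__main__":
    # Five consecutive 503s against a budget of three retries.
    print(drive(RetryableStatusStrategy(), [StubHTTPStatusError(503)] * 5))
    # prints ['continue', 'continue', 'continue', 'fail']
```

Using a frozen dataclass here is a design choice for the sketch: the original strategy object is never mutated, so it can be reused as a clean starting point, while the counter lives only in the objects returned by `apply()`.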