### Complete Setup Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/configuration.md A comprehensive example showing the setup of a PullBasedJetStreamBroker with result backend, schedule source, stream, and consumer configurations. ```python import asyncio from nats.js.api import StreamConfig, ConsumerConfig, RetentionPolicy from taskiq import InMemoryBroker from taskiq.scheduler import SimpleScheduler from taskiq.serializers import JSONSerializer from taskiq_nats import ( PullBasedJetStreamBroker, NATSObjectStoreResultBackend, NATSKeyValueScheduleSource, ) # Configure result backend result_backend = NATSObjectStoreResultBackend( servers=["nats://localhost:4222"], keep_results=True, bucket_name="app_results", serializer=JSONSerializer(), ) # Configure schedule source schedule_source = NATSKeyValueScheduleSource( servers=["nats://localhost:4222"], bucket_name="app_schedules", prefix="schedule", ) # Configure stream stream_config = StreamConfig( retention=RetentionPolicy.LIMITS, max_age=3600 * 24 * 7, storage=2, # File-based ) # Configure consumer consumer_config = ConsumerConfig( max_deliver=5, backoff=[1000, 2000, 4000], ) # Configure broker with all components broker = PullBasedJetStreamBroker( servers=["nats://nats1:4222", "nats://nats2:4222"], subject="app_tasks", stream_name="app_stream", durable="app_consumer", stream_config=stream_config, consumer_config=consumer_config, pull_consume_batch=5, pull_consume_timeout=5.0, ).with_result_backend( result_backend ).with_schedule_source( schedule_source ) # Create scheduler scheduler = SimpleScheduler( broker=broker, schedule_source=schedule_source, ) async def main(): await broker.startup() await result_backend.startup() await schedule_source.startup() # Tasks can now be sent and results stored await broker.shutdown() await result_backend.shutdown() await schedule_source.shutdown() asyncio.run(main()) ``` -------------------------------- ### Startup Method Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/PushBasedJetStreamBroker.md An example demonstrating how to initialize and start the PushBasedJetStreamBroker. ```python broker = PushBasedJetStreamBroker(servers="nats://localhost:4222") await broker.startup() ``` -------------------------------- ### Startup Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSKeyValueScheduleSource.md Example of how to initialize and start the NATSKeyValueScheduleSource. ```python source = NATSKeyValueScheduleSource(servers="nats://localhost:4222") await source.startup() ``` -------------------------------- ### Startup Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSObjectStoreResultBackend.md Example of how to use the startup method. ```python backend = NATSObjectStoreResultBackend(servers="nats://localhost:4222") await backend.startup() ``` -------------------------------- ### Quick Start Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/README.md A basic example demonstrating how to initialize a PullBasedJetStreamBroker with a NATSObjectStoreResultBackend and define a simple task. ```python from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ).with_result_backend( NATSObjectStoreResultBackend(servers="nats://localhost:4222") ) @broker.task async def my_task() -> str: return "done" ``` -------------------------------- ### startup example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/PullBasedJetStreamBroker.md Example of how to use the startup method. ```python broker = PullBasedJetStreamBroker(servers="nats://localhost:4222") await broker.startup() ``` -------------------------------- ### Basic Taskiq NATS Setup with FastAPI Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md This example demonstrates how to set up Taskiq with NATS and FastAPI, including defining tasks, endpoints, and result retrieval. ```python from fastapi import FastAPI, HTTPException from taskiq_nats import PullBasedJetStreamBroker, NATSObjectStoreResultBackend from contextlib import asynccontextmanager import asyncio from taskiq.exceptions import ResultGetError result_backend = NATSObjectStoreResultBackend( servers="nats://localhost:4222", ) broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ).with_result_backend(result_backend) @broker.task async def send_email(email: str, subject: str) -> dict: """Send email asynchronously.""" await asyncio.sleep(1) return { "email": email, "subject": subject, "status": "sent", } @broker.task async def process_csv(file_path: str) -> int: """Process CSV file asynchronously.""" await asyncio.sleep(3) return 42 # Simulated row count @asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown event handlers.""" # Startup await broker.startup() await result_backend.startup() yield # Shutdown await broker.shutdown() await result_backend.shutdown() app = FastAPI(lifespan=lifespan) @app.post("/send-email") async def send_email_endpoint(email: str, subject: str): """Queue an email to be sent.""" task = await send_email.kiq(email, subject) return { "task_id": task.task_id, "status": "queued", } @app.post("/process-csv") async def upload_csv_endpoint(file_path: str): """Queue a CSV file for processing.""" task = await process_csv.kiq(file_path) return { "task_id": task.task_id, "status": "processing", } @app.get("/task/{task_id}") async def get_task_status(task_id: str): """Get the status and result of a task.""" try: # Check if result is ready is_ready = await result_backend.is_result_ready(task_id) if not is_ready: return { "task_id": task_id, "status": "processing", "result": None, } # Get the result result = await result_backend.get_result(task_id) return { "task_id": task_id, "status": "completed" if not result.is_error else "failed", "result": result.return_value if not result.is_error else None, "error": result.error if result.is_error else None, } except ResultGetError: raise HTTPException(status_code=404, detail="Task not found") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) ``` -------------------------------- ### Default NATS broker setup Source: https://github.com/taskiq-python/taskiq-nats/blob/master/README.md A minimal setup example with a default NATS broker and a task. ```python import asyncio from taskiq_nats import NatsBroker, JetStreamBroker broker = NatsBroker( [ "nats://nats1:4222", "nats://nats2:4222", ], queue="random_queue_name", ) @broker.task async def my_lovely_task(): print("I love taskiq") async def main(): await broker.startup() await my_lovely_task.kiq() await broker.shutdown() if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Scheduler Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md This example demonstrates how to set up and run a scheduler with Taskiq NATS, including recurring and one-time tasks, and dynamic schedule management. ```python import asyncio from datetime import datetime, timedelta from taskiq_nats import NATSKeyValueScheduleSource, PullBasedJetStreamBroker from taskiq.schedule import ScheduledTask from taskiq.scheduler import SimpleScheduler broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", durable="scheduler_workers", ) schedule_source = NATSKeyValueScheduleSource( servers="nats://localhost:4222", bucket_name="task_schedules", prefix="schedule", ) scheduler = SimpleScheduler( broker=broker, schedule_source=schedule_source, ) @broker.task async def send_daily_digest() -> None: """Send daily digest email.""" print(f"[{datetime.now()}] Sending daily digest...") @broker.task async def cleanup_temp_files() -> None: """Clean up temporary files.""" print(f"[{datetime.now()}] Cleaning up temp files...") @broker.task async def backup_database() -> None: """Backup the database.""" print(f"[{datetime.now()}] Backing up database...") @broker.task async def send_reminder(user_id: int, message: str) -> None: """Send a reminder to a user.""" print(f"[{datetime.now()}] Reminder for user {user_id}: {message}") async def setup_schedules(): """Set up recurring schedules.""" await broker.startup() await schedule_source.startup() # Recurring schedules (cron) schedules = [ ScheduledTask( schedule_id="daily_digest", task_name="send_daily_digest", labels={}, cron="0 9 * * *", # 9 AM every day ), ScheduledTask( schedule_id="cleanup", task_name="cleanup_temp_files", labels={}, cron="0 2 * * *", # 2 AM every day (maintenance window) ), ScheduledTask( schedule_id="backup", task_name="backup_database", labels={}, cron="0 3 * * 0", # 3 AM every Sunday ), ] for schedule in schedules: await schedule_source.add_schedule(schedule) print(f"Scheduled: {schedule.schedule_id}") await broker.shutdown() await schedule_source.shutdown() async def run_scheduler(): """Run the scheduler.""" await broker.startup() await schedule_source.startup() # Add one-time scheduled task future_time = datetime.utcnow() + timedelta(seconds=30) one_time_schedule = ScheduledTask( schedule_id="one_time_reminder", task_name="send_reminder", labels={}, time=future_time, # One-time execution ) # Note: Usually you wouldn't add this in the scheduler itself # Run scheduler indefinitely try: await scheduler.run() except KeyboardInterrupt: print("Scheduler stopped") finally: await broker.shutdown() await schedule_source.shutdown() async def manage_schedules(): """Dynamically manage schedules.""" await broker.startup() await schedule_source.startup() # List all schedules all_schedules = await schedule_source.get_schedules() print(f"Total schedules: {len(all_schedules)}") for schedule in all_schedules: print(f" - {schedule.schedule_id}: {schedule.cron}") # Update a schedule updated = ScheduledTask( schedule_id="cleanup", task_name="cleanup_temp_files", labels={"priority": "high"}, cron="0 3 * * *", # Changed to 3 AM ) await schedule_source.add_schedule(updated) print("Updated cleanup schedule") # Delete a schedule await schedule_source.delete_schedule("backup") print("Deleted backup schedule") await broker.shutdown() await schedule_source.shutdown() if __name__ == "__main__": # asyncio.run(setup_schedules()) # asyncio.run(run_scheduler()) asyncio.run(manage_schedules()) ``` -------------------------------- ### PushBasedJetStreamBroker Example Configuration Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/configuration.md Example configuration for the PushBasedJetStreamBroker. ```python from nats.js.api import StreamConfig, ConsumerConfig, RetentionPolicy from taskiq_nats import PushBasedJetStreamBroker stream_config = StreamConfig( name="job_stream", retention=RetentionPolicy.LIMITS, max_age=3600 * 24 * 7, # 7 days storage=2, # File-based max_msgs=1_000_000, ) consumer_config = ConsumerConfig( name="job_consumer", durable_name="job_consumer", max_deliver=5, ) broker = PushBasedJetStreamBroker( servers=["nats://localhost:4222"], subject="jobs", stream_name="job_stream", queue="job_workers", stream_config=stream_config, consumer_config=consumer_config, ) ``` -------------------------------- ### Is Result Ready Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSObjectStoreResultBackend.md Example of how to use the is_result_ready method. ```python if await backend.is_result_ready("task_123"): print("Result is available") else: print("Result not ready yet") ``` -------------------------------- ### NatsBroker Example Configuration Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/configuration.md Example configuration for the NatsBroker. ```python from taskiq_nats import NatsBroker broker = NatsBroker( servers=[ "nats://nats1:4222", "nats://nats2:4222", "nats://nats3:4222", ], subject="my_tasks", queue="task_workers", name="taskiq-producer", allow_reconnect=True, max_reconnect_attempts=10, reconnect_time_wait=2.0, ) ``` -------------------------------- ### Listen Method Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/PushBasedJetStreamBroker.md An example showing how to consume messages using the listen method and acknowledge them. ```python async for ackable_msg in broker.listen(): await process_message(ackable_msg.data) ack = ackable_msg.ack() if ack: await ack ``` -------------------------------- ### Get Schedules Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSKeyValueScheduleSource.md Example of retrieving and printing all stored schedules. ```python schedules = await source.get_schedules() for schedule in schedules: print(f"Schedule {schedule.schedule_id}: {schedule.cron}") ``` -------------------------------- ### Example with Connection Options Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/configuration.md Demonstrates how to configure NATS connection options when initializing the NatsBroker. ```python async def on_error(error): print(f"NATS error: {error}") async def on_reconnect(): print("Reconnected to NATS") broker = NatsBroker( servers="nats://localhost:4222", name="my_app", allow_reconnect=True, max_reconnect_attempts=20, reconnect_time_wait=1.0, ping_interval=30.0, error_cb=on_error, reconnected_cb=on_reconnect, ) ``` -------------------------------- ### Installation Source: https://github.com/taskiq-python/taskiq-nats/blob/master/README.md Installs the taskiq core library and the taskiq-nats plugin. ```bash pip install taskiq taskiq-nats ``` -------------------------------- ### Set Result Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSObjectStoreResultBackend.md Example of how to use the set_result method. ```python from taskiq import TaskiqResult result = TaskiqResult(return_value=42, is_error=False) await backend.set_result("task_123", result) ``` -------------------------------- ### NATSObjectStoreResultBackend Configuration Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/configuration.md Example configuration for the NATSObjectStoreResultBackend. ```python from taskiq.serializers import JSONSerializer from taskiq_nats import NATSObjectStoreResultBackend backend = NATSObjectStoreResultBackend( servers=["nats://localhost:4222"], keep_results=False, # Delete after retrieval bucket_name="task_results", serializer=JSONSerializer(), ) ``` -------------------------------- ### Basic Push-Based JetStream Broker Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/PushBasedJetStreamBroker.md Demonstrates the basic setup of a PushBasedJetStreamBroker with default configurations. ```python import asyncio from taskiq_nats import PushBasedJetStreamBroker broker = PushBasedJetStreamBroker( servers=["nats://nats1:4222", "nats://nats2:4222"], subject="jobs", stream_name="job_stream", queue="job_processors", ) @broker.task async def process_job(job_id: int): print(f"Processing job {job_id}") async def main(): await broker.startup() task = await process_job.kiq(123) await broker.shutdown() asyncio.run(main()) ``` -------------------------------- ### Basic Pull-Based JetStream Setup Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/PullBasedJetStreamBroker.md Basic setup for PullBasedJetStreamBroker with default settings. ```python import asyncio from taskiq_nats import PullBasedJetStreamBroker broker = PullBasedJetStreamBroker( servers=["nats://nats1:4222", "nats://nats2:4222"], subject="tasks", stream_name="task_stream", durable="task_consumer", ) @broker.task async def process_task(data: str): print(f"Processing: {data}") async def main(): await broker.startup() task = await process_task.kiq("hello") await broker.shutdown() asyncio.run(main()) ``` -------------------------------- ### Monitoring and Debugging Task Execution Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md This example shows how to log and monitor task execution using Taskiq NATS, including sending tasks and polling for their results. ```python import asyncio import logging from datetime import datetime from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend # Setup logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger("taskiq_nats") broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ).with_result_backend( NATSObjectStoreResultBackend(servers="nats://localhost:4222") ) @broker.task async def monitored_task(task_id: int) -> str: """Task with monitoring.""" logger.info(f"Starting task {task_id}") await asyncio.sleep(2) logger.info(f"Completed task {task_id}") return f"result_{task_id}" async def monitor_tasks(): """Monitor task execution.""" await broker.startup() # Send multiple tasks task_ids = [] for i in range(5): task = await monitored_task.kiq(i) task_ids.append(task.task_id) logger.info(f"Sent task {task.task_id}") # Poll for results completed = set() while len(completed) < len(task_ids): for task_id in task_ids: if task_id in completed: continue try: result = await broker.result_backend.get_result(task_id) logger.info(f"Task {task_id} result: {result.return_value}") completed.add(task_id) except Exception: pass await asyncio.sleep(0.5) logger.info(f"All {len(completed)} tasks completed") await broker.shutdown() if __name__ == "__main__": asyncio.run(monitor_tasks()) ``` -------------------------------- ### Broadcast Tasks Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md This example shows how to send broadcast tasks to all connected workers using Taskiq NATS, where messages are not queued but sent to every worker. ```python import asyncio from taskiq_nats import NatsBroker # Create broker in broadcast mode (no queue) broker = NatsBroker( servers="nats://localhost:4222", subject="announcements", # queue parameter omitted - broadcast to all workers ) @broker.task async def notify_all(message: str) -> None: """Broadcast notification to all workers.""" print(f"[BROADCAST] {message}") async def producer(): """Send broadcast messages.""" await broker.startup() # Each of these will be received by ALL connected workers await notify_all.kiq("System maintenance in 5 minutes") await notify_all.kiq("Update configuration from server") await notify_all.kiq("Start health check") await broker.shutdown() async def worker(): """Worker that receives broadcasts.""" await broker.startup() async for message in broker.listen(): print(f"Worker received: {message}") await broker.shutdown() if __name__ == "__main__": # asyncio.run(producer()) pass ``` -------------------------------- ### NATSKeyValueScheduleSource Configuration Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/configuration.md Example configuration for the NATSKeyValueScheduleSource. ```python from taskiq_nats import NATSKeyValueScheduleSource source = NATSKeyValueScheduleSource( servers="nats://localhost:4222", bucket_name="schedules", prefix="taskiq_schedule", ) ``` -------------------------------- ### Integration with FastAPI Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md Shows how to integrate taskiq-nats within a FastAPI web application. ```python import asyncio from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend from taskiq import ResultGetError ``` -------------------------------- ### PullBasedJetStreamBroker Configuration Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/configuration.md Example configuration for the PullBasedJetStreamBroker with custom stream and consumer configurations. ```python from nats.js.api import StreamConfig, ConsumerConfig, RetentionPolicy from taskiq_nats import PullBasedJetStreamBroker stream_config = StreamConfig( name="task_stream", retention=RetentionPolicy.LIMITS, max_age=3600 * 24 * 30, # 30 days storage=2, # File-based ) consumer_config = ConsumerConfig( durable_name="reliable_consumer", max_deliver=10, backoff=[1000, 2000, 4000, 8000], # Exponential backoff in ms ) broker = PullBasedJetStreamBroker( servers=["nats://nats1:4222", "nats://nats2:4222"], subject="tasks", stream_name="task_stream", durable="reliable_consumer", stream_config=stream_config, consumer_config=consumer_config, pull_consume_batch=10, pull_consume_timeout=5.0, ) ``` -------------------------------- ### Polling for Results Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSObjectStoreResultBackend.md This example demonstrates how to manually poll for task results using `is_result_ready` and `get_result`. ```python import asyncio from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend result_backend = NATSObjectStoreResultBackend( servers="nats://localhost:4222", ) broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ).with_result_backend(result_backend) @broker.task async def long_task() -> int: await asyncio.sleep(2) return 999 async def main(): await broker.startup() task = await long_task.kiq() # Manual polling for _ in range(10): if await result_backend.is_result_ready(task.task_id): result = await result_backend.get_result(task.task_id) print(f"Result: {result.return_value}") break await asyncio.sleep(0.5) await broker.shutdown() asyncio.run(main()) ``` -------------------------------- ### Add Schedule Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSKeyValueScheduleSource.md Example of adding a daily schedule using NATSKeyValueScheduleSource. ```python from taskiq import ScheduledTask from datetime import datetime, timedelta schedule = ScheduledTask( schedule_id="daily_task", task_name="my_task", labels={}, cron="0 0 * * *", # Daily at midnight ) await source.add_schedule(schedule) ``` -------------------------------- ### Multi-Broker Deployment Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md Illustrates how to use different brokers for different task types, distinguishing between fast, non-critical tasks and reliable, critical tasks. ```python import asyncio from taskiq_nats import NatsBroker, PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend # Fast, non-critical tasks (Core NATS, broadcast) fast_broker = NatsBroker( servers="nats://localhost:4222", subject="fast_tasks", ) # Reliable, critical tasks (JetStream, with results) critical_broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", subject="critical_tasks", durable="critical_workers", ).with_result_backend( NATSObjectStoreResultBackend(servers="nats://localhost:4222") ) @fast_broker.task async def log_event(event: str) -> None: """Fast, non-critical logging.""" print(f"Event: {event}") @critical_broker.task async def transfer_money(from_account: int, to_account: int, amount: float) -> dict: """Critical financial operation with result storage.""" print(f"Transferring ${amount} from account {from_account} to {to_account}") await asyncio.sleep(2) return { "status": "completed", "from": from_account, "to": to_account, "amount": amount, } async def main(): await fast_broker.startup() await critical_broker.startup() # Send to appropriate brokers await log_event.kiq("user_login") # Fast, no need for result task = await transfer_money.kiq(1001, 1002, 500.00) # Critical, need result result = await task.wait_result() print(f"Transfer result: {result.return_value}") await fast_broker.shutdown() await critical_broker.shutdown() if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Simple Task Queue with Core NATS Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md Basic non-persistent task queue using NatsBroker. ```python import asyncio from taskiq_nats import NatsBroker # Create broker broker = NatsBroker( servers="nats://localhost:4222", subject="background_tasks", queue="workers", # Ensures each task is processed by one worker ) @broker.task async def send_email(recipient: str, subject: str) -> bool: """Send an email (mock implementation).""" print(f"Sending email to {recipient}: {subject}") await asyncio.sleep(1) # Simulate work return True @broker.task async def process_image(image_id: int) -> None: """Process an image (mock implementation).""" print(f"Processing image {image_id}") await asyncio.sleep(2) async def main(): """Producer that sends tasks.""" await broker.startup() # Send tasks await send_email.kiq("user@example.com", "Welcome!") await send_email.kiq("admin@example.com", "Alert") await process_image.kiq(42) await broker.shutdown() if __name__ == "__main__": asyncio.run(main()) ``` ```python import asyncio from taskiq import KickableTask, TaskiqMessage from taskiq_nats import NatsBroker broker = NatsBroker( servers="nats://localhost:4222", subject="background_tasks", queue="workers", ) @broker.task async def send_email(recipient: str, subject: str) -> bool: print(f"Sending email to {recipient}: {subject}") return True async def worker(): """Worker that processes tasks.""" await broker.startup() async for message in broker.listen(): # Taskiq framework handles deserialization print(f"Received task: {message}") await broker.shutdown() asyncio.run(worker()) ``` -------------------------------- ### Error Handling Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/INDEX.md Example of how to handle potential NATS and Taskiq errors when interacting with the broker and result backend. ```python from nats.errors import Error as NatsError from taskiq import ResultGetError try: result = await backend.get_result(task_id) except ResultGetError: print("Result not ready yet") except NatsError as e: print(f"NATS error: {e}") ``` -------------------------------- ### Basic Schedule Storage Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSKeyValueScheduleSource.md Example demonstrating how to add, retrieve, and manage schedules using NATSKeyValueScheduleSource. ```python import asyncio from taskiq import ScheduledTask from taskiq_nats import NATSKeyValueScheduleSource source = NATSKeyValueScheduleSource( servers="nats://localhost:4222", bucket_name="my_schedules", ) async def main(): await source.startup() # Add a daily schedule schedule = ScheduledTask( schedule_id="daily_report", task_name="generate_report", labels={}, cron="0 9 * * *", # 9 AM every day ) await source.add_schedule(schedule) # Retrieve all schedules schedules = await source.get_schedules() print(f"Stored {len(schedules)} schedules") await source.shutdown() asyncio.run(main()) ``` -------------------------------- ### Broker Configuration from Environment Variables Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/INDEX.md Example of constructing a Taskiq NATS broker using environment variables for flexibility. ```python import os broker = PullBasedJetStreamBroker( servers=os.getenv("NATS_SERVERS", "nats://localhost:4222").split(","), durable=os.getenv("NATS_DURABLE", "taskiq"), ) ``` -------------------------------- ### With Custom Serializer Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSObjectStoreResultBackend.md This example shows how to use a custom serializer, like JSONSerializer, with NATSObjectStoreResultBackend. ```python import asyncio from taskiq.serializers import JSONSerializer from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend result_backend = NATSObjectStoreResultBackend( servers="nats://localhost:4222", serializer=JSONSerializer(), # Use JSON instead of pickle ) broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ).with_result_backend(result_backend) @broker.task async def compute() -> dict: return {"status": "done"} async def main(): await broker.startup() task = await compute.kiq() result = await task.wait_result() print(result.return_value) # {"status": "done"} await broker.shutdown() asyncio.run(main()) ``` -------------------------------- ### Multi-Node NATS Cluster Example Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/INDEX.md Docker Compose configuration for setting up a multi-node NATS cluster. ```yaml version: '3' services: nats1: image: nats:latest command: -c /etc/nats/nats.conf volumes: - ./nats-1.conf:/etc/nats/nats.conf ports: - "4222:4222" nats2: image: nats:latest command: -c /etc/nats/nats.conf volumes: - ./nats-2.conf:/etc/nats/nats.conf ports: - "4223:4222" ``` -------------------------------- ### Basic Result Storage Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSObjectStoreResultBackend.md This example demonstrates basic result storage using NATSObjectStoreResultBackend with PullBasedJetStreamBroker. ```python import asyncio from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend result_backend = NATSObjectStoreResultBackend( servers="nats://localhost:4222", ) broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ).with_result_backend(result_backend) @broker.task async def add(a: int, b: int) -> int: return a + b async def main(): await broker.startup() task = await add.kiq(5, 3) result = await task.wait_result() print(f"5 + 3 = {result.return_value}") await broker.shutdown() asyncio.run(main()) ``` -------------------------------- ### Integrating with Result Backends Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/INDEX.md Example of how to configure the NATS broker with a result backend. ```python broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ).with_result_backend(result_backend) ``` -------------------------------- ### Task Labels and Metadata Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/INDEX.md Example demonstrating how to use task labels with the NATS broker. ```python @broker.task async def labeled_task(): pass task = await labeled_task.kiq() # Labels available in BrokerMessage.labels ``` -------------------------------- ### Broker with Result Backend Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/PullBasedJetStreamBroker.md Example of configuring the broker with NATSObjectStoreResultBackend for task result storage. ```python import asyncio from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend result_backend = NATSObjectStoreResultBackend( servers="nats://localhost:4222", bucket_name="results", ) broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", subject="compute_tasks", durable="compute_consumer", ).with_result_backend(result_backend) @broker.task async def compute() -> str: return "result" async def main(): await broker.startup() task = await compute.kiq() result = await task.wait_result() print(result.return_value) # "result" await broker.shutdown() asyncio.run(main()) ``` -------------------------------- ### With Scheduler Integration Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSKeyValueScheduleSource.md Example showing how to integrate NATSKeyValueScheduleSource with taskiq's scheduler for recurring tasks. ```python import asyncio from taskiq import InMemoryBroker from taskiq.scheduler import SimpleScheduler from taskiq_nats import NATSKeyValueScheduleSource, PullBasedJetStreamBroker # Create broker and schedule source broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ) schedule_source = NATSKeyValueScheduleSource( servers="nats://localhost:4222", ) # Create scheduler scheduler = SimpleScheduler( broker=broker, schedule_source=schedule_source, ) @broker.task async def hourly_cleanup(): print("Running cleanup...") async def main(): await broker.startup() await schedule_source.startup() # Add recurring schedule from taskiq import ScheduledTask schedule = ScheduledTask( schedule_id="cleanup", task_name="hourly_cleanup", labels={}, cron="0 * * * *", # Every hour ) await schedule_source.add_schedule(schedule) # Start scheduler scheduler_task = asyncio.create_task(scheduler.run()) await asyncio.sleep(3600) # Run for 1 hour scheduler_task.cancel() await broker.shutdown() await schedule_source.shutdown() asyncio.run(main()) ``` -------------------------------- ### Health Check and Liveness Probe Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md Monitor broker connectivity using health checks and liveness probes. ```python import asyncio from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend broker = PullBasedJetStreamBroker( servers=["nats://localhost:4222"], name="health_check_worker", ) async def is_broker_healthy() -> bool: """Check if the broker is connected.""" try: if not broker.client.is_connected: return False # Try to publish a test message from taskiq import BrokerMessage import uuid test_msg = BrokerMessage( task_id=uuid.uuid4().hex, task_name="health_check", message=b"test", labels={}, ) await broker.kick(test_msg) return True except Exception as e: print(f"Health check failed: {e}") return False async def liveness_check(): """Continuous liveness check.""" await broker.startup() while True: is_healthy = await is_broker_healthy() status = "HEALTHY" if is_healthy else "UNHEALTHY" print(f"[{asyncio.get_event_loop().time()}] Broker: {status}") if not is_healthy: print("Broker connection lost!") break await asyncio.sleep(5) await broker.shutdown() if __name__ == "__main__": asyncio.run(liveness_check()) ``` -------------------------------- ### Testing Tasks Locally Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md Unit test tasks without NATS running by mocking the broker. ```python import asyncio from unittest.mock import AsyncMock, patch from taskiq_nats import PullBasedJetStreamBroker broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ) @broker.task async def add_numbers(a: int, b: int) -> int: """Add two numbers.""" return a + b async def test_add_numbers(): """Test the add_numbers task.""" # Call the underlying function directly result = await add_numbers.run(5, 3) assert result.return_value == 8 assert not result.is_error async def test_add_numbers_with_mock(): """Test with mocked broker.""" # Mock the broker for testing with patch.object(broker, 'kick', new_callable=AsyncMock): # The task decorator adds a .kiq() method # For testing, just call the function directly result = await add_numbers.run(10, 20) assert result.return_value == 30 if __name__ == "__main__": asyncio.run(test_add_numbers()) asyncio.run(test_add_numbers_with_mock()) print("Tests passed!") ``` -------------------------------- ### Standalone Result Backend Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/api-reference/NATSObjectStoreResultBackend.md This example shows how to use NATSObjectStoreResultBackend as a standalone result backend without a broker. ```python import asyncio from taskiq import TaskiqResult from taskiq_nats import NATSObjectStoreResultBackend backend = NATSObjectStoreResultBackend( servers="nats://localhost:4222", bucket_name="my_results", ) async def main(): await backend.startup() # Store a result result = TaskiqResult(return_value="success", is_error=False) await backend.set_result("task_abc", result) # Check if result exists if await backend.is_result_ready("task_abc"): retrieved = await backend.get_result("task_abc") print(retrieved.return_value) # "success" await backend.shutdown() asyncio.run(main()) ``` -------------------------------- ### NATS broker based on JetStream (Push-based) Source: https://github.com/taskiq-python/taskiq-nats/blob/master/README.md Example of setting up a push-based JetStream broker. ```python import asyncio from taskiq_nats import ( PushBasedJetStreamBroker, PullBasedJetStreamBroker ) broker = PushBasedJetStreamBroker( servers=[ "nats://nats1:4222", "nats://nats2:4222", ], queue="awesome_queue_name", ) # Or you can use pull based variant broker = PullBasedJetStreamBroker( servers=[ "nats://nats1:4222", "nats://nats2:4222", ], durable="awesome_durable_consumer_name", ) @broker.task async def my_lovely_task(): print("I love taskiq") async def main(): await broker.startup() await my_lovely_task.kiq() await broker.shutdown() if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Docker Compose Example for NATS JetStream Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/INDEX.md A Docker Compose configuration to set up a NATS server with JetStream enabled. ```yaml version: '3' services: nats: image: nats:latest ports: - "4222:4222" - "8222:8222" # Monitoring dashboard command: "-js" # Enable JetStream ``` -------------------------------- ### Error Handling and Retries Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md Demonstrates how to handle task failures and implement retry logic with exponential backoff, as well as safely fetching results. ```python import asyncio from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend from taskiq import ResultGetError from nats.errors import Error as NatsError broker = PullBasedJetStreamBroker( servers="nats://localhost:4222", ).with_result_backend( NATSObjectStoreResultBackend(servers="nats://localhost:4222") ) @broker.task async def flaky_task(attempt: int = 1) -> str: """A task that might fail.""" print(f"Attempt {attempt}") if attempt < 3: raise ValueError(f"Simulated failure on attempt {attempt}") return f"Success on attempt {attempt}" async def run_with_retries(): """Run a task with manual retry logic.""" await broker.startup() max_retries = 3 for attempt in range(1, max_retries + 1): try: task = await flaky_task.kiq(attempt) print(f"Task {task.task_id} sent") # Wait for result result = await task.wait_result(timeout=10) if result.is_error: print(f"Task failed: {result.error}") if attempt < max_retries: print(f"Retrying... (attempt {attempt + 1})") await asyncio.sleep(2 ** attempt) # Exponential backoff continue else: print(f"Task succeeded: {result.return_value}") break except ResultGetError: print("Result not available yet") await asyncio.sleep(1) except NatsError as e: print(f"NATS error: {e}") break await broker.shutdown() async def fetch_result_safely(task_id: str): """Safely fetch a result with error handling.""" result_backend = NATSObjectStoreResultBackend( servers="nats://localhost:4222", ) await result_backend.startup() try: # Check if result exists if not await result_backend.is_result_ready(task_id): print(f"Result for {task_id} is not ready yet") return None # Get the result result = await result_backend.get_result(task_id, with_logs=True) if result.is_error: print(f"Task error: {result.error}") print(f"Error type: {result.error_type}") if result.log: print(f"Logs:\n{result.log}") return None return result.return_value except ResultGetError: print(f"Result for {task_id} was not found") return None except NatsError as e: print(f"Failed to retrieve result: {e}") return None finally: await result_backend.shutdown() if __name__ == "__main__": # asyncio.run(run_with_retries()) pass ``` -------------------------------- ### Reliable Task Processing with JetStream Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/EXAMPLES.md Production-grade task queue with persistence and result storage. ```python import asyncio from nats.js.api import StreamConfig, ConsumerConfig, RetentionPolicy from taskiq_nats import PullBasedJetStreamBroker from taskiq_nats import NATSObjectStoreResultBackend # Configure result storage result_backend = NATSObjectStoreResultBackend( servers="nats://localhost:4222", bucket_name="task_results", keep_results=True, ) # Configure stream with retention stream_config = StreamConfig( retention=RetentionPolicy.LIMITS, max_age=3600 * 24 * 7, # Keep messages for 7 days storage=2, # File-based storage ) # Configure consumer with delivery guarantees consumer_config = ConsumerConfig( max_deliver=5, # Retry up to 5 times backoff=[1000, 2000, 4000], # Exponential backoff (ms) ) # Create broker with all components broker = PullBasedJetStreamBroker( servers=[ "nats://nats1:4222", "nats://nats2:4222", "nats://nats3:4222", ], subject="critical_tasks", stream_name="critical_stream", durable="critical_workers", stream_config=stream_config, consumer_config=consumer_config, pull_consume_batch=10, pull_consume_timeout=5.0, ).with_result_backend(result_backend) @broker.task async def process_payment(payment_id: int, amount: float) -> dict: """Process a payment and return result.""" print(f"Processing payment {payment_id} for ${amount}") await asyncio.sleep(2) # Simulate processing return { "payment_id": payment_id, "status": "completed", "amount": amount, } async def producer(): """Send payment processing tasks.""" await broker.startup() for i in range(5): # Send task and get reference task = await process_payment.kiq(i, 99.99) print(f"Task {task.task_id} sent") await broker.shutdown() async def fetch_results(): """Retrieve results after processing.""" await broker.startup() # In production, these would come from a database or task state task_ids = ["task-1", "task-2", "task-3"] for task_id in task_ids: try: result = await result_backend.get_result(task_id) if result.is_error: print(f"Task {task_id} failed: {result.error}") else: print(f"Task {task_id} result: {result.return_value}") except Exception as e: print(f"Could not retrieve task {task_id}: {e}") await broker.shutdown() if __name__ == "__main__": # asyncio.run(producer()) asyncio.run(fetch_results()) ``` -------------------------------- ### Connection Pooling for Distributed Systems Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/INDEX.md Example demonstrating how to configure the NATS broker for connection pooling in distributed environments. ```python broker = PullBasedJetStreamBroker( servers=[ "nats://nats1:4222", "nats://nats2:4222", "nats://nats3:4222", ], name="worker-1", # Identify this worker max_reconnect_attempts=20, reconnect_time_wait=1.0, ) ``` -------------------------------- ### Queue-Based Task Distribution Source: https://github.com/taskiq-python/taskiq-nats/blob/master/_autodocs/INDEX.md This example demonstrates how to set up a NatsBroker for queue-based task distribution, ensuring only one worker processes each task. ```python from taskiq_nats import NatsBroker broker = NatsBroker( servers="nats://localhost:4222", queue="workers", # Only one worker processes each task ) @broker.task async def process_order(order_id: int): print(f"Processing order {order_id}") ```