### Basic GroupResult Usage Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/result.md Demonstrates how to create a group of tasks, apply them asynchronously, and retrieve their results using `get()`. ```python from celery import Celery, group app = Celery() @app.task def add(x, y): return x + y job = group(add.s(2, 2), add.s(4, 4), add.s(8, 8)) result = job.apply_async() # result is a GroupResult (subclass of ResultSet) results = result.get() # [4, 8, 16] ``` -------------------------------- ### Complete Celery Configuration Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/configuration.md A comprehensive example demonstrating how to configure Celery with broker, result backend, task settings, worker options, queues, routes, and beat schedules. ```python from celery import Celery from celery.schedules import crontab from kombu import Exchange, Queue app = Celery('myapp') # Broker app.conf.broker_url = 'amqp://guest:guest@localhost:5672//' app.conf.broker_connection_retry_on_startup = True # Result backend app.conf.result_backend = 'redis://localhost:6379/0' app.conf.result_expires = 3600 # Tasks app.conf.task_serializer = 'json' app.conf.accept_content = ['json'] app.conf.timezone = 'UTC' app.conf.enable_utc = True # Task execution app.conf.task_time_limit = 30 * 60 # 30 minutes app.conf.task_soft_time_limit = 25 * 60 # 25 minutes app.conf.task_acks_late = True app.conf.task_track_started = True # Worker settings app.conf.worker_prefetch_multiplier = 4 app.conf.worker_max_tasks_per_child = 1000 app.conf.worker_concurrency = 8 # Queue configuration app.conf.task_queues = ( Queue('default', Exchange('default'), routing_key='default'), Queue('priority', Exchange('priority'), routing_key='priority'), Queue('background', Exchange('background'), routing_key='background'), ) app.conf.task_routes = { 'myapp.tasks.priority': {'queue': 'priority'}, 'myapp.tasks.background': {'queue': 'background'}, } # Beat schedule app.conf.beat_schedule = { 'check-status-every-minute': { 'task': 'myapp.tasks.check_status', 'schedule': 60.0, }, 'daily-cleanup-at-3am': { 'task': 'myapp.tasks.cleanup', 'schedule': crontab(hour=3, minute=0), }, } ``` -------------------------------- ### Configuration Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/beat.md Example of how to configure beat_schedule in Celery application settings. ```APIDOC ## Configuration Example ### Using beat_schedule Configuration ```python from celery import Celery from celery.schedules import crontab app = Celery('myapp') app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'myapp.tasks.add', 'schedule': 30.0, 'args': (16, 16), }, 'multiply-at-midnight': { 'task': 'myapp.tasks.multiply', 'schedule': crontab(hour=0, minute=0), 'args': (5, 5), }, 'cleanup-every-hour': { 'task': 'myapp.tasks.cleanup', 'schedule': 3600.0, 'options': {'expires': 3600}, }, 'weekly-report': { 'task': 'myapp.tasks.generate_report', 'schedule': crontab(hour=0, minute=0, day_of_week=1), 'args': (), 'kwargs': {'format': 'pdf'}, }, } ``` ``` -------------------------------- ### Queue Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/kombu.md Demonstrates creating a Queue, binding it to an Exchange, and declaring it on a channel. ```python from kombu import Queue, Exchange, Connection default_exchange = Exchange('tasks', type='direct', durable=True) # Create queue queue = Queue( 'celery', exchange=default_exchange, routing_key='celery', durable=True, ) with Connection('amqp://localhost') as conn: with conn.channel() as channel: queue(channel).declare() queue(channel).bind_to(default_exchange, routing_key='celery') ``` -------------------------------- ### Celery App Setup and Task Definitions Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/task.md Basic setup for a Celery application and definitions for `add` and `failing_task` including retry logic. ```python from celery import Celery app = Celery('myapp') @app.task def add(x, y): return x + y @app.task(bind=True, max_retries=3) def failing_task(self, x, y): try: if x == 0: raise ValueError("x cannot be 0") return x / y except ZeroDivisionError as exc: # Retry after 60 seconds raise self.retry(exc=exc, countdown=60) ``` -------------------------------- ### Celery Async Task Execution and Result Handling Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/result.md This example demonstrates setting up a Celery app, defining a task, executing it asynchronously with `delay`, and handling its result using `ready`, `state`, and `get`. ```python from celery import Celery app = Celery() @app.task def add(x, y): return x + y # Async execution and result handling result = add.delay(2, 2) # Check status print(result.ready()) # Is it done? print(result.state) # 'PENDING', 'STARTED', 'SUCCESS', etc. # Wait for result value = result.get(timeout=10) print(value) # 4 # With propagate=False, exceptions are not raised try: value = result.get(propagate=True) # Raises exception from task except Exception as e: print(f"Task failed: {e}") # Cancel a task result = add.delay(2, 2) result.revoke() # Check if successful if result.successful(): print(f"Result: {result.result}") elif result.failed(): print(f"Failed: {result.info}") ``` -------------------------------- ### Complete Celery Beat Example Configuration Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/beat.md This example demonstrates a comprehensive configuration for Celery Beat, including setting up a beat schedule with various tasks, schedules (fixed intervals and crontab), and arguments. It also defines the associated Celery tasks. ```python from celery import Celery from celery.schedules import crontab app = Celery('myapp', broker='amqp://localhost') # Configure beat schedule app.conf.beat_schedule = { 'check-status-every-minute': { 'task': 'myapp.tasks.check_status', 'schedule': 60.0, 'args': (), }, 'generate-report-daily': { 'task': 'myapp.tasks.generate_daily_report', 'schedule': crontab(hour=0, minute=0), 'args': (), }, 'sync-data-every-hour': { 'task': 'myapp.tasks.sync_data', 'schedule': 3600.0, 'options': {'expires': 3600}, }, 'backup-every-sunday': { 'task': 'myapp.tasks.backup_database', 'schedule': crontab(day_of_week=0, hour=2, minute=0), 'args': (), }, } # Define tasks @app.task def check_status(): """Check system status every minute.""" print("Checking status...") @app.task def generate_daily_report(): """Generate report at midnight.""" print("Generating report...") @app.task def sync_data(): """Sync data every hour.""" print("Syncing data...") @app.task def backup_database(): """Backup database every Sunday at 2am.""" print("Backing up database...") if __name__ == '__main__': app.start() ``` -------------------------------- ### Producer Publish Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/kombu.md Shows how to create a Producer and publish a message with specific routing key and serializer. ```python producer = Producer(conn, exchange='tasks', routing_key='celery.task') producer.publish( {'method': 'add', 'args': [2, 2]}, routing_key='celery.task', serializer='json', ) ``` -------------------------------- ### Run Flower Monitoring UI Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Command to start Flower, a real-time web-based monitoring tool for Celery. Ensure Flower is installed via pip. ```bash pip install flower celery -A myapp flower ``` -------------------------------- ### Install Pre-commit Hooks Source: https://github.com/sbdchd/celery-types/blob/main/README.md Installs the pre-commit hooks for the project using uv. These hooks automate code quality checks before commits. ```shell # install pre-commit hooks uv run prek install ``` -------------------------------- ### Install uv (Python Package Manager) Source: https://github.com/sbdchd/celery-types/blob/main/README.md Installs the uv package manager using a curl command. uv is a fast Python package installer and resolver. ```shell # install uv (https://docs.astral.sh/uv/) curl -LsSf https://astral.sh/uv/install.sh | sh ``` -------------------------------- ### Producer Context Manager Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/kombu.md Demonstrates using Producer as a context manager for publishing messages within a connection. ```python from kombu import Connection, Exchange, Producer conn = Connection('amqp://localhost') with conn: with Producer(conn, exchange='tasks') as producer: producer.publish( {'x': 2, 'y': 2}, routing_key='celery.task', ) ``` -------------------------------- ### Eager Task Execution and Result Retrieval Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/result.md This example shows how to enable eager task execution by setting `app.conf.task_always_eager = True`. When enabled, tasks return an `EagerResult` immediately, and `get()` retrieves the result synchronously. ```python from celery import Celery app = Celery() app.conf.task_always_eager = True # Enable eager execution @app.task def add(x, y): return x + y # Returns EagerResult immediately result = add.delay(2, 2) print(result.get()) # Returns synchronously: 4 ``` -------------------------------- ### Celery Task Workflow Examples Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/canvas.md Demonstrates the creation and execution of sequential chains, parallel groups, and chords with callbacks using Celery tasks. ```python from celery import Celery, chain, group, chord app = Celery() @app.task def add(x, y): return x + y @app.task def xsum(values): return sum(values) @app.task def multiply(x, y): return x * y # Chain: sequential execution workflow = chain( add.s(2, 2), # 4 multiply.s(4), # 4 * 4 = 16 add.s(10) # 16 + 10 = 26 ) result = workflow.apply_async() # Group: parallel execution job = group( add.s(2, 2), multiply.s(4, 4), add.s(8, 8) ) result = job.apply_async() # Chord: parallel with callback callback = xsum.s() workflow = chord( group(add.s(2, 2), add.s(4, 4), add.s(8, 8)) )(callback) result = workflow.apply_async() ``` -------------------------------- ### Install celery-types Source: https://github.com/sbdchd/celery-types/blob/main/README.md Install the celery-types package using pip. This is the primary step to integrate type stubs into your project. ```shell pip install celery-types ``` -------------------------------- ### Comprehensive Celery Monitoring and Control Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/control.md An example demonstrating how to monitor worker health, view task statistics, check active tasks, and list registered tasks using Celery's inspect API. Includes a sample long-running task. ```python from celery import Celery import time app = Celery('myapp', broker='amqp://localhost') @app.task def long_task(seconds): """Long-running task.""" for i in range(seconds): time.sleep(1) return f"Slept for {seconds} seconds" def monitor_and_control(): """Monitor workers and control tasks.""" inspector = app.control.inspect() # Check worker health ping = inspector.ping() print(f"Active workers: {list(ping.keys())}") # Get worker stats stats = inspector.stats() for worker_name, worker_stats in stats.items(): print(f"{worker_name}: {worker_stats['total']} tasks processed") # Get active tasks active = inspector.active() for worker_name, tasks in active.items(): print(f"{worker_name}: {len(tasks)} active tasks") for task in tasks: print(f" - {task['name']} ({task['id']})") # Get registered tasks registered = inspector.registered() for worker_name, task_names in registered.items(): print(f"{worker_name} registered tasks: {task_names}") if __name__ == '__main__': monitor_and_control() ``` -------------------------------- ### Celery App Configuration with Supporting Modules Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Initializes a Celery application and configures its broker and result backend URLs. This example also sets up a process pool for CPU-bound tasks and defines tasks using promises and direct AMQP communication. ```python from celery import Celery from billiard import Pool from vine import promise from kombu import Connection # Create Celery app app = Celery('myapp') # Configure with supporting modules app.conf.broker_url = 'amqp://localhost' app.conf.result_backend = 'redis://localhost' # Create process pool for CPU-bound tasks cpu_pool = Pool(processes=4) # Define CPU-bound task using process pool @app.task def cpu_intensive(data): """CPU-bound task executed in process pool.""" result = cpu_pool.apply_async( expensive_computation, args=(data,) ) return result.get() # Define promise-based async callback @app.task(bind=True) def async_callback(self, result): """Task using promise for async handling.""" my_promise = promise() def handle_result(value): print(f"Promise resolved: {value}") my_promise.then(handle_result) my_promise.set(result) # AMQP-level communication @app.task def direct_amqp(): """Direct AMQP operations.""" from amqp import Connection conn = Connection(host='localhost') channel = conn.channel() channel.exchange_declare(exchange='tasks', type='direct') channel.basic_publish( body=b'direct message', exchange='tasks', routing_key='celery' ) channel.close() conn.close() # Django result backend (in Django settings) # CELERY_RESULT_BACKEND = 'db+postgresql://user:password@localhost/dbname' ``` -------------------------------- ### ImproperlyConfigured Exception Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/errors.md Raised when Celery is not properly configured, typically due to missing broker configuration. This snippet demonstrates a scenario where starting the app without configuration triggers the exception. ```python from celery import Celery # Missing broker configuration app = Celery('myapp') app.start() # Raises ImproperlyConfigured ``` -------------------------------- ### Configure Default Queues and Exchanges Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/configuration.md Define the default queues, exchanges, and routing keys for task distribution. This example sets up two queues: 'default' and 'high_priority'. ```python from kombu import Queue, Exchange app.conf.task_queues = ( Queue('default', Exchange('default'), routing_key='default'), Queue('high_priority', Exchange('priority'), routing_key='high'), ) ``` -------------------------------- ### Configure Task Routing Rules Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/configuration.md Map specific tasks or task patterns to queues. This example shows routing based on task name and also demonstrates using a callable predicate for dynamic routing. ```python app.conf.task_routes = { 'myapp.tasks.high_priority': {'queue': 'priority'}, 'myapp.tasks.low_priority': {'queue': 'background'}, } # Or with callable predicates def route_task(name, *args, **kwargs): if name.startswith('myapp.tasks.priority'): return {'queue': 'priority'} return {'queue': 'default'} app.conf.task_routes = [route_task] ``` -------------------------------- ### Run Celery Beat Scheduler Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Command to start the Celery Beat scheduler, responsible for triggering periodic tasks. Use `-l info` for verbose logging. ```bash celery -A myapp beat -l info ``` -------------------------------- ### Kombu Message Consumption Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/kombu.md Demonstrates how to consume messages from a Kombu queue using AMQP. Ensure you have a running RabbitMQ instance and the necessary exchange and queue configured. ```python from kombu import Connection, Queue, Exchange, Consumer conn = Connection('amqp://localhost') exchange = Exchange('tasks', type='direct', durable=True) queue = Queue('celery', exchange=exchange, routing_key='celery') with conn: with conn.channel() as channel: consumer = Consumer( channel, queues=[queue], no_ack=False, callbacks=[callback_function], ) for body, message in consumer.consume(): print(f"Received: {body}") message.ack() ``` -------------------------------- ### Configure Django Celery Results App Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Example of adding 'django_celery_results' to INSTALLED_APPS in Django's settings.py. ```python # settings.py INSTALLED_APPS = [ # ... 'django_celery_results', ] ``` -------------------------------- ### Monitor Running Tasks Continuously Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/control.md Periodically fetch and display active tasks from all workers. This example runs indefinitely, printing task information every 5 seconds. ```python from celery import Celery import time app = Celery('myapp') inspector = app.control.inspect() # Get active tasks while True: active = inspector.active() for worker_name, tasks in active.items(): print(f"Worker {worker_name}:") for task in tasks: print(f" - {task['name']} (ID: {task['id']})") time.sleep(5) ``` -------------------------------- ### Schedule Task at Sunrise using Ephem Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md This snippet demonstrates how to calculate the next sunrise using the ephem library and schedule a Celery task to run at that time. Ensure ephem is installed. ```python import ephem from celery.schedules import schedule # Calculate next sunrise observer = ephem.Observer() observer.date = '2024-01-15' sunrise = ephem.next_rising(ephem.Sun(), observer) # Schedule task at sunrise app.add_periodic_task( schedule=sunrise, sig=morning_task.s(), name='sunrise-task' ) ``` -------------------------------- ### Running the Celery Application Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/celery.md Start the Celery worker process. This should typically be placed within an `if __name__ == '__main__':` block. ```python if __name__ == '__main__': app.start() ``` -------------------------------- ### Usage of AsyncResult for Task Results Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/types.md Demonstrates how to use `AsyncResult` to get the result of an asynchronous task, including type hinting for the return value. ```python @app.task def multiply(x: int, y: int) -> int: return x * y result: AsyncResult[int] = multiply.delay(3, 4) product: int = result.get() ``` -------------------------------- ### GroupResult Persistence and Restoration Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/result.md Shows how to save a `GroupResult` to a backend, restore it later using its ID, and then retrieve its results. The group result is also cleaned up by deleting it from the backend. ```python from celery import Celery, group app = Celery() @app.task def add(x, y): return x + y # Create a group job = group(add.s(2, 2), add.s(4, 4), add.s(8, 8)) result = job.apply_async() # Save the group result for later retrieval result.save() # Later, restore the group result restored = GroupResult.restore(result.id, app=app) print(restored.get()) # [4, 8, 16] # Clean up result.delete() ``` -------------------------------- ### Configure Periodic Tasks with Beat Schedule Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Sets up scheduled tasks to run at specific intervals or times using Celery Beat. This example configures a daily cleanup task and a task that runs every 30 seconds. ```python from celery.schedules import crontab app.conf.beat_schedule = { 'daily-cleanup': { 'task': 'myapp.tasks.cleanup', 'schedule': crontab(hour=3, minute=0), }, 'every-30-seconds': { 'task': 'myapp.tasks.check_status', 'schedule': 30.0, }, } ``` -------------------------------- ### Run Celery Worker Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Command to start a Celery worker process. The `-A` flag specifies the application module, and `-l info` sets the logging level. ```bash celery -A myapp worker -l info ``` -------------------------------- ### Control Celery Workers and Tasks Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Provides examples for managing Celery tasks and workers. This includes revoking a task by its ID, inspecting active tasks, and checking worker health. ```python from celery import Celery app = Celery('myapp') # Revoke task app.control.revoke('task-id') # Get active tasks inspector = app.control.inspect() active = inspector.active() # Check worker health ping = inspector.ping() ``` -------------------------------- ### Declare Exchange Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/kombu.md Declares an exchange on the message broker. This example shows how to declare a default direct exchange. Ensure Exchange, Queue, and Connection are imported. ```python from kombu import Exchange, Queue, Connection # Direct exchange (default) default_exchange = Exchange('tasks', type='direct', durable=True) # Topic exchange for pattern-based routing topic_exchange = Exchange('events', type='topic', durable=True) # Fanout exchange for broadcasts broadcast_exchange = Exchange('notifications', type='fanout', durable=True) with Connection('amqp://localhost') as conn: with conn.channel() as channel: default_exchange(channel).declare() ``` -------------------------------- ### Establish AMQP Connection and Declare Resources Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Shows how to create an AMQP connection, obtain a channel, and declare exchanges and queues. ```python from amqp import Connection # Create connection conn = Connection(host='localhost', userid='guest', password='guest') # Get channel channel = conn.channel() # Declare exchange channel.exchange_declare(exchange='tasks', type='direct') # Declare queue channel.queue_declare(queue='celery', durable=True) # Bind queue to exchange channel.queue_bind(exchange='tasks', queue='celery', routing_key='celery') ``` -------------------------------- ### Celery App Initialization and Configuration Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/configuration.md Demonstrates multiple ways to initialize a Celery application and update its configuration settings. ```python from celery import Celery app = Celery('myapp') # 1. Direct keyword arguments app = Celery('myapp', broker='amqp://localhost', backend='redis://localhost') # 2. Settings dictionary app.conf.update( broker_url='amqp://localhost', result_backend='redis://localhost', task_serializer='json', ) # 3. Python module app.config_from_object('myapp.settings') # 4. Environment variable app.config_from_envvar('CELERY_CONFIG_MODULE') # 5. Direct attribute assignment app.conf.broker_url = 'amqp://localhost' ``` -------------------------------- ### Basic Celery App Initialization and Task Execution Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Demonstrates how to initialize a Celery application, define a simple task, and execute it asynchronously. It also shows how to retrieve the task's result and chain multiple tasks together. ```python from celery import Celery, chain app = Celery('myapp', broker='amqp://localhost') @app.task def add(x, y): return x + y # Execute asynchronously result = add.delay(2, 2) print(result.get()) # Get result # Chain tasks workflow = chain(add.s(2, 2), add.s(4), add.s(8)) result = workflow.apply_async() ``` -------------------------------- ### Create and Use Task Signatures Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/canvas.md Demonstrates various ways to create task signatures, including shorthand, explicit naming, and method calls. Also shows partial signatures and immutable signatures. ```python from celery import Celery, signature app = Celery() @app.task def add(x, y): return x + y # Creating signatures sig1 = add.s(2, 2) # Shorthand sig2 = signature('tasks.add', args=(2, 2)) # Explicit sig3 = add.signature(args=(2, 2)) # Method call # Partial signature (can provide more args later) add_partial = add.s(2) result = add_partial.delay(y=2) # Immutable signature (cannot override arguments) sig_immutable = add.si(2, 2) ``` -------------------------------- ### Wait for Task Result (Alias for get) Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/result.md The `wait` method is an alias for `get` and is used to block until a task completes. A timeout can be specified. ```python value = result.wait(timeout=30) ``` -------------------------------- ### Get Task Result Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/result.md Use the `get` method to block until a task completes and retrieve its result. You can specify a timeout and whether to propagate exceptions. ```python result = add.delay(2, 2) value = result.get(timeout=10) # Wait max 10 seconds ``` -------------------------------- ### Build and Publish with uv Source: https://github.com/sbdchd/celery-types/blob/main/README.md Builds the project and publishes it using the uv package manager. This is typically used for releasing new versions. ```shell uv build && uv publish ``` -------------------------------- ### SecurityError Exception Example Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/errors.md Raised for security-related issues, such as using an invalid serializer. This example shows how attempting to send a task with a 'pickle' serializer, when only 'json' is allowed, triggers a SecurityError. ```python app.setup_security( allowed_serializers=['json'], # Only JSON allowed ) result = app.send_task('task', serializer='pickle') # Raises SecurityError ``` -------------------------------- ### AMQP Channel for Basic Publish and Consume Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Demonstrates publishing a message to an exchange and setting up a consumer to receive and acknowledge messages. ```python channel.basic_publish( body=b'message body', exchange='tasks', routing_key='celery', ) def callback(message): print(f"Received: {message.body}") message.ack() channel.basic_consume(queue='celery', callback=callback) while True: channel.wait() ``` -------------------------------- ### Inspect Scheduled Tasks Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/control.md Retrieve a list of tasks that are scheduled to run in the future but have not yet started. ```python inspector = app.control.inspect() scheduled = inspector.scheduled() # Returns: {'celery@hostname': [{'eta': '2024-01-15T10:30:00', ...}]} ``` -------------------------------- ### Create a Task Chain Workflow Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Demonstrates how to define a sequence of tasks where each task's output is passed as input to the next. This is useful for multi-step processes. ```python from celery import chain workflow = chain( add.s(2, 2), add.s(4), add.s(8) ) result = workflow.apply_async() ``` -------------------------------- ### Celery App Initialization Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/celery.md Instantiate the main Celery application. Specify the module name, broker URL, and modules to include for task discovery. ```python from celery import Celery app = Celery('myapp', broker='amqp://guest:guest@localhost:5672//') ``` -------------------------------- ### AMQP Connection and Channel Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Demonstrates establishing an AMQP connection and creating a channel for interacting with message brokers, including exchange and queue declarations. ```APIDOC ## AMQP Connection and Channel ### Description Provides low-level interfaces for establishing AMQP connections and managing channels to interact with message brokers. ### Module: amqp ```python from amqp import Connection, Channel ``` ### Class: Connection Low-level AMQP connection. ```python from amqp import Connection # Create connection conn = Connection(host='localhost', userid='guest', password='guest') # Get channel channel = conn.channel() # Declare exchange channel.exchange_declare(exchange='tasks', type='direct') # Declare queue channel.queue_declare(queue='celery', durable=True) # Bind queue to exchange channel.queue_bind(exchange='tasks', queue='celery', routing_key='celery') ``` ### Class: Channel AMQP channel for communication. ```python channel.basic_publish( body=b'message body', exchange='tasks', routing_key='celery', ) def callback(message): print(f"Received: {message.body}") message.ack() channel.basic_consume(queue='celery', callback=callback) while True: channel.wait() ``` ``` -------------------------------- ### Inspect Reserved Tasks Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/control.md Get a list of tasks that have been fetched by workers and are reserved for execution, but not yet running. ```python inspector = app.control.inspect() reserved = inspector.reserved() # Returns: {'celery@hostname': [{'id': 'task-id', 'name': 'myapp.tasks.add', ...}]} ``` -------------------------------- ### Create and Manage Billiard Processes with Queues Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Illustrates creating a separate process and using a queue for inter-process communication. ```python from billiard import Process, Queue # Create inter-process queue queue = Queue() # Create process process = Process(target=worker_func, args=(queue,)) process.start() # Wait for process process.join() ``` -------------------------------- ### Revoke Long-Running Tasks Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/control.md Identify and revoke tasks that have been running for longer than a specified duration (10 minutes in this example). ```python from celery import Celery app = Celery('myapp') inspector = app.control.inspect() # Find long-running tasks active = inspector.active() for worker_name, tasks in active.items(): for task in tasks: time_start = task['time_start'] elapsed = time.time() - time_start # Revoke if running longer than 10 minutes if elapsed > 600: app.control.revoke(task['id'], terminate=True) print(f"Revoked long-running task {task['id']}") ``` -------------------------------- ### Create and Manage a Vine Promise Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Demonstrates creating a promise object, setting its value, retrieving it, and attaching callbacks for success and error handling. ```python from vine import promise # Create promise my_promise = promise() # Set result my_promise.set(value) # Get result result = my_promise.get() # Handle callbacks def on_success(value): print(f"Success: {value}") def on_error(exc, traceback): print(f"Error: {exc}") my_promise.then(on_success, on_error) ``` -------------------------------- ### Get Next Scheduled Execution Time Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/beat.md The `next` method of a `ScheduleEntry` returns a new `ScheduleEntry` instance representing the next scheduled execution. ```python next_entry = entry.next() ``` -------------------------------- ### Import AMQP Protocol Modules Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Import the necessary classes for establishing AMQP connections and channels. ```python from amqp import Connection, Channel ``` -------------------------------- ### AsyncResult Class Definition Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/types.md Defines the `AsyncResult` class, a generic class representing the result of an asynchronous task, with methods like `get` and `wait`. ```python class AsyncResult(ResultBase, Generic[_R_co]): def get(self, timeout: float | None = None) -> _R_co: ... def wait(self, timeout: float | None = None) -> _R_co: ... ``` -------------------------------- ### Create and Manage a Billiard Pool Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Demonstrates creating a process pool, submitting tasks asynchronously, retrieving results, and properly closing the pool. ```python from billiard import Pool # Create pool with 4 worker processes pool = Pool(processes=4) # Apply function asynchronously result = pool.apply_async(func, args=(arg1, arg2)) # Wait for result value = result.get(timeout=30) # Close pool pool.close() pool.join() ``` -------------------------------- ### Define and Execute a Simple Celery Task Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Defines a basic addition task and demonstrates how to execute it asynchronously and retrieve its result. Ensure Celery is configured with a broker. ```python from celery import Celery app = Celery('myapp', broker='amqp://localhost') # Simple task @app.task def add(x, y): return x + y # Execute result = add.delay(2, 2) print(result.get()) ``` -------------------------------- ### Run Celery Beat from Command Line Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/beat.md Execute the Beat scheduler using the `celery beat` command. Options include specifying the app, using a custom scheduler like `DatabaseScheduler`, and setting the log level. ```bash # Run beat scheduler celery -A myapp beat # With custom schedule database celery -A myapp beat --scheduler django_celery_beat.schedulers:DatabaseScheduler # With specific log level celery -A myapp beat -l debug ``` -------------------------------- ### Billiard Process and Communication Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Illustrates creating and managing individual processes using Billiard, along with inter-process communication using Queues. ```APIDOC ## Billiard Process and Communication ### Description Provides mechanisms for creating and managing individual processes and facilitating communication between them using queues. ### Process and Communication ```python from billiard import Process, Queue # Create inter-process queue queue = Queue() # Create process process = Process(target=worker_func, args=(queue,)) process.start() # Wait for process process.join() ``` ``` -------------------------------- ### Get Communication Channel Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/kombu.md Obtains a communication channel from an active connection. This channel is used for subsequent messaging operations. The channel is automatically managed when used as a context manager. ```python with conn.channel() as channel: # Use channel for operations pass ``` -------------------------------- ### Run Linting and Typechecking with uv Source: https://github.com/sbdchd/celery-types/blob/main/README.md Executes formatting, linting, and typechecking tools for the project using uv. This ensures code quality and adherence to type standards. ```shell # run formatting, linting, and typechecking s/lint ``` ```shell uv run ruff check --fix uv run ruff format uv run basedpyright typings tests uv run mypy tests ``` -------------------------------- ### Celery Class Initialization Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/celery.md Initializes a new Celery application instance. This is the main entry point for configuring and executing tasks. ```APIDOC ## Celery Class ### Description Main application instance for configuring and executing tasks in Celery. ### Signature ```python class Celery(Generic[_T_Global]): def __init__( self, main: str | None = None, loader: Any | None = None, backend: str | type[Backend] | None = None, amqp: str | type[AMQP] | None = None, events: str | type[Events] | None = None, log: str | type[Logging] | None = None, control: str | type[Control] | None = None, set_as_current: bool = True, tasks: str | type[TaskRegistry] | None = None, broker: str | None = None, include: list[str] | tuple[str, ...] | None = None, changes: dict[str, Any] | None = None, config_source: str | object | None = None, fixups: list[str] | None = None, task_cls: str | type[_T_Global] | None = None, autofinalize: bool = True, namespace: str | None = None, strict_typing: bool = True, **kwargs: Any, ) -> None ``` ### Parameters | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | main | str \| None | No | None | Module name for this instance. Used to generate task names. | | loader | Any \| None | No | None | Loader to use for loading task modules. | | backend | str \| type[Backend] \| None | No | None | Backend class or string reference for result storage. | | amqp | str \| type[AMQP] \| None | No | None | AMQP implementation class or string reference. | | events | str \| type[Events] \| None | No | None | Events implementation class or string reference. | | log | str \| type[Logging] \| None | No | None | Logging implementation class or string reference. | | control | str \| type[Control] \| None | No | None | Control implementation class or string reference. | | set_as_current | bool | No | True | Set this instance as the current Celery application. | | tasks | str \| type[TaskRegistry] \| None | No | None | Task registry class or string reference. | | broker | str | No | None | Broker URL for message transport. | | include | list[str] \| tuple[str, ...] \| None | No | None | List of module names to import task definitions from. | | changes | dict[str, Any] \| None | No | None | Configuration changes to apply. | | config_source | str \| object \| None | No | None | Configuration source object or module path. | | fixups | list[str] \| None | No | None | List of fixup module names to apply. | | task_cls | str \| type[_T_Global] \| None | No | None | Default task class or string reference. | | autofinalize | bool | No | True | Automatically finalize the app after configuration. | | namespace | str | No | None | Configuration namespace for filtering settings. | | strict_typing | bool | No | True | Enable strict type checking for task parameters. | ``` -------------------------------- ### Run Celery Beat Programmatically Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/beat.md Start the Celery Beat scheduler programmatically within your Python application. This is useful for integrating Beat into existing applications or for custom startup routines. ```python from celery import Celery from celery.bin import beat app = Celery('myapp') if __name__ == '__main__': beat.run(app) ``` -------------------------------- ### Usage of Signature Type for Deferred Calls Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/types.md Shows how to use the `Signature` type hint for deferred task calls and how to apply them asynchronously. ```python # Type-annotated signatures sig: Signature[int] = add.s(2, 2) result: AsyncResult[int] = sig.apply_async() value: int = result.get() ``` -------------------------------- ### Check if Task is Ready Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/result.md The `ready` method returns `True` if the task has completed, whether successfully, failed, or revoked. ```python if result.ready(): print("Task completed") ``` -------------------------------- ### Create Task Signature Shorthands Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/task.md Use `s` for a mutable signature or `si` for an immutable signature as convenient shorthands for creating task signatures. ```python sig = add.s(2, 2) sig2 = add.si(2, 2) # Immutable signature ``` -------------------------------- ### Establish Connection to Broker Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/kombu.md Establishes a connection to the message broker using a Connection object. Ensure the Connection class is imported. ```python conn = Connection('amqp://guest:guest@localhost:5672//') conn.connect() ``` -------------------------------- ### Sync Dependencies with uv Source: https://github.com/sbdchd/celery-types/blob/main/README.md Synchronizes project dependencies using the uv package manager. This command ensures your local environment matches the project's requirements. ```shell uv sync ``` -------------------------------- ### Usage of ResultSet for Group Results Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/types.md Illustrates how to use `ResultSet` to apply and retrieve results from a group of tasks. ```python from celery import group job = group(add.s(2, 2), add.s(4, 4), add.s(8, 8)) result = job.apply_async() # Returns GroupResult values: list[int] = result.get() # [4, 8, 16] ``` -------------------------------- ### Billiard Pool apply_async with Callbacks Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Demonstrates using `apply_async` with success and error callback functions for asynchronous task execution. ```python pool = Pool(processes=4) result = pool.apply_async( func=sum, args=([1, 2, 3, 4, 5],), callback=on_success, error_callback=on_error, ) value = result.get(timeout=10) ``` -------------------------------- ### Import Billiard Process Pool Modules Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Import the main components for process pool management from the billiard library. ```python from billiard import Pool, Process, Queue, Pipe ``` -------------------------------- ### Configure In-Memory Cache Result Backend Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Use this configuration for an in-memory cache result backend. ```python CELERY_RESULT_BACKEND = 'cache+db://default' ``` -------------------------------- ### Run All Pre-commit Checks Manually Source: https://github.com/sbdchd/celery-types/blob/main/README.md Manually runs all pre-commit checks on all files in the project using uv. This is useful for verifying code quality outside of the commit process. ```shell # run all checks manually uv run prek run --all-files ``` -------------------------------- ### Vine Promise Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Shows how to use the `vine.promise` object for managing deferred operations and handling their results or errors asynchronously. ```APIDOC ## Vine Promise ### Description Represents a deferred computation or operation, allowing for asynchronous handling of results and errors. ### Class: promise ```python from vine import promise # Create promise my_promise = promise() # Set result my_promise.set(value) # Get result result = my_promise.get() # Handle callbacks def on_success(value): print(f"Success: {value}") def on_error(exc, traceback): print(f"Error: {exc}") my_promise.then(on_success, on_error) ``` ### Methods | Method | Description | |--------|-------------| | `set()` | Set promise result | | `get()` | Get promise result (blocks) | | `then()` | Add callback for when promise resolves | | `on_error()` | Add error callback | | `cancel()` | Cancel the promise | ``` -------------------------------- ### Loading Configuration from Object Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/celery.md Load Celery application settings from a Python object or module. This is useful for organizing configuration separately. ```python app.config_from_object('myapp.settings') ``` -------------------------------- ### config_from_object Method Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/celery.md Loads Celery configuration settings from a Python object. ```APIDOC ## config_from_object Method ### Description Load configuration from a Python object. ### Usage ```python app.config_from_object('myapp.settings') ``` ``` -------------------------------- ### Xstarmap for Parallel Unpacking Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/canvas.md Similar to xmap, but unpacks arguments from iterables of tuples or lists, allowing for parallel execution of tasks with multiple arguments per call. ```python pairs = [(2, 2), (4, 4), (8, 8)] job = add.starmap(pairs) # Equivalent to: group(add.s(2, 2), add.s(4, 4), add.s(8, 8)) ``` -------------------------------- ### Handle Nonexistent Queue Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/errors.md The `QueueNotFound` exception is raised when attempting to route a task to a queue that does not exist. ```python from celery.exceptions import QueueNotFound # Assuming 'add' is a registered task # This will raise QueueNotFound if 'nonexistent_queue' does not exist # result = add.apply_async(queue='nonexistent_queue') ``` -------------------------------- ### Chain Tasks Sequentially Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/canvas.md Execute tasks in a sequence, where the output of one task becomes the input for the next. This is useful for multi-step processes. ```python from celery import chain # Sequential execution workflow = chain(add.s(2, 2), add.s(4), add.s(8)) result = workflow.apply_async() # Equivalent to: add(2, 2) -> add(result, 4) -> add(result, 8) ``` -------------------------------- ### Execute Tasks in Parallel using Group Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/README.md Shows how to run multiple tasks concurrently. The results are collected in a list after all tasks in the group have completed. ```python from celery import group job = group( add.s(2, 2), add.s(4, 4), add.s(8, 8) ) result = job.apply_async() results = result.get() # [4, 8, 16] ``` -------------------------------- ### Apply Async Task Execution Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/task.md Use `apply_async` for fine-grained control over task execution, including countdown, expiration, and retry policies. ```python result = add.apply_async( args=(2, 2), countdown=10, # Execute in 10 seconds expires=300, # Expires in 5 minutes retry=True, ) ``` -------------------------------- ### Billiard Pool Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/supporting-modules.md Demonstrates the creation and usage of a Billiard Pool for parallel task execution, including asynchronous application of functions and pool management. ```APIDOC ## Billiard Pool ### Description Manages a pool of worker processes for executing tasks in parallel. Allows for asynchronous task submission and retrieval of results. ### Class: Pool ```python from billiard import Pool # Create pool with 4 worker processes pool = Pool(processes=4) # Apply function asynchronously result = pool.apply_async(func, args=(arg1, arg2)) # Wait for result value = result.get(timeout=30) # Close pool pool.close() pool.join() ``` ### Key Methods #### apply_async Execute function asynchronously in pool. ```python pool = Pool(processes=4) result = pool.apply_async( func=sum, args=([1, 2, 3, 4, 5],), callback=on_success, error_callback=on_error, ) value = result.get(timeout=10) ``` #### map Apply function to iterable in parallel. ```python values = [1, 2, 3, 4, 5] results = pool.map(lambda x: x * 2, values) ``` #### map_async Apply function asynchronously. ```python result = pool.map_async(lambda x: x * 2, values) values = result.get() ``` ``` -------------------------------- ### Apply Async Signature Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/canvas.md Execute a task signature asynchronously, with the option to update parameters like countdown. ```python sig = add.s(2, 2) result = sig.apply_async(countdown=10) ``` -------------------------------- ### Managing Broker Connections Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/celery.md Establish and manage connections to the message broker using a context manager. This ensures connections are properly opened and closed. ```python with app.connection() as conn: # Use connection pass ``` -------------------------------- ### connection Method Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/celery.md Creates a connection to the Celery message broker. ```APIDOC ## connection Method ### Description Create a connection to the broker. ### Usage ```python with app.connection() as conn: # Use connection pass ``` ``` -------------------------------- ### Inspect Worker Configuration Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/control.md Query workers to retrieve their current configuration settings. ```python inspector = app.control.inspect() config = inspector.conf() # Returns worker configuration settings ``` -------------------------------- ### Usage of Bound Task Context Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/types.md Demonstrates how to use the `Context` type hint for `self` in a bound task to access request details like ID and retries. ```python @app.task(bind=True) def bound_task(self: Context, x, y): print(self.request.id) # Task ID print(self.request.retries) # Retry count print(self.request.hostname) # Worker name ``` -------------------------------- ### Kombu AMQP Connection Source: https://github.com/sbdchd/celery-types/blob/main/_autodocs/api-reference/kombu.md Establishes a connection to a RabbitMQ broker using AMQP. Can be configured with a DSN string or individual connection parameters. ```python from kombu import Connection conn = Connection('amqp://guest:guest@localhost:5672//') # or conn = Connection( 'amqp', hostname='localhost', userid='guest', password='guest', virtual_host='/', ) ```