### Install and Run Chancy Worker (Python) Source: https://github.com/tktech/chancy/blob/main/docs/index.rst This snippet demonstrates how to install Chancy, define a simple job, initialize the Chancy client with a PostgreSQL connection, perform database migrations, declare a queue, push a job, and start a worker. It uses Python's asyncio for asynchronous operations. ```bash pip install chancy ``` ```python import asyncio from chancy import Chancy, Worker, Queue, job @job() def hello_world(*, name: str): print(f"Hello, {name}!") chancy = Chancy("postgresql://:@/") async def main(): async with chancy: # Run the database migrations await chancy.migrate() # Declare the default queue await chancy.declare(Queue("default")) # Push a job await chancy.push(hello_world.job.with_kwargs(name="World")) # Start the worker (ctrl+c to exit) async with Worker(chancy) as worker: await worker.wait_for_shutdown() if __name__ == "__main__": asyncio.run(main()) ``` ```bash python worker.py ``` -------------------------------- ### Install and Run Chancy Worker with CLI (Bash) Source: https://github.com/tktech/chancy/blob/main/docs/index.rst This snippet outlines the steps to install Chancy with CLI support, define a job in a Python file, and then use the Chancy CLI to perform migrations, declare queues, push jobs, and start a worker. This approach separates the application definition from its execution. ```bash pip install chancy[cli] ``` ```python from chancy import Chancy, job @job() def hello_world(*, name: str): print(f"Hello, {name}!") chancy = Chancy("postgresql://localhost/postgres") ``` ```bash chancy --app worker.chancy misc migrate ``` ```bash chancy --app worker.chancy queue declare default ``` ```bash chancy --app worker.chancy queue push worker.hello_world --kwargs '{"name": "world"}' ``` ```bash chancy --app worker.chancy worker start ``` -------------------------------- ### Install Chancy for Django CLI Source: https://github.com/tktech/chancy/blob/main/docs/howto/django.rst Installs the Chancy library with support for CLI tools and Django integration. This is the initial step for using Chancy within a Django project. ```bash pip install chancy[cli,django] ``` -------------------------------- ### Migrate Database and Start Chancy Worker via CLI Source: https://github.com/tktech/chancy/blob/main/docs/howto/django.rst Commands to migrate the Chancy database schema and start a worker process using the Chancy CLI. These commands ensure Django is fully set up before job processing. ```bash chancy --app my_application.worker.chancy_app misc migrate chancy --app my_application.worker.chancy_app worker start ``` -------------------------------- ### Django Worker Setup with Chancy Source: https://github.com/tktech/chancy/blob/main/docs/howto/django.rst Sets up a 'worker.py' file for Chancy in a Django project. It configures the Django environment and initializes the Chancy application with the default database settings. ```python import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_application.settings") import django django.setup() from django.conf import settings from chancy import Chancy chancy_app = Chancy(settings.DATABASES["default"]) ``` -------------------------------- ### Run Chancy Worker with FastAPI Lifespan Events (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/fastapi.rst This snippet demonstrates how to integrate a Chancy worker into a FastAPI application using its lifespan events. It ensures the worker starts when FastAPI starts and shuts down gracefully. It includes database migration, queue declaration, and starting the worker in the background. Note: Database migration is not recommended for production. ```python import asyncio import contextlib from typing import AsyncIterator from fastapi import FastAPI from chancy import Chancy, Worker, Queue, job chancy = Chancy("postgresql://localhost/postgres") @contextlib.asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: """ FastAPI lifespan handler that starts and stops the Chancy worker. This ensures the worker starts when FastAPI starts and shuts down properly. """ # Run the database migrations (don't do this in production) await chancy.migrate() # Declare any queues we need. await chancy.declare(Queue("default")) # Start the worker in the background and return control to FastAPI async with Worker(chancy) as worker: yield app = FastAPI(lifespan=lifespan) @job(queue="default") async def send_an_email(): print("Sending an email") @app.get("/") async def read_root(): await chancy.push(send_an_email) return {"Hello": "World"} ``` -------------------------------- ### Start Python Workers with Chancy Source: https://context7.com/tktech/chancy/llms.txt This Python snippet demonstrates how to start Chancy workers to process jobs from queues. It shows worker initialization with custom IDs, heartbeats, shutdown timeouts, and tags, along with handling shutdown signals. ```python from chancy import Chancy, Worker, Queue import signal async def run_worker(): async with Chancy("postgresql://localhost/postgres") as chancy: # Create worker with custom tags worker = Worker( chancy, worker_id="worker-001", # Optional unique ID heartbeat_poll_interval=30, shutdown_timeout=30, tags={"datacenter=us-east", "has=gpu"} # Custom worker tags ) async with worker: print(f"Worker {worker.worker_id} started") print(f"Worker tags: {worker.worker_tags()}") # Wait for shutdown signal (SIGTERM, SIGINT) await worker.wait_for_shutdown() # Run worker asyncio.run(run_worker()) ``` -------------------------------- ### Synchronous Job Enqueueing with Chancy (Python) Source: https://context7.com/tktech/chancy/llms.txt Provides an example of how to enqueue and manage jobs from synchronous (non-async) Python code. It demonstrates declaring a queue, pushing a job, and waiting for its completion using the synchronous API of the Chancy client. ```python from chancy import Chancy, Job def my_task(*, data: dict): print(f"Processing: {data}") # Synchronous context for non-async applications with Chancy("postgresql://localhost/postgres") as chancy: # Declare queue synchronously chancy.sync_declare(Queue("default")) # Push job synchronously ref = chancy.sync_push( Job.from_func(my_task, kwargs={"data": {"key": "value"}}) ) # Wait for completion synchronously job = chancy.sync_get_job(ref) print(f"Job {job.id} is {job.state}") ``` -------------------------------- ### Initialize Chancy with a Custom Logger (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/log.rst This snippet shows how to pass a pre-configured logger to the Chancy constructor. This is useful when you want to integrate Chancy's logging with your application's existing logging setup. Ensure the logger is properly configured before passing it. ```python import logging from chancy import Chancy logger = logging.getLogger("my_application") chancy_app = Chancy( settings.my_database_dsn, log=logger, ) ``` -------------------------------- ### Define and Enqueue Jobs with Chancy (Python) Source: https://context7.com/tktech/chancy/llms.txt Illustrates how to define jobs using decorators for both synchronous and asynchronous functions. It shows how to enqueue single jobs with various options like priority, max attempts, scheduling, and unique keys, and how to push multiple jobs in batches. The example also includes waiting for a job to complete and checking its status. ```python from chancy import job, Job, Chancy from datetime import datetime, timedelta, timezone # Simple job with decorator @job() def send_email(*, to: str, subject: str, body: str): print(f"Sending email to {to}: {subject}") # Email sending logic here return {"status": "sent", "timestamp": datetime.now().isoformat()} # Async job @job(queue="high_priority", priority=10) async def process_payment(*, user_id: int, amount: float): print(f"Processing ${amount} payment for user {user_id}") await asyncio.sleep(1) # Simulate processing return {"transaction_id": "txn_12345", "status": "completed"} async def enqueue_jobs(): async with Chancy("postgresql://localhost/postgres") as chancy: # Push single job with custom parameters ref = await chancy.push( send_email.job .with_kwargs(to="user@example.com", subject="Test", body="Hello") .with_priority(5) .with_max_attempts(3) .with_scheduled_at(datetime.now(tz=timezone.utc) + timedelta(minutes=5)) .with_unique_key("email_user123") # Prevent duplicates ) # Wait for job to complete and check result completed_job = await chancy.wait_for_job(ref, timeout=60) print(f"Job state: {completed_job.state}") # succeeded or failed # Push multiple jobs in batches jobs = [ send_email.job.with_kwargs(to=f"user{i}@example.com", subject="Batch", body="Test") for i in range(1000) ] async for batch_refs in chancy.push_many(jobs, batch_size=500): print(f"Pushed {len(batch_refs)} jobs") ``` -------------------------------- ### Assign Worker Tags and Queues in Python Source: https://context7.com/tktech/chancy/llms.txt Explains how to route jobs to specific workers using tags in Chancy. It covers declaring queues with tag requirements (e.g., hardware capabilities) and starting workers with specific tags. ```python from chancy import Chancy, Worker, Queue async def setup_tagged_workers(): async with Chancy("postgresql://localhost/postgres") as chancy: # Declare queue that requires GPU await chancy.declare(Queue( name="ml_training", tags={"has=gpu", r"python=3\.11\.[0-9]+"}, # Regex patterns concurrency=2 )) # Declare queue for specific datacenter await chancy.declare(Queue( name="eu_processing", tags={"datacenter=eu-west"}, concurrency=4 )) # Start worker with GPU tag async with Worker( chancy, tags={"has=gpu", "datacenter=us-east"} ) as worker: # This worker will process ml_training queue # Default tags include: hostname, worker_id, python, arch, os, * await worker.wait_for_shutdown() ``` -------------------------------- ### Retrieve and Wait for Job Completion (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Explains how to use the `chancy.push()` method to get a job reference and then use `chancy.wait_for_job()` to retrieve the completed job object and assert its final state. ```python from chancy import Chancy # Assuming 'greet' is a defined job async with Chancy("postgresql://localhost/postgres") as chancy: reference = await chancy.push(greet) finished_job = await chancy.wait_for_job(reference) assert finished_job.state == finished_job.State.SUCCEEDED ``` -------------------------------- ### Transactional Python Job Enqueueing with Chancy Source: https://context7.com/tktech/chancy/llms.txt This Python example demonstrates how to atomically enqueue jobs with database operations using Chancy. It utilizes database transactions to ensure that job enqueuing is performed reliably alongside other database modifications, rolling back both if any operation fails. ```python from chancy import Chancy, Job from psycopg.rows import dict_row async def transactional_enqueue(): async with Chancy("postgresql://localhost/postgres") as chancy: # Use a transaction to ensure atomicity async with chancy.pool.connection() as conn: async with conn.cursor(row_factory=dict_row) as cursor: async with conn.transaction(): # Create user in database await cursor.execute( "INSERT INTO users (email, name) VALUES (%s, %s) RETURNING id", ("user@example.com", "John Doe") ) user = await cursor.fetchone() # Push job only if user creation succeeds await chancy.push_ex( cursor, send_welcome_email.job.with_kwargs(user_id=user["id"]) ) # If any operation fails, both are rolled back ``` -------------------------------- ### Get and Configure Chancy's Logger Globally (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/log.rst This snippet shows how to retrieve Chancy's logger using the standard Python logging module and configure it independently. This approach is useful if you need to manage Chancy's logging alongside other loggers in your application or set up specific handlers and formatters. ```python import logging logger = logging.getLogger("chancy") logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) ``` -------------------------------- ### Initialize Chancy Application Source: https://github.com/tktech/chancy/blob/main/docs/design.rst Demonstrates how to create and manage a Chancy application instance, which handles database connections and exposes core functionalities like job queuing and migrations. The application instance itself does not run tasks. ```python from chancy import Chancy async with Chancy("postgresql://localhost/postgres") as chancy: pass ``` -------------------------------- ### Initialize Chancy Application and Database Migrations (Python) Source: https://context7.com/tktech/chancy/llms.txt Demonstrates how to initialize the Chancy application with PostgreSQL connection details, configure connection pooling and notifications, and run database migrations. It also shows how to declare a queue with specific concurrency, tags, executor, and rate limiting settings. ```python import asyncio from chancy import Chancy, Worker, Queue, job # Initialize Chancy with database connection chancy = Chancy( "postgresql://user:pass@localhost/mydb", prefix="chancy_", # Table prefix for multi-tenant setups min_connection_pool_size=1, max_connection_pool_size=10, notifications=True # Enable LISTEN/NOTIFY for real-time updates ) async def setup(): async with chancy: # Run migrations to create/update database schema await chancy.migrate() # Declare queues with specific configurations await chancy.declare(Queue( name="default", concurrency=4, # Max concurrent jobs per worker tags={r".*"}, # Worker tags (regex patterns) executor="chancy.executors.process.ProcessExecutor", polling_interval=5, rate_limit=100, # Max 100 jobs rate_limit_window=60 # per 60 seconds )) asyncio.run(setup()) ``` -------------------------------- ### Run Chancy API Dashboard in Python Source: https://context7.com/tktech/chancy/llms.txt Provides instructions on running the built-in web dashboard for Chancy, including setting up API server with host, port, and authentication. It also mentions the equivalent CLI command. ```python from chancy import Chancy from chancy.plugins.api import Api # Run API server with authentication async def run_api(): chancy = Chancy("postgresql://localhost/postgres") api = Api( chancy, host="0.0.0.0", port=8080, auth=lambda username, password: username == "admin" and password == "secret" ) await api.run() # Or use CLI: # chancy --app worker.chancy api --host 0.0.0.0 --port 8080 # Dashboard provides: # - Real-time job monitoring # - Queue management # - Worker status # - Workflow visualization # - Metrics and statistics ``` -------------------------------- ### Python: Collect and Query Job Metrics with Chancy Source: https://context7.com/tktech/chancy/llms.txt This snippet demonstrates how to initialize Chancy with the Metrics plugin to automatically collect job performance data. It then shows how to query job completion rates for the last day using raw SQL. ```python from chancy import Chancy from chancy.plugins.metrics import Metrics async def get_metrics(): async with Chancy( "postgresql://localhost/postgres", plugins=[Metrics(retention_days=7)] ) as chancy: # Metrics are automatically collected # Access via API dashboard or database queries # Query job completion rates async with chancy.pool.connection() as conn: async with conn.cursor() as cursor: await cursor.execute( """ SELECT queue, COUNT(*) as total, SUM(CASE WHEN state = 'succeeded' THEN 1 ELSE 0 END) as succeeded FROM chancy_jobs WHERE completed_at > NOW() - INTERVAL '1 day' GROUP BY queue """ ) for row in await cursor.fetchall(): print(f"Queue: {row[0]}, Total: {row[1]}, Success Rate: {row[2]/row[1]*100:.1f}%") ``` -------------------------------- ### Queue Multiple Jobs (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Shows how to efficiently queue multiple jobs at once by passing a list of job objects to the `chancy.push_many()` method. ```python from chancy import Chancy # Assuming job1, job2, job3 are defined jobs async with Chancy("postgresql://localhost/postgres") as chancy: await chancy.push_many([job1, job2, job3]) ``` -------------------------------- ### Create Job with Default Parameters (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Shows how to create a job with predefined default parameters like queue, priority, maximum retry attempts, and keyword arguments using the `@job()` decorator. ```python from chancy import job @job(queue="default", priority=1, max_attempts=3, kwargs={"name": "World"}) def greet(*, name: str): print(f"Hello, {name}!") ``` -------------------------------- ### Python Sequential Workflow Creation with Chancy Source: https://context7.com/tktech/chancy/llms.txt Illustrates creating a sequential workflow where jobs execute one after another using Chancy's `Sequence` class. It shows both direct and incremental methods for adding jobs to the sequence and pushing it for execution. Requires the 'chancy' library and a PostgreSQL database. ```python from chancy import Chancy, job from chancy.plugins.workflow import Sequence @job() def step_one(): print("Step 1") @job() def step_two(): print("Step 2") @job() def step_three(): print("Step 3") async def run_sequence(): async with Chancy("postgresql://localhost/postgres") as chancy: # Create sequential workflow sequence = Sequence("my_sequence", [step_one, step_two, step_three]) # Or build incrementally sequence = ( Sequence("my_sequence") .add(step_one) .add(step_two) .add(step_three) ) # Push and execute workflow_id = await sequence.push(chancy) ``` -------------------------------- ### Queue a Single Job (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Demonstrates the process of adding a single job to the Chancy queue using the `chancy.push()` method within an asynchronous context. ```python from chancy import Chancy # Assuming 'greet' is a defined job async with Chancy("postgresql://localhost/postgres") as chancy: await chancy.push(greet) ``` -------------------------------- ### Python Workflow Creation and Execution with Chancy Source: https://context7.com/tktech/chancy/llms.txt Demonstrates how to define a directed acyclic graph (DAG) workflow with dependencies between jobs using Chancy. It covers adding steps, pushing the workflow, waiting for completion, and generating a DOT visualization. Requires the 'chancy' library and a PostgreSQL database. ```python from chancy import Chancy, job from chancy.plugins.workflow import Workflow, WorkflowPlugin @job() def fetch_data(*, source: str): print(f"Fetching from {source}") return {"data": [1, 2, 3]} @job() def transform_data(*, data: list): print(f"Transforming {data}") return {"transformed": [x * 2 for x in data]} @job() def save_to_db(*, data: list): print(f"Saving {data}") @job() def send_notification(*, message: str): print(f"Notification: {message}") async def run_workflow(): async with Chancy("postgresql://localhost/postgres") as chancy: # Create workflow with dependencies workflow = Workflow("data_pipeline") # Add steps with dependencies workflow.add("fetch", fetch_data.job.with_kwargs(source="api")) workflow.add("transform", transform_data, ["fetch"]) # Depends on fetch workflow.add("save", save_to_db, ["transform"]) workflow.add("notify", send_notification.job.with_kwargs(message="Done"), ["save"]) # Push workflow to database workflow_id = await WorkflowPlugin.push(chancy, workflow) # Wait for workflow completion completed_workflow = await WorkflowPlugin.wait_for_workflow( chancy, workflow_id, timeout=300 ) print(f"Workflow state: {completed_workflow.state}") for step_id, step in completed_workflow.steps.items(): print(f" {step_id}: {step.state}") # Fetch workflow status workflow = await WorkflowPlugin.fetch_workflow(chancy, workflow_id) # Generate visualization with open("workflow.dot", "w") as f: WorkflowPlugin.generate_dot(workflow, f) ``` -------------------------------- ### Create Basic Job with Decorator (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Demonstrates how to create a basic job using the `@job()` decorator from the chancy library. This function can be called normally or executed by a Chancy worker. ```python from chancy import job @job() def greet(): print(f"Hello world!") ``` -------------------------------- ### Import Job and Task in Python Source: https://github.com/tktech/chancy/blob/main/docs/faq.rst Demonstrates how to import Chancy's 'Job' class alongside Celery's 'Task' class, facilitating progressive migration from Celery to Chancy. ```python from chancy import Job from celery import Task ``` -------------------------------- ### Set Resource Limits for Jobs (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Explains how to define resource limits, such as memory and time, for job execution using the `Limit` class and passing them to the `@job()` decorator. Note that executor support varies. ```python from chancy import Limit, job @job(limits=[ Limit(Limit.Type.MEMORY, 1024 * 1024 * 1024), Limit(Limit.Type.TIME, 60), ]) def greet(*, name: str): print(f"Hello, {name}!") ``` -------------------------------- ### Run Chancy Worker Process Source: https://github.com/tktech/chancy/blob/main/docs/design.rst Shows how to set up and run a Chancy worker process. The worker is responsible for fetching jobs from the queue, executing them, and updating their status in the database. It utilizes PostgreSQL's `SELECT...FOR UPDATE SKIP LOCKED` for exclusive job processing and `LISTEN/NOTIFY` for near real-time job notifications. ```python from chancy import Chancy, Worker async with Chancy("postgresql://localhost/postgres") as chancy: async with Worker(chancy) as worker: await worker.wait_for_shutdown() ``` -------------------------------- ### Python: Django Integration with Chancy for Task Queues Source: https://context7.com/tktech/chancy/llms.txt This snippet shows how to integrate Chancy into a Django project by configuring `settings.py`, defining a job in `models.py`, and pushing jobs from a Django view. It also illustrates querying Chancy jobs and queues using the Django ORM. ```python # settings.py INSTALLED_APPS = [ # ... "chancy.contrib.django", # Core Django integration "chancy.plugins.cron.django", # Cron plugin admin "chancy.plugins.workflow.django", # Workflow plugin admin ] # Configure Chancy to use Django database connection CHANCY_DSN = "default" # Use Django's default database # models.py from chancy.contrib.django import get_chancy from chancy import job @job() def send_welcome_email(*, user_id: int): from django.contrib.auth.models import User user = User.objects.get(id=user_id) # Send email logic # views.py from django.http import JsonResponse from chancy.contrib.django import get_chancy async def create_user(request): # Get Chancy instance configured with Django connection chancy = get_chancy() # Push job ref = await chancy.push( send_welcome_email.job.with_kwargs(user_id=request.user.id) ) return JsonResponse({"job_id": str(ref.identifier)}) # Query jobs via Django ORM from chancy.contrib.django.models import Job, Queue active_queues = Queue.objects.filter(state="active") failed_jobs = Job.objects.filter(state="failed", queue="default") ``` -------------------------------- ### Query Job Counts in Chancy using SQL Source: https://github.com/tktech/chancy/blob/main/docs/howto/celery.rst This SQL query allows for introspection of the Chancy job queue by counting the occurrences of each function. It leverages the database directly for monitoring and debugging purposes, offering a simple yet powerful way to understand the state of your background tasks. No external tools or services are required. ```sql SELECT func, COUNT(*) FROM chancy_jobs GROUP BY func; ``` -------------------------------- ### Schedule Job Execution (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Shows how to schedule a job to run at a specific time in the future using the `with_scheduled_at()` method, accepting a timezone-aware datetime object. ```python from datetime import datetime, timedelta, timezone # Assuming 'greet' is a defined job future_job = greet.job.with_scheduled_at( datetime.now(timezone.utc) + timedelta(hours=1) ) ``` -------------------------------- ### Python Cron Job Scheduling with Chancy Source: https://context7.com/tktech/chancy/llms.txt Shows how to schedule recurring jobs using cron syntax with Chancy's `Cron` plugin. This includes scheduling jobs to run at specific intervals, managing multiple schedules, retrieving schedule information, and unscheduling jobs. Requires the 'chancy' library with the 'Cron' plugin and a PostgreSQL database. ```python from chancy import Chancy, job from chancy.plugins.cron import Cron @job() def daily_report(): print("Generating daily report") @job() def cleanup_temp_files(): print("Cleaning up temporary files") async def setup_cron(): async with Chancy( "postgresql://localhost/postgres", plugins=[Cron(poll_interval=60)] ) as chancy: # Schedule job to run every day at midnight UTC await Cron.schedule( chancy, "0 0 * * *", # Cron syntax daily_report.job.with_unique_key("daily_report_job") ) # Schedule multiple jobs with same schedule await Cron.schedule( chancy, "*/15 * * * *", # Every 15 minutes cleanup_temp_files.job.with_unique_key("cleanup_job"), ) # Get scheduled jobs schedules = await Cron.get_schedules(chancy) for key, schedule in schedules.items(): print(f"{key}: {schedule['cron']} - Next run: {schedule['next_run']}") # Get specific schedule schedule = await Cron.get_schedules(chancy, unique_keys=["daily_report_job"]) # Unschedule jobs await Cron.unschedule(chancy, "daily_report_job") ``` -------------------------------- ### Modify Job Properties (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Illustrates how to create a new job instance with modified properties, such as keyword arguments, using the `with_` methods on an existing job object. This is useful for customizing jobs before queuing. ```python from chancy import job, Chancy @job(queue="default", priority=1, max_attempts=3, kwargs={"name": "World"}) def greet(*, name: str): print(f"Hello, {name}!") async with Chancy("postgresql://localhost/postgres") as chancy: await chancy.push(greet.job.with_kwargs(name="Alice")) ``` -------------------------------- ### Configure Custom Executors in Python Source: https://context7.com/tktech/chancy/llms.txt Demonstrates how to set up different executor types (process, thread, asyncio, sub-interpreter) for various workloads using the Chancy library. Each executor is configured with specific options like maximum workers and concurrency. ```python from chancy import Chancy, Queue async def setup_executors(): async with Chancy("postgresql://localhost/postgres") as chancy: # Process-based executor (default) - CPU intensive tasks await chancy.declare(Queue( name="cpu_intensive", executor="chancy.executors.process.ProcessExecutor", executor_options={"max_workers": 4}, concurrency=4 )) # Thread-based executor - I/O bound tasks await chancy.declare(Queue( name="io_bound", executor="chancy.executors.thread.ThreadedExecutor", executor_options={"max_workers": 10}, concurrency=10 )) # Asyncio executor - async tasks await chancy.declare(Queue( name="async_tasks", executor="chancy.executors.asyncex.AsyncExecutor", concurrency=100 # Can handle many concurrent async tasks )) # Sub-interpreter executor (experimental) - isolated Python interpreters await chancy.declare(Queue( name="isolated", executor="chancy.executors.sub.SubInterpreterExecutor", concurrency=2 )) ``` -------------------------------- ### Manage Python Queues with Chancy Source: https://context7.com/tktech/chancy/llms.txt This Python code illustrates queue management operations using Chancy. It covers creating queues with specific configurations like concurrency and rate limiting, updating existing queues, pausing and resuming queues, retrieving queue details, listing all queues, and deleting queues. ```python from chancy import Chancy, Queue from datetime import timedelta async def manage_queues(): async with Chancy("postgresql://localhost/postgres") as chancy: # Create queue with rate limiting await chancy.declare(Queue( name="api_calls", concurrency=10, executor="chancy.executors.asyncex.AsyncExecutor", rate_limit=100, # Max 100 jobs rate_limit_window=60, # per 60 seconds tags={"api-worker"} # Only workers with "api-worker" tag )) # Update existing queue configuration await chancy.declare( Queue("api_calls", concurrency=20), upsert=True # Update if exists ) # Pause queue for 1 hour await chancy.pause_queue("api_calls", resume_at=timedelta(hours=1)) # Resume queue immediately await chancy.resume_queue("api_calls") # Get queue details queue = await chancy.get_queue("api_calls") print(f"Queue: {queue.name}, State: {queue.state}, Concurrency: {queue.concurrency}") # List all queues all_queues = await chancy.get_all_queues() for q in all_queues: print(f"{q.name}: {q.state}") # Delete queue (and optionally purge jobs) await chancy.delete_queue("api_calls", purge_jobs=True) ``` -------------------------------- ### Set Job Priority (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Illustrates how to assign different priorities to jobs, enabling control over their execution order. Higher priority values result in earlier execution. ```python # Assuming 'greet' is a defined job higher_priority_job = greet.job.with_priority(10) lower_priority_job = greet.job.with_priority(-10) ``` -------------------------------- ### Monitor and Control Python Jobs with Chancy Source: https://context7.com/tktech/chancy/llms.txt This Python snippet shows how to monitor and control jobs within the Chancy framework. It covers pushing jobs, retrieving job statuses and details, waiting for job completion with custom states, waiting for multiple jobs, and cancelling jobs. ```python from chancy import Chancy, QueuedJob async def monitor_jobs(): async with Chancy("postgresql://localhost/postgres") as chancy: # Push a job and get reference ref = await chancy.push(my_long_task.job) # Get current job status job = await chancy.get_job(ref) if job: print(f"State: {job.state}") # pending, running, succeeded, failed print(f"Attempts: {job.attempts}/{job.max_attempts}") print(f"Created: {job.created_at}") print(f"Started: {job.started_at}") print(f"Completed: {job.completed_at}") print(f"Errors: {job.errors}") # List of error dictionaries # Wait for job with custom states completed = await chancy.wait_for_job( ref, interval=1, timeout=300, states={QueuedJob.State.SUCCEEDED, QueuedJob.State.FAILED} ) # Wait for multiple jobs refs = [await chancy.push(task.job) for _ in range(10)] completed_jobs = await chancy.wait_for_jobs(refs, timeout=60) # Cancel a running job await chancy.cancel_job(ref) ``` -------------------------------- ### Create Unique Jobs (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Demonstrates how to ensure a job with a specific key is not queued or run more than once concurrently using `with_unique_key()`. This prevents duplicate processing of the same task. ```python from chancy import Chancy, job @job() def greet(*, name: str): print(f"Hello, {name}!") async with Chancy("postgresql://localhost/postgres") as chancy: await chancy.push(greet.job.with_unique_key("greet_alice").with_kwargs(name="Alice")) ``` -------------------------------- ### Set Job Retry Attempts (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/jobs.rst Demonstrates how to configure the maximum number of times a job should be retried if it fails during execution using the `with_max_attempts()` method. ```python # Assuming 'greet' is a defined job greet.job.with_max_attempts(3) ``` -------------------------------- ### Configure Chancy API with Django Authentication Source: https://github.com/tktech/chancy/blob/main/docs/howto/django.rst Sets up Chancy with the DjangoAuthBackend to enable login using Django superuser credentials. It configures the API plugin with the authentication backend and Django's secret key. ```python import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_application.settings") import django django.setup() from django.conf import settings from chancy.contrib.django.auth import DjangoAuthBackend app = Chancy( dsn=settings.DATABASES["default"], plugins=[ Api( authentication_backend=DjangoAuthBackend(), secret_key=settings.SECRET_KEY, ), ], ) ``` -------------------------------- ### Set Job Resource Limits in Python Source: https://context7.com/tktech/chancy/llms.txt Illustrates how to define memory and time limits for jobs using the Chancy library. This involves specifying the limit type (MEMORY, TIME) and the corresponding value in bytes or seconds. ```python from chancy import job, Job, Limit @job() def memory_intensive_task(*, data: list): # Process large dataset result = process_large_data(data) return result async def setup_limits(): async with Chancy("postgresql://localhost/postgres") as chancy: # Set memory limit (in bytes) and timeout (in seconds) await chancy.push( memory_intensive_task.job .with_kwargs(data=[1, 2, 3]) .with_limits([ Limit(type_=Limit.Type.MEMORY, value=512 * 1024 * 1024), # 512MB Limit(type_=Limit.Type.TIME, value=300) # 5 minutes ]) ) ``` -------------------------------- ### Configure RetryPlugin with Advanced Settings in Python Source: https://github.com/tktech/chancy/blob/main/docs/howto/retry.rst Shows how to use Chancy's RetryPlugin with custom retry settings like backoff, backoff_factor, backoff_limit, and backoff_jitter for a job. ```python from chancy import job from chancy.plugins.retry import RetryPlugin from chancy import Chancy, Queue @job() def job_that_fails(): raise ValueError("This job should fail.") async with Chancy(..., plugins=[RetryPlugin()]) as chancy: await chancy.declare(Queue("default")) await chancy.push( job_that_fails.job.with_max_attempts(3).with_meta({ "retry_settings": { "backoff": 2, "backoff_factor": 3, "backoff_limit": 300, "backoff_jitter": [1, 5], } }) ) ``` -------------------------------- ### Interact with Chancy Jobs via Django ORM Source: https://github.com/tktech/chancy/blob/main/docs/howto/django.rst Demonstrates pushing a job to Chancy and then retrieving it using Django's asynchronous ORM. This assumes Chancy tables reside in the same database as the Django default. ```python from chancy.contrib.django.models import Job j = await chancy.push(test_job) orm_job = await Job.objects.aget(id=j.identifier) ``` -------------------------------- ### Implement Job Retries and Error Handling in Python Source: https://context7.com/tktech/chancy/llms.txt Shows how to configure automatic job retries with exponential backoff and capture detailed error information using Chancy. This includes defining a flaky job and checking its state and errors after completion. ```python from chancy import job, Job, Chancy from chancy.plugins.retry import Retry, ExponentialBackoff @job(max_attempts=5) def flaky_api_call(*, endpoint: str): # This job will retry up to 5 times if it fails response = requests.get(endpoint) response.raise_for_status() return response.json() async def setup_retry(): async with Chancy( "postgresql://localhost/postgres", plugins=[ Retry( # Exponential backoff: 1s, 2s, 4s, 8s, 16s backoff=ExponentialBackoff(base=1, multiplier=2, max_delay=60) ) ] ) as chancy: # Push job with retry configuration ref = await chancy.push( flaky_api_call.job .with_kwargs(endpoint="https://api.example.com/data") .with_max_attempts(5) ) # Check job errors after completion job = await chancy.wait_for_job(ref) if job.state == Job.State.FAILED: for error in job.errors: print(f"Attempt {error['attempt']}: {error['traceback']}") ``` -------------------------------- ### Enable Chancy Django Plugin Apps Source: https://github.com/tktech/chancy/blob/main/docs/howto/django.rst Includes Chancy's Cron and Workflow plugins' Django extensions in the INSTALLED_APPS setting. This allows management of cron jobs and workflows through the Django admin and ORM. ```python INSTALLED_APPS = [ ... "chancy.contrib.django", "chancy.plugins.cron.django", "chancy.plugins.workflow.django", ] ``` -------------------------------- ### Define a Job with Max Attempts in Python Source: https://github.com/tktech/chancy/blob/main/docs/howto/retry.rst Demonstrates how to define a Python job using Chancy's @job decorator and specify the maximum number of retry attempts. ```python from chancy import job @job(max_attempts=3) def my_job(): raise ValueError("This job should fail.") ``` -------------------------------- ### Modify Chancy's Logger After Initialization (Python) Source: https://github.com/tktech/chancy/blob/main/docs/howto/log.rst This snippet demonstrates how to access and modify the logger associated with a Chancy application after it has been instantiated. This allows you to change log levels or add handlers dynamically. The logger can be accessed via the `app.log` attribute. ```python import logging chancy_app = Chancy(settings.my_database_dsn) chancy_app.log.setLevel(logging.DEBUG) ``` -------------------------------- ### Enable Chancy Django Contribution Apps Source: https://github.com/tktech/chancy/blob/main/docs/howto/django.rst Adds Chancy's Django contribution apps to the INSTALLED_APPS setting in Django. This enables access to Chancy's Jobs, Queues, and Workers models within the Django ORM and Admin. ```python INSTALLED_APPS = [ ... "chancy.contrib.django", ] ``` -------------------------------- ### Access Job Context in Python Source: https://github.com/tktech/chancy/blob/main/docs/howto/context.rst This snippet demonstrates how to access the job context within a Chancy job function. By type-hinting an argument with `QueuedJob`, Chancy automatically injects the job's context, providing access to its ID and number of attempts. The argument name can be arbitrary. ```python from chancy import QueuedJob, job @job() def my_job(*, context: QueuedJob): print(f"Job ID: {context.id}") print(f"Job attempts: {context.attempts}") ``` -------------------------------- ### Modify Job Context Meta in Python Source: https://github.com/tktech/chancy/blob/main/docs/howto/context.rst This snippet illustrates how to modify the mutable 'meta' attribute of the job context in Chancy. While most of the job context is immutable, the 'meta' dictionary can be used to store and update arbitrary data associated with the job, such as tracking custom attempt counts. ```python from chancy import QueuedJob, job @job() def my_job(*, context: QueuedJob): # This will raise an exception because the job context is # generally immutable. context.id = "new_id" # This will work because the meta attribute is mutable. context.meta["attempts"] = context.meta.get("attempts", 0) + 1 ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.