### Install arq Source: https://github.com/python-arq/arq/blob/main/docs/index.md Install the package using pip. ```default pip install arq ``` -------------------------------- ### Enqueue and Handle Job Results Source: https://context7.com/python-arq/arq/llms.txt Demonstrates how to enqueue a job, check its status, retrieve its info, and get the result with timeouts. Requires a running ARQ worker. ```python import asyncio from arq import create_pool from arq.connections import RedisSettings from arq.jobs import Job, JobStatus async def compute(ctx, x: int, y: int): return x + y async def main(): redis = await create_pool(RedisSettings()) # Enqueue and wait for result job = await redis.enqueue_job('compute', 10, 20) # Get job status status = await job.status() print(f'Status: {status}') # JobStatus.queued, JobStatus.in_progress, etc. # Get job info (doesn't wait for completion) info = await job.info() if info: print(f'Function: {info.function}, Args: {info.args}') # Wait for result with timeout (requires worker running) try: result = await job.result(timeout=30, poll_delay=0.5) print(f'Result: {result}') # Result: 30 except asyncio.TimeoutError: print('Job did not complete in time') except Exception as e: print(f'Job raised exception: {e}') # Get result info without re-raising exceptions result_info = await job.result_info() if result_info: print(f'Success: {result_info.success}, Result: {result_info.result}') # Retrieve existing job by ID existing_job = Job(job_id='my-custom-id', redis=redis) status = await existing_job.status() class WorkerSettings: functions = [compute] keep_result = 3600 # Keep results for 1 hour if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Enqueue and Manage Jobs Source: https://github.com/python-arq/arq/blob/main/docs/index.md Demonstrates how to enqueue a task, inspect its status, and retrieve results using the arq Redis pool. ```python # requires `pip install devtools`, used for pretty printing of job info from devtools import debug async def the_task(ctx): print('running the task') return 42 async def main(): redis = await create_pool(RedisSettings()) job = await redis.enqueue_job('the_task') # get the job's id print(job.job_id) """ > 68362958a244465b9be909db4b7b5ab4 (or whatever) """ # get information about the job, will include results if the job has finished, but # doesn't await the job's result debug(await job.info()) """ > docs/examples/job_results.py:23 main JobDef( function='the_task', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2019, 4, 23, 13, 58, 56, 781000), score=1556027936781 ) (JobDef) """ # get the Job's status print(await job.status()) """ > JobStatus.queued """ # poll redis for the job result, if the job raised an exception, # it will be raised here # (You'll need the worker running at the same time to get a result here) print(await job.result(timeout=5)) """ > 42 """ class WorkerSettings: functions = [the_task] if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Define WorkerSettings and Execution Source: https://github.com/python-arq/arq/blob/main/docs/index.md Basic structure for a WorkerSettings class and the entry point for running the worker. ```python class WorkerSettings: functions = [download_content] on_startup = startup on_shutdown = shutdown redis_settings = REDIS_SETTINGS if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Abort Jobs Source: https://github.com/python-arq/arq/blob/main/docs/index.md Demonstrates how to use the abort method on a job instance. Requires allow_abort_jobs to be set to True in WorkerSettings. ```python import asyncio from arq import create_pool from arq.connections import RedisSettings async def do_stuff(ctx): print('doing stuff...') await asyncio.sleep(10) return 'stuff done' async def main(): redis = await create_pool(RedisSettings()) job = await redis.enqueue_job('do_stuff') await asyncio.sleep(1) await job.abort() class WorkerSettings: functions = [do_stuff] allow_abort_jobs = True if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Create Redis Connection Pool with arq Source: https://context7.com/python-arq/arq/llms.txt Establishes a connection to Redis using `create_pool`. Supports default, custom settings, and DSN strings. Use `RedisSettings` for configuration. ```python import asyncio from arq import create_pool from arq.connections import RedisSettings async def main(): # Basic connection with default settings (localhost:6379) redis = await create_pool(RedisSettings()) # Connection with custom settings redis = await create_pool( RedisSettings( host='redis.example.com', port=6379, password='secret', database=1, ssl=True, conn_timeout=5, conn_retries=3, ) ) # Connection from DSN string redis = await create_pool(RedisSettings.from_dsn('redis://user:pass@localhost:6379/1')) # Enqueue a job job = await redis.enqueue_job('my_task', 'arg1', key='value') print(f'Enqueued job: {job.job_id}') # Output: Enqueued job: 68362958a244465b9be909db4b7b5ab4 if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Custom Job Serialization with MsgPack Source: https://github.com/python-arq/arq/blob/main/docs/index.md Implement custom job serialization using MsgPack for potentially improved memory efficiency over pickle. Ensure the same serializer/deserializer is used for both connection pool creation and worker settings. ```default import asyncio import msgpack # installable with "pip install msgpack" from arq import create_pool from arq.connections import RedisSettings async def the_task(ctx): return 42 async def main(): redis = await create_pool( RedisSettings(), job_serializer=msgpack.packb, job_deserializer=lambda b: msgpack.unpackb(b, raw=False), ) await redis.enqueue_job('the_task') class WorkerSettings: functions = [the_task] job_serializer = msgpack.packb # refer to MsgPack's documentation as to why raw=False is required job_deserializer = lambda b: msgpack.unpackb(b, raw=False) if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Implement Worker Lifecycle Hooks Source: https://context7.com/python-arq/arq/llms.txt Define startup, shutdown, and job-specific hooks within a WorkerSettings class to manage shared resources and track job execution. ```python import asyncio from httpx import AsyncClient from arq import create_pool from arq.connections import RedisSettings async def on_startup(ctx): """Called when worker starts - initialize shared resources.""" print('Worker starting...') ctx['http_client'] = AsyncClient() ctx['db_pool'] = await create_database_pool() async def on_shutdown(ctx): """Called when worker shuts down - cleanup resources.""" print('Worker shutting down...') await ctx['http_client'].aclose() await ctx['db_pool'].close() async def on_job_start(ctx): """Called before each job starts.""" print(f'Starting job {ctx["job_id"]} (attempt {ctx["job_try"]})') ctx['job_start_time'] = asyncio.get_event_loop().time() async def on_job_end(ctx): """Called after each job ends (before result is recorded).""" elapsed = asyncio.get_event_loop().time() - ctx['job_start_time'] print(f'Job {ctx["job_id"]} completed in {elapsed:.2f}s') async def after_job_end(ctx): """Called after job ends and result is recorded.""" print(f'Job {ctx["job_id"]} result saved') async def my_task(ctx, data: str): client = ctx['http_client'] # Use shared resources return f'Processed: {data}' class WorkerSettings: functions = [my_task] on_startup = on_startup on_shutdown = on_shutdown on_job_start = on_job_start on_job_end = on_job_end after_job_end = after_job_end redis_settings = RedisSettings() # Placeholder for example async def create_database_pool(): return None ``` -------------------------------- ### Access Job Results Source: https://github.com/python-arq/arq/blob/main/docs/index.md Initialize a connection pool to interact with job results. ```python import asyncio from arq import create_pool from arq.connections import RedisSettings ``` -------------------------------- ### Define and Enqueue arq Jobs Source: https://github.com/python-arq/arq/blob/main/docs/index.md Configure Redis settings, define worker functions with context, and enqueue jobs using the arq pool. ```default import asyncio from httpx import AsyncClient from arq import create_pool from arq.connections import RedisSettings # Here you can configure the Redis connection. # The default is to connect to localhost:6379, no password. REDIS_SETTINGS = RedisSettings() async def download_content(ctx, url): session: AsyncClient = ctx['session'] response = await session.get(url) print(f'{url}: {response.text:.80}...') return len(response.text) async def startup(ctx): ctx['session'] = AsyncClient() async def shutdown(ctx): await ctx['session'].aclose() async def main(): redis = await create_pool(REDIS_SETTINGS) for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'): await redis.enqueue_job('download_content', url) # WorkerSettings defines the settings to use when creating the work, # It's used by the arq CLI. # redis_settings might be omitted here if using the default settings ``` -------------------------------- ### Schedule Cron Jobs with ARQ Source: https://context7.com/python-arq/arq/llms.txt Use the `cron` function to schedule jobs that run at specific times. Configure schedules for daily, hourly, weekday, or multiple specific times. Jobs can also be set to run at startup or with custom IDs. ```python from datetime import timezone from arq import cron from arq.connections import RedisSettings async def daily_cleanup(ctx): """Runs daily at 3:00 AM.""" print('Running daily cleanup...') # Perform cleanup tasks async def hourly_sync(ctx): """Runs every hour at minute 30.""" print('Running hourly sync...') # Sync data async def weekday_report(ctx): """Runs Monday-Friday at 9:00 AM.""" print('Generating weekday report...') # Generate report async def complex_schedule(ctx): """Runs at 9:12 AM, 12:12 PM, and 6:12 PM.""" print('Running scheduled task...') class WorkerSettings: cron_jobs = [ # Daily at 3:00 AM cron(daily_cleanup, hour=3, minute=0), # Every hour at minute 30 cron(hourly_sync, minute=30), # Monday-Friday (0=Monday, 4=Friday) at 9:00 AM cron(weekday_report, weekday={0, 1, 2, 3, 4}, hour=9, minute=0), # Multiple hours: 9:12 AM, 12:12 PM, 6:12 PM cron(complex_schedule, hour={9, 12, 18}, minute=12), # Run at startup and then on schedule cron(daily_cleanup, hour=3, minute=0, run_at_startup=True), # With custom job_id for cross-schedule uniqueness cron(hourly_sync, minute=0, job_id='hourly-sync-v1'), # First day of every month at midnight cron(daily_cleanup, day=1, hour=0, minute=0), # With timeout and result retention cron(daily_cleanup, hour=3, minute=0, timeout=3600, keep_result=86400), ] redis_settings = RedisSettings() # Timezone for cron evaluation (defaults to system timezone) # timezone = timezone.utc ``` -------------------------------- ### Enqueue Jobs with arq Source: https://context7.com/python-arq/arq/llms.txt Queues jobs for execution using `enqueue_job`. Supports basic, deferred, unique, and custom-queued jobs. Deferred jobs can be scheduled by seconds, datetime, or timedelta. Custom job IDs ensure uniqueness. ```python import asyncio from datetime import datetime, timedelta from arq import create_pool from arq.connections import RedisSettings async def main(): redis = await create_pool(RedisSettings()) # Basic job enqueue job = await redis.enqueue_job('process_data', {'user_id': 123}) # Deferred execution - run in 30 seconds job = await redis.enqueue_job('send_email', 'user@example.com', _defer_by=30) # Deferred execution - run at specific time job = await redis.enqueue_job('generate_report', _defer_until=datetime(2024, 12, 31, 23, 59)) # Deferred by timedelta job = await redis.enqueue_job('cleanup', _defer_by=timedelta(hours=1)) # Custom job ID for uniqueness (returns None if job already exists) job = await redis.enqueue_job('sync_user', user_id=42, _job_id='sync-user-42') duplicate = await redis.enqueue_job('sync_user', user_id=42, _job_id='sync-user-42') print(duplicate) # None - job already exists # Custom queue name job = await redis.enqueue_job('critical_task', _queue_name='high-priority') # Set expiration (job won't run after this duration) job = await redis.enqueue_job('time_sensitive', _expires=timedelta(minutes=5)) if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Job Retries with Retry Exception Source: https://context7.com/python-arq/arq/llms.txt Illustrates how to use the Retry exception to implement custom retry logic for jobs, including exponential backoff based on attempt number. Requires a running ARQ worker. ```python import asyncio from httpx import AsyncClient from arq import create_pool, Retry from arq.connections import RedisSettings async def fetch_with_retry(ctx, url: str): """Fetches URL with automatic retry on failure.""" session: AsyncClient = ctx['session'] job_try = ctx['job_try'] # Current attempt number (1-indexed) response = await session.get(url) if response.status_code == 429: # Rate limited # Retry with exponential backoff: 5s, 10s, 15s, 20s raise Retry(defer=job_try * 5) if response.status_code >= 500: # Server error # Retry after 30 seconds raise Retry(defer=30) if response.status_code != 200: raise ValueError(f'Unexpected status: {response.status_code}') return response.json() async def startup(ctx): ctx['session'] = AsyncClient() async def shutdown(ctx): await ctx['session'].aclose() async def main(): redis = await create_pool(RedisSettings()) job = await redis.enqueue_job('fetch_with_retry', 'https://api.example.com/data') try: result = await job.result(timeout=60) print(f'Data: {result}') except Exception as e: print(f'Failed after all retries: {e}') class WorkerSettings: functions = [fetch_with_retry] on_startup = startup on_shutdown = shutdown max_tries = 5 # Allow up to 5 attempts if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Retry Jobs with Back-off Source: https://github.com/python-arq/arq/blob/main/docs/index.md Shows how to raise an arq.worker.Retry exception within a task to defer execution, useful for handling transient failures. ```python import asyncio from httpx import AsyncClient from arq import create_pool, Retry from arq.connections import RedisSettings async def download_content(ctx, url): session: AsyncClient = ctx['session'] response = await session.get(url) if response.status_code != 200: # retry the job with increasing back-off # delays will be 5s, 10s, 15s, 20s # after max_tries (default 5) the job will permanently fail raise Retry(defer=ctx['job_try'] * 5) return len(response.text) async def startup(ctx): ctx['session'] = AsyncClient() async def shutdown(ctx): await ctx['session'].aclose() async def main(): redis = await create_pool(RedisSettings()) await redis.enqueue_job('download_content', 'https://httpbin.org/status/503') class WorkerSettings: functions = [download_content] on_startup = startup on_shutdown = shutdown if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Custom Serialization with MessagePack in ARQ Source: https://context7.com/python-arq/arq/llms.txt Implement custom job serialization using `job_serializer` and `job_deserializer` with libraries like MessagePack. Ensure the same serializer is used in both the client and worker for data consistency. ```python import asyncio import msgpack # pip install msgpack from arq import create_pool from arq.connections import RedisSettings async def process_data(ctx, data: dict): """Process data that was serialized with msgpack.""" return {'processed': True, 'input': data} async def main(): # Create pool with custom serializer redis = await create_pool( RedisSettings(), job_serializer=msgpack.packb, job_deserializer=lambda b: msgpack.unpackb(b, raw=False), ) job = await redis.enqueue_job('process_data', {'key': 'value', 'count': 42}) print(f'Enqueued: {job.job_id}') class WorkerSettings: functions = [process_data] # Must use same serializer in worker job_serializer = msgpack.packb job_deserializer = lambda b: msgpack.unpackb(b, raw=False) if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Schedule Deferred Jobs Source: https://github.com/python-arq/arq/blob/main/docs/index.md Use _defer_by or _defer_until in enqueue_job to schedule tasks for future execution. ```python import asyncio from datetime import datetime, timedelta from arq import create_pool from arq.connections import RedisSettings async def the_task(ctx): print('this is the tasks, delay since enqueueing:', datetime.now() - ctx['enqueue_time']) async def main(): redis = await create_pool(RedisSettings()) # deferred by 10 seconds await redis.enqueue_job('the_task', _defer_by=10) # deferred by 1 minute await redis.enqueue_job('the_task', _defer_by=timedelta(minutes=1)) # deferred until jan 28th 2032, you'll be waiting a long time for this... await redis.enqueue_job('the_task', _defer_until=datetime(2032, 1, 28)) class WorkerSettings: functions = [the_task] if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Run Arq Health Check via CLI Source: https://github.com/python-arq/arq/blob/main/docs/index.md Execute a health check for Arq using the CLI. The command exits with 0 if the health check key is found, and 1 otherwise. ```default arq --check demo.WorkerSettings ``` -------------------------------- ### Configure Worker Health Checks Source: https://context7.com/python-arq/arq/llms.txt Use health_check_interval in WorkerSettings to enable periodic health updates, and utilize check_health or async_check_health for monitoring. ```python import asyncio from arq import check_health, create_pool from arq.connections import RedisSettings from arq.worker import async_check_health async def my_task(ctx): return 'done' class WorkerSettings: functions = [my_task] redis_settings = RedisSettings() health_check_interval = 60 # Update health key every 60 seconds # health_check_key = 'custom:health:key' # Optional custom key # Synchronous health check (for CLI or scripts) def run_health_check(): """Returns 0 if healthy, 1 if not.""" exit_code = check_health(WorkerSettings) print(f'Health check exit code: {exit_code}') return exit_code # Async health check async def async_health_check(): exit_code = await async_check_health( RedisSettings(), health_check_key=None, # Uses default queue_name=None, # Uses default ) return exit_code # CLI usage: arq --check mymodule.WorkerSettings ``` -------------------------------- ### Schedule Cron Jobs with Arq Source: https://github.com/python-arq/arq/blob/main/docs/index.md Define cron jobs to run periodically at specific times using Arq. The `cron` function allows specifying hours, minutes, and seconds for job execution. ```default from arq import cron async def run_regularly(ctx): print('run foo job at 9.12am, 12.12pm and 6.12pm') class WorkerSettings: cron_jobs = [ cron(run_regularly, hour={9, 12, 18}, minute=12) ] ``` -------------------------------- ### CLI Commands for arq Source: https://github.com/python-arq/arq/blob/main/docs/index.md Common command-line interface commands for managing arq workers. ```default python demo.py ``` ```default arq demo.WorkerSettings ``` ```default arq demo.WorkerSettings --watch path/to/src ``` ```default arq --help ``` -------------------------------- ### Test worker with run_check Source: https://context7.com/python-arq/arq/llms.txt Executes a worker and validates job completion, raising an exception if any jobs fail. ```python async def run_tests(): worker = Worker(functions=[my_task], redis_settings=RedisSettings(), burst=True) try: jobs_completed = await worker.run_check() print(f'{jobs_completed} jobs completed successfully') except Exception as e: print(f'Jobs failed: {e}') finally: await worker.close() ``` -------------------------------- ### Define arq Worker Functions Source: https://context7.com/python-arq/arq/llms.txt Defines asynchronous worker functions that process jobs. Functions receive a context dictionary and job arguments. `WorkerSettings` configures functions, startup/shutdown hooks, and worker parameters. ```python import asyncio from httpx import AsyncClient from arq import create_pool, func from arq.connections import RedisSettings async def download_content(ctx, url: str): """Worker function that downloads content from a URL.""" session: AsyncClient = ctx['session'] response = await session.get(url) print(f'Job {ctx["job_id"]} (try {ctx["job_try"]}): Downloaded {len(response.text)} bytes') return {'url': url, 'size': len(response.text)} async def startup(ctx): """Called when worker starts.""" ctx['session'] = AsyncClient() async def shutdown(ctx): """Called when worker shuts down.""" await ctx['session'].aclose() # WorkerSettings class configures the worker class WorkerSettings: functions = [download_content] on_startup = startup on_shutdown = shutdown redis_settings = RedisSettings() max_jobs = 10 # Maximum concurrent jobs job_timeout = 300 # 5 minutes default timeout max_tries = 5 # Maximum retry attempts ``` -------------------------------- ### Query Queued Jobs and Results Source: https://context7.com/python-arq/arq/llms.txt Access the queue state and completed job results using the queued_jobs and all_job_results methods on the redis pool. ```python import asyncio from arq import create_pool from arq.connections import RedisSettings async def example_task(ctx, x: int): return x * 2 async def main(): redis = await create_pool(RedisSettings()) # Enqueue some jobs await redis.enqueue_job('example_task', 1) await redis.enqueue_job('example_task', 2) await redis.enqueue_job('example_task', 3) # Get all queued jobs queued = await redis.queued_jobs() for job_def in queued: print(f'Queued: {job_def.function}({job_def.args}) - ID: {job_def.job_id}') # Get jobs from specific queue queued = await redis.queued_jobs(queue_name='custom-queue') # Get all job results (completed jobs) results = await redis.all_job_results() for result in results: print(f'Result: {result.function} -> {result.result} (success: {result.success})') class WorkerSettings: functions = [example_task] if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Run arq Workers Source: https://context7.com/python-arq/arq/llms.txt Execute workers via the CLI for production or programmatically using run_worker for custom integration. ```python import asyncio from arq import run_worker, Worker from arq.connections import RedisSettings async def my_task(ctx, data: str): return f'Processed: {data}' class WorkerSettings: functions = [my_task] redis_settings = RedisSettings() queue_name = 'arq:queue' max_jobs = 10 job_timeout = 300 poll_delay = 0.5 burst = False # Set True to exit after queue is empty # CLI usage (recommended): # arq mymodule.WorkerSettings # arq mymodule.WorkerSettings --burst # Process queue and exit # arq mymodule.WorkerSettings --watch src/ # Auto-reload on changes # Programmatic usage: def run_programmatically(): """Run worker synchronously (blocks until shutdown).""" worker = run_worker(WorkerSettings) print(f'Worker completed: {worker.jobs_complete} jobs') ``` -------------------------------- ### Run Synchronous Jobs in Arq Source: https://github.com/python-arq/arq/blob/main/docs/index.md Handle blocking synchronous tasks within Arq by running them in a separate executor using `loop.run_in_executor`. This prevents blocking the main event loop. ```default import time import functools import asyncio from concurrent import futures def sync_task(t): return time.sleep(t) async def the_task(ctx, t): blocking = functools.partial(sync_task, t) loop = asyncio.get_running_loop() return await loop.run_in_executor(ctx['pool'], blocking) async def startup(ctx): ctx['pool'] = futures.ProcessPoolExecutor() class WorkerSettings: functions = [the_task] on_startup = startup ``` -------------------------------- ### Run worker asynchronously Source: https://context7.com/python-arq/arq/llms.txt Executes a worker in burst mode and prints the count of completed jobs. ```python async def run_async(): """Run worker asynchronously.""" worker = Worker( functions=[my_task], redis_settings=RedisSettings(), burst=True, ) await worker.async_run() print(f'Completed: {worker.jobs_complete}') await worker.close() ``` -------------------------------- ### Abort Running Jobs in ARQ Source: https://context7.com/python-arq/arq/llms.txt Enable `allow_abort_jobs=True` in `WorkerSettings` to abort jobs. Use the `job.abort()` method to stop a job, with optional timeout and poll delay for the abort operation. ```python import asyncio from arq import create_pool from arq.connections import RedisSettings async def long_running_task(ctx): """A task that can be interrupted.""" print('Starting long task...') try: for i in range(100): await asyncio.sleep(1) print(f'Progress: {i+1}%') except asyncio.CancelledError: print('Task was aborted!') raise return 'completed' async def main(): redis = await create_pool(RedisSettings()) # Enqueue long-running job job = await redis.enqueue_job('long_running_task') print(f'Job started: {job.job_id}') # Wait a bit then abort await asyncio.sleep(5) # Abort the job (returns True if successfully aborted) aborted = await job.abort(timeout=10, poll_delay=0.5) print(f'Aborted: {aborted}') class WorkerSettings: functions = [long_running_task] allow_abort_jobs = True # Required for abort functionality job_timeout = 120 if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Enforce Job Uniqueness Source: https://github.com/python-arq/arq/blob/main/docs/index.md Use custom _job_id to prevent duplicate jobs from being enqueued while a task is still running. ```python import asyncio from arq import create_pool from arq.connections import RedisSettings from arq.jobs import Job async def the_task(ctx): print('running the task with id', ctx['job_id']) async def main(): redis = await create_pool(RedisSettings()) # no id, random id will be generated job1 = await redis.enqueue_job('the_task') print(job1) """ > """ # random id again, again the job will be enqueued and a job will be returned job2 = await redis.enqueue_job('the_task') print(job2) """ > """ # custom job id, job will be enqueued job3 = await redis.enqueue_job('the_task', _job_id='foobar') print(job3) """ > """ # same custom job id, job will not be enqueued and enqueue_job will return None job4 = await redis.enqueue_job('the_task', _job_id='foobar') print(job4) """ > None """ # you can retrieve jobs by using arq.jobs.Job await redis.enqueue_job('the_task', _job_id='my_job') job5 = Job(job_id='my_job', redis=redis) print(job5) """ """ class WorkerSettings: functions = [the_task] if __name__ == '__main__': asyncio.run(main()) ``` -------------------------------- ### Arq Health Check Value Format Source: https://github.com/python-arq/arq/blob/main/docs/index.md The format of the health check value recorded by Arq in Redis. This indicates the current state of jobs processed by the worker. ```default Mar-01 17:41:22 j_complete=0 j_failed=0 j_retried=0 j_ongoing=0 queued=0 ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.