### Install and Run RQ Dashboard

Source: https://python-rq.org/docs/monitoring

Install the rq-dashboard package and run the command to start the web-based monitor.

```bash
$ pip install rq-dashboard
$ rq-dashboard
```

--------------------------------

### Custom Worker Script for Performance

Source: https://python-rq.org/docs/workers

This example shows how to create a custom worker script to preload libraries before the worker loop starts, improving performance for jobs with lengthy setups or common module dependencies.

```APIDOC
## Custom Worker Script for Performance

### Description
To improve throughput for jobs with lengthy setups or shared module dependencies, preload necessary libraries before the worker loop begins. This avoids the overhead of importing modules after each fork.

### Usage
Create a Python script that imports modules before initializing and starting the RQ worker.

### Example
```python
#!/usr/bin/env python
from redis import Redis
from rq import Worker

# Preload libraries
import library_that_you_want_preloaded

# Provide the worker with the list of queues (str) to listen to.
w = Worker(['default'], connection=Redis())
w.work()
```
```

--------------------------------

### Starting the RQ Cron Scheduler

Source: https://python-rq.org/docs/cron

Initiate the RQ cron scheduler by running the `rq cron` command with your configuration file.

```bash
rq cron cron_config.py
```

--------------------------------

### RQ Queue Operations

Source: https://python-rq.org/docs

Provides examples for common queue operations including getting the number of jobs, retrieving job IDs and instances, fetching a specific job by ID, emptying a queue, and deleting a queue with or without its jobs.
```python
from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

# Getting the number of jobs in the queue
# Note: Only queued jobs are counted, not including deferred ones
print(len(q))

# Retrieving jobs
queued_job_ids = q.job_ids  # Gets a list of job IDs from the queue
queued_jobs = q.jobs  # Gets a list of enqueued job instances
job = q.fetch_job('my_id')  # Returns job having ID "my_id"

# Emptying a queue, this will delete all jobs in this queue
q.empty()

# Deleting a queue
q.delete(delete_jobs=True)  # Passing in `True` will remove all jobs in the queue
# queue is now unusable. It can be recreated by enqueueing jobs to it.
```

--------------------------------

### Accessing and Using StartedJobRegistry

Source: https://python-rq.org/docs/job_registries

Demonstrates how to get a StartedJobRegistry, retrieve its associated queue, count the number of jobs, and list job IDs. It also shows how to check if a job exists within the registry.

```python
import time
from redis import Redis
from rq import Queue
from rq.registry import StartedJobRegistry
from somewhere import count_words_at_url

redis = Redis()
queue = Queue(connection=redis)
job = queue.enqueue(count_words_at_url, 'http://nvie.com')

# get StartedJobRegistry by queue
registry = StartedJobRegistry(queue=queue)

# or get StartedJobRegistry by queue name and connection
registry2 = StartedJobRegistry(name='my_queue', connection=redis)

# sleep for a moment while job is taken off the queue
time.sleep(0.1)

print('Queue associated with the registry: %s' % registry.get_queue())
print('Number of jobs in registry %s' % registry.count)

# get the list of ids for the jobs in the registry
print('IDs in registry %s' % registry.get_job_ids())

# test if a job is in the registry using the job instance or job id
print('Job in registry %s' % (job in registry))
print('Job in registry %s' % (job.id in registry))
```

--------------------------------

### Run RQ Worker with Scheduler Enabled (Programmatic)

Source: https://python-rq.org/docs/scheduling

Programmatically start an RQ worker with the scheduler enabled. Ensure Redis is connected and a queue is defined.

```python
from rq import Worker, Queue
from redis import Redis

redis = Redis()
queue = Queue(connection=redis)

worker = Worker(queues=[queue], connection=redis)
worker.work(with_scheduler=True)
```

--------------------------------

### Starting Worker with Custom Serializer via CLI

Source: https://python-rq.org/docs/workers

Use the `rq worker` command with the `--serializer` option to specify a custom serializer path for the worker process.

```bash
$ rq worker --serializer rq.serializers.JSONSerializer
```

--------------------------------

### Run RQ Worker with Scheduler

Source: https://python-rq.org/docs/scheduling

Start an RQ worker that includes the scheduler component to process scheduled jobs.

```bash
$ rq worker --with-scheduler
```

--------------------------------

### RQ Cron Command Line Usage

Source: https://python-rq.org/docs/cron

How to use the `rq cron` command to schedule jobs from the command line, including available options and examples.

```APIDOC
## Command Line Usage

Use the `rq cron` command to schedule jobs. You can specify a file path or a dotted module path to your cron configuration.
### Basic Usage
```bash
$ rq cron my_cron_config.py
```

### Using Module Path
```bash
$ rq cron myapp.cron_config
```

### Options
* `--url`, `-u`: Redis connection URL (env: RQ_REDIS_URL)
* `--config`, `-c`: Python module with RQ settings (env: RQ_CONFIG)
* `--path`, `-P`: Additional Python import paths (can be specified multiple times)
* `--logging-level`, `-l`: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL; default: INFO)
* `--worker-class`, `-w`: Dotted path to RQ Worker class
* `--job-class`, `-j`: Dotted path to RQ Job class
* `--queue-class`: Dotted path to RQ Queue class
* `--connection-class`: Dotted path to Redis client class
* `--serializer`, `-S`: Dotted path to serializer class

### Positional Argument
* `config_path`: File path or module path to your cron configuration

### Example
```bash
$ rq cron myapp.cron_config --url redis://localhost:6379/1 --path src
```
```

--------------------------------

### Configure Custom Job and Queue Classes for RQ Worker

Source: https://python-rq.org/docs/workers

Use the `--job-class` and `--queue-class` arguments when starting an RQ worker to specify custom classes. Ensure these same classes are used when enqueueing jobs.

```bash
$ rq worker --job-class 'custom.JobClass' --queue-class 'custom.QueueClass'
```

--------------------------------

### Python Example for Custom Job and Queue Classes

Source: https://python-rq.org/docs/workers

Define custom `Job` and `Queue` classes in Python and use them to enqueue jobs. This ensures consistency between worker configuration and job creation.

```python
from redis import Redis
from rq import Queue
from rq.job import Job

class CustomJob(Job):
    pass

class CustomQueue(Queue):
    job_class = CustomJob

redis_conn = Redis()
queue = CustomQueue('default', connection=redis_conn)
queue.enqueue(some_func)  # some_func is a placeholder for your own job function
```

--------------------------------

### Start RQ Workers

Source: https://python-rq.org/docs/workers

Start RQ workers to process jobs from specified queues.
Workers run in an endless loop, waiting for new work when queues are empty. For concurrent job processing, start multiple workers.

```bash
$ rq worker high default low
*** Listening for work on high, default, low
Got send_newsletter('me@nvie.com') from default
Job ended normally without result
*** Listening for work on high, default, low
...
```

--------------------------------

### Redis Sentinel Configuration

Source: https://python-rq.org/docs/connections

Example configuration for Redis Sentinel, specifying instances, master name, database, authentication, and timeouts. This enables fault-tolerant connections for workers and RQ.

```yaml
SENTINEL: {
    'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],
    'MASTER_NAME': 'master',
    'DB': 2,
    'USERNAME': 'redis-user',
    'PASSWORD': 'redis-secret',
    'SOCKET_TIMEOUT': None,
    'CONNECTION_KWARGS': {  # Optional additional Redis connection arguments
        'ssl_ca_path': None,
    },
    'SENTINEL_KWARGS': {    # Optional Sentinel connection arguments
        'username': 'sentinel-user',
        'password': 'sentinel-secret',
    },
}
```

--------------------------------

### Better Worker Process Title

Source: https://python-rq.org/docs/workers

Improve the visibility of RQ worker processes in system monitoring tools (like `ps` and `top`) by installing the `setproctitle` package.

```APIDOC
## Better Worker Process Title

### Description
Install the `setproctitle` package to provide more descriptive titles for RQ worker processes, making them easier to identify and manage in system monitoring tools.

### Installation
```bash
pip install setproctitle
```
```

--------------------------------

### Accessing Job Registries from Queue Objects

Source: https://python-rq.org/docs/job_registries

Shows how to directly access various job registries (Started, Deferred, Finished, Failed, Scheduled) as attributes of a Queue object. This is a convenient way to get registry instances introduced in version 1.2.0.
```python
from redis import Redis
from rq import Queue

redis = Redis()
queue = Queue(connection=redis)

queue.started_job_registry  # Returns StartedJobRegistry
queue.deferred_job_registry  # Returns DeferredJobRegistry
queue.finished_job_registry  # Returns FinishedJobRegistry
queue.failed_job_registry  # Returns FailedJobRegistry
queue.scheduled_job_registry  # Returns ScheduledJobRegistry
```

--------------------------------

### Job Position in Queue

Source: https://python-rq.org/docs/jobs

This section describes how to get the position of a job within the work queue for user feedback or debugging. It provides examples using `job.get_position()` and `q.get_job_position()`, and notes that this operation can be slow on very large queues.

```APIDOC
## Job Position in Queue

For user feedback or debugging, you can get the position of a job within the work queue, which lets you track the job's progress through the queue. This function iterates over all jobs in the queue, so it performs poorly on very large queues.

```python
from rq import Queue
from redis import Redis
from hello import say_hello

redis_conn = Redis()
q = Queue(connection=redis_conn)

job = q.enqueue(say_hello)
job2 = q.enqueue(say_hello)

job2.get_position()  # returns 1

q.get_job_position(job)  # returns 0
```
```

--------------------------------

### Get Job Executions

Source: https://python-rq.org/docs/jobs

Retrieves all execution data for a given job, including start times and heartbeats.

```APIDOC
## Job Executions

### Description
Accesses the execution data for a job, providing details about each time the job has run.

### Method
`job.get_executions()`

### Parameters
- `job` (Job): The Job object for which to retrieve executions.
### Request Example
```python
from redis import Redis
from rq.job import Job

redis = Redis()
job = Job.fetch('my_job_id', connection=redis)

executions = job.get_executions()  # Returns all current executions
execution = job.get_executions()[0]  # Retrieves a single execution
print(execution.created_at)  # When did this execution start?
print(execution.last_heartbeat)  # Worker's last heartbeat
```

### Execution Object Properties
- `id`: ID of an execution.
- `job`: The `Job` object that owns this execution instance.
- `composite_key`: A combination of `job.id` and `execution.id`, formatted as `<job_id>:<execution_id>`.
- `created_at`: A datetime object representing the start of this execution.
- `last_heartbeat`: The worker's last heartbeat timestamp for this execution.
```

--------------------------------

### Starting RQ Worker with a Custom Name

Source: https://python-rq.org/docs/workers

Specify a custom name for an RQ worker when instantiating it. This overrides the default random naming and helps in identifying specific workers.

```python
from redis import Redis
from rq import Queue, Worker

redis = Redis()
queue = Queue('queue_name', connection=redis)

# Start a worker with a custom name
worker = Worker([queue], connection=redis, name='foo')
```

--------------------------------

### Configure Custom Exception Handler for RQ Worker

Source: https://python-rq.org/docs/workers

Specify a custom exception handler using the `--exception-handler` option when starting an RQ worker. Multiple handlers can be chained.

```bash
$ rq worker --exception-handler 'path.to.my.ErrorHandler'
```

```bash
$ rq worker --exception-handler 'path.to.my.ErrorHandler' --exception-handler 'another.ErrorHandler'
```

--------------------------------

### Set Custom Failure TTL for Jobs

Source: https://python-rq.org/docs/jobs

Configure the time-to-live (TTL) for failed jobs in seconds when enqueueing them. This example sets a 5-minute TTL.
```python
job = queue.enqueue(foo_job, failure_ttl=300)  # 5 minutes in seconds
```

--------------------------------

### Start RQ Workers in Burst Mode

Source: https://python-rq.org/docs/workers

Start RQ workers in burst mode to process all available jobs in the specified queues and then quit. This is useful for batch processing or temporary scaling during peak periods.

```bash
$ rq worker --burst high default low
*** Listening for work on high, default, low
Got send_newsletter('me@nvie.com') from default
Job ended normally without result
No more work, burst finished.
Registering death.
```

--------------------------------

### Storing arbitrary data on jobs

Source: https://python-rq.org/docs/jobs

This section describes how to store arbitrary pickleable data on a job using the `meta` property. It provides an example of updating the `meta` dictionary with custom information and saving it using `job.save_meta()`.

```APIDOC
## Storing arbitrary data on jobs

To add/update custom status information on this job, you have access to the `meta` property, which allows you to store arbitrary pickleable data on the job itself:

```python
import socket
import time

from rq import get_current_job

def add(x, y):
    job = get_current_job()
    job.meta['handled_by'] = socket.gethostname()
    job.save_meta()

    # do more work
    time.sleep(1)
    return x + y
```
```

--------------------------------

### Basic Cron Configuration File

Source: https://python-rq.org/docs/cron

A sample Python configuration file demonstrating how to register multiple scheduled jobs with different settings, including arguments, timeouts, and TTLs.
```python
# my_cron_config.py
from rq.cron import register
from myapp.tasks import cleanup_database, generate_reports, backup_files

# Simple job - runs every 60 seconds
register(cleanup_database, queue_name='maintenance', interval=60)

# Job with arguments
register(
    generate_reports,
    queue_name='reports',
    args=('daily',),
    kwargs={'format': 'pdf', 'email': True},
    interval=3600
)

# Job with custom timeout and TTL settings
register(
    backup_files,
    queue_name='backup',
    cron='0 2 * * *',  # Daily at 2 AM
    job_timeout=1800,  # 30 minutes max execution time
    result_ttl=3600,  # Keep results for 1 hour
    failure_ttl=86400  # Keep failed jobs for 1 day
)
```

--------------------------------

### Access RQ Job Execution Data

Source: https://python-rq.org/docs/jobs

Access `Execution` objects to get details about a job's execution, such as its start time and the worker's last heartbeat. `job.get_executions()` returns all current executions for a job.

```python
from redis import Redis
from rq.job import Job

redis = Redis()
job = Job.fetch('my_job_id', connection=redis)

executions = job.get_executions()  # Returns all current executions
execution = job.get_executions()[0]  # Retrieves a single execution
print(execution.created_at)  # When did this execution start?
print(execution.last_heartbeat)  # Worker's last heartbeat
```

--------------------------------

### Basic Redis Connection for RQ Queue

Source: https://python-rq.org/docs/connections

Instantiate a Redis client and pass it to the RQ Queue constructor for basic connection management.

```python
from redis import Redis
from rq import Queue

redis = Redis('localhost', 6789)
q = Queue(connection=redis)
```

--------------------------------

### Multiple Redis Connections for Different Queues

Source: https://python-rq.org/docs/connections

Demonstrates how to use distinct Redis connections for different RQ queues by passing separate connection objects to their constructors.
```python
from rq import Queue
from redis import Redis

conn1 = Redis('localhost', 6379)
conn2 = Redis('remote.host.org', 9836)

q1 = Queue('foo', connection=conn1)
q2 = Queue('bar', connection=conn2)
```

--------------------------------

### Enqueue Jobs with Various Argument Types in RQ

Source: https://python-rq.org/docs

Demonstrates how to enqueue jobs with different argument types, including strings, dictionaries, tuples, None, booleans, and file contents, using the `rq enqueue` command and its Python equivalent.

```shell
rq enqueue path.to.func abc
```

```python
queue.enqueue(path.to.func, 'abc')
```

```shell
rq enqueue path.to.func abc=def
```

```python
queue.enqueue(path.to.func, abc='def')
```

```shell
rq enqueue path.to.func ':{ "json": "abc" }'
```

```python
queue.enqueue(path.to.func, {'json': 'abc'})
```

```shell
rq enqueue path.to.func 'key:= { "json": "abc" }'
```

```python
queue.enqueue(path.to.func, key={'json': 'abc'})
```

```shell
rq enqueue path.to.func %1, 2
```

```python
queue.enqueue(path.to.func, (1, 2))
```

```shell
rq enqueue path.to.func %None
```

```python
queue.enqueue(path.to.func, None)
```

```shell
rq enqueue path.to.func %True
```

```python
queue.enqueue(path.to.func, True)
```

```shell
rq enqueue path.to.func 'key%= (1, 2)'
```

```python
queue.enqueue(path.to.func, key=(1, 2))
```

```shell
rq enqueue path.to.func 'key%= { "foo": True }'
```

```python
queue.enqueue(path.to.func, key={'foo': True})
```

```shell
rq enqueue path.to.func @path/to/file
```

```python
queue.enqueue(path.to.func, open('path/to/file', 'r').read())
```

```shell
rq enqueue path.to.func key=@path/to/file
```

```python
queue.enqueue(path.to.func, key=open('path/to/file', 'r').read())
```

```shell
rq enqueue path.to.func :@path/to/file.json
```

```python
queue.enqueue(path.to.func, json.loads(open('path/to/file.json', 'r').read()))
```

```shell
rq enqueue path.to.func key:=@path/to/file.json
```

```python
queue.enqueue(path.to.func,
              key=json.loads(open('path/to/file.json', 'r').read()))
```

--------------------------------

### Custom Worker Script for Preloading Modules

Source: https://python-rq.org/docs/workers

Use a custom worker script to import necessary modules before the worker loop starts. This improves performance for jobs with lengthy setups or shared module dependencies by avoiding repeated imports after forking.

```python
#!/usr/bin/env python
from redis import Redis
from rq import Worker

# Preload libraries
import library_that_you_want_preloaded

# Provide the worker with the list of queues (str) to listen to.
w = Worker(['default'], connection=Redis())
w.work()
```

--------------------------------

### Create Job

Source: https://python-rq.org/docs/jobs

Demonstrates how to create a new job with custom options like timeout, result TTL, and explicit arguments for the job function.

```APIDOC
## Job Creation

### Description
Creates a new job with specified function, arguments, and various configuration options.

### Method
`Job.create(*args, **kwargs)`

### Parameters
- `timeout` (int or str): Maximum runtime of the job in seconds. Can include units like 'h', 'm', 's'.
- `result_ttl` (int): Time in seconds to keep successful jobs and their results.
- `ttl` (int or None): Maximum queued time in seconds before the job is discarded.
- `failure_ttl` (int): Time in seconds to keep failed jobs.
- `depends_on` (Job or str): Job or job ID that must complete before this job is queued.
- `id` (str): Manually specify the job's ID.
- `description` (str): Additional description for the job.
- `connection`:
- `status`:
- `origin` (str): Queue name where the job was enqueued.
- `meta` (dict): Custom status information.
- `args` (tuple): Positional arguments for the job function.
- `kwargs` (dict): Keyword arguments for the job function.
### Request Example
```python
from rq.job import Job

# Example with basic arguments (positional arguments go in the `args` tuple)
job = Job.create(count_words_at_url, args=('http://nvie.com',), id='my_job_id')

# Example passing arguments and kwargs to the job function
job = Job.create(count_words_at_url,
                 ttl=30,  # This ttl will be used by RQ
                 args=('http://nvie.com',),
                 kwargs={
                     'description': 'Function description',  # This is passed on to count_words_at_url
                     'ttl': 15  # This is passed on to count_words_at_url function
                 })
```
```

--------------------------------

### Pushing and Popping Connections for Testing

Source: https://python-rq.org/docs/connections

Shows how to use `push_connection` and `pop_connection` to manage Redis connections programmatically, often useful for setting up unit tests without using a `with` statement.

```python
import unittest
from redis import Redis
from rq import Queue
from rq import push_connection, pop_connection

class MyTest(unittest.TestCase):
    def setUp(self):
        push_connection(Redis())

    def tearDown(self):
        pop_connection()

    def test_foo(self):
        """Any queues created here use local Redis."""
        q = Queue()
```

--------------------------------

### RQ Cron Programmatic API - Basic Usage

Source: https://python-rq.org/docs/cron

Demonstrates how to create and manage a cron scheduler programmatically using the RQ Python library.

```APIDOC
## Programmatic API - Basic Usage

This section shows how to use the `CronScheduler` class to register and start scheduled jobs.
### Creating a Cron Scheduler

```python
from redis import Redis
from rq.cron import CronScheduler
from myapp.tasks import cleanup_database, send_reports

# Create a cron scheduler
redis_conn = Redis(host='localhost', port=6379, db=0)
cron = CronScheduler(connection=redis_conn, logging_level='INFO')

# Register jobs
cron.register(
    cleanup_database,
    queue_name='maintenance',
    interval=3600
)

cron.register(
    send_reports,
    queue_name='reports',
    args=('daily',),
    cron='0 9 * * *'  # Daily at 9 AM
)

# Start the scheduler (this will block until interrupted)
try:
    cron.start()
except KeyboardInterrupt:
    print("Shutting down cron scheduler...")
```
```

--------------------------------

### Counting Workers

Source: https://python-rq.org/docs/workers

For monitoring purposes, `Worker.count()` provides a more performant way to get the number of workers, either for a connection or a specific queue.

```APIDOC
## Counting Workers

### Description
Efficiently obtain the number of active workers. Use `Worker.count()` for performance-critical monitoring scenarios.

### Count All Workers
Get the total number of workers connected to a Redis instance.

```python
from redis import Redis
from rq import Worker

redis = Redis()
worker_count = Worker.count(connection=redis)
print(f"Total workers: {worker_count}")
```

### Count Workers for a Specific Queue
Get the number of workers listening to a particular queue.

```python
from redis import Redis
from rq import Queue, Worker

redis = Redis()
queue = Queue('queue_name', connection=redis)
queue_worker_count = Worker.count(queue=queue)
print(f"Workers for queue 'queue_name': {queue_worker_count}")
```
```

--------------------------------

### Synchronous Job Execution in RQ (is_async=False)

Source: https://python-rq.org/docs

Illustrates how to configure a queue to execute jobs synchronously within the same process by setting `is_async=False` in the Queue constructor.
This is useful for testing and behaves similarly to Celery's `ALWAYS_EAGER` mode, though a Redis connection is still required.

```python
q = Queue('low', is_async=False, connection=my_redis_conn)
job = q.enqueue(fib, 8)
job.result
```

--------------------------------

### Create and enqueue a job directly

Source: https://python-rq.org/docs/jobs

Creates a job instance directly using `Job.create()` and then enqueues it. This method is useful when you need to manipulate the job object before enqueuing.

```python
from rq.job import Job

# Positional arguments for the job function go in the `args` tuple
job = Job.create(count_words_at_url, args=('http://nvie.com',))
print('Job id: %s' % job.id)

q.enqueue_job(job)
```

--------------------------------

### Get Job Position in Queue

Source: https://python-rq.org/docs/jobs

Retrieve a job's position within a queue using `job.get_position()` or `q.get_job_position(job)`. Note that this can be inefficient for very large queues.

```python
from rq import Queue
from redis import Redis
from hello import say_hello

redis_conn = Redis()
q = Queue(connection=redis_conn)

job = q.enqueue(say_hello)
job2 = q.enqueue(say_hello)

job2.get_position()  # returns 1

q.get_job_position(job)  # returns 0
```

--------------------------------

### Display All Queues and Workers

Source: https://python-rq.org/docs/monitoring

Use `rq info` to view all existing queues, their job counts, and active workers with their associated queues.

```bash
$ rq info
```

--------------------------------

### Organize Workers by Queue

Source: https://python-rq.org/docs/monitoring

Use the `-R` or `--by-queue` flag with `rq info` to display workers organized by the queues they are listening on.

```bash
$ rq info -R
```

--------------------------------

### Using Connection Context Manager (Deprecated)

Source: https://python-rq.org/docs/connections

Illustrates the deprecated `Connection` context manager for temporarily overriding the default Redis connection for newly created RQ objects within its scope.
Note: This approach is deprecated and explicit connection management is recommended.

```python
from rq import Queue, Connection
from redis import Redis

with Connection(Redis('localhost', 6379)):
    q1 = Queue('foo')
    with Connection(Redis('remote.host.org', 9836)):
        q2 = Queue('bar')
    q3 = Queue('qux')

assert q1.connection != q2.connection
assert q2.connection != q3.connection
assert q1.connection == q3.connection
```

--------------------------------

### Shortcut for Latest Job Return Value

Source: https://python-rq.org/docs/results

Use `job.return_value()` as a convenient shortcut to get the return value of the latest successful job execution. Returns None if the latest result is not successful.

```python
job = Job.fetch(id='my_id', connection=redis)
print(job.return_value())  # Shortcut for job.latest_result().return_value
```

--------------------------------

### Create RQ Job with Arguments and Keyword Arguments

Source: https://python-rq.org/docs/jobs

When creating a job, you can pass arguments and keyword arguments to the underlying job function. Use `kwargs` to pass arguments that might conflict with RQ's own parameters, like `description` or `ttl`.

```python
job = Job.create(count_words_at_url,
                 ttl=30,  # This ttl will be used by RQ
                 args=('http://nvie.com',),
                 kwargs={
                     'description': 'Function description',  # This is passed on to count_words_at_url
                     'ttl': 15  # This is passed on to count_words_at_url function
                 })
```

--------------------------------

### Monitor and Inspect Failed Jobs

Source: https://python-rq.org/docs/jobs

This snippet demonstrates how to enqueue a job that will fail, access the FailedJobRegistry, and verify that the failed job is stored there. It requires setting up Redis and an RQ Queue.
```python
from redis import Redis
from rq import Queue, Worker

def div_by_zero(x):
    return x / 0

connection = Redis()
queue = Queue(connection=connection)
job = queue.enqueue(div_by_zero, 1)
registry = queue.failed_job_registry

# Process the queue once so the job runs and fails
worker = Worker([queue], connection=connection)
worker.work(burst=True)

assert len(registry) == 1  # Failed jobs are kept in FailedJobRegistry
```

--------------------------------

### Registering Interval-Based Jobs

Source: https://python-rq.org/docs/cron

Schedule jobs to run at a fixed interval in seconds using the `interval` parameter.

```python
cron.register(my_function, queue_name='default', interval=300)  # Every 5 minutes
```

```python
cron.register(hourly_task, queue_name='hourly', interval=3600)  # Every hour
```

--------------------------------

### Time to live for job in queue

Source: https://python-rq.org/docs/jobs

This section explains the two Time To Live (TTL) settings for jobs: `result_ttl` for the job result and `ttl` for the job itself, which determines the maximum queued time before a job is discarded. Examples show how to set these when creating or enqueuing a job.

```APIDOC
## Time to live for job in queue

A job has two TTLs, one for the job result, `result_ttl`, and one for the job itself, `ttl`. The latter is used if you have a job that shouldn’t be executed after a certain amount of time.

```python
# When creating the job:
job = Job.create(func=say_hello,
                 result_ttl=600,  # how long (in seconds) to keep the job (if successful) and its results
                 ttl=43,  # maximum queued time (in seconds) of the job before it's discarded.
                )

# or when queueing a new job:
job = q.enqueue(count_words_at_url,
                'http://nvie.com',
                result_ttl=600,  # how long to keep the job (if successful) and its results
                ttl=43  # maximum queued time
               )
```
```

--------------------------------

### Specify Config File via CLI

Source: https://python-rq.org/docs/workers

Use the `-c` option with the `rq worker` command to specify a Python module for configuration.
```bash
$ rq worker -c settings
```

--------------------------------

### Registering Cron-Based Jobs

Source: https://python-rq.org/docs/cron

Schedule jobs using standard cron syntax with the `cron` parameter. Supports various frequencies like daily, every 15 minutes, and weekly.

```python
# Every day at 2:30 AM
cron.register(daily_backup, queue_name='maintenance', cron='30 2 * * *')
```

```python
# Every 15 minutes
cron.register(frequent_task, queue_name='default', cron='*/15 * * * *')
```

```python
# Weekly on Sundays at 6:00 PM
cron.register(weekly_report, queue_name='reports', cron='0 18 * * 0')
```

--------------------------------

### Job / Queue Creation with Custom Serializer

Source: https://python-rq.org/docs/jobs

This section describes how to use custom serializers when creating jobs or queues, allowing for serialization and de-serialization of job arguments. It highlights the security risks of the default `pickle` serializer and recommends using alternatives like `JSONSerializer` for primitive argument types.

```APIDOC
## Job / Queue Creation with Custom Serializer

When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments. Serializers must implement `loads` and `dumps`. The default serializer is `pickle`.

> **Warning:** RQ uses `pickle` as its default serializer, which **is not secure**. Only run RQ against Redis instances that you trust. It is possible to construct malicious pickle data that will execute arbitrary code during unpickling.

To avoid pickle, use an alternative serializer, such as `JSONSerializer`, when enqueueing and processing jobs. JSON only supports primitive argument types (str, int, float, bool, list, dict, None).
For example, to enqueue jobs with `JSONSerializer`:

```python
from rq import Queue
from rq.serializers import JSONSerializer

queue = Queue(connection=connection, serializer=JSONSerializer)
job = queue.enqueue('my_module.count_words', 'https://example.com')
```

Then run workers with the same serializer:

```bash
$ rq worker --serializer rq.serializers.JSONSerializer
```
```

--------------------------------

### RQ Worker Configuration File Options

Source: https://python-rq.org/docs/workers

Use this Python file to configure worker settings such as Redis connection, queues, Sentry DSN, custom worker names, and logging.

```python
REDIS_URL = 'redis://localhost:6379/1'

# You can also specify the Redis DB to use
# REDIS_HOST = 'redis.example.com'
# REDIS_PORT = 6380
# REDIS_DB = 3
# REDIS_PASSWORD = 'very secret'

# Queues to listen on
QUEUES = ['high', 'default', 'low']

# If you're using Sentry to collect your runtime exceptions, you can use this
# to configure RQ for it in a single step
# The 'sync+' prefix is required for raven: https://github.com/nvie/rq/issues/350#issuecomment-43592410
SENTRY_DSN = 'sync+http://public:secret@example.com/1'

# If you want custom worker name
# NAME = 'worker-1024'

# If you want to use a dictConfig
# for more complex/consistent logging requirements.
DICT_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
        },
    },
    'handlers': {
        'default': {
            'level': 'INFO',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stderr',  # Default is stderr
        },
    },
    'loggers': {
        'root': {  # root logger
            'handlers': ['default'],
            'level': 'INFO',
            'propagate': False
        },
    }
}
```

--------------------------------

### RQ CLI Enqueue Options

Source: https://python-rq.org/docs

Available options for the `rq enqueue` command, including queue selection, timeouts, result TTL, dependencies, and scheduling.
```bash
* -q, --queue [value]      The name of the queue.
* --timeout [value]        Specifies the maximum runtime of the job before it is interrupted and marked as failed.
* --result-ttl [value]     Specifies how long successful jobs and their results are kept.
* --ttl [value]            Specifies the maximum queued time of the job before it is discarded.
* --failure-ttl [value]    Specifies how long failed jobs are kept.
* --description [value]    Additional description of the job
* --depends-on [value]     Specifies another job id that must complete before this job will be queued.
* --job-id [value]         The id of this job
* --at-front               Will place the job at the front of the queue, instead of the end
* --retry-max [value]      Maximum number of retries
* --retry-interval [value] Interval between retries in seconds
* --schedule-in [value]    Delay until the function is enqueued (e.g. 10s, 5m, 2d).
* --schedule-at [value]    Schedule job to be enqueued at a certain time formatted in ISO 8601 without timezone (e.g. 2021-05-27T21:45:00).
* --quiet                  Only logs errors.
```

--------------------------------

### RQ CLI Enqueue Usage

Source: https://python-rq.org/docs

Command-line interface for enqueuing jobs. Specify function, arguments, and various options like queue, timeout, and scheduling.

```bash
rq enqueue [OPTIONS] FUNCTION [ARGUMENTS]
```

--------------------------------

### Run RQ Cron Scheduler from CLI

Source: https://python-rq.org/docs/cron

Execute the `rq cron` command with a Python file or module path specifying cron job configurations. Options can be used to customize Redis connection, settings, and import paths.

```bash
$ rq cron my_cron_config.py
```

```bash
$ rq cron myapp.cron_config
```

```bash
$ rq cron myapp.cron_config --url redis://localhost:6379/1 --path src
```

--------------------------------

### Basic Programmatic API for RQ Cron Scheduler

Source: https://python-rq.org/docs/cron

Use the `CronScheduler` class to programmatically register and manage cron jobs.
This involves creating a Redis connection, initializing the scheduler, and registering tasks with specific queues, arguments, and cron schedules.

```python
from redis import Redis
from rq.cron import CronScheduler
from myapp.tasks import cleanup_database, send_reports

# Create a cron scheduler
redis_conn = Redis(host='localhost', port=6379, db=0)
cron = CronScheduler(connection=redis_conn, logging_level='INFO')

# Register jobs
cron.register(
    cleanup_database,
    queue_name='maintenance',
    interval=3600
)

cron.register(
    send_reports,
    queue_name='reports',
    args=('daily',),
    cron='0 9 * * *'  # Daily at 9 AM
)

# Start the scheduler (this will block until interrupted)
try:
    cron.start()
except KeyboardInterrupt:
    print("Shutting down cron scheduler...")
```

--------------------------------

### Chain RQ Jobs with Dependencies

Source: https://python-rq.org/docs

Execute jobs sequentially based on completion of prior jobs using the `depends_on` argument. A job will only run if its dependencies finish successfully by default. Multiple dependencies can be specified as a list.

```python
q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)
```

```python
queue = Queue('low', connection=redis)
foo_job = queue.enqueue(foo)
bar_job = queue.enqueue(bar)
baz_job = queue.enqueue(baz, depends_on=[foo_job, bar_job])
```

--------------------------------

### Group Multiple RQ Jobs with rq.group.Group

Source: https://python-rq.org/docs

Organize multiple jobs under a single group ID using `rq.group.Group`. This allows for collective tracking and management of related jobs. Jobs are prepared using `Queue.prepare_data`.
```python
from rq import Queue
from rq.group import Group
from redis import Redis

# Tell RQ what Redis connection to use
redis_conn = Redis()
q = Queue(connection=redis_conn)  # no args implies the default queue

group = Group.create(connection=redis_conn)
jobs = group.enqueue_many(
    queue=q,
    job_datas=[
        Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_job_id'),
        Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_other_job_id'),
    ]
)
```

```python
print(group.get_jobs())  # [Job('my_job_id'), Job('my_other_job_id')]
```

```python
from rq.group import Group

group = Group.fetch(id='my_group', connection=redis_conn)
```

--------------------------------

### Using the @job Decorator in RQ

Source: https://python-rq.org/docs

Demonstrates how to use the `@job` decorator to define a task that can be enqueued asynchronously. It shows how to specify queue name, connection, and timeout, and how to call the decorated function using `.delay()` and retrieve its return value.

```python
import time

from rq.decorators import job

@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)  # give a worker a moment to pick up and run the job
print(job.return_value())
```

--------------------------------

### RQ CLI Enqueue Function Specification

Source: https://python-rq.org/docs

How to specify the function to execute via the RQ CLI, either as a dot-separated string reference or a Python file path.

```bash
#### Function:

There are two options:

* Execute a function: dot-separated string of package, module and function (just like passing a string to `queue.enqueue()`).
* Execute a Python file: dot-separated pathname of the file. Because it is technically an import, `__name__ == '__main__'` will not work.
```

--------------------------------

### Enqueue Many Jobs in Bulk with RQ

Source: https://python-rq.org/docs

Use `queue.enqueue_many()` to enqueue multiple jobs efficiently. This method utilizes a Redis pipeline for performance.
`Queue.prepare_data` is used to format job arguments.

```python
jobs = q.enqueue_many(
    [
        Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_job_id'),
        Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_other_job_id'),
    ]
)
```

```python
with q.connection.pipeline() as pipe:
    jobs = q.enqueue_many(
        [
            Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_job_id'),
            Queue.prepare_data(count_words_at_url, ('http://nvie.com',), job_id='my_other_job_id'),
        ],
        pipeline=pipe
    )
    pipe.execute()
```

--------------------------------

### List Failed Jobs and Exceptions

Source: https://python-rq.org/docs/exceptions

Iterate through the FailedJobRegistry to retrieve and print information about failed jobs and their associated exceptions.

```python
from redis import Redis
from rq import Queue
from rq.job import Job
from rq.registry import FailedJobRegistry

redis = Redis()
queue = Queue(connection=redis)
registry = FailedJobRegistry(queue=queue)

# Show all failed job IDs and the exceptions they caused during runtime
for job_id in registry.get_job_ids():
    job = Job.fetch(job_id, connection=redis)
    print(job_id, job.exc_info)
```

--------------------------------

### Unsupported decode_responses=True for Redis Connection

Source: https://python-rq.org/docs/connections

Demonstrates the incorrect usage of `decode_responses=True` with a Redis connection in RQ. This argument is not supported and can lead to issues.

```python
from redis import Redis
from rq import Queue

conn = Redis(..., decode_responses=True)  # This is not supported
q = Queue(connection=conn)
```

--------------------------------

### Enqueue Job with Callbacks

Source: https://python-rq.org/docs

Enqueue a job with specified success, failure, and stopped callback functions.
```python
queue.enqueue(say_hello,
              on_success=report_success,
              on_failure=report_failure,
              on_stopped=report_stopped)
```

--------------------------------

### Requeue All Failed Jobs via CLI

Source: https://python-rq.org/docs/jobs

Use the RQ CLI to requeue all jobs currently in a specified queue's failed job registry. Requires specifying the Redis URL with `-u`.

```bash
# This command will requeue all jobs in myqueue's failed job registry
rq requeue --queue myqueue -u redis://localhost:6379 --all
```

--------------------------------

### Repeat Job with Varying Intervals

Source: https://python-rq.org/docs/scheduling

Schedule a job to repeat using a list of intervals, allowing for different delays between each repetition.

```python
from redis import Redis
from rq import Queue, Repeat

queue = Queue(connection=Redis())

# Or use different intervals between repetitions
job = queue.enqueue(my_function, repeat=Repeat(times=3, interval=[5, 10, 15]))
```

--------------------------------

### SpawnWorker CLI Usage

Source: https://python-rq.org/docs/workers

Launch SpawnWorker from the command line using the `-w` option to enable cross-platform compatibility.

```bash
rq worker -w rq.worker.SpawnWorker
```

--------------------------------

### RQ Cron Monitoring - Listing Active Schedulers

Source: https://python-rq.org/docs/cron

How to list all currently active cron scheduler instances using `CronScheduler.all()`.

```APIDOC
## Monitoring Schedulers

RQ provides tools to monitor and manage active cron schedulers.

### Listing Active Schedulers

Use `CronScheduler.all()` to retrieve all active scheduler instances.
```python
from redis import Redis
from rq.cron import CronScheduler

connection = Redis()
active_schedulers = CronScheduler.all(connection)

for scheduler in active_schedulers:
    print(f"{scheduler.name} (PID: {scheduler.pid})")
```
```

--------------------------------

### Create RQ Job with Custom ID

Source: https://python-rq.org/docs/jobs

Use `Job.create()` to create a job with a predetermined ID. This is useful for idempotency or when you need to reference a job before it's enqueued.

```python
# Positional arguments for the job function are passed as a tuple via `args`
job = Job.create(count_words_at_url, args=('http://nvie.com',), id='my_job_id')
```

--------------------------------

### Run Jobs Synchronously with Fake Redis

Source: https://python-rq.org/docs/testing

For unit tests, set `is_async=False` on the Queue to execute jobs in the same thread, eliminating the need for separate workers. Use `FakeStrictRedis` to mock Redis, avoiding the need to run a Redis server during tests.

```python
from fakeredis import FakeStrictRedis
from rq import Queue

queue = Queue(is_async=False, connection=FakeStrictRedis())
job = queue.enqueue(my_long_running_job)
assert job.is_finished
```

--------------------------------

### Handle RQ Job Failures with Dependency Class

Source: https://python-rq.org/docs

Control job execution flow even when dependencies fail using `rq.job.Dependency`. Set `allow_failure=True` to run dependents regardless of dependency status, and `enqueue_at_front=True` to prioritize dependent jobs.

```python
from redis import Redis
from rq.job import Dependency
from rq import Queue

queue = Queue(connection=Redis())
job_1 = queue.enqueue(div_by_zero)
dependency = Dependency(
    jobs=[job_1],
    allow_failure=True,    # allow_failure defaults to False
    enqueue_at_front=True  # enqueue_at_front defaults to False
)
job_2 = queue.enqueue(say_hello, depends_on=dependency)

"""
  job_2 will execute even though its dependency (job_1) fails,
  and it will be enqueued at the front of the queue.
"""
```

--------------------------------

### Repeat Jobs

Source: https://python-rq.org/docs/scheduling

Configure jobs to repeat automatically using the `Repeat` class. Specify the number of times to repeat and the interval between repetitions.

```APIDOC
## Repeat Class

### Description
Allows jobs to be repeated a specified number of times at defined intervals.

### Parameters
- **times** (int) - The total number of times the job should be repeated (must be at least 1).
- **interval** (int or list[int]) - The time interval in seconds between job repetitions. Can be a single value or a list of intervals.

### Request Example
```python
from redis import Redis
from rq import Queue, Repeat

queue = Queue(connection=Redis())

# Repeat a job 3 times with the same interval of 30 seconds
job = queue.enqueue(my_function, repeat=Repeat(times=3, interval=30))

# Repeat a job with different intervals for each repetition
job = queue.enqueue(my_function, repeat=Repeat(times=3, interval=[5, 10, 15]))
```

### How Repeat Works
- Jobs with `Repeat` configuration are re-enqueued after successful completion.
- `Repeat` only affects successfully completed jobs; failed jobs are not repeated.
- If `interval` is 0, the job is re-enqueued immediately.
- If the list of intervals is shorter than the number of repetitions, the last interval is used for remaining repeats.

### Checking Job Repeat Status
- **job.repeats_left**: Returns the number of repeats remaining for the job.
- **job.repeat_intervals**: Returns the list of intervals configured for the job's repetitions.

### Request Example (Checking Status)
```python
job = queue.enqueue(my_function, repeat=Repeat(times=3, interval=30))
print(job.repeats_left)       # Outputs: 3
print(job.repeat_intervals)   # Outputs: [30]
```
```

--------------------------------

### Enable Interval Polling

Source: https://python-rq.org/docs/monitoring

Use the `--interval` flag with `rq info` to continuously update the statistics on the console.
Be aware that low interval values can increase load on Redis.

```bash
$ rq info --interval 1
```

```bash
$ rq info --interval 0.5
```

--------------------------------

### Configure Callback Timeouts with Callback Class

Source: https://python-rq.org/docs

Use the Callback object to specify functions and custom timeouts for success, failure, and stopped callbacks. The timeout can be specified in seconds or as a string like '2m'.

```python
from rq import Callback

queue.enqueue(say_hello,
              on_success=Callback(report_success),  # default callback timeout (60 seconds)
              on_failure=Callback(report_failure, timeout=10),  # 10 seconds timeout
              on_stopped=Callback(report_stopped, timeout="2m"))  # 2 minute timeout
```
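
The callback snippets above reference `report_success`, `report_failure`, and `report_stopped` without defining them. The following is a minimal sketch of what such functions might look like, assuming the callback signatures described in the RQ documentation (success: job, connection, result; failure: job, connection, exception type, value, traceback; stopped: job, connection). The `events` list and the `fake_job` stand-in are hypothetical and exist only so the sketch runs without Redis or a worker.

```python
from types import SimpleNamespace

# Hypothetical in-memory log, used only for this illustration.
events = []


def report_success(job, connection, result, *args, **kwargs):
    """Success callback: receives the job, the Redis connection, and the return value."""
    events.append(("success", job.id, result))


def report_failure(job, connection, type, value, traceback):
    """Failure callback: receives the exception type, value, and traceback."""
    events.append(("failure", job.id, type.__name__))


def report_stopped(job, connection):
    """Stopped callback: receives only the job and the connection."""
    events.append(("stopped", job.id))


# Call the callbacks directly with a stand-in job object (not a real rq Job).
fake_job = SimpleNamespace(id="my_job_id")
report_success(fake_job, None, 42)
report_failure(fake_job, None, ZeroDivisionError, ZeroDivisionError("division by zero"), None)
report_stopped(fake_job, None)
print(events)
```

In a real deployment these functions would live in a module importable by the worker, and RQ would invoke them with the actual job and connection; the direct calls here only illustrate the argument shapes.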