### TaskTiger Initialization with Redis Source: https://context7.com/closeio/tasktiger/llms.txt Initialize TaskTiger using a Redis connection. This is a common setup for TaskTiger. ```python from redis import Redis from tasktiger import TaskTiger conn = Redis(db=15, decode_responses=True) tiger = TaskTiger(connection=conn) ``` -------------------------------- ### TaskTiger Unit Tests Setup Source: https://context7.com/closeio/tasktiger/llms.txt Set up TaskTiger for unit tests, using a dedicated Redis database and configuring synchronous task execution. ```python from tasktiger import TaskTiger from tasktiger.test import TaskTigerTestMixin from redis import Redis import unittest class TestTasks(unittest.TestCase, TaskTigerTestMixin): def setUp(self): self.conn = Redis(db=15, decode_responses=True) # Use test database self.conn.flushdb() self.tiger = TaskTiger( connection=self.conn, config={'ALWAYS_EAGER': True} # Execute tasks synchronously ) ``` -------------------------------- ### Decorate Task with Queue and Unique Options Source: https://github.com/closeio/tasktiger/blob/master/README.rst Example of decorating a task with specific options like 'queue' and 'unique'. These options can be overridden when queueing. ```python @tiger.task(queue='myqueue', unique=True) def my_task(): print('Hello') ``` -------------------------------- ### Setup TaskTiger Testing Source: https://context7.com/closeio/tasktiger/llms.txt Imports required for testing TaskTiger tasks using the provided test mixin. ```python import unittest from tasktiger import TaskTiger, Task from tasktiger.test_helpers import TaskTigerTestMixin from redis import Redis ``` -------------------------------- ### Batch Task Processing Example Source: https://github.com/closeio/tasktiger/blob/master/README.rst Configure a task to process multiple items of the same type at once by setting the 'batch' option to True. This is effective when the worker is configured for batch queues. ```python batch=True ``` -------------------------------- ### Run a Worker Source: https://github.com/closeio/tasktiger/blob/master/README.rst Execute the tasktiger command to start processing tasks, ensuring the task module is in the PYTHONPATH. ```bash % PYTHONPATH=. tasktiger ``` -------------------------------- ### Get Queue Statistics Source: https://context7.com/closeio/tasktiger/llms.txt Retrieve statistics about the task queues, including the number of queued tasks. ```python stats = self.tiger.get_queue_stats() ``` -------------------------------- ### Schedule Periodic Task Source: https://github.com/closeio/tasktiger/blob/master/README.rst Use `periodic` to schedule tasks at regular intervals. For example, run every 5 minutes indefinitely or every Sunday at 4am. ```python schedule=periodic(minutes=5) ``` ```python schedule=periodic(weeks=1, start_date=datetime.datetime(2000, 1, 2, 4)) ``` -------------------------------- ### Schedule Task with Cron Expression Source: https://github.com/closeio/tasktiger/blob/master/README.rst Use `cron_expr` for cron-style scheduling. Specify the cron expression and optionally start and end dates. Defaults to '2000-01-01T00:00Z' if start_date is not provided. ```python schedule=cron_expr("0 * * * *") ``` ```python schedule=cron_expr("0 4 * * 0") ``` -------------------------------- ### Periodic Schedule Source: https://github.com/closeio/tasktiger/blob/master/README.rst Use this function to schedule a task to run at periodic intervals. You can specify the interval using seconds, minutes, hours, days, and weeks, along with optional start and end dates. ```python periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None, end_date=None) ``` -------------------------------- ### Queuing Tasks with delay() Source: https://context7.com/closeio/tasktiger/llms.txt Demonstrates how to queue tasks with various options, including delays, keyword arguments, and overriding default settings. ```APIDOC ## Queuing Tasks with delay() Use `tiger.delay()` to queue tasks with runtime options that override decorator defaults. Supports scheduling tasks for future execution using datetime or timedelta. ### Request Example ```python import datetime from tasktiger import TaskTiger from tasktiger.retry import fixed tiger = TaskTiger() def send_notification(user_id, message): print(f"Notifying user {user_id}: {message}") def generate_report(report_type, start_date, end_date): print(f"Generating {report_type} report from {start_date} to {end_date}") # Queue task for immediate execution task = tiger.delay(send_notification, args=['user123', 'Your order shipped!']) print(f"Task ID: {task.id}") # Queue with keyword arguments tiger.delay( generate_report, kwargs={'report_type': 'sales', 'start_date': '2024-01-01', 'end_date': '2024-01-31'} ) # Schedule task for future execution (absolute datetime) future_time = datetime.datetime.utcnow() + datetime.timedelta(hours=2) tiger.delay(send_notification, args=['user456', 'Reminder!'], when=future_time) # Schedule task with relative timedelta tiger.delay(send_notification, args=['user789', 'Follow up'], when=datetime.timedelta(minutes=30)) # Override task options at queue time tiger.delay( send_notification, args=['user123', 'Urgent!'], queue='high_priority', hard_timeout=10, unique=True, retry=True, retry_method=fixed(delay=60, max_retries=3), ) # Enforce maximum queue size (raises QueueFullException if exceeded) from tasktiger.exceptions import QueueFullException try: tiger.delay(send_notification, args=['user', 'msg'], max_queue_size=100) except QueueFullException as e: print(f"Queue full: {e}") ``` ``` -------------------------------- ### Initialize TaskTiger with Redis Connection and Configuration Source: https://github.com/closeio/tasktiger/blob/master/README.rst Demonstrates initializing TaskTiger with a Redis connection and configuring batch queues. Ensure Redis connection uses decode_responses=True. ```python import tasktiger from redis import Redis conn = Redis(db=1, decode_responses=True) tiger = tasktiger.TaskTiger(connection=conn, config={ 'BATCH_QUEUES': { # Batch up to 50 tasks that are queued in the my_batch_queue or any # of its subqueues, except for the send_email subqueue which only # processes up to 10 tasks at a time. 'my_batch_queue': 50, 'my_batch_queue.send_email': 10, }, }) ``` -------------------------------- ### Queue a Task using delay Method Source: https://github.com/closeio/tasktiger/blob/master/README.rst Illustrates two equivalent ways to queue a task: using the generic delay method and the decorated task's delay method. The latter requires the task to be decorated. ```python In [1]: import tasks # The following are equivalent. However, the second syntax can only be used # if the task is decorated. In [2]: tasks.tiger.delay(my_task, args=('John',), kwargs={'n': 1}) In [3]: tasks.my_task.delay('John', n=1) ``` -------------------------------- ### Working with Task Objects Source: https://context7.com/closeio/tasktiger/llms.txt Explains how to use the Task class for direct task creation, inspection, and management, including querying queues, retrying, and canceling tasks. ```APIDOC ## Working with Task Objects The Task class allows direct task creation, inspection, and management. Use it to query task queues, retry failed tasks, cancel scheduled tasks, or construct unique tasks for manipulation. ### Request Example ```python from tasktiger import TaskTiger, Task import datetime tiger = TaskTiger() def send_email(email_id): print(f"Sending email {email_id}") # Create and queue a task directly task = Task(tiger, send_email, args=['email_001'], unique=True) task.delay(when=datetime.timedelta(minutes=5)) print(f"Queued task {task.id} in state {task.state}") # Reconstruct a unique task (same ID for same args) same_task = Task(tiger, send_email, args=['email_001'], unique=True) same_task.cancel() # Cancels the scheduled task # Load a specific task by ID from a queue task = Task.from_id( tiger, queue='default', state='error', task_id='6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79', load_executions=3, # Load last 3 execution records ) print(f"Task: {task.serialized_func}, Args: {task.args}, Kwargs: {task.kwargs}") print(f"Executions: {task.n_executions()}") for execution in task.executions: print(f" Failed at: {execution['time_failed']}, Error: {execution['exception_name']}") print(f" Traceback: {execution['traceback']}") # Retry a failed task task.retry() # List tasks from a queue total_count, tasks = Task.tasks_from_queue( tiger, queue='default', state='error', # 'queued', 'active', 'scheduled', or 'error' skip=0, limit=100, load_executions=1, ) print(f"Found {total_count} tasks in error state") for t in tasks: print(f" {t.id}: {t.serialized_func} - {t.ts}") # Delete a task from the error queue task.delete() # Update scheduled time for a scheduled task task = Task(tiger, send_email, args=['email_002'], unique=True) task.delay(when=datetime.timedelta(hours=1)) task.update_scheduled_time(datetime.datetime.utcnow() + datetime.timedelta(minutes=10)) # Execute a task immediately without queuing (useful for testing) task = Task(tiger, send_email, args=['email_003']) task.execute() ``` ``` -------------------------------- ### Queue and cancel a unique task Source: https://github.com/closeio/tasktiger/blob/master/README.rst Demonstrates creating a unique task and retrieving a reference to it later to perform actions like cancellation. ```python from tasktiger import TaskTiger, Task tiger = TaskTiger() # Send an email in five minutes. task = Task(tiger, send_mail, args=['email_id'], unique=True) task.delay(when=datetime.timedelta(minutes=5)) # Unique tasks get back a task instance referring to the same task by simply # creating the same task again. ``` -------------------------------- ### Initialize TaskTiger Source: https://context7.com/closeio/tasktiger/llms.txt Create a TaskTiger instance with a Redis connection. Supports custom configuration and lazy initialization. ```python import tasktiger from redis import Redis # Basic initialization with default Redis connection tiger = tasktiger.TaskTiger() # Initialize with custom Redis connection and config conn = Redis(host='localhost', port=6379, db=1, decode_responses=True) tiger = tasktiger.TaskTiger( connection=conn, config={ 'REDIS_PREFIX': 't', 'DEFAULT_QUEUE': 'default', 'ALWAYS_EAGER': False, # Set True for testing 'DEFAULT_HARD_TIMEOUT': 300, 'BATCH_QUEUES': { 'my_batch_queue': 50, 'my_batch_queue.send_email': 10, }, 'ONLY_QUEUES': [], # Process only these queues 'EXCLUDE_QUEUES': [], # Exclude these queues }, setup_structlog=True, # Enable JSON structured logging ) # Lazy initialization for importing decorated tasks before config is ready tiger = tasktiger.TaskTiger(lazy_init=True) # Later, provide connection and config: tiger.init(connection=conn, config={'DEFAULT_QUEUE': 'myqueue'}) ``` -------------------------------- ### Launch TaskTiger with custom arguments Source: https://github.com/closeio/tasktiger/blob/master/README.rst Integrate TaskTiger into existing management scripts by parsing command line arguments manually. ```python import sys from tasktiger import TaskTiger try: command = sys.argv[1] except IndexError: command = None if command == 'tasktiger': tiger = TaskTiger(setup_structlog=True) # Strip the "tasktiger" arg when running via manage, so we can run e.g. # ./manage.py tasktiger --help tiger.run_worker_with_args(sys.argv[2:]) sys.exit(0) ``` -------------------------------- ### TaskTiger Initialization Source: https://context7.com/closeio/tasktiger/llms.txt Initialize TaskTiger with a Redis connection and configuration. The `ALWAYS_EAGER` config option executes tasks synchronously. ```python from tasktiger import TaskTiger tiger = TaskTiger( connection=self.conn, config={'ALWAYS_EAGER': True} ) ``` -------------------------------- ### Inspect Queues and Retry Task by ID in Python Source: https://github.com/closeio/tasktiger/blob/master/README.rst Demonstrates how to retrieve tasks from a specific queue and state, iterate through them, and retry a task using its ID. Requires TaskTiger and Task imports. ```python from tasktiger import TaskTiger, Task QUEUE_NAME = 'default' TASK_STATE = 'error' TASK_ID = '6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79' tiger = TaskTiger() n_total, tasks = Task.tasks_from_queue(tiger, QUEUE_NAME, TASK_STATE) for task in tasks: print(task.id, task.func) task = Task.from_id(tiger, QUEUE_NAME, TASK_STATE, TASK_ID) task.retry() ``` -------------------------------- ### Run TaskTiger Tests with Docker Compose in Bash Source: https://github.com/closeio/tasktiger/blob/master/README.rst Provides bash commands to run TaskTiger tests using Docker Compose, both for the entire test suite and for specific test files. ```bash docker-compose run --rm tasktiger pytest ``` ```bash docker-compose run --rm tasktiger pytest tests/test_base.py::TestCase ``` -------------------------------- ### TaskTiger Configuration Source: https://context7.com/closeio/tasktiger/llms.txt Configure TaskTiger with specific settings. `ALWAYS_EAGER: True` forces synchronous execution of tasks. ```python from tasktiger import TaskTiger tiger = TaskTiger( config={'ALWAYS_EAGER': True} ) ``` -------------------------------- ### TaskTiger Configuration Source: https://github.com/closeio/tasktiger/blob/master/README.rst Configuration options for initializing a TaskTiger object, including Redis connection and various settings like ALWAYS_EAGER, BATCH_QUEUES, and ONLY_QUEUES. ```APIDOC ## TaskTiger Configuration ### Description TaskTiger requires a Redis connection object and an optional configuration dictionary. The `connection` should be initialized with `decode_responses=True`. The `config` dictionary allows customization of TaskTiger's behavior. ### Parameters #### Connection - **connection** (redis.Redis) - Required - Redis connection object. #### Configuration Options (Dict) - **ALWAYS_EAGER** (bool) - Optional - If True, tasks (except future tasks) are executed locally and block until completion. Useful for testing. - **BATCH_QUEUES** (dict) - Optional - Defines queues to be processed in batches. Keys are queue names, values are batch sizes. Tasks must be declared with `batch=True`. Subqueues inherit batch settings. - Example: `{'my_batch_queue': 50, 'my_batch_queue.send_email': 10}` - **ONLY_QUEUES** (list) - Optional - If set, a worker only processes tasks from the specified queues and their subqueues. - **setup_structlog** (bool) - Optional - If True, sets up structured logging using `structlog`. ### Example ```python import tasktiger from redis import Redis conn = Redis(db=1, decode_responses=True) tiger = tasktiger.TaskTiger(connection=conn, config={ 'BATCH_QUEUES': { 'my_batch_queue': 50, 'my_batch_queue.send_email': 10, }, 'ALWAYS_EAGER': True }) ``` ``` -------------------------------- ### Supervisor Configuration for TaskTiger Workers Source: https://github.com/closeio/tasktiger/blob/master/README.rst This Supervisor configuration file sets up 4 TaskTiger workers to run as the 'ubuntu' user. It includes settings for process naming, restart policies, logging, and exit codes. ```bash [program:tasktiger] command=/usr/local/bin/tasktiger process_name=%(program_name)s_%(process_num)02d numprocs=4 numprocs_start=0 priority=999 autostart=true autorestart=true startsecs=10 startretries=3 exitcodes=0,2 stopsignal=TERM stopwaitsecs=600 killasgroup=false user=ubuntu redirect_stderr=false stdout_logfile=/var/log/tasktiger.out.log stdout_logfile_maxbytes=250MB stdout_logfile_backups=10 stderr_logfile=/var/log/tasktiger.err.log stderr_logfile_maxbytes=250MB stderr_logfile_backups=10 ``` -------------------------------- ### Manage Tasks with Task Objects Source: https://context7.com/closeio/tasktiger/llms.txt Use the Task class for direct creation, inspection, cancellation, and execution of tasks. ```python from tasktiger import TaskTiger, Task import datetime tiger = TaskTiger() def send_email(email_id): print(f"Sending email {email_id}") # Create and queue a task directly task = Task(tiger, send_email, args=['email_001'], unique=True) task.delay(when=datetime.timedelta(minutes=5)) print(f"Queued task {task.id} in state {task.state}") # Reconstruct a unique task (same ID for same args) same_task = Task(tiger, send_email, args=['email_001'], unique=True) same_task.cancel() # Cancels the scheduled task # Load a specific task by ID from a queue task = Task.from_id( tiger, queue='default', state='error', task_id='6fa07a91642363593cddef7a9e0c70ae3480921231710aa7648b467e637baa79', load_executions=3, # Load last 3 execution records ) print(f"Task: {task.serialized_func}, Args: {task.args}, Kwargs: {task.kwargs}") print(f"Executions: {task.n_executions()}") for execution in task.executions: print(f" Failed at: {execution['time_failed']}, Error: {execution['exception_name']}") print(f" Traceback: {execution['traceback']}") # Retry a failed task task.retry() # List tasks from a queue total_count, tasks = Task.tasks_from_queue( tiger, queue='default', state='error', # 'queued', 'active', 'scheduled', or 'error' skip=0, limit=100, load_executions=1, ) print(f"Found {total_count} tasks in error state") for t in tasks: print(f" {t.id}: {t.serialized_func} - {t.ts}") # Delete a task from the error queue task.delete() # Update scheduled time for a scheduled task task = Task(tiger, send_email, args=['email_002'], unique=True) task.delay(when=datetime.timedelta(hours=1)) task.update_scheduled_time(datetime.datetime.utcnow() + datetime.timedelta(minutes=10)) # Execute a task immediately without queuing (useful for testing) task = Task(tiger, send_email, args=['email_003']) task.execute() ``` -------------------------------- ### Test Task Queuing and Worker Processing Source: https://context7.com/closeio/tasktiger/llms.txt Test the actual task queuing and worker processing by disabling eager mode, queuing tasks, verifying queue stats, running the worker, and checking processed stats. ```python def test_task_queuing(self): """Test actual task queuing and worker processing.""" # Disable eager mode for this test self.tiger.config['ALWAYS_EAGER'] = False # Queue some tasks self.tiger.delay(process_item, args=[1]) self.tiger.delay(process_item, args=[2]) self.tiger.delay(process_item, args=[3]) # Verify tasks are queued stats = self.tiger.get_queue_stats() self.assertEqual(stats['default']['queued'], 3) # Run worker to process tasks self.run_worker(self.tiger, raise_on_errors=True) # Verify tasks are processed stats = self.tiger.get_queue_stats() self.assertNotIn('queued', stats.get('default', {})) ``` -------------------------------- ### Implement Custom Task Runners Source: https://context7.com/closeio/tasktiger/llms.txt Define custom runner classes to hook into task execution lifecycles or override default execution behavior. ```python from tasktiger import TaskTiger, Task from tasktiger.runner import BaseRunner, DefaultRunner tiger = TaskTiger() class LoggingRunner(DefaultRunner): """Runner that logs task execution.""" def run_single_task(self, task, hard_timeout): print(f"Starting task {task.id}: {task.serialized_func}") try: super().run_single_task(task, hard_timeout) print(f"Task {task.id} completed successfully") except Exception as e: print(f"Task {task.id} failed: {e}") raise def on_permanent_error(self, task, execution): """Called when a task fails permanently (no more retries).""" print(f"Task {task.id} failed permanently") print(f"Exception: {execution['exception_name']}") print(f"Traceback: {execution['traceback']}") # Send alert, log to external service, etc. send_alert_to_ops(task.id, execution) class CustomRunner(BaseRunner): """Fully custom runner implementation.""" def run_single_task(self, task, hard_timeout): # Custom single task execution with custom_context_manager(): result = task.func(*task.args, **task.kwargs) store_result(task.id, result) def run_batch_tasks(self, tasks, hard_timeout): # Custom batch execution all_params = [{"args": t.args, "kwargs": t.kwargs} for t in tasks] func = tasks[0].func func(all_params) def run_eager_task(self, task): # Custom eager execution (ALWAYS_EAGER mode) return task.func(*task.args, **task.kwargs) # Use custom runner for specific tasks @tiger.task(runner_class=LoggingRunner) def important_task(data): process_important_data(data) # Or specify at queue time tiger.delay(some_task, args=[data], runner_class=CustomRunner) ``` -------------------------------- ### Task Decorator and Options Source: https://github.com/closeio/tasktiger/blob/master/README.rst How to use the TaskTiger task decorator to specify task options and how these options can be overridden when queueing tasks. ```APIDOC ## Task Decorator and Options ### Description The `@tiger.task()` decorator allows specifying default options for a task. These options can be overridden when queueing the task using the `delay` method. ### Task Decorator Usage Tasks can be decorated to set default options. ```python # tasks.py import tasktiger tiger = tasktiger.TaskTiger() @tiger.task(queue='myqueue', unique=True, hard_timeout=60) def my_task(name, n=None): print(f'Hello {name}') ``` ### Task Options The following options can be specified in the task decorator or when queueing a task: - **queue** (string) - The name of the queue where the task will be placed. - **hard_timeout** (integer) - If the task exceeds this duration in seconds, it will be terminated and marked as failed. - **unique** (boolean) - If True, the task will only be queued if no identical task (same function, args, kwargs) is already in the queue. Note: This does not prevent multiple instances of the same task from running concurrently if they are already in the queue. - **unique_key** (list of strings) - If provided, implies `unique=True`. Specifies the keyword arguments to use for determining task uniqueness. ### Queueing Tasks Tasks can be queued using the `delay` method on the `TaskTiger` object or the decorated task object. #### Using `tiger.delay()` This method allows overriding options specified in the decorator. ```python # Assuming 'my_task' is defined in another module tiger.delay(my_task, queue='otherqueue', args=('John',), kwargs={'n': 1}) ``` #### Using Decorated Task's `delay()` method This syntax is only available for decorated tasks. ```python # Assuming 'my_task' is decorated as shown above import tasks tasks.my_task.delay('John', n=1) ``` ### Important Note Tasks must be defined in a module other than the script being executed (i.e., not in `__main__`). TaskTiger will raise an error otherwise. ``` -------------------------------- ### Define a Task with TaskTiger Decorator Source: https://github.com/closeio/tasktiger/blob/master/README.rst Shows how to define a task using the @tiger.task decorator. This allows for alternative task queuing syntax. ```python # tasks.py import tasktiger tiger = tasktiger.TaskTiger() @tiger.task() def my_task(name, n=None): print('Hello', name) ``` -------------------------------- ### Task Execution Source: https://context7.com/closeio/tasktiger/llms.txt Execute a Task instance directly. This is typically used in testing or when synchronous execution is desired. ```python from tasktiger import Task task = Task(self.tiger, add_numbers, args=[2, 3]) result = task.execute() ``` -------------------------------- ### Configure Rollbar Error Handling in Python Source: https://github.com/closeio/tasktiger/blob/master/README.rst Illustrates how to integrate Rollbar for error handling with TaskTiger, logging errors to Rollbar with a custom prefix. Requires TaskTiger, rollbar, and StructlogRollbarHandler imports, and Rollbar credentials. ```python import logging import rollbar import sys from tasktiger import TaskTiger from tasktiger.rollbar import StructlogRollbarHandler tiger = TaskTiger(setup_structlog=True) rollbar.init(ROLLBAR_API_KEY, APPLICATION_ENVIRONMENT, allow_logging_basic_config=False) rollbar_handler = StructlogRollbarHandler('TaskTiger') rollbar_handler.setLevel(logging.ERROR) tiger.log.addHandler(rollbar_handler) tiger.run_worker_with_args(sys.argv[1:]) ``` -------------------------------- ### Define a Task Source: https://github.com/closeio/tasktiger/blob/master/README.rst Create a simple Python function to be executed by the TaskTiger worker. ```python # tasks.py def my_task(): print('Hello') ``` -------------------------------- ### Test Eager Task Execution Source: https://context7.com/closeio/tasktiger/llms.txt Test task execution in eager mode where tasks run synchronously and return their results directly. ```python def test_add_numbers_eager(self): """Test task execution in eager mode.""" # Task executes immediately and returns result task = Task(self.tiger, add_numbers, args=[2, 3]) result = task.execute() self.assertEqual(result, 5) ``` -------------------------------- ### Queue Tasks with delay() Source: https://context7.com/closeio/tasktiger/llms.txt Use tiger.delay() to queue tasks with custom runtime options, scheduling, or queue constraints. ```python import datetime from tasktiger import TaskTiger from tasktiger.retry import fixed tiger = TaskTiger() def send_notification(user_id, message): print(f"Notifying user {user_id}: {message}") def generate_report(report_type, start_date, end_date): print(f"Generating {report_type} report from {start_date} to {end_date}") # Queue task for immediate execution task = tiger.delay(send_notification, args=['user123', 'Your order shipped!']) print(f"Task ID: {task.id}") # Queue with keyword arguments tiger.delay( generate_report, kwargs={'report_type': 'sales', 'start_date': '2024-01-01', 'end_date': '2024-01-31'} ) # Schedule task for future execution (absolute datetime) future_time = datetime.datetime.utcnow() + datetime.timedelta(hours=2) tiger.delay(send_notification, args=['user456', 'Reminder!'], when=future_time) # Schedule task with relative timedelta tiger.delay(send_notification, args=['user789', 'Follow up'], when=datetime.timedelta(minutes=30)) # Override task options at queue time tiger.delay( send_notification, args=['user123', 'Urgent!'], queue='high_priority', hard_timeout=10, unique=True, retry=True, retry_method=fixed(delay=60, max_retries=3), ) # Enforce maximum queue size (raises QueueFullException if exceeded) from tasktiger.exceptions import QueueFullException try: tiger.delay(send_notification, args=['user', 'msg'], max_queue_size=100) except QueueFullException as e: print(f"Queue full: {e}") ``` -------------------------------- ### Configure task retry strategies Source: https://context7.com/closeio/tasktiger/llms.txt Define retry behavior using built-in methods like fixed, linear, or exponential backoff. Raise RetryException for custom logic or use retry_on to filter exceptions. ```python from tasktiger import TaskTiger, RetryException from tasktiger.retry import fixed, linear, exponential from tasktiger.exceptions import StopRetry tiger = TaskTiger() # Fixed delay: retry up to 3 times, waiting 60 seconds between retries @tiger.task(retry_method=fixed(delay=60, max_retries=3)) def task_with_fixed_retry(): raise Exception("Temporary failure") # Linear backoff: start at 10s, add 10s each retry, max 5 retries (10, 20, 30, 40, 50) @tiger.task(retry_method=linear(delay=10, increment=10, max_retries=5)) def task_with_linear_backoff(): raise Exception("Temporary failure") # Exponential backoff: start at 5s, multiply by 2 each retry, max 4 retries (5, 10, 20, 40) @tiger.task(retry_method=exponential(delay=5, factor=2, max_retries=4)) def task_with_exponential_backoff(): raise Exception("Temporary failure") # Custom retry logic using RetryException def smart_task(): try: # Attempt some operation external_api_call() except ConnectionError: # Exponential backoff for network errors, don't log as error if exhausted raise RetryException( method=exponential(60, 2, 5), original_traceback=True, log_error=False, ) except ValueError: # Fixed retry for validation errors raise RetryException(method=fixed(30, 2)) # Retry only on specific exceptions @tiger.task(retry_on=(ConnectionError, TimeoutError)) def selective_retry_task(): # Only retries on ConnectionError or TimeoutError # Other exceptions go directly to error queue pass # Custom retry function def custom_retry_method(retry_count): if retry_count > 10: raise StopRetry() # Stop retrying after 10 attempts # Custom delay calculation return min(300, 2 ** retry_count) # Cap at 5 minutes @tiger.task(retry_method=custom_retry_method) def task_with_custom_retry(): pass ``` -------------------------------- ### Queue a Task Source: https://github.com/closeio/tasktiger/blob/master/README.rst Use the delay method on a TaskTiger instance to queue a function for execution. ```python In [1]: import tasktiger, tasks In [2]: tiger = tasktiger.TaskTiger() In [3]: tiger.delay(tasks.my_task) ``` -------------------------------- ### Retrieve queue statistics Source: https://context7.com/closeio/tasktiger/llms.txt Use get_queue_stats and get_queue_sizes to monitor task counts across different states like queued, active, and scheduled. ```python import datetime from tasktiger import TaskTiger tiger = TaskTiger() # Get stats for all queues stats = tiger.get_queue_stats() # Returns: {'default': {'queued': 10, 'active': 2, 'scheduled': 5, 'error': 1}, ...} for queue_name, queue_stats in stats.items(): print(f"Queue '{queue_name}':") for state, count in queue_stats.items(): print(f" {state}: {count}") # Get sizes for a specific queue sizes = tiger.get_queue_sizes('default') # Returns: {'queued': 10, 'scheduled': 5, 'active': 2} print(f"Default queue total: {tiger.get_total_queue_size('default')}") ``` -------------------------------- ### Access Batch Task Instances in Python Source: https://github.com/closeio/tasktiger/blob/master/README.rst Shows how to access current task instances within a batch task function to check their execution count. Requires TaskTiger import and a decorated task function. ```python from tasktiger import TaskTiger tiger = TaskTiger() @tiger.task(batch=True) def my_task(args): for task in tiger.current_tasks: print(task.n_executions()) ``` -------------------------------- ### Execute TaskTiger workers Source: https://context7.com/closeio/tasktiger/llms.txt Run workers via command line or programmatically. Programmatic execution allows for fine-grained control over queue selection and worker configuration. ```python # Command line usage: # tasktiger # Process all queues # tasktiger -q emails,notifications # Process specific queues # tasktiger -q emails -e emails.spam # Include emails, exclude emails.spam subqueue # tasktiger -m myapp.tasks # Preload module for better performance # tasktiger -h redis.example.com -p 6380 -n 2 # Custom Redis host/port/db # tasktiger --executor sync --exit-after 60 # Sync mode, exit after 60 minutes # tasktiger -M 4 # Max 4 workers per queue # Programmatic worker execution from tasktiger import TaskTiger tiger = TaskTiger(setup_structlog=True) # Run worker processing all queues tiger.run_worker() # Run worker with specific options tiger.run_worker( queues='high_priority,default', exclude_queues='low_priority', module='myapp.tasks,myapp.periodic_tasks', max_workers_per_queue=2, store_tracebacks=True, ) # Custom launch script (e.g., manage.py tasktiger) import sys from tasktiger import TaskTiger if __name__ == '__main__': tiger = TaskTiger(setup_structlog=True) if len(sys.argv) > 1 and sys.argv[1] == 'tasktiger': tiger.run_worker_with_args(sys.argv[2:]) ``` -------------------------------- ### Periodic and Scheduled Tasks Source: https://context7.com/closeio/tasktiger/llms.txt Details on defining tasks that run on a schedule using interval-based `periodic()` or cron-style `cron_expr()` schedulers. Periodic tasks are automatically unique. ```APIDOC ## Periodic and Scheduled Tasks Define tasks that run on a schedule using interval-based `periodic()` or cron-style `cron_expr()` schedulers. Periodic tasks are automatically unique to prevent duplicate scheduling. ### Request Example ```python import datetime from tasktiger import TaskTiger, periodic, cron_expr tiger = TaskTiger() # Run every 5 minutes @tiger.task(schedule=periodic(minutes=5)) def collect_metrics(): print("Collecting metrics") # Run every hour starting from a specific date @tiger.task(schedule=periodic(hours=1, start_date=datetime.datetime(2024, 1, 1, 0, 0))) def hourly_cleanup(): print("Running hourly cleanup") # Run every Sunday at 4am UTC @tiger.task(schedule=periodic(weeks=1, start_date=datetime.datetime(2000, 1, 2, 4, 0))) def weekly_report(): print("Generating weekly report") # Run with an end date (stops scheduling after end_date) @tiger.task(schedule=periodic( days=1, start_date=datetime.datetime(2024, 1, 1), end_date=datetime.datetime(2024, 12, 31) )) def daily_task_with_expiry(): print("Running daily task (expires end of 2024)") ``` ``` -------------------------------- ### Exponential Retry Method Source: https://github.com/closeio/tasktiger/blob/master/README.rst Use this method to retry a task with an exponentially increasing delay. Specify the initial delay, the multiplication factor, and the maximum number of retries. ```python exponential(delay, factor, max_retries) ``` -------------------------------- ### Retrieve Tasks from Queue Source: https://context7.com/closeio/tasktiger/llms.txt Retrieve tasks from a specific queue, including failed tasks. `load_executions=1` ensures execution details are loaded. ```python n_total, tasks = Task.tasks_from_queue( self.tiger, 'default', 'error', load_executions=1 ) ``` -------------------------------- ### Manage Queues and Tasks Source: https://context7.com/closeio/tasktiger/llms.txt Perform batch operations on queues and purge errored tasks with optional batching to prevent Redis overload. ```python sizes = tiger.get_sizes_for_queues_and_states([ ('emails', 'queued'), ('emails', 'error'), ('notifications', 'queued'), ]) print(f"Sizes: {sizes}") # [42, 3, 15] # Pause queue processing with system lock (requires max_workers_per_queue) tiger.set_queue_system_lock('maintenance_queue', timeout=3600) # Lock for 1 hour # Check system lock status lock_expires = tiger.get_queue_system_lock('maintenance_queue') if lock_expires: print(f"Queue locked until {datetime.datetime.fromtimestamp(lock_expires)}") # Purge old errored tasks n_purged = tiger.purge_errored_tasks( queues=['default', 'emails'], exclude_queues=['critical'], last_execution_before=datetime.datetime.utcnow() - datetime.timedelta(weeks=2), limit=1000, ) print(f"Purged {n_purged} old failed tasks") # Purge all errored tasks in batches to avoid Redis overload limit = 500 n_processed = limit while n_processed == limit: n_processed = tiger.purge_errored_tasks( last_execution_before=datetime.datetime.utcnow() - datetime.timedelta(days=30), limit=limit, ) import time time.sleep(0.1) # Don't overload Redis ``` -------------------------------- ### Periodic Task Scheduling Source: https://github.com/closeio/tasktiger/blob/master/README.rst Configuration options for scheduling periodic tasks using cron expressions. ```APIDOC ## cron_expr(expr, start_date=None, end_date=None) ### Description Defines a periodic task schedule using a cron expression. ### Parameters - **expr** (string) - Required - The cron expression (e.g., "0 * * * *"). - **start_date** (datetime) - Optional - The start date for the periodic task. Defaults to 2000-01-01T00:00Z. - **end_date** (datetime) - Optional - The end date for the periodic task. If not provided, the task repeats indefinitely. ``` -------------------------------- ### TaskTiger Worker CLI Source: https://github.com/closeio/tasktiger/blob/master/README.rst Command-line options for invoking and configuring TaskTiger workers. ```APIDOC ## tasktiger worker command ### Description Command-line interface to start TaskTiger workers. ### Options - **-q, --queues** (string) - Optional - Comma-separated list of queues to process. - **-e, --exclude-queues** (string) - Optional - Comma-separated list of queues to exclude. - **-m, --module** (string) - Optional - Modules to preload for performance. - **-h, --host** (string) - Optional - Redis server hostname. - **-p, --port** (integer) - Optional - Redis server port. ``` -------------------------------- ### Linear Retry Method Source: https://github.com/closeio/tasktiger/blob/master/README.rst Use this method to retry a task with a linearly increasing delay. Specify the initial delay, the increment per retry, and the maximum number of retries. ```python linear(delay, increment, max_retries) ``` -------------------------------- ### Override Task Decorator Options When Queueing Source: https://github.com/closeio/tasktiger/blob/master/README.rst Demonstrates how to override task options specified in the decorator by providing them directly to the tiger.delay method. The task must be defined in a module other than '__main__'. ```python # The task will be queued in "otherqueue", even though the task decorator # says "myqueue". tiger.delay(my_task, queue='otherqueue') ``` -------------------------------- ### Define Periodic and Scheduled Tasks Source: https://context7.com/closeio/tasktiger/llms.txt Use periodic() or cron_expr() decorators to schedule tasks automatically. ```python import datetime from tasktiger import TaskTiger, periodic, cron_expr tiger = TaskTiger() # Run every 5 minutes @tiger.task(schedule=periodic(minutes=5)) def collect_metrics(): print("Collecting metrics") # Run every hour starting from a specific date @tiger.task(schedule=periodic(hours=1, start_date=datetime.datetime(2024, 1, 1, 0, 0))) def hourly_cleanup(): print("Running hourly cleanup") # Run every Sunday at 4am UTC @tiger.task(schedule=periodic(weeks=1, start_date=datetime.datetime(2000, 1, 2, 4, 0))) def weekly_report(): print("Generating weekly report") # Run with an end date (stops scheduling after end_date) @tiger.task(schedule=periodic( days=1, start_date=datetime.datetime(2024, 1, 1), end_date=datetime.datetime(2024, 12, 31) )) def daily_task_with_expiry(): print("Running daily task (expires end of 2024)") ``` -------------------------------- ### Fixed Retry Method Source: https://github.com/closeio/tasktiger/blob/master/README.rst Use this method to retry a task with a fixed delay between attempts. Specify the delay in seconds and the maximum number of retries. ```python fixed(delay, max_retries) ``` -------------------------------- ### Schedule tasks with cron expressions Source: https://context7.com/closeio/tasktiger/llms.txt Use the @tiger.task decorator with cron_expr to define periodic tasks. Date bounds can be specified using start_date and end_date. ```python # Cron expression: every hour at minute 0 @tiger.task(schedule=cron_expr("0 * * * *")) def hourly_cron_task(): print("Hourly cron task") # Cron expression: every Sunday at 4am @tiger.task(schedule=cron_expr("0 4 * * 0")) def sunday_morning_task(): print("Sunday morning task") # Cron expression: every weekday at 9am, with date bounds @tiger.task(schedule=cron_expr( "0 9 * * 1-5", start_date=datetime.datetime(2024, 1, 1), end_date=datetime.datetime(2024, 6, 30) )) def weekday_morning_task(): print("Weekday morning task") ``` -------------------------------- ### Python Unit Test Boilerplate Source: https://context7.com/closeio/tasktiger/llms.txt Standard Python boilerplate for running unit tests using the `unittest` module. ```python import unittest if __name__ == '__main__': unittest.main() ``` -------------------------------- ### Purge Errored Tasks Periodically in Python Source: https://github.com/closeio/tasktiger/blob/master/README.rst Sets up a periodic task to purge errored tasks from Redis based on a specified limit and last execution time. Requires TaskTiger and periodic imports, and datetime module. ```python from tasktiger import TaskTiger, periodic tiger = TaskTiger() @tiger.task(schedule=periodic(hours=1)) def purge_errored_tasks(): tiger.purge_errored_tasks( limit=1000, last_execution_before=( datetime.datetime.utcnow() - datetime.timedelta(weeks=12) ) ) ``` -------------------------------- ### Delay Task Execution Source: https://context7.com/closeio/tasktiger/llms.txt Queue a task for asynchronous execution using the `delay` method. This method is used when `ALWAYS_EAGER` is set to `False`. ```python self.tiger.delay(process_item, args=[1]) ``` -------------------------------- ### Test Failed Task Handling Source: https://context7.com/closeio/tasktiger/llms.txt Test error handling for tasks that raise exceptions. This involves queuing a failing task, running the worker with error raising disabled, and checking the error queue. ```python def test_failed_task(self): """Test error handling.""" self.tiger.config['ALWAYS_EAGER'] = False def failing_task(): raise ValueError("Task failed") self.tiger.delay(failing_task) # Run worker (don't raise on errors for this test) self.run_worker(self.tiger, raise_on_errors=False) # Check error queue n_total, tasks = Task.tasks_from_queue( self.tiger, 'default', 'error', load_executions=1 ) self.assertEqual(n_total, 1) self.assertIn('ValueError', tasks[0].executions[0]['exception_name']) ``` -------------------------------- ### Custom Retry Exception Source: https://github.com/closeio/tasktiger/blob/master/README.rst Using RetryException to handle custom retry logic within task functions. ```APIDOC ## RetryException ### Description Raised within a task function to override default retry behavior. ### Parameters - **method** (object) - Optional - Custom retry method (e.g., exponential or fixed). - **original_traceback** (boolean) - Optional - If True, logs the original traceback. Defaults to False. - **log_error** (boolean) - Optional - If False, logs a warning instead of an error on permanent failure. Defaults to True. ``` -------------------------------- ### Access Task Context Source: https://context7.com/closeio/tasktiger/llms.txt Retrieve metadata, batch information, or the TaskTiger instance from within a task function. ```python from tasktiger import TaskTiger tiger = TaskTiger() @tiger.task() def task_with_context(): # Access current task instance task = tiger.current_task print(f"Task ID: {task.id}") print(f"Function: {task.serialized_func}") print(f"Args: {task.args}, Kwargs: {task.kwargs}") print(f"Retry method: {task.retry_method}") print(f"Execution count: {task.n_executions()}") @tiger.task(batch=True, queue='batch') def batch_task_with_context(params): # Access all tasks in the batch tasks = tiger.current_tasks print(f"Processing {len(tasks)} tasks in batch") for task in tasks: print(f" Task {task.id}: args={task.args}") # Process the params for p in params: print(f"Processing: {p['args']}, {p['kwargs']}") @tiger.task() def conditional_task(): # Check if running in batch mode is_batch = tiger.current_task_is_batch print(f"Running in batch mode: {is_batch}") # Get serialized function name func_name = tiger.current_serialized_func print(f"Function: {func_name}") @tiger.task() def access_tiger_instance(): # Access the TaskTiger instance from within a task current_tiger = TaskTiger.current_instance print(f"Redis prefix: {current_tiger.config['REDIS_PREFIX']}") ``` -------------------------------- ### Migrate Task Execution Counts Source: https://github.com/closeio/tasktiger/blob/master/CHANGELOG.md Use this function to migrate task execution counts when upgrading to TaskTiger version 0.17.0. Ensure TaskTiger is upgraded to 0.16.2 first. This migration can take time depending on the number of tasks. ```python from tasktiger import TaskTiger from tasktiger.migrations import migrate_executions_count # Instantiate directly or import from your application module tiger = TaskTiger(...) # This could take a while depending on the # number of failed/retrying tasks you have migrate_executions_count(tiger) ``` -------------------------------- ### Run Worker Source: https://context7.com/closeio/tasktiger/llms.txt Run the TaskTiger worker to process tasks from the queue. `raise_on_errors=True` will cause the worker to raise exceptions encountered during task execution. ```python self.run_worker(self.tiger, raise_on_errors=True) ```