### Install LiteQueue via pip

Source: https://github.com/litements/litequeue/blob/main/README.md

Commands to set up a virtual environment and install the package.

```bash
python3 -m venv .venv
# Activate the environment first (POSIX shells); --require-virtualenv makes pip refuse to run outside one
source .venv/bin/activate
python3 -m pip --require-virtualenv install --upgrade litequeue
```

--------------------------------

### Get Current Queue Size

Source: https://context7.com/litements/litequeue/llms.txt

The `qsize` method returns the number of messages in the READY or LOCKED state. Messages that are DONE or FAILED are excluded.

```python
from litequeue import LiteQueue

q = LiteQueue(":memory:")

print(q.qsize())  # 0

q.put("a")
q.put("b")
q.put("c")
print(q.qsize())  # 3

task = q.pop()
print(q.qsize())  # 3 (locked messages still count)

q.done(task.message_id)
print(q.qsize())  # 2 (done messages don't count)
```

--------------------------------

### Benchmark Standard Queue Full Cycle

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Measures the performance of a full put, get, and task_done cycle for a standard Queue.

```python
%%timeit -n10000 -r7
tid = random_string(20)

q.put(tid)
q.get()
q.task_done()
```

--------------------------------

### get

Source: https://context7.com/litements/litequeue/llms.txt

Retrieves a specific message by its message_id without changing its status.

```APIDOC
## get(message_id)

### Description
Retrieves a specific message by its message_id without changing its status. Returns None if the message doesn't exist.

### Parameters
#### Request Body
- **message_id** (string) - Required - The unique identifier of the message to retrieve.
```

--------------------------------

### Retrieve Message by ID

Source: https://context7.com/litements/litequeue/llms.txt

The `get` method retrieves a specific message using its `message_id` without altering its status. If the message does not exist, it returns `None`.

```python
from litequeue import LiteQueue

q = LiteQueue(":memory:")
message = q.put("lookup_example")

# Retrieve by message_id
found = q.get(message.message_id)
print(found.data)       # 'lookup_example'
print(found.in_time)    # Insertion timestamp
print(found.lock_time)  # None (not yet popped)

# Non-existent message returns None
result = q.get("non-existent-id")
print(result)  # None
```

--------------------------------

### List All Failed Messages

Source: https://context7.com/litements/litequeue/llms.txt

Use `list_failed` to get an iterator over all messages currently in the FAILED status. This is helpful for implementing dead-letter queue patterns or for manual review processes.

```python
from litequeue import LiteQueue

q = LiteQueue(":memory:")

# Create some failed tasks
for i in range(3):
    q.put(f"task_{i}")
    task = q.pop()
    q.mark_failed(task.message_id)

# List all failed messages
failed = list(q.list_failed())
print(len(failed))  # 3

for msg in q.list_failed():
    print(f"Failed: {msg.data}, locked at: {msg.lock_time}")
```

--------------------------------

### Basic LiteQueue Usage

Source: https://github.com/litements/litequeue/blob/main/README.md

Demonstrates initializing a queue, adding tasks, popping them, and marking them as done.

```python
from litequeue import LiteQueue

q = LiteQueue(":memory:")

q.put("hello")
q.put("world")

# Message object used by LiteQueue
# Message(data='world', message_id=UUID('063e95f1-3d9f-7547-8000-c3eb531fff93'), status=<MessageStatus.READY: 0>, in_time=1676238611851409010, lock_time=None, done_time=None)

task = q.pop()

print(task)
# Message(data='hello', message_id='063e95f1-3d9e-7bbc-8000-a6a18a5f65d1', status=1, in_time=1676238611851279408, lock_time=1676238623180543854, done_time=None)

q.done(task.message_id)

q.get(task.message_id)
# Message(
#     data='hello',
#     message_id='063e95f1-3d9e-7bbc-8000-a6a18a5f65d1',
#     status=2,  <---- status is now 2 (DONE)
#     in_time=1676238611851279408,
#     lock_time=1676238623180543854,
#     done_time=1676238641276753673
# )
```

--------------------------------

### Initialize LiteQueue on Disk

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Creates a LiteQueue instance backed by a file.

```python
q = LiteQueue("test.queue", maxsize=None)
```

--------------------------------

### Initialize Standard Queue

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Creates a standard Python Queue instance.

```python
from queue import Queue

q = Queue()
```

--------------------------------

### Initialize LiteQueue Environment

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Imports necessary modules and defines a helper function for generating random strings used in benchmarks.

```python
from litequeue import LiteQueue, MessageStatus
import sqlite3
import gc

from string import ascii_lowercase, printable
from random import choice


def random_string(string_length=10, fuzz=False, space=False):
    """Generate a random string of fixed length"""
    letters = ascii_lowercase
    letters = letters + " " if space else letters
    if fuzz:
        letters = printable
    return "".join(choice(letters) for i in range(string_length))
```

--------------------------------

### Initialize LiteQueue Instance

Source: https://context7.com/litements/litequeue/llms.txt

Configure storage options, size limits, and table names for the queue. Supports in-memory, file-based, and existing SQLite connections.

```python
from litequeue import LiteQueue
import sqlite3

# In-memory queue (useful for testing)
q = LiteQueue(":memory:")

# Alternative in-memory initialization
q = LiteQueue(memory=True)

# File-based persistent queue
q = LiteQueue("my_queue.db")

# Queue with maximum size limit (raises IntegrityError when full)
q = LiteQueue(":memory:", maxsize=100)

# Custom table name for multiple queues in same database
q = LiteQueue("shared.db", queue_name="EmailQueue")

# Using existing SQLite connection
conn = sqlite3.connect(":memory:")
q = LiteQueue(filename_or_conn=conn)

# Custom SQLite cache size (default: 256KB)
q = LiteQueue(":memory:", sqlite_cache_size_bytes=512_000)
```

--------------------------------

### Manage Multiple Queues in One Database

Source: https://github.com/litements/litequeue/blob/main/README.md

Shows how to use the queue_name parameter to isolate multiple queues within the same SQLite file.

```python
import tempfile
import litequeue

with tempfile.TemporaryDirectory() as tmpdirname:
    db_path = tmpdirname + "/test.sqlite3"

    q1 = litequeue.LiteQueue(db_path, queue_name="q1")
    q2 = litequeue.LiteQueue(db_path, queue_name="q2")

    q1.put("a")
    q1.put("b")

    print("Q1 size", q1.qsize())
    print("Q2 size", q2.qsize())

    q2.put("c")
    q2.put("d")

    print("Q2 size", q2.qsize())

    print(q1.pop())
    print(q1.peek())

    print(q2.peek())
    print(q2.pop())
```

--------------------------------

### Initialize LiteQueue in Memory

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Creates an in-memory LiteQueue instance.

```python
q = LiteQueue(":memory:", maxsize=None)
```

--------------------------------

### empty and full

Source: https://context7.com/litements/litequeue/llms.txt

Check the current state of the queue.

```APIDOC
## empty()

### Description
Returns True if there are no READY messages.

## full()

### Description
Returns True if the queue has reached its maxsize limit.
```

--------------------------------

### Reclaim Database Space with vacuum()

Source: https://context7.com/litements/litequeue/llms.txt

Use the `vacuum()` method to run SQLite's VACUUM command and reclaim disk space after deleting many messages. This operation can be slow for large databases.

```python
from litequeue import LiteQueue

q = LiteQueue("queue.db")

# After processing and pruning many messages
for i in range(1000):
    q.put(f"task_{i}")
    task = q.pop()
    q.done(task.message_id)

q.prune()

# Reclaim disk space (slow operation)
q.vacuum()
```

--------------------------------

### Run Garbage Collection

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Triggers manual garbage collection.

```python
gc.collect()
```

--------------------------------

### Benchmark LiteQueue Pop Returning

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Benchmarks the _pop_returning method for LiteQueue.

```python
q = LiteQueue("pop_bench.db", maxsize=None)
q.pop = q._pop_returning
gc.collect()
for _ in range(10000):
    tid = random_string(60)
    q.put(tid)

# --- next notebook cell (%%time must be the first line of its own cell) ---
%%time
for _ in range(8000):
    task = q.pop()
```

--------------------------------

### qsize

Source: https://context7.com/litements/litequeue/llms.txt

Returns the number of messages currently in the queue.

```APIDOC
## qsize()

### Description
The qsize method returns the number of messages currently in the queue (excludes DONE and FAILED messages).
```

--------------------------------

### Check SQLite Version

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Prints the current version of the SQLite library being used.

```python
print(sqlite3.sqlite_version)
```

--------------------------------

### Benchmark LiteQueue Full Cycle

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Measures the performance of a full put, pop, and done cycle for LiteQueue.

```python
%%timeit -n10000 -r7
tid = random_string(20)

q.put(tid)
task = q.pop()
q.done(task.message_id)
```

--------------------------------

### peek

Source: https://context7.com/litements/litequeue/llms.txt

Shows the next message that would be returned by pop() without actually locking it.

```APIDOC
## peek()

### Description
The peek method shows the next message that would be returned by pop() without actually locking it. Useful for inspecting the queue state.
```

--------------------------------

### Check if Queue is Empty or Full

Source: https://context7.com/litements/litequeue/llms.txt

The `empty` method returns `True` if no messages are READY. The `full` method returns `True` if the queue has reached its `maxsize` limit. If `maxsize` is not set, `full` always returns `False`.

```python
from litequeue import LiteQueue
import sqlite3

q = LiteQueue(":memory:", maxsize=3)

print(q.empty())  # True
print(q.full())   # False

q.put("a")
q.put("b")
print(q.empty())  # False
print(q.full())   # False

q.put("c")
print(q.full())  # True

# Attempting to add more raises an error
try:
    q.put("d")
except sqlite3.IntegrityError as e:
    print("Queue is full!")  # Queue is full!

# Pop one to make room
q.pop()
print(q.full())  # False
```

--------------------------------

### Check Disk Usage

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Displays disk usage for the queue file and associated journal files.

```python
!du -sh test.queue*
```

--------------------------------

### Benchmark LiteQueue Put Operation

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Measures the performance of putting items into the LiteQueue.

```python
%%timeit -n10000 -r7
q.put(random_string(20))
```

--------------------------------

### Verify Isolation Level

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Checks that the connection isolation level is set to None.

```python
assert q.conn.isolation_level is None
```

--------------------------------

### Benchmark LiteQueue Pop Transaction

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Benchmarks the _pop_transaction method for LiteQueue.

```python
q = LiteQueue("pop_bench.db", maxsize=None)
q.pop = q._pop_transaction
gc.collect()
for _ in range(10000):
    tid = random_string(60)
    q.put(tid)

# --- next notebook cell (%%time must be the first line of its own cell) ---
%%time
for _ in range(8000):
    task = q.pop()
```

--------------------------------

### Insert Messages into Queue

Source: https://context7.com/litements/litequeue/llms.txt

Use the put method to add tasks to the queue. JSON strings can be used for complex data structures.

```python
from litequeue import LiteQueue
import json

q = LiteQueue(":memory:")

# Insert simple string message
message = q.put("hello world")
print(message.message_id)  # '063e95f1-3d9e-7bbc-8000-a6a18a5f65d1'
print(message.status)      # MessageStatus.READY (0)
print(message.in_time)     # 1676238611851279408 (nanoseconds)

# Insert JSON data as message
task_data = {"action": "send_email", "to": "user@example.com", "subject": "Hello"}
message = q.put(json.dumps(task_data))

# Insert several messages in a loop
for i in range(5):
    q.put(f"task_{i}")
```

--------------------------------

### Peek at Next Message Without Locking

Source: https://context7.com/litements/litequeue/llms.txt

Use `peek` to view the next message that `pop()` would retrieve, without actually locking it. This is helpful for inspecting the queue's current state.

```python
from litequeue import LiteQueue, MessageStatus

q = LiteQueue(":memory:")
q.put("first")
q.put("second")

# Peek shows next message without locking
next_msg = q.peek()
print(next_msg.data)    # 'first'
print(next_msg.status)  # MessageStatus.READY (not locked)

# Multiple peeks return the same message
print(q.peek().data)  # 'first' (still the same)

# Pop actually locks it
q.pop()
print(q.peek().data)  # 'second' (next in queue)
```

--------------------------------

### done - Mark Message as Completed

Source: https://context7.com/litements/litequeue/llms.txt

Marks a previously popped message as successfully processed.

```APIDOC
## done(message_id)

### Description
Marks a message as successfully processed by setting its status to DONE and recording the completion timestamp.

### Parameters
#### Request Body
- **message_id** (string) - Required - The ID of the message to mark as done.
```

--------------------------------

### Multiple Queues in Same Database

Source: https://context7.com/litements/litequeue/llms.txt

LiteQueue supports multiple independent queues within the same SQLite database file by using different table names via the `queue_name` parameter.
This allows for logical separation of tasks.

```python
from litequeue import LiteQueue
import tempfile
import os

with tempfile.TemporaryDirectory() as tmpdir:
    db_path = os.path.join(tmpdir, "multi_queue.db")

    # Create separate queues in same database
    email_queue = LiteQueue(db_path, queue_name="EmailQueue")
    notification_queue = LiteQueue(db_path, queue_name="NotificationQueue")

    # Add messages to different queues
    email_queue.put("Send welcome email")
    email_queue.put("Send password reset")
    notification_queue.put("Push notification: New message")

    print(f"Email queue size: {email_queue.qsize()}")                # 2
    print(f"Notification queue size: {notification_queue.qsize()}")  # 1

    # Process from specific queue
    email_task = email_queue.pop()
    print(f"Processing: {email_task.data}")  # Send welcome email
```

--------------------------------

### Check Queue Size

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Returns the current number of items in the queue.

```python
q.qsize()
```

--------------------------------

### Mark Message as Completed

Source: https://context7.com/litements/litequeue/llms.txt

Use the done method to finalize a task after successful processing.

```python
from litequeue import LiteQueue, MessageStatus

q = LiteQueue(":memory:")
q.put("process_order")

# Pop and process
task = q.pop()
try:
    # Process the task...
    result = f"Processed: {task.data}"
    # Mark as done
    q.done(task.message_id)
except Exception as e:
    # Handle error - see mark_failed()
    pass

# Verify completion
completed_task = q.get(task.message_id)
print(completed_task.status)     # MessageStatus.DONE (2)
print(completed_task.done_time)  # Completion timestamp (nanoseconds)

# Calculate processing time
processing_time_ns = completed_task.done_time - completed_task.lock_time
print(f"Processing took {processing_time_ns / 1e9:.4f} seconds")
```

--------------------------------

### put - Insert Message

Source: https://context7.com/litements/litequeue/llms.txt

Inserts a new message into the queue and returns a Message object.

```APIDOC
## put(data)

### Description
Inserts a new message into the queue. Messages are stored as strings, allowing for JSON serialization of complex data.

### Parameters
#### Request Body
- **data** (string) - Required - The message content to be queued.

### Response
#### Returned Message
- **message_id** (string) - The UUIDv7 identifier for the message.
- **status** (int) - The initial status (READY).
- **in_time** (int) - The insertion timestamp in nanoseconds.
```

--------------------------------

### Remove Queue File

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Deletes the queue file from the filesystem.

```python
!rm test.queue*
```

--------------------------------

### Remove Benchmark Database

Source: https://github.com/litements/litequeue/blob/main/benchmark.ipynb

Deletes the benchmark database file.

```python
!rm pop_bench.db*
```

--------------------------------

### retry

Source: https://context7.com/litements/litequeue/llms.txt

Resets a message's status back to READY, allowing it to be popped and processed again.

```APIDOC
## retry(message_id)

### Description
Resets a message's status back to READY, allowing it to be popped and processed again. Useful for retrying failed tasks or releasing stuck locked messages.

### Parameters
#### Request Body
- **message_id** (string) - Required - The unique identifier of the message to retry.
```

--------------------------------

### mark_failed - Mark Message as Failed

Source: https://context7.com/litements/litequeue/llms.txt

Marks a message as failed when processing encounters an error.

```APIDOC
## mark_failed(message_id)

### Description
Marks a message as failed. Failed messages are excluded from the queue size count.

### Parameters
#### Request Body
- **message_id** (string) - Required - The ID of the message to mark as failed.
```

--------------------------------

### Message Object and Status Enum

Source: https://context7.com/litements/litequeue/llms.txt

Messages are represented as frozen dataclasses with status tracked via the `MessageStatus` enum. Timestamps are stored in nanoseconds.

```python
from litequeue import LiteQueue, MessageStatus, Message

q = LiteQueue(":memory:")
msg = q.put("example")

# Message attributes
print(msg.data)        # 'example'
print(msg.message_id)  # UUIDv7 string
print(msg.status)      # MessageStatus.READY
print(msg.in_time)     # Insertion time (nanoseconds)
print(msg.lock_time)   # None until popped
print(msg.done_time)   # None until done/failed

# MessageStatus enum values
print(MessageStatus.READY.value)   # 0
print(MessageStatus.LOCKED.value)  # 1
print(MessageStatus.DONE.value)    # 2
print(MessageStatus.FAILED.value)  # 3

# Compare status
task = q.pop()
if task.status == MessageStatus.LOCKED:
    print("Task is being processed")
```

--------------------------------

### Mark Message as Failed

Source: https://context7.com/litements/litequeue/llms.txt

Use mark_failed to update a message status when processing errors occur.

```python
from litequeue import LiteQueue, MessageStatus

q = LiteQueue(":memory:")
q.put("risky_operation")

task = q.pop()
try:
    # Simulate processing failure
    raise ValueError("Processing error")
except Exception as e:
    q.mark_failed(task.message_id)

# Check failed status
failed_task = q.get(task.message_id)
print(failed_task.status)  # MessageStatus.FAILED (3)
```

--------------------------------

### Retry Failed or Locked Message

Source: https://context7.com/litements/litequeue/llms.txt

Use `retry` to reset a message's status to READY, allowing it to be processed again. This is useful for retrying tasks that failed or releasing messages stuck in a locked state.

```python
from litequeue import LiteQueue, MessageStatus

q = LiteQueue(":memory:")
q.put("retryable_task")

task = q.pop()
q.mark_failed(task.message_id)

# Retry the failed task
q.retry(task.message_id)

# Message is now ready to be popped again
retried_task = q.get(task.message_id)
print(retried_task.status)     # MessageStatus.READY (0)
print(retried_task.done_time)  # None (reset)

# Can now pop again
task = q.pop()
print(task.data)  # 'retryable_task'
```

--------------------------------

### prune

Source: https://context7.com/litements/litequeue/llms.txt

Deletes all DONE messages from the database.

```APIDOC
## prune(include_failed)

### Description
The prune method deletes all DONE messages from the database. Optionally removes FAILED messages as well.

### Parameters
#### Request Body
- **include_failed** (boolean) - Optional - Whether to also remove FAILED messages.
```

--------------------------------

### pop - Retrieve and Lock Message

Source: https://context7.com/litements/litequeue/llms.txt

Retrieves the next available message from the queue and locks it for processing.

```APIDOC
## pop()

### Description
Retrieves the next available message from the queue and locks it to prevent concurrent processing. Returns None if no messages are available.

### Response
#### Returned Message
- **data** (string) - The message content.
- **status** (int) - The status (LOCKED).
- **lock_time** (int) - The timestamp when the message was locked.
```

--------------------------------

### Retrieve and Lock Messages

Source: https://context7.com/litements/litequeue/llms.txt

The pop method retrieves and locks the next available message. Returns None if no READY message is available.

```python
from litequeue import LiteQueue, MessageStatus

q = LiteQueue(":memory:")
q.put("task_1")
q.put("task_2")

# Pop retrieves and locks the message
task = q.pop()
print(task.data)       # 'task_1'
print(task.status)     # MessageStatus.LOCKED (1)
print(task.lock_time)  # Timestamp when locked (nanoseconds)

# Process the task...
# Then mark as done (see done() method)

# Returns None when queue is empty or all messages are locked
task = q.pop()  # 'task_2' is popped and locked
task = q.pop()  # None - no more ready messages
task = q.pop()  # None - still nothing ready
```

--------------------------------

### list_failed

Source: https://context7.com/litements/litequeue/llms.txt

Returns an iterator over all messages in FAILED status.

```APIDOC
## list_failed()

### Description
The list_failed method returns an iterator over all messages in FAILED status. Useful for implementing dead letter queue patterns or manual review processes.
```

--------------------------------

### List Stuck Locked Messages

Source: https://context7.com/litements/litequeue/llms.txt

The `list_locked` method identifies messages that have been in the LOCKED status beyond a specified time threshold. This is useful for detecting and recovering messages from crashed workers.

```python
from litequeue import LiteQueue
import time

q = LiteQueue(":memory:")
q.put("task_1")
q.put("task_2")

# Pop messages (simulating workers taking tasks)
task1 = q.pop()
task2 = q.pop()

# Wait to simulate stuck workers
time.sleep(0.5)

# Find messages locked for more than 0.1 seconds
stuck_messages = list(q.list_locked(threshold_seconds=0.1))
print(len(stuck_messages))  # 2

# Retry stuck messages
for msg in stuck_messages:
    q.retry(msg.message_id)
    print(f"Retried: {msg.data}")
```

--------------------------------

### Remove Completed Messages

Source: https://context7.com/litements/litequeue/llms.txt

The `prune` method deletes DONE messages from the database. By default, it also removes FAILED messages. Call this method periodically to manage database size.

```python
from litequeue import LiteQueue

q = LiteQueue(":memory:")

# Process some messages
for i in range(5):
    q.put(f"task_{i}")
    task = q.pop()
    q.done(task.message_id)

# Prune completed messages
q.prune()  # Removes all DONE messages (and FAILED ones, per the default described above)

# Prune including failed messages (default behavior)
q.prune(include_failed=True)

# Prune only DONE, keep FAILED for review
q.prune(include_failed=False)
```

--------------------------------

### list_locked

Source: https://context7.com/litements/litequeue/llms.txt

Returns messages that have been in LOCKED status for longer than the specified threshold.

```APIDOC
## list_locked(threshold_seconds)

### Description
The list_locked method returns messages that have been in LOCKED status for longer than the specified threshold. Useful for detecting and recovering from worker crashes.

### Parameters
#### Request Body
- **threshold_seconds** (float) - Required - The duration in seconds to check for locked messages.
```
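
--------------------------------

### Sketch: Worker Step and Stuck-Message Recovery

The `pop`, `done`, `mark_failed`, `list_locked`, and `retry` calls documented above can be combined into a small worker routine. The following is a minimal sketch and not part of LiteQueue itself: `process_next` and `requeue_stuck` are hypothetical helper names, and `handler` is any callable supplied by the caller. The helpers rely only on the behavior described above: `pop()` returns `None` when nothing is READY, and `list_locked(threshold_seconds=...)` yields messages that expose a `message_id` attribute.

```python
from typing import Callable, Optional


def process_next(queue, handler: Callable[[str], None]) -> Optional[bool]:
    """Pop one message and run `handler` on its data (hypothetical helper).

    Returns None if nothing was ready, True if the message was marked DONE,
    and False if the handler raised and the message was marked FAILED.
    """
    task = queue.pop()
    if task is None:
        # Nothing is READY: the queue is empty or every message is locked.
        return None
    try:
        handler(task.data)
    except Exception:
        # Record the failure so the message shows up in list_failed().
        queue.mark_failed(task.message_id)
        return False
    queue.done(task.message_id)
    return True


def requeue_stuck(queue, threshold_seconds: float) -> int:
    """Reset messages locked longer than `threshold_seconds` (hypothetical helper).

    Returns the number of messages that were sent back to READY.
    """
    # Materialize the iterator first so rows are not updated while iterating.
    stuck = list(queue.list_locked(threshold_seconds=threshold_seconds))
    for msg in stuck:
        queue.retry(msg.message_id)
    return len(stuck)
```

A worker could call `process_next(q, handler)` in a loop on a `LiteQueue` instance and run `requeue_stuck(q, threshold_seconds=...)` periodically. The threshold should probably exceed the longest expected processing time, since a message that is retried while still being processed could be handed to a second worker.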