### Install persist-queue from source Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Clone the repository and install using setup.py. ```console git clone https://github.com/peter-wangxu/persist-queue cd persist-queue python setup.py install ``` -------------------------------- ### Install persist-queue with all features Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Install with support for all features, including extra and async. ```console pip install "persist-queue[extra,async]" ``` -------------------------------- ### Install persist-queue with extra features Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Install with support for msgpack, cbor, and MySQL. ```console pip install "persist-queue[extra]" ``` -------------------------------- ### AsyncQueue Producer-Consumer Example Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md A complete example demonstrating a producer putting items into an AsyncQueue and a consumer getting and processing them concurrently. Uses temporary directories for queue storage. ```python import asyncio import tempfile import os from persistqueue import AsyncQueue async def basic_example(): # Create temporary directory with tempfile.TemporaryDirectory() as temp_dir: queue_path = os.path.join(temp_dir, "async_queue") async with AsyncQueue(queue_path) as queue: # Producer async def producer(): for i in range(5): await queue.put(f"Task {i}") await asyncio.sleep(0.1) # Consumer async def consumer(): for i in range(5): item = await queue.get() print(f"Process: {item}") await queue.task_done() await asyncio.sleep(0.2) # Run concurrently await asyncio.gather(producer(), consumer()) await queue.join() # Run example asyncio.run(basic_example()) ``` -------------------------------- ### Async File-based and SQLite Queue Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Provides examples for using asynchronous file-based and SQLite queues with `async with`. Demonstrates putting, getting, updating, and task completion. ```python import asyncio from persistqueue import AsyncQueue, AsyncSQLiteQueue async def example(): # File-based async queue async with AsyncQueue("/path/to/queue") as queue: await queue.put("data item") item = await queue.get() await queue.task_done() # SQLite-based async queue async with AsyncSQLiteQueue("/path/to/queue.db") as queue: item_id = await queue.put({"key": "value"}) item = await queue.get() await queue.update({"key": "new_value"}, item_id) await queue.task_done() asyncio.run(example()) ``` -------------------------------- ### Basic Installation of persist-queue Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Standard command to install the persist-queue library using pip. ```bash pip install persist-queue ``` -------------------------------- ### Install Async Dependencies Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Install the necessary packages for the asynchronous API. Alternatively, install all dependencies from requirements.txt. ```bash pip install aiofiles aiosqlite ``` ```bash pip install -r requirements.txt ``` -------------------------------- ### Install persist-queue Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Install the library with optional support for extra features like msgpack, cbor2, MySQL, or async. ```bash pip install persist-queue ``` ```bash pip install "persist-queue[extra]" ``` ```bash pip install "persist-queue[async]" ``` ```bash pip install "persist-queue[extra,async]" ``` -------------------------------- ### Manual Async Dependency Installation Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Installs the core asynchronous libraries `aiofiles` and `aiosqlite` manually, along with their specified version requirements. ```bash pip install aiofiles>=0.8.0 aiosqlite>=0.17.0 ``` -------------------------------- ### Async Support Installation for persist-queue Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Installs persist-queue with optional dependencies required for asynchronous support. ```bash pip install "persist-queue[async]" ``` -------------------------------- ### MySQL Queue Initialization and Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates connecting to and using a MySQL-based queue. Requires valid database credentials and table setup. Items are retrieved and marked as done. ```python >>> import persistqueue >>> q = persistqueue.MySQLQueue( ... host='localhost', ... port=3306, ... user='testuser', ... password='testpass', ... database='testdb', ... table_name='test_queue' ... ) >>> q.put('item1') >>> q.put('item2') >>> q.put('item3') >>> q.get() 'item1' >>> q.task_done() >>> q.get() 'item2' >>> q.task_done() >>> q.size 1 ``` -------------------------------- ### Install Development Dependencies Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Install development and extra dependencies using pip. This is necessary for setting up a development environment. ```console pip install -r test-requirements.txt ``` ```console pip install -r extra-requirements.txt ``` -------------------------------- ### AsyncQueue Usage Example Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Demonstrates basic enqueue/dequeue operations, size checks, and concurrent producer-consumer patterns with AsyncQueue. Requires `aiofiles`. ```python import asyncio from persistqueue import AsyncQueue from persistqueue.exceptions import Empty async def main(): async with AsyncQueue("./async_file_queue", maxsize=0, autosave=False) as q: # Enqueue items await q.put("job-alpha") await q.put("job-beta") await q.put("job-gamma") print(await q.qsize()) # 3 print(await q.empty()) # False # Blocking get with timeout try: item = await q.get(block=True, timeout=2.0) print(f"Got: {item}") # Got: job-alpha await q.task_done() except Empty: print("Timed out") # Concurrent producer + consumer async def producer(queue, items): for i in items: await queue.put(i) await asyncio.sleep(0.05) async def consumer(queue, n): for _ in range(n): item = await queue.get() print(f"Consumed: {item}") await queue.task_done() await asyncio.gather( producer(q, ["task-1", "task-2", "task-3"]), consumer(q, 3), ) await q.join() # wait until all task_done() calls are matched asyncio.run(main()) ``` -------------------------------- ### AsyncSQLiteQueue Examples Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Illustrates FIFO, FILO, and unique queue behaviors using AsyncSQLiteQueue, including data updates and timeout handling. Requires `aiosqlite`. ```python import asyncio import tempfile, os from persistqueue import AsyncSQLiteQueue, AsyncFILOSQLiteQueue, AsyncUniqueQ from persistqueue.exceptions import Empty async def main(): with tempfile.TemporaryDirectory() as tmpdir: # --- FIFO SQLite async queue --- async with AsyncSQLiteQueue(os.path.join(tmpdir, "fifo.db")) as q: id1 = await q.put({"user": "alice", "action": "login"}) id2 = await q.put({"user": "bob", "action": "purchase"}) item = await q.get() print(item) # {'user': 'alice', 'action': 'login'} # Update data in-place by row id await q.update({"user": "bob", "action": "refund"}, id2) item2 = await q.get(raw=True) # raw dict includes pqid print(item2) # {'pqid': 2, 'data': {'user': 'bob', ...}, ...} await q.task_done() # --- FILO async queue --- async with AsyncFILOSQLiteQueue(os.path.join(tmpdir, "filo.db")) as q: await q.put("first") await q.put("second") print(await q.get()) # second (LIFO) # --- Unique async queue (deduplication) --- async with AsyncUniqueQ(os.path.join(tmpdir, "unique.db")) as q: await q.put("https://example.com") await q.put("https://example.com") # ignored await q.put("https://other.com") print(await q.qsize()) # 2 # --- Timeout handling --- async with AsyncSQLiteQueue(os.path.join(tmpdir, "timeout.db")) as q: try: item = await q.get(block=True, timeout=0.5) except Empty: print("Empty queue — timed out as expected") asyncio.run(main()) ``` -------------------------------- ### Basic AsyncQueue Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Demonstrates the fundamental usage of `AsyncQueue`, including initialization with `async with`, putting data, getting data, and managing task completion. ```python import asyncio from persistqueue import AsyncQueue async def example(): async with AsyncQueue("/path/to/queue") as queue: await queue.put("data item") item = await queue.get() await queue.task_done() await queue.join() asyncio.run(example()) ``` -------------------------------- ### MySQLQueue: MySQL-backed FIFO Queue Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Implement a FIFO queue using MySQL as the backend, leveraging connection pooling for efficiency. Requires `pip install "persist-queue[extra]"`. Use `task_done()` after processing an item retrieved with `get()`. The `join()` method blocks until all tasks are completed. ```python import persistqueue q = persistqueue.MySQLQueue( host="localhost", port=3306, user="queueuser", passwd="secret", db_name="appdb", name="job_queue", # table name prefix auto_commit=True, charset="utf8mb4", ) q.put({"type": "email", "recipient": "alice@example.com"}) q.put({"type": "email", "recipient": "bob@example.com"}) print(q.size) # 2 item = q.get(block=True, timeout=5.0) print(item) # {'type': 'email', 'recipient': 'alice@example.com'} q.task_done() # Multi-threaded usage import threading def worker(): while True: task = q.get() print(f"Sending email to {task['recipient']}") q.task_done() for _ in range(4): t = threading.Thread(target=worker, daemon=True) t.start() q.put({"type": "email", "recipient": "carol@example.com"}) q.join() ``` -------------------------------- ### Basic File Queue Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates creating, adding to, and getting items from a basic file-based queue. Remember to call task_done() after processing an item. ```python from persistqueue import Queue # Create a queue q = Queue("my_queue_path") # Add items q.put("item1") q.put("item2") # Get items item = q.get() print(item) # "item1" # Mark as done q.task_done() ``` -------------------------------- ### Asynchronous Database Operations with aiosqlite Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Shows how to execute asynchronous database operations using `aiosqlite`. This requires `aiosqlite` to be installed. ```python async with aiosqlite.connect(db_path) as conn: cursor = await conn.execute(sql, params) await conn.commit() ``` -------------------------------- ### SQLite Queue Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates creating, adding to, and getting items from an SQLite-based queue. Use auto_commit=True for automatic commits. ```python import persistqueue # Create SQLite queue q = persistqueue.SQLiteQueue('my_queue.db', auto_commit=True) # Add items q.put('data1') q.put('data2') # Get items item = q.get() print(item) # "data1" ``` -------------------------------- ### Sync vs Async Queue Migration Example Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Compares the syntax for basic queue operations between the synchronous and asynchronous APIs. Note the use of `await` and `async with` in the asynchronous version. ```python # Sync version from persistqueue import Queue queue = Queue("/path/to/queue") queue.put("data") item = queue.get() queue.task_done() # Async version from persistqueue import AsyncQueue async with AsyncQueue("/path/to/queue") as queue: await queue.put("data") item = await queue.get() await queue.task_done() ``` -------------------------------- ### Serialization Options Example Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Imports the Queue class and available serializers from the persistqueue library, indicating support for multiple serialization protocols. ```python >>> from persistqueue import Queue >>> from persistqueue import serializers ``` -------------------------------- ### MySQL Queue Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates creating, adding to, and getting items from a MySQL-based queue. Ensure you provide correct connection details and call task_done() after processing. ```python import persistqueue # Create MySQL queue q = persistqueue.MySQLQueue( host='localhost', port=3306, user='username', password='password', database='testdb', table_name='my_queue' ) # Add items q.put('data1') q.put('data2') # Get items item = q.get() print(item) # "data1" # Mark as done q.task_done() ``` -------------------------------- ### Asynchronous File Operations with aiofiles Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Demonstrates how to perform asynchronous file write operations using `aiofiles`. Ensure `aiofiles` is installed for this functionality. ```python async with aiofiles.open(filename, mode) as f: await f.write(data) await f.flush() await f.fsync() ``` -------------------------------- ### AsyncQueue Basic Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Demonstrates basic put, get, and task_done operations with AsyncQueue. Ensure to close the queue when done. ```python import asyncio from persistqueue import AsyncQueue async def example(): async with AsyncQueue("/path/to/queue") as queue: # Put data await queue.put("data item") # Get data item = await queue.get() # Mark task as done await queue.task_done() # Wait for all tasks to complete await queue.join() ``` -------------------------------- ### AsyncSQLiteQueue Basic Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Shows how to use AsyncSQLiteQueue for putting, getting, and updating data. It returns and accepts record IDs for operations. ```python import asyncio from persistqueue import AsyncSQLiteQueue async def example(): async with AsyncSQLiteQueue("/path/to/queue.db") as queue: # Put data, returns record ID item_id = await queue.put({"key": "value"}) # Get data item = await queue.get() # Update data await queue.update({"key": "new_value"}, item_id) # Mark task as done await queue.task_done() ``` -------------------------------- ### Pluggable Serializers with Queue and SQLiteQueue Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Demonstrates using different serializers (pickle, JSON, msgpack, cbor2) with both file-based and SQLite-based queues. Ensure extra packages are installed for JSON, msgpack, and cbor2. ```python from persistqueue import Queue, SQLiteQueue from persistqueue import serializers # --- pickle (default, supports any Python object) --- q_pickle = Queue("./q_pickle", serializer=serializers.pickle) q_pickle.put({"key": [1, 2, 3], "nested": {"a": True}}) print(q_pickle.get()) # {'key': [1, 2, 3], 'nested': {'a': True}} q_pickle.task_done() # --- JSON (human-readable, only JSON-serializable types) --- q_json = Queue("./q_json", serializer=serializers.json) q_json.put({"event": "page_view", "url": "/home", "ts": 1700000000}) print(q_json.get()) q_json.task_done() # --- msgpack (compact binary, requires msgpack package) --- q_msgpack = Queue("./q_msgpack", serializer=serializers.msgpack) q_msgpack.put({"sensor": "temp", "value": 23.5}) print(q_msgpack.get()) q_msgpack.task_done() # --- cbor2 (CBOR binary, requires cbor2 package) --- q_cbor = Queue("./q_cbor", serializer=serializers.cbor2) q_cbor.put({"binary_data": b"\x00\xff\xfe"}) print(q_cbor.get()) q_cbor.task_done() # Serializers also work with SQLiteQueue q_sql_json = SQLiteQueue("./sql_json.db", serializer=serializers.json, auto_commit=True) q_sql_json.put({"id": 42, "status": "pending"}) print(q_sql_json.get()) ``` -------------------------------- ### Multi-threading with MySQL Queue Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Shows a multi-threaded setup for a MySQL queue. Workers fetch items, perform actions, and signal task completion. Ensure correct database credentials. ```python from persistqueue import MySQLQueue from threading import Thread q = MySQLQueue( host='localhost', port=3306, user='username', password='password', database='testdb', table_name='my_queue' ) def worker(): while True: item = q.get() do_work(item) q.task_done() for i in range(num_worker_threads): t = Thread(target=worker) t.daemon = True t.start() for item in source(): q.put(item) q.join() # Block until all tasks are done ``` -------------------------------- ### Async SQLite Queue Operations Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Shows basic operations (put, get, update) using AsyncSQLiteQueue. Ensure the database path is correctly managed and the file is cleaned up afterwards. ```python async def sqlite_example(): with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp_file: db_path = tmp_file.name try: async with AsyncSQLiteQueue(db_path) as queue: # Put data item_id = await queue.put({"name": "John", "age": 30}) print(f"Inserted record ID: {item_id}") # Get data item = await queue.get() print(f"Got: {item}") # Update data await queue.update({"name": "Jane", "age": 31}, item_id) await queue.task_done() finally: os.unlink(db_path) ``` -------------------------------- ### Async Queue Timeout Handling Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Illustrates how to handle timeouts when getting items from an empty queue using AsyncQueue. A timeout exception is raised if no item is available within the specified duration. ```python async def timeout_example(): with tempfile.TemporaryDirectory() as temp_dir: queue_path = os.path.join(temp_dir, "timeout_queue") async with AsyncQueue(queue_path) as queue: # Try to get from empty queue with timeout try: item = await queue.get(timeout=1.0) print(f"Got: {item}") except Exception as e: print(f"Timeout: {e}") # Put data and try again await queue.put("Test data") item = await queue.get(timeout=1.0) print(f"Successfully got: {item}") await queue.task_done() ``` -------------------------------- ### Handling Empty and Full Exceptions in Persist Queue Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Use Empty exception when getting from an empty queue (non-blocking or timed-out). Use Full exception when putting to a full bounded queue (non-blocking or timed-out). ```python from persistqueue import Queue, SQLiteQueue from persistqueue.exceptions import Empty, Full # Empty — raised on get() from an empty queue (non-blocking or timed-out) q = SQLiteQueue("./exc_demo.db", auto_commit=True) try: q.get(block=False) except Empty: print("Queue is empty") try: q.get(block=True, timeout=0.1) except Empty: print("Timed out waiting for item") # Full — raised on put() to a full bounded queue bounded = Queue("./bounded_queue", maxsize=2) bounded.put("item-1") bounded.put("item-2") try: bounded.put("item-3", block=False) except Full: print("Queue is full — cannot enqueue") try: bounded.put("item-3", block=True, timeout=0.5) except Full: print("Still full after 0.5 s timeout") ``` -------------------------------- ### File-based Queue Initialization and Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates creating a file-based queue, putting items into it, and retrieving them. Ensure the path is valid. ```python >>> from persistqueue import Queue >>> q = Queue("mypath") >>> q.put('a') >>> q.put('b') >>> q.put('c') >>> q.get() 'a' >>> q.task_done() ``` -------------------------------- ### Initialize Queue with Different Serializers Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates initializing a Queue with various serializers like pickle, MessagePack, CBOR, and JSON. Ensure the desired serializer is imported. ```python >>> q = Queue('mypath', serializer=serializers.pickle) ``` ```python >>> q = Queue('mypath', serializer=serializers.msgpack) ``` ```python >>> q = Queue('mypath', serializer=serializers.cbor2) ``` ```python >>> q = Queue('mypath', serializer=serializers.json) ``` -------------------------------- ### Persistent Dictionary Initialization and Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Shows how to create and use a persistent dictionary. Supports item assignment, retrieval, length checking, and deletion. Raises KeyError for non-existent keys. ```python >>> from persistqueue import PDict >>> q = PDict("testpath", "testname") >>> q['key1'] = 123 >>> q['key2'] = 321 >>> q['key1'] 123 >>> len(q) 2 >>> del q['key1'] >>> q['key1'] KeyError: 'Key: key1 not exists.' ``` -------------------------------- ### SQLite3-based Queue Initialization and Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Shows how to create a SQLite3-based queue with auto-commit enabled, add items, and retrieve them. The queue is deleted after use. ```python >>> import persistqueue >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True) >>> q.put('str1') >>> q.put('str2') >>> q.put('str3') >>> q.get() 'str1' >>> del q ``` -------------------------------- ### Async Queue with Multiple Producers and Consumers Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Demonstrates how to use AsyncQueue with multiple producers and consumers concurrently. Ensure all producers and consumers are properly managed within an asyncio event loop. ```python async def multi_producer_consumer(): with tempfile.TemporaryDirectory() as temp_dir: queue_path = os.path.join(temp_dir, "multi_queue") async with AsyncQueue(queue_path) as queue: # Multiple producers async def producer(producer_id): for i in range(3): await queue.put(f"Producer{producer_id}-Task{i}") await asyncio.sleep(0.1) # Multiple consumers async def consumer(consumer_id): for i in range(3): item = await queue.get() print(f"Consumer{consumer_id}: {item}") await queue.task_done() await asyncio.sleep(0.2) # Create multiple producers and consumers producers = [producer(i) for i in range(2)] consumers = [consumer(i) for i in range(2)] await asyncio.gather(*producers, *consumers) await queue.join() ``` -------------------------------- ### Priority Queue Initialization and Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Illustrates the creation and use of a priority queue, where items are retrieved based on their assigned priority. Lower numbers indicate higher priority. ```python >>> import persistqueue >>> q = persistqueue.PriorityQueue('mypath') >>> q.put('low', priority=10) >>> q.put('high', priority=1) >>> q.put('mid', priority=5) >>> q.get() 'high' >>> q.get() 'mid' >>> q.get() 'low' ``` -------------------------------- ### AsyncQueue Constructor and Methods Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Documentation for the AsyncQueue class, including its constructor parameters and main asynchronous methods for queue operations. ```APIDOC ## AsyncQueue ### Constructor ```python AsyncQueue( path: str, # Queue storage path maxsize: int = 0, # Maximum queue size, 0 means unlimited chunksize: int = 100, # Number of entries in each chunk file tempdir: Optional[str] = None, # Temporary file directory serializer: Any = pickle, # Serializer autosave: bool = False # Whether to auto-save ) ``` ### Main Methods - `async put(item, block=True, timeout=None)` - Put item into queue - `async put_nowait(item)` - Non-blocking put - `async get(block=True, timeout=None)` - Get item from queue - `async get_nowait()` - Non-blocking get - `async task_done()` - Mark task as done - `async join()` - Wait for all tasks to complete - `async qsize()` - Get queue size - `async empty()` - Check if empty - `async full()` - Check if full - `async close()` - Close queue ``` -------------------------------- ### Async Queue Usage (v1.1.0+) Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates using the asynchronous file-based queue within an async function. Use 'async with' for proper resource management. ```python import asyncio from persistqueue import AsyncQueue async def main(): async with AsyncQueue("/path/to/queue") as queue: await queue.put("async item") ``` -------------------------------- ### Run Benchmarks for Development Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Run performance benchmarks during development. This command executes the benchmark script with default settings. ```console python benchmark/run_benchmark.py 1000 ``` -------------------------------- ### AsyncSQLiteQueue Constructor and Methods Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/async_api.md Documentation for the AsyncSQLiteQueue class, including its constructor parameters and main asynchronous methods for queue operations with transaction support. ```APIDOC ## AsyncSQLiteQueue ### Constructor ```python AsyncSQLiteQueue( path: str, # Database file path name: str = 'default', # Queue name serializer: Any = pickle, # Serializer auto_commit: bool = True # Whether to auto-commit transactions ) ``` ### Main Methods - `async put(item, block=True)` - Put item into queue, returns record ID - `async put_nowait(item)` - Non-blocking put - `async get(block=True, timeout=None, id=None, raw=False)` - Get item from queue - `async get_nowait(id=None, raw=False)` - Non-blocking get - `async update(item, id=None)` - Update item - `async task_done()` - Mark task as done - `async qsize()` - Get queue size - `async empty()` - Check if empty - `async full()` - Check if full - `async close()` - Close queue ``` -------------------------------- ### Asynchronous Context Manager Implementation Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Provides the implementation for an asynchronous context manager, enabling the use of `async with` for resource management like initialization and closing. ```python async def __aenter__(self): await self._async_init() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() ``` -------------------------------- ### AsyncQueue with Multiple Producers and Consumers Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Illustrates setting up multiple producer and consumer tasks that operate concurrently on a single `AsyncQueue`. Uses `asyncio.gather` to manage concurrent tasks. ```python async def multi_producer_consumer(): async with AsyncQueue("/path/to/queue") as queue: # Multiple producers producers = [producer(i) for i in range(4)] # Multiple consumers consumers = [consumer(i) for i in range(4)] await asyncio.gather(*producers, *consumers) await queue.join() ``` -------------------------------- ### AsyncSQLiteQueue Basic Operations Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Shows the basic usage of `AsyncSQLiteQueue`, including putting data, retrieving it, updating an item by its ID, and marking tasks as done. ```python async with AsyncSQLiteQueue("/path/to/queue.db") as queue: item_id = await queue.put({"key": "value"}) item = await queue.get() await queue.update({"key": "new_value"}, item_id) await queue.task_done() ``` -------------------------------- ### Multi-threading with SQLite3 FIFO Queue Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates setting up a multi-threaded worker pool for a SQLite3 FIFO queue. Requires `multithreading=True` during initialization. Uses `q.join()` to wait for tasks. ```python from persistqueue import FIFOSQLiteQueue from threading import Thread q = FIFOSQLiteQueue(path="./test", multithreading=True) def worker(): while True: item = q.get() do_work(item) for i in range(num_worker_threads): t = Thread(target=worker) t.daemon = True t.start() for item in source(): q.put(item) q.join() # Block until all tasks are done ``` -------------------------------- ### Unique Queue Initialization and Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Demonstrates creating a unique queue that automatically ignores duplicate entries. The size attribute reflects the number of unique items. ```python >>> import persistqueue >>> q = persistqueue.UniqueQ('mypath') >>> q.put('str1') >>> q.put('str1') # Duplicate ignored >>> q.size 1 >>> q.put('str2') >>> q.size 2 ``` -------------------------------- ### Run Benchmarks Directly Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Execute performance benchmarks directly using the provided Python script. Specify the number of items and output format (rst, console, or json). ```console python benchmark/run_benchmark.py 1000 rst ``` -------------------------------- ### Multi-threading with File-based Queue Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Illustrates using a file-based queue with multiple worker threads. Each worker continuously retrieves items, processes them, and marks tasks as done. ```python from persistqueue import Queue from threading import Thread q = Queue() def worker(): while True: item = q.get() do_work(item) q.task_done() for i in range(num_worker_threads): t = Thread(target=worker) t.daemon = True t.start() for item in source(): q.put(item) q.join() # Block until all tasks are done ``` -------------------------------- ### Run Tests with Tox Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Execute tests using tox. Supports running tests for specific Python versions, checking code style, and generating coverage reports. ```console # Run tests for specific Python version tox -e py312 ``` ```console # Run code style checks tox -e pep8 ``` ```console # Generate coverage report tox -e cover ``` -------------------------------- ### Acknowledgment Queue Initialization and Usage Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Shows how to use an acknowledgment queue for reliable message processing. Items can be marked as acknowledged, retried (nacked), or failed. ```python >>> import persistqueue >>> ackq = persistqueue.SQLiteAckQueue('path') >>> ackq.put('str1') >>> item = ackq.get() >>> # Process the item >>> ackq.ack(item) # Mark as completed >>> # Or if processing failed: >>> ackq.nack(item) # Mark for retry >>> ackq.ack_failed(item) # Mark as failed ``` -------------------------------- ### Run Benchmarks with Tox Source: https://github.com/peter-wangxu/persist-queue/blob/master/README.rst Execute performance benchmarks using tox. The 'rst' argument generates output in reStructuredText table format. ```console tox -e bench -- rst ``` -------------------------------- ### PDict: Persistent Dictionary with SQLite3 Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Use PDict for a persistent key-value store backed by SQLite3, mimicking a subset of the Python `dict` interface. Values are serialized and survive process restarts. Supports standard dictionary operations like item assignment, retrieval, deletion, and membership testing. ```python from persistqueue import PDict # Create (or reopen) a persistent dict stored at path/name pd = PDict(path="./pdict_store", name="app_config") # Set values pd["db_host"] = "localhost" pd["db_port"] = 5432 pd["feature_flags"] = {"dark_mode": True, "beta": False} # Get values print(pd["db_host"]) # localhost print(pd["db_port"]) # 5432 # Default fallback print(pd.get("missing_key", "default_value")) # default_value # Membership test print("db_host" in pd) # True print("nonexistent" in pd) # False # Count entries print(len(pd)) # 3 # Update a value pd["db_port"] = 5433 # Delete a key del pd["feature_flags"] print(len(pd)) # 2 # Raises KeyError on missing key try: _ = pd["feature_flags"] except KeyError as e: print(e) # 'Key: feature_flags not exists.' ``` -------------------------------- ### Asynchronous Synchronization Primitives Source: https://github.com/peter-wangxu/persist-queue/blob/master/docs/ASYNC_IMPLEMENTATION.md Illustrates the use of `asyncio` synchronization primitives like `Lock` and `Condition` for managing concurrent access in asynchronous operations. ```python self._lock = asyncio.Lock() self._not_empty = asyncio.Condition(self._lock) self._not_full = asyncio.Condition(self._lock) ``` -------------------------------- ### SQLite3 FILO (Stack) Queue (FILOSQLiteQueue) Source: https://context7.com/peter-wangxu/persist-queue/llms.txt FILOSQLiteQueue implements a Last-In-First-Out queue using SQLite3, suitable for stack-oriented workflows by selecting the highest _id row. ```python import persistqueue q = persistqueue.FILOSQLiteQueue(path="./filo_queue.db", auto_commit=True) q.put("first") q.put("second") q.put("third") print(q.get()) # third ← last in, first out print(q.get()) # second print(q.get()) # first ``` -------------------------------- ### SQLiteAckQueue: Acknowledgment Queue with Status Tracking Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Utilize SQLiteAckQueue for robust item lifecycle management, tracking statuses from `inited` to `acked` or `ack_failed`. Set `auto_resume=True` to automatically re-queue unacknowledged items on restart. Use `ack()`, `nack()`, and `ack_failed()` to manage item states. `multithreading=True` is recommended for concurrent access. ```python import persistqueue from persistqueue.sqlackqueue import AckStatus ackq = persistqueue.SQLiteAckQueue( path="./ack_queue.db", auto_resume=True, # re-enqueue unack'd items on restart auto_commit=True, multithreading=True, ) ackq.put({"order_id": 101, "action": "ship"}) ackq.put({"order_id": 102, "action": "ship"}) ackq.put({"order_id": 103, "action": "ship"}) print(ackq.size) # 3 print(ackq.ready_count()) # 3 # Normal processing flow item = ackq.get(raw=True) # raw=True includes pqid for direct id-based ack print(item) # {'pqid': 1, 'data': {'order_id': 101, ...}, 'timestamp': ...} try: # do work... ackq.ack(item) # mark as successfully processed except Exception: ackq.nack(item) # requeue for retry # Permanently fail an item item2 = ackq.get() ackq.ack_failed(item2) # mark as permanently failed print(ackq.acked_count()) # 1 print(ackq.ack_failed_count()) # 1 # Update item data while it's in-flight item3 = ackq.get(raw=True) ackq.update({"order_id": 999, "action": "cancel"}, id=item3["pqid"]) ackq.ack(id=item3["pqid"]) # Cleanup old acked records periodically ackq.clear_acked_data(max_delete=500, keep_latest=100, clear_ack_failed=True) ``` -------------------------------- ### PriorityQueue: SQLite3 Priority Queue Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Implement a priority queue using SQLite where items are retrieved based on an ascending priority integer, with insertion order as a tiebreaker. Lower numerical priority values indicate higher priority. `auto_commit=True` ensures changes are saved immediately. ```python import persistqueue q = persistqueue.PriorityQueue(path="./priority_queue.db", auto_commit=True) q.put("low-priority-task", priority=10) q.put("medium-priority-task", priority=5) q.put("critical-task", priority=1) q.put("another-critical", priority=1) # same priority → FIFO tiebreak print(q.get()) # critical-task (priority=1, inserted first) print(q.get()) # another-critical (priority=1, inserted second) print(q.get()) # medium-priority-task print(q.get()) # low-priority-task ``` -------------------------------- ### SQLite3 FIFO Queue (SQLiteQueue) Source: https://context7.com/peter-wangxu/persist-queue/llms.txt SQLiteQueue (or FIFOSQLiteQueue) uses SQLite3 with WAL mode for high throughput. Configure auto_commit for immediate persistence or manual commits via task_done(). Enable multithreading for concurrent access. ```python import persistqueue from persistqueue.exceptions import Empty # Open (or create) an SQLite-backed FIFO queue q = persistqueue.SQLiteQueue( path="./sqlite_queue.db", auto_commit=True, multithreading=True, ) # Enqueue – returns the integer row ID id1 = q.put({"job": "resize-image", "file": "photo.jpg"}) id2 = q.put({"job": "send-email", "to": "user@example.com"}) id3 = q.put({"job": "transcode", "file": "video.mp4"}) print(q.size) # 3 print(q.empty()) # False # Dequeue FIFO order item = q.get(block=True, timeout=3.0) print(item) # {'job': 'resize-image', 'file': 'photo.jpg'} # Non-auto-commit mode: manual transaction control q2 = persistqueue.SQLiteQueue("./sqlite_batch.db", auto_commit=False) q2.put("batch-item-1") q2.put("batch-item-2") item = q2.get() q2.task_done() # commits the transaction # Restart-safe: reopen the same path and items are still there q3 = persistqueue.SQLiteQueue("./sqlite_queue.db", auto_commit=True) print(q3.size) # 2 (id2 and id3 remain) ``` -------------------------------- ### File-based Persistent FIFO Queue (Queue) Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Use Queue for a file-based persistent FIFO queue. Items are serialized to disk immediately on put. Use task_done() to persist dequeue operations unless autosave is True. ```python import threading from persistqueue import Queue from persistqueue.exceptions import Empty # Create (or reopen) a durable queue at the given directory path q = Queue( path="./my_queue", maxsize=0, # 0 = unlimited chunksize=100, # items per chunk file autosave=False, # persist on task_done() (default) ) # Producer: enqueue items (always immediately persisted) for task in ["task-1", "task-2", "task-3"]: q.put(task) print(q.qsize()) # 3 # Blocking get with timeout try: item = q.get(block=True, timeout=5.0) print(f"Processing: {item}") # Processing: task-1 q.task_done() # persists the dequeue except Empty: print("Queue empty after timeout") # Non-blocking get try: item = q.get_nowait() print(f"Got: {item}") # Got: task-2 q.task_done() except Empty: print("Nothing to get right now") # Multi-threaded producer/consumer pattern def worker(q): while True: item = q.get() print(f"Worker processed: {item}") q.task_done() t = threading.Thread(target=worker, args=(q,), daemon=True) t.start() q.put("task-4") q.join() # blocks until all task_done() calls balance put() calls ``` -------------------------------- ### UniqueQ: Deduplicated SQLite Queue Source: https://context7.com/peter-wangxu/persist-queue/llms.txt Use UniqueQ to automatically drop duplicate items based on a SQLite UNIQUE constraint. This is ideal for work queues where reprocessing the same item is redundant. Ensure `auto_commit=True` for immediate persistence. ```python import persistqueue q = persistqueue.UniqueQ(path="./unique_queue.db", auto_commit=True) q.put("url-https://example.com/page1") q.put("url-https://example.com/page2") q.put("url-https://example.com/page1") # duplicate — silently ignored print(q.size) # 2 print(q.get()) # url-https://example.com/page1 print(q.get()) # url-https://example.com/page2 ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.