### Install Kazoo via Pip Source: https://github.com/python-zk/kazoo/blob/master/docs/install.md Use this command to install the Kazoo library. No Zookeeper C bindings are required. ```bash pip install kazoo ``` -------------------------------- ### KazooTestCase Example Source: https://github.com/python-zk/kazoo/blob/master/docs/testing.md KazooTestCase offers a complete test case equivalent to using KazooTestHarness as a mixin. It simplifies setup by providing the client directly within the test methods. ```python from kazoo.testing import KazooTestCase class MyTest(KazooTestCase): def testmycode(self): self.client.ensure_path('/test/path') result = self.client.get('/test/path') ... ``` -------------------------------- ### KazooTestHarness Setup Source: https://github.com/python-zk/kazoo/blob/master/docs/testing.md Use KazooTestHarness by mixing it into your test class and calling setup_zookeeper() in setUp and teardown_zookeeper() in tearDown. This provides a client for testing Zookeeper interactions. ```python from kazoo.testing import KazooTestHarness class MyTest(KazooTestHarness): def setUp(self): self.setup_zookeeper() def tearDown(self): self.teardown_zookeeper() def testmycode(self): self.client.ensure_path('/test/path') result = self.client.get('/test/path') ... ``` -------------------------------- ### Basic Logging Setup for Kazoo Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md Configure basic logging to avoid 'No handlers could be found' messages from Kazoo. This is a minimal setup; more advanced configurations are available in Python's logging tutorial. ```python import logging logging.basicConfig() ``` -------------------------------- ### Initialize and Start Kazoo Client Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md Create a KazooClient instance and establish a connection to a Zookeeper server. Ensure Zookeeper is running on the specified host and port. ```python from kazoo.client import KazooClient zk = KazooClient(hosts='127.0.0.1:2181') zk.start() ``` -------------------------------- ### Simple Barrier Example Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates creating, waiting on, and removing a simple barrier. Use this to block processes until a specific condition is met and then unblock them. ```python from kazoo.client import KazooClient from kazoo.recipe.barrier import Barrier, DoubleBarrier import threading import time zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # SIMPLE BARRIER - Block until condition is met barrier = zk.Barrier("/barriers/simple") # Create the barrier (blocks waiting processes) barrier.create() # In another process/thread: wait for barrier def wait_for_start(): barrier = zk.Barrier("/barriers/simple") print("Waiting for barrier to be removed...") cleared = barrier.wait(timeout=30) if cleared: print("Barrier cleared, proceeding!") else: print("Timeout waiting for barrier") # Remove the barrier (unblocks all waiters) barrier.remove() ``` -------------------------------- ### Async Get with Callback Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates performing an asynchronous GET operation and processing the result using a callback function. Handles Node not found and Connection lost errors. ```python def get_callback(async_obj): try: data, stat = async_obj.get() print(f"Async get result: {data.decode('utf-8')}") except NoNodeError: print("Node not found") except ConnectionLossException: print("Connection lost") async_obj = zk.get_async("/async/node") async_obj.rawlink(get_callback) # Callback called when ready ``` -------------------------------- ### Initialize and Manage KazooClient Connections Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates how to create a KazooClient instance with basic and advanced configurations, including SSL/TLS and SASL authentication. Includes starting, stopping, and listening for connection state changes. ```python from kazoo.client import KazooClient, KazooState import logging # Basic logging setup logging.basicConfig() # Create client with basic configuration zk = KazooClient(hosts='127.0.0.1:2181') # Start the connection (blocks until connected or timeout) zk.start(timeout=15) # Check connection status print(f"Connected: {zk.connected}") # Add a state listener for connection events def connection_listener(state): if state == KazooState.LOST: print("Session lost - ephemeral nodes will be removed") elif state == KazooState.SUSPENDED: print("Connection suspended - commands cannot run") elif state == KazooState.CONNECTED: print("Connected/Reconnected to ZooKeeper") zk.add_listener(connection_listener) # Create client with advanced configuration zk_advanced = KazooClient( hosts='zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181', timeout=10.0, read_only=True, # Allow connections to read-only servers randomize_hosts=True, # Randomize host selection connection_retry=dict(max_tries=5, delay=0.5, backoff=2), command_retry=dict(max_tries=3, delay=0.1) ) # SSL/TLS connection zk_ssl = KazooClient( hosts='127.0.0.1:2281', use_ssl=True, certfile='/path/to/client.crt', keyfile='/path/to/client.key', ca='/path/to/ca.crt', verify_certs=True ) # SASL authentication zk_sasl = KazooClient( hosts='127.0.0.1:2181', sasl_options={ 'mechanism': 'DIGEST-MD5', 'username': 'myuser', 'password': 'mypassword' } ) # Clean shutdown zk.stop() zk.close() ``` -------------------------------- ### Clone Repository and Build Kazoo Source: https://github.com/python-zk/kazoo/blob/master/CONTRIBUTING.md Clone the Kazoo repository and run the build scripts to set up your development environment. Ensure you have Python and Java installed. ```bash git clone git@github.com:/kazoo.git cd kazoo make ``` -------------------------------- ### Double Barrier Example Source: https://context7.com/python-zk/kazoo/llms.txt Illustrates the use of a DoubleBarrier for synchronizing the start and end of a distributed computation. All workers must enter before any can proceed, and all must leave before the barrier is fully released. ```python from kazoo.client import KazooClient from kazoo.recipe.barrier import Barrier, DoubleBarrier import threading import time zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # DOUBLE BARRIER - Synchronize start and end of computation def worker(worker_id, num_workers): """Worker that participates in double barrier.""" client = KazooClient(hosts='127.0.0.1:2181') client.start() barrier = DoubleBarrier( client, "/barriers/computation", num_clients=num_workers, identifier=f"worker-{worker_id}" ) # Enter barrier (blocks until all workers enter) print(f"Worker {worker_id}: entering barrier") barrier.enter() print(f"Worker {worker_id}: all workers entered, starting work") # Do computation time.sleep(2) print(f"Worker {worker_id}: work complete") # Leave barrier (blocks until all workers leave) print(f"Worker {worker_id}: leaving barrier") barrier.leave() print(f"Worker {worker_id}: all workers left") client.stop() # Start 3 workers (in practice, these would be separate processes) threads = [] for i in range(3): t = threading.Thread(target=worker, args=(i, 3)) threads.append(t) t.start() for t in threads: t.join() zk.stop() ``` -------------------------------- ### Example Commit Message Source: https://github.com/python-zk/kazoo/blob/master/CONTRIBUTING.md Illustrates the recommended format for commit messages, including subject, body, footer with breaking changes, and issue references. ```text feat(core): add tasty cookies to the client handler Properly formatted commit messages provide understandable history and documentation. This patch will provide a delicious cookie when all tests have passed and the commit message is properly formatted. BREAKING CHANGE: This patch requires developer to lower expectations about what "delicious" and "cookie" may mean. Some sadness may result. Closes #3.14, #9.75 ``` -------------------------------- ### Parallel Async Operations Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates initiating multiple asynchronous GET operations in parallel and then collecting and processing their results. ```python zk.ensure_path("/async/parallel") for i in range(5): zk.create(f"/async/parallel/node-{i}", f"data-{i}".encode()) # Fire off multiple async operations async_results = [] for i in range(5): result = zk.get_async(f"/async/parallel/node-{i}") async_results.append(result) # Collect all results for i, result in enumerate(async_results): data, stat = result.get() print(f"Node {i}: {data.decode('utf-8')}") ``` -------------------------------- ### SetPartitioner - Divide Work Items Source: https://context7.com/python-zk/kazoo/llms.txt Illustrates setting up a SetPartitioner to divide a defined set of work items among group members. This example initializes the partitioner with a set of items. ```python # SET PARTITIONER - Divide work among members # Define the set of items to partition all_items = {"item-1", "item-2", "item-3", "item-4", "item-5"} partitioner = zk.SetPartitioner( path="/partitioner/work", set=all_items, identifier="worker-1" ) ``` -------------------------------- ### Non-Blocking Lease Example Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates obtaining a non-blocking lease for exclusive, time-limited access to a resource. If the lease cannot be obtained immediately, it indicates another process holds it. ```python from kazoo.client import KazooClient from kazoo.recipe.lease import NonBlockingLease, MultiNonBlockingLease import datetime import time zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # NON-BLOCKING LEASE - Time-limited exclusive access lease = zk.NonBlockingLease( path="/leases/my-resource", duration=datetime.timedelta(minutes=5), identifier="worker-1" ) # Check if lease was obtained if lease: print("Got the lease, doing work...") # Do work within lease duration time.sleep(2) print("Work complete") else: print("Could not obtain lease, another process has it") # MULTI NON-BLOCKING LEASE - Obtain multiple leases leases = zk.MultiNonBlockingLease( paths=["/leases/resource-a", "/leases/resource-b"], duration=datetime.timedelta(minutes=5), identifier="multi-worker" ) if leases: print("Got all leases") # Work with multiple resources else: print("Could not obtain all leases") # Use case: Scheduled task that should only run once across cluster def run_scheduled_task(): lease = zk.NonBlockingLease( path="/scheduled/daily-report", duration=datetime.timedelta(hours=1), identifier="scheduler-node-1" ) if lease: print("Running daily report task...") # Generate report print("Daily report complete") return True else: print("Another node is running the report") return False run_scheduled_task() zk.stop() ``` -------------------------------- ### One-Time Watches using Kazoo Client Methods Source: https://context7.com/python-zk/kazoo/llms.txt Illustrates how to set a one-time watch on a ZNode using the `get`, `get_children`, or `exists` methods. The provided callback function is executed only once when the specified event occurs. ```python from kazoo.client import KazooClient def my_one_time_watch(event): """Called once when node changes.""" print(f"One-time watch triggered: {event.type}") zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # Set one-time watch data, stat = zk.get("/watched/data", watch=my_one_time_watch) children = zk.get_children("/watched/children", watch=my_one_time_watch) stat = zk.exists("/watched/data", watch=my_one_time_watch) zk.stop() ``` -------------------------------- ### Get Zookeeper Node Data and Stat Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md Retrieve the data and stat information (including version) for a Zookeeper node. ```python # Print the version of a node and its data data, stat = zk.get("/my/favorite") print("Version: %s, data: %s" % (stat.version, data.decode("utf-8"))) ``` -------------------------------- ### KazooRetry - Interruptible Retry Source: https://context7.com/python-zk/kazoo/llms.txt Shows how to configure a retry mechanism that can be interrupted, for example, by a shutdown signal. Uses an 'interrupt' callback. ```python def check_shutdown(): """Return True to interrupt retry loop.""" return False # Replace with actual shutdown check interruptible_retry = KazooRetry( max_tries=-1, # Infinite retries interrupt=check_shutdown ) ``` -------------------------------- ### Kazoo Transaction Rollback Example Source: https://context7.com/python-zk/kazoo/llms.txt Illustrates a transaction that is expected to fail due to a version mismatch in a `check` operation. This demonstrates that all operations within the transaction are rolled back if any single operation fails. ```python from kazoo.client import KazooClient from kazoo.exceptions import RolledBackError zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # Setup test data zk.ensure_path("/txn") zk.create("/txn/node-a", b"value-a") zk.create("/txn/node-b", b"value-b") # Transaction that fails (all operations rolled back) zk.create("/txn/check-node", b"data") stat = zk.set("/txn/check-node", b"v1") # version is now 1 try: txn = zk.transaction() txn.create("/txn/will-not-exist", b"data") txn.check("/txn/check-node", version=0) # Wrong version - will fail! txn.set_data("/txn/node-a", b"will-not-update") results = txn.commit() except Exception as e: print(f"Transaction failed: {e}") # /txn/will-not-exist was NOT created # /txn/node-a was NOT updated ``` -------------------------------- ### Perform ZooKeeper CRUD Operations with Kazoo Source: https://context7.com/python-zk/kazoo/llms.txt Illustrates how to create, read, update, and delete ZooKeeper nodes using the KazooClient. Covers ensuring paths, creating nodes with data, ephemeral and sequential nodes, checking existence, getting data and children, and conditional updates. ```python from kazoo.client import KazooClient from kazoo.exceptions import NoNodeError, NodeExistsError, BadVersionError zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # CREATE NODES # Ensure a path exists (creates all parent nodes if needed) zk.ensure_path("/my/favorite/path") # Create a node with data zk.create("/my/favorite/path/node", b"initial data") # Create with makepath=True (creates parents automatically) zk.create("/deep/nested/path/node", b"data", makepath=True) # Create ephemeral node (deleted when session ends) zk.create("/my/ephemeral", b"temp data", ephemeral=True) # Create sequential node (gets unique suffix) path = zk.create("/my/sequence/item-", b"seq data", sequence=True) print(f"Created sequential node: {path}") # e.g., /my/sequence/item-0000000001 # Create with both options and get stat path, stat = zk.create( "/my/path/node", b"data", ephemeral=True, sequence=True, include_data=True ) # READ NODES # Check if node exists stat = zk.exists("/my/favorite/path") if stat: print(f"Node exists, version: {stat.version}") # Get node data and stats data, stat = zk.get("/my/favorite/path/node") print(f"Data: {data.decode('utf-8')}") print(f"Version: {stat.version}, Created: {stat.created}") # Get children of a node children = zk.get_children("/my/favorite/path") print(f"Children: {children}") # Get children with stat children, stat = zk.get_children("/my/favorite/path", include_data=True) # UPDATE NODES # Set data on a node stat = zk.set("/my/favorite/path/node", b"updated data") print(f"New version: {stat.version}") # Conditional update with version check (optimistic locking) data, stat = zk.get("/my/favorite/path/node") try: zk.set("/my/favorite/path/node", b"new data", version=stat.version) except BadVersionError: print("Node was modified by another process!") # DELETE NODES # Delete a single node zk.delete("/my/favorite/path/node") ``` -------------------------------- ### DataWatch for Monitoring Node Data Changes Source: https://context7.com/python-zk/kazoo/llms.txt Sets up a DataWatch to monitor changes to a specific ZNode's data. The watcher function is called immediately upon setup and then again whenever the data changes. It can be stopped by returning False. ```python from kazoo.client import KazooClient from kazoo.recipe.watchers import DataWatch zk = KazooClient(hosts='127.0.0.1:2181') zk.start() zk.ensure_path("/watched/data") zk.set("/watched/data", b"initial") @zk.DataWatch("/watched/data") def watch_data(data, stat): """Called immediately and on every data change.""" if data is None: print("Node does not exist (yet)") return print(f"Data changed: {data.decode('utf-8')}, version: {stat.version}") # Return False to stop watching # return False ``` ```python @zk.DataWatch("/watched/data") def watch_data_with_event(data, stat, event): """Receives event info (None on first call).""" if event: print(f"Event type: {event.type}, path: {event.path}") if data: print(f"Current data: {data.decode('utf-8')}") ``` ```python # Trigger the watches by modifying data zk.set("/watched/data", b"new value") zk.stop() ``` -------------------------------- ### KazooRetry - Retry Custom Function Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates retrying a custom Python function that performs ZooKeeper operations, including getting data, modifying it, and setting it back with versioning. ```python def my_operation(): """Custom operation that might fail.""" data, stat = zk.get("/my/node") new_value = int(data) + 1 zk.set("/my/node", str(new_value).encode(), version=stat.version) return new_value # Setup test node zk.ensure_path("/my") zk.create("/my/node", b"0") try: result = custom_retry(my_operation) print(f"Operation succeeded: {result}") except RetryFailedError: print("Operation failed after all retries") ``` -------------------------------- ### Build Kazoo Documentation Locally Source: https://github.com/python-zk/kazoo/blob/master/CONTRIBUTING.md Build the project's documentation locally using 'make html'. Open the generated index.html file in a web browser to preview the documentation. ```bash make html ``` -------------------------------- ### Get Children of a Zookeeper Node Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md List all immediate children of a specified Zookeeper node. ```python # List the children children = zk.get_children("/my/favorite") print("There are %s children with names %s" % (len(children), children)) ``` -------------------------------- ### Async Get Children Source: https://context7.com/python-zk/kazoo/llms.txt Performs an asynchronous operation to retrieve the children of a specified ZooKeeper node and prints the list of children. ```python async_result = zk.get_children_async("/async") children = async_result.get() print(f"Children: {children}") ``` -------------------------------- ### Wait for Partition Acquisition Source: https://context7.com/python-zk/kazoo/llms.txt This code demonstrates how to wait for a partition to be acquired using Kazoo's SetPartitioner. It includes logic for handling partitioner failures, releases, and successful acquisitions. ```python while True: if partitioner.failed: print("Partitioner failed, recreating...") partitioner = zk.SetPartitioner( path="/partitioner/work", set=all_items ) elif partitioner.release: print("Partition released, waiting to reacquire...") partitioner.wait_for_acquire() elif partitioner.acquired: print(f"My assigned items: {list(partitioner)}") # Process assigned items for item in partitioner: print(f"Processing: {item}") break elif partitioner.allocating: print("Waiting for allocation...") partitioner.wait_for_acquire() # Release partition when done partitioner.finish() zk.stop() ``` -------------------------------- ### Kazoo Asynchronous Create Operation Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates initiating an asynchronous create operation and then retrieving the result. This allows other work to be performed while the Zookeeper operation is in progress. ```python from kazoo.client import KazooClient from kazoo.handlers.gevent import SequentialGeventHandler from kazoo.exceptions import ConnectionLossException, NoNodeError import sys # Standard threading handler (default) zk = KazooClient(hosts='127.0.0.1:2181') # Gevent handler (for gevent applications) # zk = KazooClient(handler=SequentialGeventHandler()) # Eventlet handler (for eventlet applications) # from kazoo.handlers.eventlet import SequentialEventletHandler # zk = KazooClient(handler=SequentialEventletHandler()) # Async connection event = zk.start_async() event.wait(timeout=30) if not zk.connected: zk.stop() print("Failed to connect") sys.exit(1) # Setup test data zk.ensure_path("/async") # ASYNC CREATE async_result = zk.create_async("/async/node", b"async data") # Do other work while operation is pending... path = async_result.get() # Block until complete print(f"Created: {path}") ``` -------------------------------- ### Create Kazoo Client with Gevent Handler Source: https://github.com/python-zk/kazoo/blob/master/docs/async_usage.md Instantiate KazooClient using SequentialGeventHandler for asynchronous operations with gevent. The start_async() method returns immediately. ```python from kazoo.client import KazooClient from kazoo.handlers.gevent import SequentialGeventHandler zk = KazooClient(handler=SequentialGeventHandler()) # returns immediately event = zk.start_async() # Wait for 30 seconds and see if we're connected event.wait(timeout=30) if not zk.connected: # Not connected, stop trying to connect zk.stop() raise Exception("Unable to connect.") ``` -------------------------------- ### Run All Kazoo Tests Source: https://github.com/python-zk/kazoo/blob/master/CONTRIBUTING.md Execute the entire test suite for the Kazoo project using the 'make test' command. This ensures all code changes pass automated checks. ```bash make test ``` -------------------------------- ### Party - Track Group Membership Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates using the Party recipe to track group membership. Includes joining, checking membership, iterating through members, and leaving the party. ```python from kazoo.client import KazooClient from kazoo.recipe.party import Party, ShallowParty from kazoo.recipe.partitioner import SetPartitioner import time zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # PARTY - Track group membership party = zk.Party("/parties/mygroup", identifier="member-1") # Join the party party.join() print(f"Joined party, current members: {list(party)}") # Check membership if "member-1" in party: print("We are in the party") # Iterate members for member in party: print(f"Party member: {member}") # Leave the party party.leave() ``` -------------------------------- ### Initialize Kazoo Client in Read-Only Mode Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md Configure the Kazoo client to connect to Zookeeper servers in read-only mode. Ensure Zookeeper servers are configured for read-only access. ```python from kazoo.client import KazooClient zk = KazooClient(hosts='127.0.0.1:2181', read_only=True) zk.start() ``` -------------------------------- ### KazooClient - Configuration with Retry Policies Source: https://context7.com/python-zk/kazoo/llms.txt Configures a KazooClient instance with specific retry policies for both connection attempts and general commands. ```python zk_with_retry = KazooClient( hosts='127.0.0.1:2181', connection_retry=dict( max_tries=-1, delay=0.5, backoff=2, max_delay=120 ), command_retry=dict( max_tries=3, delay=0.1, backoff=2 ) ) ``` -------------------------------- ### Party - Shallow Party Source: https://context7.com/python-zk/kazoo/llms.txt Shows how to use ShallowParty for simpler group membership tracking where data is stored in the node name itself. ```python # SHALLOW PARTY - Simpler membership (stores data in node name) shallow = zk.ShallowParty("/parties/shallow", identifier="shallow-member") shallow.join() print(f"Shallow party members: {list(shallow)}") shallow.leave() ``` -------------------------------- ### Handle Asynchronous Callbacks in Kazoo Source: https://github.com/python-zk/kazoo/blob/master/docs/async_usage.md Define a callback function to process results from asynchronous Kazoo operations. The callback receives an IAsyncResult object and should call its get() method to retrieve the data, handling potential exceptions like ConnectionLossException or NoAuthException. ```python import sys from kazoo.exceptions import ConnectionLossException from kazoo.exceptions import NoAuthException def my_callback(async_obj): try: children = async_obj.get() do_something(children) except (ConnectionLossException, NoAuthException): sys.exit(1) # Both these statements return immediately, the second sets a callback # that will be run when get_children_async has its return value async_obj = zk.get_children_async("/some/node") async_obj.rawlink(my_callback) ``` -------------------------------- ### Kazoo Transaction Context Manager Source: https://context7.com/python-zk/kazoo/llms.txt Shows how to use a transaction as a context manager. Operations within the `with` block are automatically committed upon successful exit of the block. ```python from kazoo.client import KazooClient from kazoo.exceptions import RolledBackError zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # Setup test data zk.ensure_path("/txn") zk.create("/txn/node-a", b"value-a") zk.create("/txn/node-b", b"value-b") # Context manager usage with zk.transaction() as txn: txn.create("/txn/ctx-node", b"context value") txn.set_data("/txn/node-a", b"ctx-updated") # Automatically committed on exit ``` -------------------------------- ### KazooRetry - Custom Retry Policy Source: https://context7.com/python-zk/kazoo/llms.txt Illustrates configuring and using a custom retry policy with specific parameters like max tries, delay, backoff, and deadline. ```python custom_retry = KazooRetry( max_tries=5, # Maximum retry attempts (-1 for infinite) delay=0.1, # Initial delay between retries (seconds) backoff=2, # Backoff multiplier (exponential) max_delay=60.0, # Maximum delay cap max_jitter=0.4, # Random jitter (0.0-1.0) to prevent thundering herd ignore_expire=True, # Retry on session expiration deadline=30.0 # Total time limit for all retries ) # Use custom retry try: result = custom_retry(zk.get, "/some/path") except RetryFailedError: print("All retry attempts failed") ``` -------------------------------- ### Create Zookeeper Node with Data Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md Create a Zookeeper node with specified data. The parent path must exist unless makepath is enabled. ```python # Create a node with data zk.create("/my/favorite/node", b"a value") ``` -------------------------------- ### Leader Election with Kazoo Source: https://context7.com/python-zk/kazoo/llms.txt Implement leader election to ensure only one process runs a specific function at a time. Use `election.run()` for blocking behavior or a thread for non-blocking participation. `election.contenders()` shows current participants. ```python from kazoo.client import KazooClient from kazoo.recipe.election import Election import time import threading zk = KazooClient(hosts='127.0.0.1:2181') zk.start() def leader_function(name): """Function to run when elected leader.""" print(f"{name} is now the leader!") # Simulate leader work for i in range(5): print(f"{name} doing leader work iteration {i}") time.sleep(1) print(f"{name} finished leader work") # Create election instance election = zk.Election("/elections/myservice", "worker-1") # Check current contenders contenders = election.contenders() print(f"Election contenders: {contenders}") # Run election in a thread for non-blocking behavior def participate(): election.run(leader_function, "worker-1") thread = threading.Thread(target=participate) thread.start() # Cancel participation (does not interrupt if already leader) time.sleep(2) election.cancel() zk.stop() ``` ```python # Process 1 election = zk.Election("/elections/myservice", "process-1") election.run(my_leader_func, "process-1") # Process 2 election = zk.Election("/elections/myservice", "process-2") election.run(my_leader_func, "process-2") # Only one will be leader at a time ``` -------------------------------- ### Recursive and Conditional Deletes in Kazoo Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates how to recursively delete a ZNode and its children, and how to perform a conditional delete with version checking to prevent race conditions. ```python zk.delete("/my/favorite", recursive=True) ``` ```python try: zk.delete("/my/path/node", version=2) except BadVersionError: print("Version mismatch - node was modified") ``` ```python zk.stop() ``` -------------------------------- ### Semaphore for Limiting Concurrent Access Source: https://context7.com/python-zk/kazoo/llms.txt Utilizes Kazoo's `Semaphore` recipe to limit the number of concurrent clients or processes that can access a resource. Shows how to acquire and release leases, and check current holders. ```python from kazoo.client import KazooClient from kazoo.recipe.lock import Semaphore zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # SEMAPHORE - Limited concurrent access semaphore = zk.Semaphore("/locks/semaphore", "client-1", max_leases=3) with semaphore: print("Holding one of 3 available leases") # Check current lease holders holders = semaphore.lease_holders() print(f"Current lease holders: {holders}") zk.stop() ``` -------------------------------- ### KazooRetry - Built-in Retry Source: https://context7.com/python-zk/kazoo/llms.txt Shows how to use the built-in retry mechanism provided by KazooClient for operations that might fail transiently. ```python from kazoo.client import KazooClient from kazoo.retry import KazooRetry, RetryFailedError from kazoo.exceptions import ConnectionLoss, SessionExpiredError import time zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # Use built-in retry method result = zk.retry(zk.get, "/some/path") ``` -------------------------------- ### Queue Class Source: https://github.com/python-zk/kazoo/blob/master/docs/api/recipe/queue.md The Queue class provides a basic distributed queue implementation. ```APIDOC ## Queue Class ### Description Provides a basic distributed queue implementation. ### Version Added 0.6 ``` -------------------------------- ### Simple and Locking Queues with Kazoo Source: https://context7.com/python-zk/kazoo/llms.txt Utilize Kazoo's Queue for basic FIFO operations and LockingQueue for reliable, transactional task consumption. Items are byte strings. `LockingQueue.get()` locks an item, `consume()` removes it, and `release()` unlocks it. ```python from kazoo.client import KazooClient from kazoo.recipe.queue import Queue, LockingQueue zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # SIMPLE QUEUE queue = zk.Queue("/queues/simple") # Add items to queue queue.put(b"task-1") queue.put(b"task-2") # Add with priority (lower = higher priority, 0-999) queue.put(b"urgent-task", priority=10) queue.put(b"normal-task", priority=100) queue.put(b"low-priority", priority=500) # Get queue length print(f"Queue length: {len(queue)}") # Get and remove item (returns highest priority first) item = queue.get() if item: print(f"Processing: {item.decode('utf-8')}") else: print("Queue is empty") # LOCKING QUEUE (Reliable consumption with ZK 3.4+) locking_queue = zk.LockingQueue("/queues/locking") # Add items locking_queue.put(b"reliable-task-1") locking_queue.put(b"reliable-task-2", priority=50) # Add multiple items atomically locking_queue.put_all([b"batch-1", b"batch-2", b"batch-3"], priority=100) # Get item (locks it, doesn't remove yet) item = locking_queue.get(timeout=5.0) if item: print(f"Got item: {item.decode('utf-8')}") # Check if we still hold the lock (important after reconnection) if locking_queue.holds_lock(): # Process the item print("Processing item...") # Consume (permanently remove) after successful processing locking_queue.consume() else: print("Lost lock, item will be reprocessed") else: print("No item available within timeout") # Get item without consuming - release lock if processing fails item = locking_queue.get(timeout=5.0) if item: try: process_item(item) locking_queue.consume() # Success - remove from queue except Exception as e: locking_queue.release() # Failure - release lock, item stays in queue print(f"Processing failed, item released: {e}") def process_item(item): """Simulate item processing.""" print(f"Processing: {item.decode('utf-8')}") zk.stop() ``` -------------------------------- ### Kazoo API Modules Source: https://github.com/python-zk/kazoo/blob/master/docs/api.md This section lists the available modules within the kazoo library for API reference. ```APIDOC ## Kazoo API Modules This documentation is organized alphabetically by module name. * [`kazoo.client`](api/client.md) * [`kazoo.exceptions`](api/exceptions.md) * [`kazoo.handlers.gevent`](api/handlers/gevent.md) * [`kazoo.handlers.threading`](api/handlers/threading.md) * [`kazoo.handlers.utils`](api/handlers/utils.md) * [`kazoo.interfaces`](api/interfaces.md) * [`kazoo.protocol.states`](api/protocol/states.md) * [`kazoo.recipe.barrier`](api/recipe/barrier.md) * [`kazoo.recipe.cache`](api/recipe/cache.md) * [`kazoo.recipe.counter`](api/recipe/counter.md) * [`kazoo.recipe.election`](api/recipe/election.md) * [`kazoo.recipe.lease`](api/recipe/lease.md) * [`kazoo.recipe.lock`](api/recipe/lock.md) * [`kazoo.recipe.partitioner`](api/recipe/partitioner.md) * [`kazoo.recipe.party`](api/recipe/party.md) * [`kazoo.recipe.queue`](api/recipe/queue.md) * [`kazoo.recipe.watchers`](api/recipe/watchers.md) * [`kazoo.retry`](api/retry.md) * [`kazoo.security`](api/security.md) * [`kazoo.testing.harness`](api/testing.md) ``` -------------------------------- ### Kazoo Transaction Commit Source: https://context7.com/python-zk/kazoo/llms.txt Demonstrates creating and committing a transaction with multiple operations. All queued operations will succeed or fail atomically. Requires `kazoo.exceptions.RolledBackError` for handling failures. ```python from kazoo.client import KazooClient from kazoo.exceptions import RolledBackError zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # Setup test data zk.ensure_path("/txn") zk.create("/txn/node-a", b"value-a") zk.create("/txn/node-b", b"value-b") # Create a transaction transaction = zk.transaction() # Queue operations transaction.create("/txn/new-node", b"new value") transaction.set_data("/txn/node-a", b"updated-a") transaction.check("/txn/node-b", version=0) # Fail if version != 0 transaction.delete("/txn/node-b") # Commit transaction - all operations execute atomically results = transaction.commit() print(f"Transaction results: {results}") ``` -------------------------------- ### Exclusive Lock Acquisition and Release Source: https://context7.com/python-zk/kazoo/llms.txt Shows how to acquire and release an exclusive distributed lock using Kazoo. It covers blocking, non-blocking, and timeout-based acquisition, as well as using the lock as a context manager for automatic release. ```python from kazoo.client import KazooClient from kazoo.recipe.lock import Lock from kazoo.exceptions import LockTimeout zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # EXCLUSIVE LOCK lock = zk.Lock("/locks/mylock", "process-identifier") # Blocking acquire (waits forever by default) lock.acquire() try: print("Lock acquired - doing exclusive work") # Critical section finally: lock.release() ``` ```python with zk.Lock("/locks/mylock", "my-identifier"): print("Have lock, doing work") ``` ```python # Non-blocking acquire if lock.acquire(blocking=False): try: print("Got the lock immediately") finally: lock.release() else: print("Could not acquire lock") ``` ```python # Acquire with timeout try: lock.acquire(blocking=True, timeout=10.0) print("Lock acquired within timeout") lock.release() except LockTimeout: print("Failed to acquire lock within 10 seconds") ``` ```python # Check current contenders contenders = lock.contenders() print(f"Current lock contenders: {contenders}") zk.stop() ``` -------------------------------- ### Ensure Zookeeper Path Exists Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md Recursively create a Zookeeper path if it does not exist. This method only sets ACLs and cannot set node data. ```python # Ensure a path, create if necessary zk.ensure_path("/my/favorite") ``` -------------------------------- ### Run Individual Kazoo Tests with Pytest Source: https://github.com/python-zk/kazoo/blob/master/CONTRIBUTING.md Run specific tests using pytest by setting the ZOOKEEPER_PATH environment variable and specifying the test module, class, or method. This is useful for targeted testing. ```bash export ZOOKEEPER_PATH=//bin/zookeeper/ bin/pytest -v kazoo/tests/test_client.py::TestClient::test_create ``` -------------------------------- ### Async Set Source: https://context7.com/python-zk/kazoo/llms.txt Updates the data of an existing ZooKeeper node asynchronously and prints the new version of the node after the update. ```python async_result = zk.set_async("/async/node", b"updated async") stat = async_result.get() print(f"Updated, new version: {stat.version}") ``` -------------------------------- ### ChildrenWatch for Monitoring ZNode Children Changes Source: https://context7.com/python-zk/kazoo/llms.txt Configures a ChildrenWatch to track changes in the list of children for a given ZNode. The watcher is invoked initially and whenever the children set is modified. It can be disabled by returning False. ```python from kazoo.client import KazooClient from kazoo.recipe.watchers import ChildrenWatch zk = KazooClient(hosts='127.0.0.1:2181') zk.start() zk.ensure_path("/watched/children") @zk.ChildrenWatch("/watched/children") def watch_children(children): """Called immediately and when children list changes.""" print(f"Current children: {children}") # Return False to stop watching # return False ``` ```python @zk.ChildrenWatch("/watched/children", send_event=True) def watch_children_with_event(children, event): """Receives event info on changes.""" if event: print(f"Children event: {event.type}") print(f"Children: {children}") ``` ```python zk.create("/watched/children/child1", b"data") zk.stop() ``` -------------------------------- ### Kazoo Client State Listener for Lock Safety Source: https://context7.com/python-zk/kazoo/llms.txt Adds a listener to the Kazoo client to monitor connection states. This is crucial for lock safety, as it allows handling session expiration (`LOST`) or temporary connection loss (`SUSPENDED`) to ensure locks are not held indefinitely or operations are paused appropriately. ```python from kazoo.client import KazooClient, KazooState zk = KazooClient(hosts='127.0.0.1:2181') zk.start() # State listener for lock safety def state_listener(state): if state == KazooState.LOST: # Session expired - locks are automatically released! print("WARNING: Session lost, lock no longer held") elif state == KazooState.SUSPENDED: # Connection lost temporarily - pause critical operations print("Connection suspended, pausing work") zk.add_listener(state_listener) zk.stop() ``` -------------------------------- ### Execute Atomic Transaction Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md Sends multiple commands to be committed as a single atomic unit. Either all commands succeed or all fail. The result is a list of success/failure outcomes. ```python transaction = zk.transaction() transaction.check('/node/a', version=3) transaction.create('/node/b', b"a value") results = transaction.commit() ``` -------------------------------- ### Custom Kazoo Retry Policy Source: https://github.com/python-zk/kazoo/blob/master/docs/basic_usage.md Manually create a KazooRetry instance with specific retry policies, such as limiting retries or ignoring session expirations. ```python from kazoo.retry import KazooRetry kr = KazooRetry(max_tries=3, ignore_expire=False) result = kr(client.get, "/some/path") ``` -------------------------------- ### IHandler Interface Source: https://github.com/python-zk/kazoo/blob/master/docs/api/interfaces.md The IHandler interface is used to customize callback handling within KazooClient. Implementations should be passed during KazooClient instantiation. ```APIDOC ## IHandler Interface ### Description Implementations of `IHandler` are passed into `KazooClient` during instantiation to manage preferred callback handling. ### Usage Developers should create objects implementing `IHandler` and pass them to the `KazooClient` constructor. ### Methods - `async_result()`: Must be used to obtain `IAsyncResult` objects if needed, instead of direct instantiation. ```