### Full Asynchronous Example with Tornado or Asyncio

Source: https://github.com/dask/distributed/blob/main/docs/source/asynchronous.rst

A self-contained example that starts an asynchronous client, submits a job, awaits the result, and closes the client. It can be run with either Tornado's IOLoop or Python's asyncio.

```python
from dask.distributed import Client

async def f():
    client = await Client(asynchronous=True)
    future = client.submit(lambda x: x + 1, 10)
    result = await future
    await client.close()
    return result

# Either use Tornado
from tornado.ioloop import IOLoop
IOLoop().run_sync(f)

# Or use asyncio
import asyncio
asyncio.run(f())
```

--------------------------------

### Install dask distributed

Source: https://github.com/dask/distributed/blob/main/docs/source/quickstart.rst

Install the dask and distributed libraries. Use this command to get started with dask.distributed.

```bash
$ python -m pip install dask distributed --upgrade
```

--------------------------------

### Install dask.distributed from Source

Source: https://github.com/dask/distributed/blob/main/docs/source/install.rst

Clone the dask.distributed repository from GitHub and install it from source using pip.

```bash
git clone https://github.com/dask/distributed.git
cd distributed
python -m pip install .
```

--------------------------------

### Setup Local Dask Cluster

Source: https://github.com/dask/distributed/blob/main/docs/source/quickstart.rst

Create a Client instance without arguments to automatically start a local scheduler and worker. This is the easiest way to set up dask.distributed for local testing.

```python
>>> from dask.distributed import Client
>>> client = Client()  # set up local cluster on your laptop
>>> client
```

--------------------------------

### Start Dask Scheduler

Source: https://github.com/dask/distributed/blob/main/docs/source/quickstart.rst

Manually start a dask scheduler process.
This is the first step in setting up a distributed cluster on multiple machines.

```bash
$ dask scheduler
Scheduler started at 127.0.0.1:8786
```

--------------------------------

### Registering a Custom UDP Backend

Source: https://github.com/dask/distributed/blob/main/docs/source/communications.rst

Example of how to register a custom UDP backend for Dask communications using setuptools entry points in setup.py.

```python
from setuptools import setup

setup(name="dask_udp",
      entry_points={
          "distributed.comm.backends": [
              "udp=dask_udp.backend:UDPBackend",
          ]
      },
      # ... (remaining setup() arguments elided in the source)
)
```

--------------------------------

### Install dask.distributed with Pip

Source: https://github.com/dask/distributed/blob/main/docs/source/install.rst

Install or upgrade dask.distributed using pip. The `--upgrade` flag ensures you have the latest version.

```bash
python -m pip install dask distributed --upgrade
```

--------------------------------

### Install and use jemalloc on Linux

Source: https://github.com/dask/distributed/blob/main/docs/source/worker-memory.rst

Installs jemalloc via conda and preloads it for Dask workers on Linux. This can help manage memory more effectively.

```bash
conda install jemalloc
LD_PRELOAD=$CONDA_PREFIX/lib/libjemalloc.so dask worker <...>
```

--------------------------------

### Start an Asynchronous Dask Client

Source: https://github.com/dask/distributed/blob/main/docs/source/asynchronous.rst

Initialize a Dask client for use within an asynchronous context such as an `async`/`await` function.

```python
async def f():
    client = await Client(asynchronous=True)
```

--------------------------------

### Security Object Configuration

Source: https://github.com/dask/distributed/blob/main/docs/source/tls.rst

Example of how to configure a Security object for client use with TLS/SSL enabled.

```APIDOC
## Security Object Configuration

### Description
This example demonstrates how to instantiate and configure a `Security` object in Python to enable TLS/SSL for Dask distributed clients. It specifies the Certificate Authority file, client certificate, client private key, and enforces encryption.

### Method
Python API

### Parameters
- `tls_ca_file` (string) - Path to the Certificate Authority certificate file.
- `tls_client_cert` (string) - Path to the client's certificate file.
- `tls_client_key` (string) - Path to the client's private key file.
- `require_encryption` (boolean) - If True, enforces encrypted communication.

### Request Example
```python
from distributed import Client
from distributed.security import Security

sec = Security(tls_ca_file='cluster_ca.pem',
               tls_client_cert='cli_cert.pem',
               tls_client_key='cli_key.pem',
               require_encryption=True)

client = Client(..., security=sec)
```

### Response
This code snippet configures the security settings for a Dask client. Success is indicated by the client connecting to the Dask cluster with the specified security parameters.
```

--------------------------------

### Create and Listen with a Custom Server

Source: https://github.com/dask/distributed/blob/main/docs/source/foundations.rst

Defines handlers for 'add' and 'stream_data' operations and starts a server listening on TCP port 8888. Requires asyncio and distributed.core.
```python
import asyncio
from distributed.core import Server

def add(comm, x=None, y=None):  # simple handler, just a function
    return x + y

async def stream_data(comm, interval=1):  # complex handler, multiple responses
    data = 0
    while True:
        await asyncio.sleep(interval)
        data += 1
        await comm.write(data)

s = Server({'add': add, 'stream_data': stream_data})
s.listen('tcp://:8888')  # listen on TCP port 8888

asyncio.get_event_loop().run_forever()
```

--------------------------------

### Example of nested spans

Source: https://github.com/dask/distributed/blob/main/docs/source/spans.rst

This example demonstrates how to use nested `span` context managers to create a detailed breakdown of a workflow, from overall process to specific stages like data loading and model training.

```APIDOC
## Nested Spans Example

### Description
This example shows how to nest `span` context managers to create a hierarchical tagging of tasks within a Dask workflow.

### Code
```python
import dask.config
import dask.array as da
from distributed import Client, span

# Read important note below
dask.config.set({"optimization.fuse.active": False})
client = Client()

with span("Alice's workflow"):
    with span("data load"):
        a = da.read_zarr(...)  # Replace with actual data loading

    with span("ML preprocessing"):
        a = preprocess(a)  # Replace with actual preprocessing

    with span("Model training"):
        model = train(a)  # Replace with actual model training

model = model.compute()
```

### Spans Created
- ``("Alice's workflow", )``
- ``("Alice's workflow", "data load")``
- ``("Alice's workflow", "ML preprocessing")``
- ``("Alice's workflow", "Model training")``
```

--------------------------------

### Using Queue for Distributed Communication

Source: https://context7.com/dask/distributed/llms.txt

Demonstrates the Dask Queue for passing futures and data between producers and consumers. Covers putting, getting, batch gets, and a producer/consumer pattern.
```python
from distributed import Client, Queue

with Client() as client:
    q = Queue("work-queue", maxsize=100)

    # Put values (primitives or Futures)
    q.put(42)
    q.put("hello")
    future = client.submit(lambda: 99)
    q.put(future)

    print(q.qsize())  # 3
    print(q.get())    # 42
    print(q.get())    # 'hello'
    print(q.get())    # the Future that was put (its result is 99)

    # Batch get: returns all currently queued items
    for x in range(5):
        q.put(x * 10)
    batch = q.get(batch=True)
    print(batch)  # [0, 10, 20, 30, 40]

    # Producer / consumer pattern across tasks
    def producer(q, n):
        for i in range(n):
            q.put(i)
        q.put(None)  # sentinel

    def consumer(q):
        results = []
        while True:
            item = q.get()
            if item is None:
                break
            results.append(item * 2)
        return results

    work_q = Queue("pipeline")
    f_prod = client.submit(producer, work_q, 5)
    f_cons = client.submit(consumer, work_q)
    print(f_cons.result())  # [0, 2, 4, 6, 8]
```

--------------------------------

### Install dask.distributed with Conda

Source: https://github.com/dask/distributed/blob/main/docs/source/install.rst

Use this command to install the latest version of dask.distributed from the conda-forge channel.

```bash
conda install dask distributed -c conda-forge
```

--------------------------------

### Create and Activate Conda Environment

Source: https://github.com/dask/distributed/blob/main/docs/source/develop.rst

Create a conda environment from a YAML file and activate it. Then install the Dask Distributed package in editable mode.

```bash
conda env create --file continuous_integration/environment-3.11.yaml
conda activate dask-distributed
python -m pip install -e .
```

--------------------------------

### Install and use jemalloc globally on macOS

Source: https://github.com/dask/distributed/blob/main/docs/source/worker-memory.rst

Installs jemalloc using Homebrew and inserts it globally for Dask workers on macOS. This can help manage memory more effectively.
```bash
brew install jemalloc
DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib dask worker <...>
```

--------------------------------

### Launching Workers with Multiple Processes

Source: https://github.com/dask/distributed/blob/main/docs/source/worker.rst

Example of launching multiple worker processes, each with a single thread, suitable for GIL-bound Python computations.

```bash
$ dask worker scheduler:8786 --nworkers 8 --nthreads 1
```

--------------------------------

### Example Custom AMM Policy

Source: https://github.com/dask/distributed/blob/main/docs/source/active_memory_manager.rst

This example demonstrates a custom policy that ensures specific keys ('foo' and 'bar') are always replicated across all workers. It skips keys that are not in memory and does nothing for keys that are already fully replicated.

```python
from distributed.active_memory_manager import ActiveMemoryManagerPolicy, Suggestion
from distributed.scheduler import Scheduler


class ReplicateFooBarPolicy(ActiveMemoryManagerPolicy):
    def __init__(self, config=None):
        # Load parameters from Dask config if needed
        pass

    def run(self):
        """
        Ensures keys 'foo' and 'bar' are replicated on all workers.
        """
        target_keys = {"foo", "bar"}
        scheduler: Scheduler = self.manager.scheduler

        for key in target_keys:
            ts = scheduler.tasks.get(key)
            if ts is None or not ts.who_has:
                # Task unknown to the scheduler or not in memory, do nothing
                continue

            # Find workers that have the task (who_has is a set of WorkerState)
            workers_with_task = set(ts.who_has)

            # Find workers that don't have the task
            workers_without_task = set(scheduler.workers.values()) - workers_with_task

            # Replicate on workers that don't have it, if any exist
            for worker in workers_without_task:
                yield Suggestion("replicate", ts, {worker})

        # Optionally, you could add logic here to drop replicas if needed,
        # but this policy focuses on ensuring replication.
        # Example of accessing other AMM attributes:
        # print(f"Current worker memory: {self.manager.workers_memory}")
        # print(f"Pending task updates: {self.manager.pending}")
```

--------------------------------

### Configure AMM Policies in distributed.yaml

Source: https://github.com/dask/distributed/blob/main/docs/source/active_memory_manager.rst

Example configuration for the Active Memory Manager in the distributed.yaml file. This shows how to enable the AMM, set its interval, and define custom policies with their respective class paths and parameters.

```yaml
distributed:
  scheduler:
    active-memory-manager:
      start: true
      interval: 2s
      policies:
      - class: mymodule.EnsureBroadcast
        key: foo
      - class: mymodule.EnsureBroadcast
        key: bar
```

--------------------------------

### Dask: Favorable Task Stealing Example

Source: https://github.com/dask/distributed/blob/main/docs/source/work-stealing.rst

This example illustrates a situation where stealing tasks is beneficial. It involves tasks with expensive computation (100 seconds) and minimal data dependencies, making the computation-to-communication ratio favorable for stealing.

```python
from time import sleep

[data] = client.scatter([100])
x = client.submit(sleep, data)
```

--------------------------------

### Create and Use BankAccount Actor

Source: https://context7.com/dask/distributed/llms.txt

Demonstrates creating a stateful actor from a Python class, calling its methods, and passing actors to tasks. Ensure the `distributed` library is installed.
```python
from distributed import Client

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def deposit(self, amount):
        self.balance += amount
        return self.balance

    def withdraw(self, amount):
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        self.balance -= amount
        return self.balance

    def get_balance(self):
        return self.balance

with Client() as client:
    # Create actor: submit the class with actor=True
    future = client.submit(BankAccount, balance=100, actor=True)
    account = future.result()  # Actor proxy
    print(account)  # prints the Actor proxy repr

    # Call methods: each returns an ActorFuture
    print(account.deposit(50).result())    # 150
    print(account.withdraw(30).result())   # 120
    print(account.get_balance().result())  # 120

    # Actors can be passed to regular tasks
    def transfer(src, dst, amount):
        src.withdraw(amount).result()
        dst.deposit(amount).result()

    future2 = client.submit(BankAccount, balance=0, actor=True)
    account2 = future2.result()
    client.submit(transfer, account, account2, 40).result()
    print(account.get_balance().result())   # 80
    print(account2.get_balance().result())  # 40
```

--------------------------------

### Connect to Dask Scheduler and Manage Datasets

Source: https://context7.com/dask/distributed/llms.txt

Connect to a Dask scheduler to list, get, and unpublish datasets. Ensure the scheduler address is correct.

```python
with Client("tcp://same-scheduler:8786") as consumer:
    print(consumer.list_datasets())  # ['my_array']
    arr = consumer.get_dataset("my_array")  # get the collection
    result = consumer.compute(arr.sum())
    print(result.result())
    consumer.unpublish_dataset("my_array")
    print(consumer.list_datasets())  # []
```

--------------------------------

### Configure Security Object for TLS Client

Source: https://github.com/dask/distributed/blob/main/docs/source/tls.rst

Example of creating a Security object to configure TLS settings for a Dask client. Ensure certificate files are in PEM format.
```python
from distributed import Client
from distributed.security import Security

sec = Security(tls_ca_file='cluster_ca.pem',
               tls_client_cert='cli_cert.pem',
               tls_client_key='cli_key.pem',
               require_encryption=True)

client = Client(..., security=sec)
```

--------------------------------

### Retrieve Task Transition Logs

Source: https://github.com/dask/distributed/blob/main/docs/source/logging.rst

Use the `Scheduler.story` method to get the transition logs for a specific task key. This helps in understanding task computation progress.

```python
>>> f = client.submit(inc, 123)
>>> f
>>> s.story(f.key)
[('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'released', 'waiting', {'inc-aad7bbea25dc61c8e53d929c7ec50bed': 'processing'}, 1605143345.7283862),
 ('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'waiting', 'processing', {}, 1605143345.7284858),
 ('inc-aad7bbea25dc61c8e53d929c7ec50bed', 'processing', 'memory', {}, 1605143345.731495)]
```

--------------------------------

### Submit tasks with Client

Source: https://github.com/dask/distributed/blob/main/docs/source/task-launch.rst

Submit tasks to the Dask cluster using the Client object. This example demonstrates a recursive Fibonacci calculation submitted to the cluster.

```python
from distributed import Client, get_client, secede, rejoin

def fib(n):
    if n < 2:
        return n
    client = get_client()
    a_future = client.submit(fib, n - 1)
    b_future = client.submit(fib, n - 2)
    a, b = client.gather([a_future, b_future])
    return a + b

if __name__ == "__main__":
    client = Client()
    future = client.submit(fib, 10)
    result = future.result()
    print(result)  # prints "55"
```

--------------------------------

### Use Distributed Lock with Context Manager

Source: https://context7.com/dask/distributed/llms.txt

Shows how to use a distributed lock for mutual exclusion using the preferred context manager interface. Ensure the `distributed` library is installed.
```python
from distributed import Client, Lock

with Client() as client:
    lock = Lock("my-resource")

    # Context manager (preferred)
    with lock:
        print("exclusive access")
```

--------------------------------

### Connect Client to Existing Scheduler

Source: https://github.com/dask/distributed/blob/main/docs/source/quickstart.rst

Create a Client instance by providing the address of a running scheduler. Use this when connecting to a pre-existing cluster.

```python
>>> from dask.distributed import Client
>>> client = Client('127.0.0.1:8786')
```

--------------------------------

### Connect to a Dask Cluster with Client

Source: https://context7.com/dask/distributed/llms.txt

Demonstrates various ways to instantiate a Client to connect to a Dask cluster, including auto-creating a LocalCluster, explicitly defining a LocalCluster, connecting to a remote scheduler, and using an asynchronous context manager.

```python
from distributed import Client, LocalCluster

# ── Option 1: auto-create a LocalCluster ──────────────────────────────────────
with Client() as client:
    print(client)
```

```python
cluster = LocalCluster(n_workers=2, threads_per_worker=2, memory_limit="2GiB")
client = Client(cluster)
print(client.scheduler_info()["workers"].keys())
client.close()
cluster.close()
```

```python
client = Client("tcp://scheduler-host:8786", timeout="30s")
```

```python
import asyncio

async def main():
    async with Client(asynchronous=True) as client:
        future = client.submit(lambda x: x ** 2, 7)
        result = await future
        print(result)  # 49

asyncio.run(main())
```

--------------------------------

### Install and use jemalloc on macOS

Source: https://github.com/dask/distributed/blob/main/docs/source/worker-memory.rst

Installs jemalloc via conda and inserts it for Dask workers on macOS. This can help manage memory more effectively.
```bash
conda install jemalloc
DYLD_INSERT_LIBRARIES=$CONDA_PREFIX/lib/libjemalloc.dylib dask worker <...>
```

--------------------------------

### Initialize Dask Client

Source: https://github.com/dask/distributed/blob/main/docs/source/examples/word-count.rst

Connect to the dask distributed scheduler. Provide the IP address and port of your scheduler.

```python
client = Client('SCHEDULER_IP:SCHEDULER_PORT')
```

--------------------------------

### Start Dask Worker

Source: https://github.com/dask/distributed/blob/main/docs/source/quickstart.rst

Manually start dask worker processes and connect them to a running scheduler. At least one worker must be running for computations to be processed.

```bash
$ dask worker 127.0.0.1:8786
$ dask worker 127.0.0.1:8786
$ dask worker 127.0.0.1:8786
```

--------------------------------

### Initialize HDFS Connection

Source: https://github.com/dask/distributed/blob/main/docs/source/examples/word-count.rst

Establish a connection to the HDFS namenode. Replace NAMENODE_HOSTNAME and NAMENODE_PORT with your cluster's details.

```python
hdfs = hdfs3.HDFileSystem('NAMENODE_HOSTNAME', port=NAMENODE_PORT)
```

--------------------------------

### Submit Initial Task

Source: https://github.com/dask/distributed/blob/main/docs/source/task-launch.rst

Submit an initial task to download and convert data into a list. This is the first step in a workflow where the size of the subsequent data processing is unknown.

```python
future = client.submit(download_and_convert_to_list, uri)
```

--------------------------------

### Initialize a Dask Client

Source: https://github.com/dask/distributed/blob/main/docs/source/client.rst

Initialize a Client by pointing it to the address of a Scheduler. This is the primary entry point for interacting with a Dask cluster.
```python
>>> from distributed import Client
>>> client = Client('127.0.0.1:8786')
```

--------------------------------

### Get a Published Dataset

Source: https://github.com/dask/distributed/blob/main/docs/source/publish.rst

Retrieve a published dataset from the cluster by its name.

```APIDOC
## get_dataset

### Description
Retrieve a published dataset from the cluster by its name.

### Method
`client.get_dataset(name)`

### Parameters
- **name** (str) - The name of the dataset to retrieve.

### Response
#### Success Response (200)
- **dataset** (dask.dataframe.DataFrame or list[dask.distributed.Future]) - The retrieved Dask collection or list of futures.

### Response Example
```python
df = client.get_dataset('negative_accounts')
```
```

--------------------------------

### Worker Client Functions

Source: https://github.com/dask/distributed/blob/main/docs/source/api.rst

Functions to get a client connection from within a worker process.

```APIDOC
## Worker Client Functions

### Description
Functions to obtain a client object from within a worker, allowing workers to submit tasks or communicate with the scheduler.

- **`distributed.worker_client()`**: Context manager that yields a client connected to the scheduler.
- **`distributed.get_worker()`**: Returns the current worker object.
- **`distributed.get_client()`**: Returns the client associated with the current context.

### Example Usage
```python
from distributed import worker_client, get_worker

def my_task():
    worker = get_worker()
    # worker_client is a context manager that yields a Client
    with worker_client() as client:
        print(f"Running on worker: {worker.id}")
        # Use client to submit another task or get info
        # future = client.submit(another_task)
    return worker.id

# Submit my_task to the cluster
# client.submit(my_task)
```
```

--------------------------------

### Get Workers API

Source: https://github.com/dask/distributed/blob/main/docs/source/http_services.rst

This endpoint retrieves a list of all currently connected workers on the scheduler.
```APIDOC
## GET /api/v1/get_workers

### Description
Get all workers currently connected to the scheduler.

### Method
GET

### Endpoint
/api/v1/get_workers
```

--------------------------------

### Get Future Result

Source: https://github.com/dask/distributed/blob/main/docs/source/manage-computation.rst

Retrieve the concrete value of an individual Future. This operation blocks until the result is available.

```python
>>> future.result()
1
```

--------------------------------

### Task Transition Methods

Source: https://github.com/dask/distributed/blob/main/docs/source/scheduling-state.rst

These methods handle specific transitions between task states. They update scheduler state and can recommend subsequent transitions.

```python
def transition_released_waiting(self, key, stimulus_id): ...
def transition_processing_memory(self, key, stimulus_id): ...
def transition_processing_erred(self, key, stimulus_id): ...
```

--------------------------------

### Retire Workers API Request

Source: https://github.com/dask/distributed/blob/main/docs/source/http_services.rst

Example JSON payload for the /api/v1/retire_workers endpoint to specify which workers to retire.

```json
{
    "workers": ["tcp://127.0.0.1:53741", "tcp://127.0.0.1:53669"]
}
```

--------------------------------

### Client - Connect to a Dask cluster

Source: https://context7.com/dask/distributed/llms.txt

The Client class is the primary entry point for interacting with a Dask cluster. It can automatically start a LocalCluster or connect to an existing scheduler. Several connection methods are demonstrated, including an asynchronous context manager.

```APIDOC
## Client: Connect to a Dask cluster

`Client` is the main entry point. Instantiating it without arguments starts a `LocalCluster` in the background. Passing a scheduler address (string) or an existing cluster object connects to that cluster. All keyword arguments that are not recognized by `Client` itself are forwarded to `LocalCluster`.

```python
from distributed import Client, LocalCluster

# ── Option 1: auto-create a LocalCluster ──────────────────────────────────────
with Client() as client:
    print(client)

# ── Option 2: explicit LocalCluster ───────────────────────────────────────────
cluster = LocalCluster(n_workers=2, threads_per_worker=2, memory_limit="2GiB")
client = Client(cluster)
print(client.scheduler_info()["workers"].keys())
client.close()
cluster.close()

# ── Option 3: connect to a remote scheduler ───────────────────────────────────
client = Client("tcp://scheduler-host:8786", timeout="30s")

# ── Option 4: async context manager ───────────────────────────────────────────
import asyncio

async def main():
    async with Client(asynchronous=True) as client:
        future = client.submit(lambda x: x ** 2, 7)
        result = await future
        print(result)  # 49

asyncio.run(main())
```
```

--------------------------------

### Interact with Server via Basic Communication

Source: https://github.com/dask/distributed/blob/main/docs/source/foundations.rst

Connects to a server at tcp://127.0.0.1:8888, sends an 'add' operation, reads the result, and prints it. Also demonstrates streaming data. Requires asyncio and distributed.core.

```python
import asyncio
from distributed.core import connect

async def f():
    comm = await connect('tcp://127.0.0.1:8888')
    await comm.write({'op': 'add', 'x': 1, 'y': 2})
    result = await comm.read()
    await comm.close()
    print(result)

>>> asyncio.run(f())
3

async def g():
    comm = await connect('tcp://127.0.0.1:8888')
    await comm.write({'op': 'stream_data', 'interval': 1})
    while True:
        result = await comm.read()
        print(result)

>>> asyncio.run(g())
1
2
3
...
```

--------------------------------

### Custom Human Serialization

Source: https://github.com/dask/distributed/blob/main/docs/source/serialization.rst

Example of registering custom serialization and deserialization functions for a `Human` class using Dask's protocol.
```APIDOC
## Custom Human Serialization

### Description
This example demonstrates how to define and register custom serialization and deserialization logic for a `Human` class. It uses the `@dask_serialize.register` and `@dask_deserialize.register` decorators to associate the serialization functions with the `Human` type.

### Code
```python
from typing import Tuple, Dict, List
from distributed.protocol import dask_serialize, dask_deserialize

class Human:
    def __init__(self, name):
        self.name = name

@dask_serialize.register(Human)
def serialize(human: Human) -> Tuple[Dict, List[bytes]]:
    """Serializes a Human object into a header and frames."""
    header = {}
    frames = [human.name.encode()]
    return header, frames

@dask_deserialize.register(Human)
def deserialize(header: Dict, frames: List[bytes]) -> Human:
    """Deserializes a Human object from a header and frames."""
    return Human(frames[0].decode())
```
```

--------------------------------

### Recursive Fibonacci Function

Source: https://github.com/dask/distributed/blob/main/docs/source/task-launch.rst

A standard recursive implementation of the Fibonacci sequence. This is used as a motivating example for demonstrating task submission from within tasks.

```python
def fib(n):
    if n < 2:
        return n
    a = fib(n - 1)
    b = fib(n - 2)
    return a + b

print(fib(10))  # prints "55"
```

--------------------------------

### Manual Acquire and Release of Distributed Semaphore

Source: https://context7.com/dask/distributed/llms.txt

Demonstrates manual acquisition and release of a distributed semaphore, including timeout functionality. Use `acquire(timeout=...)` for timed attempts.
```python
from distributed import Client, Semaphore

with Client() as client:
    sem = Semaphore(max_leases=3, name="db-connections")

    # Manual acquire with timeout
    acquired = sem.acquire(timeout="10s")
    if acquired:
        try:
            print("acquired lease")
        finally:
            sem.release()
```

--------------------------------

### Dask Distributed Message Examples

Source: https://github.com/dask/distributed/blob/main/docs/source/protocol.rst

Illustrates the dictionary-based message formats used for commands, status updates, and data transfer within dask.distributed.

```python
{
    'op': 'compute',
    'function': ...
    'args': ['x']
}
```

```python
{
    'op': 'task-complete',
    'key': 'y',
    'nbytes': 26
}
```

```python
{
    'op': 'register-worker',
    'address': '192.168.1.42',
    'name': 'alice',
    'nthreads': 4
}
```

```python
{
    'x': b'...',
    'y': b'...'
}
```

--------------------------------

### Asynchronous Test with @gen_cluster

Source: https://github.com/dask/distributed/blob/main/docs/source/develop.rst

Example of an asynchronous test using the @gen_cluster decorator. This style requires using the coroutine API for all blocking functions.

```python
# tests/test_submit.py

from distributed.utils_test import gen_cluster, inc
from distributed import Client, Future, Scheduler, Worker

@gen_cluster(client=True)
async def test_submit(c, s, a, b):
    assert isinstance(c, Client)
    assert isinstance(s, Scheduler)
    assert isinstance(a, Worker)
    assert isinstance(b, Worker)

    future = c.submit(inc, 1)
    assert isinstance(future, Future)
    assert future.key in c.futures

    # result = future.result()  # This synchronous API call would block
    result = await future
    assert result == 2

    assert future.key in s.tasks
    assert future.key in a.data or future.key in b.data
```

--------------------------------

### Access Datasets via Dictionary Interface

Source: https://github.com/dask/distributed/blob/main/docs/source/publish.rst

Use the `.datasets` mapping on the client for a dictionary-like interface to publish, list, get, and delete global datasets.
```APIDOC
## Client.datasets mapping

### Description
Provides a dictionary-like interface to manage published datasets.

### Usage
- **Publish**: `client.datasets['dataset_name'] = dask_collection`
- **List**: `list(client.datasets)`
- **Get**: `dask_collection = client.datasets['dataset_name']`
- **Delete**: `del client.datasets['dataset_name']`

### Example
```python
client.datasets['my_data'] = df
print(list(client.datasets))
df_retrieved = client.datasets['my_data']
del client.datasets['my_data']
```
```

--------------------------------

### Batching Input for Larger Tasks

Source: https://github.com/dask/distributed/blob/main/docs/source/efficiency.rst

Contrasts mapping a function over a large sequence, which creates one future per element, with batching the input into larger chunks so that each task processes many elements at once.

```python
>>> futures = client.map(f, seq)
>>> len(futures)  # avoid large numbers of futures
1000000000
```

```python
>>> def f_many(chunk):
...     return [f(x) for x in chunk]

>>> from tlz import partition_all
>>> chunks = partition_all(1000000, seq)  # Collect into groups of size 1000000
>>> futures = client.map(f_many, chunks)
>>> len(futures)  # Compute on larger pieces of your data at once
1000
```

--------------------------------

### Extend PATH for Macports Python

Source: https://github.com/dask/distributed/blob/main/docs/source/install.rst

For Macports users, extend the PATH environment variable to include the directory where Python binaries are installed. This is a workaround for a known issue.

```bash
export PATH=/opt/local/Library/Frameworks/Python.framework/Versions/3.7/bin:$PATH
```

--------------------------------

### Connect to Dask Client and Load Data

Source: https://github.com/dask/distributed/blob/main/docs/source/publish.rst

Connect to a Dask scheduler and load data into a dask.dataframe. The data is then persisted on the cluster.
```python
from dask.distributed import Client
client = Client('scheduler-address:8786')

import dask.dataframe as dd
df = dd.read_csv('s3://my-bucket/*.csv')
df2 = df[df.balance < 0]
df2 = client.persist(df2)

>>> df2.head()
      name  balance
0    Alice     -100
1      Bob     -200
2  Charlie     -300
3   Dennis     -400
4    Edith     -500
```
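Once data is persisted on the cluster, it can be shared with other clients under a name. The following is a minimal sketch of that round trip, not the source's own example. It assumes an in-process cluster (`processes=False`) so it runs without a separate scheduler, and it uses a small list of futures as a stand-in for the persisted dataframe. The helper names `negate` and `publish_roundtrip` are hypothetical; `publish_dataset`, `list_datasets`, `get_dataset`, and `unpublish_dataset` are the `Client` methods referenced elsewhere in this document.

```python
from distributed import Client


def negate(balance):
    """Hypothetical stand-in for real processing: make a balance negative."""
    return -balance


def publish_roundtrip():
    # Assumption: an in-process cluster with the dashboard disabled,
    # so the sketch needs no external scheduler.
    with Client(processes=False, dashboard_address=None) as client:
        # A list of futures stands in for the persisted collection.
        futures = client.map(negate, [100, 200, 300])

        # Publish under a name so other clients could look it up.
        client.publish_dataset(negative_accounts=futures)
        names = list(client.list_datasets())

        # Retrieve by name and pull the concrete values back.
        restored = client.get_dataset("negative_accounts")
        values = client.gather(restored)

        # Remove the named reference when it is no longer needed.
        client.unpublish_dataset("negative_accounts")
        remaining = list(client.list_datasets())
    return names, values, remaining


if __name__ == "__main__":
    print(publish_roundtrip())
```

With a real deployment, the publishing client and the consuming client would each connect to the same scheduler address instead of sharing one in-process `Client`.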