### Install Dependencies from Example Directory

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/opentelemetry/README.md

Installs dependencies when your current directory is `examples/opentelemetry`. The YDB SDK is installed in editable mode from the repository root (`../..`).

```sh
pip install -e '../..[opentelemetry]' -r requirements.txt
```

--------------------------------

### Run Full Stack from Example Directory

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/opentelemetry/README.md

Starts the full stack when your current directory is the `examples/opentelemetry` folder. This command also builds the `otel-example` image.

```sh
cd /path/to/ydb-python-sdk/examples/opentelemetry
docker compose -f compose-e2e.yaml up --build
```

--------------------------------

### Run End-to-End Tracing Example with Docker Compose

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/opentelemetry.md

This command builds the `otel-example` Docker image and starts the full stack, including YDB, OTLP, Tempo, and Grafana, for end-to-end tracing visualization.

```sh
docker compose -f examples/opentelemetry/compose-e2e.yaml up --build
```

--------------------------------

### Run the OpenTelemetry Example Script

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/opentelemetry/README.md

Executes the main OpenTelemetry example script. Ensure YDB is running and dependencies are installed before running.

```sh
python examples/opentelemetry/otel_example.py
```

--------------------------------

### Typical Local Run for Tracing Example

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/opentelemetry.md

This sequence sets up YDB in Docker, installs the SDK with OpenTelemetry support, and runs the example script. Access Grafana at http://localhost:3000 to view traces.
```sh
docker compose up -d
pip install -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt
python examples/opentelemetry/otel_example.py
```

--------------------------------

### Run Example on Self-Hosted YDB Cluster

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/README.md

Configures and runs an example on a self-hosted YDB cluster using an authentication token. Obtain the cluster credentials and token beforehand.

```bash
# Change the directory for the particular example, e.g. basic_example_v1
EXAMPLE=basic_example_v1
cd $EXAMPLE

# Obtain the YDB authentication token
export YDB_ACCESS_TOKEN_CREDENTIALS=`ydb -p ydb1 auth get-token -f`

# Set the database path and its endpoint
export YDB_ENDPOINT=grpcs://cluster-endpoint.domain.com:2136
export YDB_DATABASE=/Root/testdb

# Run the script
python3 __main__.py -e $YDB_ENDPOINT -d $YDB_DATABASE -p python-examples/$EXAMPLE
```

--------------------------------

### Run Example on Local YDB Instance

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/README.md

Configures and runs an example on a local YDB instance using anonymous credentials. Ensure a local YDB cluster is running and accessible.

```bash
# Change the directory for the particular example, e.g. basic_example_v1
EXAMPLE=basic_example_v1
cd $EXAMPLE

# Enable the anonymous authentication mode
export YDB_ANONYMOUS_CREDENTIALS=1

# Set the database path and its endpoint
export YDB_ENDPOINT=grpc://localhost:2136
export YDB_DATABASE=/local

# Run the script
python3 __main__.py -e $YDB_ENDPOINT -d $YDB_DATABASE -p python-examples/$EXAMPLE
```

--------------------------------

### Synchronous Full Example

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/query.md

A complete synchronous example demonstrating driver initialization, session pool usage, schema creation, data insertion within a transaction, and data reading.
```python
import ydb

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)

    with ydb.QuerySessionPool(driver) as pool:
        # Schema
        pool.execute_with_retries("DROP TABLE IF EXISTS users")
        pool.execute_with_retries(
            "CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
        )

        # Insert in a transaction
        def insert(tx: ydb.QueryTxContext):
            with tx.execute(
                "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
                commit_tx=True,
            ):
                pass

        pool.retry_tx_sync(insert)

        # Read
        result_sets = pool.execute_with_retries("SELECT id, name FROM users")
        for row in result_sets[0].rows:
            print(row["id"], row["name"])
```

--------------------------------

### Asynchronous Full Example

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/query.md

A complete asynchronous example demonstrating async driver initialization, session pool usage, schema creation, data insertion within a transaction, and data reading.

```python
import asyncio
import ydb

async def main():
    async with ydb.aio.Driver(
        endpoint="grpc://localhost:2136", database="/local"
    ) as driver:
        await driver.wait(timeout=5, fail_fast=True)

        async with ydb.aio.QuerySessionPool(driver) as pool:
            # Schema
            await pool.execute_with_retries("DROP TABLE IF EXISTS users")
            await pool.execute_with_retries(
                "CREATE TABLE users (id Uint64, name Utf8, PRIMARY KEY (id))"
            )

            # Insert in a transaction
            async def insert(tx: ydb.aio.QueryTxContext):
                async with await tx.execute(
                    "INSERT INTO users (id, name) VALUES (1, 'Alice'), (2, 'Bob')",
                    commit_tx=True,
                ):
                    pass

            await pool.retry_tx_async(insert)

            # Read
            result_sets = await pool.execute_with_retries(
                "SELECT id, name FROM users"
            )
            for row in result_sets[0].rows:
                print(row["id"], row["name"])

asyncio.run(main())
```

--------------------------------

### Run Example with Managed YDB in Yandex Cloud

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/README.md

Configures and runs an example using a
managed YDB instance in Yandex Cloud with service account key authentication. Ensure the service account key file, endpoint, and database path are correctly set.

```bash
# Change the directory for the particular example, e.g. basic_example_v1
EXAMPLE=basic_example_v1
cd $EXAMPLE

# Set the path to the service account key file
export YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=$HOME/key-ydb-sa-0.json

# Set the database path and its endpoint
export YDB_ENDPOINT=grpcs://ydb.serverless.yandexcloud.net:2135
export YDB_DATABASE=/ru-central1/b1gfvslmokutuvt2g019/etnvbffeqegu1ub2rg2o

# Run the script
python3 __main__.py -e $YDB_ENDPOINT -d $YDB_DATABASE -p python-examples/$EXAMPLE
```

--------------------------------

### Start YDB with Docker Compose

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/opentelemetry/README.md

Use this command to start a minimal YDB instance using Docker Compose. Ensure YDB is healthy before proceeding.

```sh
cd /path/to/ydb-python-sdk
docker compose up -d
```

--------------------------------

### Install Python Dependencies

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/README.md

Installs required Python packages for YDB examples, including iso8601, ydb, and yandexcloud.

```bash
python3 -m pip install iso8601
python3 -m pip install ydb
python3 -m pip install yandexcloud
```

--------------------------------

### Create Table with Covered Index

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/table.md

Example of creating a table with a covered index defined from the start.
```python
driver.table_client.create_table(
    "/local/users",
    ydb.TableDescription()
    .with_columns(
        ydb.Column("id", ydb.PrimitiveType.Uint64),
        ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
        ydb.Column("email", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
        ydb.Column("age", ydb.OptionalType(ydb.PrimitiveType.Uint32)),
    )
    .with_primary_keys("id")
    .with_indexes(
        ydb.TableIndex("name_idx")
        .with_index_columns("name")
        .with_data_columns("email"),
    ),
)
```

--------------------------------

### Install OTLP/gRPC Exporter

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/opentelemetry.md

Install the OpenTelemetry OTLP/gRPC exporter, which is compatible with backends like Jaeger and Tempo.

```sh
# OTLP/gRPC exporter (works with Jaeger, Tempo, and others)
pip install opentelemetry-exporter-otlp-proto-grpc
```

--------------------------------

### Start Infrastructure with Docker Compose

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/tests/slo/README.md

Starts the SLO workload infrastructure using Docker Compose. Ensure you are in the 'ydb-slo-action' repository root. This command builds the necessary images and starts the services in detached mode.

```bash
docker compose -f deploy/compose.yml --profile telemetry up -d --build
```

--------------------------------

### Install YDB with OpenTelemetry Extra

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/opentelemetry.md

Install the YDB SDK with the `opentelemetry` extra to include necessary OpenTelemetry packages.

```sh
# Quoted so that shells such as zsh do not treat the brackets as a glob pattern
pip install "ydb[opentelemetry]"
```

--------------------------------

### Install YDB Python SDK from Sources

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/quickstart.md

Install the YDB Python SDK directly from its source code repository. This is useful for development or when using the latest unreleased changes.

```bash
pip install -e .
```

--------------------------------

### Install YDB Python SDK

Source: https://context7.com/ydb-platform/ydb-python-sdk/llms.txt

Install the SDK using pip. Additional packages can be installed for Yandex Cloud IAM or OpenTelemetry support.

```sh
pip install ydb

# Yandex Cloud IAM credentials support
pip install "ydb[yc]"

# OpenTelemetry tracing support
pip install "ydb[opentelemetry]"
```

--------------------------------

### Setup Virtual Environment

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/quickstart.md

If you cannot upgrade pip due to system restrictions, set up a virtual environment to manage Python packages independently. Activate the environment before proceeding.

```bash
python -m pip install virtualenv
virtualenv venv
source venv/bin/activate
python -m pip install --upgrade pip
```

--------------------------------

### Install Dependencies from Repository Root

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/opentelemetry/README.md

Installs the necessary Python dependencies, including the YDB SDK in editable mode and OpenTelemetry extras, from the repository root.

```sh
python3 -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt
```

--------------------------------

### Install YDB[yc] Extra

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/driver.md

Install the `ydb[yc]` extra for ServiceAccountCredentials and MetadataUrlCredentials. This command installs necessary dependencies for Yandex Cloud integration.

```sh
# Quoted so that shells such as zsh do not treat the brackets as a glob pattern
pip install "ydb[yc]"
```

--------------------------------

### Idempotent Directory Setup and Table Creation

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/scheme.md

Sets up a directory structure and creates a table if it doesn't exist, tolerating existing directories.
Uses `ydb.SchemeError` for error handling during directory creation and `CREATE TABLE IF NOT EXISTS` for tables.

```python
import ydb

def setup_schema(driver: ydb.Driver, base_path: str) -> None:
    for subdir in ("tables", "topics", "coordination"):
        path = f"{base_path}/{subdir}"
        try:
            driver.scheme_client.make_directory(path)
        except ydb.SchemeError:
            pass  # already exists

    with ydb.QuerySessionPool(driver) as pool:
        pool.execute_with_retries(
            f"CREATE TABLE IF NOT EXISTS `{base_path}/tables/events` "
            "(id Uint64, payload Utf8, PRIMARY KEY (id))"
        )
```

--------------------------------

### Async Table Operations Example

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/table.md

Demonstrates asynchronous table operations including creating, bulk upserting, describing, and dropping tables using the async driver. Ensure the driver is properly initialized and awaited.

```python
import asyncio
import ydb

async def main():
    async with ydb.aio.Driver(
        endpoint="grpc://localhost:2136", database="/local"
    ) as driver:
        await driver.wait(timeout=5, fail_fast=True)

        # Schema operations
        await driver.table_client.create_table(
            "/local/users",
            ydb.TableDescription()
            .with_columns(
                ydb.Column("id", ydb.PrimitiveType.Uint64),
                ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
            )
            .with_primary_keys("id"),
        )

        # Bulk upsert
        column_types = (
            ydb.BulkUpsertColumns()
            .add_column("id", ydb.PrimitiveType.Uint64)
            .add_column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        )
        rows = [...]
        # Assume rows are defined here
        await driver.table_client.bulk_upsert("/local/users", rows, column_types)

        # Describe
        entry = await driver.table_client.describe_table("/local/users")
        for col in entry.columns:
            print(col.name, col.type)

        await driver.table_client.drop_table("/local/users")

asyncio.run(main())
```

--------------------------------

### Getting a Topic Client

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/topic.md

Demonstrates how to obtain a topic client instance from a YDB driver, both synchronously and asynchronously.

```APIDOC
## Getting a Topic Client

The topic client is available on every driver instance via the `topic_client` property.

**Synchronous:**

```python
import ydb

with ydb.Driver(
    endpoint="grpc://localhost:2136",
    database="/local",
    credentials=ydb.credentials_from_env_variables(),
) as driver:
    driver.wait(timeout=5, fail_fast=True)
    topic_client = driver.topic_client
```

**Asynchronous:**

```python
import asyncio
import ydb

async def main():
    async with ydb.aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local",
        credentials=ydb.credentials_from_env_variables(),
    ) as driver:
        await driver.wait(timeout=5, fail_fast=True)
        topic_client = driver.topic_client
```
```

--------------------------------

### Async Scheme Operations Example

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/scheme.md

Demonstrates asynchronous creation, listing, description, and removal of directories using the YDB Python SDK's async client. Requires asyncio and ydb libraries.
```python
import asyncio
import ydb

async def main():
    async with ydb.aio.Driver(
        endpoint="grpc://localhost:2136", database="/local"
    ) as driver:
        await driver.wait(timeout=5, fail_fast=True)

        await driver.scheme_client.make_directory("/local/my_app")

        listing = await driver.scheme_client.list_directory("/local")
        for entry in listing.children:
            print(entry.name, entry.type)

        entry = await driver.scheme_client.describe_path("/local/my_app")
        print(entry.is_directory())

        await driver.scheme_client.remove_directory("/local/my_app")

asyncio.run(main())
```

--------------------------------

### Start YDB with Chaos

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/AGENTS.md

Start YDB with chaos enabled for topic chaos testing. This command starts the YDB nodes and simulates failures by killing a DB node every ~20 seconds.

```sh
docker compose -f tests/slo/playground/configs/compose.yaml up -d
```

--------------------------------

### Create and Activate Virtual Environment

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/README.md

If you cannot upgrade pip because of a system-owned installation, create and activate a virtual environment to manage Python packages.

```sh
$ python -m pip install virtualenv
$ virtualenv venv
$ source venv/bin/activate
$ python -m pip install --upgrade pip
```

--------------------------------

### Synchronous Write and Read Topic Messages

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/topic.md

This example demonstrates synchronous writing and reading of messages to a YDB topic. It includes topic creation, message writing, and message reading with commit. The example drops and recreates the topic together with its consumer, so neither has to exist beforehand.
```python
import ydb

endpoint = "grpc://localhost:2136"
database = "/local"
topic_path = "/local/my-topic"
consumer = "my-consumer"

with ydb.Driver(endpoint=endpoint, database=database) as driver:
    driver.wait(timeout=5, fail_fast=True)

    # Recreate the topic: drop it if it already exists, then create it
    try:
        driver.topic_client.drop_topic(topic_path)
    except ydb.SchemeError:
        pass
    driver.topic_client.create_topic(topic_path, consumers=[consumer])

    # Write messages
    with driver.topic_client.writer(topic_path) as writer:
        for i in range(10):
            writer.write(f"message-{i}")

    # Read messages
    with driver.topic_client.reader(topic_path, consumer=consumer) as reader:
        for _ in range(10):
            try:
                message = reader.receive_message(timeout=5)
                print(message.data.decode())
                reader.commit(message)
            except TimeoutError:
                break
```

--------------------------------

### Create Table with Partitioning Settings

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/table.md

Demonstrates how to create a table with specified partitioning settings, controlling the number and size of partitions.

```APIDOC
## Create Table with Partitioning Settings

### Description
Creates a table with custom partitioning configurations, including minimum and maximum partition counts, target partition size, and auto-splitting based on size or load.

### Method
`driver.table_client.create_table(path, table_description)`

### Parameters
#### Path Parameters
- **path** (string) - Required - The path where the table will be created.

#### Request Body
- **table_description** (ydb.TableDescription) - Required - An object describing the table schema and settings.
  - `with_partitioning_settings(ydb.PartitioningSettings)` - Configures automatic partitioning.
    - `with_min_partitions_count(n)` (int) - Sets the minimum number of shards.
    - `with_max_partitions_count(n)` (int) - Sets the maximum number of shards.
    - `with_partition_size_mb(mb)` (int) - Sets the target shard size in MB.
    - `with_partitioning_by_load(flag)` (ydb.FeatureFlag) - Enables or disables partitioning by load.
    - `with_partitioning_by_size(flag)` (ydb.FeatureFlag) - Enables or disables partitioning by size.

### Request Example
```python
driver.table_client.create_table(
    "/local/events",
    ydb.TableDescription()
    .with_columns(
        ydb.Column("id", ydb.PrimitiveType.Uint64),
        ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
    )
    .with_primary_keys("id")
    .with_partitioning_settings(
        ydb.PartitioningSettings()
        .with_min_partitions_count(4)
        .with_max_partitions_count(64)
        .with_partitioning_by_load(ydb.FeatureFlag.ENABLED)
    ),
)
```
```

--------------------------------

### Set up Development Environment

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/AGENTS.md

Activate the virtual environment before running any project commands. This ensures all dependencies are correctly managed.

```sh
python3 -m venv .venv
source .venv/bin/activate
pip install -e . tox
```

--------------------------------

### Manage Table Schema with TTL and Indexes

Source: https://context7.com/ydb-platform/ydb-python-sdk/llms.txt

Demonstrates creating a table with TTL, a secondary index, and auto-partitioning using `ydb.TableDescription`. It also shows how to alter the table by adding a column, describe its columns, and finally drop it.
```python
import ydb

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)

    # Create a table with TTL, secondary index, and auto-partitioning
    driver.table_client.create_table(
        "/local/events",
        ydb.TableDescription()
        .with_columns(
            ydb.Column("id", ydb.PrimitiveType.Uint64),
            ydb.Column("user_id", ydb.OptionalType(ydb.PrimitiveType.Uint64)),
            ydb.Column("kind", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
            ydb.Column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)),
        )
        .with_primary_keys("id")
        .with_indexes(
            ydb.TableIndex("user_idx").with_index_columns("user_id")
        )
        .with_ttl(
            ydb.TtlSettings().with_date_type_column(
                "created_at",
                expire_after_seconds=30 * 24 * 3600,  # 30-day retention
            )
        )
        .with_partitioning_settings(
            ydb.PartitioningSettings()
            .with_min_partitions_count(4)
            .with_max_partitions_count(64)
            .with_partitioning_by_load(ydb.FeatureFlag.ENABLED)
        ),
    )

    # Alter: add a column
    driver.table_client.alter_table(
        "/local/events",
        add_columns=[ydb.Column("metadata", ydb.OptionalType(ydb.PrimitiveType.Json))],
    )

    # Describe
    entry = driver.table_client.describe_table("/local/events")
    for col in entry.columns:
        print(col.name, col.type)

    driver.table_client.drop_table("/local/events")
```

--------------------------------

### Initialize Driver and Access Service Clients

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/driver.md

Demonstrates initializing the YDB driver, waiting for it to be ready, and then accessing different service clients. Ensure the driver is properly configured with endpoint and database. The `QuerySessionPool` is created separately.

```python
with ydb.Driver(endpoint=..., database=...) as driver:
    driver.wait(timeout=5, fail_fast=True)

    with ydb.QuerySessionPool(driver) as pool:  # Query service
        pass

    topics = driver.topic_client   # Topic service
    scheme = driver.scheme_client  # Schema operations
    table = driver.table_client    # Table service
```

--------------------------------

### Create a Coordination Node with Default Configuration

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/coordination.md

Use this to create a coordination node with default settings. Ensure the driver is initialized.

```python
# Minimal creation — default config
driver.coordination_client.create_node("/local/my_node")
```

--------------------------------

### Install YDB Python SDK

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/README.md

Install the YDB Python SDK using pip. Ensure you have Python 3.8 or higher and pip version 9.0.1 or higher.

```sh
$ python -m pip install ydb
```

--------------------------------

### Create a Topic

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/topic.md

Shows how to create a new topic with specified configurations, including partition counts, retention period, and initial consumers.

```APIDOC
## Create a Topic

```python
# Synchronous
driver.topic_client.create_topic(
    "/local/my-topic",
    min_active_partitions=1,
    max_active_partitions=10,
    retention_period=datetime.timedelta(hours=24),
    consumers=["my-consumer"],
)

# Asynchronous
await driver.topic_client.create_topic(
    "/local/my-topic",
    min_active_partitions=1,
    max_active_partitions=10,
    retention_period=datetime.timedelta(hours=24),
    consumers=["my-consumer"],
)
```

Key parameters for `create_topic`:

* `path` — full topic path including database prefix.
* `min_active_partitions` — minimum number of active partitions (default: 1).
* `max_active_partitions` — maximum number of partitions when auto-scaling is enabled.
* `retention_period` — how long messages are kept (`datetime.timedelta`).
* `retention_storage_mb` — maximum storage size per partition in megabytes.
* `supported_codecs` — list of [`TopicCodec`](apireference.md#ydb.TopicCodec) values the topic accepts (default: RAW and GZIP).
* `consumers` — list of consumer names (strings) or [`TopicConsumer`](apireference.md#ydb.TopicConsumer) objects to create upfront.
* `partition_write_speed_bytes_per_second` — per-partition write throughput limit.
```

--------------------------------

### Run Integration Tests

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/AGENTS.md

Execute integration tests which require Docker. Pytest-docker will start Docker automatically, so do not start it manually. Ensure the virtual environment is activated.

```sh
source .venv/bin/activate && tox -e py -- tests -v
```

--------------------------------

### Initialize Driver with Connection String

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/driver.md

Initialize the driver using a connection string that includes the protocol, host, port, and database path. This is a concise way to specify connection details.

```python
driver = ydb.Driver(
    connection_string="grpc://localhost:2136/?database=/local",
)
```

--------------------------------

### Create and Manage Topics with YDB Python SDK

Source: https://context7.com/ydb-platform/ydb-python-sdk/llms.txt

Demonstrates creating a topic with auto-scaling partitions, altering topic settings like retention period and consumers, describing topic configuration, and dropping a topic.
```python
import datetime
import ydb

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)

    # Create topic with auto-scaling partitions
    driver.topic_client.create_topic(
        "/local/events-topic",
        min_active_partitions=2,
        max_active_partitions=50,
        retention_period=datetime.timedelta(hours=24),
        consumers=["analytics-consumer", "audit-consumer"],
        auto_partitioning_settings=ydb.TopicAutoPartitioningSettings(
            strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP_AND_DOWN,
            up_utilization_percent=70,
            down_utilization_percent=20,
            stabilization_window=datetime.timedelta(seconds=60),
        ),
    )

    # Alter — add a consumer or change retention
    driver.topic_client.alter_topic(
        "/local/events-topic",
        add_consumers=["new-consumer"],
        set_retention_period=datetime.timedelta(hours=48),
    )

    # Describe
    desc = driver.topic_client.describe_topic("/local/events-topic", include_stats=True)
    print(desc.partitions, desc.consumers)

    driver.topic_client.drop_topic("/local/events-topic")
```

--------------------------------

### Run Full Stack with Docker Compose

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/opentelemetry/README.md

Launches the full stack including YDB, OTLP collector, Tempo, and Grafana. The `otel-example` service is built and run within Compose.

```sh
cd /path/to/ydb-python-sdk
docker compose -f examples/opentelemetry/compose-e2e.yaml up --build
```

--------------------------------

### Configure Table Partitioning Settings

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/table.md

Use PartitioningSettings to define minimum/maximum partitions, target partition size, and enable auto-splitting by size or load. Pass these settings to TableDescription.
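As a rough mental model only (this is not the SDK's or the server's actual algorithm), the partition count that size-based splitting tends toward can be pictured as the table size divided by the target partition size, clamped to the configured minimum and maximum. The helper `estimate_partitions` is hypothetical, exists purely for illustration, and makes no calls to `ydb`.

```python
import math


def estimate_partitions(table_size_mb: float, partition_size_mb: int,
                        min_count: int, max_count: int) -> int:
    """Hypothetical estimate: size / target size, clamped to [min_count, max_count]."""
    if partition_size_mb <= 0:
        raise ValueError("partition_size_mb must be positive")
    if min_count > max_count:
        raise ValueError("min_count must not exceed max_count")
    wanted = math.ceil(table_size_mb / partition_size_mb)
    return max(min_count, min(max_count, wanted))


# Illustrative inputs: minimum 4, maximum 256, 512 MB target partition size.
print(estimate_partitions(100, 512, 4, 256))        # small table stays at the minimum
print(estimate_partitions(10_240, 512, 4, 256))     # 10 GB table: ceil(10240 / 512)
print(estimate_partitions(1_000_000, 512, 4, 256))  # very large table hits the maximum
```

The point of the sketch is only that the minimum and maximum act as bounds around whatever the size or load heuristics would otherwise choose.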
```python
ydb.PartitioningSettings() \
    .with_min_partitions_count(4) \
    .with_max_partitions_count(256) \
    .with_partition_size_mb(512) \
    .with_partitioning_by_load(ydb.FeatureFlag.ENABLED) \
    .with_partitioning_by_size(ydb.FeatureFlag.ENABLED)
```

```python
driver.table_client.create_table(
    "/local/events",
    ydb.TableDescription()
    .with_columns(
        ydb.Column("id", ydb.PrimitiveType.Uint64),
        ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
    )
    .with_primary_keys("id")
    .with_partitioning_settings(
        ydb.PartitioningSettings()
        .with_min_partitions_count(4)
        .with_max_partitions_count(64)
        .with_partitioning_by_load(ydb.FeatureFlag.ENABLED)
    ),
)
```

--------------------------------

### Import YDB Package

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/quickstart.md

Import the YDB package to start using its functionalities in your Python application.

```python
import ydb
```

--------------------------------

### Topic Creation Options

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/tests/slo/README.md

Options for creating a topic, including topic path, consumer name, partition counts, and retention hours.

```bash
Arguments:
  endpoint                   YDB endpoint to connect to
  db                         YDB database to connect to

Options:
  --topic-path               topic path to create
  --topic-consumer           consumer name
  --topic-min-partitions     minimum active partitions
  --topic-max-partitions     maximum active partitions
  --topic-retention-hours    retention period in hours
```

--------------------------------

### Anonymous Credentials for YDB

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/driver.md

Use AnonymousCredentials for local or unauthenticated YDB deployments. No setup is required.

```python
credentials = ydb.AnonymousCredentials()
```

--------------------------------

### Create Table with Explicit Partitions

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/table.md

Shows how to create a table pre-split at specific key ranges.
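To picture what pre-splitting means, the sketch below maps a key to a partition index for a sorted list of split points. It assumes that each split point is the first key of the next partition, which is an assumption made for illustration rather than something this entry states. `partition_index` is a hypothetical helper and does not call `ydb`.

```python
from bisect import bisect_right
from typing import Sequence

# Example split points, chosen for illustration.
SPLIT_POINTS = [1000, 2000, 3000]


def partition_index(key: int, split_points: Sequence[int]) -> int:
    """Return the 0-based partition a key falls into (hypothetical helper).

    Assumes split_points is sorted and that a key equal to a split point
    belongs to the partition that starts at that point.
    """
    return bisect_right(split_points, key)


for key in (0, 999, 1000, 2500, 3000, 10_000):
    print(key, "->", partition_index(key, SPLIT_POINTS))
```

Under this assumption, three split points yield four partitions, so knowing the key distribution in advance lets the initial load be spread across shards from the first write.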
```APIDOC
## Create Table with Explicit Partitions

### Description
Creates a table and pre-splits it at explicitly defined key points, which is useful when the key distribution is known in advance.

### Method
`driver.table_client.create_table(path, table_description)`

### Parameters
#### Path Parameters
- **path** (string) - Required - The path where the table will be created.

#### Request Body
- **table_description** (ydb.TableDescription) - Required - An object describing the table schema and settings.
  - `with_partition_at_keys(ydb.ExplicitPartitions)` - Defines explicit split points for the table.
    - `ydb.ExplicitPartitions(split_points)` - A list of `ydb.SplitPoint` objects.
      - `ydb.SplitPoint(key)` - Defines a key at which to split the partition.

### Request Example
```python
driver.table_client.create_table(
    "/local/events",
    ydb.TableDescription()
    .with_columns(
        ydb.Column("id", ydb.PrimitiveType.Uint64),
        ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
    )
    .with_primary_keys("id")
    .with_partition_at_keys(
        ydb.ExplicitPartitions(
            [ydb.SplitPoint(1000), ydb.SplitPoint(2000), ydb.SplitPoint(3000)]
        )
    ),
)
```
```

--------------------------------

### Initialize Driver with DriverConfig

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/driver.md

Use DriverConfig to specify connection details like endpoint, database, and credentials. This is one of the ways to initialize the driver.

```python
import ydb

config = ydb.DriverConfig(
    endpoint="grpc://localhost:2136",
    database="/local",
    credentials=ydb.credentials_from_env_variables(),
)
driver = ydb.Driver(config)
```

--------------------------------

### Reading Messages with TopicReader

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/topic.md

Shows how to create and use a TopicReader to receive and commit messages from a topic, with both synchronous and asynchronous examples.
```APIDOC
## Reading Messages

### Create a Reader

A reader requires a `consumer` name that must already exist on the topic.

#### Synchronous Reader

```python
with driver.topic_client.reader("/local/my-topic", consumer="my-consumer") as reader:
    message = reader.receive_message()
    print(message.data.decode())
    reader.commit(message)
```

#### Asynchronous Reader

```python
async with driver.topic_client.reader("/local/my-topic", consumer="my-consumer") as reader:
    message = await reader.receive_message()
    print(message.data.decode())
    reader.commit(message)
```
```

--------------------------------

### Create Table with Columns and Primary Keys

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/table.md

Use `create_table` with `TableDescription` to define columns and primary keys. Primary key columns cannot be optional.

```python
driver.table_client.create_table(
    "/local/users",
    ydb.TableDescription()
    .with_columns(
        ydb.Column("id", ydb.PrimitiveType.Uint64),
        ydb.Column("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
        ydb.Column("age", ydb.OptionalType(ydb.PrimitiveType.Uint32)),
    )
    .with_primary_keys("id"),
)
```

--------------------------------

### Create Table with Date-based TTL

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/table.md

Example of creating a table where rows automatically expire 7 days after their creation timestamp.
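The `expire_after_seconds` argument is a plain number of seconds, so a 7-day retention written as `7 * 24 * 3600` can equally be derived from a `datetime.timedelta`, which some readers may find easier to audit. The helper name `ttl_seconds` is hypothetical, and only the standard library is used.

```python
import datetime


def ttl_seconds(retention: datetime.timedelta) -> int:
    """Convert a retention period to whole seconds (hypothetical helper)."""
    seconds = int(retention.total_seconds())
    if seconds <= 0:
        raise ValueError("retention must be positive")
    return seconds


week = ttl_seconds(datetime.timedelta(days=7))
assert week == 7 * 24 * 3600  # same value as the hand-written product
print(week)
```

The returned integer could then be passed wherever a TTL in seconds is expected.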
```python
driver.table_client.create_table(
    "/local/events",
    ydb.TableDescription()
    .with_columns(
        ydb.Column("id", ydb.PrimitiveType.Uint64),
        ydb.Column("data", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
        ydb.Column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)),
    )
    .with_primary_keys("id")
    .with_ttl(
        ydb.TtlSettings().with_date_type_column(
            "created_at",
            expire_after_seconds=7 * 24 * 3600,
        )
    ),
)
```

--------------------------------

### Create Table with TTL, Index, Partitioning, Bulk Upsert, and Read

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/table.md

This snippet shows how to create a table with Time-To-Live (TTL) settings, a secondary index, and dynamic partitioning. It then loads data with a bulk upsert and streams it back using `read_table`. Finally, it cleans up by dropping the table. Ensure the YDB driver is properly configured and connected.

```python
import datetime
from collections import namedtuple

import ydb

Event = namedtuple("Event", ["id", "user_id", "kind", "created_at"])

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)

    # Create table
    driver.table_client.create_table(
        "/local/events",
        ydb.TableDescription()
        .with_columns(
            ydb.Column("id", ydb.PrimitiveType.Uint64),
            ydb.Column("user_id", ydb.OptionalType(ydb.PrimitiveType.Uint64)),
            ydb.Column("kind", ydb.OptionalType(ydb.PrimitiveType.Utf8)),
            ydb.Column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)),
        )
        .with_primary_keys("id")
        .with_indexes(
            ydb.TableIndex("user_idx").with_index_columns("user_id")
        )
        .with_ttl(
            ydb.TtlSettings().with_date_type_column(
                "created_at",
                expire_after_seconds=30 * 24 * 3600,
            )
        )
        .with_partitioning_settings(
            ydb.PartitioningSettings()
            .with_min_partitions_count(4)
            .with_max_partitions_count(64)
            .with_partitioning_by_load(ydb.FeatureFlag.ENABLED)
        ),
    )

    # Bulk load
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    column_types = (
        ydb.BulkUpsertColumns()
        .add_column("id", ydb.PrimitiveType.Uint64)
        .add_column("user_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("kind", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("created_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
    )
    rows = [Event(i, i % 10, "click", now) for i in range(10000)]
    driver.table_client.bulk_upsert("/local/events", rows, column_types)

    # Stream all rows back
    def read_all(session):
        with session.read_table("/local/events", columns=("id", "user_id", "kind")) as it:
            for result_set in it:
                for row in result_set.rows:
                    print(row.id, row.user_id, row.kind)

    driver.table_client.retry_operation_sync(read_all)

    # Clean up
    driver.table_client.drop_table("/local/events")
```

--------------------------------

### Check YDB Health

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/AGENTS.md

Verify that YDB nodes are running and healthy after starting them with chaos enabled. This step is crucial before proceeding with topic testing.

```sh
docker ps --format "table {{.Names}}\t{{.Status}}" | grep ydb
```

--------------------------------

### Scheme Service - Directory Management

Source: https://context7.com/ydb-platform/ydb-python-sdk/llms.txt

Demonstrates how to create, list, and describe directories and paths within YDB using the `scheme_client`.

```APIDOC
## Scheme Service - Directory Management

Manage the YDB path hierarchy (databases, directories, tables, topics).

```python
import ydb

with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver:
    driver.wait(timeout=5, fail_fast=True)

    # Create directory structure
    driver.scheme_client.make_directory("/local/myapp")
    driver.scheme_client.make_directory("/local/myapp/production")

    # List directory
    listing = driver.scheme_client.list_directory("/local/myapp")
    print(listing.self.name, listing.self.type)
    for entry in listing.children:
        print(entry.name, entry.type)

    tables = [e for e in listing.children if e.is_table()]
    topics = [e for e in listing.children if e.type == ydb.SchemeEntryType.TOPIC]

    # Describe any path
    entry = driver.scheme_client.describe_path("/local/myapp")
    print(entry.name, entry.owner, entry.size_bytes)
    print(entry.is_directory())  # True
    print(entry.is_table())  # False

    # Idempotent setup: tolerate "already exists"
    for subdir in ("tables", "topics"):
        try:
            driver.scheme_client.make_directory(f"/local/myapp/{subdir}")
        except ydb.SchemeError:
            pass  # already exists

    driver.scheme_client.remove_directory("/local/myapp/production")

# Async
# await driver.scheme_client.make_directory("/local/async-app")
# listing = await driver.scheme_client.list_directory("/local")
```
```

--------------------------------

### Initialize Driver with Keyword Arguments

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/driver.md

Initialize the driver by passing connection parameters directly as keyword arguments to the `Driver` constructor. This is an alternative to using a `DriverConfig` object.

```python
driver = ydb.Driver(
    endpoint="grpc://localhost:2136",
    database="/local",
    credentials=ydb.credentials_from_env_variables(),
)
```

--------------------------------

### Configure OTLP Endpoint

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/tests/slo/README.md

Sets the OTLP endpoint environment variable to configure metrics reporting. This example uses a specific YDB Prometheus endpoint.
```bash
OTLP_ENDPOINT=http://ydb-prometheus:9090/api/v1/otlp/v1/metrics
```

--------------------------------

### Get Synchronous Topic Client

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/topic.md

Instantiates a synchronous YDB driver and accesses its topic client property. Ensure environment variables for credentials are set.

```python
import ydb

with ydb.Driver(
    endpoint="grpc://localhost:2136",
    database="/local",
    credentials=ydb.credentials_from_env_variables(),
) as driver:
    driver.wait(timeout=5, fail_fast=True)
    topic_client = driver.topic_client
```

--------------------------------

### Query Time-Series Data

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/time-series-serverless/sample-commands.txt

Use curl to query the deployed serverless function. Provide parameters for start time, end time, interval, and aggregation values.

```bash
curl "https://functions.yandexcloud.net/d4ek54ao5ouqq1tg5tbq?start=1615000000000&end=1615000010000&interval=1&mean=12.3&sigma=5"
```

--------------------------------

### Enable Tracing for Async Driver

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/opentelemetry.md

Call `enable_tracing()` once at startup to integrate OpenTelemetry with the async YDB driver. This setup allows for automatic trace context propagation.

```python
import asyncio

import ydb
from ydb.opentelemetry import enable_tracing

enable_tracing()


async def main():
    async with ydb.aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local",
    ) as driver:
        await driver.wait(timeout=5)
        async with ydb.aio.QuerySessionPool(driver) as pool:
            await pool.execute_with_retries("SELECT 1")


asyncio.run(main())
```

--------------------------------

### explain

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/apireference.md

Explains a given query, returning the query plan. This asynchronous method can take query text and parameters, and return the plan as a string or a dictionary.
```APIDOC
## async explain(query: str, parameters: dict | None = None, result_format: [QueryExplainResultFormat](#ydb.QueryExplainResultFormat) = QueryExplainResultFormat.STR) -> str | Dict[str, Any]

### Description
Returns the query plan for the given query.

### Parameters
* **query**: YQL or SQL query.
* **parameters**: Dict of parameters and their YDB types.
* **result_format**: Return format: string or dict.

### Returns
Parsed query plan.

### Return type
str | *Dict*[str, *Any*]
```

--------------------------------

### Get Query Execution Plan as Parsed Dictionary

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/query.md

Obtain the query execution plan as a parsed dictionary. This allows direct access to plan components like `Plan`.

```python
# Returns a parsed dict
plan_dict = pool.explain_with_retries(
    "SELECT * FROM users WHERE id = 1",
    result_format=ydb.QueryExplainResultFormat.DICT,
)
print(plan_dict["Plan"])
```

--------------------------------

### Bulk Insert Using List of Structs

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/types.md

Combines `ListType` and `StructType` for efficient bulk inserts. This example defines a list of user rows and inserts them using a single query.

```python
row_type = (
    ydb.StructType()
    .add_member("id", ydb.PrimitiveType.Uint64)
    .add_member("name", ydb.PrimitiveType.Utf8)
)
rows_type = ydb.ListType(row_type)

rows = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
]

pool.execute_with_retries(
    # The declared YQL type mirrors row_type / rows_type above
    "DECLARE $rows AS List<Struct<id: Uint64, name: Utf8>>; "
    "INSERT INTO users SELECT id, name FROM AS_TABLE($rows)",
    parameters={"$rows": (rows, rows_type)},
)
```

--------------------------------

### Get Asynchronous Topic Client

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/topic.md

Instantiates an asynchronous YDB driver and accesses its topic client property within an async function. Ensure environment variables for credentials are set.
```python
import asyncio

import ydb


async def main():
    async with ydb.aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local",
        credentials=ydb.credentials_from_env_variables(),
    ) as driver:
        await driver.wait(timeout=5, fail_fast=True)
        topic_client = driver.topic_client


# Run the coroutine; without this call main() is defined but never executed
asyncio.run(main())
```

--------------------------------

### Rebuild Documentation

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/AGENTS.md

After any changes to the documentation files, rebuild the HTML output and verify there are no new errors. Ensure the virtual environment is activated.

```sh
source .venv/bin/activate && sphinx-build -b html docs docs/_build/html -q
```

--------------------------------

### Table Create Command Arguments

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/tests/slo/README.md

Lists the arguments and options for the `table-create` command, used to create tables in YDB for SLO testing. Specifies endpoint, database, table name, partitioning, initial data, and timeouts.

```bash
python tests/slo/src/ table-create [options]

Arguments:
  endpoint                        YDB endpoint to connect to
  db                              YDB database to connect to

Options:
  -t --table-name                 table name to create
  -p-min --min-partitions-count   minimum amount of partitions in table
  -p-max --max-partitions-count   maximum amount of partitions in table
  -p-size --partition-size        partition size in mb
  -c --initial-data-count         amount of initially created rows
  --write-timeout                 write timeout milliseconds
  --batch-size                    amount of new records in each create request
  --threads                       number of threads to use
```

--------------------------------

### Get Query Execution Plan Directly from Session

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/query.md

Retrieve the query execution plan directly using a session object. This method is an alternative to using the pool for explain operations.
```python
def callee(session: ydb.QuerySession):
    plan = session.explain("SELECT * FROM users WHERE id = 1")
    print(plan)


pool.retry_operation_sync(callee)
```

--------------------------------

### Get Query Execution Plan as JSON

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/query.md

Obtain the query execution plan as a JSON string without executing the query. This is useful for understanding query performance before production deployment.

```python
# Returns a JSON string
plan = pool.explain_with_retries("SELECT * FROM users WHERE id = 1")
print(plan)
```

--------------------------------

### Configure Topic Reader with TopicReaderSelector

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/topic.md

Uses `TopicReaderSelector` to specify advanced reading options for a topic, such as a specific start time. This allows for precise control over where the reader begins consuming messages.

```python
import datetime

import ydb

# `driver` is an already initialized ydb.Driver
reader = driver.topic_client.reader(
    topic=ydb.TopicReaderSelector(
        path="/local/my-topic",
        read_from=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
    ),
    consumer="my-consumer",
)
```

--------------------------------

### Topic Creation Command

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/tests/slo/README.md

Command to create a new topic with specified path and configuration. Supports setting minimum/maximum partitions and retention period.

```bash
python tests/slo/src/ topic-create [options]
```

--------------------------------

### Configure YDB Credentials

Source: https://context7.com/ydb-platform/ydb-python-sdk/llms.txt

Select credential providers based on environment variables or explicit configuration. Ensure necessary packages are installed for specific credential types like Yandex Cloud IAM.
```python
import os

import ydb
import ydb.iam
import ydb.oauth2_token_exchange

# From environment (recommended for cloud deployments)
credentials = ydb.credentials_from_env_variables()

# Anonymous (local / unauthenticated)
credentials = ydb.AnonymousCredentials()

# Static token / API key
credentials = ydb.AccessTokenCredentials("your-iam-token")

# Username + password
credentials = ydb.StaticCredentials.from_user_password("user", "password")

# Yandex Cloud service account key file (requires ydb[yc])
credentials = ydb.iam.ServiceAccountCredentials.from_file(
    os.getenv("SA_KEY_FILE")
)

# Yandex Cloud Compute/Functions instance metadata (requires ydb[yc])
credentials = ydb.iam.MetadataUrlCredentials()

# OAuth 2.0 token exchange (federated identity)
credentials = ydb.oauth2_token_exchange.Oauth2TokenExchangeCredentials(
    token_endpoint="https://auth.example.com/oauth/token",
    audience="ydb",
    subject_token_source=ydb.oauth2_token_exchange.JwtTokenSource(
        signing_method="RS256",
        private_key_file="/path/to/private.pem",
        key_id="my-key-id",
        issuer="my-issuer",
        subject="my-subject",
        audience="ydb",
    ),
)

with ydb.Driver(
    endpoint="grpc://localhost:2136",
    database="/local",
    credentials=credentials,
) as driver:
    driver.wait(timeout=5, fail_fast=True)
```

--------------------------------

### Create Topic with Auto-Partitioning

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/topic.md

Enable auto-partitioning when creating a topic to automatically scale partitions based on write throughput. Configure scaling strategy, utilization thresholds, and stabilization window.
```python
import datetime

import ydb

driver.topic_client.create_topic(
    "/local/my-topic",
    consumers=["my-consumer"],
    max_active_partitions=100,
    auto_partitioning_settings=ydb.TopicAutoPartitioningSettings(
        strategy=ydb.TopicAutoPartitioningStrategy.SCALE_UP,
        up_utilization_percent=70,
        down_utilization_percent=30,
        stabilization_window=datetime.timedelta(seconds=60),
    ),
)
```

--------------------------------

### Fetch Results in Apache Arrow Format

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/query.md

Retrieve query results in Apache Arrow IPC format for analytics workloads. Requires `pyarrow` to be installed. Decode the schema, then read the record batches.

```python
import pyarrow as pa
import ydb

result_sets = pool.execute_with_retries(
    "SELECT id, name, score FROM users ORDER BY id LIMIT 1000",
    result_set_format=ydb.QueryResultSetFormat.ARROW,
)

for result_set in result_sets:
    schema = pa.ipc.read_schema(pa.py_buffer(result_set.arrow_format_meta.schema))
    batch = pa.ipc.read_record_batch(pa.py_buffer(result_set.data), schema)
    df = batch.to_pandas()
    print(df.head())
```

--------------------------------

### Initialize YDB Driver

Source: https://github.com/ydb-platform/ydb-python-sdk/blob/main/docs/quickstart.md

Initialize the YDB driver with your endpoint and database. It's recommended to use environment variables for credentials.

```python
import ydb

endpoint = "grpc://localhost:2136"  # your ydb endpoint
database = "/local"  # your ydb database

with ydb.Driver(
    endpoint=endpoint,
    database=database,
    credentials=ydb.credentials_from_env_variables(),
) as driver:
    driver.wait(timeout=5, fail_fast=True)
```
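
--------------------------------

### Read Endpoint and Database from Environment Variables (Sketch)

The driver initialization above hardcodes `endpoint` and `database` while taking credentials from the environment. As a minimal sketch, the same two values could also come from environment variables. The names `YDB_ENDPOINT` and `YDB_DATABASE` follow the shell variables exported in the `examples/README.md` run commands; they are an assumption here and are not read by the SDK itself. The helpers `connection_params_from_env` and `open_driver` are hypothetical and not part of the SDK.

```python
import os


def connection_params_from_env(environ=os.environ):
    """Hypothetical helper: read endpoint and database, falling back to local defaults."""
    return {
        "endpoint": environ.get("YDB_ENDPOINT", "grpc://localhost:2136"),
        "database": environ.get("YDB_DATABASE", "/local"),
    }


def open_driver(environ=os.environ):
    """Hypothetical helper: build a driver from the environment; the caller closes it."""
    import ydb  # imported lazily so the helper above works without the SDK installed

    return ydb.Driver(
        credentials=ydb.credentials_from_env_variables(),
        **connection_params_from_env(environ),
    )
```

With the SDK installed and a database reachable, `with open_driver() as driver: driver.wait(timeout=5, fail_fast=True)` would then behave like the hardcoded version, and switching clusters only requires exporting different values.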