### Connect, Create Schema/Table, and Insert Data with VastDB SDK

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_vectordb/quickstart.ipynb

Connect to VastDB, create or get a schema and table, and insert data using the SDK. This example includes print statements for tracking the process. Ensure all necessary variables are defined.

```python
import vastdb
import pyarrow as pa
import datetime as dt

# --- Assume these variables are defined ---
# VASTDB_ENDPOINT = "your_endpoint_url"
# AWS_ACCESS_KEY_ID = "your_access_key"
# AWS_SECRET_ACCESS_KEY = "your_secret_key"
# BUCKET_NAME = "cneundorf-db"
# SCHEMA_NAME = "test"
# TABLE_NAME = "vectors_101"
# ------------------------------------------

session = vastdb.connect(
    endpoint=VASTDB_ENDPOINT,
    access=AWS_ACCESS_KEY_ID,
    secret=AWS_SECRET_ACCESS_KEY
)
print("Connecting to VAST...")

with session.transaction() as tx:
    print(f"Starting transaction {tx.txid}...")
    bucket = tx.bucket(BUCKET_NAME)

    # Get or create the schema
    schema = bucket.schema(SCHEMA_NAME, fail_if_missing=False) or bucket.create_schema(SCHEMA_NAME)
    print(f"Using schema: {SCHEMA_NAME}")

    # Define the table structure
    dimension = 5
    columns = pa.schema([
        ("id", pa.int64()),
        ("vec", pa.list_(pa.field(name="item", type=pa.float32(), nullable=False), dimension)),
        ('vec_timestamp', pa.timestamp('us'))
    ])

    # Get or create the table
    table = schema.table(TABLE_NAME, fail_if_missing=False) or schema.create_table(TABLE_NAME, columns)
    print(f"Using table: {TABLE_NAME}")

    # Insert a few rows of data.
    arrow_table = pa.table(schema=columns, data=[
        [1, 2, 3],
        [[1.0, 2.0, 3.0, 4.0, 5.0], [6.0, 7.0, 8.0, 9.0, 10.0], [11.0, 12.0, 13.0, 14.0, 15.0]],
        [dt.datetime(2024, 4, 10, 12, 34), dt.datetime(2024, 4, 11, 12, 34), dt.datetime(2024, 4, 13, 12, 34)]
    ])

    # The transaction is automatically committed when exiting the context
    print("Inserting data...")
    table.insert(arrow_table)
    print("Data inserted.")

print("Transaction committed.")
```

--------------------------------

### Install required packages

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/query/ray_daft_vastdb.ipynb

Install Ray, Daft, and the VAST DB client library.

```bash
%%capture --no-stderr
%pip install "ray[client,train]==2.41.0"
%pip install "daft[ray]" --no-deps
%pip install vastdb
```

--------------------------------

### Set up NiFi 2.x with VastDB using Docker

Source: https://vast-data.github.io/data-platform-field-docs/vast_datastore/nifi/quickstart.html

This script sets up a local environment for running NiFi 2.x with VastDB extensions using Docker. It downloads the latest VastDB NiFi processor and other necessary extensions, then starts a NiFi container. Ensure the NIFI_HOST variable is correctly set for your environment. This setup is experimental and not recommended for production.

```bash
# set this to the hostname or ip address where you are running NiFi
export NIFI_HOST=hostname_or_ipaddress

if [[ "$NIFI_HOST" == "hostname_or_ipaddress" ]]; then
  echo "ERROR: NIFI_HOST variable MUST be set for your environment."
  exit 1
fi

mkdir vastdb_nifi_docker
cd vastdb_nifi_docker

mkdir -p nifi_extensions
mkdir -p nifi_state
mkdir -p nifi_db
mkdir -p nifi_flowfile
mkdir -p nifi_profile
mkdir -p nifi_content
mkdir -p nifi_provenance

# Single quotes around tag_name keep the key inside the double-quoted shell string
LATEST_RELEASE=$(python3 -c "import requests; print(requests.get('https://api.github.com/repos/vast-data/vastdb_nifi/releases/latest').json()['tag_name'].lstrip('v'))")

wget -c -P nifi_extensions https://github.com/vast-data/vastdb_nifi/releases/download/v${LATEST_RELEASE}/vastdb_nifi-${LATEST_RELEASE}-linux-x86_64-py39.nar

# Include parquet support
wget -c -P nifi_extensions https://repo1.maven.org/maven2/org/apache/nifi/nifi-parquet-nar/2.0.0-M4/nifi-parquet-nar-2.0.0-M4.nar
wget -c -P nifi_extensions https://repo1.maven.org/maven2/org/apache/nifi/nifi-hadoop-libraries-nar/2.0.0-M4/nifi-hadoop-libraries-nar-2.0.0-M4.nar
wget -c -P nifi_extensions https://repo1.maven.org/maven2/org/apache/nifi/nifi-iceberg-processors-nar/2.0.0-M4/nifi-iceberg-processors-nar-2.0.0-M4.nar
wget -c -P nifi_extensions https://repo1.maven.org/maven2/org/apache/nifi/nifi-iceberg-services-api-nar/2.0.0-M4/nifi-iceberg-services-api-nar-2.0.0-M4.nar
wget -c -P nifi_extensions https://repo1.maven.org/maven2/org/apache/nifi/nifi-iceberg-services-nar/2.0.0-M4/nifi-iceberg-services-nar-2.0.0-M4.nar

docker run --name nifi \
  -p 8443:8443 \
  -d \
  -e NIFI_WEB_PROXY_HOST=${NIFI_HOST} \
  -e SINGLE_USER_CREDENTIALS_USERNAME=admin \
  -e SINGLE_USER_CREDENTIALS_PASSWORD=123456123456 \
  -v ./nifi_extensions:/opt/nifi/nifi-current/nar_extensions \
  -v ./nifi_state:/opt/nifi/nifi-current/state \
  -v ./nifi_db:/opt/nifi/nifi-current/database_repository \
  -v ./nifi_flowfile:/opt/nifi/nifi-current/flowfile_repository \
  -v ./nifi_content:/opt/nifi/nifi-current/content_repository \
  -v ./nifi_provenance:/opt/nifi/nifi-current/provenance_repository \
  --platform linux/amd64 \
  apache/nifi:2.0.0-M4
```

--------------------------------

### Install VASTDB SDK

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/spark/upserts.html

Installs the library and imports the necessary functions for upsert operations.

```python
! pip install vastdb

from vastdb.spark import vastdb_upsert, generate_where_clause_arbitrary
```

--------------------------------

### Execute Base Database Setup

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/admin/vastdb_py_setup.html

Call the `base_setup` function with your VMS host/IP, admin credentials, and the defined configuration object to initiate the Vast Database setup process.

```python
base_setup(
    address='VMS_HOST_OR_IP',
    username='admin',
    password='123456',
    config=config
)
```

--------------------------------

### Install vastpy library

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/admin/vastdb_py_setup.ipynb

Install the required library to interact with the Vast VMS API.

```bash
pip install vastpy
```

--------------------------------

### Import JSON Data Example

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/ingestion/python_sdk_json_import.html

Example workflow for reading a JSON file and connecting to a VastDB instance.

```python
import os

# read_json_record_per_line and connect_to_vastdb are helper functions
# that are not shown in this snippet.

VASTDB_ENDPOINT = os.getenv("VASTDB_ENDPOINT")
VASTDB_ACCESS_KEY = os.getenv("VASTDB_ACCESS_KEY")
VASTDB_SECRET_KEY = os.getenv("VASTDB_SECRET_KEY")

VASTDB_TWITTER_INGEST_BUCKET = os.getenv("VASTDB_TWITTER_INGEST_BUCKET")
VASTDB_TWITTER_INGEST_SCHEMA = os.getenv("VASTDB_TWITTER_INGEST_SCHEMA")
VASTDB_TWITTER_INGEST_TABLE = 'json_import'

FILE_PATH = "example.json"

# One JSON record per line
json_data = """
{"id":1, "tstamp":"2014-01-01 00:15:00.00000", "v1":2.0, "v2":17.4}
{"id":1, "tstamp":"2014-01-01 00:30:00.00000", "v1":1.9, "v2":20.2}
{"id":1, "tstamp":"2014-01-01 00:45:00.00000", "v1":1.8, "v2":19.7}
"""

with open(FILE_PATH, 'w') as f:
    f.write(json_data)

pa_table = read_json_record_per_line(FILE_PATH)

# Connect to VastDB
session = connect_to_vastdb(VASTDB_ENDPOINT, VASTDB_ACCESS_KEY, VASTDB_SECRET_KEY)
```

--------------------------------

### Install vastpy Library

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/admin/vastdb_py_setup.html

Install the vastpy library using pip. This is the first step before using the API.

```bash
pip install vastpy
```

--------------------------------

### Execute base_setup function

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/admin/vastdb_py_setup.ipynb

Call the setup function with specific host and credential details.

```python
base_setup(
    address='VMS_HOST_OR_IP',
    username='admin',
    password='123456',
```

--------------------------------

### Install Dependencies and Run Script

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/admin/restore.md

Commands to install required Python libraries and execute the restoration script.

```bash
# logging ships with the Python standard library, so it is not installed via pip;
# the Ibis package on PyPI is named ibis-framework
pip install vastdb duckdb pandas ibis-framework pydantic
```

```bash
python restore_vastdb.py
```

--------------------------------

### Install Vast DB SDK

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/sdk_ref/01_create_session.ipynb

Use this command to install the Vast DB SDK quietly. Ensure you have pip available.

```bash
!pip install --quiet vastdb
```

--------------------------------

### Install Vast DB SDK

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/sdk_ref/11_importing_files_pandas.ipynb

Install the required vastdb package using pip.

```bash
!pip install vastdb | tail -5
```

--------------------------------

### Install vastdb_parq_schema_file

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/sdk_ref/09_verify_parquet.html

Install the necessary library to verify Parquet files for Vast DB. Use --use-pep517 for compatibility.

```bash
pip3 install --upgrade --quiet git+https://github.com/snowch/vastdb_parq_schema_file.git --use-pep517
```

--------------------------------

### Install Python Kafka Client

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_datastore/kafka_connect.md

Installs the kafka-python library required for producer and consumer scripts.

```bash
python3.9 -m pip install kafka-python
```

--------------------------------

### Setup Zeek Directories

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_kafka/zeek_to_vast_kafka.md

Command to create the required directory structure for processing.

```bash
mkdir -p zeek-config pcap-files zeek-logs
```

--------------------------------

### Install VAST DB SDK

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/ingestion/python_sdk_netcdf_import.ipynb

Install the required VAST Database Python SDK package.

```bash
!pip install vastdb
```

--------------------------------

### Initialize Docker Environment

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_datastore/kafka_connect.md

Commands to set the host IP address and start the Docker containers.

```bash
export HOST_IP=$(hostname -I | awk '{print $1}')
```

```bash
docker-compose up -d
```

--------------------------------

### Create Kafka Topic and Start Producer

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_kafka/trino_direct_query.md

Create a Kafka topic named 'leon' and then start a console producer to stream system messages. Ensure the bootstrap server IPs are correct for your Vast cluster.

```bash
./kafka-topics.sh --create --topic leon --bootstrap-server 172.200.202.9:9092 --partitions 1

journalctl -n all -f | ./kafka-console-producer.sh --topic leon --bootstrap-server 172.200.202.9:9092,172.200.202.10:9092,172.200.202.11:9092,172.200.202.12:9092
```

--------------------------------

### Start Trino in Docker

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_kafka/trino_direct_query.md

Starts a Trino container configured for VAST 5.1 and above using a local properties file.

```bash
docker run \
  --name trino \
  -p 8080:8080 -d \
  -v ./vast.properties:/etc/trino/catalog/vast.properties:ro \
  --platform linux/amd64 \
  vastdataorg/trino-vast:429
```

--------------------------------

### Snapshot Output Example

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/sdk_ref/12_snapshots.html

Example output showing the structure of Bucket objects returned by the snapshots method.

```text
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_17_53_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_17_54_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_17_55_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_17_56_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_17_57_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_17_58_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_17_59_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_00_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_01_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_02_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_03_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_04_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_05_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_06_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_07_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_08_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_09_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_10_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_11_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_12_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_13_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_14_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_15_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_16_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_17_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_18_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_19_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_20_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_21_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_22_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_23_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_24_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_25_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_26_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_27_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_28_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_29_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_30_23_UTC', tx=Transaction(id=0x0737700000000724))
Bucket(name='vastdb/.snapshot/big_catalog_2024-10-13_18_31_23_UTC', tx=Transaction(id=0x0737700000000724))
```

--------------------------------

### Verify installed versions

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/query/ray_daft_vastdb.ipynb

Check the installed versions of Ray and Daft.

```bash
!pip list | grep -E "(ray|daft)"
```

--------------------------------

### Install Schema Checker

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/ingestion/python_sdk_grib2_import.ipynb

Installs the Vast DB Parquet schema verification tool.

```bash
! pip install --upgrade --quiet git+https://github.com/snowch/vastdb_parq_schema_file.git --use-pep517
```

--------------------------------

### Initialize and Start StreamProcessor

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/ingestion/spark_streaming_python_sdk.html

Instantiates the StreamProcessor and begins the streaming process.

```python
processor = StreamProcessor(await_termination_seconds=60)
processor.start()
```

--------------------------------

### Install VastDB Python SDK

Source: https://vast-data.github.io/data-platform-field-docs/vast_catalog/python-sdk-vast-catalog.html

Installs the VastDB Python SDK using pip. Ensure you have pip available in your environment.

```bash
!pip install --quiet vastdb
```

--------------------------------

### Install vastpy Library

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_datastore/vastpy/enablecatalog.ipynb

Installs the vastpy library quietly. This is a prerequisite for using the vastpy client.

```bash
!pip install --quiet vastpy
```

--------------------------------

### Install Required Libraries

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/admin/vastdb_py_setup2.ipynb

Installs the necessary Python libraries for interacting with VAST DB and related services. Use these commands in your environment before importing the libraries.

```python
# !pip install vastpy
# !pip install vastdb
# !pip install boto3
```

--------------------------------

### Vast Connector Configuration Example

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/tuning/trino/howto.html

Example configuration for the Vast connector in Trino. Set parameters like connector name, endpoint, region, credentials, and parallelism settings.

```yaml
vast: |-
  connector.name=vast
  endpoint=http://10.12.200.41
  region=us-east-1
  access_key_id={access}
  secret_access_key={[Vast Secret Key]}
  num_of_splits=64
  num_of_subsplits=10
  vast.http.client.request-timeout=60m
  vast.http.client.idle-timeout=60m
  data_endpoints=http://10.12.200.41,http://10.12.200.33
```

--------------------------------

### Python Main Execution Block

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/ingestion/python_sdk_netcdf_import.ipynb

Ensures necessary libraries are installed and then calls the main function. Requires 'vastdb', 'pandas', 'pyarrow', and 'ibis-framework' to be installed.

```python
if __name__ == "__main__":
    # Ensure necessary libraries are installed:
    # pip install vastdb pandas pyarrow ibis-framework
    main()
```

--------------------------------

### Start Spark Session

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/tuning/Spark_Predicate_Pushdown.html

Configures and initializes a SparkSession with Vast DB, Iceberg, and S3A support.

```python
import socket
import os
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
pd.set_option("max_colwidth", 150)

conf = SparkConf()
conf.setAll([
    ("spark.driver.host", socket.gethostbyname(socket.gethostname())),
    ("spark.sql.execution.arrow.pyspark.enabled", "false"),
    # VASTDB
    ("spark.sql.catalog.ndb", 'spark.sql.catalog.ndb.VastCatalog'),
    ("spark.ndb.endpoint", VASTDB_ENDPOINT),
    ("spark.ndb.data_endpoints", VASTDB_ENDPOINT),
    ("spark.ndb.access_key_id", VASTDB_ACCESS_KEY),
    ("spark.ndb.secret_access_key", VASTDB_SECRET_KEY),
    ("spark.driver.extraClassPath", '/usr/local/spark/jars/spark3-vast-3.4.1-f93839bfa38a/*'),
    ("spark.executor.extraClassPath", '/usr/local/spark/jars/spark3-vast-3.4.1-f93839bfa38a/*'),
    ("spark.sql.extensions", 'ndb.NDBSparkSessionExtension'),
    # ICEBERG
    ("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.iceberg.type", "hive"),
    ("spark.sql.catalog.iceberg.uri", f"thrift://{DOCKER_HOST_OR_IP}:9083"),
    # S3A
    ("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    ("fs.s3a.endpoint", S3_ENDPOINT),
    ("fs.s3a.access.key", S3_ACCESS_KEY),
    ("fs.s3a.secret.key", S3_SECRET_KEY),
    ("fs.s3a.endpoint.region", "vast"),
    ("fs.s3a.connection.ssl.enabled", "false"),
    # Hive
    ("hive.metastore.uris", f"thrift://{DOCKER_HOST_OR_IP}:9083"),
])

spark = SparkSession.builder \
    .master("local") \
    .appName(SPARK_APPLICATION_NAME) \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("DEBUG")

import logging

# Set logging for a specific class/package
logging.getLogger("com.example.HelloWorldCatalog").setLevel(logging.DEBUG)

print("Spark successfully loaded")
```

```text
Spark successfully loaded
```

```python
spark
```

--------------------------------

### Snapshot Path Example

Source: https://vast-data.github.io/data-platform-field-docs/vast_catalog/python-sdk-vast-catalog.html

This example shows the typical naming convention for catalog snapshot paths within a bucket. These paths are used to access historical data versions.

```text
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_08_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_13_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_18_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_23_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_28_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_33_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_38_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_43_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_48_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_53_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_21_58_29
vast-big-catalog-bucket/.snapshot/bc_table_2025-01-22_22_03_29
vast-big-catalog-bucket/.snapshot/beckie_internal_snap-2025-01-22_21_58_29
vast-big-catalog-bucket/.snapshot/beckie_internal_snap-2025-01-22_22_03_29
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_08_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_13_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_18_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_23_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_28_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_33_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_38_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_43_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_48_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_53_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_18_58_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_03_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_08_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_13_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_18_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_23_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_28_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_33_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_38_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_43_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_48_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_53_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_19_58_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_03_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_08_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_13_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_18_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_23_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_28_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_33_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_38_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_43_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_48_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_53_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_20_58_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_03_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_08_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_13_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_18_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_23_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_28_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_33_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_38_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_43_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_48_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_53_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_21_58_29_UTC
vast-big-catalog-bucket/.snapshot/big_catalog_2025-01-22_22_03_29_UTC
```

--------------------------------

### Configure VastDB Environment

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/ingestion/python_sdk_grib2_import.ipynb

Setup environment variables for VastDB connection parameters.

```python
import os

VASTDB_ENDPOINT = os.getenv("VASTDB_ENDPOINT")
VASTDB_ACCESS_KEY = os.getenv("VASTDB_ACCESS_KEY")
VASTDB_SECRET_KEY = os.getenv("VASTDB_SECRET_KEY")

# Use NYT BUCKET (DB) for now
VASTDB_NYT_BUCKET=os.getenv("VASTDB_NYT_BUCKET")

schema_name = 'grib2'
table_name = 'grib2'
```

--------------------------------

### Start Spark Session

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/tuning/Spark_Predicate_Pushdown.ipynb

Initializes a Spark session with configurations for Vast DB and Iceberg catalogs.

```python
import socket
import os
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
pd.set_option("max_colwidth", 150)

conf = SparkConf()
conf.setAll([
    ("spark.driver.host", socket.gethostbyname(socket.gethostname())),
    ("spark.sql.execution.arrow.pyspark.enabled", "false"),
    # VASTDB
    ("spark.sql.catalog.ndb", 'spark.sql.catalog.ndb.VastCatalog'),
    ("spark.ndb.endpoint", VASTDB_ENDPOINT),
    ("spark.ndb.data_endpoints", VASTDB_ENDPOINT),
    ("spark.ndb.access_key_id", VASTDB_ACCESS_KEY),
    ("spark.ndb.secret_access_key", VASTDB_SECRET_KEY),
    ("spark.driver.extraClassPath", '/usr/local/spark/jars/spark3-vast-3.4.1-f93839bfa38a/*'),
    ("spark.executor.extraClassPath", '/usr/local/spark/jars/spark3-vast-3.4.1-f93839bfa38a/*'),
    ("spark.sql.extensions", 'ndb.NDBSparkSessionExtension'),
    # ICEBERG
    ("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog"),
```

--------------------------------

### Example Output for Data Insertion

Source: https://vast-data.github.io/data-platform-field-docs/vast_vectordb/quickstart.html

Console output confirming the transaction and data insertion process.

```text
Connecting to VAST...
Starting transaction 65953105481905796...
Using schema: test
Using table: vectors_103
Inserting data...
Data inserted.
Transaction committed.
```

--------------------------------

### Consume Kafka Topics by Prefix

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_kafka/utils.md

Consume messages from all topics that start with a specific prefix. This example consumes from topics starting with 'prod-'.

```bash
VAST_BROKER=172.200.204.1:9092

# Example: Consume from all topics starting with 'prod-'
# --whitelist takes a regular expression, so the prefix is written as 'prod-.*'
docker run --rm jforge/kafka-tools kafka-console-consumer.sh \
  --bootstrap-server $VAST_BROKER \
  --whitelist 'prod-.*' \
  --from-beginning
```

--------------------------------

### Get Min/Max and Max Rows

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/ingestion/python_sdk_netcdf_import.html

Analyzes a specified column in a VAST table to find its minimum and maximum values, and retrieves the row(s) containing the maximum value. Requires `ibis-framework` to be installed for fetching max rows.

```python
import logging

import pandas as pd

# Assumed setup, not shown in the original snippet: the ibis placeholder `_`
# and an IBIS_AVAILABLE flag that records whether ibis-framework is installed.
try:
    from ibis import _
    IBIS_AVAILABLE = True
except ImportError:
    IBIS_AVAILABLE = False


def get_stats_and_max_rows(session, bucket_name, schema_name, table_name, column_name):
    """
    Connects to VAST, finds min/max of a column, and retrieves row(s) with the max value.
    Returns: min_val, max_val, record_count, max_rows_df (pandas DataFrame, empty if no rows were fetched)
    """
    min_val = None
    max_val = None
    record_count = 0
    max_rows_df = pd.DataFrame()  # Initialize empty DataFrame

    if not IBIS_AVAILABLE:
        logging.error("Cannot fetch max rows because `ibis-framework` is not installed.")
        # Still attempt to get min/max if possible without ibis for filtering
        # You might want to return early or handle this differently
        # For now, we'll proceed to get min/max but won't query for max rows.
        pass

    logging.info(f"Starting analysis for '{column_name}' in {schema_name}.{table_name}...")

    try:
        # Use a single transaction for both steps if possible
        with session.transaction() as tx:
            try:
                table = tx.bucket(bucket_name).schema(schema_name).table(table_name)
                logging.info(f"Accessed table {schema_name}.{table_name}")

                # --- Step 1: Find Min/Max Value ---
                logging.info(f"Executing select for column: {column_name} to find min/max")
                reader = table.select(columns=[column_name])
                logging.info("Reading data into PyArrow table...")
                arrow_table = reader.read_all()
                record_count = arrow_table.num_rows
                logging.info(f"Read {record_count} records for min/max calculation.")

                if record_count > 0:
                    logging.info("Converting to Pandas DataFrame for min/max...")
                    df_col = arrow_table.to_pandas()
                    if column_name in df_col.columns and pd.api.types.is_numeric_dtype(df_col[column_name]):
                        # Drop NaNs before calculating min/max if they shouldn't be considered
                        valid_series = df_col[column_name].dropna()
                        if not valid_series.empty:
                            min_val = valid_series.min()
                            max_val = valid_series.max()
                            logging.info(f"Calculated MIN={min_val}, MAX={max_val}.")
                        else:
                            logging.warning(f"Column '{column_name}' contains only NaN/Null values after dropping.")
                    else:
                        logging.warning(f"Column '{column_name}' not found or is not numeric in the result.")
                    del df_col  # Free memory
                    del arrow_table  # Free memory
                else:
                    logging.warning("Table appears to be empty or column has no data.")
                    return min_val, max_val, record_count, max_rows_df  # Return early

                # --- Step 2: Find Row(s) with Max Value ---
                if max_val is not None and IBIS_AVAILABLE:
                    logging.info(f"Executing select for full rows where {column_name} == {max_val}")
                    try:
                        # Create the predicate using ibis placeholder
                        predicate = (_[column_name] == max_val)
                        # Select all columns (*) where the predicate matches
                        max_rows_reader = table.select(predicate=predicate)
                        logging.info("Reading max value rows into PyArrow table...")
                        max_rows_arrow_table = max_rows_reader.read_all()
                        if max_rows_arrow_table.num_rows > 0:
                            logging.info(f"Found {max_rows_arrow_table.num_rows} row(s) with the max value.")
                            logging.info("Converting max rows to Pandas DataFrame...")
                            max_rows_df = max_rows_arrow_table.to_pandas()
                        else:
                            # This might happen with floating point inaccuracies, though less likely with ==
                            logging.warning(f"Could not find rows matching the calculated max value {max_val}. Potentially a float precision issue?")
                    except Exception as e:
                        # Handler for the Step 2 query; the matching except clause is
                        # missing from the original snippet, so this one is an assumption.
                        logging.error(f"An error occurred while fetching max value rows: {e}")
            except Exception as e:
                logging.error(f"An error occurred during table access or data processing: {e}")
                # Depending on requirements, you might want to re-raise or return partial results
                return None, None, 0, pd.DataFrame()
    except Exception as e:
        logging.error(f"An error occurred during transaction management: {e}")
        return None, None, 0, pd.DataFrame()

    return min_val, max_val, record_count, max_rows_df
```

--------------------------------

### Define base_setup function

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/admin/vastdb_py_setup.ipynb

Configures a Vast Database by creating a VIP pool, user, S3 access keys, view, and schema.

```python
from vastpy import VASTClient


def base_setup(address, username, password, config):
    client = VASTClient(user=username, password=password, address=address)
    database_owner = config['user']['name']

    try:
        client.vippools.post(**config['vippool'])
        print(f"VIP pool created: {config['vippool']}")
    except Exception as e:
        print(e)
        raise e

    try:
        client.users.post(**config['user'])
        print(f"User created: {config['user']}")
    except Exception as e:
        print(e)
        raise e

    user = client.users.get(name=database_owner)[0]
    user_id = user['id']

    try:
        response = client.users[user_id].access_keys.post(id=user_id)
        access_key = response['access_key']
        secret_key = response['secret_key']
        print(f"Key Generated for {database_owner}")
    except Exception as e:
        print(e)
        raise e

    try:
        response = client.views.post(
            path=config['viewpath']['path'],
            protocols=config['viewpath']['protocols'],
            create_dir=config['viewpath']['create_dir'],
            bucket=config['viewpath']['bucket'],
            bucket_owner=config['viewpath']['bucket_owner'],
            policy_id=config['viewpath']['policy_id']
        )
        print(f"View Created: {config['viewpath']['path']}")
    except Exception as e:
        print(e)
        raise e

    try:
        response = client.schemas.post(
            name=config['database']['schema'],
            database_name=config['database']['name']
        )
        print("Schema Created")
    except Exception as e:
        print(e)
        raise e

    print(access_key, secret_key)
```

--------------------------------

### Install Confluent Kafka Client

Source: https://vast-data.github.io/data-platform-field-docs/vast_kafka/101/VAST-EventBroker-101.html

Installs the confluent-kafka package quietly. A kernel restart may be required after installation.

```bash
%pip install --quiet -U confluent-kafka
```

```text
Note: you may need to restart the kernel to use updated packages.
```

--------------------------------

### Setup Directory Structure

Source: https://vast-data.github.io/data-platform-field-docs/vast_kafka/zeek_to_vast_kafka.html

Commands to create the necessary directory structure for Zeek configuration, PCAP files, and logs.

```bash
mkdir -p zeek-config pcap-files zeek-logs
```

--------------------------------

### Install VastDB and Dependencies

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_vectordb/quickstart.ipynb

Installs the VastDB Python package along with pyarrow, adbc_driver_manager, and numpy. Ensure these are installed before proceeding.

```bash
!pip install vastdb pyarrow adbc_driver_manager numpy
```

--------------------------------

### Install Python Dependencies

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/admin/restore.html

This command installs the Python libraries needed for interacting with VastDB, DuckDB, and data manipulation. Ensure these are installed before running the restore script. The `logging` module ships with Python's standard library and needs no separate installation.

```bash
pip install vastdb duckdb pandas pydantic
```

--------------------------------

### Setup Dremio Docker Environment

Source: https://vast-data.github.io/data-platform-field-docs/vast_database/query/dremio_quickstart/howto.html

Creates a directory, downloads the Dremio connector JAR, and defines a Dockerfile for building a Dremio image with Vast Data connector support. Ensure the placeholder URL for the JAR is replaced with a public URL.

```bash
mkdir dremio_docker
cd dremio_docker

# TODO: replace this with public URL
wget https://artifactory.vastdata.com/ui/repos/tree/General/vast-custom/tabular/connectors/dre[%E2%80%A6]3/20240901/dremio-vast-data-connector-3.0.3-SNAPSHOT.jar

cat <<'EOF' > Dockerfile
FROM openjdk:11-jdk as run
MAINTAINER Dremio
LABEL org.label-schema.name='dremio/dremio-oss'
LABEL org.label-schema.description='Dremio OSS.'
ARG DOWNLOAD_URL=https://download.dremio.com/community-server/24.0.0-202302100528110223-3a169b7c/dremio-community-24.0.0-202302100528110223-3a169b7c.tar.gz

RUN mkdir -p /opt/dremio \
  && mkdir -p /var/lib/dremio \
  && mkdir -p /var/run/dremio \
  && mkdir -p /var/log/dremio \
  && mkdir -p /opt/dremio/data \
  && groupadd --system dremio \
  && useradd --base-dir /var/lib/dremio --system --gid dremio dremio \
  && chown -R dremio:dremio /opt/dremio/data \
  && chown -R dremio:dremio /var/run/dremio \
  && chown -R dremio:dremio /var/log/dremio \
  && chown -R dremio:dremio /var/lib/dremio \
  && wget -q "${DOWNLOAD_URL}" -O dremio.tar.gz \
  && tar vxfz dremio.tar.gz -C /opt/dremio --strip-components=1 \
  && rm -rf dremio.tar.gz

ADD dremio-vast-data-connector-3.0.3-SNAPSHOT.jar /opt/dremio/jars/

EXPOSE 9047/tcp
EXPOSE 31010/tcp
EXPOSE 45678/tcp

USER dremio
WORKDIR /opt/dremio
ENV DREMIO_HOME /opt/dremio
ENV DREMIO_PID_DIR /var/run/dremio
ENV DREMIO_GC_LOGS_ENABLED="no"
ENV DREMIO_LOG_DIR="/var/log/dremio"
ENV SERVER_GC_OPTS="-XX:+PrintGCDetails"
ENTRYPOINT ["bin/dremio", "start-fg"]
EOF
```

--------------------------------

### Install Dependencies

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/ingestion/python_sdk_grib2_import.ipynb

Installs necessary Python libraries for data processing.

```bash
! pip install --quiet cfgrib xarray pandas pyarrow
```

--------------------------------

### Start the Trino Docker container

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/trino/quickstart.md

Run the VAST-provided Trino container, mounting the configuration file created in the previous step.
```bash
docker run \
  --name trino \
  -p 8080:8080 -d \
  -v ./vast.properties:/etc/trino/catalog/vast.properties:ro \
  --platform linux/amd64 \
  vastdataorg/trino-vast:429
```

--------------------------------

### Install VastDB Dependencies

Source: https://vast-data.github.io/data-platform-field-docs/_sources/vast_database/ingestion/python_sdk_grib2_import.ipynb

Installs the required Python packages for interacting with VastDB and data processing.

```bash
! pip install --quiet --upgrade vastdb numpy pyarrow pandas
```
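
After an install step like the one above, it can help to confirm that the packages are importable before running the rest of a notebook. The following is a minimal sketch and not part of the original notebook: the helper name `missing_packages` is hypothetical, and it relies only on the standard-library `importlib.util.find_spec`, which locates a module without importing it.

```python
import importlib.util


def missing_packages(names):
    """Return the module names from `names` that cannot be located for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Module names matching the packages installed above (assumed import names).
required = ["vastdb", "numpy", "pyarrow", "pandas"]

missing = missing_packages(required)
if missing:
    print(f"Missing packages: {', '.join(missing)}")
else:
    print("All required packages are importable.")
```

If any names are reported as missing, rerunning the install cell and restarting the kernel is usually the first thing to try.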