### Install Dependencies and Start Development Server Source: https://github.com/apache/paimon/blob/master/docs/README.md Installs project dependencies using Yarn and starts the Docusaurus development server for hot-reloading. ```bash # Install dependencies yarn install # Start development server (with hot reload) yarn start ``` -------------------------------- ### Quick Start: Running HDFS E2E Tests with Docker Source: https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/tests/e2e/hdfs/README.md This snippet outlines the steps to bring up an HDFS cluster using Docker Compose, install the Paimon package with HDFS extras, run the end-to-end tests, and tear down the cluster. ```sh # 1. Bring up a single-NameNode + single-DataNode cluster. docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml up -d # Wait ~30s for the cluster to become healthy; check with: docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml ps # 2. Install the package with the hdfs extra. pip install -e '.[hdfs]' # 3. Run the tests. PYPAIMON_HDFS_E2E_URL=hdfs://localhost:8020 \ python -m pytest pypaimon/tests/e2e/hdfs/ -v # 4. Teardown. docker compose -f pypaimon/tests/e2e/hdfs/docker-compose.yml down -v ``` -------------------------------- ### Install Daft, Paimon, and Ray Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/daft.md Install `pypaimon` with both Daft and Ray extras for executing Daft plans on Ray. ```bash pip install 'pypaimon[daft,ray]' ``` -------------------------------- ### Example: Synchronize Initial Set of Tables Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/mysql-cdc.md This example synchronizes an initial set of tables ('product', 'user', 'address') from a MySQL database. It configures the warehouse, database, MySQL connection, catalog, and table sink properties. 
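The `--including_tables` value in the command that follows is a regular expression, so it can help to preview which source tables it selects before submitting the job. A minimal Python sketch, assuming the pattern has to match the whole table name (as Java's `Matcher.matches()` does); the table list and the `matching_tables` helper are hypothetical:

```python
import re


def matching_tables(pattern, table_names):
    """Return the names matched in full by `pattern` (like Java's Matcher.matches())."""
    compiled = re.compile(pattern)
    return [name for name in table_names if compiled.fullmatch(name)]


# Hypothetical list of tables present in source_db.
source_tables = ["product", "user", "address", "orders", "user_log"]
print(matching_tables("product|user|address", source_tables))
```

Full-string matching is why `user_log` is not selected even though it starts with `user`; a substring search (`re.search`) would select it.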
```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ mysql_sync_database \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --mysql_conf hostname=127.0.0.1 \ --mysql_conf username=root \ --mysql_conf password=123456 \ --mysql_conf database-name=source_db \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --table_conf changelog-producer=input \ --table_conf sink.parallelism=4 \ --including_tables 'product|user|address' ``` -------------------------------- ### Example: Compact Database with Flink Action Jar Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/dedicated-compaction.mdx This example demonstrates how to compact a specific database using the Flink Action Jar, including S3 catalog configurations. ```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ compact_database \ --warehouse s3:///path/to/warehouse \ --including_databases test_db \ --catalog_conf s3.endpoint=https://****.com \ --catalog_conf s3.access-key=***** \ --catalog_conf s3.secret-key=***** ``` -------------------------------- ### Install Daft with Paimon Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/daft.md Install the `pypaimon` library with Daft support using pip. ```bash pip install 'pypaimon[daft]' ``` -------------------------------- ### Compact Historical Partitions for Databases (Flink Action Jar - Example) Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/dedicated-compaction.mdx Example command to compact historical partitions for tables in the 'test_db' database. It specifies the warehouse path, partition idle time, and provides S3 catalog configurations. 
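To reason about which partitions `--partition_idle_time 1d` in the command that follows would treat as historical, here is a small sketch, assuming a partition qualifies once it has gone at least that long without new writes. The helper functions, the accepted unit suffixes, the partition names, and the timestamps are all hypothetical; Paimon's own duration grammar is not reproduced here.

```python
import re
from datetime import datetime, timedelta

# Hypothetical unit suffixes for illustration only.
_UNITS = {"s": "seconds", "min": "minutes", "h": "hours", "d": "days"}


def parse_duration(text):
    """Parse strings such as '1d', '1 d' or '30min' into a timedelta."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-z]+)\s*", text)
    if not match or match.group(2) not in _UNITS:
        raise ValueError(f"unsupported duration: {text!r}")
    return timedelta(**{_UNITS[match.group(2)]: int(match.group(1))})


def idle_partitions(last_modified, idle_time, now):
    """Return partitions whose last write is at least `idle_time` before `now`."""
    idle = parse_duration(idle_time)
    return sorted(p for p, ts in last_modified.items() if now - ts >= idle)


now = datetime(2022, 11, 28, 12, 0)
last_modified = {  # hypothetical last-write time per partition
    "dt=20221126": datetime(2022, 11, 26, 23, 0),
    "dt=20221127": datetime(2022, 11, 27, 18, 0),
    "dt=20221128": datetime(2022, 11, 28, 11, 0),
}
print(idle_partitions(last_modified, "1d", now))  # ['dt=20221126']
```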
```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ compact_database \ --warehouse s3:///path/to/warehouse \ --including_databases test_db \ --partition_idle_time 1d \ --catalog_conf s3.endpoint=https://****.com \ --catalog_conf s3.access-key=***** \ --catalog_conf s3.secret-key=***** ``` -------------------------------- ### Query Data Using BTree Index (Python SDK) Source: https://github.com/apache/paimon/blob/master/docs/docs/multimodal-table/global-index/btree.mdx This Python SDK example demonstrates how to build a filter using PredicateBuilder for an IN clause and apply it to a table read. Ensure you have the pypaimon library installed. ```python from pypaimon.common.predicate_builder import PredicateBuilder table = catalog.get_table("db.my_table") read_builder = table.new_read_builder() read_builder = read_builder.with_filter( PredicateBuilder(table.fields) .is_in("name", ["a200", "a300"]) ) scan = read_builder.new_scan() read = read_builder.new_read() pa_table = read.to_arrow(scan.plan().splits()) print(pa_table) ``` -------------------------------- ### Example Kafka Sync Table Command Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/kafka-cdc.mdx An example of the kafka_sync_table command with specific configurations for warehouse, database, table, partitioning, primary keys, computed columns, Kafka source, catalog, and table sink. 
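With `value.format=canal-json` in the command that follows, each Kafka record is expected to be a Canal-style JSON change event. For publishing a test record, a message could be built as below. This is a hypothetical, minimal shape: Canal typically renders column values as strings, and Paimon's parser may expect further fields (for example `mysqlType`) that are omitted here. The `canal_insert_message` helper and all sample values are assumptions.

```python
import json


def canal_insert_message(database, table, rows, pk_names):
    """Build a hypothetical canal-json style INSERT event as a JSON string."""
    message = {
        "data": rows,            # one dict per inserted row
        "database": database,
        "table": table,
        "pkNames": pk_names,     # primary-key column names
        "isDdl": False,
        "type": "INSERT",
        "ts": 1700000000000,     # illustrative event time in milliseconds
    }
    return json.dumps(message)


msg = canal_insert_message(
    "source_db",
    "orders",
    [{"pt": "1", "uid": "42", "age": "2023-05-17 10:00:00"}],
    ["pt", "uid"],
)
print(msg)
```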
```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ kafka_sync_table \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --table test_table \ --partition_keys pt \ --primary_keys pt,uid \ --computed_column '_year=year(age)' \ --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \ --kafka_conf topic=order \ --kafka_conf properties.group.id=123456 \ --kafka_conf value.format=canal-json \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --table_conf changelog-producer=input \ --table_conf sink.parallelism=4 ``` -------------------------------- ### Install Paimon Connector for Trino Source: https://github.com/apache/paimon/blob/master/docs/docs/ecosystem/trino.md Installs the Paimon connector by extracting the plugin archive to the Trino plugin directory. Ensure you replace `` with the correct version. ```bash tar -zxf paimon-trino--@@VERSION@@-plugin.tar.gz -C ${TRINO_HOME}/plugin ``` -------------------------------- ### Install PyPaimon Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/index.md Install the PyPaimon package using pip. This command installs the core library. ```shell pip install pypaimon ``` -------------------------------- ### Example: Synchronize Entire MongoDB Database Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/mongo-cdc.md A practical example demonstrating the synchronization of an entire MongoDB database to Paimon. It includes essential configurations for warehouse, database, MongoDB connection details, Paimon catalog, and table sink properties. 
```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ mongodb_sync_database \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --mongodb_conf hosts=127.0.0.1:27017 \ --mongodb_conf username=root \ --mongodb_conf password=123456 \ --mongodb_conf database=source_db \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --table_conf changelog-producer=input \ --table_conf sink.parallelism=4 ``` -------------------------------- ### Get Help for compact_database Action Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/dedicated-compaction.mdx Run the compact_database action with the --help flag to view available options and usage instructions. ```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ compact_database --help ``` -------------------------------- ### Pulsar Table Synchronization Example Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/pulsar-cdc.md An example of running the pulsar_sync_table action with specific configurations for warehouse, database, table, partition keys, computed columns, Pulsar source configurations, catalog configurations, and table sink configurations. 
```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ pulsar_sync_table \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --table test_table \ --partition_keys pt \ --primary_keys pt,uid \ --computed_column '_year=year(age)' \ --pulsar_conf topic=order \ --pulsar_conf value.format=canal-json \ --pulsar_conf pulsar.client.serviceUrl=pulsar://127.0.0.1:6650 \ --pulsar_conf pulsar.admin.adminUrl=http://127.0.0.1:8080 \ --pulsar_conf pulsar.consumer.subscriptionName=paimon-tests \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --table_conf changelog-producer=input \ --table_conf sink.parallelism=4 ``` -------------------------------- ### Example: Synchronize Entire MySQL Database Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/mysql-cdc.md This example demonstrates how to synchronize an entire MySQL database named 'source_db' to a Paimon database, configuring connection details, catalog, and table sink properties. ```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ mysql_sync_database \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --mysql_conf hostname=127.0.0.1 \ --mysql_conf username=root \ --mysql_conf password=123456 \ --mysql_conf database-name=source_db \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --table_conf changelog-producer=input \ --table_conf sink.parallelism=4 ``` -------------------------------- ### Example: Trigger Full Compaction with Flink SQL Source: https://github.com/apache/paimon/blob/master/docs/docs/learn-paimon/understand-files.mdx A simplified Flink SQL example to trigger full compaction on table 'T'. 
```sql CALL sys.compact('T'); ``` -------------------------------- ### Example: Synchronize Specific Tables and Recover from Savepoint Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/mysql-cdc.md This example shows how to synchronize specific tables ('product', 'user', 'address', 'order', 'custom') from a MySQL database. It also illustrates recovering a job from a savepoint to add new tables to synchronization without restarting the entire process. ```bash /bin/flink run \ --fromSavepoint savepointPath \ /path/to/paimon-flink-action-@@VERSION@@.jar \ mysql_sync_database \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --mysql_conf hostname=127.0.0.1 \ --mysql_conf username=root \ --mysql_conf password=123456 \ --mysql_conf database-name=source_db \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --including_tables 'product|user|address|order|custom' ``` -------------------------------- ### Install PyPaimon with Mosaic Format Support Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/index.md Install PyPaimon with optional support for the Mosaic format, which is optimized for wide tables. ```shell pip install pypaimon[mosaic] ``` -------------------------------- ### Install PyPaimon with HDFS Support Source: https://github.com/apache/paimon/blob/master/paimon-python/README.md Installs PyPaimon with the optional HDFS extra, enabling HDFS access without a local Hadoop installation. ```commandline pip install 'pypaimon[hdfs]' ``` -------------------------------- ### Install PyPaimon with SQL Support Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/sql.md Install PyPaimon along with the necessary dependencies for SQL query support. This command installs pypaimon-rust, which bundles DataFusion. 
```shell pip install pypaimon[sql] ``` -------------------------------- ### Example: Migrate Hive Table using Flink Action Source: https://github.com/apache/paimon/blob/master/docs/docs/migration/migration-from-hive.mdx An example demonstrating the Flink Action command for migrating a Hive table. This shows a concrete usage of the command. ```bash /flink run ./paimon-flink-action-@@VERSION@@.jar migrate_table \ --warehouse /path/to/warehouse \ --catalog_conf uri=thrift://localhost:9083 \ --catalog_conf metastore=hive \ --source_type hive \ --database default ``` -------------------------------- ### Filesystem Catalog Configuration Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/cli.md Example configuration for a PyPaimon filesystem catalog. Specify 'filesystem' as the metastore and provide the path to your warehouse. ```yaml metastore: filesystem warehouse: /path/to/warehouse ``` -------------------------------- ### Start Spark SQL with Paimon OBS Catalog Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/filesystems.mdx Configure and start Spark SQL with Paimon catalog integration for OBS. Place the required Paimon JARs in Spark's jars directory. 
```shell spark-sql \ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.paimon.warehouse=obs:/// \ --conf spark.sql.catalog.paimon.fs.obs.endpoint=obs-endpoint-hostname \ --conf spark.sql.catalog.paimon.fs.obs.access.key=xxx \ --conf spark.sql.catalog.paimon.fs.obs.secret.key=yyy ``` -------------------------------- ### Example: Compact Specific Partitions with Minor Strategy Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/dedicated-compaction.mdx An example of submitting a compaction job using Flink Action Jar, specifying multiple partitions, table configuration for parallelism, a minor compaction strategy, and S3 catalog configurations. ```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ compact \ --warehouse s3:///path/to/warehouse \ --database test_db \ --table test_table \ --partition dt=20221126,hh=08 \ --partition dt=20221127,hh=09 \ --table_conf sink.parallelism=10 \ --compact_strategy minor \ --catalog_conf s3.endpoint=https://****.com \ --catalog_conf s3.access-key=***** \ --catalog_conf s3.secret-key=***** ``` -------------------------------- ### Install PyPaimon with Lance Format Support Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/index.md Install PyPaimon with optional support for the Lance format, optimized for ML and vector search. ```shell pip install pypaimon[lance] ``` -------------------------------- ### Start Flink SQL Client Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/quick-start.mdx Launch the Flink SQL client to interact with the Flink cluster via SQL. 
```bash /bin/sql-client.sh ``` -------------------------------- ### Example: Synchronize Multiple PostgreSQL Tables Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/postgres-cdc.md This example demonstrates synchronizing data from multiple PostgreSQL tables (source_table1, source_table2) into a single Paimon table (test_table). It specifies partition keys, primary keys, a computed column, detailed PostgreSQL connection configurations, Hive catalog configuration, and Paimon table sink configurations. ```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ postgres_sync_table \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --table test_table \ --partition_keys pt \ --primary_keys pt,uid \ --computed_column '_year=year(age)' \ --postgres_conf hostname=127.0.0.1 \ --postgres_conf username=root \ --postgres_conf password=123456 \ --postgres_conf database-name='source_db' \ --postgres_conf schema-name='public' \ --postgres_conf table-name='source_table1|source_table2' \ --postgres_conf slot.name='paimon_cdc' \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --table_conf changelog-producer=input \ --table_conf sink.parallelism=4 ``` -------------------------------- ### Compact Historical Partitions for Databases (Flink SQL - Example) Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/dedicated-compaction.mdx Example Flink SQL call to compact historical partitions for tables within a specific database ('test_db') using a 'combined' mode and an idle time of '1 day'. ```sql -- history partition compact table CALL sys.compact_database( includingDatabases => 'test_db', mode => 'combined', partition_idle_time => '1 d' ); ``` -------------------------------- ### Build PyPaimon from Source Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/index.md Build the source package for PyPaimon. 
After building, install the package from the generated tar.gz file. ```commandline python3 setup.py sdist ``` ```commandline pip3 install dist/*.tar.gz ``` -------------------------------- ### Configuring File Index Columns Source: https://github.com/apache/paimon/blob/master/docs/docs/primary-key-table/query-performance.md Configures which columns to use for file indexing. This example shows configuration for Bloom Filter and Bitmap indexes. ```sql ALTER TABLE table_name SET ( 'file-index.bloom-filter.columns' = 'col1,col2', 'file-index.bitmap.columns' = 'col3' ); ``` -------------------------------- ### Start CPU Metrics Collector (Bash) Source: https://github.com/apache/paimon/blob/master/paimon-benchmark/paimon-cluster-benchmark/README.md Script to activate the CPU metrics collector on worker nodes. Must be run from the master node. ```bash paimon-benchmark/bin/setup_cluster.sh ``` -------------------------------- ### Multi-Catalog Query Example Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/sql.md Register multiple catalogs with different configurations and perform cross-catalog queries, such as joining tables from different catalogs. ```python ctx = SQLContext() ctx.register_catalog("a", {"warehouse": "/path/to/warehouse_a"}) ctx.register_catalog("b", { "metastore": "rest", "uri": "http://localhost:8080", "warehouse": "warehouse_b", }) ctx.set_current_catalog("a") ctx.set_current_database("default") # Cross-catalog join batches = ctx.sql(""" SELECT a_users.name, b_orders.amount FROM a.default.users AS a_users JOIN b.default.orders AS b_orders ON a_users.id = b_orders.user_id """) ``` -------------------------------- ### Create Streaming Source Table Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/sql-lookup.mdx Set up a temporary streaming table (for example, one backed by Kafka) that includes a processing-time attribute for the join.
```sql CREATE TEMPORARY TABLE orders ( order_id INT, total INT, customer_id INT, proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = '...', -- Replace with your Kafka topic 'properties.bootstrap.servers' = '...', -- Replace with your Kafka bootstrap servers 'format' = 'csv' ... -- Other Kafka connector options ); ``` -------------------------------- ### Create Table with Partitioning and Bucketing Source: https://github.com/apache/paimon/blob/master/docs/docs/learn-paimon/understand-files.mdx Example of creating a table with specified partitioning and bucket configuration. Adjust 'bucket' to control the number of buckets in each partition. ```sql CREATE TABLE MyTable ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING, PRIMARY KEY (dt, hh, user_id) NOT ENFORCED ) PARTITIONED BY (dt, hh) WITH ( 'bucket' = '10' ); ``` -------------------------------- ### Build and Serve Production Version Source: https://github.com/apache/paimon/blob/master/docs/README.md Builds the production-ready static site and serves it locally for preview. ```bash # Production build yarn build # Preview the production build locally yarn serve ``` -------------------------------- ### Synchronize Multiple Pulsar Topics to Paimon Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/pulsar-cdc.md Example demonstrating synchronization of multiple Pulsar topics ('order', 'logistic_order', 'user') into a single Paimon database. This setup is useful for consolidating data from various sources.
```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ pulsar_sync_database \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --pulsar_conf topic=order,logistic_order,user \ --pulsar_conf value.format=canal-json \ --pulsar_conf pulsar.client.serviceUrl=pulsar://127.0.0.1:6650 \ --pulsar_conf pulsar.admin.adminUrl=http://127.0.0.1:8080 \ --pulsar_conf pulsar.consumer.subscriptionName=paimon-tests \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --table_conf changelog-producer=input \ --table_conf sink.parallelism=4 ``` -------------------------------- ### Force Start Flink Job for Drop Partition Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/action-jars.md Use the '--force_start_flink_job' flag to ensure actions like 'drop_partition' are submitted as Flink jobs, unifying the experience regardless of the action's default behavior. This example shows dropping partitions with the flag enabled. ```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ drop_partition \ --warehouse \ --database \ --table \ [--partition [--partition ...]] \ [--catalog_conf [--catalog_conf ...]] \ --force_start_flink_job true ``` -------------------------------- ### Explain Scan Plan CLI with Filters Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/python-api.mdx Shows how to use the Paimon CLI to explain a scan plan with pushed-down filters, projections, limits, and verbose output for each split. ```bash paimon -c paimon.yaml table explain default.events \ --where "dt = '2026-05-16' AND id = 7" \ --select dt,id,val \ --limit 100 \ --verbose ``` -------------------------------- ### Install PyJindoSDK Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/pyjindosdk-support.md Install the pyjindosdk package using pip. PyPaimon will automatically use it for OSS access after installation.
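After running the install command that follows, a quick standard-library check can confirm that the `pyjindosdk` distribution is visible to the interpreter that runs PyPaimon. This sketch only reads installed package metadata through `importlib.metadata`; it does not verify that PyPaimon actually routes OSS access through the SDK. The `installed_version` helper is hypothetical.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of `distribution`, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


print("pyjindosdk:", installed_version("pyjindosdk") or "not installed")
```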
```shell pip install pyjindosdk ``` -------------------------------- ### Example of Rescaling Bucket Numbers for Different Partitions Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/rescale-bucket.md Demonstrates how to set different bucket numbers for different partitions of a table. This allows for granular control over data distribution across partitions. ```sql ALTER TABLE my_table SET ('bucket' = '4'); INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01') SELECT * FROM ...; ALTER TABLE my_table SET ('bucket' = '8'); INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02') SELECT * FROM ...; ``` -------------------------------- ### Explain Scan Plan CLI Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/python-api.mdx Demonstrates how to use the Paimon CLI to explain a table's scan plan for a whole-table scan. ```bash paimon -c paimon.yaml table explain default.events ``` -------------------------------- ### Write and Read Data with Sharding Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/python-api.mdx An example demonstrating how to create a catalog, define a schema, create a table, write data in batches, and then read specific shards. This illustrates the complete lifecycle of data management with sharding. 
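The example that follows relies on `with_shard(i, n)` dividing the work into `n` disjoint shards that together cover the whole table. A minimal sketch of that invariant, using a hypothetical round-robin assignment over split names; pypaimon's real assignment of data to shards may differ and is not reproduced here.

```python
def shard(items, index, total):
    """Hypothetical round-robin sharding: keep items whose position % total == index."""
    if not 0 <= index < total:
        raise ValueError("index must be in [0, total)")
    return [item for pos, item in enumerate(items) if pos % total == index]


splits = [f"split-{i}" for i in range(7)]
shards = [shard(splits, i, 3) for i in range(3)]
print(shards)

# Every split lands in exactly one shard, so the union equals the full list.
assert sorted(s for part in shards for s in part) == sorted(splits)
```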
```python import pyarrow as pa from pypaimon import CatalogFactory, Schema # Create catalog catalog_options = {'warehouse': 'file:///path/to/warehouse'} catalog = CatalogFactory.create(catalog_options) catalog.create_database("default", False) # Define schema pa_schema = pa.schema([ ('user_id', pa.int64()), ('item_id', pa.int64()), ('behavior', pa.string()), ('dt', pa.string()), ]) # Create table and write data schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt']) catalog.create_table('default.test_table', schema, False) table = catalog.get_table('default.test_table') # Write data in two batches write_builder = table.new_batch_write_builder() # First write table_write = write_builder.new_write() table_commit = write_builder.new_commit() data1 = { 'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], 'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014], 'behavior': ['a', 'b', 'c', None, 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'], 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'], } pa_table = pa.Table.from_pydict(data1, schema=pa_schema) table_write.write_arrow(pa_table) table_commit.commit(table_write.prepare_commit()) table_write.close() table_commit.close() # Second write table_write = write_builder.new_write() table_commit = write_builder.new_commit() data2 = { 'user_id': [5, 6, 7, 8, 18], 'item_id': [1005, 1006, 1007, 1008, 1018], 'behavior': ['e', 'f', 'g', 'h', 'z'], 'dt': ['p2', 'p1', 'p2', 'p2', 'p1'], } pa_table = pa.Table.from_pydict(data2, schema=pa_schema) table_write.write_arrow(pa_table) table_commit.commit(table_write.prepare_commit()) table_write.close() table_commit.close() # Read specific shard read_builder = table.new_read_builder() table_read = read_builder.new_read() # Read shard 2 out of 3 total shards splits = read_builder.new_scan().with_shard(2, 3).plan().splits() shard_data = table_read.to_arrow(splits) # Verify shard 
distribution by reading all shards splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits() splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits() splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits() # Combined, all shards should equal the full table read all_shards_data = pa.concat_tables([ table_read.to_arrow(splits1), table_read.to_arrow(splits2), table_read.to_arrow(splits3), ]) full_table_data = table_read.to_arrow(read_builder.new_scan().plan().splits()) ``` -------------------------------- ### Start Flink Local Cluster Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/quick-start.mdx Initiate a local Flink cluster using the provided start script. ```bash /bin/start-cluster.sh ``` -------------------------------- ### Create Table Like Source Table Source: https://github.com/apache/paimon/blob/master/docs/docs/spark/sql-ddl.md Demonstrates creating a new table that inherits the schema and partitioning from an existing source table. This is useful for creating tables with identical structures. ```sql CREATE TABLE target_table LIKE source_table; ``` -------------------------------- ### Hive Table Query Example Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/filesystems.mdx Example SQL queries to interact with tables managed by Paimon via Hive metastore. ```sql SELECT * FROM test_table; SELECT COUNT(1) FROM test_table; ``` -------------------------------- ### Install PyPaimon CLI Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/cli.md Installs the PyPaimon package, which includes the command-line interface. This command makes the `paimon` executable available in your terminal. ```shell pip install pypaimon ``` -------------------------------- ### Create Aliyun OSS-based Paimon Catalog in Doris Source: https://github.com/apache/paimon/blob/master/docs/docs/ecosystem/doris.md This SQL snippet demonstrates creating a Paimon catalog using Aliyun OSS as the storage.
Provide your bucket name and credentials. ```sql CREATE CATALOG `paimon_oss` PROPERTIES ( "type" = "paimon", "warehouse" = "oss://paimon-bucket/paimonoss", "oss.endpoint" = "oss-cn-beijing.aliyuncs.com", "oss.access_key" = "ak", "oss.secret_key" = "sk" ); ``` -------------------------------- ### Create Filesystem Catalog Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/python-api.mdx Create a Paimon catalog with a filesystem warehouse. Options are passed as strings. ```python from pypaimon import CatalogFactory # Note that keys and values are all string catalog_options = { 'warehouse': 'file:///path/to/warehouse' } catalog = CatalogFactory.create(catalog_options) ``` -------------------------------- ### Install PyPaimon with Vortex Format Support Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/index.md Install PyPaimon with optional support for the Vortex format. This requires Python version 3.11 or higher. ```shell pip install pypaimon[vortex] ``` -------------------------------- ### Basic Index Creation and Search Source: https://github.com/apache/paimon/blob/master/paimon-tantivy/paimon-tantivy-jni/README.md Demonstrates creating a Tantivy index writer, adding documents with row IDs and text, committing changes, and then performing a search returning ranked results. 
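To make the `(rowId, score)` result shape of the Java example that follows concrete, here is a toy in-memory inverted index in Python that scores each row by the raw frequency of the query terms. It is an illustration only: Tantivy uses its own tokenizers and BM25 relevance scoring, and `ToyIndex` is hypothetical.

```python
import re
from collections import Counter, defaultdict


class ToyIndex:
    """Hypothetical inverted index: term -> {row_id: term frequency}."""

    def __init__(self):
        self.postings = defaultdict(dict)

    def add_document(self, row_id, text):
        for term, freq in Counter(re.findall(r"\w+", text.lower())).items():
            self.postings[term][row_id] = freq

    def search(self, query, limit):
        """Return up to `limit` (row_id, score) pairs, highest score first."""
        scores = Counter()
        for term in re.findall(r"\w+", query.lower()):
            for row_id, freq in self.postings.get(term, {}).items():
                scores[row_id] += freq
        return scores.most_common(limit)


index = ToyIndex()
index.add_document(1, "Apache Paimon is a streaming data lake platform")
index.add_document(2, "Tantivy is a full-text search engine written in Rust")
index.add_document(3, "Paimon supports real-time data ingestion")
print(index.search("paimon", 10))
```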
```java // Create index and write documents try (TantivyIndexWriter writer = new TantivyIndexWriter("/tmp/my_index")) { writer.addDocument(1L, "Apache Paimon is a streaming data lake platform"); writer.addDocument(2L, "Tantivy is a full-text search engine written in Rust"); writer.addDocument(3L, "Paimon supports real-time data ingestion"); writer.commit(); } // Search — returns (rowId, score) pairs ranked by relevance try (TantivySearcher searcher = new TantivySearcher("/tmp/my_index")) { SearchResult result = searcher.search("paimon", 10); for (int i = 0; i < result.size(); i++) { System.out.println("rowId=" + result.getRowIds()[i] + " score=" + result.getScores()[i]); } } ``` -------------------------------- ### Implement Custom PartitionMarkDoneAction in Java Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/sql-write.mdx Provides a Java example for implementing a custom partition mark done action. This allows for custom logic when a partition is marked as complete. The `markDone` method contains the custom logic, and `close` is for cleanup. ```java package org.apache.paimon; public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction { @Override public void markDone(String partition) { // do something. } @Override public void close() {} } ``` -------------------------------- ### Example: Synchronize Specified MongoDB Tables Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/mongo-cdc.md This example shows how to synchronize specific MongoDB tables into Paimon, optionally resuming from a savepoint. It utilizes the '--including_tables' option to filter which collections are synchronized. 
```bash /bin/flink run \ --fromSavepoint savepointPath \ /path/to/paimon-flink-action-@@VERSION@@.jar \ mongodb_sync_database \ --warehouse hdfs:///path/to/warehouse \ --database test_db \ --mongodb_conf hosts=127.0.0.1:27017 \ --mongodb_conf username=root \ --mongodb_conf password=123456 \ --mongodb_conf database=source_db \ --catalog_conf metastore=hive \ --catalog_conf uri=thrift://hive-metastore:9083 \ --table_conf bucket=4 \ --including_tables 'product|user|address|order|custom' ``` -------------------------------- ### Create Generic Paimon Catalog and Table Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/quick-start.mdx Set up a generic Paimon catalog, potentially using a Hive metastore, and create a 'word_count' table. ```sql CREATE CATALOG my_catalog WITH ( 'type'='paimon-generic', 'hive-conf-dir'='...', 'hadoop-conf-dir'='...' ); USE CATALOG my_catalog; -- create a word count table CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt BIGINT ) WITH ( 'connector'='paimon' ); ``` -------------------------------- ### Start Spark SQL with Paimon S3 Catalog Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/filesystems.mdx Configure and start Spark SQL with a Paimon catalog pointing to an S3 warehouse. Place the necessary Paimon JARs in Spark's jars directory. 
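When the same catalog has to be launched against several environments, generating the `--conf` flags from a dictionary avoids typos in the repeated `spark.sql.catalog.paimon.` prefix. A minimal sketch: the `spark_sql_command` helper is hypothetical, the option keys are the ones used in the command that follows, `<bucket>`/`<path>` are placeholders, and real credentials would normally come from a secret store rather than source code.

```python
import shlex


def spark_sql_command(catalog, conf):
    """Build a spark-sql argv that registers a Paimon catalog named `catalog`."""
    args = [
        "spark-sql",
        "--conf", "spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions",
        "--conf", f"spark.sql.catalog.{catalog}=org.apache.paimon.spark.SparkCatalog",
    ]
    for key, value in conf.items():
        args += ["--conf", f"spark.sql.catalog.{catalog}.{key}={value}"]
    return args


argv = spark_sql_command("paimon", {
    "warehouse": "s3://<bucket>/<path>",  # placeholder, fill in your own
    "s3.endpoint": "your-endpoint-hostname",
    "s3.access-key": "xxx",
    "s3.secret-key": "yyy",
})
print(shlex.join(argv))
```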
```shell spark-sql \ --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.paimon.warehouse=s3:/// \ --conf spark.sql.catalog.paimon.s3.endpoint=your-endpoint-hostname \ --conf spark.sql.catalog.paimon.s3.access-key=xxx \ --conf spark.sql.catalog.paimon.s3.secret-key=yyy ``` -------------------------------- ### Get Help for Flink Action Compaction Source: https://github.com/apache/paimon/blob/master/docs/docs/maintenance/dedicated-compaction.mdx Run the `compact --help` command with the Paimon Flink Action Jar to display all available options and usage instructions for the compaction action. ```bash /bin/flink run \ /path/to/paimon-flink-action-@@VERSION@@.jar \ compact --help ``` -------------------------------- ### Start Query Service with Flink SQL Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/sql-lookup.mdx Use this Flink SQL command to start a query service for a specific Paimon table. This enhances lookup join performance by allowing Flink to fetch data directly from the service. ```sql CALL sys.query_service('database_name.table_name', parallelism); ``` -------------------------------- ### Java Example for RESTApi Usage Source: https://github.com/apache/paimon/blob/master/docs/docs/program-api/rest-api.mdx Demonstrates how to initialize and use the RESTApi class in Java. It shows how to configure options, set authentication tokens (Bear or DLF), and list tables in a database. 
```java
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.RESTApi;

import java.util.List;

import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_ID;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_ACCESS_KEY_SECRET;
import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN;
import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER;
import static org.apache.paimon.rest.RESTCatalogOptions.URI;

public class RESTApiExample {

    public static void main(String[] args) {
        Options options = new Options();
        options.set(URI, ""); // REST catalog server URL
        options.set(WAREHOUSE, "my_instance_name");
        setBearToken(options); // or setDlfToken(options)

        RESTApi api = new RESTApi(options);
        List<String> tables = api.listTables("my_database");
        System.out.println(tables);
    }

    // Bearer token authentication ("bear" token provider)
    private static void setBearToken(Options options) {
        options.set(TOKEN_PROVIDER, "bear");
        options.set(TOKEN, "");
    }

    // DLF access-key authentication
    private static void setDlfToken(Options options) {
        options.set(TOKEN_PROVIDER, "dlf");
        options.set(DLF_ACCESS_KEY_ID, "");
        options.set(DLF_ACCESS_KEY_SECRET, "");
    }
}
```

--------------------------------

### Merge Into - Using Source SQL for Temporary View

Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/action-jars.md

This example uses `--source_sql` to create a catalog and a temporary view, which then serves as the source table of the merge-into operation. This is useful when the source table has to be created on the fly.
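Inside double quotes, a POSIX shell treats backticks as command substitution, so the `` `default` `` in the view definition must either be escaped as `` \` `` or the whole SQL string must be single-quoted. A minimal sketch that builds a shell-safe `--source_sql` argument with Python's standard `shlex.quote`; the helper name `source_sql_arg` is hypothetical:

```python
import shlex


def source_sql_arg(sql):
    """Build a shell-safe --source_sql argument (hypothetical helper)."""
    # shlex.quote wraps the value in single quotes, inside which a POSIX shell
    # performs no command substitution, so backticks survive unchanged.
    # Embedded single quotes (e.g. 'important') are escaped as '"'"'.
    return "--source_sql " + shlex.quote(sql)


view_sql = (
    "CREATE TEMPORARY VIEW test_cat.`default`.S AS "
    "SELECT order_id, price, 'important' FROM important_order"
)
arg = source_sql_arg(view_sql)
print(arg)
```

The printed fragment can replace the double-quoted `--source_sql` value in the `flink run` command; the shell then passes the SQL text to the action exactly as written.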
```bash
./flink run \
    /path/to/paimon-flink-action-@@VERSION@@.jar \
    merge_into \
    --warehouse \
    --database \
    --table T \
    --source_sql "CREATE CATALOG test_cat WITH (...)" \
    --source_sql "CREATE TEMPORARY VIEW test_cat.\`default\`.S AS SELECT order_id, price, 'important' FROM important_order" \
    --source_table test_cat.default.S \
    --on "T.id = S.order_id" \
    --merge_actions not-matched-insert \
    --not_matched_insert_values '*'
```

--------------------------------

### Start Interactive SQL REPL

Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/cli.md

Launches an interactive SQL read-eval-print loop (REPL) for running multiple SQL queries in one session. Command history is saved to `~/.paimon_history`.

```shell
paimon sql
```

```sql
SHOW DATABASES;
USE mydb;
SHOW TABLES;
SELECT count(*) AS cnt FROM users WHERE age > 18;
exit
```

--------------------------------

### List Databases using Catalog API

Source: https://github.com/apache/paimon/blob/master/docs/docs/program-api/catalog-api.md

Lists all available databases with the Catalog API. A `Catalog` instance must be created before calling this method.

```java
import org.apache.paimon.catalog.Catalog;

import java.util.List;

public class ListDatabases {

    public static void main(String[] args) {
        Catalog catalog = CreateCatalog.createFilesystemCatalog();
        List<String> databases = catalog.listDatabases();
    }
}
```

--------------------------------

### Example: Synchronize MySQL Shards Using Regex

Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/mysql-cdc.md

Synchronize data from MySQL tables that match a regular expression across multiple databases into a single Paimon table. This example captures the tables named `source_table` from every database matching the pattern `source_db.+`.
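Before running the action, it can help to check which source databases the pattern actually selects. A minimal sketch in plain Python, assuming the action applies each pattern to the whole name (like Java's `Matcher.matches()`, modelled here with `re.fullmatch`); the candidate names and the `matching_tables` helper are hypothetical:

```python
import re


def matching_tables(candidates, database_pattern, table_pattern):
    """Return (database, table) pairs selected by both patterns (full match)."""
    db_re = re.compile(database_pattern)
    table_re = re.compile(table_pattern)
    return [
        (db, tbl)
        for db, tbl in candidates
        if db_re.fullmatch(db) and table_re.fullmatch(tbl)
    ]


candidates = [
    ("source_db1", "source_table"),
    ("source_db2", "source_table"),
    ("source_db", "source_table"),   # '.+' needs at least one more character
    ("source_db1", "other_table"),
]
print(matching_tables(candidates, "source_db.+", "source_table"))
# [('source_db1', 'source_table'), ('source_db2', 'source_table')]
```

Because `.` matches any character, a database such as `source_db_archive` would be captured as well; a tighter pattern like `source_db[0-9]+` avoids that.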
```bash
/bin/flink run \
    /path/to/paimon-flink-action-@@VERSION@@.jar \
    mysql_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --primary_keys pt,uid \
    --computed_column '_year=year(age)' \
    --mysql_conf hostname=127.0.0.1 \
    --mysql_conf username=root \
    --mysql_conf password=123456 \
    --mysql_conf database-name='source_db.+' \
    --mysql_conf table-name='source_table' \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
```

--------------------------------

### Example: Synchronize Multiple MySQL Tables

Source: https://github.com/apache/paimon/blob/master/docs/docs/cdc-ingestion/mysql-cdc.md

Synchronize data from several MySQL tables into a single Paimon table. This example pulls data from `source_table1` and `source_table2` in the `source_db` database and applies partition keys, primary keys, and a computed column.

```bash
/bin/flink run \
    /path/to/paimon-flink-action-@@VERSION@@.jar \
    mysql_sync_table \
    --warehouse hdfs:///path/to/warehouse \
    --database test_db \
    --table test_table \
    --partition_keys pt \
    --primary_keys pt,uid \
    --computed_column '_year=year(age)' \
    --mysql_conf hostname=127.0.0.1 \
    --mysql_conf username=root \
    --mysql_conf password=123456 \
    --mysql_conf database-name='source_db' \
    --mysql_conf table-name='source_table1|source_table2' \
    --catalog_conf metastore=hive \
    --catalog_conf uri=thrift://hive-metastore:9083 \
    --table_conf bucket=4 \
    --table_conf changelog-producer=input \
    --table_conf sink.parallelism=4
```

--------------------------------

### Create a Database

Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/python-api.mdx

Use `create_database` to create a new database in the catalog. Set `ignore_if_exists` to `True` to avoid an error if the database already exists.
Optional properties can be provided as a dictionary.

```python
catalog.create_database(
    name='database_name',
    ignore_if_exists=True,  # set to False to raise an error if the database exists
    properties={'key': 'value'}  # optional database properties
)
```

--------------------------------

### now

Source: https://github.com/apache/paimon/blob/master/docs/generated/temporal_functions.html

Returns the timestamp at which the record is ingested. The output type is TIMESTAMP_LTZ(3).

```APIDOC
## now()

### Description
Returns the timestamp at which the record is ingested. The output type is TIMESTAMP_LTZ(3).

### Returns
- **timestamp** (TIMESTAMP_LTZ(3)) - The ingestion timestamp.
```

--------------------------------

### Stream Write and Commit Example

Source: https://github.com/apache/paimon/blob/master/docs/docs/program-api/java-api.mdx

Writes records and commits them continuously with the `StreamTableWrite` and `StreamTableCommit` APIs. It suits distributed jobs in which every commit carries an increasing commit identifier, and the same identifier is passed to `prepareCommit` and to `commit`. A commented-out section shows how to retry commits after a failure with `filterAndCommit`.

```java
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;

import java.util.List;

public class StreamWriteTable {

    public static void main(String[] args) throws Exception {
        // 1. Create a WriteBuilder (Serializable)
        Table table = GetTable.getTable();
        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();

        // 2. Write records in distributed tasks
        StreamTableWrite write = writeBuilder.newWrite();
        // The committer is created once and reused for every commit
        StreamTableCommit commit = writeBuilder.newCommit();
        // commitIdentifier plays the role of a Flink checkpointId
        long commitIdentifier = 0;
        while (true) {
            GenericRow record1 = GenericRow.of(BinaryString.fromString("Alice"), 12);
            GenericRow record2 = GenericRow.of(BinaryString.fromString("Bob"), 5);
            GenericRow record3 = GenericRow.of(BinaryString.fromString("Emily"), 18);

            // If this is a distributed write, you can use writeBuilder.newWriteSelector.
            // WriteSelector determines which logical downstream writer a record should be written to.
            // If it returns empty, no data distribution is required.

            write.write(record1);
            write.write(record2);
            write.write(record3);
            List<CommitMessage> messages = write.prepareCommit(false, commitIdentifier);

            // 3. Collect all CommitMessages to a global node and commit them
            //    under the same identifier that was passed to prepareCommit
            commit.commit(commitIdentifier, messages);

            // 4. When a failure occurs and you are not sure whether the commit succeeded,
            //    use `filterAndCommit` to retry; commits that already succeeded are skipped.
            //    (Requires importing java.util.Map and java.util.HashMap.)
            /*
            Map<Long, List<CommitMessage>> commitIdentifiersAndMessages = new HashMap<>();
            commitIdentifiersAndMessages.put(commitIdentifier, messages);
            commit.filterAndCommit(commitIdentifiersAndMessages);
            */

            commitIdentifier++;
            Thread.sleep(1000);
        }
    }
}
```

--------------------------------

### Get Paimon Database Information

Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/cli.md

Retrieve database information and print it as JSON.

```shell
paimon db get mydb
```

--------------------------------

### Paimon On-Disk File Structure Example

Source: https://github.com/apache/paimon/blob/master/docs/docs/concepts/spec/index.md

Illustrates the typical directory and file layout that Paimon creates on disk for a table named `my_table`, including data files, index files, manifests, manifest lists, schema files, and snapshots.
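In the layout that follows, each file's role can be read from its name prefix. A minimal sketch, based only on the names in this example layout (not on Paimon's own path handling), with a hypothetical `classify` helper; it treats `EARLIEST` and `LATEST` as snapshot hint files:

```python
# Prefix -> file kind, checked in order. Longer prefixes come first so that
# "index-manifest-..." is not mistaken for an index file and
# "manifest-list-..." is not mistaken for a plain manifest.
PREFIXES = [
    ("index-manifest-", "index manifest"),
    ("manifest-list-", "manifest list"),
    ("manifest-", "manifest"),
    ("index-", "index file"),
    ("data-", "data file"),
    ("schema-", "schema"),
    ("snapshot-", "snapshot"),
]


def classify(file_name):
    """Guess a file's role from its name (hypothetical helper)."""
    if file_name in ("EARLIEST", "LATEST"):
        return "snapshot hint"
    for prefix, kind in PREFIXES:
        if file_name.startswith(prefix):
            return kind
    return "unknown"


for name in [
    "data-59f60cb9-44af-48cc-b5ad-59e85c663c8f-0.orc",
    "index-manifest-5d670043-da25-4265-9a26-e31affc98039-0",
    "manifest-list-9f856d52-5b33-4c10-8933-a0eddfaa25bf-1",
    "snapshot-1",
    "LATEST",
]:
    print(name, "->", classify(name))
```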
```shell
warehouse
└── default.db
    └── my_table
        ├── bucket-0
        │   └── data-59f60cb9-44af-48cc-b5ad-59e85c663c8f-0.orc
        ├── index
        │   └── index-5625e6d9-dd44-403b-a738-2b6ea92e20f1-0
        ├── manifest
        │   ├── index-manifest-5d670043-da25-4265-9a26-e31affc98039-0
        │   ├── manifest-6758823b-2010-4d06-aef0-3b1b597723d6-0
        │   ├── manifest-list-9f856d52-5b33-4c10-8933-a0eddfaa25bf-0
        │   └── manifest-list-9f856d52-5b33-4c10-8933-a0eddfaa25bf-1
        ├── schema
        │   └── schema-0
        └── snapshot
            ├── EARLIEST
            ├── LATEST
            └── snapshot-1
```

--------------------------------

### Create Table As Select - Primary Key and Partition

Source: https://github.com/apache/paimon/blob/master/docs/docs/flink/sql-ddl.md

Creates a new table `my_table_all_as` from `my_table_all`, partitioned by `dt` and with `dt,hh` as its primary key.

```sql
CREATE TABLE my_table_all (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

CREATE TABLE my_table_all_as WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM my_table_all;
```

--------------------------------

### Get Table

Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/python-api.mdx

Retrieves an existing table from the catalog by its identifier (`database_name.table_name`) so that you can read from or write to it.

```APIDOC
## Get Table

```python
table = catalog.get_table('database_name.table_name')
```
```

--------------------------------

### Read Data into Pandas DataFrame

Source: https://github.com/apache/paimon/blob/master/docs/docs/pypaimon/python-api.mdx

Read all data from the splits into a `pandas.DataFrame`. Requires the `pandas` library to be installed.
```python
table_read = read_builder.new_read()
df = table_read.to_pandas(splits)
print(df)
```

--------------------------------

### Python SDK Vector Search

Source: https://github.com/apache/paimon/blob/master/docs/docs/multimodal-table/global-index/vector.mdx

Example of performing a vector search with the Paimon Python SDK.

```APIDOC
## Python SDK Vector Search

### Description
Demonstrates how to perform a vector search with the Paimon Python SDK, including an example with a scalar filter.

### Method
`table.new_vector_search_builder()`

### Parameters
- **`with_vector_column(str vector_column)`**: Specifies the vector column.
- **`with_query_vector(list query_vector)`**: Sets the query vector.
- **`with_limit(int limit)`**: Sets the maximum number of results.
- **`with_option(str key, str value)`**: Adds a search option (e.g., `ivf.nprobe`).
- **`with_filter(Predicate predicate)`**: Applies a scalar filter before the search.
- **`execute_local()`**: Executes the search and returns a result object.

### Reading Results
- Use `read_builder.new_scan().with_global_index_result(result).plan()` to get a scan plan.
- Use `table_read.to_arrow(plan.splits())` to convert the results to an Arrow table.
### Request Example (Basic Search)

```python
table = catalog.get_table("db.my_table")

# Step 1: Build vector search
result = (
    table.new_vector_search_builder()
    .with_vector_column("embedding")
    .with_query_vector([1.0, 2.0, 3.0])
    .with_limit(5)
    .with_option("ivf.nprobe", "16")
    .execute_local()
)

# Step 2: Read matching rows using the search result
read_builder = table.new_read_builder()
scan = read_builder.new_scan().with_global_index_result(result)
plan = scan.plan()
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(plan.splits())
print(pa_table)
```

### Request Example (With Scalar Filter)

```python
from pypaimon.common.predicate_builder import PredicateBuilder

predicate = (
    PredicateBuilder(table.fields)
    .equal("category", "electronics")
)

result = (
    table.new_vector_search_builder()
    .with_vector_column("embedding")
    .with_query_vector([1.0, 2.0, 3.0])
    .with_limit(5)
    .with_filter(predicate)
    .execute_local()
)
```
```
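The `ivf.nprobe` option suggests an IVF-style index, which probes only some clusters and therefore returns approximate nearest neighbours. To sanity-check the vector search results on a small sample, an exact brute-force search can serve as a baseline. A minimal sketch in plain Python (it does not call pypaimon), assuming Euclidean (L2) distance; the index may be configured with a different metric, and the sample rows and the `exact_top_k` helper are hypothetical:

```python
import heapq
import math


def exact_top_k(rows, query, k):
    """Return the k rows closest to `query` by Euclidean (L2) distance.

    rows: iterable of (row_id, vector) pairs. Brute force, O(n * d).
    """
    def l2(vector):
        if len(vector) != len(query):
            raise ValueError("vector dimension mismatch")
        return math.sqrt(sum((a - b) ** 2 for a, b in zip(vector, query)))

    return heapq.nsmallest(k, rows, key=lambda row: l2(row[1]))


rows = [
    (0, [1.0, 2.0, 3.0]),
    (1, [0.0, 0.0, 0.0]),
    (2, [1.0, 2.0, 4.0]),
    (3, [10.0, 10.0, 10.0]),
]
top = exact_top_k(rows, [1.0, 2.0, 3.0], k=2)
print([row_id for row_id, _ in top])  # [0, 2]
```

Comparing these IDs with the rows the vector search builder returns for the same query and limit gives a rough recall estimate; a larger `ivf.nprobe` generally narrows the gap at the cost of more work per query.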