### Complete Setup Workflow Example

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Starts continuous streaming with an initial snapshot and the initialization step. This command sets up and runs pgstream in one go.

```bash
# Start continuous streaming with initial snapshot and initialization step
pgstream run --source postgres --source-url "postgres://user:pass@localhost:5432/source_db" --target postgres --target-url "postgres://user:pass@localhost:5432/target_db" --replication-slot "pgstream_slot" --snapshot-tables "public.*" --init
```

--------------------------------

### Install pgstream CLI via go install

Source: https://github.com/baserow/pgstream/blob/main/README.md

Install the latest version of pgstream using the go install command.

```bash
go install github.com/xataio/pgstream@latest
```

--------------------------------

### Install pgstream via Homebrew

Source: https://github.com/baserow/pgstream/blob/main/README.md

Tap the pgstream repository and install it using Homebrew on macOS or Linux.

```bash
brew tap xataio/pgstream
brew install pgstream
```

--------------------------------

### Start Docker Compose for PostgreSQL, Kafka, and OpenSearch

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_kafka.md

Starts the necessary services (PostgreSQL, OpenSearch, Kafka) using Docker Compose for the tutorial. Ensure you have the required profiles enabled.

```bash
docker-compose -f build/docker/docker-compose.yml --profile pg2pg --profile pg2os --profile kafka up
```

--------------------------------

### Install PostgreSQL Client (Linux)

Source: https://github.com/baserow/pgstream/blob/main/tools/autotune/benchmark/README.md

Installs the PostgreSQL client on Linux using apt-get. This is required if a 'psql not found' error occurs.

```bash
apt-get install postgresql-client
```

--------------------------------

### Start PostgreSQL Database with Docker Compose

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_webhooks.md

Starts a PostgreSQL database using docker-compose for replication. Ensure wal2json is installed on the PostgreSQL server.

```sh
docker-compose -f build/docker/docker-compose.yml --profile pg2webhook up
```

--------------------------------

### Run Pgstream with Initialization and Snapshot

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Start the pgstream replication process, ensuring initialization if needed, and perform an initial snapshot of specified tables.

```bash
pgstream run -c config.yaml --init
pgstream run -c config.yaml --snapshot-tables "public.users,public.orders"
```

--------------------------------

### Complete pgstream Transformation Rules Example

Source: https://github.com/baserow/pgstream/blob/main/docs/transformers.md

An example of a complete transformation rules YAML file, demonstrating specific table and column transformations with various anonymizers.

```yaml
transformations:
  infer_from_security_labels: false # whether to infer anonymization rules from Postgres `anon` security labels. Requires a live connection to the source database. Defaults to false
  dump_inferred_rules: false # if set, dumps the inferred anonymization rules to a file in YAML format for debugging purposes. The file will be named `inferred_anon_transformation_rules.yaml` and will be created in the directory where pgstream is run. Defaults to false
  validation_mode: table_level
  table_transformers:
    - schema: public
      table: users
      validation_mode: strict
      column_transformers:
        email:
          name: neosync_email
          parameters:
            preserve_length: true
            preserve_domain: true
        first_name:
          name: greenmask_firstname
          parameters:
            gender: Male
        username:
          name: greenmask_string
          parameters:
            generator: random
            min_length: 5
            max_length: 15
            symbols: "abcdefghijklmnopqrstuvwxyz1234567890"
    - schema: public
      table: orders
      validation_mode: relaxed
      column_transformers:
        status:
          name: greenmask_choice
          parameters:
            generator: random
            choices: ["pending", "shipped", "delivered", "cancelled"]
        order_date:
          name: greenmask_date
          parameters:
            generator: random
            min_value: "2020-01-01"
            max_value: "2025-12-31"
```

--------------------------------

### Run all Docker Compose profiles

Source: https://github.com/baserow/pgstream/blob/main/README.md

Start all supported services defined in the docker-compose.yml file.

```bash
docker-compose -f build/docker/docker-compose.yml up
```

--------------------------------

### Run pgstream with Initialization and Trace Logging

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_webhooks.md

Execute this command to start pgstream with initialization and trace-level logging enabled. This provides detailed logs for debugging.

```sh
pgstream run -c pg2webhook_tutorial.env --init --log-level trace
```

--------------------------------

### Start Local Observability Stack

Source: https://github.com/baserow/pgstream/blob/main/docs/Observability.md

Starts the SigNoz observability stack using Docker Compose for local development and testing. Access the SigNoz dashboard at http://localhost:8080.

```bash
docker-compose -f build/docker/docker-compose-signoz.yml --profile instrumentation up
```

```bash
# To start it alongside a different profile
docker-compose -f build/docker/docker-compose.yml -f build/docker/docker-compose-signoz.yml --profile pg2pg --profile instrumentation up
```

```bash
# Access SigNoz dashboard
open http://localhost:8080
```

--------------------------------

### Initialize and Run pgstream

Source: https://github.com/baserow/pgstream/blob/main/docs/releases/RELEASE_NOTES_v1.md

Combines initialization and running pgstream into a single command for convenience. The system automatically installs required migrations based on the configuration.

```bash
pgstream run --config config.yaml --init
```

--------------------------------

### Install PostgreSQL Client (macOS)

Source: https://github.com/baserow/pgstream/blob/main/tools/autotune/benchmark/README.md

Installs the PostgreSQL client on macOS using Homebrew. This is required if a 'psql not found' error occurs.

```bash
brew install postgresql
```

--------------------------------

### Get pgstream Version Output

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Example output of the `pgstream version` command, displaying the pgstream version string.

```bash
$ pgstream version
pgstream version v0.8.1
```

--------------------------------

### PostgreSQL to Kafka with Initial Snapshot Configuration

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_kafka.md

Configures pgstream to capture an initial snapshot of all tables in the public schema before starting WAL replication. Includes listener and processor settings.

```shell
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot
PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
# Initial snapshot of all tables in the public schema
PGSTREAM_POSTGRES_SNAPSHOT_TABLES="*"

# Processor config
PGSTREAM_INJECTOR_STORE_POSTGRES_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_KAFKA_WRITER_SERVERS="localhost:9092"
PGSTREAM_KAFKA_TOPIC_NAME=pgstream
PGSTREAM_KAFKA_TOPIC_PARTITIONS=1
PGSTREAM_KAFKA_TOPIC_REPLICATION_FACTOR=1
PGSTREAM_KAFKA_TOPIC_AUTO_CREATE=true
```

--------------------------------

### Install Benchmark Dependencies

Source: https://github.com/baserow/pgstream/blob/main/tools/autotune/benchmark/README.md

Installs necessary libraries for statistical analysis in benchmarks. Modify analyze_results.py to integrate scipy.stats.ttest_rel for more robust statistical tests.

```bash
pip install scipy numpy
# Modify analyze_results.py to use scipy.stats.ttest_rel
```

--------------------------------

### Initialize pgstream Separately

Source: https://github.com/baserow/pgstream/blob/main/docs/releases/RELEASE_NOTES_v1.md

Use this command to initialize pgstream by setting up the necessary database schema and replication slot without starting the replication process immediately.

```bash
pgstream init --config config.yaml
```

--------------------------------

### Run multiple Docker Compose profiles

Source: https://github.com/baserow/pgstream/blob/main/README.md

Start multiple services by specifying multiple profiles, such as 'pg2pg' and 'kafka'.

```bash
docker-compose -f build/docker/docker-compose.yml --profile pg2pg --profile kafka up
```

--------------------------------

### Full Snapshot Configuration for Neon Source

Source: https://github.com/baserow/pgstream/blob/main/docs/neon.md

Example configuration for a full snapshot from a Neon database. It specifies tables to snapshot and disables role snapshotting.

```yaml
source:
  postgres:
    url: "postgresql://pgstreamsource:password@:5432/db?sslmode=require"
    mode: snapshot
    snapshot:
      mode: full # schema + data
      tables: ["public.*"] # all tables in the public schema
      schema:
        mode: pgdump_pgrestore
        pgdump_pgrestore:
          roles_snapshot_mode: "disabled"
          no_owner: true
          no_privileges: true
```

--------------------------------

### Install pgstream CLI via curl

Source: https://github.com/baserow/pgstream/blob/main/README.md

Download the latest release of pgstream for Linux AMD64 and make it executable.

```bash
curl -L https://github.com/xataio/pgstream/releases/latest/download/pgstream.linux.amd64 -o pgstream
chmod +x pgstream
sudo mv pgstream /usr/local/bin/
```

--------------------------------

### Start PostgreSQL and OpenSearch with Docker Compose

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_opensearch.md

Launches the PostgreSQL database and OpenSearch cluster using the provided docker-compose configuration. This sets up the necessary services for replication.

```sh
docker-compose -f build/docker/docker-compose.yml --profile pg2os up
```

--------------------------------

### Get Help for Pgstream Commands

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Display help information for the main pgstream command and its subcommands.

```bash
pgstream --help
pgstream init --help
pgstream run --help
pgstream snapshot --help
pgstream status --help
pgstream destroy --help
```

--------------------------------

### HSTORE Transformer Input and Output Example

Source: https://github.com/baserow/pgstream/blob/main/docs/transformers.md

Illustrates the transformation of an HSTORE value based on a given configuration, showing how sensitive data is masked and new values are set.

```text
public_key => 12345, private_key => 12345abcdefg, email => user@email.com
```

```text
private_key => \'\\\\************\', email => use***@email.com newKey => newValue
```

--------------------------------

### Setup Network Condition Simulation (Linux Only)

Source: https://github.com/baserow/pgstream/blob/main/tools/autotune/benchmark/README.md

Set up controlled network conditions for testing auto-tuning performance on Linux systems. This command requires sudo privileges and accepts predefined conditions like 'medium-jitter'.

```bash
# Setup condition (requires sudo)
sudo ./tools/benchmark/setup_network_conditions.sh medium-jitter
```

--------------------------------

### Full Cloud SQL Postgres Snapshot Configuration Example

Source: https://github.com/baserow/pgstream/blob/main/docs/gcp_cloudsql.md

Complete YAML configuration for a full snapshot of a Cloud SQL PostgreSQL database, including table filtering and role snapshotting settings.

```yaml
source:
  postgres:
    url: "postgresql://pgstreamsource:password@:5432/db"
    mode: snapshot
    snapshot:
      mode: full # schema + data
      tables: ["public.*"] # all tables in the public schema
      schema:
        mode: pgdump_pgrestore
        pgdump_pgrestore:
          roles_snapshot_mode: "disabled"
          no_owner: true
          no_privileges: true
```

--------------------------------

### Run Continuous Data Stream

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Starts a continuous data stream from a source to a target. Requires database initialization and a replication slot. Can be configured via flags or a config file.

```bash
pgstream run --source postgres --source-url --target postgres --target-url --init
```

```bash
pgstream run --source postgres --source-url --target postgres --target-url --snapshot-tables --reset
```

```bash
pgstream run --source kafka --source-url --target elasticsearch --target-url
```

```bash
pgstream run --source postgres --source-url --target kafka --target-url
```

```bash
pgstream run --config config.yaml --log-level info
```

```bash
pgstream run --config config.env
```

--------------------------------

### Build and Run Dummy Webhook Server

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_webhooks.md

Navigates to the webhook tool directory, builds the Go executable, and then runs the server. This server listens for incoming webhook requests and prints received events.

```sh
cd tools/webhook
go build
./webhook
```

--------------------------------

### Full Environment Configuration for Snapshot to PostgreSQL Tutorial

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_snapshot.md

This environment file contains all necessary configurations for both the listener and processor components for the snapshot to PostgreSQL tutorial.

```sh
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_SNAPSHOT_TABLES="*"
PGSTREAM_POSTGRES_SNAPSHOT_SCHEMA_WORKERS=4
PGSTREAM_POSTGRES_SNAPSHOT_TABLE_WORKERS=4
PGSTREAM_POSTGRES_SNAPSHOT_BATCH_BYTES=83886080
PGSTREAM_POSTGRES_SNAPSHOT_WORKERS=1
PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"

# Processor config
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://postgres:postgres@localhost:7654?sslmode=disable"
PGSTREAM_POSTGRES_WRITER_BATCH_SIZE=25
PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT=5s
PGSTREAM_POSTGRES_WRITER_DISABLE_TRIGGERS=true
PGSTREAM_POSTGRES_WRITER_ON_CONFLICT_ACTION=nothing
```

--------------------------------

### Validate pgstream Status

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_opensearch.md

Commands to validate the pgstream configuration using either a YAML or an environment file. This check ensures the setup is valid before starting the service.

```sh
# using yaml configuration file
./pgstream status -c pg2os_tutorial.yaml
# using env configuration file
./pgstream status -c pg2os_tutorial.env
```

--------------------------------

### Complete Environment Configuration for pg2pg Tutorial

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_postgres.md

A comprehensive environment configuration file for the pg2pg tutorial, including listener and processor settings.

```sh
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot

# Processor config
PGSTREAM_POSTGRES_WRITER_TARGET_URL="postgres://postgres:postgres@localhost:7654?sslmode=disable"
PGSTREAM_POSTGRES_WRITER_BATCH_SIZE=25
PGSTREAM_POSTGRES_WRITER_BATCH_TIMEOUT=5s
PGSTREAM_POSTGRES_WRITER_DISABLE_TRIGGERS=true
PGSTREAM_POSTGRES_WRITER_ON_CONFLICT_ACTION=nothing
```

--------------------------------

### Initialize with Migrations Only

Source: https://github.com/baserow/pgstream/blob/main/docs/releases/RELEASE_NOTES_v1.md

This command runs only the database migrations, creating the pgstream schema, tables, functions, and triggers, but does not create the replication slot. Useful for separate schema setup or different database credentials for migrations.

```bash
pgstream init --config config.yaml --migrations-only
```

--------------------------------

### Kafka Consumer Group Start Offset Configuration

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_kafka.md

Sets the initial offset for the Kafka consumer group. Use 'earliest' to start from the beginning of the topic or 'latest' to start from the most recent messages.

```shell
PGSTREAM_KAFKA_READER_CONSUMER_GROUP_START_OFFSET=earliest
```

--------------------------------

### Multi-Config Comparison (Recommended)

Source: https://github.com/baserow/pgstream/blob/main/tools/autotune/benchmark/README.md

Tests the auto-tuner against a range of manual configurations to demonstrate its superiority. Includes setting RUNS and COOLDOWN, and reviewing the results.

```bash
SOURCE_URL="..." \
TARGET_URL="..." \
RUNS=5 \
COOLDOWN=90 \
./tools/benchmark/run_multi_config_benchmark.sh

# Review results
cat benchmark_results/multi_config_*/comparison_report.txt
```

--------------------------------

### pg2os_tutorial.env Configuration (With Initial Snapshot)

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_opensearch.md

Environment variables for configuring pgstream with an initial snapshot. Includes listener, replication slot, snapshot settings, injector, and OpenSearch target.

```sh
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot
PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
# Initial snapshot of all tables in the public schema
PGSTREAM_POSTGRES_SNAPSHOT_TABLES="*"

# Processor config
PGSTREAM_INJECTOR_STORE_POSTGRES_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_OPENSEARCH_STORE_URL="http://admin:admin@localhost:9200"
PGSTREAM_SEARCH_INDEXER_BATCH_SIZE=25
PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT=5s
```

--------------------------------

### Transformer Rules Example

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_transformer.md

Define transformation rules for columns using YAML. This example shows transformations for 'email' and 'name' columns.

```yaml
transformations:
  validation_mode: relaxed # Specifies the validation mode for the transformer.
  table_transformers:
    - schema: public
      table: test
      column_transformers:
        email:
          name: neosync_email
          parameters:
            preserve_length: true # Ensures the transformed email has the same length as the original.
            preserve_domain: true # Keeps the domain of the original email intact.
            email_type: fullname # Specifies the type of email transformation.
        name:
          name: greenmask_firstname
          parameters:
            generator: deterministic # Ensures the same input always produces the same output.
            gender: Female # Generates female names for the transformation.
```

--------------------------------

### Full YAML Configuration for Snapshot to PostgreSQL Tutorial

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_snapshot.md

This YAML configuration file provides an alternative to the environment file for setting up pgstream for the snapshot to PostgreSQL tutorial. It covers both source (listener) and target (processor) configurations.

```yaml
source:
  postgres:
    url: "postgres://postgres:postgres@localhost:5432?sslmode=disable"
    mode: snapshot # options are replication, snapshot or snapshot_and_replication
    snapshot: # when mode is snapshot or snapshot_and_replication
      mode: full # options are full, schema or data
      tables: ["*"] # tables to snapshot, can be a list of table names or a pattern
      recorder:
        repeatable_snapshots: true # whether to repeat snapshots that have already been taken
        postgres_url: "postgres://postgres:postgres@localhost:5432?sslmode=disable" # URL of the database where the snapshot status is recorded
      snapshot_workers: 4 # number of schemas to be snapshotted in parallel
      data: # when mode is full or data
        schema_workers: 4 # number of schema tables to be snapshotted in parallel
        table_workers: 4 # number of workers to snapshot a table in parallel
        batch_bytes: 83886080 # bytes to read per batch (defaults to 80MiB)
      schema: # when mode is full or schema
        pgdump_pgrestore:
          clean_target_db: false # whether to clean the target database before restoring

target:
  postgres:
    url: "postgres://postgres:postgres@localhost:7654?sslmode=disable"
    batch:
      timeout: 5000 # batch timeout in milliseconds
      size: 25 # number of messages in a batch
    disable_triggers: true # whether to disable triggers on the target database
    on_conflict_action: "nothing" # options are update, nothing or error
```

--------------------------------

### Initialize, Check Status, and Run Pgstream

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Basic
commands to initialize pgstream, check its status, and run the replication process from PostgreSQL to Kafka.

```bash
pgstream init
pgstream status
pgstream run --source postgres --target kafka --target-url "localhost:9092"
```

--------------------------------

### Example Webhook Payload with Metadata

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_webhooks.md

This JSON shows an example of a webhook payload after the injector has been enabled. It includes populated metadata fields such as schema_id, table_pgstream_id, and id_col_pgstream_id.

```json
{
  "Data": {
    "action": "U",
    "timestamp": "2025-03-13 10:18:29.264688+00",
    "lsn": "0/15C1CC0",
    "schema": "public",
    "table": "tutorial_test",
    "columns": [
      { "id": "cv9al8qhi0j00i9chq8g-1", "name": "id", "type": "integer", "value": 1 },
      { "id": "cv9al8qhi0j00i9chq8g-2", "name": "name", "type": "text", "value": "a" }
    ],
    "identity": [
      { "id": "cv9al8qhi0j00i9chq8g-1", "name": "id", "type": "integer", "value": 1 },
      { "id": "cv9al8qhi0j00i9chq8g-2", "name": "name", "type": "text", "value": "alice" }
    ],
    "metadata": {
      "schema_id": "cv9b12qhi0j00i9chqag",
      "table_pgstream_id": "cv9al8qhi0j00i9chq8g",
      "id_col_pgstream_id": ["cv9al8qhi0j00i9chq8g-1"],
      "version_col_pgstream_id": ""
    }
  }
}
```

--------------------------------

### Initialize pgstream with CLI Parameters

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_postgres.md

Initializes pgstream on the source database, creating the pgstream schema and a replication slot named 'pgstream_tutorial_slot'.

```sh
pgstream init --postgres-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --replication-slot pgstream_tutorial_slot
```

--------------------------------

### Start PostgreSQL Databases with Docker Compose

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_snapshot.md

Starts two PostgreSQL databases using Docker Compose.
The source database runs on port 5432, and the target database runs on port 7654.

```sh
# Start two PostgreSQL databases using Docker.
# The source database will run on port 5432, and the target database will run on port 7654.
docker-compose -f build/docker/docker-compose.yml --profile pg2pg up
```

--------------------------------

### Configure Tables for Initial Snapshot

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_webhooks.md

Specify which tables to include in the initial snapshot. Wildcards are supported.

```shell
# The following example will snapshot all tables in the `test_schema` and the table `test` from the public schema.
PGSTREAM_POSTGRES_SNAPSHOT_TABLES="test_schema.* test"
```

--------------------------------

### Input JSON for Transformation

Source: https://github.com/baserow/pgstream/blob/main/docs/transformers.md

Example input JSON structure for the 'json' transformer.

```json
{
  "user": { "firstname": "john", "lastname": "unknown" },
  "residency": { "city": "some city", "country": "some country" },
  "purchases": [
    { "item": "book", "price": 10 },
    { "item": "pen", "price": 2 }
  ]
}
```

--------------------------------

### pg2os_tutorial.env Configuration (No Initial Snapshot)

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_opensearch.md

Environment variables for configuring pgstream without an initial snapshot. Includes listener, replication slot, injector, and OpenSearch target settings.

```sh
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot

# Processor config
PGSTREAM_INJECTOR_STORE_POSTGRES_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_OPENSEARCH_STORE_URL="http://admin:admin@localhost:9200"
PGSTREAM_SEARCH_INDEXER_BATCH_SIZE=25
PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT=5s
```

--------------------------------

### Output JSON after Transformation

Source: https://github.com/baserow/pgstream/blob/main/docs/transformers.md

Example output JSON after applying the configured 'json' transformations.

```json
{
  "user": { "firstname": "john", "lastname": "doe" },
  "residency": { "city": "********" },
  "purchases": [
    { "item": "-", "price": 10 },
    { "item": "-", "price": 2 }
  ]
}
```

--------------------------------

### Run Benchmark Suite with Defaults

Source: https://github.com/baserow/pgstream/blob/main/tools/autotune/benchmark/README.md

Execute the benchmark suite using default settings, which typically involves 10 paired runs. Ensure SOURCE_URL and TARGET_URL environment variables are set.

```bash
# Set your database URLs
export SOURCE_URL="postgresql://user:pass@source:5432/db"
export TARGET_URL="postgresql://user:pass@target:5432/db"

# Run with defaults (10 paired runs)
./tools/benchmark/run_benchmark.sh
```

--------------------------------

### Configuration File Workflow

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Demonstrates the workflow for using a configuration file to manage pgstream operations, including initialization, status checks, and running the stream.

```bash
# 1. Create configuration file
cat > config.yaml < --replication-slot
```

--------------------------------

### Analyze Blocked Operations with pprof

Source: https://github.com/baserow/pgstream/blob/main/docs/Observability.md

Use this command to analyze blocked operations. It starts an HTTP server for visualization.

```bash
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/block
```

--------------------------------

### Build pgstream from source

Source: https://github.com/baserow/pgstream/blob/main/README.md

Clone the pgstream repository, navigate to the directory, and build the executable.

```bash
git clone https://github.com/xataio/pgstream.git
cd pgstream
go build -o pgstream ./cmd
```

--------------------------------

### Run pg2pg Docker Compose profile

Source: https://github.com/baserow/pgstream/blob/main/README.md

Start only the PostgreSQL to PostgreSQL replication services using the 'pg2pg' profile.

```bash
docker-compose -f build/docker/docker-compose.yml --profile pg2pg up
```

--------------------------------

### pg2os_tutorial.yaml Configuration (With Initial Snapshot)

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_opensearch.md

YAML configuration for pgstream with an initial snapshot. Defines source (Postgres URL, snapshot mode, tables, recorder) and target (OpenSearch) settings, including batching and injector modifiers.

```yaml
source:
  postgres:
    url: "postgresql://user:password@localhost:5432/mydatabase"
    mode: snapshot_and_replication # options are replication, snapshot or snapshot_and_replication
    snapshot: # when mode is snapshot or snapshot_and_replication
      mode: full # options are full, schema or data
      tables: ["*"] # tables to snapshot, can be a list of table names or a pattern
      recorder:
        repeatable_snapshots: true # whether to repeat snapshots that have already been taken
        postgres_url: "postgres://postgres:postgres@localhost:5432?sslmode=disable" # URL of the database where the snapshot status is recorded
    replication:
      replication_slot: "pgstream_tutorial_slot"

target:
  search:
    engine: "opensearch" # options are elasticsearch or opensearch
    url: "http://localhost:9200" # URL of the search engine
    batch:
      timeout: 5000 # batch timeout in milliseconds
      size: 25 # number of messages in a batch

modifiers:
  injector:
    enabled: true # whether to inject pgstream metadata into the WAL events
```

--------------------------------

### PostgreSQL Anonymizer Configuration

Source: https://github.com/baserow/pgstream/blob/main/docs/transformers.md

Example configuration for applying various anonymization functions to different columns in a PostgreSQL table.

```yaml
transformations:
  table_transformers:
    - schema: public
      table: users
      column_transformers:
        first_name:
          name: pg_anonymizer
          parameters:
            anon_function: anon.fake_first_name()
        id:
          name: pg_anonymizer
          parameters:
            anon_function: anon.digest
            salt: salt
            hash_algorithm: md5
        phone:
          name: pg_anonymizer
          parameters:
            anon_function: anon.random_phone
            prefix: "+1-555-"
        api_key:
          name: pg_anonymizer
          parameters:
            anon_function: anon.random_string
            count: 32
        content:
          name: pg_anonymizer
          parameters:
            anon_function: anon.lorem_ipsum
            unit: "words"
            count: 50
        status:
          name: pg_anonymizer
          parameters:
            anon_function: anon.random_in
            range: "ARRAY['active', 'inactive', 'pending']"
```

--------------------------------

### OpenSearch 'public' Index Empty Result

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_kafka.md

Example JSON output from searching the 'public' index, showing no documents.

```json
{
  "took": 16,
  "timed_out": false,
  "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 },
  "hits": {
    "total": { "value": 0, "relation": "eq" },
    "max_score": null,
    "hits": []
  }
}
```

--------------------------------

### Create and Populate Test Table

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_snapshot.md

SQL commands to create a 'test' table with 'id' and 'name' columns and insert sample data. This data will be included in the snapshot.

```sql
CREATE TABLE test(id SERIAL PRIMARY KEY, name TEXT);
INSERT INTO test(name) VALUES('alice'),('bob'),('charlie');
```

--------------------------------

### Update Injector Configuration (Old)

Source: https://github.com/baserow/pgstream/blob/main/docs/releases/RELEASE_NOTES_v1.md

Example of the old injector configuration format before version 1.0.0, which included a `schemalog_url`.

```yaml
modifiers:
  injector:
    enabled: true
    schemalog_url: "..."
```

--------------------------------

### Webhook Server Listening Message

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_webhooks.md

Indicates that the dummy webhook server has started successfully and is listening for requests on port 9910.

```text
2025-03-13T10:31:44.18911+01:00 INF logger.go:37 > listening on :9910...
```

--------------------------------

### Create and Insert Data into Test Table

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_transformer.md

Create a 'test' table with id, name, and email columns, then insert sample data and select all records.

```sql
CREATE TABLE test(id SERIAL PRIMARY KEY, name TEXT, email TEXT);
INSERT INTO test(name,email) VALUES('alice','alice@test.com'),('bob','bob@test.com'),('charlie','charlie@test.com');
SELECT * FROM test;
```

--------------------------------

### OpenSearch 'pgstream' Index Schema Log

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_kafka.md

Example JSON output from searching the 'pgstream' index, showing schema details.

```json
{
  "took": 31,
  "timed_out": false,
  "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 },
  "hits": {
    "total": { "value": 1, "relation": "eq" },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "pgstream",
        "_id": "cv9esaav80ig0h81pkj0",
        "_score": 1.0,
        "_source": {
          "id": "cv9esaav80ig0h81pkj0",
          "version": 1,
          "schema_name": "public",
          "created_at": "2025-03-13 14:39:37.295222",
          "schema": "{\"tables\":[{\"oid\":\"16464\",\"name\":\"test\",\"columns\":[{\"name\":\"id\",\"type\":\"integer\",\"default\":\"nextval('public.test_id_seq'::regclass)\",\"nullable\":false,\"unique\":true,\"metadata\":null,\"pgstream_id\":\"cv9esaav80ig0h81pkjg-1\"},{\"name\":\"name\",\"type\":\"text\",\"nullable\":true,\"unique\":false,\"metadata\":null,\"pgstream_id\":\"cv9esaav80ig0h81pkjg-2\"}],\"primary_key_columns\":[\"id\"],\"pgstream_id\":\"cv9esaav80ig0h81pkjg\"}]}",
          "acked": false
        }
      }
    ]
  }
}
```

--------------------------------

### Take a Snapshot with Environment Configuration

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Initiates a data snapshot using an environment configuration file. Useful for bulk data export, creating test datasets, or backfilling data.

```bash
pgstream snapshot --config config.env
```

--------------------------------

### Multi-target Streaming to Kafka

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Streams data from PostgreSQL to Kafka. This example shows how to configure pgstream to use Kafka as a target.

```bash
# Stream to Kafka
pgstream run --source postgres --source-url "postgres://user:pass@localhost:5432/source_db" --target kafka --target-url "localhost:9092" --replication-slot "pgstream_slot"
```

--------------------------------

### Run Pgstream Replication: Kafka to OpenSearch (CLI Flags)

Source: https://github.com/baserow/pgstream/blob/main/README.md

Run pgstream for replication from Kafka to OpenSearch using command-line flags. Includes initialization.

```sh
# using the CLI flags
pgstream run --source kafka --source-url "localhost:9092" --target opensearch --target-url "http://admin:admin@localhost:9200" --init
```

--------------------------------

### Get Detailed Status Output with JSON

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Retrieve detailed debugging information for pgstream status checks in JSON format.

```bash
pgstream status -c config.yaml --json
```

--------------------------------

### Analyze Goroutines with pprof

Source: https://github.com/baserow/pgstream/blob/main/docs/Observability.md

Use this command to debug goroutine leaks or blocking operations. It starts an HTTP server for visualization.

```bash
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/goroutine
```

--------------------------------

### Create a Test Table

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_postgres.md

SQL command to create a sample table for replication testing.

```sql
CREATE TABLE test(id SERIAL PRIMARY KEY, name TEXT);
```

--------------------------------

### Run Pgstream Replication: Postgres to Kafka (CLI Flags)

Source: https://github.com/baserow/pgstream/blob/main/README.md

Run pgstream for replication from PostgreSQL to Kafka using command-line flags. Includes initialization.

```sh
# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target kafka --target-url "localhost:9092" --init
```

--------------------------------

### Analyze CPU Hotspots with pprof

Source: https://github.com/baserow/pgstream/blob/main/docs/Observability.md

Use this command to identify functions consuming the most CPU time. It starts an HTTP server for visualization.
```bash
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/profile
```

--------------------------------

### Multi-target Streaming from Kafka to Elasticsearch

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Streams data from Kafka to Elasticsearch. This example demonstrates using Kafka as a source and Elasticsearch as a target.

```bash
# Stream from Kafka to Elasticsearch
pgstream run --source kafka --source-url "localhost:9092" --target elasticsearch --target-url "http://localhost:9200" --replication-slot "pgstream_slot"
```

--------------------------------

### Run Pgstream Replication: Postgres to OpenSearch (CLI Flags)

Source: https://github.com/baserow/pgstream/blob/main/README.md

Run pgstream for replication from PostgreSQL to OpenSearch using command-line flags. Includes initialization.

```sh
# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target opensearch --target-url "http://admin:admin@localhost:9200" --init
```

--------------------------------

### Analyze Memory Usage with pprof

Source: https://github.com/baserow/pgstream/blob/main/docs/Observability.md

Use this command to find memory allocation patterns and potential leaks. It starts an HTTP server for visualization.

```bash
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/heap
```

--------------------------------

### Run Pgstream Replication: PostgreSQL to PostgreSQL with Snapshot (CLI Flags)

Source: https://github.com/baserow/pgstream/blob/main/README.md

Run pgstream for replication from PostgreSQL to PostgreSQL with an initial snapshot enabled, using command-line flags. Includes initialization.
```sh
# using the CLI flags
pgstream run --source postgres --source-url "postgres://postgres:postgres@localhost:5432?sslmode=disable" --target postgres --target-url "postgres://postgres:postgres@localhost:7654?sslmode=disable" --snapshot-tables test --init
```

--------------------------------

### Neosync Email Anonymization Examples

Source: https://github.com/baserow/pgstream/blob/main/docs/transformers.md

Demonstrates different configurations for anonymizing email addresses, including preserving length/domain, and handling invalid emails.

```text
Input Email             | Configuration Parameters                        | Output Email
----------------------- | ----------------------------------------------- | ----------------------
john.doe@example.com    | preserve_length: true, preserve_domain: true    | abcd.efg@example.com
jane.doe@company.org    | preserve_length: false, preserve_domain: true   | random@company.org
user123@gmail.com       | preserve_length: true, preserve_domain: false   | abcde123@random.com
invalid-email           | invalid_email_action: passthrough               | invalid-email
invalid-email           | invalid_email_action: null                      | NULL
invalid-email           | invalid_email_action: generate                  | generated@random.com
```

--------------------------------

### OpenSearch 'public' Index with Inserted Data

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_kafka.md

Example JSON output from searching the 'public' index after data insertion, showing synchronized documents.
```json
{
  "took": 399,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "public-1",
        "_id": "cv9esaav80ig0h81pkjg_1",
        "_score": 1.0,
        "_source": {
          "_table": "cv9esaav80ig0h81pkjg",
          "cv9esaav80ig0h81pkjg-2": "alice"
        }
      },
      {
        "_index": "public-1",
        "_id": "cv9esaav80ig0h81pkjg_2",
        "_score": 1.0,
        "_source": {
          "_table": "cv9esaav80ig0h81pkjg",
          "cv9esaav80ig0h81pkjg-2": "bob"
        }
      },
      {
        "_index": "public-1",
        "_id": "cv9esaav80ig0h81pkjg_3",
        "_score": 1.0,
        "_source": {
          "_table": "cv9esaav80ig0h81pkjg",
          "cv9esaav80ig0h81pkjg-2": "charlie"
        }
      }
    ]
  }
}
```

--------------------------------

### Build pgstream

Source: https://github.com/baserow/pgstream/blob/main/tools/autotune/benchmark/README.md

Build the pgstream tool. Navigate to the pgstream directory and run the make build command.

```bash
cd /path/to/pgstream
make build
```

--------------------------------

### Multi-target Streaming from Kafka to OpenSearch

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Streams data from Kafka to OpenSearch. This example shows how to configure pgstream to use Kafka as a source and OpenSearch as a target.

```bash
# Stream from Kafka to OpenSearch
pgstream run --source kafka --source-url "localhost:9092" --target opensearch --target-url "http://localhost:9200" --replication-slot "pgstream_slot"
```

--------------------------------

### Initialize pgstream using a config file

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Initialize pgstream using a YAML or .env configuration file.

```bash
pgstream init -c config.yaml
```

```bash
pgstream init -c config.env
```

--------------------------------

### Configure Snapshot Schema Mode

Source: https://github.com/baserow/pgstream/blob/main/docs/releases/RELEASE_NOTES_v1.md

In v1.0.0, the `schemalog` snapshot schema mode is removed.
This example shows the required configuration using the `pgdump_pgrestore` mode.

```yaml
snapshot:
  schema:
    mode: pgdump_pgrestore
```

--------------------------------

### Run pgstream with Initialization and Trace Logging (Environment Config)

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_postgres.md

Command to run pgstream with initialization enabled and trace log level using an environment configuration file. Useful for debugging.

```sh
# with initialization
pgstream run -c pg2pg_tutorial.env --init --log-level trace
```

--------------------------------

### Update Injector Configuration (New)

Source: https://github.com/baserow/pgstream/blob/main/docs/releases/RELEASE_NOTES_v1.md

Example of the new injector configuration format in v1.0.0, where `schemalog_url` is replaced by `source_url`. For PostgreSQL sources, `source_url` is not required.

```yaml
modifiers:
  injector:
    enabled: true
    source_url: "..."
```

--------------------------------

### Environment Configuration (With Initial Snapshot)

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_webhooks.md

Complete environment variable configuration for pgstream with an initial snapshot of all public schema tables.

```shell
# Listener config
PGSTREAM_POSTGRES_LISTENER_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
PGSTREAM_POSTGRES_REPLICATION_SLOT_NAME=pgstream_tutorial_slot
PGSTREAM_POSTGRES_SNAPSHOT_STORE_URL="postgres://postgres:postgres@localhost:5432?sslmode=disable"
# Initial snapshot of all tables in the public schema
PGSTREAM_POSTGRES_SNAPSHOT_TABLES="*"
```

--------------------------------

### Destroy pgstream Setup

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Cleans up all resources created by pgstream init, including replication slots, schemas, functions, and triggers. Use with caution as it is destructive.
```bash
pgstream destroy [flags]
```

--------------------------------

### Validate pgstream Status with Environment Configuration

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_postgres.md

Command to check the status of pgstream using an environment configuration file. This verifies that the setup is correct before running.

```sh
# using env configuration file
./pgstream status -c pg2pg_tutorial.env
```

--------------------------------

### Run PGStream with Initialization

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_transformer.md

Execute the pgstream run command with initialization, specifying the configuration file and log level.

```sh
# with initialization
pgstream run -c pg2pg_transformer_tutorial.env --init --log-level trace
```

--------------------------------

### Unmasking String Start - Custom Parameters

Source: https://github.com/baserow/pgstream/blob/main/docs/transformers.md

Unmasks the beginning of a string based on a fixed number of characters, masking the rest. Useful for preserving leading identifiers.

```yaml
transformations:
  table_transformers:
    - schema: public
      table: users
      column_transformers:
        email:
          name: masking
          parameters:
            type: custom
            unmask_end: "3"
```

--------------------------------

### Run Pgstream Replication: Postgres to OpenSearch (Env Config)

Source: https://github.com/baserow/pgstream/blob/main/README.md

Run pgstream for replication from PostgreSQL to OpenSearch using an environment configuration file. Includes initialization and trace logging.

```sh
# using the environment configuration file
pgstream run -c docs/examples/pg2os.env --init --log-level trace
```

--------------------------------

### Run Pgstream Replication: Kafka to OpenSearch (Env Config)

Source: https://github.com/baserow/pgstream/blob/main/README.md

Run pgstream for replication from Kafka to OpenSearch using an environment configuration file. Includes initialization and trace logging.
```sh
# using the environment configuration file
pgstream run -c docs/examples/kafka2os.env --init --log-level trace
```

--------------------------------

### Email Anonymization Configuration

Source: https://github.com/baserow/pgstream/blob/main/docs/transformers.md

Configuration for the email transformer, which anonymizes email addresses. This example excludes a specific domain and uses a custom salt for the anonymization process.

```yaml
transformations:
  table_transformers:
    - schema: public
      table: customers
      column_transformers:
        email:
          name: email
          parameters:
            exclude_domain: "example.com"
            salt: "helloworld"
```

--------------------------------

### Development with Profiling

Source: https://github.com/baserow/pgstream/blob/main/docs/cli.md

Enables profiling for snapshot and continuous streaming operations. Performance metrics can be analyzed using Go's pprof tool.

```bash
# Run snapshot with profiling enabled
pgstream snapshot -c config.yaml --profile

# Run continuous streaming with profiling
pgstream run -c config.yaml --profile

# In another terminal, analyze performance
go tool pprof http://localhost:6060/debug/pprof/profile
go tool pprof http://localhost:6060/debug/pprof/heap
```

--------------------------------

### Verify Webhook Subscriptions in PostgreSQL

Source: https://github.com/baserow/pgstream/blob/main/docs/tutorials/postgres_to_webhooks.md

Queries the `pgstream.webhook_subscriptions` table to confirm that the subscription has been successfully registered in the database. This example shows a subscription for insert events on all tables.

```sql
SELECT * FROM pgstream.webhook_subscriptions;
```
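
--------------------------------

### Sketch: Minimal Webhook Receiver for Local Testing

Once a subscription is registered, it can help to watch what actually arrives at the subscribed URL. The following is a minimal stand-in receiver, not pgstream's own dummy server. It assumes the subscription points at `localhost:9910` (the port from the listening message earlier in this document) and makes no assumption about the payload layout: the body is parsed as generic JSON and only its top-level keys are printed. The names `summarize_event` and `WebhookHandler` are hypothetical helpers introduced for this sketch.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def summarize_event(body: bytes) -> str:
    """Return a one-line summary of a webhook request body.

    The payload layout is an unknown here, so the body is treated as
    generic JSON and only its top-level keys are reported.
    """
    try:
        payload = json.loads(body)
    except ValueError:
        # Covers both invalid JSON and bodies that are not valid UTF-8.
        return f"non-JSON body ({len(body)} bytes)"
    if isinstance(payload, dict):
        return "JSON object with keys: " + ", ".join(sorted(payload))
    return f"JSON {type(payload).__name__}"


class WebhookHandler(BaseHTTPRequestHandler):
    """Accepts POST requests on any path and logs a summary of each body."""

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        print(f"{self.path}: {summarize_event(body)}")
        self.send_response(200)
        self.end_headers()


if __name__ == "__main__":
    # Assumption: the subscription URL targets localhost:9910; adjust as needed.
    HTTPServer(("localhost", 9910), WebhookHandler).serve_forever()
```

Run the script in one terminal, insert a row into a subscribed table in another, and the receiver prints one line per delivered request. Keeping the parsing generic means the sketch stays useful even if the event format differs from what you expect.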