### Navigate to Flink Examples Project Directory

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Change the current working directory to the root of the cloned `flink-table-api-java-examples` project.

```bash
cd flink-table-api-java-examples
```

--------------------------------

### Start Interactive Flink Table API JShell Session

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Launch JShell with the project's compiled JAR on the classpath and an initialization script. This command sets up an interactive environment for experimenting with the Flink Table API.

```bash
jshell --class-path ./target/flink-table-api-java-examples-1.0.jar --startup ./jshell-init.jsh
```

--------------------------------

### Execute Flink Table API Example from JAR File

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Run a specific Flink Table API example, such as `Example_00_HelloWorld`, directly from the generated JAR file. The program is read-only and prints results to the console without affecting existing Kafka clusters.

```bash
cd target
java -cp flink-table-api-java-examples-1.0.jar io.confluent.flink.examples.table.Example_00_HelloWorld
```

--------------------------------

### Clone Confluent Flink Table API Java Examples Repository

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Clone the `flink-table-api-java-examples` repository from GitHub to your local machine to access the example code.

```bash
git clone https://github.com/confluentinc/flink-table-api-java-examples.git
```

--------------------------------

### Motivating Example: Flink Table API Program on Confluent Cloud

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

This Java example shows the basic structure of a Flink Table API program designed to run on Confluent Cloud.
It demonstrates how to initialize the `TableEnvironment` using `ConfluentSettings`, execute simple 'Hello world!' statements with both Table API and SQL, filter data from a source table (`examples.marketplace.clicks`), print the schema and explain plan, collect and test materialized results using `ConfluentTools`, and perform aggregations or pipe data between tables. The program talks to Confluent Cloud through its REST API for all Flink operations.

```java
import io.confluent.flink.plugin.*;

import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;

import java.util.List;
import java.util.stream.Collectors;

// A table program...
//   - runs in a regular main() method
//   - uses Apache Flink's APIs
//   - communicates to Confluent Cloud via REST calls

public static void main(String[] args) {
  // Set up the connection to Confluent Cloud
  EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
  TableEnvironment env = TableEnvironment.create(settings);

  // Run your first Flink statement in Table API
  env.fromValues(row("Hello world!")).execute().print();

  // Or use SQL
  env.sqlQuery("SELECT 'Hello world!'").execute().print();

  // Structure your code with Table objects - the main ingredient of Table API.
  Table table = env.from("examples.marketplace.clicks").filter($("user_agent").like("Mozilla%"));

  table.printSchema();
  table.printExplain();

  // Use the provided tools to test on a subset of the streaming data
  List<Row> expected = ConfluentTools.collectMaterialized(table, 50);
  List<Row> actual = List.of(Row.of(42, 500));
  if (!expected.equals(actual)) {
    // Print all data
    System.out.println("Results don't match");
    System.out.println(
      expected.stream().map(Row::toString).collect(Collectors.joining("\n")));
    // Or access nested data
    System.out.println("First row: " + expected.get(0).getFieldAs("user_id"));
  }

  // Access your Kafka topics or start with the built-in examples
  // with unbounded data sets
  env.from("examples.marketplace.clicks")
    .groupBy($("user_id"))
    .select($("user_id"), $("view_time").sum())
    .execute()
    .print();

  // Or pipe data from A to B
  TablePipeline pipeline = env.from("A").select(withAllColumns()).insertInto("B");
  // Asynchronously
  // pipeline.execute();
  // Or synchronously
  // pipeline.execute().await();
}
```

--------------------------------

### Load Confluent Flink Table API Configuration from Command-Line Arguments

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Demonstrates how to initialize `ConfluentSettings` directly from the command-line arguments passed to the `main` method of a Java application, so configuration can be supplied at launch time.

```java
public static void main(String[] args) {
  ConfluentSettings settings = ConfluentSettings.fromArgs(args);
}
```

--------------------------------

### Expected Flink Example Console Output

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

An example of the console output when running a Flink Table API example before the configuration is complete. It shows a common configuration error in which a required parameter such as `client.organization-id` is not found.
```text
Exception in thread "main" io.confluent.flink.plugin.ConfluentFlinkException: Parameter 'client.organization-id' not found.
```

--------------------------------

### Build Flink Table API Examples JAR with Maven

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Build a runnable JAR file for the project using Maven. This command cleans previous builds and packages the project. Java 11 or higher is required, and the included Maven wrapper (`mvnw`) ensures a consistent Maven version.

```bash
./mvnw clean package
```

--------------------------------

### Creating a Managed Confluent Table Descriptor in Java

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Demonstrates how to programmatically create a `TableDescriptor` for a managed table in Confluent Cloud using `ConfluentTableDescriptor.forManaged()`. This example defines a schema with two integer columns and a watermark based on the `$rowtime` system column, then registers the table in the environment.

```java
TableDescriptor descriptor = ConfluentTableDescriptor.forManaged()
  .schema(
    Schema.newBuilder()
      .column("i", DataTypes.INT())
      .column("s", DataTypes.INT())
      .watermark("$rowtime", $("$rowtime").minus(lit(5).seconds())) // Access $rowtime system column
      .build())
  .build();

env.createTable("t1", descriptor);
```

--------------------------------

### Confluent Cloud Configuration Parameters for cloud.properties

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Details for configuring the `src/main/resources/cloud.properties` file, which is required to establish a connection to Confluent Cloud. For each parameter, the entry gives its purpose and where to find its value in the Confluent Cloud Console.

```APIDOC
client.organization-id: Your Confluent Cloud organization ID.
  Source: Menu -> Settings -> Organizations (https://confluent.cloud/settings/organizations)

client.environment-id: Your Confluent Cloud environment ID.
  Source: Menu -> Environments (https://confluent.cloud/environments)

client.cloud: The cloud provider for your compute pool.
  Source: Menu -> Environments -> your environment -> Flink -> your compute pool

client.region: The region for your compute pool.
  Source: Menu -> Environments -> your environment -> Flink -> your compute pool

client.compute-pool-id: The ID of your Flink compute pool.
  Source: Menu -> Environments -> your environment -> Flink -> your compute pool

client.flink-api-key: Your Flink API key.
  Source: Menu -> Settings -> API keys (https://confluent.cloud/settings/api-keys)

client.flink-api-secret: Your Flink API secret.
  Source: Menu -> Settings -> API keys (https://confluent.cloud/settings/api-keys)
```

--------------------------------

### Set FLINK_PROPERTIES Environment Variable for JShell

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Before starting JShell, set the `FLINK_PROPERTIES` environment variable to point to the `cloud.properties` file so that the Flink Table API environment picks up the necessary configuration.

```bash
export FLINK_PROPERTIES=./src/main/resources/cloud.properties
```

--------------------------------

### Execute 'Hello World' SQL Query in Flink JShell

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Run the basic SQL query `SELECT 'Hello world!'` using the pre-initialized `TableEnvironment` object, `env`, within the JShell interactive session. The result is printed to the console.
```java
env.executeSql("SELECT 'Hello world!'").print();
```

--------------------------------

### Load Confluent Flink Table API Configuration from Environment Variables

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Illustrates how to retrieve `ConfluentSettings` from globally set environment variables. This method is convenient for containerized deployments or environments where variables are managed externally.

```java
ConfluentSettings settings = ConfluentSettings.fromGlobalVariables();
```

--------------------------------

### Confluent Flink Client: Endpoint Configuration Relationship and Defaults

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Clarifies the mutual exclusivity between `client.endpoint-template` and `client.rest-endpoint`, and describes the default behavior when neither is explicitly configured for the Flink statement API.

```APIDOC
Relationship and Default Behavior:
  1. Mutual Exclusivity:
    - `client.endpoint-template` and `client.rest-endpoint` cannot be set simultaneously
  2. Default Behavior:
    - If neither `client.rest-endpoint` nor `client.endpoint-template` is configured, the default template `https://flink.{region}.{cloud}.confluent.cloud` is used for the statement API
    - If endpoint templates are used, each endpoint is constructed independently with the provided templates
```

--------------------------------

### Configure Confluent Flink Table API via Command-Line Arguments

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Explains how to pass Confluent Cloud connection parameters directly as command-line arguments when executing the Flink Table API program. This method is useful for dynamic configuration during deployment or testing.
```bash
java -jar my-table-program.jar \
  --cloud aws \
  --region us-east-1 \
  --flink-api-key key \
  --flink-api-secret secret \
  --organization-id b0b21724-4586-4a07-b787-d0bb5aacbf87 \
  --environment-id env-z3y2x1 \
  --compute-pool-id lfcp-8m03rm
```

--------------------------------

### Apply Multi-layered Configuration for Confluent Flink Table API

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Demonstrates how to combine configuration options from CLI arguments, environment variables, and code using `ConfluentSettings`. The layers are applied in a fixed precedence order: CLI arguments or a properties file take the highest precedence, followed by settings made in code, which in turn override environment variables.

```java
public static void main(String[] args) {
  // Args might set cloud, region, org, env, and compute pool.
  // Environment variables might pass key and secret.

  // Code sets the session name and SQL-specific options.
  ConfluentSettings settings = ConfluentSettings.newBuilder(args)
    .setContextName("MyTableProgram")
    .setOption("sql.local-time-zone", "UTC")
    .build();

  TableEnvironment env = TableEnvironment.create(settings);
}
```

--------------------------------

### Configure Confluent Flink Table API via Properties File

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Illustrates how to store Confluent Cloud connection options in a `cloud.properties` file. Keeping sensitive or frequently changed parameters outside the code means they can be updated without recompiling the program.
```properties
# Cloud region
client.cloud=aws
client.region=us-east-1

# Access & compute resources
client.flink-api-key=key
client.flink-api-secret=secret
client.organization-id=b0b21724-4586-4a07-b787-d0bb5aacbf87
client.environment-id=env-z3y2x1
client.compute-pool-id=lfcp-8m03rm
```

--------------------------------

### Load Confluent Flink Table API Configuration from Properties File

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Shows two alternative ways to load `ConfluentSettings` from a properties file: from an arbitrary file system location, or from a resource within the JAR package. The `FLINK_PROPERTIES` environment variable can also specify the file path.

```java
// Alternative 1: arbitrary file location in file system
ConfluentSettings fileSettings = ConfluentSettings.fromFile("/path/to/cloud.properties");

// Alternative 2: part of the JAR package (in src/main/resources)
ConfluentSettings resourceSettings = ConfluentSettings.fromResource("/cloud.properties");
```

--------------------------------

### Confluent Flink Client: Endpoint Template Configuration (`client.endpoint-template`)

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Explains the recommended method for configuring the Flink statement API endpoint URL using a template. It supports placeholders for region and cloud, and specifies its default value and environment variable.

```APIDOC
client.endpoint-template:
  Default: https://flink.{region}.{cloud}.confluent.cloud
  Example: https://flinkpls-dom123.{region}.{cloud}.confluent.cloud
  Usage: The template supports placeholders `{region}` and `{cloud}` that are replaced with the configured region and cloud provider values.
  Environment Variable: ENDPOINT_TEMPLATE
```

--------------------------------

### Confluent Flink Client: Required Configuration Options

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Lists the configuration properties required to connect to Confluent Flink: cloud provider, region, API key and secret, and the organization, environment, and compute pool IDs. Each can be set as a property key, a CLI argument, or an environment variable.

```APIDOC
Property key              | CLI arg              | Environment variable | Required | Comment
--------------------------|----------------------|----------------------|----------|------------------------------------------------------------------------------
client.cloud              | --cloud              | CLOUD_PROVIDER       | Y        | Confluent identifier for a cloud provider. For example: `aws`
client.region             | --region             | CLOUD_REGION         | Y        | Confluent identifier for a cloud provider's region. For example: `us-east-1`
client.flink-api-key      | --flink-api-key      | FLINK_API_KEY        | Y        | API key for Flink access.
client.flink-api-secret   | --flink-api-secret   | FLINK_API_SECRET     | Y        | API secret for Flink access.
client.organization-id    | --organization-id    | ORG_ID               | Y        | ID of the organization. For example: `b0b21724-4586-4a07-b787-d0bb5aacbf87`
client.environment-id     | --environment-id     | ENV_ID               | Y        | ID of the environment. For example: `env-z3y2x1`
client.compute-pool-id    | --compute-pool-id    | COMPUTE_POOL_ID      | Y        | ID of the compute pool. For example: `lfcp-8m03rm`
```

--------------------------------

### Configure Confluent Flink Table API via Environment Variables

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Shows how to export Confluent Cloud connection parameters as environment variables before running the Flink Table API program. This is a common practice for production environments, allowing configuration without modifying code or deployment artifacts.
```bash
export CLOUD_PROVIDER="aws"
export CLOUD_REGION="us-east-1"
export FLINK_API_KEY="key"
export FLINK_API_SECRET="secret"
export ORG_ID="b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="env-z3y2x1"
export COMPUTE_POOL_ID="lfcp-8m03rm"

java -jar my-table-program.jar
```

--------------------------------

### Configure Confluent Cloud Endpoints in Java

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

This snippet demonstrates three ways to configure Confluent Cloud endpoints using `ConfluentSettings`. The recommended approach uses an endpoint template, set either in code or via a properties file, which is resolved from the configured region and cloud. A third, discouraged option derives the endpoint from a single `rest-endpoint` value.

```java
// Option 1 (RECOMMENDED): Using endpoint templates
// Resolved endpoints:
//   - Statement API: https://flinkpls-dom123.us-east-1.aws.confluent.cloud
ConfluentSettings settings1 = ConfluentSettings.newBuilder()
  .setRegion("us-east-1")
  .setCloud("aws")
  .setEndpointTemplate("https://flinkpls-dom123.{region}.{cloud}.confluent.cloud")
  // Other required settings...
  .build();

// Option 2: Using properties file with endpoint templates
// cloud.properties:
//   client.region=us-east-1
//   client.cloud=aws
//   client.endpoint-template=https://flinkpls-dom123.{region}.{cloud}.confluent.cloud
// Resolved endpoints:
//   - Statement API: https://flinkpls-dom123.us-east-1.aws.confluent.cloud
ConfluentSettings settings2 = ConfluentSettings.fromResource("/cloud.properties");

// Option 3 (DISCOURAGED): Using rest-endpoint (the statement endpoint is derived from this)
// Resolved endpoints:
//   - Statement API: https://flink.us-east-1.aws.proxy.confluent.cloud
ConfluentSettings settings3 = ConfluentSettings.newBuilder()
  .setRegion("us-east-1")
  .setCloud("aws")
  .setRestEndpoint("proxy.confluent.cloud")
  // Other required settings...
  .build();
```

--------------------------------

### Supported Flink Table API Methods on Confluent Cloud

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Lists the stable and ready-to-use API methods across various Flink Table API components, including `TableEnvironment`, `Table`, `TablePipeline`, `StatementSet`, `TableResult`, `TableConfig`, and `Expressions`. It also mentions Confluent-specific utility classes.

```APIDOC
// TableEnvironment
TableEnvironment.createStatementSet()
TableEnvironment.createTable(String, TableDescriptor)
TableEnvironment.executeSql(String)
TableEnvironment.explainSql(String)
TableEnvironment.from(String)
TableEnvironment.fromValues(...)
TableEnvironment.getConfig()
TableEnvironment.getCurrentCatalog()
TableEnvironment.getCurrentDatabase()
TableEnvironment.listCatalogs()
TableEnvironment.listDatabases()
TableEnvironment.listFunctions()
TableEnvironment.listTables()
TableEnvironment.listTables(String, String)
TableEnvironment.listViews()
TableEnvironment.sqlQuery(String)
TableEnvironment.useCatalog(String)
TableEnvironment.useDatabase(String)

// Table: SQL equivalents
Table.select(...)
Table.as(...)
Table.filter(...)
Table.where(...)
Table.groupBy(...)
Table.distinct()
Table.join(...)
Table.leftOuterJoin(...)
Table.rightOuterJoin(...)
Table.fullOuterJoin(...)
Table.minus(...)
Table.minusAll(...)
Table.union(...)
Table.unionAll(...)
Table.intersect(...)
Table.intersectAll(...)
Table.orderBy(...)
Table.offset(...)
Table.fetch(...)
Table.limit(...)
Table.window(...)
Table.insertInto(String)
Table.executeInsert(String)

// Table: API extensions
Table.getResolvedSchema()
Table.printSchema()
Table.addColumns(...)
Table.addOrReplaceColumns(...)
Table.renameColumns(...)
Table.dropColumns(...)
Table.map(...)
Table.explain()
Table.printExplain()
Table.execute()

// TablePipeline
TablePipeline.explain()
TablePipeline.printExplain()
TablePipeline.execute()

// StatementSet
StatementSet.explain()
StatementSet.add(TablePipeline)
StatementSet.execute()
StatementSet.addInsert(String, Table)
StatementSet.addInsertSql(String)

// TableResult
TableResult.getJobClient().cancel()
TableResult.await(...)
TableResult.getResolvedSchema()
TableResult.collect()
TableResult.print()

// TableConfig
TableConfig.set(...)

// Expressions
Expressions.* (except for call())

// Others
TableDescriptor.*
FormatDescriptor.*
Tumble.*
Slide.*
Session.*
Over.*

Confluent adds the following classes for more convenience:
ConfluentSettings.*
ConfluentTools.*
ConfluentTableDescriptor.*
```

--------------------------------

### Confluent Flink Client: REST Endpoint Configuration (`client.rest-endpoint`)

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Describes the discouraged method for specifying the base domain for REST API calls to Confluent Cloud. It outlines how the full URL is constructed, its mutual exclusivity with `client.endpoint-template`, and its environment variable.

```APIDOC
client.rest-endpoint:
  Default: No default value
  Example: proxy.confluent.cloud
  Usage: When specified, the plugin constructs the full Flink statement API endpoint URL as `https://flink.{region}.{cloud}.{rest-endpoint}` where `{region}` and `{cloud}` are replaced with the configured region and cloud provider values.
  Important: `client.endpoint-template` and `client.rest-endpoint` are mutually exclusive. If both are set, an exception is thrown.
  Environment Variable: REST_ENDPOINT
```

--------------------------------

### Configure Confluent Flink Table API Programmatically

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Details how to set all Confluent Cloud connection options directly within the Java code using the `ConfluentSettings.newBuilder()` fluent API. Each option has a dedicated, typed setter, but any change to a value requires recompilation.

```java
ConfluentSettings settings = ConfluentSettings.newBuilder()
  .setCloud("aws")
  .setRegion("us-east-1")
  .setFlinkApiKey("key")
  .setFlinkApiSecret("secret")
  .setOrganizationId("b0b21724-4586-4a07-b787-d0bb5aacbf87")
  .setEnvironmentId("env-z3y2x1")
  .setComputePoolId("lfcp-8m03rm")
  .build();
```

--------------------------------

### Confluent Flink Client: Additional Configuration Options

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

Details optional configuration properties for the Confluent Flink client, such as endpoint templates, principal IDs, session contexts, statement names, and REST endpoints. None of these is required to connect.

```APIDOC
Property key               | CLI arg               | Environment variable | Required | Comment
---------------------------|-----------------------|----------------------|----------|----------------------------------------------------------------------------------------------------------
client.endpoint-template   | --endpoint-template   | ENDPOINT_TEMPLATE    | N        | A template for the endpoint URL. For example: `https://flinkpls-dom123.{region}.{cloud}.confluent.cloud`
client.principal           | --principal           | PRINCIPAL_ID         | N        | Principal that runs submitted statements. For example: `sa-23kgz4` (for a service account)
client.context             | --context             |                      | N        | A name for this Table API session. For example: `my_table_program`
client.statement-name      | --statement-name      |                      | N        | Unique name for statement submission. By default, generated using a UUID.
client.rest-endpoint       | --rest-endpoint       | REST_ENDPOINT        | N        | URL to the REST endpoint. For example: `proxyto.confluent.cloud`
client.catalog-cache       |                       |                      | N        | Expiration time for catalog objects. For example: '5 min'. '1 min' by default. '0' disables the caching.
```

--------------------------------

### ConfluentTools Class API Reference for Flink Table API

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

API reference for the `ConfluentTools` class, which adds Confluent Cloud helpers on top of the Flink Table API. It covers methods for data collection (`collectChangelog`, `collectMaterialized`), result presentation (`printChangelog`, `printMaterialized`), and statement lifecycle management (`getStatementName`, `stopStatement`). Each method's purpose, parameters, and return type are described.

```APIDOC
ConfluentTools Class:
  Description: Adds additional methods useful for developing and testing Flink Table API programs on Confluent Cloud.

  Methods:
    collectChangelog(table: Table, limit: int = null) -> List<Row>:
      Description: Executes table transformations on Confluent Cloud and returns results locally as a list of changelog rows. Consumes a fixed amount of rows from the returned iterator. Works on both finite and infinite input tables, stopping after the desired amount of rows for unbounded pipelines.
      Parameters:
        table: The Table object to execute.
        limit: (Optional) The maximum number of rows to collect.
      Returns: A List of Row objects representing the changelog.

    collectChangelog(tableResult: TableResult, limit: int = null) -> List<Row>:
      Description: Executes table transformations on Confluent Cloud and returns results locally as a list of changelog rows. Consumes a fixed amount of rows from the returned iterator. Works on both finite and infinite input tables, stopping after the desired amount of rows for unbounded pipelines.
      Parameters:
        tableResult: The TableResult object to collect from.
        limit: (Optional) The maximum number of rows to collect.
      Returns: A List of Row objects representing the changelog.

    printChangelog(table: Table, limit: int = null) -> void:
      Description: Executes table transformations on Confluent Cloud and prints the results to the console in a table style. Consumes a fixed amount of rows from the returned iterator. Works on both finite and infinite input tables, stopping after the desired amount of rows for unbounded pipelines.
      Parameters:
        table: The Table object to execute.
        limit: (Optional) The maximum number of rows to print.
      Returns: void

    printChangelog(tableResult: TableResult, limit: int = null) -> void:
      Description: Executes table transformations on Confluent Cloud and prints the results to the console in a table style. Consumes a fixed amount of rows from the returned iterator. Works on both finite and infinite input tables, stopping after the desired amount of rows for unbounded pipelines.
      Parameters:
        tableResult: The TableResult object to print from.
        limit: (Optional) The maximum number of rows to print.
      Returns: void

    collectMaterialized(table: Table, limit: int = null) -> List<Row>:
      Description: Executes table transformations on Confluent Cloud and returns results locally as a materialized changelog (changes applied to an in-memory table, returned as insert-only rows). Consumes a fixed amount of rows from the returned iterator. Works on both finite and infinite input tables, stopping after the desired amount of rows for unbounded pipelines.
      Parameters:
        table: The Table object to execute.
        limit: (Optional) The maximum number of rows to collect.
      Returns: A List of Row objects representing the materialized changelog.

    collectMaterialized(tableResult: TableResult, limit: int = null) -> List<Row>:
      Description: Executes table transformations on Confluent Cloud and returns results locally as a materialized changelog (changes applied to an in-memory table, returned as insert-only rows).
        Consumes a fixed amount of rows from the returned iterator. Works on both finite and infinite input tables, stopping after the desired amount of rows for unbounded pipelines.
      Parameters:
        tableResult: The TableResult object to collect from.
        limit: (Optional) The maximum number of rows to collect.
      Returns: A List of Row objects representing the materialized changelog.

    printMaterialized(table: Table, limit: int = null) -> void:
      Description: Executes table transformations on Confluent Cloud and prints the materialized results to the console in a table style. Consumes a fixed amount of rows from the returned iterator. Works on both finite and infinite input tables, stopping after the desired amount of rows for unbounded pipelines.
      Parameters:
        table: The Table object to execute.
        limit: (Optional) The maximum number of rows to print.
      Returns: void

    printMaterialized(tableResult: TableResult, limit: int = null) -> void:
      Description: Executes table transformations on Confluent Cloud and prints the materialized results to the console in a table style. Consumes a fixed amount of rows from the returned iterator. Works on both finite and infinite input tables, stopping after the desired amount of rows for unbounded pipelines.
      Parameters:
        tableResult: The TableResult object to print from.
        limit: (Optional) The maximum number of rows to print.
      Returns: void

    getStatementName(tableResult: TableResult) -> String:
      Description: Retrieves the unique name of a statement after it has been submitted to Confluent Cloud.
      Parameters:
        tableResult: The TableResult object associated with the statement.
      Returns: The statement name as a String.

    stopStatement(tableResult: TableResult) -> void:
      Description: Stops a running statement on Confluent Cloud based on its TableResult object.
      Parameters:
        tableResult: The TableResult object associated with the statement to stop.
      Returns: void

    stopStatement(env: TableEnvironment, statementName: String) -> void:
      Description: Stops a running statement on Confluent Cloud based on its name.
      Parameters:
        env: The TableEnvironment instance.
        statementName: The name of the statement to stop.
      Returns: void
```

--------------------------------

### Add Confluent Flink Table API Java Dependencies to Maven

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

To integrate the Apache Flink Table API and the Confluent Flink Table API Java plugin into an existing Maven project, include these dependencies in the `<dependencies>` section of your `pom.xml` file. Ensure the `flink.version` and `confluent-plugin.version` properties are defined.

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>io.confluent.flink</groupId>
  <artifactId>confluent-flink-table-api-java-plugin</artifactId>
  <version>${confluent-plugin.version}</version>
</dependency>
```

--------------------------------

### Collect and Print Flink Changelog Results with ConfluentTools

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

The `ConfluentTools.collectChangelog` and `ConfluentTools.printChangelog` methods execute Flink Table API transformations on Confluent Cloud. They return results as a list of changelog rows or print them to the console, consuming a fixed amount of rows. These methods support both finite and infinite input tables, stopping after the desired row count for unbounded pipelines.

```java
// On Table object
Table table = env.from("examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectChangelog(table, 100);
ConfluentTools.printChangelog(table, 100);

// On TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
List<Row> resultRows = ConfluentTools.collectChangelog(tableResult, 100);
ConfluentTools.printChangelog(tableResult, 100);
```

```java
// For finite (i.e. bounded) tables
ConfluentTools.collectChangelog(table);
ConfluentTools.printChangelog(table);
```

--------------------------------

### Manage Flink Statement Lifecycle on Confluent Cloud with ConfluentTools

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

The `ConfluentTools.getStatementName` and `ConfluentTools.stopStatement` methods provide lifecycle control for Flink statements submitted to Confluent Cloud. `getStatementName` retrieves the unique name of a statement from a `TableResult` object. `stopStatement` terminates a running statement, identified either by its `TableResult` or directly by its statement name.

```java
// On TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
String statementName = ConfluentTools.getStatementName(tableResult);
ConfluentTools.stopStatement(tableResult);

// Based on statement name
ConfluentTools.stopStatement(env, "table-api-2024-03-21-150457-36e0dbb2e366-sql");
```

--------------------------------

### Collect and Print Materialized Flink Results with ConfluentTools

Source: https://github.com/confluentinc/flink-table-api-java-examples/blob/master/README.md

The `ConfluentTools.collectMaterialized` and `ConfluentTools.printMaterialized` methods execute Flink Table API transformations on Confluent Cloud. They return results as a materialized changelog (insert-only rows from an in-memory table) or print them. Like the changelog methods, they consume a fixed amount of rows and work with both finite and infinite input tables.
```java
// On Table object
Table table = env.from("examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectMaterialized(table, 100);
ConfluentTools.printMaterialized(table, 100);

// On TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
List<Row> resultRows = ConfluentTools.collectMaterialized(tableResult, 100);
ConfluentTools.printMaterialized(tableResult, 100);
```

```java
// For finite (i.e. bounded) tables
ConfluentTools.collectMaterialized(table);
ConfluentTools.printMaterialized(table);
```
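
To make "materialized changelog" more concrete, here is a minimal, dependency-free sketch of the idea: each change is applied to an in-memory table and only the surviving rows are returned. It is not the plugin's implementation and does not use Flink's `Row` or `RowKind`; the names `MaterializeSketch`, `Kind`, and `Change` are hypothetical, and retraction by value equality is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of a changelog; not the Flink or Confluent API.
final class MaterializeSketch {

    // Change kinds modelled on Flink's changelog flags: +I, -U, +U, -D.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One changelog entry: a kind plus the row's field values.
    static final class Change {
        final Kind kind;
        final List<Object> fields;

        Change(Kind kind, Object... fields) {
            this.kind = kind;
            this.fields = Arrays.asList(fields);
        }
    }

    // Applies each change to an in-memory table and returns the surviving rows.
    static List<List<Object>> materialize(List<Change> changelog) {
        List<List<Object>> table = new ArrayList<>();
        for (Change change : changelog) {
            switch (change.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.add(change.fields);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Retract the first row whose values equal the retracted ones.
                    table.remove(change.fields);
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
            new Change(Kind.INSERT, "alice", 10),
            new Change(Kind.INSERT, "bob", 5),
            new Change(Kind.UPDATE_BEFORE, "alice", 10),
            new Change(Kind.UPDATE_AFTER, "alice", 25),
            new Change(Kind.DELETE, "bob", 5));

        // Five changelog entries collapse into one insert-only row.
        System.out.println(materialize(changelog)); // prints [[alice, 25]]
    }
}
```

In this model a `collectChangelog`-style call corresponds to the five-entry input list, while a `collectMaterialized`-style call corresponds to the single row returned by `materialize`.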