### Install Missing JDBC Drivers for Chunjun

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Provides a solution for common compilation issues where JDBC driver packages (e.g., DB2, Dameng, GBase, ojdbc8) are not found. Users can manually install them or use the provided scripts for Windows or Unix platforms to automate the installation process.

```batch
./$CHUNJUN_HOME/bin/install_jars.bat
```

```shell
./$CHUNJUN_HOME/bin/install_jars.sh
```

--------------------------------

### Start ChunJun Website Local Development Server

Source: https://github.com/dtstack/chunjun/blob/master/website/README.md

After installing dependencies, run this command from the `website` directory to start the local development server for the ChunJun website. This allows for live preview and testing during development.

```shell
yarn dev
```

--------------------------------

### Compile Chunjun Plugins

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Compiles Chunjun plugins from source code. This process uses Maven to build the project, skipping tests, or can be executed via a dedicated build script. The complete installation package will be generated in the 'chunjun-assembly/target' directory.

```maven
mvn clean package -DskipTests
```

```shell
sh build/build.sh
```

--------------------------------

### Run Chunjun Docker Container

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Demonstrates how to run the Chunjun Docker container. It can be started with a default stream-to-stream task in standalone mode or configured to run a specific task file by mounting it into the container.
```shell
docker run -p 8081:8081 dtopensource/chunjun-master
```

```shell
docker run -p 8081:8081 -v /chunjun/chunjun-examples/json/stream/stream.json:/opt/flink/job/stream.json dtopensource/chunjun-master
```

--------------------------------

### Clone Chunjun Repository

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Clones the Chunjun project from GitHub to your local machine and navigates into the project directory, preparing for compilation or direct use.

```bash
git clone https://github.com/DTStack/chunjun.git
cd chunjun
```

--------------------------------

### Oracle Source Connection Parameter Example

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/oracle/oracle-source.md

A JSON configuration example for the 'connection' parameter, illustrating how to specify the JDBC URL, table, and schema for connecting to an Oracle database.

```JSON
{
  "connection": [{
    "jdbcUrl": ["jdbc:oracle:thin:@0.0.0.1:1521:orcl"],
    "table": ["table"],
    "schema": "public"
  }]
}
```

--------------------------------

### Build Chunjun Docker Image

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Builds a Docker image named 'chunjun-master' locally. This command simplifies the deployment process by packaging Chunjun into a container, requiring prior Maven packaging of the project.

```shell
sh ./bin/chunjun-docker.sh
```

--------------------------------

### Install Dependencies for ChunJun Website Deployment

Source: https://github.com/dtstack/chunjun/blob/master/website/README.md

Before deploying the ChunJun website, navigate to the `website` directory and execute this command to ensure all necessary project dependencies are installed using `yarn`.
```shell
yarn
```

--------------------------------

### SQL Example: Upsert Kafka Table Creation and Data Insertion

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/kafka/kafka-sink.md

Demonstrates creating two Kafka tables (`pageviews_per_region` and `pageviews`) and then inserting data from `pageviews` into `pageviews_per_region` using an upsert mechanism. Includes examples of Debezium JSON and standard JSON formats for Kafka connectors.

```sql
CREATE TABLE pageviews_per_region (
    id BIGINT,
    col_bit BOOLEAN,
    col_tinyint BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka-x',
    'topic' = 'pageviews_per_region_2',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

CREATE TABLE pageviews (
    id BIGINT,
    col_bit BOOLEAN,
    col_tinyint BIGINT
) WITH (
    'connector' = 'kafka-x',
    'topic' = 'pageviews_2',
    'properties.bootstrap.servers' = 'localhost:9092',
    'value.format' = 'debezium-json'
);

-- Compute pv and uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT id, col_bit, col_tinyint
FROM pageviews;

-- {"before":null,"after":{"id":1,"col_bit":true,"col_tinyint":1},"op":"c"}
-- {"before":{"id":1,"col_bit":true,"col_tinyint":1},"after":{"id":1,"col_bit":true,"col_tinyint":2},"op":"u"}
-- {"before":{"id":2,"col_bit":true,"col_tinyint":2},"after":null,"op":"d"}
```

--------------------------------

### Install Dependencies for ChunJun Website Development

Source: https://github.com/dtstack/chunjun/blob/master/website/README.md

To prepare the ChunJun website for local development, navigate to the `website` directory and execute this command to install all necessary project dependencies using `yarn`.

```shell
yarn
```

--------------------------------

### ChunJun Documentation Image Reference Example

Source: https://github.com/dtstack/chunjun/blob/master/website/README.md

This example demonstrates how images are referenced within ChunJun's markdown documentation.
It shows the required `/chunjun` project prefix for image paths when deployed on GitHub Pages, ensuring correct rendering.

```markdown
![image](/chunjun/doc/dirty/dirty-conf.png)
```

--------------------------------

### Submit Chunjun Task in Local Mode

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Executes a simple stream-to-stream synchronization task directly on the local machine. This mode is ideal for quick testing and development as it does not require a Flink or Hadoop environment.

```shell
sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
```

--------------------------------

### Compile Chunjun for Specific Platforms

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Compiles Chunjun plugins with specific Maven profiles to support different platforms, such as TDH or the default open-source Hadoop environment. This allows for tailored builds based on your deployment needs.

```maven
mvn clean package -DskipTests -P default,tdh
```

```maven
mvn clean package -DskipTests -P default
```

--------------------------------

### Build ChunJun Docker Image

Source: https://github.com/dtstack/chunjun/blob/master/chunjun-docker/docker/README.md

Navigates to the Docker build context and initiates the Docker image build process. The image is tagged with a user-defined image name, making it ready for deployment or pushing to a registry.

```Bash
cd ./chunjun-docker/docker
docker build -t ${image_name} .
```

--------------------------------

### Compile ChunJun Plugin Packages

Source: https://github.com/dtstack/chunjun/blob/master/chunjun-docker/docker/README.md

Navigates into the cloned ChunJun directory and executes the build script to compile the necessary plugin packages. This step prepares the components required for the Docker image.
```Bash
cd chunjun
sh build/build.sh
```

--------------------------------

### Building StreamOutputFormat with StreamOutputFormatBuilder in Chunjun

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/Contribution/How to define a plugin.md

The StreamOutputFormatBuilder class is responsible for constructing and configuring instances of StreamOutputFormat. It extends BaseRichOutputFormatBuilder and provides a checkFormat method to validate the OutputFormat's configuration, ensuring proper setup before use.

```java
public class StreamOutputFormatBuilder extends BaseRichOutputFormatBuilder {
    private StreamOutputFormat format;

    public StreamOutputFormatBuilder() {
        super.format = format = new StreamOutputFormat();
    }

    // Check the OutputFormat configuration
    @Override
    protected void checkFormat() {......}
}
```

--------------------------------

### Install Missing JDBC Driver Packages for ChunJun

Source: https://github.com/dtstack/chunjun/blob/master/README_CH.md

This snippet provides commands to manually install JDBC driver packages (e.g., DB2, Dameng, Gbase, Ojdbc8) that might be missing during compilation. The scripts are located in the `$CHUNJUN_HOME/bin` directory and are available for both Windows and Unix platforms.

```bash
./$CHUNJUN_HOME/bin/install_jars.bat
```

```bash
./$CHUNJUN_HOME/bin/install_jars.sh
```

--------------------------------

### Set Binlog Read Start Position by Journal Name

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/binlog/binlog-source.md

Specifies the starting position for reading the binlog file using a filename (journal name). Data collection will begin from the start of the specified file.

```APIDOC
Parameter: journal-name
Description: Starting position for reading the binlog file, specified as a filename. Data collection starts from the beginning of the specified file.
Required: No
Field Type: string
Default Value: None
```

--------------------------------

### Set Binlog Read Start Position by File Position

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/binlog/binlog-source.md

Specifies the starting position for reading the binlog file using a specific position within the file. Data collection will begin from the specified position in the file.

```APIDOC
Parameter: position
Description: Starting position for reading the binlog file, specified as a position within the file. Data collection starts from the specified position in the file.
Required: No
Field Type: string
Default Value: None
```

--------------------------------

### SQL Example: Chunjun Upsert Kafka Connector Usage

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/kafka/kafka-sink.md

This SQL snippet illustrates the creation of two tables, 'pageviews_per_region' and 'pageviews', using Chunjun's 'upsert-kafka-x' and 'kafka-x' connectors respectively. It defines schema, primary keys, and connector properties like topic and format. The 'INSERT INTO' statement demonstrates how to transfer data between these Kafka-backed tables, showcasing a common ETL pattern. The comments provide examples of Debezium-like JSON messages for create, update, and delete operations, indicating the expected data format for upsert functionality.
```SQL
CREATE TABLE pageviews_per_region (
    id BIGINT,
    col_bit BOOLEAN,
    col_tinyint BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka-x',
    'topic' = 'pageviews_per_region_2',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

CREATE TABLE pageviews (
    id BIGINT,
    col_bit BOOLEAN,
    col_tinyint BIGINT
) WITH (
    'connector' = 'kafka-x',
    'topic' = 'pageviews_2',
    'properties.bootstrap.servers' = 'localhost:9092',
    'value.format' = 'debezium-json'
);

-- Compute pv and uv and insert into the upsert-kafka sink
INSERT INTO pageviews_per_region
SELECT id, col_bit, col_tinyint
FROM pageviews;

-- {"before":null,"after":{"id":1,"col_bit":true,"col_tinyint":1},"op":"c"}
-- {"before":{"id":1,"col_bit":true,"col_tinyint":1},"after":{"id":1,"col_bit":true,"col_tinyint":2},"op":"u"}
-- {"before":{"id":2,"col_bit":true,"col_tinyint":2},"after":null,"op":"d"}
```

--------------------------------

### Set Binlog Read Start Position by Timestamp

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/binlog/binlog-source.md

Specifies the starting position for reading the binlog file using a timestamp. Data collection will begin from the specified timestamp.

```APIDOC
Parameter: timestamp
Description: Starting position for reading the binlog file, specified as a timestamp. Data collection starts from the specified timestamp.
Required: No
Field Type: string
Default Value: None
```

--------------------------------

### Clone ChunJun Git Repository

Source: https://github.com/dtstack/chunjun/blob/master/chunjun-docker/docker/README.md

Clones the ChunJun project repository from GitHub to the local machine using the Git command-line tool. This is the initial step to obtain the source code.
```Bash
git clone https://github.com/DTStack/chunjun.git
```

--------------------------------

### HDFS Sink Compression Type Configuration

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/hdfs/hdfs-sink.md

Defines the HDFS file compression type. 'text' supports 'GZIP', 'BZIP2'. 'orc' supports 'SNAPPY', 'GZIP', 'BZIP', 'LZ4'. 'parquet' supports 'SNAPPY', 'GZIP', 'LZO'. Note that 'SNAPPY' requires 'SnappyCodec' installation. This parameter is optional and expects a string value.

```APIDOC
compress:
  description: HDFS file compression type
  options:
    text: supports the `GZIP` and `BZIP2` formats
    orc: supports the `SNAPPY`, `GZIP`, `BZIP`, and `LZ4` formats
    parquet: supports the `SNAPPY`, `GZIP`, and `LZO` formats
  note: the `SNAPPY` format requires the user to install **SnappyCodec**
  required: No
  type: string
  default:
    text: no compression by default
    orc: ZLIB by default
    parquet: SNAPPY by default
```

--------------------------------

### Formatting Chunjun Project Code with Maven Spotless

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/开发者指南/如何自定义插件.md

This shell command applies code formatting rules defined by the Maven Spotless plugin across the Chunjun project. It's recommended to run this command before packaging to ensure consistent code style.

```shell
mvn spotless:apply
```

--------------------------------

### Deploy ChunJun Website to GitHub Pages

Source: https://github.com/dtstack/chunjun/blob/master/website/README.md

To deploy the ChunJun website to GitHub Pages, first ensure all dependencies are installed by running `yarn` in the `website` directory, then execute this command. This process builds and publishes the site.

```shell
yarn deploy
```

--------------------------------

### Start Yarn Session for ChunJun Deployment

Source: https://github.com/dtstack/chunjun/blob/master/README_CH.md

This snippet demonstrates how to initiate a Yarn Session environment for ChunJun. It requires Flink and Hadoop environments to be pre-configured with $HADOOP_HOME and $FLINK_HOME variables. The command uploads the chunjun-dist directory to the Yarn Session.
```shell
cd $FLINK_HOME/bin
./yarn-session.sh -t $CHUNJUN_HOME -d
```

--------------------------------

### Example Hadoop HA and Kerberos Configuration

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/hive/hive-sink.md

Illustrates how to configure Hadoop High Availability (HA) and Kerberos settings within the `hadoopConfig` parameter for ChunJun, mapping `core-site.xml` and `hdfs-site.xml` properties.

```Properties
'properties.hadoop.user.name' = 'root',
'properties.dfs.ha.namenodes.ns' = 'nn1,nn2',
'properties.fs.defaultFS' = 'hdfs://ns',
'properties.dfs.namenode.rpc-address.ns.nn2' = 'ip:9000',
'properties.dfs.client.failover.proxy.provider.ns' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
'properties.dfs.namenode.rpc-address.ns.nn1' = 'ip:9000',
'properties.dfs.nameservices' = 'ns',
'properties.fs.hdfs.impl.disable.cache' = 'true',
'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'
```

--------------------------------

### Packaging Chunjun Flink Connector with Maven

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/开发者指南/如何自定义插件.md

This shell command is used to package the Chunjun project, specifically for Flink connectors, using Maven. The `-DskipTests` flag ensures that unit and integration tests are skipped during the build process, which can speed up packaging.

```shell
mvn clean package -DskipTests
```

--------------------------------

### Submit Chunjun Task in Yarn Session Mode

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Submits a Chunjun task to an existing Flink Yarn session. This mode requires both Flink and Hadoop environments to be configured, and a Yarn session must be active. The Chunjun plugin package is uploaded to the session during startup.
```shell
cd $FLINK_HOME/bin
./yarn-session.sh -t $CHUNJUN_HOME -d
```

```shell
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"}
```

--------------------------------

### Package and Format Flink SQL Connector Plugin

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/Contribution/How to define a plugin.md

These shell commands package the Flink SQL connector plugin using Maven. The first cleans and packages the project while skipping tests; the second formats the code with Spotless and should be run before packaging to ensure consistent code style.

```Shell
mvn clean package -DskipTests
```

```Shell
mvn spotless:apply
```

--------------------------------

### Oracle LogMiner Core Procedures and Views

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/logminer/LogMiner原理.md

This section documents the essential PL/SQL procedures and dynamic views used for configuring and interacting with Oracle LogMiner. It covers the processes for adding log files, starting and ending LogMiner sessions, and querying the analyzed redo data.

```APIDOC
DBMS_LOGMNR.ADD_LOGFILE:
  Purpose: Specifies redo log files for analysis.
  Parameters:
    LOGFILENAME: Path to the redo log file.
    OPTIONS:
      DBMS_LOGMNR.NEW: Indicates the start of a new log file list.
      DBMS_LOGMNR.ADDFILE: Adds an additional log file to the current list.

DBMS_LOGMNR.START_LOGMNR:
  Purpose: Initiates a LogMiner session.

V$LOGMNR_CONTENTS:
  Purpose: View used to query the redo data of interest after LogMiner has been started.
  Permissions: Requires SELECT ANY TRANSACTION privilege.

DBMS_LOGMNR.END_LOGMNR:
  Purpose: Terminates a LogMiner session.
```

--------------------------------

### Clone ChunJun Project Repository

Source: https://github.com/dtstack/chunjun/blob/master/README_CH.md

This command uses the Git tool to download the ChunJun project source code from GitHub to your local machine.
This is the first step to get the project ready for compilation and development.

```shell
git clone https://github.com/DTStack/chunjun.git
```

--------------------------------

### Run ChunJun Docker in Local Mode

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

This command demonstrates how to run the ChunJun master Docker container in 'local' mode using the `-m` parameter. By default, ChunJun runs in 'standalone' mode, which exposes a web UI at `localhost:8081`. In standalone mode, the container does not exit automatically and requires manual termination via `docker stop` from another terminal.

```Shell
docker run -p 8081:8081 dtopensource/chunjun-master -m local
```

--------------------------------

### Submit Chunjun Task in Standalone Mode

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

Submits a Chunjun task to a Flink Standalone cluster. This mode requires a running Flink Standalone environment but does not depend on Hadoop. Users can monitor task status via the Flink web UI.

```shell
bash $FLINK_HOME/bin/start-cluster.sh
```

```shell
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
```

--------------------------------

### Oracle Source Column Parameter Formats

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/oracle/oracle-source.md

Examples demonstrating the three supported formats for the 'column' parameter: reading all fields, specifying only field names, and providing detailed field information including type, format, and a default value.
```Text
"column":["*"]
```

```Text
"column":["id","name"]
```

```JSON
{
  "column": [{
    "name": "col",
    "type": "datetime",
    "format": "yyyy-MM-dd hh:mm:ss",
    "value": "value"
  }]
}
```

--------------------------------

### Configuring StreamOutputFormat with StreamOutputFormatBuilder

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/开发者指南/如何自定义插件.md

The StreamOutputFormatBuilder class facilitates the construction and configuration of StreamOutputFormat instances. It extends BaseRichOutputFormatBuilder and is responsible for instantiating StreamOutputFormat and performing necessary configuration checks via the checkFormat method. This builder pattern ensures that the StreamOutputFormat is correctly initialized before being used in the data pipeline.

```Java
public class StreamOutputFormatBuilder extends BaseRichOutputFormatBuilder {
    private StreamOutputFormat format;

    public StreamOutputFormatBuilder() {
        super.format = format = new StreamOutputFormat();
    }

    // Check the OutputFormat configuration
    @Override
    protected void checkFormat() {......}
}
```

--------------------------------

### Chunjun Supported Data Types

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/kafka/kafka-sink.md

Lists the data types supported and not supported by Chunjun for various operations, including primitive types, complex types, and time-related types.

```APIDOC
Supported Data Types: BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, STRING, VARCHAR, CHAR, TIMESTAMP, DATE, BINARY, ARRAY, MAP, STRUCT, LIST, ROW
Unsupported Data Types: Other
```

--------------------------------

### Hadoop HA and Kerberos Configuration Example

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/hive/hive-sink.md

Illustrates how to configure Hadoop High Availability (HA) and Kerberos-related settings within the Chunjun connector. These properties are typically found in 'core-site.xml' and 'hdfs-site.xml'.
```Configuration
'properties.hadoop.user.name' = 'root',
'properties.dfs.ha.namenodes.ns' = 'nn1,nn2',
'properties.fs.defaultFS' = 'hdfs://ns',
'properties.dfs.namenode.rpc-address.ns.nn2' = 'ip:9000',
'properties.dfs.client.failover.proxy.provider.ns' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
'properties.dfs.namenode.rpc-address.ns.nn1' = 'ip:9000',
'properties.dfs.nameservices' = 'ns',
'properties.fs.hdfs.impl.disable.cache' = 'true',
'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'
```

--------------------------------

### Copy Compiled Plugins to Docker Build Directory

Source: https://github.com/dtstack/chunjun/blob/master/chunjun-docker/docker/README.md

Copies the compiled Flink distribution (plugin packages) into the designated Docker image build directory. This ensures that the required artifacts are available for the Docker build process.

```Bash
cp -r flink-dist ./chunjun-docker/docker
```

--------------------------------

### Flink SQL Connector Core Interfaces and Components

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/开发者指南/如何自定义插件.md

This section outlines the key interfaces and components involved in developing Flink SQL Connectors, including factories for creating table sources and sinks, and runtime providers for data processing. It details their responsibilities and relationships within the Flink SQL architecture.

```APIDOC
DynamicTableFactory:
  - Purpose: Defines and validates table creation parameters, retrieves table metadata, defines encoding/decoding formats (optional), and creates DynamicTableSource/Sink instances.
  - Implementations: DynamicTableSourceFactory, DynamicTableSinkFactory

DynamicTableSourceFactory:
  - Purpose: Factory for source tables (e.g., Kafka) and lookup tables (e.g., MySQL for dimension tables).

DynamicTableSinkFactory:
  - Purpose: Factory for result tables (e.g., MySQL for sink).

DynamicTableSource:
  - Purpose: Generates functions for reading data.
  - Implementations:
    - ScanTableSource: For generating RichSourceFunction (streaming) or RichInputFormat (batch/streaming) for data reading.
    - LookupTableSource: For generating TableFunction (full scan) or AsyncTableFunction (LRU cache) for lookup operations.
  - Output: Wrapped as a Provider.

DynamicTableSink:
  - Purpose: Generates functions for writing data.
  - Implementations: RichSinkFunction (streaming) or RichOutputFormat (batch/streaming).
  - Output: Wrapped as a Provider.

ScanTableSource:
  - Purpose: Interface to implement for source connectors that scan data.
  - Method: getScanRuntimeProvider() - creates DtInputFormatSourceFunction (which wraps StreamInputFormat).

LookupTableSource:
  - Purpose: Interface to implement for source connectors that perform lookup operations (e.g., dimension tables).
  - Method: getLookupRuntimeProvider() - creates JdbcLruTableFunction/JdbcAllTableFunction.

DtInputFormatSourceFunction:
  - Purpose: Wrapper class for all InputFormat implementations, handles 2PC and other features.
  - Note: Should not be modified casually.

StreamInputFormat:
  - Purpose: Handles data operations (open, nextRecord, close).
  - Inheritance: Extends BaseRichInputFormat.

BaseRichInputFormat:
  - Purpose: Common base class for all InputFormat implementations.
  - Note: Should not be modified casually.

JdbcLruTableFunction:
  - Purpose: Implements asynchronous lookup with LRU caching for dimension tables.
  - Inheritance: Extends AbstractLruTableFunction.

JdbcAllTableFunction:
  - Purpose: Implements full-load caching for dimension tables.
  - Note: Not recommended for large datasets.
  - Inheritance: Extends AbstractAllTableFunction.
```

--------------------------------

### Implementing Flink DynamicTableSourceFactory and DynamicTableSinkFactory

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/开发者指南/如何自定义插件.md

This Java code demonstrates the implementation of `StreamDynamicTableFactory`, which acts as both a source and sink factory for Flink SQL connectors. It outlines the essential methods for creating `DynamicTableSource` and `DynamicTableSink` instances, defining a unique `factoryIdentifier`, and specifying `requiredOptions` and `optionalOptions` for table creation parameters.

```java
public class StreamDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    // Create the DynamicTableSource
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) { }

    // Create the DynamicTableSink
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) { }

    // Unique identifier of the connector
    @Override
    public String factoryIdentifier() { }

    // Required options
    @Override
    public Set<ConfigOption<?>> requiredOptions() { }

    // Optional options
    @Override
    public Set<ConfigOption<?>> optionalOptions() { }
}
```

--------------------------------

### Run ChunJun Docker with Custom Host Mapping

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/快速开始.md

When your data source involves external machines or services, it's necessary to pass their host information to the Docker container. This command illustrates how to map a custom hostname, such as `www.test.com`, to a specific IP address like `192.168.100.100` within the ChunJun Docker container using the `--add-host` parameter. This ensures the container can properly resolve and connect to external dependencies.
```Shell
docker run -p 8081:8081 --add-host=www.test.com:192.168.100.100 dtopensource/chunjun-master
```

--------------------------------

### LogMiner Dictionary Example: SQL INSERT Statement

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/logminer/LogMiner原理.md

This SQL statement demonstrates a standard data insertion into the HR.JOBS table. It is used in the documentation to illustrate how LogMiner processes data when a dictionary is available, showing human-readable table and column names.

```SQL
INSERT INTO HR.JOBS(JOB_ID, JOB_TITLE, MIN_SALARY, MAX_SALARY)
VALUES('IT_WT','Technical Writer', 4000, 11000);
```

--------------------------------

### Start Flink Standalone Cluster

Source: https://github.com/dtstack/chunjun/blob/master/README_CH.md

This command initiates the Flink Standalone cluster. Upon successful startup, the Flink web UI is typically accessible on port 8081 of the current machine, allowing users to monitor the status and progress of Flink jobs.

```shell
sh $FLINK_HOME/bin/start-cluster.sh
```

--------------------------------

### Implement DynamicTableFactory for Flink SQL Connector

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/Contribution/How to define a plugin.md

This Java class demonstrates how to implement the DynamicTableSourceFactory and DynamicTableSinkFactory interfaces for a Flink SQL connector. It includes methods for creating dynamic table sources and sinks, defining the connector's unique identifier, and specifying required and optional configuration parameters.
```Java
public class StreamDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    // Create the DynamicTableSource
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) { }

    // Create the DynamicTableSink
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) { }

    // Unique identifier of the connector
    @Override
    public String factoryIdentifier() { }

    // Required options
    @Override
    public Set<ConfigOption<?>> requiredOptions() { }

    // Optional options
    @Override
    public Set<ConfigOption<?>> optionalOptions() { }
}
```

--------------------------------

### LogMiner: Start New Redo Log File List

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/logminer/LogMiner原理.md

This SQL command demonstrates how to initialize a new list of redo log files for LogMiner analysis. The DBMS_LOGMNR.NEW option ensures that any previously added log files are cleared, and the specified file becomes the first in the new list.

```SQL
EXECUTE DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => '/oracle/logs/log1.f', OPTIONS => DBMS_LOGMNR.NEW);
```

--------------------------------

### Check LogMiner Tool Installation (Oracle 11g)

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/logminer/LogMiner配置.md

These SQL commands describe the `DBMS_LOGMNR` and `DBMS_LOGMNR_D` packages to verify if the LogMiner tool is installed in Oracle 11g.

```SQL
desc DBMS_LOGMNR;
desc DBMS_LOGMNR_D;
```

--------------------------------

### ChunJun GBase-X SQL Connector Configuration Parameters

Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/gbase/gbase-source.md

This section details the configuration parameters required for setting up the GBase-X SQL connector in ChunJun. It includes essential connection details, scanning properties, and incremental synchronization options.
```APIDOC
connector:
  description: gbase-x
  required: true
  type: String
  default: N/A

url:
  description: jdbc:gbase://localhost:9042/test
  required: true
  type: String
  default: N/A

schema:
  description: GBase database
  required: false
  type: String
  default: N/A

table-name:
  description: Table name
  required: true
  type: String
  default: N/A

username:
  description: username
  required: true
  type: String
  default: N/A

password:
  description: password
  required: true
  type: String
  default: N/A

scan.polling-interval:
  description: Polling interval time. Optional (if not filled, it's an offline task), no default.
  required: false
  type: String
  default: N/A

scan.parallelism:
  description: Parallelism. Interval polling currently does not support multiple parallelisms.
  required: false
  type: String
  default: N/A

scan.fetch-size:
  description: Fetch size from the database each time, unit: records.
  required: false
  type: String
  default: 1024

scan.query-timeout:
  description: Database connection timeout, unit: seconds.
  required: false
  type: String
  default: 1

scan.partition.column:
  description: Splitting field for multi-parallelism reading. Must be set under multi-parallelism. Interval polling does not support multi-parallelism.
  required: false
  type: String
  default: N/A

scan.partition.strategy:
  description: Data sharding strategy.
  required: false
  type: String
  default: range

scan.increment.column:
  description: Incremental field name. If this field is configured, the parallelism can only be 1. Optional, no default.
  required: false
  type: String
  default: N/A

scan.increment.column-type:
  description: Incremental field type. Optional, no default.
  required: false
  type: String
  default: N/A

scan.start-location:
  description: Starting position of the incremental field. If not specified, all data will be synchronized first, then incremental synchronization. Optional, no default.
required: false type: String default: N/A scan.restore.columnname: description: If checkpoint (CP) is enabled, the field name for task resumption from savepoint (SP)/checkpoint (CP). If resumed, it will override scan.start-location and start from the resumption point. Optional, no default. required: false type: String default: N/A scan.restore.columntype: description: If checkpoint (CP) is enabled, the field type for task resumption from savepoint (SP)/checkpoint (CP). Optional, no default. required: false type: String default: N/A ``` -------------------------------- ### Check LogMiner Tool Installation (Oracle 10g) Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/logminer/LogMiner配置.md These SQL commands describe the `DBMS_LOGMNR` and `DBMS_LOGMNR_D` packages to verify if the LogMiner tool is installed in Oracle 10g. ```SQL desc DBMS_LOGMNR; desc DBMS_LOGMNR_D; ``` -------------------------------- ### Check LogMiner Tool Package Installation Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/logminer/LogMiner配置.md Verifies the installation of LogMiner tool packages (DBMS_LOGMNR and DBMS_LOGMNR_D) by describing them. If no information is returned, the packages need to be initialized, applicable to Oracle 10g. ```SQL desc DBMS_LOGMNR; desc DBMS_LOGMNR_D; ``` -------------------------------- ### Install Missing Maven Artifact Locally (javac-shaded) Source: https://github.com/dtstack/chunjun/blob/master/README.md This command installs a specific version of the 'javac-shaded' JAR into the local Maven repository. It serves as a workaround for the 'Failed to read artifact descriptor' error encountered during compilation, typically due to repository access issues. 
```shell mvn install:install-file -DgroupId=com.google.errorprone -DartifactId=javac-shaded -Dversion=9+181-r4173-1 -Dpackaging=jar -Dfile=./jars/javac-shaded-9+181-r4173-1.jar ``` -------------------------------- ### MySQL Metric Plugin Configuration Parameters (APIDOC) Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/General Configuration.md Detailed API documentation for the parameters within the `metricPluginConf.pluginProp` object, used for connecting to and configuring MySQL for metric storage. ```APIDOC jdbcUrl: String (required) - JDBC connection string for MySQL. table: String (required) - MySQL table name. username: String (required) - MySQL username. password: String (required) - Password corresponding to the MySQL username. properties: Map (optional) - Database connection properties. ``` -------------------------------- ### Check Oracle LogMiner Tool Installation and Initialize (Oracle 11g) Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/logminer/LogMiner配置.md Verify whether the LogMiner tool package is installed in Oracle 11g by describing `DBMS_LOGMNR` and `DBMS_LOGMNR_D`. If no information is returned, initialize the LogMiner tool package by executing the provided SQL scripts from `$ORACLE_HOME/rdbms/admin`. ```SQL desc DBMS_LOGMNR; desc DBMS_LOGMNR_D; ``` ```SQL @$ORACLE_HOME/rdbms/admin/dbmslm.sql; @$ORACLE_HOME/rdbms/admin/dbmslmd.sql; ``` -------------------------------- ### HDFS Sink Parallelism Configuration Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/hdfs/hdfs-sink.md Sets the parallelism for the HDFS sink. This parameter is optional and expects a string value. ```APIDOC sink.parallelism: description: Parallelism of the sink required: No type: String default: None ``` -------------------------------- ### Start Flink Standalone Cluster Source: https://github.com/dtstack/chunjun/blob/master/README.md This command initiates the Flink Standalone cluster.
After successful startup, the Flink Web UI is typically accessible on port 8081, configurable via `flink-conf.yaml`. ```shell sh $FLINK_HOME/bin/start-cluster.sh ``` -------------------------------- ### HDFS Sink Field Delimiter Configuration Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/hdfs/hdfs-sink.md Sets the field delimiter when 'fileType' is 'text'. This parameter is optional and expects a string value. Default is '\001'. ```APIDOC field-delimiter: description: Field delimiter used when `fileType` is `text` required: No type: string default: `\001` ``` -------------------------------- ### HDFS Sink Parquet Dictionary Encoding Configuration Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/hdfs/hdfs-sink.md Enables dictionary encoding when 'fileType' is 'parquet'. This parameter is optional and expects a boolean value. Default is 'true'. ```APIDOC enable-dictionary: description: Whether to enable dictionary encoding when `fileType` is `parquet` required: No type: boolean default: `true` ``` -------------------------------- ### ChunJun Job Configuration Parameters Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun通用配置详解.md This API documentation describes the main components of a ChunJun task script: 'content' (containing reader and writer configurations) and 'setting' (for speed, error limits, metrics, restore, log, and dirty data), specifying their purpose and whether they are required.
```APIDOC Parameter: content Description: Used to configure the task's input source and output source Required: Yes Sub-parameters: reader: Description: reader plugin detailed configuration Required: Yes writer: Description: writer plugin detailed configuration Required: Yes Parameter: setting Description: Configures the overall environment settings for the task Required: No Sub-parameters: speed: Description: rate limit Required: No errorLimit: Description: error control Required: No metricPluginConf: Description: metric plugin configuration Required: No restore: Description: task type and breakpoint resume configuration Required: No log: Description: log recording configuration Required: No dirty: Description: dirty data saving Required: No ``` -------------------------------- ### Oracle Source Sync Plugin Parameter Reference Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/oracle/oracle-source.md Detailed API documentation for all configurable parameters of the Oracle Source Sync plugin, including descriptions, required status, data types, default values, and specific notes for each parameter. ```APIDOC Sync Parameters: - connection: description: Database connection parameters, including jdbcUrl, schema, table. required: true type: List default: None example: "connection": [{ "jdbcUrl": ["jdbc:oracle:thin:@0.0.0.1:1521:orcl"], "table": ["table"], "schema":"public" }] - jdbcUrl: description: JDBC connection string for relational databases. Refer to Oracle official documentation for jdbcUrl. required: true type: string default: None - schema: description: Database schema name. required: false type: string default: None - table: description: Name of the table to read. Currently supports configuring a single table; multi-table support will be added later. required: true type: List default: None - username: description: Username for the data source.
required: true type: String default: None - password: description: Password for the specified data source username. required: true type: String default: None - fetchSize: description: Number of data rows to read from the database at once. Oracle's default fetchSize is 10. Setting fetchSize too small leads to frequent data reads, affecting query speed and database pressure. Setting it too large might cause OutOfMemory (OOM) errors with very large data volumes. This parameter controls the number of data rows read per fetch. Note: This parameter's value should not be set too large, otherwise it may cause read timeouts and task failures. required: false type: int default: 1024 - where: description: Filter condition. The reader plugin constructs SQL based on the specified column, table, and where conditions for data extraction. In practical business scenarios, it's common to synchronize data for the current day, e.g., by setting the where condition to 'gmt_create > time'. Note: Do not specify 'limit 10' as a where condition, as 'limit' is not a valid SQL WHERE clause. required: false type: String default: None - splitPk: description: When 'channel' in the speed configuration is greater than 1, this parameter must be specified. The Reader plugin constructs SQL based on the concurrency and this specified field, allowing each concurrent process to read different data and improve read speed. Note: It is recommended to use the table's primary key for splitPk, as primary keys are usually evenly distributed, preventing data hotspots. Currently, splitPk only supports integer data splitting, not float, string, date, or other types. If a user specifies other unsupported types, ChunJun will report an error. If 'channel' is greater than 1 but this parameter is not configured, the task will fail. required: false type: String default: None - queryTimeOut: description: Query timeout time, in seconds. 
Note: When data volume is very large, or querying from views, or using custom SQL queries, this parameter can be used to specify the timeout. required: false type: int default: 1000 - customSql: description: Custom query statement. If specifying fields alone cannot meet the requirements, this parameter can be used to specify any complex query statement. Note: Must be a query statement, otherwise the task will fail; the fields returned by the query statement need to correspond to the fields in the column list; when this parameter is specified, the 'table' specified in 'connection' is invalid; when this parameter is specified, 'column' must specify concrete field information and cannot use '*' wildcard. required: false type: String default: None - column: description: Fields to be read. required: true type: List default: None formats: 1. Read all fields: "column":["*"] 2. Specify field names only: "column":["id","name"] 3. Specify detailed information: "column": [{ "name": "col", "type": "datetime", "format": "yyyy-MM-dd hh:mm:ss", "value": "value" }] attributes: - name: Field name. - type: Field type, can be different from the database field type; the program will perform type conversion. - format: If the field is a time string, you can specify the time format to convert the field type to a date format. - value: If the specified field does not exist in the database, the 'value' will be returned as a constant column. If the specified field exists and its value is null, this 'value' will be returned as the default value. - polling: description: Whether to enable interval polling. If enabled, data will be periodically fetched from the database based on 'pollingInterval'. Enabling interval polling also requires configuring 'pollingInterval' and 'increColumn', and optionally 'startLocation'. If 'startLocation' is not configured, the task will query the maximum value of the increment field from the database as the polling's starting position. 
required: false type: Boolean default: false - pollingInterval: description: Polling interval time, the interval for fetching data from the database, defaults to 5000 milliseconds. required: false type: long default: 5000 - increColumn: description: Increment field, can be the corresponding increment field name or a pure number, indicating the 0-indexed order position of the increment field in 'column'. required: false type: String or int default: None - startLocation: description: Increment query start position. required: false type: String default: None - useMaxFunc: description: Used to mark whether to save one or more data records at the endLocation. true: do not save, false (default): save. In some cases, the last few records might be duplicated, this parameter can be set to true. required: false type: Boolean default: false - requestAccumulatorInterval: description: Interval for sending query accumulator requests. required: false type: int default: 2 ``` -------------------------------- ### Shutdown and Mount Oracle Database Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_zh/ChunJun连接器/logminer/LogMiner配置.md Shuts down the Oracle database immediately and then starts it in mount mode. This is a prerequisite for enabling or disabling archive log mode and applies to both Oracle 10g and 11g. ```SQL shutdown immediate; startup mount; ``` -------------------------------- ### Kingbase-X SQL Connector Configuration Parameters Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/kingbase/kingbase-source.md API documentation for configuring the Kingbase-X SQL connector, detailing connection, scan, and incremental synchronization parameters. Each parameter includes its description, whether it's required, its data type, and default value. 
```APIDOC SQL Connector Parameters: connector: Description: kingbase-x Required: Yes Type: String Default: None url: Description: jdbc:kingbase8://localhost:54321/MOWEN Required: Yes Type: String Default: None schema: Description: Database schema name Required: No Type: string Default: None table-name: Description: Table name Required: Yes Type: String Default: None username: Description: username Required: Yes Type: String Default: None password: Description: password Required: Yes Type: String Default: None scan.polling-interval: Description: Polling interval. Optional (if not filled, it's an offline task), no default. Required: No Type: String Default: None scan.parallelism: Description: Parallelism Required: No Type: String Default: None scan.fetch-size: Description: Fetch size from the database each time, unit: rows. Required: No Type: String Default: 1024 scan.query-timeout: Description: Database connection timeout, unit: seconds. Required: No Type: String Default: 1 scan.partition.column: Description: Splitting field for multi-parallelism reading, must be set under multi-parallelism Required: No Type: String Default: None scan.partition.strategy: Description: Data sharding strategy Required: No Type: String Default: range scan.increment.column: Description: Incremental field name Required: No Type: String Default: None scan.increment.column-type: Description: Incremental field type Required: No Type: String Default: None scan.start-location: Description: Start position of the incremental field. If not specified, all data is synchronized first, then incremental. Required: No Type: String Default: None scan.restore.columnname: Description: If checkpointing is enabled, the field name for task resumption from SP/CP. If resumed, it will override scan.start-location and start from the resumption point. Required: No Type: String Default: None scan.restore.columntype: Description: If checkpointing is enabled, the field type for task resumption from SP/CP. Required: No Type: String Default: None ``` -------------------------------- ### Shutdown and Mount Oracle Database (Oracle 11g) Source: https://github.com/dtstack/chunjun/blob/master/docs/docs_en/ChunJun Connector/logminer/LogMiner配置.md These SQL commands shut down the Oracle 11g database immediately and then start it in mount mode, which is a prerequisite for enabling archive logging. ```SQL shutdown immediate; startup mount; ```
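 -------------------------------- ### Kingbase-X Source Table DDL (Illustrative Sketch) The connector options in the Kingbase-X parameter reference above can be combined into a Flink SQL table definition. The following is a minimal sketch under stated assumptions, not a verified job: the table name `kingbase_source`, the columns `id` and `name`, the `table-name` value, and the credentials are hypothetical placeholders. The option keys, the `kingbase-x` identifier, the example URL, and the `1024` fetch size come from the reference.

```SQL
-- Hypothetical source table; column names and types are placeholders.
CREATE TABLE kingbase_source (
    id   INT,
    name STRING
) WITH (
    'connector'       = 'kingbase-x',                              -- required
    'url'             = 'jdbc:kingbase8://localhost:54321/MOWEN',  -- required, example URL from the reference
    'table-name'      = 'example_table',                           -- required, placeholder value
    'username'        = 'example_user',                            -- required, placeholder value
    'password'        = 'example_password',                        -- required, placeholder value
    'scan.fetch-size' = '1024'                                     -- optional, the documented default
);
```

Because `scan.polling-interval` is left out, the reference describes the resulting job as an offline task; adding it together with `scan.increment.column` would be the starting point for interval polling.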