### Developer Setup for Kinesis SQL Connector (Maven)

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Instructions for setting up the developer environment for the kinesis-sql project using Maven: clone the repository, check out the branch that matches your Spark version, and run the Maven install command, which builds the connector JAR file.

```bash
git clone git@github.com:qubole/kinesis-sql.git
cd kinesis-sql
# Check out the branch that matches your Spark version
git checkout master
mvn install -DskipTests
```

--------------------------------

### Configure Kinesis Source Performance Options in Scala

Source: https://context7.com/qubole/kinesis-sql/llms.txt

This example shows how to tune the Kinesis source through its configuration options. It covers executor settings for fetching data, client settings for calling the Kinesis service, and failure handling.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KinesisPerformance")
  .getOrCreate()

val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streamName", "high-throughput-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "AKIAIOSFODNN7EXAMPLE")
  .option("awsSecretKey", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .option("startingPosition", "LATEST")
  // Executor options
  .option("kinesis.executor.maxFetchTimeInMs", "1000")
  .option("kinesis.executor.maxFetchRecordsPerShard", "100000")
  .option("kinesis.executor.maxRecordPerRead", "10000")
  .option("kinesis.executor.addIdleTimeBetweenReads", "false")
  .option("kinesis.executor.idleTimeBetweenReadsInMs", "1000")
  // Client options
  .option("kinesis.client.describeShardInterval", "1s")
  .option("kinesis.client.numRetries", "3")
  .option("kinesis.client.retryIntervalMs", "1000")
  .option("kinesis.client.maxRetryIntervalMs", "10000")
  .option("kinesis.client.avoidEmptyBatches", "false")
  // Failure handling
  .option("failOnDataLoss", "false")
  .load()

kinesis
  .selectExpr("CAST(data AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "/data/kinesis-output")
  .option("checkpointLocation", "/checkpoints/kinesis")
  .start()
  .awaitTermination()
```

--------------------------------

### Configure Kinesis Sink Performance Options in Scala

Source: https://context7.com/qubole/kinesis-sql/llms.txt

Configures the Kinesis sink with producer options that affect write performance in Spark Structured Streaming. The example generates test data with the `rate` source, shapes it into the `partitionKey` and `data` columns, and writes it to a Kinesis stream.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KinesisSinkConfig")
  .getOrCreate()

// Prepare DataFrame with required columns
val dataStream = spark
  .readStream
  .format("rate") // Generate test data
  .option("rowsPerSecond", "1000")
  .load()
  .selectExpr(
    "CAST(value AS STRING) as partitionKey",
    "CAST(timestamp AS STRING) as data"
  )

// Write with optimized sink configuration
dataStream
  .writeStream
  .format("kinesis")
  .outputMode("append")
  .option("streamName", "output-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "AKIAIOSFODNN7EXAMPLE")
  .option("awsSecretKey", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  // Sink producer options
  .option("kinesis.executor.recordMaxBufferedTime", "1000")
  .option("kinesis.executor.maxConnections", "1")
  .option("kinesis.executor.aggregationEnabled", "true")
  .option("kniesis.executor.flushwaittimemillis", "100")
  .start()
  .awaitTermination()
```

--------------------------------

### Authenticate Kinesis with Session Token Credentials in Scala

Source: https://context7.com/qubole/kinesis-sql/llms.txt

This example authenticates with Kinesis using temporary security credentials that include a session token. It suits short-lived access and requires an AWS access key ID, a secret key, and a session token.
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KinesisSessionToken")
  .getOrCreate()

// Using session-based credentials (from AWS STS GetSessionToken)
val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streamName", "secure-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "ASIAXXX...")
  .option("awsSecretKey", "tempSecretKey...")
  .option("sessionToken", "FwoGZXIvYXdzEBY...")
  .option("startingPosition", "LATEST")
  .load()

kinesis
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
```

--------------------------------

### Resume Kinesis Stream Reading with JSON Checkpoint in Scala

Source: https://context7.com/qubole/kinesis-sql/llms.txt

This snippet resumes reading from a Kinesis stream at specific shard positions using a JSON-formatted checkpoint. The JSON maps each shard ID to an iterator type and position, so each shard can resume from its own sequence number.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KinesisCheckpoint")
  .getOrCreate()

// JSON checkpoint format for resuming from specific positions
val checkpointJson = """
{
  "shardId-000000000001": {
    "iteratorType": "AFTER_SEQUENCE_NUMBER",
    "iteratorPosition": "49605240428222307037115827613554798409561082419642105874"
  },
  "metadata": {
    "streamName": "my-stream",
    "batchId": "7"
  },
  "shardId-000000000000": {
    "iteratorType": "AFTER_SEQUENCE_NUMBER",
    "iteratorPosition": "49605240428200006291917297020490128157480794051565322242"
  }
}
"""

val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streamName", "my-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "AKIAIOSFODNN7EXAMPLE")
  .option("awsSecretKey", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .option("startingPosition", checkpointJson) // Pass JSON directly
  .load()

kinesis
  .selectExpr("CAST(data AS STRING)", "sequenceNumber")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
```

--------------------------------
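### Build the startingPosition JSON Programmatically (Scala)

The JSON checkpoint format in the previous snippet is written by hand. When shard positions come from elsewhere (for example, a saved offsets file), it may be easier to generate the string. The following is a minimal sketch in plain Scala, not part of the connector: `StartingPositionJson`, `ShardPosition`, and `build` are hypothetical names introduced here. It assumes that the connector accepts compact JSON with keys in any order, and that stream names, shard IDs, and sequence numbers contain no characters that need JSON escaping.

```scala
// Hypothetical helper (not part of the connector): builds the JSON string
// passed to the "startingPosition" option, mirroring the layout shown above.
object StartingPositionJson {
  // One shard's resume point; the field names mirror the JSON keys in the example.
  final case class ShardPosition(iteratorType: String, iteratorPosition: String)

  def build(streamName: String, batchId: Long, shards: Map[String, ShardPosition]): String = {
    // Sort shard IDs so the generated string is deterministic.
    val shardEntries = shards.toSeq.sortBy(_._1).map { case (shardId, pos) =>
      s""""$shardId":{"iteratorType":"${pos.iteratorType}","iteratorPosition":"${pos.iteratorPosition}"}"""
    }
    // The metadata entry carries the stream name and batch ID as strings.
    val metadata = s""""metadata":{"streamName":"$streamName","batchId":"$batchId"}"""
    (shardEntries :+ metadata).mkString("{", ",", "}")
  }
}
```

The resulting string could then be passed as `.option("startingPosition", json)` in place of the hand-written `checkpointJson` value.

--------------------------------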
### Read from Kinesis Stream using Spark Structured Streaming

Source: https://context7.com/qubole/kinesis-sql/llms.txt

Subscribes to an Amazon Kinesis stream and reads data using Spark's Structured Streaming API. Each row from the source carries the binary data, stream name, partition key, sequence number, and approximate arrival timestamp. The connector JAR must be on the classpath when starting the Spark shell.

```scala
// Start Spark Shell with the connector JAR
// $SPARK_HOME/bin/spark-shell --packages com.qubole.spark:spark-sql-kinesis_2.12:1.2.0-spark_3.0

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KinesisReader")
  .getOrCreate()

// Subscribe to a Kinesis stream
val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streamName", "my-kinesis-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "AKIAIOSFODNN7EXAMPLE")
  .option("awsSecretKey", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .option("startingPosition", "TRIM_HORIZON") // Options: LATEST, TRIM_HORIZON, earliest
  .option("failOnDataLoss", "true")
  .load()

// Schema returned by the source:
// root
//  |-- data: binary (nullable = true)
//  |-- streamName: string (nullable = true)
//  |-- partitionKey: string (nullable = true)
//  |-- sequenceNumber: string (nullable = true)
//  |-- approximateArrivalTimestamp: timestamp (nullable = true)

// Process the stream: convert binary data to string and count occurrences of each message
val wordCounts = kinesis
  .selectExpr("CAST(data AS STRING) as message")
  .groupBy("message")
  .count()

// Write results to console
val query = wordCounts
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()

query.awaitTermination()
```

--------------------------------

### Word Count on Kinesis Stream Data (Scala)

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Scala code for performing a word count operation on data read from a Kinesis stream.
It casts the binary data to a string, groups by the data column, counts occurrences, and writes the results to the console. The call blocks until the streaming query terminates.

```scala
// Cast data into string and group by data column
scala> :paste
kinesis
  .selectExpr("CAST(data AS STRING)").as[(String)]
  .groupBy("data").count()
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()
```

--------------------------------

### Subscribe to Kinesis Stream in Spark Structured Streaming (Scala)

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Scala code to read data from a Kinesis stream using Spark Structured Streaming. It configures the Kinesis source with the stream name, endpoint URL, AWS credentials, and starting position.

```scala
// Subscribe to a Kinesis stream
scala> :paste
val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streamName", "spark-streaming-example")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "[ACCESS_KEY]") // replace with your access key
  .option("awsSecretKey", "[SECRET_KEY]") // replace with your secret key
  .option("startingPosition", "TRIM_HORIZON")
  .load()
```

--------------------------------

### Build Kinesis Connector JAR from Source using Maven

Source: https://context7.com/qubole/kinesis-sql/llms.txt

Instructions for cloning the connector repository and building the JAR file using Maven. Different Spark versions are supported by checking out the corresponding branch. Also includes commands for running tests and using the connector with spark-submit.
```bash
# Clone the repository
git clone git@github.com:qubole/kinesis-sql.git
cd kinesis-sql

# For Spark 3.0.x (master branch)
git checkout master
mvn clean install -DskipTests

# The output JAR will be at:
# target/spark-sql-kinesis_2.12-1.2.1_spark-3.0-SNAPSHOT.jar

# For Spark 2.4.x (use appropriate branch)
git checkout spark-2.4
mvn clean install -DskipTests

# Run tests
mvn test

# Use the connector with spark-submit
spark-submit \
  --packages com.qubole.spark:spark-sql-kinesis_2.12:1.2.0-spark_3.0 \
  --class com.example.KinesisApp \
  your-app.jar
```

--------------------------------

### Spark Shell with Kinesis Connector JAR

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Command to launch the Spark shell with the Kinesis connector JAR on the classpath, so the connector can be used interactively.

```bash
$SPARK_HOME/bin/spark-shell --jars target/spark-sql-kinesis_2.11-2.2.0.jar
```

--------------------------------

### AWS CLI Commands for Kinesis Stream Management

Source: https://context7.com/qubole/kinesis-sql/llms.txt

AWS CLI commands to create a Kinesis stream, describe it, add records to it, list its shards, and delete it. These cover the stream setup and cleanup needed to try the connector.
```bash
# Create a new Kinesis stream with 2 shards
aws kinesis create-stream --stream-name my-stream --shard-count 2

# Wait for stream to become active
aws kinesis wait stream-exists --stream-name my-stream

# Describe stream to verify configuration
aws kinesis describe-stream --stream-name my-stream

# Add test records to the stream
aws kinesis put-record \
  --stream-name my-stream \
  --partition-key "partition-1" \
  --data "Hello Kinesis"

aws kinesis put-record \
  --stream-name my-stream \
  --partition-key "partition-2" \
  --data "Structured Streaming"

# List shards in the stream
aws kinesis list-shards --stream-name my-stream

# Delete stream when done
aws kinesis delete-stream --stream-name my-stream
```

--------------------------------

### Kinesis Source Configuration

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Configure Kinesis as a data source for streaming jobs. This section lists the options available for connecting to and reading data from Kinesis streams.

```APIDOC
## Kinesis Source Configuration

### Description
Configure Kinesis as a data source for streaming jobs. This section lists the options available for connecting to and reading data from Kinesis streams.

### Configuration Options

| Option Name | Default Value | Description |
|---|---|---|
| streamName | - | Name of the stream in Kinesis to read from |
| endpointUrl | https://kinesis.us-east-1.amazonaws.com | Endpoint URL for the Kinesis stream |
| awsAccessKeyId | - | AWS credentials for Kinesis describe and read record operations |
| awsSecretKey | - | AWS credentials for Kinesis describe and read record operations |
| awsSTSRoleARN | - | AWS STS role ARN for Kinesis describe and read record operations |
| awsSTSSessionName | - | AWS STS session name for Kinesis describe and read record operations |
| awsUseInstanceProfile | true | Use instance profile credentials if no other credentials are provided |
| startingPosition | LATEST | Starting position in Kinesis to fetch data from. Possible values are "latest", "trim_horizon", "earliest" (alias for trim_horizon), or a JSON-serialized map shardId->KinesisPosition |
| failOnDataLoss | true | Fail the streaming job if any active shard is missing or expired |
| kinesis.executor.maxFetchTimeInMs | 1000 | Maximum time spent in the executor fetching records from Kinesis, per shard |
| kinesis.executor.maxFetchRecordsPerShard | 100000 | Maximum number of records to fetch per shard |
| kinesis.executor.maxRecordPerRead | 10000 | Maximum number of records to fetch per getRecords API call |
| kinesis.executor.addIdleTimeBetweenReads | false | Add a delay between two consecutive getRecords API calls |
| kinesis.executor.idleTimeBetweenReadsInMs | 1000 | Minimum delay between two consecutive getRecords API calls |
| kinesis.client.describeShardInterval | 1s (1 second) | Minimum interval between two ListShards API calls to consider resharding |
| kinesis.client.numRetries | 3 | Maximum number of retries for Kinesis API requests |
| kinesis.client.retryIntervalMs | 1000 | Cool-off period before retrying a Kinesis API request |
| kinesis.client.maxRetryIntervalMs | 10000 | Maximum cool-off period between two retries |
| kinesis.client.avoidEmptyBatches | false | Avoid creating an empty micro-batch job by checking, before the batch starts, whether the stream has any unread data |
```

--------------------------------

### Authenticate Kinesis with EC2 Instance Profile Credentials in Scala

Source: https://context7.com/qubole/kinesis-sql/llms.txt

This snippet configures the Kinesis source to use EC2 instance profile credentials for authentication. This is the default behavior when no other credentials are provided, so no explicit credential options are needed.
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KinesisInstanceProfile")
  .getOrCreate()

// Use instance profile credentials (default behavior)
val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streamName", "my-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsUseInstanceProfile", "true") // This is the default
  .option("startingPosition", "TRIM_HORIZON")
  .load()

kinesis
  .selectExpr("CAST(data AS STRING) as message", "partitionKey", "sequenceNumber")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
```

--------------------------------

### Create Kinesis Stream using AWS CLI

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Command to create a new Kinesis stream using the AWS Command Line Interface. It specifies the stream name and the number of shards.

```bash
aws kinesis create-stream --stream-name test --shard-count 2
```

--------------------------------

### Add Records to Kinesis Stream using AWS CLI

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Commands to add individual records to a Kinesis stream using the AWS CLI. Each command specifies the stream name, a partition key, and the data payload for the record.

```bash
aws kinesis put-record --stream-name test --partition-key 1 --data 'Kinesis'
aws kinesis put-record --stream-name test --partition-key 1 --data 'Connector'
aws kinesis put-record --stream-name test --partition-key 1 --data 'for'
aws kinesis put-record --stream-name test --partition-key 1 --data 'Apache'
aws kinesis put-record --stream-name test --partition-key 1 --data 'Spark'
```

--------------------------------

### Kinesis Sink Configuration

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Configure Kinesis as a data sink for streaming jobs. This section lists the options available for writing data to Kinesis streams.
```APIDOC
## Kinesis Sink Configuration

### Description
Configure Kinesis as a data sink for streaming jobs. This section lists the options available for writing data to Kinesis streams.

### Configuration Options

| Option Name | Default Value | Description |
|---|---|---|
| streamName | - | Name of the stream in Kinesis to write to |
| endpointUrl | https://kinesis.us-east-1.amazonaws.com | The AWS endpoint of the Kinesis stream |
| awsAccessKeyId | - | AWS credentials for Kinesis describe and read record operations |
| awsSecretKey | - | AWS credentials for Kinesis describe and read record operations |
| awsSTSRoleARN | - | AWS STS role ARN for Kinesis describe and read record operations |
| awsSTSSessionName | - | AWS STS session name for Kinesis describe and read record operations |
| awsUseInstanceProfile | true | Use instance profile credentials if no other credentials are provided |
| kinesis.executor.recordMaxBufferedTime | 1000 (millis) | Maximum time a record may be buffered |
| kinesis.executor.maxConnections | 1 | Maximum number of connections to Kinesis |
| kinesis.executor.aggregationEnabled | true | Whether records should be aggregated before sending them to Kinesis |
| kniesis.executor.flushwaittimemillis | 100 | Wait time while flushing records to Kinesis on task end |
```

--------------------------------

### AWS Authentication with STS Role Assumption for Kinesis

Source: https://context7.com/qubole/kinesis-sql/llms.txt

Demonstrates how to use AWS STS (Security Token Service) to assume an IAM role for cross-account access or temporary credentials when interacting with Kinesis streams.
This method is useful for secure, temporary access.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KinesisSTS")
  .getOrCreate()

// Using STS Role ARN for authentication
val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streamName", "cross-account-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsSTSRoleARN", "arn:aws:iam::123456789012:role/KinesisReadRole")
  .option("awsSTSSessionName", "spark-kinesis-session")
  .option("startingPosition", "LATEST")
  .load()

// Process stream
kinesis
  .selectExpr("CAST(data AS STRING)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
```

--------------------------------

### Write to Kinesis Stream using Spark Structured Streaming

Source: https://context7.com/qubole/kinesis-sql/llms.txt

Writes streaming data to an Amazon Kinesis stream using the Kinesis sink. The input DataFrame must contain `data` and `partitionKey` columns. The sink also accepts the `kinesis.executor.*` producer options.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KinesisWriter")
  .getOrCreate()

// Read from source stream
val sourceStream = spark
  .readStream
  .format("kinesis")
  .option("streamName", "source-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "AKIAIOSFODNN7EXAMPLE")
  .option("awsSecretKey", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .option("startingPosition", "LATEST")
  .load()

// Transform and prepare data for sink (must have partitionKey and data columns)
val transformed = sourceStream
  .selectExpr(
    "CAST(rand() AS STRING) as partitionKey",
    "CAST(data AS STRING) as data"
  )

// Write to destination Kinesis stream
val sinkQuery = transformed
  .writeStream
  .format("kinesis")
  .outputMode("update")
  .option("streamName", "destination-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "AKIAIOSFODNN7EXAMPLE")
  .option("awsSecretKey", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
  .option("kinesis.executor.recordMaxBufferedTime", "1000")
  .option("kinesis.executor.maxConnections", "1")
  .option("kinesis.executor.aggregationEnabled", "true")
  .start()

sinkQuery.awaitTermination()
```

--------------------------------

### Print Kinesis Stream Schema in Spark (Scala)

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Scala command to print the schema of the DataFrame read from the Kinesis stream, which shows the structure of the data being processed.

```scala
scala> kinesis.printSchema
root
 |-- data: binary (nullable = true)
 |-- streamName: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- sequenceNumber: string (nullable = true)
 |-- approximateArrivalTimestamp: timestamp (nullable = true)
```

--------------------------------

### Using Kinesis as a Sink in Spark Structured Streaming (Scala)

Source: https://github.com/qubole/kinesis-sql/blob/master/README.md

Scala code demonstrating how to use Kinesis as a sink for Spark Structured Streaming. It casts the data to a string, groups by it, and writes the aggregated results to a Kinesis stream. The sink is configured with the stream name, endpoint URL, and AWS credentials.

```scala
// Cast data into string and group by data column
scala> :paste
kinesis
  .selectExpr("CAST(data AS STRING) AS data")
  .groupBy("data").count()
  // The sink expects partitionKey and data columns, so rebuild them after the aggregation
  .selectExpr(
    "CAST(rand() AS STRING) AS partitionKey",
    "CONCAT(data, ':', CAST(`count` AS STRING)) AS data")
  .writeStream
  .format("kinesis")
  .outputMode("update")
  .option("streamName", "spark-sink-example")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("awsAccessKeyId", "[ACCESS_KEY]") // replace with your access key
  .option("awsSecretKey", "[SECRET_KEY]") // replace with your secret key
  .start()
  .awaitTermination()
```
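--------------------------------

### Collect Sink Options Without Hard-Coded Credentials (Scala)

The sink examples repeat the same connection options and embed credentials in the code. One way to avoid that is to collect the options in a `Map` and read credentials from the environment. The following is a minimal sketch in plain Scala, not part of the connector: `KinesisSinkOptions` and `fromEnv` are hypothetical names, and it assumes the deployment exports the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` variables. When either variable is missing, no credential options are set, which leaves the choice to `awsUseInstanceProfile` (default `true` according to the options table).

```scala
// Hypothetical helper (not part of the connector): collects the connection
// options used by the sink examples into one Map.
object KinesisSinkOptions {
  def fromEnv(
      streamName: String,
      endpointUrl: String = "https://kinesis.us-east-1.amazonaws.com",
      env: Map[String, String] = sys.env): Map[String, String] = {
    val base = Map("streamName" -> streamName, "endpointUrl" -> endpointUrl)
    // Add static credentials only when both variables are present.
    val credentials = for {
      accessKey <- env.get("AWS_ACCESS_KEY_ID")
      secretKey <- env.get("AWS_SECRET_ACCESS_KEY")
    } yield Map("awsAccessKeyId" -> accessKey, "awsSecretKey" -> secretKey)
    base ++ credentials.getOrElse(Map.empty[String, String])
  }
}
```

The map could then be applied in one call, for example `.writeStream.format("kinesis").options(KinesisSinkOptions.fromEnv("spark-sink-example"))`, using the `options(Map[String, String])` method of Spark's `DataStreamWriter`.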