### Full Kafka Streams Application Example Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Demonstrates a complete Kafka Streams application setup using KafkaStreamsInitializer. Includes defining the topology, setting up a dead-letter queue, and implementing the onStart hook. A shutdown hook is added for graceful termination. ```java public class MyKafkaStreamApp { public static void main(String[] args) { try { KafkaStreamsInitializer initializer = new KafkaStreamsInitializer( new MyKafkaStreams() ); // Add a shutdown hook to gracefully close on SIGTERM Runtime.getRuntime().addShutdownHook(new Thread(() -> { logger.info("Shutting down..."); initializer.getKafkaStreams().close(); })); initializer.start(); logger.info("Application started successfully"); logger.info("Topology:\n{}", initializer.getTopology().describe()); } catch (Exception e) { logger.error("Failed to start application", e); System.exit(1); } } } public class MyKafkaStreams extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { streamsBuilder .stream("input_topic", Consumed.with(Serdes.String(), Serdes.String())) .mapValues(String::toUpperCase) .to("output_topic", Produced.with(Serdes.String(), Serdes.String())); } @Override public String dlqTopic() { return "dlq_topic"; } @Override public void onStart(KafkaStreams kafkaStreams) { logger.info("Kafka Streams application started: {}", kafkaStreams.topology().describe()); } } ``` -------------------------------- ### Standalone Java Application Setup Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/INDEX.md Illustrates how to initialize and start a KStreamplify application in a standalone Java environment. Requires a custom KafkaStreamsStarter implementation. ```java public class Main { public static void main(String[] args) { KafkaStreamsInitializer initializer = new KafkaStreamsInitializer(new MyTopology()); initializer.start(); } } public class MyTopology extends KafkaStreamsStarter { // ... topology implementation } ``` -------------------------------- ### start() Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Builds the topology, creates and starts the Kafka Streams instance, and starts the HTTP server. This is the primary method to call to start the application. It blocks until the HTTP server is started but does not block on Kafka Streams processing. ```APIDOC ## start() ### Description Builds the topology, creates and starts the Kafka Streams instance, and starts the HTTP server. This is the primary method to call to start the application. It blocks until the HTTP server is started but does not block on Kafka Streams processing. ### Signature ```java public void start() ``` ### Execution Steps 1. Creates a `StreamsBuilder` 2. Calls `KafkaStreamsStarter.topology()` to build the topology 3. Builds the topology with configuration 4. Creates a `KafkaStreams` instance 5. Calls `KafkaStreamsStarter.onStart()` hook 6. Registers a shutdown hook to close Kafka Streams on JVM termination 7. Sets the uncaught exception handler 8. Sets the state listener 9. Starts Kafka Streams 10. Starts the HTTP server for web services ### Throws None (exceptions in initialization are logged and may cause shutdown) ### Example ```java KafkaStreamsInitializer initializer = new KafkaStreamsInitializer(new MyKafkaStreams()); initializer.start(); // Application is now running System.out.println("Kafka Streams application started"); ``` ``` -------------------------------- ### Complete Kstreamplify Configuration Example Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/configuration.md A comprehensive example of an `application.yml` file demonstrating various configuration options for Kstreamplify, including Kafka properties, processing behavior, error handling, topic remapping, and Spring Boot integration. ```yaml kafka: properties: # Standard Kafka Streams application.id: 'user-processor' bootstrap.servers: 'kafka-broker-1:9092,kafka-broker-2:9092' schema.registry.url: 'http://schema-registry:8081' application.server.var.name: 'POD_IP' # Processing behavior num.stream.threads: 4 commit.interval.ms: 10000 cache.max.bytes.buffering: 52428800 # Error handling processing.exception.handler: 'com.michelin.kstreamplify.error.DlqProcessingExceptionHandler' deserialization.exception.handler: 'com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler' production.exception.handler: 'com.michelin.kstreamplify.error.DlqProductionExceptionHandler' # DLQ options dlq: deserialization-handler: forward-restclient-exception: true continue-on-unhandled-errors: false production-handler: continue-on-serialization-exception: false # Multi-team setup prefix: self: 'prod.platform.' payments: 'prod.payments.' fraud: 'prod.fraud.' # Topic remapping topic: remap: legacy-user-events: user-events-v2 # Web services server: port: 8080 topology: path: 'topology' kubernetes: liveness: path: 'liveness' readiness: path: 'ready' # Spring Boot (if using Spring Boot starter) spring: application: name: 'user-processor' kafka: bootstrap-servers: '${kafka.properties.bootstrap.servers}' management: endpoints: web: exposure: include: 'health,metrics' metrics: export: prometheus: enabled: true ``` -------------------------------- ### Complete KafkaStreamsStarter Usage Example Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarterTest.md A full example demonstrating how to set up and test a Kafka Streams application using KafkaStreamsStarterTest, including topic definitions and topology implementation. ```java import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG; public class OrderProcessingTest extends KafkaStreamsStarterTest { private TestInputTopic orderInput; private TestOutputTopic orderOutput; @Override protected KafkaStreamsStarter getKafkaStreamsStarter() { return new OrderProcessing(); } @Override protected Map getSpecificProperties() { return Map.of( STATE_DIR_CONFIG, "/tmp/test-orders" ); } @BeforeEach void setUp() { orderInput = createInputTestTopic(Topics.ordersInput()); orderOutput = createOutputTestTopic(Topics.ordersOutput()); } @Test void shouldValidateOrderAmount() { Order validOrder = new Order("O1", 100.0); orderInput.pipeInput("O1", validOrder); List> results = orderOutput.readKeyValuesToList(); assertEquals(1, results.size()); } @Test void shouldRouteBadOrdersToDlq() { Order invalidOrder = new Order("O2", -50.0); // Invalid amount orderInput.pipeInput("O2", invalidOrder); // Check DLQ List> errors = dlqTopic.readKeyValuesToList(); assertEquals(1, errors.size()); assertTrue(errors.get(0).value.getContextMessage().contains("negative")); // Valid orders still process Order validOrder = new Order("O3", 100.0); orderInput.pipeInput("O3", validOrder); List> results = orderOutput.readKeyValuesToList(); assertEquals(1, results.size()); } @Test void shouldDeduplicateWithinWindow() { Order order = new Order("O1", 100.0); // Same order within window orderInput.pipeInput("O1", order); orderInput.pipeInput("O1", order); // Should only see one List> results = orderOutput.readKeyValuesToList(); assertEquals(1, results.size()); } @Test void shouldAdvanceTimeForWindows() { Order order = new Order("O1", 100.0); orderInput.pipeInput("O1", order, 0); testDriver.advanceWallClockTime(Duration.ofMinutes(5)); // Now duplicate outside window orderInput.pipeInput("O1", order, 300000); // Should see both List> results = orderOutput.readKeyValuesToList(); assertEquals(2, results.size()); } } // Topic definitions public class Topics { public static TopicWithSerde ordersInput() { return new TopicWithSerde<>( "orders-input", Serdes.String(), SerdesUtils.getValueSerdes() ); } public static TopicWithSerde ordersOutput() { return new TopicWithSerde<>( "orders-output", Serdes.String(), SerdesUtils.getValueSerdes() ); } } // Topology implementation public class OrderProcessing extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { Topics.ordersInput().stream(streamsBuilder) .mapValues(this::validateOrder) .to(Topics.ordersOutput()); } private Order validateOrder(Order order) { if (order.getAmount() <= 0) { throw new IllegalArgumentException("Order amount must be positive"); } return order; } @Override public String dlqTopic() { return "orders-dlq"; } } ``` -------------------------------- ### Environment Variable Usage Example Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/configuration.md Example of setting environment variables before running the application. This allows dynamic configuration without modifying the application code. ```bash export KAFKA_BOOTSTRAP_SERVERS=kafka.prod:9092 export SCHEMA_REGISTRY_URL=http://registry.prod:8081 java -jar application.jar ``` -------------------------------- ### Start Kafka Streams Application Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Call this method to build the topology, create and start the Kafka Streams instance, and start the HTTP server. It blocks until the HTTP server is started but not on Kafka Streams processing. ```java KafkaStreamsInitializer initializer = new KafkaStreamsInitializer(new MyKafkaStreams()); initializer.start(); // Application is now running System.out.println("Kafka Streams application started"); ``` -------------------------------- ### Example Topic Remapping Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/configuration.md An example of remapping the 'events' topic to 'events-v2'. This ensures that any topology referencing 'events' will actually use 'events-v2'. ```yaml kafka: properties: topic: remap: events: events-v2 # Routes to events-v2 instead of events ``` -------------------------------- ### Unit Testing Kafka Streams Applications Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/SerdesUtils.md Provides an example of how to unit test a Kafka Streams application using KafkaStreamsStarterTest. This setup allows for testing topologies with input and output topics using SerdesUtils. ```java public class UserProcessorTest extends KafkaStreamsStarterTest { private TestInputTopic inputTopic; private TestOutputTopic outputTopic; @Override protected KafkaStreamsStarter getKafkaStreamsStarter() { return new UserProcessor(); } @BeforeEach void setUp() { inputTopic = testDriver.createInputTopic( "user-input", new StringSerializer(), SerdesUtils.getValueSerdes().serializer() ); outputTopic = testDriver.createOutputTopic( "user-output", new StringDeserializer(), SerdesUtils.getValueSerdes().deserializer() ); } @Test void shouldProcessUsers() { User user = new User("john", "john@example.com"); inputTopic.pipeInput("1", user); List> results = outputTopic.readKeyValuesToList(); assertEquals(1, results.size()); } } ``` -------------------------------- ### Spring Boot Application Setup Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/INDEX.md Demonstrates how to set up a Spring Boot application with KStreamplify. Includes the main application class and a custom KafkaStreamsStarter implementation. ```java @SpringBootApplication public class MyApp { public static void main(String[] args) { SpringApplication.run(MyApp.class, args); } } @Component public class MyTopology extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { // Define topology } @Override public String dlqTopic() { return "dlq"; } } ``` -------------------------------- ### onStart(KafkaStreams kafkaStreams) Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarter.md Optional lifecycle hook executed after the Kafka Streams instance starts but before the topology begins processing records. ```APIDOC ## onStart(KafkaStreams kafkaStreams) ### Description Optional lifecycle hook executed after the Kafka Streams instance starts but before the topology begins processing records. ### Signature ```java public void onStart(KafkaStreams kafkaStreams) ``` ### Parameters #### Path Parameters - **kafkaStreams** (`KafkaStreams`) - Required - The running Kafka Streams instance ### Return Type `void` ### Description This method is called once after `kafkaStreams.start()` completes. Use it to initialize external resources, log startup information, or verify topology readiness. ### Example ```java @Override public void onStart(KafkaStreams kafkaStreams) { log.info("Application started with topology: {}", kafkaStreams.topology().describe()); } ``` ``` -------------------------------- ### Implement Kafka Streams Startup Hook Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarter.md Use the onStart method as a lifecycle hook to perform actions after the Kafka Streams instance has started. This is useful for logging or initializing resources. ```java import org.apache.kafka.streams.KafkaStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // ... inside your class extending KafkaStreamsStarter ... private static final Logger log = LoggerFactory.getLogger(MyKafkaStreams.class); @Override public void onStart(KafkaStreams kafkaStreams) { log.info("Application started with topology: {}", kafkaStreams.topology().describe()); } ``` -------------------------------- ### Implement On Start Hook in Kafka Streams Source: https://github.com/michelin/kstreamplify/blob/main/README.md Execute custom code before the Kafka Streams instance starts by overriding the onStart method in a KafkaStreamsStarter subclass. ```java import org.springframework.stereotype.Component; import org.apache.kafka.streams.KafkaStreams; import io.michelin.kstreamplify.KafkaStreamsStarter; @Component public class MyKafkaStreams extends KafkaStreamsStarter { @Override public void onStart(KafkaStreams kafkaStreams) { // Execute code before starting the Kafka Streams instance } } ``` -------------------------------- ### Get Host and Port Information Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Retrieve the host and port information for the current instance. This is crucial for routing requests in a multi-instance deployment, especially for interactive queries. ```java HostInfo hostInfo = initializer.getHostInfo(); System.out.println("Listening on " + hostInfo.host() + ":" + hostInfo.port()); ``` -------------------------------- ### Obtain HostInfo Instance Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/types.md Get an instance of HostInfo, which contains host and port details. This is typically obtained during Kafka Streams initialization. ```java HostInfo hostInfo = kafkaStreamsInitializer.getHostInfo(); ``` -------------------------------- ### Kafka Streams Topology with Avro Serdes Source: https://github.com/michelin/kstreamplify/blob/main/README.md Example of integrating Avro Serdes into a Kafka Streams topology for input and output topics. ```java @Component public class MyKafkaStreams extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { streamsBuilder .stream("input_topic", Consumed.with(Serdes.String(), SerdesUtils.getValueSerdes())) .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); } } ``` -------------------------------- ### Java Kafka Streams Initialization Source: https://github.com/michelin/kstreamplify/blob/main/README.md Initialize and start Kafka Streams in your main method using KafkaStreamsInitializer and your KafkaStreamsStarter implementation. ```java public class MainKstreamplify { public static void main(String[] args) { KafkaStreamsInitializer initializer = new KafkaStreamsInitializer(new MyKafkaStreams()); initializer.start(); } } ``` -------------------------------- ### dlqTopic() Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarter.md Returns the name of the dead-letter queue topic for error routing. This topic must exist or be created before the application starts. ```APIDOC ## dlqTopic() ### Description Returns the name of the dead-letter queue topic for error routing. This topic must exist or be created before the application starts. ### Signature ```java public abstract String dlqTopic() ``` ### Return Type `String` ### Description The name of the topic where failed records are sent. This is used by error handlers (`DlqProcessingExceptionHandler`, `DlqDeserializationExceptionHandler`, `DlqProductionExceptionHandler`) to route errors. If no DLQ topic is configured, errors may cause the stream to fail. ### Example ```java @Override public String dlqTopic() { return "application-dlq"; } ``` ``` -------------------------------- ### Standalone Kafka Streams Application Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarter.md Create a standalone Kafka Streams application by extending KafkaStreamsStarter and defining the topology. Use KafkaStreamsInitializer to start the application. ```java public class MyKafkaStreamsApp extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { // Define topology } @Override public String dlqTopic() { return "my-dlq"; } public static void main(String[] args) { KafkaStreamsInitializer initializer = new KafkaStreamsInitializer(new MyKafkaStreamsApp()); initializer.start(); } } ``` -------------------------------- ### Joining Streams with Avro Keys and Values Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/SerdesUtils.md Illustrates how to join two KStreams ('orders' and 'customers') using SerdesUtils for custom object serialization. This example is suitable for enriching order data with customer information. ```java KStream orders = streamsBuilder.stream( "orders", Consumed.with(Serdes.String(), SerdesUtils.getValueSerdes()) ); KStream customers = streamsBuilder.stream( "customers", Consumed.with(Serdes.String(), SerdesUtils.getValueSerdes()) ); orders.join( customers, (order, customer) -> enrichOrderWithCustomer(order, customer), JoinWindows.ofTimeDifferenceAndGrace( Duration.ofSeconds(5), Duration.ofSeconds(1) ), StreamJoined.with( Serdes.String(), SerdesUtils.getValueSerdes(), SerdesUtils.getValueSerdes() ) ).to( "enriched-orders", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()) ); ``` -------------------------------- ### Define Kafka Streams Topology and DLQ Topic Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarter.md Implement the topology and dlqTopic methods to define your Kafka Streams application. This example shows a basic stream processing pipeline that converts messages to uppercase and sends them to an output topic. ```java import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.common.serialization.Serdes; import org.springframework.stereotype.Component; @Component public class MyKafkaStreams extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { KStream stream = streamsBuilder .stream("input_topic", Consumed.with(Serdes.String(), Serdes.String())); stream.mapValues(String::toUpperCase) .to("output_topic", Produced.with(Serdes.String(), Serdes.String())); } @Override public String dlqTopic() { return "dlq_topic"; } } ``` -------------------------------- ### Configure DlqProductionExceptionHandler Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/errors.md Configure DlqProductionExceptionHandler to manage errors during record serialization and production. This setup allows routing serialization errors to the DLQ instead of failing the entire process. ```yaml kafka: properties: production.exception.handler: 'com.michelin.kstreamplify.error.DlqProductionExceptionHandler' dlq: production-handler: continue-on-serialization-exception: true ``` -------------------------------- ### Query DLQ Records Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/errors.md Read and process records from a DLQ topic using Kafka Streams test driver. This example demonstrates how to deserialize KafkaError records and log their details. ```java TestOutputTopic dlqTopic = testDriver.createOutputTopic( "dlq", new StringDeserializer(), SerdesUtils.getValueSerdes().deserializer() ); List> dlqRecords = dlqTopic.readKeyValuesToList(); for (KeyValue record : dlqRecords) { KafkaError error = record.value; log.error("Error at offset {}: {}", error.getOffset(), error.getContextMessage()); log.error("Cause: {}", error.getCause()); } ``` -------------------------------- ### Check code formatting with Spotless Source: https://github.com/michelin/kstreamplify/blob/main/CONTRIBUTING.md Run this command to verify that the codebase adheres to the project's style guidelines. ```bash mvn spotless:check ``` -------------------------------- ### Get Value Serde Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Retrieves the Serde (serializer/deserializer) configured for the topic's values. ```java public Serde getValueSerde() ``` -------------------------------- ### KafkaStreamsStarterTest Implementation Source: https://github.com/michelin/kstreamplify/blob/main/README.md Extend KafkaStreamsStarterTest and override getKafkaStreamsStarter() to set up your test environment. Use TestInputTopic and TestOutputTopic for interacting with your topology. ```java public class MyKafkaStreamsTest extends KafkaStreamsStarterTest { private TestInputTopic inputTopic; private TestOutputTopic outputTopic; @Override protected KafkaStreamsStarter getKafkaStreamsStarter() { return new MyKafkaStreams(); } @BeforeEach void setUp() { inputTopic = testDriver.createInputTopic("input_topic", new StringSerializer(), SerdesUtils.getValueSerdes().serializer()); outputTopic = testDriver.createOutputTopic("output_topic", new StringDeserializer(), SerdesUtils.getValueSerdes().deserializer()); } @Test void shouldUpperCase() { inputTopic.pipeInput("1", user); List> results = outputTopic.readKeyValuesToList(); assertEquals("FIRST NAME", results.get(0).value.getFirstName()); assertEquals("LAST NAME", results.get(0).value.getLastName()); } @Test void shouldFailAndRouteToDlqTopic() { inputTopic.pipeInput("1", user); List> errors = dlqTopic.readKeyValuesToList(); assertEquals("1", errors.get(0).key); assertEquals("Something bad happened...", errors.get(0).value.getContextMessage()); assertEquals(0, errors.get(0).value.getOffset()); } ``` -------------------------------- ### Apply code formatting with Spotless Source: https://github.com/michelin/kstreamplify/blob/main/CONTRIBUTING.md Run this command to automatically fix formatting issues and apply required file headers. ```bash mvn spotless:apply ``` -------------------------------- ### Get Key Serde Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Retrieves the Serde (serializer/deserializer) configured for the topic's keys. ```java public Serde getKeySerde() ``` -------------------------------- ### Get Unprefixed Topic Name Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Returns the original topic name without any prefix or remapping applied. ```java public String getUnPrefixedName() ``` -------------------------------- ### State Store Optimization for HA and Performance Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/configuration.md Configure state directory and standby replicas for state store optimization. Use a fast disk like SSD for 'state.dir' and set 'num.standby.replicas' to 1 for High Availability. ```yaml kafka: properties: num.standby.replicas: 1 state.dir: '/fast-disk/kafka-streams' ``` -------------------------------- ### getHostInfo() Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Returns the host and port information for the current instance. Returns a `HostInfo` object containing the hostname (or IP) and server port. This is used for interactive queries to route requests to the correct instance in a multi-instance deployment. ```APIDOC ## getHostInfo() ### Description Returns the host and port information for the current instance. Returns a `HostInfo` object containing the hostname (or IP) and server port. This is used for interactive queries to route requests to the correct instance in a multi-instance deployment. ### Signature ```java public HostInfo getHostInfo() ``` ### Return Type `HostInfo` ### Example ```java HostInfo hostInfo = initializer.getHostInfo(); System.out.println("Listening on " + hostInfo.host() + ":" + hostInfo.port()); ``` ``` -------------------------------- ### Get Kafka Streams Topology Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Obtain the built Kafka Streams topology object. This can be useful for debugging or visualizing the topology. ```java Topology topology = initializer.getTopology(); System.out.println(topology.describe()); ``` -------------------------------- ### Configure Kubernetes Probe Paths Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/configuration.md Customize the URL paths for Kubernetes liveness and readiness probes. Defaults are 'liveness' and 'ready' respectively. ```yaml kubernetes: liveness: path: 'custom-liveness' readiness: path: 'custom-readiness' ``` ```yaml kubernetes: liveness: path: 'health/live' readiness: path: 'health/ready' ``` -------------------------------- ### Create TopicWithSerde with Default Prefix Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Use this constructor to create a TopicWithSerde instance with the default 'self' prefix key. It requires the topic name and Serde objects for keys and values. ```java TopicWithSerde userTopic = new TopicWithSerde<>( "users", Serdes.String(), SerdesUtils.getValueSerdes() ); ``` -------------------------------- ### Get Avro Key Serdes Source: https://github.com/michelin/kstreamplify/blob/main/README.md Retrieve the Serdes for Avro keys using SerdesUtils. This is useful for configuring Kafka Streams topics. ```java SerdesUtils.getKeySerdes() ``` -------------------------------- ### TopicWithSerde Constructors Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Constructors for TopicWithSerde. Use the first to initialize with topic name and SerDes. Use the second to also specify a prefix for the key Serde. ```java public TopicWithSerde(String topicName, Serde keySerde, Serde valueSerde); public TopicWithSerde(String topicName, String prefixKey, Serde keySerde, Serde valueSerde); ``` -------------------------------- ### getKafkaStreamsStarter() Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarterTest.md This abstract method must be overridden by subclasses to provide the KafkaStreamsStarter implementation that will be tested. It's crucial for setting up the topology for testing purposes. ```APIDOC ## getKafkaStreamsStarter() ### Description Must override - Returns the `KafkaStreamsStarter` implementation to test. ### Signature ```java protected abstract KafkaStreamsStarter getKafkaStreamsStarter() ``` ### Return Type `KafkaStreamsStarter` ### Details Provides the topology implementation for testing. This is typically a new instance of your application's KafkaStreamsStarter. ### Example ```java public class MyTopologyTest extends KafkaStreamsStarterTest { @Override protected KafkaStreamsStarter getKafkaStreamsStarter() { return new MyKafkaStreamsApplication(); } @Test void shouldProcessRecords() { // Test implementation } } ``` ``` -------------------------------- ### Get Avro Value Serdes Source: https://github.com/michelin/kstreamplify/blob/main/README.md Retrieve the Serdes for Avro values using SerdesUtils. This is useful for configuring Kafka Streams topics. ```java SerdesUtils.getValueSerdes() ``` -------------------------------- ### createOutputTestTopic(TopicWithSerde topicWithSerde) Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarterTest.md Creates a `TestOutputTopic` instance, enabling the retrieval of results from Kafka topics during testing. ```APIDOC ## createOutputTestTopic(TopicWithSerde topicWithSerde) ### Description Creates a `TestOutputTopic` using a `TopicWithSerde` declaration. ### Signature ```java protected TestOutputTopic createOutputTestTopic(TopicWithSerde topicWithSerde) ``` ### Parameters #### Path Parameters There are no path parameters for this method. #### Query Parameters There are no query parameters for this method. #### Request Body There is no request body for this method. ### Parameters | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | topicWithSerde | `TopicWithSerde` | Yes | — | Topic declaration with key/value SerDes | ### Return Type `TestOutputTopic` ### Details Creates an output topic for reading topology results. Works seamlessly with TopicWithSerde. ### Source Similar to createInputTestTopic ``` -------------------------------- ### Set Application Server via Default Environment Variable Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/configuration.md If `application.server.var.name` is not set, the application server address can be configured using the default `APPLICATION_SERVER` environment variable. ```bash export APPLICATION_SERVER=localhost:8080 ``` -------------------------------- ### Get Kafka Streams Instance Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Retrieve the running KafkaStreams instance. This instance can be used for state queries, interactive queries, or programmatic management. ```java KafkaStreams kafkaStreams = initializer.getKafkaStreams(); System.out.println("Current state: " + kafkaStreams.state()); kafkaStreams.close(); ``` -------------------------------- ### createInputTestTopic(TopicWithSerde topicWithSerde) Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarterTest.md Creates a `TestInputTopic` instance, simplifying the process of sending data to a Kafka topic during tests. ```APIDOC ## createInputTestTopic(TopicWithSerde topicWithSerde) ### Description Creates a `TestInputTopic` using a `TopicWithSerde` declaration. ### Signature ```java protected TestInputTopic createInputTestTopic(TopicWithSerde topicWithSerde) ``` ### Parameters #### Path Parameters There are no path parameters for this method. #### Query Parameters There are no query parameters for this method. #### Request Body There is no request body for this method. ### Parameters | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | topicWithSerde | `TopicWithSerde` | Yes | — | Topic declaration with key/value SerDes | ### Return Type `TestInputTopic` ### Details Simplifies input topic creation by using the same TopicWithSerde definition as the main topology. The topic name is resolved with prefixes and remapping applied. ### Example ```java public class UserProcessorTest extends KafkaStreamsStarterTest { private TestInputTopic inputTopic; private TestOutputTopic outputTopic; @Override protected KafkaStreamsStarter getKafkaStreamsStarter() { return new UserProcessor(); } @BeforeEach void setUp() { inputTopic = createInputTestTopic(Topics.userInput()); outputTopic = createOutputTestTopic(Topics.userOutput()); } @Test void shouldProcessUsers() { User user = new User("john", "john@example.com"); inputTopic.pipeInput("1", user); List> results = outputTopic.readKeyValuesToList(); assertEquals(1, results.size()); } } // Topic definitions public class Topics { public static TopicWithSerde userInput() { return new TopicWithSerde<>( "users-input", Serdes.String(), SerdesUtils.getValueSerdes() ); } public static TopicWithSerde userOutput() { return new TopicWithSerde<>( "users-output", Serdes.String(), SerdesUtils.getValueSerdes() ); } } ``` ``` -------------------------------- ### Kafka Streams Topology with Processing Result API Source: https://github.com/michelin/kstreamplify/blob/main/README.md Example of using the Processing Result API and TopologyErrorHandler to handle errors and route them to a DLQ topic. ```java @Component public class MyKafkaStreams extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { KStream stream = streamsBuilder .stream("input_topic", Consumed.with(Serdes.String(), SerdesUtils.getValueSerdes())); TopologyErrorHandler .catchErrors(stream.mapValues(MyKafkaStreams::toUpperCase)) .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); } @Override public String dlqTopic() { return "dlq_topic"; } private static ProcessingResult toUpperCase(KafkaUser value) { try { value.setLastName(value.getLastName().toUpperCase()); return ProcessingResult.success(value); } catch (Exception e) { return ProcessingResult.fail(e, value, "Something went wrong..."); } } } ``` -------------------------------- ### Get Hostname and Port from HostInfo Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/types.md Retrieve the hostname and port number from a HostInfo object. This is useful for interactive query routing and application server configuration. ```java public String host(); public int port(); ``` -------------------------------- ### Get Value Serdes for Avro Records Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/SerdesUtils.md Use this to obtain a SpecificAvroSerde configured for record values. It automatically uses Schema Registry settings from the KafkaStreamsExecutionContext. ```java KStream stream = streamsBuilder.stream( "user-events", Consumed.with( Serdes.String(), SerdesUtils.getValueSerdes() ) ); ``` -------------------------------- ### TopicWithSerde Constructors Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Constructors for creating a TopicWithSerde instance. The first constructor uses the default topic name, while the second allows specifying a prefix for the key SerDe. ```APIDOC ## TopicWithSerde Constructors ### `TopicWithSerde(String topicName, Serde keySerde, Serde valueSerde)` Creates a `TopicWithSerde` with the given topic name and key/value SerDes. ### `TopicWithSerde(String topicName, String prefixKey, Serde keySerde, Serde valueSerde)` Creates a `TopicWithSerde` with the given topic name, a prefix for the key SerDe, and key/value SerDes. ``` -------------------------------- ### Get Key Serdes for Avro Records Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/SerdesUtils.md Use this to obtain a SpecificAvroSerde configured for record keys. It automatically uses Schema Registry settings from the KafkaStreamsExecutionContext. ```java KStream stream = streamsBuilder.stream( "my-topic", Consumed.with( SerdesUtils.getKeySerdes(), Serdes.String() ) ); ``` -------------------------------- ### Declare Input and Output Topics with TopicWithSerde API Source: https://github.com/michelin/kstreamplify/blob/main/README.md Define input and output topics using the TopicWithSerde API, specifying topic name, key SerDe, and value SerDe. This simplifies topic management across environments. ```java public static TopicWithSerde inputTopic() { return new TopicWithSerde<>( "input_topic", Serdes.String(), SerdesUtils.getValueSerdes() ); } public static TopicWithSerde outputTopic() { return new TopicWithSerde<>( "output_topic", Serdes.String(), SerdesUtils.getValueSerdes() ); } ``` -------------------------------- ### Create Test Topics for Unit Testing Source: https://github.com/michelin/kstreamplify/blob/main/README.md Utilize createInputTestTopic and createOutputTestTopic methods to generate test instances of TopicWithSerde, enabling unit testing of Kafka Streams topologies. ```java TestInputTopic inputTopic = createInputTestTopic(inputTopic()); TestInputTopic outputTopic = createOutputTestTopic(outputTopic()); ``` -------------------------------- ### Kubernetes Pod Spec for Application Server Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/configuration.md Example of how to set the `APPLICATION_SERVER` environment variable in a Kubernetes pod spec, using the pod's IP address. ```yaml # Kubernetes pod spec example env: - name: APPLICATION_SERVER valueFrom: fieldRef: fieldPath: status.podIP ``` -------------------------------- ### Spring Boot Starter Parent Dependency Source: https://github.com/michelin/kstreamplify/blob/main/README.md Add the Kstreamplify Spring Boot starter to your project using the Spring Boot Starter Parent. You may need to override the kafka.version property. ```xml 4.3.1 com.michelin kstreamplify-spring-boot ${kstreamplify.version} ``` -------------------------------- ### TopicWithSerde Constructor with Default Prefix Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Creates a TopicWithSerde instance using the default 'self' prefix key for the topic. ```APIDOC ## TopicWithSerde(String topicName, Serde keySerde, Serde valueSerde) ### Description Creates a TopicWithSerde with the default "self" prefix key. ### Signature ```java public TopicWithSerde(String topicName, Serde keySerde, Serde valueSerde) ``` ### Parameters #### Path Parameters * **topicName** (String) - Yes - The logical topic name (without prefix) * **keySerde** (Serde) - Yes - The key serializer/deserializer * **valueSerde** (Serde) - Yes - The value serializer/deserializer ### Example ```java TopicWithSerde userTopic = new TopicWithSerde<>( "users", Serdes.String(), SerdesUtils.getValueSerdes() ); ``` ``` -------------------------------- ### Use TopicWithSerde in Tests Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarterTest.md Ensure tests use the same topic definitions as the main topology by leveraging `Topics.ordersInput()` for both. ```java // In main topology Topics.ordersInput().stream(streamsBuilder) // In tests createInputTestTopic(Topics.ordersInput()) ``` -------------------------------- ### Get Resolved Topic Name (toString) Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Returns the fully resolved topic name, including any prefix and remapping. This method is called automatically by stream/table operations. ```java public String toString() ``` -------------------------------- ### Defining Topics with TopicWithSerde Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/SerdesUtils.md Shows how to define Kafka topics with their associated Serdes using the TopicWithSerde class, simplifying stream consumption and production. This is useful for creating reusable topic configurations. ```java public class Topics { public static TopicWithSerde userInput() { return new TopicWithSerde<>( "users-input", Serdes.String(), SerdesUtils.getValueSerdes() ); } public static TopicWithSerde userOutput() { return new TopicWithSerde<>( "users-output", Serdes.String(), SerdesUtils.getValueSerdes() ); } } @Component public class UserProcessor extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { Topics.userInput().stream(streamsBuilder) .mapValues(this::enrichUser) .to(Topics.userOutput()); } } ``` -------------------------------- ### Handle HttpServerException During Initialization Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/errors.md Catch HttpServerException when initializing KafkaStreamsInitializer to manage failures in starting the embedded HTTP server, often due to port conflicts or binding issues. ```java try { KafkaStreamsInitializer initializer = new KafkaStreamsInitializer(starter); initializer.start(); // May throw HttpServerException } catch (HttpServerException e) { log.error("Failed to start HTTP server on port {}", e.getMessage()); } ``` -------------------------------- ### Create TopicWithSerde with Custom Prefix Key Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Use this constructor to create a TopicWithSerde instance with a custom prefix key. This is useful for multi-team topic management, where the prefixKey is used to look up a topic prefix in the application configuration. ```java // Consumes from "staging.team2.orders" topic TopicWithSerde ordersTopic = new TopicWithSerde<>( "orders", "team2", Serdes.String(), SerdesUtils.getValueSerdes() ); // Configuration: // kafka: // properties: // prefix: // team2: "staging.team2." ``` -------------------------------- ### Customize Kubernetes Probe Paths Source: https://github.com/michelin/kstreamplify/blob/main/README.md Configure custom paths for readiness and liveness probes in Kubernetes by setting 'kubernetes.liveness.path' and 'kubernetes.readiness.path' in application.yml. ```yaml kubernetes: liveness: path: 'custom-liveness' readiness: path: 'custom-readiness' ``` -------------------------------- ### Basic Stream Processing with Custom Serdes Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/SerdesUtils.md Demonstrates a basic Kafka Streams topology using SerdesUtils for custom object serialization and deserialization. This is useful for processing streams of custom objects like 'User'. ```java @Component public class UserProcessor extends KafkaStreamsStarter { @Override public void topology(StreamsBuilder streamsBuilder) { streamsBuilder .stream( "user-input", Consumed.with( Serdes.String(), SerdesUtils.getValueSerdes() ) ) .mapValues(this::enrichUser) .to( "user-output", Produced.with( Serdes.String(), SerdesUtils.getValueSerdes() ) ); } private User enrichUser(User user) { user.setProcessedAt(System.currentTimeMillis()); return user; } @Override public String dlqTopic() { return "dlq"; } } ``` -------------------------------- ### Get Server Port Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Retrieves the configured HTTP server port used for exposing web services. This port is essential for topology endpoints, interactive queries, and Kubernetes probes. ```java public int getServerPort() ``` -------------------------------- ### getKafkaStreamsStarter() Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Returns the `KafkaStreamsStarter` implementation passed to the constructor. ```APIDOC ## getKafkaStreamsStarter() ### Description Returns the `KafkaStreamsStarter` implementation passed to the constructor. ### Signature ```java public KafkaStreamsStarter getKafkaStreamsStarter() ``` ### Return Type `KafkaStreamsStarter` ``` -------------------------------- ### TopicWithSerde Constructor with Custom Prefix Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/TopicWithSerde.md Creates a TopicWithSerde instance with a custom prefix key, enabling multi-team topic management by looking up the prefix in application configuration. ```APIDOC ## TopicWithSerde(String topicName, String prefixKey, Serde keySerde, Serde valueSerde) ### Description Creates a TopicWithSerde with a custom prefix key for multi-team topic management. ### Signature ```java public TopicWithSerde(String topicName, String prefixKey, Serde keySerde, Serde valueSerde) ``` ### Parameters #### Path Parameters * **topicName** (String) - Yes - The logical topic name (without prefix) * **prefixKey** (String) - Yes - The prefix key to lookup in configuration * **keySerde** (Serde) - Yes - The key serializer/deserializer * **valueSerde** (Serde) - Yes - The value serializer/deserializer ### Description The `prefixKey` is used to lookup a prefix in the application configuration under `kafka.properties.prefix.`. This enables consuming or producing to topics owned by different teams. ### Example ```java // Consumes from "staging.team2.orders" topic TopicWithSerde ordersTopic = new TopicWithSerde<>( "orders", "team2", Serdes.String(), SerdesUtils.getValueSerdes() ); // Configuration: // kafka: // properties: // prefix: // team2: "staging.team2." ``` ``` -------------------------------- ### Override Initial Wall Clock Time Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarterTest.md Override this method to control the starting event time for the topology test driver. This is crucial for time-based operations like windowing and grace periods. ```java protected Instant getInitialWallClockTime() { return Instant.parse("2024-01-01T00:00:00Z"); } ``` -------------------------------- ### Configure RocksDB State Store Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/types.md Illustrates setting configuration options for RocksDB state store behavior, such as cache size and compression type. These settings are crucial for performance tuning. ```java RocksDbConfig rocksDbConfig = new RocksDbConfig(); rocksDbConfig.setBlockCacheSize(100 * 1024 * 1024); // 100MB rocksDbConfig.setWriteBufferSize(64 * 1024 * 1024); // 64MB rocksDbConfig.setBlockSize(8192); // 8KB rocksDbConfig.setCompressionType("SNAPPY"); ``` -------------------------------- ### Handle OtherInstanceResponseException in Distributed Queries Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/errors.md Catch OtherInstanceResponseException when performing interactive queries in a distributed Kafka Streams setup to manage failures in communicating with other instances. This can occur if a remote instance is unreachable. ```java try { StateStoreRecord result = keyValueStoreService.getKeyValueByKey("store", "key"); } catch (OtherInstanceResponseException e) { log.warn("Could not reach other instance for query: {}", e.getMessage()); // Retry or fallback } ``` -------------------------------- ### Access KafkaStreamsExecutionContext Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsInitializer.md Demonstrates how to access various components of the KafkaStreamsExecutionContext after initialization. This includes retrieving Kafka properties, SerDes configuration, the DLQ topic name, and the application prefix. ```java KafkaStreamsExecutionContext.getProperties(); KafkaStreamsExecutionContext.getSerdesConfig(); KafkaStreamsExecutionContext.getDlqTopicName(); KafkaStreamsExecutionContext.getPrefix(); ``` -------------------------------- ### Customize Uncaught Exception Handling Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/KafkaStreamsStarter.md Provide a custom StreamsUncaughtExceptionHandler to control the behavior when uncaught exceptions occur in stream processing threads. This example logs the error and shuts down the Kafka Streams client. ```java import org.apache.kafka.streams.StreamsUncaughtExceptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // ... inside your class extending KafkaStreamsStarter ... private static final Logger log = LoggerFactory.getLogger(MyKafkaStreams.class); @Override public StreamsUncaughtExceptionHandler uncaughtExceptionHandler() { return (exception) -> { log.error("Uncaught exception in stream thread", exception); return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; }; } ``` -------------------------------- ### Retrieve successful value from ProcessingResult Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/ProcessingResult.md Call getValue() to get the processed value when the ProcessingResult is successful. Returns null if the result contains an error. Ensure isValid() is true before calling this method to avoid null pointer exceptions. ```java public V getValue() ``` ```java if (result.isValid()) { KafkaUser user = result.getValue(); } ``` -------------------------------- ### Configure Spring Boot Application Name and Kafka Servers Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/configuration.md Set the application name and Kafka bootstrap servers when using the Spring Boot starter. Kafka properties are automatically converted. ```yaml spring: application: name: 'my-kafka-streams-app' kafka: bootstrap-servers: 'localhost:9092' ``` -------------------------------- ### Kafka Streams Processor API with error handling Source: https://github.com/michelin/kstreamplify/blob/main/_autodocs/api-reference/ProcessingResult.md This example shows how to implement error handling using ProcessingResult within a custom Kafka Streams Processor. It demonstrates forwarding successful or failed records using the context's forward method. ```java public static class UserProcessor extends ContextualProcessor> { @Override public void process(Record record) { try { KafkaUser user = record.value(); if (user.getEmail() == null) { throw new IllegalArgumentException("Email is required"); } context().forward(ProcessingResult.wrapRecordSuccess(record.withValue(user))); } catch (Exception e) { context().forward(ProcessingResult.wrapRecordFailure(e, record, "Validation failed")); } } } ```