### Full CentralDogmaPropertySupplier usage example Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc A complete example demonstrating the setup of Central Dogma client and Decaton processor subscription. ```java public class CentralDogmaSupplierMain { public static void main(String[] args) throws Exception { final CentralDogma centralDogma = new ArmeriaCentralDogmaBuilder() .host("127.0.0.1") .accessToken("accesstoken") .build(); CentralDogmaPropertySupplier supplier = CentralDogmaPropertySupplier .register(centralDogma, "project", "repository", "/testProcessor.json"); // ... ProcessorSubscription testProcessor = SubscriptionBuilder.newBuilder("testProcessor") .processorsBuilder( ProcessorsBuilder.consuming( "my-decaton-topic", new ProtocolBuffersDeserializer<>(PrintMessageTask.parser())) .thenProcess(new PrintMessageTaskProcessor()) ) .properties(supplier) .consumerConfig(consumerConfig) .buildAndStart(); } } ``` -------------------------------- ### Build and Run Decaton Example Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc This shell script demonstrates how to build the Decaton example using Gradle and then run the processor and producer. It also shows the expected output when a task is processed. ```sh $ ./gradlew shadowJar $ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.ProcessorMain & $ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.ProducerMain motoko 25 Put task succeeded: my-decaton-topic-1-5 Noticed motoko is 25 years old ``` -------------------------------- ### YAML: Decaton Processor Configuration Example Source: https://context7.com/line/decaton/llms.txt Example configuration file in YAML format for Decaton processor properties. This file can be monitored by Central Dogma for dynamic updates. 
```yaml
# Decaton processor configuration
decaton.partition.concurrency: 20
decaton.processing.rate.per.partition: 500
decaton.max.pending.records: 5000
decaton.commit.interval.ms: 2000
decaton.ignore.keys:
  - blocked-user-1
  - blocked-user-2
```

--------------------------------

### Configure CompactionProcessor in pipeline

Source: https://github.com/line/decaton/blob/master/docs/task-compaction.adoc

Example showing how to integrate CompactionProcessor into a Decaton subscription pipeline and define the compaction logic.

```java
public class TaskCompactionMain {
    public static void main(String[] args) {
        // ...
        ProcessorSubscription subscription =
                SubscriptionBuilder.newBuilder("my-decaton-processor")
                                   .processorsBuilder(
                                           ProcessorsBuilder
                                                   .consuming("my-decaton-topic", extractor)
                                                   .thenProcess(TaskCompactionMain::createCompactionProcessor, // <1>
                                                                ProcessorScope.THREAD)
                                                   .thenProcess(LocationEventProcessor::new, // <2>
                                                                ProcessorScope.THREAD)
                                   )
        // ... (set up other options and build)
    }

    private static CompactionProcessor<LocationEvent> createCompactionProcessor() {
        return new CompactionProcessor<>(1000L, (left, right) -> { // <3>
            if (left.task().getTimestamp() == right.task().getTimestamp()) { // <4>
                return CompactChoice.PICK_EITHER;
            } else if (left.task().getTimestamp() > right.task().getTimestamp()) {
                return CompactChoice.PICK_LEFT; // <5>
            } else {
                return CompactChoice.PICK_RIGHT; // <6>
            }
        });
    }
}
```

--------------------------------

### Build and Start ProcessorSubscription

Source: https://context7.com/line/decaton/llms.txt

Configures and starts a Decaton ProcessorSubscription. Requires Kafka consumer properties and processor-specific configuration such as the topic, deserializer, and processing logic. The example sets partition concurrency, the maximum number of pending records, and the commit interval, and closes the subscription gracefully from a shutdown hook.

```java import com.linecorp.decaton.processor.runtime.ProcessorSubscription; import com.linecorp.decaton.processor.runtime.SubscriptionBuilder; import com.linecorp.decaton.processor.runtime.ProcessorsBuilder; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.StaticPropertySupplier; import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; public class ProcessorSubscriptionExample { public static void main(String[] args) throws Exception { // Configure Kafka consumer Properties consumerConfig = new Properties(); consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-processor-group"); consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-processor"); // Build and start the processor subscription ProcessorSubscription subscription = SubscriptionBuilder .newBuilder("my-processor") // Subscription ID for metrics/logging .processorsBuilder( ProcessorsBuilder.consuming( "my-decaton-topic", new ProtocolBuffersDeserializer<>(PrintMessageTask.parser())) .thenProcess(new PrintMessageTaskProcessor()) ) .consumerConfig(consumerConfig) .properties( StaticPropertySupplier.of( // 10 threads per partition for concurrent processing Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 10), // Maximum pending records before pausing consumption Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, 1000), // Commit interval in milliseconds Property.ofStatic(ProcessorProperties.CONFIG_COMMIT_INTERVAL_MS, 1000L) ) ) .buildAndStart(); // Subscription runs in background threads System.out.println("Processor started, press Ctrl+C to stop"); // Graceful shutdown Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("Shutting down 
processor..."); subscription.close(); })); // Keep main thread alive Thread.currentThread().join(); } } ``` -------------------------------- ### JSON: Decaton Processor Configuration Example Source: https://context7.com/line/decaton/llms.txt Example configuration file in JSON format for Decaton processor properties. This file can be monitored by Central Dogma for dynamic updates. ```json { "$schema": "https://raw.githubusercontent.com/line/decaton/v9.5.0/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json", "decaton.partition.concurrency": 20, "decaton.processing.rate.per.partition": 500, "decaton.max.pending.records": 5000, "decaton.commit.interval.ms": 2000, "decaton.ignore.keys": ["blocked-user-1", "blocked-user-2"] } ``` -------------------------------- ### Instantiate and Run ProcessorSubscription Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc This main class sets up and starts a Decaton processor subscription. It configures Kafka consumer properties, defines the topic and processor, and runs the subscription for a specified duration. 
```java public final class ProcessorMain { public static void main(String[] args) throws Exception { Properties consumerConfig = new Properties(); consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-decaton-processor"); consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("bootstrap.servers")); consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor"); ProcessorSubscription subscription = SubscriptionBuilder.newBuilder("my-decaton-processor") // <1> .processorsBuilder( ProcessorsBuilder.consuming( "my-decaton-topic", new ProtocolBuffersDeserializer<>(PrintMessageTask.parser())) .thenProcess(new PrintMessageTaskProcessor()) ) .consumerConfig(consumerConfig) .buildAndStart(); Thread.sleep(10000); subscription.close(); } } ``` -------------------------------- ### Example YAML property file format Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc YAML configuration files must use flat, dot-separated keys similar to JSON. ```yaml # You should use the flat, dot-separated keys like JSON. decaton.partition.concurrency: 8 decaton.processing.rate.per.partition: 50 ``` -------------------------------- ### Decaton Processor Configuration (YAML) Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc Example of a Decaton processor dynamic configuration file in YAML format, demonstrating the use of $schema for validation. ```yaml # $schema: https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json # yaml-language-server: $schema=https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json decaton.partition.concurrency: 10000 decaton.processing.rate.per.partition: -1 ... 
``` -------------------------------- ### Produce Tasks with DecatonClient Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc Example of producing tasks in a batch using DecatonClient. The client handles task delivery and provides a callback for error handling. ```java public final class BatchProducerMain { public static void main(String[] args) throws Exception { try (DecatonClient client = newClient()) { for (int i = 0; i < 100; i++) { String name = "name:" + i; PrintMessageTask task = PrintMessageTask.newBuilder().setName(name).setAge(i).build(); client.put(name, task) .whenComplete((r, e) -> { if (e != null) { System.err.println("Producing task failed... " + e); } }); } } } ... ``` -------------------------------- ### Decaton Processor Configuration (JSON Schema) Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc Example of a Decaton processor dynamic configuration file in JSON format, including the $schema directive for IDE and CI validation. ```json { "$schema": "https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json", "decaton.partition.concurrency": 10000, "decaton.processing.rate.per.partition": -1, ... } ``` -------------------------------- ### Configure TaskBatchingMain Source: https://github.com/line/decaton/blob/master/docs/task-batching.adoc Register the batching processor within the SubscriptionBuilder setup. ```java public class TaskBatchingMain { public static void main(String[] args) { // ... (Set up other options like default DecatonProcessor) long lingerMillis = 1000; int capacity = 100; ProcessorSubscription subscription = SubscriptionBuilder.newBuilder("my-decaton-processor") .processorsBuilder( ProcessorsBuilder .consuming("my-decaton-topic", extractor) .thenProcess(() -> createBatchingProcessor(lingerMillis, capacity), ProcessorScope.THREAD) ) // ... 
(Set up other options and build) } private static BatchingProcessor createBatchingProcessor(long lingerMillis, int capacity) { return new InsertHelloTaskBatchingProcessor(lingerMillis, capacity); // <1> ``` -------------------------------- ### Configure Micrometer Monitoring Source: https://context7.com/line/decaton/llms.txt Register a Micrometer registry with Decaton to enable automatic metric collection. This example demonstrates exposing metrics via a Prometheus HTTP server. ```java import com.linecorp.decaton.processor.metrics.Metrics; import io.micrometer.prometheus.PrometheusConfig; import io.micrometer.prometheus.PrometheusMeterRegistry; import io.prometheus.client.exporter.HTTPServer; public class MonitoringExample { public static void setupMonitoring() throws Exception { // Create Prometheus registry PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); // Register with Decaton - metrics are automatically collected Metrics.register(registry); // Expose metrics endpoint for Prometheus scraping new HTTPServer.Builder() .withPort(9090) .withRegistry(registry.getPrometheusRegistry()) .build(); System.out.println("Metrics available at http://localhost:9090/metrics"); } } ``` -------------------------------- ### Set Static Ignored Keys in Java Source: https://github.com/line/decaton/blob/master/docs/key-blocking.adoc Use `StaticPropertySupplier` to set a static list of keys to ignore. This example demonstrates how to configure `CONFIG_IGNORE_KEYS` when building a subscription. ```java SubscriptionBuilder.newBuilder("ignore-key-processor") .properties( StaticPropertySupplier.of( Property.ofStatic( CONFIG_IGNORE_KEYS, Arrays.asList("key1", "key2") ))) ... ``` -------------------------------- ### Set Static Processing Rate in Decaton Source: https://github.com/line/decaton/blob/master/docs/rate-limiting.adoc Configure a static processing rate for a Decaton subscription using `SubscriptionBuilder#properties`. 
This example uses `StaticPropertySupplier` to set `CONFIG_PROCESSING_RATE` to 100L. The property can also be configured dynamically.

```java
...
SubscriptionBuilder.newBuilder("rate-limit-processor")
                   .properties(
                           StaticPropertySupplier.of( // <1>
                                   Property.ofStatic(CONFIG_PROCESSING_RATE, 100L)))
...
```

--------------------------------

### Produce Tasks with DecatonClient

Source: https://context7.com/line/decaton/llms.txt

Demonstrates configuring the DecatonClient, creating tasks, and the different ways of putting tasks into a topic: asynchronous, synchronous, and with scheduled delivery.

```java
import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;

public class TaskProducerExample {
    public static void main(String[] args) throws Exception {
        // Configure producer properties
        Properties producerConfig = new Properties();
        producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-decaton-client");
        producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Create DecatonClient for producing tasks
        try (DecatonClient<PrintMessageTask> client = DecatonClient
                .producing("my-decaton-topic", new ProtocolBuffersSerializer<PrintMessageTask>())
                .applicationId("MyApp")
                .instanceId("localhost")
                .producerConfig(producerConfig)
                .build()) {

            // Create a task
            PrintMessageTask task = PrintMessageTask.newBuilder()
                                                    .setName("John")
                                                    .setAge(30)
                                                    .build();

            // Put task asynchronously - key determines partition routing and ordering
            CompletableFuture<PutTaskResult> result = client.put("John", task);

            // Wait synchronously for completion
            PutTaskResult putResult = result.join();
            System.out.println("Task sent to: " + putResult.topic() + "-" + putResult.partition() + "-" + putResult.offset());

            // Or handle asynchronously with error callback
            client.put("Jane", task,
error -> { System.err.println("Failed to produce task: " + error.getMessage()); }); // Put with custom timestamp client.put("Bob", task, System.currentTimeMillis()); // Put with scheduled processing time (delayed processing) DecatonClient.TaskMetadata metadata = DecatonClient.TaskMetadata.builder() .timestamp(System.currentTimeMillis()) .scheduledTime(System.currentTimeMillis() + 60000) // Process after 1 minute .build(); client.put("Alice", task, metadata); } } } ``` -------------------------------- ### Build Project with Gradle Source: https://github.com/line/decaton/blob/master/README.md Execute the build command to compile the project using the Gradle wrapper. ```sh ./gradlew build ``` -------------------------------- ### Configure Gradle for Decaton and Protobuf Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc Configures the build environment with necessary plugins and dependencies for Protocol Buffers and Kafka clients. ```groovy plugins { id 'idea' id 'com.google.protobuf' version '0.8.18' id 'com.gradleup.shadow' version '8.3.6' } dependencies { implementation "com.google.protobuf:protobuf-java:$PROTOBUF_VERSION" implementation "org.apache.kafka:kafka-clients:$KAFKA_VERSION" } protobuf { protoc { artifact = "com.google.protobuf:protoc:$PROTOBUF_VERSION" } } idea { module { generatedSourceDirs += file('build/generated/source/proto/main/java') } } ``` -------------------------------- ### Java: Dynamic Processor Configuration with Central Dogma Source: https://context7.com/line/decaton/llms.txt Sets up a Decaton processor to dynamically load configuration from Central Dogma. Static properties take precedence over dynamic ones. Ensure Central Dogma client is configured with host, port, and access token. 
```java import com.linecorp.decaton.centraldogma.CentralDogmaPropertySupplier; import com.linecorp.decaton.processor.runtime.SubscriptionBuilder; import com.linecorp.decaton.processor.runtime.ProcessorsBuilder; import com.linecorp.decaton.processor.runtime.StaticPropertySupplier; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.centraldogma.client.CentralDogma; import com.linecorp.centraldogma.client.armeria.ArmeriaCentralDogmaBuilder; public class DynamicConfigExample { public static ProcessorSubscription createDynamicSubscription(Properties consumerConfig) throws Exception { // Connect to Central Dogma server CentralDogma centralDogma = new ArmeriaCentralDogmaBuilder() .host("centraldogma.example.com", 36462) .accessToken("your-access-token") .build(); // Create property supplier that watches for config changes // File will be created with defaults if it doesn't exist CentralDogmaPropertySupplier dynamicSupplier = CentralDogmaPropertySupplier.register( centralDogma, "my-project", "my-repository", "/decaton/processor-config.json" // or .yaml ); return SubscriptionBuilder .newBuilder("dynamic-processor") .processorsBuilder( ProcessorsBuilder.consuming( "my-decaton-topic", new ProtocolBuffersDeserializer<>(PrintMessageTask.parser())) .thenProcess(new PrintMessageTaskProcessor()) ) .consumerConfig(consumerConfig) .properties( // Static properties take precedence over dynamic ones StaticPropertySupplier.of( Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 10) ), // Dynamic properties from Central Dogma dynamicSupplier ) .buildAndStart(); } } ``` -------------------------------- ### Build and Run Decaton Processor Source: https://github.com/line/decaton/blob/master/docs/task-extractor.adoc Builds the project JAR and executes the processor while providing a sample JSON message via the Kafka console producer. 
```sh
$ ./gradlew shadowJar
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.UserEventProcessorMain &
$ /path/to/kafka_dir/bin/kafka-console-producer.sh --broker-list $KAFKA_BOOTSTRAP_SERVERS --topic my-decaton-json-topic
> {"eventTimestampMillis":1571368115079,"name": "daisuke","age": 52}
```

--------------------------------

### Implement Task Producer

Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc

Demonstrates creating a DecatonClient to produce tasks into a Kafka topic. Create the client once, reuse it, and close it properly so that buffered tasks are flushed.

```java
import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;
import java.util.concurrent.CompletableFuture;

public final class ProducerMain {
    public static void main(String[] args) throws Exception {
        try (DecatonClient<PrintMessageTask> client = newClient()) {
            String name = args[0];
            int age = Integer.parseInt(args[1]);
            PrintMessageTask task = PrintMessageTask.newBuilder().setName(name).setAge(age).build();
            CompletableFuture<PutTaskResult> result = client.put(name, task); // <1>

            // Synchronously wait the result
            result.join();
            // Asynchronously observe the result
            result.whenComplete((r, e) -> {
                // e is null on success, so only report when the put actually failed
                if (e != null) {
                    System.err.println("Producing task failed... " + e);
                }
            });
        }
    }

    private static DecatonClient<PrintMessageTask> newClient() {
        Properties producerConfig = new Properties();
        producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-decaton-client");
        producerConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                   System.getProperty("bootstrap.servers"));

        return DecatonClient.producing("my-decaton-topic",
                                       new ProtocolBuffersSerializer<PrintMessageTask>())
                            .applicationId("MyApp") // <2>
                            // By default it sets local hostname but here we go explicit
                            .instanceId("localhost")
                            .producerConfig(producerConfig)
                            .build();
    }
}
```

--------------------------------

### Build Decaton Benchmark ShadowJar

Source: https://github.com/line/decaton/blob/master/benchmark/README.md

Builds the shadowJar for the benchmark module using Gradle.

```sh ../gradlew :benchmark:shadowJar ``` -------------------------------- ### Configure Decaton with Brave/Zipkin Tracing Source: https://context7.com/line/decaton/llms.txt Use this snippet to enable distributed tracing in Decaton by configuring Zipkin reporter and Brave tracing. Ensure Zipkin is accessible at the specified URL. ```java import com.linecorp.decaton.processor.runtime.SubscriptionBuilder; import com.linecorp.decaton.processor.runtime.ProcessorsBuilder; import com.linecorp.decaton.processor.runtime.BraveTracingProvider; import brave.Tracing; import brave.kafka.clients.KafkaTracing; import zipkin2.reporter.brave.AsyncZipkinSpanHandler; import zipkin2.reporter.urlconnection.URLConnectionSender; public class TracingExample { public static ProcessorSubscription createTracedSubscription(Properties consumerConfig) { // Configure Zipkin reporter AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler.create( URLConnectionSender.create("http://zipkin:9411/api/v2/spans")); // Create Brave tracing Tracing tracing = Tracing.newBuilder() .localServiceName("my-decaton-processor") .addSpanHandler(spanHandler) .build(); return SubscriptionBuilder .newBuilder("traced-processor") .enableTracing(new BraveTracingProvider(tracing)) .processorsBuilder( ProcessorsBuilder.consuming( "my-decaton-topic", new ProtocolBuffersDeserializer<>(PrintMessageTask.parser())) .thenProcess(new PrintMessageTaskProcessor()) ) .consumerConfig(consumerConfig) .buildAndStart(); } } ``` -------------------------------- ### Configure ProcessorSubscription with PropertySupplier Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc Apply the property supplier to the Decaton processor during subscription initialization. ```java ProcessorSubscription testProcessor = SubscriptionBuilder.newBuilder("testProcessor") /* ... 
        */
        .properties(supplier)
        .consumerConfig(consumerConfig)
        .buildAndStart();
```

--------------------------------

### Run Decaton Performance Benchmark

Source: https://github.com/line/decaton/blob/master/docs/runtime.adoc

Execute the benchmark script to compare the `THREAD_POOL` and `VIRTUAL_THREAD` subpartition runtimes. The simulated latency is supplied through `$latency`; partition concurrency is fixed at 300.

```bash
./debm.sh \
    --title "Decaton" \
    --runner com.linecorp.decaton.benchmark.DecatonRunner \
    --runs 3 \
    --format json \
    --tasks 10000 \
    --warmup 100000 \
    --simulate-latency=$latency \
    --latency-count=5 \
    --param=decaton.partition.concurrency=300 \
    --param=decaton.subpartition.runtime={THREAD_POOL|VIRTUAL_THREAD}
```

--------------------------------

### Execute Decaton Benchmark

Source: https://github.com/line/decaton/blob/master/benchmark/README.md

Runs the benchmark script with specified parameters for tasks, warmup, simulated latency, and concurrency.

```sh
./debm.sh \
    --title "Decaton" \
    --runner com.linecorp.decaton.benchmark.DecatonRunner \
    --tasks 10000 \
    --warmup 100 \
    --simulate-latency=10 \
    --param=decaton.partition.concurrency=20
```

--------------------------------

### Enable Brave Tracing in Decaton

Source: https://github.com/line/decaton/blob/master/docs/tracing.adoc

Configure Decaton to use BraveTracingProvider to enable distributed tracing. Ensure your KafkaProducerSupplier is set up to handle trace information.

```java
SubscriptionBuilder.newBuilder("tracing-key-processor")
                   .enableTracing(
                           new BraveTracingProvider())
                   ...
```

--------------------------------

### Import PGP Secret Key on New Machine

Source: https://github.com/line/decaton/blob/master/developer-docs/making-release.md

Imports a PGP secret key from a file on a new machine.

```sh
gpg --import gpg-secret.key
```

--------------------------------

### Configure Rate Limited Processor in Java

Source: https://context7.com/line/decaton/llms.txt

Sets up a Decaton processor with a specified processing rate and concurrency per partition.
Use `ProcessorProperties.CONFIG_PROCESSING_RATE` to define tasks per second and `ProcessorProperties.CONFIG_PARTITION_CONCURRENCY` for threads per partition. Valid rates are -1 for unlimited, 0 to pause, and N > 0 for N tasks/sec. ```java import com.linecorp.decaton.processor.runtime.SubscriptionBuilder; import com.linecorp.decaton.processor.runtime.ProcessorsBuilder; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.StaticPropertySupplier; import com.linecorp.decaton.processor.runtime.Property; import java.util.Properties; public class RateLimitedProcessorExample { public static ProcessorSubscription createRateLimitedSubscription(Properties consumerConfig) { return SubscriptionBuilder .newBuilder("rate-limited-processor") .processorsBuilder( ProcessorsBuilder.consuming( "my-decaton-topic", new ProtocolBuffersDeserializer<>(PrintMessageTask.parser())) .thenProcess(new DatabaseProcessor()) ) .consumerConfig(consumerConfig) .properties( StaticPropertySupplier.of( // Limit to 100 tasks/second per partition // With 3 partitions, total max = 300 tasks/second Property.ofStatic(ProcessorProperties.CONFIG_PROCESSING_RATE, 100L), // Use 5 threads per partition Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 5) ) ) .buildAndStart(); } } // Rate values: // -1 (default): Unlimited processing rate // 0: Stop all processing (pause) // N > 0: Process up to N tasks per second per partition ``` -------------------------------- ### Gradle Properties for Signing and Sonatype Source: https://github.com/line/decaton/blob/master/developer-docs/making-release.md Configuration for Gradle to handle signing artifacts and authenticating with Sonatype. Replace placeholders with your specific details. 
```properties
signing.keyId=// The last 8 symbols of the key ID that you can see with `gpg -K`
signing.secretKeyRingFile=/path/to/HOME/.gradle/pgp-keyring.gpg
signing.password=// Your PGP keyring password
sonatypeUsername=// Sonatype token's username that you confirmed at step 1
sonatypePassword=// Sonatype token's password you confirmed at step 1
```

--------------------------------

### Provide Static Properties with Property Supplier

Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc

Use StaticPropertySupplier to provide hardcoded properties for Decaton configuration. When several suppliers are passed, each property is taken from the first supplier that defines it.

```java
.properties(
        StaticPropertySupplier.of(
                // Adopted even if centralDogmaPropertySupplier below defines the same property
                Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 100),
                Property.ofStatic(...)),
        centralDogmaPropertySupplier);
```

--------------------------------

### Instantiate CentralDogmaPropertySupplier

Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc

Choose one of two ways: `register` (<1>) creates the property file with default values if it does not exist yet, while the constructor (<2>) uses an existing file.

```java
CentralDogmaPropertySupplier supplier = CentralDogmaPropertySupplier
        .register(centralDogma, PROJECT_NAME, REPOSITORY_NAME, "/properties.json"); // <1>

CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(
        centralDogma, PROJECT_NAME, REPOSITORY_NAME, "/properties.json"); // <2>
```

--------------------------------

### Configure Subpartition Runtime in Java

Source: https://github.com/line/decaton/blob/master/docs/runtime.adoc

Set the subpartition runtime for a subscription using the SubscriptionBuilder.

```java
SubscriptionBuilder.newBuilder("my-subscription")
                   .subPartitionRuntime(SubPartitionRuntime.VIRTUAL_THREAD)
                   ...
                   .build();
```

--------------------------------

### Run Custom Benchmark

Source: https://github.com/line/decaton/blob/master/benchmark/README.md

Executes the benchmark script using a custom runner JAR.

```sh
export CLASSPATH=/path/to/custom-runner-all.jar
cd /path/to/decaton/benchmark
./debm.sh \
    --title "Custom" \
    --runner com.example.CustomRunner \
    --tasks 10000 \
    --warmup 100 \
    --simulate-latency=10
```

--------------------------------

### Benchmark Concurrency Tuning

Source: https://github.com/line/decaton/blob/master/docs/runtime.adoc

Run the benchmark script with varying concurrency values to identify the throughput peak for the THREAD_POOL runtime.

```bash
./debm.sh \
    --title "Decaton" \
    --runner com.linecorp.decaton.benchmark.DecatonRunner \
    --runs 2 \
    --format json \
    --tasks 10000 \
    --warmup 100000 \
    --simulate-latency=4 \
    --latency-count=5 \
    --param=decaton.partition.concurrency=$conc \
    --param=decaton.subpartition.runtime=THREAD_POOL
```

--------------------------------

### Add Benchmark Dependency

Source: https://github.com/line/decaton/blob/master/benchmark/README.md

Configures the build.gradle file to include the decaton-benchmark dependency.

```groovy
repositories {
    mavenCentral()
    mavenLocal()
}

dependencies {
    implementation "com.linecorp.decaton:decaton-benchmark:$DECATON_VERSION"
}
```

--------------------------------

### Publish Benchmark Module Locally

Source: https://github.com/line/decaton/blob/master/benchmark/README.md

Publishes the benchmark module to the local Maven repository for use in custom projects.

```sh
cd /path/to/decaton/benchmark
../gradlew :benchmark:publishToMavenLocal -x sign
```

--------------------------------

### Implement Task Compaction in Decaton

Source: https://context7.com/line/decaton/llms.txt

Configures a subscription with a CompactionProcessor to deduplicate location events based on timestamps. The CompactionProcessor must be the first processor in the pipeline.

```java import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.ProcessingContext; import com.linecorp.decaton.processor.runtime.SubscriptionBuilder; import com.linecorp.decaton.processor.runtime.ProcessorsBuilder; import com.linecorp.decaton.processor.runtime.ProcessorScope; import com.linecorp.decaton.processor.processors.CompactionProcessor; import com.linecorp.decaton.processor.processors.CompactionProcessor.CompactChoice; // Domain object with timestamp for compaction decision public class LocationEvent { private long timestamp; private double latitude; private double longitude; private String userId; // getters and setters } // Processor that handles compacted location events public class LocationEventProcessor implements DecatonProcessor { @Override public void process(ProcessingContext context, LocationEvent event) throws InterruptedException { // Only processes the most recent location per user within the compaction window System.out.printf("Processing location for user %s: (%.6f, %.6f) at %d%n", event.getUserId(), event.getLatitude(), event.getLongitude(), event.getTimestamp()); saveToDatabase(event); } } // Subscription with task compaction public class CompactionExample { public static ProcessorSubscription createCompactingSubscription(Properties consumerConfig) { return SubscriptionBuilder .newBuilder("location-processor") .processorsBuilder( ProcessorsBuilder.consuming("location-events", new LocationEventExtractor()) // CompactionProcessor MUST be first in the pipeline .thenProcess(CompactionExample::createCompactionProcessor, ProcessorScope.THREAD) // Then the actual processing logic .thenProcess(LocationEventProcessor::new, ProcessorScope.THREAD) ) .consumerConfig(consumerConfig) .buildAndStart(); } private static CompactionProcessor createCompactionProcessor() { return new CompactionProcessor<>( 1000L, // lingerMillis: 1 second compaction window (left, right) -> { // Compare timestamps to decide which task 
                // survives (the task with the larger timestamp is the newer one)
                if (left.task().getTimestamp() > right.task().getTimestamp()) {
                    return CompactChoice.PICK_LEFT;   // Left is newer: keep it
                } else if (left.task().getTimestamp() < right.task().getTimestamp()) {
                    return CompactChoice.PICK_RIGHT;  // Right is newer: keep it
                } else {
                    return CompactChoice.PICK_EITHER; // Same timestamp, either works
                }
            }
        );
    }
}
```

--------------------------------

### Configure ProcessorSubscription with Concurrency

Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc

This snippet reads the desired concurrency from the `concurrency` system property so that the ProcessorSubscription can process tasks of a single partition on multiple threads, which increases throughput.

```java
int partitionConcurrency = Integer.parseInt(System.getProperty("concurrency"));
ProcessorSubscription subscription =
        SubscriptionBuilder.newBuilder("my-decaton-processor")
```

--------------------------------

### Implement RetryingProcessor and Subscription

Source: https://context7.com/line/decaton/llms.txt

Defines a processor that retries a failed task up to a maximum retry count, and a subscription with retry enabled using a 5-second backoff.

```java
import java.time.Duration;
import java.util.Properties;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.RetryConfig;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;

// Processor with retry logic
public class RetryingProcessor implements DecatonProcessor<PrintMessageTask> {
    private static final int MAX_RETRIES = 5;

    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        try {
            // Attempt to process - may fail due to external service issues
            callExternalService(task);
        } catch (Exception e) {
            if (context.metadata().retryCount() < MAX_RETRIES) {
                // Schedule retry - task will be reprocessed after backoff
                System.out.printf("Retrying task (attempt %d/%d): %s%n",
                                  context.metadata().retryCount() + 1, MAX_RETRIES, e.getMessage());
                context.retry(); // Automatically handles completion
            } else {
                // Max retries exceeded - log and discard
                System.err.printf("Task failed after %d retries, discarding: %s%n",
                                  MAX_RETRIES, task.getName());
            }
        }
    }

    private void callExternalService(PrintMessageTask task) throws Exception {
        // Simulated external call that may fail
        if (Math.random() < 0.3) {
            throw new Exception("Service temporarily unavailable");
        }
    }
}

// Subscription with retry enabled
public class RetrySubscriptionExample {
    public static ProcessorSubscription createSubscription(Properties consumerConfig) {
        return SubscriptionBuilder
                .newBuilder("retry-processor")
                .enableRetry(RetryConfig.withBackoff(Duration.ofSeconds(5))) // 5 second backoff
                .processorsBuilder(
                        ProcessorsBuilder.consuming(
                                "my-decaton-topic", // Also subscribes to my-decaton-topic-retry
                                new ProtocolBuffersDeserializer<>(PrintMessageTask.parser()))
                                         .thenProcess(new RetryingProcessor())
                )
                .consumerConfig(consumerConfig)
                .buildAndStart();
    }
}
```
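
The retry-or-discard decision in `RetryingProcessor` can be pulled out of the Decaton callback so it is easy to test on its own. The following is a minimal sketch under stated assumptions: `RetryDecisionSketch`, `Action`, `decide`, and `retryTopicFor` are hypothetical names introduced for illustration and are not part of Decaton's API. The `-retry` suffix only mirrors the retry topic name shown in the comment of the example above.

```java
// Hypothetical helper for illustration only; none of these names come from Decaton.
public class RetryDecisionSketch {
    enum Action { RETRY, DISCARD }

    // Retry while the number of retries already performed is below the limit,
    // mirroring the `retryCount() < MAX_RETRIES` check in RetryingProcessor.
    static Action decide(int retryCount, int maxRetries) {
        return retryCount < maxRetries ? Action.RETRY : Action.DISCARD;
    }

    // Assumed naming rule: original topic name suffixed with "-retry".
    static String retryTopicFor(String topic) {
        return topic + "-retry";
    }

    public static void main(String[] args) {
        int maxRetries = 5;
        // Walk through retry counts 0..5 and print the decision for each.
        for (int retryCount = 0; retryCount <= maxRetries; retryCount++) {
            System.out.printf("retryCount=%d -> %s%n", retryCount, decide(retryCount, maxRetries));
        }
        System.out.println(retryTopicFor("my-decaton-topic"));
    }
}
```

With a limit of 5, a task is retried for retry counts 0 through 4 and discarded once the count reaches 5, so it is attempted at most six times in total.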
--------------------------------

### Implement AsyncRetryProcessor

Source: https://context7.com/line/decaton/llms.txt

Demonstrates how to handle retries within an asynchronous processing context using `deferCompletion`.

```java
import com.linecorp.decaton.processor.Completion;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.ProcessingContext;

// Async processor with retry
public class AsyncRetryProcessor implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        Completion completion = context.deferCompletion();

        // callExternalServiceAsync is application code (not shown), assumed to return a CompletableFuture
        callExternalServiceAsync(task).whenComplete((result, error) -> {
            if (error != null && context.metadata().retryCount() < 3) {
                try {
                    context.retry(); // Handles completion automatically
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    completion.complete();
                }
            } else {
                completion.complete(); // Must complete on success or final failure
            }
        });
    }
}
```

--------------------------------

### Define Task Schema with Protobuf

Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc

Defines a simple task schema using Protocol Buffers syntax.

```protobuf
syntax = "proto3";
package com.linecorp.decaton.example.protocol;

message PrintMessageTask {
    string name = 1;
    int32 age = 2;
}
```

--------------------------------

### Implement Asynchronous Task Processing with Deferred Completion

Source: https://context7.com/line/decaton/llms.txt

Implement DecatonProcessor for asynchronous task processing using deferred completion. After calling `deferCompletion()`, you are responsible for calling `complete()` on the returned `Completion` object. The `name()` method is used for tracing.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.linecorp.decaton.processor.Completion;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.ProcessingContext;

// Asynchronous processor with deferred completion
// (assumes HttpClient is java.net.http.HttpClient, available since Java 11)
public class AsyncProcessor implements DecatonProcessor<PrintMessageTask> {
    private final HttpClient httpClient;

    public AsyncProcessor(HttpClient httpClient) {
        this.httpClient = httpClient;
    }

    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        // Defer completion - YOU are responsible for calling complete()
        Completion completion = context.deferCompletion();

        // Send async HTTP request; the response body is not needed, so discard it
        httpClient.sendAsync(buildRequest(task), HttpResponse.BodyHandlers.discarding())
                  .whenComplete((response, error) -> {
                      if (error != null) {
                          System.err.println("Processing failed: " + error.getMessage());
                      }
                      // MUST call complete() to allow offset commit
                      completion.complete();
                  });
        // Method returns immediately, task completes when callback fires
    }

    // Hypothetical request builder: the endpoint is a placeholder, not part of Decaton
    private static HttpRequest buildRequest(PrintMessageTask task) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/tasks"))
                          .POST(HttpRequest.BodyPublishers.ofString(task.getName()))
                          .build();
    }

    @Override
    public String name() {
        return "AsyncHttpProcessor"; // Used for tracing/observability
    }
}
```

--------------------------------

### Execute Decaton Processor with Varying Concurrency

Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc

Shell commands to run the processor with different concurrency settings and trigger the batch producer. Compare the delivery latencies of the two runs to see the effect of concurrency.

```sh
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS -Dconcurrency=1 example.ProcessorMain2
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.BatchProducerMain
Task for name:1 delivered in 37 ms
Task for name:0 delivered in 199 ms
Task for name:3 delivered in 41 ms
...
Task for name:95 delivered in 1287 ms
Task for name:96 delivered in 1322 ms
```

```sh
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS -Dconcurrency=20 example.ProcessorMain2
$ java -cp build/libs/example-*-all.jar -Dbootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS example.BatchProducerMain
Task for name:10 delivered in 41 ms
Task for name:36 delivered in 37 ms
...
Task for name:84 delivered in 160 ms
Task for name:89 delivered in 183 ms
```

--------------------------------

### Run Decaton Processor Benchmarks

Source: https://github.com/line/decaton/blob/master/processor/README.md

Executes the JMH benchmark suite for the processor module. Running without the Gradle daemon is recommended for consistent results.

```sh
./gradlew --no-daemon clean processor:jmh
```

--------------------------------

### Enable Retry Queuing in Decaton Configuration

Source: https://github.com/line/decaton/blob/master/docs/retry-queueing.adoc

Configure the processor to use a retry topic and define the backoff duration. The default retry topic name is the original topic name suffixed with '-retry'.

```java
...
SubscriptionBuilder.newBuilder("testProcessor")
                   .enableRetry(RetryConfig.withBackoff(Duration.ofMillis(100))) // <1>
                   .processorsBuilder(
                           ProcessorsBuilder.consuming(
                                   "my-decaton-topic", // <2>
                                   new ProtocolBuffersDeserializer<>(PrintMessageTask.parser()))
                                            .thenProcess(new RetryingProcessorSync())
                   )
...
```

--------------------------------

### Implement DecatonProcessor for Slow IO Simulation

Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc

This class implements DecatonProcessor and simulates a slow I/O operation by sleeping for 30 ms per task. It calculates and prints the delivery latency of each task.

```java
public class PrintMessageTaskProcessor2 implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        long deliveryLatencyMs = System.currentTimeMillis() - context.metadata().timestampMillis();
        simulateSlowIO();
        System.out.printf("Task for %s delivered in %d ms\n", task.getName(), deliveryLatencyMs);
    }

    private static void simulateSlowIO() throws InterruptedException {
        Thread.sleep(30);
    }
}
```

--------------------------------

### Add Central Dogma dependencies in Gradle

Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc

Include the necessary Decaton and Central Dogma client libraries in your build.gradle file.

```groovy
dependencies {
    implementation "com.linecorp.decaton:decaton-centraldogma:$DECATON_VERSION"
    implementation "com.linecorp.centraldogma:centraldogma-client-armeria:$CENTRALDOGMA_VERSION"
}
```

--------------------------------

### Add Decaton Gradle Dependencies

Source: https://github.com/line/decaton/blob/master/README.md

Include these dependencies in your build.gradle file to enable task production or processing capabilities.

```groovy
// For task producers
implementation "com.linecorp.decaton:decaton-common:$DECATON_VERSION"
implementation "com.linecorp.decaton:decaton-client:$DECATON_VERSION"

// For processors
implementation "com.linecorp.decaton:decaton-common:$DECATON_VERSION"
implementation "com.linecorp.decaton:decaton-processor:$DECATON_VERSION"
```

--------------------------------

### Apache License 2.0

Source: https://github.com/line/decaton/blob/master/README.md

The legal license text governing the use of the Decaton framework.

```text
Copyright 2020 LINE Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```

--------------------------------

### Use YAML for property configuration

Source: https://github.com/line/decaton/blob/master/docs/dynamic-property-configuration.adoc

Specify a YAML file path to use YAML-formatted properties instead of JSON.

```java
CentralDogmaPropertySupplier supplier = CentralDogmaPropertySupplier.register(
        centralDogma, "project", "repository", "/properties.yaml"); // <1>
```

--------------------------------

### Find PGP Key ID

Source: https://github.com/line/decaton/blob/master/developer-docs/making-release.md

Command to list your PGP keys and find the Key ID for migration.

```sh
gpg -K
```

--------------------------------

### Configure Key Blocking in Decaton Processor

Source: https://context7.com/line/decaton/llms.txt

Use `ProcessorProperties.CONFIG_IGNORE_KEYS` to specify a list of keys for which tasks should be skipped. In this example the list is static and set when the subscription is built.

```java
import java.util.Arrays;
import java.util.Properties;

import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;

public class KeyBlockingExample {
    public static ProcessorSubscription createBlockingSubscription(Properties consumerConfig) {
        return SubscriptionBuilder
                .newBuilder("blocking-processor")
                .processorsBuilder(
                        ProcessorsBuilder.consuming(
                                "my-decaton-topic",
                                new ProtocolBuffersDeserializer<>(PrintMessageTask.parser()))
                                         .thenProcess(new PrintMessageTaskProcessor())
                )
                .consumerConfig(consumerConfig)
                .properties(
                        StaticPropertySupplier.of(
                                // Block specific keys - tasks with these keys are skipped
                                Property.ofStatic(
                                        ProcessorProperties.CONFIG_IGNORE_KEYS,
                                        Arrays.asList(
                                                "malicious-user-123",
                                                "spam-source-456",
                                                "blocked-ip-789"
                                        )
                                )
                        )
                )
                .buildAndStart();
    }
}

// For dynamic blocking, use CentralDogmaPropertySupplier
// Update /decaton/processor-config.json to add/remove blocked keys:
/*
{
  "decaton.ignore.keys": ["malicious-user-123", "new-blocked-user"]
}
*/
```

--------------------------------

### Export PGP Secret Key for Gradle

Source: https://github.com/line/decaton/blob/master/developer-docs/making-release.md

Exports the PGP secret key to a file for use with Gradle. Replace KEY_ID with your actual key ID.

```sh
$ gpg --export-secret-keys KEY_ID > ~/.gradle/pgp-keyring.gpg
```

--------------------------------

### Async Task Completion with Kafka Producer

Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc

Use `ProcessingContext#deferCompletion` to manually manage task completion when integrating with asynchronous middleware clients like Kafka's Producer. Ensure `#complete` is called exactly once to avoid processor stalls.

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PrintMessageTaskProcessorAsync implements DecatonProcessor<PrintMessageTask> {
    Producer<String, String> producer;

    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        Completion completion = context.deferCompletion(); // <1>
        // Complete the task from the producer callback, whether the send succeeded or failed
        producer.send(new ProducerRecord<>("next-topic", "Hello" + task.getName()),
                      (metadata, exception) -> completion.complete());
    }
}
```

--------------------------------

### Implement Synchronous Task Processing

Source: https://context7.com/line/decaton/llms.txt

Implement DecatonProcessor for synchronous task processing. Task completion is automatic upon method return. The processing context exposes task metadata such as the produce timestamp (used here to compute delivery latency), the retry count, and the source application ID.

```java
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.ProcessingContext;

// Simple synchronous processor
public class PrintMessageTaskProcessor implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        // Access task metadata
        long deliveryLatency = System.currentTimeMillis() - context.metadata().timestampMillis();
        int retryCount = context.metadata().retryCount();
        String sourceApp = context.metadata().sourceApplicationId();

        // Process the task
        System.out.printf("Processing task: name=%s, age=%d, latency=%dms, retries=%d, source=%s%n",
                          task.getName(), task.getAge(), deliveryLatency, retryCount, sourceApp);

        // Task is automatically completed when this method returns
    }
}
```

--------------------------------

### Implement DecatonProcessor for PrintMessageTask

Source: https://github.com/line/decaton/blob/master/docs/getting-started.adoc

This class implements the DecatonProcessor interface to process PrintMessageTask objects. It prints the name and age carried by each task.

```java
public class PrintMessageTaskProcessor implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        System.out.printf("Noticed %s is %d years old\n", task.getName(), task.getAge());
    }
}
```
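
To check the message format without a running Kafka cluster, the formatting step can be exercised on its own. This is only a sketch under assumptions: `PrintMessageFormatSketch`, its nested `Task` class, and `format` are hypothetical stand-ins written for illustration. `Task` imitates just the `getName()` and `getAge()` accessors of the Protobuf-generated `PrintMessageTask` and is not the generated class itself.

```java
// Hypothetical stand-alone sketch; not part of the Decaton examples.
public class PrintMessageFormatSketch {
    // Minimal stand-in exposing the two accessors the processor uses.
    static final class Task {
        private final String name;
        private final int age;

        Task(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() { return name; }
        int getAge() { return age; }
    }

    // Same format string as the processor above, without the trailing newline.
    static String format(Task task) {
        return String.format("Noticed %s is %d years old", task.getName(), task.getAge());
    }

    public static void main(String[] args) {
        // prints "Noticed motoko is 25 years old"
        System.out.println(format(new Task("motoko", 25)));
    }
}
```

Keeping the formatting in a pure method like this makes the processor's visible behavior testable without a `ProcessingContext`.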