### Java Test Setup with TestKit

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/testing.md

Provides a complete example of testing classic actors in Java using `TestKit`. It illustrates actor system setup, actor creation, and message assertions using the test kit's `testActor` reference (obtained here with `getRef()`).

```java
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestKitSampleTest {

  static ActorSystem system;

  @BeforeClass
  public static void setup() {
    system = ActorSystem.create("testSystem");
  }

  @AfterClass
  public static void teardown() {
    if (system != null) {
      TestKit.shutdownActorSystem(system);
    }
  }

  @Test
  public void testMyActor() throws Exception {
    final TestKit testKit = new TestKit(system);
    ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor");

    // send the message to the actor, with the test kit's ref as the sender
    myActor.tell("hello", testKit.getRef());

    // check the response
    testKit.expectMsg("hello");
  }

  // Dummy actor for the test: echoes every String back to its sender
  public static class MyActor extends AbstractActor {
    @Override
    public Receive createReceive() {
      return receiveBuilder()
          .match(
              String.class,
              message -> {
                getSender().tell(message, getSelf());
              })
          .build();
    }
  }
}
```

--------------------------------

### Spawn Actor with Setup - Java

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/actor-lifecycle.md

Use `Behaviors.setup` in Java to get the `ActorContext` for spawning child actors or accessing `context.getSelf()`.
```java import org.apache.pekko.actor.typed.Behavior; import org.apache.pekko.actor.typed.javadsl.Behaviors; public class HelloWorldMain { public static Behavior main() { return Behaviors.setup( context -> { // You can access the ActorContext here // For example, to spawn a child actor: // context.spawn(HelloWorld.create(), "helloWorld"); return Behaviors.empty(); }); } } ``` -------------------------------- ### Auction Setup (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/replicated-eventsourcing-auction.md Initializes the auction entity in Java with its starting parameters, including the minimum bid. This sets up the initial state for the auction. ```java BigDecimal initialBid = new BigDecimal(100); ReplicatedEventSourcedBehavior behavior = ReplicatedEventSourcedBehavior.create( this, Auction.TypeKey, new AuctionState(initialBid, "", initialBid, true), (state, command) -> handleCommand(state, command), (state, event) -> handleEvent(state, event)) .withTagger(event -> Set.of("auction")) .withRecoveryCompleted(recoveryState -> { // Recovery completed logic if needed }); ``` -------------------------------- ### Auction Setup (Scala) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/replicated-eventsourcing-auction.md Initializes the auction entity with its starting parameters, including the minimum bid. This sets up the initial state for the auction. 
```scala
val initialBid = BigDecimal(100)
val behavior = ReplicatedEventSourcedBehavior.withActorContext[AuctionCommand, AuctionEvent, AuctionState](
  context = context.asScala,
  entityTypeKey = Auction.TypeKey,
  emptyState = AuctionState.initial(initialBid),
  commandHandler = (state, cmd) => handleCommand(state, cmd),
  eventHandler = (state, evt) => handleEvent(state, evt))
  .withTagger(event => TaggedEvent.create("auction"))
  .withRecoveryCompleted(recoveryState => {
    // Recovery completed logic if needed
  })
```

--------------------------------

### GSet Example (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/distributed-data.md

Demonstrates adding elements to a grow-only set using GSet in Java. Merging two sets yields their union. Requires distributed data setup.

```java
GSet<String> gset = GSet.empty();
GSet<String> gset1 = gset.add("a").add("b");
GSet<String> gset2 = gset.add("b").add("c");
GSet<String> merged = gset1.merge(gset2);
// the merged set is the union of both replicas
merged.getElements().equals(java.util.Set.of("a", "b", "c"));
```

--------------------------------

### PNCounter Example (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/distributed-data.md

Demonstrates incrementing and decrementing a PNCounter in Java. Each update is attributed to a node, so the snippet assumes a `SelfUniqueAddress` named `node` is in scope. Requires setup for distributed data.

```java
// `node` is assumed to be this node's SelfUniqueAddress,
// e.g. DistributedData.get(system).selfUniqueAddress()
PNCounter pnCounter = PNCounter.create();
PNCounter pnCounter1 = pnCounter.increment(node, 10).increment(node, 5);
PNCounter pnCounter2 = pnCounter.increment(node, 2).decrement(node, 3);
PNCounter merged = pnCounter1.merge(pnCounter2);
// getValue() returns a BigInteger, so compare via intValue()
merged.getValue().intValue() == 14;
```

--------------------------------

### PNCounterMap Example (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/distributed-data.md

Demonstrates managing multiple counters atomically using PNCounterMap in Java. Requires distributed data setup.
```java PNCounterMap map = PNCounterMap.empty(); PNCounterMap map1 = map.increment("a", 10).increment("b", 5); PNCounterMap map2 = map.increment("a", 2).decrement("b", 3); PNCounterMap merged = map1.merge(map2); merged.get("a").getValue() == 12; merged.get("b").getValue() == 2; ``` -------------------------------- ### Router Strategy Example (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/routers.md Demonstrates the usage of different router strategies in Java. This snippet is typically used to showcase the basic setup and configuration of a router. ```java final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(RoundRobinPool.create(5)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(RandomPool.create(5)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(SmallestMailboxPool.create(5)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(BroadcastPool.create(5)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(RoundRobinGroup.create(Arrays.asList("/user/a", "/user/b"))).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(RandomGroup.create(Arrays.asList("/user/a", "/user/b"))).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(BroadcastGroup.create(Arrays.asList("/user/a", "/user/b"))).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(RoundRobinPool.create(5).preferLocalRoutees(true)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(RandomPool.create(5).preferLocalRoutees(true)).build(); final Router router = 
Routers.pool(String.class).withActorSystem(actorSystem).withRouter(SmallestMailboxPool.create(5).preferLocalRoutees(true)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(BroadcastPool.create(5).preferLocalRoutees(true)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(RoundRobinGroup.create(Arrays.asList("/user/a", "/user/b")).preferLocalRoutees(true)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(RandomGroup.create(Arrays.asList("/user/a", "/user/b")).preferLocalRoutees(true)).build(); final Router router = Routers.pool(String.class).withActorSystem(actorSystem).withRouter(BroadcastGroup.create(Arrays.asList("/user/a", "/user/b")).preferLocalRoutees(true)).build() ``` -------------------------------- ### Main application setup for blocking test (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/dispatchers.md Sets up and runs a test scenario involving blocking and non-blocking actors. This example highlights thread starvation when blocking actors share the default dispatcher. 
```java
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;

public class BlockingDispatcherTest {
  public static void main(String[] args) throws InterruptedException {
    ActorSystem<Void> system =
        ActorSystem.create(
            Behaviors.setup(
                context -> {
                  var blockingActor = context.spawn(BlockingActor.create(), "blockingActor");
                  var printActor = context.spawn(PrintActor.create(), "printActor");

                  // Send messages to both actors
                  for (int i = 0; i < 100; i++) {
                    blockingActor.tell(new BlockingActor.Process(printActor.unsafeUpcast()));
                    printActor.tell(new PrintActor.Print("Message " + i));
                  }

                  // The handler's context parameter needs its own name: a lambda
                  // parameter may not shadow the enclosing `context` variable.
                  return Behaviors.receiveSignal(
                      (ctx, signal) -> {
                        ctx.getLog().info("Received signal: {}", signal.toString());
                        return Behaviors.stopped();
                      });
                }),
            "BlockingDispatcherTest");

    // Allow time for messages to be processed before stopping
    Thread.sleep(10000);
    system.terminate();
  }
}
```

--------------------------------

### Lease Configuration Example (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/coordination.md

Demonstrates how to configure a specific lease implementation class and its properties in Java.

```java
LeaseSettings leaseSettings = LeaseSettings.create(system.settings().config().getConfig("pekko.coordination.lease"));
Lease myLease = new MyJavaLease(leaseSettings, system);
```

--------------------------------

### Sink.lastOption Java Example

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/operators/Sink/lastOption.md

This example demonstrates how to use `Sink.lastOption` in Java to get the last element of a stream as a `CompletionStage` of `Optional`.
```java
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

public class LastOptionExample {
  public static void main(String[] args) {
    ActorSystem system = ActorSystem.create("LastOptionExample");

    CompletionStage<Optional<Integer>> result =
        Source.from(Arrays.asList(1, 2, 3)).runWith(Sink.lastOption(), system);

    result.whenComplete(
        (value, throwable) -> {
          if (throwable == null) {
            System.out.println("Last element is: " + value);
            // Last element is: Optional[3]
          } else {
            throwable.printStackTrace();
          }
        });

    // An empty stream completes with Optional.empty instead of failing
    CompletionStage<Optional<Integer>> emptyResult =
        Source.<Integer>empty().runWith(Sink.lastOption(), system);

    emptyResult.whenComplete(
        (value, throwable) -> {
          if (throwable == null) {
            System.out.println("Last element from empty stream is: " + value);
            // Last element from empty stream is: Optional.empty
          } else {
            throwable.printStackTrace();
          }
        });
  }
}
```

--------------------------------

### Sink.lastOption Scala Example

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/operators/Sink/lastOption.md

This example demonstrates how to use `Sink.lastOption` in Scala to get the last element of a stream as a `Future` of `Option`.
```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
import scala.concurrent.Future
import scala.util.{ Failure, Success }

object LastOptionExample {
  implicit val system: ActorSystem = ActorSystem("LastOptionExample")
  // onComplete needs an ExecutionContext; use the system dispatcher
  import system.dispatcher

  def main(args: Array[String]): Unit = {
    val result: Future[Option[Int]] = Source(List(1, 2, 3)).runWith(Sink.lastOption)
    result.onComplete {
      case Success(value) =>
        println(s"Last element is: $value")
        // Last element is: Some(3)
      case Failure(ex) => ex.printStackTrace()
    }

    val emptyResult: Future[Option[Int]] = Source.empty[Int].runWith(Sink.lastOption)
    emptyResult.onComplete {
      case Success(value) =>
        println(s"Last element from empty stream is: $value")
        // Last element from empty stream is: None
      case Failure(ex) => ex.printStackTrace()
    }
  }
}
```

--------------------------------

### Main Application Setup (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/stream-quickstart.md

Sets up an ActorSystem for use with Pekko Streams in a Java application.

```java
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.Materializer;

public class Main {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("QuickStart");
    Materializer mat = Materializer.createMaterializer(system);

    // Your stream code here

    // To stop the system, uncomment the following line:
    // system.terminate();
  }
}
```

--------------------------------

### Lease Configuration Example (Scala)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/coordination.md

Demonstrates how to configure a specific lease implementation class and its properties in Scala.
```scala
val leaseSettings = LeaseSettings(system.settings.config.getConfig("pekko.coordination.lease"))
val myLease = new MyScalaLease(leaseSettings, system)
```

--------------------------------

### Start Subscribers Example

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/distributed-pub-sub.md

Starts an actor system with the subscriber as its guardian. Run it on several nodes so that messages published to the 'content' topic are received by every instance.

```scala
import org.apache.pekko.actor.typed.ActorSystem

object StartSubscribers {
  def main(args: Array[String]): Unit = {
    // Subscriber() is a Behavior[String], so the system is typed accordingly
    val system = ActorSystem[String](Subscriber(), "pubsub-system")
  }
}
```

--------------------------------

### Installing an Event Adapter

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/persistence.md

Install a defined EventAdapter on an EventSourcedBehavior to manage event type conversions for the journal. The adapter is attached when the behavior is defined.

```scala
EventSourcedBehavior[Command, Event, State](
  persistenceId = PersistenceId.ofUniqueId("my-persistence-id"),
  emptyState = State.initial,
  commandHandler = (_, cmd) => ???,
  eventHandler = (_, evt) => ???
).eventAdapter(new MyEventAdapter())
```

--------------------------------

### Main Application Setup (Scala)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/stream-quickstart.md

Sets up an implicit ActorSystem for use with Pekko Streams in a Scala application.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer

object Main extends App {
  implicit val system: ActorSystem = ActorSystem("QuickStart")
  implicit val mat: Materializer = Materializer(system)

  // Your stream code here

  // To stop the system, uncomment the following line:
  // system.terminate()
}
```

--------------------------------

### Sink.head Operator Example

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/operators/Sink/head.md

This example demonstrates how to use the `Sink.head` operator in Java to get the first element of a stream.
The stream is canceled after the first element is emitted.

```java
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;

ActorSystem system = ActorSystem.create("SinkDocExamples");

CompletionStage<Integer> result =
    Source.from(Arrays.asList(1, 2, 3)).runWith(Sink.head(), system);

result.whenComplete(
    (value, exception) -> {
      if (exception != null) {
        System.out.println("Error: " + exception.getMessage());
      } else {
        System.out.println("First element: " + value);
        // Prints: First element: 1
      }
      system.terminate();
    });
```

--------------------------------

### Sink.head Operator Example

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/operators/Sink/head.md

This example demonstrates how to use the `Sink.head` operator in Scala to get the first element of a stream. The stream is canceled after the first element is emitted.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("HeadSinkSpec")
// onComplete needs an ExecutionContext; use the system dispatcher
import system.dispatcher

val result: Future[Int] = Source(List(1, 2, 3)).runWith(Sink.head)

result.onComplete {
  case scala.util.Success(value) =>
    println(s"First element: $value")
    // Prints: First element: 1
    system.terminate()
  case scala.util.Failure(exception) =>
    println(s"Error: ${exception.getMessage}")
    system.terminate()
}
```

--------------------------------

### Set up JDK 17 and format Java files

Source: https://github.com/apache/pekko/blob/main/AGENTS.md

Sets the `JAVA_HOME` environment variable to JDK 17 and adds it to the `PATH`, then formats all Java files using sbt. The `/usr/libexec/java_home` helper is specific to macOS.
```shell export JAVA_HOME=$(/usr/libexec/java_home -v 17) export PATH="$JAVA_HOME/bin:$PATH" sbt javafmtAll ``` -------------------------------- ### Installing an Event Adapter (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/persistence.md Install a defined EventAdapter on an EventSourcedBehavior to manage event type conversions for the journal. This is done within the behavior's setup. ```java EventSourcedBehavior.forSpecificMessage( PersistenceId.of("", "my-persistence-id"), State.initial(), (state, cmd) -> {}, // command handler (state, evt) -> state // event handler ).withEventAdapter(new MyEventAdapter()) ``` -------------------------------- ### Start Subscribers Example (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/distributed-pub-sub.md Starts multiple subscriber actors on different nodes using Java. This ensures that messages published to the 'content' topic are received by all instances. ```java import org.apache.pekko.actor.typed.ActorSystem; public class StartSubscribers { public static void main(String[] args) { ActorSystem system = ActorSystem.create(Subscriber.create(), "pubsub-system"); } } ``` -------------------------------- ### Using a Setup class for parameters (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/style-guide.md Encapsulates multiple parameters into a single 'Setup' class for cleaner actor behavior definition. Useful for organizing constructor parameters. ```java public static class Setup { public final String name; public final TimerKey timerKey; public Setup(String name, TimerKey timerKey) { this.name = name; this.timerKey = timerKey; } } public Behavior counter() { return setup(context -> { Setup setup = context.getSystem().settings().config().get("my.setup.key"); // ... return receiveBuilder() .onMessage(Increment.class, cmd -> { // ... 
return receiveBuilder().build(); }) .build(); }); } ``` -------------------------------- ### Binary Compatibility Versioning Examples Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/common/binary-compatibility-rules.md Illustrates versioning scenarios where binary compatibility is maintained (OK) and where it is not (NO). ```text OK: 1.4.0 --> 1.5.x OK: 1.5.0 --> 1.6.x NO: 1.x.y --x 2.x.y OK: 2.0.0 --> 2.0.1 --> ... --> 2.0.n OK: 2.0.n --> 2.1.0 --> ... --> 2.1.n OK: 2.1.n --> 2.2.0 ... ``` -------------------------------- ### Extract Shard ID for Starting Entities (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/cluster-sharding.md Example of how to extract a shard ID from an entity ID when starting a new entity. This is required when `rememberEntities` is enabled in `ClusterShardingSettings`. ```java ShardIdExtractor extractShardId = entityId -> { // extract shard id from entity id // e.g. "user-" + entityId.toLong % numberOfShards // or "user-" + entityId.substring(0, entityId.lastIndexOf("-")) // if entityId is "user-123-shardNr" throw new IllegalArgumentException("Cannot extract shard id from entity id: " + entityId); }; ``` -------------------------------- ### Example Aeron Classpath Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/remoting-artery.md An example of the required jar files for the Aeron media driver classpath. Adjust version numbers as needed. ```bash agrona-2.4.1.jar:aeron-driver-1.50.4.jar:aeron-client-1.50.4.jar ``` -------------------------------- ### Extract Shard ID for Starting Entities (Scala) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/cluster-sharding.md Example of how to extract a shard ID from an entity ID when starting a new entity. This is required when `rememberEntities` is enabled in `ClusterShardingSettings`. 
```scala val extractShardId: ShardIdExtractor = { case StartEntity(entityId) => // extract shard id from entity id // e.g. "user-" + entityId.toLong % numberOfShards // or "user-" + entityId.substring(0, entityId.lastIndexOf("-")) // if entityId is "user-123-shardNr" throw new IllegalArgumentException("Cannot extract shard id from entity id: " + entityId) case other => extractEntityId(other) } ``` -------------------------------- ### PNCounter Example Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/distributed-data.md Demonstrates the usage of PNCounter for positive and negative increments. Requires setup for distributed data. ```scala val pnCounter = PNCounter() val pnCounter1 = pnCounter.increment(10).increment(5) val pnCounter2 = pnCounter.increment(2).decrement(3) val merged = pnCounter1.merge(pnCounter2) merged.value should be (14) ``` -------------------------------- ### Basic PartitionHub Usage (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/stream-dynamic.md Demonstrates the fundamental use of PartitionHub in Java for routing elements to consumers based on a provided function. The producer's rate is automatically adjusted to the slowest consumer. 
```java final Sink> partitionHub = PartitionHub.sink(String.class, (numConsumers, element) -> { // route to consumer 0 if element is even, consumer 1 if odd return element.hashCode() % 2; }); final Source producer = Source.tick(Duration.ofMillis(100), Duration.ofMillis(100), "msg"); // Attach producer to the hub final MaterializerAttributes attributes = MaterializerAttributes.create(materializer); final Pair, Sink> materialized = producer.toMat(partitionHub, Keep.both()).withAttributes(attributes).run(); // Attach consumers final Source consumer1 = materialized.first().mapAsync(1, System.out::println); final Source consumer2 = materialized.first().mapAsync(1, System.out::println); // Consumers can be attached any time after the producer has been started // The hub will start routing elements to consumers once they are attached. // If there are no consumers attached, elements are buffered and producer is backpressured. // To demonstrate, we run the consumers after a delay materialized.second().scheduleOnce(Duration.ofSeconds(5), () -> consumer1.run()); materialized.second().scheduleOnce(Duration.ofSeconds(10), () -> consumer2.run()); ``` -------------------------------- ### GSet Example Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/distributed-data.md Illustrates the use of GSet for grow-only sets where elements can only be added. Requires distributed data setup. ```scala val gset = GSet.empty[String] val gset1 = gset.add("a").add("b") val gset2 = gset.add("b").add("c") val merged = gset1.merge(gset2) merged.elements should be (Set("a", "b", "c")) ``` -------------------------------- ### Lease Implementation Example (Java) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/coordination.md Illustrates the basic structure of a lease implementation in Java, extending the `Lease` class. 
```java import org.apache.pekko.coordination.lease.javadsl.Lease; import org.apache.pekko.actor.typed.ActorSystem; import org.apache.pekko.coordination.lease.LeaseSettings; import java.util.concurrent.CompletionStage; public class MyJavaLease extends Lease { public MyJavaLease(LeaseSettings leaseSettings, ActorSystem system) { super(leaseSettings, system); } @Override public CompletionStage acquire() { return null; } @Override public CompletionStage release() { return null; } @Override public CompletionStage checkLease() { return null; } @Override public CompletionStage close() { return null; } @Override public CompletionStage terminated() { return null; } } ``` -------------------------------- ### MergeLatest Example Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md This example demonstrates merging streams of prices and quantities, emitting the latest price each time the quantity changes. Ensure all input streams have emitted at least one element before the output stream starts emitting. 
```scala import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl.Source import org.apache.pekko.stream.scaladsl.Flow object MergeLatest extends App { implicit val system = ActorSystem("MergeLatest") val prices = Source.tick(java.time.Duration.ofMillis(100), java.time.Duration.ofMillis(100), "price") val quantities = Source.tick(java.time.Duration.ofMillis(150), java.time.Duration.ofMillis(150), "quantity") val mergeLatestFlow = Flow.mergeLatest[String, String](quantities, eagerComplete = false) val stream = prices.via(mergeLatestFlow) stream.runForeach(println) // system.terminate() } ``` -------------------------------- ### Structuring methods within Behaviors.setup (Scala) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/style-guide.md Demonstrates structuring complex behaviors by defining methods within the Behaviors.setup block, offering an alternative to using a class for organization. ```scala Behaviors.setup[Increment] { context => val name = context.as[Setup].name val timers = context.as[Setup].timers def counter(state: CounterState): Behavior[Increment] = { Behaviors.receiveMessage[Increment] { case Increment(amount) => counter(state.copy(count = state.count + amount)) } } counter(CounterState(name, 0)) } ``` -------------------------------- ### Java Fold Example: Aggregating Elements Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md This Java example shows how to use the fold operator to aggregate elements from a stream. It starts with an initial value and applies a function to accumulate the results. Ensure the initial value is immutable. 
```java
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.function.Function2;
import org.apache.pekko.stream.javadsl.Source;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FoldExample {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("FoldExample");

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 1, 2, 3, 1, 2, 1);
    Source<Integer, NotUsed> source = Source.from(data);

    // Count occurrences; copy the accumulator instead of mutating it
    Function2<Map<Integer, Integer>, Integer, Map<Integer, Integer>> foldFunction =
        (acc, elem) -> {
          Integer count = acc.getOrDefault(elem, 0);
          Map<Integer, Integer> newAcc = new HashMap<>(acc);
          newAcc.put(elem, count + 1);
          return newAcc;
        };

    Map<Integer, Integer> zero = new HashMap<>();
    source
        .fold(zero, foldFunction)
        .runForeach(System.out::println, system)
        // terminate only after the stream has completed
        .thenRun(system::terminate);

    // Output:
    // {1=4, 2=3, 3=2, 4=1, 5=1}
  }
}
```

--------------------------------

### SourceQueue Example (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/operators/Source/queue.md

Java version of the SourceQueue example. It creates a source backed by a queue that buffers elements and applies an overflow strategy (here `dropHead`) when the buffer is full. Each `offer` returns a `CompletionStage`, so feedback on whether an element was accepted arrives asynchronously.

```java
// `materializer` is assumed to be in scope
final Source<Integer, SourceQueueWithComplete<Integer>> source =
    Source.queue(10, OverflowStrategy.dropHead());
final Pair<SourceQueueWithComplete<Integer>, Source<Integer, NotUsed>> pair =
    source.preMaterialize(materializer);
final SourceQueueWithComplete<Integer> queue = pair.first();
final Source<Integer, NotUsed> stream = pair.second();

final CompletionStage<Done> result = stream.runForeach(System.out::println, materializer);

final ExecutorService exec = Executors.newFixedThreadPool(1);
exec.submit(
    () -> {
      for (int i = 0; i < 100; i++) {
        queue.offer(i);
      }
      queue.complete();
    });

result.toCompletableFuture().get();
exec.shutdown();
```

--------------------------------

### Subscriber Actor Example

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/distributed-pub-sub.md

Defines an actor that subscribes to a topic. Start the actor on multiple nodes so that each of them receives the published messages.
```scala import org.apache.pekko.actor.typed.ActorRef import org.apache.pekko.actor.typed.Behavior import org.apache.pekko.actor.typed.javadsl.Behaviors import org.apache.pekko.cluster.pubsub.DistributedData.Publish import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator object Subscriber { def apply(): Behavior[String] = { Behaviors.setup[String] { context => // Subscribe to the 'content' topic context.system.receptionist ! DistributedPubSubMediator.Subscribe("content", context.self.asInstanceOf[ActorRef[DistributedPubSubMediator.SubscribeAck]]) Behaviors.receiveMessage[String] { message => context.log.info("Received message '{}' from topic 'content'", message) Behaviors.same } } } } ``` -------------------------------- ### Using a Setup class for parameters (Scala) Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/style-guide.md Encapsulates multiple parameters into a single 'Setup' class for cleaner actor behavior definition. Useful for organizing constructor parameters. ```scala case class Setup(name: String, timers: Timers) val counter: Behavior[Increment] = Behaviors.setup[Increment] { context => val setup = Setup(context.as[Setup].name, context.as[Setup].timers) // ... Behaviors.receiveMessage[Increment] { case Increment(amount) => // ... Behaviors.same } } ``` -------------------------------- ### Example Aeron Properties File Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/remoting-artery.md An example configuration file for the Aeron media driver, demonstrating various tuning options. 
```properties
aeron.mtu.length=16384
aeron.socket.so_sndbuf=2097152
aeron.socket.so_rcvbuf=2097152
aeron.rcv.buffer.length=16384
aeron.rcv.initial.window.length=2097152
agrona.disable.bounds.checks=true

aeron.threading.mode=SHARED_NETWORK

# low latency settings
#aeron.threading.mode=DEDICATED
#aeron.sender.idle.strategy=org.agrona.concurrent.BusySpinIdleStrategy
#aeron.receiver.idle.strategy=org.agrona.concurrent.BusySpinIdleStrategy

# use same directory in pekko.remote.artery.advanced.aeron-dir config
# of the Pekko application
aeron.dir=/dev/shm/aeron
```

--------------------------------

### Wrapping DurableStateBehavior (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/durable-state/persistence.md

Demonstrates how to wrap a `DurableStateBehavior` in `Behaviors.setup` in Java to access the `ActorContext`, for example to get the logger. Durable State persists the latest state directly, so there are no events or event handlers.

```java
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.persistence.typed.PersistenceId;
import org.apache.pekko.persistence.typed.state.javadsl.CommandHandler;
import org.apache.pekko.persistence.typed.state.javadsl.DurableStateBehavior;

public class MyPersistentActorWrapper
    extends DurableStateBehavior<MyPersistentActorWrapper.Command, MyPersistentActorWrapper.State> {

  interface Command {}

  public static final class SomeCommand implements Command {
    public final String data;

    public SomeCommand(String data) {
      this.data = data;
    }
  }

  public static final class State {
    public final String value;

    public State(String value) {
      this.value = value;
    }

    public static State initialState() {
      return new State("initial");
    }
  }

  // Behaviors.setup provides the ActorContext that is passed to the constructor.
  public static Behavior<Command> create(PersistenceId persistenceId) {
    return Behaviors.setup(context -> new MyPersistentActorWrapper(persistenceId, context));
  }

  private final ActorContext<Command> context;

  private MyPersistentActorWrapper(PersistenceId persistenceId, ActorContext<Command> context) {
    super(persistenceId);
    this.context = context;
  }

  @Override
  public State emptyState() {
    return State.initialState();
  }

  @Override
  public CommandHandler<Command, State> commandHandler() {
    return (state, cmd) -> {
      if (cmd instanceof SomeCommand) {
        SomeCommand someCmd = (SomeCommand) cmd;
        context.getLog().info("Received command with data: {}", someCmd.data);
        // Persist the new state itself; Durable State stores no events.
        return Effect()
            .persist(new State(someCmd.data))
            .thenRun((State newState) ->
                context.getLog().info("State updated to: {}", newState.value));
      } else {
        return Effect().unhandled();
      }
    };
  }
}
```

--------------------------------

### Wrapping DurableStateBehavior (Scala)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/durable-state/persistence.md

Demonstrates how to wrap a `DurableStateBehavior` in `Behaviors.setup` in Scala to access the `ActorContext`, for example to get the logger.

```scala
import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.persistence.typed.PersistenceId
import org.apache.pekko.persistence.typed.state.scaladsl.{ DurableStateBehavior, Effect }

object MyPersistentActorWrapper {
  sealed trait Command
  final case class SomeCommand(data: String) extends Command

  final case class State(value: String)
  object State {
    def initialState(): State = State("initial")
  }

  def apply(persistenceId: PersistenceId): Behavior[Command] =
    Behaviors.setup { context =>
      DurableStateBehavior[Command, State](
        persistenceId,
        emptyState = State.initialState(),
        commandHandler = (state, cmd) =>
          cmd match {
            case SomeCommand(data) =>
              context.log.info("Received command with data: {}", data)
              // Persist the new state itself; Durable State stores no events.
              Effect
                .persist(state.copy(value = data))
                .thenRun(newState => context.log.info("State updated to: {}", newState.value))
          })
    }
}
```

--------------------------------

### Log Configuration on Start

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/logging.md

Logs the complete configuration at INFO level when the actor system starts. Avoid this in production, because the output can include sensitive values.

```hocon
pekko {
  # Log the complete configuration at INFO level when the actor system is started.
  # We do not recommend using this logging in production environments as it can include sensitive values.
  # This is useful when you are uncertain of what configuration is used.
  log-config-on-start = on
}
```

--------------------------------

### Start Standalone Aeron Media Driver with Properties File

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/remoting-artery.md

Command to start the standalone Aeron media driver using a properties file for configuration.

```bash
java io.aeron.driver.MediaDriver config/aeron.properties
```

--------------------------------

### Get Cart Example

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/distributed-data.md

Retrieves the current state of a shopping cart by sending `Replicator.Get` with `ReadMajority` consistency. The replicator replies with `GetSuccess`, `NotFound`, or `GetFailure`.

```scala
// cartKey and timeout are assumed to be defined by the surrounding actor.
// The optional third argument is request context that is echoed back in the reply,
// here the original sender.
replicator ! Get(cartKey, ReadMajority(timeout), Some(sender()))
```

--------------------------------

### Create ActorSystem with SpawnProtocol

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/actor-lifecycle.md

Demonstrates how to create an `ActorSystem` whose guardian behavior handles the `SpawnProtocol`, so that actors can be spawned from outside the system.
```scala
import org.apache.pekko.actor.typed.{ ActorRef, ActorSystem, Behavior, Props, SpawnProtocol }
import org.apache.pekko.actor.typed.scaladsl.AskPattern._
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Failure, Success }

object MainApp extends App {
  // Guardian that only handles SpawnProtocol commands.
  object GuardianBehavior {
    def apply(): Behavior[SpawnProtocol.Command] = SpawnProtocol()
  }

  // Placeholder worker; replace with a real behavior.
  object WorkerActor {
    def apply(): Behavior[String] = Behaviors.empty
  }

  implicit val system: ActorSystem[SpawnProtocol.Command] =
    ActorSystem(GuardianBehavior(), "MyActorSystem")
  import system.executionContext

  // The ask pattern needs a Timeout; the scheduler is derived from the implicit system.
  implicit val timeout: Timeout = Timeout(3.seconds)

  // Spawn an actor from the outside using 'ask'.
  val worker: Future[ActorRef[String]] =
    system.ask(SpawnProtocol.Spawn(WorkerActor(), "my-worker", Props.empty, _))

  worker.onComplete {
    case Success(ref) => println(s"Worker actor spawned: $ref")
    case Failure(ex)  => println(s"Failed to spawn worker: $ex")
  }

  // To stop the system gracefully:
  // system.terminate()
}
```

```java
import java.time.Duration;
import java.util.concurrent.CompletionStage;

import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.Props;
import org.apache.pekko.actor.typed.SpawnProtocol;
import org.apache.pekko.actor.typed.javadsl.AskPattern;

public class MainApp {
  public static void main(String[] args) {
    // GuardianBehavior and WorkerActor are assumed to be defined elsewhere;
    // GuardianBehavior.create() must return Behavior<SpawnProtocol.Command>.
    ActorSystem<SpawnProtocol.Command> system =
        ActorSystem.create(GuardianBehavior.create(), "MyActorSystem");

    Duration timeout = Duration.ofSeconds(3);

    // Spawn an actor from the outside using 'ask'.
    CompletionStage<ActorRef<WorkerActor.Command>> futureWorker =
        AskPattern.ask(
            system,
            replyTo ->
                new SpawnProtocol.Spawn<>(
                    WorkerActor.create(), "my-worker", Props.empty(), replyTo),
            timeout,
            system.scheduler());

    futureWorker.whenComplete(
        (ref, throwable) -> {
          if (throwable != null) {
            System.err.println("Failed to spawn worker: " + throwable.getMessage());
          } else {
            System.out.println("Worker actor spawned: " + ref);
          }
        });

    // To stop the system gracefully:
    // system.terminate();
  }
}
```

--------------------------------

### PNCounterMap Example

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/distributed-data.md

Shows how to manage several related counters in a `PNCounterMap`, which replicates them together as one unit. Requires the Distributed Data extension, which provides the node's `SelfUniqueAddress`.

```scala
// Assumes `node` is this node's SelfUniqueAddress, for example
// val node = DistributedData(system).selfUniqueAddress
val m0 = PNCounterMap.empty[String]
val m1 = m0.increment(node, "a", 10).increment(node, "b", 5)
val m2 = m1.increment(node, "a", 2).decrement(node, "b", 3)

m2.get("a") should be(Some(BigInt(12)))
m2.get("b") should be(Some(BigInt(2)))
```

--------------------------------

### Nesting setup, withTimers, and withStash

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/style-guide.md

Demonstrates how to nest `setup`, `withTimers`, and `withStash` when an actor behavior needs more than one of them. The nesting order does not change the behavior as long as only the innermost function contains additional logic.

```scala
val typedActor: Behavior[Command] =
  Behaviors.setup { context =>
    Behaviors.withTimers[Command] { timers =>
      Behaviors.withStash[Command](10) { stash =>
        // ... behavior logic using context, timers, and stash.
        // Placeholder: Behaviors.same is not allowed as an initial behavior.
        Behaviors.empty
      }
    }
  }
```

--------------------------------

### Nesting setup, withTimers, and withStash (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/style-guide.md

Demonstrates how to nest `setup`, `withTimers`, and `withStash` in Java when an actor behavior needs more than one of them. The nesting order does not change the behavior as long as only the innermost function contains additional logic.

```java
Behavior<Command> typedActor =
    Behaviors.setup(
        context ->
            Behaviors.withTimers(
                timers ->
                    Behaviors.withStash(
                        10,
                        stash -> {
                          // ... behavior logic using context, timers, and stash.
                          // Placeholder: Behaviors.same() is not allowed as an initial behavior.
                          return Behaviors.empty();
                        })));
```

--------------------------------

### Full Lookup Example (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/discovery/index.md

Shows a full `Lookup` with service name, port name, and protocol in Java.
```java
// Lookup.create takes the service name; port name and protocol are added with the withX methods.
Lookup.create("my-service").withPortName("http").withProtocol("tcp")
```

--------------------------------

### Obtain Read Journal Instance (Scala)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/persistence-query.md

Gets an instance of a `ReadJournal` for issuing queries. This example obtains a `NoopJournal` instance by its plugin identifier.

```scala
import org.apache.pekko.persistence.query.PersistenceQuery
import org.apache.pekko.persistence.query.scaladsl.ReadJournal

val query = PersistenceQuery(system).readJournalFor[ReadJournal](NoopJournal.identifier)
```

--------------------------------

### Start ActorSystem and Send Messages

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/actors.md

Starts an `ActorSystem` with `HelloWorldMain` as the guardian behavior and sends it initial messages to kick off the actor interactions.

```scala
val system: ActorSystem[HelloWorldMain.SayHello] =
  ActorSystem(HelloWorldMain(), "hello-pekko")

system ! HelloWorldMain.SayHello("World")
system ! HelloWorldMain.SayHello("Pekko")
```

--------------------------------

### Use Extension within an Actor in Java

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/extending-pekko.md

Access an extension from within a Pekko Actor in Java. This example demonstrates getting the `CountExtension`.
```java
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.Props;

public class MyActor extends AbstractActor {
  // Look up the extension instance registered for this actor's system.
  private final CountExtension countExtension =
      CountExtension.EXTENSION_ID.get(getContext().getSystem());

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals(
            "getCount",
            msg -> System.out.println("Count from actor: " + countExtension.count()))
        .build();
  }

  public static Props props() {
    return Props.create(MyActor.class);
  }
}
```

--------------------------------

### Scala Test Setup with TestKit

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/testing.md

Demonstrates the minimal setup for testing classic actors in Scala using `TestKit` and `AnyWordSpecLike`. `ImplicitSender` makes `testActor` the sender of messages, so replies can be asserted with `expectMsg`.

```scala
import org.apache.pekko.actor.{ Actor, ActorSystem, Props }
import org.apache.pekko.testkit.{ ImplicitSender, TestKit }
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.duration._

object PlainWordSpec {
  class SimpleActor extends Actor {
    def receive: Receive = {
      case "ping" => sender() ! "pong"
    }
  }
}

class PlainWordSpec
    extends TestKit(ActorSystem("MyActorSystem"))
    with ImplicitSender
    with AnyWordSpecLike
    with BeforeAndAfterAll {
  import PlainWordSpec.SimpleActor

  // Shut down the actor system once all tests have run.
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "A SimpleActor" must {
    "reply to ping with pong" in {
      // 1. actor creation
      val simple = system.actorOf(Props[SimpleActor]())
      // 2. message assertion
      within(1.second) {
        simple ! "ping"
        expectMsg("pong")
      }
    }
  }
}
```

--------------------------------

### Use Extension within an Actor in Scala

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/extending-pekko.md

Access an extension from within a Pekko Actor in Scala. This example demonstrates getting the `CountExtension`.
```scala
import org.apache.pekko.actor.{ Actor, Props }

class MyActor extends Actor {
  // Look up the extension instance registered for this actor's system.
  val countExtension = CountExtension(context.system)

  override def receive: Receive = {
    case "getCount" =>
      println(s"Count from actor: ${countExtension.count()}")
  }
}

object MyActor {
  def props: Props = Props(new MyActor)
}
```

--------------------------------

### Basic Lookup Example (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/discovery/index.md

Illustrates a basic `Lookup` with only a service name in Java.

```java
Lookup.create("my-service")
```

--------------------------------

### Application Entry Point (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/guide/tutorial_2.md

Provides the main entry point for the Java application, creating the actor system with the `IotSupervisor` as its guardian actor. This code should be in IotMain.java.

```java
import org.apache.pekko.actor.typed.ActorSystem;

public class IotMain {
  public static void main(String[] args) {
    // Create the ActorSystem with IotSupervisor as the top-level (guardian) actor.
    // Assumes IotSupervisor.create() returns Behavior<Void>, as in the tutorial.
    ActorSystem<Void> system = ActorSystem.create(IotSupervisor.create(), "IotSystem");

    // To stop the system:
    // system.terminate();
  }
}
```

--------------------------------

### Setup sbt using Coursier

Source: https://github.com/apache/pekko/blob/main/AGENTS.md

Sets up the sbt build tool using the Coursier command-line interface.

```shell
cs setup
```

--------------------------------

### Define a range of integers

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/operators/Source/range.md

Use `Source.range` to define the start and end of the integer sequence, optionally with a step. This example shows how to import and use the `range` method.
```java
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.javadsl.Source;

// Emits the integers from 0 to 10, both bounds included.
Source<Integer, NotUsed> numbers = Source.range(0, 10);

// Same range with a step of 2: 0, 2, 4, 6, 8, 10.
Source<Integer, NotUsed> numbersWithStep = Source.range(0, 10, 2);
```

--------------------------------

### Identity Event Adapter (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/persistence.md

An example of an identity Event Adapter in Java. This serves as a starting point for custom event adaptation logic.

```java
import org.apache.pekko.persistence.journal.EventAdapter;
import org.apache.pekko.persistence.journal.EventSeq;

public class IdentityEventAdapter implements EventAdapter {
  @Override
  public String manifest(Object event) {
    return ""; // return "" when no manifest is needed
  }

  @Override
  public Object toJournal(Object event) {
    return event; // identity
  }

  @Override
  public EventSeq fromJournal(Object event, String manifest) {
    return EventSeq.single(event); // identity
  }
}
```

--------------------------------

### Application Entry Point (Scala)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/typed/guide/tutorial_2.md

Provides the main entry point for the Scala application, creating the actor system with the `IotSupervisor` as its guardian actor. This code should be in IotApp.scala.

```scala
import org.apache.pekko.actor.typed.ActorSystem

object IotApp {
  def main(args: Array[String]): Unit = {
    // Create the ActorSystem with IotSupervisor as the top-level (guardian) actor.
    // Assumes IotSupervisor() returns Behavior[Nothing], as in the tutorial.
    val system = ActorSystem[Nothing](IotSupervisor(), "IotSystem")

    // To stop the system:
    // system.terminate()
  }
}
```

--------------------------------

### Subscriber Actor Example (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/distributed-pub-sub.md

Defines a Java actor that subscribes to a topic through the `DistributedPubSub` mediator. Subscriber actors can be started on several nodes in the cluster, and all of them receive messages published to the topic.
```java
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.cluster.pubsub.DistributedPubSub;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.event.Logging;
import org.apache.pekko.event.LoggingAdapter;

public class Subscriber extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  public Subscriber() {
    // The mediator is a classic actor provided by the DistributedPubSub extension.
    ActorRef mediator = DistributedPubSub.get(getContext().getSystem()).mediator();
    // Subscribe to the "content" topic
    mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            String.class,
            message -> log.info("Received message '{}' from topic 'content'", message))
        .match(DistributedPubSubMediator.SubscribeAck.class, ack -> log.info("subscribed"))
        .build();
  }
}
```

--------------------------------

### Setting up SSL Engine for TLS (Java)

Source: https://github.com/apache/pekko/blob/main/docs/src/main/paradox/stream/stream-io.md

Configure an `SSLEngine` for TLS-encrypted connections. This involves loading the keystore and truststore, initializing an `SSLContext`, and setting the client mode, client authentication, and enabled protocols on the engine.
```java
import java.io.FileInputStream;
import java.security.KeyStore;
import java.security.SecureRandom;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Tcp;

// Fragment: place these statements in a method that declares `throws Exception`.
final ActorSystem system = ActorSystem.create("example");
final Materializer mat = Materializer.createMaterializer(system);

// Example passwords and file names only; load real secrets from configuration.
char[] ksPassword = "changeit".toCharArray();
char[] tsPassword = "changeit".toCharArray();

// Load the keystore (own certificate and private key).
KeyStore ks = KeyStore.getInstance("JKS");
try (FileInputStream in = new FileInputStream("keystore.jks")) {
  ks.load(in, ksPassword);
}

// Load the truststore (certificates of trusted peers).
KeyStore ts = KeyStore.getInstance("JKS");
try (FileInputStream in = new FileInputStream("truststore.jks")) {
  ts.load(in, tsPassword);
}

KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, ksPassword);

TrustManagerFactory tmf =
    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ts);

SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());

boolean sslNeedPeerAuth = false;
String[] sslEnabledProtocols = new String[] {"TLSv1.2"};

// Server-side engine: not in client mode, client certificates optional.
SSLEngine engine = sslContext.createSSLEngine();
engine.setUseClientMode(false);
engine.setNeedClientAuth(sslNeedPeerAuth);
engine.setEnabledProtocols(sslEnabledProtocols);

// Use the engine with Tcp.get(system).bindWithTls or outgoingConnectionWithTls
```
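
As a sketch of the engine configuration step only, the helper below uses nothing but the JDK. `TlsEngineSketch`, `createServerEngine`, and `createDefaultServerEngine` are hypothetical names, not part of Pekko, and the JVM default `SSLContext` stands in for one built from real key stores. It enables only those requested protocols that the JVM supports, because `setEnabledProtocols` throws `IllegalArgumentException` for unsupported protocol names.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

// Hypothetical helper for illustration; not part of Pekko.
final class TlsEngineSketch {

  /** Creates a server-side engine limited to the requested protocols that the JVM supports. */
  static SSLEngine createServerEngine(SSLContext context, String... wantedProtocols) {
    SSLEngine engine = context.createSSLEngine();
    engine.setUseClientMode(false);

    // Keep only protocol names this JVM knows about.
    List<String> supported = Arrays.asList(engine.getSupportedProtocols());
    List<String> enabled = new ArrayList<>();
    for (String protocol : wantedProtocols) {
      if (supported.contains(protocol)) {
        enabled.add(protocol);
      }
    }
    if (enabled.isEmpty()) {
      throw new IllegalStateException("None of the requested TLS protocols is supported");
    }
    engine.setEnabledProtocols(enabled.toArray(new String[0]));
    return engine;
  }

  /** Same as above, using the JVM default SSLContext (an assumption made for this sketch). */
  static SSLEngine createDefaultServerEngine(String... wantedProtocols) {
    try {
      return createServerEngine(SSLContext.getDefault(), wantedProtocols);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("Default SSLContext is not available", e);
    }
  }

  public static void main(String[] args) {
    SSLEngine engine = createDefaultServerEngine("TLSv1.3", "TLSv1.2");
    System.out.println("Client mode: " + engine.getUseClientMode());
    System.out.println("Enabled protocols: " + Arrays.toString(engine.getEnabledProtocols()));
  }
}
```

In a real application the `SSLContext` would come from the keystore and truststore setup shown above, and a fresh engine would be created for every connection, since an `SSLEngine` cannot be reused across connections.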