=============== LIBRARY RULES =============== From library maintainers: - Use high-level concurrency operations like par(), race(), and timeout() when possible - For manual concurrency, always use structured concurrency with supervised scopes to ensure proper cleanup of threads and resources - Keep concurrency scopes as small as possible, creating short-lived scopes for single requests or jobs - Use useCloseableInScope() for automatic resource management that ensures cleanup on scope exit - Handle errors through either exceptions (for bugs/unexpected situations) or application errors as values using Either - Prefer either blocks with .ok() for streamlined Either handling instead of manual pattern matching - Use blocking operations freely, instead of using Futures - Use Flows for defining asynchronous data processing pipelines - Use .buffer() to create explicit asynchronous boundaries in flows when producers and consumers need decoupling - Use Flow.usingEmit for custom flow creation but never share FlowEmit instances across threads - Use channels for integrating with callback-based and reactive APIs within structured concurrency scopes - Leverage retry() with exponential backoff and jitter for resilient error handling of transient failures - Use repeat() with appropriate schedules for periodic task execution within supervised scopes - Use .pipe, .tap, uninterruptible, .discard, debug utility functions ### Basic Channel Creation Example Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/channels.md A simple Scala example demonstrating the creation of a buffered channel with the default buffer size. ```scala import ox.channels.* val c = Channel.bufferedDefault[String] ``` -------------------------------- ### Channel Type Examples Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/channels.md Illustrates the creation of different channel types: rendezvous, buffered with capacity, and unlimited. ```scala import ox.channels.* val c1 = Channel.rendezvous[String] val c2 = Channel.buffered[String](5) val c3 = Channel.unlimited[String] ``` -------------------------------- ### Flows with I/O and Concurrency Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Shows how to create flows that perform I/O operations and manage concurrency. This example reads lines from a resource, processes them in parallel using `mapPar`, and drains the flow. ```scala import ox.flows._ def sendHttpRequest(entry: String): Unit = ??? Flow .fromInputStream(this.getClass().getResourceAsStream("/list.txt")) .linesUtf8 .mapPar(4)(sendHttpRequest) .runDrain() ``` -------------------------------- ### Circuit Breaker Usage Example Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/circuit-breaker.md Demonstrates how to initialize and use a CircuitBreaker in Scala with supervised execution. The circuit breaker can proactively identify unresponsive services and prevent repeated attempts. ```scala import ox.supervised import ox.resilience.* supervised: val circuitBreaker = CircuitBreaker() type T def operation: T = ??? val operationResult: Option[T] = circuitBreaker.runOrDrop(operation) ``` -------------------------------- ### Sending Data and Signalling Completion Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/channels.md A Scala example showing how to send data to a channel and signal its completion using `done()`, within a supervised scope and a forked thread. ```scala import ox.fork import ox.supervised import ox.channels.* val c = Channel.rendezvous[String] supervised: fork: c.send("Hello") c.send("World") c.done() ``` -------------------------------- ### Cron Schedule Examples Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/integrations/cron4s.md Illustrates various ways to use `CronSchedule` with different operation types (direct, Either, union), custom repeat configurations, and integrating retry logic. ```scala import ox.UnionMode import ox.scheduling.cron.CronSchedule import scala.concurrent.duration.* import ox.resilience.{RetryConfig, retry} import ox.scheduling.* import cron4s.* def directOperation: Int = ??? def eitherOperation: Either[String, Int] = ??? def unionOperation: String | Int = ??? val cronExpr: CronExpr = Cron.unsafeParse("10-35 2,4,6 * ? * *") // various operation definitions - same syntax repeat(CronSchedule.fromCronExpr(cronExpr))(directOperation) repeatEither(CronSchedule.fromCronExpr(cronExpr))(eitherOperation) // infinite repeats with a custom strategy def customStopStrategy: Int => Boolean = ??? repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr), customStopStrategy))(directOperation) // custom error mode repeatWithErrorMode(UnionMode[String])(RepeatConfig(CronSchedule.fromCronExpr(cronExpr)))(unionOperation) // repeat with retry inside repeat(RepeatConfig(CronSchedule.fromCronExpr(cronExpr))) { retry(Schedule.exponentialBackoff(100.millis).maxRetries(3))(directOperation) } ``` -------------------------------- ### Sequential Flow Processing Example in Scala Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/flows.md Illustrates a typical flow pipeline where transformations are applied sequentially on the calling thread. The `runToList` method collects all emitted elements into a list. ```scala import ox.flow.Flow Flow.fromValues(1, 2, 3, 5, 6) .map(_ * 2) .filter(_ % 2 == 0) .runToList() ``` -------------------------------- ### Inheritable MDC Usage Example Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/integrations/mdc-logback.md Demonstrates how to initialize and use inheritable MDC values within a Scala application. It shows setting context values, using `MDC.put` (which is not inherited), and verifying inherited values across forked threads. ```scala import org.slf4j.MDC import ox.fork import ox.logback.InheritableMDC InheritableMDC.supervisedWhere("a" -> "1", "b" -> "2") { MDC.put("c", "3") // not inherited fork { MDC.get("a") // "1" MDC.get("b") // "2" MDC.get("c") // null }.join() MDC.get("a") // "1" MDC.get("b") // "2" MDC.get("c") // "3" } ``` -------------------------------- ### Overriding OxApp Settings Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/oxapp.md Shows how to configure OxApp behavior by overriding the `settings` method. This example customizes the exit code returned upon interruption and sets a long sleep duration. ```scala import ox.* import scala.concurrent.duration.* object MyApp extends OxApp: override def settings: OxApp.Settings = OxApp.Settings.Default.copy( interruptedExitCode = ExitCode.Failure(130) ) def run(args: Vector[String])(using Ox): ExitCode = sleep(60.seconds) ExitCode.Success ``` -------------------------------- ### Kafka Mapping Stage Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/integrations/kafka.md Demonstrates using KafkaStage to map messages and publish them as a stage in a flow. This example shows how to transform data and send it to a Kafka topic, returning Kafka RecordMetadata for further processing. ```scala import ox.flow.Flow import ox.kafka.ProducerSettings import ox.kafka.KafkaStage.* import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} val settings = ProducerSettings.default.bootstrapServers("localhost:9092") val metadatas: Flow[RecordMetadata] = Flow .fromIterable(List("a", "b", "c")) .map(msg => ProducerRecord[String, String]("my_topic", msg)) .mapPublish(settings) // process & run the metadatas flow further ``` -------------------------------- ### Integrating with Libraries using Scope Runner Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/structured-concurrency/fork-join.md Details how to use `inScopeRunner` to start forks from threads managed by other libraries. A runner obtained from a scope-managed thread can be passed to other threads to start forks within the current concurrency scope. ```scala import com.softwaremill.ox.Scope Scope.current().inScopeRunner.foreach { runner => // Pass 'runner' to another thread or library to start forks // Example: new Thread(() => runner.run(() => fork { ... })).start() } // This allows starting forks from library-managed threads as long as the scope is not complete. ``` -------------------------------- ### Allocate and Release Resources within Scope using useInScope in Scala Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/resources.md Shows how to allocate and manage resources within a concurrency scope using `useInScope`. Resources are released in reverse order of acquisition after all forks within the scope complete. This example defines custom `acquire` and `release` functions for `MyResource`. ```scala import ox.{supervised, useInScope} case class MyResource(c: Int) def acquire(c: Int): MyResource = { println(s"acquiring $c ...") MyResource(c) } def release(resource: MyResource): Unit = { println(s"releasing ${resource.c} ...") } supervised { val resource1 = useInScope(acquire(10))(release) val resource2 = useInScope(acquire(20))(release) println(s"Using $resource1 ...") println(s"Using $resource2 ...") } ``` -------------------------------- ### Run Operations with CircuitBreaker Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/circuit-breaker.md Provides examples of using the CircuitBreaker to run operations with different return types and error handling modes. It shows how to use `runOrDrop`, `runOrDropEither`, and `runOrDropWithErrorMode`, as well as integrating CircuitBreaker with a retry mechanism. ```scala import ox.UnionMode import ox.resilience.* import ox.scheduling.Schedule import ox.supervised import scala.concurrent.duration.* def directOperation: Int = ??? def eitherOperation: Either[String, Int] = ??? def unionOperation: String | Int = ??? supervised: val circuitBreaker = CircuitBreaker() // various operation definitions circuitBreaker.runOrDrop(directOperation) circuitBreaker.runOrDropEither(eitherOperation) // custom error mode circuitBreaker.runOrDropWithErrorMode(UnionMode[String])(unionOperation) // retry with circuit breaker inside retryEither(Schedule.exponentialBackoff(100.millis).maxRetries(3)){ circuitBreaker.runOrDrop(directOperation) match case Some(value) => Right(value) case None -> Left("Operation dropped") } ``` -------------------------------- ### Use Closeable Resource with PrintWriter in Scala Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/resources.md Demonstrates the use of `useCloseable` to acquire and automatically release a resource that implements `java.io.Closeable`. The `PrintWriter` is used here as an example, ensuring its `close()` method is called after the block finishes, even if exceptions occur. ```scala import ox.useCloseable useCloseable(new java.io.PrintWriter("test.txt")) { writer => writer.println("Hello, world!") } ``` -------------------------------- ### Create Flow from InputStream (Scala) Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/io.md Creates a `Flow[Chunk[Byte]]` from a Java `InputStream`. The flow can then be processed, for example, by decoding it to a string and transforming it. Ox handles stream closure. ```Scala import ox.flow.Flow import java.io.ByteArrayInputStream import java.io.InputStream val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) Flow .fromInputStream(inputStream) // Flow[Chunk[Byte]] .decodeStringUtf8 .map(_.toUpperCase) .runForeach(println) // "SOME INPUT" ``` ```Scala import ox.flow.Flow import java.io.ByteArrayInputStream import java.io.InputStream val inputStream: InputStream = new ByteArrayInputStream("some input".getBytes) Flow .fromInputStream(inputStream, chunkSize = 4) // Flow[Chunk[Byte]] .decodeStringUtf8 .map(_.toUpperCase) .runForeach(println) // "SOME", " INPUT" ``` -------------------------------- ### Forking with Helper Methods Requiring Ox Capability Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/structured-concurrency/fork-join.md Illustrates how helper methods designed for concurrency can require an 'Ox' capability, ensuring they are called within a valid concurrency scope. This example defines and uses a 'forkComputation' helper. ```scala import ox.{fork, Fork, Ox, sleep, supervised} import scala.concurrent.duration.* def forkComputation(p: Int)(using Ox): Fork[Int] = fork { sleep(p.seconds) p + 1 } supervised { val f1 = forkComputation(2) val f2 = forkComputation(4) (f1.join(), f2.join()) } ``` -------------------------------- ### Race Computations with `raceSuccess` Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/high-level-concurrency/race.md Demonstrates racing multiple computations using `raceSuccess`. This method returns the first successful result and interrupts the losing computation. The example shows how `raceSuccess` waits for both branches to finish, including cleanup for the interrupted one. ```scala import ox.{raceSuccess, sleep} import scala.concurrent.duration.* def computation1: Int = sleep(2.seconds) 1 def computation2: Int = sleep(1.second) 2 val result: Int = raceSuccess(computation1, computation2) // 2 ``` -------------------------------- ### Race Computations with `raceEither` Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/high-level-concurrency/race.md Illustrates racing computations that return `Either` values using `raceEither`. This method returns the first `Right` value encountered. If all computations return `Left` (considered application errors), it re-throws the first reported error. The example shows how a `Left` result doesn't immediately stop the race, but the third computation is cancelled upon completion of the second. ```scala import ox.{raceEither, sleep} import scala.concurrent.duration.* raceEither({ sleep(200.millis) Left(-1) }, { sleep(500.millis) Right("ok") }, { sleep(1.second) Right("also ok") }) ``` -------------------------------- ### Creating Sources from Values Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/channels.md Demonstrates creating a source that emits a sequence of values using `Source.fromValues` within a supervised context. ```scala import ox.supervised import ox.channels.Source supervised: Source.fromValues(1, 2, 3) ``` -------------------------------- ### Select from Channels Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Presents Go-like channel selection using the `select` function. It allows waiting on multiple channel operations (send or receive) and executing the first one that becomes ready. ```scala import ox.channels._ val c = Channel.rendezvous[Int] val d = Channel.rendezvous[Int] select(c.sendClause(10), d.receiveClause) ``` -------------------------------- ### Creating Sources from Flows Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/channels.md Shows how to create sources from a timed tick and an iterative sequence using `Flow` operations and `runToChannel`. ```scala import ox.supervised import ox.channels.Source import ox.flow.Flow import scala.concurrent.duration.* supervised: Flow.tick(1.second, "x").runToChannel() Flow.iterate(0)(_ + 1).runToChannel() // natural numbers ``` -------------------------------- ### Basic OxApp Entry Point Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/oxapp.md Defines a standard application entry point using the OxApp trait. It demonstrates how to fork a user thread, handle command-line arguments, and return an ExitCode for program termination. ```scala import ox.* import scala.concurrent.duration.* object MyApp extends OxApp: def run(args: Vector[String])(using Ox): ExitCode = forkUser { sleep(500.millis) println("Fork finished!") } println(s"Started app with args: ${args.mkString(", ")}!") ExitCode.Success ``` -------------------------------- ### Structured Concurrency with Fork/Join Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Demonstrates structured concurrency using `supervised` and `fork`. This pattern ensures that all spawned computations are properly managed and joined, similar to `par`. ```scala import scala.concurrent.duration._ import ox._ supervised { val f1 = fork { sleep(2.seconds); 1 } val f2 = fork { sleep(1.second); 2 } (f1.join(), f2.join()) } // Returns (1, 2) ``` -------------------------------- ### Source Operations: Receiving and Creation Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/channels.md Explains how to receive data from a channel using Source.receive and its safe counterpart. Also covers creating sources from values, timed ticks, and iterative sequences. ```APIDOC Source Operations: trait Source[+T]: def receive(): T - Receives a value from the channel. Blocks if the channel is empty or closed without available data. - Throws ChannelClosedException if the channel is closed and no more data is available. def receiveSafe(): T | ChannelClosed - Safely receives a value. Returns ChannelClosed.Done or ChannelClosed.Error if the channel is closed and no data is available, otherwise returns the received value. Source Creation: - Source.fromValues(values: T*): Creates a source that emits the given values sequentially. - Flow.tick(duration: Duration, value: T): Creates a source that emits 'value' periodically after 'duration'. - Flow.iterate(start: T)(f: T => T): Creates a source that emits natural numbers starting from 'start' and applying 'f' iteratively. Example Usage: import ox.supervised import ox.channels.Source import ox.flow.Flow import scala.concurrent.duration.* supervised: // Receiving from a channel (assuming 'c' is a Channel[String]) // val msg = c.receive() // Creating sources: Source.fromValues(1, 2, 3) Flow.tick(1.second, "x").runToChannel() Flow.iterate(0)(_ + 1).runToChannel() ``` -------------------------------- ### Fetch Ox Cursor Rules Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/info/ai.md This command fetches the latest Ox rules for AI coding assistants from GitHub and places them in the `.cursor/rules` directory. It clones the repository with a shallow depth and sparse checkout, copies the relevant rule files, and then cleans up the temporary repository. ```shell git clone --depth=1 --filter=blob:none --sparse https://github.com/softwaremill/ox.git && cd ox && git sparse-checkout set cursor-rules && mkdir -p ../.cursor/rules && cp cursor-rules/*.mdc ../.cursor/rules && cd .. && rm -rf ox ``` -------------------------------- ### Rate Limit Computations Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Demonstrates how to limit the rate of computations using a `RateLimiter`. The `fixedWindowWithStartTime` creates a limiter that allows a specified number of operations within a time window. ```scala import scala.concurrent.duration._ import ox._ import ox.scheduler.RateLimiter supervised { val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second) rateLimiter.runBlocking({ // Your computation here }) } ``` -------------------------------- ### Basic repeat Syntax Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/scheduling/repeat.md Demonstrates the fundamental structure for using the `repeat` function, which takes a schedule and an operation as arguments. It relies on the `scheduled` API underneath. ```scala import ox.scheduling.repeat repeat(schedule)(operation) ``` -------------------------------- ### Scala: Map Channel Source Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/transforming-channels.md Applies a transformation function to each element received from a source channel, creating a new channel with the transformed values. This operation starts a new virtual thread to immediately receive, transform, and send values. ```scala import ox.supervised import ox.channels.{Channel, Source} supervised: val c = Channel.rendezvous[String] val c2: Source[Int] = c.map(s => s.length()) ``` -------------------------------- ### Inspecting select results with pattern matching Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/selecting-from-channels.md Shows how to inspect the results of a `select` operation, particularly when dealing with mixed send/receive clauses, using pattern matching to distinguish satisfied clauses. ```scala import ox.channels.* val c = Channel.rendezvous[Int] val d = Channel.rendezvous[Int] select(c.sendClause(10), d.receiveClause) match case c.Sent() => println("Sent to c") case d.Received(v) => println(s"Received from d: $v") ``` -------------------------------- ### Basic Rate Limiter Usage Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/rate-limiter.md Demonstrates the fundamental usage of a RateLimiter within a supervised concurrency scope. It shows how to create a fixed window rate limiter and use its blocking and non-blocking execution methods. ```scala import ox.supervised import ox.resilience.* import scala.concurrent.duration.* supervised: val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second) type T def operation: T = ??? val blockedOperation: T = rateLimiter.runBlocking(operation) val droppedOperation: Option[T] = rateLimiter.runOrDrop(operation) ``` -------------------------------- ### Configure and Instantiate CircuitBreaker Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/circuit-breaker.md Demonstrates how to create a CircuitBreaker instance, either using the default configuration or providing a custom CircuitBreakerConfig. The custom configuration allows fine-tuning parameters like failure rate thresholds, slow call durations, and window sizes. ```scala import ox.supervised import ox.resilience.* import scala.concurrent.duration.* supervised: // using default config CircuitBreaker() // or CircuitBreaker(CircuitBreakerConfig.default) // custom config val config = CircuitBreakerConfig( failureRateThreshold = PercentageThreshold(50), slowCallThreshold = PercentageThreshold(50), slowCallDurationThreshold = 10.seconds, slidingWindow = SlidingWindow.CountBased(100), minimumNumberOfCalls = 20, waitDurationOpenState = 10.seconds, halfOpenTimeoutDuration = 0.millis, numberOfCallsInHalfOpenState = 10 ) // providing config for CircuitBreaker instance CircuitBreaker(config) ``` -------------------------------- ### Scala: Timeout Computation with Duration Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/high-level-concurrency/timeout.md Demonstrates using `ox.timeout` to limit computation execution time. The `timeout` function takes a duration and a computation, returning the computation's result or throwing a `TimeoutException` if the duration is exceeded. The example shows both a successful execution and a timed-out one. ```scala import ox.timeout import scala.concurrent.duration.DurationInt import scala.util.Try def computation: Int = Thread.sleep(2000) // Simulate a long-running computation 1 // Example of a timeout occurring val result1: Try[Int] = Try(timeout(1.second)(computation)) // Expected: Failure(java.util.concurrent.TimeoutException) // Example of successful execution within the timeout val result2: Try[Int] = Try(timeout(3.seconds)(computation)) // Expected: Success(1) // A variant, `timeoutOption`, doesn't throw a `TimeoutException` on timeout, but returns `None` instead. // val result3: Option[Int] = timeoutOption(1.second)(computation) // Expected: None // val result4: Option[Int] = timeoutOption(3.seconds)(computation) // Expected: Some(1) ``` -------------------------------- ### Simple OxApp Variant Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/oxapp.md A simplified OxApp variant for applications that do not need to process command-line arguments or return a specific ExitCode. Exceptions are caught, stack traces printed, and a default exit code of 1 is returned. ```scala import ox.* object MyApp extends OxApp.Simple: def run(using Ox): Unit = println("All done!") ``` -------------------------------- ### Scala Helper Function with ForkLocal Scope Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/structured-concurrency/fork-local.md Provides an example of a helper function that sets a ForkLocal value within a passed code block. It captures the code block as a context function `Ox ?=> T` to ensure nested forks use the provided scope, preventing `StructureViolationException`. ```scala def withSpan[T](spanName: String)(f: Ox ?=> T): T = { val span = spanBuilder.startSpan(spanName) currentSpan.supervisedWhere(Some(span)) { try f finally span.end() } } ``` -------------------------------- ### Select with default clause Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/selecting-from-channels.md Demonstrates how to provide a default clause to `select`. This clause specifies a return value if no other clause can be immediately satisfied, preventing the `select` from blocking indefinitely. ```scala import ox.channels.* val c = Channel.rendezvous[Int] select(c.receiveClause, Default(5)) match case c.Received(v) => println(s"Received from d: $v") case DefaultResult(v) => println(s"No value available in c, using default: $v") ``` -------------------------------- ### Create a Cleanly Shutting Down Application Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Defines an application that extends `OxApp`, ensuring graceful shutdown when interrupted by SIGINT or SIGTERM signals. The `run` method contains the application's core logic. ```scala import ox._ object MyApp extends OxApp: def run(args: Vector[String])(using Ox): ExitCode = // ... your app's code ... // might use fork {} to create top-level background threads ExitCode.Success ``` -------------------------------- ### Schedule Configurations Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/scheduling/repeat.md Shows various ways to configure the `Schedule` for the `repeat` function, including immediate execution, fixed intervals, maximum attempts, and initial delays. It also covers custom continuation strategies. ```scala import ox.scheduling.{Schedule, repeat, RepeatConfig} import scala.concurrent.duration.* // Immediate schedule with max attempts repeat(Schedule.immediate.maxAttempts(3))(directOperation) // Fixed interval schedule repeat(Schedule.fixedInterval(100.millis).maxAttempts(3))(directOperation) // Fixed interval with initial delay repeat(Schedule.fixedInterval(100.millis).maxAttempts(3).withInitialDelay(50.millis))(directOperation) // Infinite repeats with a custom stop strategy def customStopStrategy: Int => Boolean = ??? repeat(RepeatConfig(Schedule.fixedInterval(100.millis)) \ .copy(shouldContinueOnResult = customStopStrategy))(directOperation) ``` -------------------------------- ### Integrate Flow with Imperative API Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Demonstrates integrating reactive flows with an imperative API using `Flow.usingEmit`. This allows emitting elements into the flow from an external loop or event source. ```scala import ox.flows._ def readNextBatch(): List[String] = ??? Flow.usingEmit { emit => forever: readNextBatch().foreach(emit.apply) } ``` -------------------------------- ### Channel Creation Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/channels.md Demonstrates the creation of various channel types: default buffered, rendezvous, custom buffered, and unlimited. Each type has different blocking characteristics for send and receive operations. ```APIDOC Channel Creation: Channel.bufferedDefault[T]: Creates a buffered channel with the default buffer size (e.g., 16). - send: Blocks only when the buffer is full. - receive: Blocks until a value is available. Channel.rendezvous[T]: Creates a rendezvous channel where sender and receiver must meet. - send: Always blocks until a receiver is ready. - receive: Always blocks until a sender is ready. Channel.buffered[T](capacity: Int): Creates a buffered channel with a specified capacity. - send: Blocks when the buffer is full. - receive: Blocks until a value is available. Channel.unlimited[T]: Creates an unlimited channel where sending never blocks. - send: Never blocks. - receive: Blocks until a value is available. Example Usage: import ox.channels.* val c = Channel.bufferedDefault[String] val c1 = Channel.rendezvous[String] val c2 = Channel.buffered[String](5) val c3 = Channel.unlimited[String] ``` -------------------------------- ### OxApp.WithEitherErrors for Error Handling Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/oxapp.md Demonstrates OxApp.WithEitherErrors for integrating direct-style error handling using Either. The run function receives an EitherError token, allowing the use of .ok() combinators to manage application errors and translate them into ExitCodes. ```scala import ox.* import ox.either.* sealed trait MyAppError case class ComputationError(msg: String) extends Exception(msg) with MyAppError object MyApp extends OxApp.WithEitherErrors[MyAppError]: def doWork(): Either[MyAppError, Unit] = Left(ComputationError("oh no")) def handleError(myAppError: MyAppError): ExitCode = myAppError match { case ComputationError(_) => ExitCode.Failure(23) } def run(args: Vector[String])(using Ox, EitherError[MyAppError]): ExitCode = doWork().ok() // will end the scope with MyAppError as `doWork` returns a Left ExitCode.Success ``` -------------------------------- ### Pipe and Tap Values Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Demonstrates using `pipe` and `tap` for functional data transformation and side-effecting operations using dot-syntax. `pipe` applies a function, while `tap` performs an action and passes the value through. ```scala import ox.utils._ def compute: Int = ??? def computeMore(v: Int): Long = ??? compute .pipe(2 * _) .tap(println) .pipe(computeMore) ``` -------------------------------- ### Run Parallel Computations Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Demonstrates how to run multiple computations concurrently using the `par` function. It returns a tuple of results once all computations are complete. Dependencies include the `ox` library for concurrency primitives. ```scala import scala.concurrent.duration._ import ox._ def computation1: Int = { sleep(2.seconds); 1 } def computation2: String = { sleep(1.second); "2" } val result1: (Int, String) = par(computation1, computation2) // result1 will be (1, "2") ``` -------------------------------- ### Transform Flows with Basic Operations in Scala Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/flows.md Demonstrates chaining common transformation stages like `map`, `filter`, `take`, `zip`, and `interleave` to build a data processing pipeline. Flows remain lazy until explicitly run. ```scala import ox.flow.Flow Flow.fromValues(1, 2, 3, 5, 6) .map(_ * 2) .filter(_ % 2 == 0) .take(3) .zip(Flow.repeat("a number")) .interleave(Flow.repeat((0, "also a number"))) // etc., TODO: run the flow ``` -------------------------------- ### Scala AdaptiveRetry: Basic and Scheduled Retries Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/scheduling/retries.md Demonstrates the basic usage of AdaptiveRetry with different scheduling strategies. This includes immediate retries, fixed interval retries, exponential backoff, and exponential backoff with jitter and max interval. ```scala import ox.resilience.AdaptiveRetry import ox.scheduling.{Jitter, Schedule} import scala.concurrent.duration.* def directOperation: Int = ??? val adaptive = AdaptiveRetry.default // various configs with custom schedules and default ResultPolicy adaptive.retry(Schedule.immediate.maxRetries(3))(directOperation) adaptive.retry(Schedule.fixedInterval(100.millis).maxRetries(3))(directOperation) adaptive.retry(Schedule.exponentialBackoff(100.millis).maxRetries(3))(directOperation) adaptive.retry(Schedule.exponentialBackoff(100.millis).maxRetries(3).jitter() .maxInterval(5.minutes))(directOperation) ``` -------------------------------- ### selectWithin with Sources (Scala) Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/selecting-from-channels.md Demonstrates `selectWithin` used directly with channel sources. If no data is available from any source within the specified duration, a `TimeoutException` is thrown. This is suitable for error handling patterns where timeouts are critical failures. ```scala import ox.channels.* import ox.supervised import scala.concurrent.duration.* import scala.concurrent.TimeoutException supervised: val s1 = Channel.rendezvous[Int] val s2 = Channel.rendezvous[String] try val result = selectWithin(50.millis)(s1, s2) println(s"Received: $result") catch case _: TimeoutException => println("No data available within timeout") ``` -------------------------------- ### Allocate and Manage Resources Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Shows how to allocate resources within a scope using `useCloseableInScope`. The resource is automatically closed when the scope exits, whether normally or due to an error. ```scala import java.io.PrintWriter import ox._ supervised { val writer = useCloseableInScope(new PrintWriter("test.txt")) // ... use writer ... } // writer is closed when the scope ends ``` -------------------------------- ### Resource Release on Interruption Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/oxapp.md Illustrates how OxApp facilitates clean resource management. Resources attached to scopes or managed via try-finally blocks are automatically released when the application is interrupted (e.g., via CTRL+C). ```scala import ox.* object MyApp extends OxApp: def run(args: Vector[String])(using Ox): ExitCode = releaseAfterScope: println("Releasing ...") println("Waiting ...") never ``` -------------------------------- ### Create and Ask Actor Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/actors.md Demonstrates creating an actor with a stateful object and invoking a method using the 'ask' pattern. 'ask' sends the invocation and blocks until a result is returned, propagating non-fatal exceptions to the caller. ```scala import ox.supervised import ox.channels.* class Stateful: private var counter: Int = 0 def increment(delta: Int): Int = counter += delta counter supervised: val ref = Actor.create(new Stateful) ref.ask(_.increment(5)) // blocks until the invocation completes ref.ask(_.increment(4)) // returns 9 ``` -------------------------------- ### Create and Transform Flows Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Demonstrates building a reactive flow using a functional API. It covers operations like iteration, filtering, mapping, interspersing, stateful mapping, and taking a limited number of elements. ```scala import ox.flows._ Flow.iterate(0)(_ + 1) // natural numbers .filter(_ % 2 == 0) .map(_ + 1) .intersperse(5) // compute the running total .mapStateful(0) { (state, value) => val newState = state + value (newState, newState) } .take(10) .runForeach(n => println(n.toString)) ``` -------------------------------- ### Select with mixed receive and send clauses Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/selecting-from-channels.md Illustrates using `select` with a combination of sending a value to a channel and receiving a value from another. Exactly one clause is guaranteed to be satisfied. ```scala import ox.channels.{Channel, select} val c = Channel.rendezvous[Int] val d = Channel.rendezvous[Int] select(c.sendClause(10), d.receiveClause) ``` -------------------------------- ### Select with multiple receive clauses Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/selecting-from-channels.md Demonstrates the common use case of receiving exactly one value from multiple channels. It uses a dedicated `select` variant that accepts multiple `Source`s. ```scala import ox.supervised import ox.channels.* import ox.flow.Flow import scala.annotation.tailrec import scala.concurrent.duration.* case object Tick def consumer(strings: Source[String]): Nothing = supervised { val tick = Flow.tick(1.second, Tick).runToChannel() @tailrec def doConsume(acc: Int): Nothing = select(tick, strings) match case Tick => println(s"Characters received this second: $acc") doConsume(0) case s: String => doConsume(acc + s.length) doConsume(0) } ``` -------------------------------- ### Basic Retry Operations (Scala) Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/scheduling/retries.md Demonstrates basic usage of `retry` and `retryEither` with immediate retry schedules for Scala operations. ```scala import ox.resilience.{retry, retryEither} import ox.scheduling.Schedule import scala.concurrent.duration.* def directOperation: Int = ??? def eitherOperation: Either[String, Int] = ??? // various operation signatures/error modes - same syntax retry(Schedule.immediate.maxRetries(3))(directOperation) retryEither(Schedule.immediate.maxRetries(3))(eitherOperation) ``` -------------------------------- ### Basic Supervised Scope with Fork and Join Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/structured-concurrency/fork-join.md Demonstrates the fundamental usage of a 'supervised' concurrency scope to fork two threads and wait for their results using 'join()'. This pattern is equivalent to using the 'par' combinator. ```scala import ox.{fork, sleep, supervised} import scala.concurrent.duration.* supervised { val f1 = fork { sleep(2.seconds) 1 } val f2 = fork { sleep(1.second) 2 } (f1.join(), f2.join()) } ``` -------------------------------- ### Run Flows to Process Data in Scala Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/flows.md Shows how to execute a flow and consume its elements using methods like `runToList`, `runForeach`, and `runDrain`. Running a flow is a blocking operation that processes elements sequentially unless asynchronous boundaries are introduced. ```scala import ox.flow.Flow import scala.concurrent.duration.* Flow.fromValues(1, 2, 3).runToList() // List(1, 2, 3) Flow.fromValues(1, 2, 3).runForeach(println) Flow.tick(1.second, "x").runDrain() // never finishes ``` -------------------------------- ### Create Finite and Infinite Flows in Scala Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/flows.md Demonstrates creating different types of flows using companion object methods. `fromValues` creates a finite flow, `tick` an infinite one, and `iterate` generates sequences. ```scala import ox.flow.Flow import scala.concurrent.duration.* Flow.fromValues(1, 2, 3) // a finite flow Flow.tick(1.second, "x") // an infinite flow, emitting "x" every second Flow.iterate(0)(_ + 1) // natural numbers ``` -------------------------------- ### Rate Limiter API Shorthands Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/rate-limiter.md Provides convenient shorthand methods for creating RateLimiter instances with specific algorithms and configurations. ```APIDOC RateLimiter Shorthand Methods: RateLimiter.fixedWindowWithStartTime(maxOperations: Int, window: FiniteDuration): RateLimiter - Description: Creates a RateLimiter using StartTimeRateLimiterAlgorithm.FixedWindow. RateLimiter.slidingWindowWithStartTime(maxOperations: Int, window: FiniteDuration): RateLimiter - Description: Creates a RateLimiter using StartTimeRateLimiterAlgorithm.SlidingWindow. RateLimiter.leakyBucket(maxTokens: Int, refillInterval: FiniteDuration): RateLimiter - Description: Creates a RateLimiter using StartTimeRateLimiterAlgorithm.LeakyBucket. RateLimiter.fixedWindowWithDuration(maxOperations: Int, window: FiniteDuration): RateLimiter - Description: Creates a RateLimiter using DurationRateLimiterAlgorithm.FixedWindow. RateLimiter.slidingWindowWithDuration(maxOperations: Int, window: FiniteDuration): RateLimiter - Description: Creates a RateLimiter using DurationRateLimiterAlgorithm.SlidingWindow. ``` -------------------------------- ### Tap for Logging Flow Elements Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/flows.md Demonstrates logging elements emitted by a flow using the `.tap` method. This method allows side effects, such as printing to the console, for each element processed by the flow. The flow is then collected into a list. ```scala import ox.flow.Flow Flow.fromValues(1, 2, 3) .tap(n => println(s"Received: $n")) .runToList() ``` -------------------------------- ### Sink Operations: Sending and Signalling Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/channels.md Details how to send data to a channel using Sink.send, signal completion with Sink.done, and propagate errors with Sink.error. Safe versions are provided to avoid exceptions on closed channels. ```APIDOC Sink Operations: trait Sink[-T]: def send(value: T): Unit - Sends a value to the channel. Blocks if the channel is full (for buffered channels) or if no receiver is ready (for rendezvous channels). - Throws ChannelClosedException if the channel is closed. def sendSafe(value: T): Unit | ChannelClosed - Safely sends a value. Returns ChannelClosed.Done or ChannelClosed.Error if the channel is closed, otherwise Unit. def done(): Unit - Signals that no more data will be sent. Closes the channel. - Throws ChannelClosedException if the channel is already closed. def doneSafe(): Unit | ChannelClosed - Safely signals completion. Returns ChannelClosed.Done or ChannelClosed.Error if the channel is closed, otherwise Unit. def error(cause: Throwable): Unit - Signals an error and closes the channel. Errors are propagated downstream. - Throws ChannelClosedException if the channel is already closed. def errorSafe(cause: Throwable): Unit | ChannelClosed - Safely signals an error. Returns ChannelClosed.Done or ChannelClosed.Error if the channel is closed, otherwise Unit. Example Usage: import ox.channels.* import ox.{fork, supervised} supervised: fork: c.send("Hello") c.send("World") c.done() // Handling errors: // c.error(new RuntimeException("Something went wrong")) ``` -------------------------------- ### Subscribe to Kafka Topic Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/integrations/kafka.md Demonstrates how to read messages from a Kafka topic using KafkaFlow. It involves configuring consumer settings like group ID, bootstrap servers, and auto-offset reset, then subscribing to a specific topic and processing received messages. ```scala import ox.kafka.{ConsumerSettings, KafkaFlow, ReceivedMessage} import ox.kafka.ConsumerSettings.AutoOffsetReset val settings = ConsumerSettings.default("my_group").bootstrapServers("localhost:9092") .autoOffsetReset(AutoOffsetReset.Earliest) val topic = "my_topic" val source = KafkaFlow.subscribe(settings, topic) .runForeach { (msg: ReceivedMessage[String, String]) => ??? } ``` -------------------------------- ### Control Flow Helpers Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/utils/control-flow.md A collection of inline helper methods for managing control flow in concurrent operations. These methods impose no runtime overhead. ```APIDOC forever { ... } Repeatedly evaluates the given code block forever. Parameters: - A code block to evaluate. ``` ```APIDOC repeatWhile { ... } Repeatedly evaluates the given code block, as long as it returns `true`. Parameters: - A code block that returns a boolean. ``` ```APIDOC repeatUntil { ... } Repeatedly evaluates the given code block, until it returns `true`. Parameters: - A code block that returns a boolean. ``` ```APIDOC never Blocks the current thread indefinitely, until it is interrupted. Parameters: None. ``` ```APIDOC checkInterrupt() Checks if the current thread is interrupted, and if so, throws an `InterruptedException`. Useful in compute-intensive code that wants to cooperate in the cancellation protocol. Parameters: None. Returns: Throws `InterruptedException` if interrupted, otherwise returns normally. ``` -------------------------------- ### Basic Cron Schedule Usage Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/integrations/cron4s.md Demonstrates the fundamental usage of `CronSchedule` by creating a schedule from a cron expression string and applying it to a `repeat` operation. ```scala import ox.scheduling.cron.* import cron4s.* repeat(CronSchedule.unsafeFromString("10-35 2,4,6 * ? * *"))(operation) ``` -------------------------------- ### Basic Retry API Usage Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/scheduling/retries.md Demonstrates the fundamental way to use the retry mechanism by providing a schedule and the operation to be executed. The operation can be a by-name parameter, an Either, or a computation within an F context. ```scala import ox.resilience.retry retry(schedule)(operation) ``` -------------------------------- ### Error Handling in Structured Concurrency Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/tour.md Shows how errors thrown within `fork` blocks inside a `supervised` scope are handled. The scope will fail with the first exception encountered, and other running computations are interrupted. ```scala import scala.concurrent.duration._ import ox._ supervised { fork { sleep(1.second) println("Hello!") } fork { sleep(500.millis) throw new RuntimeException("boom!") } } // This block will throw a RuntimeException("boom!") ``` -------------------------------- ### Create Flow from Channel Source in Scala Source: https://github.com/softwaremill/ox/blob/master/generated-doc/out/streaming/flows.md Shows how to create a flow from an `ox.channels.Channel`. Elements are sent to the channel concurrently, and then consumed by the flow. ```scala import ox.channels.Channel import ox.flow.Flow import ox.{fork, supervised} val ch = Channel.bufferedDefault[Int] supervised: fork: ch.send(1) ch.send(15) ch.send(-2) ch.done() Flow.fromSource(ch) // TODO: transform the flow further & run ```