### Setting Up and Running Periodic Task Example with Concurrent-Ruby Promises (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Demonstrates how to set up and run the periodic task example. It creates a `Concurrent::Cancellation` object, starts the `repeating_scheduled_task` within a future, waits briefly, resolves the cancellation origin to stop the task, and retrieves the final result. ```ruby cancellation, origin = Concurrent::Cancellation.new result = Concurrent::Promises.future(0.1, cancellation, task, &repeating_scheduled_task).run sleep 0.03 origin.resolve result.result ``` -------------------------------- ### Implementing Backpressure with Channels Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Provides an example of using a channel to coordinate producers and consumers, demonstrating how producers automatically slow down when the channel is full, creating a backpressure mechanism. ```ruby channel = Concurrent::Promises::Channel.new 2 log = Concurrent::Array.new producers = Array.new 2 do |i| Thread.new(i) do |i| 4.times do |j| log.push format "producer %d pushing %d", i, j channel.push [i, j] end end end consumers = Array.new 4 do |i| Thread.new(i) do |consumer| 2.times do |j| from, message = channel.pop log.push format "consumer %d got %d. payload %d from producer %d", consumer, j, message, from do_stuff end end end # wait for all to finish producers.map(&:join) consumers.map(&:join) ``` -------------------------------- ### Install Gems from Gemfile Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md Provides the command-line instruction to install gems listed in the Gemfile using Bundler. ```Shell bundle install ``` -------------------------------- ### Installing Concurrent Ruby Edge Gem (Shell) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md Provides the shell command to install the `concurrent-ruby-edge` gem directly from the command line. This gem contains edge features separate from the core gem. ```shell gem install concurrent-ruby-edge ``` -------------------------------- ### Install Concurrent Ruby Gem (Command Line) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md Provides the command-line instruction to install the concurrent-ruby gem using the gem command. ```Shell gem install concurrent-ruby ``` -------------------------------- ### Installing Concurrent Ruby C Extensions Gem (Shell) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md Provides the shell command to install the `concurrent-ruby-ext` gem, which contains optional C extensions for potential performance improvements under MRI. ```shell gem install concurrent-ruby-ext ``` -------------------------------- ### Selecting Over Multiple Channels Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Shows how to use `select` or `select_op` to wait for the first message available across a collection of channels, returning the channel and the message. ```ruby ch1 = Concurrent::Promises::Channel.new 2 ch2 = Concurrent::Promises::Channel.new 2 ch1.push 1 ch2.push 2 Concurrent::Promises::Channel.select([ch1, ch2]) ch1.select(ch2) Concurrent::Promises.future { 3 + 4 }.then_channel_push(ch1) Concurrent::Promises::Channel. # or `ch1.select_op(ch2)` would be equivalent select_op([ch1, ch2]). then('got number %03d from ch%d') { |(channel, value), format| format format, value, [ch1, ch2].index(channel).succ }.value! ``` -------------------------------- ### Executing Promise Operations and Mixing Calls Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Demonstrates resolving promise-based pop and push operations and shows that promise-based (`_op`) and blocking calls can be freely mixed on the same channel. ```ruby ch.pop_op.value! push_operations.map(&:value!) pop_operations = Array.new(3) { |i| ch.pop_op } ch.push message: 3 # (push|pop) can be freely mixed with (push_o|pop_op) pop_operations.map(&:value) ``` -------------------------------- ### Using Non-Blocking Try Variants Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Illustrates the use of `try_push` and `try_pop` methods, which attempt the operation immediately and return success or failure without blocking. ```ruby ch ch.try_push 1 ch.try_push 2 ch.try_push 3 ch.try_pop ch.try_pop ch.try_pop ``` -------------------------------- ### Creating Channel and Push Operations as Promises Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Initializes a channel and performs push operations using `push_op`, which returns Promises (Futures) instead of blocking. Demonstrates how operations can be represented asynchronously. ```Ruby ch = Concurrent::Promises::Channel.new 2 # => # push_operations = Array.new(3) { |i| ch.push_op message: i } # => [#>, # #>, # #] ``` -------------------------------- ### Selecting Over Multiple Channels Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Demonstrates how to use `select` and `select_op` to wait for the first available message from a collection of channels. Shows how to integrate futures that push results into channels and process the selected message asynchronously. ```Ruby ch1 = Concurrent::Promises::Channel.new 2 # => # ch2 = Concurrent::Promises::Channel.new 2 # => # ch1.push 1 # => # ch2.push 2 # => # Concurrent::Promises::Channel.select([ch1, ch2]) # => [#, 1] ch1.select(ch2) # => [#, 2] Concurrent::Promises.future { 3 + 4 }.then_channel_push(ch1) # => # Concurrent::Promises::Channel. # or `ch1.select_op(ch2)` would be equivalent select_op([ch1, ch2]). then('got number %03d from ch%d') { |(channel, value), format| format format, value, [ch1, ch2].index(channel).succ }.value! # => "got number 007 from ch1" ``` -------------------------------- ### Creating Concurrent-Ruby Channel Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Initializes a new Concurrent::Promises::Channel with a specified capacity. This channel can hold up to 2 messages before blocking producers. ```ruby ch = Concurrent::Promises::Channel.new 2 ``` -------------------------------- ### Synchronizing Ruby Workers with Zero-Capacity Channel Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md This example illustrates synchronization using a zero-capacity `Concurrent::Promises::Channel`. A zero-capacity channel requires a matching `push` and `pop` operation to occur simultaneously for either to complete. The snippet shows how a `push` blocks until a `pop` is initiated in another thread, and how `push_op` returns a future that resolves when a matching `pop` occurs. ```ruby channel = Concurrent::Promises::Channel.new 0 # => # thread = Thread.new { channel.pop }; sleep 0.01 # allow the thread to go to sleep thread # => # # succeeds because there is matching pop operation waiting in the thread channel.try_push(:v1) # => true # remains pending, since there is no matching operation push = channel.push_op(:v2) # => # thread.value # => :v1 # the push operation resolves as a pairing pop is called channel.pop # => :v2 push # => #> ``` -------------------------------- ### Explicitly Using Throttled Executor with future_on (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/throttle.out.md Provides an alternative, more explicit way to achieve the same throttling as the previous example by obtaining the throttled executor using `throttle.on` and passing it to `Concurrent::Promises.future_on`. ```ruby # ... Array.new(10) do |i| # create throttled future Concurrent::Promises.future_on(throttle.on(Concurrent::Promises.default_executor)) do # ... end end.map(&:value!) ``` -------------------------------- ### Creating a Concurrent Ruby Channel Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Initializes a new channel with a specified capacity using Concurrent Ruby's Promises::Channel. This channel can hold up to the given number of messages before blocking push operations. ```Ruby ch = Concurrent::Promises::Channel.new 2 # => # ``` -------------------------------- ### Using Blocking Operations with Timeouts Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Demonstrates how to add a timeout to blocking `push` and `pop` operations, causing them to return failure if the operation cannot complete within the specified time. ```ruby ch ch.push 1, 0.01 ch.push 2, 0.01 ch.push 3, 0.01 ch.pop 0.01 ch.pop 0.01 ch.pop 0.01 ``` -------------------------------- ### Creating Channel and Push Operations with Promises Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Initializes a channel and creates push operations using the `push_op` method, which returns promises. These operations may resolve immediately if capacity is available without requiring threads. ```ruby ch = Concurrent::Promises::Channel.new 2 push_operations = Array.new(3) { |i| ch.push_op message: i } ``` -------------------------------- ### Accessing Channel Object (Context for try_ variants) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Shows the channel object, providing context for the subsequent discussion about non-blocking `try_` variants of channel operations like `try_pop`, `try_push`, and `try_select`. ```Ruby ch ``` -------------------------------- ### Pushing Messages to Full Channel (Blocking) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Creates multiple threads attempting to push messages into a channel with limited capacity. Demonstrates how threads block when the channel is full. ```ruby threads = Array.new(3) { |i| Thread.new { ch.push message: i } } sleep 0.01 # let the threads run threads ``` -------------------------------- ### Run Task with Timeout Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/cancellation.in.md Shows how to use `Concurrent::Cancellation` to implement a timeout. An origin is scheduled to resolve after a specific duration, which then cancels the task. It also demonstrates the `Concurrent::Cancellation.timeout` helper. ```ruby timeout = Concurrent::Cancellation.new Concurrent::Promises.schedule(0.02) # or using shortcut helper method timeout = Concurrent::Cancellation.timeout 0.02 count = Concurrent::AtomicFixnum.new Concurrent.global_io_executor.post(timeout) do |timeout| # do stuff until cancelled count.increment until timeout.canceled? end # timeout.origin.wait count.value ``` -------------------------------- ### Popping Messages from Channel (Blocking) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Creates multiple threads attempting to pop messages from a channel. Shows how threads block if the channel is empty until a new message is pushed. ```ruby threads = Array.new(3) { |i| Thread.new { ch.pop } } sleep 0.01 # let the threads run threads ch.push message: 3 threads.map(&:value) ``` -------------------------------- ### Inspect Log Variable Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md A simple command or expression to inspect the contents of the 'log' variable, typically after concurrent operations have completed, to review the sequence of events. ```text log ``` -------------------------------- ### Adding Concurrent Ruby C Extensions Gem to Gemfile (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md Shows how to add the `concurrent-ruby-ext` gem to a Gemfile. This gem provides optional C extensions for performance. After adding, `bundle install` must be run. ```ruby gem 'concurrent-ruby-ext' ``` -------------------------------- ### Pushing Messages to a Full Channel with Threads (Blocking) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Creates multiple threads to push messages into a channel. Demonstrates how threads attempting to push to a full channel will block and enter a sleeping state until capacity is available. ```Ruby threads = Array.new(3) { |i| Thread.new { ch.push message: i } } sleep 0.01 # let the threads run threads # => [#, # #, # #] ``` -------------------------------- ### Spawn Simple Async Actor - Concurrent Ruby Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.out.md Demonstrates spawning a basic `Concurrent::ErlangActor` of type `:on_thread` for a simple asynchronous task. Shows how to retrieve the final value after termination. ```Ruby actor = Concurrent::ErlangActor.spawn(type: :on_thread, name: 'addition') { 1 + 1 } # => # actor.terminated.value! # => 2 ``` -------------------------------- ### Adding Concurrent Ruby Edge Gem to Gemfile (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md Shows how to add the `concurrent-ruby-edge` gem to a Gemfile, specifying the required file name. After adding, `bundle install` must be run. ```ruby gem 'concurrent-ruby-edge', require: 'concurrent-edge' ``` -------------------------------- ### Combine Cancellations (AND Logic) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/cancellation.in.md Shows how to manually combine cancellation origins using the bitwise AND operator (`&`) to create a new cancellation that is only triggered when *both* original cancellations have been triggered (AND logic). ```ruby cancellation_a, origin_a = Concurrent::Cancellation.new cancellation_b, origin_b = Concurrent::Cancellation.new # cancels only when both a and b is cancelled combined_cancellation = Concurrent::Cancellation.new origin_a & origin_b origin_a.resolve cancellation_a.canceled? #=> true cancellation_b.canceled? #=> false combined_cancellation.canceled? #=> false origin_b.resolve combined_cancellation.canceled? #=> true ``` -------------------------------- ### Erlang-like Linking and Trapping Exits Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.in.md Shows how to create a linked process using spawn(link: true) and how to trap exit signals from linked processes using trap. The actor receives the exit signal instead of terminating immediately. ```ruby actor = Concurrent::ErlangActor.spawn(type: :on_thread) do spawn(link: true) do # equivalent of spawn_link in Erlang terminate :err # equivalent of exit in Erlang end trap # equivalent of process_flag(trap_exit, true) receive end actor.terminated.value! ``` -------------------------------- ### Producer-Consumer with Buffered Channel (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Demonstrates a producer-consumer pattern using Concurrent::Promises::Channel with a capacity of 2. Producers and consumers are implemented as recursive promise chains running on a thread pool. It illustrates how producers are back-pressured when the channel buffer is full. ```ruby channel = Concurrent::Promises::Channel.new 2 log = Concurrent::Array.new def produce(channel, log, producer, i) log.push format "producer %d pushing %d", producer, i channel.push_op([producer, i]).then do i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done end end def consume(channel, log, consumer, i) channel.pop_op.then(consumer, i) do |(from, message), consumer, i| log.push format "consumer %d got %d. payload %d from producer %d", consumer, i, message, from do_stuff i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done end end producers = Array.new 2 do |i| Concurrent::Promises.future(channel, log, i) { |*args| produce *args, 0 }.run end consumers = Array.new 4 do |i| Concurrent::Promises.future(channel, log, i) { |*args| consume *args, 0 }.run end # wait for all to finish producers.map(&:value!) consumers.map(&:value!) # investigate log log ``` -------------------------------- ### Using try_push and try_pop with Concurrent-Ruby Channel Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Demonstrates the non-blocking `try_push` and `try_pop` methods on a `Concurrent::Promises::Channel`. These methods return `true`/`false` or the popped value/`nil` immediately without blocking, indicating success or failure due to channel capacity. ```ruby ch.try_push 1 ch.try_push 2 ch.try_push 3 ch.try_pop ch.try_pop ch.try_pop ``` -------------------------------- ### Interacting with ErlangActor using tell and ask Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.in.md Demonstrates sending messages to an actor using `tell` for asynchronous communication and `ask` for synchronous communication that waits for a reply. Also shows how to send a termination message and wait for the actor to finish. ```Ruby # tell returns immediately returning the actor actor.tell(1).tell(1) # blocks, waiting for the answer actor.ask 10 # stop the actor actor.tell :done # The final value of the actor actor.terminated.value! ``` -------------------------------- ### Running Producer with Backpressure Actor Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Starts the producer process as a future and waits for its completion. This demonstrates how the producer interacts with the slow counter actor (initialized in previous steps) and how the backpressure mechanism works due to the actor's limited mailbox. ```ruby Concurrent::Promises.future(actor, 0, &produce).run.wait! actor.termination.value! ``` -------------------------------- ### Execute Task on Cancellation Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/cancellation.in.md Illustrates how to attach a block of code to the cancellation origin using `chain`. This block is executed immediately after the cancellation is triggered, allowing for cleanup, logging, or planning follow-up actions. ```ruby cancellation.origin.chain do # This block is executed after the Cancellation is cancelled # It can then log cancellation or e.g. plan new re-execution end ``` -------------------------------- ### Implementing Backpressure with Concurrent-Ruby Channel and Threads Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Shows how to use a `Concurrent::Promises::Channel` with a limited capacity to implement backpressure between producers and consumers using standard Ruby `Thread` objects. Producers will block when the channel is full, slowing down to match consumer speed. ```ruby channel = Concurrent::Promises::Channel.new 2 log = Concurrent::Array.new producers = Array.new 2 do |i| Thread.new(i) do |i| 4.times do |j| log.push format "producer %d pushing %d", i, j channel.push [i, j] end end end consumers = Array.new 4 do |i| Thread.new(i) do |consumer| 2.times do |j| from, message = channel.pop log.push format "consumer %d got %d. payload %d from producer %d", consumer, j, message, from do_stuff end end end # wait for all to finish producers.map(&:join) consumers.map(&:join) # investigate log log ``` -------------------------------- ### Implementing Backpressure with Concurrent-Ruby Channel and Promises Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Demonstrates an alternative backpressure implementation using `Concurrent::Promises::Channel` and `Concurrent::Promises` instead of threads. Producers and consumers are represented by promises that recursively push/pop, allowing thread allocation to be managed by a thread pool. ```ruby channel = Concurrent::Promises::Channel.new 2 log = Concurrent::Array.new def produce(channel, log, producer, i) log.push format "producer %d pushing %d", producer, i channel.push_op([producer, i]).then do i + 1 < 4 ? produce(channel, log, producer, i + 1) : :done end end def consume(channel, log, consumer, i) channel.pop_op.then(consumer, i) do |(from, message), consumer, i| log.push format "consumer %d got %d. payload %d from producer %d", consumer, i, message, from do_stuff i + 1 < 2 ? consume(channel, log, consumer, i + 1) : :done end end producers = Array.new 2 do |i| Concurrent::Promises.future(channel, log, i) { |*args| produce *args, 0 }.run end consumers = Array.new 4 do |i| Concurrent::Promises.future(channel, log, i) { |*args| consume *args, 0 }.run end ``` -------------------------------- ### Requiring Concurrent Ruby Gem (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/README.md Demonstrates the simple Ruby code needed to require the `concurrent-ruby` gem. The gem automatically detects and loads the C extensions if the `concurrent-ruby-ext` gem is installed. ```ruby require 'concurrent' ``` -------------------------------- ### Simple ErlangActor Receive/Reply (on_thread) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.in.md Creates a `:on_thread` actor that uses `receive` to block until a message arrives, then immediately `reply`es with the received message. Demonstrates the blocking nature of `receive` in this type. ```Ruby ping = Concurrent::ErlangActor.spawn(type: :on_thread) { reply receive } ping.ask 42 ``` -------------------------------- ### Message Pattern Matching with `on` and `:keep` (Pool) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.in.md Shows how the :keep option can simplify pool-based actors by retaining the receive rules after processing a message, eliminating the need for explicit recursive calls to body. ```ruby actor = Concurrent::ErlangActor.spawn(type: :on_pool) do receive(on(Symbol) { |s| reply s.to_s }, on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ }, # put last works as else on(ANY) do |v| reply :bad_message terminate [:bad_message, v] end, keep: true) end actor.ask 1 actor.ask 2 actor.ask :value # this malformed message will terminate the actor actor.ask -1 # the actor is no longer alive, so ask fails actor.ask "junk" rescue $! actor.terminated.result ``` -------------------------------- ### Synchronization with Zero-Capacity Channel (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Illustrates how a Concurrent::Promises::Channel with a capacity of 0 can be used for direct synchronization. A push operation will only succeed when a corresponding pop operation is waiting, and vice-versa, effectively pairing operations. ```ruby channel = Concurrent::Promises::Channel.new 0 thread = Thread.new { channel.pop }; sleep 0.01 # # allow the thread to go to sleep thread # succeeds because there is matching pop operation waiting in the thread channel.try_push(:v1) # remains pending, since there is no matching operation push = channel.push_op(:v2) thread.value # the push operation resolves as a pairing pop is called channel.pop push ``` -------------------------------- ### Parallel Tasks with Shared Cancellation (No Failure) - Concurrent Ruby Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/cancellation.out.md Similar to the previous example, this demonstrates parallel tasks sharing a `Concurrent::Cancellation`. Without the simulated random error, the tasks would continue until explicitly cancelled or completed, illustrating the basic setup. ```ruby cancellation, origin = Concurrent::Cancellation.new # => # tasks = 4.times.map do |i| Concurrent::Promises.future(cancellation, origin, i) do |cancellation, origin, i| count = 0 100.times do break count = :cancelled if cancellation.canceled? count += 1 sleep 0.001 # if rand > 0.95 # origin.resolve # raise 'random error' # end count end end end # => [#, # #, # #, # #] Concurrent::Promises.zip(*tasks).result ``` -------------------------------- ### Including/Extending Concurrent-Ruby Promises Factory Methods (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Illustrates two ways to make the factory methods available: including the module in a class definition to use them as instance methods, or extending the module into another module to use them as module methods. Both examples show creating a `resolvable_event`. ```ruby Class.new do include Concurrent::Promises::FactoryMethods def a_method resolvable_event end end.new.a_method # => # mod = Module.new do extend Concurrent::Promises::FactoryMethods end mod.resolvable_event # => # ``` -------------------------------- ### Flattening Nested Concurrent Ruby Promises with flat Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Introduces the `flat` method as the correct way to wait for an inner future within a promise chain without blocking threads. The example shows a future that returns another future, which is then flattened to get the final result. ```ruby Concurrent::Promises.future { Concurrent::Promises.future { 1+1 } }.flat.value! ``` -------------------------------- ### Message Pattern Matching with `on` (Thread) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.in.md Shows how to use receive with multiple on clauses to dispatch messages based on type and conditions. Includes handling symbols, positive numbers, and a fallback for any other message type, terminating the actor on bad messages. ```ruby actor = Concurrent::ErlangActor.spawn(type: :on_thread) do while true receive(on(Symbol) { |s| reply s.to_s }, on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ }, # put last works as else on(ANY) do |v| reply :bad_message terminate [:bad_message, v] end) end end actor.ask 1 actor.ask 2 actor.ask :value # this malformed message will terminate the actor actor.ask -1 # the actor is no longer alive, so ask fails actor.ask "junk" rescue $! actor.terminated.result ``` -------------------------------- ### Resolving Promise-Based Channel Operations Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Resolves pending promise-based pop and push operations using `value!`. Shows how mixing blocking (`push`) and non-blocking (`pop_op`) operations works and how pending pop operations are resolved when a message is pushed. ```Ruby ch.pop_op.value! # => {:message=>0} push_operations.map(&:value!) # => [#, # #, # #] pop_operations = Array.new(3) { |i| ch.pop_op } # => [#1}>, # #2}>, # #] ch.push message: 3 # (push|pop) can be freely mixed with (push_o|pop_op) pop_operations.map(&:value) # => [{:message=>1}, {:message=>2}, {:message=>3}] ``` -------------------------------- ### Run Async Task Until Cancelled Gracefully - Concurrent Ruby Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/cancellation.out.md Demonstrates how to create a cancellable asynchronous task using `Concurrent::Cancellation`. The task runs in a future until the `origin` is resolved, allowing for graceful shutdown by checking `cancellation.canceled?`. ```ruby cancellation, origin = Concurrent::Cancellation.new # => # # - origin is used for cancelling, resolve it to cancel # - cancellation is passed down to tasks for cooperative cancellation async_task = Concurrent::Promises.future(cancellation) do |cancellation| # Do work repeatedly until it is cancelled do_stuff until cancellation.canceled? :stopped_gracefully end # => # sleep 0.01 # => 0 # Wait a bit then stop the thread by resolving the origin of the cancellation origin.resolve # => # async_task.value! # => :stopped_gracefully ``` -------------------------------- ### Creating and Using the Computer Actor in Concurrent Ruby Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Demonstrates how to spawn the `Computer` actor, send it multiple `:run` messages with job lambdas using `ask`, which returns futures. It then shows how to check the actor's status using `ask(:status)` and retrieve the results of the jobs by calling `value!` on the returned futures. ```ruby computer = Concurrent::Actor.spawn Computer, :computer results = 3.times.map { computer.ask [:run, -> { sleep 0.01; :result }] } computer.ask(:status).value! results.map(&:value!) ``` -------------------------------- ### Send Messages (Tell/Ask) to Actor - Concurrent Ruby Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.out.md Shows how to interact with an actor using `tell` for asynchronous message sending (returns immediately) and `ask` for synchronous message sending (blocks until a reply is received). Also demonstrates sending a `:done` message to terminate the actor and retrieving its final value. ```Ruby # tell returns immediately returning the actor actor.tell(1).tell(1) # => # # blocks, waiting for the answer actor.ask 10 # => 12 # stop the actor actor.tell :done # => # # The final value of the actor actor.terminated.value! # => 12 ``` -------------------------------- ### Handling Rejection in Zipped Concurrent Ruby Promises Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Demonstrates how `Concurrent::Promises.zip` behaves when one of the zipped futures is rejected. The resulting zipped future is also rejected. The example then shows how to use `rescue` on the rejected zipped future to extract and process the rejection reason. ```ruby rejected_zip = Concurrent::Promises.zip( Concurrent::Promises.fulfilled_future(1), Concurrent::Promises.rejected_future(StandardError.new('Ups'))) ``` ```ruby rejected_zip.result ``` ```ruby rejected_zip. rescue { |reason1, reason2| (reason1 || reason2).message }. value ``` -------------------------------- ### Use Pool-Style Receive Body on Thread Actor Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.out.md This example illustrates that the block syntax used for `receive` on pool-based actors also works correctly for thread-based actors. It demonstrates processing only `Numeric` messages and incrementing the value. ```ruby Concurrent::ErlangActor. spawn(type: :on_thread) { receive(Numeric) { |v| v.succ } }. tell('junk'). # ignored message tell(42). terminated.value! # => 43 ``` -------------------------------- ### Demonstrate ErlangActor Linking and Exit Trapping Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.out.md This snippet shows how to use `spawn(link: true)` to create a linked process and `trap` to enable the actor to receive exit signals as messages instead of terminating immediately. The linked process terminates, sending an exit signal caught by the trapping actor. ```ruby actor = Concurrent::ErlangActor.spawn(type: :on_thread) do spawn(link: true) do # equivalent of spawn_link in Erlang terminate :err # equivalent of exit in Erlang end trap # equivalent of process_flag(trap_exit, true) receive end # => # actor.terminated.value! # => #, ``` -------------------------------- ### Parallel Processing with Shared Cancellation (Failing Task) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/cancellation.in.md Demonstrates running multiple parallel tasks that share a single cancellation origin. If any task triggers the origin (e.g., due to an error), all other tasks are cancelled. This example includes a random error that causes cancellation. ```ruby cancellation, origin = Concurrent::Cancellation.new tasks = 4.times.map do |i| Concurrent::Promises.future(cancellation, origin, i) do |cancellation, origin, i| count = 0 100.times do break count = :cancelled if cancellation.canceled? count += 1 sleep 0.001 if rand > 0.95 origin.resolve # cancel raise 'random error' end count end end end Concurrent::Promises.zip(*tasks).result ``` -------------------------------- ### Dispatch ErlangActor Messages with Keep Option on Pool Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.out.md This example uses the `keep: true` option with `receive` on a pool-based actor. This option retains the defined receive rules until a new `receive` call replaces them, simplifying the behaviour definition by avoiding explicit recursive calls to `body`. ```ruby actor = Concurrent::ErlangActor.spawn(type: :on_pool) do receive(on(Symbol) { |s| reply s.to_s }, on(And[Numeric, -> v { v >= 0 }]) { |v| reply v.succ }, # put last works as else on(ANY) do |v| reply :bad_message terminate [:bad_message, v] end, keep: true) end # => # actor.ask 1 # => 2 actor.ask 2 # => 3 actor.ask :value # => "value" # this malformed message will terminate the actor actor.ask -1 # => :bad_message # the actor is no longer alive, so ask fails actor.ask "junk" rescue $! # => #> actor.terminated.result # => [false, nil, [:bad_message, -1]] ``` -------------------------------- ### Using Throttled Executor with Agents (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/throttle.out.md Demonstrates that the throttled executor obtained from `Concurrent::Throttle` can be used with other concurrent abstractions like `Concurrent::Agent` via the `send_via` method, ensuring that agent updates are also subject to the throttle's capacity limit. ```ruby concurrency_level = Concurrent::AtomicFixnum.new futures = Array.new(5) do |i| # create throttled future throttle.future(i) do |arg| monitor_concurrency_level(concurrency_level) { do_stuff arg } # fulfill with the observed concurrency level end end agents = Array.new(5) do |i| agent = Concurrent::Agent.new 0 # execute agent update on throttled executor agent.send_via(throttle.on(:io)) { monitor_concurrency_level(concurrency_level_throttled) { do_stuff } } agent end futures.map(&:value!) agents.each { |a| a.await }.map(&:value) ``` -------------------------------- ### Using push and pop with Timeout in Concurrent-Ruby Channel Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md Illustrates the use of `push` and `pop` methods with a timeout option on a `Concurrent::Promises::Channel`. These operations will block up to the specified timeout, returning `true`/`false` for push or the value/`nil` for pop, indicating success or timeout. ```ruby ch.push 1, 0.01 ch.push 2, 0.01 ch.push 3, 0.01 ch.pop 0.01 ch.pop 0.01 ch.pop 0.01 ``` -------------------------------- ### Limit ErlangActor Message Type on Pool Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.out.md This snippet shows how to limit the type of messages an `Concurrent::ErlangActor` processes when running on a thread pool. Similar to the thread example, only messages matching the specified type (`Numeric`) are processed. The block syntax `{ |v| v.succ }` is required for `receive` on a pool. ```ruby Concurrent::ErlangActor. spawn(type: :on_pool) { receive(Numeric) { |v| v.succ } }. tell('junk'). # ignored message tell(42). terminated.value! # => 43 ``` -------------------------------- ### Listing Concurrent-Ruby Promises Factory Methods (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Demonstrates how to list the public instance methods available within the `Concurrent::Promises::FactoryMethods` module using `instance_methods(false)` and sorting them. This reveals the various factory methods for creating Promises objects like Futures and Events. ```ruby Concurrent::Promises::FactoryMethods.instance_methods(false).sort # => [:any, # :any_event, # :any_event_on, # :any_fulfilled_future, # :any_fulfilled_future_on, # :any_resolved_future, # :any_resolved_future_on, # :delay, # :delay_on, # :fulfilled_future, # :future, # :future_on, # :make_future, # :rejected_future, # :resolvable_event, # :resolvable_event_on, # :resolvable_future, # :resolvable_future_on, # :resolved_event, # :resolved_future, # :schedule, # :schedule_on, # :zip, # :zip_events, # :zip_events_on, # :zip_futures, # :zip_futures_on, # :zip_futures_over, # :zip_futures_over_on] ``` -------------------------------- ### Implementing Backpressure with Concurrent Ruby Channel and Throttle Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Sets up a producer-consumer pattern using `Concurrent::Promises::Channel` as a buffer and `Concurrent::Throttle` to limit consumer processing speed. Defines `query_random_text` to simulate querying an API and pushing results to the channel, applying backpressure by waiting for the push to complete. Defines `count_words_in_random_text` to consume from the channel, process the data (slowly), and store results, limited by the throttle. Starts multiple query and word counter processes. ```ruby require 'json' channel = Concurrent::Promises::Channel.new 6 cancellation, origin = Concurrent::Cancellation.new def query_random_text(cancellation, channel) Concurrent::Promises.future do # for simplicity the query is omitted # url = 'some api' # Net::HTTP.get(URI(url)) sleep 0.01 { 'message' => 'Lorem ipsum rhoncus scelerisque vulputate diam inceptos' }.to_json end.then_flat_event(cancellation) do |value, cancellation| # The push to channel is fulfilled only after the message is successfully # published to the channel, therefore it will not continue querying until # current message is pushed. cancellation.origin | channel.push_op(value) # It could wait on the push indefinitely if the token is not checked # here with `or` (the pipe). end.then(cancellation) do |cancellation| # query again after the message is pushed to buffer query_random_text(cancellation, channel) unless cancellation.canceled? end end words = [] words_throttle = Concurrent::Throttle.new 1 def count_words_in_random_text(cancellation, channel, words, words_throttle) channel.pop_op.then do |response| string = JSON.load(response)['message'] # processing is slower than querying sleep 0.02 words_count = string.scan(/\w+/).size end.then_on(words_throttle.on(:io), words) do |words_count, words| # safe since throttled to only 1 task at a time words << words_count end.then_on(:io, cancellation) do |_, cancellation| # count words in next message unless cancellation.canceled? count_words_in_random_text(cancellation, channel, words, words_throttle) end end end query_processes = 3.times.map do Concurrent::Promises.future(cancellation, channel, &method(:query_random_text)).run end word_counter_processes = 2.times.map do Concurrent::Promises.future(cancellation, channel, words, words_throttle, &method(:count_words_in_random_text)).run end sleep 0.05 ``` -------------------------------- ### List Factory Methods (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.in.md Demonstrates how to list the instance methods available in the `Concurrent::Promises::FactoryMethods` module, excluding inherited methods. ```Ruby Concurrent::Promises::FactoryMethods.instance_methods(false).sort ``` -------------------------------- ### Helper Method to Inspect Object Methods (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.out.md Defines a simple helper method `inspect_methods` that takes a list of method names and an object, then returns a hash mapping each method name to the result of calling that method on the object. This is used in subsequent examples to show object states. ```ruby def inspect_methods(*methods, of:) methods.reduce({}) { |h, m| h.update m => of.send(m) } end ``` -------------------------------- ### Use Promises Convenience Module (Ruby) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.in.md Shows how factory methods are directly available on the `Concurrent::Promises` module itself for convenience, as it already extends `FactoryMethods`. ```Ruby Concurrent::Promises.resolvable_event ``` -------------------------------- ### Popping Message to Unblock Push Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.in.md Removes a message from the channel, freeing up space and allowing a previously blocked producer thread to complete its push operation. ```ruby ch.pop threads.map(&:join) ``` -------------------------------- ### Waiting for Concurrent Ruby Futures and Inspecting Log Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/channel.out.md This snippet demonstrates how to wait for the completion of multiple concurrent operations represented by `Future` objects using `value!`. It then shows how to inspect a log variable, which contains messages generated by the producers and consumers during their execution, illustrating the interleaved nature of concurrent processing. ```ruby producers.map(&:value!) # => [:done, :done] consumers.map(&:value!) # => [:done, :done, :done, :done] # investigate log log # => ["producer 0 pushing 0", # "producer 1 pushing 0", # "producer 1 pushing 1", # "consumer 1 got 0. payload 0 from producer 1", # "consumer 2 got 0. payload 1 from producer 1", # "producer 0 pushing 1", # "producer 0 pushing 2", # "producer 0 pushing 3", # "producer 1 pushing 2", # "consumer 0 got 0. payload 0 from producer 0", # "consumer 3 got 0. payload 1 from producer 0", # "consumer 2 got 1. payload 2 from producer 0", # "consumer 1 got 1. payload 3 from producer 0", # "consumer 3 got 1. payload 3 from producer 1", # "consumer 0 got 1. payload 2 from producer 1"] ``` -------------------------------- ### Filter Messages by Type (Thread) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.in.md Creates an actor on a thread that only processes Numeric messages. It ignores the 'junk' message and processes 42, calling succ on it. ```ruby Concurrent::ErlangActor. spawn(type: :on_thread) { receive(Numeric).succ }. tell('junk'). # ignored message tell(42). terminated.value! ``` -------------------------------- ### Creating a Simple Dataflow Task in Ruby Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/dataflow.md Demonstrates the basic usage of `Concurrent::dataflow` to create a task that immediately runs as it has no dependencies. The task returns a `Future` value. ```ruby task = Concurrent::dataflow { 14 } ``` -------------------------------- ### Creating a Channel with Capacity in Concurrent-Ruby Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/promises.in.md Demonstrates how to create a channel with a specified capacity using `Concurrent::Promises::Channel.new`. ```Ruby ch1 = Concurrent::Promises::Channel.new 2 ``` -------------------------------- ### Simple Asynchronous ErlangActor (on_thread) Source: https://github.com/ruby-concurrency/concurrent-ruby/blob/master/docs-source/erlang_actor.in.md Creates a simple `Concurrent::ErlangActor` of type `:on_thread` to perform a single asynchronous calculation and waits for its completion using `terminated.value!`. ```Ruby actor = Concurrent::ErlangActor.spawn(type: :on_thread, name: 'addition') { 1 + 1 } actor.terminated.value! ```