### Build and Use an Inline Async Job Pipeline

Source: https://github.com/socketry/async-job/blob/main/guides/getting-started/readme.md

Create a simple inline job processing pipeline using Async::Job::Builder. This example uses an inline processor that executes jobs in the background via Async.

```ruby
require "async"
require "async/job"
require "async/job/processor/inline"

# This is how we execute a job from the queue:
executor = proc do |job|
  puts "Processing job: #{job}"
end

# Create a simple inline pipeline:
pipeline = Async::Job::Builder.build(executor) do
  # We are going to use an inline processor which processes the job in the background using Async{}
  enqueue Async::Job::Processor::Inline
end

# Enqueue a job:
Async do
  pipeline.call("My job")
  # Prints "Processing job: My job"
end
```

--------------------------------

### Create and Use an Inline Async Job Pipeline

Source: https://github.com/socketry/async-job/blob/main/context/getting-started.md

This example demonstrates how to create a simple inline job processing pipeline using Async::Job::Builder. It defines an executor for processing jobs and uses an inline processor that executes jobs in the background via Async.

```ruby
require "async"
require "async/job"
require "async/job/processor/inline"

# This is how we execute a job from the queue:
executor = proc do |job|
  puts "Processing job: #{job}"
end

# Create a simple inline pipeline:
pipeline = Async::Job::Builder.build(executor) do
  # We are going to use an inline processor which processes the job in the background using Async{}:
  enqueue Async::Job::Processor::Inline
end

# Enqueue a job:
Async do
  pipeline.call("My job")
  # Prints "Processing job: My job"
end
```

--------------------------------

### Buffer jobs with Async::Job::Processor::Aggregate

Source: https://context7.com/socketry/async-job/llms.txt

Use Aggregate to buffer jobs and flush them as a batch to an inner processor. Make sure to start the aggregate processor before enqueuing jobs and to stop it afterwards.
```ruby
require "async"
require "async/job"
require "async/job/processor/inline"
require "async/job/processor/aggregate"

executor = proc { |job| puts "Batch processing: #{job["id"]}" }

inner = Async::Job::Processor::Inline.new(executor)
aggregate = Async::Job::Processor::Aggregate.new(inner)

Async do
  aggregate.start

  # These are buffered and flushed as a batch
  10.times do |i|
    aggregate.call({"id" => "job-#{i}"})
  end

  aggregate.stop
end
```

--------------------------------

### Unified Client/Server Interface with Async::Job::Queue

Source: https://context7.com/socketry/async-job/llms.txt

The `Queue` object, returned by `Builder#build`, provides a unified interface for enqueuing jobs via `call` and managing the processing lifecycle with `start`/`stop`. Internal components such as `client` and `delegate` are accessible as attributes.

```ruby
require "async"
require "async/job"
require "async/job/processor/inline"

executor = proc { |job| puts "Got: #{job}" }

pipeline = Async::Job::Builder.build(executor) do
  enqueue Async::Job::Processor::Inline
end

# Access internals
puts pipeline.client.class   # => Async::Job::Processor::Inline
puts pipeline.delegate.class # => Proc

Async do
  pipeline.start         # Start the server side
  pipeline.call("job-1") # Enqueue via the client side
  pipeline.call("job-2")
  pipeline.stop          # Stop the server side
end
```

--------------------------------

### Build Inline Queue Pipeline

Source: https://github.com/socketry/async-job/blob/main/guides/inline-queue/readme.md

Use Async::Job::Builder to construct a pipeline that dequeues jobs using the Inline queue. This setup is suitable for background jobs within the same process.

```ruby
pipeline = Async::Job::Builder.build(buffer) do
  dequeue Async::Job::Queue::Inline
end
```

--------------------------------

### Rate-limit jobs with Async::Job::Processor::Delayed

Source: https://context7.com/socketry/async-job/llms.txt

Insert a configurable sleep before processing each job to manage throughput. The default delay is 0.1 seconds. Start and stop are delegated to the underlying delegate.
```ruby
require "async"
require "async/job/processor/delayed"

executor = proc { |job| puts "Processed: #{job}" }

# Add 500ms delay between each job
delayed = Async::Job::Processor::Delayed.new(executor, delay: 0.5)

Async do
  delayed.call("job-1") # Waits 0.5s, then processes
  delayed.call("job-2") # Waits 0.5s, then processes
  delayed.call("job-3") # Waits 0.5s, then processes
end
```

--------------------------------

### Async::Job::Queue

Source: https://context7.com/socketry/async-job/llms.txt

The unified client and server interface for a job processing pipeline. It is returned by `Builder#build` and provides methods to enqueue jobs (`call`) and manage the server lifecycle (`start`/`stop`). Internal components like the client, server, and delegate are also accessible.

```APIDOC
## Async::Job::Queue — Unified Client/Server Interface

`Queue` is the object returned by `Builder#build`. It exposes `call` to enqueue jobs via the client middleware chain, and `start`/`stop` to manage the server lifecycle. The `client`, `server`, and `delegate` attributes expose each internal component.

```ruby
require "async"
require "async/job"
require "async/job/processor/inline"

executor = proc { |job| puts "Got: #{job}" }

pipeline = Async::Job::Builder.build(executor) do
  enqueue Async::Job::Processor::Inline
end

# Access internals
puts pipeline.client.class   # => Async::Job::Processor::Inline
puts pipeline.delegate.class # => Proc

Async do
  pipeline.start         # Start the server side
  pipeline.call("job-1") # Enqueue via the client side
  pipeline.call("job-2")
  pipeline.stop          # Stop the server side
end
```
```

--------------------------------

### Async::Job::Buffer

Source: https://context7.com/socketry/async-job/llms.txt

An in-memory, thread-safe, and fiber-compatible job buffer backed by `Async::Queue`. Jobs are enqueued using `call` and dequeued with `pop`. It can optionally forward enqueued jobs to a delegate and supports lifecycle delegation via `start`/`stop`.
```APIDOC
## Async::Job::Buffer — In-Memory Job Buffer

`Buffer` is a thread-safe, fiber-compatible intermediate store backed by `Async::Queue`. Jobs are enqueued with `call` and dequeued with `pop`. It can forward enqueued jobs to an optional delegate and supports `start`/`stop` lifecycle delegation.

```ruby
require "async"
require "async/job"

buffer = Async::Job::Buffer.new

Async do
  # Producer: enqueue jobs
  buffer.call({"id" => "1", "task" => "send_email"})
  buffer.call({"id" => "2", "task" => "resize_image"})

  puts buffer.empty? # => false

  # Consumer: pop jobs
  job1 = buffer.pop
  puts job1["task"] # => "send_email"

  job2 = buffer.pop
  puts job2["task"] # => "resize_image"

  puts buffer.empty? # => true
end
```
```

--------------------------------

### Build a Job Processing Pipeline with Builder

Source: https://context7.com/socketry/async-job/llms.txt

Use `Builder.build` to create a job pipeline with middleware. The pipeline acts as both a client for submitting jobs and a server for processing them. Requires `async`, `async/job`, and a processor such as `Async::Job::Processor::Inline`.

```ruby
require "async"
require "async/job"
require "async/job/processor/inline"

# The executor: any callable that receives a job
executor = proc do |job|
  puts "Processing job: #{job.inspect}"
end

# Build a pipeline with an inline background processor
pipeline = Async::Job::Builder.build(executor) do
  enqueue Async::Job::Processor::Inline
end

Async do
  pipeline.call({"id" => "abc-123", "type" => "email", "to" => "user@example.com"})
  # => Processing job: {"id"=>"abc-123", "type"=>"email", "to"=>"user@example.com"}
end
```

--------------------------------

### Build an inline background processing pipeline

Source: https://context7.com/socketry/async-job/llms.txt

Construct a job pipeline using Async::Job::Builder with a custom executor and Async::Job::Processor::Inline for fire-and-forget execution. The processor's counters can then be used to monitor job completion and status.
```ruby
require "async"
require "async/job"
require "async/job/processor/inline"

# Define how jobs are executed
executor = proc do |job|
  case job["type"]
  when "email"
    puts "Sending email to #{job["to"]}"
  when "sms"
    puts "Sending SMS to #{job["phone"]}"
  else
    raise "Unknown job type: #{job["type"]}"
  end
end

# Build the pipeline
pipeline = Async::Job::Builder.build(executor) do
  enqueue Async::Job::Processor::Inline
end

Async do
  pipeline.call({"type" => "email", "to" => "alice@example.com"})
  pipeline.call({"type" => "sms", "phone" => "+1-555-0100"})
  pipeline.call({"type" => "email", "to" => "bob@example.com"})

  sleep 0.1 # Allow async tasks to complete

  puts pipeline.client.complete_count # => 3
  puts pipeline.client.status_string  # => "C=0/3 F=0"
end
```

--------------------------------

### Add async-job Gem to Project

Source: https://github.com/socketry/async-job/blob/main/context/getting-started.md

Use this command to add the async-job gem to your project's Gemfile.

```shell
$ bundle add async-job
```

--------------------------------

### Async::Job::Builder.build

Source: https://context7.com/socketry/async-job/llms.txt

Builds a complete job processing pipeline by wrapping an executor with middleware on both enqueue and dequeue sides. It returns an Async::Job::Queue instance that serves as both the client for submitting jobs and the server for managing processing.

```APIDOC
## Async::Job::Builder.build — Build a Job Processing Pipeline

`Builder.build` creates a complete job processing pipeline by wrapping a delegate executor with layered middleware on the enqueue (`enqueue`) and dequeue (`dequeue`) sides. It returns an `Async::Job::Queue` that acts as both the client (call to submit jobs) and the server (start/stop to manage processing).
```ruby
require "async"
require "async/job"
require "async/job/processor/inline"

# The executor: any callable that receives a job
executor = proc do |job|
  puts "Processing job: #{job.inspect}"
end

# Build a pipeline with an inline background processor
pipeline = Async::Job::Builder.build(executor) do
  enqueue Async::Job::Processor::Inline
end

Async do
  pipeline.call({"id" => "abc-123", "type" => "email", "to" => "user@example.com"})
  # => Processing job: {"id"=>"abc-123", "type"=>"email", "to"=>"user@example.com"}
end
```
```

--------------------------------

### Full Pipeline: Aggregate + Inline Processors

Source: https://context7.com/socketry/async-job/llms.txt

This pipeline uses an aggregate processor to buffer jobs on the enqueue side, so that enqueuing is non-blocking and low-latency, and an inline processor to execute them in background async tasks. It is intended for high-throughput scenarios where front-end latency is critical. Make sure 'async', 'async/job', 'async/job/processor/inline', and 'async/job/processor/aggregate' are required.

```ruby
require "async"
require "async/job"
require "async/job/processor/inline"
require "async/job/processor/aggregate"

executor = proc { |job| puts "Executed: #{job["id"]}" }

pipeline = Async::Job::Builder.build(executor) do
  enqueue Async::Job::Processor::Aggregate # Buffer on enqueue side (non-blocking)
  enqueue Async::Job::Processor::Inline    # Execute in background async task
end

Async do
  pipeline.start

  # These return immediately; the aggregate buffers them
  20.times { |i| pipeline.call({"id" => "job-#{i}"}) }

  sleep 0.5 # Jobs are flushed and executed in the background

  pipeline.stop
end
```

--------------------------------

### Inline Async Processor with Async::Job::Processor::Inline

Source: https://context7.com/socketry/async-job/llms.txt

The `Processor::Inline` executes jobs in background async tasks without external dependencies. It supports delayed execution via `scheduled_at` and tracks job statistics.
Errors are logged and do not halt the processor.

```ruby
require "time" # Provides Time#iso8601 on Ruby versions where it is not built in
require "async"
require "async/job/processor/inline"

executor = proc do |job|
  puts "[#{Time.now}] Processing: #{job["name"]}"
end

processor = Async::Job::Processor::Inline.new(executor)

Async do
  # Immediate job
  processor.call({"name" => "job-a"})

  # Scheduled job (runs 2 seconds from now)
  processor.call({"name" => "job-b", "scheduled_at" => (Time.now + 2).iso8601})

  sleep 3 # Wait for both to complete

  puts processor.complete_count # => 2
  puts processor.failed_count   # => 0
  puts processor.status_string  # => "C=0/2 F=0"
end
```

--------------------------------

### In-Memory Job Buffer with Async::Job::Buffer

Source: https://context7.com/socketry/async-job/llms.txt

Use `Async::Job::Buffer` as a thread-safe, fiber-compatible in-memory queue. Jobs are enqueued with `call` and dequeued with `pop`. It supports lifecycle delegation and can forward jobs to a delegate.

```ruby
require "async"
require "async/job"

buffer = Async::Job::Buffer.new

Async do
  # Producer: enqueue jobs
  buffer.call({"id" => "1", "task" => "send_email"})
  buffer.call({"id" => "2", "task" => "resize_image"})

  puts buffer.empty? # => false

  # Consumer: pop jobs
  job1 = buffer.pop
  puts job1["task"] # => "send_email"

  job2 = buffer.pop
  puts job2["task"] # => "resize_image"

  puts buffer.empty? # => true
end
```

--------------------------------

### Define a custom job class with Async::Job::Generic

Source: https://context7.com/socketry/async-job/llms.txt

Inherit from Async::Job::Generic to create custom job types. Implement `serialize` for payload encoding and `call` for job execution logic. The `enqueue` class method is a convenience wrapper.
```ruby
require "time" # Provides Time#iso8601 on Ruby versions where it is not built in
require "async/job/generic"

class EmailJob < Async::Job::Generic
  def initialize(id, to:, subject:, scheduled_at: nil)
    super(id, scheduled_at: scheduled_at)
    @to = to
    @subject = subject
  end

  # Encode the job as a plain hash suitable for a coder:
  def serialize
    {"id" => @id, "to" => @to, "subject" => @subject, "scheduled_at" => @scheduled_at&.iso8601}
  end

  # The work performed when the job is executed:
  def call
    puts "Sending email to #{@to}: #{@subject}"
  end
end

job = EmailJob.new("email-001", to: "user@example.com", subject: "Welcome!")
puts job.id        # => "email-001"
puts job.serialize # => {"id"=>"email-001", "to"=>"user@example.com", ...}
job.call           # => Sending email to user@example.com: Welcome!
```

--------------------------------

### Serialize jobs using Async::Job::Coder

Source: https://context7.com/socketry/async-job/llms.txt

Use the `Coder` module to serialize job payloads. It supports JSON (default), Marshal (binary, Ruby-only), and MessagePack (compact, cross-language). `Coder.Time` normalizes various time representations.

```ruby
require "async/job/coder"
require "async/job/coder/marshal"
require "async/job/coder/message_pack"

job_data = {"id" => "123", "type" => "resize", "width" => 800, "height" => 600}

# JSON (default)
encoded_json = Async::Job::Coder::DEFAULT.dump(job_data)
decoded_json = Async::Job::Coder::DEFAULT.parse(encoded_json)
puts decoded_json["type"] # => "resize"

# Marshal (Ruby-only, binary)
encoded_marshal = Async::Job::Coder::Marshal::DEFAULT.dump(job_data)
decoded_marshal = Async::Job::Coder::Marshal::DEFAULT.load(encoded_marshal)
puts decoded_marshal["width"] # => 800

# MessagePack (compact, cross-language)
encoded_msgpack = Async::Job::Coder::MessagePack::DEFAULT.dump(job_data)
decoded_msgpack = Async::Job::Coder::MessagePack::DEFAULT.load(encoded_msgpack)
puts decoded_msgpack["height"] # => 600

# Coder.Time — normalize various time representations
puts Async::Job::Coder.Time("2025-01-15T10:00:00Z").class # => Time
puts Async::Job::Coder.Time(1700000000).class             # => Time
puts Async::Job::Coder.Time(nil).inspect                  # => nil
```
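
To illustrate the kind of normalization that `Coder.Time` is described as performing, here is a rough stand-alone sketch that uses only Ruby's standard library. The `normalize_time` helper is hypothetical and is not the library's implementation; it assumes only the three input shapes shown above (an ISO 8601 string, an epoch number, and `nil`), and the JSON round trip stands in for what a JSON-based coder would do.

```ruby
require "json"
require "time"

# Hypothetical helper (not part of async-job): coerce common time
# representations into a Time, passing nil through unchanged.
def normalize_time(value)
  case value
  when nil then nil
  when Time then value
  when Integer, Float then Time.at(value)
  when String then Time.iso8601(value)
  else raise ArgumentError, "Unsupported time value: #{value.inspect}"
  end
end

job = {"id" => "123", "type" => "resize", "scheduled_at" => "2025-01-15T10:00:00Z"}

# Round-trip the payload through JSON; times travel as strings:
decoded = JSON.parse(JSON.dump(job))

# Normalize the decoded timestamp back into a Time before using it:
scheduled_at = normalize_time(decoded["scheduled_at"])
puts scheduled_at.class          # => Time
puts normalize_time(nil).inspect # => nil
```

Keeping timestamps as strings in the payload and normalizing them on the consuming side means the same job hash can pass through any of the coders without depending on coder-specific time handling.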
--------------------------------

### Add Aggregate Queue to Pipeline

Source: https://github.com/socketry/async-job/blob/main/context/aggregate-queue.md

Add the aggregate queue to an existing queue to aggregate jobs before they are enqueued into Redis. This avoids Redis latency issues affecting the front-end.

```ruby
pipeline = Async::Job::Builder.build(buffer) do
  # Aggregating before passing the job into Redis will avoid Redis latency issues affecting the front-end:
  enqueue Async::Job::Queue::Aggregate
  dequeue Async::Job::Queue::Redis
end
```

--------------------------------

### Async::Job::Processor::Inline

Source: https://context7.com/socketry/async-job/llms.txt

An in-process asynchronous job processor that executes each job in a background async task using `Async::Idler`. It does not rely on external queue dependencies, supports optional `scheduled_at` timestamps for delayed execution, and tracks job statistics like `call_count`, `complete_count`, and `failed_count`. Errors are logged and do not halt the processor.

```APIDOC
## Async::Job::Processor::Inline — In-Process Async Processor

`Processor::Inline` executes each job in a background async task using `Async::Idler`, without any external queue dependency. It supports optional `scheduled_at` timestamps for delayed execution and tracks `call_count`, `complete_count`, and `failed_count` statistics. Errors are logged and do not crash the processor.
```ruby
require "time" # Provides Time#iso8601 on Ruby versions where it is not built in
require "async"
require "async/job/processor/inline"

executor = proc do |job|
  puts "[#{Time.now}] Processing: #{job["name"]}"
end

processor = Async::Job::Processor::Inline.new(executor)

Async do
  # Immediate job
  processor.call({"name" => "job-a"})

  # Scheduled job (runs 2 seconds from now)
  processor.call({"name" => "job-b", "scheduled_at" => (Time.now + 2).iso8601})

  sleep 3 # Wait for both to complete

  puts processor.complete_count # => 2
  puts processor.failed_count   # => 0
  puts processor.status_string  # => "C=0/2 F=0"
end
```
```

--------------------------------

### Async::Job::Processor::Aggregate

Source: https://context7.com/socketry/async-job/llms.txt

A batching processor that buffers incoming jobs in memory and flushes them in batches using a background task. It employs a double-buffer swap for low-latency enqueueing, making it suitable for scenarios such as front-end request-response cycles where processing latency should be deferred. All pending jobs are flushed before the background task terminates.

```APIDOC
## Async::Job::Processor::Aggregate — Batching Processor

`Processor::Aggregate` buffers incoming jobs in memory and flushes them in batches using a background task, using a double-buffer swap for low-latency enqueueing. This is ideal for front-end request-response cycles where you want to defer processing latency. All pending jobs are flushed before the background task exits.

```ruby
require "async"
require "async/job/processor/aggregate"
require "async/job/processor/inline"
```
```
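
The double-buffer swap that `Processor::Aggregate` is described as using can be sketched in plain Ruby. This is an illustration of the general technique only, not the library's implementation: the `DoubleBuffer` class and its `push`/`flush` methods are hypothetical names, a `Mutex` stands in for whatever synchronization the real processor uses, and a single flushing caller is assumed.

```ruby
# Hypothetical illustration of a double-buffer swap (not async-job's code).
class DoubleBuffer
  def initialize(&sink)
    @sink = sink     # Receives each flushed batch.
    @pending = []    # Buffer that producers append to.
    @processing = [] # Buffer that is drained during a flush.
    @mutex = Mutex.new
  end

  # Enqueueing is cheap: it only appends to the pending buffer.
  def push(job)
    @mutex.synchronize { @pending << job }
  end

  # Swap the buffers under the lock, then drain outside of it, so
  # producers are not blocked while a batch is being processed.
  # Assumes only one caller flushes at a time.
  def flush
    @mutex.synchronize { @pending, @processing = @processing, @pending }
    return if @processing.empty?

    @sink.call(@processing.dup)
    @processing.clear
  end
end

batches = []
buffer = DoubleBuffer.new { |batch| batches << batch }

3.times { |i| buffer.push({"id" => "job-#{i}"}) }
buffer.flush
buffer.push({"id" => "job-3"})
buffer.flush

puts batches.map(&:size).inspect # => [3, 1]
```

Because the lock is held only for the swap, the cost of an enqueue stays constant regardless of how long the sink takes to process a batch.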