### Complete Periodic Job Setup Example Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/periodic-jobs.md Demonstrates defining a periodic job with a specific interval and function, adding it to the client's periodic job bundle, and later removing it. ```go // Define a periodic job that runs every hour dailyJob := river.NewPeriodicJob( river.PeriodicInterval(1 * time.Hour), func() (river.JobArgs, *river.InsertOpts) { return CleanupArgs{ OlderThanDays: 30, }, &river.InsertOpts{ Priority: 3, // lower priority } }, &river.PeriodicJobOpts{ ID: "hourly_cleanup", RunOnStart: true, }, ) // Add to client bundle := client.PeriodicJobs() handle := bundle.Add(dailyJob) // Later, remove if needed bundle.Remove(handle) ``` -------------------------------- ### Middleware Implementation Example Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Provides a basic structure for implementing middleware in River. The example shows optional methods for `JobInsertBegin`, `WorkerBegin`, and `WorkerEnd` hooks. ```go type MyMiddleware struct{} // Optional: called around job insertion func (m *MyMiddleware) JobInsertBegin(hook *rivertype.JobInsertBeginPayload) {} // Optional: called around job execution func (m *MyMiddleware) WorkerBegin(hook *rivertype.WorkerBeginPayload) {} func (m *MyMiddleware) WorkerEnd(hook *rivertype.WorkerEndPayload) {} ``` -------------------------------- ### Creating a SubscribeConfig Instance Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Example of initializing a SubscribeConfig to listen for specific event kinds with a buffer. ```go config := &river.SubscribeConfig{ Kinds: []river.EventKind{river.EventKindJobFailed}, BufferSize: 50, } eventChan, cleanup := client.SubscribeConfig(config) defer cleanup() ``` -------------------------------- ### Configure and Start River Client Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Initialize a new River client with a driver and configuration, including queue settings and registered workers. Start the client to begin processing jobs. ```go client, err := river.NewClient(driver, &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } if err := client.Start(ctx); err != nil { panic(err) } ``` -------------------------------- ### Hook Implementation Example Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Demonstrates how to implement hooks for job lifecycle events. The `HookWorkBegin` logs when a job starts, and `HookWorkEnd` logs when a job finishes, including any errors. ```go type MyHook struct{} func (h *MyHook) HookWorkBegin(ctx context.Context, job *rivertype.JobRow) error { log.Printf("Job %d starting", job.ID) return nil } func (h *MyHook) HookWorkEnd(ctx context.Context, job *rivertype.JobRow, jobErr error) error { log.Printf("Job %d finished: %v", job.ID, jobErr) return nil } ``` -------------------------------- ### Start River Client Source: https://github.com/riverqueue/river/blob/master/docs/README.md Start the River client to begin processing jobs. The client will inherit the provided context, which can be used for graceful shutdown. ```go // Run the client inline. All executed jobs will inherit from ctx: if err := riverClient.Start(ctx); err != nil { panic(err) } ``` -------------------------------- ### Config Validation Error Example Source: https://github.com/riverqueue/river/blob/master/_autodocs/errors.md This example demonstrates a configuration error where FetchCooldown is set too low. The client creation will fail with a specific validation error message. ```go config := &river.Config{ FetchCooldown: 10 * time.Millisecond, // too small, min is 100ms } client, err := river.NewClient(driver, config) // err: "FetchCooldown must be at least 1ms" ``` -------------------------------- ### Generate SQL Code with sqlc Source: https://github.com/riverqueue/river/blob/master/docs/development.md Generate Go code from SQL files using sqlc. Ensure sqlc is installed (`brew install sqlc`) before running this command. ```shell make generate ``` -------------------------------- ### InsertManyParams Example Usage Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Demonstrates how to prepare a slice of InsertManyParams to insert multiple jobs, each with potentially different arguments and insertion options. ```go params := []river.InsertManyParams{ { Args: SendEmailArgs{To: "user1@example.com"}, Opts: &river.InsertOpts{Priority: 1}, }, { Args: SendEmailArgs{To: "user2@example.com"}, Opts: nil, // uses defaults }, } results, err := client.InsertMany(ctx, params) ``` -------------------------------- ### River Client Configuration with Queues Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/queue-management.md Example of initializing a River client with custom configurations for multiple queues. Each queue can have its MaxWorkers set independently. ```go config := &river.Config{ Queues: map[string]river.QueueConfig{ "default": {MaxWorkers: 100}, "critical": {MaxWorkers: 50}, "low": {MaxWorkers: 10}, }, Workers: workers, } ``` -------------------------------- ### Example InsertOpts Usage Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Demonstrates how to create and use InsertOpts to customize job insertion parameters like attempts, priority, queue, scheduling time, tags, and uniqueness. ```go opts := &river.InsertOpts{ MaxAttempts: 5, Priority: 1, Queue: "critical", ScheduledAt: time.Now().Add(1 * time.Hour), Tags: []string{"email", "urgent"}, UniqueOpts: river.UniqueOpts{ ByArgs: true, ByQueue: true, }, } result, err := client.Insert(ctx, MyJobArgs{}, opts) ``` -------------------------------- ### UniqueOpts Example: ByPeriod Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Sets up uniqueness constraints based on a time period, allowing only one job within that period. ```go // Only one job per hour opts := &river.InsertOpts{ UniqueOpts: river.UniqueOpts{ ByPeriod: 1 * time.Hour, }, } ``` -------------------------------- ### Start River Client Processing Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Begin processing jobs from configured queues. Cancelling the provided context will initiate a graceful shutdown. ```go ctx, cancel := context.WithCancel(context.Background()) defuncancel() if err := client.Start(ctx); err != nil { panic(err) } ``` -------------------------------- ### Receive Events from Subscription Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Example of subscribing to events and then ranging over the event channel to receive and print event kinds. Remember to call the cleanup function. ```go // Subscribe to specific events eventChan, cleanup := client.Subscribe( river.EventKindJobCompleted, river.EventKindJobFailed, ) def cleanup() { // Call cleanup to unsubscribe } // Receive events for event := range eventChan { fmt.Printf("Event: %s\n", event.Kind) } ``` -------------------------------- ### Client Creation & Lifecycle Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Provides methods for creating, starting, and stopping the River client, managing its lifecycle. ```APIDOC ## Client Creation & Lifecycle ### Description Methods for creating, starting, and stopping the River client. ### Methods - `NewClient()`: Create a client - `Client.Start()`: Start processing - `Client.Stop()`: Graceful shutdown - `Client.StopAndCancel()`: Force stop ``` -------------------------------- ### Configure Periodic Jobs Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Specify periodic jobs to be run when the client starts. These can also be added dynamically. ```go config := &river.Config{ PeriodicJobs: []*river.PeriodicJob{ river.NewPeriodicJob( river.PeriodicInterval(1 * time.Hour), constructor, nil, ), }, } ``` -------------------------------- ### Complete River Configuration Example Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md This Go code demonstrates a comprehensive configuration for the River job queue. It includes setting up workers, defining multiple queues with specific fetch intervals and worker limits, configuring job timeouts and retry attempts, specifying retention periods for different job states, and enabling graceful shutdown and logging. ```go package main import ( "log/slog" "os" "time" "github.com/riverqueue/river" ) func exampleConfig() *river.Config { workers := river.NewWorkers() river.AddWorker(workers, &SendEmailWorker{}) river.AddWorker(workers, &ProcessPaymentWorker{}) return &river.Config{ // Database and workers Workers: workers, // Queues Queues: map[string]river.QueueConfig{ river.QueueDefault: { MaxWorkers: 100, FetchCooldown: 100 * time.Millisecond, FetchPollInterval: 1 * time.Second, }, "critical": { MaxWorkers: 50, }, "low": { MaxWorkers: 10, }, }, // Job configuration JobTimeout: 5 * time.Minute, MaxAttempts: 25, // Retry behavior RescueStuckJobsAfter: 1 * time.Hour, RetryPolicy: &river.DefaultClientRetryPolicy{}, // Job retention CompletedJobRetentionPeriod: 24 * time.Hour, CancelledJobRetentionPeriod: 24 * time.Hour, DiscardedJobRetentionPeriod: 7 * 24 * time.Hour, // Graceful shutdown SoftStopTimeout: 30 * time.Second, // Logging Logger: slog.New(slog.NewTextHandler(os.Stderr, nil)), // ID (auto-generated if not set) ID: "worker-1", // Periodic jobs PeriodicJobs: []*river.PeriodicJob{ river.NewPeriodicJob( river.PeriodicInterval(1 * time.Hour), func() (river.JobArgs, *river.InsertOpts) { return CleanupArgs{}, nil }, &river.PeriodicJobOpts{ID: "hourly_cleanup"}, ), }, } } ``` -------------------------------- ### Subscribe to Events with Configuration Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Example of subscribing to events using SubscribeConfig with a specified buffer size and event kinds. The cleanup function should be called to unsubscribe. ```go config := &river.SubscribeConfig{ Kinds: []river.EventKind{river.EventKindJobCompleted}, BufferSize: 100, } eventChan, cleanup := client.SubscribeConfig(config) def cleanup() { // Call cleanup to unsubscribe } ``` -------------------------------- ### Client.Start Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Starts the River client, enabling it to begin processing jobs from the configured queues. This method should be called after client initialization and before any jobs are expected to be processed. ```APIDOC ## Client.Start ### Description Starts the client to begin processing jobs from configured queues. ### Signature ```go func (c *Client[TTx]) Start(ctx context.Context) error ``` ### Parameters #### Path Parameters * None #### Query Parameters * None #### Request Body * None ### Parameters Table | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | ctx | `context.Context` | Yes | — | Context for client lifecycle. Cancelling this context initiates graceful shutdown | ### Returns `error` — Nil on successful start, error on failure. ### Throws Database connection errors, worker initialization errors. ### Example ```go ctx, cancel := context.WithCancel(context.Background()) defer cancel() if err := client.Start(ctx); err != nil { panic(err) } ``` ``` -------------------------------- ### UniqueOpts Example: Selective Fields for Uniqueness Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Illustrates how to use `ByArgs` with struct tags to selectively include specific fields for uniqueness checks. ```go type SendEmailArgs struct { CustomerID string `json:"customer_id" river:"unique"` TraceID string `json:"trace_id"` // not included in uniqueness } // Only customer_id matters for uniqueness opts := &river.InsertOpts{ UniqueOpts: river.UniqueOpts{ ByArgs: true, }, } ``` -------------------------------- ### UniqueOpts Example: Complex Nested Structures for Uniqueness Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Shows how to define uniqueness for nested structures by marking fields with `river:"unique"` tags. ```go type MyJobArgs struct { Customer *Customer `json:"customer"` TraceID string `json:"trace_id"` } type Customer struct { ID string `json:"id" river:"unique"` } // Only customer.id will be used for uniqueness opts := &river.InsertOpts{ UniqueOpts: river.UniqueOpts{ ByArgs: true, }, } ``` -------------------------------- ### Graceful Client Shutdown with Context Source: https://github.com/riverqueue/river/blob/master/docs/README.md Configure a client with a SoftStopTimeout and start it with a context that can be cancelled on signals like SIGINT/SIGTERM for graceful shutdown. Active jobs are cancelled after the timeout. ```go riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ SoftStopTimeout: 10 * time.Second, ... }) if err != nil { panic(err) } signalCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) deffer stop() // Stop fetching new work and wait for active jobs to finish. Cancel jobs after // SoftStopTimeout elapses. if err := riverClient.Start(signalCtx); err != nil { panic(err) } <-riverClient.Stopped() ``` -------------------------------- ### UniqueOpts Example: ByArgs Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Configures uniqueness to consider the job's arguments, ensuring only one job with identical arguments can be active. ```go // Only one job with these specific args can be active opts := &river.InsertOpts{ UniqueOpts: river.UniqueOpts{ ByArgs: true, }, } ``` -------------------------------- ### Example of using QueueUpdateParams Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/queue-management.md Demonstrates how to create and use QueueUpdateParams to update a queue's maximum worker count. The updated queue information is returned upon successful update. ```go maxWorkers := 100 params := &river.QueueUpdateParams{ MaxWorkers: &maxWorkers, } queue, err := client.QueueUpdate(ctx, "default", params) ``` -------------------------------- ### Go Parallel Test Bundle Template Source: https://github.com/riverqueue/river/blob/master/AGENTS.md Illustrates the structure for a top-level Go test function using a parallel test bundle pattern. Includes setup for a test bundle and demonstrates running parallel subtests, including one that requires a context parameter. ```go func TestThing(t *testing.T) { t.Parallel() tpe testBundle struct { // Put SUT + common fixtures here. } setup := func(t *testing.T) *testBundle { t.Helper() return &testBundle{} } t.Run("CaseName", func(t *testing.T) { t.Parallel() bundle := setup(t) // ... use `bundle` in assertions/actions ... }) t.Run("CaseNameWithCtxRequiredBySetup", func(t *testing.T) { t.Parallel() setupWithCtx := func(ctx context.Context, t *testing.T) *testBundle { t.Helper() _ = ctx return &testBundle{} } ctx := context.Background() bundle := setupWithCtx(ctx, t) // ... use `bundle` in assertions/actions ... }) } ``` -------------------------------- ### List Queues with Filtering Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/queue-management.md Lists all queues with optional filtering and pagination. Use this to get an overview of available queues or to find specific queues based on criteria. ```go result, err := client.QueueList(ctx, &river.QueueListParams{ Limit: 50, }) if err != nil { panic(err) } for _, q := range result.Queues { fmt.Printf("Queue: %s\n", q.Name) } ``` -------------------------------- ### Process Job Timing Statistics Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Example of accessing and printing job timing statistics from a completed job event. It shows how to retrieve QueueWaitDuration, RunDuration, and CompleteDuration. ```go eventChan, cleanup := client.Subscribe(river.EventKindJobCompleted) def cleanup() { // Call cleanup to unsubscribe } for event := range eventChan { stats := event.JobStats fmt.Printf("Job %d: queue_wait=%v, run=%v, complete=%v\n", event.Job.ID, stats.QueueWaitDuration, stats.RunDuration, stats.CompleteDuration) } ``` -------------------------------- ### Create a Periodic Job Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/periodic-jobs.md Use NewPeriodicJob to create a periodic job configuration. This example shows how to set a schedule, a constructor function, and optional parameters like ID and RunOnStart. ```go job := river.NewPeriodicJob( river.PeriodicInterval(1 * time.Hour), func() (river.JobArgs, *river.InsertOpts) { return MyJobArgs{}, nil }, &river.PeriodicJobOpts{ ID: "my_periodic_job", RunOnStart: true, }, ) ``` -------------------------------- ### Initialize Insert-Only Client Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Initializes a River client configured for insert-only operations, suitable when a separate service processes the jobs. No queues are configured, and Start() is not called. ```go // No Queues configured, no Start() call client, _ := river.NewClient(driver, &river.Config{ Workers: workers, // optional, for validation }) // Just insert jobs result, _ := client.Insert(ctx, args, nil) ``` -------------------------------- ### Process Job Completion and Failure Events Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Example of receiving and processing job completion and failure events from a subscription channel. It demonstrates how to access event details like Job ID and RunDuration. ```go eventChan, cleanup := client.Subscribe(river.EventKindJobCompleted) def cleanup() { // Call cleanup to unsubscribe } for event := range eventChan { switch event.Kind { case river.EventKindJobCompleted: fmt.Printf("Job %d completed in %v\n", event.Job.ID, event.JobStats.RunDuration) case river.EventKindJobFailed: fmt.Printf("Job %d failed: %s\n", event.Job.ID, event.Job.LastError) } } ``` -------------------------------- ### Job Struct and Work Method Example Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/job-args-and-worker.md Access job metadata via the embedded JobRow and typed arguments via the Args field within the Work method. ```go func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error { fmt.Printf("Job ID: %d\n", job.ID) fmt.Printf("To: %s\n", job.Args.To) return nil } ``` -------------------------------- ### Preventing UnknownJobKindError Source: https://github.com/riverqueue/river/blob/master/_autodocs/errors.md Register all necessary workers before starting the client. If using insert-only, ensure SkipUnknownJobCheck is false to enable validation on insert. ```go // Register all workers before starting client workers := river.NewWorkers() river.AddWorker(workers, &SendEmailWorker{}) river.AddWorker(workers, &ProcessPaymentWorker{}) // If using insert-only, enable validation config := &river.Config{ Workers: workers, // enables job kind validation on insert SkipUnknownJobCheck: false, // default - check on insert } ``` -------------------------------- ### Get Queue Information Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/queue-management.md Retrieves information about a specific queue. Use this when you need to inspect the state of a single queue without a transaction. ```go queue, err := client.QueueGet(ctx, "critical") if err != nil { panic(err) } fmt.Printf("Queue: %s, Paused: %v\n", queue.Name, queue.PausedAt != nil) ``` -------------------------------- ### Set up and Migrate Development Database Source: https://github.com/riverqueue/river/blob/master/docs/development.md Create and migrate the development database for local program execution. This is separate from the test database. ```shell createdb river_dev go run ./cmd/river migrate-up --database-url postgres:///river_dev --line main ``` -------------------------------- ### Prepare for New Version Release (Part 1) Source: https://github.com/riverqueue/river/blob/master/docs/development.md Fetch latest changes, set the new version, and update module versions. This is the initial step in the release process. ```shell git checkout master && git pull --rebase export VERSION=v0.x.y make update-mod-version git checkout -b $USER-$VERSION ``` -------------------------------- ### PeriodicJobOpts Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/periodic-jobs.md Options for configuring a periodic job. Allows setting an ID and controlling immediate execution on scheduler start. ```APIDOC ## PeriodicJobOpts ### Description Options for configuring a periodic job. Allows setting an ID and controlling immediate execution on scheduler start. ### Fields - **ID** (`string`) - Optional unique identifier for the periodic job. Defaults to "". - **RunOnStart** (`bool`) - If true, run job immediately when scheduler starts. Defaults to `false`. ### Example ```go opts := &river.PeriodicJobOpts{ ID: "daily_cleanup", RunOnStart: true, } ``` ``` -------------------------------- ### Get Job by ID Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/job-management.md Retrieves a job's details using its unique ID. Use this for direct job lookups. ```go func (c *Client[TTx]) JobGet(ctx context.Context, id int64) (*rivertype.JobRow, error) ``` ```go job, err := client.JobGet(ctx, 12345) if err != nil { panic(err) } fmt.Printf("Job state: %s\n", job.State) ``` -------------------------------- ### Get Underlying Database Driver Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Accesses the specific database driver being used by the client. This can be useful for advanced driver-specific operations. ```go driver := client.Driver() ``` -------------------------------- ### Create New River Client Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Instantiate a new River client with a database driver and configuration. Ensure the configuration is valid to avoid errors. ```go import "github.com/riverqueue/river/riverdriver/riverpgxv5" dbPool, _ := pgxpool.New(ctx, "postgres://...") driver := riverpgxv5.New(dbPool) client, err := river.NewClient(driver, &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } ``` -------------------------------- ### Add a new queue using QueueBundle Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/queue-management.md Demonstrates how to add a new queue to the client using the QueueBundle interface. This involves specifying the queue name and its configuration, such as the maximum number of workers. ```go bundle := client.Queues() err := bundle.Add("new_queue", &river.QueueConfig{MaxWorkers: 50}) ``` -------------------------------- ### Initialize River Client Source: https://github.com/riverqueue/river/blob/master/docs/README.md Initialize a River client with a database pool, driver, and configuration. The config specifies queues and their worker limits, along with the registered workers. ```go riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } ``` -------------------------------- ### Get Client ID Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Retrieves the unique identifier for the River client instance. This is useful for logging or distinguishing between multiple client instances. ```go clientID := client.ID() fmt.Printf("Client ID: %s\n", clientID) ``` -------------------------------- ### Get Database Schema Name Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Retrieves the database schema name configured for this client. Returns an empty string if the default schema is in use. ```go schema := client.Schema() fmt.Printf("Schema: %s\n", schema) ``` -------------------------------- ### Batch Job Insertion Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Demonstrates efficient insertion of multiple jobs using `InsertMany` and `InsertManyFast`. `InsertMany` takes a slice of `InsertManyParams`, while `InsertManyFast` is optimized for very large batches. ```go params := []river.InsertManyParams{ {Args: Job1Args{}, Opts: nil}, {Args: Job2Args{}, Opts: nil}, } results, _ := client.InsertMany(ctx, params) // Or use fast variant for huge batches count, _ := client.InsertManyFast(ctx, params) ``` -------------------------------- ### Get Job by ID within Transaction Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/job-management.md Retrieves a job's details within an active database transaction. Ensures consistency for operations within the same transaction. ```go func (c *Client[TTx]) JobGetTx(ctx context.Context, tx TTx, id int64) (*rivertype.JobRow, error) ``` ```go job, err := client.JobGetTx(ctx, tx, 12345) ``` -------------------------------- ### Client.SubscribeConfig Method Signature Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Subscribes to events using an advanced configuration object. Returns an event channel and a cleanup function. ```go func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) ``` -------------------------------- ### Tagging Modules for Release Source: https://github.com/riverqueue/river/blob/master/docs/development.md Tag all project modules and the main project with the new version. This is part of the release process after preparing changes. ```shell git checkout master && git pull --rebase git tag cmd/river/$VERSION -m "release cmd/river/$VERSION" git tag riverdriver/$VERSION -m "release riverdriver/$VERSION" git tag riverdriver/riverpgxv5/$VERSION -m "release riverdriver/riverpgxv5/$VERSION" git tag riverdriver/riverdatabasesql/$VERSION -m "release riverdriver/riverdatabasesql/$VERSION" git tag riverdriver/riversqlite/$VERSION -m "release riverdriver/riversqlite/$VERSION" git tag rivershared/$VERSION -m "release rivershared/$VERSION" git tag rivertype/$VERSION -m "release rivertype/$VERSION" git tag $VERSION ``` -------------------------------- ### Get Queue Information within a Transaction Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/queue-management.md Retrieves queue information within a database transaction. Use this when queue operations need to be part of a larger atomic unit. ```go queue, err := client.QueueGetTx(ctx, tx, "critical") ``` -------------------------------- ### PeriodicJobOpts Structure Source: https://github.com/riverqueue/river/blob/master/_autodocs/types.md Options for configuring a periodic job, including its ID and whether it should run on startup. ```go type PeriodicJobOpts struct { ID string RunOnStart bool } ``` -------------------------------- ### Enable Test-Only Behaviors Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Enable test-friendly behaviors, such as disabling jitter in maintenance service startup. This should only be used in test environments. ```go config := &river.Config{ TestOnly: true, } ``` -------------------------------- ### Use JobCancel in Worker Source: https://github.com/riverqueue/river/blob/master/_autodocs/errors.md Example of using the JobCancel function within a worker's Work method. If a specific condition is met, JobCancel is called to wrap an error, ensuring the job is cancelled. ```go func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyArgs]) error { // ... do work if someCondition { return river.JobCancel(fmt.Errorf("invalid state: %v", err)) } return nil } ``` -------------------------------- ### Worker Middleware Implementation Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/job-args-and-worker.md Implement the Middleware method to return worker-specific middleware. Returning nil uses only global middleware. ```go func (w *SendEmailWorker) Middleware(job *rivertype.JobRow) []rivertype.WorkerMiddleware { return nil // use global middleware only } ``` -------------------------------- ### Set Postgres Schema Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Specify the Postgres schema where River tables are located. If set, all River queries will prefix table names with this schema. The schema name must be alphanumeric with underscores, start with a letter or underscore, and be at most 47 characters long. ```go config := &river.Config{ Schema: "river_schema", } ``` -------------------------------- ### Immediately Stop and Cancel Jobs Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Stop the client and cancel any currently running jobs. This is a forceful stop and should be used with caution. ```go if err := client.StopAndCancel(ctx); err != nil { panic(err) } ``` -------------------------------- ### InsertOpts Structure Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Defines the configuration options for inserting a single job, allowing customization of attempts, metadata, priority, queue, scheduling, tags, and uniqueness. ```go type InsertOpts struct { MaxAttempts int Metadata []byte Pending bool Priority int Queue string ScheduledAt time.Time Tags []string UniqueOpts UniqueOpts } ``` -------------------------------- ### Use JobSnooze in Worker for Rescheduling Source: https://github.com/riverqueue/river/blob/master/_autodocs/errors.md Example of using the JobSnooze function within a worker's Work method. This is used to reschedule a job for a later time (e.g., 5 minutes) or to retry immediately (duration 0) without counting the attempt. ```go func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyArgs]) error { // Try external service if isBusy { // Retry in 5 minutes return river.JobSnooze(5 * time.Minute) } if contextCancelled { // Retry immediately without counting the attempt return river.JobSnooze(0) } return nil } ``` -------------------------------- ### NewClient Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Creates a new River client instance, which is the primary entry point for interacting with River's job processing capabilities. It requires a database driver and configuration to initialize. ```APIDOC ## NewClient ### Description Creates a new River client. ### Signature ```go func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client[TTx], error) ``` ### Parameters #### Path Parameters * None #### Query Parameters * None #### Request Body * None ### Parameters Table | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | driver | `riverdriver.Driver[TTx]` | Yes | — | Database driver for job persistence (e.g., `riverpgxv5.New(dbPool)`) | | config | `*Config` | Yes | — | Configuration struct with queues, workers, and other settings | ### Returns `(*Client[TTx], error)` — A new client instance ready to start, or an error if configuration is invalid. ### Throws Configuration validation errors if invalid settings are provided. ### Example ```go import "github.com/riverqueue/river/riverdriver/riverpgxv5" dbPool, _ := pgxpool.New(ctx, "postgres://...") driver := riverpgxv5.New(dbPool) client, err := river.NewClient(driver, &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } ``` ``` -------------------------------- ### Create a Worker Implementation Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Implement the Worker interface to process jobs. The Work method receives the job context and job details, returning an error to indicate success or failure. ```go type SendEmailWorker struct { river.WorkerDefaults[SendEmailArgs] } func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error { return sendEmail(job.Args.To, job.Args.Subject, job.Args.Body) } ``` -------------------------------- ### Run a Single Test in Isolation Source: https://github.com/riverqueue/river/blob/master/AGENTS.md Execute a specific test case in isolation. Navigate to the module directory and use `go test` with the `-run` flag. ```bash cd $MODULE && go test ./path/to/package -run TestName ``` -------------------------------- ### Client.QueueList Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/queue-management.md Lists all available queues, with support for filtering and pagination through provided parameters. Returns a list of queues and pagination details. ```APIDOC ## Client.QueueList ### Description Lists all queues with optional filtering. ### Method Not specified (likely a client SDK method call) ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **params** (*QueueListParams) - Required - Filtering and pagination parameters for the queue list. ### Request Example ```go result, err := client.QueueList(ctx, &river.QueueListParams{ Limit: 50, }) if err != nil { panic(err) } for _, q := range result.Queues { fmt.Printf("Queue: %s\n", q.Name) } ``` ### Response #### Success Response - **Result** (*QueueListResult) - Contains the list of queues and pagination information. #### Response Example (Response structure not fully detailed in source, but implies a `QueueListResult` object containing a `Queues` field) **Throws:** Database errors. ``` -------------------------------- ### Configure Global Middleware Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Define global middleware for the job lifecycle. Middleware can implement various interfaces, and their order is significant. ```go config := &river.Config{ Middleware: []rivertype.Middleware{ &TraceMiddleware{}, &MetricsMiddleware{}, }, } ``` -------------------------------- ### Access Periodic Job Management Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Obtains a bundle for managing periodic jobs. Use this to add or configure recurring jobs within the system. ```go periodics := client.PeriodicJobs() handle := periodics.Add(myPeriodicJob) ``` -------------------------------- ### Update Go or Toolchain Versions Source: https://github.com/riverqueue/river/blob/master/docs/development.md Update Go or toolchain versions across all `go.mod` files in the workspace. Modify `go.work` first, then run this command. ```shell make update-mod-go ``` -------------------------------- ### Insert Multiple Jobs in a Single Operation Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Use this method to efficiently insert multiple jobs in one go. Provide a slice of `InsertManyParams` for the jobs to be inserted. ```go jobs := []river.InsertManyParams{ {Args: MyJobArgs{Param: "1"}}, {Args: MyJobArgs{Param: "2"}}, } results, err := client.InsertMany(ctx, jobs) ``` -------------------------------- ### Configure Queues Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Map queue names to their specific configurations, including the maximum number of workers. This is required if the client will execute jobs. ```go config := &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, "critical": {MaxWorkers: 50}, "low": {MaxWorkers: 10}, }, } ``` -------------------------------- ### Define Job Arguments with Default Insert Options Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/job-args-and-worker.md The JobArgsWithInsertOpts interface allows specifying default insertion options per job type. Implement this to override system-wide defaults for specific jobs. ```go type JobArgsWithInsertOpts interface { JobArgs InsertOpts() InsertOpts } ``` -------------------------------- ### Configure Periodic Job Options Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/periodic-jobs.md Use PeriodicJobOpts to specify optional settings for a periodic job. This includes setting a unique ID and determining if the job should run on scheduler startup. ```go opts := &river.PeriodicJobOpts{ ID: "daily_cleanup", RunOnStart: true, } ``` -------------------------------- ### Insert a Job Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Insert a new job into the queue using the client. The result contains information about the inserted job, including its ID. ```go result, err := client.Insert(ctx, SendEmailArgs{ To: "user@example.com", Subject: "Hello", Body: "World", }, nil) if err != nil { panic(err) } fmt.Printf("Job inserted: %d\n", result.Job.ID) ``` -------------------------------- ### Configure Client ID Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Set a unique identifier for the client instance. This is auto-generated if not specified and is used for leader election and client statistics. ```go config := &river.Config{ ID: "web-server-1", } ``` -------------------------------- ### SubscribeConfig Structure Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Defines the configuration for event subscriptions, specifying event kinds and buffer size. ```go type SubscribeConfig struct { Kinds []EventKind BufferSize int } ``` -------------------------------- ### Define Job Arguments Source: https://github.com/riverqueue/river/blob/master/docs/README.md Define the arguments for a job, including JSON annotations for serialization and a Kind() method to uniquely identify the job type. ```go type SortArgs struct { // Strings is a slice of strings to sort. Strings []string `json:"strings"` } func (SortArgs) Kind() string { return "sort" } ``` -------------------------------- ### Resume a paused queue Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/queue-management.md Use this method to resume a queue that has been previously paused. Pass the queue name and an optional configuration object. Context for the operation is required. ```go err := client.QueueResume(ctx, "critical", nil) fmt.Println("Queue resumed") ``` -------------------------------- ### Configure Test Settings Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Set test-specific configurations, such as disabling unique enforcement. This allows for custom time generators for testing purposes. ```go config := &river.Config{ Test: river.TestConfig{ DisableUniqueEnforcement: true, }, } ``` -------------------------------- ### Client.SubscribeConfig Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Subscribes to events with advanced configuration options. This method allows for more granular control over subscriptions, such as setting a buffer size for the event channel. ```APIDOC ## Client.SubscribeConfig ### Description Subscribes to events with advanced configuration. This method provides more control over the subscription process. ### Method `SubscribeConfig` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Parameters #### config - **config** (`*SubscribeConfig`) - Required - - Subscription configuration object. ### Returns - **`<-chan *Event`** - An event channel to receive events. - **`func()`** - A cleanup function to unsubscribe from events. ### Example ```go config := &river.SubscribeConfig{ Kinds: []river.EventKind{river.EventKindJobCompleted}, BufferSize: 100, } eventChan, cleanup := client.SubscribeConfig(config) defer cleanup() ``` ``` -------------------------------- ### Implement Job Execution Logic Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/job-args-and-worker.md The Work method executes the job's logic. Return an error to mark the job as failed and trigger a retry. Ensure context cancellation is respected. ```go type SendEmailWorker struct { river.WorkerDefaults[SendEmailArgs] } func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error { select { case <-ctx.Done(): return ctx.Err() default: } return sendEmail(job.Args.To, job.Args.Subject) } ``` -------------------------------- ### Access Queue Management Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Retrieves the queue bundle, which provides methods for managing job queues within the River system. ```go queues := client.Queues() ``` -------------------------------- ### Set Fetch Cooldown Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Configure the minimum time between job fetches from the database. This controls throughput and defaults to 100ms. ```go config := &river.Config{ FetchCooldown: 50 * time.Millisecond, } ``` -------------------------------- ### List Jobs with Filtering Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/job-management.md Retrieves a list of jobs, supporting filtering by state and pagination using a limit. This is a non-transactional operation. ```go func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobListResult, error) ``` ```go result, err := client.JobList(ctx, &river.JobListParams{ States: []rivertype.JobState{rivertype.JobStateAvailable}, Limit: 100, }) ``` -------------------------------- ### client.Subscribe Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Subscribes to a variadic list of event kinds. This is a convenience method for simple subscriptions without explicit buffering configuration. ```APIDOC ## Example: Monitoring Job Completion ```go package main import ( "fmt" "log" "context" "github.com/riverqueue/river" ) func main() { // Create client... client, _ := river.NewClient(driver, config) // Subscribe to job completion events eventChan, cleanup := client.Subscribe( river.EventKindJobCompleted, river.EventKindJobFailed, ) defer cleanup() // Monitor events in background go func() { for event := range eventChan { switch event.Kind { case river.EventKindJobCompleted: log.Printf("Job %d completed: ran for %v\n", event.Job.ID, event.JobStats.RunDuration) case river.EventKindJobFailed: log.Printf("Job %d failed: %s (attempt %d/%d)\n", event.Job.ID, event.Job.LastError, event.Job.Attempt, event.Job.MaxAttempts) } } }() // Client runs in main context if err := client.Start(context.Background()); err != nil { panic(err) } } ``` ``` -------------------------------- ### Configure Global Hooks Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Define global hooks to be invoked at different job lifecycle points. The order of hooks matters. ```go config := &river.Config{ Hooks: []rivertype.Hook{ &LoggingHook{}, &MetricsHook{}, }, } ``` -------------------------------- ### Client.Subscribe Method Signature Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/events.md Subscribes to client events, returning an event channel and a cleanup function. The cleanup function must be called to unsubscribe. ```go func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) ``` -------------------------------- ### Run River Linter Source: https://github.com/riverqueue/river/blob/master/docs/development.md Execute the golangci-lint linter to check code quality and attempt to automatically fix issues. ```shell golangci-lint run --fix ``` -------------------------------- ### Run All Tests with Race Detector Source: https://github.com/riverqueue/river/blob/master/AGENTS.md Use this command to run all tests with the race detector enabled. This is useful for identifying race conditions in concurrent code. ```bash make test/race ``` -------------------------------- ### Configure Structured Logger Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Provide a structured logger for River messages. If none is specified, a default warn-level text logger to stdout is used. ```go logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) config := &river.Config{ Logger: logger, } ``` -------------------------------- ### Register Workers Source: https://github.com/riverqueue/river/blob/master/docs/README.md Create a worker bundle and register workers using AddWorker. This panics if a worker is already registered or invalid. ```go workers := river.NewWorkers() // AddWorker panics if the worker is already registered or invalid: river.AddWorker(workers, &SortWorker{}) ``` -------------------------------- ### Client.Queues Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/client.md Returns the queue bundle for managing queues. This provides access to functionalities for interacting with and managing job queues. ```APIDOC ## Client.Queues ### Description Returns the queue bundle for managing queues. This provides access to functionalities for interacting with and managing job queues. ### Method (Implied GET or similar for retrieval) ### Endpoint (Not explicitly defined, assumed to be part of a client operation) ### Parameters (None) ### Response #### Success Response - **(*QueueBundle)** — Bundle for managing queues. ### Response Example ```go queues := client.Queues() ``` ``` -------------------------------- ### Use Default Client Retry Policy Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Configure a custom retry policy for job scheduling. The default uses exponential backoff. ```go config := &river.Config{ RetryPolicy: &river.DefaultClientRetryPolicy{}, } ``` -------------------------------- ### Configure Reindexer Index Names Source: https://github.com/riverqueue/river/blob/master/_autodocs/configuration.md Specify the indexes to be reindexed. If nil, a default set of 7 indexes is used. ```go config := &river.Config{ ReindexerIndexNames: river.ReindexerIndexNamesDefault(), } ``` -------------------------------- ### InsertOpts Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Configuration for job insertion, overriding defaults. Allows customization of attempts, metadata, pending state, priority, queue, scheduled time, tags, and uniqueness constraints. ```APIDOC ## InsertOpts ### Description Configuration for job insertion, overriding defaults. Allows customization of attempts, metadata, pending state, priority, queue, scheduled time, tags, and uniqueness constraints. ### Fields - **MaxAttempts** (`int`) - Maximum total attempts (original + retries) before job is discarded. Default: 25. - **Metadata** (`[]byte`) - Arbitrary JSON metadata stored with the job. Default: `nil`. - **Pending** (`bool`) - If true, insert job in pending state (not immediately available). Default: `false`. - **Priority** (`int`) - Priority 1-4, with 1 being highest. Affects fetch order. Default: 2. - **Queue** (`string`) - Queue name to insert into. Default: "default". - **ScheduledAt** (`time.Time`) - Time to schedule job for execution. Must be future time. Default: now. - **Tags** (`[]string`) - Arbitrary tags for job categorization (max 255 chars each). Default: `nil`. - **UniqueOpts** (`UniqueOpts`) - Uniqueness constraints for the job. Default: empty. ### Example ```go opts := &river.InsertOpts{ MaxAttempts: 5, Priority: 1, Queue: "critical", ScheduledAt: time.Now().Add(1 * time.Hour), Tags: []string{"email", "urgent"}, UniqueOpts: river.UniqueOpts{ ByArgs: true, ByQueue: true, }, } result, err := client.Insert(ctx, MyJobArgs{}, opts) ``` ``` -------------------------------- ### Implement Default Insertion Options Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/job-args-and-worker.md The InsertOpts method returns default insertion options for a job type. These options can override system defaults, such as MaxAttempts and Priority. ```go func (SendEmailArgs) InsertOpts() river.InsertOpts { return river.InsertOpts{ MaxAttempts: 10, Priority: 1, } } ``` -------------------------------- ### Create and Add a Periodic Job Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Defines a periodic job that runs on a schedule and adds it to the client. The job is configured to run hourly and perform a cleanup operation. ```go job := river.NewPeriodicJob( river.PeriodicInterval(1 * time.Hour), func() (river.JobArgs, *river.InsertOpts) { return CleanupArgs{}, nil }, &river.PeriodicJobOpts{ID: "hourly_cleanup"}, ) client.PeriodicJobs().Add(job) ``` -------------------------------- ### Fast Batch Insertion with InsertManyFast Source: https://github.com/riverqueue/river/blob/master/_autodocs/api-reference/insert-opts.md Use InsertManyFast for high-performance batch insertion of many jobs (1000+) when only the count of inserted jobs is needed. This method bypasses returning individual job details for speed. ```go func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyParams) (int, error) ``` ```go params := make([]river.InsertManyParams, 10000) for i := 0; i < 10000; i++ { params[i] = river.InsertManyParams{ Args: MyJobArgs{ID: i}, } } count, err := client.InsertManyFast(ctx, params) fmt.Printf("Inserted %d jobs\n", count) ``` -------------------------------- ### Config Struct Definition Source: https://github.com/riverqueue/river/blob/master/_autodocs/types.md Configuration for a River client, including various settings for job retention, fetching, error handling, and middleware. ```go type Config struct { AdvisoryLockPrefix int32 CancelledJobRetentionPeriod time.Duration CompletedJobRetentionPeriod time.Duration DiscardedJobRetentionPeriod time.Duration ErrorHandler ErrorHandler FetchCooldown time.Duration FetchPollInterval time.Duration ID string Hooks []rivertype.Hook JobInsertMiddleware []rivertype.JobInsertMiddleware JobTimeout time.Duration Logger *slog.Logger MaxAttempts int Middleware []rivertype.Middleware PeriodicJobs []*PeriodicJob PollOnly bool Queues map[string]QueueConfig ReindexerSchedule PeriodicSchedule ReindexerIndexNames []string ReindexerTimeout time.Duration RescueStuckJobsAfter time.Duration RetryPolicy ClientRetryPolicy Schema string SoftStopTimeout time.Duration SkipJobKindValidation bool SkipUnknownJobCheck bool Test TestConfig TestOnly bool Workers *Workers WorkerMiddleware []rivertype.WorkerMiddleware } ``` -------------------------------- ### Custom Retry Policy Implementation Source: https://github.com/riverqueue/river/blob/master/_autodocs/README.md Implements a custom retry policy for jobs using exponential backoff. The `NextRetryDelay` method calculates the delay based on the number of attempts. ```go type MyRetryPolicy struct{} func (p *MyRetryPolicy) NextRetryDelay(args *rivertype.JobInsertParams, attempt int) time.Duration { // Custom exponential backoff return time.Duration(math.Pow(2, float64(attempt))) * time.Second } config := &river.Config{ RetryPolicy: &MyRetryPolicy{}, } ```