### Install River UI from Source Source: https://riverqueue.com/docs/river-ui Installs the open-source River UI command-line tool using Go modules. Run the 'riverui' command after installation. ```bash go install riverqueue.com/riverui/cmd/riverui@latest riverui ``` -------------------------------- ### Install River Pro UI from Source Source: https://riverqueue.com/docs/river-ui Installs the River Pro UI command-line tool using Go modules. Run the 'riverproui' command after installation. ```bash go install riverqueue.com/riverui/riverproui/cmd/riverproui@latest priverproui ``` -------------------------------- ### Install River Pro CLI Source: https://riverqueue.com/docs/pro/migrations Installs the River Pro CLI using `go install`. Ensure your environment is configured for Pro, including `GOPROXY` and `GONOSUMDB`. ```bash go install riverqueue.com/riverpro/cmd/riverpro@latest ``` -------------------------------- ### Install River CLI and Run Migrations Source: https://riverqueue.com/docs/river-ui Installs the River CLI tool and applies database migrations. Ensure the DATABASE_URL environment variable is set. ```bash go install github.com/riverqueue/river@latest river migrate-up --database-url "$DATABASE_URL" ``` -------------------------------- ### GitHub Actions Setup for River Pro Source: https://riverqueue.com/docs/pro/go-proxy Configure GitHub Actions to install River Pro CLI and set necessary environment variables for Go builds. This includes setting GOPROXY and GONOSUMDB using repository secrets. ```yaml - name: Setup Go uses: actions/setup-go@v5 with: go-version: "stable" check-latest: true ``` ```yaml - name: Install River Pro CLI run: go install riverqueue.com/riverpro/cmd/riverpro@latest env: GOPROXY: https://proxy.golang.org,https://river:${{ secrets.RIVER_PRO_SECRET }}@riverqueue.com/goproxy,direct GONOSUMDB: riverqueue.com/riverpro ``` ```yaml - name: Migrate River run: | riverpro migrate-up --database-url "$DATABASE_URL" riverpro migrate-up --database-url "$DATABASE_URL" --line pro ``` ```yaml - name: Go test run: go test ./... env: GOPROXY: https://proxy.golang.org,https://river:${{ secrets.RIVER_PRO_SECRET }}@riverqueue.com/goproxy,direct GONOSUMDB: riverqueue.com/riverpro ``` -------------------------------- ### Install River CLI Source: https://riverqueue.com/docs Install the River command-line interface tool to manage database migrations. This tool ensures River's database schema is up-to-date. ```go go install github.com/riverqueue/river/cmd/river@latest ``` -------------------------------- ### Install OpenTelemetry Middleware Source: https://riverqueue.com/docs/open-telemetry Install the OpenTelemetry middleware package for River using go get. ```go go get -u github.com/riverqueue/rivercontrib/otelriver ``` -------------------------------- ### Benchmark Output Example Source: https://riverqueue.com/docs/benchmarks Example output from the benchmark utility showing job throughput over time. ```text bench: jobs worked [ 0 ], inserted [ 1000000 ], job/sec [ 0.0 ] [0s] bench: jobs worked [ 82657 ], inserted [ 0 ], job/sec [ 41328.5 ] [2s] bench: jobs worked [ 96057 ], inserted [ 0 ], job/sec [ 48028.5 ] [2s] bench: jobs worked [ 89829 ], inserted [ 0 ], job/sec [ 44914.5 ] [2s] bench: jobs worked [ 96847 ], inserted [ 0 ], job/sec [ 48423.5 ] [2s] bench: jobs worked [ 96042 ], inserted [ 0 ], job/sec [ 48021.0 ] [2s] bench: jobs worked [ 87198 ], inserted [ 0 ], job/sec [ 43599.0 ] [2s] bench: jobs worked [ 96474 ], inserted [ 0 ], job/sec [ 48237.0 ] [2s] bench: jobs worked [ 94126 ], inserted [ 0 ], job/sec [ 47063.0 ] [2s] bench: jobs worked [ 85323 ], inserted [ 0 ], job/sec [ 42661.5 ] [2s] bench: jobs worked [ 94043 ], inserted [ 0 ], job/sec [ 47021.5 ] [2s] bench: jobs worked [ 81387 ], inserted [ 0 ], job/sec [ 40693.5 ] [2s] bench: total jobs worked [ 1000000 ], total jobs inserted [ 1000000 ], overall job/sec [ 45753.1 ], running 21.856442959s ``` -------------------------------- ### Install River CLI Source: https://riverqueue.com/docs/benchmarks Installs the River CLI tool. Ensure you have a recent version for the latest optimizations. ```bash go install github.com/riverqueue/river/cmd/river@latest ``` -------------------------------- ### Initialize River Pro with database/sql Driver Source: https://riverqueue.com/docs/pro/changelog Example of initializing a River Pro client using the `riverdatabasesql` driver, which supports Pgx or lib/pq. This is useful for integrating with ORMs like Bun and GORM. ```go import ( "database/sql" _ "github.com/jackc/pgx/v5/stdlib" riverdatabasesql "riverqueue.com/riverpro/driver/riverdatabasesql" ) func main() { db, err := sql.Open("pgx", "postgres://user:password@host:port/database") // handle error client, err := riverdatabasesql.NewClient(db) // handle error } ``` -------------------------------- ### Start River Client Source: https://riverqueue.com/docs/inserting-and-working-jobs Initialize and start a River client with a database pool, driver, and configuration including queues and registered workers. The client will process jobs from the specified queues. ```go riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } // Run the client inline. All executed jobs will inherit from ctx: if err := riverClient.Start(ctx); err != nil { panic(err) } ``` -------------------------------- ### Create and Start River Client Source: https://riverqueue.com/docs Initialize a River client with a database connection, driver, and configuration. The client manages job processing and can be started to begin working jobs. ```go dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL")) if err != nil { // handle error } riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { // handle error } // Run the client inline. All executed jobs will inherit from ctx: if err := riverClient.Start(ctx); err != nil { // handle error } ``` -------------------------------- ### Configuring River Client with ErrorHandler Source: https://riverqueue.com/docs/error-handling Example of how to provide a custom ErrorHandler implementation when initializing a River client. ```go riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ ErrorHandler: &CustomErrorHandler{}, ... }) ``` -------------------------------- ### Workflow API Migration Example Source: https://riverqueue.com/docs/pro/changelog Illustrates the find and replace operations needed to migrate from the older `river` package APIs to the new `riverpro` client for workflow management. ```go // Change river.NewClient to riverpro.NewClient // Change river.Client to riverpro.Client // Change river.ClientFromContext to riverpro.ClientFromContext // Change river.Config to be nested as the Config attr within a riverpro.Config // Change riverworkflow.Workflow to riverpro.NewWorkflow // Change riverworkflow.Opts to riverpro.WorkflowOpts // Change riverworkflow.TaskOpts to riverpro.WorkflowTaskOpts // Change riverworkflow.Prepare(ctx, riverClient, to riverClient.WorkflowPrepare(ctx, // Change riverworkflow.PrepareTx(ctx, riverClient, to riverClient.WorkflowPrepareTx(ctx, ``` -------------------------------- ### Install robfig/cron Package Source: https://riverqueue.com/docs/periodic-jobs Install the `robfig/cron` package to use complex cron schedules. ```bash go get github.com/robfig/cron/v3 ``` -------------------------------- ### Install River Packages Source: https://riverqueue.com/docs Install the River package and the pgx driver for PostgreSQL integration. Alternatively, use the database/sql driver for compatibility with Go's built-in sql package. ```go go get github.com/riverqueue/river go get github.com/riverqueue/river/riverdriver/riverpgxv5 ``` -------------------------------- ### Create and Start River UI Handler Source: https://riverqueue.com/docs/river-ui Initializes River UI endpoints and creates an http.Handler. Starts the handler to initialize background processes and mounts it to an HTTP mux. ```go endpoints := riverui.NewEndpoints(riverClient, nil) // or, for Pro customers: endpoints = riverproui.NewEndpoints(riverProClient, nil) opts := &riverui.HandlerOpts{ Endpoints: endpoints, Logger: slogLogger, Prefix: "/riverui", // mount the UI and its APIs under /riverui or another path // ... } handler, err := riverui.NewHandler(opts) if err != nil { log.Fatal(err) } // Start the handler to initialize background processes for caching and periodic queries: handler.Start(ctx) mux := http.NewServeMux() mux.Handle("/riverui/", handler) // ... start and run your HTTP server ``` -------------------------------- ### Example SQL for River Migration 3 (Up) Source: https://riverqueue.com/docs/migrations The raw SQL content for River migration version 3, applied in the 'up' direction. This example demonstrates altering and updating the 'tags' column in the 'river_job' table. ```sql -- River migration 003 [up] ALTER TABLE river_job ALTER COLUMN tags SET DEFAULT '{}'; UPDATE river_job SET tags = '{}' WHERE tags IS NULL; ALTER TABLE river_job ALTER COLUMN tags SET NOT NULL; ``` -------------------------------- ### Implement Batching for a Job Kind Source: https://riverqueue.com/docs/pro/batching Implement `BatchOpts()` on your job args to enable batching and configure how batches are formed. This example shows the basic structure for enabling batching and defining `Work` and `WorkMany` methods. ```go type MyBatchArgs struct{} func (MyBatchArgs) Kind() string { return "my_batch" } // Enable batching for this job kind. Customize options as needed. func (MyBatchArgs) BatchOpts() riverpro.BatchOpts { return riverpro.BatchOpts{} } type MyWorker struct { river.WorkerDefaults[MyBatchArgs] } func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyBatchArgs]) error { // Invoke the batch helper so this job can gather a batch and run WorkMany. return riverbatch.Work[MyBatchArgs, pgx.Tx](ctx, job, w, nil) } func (w *MyWorker) WorkMany(ctx context.Context, jobs []*river.Job[MyBatchArgs]) error { // Process the entire batch at once. // // Return nil to mark the entire batch as successful, a regular error to // fail the entire batch with the same error, or a MultiError to return // individualized errors per job. return nil } ``` -------------------------------- ### Get River Client from Context (database/sql) Source: https://riverqueue.com/docs/context-client When using the `database/sql` driver, the type parameter for `ClientFromContext` should be `*sql.Tx`. ```go client := river.ClientFromContext[*sql.Tx](ctx) ``` -------------------------------- ### Install riverlog Middleware Source: https://riverqueue.com/docs/job-logging Configure a River client with the `riverlog.Middleware` to enable job-persisted logging. The middleware accepts a function to initialize a `slog` logger. ```go riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Middleware: []rivertype.Middleware{ riverlog.NewMiddleware(func(w io.Writer) slog.Handler { return slog.NewJSONHandler(w, nil) }, nil), }, }) if err != nil { panic(err) } ``` -------------------------------- ### Install OpenTelemetry Middleware on River Client Source: https://riverqueue.com/docs/open-telemetry Install the OpenTelemetry middleware when creating a new River client. This middleware will run for all jobs inserted or worked by this client. ```go riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Middleware: []rivertype.Middleware{ // Install the OpenTelemetry middleware to run for all jobs inserted // or worked by this River client. otelriver.NewMiddleware(nil), }, }) ``` -------------------------------- ### Create an Insert-Only River Client Source: https://riverqueue.com/docs/insert-only-clients Initialize a River client for inserting jobs. Ensure the client is not started with `Start` and that `Queues` and `Workers` fields are omitted from its config. ```go insertOnlyClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{}) ``` -------------------------------- ### Initialize River Client with Encryption Hook Source: https://riverqueue.com/docs/pro/changelog Example of initializing a River client with a new encryption hook using NaCl Secretbox. A 32-byte key is generated for encryption. ```go var key [32]byte rand.Reader.Read(key[:]) riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{ Config: river.Config{ Hooks: []rivertype.Hook{ riverencrypt.NewEncryptHook(riversecretbox.NewEncryptor(key)), }, }, }) ``` -------------------------------- ### Initiate Workflow from Existing Job Source: https://riverqueue.com/docs/pro/workflows Use `Client.WorkflowFromExisting()` to get a workflow object based on an existing job. This is the first step to dynamically adding tasks to a workflow. ```go workflow, err := riverClient.WorkflowFromExisting(job.JobRow, nil) if err != nil { return err } ``` -------------------------------- ### Integrate River Migrations with Goose Go API Source: https://riverqueue.com/docs/migrations Set up River migrations within a Goose Go migration file. This example demonstrates initializing the River migrator with a standard library sql.DB and performing migrations. ```go package main import ( "context" "database/sql" "github.com/pressly/goose/v3" "github.com/riverqueue/river/riverdriver/riverdatabasesql" "github.com/riverqueue/river/rivermigrate" ) func init() { goose.AddMigrationNoTxContext(Up, Down) } func Up(ctx context.Context, db *sql.DB) error { migrator := rivermigrate.New(riverdatabasesql.New(db), nil) // Migrate up. An empty MigrateOpts will migrate all the way up, but // best practice is to specify a specific target version. _, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ TargetVersion: , }) return err } func Down(ctx context.Context, db *sql.DB) error { migrator := rivermigrate.New(riverdatabasesql.New(db), nil) // TargetVersion -1 removes River's schema completely. _, err := migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ TargetVersion: -1, }) return err } ``` -------------------------------- ### Connect to Postgres with pgx for Goose Source: https://riverqueue.com/docs/migrations Example of connecting to a PostgreSQL database using the pgx driver for use with the Goose migration framework. Ensure the database string is correctly formatted. ```go import _ "github.com/jackc/pgx/v5/stdlib" ... db, err := goose.OpenDBWithDriver("pgx", dbstring) if err != nil { // handle error } ``` -------------------------------- ### Get River Pro Client from Context Source: https://riverqueue.com/docs/context-client For River Pro customers, the `riverpro.Client` can be accessed from the context using `riverpro.ClientFromContext`. ```go riverproClient := riverpro.ClientFromContext[pgx.Tx](ctx) ``` -------------------------------- ### Define Initial Periodic Job Schedule Source: https://riverqueue.com/docs/pro/durable-periodic-jobs Sets up a periodic job with an initial schedule and a unique ID. This is the starting point before any modifications. ```go PeriodicJobs: []*river.PeriodicJob{ river.NewPeriodicJob( river.PeriodicInterval(365*24*time.Hour), ... &river.PeriodicJobOpts{ ID: "my_periodic_job", }, ), }, ``` -------------------------------- ### Configure Unique Job Properties Source: https://riverqueue.com/docs/unique-jobs Implement `JobArgsWithInsertOpts` and populate `UniqueOpts` to define uniqueness constraints for a job type. This example enforces uniqueness by arguments and a 24-hour period. ```go type ReconcileAccountArgs struct { AccountID int `json:"account_id"` } func (ReconcileAccountArgs) Kind() string { return "reconcile_account" } // InsertOpts returns custom insert options that every job of this type will // inherit, including unique options. func (ReconcileAccountArgs) InsertOpts() river.InsertOpts { return river.InsertOpts{ UniqueOpts: river.UniqueOpts{ ByArgs: true, ByPeriod: 24 * time.Hour, }, } } ... ``` -------------------------------- ### Initial Key Configuration for Encrypted Jobs Source: https://riverqueue.com/docs/pro/encrypted-jobs Configure the River client with the initial encryption key. This is the starting point for key rotation. ```go keyOld := mustDecodeBase64EncodedKey("fdnQ7+v/5Pb28rYqpynRSdzWfqs1gD6/J/0I9IUh65s=") _, err := riverpro.NewClient(riverpropgxv5.New(nil), &riverpro.Config{ Config: river.Config{ Hooks: []rivertype.Hook{ riverencrypt.NewEncryptHook(riversecretbox.NewEncryptor( keyOld, )), }, }, }) ``` -------------------------------- ### Fetch River Pro modules using .netrc Source: https://riverqueue.com/docs/pro/go-proxy Fetch River Pro modules after configuring credentials in ~/.netrc. This example uses GOPROXY without an inline secret. ```shell export GOPROXY=https://proxy.golang.org,https://riverqueue.com/goproxy,direct export GONOSUMDB=riverqueue.com/riverpro,$GONOSUMDB go get riverqueue.com/riverpro go get riverqueue.com/riverpro/driver/riverpropgxv5 ``` -------------------------------- ### Configure Go environment variables with go env Source: https://riverqueue.com/docs/pro/go-proxy Use `go env -w` to persistently configure GOPROXY and GONOSUMDB. This example shows configuration with and without an inline secret. ```shell go env -w GOPROXY=https://proxy.golang.org,https://riverqueue.com/goproxy,direct go env -w GONOSUMDB=riverqueue.com/riverpro ``` ```shell # or, with a secret inline instead of being read from `~/.netrc` go env -w GOPROXY=https://proxy.golang.org,https://river:river_secret_...@riverqueue.com/goproxy,direct ``` ```shell go get riverqueue.com/riverpro go get riverqueue.com/riverpro/driver/riverpropgxv5 ``` -------------------------------- ### Share a transaction between Bun and River Source: https://riverqueue.com/docs/bun Start a transaction using Bun's `BeginTx`, then use the embedded `*sql.Tx` with River's `InsertTx` to perform operations within the same transaction. Ensure to commit the transaction afterwards. ```go tx, err := bunDB.BeginTx(ctx, &sql.TxOptions{}) if err != nil { return nil, err } _, err = riverClient.InsertTx(ctx, tx.Tx, SortArgs{ // tx.Tx is *sql.Tx Strings: []string{ "whale", "tiger", "bear", }, }, nil) if err != nil { return nil, err } if err := tx.Commit(); err != nil { return nil, err } ``` -------------------------------- ### Initialize River Client with `riverpgxv5` Driver Source: https://riverqueue.com/docs/database-drivers Demonstrates how to initialize a River client using the `riverpgxv5` driver, which wraps a `pgxpool` database pool. Ensure the `DATABASE_URL` environment variable is set. ```go import "github.com/riverqueue/river" import "github.com/riverqueue/river/riverdriver/riverpgxv5" ... dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL")) if err != nil { panic(err) } deferr dbPool.Close() riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ ... }) if err != nil { panic(err) } ``` -------------------------------- ### Initialize River Client with libSQL Source: https://riverqueue.com/docs/sqlite Set up the database connection and River client using the libsql driver. Ensure the database pool is configured for single connection to manage concurrency. ```go dbPool, err := sql.Open("libsql", "file:./river.libsql") if err != nil { panic(err) } deferr dbPool.Close() dbPool.SetMaxOpenConns(1) workers := river.NewWorkers() river.AddWorker(workers, &SortWorker{}) riverClient, err := river.NewClient(riversqlite.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } ``` -------------------------------- ### Base64 Encoded Key Example Source: https://riverqueue.com/docs/pro/encrypted-jobs An example of a base64 encoded 32-byte encryption key. ```text encoded key: iRmwTuVGl2BAwTUPRTJbP/iA2EKpTrzXpEcNIXG2BI0= ``` -------------------------------- ### Initialize River Pro Client Source: https://riverqueue.com/docs/pro/getting-started Initialize a River Pro client using `riverpro.NewClient` with the `riverpropgxv5` driver and a database connection pool. Configure queues and workers as needed. ```go import ( ... "github.com/riverqueue/river" "river.com/riverpro" "river.com/riverpro/driver/riverpropgxv5" ) riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{ Config: river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, } }) if err != nil { // handle error } if err := riverClient.Start(ctx); err != nil { // handle error } ``` -------------------------------- ### Initialize River Client with Sequel Source: https://riverqueue.com/docs/ruby Connect to the database using Sequel and initialize the River client with the Sequel driver. ```ruby DB = Sequel.connect("postgres://...") client = River::Client.new(River::Driver::Sequel.new(DB)) ``` -------------------------------- ### Import necessary packages for Bun and River integration Source: https://riverqueue.com/docs/bun Import the pgx driver for database connection, River core library, and the River SQL database driver. ```go import ( _ "github.com/jackc/pgx/v5/stdlib" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverdatabasesql" ) ``` -------------------------------- ### Initialize SQLite Client with River Source: https://riverqueue.com/docs/sqlite Activate the SQLite driver by initializing a client with `riversqlite` instead of the default `riverpgxv5`. Ensure `modernc.org/sqlite` is imported. ```go import ( "database/sql" _ "modernc.org/sqlite" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riversqlite" ) ``` ```go dbPool, err := sql.Open("sqlite", "file:./river.sqlite3") if err != nil { panic(err) } deferr dbPool.Close() dbPool.SetMaxOpenConns(1) workers := river.NewWorkers() river.AddWorker(workers, &SortWorker{}) riverClient, err := river.NewClient(riversqlite.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { panic(err) } ``` -------------------------------- ### Add Periodic Job After Client Start Source: https://riverqueue.com/docs/periodic-jobs Dynamically add a periodic job using `riverClient.PeriodicJobs().Add()` after the client has started. This operation only has an effect if the client is the elected leader. ```go riverClient, err := river.NewClient(...) if err != nil { panic(err) } if err := riverClient.Start(ctx); err != nil { panic(err) } // Add a periodic job after client has already started. periodicJobHandle := riverClient.PeriodicJobs().Add( river.NewPeriodicJob( river.PeriodicInterval(15*time.Minute), func() (river.JobArgs, *river.InsertOpts) { return MyPeriodicJobArgs{}, nil }, nil, ), ) ``` -------------------------------- ### Install EncryptHook with NaCl Secretbox Source: https://riverqueue.com/docs/pro/encrypted-jobs Enable encryption by installing `riverencrypt.EncryptHook` on a River client. This hook tolerates jobs that are not encrypted, making it safe to enable without immediate migration. ```go import ( "encoding/base64" "riverqueue.com/riverpro/riverencrypt" "riverqueue.com/riverpro/riverencrypt/riversecretbox" ) decodedBytes, err := base64.StdEncoding.DecodeString( "iRmwTuVGl2BAwTUPRTJbP/iA2EKpTrzXpEcNIXG2BI0=", ) if err != nil { panic(err) } var key [32]byte if copy(key[:], decodedBytes) != 32 { panic("expected to copy exactly 32 bytes") } riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{ Config: river.Config{ Hooks: []rivertype.Hook{ riverencrypt.NewEncryptHook(riversecretbox.NewEncryptor(key)), }, }, }) ``` -------------------------------- ### Initialize River Client with ActiveRecord Source: https://riverqueue.com/docs/ruby Establish a database connection using ActiveRecord and initialize the River client with the ActiveRecord driver. ```ruby require "riverqueue" require "riverqueue-activerecord" ActiveRecord::Base.establish_connection("postgres://...") client = River::Client.new(River::Driver::ActiveRecord.new) ``` -------------------------------- ### Safely Get River Client from Context (pgx.Tx) Source: https://riverqueue.com/docs/context-client Use `ClientFromContextSafely` to get the River client without panicking if called on an invalid context. This version returns an error if the client cannot be retrieved. ```go func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyArgs]) error { client, err := river.ClientFromContextSafely[pgx.Tx](ctx) if err != nil { return fmt.Errorf("error getting client from context: %w", err) } ... } ``` -------------------------------- ### Run Pro Migrations from Go Source: https://riverqueue.com/docs/pro/migrations Initializes and runs Pro migrations from Go code using the `riverpropgxv5` driver and specifying the `pro` line in the configuration. Leaving the `Line` property empty defaults to the main River migration line. ```go migrator := rivermigrate.New(riverpropgxv5.New(dbPool), &rivermigrate.Config{ Line: "pro", }) res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{}) if err != nil { // handle error } ``` -------------------------------- ### Add Riverqueue Dependency with Rye Source: https://riverqueue.com/docs/python Install the riverqueue package using the Rye package manager. ```bash rye add riverqueue ``` -------------------------------- ### Initialize River Client with EncryptHook Source: https://riverqueue.com/docs/pro/encrypted-jobs Configure a River client to use `riverencrypt.NewEncryptHook` with `riversecretbox.NewEncryptor` for NaCl Secretbox encryption. ```go import ( "riverqueue.com/riverpro/riverencrypt" "riverqueue.com/riverpro/riverencrypt/riversecretbox" ) riverClient, err := riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{ Config: river.Config{ Hooks: []rivertype.Hook{ riverencrypt.NewEncryptHook(riversecretbox.NewEncryptor(key)), }, }, }) ``` -------------------------------- ### Custom Error Handler Implementation Source: https://riverqueue.com/docs/error-handling Example implementation of the ErrorHandler interface for custom error and panic handling. ```APIDOC ## Custom Error Handler Implementation ### Description This example shows a basic implementation of the `ErrorHandler` interface. It prints messages to the console when a job errors or panics. ### Methods - **HandleError(ctx context.Context, job *rivertype.JobRow, err error) *river.ErrorHandlerResult** - Prints the error message to standard output. - Returns `nil`, allowing the job to follow the retry schedule. - **HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any, trace string) *river.ErrorHandlerResult** - Prints the panic value and stack trace to standard output. - Returns `nil`, allowing the job to follow the retry schedule. ### Code Example ```go type CustomErrorHandler struct{} func (*CustomErrorHandler) HandleError(ctx context.Context, job *rivertype.JobRow, err error) *river.ErrorHandlerResult { fmt.Printf("Job errored with: %s\n", err) return nil } func (*CustomErrorHandler) HandlePanic(ctx context.Context, job *rivertype.JobRow, panicVal any, trace string) *river.ErrorHandlerResult { fmt.Printf("Job panicked with: %v\n", panicVal) fmt.Printf("Stack trace: %s\n", trace) return nil } ``` ``` -------------------------------- ### Define a Basic Workflow with Dependencies Source: https://riverqueue.com/docs/pro/workflows Use `Client.NewWorkflow()` to create a workflow and `Add()` to define tasks with dependencies. Jobs and arguments are defined like any other River job. ```go import ( "context" "riverqueue.com/river" "riverqueue.com/river/pgx" "riverqueue.com/riverpro" ) // MyJobArgs is a sample River JobArgs struct type MyJobArgs struct { // ... } func (MyJobArgs) Kind() string { return "my_job" } func SampleWorkflow(client *riverpro.Client[pgx.Tx]) *riverpro.Workflow[pgx.Tx] { // Create a new workflow: workflow := client.NewWorkflow(&riverpro.WorkflowOpts{Name: "My first workflow"}) // Add a first task to the workflow, named "a": taskA := workflow.Add("a", MyJobArgs{}, nil, nil) // Fan-out to tasks b1 and b2, which both depend on task a: taskB1 := workflow.Add("b1", MyJobArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{taskA.Name}}) taskB2 := workflow.Add("b2", MyJobArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{taskA.Name}}) // Fan-in to task c, which depends on both b1 and b2: taskC := workflow.Add("c", MyJobArgs{}, nil, &riverpro.WorkflowTaskOpts{Deps: []string{taskB1.Name, taskB2.Name}}) var _ = taskC // avoids "declared and not used" error return workflow } func main() { ctx := context.Background() // Assuming dbPool and workers are defined elsewhere var dbPool *pgx.Pool var workers []river.Worker riverClient, err := riverpro.NewClient(riverpgx.New(dbPool), &riverpro.Config{ Config: river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }, }) // Prepare the workflow for insertion and validate it: workflow := SampleWorkflow(riverClient) result, err := workflow.Prepare(ctx) if err != nil { panic(err) } // The result.Jobs field holds a slice of river.InsertManyOpts which can be // enqueued with a riverClient.InsertMany / InsertManyTx call: if _, err := riverClient.InsertMany(ctx, result.Jobs); err != nil { panic(err) } // continue execution, stop client, etc... } ``` -------------------------------- ### Implement SequenceOpts for Basic Sequencing Source: https://riverqueue.com/docs/pro/sequences Implement the SequenceOpts() interface on your JobArgs struct to enable basic sequencing. By default, all jobs of the same kind run in a single sequence. ```go type MyJobArgs struct { CustomerID string `json:"customer_id"` } func (MyJobArgs) Kind() string { return "my_job" } func (MyJobArgs) SequenceOpts() riverpro.SequenceOpts { // Use the default sequence partitioning based solely on the job kind. return riverpro.SequenceOpts{} } ``` -------------------------------- ### Assertion Failure Example Source: https://riverqueue.com/docs/testing This output shows a typical failure message from `RequireInsertedTx` when no jobs matching the expected kind are found. ```text --- FAIL: TestRequireInsertedTx (0.00s) --- FAIL: TestRequireInsertedTx/FailsWithoutInsert (0.12s) rivertest.go:352: River assertion failure: No jobs found with kind: required ``` -------------------------------- ### Run River Pro Migrations Source: https://riverqueue.com/docs/pro/getting-started Execute database migrations using the `riverpro` CLI. First, apply standard River migrations, then apply Pro-specific migration lines. ```bash # first, install the standard River migrations: riverpro migrate-up --database-url "$DATABASE_URL" ``` ```bash # then add the pro migration line: riverpro migrate-up --database-url "$DATABASE_URL" --line pro ``` -------------------------------- ### Initialize River Client with DataDog Integration Source: https://riverqueue.com/docs/open-telemetry Initialize a River client with the OpenTelemetry middleware, assuming the DataDog tracer provider has been globally configured. ```go riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{ Middleware: []rivertype.Middleware{ otelriver.NewMiddleware(nil), }, }) ``` -------------------------------- ### Share a transaction between GORM and River Source: https://riverqueue.com/docs/gorm Start a transaction in GORM, extract the underlying `*sql.Tx`, and use it with River's `InsertTx` for atomic operations. ```go tx := gormDB.Begin() if err := tx.Error; err != nil { return nil, err } // If in a transaction, ConnPool can be type asserted as an *sql.Tx so // operations from GORM and River occur on the same transaction. sqlTx := tx.Statement.ConnPool.(*sql.Tx) _, err = riverClient.InsertTx(ctx, sqlTx, SortArgs{ Strings: []string{ "whale", "tiger", "bear", }, }, nil) if err != nil { return nil, err } if err := tx.Commit().Error; err != nil { return nil, err } ``` -------------------------------- ### Enable Immediate Transactions in SQLite Source: https://riverqueue.com/docs/sqlite Configure the database connection to use `BEGIN IMMEDIATE` transactions by appending `?_txlock=immediate` to the database URL. This prevents SQLITE_BUSY errors during transaction upgrades. ```go dbPool, err := sql.Open("sqlite", "file:./river.sqlite3?_txlock=immediate") ``` -------------------------------- ### Inject Tracer Provider into Middleware Source: https://riverqueue.com/docs/open-telemetry Inject a specific OpenTelemetry tracer provider into the otelriver middleware configuration, useful for testing or custom setups. ```go provider := ddotel.NewTracerProvider() deferr func() { _ = provider.Shutdown() }() riverClient, err := river.NewClient(riverpgxv5.New(nil), &river.Config{ Middleware: []rivertype.Middleware{ otelriver.NewMiddleware(&otelriver.MiddlewareConfig{ TracerProvider: provider, }), }, }) ``` -------------------------------- ### Access Logger in Worker Context Source: https://riverqueue.com/docs/job-logging Once the middleware is installed, retrieve the logger within a worker's `Work` function using `riverlog.Logger(ctx)`. Logs are then accumulated and stored. ```go import ( "context" "log/slog" "github.com/riverqueue/river" ) ``` ```go type LoggingWorker struct { river.WorkerDefaults[LoggingArgs] } func (w *LoggingWorker) Work(ctx context.Context, job *river.Job[LoggingArgs]) error { riverlog.Logger(ctx).Info("Logged from worker") riverlog.Logger(ctx).Info("Another line logged from worker", slog.String("key", "value")) return nil } ``` -------------------------------- ### Login to River Pro Docker Registry and Run Image Source: https://riverqueue.com/docs/river-ui Logs into the riverqueue.com Docker registry using a River Pro license key and then pulls and runs the Pro variant of the River UI Docker image. ```bash # assuming RIVER_PRO_SECRET is set to a valid River Pro license key: echo $RIVER_PRO_SECRET | docker login riverqueue.com -u river --password-stdin docker pull riverqueue.com/riverproui:latest docker run -p 8080:8080 --env DATABASE_URL riverqueue.com/riverproui:latest ``` -------------------------------- ### Import necessary packages for GORM and River Source: https://riverqueue.com/docs/gorm Import the required packages for GORM, River, and the `riverdatabasesql` driver. ```go import ( "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverdatabasesql" "gorm.io/driver/postgres" "gorm.io/gorm" ) ``` -------------------------------- ### Prepare and Insert Workflow Source: https://riverqueue.com/docs/pro/workflows Prepare a workflow for insertion using Prepare() or PrepareTx(). These methods validate the dependency graph. The returned river.InsertManyParams can then be inserted into the database. ```go // client is a riverpro.Client: result, err := workflow.Prepare(ctx) if err != nil { return err } if _, err := client.InsertMany(ctx, result.Jobs); err != nil { return err } ``` -------------------------------- ### Fetch River Pro modules with secret Source: https://riverqueue.com/docs/pro/go-proxy Configure GOPROXY and GONOSUMDB to fetch River Pro modules, including setting a River Pro secret and installing the module and driver. ```shell export RIVER_PRO_SECRET=river_secret_... export GOPROXY=https://proxy.golang.org,https://river:$RIVER_PRO_SECRET@riverqueue.com/goproxy,direct export GONOSUMDB=riverqueue.com/riverpro,$GONOSUMDB go get riverqueue.com/riverpro go get riverqueue.com/riverpro/driver/riverpropgxv5 ``` ```shell # install riverpro CLI: go install riverqueue.com/riverpro/cmd/riverpro@latest ``` -------------------------------- ### Initialize Riverqueue Client with SQLAlchemy Source: https://riverqueue.com/docs/python Set up a Riverqueue client using a SQLAlchemy engine and the SQLALchemy driver. Ensure your PostgreSQL connection string is correctly formatted. ```python import riverqueue from riverqueue.driver import riversqlalchemy engine = sqlalchemy.create_engine("postgresql://...") client = riverqueue.Client(riversqlalchemy.Driver(engine)) ``` -------------------------------- ### Configure Multiple Queues with River Client Source: https://riverqueue.com/docs/multiple-queues Set up a River client with both the default queue and a custom 'high_priority' queue, each with a maximum of 100 workers. Includes basic logger configuration. ```go riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}), Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, "high_priority": {MaxWorkers: 100}, }, Workers: workers, }) ``` -------------------------------- ### Custom Worker Retry Delay Source: https://riverqueue.com/docs/job-retries Override the default retry behavior for a specific worker type by implementing the `NextRetry` method. This example sets a constant 10-second delay for all retries. ```go type ConstantRetryTimeWorker struct { river.WorkerDefaults[MyJobArgs] } func (w *ConstantRetryTimeWorker) Work(job *Job[MyJobArgs]) error { // ... } // NextRetry always schedules the next retry for 10 seconds from now. func (w *ConstantRetryTimeWorker) NextRetry(job *Job[MyJobArgs]) time.Time { return time.Now().Add(10*time.Second) } ``` -------------------------------- ### Prepare and Insert Workflow Tasks Source: https://riverqueue.com/docs/pro/workflows After adding tasks, prepare the workflow for insertion using `Prepare()` and then insert the new jobs using `InsertMany()`. This ensures the workflow is validated and updated. ```go result, err := workflow.Prepare(ctx) if err != nil { panic(err) } // Insert the new task(s): if _, err := riverClient.InsertMany(ctx, result.Jobs); err != nil { panic(err) } ``` -------------------------------- ### Custom Client Retry Policy with Linear Backoff Source: https://riverqueue.com/docs/job-retries Implement a custom `RetryPolicy` for the client to define a linear backoff strategy. This example delays retries by 5 seconds for each failure. ```go // LinearRetryPolicy delays subsequent retries by 5 seconds for each time // the job has failed (5s, 10s, 15s, etc.). type LinearRetryPolicy struct {} // NextRetry returns the next retry time based on the non-generic JobRow // which includes an up-to-date Errors list. func (policy *LinearRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time { // The latest error is not yet included in the job's Errors list, so we // add 1 to the length to account for that. return time.Now().Add((len(job.Errors) + 1) * 5 * time.Second) } client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ RetryPolicy: &LinearRetryPolicy{}, // ... }) ``` -------------------------------- ### Export All Migrations SQL (Down, Exclude V1) Source: https://riverqueue.com/docs/migrations Exports the raw SQL for all migrations, excluding version 1, in the 'down' direction. Useful for bootstrapping new projects with a custom migration setup. ```bash river migrate-get --line main --all --exclude-version 1 --down > river_all.down.sql ``` -------------------------------- ### Inserting with Ruby Hash Source: https://riverqueue.com/docs/ruby Demonstrates inserting jobs using a simple Ruby hash and `JobArgsHash` when a dedicated class is not needed. ```APIDOC ## Inserting with Ruby Hash ### Description Allows job insertion using a kind string and a JSON-serializable hash, bypassing the need to define a custom class for simple jobs. ### Method `client.insert(River::JobArgsHash.new(kind, hash_data))` ### Parameters - `kind` (String): A unique string identifying the job type. - `hash_data` (Hash): A hash containing the job arguments, which will be JSON serialized. ### Request Example ```ruby insert_res = client.insert(River::JobArgsHash.new("hash_kind", { job_num: 1 })) ``` ``` -------------------------------- ### Export All Migrations SQL (Up, Exclude V1) Source: https://riverqueue.com/docs/migrations Exports the raw SQL for all migrations, excluding version 1, in the 'up' direction. Useful for bootstrapping new projects with a custom migration setup. ```bash river migrate-get --line main --all --exclude-version 1 --up > river_all.up.sql ``` -------------------------------- ### Build and Run Custom Goose Binary Source: https://riverqueue.com/docs/migrations Commands to build a custom Goose migration binary and then run migrations against a PostgreSQL database using a provided connection string. ```bash go build -o goose-custom *.go ./goose-custom postgres "$DATABASE_URL" up ``` -------------------------------- ### Configure Default Queue with River Client Source: https://riverqueue.com/docs/multiple-queues Initialize a River client with a default queue and a maximum of 100 workers. Ensure proper error handling for client creation. ```go riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, Workers: workers, }) if err != nil { // handle error } ``` -------------------------------- ### Get Raw SQL for Sequence Migrations (CLI) Source: https://riverqueue.com/docs/pro/changelog Use the River Pro CLI to dump raw SQL for sequence migrations. This is an alternative to using River's internal migration system. ```bash go install riverqueue.com/riverpro/cmd/riverpro@latest riverpro migrate-get --version 2 --line sequence --up > river-sequence-v2.up.sql riverpro migrate-get --version 2 --line sequence --down > river-sequence-v2.down.sql ``` -------------------------------- ### Get River Client from Context (pgx.Tx) Source: https://riverqueue.com/docs/context-client Retrieve the River client for use within a worker job. The type parameter `pgx.Tx` should match the generic type parameter of the underlying `Client[pgx.Tx]`. ```go func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyArgs]) error { client := river.ClientFromContext[pgx.Tx](ctx) ... } ``` -------------------------------- ### Initialize Async Riverqueue Client Source: https://riverqueue.com/docs/python Set up an asynchronous Riverqueue client using SQLAlchemy's async engine and an async driver like `asyncpg`. This enables non-blocking I/O operations. ```python engine = sqlalchemy.ext.asyncio.create_async_engine("postgresql+asyncpg://...") client = riverqueue.AsyncClient(riversqlalchemy.AsyncDriver(engine)) insert_res = await client.insert( SortArgs(strings=["whale", "tiger", "bear"]), ) ``` -------------------------------- ### Get Raw SQL for River Pro Migrations (CLI) Source: https://riverqueue.com/docs/pro/changelog Use the River CLI to dump raw SQL for Pro database migrations. This is an alternative to using River's internal migration system. ```bash go install riverqueue.com/riverpro/cmd/riverpro@latest riverpro migrate-get --version 2 --line pro --up > river-pro-v2.up.sql riverpro migrate-get --version 2 --line pro --down > river-pro-v2.down.sql ```