### Install conc package

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Install the conc Go package with `go get`. This is the first step to using the library in a Go project.

```sh
go get github.com/sourcegraph/conc
```

--------------------------------

### Process Slice in Pool: stdlib vs conc

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Processes the elements of a slice concurrently with a pool of worker goroutines. `iter.ForEach` from the `conc/iter` package replaces the hand-written feeder channel, worker loop, and `sync.WaitGroup`. Its callback receives a pointer to each element.

```go
import "sync"

func handle(elem int) {}

func process(values []int) {
	feeder := make(chan int, 8)

	// Start a fixed number of workers that drain the feeder channel.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for elem := range feeder {
				handle(elem)
			}
		}()
	}

	for _, value := range values {
		feeder <- value
	}
	close(feeder)
	wg.Wait()
}
```

```go
import "github.com/sourcegraph/conc/iter"

// iter.ForEach passes a pointer to each element.
func handle(elem *int) {}

func process(values []int) {
	iter.ForEach(values, handle)
}
```

--------------------------------

### Spawn Goroutines: stdlib vs conc

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Compares the standard library's `sync.WaitGroup` with `conc.WaitGroup` for spawning a set of goroutines and waiting for them. In the `conc` version, `wg.Go()` replaces the `wg.Add(1)`, `go func()`, `defer wg.Done()` sequence.

```go
import "sync"

func doSomething() {}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// crashes on panic!
			doSomething()
		}()
	}
	wg.Wait()
}
```

```go
import "github.com/sourcegraph/conc"

func doSomething() {}

func main() {
	var wg conc.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}
```

--------------------------------

### Process Stream in Pool: stdlib vs conc

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Processes elements received from a channel with a fixed-size pool of goroutines. With `conc`, `pool.New().WithMaxGoroutines()` caps the concurrency and `p.Go()` submits each task.

```go
import "sync"

func handle(elem int) {}

func process(stream chan int) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for elem := range stream {
				handle(elem)
			}
		}()
	}
	wg.Wait()
}
```

```go
import "github.com/sourcegraph/conc/pool"

func handle(elem int) {}

func process(stream chan int) {
	p := pool.New().WithMaxGoroutines(10)
	for elem := range stream {
		elem := elem
		p.Go(func() {
			handle(elem)
		})
	}
	p.Wait()
}
```

--------------------------------

### Process Ordered Stream: stdlib vs conc

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Processes a stream concurrently while emitting results in input order. With `stream.New().WithMaxGoroutines()`, tasks run concurrently and the callbacks they return run serially in submission order, which removes the manual coordination between workers and the ordered reader.

```go
import "sync"

func mapStream(
	in chan int,
	out chan int,
	f func(int) int,
) {
	tasks := make(chan func())
	taskResults := make(chan chan int)

	// Worker goroutines
	var workerWg sync.WaitGroup
	for i := 0; i < 10; i++ {
		workerWg.Add(1)
		go func() {
			defer workerWg.Done()
			for task := range tasks {
				task()
			}
		}()
	}

	// Ordered reader goroutine
	var readerWg sync.WaitGroup
	readerWg.Add(1)
	go func() {
		defer readerWg.Done()
		for result := range taskResults {
			item := <-result
			out <- item
		}
	}()

	// Feed the workers with tasks
	for elem := range in {
		elem := elem // copy the loop variable (required before Go 1.22)
		resultCh := make(chan int, 1)
		taskResults <- resultCh
		tasks <- func() {
			resultCh <- f(elem)
		}
	}

	// We've exhausted input.
	// Wait for everything to finish
	close(tasks)
	workerWg.Wait()
	close(taskResults)
	readerWg.Wait()
}
```

```go
import "github.com/sourcegraph/conc/stream"

func mapStream(
	in chan int,
	out chan int,
	f func(int) int,
) {
	s := stream.New().WithMaxGoroutines(10)
	for elem := range in {
		elem := elem
		s.Go(func() stream.Callback {
			res := f(elem)
			return func() { out <- res }
		})
	}
	s.Wait()
}
```

--------------------------------

### Graceful Panic Handling with conc.WaitGroup

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Compares the standard library's manual panic-handling boilerplate with the conc equivalent. `conc.WaitGroup` catches panics in spawned goroutines and re-panics in the goroutine that calls `Wait()`, including the stack trace of the goroutine that panicked.

```go
import (
	"fmt"
	"runtime/debug"
)

type caughtPanicError struct {
	val   any
	stack []byte
}

func (e *caughtPanicError) Error() string {
	return fmt.Sprintf(
		"panic: %q\n%s",
		e.val,
		string(e.stack),
	)
}

func main() {
	done := make(chan error)
	go func() {
		defer func() {
			if v := recover(); v != nil {
				done <- &caughtPanicError{
					val:   v,
					stack: debug.Stack(),
				}
			} else {
				done <- nil
			}
		}()
		doSomethingThatMightPanic()
	}()
	err := <-done
	if err != nil {
		panic(err)
	}
}
```

```go
import "github.com/sourcegraph/conc"

func main() {
	var wg conc.WaitGroup
	wg.Go(doSomethingThatMightPanic)
	// panics with a nice stacktrace
	wg.Wait()
}
```

--------------------------------

### Concurrent Map Slice: stdlib vs conc

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Compares standard library code for concurrently mapping a slice with `iter.Map`. The `conc` version is a single call: it hides the goroutine management and the shared index counter, and its callback takes a pointer to each element.

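As a rough illustration of what such an abstraction hides, the sketch below wraps the worker loop and the shared index counter in a generic helper built only on the standard library. `mapConcurrent` and its `workers` parameter are hypothetical names chosen for this example; this is not conc's API or implementation. The callback takes a pointer to each element, as `iter.Map` callbacks do.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// mapConcurrent is a hypothetical helper, not part of conc. It applies f to
// every element of input using up to `workers` goroutines and returns the
// results in input order.
func mapConcurrent[T, R any](input []T, workers int, f func(*T) R) []R {
	res := make([]R, len(input))
	if workers < 1 {
		workers = 1
	}

	var next atomic.Int64 // index of the next unclaimed element
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				i := int(next.Add(1) - 1) // claim one index atomically
				if i >= len(input) {
					return
				}
				// Each index is claimed once, so no two goroutines
				// write to the same slot of res.
				res[i] = f(&input[i])
			}
		}()
	}
	wg.Wait()
	return res
}

func main() {
	words := []string{"go", "conc", "pool"}
	lengths := mapConcurrent(words, 4, func(s *string) int { return len(*s) })
	fmt.Println(lengths) // prints [2 4 4]
}
```

Because every result is written to the slot matching its input index, the output order does not depend on which goroutine finishes first.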
```go
import (
	"sync"
	"sync/atomic"
)

func concMap(
	input []int,
	f func(int) int,
) []int {
	res := make([]int, len(input))
	var idx atomic.Int64

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Claim the next unprocessed index.
				i := int(idx.Add(1) - 1)
				if i >= len(input) {
					return
				}
				res[i] = f(input[i])
			}
		}()
	}
	wg.Wait()
	return res
}
```

```go
import "github.com/sourcegraph/conc/iter"

func concMap(
	input []int,
	f func(*int) int,
) []int {
	return iter.Map(input, f)
}
```

--------------------------------

### Panic Handling

Source: https://github.com/sourcegraph/conc/blob/main/README.md

The `panics` package provides a `Catcher` utility to safely catch panics occurring in goroutines.

```APIDOC
panics.Catcher:
  A utility to catch panics in goroutines.

  Creation:
    var c panics.Catcher: The zero value is ready to use.

  Methods:
    Try(f func())
      Executes f and records the first panic it raises instead of letting it propagate.
    Recovered() *panics.Recovered
      Returns the recorded panic, or nil if no call to Try panicked.
    Repanic()
      Re-panics with the recorded panic, if there is one.

  Usage:
    var c panics.Catcher
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
      defer wg.Done()
      c.Try(func() { panic("something went wrong") })
    }()
    wg.Wait()
    if r := c.Recovered(); r != nil {
      fmt.Printf("Caught panic: %v\n", r.Value)
    }
```

--------------------------------

### Concurrent Iteration

Source: https://github.com/sourcegraph/conc/blob/main/README.md

The `iter` package offers utilities for performing operations concurrently over slices, such as mapping or iterating. Callbacks receive a pointer to each element.

```APIDOC
iter.Map:
  Concurrently maps a slice of input values to a slice of output values.

  Usage:
    input := []int{1, 2, 3, 4, 5}
    output := iter.Map(input, func(i *int) int { return *i * 2 })
    // output will be []int{2, 4, 6, 8, 10}
```

```APIDOC
iter.ForEach:
  Concurrently iterates over a slice, applying a function to each element.

  Usage:
    items := []string{"a", "b", "c"}
    iter.ForEach(items, func(item *string) { fmt.Println(*item) })
```

--------------------------------

### conc.WaitGroup

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Provides a safer alternative to Go's standard `sync.WaitGroup`, offering structured concurrency benefits for managing goroutines.

```APIDOC
conc.WaitGroup:
  A WaitGroup that spawns goroutines itself and guarantees they have all returned once Wait returns.
  It is not a drop-in replacement for sync.WaitGroup: there are no Add or Done methods, because Go manages the counter.

  Methods:
    Go(f func())
      Runs f in a new goroutine tracked by the WaitGroup.
    Wait()
      Blocks until all goroutines started with Go have returned.
      If any of them panicked, Wait propagates one of those panics to the caller.
    WaitAndRecover() *panics.Recovered
      Like Wait, but returns the caught panic (or nil) instead of re-panicking.

  Example:
    var wg conc.WaitGroup
    wg.Go(func() {
      // ... do work ...
    })
    wg.Wait()
```

--------------------------------

### Pools for Concurrency Management

Source: https://github.com/sourcegraph/conc/blob/main/README.md

The `pool` package offers several types of concurrency-limited task runners. They can be configured to limit goroutines, collect results, handle errors, and support context cancellation. Each configuration method returns the pool type that matches the new task signature, so calls are normally chained.

```APIDOC
pool.Pool:
  A concurrency-limited task runner.

  Creation:
    New(): Creates a new Pool with default settings.
    NewWithResults[T](): Creates a ResultPool[T] instead, which collects results of type T.

  Configuration Methods:
    WithMaxGoroutines(n int): Sets the maximum number of goroutines the pool can run concurrently.
    WithErrors(): Returns an ErrorPool, which runs tasks that return errors. Errors are collected.
    WithContext(ctx context.Context): Returns a ContextPool, for tasks that should be canceled on the first error.

  Usage:
    p := pool.New().WithMaxGoroutines(10)
    p.Go(func() { /* task */ })
    p.Wait()
```

```APIDOC
pool.ResultPool[T]:
  A concurrency-limited task runner that collects results of type T.

  Creation:
    NewWithResults[T](): Creates a new ResultPool for results of type T.

  Configuration Methods:
    WithMaxGoroutines(n int): Sets the maximum number of goroutines.
    WithErrors(): Returns a ResultErrorPool[T], whose tasks return (T, error).
    WithContext(ctx context.Context): Returns a ResultContextPool[T], whose tasks take a context and return (T, error).
    WithCollectErrored(): Available after WithErrors() or WithContext(ctx). Collects results even when a task errors.

  Usage:
    p := pool.NewWithResults[int]().WithMaxGoroutines(5)
    p.Go(func() int { return 42 })
    results := p.Wait()
    // results contains the collected return values.
```

```APIDOC
pool.ErrorPool:
  A concurrency-limited task runner for tasks that return errors.

  Creation:
    New().WithErrors(): Creates a new ErrorPool.
    NewWithResults[T]().WithErrors(): Creates a ResultErrorPool[T] that also collects results of type T.

  Configuration Methods:
    WithMaxGoroutines(n int): Sets the maximum number of goroutines.
    WithFirstError(): Configures the pool to only keep the first returned error, rather than aggregating all errors.
    WithContext(ctx context.Context): Returns a ContextPool, for tasks that should be canceled on the first error.

  Usage:
    p := pool.New().WithMaxGoroutines(5).WithErrors()
    p.Go(func() error { /* task that might error */ return nil })
    err := p.Wait()
    // err is nil if all tasks succeeded. Otherwise it combines the returned errors,
    // or holds only the first one if WithFirstError() was set.
```

```APIDOC
pool.ContextPool:
  A concurrency-limited task runner where tasks should be canceled on the first error.

  Creation:
    New().WithContext(ctx): Creates a new ContextPool.
    NewWithResults[T]().WithContext(ctx): Creates a ResultContextPool[T] that also collects results of type T.

  Configuration Methods:
    WithMaxGoroutines(n int): Sets the maximum number of goroutines.
    WithFirstError(): Configures the pool to only keep the first returned error.
    WithCancelOnError(): Cancels the pool's context as soon as a task returns an error.
    WithCollectErrored(): On ResultContextPool[T] only. Collects results even when a task errors.

  Usage:
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    p := pool.New().WithContext(ctx).WithMaxGoroutines(5)
    p.Go(func(ctx context.Context) error { /* task */ return nil })
    err := p.Wait()
```

--------------------------------

### Stream Processing

Source: https://github.com/sourcegraph/conc/blob/main/README.md

The `stream` package processes an ordered stream of tasks in parallel while running the callbacks that handle their results serially, in submission order.

```APIDOC
stream.Stream:
  Processes an ordered stream of tasks in parallel with serial callbacks.

  Creation:
    New(): Creates a new Stream.

  Configuration Methods:
    WithMaxGoroutines(n int): Sets the maximum number of goroutines to process tasks concurrently.

  Methods:
    Go(f stream.Task)
      Submits a task. A Task is a func() stream.Callback. Tasks run concurrently;
      the callbacks they return run one at a time, in the order the tasks were submitted.
    Wait()
      Blocks until all tasks and callbacks have finished.

  Usage:
    s := stream.New().WithMaxGoroutines(5)
    s.Go(func() stream.Callback {
      // process one item concurrently
      return func() { /* runs serially, in submission order */ }
    })
    s.Wait()
```

--------------------------------

### Scoped Goroutine with conc.WaitGroup

Source: https://github.com/sourcegraph/conc/blob/main/README.md

Shows how `conc.WaitGroup` gives goroutines an owner. The caller creates the WaitGroup, defers `Wait()`, and passes the WaitGroup to the code that spawns goroutines, so none of them can outlive the scope that started them. This prevents goroutine leaks as long as every WaitGroup is waited on before it goes out of scope.

```go
import "github.com/sourcegraph/conc"

func main() {
	var wg conc.WaitGroup
	defer wg.Wait()

	startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
	wg.Go(func() {
		// Goroutine logic here
	})
}
```
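
The `defer wg.Wait()` pattern, combined with forwarding panics to the waiting goroutine, can be approximated with the standard library alone. The sketch below is not conc's implementation; `scopedGroup`, its `Go` and `Wait` methods, and `runScoped` are hypothetical names used only for illustration. It records the first panic raised in a spawned goroutine and re-raises it in the goroutine that calls `Wait`, without the stack-trace enrichment that conc adds.

```go
package main

import (
	"fmt"
	"sync"
)

// scopedGroup is a hypothetical, simplified stand-in for conc.WaitGroup.
// It is a sketch only and does not mirror conc's actual implementation.
type scopedGroup struct {
	wg       sync.WaitGroup
	once     sync.Once
	panicVal any // first recovered panic value, nil if none
}

// Go runs f in a new goroutine and records the first panic it raises.
func (g *scopedGroup) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done() // runs after the recover below has stored the value
		defer func() {
			if v := recover(); v != nil {
				g.once.Do(func() { g.panicVal = v })
			}
		}()
		f()
	}()
}

// Wait blocks until every goroutine started with Go has returned, then
// re-raises the first recorded panic in the caller's goroutine.
func (g *scopedGroup) Wait() {
	g.wg.Wait()
	if g.panicVal != nil {
		panic(g.panicVal)
	}
}

// runScoped starts the given tasks, waits for all of them before returning,
// and reports a forwarded panic as an error instead of crashing the process.
func runScoped(tasks ...func()) (err error) {
	defer func() {
		if v := recover(); v != nil {
			err = fmt.Errorf("task panicked: %v", v)
		}
	}()

	var g scopedGroup
	defer g.Wait() // no goroutine outlives this function
	for _, t := range tasks {
		g.Go(t)
	}
	return nil
}

func main() {
	err := runScoped(
		func() {},
		func() { panic("boom") },
	)
	fmt.Println(err) // prints: task panicked: boom
}
```

Deferring `Wait` right after declaring the group keeps the wait on every return path, which is the property the scoped pattern relies on.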