### Start Redis Server and Navigate to Examples

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Commands to start a Redis server and navigate to the Redis examples directory.

```sh
$ redis-server
$ cd ./examples/redis
```

--------------------------------

### Start RabbitMQ Server and Navigate to Examples

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Commands to start a RabbitMQ server and navigate to the RabbitMQ examples directory.

```sh
$ rabbitmq-server
$ cd ./examples/rabbitmq
```

--------------------------------

### Implement Logging Middleware

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/app.md

Example of a middleware that logs task start and completion. It accesses the task name from the context.

```go
func loggingMiddleware(next celery.TaskF) celery.TaskF {
	return func(ctx context.Context, p *celery.TaskParam) error {
		taskName, _ := ctx.Value(celery.ContextKeyTaskName).(string)
		log.Printf("Starting task: %s", taskName)
		err := next(ctx, p)
		log.Printf("Task completed: %s, error: %v", taskName, err)
		return err
	}
}

app := celery.NewApp(
	celery.WithMiddlewares(loggingMiddleware),
)
```

--------------------------------

### Complete Gopher-Celery Example

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

A full example demonstrating how to create a Gopher-Celery app, register a task handler, send a task, and run the worker. This includes setting max workers and task protocol version.
```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	celery "github.com/marselester/gopher-celery"
)

func main() {
	// Create app with configuration
	app := celery.NewApp(
		celery.WithMaxWorkers(10),
		celery.WithTaskProtocol(celery.V2),
	)

	// Register task handler
	app.Register(
		"myproject.tasks.greet",
		"default",
		func(ctx context.Context, p *celery.TaskParam) error {
			p.NameArgs("name", "greeting")
			name := p.MustString("name")
			greeting := p.MustString("greeting")
			fmt.Printf("%s, %s!\n", greeting, name)
			return nil
		},
	)

	// Send a task
	if err := app.Delay("myproject.tasks.greet", "default", "Alice", "Hello"); err != nil {
		log.Fatal(err)
	}

	// Start worker
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()
	if err := app.Run(ctx); err != nil {
		log.Fatal(err)
	}
}
```

--------------------------------

### Example TaskF Implementation

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/types.md

Demonstrates how to implement a TaskF function, accessing context metadata and task parameters.

```go
var myTask celery.TaskF = func(ctx context.Context, p *celery.TaskParam) error {
	// Task metadata is carried in the context.
	name, _ := ctx.Value(celery.ContextKeyTaskName).(string)
	taskID, _ := ctx.Value(celery.ContextKeyTaskID).(string)

	p.NameArgs("user_id", "action")
	userID := p.MustInt("user_id")
	action := p.MustString("action")

	log.Printf("[%s] %s: processing user %d action %s", taskID, name, userID, action)
	return nil
}
```

--------------------------------

### Redis Connection Pool Configuration

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Example of running the Go producer and a Redis-specific Go example, likely demonstrating how to pass a connection pool to the broker when Redis is not on localhost.
```sh
$ go run ./producer/
{"err":null,"msg":"task was sent using protocol v2"}
{"err":null,"msg":"task was sent using protocol v1"}
$ go run ./redis/
```

--------------------------------

### RabbitMQ Fair Queue Processing Example

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-rabbitmq.md

Illustrates the round-robin queue order change after consumption. This example shows the state transitions of the queue list.

```text
Initial order: [queue-a, queue-b, queue-c]
Consume from queue-a
```

--------------------------------

### Create and Send AsyncParam

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/types.md

Example of creating an AsyncParam with arguments and expiration, then sending it via ApplyAsync.

```go
param := &celery.AsyncParam{
	Args:    []interface{}{1, 2, "three"},
	Kwargs:  map[string]interface{}{"key": "value", "flag": true},
	Expires: time.Now().Add(24 * time.Hour),
}
err := app.ApplyAsync("myproject.tasks.process", "important", param)
```

--------------------------------

### Basic Consumer Example

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

Shows how to set up a Celery app to consume tasks. It includes registering a task function and running the worker with context cancellation.

```go
app := celery.NewApp()
app.Register("myproject.tasks.process", "important", func(ctx context.Context, p *celery.TaskParam) error {
	p.NameArgs("arg1", "arg2")
	// Process...
	return nil
})

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop() // Stop listening for the signal once the worker exits
app.Run(ctx)
```

--------------------------------

### Implement Metrics Middleware

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/types.md

Example of a middleware that records task execution duration and success/failure status.
```go
func metricsMiddleware(next celery.TaskF) celery.TaskF {
	return func(ctx context.Context, p *celery.TaskParam) error {
		taskName, _ := ctx.Value(celery.ContextKeyTaskName).(string)
		start := time.Now()
		err := next(ctx, p)
		duration := time.Since(start).Seconds()
		recordMetric(taskName, duration, err != nil)
		return err
	}
}

app := celery.NewApp(
	celery.WithMiddlewares(metricsMiddleware),
)
```

--------------------------------

### Basic Producer Example

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

Demonstrates how to create a new Celery app and send a task using the Delay function. This is suitable for basic task dispatching.

```go
app := celery.NewApp()
err := app.Delay("myproject.tasks.process", "important", "arg1", "arg2")
```

--------------------------------

### Fair Queue Processing Example

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Demonstrates how fairness ensures both fast and slow queues receive processing time.

```text
Queues: [fast, slow]

Iteration 1: Check fast (has 10 msgs), consume 1 → [slow, fast]
Iteration 2: Check slow (has msgs), consume 1 → [fast, slow]
Iteration 3: Check fast, consume 1 → [slow, fast]
Iteration 4: Check slow, consume 1 → [fast, slow]
...
```

--------------------------------

### Receive and Process Tasks

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/00-START-HERE.txt

Register a task handler to receive and process tasks from a queue. This example shows how to access task arguments.
```go
app := celery.NewApp()
app.Register("myproject.tasks.process", "queue", func(ctx context.Context, p *celery.TaskParam) error {
	p.NameArgs("arg1", "arg2")
	fmt.Println(p.MustString("arg1"))
	return nil
})
app.Run(context.Background())
```

--------------------------------

### Registering a Custom Serializer

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Example of registering a ProtobufSerializer with specific MIME types.

```go
registry.Register(
	&ProtobufSerializer{},
	"application/x-protobuf",
	"binary",
)
// Adds: "application/x-protobuf" → *ProtobufSerializer
```

--------------------------------

### RabbitMQ Connection and Channel Setup

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Illustrates the RabbitMQ connection and channel structure, maintaining a single AMQP connection with per-queue channels for deliveries. Delivery channels are created on-demand.

```go
conn     *amqp.Connection
channel  *amqp.Channel
delivery map[string]<-chan amqp.Delivery // One per queue
```

--------------------------------

### Move2back Function Example

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Demonstrates the usage of the Move2back function. After consuming a message from a queue, that queue is moved to the end to allow other queues to be processed next.

```go
queues := []string{"a", "b", "c"}
broker.Move2back(queues, "a")
// Result: []string{"b", "c", "a"}
```

--------------------------------

### Create Celery App with Custom Options

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/configuration.md

Example of creating a new Celery app with several custom configuration options. This demonstrates passing multiple Option functions to NewApp().
```go
app := celery.NewApp(
	celery.WithLogger(logger),
	celery.WithMaxWorkers(500),
	celery.WithTaskProtocol(1),
	celery.WithBroker(customBroker),
)
```

--------------------------------

### Redis Queue Processing Example

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-goredis.md

Illustrates the fair queue processing mechanism using BRPOP command in go-redis. Queues are reordered after each pop to prevent starvation.

```text
Initial order: [queue-a, queue-b, queue-c]
Pop from queue-a → order becomes: [queue-b, queue-c, queue-a]
Pop from queue-c → order becomes: [queue-b, queue-a, queue-c]
```

--------------------------------

### Unfair Queue Processing Example

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Illustrates how a fast queue can starve a slow queue without fairness mechanisms.

```text
Queues: [fast, slow]

Iteration 1: Check fast (has 10 msgs), consume 1
Iteration 2: Check fast again (still has 9 msgs), consume 1
...
Iteration 10: Check fast again (still has 1 msg), consume it
Iteration 11: FINALLY check slow (has been waiting)
```

--------------------------------

### Custom Middleware Implementation

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

Example of how to define and apply custom middleware to wrap task functions. Middleware can be used for cross-cutting concerns before or after task execution.

```go
func myMiddleware(next celery.TaskF) celery.TaskF {
	return func(ctx context.Context, p *celery.TaskParam) error {
		// Before task
		err := next(ctx, p)
		// After task
		return err
	}
}

app := celery.NewApp(celery.WithMiddlewares(myMiddleware))
```

--------------------------------

### Prometheus Task Metrics Collection

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Demonstrates running Go producer and metrics exporter, then querying Prometheus metrics via curl. Shows example output for task duration and total tasks processed.
```sh
$ go run ./producer/
$ go run ./metrics/
$ curl http://0.0.0.0:8080/metrics
# HELP task_duration_seconds How long it took in seconds to process a task.
# TYPE task_duration_seconds histogram
task_duration_seconds_bucket{task="myproject.mytask",le="0.016"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.032"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.064"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.128"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.256"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="0.512"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="1.024"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="2.048"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="4.096"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="8.192"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="16.384"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="32.768"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="60"} 2
task_duration_seconds_bucket{task="myproject.mytask",le="+Inf"} 2
task_duration_seconds_sum{task="myproject.mytask"} 7.2802e-05
task_duration_seconds_count{task="myproject.mytask"} 2
# HELP tasks_total How many Celery tasks processed, partitioned by task name and error.
# TYPE tasks_total counter
tasks_total{error="false",task="myproject.mytask"} 2
```

--------------------------------

### Configure Logging with go-kit/log

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

Configure logging for the Gopher-Celery app using the WithLogger option. This example demonstrates creating a JSON logger that writes to standard error.
```go
import "github.com/go-kit/log"

logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
app := celery.NewApp(celery.WithLogger(logger))
```

--------------------------------

### Consume and Process Tasks (Consumer Pattern)

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/usage-patterns.md

Implement this pattern to consume and process tasks from a message queue. Register task handlers before starting the workers.

```go
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	celery "github.com/marselester/gopher-celery"
)

func main() {
	app := celery.NewApp()

	// Register a task handler
	app.Register(
		"myproject.apps.tasks.process_payment",
		"payments",
		func(ctx context.Context, p *celery.TaskParam) error {
			p.NameArgs("order_id", "amount")
			orderID := p.MustInt("order_id")
			amount := p.MustFloat("amount")
			fmt.Printf("Processing payment: order=%d amount=%.2f\n", orderID, amount)
			return nil
		},
	)

	// Start workers
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()
	if err := app.Run(ctx); err != nil {
		log.Printf("worker error: %v", err)
	}
}
```

--------------------------------

### V2 Example JSON Payload

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Provides an example of a v2 message envelope's JSON payload. This version includes detailed header information such as task ID, origin, and expiration.

```json
{
  "body": "W1td...",
  "content-type": "application/json",
  "content-encoding": "utf-8",
  "headers": {
    "lang": "go",
    "id": "uuid-1234",
    "root_id": "uuid-1234",
    "task": "myproject.tasks.process",
    "origin": "12345@myhost",
    "expires": "2024-01-02T15:30:00Z",
    "retries": 0
  },
  "properties": {...}
}
```

--------------------------------

### Create Default Redis Broker

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-redis.md

Creates a new Redis broker instance using default connection settings.
This is suitable for quick setup or when Redis is running on localhost.

```go
package redis

func NewBroker(options ...BrokerOption) *Broker
```

```go
import "github.com/marselester/gopher-celery/redis"

broker := redis.NewBroker()
app := celery.NewApp(
	celery.WithBroker(broker),
)
```

--------------------------------

### Logging Middleware for Task Execution

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/usage-patterns.md

Implement a middleware to add structured logging around task execution. This middleware logs task start, errors, and completion events.

```go
import (
	"context"

	"github.com/go-kit/log"
	celery "github.com/marselester/gopher-celery"
)

func loggingMiddleware(logger log.Logger) celery.Middleware {
	return func(next celery.TaskF) celery.TaskF {
		return func(ctx context.Context, p *celery.TaskParam) error {
			name, _ := ctx.Value(celery.ContextKeyTaskName).(string)
			taskID, _ := ctx.Value(celery.ContextKeyTaskID).(string)

			logger.Log("event", "task_start", "task", name, "id", taskID)
			err := next(ctx, p)
			if err != nil {
				logger.Log("event", "task_error", "task", name, "id", taskID, "error", err)
			} else {
				logger.Log("event", "task_done", "task", name, "id", taskID)
			}
			return err
		}
	}
}

// Usage:
app := celery.NewApp(
	celery.WithLogger(logger),
	celery.WithMiddlewares(loggingMiddleware(logger)),
)
```

--------------------------------

### MustInt: Safely Get Integer Task Parameter

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/task-param.md

Demonstrates retrieving an integer parameter using MustInt, showing how it can be sourced from either positional or keyword arguments.
```go
// Both invocations work with the same task.

// Positional argument:
p := celery.NewTaskParam(
	[]interface{}{42},
	map[string]interface{}{},
)
p.NameArgs("count")
count := p.MustInt("count") // Gets 42 from positional

// Or keyword argument (p and count are reassigned, not redeclared):
p = celery.NewTaskParam(
	[]interface{}{},
	map[string]interface{}{"count": 42},
)
p.NameArgs("count")
count = p.MustInt("count") // Gets 42 from kwargs
```

--------------------------------

### Configure New Celery App with Options

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/types.md

Demonstrates creating a new Celery application instance with various configuration options, such as logger, max workers, and broker.

```go
app := celery.NewApp(
	celery.WithLogger(logger),
	celery.WithMaxWorkers(100),
	celery.WithTaskProtocol(2),
	celery.WithBroker(customBroker),
)
```

--------------------------------

### V1 Example JSON Payload

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Illustrates the JSON structure for a v1 message envelope. This example shows how task data and delivery properties are represented in JSON format.

```json
{
  "body": "W3si...",
  "content-type": "application/json",
  "content-encoding": "utf-8",
  "headers": {},
  "properties": {
    "delivery_info": {
      "exchange": "important",
      "routing_key": "important"
    },
    "correlation_id": "uuid-1234",
    "reply_to": "uuid-5678",
    "body_encoding": "base64",
    "delivery_tag": "uuid-9012",
    "delivery_mode": 2,
    "priority": 0
  }
}
```

--------------------------------

### Reusable Buffer Management with sync.Pool

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Demonstrates how to create and use a sync.Pool for reusing bytes.Buffer instances to reduce GC pressure and improve performance during encoding/decoding.

```go
// Create pool
pool: sync.Pool{
	New: func() interface{} {
		return &bytes.Buffer{}
	},
},

// Use pool
buf := ser.pool.Get().(*bytes.Buffer)
buf.Reset() // Clear previous contents
// ... write to buf
ser.pool.Put(buf) // Return to pool
```

--------------------------------

### Get Float64 Parameter

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/task-param.md

Retrieves a parameter by name and casts it to a float64. Panics if the parameter is not found or if the value is not a float64.

```go
func (p *TaskParam) MustFloat(name string) float64
```

```go
func myTask(ctx context.Context, p *celery.TaskParam) error {
	p.NameArgs("price", "tax_rate")

	price := p.MustFloat("price")
	taxRate := p.MustFloat("tax_rate")

	total := price * (1.0 + taxRate)
	fmt.Println("Total:", total)
	return nil
}
```

--------------------------------

### JSONSerializer Struct Definition

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/protocol.md

Defines the JSONSerializer struct, which includes a buffer pool for encoding and a function to get the current time.

```go
type JSONSerializer struct {
	pool sync.Pool // Buffer pool for encoding
	now  func() time.Time
}
```

--------------------------------

### go-redis Broker Configuration with Authentication

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-goredis.md

Sets up a go-redis broker with authentication. Provide the correct password and DB number for your Redis instance.

```go
client := redis.NewClient(&redis.Options{
	Addr:     "redis-host:6379",
	Password: "your-password",
	DB:       0,
})

broker := goredis.NewBroker(goredis.WithClient(client))
```

--------------------------------

### Run Benchmarks and Save Output

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Run all benchmarks in the internal directory with memory allocation statistics (`-benchmem`) and save the output to a file for later comparison. The `-count=10` flag runs each benchmark ten times.

```sh
$ go test -bench=. -benchmem -count=10 ./internal/... | tee bench-new.txt
```

--------------------------------

### Create a new Celery App instance

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/app.md

Use NewApp to create a Celery application. Configure it using variable-length option functions. Defaults include Redis broker, JSON serializer, and protocol v2.

```go
import celery "github.com/marselester/gopher-celery"

app := celery.NewApp(
	celery.WithLogger(logger),
	celery.WithMaxWorkers(100),
	celery.WithTaskProtocol(2),
)
```

--------------------------------

### Python Task Definition

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

A simple Python function that can be called as a Celery task. This example shows a basic task that prints the sum of two arguments.

```python
def mytask(a, b):
    print(a + b)
```

--------------------------------

### NewBroker

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-rabbitmq.md

Creates a message broker instance backed by RabbitMQ. It connects to localhost with guest credentials by default and can be configured further using BrokerOption functions.

```APIDOC
## NewBroker (rabbitmq)

### Description
Creates a message broker backed by RabbitMQ using `github.com/rabbitmq/amqp091-go`. By default connects to `localhost` with guest credentials.
### Parameters
#### Options
- **options** (`...BrokerOption`) - Optional - Variable-length option functions to configure the broker

### Returns
- `*Broker` - Configured RabbitMQ broker
- `error` - On connection or channel creation failure

### Default connection
`amqp://guest:guest@localhost:5672/`

### Example
```go
import "github.com/marselester/gopher-celery/rabbitmq"

broker, err := rabbitmq.NewBroker()
if err != nil {
	log.Fatal(err)
}

app := celery.NewApp(
	celery.WithBroker(broker),
)
```
```

--------------------------------

### Using go-redis as a Broker

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/usage-patterns.md

This snippet shows how to configure Gopher-Celery to use go-redis as the message broker. Ensure you have go-redis installed and a Redis server running.

```go
import (
	"github.com/redis/go-redis/v9"

	celery "github.com/marselester/gopher-celery"
	"github.com/marselester/gopher-celery/goredis"
)

func main() {
	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "password",
	})

	broker := goredis.NewBroker(
		goredis.WithClient(client),
	)

	app := celery.NewApp(
		celery.WithBroker(broker),
	)
	// Use app...
}
```

--------------------------------

### WithClient

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-goredis.md

Provides a `BrokerOption` to supply a custom pre-configured go-redis client to the broker. This is useful for setting custom connection parameters, authentication, and database selection.

```APIDOC
## WithClient

### Description
Provides a custom Redis client. Use this to set custom connection parameters, authentication, database selection, or other go-redis configuration.
### Method Signature
`func WithClient(c *redis.Client) BrokerOption`

### Parameters
#### Client
- `c` (*redis.Client) - Configured go-redis client

### Example
```go
import (
	"github.com/redis/go-redis/v9"

	"github.com/marselester/gopher-celery/goredis"
)

client := redis.NewClient(&redis.Options{
	Addr:     "redis-host:6379",
	Password: "your-password",
	DB:       0,
})

broker := goredis.NewBroker(
	goredis.WithClient(client),
)
```
```

--------------------------------

### Register Task with Keyword Arguments

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/usage-patterns.md

Register a task that expects keyword arguments from the Python side. Use `p.Get()` to retrieve values by their keyword name.

```go
app.Register(
	"myproject.tasks.notify",
	"notifications",
	func(ctx context.Context, p *celery.TaskParam) error {
		// Get from kwargs (Python sends these as named args)
		userID, ok := p.Get("user_id")
		if !ok {
			return fmt.Errorf("user_id required")
		}

		message, ok := p.Get("message")
		if !ok {
			return fmt.Errorf("message required")
		}

		fmt.Printf("Notify user %v: %v\n", userID, message)
		return nil
	},
)

// Called from Python as:
// notify.delay(user_id=123, message="Hello")
```

--------------------------------

### MustBool: Safely Get Boolean Task Parameter

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/task-param.md

Use MustBool to retrieve a boolean parameter by name. It panics if the parameter is not found or if its value cannot be cast to a boolean.
```go
func (p *TaskParam) MustBool(name string) bool
```

```go
func myTask(ctx context.Context, p *celery.TaskParam) error {
	p.NameArgs("enabled", "admin")

	enabled := p.MustBool("enabled")
	isAdmin := p.MustBool("admin")

	if isAdmin && enabled {
		fmt.Println("Admin user enabled")
	}
	return nil
}
```

--------------------------------

### TaskParam.NameArgs

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/task-param.md

Assigns names to positional arguments for later access by name using `Get()` or `Must*()` methods. The order of names corresponds to the order of positional arguments.

```APIDOC
## TaskParam.NameArgs

### Description
Assigns names to positional arguments for later access by name using `Get()` or `Must*()` methods. The order of names corresponds to the order of positional arguments.

### Parameters
#### Path Parameters
- **name** (`...string`) - Description: Names corresponding to each positional argument in order

### Example
```go
func myTask(ctx context.Context, p *celery.TaskParam) error {
	p.NameArgs("first", "second", "third")

	// Now these work:
	a := p.MustString("first")
	b := p.MustInt("second")
	c := p.MustFloat("third")

	// Use the values so the example compiles.
	fmt.Println(a, b, c)
	return nil
}
```
```

--------------------------------

### WithClient

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-rabbitmq.md

Provides a pre-configured AMQP connection. Use this if you want to manage the connection externally or share it across brokers.

```APIDOC
## WithClient (rabbitmq)

### Description
Provides a pre-configured AMQP connection. Use this if you want to manage the connection externally or share it across brokers.
### Method Signature
```go
func WithClient(c *amqp.Connection) BrokerOption
```

### Parameters
#### Query Parameters
- **c** (*amqp.Connection) - Required - Existing AMQP connection

### Example
```go
import amqp "github.com/rabbitmq/amqp091-go"

conn, err := amqp.Dial("amqp://localhost:5672/")
if err != nil {
	log.Fatal(err)
}

broker, err := rabbitmq.NewBroker(
	rabbitmq.WithClient(conn),
)
```
```

--------------------------------

### Create New RabbitMQ Broker

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-rabbitmq.md

Creates a new message broker instance backed by RabbitMQ. It connects to localhost with default guest credentials unless otherwise specified by options.

```go
package rabbitmq

func NewBroker(options ...BrokerOption) (*Broker, error)
```

```go
import "github.com/marselester/gopher-celery/rabbitmq"

broker, err := rabbitmq.NewBroker()
if err != nil {
	log.Fatal(err)
}

app := celery.NewApp(
	celery.WithBroker(broker),
)
```

--------------------------------

### MustString: Get Task Parameter as String

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/task-param.md

Looks up a parameter by name and casts it to a string. Panics if the parameter is missing or not a string. The panic is logged by the worker and does not affect other tasks.

```go
func (p *TaskParam) MustString(name string) string
```

```go
func myTask(ctx context.Context, p *celery.TaskParam) error {
	p.NameArgs("username", "email")

	username := p.MustString("username")
	email := p.MustString("email")

	fmt.Printf("User: %s <%s>\n", username, email)
	return nil
}
```

--------------------------------

### Compare Benchmark Results Between Two Files

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Compare the benchmark results from two different runs (e.g., `bench-old.txt` and `bench-new.txt`) using `benchstat` to see performance differences.
```sh
$ benchstat bench-old.txt bench-new.txt
```

--------------------------------

### Get Task Parameter by Name

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/task-param.md

Retrieves a parameter by name, searching keyword arguments first, then named positional arguments. Returns the value and a boolean indicating if it was found.

```go
func (p *TaskParam) Get(name string) (v interface{}, ok bool)
```

```go
func myTask(ctx context.Context, p *celery.TaskParam) error {
	p.NameArgs("a", "b")

	// Try to get from kwargs first, then named args
	if val, ok := p.Get("a"); ok {
		fmt.Println("Got value:", val)
	} else {
		fmt.Println("Parameter 'a' not found")
	}
	return nil
}
```

--------------------------------

### Register Task with Mixed Arguments

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/usage-patterns.md

Register a task that can handle a mix of positional, named positional, and keyword arguments. Use `p.Args()` for positional arguments and `p.Get()` for named positional or keyword arguments.

```go
app.Register(
	"myproject.tasks.create_user",
	"accounts",
	func(ctx context.Context, p *celery.TaskParam) error {
		// Positional args
		args := p.Args()
		if len(args) > 0 {
			email := args[0].(string)
			fmt.Printf("Email from position: %s\n", email)
		}

		// Named positional args
		p.NameArgs("email", "name")
		if name, ok := p.Get("name"); ok {
			fmt.Printf("Name from position: %v\n", name)
		}

		// Keyword args
		if role, ok := p.Get("role"); ok {
			fmt.Printf("Role from kwargs: %v\n", role)
		}

		return nil
	},
)
```

--------------------------------

### Configure Redis Broker with Custom Client

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-goredis.md

Provides a custom go-redis client to the broker. This allows for specifying connection details like address, password, and database.
```go
func WithClient(c *redis.Client) BrokerOption
```

```go
import (
	"github.com/redis/go-redis/v9"

	"github.com/marselester/gopher-celery/goredis"
)

client := redis.NewClient(&redis.Options{
	Addr:     "redis-host:6379",
	Password: "your-password",
	DB:       0,
})

broker := goredis.NewBroker(
	goredis.WithClient(client),
)
```

--------------------------------

### Analyze Benchmark Results with Benchstat

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Summarize the benchmark results saved in a file (here `bench-old.txt`) using the `benchstat` tool. Given a single file, `benchstat` prints summary statistics for that run; these serve as the baseline when checking later runs for performance regressions or improvements.

```sh
$ benchstat bench-old.txt
```

--------------------------------

### Task Retries within Goroutine (Redis)

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Illustrates implementing task retries within the same goroutine when direct Celery task retries are not supported. Shows example output of failed attempts.

```sh
$ go run ./retry/
...
{"attempt":1,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:23.401191Z"}
{"attempt":2,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:28.337204Z"}
{"attempt":3,"err":"uh oh","msg":"request failed","ts":"2022-08-07T23:42:37.279873Z"}
```

--------------------------------

### Custom Broker Configuration (RabbitMQ)

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

Demonstrates how to configure the Celery app to use a custom broker, in this case, RabbitMQ, by providing a specific AMQP URI.

```go
// rabbitmq.NewBroker returns (*Broker, error), so the error must be handled.
broker, err := rabbitmq.NewBroker(
	rabbitmq.WithAmqpUri("amqp://user:pass@host/"),
)
if err != nil {
	log.Fatal(err)
}
app := celery.NewApp(celery.WithBroker(broker))
```

--------------------------------

### Get Integer Parameter with Automatic Float Conversion

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/task-param.md

Retrieves a parameter by name and casts it to an integer. It handles both native integers and JSON-decoded numbers (which typically arrive as float64).
Panics if the parameter is not found or if the type cannot be converted.

```go
func (p *TaskParam) MustInt(name string) int
```

```go
func myTask(ctx context.Context, p *celery.TaskParam) error {
	p.NameArgs("a", "b")

	a := p.MustInt("a") // Works even if decoded as float64 from JSON
	b := p.MustInt("b")

	fmt.Println("Sum:", a+b)
	return nil
}
```

--------------------------------

### Configure Redis Client with Connection Pooling (go-redis)

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Use `redis.NewClient` with `redis.Options` to configure internal connection pooling for go-redis. This maintains a minimum number of idle connections.

```go
client := redis.NewClient(&redis.Options{
	Addr:         "localhost:6379",
	PoolSize:     10,
	MinIdleConns: 5,
})
```

--------------------------------

### Send a Task

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/00-START-HERE.txt

Use this snippet to send a task to a specified queue with arguments. Ensure you have initialized the Celery app.

```go
app := celery.NewApp()
err := app.Delay("myproject.tasks.process", "queue", arg1, arg2)
```

--------------------------------

### Import Core and Broker Packages

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

Import the core Gopher Celery package and specific broker implementations like Redis, go-redis, or RabbitMQ.
```go
// Core
import celery "github.com/marselester/gopher-celery"

// Brokers
import "github.com/marselester/gopher-celery/redis"    // gomodule/redigo
import "github.com/marselester/gopher-celery/goredis"  // redis/go-redis
import "github.com/marselester/gopher-celery/rabbitmq" // rabbitmq/amqp091-go

// Protocol (internal, rarely needed)
import "github.com/marselester/gopher-celery/protocol"
```

--------------------------------

### Create New Serializer Registry

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/protocol.md

Initializes a new registry with the JSON serializer pre-registered for common MIME types. Use this to manage message serialization and deserialization.

```go
func NewSerializerRegistry() *SerializerRegistry
```

```go
registry := protocol.NewSerializerRegistry()
```

--------------------------------

### Celery App Creation

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/INDEX.md

Creates a new Celery application instance with optional configurations.

```APIDOC
## NewApp

### Description
Creates a new Celery application instance.

### Signature
`NewApp(options ...Option) *App`

### Parameters
- `options` (...Option) - Variadic list of configuration options.
```

--------------------------------

### Import Gopher Celery Brokers

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/INDEX.md

Import message broker implementations for Gopher Celery. Choose the import that matches your broker.

```go
// Brokers
import "github.com/marselester/gopher-celery/redis"    // Redis (redigo)
import "github.com/marselester/gopher-celery/goredis"  // Redis (go-redis)
import "github.com/marselester/gopher-celery/rabbitmq" // RabbitMQ
```

--------------------------------

### NewBroker

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-redis.md

Creates a message broker backed by Redis, using github.com/gomodule/redigo for connections. By default connects to localhost:6379.
```APIDOC
## NewBroker

### Description
Creates a message broker backed by Redis, using `github.com/gomodule/redigo` for connections. By default connects to `localhost:6379`.

### Signature
```go
func NewBroker(options ...BrokerOption) *Broker
```

### Parameters
#### Options
- **options** (`...BrokerOption`) - Variable-length option functions to configure the broker

### Returns
- `*Broker` - configured Redis broker

### Default connection
`redis://localhost` (uses redigo's default port 6379)

### Example
```go
import (
	celery "github.com/marselester/gopher-celery"
	"github.com/marselester/gopher-celery/redis"
)

broker := redis.NewBroker()
app := celery.NewApp(
	celery.WithBroker(broker),
)
```
```

--------------------------------

### Middleware Execution Order

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/types.md

Illustrates that middleware functions are composed in reverse order of provision, with the first provided middleware acting as the outermost wrapper.

```go
// With chain [mw1, mw2, mw3]:
// Execution order: mw1 → mw2 → mw3 → actual task
```

--------------------------------

### Configure Redis Broker with Connection Pooling and Receive Timeout

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-redis.md

Sets up a Redis connection pool with a read timeout larger than the receive timeout to prevent i/o errors. Initializes a Redis broker with the configured pool and receive timeout.

```go
// redis is github.com/gomodule/redigo/redis; celeryredis is an import alias
// for github.com/marselester/gopher-celery/redis, needed because both
// packages are named redis.
pool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		return redis.Dial(
			"tcp",
			"localhost:6379",
			redis.DialReadTimeout(15*time.Second),
		)
	},
}

broker := celeryredis.NewBroker(
	celeryredis.WithPool(pool),
	celeryredis.WithReceiveTimeout(10*time.Second),
)
```

--------------------------------

### Observe RabbitMQ Queues

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-rabbitmq.md

Declares and observes queues for incoming tasks. Creates queues if they don't exist. Use this to set up which queues the broker should listen to.
```go
func (br *Broker) Observe(queues []string) error
```

```go
err := broker.Observe([]string{"important", "background"})
```

--------------------------------

### Consumer with Metrics Middleware

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

Illustrates how to integrate custom middleware, specifically for metrics, when creating a Celery app. This allows for monitoring task execution.

```go
app := celery.NewApp(
	celery.WithMiddlewares(metricsMiddleware),
)
```

--------------------------------

### Configure Custom Message Broker

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/configuration.md

Use WithBroker to provide a custom message broker implementation. This allows integration with various message queue systems like Redis or RabbitMQ, or any custom transport satisfying the Broker interface.

```go
import (
	"log"

	celery "github.com/marselester/gopher-celery"
	"github.com/marselester/gopher-celery/rabbitmq"
)

broker, err := rabbitmq.NewBroker(
	rabbitmq.WithAmqpUri("amqp://user:pass@rabbitmq:5672/"),
)
if err != nil {
	log.Fatal(err)
}

app := celery.NewApp(celery.WithBroker(broker))
```

--------------------------------

### Create TaskParam Instance

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/task-param.md

Use NewTaskParam to create a TaskParam accessor. It accepts positional and keyword arguments for flexible task parameter handling.

```go
p := celery.NewTaskParam(
	[]interface{}{1, 2, 3},
	map[string]interface{}{"key": "value"},
)
```

--------------------------------

### Provide Pre-configured AMQP Connection

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-rabbitmq.md

Provides a pre-configured AMQP connection to the RabbitMQ broker. Use this if you want to manage the connection externally or share it across brokers.
```go
import amqp "github.com/rabbitmq/amqp091-go"

conn, err := amqp.Dial("amqp://localhost:5672/")
if err != nil {
	log.Fatal(err)
}

broker, err := rabbitmq.NewBroker(
	rabbitmq.WithClient(conn),
)
```

--------------------------------

### Buffer Pool Initialization

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/internals.md

Initializes a sync.Pool for reusing bytes.Buffer instances to minimize memory allocations during JSON encoding.

```go
pool sync.Pool // Initialized with bytes.Buffer
```

--------------------------------

### Configure RabbitMQ Broker with AMQP URI

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-rabbitmq.md

Sets a custom AMQP connection URI for the RabbitMQ broker. This allows for specifying host, port, credentials, and vhost.

```go
func WithAmqpUri(amqpUri string) BrokerOption
```

```go
broker, err := rabbitmq.NewBroker(
	rabbitmq.WithAmqpUri("amqp://user:pass@rabbitmq:5672/"),
)
if err != nil {
	log.Fatal(err)
}
```

--------------------------------

### Basic go-redis Broker Configuration

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-goredis.md

Configures a go-redis broker with a new redis client. Ensure the redis server is accessible at localhost:6379.

```go
import (
	"github.com/redis/go-redis/v9"
	"github.com/marselester/gopher-celery/goredis"
)

client := redis.NewClient(&redis.Options{
	Addr: "localhost:6379",
})

broker := goredis.NewBroker(
	goredis.WithClient(client),
)
```

--------------------------------

### Send Task with Positional Arguments using App.Delay

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/app.md

Use App.Delay for a quick way to send a task with only positional arguments. Ensure the task path and queue are correctly specified.
```go
func (a *App) Delay(path, queue string, args ...interface{}) error
```

```go
err := app.Delay(
	"myproject.apps.myapp.tasks.mytask",
	"important",
	2,
	3,
)
if err != nil {
	log.Printf("failed to send task: %v", err)
}
```

--------------------------------

### Accessing Task Metadata from Context

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/README.md

Demonstrates how to retrieve task-specific metadata, such as the task name and ID, from the context within a task's execution.

```go
taskName, _ := ctx.Value(celery.ContextKeyTaskName).(string)
taskID, _ := ctx.Value(celery.ContextKeyTaskID).(string)
```

--------------------------------

### NewBroker

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-goredis.md

Creates a new message broker instance backed by Redis using the go-redis library. It connects with default settings unless broker options supply a custom configuration.

```APIDOC
## NewBroker

### Description
Creates a message broker backed by Redis using `github.com/redis/go-redis` for connections. By default connects to `localhost:6379` with no authentication.

### Method Signature
`func NewBroker(options ...BrokerOption) *Broker`

### Parameters
#### Options
- `options` (...BrokerOption) - Variable-length option functions to configure the broker

### Returns
- `*Broker` - Configured Redis broker

### Default Connection
Uses `redis.NewClient()` with default options (localhost:6379).

### Example
```go
import (
	celery "github.com/marselester/gopher-celery"
	"github.com/marselester/gopher-celery/goredis"
)

broker := goredis.NewBroker()
app := celery.NewApp(
	celery.WithBroker(broker),
)
```
```

--------------------------------

### Go to Python Task Communication (Redis)

Source: https://github.com/marselester/gopher-celery/blob/main/README.md

Demonstrates sending tasks from Go to a Python Celery worker using Redis. Includes commands to run the Go producer and the Celery worker, showing expected output.
```sh
$ go run ./producer/
{"err":null,"msg":"task was sent using protocol v2"}
{"err":null,"msg":"task was sent using protocol v1"}
$ celery --app myproject worker --queues important --loglevel=debug --without-heartbeat --without-mingle
...
[... WARNING/ForkPoolWorker-1] received a=fizz b=bazz
[... WARNING/ForkPoolWorker-8] received a=fizz b=bazz
```

--------------------------------

### Configure Redis Broker with Redigo Connection Pooling

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-redis.md

Initializes a Redis broker using Redigo's redis.Pool for connection management. Configures maximum idle and active connections, and sets whether `Get` waits for a free connection once the pool has reached its `MaxActive` limit.

```go
// redis is github.com/gomodule/redigo/redis; celeryredis is an import alias
// for github.com/marselester/gopher-celery/redis, needed because both
// packages are named redis.
pool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		return redis.DialURL("redis://localhost")
	},
	MaxIdle:   10,
	MaxActive: 20,
	Wait:      false,
}

broker := celeryredis.NewBroker(celeryredis.WithPool(pool))
```

--------------------------------

### Gopher-Celery Configuration Structure

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/configuration.md

The internal Config structure holds application settings. Use Option functions to configure the application instead of accessing this structure directly.

```go
type Config struct {
	logger     log.Logger
	broker     Broker
	registry   *protocol.SerializerRegistry
	mime       string
	protocol   int
	maxWorkers int
	chain      Middleware
}
```

--------------------------------

### go-redis Broker Configuration with Connection Pooling

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/brokers-goredis.md

Configures a go-redis broker with internal connection pooling settings. Adjust PoolSize, MinIdleConns, and MaxRetries for optimal performance.
```go
client := redis.NewClient(&redis.Options{
	Addr:         "localhost:6379",
	PoolSize:     10, // Connections to keep in pool
	MinIdleConns: 5,  // Minimum idle connections
	MaxRetries:   3,  // Retry failed commands
})

broker := goredis.NewBroker(goredis.WithClient(client))
```

--------------------------------

### Go-Redis Broker Options

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/INDEX.md

Configuration options for the go-redis broker.

```APIDOC
## Go-Redis Broker Options

### Description
Functions that modify the configuration of the go-redis broker during creation.

### Options
- `WithClient(c *redis.Client) BrokerOption`: Sets a custom go-redis client.
- `WithReceiveTimeout(timeout time.Duration) BrokerOption`: Sets the timeout for receiving messages.
```

--------------------------------

### Configure Custom Redis Broker with Connection Pool in Go

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/usage-patterns.md

Set up a Redis broker using a custom connection pool for fine-grained control over Redis connections. This is useful for managing connection parameters like passwords and timeouts.

```go
import (
	"time"

	"github.com/gomodule/redigo/redis"
	celery "github.com/marselester/gopher-celery"
	// Aliased because this package is also named redis.
	celeryredis "github.com/marselester/gopher-celery/redis"
)

func main() {
	// Create custom connection pool
	pool := &redis.Pool{
		Dial: func() (redis.Conn, error) {
			return redis.Dial(
				"tcp",
				"redis-host:6379",
				redis.DialPassword("your-password"),
				redis.DialDatabase(0),
			)
		},
		MaxIdle:   10,
		MaxActive: 20,
	}

	// Create broker with pool
	broker := celeryredis.NewBroker(
		celeryredis.WithPool(pool),
		celeryredis.WithReceiveTimeout(10*time.Second),
	)

	app := celery.NewApp(
		celery.WithBroker(broker),
	)

	// Use app...
}
```

--------------------------------

### NewApp

Source: https://github.com/marselester/gopher-celery/blob/main/_autodocs/app.md

Creates a Celery application instance. This app is responsible for managing task registration, serialization, and worker coordination.
It can be configured using various option functions.

```APIDOC
## NewApp

### Description
Creates a Celery application for producing or consuming asynchronous tasks. The app manages task registration, message encoding/decoding, and worker coordination.

### Method
`NewApp`

### Parameters
#### Options
- **options** (`...Option`) - Optional - Variable-length option functions to configure the app.

### Returns
- **`*App`** - Configured Celery application.

### Defaults
Redis broker on localhost, JSON serializer, protocol v2, 1000 max workers.

### Example
```go
import celery "github.com/marselester/gopher-celery"

app := celery.NewApp(
	celery.WithLogger(logger),
	celery.WithMaxWorkers(100),
	celery.WithTaskProtocol(2),
)
```
```
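
The option-function style that `NewApp` accepts can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: `appConfig`, `option`, `withMaxWorkers`, `withTaskProtocol`, and `newConfig` are stand-ins written for this example and are not gopher-celery types or functions. Only the two default values (1000 max workers, protocol v2) are taken from the defaults listed above.

```go
package main

import "fmt"

// appConfig is a hypothetical stand-in for an app's internal settings.
// It is not the gopher-celery Config type.
type appConfig struct {
	maxWorkers int
	protocol   int
}

// option mutates an appConfig. It mirrors the general shape of an
// option function, not the library's actual Option definition.
type option func(*appConfig)

// withMaxWorkers is an illustrative option that overrides the worker limit.
func withMaxWorkers(n int) option {
	return func(c *appConfig) { c.maxWorkers = n }
}

// withTaskProtocol is an illustrative option that overrides the protocol version.
func withTaskProtocol(v int) option {
	return func(c *appConfig) { c.protocol = v }
}

// newConfig starts from default values and applies the options in the
// order they were passed, so later options win over earlier ones.
func newConfig(opts ...option) appConfig {
	c := appConfig{maxWorkers: 1000, protocol: 2}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	// No options: the defaults are kept.
	fmt.Printf("%+v\n", newConfig())
	// Two options: both defaults are overridden.
	fmt.Printf("%+v\n", newConfig(withMaxWorkers(100), withTaskProtocol(1)))
}
```

Because each option is an ordinary function applied in sequence, callers who pass nothing get the defaults, and new settings can be added later without changing the constructor's signature.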