Try Live
Add Docs
Rankings
Pricing
Enterprise
Docs
Install
Install
Docs
Pricing
Enterprise
More...
More...
Try Live
Rankings
Add Docs
Task Runner Launcher
https://github.com/n8n-io/task-runner-launcher
Admin
A CLI utility that launches n8n task runners in external mode on-demand, minimizing resource use
...
Tokens:
7,823
Snippets:
66
Trust Score:
9.7
Update:
4 days ago
Context
Skills
Chat
Benchmark
87
Suggestions
Latest
Show doc for...
Code
Info
Show Results
Context Summary (auto-generated)
Raw
Copy
Link
# Task Runner Launcher The Task Runner Launcher is a CLI utility for [n8n](https://n8n.io) that launches task runners in `external` mode on demand. Rather than keeping a runner process alive continuously, the launcher impersonates a task runner by registering with n8n's task broker and sending a standing, non-expiring task offer. Only when a task actually needs to be executed does the launcher spawn the real runner process as a child, then step aside. This design minimizes idle resource consumption while keeping task execution latency low. The launcher handles the full lifecycle of external task runners: it waits for the n8n task broker to become ready, authenticates via a shared auth token to obtain single-use grant tokens, performs a WebSocket handshake to claim work, spawns the runner process with a carefully filtered environment, monitors runner health via HTTP polling, and automatically restarts the cycle after the runner exits on idle timeout or crashes. Multiple runner types (e.g., `javascript`, `python`) can be managed concurrently, each in its own goroutine. --- ## CLI Entry Point The binary accepts one or more runner type names as positional arguments. Each runner type is managed independently and concurrently. Configuration is loaded from environment variables and a JSON config file. ```sh # Single runner type ./task-runner-launcher javascript # Multiple runner types managed concurrently ./task-runner-launcher javascript python # Expected startup output: # 2024/11/29 13:37:46 INFO [launcher:js] Starting launcher goroutine... # 2024/11/29 13:37:46 DEBUG [launcher:js] Changed into working directory # 2024/11/29 13:37:46 DEBUG [launcher:js] Prepared env vars for runner # 2024/11/29 13:37:46 INFO [launcher:js] Waiting for task broker to be ready... # 2024/11/29 13:37:46 DEBUG [launcher:js] Task broker is ready # 2024/11/29 13:37:46 DEBUG [launcher:js] Fetched grant token for launcher # 2024/11/29 13:37:46 DEBUG [launcher:js] Launcher ID: fc6c24b9f764ae55 # 2024/11/29 13:37:46 DEBUG [launcher:js] Connected: ws://127.0.0.1:5679/runners/_ws?id=fc6c24b9f764ae55 # 2024/11/29 13:37:46 INFO [launcher:js] Waiting for launcher's task offer to be accepted... ``` --- ## Configuration: `LoadLauncherConfig` Loads the full launcher configuration by reading base settings from environment variables and runner-specific settings from the JSON config file at `N8N_RUNNERS_CONFIG_PATH` (default: `/etc/n8n-task-runners.json`). Validates all values, including URL formats, port ranges, and timeout integers, and returns aggregated errors if any are invalid. ```go // Environment variables consumed by LoadLauncherConfig: // N8N_RUNNERS_AUTH_TOKEN (required) shared secret with n8n // N8N_RUNNERS_TASK_BROKER_URI (default: http://127.0.0.1:5679) // N8N_RUNNERS_LAUNCHER_LOG_LEVEL (default: info) // N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT (default: 15, seconds) // N8N_RUNNERS_TASK_TIMEOUT (default: 60, seconds) // N8N_RUNNERS_LAUNCHER_HEALTH_CHECK_PORT (default: 5680) // N8N_RUNNERS_CONFIG_PATH (default: /etc/n8n-task-runners.json) // SENTRY_DSN (optional, enables Sentry error reporting) // // Any env var can also be set via a file by appending _FILE, e.g.: // N8N_RUNNERS_AUTH_TOKEN_FILE=/run/secrets/auth-token launcherConfig, err := config.LoadLauncherConfig( []string{"javascript"}, envconfig.OsLookuper(), ) if err != nil { log.Fatalf("Failed to load config: %v", err) } // launcherConfig.BaseConfig.TaskBrokerURI => "http://127.0.0.1:5679" // launcherConfig.RunnerConfigs["javascript"].Command => "node" // launcherConfig.RunnerConfigs["javascript"].Args => ["/usr/local/lib/node_modules/n8n/node_modules/@n8n/task-runner/dist/start.js"] ``` --- ## Config File Format The runner config file defines per-runner-type settings: the working directory, command, arguments, health check port, environment variable allowlist, and environment overrides. ```json // /etc/n8n-task-runners.json { "task-runners": [ { "runner-type": "javascript", "workdir": "/usr/local/lib/node_modules/n8n", "command": "node", "args": [ "/usr/local/lib/node_modules/n8n/node_modules/@n8n/task-runner/dist/start.js" ], "health-check-server-port": "5681", "allowed-env": [ "N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT", "N8N_RUNNERS_TASK_TIMEOUT", "NODE_OPTIONS", "NODE_FUNCTION_ALLOW_BUILTIN", "NODE_FUNCTION_ALLOW_EXTERNAL" ], "env-overrides": { "GENERIC_TIMEZONE": "UTC" } }, { "runner-type": "python", "workdir": "/usr/local/lib/python-runner", "command": "python3", "args": ["-m", "n8n_task_runner"], "health-check-server-port": "5682", "allowed-env": ["PYTHONPATH"], "env-overrides": {} } ] } ``` --- ## `LaunchCommand.Execute` The core execution loop for a single runner type. Changes into the runner's working directory, prepares its environment, then enters an infinite loop: wait for broker readiness, fetch a launcher grant token, perform the WebSocket handshake (blocking until a task is claimed), fetch a runner grant token, spawn the runner process, monitor its health, and restart after the runner exits. ```go logger := logs.NewLogger(logs.ParseLevel("debug"), logs.GetLauncherPrefix("javascript")) cmd := commands.NewLaunchCommand(logger) if err := cmd.Execute(launcherConfig, "javascript"); err != nil { // Only fatal, non-recoverable errors are returned. // Transient broker downtime and runner crashes are handled internally. logger.Errorf("Fatal error in launcher: %v", err) os.Exit(1) } // Internal loop behavior: // 1. http.CheckUntilBrokerReady -> polls GET {brokerURI}/healthz forever // 2. http.FetchGrantToken -> POST {brokerURI}/runners/auth (with retries) // 3. ws.Handshake -> WS connect, register, offer, wait for accept // 4. http.FetchGrantToken -> second grant token for the actual runner // 5. exec.Command(...) -> spawns runner with filtered env // 6. http.ManageRunnerHealth -> polls runner /healthz every 10s; kills after 6 failures // 7. cmd.Wait() -> waits for runner exit (idle timeout or crash) // 8. → loop back to step 1 ``` --- ## `ws.Handshake` Connects to the n8n task broker via WebSocket and impersonates a runner until a task offer is accepted. Uses a non-expiring offer (`validFor: -1`). Blocks until the broker accepts the offer and the launcher defers the task, at which point the connection is closed and the function returns. Returns `errs.ErrServerDown` if the broker closes the connection unexpectedly, allowing the caller to reconnect. ```go handshakeCfg := ws.HandshakeConfig{ TaskType: "javascript", TaskBrokerServerURI: "http://127.0.0.1:5679", GrantToken: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", } err := ws.Handshake(handshakeCfg, logger) switch { case errors.Is(err, errs.ErrServerDown): // Broker restarted; back off and retry time.Sleep(5 * time.Second) case err != nil: log.Fatalf("Handshake failed permanently: %v", err) default: // Handshake complete: broker accepted our task offer and we deferred it. // Safe to launch the real runner process now. } // WebSocket message sequence: // <- broker:inforequest // -> runner:info { types: ["javascript"], name: "launcher-javascript" } // <- broker:runnerregistered // -> runner:taskoffer { taskType: "javascript", offerId: "abc123", validFor: -1 } // <- broker:taskofferaccept { taskId: "task-xyz" } // -> runner:taskdeferred { taskId: "task-xyz" } // WS connection closed ``` --- ## `http.FetchGrantToken` Exchanges the long-lived launcher auth token for a single-use grant token from the task broker's REST endpoint. The grant token is then injected into the environment of the spawned runner process. Uses `LimitedRetry` (up to 100 attempts over 60 seconds) to handle transient broker unavailability. ```go // POST {taskBrokerURI}/runners/auth // Body: {"token": "<N8N_RUNNERS_AUTH_TOKEN>"} // Response: {"data": {"token": "<single-use-grant-token>"}} grantToken, err := http.FetchGrantToken("http://127.0.0.1:5679", "my-shared-secret") if err != nil { // All retries exhausted log.Fatalf("Failed to fetch grant token: %v", err) } // Use the token as an env var for the runner process: runnerEnv = append(runnerEnv, fmt.Sprintf("N8N_RUNNERS_GRANT_TOKEN=%s", grantToken)) ``` --- ## `http.CheckUntilBrokerReady` Polls the task broker's `/healthz` endpoint indefinitely using `UnlimitedRetry` (5-second interval) until a `200 OK` response is received. This is designed to handle arbitrarily long n8n startup times, including database migrations. ```go // Blocks until GET http://127.0.0.1:5679/healthz returns HTTP 200 err := http.CheckUntilBrokerReady("http://127.0.0.1:5679", logger) if err != nil { // Only returns error if retry itself fails (should not happen with unlimited retry) log.Fatalf("Broker readiness check failed: %v", err) } // Broker is ready; proceed to fetch grant token ``` --- ## `http.InitHealthCheckServer` Starts the launcher's own health check HTTP server in a background goroutine. Exposes `GET /healthz` on the configured port, returning `{"status":"ok"}`. Used by orchestrators (e.g., Kubernetes) to verify the launcher process is alive. ```go // Starts server on port 5680 (N8N_RUNNERS_LAUNCHER_HEALTH_CHECK_PORT) http.InitHealthCheckServer("5680") // Health check endpoint: // GET http://localhost:5680/healthz // Response: 200 OK // Body: {"status":"ok"} // curl example: // curl -s http://localhost:5680/healthz // {"status":"ok"} ``` --- ## `http.ManageRunnerHealth` Monitors a running runner process by polling its `/healthz` HTTP endpoint every 10 seconds (after an initial 3-second startup delay). If the runner fails to respond 6 consecutive times, the process is forcefully killed (`SIGKILL`), triggering a restart in the main loop. Monitoring is cancelled gracefully via context when the runner exits normally. ```go ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup runnerCmd := exec.CommandContext(ctx, "node", "/path/to/runner/start.js") runnerCmd.Env = runnerEnv _ = runnerCmd.Start() // runnerServerURI: http://<RunnerHealthCheckServerHost>:<HealthCheckServerPort> go http.ManageRunnerHealth(ctx, runnerCmd, "http://127.0.0.1:5681", &wg, logger) // Wait for runner to exit (idle timeout, crash, or health-check kill) err := runnerCmd.Wait() cancel() // stop health monitoring goroutine wg.Wait() // wait for monitoring goroutine to finish // Health check parameters: // initialDelay = 3s (allows runner startup) // healthCheckInterval = 10s // healthCheckTimeout = 5s // healthCheckMaxFailures = 6 (60s total before kill) ``` --- ## `env.PrepareRunnerEnv` Builds the environment variable slice to pass to a spawned runner process. Filters the launcher's own environment through the `allowed-env` allowlist from the config file, applies `env-overrides`, and injects four mandatory runtime variables that cannot be disallowed or overridden: `N8N_RUNNERS_TASK_BROKER_URI`, `N8N_RUNNERS_GRANT_TOKEN`, `N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED`, and `N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT`. ```go runnerEnv := env.PrepareRunnerEnv(baseConfig, runnerConfig, logger) // runnerEnv will contain: // LANG=en_US.UTF-8 (default passthrough) // PATH=/usr/local/bin:/usr/bin:/bin (default passthrough) // NODE_OPTIONS=--max-old-space-size=1024 (from allowed-env, if set in launcher env) // GENERIC_TIMEZONE=UTC (from env-overrides) // N8N_RUNNERS_TASK_BROKER_URI=http://127.0.0.1:5679 (always injected) // N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED=true (always injected) // N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT=5681 (always injected) // N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT=15 (legacy, deprecated) // N8N_RUNNERS_TASK_TIMEOUT=60 (legacy, deprecated) // Note: N8N_RUNNERS_GRANT_TOKEN is appended separately just before exec // Clear a specific env var (e.g., remove stale grant token before re-loop): runnerEnv = env.Clear(runnerEnv, env.EnvVarGrantToken) ``` --- ## `retry.LimitedRetry` and `retry.UnlimitedRetry` Generic retry utilities used throughout the launcher. `LimitedRetry` gives up after 60 seconds or 100 attempts. `UnlimitedRetry` retries forever (used for broker readiness checks). Both wait 5 seconds between attempts. ```go // LimitedRetry: used for grant token fetching token, err := retry.LimitedRetry("grant-token-fetch", func() (string, error) { resp, err := http.Post(brokerURL+"/runners/auth", "application/json", body) if err != nil { return "", err } // ... decode response return grantToken, nil }) if err != nil { // exhausted 100 retries or 60 seconds return fmt.Errorf("exhausted retries: %w", err) } // UnlimitedRetry: used for broker readiness (retries forever) _, err = retry.UnlimitedRetry("readiness-check", func() (string, error) { resp, err := http.Get(brokerURL + "/healthz") if err != nil || resp.StatusCode != 200 { return "", fmt.Errorf("not ready yet") } return "", nil }) // err is always nil for UnlimitedRetry unless the retry mechanism itself breaks ``` --- ## Development Setup and Build ```sh # Clone and configure git clone https://github.com/n8n-io/task-runner-launcher cd task-runner-launcher # Create config file cat > /etc/n8n-task-runners.json <<'EOF' { "task-runners": [ { "runner-type": "javascript", "workdir": "/usr/local/lib/node_modules/n8n", "command": "node", "args": ["/usr/local/lib/node_modules/n8n/node_modules/@n8n/task-runner/dist/start.js"], "allowed-env": ["N8N_RUNNERS_AUTO_SHUTDOWN_TIMEOUT", "N8N_RUNNERS_TASK_TIMEOUT"], "env-overrides": {} } ] } EOF # Start n8n in external runner mode export N8N_RUNNERS_ENABLED=true export N8N_RUNNERS_MODE=external export N8N_RUNNERS_AUTH_TOKEN=test pnpm start # n8n >= 1.69.0 required # Build and run launcher export N8N_RUNNERS_AUTH_TOKEN=test export N8N_RUNNERS_LAUNCHER_LOG_LEVEL=debug make build make run # equivalent to: ./bin/task-runner-launcher javascript # Run tests make test # Lint make lint ``` --- ## Summary The Task Runner Launcher's primary use case is production deployments of n8n in `external` runner mode, where resource efficiency is critical. By holding a permanent WebSocket connection and standing task offer instead of keeping a full runner process alive, the launcher eliminates idle CPU and memory consumption of runner runtimes (e.g., Node.js V8 heap). The launcher is designed to be deployed as a sidecar container alongside an n8n main or worker instance, with the orchestrator (Kubernetes, Docker Compose) performing liveness checks on the launcher's `/healthz` endpoint (port 5680) and on n8n's task broker `/healthz` (port 5679). Integration follows a straightforward pattern: set `N8N_RUNNERS_ENABLED=true` and `N8N_RUNNERS_MODE=external` on the n8n instance, share the same `N8N_RUNNERS_AUTH_TOKEN` secret with the launcher, provide a JSON config file mapping runner types to their commands, and start the launcher binary with the desired runner type(s) as arguments. The launcher handles all reconnection, crash recovery, and process lifecycle automatically in a continuous loop, making it robust to broker restarts, runner crashes, and network interruptions without any external orchestration logic.