# Flowneer

Flowneer is a tiny, zero-dependency fluent flow builder for TypeScript designed for building AI agents and composable workflows. It provides a single `FlowBuilder` class that enables chaining sequential steps, branching on conditions, looping, batch-processing items, and running tasks in parallel, all through a clean, fluent API. The library operates on a shared state object that flows through each step, making data management straightforward.

The framework is highly extensible through a plugin system that adds capabilities for tool calling, ReAct agent loops, human-in-the-loop interactions, memory management, structured output parsing, streaming, graph-based flow composition, telemetry, and more. Flowneer maps directly to common AI agent patterns while remaining lightweight and composable, making it ideal for building autonomous agents, multi-step workflows, and LLM-powered applications.

## Core API

### FlowBuilder.startWith(fn, options?)

Sets the first step in the flow, resetting any prior chain. The function receives the shared state object, which it can mutate directly, and optional params. Returns the builder for chaining.

```typescript
import { FlowBuilder } from "flowneer";

interface State {
  count: number;
  message: string;
}

const flow = new FlowBuilder<State>()
  .startWith(async (s) => {
    s.count = 0;
    s.message = "initialized";
  });

await flow.run({ count: 0, message: "" });
// State after: { count: 0, message: "initialized" }
```

### FlowBuilder.then(fn, options?)

Appends a sequential step to the flow. Each step receives the shared state and can mutate it. Steps execute in order and support retry, delay, and timeout options.
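What retry, delay, and timeout options usually amount to can be sketched independently of the library. The helper below is a hypothetical illustration, not Flowneer code: `runWithOptions` and `StepOptions` are made-up names, and the semantics (`retries` as the total number of attempts, a fixed `delaySec` pause between attempts, `timeoutMs` applied per attempt) are assumptions rather than documented behavior.

```typescript
// Hypothetical helper, not part of Flowneer: one common reading of retry/delay/timeout.
interface StepOptions {
  retries?: number;   // assumed: total number of attempts
  delaySec?: number;  // assumed: fixed pause between attempts, in seconds
  timeoutMs?: number; // assumed: time limit for each attempt, in milliseconds
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function runWithOptions<S>(
  step: (shared: S) => Promise<void>,
  shared: S,
  options: StepOptions = {},
): Promise<void> {
  const attempts = Math.max(1, options.retries ?? 1);
  const delayMs = (options.delaySec ?? 0) * 1000;
  const timeoutMs = options.timeoutMs ?? 0;
  let lastError: unknown;

  for (let attempt = 1; attempt <= attempts; attempt++) {
    let timer: ReturnType<typeof setTimeout> | undefined;
    try {
      const work = step(shared);
      if (timeoutMs > 0) {
        // Race the step against a timer that rejects when the limit is hit.
        const timeout = new Promise<never>((_, reject) => {
          timer = setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs);
        });
        await Promise.race([work, timeout]);
      } else {
        await work;
      }
      return; // the attempt succeeded
    } catch (err) {
      lastError = err;
      if (attempt < attempts && delayMs > 0) await sleep(delayMs);
    } finally {
      if (timer !== undefined) clearTimeout(timer);
    }
  }
  throw lastError; // every attempt failed
}
```

The step mutates the same `shared` object on every attempt, so a step that fails halfway may leave partial writes behind; steps meant to be retried are easiest to reason about when they are idempotent.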
```typescript
import { FlowBuilder } from "flowneer";

interface State {
  value: number;
  doubled: number;
  tripled: number;
}

const flow = new FlowBuilder<State>()
  .startWith(async (s) => {
    s.value = 10;
  })
  .then(async (s) => {
    s.doubled = s.value * 2;
  })
  .then(
    async (s) => {
      s.tripled = s.value * 3;
    },
    { retries: 3, delaySec: 1, timeoutMs: 5000 }
  );

const state: State = { value: 0, doubled: 0, tripled: 0 };
await flow.run(state);
console.log(state);
// { value: 10, doubled: 20, tripled: 30 }
```

### FlowBuilder.branch(router, branches, options?)

Routes execution to a named branch based on the return value of the router function. After the branch executes, the chain continues normally.

```typescript
import { FlowBuilder } from "flowneer";

interface AuthState {
  role: string;
  message: string;
}

const flow = new FlowBuilder<AuthState>()
  .startWith(async (s) => {
    s.role = "admin";
  })
  .branch(
    (s) => s.role,
    {
      admin: async (s) => {
        s.message = "Welcome, admin! Full access granted.";
      },
      user: async (s) => {
        s.message = "Welcome, user! Limited access.";
      },
      default: async (s) => {
        s.message = "Access denied.";
      },
    }
  )
  .then(async (s) => {
    console.log(s.message);
  });

await flow.run({ role: "", message: "" });
// Output: "Welcome, admin! Full access granted."
```

### FlowBuilder.loop(condition, body)

Repeats a sub-flow while the condition function returns true. The body receives a new FlowBuilder to define the loop contents.

```typescript
import { FlowBuilder } from "flowneer";

interface TickState {
  ticks: number;
  log: string[];
}

const flow = new FlowBuilder<TickState>()
  .startWith(async (s) => {
    s.ticks = 0;
    s.log = [];
  })
  .loop(
    (s) => s.ticks < 3,
    (b) =>
      b.startWith(async (s) => {
        s.ticks += 1;
        s.log.push(`tick ${s.ticks}`);
      })
  )
  .then(async (s) => {
    console.log("Done:", s.log);
  });

await flow.run({ ticks: 0, log: [] });
// Output: Done: ["tick 1", "tick 2", "tick 3"]
```

### FlowBuilder.batch(items, processor, options?)

Runs a sub-flow once per item extracted from the state.
The current item is written to `shared.__batchItem` by default, or to a custom key set via options.

```typescript
import { FlowBuilder } from "flowneer";

interface SumState {
  numbers: number[];
  results: number[];
  __batchItem?: number;
}

const flow = new FlowBuilder<SumState>()
  .startWith(async (s) => {
    s.results = [];
  })
  .batch(
    (s) => s.numbers,
    (b) =>
      b.startWith(async (s) => {
        s.results.push((s.__batchItem ?? 0) * 2);
      })
  )
  .then(async (s) => {
    console.log(s.results);
  });

await flow.run({ numbers: [1, 2, 3, 4, 5], results: [] });
// Output: [2, 4, 6, 8, 10]
```

### FlowBuilder.parallel(fns, options?, reducer?)

Runs multiple functions concurrently against the same shared state. When a reducer is provided, each function receives its own shallow clone to prevent mutation races.

```typescript
import { FlowBuilder } from "flowneer";

interface FetchState {
  posts?: any[];
  users?: any[];
  combined?: string;
}

const flow = new FlowBuilder<FetchState>()
  .parallel([
    async (s) => {
      const res = await fetch("https://jsonplaceholder.typicode.com/posts?_limit=3");
      s.posts = await res.json();
    },
    async (s) => {
      const res = await fetch("https://jsonplaceholder.typicode.com/users?_limit=3");
      s.users = await res.json();
    },
  ])
  .then(async (s) => {
    console.log(`Fetched ${s.posts?.length} posts and ${s.users?.length} users`);
  });

await flow.run({});
// Output: Fetched 3 posts and 3 users
```

### FlowBuilder.anchor(name) and Goto

Inserts a named marker that any step can jump to by returning `"#anchorName"`. This enables iterative refinement loops without nesting.
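The anchor-and-jump mechanism can be pictured as a small interpreter over a flat list of steps. The sketch below is an assumption-laden illustration and not Flowneer's implementation: `runSteps`, `FlowNode`, and the `maxJumps` guard are hypothetical, and steps are synchronous to keep it short.

```typescript
// Hypothetical mini-interpreter: a step that returns "#name" jumps back to that anchor.
type Step<S> = (shared: S) => string | undefined;
type FlowNode<S> =
  | { kind: "step"; fn: Step<S> }
  | { kind: "anchor"; name: string };

function runSteps<S>(nodes: FlowNode<S>[], shared: S, maxJumps = 100): void {
  // Record the position of every anchor up front so a jump is a simple lookup.
  const anchors = new Map<string, number>();
  nodes.forEach((node, i) => {
    if (node.kind === "anchor") anchors.set(node.name, i);
  });

  let jumps = 0;
  for (let i = 0; i < nodes.length; i++) {
    const node = nodes[i];
    if (!node || node.kind === "anchor") continue; // anchors are markers only
    const result = node.fn(shared);
    if (typeof result === "string" && result.startsWith("#")) {
      const target = anchors.get(result.slice(1));
      if (target === undefined) throw new Error(`unknown anchor ${result}`);
      if (++jumps > maxJumps) throw new Error("jump limit exceeded");
      i = target; // the loop's i++ then lands on the step after the anchor
    }
  }
}

const demo = { passes: 0 };
runSteps<{ passes: number }>(
  [
    { kind: "anchor", name: "refine" },
    {
      kind: "step",
      fn: (s) => {
        s.passes++;
        return s.passes < 3 ? "#refine" : undefined;
      },
    },
  ],
  demo,
);
console.log(demo.passes); // 3
```

Because the list stays flat, a refinement loop needs no nested builder; the cost is that termination depends entirely on the step eventually not returning the anchor name, which is why a jump limit is worth having.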
```typescript
import { FlowBuilder } from "flowneer";

interface RefineState {
  draft: string;
  quality: number;
  passes: number;
}

const flow = new FlowBuilder<RefineState>()
  .startWith(async (s) => {
    s.draft = "Initial draft";
    s.quality = 0;
    s.passes = 0;
  })
  .anchor("refine")
  .then(async (s) => {
    s.passes++;
    s.quality = Math.min(1, s.quality + 0.3); // Simulate improvement
    s.draft = `Draft v${s.passes} (quality: ${s.quality.toFixed(1)})`;
    if (s.quality < 0.8) {
      return "#refine"; // Jump back to the anchor
    }
  })
  .then(async (s) => {
    console.log(`Final: ${s.draft} after ${s.passes} passes`);
  });

await flow.run({ draft: "", quality: 0, passes: 0 });
// Output: Final: Draft v3 (quality: 0.9) after 3 passes
```

### FlowBuilder.run(shared, params?, options?)

Executes the flow with the provided shared state. Optionally pass params accessible to all steps and an AbortSignal for cancellation.

```typescript
import { FlowBuilder } from "flowneer";

interface State {
  result: string;
}

const flow = new FlowBuilder<State>()
  .startWith(async (s, params) => {
    s.result = `Processing user: ${params.userId}`;
  });

// Basic run
await flow.run({ result: "" });

// With params
await flow.run({ result: "" }, { userId: "123" });

// With AbortSignal for cancellation
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
await flow.run({ result: "" }, { userId: "456" }, { signal: controller.signal });
```

### FlowBuilder.stream(shared, params?, options?)

An async-generator alternative to `run()` that yields StreamEvent values as the flow executes. Steps emit chunks through `shared.__stream`.
```typescript
import { FlowBuilder, StreamEvent } from "flowneer";

interface StreamState {
  prompt: string;
  response: string;
  __stream?: (chunk: string) => void;
}

const flow = new FlowBuilder<StreamState>()
  .startWith(async (s) => {
    const tokens = ["Hello", " ", "world", "!"];
    s.response = "";
    for (const token of tokens) {
      s.response += token;
      s.__stream?.(token); // Emit chunk
      await new Promise((r) => setTimeout(r, 100));
    }
  });

const state: StreamState = { prompt: "Hi", response: "" };

for await (const event of flow.stream(state)) {
  switch (event.type) {
    case "step:before":
      console.log(`Starting step ${event.meta.index}`);
      break;
    case "chunk":
      process.stdout.write(event.data as string);
      break;
    case "step:after":
      console.log(`\nStep ${event.meta.index} complete`);
      break;
    case "done":
      console.log("Flow finished");
      break;
  }
}
// Output: Starting step 0
// Hello world!
// Step 0 complete
// Flow finished
```

## Plugin System

### FlowBuilder.use(plugin)

Registers a plugin by copying its methods onto the FlowBuilder prototype. Plugins add new chainable methods to extend functionality.
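The phrase "copying its methods onto the prototype" describes a general JavaScript technique that can be shown on a toy class. This is a minimal sketch under stated assumptions: `MiniBuilder` and `withLabel` are hypothetical stand-ins, and nothing here is taken from Flowneer's source.

```typescript
// Hypothetical stand-in for a builder class, used only to show the technique.
class MiniBuilder {
  readonly labels: string[] = [];

  // Copies every method of the plugin object onto the prototype, so all
  // instances (existing and future) gain the new methods.
  static use(plugin: Record<string, (this: MiniBuilder, ...args: any[]) => unknown>): void {
    for (const [name, method] of Object.entries(plugin)) {
      (MiniBuilder.prototype as unknown as Record<string, unknown>)[name] = method;
    }
  }
}

// Declaration merging tells the type checker about the method added at runtime.
interface MiniBuilder {
  withLabel(label: string): this;
}

MiniBuilder.use({
  withLabel(this: MiniBuilder, label: string) {
    this.labels.push(label);
    return this; // returning `this` keeps the method chainable
  },
});

const builder = new MiniBuilder().withLabel("a").withLabel("b");
console.log(builder.labels.join(",")); // a,b
```

Two consequences follow from the technique itself: registration is global to the class rather than per instance, and a later plugin that reuses a method name silently overwrites the earlier one.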
```typescript
import { FlowBuilder, FlowneerPlugin, StepMeta } from "flowneer";

// Augment FlowBuilder interface for type safety
declare module "flowneer" {
  interface FlowBuilder {
    withLogging(): this;
  }
}

// Implement the plugin
const loggingPlugin: FlowneerPlugin = {
  withLogging(this: FlowBuilder) {
    (this as any)._setHooks({
      beforeStep: (meta: StepMeta) => {
        console.log(`[LOG] Starting ${meta.type} step ${meta.index}`);
      },
      afterStep: (meta: StepMeta) => {
        console.log(`[LOG] Completed ${meta.type} step ${meta.index}`);
      },
    });
    return this;
  },
};

// Register once
FlowBuilder.use(loggingPlugin);

// Use in flows
const flow = new FlowBuilder<{ value: number }>()
  .withLogging()
  .startWith(async (s) => {
    s.value = 42;
  });

await flow.run({ value: 0 });
// Output: [LOG] Starting fn step 0
// [LOG] Completed fn step 0
```

## Tool Calling

### ToolRegistry and withTools

Registers typed tools and makes them available to all steps via `shared.__tools`. Tools can be executed individually or in batch.
```typescript
import { FlowBuilder } from "flowneer";
import { withTools, ToolRegistry, Tool, executeTool } from "flowneer/plugins/tools";

FlowBuilder.use(withTools);

const tools: Tool[] = [
  {
    name: "calculator",
    description: "Evaluate arithmetic expressions",
    params: {
      expression: { type: "string", description: "Math expression", required: true },
    },
    execute: async ({ expression }) => {
      try {
        // eval is unsafe on untrusted input; use a proper expression parser in production
        return { result: eval(expression) };
      } catch {
        return { error: "Invalid expression" };
      }
    },
  },
  {
    name: "weather",
    description: "Get weather for a city",
    params: {
      city: { type: "string", description: "City name", required: true },
    },
    execute: async ({ city }) => {
      return { city, temperature: 72, condition: "sunny" };
    },
  },
];

interface State {
  query: string;
  result?: unknown;
  __tools?: ToolRegistry;
}

const flow = new FlowBuilder<State>()
  .withTools(tools)
  .startWith(async (s) => {
    // Execute single tool
    const calcResult = await s.__tools!.execute({
      name: "calculator",
      args: { expression: "2 + 2 * 3" },
    });
    console.log("Calculator:", calcResult);

    // Execute multiple tools concurrently
    const results = await s.__tools!.executeAll([
      { name: "weather", args: { city: "Paris" } },
      { name: "weather", args: { city: "Tokyo" } },
    ]);
    console.log("Weather:", results);

    // Get tool definitions for LLM
    console.log("Available tools:", s.__tools!.definitions());
  });

await flow.run({ query: "" });
// Output: Calculator: { result: 8 }
// Weather: [{ city: "Paris", ... }, { city: "Tokyo", ... }]
```

## ReAct Agent Loop

### withReActLoop

Inserts a built-in ReAct agent loop that cycles through think, tool-call, and observe phases until completion or max iterations.
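The think, tool-call, observe cycle can be written out as a plain loop to show its control flow. The following is a rough sketch, not the plugin's implementation: `reactLoop`, `Decision`, and `LoopResult` are hypothetical names, and the tool table is a plain object rather than a registry.

```typescript
// Hypothetical sketch of a think / act / observe loop with an iteration cap.
type ToolCall = { name: string; args: Record<string, unknown> };
type Decision =
  | { action: "tool"; calls: ToolCall[] }
  | { action: "finish"; output: string };

interface LoopResult {
  output?: string;
  exhausted: boolean; // true when the cap was hit before a "finish" decision
}

async function reactLoop<S>(
  shared: S,
  think: (s: S) => Promise<Decision>,
  tools: Record<string, (args: Record<string, unknown>) => Promise<unknown>>,
  observe: (results: unknown[], s: S) => void,
  maxIterations = 5,
): Promise<LoopResult> {
  for (let i = 0; i < maxIterations; i++) {
    // 1. Think: decide whether to call tools or finish.
    const decision = await think(shared);
    if (decision.action === "finish") {
      return { output: decision.output, exhausted: false };
    }
    // 2. Act: run all requested tool calls concurrently.
    const results = await Promise.all(
      decision.calls.map((call) => {
        const tool = tools[call.name];
        if (!tool) throw new Error(`unknown tool: ${call.name}`);
        return tool(call.args);
      }),
    );
    // 3. Observe: fold the results back into shared state for the next round.
    observe(results, shared);
  }
  return { exhausted: true };
}
```

The iteration cap is what separates "the agent finished" from "the agent ran out of budget", so callers can treat the two outcomes differently instead of waiting on a loop that never converges.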
```typescript
import { FlowBuilder } from "flowneer";
import { withTools, Tool, ToolRegistry } from "flowneer/plugins/tools";
import { withReActLoop, ThinkResult } from "flowneer/plugins/agent";

FlowBuilder.use(withTools);
FlowBuilder.use(withReActLoop);

const searchTool: Tool = {
  name: "search",
  description: "Search for information",
  params: {
    query: { type: "string", description: "Search query", required: true },
  },
  execute: async ({ query }) => ({ results: [`Result for: ${query}`] }),
};

interface AgentState {
  question: string;
  context: string[];
  __tools?: ToolRegistry;
  __toolResults?: any[];
  __reactOutput?: string;
  __reactExhausted?: boolean;
}

const flow = new FlowBuilder<AgentState>()
  .withTools([searchTool])
  .withReActLoop({
    maxIterations: 5,
    think: async (s): Promise<ThinkResult> => {
      // Simulate LLM deciding to search or finish
      if (s.context.length === 0) {
        return {
          action: "tool",
          calls: [{ name: "search", args: { query: s.question } }],
        };
      }
      return {
        action: "finish",
        output: `Answer based on: ${s.context.join(", ")}`,
      };
    },
    onObservation: (results, s) => {
      // Process tool results
      for (const r of results) {
        if (r.result) s.context.push(JSON.stringify(r.result));
      }
    },
  })
  .then(async (s) => {
    console.log("Final answer:", s.__reactOutput);
    if (s.__reactExhausted) console.log("Warning: max iterations reached");
  });

await flow.run({ question: "What is Flowneer?", context: [] });
// Output: Final answer: Answer based on: {"results":["Result for: What is Flowneer?"]}
```

## Human-in-the-Loop

### withHumanNode and resumeFlow

Inserts a human-in-the-loop pause point. When reached, it throws an InterruptError that can be caught to obtain human input before resuming.
```typescript
import { FlowBuilder, InterruptError } from "flowneer";
import { withHumanNode, resumeFlow } from "flowneer/plugins/agent";

FlowBuilder.use(withHumanNode);

interface DraftState {
  draft: string;
  feedback?: string;
  __humanPrompt?: string;
}

const flow = new FlowBuilder<DraftState>()
  .startWith(async (s) => {
    s.draft = "Here is the initial draft content...";
  })
  .humanNode({ prompt: "Please review and provide feedback on this draft." })
  .then(async (s) => {
    console.log("Applying feedback:", s.feedback);
    s.draft = `${s.draft}\n\n[Revised based on: ${s.feedback}]`;
  });

const state: DraftState = { draft: "" };

try {
  await flow.run(state);
} catch (e) {
  if (e instanceof InterruptError) {
    console.log("Prompt:", (e.savedShared as DraftState).__humanPrompt);
    console.log("Current draft:", (e.savedShared as DraftState).draft);

    // Simulate getting human input
    const userFeedback = "Please add more examples";

    // Resume the flow with the feedback
    await resumeFlow(flow, e.savedShared as DraftState, { feedback: userFeedback }, 2);
  }
}
// Output: Prompt: Please review and provide feedback on this draft.
// Current draft: Here is the initial draft content...
// Applying feedback: Please add more examples
```

### interruptIf

Conditionally pauses the flow when a condition is met, throwing an InterruptError with a deep clone of the current state.
```typescript
import { FlowBuilder, InterruptError } from "flowneer";
import { withInterrupts } from "flowneer/plugins/observability";

FlowBuilder.use(withInterrupts);

interface ApprovalState {
  amount: number;
  requiresApproval: boolean;
  approved?: boolean;
}

const flow = new FlowBuilder<ApprovalState>()
  .startWith(async (s) => {
    s.requiresApproval = s.amount > 1000;
  })
  .interruptIf((s) => s.requiresApproval)
  .then(async (s) => {
    console.log(`Processing payment of $${s.amount}`);
  });

try {
  await flow.run({ amount: 5000, requiresApproval: false });
} catch (e) {
  if (e instanceof InterruptError) {
    console.log("Approval required for:", (e.savedShared as ApprovalState).amount);
    // Get approval, then re-run with approved state
  }
}
// Output: Approval required for: 5000
```

## Multi-Agent Patterns

### supervisorCrew

Creates a supervisor-workers pattern: the supervisor runs first, the workers run in parallel, and an optional post-step aggregates the results.

```typescript
import { FlowBuilder } from "flowneer";
import { supervisorCrew } from "flowneer/plugins/agent";

interface CrewState {
  task: string;
  plan: string[];
  results: string[];
  summary?: string;
}

const crew = supervisorCrew<CrewState>(
  // Supervisor: plan the work
  async (s) => {
    s.plan = ["research", "write", "review"];
    s.results = [];
    console.log("Supervisor: Created plan");
  },
  // Workers: execute in parallel
  [
    async (s) => {
      s.results.push("Research findings...");
      console.log("Worker 1: Research done");
    },
    async (s) => {
      s.results.push("Written content...");
      console.log("Worker 2: Writing done");
    },
    async (s) => {
      s.results.push("Review comments...");
      console.log("Worker 3: Review done");
    },
  ],
  // Options: post-step for aggregation
  {
    post: async (s) => {
      s.summary = s.results.join(" | ");
      console.log("Supervisor: Aggregated results");
    },
  }
);

await crew.run({ task: "Create report", plan: [], results: [] });
// Output: Supervisor: Created plan
// Worker 1: Research done
// Worker 2: Writing done
// Worker 3: Review done
// Supervisor: Aggregated results
```

### sequentialCrew

Creates a strict sequential pipeline where each step runs in order, passing state like a baton.

```typescript
import { FlowBuilder } from "flowneer";
import { sequentialCrew } from "flowneer/plugins/agent";

interface PipelineState {
  input: string;
  researched?: string;
  drafted?: string;
  final?: string;
}

const pipeline = sequentialCrew<PipelineState>([
  async (s) => {
    s.researched = `Researched: ${s.input}`;
    console.log("Step 1: Research complete");
  },
  async (s) => {
    s.drafted = `Draft based on ${s.researched}`;
    console.log("Step 2: Draft complete");
  },
  async (s) => {
    s.final = `Final: ${s.drafted}`;
    console.log("Step 3: Finalization complete");
  },
]);

await pipeline.run({ input: "AI Agents" });
// Output: Step 1: Research complete
// Step 2: Draft complete
// Step 3: Finalization complete
```

### roundRobinDebate

Creates a round-robin debate where agents take turns for a specified number of rounds.

```typescript
import { FlowBuilder } from "flowneer";
import { roundRobinDebate } from "flowneer/plugins/agent";

interface DebateState {
  topic: string;
  debate: { agent: string; text: string }[];
  __debateRound?: number;
}

const debate = roundRobinDebate<DebateState>(
  [
    async (s) => {
      s.debate.push({ agent: "optimist", text: `Pro argument round ${s.__debateRound! + 1}` });
    },
    async (s) => {
      s.debate.push({ agent: "critic", text: `Counter argument round ${s.__debateRound! + 1}` });
    },
    async (s) => {
      s.debate.push({ agent: "synthesizer", text: `Synthesis round ${s.__debateRound! + 1}` });
    },
  ],
  2 // 2 rounds
);

const state: DebateState = { topic: "AI Safety", debate: [] };
await debate.run(state);
console.log(state.debate);
// Output: [
//   { agent: "optimist", text: "Pro argument round 1" },
//   { agent: "critic", text: "Counter argument round 1" },
//   { agent: "synthesizer", text: "Synthesis round 1" },
//   { agent: "optimist", text: "Pro argument round 2" },
//   ...
// ]
```

## Memory Management

### BufferWindowMemory and withMemory

Attaches a memory instance to `shared.__memory` for managing conversation history with a sliding window.

```typescript
import { FlowBuilder } from "flowneer";
import { BufferWindowMemory, withMemory } from "flowneer/plugins/memory";

FlowBuilder.use(withMemory);

interface ChatState {
  userInput: string;
  response: string;
  __memory?: BufferWindowMemory;
}

const memory = new BufferWindowMemory({ maxMessages: 10 });

const flow = new FlowBuilder<ChatState>()
  .withMemory(memory)
  .startWith(async (s) => {
    // Add user message
    s.__memory!.add({ role: "user", content: s.userInput });

    // Get conversation context
    const context = s.__memory!.toContext();
    console.log("Context:", context);

    // Simulate LLM response
    s.response = `Response to: ${s.userInput}`;

    // Add assistant message
    s.__memory!.add({ role: "assistant", content: s.response });
  });

// First turn
await flow.run({ userInput: "Hello!", response: "" });
// Context: user: Hello!

// Second turn
await flow.run({ userInput: "How are you?", response: "" });
// Context: user: Hello!
// assistant: Response to: Hello!
// user: How are you?

// Check memory
console.log("All messages:", memory.get());
```

## Output Parsing

### parseJsonOutput

Extracts and parses JSON from LLM output, handling raw JSON, markdown fences, and embedded JSON in prose.
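One way to handle those three input shapes is a short fallback chain: look for a fenced block, try a direct parse, then scan for the outermost brackets. The sketch below is a hypothetical re-implementation for illustration only; `extractJson` is a made-up name and `parseJsonOutput`'s real logic may differ.

```typescript
// Hypothetical sketch; not the library's parseJsonOutput.
const FENCE = "`".repeat(3); // a markdown code fence, built indirectly to keep this block readable
const FENCED_BLOCK = new RegExp(FENCE + "(?:json)?\\s*([\\s\\S]*?)" + FENCE);

function extractJson(text: string): unknown {
  // 1. Prefer the contents of a fenced block if one exists.
  const fenced = text.match(FENCED_BLOCK);
  const candidate = (fenced?.[1] ?? text).trim();

  // 2. Try the candidate as-is (covers raw JSON and fenced JSON).
  try {
    return JSON.parse(candidate);
  } catch {
    // not directly parseable: fall through to the embedded-JSON scan
  }

  // 3. Slice from the first opening bracket to the last matching closer.
  const start = candidate.search(/[{\[]/);
  if (start === -1) throw new Error("no JSON found");
  const closer = candidate[start] === "{" ? "}" : "]";
  const end = candidate.lastIndexOf(closer);
  if (end <= start) throw new Error("no JSON found");
  return JSON.parse(candidate.slice(start, end + 1));
}

console.log(extractJson('The analysis shows {"score": 95, "passed": true} as the result.'));
// { score: 95, passed: true }
```

The bracket scan is deliberately naive: prose that contains a stray closing brace after the JSON will make the final parse throw, which is usually preferable to returning a silently truncated object.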
```typescript
import { parseJsonOutput } from "flowneer/plugins/output";

// Raw JSON
const raw = '{"name": "Alice", "age": 30}';
console.log(parseJsonOutput(raw));
// Output: { name: "Alice", age: 30 }

// Markdown fenced JSON
const fenced = `Here's the data:
\`\`\`json
{"name": "Bob", "items": [1, 2, 3]}
\`\`\`
That's all!`;
console.log(parseJsonOutput(fenced));
// Output: { name: "Bob", items: [1, 2, 3] }

// JSON embedded in prose
const prose = `The analysis shows {"score": 95, "passed": true} as the result.`;
console.log(parseJsonOutput(prose));
// Output: { score: 95, passed: true }

// With Zod validation
import { z } from "zod";

const schema = z.object({ name: z.string(), age: z.number() });
const validated = parseJsonOutput<{ name: string; age: number }>(raw, schema);
console.log(validated.name); // Type-safe access
```

### parseListOutput

Parses bulleted, numbered, or newline-separated lists from LLM output.

```typescript
import { parseListOutput } from "flowneer/plugins/output";

// Bullet points
const bullets = `- Apple
- Banana
- Cherry`;
console.log(parseListOutput(bullets));
// Output: ["Apple", "Banana", "Cherry"]

// Numbered list
const numbered = `1. First item
2. Second item
3. Third item`;
console.log(parseListOutput(numbered));
// Output: ["First item", "Second item", "Third item"]

// Mixed formats
const mixed = `* Task A
- Task B
3) Task C
• Task D`;
console.log(parseListOutput(mixed));
// Output: ["Task A", "Task B", "Task C", "Task D"]
```

## Structured Output

### withStructuredOutput

Validates LLM output against a schema after each step, storing the result in `shared.__structuredOutput`.
```typescript
import { FlowBuilder } from "flowneer";
import { withStructuredOutput } from "flowneer/plugins/llm";
import { z } from "zod";

FlowBuilder.use(withStructuredOutput);

const responseSchema = z.object({
  answer: z.string(),
  confidence: z.number().min(0).max(1),
  sources: z.array(z.string()).optional(),
});

interface LLMState {
  prompt: string;
  __llmOutput?: string;
  __structuredOutput?: z.infer<typeof responseSchema>;
  __validationError?: { message: string; raw: string; attempts: number };
}

const flow = new FlowBuilder<LLMState>()
  .withStructuredOutput(responseSchema, { retries: 2 })
  .startWith(async (s) => {
    // Simulate LLM output
    s.__llmOutput = JSON.stringify({
      answer: "Flowneer is a flow builder for TypeScript",
      confidence: 0.95,
      sources: ["README.md", "docs"],
    });
  })
  .then(async (s) => {
    if (s.__validationError) {
      console.error("Validation failed:", s.__validationError.message);
    } else {
      console.log("Parsed:", s.__structuredOutput);
      console.log("Confidence:", s.__structuredOutput?.confidence);
    }
  });

await flow.run({ prompt: "What is Flowneer?" });
// Output: Parsed: { answer: "...", confidence: 0.95, sources: [...] }
// Confidence: 0.95
```

## Observability Plugins

### withHistory

Records a shallow snapshot of state after each step in `shared.__history`.
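The word "shallow" matters when state contains nested objects or arrays. Assuming a shallow snapshot means a top-level copy (the exact copying strategy is not stated here), the plain TypeScript below shows what such a snapshot does and does not preserve; `DemoState` is a hypothetical type used only for this illustration.

```typescript
// Illustration of shallow vs. deep snapshots; independent of Flowneer.
interface DemoState {
  value: number;
  tags: string[];
}

const live: DemoState = { value: 1, tags: ["a"] };

const shallow: DemoState = { ...live };                          // copies top-level keys only
const deep: DemoState = JSON.parse(JSON.stringify(live));        // copies nested data too

live.value = 2;      // top-level change: neither snapshot sees it
live.tags.push("b"); // nested change: the shallow snapshot shares the array

console.log(JSON.stringify(shallow)); // {"value":1,"tags":["a","b"]}
console.log(JSON.stringify(deep));    // {"value":1,"tags":["a"]}
```

If later steps mutate nested arrays or objects in place, earlier shallow snapshots will appear to change with them; replacing nested values instead of mutating them keeps each snapshot faithful.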
```typescript
import { FlowBuilder } from "flowneer";
import { withHistory } from "flowneer/plugins/observability";

FlowBuilder.use(withHistory);

interface State {
  value: number;
  __history?: Array<{ index: number; type: string; snapshot: any }>;
}

const flow = new FlowBuilder<State>()
  .withHistory()
  .startWith(async (s) => {
    s.value = 1;
  })
  .then(async (s) => {
    s.value = 2;
  })
  .then(async (s) => {
    s.value = 3;
  });

const state: State = { value: 0 };
await flow.run(state);
console.log(state.__history);
// Output: [
//   { index: 0, type: "fn", snapshot: { value: 1 } },
//   { index: 1, type: "fn", snapshot: { value: 2 } },
//   { index: 2, type: "fn", snapshot: { value: 3 } }
// ]
```

### withTiming

Records wall-clock duration of each step in milliseconds.

```typescript
import { FlowBuilder } from "flowneer";
import { withTiming } from "flowneer/plugins/observability";

FlowBuilder.use(withTiming);

interface State {
  data: string;
  __timings?: Record<number, number>;
}

const flow = new FlowBuilder<State>()
  .withTiming()
  .startWith(async (s) => {
    await new Promise((r) => setTimeout(r, 100));
    s.data = "step 0";
  })
  .then(async (s) => {
    await new Promise((r) => setTimeout(r, 50));
    s.data = "step 1";
  });

const state: State = { data: "" };
await flow.run(state);
console.log(state.__timings);
// Output: { "0": 102, "1": 51 } (approximate ms)
```

## Persistence Plugins

### withCheckpoint

Saves state to a store after each successful step for crash recovery.
```typescript
import { FlowBuilder } from "flowneer";
import { withCheckpoint, CheckpointStore } from "flowneer/plugins/persistence";

FlowBuilder.use(withCheckpoint);

// In-memory store (use Redis/DB in production)
const checkpoints: Map<number, unknown> = new Map();

const store: CheckpointStore = {
  save: async (stepIndex, shared) => {
    checkpoints.set(stepIndex, JSON.parse(JSON.stringify(shared)));
    console.log(`Checkpoint saved at step ${stepIndex}`);
  },
};

interface State {
  progress: number;
}

const flow = new FlowBuilder<State>()
  .withCheckpoint(store)
  .startWith(async (s) => {
    s.progress = 25;
  })
  .then(async (s) => {
    s.progress = 50;
  })
  .then(async (s) => {
    s.progress = 100;
  });

await flow.run({ progress: 0 });
// Output: Checkpoint saved at step 0
// Checkpoint saved at step 1
// Checkpoint saved at step 2

console.log("Checkpoints:", [...checkpoints.entries()]);
```

### withReplay

Skips execution of all steps before a specified index, useful for resuming from checkpoints.

```typescript
import { FlowBuilder } from "flowneer";
import { withReplay } from "flowneer/plugins/persistence";

FlowBuilder.use(withReplay);

interface State {
  steps: string[];
}

const flow = new FlowBuilder<State>()
  .withReplay(2) // Skip steps 0 and 1
  .startWith(async (s) => {
    s.steps.push("step 0");
  })
  .then(async (s) => {
    s.steps.push("step 1");
  })
  .then(async (s) => {
    s.steps.push("step 2");
  })
  .then(async (s) => {
    s.steps.push("step 3");
  });

const state: State = { steps: ["restored from checkpoint"] };
await flow.run(state);
console.log(state.steps);
// Output: ["restored from checkpoint", "step 2", "step 3"]
```

## Resilience Plugins

### withCircuitBreaker

Opens the circuit after consecutive failures and rejects all steps until a reset timeout elapses.
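The open/closed behavior described above is the classic circuit-breaker pattern, which fits in a small class. This is a generic sketch, not the plugin's code: the `CircuitBreaker` class, its synchronous `call` method, and the injectable clock are assumptions made so the example can be run deterministically.

```typescript
// Hypothetical sketch of the circuit-breaker pattern; not Flowneer's implementation.
class CircuitBreaker {
  private failures = 0;
  private openedAt: number | null = null;
  private readonly maxFailures: number;
  private readonly resetMs: number;
  private readonly now: () => number;

  // `now` is injectable so the example does not depend on real time.
  constructor(maxFailures: number, resetMs: number, now: () => number = Date.now) {
    this.maxFailures = maxFailures;
    this.resetMs = resetMs;
    this.now = now;
  }

  call<T>(fn: () => T): T {
    if (this.openedAt !== null) {
      if (this.now() - this.openedAt < this.resetMs) {
        throw new Error(`circuit open after ${this.failures} consecutive failures`);
      }
      // The reset timeout has elapsed: close the circuit and allow a new call.
      this.openedAt = null;
      this.failures = 0;
    }
    try {
      const result = fn();
      this.failures = 0; // any success ends the failure streak
      return result;
    } catch (err) {
      if (++this.failures >= this.maxFailures) this.openedAt = this.now();
      throw err;
    }
  }
}

let clock = 0;
const breaker = new CircuitBreaker(2, 5000, () => clock);
const failing = (): never => {
  throw new Error("boom");
};

for (let i = 0; i < 3; i++) {
  try {
    breaker.call(failing);
  } catch (e) {
    console.log((e as Error).message);
  }
}
// boom
// boom
// circuit open after 2 consecutive failures

clock = 5000; // pretend the reset timeout has elapsed
console.log(breaker.call(() => "ok")); // ok
```

While the circuit is open the wrapped function is never invoked, which is the point of the pattern: a struggling downstream service gets breathing room instead of a stream of doomed requests.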
```typescript
import { FlowBuilder } from "flowneer";
import { withCircuitBreaker } from "flowneer/plugins/resilience";

FlowBuilder.use(withCircuitBreaker);

let attempts = 0;

const flow = new FlowBuilder<{ result: string }>()
  .withCircuitBreaker({ maxFailures: 2, resetMs: 5000 })
  .startWith(async (s) => {
    attempts++;
    if (attempts < 4) {
      throw new Error(`Attempt ${attempts} failed`);
    }
    s.result = "Success!";
  });

// First 2 attempts fail, circuit opens
for (let i = 0; i < 3; i++) {
  try {
    await flow.run({ result: "" });
  } catch (e) {
    console.log(`Run ${i + 1}:`, (e as Error).message);
  }
}
// Output: Run 1: Flow failed at step 0: Attempt 1 failed
// Run 2: Flow failed at step 0: Attempt 2 failed
// Run 3: Flow failed at step 0: circuit open after 2 consecutive failures
```

### withCycles

Guards against infinite anchor-jump loops by limiting the number of jumps.

```typescript
import { FlowBuilder } from "flowneer";
import { withCycles } from "flowneer/plugins/resilience";

FlowBuilder.use(withCycles);

interface LoopState {
  iterations: number;
}

const flow = new FlowBuilder<LoopState>()
  .withCycles(5) // Max 5 total jumps
  .withCycles(3, "retry") // Max 3 visits to "retry" anchor
  .startWith(async (s) => {
    s.iterations = 0;
  })
  .anchor("retry")
  .then(async (s) => {
    s.iterations++;
    console.log(`Iteration ${s.iterations}`);
    if (s.iterations < 10) {
      return "#retry";
    }
  });

try {
  await flow.run({ iterations: 0 });
} catch (e) {
  console.log("Stopped:", (e as Error).message);
}
// Output: Iteration 1
// Iteration 2
// Iteration 3
// Stopped: cycle limit exceeded for anchor "retry": 4 visits > limit(3)
```

## Messaging Plugins

### withChannels

Initializes a message channel system for inter-step communication.
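A channel system of this kind can be pictured as named queues stored on the shared state. The sketch below is a guess at the general shape, not the plugin's implementation: `send`, `peek`, and `receive` are hypothetical helpers, and keeping the queues in a `Map` under `__channels` is an assumption.

```typescript
// Hypothetical sketch of named message queues kept on shared state.
type Channels = Map<string, unknown[]>;

interface WithChannels {
  __channels?: Channels;
}

function send(shared: WithChannels, channel: string, message: unknown): void {
  if (!shared.__channels) shared.__channels = new Map();
  const queue = shared.__channels.get(channel) ?? [];
  queue.push(message);
  shared.__channels.set(channel, queue);
}

function peek(shared: WithChannels, channel: string): unknown[] {
  // Return a copy so callers cannot mutate the queue by accident.
  return [...(shared.__channels?.get(channel) ?? [])];
}

function receive(shared: WithChannels, channel: string): unknown[] {
  const messages = peek(shared, channel);
  shared.__channels?.set(channel, []); // drain the queue
  return messages;
}

const sharedState: WithChannels = {};
send(sharedState, "results", { id: 1 });
send(sharedState, "results", { id: 2 });

console.log(peek(sharedState, "results").length);    // 2
console.log(receive(sharedState, "results").length); // 2
console.log(receive(sharedState, "results").length); // 0
```

Separating a non-destructive read from a draining read lets one step inspect pending messages for logging while leaving them for the step that is meant to consume them.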
```typescript
import { FlowBuilder } from "flowneer";
import { withChannels, sendTo, receiveFrom, peekChannel } from "flowneer/plugins/messaging";

FlowBuilder.use(withChannels);

interface State {
  __channels?: Map<string, unknown[]>;
}

const flow = new FlowBuilder<State>()
  .withChannels()
  .startWith(async (s) => {
    sendTo(s, "results", { id: 1, score: 95 });
    sendTo(s, "results", { id: 2, score: 87 });
    sendTo(s, "errors", { code: "E001" });
  })
  .then(async (s) => {
    // Peek without draining
    console.log("Peeked:", peekChannel(s, "results"));

    // Receive and drain
    const results = receiveFrom(s, "results");
    console.log("Received:", results);

    // Second receive returns empty
    console.log("After drain:", receiveFrom(s, "results"));
  });

await flow.run({});
// Output: Peeked: [{ id: 1, score: 95 }, { id: 2, score: 87 }]
// Received: [{ id: 1, score: 95 }, { id: 2, score: 87 }]
// After drain: []
```

## Graph-Based Composition

### withGraph

Describes flows as directed graphs of nodes and edges, then compiles them to executable FlowBuilder chains. Conditional back-edges are allowed, as the retry edge in the example shows.

```typescript
import { FlowBuilder } from "flowneer";
import { withGraph } from "flowneer/plugins/graph";

FlowBuilder.use(withGraph);

interface PipelineState {
  url: string;
  data?: string;
  parsed?: object;
  valid: boolean;
  retries: number;
}

const flow = (new FlowBuilder<PipelineState>() as any)
  .withGraph()
  .addNode("fetch", async (s: PipelineState) => {
    console.log(`Fetching ${s.url} (attempt ${s.retries + 1})`);
    s.data = `Data from ${s.url}`;
  })
  .addNode("parse", async (s: PipelineState) => {
    console.log("Parsing data");
    s.parsed = { content: s.data };
  })
  .addNode("validate", async (s: PipelineState) => {
    s.valid = s.retries >= 1; // Simulate validation passing on retry
    console.log(`Validation: ${s.valid ? "passed" : "failed"}`);
  })
  .addNode("retry", async (s: PipelineState) => {
    s.retries++;
    console.log("Retrying...");
  })
  .addEdge("fetch", "parse")
  .addEdge("parse", "validate")
  .addEdge("validate", "retry", (s: PipelineState) => !s.valid) // Conditional back-edge
  .addEdge("retry", "fetch")
  .compile();

await flow.run({ url: "https://api.example.com", valid: false, retries: 0 });
// Output: Fetching https://api.example.com (attempt 1)
// Parsing data
// Validation: failed
// Retrying...
// Fetching https://api.example.com (attempt 2)
// Parsing data
// Validation: passed
```

## Evaluation Harness

### runEvalSuite and Scoring Functions

Runs flows against labeled datasets and collects per-item scores for testing and benchmarking.

```typescript
import { FlowBuilder } from "flowneer";
import {
  runEvalSuite,
  exactMatch,
  f1Score,
  containsMatch,
} from "flowneer/plugins/eval";

interface TestState {
  input: string;
  expected: string;
  output: string;
}

// Simple QA flow
const qaFlow = new FlowBuilder<TestState>()
  .startWith(async (s) => {
    // Simulate LLM response
    const answers: Record<string, string> = {
      "What is 2+2?": "4",
      "Capital of France?": "Paris is the capital",
      "Color of sky?": "blue",
    };
    s.output = answers[s.input] ?? "unknown";
  });

// Test dataset
const dataset: TestState[] = [
  { input: "What is 2+2?", expected: "4", output: "" },
  { input: "Capital of France?", expected: "Paris", output: "" },
  { input: "Color of sky?", expected: "blue", output: "" },
];

const { results, summary } = await runEvalSuite(dataset, qaFlow, {
  exact: (s) => exactMatch(s.output, s.expected),
  contains: (s) => containsMatch(s.output, s.expected),
  f1: (s) => f1Score(s.output, s.expected),
});

console.log("Summary:", summary);
// Output: Summary: {
//   total: 3,
//   passed: 3,
//   failed: 0,
//   averages: { exact: 0.667, contains: 1.0, f1: 0.889 }
// }

console.log("Per-item scores:", results.map((r) => r.scores));
```

## Telemetry

### TelemetryDaemon

Collects per-step spans with timing and exports them in batches for observability.

```typescript
import { FlowBuilder } from "flowneer";
import {
  TelemetryDaemon,
  consoleExporter,
  otlpExporter,
} from "flowneer/plugins/telemetry";

// Create daemon with console exporter
const telemetry = new TelemetryDaemon({
  exporter: consoleExporter,
  flushIntervalMs: 5000,
  maxBuffer: 100,
});

// Or use OTLP exporter for production
// const telemetry = new TelemetryDaemon({
//   exporter: otlpExporter("http://localhost:4318/v1/traces"),
// });

interface State {
  value: number;
}

const flow = new FlowBuilder<State>()
  .startWith(async (s) => {
    await new Promise((r) => setTimeout(r, 50));
    s.value = 1;
  })
  .then(async (s) => {
    await new Promise((r) => setTimeout(r, 30));
    s.value = 2;
  });

// Attach telemetry hooks
(flow as any)._setHooks(telemetry.hooks());

await flow.run({ value: 0 });
// Output: [telemetry] ✓ fn[0] 52ms trace=abc12345 span=def67890
// [telemetry] ✓ fn[1] 31ms trace=abc12345 span=ghi11121

// Graceful shutdown
await telemetry.stop();
```

## Error Handling

### FlowError and InterruptError

FlowError wraps step failures with context; InterruptError is used for human-in-the-loop pauses.
```typescript
import { FlowBuilder, FlowError, InterruptError } from "flowneer";

const flow = new FlowBuilder<{ value: number }>()
  .startWith(async (s) => {
    s.value = 1;
  })
  .then(async () => {
    throw new Error("Something went wrong");
  })
  .then(async (s) => {
    s.value = 3; // Never reached
  });

try {
  await flow.run({ value: 0 });
} catch (err) {
  if (err instanceof FlowError) {
    console.log("Step that failed:", err.step);
    console.log("Original error:", err.cause);
    console.log("Full message:", err.message);
  }
}
// Output: Step that failed: step 1
// Original error: Error: Something went wrong
// Full message: Flow failed at step 1: Something went wrong
```

## LLM Cost Management

### withTokenBudget

Aborts the flow before a step runs if token usage has reached the limit.

```typescript
import { FlowBuilder } from "flowneer";
import { withTokenBudget } from "flowneer/plugins/llm";

FlowBuilder.use(withTokenBudget);

interface State {
  tokensUsed: number;
  response: string;
}

const flow = new FlowBuilder<State>()
  .withTokenBudget(1000)
  .startWith(async (s) => {
    s.tokensUsed = 500;
    s.response = "First response";
  })
  .then(async (s) => {
    s.tokensUsed += 600; // Total: 1100
    s.response = "Second response";
  })
  .then(async (s) => {
    s.response = "Never reached";
  });

try {
  await flow.run({ tokensUsed: 0, response: "" });
} catch (e) {
  console.log((e as Error).message);
}
// Output: token budget exceeded: 1100 >= 1000
```

### withRateLimit

Enforces a minimum gap between steps to avoid hammering rate-limited APIs.
```typescript
import { FlowBuilder } from "flowneer";
import { withRateLimit } from "flowneer/plugins/llm";

FlowBuilder.use(withRateLimit);

const flow = new FlowBuilder<{ calls: number }>()
  .withRateLimit({ intervalMs: 500 })
  .startWith(async (s) => {
    console.log(`Step 0 at ${Date.now()}`);
    s.calls = 1;
  })
  .then(async (s) => {
    console.log(`Step 1 at ${Date.now()}`); // ~500ms later
    s.calls = 2;
  })
  .then(async (s) => {
    console.log(`Step 2 at ${Date.now()}`); // ~500ms after step 1
    s.calls = 3;
  });

await flow.run({ calls: 0 });
// Steps are spaced at least 500ms apart
```

## Development Plugins

### withDryRun

Skips all step bodies while still firing hooks, useful for validating observability wiring.

```typescript
import { FlowBuilder } from "flowneer";
import { withDryRun } from "flowneer/plugins/dev";
import { withTiming } from "flowneer/plugins/observability";

FlowBuilder.use(withDryRun);
FlowBuilder.use(withTiming);

interface State {
  value: number;
  __timings?: Record<number, number>;
}

const flow = new FlowBuilder<State>()
  .withDryRun()
  .withTiming()
  .startWith(async (s) => {
    await new Promise((r) => setTimeout(r, 1000));
    s.value = 42; // Never executed
  })
  .then(async (s) => {
    s.value = 100; // Never executed
  });

// Typed explicitly so that reading __timings below type-checks
const state: State = { value: 0 };
await flow.run(state);
console.log("Value:", state.value); // Still 0
console.log("Timings recorded:", state.__timings); // { 0: 0, 1: 0 }
```

## Summary

Flowneer's primary use cases center on building AI agents and multi-step workflows where state flows through a sequence of operations. The fluent API makes it natural to compose complex flows from simple building blocks: chaining LLM calls, tool executions, conditional branches, and iterative refinement loops. Common patterns include ReAct-style autonomous agents that reason and act in a loop, human-in-the-loop workflows that pause for approval, multi-agent orchestration with supervisor-worker patterns, and evaluation harnesses for testing LLM-powered applications.
The plugin architecture enables clean separation of concerns: observability plugins track timing and history without cluttering business logic, resilience plugins add circuit breakers and retry behavior declaratively, and persistence plugins enable checkpoint/resume for long-running flows. Flowneer integrates naturally with existing TypeScript/JavaScript ecosystems—use any LLM client library, any database for checkpoints, any observability backend for telemetry. The zero-dependency core keeps bundles small while the plugin system allows teams to adopt exactly the capabilities they need.