# WorkflowAgent - Natural Language to Workflow Automation

WorkflowAgent is an AI-powered workflow automation platform built on Dify that generates complete workflows from natural language. Instead of wrestling with a traditional workflow builder, users simply describe their needs in plain language, and an intelligent agent constructs the workflow with the appropriate nodes, connections, and configurations.

Built on the ReAct (Reasoning + Acting) agent architecture, WorkflowAgent combines LLM reasoning with specialized tools to transform user requirements into production-ready workflows. The project follows a full-stack architecture: a Flask-based Python backend for agent processing and a Next.js React frontend. The agent engine supports multiple LLM providers (OpenAI, Anthropic Claude, Gemini) through an adapter pattern and streams responses in real time as newline-delimited JSON over a chunked HTTP response. The system includes comprehensive workflow validation, persistent conversation history, and automatic canvas rendering, making it suitable for both workflow and chatflow application modes.

## Core APIs and Functions

### 1. Agent Message Processing API

Send messages to the workflow agent and receive streaming responses with real-time tool execution updates.
```python
# Endpoint: POST /console/api/apps/{app_id}/workflow-agent/message
# Request payload:
# {
#     "message": "Create a workflow that translates content between Chinese and English",
#     "stream": true
# }

# Python implementation (api/controllers/console/app/workflow_agent.py)
import json

from flask import current_app as app

from agent.service import workflow_agent_service


def _stream_response(user_id: str, app_id: str, message: str):
    """Stream agent responses as newline-delimited JSON."""
    def generate():
        try:
            for response in workflow_agent_service.process_user_message(user_id, app_id, message):
                json_response = json.dumps(response, ensure_ascii=False)
                yield f"{json_response}\n"
        except Exception as e:
            error_response = json.dumps({
                'type': 'error',
                'message': str(e)
            }, ensure_ascii=False)
            yield f"{error_response}\n"

    return app.response_class(
        generate(),
        mimetype='application/json',
        headers={'Cache-Control': 'no-cache', 'Connection': 'keep-alive'}
    )

# Response stream examples:
# 1. Status update
#    {"type": "status", "message": "Agent is thinking..."}
# 2. Tool execution
#    {"type": "tool_execution", "tool": "get_available_nodes", "message": "Executing tool to fetch available nodes"}
# 3. Tool result
#    {"type": "tool_result", "tool": "update_workflow", "success": true, "auto_refresh": true}
# 4. Final message
#    {"type": "message", "content": "Workflow has been successfully generated with 3 nodes..."}
```

### 2. Workflow Agent Engine Core

The ReAct agent loop that orchestrates tool calls and manages conversation state.
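Both the HTTP stream in section 1 and the engine generator below emit the same event dictionaries (`status`, `tool_execution`, `tool_result`, `message`, `error`). As a quick illustration, here is a minimal consumer of that event stream — a sketch based only on the response examples above, not code from the repository — that can be tested independently of the server:

```python
import json


def consume_agent_stream(lines):
    """Fold a stream of newline-delimited JSON agent events into a summary.

    Event shapes follow the response examples in section 1; unknown or
    informational event types ("status", "tool_execution") are skipped.
    """
    summary = {"messages": [], "tools": [], "errors": [], "needs_refresh": False}
    for line in lines:
        if not line.strip():
            continue
        event = json.loads(line)
        etype = event.get("type")
        if etype == "message":
            summary["messages"].append(event["content"])
        elif etype == "tool_result":
            summary["tools"].append((event["tool"], event.get("success", False)))
            if event.get("auto_refresh"):
                summary["needs_refresh"] = True
        elif etype == "error":
            summary["errors"].append(event["message"])
    return summary
```

A frontend (or a CLI client) applies the same fold incrementally as chunks arrive, which is exactly what the React hook in section 6 does.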
```python
# File: api/agent/engine.py
import json
import os
from collections.abc import Generator

from agent.llm_adapter import create_llm_adapter
from agent.tools.manager import ToolManager


class WorkflowAgentEngine:
    def __init__(self, user_id: str, app_id: str):
        self.user_id = user_id
        self.app_id = app_id
        self.conversation_history = []

        # Initialize LLM adapter with environment config
        self.llm_adapter = create_llm_adapter(
            base_url=os.getenv("AGENT_MODEL_BASE_URL"),
            api_key=os.getenv("AGENT_MODEL_API_KEY"),
            model=os.getenv("AGENT_MODEL_NAME"),
            api_type=os.getenv("AGENT_MODEL_TYPE", "auto")
        )

        # Initialize tool manager
        self.tool_manager = ToolManager(user_id, app_id)

    def process_message(self, user_message: str) -> Generator[dict, None, None]:
        """ReAct loop: up to 10 iterations."""
        self.conversation_history.append({"role": "user", "content": user_message})

        max_iterations = 10
        iteration = 0
        while iteration < max_iterations:
            iteration += 1

            # Call LLM with system prompt, conversation history, and tools
            response = self.llm_adapter.call_api(
                system_prompt=self.get_system_prompt(),
                messages=self.conversation_history,
                tools=self.tool_manager.get_tools_schema(),
                max_tokens=32000,
                timeout=120.0
            )

            # Check response type
            if response.get("type") == "text":
                # Agent decided to respond to the user
                self.conversation_history.append({
                    "role": "assistant",
                    "content": response["content"]
                })
                yield {"type": "message", "content": response["content"]}
                break
            elif response.get("type") == "tool_use":
                # Agent decided to use tools
                for tool_use in response.get("tool_uses", []):
                    tool_name = tool_use.get("name")
                    tool_input = tool_use.get("input", {})

                    yield {
                        "type": "tool_execution",
                        "tool": tool_name,
                        "message": f"Executing {tool_name}..."
                    }

                    # Execute tool
                    result = self.tool_manager.execute_tool(tool_name, **tool_input)

                    # Add tool result to conversation
                    self.conversation_history.append({
                        "role": "user",
                        "content": [{
                            "type": "tool_result",
                            "tool_use_id": tool_use["id"],
                            "content": json.dumps(result)
                        }]
                    })

                    yield {
                        "type": "tool_result",
                        "tool": tool_name,
                        "success": result.get("success", False),
                        "auto_refresh": result.get("data", {}).get("auto_refresh", False)
                    }

                # Continue to next iteration
                continue


# Example usage
engine = WorkflowAgentEngine(user_id="user123", app_id="app456")
for event in engine.process_message("Create a sentiment analysis workflow"):
    print(event)
```

### 3. Multi-LLM Adapter System

Unified interface supporting OpenAI, Anthropic Claude, and Google Gemini APIs with automatic format conversion.

```python
# File: api/agent/llm_adapter.py
import json
from abc import ABC, abstractmethod

import httpx


class LLMAdapter(ABC):
    """Base adapter for all LLM providers."""

    def __init__(self, base_url: str, api_key: str, model: str):
        # Shared constructor, implied by the factory function below
        self.base_url = base_url
        self.api_key = api_key
        self.model = model

    def call_api(self, system_prompt: str, messages: list, tools: list, max_tokens: int, timeout: float):
        """Universal API call with retry mechanism."""
        headers = self.get_headers()
        payload = self.format_request(system_prompt, messages, tools, max_tokens)
        response = httpx.post(self.base_url, json=payload, headers=headers, timeout=timeout)
        return self.parse_response(response.json())


class OpenAIAdapter(LLMAdapter):
    """OpenAI and compatible APIs."""

    def format_request(self, system_prompt: str, messages: list, tools: list, max_tokens: int):
        formatted_messages = [{"role": "system", "content": system_prompt}]

        # Convert Anthropic tool_use format to OpenAI tool_calls
        for msg in messages:
            if isinstance(msg.get("content"), list):
                tool_uses = [item for item in msg["content"] if item.get("type") == "tool_use"]
                if tool_uses:
                    formatted_messages.append({
                        "role": "assistant",
                        "tool_calls": [{
                            "id": tu["id"],
                            "type": "function",
                            "function": {
                                "name": tu["name"],
                                "arguments": json.dumps(tu["input"])
                            }
                        } for tu in tool_uses]
                    })
            else:
                formatted_messages.append(msg)
        return {
            "model": self.model,
            "messages": formatted_messages,
            "tools": [{"type": "function", "function": tool} for tool in tools],
            "max_tokens": max_tokens
        }


class AnthropicAdapter(LLMAdapter):
    """Anthropic Claude native format."""

    def format_request(self, system_prompt: str, messages: list, tools: list, max_tokens: int):
        return {
            "model": self.model,
            "system": system_prompt,
            "messages": messages,
            "tools": tools,  # Already in the correct format
            "max_tokens": max_tokens
        }

    def get_headers(self):
        return {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json"
        }


# Factory function with auto-detection
def create_llm_adapter(base_url: str, api_key: str, model: str, api_type: str = "auto"):
    """Auto-detect the API type from URL patterns."""
    if api_type == "auto":
        if "anthropic.com" in base_url or "claude" in base_url.lower():
            api_type = "anthropic"
        elif "generativelanguage.googleapis.com" in base_url:
            api_type = "gemini"
        else:
            api_type = "openai"  # Default

    # GeminiAdapter follows the same base class; its definition is omitted here
    adapters = {"openai": OpenAIAdapter, "anthropic": AnthropicAdapter, "gemini": GeminiAdapter}
    return adapters[api_type](base_url, api_key, model)


# Configuration via environment variables
adapter = create_llm_adapter(
    base_url="https://api.anthropic.com/v1/messages",
    api_key="sk-ant-...",
    model="claude-opus-4.5",
    api_type="auto"
)
```

### 4. Update Workflow Tool

Core tool that validates and saves workflow JSON to the database with multi-layer verification.
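The validation layers are easiest to understand in isolation. As a sketch (not repository code), the edge-connectivity check used inside `_validate_workflow_json` below reduces to a small pure function over the graph:

```python
def check_edges_connect(nodes: list[dict], edges: list[dict]) -> dict:
    """Standalone version of the connectivity check: every edge must
    reference node ids that actually exist in the graph."""
    node_ids = {n["id"] for n in nodes}
    for edge in edges:
        for endpoint in ("source", "target"):
            if edge[endpoint] not in node_ids:
                return {
                    "valid": False,
                    "error": f"Edge '{edge.get('id')}' references missing node '{edge[endpoint]}'"
                }
    return {"valid": True}
```

Keeping each layer as a pure function of the workflow JSON makes the validator unit-testable without a database or app context.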
```python
# File: api/agent/tools/update_workflow.py
from services.workflow_service import WorkflowService


class UpdateWorkflowTool(AgentTool):
    @property
    def input_schema(self):
        return {
            "type": "object",
            "properties": {
                "workflow_json": {
                    "type": "object",
                    "description": "Complete workflow definition with nodes and edges",
                    "properties": {
                        "nodes": {"type": "array"},
                        "edges": {"type": "array"}
                    },
                    "required": ["nodes", "edges"]
                }
            },
            "required": ["workflow_json"]
        }

    def execute(self, **kwargs):
        workflow_json = kwargs.get('workflow_json')

        # Layer 1: Basic structure validation
        validation = self._validate_workflow_json(workflow_json)
        if not validation["valid"]:
            return {"success": False, "error": validation["error"]}

        # Layer 2: Get app context (db, App, Account come from Dify's model layer)
        app = db.session.query(App).filter(App.id == self.app_id).first()
        account = db.session.query(Account).filter(Account.id == self.user_id).first()

        # Layer 3: Update the database via WorkflowService
        workflow_service = WorkflowService()
        current_workflow = workflow_service.get_draft_workflow(app_model=app)
        updated_workflow = workflow_service.sync_draft_workflow(
            app_model=app,
            graph=workflow_json,
            features={},
            unique_hash=current_workflow.unique_hash if current_workflow else None,
            account=account,
            environment_variables=[],
            conversation_variables=[]
        )

        # Layer 4: Verify save success
        verify_workflow = workflow_service.get_draft_workflow(app_model=app)
        if verify_workflow.updated_at != updated_workflow.updated_at:
            return {"success": False, "error": "Verification failed: timestamp mismatch"}

        return {
            "success": True,
            "data": {
                "workflow_id": updated_workflow.id,
                "version": updated_workflow.version,
                "message": "Workflow updated and verified. Auto-refreshing canvas...",
                "auto_refresh": True  # Triggers frontend reload
            }
        }

    def _validate_workflow_json(self, workflow_json: dict):
        """Multi-layer validation."""
        # Check basic structure
        if not isinstance(workflow_json, dict):
            return {"valid": False, "error": "workflow_json must be an object"}

        nodes = workflow_json.get("nodes", [])
        edges = workflow_json.get("edges", [])

        # Validate node structure
        for i, node in enumerate(nodes):
            if not all(k in node for k in ["id", "type", "data"]):
                return {"valid": False, "error": f"Node {i} missing required fields"}
            if node["type"] != "custom":
                return {"valid": False, "error": f"Node {i} type must be 'custom'"}

            # Validate with Pydantic models
            node_type = node["data"].get("type")
            validation = self._validate_node_with_pydantic(node_type, node["data"], node["id"])
            if not validation["valid"]:
                return validation

        # Validate connectivity
        node_ids = {n["id"] for n in nodes}
        for edge in edges:
            if edge["source"] not in node_ids or edge["target"] not in node_ids:
                return {"valid": False, "error": "Edge references a non-existent node"}

        # Validate app mode requirements
        app_mode = self._get_app_mode()
        output_types = [n["data"]["type"] for n in nodes if n["data"]["type"] in ["end", "answer"]]
        if app_mode == "workflow" and "answer" in output_types:
            return {"valid": False, "error": "workflow mode cannot use ANSWER nodes, use END instead"}
        if app_mode == "advanced-chat" and "end" in output_types:
            return {"valid": False, "error": "advanced-chat mode cannot use END nodes, use ANSWER instead"}

        return {"valid": True}


# Example workflow JSON
workflow = {
    "nodes": [
        {
            "id": "start",
            "type": "custom",
            "position": {"x": 100, "y": 100},
            "data": {
                "type": "start",
                "title": "Start",
                "variables": [{"variable": "query", "type": "text-input", "required": True}]
            }
        },
        {
            "id": "llm_node",
            "type": "custom",
            "position": {"x": 300, "y": 100},
            "data": {
                "type": "llm",
                "title": "Process",
                "model": {"provider": "anthropic", "name": "claude-opus-4.5", "mode": "chat"},
                "prompt_template": [{"role": "user", "text": "{{#start.query#}}"}],
                "context": {"enabled": False}
            }
        },
        {
            "id": "answer",
            "type": "custom",
            "position": {"x": 500, "y": 100},
            "data": {"type": "answer", "title": "Answer", "answer": "{{#llm_node.text#}}"}
        }
    ],
    "edges": [
        {"id": "e1", "source": "start", "target": "llm_node"},
        {"id": "e2", "source": "llm_node", "target": "answer"}
    ]
}
```

### 5. Tool Manager and Available Tools

Registers and executes all agent tools with unified error handling.

```python
# File: api/agent/tools/manager.py
class ToolManager:
    def __init__(self, user_id: str, app_id: str):
        self.user_id = user_id
        self.app_id = app_id
        self._tools = {}
        self._register_tools()

    def _register_tools(self):
        """Register all available tools."""
        tool_classes = [
            GetAvailableModelsTool,          # List user's configured LLM models
            GetAvailableNodesTool,           # Get all workflow node types and schemas
            GetAvailableTriggerPluginsTool,  # Get installed trigger plugins
            GetCurrentWorkflowTool,          # Retrieve current workflow state
            UpdateWorkflowTool,              # Update workflow on canvas
            TestWorkflowTool                 # Execute workflow test run
        ]
        for tool_class in tool_classes:
            tool_instance = tool_class(self.user_id, self.app_id)
            self._tools[tool_instance.name] = tool_instance

    def get_tools_schema(self) -> list[dict]:
        """Get OpenAI-compatible function schemas."""
        return [tool.get_schema() for tool in self._tools.values()]

    def execute_tool(self, name: str, **kwargs) -> dict:
        """Execute a tool with error handling."""
        tool = self._tools.get(name)
        if not tool:
            return {"success": False, "error": f"Tool '{name}' not found"}
        try:
            return tool.execute(**kwargs)
        except Exception as e:
            return {"success": False, "error": f"Tool execution failed: {str(e)}"}


# Tool schema example (for GetAvailableNodesTool)
{
    "name": "get_available_nodes",
    "description": "Get all available workflow node types with their schemas and required fields",
    "input_schema": {
        "type": "object",
        "properties": {
            "node_type": {
                "type": "string",
                "description": "Optional: filter by specific node type (e.g., 'llm', 'code', 'http-request')"
            }
        }
    }
}

# Usage in the agent system prompt
"""
## Available Tools
- get_available_nodes: Understand all usable workflow node types and their schemas
- get_available_models: Get user's configured model providers and models
- get_current_workflow: View current workflow state and configuration
- update_workflow: Accept complete workflow JSON and replace current canvas
- test_workflow: Test if workflow runs correctly
"""
```

### 6. Frontend Hook for Agent Chat

React hook managing the streaming connection, message updates, and auto-refresh.

```typescript
// File: web/app/components/workflow-agent/hooks/use-workflow-agent.ts
import { useState, useCallback, useEffect } from 'react'

export const useWorkflowAgent = (appId: string) => {
  const [messages, setMessages] = useState<Message[]>([])
  const [isLoading, setIsLoading] = useState(false)
  const [shouldRefresh, setShouldRefresh] = useState(false)

  const sendMessage = useCallback(async (content: string) => {
    setIsLoading(true)

    // Add user message
    const userMessage: Message = { type: 'user', content, timestamp: Date.now() }
    setMessages(prev => [...prev, userMessage])

    try {
      // Open the streaming connection
      const response = await fetch(`/console/api/apps/${appId}/workflow-agent/message`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message: content, stream: true })
      })

      const reader = response.body?.getReader()
      const decoder = new TextDecoder()
      let assistantContent = ''
      let currentToolExecution = null

      while (true) {
        const { done, value } = await reader!.read()
        if (done) break

        const chunk = decoder.decode(value)
        const lines = chunk.split('\n').filter(line => line.trim())

        for (const line of lines) {
          try {
            const event = JSON.parse(line)
            switch (event.type) {
              case 'status':
                // Show thinking indicator
                setMessages(prev => [...prev, { type: 'status', content: event.message }])
                break
              case 'tool_execution':
                // Show the tool being executed
                currentToolExecution = { type: 'tool', tool: event.tool, status: 'executing' }
                setMessages(prev => [...prev, currentToolExecution])
                break
              case 'tool_result':
                // Update tool result
                if (event.auto_refresh) {
                  setShouldRefresh(true) // Trigger canvas refresh
                }
                setMessages(prev => prev.map(msg =>
                  msg === currentToolExecution
                    ? { ...msg, status: event.success ? 'success' : 'failed' }
                    : msg
                ))
                break
              case 'message':
                // Final assistant message
                assistantContent = event.content
                setMessages(prev => [...prev, { type: 'assistant', content: assistantContent, timestamp: Date.now() }])
                break
              case 'error':
                setMessages(prev => [...prev, { type: 'error', content: event.message }])
                break
            }
          } catch (e) {
            console.error('Failed to parse stream event:', e)
          }
        }
      }
    } catch (error) {
      setMessages(prev => [...prev, { type: 'error', content: `Request failed: ${error.message}` }])
    } finally {
      setIsLoading(false)
    }
  }, [appId])

  return { messages, isLoading, sendMessage, shouldRefresh }
}

// Usage in a React component
function WorkflowAgentChat({ appId }) {
  const { messages, isLoading, sendMessage, shouldRefresh } = useWorkflowAgent(appId)

  useEffect(() => {
    if (shouldRefresh) {
      // Reload workflow canvas
      window.location.reload()
    }
  }, [shouldRefresh])

  return (
    <div>
      {messages.map((msg, i) => (
        <div key={i}>{msg.content}</div>
      ))}
    </div>
  )
}
```

### 7. Agent System Prompt and Workflow Generation Rules

The comprehensive system prompt that guides the agent's behavior and workflow construction logic.

```python
# File: api/agent/engine.py - get_system_prompt()
system_prompt = """
# Role
You are a professional Dify workflow builder expert. Your task is to understand the user's natural language requirements and help build, modify, or test workflows by calling the available tools.

# Available Tools
- get_current_workflow: View current workflow state and configuration
- get_available_nodes: Learn all usable workflow node types and their required/optional fields
- get_available_models: Get user's configured model providers and models
- update_workflow: Accept complete workflow JSON and fully replace current canvas
- test_workflow: Test if workflow runs correctly
- get_available_trigger_plugins: Get installed trigger plugins (only when needed)

# Guiding Principles
- User-oriented: Focus on meeting user needs
- Simplicity: Consider whether there is a simpler way to meet requirements
- Autonomous: Decide whether to respond directly or call tools based on context
- Progressive: Handle complex requirements in steps
- Error handling:
  * If a format error occurs, adjust parameters and retry once
  * If the same tool fails 2+ times consecutively, stop and tell the user why
  * Never retry infinitely

# Workflow Generation Requirements

## App Mode Rules (CRITICAL)
- **workflow mode**:
  * Entry: START node (manual trigger) OR trigger nodes (webhook/schedule/plugin for auto-trigger)
  * Output: END node (must use, ANSWER forbidden)
  * START and trigger nodes are mutually exclusive
- **advanced-chat mode**:
  * Entry: START node only (trigger nodes forbidden)
  * Output: ANSWER node (must use, END forbidden)

## Node Structure
Every node must include:
- id: Unique identifier
- type: "custom" (ReactFlow requirement)
- position: {x, y} coordinates
- data.type: Functional node type (from get_available_nodes)
- data.title: Display name
- Required fields from get_available_nodes

## Variable Reference Rules
- Format: {{#node_id.field#}} for single-level references
- Format: {{#node_id.field1.field2#}} for nested fields
- **Determining reference paths**:
  * If a node config has an "outputs" field, use its "value_selector" as the complete path
  * Example: value_selector ["llm_node", "structured_output", "score"] → {{#llm_node.structured_output.score#}}
  * If there is no "outputs" field, use the node's default output field from documentation
- Referenced node IDs and field paths must exactly match the workflow definition

## Example Workflow (advanced-chat mode)
{
  "nodes": [
    {
      "id": "start",
      "type": "custom",
      "position": {"x": 100, "y": 100},
      "data": {
        "type": "start",
        "title": "Start",
        "variables": [
          {"variable": "query", "type": "text-input", "label": "User Input", "required": true}
        ]
      }
    },
    {
      "id": "llm_process",
      "type": "custom",
      "position": {"x": 300, "y": 100},
      "data": {
        "type": "llm",
        "title": "AI Process",
        "model": {
          "provider": "FROM_get_available_models",
          "name": "FROM_get_available_models",
          "mode": "chat",
          "completion_params": {"temperature": 0.7}
        },
        "prompt_template": [
          {"role": "system", "text": "You are an AI assistant"},
          {"role": "user", "text": "{{#start.query#}}"}
        ],
        "context": {"enabled": false}
      }
    },
    {
      "id": "answer",
      "type": "custom",
      "position": {"x": 500, "y": 100},
      "data": {
        "type": "answer",
        "title": "Answer",
        "answer": "{{#llm_process.text#}}"
      }
    }
  ],
  "edges": [
    {"id": "e1", "source": "start", "target": "llm_process", "sourceHandle": "source", "targetHandle": "target"},
    {"id": "e2", "source": "llm_process", "target": "answer", "sourceHandle": "source", "targetHandle": "target"}
  ]
}

# Constraints
- Strictly follow Anthropic Messages API v1 format
- Only help with workflow generation/modification/testing; explicitly refuse unrelated requests
- Only use nodes from get_available_nodes; never invent new types
- Only use models from get_available_models for model configurations
"""

# This system prompt is sent with every LLM API call along with:
# - Conversation history (user messages, tool calls, tool results)
# - Tool schemas from ToolManager
# - max_tokens: 32000 (to handle large workflow JSONs)
```

## Summary and Integration

WorkflowAgent transforms natural language into executable workflows through an intelligent agent architecture. The primary use case is removing workflow-builder complexity for non-technical users: operations, sales, and marketing teams can describe their automation needs in plain language and receive a working workflow in seconds. The agent automatically selects appropriate nodes (LLM, Code, HTTP Request, Iteration, etc.), configures parameters, establishes variable references, and ensures connectivity integrity. Advanced use cases include workflow modification through conversational iteration, automated testing with generated test cases, and error-driven refinement where users paste error messages for automatic fixes.

Integration patterns leverage a microservices architecture: the Flask backend exposes RESTful APIs for response streaming, workflow management, and conversation history; the Next.js frontend embeds the chat interface directly in the workflow canvas; and Docker Compose orchestrates all services, including PostgreSQL for persistence, Redis for caching, and Weaviate for vector storage.

Developers can extend the system by adding custom tools to `api/agent/tools/`, implementing new node types with Pydantic validators in `core/workflow/nodes/`, or integrating additional LLM providers through the adapter pattern. The system supports both standalone deployment and embedding as a component in larger Dify installations, with environment variable configuration for agent models, database connections, and feature flags.
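As a concrete sketch of the custom-tool extension path, the class below follows the shape the `ToolManager` expects from a tool: a constructor taking `user_id`/`app_id`, a `name`, a `get_schema()` method, and an `execute()` method. The `word_count` tool itself is hypothetical, and a real implementation would subclass `AgentTool` rather than stand alone:

```python
class WordCountTool:
    """Hypothetical example tool; a real one would subclass AgentTool."""

    name = "word_count"
    description = "Count the words in a piece of text"

    def __init__(self, user_id: str, app_id: str):
        # Same constructor signature the ToolManager uses during registration
        self.user_id = user_id
        self.app_id = app_id

    @property
    def input_schema(self):
        return {
            "type": "object",
            "properties": {
                "text": {"type": "string", "description": "Text to count words in"}
            },
            "required": ["text"],
        }

    def get_schema(self):
        # Same shape the ToolManager exposes to the LLM
        return {
            "name": self.name,
            "description": self.description,
            "input_schema": self.input_schema,
        }

    def execute(self, **kwargs):
        text = kwargs.get("text", "")
        return {"success": True, "data": {"count": len(text.split())}}
```

Dropping such a class into `api/agent/tools/` and appending it to the `tool_classes` list in `ToolManager._register_tools` would make it callable by the agent on its next turn.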