### watch - Main Entry Point for Visualization

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

The `watch()` function wraps a compiled LangGraph graph to enable real-time visualization. It starts local web and WebSocket servers, opens a browser visualization, and returns a `Viewport` object that proxies graph methods and broadcasts execution events.

```APIDOC
## watch - Main Entry Point for Visualization

### Description
Wraps a compiled LangGraph graph to enable real-time visualization. It extracts the graph topology, starts HTTP and WebSocket servers for the frontend, and returns a `Viewport` object that proxies all graph methods while broadcasting execution events to connected clients.

### Method
`watch()`

### Parameters
#### Arguments
- **graph** (Compiled LangGraph) - Required - The compiled LangGraph graph to visualize.
- **host** (string) - Optional - HTTP server host (default: "localhost").
- **port** (integer) - Optional - HTTP server port (default: 8764).
- **ws_port** (integer) - Optional - WebSocket server port (default: 8765).
- **open_browser** (boolean) - Optional - Auto-open browser (default: True).
- **direction** (string) - Optional - Graph layout: "TB" (top-bottom) or "LR" (left-right) (default: "TB").
- **mode** (string) - Optional - View mode: "auto" or "manual" (default: "auto").
- **inspect** (string) - Optional - Inspector: "off", "tree", or "full" (default: "off").
- **theme** (string) - Optional - Theme: "system", "dark", or "light" (default: "system").

### Request Example
```python
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.messages import HumanMessage, AIMessage
from langgraphics import watch

# Define your state and workflow
class State(MessagesState):
    pass

def chat_node(state: State) -> dict:
    # Your LLM logic here
    return {"messages": [AIMessage(content="Hello!")]}

# Build the graph
builder = StateGraph(State)
builder.add_node("chat", chat_node)
builder.add_edge(START, "chat")
builder.add_edge("chat", END)
graph = builder.compile()

# Wrap with LangGraphics - opens browser automatically
graph = watch(
    graph,
    host="localhost",   # HTTP server host (default: "localhost")
    port=8764,          # HTTP server port (default: 8764)
    ws_port=8765,       # WebSocket server port (default: 8765)
    open_browser=True,  # Auto-open browser (default: True)
    direction="TB",     # Graph layout: "TB" (top-bottom) or "LR" (left-right)
    mode="auto",        # View mode: "auto" or "manual"
    inspect="off",      # Inspector: "off", "tree", or "full"
    theme="system",     # Theme: "system", "dark", or "light"
)

# Invoke normally - visualization happens automatically
result = graph.invoke({"messages": [HumanMessage(content="Hi there!")]})
```

### Response
- **Viewport object** - A `Viewport` object that proxies the original graph's methods and broadcasts execution events.
```

--------------------------------

### Cyclic Graphs and Iterative Agents with LangGraphics

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

This example illustrates how LangGraphics visualizes cyclic graphs, which are essential for iterative agents such as those using the ReAct pattern. The `watch` wrapper tracks each iteration of the loop, showing the path from START through repeated node visits to END.
```python
import asyncio
from typing import Annotated, TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
from langgraphics import watch

class IterativeState(TypedDict):
    messages: Annotated[list, add_messages]
    iteration: int
    max_iterations: int

def think(state: IterativeState) -> dict:
    iteration = state["iteration"] + 1
    return {
        "messages": [AIMessage(content=f"Thinking... (iteration {iteration})")],
        "iteration": iteration
    }

def act(state: IterativeState) -> dict:
    return {"messages": [AIMessage(content=f"Acting on iteration {state['iteration']}")]}

def check_done(state: IterativeState) -> Literal["continue", "finish"]:
    if state["iteration"] >= state["max_iterations"]:
        return "finish"
    return "continue"

def finalize(state: IterativeState) -> dict:
    return {"messages": [AIMessage(content=f"Done after {state['iteration']} iterations")]}

# Build cyclic graph
builder = StateGraph(IterativeState)
builder.add_node("think", think)
builder.add_node("act", act)
builder.add_node("finalize", finalize)

builder.add_edge(START, "think")
builder.add_edge("think", "act")
builder.add_conditional_edges(
    "act",
    check_done,
    {"continue": "think", "finish": "finalize"}  # Loop back to think or exit
)
builder.add_edge("finalize", END)

graph = watch(builder.compile())

async def main():
    result = await graph.ainvoke({
        "messages": [HumanMessage(content="Start iterative process")],
        "iteration": 0,
        "max_iterations": 3
    })
    # Visualization shows: START -> think -> act -> think -> act -> think -> act -> finalize -> END
    print(result["messages"][-1].content)

asyncio.run(main())
```

--------------------------------

### Conditional Routing with LangGraphics

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

This example demonstrates how to set up conditional edges in a LangGraph.
The `watch` wrapper visualizes the routing decisions made by the `route_by_intent` function, highlighting the execution path taken.

```python
import asyncio
from typing import Annotated, TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
from langgraphics import watch

class RouterState(TypedDict):
    messages: Annotated[list, add_messages]
    intent: str

def classify_intent(state: RouterState) -> dict:
    content = state["messages"][-1].content.lower()
    if "help" in content:
        intent = "support"
    elif "buy" in content or "purchase" in content:
        intent = "sales"
    else:
        intent = "general"
    return {"intent": intent}

def support_handler(state: RouterState) -> dict:
    return {"messages": [AIMessage(content="Routing to support team...")]}

def sales_handler(state: RouterState) -> dict:
    return {"messages": [AIMessage(content="Connecting you with sales...")]}

def general_handler(state: RouterState) -> dict:
    return {"messages": [AIMessage(content="How can I assist you today?")]}

# Routing function
def route_by_intent(state: RouterState) -> Literal["support", "sales", "general"]:
    return state["intent"]

# Build graph with conditional routing
builder = StateGraph(RouterState)
builder.add_node("classifier", classify_intent)
builder.add_node("support", support_handler)
builder.add_node("sales", sales_handler)
builder.add_node("general", general_handler)

builder.add_edge(START, "classifier")
builder.add_conditional_edges(
    "classifier",
    route_by_intent,
    {
        "support": "support",
        "sales": "sales",
        "general": "general"
    }
)
builder.add_edge("support", END)
builder.add_edge("sales", END)
builder.add_edge("general", END)

graph = watch(builder.compile())

async def main():
    # Test different intents - watch visualization to see routing
    queries = [
        "I need help with my account",
        "I want to buy a subscription",
        "What time is it?"
    ]
    for query in queries:
        result = await graph.ainvoke({
            "messages": [HumanMessage(content=query)],
            "intent": ""
        })
        print(f"Query: {query}")
        print(f"Response: {result['messages'][-1].content}\n")

asyncio.run(main())
```

--------------------------------

### Visualize Graph Execution with Error Handling

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

Demonstrates how LangGraphics visualizes errors during graph execution. Failed edges are highlighted, and error details are accessible. This setup allows for debugging and understanding execution flow even when errors occur.

```python
import asyncio
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
from langgraphics import watch

class ErrorState(TypedDict):
    messages: Annotated[list, add_messages]
    should_fail: bool

def safe_node(state: ErrorState) -> dict:
    return {"messages": [AIMessage(content="Safe node executed successfully")]}

def risky_node(state: ErrorState) -> dict:
    if state["should_fail"]:
        raise ValueError("Intentional error for demonstration")
    return {"messages": [AIMessage(content="Risky node succeeded")]}

def recovery_node(state: ErrorState) -> dict:
    return {"messages": [AIMessage(content="Recovered from error")]}

# Build graph
builder = StateGraph(ErrorState)
builder.add_node("safe", safe_node)
builder.add_node("risky", risky_node)
builder.add_node("recovery", recovery_node)

builder.add_edge(START, "safe")
builder.add_edge("safe", "risky")
builder.add_edge("risky", "recovery")
builder.add_edge("recovery", END)

graph = watch(builder.compile())

async def main():
    # Successful run
    try:
        result = await graph.ainvoke({
            "messages": [HumanMessage(content="Test successful path")],
            "should_fail": False
        })
        print("Success:", result["messages"][-1].content)
    except Exception as e:
        print(f"Error: {e}")

    # Failing run - visualization shows error edge highlighted
    try:
        result = await graph.ainvoke({
            "messages": [HumanMessage(content="Test failing path")],
            "should_fail": True
        })
    except ValueError as e:
        print(f"Caught expected error: {e}")
        # Visualization shows the failed edge in red

asyncio.run(main())
```

--------------------------------

### Define Run Lifecycle Events

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

Messages indicating the start and end of a graph execution run.

```python
run_start = {"type": "run_start", "run_id": "abc123"}
run_end = {"type": "run_end", "run_id": "abc123"}
```

--------------------------------

### Implement WebSocket Client

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

A reference implementation for connecting to the LangGraphics WebSocket server and processing incoming events.

```python
import asyncio
import websockets
import json

async def connect_to_langgraphics():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as ws:
        # First message is always the graph topology
        topology = json.loads(await ws.recv())
        print(f"Graph has {len(topology['nodes'])} nodes")

        # Subsequent messages are execution events
        async for message in ws:
            event = json.loads(message)
            if event["type"] == "run_start":
                print(f"Run started: {event['run_id']}")
            elif event["type"] == "edge_active":
                print(f"Edge: {event['source']} -> {event['target']}")
            elif event["type"] == "node_output":
                print(f"Node {event['node_id']}: {event['status']}")
            elif event["type"] == "run_end":
                print(f"Run completed: {event['run_id']}")
                break

# asyncio.run(connect_to_langgraphics())
```

--------------------------------

### Stream and Astream Execution

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

Demonstrates synchronous and asynchronous streaming of graph execution using the watch wrapper to maintain real-time visualization.

```python
import asyncio
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
from langgraphics import watch

class State(TypedDict):
    messages: Annotated[list, add_messages]

def step_one(state: State) -> dict:
    return {"messages": [AIMessage(content="Step 1 complete")]}

def step_two(state: State) -> dict:
    return {"messages": [AIMessage(content="Step 2 complete")]}

def step_three(state: State) -> dict:
    return {"messages": [AIMessage(content="Final step complete")]}

builder = StateGraph(State)
builder.add_node("step_one", step_one)
builder.add_node("step_two", step_two)
builder.add_node("step_three", step_three)
builder.add_edge(START, "step_one")
builder.add_edge("step_one", "step_two")
builder.add_edge("step_two", "step_three")
builder.add_edge("step_three", END)

graph = watch(builder.compile())

# Synchronous streaming
print("Streaming execution:")
for chunk in graph.stream(
    {"messages": [HumanMessage(content="Start workflow")]},
    stream_mode="updates"  # Get node-by-node updates
):
    for node_name, output in chunk.items():
        print(f"  Node '{node_name}': {output}")

# Async streaming
async def stream_async():
    async for chunk in graph.astream(
        {"messages": [HumanMessage(content="Start async workflow")]},
        stream_mode="updates"
    ):
        for node_name, output in chunk.items():
            print(f"  Async node '{node_name}': {output}")

asyncio.run(stream_async())
```

--------------------------------

### Visualize LangGraph execution with watch

Source: https://github.com/proactive-agent/langgraphics/blob/main/README.md

Wrap a compiled LangGraph workflow with the watch function to enable real-time browser visualization during agent invocation.

```python
from langgraph.graph import StateGraph, MessagesState
from langgraphics import watch

workflow = StateGraph(MessagesState)
workflow.add_node(...)
workflow.add_edge(...)

graph = watch(workflow.compile())

await graph.ainvoke({"messages": [...]})
```

--------------------------------

### Fanout and Parallel Node Execution

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

Illustrates parallel node execution where multiple nodes run concurrently and results are aggregated using state reducers.
```python
import asyncio
import operator
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraphics import watch

class FanoutState(TypedDict):
    input_data: str
    parallel_results: Annotated[list[str], operator.add]  # Reducer to collect results
    final_output: str

async def initial_processor(state: FanoutState) -> dict:
    await asyncio.sleep(0.5)
    return {"input_data": "processed_input"}

async def parallel_worker_a(state: FanoutState) -> dict:
    await asyncio.sleep(1.0)
    return {"parallel_results": ["Worker A completed"]}

async def parallel_worker_b(state: FanoutState) -> dict:
    await asyncio.sleep(0.8)
    return {"parallel_results": ["Worker B completed"]}

async def parallel_worker_c(state: FanoutState) -> dict:
    await asyncio.sleep(1.2)
    return {"parallel_results": ["Worker C completed"]}

async def aggregator(state: FanoutState) -> dict:
    combined = " | ".join(state["parallel_results"])
    return {"final_output": f"Aggregated: {combined}"}

# Build fanout graph
builder = StateGraph(FanoutState)
builder.add_node("initial", initial_processor)
builder.add_node("worker_a", parallel_worker_a)
builder.add_node("worker_b", parallel_worker_b)
builder.add_node("worker_c", parallel_worker_c)
builder.add_node("aggregator", aggregator)

# Create fanout pattern: initial -> [worker_a, worker_b, worker_c] -> aggregator
builder.add_edge(START, "initial")
builder.add_edge("initial", "worker_a")
builder.add_edge("initial", "worker_b")
builder.add_edge("initial", "worker_c")
builder.add_edge("worker_a", "aggregator")
builder.add_edge("worker_b", "aggregator")
builder.add_edge("worker_c", "aggregator")
builder.add_edge("aggregator", END)

graph = watch(builder.compile(), direction="TB")

async def main():
    result = await graph.ainvoke({
        "input_data": "",
        "parallel_results": [],
        "final_output": ""
    })
    print(result["final_output"])
    # Output: Aggregated: Worker A completed | Worker B completed | Worker C completed

asyncio.run(main())
```
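
To make the reducer step in the fanout example concrete, here is a minimal sketch of how an `operator.add` reducer folds each worker's partial update into one list. It does not call LangGraph or LangGraphics; `apply_reducer` is a hypothetical helper written for illustration, and the order of the updates is an assumption of the sketch, not a statement about scheduling.

```python
import operator
from functools import reduce
from typing import Callable

def apply_reducer(
    current: list[str],
    updates: list[list[str]],
    reducer: Callable[[list[str], list[str]], list[str]] = operator.add,
) -> list[str]:
    """Fold each node's partial update into the current channel value."""
    return reduce(reducer, updates, current)

# Partial updates shaped like the return values of the three workers
worker_updates = [
    ["Worker A completed"],
    ["Worker B completed"],
    ["Worker C completed"],
]

# Start from the empty list passed as the initial "parallel_results"
parallel_results = apply_reducer([], worker_updates)
final_output = "Aggregated: " + " | ".join(parallel_results)
print(final_output)
```

Because the reducer concatenates lists, each worker returns a one-element list rather than a bare string; a field without a reducer would instead be overwritten by whichever update is applied last.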
--------------------------------

### Viewport.invoke / Viewport.ainvoke - Synchronous and Async Invocation

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

The `Viewport` object returned by `watch()` proxies the original graph's `invoke` and `ainvoke` methods. It broadcasts execution events to the visualization frontend, tracks node transitions, emits edge activations, and handles errors.

```APIDOC
## Viewport.invoke / Viewport.ainvoke - Synchronous and Async Invocation

### Description
The `Viewport` object returned by `watch()` proxies the original graph's `invoke` and `ainvoke` methods while broadcasting execution events to the visualization frontend. It tracks node transitions, emits edge activations, and handles errors gracefully.

### Method
`invoke(input)` and `ainvoke(input)`

### Parameters
#### Arguments
- **input** (any) - Required - The input passed to the graph's `invoke` or `ainvoke` method.

### Request Example
```python
import asyncio
from typing import Annotated, TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
from langgraphics import watch

# Define tools
@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    return f"Search results for: {query}"

# Define state
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]

# Define nodes
def agent_node(state: AgentState) -> dict:
    last_msg = state["messages"][-1]
    # Simulate LLM deciding to use tool or respond. Only a human message can
    # trigger the tool; otherwise the tool result (which also contains
    # "search") would trigger another call and the graph would never finish.
    if isinstance(last_msg, HumanMessage) and "search" in last_msg.content.lower():
        return {"messages": [AIMessage(
            content="",
            tool_calls=[{"name": "search_web", "args": {"query": last_msg.content}, "id": "call-1"}]
        )]}
    return {"messages": [AIMessage(content="I can help you with that!")]}

def route_decision(state: AgentState) -> Literal["tools", "__end__"]:
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    return "__end__"

# Build graph with conditional edges
builder = StateGraph(AgentState)
builder.add_node("agent", agent_node)
builder.add_node("tools", ToolNode([search_web]))

builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", route_decision, {"tools": "tools", "__end__": END})
builder.add_edge("tools", "agent")

graph = watch(builder.compile())

# Synchronous invocation
result = graph.invoke({"messages": [HumanMessage(content="Please search for Python tutorials")]})
print(result["messages"][-1].content)

# Async invocation
async def main():
    result = await graph.ainvoke({"messages": [HumanMessage(content="Hello!")]})
    print(result["messages"][-1].content)

asyncio.run(main())
```

### Response
- **output** (any) - The output of the graph's `invoke` or `ainvoke` method.

#### Response Example
```json
{
  "messages": [
    // ... message history ...
  ]
}
```
```

--------------------------------

### Define Node Output Event

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

Detailed execution data for a node, including input, output, state, and performance metrics.

```python
node_output = {
    "type": "node_output",
    "run_id": "abc123",
    "node_id": "my_node",
    "node_kind": "chain",  # llm, chain, tool, retriever, chat_model
    "status": "ok",        # ok or error
    "input": "[{\"role\": \"human\", \"content\": \"Hello\"}]",
    "output": "[{\"role\": \"ai\", \"content\": \"Hi there!\"}]",
    "state": "{\"messages\": [...]}",
    "metrics": {
        "latency": "1s 234ms",
        "costs": {"cached": "0.0", "total": "0.00123"},
        "tokens": {"cached": 0, "total": 150}
    }
}
```

--------------------------------

### Integrate Subgraph into Parent Graph

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

Builds a parent graph in which one node invokes a separately compiled subgraph. Use this pattern to create modular and reusable graph components.
The parent's messages are passed into the subgraph, and the subgraph's summary is folded back into the parent graph's state.

```python
import asyncio
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage
from langgraphics import watch

# Subgraph state and implementation
class SubgraphState(TypedDict):
    messages: Annotated[list, add_messages]
    summary: str

def analyze(state: SubgraphState) -> dict:
    return {"messages": [AIMessage(content="Analyzing input...")]}

def summarize(state: SubgraphState) -> dict:
    content = " ".join(m.content for m in state["messages"])
    return {"summary": f"Summary: {content[:50]}..."}

# Build subgraph
subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node("analyze", analyze)
subgraph_builder.add_node("summarize", summarize)
subgraph_builder.add_edge(START, "analyze")
subgraph_builder.add_edge("analyze", "summarize")
subgraph_builder.add_edge("summarize", END)
subgraph = subgraph_builder.compile()

# Parent graph state
class ParentState(TypedDict):
    messages: Annotated[list, add_messages]
    final_result: str

def preprocess(state: ParentState) -> dict:
    return {"messages": [AIMessage(content="Preprocessing complete")]}

def run_subgraph(state: ParentState) -> dict:
    # Run subgraph with parent state
    result = subgraph.invoke({"messages": state["messages"], "summary": ""})
    return {"messages": [AIMessage(content=f"Subgraph result: {result['summary']}")]}

def postprocess(state: ParentState) -> dict:
    return {"final_result": "Pipeline complete"}

# Build parent graph with subgraph node
builder = StateGraph(ParentState)
builder.add_node("preprocess", preprocess)
builder.add_node("subgraph_runner", run_subgraph)
builder.add_node("postprocess", postprocess)

builder.add_edge(START, "preprocess")
builder.add_edge("preprocess", "subgraph_runner")
builder.add_edge("subgraph_runner", "postprocess")
builder.add_edge("postprocess", END)

graph = watch(builder.compile())

async def main():
    result = await graph.ainvoke({
        "messages": [HumanMessage(content="Process this document")],
        "final_result": ""
    })
    print(result["final_result"])

asyncio.run(main())
```

--------------------------------

### Define Graph Topology Message

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

The initial message sent upon connection to define the graph structure, including nodes and edges.

```python
graph_message = {
    "type": "graph",
    "nodes": [
        {"id": "node_id", "name": "node_name", "node_type": "start|end|node"}
    ],
    "edges": [
        {"id": "e0", "source": "src", "target": "tgt", "conditional": False, "label": None}
    ]
}
```

--------------------------------

### Define Edge Activation Event

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

Message sent when the agent traverses a specific edge in the graph.

```python
edge_active = {
    "type": "edge_active",
    "source": "node_a",
    "target": "node_b",
    "edge_id": "e1"
}
```

--------------------------------

### Define Error Event

Source: https://context7.com/proactive-agent/langgraphics/llms.txt

Message sent when an error occurs during edge traversal or node execution.

```python
error_event = {
    "type": "error",
    "source": "failing_node",
    "target": "next_node",
    "edge_id": "e5"
}
```
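
The event shapes above can be exercised without a running server. The sketch below replays a hand-written list of events and reconstructs the traversed path plus any failed edges. `trace_path`, the sample node ids (including `__start__`), and the ordering of the sample events are assumptions made for illustration; only the field names `type`, `source`, `target`, `edge_id`, and `run_id` come from the message definitions above.

```python
from typing import Iterable

def trace_path(events: Iterable[dict]) -> tuple[list[str], list[str]]:
    """Return (visited node path, failed edges) from a sequence of events."""
    path: list[str] = []
    failed: list[str] = []
    for event in events:
        kind = event.get("type")
        if kind == "edge_active":
            # Seed the path with the first source, then append each target
            if not path:
                path.append(event["source"])
            path.append(event["target"])
        elif kind == "error":
            # Record the edge on which the error was reported
            failed.append(f"{event['source']} -> {event['target']}")
        elif kind == "run_end":
            break  # stop at the end of the first run
    return path, failed

# Hand-written sample events, not captured from a real run
sample_events = [
    {"type": "run_start", "run_id": "abc123"},
    {"type": "edge_active", "source": "__start__", "target": "safe", "edge_id": "e0"},
    {"type": "edge_active", "source": "safe", "target": "risky", "edge_id": "e1"},
    {"type": "error", "source": "risky", "target": "recovery", "edge_id": "e2"},
    {"type": "run_end", "run_id": "abc123"},
]

path, failed = trace_path(sample_events)
print(" -> ".join(path))
print("failed edges:", failed)
```

Unknown event types such as `node_output` are ignored here, so the same function could be fed the decoded messages from a WebSocket loop without further changes.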