LangGraph
https://github.com/langchain-ai/langgraph
Build resilient language agents as graphs.
Tokens: 66,344 · Snippets: 245 · Trust Score: 9.2 · Update: 2 days ago
# LangGraph

LangGraph is a low-level orchestration framework for building, managing, and deploying long-running, stateful agents and workflows. Inspired by Google's Pregel model and Apache Beam, it provides durable execution, human-in-the-loop capabilities, comprehensive memory management, and production-ready deployment infrastructure. LangGraph enables developers to create sophisticated AI applications with features like automatic state persistence, resumable workflows, and seamless integration with LangChain components.

The framework offers two primary APIs: the Graph API (StateGraph) for building node-based workflows with shared state, and the Functional API (@entrypoint and @task decorators) for a more Pythonic approach to defining workflows. LangGraph supports both synchronous and asynchronous execution, multiple streaming modes, checkpointing for state persistence, and includes prebuilt components like the ReAct agent pattern for rapid development of tool-calling agents.

---

## StateGraph - Building Stateful Workflows

StateGraph is the core class for creating node-based workflows where nodes communicate by reading and writing to a shared state. Each node receives the current state and returns partial updates that are merged using optional reducer functions.
```python
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver

# Define state with a reducer for list aggregation
def add_to_list(current: list, new: int | None) -> list:
    if new is not None:
        return current + [new]
    return current

class State(TypedDict):
    values: Annotated[list[int], add_to_list]
    total: int

# Define node functions
def add_value(state: State) -> dict:
    return {"values": len(state["values"]) + 1}

def compute_total(state: State) -> dict:
    return {"total": sum(state["values"])}

# Build the graph
builder = StateGraph(State)
builder.add_node("add_value", add_value)
builder.add_node("compute_total", compute_total)
builder.add_edge(START, "add_value")
builder.add_edge("add_value", "compute_total")
builder.add_edge("compute_total", END)

# Compile with checkpointer for persistence
graph = builder.compile(checkpointer=InMemorySaver())

# Execute with thread ID for state persistence.
# Note: "values" is omitted from the input so the reducer is not called
# with a list (it expects ints); the channel starts from its default [].
config = {"configurable": {"thread_id": "my-thread"}}
result = graph.invoke({"total": 0}, config)
print(result)  # Output: {'values': [1], 'total': 1}

# Continue on same thread - state is preserved
result = graph.invoke({"total": 0}, config)
print(result)  # Output: {'values': [1, 2], 'total': 3}
```

---

## add_messages - Message List Reducer

The add_messages function is a specialized reducer for managing conversation histories. It merges message lists, updates existing messages by ID, and supports message removal operations.
```python
from langchain_core.messages import HumanMessage, AIMessage, RemoveMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages, MessagesState, REMOVE_ALL_MESSAGES

# Using MessagesState (pre-built state with a "messages" key)
class State(MessagesState):
    summary: str

def chatbot(state: State) -> dict:
    # Messages are automatically merged using add_messages
    return {"messages": [AIMessage(content="Hello! How can I help?")]}

def summarize(state: State) -> dict:
    # Summarize and clear old messages
    summary = f"Conversation had {len(state['messages'])} messages"
    return {
        "summary": summary,
        "messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES)],  # Clear all messages
    }

builder = StateGraph(State)
builder.add_node("chatbot", chatbot)
builder.add_node("summarize", summarize)
builder.add_edge(START, "chatbot")
builder.add_edge("chatbot", "summarize")
builder.add_edge("summarize", END)
graph = builder.compile()

result = graph.invoke({
    "messages": [HumanMessage(content="Hi there!", id="msg-1")],
    "summary": ""
})
print(result)
# Output: {'messages': [], 'summary': 'Conversation had 2 messages'}

# Direct usage of add_messages
msgs1 = [HumanMessage(content="Hello", id="1")]
msgs2 = [AIMessage(content="Hi!", id="2"), HumanMessage(content="Updated", id="1")]
merged = add_messages(msgs1, msgs2)
# Result: [HumanMessage(content='Updated', id='1'), AIMessage(content='Hi!', id='2')]
```

---

## Conditional Edges - Dynamic Routing

Conditional edges allow dynamic routing between nodes based on state or computation results. Use add_conditional_edges to define branching logic in your workflow.
```python
from typing import Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    input: str
    classification: str
    response: str

def classify(state: State) -> dict:
    text = state["input"].lower()
    if "urgent" in text:
        return {"classification": "urgent"}
    elif "question" in text:
        return {"classification": "question"}
    return {"classification": "general"}

def handle_urgent(state: State) -> dict:
    return {"response": "URGENT: Escalating immediately!"}

def handle_question(state: State) -> dict:
    return {"response": "Let me find the answer for you."}

def handle_general(state: State) -> dict:
    return {"response": "Thank you for your message."}

# Router function returns the next node name
def route_by_classification(state: State) -> Literal["urgent", "question", "general"]:
    return state["classification"]

builder = StateGraph(State)
builder.add_node("classify", classify)
builder.add_node("urgent", handle_urgent)
builder.add_node("question", handle_question)
builder.add_node("general", handle_general)
builder.add_edge(START, "classify")
builder.add_conditional_edges(
    "classify",
    route_by_classification,
    {"urgent": "urgent", "question": "question", "general": "general"}
)
builder.add_edge("urgent", END)
builder.add_edge("question", END)
builder.add_edge("general", END)
graph = builder.compile()

result = graph.invoke({"input": "This is urgent!", "classification": "", "response": ""})
print(result["response"])  # Output: URGENT: Escalating immediately!

result = graph.invoke({"input": "I have a question", "classification": "", "response": ""})
print(result["response"])  # Output: Let me find the answer for you.
```

---

## Send API - Parallel Node Execution

The Send API enables map-reduce patterns by dynamically spawning multiple instances of a node with different inputs. This is useful for parallel processing of items.
```python
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
import operator

class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]

class JokeState(TypedDict):
    subject: str

def generate_joke(state: JokeState) -> dict:
    return {"jokes": [f"Why did the {state['subject']} cross the road? To get to the other side!"]}

def fan_out_jokes(state: OverallState) -> list[Send]:
    # Create a Send for each subject - they run in parallel
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

builder = StateGraph(OverallState)
builder.add_node("generate_joke", generate_joke)
builder.add_conditional_edges(START, fan_out_jokes, ["generate_joke"])
builder.add_edge("generate_joke", END)
graph = builder.compile()

result = graph.invoke({"subjects": ["chicken", "robot", "programmer"], "jokes": []})
print(result["jokes"])
# Output: [
#   'Why did the chicken cross the road? To get to the other side!',
#   'Why did the robot cross the road? To get to the other side!',
#   'Why did the programmer cross the road? To get to the other side!'
# ]
```

---

## Functional API - @entrypoint and @task

The Functional API provides a more Pythonic way to define workflows using decorators. Tasks are parallelizable units of work, and entrypoints define the workflow entry.
```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def fetch_data(url: str) -> dict:
    """Task that fetches data - runs in parallel when called multiple times."""
    return {"url": url, "data": f"Content from {url}"}

@task
def process_data(data: dict) -> str:
    """Process fetched data."""
    return f"Processed: {data['url']}"

@entrypoint(checkpointer=InMemorySaver())
def data_pipeline(urls: list[str], *, previous: list[str] | None = None) -> list[str]:
    """Main workflow that orchestrates tasks."""
    # Launch tasks in parallel
    fetch_futures = [fetch_data(url) for url in urls]
    # Wait for all fetches to complete
    fetched = [f.result() for f in fetch_futures]

    # Process in parallel
    process_futures = [process_data(data) for data in fetched]
    results = [f.result() for f in process_futures]

    # Combine with previous results if any
    if previous:
        results = previous + results
    return results

# Execute the workflow
config = {"configurable": {"thread_id": "pipeline-1"}}
result = data_pipeline.invoke(
    ["https://api.example.com/1", "https://api.example.com/2"],
    config
)
print(result)
# Output: ['Processed: https://api.example.com/1', 'Processed: https://api.example.com/2']

# Continue with same thread - previous results are preserved
result = data_pipeline.invoke(["https://api.example.com/3"], config)
print(result)
# Output: ['Processed: https://api.example.com/1', 'Processed: https://api.example.com/2', 'Processed: https://api.example.com/3']
```

---

## entrypoint.final - Decoupling Return and Save Values

The entrypoint.final class allows returning a different value to the caller than what is saved to the checkpoint. Useful for maintaining internal state separate from output.
```python
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

@entrypoint(checkpointer=InMemorySaver())
def counter(
    increment: int,
    *,
    previous: int | None = None,
) -> entrypoint.final[int, int]:
    """Returns current count, saves new count for next invocation."""
    current_count = previous or 0
    new_count = current_count + increment
    # Return the current count to the caller, but save new_count for next time
    return entrypoint.final(value=current_count, save=new_count)

config = {"configurable": {"thread_id": "counter-1"}}

# First call - returns 0 (initial), saves 5
result = counter.invoke(5, config)
print(f"Returned: {result}")  # Returned: 0

# Second call - returns 5 (previous save), saves 8
result = counter.invoke(3, config)
print(f"Returned: {result}")  # Returned: 5

# Third call - returns 8 (previous save), saves 18
result = counter.invoke(10, config)
print(f"Returned: {result}")  # Returned: 8
```

---

## interrupt() - Human-in-the-Loop

The interrupt function pauses graph execution and surfaces a value to the client for human review or input. Resume execution using the Command primitive.
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command

class State(TypedDict):
    proposal: str
    approved: bool
    final_result: str

def generate_proposal(state: State) -> dict:
    return {"proposal": "Proposal: Increase budget by 20%"}

def human_review(state: State) -> dict:
    # This will pause execution and send the proposal for review
    decision = interrupt({
        "question": "Do you approve this proposal?",
        "proposal": state["proposal"],
        "options": ["approve", "reject"]
    })
    return {"approved": decision == "approve"}

def finalize(state: State) -> dict:
    if state["approved"]:
        return {"final_result": "Proposal approved and implemented!"}
    return {"final_result": "Proposal rejected."}

builder = StateGraph(State)
builder.add_node("generate", generate_proposal)
builder.add_node("review", human_review)
builder.add_node("finalize", finalize)
builder.add_edge(START, "generate")
builder.add_edge("generate", "review")
builder.add_edge("review", "finalize")
builder.add_edge("finalize", END)
graph = builder.compile(checkpointer=InMemorySaver())

config = {"configurable": {"thread_id": "approval-flow"}}

# Start the workflow - will pause at the interrupt
for chunk in graph.stream({"proposal": "", "approved": False, "final_result": ""}, config):
    print(chunk)
# Output includes: {'__interrupt__': (Interrupt(value={'question': 'Do you approve...'}),)}

# Resume with the human decision
for chunk in graph.stream(Command(resume="approve"), config):
    print(chunk)
# Output: {'finalize': {'final_result': 'Proposal approved and implemented!'}}
```

---

## Command - Control Flow and State Updates

The Command class combines state updates with control flow, allowing nodes to both update state and direct navigation to specific nodes.
```python
from typing import Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command

class State(TypedDict):
    value: int
    path: list[str]

def router(state: State) -> Command[Literal["double", "triple", "done"]]:
    value = state["value"]
    path = state["path"] + ["router"]
    if value >= 100:
        return Command(goto="done", update={"path": path})
    elif value % 2 == 0:
        return Command(goto="double", update={"path": path})
    else:
        return Command(goto="triple", update={"path": path})

def double_value(state: State) -> Command[Literal["router"]]:
    return Command(
        goto="router",
        update={"value": state["value"] * 2, "path": state["path"] + ["double"]}
    )

def triple_value(state: State) -> Command[Literal["router"]]:
    return Command(
        goto="router",
        update={"value": state["value"] * 3, "path": state["path"] + ["triple"]}
    )

def done(state: State) -> dict:
    return {"path": state["path"] + ["done"]}

builder = StateGraph(State)
builder.add_node("router", router, destinations=("double", "triple", "done"))
builder.add_node("double", double_value, destinations=("router",))
builder.add_node("triple", triple_value, destinations=("router",))
builder.add_node("done", done)
builder.add_edge(START, "router")
builder.add_edge("done", END)
graph = builder.compile()

result = graph.invoke({"value": 5, "path": []})
print(f"Final value: {result['value']}")
# Final value: 135 (5 -> 15 -> 45 -> 135)
print(f"Path: {result['path']}")
# Path: ['router', 'triple', 'router', 'triple', 'router', 'triple', 'router', 'done']
```

---

## InMemorySaver - Checkpointing for Development

InMemorySaver provides in-memory checkpoint storage for development and testing. For production, use PostgresSaver or other persistent backends.
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver

class State(TypedDict):
    messages: list[str]
    step: int

def process(state: State) -> dict:
    return {
        "messages": state["messages"] + [f"Step {state['step']} completed"],
        "step": state["step"] + 1
    }

# Create checkpointer
memory = InMemorySaver()

builder = StateGraph(State)
builder.add_node("process", process)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile(checkpointer=memory)

# Thread-based execution
config = {"configurable": {"thread_id": "workflow-1"}}

# First run
result = graph.invoke({"messages": [], "step": 1}, config)
print(result)
# Output: {'messages': ['Step 1 completed'], 'step': 2}

# Second run on same thread - an empty input writes nothing, so the
# channels keep their checkpointed values and the run continues from them
result = graph.invoke({}, config)
print(result)
# Output: {'messages': ['Step 1 completed', 'Step 2 completed'], 'step': 3}

# Get a state snapshot
snapshot = graph.get_state(config)
print(f"Current step: {snapshot.values['step']}")
print(f"Messages: {snapshot.values['messages']}")

# List all checkpoints
for checkpoint in memory.list(config):
    print(f"Checkpoint ID: {checkpoint.config['configurable']['checkpoint_id']}")
```

---

## Runtime - Injecting Context and Store

The Runtime class provides access to run-scoped context, store, and stream writer within nodes. Define a context_schema to type the runtime context.
```python
from dataclasses import dataclass
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore

@dataclass
class UserContext:
    user_id: str
    permissions: list[str]

class State(TypedDict):
    query: str
    response: str

def authorized_handler(state: State, runtime: Runtime[UserContext]) -> dict:
    """Node that uses runtime context and store."""
    user_id = runtime.context.user_id
    permissions = runtime.context.permissions

    # Check permissions
    if "admin" not in permissions:
        return {"response": f"Access denied for user {user_id}"}

    # Use the store for persistence
    if runtime.store:
        # Store the query
        runtime.store.put(("queries", user_id), state["query"], {"query": state["query"]})
        # Get user preferences
        prefs = runtime.store.get(("preferences",), user_id)
        pref_str = prefs.value if prefs else "default"
    else:
        pref_str = "no-store"

    return {"response": f"Processed '{state['query']}' for {user_id} with prefs: {pref_str}"}

# Set up the store with initial data
store = InMemoryStore()
store.put(("preferences",), "user-123", {"theme": "dark"})

builder = StateGraph(State, context_schema=UserContext)
builder.add_node("handler", authorized_handler)
builder.add_edge(START, "handler")
builder.add_edge("handler", END)
graph = builder.compile(store=store)

# Invoke with context
result = graph.invoke(
    {"query": "Get report", "response": ""},
    context=UserContext(user_id="user-123", permissions=["admin", "read"])
)
print(result["response"])
# Output: Processed 'Get report' for user-123 with prefs: {'theme': 'dark'}

# Different user without admin permissions
result = graph.invoke(
    {"query": "Get report", "response": ""},
    context=UserContext(user_id="user-456", permissions=["read"])
)
print(result["response"])
# Output: Access denied for user user-456
```

---

## Streaming - Multiple Stream Modes

LangGraph supports multiple streaming modes: "values", "updates", "messages", "custom",
"debug", and "checkpoints". Each provides a different granularity of output.

```python
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import StreamWriter
import operator

class State(TypedDict):
    numbers: Annotated[list[int], operator.add]
    total: int

def add_numbers(state: State, writer: StreamWriter) -> dict:
    """Node that uses custom streaming."""
    total = 0
    for n in [1, 2, 3]:
        total += n
        # Write custom stream data
        writer({"progress": f"Added {n}, running total: {total}"})
    return {"numbers": [1, 2, 3], "total": total}

def finalize(state: State) -> dict:
    return {"total": state["total"] * 2}

builder = StateGraph(State)
builder.add_node("add", add_numbers)
builder.add_node("finalize", finalize)
builder.add_edge(START, "add")
builder.add_edge("add", "finalize")
builder.add_edge("finalize", END)
graph = builder.compile()

# Stream mode: "updates" - only node outputs
print("=== Updates Mode ===")
for chunk in graph.stream({"numbers": [], "total": 0}, stream_mode="updates"):
    print(chunk)
# Output:
# {'add': {'numbers': [1, 2, 3], 'total': 6}}
# {'finalize': {'total': 12}}

# Stream mode: "values" - full state after each step
print("\n=== Values Mode ===")
for chunk in graph.stream({"numbers": [], "total": 0}, stream_mode="values"):
    print(chunk)
# Output:
# {'numbers': [], 'total': 0}
# {'numbers': [1, 2, 3], 'total': 6}
# {'numbers': [1, 2, 3], 'total': 12}

# Stream mode: "custom" - only custom writer output
print("\n=== Custom Mode ===")
for chunk in graph.stream({"numbers": [], "total": 0}, stream_mode="custom"):
    print(chunk)
# Output:
# {'progress': 'Added 1, running total: 1'}
# {'progress': 'Added 2, running total: 3'}
# {'progress': 'Added 3, running total: 6'}

# Multiple stream modes at once
print("\n=== Multiple Modes ===")
for chunk in graph.stream({"numbers": [], "total": 0}, stream_mode=["updates", "custom"]):
    print(f"Type: {chunk[0]}, Data: {chunk[1]}")
```

---

## RetryPolicy - Automatic Retry on Failure

Configure automatic retries for nodes that may fail transiently. RetryPolicy supports exponential backoff, jitter, and custom retry conditions.

```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import RetryPolicy
import random

class State(TypedDict):
    attempts: int
    result: str

# Custom retry condition
def should_retry(error: Exception) -> bool:
    return isinstance(error, (ConnectionError, TimeoutError))

# Create a retry policy
retry_policy = RetryPolicy(
    initial_interval=0.5,   # Start with a 0.5 second delay
    backoff_factor=2.0,     # Double the delay each retry
    max_interval=30.0,      # Cap at 30 seconds
    max_attempts=5,         # Try up to 5 times
    jitter=True,            # Add randomness to prevent thundering herd
    retry_on=should_retry   # Custom condition
)

def unreliable_service(state: State) -> dict:
    """Simulates an unreliable external service."""
    state["attempts"] = state.get("attempts", 0) + 1
    # Fail 70% of the time for the first 3 attempts
    if state["attempts"] < 4 and random.random() < 0.7:
        raise ConnectionError(f"Service unavailable (attempt {state['attempts']})")
    return {
        "attempts": state["attempts"],
        "result": f"Success after {state['attempts']} attempts"
    }

builder = StateGraph(State)
builder.add_node(
    "service",
    unreliable_service,
    retry_policy=retry_policy  # Attach the retry policy to the node
)
builder.add_edge(START, "service")
builder.add_edge("service", END)
graph = builder.compile()

# Multiple retry policies can be chained - the first matching one is used
multi_policy = [
    RetryPolicy(max_attempts=2, retry_on=TimeoutError),
    RetryPolicy(max_attempts=5, retry_on=ConnectionError),
]

builder2 = StateGraph(State)
builder2.add_node("service", unreliable_service, retry_policy=multi_policy)
builder2.add_edge(START, "service")
builder2.add_edge("service", END)
```

---

## Subgraphs - Composing Graphs

Graphs can be composed by adding compiled graphs as nodes in a parent graph.
State is automatically mapped between parent and child graphs.

```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

# Child graph state
class ChildState(TypedDict):
    value: int
    child_result: str

def child_process(state: ChildState) -> dict:
    return {"child_result": f"Processed value: {state['value'] * 2}"}

# Build the child graph
child_builder = StateGraph(ChildState)
child_builder.add_node("process", child_process)
child_builder.add_edge(START, "process")
child_builder.add_edge("process", END)
child_graph = child_builder.compile()

# Parent graph state - must include the child's input/output keys
class ParentState(TypedDict):
    value: int
    child_result: str
    final_result: str

def prepare(state: ParentState) -> dict:
    return {"value": state["value"] + 10}

def finalize(state: ParentState) -> dict:
    return {"final_result": f"Final: {state['child_result']}"}

# Build the parent graph with the child as a subgraph
parent_builder = StateGraph(ParentState)
parent_builder.add_node("prepare", prepare)
parent_builder.add_node("child", child_graph)  # Add the compiled graph as a node
parent_builder.add_node("finalize", finalize)
parent_builder.add_edge(START, "prepare")
parent_builder.add_edge("prepare", "child")
parent_builder.add_edge("child", "finalize")
parent_builder.add_edge("finalize", END)
parent_graph = parent_builder.compile()

result = parent_graph.invoke({"value": 5, "child_result": "", "final_result": ""})
print(result)
# Output: {
#   'value': 15,                            # 5 + 10
#   'child_result': 'Processed value: 30',  # 15 * 2
#   'final_result': 'Final: Processed value: 30'
# }
```

---

## create_react_agent - Prebuilt Tool-Calling Agent

create_react_agent provides a ready-to-use ReAct agent that iteratively calls tools until a task is complete. Note: This is deprecated in favor of langchain.agents.create_agent.
```python
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import InMemorySaver

# Define tools
@tool
def get_weather(location: str) -> str:
    """Get the current weather for a location."""
    # Simulated weather data
    weather_data = {
        "san francisco": "Foggy, 62F",
        "new york": "Sunny, 75F",
        "london": "Rainy, 55F"
    }
    return weather_data.get(location.lower(), f"Weather data not available for {location}")

@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    return f"Search results for '{query}': Found 10 relevant articles."

# Create the agent (note: requires langchain and a chat model)
# Using the string format "provider:model" requires langchain installed
agent = create_react_agent(
    model="openai:gpt-4",  # Or pass a ChatModel instance directly
    tools=[get_weather, search_web],
    prompt="You are a helpful assistant. Always be concise.",
    checkpointer=InMemorySaver(),
    debug=False
)

# Invoke the agent
config = {"configurable": {"thread_id": "agent-session-1"}}
result = agent.invoke(
    {"messages": [{"role": "user", "content": "What's the weather in San Francisco?"}]},
    config
)

# Stream agent responses
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Search for LangGraph tutorials"}]},
    config,
    stream_mode="updates"
):
    print(chunk)
```

---

## ToolNode - Executing Tool Calls

ToolNode handles execution of tool calls from AI messages. It automatically maps tool calls to the appropriate tool functions and returns ToolMessages.
```python
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers together."""
    return a * b

@tool
def add(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b

# Create a ToolNode with the tools
tool_node = ToolNode([multiply, add])

# Simulate an AI message with tool calls
ai_message = AIMessage(
    content="",
    tool_calls=[
        {"id": "call_1", "name": "multiply", "args": {"a": 5, "b": 3}},
        {"id": "call_2", "name": "add", "args": {"a": 10, "b": 7}}
    ]
)

# Build a graph with the tool node
def call_model(state: MessagesState) -> dict:
    # In practice, this would call an LLM.
    # Here we return a pre-made AI message with tool calls.
    last_message = state["messages"][-1]
    if isinstance(last_message, HumanMessage):
        return {"messages": [ai_message]}
    return {"messages": [AIMessage(content="Results computed!")]}

builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_node("tools", tool_node)
builder.add_edge(START, "agent")
builder.add_conditional_edges(
    "agent",
    tools_condition,  # Routes to "tools" if tool_calls present, else END
    {"tools": "tools", "__end__": END}
)
builder.add_edge("tools", "agent")
graph = builder.compile()

result = graph.invoke({"messages": [HumanMessage(content="Calculate 5*3 and 10+7")]})
for msg in result["messages"]:
    print(f"{msg.__class__.__name__}: {msg.content}")
# Output:
# HumanMessage: Calculate 5*3 and 10+7
# AIMessage:
# ToolMessage: 15
# ToolMessage: 17
# AIMessage: Results computed!
```

---

## Time Travel - State History Navigation

With checkpointing enabled, you can navigate through the state history, replay from any checkpoint, or fork execution from a past state.
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver

class State(TypedDict):
    counter: int
    history: list[str]

def increment(state: State) -> dict:
    new_count = state["counter"] + 1
    return {
        "counter": new_count,
        "history": state["history"] + [f"Incremented to {new_count}"]
    }

memory = InMemorySaver()

builder = StateGraph(State)
builder.add_node("increment", increment)
builder.add_edge(START, "increment")
builder.add_edge("increment", END)
graph = builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "time-travel-demo"}}

# Run multiple times to build history. The first run seeds the state;
# later runs pass an empty input so the checkpointed values carry forward.
result = graph.invoke({"counter": 0, "history": []}, config)
print(f"Run 1: counter = {result['counter']}")
for i in range(2):
    result = graph.invoke({}, config)
    print(f"Run {i + 2}: counter = {result['counter']}")

# Get the current state
current_state = graph.get_state(config)
print(f"\nCurrent state: {current_state.values}")
print(f"Checkpoint ID: {current_state.config['configurable']['checkpoint_id']}")

# List all checkpoints (history)
print("\n=== State History ===")
checkpoints = list(memory.list(config))
for i, cp in enumerate(checkpoints):
    counter = cp.checkpoint['channel_values'].get('counter', 'N/A')
    print(f"{i}: ID={cp.config['configurable']['checkpoint_id'][:8]}... counter={counter}")

# Travel back to a previous checkpoint
if len(checkpoints) > 1:
    old_checkpoint = checkpoints[-2]  # Second to last
    old_config = old_checkpoint.config

    # Get the state at that checkpoint
    past_state = graph.get_state(old_config)
    print(f"\nPast state: {past_state.values}")

    # Fork from the past state into a new thread
    fork_config = {"configurable": {"thread_id": "forked-timeline"}}
    # Update state to match the past checkpoint
    graph.update_state(fork_config, past_state.values)

    # Continue from the forked state (empty input keeps the forked values)
    forked_result = graph.invoke({}, fork_config)
    print(f"Forked result: {forked_result}")
```

---

## Graph Visualization

Compiled graphs can be visualized using get_graph() and various drawing methods to understand the workflow structure.

```python
from typing import Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    value: str
    route: str

def process_a(state: State) -> dict:
    return {"value": state["value"] + "_A"}

def process_b(state: State) -> dict:
    return {"value": state["value"] + "_B"}

def router(state: State) -> Literal["a", "b"]:
    return "a" if len(state["value"]) < 5 else "b"

builder = StateGraph(State)
builder.add_node("process_a", process_a)
builder.add_node("process_b", process_b)
builder.add_conditional_edges(START, router, {"a": "process_a", "b": "process_b"})
builder.add_edge("process_a", END)
builder.add_edge("process_b", END)
graph = builder.compile()

# Get the graph structure
graph_structure = graph.get_graph()
print("Nodes:", list(graph_structure.nodes))
print("Edges:", list(graph_structure.edges))

# Generate an ASCII representation
print("\n=== ASCII Graph ===")
print(graph.get_graph().draw_ascii())

# Generate Mermaid diagram syntax
print("\n=== Mermaid Diagram ===")
print(graph.get_graph().draw_mermaid())

# For PNG output (requires graphviz):
# png_data = graph.get_graph().draw_png()
# with open("graph.png", "wb") as f:
#     f.write(png_data)

# For interactive display in Jupyter:
# from IPython.display import Image, display
# display(Image(graph.get_graph().draw_mermaid_png()))
```

---

LangGraph excels at building complex, stateful AI applications that require reliability, human oversight, and sophisticated control flow. The framework is particularly well suited for conversational agents with memory, multi-step reasoning workflows, document processing pipelines, and any application requiring durable execution with automatic state persistence. Its integration with LangChain provides access to a rich ecosystem of LLM providers, tools, and utilities.

Common integration patterns include: combining StateGraph with external APIs and databases through the Runtime context, using the Functional API for data processing pipelines, implementing approval workflows with interrupt(), building multi-agent systems with subgraphs, and deploying production agents with PostgresSaver for persistent checkpointing. The framework's streaming capabilities enable real-time user interfaces, while retry policies and checkpointing ensure robustness against transient failures.
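The PostgresSaver deployment pattern mentioned above can be sketched as follows. This is a minimal sketch, not a production recipe: it assumes the `langgraph-checkpoint-postgres` package is installed and a Postgres server is reachable, and the connection string is a placeholder you must replace.

```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver

class State(TypedDict):
    count: int

def bump(state: State) -> dict:
    return {"count": state["count"] + 1}

builder = StateGraph(State)
builder.add_node("bump", bump)
builder.add_edge(START, "bump")
builder.add_edge("bump", END)

# Placeholder connection string - point this at your own database
DB_URI = "postgresql://user:password@localhost:5432/langgraph"

# Checkpoints are written to Postgres, so threads survive process restarts
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # Create the checkpoint tables on first use
    graph = builder.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": "durable-thread"}}
    print(graph.invoke({"count": 0}, config))
```

Because the checkpointer interface is shared, this is a drop-in swap for InMemorySaver in any of the examples above; only the compile call changes.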