2026年3月15日 7 分钟阅读

LangGraph 完全指南:构建多智能体协作工作流的实战教程

tinyash 0 条评论
langgraph

引言

在 AI 智能体开发领域,单个模型的能力往往有限。当面对复杂任务时,我们需要多个 AI 智能体协作完成——就像人类团队分工合作一样。LangGraph 正是为此而生的开源框架,它让开发者能够轻松构建状态化、多参与者的 AI 应用工作流。

本文将带你从零开始学习 LangGraph,通过实际案例掌握如何构建高效的多智能体协作系统。无论你是想打造自动化客服、智能代码审查系统,还是复杂的数据分析管道,LangGraph 都能提供强大的支持。

本文你将学到:

  • LangGraph 的核心概念和工作原理
  • 如何安装配置 LangGraph 开发环境
  • 构建第一个多智能体工作流
  • 实战案例:智能客服系统
  • 高级技巧:状态管理、条件路由、人机协作
  • 生产环境部署最佳实践

什么是 LangGraph?

LangGraph 是由 LangChain 团队开发的开源库,专门用于构建有状态的、多参与者的 AI 应用。它基于图(Graph)的概念,将 AI 工作流建模为节点(Nodes)和边(Edges)的组合。

核心特点

  1. 状态化管理:自动追踪工作流执行过程中的所有状态变化
  2. 循环支持:原生支持循环工作流,适合迭代式任务
  3. 多智能体协作:轻松编排多个 AI 智能体协同工作
  4. 人机交互:内置人类审批和干预机制
  5. 可观测性:完整的执行轨迹记录,便于调试和监控

适用场景

  • 复杂任务分解:将大型任务拆分为多个子任务,由不同智能体处理
  • 多轮对话系统:需要记忆上下文的多轮交互应用
  • 审批工作流:需要人工审核关键决策的自动化流程
  • 数据管道:多步骤的数据处理和转换任务
  • 智能客服:根据问题类型路由到不同专家智能体

环境搭建与安装

前置要求

  • Python 3.9 或更高版本
  • pip 包管理器
  • 一个 LLM API 密钥(OpenAI、Anthropic 等)

安装步骤

# 创建虚拟环境(推荐)
python -m venv langgraph-env
source langgraph-env/bin/activate  # Linux/macOS
# 或
langgraph-env\Scripts\activate  # Windows
# 安装 LangGraph 及相关依赖
pip install langgraph langchain langchain-openai langchain-anthropic
# 安装可选的可视化工具
pip install langgraph-cli

配置 API 密钥

# 设置环境变量
export OPENAI_API_KEY="your-openai-api-key"
# 或使用 Anthropic
export ANTHROPIC_API_KEY="your-anthropic-api-key"

LangGraph 核心概念

在深入代码之前,让我们理解 LangGraph 的几个核心概念:

1. 图(Graph)

图是 LangGraph 的基本结构,由节点和边组成。它定义了工作流的执行路径。

2. 节点(Node)

节点是工作流中的执行单元,通常是一个函数或智能体。每个节点接收状态输入,处理后返回状态更新。

3. 边(Edge)

边定义了节点之间的连接关系,控制执行流程。分为两种类型:

  • 普通边:直接连接到下一个节点
  • 条件边:根据条件动态决定下一个节点

4. 状态(State)

状态是贯穿整个工作流的数据结构,所有节点都可以读取和修改状态。通常使用 TypedDict 或 Pydantic 模型定义。

5. 检查点(Checkpoint)

检查点机制允许工作流在任意时刻保存状态,支持断点续执行和长期记忆。


构建第一个 LangGraph 工作流

让我们从一个简单的例子开始:创建一个能够分析用户问题并给出建议的智能助手。

步骤 1:定义状态结构

from typing import TypedDict, List, Annotated
from langgraph.graph import StateGraph
import operator
# 定义消息类型
class Message(TypedDict):
    role: str
    content: str
# 定义工作流状态
class AgentState(TypedDict):
    messages: Annotated[List[Message], operator.add]
    current_step: str
    analysis_result: str
    final_response: str

步骤 2:创建节点函数

from langchain_openai import ChatOpenAI
# 初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
def analyze_question(state: AgentState) -> AgentState:
    """分析用户问题类型"""
    messages = state["messages"]
    last_message = messages[-1]["content"]
    
    prompt = f"""分析以下用户问题的类型:
    问题:{last_message}
    
    请判断这是以下哪种类型:
    - 技术咨询
    - 代码问题
    - 产品使用
    - 其他
    
    只返回类型名称。"""
    
    response = llm.invoke(prompt)
    
    return {
        "current_step": "analysis",
        "analysis_result": response.content
    }
def technical_response(state: AgentState) -> AgentState:
    """处理技术咨询"""
    messages = state["messages"]
    last_message = messages[-1]["content"]
    
    prompt = f"""作为技术专家,回答以下问题:
    {last_message}
    
    提供详细、准确的技术解答。"""
    
    response = llm.invoke(prompt)
    
    return {
        "current_step": "technical_response",
        "final_response": response.content
    }
def general_response(state: AgentState) -> AgentState:
    """处理一般问题"""
    messages = state["messages"]
    last_message = messages[-1]["content"]
    
    prompt = f"""友好地回答以下问题:
    {last_message}
    
    保持回答简洁有帮助。"""
    
    response = llm.invoke(prompt)
    
    return {
        "current_step": "general_response",
        "final_response": response.content
    }

步骤 3:构建图结构

def route_question(state: AgentState) -> str:
    """根据分析结果路由到不同节点"""
    analysis = state.get("analysis_result", "").lower()
    
    if "技术" in analysis or "代码" in analysis:
        return "technical"
    else:
        return "general"
# 创建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("analyze", analyze_question)
workflow.add_node("technical", technical_response)
workflow.add_node("general", general_response)
# 设置入口点
workflow.set_entry_point("analyze")
# 添加条件边
workflow.add_conditional_edges(
    "analyze",
    route_question,
    {
        "technical": "technical",
        "general": "general"
    }
)
# 编译图
app = workflow.compile()

步骤 4:运行工作流

# 初始状态
initial_state = {
    "messages": [{"role": "user", "content": "Python 中如何装饰器?"}],
    "current_step": "",
    "analysis_result": "",
    "final_response": ""
}
# 执行工作流
result = app.invoke(initial_state)
print(result["final_response"])

实战案例:多智能体客服系统

现在让我们构建一个更复杂的实际应用——一个由多个专家智能体组成的客服系统。

系统架构

用户问题 → 分类器 → [技术专家 | 销售顾问 | 售后支持] → 质量检查 → 回复用户
                              ↑
                          人工审核(可选)

完整实现代码

from typing import TypedDict, List, Annotated, Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
import operator
from datetime import datetime
# ============ 状态定义 ============
class Message(TypedDict):
    role: str
    content: str
    timestamp: str
class AgentState(TypedDict):
    conversation_id: str
    messages: Annotated[List[Message], operator.add]
    category: str
    assigned_agent: str
    agent_response: str
    quality_score: int
    requires_human_review: bool
    final_response: str
    metadata: dict
# ============ 初始化 LLM ============
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# ============ 节点函数 ============
def categorize_inquiry(state: AgentState) -> AgentState:
    """分类用户咨询类型"""
    messages = state["messages"]
    user_message = messages[-1]["content"]
    
    prompt = f"""分析以下用户咨询,将其分类到最合适的类别:
咨询内容:{user_message}
可选类别:
1. technical - 技术问题、产品使用、功能咨询
2. sales - 价格、购买、套餐、折扣
3. support - 售后、退款、投诉、账户问题
4. feedback - 建议、反馈、评价
只返回类别名称(technical/sales/support/feedback)。"""
    response = llm.invoke(prompt)
    category = response.content.strip().lower()
    
    # 映射到标准类别
    category_map = {
        "technical": "technical",
        "sales": "sales", 
        "support": "support",
        "feedback": "feedback"
    }
    
    assigned_category = category_map.get(category, "support")
    
    return {
        "category": assigned_category,
        "metadata": {"categorized_at": datetime.now().isoformat()}
    }
def technical_agent(state: AgentState) -> AgentState:
    """技术专家智能体"""
    messages = state["messages"]
    user_message = messages[-1]["content"]
    
    system_prompt = """你是一位资深技术专家,专门解答产品使用和技术问题。
你的回答应该:
- 准确、详细、专业
- 提供具体的操作步骤
- 包含相关示例代码(如适用)
- 指出可能的注意事项和最佳实践"""
    prompt = f"""{system_prompt}
用户问题:{user_message}
请提供专业、详细的技术解答。"""
    response = llm.invoke(prompt)
    
    return {
        "assigned_agent": "technical_expert",
        "agent_response": response.content
    }
def sales_agent(state: AgentState) -> AgentState:
    """销售顾问智能体"""
    messages = state["messages"]
    user_message = messages[-1]["content"]
    
    system_prompt = """你是一位专业的销售顾问,负责解答价格、购买相关问题。
你的回答应该:
- 友好、热情、有说服力
- 清晰说明价格信息和优惠政策
- 主动推荐合适的产品套餐
- 提供购买链接或引导"""
    prompt = f"""{system_prompt}
用户咨询:{user_message}
请提供有帮助的销售建议。"""
    response = llm.invoke(prompt)
    
    return {
        "assigned_agent": "sales_consultant",
        "agent_response": response.content
    }
def support_agent(state: AgentState) -> AgentState:
    """售后支持智能体"""
    messages = state["messages"]
    user_message = messages[-1]["content"]
    
    system_prompt = """你是一位耐心的售后支持专家,处理用户投诉、退款、账户问题。
你的回答应该:
- 表达理解和同理心
- 提供清晰的解决方案
- 说明处理流程和预计时间
- 必要时提供升级渠道"""
    prompt = f"""{system_prompt}
用户问题:{user_message}
请提供周到、专业的支持回复。"""
    response = llm.invoke(prompt)
    
    return {
        "assigned_agent": "support_specialist",
        "agent_response": response.content
    }
def feedback_agent(state: AgentState) -> AgentState:
    """反馈处理智能体"""
    messages = state["messages"]
    user_message = messages[-1]["content"]
    
    system_prompt = """你负责处理用户反馈和建议。
你的回答应该:
- 感谢用户的反馈
- 认真对待每一条建议
- 说明反馈的处理流程
- 邀请用户继续参与产品改进"""
    prompt = f"""{system_prompt}
用户反馈:{user_message}
请提供真诚、专业的回复。"""
    response = llm.invoke(prompt)
    
    return {
        "assigned_agent": "feedback_coordinator",
        "agent_response": response.content
    }
def quality_check(state: AgentState) -> AgentState:
    """质量检查节点"""
    agent_response = state["agent_response"]
    category = state["category"]
    
    prompt = f"""评估以下客服回复的质量:
咨询类别:{category}
客服回复:{agent_response}
评分标准(1-5 分):
5 - 完美:专业、准确、完整、友好
4 - 良好:基本满足要求,小有改进空间
3 - 一般:回答了问题,但不够专业或完整
2 - 较差:有明显错误或遗漏
1 - 不合格:完全不符合要求
请只返回数字评分(1-5)。"""
    response = llm.invoke(prompt)
    
    try:
        score = int(response.content.strip())
        score = max(1, min(5, score))  # 限制在 1-5 范围
    except:
        score = 3  # 默认中等分数
    
    # 低分需要人工审核
    requires_human = score < 3
    
    return {
        "quality_score": score,
        "requires_human_review": requires_human,
        "metadata": {
            **state.get("metadata", {}),
            "quality_checked_at": datetime.now().isoformat()
        }
    }
def human_review(state: AgentState) -> AgentState:
    """人工审核节点(模拟)"""
    # 在实际应用中,这里会等待真实的人工审核
    # 这里我们模拟一个简化的审核流程
    
    agent_response = state["agent_response"]
    
    # 模拟人工微调
    refined_response = f"[已审核] {agent_response}\n\n---\n*此回复已通过人工质量审核*"
    
    return {
        "final_response": refined_response,
        "metadata": {
            **state.get("metadata", {}),
            "human_reviewed": True,
            "reviewed_at": datetime.now().isoformat()
        }
    }
def auto_approve(state: AgentState) -> AgentState:
    """自动批准(高质量回复)"""
    return {
        "final_response": state["agent_response"],
        "metadata": {
            **state.get("metadata", {}),
            "human_reviewed": False,
            "auto_approved": True
        }
    }
def format_response(state: AgentState) -> AgentState:
    """格式化最终回复"""
    final_response = state["final_response"]
    
    # 添加统一签名
    formatted = f"""{final_response}
---
**客服团队** | {datetime.now().strftime('%Y-%m-%d %H:%M')}
工单编号:{state.get('conversation_id', 'N/A')}"""
    
    return {"final_response": formatted}
# ============ 构建图 ============
def route_to_agent(state: AgentState) -> str:
    """根据分类路由到对应智能体"""
    category = state.get("category", "support")
    
    route_map = {
        "technical": "technical_agent",
        "sales": "sales_agent",
        "support": "support_agent",
        "feedback": "feedback_agent"
    }
    
    return route_map.get(category, "support_agent")
def route_after_quality(state: AgentState) -> str:
    """质量检查后路由"""
    if state.get("requires_human_review", False):
        return "human_review"
    else:
        return "auto_approve"
# 创建工作流图
customer_service_graph = StateGraph(AgentState)
# 添加所有节点
customer_service_graph.add_node("categorize", categorize_inquiry)
customer_service_graph.add_node("technical_agent", technical_agent)
customer_service_graph.add_node("sales_agent", sales_agent)
customer_service_graph.add_node("support_agent", support_agent)
customer_service_graph.add_node("feedback_agent", feedback_agent)
customer_service_graph.add_node("quality_check", quality_check)
customer_service_graph.add_node("human_review", human_review)
customer_service_graph.add_node("auto_approve", auto_approve)
customer_service_graph.add_node("format_response", format_response)
# 设置入口
customer_service_graph.set_entry_point("categorize")
# 添加边
customer_service_graph.add_conditional_edges(
    "categorize",
    route_to_agent,
    {
        "technical_agent": "technical_agent",
        "sales_agent": "sales_agent",
        "support_agent": "support_agent",
        "feedback_agent": "feedback_agent"
    }
)
# 所有智能体完成后进入质量检查
customer_service_graph.add_edge("technical_agent", "quality_check")
customer_service_graph.add_edge("sales_agent", "quality_check")
customer_service_graph.add_edge("support_agent", "quality_check")
customer_service_graph.add_edge("feedback_agent", "quality_check")
# 质量检查后路由
customer_service_graph.add_conditional_edges(
    "quality_check",
    route_after_quality,
    {
        "human_review": "human_review",
        "auto_approve": "auto_approve"
    }
)
# 审核后进入格式化
customer_service_graph.add_edge("human_review", "format_response")
customer_service_graph.add_edge("auto_approve", "format_response")
# 格式化后结束
customer_service_graph.add_edge("format_response", END)
# 编译应用
customer_service_app = customer_service_graph.compile()

使用示例

import uuid
def chat_with_customer(message: str) -> str:
    """与客户对话的便捷函数"""
    
    initial_state = {
        "conversation_id": str(uuid.uuid4())[:8],
        "messages": [{
            "role": "user",
            "content": message,
            "timestamp": datetime.now().isoformat()
        }],
        "category": "",
        "assigned_agent": "",
        "agent_response": "",
        "quality_score": 0,
        "requires_human_review": False,
        "final_response": "",
        "metadata": {}
    }
    
    result = customer_service_app.invoke(initial_state)
    return result["final_response"]
# 测试不同场景
print("=" * 50)
print("场景 1:技术问题")
print("=" * 50)
response = chat_with_customer("如何配置 API 密钥?我的请求一直返回 401 错误。")
print(response)
print("\n" + "=" * 50)
print("场景 2:销售咨询")
print("=" * 50)
response = chat_with_customer("企业版套餐有什么优惠?我们公司有 50 人需要购买。")
print(response)
print("\n" + "=" * 50)
print("场景 3:售后支持")
print("=" * 50)
response = chat_with_customer("我昨天提交的退款申请什么时候能处理?")
print(response)

高级技巧

1. 持久化状态(检查点)

LangGraph 支持将状态保存到数据库,实现长期记忆和断点续执行:

from langgraph.checkpoint.memory import MemorySaver
# 创建内存检查点
memory = MemorySaver()
# 编译时传入检查点
app_with_memory = workflow.compile(checkpointer=memory)
# 使用线程 ID 区分不同会话
config = {"configurable": {"thread_id": "user-123"}}
# 第一次调用
result1 = app_with_memory.invoke(
    {"messages": [{"role": "user", "content": "你好"}]},
    config=config
)
# 后续调用会记住之前的对话
result2 = app_with_memory.invoke(
    {"messages": [{"role": "user", "content": "继续刚才的话题"}]},
    config=config
)

2. 流式输出

实时获取工作流执行过程中的状态更新:

for event in app.stream(initial_state, stream_mode="updates"):
    for node, output in event.items():
        print(f"节点 [{node}] 完成:")
        print(output)
        print("-" * 30)

3. 子图(Subgraph)

将复杂工作流拆分为可复用的子图:

# 创建子图
subgraph = StateGraph(SubState)
subgraph.add_node("step1", step1_func)
subgraph.add_node("step2", step2_func)
subgraph.add_edge("step1", "step2")
subgraph.set_entry_point("step1")
subgraph.set_finish_point("step2")
# 在主图中使用子图
main_graph = StateGraph(MainState)
main_graph.add_node("subgraph", subgraph.compile())

4. 并行执行

多个节点同时执行,提高效率:

# 添加并行节点
workflow.add_node("parallel_1", func1)
workflow.add_node("parallel_2", func2)
# 从同一节点分支
workflow.add_edge("previous_node", "parallel_1")
workflow.add_edge("previous_node", "parallel_2")
# 汇聚到同一节点
workflow.add_edge("parallel_1", "merge_node")
workflow.add_edge("parallel_2", "merge_node")

5. 超时和重试

from langgraph.pregel import RetryPolicy
app = workflow.compile(
    retry_policy=RetryPolicy(
        max_attempts=3,
        initial_interval=1.0,
        max_interval=30.0
    )
)

生产环境部署

1. 使用 LangGraph Server

LangGraph 提供了专门的服务器用于生产部署:

# 安装 LangGraph CLI
pip install langgraph-cli
# 创建配置文件 langgraph.json
# {
#   "graphs": {
#     "customer_service": "./app.py:customer_service_app"
#   },
#   "env": "./.env"
# }
# 启动服务器
langgraph dev

2. Docker 部署

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8123
CMD ["langgraph", "dev", "--host", "0.0.0.0"]

3. 监控和日志

import logging
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("langgraph")
# 添加自定义监控
def monitor_node_execution(state: AgentState) -> AgentState:
    start_time = datetime.now()
    logger.info(f"节点执行开始:{state.get('current_step')}")
    
    # 节点逻辑
    result = process(state)
    
    duration = (datetime.now() - start_time).total_seconds()
    logger.info(f"节点执行完成,耗时:{duration}s")
    
    return result

4. 错误处理

from langgraph.errors import GraphRecursionError
try:
    result = app.invoke(initial_state)
except GraphRecursionError as e:
    logger.error(f"工作流递归超限:{e}")
    # 降级处理
    result = {"final_response": "系统繁忙,请稍后重试。"}
except Exception as e:
    logger.error(f"未知错误:{e}")
    # 转人工处理
    result = {"final_response": "遇到问题,已转接人工客服。"}

常见问题解答

Q1: LangGraph 和 LangChain 是什么关系?

LangGraph 是 LangChain 生态系统的一部分,专注于构建有状态的多智能体工作流。LangChain 提供基础的 LLM 集成和工具,而 LangGraph 在此基础上增加了图结构和状态管理能力。

Q2: 如何调试工作流?

使用 stream_mode="values" 可以查看每一步的完整状态:

for step, state in app.stream(initial_state, stream_mode="values"):
    print(f"步骤 {step}:")
    print(f"当前状态:{state}")

Q3: 支持哪些 LLM 提供商?

LangGraph 与 LangChain 完全兼容,支持所有 LangChain 集成的模型提供商,包括 OpenAI、Anthropic、Google、Azure、本地模型等。

Q4: 如何处理长对话历史?

使用检查点机制配合消息截断策略:

def truncate_messages(messages: List[Message], max_length: int = 20) -> List[Message]:
    """保留最近的消息,避免上下文过长"""
    if len(messages) > max_length:
        # 保留系统消息和最近的消息
        system_messages = [m for m in messages if m["role"] == "system"]
        recent_messages = messages[-max_length:]
        return system_messages + recent_messages
    return messages

Q5: 如何实现 A/B 测试?

创建多个版本的工作流,根据用户 ID 哈希分配:

def get_workflow_version(user_id: str) -> str:
    hash_value = hash(user_id) % 100
    if hash_value < 50:
        return "version_a"
    else:
        return "version_b"

总结

LangGraph 为构建复杂的多智能体 AI 应用提供了强大的框架支持。通过本文的学习,你应该已经掌握了:

  • ✅ LangGraph 的核心概念和架构
  • ✅ 如何搭建开发环境并创建基本工作流
  • ✅ 构建实用的多智能体客服系统
  • ✅ 高级功能:持久化、流式输出、子图、并行执行
  • ✅ 生产环境部署和监控最佳实践

下一步建议:

  1. 从简单工作流开始,逐步增加复杂度
  2. 充分利用 LangGraph 的可视化工具调试工作流
  3. 在生产环境中实施完善的监控和日志
  4. 关注 LangGraph 的更新,新功能不断推出

参考资源

发表评论

你的邮箱地址不会被公开,带 * 的为必填项。