LangGraph 完全指南:构建多智能体协作工作流的实战教程
引言
在 AI 智能体开发领域,单个模型的能力往往有限。当面对复杂任务时,我们需要多个 AI 智能体协作完成——就像人类团队分工合作一样。LangGraph 正是为此而生的开源框架,它让开发者能够轻松构建状态化、多参与者的 AI 应用工作流。
本文将带你从零开始学习 LangGraph,通过实际案例掌握如何构建高效的多智能体协作系统。无论你是想打造自动化客服、智能代码审查系统,还是复杂的数据分析管道,LangGraph 都能提供强大的支持。
本文你将学到:
- LangGraph 的核心概念和工作原理
- 如何安装配置 LangGraph 开发环境
- 构建第一个多智能体工作流
- 实战案例:智能客服系统
- 高级技巧:状态管理、条件路由、人机协作
- 生产环境部署最佳实践
什么是 LangGraph?
LangGraph 是由 LangChain 团队开发的开源库,专门用于构建有状态的、多参与者的 AI 应用。它基于图(Graph)的概念,将 AI 工作流建模为节点(Nodes)和边(Edges)的组合。
核心特点
- 状态化管理:自动追踪工作流执行过程中的所有状态变化
- 循环支持:原生支持循环工作流,适合迭代式任务
- 多智能体协作:轻松编排多个 AI 智能体协同工作
- 人机交互:内置人类审批和干预机制
- 可观测性:完整的执行轨迹记录,便于调试和监控
适用场景
- 复杂任务分解:将大型任务拆分为多个子任务,由不同智能体处理
- 多轮对话系统:需要记忆上下文的多轮交互应用
- 审批工作流:需要人工审核关键决策的自动化流程
- 数据管道:多步骤的数据处理和转换任务
- 智能客服:根据问题类型路由到不同专家智能体
环境搭建与安装
前置要求
- Python 3.9 或更高版本
- pip 包管理器
- 一个 LLM API 密钥(OpenAI、Anthropic 等)
安装步骤
# 创建虚拟环境(推荐) python -m venv langgraph-env source langgraph-env/bin/activate # Linux/macOS # 或 langgraph-env\Scripts\activate # Windows # 安装 LangGraph 及相关依赖 pip install langgraph langchain langchain-openai langchain-anthropic # 安装可选的可视化工具 pip install langgraph-cli
配置 API 密钥
# 设置环境变量 export OPENAI_API_KEY="your-openai-api-key" # 或使用 Anthropic export ANTHROPIC_API_KEY="your-anthropic-api-key"
LangGraph 核心概念
在深入代码之前,让我们理解 LangGraph 的几个核心概念:
1. 图(Graph)
图是 LangGraph 的基本结构,由节点和边组成。它定义了工作流的执行路径。
2. 节点(Node)
节点是工作流中的执行单元,通常是一个函数或智能体。每个节点接收状态输入,处理后返回状态更新。
3. 边(Edge)
边定义了节点之间的连接关系,控制执行流程。分为两种类型:
- 普通边:直接连接到下一个节点
- 条件边:根据条件动态决定下一个节点
4. 状态(State)
状态是贯穿整个工作流的数据结构,所有节点都可以读取和修改状态。通常使用 TypedDict 或 Pydantic 模型定义。
5. 检查点(Checkpoint)
检查点机制允许工作流在任意时刻保存状态,支持断点续执行和长期记忆。
构建第一个 LangGraph 工作流
让我们从一个简单的例子开始:创建一个能够分析用户问题并给出建议的智能助手。
步骤 1:定义状态结构
from typing import TypedDict, List, Annotated
from langgraph.graph import StateGraph
import operator
# 定义消息类型
class Message(TypedDict):
role: str
content: str
# 定义工作流状态
class AgentState(TypedDict):
messages: Annotated[List[Message], operator.add]
current_step: str
analysis_result: str
final_response: str
步骤 2:创建节点函数
from langchain_openai import ChatOpenAI
# 初始化 LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
def analyze_question(state: AgentState) -> AgentState:
"""分析用户问题类型"""
messages = state["messages"]
last_message = messages[-1]["content"]
prompt = f"""分析以下用户问题的类型:
问题:{last_message}
请判断这是以下哪种类型:
- 技术咨询
- 代码问题
- 产品使用
- 其他
只返回类型名称。"""
response = llm.invoke(prompt)
return {
"current_step": "analysis",
"analysis_result": response.content
}
def technical_response(state: AgentState) -> AgentState:
"""处理技术咨询"""
messages = state["messages"]
last_message = messages[-1]["content"]
prompt = f"""作为技术专家,回答以下问题:
{last_message}
提供详细、准确的技术解答。"""
response = llm.invoke(prompt)
return {
"current_step": "technical_response",
"final_response": response.content
}
def general_response(state: AgentState) -> AgentState:
"""处理一般问题"""
messages = state["messages"]
last_message = messages[-1]["content"]
prompt = f"""友好地回答以下问题:
{last_message}
保持回答简洁有帮助。"""
response = llm.invoke(prompt)
return {
"current_step": "general_response",
"final_response": response.content
}
步骤 3:构建图结构
def route_question(state: AgentState) -> str:
"""根据分析结果路由到不同节点"""
analysis = state.get("analysis_result", "").lower()
if "技术" in analysis or "代码" in analysis:
return "technical"
else:
return "general"
# 创建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("analyze", analyze_question)
workflow.add_node("technical", technical_response)
workflow.add_node("general", general_response)
# 设置入口点
workflow.set_entry_point("analyze")
# 添加条件边
workflow.add_conditional_edges(
"analyze",
route_question,
{
"technical": "technical",
"general": "general"
}
)
# 编译图
app = workflow.compile()
步骤 4:运行工作流
# 初始状态
initial_state = {
"messages": [{"role": "user", "content": "Python 中如何装饰器?"}],
"current_step": "",
"analysis_result": "",
"final_response": ""
}
# 执行工作流
result = app.invoke(initial_state)
print(result["final_response"])
实战案例:多智能体客服系统
现在让我们构建一个更复杂的实际应用——一个由多个专家智能体组成的客服系统。
系统架构
用户问题 → 分类器 → [技术专家 | 销售顾问 | 售后支持] → 质量检查 → 回复用户
↑
人工审核(可选)
完整实现代码
from typing import TypedDict, List, Annotated, Optional
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
import operator
from datetime import datetime
# ============ 状态定义 ============
class Message(TypedDict):
role: str
content: str
timestamp: str
class AgentState(TypedDict):
conversation_id: str
messages: Annotated[List[Message], operator.add]
category: str
assigned_agent: str
agent_response: str
quality_score: int
requires_human_review: bool
final_response: str
metadata: dict
# ============ 初始化 LLM ============
llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
# ============ 节点函数 ============
def categorize_inquiry(state: AgentState) -> AgentState:
"""分类用户咨询类型"""
messages = state["messages"]
user_message = messages[-1]["content"]
prompt = f"""分析以下用户咨询,将其分类到最合适的类别:
咨询内容:{user_message}
可选类别:
1. technical - 技术问题、产品使用、功能咨询
2. sales - 价格、购买、套餐、折扣
3. support - 售后、退款、投诉、账户问题
4. feedback - 建议、反馈、评价
只返回类别名称(technical/sales/support/feedback)。"""
response = llm.invoke(prompt)
category = response.content.strip().lower()
# 映射到标准类别
category_map = {
"technical": "technical",
"sales": "sales",
"support": "support",
"feedback": "feedback"
}
assigned_category = category_map.get(category, "support")
return {
"category": assigned_category,
"metadata": {"categorized_at": datetime.now().isoformat()}
}
def technical_agent(state: AgentState) -> AgentState:
"""技术专家智能体"""
messages = state["messages"]
user_message = messages[-1]["content"]
system_prompt = """你是一位资深技术专家,专门解答产品使用和技术问题。
你的回答应该:
- 准确、详细、专业
- 提供具体的操作步骤
- 包含相关示例代码(如适用)
- 指出可能的注意事项和最佳实践"""
prompt = f"""{system_prompt}
用户问题:{user_message}
请提供专业、详细的技术解答。"""
response = llm.invoke(prompt)
return {
"assigned_agent": "technical_expert",
"agent_response": response.content
}
def sales_agent(state: AgentState) -> AgentState:
"""销售顾问智能体"""
messages = state["messages"]
user_message = messages[-1]["content"]
system_prompt = """你是一位专业的销售顾问,负责解答价格、购买相关问题。
你的回答应该:
- 友好、热情、有说服力
- 清晰说明价格信息和优惠政策
- 主动推荐合适的产品套餐
- 提供购买链接或引导"""
prompt = f"""{system_prompt}
用户咨询:{user_message}
请提供有帮助的销售建议。"""
response = llm.invoke(prompt)
return {
"assigned_agent": "sales_consultant",
"agent_response": response.content
}
def support_agent(state: AgentState) -> AgentState:
"""售后支持智能体"""
messages = state["messages"]
user_message = messages[-1]["content"]
system_prompt = """你是一位耐心的售后支持专家,处理用户投诉、退款、账户问题。
你的回答应该:
- 表达理解和同理心
- 提供清晰的解决方案
- 说明处理流程和预计时间
- 必要时提供升级渠道"""
prompt = f"""{system_prompt}
用户问题:{user_message}
请提供周到、专业的支持回复。"""
response = llm.invoke(prompt)
return {
"assigned_agent": "support_specialist",
"agent_response": response.content
}
def feedback_agent(state: AgentState) -> AgentState:
"""反馈处理智能体"""
messages = state["messages"]
user_message = messages[-1]["content"]
system_prompt = """你负责处理用户反馈和建议。
你的回答应该:
- 感谢用户的反馈
- 认真对待每一条建议
- 说明反馈的处理流程
- 邀请用户继续参与产品改进"""
prompt = f"""{system_prompt}
用户反馈:{user_message}
请提供真诚、专业的回复。"""
response = llm.invoke(prompt)
return {
"assigned_agent": "feedback_coordinator",
"agent_response": response.content
}
def quality_check(state: AgentState) -> AgentState:
"""质量检查节点"""
agent_response = state["agent_response"]
category = state["category"]
prompt = f"""评估以下客服回复的质量:
咨询类别:{category}
客服回复:{agent_response}
评分标准(1-5 分):
5 - 完美:专业、准确、完整、友好
4 - 良好:基本满足要求,小有改进空间
3 - 一般:回答了问题,但不够专业或完整
2 - 较差:有明显错误或遗漏
1 - 不合格:完全不符合要求
请只返回数字评分(1-5)。"""
response = llm.invoke(prompt)
try:
score = int(response.content.strip())
score = max(1, min(5, score)) # 限制在 1-5 范围
except:
score = 3 # 默认中等分数
# 低分需要人工审核
requires_human = score < 3
return {
"quality_score": score,
"requires_human_review": requires_human,
"metadata": {
**state.get("metadata", {}),
"quality_checked_at": datetime.now().isoformat()
}
}
def human_review(state: AgentState) -> AgentState:
"""人工审核节点(模拟)"""
# 在实际应用中,这里会等待真实的人工审核
# 这里我们模拟一个简化的审核流程
agent_response = state["agent_response"]
# 模拟人工微调
refined_response = f"[已审核] {agent_response}\n\n---\n*此回复已通过人工质量审核*"
return {
"final_response": refined_response,
"metadata": {
**state.get("metadata", {}),
"human_reviewed": True,
"reviewed_at": datetime.now().isoformat()
}
}
def auto_approve(state: AgentState) -> AgentState:
"""自动批准(高质量回复)"""
return {
"final_response": state["agent_response"],
"metadata": {
**state.get("metadata", {}),
"human_reviewed": False,
"auto_approved": True
}
}
def format_response(state: AgentState) -> AgentState:
"""格式化最终回复"""
final_response = state["final_response"]
# 添加统一签名
formatted = f"""{final_response}
---
**客服团队** | {datetime.now().strftime('%Y-%m-%d %H:%M')}
工单编号:{state.get('conversation_id', 'N/A')}"""
return {"final_response": formatted}
# ============ 构建图 ============
def route_to_agent(state: AgentState) -> str:
"""根据分类路由到对应智能体"""
category = state.get("category", "support")
route_map = {
"technical": "technical_agent",
"sales": "sales_agent",
"support": "support_agent",
"feedback": "feedback_agent"
}
return route_map.get(category, "support_agent")
def route_after_quality(state: AgentState) -> str:
"""质量检查后路由"""
if state.get("requires_human_review", False):
return "human_review"
else:
return "auto_approve"
# 创建工作流图
customer_service_graph = StateGraph(AgentState)
# 添加所有节点
customer_service_graph.add_node("categorize", categorize_inquiry)
customer_service_graph.add_node("technical_agent", technical_agent)
customer_service_graph.add_node("sales_agent", sales_agent)
customer_service_graph.add_node("support_agent", support_agent)
customer_service_graph.add_node("feedback_agent", feedback_agent)
customer_service_graph.add_node("quality_check", quality_check)
customer_service_graph.add_node("human_review", human_review)
customer_service_graph.add_node("auto_approve", auto_approve)
customer_service_graph.add_node("format_response", format_response)
# 设置入口
customer_service_graph.set_entry_point("categorize")
# 添加边
customer_service_graph.add_conditional_edges(
"categorize",
route_to_agent,
{
"technical_agent": "technical_agent",
"sales_agent": "sales_agent",
"support_agent": "support_agent",
"feedback_agent": "feedback_agent"
}
)
# 所有智能体完成后进入质量检查
customer_service_graph.add_edge("technical_agent", "quality_check")
customer_service_graph.add_edge("sales_agent", "quality_check")
customer_service_graph.add_edge("support_agent", "quality_check")
customer_service_graph.add_edge("feedback_agent", "quality_check")
# 质量检查后路由
customer_service_graph.add_conditional_edges(
"quality_check",
route_after_quality,
{
"human_review": "human_review",
"auto_approve": "auto_approve"
}
)
# 审核后进入格式化
customer_service_graph.add_edge("human_review", "format_response")
customer_service_graph.add_edge("auto_approve", "format_response")
# 格式化后结束
customer_service_graph.add_edge("format_response", END)
# 编译应用
customer_service_app = customer_service_graph.compile()
使用示例
import uuid
def chat_with_customer(message: str) -> str:
"""与客户对话的便捷函数"""
initial_state = {
"conversation_id": str(uuid.uuid4())[:8],
"messages": [{
"role": "user",
"content": message,
"timestamp": datetime.now().isoformat()
}],
"category": "",
"assigned_agent": "",
"agent_response": "",
"quality_score": 0,
"requires_human_review": False,
"final_response": "",
"metadata": {}
}
result = customer_service_app.invoke(initial_state)
return result["final_response"]
# 测试不同场景
print("=" * 50)
print("场景 1:技术问题")
print("=" * 50)
response = chat_with_customer("如何配置 API 密钥?我的请求一直返回 401 错误。")
print(response)
print("\n" + "=" * 50)
print("场景 2:销售咨询")
print("=" * 50)
response = chat_with_customer("企业版套餐有什么优惠?我们公司有 50 人需要购买。")
print(response)
print("\n" + "=" * 50)
print("场景 3:售后支持")
print("=" * 50)
response = chat_with_customer("我昨天提交的退款申请什么时候能处理?")
print(response)
高级技巧
1. 持久化状态(检查点)
LangGraph 支持将状态保存到数据库,实现长期记忆和断点续执行:
from langgraph.checkpoint.memory import MemorySaver
# 创建内存检查点
memory = MemorySaver()
# 编译时传入检查点
app_with_memory = workflow.compile(checkpointer=memory)
# 使用线程 ID 区分不同会话
config = {"configurable": {"thread_id": "user-123"}}
# 第一次调用
result1 = app_with_memory.invoke(
{"messages": [{"role": "user", "content": "你好"}]},
config=config
)
# 后续调用会记住之前的对话
result2 = app_with_memory.invoke(
{"messages": [{"role": "user", "content": "继续刚才的话题"}]},
config=config
)
2. 流式输出
实时获取工作流执行过程中的状态更新:
for event in app.stream(initial_state, stream_mode="updates"):
for node, output in event.items():
print(f"节点 [{node}] 完成:")
print(output)
print("-" * 30)
3. 子图(Subgraph)
将复杂工作流拆分为可复用的子图:
# 创建子图
subgraph = StateGraph(SubState)
subgraph.add_node("step1", step1_func)
subgraph.add_node("step2", step2_func)
subgraph.add_edge("step1", "step2")
subgraph.set_entry_point("step1")
subgraph.set_finish_point("step2")
# 在主图中使用子图
main_graph = StateGraph(MainState)
main_graph.add_node("subgraph", subgraph.compile())
4. 并行执行
多个节点同时执行,提高效率:
# 添加并行节点
workflow.add_node("parallel_1", func1)
workflow.add_node("parallel_2", func2)
# 从同一节点分支
workflow.add_edge("previous_node", "parallel_1")
workflow.add_edge("previous_node", "parallel_2")
# 汇聚到同一节点
workflow.add_edge("parallel_1", "merge_node")
workflow.add_edge("parallel_2", "merge_node")
5. 超时和重试
from langgraph.pregel import RetryPolicy
app = workflow.compile(
retry_policy=RetryPolicy(
max_attempts=3,
initial_interval=1.0,
max_interval=30.0
)
)
生产环境部署
1. 使用 LangGraph Server
LangGraph 提供了专门的服务器用于生产部署:
# 安装 LangGraph CLI
pip install langgraph-cli
# 创建配置文件 langgraph.json
# {
# "graphs": {
# "customer_service": "./app.py:customer_service_app"
# },
# "env": "./.env"
# }
# 启动服务器
langgraph dev
2. Docker 部署
FROM python:3.11-slim WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . EXPOSE 8123 CMD ["langgraph", "dev", "--host", "0.0.0.0"]
3. 监控和日志
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("langgraph")
# 添加自定义监控
def monitor_node_execution(state: AgentState) -> AgentState:
start_time = datetime.now()
logger.info(f"节点执行开始:{state.get('current_step')}")
# 节点逻辑
result = process(state)
duration = (datetime.now() - start_time).total_seconds()
logger.info(f"节点执行完成,耗时:{duration}s")
return result
4. 错误处理
from langgraph.errors import GraphRecursionError
try:
result = app.invoke(initial_state)
except GraphRecursionError as e:
logger.error(f"工作流递归超限:{e}")
# 降级处理
result = {"final_response": "系统繁忙,请稍后重试。"}
except Exception as e:
logger.error(f"未知错误:{e}")
# 转人工处理
result = {"final_response": "遇到问题,已转接人工客服。"}
常见问题解答
Q1: LangGraph 和 LangChain 是什么关系?
LangGraph 是 LangChain 生态系统的一部分,专注于构建有状态的多智能体工作流。LangChain 提供基础的 LLM 集成和工具,而 LangGraph 在此基础上增加了图结构和状态管理能力。
Q2: 如何调试工作流?
使用 stream_mode="values" 可以查看每一步的完整状态:
for step, state in app.stream(initial_state, stream_mode="values"):
print(f"步骤 {step}:")
print(f"当前状态:{state}")
Q3: 支持哪些 LLM 提供商?
LangGraph 与 LangChain 完全兼容,支持所有 LangChain 集成的模型提供商,包括 OpenAI、Anthropic、Google、Azure、本地模型等。
Q4: 如何处理长对话历史?
使用检查点机制配合消息截断策略:
def truncate_messages(messages: List[Message], max_length: int = 20) -> List[Message]:
"""保留最近的消息,避免上下文过长"""
if len(messages) > max_length:
# 保留系统消息和最近的消息
system_messages = [m for m in messages if m["role"] == "system"]
recent_messages = messages[-max_length:]
return system_messages + recent_messages
return messages
Q5: 如何实现 A/B 测试?
创建多个版本的工作流,根据用户 ID 哈希分配:
def get_workflow_version(user_id: str) -> str:
hash_value = hash(user_id) % 100
if hash_value < 50:
return "version_a"
else:
return "version_b"
总结
LangGraph 为构建复杂的多智能体 AI 应用提供了强大的框架支持。通过本文的学习,你应该已经掌握了:
- ✅ LangGraph 的核心概念和架构
- ✅ 如何搭建开发环境并创建基本工作流
- ✅ 构建实用的多智能体客服系统
- ✅ 高级功能:持久化、流式输出、子图、并行执行
- ✅ 生产环境部署和监控最佳实践
下一步建议:
- 从简单工作流开始,逐步增加复杂度
- 充分利用 LangGraph 的可视化工具调试工作流
- 在生产环境中实施完善的监控和日志
- 关注 LangGraph 的更新,新功能不断推出