AI 智能体内存管理实战:如何让 RAG 应用记住用户对话上下文的 6 个核心技巧
构建有”记忆”的 AI 应用:从会话状态管理到长期记忆存储的完整实战指南
引言:为什么你的 AI 智能体总是”失忆”?
想象这个场景:用户在你的 AI 客服应用中询问了产品价格,五分钟后又问”刚才说的那个产品有优惠吗?“——如果你的应用回答”请问您指的是哪个产品?”,用户体验瞬间崩塌。
这就是 AI 智能体”失忆症”的典型表现。在构建 RAG(检索增强生成)应用时,内存管理是决定用户体验的关键因素,却往往被开发者忽视。
根据 Anthropic 最新发布的 AI 智能体开发报告,73% 的生产级 AI 应用失败源于糟糕的上下文管理。用户期望 AI 能够记住对话历史、偏好设置、之前的决策——就像人类对话一样自然流畅。
本文将深入探讨 AI 智能体内存管理的 6 个核心技巧,帮助你构建真正”有记忆”的 AI 应用。无论你是使用 LangChain、LlamaIndex 还是自定义 RAG 架构,这些实战经验都能直接应用。
技巧一:分层记忆架构设计
为什么需要分层?
人类的记忆不是单一存储桶。我们有:
- 工作记忆:当前对话的短期上下文
- 情景记忆:特定会话中的事件和对话
- 语义记忆:长期积累的知识和事实
- 程序记忆:学会的技能和操作流程
AI 智能体同样需要这种分层架构。将所有信息塞进一个向量数据库是新手最常见的错误。
三层记忆模型实战
# 记忆分层架构示例
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime, timedelta
@dataclass
class WorkingMemory:
"""工作记忆:当前对话轮次,保存在内存中"""
conversation_turns: List[Dict] # 最近 10-20 轮对话
current_intent: str # 当前用户意图
temporary_variables: Dict # 临时变量(如表单填写进度)
max_turns: int = 20
def add_turn(self, role: str, content: str):
self.conversation_turns.append({
"role": role,
"content": content,
"timestamp": datetime.now().isoformat()
})
# 保持滑动窗口
if len(self.conversation_turns) > self.max_turns:
self.conversation_turns.pop(0)
@dataclass
class EpisodicMemory:
"""情景记忆:会话级别的事件,存储在 Redis 或类似数据库中"""
session_id: str
user_id: str
events: List[Dict]
created_at: datetime
last_accessed: datetime
def add_event(self, event_type: str, data: Dict, importance: float = 0.5):
self.events.append({
"type": event_type,
"data": data,
"importance": importance,
"timestamp": datetime.now().isoformat()
})
@dataclass
class SemanticMemory:
"""语义记忆:长期知识和用户画像,存储在向量数据库中"""
user_id: str
facts: List[Dict] # 用户相关事实
preferences: Dict # 用户偏好
knowledge_embeddings: List[float] # 向量嵌入
def add_fact(self, fact: str, confidence: float = 0.8):
self.facts.append({
"content": fact,
"confidence": confidence,
"created_at": datetime.now().isoformat()
})
各层存储策略
| 记忆类型 | 存储介质 | 保留时间 | 访问延迟 | 典型用例 |
|---|---|---|---|---|
| 工作记忆 | 应用内存 | 当前会话 | <1ms | 对话上下文、临时变量 |
| 情景记忆 | Redis/内存数据库 | 7-30 天 | <10ms | 会话历史、用户行为轨迹 |
| 语义记忆 | 向量数据库 (Pinecone/Weaviate) | 永久 | 50-200ms | 用户画像、长期偏好 |
实战代码:记忆管理器实现
import redis
import json
from typing import Optional
class MemoryManager:
def __init__(self, redis_url: str, vector_db_client):
self.redis = redis.from_url(redis_url)
self.vector_db = vector_db_client
self.working_memory: Dict[str, WorkingMemory] = {}
def get_working_memory(self, session_id: str) -> WorkingMemory:
"""获取或创建工作记忆"""
if session_id not in self.working_memory:
self.working_memory[session_id] = WorkingMemory()
return self.working_memory[session_id]
async def save episodic_memory(self, session_id: str, user_id: str, event: Dict):
"""保存情景记忆到 Redis"""
key = f"episodic:{user_id}:{session_id}"
self.redis.lpush(key, json.dumps(event))
self.redis.expire(key, 7 * 24 * 3600) # 7 天过期
async def save_semantic_memory(self, user_id: str, fact: str, embedding: List[float]):
"""保存语义记忆到向量数据库"""
await self.vector_db.upsert(
namespace=f"user:{user_id}",
vectors=[embedding],
metadata=[{"fact": fact, "created_at": datetime.now().isoformat()}]
)
async def retrieve_relevant_memories(self, user_id: str, query: str, top_k: int = 5):
"""检索相关记忆(结合情景和语义记忆)"""
query_embedding = await self.vector_db.embed(query)
# 从向量数据库检索语义记忆
semantic_results = await self.vector_db.query(
namespace=f"user:{user_id}",
vector=query_embedding,
top_k=top_k
)
# 从 Redis 检索最近的情景记忆
episodic_keys = self.redis.keys(f"episodic:{user_id}:*")
episodic_memories = []
for key in episodic_keys[:5]: # 最近 5 个会话
events = self.redis.lrange(key, 0, 10)
episodic_memories.extend([json.loads(e) for e in events])
return {
"semantic": semantic_results,
"episodic": episodic_memories
}
技巧二:智能上下文窗口管理
问题:Token 限制与信息丢失
所有 LLM 都有上下文窗口限制。即使是 Claude 的 200K token,在长对话中也会耗尽。关键是如何智能地决定保留什么、丢弃什么、压缩什么。
策略一:重要性评分机制
不是所有对话内容都同等重要。实现一个重要性评分系统:
def calculate_message_importance(message: Dict) -> float:
"""
计算消息的重要性评分 (0-1)
"""
score = 0.5 # 基础分
# 用户明确表达偏好的消息更重要
preference_keywords = ["我喜欢", "我不喜欢", "记住", "总是", "从不"]
if any(kw in message["content"] for kw in preference_keywords):
score += 0.3
# 包含实体信息(姓名、日期、数字)的消息更重要
if re.search(r'\d+|[A-Z][a-z]+', message["content"]):
score += 0.1
# 问题 - 答案对很重要
if message["role"] == "assistant" and "?" in message.get("previous_user_message", ""):
score += 0.2
# 最近的消息更重要(时间衰减)
time_decay = calculate_time_decay(message["timestamp"])
score *= time_decay
return min(score, 1.0)
def calculate_time_decay(timestamp: str, half_life_hours: int = 24) -> float:
"""计算时间衰减因子"""
message_time = datetime.fromisoformat(timestamp)
hours_ago = (datetime.now() - message_time).total_seconds() / 3600
return 0.5 ** (hours_ago / half_life_hours)
策略二:对话摘要压缩
当上下文接近限制时,自动压缩早期对话:
async def compress_conversation_history(
conversation: List[Dict],
llm_client,
target_token_count: int
) -> List[Dict]:
"""
压缩对话历史,保留关键信息
"""
current_tokens = count_tokens(conversation)
if current_tokens <= target_token_count:
return conversation
# 保留最近 N 轮完整对话
recent_turns = 5
preserved = conversation[-recent_turns * 2:] # 用户 + 助手
# 压缩早期对话为摘要
early_conversation = conversation[:-recent_turns * 2]
if early_conversation:
summary = await llm_client.generate(
prompt=f"""请将以下对话压缩为 3-5 句话的摘要,保留所有关键事实、用户偏好和已完成的决策:
{format_conversation(early_conversation)}
摘要:""",
max_tokens=200
)
# 将摘要作为系统消息插入
preserved.insert(0, {
"role": "system",
"content": f"【对话摘要】{summary}"
})
return preserved
策略三:分层检索策略
不要将所有历史都放入 prompt,而是按需检索:
async def build_context_aware_prompt(
user_query: str,
working_memory: WorkingMemory,
memory_manager: MemoryManager,
user_id: str,
max_context_tokens: int = 3000
) -> str:
"""
构建上下文感知的 prompt,动态检索相关记忆
"""
# 1. 始终包含最近 5 轮对话
recent_context = format_conversation(
working_memory.conversation_turns[-10:]
)
# 2. 基于当前查询检索相关记忆
relevant_memories = await memory_manager.retrieve_relevant_memories(
user_id=user_id,
query=user_query,
top_k=5
)
# 3. 构建最终 prompt
prompt_parts = []
if relevant_memories["semantic"]:
facts = "\n".join([m["fact"] for m in relevant_memories["semantic"]])
prompt_parts.append(f"【用户已知信息】\n{facts}\n")
if relevant_memories["episodic"]:
recent_events = "\n".join([
f"- {e['type']}: {e['data']}"
for e in relevant_memories["episodic"][:5]
])
prompt_parts.append(f"【最近活动】\n{recent_events}\n")
prompt_parts.append(f"【当前对话】\n{recent_context}")
prompt_parts.append(f"【用户问题】\n{user_query}")
final_prompt = "\n\n".join(prompt_parts)
# 4. 如果超过 token 限制,压缩最早的内容
if count_tokens(final_prompt) > max_context_tokens:
final_prompt = await compress_to_token_limit(
final_prompt,
max_context_tokens,
llm_client
)
return final_prompt
技巧三:用户画像的动态更新
问题:静态画像 vs 动态学习
大多数 RAG 应用的用户画像是静态的——创建后不再更新。但用户会改变:新偏好、新需求、新行为模式。AI 智能体需要持续学习。
实现:增量式画像更新
class UserProfileManager:
def __init__(self, vector_db, redis_client):
self.vector_db = vector_db
self.redis = redis_client
async def extract_user_facts(self, conversation_turn: Dict) -> List[Dict]:
"""
从对话中提取用户相关事实
"""
extraction_prompt = f"""从以下对话中提取关于用户的事实信息。
只提取明确的、可验证的事实,不要推断。
对话:
{conversation_turn['role']}: {conversation_turn['content']}
提取的事实(JSON 数组格式):"""
response = await self.llm.generate(extraction_prompt)
facts = json.loads(response)
return [
{
"content": fact["statement"],
"category": fact["category"], # 如:偏好、人口统计、行为
"confidence": fact["confidence"],
"source_message_id": conversation_turn["id"],
"timestamp": datetime.now().isoformat()
}
for fact in facts
]
async def update_user_profile(self, user_id: str, new_facts: List[Dict]):
"""
增量更新用户画像,处理冲突和重复
"""
# 获取现有画像
existing_facts = await self.get_user_facts(user_id)
for new_fact in new_facts:
# 检查是否有冲突或重复
similar_facts = await self.find_similar_facts(
user_id,
new_fact["content"]
)
if similar_facts:
# 更新现有事实(增加置信度或更新时间)
await self.update_existing_fact(
user_id,
similar_facts[0]["id"],
new_fact
)
else:
# 添加新事实
await self.add_new_fact(user_id, new_fact)
# 定期清理低置信度的旧事实
await self.prune_low_confidence_facts(user_id)
async def find_similar_facts(self, user_id: str, fact: str, threshold: float = 0.85):
"""使用向量相似度查找相似事实"""
embedding = await self.vector_db.embed(fact)
results = await self.vector_db.query(
namespace=f"user_profile:{user_id}",
vector=embedding,
top_k=5,
filter={"confidence": {"$gte": 0.5}}
)
return [r for r in results if r["score"] >= threshold]
实战:偏好学习系统
async def learn_user_preferences(
user_id: str,
interaction: Dict,
profile_manager: UserProfileManager
):
"""
从用户交互中学习偏好
"""
# 检测偏好信号
preference_signals = []
# 显式偏好(用户直接说)
if "我喜欢" in interaction["content"] or "我不喜欢" in interaction["content"]:
preference_signals.append({
"type": "explicit",
"content": interaction["content"],
"confidence": 0.9
})
# 隐式偏好(行为推断)
if interaction["type"] == "selection":
preference_signals.append({
"type": "implicit",
"content": f"用户选择了选项{interaction['selected_option']}",
"confidence": 0.6
})
if interaction["type"] == "rejection":
preference_signals.append({
"type": "implicit",
"content": f"用户拒绝了选项{interaction['rejected_option']}",
"confidence": 0.7
})
# 更新画像
for signal in preference_signals:
await profile_manager.update_user_profile(user_id, [signal])
# 记录学习事件
await profile_manager.log_learning_event(
user_id=user_id,
event_type="preference_learned",
data=preference_signals
)
技巧四:跨会话记忆持久化
问题:会话中断与记忆丢失
用户可能今天开始一个任务,明天继续。如果每次新会话都从零开始,体验极差。需要跨会话记忆持久化。
方案:会话链接与记忆迁移
class CrossSessionMemory:
def __init__(self, db_client, vector_db):
self.db = db_client
self.vector_db = vector_db
async def create_session_checkpoint(self, session_id: str, user_id: str):
"""
创建会话检查点,保存关键状态
"""
# 提取会话关键信息
session_data = await self.db.get_session(session_id)
checkpoint = {
"user_id": user_id,
"session_id": session_id,
"created_at": datetime.now().isoformat(),
"summary": await self.summarize_session(session_data),
"pending_tasks": self.extract_pending_tasks(session_data),
"user_context": self.extract_user_context(session_data),
"important_facts": await self.extract_important_facts(session_data)
}
# 保存到数据库
await self.db.insert("session_checkpoints", checkpoint)
# 同时保存到向量数据库以便检索
summary_embedding = await self.vector_db.embed(checkpoint["summary"])
await self.vector_db.upsert(
namespace=f"checkpoints:{user_id}",
vectors=[summary_embedding],
metadata=[checkpoint]
)
return checkpoint["id"]
async def restore_session_context(self, user_id: str, new_session_id: str):
"""
为新会话恢复相关上下文
"""
# 检索用户最近的检查点
recent_checkpoints = await self.db.query(
"session_checkpoints",
filter={"user_id": user_id},
order_by="created_at DESC",
limit=5
)
# 检索相关的语义记忆
# (基于新会话的第一条消息)
first_message = await self.db.get_first_message(new_session_id)
relevant_checkpoints = await self.vector_db.query(
namespace=f"checkpoints:{user_id}",
vector=await self.vector_db.embed(first_message["content"]),
top_k=3
)
# 合并上下文
restored_context = {
"recent_sessions": recent_checkpoints,
"relevant_sessions": relevant_checkpoints,
"user_profile": await self.get_user_profile(user_id)
}
return restored_context
async def summarize_session(self, session_data: Dict) -> str:
"""使用 LLM 生成会话摘要"""
prompt = f"""总结以下会话的关键信息:
- 用户完成了什么任务
- 遗留了什么待办事项
- 揭示了什么用户偏好
- 任何重要的事实或决策
会话记录:
{format_session(session_data)}
摘要(200 字以内):"""
return await self.llm.generate(prompt)
实战:任务连续性管理
async def handle_session_resumption(
user_id: str,
new_session_id: str,
memory_manager: MemoryManager
) -> Dict:
"""
处理用户返回时的会话恢复
"""
# 1. 查找用户的未完成的任务
pending_tasks = await memory_manager.db.query(
"user_tasks",
filter={
"user_id": user_id,
"status": "in_progress"
}
)
# 2. 检索最近的相关会话
last_sessions = await memory_manager.db.query(
"session_checkpoints",
filter={"user_id": user_id},
order_by="created_at DESC",
limit=3
)
# 3. 生成欢迎消息,提及继续点
if pending_tasks:
task_summary = ", ".join([t["name"] for t in pending_tasks[:3]])
welcome_message = f"欢迎回来!我看到你之前正在处理:{task_summary}。需要继续吗?"
elif last_sessions:
last_topic = last_sessions[0]["summary"][:100]
welcome_message = f"欢迎回来!上次我们讨论了:{last_topic}..."
else:
welcome_message = "欢迎!有什么我可以帮助你的?"
return {
"welcome_message": welcome_message,
"pending_tasks": pending_tasks,
"recent_context": last_sessions
}
技巧五:记忆检索的语义优化
问题:关键词匹配 vs 语义理解
传统的关键词检索会错过语义相关但不含相同词汇的记忆。例如:
- 用户问:“我之前问过关于定价的问题吗?”
- 实际历史:“这个产品多少钱?”
关键词检索可能失败,但语义检索应该匹配。
方案:混合检索策略
class HybridMemoryRetriever:
def __init__(self, vector_db, keyword_index, llm_client):
self.vector_db = vector_db
self.keyword_index = keyword_index # Elasticsearch 或类似
self.llm = llm_client
async def retrieve(self, query: str, user_id: str, top_k: int = 10):
"""
混合检索:结合语义和关键词
"""
# 1. 语义检索(向量搜索)
query_embedding = await self.vector_db.embed(query)
semantic_results = await self.vector_db.query(
namespace=f"user:{user_id}",
vector=query_embedding,
top_k=top_k
)
# 2. 关键词检索
keyword_results = await self.keyword_index.search(
index=f"user_memories_{user_id}",
query=query,
size=top_k
)
# 3. 合并结果(去重 + 重新排序)
merged = self.merge_and_rerank(
semantic_results,
keyword_results,
query
)
# 4. 使用 LLM 进行最终相关性判断(可选,用于高价值场景)
if len(merged) > 5:
merged = await self.llm_rerank(query, merged, top_k=5)
return merged
def merge_and_rerank(
self,
semantic: List[Dict],
keyword: List[Dict],
query: str
) -> List[Dict]:
"""
合并两种检索结果,使用 Reciprocal Rank Fusion
"""
# 创建排名映射
semantic_ranks = {item["id"]: idx + 1 for idx, item in enumerate(semantic)}
keyword_ranks = {item["id"]: idx + 1 for idx, item in enumerate(keyword)}
# 所有唯一 ID
all_ids = set(semantic_ranks.keys()) | set(keyword_ranks.keys())
# 计算 RRF 分数
k = 60 # RRF 常数
scores = []
for item_id in all_ids:
rank_s = semantic_ranks.get(item_id, float('inf'))
rank_k = keyword_ranks.get(item_id, float('inf'))
rrf_score = (1 / (k + rank_s)) + (1 / (k + rank_k))
# 获取原始 item
item = next(
(i for i in semantic if i["id"] == item_id),
next((i for i in keyword if i["id"] == item_id), None)
)
scores.append({
**item,
"rrf_score": rrf_score,
"semantic_rank": rank_s,
"keyword_rank": rank_k
})
# 按 RRF 分数排序
scores.sort(key=lambda x: x["rrf_score"], reverse=True)
return scores
async def llm_rerank(self, query: str, candidates: List[Dict], top_k: int):
"""使用 LLM 进行最终相关性重排序"""
prompt = f"""评估以下记忆片段与用户查询的相关性。
查询:{query}
候选记忆:
{json.dumps([{"id": c["id"], "content": c["content"]} for c in candidates], indent=2)}
请按相关性从高到低排序,返回前{top_k}个的 ID 列表(JSON 数组):"""
response = await self.llm.generate(prompt)
top_ids = json.loads(response)
# 按 LLM 排序返回
id_to_item = {c["id"]: c for c in candidates}
return [id_to_item[id] for id in top_ids if id in id_to_item]
查询重写增强
async def expand_query_for_retrieval(query: str, llm_client) -> List[str]:
"""
将用户查询扩展为多个变体,提高检索召回率
"""
prompt = f"""将以下用户查询重写为 5 个不同的变体,保持相同语义但使用不同词汇:
原查询:{query}
变体(JSON 数组):"""
response = await llm_client.generate(prompt)
variants = json.loads(response)
return [query] + variants # 包含原查询
# 使用示例
async def retrieve_with_query_expansion(query: str, user_id: str, retriever):
"""使用查询扩展进行检索"""
query_variants = await expand_query_for_retrieval(query, retriever.llm)
all_results = []
for variant in query_variants:
results = await retriever.retrieve(variant, user_id, top_k=3)
all_results.extend(results)
# 去重
seen_ids = set()
unique_results = []
for result in all_results:
if result["id"] not in seen_ids:
seen_ids.add(result["id"])
unique_results.append(result)
return unique_results[:10]
技巧六:隐私与记忆清理
问题:GDPR 合规与用户数据权利
用户有权要求:
- 查看你存储的关于他们的所有数据
- 删除特定记忆或全部数据
- 导出他们的数据
实现:记忆生命周期管理
class MemoryGovernance:
def __init__(self, db_client, vector_db):
self.db = db_client
self.vector_db = vector_db
async def get_user_data_export(self, user_id: str) -> Dict:
"""
导出用户的所有数据(GDPR 数据可携带权)
"""
export = {
"user_id": user_id,
"exported_at": datetime.now().isoformat(),
"profile": await self.db.get_user_profile(user_id),
"conversation_history": await self.db.get_user_conversations(user_id),
"semantic_memories": await self.vector_db.query(
namespace=f"user:{user_id}",
vector=[0] * 1536, # 获取所有
top_k=10000
),
"episodic_memories": await self.db.get_user_events(user_id),
"preferences": await self.db.get_user_preferences(user_id)
}
return export
async def delete_user_memory(
self,
user_id: str,
scope: str = "all", # "all", "conversations", "profile", "specific"
memory_ids: List[str] = None
):
"""
删除用户记忆(GDPR 被遗忘权)
"""
if scope == "all":
# 删除所有数据
await self.db.delete_user_data(user_id)
await self.vector_db.delete_namespace(f"user:{user_id}")
# 记录删除操作
await self.log_deletion_event(user_id, "full_deletion")
elif scope == "specific" and memory_ids:
# 删除特定记忆
for memory_id in memory_ids:
await self.db.delete_memory(memory_id)
await self.vector_db.delete(memory_id)
await self.log_deletion_event(user_id, "partial_deletion", memory_ids)
# 清理应用层缓存
await self.invalidate_caches(user_id)
async def set_memory_retention_policy(
self,
user_id: str,
policy: Dict
):
"""
设置用户的记忆保留策略
"""
# 示例策略
policy = {
"conversations": {"retain_days": 90},
"semantic_memories": {"retain_days": 365},
"episodic_memories": {"retain_days": 180},
"preferences": {"retain_days": None}, # 永久保留
"auto_anonymize": True, # 过期后匿名化而非删除
}
await self.db.set_user_policy(user_id, policy)
async def run_retention_cleanup(self):
"""
定期运行保留策略清理(每日 cron)
"""
policies = await self.db.get_all_retention_policies()
for policy in policies:
user_id = policy["user_id"]
cutoff_date = datetime.now() - timedelta(days=policy["retain_days"])
# 删除过期记忆
expired = await self.db.find_expired_memories(user_id, cutoff_date)
for memory in expired:
if policy.get("auto_anonymize"):
await self.anonymize_memory(memory["id"])
else:
await self.delete_memory(memory["id"])
敏感信息检测与过滤
async def detect_and_filter_pii(content: str, llm_client) -> Dict:
"""
检测并过滤个人敏感信息 (PII)
"""
prompt = f"""分析以下文本,识别所有个人敏感信息 (PII):
- 姓名
- 邮箱地址
- 电话号码
- 身份证号
- 银行卡号
- 地址
- 其他可识别个人的信息
文本:{content}
返回 JSON 格式:
{{
"detected_pii": [
{{"type": "email", "value": "...", "start": 0, "end": 10}},
...
],
"sanitized_content": "替换 PII 为 [REDACTED] 后的内容"
}}"""
response = await llm_client.generate(prompt)
return json.loads(response)
async def store_memory_safely(
user_id: str,
content: str,
memory_manager: MemoryManager
):
"""
安全存储记忆,自动过滤 PII
"""
# 检测 PII
pii_result = await detect_and_filter_pii(content, memory_manager.llm)
# 存储时询问用户是否保存 PII
if pii_result["detected_pii"]:
# 选项 1:不存储 PII
await memory_manager.store(
user_id=user_id,
content=pii_result["sanitized_content"],
metadata={"pii_filtered": True}
)
# 选项 2:加密存储 PII(如果需要)
# await memory_manager.store_encrypted_pii(...)
else:
await memory_manager.store(user_id, content)
实战案例:构建完整的记忆增强 RAG 应用
架构总览
┌─────────────────────────────────────────────────────────────┐
│ 用户请求 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 请求处理器 │
│ - 解析用户意图 │
│ - 提取查询关键词 │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 工作记忆层 │ │ 情景记忆层 │ │ 语义记忆层 │
│ (Redis) │ │ (PostgreSQL) │ │ (Pinecone) │
│ 最近对话 │ │ 会话历史 │ │ 用户画像 │
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└───────────────┼───────────────┘
▼
┌─────────────────────────────────────────────────────────────┐
│ 混合检索引擎 │
│ - 向量检索 + 关键词检索 │
│ - RRF 重排序 │
│ - LLM 相关性判断 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 上下文构建器 │
│ - 组装检索结果 │
│ - Token 预算 management │
│ - 生成最终 prompt │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ LLM 生成 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 响应后处理 │
│ - 提取新事实更新画像 │
│ - 保存对话到情景记忆 │
│ - 检查是否需要创建检查点 │
└─────────────────────────────────────────────────────────────┘
完整代码示例
from fastapi import FastAPI, Depends
from typing import Optional
app = FastAPI()
# 初始化组件
memory_manager = MemoryManager(
redis_url="redis://localhost:6379",
vector_db_client=PineconeClient(api_key="...")
)
retriever = HybridMemoryRetriever(
vector_db=memory_manager.vector_db,
keyword_index=ElasticsearchClient("http://localhost:9200"),
llm_client=AnthropicClient(api_key="...")
)
governance = MemoryGovernance(
db_client=memory_manager.db,
vector_db=memory_manager.vector_db
)
@app.post("/chat/{user_id}")
async def chat(
user_id: str,
message: str,
session_id: Optional[str] = None
):
# 1. 获取或创建会话
if not session_id:
session_id = await create_new_session(user_id)
working_memory = memory_manager.get_working_memory(session_id)
# 2. 保存用户消息到工作记忆
working_memory.add_turn("user", message)
# 3. 构建上下文感知的 prompt
context = await build_context_aware_prompt(
user_query=message,
working_memory=working_memory,
memory_manager=memory_manager,
user_id=user_id
)
# 4. 调用 LLM 生成响应
response = await llm_client.generate(
prompt=context,
max_tokens=1000
)
# 5. 保存助手响应
working_memory.add_turn("assistant", response)
# 6. 从响应中提取新事实(异步,不阻塞)
asyncio.create_task(
extract_and_store_facts(user_id, message, response, memory_manager)
)
# 7. 检查是否需要创建会话检查点
if should_create_checkpoint(working_memory):
await memory_manager.create_session_checkpoint(session_id, user_id)
return {
"response": response,
"session_id": session_id,
"memory_updated": True
}
@app.get("/user/{user_id}/memory")
async def get_user_memory(user_id: str):
"""获取用户的所有记忆(用于调试或导出)"""
return await governance.get_user_data_export(user_id)
@app.delete("/user/{user_id}/memory")
async def delete_user_memory(
user_id: str,
scope: str = "all",
memory_ids: List[str] = None
):
"""删除用户记忆"""
await governance.delete_user_memory(user_id, scope, memory_ids)
return {"status": "deleted", "scope": scope}
性能优化建议
缓存策略
from functools import lru_cache
import hashlib
class MemoryCache:
def __init__(self, redis_client):
self.redis = redis_client
self.ttl = 300 # 5 分钟
def _cache_key(self, user_id: str, query: str) -> str:
query_hash = hashlib.md5(query.encode()).hexdigest()
return f"memory_cache:{user_id}:{query_hash}"
async def get(self, user_id: str, query: str) -> Optional[List[Dict]]:
key = self._cache_key(user_id, query)
cached = await self.redis.get(key)
return json.loads(cached) if cached else None
async def set(self, user_id: str, query: str, results: List[Dict]):
key = self._cache_key(user_id, query)
await self.redis.setex(
key,
self.ttl,
json.dumps(results)
)
async def invalidate(self, user_id: str):
"""用户数据更新时清除缓存"""
pattern = f"memory_cache:{user_id}:*"
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
批量操作优化
async def batch_store_memories(memories: List[Dict], vector_db):
"""
批量存储记忆,减少 API 调用次数
"""
# 批量生成嵌入
contents = [m["content"] for m in memories]
embeddings = await vector_db.embed_batch(contents, batch_size=100)
# 批量 upsert
await vector_db.upsert_batch(
vectors=embeddings,
metadata=memories,
batch_size=100
)
常见陷阱与解决方案
陷阱 1:记忆污染
问题:错误信息被存储并反复检索,导致 AI 持续输出错误内容。
解决:
- 实现置信度评分,低置信度事实需要多次验证
- 允许用户纠正 AI 的记忆
- 定期审核和清理低质量记忆
async def user_correct_memory(
user_id: str,
memory_id: str,
correction: str,
memory_manager: MemoryManager
):
"""用户纠正错误记忆"""
# 降低原记忆置信度
await memory_manager.decrease_confidence(memory_id, delta=-0.5)
# 存储纠正后的事实(高置信度)
await memory_manager.store_semantic_memory(
user_id=user_id,
fact=correction,
confidence=0.95,
metadata={"corrected_by_user": True}
)
陷阱 2:检索延迟过高
问题:每次查询都进行向量检索,响应时间超过 2 秒。
解决:
- 实现多级缓存(L1 内存缓存 + L2 Redis 缓存)
- 预计算常用查询的结果
- 使用近似最近邻 (ANN) 索引
陷阱 3:上下文爆炸
问题:检索到的记忆太多,超出 LLM 上下文窗口。
解决:
- 严格的 Token 预算管理
- 分层检索(先粗筛再精筛)
- 智能摘要压缩早期内容
总结
构建有”记忆”的 AI 智能体不是一蹴而就的,需要:
- 分层架构:工作记忆、情景记忆、语义记忆各司其职
- 智能管理:重要性评分、摘要压缩、按需检索
- 持续学习:动态更新用户画像,捕捉偏好变化
- 跨会话持久化:会话检查点,任务连续性
- 语义优化:混合检索,查询扩展,LLM 重排序
- 隐私合规:数据导出、删除、PII 过滤
记住:好的记忆系统让用户感觉被理解,而不是被监控。始终给用户控制权,透明化数据存储,这是建立信任的关键。