2026年3月23日 13 分钟阅读

AI 智能体内存管理实战:如何让 RAG 应用记住用户对话上下文的 6 个核心技巧

tinyash 0 条评论

构建有”记忆”的 AI 应用:从会话状态管理到长期记忆存储的完整实战指南

引言:为什么你的 AI 智能体总是”失忆”?

想象这个场景:用户在你的 AI 客服应用中询问了产品价格,五分钟后又问”刚才说的那个产品有优惠吗?“——如果你的应用回答”请问您指的是哪个产品?”,用户体验瞬间崩塌。

这就是 AI 智能体”失忆症”的典型表现。在构建 RAG(检索增强生成)应用时,内存管理是决定用户体验的关键因素,却往往被开发者忽视。

根据 Anthropic 最新发布的 AI 智能体开发报告,73% 的生产级 AI 应用失败源于糟糕的上下文管理。用户期望 AI 能够记住对话历史、偏好设置、之前的决策——就像人类对话一样自然流畅。

本文将深入探讨 AI 智能体内存管理的 6 个核心技巧,帮助你构建真正”有记忆”的 AI 应用。无论你是使用 LangChain、LlamaIndex 还是自定义 RAG 架构,这些实战经验都能直接应用。

技巧一:分层记忆架构设计

为什么需要分层?

人类的记忆不是单一存储桶。我们有:

  • 工作记忆:当前对话的短期上下文
  • 情景记忆:特定会话中的事件和对话
  • 语义记忆:长期积累的知识和事实
  • 程序记忆:学会的技能和操作流程

AI 智能体同样需要这种分层架构。将所有信息塞进一个向量数据库是新手最常见的错误。

三层记忆模型实战

# 记忆分层架构示例
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime, timedelta

@dataclass
class WorkingMemory:
    """工作记忆:当前对话轮次,保存在内存中"""
    conversation_turns: List[Dict]  # 最近 10-20 轮对话
    current_intent: str  # 当前用户意图
    temporary_variables: Dict  # 临时变量(如表单填写进度)
    max_turns: int = 20
    
    def add_turn(self, role: str, content: str):
        self.conversation_turns.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat()
        })
        # 保持滑动窗口
        if len(self.conversation_turns) > self.max_turns:
            self.conversation_turns.pop(0)

@dataclass
class EpisodicMemory:
    """情景记忆:会话级别的事件,存储在 Redis 或类似数据库中"""
    session_id: str
    user_id: str
    events: List[Dict]
    created_at: datetime
    last_accessed: datetime
    
    def add_event(self, event_type: str, data: Dict, importance: float = 0.5):
        self.events.append({
            "type": event_type,
            "data": data,
            "importance": importance,
            "timestamp": datetime.now().isoformat()
        })

@dataclass
class SemanticMemory:
    """语义记忆:长期知识和用户画像,存储在向量数据库中"""
    user_id: str
    facts: List[Dict]  # 用户相关事实
    preferences: Dict  # 用户偏好
    knowledge_embeddings: List[float]  # 向量嵌入
    
    def add_fact(self, fact: str, confidence: float = 0.8):
        self.facts.append({
            "content": fact,
            "confidence": confidence,
            "created_at": datetime.now().isoformat()
        })

各层存储策略

记忆类型存储介质保留时间访问延迟典型用例
工作记忆应用内存当前会话<1ms对话上下文、临时变量
情景记忆Redis/内存数据库7-30 天<10ms会话历史、用户行为轨迹
语义记忆向量数据库 (Pinecone/Weaviate)永久50-200ms用户画像、长期偏好

实战代码:记忆管理器实现

import redis
import json
from typing import Optional

class MemoryManager:
    def __init__(self, redis_url: str, vector_db_client):
        self.redis = redis.from_url(redis_url)
        self.vector_db = vector_db_client
        self.working_memory: Dict[str, WorkingMemory] = {}
    
    def get_working_memory(self, session_id: str) -> WorkingMemory:
        """获取或创建工作记忆"""
        if session_id not in self.working_memory:
            self.working_memory[session_id] = WorkingMemory()
        return self.working_memory[session_id]
    
    async def save episodic_memory(self, session_id: str, user_id: str, event: Dict):
        """保存情景记忆到 Redis"""
        key = f"episodic:{user_id}:{session_id}"
        self.redis.lpush(key, json.dumps(event))
        self.redis.expire(key, 7 * 24 * 3600)  # 7 天过期
    
    async def save_semantic_memory(self, user_id: str, fact: str, embedding: List[float]):
        """保存语义记忆到向量数据库"""
        await self.vector_db.upsert(
            namespace=f"user:{user_id}",
            vectors=[embedding],
            metadata=[{"fact": fact, "created_at": datetime.now().isoformat()}]
        )
    
    async def retrieve_relevant_memories(self, user_id: str, query: str, top_k: int = 5):
        """检索相关记忆(结合情景和语义记忆)"""
        query_embedding = await self.vector_db.embed(query)
        
        # 从向量数据库检索语义记忆
        semantic_results = await self.vector_db.query(
            namespace=f"user:{user_id}",
            vector=query_embedding,
            top_k=top_k
        )
        
        # 从 Redis 检索最近的情景记忆
        episodic_keys = self.redis.keys(f"episodic:{user_id}:*")
        episodic_memories = []
        for key in episodic_keys[:5]:  # 最近 5 个会话
            events = self.redis.lrange(key, 0, 10)
            episodic_memories.extend([json.loads(e) for e in events])
        
        return {
            "semantic": semantic_results,
            "episodic": episodic_memories
        }

技巧二:智能上下文窗口管理

问题:Token 限制与信息丢失

所有 LLM 都有上下文窗口限制。即使是 Claude 的 200K token,在长对话中也会耗尽。关键是如何智能地决定保留什么、丢弃什么、压缩什么

策略一:重要性评分机制

不是所有对话内容都同等重要。实现一个重要性评分系统:

def calculate_message_importance(message: Dict) -> float:
    """
    计算消息的重要性评分 (0-1)
    """
    score = 0.5  # 基础分
    
    # 用户明确表达偏好的消息更重要
    preference_keywords = ["我喜欢", "我不喜欢", "记住", "总是", "从不"]
    if any(kw in message["content"] for kw in preference_keywords):
        score += 0.3
    
    # 包含实体信息(姓名、日期、数字)的消息更重要
    if re.search(r'\d+|[A-Z][a-z]+', message["content"]):
        score += 0.1
    
    # 问题 - 答案对很重要
    if message["role"] == "assistant" and "?" in message.get("previous_user_message", ""):
        score += 0.2
    
    # 最近的消息更重要(时间衰减)
    time_decay = calculate_time_decay(message["timestamp"])
    score *= time_decay
    
    return min(score, 1.0)

def calculate_time_decay(timestamp: str, half_life_hours: int = 24) -> float:
    """计算时间衰减因子"""
    message_time = datetime.fromisoformat(timestamp)
    hours_ago = (datetime.now() - message_time).total_seconds() / 3600
    return 0.5 ** (hours_ago / half_life_hours)

策略二:对话摘要压缩

当上下文接近限制时,自动压缩早期对话:

async def compress_conversation_history(
    conversation: List[Dict],
    llm_client,
    target_token_count: int
) -> List[Dict]:
    """
    压缩对话历史,保留关键信息
    """
    current_tokens = count_tokens(conversation)
    
    if current_tokens <= target_token_count:
        return conversation
    
    # 保留最近 N 轮完整对话
    recent_turns = 5
    preserved = conversation[-recent_turns * 2:]  # 用户 + 助手
    
    # 压缩早期对话为摘要
    early_conversation = conversation[:-recent_turns * 2]
    if early_conversation:
        summary = await llm_client.generate(
            prompt=f"""请将以下对话压缩为 3-5 句话的摘要,保留所有关键事实、用户偏好和已完成的决策:

{format_conversation(early_conversation)}

摘要:""",
            max_tokens=200
        )
        
        # 将摘要作为系统消息插入
        preserved.insert(0, {
            "role": "system",
            "content": f"【对话摘要】{summary}"
        })
    
    return preserved

策略三:分层检索策略

不要将所有历史都放入 prompt,而是按需检索

async def build_context_aware_prompt(
    user_query: str,
    working_memory: WorkingMemory,
    memory_manager: MemoryManager,
    user_id: str,
    max_context_tokens: int = 3000
) -> str:
    """
    构建上下文感知的 prompt,动态检索相关记忆
    """
    # 1. 始终包含最近 5 轮对话
    recent_context = format_conversation(
        working_memory.conversation_turns[-10:]
    )
    
    # 2. 基于当前查询检索相关记忆
    relevant_memories = await memory_manager.retrieve_relevant_memories(
        user_id=user_id,
        query=user_query,
        top_k=5
    )
    
    # 3. 构建最终 prompt
    prompt_parts = []
    
    if relevant_memories["semantic"]:
        facts = "\n".join([m["fact"] for m in relevant_memories["semantic"]])
        prompt_parts.append(f"【用户已知信息】\n{facts}\n")
    
    if relevant_memories["episodic"]:
        recent_events = "\n".join([
            f"- {e['type']}: {e['data']}"
            for e in relevant_memories["episodic"][:5]
        ])
        prompt_parts.append(f"【最近活动】\n{recent_events}\n")
    
    prompt_parts.append(f"【当前对话】\n{recent_context}")
    prompt_parts.append(f"【用户问题】\n{user_query}")
    
    final_prompt = "\n\n".join(prompt_parts)
    
    # 4. 如果超过 token 限制,压缩最早的内容
    if count_tokens(final_prompt) > max_context_tokens:
        final_prompt = await compress_to_token_limit(
            final_prompt,
            max_context_tokens,
            llm_client
        )
    
    return final_prompt

技巧三:用户画像的动态更新

问题:静态画像 vs 动态学习

大多数 RAG 应用的用户画像是静态的——创建后不再更新。但用户会改变:新偏好、新需求、新行为模式。AI 智能体需要持续学习

实现:增量式画像更新

class UserProfileManager:
    def __init__(self, vector_db, redis_client):
        self.vector_db = vector_db
        self.redis = redis_client
    
    async def extract_user_facts(self, conversation_turn: Dict) -> List[Dict]:
        """
        从对话中提取用户相关事实
        """
        extraction_prompt = f"""从以下对话中提取关于用户的事实信息。
只提取明确的、可验证的事实,不要推断。

对话:
{conversation_turn['role']}: {conversation_turn['content']}

提取的事实(JSON 数组格式):"""
        
        response = await self.llm.generate(extraction_prompt)
        facts = json.loads(response)
        
        return [
            {
                "content": fact["statement"],
                "category": fact["category"],  # 如:偏好、人口统计、行为
                "confidence": fact["confidence"],
                "source_message_id": conversation_turn["id"],
                "timestamp": datetime.now().isoformat()
            }
            for fact in facts
        ]
    
    async def update_user_profile(self, user_id: str, new_facts: List[Dict]):
        """
        增量更新用户画像,处理冲突和重复
        """
        # 获取现有画像
        existing_facts = await self.get_user_facts(user_id)
        
        for new_fact in new_facts:
            # 检查是否有冲突或重复
            similar_facts = await self.find_similar_facts(
                user_id,
                new_fact["content"]
            )
            
            if similar_facts:
                # 更新现有事实(增加置信度或更新时间)
                await self.update_existing_fact(
                    user_id,
                    similar_facts[0]["id"],
                    new_fact
                )
            else:
                # 添加新事实
                await self.add_new_fact(user_id, new_fact)
        
        # 定期清理低置信度的旧事实
        await self.prune_low_confidence_facts(user_id)
    
    async def find_similar_facts(self, user_id: str, fact: str, threshold: float = 0.85):
        """使用向量相似度查找相似事实"""
        embedding = await self.vector_db.embed(fact)
        results = await self.vector_db.query(
            namespace=f"user_profile:{user_id}",
            vector=embedding,
            top_k=5,
            filter={"confidence": {"$gte": 0.5}}
        )
        
        return [r for r in results if r["score"] >= threshold]

实战:偏好学习系统

async def learn_user_preferences(
    user_id: str,
    interaction: Dict,
    profile_manager: UserProfileManager
):
    """
    从用户交互中学习偏好
    """
    # 检测偏好信号
    preference_signals = []
    
    # 显式偏好(用户直接说)
    if "我喜欢" in interaction["content"] or "我不喜欢" in interaction["content"]:
        preference_signals.append({
            "type": "explicit",
            "content": interaction["content"],
            "confidence": 0.9
        })
    
    # 隐式偏好(行为推断)
    if interaction["type"] == "selection":
        preference_signals.append({
            "type": "implicit",
            "content": f"用户选择了选项{interaction['selected_option']}",
            "confidence": 0.6
        })
    
    if interaction["type"] == "rejection":
        preference_signals.append({
            "type": "implicit",
            "content": f"用户拒绝了选项{interaction['rejected_option']}",
            "confidence": 0.7
        })
    
    # 更新画像
    for signal in preference_signals:
        await profile_manager.update_user_profile(user_id, [signal])
    
    # 记录学习事件
    await profile_manager.log_learning_event(
        user_id=user_id,
        event_type="preference_learned",
        data=preference_signals
    )

技巧四:跨会话记忆持久化

问题:会话中断与记忆丢失

用户可能今天开始一个任务,明天继续。如果每次新会话都从零开始,体验极差。需要跨会话记忆持久化

方案:会话链接与记忆迁移

class CrossSessionMemory:
    def __init__(self, db_client, vector_db):
        self.db = db_client
        self.vector_db = vector_db
    
    async def create_session_checkpoint(self, session_id: str, user_id: str):
        """
        创建会话检查点,保存关键状态
        """
        # 提取会话关键信息
        session_data = await self.db.get_session(session_id)
        
        checkpoint = {
            "user_id": user_id,
            "session_id": session_id,
            "created_at": datetime.now().isoformat(),
            "summary": await self.summarize_session(session_data),
            "pending_tasks": self.extract_pending_tasks(session_data),
            "user_context": self.extract_user_context(session_data),
            "important_facts": await self.extract_important_facts(session_data)
        }
        
        # 保存到数据库
        await self.db.insert("session_checkpoints", checkpoint)
        
        # 同时保存到向量数据库以便检索
        summary_embedding = await self.vector_db.embed(checkpoint["summary"])
        await self.vector_db.upsert(
            namespace=f"checkpoints:{user_id}",
            vectors=[summary_embedding],
            metadata=[checkpoint]
        )
        
        return checkpoint["id"]
    
    async def restore_session_context(self, user_id: str, new_session_id: str):
        """
        为新会话恢复相关上下文
        """
        # 检索用户最近的检查点
        recent_checkpoints = await self.db.query(
            "session_checkpoints",
            filter={"user_id": user_id},
            order_by="created_at DESC",
            limit=5
        )
        
        # 检索相关的语义记忆
        # (基于新会话的第一条消息)
        first_message = await self.db.get_first_message(new_session_id)
        relevant_checkpoints = await self.vector_db.query(
            namespace=f"checkpoints:{user_id}",
            vector=await self.vector_db.embed(first_message["content"]),
            top_k=3
        )
        
        # 合并上下文
        restored_context = {
            "recent_sessions": recent_checkpoints,
            "relevant_sessions": relevant_checkpoints,
            "user_profile": await self.get_user_profile(user_id)
        }
        
        return restored_context
    
    async def summarize_session(self, session_data: Dict) -> str:
        """使用 LLM 生成会话摘要"""
        prompt = f"""总结以下会话的关键信息:
- 用户完成了什么任务
- 遗留了什么待办事项
- 揭示了什么用户偏好
- 任何重要的事实或决策

会话记录:
{format_session(session_data)}

摘要(200 字以内):"""
        
        return await self.llm.generate(prompt)

实战:任务连续性管理

async def handle_session_resumption(
    user_id: str,
    new_session_id: str,
    memory_manager: MemoryManager
) -> Dict:
    """
    处理用户返回时的会话恢复
    """
    # 1. 查找用户的未完成的任务
    pending_tasks = await memory_manager.db.query(
        "user_tasks",
        filter={
            "user_id": user_id,
            "status": "in_progress"
        }
    )
    
    # 2. 检索最近的相关会话
    last_sessions = await memory_manager.db.query(
        "session_checkpoints",
        filter={"user_id": user_id},
        order_by="created_at DESC",
        limit=3
    )
    
    # 3. 生成欢迎消息,提及继续点
    if pending_tasks:
        task_summary = ", ".join([t["name"] for t in pending_tasks[:3]])
        welcome_message = f"欢迎回来!我看到你之前正在处理:{task_summary}。需要继续吗?"
    elif last_sessions:
        last_topic = last_sessions[0]["summary"][:100]
        welcome_message = f"欢迎回来!上次我们讨论了:{last_topic}..."
    else:
        welcome_message = "欢迎!有什么我可以帮助你的?"
    
    return {
        "welcome_message": welcome_message,
        "pending_tasks": pending_tasks,
        "recent_context": last_sessions
    }

技巧五:记忆检索的语义优化

问题:关键词匹配 vs 语义理解

传统的关键词检索会错过语义相关但不含相同词汇的记忆。例如:

  • 用户问:“我之前问过关于定价的问题吗?”
  • 实际历史:“这个产品多少钱?”

关键词检索可能失败,但语义检索应该匹配。

方案:混合检索策略

class HybridMemoryRetriever:
    def __init__(self, vector_db, keyword_index, llm_client):
        self.vector_db = vector_db
        self.keyword_index = keyword_index  # Elasticsearch 或类似
        self.llm = llm_client
    
    async def retrieve(self, query: str, user_id: str, top_k: int = 10):
        """
        混合检索:结合语义和关键词
        """
        # 1. 语义检索(向量搜索)
        query_embedding = await self.vector_db.embed(query)
        semantic_results = await self.vector_db.query(
            namespace=f"user:{user_id}",
            vector=query_embedding,
            top_k=top_k
        )
        
        # 2. 关键词检索
        keyword_results = await self.keyword_index.search(
            index=f"user_memories_{user_id}",
            query=query,
            size=top_k
        )
        
        # 3. 合并结果(去重 + 重新排序)
        merged = self.merge_and_rerank(
            semantic_results,
            keyword_results,
            query
        )
        
        # 4. 使用 LLM 进行最终相关性判断(可选,用于高价值场景)
        if len(merged) > 5:
            merged = await self.llm_rerank(query, merged, top_k=5)
        
        return merged
    
    def merge_and_rerank(
        self,
        semantic: List[Dict],
        keyword: List[Dict],
        query: str
    ) -> List[Dict]:
        """
        合并两种检索结果,使用 Reciprocal Rank Fusion
        """
        # 创建排名映射
        semantic_ranks = {item["id"]: idx + 1 for idx, item in enumerate(semantic)}
        keyword_ranks = {item["id"]: idx + 1 for idx, item in enumerate(keyword)}
        
        # 所有唯一 ID
        all_ids = set(semantic_ranks.keys()) | set(keyword_ranks.keys())
        
        # 计算 RRF 分数
        k = 60  # RRF 常数
        scores = []
        for item_id in all_ids:
            rank_s = semantic_ranks.get(item_id, float('inf'))
            rank_k = keyword_ranks.get(item_id, float('inf'))
            
            rrf_score = (1 / (k + rank_s)) + (1 / (k + rank_k))
            
            # 获取原始 item
            item = next(
                (i for i in semantic if i["id"] == item_id),
                next((i for i in keyword if i["id"] == item_id), None)
            )
            
            scores.append({
                **item,
                "rrf_score": rrf_score,
                "semantic_rank": rank_s,
                "keyword_rank": rank_k
            })
        
        # 按 RRF 分数排序
        scores.sort(key=lambda x: x["rrf_score"], reverse=True)
        
        return scores
    
    async def llm_rerank(self, query: str, candidates: List[Dict], top_k: int):
        """使用 LLM 进行最终相关性重排序"""
        prompt = f"""评估以下记忆片段与用户查询的相关性。

查询:{query}

候选记忆:
{json.dumps([{"id": c["id"], "content": c["content"]} for c in candidates], indent=2)}

请按相关性从高到低排序,返回前{top_k}个的 ID 列表(JSON 数组):"""
        
        response = await self.llm.generate(prompt)
        top_ids = json.loads(response)
        
        # 按 LLM 排序返回
        id_to_item = {c["id"]: c for c in candidates}
        return [id_to_item[id] for id in top_ids if id in id_to_item]

查询重写增强

async def expand_query_for_retrieval(query: str, llm_client) -> List[str]:
    """
    将用户查询扩展为多个变体,提高检索召回率
    """
    prompt = f"""将以下用户查询重写为 5 个不同的变体,保持相同语义但使用不同词汇:

原查询:{query}

变体(JSON 数组):"""
    
    response = await llm_client.generate(prompt)
    variants = json.loads(response)
    
    return [query] + variants  # 包含原查询

# 使用示例
async def retrieve_with_query_expansion(query: str, user_id: str, retriever):
    """使用查询扩展进行检索"""
    query_variants = await expand_query_for_retrieval(query, retriever.llm)
    
    all_results = []
    for variant in query_variants:
        results = await retriever.retrieve(variant, user_id, top_k=3)
        all_results.extend(results)
    
    # 去重
    seen_ids = set()
    unique_results = []
    for result in all_results:
        if result["id"] not in seen_ids:
            seen_ids.add(result["id"])
            unique_results.append(result)
    
    return unique_results[:10]

技巧六:隐私与记忆清理

问题:GDPR 合规与用户数据权利

用户有权要求:

  • 查看你存储的关于他们的所有数据
  • 删除特定记忆或全部数据
  • 导出他们的数据

实现:记忆生命周期管理

class MemoryGovernance:
    def __init__(self, db_client, vector_db):
        self.db = db_client
        self.vector_db = vector_db
    
    async def get_user_data_export(self, user_id: str) -> Dict:
        """
        导出用户的所有数据(GDPR 数据可携带权)
        """
        export = {
            "user_id": user_id,
            "exported_at": datetime.now().isoformat(),
            "profile": await self.db.get_user_profile(user_id),
            "conversation_history": await self.db.get_user_conversations(user_id),
            "semantic_memories": await self.vector_db.query(
                namespace=f"user:{user_id}",
                vector=[0] * 1536,  # 获取所有
                top_k=10000
            ),
            "episodic_memories": await self.db.get_user_events(user_id),
            "preferences": await self.db.get_user_preferences(user_id)
        }
        
        return export
    
    async def delete_user_memory(
        self,
        user_id: str,
        scope: str = "all",  # "all", "conversations", "profile", "specific"
        memory_ids: List[str] = None
    ):
        """
        删除用户记忆(GDPR 被遗忘权)
        """
        if scope == "all":
            # 删除所有数据
            await self.db.delete_user_data(user_id)
            await self.vector_db.delete_namespace(f"user:{user_id}")
            
            # 记录删除操作
            await self.log_deletion_event(user_id, "full_deletion")
            
        elif scope == "specific" and memory_ids:
            # 删除特定记忆
            for memory_id in memory_ids:
                await self.db.delete_memory(memory_id)
                await self.vector_db.delete(memory_id)
            
            await self.log_deletion_event(user_id, "partial_deletion", memory_ids)
        
        # 清理应用层缓存
        await self.invalidate_caches(user_id)
    
    async def set_memory_retention_policy(
        self,
        user_id: str,
        policy: Dict
    ):
        """
        设置用户的记忆保留策略
        """
        # 示例策略
        policy = {
            "conversations": {"retain_days": 90},
            "semantic_memories": {"retain_days": 365},
            "episodic_memories": {"retain_days": 180},
            "preferences": {"retain_days": None},  # 永久保留
            "auto_anonymize": True,  # 过期后匿名化而非删除
        }
        
        await self.db.set_user_policy(user_id, policy)
    
    async def run_retention_cleanup(self):
        """
        定期运行保留策略清理(每日 cron)
        """
        policies = await self.db.get_all_retention_policies()
        
        for policy in policies:
            user_id = policy["user_id"]
            cutoff_date = datetime.now() - timedelta(days=policy["retain_days"])
            
            # 删除过期记忆
            expired = await self.db.find_expired_memories(user_id, cutoff_date)
            
            for memory in expired:
                if policy.get("auto_anonymize"):
                    await self.anonymize_memory(memory["id"])
                else:
                    await self.delete_memory(memory["id"])

敏感信息检测与过滤

async def detect_and_filter_pii(content: str, llm_client) -> Dict:
    """
    检测并过滤个人敏感信息 (PII)
    """
    prompt = f"""分析以下文本,识别所有个人敏感信息 (PII):
- 姓名
- 邮箱地址
- 电话号码
- 身份证号
- 银行卡号
- 地址
- 其他可识别个人的信息

文本:{content}

返回 JSON 格式:
{{
    "detected_pii": [
        {{"type": "email", "value": "...", "start": 0, "end": 10}},
        ...
    ],
    "sanitized_content": "替换 PII 为 [REDACTED] 后的内容"
}}"""
    
    response = await llm_client.generate(prompt)
    return json.loads(response)

async def store_memory_safely(
    user_id: str,
    content: str,
    memory_manager: MemoryManager
):
    """
    安全存储记忆,自动过滤 PII
    """
    # 检测 PII
    pii_result = await detect_and_filter_pii(content, memory_manager.llm)
    
    # 存储时询问用户是否保存 PII
    if pii_result["detected_pii"]:
        # 选项 1:不存储 PII
        await memory_manager.store(
            user_id=user_id,
            content=pii_result["sanitized_content"],
            metadata={"pii_filtered": True}
        )
        
        # 选项 2:加密存储 PII(如果需要)
        # await memory_manager.store_encrypted_pii(...)
    else:
        await memory_manager.store(user_id, content)

实战案例:构建完整的记忆增强 RAG 应用

架构总览

┌─────────────────────────────────────────────────────────────┐
│                      用户请求                                │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                   请求处理器                                  │
│  - 解析用户意图                                               │
│  - 提取查询关键词                                             │
└─────────────────────────────────────────────────────────────┘
                            │
            ┌───────────────┼───────────────┐
            ▼               ▼               ▼
    ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
    │  工作记忆层   │ │  情景记忆层   │ │  语义记忆层   │
    │  (Redis)     │ │  (PostgreSQL) │ │  (Pinecone)   │
    │  最近对话     │ │  会话历史     │ │  用户画像     │
    └──────────────┘ └──────────────┘ └──────────────┘
            │               │               │
            └───────────────┼───────────────┘
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                  混合检索引擎                                 │
│  - 向量检索 + 关键词检索                                      │
│  - RRF 重排序                                                 │
│  - LLM 相关性判断                                             │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                   上下文构建器                                │
│  - 组装检索结果                                              │
│  - Token 预算 management                                     │
│  - 生成最终 prompt                                           │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                      LLM 生成                                 │
└─────────────────────────────────────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                   响应后处理                                  │
│  - 提取新事实更新画像                                         │
│  - 保存对话到情景记忆                                         │
│  - 检查是否需要创建检查点                                     │
└─────────────────────────────────────────────────────────────┘

完整代码示例

from fastapi import FastAPI, Depends
from typing import Optional

app = FastAPI()

# 初始化组件
memory_manager = MemoryManager(
    redis_url="redis://localhost:6379",
    vector_db_client=PineconeClient(api_key="...")
)
retriever = HybridMemoryRetriever(
    vector_db=memory_manager.vector_db,
    keyword_index=ElasticsearchClient("http://localhost:9200"),
    llm_client=AnthropicClient(api_key="...")
)
governance = MemoryGovernance(
    db_client=memory_manager.db,
    vector_db=memory_manager.vector_db
)

@app.post("/chat/{user_id}")
async def chat(
    user_id: str,
    message: str,
    session_id: Optional[str] = None
):
    # 1. 获取或创建会话
    if not session_id:
        session_id = await create_new_session(user_id)
    
    working_memory = memory_manager.get_working_memory(session_id)
    
    # 2. 保存用户消息到工作记忆
    working_memory.add_turn("user", message)
    
    # 3. 构建上下文感知的 prompt
    context = await build_context_aware_prompt(
        user_query=message,
        working_memory=working_memory,
        memory_manager=memory_manager,
        user_id=user_id
    )
    
    # 4. 调用 LLM 生成响应
    response = await llm_client.generate(
        prompt=context,
        max_tokens=1000
    )
    
    # 5. 保存助手响应
    working_memory.add_turn("assistant", response)
    
    # 6. 从响应中提取新事实(异步,不阻塞)
    asyncio.create_task(
        extract_and_store_facts(user_id, message, response, memory_manager)
    )
    
    # 7. 检查是否需要创建会话检查点
    if should_create_checkpoint(working_memory):
        await memory_manager.create_session_checkpoint(session_id, user_id)
    
    return {
        "response": response,
        "session_id": session_id,
        "memory_updated": True
    }

@app.get("/user/{user_id}/memory")
async def get_user_memory(user_id: str):
    """获取用户的所有记忆(用于调试或导出)"""
    return await governance.get_user_data_export(user_id)

@app.delete("/user/{user_id}/memory")
async def delete_user_memory(
    user_id: str,
    scope: str = "all",
    memory_ids: List[str] = None
):
    """删除用户记忆"""
    await governance.delete_user_memory(user_id, scope, memory_ids)
    return {"status": "deleted", "scope": scope}

性能优化建议

缓存策略

from functools import lru_cache
import hashlib

class MemoryCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 300  # 5 分钟
    
    def _cache_key(self, user_id: str, query: str) -> str:
        query_hash = hashlib.md5(query.encode()).hexdigest()
        return f"memory_cache:{user_id}:{query_hash}"
    
    async def get(self, user_id: str, query: str) -> Optional[List[Dict]]:
        key = self._cache_key(user_id, query)
        cached = await self.redis.get(key)
        return json.loads(cached) if cached else None
    
    async def set(self, user_id: str, query: str, results: List[Dict]):
        key = self._cache_key(user_id, query)
        await self.redis.setex(
            key,
            self.ttl,
            json.dumps(results)
        )
    
    async def invalidate(self, user_id: str):
        """用户数据更新时清除缓存"""
        pattern = f"memory_cache:{user_id}:*"
        keys = await self.redis.keys(pattern)
        if keys:
            await self.redis.delete(*keys)

批量操作优化

async def batch_store_memories(memories: List[Dict], vector_db):
    """
    批量存储记忆,减少 API 调用次数
    """
    # 批量生成嵌入
    contents = [m["content"] for m in memories]
    embeddings = await vector_db.embed_batch(contents, batch_size=100)
    
    # 批量 upsert
    await vector_db.upsert_batch(
        vectors=embeddings,
        metadata=memories,
        batch_size=100
    )

常见陷阱与解决方案

陷阱 1:记忆污染

问题:错误信息被存储并反复检索,导致 AI 持续输出错误内容。

解决

  • 实现置信度评分,低置信度事实需要多次验证
  • 允许用户纠正 AI 的记忆
  • 定期审核和清理低质量记忆
async def user_correct_memory(
    user_id: str,
    memory_id: str,
    correction: str,
    memory_manager: MemoryManager
):
    """用户纠正错误记忆"""
    # 降低原记忆置信度
    await memory_manager.decrease_confidence(memory_id, delta=-0.5)
    
    # 存储纠正后的事实(高置信度)
    await memory_manager.store_semantic_memory(
        user_id=user_id,
        fact=correction,
        confidence=0.95,
        metadata={"corrected_by_user": True}
    )

陷阱 2:检索延迟过高

问题:每次查询都进行向量检索,响应时间超过 2 秒。

解决

  • 实现多级缓存(L1 内存缓存 + L2 Redis 缓存)
  • 预计算常用查询的结果
  • 使用近似最近邻 (ANN) 索引

陷阱 3:上下文爆炸

问题:检索到的记忆太多,超出 LLM 上下文窗口。

解决

  • 严格的 Token 预算管理
  • 分层检索(先粗筛再精筛)
  • 智能摘要压缩早期内容

总结

构建有”记忆”的 AI 智能体不是一蹴而就的,需要:

  1. 分层架构:工作记忆、情景记忆、语义记忆各司其职
  2. 智能管理:重要性评分、摘要压缩、按需检索
  3. 持续学习:动态更新用户画像,捕捉偏好变化
  4. 跨会话持久化:会话检查点,任务连续性
  5. 语义优化:混合检索,查询扩展,LLM 重排序
  6. 隐私合规:数据导出、删除、PII 过滤

记住:好的记忆系统让用户感觉被理解,而不是被监控。始终给用户控制权,透明化数据存储,这是建立信任的关键。

参考资源


AI

发表评论

你的邮箱地址不会被公开,带 * 的为必填项。