AI Agent上下文窗口怎么管?老金教你4招避免”失忆”和”超限”

5次阅读
没有评论

一、 开场:一个让我彻夜排查的Bug

大家好,我是老金。

上周,我们团队的一个AI Agent项目上线后出现了一个诡异的问题:

Agent偶尔会”失忆”——忘记之前对话的内容,或者记住了一些不该记住的东西。

我花了整整一晚排查,最后发现问题出在上下文窗口管理上。

说实话,上下文窗口这个问题,很多团队在做AI Agent时都没重视,但它其实是影响Agent表现的关键因素之一。

今天这篇文章,我想系统性地聊聊:如何科学地管理AI Agent的上下文窗口

二、 为什么上下文窗口管理这么重要?

2.1 什么是上下文窗口?

简单说,上下文窗口就是模型能”看到”的最大Token数量。

不同模型的上下文窗口大小:

模型 上下文窗口 实际可用(建议)
GPT-4 Turbo 128K tokens ~100K tokens
Claude-3-Opus 200K tokens ~150K tokens
GPT-3.5-Turbo 16K tokens ~12K tokens
国产模型(通义、文心等) 4K-32K tokens 视具体模型

2.2 不管理上下文的后果

如果不对上下文进行管理,会出现以下问题:

  • Token超限报错:对话太长,超出模型上下文窗口
  • 成本爆炸:每次对话都要发送全部历史,Token消耗巨大
  • 模型”失忆”:重要信息被截断,模型无法参考
  • 模型”混淆”:历史信息太多,模型被无关内容干扰

三、 上下文管理的核心策略

3.1 策略一:滑动窗口

核心思想:只保留最近N轮对话。

class SlidingWindow:
    def __init__(self, max_turns=10):
        self.max_turns = max_turns
        self.history = []
def add_message(self, role, content):
    self.history.append({"role": role, "content": content})

    # 如果超过最大轮数,删除最早的对话
    if len(self.history) > self.max_turns * 2:  # 一轮=一问一答
        self.history = self.history[2:]  # 删除最早的一轮

def get_context(self):
    return self.history.copy()

优点:简单、可控Token数量

缺点:早期重要信息可能丢失

3.2 策略二:摘要压缩

核心思想:用LLM把长对话压缩成摘要。

class SummaryCompressor:
    def __init__(self, summary_threshold=20):
        self.summary_threshold = summary_threshold
        self.history = []
        self.summary = ""
async def add_message(self, role, content):
    self.history.append({"role": role, "content": content})

    # 当对话超过阈值时,生成摘要
    if len(self.history) >= self.summary_threshold:
        await self._generate_summary()

async def _generate_summary(self):
    prompt = f"""请总结以下对话的关键信息:

{self._format_history(self.history)}

要求:

  1. 保留关键决策和结论

  2. 保留用户偏好和重要约束

  3. 控制在200字以内
    """
    self.summary = await llm.call(prompt)

    只保留最近几轮对话

     self.history = self.history[-5:]

    def get_context(self):
    context = []
    if self.summary:
    context.append({
    "role": "system",
    "content": f"对话历史摘要:{self.summary}"
    })
    context.extend(self.history)
    return context

优点:保留关键信息,大幅压缩Token

缺点:需要额外LLM调用,有信息损失

3.3 策略三:向量检索

核心思想:把历史对话存入向量数据库,按需检索。

from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings

class VectorMemory: def init(self): self.embeddings = OpenAIEmbeddings() self.vectorstore = Chroma(embedding_function=self.embeddings) self.recent_history = [] # 保留最近几轮

async def add_message(self, role, content):
    # 存入向量数据库
    self.vectorstore.add_texts(
        texts=[content],
        metadatas=[{"role": role, "timestamp": time.time()}]
    )

    # 同时保留在最近历史中
    self.recent_history.append({"role": role, "content": content})
    if len(self.recent_history) > 10:
        self.recent_history = self.recent_history[-10:]

def get_context(self, query, k=5):
    # 检索相关历史
    relevant_history = self.vectorstore.similarity_search(query, k=k)

    context = []
    # 先添加检索到的相关历史
    for doc in relevant_history:
        context.append({
            "role": doc.metadata["role"],
            "content": doc.page_content
        })

    # 再添加最近的对话
    context.extend(self.recent_history)

    return context

优点:能检索到相关历史,不会丢失重要信息

缺点:实现复杂,需要向量数据库

3.4 策略四:分层管理(推荐)

核心思想:短期记忆 + 长期记忆 + 工作记忆。

class HierarchicalMemory:
    def __init__(self):
        # 工作记忆:当前任务的关键信息
        self.working_memory = {}  # {key: value}
    # 短期记忆:最近几轮对话
    self.short_term = []
    self.short_term_limit = 10  # 10轮对话

    # 长期记忆:向量数据库
    self.long_term = VectorStore()

    # 摘要:压缩的历史
    self.summary = ""

async def add_message(self, role, content):
    # 1. 更新短期记忆
    self.short_term.append({"role": role, "content": content})

    # 2. 如果超过限制,压缩到长期记忆
    if len(self.short_term) > self.short_term_limit:
        old_messages = self.short_term[:-self.short_term_limit]
        await self._archive_to_long_term(old_messages)
        self.short_term = self.short_term[-self.short_term_limit:]

    # 3. 提取关键信息到工作记忆
    await self._extract_working_memory(content)

async def get_context(self, query):
    context = []

    # 1. 添加工作记忆
    if self.working_memory:
        working_context = "当前任务关键信息:n"
        for key, value in self.working_memory.items():
            working_context += f"- {key}: {value}n"
        context.append({"role": "system", "content": working_context})

    # 2. 从长期记忆检索相关内容
    relevant = await self.long_term.search(query, k=3)
    if relevant:
        context.append({
            "role": "system", 
            "content": f"相关历史:{self._format(relevant)}"
        })

    # 3. 添加短期记忆
    context.extend(self.short_term)

    return context

四、 Token计算与监控

4.1 如何计算Token?

不同模型使用不同的Tokenizer:

import tiktoken

def count_tokens(text, model="gpt-4"): """计算文本的Token数量""" try: encoding = tiktoken.encoding_for_model(model) except KeyError:

如果模型不支持,使用cl100k_base(GPT-4的编码)

    encoding = tiktoken.get_encoding("cl100k_base")

return len(encoding.encode(text))

def count_messages_tokens(messages, model="gpt-4"):
"""计算消息列表的Token数量"""
total = 0
for message in messages:
total += count_tokens(message.get("content", ""), model)

每条消息有额外的格式开销

    total += 4  # role + content 的格式开销
total += 2  # 消息列表的开销
return total

4.2 Token监控与告警

class TokenMonitor:
    def __init__(self, warning_threshold=0.8, max_tokens=128000):
        self.warning_threshold = warning_threshold
        self.max_tokens = max_tokens
        self.history = []  # 记录每次调用的Token数
def check(self, messages, model):
    current_tokens = count_messages_tokens(messages, model)
    usage_ratio = current_tokens / self.max_tokens

    # 记录
    self.history.append({
        "timestamp": time.time(),
        "tokens": current_tokens,
        "ratio": usage_ratio
    })

    # 告警
    if usage_ratio > self.warning_threshold:
        logger.warning(
            f"上下文使用率过高: {usage_ratio:.1%} "
            f"({current_tokens}/{self.max_tokens} tokens)"
        )

    return {
        "tokens": current_tokens,
        "ratio": usage_ratio,
        "warning": usage_ratio > self.warning_threshold
    }

五、 实战案例:客服Agent的上下文管理

以我之前做的客服Agent为例:

5.1 需求分析

  • 对话可能持续几十轮
  • 需要记住用户的订单信息、投诉内容
  • 需要引用之前的承诺
  • 成本敏感

5.2 解决方案

class CustomerServiceAgent:
    def __init__(self):
        self.memory = HierarchicalMemory()
        self.token_monitor = TokenMonitor(
            warning_threshold=0.7,
            max_tokens=128000
        )
async def process(self, user_input):
    # 1. 构建上下文
    context = await self.memory.get_context(user_input)

    # 2. Token检查
    token_info = self.token_monitor.check(context, "gpt-4")
    if token_info["warning"]:
        # 触发压缩策略
        await self.memory.compress()
        context = await self.memory.get_context(user_input)

    # 3. 调用LLM
    response = await self.llm.call(
        messages=context + [{"role": "user", "content": user_input}]
    )

    # 4. 更新记忆
    await self.memory.add_message("user", user_input)
    await self.memory.add_message("assistant", response)

    # 5. 提取关键信息(如订单号、投诉内容)
    await self._extract_key_info(user_input, response)

    return response

async def _extract_key_info(self, user_input, response):
    """提取关键信息到工作记忆"""
    # 使用LLM提取
    prompt = f"""从以下对话中提取关键信息:

用户:{user_input}
客服:{response}

请提取:订单号、投诉内容、用户诉求、已承诺的解决方案等。
以JSON格式返回。
"""
key_info = await self.llm.call(prompt)
self.memory.working_memory.update(json.loads(key_info))

5.3 效果评估

指标 优化前 优化后
平均Token消耗 15,000/对话 5,000/对话
Token超限错误率 8% 0%
关键信息保留率 65% 95%
用户满意度 3.8/5 4.5/5

六、 写在最后

上下文窗口管理,是AI Agent工程化中容易被忽视但至关重要的一环。

核心原则:

  1. 不要无限堆历史:Token是有限资源,要用在刀刃上
  2. 分层管理:短期记忆+长期记忆+工作记忆
  3. 监控先行:建立Token监控和告警机制
  4. 动态调整:根据Token使用情况自动压缩

如果你也在做AI Agent系统,欢迎在评论区分享你的上下文管理经验。

我是技术老金,我们下期见!


📌 往期精彩回顾

正文完
 0
技术老金
版权声明:本站原创文章,由 技术老金 于2026-03-29发表,共计6127字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)