AI应用开发进阶(二):RAG优化从60分到90分的实战技巧

6次阅读
没有评论

AI应用开发进阶(二):RAG优化从60分到90分

一、开场:为什么你的RAG效果差

大家好,我是老金。

很多人做RAG,效果只有60分。

用户问:”这个产品的退换货政策是什么?”
AI答:”抱歉,我不知道。”

问题出在哪?

  • 文档没切好
  • 检索不精准
  • 上下文不完整

今天聊聊RAG优化的实战技巧。

二、RAG基础回顾

2.1 标准RAG流程

┌─────────────────────────────────────────────────────────┐
│                  RAG基本流程                            │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────────────────────────────────────────┐   │
│  │  Indexing(索引)                               │   │
│  │  文档 → 解析 → 分块 → Embedding → 向量存储     │   │
│  └─────────────────────────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │  Retrieval(检索)                               │   │
│  │  Query → Embedding → 向量相似度 → Top-K        │   │
│  └─────────────────────────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │  Generation(生成)                              │   │
│  │  Query + Context → LLM → Response              │   │
│  └─────────────────────────────────────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘

2.2 常见问题

问题 表现 原因
答非所问 检索到了,但回答不相关 Chunk质量差
不知道 检索不到相关内容 检索策略差
幻觉 答了文档里没有的 上下文利用差
不完整 只回答了部分 重排序或截断

三、索引优化

3.1 文档解析

from llama_index import Document
import pdfplumber
import docx

def parse_document(file_path: str) -> List[Document]:
    """解析多种格式文档"""
    docs = []

    if file_path.endswith('.pdf'):
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                text = page.extract_text()
                if text:
                    docs.append(Document(
                        text=text,
                        metadata={"source": file_path, "page": page.page_number}
                    ))

    elif file_path.endswith('.docx'):
        doc = docx.Document(file_path)
        for para in doc.paragraphs:
            if para.text.strip():
                docs.append(Document(
                    text=para.text,
                    metadata={"source": file_path, "type": "paragraph"}
                ))

    elif file_path.endswith('.md'):
        with open(file_path, 'r') as f:
            content = f.read()
            # 按标题分割
            sections = split_by_headers(content)
            for section in sections:
                docs.append(Document(
                    text=section['content'],
                    metadata={"source": file_path, "header": section['header']}
                ))

    return docs

def split_by_headers(content: str) -> List[dict]:
    """按标题分割Markdown"""
    import re

    # 匹配各种标题格式
    header_pattern = r'^(#{1,6})s+(.+)$'
    lines = content.split('n')

    sections = []
    current_header = "Introduction"
    current_content = []

    for line in lines:
        match = re.match(header_pattern, line)
        if match:
            if current_content:
                sections.append({
                    "header": current_header,
                    "content": 'n'.join(current_content)
                })
            current_header = match.group(2)
            current_content = []
        else:
            current_content.append(line)

    if current_content:
        sections.append({
            "header": current_header,
            "content": 'n'.join(current_content)
        })

    return sections

3.2 智能分块

from llama_index.node_parser import (
    SimpleNodeParser,
    SemanticSplitterNodeParser
)
from llama_index.embeddings import OpenAIEmbedding

# 方案1:基于语义的分块(推荐)
semantic_parser = SemanticSplitterNodeParser(
    embed_model=OpenAIEmbedding(),
    buffer_size=1,  # 句子窗口
    breakpoint_percentile_threshold=95,  # 分割阈值
    # 分割原则:句子间的语义相似度低于阈值时分割
)

nodes = semantic_parser.get_nodes_from_documents(documents)

# 方案2:层级分块(适合长文档)
from llama_index.node_parser import HierarchicalNodeParser

hierarchical_parser = HierarchicalNodeParser(
    chunk_sizes=[2048, 512, 128],  # 大块→中块→小块
    chunk_overlap=100
)

nodes = hierarchical_parser.get_nodes_from_documents(documents)

# 方案3:自定义分块
def custom_chunking(text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """自定义分块策略"""
    sentences = text.split('。')  # 按句子分割

    chunks = []
    current = ""

    for sentence in sentences:
        if len(current) + len(sentence)  dict:
    """丰富文档元数据"""
    metadata = document.metadata.copy()

    # 添加文档级别信息
    metadata["indexed_at"] = datetime.now().isoformat()
    metadata["word_count"] = len(document.text)

    # 添加内容级别信息
    if "header" in metadata:
        metadata["section"] = metadata["header"]

    # 关键词提取
    keywords = extract_keywords(document.text)
    metadata["keywords"] = keywords

    # 实体识别
    entities = extract_entities(document.text)
    metadata["entities"] = entities

    # 问题生成(用于问答)
    questions = generate_questions(document.text)
    metadata["related_questions"] = questions

    return metadata

def extract_keywords(text: str) -> List[str]:
    """提取关键词"""
    # 简单实现:高频词
    words = text.split()
    # 过滤停用词
    stopwords = {'的', '是', '在', '了', '和', '与', '或', '等'}
    filtered = [w for w in words if w not in stopwords and len(w) > 1]

    from collections import Counter
    return [w for w, _ in Counter(filtered).most_common(5)]

def extract_entities(text: str) -> List[dict]:
    """提取实体(简化版)"""
    # 实际应该用NER模型
    entities = []
    import re

    # 提取日期
    dates = re.findall(r'd{4}年d{1,2}月d{1,2}日', text)
    entities.extend([{"type": "date", "value": d} for d in dates])

    # 提取金额
    amounts = re.findall(r'd+元', text)
    entities.extend([{"type": "amount", "value": a} for a in amounts])

    return entities

四、检索优化

4.1 混合检索

from llama_index.retrievers import (
    VectorIndexRetriever,
    KeywordTableRetriever
)
from llama_index.retrievers import QueryFusionRetriever

# 方案1:混合检索
vector_retriever = VectorIndexRetriever(
    index=index,
    similarity_top_k=5
)

keyword_retriever = KeywordTableRetriever(
    index=index,
    similarity_top_k=5
)

# 融合检索
fusion_retriever = QueryFusionRetriever(
    retrievers=[vector_retriever, keyword_retriever],
    mode="reciprocal_rerank",  # 重排序融合
    top_k=5
)

# 方案2:多路召回 + 重排
from llama_index.retrievers import BaseRetriever

class MultiRetriever(BaseRetriever):
    """多路召回"""

    def __init__(self, retrievers: List[BaseRetriever]):
        self.retrievers = retrievers

    def _retrieve(self, query_bundle) -> List[Node]:
        all_nodes = []

        for retriever in self.retrievers:
            nodes = retriever.retrieve(query_bundle)
            all_nodes.extend(nodes)

        # 重排
        reranked = self._rerank(all_nodes, query_bundle)
        return reranked[:10]  # 返回Top10

    def _rerank(self, nodes, query_bundle, top_k=10):
        # 使用LLM重排
        # 简化:按相似度分数排序去重
        seen = set()
        unique_nodes = []
        for node in sorted(nodes, key=lambda x: x.score, reverse=True):
            if node.node_id not in seen:
                seen.add(node.node_id)
                unique_nodes.append(node)
        return unique_nodes[:top_k]

4.2 查询改写

# 查询改写:让检索更精准
class QueryRewriter:
    """查询改写器"""

    def __init__(self, llm):
        self.llm = llm

    async def rewrite(self, query: str) -> str:
        """改写查询"""
        prompt = f"""
原始查询:{query}

请改写这个查询,使其更适合检索。

要求:
1. 提取核心实体
2. 补充同义词
3. 简化复杂查询
4. 保持原意

只输出改写后的查询,不要解释。
"""
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        return response.strip()

    async def expand(self, query: str) -> List[str]:
        """查询扩展"""
        prompt = f"""
查询:{query}

请生成3-5个相关的搜索查询,用于多路检索。

要求:
1. 同义词扩展
2. 上下位扩展
3. 问题形式扩展

格式:
1. [查询1]
2. [查询2]
...
"""
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        queries = [q.strip() for q in response.split('n') if q.strip()]
        return queries[:5]

    async def decompose(self, query: str) -> List[str]:
        """查询分解(复杂问题拆成简单问题)"""
        prompt = f"""
复杂查询:{query}

如果这是一个复杂查询,请拆分成多个简单查询。

格式:
子查询1: [内容]
子查询2: [内容]
...
"""
        response = await self.llm.chat([{"role": "user", "content": prompt}])
        # 解析响应...
        return []

4.3 向量模型选择

# 向量模型选择
EMBEDDING_MODELS = {
    # OpenAI
    "text-embedding-ada-002": {
        "dims": 1536,
        "cost": "$0.0001/1K tokens",
        "quality": "⭐⭐⭐",
        "speed": "⭐⭐⭐⭐⭐"
    },
    "text-embedding-3-small": {
        "dims": 1536,
        "cost": "$0.00002/1K tokens",
        "quality": "⭐⭐⭐",
        "speed": "⭐⭐⭐⭐⭐"
    },
    "text-embedding-3-large": {
        "dims": 3072,
        "cost": "$0.00013/1K tokens",
        "quality": "⭐⭐⭐⭐⭐",
        "speed": "⭐⭐⭐⭐"
    },

    # 开源
    "bge-large-zh": {
        "dims": 1024,
        "cost": "免费",
        "quality": "⭐⭐⭐⭐",
        "speed": "⭐⭐⭐"
    },
    "m3e-large": {
        "dims": 1024,
        "cost": "免费",
        "quality": "⭐⭐⭐⭐",
        "speed": "⭐⭐⭐⭐"
    }
}

# 选择建议
RECOMMENDATION = """
中文场景:
- 生产环境:text-embedding-3-large(效果好)
- 预算有限:bge-large-zh(开源可用)
- 极速要求:text-embedding-3-small(便宜快速)

英文场景:
- 高质量:text-embedding-3-large
- 平衡选择:text-embedding-3-small
"""

五、生成优化

5.1 上下文组织

# 上下文组织:让LLM更好理解检索结果
def organize_context(nodes: List[Node], query: str) -> str:
    """组织检索结果"""
    if not nodes:
        return "没有找到相关信息。"

    context_parts = []

    for i, node in enumerate(nodes, 1):
        # 添加来源信息
        source = node.metadata.get('source', '未知来源')
        header = node.metadata.get('header', '')

        part = f"""
--- 参考文档 {i} ---
来源:{source}
{f"标题:{header}" if header else ""}

内容:
{node.text.strip()}
"""
        context_parts.append(part)

    return 'n'.join(context_parts)

# Prompt优化
IMPROVED_PROMPT = """
# 角色
你是一个专业的客服助手。基于提供的参考文档回答用户问题。

# 规则
1. 只使用参考文档中的信息
2. 如果信息不足,说"我没有找到相关信息"
3. 引用时说明来源
4. 回答要准确、友好、有帮助

# 参考文档
{context}

# 用户问题
{query}

# 回答
"""

5.2 后处理过滤

# 生成后处理
def post_process_response(response: str, context: str) -> str:
    """后处理回答"""
    # 1. 检查幻觉:回答中的事实是否在context中
    claims = extract_claims(response)
    verified_claims = []
    hallucinated_claims = []

    for claim in claims:
        if claim in context or any(fact in claim for fact in context.split()):
            verified_claims.append(claim)
        else:
            hallucinated_claims.append(claim)

    # 如果有幻觉,降低置信度
    if hallucinated_claims:
        response += f"nn⚠️ 注意:部分内容可能不准确。"

    # 2. 添加引用
    if verified_claims:
        response += f"nn📚 参考了 {len(set(source for source in [n.metadata.get('source') for n in nodes]))} 个文档"

    return response

def extract_claims(text: str) -> List[str]:
    """提取陈述"""
    # 简化:按句子分割
    import re
    sentences = re.split(r'[。!?n]', text)
    return [s.strip() for s in sentences if len(s.strip()) > 10]

5.3 流式输出

# 流式生成
async def stream_generate(query: str, context: str):
    """流式生成回答"""
    prompt = IMPROVED_PROMPT.format(query=query, context=context)

    stream = await openai.ChatCompletion.acreate(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

六、高级技巧

6.1 RAG Fusion

# RAG Fusion:多查询检索融合
async def rag_fusion(query: str, top_k: int = 10) -> List[Node]:
    """RAG Fusion"""
    # 1. 生成多个查询
    rewritter = QueryRewriter(llm)
    queries = await rewritter.expand(query)
    queries.append(query)  # 包含原查询

    # 2. 多路检索
    all_nodes = []
    for q in queries:
        nodes = await index.as_retriever().aretrieve(q)
        all_nodes.extend(nodes)

    # 3. RRF融合排序
    # RRF = 1 / (rank + 60), rank从0开始
    scores = {}
    for node in all_nodes:
        node_id = node.node_id
        if node_id not in scores:
            scores[node_id] = {"node": node, "score": 0}
        # 多路召回中的排名
        rank = [n.node_id for n in all_nodes].index(node_id)
        scores[node_id]["score"] += 1 / (rank + 60)

    # 4. 排序输出
    reranked = sorted(scores.values(), key=lambda x: x["score"], reverse=True)
    return [item["node"] for item in reranked[:top_k]]

6.2 Self-RAG

# Self-RAG:让模型判断是否需要检索
SELF_RAG_PROMPT = """
你是一个RAG系统助手。

对于每个问题,你需要判断:
1. 是否需要检索外部信息?
2. 如果检索了,检索结果是否有用?
3. 如何结合检索结果回答?

格式:
是否检索:[是/否]
理由:[简短理由]
检索结果是否有用:[是/否]
回答:[你的回答]
"""

七、评估与优化

7.1 RAG评估指标

# RAG评估指标
RAG_METRICS = {
    "上下文相关性": {
        "description": "检索到的文档与问题的相关程度",
        "评估方法": "人工打分 / LLM评估",
        "目标": " > 0.8"
    },
    "答案忠诚度": {
        "description": "回答是否忠实于检索到的文档",
        "评估方法": "幻觉检测",
        "目标": " > 0.9"
    },
    "答案相关性": {
        "description": "回答是否真正回答了问题",
        "评估方法": "人工打分 / LLM评估",
        "目标": " > 0.85"
    },
    "检索召回率": {
        "description": "相关文档被检索到的比例",
        "评估方法": "Ground Truth对比",
        "目标": " > 0.7"
    }
}

async def evaluate_rag_system(
    test_questions: List[dict],
    rag_system
) -> Dict:
    """评估RAG系统"""
    results = []

    for q in test_questions:
        # 检索
        nodes = await rag_system.retrieve(q["query"])

        # 生成
        response = await rag_system.generate(q["query"], nodes)

        # 评估
        evaluation = await evaluate_single(
            question=q["query"],
            response=response,
            ground_truth=q.get("answer"),
            context=[n.text for n in nodes]
        )

        results.append(evaluation)

    # 汇总
    summary = {
        "context_relevance": sum(r.context_relevance for r in results) / len(results),
        "faithfulness": sum(r.faithfulness for r in results) / len(results),
        "answer_relevance": sum(r.answer_relevance for r in results) / len(results),
        "recall": sum(r.recall for r in results) / len(results)
    }

    return summary

八、总结

优化要点

  1. 索引优化:智能分块 + 元数据丰富
  2. 检索优化:混合检索 + 查询改写
  3. 生成优化:上下文组织 + 后处理

优化路径

Level 1(60分):基础RAG
- 简单分块
- 单一检索
- 直接生成

Level 2(75分):
- 语义分块
- 混合检索
- 结构化Prompt

Level 3(85分):
- 查询改写
- RRF重排
- 后处理过滤

Level 4(90分+):
- Self-RAG
- RAG Fusion
- 微调Embedding

相关阅读

正文完
 0
技术老金
版权声明:本站原创文章,由 技术老金 于2026-04-03发表,共计10238字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)