一种与众不同的 RAG 架构探索

An exploration of a distinctive RAG architecture

Posted by Bryan on May 2, 2024

背景介绍

在设计检索增强生成 (RAG) 架构时,目前常规的技术方案是基于 LangChain + Faiss 的架构。

这样的架构好处很明显,基于 LangChain 框架可以快速接入大模型,并可以充分利用 LangChain 内置的各个环节的处理能力,利用 Faiss 可以支持离线的向量库构建,简单方便。Faiss 原生的框架可能存在一些薄弱之处,之前的 向量数据库 Faiss 的实践与增强探索 也有探索过对应的增强手段。

但是在生产实践中,这个架构依旧会存在一些明显的不便之处:

  1. 基于 LangChain 的框架去应用大模型时,大量依赖 prompt 的构造,比如 Langchain 中实现 RAG 的 MultiQuery Retriever 时,LangChain 就实现一个 prompt 告知大模型需要根据原始 query 生成 3 个类似 query 用于 RAG 的查询,而在实现 Rewrite 机制时,LangChain 中就需要实现 prompt 告知大模型执行 query 的重写用于提升 RAG 查询效果,而 prompt 机制本身是相对脆弱和不可靠的,因此最终的系统存在着明显的脆弱性;
  2. Faiss 向量库使用时,需要将所有需要检索的内容向量化放入特定的向量库中,后续具备查询与特定向量最接近的 topK 的能力,但是如果希望与业务结合,比如获取向量库中符合特定条件筛选后的 topK 的能力就会很困难,比如构建了医院所有病例的向量库,通过特定的查询手段可以查到最接近的病例。但是如果已知查询的内容只涉及产科,希望只从产科的病例查找 topK,Faiss 的向量库就会有心无力,除非重新构建产科的向量库,这样应对变化中业务需求就很困难。

新的 RAG 技术架构

针对上面提到的两大痛点,本文提出一种新的 RAG 技术框架,使用的架构为 Dspy + Mongo Atlas Vector Search

Dspy

Dspy 是一个由斯坦福大学开发的框架,旨在通过编程而非手工提示(prompting)来解决基于大型语言模型(LLM)的应用程序的脆弱性问题。Dspy 可以通过用户提供的少量测试数据生成合适的 prompt,并可以在使用中迭代调优,避免依赖脆弱的 prompt。

Dspy 的最简单的使用流程如下所示:

1、定义签名(Signature):

签名描述了描述了任务以及对应的输入,输出,一个简单示例如下所示:

class BasicQA(dspy.Signature):
    """Answer questions with short factoid answers."""

    question = dspy.InputField()
    answer = dspy.OutputField(desc="often between 1 and 5 words")

2、定义预测器(Predictor):

预测器是一个知道如何使用 LM 来实现签名的模块。重要的是,预测者可以学习使自己的行为适应任务,一个简单的示例如下所示:

# Define the predictor.

generate_answer = dspy.Predict(BasicQA)

# Call the predictor on a particular input.

pred = generate_answer(question=dev_example.question)

# Print the input and the prediction.

print(f"Question: {dev_example.question}")
print(f"Predicted Answer: {pred.answer}")

3、迭代调优

通过定义评估方法以及对应的训练数据, Dspy 可以自动优化调整 prompt,从而实现鲁棒性更强的服务,一个官方示例如下所示:

from dspy.teleprompt import BootstrapFewShot

# 评估模块,判断 dspy 的预测是否正确

def validate_context_and_answer(example, pred, trace=None):
    answer_EM = dspy.evaluate.answer_exact_match(example, pred)
    answer_PM = dspy.evaluate.answer_passage_match(example, pred)
    return answer_EM and answer_PM

# 设置基础的 teleprompter,用于自动优化 prompt

teleprompter = BootstrapFewShot(metric=validate_context_and_answer)

# 使用数据集 trainset 用于训练,并基于前面的评估模块确定是否正确

compiled_rag = teleprompter.compile(RAG(), trainset=trainset)

Mongo Atlas Vector Search 是 MongoDB 官方提供的向量化搜索能力,可以将向量化的数据直接存入 MongoDB 数据库表中的特定字段,之后为此字段创建向量搜索的索引。

之后就可以执行向量化搜索,而 Mongo Atlas Vector Search 提供了 Pre-Filter 的能力,这样就可以在向量化搜索前执行过滤,前面提到的查询完整病例库中执行产科病例的向量化检索就可以在 Pre-Filter 中完成。

案例实践

实践选择一个医学病例 + 医学指南的混合向量数据库进行实践,医学病例包含妇科和产科的病例,为了简化问题,实际选择两种场景:

  1. 病例检索,单独检索特定类型的病例;
  2. 问题回答,结合病例与医学指南的知识去给出合适的答案;

数据库表设计

根据前面的描述,数据库表设计如下所示:

import enum
from typing import Annotated, List, Optional

from pydantic import BaseModel, BeforeValidator, Field

PyObjectId = Annotated[str, BeforeValidator(str)]


class ContentType(str, enum.Enum):
    # 产科病例

    Obstetric_cases = "Obstetric_cases"
    # 妇科病例

    Gynecological_cases = "Gynecological_cases"
    # 医学指南

    Medical_guide = "Medical_guide"


# 数据对应的 MongoDB 数据库表定义


class MedicalModel(BaseModel):
    id: Optional[PyObjectId] = Field(alias="_id", default=None)
    data_model_id: str
    #  原始数据字段

    content_data: str
    # 向量化 embedding 字段

    content_embedding: Optional[List[float]] = None
    # 数据类型,用于区分 产科病例,妇科病例,医学指南

    content_type: ContentType

实际使用 content_type 区分不同的数据类型,方便在对应的数据类型中进行向量化搜索。

数据写入

基于 OpenAI 提供的 embedding 接口,可以执行内容的向量化,并写入数据库表,实现的数据写入的封装如下所示:

from motor.motor_asyncio import AsyncIOMotorDatabase
from openai import OpenAI
from pymongo import UpdateOne
from pymongo.results import BulkWriteResult

# 使用 OpenAI 的接口执行数据的向量化

def get_embedding(
    text: str, embedding_model: str = "text-embedding-3-small"
) -> List[float]:
    response = OpenAI().embeddings.create(
        model=embedding_model,
        input=text,
        encoding_format="float",
    )
    return response.data[0].embedding


# 批量写入数据,内置数据向量化能力

async def batch_upsert_medical_models(
    db: AsyncIOMotorDatabase,
    medical_models: List[MedicalModel],
) -> Optional[BulkWriteResult]:
    operations = []
    for medical_model in medical_models:
        # 生成向量化数据,并写入 content_embedding 字段

        medical_model.content_embedding = get_embedding(medical_model.content_data)
        operations.append(
            UpdateOne(
                {"data_model_id": medical_model.data_model_id},
                {
                    "$set": {
                        **medical_model.model_dump(exclude_unset=True),
                    }
                },
                upsert=True,
            )
        )

    if operations:
        bulk_result = await db["medical_record"].bulk_write(operations)
        return bulk_result
    return None

在上面的封装中,实现 batch_upsert_medical_models() 方法用于生成向量化数据并写入 content_embedding 字段,最终调用 UpdateOne() 方法将数据写入 MongoDB 数据库中。

向量库查询

进行向量化查询时,可以使用 Pymongo 提供的向量化搜索的能力,此时可以根据业务属性与其他的查询条件组合起来。在本次中我们主要结合 content_type 进行联合查询。实现的查询的封装如下所示:


class MedicalModelQueryResponse(MedicalModel):
    # 返回原始数据,并增加向量相似分

    score: float


# 查询向量知识库

async def query_medical_models(
    db: AsyncIOMotorDatabase,
    query: str,
    num_candidates: int = 20,
    limit: int = 10,
    content_types: Optional[List[ContentType]] = None,
) -> List[MedicalModelQueryResponse]:
    # 将查询问题向量化

    query_embedding = get_embedding(query)

    # 执行向量库查询,使用 content_embedding 字段进行向量库查询

    vector_search_query = {
        "index": "vector_index",
        "path": "content_embedding",
        "queryVector": query_embedding,
        "numCandidates": num_candidates,
        "limit": limit,
    }

    # 结合 content_type 进行联合查询

    if content_types:
        vector_search_query.update({"filter": {"content_type": {"$in": content_types}}})

    cursor = db["medical_record"].aggregate(
        [
            {"$vectorSearch": vector_search_query},
            {
                "$project": {
                    "_id": 1,
                    "content_data": 1,
                    "content_type": 1,
                    "data_model_id": 1,
                    "score": {"$meta": "vectorSearchScore"},
                }
            },
        ]
    )

    return [MedicalModelQueryResponse(**r) async for r in cursor]

向量化索引构造

向量化查询需要构造单独的索引,需要在 MongoDB 提供的网页端创建对应的索引,根据上面的需求,生成的 content_embedding 的索引,并补充 content_type 的联合查询的索引,对应的配置如下所示:

index

RAG 流程

接下来就可以使用上面的知识库的数据构造实际的 RAG 服务,需要先定义 RAG 查询对应的输入输出,此时使用 dspy 提供的签名进行描述 (Signature),对应如下所示:

import dspy

# 初始化指定 dspy 对应的大模型,使用的 OpenAI 提供的 GPT3.5

dspy.settings.configure(lm=dspy.OpenAI(model="gpt-3.5-turbo-0125"))


class GenerateAnswer(dspy.Signature):
    """Answer questions with short factoid answers."""

    context = dspy.InputField(desc="may contain relevant facts")
    question = dspy.InputField()
    answer = dspy.OutputField(desc="often between 1 and 100 words")

接下来封装实现 RAG 查询的方法,用于根据用户的查询问题调用 query_medical_models() 查询对应的数据,并使用 dspy 生成对应的响应,封装方法如下所示:

async def rag_query(
    db: AsyncIOMotorDatabase,
    query: str,
    content_types: Optional[List[ContentType]] = None,
):
    rag_chain = dspy.ChainOfThought(GenerateAnswer)

    # 根据问题查询向量库获取相关的文档数据

    contents = await query_medical_models(db, query=query, content_types=content_types)
    content_datas = [content.content_data for content in contents]

    # 调用 dspy 预测器生成对应的回答

    predict = rag_chain(question=query, context=content_datas)
    return predict

实际就可以调用上面的 rag_query() 方法进行预测,使用类似如下所示:

db = get_database()
# 寻找产科病例

await rag_query(db, "寻找所有接受过剖宫产手术的病例", [ContentType.Obstetric_cases])
# 寻找妇科病例

await rag_query(db, "查找所有被诊断为子宫肌瘤的病例", [ContentType.Gynecological_cases])
# 结合病例与指南给出综合的回答

await rag_query(db, "为一位高血压患者制定一个包含生活方式改变和药物治疗的综合管理计划")

前面介绍过 Dspy 中可以准备必要的训练数据,利用 Few-Shot 机制调优 prompt,从而实现更好的预测效果。

总结

本文通过一个实际的案例来验证了一个新的 RAG 技术架构 Dspy + Mongo Atlas Vector Search,这个新的框架可以避免向量库查询与业务查询结合困难的问题,同时利用 Dspy 避免大量的查询需要构造复杂的 prompt 的问题。

整体而言,这个框架适用于快速构建 RAG 服务,可以解决常规 RAG 框架存在的一些问题,但是同样也存在一些不便之处,比如 Mongo Atlas Vector Search 只能通过 Mongo 官方的云服务进行使用,离线部署会更加困难。希望文章给大家一些启发,在实践中根据需要进行技术选型,从而实现鲁棒性更强,功能更加灵活的 RAG 服务。