向量数据库 Faiss 的实践与增强探索

Practice and enhanced exploration of vector database Faiss

Posted by Bryan on April 23, 2024

背景介绍

在构建大模型知识库,即检索增强生成 (RAG) 服务时,中间存在一个核心环节就是向量化搜索,如果不希望引入第三方的向量数据库,那么开源的 Faiss 就是一个不错的选择。

Faiss 是 FaceBook 提供的开源向量库搜索库,在 Github 上已经有 28.1k star,相对成熟而且使用方便。

之前构建的 RAG 服务使用的就是 Faiss 提供的向量化搜索服务,Faiss 功能强大,上手简单。但是在实际产品化的过程中发现了不少不足,在实践过程中进行了必要的迭代增强,本文就是对这个过程的记录。

Faiss 问题

向量数据库 Faiss 的数据是直接存储在本地磁盘中的,使用 index.faissindex.pkl 进行保存。和 SQLite 的使用类似。

这样的好处是设计简单,迁移方便。但是作为向量数据库在生产环境使用时就会暴露出下面的问题:

  1. 多线程并发写入 Faiss 文件时会存在冲突,可能会导致 Faiss 本地的向量库文件被损坏;
  2. 主从库读写分离,Faiss 读写使用同样的文件,写入异常会导致 Faiss 查询服务完全不可用(教训惨痛);
  3. 所有的向量数据都存储在同一个向量库中, 无法支持类似数据库分库分表查询的能力;

如果希望在生产环境用上 Faiss 向量库,必然会需要进行必要的增强

Faiss 多线程安全

为了解决 Faiss 多线程并发写入时的异常,需要将 Faiss 转换为一个线程安全的 Faiss。

可以基于线程锁进行封装,外部总是使用线程安全的 Faiss 对象,避免多线程写入导致向量库文件损坏最终导致不可用。实现的封装简化后如下所示:

import os
import threading
from contextlib import contextmanager

from langchain.vectorstores.faiss import FAISS


class ThreadSafeFaiss:
    def __init__(self, obj: FAISS):
        self._obj = obj
        self._lock = threading.RLock()

    # 获取线程锁,只有获取到线程锁的 Faiss 对象才能执行操作

    @contextmanager
    def acquire(self):
        try:
            self._lock.acquire()
            yield self._obj
        finally:
            self._lock.release()

    def save_local(self, path: str):
        with self.acquire():
            if not os.path.isdir(path):
                os.makedirs(path)
            self._obj.save_local(path)

    def delete(self):
        ret = []
        with self.acquire():
            ids = list(self._obj.docstore._dict.keys())
            if ids:
                ret = self._obj.delete(ids)
        return ret

在上面的实现中,通过内部封装线程锁 threading.RLock,避免了多线程写入时互相影响导致向量库写入本地的文件出现异常。

Faiss 主从库读写分离

作为一个 RAG 服务,后台会持续有新的文档入库,需要在后台进行向量化并插入,而前台用户会持续进行向量查询,需要依赖 Faiss 持续提供查询服务。

当后台的 Faiss 插入过程中出现异常导致不完整的 Faiss 向量数据写入,常规情况下可能会导致 Faiss 查询无法使用,影响线上用户。

在经历过类似的惨痛教训导致被迫重建向量库之后,参考数据库的设计思路,实现主库写入,从库查询的能力。将封装简化后如下所示:

import shutil

from langchain.schema import Document
from langchain_core.embeddings import Embeddings
from langchain.vectorstores.faiss import FAISS

MASTER_LOCAL_PATH = ""
SLAVE_LOCAL_PATH = ""

# faiss 主库,主要完成写入操作

master_faiss = None

# faiss 从库,主要完成向量检索

slave_faiss = None


# 新建向量库

def new_vector_store(
    embed_model: str,
) -> FAISS:
    embeddings = get_embedding(embed_model)
    doc = Document(page_content="init", metadata={})
    vector_store = FAISS.from_documents(
        [doc],
        embeddings,
        normalize_L2=True,
        distance_strategy="METRIC_INNER_PRODUCT",
    )
    ids = list(vector_store.docstore._dict.keys())
    vector_store.delete(ids)
    return vector_store


# 从本地文件中加载向量库

def do_load_exist_vector_store(
    vs_path: str,
    embed_model: str,
) -> FAISS:
    embeddings = get_embedding(default_embed_model=embed_model)
    vector_store = FAISS.load_local(
        vs_path,
        embeddings,
        normalize_L2=True,
        distance_strategy="METRIC_INNER_PRODUCT",
    )
    return vector_store


# 加载向量库, 使用 master 参数确定使用主库还是从库

def load_vector_store(
    embed_model: str,
    master: bool = False,
) -> ThreadSafeFaiss:
    global master_faiss
    global slave_faiss

    # 使用从库提供向量检索服务

    if not master:
        if slave_faiss:
            return slave_faiss

        if os.path.isfile(os.path.join(SLAVE_LOCAL_PATH, "index.faiss")):
            vector_store = do_load_exist_vector_store(SLAVE_LOCAL_PATH, embed_model)
        else:
            vector_store = new_vector_store(embed_model=embed_model)
            vector_store.save_local(SLAVE_LOCAL_PATH)

        slave_faiss = ThreadSafeFaiss(vector_store)
        return slave_faiss

    # 使用主库提供向量更新能力

    if master_faiss:
        return master_faiss

    if os.path.isfile(os.path.join(MASTER_LOCAL_PATH, "index.faiss")):
        vector_store = do_load_exist_vector_store(MASTER_LOCAL_PATH, embed_model)
    # 需要向前兼容,没上线之前只有目前的从库提供服务,原始数据需要从目前的从库拷贝过来

    elif os.path.isfile(os.path.join(SLAVE_LOCAL_PATH, "index.faiss")):
        shutil.copytree(SLAVE_LOCAL_PATH, MASTER_LOCAL_PATH)
        vector_store = do_load_exist_vector_store(MASTER_LOCAL_PATH, embed_model)
    else:
        vector_store = new_vector_store(embed_model=embed_model)
        vector_store.save_local(MASTER_LOCAL_PATH)

    master_faiss = ThreadSafeFaiss(vector_store)
    return master_faiss

通过上面的 load_vector_store() 方法可以获取向量库的读写向量库,向量的增删会先在主库 master_faiss 完成, 而实际的查询是由从库 slave_faiss 提供,避免主库的更新过程中的异常导致查询服务不可用

在主库完成了数据更新后,可以执行 shutil.copytree() 将主库的本地的数据同步至从库对应的本地路径,并执行从库 slave_faiss 的重新加载。

Faiss 分表数据隔离

在 Faiss 使用过程中,所有的数据都放在一起,实际的 RAG 服务构建中,会需要支持知识库的隔离,因此需要拓展现有的 Faiss 数据隔离的能力。

拓展的过程中可以实现一个 Faiss 对象缓存池,根据 key 区分不同的 Faiss 对象,Faiss 数据在本地存储的路径根据 key 确定,简化后的实现如下所示:


import shutil
import threading
from collections import OrderedDict
from typing import List, Optional


class CachePool:
    def __init__(self, cache_num: int = -1, local_base_path: str = "./faiss_data"):
        self._cache_num = cache_num
        self._cache: OrderedDict[str, ThreadSafeFaiss] = OrderedDict()
        self.atomic = threading.RLock()
        self.local_base_path = local_base_path

    def keys(self) -> List[str]:
        return list(self._cache.keys())

    def _check_count(self):
        if isinstance(self._cache_num, int) and self._cache_num > 0:
            while len(self._cache) > self._cache_num:
                self._cache.popitem(last=False)

    def get(self, key: str) -> Optional[ThreadSafeFaiss]:
        cache = self._cache.get(key)
        return cache

    def set(self, key: str, obj: ThreadSafeFaiss) -> ThreadSafeFaiss:
        self._cache[key] = obj
        self._check_count()
        return obj

    def pop(self, key: Optional[str] = None) -> Optional[ThreadSafeFaiss]:
        if key is None:
            _, value = self._cache.popitem(last=False)
            return value
        else:
            return self._cache.pop(key, None)

    def get_local_path(self, key: str) -> str:
        return os.path.join(self.local_base_path, key)

    # 根据 key 对应的 Faiss 对象,key 用于隔离业务层的向量数据

    def load_vector_store_from_pool(
        self, key: str, embed_model: str, master: bool = False
    ):
        slave_data_key = key
        key += "_master" if master else ""

        self.atomic.acquire()
        cache = self.get(key)
        if cache:
            self.atomic.release()
            return cache

        vs_path = self.get_local_path(key)
        slave_local_path = self.get_local_path(slave_data_key)

        # 本地数据已存在, 直接加载

        if os.path.isfile(os.path.join(vs_path, "index.faiss")):
            vector_store = do_load_exist_vector_store(vs_path, embed_model)

        # 向前兼容,兼容之前只有 slave 数据的情况, 将原始 slave 数据作为基础数据

        elif master and os.path.isfile(os.path.join(slave_local_path, "index.faiss")):
            shutil.copytree(slave_local_path, vs_path)
            vector_store = do_load_exist_vector_store(vs_path, embed_model)
        else:
            vector_store = new_vector_store(embed_model=embed_model)
            vector_store.save_local(vs_path)

        faiss_obj = ThreadSafeFaiss(vector_store)
        self.set(key, faiss_obj)
        self.atomic.release()

        return faiss_obj

    # 将 master 数据同步至 slave,并将缓存中的 slave 数据失效, 下次获取的就是 slave 最新的数据

    def sync_master_to_slave(self, key: str):
        slave_key = key
        master_key = key + "_master"

        master_local_path = self.get_local_path(master_key)
        slave_local_path = self.get_local_path(slave_key)

        shutil.copytree(master_local_path, slave_local_path)

        self.pop(slave_key)


在上面的实现中,通过调用 load_vector_store_from_pool() 方法可以获取对应的 Faiss 对象,并提供 sync_master_to_slave() 用于 master 到 slave 向量数据库的同步。

总结

本文实际是对 RAG 实现过程中向量数据库 Faiss 的增强思路,Faiss 虽然简单而强大,但是如果将其作为一个向量数据库应用于生产环境,就会存在一些明显的问题。

本文是在实践中对于 Faiss 本身存在的几个问题进行了定向的优化,解决了向量库数据库 Faiss 使用过程中的高频痛点,希望对大家有所助益。