背景介绍
在构建大模型知识库,即检索增强生成 (RAG) 服务时,中间存在一个核心环节就是向量化搜索,如果不希望引入第三方的向量数据库,那么开源的 Faiss 就是一个不错的选择。
Faiss 是 FaceBook 提供的开源向量库搜索库,在 Github 上已经有 28.1k star,相对成熟而且使用方便。
之前构建的 RAG 服务使用的就是 Faiss 提供的向量化搜索服务,Faiss 功能强大,上手简单。但是在实际产品化的过程中发现了不少不足,在实践过程中进行了必要的迭代增强,本文就是对这个过程的记录。
Faiss 问题
向量数据库 Faiss 的数据是直接存储在本地磁盘中的,使用 index.faiss
和 index.pkl
进行保存。和 SQLite 的使用类似。
这样的好处是设计简单,迁移方便。但是作为向量数据库在生产环境使用时就会暴露出下面的问题:
- 多线程并发写入 Faiss 文件时会存在冲突,可能会导致 Faiss 本地的向量库文件被损坏;
- 主从库读写分离,Faiss 读写使用同样的文件,写入异常会导致 Faiss 查询服务完全不可用(教训惨痛);
- 所有的向量数据都存储在同一个向量库中, 无法支持类似数据库分库分表查询的能力;
如果希望在生产环境用上 Faiss 向量库,必然会需要进行必要的增强
Faiss 多线程安全
为了解决 Faiss 多线程并发写入时的异常,需要将 Faiss 转换为一个线程安全的 Faiss。
可以基于线程锁进行封装,外部总是使用线程安全的 Faiss 对象,避免多线程写入导致向量库文件损坏最终导致不可用。实现的封装简化后如下所示:
import os
import threading
from contextlib import contextmanager
from langchain.vectorstores.faiss import FAISS
class ThreadSafeFaiss:
def __init__(self, obj: FAISS):
self._obj = obj
self._lock = threading.RLock()
# 获取线程锁,只有获取到线程锁的 Faiss 对象才能执行操作
@contextmanager
def acquire(self):
try:
self._lock.acquire()
yield self._obj
finally:
self._lock.release()
def save_local(self, path: str):
with self.acquire():
if not os.path.isdir(path):
os.makedirs(path)
self._obj.save_local(path)
def delete(self):
ret = []
with self.acquire():
ids = list(self._obj.docstore._dict.keys())
if ids:
ret = self._obj.delete(ids)
return ret
在上面的实现中,通过内部封装线程锁 threading.RLock
,避免了多线程写入时互相影响导致向量库写入本地的文件出现异常。
Faiss 主从库读写分离
作为一个 RAG 服务,后台会持续有新的文档入库,需要在后台进行向量化并插入,而前台用户会持续进行向量查询,需要依赖 Faiss 持续提供查询服务。
当后台的 Faiss 插入过程中出现异常导致不完整的 Faiss 向量数据写入,常规情况下可能会导致 Faiss 查询无法使用,影响线上用户。
在经历过类似的惨痛教训导致被迫重建向量库之后,参考数据库的设计思路,实现主库写入,从库查询的能力。将封装简化后如下所示:
import shutil
from langchain.schema import Document
from langchain_core.embeddings import Embeddings
from langchain.vectorstores.faiss import FAISS
MASTER_LOCAL_PATH = ""
SLAVE_LOCAL_PATH = ""
# faiss 主库,主要完成写入操作
master_faiss = None
# faiss 从库,主要完成向量检索
slave_faiss = None
# 新建向量库
def new_vector_store(
embed_model: str,
) -> FAISS:
embeddings = get_embedding(embed_model)
doc = Document(page_content="init", metadata={})
vector_store = FAISS.from_documents(
[doc],
embeddings,
normalize_L2=True,
distance_strategy="METRIC_INNER_PRODUCT",
)
ids = list(vector_store.docstore._dict.keys())
vector_store.delete(ids)
return vector_store
# 从本地文件中加载向量库
def do_load_exist_vector_store(
vs_path: str,
embed_model: str,
) -> FAISS:
embeddings = get_embedding(default_embed_model=embed_model)
vector_store = FAISS.load_local(
vs_path,
embeddings,
normalize_L2=True,
distance_strategy="METRIC_INNER_PRODUCT",
)
return vector_store
# 加载向量库, 使用 master 参数确定使用主库还是从库
def load_vector_store(
embed_model: str,
master: bool = False,
) -> ThreadSafeFaiss:
global master_faiss
global slave_faiss
# 使用从库提供向量检索服务
if not master:
if slave_faiss:
return slave_faiss
if os.path.isfile(os.path.join(SLAVE_LOCAL_PATH, "index.faiss")):
vector_store = do_load_exist_vector_store(SLAVE_LOCAL_PATH, embed_model)
else:
vector_store = new_vector_store(embed_model=embed_model)
vector_store.save_local(SLAVE_LOCAL_PATH)
slave_faiss = ThreadSafeFaiss(vector_store)
return slave_faiss
# 使用主库提供向量更新能力
if master_faiss:
return master_faiss
if os.path.isfile(os.path.join(MASTER_LOCAL_PATH, "index.faiss")):
vector_store = do_load_exist_vector_store(MASTER_LOCAL_PATH, embed_model)
# 需要向前兼容,没上线之前只有目前的从库提供服务,原始数据需要从目前的从库拷贝过来
elif os.path.isfile(os.path.join(SLAVE_LOCAL_PATH, "index.faiss")):
shutil.copytree(SLAVE_LOCAL_PATH, MASTER_LOCAL_PATH)
vector_store = do_load_exist_vector_store(MASTER_LOCAL_PATH, embed_model)
else:
vector_store = new_vector_store(embed_model=embed_model)
vector_store.save_local(MASTER_LOCAL_PATH)
master_faiss = ThreadSafeFaiss(vector_store)
return master_faiss
通过上面的 load_vector_store()
方法可以获取向量库的读写向量库,向量的增删会先在主库 master_faiss
完成, 而实际的查询是由从库 slave_faiss
提供,避免主库的更新过程中的异常导致查询服务不可用
在主库完成了数据更新后,可以执行 shutil.copytree()
将主库的本地的数据同步至从库对应的本地路径,并执行从库 slave_faiss
的重新加载。
Faiss 分表数据隔离
在 Faiss 使用过程中,所有的数据都放在一起,实际的 RAG 服务构建中,会需要支持知识库的隔离,因此需要拓展现有的 Faiss 数据隔离的能力。
拓展的过程中可以实现一个 Faiss 对象缓存池,根据 key 区分不同的 Faiss 对象,Faiss 数据在本地存储的路径根据 key 确定,简化后的实现如下所示:
import shutil
import threading
from collections import OrderedDict
from typing import List, Optional
class CachePool:
def __init__(self, cache_num: int = -1, local_base_path: str = "./faiss_data"):
self._cache_num = cache_num
self._cache: OrderedDict[str, ThreadSafeFaiss] = OrderedDict()
self.atomic = threading.RLock()
self.local_base_path = local_base_path
def keys(self) -> List[str]:
return list(self._cache.keys())
def _check_count(self):
if isinstance(self._cache_num, int) and self._cache_num > 0:
while len(self._cache) > self._cache_num:
self._cache.popitem(last=False)
def get(self, key: str) -> Optional[ThreadSafeFaiss]:
cache = self._cache.get(key)
return cache
def set(self, key: str, obj: ThreadSafeFaiss) -> ThreadSafeFaiss:
self._cache[key] = obj
self._check_count()
return obj
def pop(self, key: Optional[str] = None) -> Optional[ThreadSafeFaiss]:
if key is None:
_, value = self._cache.popitem(last=False)
return value
else:
return self._cache.pop(key, None)
def get_local_path(self, key: str) -> str:
return os.path.join(self.local_base_path, key)
# 根据 key 对应的 Faiss 对象,key 用于隔离业务层的向量数据
def load_vector_store_from_pool(
self, key: str, embed_model: str, master: bool = False
):
slave_data_key = key
key += "_master" if master else ""
self.atomic.acquire()
cache = self.get(key)
if cache:
self.atomic.release()
return cache
vs_path = self.get_local_path(key)
slave_local_path = self.get_local_path(slave_data_key)
# 本地数据已存在, 直接加载
if os.path.isfile(os.path.join(vs_path, "index.faiss")):
vector_store = do_load_exist_vector_store(vs_path, embed_model)
# 向前兼容,兼容之前只有 slave 数据的情况, 将原始 slave 数据作为基础数据
elif master and os.path.isfile(os.path.join(slave_local_path, "index.faiss")):
shutil.copytree(slave_local_path, vs_path)
vector_store = do_load_exist_vector_store(vs_path, embed_model)
else:
vector_store = new_vector_store(embed_model=embed_model)
vector_store.save_local(vs_path)
faiss_obj = ThreadSafeFaiss(vector_store)
self.set(key, faiss_obj)
self.atomic.release()
return faiss_obj
# 将 master 数据同步至 slave,并将缓存中的 slave 数据失效, 下次获取的就是 slave 最新的数据
def sync_master_to_slave(self, key: str):
slave_key = key
master_key = key + "_master"
master_local_path = self.get_local_path(master_key)
slave_local_path = self.get_local_path(slave_key)
shutil.copytree(master_local_path, slave_local_path)
self.pop(slave_key)
在上面的实现中,通过调用 load_vector_store_from_pool()
方法可以获取对应的 Faiss 对象,并提供 sync_master_to_slave()
用于 master 到 slave 向量数据库的同步。
总结
本文实际是对 RAG 实现过程中向量数据库 Faiss 的增强思路,Faiss 虽然简单而强大,但是如果将其作为一个向量数据库应用于生产环境,就会存在一些明显的问题。
本文是在实践中对于 Faiss 本身存在的几个问题进行了定向的优化,解决了向量库数据库 Faiss 使用过程中的高频痛点,希望对大家有所助益。