使用 FastEmbed 和 Qdrant 构建混合搜索服务
| 时间:20 分钟 | 级别:新手 | 输出:GitHub |
|---|
本教程将向您展示如何构建和部署您自己的混合搜索服务,以浏览来自 startups-list.com 的公司描述,并选择与您的查询最相似的公司。该网站包含公司名称、描述、位置和每个条目的图片。
正如我们已经在 博客 中写道的,混合搜索没有单一的定义。在本教程中,我们将介绍结合稠密和 稀疏嵌入 的情况。前者指由 BERT 等知名神经网络生成的嵌入,而后者则与传统的全文搜索方法更相关。
我们的混合搜索服务将使用 Fastembed 包生成文本描述的嵌入,并使用 FastAPI 提供搜索 API。Fastembed 原生集成了 Qdrant 客户端,因此您可以轻松地将数据上传到 Qdrant 并执行搜索查询。

工作流程
要创建混合搜索服务,您需要转换原始数据,然后创建搜索函数来操作它。首先,您将 1) 使用 BERT ML 模型的修改版本下载并准备样本数据集。然后,您将 2) 将数据加载到 Qdrant 中,3) 创建混合搜索 API,并 4) 使用 FastAPI 提供服务。

先决条件
要完成本教程,您需要
- Docker - 使用 Qdrant 最简单的方法是运行预构建的 Docker 镜像。
- 来自 startups-list.com 的 原始解析数据。
- Python 版本 >=3.9
准备样本数据集
要在创业公司描述上进行混合搜索,您必须首先将描述数据编码成向量。Fastembed 集成到 Qdrant 客户端中,将编码和上传合并为一个步骤。
它还负责批处理和并行化,因此您无需担心。
让我们从下载数据和安装必要的软件包开始。
- 首先,您需要下载数据集。
wget https://storage.googleapis.com/generall-shared-data/startups_demo.json
在 Docker 中运行 Qdrant
接下来,您需要使用向量引擎管理所有数据。Qdrant 允许您存储、更新或删除创建的向量。最重要的是,它允许您通过方便的 API 搜索最近的向量。
注意:在开始之前,请创建一个项目目录并在其中创建一个虚拟 Python 环境。
- 从 DockerHub 下载 Qdrant 镜像。
docker pull qdrant/qdrant
- 在 Docker 中启动 Qdrant。
docker run -p 6333:6333 \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant
您应该看到如下输出
...
[2021-02-05T00:08:51Z INFO actix_server::builder] Starting 12 workers
[2021-02-05T00:08:51Z INFO actix_server::builder] Starting "actix-web-service-0.0.0.0:6333" service on 0.0.0.0:6333
通过访问 https://:6333/ 测试服务。您应该会在浏览器中看到 Qdrant 版本信息。
所有上传到 Qdrant 的数据都保存在 ./qdrant_storage 目录中,即使您重新创建容器,数据也会持久化。
将数据上传到Qdrant
- 安装官方 Python 客户端以更好地与 Qdrant 交互。
pip install "qdrant-client[fastembed]>=1.14.2"
注意:本教程需要 fastembed 版本 >=0.6.1。
此时,您应该在 startups_demo.json 文件中有创业公司记录,并且 Qdrant 正在本地机器上运行。
现在您需要编写一个脚本,将所有创业公司数据和向量上传到搜索引擎。
- 为 Qdrant 创建一个客户端对象。
# Import client library
from qdrant_client import QdrantClient, models
client = QdrantClient(url="https://:6333")
- 选择模型来编码您的数据并准备集合。
在本教程中,我们将使用两个预训练模型分别计算稠密和稀疏向量。这些模型是:sentence-transformers/all-MiniLM-L6-v2 和 prithivida/Splade_PP_en_v1。一旦做出选择,我们需要在 Qdrant 中配置一个集合。
dense_vector_name = "dense"
sparse_vector_name = "sparse"
dense_model_name = "sentence-transformers/all-MiniLM-L6-v2"
sparse_model_name = "prithivida/Splade_PP_en_v1"
if not client.collection_exists("startups"):
client.create_collection(
collection_name="startups",
vectors_config={
dense_vector_name: models.VectorParams(
size=client.get_embedding_size(dense_model_name),
distance=models.Distance.COSINE
)
}, # size and distance are model dependent
sparse_vectors_config={sparse_vector_name: models.SparseVectorParams()},
)
Qdrant 要求向量有自己的名称和配置。参数 size 和 distance 是强制性的,但是,您可以额外为您的向量指定扩展配置,例如 quantization_config 或 hnsw_config。
- 从文件中读取数据。
import json
payload_path = "startups_demo.json"
documents = []
metadata = []
with open(payload_path) as fd:
for line in fd:
obj = json.loads(line)
description = obj["description"]
dense_document = models.Document(text=description, model=dense_model_name)
sparse_document = models.Document(text=description, model=sparse_model_name)
documents.append(
{
dense_vector_name: dense_document,
sparse_vector_name: sparse_document,
}
)
metadata.append(obj)
在此代码块中,我们从 startups_demo.json 文件中读取数据,并将其拆分为两个列表:documents 和 metadata。文档是包含创业公司描述和用于嵌入数据的模型名称的模型。元数据是与每个创业公司关联的负载,例如名称、位置和图片。我们将使用 documents 将数据编码成向量。
- 编码并上传数据。
client.upload_collection(
collection_name="startups",
vectors=tqdm.tqdm(documents),
payload=metadata,
parallel=4, # Use 4 CPU cores to encode data.
# This will spawn a model per process, which might be memory expensive
# Make sure that your system does not use swap, and reduce the amount
# # of processes if it does.
# Otherwise, it might significantly slow down the process.
# Requires wrapping code into if __name__ == '__main__' block
)
上传已处理的数据
从此处下载并解压已处理的数据,或使用以下脚本
wget https://storage.googleapis.com/dataset-startup-search/startup-list-com/startups_hybrid_search_processed_40k.tar.gz
tar -xvf startups_hybrid_search_processed_40k.tar.gz
然后您可以将数据上传到 Qdrant。
import json
import numpy as np
def named_vectors(
vectors: list[float],
sparse_vectors: list[models.SparseVector]
) -> dict:
for vector, sparse_vector in zip(vectors, sparse_vectors):
yield {
dense_vector_name: vector,
sparse_vector_name: models.SparseVector(**sparse_vector),
}
with open("dense_vectors.npy", "rb") as f:
vectors = np.load(f)
with open("sparse_vectors.json", "r") as f:
sparse_vectors = json.load(f)
with open("payload.json", "r") as f:
payload = json.load(f)
client.upload_collection(
"startups",
vectors=named_vectors(vectors, sparse_vectors),
payload=payload
)
upload_collection 方法将编码所有文档并将它们上传到 Qdrant。
parallel 参数启用数据并行化而不是内置的 ONNX 并行化。
此外,如果您以后想使用它们来更新或删除文档,您可以为每个文档指定 ID。如果您不指定 ID,它们将自动生成。
您可以通过将 tqdm 进度条传递给 upload_collection 方法来监控编码进度。
from tqdm import tqdm
client.upload_collection(
collection_name="startups",
vectors=documents,
payload=metadata,
ids=tqdm(range(len(documents))),
)
构建搜索 API
现在所有准备工作都已完成,让我们开始构建神经网络搜索类。
为了处理传入的请求,混合搜索类需要 3 件事:1) 将查询转换为向量的模型,2) 执行搜索查询的 Qdrant 客户端,3) 用于重新排序稠密和稀疏搜索结果的融合函数。
Qdrant 支持 2 种融合函数来组合结果:倒数秩融合 和 基于分布的分数融合
- 创建一个名为
hybrid_searcher.py的文件并指定以下内容。
from qdrant_client import QdrantClient, models
class HybridSearcher:
DENSE_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
SPARSE_MODEL = "prithivida/Splade_PP_en_v1"
def __init__(self, collection_name):
self.collection_name = collection_name
self.qdrant_client = QdrantClient()
- 编写搜索函数。
def search(self, text: str):
search_result = self.qdrant_client.query_points(
collection_name=self.collection_name,
query=models.FusionQuery(
fusion=models.Fusion.RRF # we are using reciprocal rank fusion here
),
prefetch=[
models.Prefetch(
query=models.Document(text=text, model=self.DENSE_MODEL),
using=dense_vector_name,
),
models.Prefetch(
query=models.Document(text=text, model=self.SPARSE_MODEL),
using=sparse_vector_name,
),
],
query_filter=None, # If you don't want any filters for now
limit=5, # 5 the closest results
).points
# `search_result` contains models.QueryResponse structure
# We can access list of scored points with the corresponding similarity scores,
# vectors (if `with_vectors` was set to `True`), and payload via `points` attribute.
# Select and return metadata
metadata = [point.payload for point in search_result]
return metadata
- 添加搜索过滤器。
使用 Qdrant,还可以为搜索添加一些条件。例如,如果您想搜索某个城市的创业公司,搜索查询可能如下所示
...
city_of_interest = "Berlin"
# Define a filter for cities
city_filter = models.Filter(
must=[
models.FieldCondition(
key="city",
match=models.MatchValue(value=city_of_interest)
)
]
)
# NOTE: it is not a hybrid search! It's just a dense query for simplicity
search_result = self.qdrant_client.query_points(
collection_name=self.collection_name,
query=models.Document(text=text, model=self.DENSE_MODEL),
query_filter=city_filter,
limit=5
).points
...
您现在已经为神经网络搜索查询创建了一个类。现在将其封装到一个服务中。
使用 FastAPI 部署搜索
要构建服务,您将使用 FastAPI 框架。
- 安装 FastAPI。
要安装它,请使用命令
pip install fastapi uvicorn
- 实现服务。
创建一个名为 service.py 的文件并指定以下内容。
该服务将只有一个 API 端点,它将如下所示
from fastapi import FastAPI
# The file where HybridSearcher is stored
from hybrid_searcher import HybridSearcher
app = FastAPI()
# Create a neural searcher instance
hybrid_searcher = HybridSearcher(collection_name="startups")
@app.get("/api/search")
def search_startup(q: str):
return {"result": hybrid_searcher.search(text=q)}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
- 运行服务。
python service.py
- 在浏览器中打开 https://:8000/docs。
您应该能够看到服务的调试界面。

随意使用它,对我们语料库中的公司进行查询,并查看结果。
加入我们的 Discord 社区,我们在这里讨论向量搜索和相似性学习,发布其他神经网络和神经网络搜索应用程序的示例。