使用 Qdrant 的 FastEmbed 进行混合搜索
| 耗时:20 分钟 | 级别:新手 | 输出:GitHub |
|---|
本教程将指导您如何构建并部署自己的混合搜索服务,以便在 startups-list.com 的公司描述中进行搜索,并挑选出与您查询内容最相似的结果。该网站包含每个条目的公司名称、描述、位置和图片。
正如我们已在博客中所述,混合搜索并没有单一的定义。在本教程中,我们将涵盖结合密集向量(dense)和稀疏向量(sparse embeddings)的情况。前者是指由 BERT 等知名神经网络生成的嵌入向量,而后者则更多地与传统的全文搜索方法相关。
我们的混合搜索服务将使用 Fastembed 库来生成文本描述的嵌入向量,并使用 FastAPI 来提供搜索 API。Fastembed 与 Qdrant 客户端原生集成,因此您可以轻松地将数据上传到 Qdrant 并执行搜索查询。

工作流程
要创建混合搜索服务,您需要转换原始数据,然后创建一个搜索函数来操作它。首先,您将 1) 使用修改版的 BERT 机器学习模型下载并准备示例数据集。然后,您将 2) 把数据加载到 Qdrant 中,3) 创建一个混合搜索 API,并 4) 使用 FastAPI 进行部署。

先决条件
要完成本教程,您需要:
- Docker - 使用 Qdrant 最简单的方法是运行预构建的 Docker 镜像。
- 来自 startups-list.com 的原始解析数据。
- Python 版本 >= 3.9
准备示例数据集
要在创业公司描述上进行混合搜索,必须首先将描述数据编码为向量。Qdrant 客户端集成的 Fastembed 将编码和上传合并为一个步骤。
它还能自动处理批处理和并行化,因此您无需为此操心。
让我们从下载数据和安装必要的包开始。
- 首先,您需要下载数据集。
wget https://storage.googleapis.com/generall-shared-data/startups_demo.json
在 Docker 中运行 Qdrant
接下来,您需要使用向量引擎来管理所有数据。Qdrant 允许您存储、更新或删除已创建的向量。最重要的是,它允许您通过方便的 API 搜索最近的向量。
注意:在开始之前,请创建一个项目目录并在其中建立一个虚拟 Python 环境。
- 从 DockerHub 下载 Qdrant 镜像。
docker pull qdrant/qdrant
- 在 Docker 中启动 Qdrant。
docker run -p 6333:6333 \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant
您应该看到如下输出
...
[2021-02-05T00:08:51Z INFO actix_server::builder] Starting 12 workers
[2021-02-05T00:08:51Z INFO actix_server::builder] Starting "actix-web-service-0.0.0.0:6333" service on 0.0.0.0:6333
访问 https://:6333/ 测试服务。您应该能在浏览器中看到 Qdrant 的版本信息。
所有上传到 Qdrant 的数据都保存在 ./qdrant_storage 目录中,即使您重新创建容器,这些数据也会被持久化。
将数据上传到Qdrant
- 安装官方 Python 客户端以便更好地与 Qdrant 交互。
pip install "qdrant-client[fastembed]>=1.14.2"
注意:本教程需要 fastembed 版本 >= 0.6.1。
此时,您应该在 startups_demo.json 文件中拥有创业公司记录,并且 Qdrant 正在本地机器上运行。
现在您需要编写一个脚本,将所有的创业公司数据和向量上传到搜索引擎中。
- 为 Qdrant 创建一个客户端对象。
# Import client library
from qdrant_client import QdrantClient, models
client = QdrantClient(url="https://:6333")
- 选择用于编码数据的模型并准备集合。
在本教程中,我们将分别使用两个预训练模型来计算密集向量和稀疏向量。模型为:sentence-transformers/all-MiniLM-L6-v2 和 prithivida/Splade_PP_en_v1。选定模型后,我们需要在 Qdrant 中配置一个集合。
dense_vector_name = "dense"
sparse_vector_name = "sparse"
dense_model_name = "sentence-transformers/all-MiniLM-L6-v2"
sparse_model_name = "prithivida/Splade_PP_en_v1"
if not client.collection_exists("startups"):
client.create_collection(
collection_name="startups",
vectors_config={
dense_vector_name: models.VectorParams(
size=client.get_embedding_size(dense_model_name),
distance=models.Distance.COSINE
)
}, # size and distance are model dependent
sparse_vectors_config={sparse_vector_name: models.SparseVectorParams()},
)
Qdrant 要求向量拥有各自的名称和配置。参数 size 和 distance 是强制性的,不过您还可以为向量指定扩展配置,例如 quantization_config 或 hnsw_config。
- 从文件中读取数据。
import json
payload_path = "startups_demo.json"
documents = []
metadata = []
with open(payload_path) as fd:
for line in fd:
obj = json.loads(line)
description = obj["description"]
dense_document = models.Document(text=description, model=dense_model_name)
sparse_document = models.Document(text=description, model=sparse_model_name)
documents.append(
{
dense_vector_name: dense_document,
sparse_vector_name: sparse_document,
}
)
metadata.append(obj)
在此代码块中,我们从 startups_demo.json 文件读取数据并将其拆分为两个列表:documents 和 metadata。文档是带有创业公司描述的模型以及用于嵌入数据的模型名称。元数据是与每家创业公司关联的有效载荷,例如名称、地点和图片。我们将使用 documents 将数据编码为向量。
- 编码并上传数据。
client.upload_collection(
collection_name="startups",
vectors=tqdm.tqdm(documents),
payload=metadata,
parallel=4, # Use 4 CPU cores to encode data.
# This will spawn a model per process, which might be memory expensive
# Make sure that your system does not use swap, and reduce the amount
# # of processes if it does.
# Otherwise, it might significantly slow down the process.
# Requires wrapping code into if __name__ == '__main__' block
)
上传已处理数据
从此处下载并解压处理好的数据,或使用以下脚本
wget https://storage.googleapis.com/dataset-startup-search/startup-list-com/startups_hybrid_search_processed_40k.tar.gz
tar -xvf startups_hybrid_search_processed_40k.tar.gz
然后您可以将数据上传到 Qdrant。
import json
import numpy as np
def named_vectors(
vectors: list[float],
sparse_vectors: list[models.SparseVector]
) -> dict:
for vector, sparse_vector in zip(vectors, sparse_vectors):
yield {
dense_vector_name: vector,
sparse_vector_name: models.SparseVector(**sparse_vector),
}
with open("dense_vectors.npy", "rb") as f:
vectors = np.load(f)
with open("sparse_vectors.json", "r") as f:
sparse_vectors = json.load(f)
with open("payload.json", "r") as f:
payload = json.load(f)
client.upload_collection(
"startups",
vectors=named_vectors(vectors, sparse_vectors),
payload=payload
)
upload_collection 方法将对所有文档进行编码并将其上传到 Qdrant。
parallel 参数启用数据并行处理,而不是使用内置的 ONNX 并行处理。
此外,如果您想在以后使用 ID 来更新或删除文档,可以为每个文档指定 ID。如果您不指定 ID,它们将自动生成。
您可以通过将 tqdm 进度条传递给 upload_collection 方法来监控编码进度。
from tqdm import tqdm
client.upload_collection(
collection_name="startups",
vectors=documents,
payload=metadata,
ids=tqdm(range(len(documents))),
)
构建搜索 API
现在所有准备工作都已完成,让我们开始构建神经网络搜索类。
为了处理传入的请求,混合搜索类需要 3 件事:1) 将查询转换为向量的模型,2) 执行搜索查询的 Qdrant 客户端,3) 用于对密集和稀疏搜索结果进行重排序的融合函数。
Qdrant 支持 2 种用于组合结果的融合函数:倒数排名融合 (Reciprocal Rank Fusion) 和 基于分布的得分融合 (Distribution Based Score Fusion)
- 创建一个名为
hybrid_searcher.py的文件并指定以下内容。
from qdrant_client import QdrantClient, models
class HybridSearcher:
DENSE_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
SPARSE_MODEL = "prithivida/Splade_PP_en_v1"
def __init__(self, collection_name):
self.collection_name = collection_name
self.qdrant_client = QdrantClient()
- 编写搜索函数。
def search(self, text: str):
search_result = self.qdrant_client.query_points(
collection_name=self.collection_name,
query=models.FusionQuery(
fusion=models.Fusion.RRF # we are using reciprocal rank fusion here
),
prefetch=[
models.Prefetch(
query=models.Document(text=text, model=self.DENSE_MODEL),
using=dense_vector_name,
),
models.Prefetch(
query=models.Document(text=text, model=self.SPARSE_MODEL),
using=sparse_vector_name,
),
],
query_filter=None, # If you don't want any filters for now
limit=5, # 5 the closest results
).points
# `search_result` contains models.QueryResponse structure
# We can access list of scored points with the corresponding similarity scores,
# vectors (if `with_vectors` was set to `True`), and payload via `points` attribute.
# Select and return metadata
metadata = [point.payload for point in search_result]
return metadata
- 添加搜索过滤器。
使用 Qdrant,还可以向搜索添加一些条件。例如,如果您想在特定城市搜索创业公司,搜索查询可能如下所示
...
city_of_interest = "Berlin"
# Define a filter for cities
city_filter = models.Filter(
must=[
models.FieldCondition(
key="city",
match=models.MatchValue(value=city_of_interest)
)
]
)
# NOTE: it is not a hybrid search! It's just a dense query for simplicity
search_result = self.qdrant_client.query_points(
collection_name=self.collection_name,
query=models.Document(text=text, model=self.DENSE_MODEL),
query_filter=city_filter,
limit=5
).points
...
您现在已经创建了一个用于神经搜索查询的类。现在将其包装成一个服务。
使用 FastAPI 部署搜索服务
要构建服务,您将使用 FastAPI 框架。
- 安装 FastAPI。
要安装它,请使用命令
pip install fastapi uvicorn
- 实现服务。
创建一个名为 service.py 的文件并指定以下内容。
该服务将只有一个 API 端点,如下所示
from fastapi import FastAPI
# The file where HybridSearcher is stored
from hybrid_searcher import HybridSearcher
app = FastAPI()
# Create a neural searcher instance
hybrid_searcher = HybridSearcher(collection_name="startups")
@app.get("/api/search")
def search_startup(q: str):
return {"result": hybrid_searcher.search(text=q)}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
- 运行服务。
python service.py
- 在浏览器中打开 https://:8000/docs。
您应该能够看到服务的调试接口。

随意尝试,针对我们语料库中的公司进行查询,并查看结果。
加入我们的 Discord 社区,我们会在那里讨论向量搜索和相似度学习,并发布其他神经网络和神经搜索应用的示例。