使用 Qdrant 的 FastEmbed 进行混合搜索

耗时:20 分钟级别:新手输出:GitHub

本教程将指导您如何构建并部署自己的混合搜索服务,以便在 startups-list.com 的公司描述中进行搜索,并挑选出与您查询内容最相似的结果。该网站包含每个条目的公司名称、描述、位置和图片。

正如我们已在博客中所述,混合搜索并没有单一的定义。在本教程中,我们将涵盖结合密集向量(dense)和稀疏向量(sparse embeddings)的情况。前者是指由 BERT 等知名神经网络生成的嵌入向量,而后者则更多地与传统的全文搜索方法相关。

我们的混合搜索服务将使用 Fastembed 库来生成文本描述的嵌入向量,并使用 FastAPI 来提供搜索 API。Fastembed 与 Qdrant 客户端原生集成,因此您可以轻松地将数据上传到 Qdrant 并执行搜索查询。

Hybrid Search Schema

工作流程

要创建混合搜索服务,您需要转换原始数据,然后创建一个搜索函数来操作它。首先,您将 1) 使用修改版的 BERT 机器学习模型下载并准备示例数据集。然后,您将 2) 把数据加载到 Qdrant 中,3) 创建一个混合搜索 API,并 4) 使用 FastAPI 进行部署。

Hybrid Search Workflow

先决条件

要完成本教程,您需要:

  • Docker - 使用 Qdrant 最简单的方法是运行预构建的 Docker 镜像。
  • 来自 startups-list.com 的原始解析数据
  • Python 版本 >= 3.9

准备示例数据集

要在创业公司描述上进行混合搜索,必须首先将描述数据编码为向量。Qdrant 客户端集成的 Fastembed 将编码和上传合并为一个步骤。

它还能自动处理批处理和并行化,因此您无需为此操心。

让我们从下载数据和安装必要的包开始。

  1. 首先,您需要下载数据集。
wget https://storage.googleapis.com/generall-shared-data/startups_demo.json

在 Docker 中运行 Qdrant

接下来,您需要使用向量引擎来管理所有数据。Qdrant 允许您存储、更新或删除已创建的向量。最重要的是,它允许您通过方便的 API 搜索最近的向量。

注意:在开始之前,请创建一个项目目录并在其中建立一个虚拟 Python 环境。

  1. 从 DockerHub 下载 Qdrant 镜像。
docker pull qdrant/qdrant
  1. 在 Docker 中启动 Qdrant。
docker run -p 6333:6333 \
    -v $(pwd)/qdrant_storage:/qdrant/storage \
    qdrant/qdrant

您应该看到如下输出

...
[2021-02-05T00:08:51Z INFO  actix_server::builder] Starting 12 workers
[2021-02-05T00:08:51Z INFO  actix_server::builder] Starting "actix-web-service-0.0.0.0:6333" service on 0.0.0.0:6333

访问 https://:6333/ 测试服务。您应该能在浏览器中看到 Qdrant 的版本信息。

所有上传到 Qdrant 的数据都保存在 ./qdrant_storage 目录中,即使您重新创建容器,这些数据也会被持久化。

将数据上传到Qdrant

  1. 安装官方 Python 客户端以便更好地与 Qdrant 交互。
pip install "qdrant-client[fastembed]>=1.14.2"

注意:本教程需要 fastembed 版本 >= 0.6.1。

此时,您应该在 startups_demo.json 文件中拥有创业公司记录,并且 Qdrant 正在本地机器上运行。

现在您需要编写一个脚本,将所有的创业公司数据和向量上传到搜索引擎中。

  1. 为 Qdrant 创建一个客户端对象。
# Import client library
from qdrant_client import QdrantClient, models

client = QdrantClient(url="https://:6333")
  1. 选择用于编码数据的模型并准备集合。

在本教程中,我们将分别使用两个预训练模型来计算密集向量和稀疏向量。模型为:sentence-transformers/all-MiniLM-L6-v2prithivida/Splade_PP_en_v1。选定模型后,我们需要在 Qdrant 中配置一个集合。

dense_vector_name = "dense"
sparse_vector_name = "sparse"
dense_model_name = "sentence-transformers/all-MiniLM-L6-v2"
sparse_model_name = "prithivida/Splade_PP_en_v1"
if not client.collection_exists("startups"):
    client.create_collection(
        collection_name="startups",
        vectors_config={
            dense_vector_name: models.VectorParams(
                size=client.get_embedding_size(dense_model_name), 
                distance=models.Distance.COSINE
            )
        },  # size and distance are model dependent
        sparse_vectors_config={sparse_vector_name: models.SparseVectorParams()},
    )

Qdrant 要求向量拥有各自的名称和配置。参数 sizedistance 是强制性的,不过您还可以为向量指定扩展配置,例如 quantization_confighnsw_config

  1. 从文件中读取数据。
import json

payload_path = "startups_demo.json"
documents = []
metadata = []

with open(payload_path) as fd:
    for line in fd:
        obj = json.loads(line)
        description = obj["description"]
        dense_document = models.Document(text=description, model=dense_model_name)
        sparse_document = models.Document(text=description, model=sparse_model_name)
        documents.append(
            {
                dense_vector_name: dense_document,
                sparse_vector_name: sparse_document,
            }
        )
        metadata.append(obj)

在此代码块中,我们从 startups_demo.json 文件读取数据并将其拆分为两个列表:documentsmetadata。文档是带有创业公司描述的模型以及用于嵌入数据的模型名称。元数据是与每家创业公司关联的有效载荷,例如名称、地点和图片。我们将使用 documents 将数据编码为向量。

  1. 编码并上传数据。
    client.upload_collection(
        collection_name="startups",
        vectors=tqdm.tqdm(documents),
        payload=metadata,
        parallel=4,  # Use 4 CPU cores to encode data.
        # This will spawn a model per process, which might be memory expensive
        # Make sure that your system does not use swap, and reduce the amount
        # # of processes if it does. 
        # Otherwise, it might significantly slow down the process.
        # Requires wrapping code into if __name__ == '__main__' block
    )
上传已处理数据

此处下载并解压处理好的数据,或使用以下脚本

wget https://storage.googleapis.com/dataset-startup-search/startup-list-com/startups_hybrid_search_processed_40k.tar.gz
tar -xvf startups_hybrid_search_processed_40k.tar.gz

然后您可以将数据上传到 Qdrant。

import json
import numpy as np


def named_vectors(
        vectors: list[float], 
        sparse_vectors: list[models.SparseVector]
) -> dict:
    for vector, sparse_vector in zip(vectors, sparse_vectors):
        yield {
            dense_vector_name: vector,
            sparse_vector_name: models.SparseVector(**sparse_vector),
        }


with open("dense_vectors.npy", "rb") as f:
    vectors = np.load(f)
with open("sparse_vectors.json", "r") as f:
    sparse_vectors = json.load(f)

with open("payload.json", "r") as f:
    payload = json.load(f)

client.upload_collection(
    "startups", 
    vectors=named_vectors(vectors, sparse_vectors), 
    payload=payload
)

upload_collection 方法将对所有文档进行编码并将其上传到 Qdrant。

parallel 参数启用数据并行处理,而不是使用内置的 ONNX 并行处理。

此外,如果您想在以后使用 ID 来更新或删除文档,可以为每个文档指定 ID。如果您不指定 ID,它们将自动生成。

您可以通过将 tqdm 进度条传递给 upload_collection 方法来监控编码进度。

from tqdm import tqdm

client.upload_collection(
    collection_name="startups",
    vectors=documents,
    payload=metadata,
    ids=tqdm(range(len(documents))),
)

构建搜索 API

现在所有准备工作都已完成,让我们开始构建神经网络搜索类。

为了处理传入的请求,混合搜索类需要 3 件事:1) 将查询转换为向量的模型,2) 执行搜索查询的 Qdrant 客户端,3) 用于对密集和稀疏搜索结果进行重排序的融合函数。

Qdrant 支持 2 种用于组合结果的融合函数:倒数排名融合 (Reciprocal Rank Fusion)基于分布的得分融合 (Distribution Based Score Fusion)

  1. 创建一个名为 hybrid_searcher.py 的文件并指定以下内容。
from qdrant_client import QdrantClient, models


class HybridSearcher:
    DENSE_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
    SPARSE_MODEL = "prithivida/Splade_PP_en_v1"
    
    def __init__(self, collection_name):
        self.collection_name = collection_name
        self.qdrant_client = QdrantClient()
  1. 编写搜索函数。
def search(self, text: str):
    search_result = self.qdrant_client.query_points(
        collection_name=self.collection_name,
        query=models.FusionQuery(
            fusion=models.Fusion.RRF  # we are using reciprocal rank fusion here
        ),
        prefetch=[
            models.Prefetch(
                query=models.Document(text=text, model=self.DENSE_MODEL),
                using=dense_vector_name,
            ),
            models.Prefetch(
                query=models.Document(text=text, model=self.SPARSE_MODEL),
                using=sparse_vector_name,
            ),
        ],
        query_filter=None,  # If you don't want any filters for now
        limit=5,  # 5 the closest results
    ).points
    # `search_result` contains models.QueryResponse structure
    # We can access list of scored points with the corresponding similarity scores,
    # vectors (if `with_vectors` was set to `True`), and payload via `points` attribute.

    # Select and return metadata
    metadata = [point.payload for point in search_result]
    return metadata
  1. 添加搜索过滤器。

使用 Qdrant,还可以向搜索添加一些条件。例如,如果您想在特定城市搜索创业公司,搜索查询可能如下所示

    ...

    city_of_interest = "Berlin"

    # Define a filter for cities
    city_filter = models.Filter(
        must=[
            models.FieldCondition(
                key="city", 
                match=models.MatchValue(value=city_of_interest)
            )
        ]
    )

    # NOTE: it is not a hybrid search! It's just a dense query for simplicity
    search_result = self.qdrant_client.query_points(
        collection_name=self.collection_name,
        query=models.Document(text=text, model=self.DENSE_MODEL),
        query_filter=city_filter,
        limit=5
    ).points
    ...

您现在已经创建了一个用于神经搜索查询的类。现在将其包装成一个服务。

使用 FastAPI 部署搜索服务

要构建服务,您将使用 FastAPI 框架。

  1. 安装 FastAPI。

要安装它,请使用命令

pip install fastapi uvicorn
  1. 实现服务。

创建一个名为 service.py 的文件并指定以下内容。

该服务将只有一个 API 端点,如下所示

from fastapi import FastAPI

# The file where HybridSearcher is stored
from hybrid_searcher import HybridSearcher

app = FastAPI()

# Create a neural searcher instance
hybrid_searcher = HybridSearcher(collection_name="startups")


@app.get("/api/search")
def search_startup(q: str):
    return {"result": hybrid_searcher.search(text=q)}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
  1. 运行服务。
python service.py
  1. 在浏览器中打开 https://:8000/docs

您应该能够看到服务的调试接口。

FastAPI Swagger interface

随意尝试,针对我们语料库中的公司进行查询,并查看结果。

加入我们的 Discord 社区,我们会在那里讨论向量搜索和相似度学习,并发布其他神经网络和神经搜索应用的示例。

此页面有用吗?

感谢您的反馈!🙏

听到这个消息我们很遗憾。😔 您可以在 GitHub 上编辑此页面,或创建一个 GitHub Issue。