Databricks 上的 Qdrant

时长:30 分钟难度:中等完整 Notebook

Databricks 是一个用于处理大数据和 AI 的统一分析平台。它基于 Apache Spark 构建,后者是一个强大的开源分布式计算系统,非常适合处理大规模数据集和执行复杂的分析任务。

Apache Spark 设计用于横向扩展,这意味着它可以通过在机器集群上分配计算来处理生成向量嵌入等耗时操作。这种可扩展性在处理大型数据集时至关重要。

在本例中,我们将演示如何使用 Qdrant 的 FastEmbed 库对数据集生成密集和稀疏嵌入。然后,我们将使用 Databricks 上的 Qdrant Spark 连接器 将这些向量化数据加载到 Qdrant 集群中。

设置 Databricks 项目

  • 按照官方文档指南设置一个Databricks 集群

  • 安装 Qdrant Spark 连接器 作为库

    • 在集群仪表盘中导航到Libraries(库)部分。

    • 点击右上角的Install New(安装新的)打开库安装模态框。

    • 在 Maven 包中搜索io.qdrant:spark:VERSION并点击Install(安装)。

      Install the library

  • 在您的集群上创建一个新的Databricks notebook,开始使用您的数据和库。

下载数据集

  • 安装所需的依赖项
%pip install fastembed datasets
  • 下载数据集
from datasets import load_dataset

dataset_name = "tasksource/med"
dataset = load_dataset(dataset_name, split="train")
# We'll use the first 100 entries from this dataset and exclude some unused columns.
dataset = dataset.select(range(100)).remove_columns(["gold_label", "genre"])
  • 将数据集转换为 Spark 数据框
dataset.to_parquet("/dbfs/pq.pq")
dataset_df = spark.read.parquet("file:/dbfs/pq.pq")

向量化数据

在本节中,我们将使用 FastEmbed 为我们的行生成密集和稀疏向量。我们将创建一个用户自定义函数 (UDF) 来处理此步骤。

创建向量化函数

from fastembed import TextEmbedding, SparseTextEmbedding

def vectorize(partition_data):
    # Initialize dense and sparse models
    dense_model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5")
    sparse_model = SparseTextEmbedding(model_name="Qdrant/bm25")

    for row in partition_data:
        # Generate dense and sparse vectors
        dense_vector = next(dense_model.embed(row.sentence1))
        sparse_vector = next(sparse_model.embed(row.sentence2))

        yield [
            row.sentence1,  # 1st column: original text
            row.sentence2,  # 2nd column: original text
            dense_vector.tolist(),  # 3rd column: dense vector
            sparse_vector.indices.tolist(),  # 4th column: sparse vector indices
            sparse_vector.values.tolist(),  # 5th column: sparse vector values
        ]

我们使用 BAAI/bge-small-en-v1.5 模型生成密集嵌入,使用 BM25 生成稀疏嵌入。

在数据框上应用 UDF

接下来,将我们的vectorize UDF 应用到 Spark 数据框上以生成嵌入。

embeddings = dataset_df.rdd.mapPartitions(vectorize)

mapPartitions() 方法返回一个 弹性分布式数据集 (RDD),然后应将其转换回 Spark 数据框。

使用向量化数据构建新的 Spark 数据框

现在,我们将使用指定的 schema 构建一个新的 Spark 数据框 (embeddings_df),其中包含向量化数据。

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType

# Define the schema for the new dataframe
schema = StructType([
    StructField("sentence1", StringType()),
    StructField("sentence2", StringType()),
    StructField("dense_vector", ArrayType(FloatType())),
    StructField("sparse_vector_indices", ArrayType(IntegerType())),
    StructField("sparse_vector_values", ArrayType(FloatType()))
])

# Create the new dataframe with the vectorized data
embeddings_df = spark.createDataFrame(data=embeddings, schema=schema)

将数据上传到 Qdrant

  • 创建 Qdrant 集合

    • 遵循文档,使用适当的配置创建集合。以下是一个支持密集和稀疏向量的示例请求:
    PUT /collections/{collection_name}
    {
      "vectors": {
        "dense": {
          "size": 384,
          "distance": "Cosine"
        }
      },
      "sparse_vectors": {
        "sparse": {}
      }
    }
    
  • 将数据框上传到 Qdrant

options = {
    "qdrant_url": "<QDRANT_GRPC_URL>",
    "api_key": "<QDRANT_API_KEY>",
    "collection_name": "<QDRANT_COLLECTION_NAME>",
    "vector_fields": "dense_vector",
    "vector_names": "dense",
    "sparse_vector_value_fields": "sparse_vector_values",
    "sparse_vector_index_fields": "sparse_vector_indices",
    "sparse_vector_names": "sparse",
    "schema": embeddings_df.schema.json(),
}

embeddings_df.write.format("io.qdrant.spark.Qdrant").options(**options).mode(
    "append"
).save()

请务必将占位符值 (<QDRANT_GRPC_URL>, <QDRANT_API_KEY>, <QDRANT_COLLECTION_NAME>) 替换为您实际的值。如果未指定 id_field 选项,Qdrant Spark 连接器将为每个点生成随机 UUID。

您应看到的命令输出类似于:

Command took 40.37 seconds -- by xxxxx90@xxxxxx.com at 4/17/2024, 12:13:28 PM on fastembed

结论

我们的教程到此结束!欢迎您探索 Databricks、Spark 和 Qdrant 中提供的更多功能,并尝试不同的模型、参数和特性。

数据工程愉快!

此页面是否有用?

感谢您的反馈! 🙏

很抱歉听到这个消息。😔 您可以在 GitHub 上编辑此页面,或创建 GitHub Issue。