Apache Spark

Spark 是一种专为大数据处理和分析设计的分布式计算框架。Qdrant-Spark 连接器使 Qdrant 能够成为 Spark 中的存储目的地。

安装

要将连接器集成到您的 Spark 环境中,请从下面列出的来源之一获取 JAR 文件。

  • GitHub 发行版

包含所有必需依赖项的打包好的 jar 文件可以在此处找到。

  • 从源代码构建

要从源代码构建 jar,您需要安装 JDK@8Maven。满足要求后,在项目根目录中运行以下命令。

mvn package -DskipTests

JAR 文件默认会写入 target 目录。

  • Maven Central

在 Maven Central 此处找到该项目。

用法

创建支持 Qdrant 的 Spark 会话

from pyspark.sql import SparkSession

spark = SparkSession.builder.config(
        "spark.jars",
        "path/to/file/spark-VERSION.jar",  # Specify the path to the downloaded JAR file
    )
    .master("local[*]")
    .appName("qdrant")
    .getOrCreate()
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .config("spark.jars", "path/to/file/spark-VERSION.jar") // Specify the path to the downloaded JAR file
  .master("local[*]")
  .appName("qdrant")
  .getOrCreate()
import org.apache.spark.sql.SparkSession;

public class QdrantSparkJavaExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .config("spark.jars", "path/to/file/spark-VERSION.jar") // Specify the path to the downloaded JAR file
                .master("local[*]")
                .appName("qdrant")
                .getOrCreate(); 
    }
}

加载数据

在使用此连接器加载数据之前,必须预先创建一个具有适当向量维度和配置的集合。

该连接器支持摄取多个命名/未命名、密集/稀疏向量。

点击每个项展开。

未命名/默认向量
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", <QDRANT_GRPC_URL>)
   .option("collection_name", <QDRANT_COLLECTION_NAME>)
   .option("embedding_field", <EMBEDDING_FIELD_NAME>)  # Expected to be a field of type ArrayType(FloatType)
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
命名向量
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", <QDRANT_GRPC_URL>)
   .option("collection_name", <QDRANT_COLLECTION_NAME>)
   .option("embedding_field", <EMBEDDING_FIELD_NAME>)  # Expected to be a field of type ArrayType(FloatType)
   .option("vector_name", <VECTOR_NAME>)
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()

注意

保留 embedding_fieldvector_name 选项是为了向后兼容。建议使用 vector_fieldsvector_names 处理命名向量,如下所示。

多个命名向量
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
稀疏向量
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
多个稀疏向量
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
命名密集向量和稀疏向量的组合
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
多向量
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("multi_vector_fields", "<COLUMN_NAME>")
   .option("multi_vector_names", "<MULTI_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
多个多向量
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("multi_vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("multi_vector_names", "<MULTI_VECTOR_NAME>,<ANOTHER_MULTI_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
无向量 - 整个 dataframe 存储为 载荷
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()

Databricks

您可以在 Databricks 中将 qdrant-spark 连接器用作库。

  • 在您的 Databricks 集群仪表板中,前往 Libraries 部分。
  • 选择 Install New 以打开库安装模态框。
  • 在 Maven 包中搜索 io.qdrant:spark:VERSION 并点击 Install

Databricks

数据类型支持

根据提供的 schema,适当的 Spark 数据类型会被映射到 Qdrant 的 载荷。

选项和 Spark 类型

选项描述列数据类型必需
qdrant_urlQdrant 实例的 gRPC URL。例如:http://localhost:6334-
collection_name要写入数据的集合名称-
schemadataframe schema 的 JSON 字符串-
embedding_field包含 embedding 的列名(已弃用 - 请改用 vector_fieldsArrayType(FloatType)
id_field包含点 ID 的列名。默认:随机 UUIDStringTypeIntegerType
batch_size上传批次的最大大小。默认:64-
retries上传重试次数。默认:3-
api_key用于身份验证的 Qdrant API 密钥-
vector_name集合中向量的名称。-
vector_fields包含向量的列名,用逗号分隔。ArrayType(FloatType)
vector_names集合中向量的名称,用逗号分隔。-
sparse_vector_index_fields包含稀疏向量索引的列名,用逗号分隔。ArrayType(IntegerType)
sparse_vector_value_fields包含稀疏向量值的列名,用逗号分隔。ArrayType(FloatType)
sparse_vector_names集合中稀疏向量的名称,用逗号分隔。-
multi_vector_fields包含多向量值的列名,用逗号分隔。ArrayType(ArrayType(FloatType))
multi_vector_names集合中多向量的名称,用逗号分隔。-
shard_key_selector在 upsert 过程中使用的自定义分片键名称,用逗号分隔。-
wait等待每个批次 upsert 完成。 truefalse。默认为 true-

更多信息,请务必查看Qdrant-Spark GitHub 仓库。 Apache Spark 指南可在此处获取。祝您数据处理愉快!

此页面有用吗?

感谢您的反馈! 🙏

很抱歉听到这个消息。😔 您可以在 GitHub 上编辑此页面,或者创建一个 GitHub issue。