Apache Airflow

Apache Airflow 是一个开源平台,用于编写、调度和监控数据和计算工作流。Airflow 使用 Python 创建工作流,这些工作流可以轻松地进行调度和监控。

Qdrant 可作为 Airflow 中的提供者,用于与数据库进行接口。

先决条件

在配置 Airflow 之前,您需要

  1. 一个 Qdrant 实例来连接。您可以在我们的安装指南中设置一个。

  2. 一个正在运行的 Airflow 实例。您可以使用他们的快速入门指南

安装

您可以通过在 Airflow shell 中运行 pip install apache-airflow-providers-qdrant 来安装 Qdrant 提供者。

注意:您必须重启 Airflow 会话才能使提供者可用。

设置连接

打开 Airflow UI 的 Admin-> Connections 部分。点击 Create 链接以创建新的Qdrant 连接

Qdrant connection

您还可以使用环境变量外部秘密后端设置连接。

Qdrant Hook

Airflow Hook 是一个特定 API 的抽象,允许 Airflow 与外部系统交互。

from airflow.providers.qdrant.hooks.qdrant import QdrantHook

hook = QdrantHook(conn_id="qdrant_connection")

hook.verify_connection()

一个 qdrant_client#QdrantClient 实例可通过 QdrantHook 实例的 @property conn 获得,用于您的 Airflow 工作流。

from qdrant_client import models

hook.conn.count("<COLLECTION_NAME>")

hook.conn.upsert(
    "<COLLECTION_NAME>",
    points=[
        models.PointStruct(id=32, vector=[0.32, 0.12, 0.123], payload={"color": "red"})
    ],
)

Qdrant 摄取操作符

Qdrant 提供者还提供了一个方便的操作符,用于将数据上传到 Qdrant 集合,该操作符内部使用 Qdrant hook。

from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator

vectors = [
    [0.11, 0.22, 0.33, 0.44],
    [0.55, 0.66, 0.77, 0.88],
    [0.88, 0.11, 0.12, 0.13],
]
ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"]
payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}]

QdrantIngestOperator(
    conn_id="qdrant_connection",
    task_id="qdrant_ingest",
    collection_name="<COLLECTION_NAME>",
    vectors=vectors,
    ids=ids,
    payload=payload,
)

参考

此页面有用吗?

感谢您的反馈!🙏

很抱歉听到这个消息。😔 您可以在 GitHub 上编辑此页面,或创建一个 GitHub issue。