Apache Airflow

Apache Airflow 是一个用于创作、调度和监控数据和计算工作流的开源平台。Airflow 使用 Python 创建可以轻松调度和监控的工作流。

Qdrant 在 Airflow 中作为提供者(provider)可用,用于与数据库交互。

先决条件

在配置 Airflow 之前,你需要

  1. 一个要连接的 Qdrant 实例。你可以在我们的安装指南中进行设置。

  2. 一个正在运行的 Airflow 实例。你可以使用他们的快速入门指南

安装

你可以在 Airflow shell 中运行 pip install apache-airflow-providers-qdrant 来安装 Qdrant 提供者(provider)。

注意:你需要重启 Airflow 会话才能使提供者(provider)可用。

设置连接

打开 Airflow UI 的 Admin-> Connections 部分。点击创建链接创建一个新的 Qdrant 连接。

Qdrant connection

你还可以使用环境变量外部密钥后端设置连接。

Qdrant Hook

Airflow Hook 是对特定 API 的抽象,允许 Airflow 与外部系统交互。

from airflow.providers.qdrant.hooks.qdrant import QdrantHook

hook = QdrantHook(conn_id="qdrant_connection")

hook.verify_connection()

通过 QdrantHook 实例的 @property conn 可以获取 qdrant_client#QdrantClient 实例,以便在 Airflow 工作流中使用。

from qdrant_client import models

hook.conn.count("<COLLECTION_NAME>")

hook.conn.upsert(
    "<COLLECTION_NAME>",
    points=[
        models.PointStruct(id=32, vector=[0.32, 0.12, 0.123], payload={"color": "red"})
    ],
)

Qdrant 摄取操作符

Qdrant 提供者(provider)还提供了一个方便的操作符,用于将数据上传到 Qdrant collection,它内部使用了 Qdrant Hook。

from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator

vectors = [
    [0.11, 0.22, 0.33, 0.44],
    [0.55, 0.66, 0.77, 0.88],
    [0.88, 0.11, 0.12, 0.13],
]
ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"]
payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}]

QdrantIngestOperator(
    conn_id="qdrant_connection",
    task_id="qdrant_ingest",
    collection_name="<COLLECTION_NAME>",
    vectors=vectors,
    ids=ids,
    payload=payload,
)

参考

此页面有用吗?

感谢你的反馈!🙏

很抱歉得知此页面对你没有帮助。😔 你可以在 GitHub 上编辑此页面,或创建 GitHub issue。