Apache Airflow
Apache Airflow 是一个开源平台,用于编写、调度和监控数据和计算工作流。Airflow 使用 Python 创建工作流,这些工作流可以轻松地进行调度和监控。
Qdrant 可作为 Airflow 中的提供者,用于与数据库进行接口。
先决条件
在配置 Airflow 之前,您需要
安装
您可以通过在 Airflow shell 中运行 pip install apache-airflow-providers-qdrant 来安装 Qdrant 提供者。
注意:您必须重启 Airflow 会话才能使提供者可用。
设置连接
打开 Airflow UI 的 Admin-> Connections 部分。点击 Create 链接以创建新的Qdrant 连接。

Qdrant Hook
Airflow Hook 是一个特定 API 的抽象,允许 Airflow 与外部系统交互。
from airflow.providers.qdrant.hooks.qdrant import QdrantHook
hook = QdrantHook(conn_id="qdrant_connection")
hook.verify_connection()
一个 qdrant_client#QdrantClient 实例可通过 QdrantHook 实例的 @property conn 获得,用于您的 Airflow 工作流。
from qdrant_client import models
hook.conn.count("<COLLECTION_NAME>")
hook.conn.upsert(
"<COLLECTION_NAME>",
points=[
models.PointStruct(id=32, vector=[0.32, 0.12, 0.123], payload={"color": "red"})
],
)
Qdrant 摄取操作符
Qdrant 提供者还提供了一个方便的操作符,用于将数据上传到 Qdrant 集合,该操作符内部使用 Qdrant hook。
from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator
vectors = [
[0.11, 0.22, 0.33, 0.44],
[0.55, 0.66, 0.77, 0.88],
[0.88, 0.11, 0.12, 0.13],
]
ids = [32, 21, "b626f6a9-b14d-4af9-b7c3-43d8deb719a6"]
payload = [{"meta": "data"}, {"meta": "data_2"}, {"meta": "data_3", "extra": "data"}]
QdrantIngestOperator(
conn_id="qdrant_connection",
task_id="qdrant_ingest",
collection_name="<COLLECTION_NAME>",
vectors=vectors,
ids=ids,
payload=payload,
)