使用 Airflow 和 Astronomer 进行语义查询

时间:45 分钟级别:中级

在本教程中,您将使用 Qdrant 作为 提供程序Apache Airflow 中,Apache Airflow 是一个允许您设置数据工程工作流的开源工具。

您将使用 Python 将管道编写为 DAG(有向无环图)。通过这种方式,您可以利用 Python 强大的功能和库套件,实现数据管道所需的几乎任何功能。

Astronomer 是一个托管平台,通过其易于使用的 CLI 和强大的自动化功能,简化了 Airflow 项目的开发和部署过程。

当基于数据事件在 Qdrant 中运行操作或构建用于生成向量嵌入的并行任务时,Airflow 非常有用。通过使用 Airflow,您可以为管道设置监控和警报,以实现完全的可观测性。

前提条件

请确保您已准备好以下内容

实现

我们将构建一个 DAG,用于并行生成数据语料库的嵌入,并根据用户输入执行语义检索。

设置项目

Astronomer CLI 使设置 Airflow 项目变得非常简单。

mkdir qdrant-airflow-tutorial && cd qdrant-airflow-tutorial
astro dev init

此命令会生成您在本地运行 Airflow 所需的所有项目文件。您可以找到一个名为 dags 的目录,这是我们可以放置 Python DAG 文件的地方。

要在 Airflow 中使用 Qdrant,请通过在 requirements.txt 文件中添加以下内容来安装 Qdrant Airflow provider

apache-airflow-providers-qdrant

配置凭据

我们可以使用 Airflow UI、环境变量或 airflow_settings.yml 文件设置 provider 连接。

将以下内容添加到项目中的 .env 文件。根据您的凭据替换值。

HUGGINGFACE_TOKEN="<YOUR_HUGGINGFACE_ACCESS_TOKEN>"
AIRFLOW_CONN_QDRANT_DEFAULT='{
    "conn_type": "qdrant",
    "host": "xyz-example.eu-central.aws.cloud.qdrant.io:6333",
    "password": "<YOUR_QDRANT_API_KEY>"
}'

添加数据语料库

让我们添加一些示例数据进行操作。将以下内容粘贴到 include 目录下的一个名为 books.txt 的文件中。

1 | To Kill a Mockingbird (1960) | fiction | Harper Lee's Pulitzer Prize-winning novel explores racial injustice and moral growth through the eyes of young Scout Finch in the Deep South.
2 | Harry Potter and the Sorcerer's Stone (1997) | fantasy | J.K. Rowling's magical tale follows Harry Potter as he discovers his wizarding heritage and attends Hogwarts School of Witchcraft and Wizardry.
3 | The Great Gatsby (1925) | fiction | F. Scott Fitzgerald's classic novel delves into the glitz, glamour, and moral decay of the Jazz Age through the eyes of narrator Nick Carraway and his enigmatic neighbour, Jay Gatsby.
4 | 1984 (1949) | dystopian | George Orwell's dystopian masterpiece paints a chilling picture of a totalitarian society where individuality is suppressed and the truth is manipulated by a powerful regime.
5 | The Catcher in the Rye (1951) | fiction | J.D. Salinger's iconic novel follows disillusioned teenager Holden Caulfield as he navigates the complexities of adulthood and society's expectations in post-World War II America.
6 | Pride and Prejudice (1813) | romance | Jane Austen's beloved novel revolves around the lively and independent Elizabeth Bennet as she navigates love, class, and societal expectations in Regency-era England.
7 | The Hobbit (1937) | fantasy | J.R.R. Tolkien's adventure follows Bilbo Baggins, a hobbit who embarks on a quest with a group of dwarves to reclaim their homeland from the dragon Smaug.
8 | The Lord of the Rings (1954-1955) | fantasy | J.R.R. Tolkien's epic fantasy trilogy follows the journey of Frodo Baggins to destroy the One Ring and defeat the Dark Lord Sauron in the land of Middle-earth.
9 | The Alchemist (1988) | fiction | Paulo Coelho's philosophical novel follows Santiago, an Andalusian shepherd boy, on a journey of self-discovery and spiritual awakening as he searches for a hidden treasure.
10 | The Da Vinci Code (2003) | mystery/thriller | Dan Brown's gripping thriller follows symbologist Robert Langdon as he unravels clues hidden in art and history while trying to solve a murder mystery with far-reaching implications.

现在,是编码部分 - 编写我们的 Airflow DAG!

编写 DAG

我们将把以下内容添加到 dags 目录下的一个名为 books_recommend.py 的文件中。让我们来看看每个任务的作用。

import os
import requests

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.providers.qdrant.hooks.qdrant import QdrantHook
from airflow.providers.qdrant.operators.qdrant import QdrantIngestOperator
from pendulum import datetime
from qdrant_client import models


QDRANT_CONNECTION_ID = "qdrant_default"
DATA_FILE_PATH = "include/books.txt"
COLLECTION_NAME = "airflow_tutorial_collection"

EMBEDDING_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_DIMENSION = 384
SIMILARITY_METRIC = models.Distance.COSINE


def embed(text: str) -> list:
    HUGGINFACE_URL = f"https://api-inference.huggingface.co/pipeline/feature-extraction/{EMBEDDING_MODEL_ID}"
    response = requests.post(
        HUGGINFACE_URL,
        headers={"Authorization": f"Bearer {os.getenv('HUGGINGFACE_TOKEN')}"},
        json={"inputs": [text], "options": {"wait_for_model": True}},
    )
    return response.json()[0]


@dag(
    dag_id="books_recommend",
    start_date=datetime(2023, 10, 18),
    schedule=None,
    catchup=False,
    params={"preference": Param("Something suspenseful and thrilling.", type="string")},
)
def recommend_book():
    @task
    def import_books(text_file_path: str) -> list:
        data = []
        with open(text_file_path, "r") as f:
            for line in f:
                _, title, genre, description = line.split("|")
                data.append(
                    {
                        "title": title.strip(),
                        "genre": genre.strip(),
                        "description": description.strip(),
                    }
                )

        return data

    @task
    def init_collection():
        hook = QdrantHook(conn_id=QDRANT_CONNECTION_ID)
        if not  hook.conn..collection_exists(COLLECTION_NAME):
            hook.conn.create_collection(
                COLLECTION_NAME,
                vectors_config=models.VectorParams(
                    size=EMBEDDING_DIMENSION, distance=SIMILARITY_METRIC
                ),
            )

    @task
    def embed_description(data: dict) -> list:
        return embed(data["description"])

    books = import_books(text_file_path=DATA_FILE_PATH)
    embeddings = embed_description.expand(data=books)

    qdrant_vector_ingest = QdrantIngestOperator(
        conn_id=QDRANT_CONNECTION_ID,
        task_id="qdrant_vector_ingest",
        collection_name=COLLECTION_NAME,
        payload=books,
        vectors=embeddings,
    )

    @task
    def embed_preference(**context) -> list:
        user_mood = context["params"]["preference"]
        response = embed(text=user_mood)

        return response

    @task
    def search_qdrant(
        preference_embedding: list,
    ) -> None:
        hook = QdrantHook(conn_id=QDRANT_CONNECTION_ID)

        result = hook.conn.query_points(
            collection_name=COLLECTION_NAME,
            query=preference_embedding,
            limit=1,
            with_payload=True,
        ).points

        print("Book recommendation: " + result[0].payload["title"])
        print("Description: " + result[0].payload["description"])

    chain(
        init_collection(),
        qdrant_vector_ingest,
        search_qdrant(embed_preference()),
    )


recommend_book()

import_books:此任务读取一个包含书籍信息(如标题、体裁和描述)的文本文件,然后将数据作为字典列表返回。

init_collection:此任务在 Qdrant 数据库中初始化一个 collection,我们将在此存储书籍描述的向量表示。

embed_description:这是一个动态任务,为列表中的每本书创建一个 mapped task 实例。该任务使用 embed 函数为每个描述生成向量嵌入。要使用不同的嵌入模型,您可以调整 EMBEDDING_MODEL_IDEMBEDDING_DIMENSION 的值。

embed_user_preference:在此,我们接收用户输入,并使用与书籍描述相同的预训练模型将其转换为向量。

qdrant_vector_ingest:此任务使用 QdrantIngestOperator 将书籍数据摄取到 Qdrant collection 中,将每本书籍描述与其对应的向量嵌入关联起来。

search_qdrant:最后,此任务使用向量化的用户偏好在 Qdrant 数据库中执行搜索。它根据向量相似性找到 collection 中最相关的书籍。

运行 DAG

前往您的终端并运行 astro dev start

一个本地 Airflow 容器应该会启动。您现在可以通过 http://localhost:8080 访问 Airflow UI。点击 books_recommend 来访问我们的 DAG。

DAG

点击右侧的 PLAY 按钮运行 DAG。您将被要求输入您的偏好,默认值已预先填写。

Preference

在您的 DAG 运行完成后,您应该能够在 search_qdrant 任务的日志中看到搜索结果。

Output

就这样,一个与 Qdrant 集成的 Airflow 管道就完成了!请随意尝试和探索 Airflow。下面有一些可能会派上用场的参考资料。

延伸阅读

此页面是否有用?

感谢您的反馈!🙏

很抱歉听到这个消息。😔 您可以在 GitHub 上编辑此页面,或创建一个 GitHub issue。