HoneyHive

HoneyHive is an AI evaluation and observability platform for generative AI applications. The HoneyHive platform gives developers enterprise-grade tools to debug complex retrieval pipelines, evaluate performance across large test suites, monitor usage in real time, and manage prompts in a shared workspace. Teams use HoneyHive to iterate faster, detect failures at scale, and ship exceptional AI products.

By integrating Qdrant with HoneyHive, you can:

  • Trace vector database operations
  • Monitor latency, embedding quality, and context relevance
  • Evaluate retrieval performance in RAG pipelines
  • Optimize parameters such as chunk_size and chunk_overlap
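As a starting point for tuning those last two parameters, here is a minimal character-based chunker. This is an illustrative sketch, not part of the HoneyHive or Qdrant APIs; chunk_text and its defaults are hypothetical:

```python
def chunk_text(text: str, chunk_size: int = 200, chunk_overlap: int = 50) -> list:
    """Split text into overlapping character chunks.

    chunk_size controls the window length; chunk_overlap controls how many
    characters consecutive chunks share. These are the knobs you would sweep
    while watching retrieval metrics in HoneyHive.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunk = text[start:start + chunk_size]
        if chunk:
            chunks.append(chunk)
        if start + chunk_size >= len(text):
            break
    return chunks
```

You could then embed and insert each chunk instead of each whole document, re-running your evaluation after each parameter change to see which configuration retrieves best.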

Prerequisites

  • A HoneyHive account and API key
  • Python 3.8+

Installation

Install the required packages:

pip install qdrant-client openai honeyhive

Basic Integration Example

The following example demonstrates a complete RAG pipeline with HoneyHive tracing for Qdrant operations. We'll break down each component step by step.

Initialize Clients and Setup

First, set up the necessary clients and configuration for HoneyHive, OpenAI, and Qdrant:

from qdrant_client import QdrantClient
from qdrant_client.http.models import PointStruct, VectorParams, Distance
import openai
import os
from honeyhive.tracer import HoneyHiveTracer
from honeyhive.tracer.custom import trace
from openai import OpenAI

# Set API Keys
openai.api_key = os.getenv("OPENAI_API_KEY")
honeyhive_api_key = os.getenv("HONEYHIVE_API_KEY")

# Initialize HoneyHive Tracer
HoneyHiveTracer.init(
    api_key=honeyhive_api_key,
    project="qdrant-rag-example",
    session_name="qdrant-integration-demo"
)

# Initialize OpenAI client
openai_client = OpenAI(api_key=openai.api_key)

Connect to Qdrant

You can connect to Qdrant in two ways: self-hosted (local) or cloud-hosted (Qdrant Cloud).

Option 1: Self-Hosted Qdrant (Local)

To run Qdrant locally, you need Docker installed; then run the following commands:

docker pull qdrant/qdrant
docker run -p 6333:6333 -p 6334:6334 -v "$(pwd)/qdrant_storage:/qdrant/storage" qdrant/qdrant

Then connect to the local Qdrant instance:

# Connect to local Qdrant
client = QdrantClient(url="http://localhost:6333")
print("Connected to local Qdrant instance")

Option 2: Qdrant Cloud

For Qdrant Cloud, you need your cluster host and an API key:

# Qdrant Cloud configuration
QDRANT_HOST = os.getenv("QDRANT_HOST")  # e.g., "your-cluster-id.eu-central.aws.cloud.qdrant.io"
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")

# Connect to Qdrant Cloud
client = QdrantClient(url=f"https://{QDRANT_HOST}", api_key=QDRANT_API_KEY)
print("Connected to Qdrant Cloud")

Create a Collection

Create a collection to store document embeddings:

collection_name = "documents"
vector_size = 1536  # For text-embedding-3-small
vector_distance = Distance.COSINE

# Create collection if it doesn't exist
if not client.collection_exists(collection_name):
    client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=vector_size, distance=vector_distance)
    )

Define an Embedding Function with Tracing

Create a function that generates embeddings with HoneyHive tracing:

@trace()
def embed_text(text: str) -> list:
    """Generate embeddings for a text using OpenAI's API."""
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding
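
Since the integration benefits above include monitoring embedding quality, a small cosine-similarity helper can sanity-check that related texts embed close together. This helper is illustrative and not part of any of these SDKs:

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Cosine similarity between two embedding vectors (1.0 = identical direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)
```

For example, cosine_similarity(embed_text("vector database"), embed_text("Qdrant")) should score noticeably higher than a pair of unrelated texts.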

Insert Documents with Tracing

Create a traced function that inserts documents into Qdrant:

@trace()
def insert_documents(docs):
    """Insert documents into Qdrant collection."""
    points = []
    for idx, doc in enumerate(docs):
        vector = embed_text(doc)
        points.append(PointStruct(
            id=idx + 1,
            vector=vector,
            payload={"text": doc}
        ))
    
    client.upsert(
        collection_name=collection_name,
        points=points
    )
    return len(points)

# Sample documents
documents = [
    "Qdrant is a vector database optimized for storing and searching high-dimensional vectors.",
    "HoneyHive provides observability for AI applications, including RAG pipelines.",
    "Retrieval-Augmented Generation (RAG) combines retrieval systems with generative models.",
    "Vector databases like Qdrant are essential for efficient similarity search in RAG systems.",
    "OpenAI's embedding models convert text into high-dimensional vectors for semantic search."
]

# Insert documents
num_inserted = insert_documents(documents)

Retrieve Documents with Tracing

Create a traced function that retrieves relevant documents from Qdrant:

@trace()
def get_relevant_docs(query: str, top_k: int = 3) -> list:
    """Retrieve relevant documents for a query."""
    # Embed the query
    q_vector = embed_text(query)
    
    # Search in Qdrant
    search_response = client.query_points(
        collection_name=collection_name,
        query=q_vector,
        limit=top_k,
        with_payload=True
    )
    
    # Extract results
    docs = []
    for point in search_response.points:
        docs.append({
            "id": point.id,
            "text": point.payload.get("text"),
            "score": point.score
        })
    
    return docs
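
To evaluate retrieval performance over a set of test queries, a minimal hit-rate metric can be computed from the IDs returned by get_relevant_docs. The helper below is a sketch (hit_rate is a hypothetical name, not a HoneyHive API); it checks how often the expected document ID appears in the top-k results:

```python
def hit_rate(retrieved_ids_per_query: list, expected_ids: list) -> float:
    """Fraction of queries whose expected document ID appears in the retrieved IDs.

    retrieved_ids_per_query: one list of point IDs per query (e.g. the "id"
    fields returned by get_relevant_docs).
    expected_ids: the ground-truth relevant point ID for each query.
    """
    if not expected_ids:
        return 0.0
    hits = sum(
        1 for retrieved, expected in zip(retrieved_ids_per_query, expected_ids)
        if expected in retrieved
    )
    return hits / len(expected_ids)
```

Usage might look like: hit_rate([[doc["id"] for doc in get_relevant_docs(q)] for q in queries], expected_ids). Because get_relevant_docs is traced, each evaluation run also produces traces you can inspect in HoneyHive.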

Generate Responses with Tracing

Create a traced function that generates responses using OpenAI:

@trace()
def answer_query(query: str, relevant_docs: list) -> str:
    """Generate an answer for a query using retrieved documents."""
    if not relevant_docs:
        return "Could not retrieve relevant documents to answer the query."

    # Format context from retrieved documents
    context_parts = []
    for i, doc in enumerate(relevant_docs):
        context_parts.append(f"Document {i+1} (ID: {doc['id']}, Score: {doc['score']:.4f}):\n{doc['text']}")
    context = "\n\n".join(context_parts)

    # Create prompt
    prompt = f"""Answer the question based ONLY on the following context:

Context:
{context}

Question: {query}

Answer:"""

    # Generate answer
    completion = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant that answers questions based strictly on the provided context. If the answer is not in the context, say so clearly."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.2
    )

    return completion.choices[0].message.content.strip()

Complete RAG Pipeline

Create a traced function that runs the complete RAG pipeline:

@trace()
def rag_pipeline(query: str) -> dict:
    """End-to-end RAG pipeline."""
    # Get relevant documents
    relevant_docs = get_relevant_docs(query)
    
    # Generate answer
    answer = answer_query(query, relevant_docs)
    
    return {
        "query": query,
        "answer": answer,
        "retrieved_documents": relevant_docs
    }

Batch Processing

For large document sets, you can insert in batches to improve performance:

@trace()
def batch_insert_documents(documents_to_insert, batch_size=10, start_id_offset=0):
    """Insert documents in batches."""
    total_inserted = 0
    
    for i in range(0, len(documents_to_insert), batch_size):
        batch_docs = documents_to_insert[i:i+batch_size]
        points = []
        
        for local_idx, doc in enumerate(batch_docs):
            relative_idx = i + local_idx
            vector = embed_text(doc)
            point_id = relative_idx + start_id_offset + 1
            points.append(PointStruct(
                id=point_id,
                vector=vector,
                payload={"text": doc}
            ))
        
        if points:
            client.upsert(
                collection_name=collection_name,
                points=points
            )
            total_inserted += len(points)
    
    return total_inserted

Test the RAG Pipeline

Here's how to test the complete RAG pipeline:

# Test query
test_query = "What is Qdrant used for?"
result = rag_pipeline(test_query)

print(f"Query: {result['query']}")
print(f"Answer: {result['answer']}")
print("\nRetrieved Documents:")
for i, doc in enumerate(result['retrieved_documents']):
    print(f"Document {i+1} (ID: {doc['id']}, Score: {doc['score']:.4f}): {doc['text']}")

View Traces in HoneyHive

After running the RAG pipeline against Qdrant, you can view the traces in the HoneyHive UI:

  1. Navigate to your project in the HoneyHive dashboard
  2. Click the "Traces" tab to see all traces from your RAG pipeline
  3. Click a specific trace to view details for each step of the pipeline
  4. Analyze the performance of vector operations, embeddings, and the retrieval process

With HoneyHive, you can easily monitor and optimize your Qdrant-based RAG pipeline and ensure it delivers the best results to your users.
