HoneyHive

HoneyHive is an AI evaluation and observability platform for generative AI applications. It gives developers enterprise-grade tools to debug complex retrieval pipelines, evaluate performance across large test suites, monitor usage in real time, and manage prompts in a shared workspace. Teams use HoneyHive to iterate faster, detect failures at scale, and ship reliable AI products.

By integrating Qdrant with HoneyHive, you can:

  • Trace vector database operations
  • Monitor latency, embedding quality, and context relevance
  • Evaluate retrieval performance in RAG pipelines
  • Optimize parameters such as chunk_size and chunk_overlap
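The guide below does not implement chunking itself, but as a rough sketch of what chunk_size and chunk_overlap control, here is a minimal sliding-window splitter (a hypothetical helper for illustration, not part of the HoneyHive or Qdrant APIs):

```python
def chunk_text(text: str, chunk_size: int = 200, chunk_overlap: int = 50) -> list:
    """Split text into overlapping chunks of roughly chunk_size characters.

    Consecutive chunks share chunk_overlap characters, so context at chunk
    boundaries is not lost to the retriever.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

chunks = chunk_text("a" * 500, chunk_size=200, chunk_overlap=50)
print(len(chunks))     # 4 chunks, starting at offsets 0, 150, 300, 450
print(len(chunks[0]))  # 200
```

Larger chunks give the generator more context per hit; more overlap reduces boundary misses at the cost of extra embeddings. Tracing retrieval scores in HoneyHive is one way to compare settings empirically.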

Prerequisites

  • A HoneyHive account and API key
  • Python 3.8+

Installation

Install the required packages:

pip install qdrant-client openai honeyhive
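The code below reads credentials from environment variables. One way to set them in your shell (the placeholder values are yours to replace):

```shell
export OPENAI_API_KEY="your-openai-api-key"
export HONEYHIVE_API_KEY="your-honeyhive-api-key"
```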

Basic Integration Example

The following example walks through a complete RAG pipeline with HoneyHive tracing for Qdrant operations. We'll break down each component step by step.

Initialize Clients and Setup

First, set up the necessary clients and configuration for HoneyHive, OpenAI, and Qdrant:

from qdrant_client import QdrantClient
from qdrant_client.http.models import PointStruct, VectorParams, Distance
import openai
import os
from honeyhive.tracer import HoneyHiveTracer
from honeyhive.tracer.custom import trace
from openai import OpenAI

# Set API Keys
openai.api_key = os.getenv("OPENAI_API_KEY")
honeyhive_api_key = os.getenv("HONEYHIVE_API_KEY")

# Initialize HoneyHive Tracer
HoneyHiveTracer.init(
    api_key=honeyhive_api_key,
    project="qdrant-rag-example",
    session_name="qdrant-integration-demo"
)

# Initialize OpenAI client
openai_client = OpenAI(api_key=openai.api_key)

Connect to Qdrant

You can connect to Qdrant in two ways: self-hosted (local) or cloud-hosted (Qdrant Cloud).

Option 1: Self-Hosted Qdrant (Local)

To run Qdrant locally, install Docker and run the following commands:

docker pull qdrant/qdrant
docker run -p 6333:6333 -p 6334:6334 -v "$(pwd)/qdrant_storage:/qdrant/storage" qdrant/qdrant

Then connect to the local Qdrant instance:

# Connect to local Qdrant
client = QdrantClient(url="http://localhost:6333")
print("Connected to local Qdrant instance")

Option 2: Qdrant Cloud

For Qdrant Cloud, you need your cluster host and an API key:

# Qdrant Cloud configuration
QDRANT_HOST = os.getenv("QDRANT_HOST")  # e.g., "your-cluster-id.eu-central.aws.cloud.qdrant.io"
QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")

# Connect to Qdrant Cloud
client = QdrantClient(url=QDRANT_HOST, api_key=QDRANT_API_KEY)
print("Connected to Qdrant Cloud")

Create a Collection

Create a collection to store the document embeddings:

collection_name = "documents"
vector_size = 1536  # For text-embedding-3-small
vector_distance = Distance.COSINE

# Create collection if it doesn't exist
if not client.collection_exists(collection_name):
    client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=vector_size, distance=vector_distance)
    )
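The collection is configured with cosine distance, so search scores reflect the angle between vectors rather than their magnitude. As a quick illustration of the metric in plain Python (not Qdrant's implementation):

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Cosine similarity: dot product divided by the product of the norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Vectors pointing the same direction score 1.0, regardless of length
print(round(cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]), 6))  # 1.0
# Orthogonal vectors score 0.0
print(round(cosine_similarity([1.0, 0.0], [0.0, 1.0]), 6))  # 0.0
```

The vector size of 1536 matches the output dimensionality of OpenAI's text-embedding-3-small model used below; if you switch embedding models, update the size accordingly.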

Define an Embedding Function with Tracing

Create a function that generates embeddings with HoneyHive tracing:

@trace()
def embed_text(text: str) -> list:
    """Generate embeddings for a text using OpenAI's API."""
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

Insert Documents with Tracing

Create a function to insert documents into Qdrant with tracing:

@trace()
def insert_documents(docs):
    """Insert documents into Qdrant collection."""
    points = []
    for idx, doc in enumerate(docs):
        vector = embed_text(doc)
        points.append(PointStruct(
            id=idx + 1,
            vector=vector,
            payload={"text": doc}
        ))
    
    client.upsert(
        collection_name=collection_name,
        points=points
    )
    return len(points)

# Sample documents
documents = [
    "Qdrant is a vector database optimized for storing and searching high-dimensional vectors.",
    "HoneyHive provides observability for AI applications, including RAG pipelines.",
    "Retrieval-Augmented Generation (RAG) combines retrieval systems with generative models.",
    "Vector databases like Qdrant are essential for efficient similarity search in RAG systems.",
    "OpenAI's embedding models convert text into high-dimensional vectors for semantic search."
]

# Insert documents
num_inserted = insert_documents(documents)

Retrieve Documents with Tracing

Create a function to retrieve relevant documents from Qdrant with tracing:

@trace()
def get_relevant_docs(query: str, top_k: int = 3) -> list:
    """Retrieve relevant documents for a query."""
    # Embed the query
    q_vector = embed_text(query)
    
    # Search in Qdrant
    search_response = client.query_points(
        collection_name=collection_name,
        query=q_vector,
        limit=top_k,
        with_payload=True
    )
    
    # Extract results
    docs = []
    for point in search_response.points:
        docs.append({
            "id": point.id,
            "text": point.payload.get("text"),
            "score": point.score
        })
    
    return docs

Generate Responses with Tracing

Create a function that generates responses with OpenAI, also with tracing:

@trace()
def answer_query(query: str, relevant_docs: list) -> str:
    """Generate an answer for a query using retrieved documents."""
    if not relevant_docs:
        return "Could not retrieve relevant documents to answer the query."

    # Format context from retrieved documents
    context_parts = []
    for i, doc in enumerate(relevant_docs):
        context_parts.append(f"Document {i+1} (ID: {doc['id']}, Score: {doc['score']:.4f}):\n{doc['text']}")
    context = "\n\n".join(context_parts)

    # Create prompt
    prompt = f"""Answer the question based ONLY on the following context:

Context:
{context}

Question: {query}

Answer:"""

    # Generate answer
    completion = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant that answers questions based strictly on the provided context. If the answer is not in the context, say so clearly."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.2
    )

    return completion.choices[0].message.content.strip()

Complete RAG Pipeline

Create a function that runs the full RAG pipeline with tracing:

@trace()
def rag_pipeline(query: str) -> dict:
    """End-to-end RAG pipeline."""
    # Get relevant documents
    relevant_docs = get_relevant_docs(query)
    
    # Generate answer
    answer = answer_query(query, relevant_docs)
    
    return {
        "query": query,
        "answer": answer,
        "retrieved_documents": relevant_docs
    }

Batch Processing

For larger document sets, you can use batch processing to improve performance:

@trace()
def batch_insert_documents(documents_to_insert, batch_size=10, start_id_offset=0):
    """Insert documents in batches."""
    total_inserted = 0
    
    for i in range(0, len(documents_to_insert), batch_size):
        batch_docs = documents_to_insert[i:i+batch_size]
        points = []
        
        for local_idx, doc in enumerate(batch_docs):
            relative_idx = i + local_idx
            vector = embed_text(doc)
            point_id = relative_idx + start_id_offset + 1
            points.append(PointStruct(
                id=point_id,
                vector=vector,
                payload={"text": doc}
            ))
        
        if points:
            client.upsert(
                collection_name=collection_name,
                points=points
            )
            total_inserted += len(points)
    
    return total_inserted
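To see how the loop above slices documents into batches and assigns point IDs, without touching Qdrant or the embedding API, here is a minimal standalone sketch of the same indexing logic:

```python
def plan_batches(docs, batch_size=10, start_id_offset=0):
    """Return (point_id, doc) pairs grouped into batches, mirroring the
    slicing and ID arithmetic of batch_insert_documents."""
    batches = []
    for i in range(0, len(docs), batch_size):
        batch = [(i + j + start_id_offset + 1, d)
                 for j, d in enumerate(docs[i:i + batch_size])]
        batches.append(batch)
    return batches

batches = plan_batches([f"doc-{n}" for n in range(25)], batch_size=10)
print(len(batches))   # 3 batches: 10 + 10 + 5 documents
print(batches[2][0])  # (21, 'doc-20')
```

The start_id_offset parameter lets you append to a collection that already contains points without reusing IDs, since upserting an existing ID overwrites that point.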

Testing the RAG Pipeline

Here's how to test the complete RAG pipeline:

# Test query
test_query = "What is Qdrant used for?"
result = rag_pipeline(test_query)

print(f"Query: {result['query']}")
print(f"Answer: {result['answer']}")
print("\nRetrieved Documents:")
for i, doc in enumerate(result['retrieved_documents']):
    print(f"Document {i+1} (ID: {doc['id']}, Score: {doc['score']:.4f}): {doc['text']}")

Viewing Traces in HoneyHive

After running the RAG pipeline with Qdrant, you can view the traces in the HoneyHive UI:

  1. Navigate to your project in the HoneyHive dashboard
  2. Click the "Traces" tab to see all traces from your RAG pipeline
  3. Click a specific trace to inspect the details of each step in the pipeline
  4. Analyze the performance of vector operations, embeddings, and the retrieval process

With HoneyHive, you can easily monitor and optimize your Qdrant-powered RAG pipeline, ensuring it delivers the best results for your users.
