Day 2

Demo: HNSW Performance Tuning

Learn how to speed up vector search with HNSW tuning and payload indexing on a real 100K dataset.

Follow along in Colab: Open in Colab

What You'll Do

Yesterday you learned the theory behind HNSW indexing. Today you'll see it in action on a 100,000-vector dataset, measuring the performance differences and applying optimization strategies that work in production.

What You'll Learn

  • Optimize bulk upload speed with a strategic HNSW configuration
  • Measure the performance impact of payload indexes
  • Tune HNSW parameters
  • Compare full-scan vs. HNSW search performance

The Performance Challenge

Working with 100K high-dimensional vectors (1536 dimensions from OpenAI's text-embedding-3-large) poses real performance challenges:

  • Upload speed: how fast can we ingest vectors?
  • Search speed: how fast can we find similar vectors?
  • Filtering speed: how much overhead do payload filters add?
  • Memory efficiency: how do different configurations affect RAM requirements?

Step 1: Environment Setup

Install the required libraries (a minimal install cell is sketched after this list).

Library purposes

datasets: access to Hugging Face datasets, specifically our DBpedia 100K dataset
qdrant-client: the official Qdrant Python client for vector search operations
tqdm: progress bars for batch operations (essential for tracking the 100K upload)
openai: generates query embeddings compatible with the dataset
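A minimal install cell, assuming a Colab/Jupyter environment (python-dotenv is an extra assumption, only needed if you load a local .env file later):

# Install dependencies (notebook magic; use plain `pip install` in a terminal)
%pip install datasets qdrant-client tqdm openai python-dotenv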

Set up API keys

You'll need an OpenAI API key for query embeddings:

  • Visit OpenAI's API platform
  • Create an account or sign in
  • Navigate to API keys and create a new key
  • Important: you need credits in your OpenAI account (~$1 should be plenty for this demo)

Environment configuration

Create a .env file in your project directory or use Google Colab secrets.

# .env file
QDRANT_URL=https://your-cluster-url.cloud.qdrant.io
QDRANT_API_KEY=your-qdrant-api-key-here
OPENAI_API_KEY=sk-your-openai-api-key-here

Security tip: never commit your .env file.
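To make these variables visible to os.getenv when running locally, you can load the .env file with python-dotenv (an assumption; in Colab, use userdata as shown in the next step). A minimal sketch:

# Load variables from .env into the process environment (local runs only)
from dotenv import load_dotenv
import os

load_dotenv()  # reads .env from the current directory
print("QDRANT_URL set:", bool(os.getenv("QDRANT_URL")))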

Step 2: Connect to Qdrant Cloud

We'll use Qdrant Cloud for stable resources at the 100K scale.

from datasets import load_dataset
from qdrant_client import QdrantClient, models
from tqdm import tqdm
import openai
import time
import os

client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))

# For Colab:
# from google.colab import userdata
# client = QdrantClient(url=userdata.get("QDRANT_URL"), api_key=userdata.get("QDRANT_API_KEY"))

# Verify connection
try:
    collections = client.get_collections()
    print(f"Connected to Qdrant Cloud successfully!")
    print(f"Current collections: {len(collections.collections)}")
except Exception as e:
    print(f"Connection failed: {e}")
    print("Check your QDRANT_URL and QDRANT_API_KEY in .env file")

Why the cloud?

  • Convenience: no local setup hassle
  • Free tier: with the 100K dataset we stay entirely within the free tier
  • Realistic testing: a production-like environment for accurate benchmarks
  • Scalability: easy to scale up later

Step 3: Load the DBpedia Dataset

We're using a curated dataset of 100K Wikipedia articles with pre-computed 1536-dimensional embeddings from OpenAI's text-embedding-3-large model.

# Load the dataset (this may take a few minutes for first download)
print("Loading DBpedia 100K dataset...")
ds = load_dataset("Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-100K")
collection_name = "dbpedia_100K"

print("Dataset loaded successfully!")
print(f"Dataset size: {len(ds['train'])} articles")

# Explore the dataset structure
print("\nDataset structure:")
print("Available columns:", ds["train"].column_names)

# Look at a sample entry
sample = ds["train"][0]
print(f"\nSample article:")
print(f"Title: {sample['title']}")
print(f"Text preview: {sample['text'][:200]}...")
print(f"Embedding dimensions: {len(sample['text-embedding-3-large-1536-embedding'])}")

About this dataset

  • Source: the Hugging Face DBpedia dataset
  • Content: pre-computed embeddings of Wikipedia articles
  • Size: 100,000 articles
  • Embeddings: generated with OpenAI's text-embedding-3-large, truncated to 1536 dimensions
  • Metadata: _id, title, text

Step 4: Strategic Collection Creation

Set m=0 to skip HNSW graph linking during the bulk upload, then switch to a normal m after ingestion to build the graph. This makes inserts 5-10x faster because link creation is deferred.

Warning: if you care about keeping an existing HNSW index, don't switch a collection that already has one back to m=0. Rebuilding from scratch is slow and consumes more resources.

# Delete collection if it exists (for clean restart)
try:
    client.delete_collection(collection_name)
    print(f"Deleted existing collection: {collection_name}")
except Exception:
    pass  # Collection doesn't exist, which is fine

# Create collection with optimized settings
print(f"Creating collection: {collection_name}")

client.create_collection(
    collection_name=collection_name,
    vectors_config=models.VectorParams(
        size=1536,  # Matches dataset dims
        distance=models.Distance.COSINE,  # Good for normalized embeddings
    ),
    hnsw_config=models.HnswConfigDiff(
        m=0,  # Bulk load fast: m=0 (build links after ingest).
        ef_construct=100,  # Build quality: used after we set m>0
        full_scan_threshold=10,  # force HNSW instead of full scan
    ),
    optimizers_config=models.OptimizersConfigDiff(
        indexing_threshold=10
    ),  # Force indexing even on small sets for demo
    strict_mode_config=models.StrictModeConfig(
        enabled=False,
    ),  # More flexible while testing
)

print(f"Collection '{collection_name}' created successfully!")

# Verify collection settings
collection_info = client.get_collection(collection_name)
print(f"Vector size: {collection_info.config.params.vectors.size}")
print(f"Distance metric: {collection_info.config.params.vectors.distance}")
print(f"HNSW m: {collection_info.config.hnsw_config.m}")

Configuration details

  • size=1536: matches the dimensions we request from OpenAI's text-embedding-3-large
  • distance=COSINE: the standard choice for normalized embeddings and semantic similarity
  • full_scan_threshold=10: set deliberately low (the value is in KB) so the query planner uses HNSW instead of falling back to an exact full scan, even on small segments
  • strict_mode_config: managed Cloud clusters run in strict mode by default. We set enabled=False so you can experiment with unindexed payload keys during the demo.

Note: text-embedding-3-large outputs 3072 dimensions. Truncating it to just 1536 dimensions reduces compute and memory at the cost of some precision.

Step 5: Bulk Upload with Rich Payloads

We'll upload the 100K vectors in batches of 640 points (see batch_size below). The payload includes title, length, and has_numbers for filter tests.

def upload_batch(start_idx, end_idx):
    points = []
    for i in range(start_idx, min(end_idx, total_points)):
        example = ds["train"][i]

        # Get the pre-computed embedding
        embedding = example["text-embedding-3-large-1536-embedding"]

        # Create payload with fields for filtering tests
        payload = {
            "text": example["text"],
            "title": example["title"],
            "_id": example["_id"],
            "length": len(example["text"]),
            "has_numbers": any(char.isdigit() for char in example["text"]),
        }

        points.append(models.PointStruct(id=i, vector=embedding, payload=payload))

    if points:
        client.upload_points(collection_name=collection_name, points=points)
        return len(points)
    return 0


batch_size = 64 * 10
total_points = len(ds["train"])
print(f"Uploading {total_points} points in batches of {batch_size}")

# Upload all batches with progress tracking
total_uploaded = 0
for i in tqdm(range(0, total_points, batch_size), desc="Uploading points"):
    uploaded = upload_batch(i, i + batch_size)
    total_uploaded += uploaded

print(f"Upload completed! Total points uploaded: {total_uploaded}")

Step 6: Enable HNSW Indexing

Now switch from m=0 to m=16 to build the HNSW links and speed up search.

client.update_collection(
    collection_name=collection_name,
    hnsw_config=models.HnswConfigDiff(
        m=16  # Build HNSW now: m=16 after the bulk load.
    ),
)

print("HNSW indexing enabled with m=16")

What happens now? Qdrant builds a navigable graph, so search becomes roughly logarithmic instead of a linear scan.
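Index construction runs in the background, so it is worth waiting until the collection reports a green status before benchmarking. A minimal polling sketch (assumes client and collection_name from above):

# Wait until background indexing/optimization finishes
while True:
    info = client.get_collection(collection_name)
    print(f"Status: {info.status}, indexed vectors: {info.indexed_vectors_count}")
    if info.status == models.CollectionStatus.GREEN:
        break
    time.sleep(5)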

Step 7: Create a Query Embedding

We need to use the same model and dimensions as the dataset to ensure compatibility.

If you don't have an OpenAI key, use the commented-out fallback below.

# Optional fallback without an API key:
# import requests
# test_query = "artificial intelligence"
# url = "https://storage.googleapis.com/qdrant-examples/query_embedding_day_2.json"
# resp = requests.get(url)
# query_embedding = resp.json()["query_vector"]
# print(f"Generated embedding for: '{test_query}'")
# print(f"Embedding dimensions: {len(query_embedding)}")
# print(f"First 5 values: {query_embedding[:5]}")


# Initialize OpenAI client
openai_client = openai.OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

# for colab:
# openai_client = openai.OpenAI(api_key=userdata.get('OPENAI_API_KEY'))

def get_query_embedding(text):
    """Generate embedding using the same model as the dataset"""
    try:
        response = openai_client.embeddings.create(
            model="text-embedding-3-large",  # Must match dataset model
            input=text,
            dimensions=1536  # Must match dataset dimensions
        )
        return response.data[0].embedding
    except Exception as e:
        print(f"Error getting OpenAI embedding: {e}")
        print("Common issues:")
        print("   - Check your OPENAI_API_KEY in .env file")
        print("   - Ensure you have credits in your OpenAI account")
        print("   - Verify your API key has embedding permissions")
        print("Using random vector as fallback for demo purposes...")
        import numpy as np
        return np.random.normal(0, 1, 1536).tolist()

# Test embedding generation
print("Generating query embedding...")
test_query = "artificial intelligence"
query_embedding = get_query_embedding(test_query)
print(f"Generated embedding for: '{test_query}'")
print(f"Embedding dimensions: {len(query_embedding)}")
print(f"First 5 values: {query_embedding[:5]}")

Query embedding compatibility

  • Model: must be text-embedding-3-large (the same as the dataset)
  • Dimensions: must be 1536 (the same as the dataset)

Step 8: Baseline Performance Test

Let's measure search performance on the HNSW-enabled collection.

print("Running baseline performance test...")

# Warm up the RAM index/vectors cache with a test query
client.query_points(collection_name=collection_name, query=query_embedding, limit=1)

# Measure vector search performance
search_times = []
for _ in range(25):  # Multiple runs for a stable average
    start_time = time.time()
    response = client.query_points(
        collection_name=collection_name, query=query_embedding, limit=10
    )
    search_time = (time.time() - start_time) * 1000
    search_times.append(search_time)

baseline_time = sum(search_times) / len(search_times)

print(f"Average search time: {baseline_time:.2f}ms")
print(f"Search times: {[f'{t:.2f}ms' for t in search_times]}")
print(f"Found {len(response.points)} results")
print(
    f"Top result: '{response.points[0].payload['title']}' (score: {response.points[0].score:.4f})"
)

# Show a few more results for context
print(f"\nTop 3 results:")
for i, point in enumerate(response.points[:3], 1):
    title = point.payload["title"]
    score = point.score
    text_preview = point.payload["text"][:100] + "..."
    print(f"   {i}. {title} (score: {score:.4f})")
    print(f"      {text_preview}")

Performance factors

  • Cache warm-up: the first query loads the relevant index segments and vectors into memory, so subsequent queries run faster
  • HNSW with m=16: graph-based search is much faster than a full scan (see the comparison sketch below)
  • Repeated runs: averaging over multiple queries gives more reliable timings
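To see the full-scan vs. HNSW difference directly, you can rerun the same query with exact search enabled, which bypasses the HNSW index. A minimal sketch (assumes client, collection_name, query_embedding, and baseline_time from above):

# Exact (brute-force) search for comparison with the HNSW timings above
exact_times = []
for _ in range(5):  # fewer runs; exact search is slower
    start_time = time.time()
    client.query_points(
        collection_name=collection_name,
        query=query_embedding,
        limit=10,
        search_params=models.SearchParams(exact=True),  # full scan, no HNSW
    )
    exact_times.append((time.time() - start_time) * 1000)

print(f"Exact full-scan search: {sum(exact_times) / len(exact_times):.2f}ms "
      f"vs HNSW baseline: {baseline_time:.2f}ms")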

Step 9: Filtering Without Payload Indexes

Now let's test filtering performance without an index. This forces Qdrant to scan the candidates and check each one against the filter.

print("Testing filtering without payload indexes")

# Warning: We enable unindexed_filtering_retrieve only for demonstration purposes. In production, don’t use it.
# Demo only: allow filtering without an index by scanning. Turn this off later.
client.update_collection(
    collection_name=collection_name,
    strict_mode_config=models.StrictModeConfig(unindexed_filtering_retrieve=True),
)

# Create a text-based filter
text_filter = models.Filter(
    must=[models.FieldCondition(key="text", match=models.MatchText(text="data"))]
)

# Warmup
client.query_points(collection_name=collection_name, query=query_embedding, limit=1)

# Run multiple times for more reliable measurement
unindexed_times = []
for i in range(25):
    start_time = time.time()
    response = client.query_points(
        collection_name=collection_name,
        query=query_embedding,
        limit=10,
        search_params=models.SearchParams(hnsw_ef=100),
        query_filter=text_filter,
    )
    unindexed_times.append((time.time() - start_time) * 1000)

unindexed_filter_time = sum(unindexed_times) / len(unindexed_times)

print(f"Filtered search (WITHOUT index): {unindexed_filter_time:.2f}ms")
print(f"Individual times: {[f'{t:.2f}ms' for t in unindexed_times]}")
print(f"Overhead vs baseline: {unindexed_filter_time - baseline_time:.2f}ms")
print(f"Found {len(response.points)} matching results")
if response.points:
    print(
        f"Top result: '{response.points[0].payload['text']}'\nScore: {response.points[0].score:.4f}"
    )
else:
    print("No results found - try a different filter term")

Step 10: Create a Payload Index

Create a full-text index to speed up filtering.

# Create a payload index for 'text' so filters use an index, not a scan.
client.create_payload_index(
    collection_name=collection_name,
    field_name="text",
    wait=True,
    field_schema=models.TextIndexParams(
        type="text", tokenizer="word", phrase_matching=False
    ),
)

client.update_collection(
    collection_name=collection_name,
    hnsw_config=models.HnswConfigDiff(
        ef_construct=101
    ),  # Added payload index after HNSW; bump ef_construct (+1) to rebuild with filter data.
    strict_mode_config=models.StrictModeConfig(unindexed_filtering_retrieve=False),
)

print("Payload index created for 'text' field")

Step 11: Filtering With a Payload Index

Run the same query, now using the index.

print("Testing filtering WITH payload indexes...")


# Warmup
client.query_points(collection_name=collection_name, query=query_embedding, limit=1)

# Run multiple times for more reliable measurement
indexed_times = []
for i in range(25):
    start_time = time.time()
    response = client.query_points(
        collection_name=collection_name,
        query=query_embedding,
        limit=10,
        search_params=models.SearchParams(hnsw_ef=100),
        query_filter=text_filter,
    )
    indexed_times.append((time.time() - start_time) * 1000)

indexed_filter_time = sum(indexed_times) / len(indexed_times)

print(f"Filtered search (WITH index): {indexed_filter_time:.2f}ms")
print(f"Individual times: {[f'{t:.2f}ms' for t in indexed_times]}")
print(f"Overhead vs baseline: {indexed_filter_time - baseline_time:.2f}ms")
print(f"Found {len(response.points)} matching results")
if response.points:
    print(
        f"Top result: '{response.points[0].payload['text']}'\nScore: {response.points[0].score:.4f}"
    )
else:
    print("No results found - try a different filter term")

Performance Analysis

Compare your results to see the effect of each optimization:

print("\n" + "=" * 60)
print("FINAL PERFORMANCE SUMMARY")
print("=" * 60)

# Key metrics
if unindexed_filter_time > 0 and indexed_filter_time > 0:
    index_speedup = unindexed_filter_time / indexed_filter_time
    filter_overhead_without = unindexed_filter_time - baseline_time
    filter_overhead_with = indexed_filter_time - baseline_time
else:
    index_speedup = 0
    filter_overhead_without = 0
    filter_overhead_with = 0

print(f"Baseline search (HNSW m=16):     {baseline_time:.2f}ms")
print(f"Filtering WITHOUT index:        {unindexed_filter_time:.2f}ms")
print(f"Filtering WITH index:           {indexed_filter_time:.2f}ms")
print("")
print(f"Performance improvements:")
print(f"   • Index speedup:                {index_speedup:.1f}x faster")
print(f"   • Filter overhead (no index):   +{filter_overhead_without:.2f}ms")
print(f"   • Filter overhead (with index): +{filter_overhead_with:.2f}ms")
print("")
print(f"Key insights:")
print(f"   • HNSW (m=16) enables fast vector search")
print(f"   • Payload indexes dramatically improve filtering")
print(f"   • Upload strategy (m=0→m=16) optimizes ingestion")
print("=" * 60)

Next Steps and Resources

What you learned

  • Strategic optimization of the initial bulk upload via the m=0 → m=16 switch
  • Real-world performance measurement techniques
  • The dramatic impact of payload indexes on filtering
  • Production-ready configuration patterns

Suggested next steps

  1. Experiment with parameters: try different m values (8, 32, 64) and ef_construct settings (see the sketch after this list)
  2. Test with your own data: apply these techniques to a dataset from your own domain
  3. Production deployment: use these patterns in real applications
  4. Advanced features: explore quantization, sharding, and replication
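A minimal sketch of how you might vary the graph parameters (this triggers an index rebuild, so expect the collection to re-optimize; assumes client, collection_name, and query_embedding from above):

# Try a denser graph: higher m and ef_construct trade build time and RAM for recall
client.update_collection(
    collection_name=collection_name,
    hnsw_config=models.HnswConfigDiff(m=32, ef_construct=200),
)

# At query time, hnsw_ef controls the per-request speed/recall trade-off
client.query_points(
    collection_name=collection_name,
    query=query_embedding,
    limit=10,
    search_params=models.SearchParams(hnsw_ef=256),
)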

Additional resources

Ready for the midway project? Now it's your turn to optimize performance with your own dataset and use case. You'll apply these same techniques to your domain-specific data and measure the real impact of different HNSW parameters and indexing strategies.