Demo: HNSW Performance Tuning
Learn how to improve vector search speed through HNSW tuning and payload indexing on a real 100K dataset.
What you'll do
Yesterday you learned the theory behind HNSW indexes. Today you'll see them in action on a 100,000-vector dataset, measuring performance differences and applying optimization strategies that work in production.
What you'll learn
- Optimize bulk upload speed with a strategic HNSW configuration
- Measure the performance impact of payload indexes
- Tune HNSW parameters
- Compare full-scan vs. HNSW search performance
The performance challenge
Working with 100K high-dimensional vectors (1536 dimensions from OpenAI's text-embedding-3-large) poses real performance challenges:
- Upload speed: how fast can we ingest the vectors?
- Search speed: how fast can we find similar vectors?
- Filtering speed: how much overhead do payload filters add?
- Memory efficiency: how do different configurations affect RAM requirements?
Step 1: Environment Setup
Install the required libraries
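If you haven't installed them yet, a single pip command covers all four packages:
pip install datasets qdrant-client tqdm openai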
Library purposes
- datasets: access to Hugging Face datasets, specifically our DBpedia 100K dataset
- qdrant-client: the official Qdrant Python client for vector search operations
- tqdm: progress bars for batch operations (essential for tracking the 100K upload)
- openai: generates query embeddings compatible with the dataset
Set up API keys
You'll need an OpenAI API key for query embeddings:
- Visit OpenAI's API platform
- Create an account or sign in
- Navigate to API keys and create a new key
- Important: you need credits in your OpenAI account (~$1 should be plenty for this demo)
Environment configuration
Create a .env file in your project directory, or use Google Colab secrets.
# .env file
QDRANT_URL=https://your-cluster-url.cloud.qdrant.io
QDRANT_API_KEY=your-qdrant-api-key-here
OPENAI_API_KEY=sk-your-openai-api-key-here
Security tip: never commit your .env file to version control.
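Note that os.getenv, used in the connection code below, only sees variables already exported in your shell. If you keep them in a .env file instead, a minimal sketch using the python-dotenv package (an extra dependency, not installed above) loads them first:
from dotenv import load_dotenv  # pip install python-dotenv

load_dotenv()  # copies the .env entries into os.environ so os.getenv can read them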
Step 2: Connect to Qdrant Cloud
We'll use Qdrant Cloud for stable, predictable resources at 100K scale.
from datasets import load_dataset
from qdrant_client import QdrantClient, models
from tqdm import tqdm
import openai
import time
import os
client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))
# For Colab:
# from google.colab import userdata
# client = QdrantClient(url=userdata.get("QDRANT_URL"), api_key=userdata.get("QDRANT_API_KEY"))
# Verify connection
try:
collections = client.get_collections()
print(f"Connected to Qdrant Cloud successfully!")
print(f"Current collections: {len(collections.collections)}")
except Exception as e:
print(f"Connection failed: {e}")
print("Check your QDRANT_URL and QDRANT_API_KEY in .env file")
Why cloud?
- Convenience: no local setup hassle
- Free tier: the 100K dataset fits comfortably within the free tier
- Realistic testing: a production-like environment for accurate benchmarks
- Scalability: easy to scale up later
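If you prefer to run everything locally anyway (absolute timings will differ with your hardware), a minimal sketch against the official Qdrant Docker image:
# Start a local instance first:
#   docker run -p 6333:6333 qdrant/qdrant
client = QdrantClient(url="http://localhost:6333")  # no API key needed locally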
Step 3: Load the DBpedia Dataset
We're using a curated dataset of 100K Wikipedia articles with pre-computed 1536-dimensional embeddings from OpenAI's text-embedding-3-large model.
# Load the dataset (this may take a few minutes for first download)
print("Loading DBpedia 100K dataset...")
ds = load_dataset("Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-100K")
collection_name = "dbpedia_100K"
print("Dataset loaded successfully!")
print(f"Dataset size: {len(ds['train'])} articles")
# Explore the dataset structure
print("\nDataset structure:")
print("Available columns:", ds["train"].column_names)
# Look at a sample entry
sample = ds["train"][0]
print(f"\nSample article:")
print(f"Title: {sample['title']}")
print(f"Text preview: {sample['text'][:200]}...")
print(f"Embedding dimensions: {len(sample['text-embedding-3-large-1536-embedding'])}")
About this dataset
- Source: the Hugging Face DBpedia dataset
- Content: pre-computed Wikipedia article embeddings
- Size: 100,000 articles
- Embeddings: OpenAI's text-embedding-3-large, truncated to 1536 dimensions
- Metadata: _id, title, and text
Step 4: Strategic Collection Creation
Set m=0 to skip HNSW graph linking during the bulk upload, then switch to a normal m after ingestion to build the graph. This can make inserts 5-10x faster because link creation is deferred.
Warning: if you care about preserving an existing HNSW index, don't switch a collection that already has one back to m=0. Rebuilding from scratch is slow and consumes more resources.
# Delete collection if it exists (for clean restart)
try:
client.delete_collection(collection_name)
print(f"Deleted existing collection: {collection_name}")
except Exception:
pass # Collection doesn't exist, which is fine
# Create collection with optimized settings
print(f"Creating collection: {collection_name}")
client.create_collection(
collection_name=collection_name,
vectors_config=models.VectorParams(
size=1536, # Matches dataset dims
distance=models.Distance.COSINE, # Good for normalized embeddings
),
hnsw_config=models.HnswConfigDiff(
m=0, # Bulk load fast: m=0 (build links after ingest).
ef_construct=100, # Build quality: used after we set m>0
full_scan_threshold=10, # force HNSW instead of full scan
),
optimizers_config=models.OptimizersConfigDiff(
indexing_threshold=10
), # Force indexing even on small sets for demo
strict_mode_config=models.StrictModeConfig(
enabled=False,
), # More flexible while testing
)
print(f"Collection '{collection_name}' created successfully!")
# Verify collection settings
collection_info = client.get_collection(collection_name)
print(f"Vector size: {collection_info.config.params.vectors.size}")
print(f"Distance metric: {collection_info.config.params.vectors.distance}")
print(f"HNSW m: {collection_info.config.hnsw_config.m}")
Configuration details
- size=1536: matches the dimensions parameter we use with OpenAI's text-embedding-3-large
- distance=COSINE: the standard choice for normalized embeddings and semantic similarity
- full_scan_threshold=10: set far below the default of 10000 so that even small segments use HNSW rather than exact full-scan search
- strict_mode_config: managed cloud runs in strict mode by default; we set enabled=False so you can experiment with unindexed payload keys during the demo
Side note: text-embedding-3-large natively outputs 3072 dimensions. Truncating to just 1536 dimensions reduces compute and memory, at the cost of some precision.
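Conceptually, the truncation keeps the leading components and re-normalizes so cosine distances stay meaningful; a rough numpy sketch of the idea (the OpenAI API's dimensions parameter achieves a similar effect internally):
import numpy as np

def truncate_embedding(full_vec, dims=1536):
    """Keep the first `dims` components, then L2-normalize."""
    v = np.asarray(full_vec[:dims], dtype=np.float32)
    return (v / np.linalg.norm(v)).tolist()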
Step 5: Bulk Upload with Rich Payloads
We'll upload the 100K vectors in batches of 640 points (the batch_size below). Payloads include title, length, and has_numbers fields for filter tests.
def upload_batch(start_idx, end_idx):
points = []
for i in range(start_idx, min(end_idx, total_points)):
example = ds["train"][i]
# Get the pre-computed embedding
embedding = example["text-embedding-3-large-1536-embedding"]
# Create payload with fields for filtering tests
payload = {
"text": example["text"],
"title": example["title"],
"_id": example["_id"],
"length": len(example["text"]),
"has_numbers": any(char.isdigit() for char in example["text"]),
}
points.append(models.PointStruct(id=i, vector=embedding, payload=payload))
if points:
client.upload_points(collection_name=collection_name, points=points)
return len(points)
return 0
batch_size = 64 * 10
total_points = len(ds["train"])
print(f"Uploading {total_points} points in batches of {batch_size}")
# Upload all batches with progress tracking
total_uploaded = 0
for i in tqdm(range(0, total_points, batch_size), desc="Uploading points"):
uploaded = upload_batch(i, i + batch_size)
total_uploaded += uploaded
print(f"Upload completed! Total points uploaded: {total_uploaded}")
Step 6: Enable HNSW Indexing
Now switch from m=0 to m=16 to build the HNSW links and cut search times.
client.update_collection(
collection_name=collection_name,
hnsw_config=models.HnswConfigDiff(
m=16 # Build HNSW now: m=16 after the bulk load.
),
)
print("HNSW indexing enabled with m=16")
What just happened? Qdrant builds a navigable graph, so search becomes roughly logarithmic instead of a linear scan.
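Graph construction runs in the background, so benchmarks taken immediately may still hit partially built segments. A small polling sketch waits for the collection status to return to green before proceeding:
# Wait for background indexing to finish (collection status returns to green)
while client.get_collection(collection_name).status != models.CollectionStatus.GREEN:
    time.sleep(2)
print("HNSW graph built, collection is green")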
Step 7: Create Query Embeddings
We need the same model and dimensionality as the dataset to guarantee compatibility.
If you don't have an OpenAI key, use the commented-out fallback below.
# Optional fallback without an API key:
# import requests
# test_query = "artificial intelligence"
# url = "https://storage.googleapis.com/qdrant-examples/query_embedding_day_2.json"
# resp = requests.get(url)
# query_embedding = resp.json()["query_vector"]
# print(f"Generated embedding for: '{test_query}'")
# print(f"Embedding dimensions: {len(query_embedding)}")
# print(f"First 5 values: {query_embedding[:5]}")
# Initialize OpenAI client
openai_client = openai.OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
# for colab:
# openai_client = openai.OpenAI(api_key=userdata.get('OPENAI_API_KEY'))
def get_query_embedding(text):
"""Generate embedding using the same model as the dataset"""
try:
response = openai_client.embeddings.create(
model="text-embedding-3-large", # Must match dataset model
input=text,
dimensions=1536 # Must match dataset dimensions
)
return response.data[0].embedding
except Exception as e:
print(f"Error getting OpenAI embedding: {e}")
print("Common issues:")
print(" - Check your OPENAI_API_KEY in .env file")
print(" - Ensure you have credits in your OpenAI account")
print(" - Verify your API key has embedding permissions")
print("Using random vector as fallback for demo purposes...")
import numpy as np
return np.random.normal(0, 1, 1536).tolist()
# Test embedding generation
print("Generating query embedding...")
test_query = "artificial intelligence"
query_embedding = get_query_embedding(test_query)
print(f"Generated embedding for: '{test_query}'")
print(f"Embedding dimensions: {len(query_embedding)}")
print(f"First 5 values: {query_embedding[:5]}")
Query embedding compatibility
- Model: must be text-embedding-3-large (same as the dataset)
- Dimensions: must be 1536 (same as the dataset)
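A cheap sanity check catches a mismatched model or dimensions argument before you spend time benchmarking (a hypothetical guard, not part of the demo flow):
# Fail fast if the query vector doesn't match the collection's 1536-dim config
assert len(query_embedding) == 1536, (
    f"Expected 1536 dimensions, got {len(query_embedding)}; "
    "check the model and dimensions arguments"
)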
Step 8: Baseline Performance Test
Let's measure search performance on the HNSW-enabled collection.
print("Running baseline performance test...")
# Warm up the RAM index/vectors cache with a test query
client.query_points(collection_name=collection_name, query=query_embedding, limit=1)
# Measure vector search performance
search_times = []
for _ in range(25): # Multiple runs for a stable average
start_time = time.time()
response = client.query_points(
collection_name=collection_name, query=query_embedding, limit=10
)
search_time = (time.time() - start_time) * 1000
search_times.append(search_time)
baseline_time = sum(search_times) / len(search_times)
print(f"Average search time: {baseline_time:.2f}ms")
print(f"Search times: {[f'{t:.2f}ms' for t in search_times]}")
print(f"Found {len(response.points)} results")
print(
f"Top result: '{response.points[0].payload['title']}' (score: {response.points[0].score:.4f})"
)
# Show a few more results for context
print(f"\nTop 3 results:")
for i, point in enumerate(response.points[:3], 1):
title = point.payload["title"]
score = point.score
text_preview = point.payload["text"][:100] + "..."
print(f" {i}. {title} (score: {score:.4f})")
print(f" {text_preview}")
Performance factors
- Cache warm-up: the first query loads the relevant index segments and vectors into memory, so subsequent queries run faster
- HNSW with m=16: graph-based search is much faster than a full scan
- Repeated runs: averaging over many queries gives more reliable timings
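Averages can hide tail latency; a small addition using Python's statistics module reports the median and an approximate 95th percentile from the same search_times list:
import statistics

p50 = statistics.median(search_times)
# quantiles(n=20) returns 19 cut points; index 18 approximates the 95th percentile
p95 = statistics.quantiles(search_times, n=20)[18]
print(f"p50: {p50:.2f}ms, p95: {p95:.2f}ms")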
Step 9: Filtering Without Payload Indexes
Now let's test filtering performance without an index. This forces Qdrant to check every candidate point against the filter during the scan.
print("Testing filtering without payload indexes")
# Warning: We enable unindexed_filtering_retrieve only for demonstration purposes. In production, don’t use it.
# Demo only: allow filtering without an index by scanning. Turn this off later.
client.update_collection(
collection_name=collection_name,
strict_mode_config=models.StrictModeConfig(unindexed_filtering_retrieve=True),
)
# Create a text-based filter
text_filter = models.Filter(
must=[models.FieldCondition(key="text", match=models.MatchText(text="data"))]
)
# Warmup
client.query_points(collection_name=collection_name, query=query_embedding, limit=1)
# Run multiple times for more reliable measurement
unindexed_times = []
for i in range(25):
start_time = time.time()
response = client.query_points(
collection_name=collection_name,
query=query_embedding,
limit=10,
search_params=models.SearchParams(hnsw_ef=100),
query_filter=text_filter,
)
unindexed_times.append((time.time() - start_time) * 1000)
unindexed_filter_time = sum(unindexed_times) / len(unindexed_times)
print(f"Filtered search (WITHOUT index): {unindexed_filter_time:.2f}ms")
print(f"Individual times: {[f'{t:.2f}ms' for t in unindexed_times]}")
print(f"Overhead vs baseline: {unindexed_filter_time - baseline_time:.2f}ms")
print(f"Found {len(response.points)} matching results")
if response.points:
print(
f"Top result: '{response.points[0].payload['text']}'\nScore: {response.points[0].score:.4f}"
)
else:
print("No results found - try a different filter term")
Step 10: Create a Payload Index
Create a full-text index to speed up filtering.
# Create a payload index for 'text' so filters use an index, not a scan.
client.create_payload_index(
collection_name=collection_name,
field_name="text",
wait=True,
field_schema=models.TextIndexParams(
type="text", tokenizer="word", phrase_matching=False
),
)
client.update_collection(
collection_name=collection_name,
hnsw_config=models.HnswConfigDiff(
ef_construct=101
), # Added payload index after HNSW; bump ef_construct (+1) to rebuild with filter data.
strict_mode_config=models.StrictModeConfig(unindexed_filtering_retrieve=False),
)
print("Payload index created for 'text' field")
Step 11: Filtering With a Payload Index
Run the same query, this time using the index.
print("Testing filtering WITH payload indexes...")
# Warmup
client.query_points(collection_name=collection_name, query=query_embedding, limit=1)
# Run multiple times for more reliable measurement
indexed_times = []
for i in range(25):
start_time = time.time()
response = client.query_points(
collection_name=collection_name,
query=query_embedding,
limit=10,
search_params=models.SearchParams(hnsw_ef=100),
query_filter=text_filter,
)
indexed_times.append((time.time() - start_time) * 1000)
indexed_filter_time = sum(indexed_times) / len(indexed_times)
print(f"Filtered search (WITH index): {indexed_filter_time:.2f}ms")
print(f"Individual times: {[f'{t:.2f}ms' for t in indexed_times]}")
print(f"Overhead vs baseline: {indexed_filter_time - baseline_time:.2f}ms")
print(f"Found {len(response.points)} matching results")
if response.points:
print(
f"Top result: '{response.points[0].payload['text']}'\nScore: {response.points[0].score:.4f}"
)
else:
print("No results found - try a different filter term")
Performance Analysis
Compare your results to see the effect of each optimization:
print("\n" + "=" * 60)
print("FINAL PERFORMANCE SUMMARY")
print("=" * 60)
# Key metrics
if unindexed_filter_time > 0 and indexed_filter_time > 0:
index_speedup = unindexed_filter_time / indexed_filter_time
filter_overhead_without = unindexed_filter_time - baseline_time
filter_overhead_with = indexed_filter_time - baseline_time
else:
index_speedup = 0
filter_overhead_without = 0
filter_overhead_with = 0
print(f"Baseline search (HNSW m=16): {baseline_time:.2f}ms")
print(f"Filtering WITHOUT index: {unindexed_filter_time:.2f}ms")
print(f"Filtering WITH index: {indexed_filter_time:.2f}ms")
print("")
print(f"Performance improvements:")
print(f" • Index speedup: {index_speedup:.1f}x faster")
print(f" • Filter overhead (no index): +{filter_overhead_without:.2f}ms")
print(f" • Filter overhead (with index): +{filter_overhead_with:.2f}ms")
print("")
print(f"Key insights:")
print(f" • HNSW (m=16) enables fast vector search")
print(f" • Payload indexes dramatically improve filtering")
print(f" • Upload strategy (m=0→m=16) optimizes ingestion")
print("=" * 60)
Next Steps and Resources
What you learned
- Strategic optimization of the initial bulk upload via the m=0 → m=16 switch
- Real-world performance measurement techniques
- The dramatic impact of payload indexes on filtering
- Production-ready configuration patterns
Suggested next steps
- Experiment with parameters: try different m values (8, 32, 64) and ef_construct settings; a quick search-time sweep is sketched after this list
- Test with your own data: apply these techniques to your own domain dataset
- Production deployment: use these patterns in real applications
- Advanced features: explore quantization, sharding, and replication
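Changing m forces an index rebuild, but the search-time hnsw_ef parameter can be swept with no rebuild at all; a minimal sketch reusing the benchmark setup above:
# Higher hnsw_ef widens the graph search: better recall, slower queries
for ef in (16, 64, 128, 256):
    start = time.time()
    client.query_points(
        collection_name=collection_name,
        query=query_embedding,
        limit=10,
        search_params=models.SearchParams(hnsw_ef=ef),
    )
    print(f"hnsw_ef={ef}: {(time.time() - start) * 1000:.2f}ms")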
Additional resources
- Qdrant Documentation: the complete technical reference
- The HNSW paper: the original algorithm research
- Qdrant Cloud: managed vector search service
- Performance Tuning Guide: advanced optimization techniques
Ready for the midway project? Now it's your turn to optimize performance with your own dataset and use case. You'll apply these same techniques to your domain-specific data and measure the real impact of different HNSW parameters and indexing strategies.