Project: HNSW Performance Benchmarking
Now that you've seen how HNSW parameters and payload indexes affect performance on the DBpedia dataset, it's time to optimize for your own domain and use case.
Your Task
Build on your Day 1 search engine by adding performance optimizations. You'll discover which HNSW settings work best for your specific data and queries, and measure the real impact of payload indexing.
Estimated time: 90 minutes
What You'll Build
A performance-optimized version of your Day 1 search engine that demonstrates:
- Fast bulk loading: upload with `m=0`, then switch HNSW on (see the timing sketch at the end of Step 3)
- HNSW parameter tuning: experiment with different `m` and `ef_construct` values
- Payload index impact: filtering times with and without an index
- Domain discovery: what works best for your content
Setup
Prerequisites
- Qdrant Cloud cluster (URL + API key)
- Python 3.9+ (or Google Colab)
- Packages: `qdrant-client`, `sentence-transformers`, `numpy`
Model
- Embeddings: `sentence-transformers/all-MiniLM-L6-v2` (384-dim)
Dataset
- Reuse your Day 1 domain data, or prepare a dataset of 1,000+ items with a rich text field (e.g. `description`).
- Include a few numeric fields for filtering (e.g. `length`, `word_count`) so the payload index impact can be measured.
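No Day 1 data handy? A synthetic placeholder is enough to run the whole benchmark end to end. A minimal sketch (hypothetical content; only `description` is strictly required, since Step 3 derives the numeric fields from it):
# Hypothetical stand-in dataset; replace with real domain data for meaningful results.
your_dataset = [
    {
        "title": f"Item {i}",
        "description": f"This is a longer description of item {i}, covering "
                       f"the main and key points of topic {i % 20}.",
    }
    for i in range(1000)
]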
Build Steps
Step 1: Extend Your Day 1 Project
Start from your Day 1 domain search engine, or create a new one with 1,000+ items.
from qdrant_client import QdrantClient, models
from sentence_transformers import SentenceTransformer
import time
import numpy as np
import os
client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))
# For Colab:
# from google.colab import userdata
# client = QdrantClient(url=userdata.get("QDRANT_URL"), api_key=userdata.get("QDRANT_API_KEY"))
encoder = SentenceTransformer("all-MiniLM-L6-v2")
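Optionally, a quick connectivity check before creating anything (`get_collections` just lists what's already in the cluster):
# Optional: verify the connection works before going further.
print(client.get_collections())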
Step 2: Create Multiple Test Collections
Test different HNSW configurations to find what fits best.
# Test configurations
configs = [
    {"name": "fast_initial_upload", "m": 0, "ef_construct": 100},  # m=0 = ingest-only
    {"name": "memory_optimized", "m": 8, "ef_construct": 100},  # m=8 = lower RAM
    {"name": "balanced", "m": 16, "ef_construct": 200},  # m=16 = balanced
    {"name": "high_quality", "m": 32, "ef_construct": 400},  # m=32 = higher recall, slower build
]

for config in configs:
    collection_name = f"my_domain_{config['name']}"
    if client.collection_exists(collection_name=collection_name):
        client.delete_collection(collection_name=collection_name)
    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(size=384, distance=models.Distance.COSINE),
        hnsw_config=models.HnswConfigDiff(
            m=config["m"],
            ef_construct=config["ef_construct"],
            full_scan_threshold=10,  # force HNSW instead of full scan
        ),
        optimizers_config=models.OptimizersConfigDiff(
            indexing_threshold=10
        ),  # Force indexing even on small sets for demo
    )
    print(f"Created collection: {collection_name}")
Step 3: Upload and Time It
Measure upload performance for each configuration.
def upload_with_timing(collection_name, data, config_name):
    embeddings = encoder.encode([d["description"] for d in data], show_progress_bar=True).tolist()
    points = []
    for i, item in enumerate(data):
        embedding = embeddings[i]
        points.append(
            models.PointStruct(
                id=i,
                vector=embedding,
                payload={
                    **item,
                    "length": len(item["description"]),
                    "word_count": len(item["description"].split()),
                    "has_keywords": any(
                        keyword in item["description"].lower() for keyword in ["important", "key", "main"]
                    ),
                },
            )
        )
    # Warmup request so connection setup doesn't skew the upload timing
    client.query_points(collection_name=collection_name, query=points[0].vector, limit=1)
    start_time = time.time()
    client.upload_points(collection_name=collection_name, points=points)
    upload_time = time.time() - start_time
    print(f"{config_name}: Uploaded {len(points)} points in {upload_time:.2f}s")
    return upload_time

# Load your dataset here. The larger the dataset, the more accurate the benchmark will be.
# your_dataset = [{"description": "This is a description of a product"}, ...]

# Upload to each collection
upload_times = {}
for config in configs:
    collection_name = f"my_domain_{config['name']}"
    upload_times[config["name"]] = upload_with_timing(collection_name, your_dataset, config["name"])
def wait_for_indexing(collection_name, timeout=60, poll_interval=1):
    print(f"Waiting for collection '{collection_name}' to be indexed...")
    start_time = time.time()
    while time.time() - start_time < timeout:
        info = client.get_collection(collection_name=collection_name)
        if info.indexed_vectors_count > 0 and info.status == models.CollectionStatus.GREEN:
            print(f"Success! Collection '{collection_name}' is indexed and ready.")
            print(f" - Status: {info.status.value}")
            print(f" - Indexed vectors: {info.indexed_vectors_count}")
            return
        print(f" - Status: {info.status.value}, Indexed vectors: {info.indexed_vectors_count}. Waiting...")
        time.sleep(poll_interval)
    info = client.get_collection(collection_name=collection_name)
    raise Exception(
        f"Timeout reached after {timeout} seconds. Collection '{collection_name}' is not ready. "
        f"Final status: {info.status.value}, Indexed vectors: {info.indexed_vectors_count}"
    )

for config in configs:
    if config["m"] > 0:  # m=0 has no HNSW to wait for
        collection_name = f"my_domain_{config['name']}"
        wait_for_indexing(collection_name)
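The `fast_initial_upload` collection still has `m=0`, so it is skipped here and in the search benchmarks below. To complete the bulk-load pattern, you can switch it to HNSW now and time the background build. A sketch reusing `wait_for_indexing` (the `m=16` target is an assumption; pick whatever your results favor):
# Enable HNSW on the m=0 collection and time the background index build.
start_time = time.time()
client.update_collection(
    collection_name="my_domain_fast_initial_upload",
    hnsw_config=models.HnswConfigDiff(m=16),  # switch from ingest-only to a real graph
)
wait_for_indexing("my_domain_fast_initial_upload", timeout=300)
print(f"HNSW build after bulk load took {time.time() - start_time:.2f}s")
Compare this build time plus the `m=0` upload time against the end-to-end upload times of the pre-indexed collections.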
Step 4: Benchmark Search Performance
Test search speed with different `hnsw_ef` values.
def benchmark_search(collection_name, query_embedding, ef_values=[64, 128, 256]):
    # Warmup
    client.query_points(collection_name=collection_name, query=query_embedding, limit=1)
    # hnsw_ef: higher = better recall, but slower. Tune per your latency goal.
    results = {}
    for hnsw_ef in ef_values:
        times = []
        # Run multiple queries for more reliable timing
        for _ in range(25):
            start_time = time.time()
            _ = client.query_points(
                collection_name=collection_name,
                query=query_embedding,
                limit=10,
                search_params=models.SearchParams(hnsw_ef=hnsw_ef),
                with_payload=False,
            )
            times.append((time.time() - start_time) * 1000)
        results[hnsw_ef] = {
            "avg_time": np.mean(times),
            "min_time": np.min(times),
            "max_time": np.max(times),
        }
    return results

test_query = "your test query"
query_embedding = encoder.encode(test_query).tolist()

performance_results = {}
for config in configs:
    if config["m"] > 0:  # Skip m=0 collections for search
        collection_name = f"my_domain_{config['name']}"
        performance_results[config["name"]] = benchmark_search(
            collection_name, query_embedding
        )
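To eyeball the trade-off before Step 6, you can dump the raw numbers; this just iterates the `performance_results` dict built above:
# Quick latency summary per configuration and hnsw_ef value.
for name, ef_results in performance_results.items():
    for hnsw_ef, stats in sorted(ef_results.items()):
        print(
            f"{name} @ hnsw_ef={hnsw_ef}: avg {stats['avg_time']:.2f}ms "
            f"(min {stats['min_time']:.2f}, max {stats['max_time']:.2f})"
        )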
Step 5: Measure Payload Index Impact
Measure filtering performance with and without a payload index.
def test_filtering_performance(collection_name):
    query_embedding = encoder.encode("your filter test query").tolist()
    # Test filter without index
    filter_condition = models.Filter(
        must=[models.FieldCondition(key="length", range=models.Range(gte=10, lte=200))]
    )
    # Demo only: unindexed_filtering_retrieve=True forces a scan; turn it off right after measuring.
    client.update_collection(
        collection_name=collection_name,
        strict_mode_config=models.StrictModeConfig(unindexed_filtering_retrieve=True),
    )
    # Warmup
    client.query_points(collection_name=collection_name, query=query_embedding, limit=1)
    # Timing without payload index
    times = []
    for _ in range(25):
        start_time = time.time()
        _ = client.query_points(
            collection_name=collection_name,
            query=query_embedding,
            query_filter=filter_condition,
            limit=10,
            with_payload=False,
        )
        times.append((time.time() - start_time) * 1000)
    time_without_index = np.mean(times)
    # Create payload index
    client.create_payload_index(
        collection_name=collection_name,
        field_name="length",
        field_schema=models.PayloadSchemaType.INTEGER,
        wait=True,
    )
    # HNSW was already built; adding the payload index doesn't rebuild it.
    # Bump ef_construct (+1) once to trigger a safe rebuild.
    base_ef = client.get_collection(
        collection_name=collection_name
    ).config.hnsw_config.ef_construct
    new_ef_construct = base_ef + 1
    client.update_collection(
        collection_name=collection_name,
        hnsw_config=models.HnswConfigDiff(ef_construct=new_ef_construct),
        strict_mode_config=models.StrictModeConfig(
            unindexed_filtering_retrieve=False
        ),  # Turn off scanning and use payload index instead.
    )
    wait_for_indexing(collection_name)
    # Warmup
    client.query_points(collection_name=collection_name, query=query_embedding, limit=1)
    # Timing with index
    times = []
    for _ in range(25):
        start_time = time.time()
        _ = client.query_points(
            collection_name=collection_name,
            query=query_embedding,
            query_filter=filter_condition,
            limit=10,
            with_payload=False,
        )
        times.append((time.time() - start_time) * 1000)
    time_with_index = np.mean(times)
    return {
        "without_index": time_without_index,
        "with_index": time_with_index,
        "speedup": time_without_index / time_with_index,
    }

# Test on your best performing collection
best_collection = "my_domain_balanced"  # Choose based on your results
filtering_results = test_filtering_performance(best_collection)
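The same measurement works for any filterable field. As an optional extension, you could also index the other payload fields created in Step 3 and re-run the timing (field names come from `upload_with_timing`; schema types are standard Qdrant ones):
# Index the remaining filterable payload fields from Step 3.
client.create_payload_index(
    collection_name=best_collection,
    field_name="word_count",
    field_schema=models.PayloadSchemaType.INTEGER,
    wait=True,
)
client.create_payload_index(
    collection_name=best_collection,
    field_name="has_keywords",
    field_schema=models.PayloadSchemaType.BOOL,
    wait=True,
)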
Step 6: Analyze Your Results
Create a summary of your findings.
print("=" * 60)
print("PERFORMANCE OPTIMIZATION RESULTS")
print("=" * 60)
print("\n1) Upload Performance:")
for config_name, time_taken in upload_times.items():
print(f" {config_name}: {time_taken:.2f}s")
print("\n2) Search Performance (hnsw_ef=128):")
for config_name, results in performance_results.items():
if 128 in results:
print(f" {config_name}: {results[128]['avg_time']:.2f}ms")
print("\n3) Filtering Impact:")
print(f" Without index: {filtering_results['without_index']:.2f}ms")
print(f" With index: {filtering_results['with_index']:.2f}ms")
print(f" Speedup: {filtering_results['speedup']:.1f}x")
Success Criteria
You'll know you've succeeded when:
- You've tested multiple HNSW configurations with real timing data
- You can explain which settings work best for your domain, and why
- You've measured the concrete impact of payload indexing
- You have clear recommendations for a production deployment
Share Your Findings
Step 1: Reflect on Your Findings
- Which HNSW configuration (`m`, `ef_construct`) worked best for your domain?
- How did upload time trade off against search speed?
- What impact did adding a payload index have?
- How do your results compare to the DBpedia demo?
Step 2: Post Your Results
**[Day 2] HNSW Performance Benchmarking**
**High-Level Summary**
- **Domain:** "[your domain]"
- **Key Result:** "m=[..], ef_construct=[..], hnsw_ef=[..] gave [X] ms search and [Y] s upload (best balance)."
**Reproducibility**
- **Collections:** ...
- **Model:** sentence-transformers/all-MiniLM-L6-v2 (384-dim)
- **Dataset:** [N items] (snapshot: YYYY-MM-DD)
**Configuration Results**
| m | ef_construct | Upload_s | Search_ms@ef=128 |
|----|--------------|----------|------------------|
| 0 | 100 | X.X | — |
| 8 | 100 | Y.Y | A.A |
| 16 | 200 | Z.Z | B.B |
| 32 | 400 | W.W | C.C |
**Filtering Impact**
- Payload index on `length`: **[speedup]×**
Without index: [T1] ms → With index: [T2] ms
**Recommendations**
- Best config for this domain: [m, ef_construct, hnsw_ef]
- When to pick another setting: [short guidance]
- Notes for production: [one line on indexing order / filters]
**Surprise**
- "[one unexpected finding]"
**Next Step**
- "[one concrete action you’ll try next]"
Optional: Go Further
- Test parameters at a finer granularity:
  - `ef_construct`: its effect on recall and build time
  - `hnsw_ef`: per-query tuning based on query complexity
- Track memory usage differences (RAM/disk, payload indexes)
- Add an accuracy metric: compare against a small set of labeled queries to see whether a higher `m` actually improves quality in your domain (see the recall sketch below)
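A minimal recall@k sketch for that last point, using Qdrant's exact search (`exact=True`, brute-force full scan) as ground truth. The query list is a placeholder; substitute your labeled set:
# Recall@10 of HNSW results against exact (full-scan) ground truth.
def recall_at_k(collection_name, query_texts, k=10):
    scores = []
    for text in query_texts:
        vec = encoder.encode(text).tolist()
        exact = client.query_points(
            collection_name=collection_name,
            query=vec,
            limit=k,
            search_params=models.SearchParams(exact=True),  # brute-force baseline
            with_payload=False,
        )
        approx = client.query_points(
            collection_name=collection_name,
            query=vec,
            limit=k,
            search_params=models.SearchParams(hnsw_ef=128),
            with_payload=False,
        )
        truth = {p.id for p in exact.points}
        found = {p.id for p in approx.points}
        scores.append(len(truth & found) / k)
    return np.mean(scores)

for config in configs:
    if config["m"] > 0:
        name = f"my_domain_{config['name']}"
        print(f"{name}: recall@10 = {recall_at_k(name, ['placeholder query']):.2%}")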