Day 2

Project: HNSW Performance Benchmarking

Now that you've seen how HNSW parameters and payload indexes affect performance on the DBpedia dataset, it's time to optimize for your own domain and use case.

Your Task

Build on your Day 1 search engine by adding performance optimizations. You'll discover which HNSW settings work best for your particular data and queries, and measure the real impact of payload indexing.

Estimated time: 90 minutes

What You'll Build

A performance-optimized version of your Day 1 search engine that demonstrates:

  • Fast bulk loading: upload with m=0, then switch HNSW on
  • HNSW parameter tuning: experiment with different m and ef_construct values
  • Payload index impact: filtered query times with and without an index
  • Domain discovery: what works best for your content

Setup

Prerequisites

  • A Qdrant Cloud cluster (URL + API key)
  • Python 3.9+ (or Google Colab)
  • Packages: qdrant-client, sentence-transformers, numpy

Model

  • Embeddings: sentence-transformers/all-MiniLM-L6-v2 (384-dim)

Dataset

  • Reuse your Day 1 domain data, or prepare a dataset with 1,000+ items and a rich text field (e.g. description).
  • Include some numeric fields for filtering (e.g. length, word_count) so the impact of payload indexing can be measured.
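
If you don't have your Day 1 data handy, a synthetic stand-in is enough for benchmarking. A minimal sketch (the titles and description text are invented purely for illustration; `your_dataset` matches the variable name Step 3 expects):

```python
# Hypothetical stand-in dataset: 1,000 items with a text field, plus enough
# variation in length that the filtering step in Step 5 has something to do.
your_dataset = [
    {
        "title": f"Item {i}",
        "description": f"Sample description number {i} covering topic {i % 20} "
        + "in some detail. " * (i % 5 + 1),
    }
    for i in range(1000)
]

print(len(your_dataset))  # 1000
print(your_dataset[0]["description"][:40])
```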

Build Steps

Step 1: Extend Your Day 1 Project

Start from your Day 1 domain search engine, or create a new one with 1,000+ items:

from qdrant_client import QdrantClient, models
from sentence_transformers import SentenceTransformer
import time
import numpy as np
import os

client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))

# For Colab:
# from google.colab import userdata
# client = QdrantClient(url=userdata.get("QDRANT_URL"), api_key=userdata.get("QDRANT_API_KEY"))

encoder = SentenceTransformer("all-MiniLM-L6-v2")

Step 2: Create Multiple Test Collections

Test different HNSW configurations to find the one that fits best:

# Test configurations
configs = [
    {"name": "fast_initial_upload", "m": 0, "ef_construct": 100},  # m=0 = ingest-only
    {"name": "memory_optimized", "m": 8, "ef_construct": 100},  # m=8 = lower RAM
    {"name": "balanced", "m": 16, "ef_construct": 200},  # m=16 = balanced
    {"name": "high_quality", "m": 32, "ef_construct": 400},  # m=32 = higher recall, slower build
]

for config in configs:
    collection_name = f"my_domain_{config['name']}"
    if client.collection_exists(collection_name=collection_name):
        client.delete_collection(collection_name=collection_name)

    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(size=384, distance=models.Distance.COSINE),
        hnsw_config=models.HnswConfigDiff(
            m=config["m"],
            ef_construct=config["ef_construct"],
            full_scan_threshold=10,  # force HNSW instead of full scan
        ),
        optimizers_config=models.OptimizersConfigDiff(
            indexing_threshold=10
        ),  # Force indexing even on small sets for demo
    )
    print(f"Created collection: {collection_name}")

Step 3: Upload and Time

Measure upload performance for each configuration:

def upload_with_timing(collection_name, data, config_name):
    embeddings = encoder.encode([d["description"] for d in data], show_progress_bar=True).tolist()

    points = []
    for i, item in enumerate(data):
        embedding = embeddings[i]

        points.append(
            models.PointStruct(
                id=i,
                vector=embedding,
                payload={
                    **item,
                    "length": len(item["description"]),
                    "word_count": len(item["description"].split()),
                    "has_keywords": any(
                        keyword in item["description"].lower() for keyword in ["important", "key", "main"]
                    ),
                },
            )
        )

    # Warm up the connection before timing (the collection is still empty here,
    # so this returns nothing but primes the HTTP session)
    client.query_points(collection_name=collection_name, query=points[0].vector, limit=1)

    start_time = time.time()
    client.upload_points(collection_name=collection_name, points=points)
    upload_time = time.time() - start_time

    print(f"{config_name}: Uploaded {len(points)} points in {upload_time:.2f}s")
    return upload_time


# Load your dataset here. The larger the dataset, the more accurate the benchmark will be.
# your_dataset = [{"description": "This is a description of a product"}, ...]

# Upload to each collection
upload_times = {}
for config in configs:
    collection_name = f"my_domain_{config['name']}"
    upload_times[config["name"]] = upload_with_timing(collection_name, your_dataset, config["name"])


def wait_for_indexing(collection_name, timeout=60, poll_interval=1):
    print(f"Waiting for collection '{collection_name}' to be indexed...")
    start_time = time.time()

    while time.time() - start_time < timeout:
        info = client.get_collection(collection_name=collection_name)

        if info.indexed_vectors_count > 0 and info.status == models.CollectionStatus.GREEN:
            print(f"Success! Collection '{collection_name}' is indexed and ready.")
            print(f" - Status: {info.status.value}")
            print(f" - Indexed vectors: {info.indexed_vectors_count}")
            return

        print(f" - Status: {info.status.value}, Indexed vectors: {info.indexed_vectors_count}. Waiting...")
        time.sleep(poll_interval)

    info = client.get_collection(collection_name=collection_name)
    raise Exception(
        f"Timeout reached after {timeout} seconds. Collection '{collection_name}' is not ready. "
        f"Final status: {info.status.value}, Indexed vectors: {info.indexed_vectors_count}"
    )


for config in configs:
    if config["m"] > 0:  # m=0 has no HNSW to wait for
        collection_name = f"my_domain_{config['name']}"
        wait_for_indexing(collection_name)

Step 4: Benchmark Search Performance

Test search speed with different hnsw_ef values:

def benchmark_search(collection_name, query_embedding, ef_values=[64, 128, 256]):
    # Warmup
    client.query_points(collection_name=collection_name, query=query_embedding, limit=1)

    # hnsw_ef: higher = better recall, but slower. Tune per your latency goal.
    results = {}
    for hnsw_ef in ef_values:
        times = []

        # Run multiple queries for more reliable timing
        for _ in range(25):
            start_time = time.time()

            _ = client.query_points(
                collection_name=collection_name,
                query=query_embedding,
                limit=10,
                search_params=models.SearchParams(hnsw_ef=hnsw_ef),
                with_payload=False,
            )

            times.append((time.time() - start_time) * 1000)

        results[hnsw_ef] = {
            "avg_time": np.mean(times),
            "min_time": np.min(times),
            "max_time": np.max(times),
        }

    return results


test_query = "your test query"
query_embedding = encoder.encode(test_query).tolist()

performance_results = {}
for config in configs:
    if config["m"] > 0:  # Skip m=0 collections for search
        collection_name = f"my_domain_{config['name']}"
        performance_results[config["name"]] = benchmark_search(
            collection_name, query_embedding
        )

Step 5: Measure Payload Index Impact

Measure filtering performance with and without an index:

def test_filtering_performance(collection_name):
    query_embedding = encoder.encode("your filter test query").tolist()

    # Test filter without index
    filter_condition = models.Filter(
        must=[models.FieldCondition(key="length", range=models.Range(gte=10, lte=200))]
    )

    # Demo only: unindexed_filtering_retrieve=True forces a scan; turn it off right after measuring.
    client.update_collection(
        collection_name=collection_name,
        strict_mode_config=models.StrictModeConfig(unindexed_filtering_retrieve=True),
    )

    # Warmup
    client.query_points(collection_name=collection_name, query=query_embedding, limit=1)

    # Timing without payload index
    times = []
    for _ in range(25):
        start_time = time.time()
        _ = client.query_points(
            collection_name=collection_name,
            query=query_embedding,
            query_filter=filter_condition,
            limit=10,
            with_payload=False,
        )
        times.append((time.time() - start_time) * 1000)
    time_without_index = np.mean(times)

    # Create payload index
    client.create_payload_index(
        collection_name=collection_name,
        field_name="length",
        field_schema=models.PayloadSchemaType.INTEGER,
        wait=True,
    )

    # HNSW was already built; adding the payload index doesn’t rebuild it.
    # Bump ef_construct (+1) once to trigger a safe rebuild.
    base_ef = client.get_collection(
        collection_name=collection_name
    ).config.hnsw_config.ef_construct
    new_ef_construct = base_ef + 1

    client.update_collection(
        collection_name=collection_name,
        hnsw_config=models.HnswConfigDiff(ef_construct=new_ef_construct),
        strict_mode_config=models.StrictModeConfig(
            unindexed_filtering_retrieve=False
        ),  # Turn off scanning and use payload index instead.
    )

    wait_for_indexing(collection_name)

    # Warmup
    client.query_points(collection_name=collection_name, query=query_embedding, limit=1)

    # Timing with index
    times = []
    for _ in range(25):
        start_time = time.time()
        _ = client.query_points(
            collection_name=collection_name,
            query=query_embedding,
            query_filter=filter_condition,
            limit=10,
            with_payload=False,
        )
        times.append((time.time() - start_time) * 1000)
    time_with_index = np.mean(times)

    return {
        "without_index": time_without_index,
        "with_index": time_with_index,
        "speedup": time_without_index / time_with_index,
    }


# Test on your best performing collection
best_collection = "my_domain_balanced"  # Choose based on your results
filtering_results = test_filtering_performance(best_collection)

Step 6: Analyze Your Results

Create a summary of your findings:

print("=" * 60)
print("PERFORMANCE OPTIMIZATION RESULTS")
print("=" * 60)

print("\n1) Upload Performance:")
for config_name, time_taken in upload_times.items():
    print(f"   {config_name}: {time_taken:.2f}s")

print("\n2) Search Performance (hnsw_ef=128):")
for config_name, results in performance_results.items():
    if 128 in results:
        print(f"   {config_name}: {results[128]['avg_time']:.2f}ms")

print("\n3) Filtering Impact:")
print(f"   Without index: {filtering_results['without_index']:.2f}ms")
print(f"   With index: {filtering_results['with_index']:.2f}ms")
print(f"   Speedup: {filtering_results['speedup']:.1f}x")
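
One way to turn the printed numbers into a recommendation is a simple weighted score. Everything below is a sketch: the weights are arbitrary and the timing numbers are made-up placeholders; substitute the real values you collected in `upload_times` and `performance_results`:

```python
# Hypothetical scoring to pick a recommended config: lower is better.
# Placeholder numbers -- replace with your own measurements.
example_upload_times = {"memory_optimized": 12.0, "balanced": 20.0, "high_quality": 41.0}  # seconds
example_search_times = {"memory_optimized": 4.1, "balanced": 2.9, "high_quality": 2.7}  # ms @ hnsw_ef=128

SEARCH_WEIGHT = 0.8  # weight query latency heavily...
UPLOAD_WEIGHT = 0.2  # ...since upload is usually a one-time cost

scores = {
    name: SEARCH_WEIGHT * example_search_times[name] + UPLOAD_WEIGHT * example_upload_times[name]
    for name in example_search_times
}
best = min(scores, key=scores.get)
print(f"Recommended config: {best} (scores: {scores})")
```

Shift the weights toward upload time if you re-ingest frequently, or toward search latency for a read-heavy workload.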

Success Criteria

You'll know you've succeeded when:

  • You've tested multiple HNSW configurations with real timing data
  • You can explain which settings work best for your domain, and why
  • You've measured the concrete impact of payload indexing
  • You have clear recommendations for production deployment

Share Your Findings

Step 1: Reflect on Your Findings

  1. Which HNSW configuration (m, ef_construct) worked best for your domain?
  2. How did the trade-off between upload time and search speed play out?
  3. What impact did adding a payload index have?
  4. How do your results compare to the DBpedia demo?

Step 2: Post Your Results

Post your results in Discord using this template:

**[Day 2] HNSW Performance Benchmarking**

**High-Level Summary**
- **Domain:** "[your domain]"
- **Key Result:** "m=[..], ef_construct=[..], hnsw_ef=[..] gave [X] ms search and [Y] s upload (best balance)."

**Reproducibility**
- **Collections:** ...
- **Model:** sentence-transformers/all-MiniLM-L6-v2 (384-dim)
- **Dataset:** [N items] (snapshot: YYYY-MM-DD)

**Configuration Results**
| m  | ef_construct | Upload_s | Search_ms@ef=128 |
|----|--------------|----------|------------------|
| 0  | 100          | X.X      | —                |
| 8  | 100          | Y.Y      | A.A              |
| 16 | 200          | Z.Z      | B.B              |
| 32 | 400          | W.W      | C.C              |

**Filtering Impact**
- Payload index on `length`: **[speedup]×**  
  Without index: [T1] ms → With index: [T2] ms

**Recommendations**
- Best config for this domain: [m, ef_construct, hnsw_ef]
- When to pick another setting: [short guidance]
- Notes for production: [one line on indexing order / filters]

**Surprise**
- "[one unexpected finding]"

**Next Step**
- "[one concrete action you’ll try next]"

Optional: Go Further

  • Test parameters at a finer granularity

    • the effect of ef_construct on recall and build time
    • per-query hnsw_ef tuning based on query complexity
  • Track differences in memory usage (RAM vs. disk, payload indexes)

  • Add an accuracy metric: compare against a small set of labeled queries to see whether a higher m actually improves quality for your domain