Day 4

Project: Quantization Performance Optimization

Apply quantization techniques to your domain search engine and measure their real impact on speed, memory, and accuracy. You will see how different quantization methods behave on your specific use case and learn to tune the accuracy-recovery pipeline.

Your Task

Turn the search engine from the previous days into a production-ready system by implementing quantization optimizations. You will test different quantization methods, measure their performance impact, and tune the oversampling + rescoring pipeline for the best results.

Estimated time: 120 minutes

What You Will Build

A quantization-optimized search system that demonstrates:

  • Performance comparison: metrics before and after quantization
  • Method evaluation: scalar and binary quantization tested on your data
  • Accuracy recovery: an oversampling + rescoring pipeline
  • Production deployment: a memory-optimized storage configuration

Prerequisites

  • Qdrant Cloud cluster (URL + API key)
  • Python 3.9+ (or Google Colab)
  • Packages: qdrant-client, numpy

Model

  • Use the same embedding model and dimensionality as your existing collection.

    • If your vectors are 1536-dimensional, keep size below as 1536.
    • Otherwise, change VectorParams(size=...) to your model's dimensionality (see the sketch below).
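
If you do not remember the dimensionality, you can read it from the existing collection; a minimal sketch, assuming a default (unnamed) vector configuration and the collection name used on the previous days:

import os
from qdrant_client import QdrantClient

client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))

# Read the vector size of the existing collection (default, unnamed vectors)
info = client.get_collection("your_domain_collection")
embedding_dim = info.config.params.vectors.size
print(f"Existing collection vector size: {embedding_dim}")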

Dataset

  • Reuse your Day 1/Day 2 domain dataset (ideally 1,000+ items), with the main text field used for embeddings.
  • Include at least one numeric field (e.g., length or word_count) to measure the impact of payload indexing (see the sketch below).
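
If that numeric field is not indexed yet, you can add a payload index on it; a minimal sketch, assuming a word_count integer field and the client/models imports from Step 1:

# Create a payload index on a numeric payload field (assumes "word_count" exists)
client.create_payload_index(
    collection_name="your_domain_collection",
    field_name="word_count",
    field_schema=models.PayloadSchemaType.INTEGER,
)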

Build Steps

Step 1: Baseline Measurement

Start by measuring your current system's performance without quantization:

import os
import time

import numpy as np
from qdrant_client import QdrantClient, models

client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))

# For Colab:
# from google.colab import userdata
# client = QdrantClient(url=userdata.get("QDRANT_URL"), api_key=userdata.get("QDRANT_API_KEY"))

def measure_search_performance(collection_name, test_queries, label="Baseline"):
    """Measure search performance across multiple queries"""
    latencies = []

    # Warm up caches with a few untimed queries before measuring
    for query in test_queries[:3]:
        client.query_points(
            collection_name=collection_name,
            query=query,
            limit=10,
        )
    
    for query in test_queries:
        start_time = time.time()
        
        response = client.query_points(
            collection_name=collection_name,
            query=query,
            limit=10
        )
        
        latency = (time.time() - start_time) * 1000
        latencies.append(latency)
    
    avg_latency = np.mean(latencies)
    p95_latency = np.percentile(latencies, 95)
    
    print(f"{label}:")
    print(f"  Average latency: {avg_latency:.2f}ms")
    print(f"  P95 latency: {p95_latency:.2f}ms")
    print(f"  Memory usage: Check Qdrant Cloud dashboard")
    
    return {"avg": avg_latency, "p95": p95_latency}

# Measure baseline performance
baseline_metrics = measure_search_performance(
    "your_domain_collection", 
    your_test_queries, 
    "Baseline (No Quantization)"
)
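
The snippets in this section assume your_test_queries is a list of query vectors in the same embedding space as your collection. A minimal sketch for building it, assuming a hypothetical embed(text) helper from Day 1/Day 2 that returns one vector per text:

# Hypothetical helper: embed(text) is whatever you used on Day 1/Day 2 to
# embed your documents; it must return a vector of your model's dimensionality.
sample_queries = [
    "a representative query from your domain",
    "another representative query from your domain",
    "a third representative query from your domain",
]
your_test_queries = [embed(text) for text in sample_queries]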

Step 2: Test Quantization Methods

Create collections with different quantization methods to compare their impact:

Note: for educational purposes, when creating multiple collections with different quantization configurations (e.g., original, binary quantization, scalar quantization, 2-bit binary quantization), keep an eye on your available resources. The original vectors are stored for every collection (on disk in this setup) in addition to their quantized versions.

# Test configurations
quantization_configs = {
    "scalar": {
        "config": models.ScalarQuantization(
            scalar=models.ScalarQuantizationConfig(
                type=models.ScalarType.INT8,
                quantile=0.99,
                always_ram=True,
            )
        ),
        "expected_speedup": "2x",
        "expected_compression": "4x"
    },
    "binary": {
        "config": models.BinaryQuantization(
            binary=models.BinaryQuantizationConfig(
                encoding=models.BinaryQuantizationEncoding.ONE_BIT,
                always_ram=True,
            )
        ),
        "expected_speedup": "40x",
        "expected_compression": "32x"
    },
    "binary_2bit": {
        "config": models.BinaryQuantization(
            binary=models.BinaryQuantizationConfig(
                encoding=models.BinaryQuantizationEncoding.TWO_BITS,
                always_ram=True,
            )
        ),
        "expected_speedup": "20x",
        "expected_compression": "16x"
    }
}

# Create quantized collections
for method_name, config_info in quantization_configs.items():
    collection_name = f"quantized_{method_name}"
    
    client.create_collection(
        collection_name=collection_name,
        vectors_config=models.VectorParams(
            size=1536,  # Adjust to your embedding size
            distance=models.Distance.COSINE,
            on_disk=True,  # Store originals on disk
        ),
        quantization_config=config_info["config"]
    )
    
    print(f"Created {method_name} quantized collection: {collection_name}")

Step 3: Upload Data and Measure Impact

Upload your dataset into each quantized collection and measure the performance differences:

def benchmark(collection_name, your_test_queries, method_name):
    """Measure quantized search performance"""
    
    # Test without oversampling/rescoring first
    no_rescoring_metrics = measure_search_performance(
        collection_name, 
        your_test_queries, 
        f"{method_name} (No Rescoring)"
    )
    
    # Test with oversampling and rescoring
    def search_with_rescoring(collection_name, query, oversampling_factor=3.0):
        start_time = time.time()
        
        response = client.query_points(
            collection_name=collection_name,
            query=query,
            limit=10,
            search_params=models.SearchParams(
                quantization=models.QuantizationSearchParams(
                    rescore=True,
                    oversampling=oversampling_factor,
                )
            ),
        )
        
        return (time.time() - start_time) * 1000, response
    
    # Measure with rescoring
    rescoring_latencies = []
    for query in your_test_queries:
        latency, response = search_with_rescoring(collection_name, query)
        rescoring_latencies.append(latency)
    
    avg_rescoring = np.mean(rescoring_latencies)
    p95_rescoring = np.percentile(rescoring_latencies, 95)
    
    print(f"{method_name} (With Rescoring):")
    print(f"  Average latency: {avg_rescoring:.2f}ms")
    print(f"  P95 latency: {p95_rescoring:.2f}ms")
    
    return {
        "no_rescoring": no_rescoring_metrics,
        "with_rescoring": {"avg": avg_rescoring, "p95": p95_rescoring}
    }


# Upload your data into each collection (same as for the basic unquantized collection on the previous days)
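
# Sketch: upload the same dataset into each quantized collection. Assumes
# `your_points` is a list of dicts with a precomputed "embedding" vector plus
# payload fields, as prepared on Day 1/Day 2 (names here are placeholders).
for method_name in quantization_configs.keys():
    client.upload_points(
        collection_name=f"quantized_{method_name}",
        points=[
            models.PointStruct(
                id=i,
                vector=item["embedding"],
                payload={k: v for k, v in item.items() if k != "embedding"},
            )
            for i, item in enumerate(your_points)
        ],
        batch_size=256,
    )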

# Test each quantization method
quantization_results = {}
for method_name in quantization_configs.keys():
    collection_name = f"quantized_{method_name}"
    quantization_results[method_name] = benchmark(
        collection_name, your_test_queries, method_name
    )

Step 4: Optimize the Oversampling Factor

Find the best oversampling factor for your best-performing quantization method, balancing latency against retained accuracy:

def measure_accuracy_retention(original_collection, quantized_collection, test_queries, factors=[2, 3, 5, 8, 10]):
    """Compare search results between original and quantized collections"""

    results = {}

    for factor in factors:
        accuracy_scores = []
        
        for query in test_queries:
            # Get baseline results
            baseline_results = client.query_points(
                collection_name=original_collection,
                query=query,
                limit=10
            )
            baseline_ids = [point.id for point in baseline_results.points]

            # Get quantized results with rescoring
            quantized_results = client.query_points(
                collection_name=quantized_collection,
                query=query,
                limit=10,
                search_params=models.SearchParams(
                    quantization=models.QuantizationSearchParams(
                        rescore=True,
                        oversampling=factor,
                    )
                ),
            )
            quantized_ids = [point.id for point in quantized_results.points]
            
            # Calculate overlap (simple accuracy measure)
            overlap = len(set(baseline_ids) & set(quantized_ids))
            accuracy = overlap / len(baseline_ids)
            accuracy_scores.append(accuracy)
        
        results[factor] = {
            "avg_accuracy": np.mean(accuracy_scores)
        }
    
    return results


def tune_oversampling(collection_name, test_queries, factors=[2, 3, 5, 8, 10]):
    """Find optimal oversampling factor"""
    results = {}
    
    for factor in factors:
        latencies = []
        
        for query in test_queries:
            start_time = time.time()
            
            response = client.query_points(
                collection_name=collection_name,
                query=query,
                limit=10,
                search_params=models.SearchParams(
                    quantization=models.QuantizationSearchParams(
                        rescore=True,
                        oversampling=factor,
                    )
                ),
            )
            
            latencies.append((time.time() - start_time) * 1000)
        
        results[factor] = {
            "avg_latency": np.mean(latencies),
            "p95_latency": np.percentile(latencies, 95)
        }
    
    return results

# Tune oversampling for your method of choice
best_method = "binary"  # Choose based on your results
oversampling_factors = [2, 3, 5, 8, 10]

oversampling_results_latency = tune_oversampling(
    f"quantized_{best_method}", 
    your_test_queries,
    oversampling_factors
)

oversampling_results_accuracy = measure_accuracy_retention(
    "your_domain_collection",
    f"quantized_{best_method}", 
    your_test_queries,
    oversampling_factors
)


print("Oversampling Factor Optimization:")
for factor in oversampling_factors:
    print(f"  {factor}x:")
    print(f"  {oversampling_results_latency[factor]['avg_latency']:.2f}ms avg latency, {oversampling_results_latency[factor]['p95_latency']:.2f}ms P95 latency")
    print(f"  {oversampling_results_accuracy[factor]['avg_accuracy']:.2f} avg accuracy retention")

Step 5: Analyze Your Results

Run a comprehensive analysis of your quantization experiments:

print("=" * 60)
print("QUANTIZATION PERFORMANCE ANALYSIS")
print("=" * 60)

print(f"\nBaseline Performance:")
print(f"  Average latency: {baseline_metrics['avg']:.2f}ms")
print(f"  P95 latency: {baseline_metrics['p95']:.2f}ms")

print(f"\nQuantization Results:")
for method, results in quantization_results.items():
    no_rescoring = results['no_rescoring']
    with_rescoring = results['with_rescoring']
    
    speedup_no_rescoring = baseline_metrics['avg'] / no_rescoring['avg']
    speedup_with_rescoring = baseline_metrics['avg'] / with_rescoring['avg']
    
    print(f"\n{method.upper()}:")
    print(f"  Without rescoring: {no_rescoring['avg']:.2f}ms ({speedup_no_rescoring:.1f}x speedup)")
    print(f"  With rescoring: {with_rescoring['avg']:.2f}ms ({speedup_with_rescoring:.1f}x speedup)")

Success Criteria

You will know you have succeeded when:

You have achieved a measurable improvement in search speed
You have maintained acceptable accuracy through oversampling tuning
You have demonstrated significant hot-memory (RAM) savings via the on_disk configuration
You can make informed quantization recommendations for your domain

Share Your Findings

Step 1: Reflect on Your Findings

  1. Which quantization method gave the best balance of speed and accuracy?
  2. How did the oversampling factor change latency and accuracy?
  3. What are the real memory and cost implications?
  4. How do your results compare with the reference maxima (≈40x speedup, ≈32x compression)?

Step 2: Post Your Results

Post your results in Discord using this template:

**[Day 4] Quantization Performance Optimization**

**High-Level Summary**
- **Domain:** "I optimized [your domain] search with quantization"
- **Key Result:** "Best was [Scalar/Binary/(2-bit Binary)] with oversampling [x]× → [Z]× faster, [A]% accuracy retained."

**Reproducibility**
- **Collections:** day4_baseline_collection, day4_quantized_scalar, day4_quantized_binary (and/or day4_quantized_2bit)
- **Model:** [name, dim]
- **Dataset:** [N items] (snapshot: YYYY-MM-DD)
- **Search settings:** hnsw_ef=[..] (if used)

**Results**
- **Baseline latency:** [X] ms
- **Quantized latency (rescoring on):** [Y] ms
- **Oversampling:** [factor]×
- **Accuracy retention:** [..]%
- **Memory:** [before GB] → [after GB] (**[compression]×**)
- **(Optional) Cost:** ~$[before]/mo → ~$[after]/mo, save ~$[delta]/mo

**Method Notes**
- **Scalar (INT8):** [one line]
- **Binary (1-bit / 2-bit):** [one line]

**Surprise**
- "[most unexpected finding]"

**Next step**
- "[one concrete action for tomorrow]"

Optional: Go Further

Dynamic Oversampling

Implement adaptive oversampling based on query characteristics:

def adaptive_oversampling(query, base_factor=3.0):
    """Adjust oversampling based on query complexity"""
    # Simple heuristic: longer queries may need more oversampling (adapt to your domain/use case)
    query_length = len(query) if isinstance(query, str) else len([x for x in query if x != 0])
    
    if query_length > 1000:  # Complex query
        return base_factor * 1.5
    elif query_length < 100:  # Simple query
        return base_factor * 0.8
    else:
        return base_factor

# Test adaptive oversampling vs fixed oversampling
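
# Sketch: run the same queries with a per-query adaptive factor and compare the
# latencies against the fixed-factor results from Step 4 (reuses names defined above).
adaptive_latencies = []
for query in your_test_queries:
    factor = adaptive_oversampling(query)
    start_time = time.time()
    client.query_points(
        collection_name=f"quantized_{best_method}",
        query=query,
        limit=10,
        search_params=models.SearchParams(
            quantization=models.QuantizationSearchParams(
                rescore=True,
                oversampling=factor,
            )
        ),
    )
    adaptive_latencies.append((time.time() - start_time) * 1000)

print(f"Adaptive oversampling avg latency: {np.mean(adaptive_latencies):.2f}ms")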

Cost-Performance Analysis

Calculate the real cost impact of quantization:

def calculate_cost_savings(baseline_memory_gb, compression_ratio, ram_cost_per_gb_monthly=10):
    """Calculate monthly cost savings from quantization"""
    quantized_memory_gb = baseline_memory_gb / compression_ratio
    monthly_savings = (baseline_memory_gb - quantized_memory_gb) * ram_cost_per_gb_monthly
    
    return {
        "baseline_cost": baseline_memory_gb * ram_cost_per_gb_monthly,
        "quantized_cost": quantized_memory_gb * ram_cost_per_gb_monthly,
        "monthly_savings": monthly_savings,
        "annual_savings": monthly_savings * 12
    }

# Calculate cost impact for your deployment
cost_analysis = calculate_cost_savings(
    baseline_memory_gb=10,  # Your baseline memory usage
    compression_ratio=32,   # Your best quantization compression
)

print(f"Annual cost savings: ${cost_analysis['annual_savings']:.2f}")

Memory Usage Monitoring

Track how actual memory usage changes:

# Monitor collection memory usage
collection_info = client.get_collection("quantized_binary")
print(f"Vectors count: {collection_info.points_count}")
print(f"Memory usage: Check Qdrant Cloud metrics")

# Compare RAM usage with and without on_disk configuration
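
# Sketch: create an in-RAM variant of the binary-quantized collection so you can
# compare its RAM footprint against the on_disk version in the Cloud dashboard
# (adjust size to your embedding dimensionality; the collection name is a placeholder).
client.create_collection(
    collection_name="quantized_binary_ram_originals",
    vectors_config=models.VectorParams(
        size=1536,
        distance=models.Distance.COSINE,
        on_disk=False,  # keep original vectors in RAM
    ),
    quantization_config=models.BinaryQuantization(
        binary=models.BinaryQuantizationConfig(always_ram=True)
    ),
)
# Re-upload the same data, then compare RAM usage of the two collections.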