Calendar Day 5

Project: Build a Recommendation System

Bring dense, sparse, and multivector search together in one atomic universal query. You'll retrieve candidates, fuse signals, rerank with ColBERT, and apply business filters, all in a single request.

Your Task

Build a complete recommendation system with Qdrant's Universal Query API, combining dense, sparse, and ColBERT multivectors in a single request.

Estimated time: 90 minutes

What You'll Build

A hybrid recommendation system using:

  • Multi-vector architecture with dense, sparse, and ColBERT vectors
  • Universal Query API for atomic multi-stage search
  • RRF fusion for combining candidates
  • ColBERT reranking for fine-grained relevance scoring
  • Business-rule filtering across multiple pipeline stages
  • Production-ready patterns for recommendation systems

Setup

Prerequisites

  • Qdrant Cloud cluster (URL + API key)
  • Python 3.9+ (or Google Colab)
  • Packages: qdrant-client, fastembed

Models

  • Dense: sentence-transformers/all-MiniLM-L6-v2 (384-dim)
  • Sparse: prithivida/Splade_PP_en_v1 (SPLADE)
  • Multivector: colbert-ir/colbertv2.0 (128-dim tokens)

Dataset

  • Scope: a small set of sample items (e.g., 10-20 movies).
  • Payload fields: title, description, category, genre, year, rating, user_segment, popularity_score, release_date
  • Filters used: category, user_segment, release_date, popularity_score

Build Steps

Step 1: Set Up the Hybrid Collection

Initialize the Client and Collection

First, connect to Qdrant and create a clean collection for our recommendation system.

from datetime import datetime
from qdrant_client import QdrantClient, models
import os

client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))

# For Colab:
# from google.colab import userdata
# client = QdrantClient(url=userdata.get("QDRANT_URL"), api_key=userdata.get("QDRANT_API_KEY"))

collection_name = "day5_recommendations_hybrid"

# Clean state
if client.collection_exists(collection_name=collection_name):
    client.delete_collection(collection_name=collection_name)

Now configure a collection with three vectors, each serving a distinct purpose in our recommendation pipeline.

client.create_collection(
    collection_name=collection_name,
    vectors_config={
        # Dense vectors for semantic understanding
        "dense": models.VectorParams(size=384, distance=models.Distance.COSINE),
        # ColBERT multivectors for fine-grained reranking
        "colbert": models.VectorParams(
            size=128,
            distance=models.Distance.COSINE,
            multivector_config=models.MultiVectorConfig(
                comparator=models.MultiVectorComparator.MAX_SIM
            ),
            hnsw_config=models.HnswConfigDiff(
                m=0  # Disable HNSW - used only for reranking
            ),
        ),
    },
    sparse_vectors_config={
        # Sparse vectors for exact keyword matching
        "sparse": models.SparseVectorParams(
            index=models.SparseIndexParams(on_disk=False)
        )
    },
)

Why this setup: ColBERT uses MAX_SIM for token-level comparison. Setting m=0 effectively disables the HNSW index for that vector, saving memory and compute, because ColBERT is used only for reranking, never for initial retrieval.
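
To make MAX_SIM concrete, here is a minimal sketch (using numpy and made-up token vectors, not real ColBERT output) of the late-interaction score it computes: for each query token, take the best similarity against any document token, then sum.

import numpy as np

# Hypothetical token embeddings: 4 query tokens, 6 document tokens, 128-dim each
query_tokens = np.random.rand(4, 128)
doc_tokens = np.random.rand(6, 128)

# Normalize rows so dot products equal cosine similarities
query_tokens /= np.linalg.norm(query_tokens, axis=1, keepdims=True)
doc_tokens /= np.linalg.norm(doc_tokens, axis=1, keepdims=True)

# MaxSim: best-matching document token per query token, summed over query tokens
similarity_matrix = query_tokens @ doc_tokens.T  # shape (4, 6)
max_sim_score = similarity_matrix.max(axis=1).sum()
print(f"MaxSim score: {max_sim_score:.3f}")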

Create Payload Indexes

Before ingesting data, create indexes for the fields we'll filter on. This enables efficient filtering during vector search.

# Business metadata indexes
client.create_payload_index(
    collection_name=collection_name,
    field_name="category",
    field_schema="keyword",
)
client.create_payload_index(
    collection_name=collection_name,
    field_name="user_segment",
    field_schema="keyword",
)

# Quality and recency indexes
client.create_payload_index(
    collection_name=collection_name,
    field_name="release_date",
    field_schema="datetime",
)
client.create_payload_index(
    collection_name=collection_name,
    field_name="popularity_score",
    field_schema="float",
)
client.create_payload_index(
    collection_name=collection_name,
    field_name="rating",
    field_schema="float",
)
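
Optionally, you can confirm the indexes were registered by inspecting the collection's payload schema:

# Sanity check: list the indexed payload fields and their types
info = client.get_collection(collection_name=collection_name)
for field, schema in info.payload_schema.items():
    print(field, "->", schema.data_type)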

Step 2: Prepare and Upload Recommendation Data

Create Sample Recommendation Data

Let's create sample movie data with business metadata for filtering.

# Example: Create sample movie/content data
sample_data = [
    {
        "title": "The Matrix",
        "description": "A computer hacker learns about the true nature of reality and his role in the war against its controllers.",
        "category": "movie",
        "genre": ["sci-fi", "action"],
        "year": 1999,
        "rating": 8.7,
        "user_segment": "premium",
        "popularity_score": 0.9,
        "release_date": "1999-03-31T00:00:00Z",
    },
    # Add more sample data...
]

texts = [it["description"] for it in sample_data]

Initialize the Embedding Models

We'll use FastEmbed to generate all three vector types.

from fastembed import TextEmbedding, SparseTextEmbedding, LateInteractionTextEmbedding

# Model configurations
DENSE_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"  # 384-dim
SPARSE_MODEL_ID = "prithivida/Splade_PP_en_v1"  # SPLADE sparse
COLBERT_MODEL_ID = "colbert-ir/colbertv2.0"  # 128-dim multivector

dense_model = TextEmbedding(DENSE_MODEL_ID)
sparse_model = SparseTextEmbedding(SPARSE_MODEL_ID)
colbert_model = LateInteractionTextEmbedding(COLBERT_MODEL_ID)

Generate Embeddings

Encode all descriptions with each of the three embedding models.

# Generate embeddings for all items
dense_embeds = list(
    dense_model.embed(texts, parallel=0)
)  # list[np.ndarray] shape (384,)

sparse_embeds = list(
    sparse_model.embed(texts, parallel=0)
)  # list[SparseEmbedding] with .indices/.values

colbert_multivectors = list(
    colbert_model.embed(texts, parallel=0)
)  # list[np.ndarray] shape (tokens, 128)
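
A quick shape check (optional) confirms the three representations match the collection config: 384-dim dense vectors, index/value pairs for sparse, and token-level 128-dim multivectors.

print("dense:", dense_embeds[0].shape)               # (384,)
print("sparse nnz:", len(sparse_embeds[0].indices))  # number of active dims
print("colbert:", colbert_multivectors[0].shape)     # (num_tokens, 128)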

Create Points and Upload

Now package everything into points and upload them to Qdrant.

# Generate vectors for each item
points = []
for i, item in enumerate(sample_data):
    # Create sparse vector (keyword matching)
    sparse_vector = sparse_embeds[i].as_object()

    # Create dense vector (semantic understanding)
    dense_vector = dense_embeds[i]

    # Create ColBERT multivector (token-level understanding)
    colbert_vector = colbert_multivectors[i]

    points.append(
        models.PointStruct(
            id=i,
            vector={
                "dense": dense_vector,
                "sparse": sparse_vector,
                "colbert": colbert_vector,
            },
            payload=item,
        )
    )

client.upload_points(collection_name=collection_name, points=points)
print(f"Uploaded {len(points)} recommendation items")

Step 3: One Universal Query (Retrieve → Fuse → Rerank → Filter)

Now let's build a complete multi-stage search that combines everything in one request.

Prepare the Query Embeddings

First, encode the user's search intent with all three embedding models.

from datetime import timedelta

# Example user intent
user_query = "premium user likes sci-fi action movies with strong hacker themes"

user_dense_vector = next(dense_model.query_embed(user_query))
user_sparse_vector = next(sparse_model.query_embed(user_query)).as_object()
user_multivector = next(colbert_model.query_embed(user_query))
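
Optionally, a quick peek at what the three encoders produced for this intent (just sizes; SPLADE activates only a small set of vocabulary dimensions):

print("dense dims:", user_dense_vector.shape[0])
print("active sparse dims:", len(user_sparse_vector.indices))
print("colbert query tokens x dims:", user_multivector.shape)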

Define a Global Filter with Automatic Propagation

Create a single filter that will automatically propagate to every stage of the pipeline.

# Global filter - this will be propagated to ALL prefetch stages
global_filter = models.Filter(
    must=[
        # Content type and user segment
        models.FieldCondition(
            key="category", match=models.MatchValue(value="movie")
        ),
        models.FieldCondition(
            key="user_segment", match=models.MatchValue(value="premium")
        ),
        # Quality and recency constraints
        models.FieldCondition(
            key="release_date",
            range=models.DatetimeRange(
                gte=(datetime.now() - timedelta(days=365 * 30)).isoformat()
            ),
        ),
        models.FieldCondition(key="popularity_score", range=models.Range(gte=0.7)),
    ]
)

Key insight: this filter propagates automatically to all prefetch stages. Qdrant does no "late filtering" or "post-filtering": filters are applied at the HNSW search level for maximum efficiency.

Set Up Prefetch and Fusion

Configure the parallel retrieval and the fusion strategy.

# Prefetch queries - global filter will be automatically applied to both
hybrid_query = [
    models.Prefetch(query=user_dense_vector, using="dense", limit=100),
    models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
]

# Fusion stage - combine candidates with RRF
fusion_query = models.Prefetch(
    prefetch=hybrid_query,
    query=models.FusionQuery(fusion=models.Fusion.RRF),
    limit=100,
)

These prefetch queries run in parallel, and the global filter from the main query propagates automatically to both the dense and sparse searches.

Execute the Universal Query with Reranking

Finally, send the complete query with ColBERT reranking.

# The Universal Query: Global filter propagates through all stages
response = client.query_points(
    collection_name=collection_name,
    prefetch=fusion_query,
    query=user_multivector,
    using="colbert",
    query_filter=global_filter,  # Propagates to all prefetch stages
    limit=10,
    with_payload=True,
)

for hit in response.points or []:
    print(hit.payload)

Why this works: dense and sparse retrieval happen in parallel (with filters applied), RRF fuses the results, ColBERT reranks with token-level precision, and the global filter is enforced at every stage through automatic propagation, all in one atomic API call.
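
To put numbers on the latency question in the reflection section below, a rough client-side timing sketch (wall-clock, so it includes network round-trips) might look like:

import time

start = time.perf_counter()
_ = client.query_points(
    collection_name=collection_name,
    prefetch=fusion_query,
    query=user_multivector,
    using="colbert",
    query_filter=global_filter,
    limit=10,
)
elapsed_ms = (time.perf_counter() - start) * 1000
print(f"End-to-end query latency: {elapsed_ms:.1f} ms")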

Step 4: Build a Recommendation Service

Now let's wrap everything into a production-ready function that handles dynamic user preferences.

Helper Function for Building Filters

First, create a reusable helper that builds a filter from a user profile and preferences.

def build_recommendation_filter(user_profile, user_preference=None):
    """
    Build a global filter from user profile and preferences.
    This filter will automatically propagate to all prefetch stages.

    Args:
        user_profile: {
            "liked_titles": list[str],      # optional
            "preferred_genres": list[str],  # e.g. ["sci-fi","action"]
            "segment": str,                 # e.g. "premium"
            "query": str                    # free-text intent, e.g. "smart sci-fi with hacker vibe"
        }
        user_preference: {
            "category": str | None,         # e.g. "movie"
            "min_rating": float | None,     # e.g. 8.0
            "released_within_days": int | None  # e.g. 365
        }

    Returns:
        models.Filter object or None if no conditions
    """
    from datetime import datetime, timedelta

    filter_conditions = []

    # User segment filtering
    if user_profile.get("segment"):
        filter_conditions.append(
            models.FieldCondition(
                key="user_segment",
                match=models.MatchValue(value=user_profile["segment"]),
            )
        )

    if user_preference:
        # Category filtering
        if user_preference.get("category"):
            filter_conditions.append(
                models.FieldCondition(
                    key="category",
                    match=models.MatchValue(value=user_preference["category"]),
                )
            )

        # Rating filtering
        if user_preference.get("min_rating") is not None:
            filter_conditions.append(
                models.FieldCondition(
                    key="rating",
                    range=models.Range(gte=user_preference["min_rating"])
                )
            )

        # Recency filtering
        if user_preference.get("released_within_days"):
            days = int(user_preference["released_within_days"])
            filter_conditions.append(
                models.FieldCondition(
                    key="release_date",
                    range=models.DatetimeRange(
                        gte=(datetime.utcnow() - timedelta(days=days)).isoformat()
                    ),
                )
            )

    return models.Filter(must=filter_conditions) if filter_conditions else None
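
A quick sanity check of the helper on its own:

# The helper returns None with no conditions, a models.Filter otherwise
print(build_recommendation_filter({}))  # None
print(build_recommendation_filter(
    {"segment": "premium"},
    {"category": "movie", "min_rating": 8.0},
))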

The Recommendation Function

Now create the main recommendation function using the helper.

def get_recommendations(user_profile, user_preference=None, limit=10):
    """
    Get personalized recommendations using Universal Query API

    Args:
        user_profile: {
            "liked_titles": list[str],      # optional
            "preferred_genres": list[str],  # e.g. ["sci-fi","action"]
            "segment": str,                 # e.g. "premium"
            "query": str                    # free-text intent, e.g. "smart sci-fi with hacker vibe"
        }
        user_preference: {
            "category": str | None,         # e.g. "movie"
            "min_rating": float | None,     # e.g. 8.0
            "released_within_days": int | None  # e.g. 365
        }
        limit: top-k to return
    """

    # Generate query embeddings
    user_dense_vector = next(dense_model.query_embed(user_profile["query"]))
    user_sparse_vector = next(
        sparse_model.query_embed(user_profile["query"])
    ).as_object()
    user_multivector = next(colbert_model.query_embed(user_profile["query"]))

    # Build global filter using helper function
    global_filter = build_recommendation_filter(user_profile, user_preference)

    # Prefetch queries - global filter will propagate automatically
    hybrid_query = [
        models.Prefetch(query=user_dense_vector, using="dense", limit=100),
        models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
    ]

    # Combine candidates with RRF
    fusion_query = models.Prefetch(
        prefetch=hybrid_query,
        query=models.FusionQuery(fusion=models.Fusion.RRF),
        limit=100,
    )

    # Universal query - global filter propagates to all stages
    response = client.query_points(
        collection_name=collection_name,
        prefetch=fusion_query,
        query=user_multivector,
        using="colbert",
        query_filter=global_filter,  # Propagates to all prefetch stages
        limit=limit,
        with_payload=True,
    )

    return [
        {
            "title": hit.payload["title"],
            "description": hit.payload["description"],
            "score": hit.score,
            "metadata": {
                k: v
                for k, v in hit.payload.items()
                if k not in ["title", "description"]
            },
        }
        for hit in (response.points or [])
    ]

Test the Service

Let's test the recommendation function.

# Test the recommendation service
user_profile = {
    "liked_titles": ["The Matrix", "Blade Runner"],
    "preferred_genres": ["sci-fi", "action"],
    "segment": "premium",
    "query": "highly rated cyberpunk movies",
}

recommendations = get_recommendations(
    user_profile,
    user_preference={
        "category": "movie",
        "min_rating": 8.0,
        "released_within_days": 365 * 30,
    },
    limit=10,
)

for i, rec in enumerate(recommendations, 1):
    print(f"{i}. {rec['title']} (Score: {rec['score']:.3f})")

What happened under the hood: Qdrant retrieved 100 candidates from the dense vectors and 100 from the sparse vectors in parallel, fused them with RRF, reranked with ColBERT's MaxSim over token-level vectors, applied the business filters, and returned the top 10, all in one call.

Success Criteria

You'll know you've succeeded when:

  • Your collection contains dense, sparse, and ColBERT vectors.
  • You can run a complex multi-stage search in a single API call.
  • RRF fusion effectively combines the different vector types.
  • ColBERT reranking improves result relevance.
  • Business filters propagate automatically to all prefetch stages.
  • Your recommendation service returns personalized, high-quality results.

Share Your Findings

Step 1: Reflect on Your Findings

  1. How did the Universal Query API simplify your recommendation pipeline (fewer calls, fewer connections, less code)?
  2. Which fusion strategy (RRF vs. DBSF) ranked better for your queries?
  3. How did ColBERT reranking affect recommendation quality (top-k changes, click-like signals, nDCG/MRR)? A minimal MRR sketch follows below.
  4. How did multi-stage filtering affect latency (prefetch, rerank, total)?
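
For question 3, if you have a small hand-labeled set of relevant item ids per query, a minimal MRR helper (a sketch, not part of the service above) could look like:

def mean_reciprocal_rank(ranked_ids_per_query, relevant_ids_per_query):
    """Average of 1/rank of the first relevant hit across queries."""
    reciprocal_ranks = []
    for ranked_ids, relevant_ids in zip(ranked_ids_per_query, relevant_ids_per_query):
        rr = 0.0
        for rank, item_id in enumerate(ranked_ids, start=1):
            if item_id in relevant_ids:
                rr = 1.0 / rank
                break
        reciprocal_ranks.append(rr)
    return sum(reciprocal_ranks) / len(reciprocal_ranks)

# Example: first relevant item at rank 2 -> MRR 0.5
print(mean_reciprocal_rank([[3, 0, 7]], [{0}]))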

Step 2: Post Your Results

Post your results in Discord using this template:

**[Day N] Recommendations with the Universal Query API**

**High-Level Summary**
- **Domain:** "I built recommendations for [domain]"
- **Key Result:** "Using [RRF/DBSF] + ColBERT rerank improved [metric] to [value] with total latency [Z] ms."

**Reproducibility**
- **Collection:** [name]
- **Models:** dense=[id, dim], sparse=[method], colbert=[id, dim]
- **Dataset:** [N items] (snapshot: YYYY-MM-DD)

**Settings (today)**
- **Fusion:** [RRF/DBSF], k_dense=[..], k_sparse=[..]
- **Reranker:** ColBERT (MaxSim), top-k=[..]
- **Filters:** category=[...], segment=[...], rating≥..., release_date≥...
- **Filter propagation:** applied to all prefetch stages

**Results (demo query: "[user intent]")**
- **Top picks (rank → title → score):**
  1) ...
  2) ...
  3) ...
- **Why these won:** [token hit like “hacker”, genre match, rating signal]
- **Latency:** prefetch ~[X] ms | rerank ~[Y] ms | total ~[Z] ms
- **Dropped by rules:** [ids/titles → which rule]

**Surprise**
- "[one thing you didn’t expect]"

**Next step**
- "[what you’ll try next]"

Optional: Go Further

Experiment with Fusion Strategies

Compare RRF with Distribution-Based Score Fusion (DBSF).

# Test DBSF vs RRF
# Filters and other prefetch params same as above

fusion_query = models.Prefetch(
    prefetch=hybrid_query,
    query=models.FusionQuery(fusion=models.Fusion.DBSF),
    limit=100,
)

response = client.query_points(
    collection_name=collection_name,
    prefetch=fusion_query,
    query=user_multivector,
    using="colbert",
    query_filter=global_filter,  # Same global filter propagates to all stages
    limit=10,
    with_payload=True,
)

for hit in response.points or []:
    print(hit.payload)
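
To compare the two rankings side by side, here is a small sketch that runs both fusion strategies (reusing hybrid_query, user_multivector, and global_filter from above) and reports how much their top 10 overlaps:

def run_fusion(fusion):
    fq = models.Prefetch(
        prefetch=hybrid_query,
        query=models.FusionQuery(fusion=fusion),
        limit=100,
    )
    res = client.query_points(
        collection_name=collection_name,
        prefetch=fq,
        query=user_multivector,
        using="colbert",
        query_filter=global_filter,
        limit=10,
    )
    return [p.id for p in res.points]

rrf_ids = run_fusion(models.Fusion.RRF)
dbsf_ids = run_fusion(models.Fusion.DBSF)
print(f"Top-10 overlap: {len(set(rrf_ids) & set(dbsf_ids))}/10")
print("RRF :", rrf_ids)
print("DBSF:", dbsf_ids)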

A/B Testing Framework

Build a framework to systematically compare fusion strategies across multiple user profiles.

Initialize the Test Function

Set up the function structure and result tracking.

def ab_test_fusion_strategies(user_profiles, user_preferences):
    """Compare RRF vs DBSF performance"""

    results = {"RRF": [], "DBSF": []}

    for user_profile, user_preference in zip(user_profiles, user_preferences):
        for fusion_type in [models.Fusion.RRF, models.Fusion.DBSF]:
            # Run recommendation query
            user_dense_vector = next(dense_model.query_embed(user_profile["query"]))
            user_sparse_vector = next(
                sparse_model.query_embed(user_profile["query"])
            ).as_object()
            user_multivector = next(colbert_model.query_embed(user_profile["query"]))

            # Build global filter using helper function
            global_filter = build_recommendation_filter(user_profile, user_preference)
            
            # Prefetch queries - global filter will propagate automatically
            hybrid_query = [
                models.Prefetch(query=user_dense_vector, using="dense", limit=100),
                models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
            ]

            # Combine candidates with chosen fusion strategy
            fusion_query = models.Prefetch(
                prefetch=hybrid_query,
                query=models.FusionQuery(fusion=fusion_type),
                limit=100,
            )

            response = client.query_points(
                collection_name=collection_name,
                prefetch=fusion_query,
                query=user_multivector,
                using="colbert",
                query_filter=global_filter,  # Propagates to all prefetch stages
                limit=10,
                with_payload=True,
            )

            strategy_name = "RRF" if fusion_type == models.Fusion.RRF else "DBSF"
            results[strategy_name].append(
                {
                    "user_id": user_profile["user_id"],
                    "recommendations": [hit.id for hit in response.points],
                    "scores": [hit.score for hit in response.points],
                }
            )

    return results
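
A minimal invocation sketch (the profiles here are hypothetical; note the function reads a "user_id" key from each profile):

profiles = [
    {"user_id": "u1", "segment": "premium", "query": "highly rated cyberpunk movies"},
    {"user_id": "u2", "segment": "premium", "query": "mind-bending sci-fi thrillers"},
]
preferences = [
    {"category": "movie", "min_rating": 8.0},
    {"category": "movie", "min_rating": 7.5},
]

ab_results = ab_test_fusion_strategies(profiles, preferences)
for strategy, runs in ab_results.items():
    print(strategy, "->", [run["recommendations"] for run in runs])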

Next Steps

Turn this into a mini-service: ingest your own items and user signals, write a small function that takes a profile vector and returns the top-k through the Universal Query API, then experiment with filters and fusion strategies (try DBSF vs. RRF) to see how the rankings change.