Project: Build a Recommendation System
Bring dense, sparse, and multivectors together in one atomic universal query. You'll retrieve candidates, fuse signals, rerank with ColBERT, and apply business filters, all in a single request.
Your Task
Build a complete recommendation system using Qdrant's Universal Query API, combining dense, sparse, and ColBERT multivectors in a single request.
Estimated time: 90 minutes
What You'll Build
A hybrid recommendation system featuring:
- A multi-vector architecture with dense, sparse, and ColBERT vectors
- The Universal Query API for atomic multi-stage search
- RRF fusion for combining candidates
- ColBERT reranking for fine-grained relevance scoring
- Business-rule filtering at multiple pipeline stages
- Production-ready patterns for recommendation systems
Setup
Prerequisites
- A Qdrant Cloud cluster (URL + API key)
- Python 3.9+ (or Google Colab)
- Packages: qdrant-client, fastembed
Models
- Dense: sentence-transformers/all-MiniLM-L6-v2 (384-dim)
- Sparse: prithivida/Splade_PP_en_v1 (SPLADE)
- Multivector: colbert-ir/colbertv2.0 (128-dim tokens)
Dataset
- Scope: a small set of sample items (e.g., 10-20 movies).
- Payload fields: title, description, category, genre, year, rating, user_segment, popularity_score, release_date.
- Filters used: category, user_segment, release_date, popularity_score.
Build Steps
Step 1: Set Up the Hybrid Collection
Initialize the Client and Collection
First, connect to Qdrant and create a clean collection for our recommendation system.
from datetime import datetime
from qdrant_client import QdrantClient, models
import os
client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))
# For Colab:
# from google.colab import userdata
# client = QdrantClient(url=userdata.get("QDRANT_URL"), api_key=userdata.get("QDRANT_API_KEY"))
collection_name = "day5_recommendations_hybrid"
# Clean state
if client.collection_exists(collection_name=collection_name):
client.delete_collection(collection_name=collection_name)
Now configure a collection with three vector types, each serving a distinct purpose in our recommendation pipeline.
client.create_collection(
collection_name=collection_name,
vectors_config={
# Dense vectors for semantic understanding
"dense": models.VectorParams(size=384, distance=models.Distance.COSINE),
# ColBERT multivectors for fine-grained reranking
"colbert": models.VectorParams(
size=128,
distance=models.Distance.COSINE,
multivector_config=models.MultiVectorConfig(
comparator=models.MultiVectorComparator.MAX_SIM
),
hnsw_config=models.HnswConfigDiff(
m=0 # Disable HNSW - used only for reranking
),
),
},
sparse_vectors_config={
# Sparse vectors for exact keyword matching
"sparse": models.SparseVectorParams(
index=models.SparseIndexParams(on_disk=False)
)
},
)
Why this setup: ColBERT uses MAX_SIM for token-level comparison, and we set m=0 because these vectors are used only for reranking, never for initial retrieval; m=0 effectively disables the HNSW index, saving memory and compute.
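For intuition, here is a minimal NumPy sketch of the MaxSim comparison that MAX_SIM performs (illustrative only; Qdrant computes this server-side):
import numpy as np

def max_sim(query_tokens: np.ndarray, doc_tokens: np.ndarray) -> float:
    """For each query token, take its best-matching doc token, then sum.
    Assumes rows are L2-normalized, so dot products are cosine similarities."""
    sim = query_tokens @ doc_tokens.T  # shape: (num_query_tokens, num_doc_tokens)
    return float(sim.max(axis=1).sum())

# Toy example: 3 query tokens vs. 5 document tokens, 128-dim each
q = np.random.randn(3, 128)
q /= np.linalg.norm(q, axis=1, keepdims=True)
d = np.random.randn(5, 128)
d /= np.linalg.norm(d, axis=1, keepdims=True)
print(max_sim(q, d))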
Create Payload Indexes
Before ingesting data, create indexes for the fields we'll filter on. This enables efficient filtering during vector search.
# Business metadata indexes
client.create_payload_index(
collection_name=collection_name,
field_name="category",
field_schema="keyword",
)
client.create_payload_index(
collection_name=collection_name,
field_name="user_segment",
field_schema="keyword",
)
# Quality and recency indexes
client.create_payload_index(
collection_name=collection_name,
field_name="release_date",
field_schema="datetime",
)
client.create_payload_index(
collection_name=collection_name,
field_name="popularity_score",
field_schema="float",
)
client.create_payload_index(
collection_name=collection_name,
field_name="rating",
field_schema="float",
)
Step 2: Prepare and Upload Recommendation Data
Create Sample Recommendation Data
Let's create sample movie data with business metadata for filtering.
# Example: Create sample movie/content data
sample_data = [
{
"title": "The Matrix",
"description": "A computer hacker learns about the true nature of reality and his role in the war against its controllers.",
"category": "movie",
"genre": ["sci-fi", "action"],
"year": 1999,
"rating": 8.7,
"user_segment": "premium",
"popularity_score": 0.9,
"release_date": "1999-03-31T00:00:00Z",
},
# Add more sample data...
]
texts = [it["description"] for it in sample_data]
Initialize the Embedding Models
We'll use FastEmbed to generate all three vector types.
from fastembed import TextEmbedding, SparseTextEmbedding, LateInteractionTextEmbedding
# Model configurations
DENSE_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2" # 384-dim
SPARSE_MODEL_ID = "prithivida/Splade_PP_en_v1" # SPLADE sparse
COLBERT_MODEL_ID = "colbert-ir/colbertv2.0" # 128-dim multivector
dense_model = TextEmbedding(DENSE_MODEL_ID)
sparse_model = SparseTextEmbedding(SPARSE_MODEL_ID)
colbert_model = LateInteractionTextEmbedding(COLBERT_MODEL_ID)
Generate the Embeddings
Encode every description with each of the three embedding models.
# Generate embeddings for all items
dense_embeds = list(
dense_model.embed(texts, parallel=0)
) # list[np.ndarray] shape (384,)
sparse_embeds = list(
sparse_model.embed(texts, parallel=0)
) # list[SparseEmbedding] with .indices/.values
colbert_multivectors = list(
colbert_model.embed(texts, parallel=0)
) # list[np.ndarray] shape (tokens, 128)
Create Points and Upload
Now package everything into points and upload them to Qdrant.
# Generate vectors for each item
points = []
for i, item in enumerate(sample_data):
# Create sparse vector (keyword matching)
sparse_vector = sparse_embeds[i].as_object()
# Create dense vector (semantic understanding)
dense_vector = dense_embeds[i]
# Create ColBERT multivector (token-level understanding)
colbert_vector = colbert_multivectors[i]
points.append(
models.PointStruct(
id=i,
vector={
"dense": dense_vector,
"sparse": sparse_vector,
"colbert": colbert_vector,
},
payload=item,
)
)
client.upload_points(collection_name=collection_name, points=points)
print(f"Uploaded {len(points)} recommendation items")
Step 3: One Universal Query (Retrieve → Fuse → Rerank → Filter)
Now let's build a complete multi-stage search that combines everything in a single request.
Prepare the Query Embeddings
First, encode the user's search intent with all three embedding models.
from datetime import timedelta
# Example user intent
user_query = "premium user likes sci-fi action movies with strong hacker themes"
user_dense_vector = next(dense_model.query_embed(user_query))
user_sparse_vector = next(sparse_model.query_embed(user_query)).as_object()
user_multivector = next(colbert_model.query_embed(user_query))
Define a Global Filter with Automatic Propagation
Create a single filter that will propagate automatically to every stage of the pipeline.
# Global filter - this will be propagated to ALL prefetch stages
global_filter = models.Filter(
must=[
# Content type and user segment
models.FieldCondition(
key="category", match=models.MatchValue(value="movie")
),
models.FieldCondition(
key="user_segment", match=models.MatchValue(value="premium")
),
# Quality and recency constraints
models.FieldCondition(
key="release_date",
range=models.DatetimeRange(
gte=(datetime.now() - timedelta(days=365 * 30)).isoformat()
),
),
models.FieldCondition(key="popularity_score", range=models.Range(gte=0.7)),
]
)
Key insight: this filter propagates automatically to all prefetch stages. Qdrant does not do "late filtering" or "post-filtering"; filters are applied at the HNSW search level for maximum efficiency.
Set Up Prefetch and Fusion
Configure the parallel retrieval and the fusion strategy.
# Prefetch queries - global filter will be automatically applied to both
hybrid_query = [
models.Prefetch(query=user_dense_vector, using="dense", limit=100),
models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
]
# Fusion stage - combine candidates with RRF
fusion_query = models.Prefetch(
prefetch=hybrid_query,
query=models.FusionQuery(fusion=models.Fusion.RRF),
limit=100,
)
These prefetch queries run in parallel, and the global filter on the main query propagates automatically to both the dense and sparse searches.
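For intuition, here is a minimal standalone sketch of Reciprocal Rank Fusion over two ranked ID lists (k=60 is the commonly cited default; Qdrant performs the actual fusion server-side, so this is illustrative only):
from collections import defaultdict

def rrf_fuse(ranked_lists, k=60):
    """RRF: score(id) = sum over lists of 1 / (k + rank)."""
    scores = defaultdict(float)
    for ranking in ranked_lists:
        for rank, point_id in enumerate(ranking, start=1):
            scores[point_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Toy example: dense and sparse retrieval return different orderings
print(rrf_fuse([[3, 1, 7, 2], [7, 3, 5, 1]]))  # IDs ranked by fused score
An item ranked moderately well in both lists (like 3 or 7 here) beats an item ranked high in only one, which is why RRF is robust even when the two retrievers' score scales are incomparable.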
Execute the Universal Query with Reranking
Finally, send the complete query with ColBERT reranking.
# The Universal Query: Global filter propagates through all stages
response = client.query_points(
collection_name=collection_name,
prefetch=fusion_query,
query=user_multivector,
using="colbert",
query_filter=global_filter, # Propagates to all prefetch stages
limit=10,
with_payload=True,
)
for hit in response.points or []:
print(hit.payload)
Why this works: dense and sparse retrieval happen in parallel (with the filter applied), RRF fuses the results, ColBERT reranks with token-level precision, and the global filter reaches every stage through automatic propagation, all in one atomic API call.
Step 4: Build the Recommendation Service
Now let's wrap everything in a production-ready function that handles dynamic user preferences.
A Helper for Building Filters
First, create a reusable helper that builds a filter from a user profile and preferences.
def build_recommendation_filter(user_profile, user_preference=None):
"""
Build a global filter from user profile and preferences.
This filter will automatically propagate to all prefetch stages.
Args:
user_profile: {
"liked_titles": list[str], # optional
"preferred_genres": list[str], # e.g. ["sci-fi","action"]
"segment": str, # e.g. "premium"
"query": str # free-text intent, e.g. "smart sci-fi with hacker vibe"
}
user_preference: {
"category": str | None, # e.g. "movie"
"min_rating": float | None, # e.g. 8.0
"released_within_days": int | None # e.g. 365
}
Returns:
models.Filter object or None if no conditions
"""
from datetime import datetime, timedelta
filter_conditions = []
# User segment filtering
if user_profile.get("segment"):
filter_conditions.append(
models.FieldCondition(
key="user_segment",
match=models.MatchValue(value=user_profile["segment"]),
)
)
if user_preference:
# Category filtering
if user_preference.get("category"):
filter_conditions.append(
models.FieldCondition(
key="category",
match=models.MatchValue(value=user_preference["category"]),
)
)
# Rating filtering
if user_preference.get("min_rating") is not None:
filter_conditions.append(
models.FieldCondition(
key="rating",
range=models.Range(gte=user_preference["min_rating"])
)
)
# Recency filtering
if user_preference.get("released_within_days"):
days = int(user_preference["released_within_days"])
filter_conditions.append(
models.FieldCondition(
key="release_date",
range=models.DatetimeRange(
gte=(datetime.utcnow() - timedelta(days=days)).isoformat()
),
)
)
return models.Filter(must=filter_conditions) if filter_conditions else None
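A quick standalone check of the helper (the profile and preference values here are hypothetical):
# Hypothetical inputs, just to inspect the generated filter
demo_filter = build_recommendation_filter(
    {"segment": "premium", "query": "sci-fi"},
    {"category": "movie", "min_rating": 8.0, "released_within_days": 365 * 30},
)
print(demo_filter)  # a models.Filter with four must-conditions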
The Recommendation Function
Now create the main recommendation function using the helper.
def get_recommendations(user_profile, user_preference=None, limit=10):
"""
Get personalized recommendations using Universal Query API
Args:
user_profile: {
"liked_titles": list[str], # optional
"preferred_genres": list[str], # e.g. ["sci-fi","action"]
"segment": str, # e.g. "premium"
"query": str # free-text intent, e.g. "smart sci-fi with hacker vibe"
}
user_preference: {
"category": str | None, # e.g. "movie"
"min_rating": float | None, # e.g. 8.0
"released_within_days": int | None # e.g. 365
}
limit: top-k to return
"""
# Generate query embeddings
user_dense_vector = next(dense_model.query_embed(user_profile["query"]))
user_sparse_vector = next(
sparse_model.query_embed(user_profile["query"])
).as_object()
user_multivector = next(colbert_model.query_embed(user_profile["query"]))
# Build global filter using helper function
global_filter = build_recommendation_filter(user_profile, user_preference)
# Prefetch queries - global filter will propagate automatically
hybrid_query = [
models.Prefetch(query=user_dense_vector, using="dense", limit=100),
models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
]
# Combine candidates with RRF
fusion_query = models.Prefetch(
prefetch=hybrid_query,
query=models.FusionQuery(fusion=models.Fusion.RRF),
limit=100,
)
# Universal query - global filter propagates to all stages
response = client.query_points(
collection_name=collection_name,
prefetch=fusion_query,
query=user_multivector,
using="colbert",
query_filter=global_filter, # Propagates to all prefetch stages
limit=limit,
with_payload=True,
)
return [
{
"title": hit.payload["title"],
"description": hit.payload["description"],
"score": hit.score,
"metadata": {
k: v
for k, v in hit.payload.items()
if k not in ["title", "description"]
},
}
for hit in (response.points or [])
]
Test the Service
Let's test the recommendation function.
# Test the recommendation service
user_profile = {
"liked_titles": ["The Matrix", "Blade Runner"],
"preferred_genres": ["sci-fi", "action"],
"segment": "premium",
"query": "highly rated cyberpunk movies",
}
recommendations = get_recommendations(
user_profile,
user_preference={
"category": "movie",
"min_rating": 8.0,
"released_within_days": 365 * 30,
},
limit=10,
)
for i, rec in enumerate(recommendations, 1):
print(f"{i}. {rec['title']} (Score: {rec['score']:.3f})")
What happens behind the scenes: Qdrant retrieves 100 candidates from dense and 100 from sparse in parallel, fuses them with RRF, reranks with ColBERT's MaxSim over token-level subvectors, applies the final business filter, and returns the top 10, all in a single call.
Success Criteria
You'll know you've succeeded when:
- Your collection contains dense, sparse, and ColBERT vectors (see the quick check after this list).
- You can execute a complex multi-stage search in a single API call.
- RRF fusion effectively combines the different vector types.
- ColBERT reranking improves result relevance.
- Business filters propagate automatically to all prefetch stages.
- Your recommendation service delivers personalized, high-quality results.
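To verify the first criterion programmatically, inspect the collection config (field names follow qdrant-client's CollectionInfo model):
info = client.get_collection(collection_name=collection_name)
print(info.config.params.vectors.keys())         # expect: dense, colbert
print(info.config.params.sparse_vectors.keys())  # expect: sparse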
Share Your Findings
Step 1: Reflect on What You Found
- How did the Universal Query API simplify your recommendation pipeline (fewer calls, fewer connections, less code)?
- Which fusion strategy (RRF vs. DBSF) ranked better for your queries?
- How did ColBERT reranking affect recommendation quality (top-k changes, click-like signals, nDCG/MRR)?
- How did multi-stage filtering affect latency (prefetch, rerank, total)? See the timing sketch below.
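One rough way to answer the latency question is to time the fusion-only query against the full rerank pipeline, reusing hybrid_query, fusion_query, global_filter, and user_multivector from Step 3 (this is a client-side measurement that includes network overhead, so treat the numbers as relative, not absolute):
import time

def timed_ms(fn):
    t0 = time.perf_counter()
    result = fn()
    return result, (time.perf_counter() - t0) * 1000

# Fusion only: run the hybrid prefetch + RRF as the top-level query
_, fusion_ms = timed_ms(lambda: client.query_points(
    collection_name=collection_name,
    prefetch=hybrid_query,
    query=models.FusionQuery(fusion=models.Fusion.RRF),
    query_filter=global_filter,
    limit=10,
))
# Full pipeline: fusion prefetch + ColBERT rerank
_, total_ms = timed_ms(lambda: client.query_points(
    collection_name=collection_name,
    prefetch=fusion_query,
    query=user_multivector,
    using="colbert",
    query_filter=global_filter,
    limit=10,
))
print(f"fusion only ~{fusion_ms:.0f} ms | fusion + rerank ~{total_ms:.0f} ms")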
Step 2: Post Your Results
**[Day N] Recommendations with the Universal Query API**
**High-Level Summary**
- **Domain:** "I built recommendations for [domain]"
- **Key Result:** "Using [RRF/DBSF] + ColBERT rerank improved [metric] to [value] with total latency [Z] ms."
**Reproducibility**
- **Collection:** [name]
- **Models:** dense=[id, dim], sparse=[method], colbert=[id, dim]
- **Dataset:** [N items] (snapshot: YYYY-MM-DD)
**Settings (today)**
- **Fusion:** [RRF/DBSF], k_dense=[..], k_sparse=[..]
- **Reranker:** ColBERT (MaxSim), top-k=[..]
- **Filters:** category=[...], segment=[...], rating≥..., release_date≥...
- **Filter propagation:** applied to all prefetch stages
**Results (demo query: "[user intent]")**
- **Top picks (rank → title → score):**
1) ...
2) ...
3) ...
- **Why these won:** [token hit like “hacker”, genre match, rating signal]
- **Latency:** prefetch ~[X] ms | rerank ~[Y] ms | total ~[Z] ms
- **Dropped by rules:** [ids/titles → which rule]
**Surprise**
- "[one thing you didn’t expect]"
**Next step**
- "[what you’ll try next]"
Optional: Go Further
Experiment with Fusion Strategies
Compare RRF with Distribution-Based Score Fusion (DBSF):
# Test DBSF vs RRF
# Filters and other prefetch params same as above
fusion_query = models.Prefetch(
prefetch=hybrid_query,
query=models.FusionQuery(fusion=models.Fusion.DBSF),
limit=100,
)
response = client.query_points(
collection_name=collection_name,
prefetch=fusion_query,
query=user_multivector,
using="colbert",
query_filter=global_filter, # Same global filter propagates to all stages
limit=10,
with_payload=True,
)
for hit in response.points or []:
print(hit.payload)
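To quantify how much the two strategies disagree, compare their top-10 lists (this assumes you saved the earlier RRF result as rrf_response before rerunning with DBSF; the variable name is ours, not part of the example above):
# Client-side comparison of the two fusion strategies
rrf_ids = [hit.id for hit in rrf_response.points]  # saved RRF result (assumption)
dbsf_ids = [hit.id for hit in response.points]     # DBSF result from above
overlap = set(rrf_ids) & set(dbsf_ids)
print(f"Top-10 overlap: {len(overlap)}/10")
print(f"Only in RRF: {set(rrf_ids) - set(dbsf_ids)}")
print(f"Only in DBSF: {set(dbsf_ids) - set(rrf_ids)}")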
A/B Testing Framework
Build a framework to systematically compare fusion strategies across multiple user profiles.
Initialize the Test Function
Set up the function structure and result tracking.
def ab_test_fusion_strategies(user_profiles, user_preferences):
"""Compare RRF vs DBSF performance"""
results = {"RRF": [], "DBSF": []}
for user_profile, user_preference in zip(user_profiles, user_preferences):
for fusion_type in [models.Fusion.RRF, models.Fusion.DBSF]:
# Run recommendation query
user_dense_vector = next(dense_model.query_embed(user_profile["query"]))
user_sparse_vector = next(
sparse_model.query_embed(user_profile["query"])
).as_object()
user_multivector = next(colbert_model.query_embed(user_profile["query"]))
# Build global filter using helper function
global_filter = build_recommendation_filter(user_profile, user_preference)
# Prefetch queries - global filter will propagate automatically
hybrid_query = [
models.Prefetch(query=user_dense_vector, using="dense", limit=100),
models.Prefetch(query=user_sparse_vector, using="sparse", limit=100),
]
# Combine candidates with chosen fusion strategy
fusion_query = models.Prefetch(
prefetch=hybrid_query,
query=models.FusionQuery(fusion=fusion_type),
limit=100,
)
response = client.query_points(
collection_name=collection_name,
prefetch=fusion_query,
query=user_multivector,
using="colbert",
query_filter=global_filter, # Propagates to all prefetch stages
limit=10,
with_payload=True,
)
strategy_name = "RRF" if fusion_type == models.Fusion.RRF else "DBSF"
results[strategy_name].append(
{
"user_id": user_profile["user_id"],
"recommendations": [hit.id for hit in response.points],
"scores": [hit.score for hit in response.points],
}
)
return results
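For example (hypothetical profiles; note that each profile passed to the A/B test should carry a "user_id" key):
profiles = [
    {"user_id": "u1", "segment": "premium", "query": "cyberpunk hacker thriller"},
    {"user_id": "u2", "segment": "premium", "query": "space exploration epic"},
]
preferences = [
    {"category": "movie", "min_rating": 8.0, "released_within_days": 365 * 30},
    {"category": "movie", "min_rating": 7.5, "released_within_days": 365 * 30},
]
results = ab_test_fusion_strategies(profiles, preferences)
for strategy, runs in results.items():
    print(strategy, [run["recommendations"] for run in runs])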
Next Steps
Turn this into a micro-service: ingest your own items and user signals, write a small function that takes a profile and returns the top-k via the Universal Query API, then experiment with filters and fusion strategies (try DBSF vs. RRF) and watch how the rankings change. A starting sketch follows below.
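Here is a minimal sketch of such a micro-service using FastAPI (FastAPI and the request model are our assumptions; any web framework works, and the endpoint simply reuses get_recommendations from Step 4):
from typing import Optional
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class RecommendationRequest(BaseModel):
    query: str
    segment: Optional[str] = None
    category: Optional[str] = None
    min_rating: Optional[float] = None
    limit: int = 10

@app.post("/recommendations")
def recommend(req: RecommendationRequest):
    # Reuse the functions built in Step 4; None values are skipped by the filter helper
    profile = {"segment": req.segment, "query": req.query}
    preference = {
        "category": req.category,
        "min_rating": req.min_rating,
        "released_within_days": None,
    }
    return get_recommendations(profile, preference, limit=req.limit)

# Run with: uvicorn service:app --reload  (assuming this file is named service.py)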