Zero-Downtime Migration to a New Embedding Model in Qdrant
| Time: 40 min | Level: Intermediate |
|---|---|
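When you build a semantic search application, you need to choose an embedding model. Over time, you may want to switch to a different model for better quality or lower cost. If your application is in production, the switch must happen with zero downtime so users are not disrupted. Switching models means re-embedding every vector in the collection, which can take some time. If your data never changes, you can simply re-embed everything and then switch to the new model. In a system with frequent updates, however, stopping the search service while you re-embed is not an option.
This tutorial walks you through the migration to a new model step by step, including the changes you need to make in your project. The examples are given for the Python, TypeScript, Rust, Java, C#, and Go clients, and the same principles apply to any other language.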
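Solution
You can switch embedding models with zero downtime by using a blue-green deployment with two collections. The first collection holds the old embeddings, and the second stores the new ones. A migration process copies the data from the old collection to the new one and re-embeds the vectors with the new model along the way. During the migration, you keep searching the old collection while writing every data update to both collections. Once all vectors have been re-embedded, you switch search over to the new collection.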

Figure: Embedding model migration in a blue-green deployment
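The minimal sketch below shows which collection serves reads and which collections receive writes in each phase of the blue-green migration. The phase names and collection names are hypothetical. The `SWITCHED` phase models the short window in which search already uses the new collection while dual writes are still on. During that window the old collection stays current in case you need to roll back.

```python
from enum import Enum


class Phase(Enum):
    """Hypothetical phases of a blue-green embedding model migration."""
    BEFORE = "before"        # only the old collection exists
    MIGRATING = "migrating"  # dual writes on, backfill running
    SWITCHED = "switched"    # search uses the new collection, dual writes still on
    DONE = "done"            # dual writes off, old collection can be retired


OLD_COLLECTION = "docs_old"  # hypothetical collection names
NEW_COLLECTION = "docs_new"


def search_collection(phase: Phase) -> str:
    """Collection that serves search queries in the given phase."""
    if phase in (Phase.BEFORE, Phase.MIGRATING):
        return OLD_COLLECTION
    return NEW_COLLECTION


def write_collections(phase: Phase) -> list[str]:
    """Collections that every upsert must be written to in the given phase."""
    if phase is Phase.BEFORE:
        return [OLD_COLLECTION]
    if phase in (Phase.MIGRATING, Phase.SWITCHED):
        return [OLD_COLLECTION, NEW_COLLECTION]
    return [NEW_COLLECTION]
```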
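Re-embedding requires access to the original data that the embeddings were created from. This data can come from your primary database, or it can be stored in the payload of the Qdrant points. This tutorial assumes the necessary data is stored in the payload. That is a common setup, because the payload often contains the text or other data the embedding was generated from.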
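If a payload is missing its source text, the migration examples in step 3 fall back to an empty string, which stores a meaningless vector for that point. A stricter alternative is to raise an error so the point can be logged and repaired. The sketch below assumes the text lives under the `text` key, and the helper name is hypothetical.

```python
from typing import Any, Mapping, Optional


def source_text(payload: Optional[Mapping[str, Any]], key: str = "text") -> str:
    """Return the text that was originally embedded, read from a point's payload.

    Raises ValueError instead of returning "" so that a point without usable
    source text is surfaced (e.g. logged for repair) rather than silently
    re-embedded as an empty string.
    """
    value = (payload or {}).get(key)
    if not isinstance(value, str) or not value.strip():
        raise ValueError(f"payload has no usable {key!r} field")
    return value
```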
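The solution outlined in this tutorial works only for upsert operations. If you also delete points or apply partial updates, you need to either pause those operations during the migration or implement additional logic to handle them.
This tutorial assumes you use Qdrant Cloud Inference to generate the vector embeddings. If you manage your own embedding infrastructure, the same principles apply, but you need to adapt the code examples to call your embedding service.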
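If you host the embedding model yourself, one way to adapt the examples is to embed a whole batch of payload texts in one call. You then pass the resulting float lists as the point vectors. In the Python client, a plain list of floats is accepted as `vector` in place of `models.Document`. The sketch below assumes a hypothetical `embed` function that takes a batch of texts and returns one vector per text. `fake_embed` is only a deterministic stand-in for testing. The sketch also checks each vector against the size configured for the new collection, because a dimension mismatch is an easy mistake when switching models.

```python
from typing import Callable, Sequence

# Hypothetical embedding function: a batch of texts in, one vector per text out.
EmbedFn = Callable[[Sequence[str]], list]


def reembed_batch(records: Sequence[dict], embed: EmbedFn, expected_size: int) -> list:
    """Re-embed a batch of {"id", "payload"} records with a self-hosted model.

    Returns point dicts (id, vector, payload) ready to be upserted, after checking
    that each vector matches the size configured for the new collection.
    """
    texts = [record["payload"]["text"] for record in records]
    vectors = embed(texts)  # one call per batch instead of one call per point
    if len(vectors) != len(records):
        raise ValueError("embedding service returned a different number of vectors")
    points = []
    for record, vector in zip(records, vectors):
        if len(vector) != expected_size:
            raise ValueError(f"vector size {len(vector)} != collection size {expected_size}")
        points.append({"id": record["id"], "vector": vector, "payload": record["payload"]})
    return points


def fake_embed(texts: Sequence[str]) -> list:
    """Stand-in for a real model: a deterministic 4-dimensional 'embedding'."""
    return [[float(len(t)), float(t.count(" ")), 0.0, 1.0] for t in texts]
```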
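Step 1: Create a new collection
The first step is to create a new collection in Qdrant to hold the new embeddings. Its vector size and distance (similarity) function must match the new model.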
client.create_collection(
collection_name=NEW_COLLECTION,
vectors_config=(
models.VectorParams(
size=512, # Size of the new embedding vectors
distance=models.Distance.COSINE # Similarity function for the new model
)
)
)
await client.createCollection(NEW_COLLECTION, {
vectors: {
size: 512, // Size of the new embedding vectors
distance: "Cosine", // Similarity function for the new model
},
});
client
.create_collection(
CreateCollectionBuilder::new(new_collection)
.vectors_config(VectorParamsBuilder::new(512, Distance::Cosine)), // Size of the new embedding vectors
)
.await?;
client.createCollectionAsync(NEW_COLLECTION,
VectorParams.newBuilder()
.setSize(512) // Size of the new embedding vectors
.setDistance(Distance.Cosine) // Similarity function for the new model
.build()).get();
await client.CreateCollectionAsync(
collectionName: NEW_COLLECTION,
vectorsConfig: new VectorParams { Size = 512, Distance = Distance.Cosine }
);
client.CreateCollection(context.Background(), &qdrant.CreateCollection{
CollectionName: NEW_COLLECTION,
VectorsConfig: qdrant.NewVectorsConfig(&qdrant.VectorParams{
Size: 512, // Size of the new embedding vectors
Distance: qdrant.Distance_Cosine,
}),
})
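This is also a good time to reconsider other collection settings, such as custom sharding or the replication factor. Switching models can be a good opportunity to improve search performance.
The newly created collection is empty and ready to store the new embeddings.
Step 2: Enable dual writes
To keep the two collections in sync during the migration, you need to write every change to both of them. That way, new data and updates to existing data are reflected in both collections.
Ideally, the data in Qdrant is updated by an update service that reads from an update queue. This service embeds the documents and writes them to Qdrant using code similar to the following: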
client.upsert(
collection_name=OLD_COLLECTION,
points=[
models.PointStruct(
id=1,
vector=models.Document(
text="Example document",
model=OLD_MODEL,
),
payload={"text": "Example document"}
)
]
)
await client.upsert(OLD_COLLECTION, {
points: [
{
id: 1,
vector: {
text: "Example document",
model: OLD_MODEL,
},
payload: { text: "Example document" },
},
],
});
client
.upsert_points(UpsertPointsBuilder::new(
old_collection,
vec![PointStruct::new(
1,
Document::new("Example document", old_model),
[("text", "Example document".into())],
)],
))
.await?;
client.upsertAsync(OLD_COLLECTION, List.of(
PointStruct.newBuilder()
.setId(id(1))
.setVectors(
vectors(
vector(
Document.newBuilder()
.setText("Example document")
.setModel(OLD_MODEL)
.build())))
.putAllPayload(Map.of("text", value("Example document")))
.build())).get();
await client.UpsertAsync(
collectionName: OLD_COLLECTION,
points: new List<PointStruct>
{
new()
{
Id = 1,
Vectors = new Document
{
Text = "Example document",
Model = OLD_MODEL
},
Payload = { ["text"] = "Example document" }
}
}
);
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: OLD_COLLECTION,
Points: []*qdrant.PointStruct{
{
Id: qdrant.NewIDNum(1),
Vectors: qdrant.NewVectorsDocument(&qdrant.Document{
Text: "Example document",
Model: OLD_MODEL,
}),
Payload: qdrant.NewValueMap(map[string]any{"text": "Example document"}),
},
},
})
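To keep the new collection up to date, deploy a second service that runs alongside the existing one. It encodes documents with the new embedding model and writes them to the new collection: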
client.upsert(
collection_name=NEW_COLLECTION,
points=[
models.PointStruct(
id=1,
# Use the new embedding model to encode the document
vector=models.Document(
text="Example document",
model=NEW_MODEL,
),
payload={"text": "Example document"}
)
]
)
await client.upsert(NEW_COLLECTION, {
points: [
{
id: 1,
// Use the new embedding model to encode the document
vector: {
text: "Example document",
model: NEW_MODEL,
},
payload: { text: "Example document" },
},
],
});
client
.upsert_points(UpsertPointsBuilder::new(
new_collection,
vec![PointStruct::new(
1,
// Use the new embedding model to encode the document
Document::new("Example document", new_model),
[("text", "Example document".into())],
)],
))
.await?;
client.upsertAsync(NEW_COLLECTION, List.of(
PointStruct.newBuilder()
.setId(id(1))
// Use the new embedding model to encode the document
.setVectors(
vectors(
vector(
Document.newBuilder()
.setText("Example document")
.setModel(NEW_MODEL)
.build())))
.putAllPayload(Map.of("text", value("Example document")))
.build())).get();
await client.UpsertAsync(
collectionName: NEW_COLLECTION,
points: new List<PointStruct>
{
new()
{
Id = 1,
// Use the new embedding model to encode the document
Vectors = new Document
{
Text = "Example document",
Model = NEW_MODEL
},
Payload = { ["text"] = "Example document" }
}
}
);
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: NEW_COLLECTION,
Points: []*qdrant.PointStruct{
{
Id: qdrant.NewIDNum(1),
// Use the new embedding model to encode the document
Vectors: qdrant.NewVectorsDocument(&qdrant.Document{
Text: "Example document",
Model: NEW_MODEL,
}),
Payload: qdrant.NewValueMap(map[string]any{"text": "Example document"}),
},
},
})
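The existing update service and the new one should not depend on each other. If the new-model service is slow or failing, the old collection still serves search and must keep receiving updates. One way to achieve this is to publish each update to a separate queue per service. With a broker such as Kafka, that would be a separate consumer group per service. The sketch below uses in-memory queues in place of a message broker, and `FanOut` and `drain` are hypothetical names.

```python
import queue
from typing import Callable

# Hypothetical update event: (point_id, text).
Update = tuple[int, str]


class FanOut:
    """Publish every update to one queue per consuming service.

    Each service (old model, new model) owns its queue, so a slow or failing
    consumer for one collection does not block updates to the other.
    """

    def __init__(self, *names: str) -> None:
        self.queues = {name: queue.Queue() for name in names}

    def publish(self, update: Update) -> None:
        for q in self.queues.values():
            q.put(update)


def drain(q: "queue.Queue[Update]", apply: Callable[[Update], None]) -> int:
    """Apply all currently queued updates; returns how many were processed."""
    count = 0
    while True:
        try:
            update = q.get_nowait()
        except queue.Empty:
            return count
        apply(update)
        count += 1
```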
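As a good practice, always make sure both writes succeed. Any errors must be handled on the client side. You can record failed writes in a log or a dead-letter queue for later processing. Transient errors can be retried later, while other errors need to be analyzed and resolved.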
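The minimal sketch below shows that error handling. It assumes each write is a callable and that retryable failures raise a hypothetical `TransientError`. Failed writes end up in an in-memory list that stands in for a real dead-letter queue.

```python
import time
from typing import Callable


class TransientError(Exception):
    """Hypothetical marker for errors worth retrying (timeouts, unavailable server, ...)."""


def write_both(
    point: dict,
    writers: dict[str, Callable[[dict], None]],
    dead_letters: list[tuple[str, dict, str]],
    retries: int = 3,
    backoff_s: float = 0.1,
) -> bool:
    """Upsert `point` with every writer; returns True only if all writes succeeded.

    Transient errors are retried with exponential backoff. Anything that still
    fails is appended to `dead_letters` as (target, point, reason) for later
    replay or analysis instead of being silently dropped.
    """
    ok = True
    for target, write in writers.items():
        for attempt in range(retries):
            try:
                write(point)
                break
            except TransientError as exc:
                if attempt == retries - 1:
                    dead_letters.append((target, point, f"transient: {exc}"))
                    ok = False
                else:
                    time.sleep(backoff_s * 2 ** attempt)
            except Exception as exc:  # not retryable: record it and move on
                dead_letters.append((target, point, f"fatal: {exc}"))
                ok = False
                break
    return ok
```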
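If you run a monolithic application rather than a separate update service, modify the application code for the transition period so that it writes to both collections. Add the second write to the code path that embeds documents.
Note that the approach in this tutorial works only for upsert operations. For example, if a point exists in the old collection but not yet in the new one, a delete has nothing to remove in the new collection, and the migration process may later add the point back by mistake. If you modify points with any of the following methods, pause them during the migration or implement additional logic to handle them:
- `delete`: delete the specified points from the collection
- `update_vectors`: update the specified vectors on points
- `delete_vectors`: delete the specified vectors from points
- `set_payload`: set payload values for the specified points
- `overwrite_payload`: replace the entire payload of the specified points with a new one
- `delete_payload`: delete the specified payload keys from points
- `clear_payload`: remove the entire payload of the specified points
- `batch_update_points`: apply a batch of updates to points, including their vectors and payloads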
Check the documentation of the SDK you use, or the HTTP/gRPC definitions, for the exact method names, as they may differ between languages.
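Here is the delete problem in concrete terms. The migration reads a point from the old collection. The point is then deleted from both collections, which is a no-op in the new one because the point does not exist there yet. The migration then inserts its already-read copy into the new collection, so the deleted point comes back. The sketch below reproduces this with dictionaries. It also shows one possible piece of additional logic, which is a hypothetical approach and not part of this tutorial: a set of tombstones that the backfill checks before inserting. In a real system, the tombstones must be stored durably and shared between the services. Also, the check and the insert are not atomic, so re-applying the recorded deletes to the new collection after the migration finishes is a simple safety net.

```python
def delete_point(pid, old: dict, new: dict, tombstones: set) -> None:
    """Dual delete that also records a tombstone for the running backfill."""
    old.pop(pid, None)
    new.pop(pid, None)  # may be a no-op: the point has not been migrated yet
    tombstones.add(pid)


def backfill(read_batch: dict, new: dict, tombstones: set) -> None:
    """Insert-only backfill of a batch that was read from the old collection earlier."""
    for pid, payload in read_batch.items():
        if pid in tombstones:  # deleted while this batch was in flight: skip it
            continue
        new.setdefault(pid, payload)  # insert-only: never overwrite an existing point
```

Without the tombstone check, `backfill(batch, new, set())` would re-create a point that was deleted after the batch was read.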
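With these changes in place, you are in dual-write mode: every change is written to both the old and the new collection, which keeps them in sync throughout the migration.
Step 3: Migrate existing points to the new collection
Now that dual writes are enabled, you can migrate the existing points from the old collection to the new one. This can run as a separate process alongside the regular upsert service.
The migration process reads points from the old collection, re-embeds them with the new model, and writes them to the new collection without overwriting points that the update service has already inserted. Here is an example of such a migration process: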
last_offset = None
batch_size = 100  # Number of points to read in each batch
reached_end = False
while not reached_end:
    # Get the next batch of points from the old collection
    records, last_offset = client.scroll(
        collection_name=OLD_COLLECTION,
        limit=batch_size,
        offset=last_offset,
        # Include payloads in the response, as we need them to re-embed the vectors
        with_payload=True,
        # We don't need the old vectors, so let's save on the bandwidth
        with_vectors=False,
    )
    # Re-embed the points using the new model
    points = [
        models.PointStruct(
            # Keep the original ID to ensure consistency
            id=record.id,
            # Use the new embedding model to encode the text from the payload,
            # assuming that was the original source of the embedding
            vector=models.Document(
                text=(record.payload or {}).get("text", ""),
                model=NEW_MODEL,
            ),
            # Keep the original payload
            payload=record.payload,
        )
        for record in records
    ]
    # Upsert the re-embedded points into the new collection
    client.upsert(
        collection_name=NEW_COLLECTION,
        points=points,
        # Only insert the point if a point with this ID does not already exist
        update_mode=models.UpdateMode.INSERT_ONLY,
    )
    # Check if we reached the end of the collection
    reached_end = last_offset is None
let lastOffset: number | string | undefined = undefined;
const batchSize = 100; // Number of points to read in each batch
let reachedEnd = false;
while (!reachedEnd) {
// Get the next batch of points from the old collection
const scrollResult = await client.scroll(OLD_COLLECTION, {
limit: batchSize,
offset: lastOffset,
// Include payloads in the response, as we need them to re-embed the vectors
with_payload: true,
// We don't need the old vectors, so let's save on the bandwidth
with_vector: false,
});
const records = scrollResult.points;
lastOffset = scrollResult.next_page_offset as number | string | undefined;
// Re-embed the points using the new model
const points = records.map((record) => ({
// Keep the original ID to ensure consistency
id: record.id,
// Use the new embedding model to encode the text from the payload,
// assuming that was the original source of the embedding
vector: {
text: ((record.payload?.text as string) ?? ""),
model: NEW_MODEL,
},
// Keep the original payload
payload: record.payload,
}));
// Upsert the re-embedded points into the new collection
await client.upsert(NEW_COLLECTION, {
points,
// Only insert the point if a point with this ID does not already exist.
update_mode: "insert_only" as const,
});
// Check if we reached the end of the collection
reachedEnd = lastOffset == null;
}
let mut last_offset = None;
let batch_size = 100; // Number of points to read in each batch
loop {
// Get the next batch of points from the old collection
let mut scroll_builder = ScrollPointsBuilder::new(old_collection)
.limit(batch_size)
// Include payloads in the response, as we need them to re-embed the vectors
.with_payload(true)
// We don't need the old vectors, so let's save on the bandwidth
.with_vectors(false);
if let Some(offset) = last_offset {
scroll_builder = scroll_builder.offset(offset);
}
let scroll_result = client.scroll(scroll_builder).await?;
let records = scroll_result.result;
last_offset = scroll_result.next_page_offset;
// Re-embed the points using the new model
let points: Vec<PointStruct> = records
.iter()
.map(|record| {
PointStruct::new(
// Keep the original ID to ensure consistency
record.id.clone().unwrap(),
// Use the new embedding model to encode the text from the payload,
// assuming that was the original source of the embedding
Document::new(
record.payload.get("text")
.and_then(|v| v.as_str())
.map_or("", |v| v),
new_model,
),
// Keep the original payload
record.payload.clone(),
)
})
.collect();
// Upsert the re-embedded points into the new collection
client
.upsert_points(
// Only insert the point if a point with this ID does not already exist.
UpsertPointsBuilder::new(new_collection, points)
.update_mode(UpdateMode::InsertOnly),
)
.await?;
// Check if we reached the end of the collection
if last_offset.is_none() {
break;
}
}
int batchSize = 100; // Number of points to read in each batch
boolean reachedEnd = false;
// Get the next batch of points from the old collection
var scrollBuilder = ScrollPoints.newBuilder()
.setCollectionName(OLD_COLLECTION)
.setLimit(batchSize)
// Include payloads in the response, as we need them to re-embed the vectors
.setWithPayload(WithPayloadSelectorFactory.enable(true))
// We don't need the old vectors, so let's save on the bandwidth
.setWithVectors(WithVectorsSelectorFactory.enable(false));
while (!reachedEnd) {
var scrollResult = client.scrollAsync(scrollBuilder.build()).get();
var records = scrollResult.getResultList();
// Re-embed the points using the new model
List<PointStruct> points = new ArrayList<>();
for (var record : records) {
String text = record.getPayloadMap().containsKey("text")
? record.getPayloadMap().get("text").getStringValue()
: "";
points.add(
PointStruct.newBuilder()
// Keep the original ID to ensure consistency
.setId(record.getId())
// Use the new embedding model to encode the text from the payload,
// assuming that was the original source of the embedding
.setVectors(
vectors(
vector(
Document.newBuilder()
.setText(text)
.setModel(NEW_MODEL)
.build())))
// Keep the original payload
.putAllPayload(record.getPayloadMap())
.build());
}
// Upsert the re-embedded points into the new collection
client.upsertAsync(
UpsertPoints.newBuilder()
.setCollectionName(NEW_COLLECTION)
.addAllPoints(points)
// Only insert the point if a point with this ID does not already exist.
.setUpdateMode(UpdateMode.InsertOnly)
.build()).get();
// Check if we reached the end of the collection
if (scrollResult.hasNextPageOffset()) {
scrollBuilder.setOffset(scrollResult.getNextPageOffset());
} else {
reachedEnd = true;
}
}
PointId? lastOffset = null;
uint limit = 100; // Number of points to read in each batch
bool reachedEnd = false;
while (!reachedEnd)
{
// Get the next batch of points from the old collection
var scrollResult = await client.ScrollAsync(
collectionName: OLD_COLLECTION,
limit: limit,
offset: lastOffset,
// Include payloads in the response, as we need them to re-embed the vectors
payloadSelector: true,
// We don't need the old vectors, so let's save on the bandwidth
vectorsSelector: false
);
var records = scrollResult.Result;
lastOffset = scrollResult.NextPageOffset;
// Re-embed the points using the new model
var points = new List<PointStruct>();
foreach (var record in records)
{
var text = record.Payload.ContainsKey("text")
? record.Payload["text"].StringValue
: "";
points.Add(new PointStruct
{
// Keep the original ID to ensure consistency
Id = record.Id,
// Use the new embedding model to encode the text from the payload,
// assuming that was the original source of the embedding
Vectors = new Document
{
Text = text,
Model = NEW_MODEL
},
// Keep the original payload
Payload = { record.Payload }
});
}
// Upsert the re-embedded points into the new collection
await client.UpsertAsync(
new()
{
CollectionName = NEW_COLLECTION,
Points = { points },
// Only insert the point if a point with this ID does not already exist.
UpdateMode = UpdateMode.InsertOnly
}
);
// Check if we reached the end of the collection
reachedEnd = (lastOffset == null);
}
var lastOffset *qdrant.PointId
batchSize := uint32(100) // Number of points to read in each batch
reachedEnd := false
for !reachedEnd {
// Get the next batch of points from the old collection
records, nextOffset, err := client.ScrollAndOffset(context.Background(), &qdrant.ScrollPoints{
CollectionName: OLD_COLLECTION,
Limit: qdrant.PtrOf(batchSize),
Offset: lastOffset,
// Include payloads in the response, as we need them to re-embed the vectors
WithPayload: qdrant.NewWithPayload(true),
// We don't need the old vectors, so let's save on the bandwidth
WithVectors: qdrant.NewWithVectors(false),
})
if err != nil {
panic(err)
}
// Track where the next batch starts; this is nil once the end of the collection is reached
lastOffset = nextOffset
// Re-embed the points using the new model
points := make([]*qdrant.PointStruct, len(records))
for idx, record := range records {
text := ""
if val, ok := record.Payload["text"]; ok {
text = val.GetStringValue()
}
points[idx] = &qdrant.PointStruct{
// Keep the original ID to ensure consistency
Id: record.Id,
// Use the new embedding model to encode the text from the payload,
// assuming that was the original source of the embedding
Vectors: qdrant.NewVectorsDocument(&qdrant.Document{
Text: text,
Model: NEW_MODEL,
}),
// Keep the original payload
Payload: record.Payload,
}
}
// Upsert the re-embedded points into the new collection
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: NEW_COLLECTION,
Points: points,
// Only insert the point if a point with this ID does not already exist.
UpdateMode: qdrant.UpdateMode_InsertOnly.Enum(),
})
// Check if we reached the end of the collection
reachedEnd = (lastOffset == nil)
}
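How the code works:
- Data is read from the old collection with scroll, in batches of 100 points. The `last_offset` variable tracks the current position in the collection.
- For each batch, the process re-embeds the vectors with the new embedding model. It assumes that the original text used for the embedding is stored under the `text` key of the payload.
- The re-embedded points are upserted into the new collection, keeping their original IDs and payloads. The upsert uses insert-only mode, so a point is inserted only if it does not yet exist in the new collection. This prevents overwriting newer updates written by the regular update service.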
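You can check how scrolling and insert-only upserts interact with an in-memory model. In the sketch below, a dictionary stands in for each collection. `scroll` mimics Qdrant's pagination, where the next offset is the ID of the first point of the next page. `setdefault` plays the role of insert-only mode, so a point already written by the dual-write path keeps its newer version. All names are hypothetical.

```python
from typing import Callable, Optional


def scroll(collection: dict, offset: Optional[int], limit: int):
    """In-memory stand-in for scroll: points ordered by ID, returns (batch, next_offset)."""
    ids = sorted(pid for pid in collection if offset is None or pid >= offset)
    page, rest = ids[:limit], ids[limit:]
    next_offset = rest[0] if rest else None  # ID of the first point of the next page
    return [(pid, collection[pid]) for pid in page], next_offset


def migrate(old: dict, new: dict, reembed: Callable[[str], object], limit: int = 2) -> None:
    """Copy every point from `old` to `new`, re-embedding it, without overwriting."""
    offset = None
    while True:
        batch, offset = scroll(old, offset, limit)
        for pid, payload in batch:
            # Insert-only semantics: skip IDs the dual-write path already wrote.
            new.setdefault(pid, (reembed(payload["text"]), payload))
        if offset is None:
            break
```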
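A migration like this can take some time, so store the offset persistently in order to resume after a failure. You can keep the last offset in a database, a file, or any other persistent storage. That said, because the conditional upsert never overwrites points in the new collection, you can also safely restart the migration from the beginning if needed.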
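The sketch below implements a file-based checkpoint, assuming a local JSON file is acceptable storage for your setup. If you save after each batch's upsert, a crash can at worst repeat one batch, and insert-only mode makes that harmless. The state is written to a temporary file, which `os.replace` then renames over the checkpoint, so a crash mid-write never leaves a truncated file. A separate `done` flag tells a finished migration apart from one that has not started yet, since neither has an offset. The function names are hypothetical.

```python
import json
import os
import tempfile
from typing import Optional, Union

# Qdrant point IDs are unsigned integers or UUID strings; both survive a JSON round trip.
PointId = Union[int, str]


def save_checkpoint(path: str, next_offset: Optional[PointId]) -> None:
    """Persist the offset of the next batch; None means the migration is complete."""
    state = {"next_offset": next_offset, "done": next_offset is None}
    # Write to a temp file in the same directory, then rename it over the checkpoint.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(path)))
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)


def load_checkpoint(path: str) -> tuple[Optional[PointId], bool]:
    """Return (next_offset, done); a missing file means 'start from the beginning'."""
    try:
        with open(path) as f:
            state = json.load(f)
    except FileNotFoundError:
        return None, False
    return state["next_offset"], state["done"]
```

In the Python loop above, you would call `load_checkpoint` before the loop to initialize `last_offset` (skipping the loop if `done` is true). You would then call `save_checkpoint` right after each `client.upsert` call.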
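Step 4: Switch the collection and embedding model used for search
Once the migration process has finished, that is, every point in the old collection has been re-embedded and stored in the new one, you can change the configuration of your backend application. Two key changes are needed:
- Collection name. Switch it from the old collection to the new one. If you use collection aliases, you only need to repoint the alias to the new collection.
- Embedding model. Switch it from the old embedding model to the new one.
If these values are hardcoded in your application, change them directly in the code and deploy a new version of the application. For example, if your current search code looks like this: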
results = client.query_points(
collection_name=OLD_COLLECTION,
query=models.Document(text="my query", model=OLD_MODEL),
limit=10,
)
const results = await client.query(OLD_COLLECTION, {
query: {
text: "my query",
model: OLD_MODEL,
},
limit: 10,
});
let results = client
.query(
QueryPointsBuilder::new(old_collection)
.query(Query::new_nearest(Document::new("my query", old_model)))
.limit(10),
)
.await?;
QueryPoints oldRequest =
QueryPoints.newBuilder()
.setCollectionName(OLD_COLLECTION)
.setQuery(
nearest(
Document.newBuilder()
.setText("my query")
.setModel(OLD_MODEL)
.build()))
.setLimit(10)
.build();
var results = client.queryAsync(oldRequest).get();
var results = await client.QueryAsync(
collectionName: OLD_COLLECTION,
query: new Document
{
Text = "my query",
Model = OLD_MODEL
},
limit: 10
);
results, err := client.Query(context.Background(), &qdrant.QueryPoints{
CollectionName: OLD_COLLECTION,
Query: qdrant.NewQueryDocument(&qdrant.Document{
Text: "my query",
Model: OLD_MODEL,
}),
Limit: qdrant.PtrOf(uint64(10)),
})
change it as follows:
results = client.query_points(
collection_name=NEW_COLLECTION,
query=models.Document(text="my query", model=NEW_MODEL),
limit=10,
)
const resultsNew = await client.query(NEW_COLLECTION, {
query: {
text: "my query",
model: NEW_MODEL,
},
limit: 10,
});
let results = client
.query(
QueryPointsBuilder::new(new_collection)
.query(Query::new_nearest(Document::new("my query", new_model)))
.limit(10),
)
.await?;
QueryPoints newRequest =
QueryPoints.newBuilder()
.setCollectionName(NEW_COLLECTION)
.setQuery(
nearest(
Document.newBuilder()
.setText("my query")
.setModel(NEW_MODEL)
.build()))
.setLimit(10)
.build();
results = client.queryAsync(newRequest).get();
results = await client.QueryAsync(
collectionName: NEW_COLLECTION,
query: new Document
{
Text = "my query",
Model = NEW_MODEL
},
limit: 10
);
results, err = client.Query(context.Background(), &qdrant.QueryPoints{
CollectionName: NEW_COLLECTION,
Query: qdrant.NewQueryDocument(&qdrant.Document{
Text: "my query",
Model: NEW_MODEL,
}),
Limit: qdrant.PtrOf(uint64(10)),
})
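Instead of hardcoding the two values, you can read them from configuration, so the switch becomes a restart or redeploy with a new setting. A query embedded with the wrong model returns poor results without raising an error. For that reason, the sketch below pairs each collection with its model so the two can only change together. The `SEARCH_TARGET` variable and the collection and model names are hypothetical. If you use a collection alias, the collection side switches on the server, but the query model still has to change in the application.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SearchTarget:
    collection: str
    model: str


# Hypothetical names: each collection is paired with the model that produced its
# vectors, so collection and model can never be switched independently by mistake.
TARGETS = {
    "old": SearchTarget(collection="docs_old", model="old-embedding-model"),
    "new": SearchTarget(collection="docs_new", model="new-embedding-model"),
}


def current_target(env=os.environ) -> SearchTarget:
    """Resolve the search target from the SEARCH_TARGET setting (default: "old")."""
    key = env.get("SEARCH_TARGET", "old")
    if key not in TARGETS:
        raise ValueError(f"unknown SEARCH_TARGET {key!r}")
    return TARGETS[key]
```

With the Python client, the search call would then use `collection_name=target.collection` and `models.Document(text="my query", model=target.model)`.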
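Step 5: Wrap up
Once the application has switched to the new collection, disable the dual writes introduced in step 2. From now on, the application should write only to the new collection.
All searches now use the new embeddings. If you no longer need the old collection, you can safely delete it. To be able to roll back if necessary, take a snapshot of the old collection before deleting it.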