Zero-Downtime Migration to a New Embedding Model in Qdrant

Time: 40 min | Level: Intermediate

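When building a semantic search application, you need to choose an embedding model. Over time, you may want to switch to a different model for better quality or cost-effectiveness. If your application is in production, the switch must happen with zero downtime to avoid disrupting users. Switching models requires re-embedding all vectors in your collection, which can take time. If your data doesn't change, you can simply re-embed everything and switch to the new model. However, in systems with frequent updates, stopping the search service to re-embed is not an option.

This tutorial guides you step by step through migrating to a new model, including the changes required in your project. All examples use the Python SDK, but the same principles apply to other languages.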

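Solution

You can switch embedding models with zero downtime by using a blue-green deployment with two collections. The first collection holds the old embeddings, and the second stores the new ones. The migration process copies data from the old collection to the new one, re-embedding the vectors with the new model. During the migration, you keep searching the old collection while writing every data update to both collections. Once all vectors have been re-embedded, you switch search over to the new collection.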

Embedding model migration in blue-green deployment

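Re-embedding requires access to the original data used to create the embeddings. This data can come from your primary database or be stored in the payload of the Qdrant points. This tutorial assumes the required data is stored in the payload, which is a common setup because the payload often contains the text or other data used to generate the embeddings.

The solution outlined in this tutorial only works for upsert operations. If you use delete or partial-update operations, you need to pause them during the migration or implement additional logic to handle them.

This tutorial assumes you use Qdrant Cloud Inference to generate vector embeddings. If you manage your own embedding infrastructure, the same principles apply, but you need to adapt the code examples to call your embedding service.

Step 1: Create a New Collection

The first step is to create a new collection in Qdrant to store the new embeddings. It must be compatible with the new model in terms of vector dimensionality and similarity function.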

client.create_collection(
    collection_name=NEW_COLLECTION,
    vectors_config=(
        models.VectorParams(
            size=512,  # Size of the new embedding vectors
            distance=models.Distance.COSINE  # Similarity function for the new model
        )
    )
)
await client.createCollection(NEW_COLLECTION, {
    vectors: {
        size: 512, // Size of the new embedding vectors
        distance: "Cosine", // Similarity function for the new model
    },
});
client
    .create_collection(
        CreateCollectionBuilder::new(new_collection)
            .vectors_config(VectorParamsBuilder::new(512, Distance::Cosine)), // Size of the new embedding vectors
    )
    .await?;
client.createCollectionAsync(NEW_COLLECTION,
        VectorParams.newBuilder()
            .setSize(512) // Size of the new embedding vectors
            .setDistance(Distance.Cosine) // Similarity function for the new model
            .build()).get();
await client.CreateCollectionAsync(
	collectionName: NEW_COLLECTION,
	vectorsConfig: new VectorParams { Size = 512, Distance = Distance.Cosine }
);
client.CreateCollection(context.Background(), &qdrant.CreateCollection{
	CollectionName: NEW_COLLECTION,
	VectorsConfig: qdrant.NewVectorsConfig(&qdrant.VectorParams{
		Size:     512, // Size of the new embedding vectors
		Distance: qdrant.Distance_Cosine,
	}),
})

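This is also a good time to reconsider other collection settings, such as custom sharding or the replication factor. Switching models can be a good opportunity to improve search performance.

The newly created collection is empty and ready to store the new embeddings.

Step 2: Enable Dual Writes

To keep both collections in sync during the migration, you need to write every change to both collections. That way, any new data or update to existing data is reflected in both.

Ideally, data in Qdrant is updated by an update service that reads from an update queue. This service embeds the documents and writes them to Qdrant, using code similar to the following: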

client.upsert(
    collection_name=OLD_COLLECTION,
    points=[
        models.PointStruct(
            id=1,
            vector=models.Document(
                text="Example document",
                model=OLD_MODEL,
            ),
            payload={"text": "Example document"}
        )
    ]
)
await client.upsert(OLD_COLLECTION, {
    points: [
        {
            id: 1,
            vector: {
                text: "Example document",
                model: OLD_MODEL,
            },
            payload: { text: "Example document" },
        },
    ],
});
client
    .upsert_points(UpsertPointsBuilder::new(
        old_collection,
        vec![PointStruct::new(
            1,
            Document::new("Example document", old_model),
            [("text", "Example document".into())],
        )],
    ))
    .await?;
client.upsertAsync(OLD_COLLECTION, List.of(
        PointStruct.newBuilder()
            .setId(id(1))
            .setVectors(
                vectors(
                    vector(
                        Document.newBuilder()
                            .setText("Example document")
                            .setModel(OLD_MODEL)
                            .build())))
            .putAllPayload(Map.of("text", value("Example document")))
            .build())).get();
await client.UpsertAsync(
	collectionName: OLD_COLLECTION,
	points: new List<PointStruct>
	{
		new()
		{
			Id = 1,
			Vectors = new Document
			{
				Text = "Example document",
				Model = OLD_MODEL
			},
			Payload = { ["text"] = "Example document" }
		}
	}
);
client.Upsert(context.Background(), &qdrant.UpsertPoints{
	CollectionName: OLD_COLLECTION,
	Points: []*qdrant.PointStruct{
		{
			Id: qdrant.NewIDNum(1),
			Vectors: qdrant.NewVectorsDocument(&qdrant.Document{
				Text:  "Example document",
				Model: OLD_MODEL,
			}),
			Payload: qdrant.NewValueMap(map[string]any{"text": "Example document"}),
		},
	},
})

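To update the new collection, deploy a second service that updates it in parallel with the existing service. This service encodes documents with the new embedding model and writes them to the new collection.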

client.upsert(
    collection_name=NEW_COLLECTION,
    points=[
        models.PointStruct(
            id=1,
            # Use the new embedding model to encode the document
            vector=models.Document(
                text="Example document",
                model=NEW_MODEL,
            ),
            payload={"text": "Example document"}
        )
    ]
)
await client.upsert(NEW_COLLECTION, {
    points: [
        {
            id: 1,
            // Use the new embedding model to encode the document
            vector: {
                text: "Example document",
                model: NEW_MODEL,
            },
            payload: { text: "Example document" },
        },
    ],
});
client
    .upsert_points(UpsertPointsBuilder::new(
        new_collection,
        vec![PointStruct::new(
            1,
            // Use the new embedding model to encode the document
            Document::new("Example document", new_model),
            [("text", "Example document".into())],
        )],
    ))
    .await?;
client.upsertAsync(NEW_COLLECTION, List.of(
        PointStruct.newBuilder()
            .setId(id(1))
            // Use the new embedding model to encode the document
            .setVectors(
                vectors(
                    vector(
                        Document.newBuilder()
                            .setText("Example document")
                            .setModel(NEW_MODEL)
                            .build())))
            .putAllPayload(Map.of("text", value("Example document")))
            .build())).get();
await client.UpsertAsync(
	collectionName: NEW_COLLECTION,
	points: new List<PointStruct>
	{
		new()
		{
			Id = 1,
			// Use the new embedding model to encode the document
			Vectors = new Document
			{
				Text = "Example document",
				Model = NEW_MODEL
			},
			Payload = { ["text"] = "Example document" }
		}
	}
);
client.Upsert(context.Background(), &qdrant.UpsertPoints{
	CollectionName: NEW_COLLECTION,
	Points: []*qdrant.PointStruct{
		{
			Id: qdrant.NewIDNum(1),
			// Use the new embedding model to encode the document
			Vectors: qdrant.NewVectorsDocument(&qdrant.Document{
				Text:  "Example document",
				Model: NEW_MODEL,
			}),
			Payload: qdrant.NewValueMap(map[string]any{"text": "Example document"}),
		},
	},
})

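A good practice is to always make sure both operations succeed. Any errors must be handled on the client side. You can store failed writes in a log or a "dead-letter queue" for later processing. Transient errors can be retried later; other errors need to be analyzed and resolved accordingly.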
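As a minimal sketch of this error handling, the helper below attempts every write and records each failure in a dead-letter list instead of aborting. The names `dual_write` and `DeadLetter` and the writer callables are hypothetical; in a real service, each writer would wrap the `client.upsert(...)` call for one collection, and the list would be replaced by a durable queue or log.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DeadLetter:
    """A failed write, kept so it can be retried or inspected later."""
    collection: str
    point_id: Any
    error: str


def dual_write(
    point_id: Any,
    writers: dict[str, Callable[[Any], None]],
    dead_letters: list[DeadLetter],
) -> bool:
    """Run every writer for the point; return True only if all succeeded."""
    all_ok = True
    for collection, write in writers.items():
        try:
            write(point_id)
        except Exception as exc:  # assumption: any exception marks a failed write
            dead_letters.append(DeadLetter(collection, point_id, str(exc)))
            all_ok = False
    return all_ok
```

A failure on one collection does not prevent the write to the other, so the dead-letter entries are exactly the writes that still need to be retried.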

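If you use a monolithic application instead of an update service, you need to modify the application code so that it writes to both collections during the transition. Add this dual-write logic in the code path that handles document embedding.

Note that the approach outlined in this tutorial only works for upsert operations. For example, if a point exists in the old collection but not yet in the new one, a delete operation would not take effect on the new collection, and the point could later be added by the migration process by mistake. If you modify points using any of the following methods, you need to pause these operations during the migration or implement additional logic to handle them: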

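  • .delete - Deletes the specified points from the collection
  • .update_vectors - Updates the specified vectors on points
  • .delete_vectors - Deletes the specified vectors from points
  • .set_payload - Sets payload values for the specified points
  • .overwrite_payload - Overwrites the entire payload of the specified points with a new payload
  • .delete_payload - Deletes the specified payload keys from points
  • .clear_payload - Removes the entire payload from the specified points
  • .batch_update_points - Performs batch updates on points, including their vectors and payloads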

Refer to the documentation of your SDK, or to the HTTP/gRPC definitions, for the exact method names, as they can differ between languages.
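The tutorial does not prescribe what the additional logic should look like. One possible shape, shown here only for deletes and purely as an assumption, is a delete log: the application records every ID it deletes during the migration, the migration process drops those IDs from each batch, and the deletes are replayed on the new collection once the migration finishes. `DeleteLog` and its methods are hypothetical names, and a production version would need durable, shared storage rather than an in-memory set.

```python
class DeleteLog:
    """Remembers IDs deleted during the migration (in memory, for illustration)."""

    def __init__(self) -> None:
        self._deleted: set = set()

    def record(self, point_id) -> None:
        """Call whenever the application deletes a point during the migration."""
        self._deleted.add(point_id)

    def filter_batch(self, point_ids: list) -> list:
        """Drop IDs that were deleted after the migration process read them."""
        return [pid for pid in point_ids if pid not in self._deleted]

    def replay(self, delete_fn) -> int:
        """Re-apply all recorded deletes on the new collection; return their count."""
        ids = sorted(self._deleted, key=str)
        if ids:
            delete_fn(ids)  # assumption: delete_fn deletes a list of IDs
        return len(ids)
```

The final replay is what closes the race between reading a point and writing it: even if a deleted point slips into the new collection, it is removed again before search is switched over. Partial updates such as `set_payload` are not covered by this sketch.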

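With these changes in place, you are in dual-write mode: every change is written to both the old and the new collection. This keeps the two collections in sync throughout the migration.

Step 3: Migrate the Existing Points into the New Collection

Now that you are in dual-write mode, it is time to migrate the existing points from the old collection to the new one. This can run as a separate process in parallel with the regular upsert service.

The migration process reads points from the old collection, re-embeds them with the new model, and writes them to the new collection, making sure not to overwrite points already inserted by the update service. Here is a code example of such a migration process: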

last_offset = None
batch_size = 100  # Number of points to read in each batch
reached_end = False

while not reached_end:
    # Get the next batch of points from the old collection
    records, last_offset = client.scroll(
        collection_name=OLD_COLLECTION,
        limit=batch_size,
        offset=last_offset,
        # Include payloads in the response, as we need them to re-embed the vectors
        with_payload=True,
        # We don't need the old vectors, so let's save on the bandwidth
        with_vectors=False,
    )

    # Re-embed the points using the new model
    points = [
        models.PointStruct(
            # Keep the original ID to ensure consistency
            id=record.id,
            # Use the new embedding model to encode the text from the payload,
            # assuming that was the original source of the embedding
            vector=models.Document(
                text=(record.payload or {}).get("text", ""),
                model=NEW_MODEL,
            ),
            # Keep the original payload
            payload=record.payload
        )
        for record in records
    ]

    # Upsert the re-embedded points into the new collection
    client.upsert(
        collection_name=NEW_COLLECTION,
        points=points,
        # Only insert the point if a point with this ID does not already exist.
        update_mode=models.UpdateMode.INSERT_ONLY
    )

    # Check if we reached the end of the collection
    reached_end = last_offset is None
let lastOffset: number | string | undefined = undefined;
const batchSize = 100; // Number of points to read in each batch
let reachedEnd = false;

while (!reachedEnd) {
    // Get the next batch of points from the old collection
    const scrollResult = await client.scroll(OLD_COLLECTION, {
        limit: batchSize,
        offset: lastOffset,
        // Include payloads in the response, as we need them to re-embed the vectors
        with_payload: true,
        // We don't need the old vectors, so let's save on the bandwidth
        with_vector: false,
    });

    const records = scrollResult.points;
    lastOffset = scrollResult.next_page_offset as number | string | undefined;

    // Re-embed the points using the new model
    const points = records.map((record) => ({
        // Keep the original ID to ensure consistency
        id: record.id,
        // Use the new embedding model to encode the text from the payload,
        // assuming that was the original source of the embedding
        vector: {
            text: ((record.payload?.text as string) ?? ""),
            model: NEW_MODEL,
        },
        // Keep the original payload
        payload: record.payload,
    }));

    // Upsert the re-embedded points into the new collection
    await client.upsert(NEW_COLLECTION, {
        points,
        // Only insert the point if a point with this ID does not already exist.
        update_mode: "insert_only" as const,
    });

    // Check if we reached the end of the collection
    reachedEnd = lastOffset == null;
}
let mut last_offset = None;
let batch_size = 100; // Number of points to read in each batch

loop {
    // Get the next batch of points from the old collection
    let mut scroll_builder = ScrollPointsBuilder::new(old_collection)
        .limit(batch_size)
        // Include payloads in the response, as we need them to re-embed the vectors
        .with_payload(true)
        // We don't need the old vectors, so let's save on the bandwidth
        .with_vectors(false);

    if let Some(offset) = last_offset {
        scroll_builder = scroll_builder.offset(offset);
    }

    let scroll_result = client.scroll(scroll_builder).await?;

    let records = scroll_result.result;
    last_offset = scroll_result.next_page_offset;

    // Re-embed the points using the new model
    let points: Vec<PointStruct> = records
        .iter()
        .map(|record| {
            PointStruct::new(
                // Keep the original ID to ensure consistency
                record.id.clone().unwrap(),
                // Use the new embedding model to encode the text from the payload,
                // assuming that was the original source of the embedding
                Document::new(
                    record.payload.get("text")
                        .and_then(|v| v.as_str())
                        .map_or("", |v| v),
                    new_model,
                ),
                // Keep the original payload
                record.payload.clone(),
            )
        })
        .collect();

    // Upsert the re-embedded points into the new collection
    client
        .upsert_points(
            // Only insert the point if a point with this ID does not already exist.
            UpsertPointsBuilder::new(new_collection, points)
                .update_mode(UpdateMode::InsertOnly),
        )
        .await?;

    // Check if we reached the end of the collection
    if last_offset.is_none() {
        break;
    }
}
int batchSize = 100; // Number of points to read in each batch
boolean reachedEnd = false;

// Get the next batch of points from the old collection
var scrollBuilder = ScrollPoints.newBuilder()
    .setCollectionName(OLD_COLLECTION)
    .setLimit(batchSize)
    // Include payloads in the response, as we need them to re-embed the vectors
    .setWithPayload(WithPayloadSelectorFactory.enable(true))
    // We don't need the old vectors, so let's save on the bandwidth
    .setWithVectors(WithVectorsSelectorFactory.enable(false));

while (!reachedEnd) {
    var scrollResult = client.scrollAsync(scrollBuilder.build()).get();

    var records = scrollResult.getResultList();

    // Re-embed the points using the new model
    List<PointStruct> points = new ArrayList<>();
    for (var record : records) {
        String text = record.getPayloadMap().containsKey("text")
            ? record.getPayloadMap().get("text").getStringValue()
            : "";

        points.add(
            PointStruct.newBuilder()
                // Keep the original ID to ensure consistency
                .setId(record.getId())
                // Use the new embedding model to encode the text from the payload,
                // assuming that was the original source of the embedding
                .setVectors(
                    vectors(
                        vector(
                            Document.newBuilder()
                                .setText(text)
                                .setModel(NEW_MODEL)
                                .build())))
                // Keep the original payload
                .putAllPayload(record.getPayloadMap())
                .build());
    }

    // Upsert the re-embedded points into the new collection
    client.upsertAsync(
        UpsertPoints.newBuilder()
            .setCollectionName(NEW_COLLECTION)
            .addAllPoints(points)
            // Only insert the point if a point with this ID does not already exist.
            .setUpdateMode(UpdateMode.InsertOnly)
            .build()).get();

    // Check if we reached the end of the collection
    if (scrollResult.hasNextPageOffset()) {
        scrollBuilder.setOffset(scrollResult.getNextPageOffset());
    } else {
        reachedEnd = true;
    }
}
PointId? lastOffset = null;
uint limit = 100; // Number of points to read in each batch
bool reachedEnd = false;

while (!reachedEnd)
{
	// Get the next batch of points from the old collection
	var scrollResult = await client.ScrollAsync(
		collectionName: OLD_COLLECTION,
		limit: limit,
		offset: lastOffset,
		// Include payloads in the response, as we need them to re-embed the vectors
		payloadSelector: true,
		// We don't need the old vectors, so let's save on the bandwidth
		vectorsSelector: false
	);

	var records = scrollResult.Result;
	lastOffset = scrollResult.NextPageOffset;

	// Re-embed the points using the new model
	var points = new List<PointStruct>();
	foreach (var record in records)
	{
		var text = record.Payload.ContainsKey("text")
			? record.Payload["text"].StringValue
			: "";

		points.Add(new PointStruct
		{
			// Keep the original ID to ensure consistency
			Id = record.Id,
			// Use the new embedding model to encode the text from the payload,
			// assuming that was the original source of the embedding
			Vectors = new Document
			{
				Text = text,
				Model = NEW_MODEL
			},
			// Keep the original payload
			Payload = { record.Payload }
		});
	}

	// Upsert the re-embedded points into the new collection
	await client.UpsertAsync(
		new()
		{
			CollectionName = NEW_COLLECTION,
			Points = { points },
			// Only insert the point if a point with this ID does not already exist.
			UpdateMode = UpdateMode.InsertOnly
		}
	);

	// Check if we reached the end of the collection
	reachedEnd = (lastOffset == null);
}
var lastOffset *qdrant.PointId
batchSize := uint32(100) // Number of points to read in each batch
reachedEnd := false

for !reachedEnd {
	// Get the next batch of points from the old collection
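	// ScrollAndOffset also returns the next page offset, which plain Scroll does not
	records, nextOffset, err := client.ScrollAndOffset(context.Background(), &qdrant.ScrollPoints{
		CollectionName: OLD_COLLECTION,
		Limit:          qdrant.PtrOf(batchSize),
		Offset:         lastOffset,
		// Include payloads in the response, as we need them to re-embed the vectors
		WithPayload: qdrant.NewWithPayload(true),
		// We don't need the old vectors, so let's save on the bandwidth
		WithVectors: qdrant.NewWithVectors(false),
	})
	if err != nil {
		panic(err) // Replace with your own error handling
	}
	// Remember where the next batch starts; nil means the end was reached
	lastOffset = nextOffset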

	// Re-embed the points using the new model
	points := make([]*qdrant.PointStruct, len(records))
	for idx, record := range records {
		text := ""
		if val, ok := record.Payload["text"]; ok {
			text = val.GetStringValue()
		}

		points[idx] = &qdrant.PointStruct{
			// Keep the original ID to ensure consistency
			Id: record.Id,
			// Use the new embedding model to encode the text from the payload,
			// assuming that was the original source of the embedding
			Vectors: qdrant.NewVectorsDocument(&qdrant.Document{
				Text:  text,
				Model: NEW_MODEL,
			}),
			// Keep the original payload
			Payload: record.Payload,
		}
	}

	// Upsert the re-embedded points into the new collection
	client.Upsert(context.Background(), &qdrant.UpsertPoints{
		CollectionName: NEW_COLLECTION,
		Points:         points,
		// Only insert the point if a point with this ID does not already exist.
		UpdateMode: qdrant.UpdateMode_InsertOnly.Enum(),
	})

	// Check if we reached the end of the collection
	reachedEnd = (lastOffset == nil)
}

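Breaking down the code:

  • Data is read from the old collection in batches of 100 points using scroll. The last_offset variable tracks the scroll position in the collection.
  • For each batch, the process re-embeds the points with the new embedding model. It assumes that the original text used for the embedding is stored in the payload under the text key.
  • The re-embedded points are upserted into the new collection, keeping the original IDs and payloads. The upsert uses insert-only mode, so a point is inserted only if it does not already exist in the new collection. This prevents overwriting newer updates from the regular update service.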
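The migration code above falls back to an empty string when the text key is missing, which would embed an empty document. If that is not what you want, a small pre-filter can separate such records before re-embedding. The sketch below is an assumption rather than part of the tutorial's method; `split_by_text` is a hypothetical helper that relies only on the `id` and `payload` attributes that scrolled records expose in the Python example.

```python
def split_by_text(records, key="text"):
    """Separate records that carry usable source text from those that do not.

    Returns a tuple (embeddable_records, skipped_ids).
    """
    embeddable, skipped_ids = [], []
    for record in records:
        text = (record.payload or {}).get(key)
        if isinstance(text, str) and text.strip():
            embeddable.append(record)
        else:
            # Missing, empty, or non-string text: do not embed, report instead
            skipped_ids.append(record.id)
    return embeddable, skipped_ids
```

The skipped IDs can then be logged and re-embedded from the primary data source instead of the payload.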

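Such a migration can take a while, so the offset should be stored persistently to allow resuming after a failure. You can use a database, a file, or any other persistent storage to record the last offset. That said, because the conditional upsert never overwrites points in the new collection, you can also safely restart the migration from the beginning if needed.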
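As one way to persist the offset, the sketch below stores it in a small JSON file and writes it atomically so that a crash cannot leave a half-written file. The helper names and the file layout are hypothetical, and the sketch assumes the offset is an integer or a UUID string, as in the Python example above.

```python
import json
import os
import tempfile
from pathlib import Path


def save_offset(path: Path, offset) -> None:
    """Atomically persist the last scroll offset (an int or a UUID string)."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent)
    with os.fdopen(fd, "w") as tmp:
        json.dump({"offset": offset}, tmp)
    os.replace(tmp_name, path)  # atomic rename within the same directory


def load_offset(path: Path):
    """Return the stored offset, or None to start from the beginning."""
    if not path.exists():
        return None
    return json.loads(path.read_text())["offset"]
```

In the migration loop, you would call `load_offset` once before the loop and `save_offset` after each successful upsert, so that a restart repeats at most one batch.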

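Step 4: Change the Collection and Embedding Model Used for Search

Once the migration is complete and all points from the old collection have been re-embedded and stored in the new collection, you can change the configuration of your backend application. Two key changes are required: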

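  1. The collection name. Switch it from the old collection to the new one. If you use a collection alias, you only need to repoint the alias to the new collection.
  2. The embedding model. Switch it from the old embedding model to the new one.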
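If you use an alias, repointing it can be done in a single request to Qdrant's `POST /collections/aliases` endpoint, which applies all listed actions together. The sketch below only builds the request body; `alias_switch_actions` is a hypothetical helper, and the alias and collection names in the usage example are placeholders.

```python
def alias_switch_actions(alias: str, new_collection: str) -> dict:
    """Build the body for POST /collections/aliases that repoints `alias`."""
    return {
        "actions": [
            # Remove the alias from the old collection...
            {"delete_alias": {"alias_name": alias}},
            # ...and attach it to the new one in the same request
            {"create_alias": {"collection_name": new_collection, "alias_name": alias}},
        ]
    }


# Example with placeholder names
body = alias_switch_actions("production", "documents_v2")
```

Because both actions travel in one request, searches that use the alias never see a moment in which it points to no collection. The embedding model used for queries still has to be switched at the same time.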

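If these values are hard-coded in your application, you need to change them directly in the code and deploy a new version of the application. For example, if the current search code looks like this: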

results = client.query_points(
    collection_name=OLD_COLLECTION,
    query=models.Document(text="my query", model=OLD_MODEL),
    limit=10,
)
const results = await client.query(OLD_COLLECTION, {
    query: {
        text: "my query",
        model: OLD_MODEL,
    },
    limit: 10,
});
let results = client
    .query(
        QueryPointsBuilder::new(old_collection)
            .query(Query::new_nearest(Document::new("my query", old_model)))
            .limit(10),
    )
    .await?;
QueryPoints oldRequest =
    QueryPoints.newBuilder()
        .setCollectionName(OLD_COLLECTION)
        .setQuery(
            nearest(
                Document.newBuilder()
                    .setText("my query")
                    .setModel(OLD_MODEL)
                    .build()))
        .setLimit(10)
        .build();

var results = client.queryAsync(oldRequest).get();
var results = await client.QueryAsync(
	collectionName: OLD_COLLECTION,
	query: new Document
	{
		Text = "my query",
		Model = OLD_MODEL
	},
	limit: 10
);
results, err := client.Query(context.Background(), &qdrant.QueryPoints{
	CollectionName: OLD_COLLECTION,
	Query: qdrant.NewQueryDocument(&qdrant.Document{
		Text:  "my query",
		Model: OLD_MODEL,
	}),
	Limit: qdrant.PtrOf(uint64(10)),
})

You need to change it as follows:

results = client.query_points(
    collection_name=NEW_COLLECTION,
    query=models.Document(text="my query", model=NEW_MODEL),
    limit=10,
)
const resultsNew = await client.query(NEW_COLLECTION, {
    query: {
        text: "my query",
        model: NEW_MODEL,
    },
    limit: 10,
});
let results = client
    .query(
        QueryPointsBuilder::new(new_collection)
            .query(Query::new_nearest(Document::new("my query", new_model)))
            .limit(10),
    )
    .await?;
QueryPoints newRequest =
    QueryPoints.newBuilder()
        .setCollectionName(NEW_COLLECTION)
        .setQuery(
            nearest(
                Document.newBuilder()
                    .setText("my query")
                    .setModel(NEW_MODEL)
                    .build()))
        .setLimit(10)
        .build();

results = client.queryAsync(newRequest).get();
results = await client.QueryAsync(
	collectionName: NEW_COLLECTION,
	query: new Document
	{
		Text = "my query",
		Model = NEW_MODEL
	},
	limit: 10
);
results, err = client.Query(context.Background(), &qdrant.QueryPoints{
	CollectionName: NEW_COLLECTION,
	Query: qdrant.NewQueryDocument(&qdrant.Document{
		Text:  "my query",
		Model: NEW_MODEL,
	}),
	Limit: qdrant.PtrOf(uint64(10)),
})

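Step 5: Wrap Up

After the application has switched to the new collection, disable the dual-write mode set up in Step 2. From now on, the application should write only to the new collection.

All searches now use the new embeddings. If you no longer need the old collection, you can safely delete it. To be able to roll back if necessary, keep a snapshot of the old collection.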
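A minimal sketch of this cleanup, assuming a client object that offers `create_snapshot` and `delete_collection` as the Python `QdrantClient` does: the collection is deleted only after the snapshot call has returned a result. `retire_collection` is a hypothetical helper name.

```python
def retire_collection(client, collection_name: str):
    """Snapshot the collection, then delete it only if the snapshot succeeded."""
    snapshot = client.create_snapshot(collection_name=collection_name)
    if snapshot is None:
        # No snapshot description came back: keep the collection untouched
        raise RuntimeError(f"Snapshot of {collection_name!r} failed; not deleting")
    client.delete_collection(collection_name=collection_name)
    return snapshot
```

Before relying on the snapshot for a rollback, make sure it is stored somewhere that outlives the collection, for example by downloading it from the cluster.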
