数据同步模式
本页面介绍了在 Qdrant 边缘分片(Edge Shards)与 Qdrant 服务器集合(Collections)之间同步数据的模式。有关实现这些模式的实用端到端指南,请参阅 Qdrant 边缘同步指南。
从现有的 Qdrant 集合初始化边缘分片
您可能不想从空的边缘分片开始,而是希望使用服务器端集合中预先存在的数据对其进行初始化。您可以通过恢复服务器端集合中某个分片的快照来实现这一点。

在创建用于同步的快照时,请在快照 URL 中指定适用的服务器端分片 ID。这允许单个集合服务于多个独立用户或设备,每个用户或设备拥有各自的边缘分片。有关 Qdrant 分片策略的更多信息,请阅读 分层多租户文档。
首先,构建一个快照 URL
# Name of the server-side collection that the edge shard is initialized from.
COLLECTION_NAME = "edge-collection"
# Snapshot endpoint for shard 0 of that collection.
snapshot_url = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot"
// Name of the server-side collection that the edge shard is initialized from.
const COLLECTION_NAME: &str = "edge-collection";
// Snapshot endpoint for shard 0 of that collection.
let snapshot_url =
    format!("{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot");
请注意,此示例使用的是分片 ID 0。
使用该快照 URL,您可以将快照下载到本地磁盘,并利用其中的数据来初始化一个新的边缘分片。
from pathlib import Path
from qdrant_edge import EdgeShard
import requests
import shutil
import tempfile
SHARD_DIRECTORY = "./qdrant-edge-directory"
data_dir = Path(SHARD_DIRECTORY)

# The download is staged in a temporary directory created in the parent of
# the data directory; it is removed automatically when the block exits.
with tempfile.TemporaryDirectory(dir=data_dir.parent) as restore_dir:
    snapshot_path = Path(restore_dir) / "shard.snapshot"

    # Stream the snapshot to disk in 8 KiB chunks.
    with requests.get(
        snapshot_url, headers={"api-key": QDRANT_API_KEY}, stream=True
    ) as r:
        r.raise_for_status()
        with open(snapshot_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)

    # Replace any existing data directory with an empty one.
    if data_dir.exists():
        shutil.rmtree(data_dir)
    data_dir.mkdir(parents=True, exist_ok=True)

    # Must run inside the `with` block: the snapshot file lives in restore_dir.
    EdgeShard.unpack_snapshot(str(snapshot_path), str(data_dir))

# Open the edge shard from the unpacked data.
edge_shard = EdgeShard.load(SHARD_DIRECTORY)
const SHARD_DIRECTORY: &str = "./qdrant-edge-directory";
let data_dir = Path::new(SHARD_DIRECTORY);

// The download is staged in a temporary directory created in the parent of
// the data directory; it is removed when `restore_dir` is dropped.
let restore_dir = tempfile::Builder::new()
    .tempdir_in(data_dir.parent().unwrap_or(Path::new(".")))?;
let snapshot_path = restore_dir.path().join("shard.snapshot");

// Stream the response body straight into the snapshot file instead of
// buffering the whole snapshot in memory first.
let mut snapshot_reader = ureq::get(&snapshot_url)
    .header("api-key", QDRANT_API_KEY)
    .call()?
    .into_body()
    .into_reader();
{
    // Scoped so the file handle is closed before the snapshot is unpacked.
    let mut snapshot_file = fs_err::File::create(&snapshot_path)?;
    std::io::copy(&mut snapshot_reader, &mut snapshot_file)?;
}

// Replace any existing data directory with an empty one.
if data_dir.exists() {
    fs_err::remove_dir_all(data_dir)?;
}
fs_err::create_dir_all(data_dir)?;

EdgeShard::unpack_snapshot(&snapshot_path, data_dir)?;
// Open the edge shard from the unpacked data.
let edge_shard = EdgeShard::load(data_dir, None)?;
此代码首先将快照下载到临时目录。接下来,EdgeShard.unpack_snapshot 将下载的快照解压到数据目录中,随后 EdgeShard.load 使用解压后的数据和配置初始化 EdgeShard。
edge_shard 将使用与创建快照的源集合相同的配置和文件结构,包括向量索引和载荷(payload)索引。
使用服务器端变更更新 Qdrant Edge
为了使边缘分片与服务器集合中的新数据保持同步,您可以定期下载并应用快照。每次都恢复完整快照会造成不必要的开销。相反,您可以使用增量快照来恢复自上次快照以来的变更。边缘分片清单(manifest)描述了该分片的所有段(segments)和元数据;将该清单发送给服务器后,服务器返回的增量快照仅包含已变更的段。
# The manifest is sent to the server as the request body of the
# partial-snapshot request.
manifest = edge_shard.snapshot_manifest()
url = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot/partial/create"

with tempfile.TemporaryDirectory(dir=data_dir) as temp_dir:
    partial_snapshot_path = Path(temp_dir) / "partial.snapshot"

    # Stream the partial snapshot to disk; the `with` block releases the
    # connection even when raise_for_status() raises.
    with requests.post(
        url, headers={"api-key": QDRANT_API_KEY}, json=manifest, stream=True
    ) as response:
        response.raise_for_status()
        with open(partial_snapshot_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

    # Must run inside the `with` block: the snapshot file lives in temp_dir.
    edge_shard.update_from_snapshot(str(partial_snapshot_path))
let current_manifest = edge_shard.snapshot_manifest()?;
let update_url = format!(
"{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot\
/partial/create"
);
let temp_dir = tempfile::tempdir_in(data_dir)?;
let partial_snapshot_path = temp_dir.path().join("partial.snapshot");
let mut bytes = Vec::new();
std::io::copy(
&mut ureq::post(&update_url)
.header("api-key", QDRANT_API_KEY)
.send_json(¤t_manifest)?
.into_body()
.into_reader(),
&mut bytes,
)?;
fs_err::write(&partial_snapshot_path, &bytes)?;
let unpacked_dir = tempfile::tempdir_in(data_dir)?;
EdgeShard::unpack_snapshot(&partial_snapshot_path, unpacked_dir.path())?;
let snapshot_manifest = SnapshotManifest::load_from_snapshot(
unpacked_dir.path(),
None,
)?;
let edge_shard = EdgeShard::recover_partial_snapshot(
data_dir,
¤t_manifest,
unpacked_dir.path(),
&snapshot_manifest,
)?;
从边缘分片更新服务器集合
要将数据从边缘分片同步到服务器集合,请在您的应用程序中实现双写(dual-write)机制。当您在边缘分片中添加或更新点时,请同时使用 Qdrant 客户端将其存储到服务器集合中。
除了直接写入服务器集合外,您可能还需要设置一个后台作业或消息队列,以异步处理同步操作。由于应用程序的网络连接可能并不总是稳定,因此将更新排队可以确保在网络恢复连接时数据最终能够完成同步。
首先,进行初始化
- 从零开始或从服务器端快照初始化边缘分片
- 建立 Qdrant 服务器连接。
详细信息
初始化边缘分片
from pathlib import Path
from qdrant_edge import (
Distance,
EdgeConfig,
EdgeVectorParams,
)
SHARD_DIRECTORY = "./qdrant-edge-directory"
VECTOR_NAME = "my-vector"
VECTOR_DIMENSION = 4

# Make sure the shard directory exists before creating the shard in it.
Path(SHARD_DIRECTORY).mkdir(parents=True, exist_ok=True)

# A single named vector compared with cosine distance.
vector_params = EdgeVectorParams(size=VECTOR_DIMENSION, distance=Distance.Cosine)
config = EdgeConfig(vectors={VECTOR_NAME: vector_params})

edge_shard = EdgeShard.create(SHARD_DIRECTORY, config)
const VECTOR_DIMENSION: usize = 4;
const VECTOR_NAME: &str = "my-vector";

// Make sure the shard directory exists before creating the shard in it.
fs_err::create_dir_all(SHARD_DIRECTORY)?;

// Parameters of the single named vector, compared with cosine distance.
let vector_params = EdgeVectorParams {
    size: VECTOR_DIMENSION,
    distance: qdrant_edge::Distance::Cosine,
    on_disk: Some(true),
    quantization_config: None,
    multivector_config: None,
    datatype: None,
    hnsw_config: None,
};

let mut vectors = HashMap::new();
vectors.insert(VECTOR_NAME.to_string(), vector_params);

let config = EdgeConfig {
    on_disk_payload: true,
    vectors,
    sparse_vectors: HashMap::new(),
    hnsw_config: Default::default(),
    quantization_config: None,
    optimizers: Default::default(),
};

let edge_shard = EdgeShard::new(Path::new(SHARD_DIRECTORY), config)?;
初始化到服务器的 Qdrant 客户端连接,并在目标集合不存在时创建它
from qdrant_client import QdrantClient, models
server_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
COLLECTION_NAME = "edge-collection"

# Create the target collection only if it does not exist yet. The vector is
# registered under VECTOR_NAME with the same size and cosine distance.
if not server_client.collection_exists(collection_name=COLLECTION_NAME):
    server_client.create_collection(
        collection_name=COLLECTION_NAME,
        vectors_config={
            VECTOR_NAME: models.VectorParams(
                size=VECTOR_DIMENSION, distance=models.Distance.COSINE
            )
        },
    )
let server_client = Qdrant::from_url(QDRANT_URL)
    .api_key(QDRANT_API_KEY)
    .build()?;

// Create the target collection only if it does not exist yet.
if !server_client.collection_exists(COLLECTION_NAME).await? {
    // Register the vector under VECTOR_NAME: points sent to the server carry
    // their vector under that name, so the collection must declare a named
    // vector rather than a single unnamed one.
    let mut vectors_config = qdrant_client::qdrant::VectorsConfigBuilder::default();
    vectors_config.add_named_vector_params(
        VECTOR_NAME,
        VectorParamsBuilder::new(VECTOR_DIMENSION as u64, Distance::Cosine),
    );

    server_client
        .create_collection(
            CreateCollectionBuilder::new(COLLECTION_NAME).vectors_config(vectors_config),
        )
        .await?;
}
接下来,实例化用于保存需要与服务器同步的点的队列
from queue import Empty, Queue
# This is an in-memory queue.
# For production use cases, consider persisting changes.
upload_queue: Queue[models.PointStruct] = Queue()
// In-memory queue of points waiting to be sent to the server.
// For production use cases, consider persisting changes.
let mut upload_queue = std::collections::VecDeque::<PointStruct>::new();
在边缘分片中添加或更新点时,同时将该点加入同步队列。
from qdrant_edge import ( Point, UpdateOperation )
from qdrant_client import models
point_id = 1
vector = [0.1, 0.2, 0.3, 0.4]
payload = {"color": "red"}

# Write the point to the local edge shard.
edge_point = Point(id=point_id, vector={VECTOR_NAME: vector}, payload=payload)
edge_shard.update(UpdateOperation.upsert_points([edge_point]))

# Queue the same point, as a qdrant_client PointStruct, for upload to the server.
upload_queue.put(
    models.PointStruct(id=point_id, vector={VECTOR_NAME: vector}, payload=payload)
)
let id = 1u64;
let vector = vec![0.1f32, 0.2, 0.3, 0.4];
let payload = json!({"color": "red"});

// Write the point to the local edge shard.
let edge_point: PointStructPersisted = EdgePoint::new(
    PointId::NumId(id),
    Vectors::new_named([(VECTOR_NAME, vector.clone())]),
    payload.clone(),
)
.into();
let upsert = PointOperations::UpsertPoints(
    PointInsertOperations::PointsList(vec![edge_point]),
);
edge_shard.update(UpdateOperation::PointOperation(upsert))?;

// Queue the same point, as a server-side PointStruct, for upload to the server.
let server_vectors = HashMap::from([(VECTOR_NAME.to_string(), vector)]);
let server_payload = payload.as_object().cloned().unwrap_or_default();
upload_queue.push_back(PointStruct::new(id, server_vectors, server_payload));
后台工作进程可以处理上传队列并将点同步到服务器集合。此示例一次最多批量上传 10 个点
BATCH_SIZE = 10

# Take up to BATCH_SIZE points off the queue without blocking; stop early
# once the queue is empty.
points_to_upload: list[models.PointStruct] = []
while len(points_to_upload) < BATCH_SIZE:
    try:
        points_to_upload.append(upload_queue.get_nowait())
    except Empty:
        break

# Skip the request entirely when there is nothing to upload.
if points_to_upload:
    server_client.upsert(
        collection_name=COLLECTION_NAME, points=points_to_upload
    )
const BATCH_SIZE: usize = 10;

// Take up to BATCH_SIZE points off the front of the queue.
let batch_len = upload_queue.len().min(BATCH_SIZE);
let points_to_upload: Vec<PointStruct> = upload_queue.drain(..batch_len).collect();

// Skip the request entirely when there is nothing to upload.
if !points_to_upload.is_empty() {
    let request = UpsertPointsBuilder::new(COLLECTION_NAME, points_to_upload);
    server_client.upsert_points(request).await?;
}
请确保妥善处理错误和重试逻辑,以应对网络问题或服务器不可用的情况。