将 Qdrant Edge 与服务器同步
Qdrant Edge 可以与外部 Qdrant 服务器上的集合进行同步,以支持以下应用场景:
- 卸载索引 (Offload indexing):索引是一项计算密集型操作。通过将 Edge Shard(边缘分片)与服务器集合同步,您可以将索引过程卸载到性能更强大的服务器实例上。索引后的数据随后可以同步回 Edge Shard。
- 备份与恢复 (Back up and Restore):定期将 Edge Shard 数据备份到中央 Qdrant 实例,以防数据丢失。如果发生硬件故障或数据损坏,您可以从中央实例恢复数据。
- 数据聚合 (Data Aggregation):从部署在不同地点的多个 Edge Shard 收集数据,并将其聚合到中央 Qdrant 实例中,以便进行综合分析和报告。
- 设备间同步 (Synchronization between devices):通过将多个边缘设备的 Edge Shard 与中央 Qdrant 实例同步,保持跨设备的数据一致性。
将 Qdrant Edge 与服务器同步
为了同时支持本地更新和来自中央服务器的更新,建议实现一种包含两个 Edge Shard 的架构:
- 一个可变 (mutable) 的 Edge Shard,用于处理本地数据更新。
- 一个不可变 (immutable) 的 Edge Shard,通过部分快照 (partial snapshots) 镜像服务器上集合的分片。
在查询数据时,合并两个 Edge Shard 的结果以提供统一视图。这样,本地添加的新点即可与从服务器同步的数据一起进行搜索。

实现双写机制(将数据同时写入可变 Edge Shard 和服务器集合)可确保数据在服务器上建立索引并同步回不可变 Edge Shard,从而优化搜索性能。
有关本指南中所述模式的 Python 实现示例,请参考 Qdrant Edge Demo GitHub 仓库。
1. 初始化可变 Edge Shard
可变 Edge Shard 将管理本地数据更新。它可以从头开始初始化,详情请参见 Qdrant Edge 快速入门指南。
from pathlib import Path

from qdrant_edge import Distance, EdgeConfig, EdgeShard, EdgeVectorParams

MUTABLE_SHARD_DIR = "./qdrant-edge-directory/mutable"
VECTOR_NAME = "my-vector"
VECTOR_DIMENSION = 4

# The mutable shard receives all local writes; make sure its directory exists.
Path(MUTABLE_SHARD_DIR).mkdir(parents=True, exist_ok=True)

# One named dense vector of VECTOR_DIMENSION floats, compared by cosine distance.
vector_params = EdgeVectorParams(size=VECTOR_DIMENSION, distance=Distance.Cosine)
config = EdgeConfig(vectors={VECTOR_NAME: vector_params})

mutable_shard = EdgeShard.create(MUTABLE_SHARD_DIR, config)
const MUTABLE_SHARD_DIR: &str = "./qdrant-edge-directory/mutable";
const VECTOR_NAME: &str = "my-vector";
const VECTOR_DIMENSION: usize = 4;

// The mutable shard receives all local writes; make sure its directory exists.
fs_err::create_dir_all(MUTABLE_SHARD_DIR)?;

// One named dense vector, stored on disk and compared by cosine distance.
let vector_params = EdgeVectorParams {
    size: VECTOR_DIMENSION,
    distance: Distance::Cosine,
    on_disk: Some(true),
    datatype: None,
    hnsw_config: None,
    multivector_config: None,
    quantization_config: None,
};
let mut vectors = HashMap::new();
vectors.insert(VECTOR_NAME.to_string(), vector_params);

let config = EdgeConfig {
    vectors,
    sparse_vectors: HashMap::new(),
    on_disk_payload: true,
    hnsw_config: Default::default(),
    optimizers: Default::default(),
    quantization_config: None,
};

let mutable_shard = EdgeShard::new(Path::new(MUTABLE_SHARD_DIR), config)?;
2. 从服务器快照初始化不可变 Edge Shard
接下来,按照「从现有 Qdrant 集合初始化 Edge Shard」中的说明,使用服务器上集合分片的快照创建不可变 Edge Shard。
import requests
import tempfile
import shutil

COLLECTION_NAME = "edge-collection"
snapshot_url = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot"

# The immutable shard MUST live in its own directory. The code below deletes
# this directory before unpacking the snapshot. Pointing it at the mutable
# shard's directory would wipe all local, not-yet-synced data.
IMMUTABLE_SHARD_DIR = "./qdrant-edge-directory/immutable"
data_dir = Path(IMMUTABLE_SHARD_DIR)
# The temporary download directory is created next to the shard directory,
# so the parent has to exist.
data_dir.parent.mkdir(parents=True, exist_ok=True)

# The downloaded snapshot lives in a temporary directory that is removed
# automatically when the `with` block exits.
with tempfile.TemporaryDirectory(dir=data_dir.parent) as restore_dir:
    snapshot_path = Path(restore_dir) / "shard.snapshot"

    # Stream the shard snapshot to disk in chunks instead of holding it in memory.
    with requests.get(snapshot_url, headers={"api-key": QDRANT_API_KEY}, stream=True) as r:
        r.raise_for_status()
        with open(snapshot_path, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)

    # Drop any previously loaded shard reference before its directory is
    # replaced (presumably so its files are no longer in use — confirm).
    immutable_shard = None
    if data_dir.exists():
        shutil.rmtree(data_dir)
    data_dir.mkdir(parents=True, exist_ok=True)
    EdgeShard.unpack_snapshot(str(snapshot_path), str(data_dir))

immutable_shard = EdgeShard.load(IMMUTABLE_SHARD_DIR)
const COLLECTION_NAME: &str = "edge-collection";
const IMMUTABLE_SHARD_DIR: &str = "./qdrant-edge-directory/immutable";

let snapshot_url = format!("{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot");
let data_dir = Path::new(IMMUTABLE_SHARD_DIR);

// Download into a temporary directory next to the shard directory; it is
// deleted when `restore_dir` is dropped.
let restore_dir = tempfile::Builder::new()
    .tempdir_in(data_dir.parent().unwrap_or(Path::new(".")))?;
let snapshot_path = restore_dir.path().join("shard.snapshot");

// Stream the response body straight to disk. A full shard snapshot can be
// large, so it is not buffered in memory first.
let mut snapshot_reader = ureq::get(&snapshot_url)
    .header("api-key", QDRANT_API_KEY)
    .call()?
    .into_body()
    .into_reader();
let mut snapshot_file = fs_err::File::create(&snapshot_path)?;
std::io::copy(&mut snapshot_reader, &mut snapshot_file)?;
// Close the file before it is unpacked.
drop(snapshot_file);

// Replace whatever was in the shard directory with the snapshot contents.
if data_dir.exists() {
    fs_err::remove_dir_all(data_dir)?;
}
fs_err::create_dir_all(data_dir)?;
EdgeShard::unpack_snapshot(&snapshot_path, data_dir)?;

let immutable_shard = EdgeShard::load(data_dir, None)?;
3. 实现双写机制
在两个 Edge Shard 都初始化完成后,您可以按照「从 Edge Shard 更新服务器集合」中的说明,在应用程序中实现双写机制。
首先,实例化一个队列来保存需要写入服务器的待处理更新。
from queue import Empty, Queue

# Needed before the annotation below: module-level variable annotations are
# evaluated at runtime, so `models` must already be bound.
from qdrant_client import models

# In-memory queue of points waiting to be written to the server collection.
# Its contents are lost if the process exits; for production use cases
# consider persisting pending changes.
upload_queue: Queue[models.PointStruct] = Queue()
// Points waiting to be written to the server collection, oldest first.
// The queue lives only in memory; for production use cases consider
// persisting pending changes.
let mut upload_queue = std::collections::VecDeque::<PointStruct>::new();
当添加或更新点时,将其写入可变 Edge Shard,并将其放入队列以便写入服务器集合。
import time

from qdrant_client import models
from qdrant_edge import Point, UpdateOperation

# Payload key holding the upsert time. It is used later to delete points from
# the mutable shard once they have been synced into the immutable shard.
SYNC_TIMESTAMP_KEY = "timestamp"

# Named `point_id` rather than `id` so the builtin `id()` is not shadowed.
point_id = 2
vector = [0.4, 0.3, 0.2, 0.1]
payload = {
    "color": "green",
    SYNC_TIMESTAMP_KEY: time.time(),
}

# 1. Write locally so the point is searchable immediately.
point = Point(
    id=point_id,
    vector={VECTOR_NAME: vector},
    payload=payload,
)
mutable_shard.update(UpdateOperation.upsert_points([point]))

# 2. Queue the same point for the server collection (flushed by a background worker).
rest_point = models.PointStruct(id=point_id, vector={VECTOR_NAME: vector}, payload=payload)
upload_queue.put(rest_point)
const SYNC_TIMESTAMP_KEY: &str = "timestamp";

let id: u64 = 2;
let vector: Vec<f32> = vec![0.4, 0.3, 0.2, 0.1];

// Upsert time in fractional seconds since the Unix epoch, stored in the
// payload so already-synced points can be found and removed later.
let timestamp = UNIX_EPOCH.elapsed().unwrap().as_secs_f64();
let payload = json!({
    "color": "green",
    SYNC_TIMESTAMP_KEY: timestamp,
});

// 1. Local write: upsert into the mutable shard so the point is searchable right away.
let edge_point = EdgePoint::new(
    PointId::NumId(id),
    Vectors::new_named([(VECTOR_NAME, vector.clone())]),
    payload.clone(),
);
let edge_points: Vec<PointStructPersisted> = vec![edge_point.into()];
let upsert = PointOperations::UpsertPoints(PointInsertOperations::PointsList(edge_points));
mutable_shard.update(UpdateOperation::PointOperation(upsert))?;

// 2. Remote write: queue the same point for the server collection.
let rest_vectors = HashMap::from([(VECTOR_NAME.to_string(), vector)]);
let rest_payload = payload.as_object().cloned().unwrap_or_default();
upload_queue.push_back(PointStruct::new(id, rest_vectors, rest_payload));
每个点的有效载荷 (payload) 应包含一个时间戳字段(本例中为 SYNC_TIMESTAMP_KEY),用于记录该点被插入(upsert)的时间。此时间戳用于在不可变 Edge Shard 与服务器同步时对数据进行去重。
后台工作线程可以定期处理上传队列,并将更新分批写入服务器集合。这样,可变 Edge Shard 上的本地更改最终都会同步到服务器集合中。
BATCH_SIZE = 10

# Take at most BATCH_SIZE pending points without blocking; stop as soon as
# the queue runs dry.
points_to_upload: list[models.PointStruct] = []
for _ in range(BATCH_SIZE):
    try:
        points_to_upload.append(upload_queue.get_nowait())
    except Empty:
        break

# Skip the network round-trip when nothing was pending.
if points_to_upload:
    server_client.upsert(collection_name=COLLECTION_NAME, points=points_to_upload)
const BATCH_SIZE: usize = 10;

// Take up to BATCH_SIZE points from the front of the queue (oldest first).
let batch_len = upload_queue.len().min(BATCH_SIZE);
let points_to_upload: Vec<PointStruct> = upload_queue.drain(..batch_len).collect();

// Skip the network round-trip when nothing was pending.
if !points_to_upload.is_empty() {
    let request = UpsertPointsBuilder::new(COLLECTION_NAME, points_to_upload);
    server_client.upsert_points(request).await?;
}
4. 定期更新不可变 Edge Shard
您可以按照「使用服务器端更改更新 Qdrant Edge」中的说明,使用部分快照定期更新不可变 Edge Shard。
在恢复快照时,您可能需要暂停并缓冲可变 Edge Shard 上正在进行的数据更新。在获取快照之前,请确保所有排队的数据都已写入服务器。恢复完成后,即可恢复正常操作。有关 Python 实现示例,请参考 Qdrant Edge Demo GitHub 仓库。
import time

# Describe what the immutable shard already contains, so the server only
# needs to send the difference.
manifest = immutable_shard.snapshot_manifest()
url = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot/partial/create"

# Record the sync point BEFORE requesting the snapshot. Points stamped at or
# before this moment (and already flushed to the server) are later deleted
# from the mutable shard.
sync_timestamp = time.time()

with tempfile.TemporaryDirectory(dir=data_dir) as temp_dir:
    partial_snapshot_path = Path(temp_dir) / "partial.snapshot"
    # The response is a context manager, so the streamed connection is
    # released even if raise_for_status() or a disk write raises.
    with requests.post(
        url, headers={"api-key": QDRANT_API_KEY}, json=manifest, stream=True
    ) as response:
        response.raise_for_status()
        with open(partial_snapshot_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    # Apply while the temporary directory (and the snapshot file) still exists.
    immutable_shard.update_from_snapshot(str(partial_snapshot_path))
let sync_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs_f64();
let current_manifest = immutable_shard.snapshot_manifest()?;
let update_url = format!(
"{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot\
/partial/create"
);
let temp_dir = tempfile::tempdir_in(data_dir)?;
let partial_snapshot_path = temp_dir.path().join("partial.snapshot");
let mut bytes = Vec::new();
std::io::copy(
&mut ureq::post(&update_url)
.header("api-key", QDRANT_API_KEY)
.send_json(¤t_manifest)?
.into_body()
.into_reader(),
&mut bytes,
)?;
fs_err::write(&partial_snapshot_path, &bytes)?;
let unpacked_dir = tempfile::tempdir_in(data_dir)?;
EdgeShard::unpack_snapshot(&partial_snapshot_path, unpacked_dir.path())?;
let snapshot_manifest = SnapshotManifest::load_from_snapshot(
unpacked_dir.path(),
None,
)?;
let immutable_shard = EdgeShard::recover_partial_snapshot(
data_dir,
¤t_manifest,
unpacked_dir.path(),
&snapshot_manifest,
)?;
此示例在请求部分快照之前记录了一个 sync_timestamp。只要在获取快照前已将上传队列中的所有更新写入服务器,所有在此时间戳之前添加到可变 Edge Shard 的点现在就都已包含在不可变 Edge Shard 中。这些重复的点现在可以从可变 Edge Shard 中删除。
from qdrant_edge import FieldCondition, Filter, RangeFloat

# Points stamped at or before `sync_timestamp` are assumed to be in the
# immutable shard now (this requires the upload queue to have been flushed
# before the snapshot), so their copies in the mutable shard are redundant.
already_synced = Filter(
    must=[
        FieldCondition(key=SYNC_TIMESTAMP_KEY, range=RangeFloat(lte=sync_timestamp)),
    ]
)
mutable_shard.update(UpdateOperation.delete_points_by_filter(already_synced))
// Match every point whose payload timestamp is at or before the sync point;
// those points are now duplicated in the immutable shard.
let timestamp_key = SYNC_TIMESTAMP_KEY.parse::<JsonPath>().unwrap();
let up_to_sync = Range {
    lte: Some(OrderedFloat(sync_timestamp)),
    ..Default::default()
};
let filter = Filter::new_must(Condition::Field(FieldCondition::new_range(
    timestamp_key,
    up_to_sync,
)));

let delete_synced = PointOperations::DeletePointsByFilter(filter);
mutable_shard.update(UpdateOperation::PointOperation(delete_synced))?;
5. 查询两个 Edge Shard
为了在所有数据上提供统一的搜索体验,请查询可变和不可变两个 Edge Shard 并合并结果集。由于一个点可能同时存在于两个 Edge Shard 中,请根据点 ID 对结果进行去重。
from qdrant_edge import Query, QueryRequest

# The same nearest-neighbour request is sent to both shards.
query_request = QueryRequest(
    query=Query.Nearest([0.2, 0.1, 0.9, 0.7], using=VECTOR_NAME),
    limit=10,
    with_vector=False,
    with_payload=True,
)

# Mutable-shard hits come first. Sorting is stable, so they win ties on equal scores.
candidates = [*mutable_shard.query(query_request), *immutable_shard.query(query_request)]
candidates = sorted(candidates, key=lambda hit: hit.score, reverse=True)

# A point may exist in both shards. Keep its first (highest-scoring) hit;
# dicts preserve insertion order, so the descending score order survives.
best_by_id = {}
for hit in candidates:
    best_by_id.setdefault(hit.id, hit)
unique_results = list(best_by_id.values())

results = [
    {"id": hit.id, "score": hit.score, "payload": hit.payload}
    for hit in unique_results[:10]
]
// The same nearest-neighbour request (top 10 by VECTOR_NAME, payload
// included, vectors omitted) is sent to both shards.
let query = QueryRequest {
    prefetches: vec![],
    query: Some(ScoringQuery::Vector(QueryEnum::Nearest(NamedQuery {
        query: vec![0.2f32, 0.1, 0.9, 0.7].into(),
        using: Some(VECTOR_NAME.to_string()),
    }))),
    filter: None,
    score_threshold: None,
    limit: 10,
    offset: 0,
    params: None,
    with_vector: WithVector::Bool(false),
    with_payload: WithPayloadInterface::Bool(true),
};

// Mutable-shard hits come first. `sort_by` is stable, so they win ties on equal scores.
let mut all_results = mutable_shard.query(query.clone())?;
all_results.extend(immutable_shard.query(query)?);

// Best score first. `total_cmp` is a genuine total order on f32 (NaN
// included), which `sort_by` requires. Falling back to `Equal` from
// `partial_cmp` is not transitive once a NaN appears.
all_results.sort_by(|a, b| b.score.total_cmp(&a.score));

// A point can exist in both shards; keep only its first (highest-scoring) hit.
let mut seen_ids = std::collections::HashSet::new();
let results: Vec<_> = all_results
    .into_iter()
    .filter(|p| seen_ids.insert(p.id.clone()))
    .take(10)
    .collect();
支持
如需在项目中实施 Qdrant Edge 并获得专门支持,请联系 Qdrant 销售团队。