数据同步模式

本页面介绍了在 Qdrant 边缘分片(Edge Shards)与 Qdrant 服务器集合(Collections)之间同步数据的模式。有关实现这些模式的实用端到端指南,请参阅 Qdrant 边缘同步指南。

从现有的 Qdrant 集合初始化边缘分片

您可能不想从空的边缘分片开始,而是希望使用服务器端集合中预先存在的数据对其进行初始化。您可以通过恢复服务器端集合中某个分片的快照来实现这一点。

Qdrant 边缘分片可以从服务器端分片的快照进行初始化

在创建用于同步的快照时,请在快照 URL 中指定适用的服务器端分片 ID。这允许单个集合服务于多个独立用户或设备,每个用户或设备拥有各自的边缘分片。有关 Qdrant 分片策略的更多信息,请阅读 分层多租户文档。

首先,构建一个快照 URL:

COLLECTION_NAME = "edge-collection"

# Endpoint that serves a snapshot of shard 0 of the server-side collection.
snapshot_url = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot"
const COLLECTION_NAME: &str = "edge-collection";

// Endpoint that serves a snapshot of shard 0 of the server-side collection.
let snapshot_url = format!("{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot");

请注意,此示例使用的是分片 ID 0。

使用该快照 URL,您可以将快照下载到本地磁盘,并利用其中的数据来初始化一个新的边缘分片。

from pathlib import Path
from qdrant_edge import EdgeShard
import requests
import shutil
import tempfile

SHARD_DIRECTORY = "./qdrant-edge-directory"
data_dir = Path(SHARD_DIRECTORY)

# Download into a scratch directory created next to the shard directory (not
# inside it), because the shard directory itself is wiped and recreated below.
with tempfile.TemporaryDirectory(dir=data_dir.parent) as restore_dir:
    snapshot_path = Path(restore_dir) / "shard.snapshot"
    auth_headers = {"api-key": QDRANT_API_KEY}

    # Stream the snapshot to disk in 8 KiB chunks instead of loading it into memory.
    with requests.get(snapshot_url, headers=auth_headers, stream=True) as response:
        response.raise_for_status()
        with open(snapshot_path, "wb") as snapshot_file:
            snapshot_file.writelines(response.iter_content(chunk_size=8192))

    # Start from an empty shard directory, discarding any previous shard data.
    if data_dir.exists():
        shutil.rmtree(data_dir)
    data_dir.mkdir(parents=True, exist_ok=True)

    # Extract the snapshot's data and configuration into the shard directory.
    EdgeShard.unpack_snapshot(str(snapshot_path), str(data_dir))

# The scratch directory and the downloaded archive are removed once the `with`
# block exits; the restored shard is then opened from SHARD_DIRECTORY.
edge_shard = EdgeShard.load(SHARD_DIRECTORY)
const SHARD_DIRECTORY: &str = "./qdrant-edge-directory";
let data_dir = Path::new(SHARD_DIRECTORY);

// Download into a scratch directory created next to the shard directory (not
// inside it), because the shard directory itself is wiped and recreated below.
let restore_dir = tempfile::Builder::new()
    .tempdir_in(data_dir.parent().unwrap_or(Path::new(".")))?;
let snapshot_path = restore_dir.path().join("shard.snapshot");

// Stream the response body straight into the file instead of collecting it in
// a `Vec` first: shard snapshots can be large, and holding the whole archive in
// memory is unnecessary.
{
    let mut body = ureq::get(&snapshot_url)
        .header("api-key", QDRANT_API_KEY)
        .call()?
        .into_body()
        .into_reader();
    let mut snapshot_file = fs_err::File::create(&snapshot_path)?;
    std::io::copy(&mut body, &mut snapshot_file)?;
} // `snapshot_file` is closed here, before the archive is unpacked.

// Start from an empty shard directory, discarding any previous shard data.
if data_dir.exists() {
    fs_err::remove_dir_all(data_dir)?;
}
fs_err::create_dir_all(data_dir)?;

// Extract the snapshot's data and configuration into the shard directory,
// then open the restored shard from it.
EdgeShard::unpack_snapshot(&snapshot_path, data_dir)?;

let edge_shard = EdgeShard::load(data_dir, None)?;

此代码首先将快照下载到临时目录。接下来,EdgeShard.unpack_snapshot 将下载的快照解压到数据目录中,随后 EdgeShard.load 使用解压后的数据和配置加载该边缘分片。

edge_shard 将使用与创建快照的源集合相同的配置和文件结构,包括向量索引和载荷(payload)索引。

使用服务器端变更更新 Qdrant Edge

为了使边缘分片与服务器集合中的新数据保持同步,您可以定期下载并应用快照。但每次都恢复完整快照会带来不必要的开销,因此您可以改用增量快照(partial snapshot),仅恢复自上次同步以来的变更。具体做法是:将边缘分片的清单(manifest,描述其所有段(segments)及元数据)发送给服务器,服务器据此生成仅包含已变更段的增量快照。

# Manifest describing the segments and metadata the edge shard currently has;
# the server uses it to decide which segments must be sent back.
manifest = edge_shard.snapshot_manifest()

url = f"{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot/partial/create"

with tempfile.TemporaryDirectory(dir=data_dir) as temp_dir:
    partial_snapshot_path = Path(temp_dir) / "partial.snapshot"

    # Use the streamed response as a context manager (as the full-snapshot
    # download does) so the connection is released even if raise_for_status()
    # raises before the body has been read.
    with requests.post(
        url, headers={"api-key": QDRANT_API_KEY}, json=manifest, stream=True
    ) as response:
        response.raise_for_status()
        with open(partial_snapshot_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

    # Apply only the changed segments to the existing edge shard.
    edge_shard.update_from_snapshot(str(partial_snapshot_path))
// Manifest describing the segments and metadata the edge shard currently has;
// the server uses it to decide which segments must be sent back.
let current_manifest = edge_shard.snapshot_manifest()?;

let update_url = format!(
    "{QDRANT_URL}/collections/{COLLECTION_NAME}/shards/0/snapshot/partial/create"
);

let temp_dir = tempfile::tempdir_in(data_dir)?;
let partial_snapshot_path = temp_dir.path().join("partial.snapshot");

// Post the manifest and stream the returned partial snapshot straight to disk
// instead of buffering the whole archive in a `Vec` first.
{
    let mut body = ureq::post(&update_url)
        .header("api-key", QDRANT_API_KEY)
        .send_json(&current_manifest)?
        .into_body()
        .into_reader();
    let mut partial_snapshot_file = fs_err::File::create(&partial_snapshot_path)?;
    std::io::copy(&mut body, &mut partial_snapshot_file)?;
} // The file is closed here, before it is unpacked.

// Unpack the partial snapshot and read the manifest it ships with.
let unpacked_dir = tempfile::tempdir_in(data_dir)?;
EdgeShard::unpack_snapshot(&partial_snapshot_path, unpacked_dir.path())?;
let snapshot_manifest = SnapshotManifest::load_from_snapshot(unpacked_dir.path(), None)?;

// Merge the changed segments into the shard in `data_dir`, using both
// manifests to work out what to replace.
// NOTE(review): the previously loaded `edge_shard` is only shadowed here, not
// dropped, so it is still alive while recovery runs — confirm whether it must be
// dropped first.
let edge_shard = EdgeShard::recover_partial_snapshot(
    data_dir,
    &current_manifest,
    unpacked_dir.path(),
    &snapshot_manifest,
)?;

从边缘分片更新服务器集合

要将数据从边缘分片同步到服务器集合,请在您的应用程序中实现双写(dual-write)机制。当您在边缘分片中添加或更新点时,请同时使用 Qdrant 客户端将其存储到服务器集合中。

除了直接写入服务器集合外,您可能还需要设置一个后台作业或消息队列,以异步处理同步操作。由于应用程序的网络连接可能并不总是稳定,因此将更新排队可以确保在网络恢复连接时数据最终能够完成同步。

首先,进行初始化

  • 从零开始或从服务器端快照初始化边缘分片。
  • 建立 Qdrant 服务器连接。
详细信息

初始化边缘分片

from pathlib import Path
from qdrant_edge import (
    Distance,
    EdgeConfig,
    EdgeShard,
    EdgeVectorParams,
)

SHARD_DIRECTORY = "./qdrant-edge-directory"
VECTOR_NAME = "my-vector"
VECTOR_DIMENSION = 4

# Make sure the shard directory exists before creating the shard in it.
Path(SHARD_DIRECTORY).mkdir(parents=True, exist_ok=True)

# A single named dense vector compared with cosine distance.
config = EdgeConfig(
    vectors={
        VECTOR_NAME: EdgeVectorParams(
            size=VECTOR_DIMENSION,
            distance=Distance.Cosine,
        )
    }
)

# Create a new, empty edge shard in SHARD_DIRECTORY with this configuration.
edge_shard = EdgeShard.create(SHARD_DIRECTORY, config)
const VECTOR_DIMENSION: usize = 4;
const VECTOR_NAME: &str = "my-vector";

// Make sure the shard directory exists before creating the shard in it.
fs_err::create_dir_all(SHARD_DIRECTORY)?;

// Settings for the single named dense vector: cosine distance, stored on disk,
// with every optional tuning knob left unset.
let vector_params = EdgeVectorParams {
    size: VECTOR_DIMENSION,
    distance: qdrant_edge::Distance::Cosine,
    on_disk: Some(true),
    quantization_config: None,
    multivector_config: None,
    datatype: None,
    hnsw_config: None,
};

// Shard-wide configuration: payloads on disk, no sparse vectors, default
// HNSW and optimizer settings.
let config = EdgeConfig {
    on_disk_payload: true,
    vectors: HashMap::from([(VECTOR_NAME.to_string(), vector_params)]),
    sparse_vectors: HashMap::new(),
    hnsw_config: Default::default(),
    quantization_config: None,
    optimizers: Default::default(),
};

let edge_shard = EdgeShard::new(Path::new(SHARD_DIRECTORY), config)?;

初始化到服务器的 Qdrant 客户端连接,并在目标集合不存在时创建它:

from qdrant_client import QdrantClient, models

server_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)

COLLECTION_NAME = "edge-collection"

# Create the target collection only if it does not exist yet. It declares the
# same named vector (VECTOR_NAME, VECTOR_DIMENSION, cosine) as the edge shard.
if not server_client.collection_exists(collection_name=COLLECTION_NAME):
    vector_params = models.VectorParams(
        size=VECTOR_DIMENSION,
        distance=models.Distance.COSINE,
    )
    server_client.create_collection(
        collection_name=COLLECTION_NAME,
        vectors_config={VECTOR_NAME: vector_params},
    )
let server_client = Qdrant::from_url(QDRANT_URL)
    .api_key(QDRANT_API_KEY)
    .build()?;

// Create the target collection only if it does not exist yet.
if !server_client.collection_exists(COLLECTION_NAME).await? {
    // Declare the vector under VECTOR_NAME (as the edge shard and the Python
    // example do) instead of as the unnamed default vector: points are later
    // upserted with a vector keyed by VECTOR_NAME, and the server rejects vector
    // names that the collection does not declare.
    let mut vectors_config = qdrant_client::qdrant::VectorsConfigBuilder::default();
    vectors_config.add_named_vector_params(
        VECTOR_NAME,
        VectorParamsBuilder::new(VECTOR_DIMENSION as u64, Distance::Cosine).build(),
    );

    server_client
        .create_collection(
            CreateCollectionBuilder::new(COLLECTION_NAME).vectors_config(vectors_config),
        )
        .await?;
}

接下来,实例化用于保存需要与服务器同步的点的队列:

from queue import Empty, Queue

# Pending points waiting to be pushed to the server collection. The queue lives
# only in memory, so its contents are lost when the process exits; for
# production use cases consider persisting the pending changes.
upload_queue: Queue[models.PointStruct] = Queue()
// Pending points waiting to be pushed to the server collection. The queue lives
// only in memory, so its contents are lost when the process exits; for
// production use cases consider persisting the pending changes.
let mut upload_queue = std::collections::VecDeque::<PointStruct>::new();

在边缘分片中添加或更新点时,同时将该点加入同步队列。

from qdrant_edge import ( Point, UpdateOperation )
from qdrant_client import models

# `point_id` rather than `id`, so the snippet does not shadow the builtin `id()`.
point_id = 1
vector = [0.1, 0.2, 0.3, 0.4]
payload = {"color": "red"}

# Write the point to the local edge shard.
point = Point(
    id=point_id,
    vector={VECTOR_NAME: vector},
    payload=payload,
)
edge_shard.update(UpdateOperation.upsert_points([point]))

# Queue the same point (same ID, named vector and payload) in the client's
# REST model so a background worker can upload it to the server later.
rest_point = models.PointStruct(id=point_id, vector={VECTOR_NAME: vector}, payload=payload)
upload_queue.put(rest_point)
// Dual write: store the point in the local edge shard and queue an identical
// copy for upload to the server collection.
let id = 1u64;
let vector = vec![0.1f32, 0.2, 0.3, 0.4];
let payload = json!({"color": "red"});

// Edge-shard representation of the point. `vector` and `payload` are cloned
// because the same values are reused for the server-side copy below.
let edge_points: Vec<PointStructPersisted> = vec![
    EdgePoint::new(
        PointId::NumId(id),
        Vectors::new_named([(VECTOR_NAME, vector.clone())]),
        payload.clone(),
    )
    .into(),
];
// Upsert the point into the local edge shard.
edge_shard.update(UpdateOperation::PointOperation(
    PointOperations::UpsertPoints(
        PointInsertOperations::PointsList(edge_points),
    ),
))?;

// Server-side representation of the same point, with the vector keyed by
// VECTOR_NAME. If `payload` were not a JSON object, an empty payload would be
// used instead.
let server_point = PointStruct::new(
    id,
    HashMap::from([(VECTOR_NAME.to_string(), vector)]),
    payload.as_object().cloned().unwrap_or_default(),
);
// Enqueue it for the background upload worker.
upload_queue.push_back(server_point);

后台工作进程可以处理上传队列并将点同步到服务器集合。此示例一次最多批量上传 10 个点。

BATCH_SIZE = 10

# Take up to BATCH_SIZE queued points without blocking; stop early as soon as
# the queue is empty.
points_to_upload: list[models.PointStruct] = []
for _ in range(BATCH_SIZE):
    try:
        points_to_upload.append(upload_queue.get_nowait())
    except Empty:
        break

# NOTE: the points have already been removed from the queue, so if this upsert
# raises they are not re-queued by this snippet — add retry handling as needed.
if points_to_upload:
    server_client.upsert(collection_name=COLLECTION_NAME, points=points_to_upload)
const BATCH_SIZE: usize = 10;

// Copy (rather than drain) up to BATCH_SIZE points from the front of the queue.
// If the upsert below fails, `?` returns early and the points stay queued for
// the next attempt instead of being dropped.
let points_to_upload: Vec<PointStruct> =
    upload_queue.iter().take(BATCH_SIZE).cloned().collect();

if !points_to_upload.is_empty() {
    let batch_len = points_to_upload.len();
    server_client
        .upsert_points(UpsertPointsBuilder::new(
            COLLECTION_NAME,
            points_to_upload,
        ))
        .await?;
    // The server accepted the batch: only now remove it from the queue.
    upload_queue.drain(..batch_len);
}

请确保妥善处理错误和重试逻辑,以应对网络问题或服务器不可用的情况。

此页面有用吗?

感谢您的反馈!🙏

很遗憾听到这些。😔 您可以在 GitHub 上编辑此页面,或者创建一个 GitHub Issue。