Time-based sharding in Qdrant

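When working with massive, fast-changing datasets such as social media feeds or image and video streams, efficient storage and retrieval are critical. Often only recent data is relevant, and older data can be archived or deleted. For example, in sentiment analysis of social media posts, you might need only the last 7 days of data to capture current trends, with most queries focused on the last 24 hours.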

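Storing all the data in a Qdrant collection with default sharding means that deleting old points can trigger costly reindexing across the whole dataset, which hurts performance. A better approach is time-based sharding: routing each point to a specific shard (or shards) based on its timestamp. For use cases with a natural time-to-live (TTL) cutoff, sharding by a timestamp-derived key makes queries over recent data efficient and lets you drop old data cleanly.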

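For example, with daily sharding, today's data is stored in today's shard, yesterday's data in yesterday's shard, and so on. A query can target a specific shard (such as today's) or several shards to cover a date range.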

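Time-based sharding routes data to shards by timestamp. Typically, all writes go to the newest shard, while queries can target one or more shards. Old shards can be pruned in the background without affecting performance.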

Depending on your data volume and retention requirements, you can shard by hour, week, month, or whatever interval suits your use case.
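The sketch below makes the choice of interval concrete. It is a hypothetical helper, not part of qdrant-client, that maps a timestamp to a shard key at hourly, daily, weekly, or monthly granularity. The rest of this tutorial uses only the daily YYYY-MM-DD format. The other key formats are assumptions, chosen so that keys sort chronologically as plain strings.

```python
from datetime import datetime


def shard_key_for(ts: datetime, granularity: str = "day") -> str:
    """Map a timestamp to a shard key (hypothetical helper, not part of qdrant-client)."""
    if granularity == "hour":
        return ts.strftime("%Y-%m-%dT%H")  # e.g. 2026-04-07T09
    if granularity == "day":
        return ts.strftime("%Y-%m-%d")  # e.g. 2026-04-07, the format used in this tutorial
    if granularity == "week":
        # Use the ISO year together with the ISO week, so that early-January days that
        # belong to the previous year's last week land in the same shard as that week.
        iso_year, iso_week, _ = ts.isocalendar()
        return f"{iso_year}-W{iso_week:02d}"  # e.g. 2026-W15
    if granularity == "month":
        return ts.strftime("%Y-%m")  # e.g. 2026-04
    raise ValueError(f"unsupported granularity: {granularity!r}")


ts = datetime(2026, 4, 7, 9, 4, 28)
for g in ("hour", "day", "week", "month"):
    print(g, shard_key_for(ts, g))
```

Coarser intervals mean fewer shards to manage, but also coarser pruning. With monthly keys, old data can only be dropped a whole month at a time.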

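This tutorial shows you how to implement time-based sharding, covering:

  • Creating a Qdrant collection with user-defined sharding
  • Bulk-ingesting historical data into the correct shards based on timestamps
  • Assigning new data to the latest shard
  • Querying one or more shards
  • Pruning older shards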

Install and initialize the Qdrant client

First, install the Qdrant client:

!pip install qdrant-client

Next, initialize the client:

from qdrant_client import QdrantClient, models

client = QdrantClient(
    url=QDRANT_URL,
    api_key=QDRANT_API_KEY,
    cloud_inference=True
)
const client = new QdrantClient({
    url: QDRANT_URL,
    apiKey: QDRANT_API_KEY,
});
let client = Qdrant::from_url(QDRANT_URL)
    .api_key(QDRANT_API_KEY)
    .build()?;
QdrantClient client =
    new QdrantClient(
        QdrantGrpcClient.newBuilder(QDRANT_URL, 6334, true)
            .withApiKey(QDRANT_API_KEY)
            .build());
var client = new QdrantClient(
	host: QDRANT_URL,
	https: true,
	apiKey: QDRANT_API_KEY
);
client, err := qdrant.NewClient(&qdrant.Config{
	Host:   QDRANT_URL,
	APIKey: QDRANT_API_KEY,
	UseTLS: true,
})

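This tutorial assumes that you are using Qdrant Cloud Inference to generate vector embeddings. If you manage your own embedding infrastructure, the same principles apply, but you will need to adapt the code examples to use your embedding service.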

Create a collection

Create a collection with user-defined sharding by setting the sharding method to custom:

from qdrant_client import models

collection_name = "my_collection"

if client.collection_exists(collection_name=collection_name):
    client.delete_collection(collection_name=collection_name)

client.create_collection(
    collection_name=collection_name,
    vectors_config={
        "dense_vector": models.VectorParams(
            size=384, distance=models.Distance.COSINE
        )
    },
    sharding_method=models.ShardingMethod.CUSTOM
)
const collectionName = "my_collection";

if (await client.collectionExists(collectionName)) {
    await client.deleteCollection(collectionName);
}

await client.createCollection(collectionName, {
    vectors: {
        dense_vector: {
            size: 384,
            distance: "Cosine",
        },
    },
    sharding_method: "custom",
});
let collection_name = "my_collection";

if client.collection_exists(collection_name).await? {
    client.delete_collection(collection_name).await?;
}

let mut vectors_config = VectorsConfigBuilder::default();
vectors_config.add_named_vector_params(
    "dense_vector",
    VectorParamsBuilder::new(384, Distance::Cosine),
);

client
    .create_collection(
        CreateCollectionBuilder::new(collection_name)
            .vectors_config(vectors_config)
            .sharding_method(ShardingMethod::Custom.into()),
    )
    .await?;
String collectionName = "my_collection";

if (client.collectionExistsAsync(collectionName).get()) {
    client.deleteCollectionAsync(collectionName).get();
}

client.createCollectionAsync(
    CreateCollection.newBuilder()
        .setCollectionName(collectionName)
        .setVectorsConfig(VectorsConfig.newBuilder().setParamsMap(
            VectorParamsMap.newBuilder().putAllMap(Map.of(
                "dense_vector",
                VectorParams.newBuilder()
                    .setSize(384)
                    .setDistance(Distance.Cosine)
                    .build()))))
        .setShardingMethod(ShardingMethod.Custom)
        .build()
).get();
string collectionName = "my_collection";

if (await client.CollectionExistsAsync(collectionName))
	await client.DeleteCollectionAsync(collectionName);

await client.CreateCollectionAsync(
	collectionName: collectionName,
	vectorsConfig: new VectorParamsMap
	{
		Map = {
			["dense_vector"] = new VectorParams { Size = 384, Distance = Distance.Cosine }
		}
	},
	shardingMethod: ShardingMethod.Custom
);
collectionName := "my_collection"

exists, err := client.CollectionExists(context.Background(), collectionName)
if exists {
	client.DeleteCollection(context.Background(), collectionName)
}

client.CreateCollection(context.Background(), &qdrant.CreateCollection{
	CollectionName: collectionName,
	VectorsConfig: qdrant.NewVectorsConfigMap(
		map[string]*qdrant.VectorParams{
			"dense_vector": {
				Size:     384,
				Distance: qdrant.Distance_Cosine,
			},
		},
	),
	ShardingMethod: qdrant.ShardingMethod_Custom.Enum(),
})

Custom shards are addressed through shard keys. In this tutorial, the shard key is the date in YYYY-MM-DD format, extracted from each point's timestamp.

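The collection gets a separate shard for each shard key, that is, one shard per day. For very large datasets, you can increase write throughput by configuring the collection's shard_number, which defaults to 1. A higher value creates multiple shards under each shard key, spreading the write load across multiple nodes in the cluster. However, avoid creating too many shards. Each shard consumes resources and adds overhead, which can degrade performance. Test to find the best number of shards for your dataset and cluster configuration.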

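There are two things to note about collections with user-defined sharding, compared with regular collections that use automatic sharding:

  • In a regular collection with automatic sharding, shard_number sets the total number of shards in the collection. With user-defined sharding, shard_number instead sets the number of shards per shard key. The collection's total shard count therefore equals the number of shard keys (days) multiplied by shard_number.
  • Collection-level configuration changes that you can apply to a regular collection (such as HNSW parameters) can also be applied to a collection with user-defined sharding. These changes are applied retroactively to existing shards, as well as to shards created later.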
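The shard-count rule in the first point is simple arithmetic, but it is worth checking before you pick a shard_number. The sketch below uses hypothetical helper names and does not count replicas. The pruning step later in this tutorial creates the new day's shard key before deleting the oldest one, so the collection briefly holds one extra shard key.

```python
def total_shards(num_shard_keys: int, shard_number: int = 1) -> int:
    """Total shards in a collection with user-defined sharding (replicas not counted).

    Each shard key gets its own `shard_number` shards, so the total is a product.
    """
    if num_shard_keys < 0 or shard_number < 1:
        raise ValueError("need num_shard_keys >= 0 and shard_number >= 1")
    return num_shard_keys * shard_number


def peak_shards_during_rotation(retention_days: int, shard_number: int = 1) -> int:
    """Shard count mid-rotation: the new day's key exists before the oldest one is deleted."""
    return total_shards(retention_days + 1, shard_number)


# Seven daily shard keys with shard_number=3
print(total_shards(7, 3), peak_shards_during_rotation(7, 3))  # prints: 21 24
```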

Ingest historical data

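Time-series data usually arrives as a stream, with new points added continuously. Each point may include a timestamp that records when it was created. You can use this timestamp to decide which shard the point belongs to. If your data has no timestamps, you can use the current time instead.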
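The ingestion code in this tutorial takes the first ten characters of each datetime string. The sketch below is a slightly stricter alternative. It parses the timestamp so that malformed values fail loudly, and it converts offset-aware timestamps to UTC before taking the date. When a row has no timestamp, it falls back to the current UTC time. The function name and the choice of UTC days are assumptions for illustration.

```python
from datetime import datetime, timezone
from typing import Mapping, Optional


def shard_key_from_row(row: Mapping[str, str], now: Optional[datetime] = None) -> str:
    """Return a YYYY-MM-DD shard key for a row (hypothetical helper).

    Assumptions: shard days are UTC days, and timestamps without an offset
    are already in UTC.
    """
    raw = row.get("datetime")
    if raw:
        # Parsing (instead of slicing) rejects malformed values with ValueError.
        ts = datetime.fromisoformat(raw)
    else:
        # No timestamp in the row: fall back to the current time.
        ts = now if now is not None else datetime.now(timezone.utc)
    if ts.tzinfo is not None:
        # Convert offset-aware timestamps so that the date is a UTC date.
        ts = ts.astimezone(timezone.utc)
    return ts.date().isoformat()


print(shard_key_from_row({"datetime": "2026-04-06T09:04:28"}))
```

Whichever day boundary you choose, apply the same rule at ingestion time and at query time, so that "today" always names the same shard. Note that before Python 3.11, datetime.fromisoformat does not accept a trailing "Z".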

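This tutorial uses a sample dataset of timestamped social media posts. Assume that today is April 7, 2026. You will start by ingesting some historical data from the past week (April 1 to 7). Here are a few rows from the dataset: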

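datetime              text
2026-04-06T09:04:28   April sunshine streaming into the office makes everything feel great.
2026-04-06T09:04:32   Morning stretch, good coffee, clear intentions. Monday: handled.
2026-04-06T09:05:52   Grateful for a productive start to the week.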

Upload the dataset, storing each day's data in its own shard:

from qdrant_client.http.models import PointStruct, Document
import uuid

csv_url = 'https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv'

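# Retrieve the existing shard keys in the collection as plain strings
# (each entry returned by list_shard_keys is a description object with a `key` field)
existing_shard_keys = [
    str(d.key)
    for d in (client.list_shard_keys(collection_name=collection_name).shard_keys or [])
]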

dense_model = "sentence-transformers/all-MiniLM-L6-v2"
batch_size = 100
current_date = None
buffer: list[PointStruct] = []

for row in parse_csv(csv_url):
    shard_date = row['datetime'][:10]  # Extract YYYY-MM-DD

    if shard_date != current_date:
        # Flush buffer for the previous date before switching
        if buffer:
            client.upload_points(
                collection_name=collection_name,
                points=buffer,
                shard_key_selector=current_date,
            )
            buffer = []

        # Create shard for the new date if it doesn't exist yet
        if shard_date not in existing_shard_keys:
            client.create_shard_key(collection_name, shard_date)
            existing_shard_keys.append(shard_date)

        current_date = shard_date

    # Add point to buffer
    buffer.append(PointStruct(
        id=uuid.uuid4().hex,
        payload={"text": row['text'], "datetime": row['datetime']},
        vector={"dense_vector": Document(text=row["text"], model=dense_model)}
    ))

    # Flush batch if buffer size exceeds batch size
    if len(buffer) >= batch_size:
        client.upload_points(
            collection_name=collection_name,
            points=buffer,
            shard_key_selector=current_date,
        )
        buffer = []

# Flush remaining partial batch
if buffer:
    client.upload_points(
        collection_name=collection_name,
        points=buffer,
        shard_key_selector=current_date,
    )
const csvUrl = "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv";

// Retrieve a list of existing shard keys in the collection
const shardKeysResult = await client.listShardKeys(collectionName);
const existingShardKeys = new Set((shardKeysResult.shard_keys ?? []).map((d) => String(d.key)));

const denseModel = "sentence-transformers/all-MiniLM-L6-v2";
const batchSize = 100;
let currentDate = "";
let buffer: Extract<Parameters<typeof client.upsert>[1], { points: unknown }>['points'] = [];

for await (const { text: postText, datetime } of parseCSV(csvUrl)) {
    const shardDate = datetime.slice(0, 10); // Extract YYYY-MM-DD

    if (shardDate !== currentDate) {
        // Flush buffer for the previous date before switching
        if (buffer.length > 0) {
            await client.upsert(collectionName, { points: buffer, shard_key: currentDate });
            buffer = [];
        }

        // Create shard for the new date if it doesn't exist yet
        if (!existingShardKeys.has(shardDate)) {
            await client.createShardKey(collectionName, { shard_key: shardDate });
            existingShardKeys.add(shardDate);
        }

        currentDate = shardDate;
    }

    // Add point to buffer
    buffer.push({
        id: crypto.randomUUID(),
        vector: { dense_vector: { text: postText, model: denseModel } },
        payload: { text: postText, datetime },
    });

    // Flush batch if buffer size exceeds batch size
    if (buffer.length >= batchSize) {
        await client.upsert(collectionName, { points: buffer, shard_key: currentDate });
        buffer = [];
    }
}

// Flush remaining partial batch
if (buffer.length > 0) {
    await client.upsert(collectionName, { points: buffer, shard_key: currentDate });
}
let csv_url = "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv";

// Retrieve a list of existing shard keys in the collection
let response = client.list_shard_keys(collection_name).await?;
let mut existing_shard_keys: HashSet<String> = response
    .shard_keys
    .into_iter()
    .filter_map(|d| {
        d.key?.key.and_then(|k| match k {
            shard_key::Key::Keyword(s) => Some(s),
            _ => None,
        })
    })
    .collect();

let dense_model = "sentence-transformers/all-MiniLM-L6-v2";
let batch_size = 100;
let mut current_date = String::new();
let mut buffer: Vec<PointStruct> = Vec::new();

for row in parse_csv(csv_url)? {
    let row = row?;
    let text = row.text;
    let datetime = row.datetime;
    let shard_date = datetime[..10].to_string(); // Extract YYYY-MM-DD

    if shard_date != current_date {
        // Flush buffer for the previous date before switching
        if !buffer.is_empty() {
            client
                .upsert_points(
                    UpsertPointsBuilder::new(
                        collection_name,
                        std::mem::take(&mut buffer),
                    )
                    .shard_key_selector(current_date.clone()),
                )
                .await?;
        }

        // Create shard for the new date if it doesn't exist yet
        if !existing_shard_keys.contains(&shard_date) {
            client
                .create_shard_key(
                    CreateShardKeyRequestBuilder::new(collection_name).request(
                        CreateShardKeyBuilder::default().shard_key(shard_date.clone()),
                    ),
                )
                .await?;
            existing_shard_keys.insert(shard_date.clone());
        }

        current_date = shard_date;
    }

    // Add point to buffer
    buffer.push(PointStruct::new(
        uuid::Uuid::new_v4().to_string(),
        HashMap::from([(
            "dense_vector".to_string(),
            DocumentBuilder::new(&text, dense_model).build(),
        )]),
        [("text", text.into()), ("datetime", datetime.into())],
    ));

    // Flush batch if buffer size exceeds batch size
    if buffer.len() >= batch_size {
        client
            .upsert_points(
                UpsertPointsBuilder::new(collection_name, std::mem::take(&mut buffer))
                    .shard_key_selector(current_date.clone()),
            )
            .await?;
    }
}

// Flush remaining partial batch
if !buffer.is_empty() {
    client
        .upsert_points(
            UpsertPointsBuilder::new(collection_name, buffer)
                .shard_key_selector(current_date.clone()),
        )
        .await?;
}
String csvUrl = "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv";

// Retrieve a list of existing shard keys in the collection
var shardKeyDescriptions = client.listShardKeysAsync(collectionName).get();
Set<String> existingShardKeys = new HashSet<>();
for (var desc : shardKeyDescriptions) {
    existingShardKeys.add(desc.getKey().getKeyword());
}

String denseModel = "sentence-transformers/all-MiniLM-L6-v2";
int batchSize = 100;
String currentDate = null;
List<PointStruct> buffer = new ArrayList<>();

try (var stream = parseCSV(csvUrl)) {
    for (var row : (Iterable<CsvRow>) stream::iterator) {
        String text = row.text;
        String datetime = row.datetime;
        String shardDate = datetime.substring(0, 10); // Extract YYYY-MM-DD

        if (!shardDate.equals(currentDate)) {
            // Flush buffer for the previous date before switching
            if (!buffer.isEmpty()) {
                client.upsertAsync(
                    UpsertPoints.newBuilder()
                        .setCollectionName(collectionName)
                        .addAllPoints(buffer)
                        .setShardKeySelector(shardKeySelector(currentDate))
                        .build()
                ).get();
                buffer.clear();
            }

            // Create shard for the new date if it doesn't exist yet
            if (!existingShardKeys.contains(shardDate)) {
                client.createShardKeyAsync(
                    CreateShardKeyRequest.newBuilder()
                        .setCollectionName(collectionName)
                        .setRequest(CreateShardKey.newBuilder()
                            .setShardKey(shardKey(shardDate))
                            .build())
                        .build()
                ).get();
                existingShardKeys.add(shardDate);
            }

            currentDate = shardDate;
        }

        // Add point to buffer
        buffer.add(
            PointStruct.newBuilder()
                .setId(id(UUID.randomUUID()))
                .setVectors(namedVectors(Map.of(
                    "dense_vector",
                    vector(Document.newBuilder()
                        .setText(text)
                        .setModel(denseModel)
                        .build()))))
                .putAllPayload(Map.of("text", value(text), "datetime", value(datetime)))
                .build());

        // Flush batch if buffer size exceeds batch size
        if (buffer.size() >= batchSize) {
            client.upsertAsync(
                UpsertPoints.newBuilder()
                    .setCollectionName(collectionName)
                    .addAllPoints(buffer)
                    .setShardKeySelector(shardKeySelector(currentDate))
                    .build()
            ).get();
            buffer.clear();
        }
    }
}

// Flush remaining partial batch
if (!buffer.isEmpty()) {
    client.upsertAsync(
        UpsertPoints.newBuilder()
            .setCollectionName(collectionName)
            .addAllPoints(buffer)
            .setShardKeySelector(shardKeySelector(currentDate))
            .build()
    ).get();
}
string csvUrl = "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv";

// Retrieve a list of existing shard keys in the collection
var existingShardKeys = (await client.ListShardKeysAsync(collectionName))
	.Select(sk => sk.Key.Keyword)
	.ToHashSet();

string denseModel = "sentence-transformers/all-MiniLM-L6-v2";
int batchSize = 100;
string? currentDate = null;
var buffer = new List<PointStruct>();

await foreach (var (text, datetime) in ParseCsv(csvUrl))
{
	string shardDate = datetime[..10]; // Extract YYYY-MM-DD

	if (shardDate != currentDate)
	{
		// Flush buffer for the previous date before switching
		if (buffer.Count > 0)
		{
			await client.UpsertAsync(
				collectionName: collectionName,
				points: buffer,
				shardKeySelector: new ShardKeySelector
				{
					ShardKeys = { new List<ShardKey> { currentDate! } }
				}
			);
			buffer.Clear();
		}

		// Create shard for the new date if it doesn't exist yet
		if (!existingShardKeys.Contains(shardDate))
		{
			await client.CreateShardKeyAsync(
				collectionName,
				new CreateShardKey { ShardKey = new ShardKey { Keyword = shardDate } }
			);
			existingShardKeys.Add(shardDate);
		}

		currentDate = shardDate;
	}

	// Add point to buffer
	buffer.Add(new PointStruct
	{
		Id = Guid.NewGuid(),
		Vectors = new Dictionary<string, Vector>
		{
			["dense_vector"] = new Document { Text = text, Model = denseModel }
		},
		Payload = { ["text"] = text, ["datetime"] = datetime }
	});

	// Flush batch if buffer size exceeds batch size
	if (buffer.Count >= batchSize)
	{
		await client.UpsertAsync(
			collectionName: collectionName,
			points: buffer,
			shardKeySelector: new ShardKeySelector
			{
				ShardKeys = { new List<ShardKey> { currentDate! } }
			}
		);
		buffer.Clear();
	}
}

// Flush remaining partial batch
if (buffer.Count > 0)
{
	await client.UpsertAsync(
		collectionName: collectionName,
		points: buffer,
		shardKeySelector: new ShardKeySelector
		{
			ShardKeys = { new List<ShardKey> { currentDate! } }
		}
	);
}
csvUrl := "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv"

// Retrieve a list of existing shard keys in the collection
shardKeyDescriptions, err := client.ListShardKeys(context.Background(), collectionName)
existingShardKeys := make(map[string]bool)
for _, desc := range shardKeyDescriptions {
	existingShardKeys[desc.Key.GetKeyword()] = true
}

denseModel := "sentence-transformers/all-MiniLM-L6-v2"
batchSize := 100
var currentDate string
var buffer []*qdrant.PointStruct

err = parseCSV(csvUrl, func(row CSVRow) {
	text := row.Text
	datetime := row.Datetime
	shardDate := datetime[:10] // Extract YYYY-MM-DD

	if shardDate != currentDate {
		// Flush buffer for the previous date before switching
		if len(buffer) > 0 {
			client.Upsert(context.Background(), &qdrant.UpsertPoints{
				CollectionName: collectionName,
				Points:         buffer,
				ShardKeySelector: &qdrant.ShardKeySelector{
					ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey(currentDate)},
				},
			})
			buffer = nil
		}

		// Create shard for the new date if it doesn't exist yet
		if !existingShardKeys[shardDate] {
			client.CreateShardKey(context.Background(), collectionName, &qdrant.CreateShardKey{
				ShardKey: qdrant.NewShardKey(shardDate),
			})
			existingShardKeys[shardDate] = true
		}

		currentDate = shardDate
	}

	// Add point to buffer
	buffer = append(buffer, &qdrant.PointStruct{
		Id: qdrant.NewID(uuid.New().String()),
		Vectors: qdrant.NewVectorsMap(map[string]*qdrant.Vector{
			"dense_vector": qdrant.NewVectorDocument(&qdrant.Document{
				Text:  text,
				Model: denseModel,
			}),
		}),
		Payload: qdrant.NewValueMap(map[string]any{
			"text":     text,
			"datetime": datetime,
		}),
	})

	// Flush batch if buffer size exceeds batch size
	if len(buffer) >= batchSize {
		client.Upsert(context.Background(), &qdrant.UpsertPoints{
			CollectionName: collectionName,
			Points:         buffer,
			ShardKeySelector: &qdrant.ShardKeySelector{
				ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey(currentDate)},
			},
		})
		buffer = nil
	}
})

// Flush remaining partial batch
if len(buffer) > 0 {
	client.Upsert(context.Background(), &qdrant.UpsertPoints{
		CollectionName: collectionName,
		Points:         buffer,
		ShardKeySelector: &qdrant.ShardKeySelector{
			ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey(currentDate)},
		},
	})
}

Let's break down the code:

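  • First, retrieve the list of existing shard keys in the collection. Because you just created the collection, the list should be empty. In production, however, this step ensures that you account for any existing data.
  • Next, use a helper function to stream the CSV file from the URL and parse it: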
      import csv
      import urllib.request
      
      def parse_csv(url):
          with urllib.request.urlopen(url) as response:
              reader = csv.DictReader(line.decode('utf-8') for line in response)
              yield from reader
      
      function parseCsvLine(line: string): string[] {
          const fields: string[] = [];
          let i = 0;
          while (i < line.length) {
              if (line[i] === '"') {
                  i++;
                  let field = "";
                  while (i < line.length) {
                      if (line[i] === '"' && line[i + 1] === '"') { field += '"'; i += 2; }
                      else if (line[i] === '"') { i++; break; }
                      else { field += line[i++]; }
                  }
                  fields.push(field);
                  if (line[i] === ",") i++;
              } else {
                  const start = i;
                  while (i < line.length && line[i] !== ",") i++;
                  fields.push(line.slice(start, i));
                  if (i < line.length) i++;
              }
          }
          return fields;
      }
      
      async function* parseCSV(url: string): AsyncGenerator<{ text: string; datetime: string }> {
          const response = await fetch(url);
          const reader = response.body!.getReader();
          const decoder = new TextDecoder();
          let remainder = "";
          let headers: string[] | null = null;
          let textIdx = -1;
          let datetimeIdx = -1;
      
          while (true) {
              const { done, value } = await reader.read();
              const chunk = done ? "" : decoder.decode(value, { stream: true });
              const lines = (remainder + chunk).split("\n");
              remainder = done ? "" : lines.pop()!;
      
              for (const line of lines) {
                  if (!line.trim()) continue;
                  if (headers === null) {
                      headers = line.split(",");
                      textIdx = headers.indexOf("text");
                      datetimeIdx = headers.indexOf("datetime");
                      continue;
                  }
                  const fields = parseCsvLine(line);
                  yield { text: fields[textIdx], datetime: fields[datetimeIdx] };
              }
      
              if (done) break;
          }
      }
      
      struct CsvRow {
          text: String,
          datetime: String,
      }
      
      fn parse_csv(url: &str) -> anyhow::Result<impl Iterator<Item = anyhow::Result<CsvRow>>> {
          let reader = ureq::get(url).call()?.into_body().into_reader();
          let mut rdr = csv::Reader::from_reader(reader);
          let headers = rdr.headers()?.clone();
          let text_idx = headers.iter().position(|h| h == "text").unwrap();
          let datetime_idx = headers.iter().position(|h| h == "datetime").unwrap();
          let iter = rdr.into_records().map(move |result| {
              let record = result?;
              Ok(CsvRow {
                  text: record[text_idx].to_string(),
                  datetime: record[datetime_idx].to_string(),
              })
          });
          Ok(iter)
      }
      
      static class CsvRow {
          final String text;
          final String datetime;
          CsvRow(String text, String datetime) { this.text = text; this.datetime = datetime; }
      }
      
      static Stream<CsvRow> parseCSV(String url) throws Exception {
          Function<String, List<String>> parseCsvLine = line -> {
              List<String> fields = new ArrayList<>();
              boolean inQuotes = false;
              var sb = new StringBuilder();
              for (char c : line.toCharArray()) {
                  if (c == '"') {
                      inQuotes = !inQuotes;
                  } else if (c == ',' && !inQuotes) {
                      fields.add(sb.toString());
                      sb.setLength(0);
                  } else {
                      sb.append(c);
                  }
              }
              fields.add(sb.toString());
              return fields;
          };
      
          var reader = new BufferedReader(new InputStreamReader(new URL(url).openStream()));
          String headerLine = reader.readLine();
          List<String> headers = List.of(headerLine.split(","));
          int textIdx = headers.indexOf("text");
          int datetimeIdx = headers.indexOf("datetime");
      
          return reader.lines()
              .map(line -> {
                  List<String> fields = parseCsvLine.apply(line);
                  return new CsvRow(fields.get(textIdx), fields.get(datetimeIdx));
              })
              .onClose(() -> { try { reader.close(); } catch (Exception ignored) {} });
      }
      
      async IAsyncEnumerable<(string text, string datetime)> ParseCsv(string url)
      {
      	using var httpClient = new HttpClient();
      	using var stream = await httpClient.GetStreamAsync(url);
      	using var parser = new TextFieldParser(new StreamReader(stream));
      	parser.TextFieldType = Microsoft.VisualBasic.FileIO.FieldType.Delimited;
      	parser.SetDelimiters(",");
      	string[]? headers = parser.ReadFields();
      	int textIdx = Array.IndexOf(headers!, "text");
      	int datetimeIdx = Array.IndexOf(headers!, "datetime");
      	while (!parser.EndOfData)
      	{
      		var fields = parser.ReadFields()!;
      		yield return (fields[textIdx], fields[datetimeIdx]);
      	}
      }
      
      type CSVRow struct {
      	Text     string
      	Datetime string
      }
      
      func parseCSV(url string, fn func(CSVRow)) error {
      	resp, err := http.Get(url)
      	if err != nil {
      		return err
      	}
      	defer resp.Body.Close()
      
      	csvReader := csv.NewReader(resp.Body)
      	headers, err := csvReader.Read()
      	if err != nil {
      		return err
      	}
      
      	textIdx, datetimeIdx := -1, -1
      	for i, h := range headers {
      		switch h {
      		case "text":
      			textIdx = i
      		case "datetime":
      			datetimeIdx = i
      		}
      	}
      
      	for {
      		row, err := csvReader.Read()
      		if err == io.EOF {
      			break
      		}
      		if err != nil {
      			return err
      		}
      		fn(CSVRow{Text: row[textIdx], Datetime: row[datetimeIdx]})
      	}
      	return nil
      }
      
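  • The CSV file is streamed row by row, and points are buffered into batches of 100 for efficient upload. The optimal batch size depends on your data and cluster, so you may want to experiment with different sizes to get the best performance.
  • The date (YYYY-MM-DD) is extracted from each row's datetime field. This date is used as the shard key that routes the data to the correct shard.
  • When a new date appears and its shard does not exist yet, a shard key for that date is created.
  • Whenever the date changes mid-stream, the buffer is flushed to the previous date's shard first. This ensures that no post is written to the wrong shard.
  • As data is written, each point gets:
    • A random UUID as the point ID
    • The post's text and datetime as the payload
    • A dense vector embedding generated from the post text with sentence-transformers/all-MiniLM-L6-v2
  • Every full batch of 100 points is uploaded to Qdrant, targeting the correct date shard through the shard key selector parameter.
  • After the loop finishes, any remaining points are uploaded as a final partial batch.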
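The flush rules described above can be separated from the client calls, which makes them easy to test on their own. The sketch below is a hypothetical generator, not part of qdrant-client. It groups rows into batches that each belong to a single date, so every batch can be sent with one shard key. In the tutorial's loop, each (shard_key, batch) pair corresponds to one client.upload_points(..., shard_key_selector=shard_key) call, made after the rows have been turned into PointStruct objects.

```python
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

Row = Dict[str, str]


def batches_by_shard(
    rows: Iterable[Row], batch_size: int = 100
) -> Iterator[Tuple[str, List[Row]]]:
    """Yield (shard_key, batch) pairs; every row in a batch shares one YYYY-MM-DD key.

    Hypothetical helper mirroring the ingestion loop's flush rules: flush when
    the date changes, when the batch is full, and once more at the end.
    """
    current_key: Optional[str] = None
    buffer: List[Row] = []
    for row in rows:
        key = row["datetime"][:10]  # same YYYY-MM-DD extraction as the tutorial
        if key != current_key:
            if buffer:
                yield current_key, buffer  # flush the previous date's rows
                buffer = []  # start a new list so the yielded one is never mutated
            current_key = key
        buffer.append(row)
        if len(buffer) >= batch_size:
            yield current_key, buffer  # flush a full batch
            buffer = []
    if buffer:
        yield current_key, buffer  # flush the final partial batch


rows = [
    {"datetime": "2026-04-06T09:04:28", "text": "a"},
    {"datetime": "2026-04-06T09:04:32", "text": "b"},
    {"datetime": "2026-04-06T09:05:52", "text": "c"},
    {"datetime": "2026-04-07T08:00:00", "text": "d"},
]
for key, batch in batches_by_shard(rows, batch_size=2):
    print(key, [r["text"] for r in batch])
```

The rows do not have to be sorted by date. If they are not, the same date simply appears in more than one batch, and each batch is still routed to the right shard, at the cost of more and smaller uploads.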

Query the data

Query today's data

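Now you can run semantic queries over the posts. To restrict a query to the shard that stores today's data, set the shard key selector to 2026-04-07 (assuming today is April 7, 2026):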

query_text = "coffee"

resp = client.query_points(
    collection_name=collection_name,
    query=Document(text=query_text, model=dense_model),
    using="dense_vector",
    limit=5,
    shard_key_selector="2026-04-07"
)
print(resp)
const queryText = "coffee";

const singleShardResult = await client.query(collectionName, {
    query: { text: queryText, model: denseModel },
    using: "dense_vector",
    limit: 5,
    shard_key: "2026-04-07",
});

for (const hit of singleShardResult.points) {
    console.log(hit);
}
let query_text = "coffee";

let result = client
    .query(
        QueryPointsBuilder::new(collection_name)
            .query(Query::new_nearest(Document::new(query_text, dense_model)))
            .using("dense_vector")
            .limit(5)
            .shard_key_selector("2026-04-07".to_string()),
    )
    .await?;

for hit in result.result {
    println!("{:?}", hit);
}
String queryText = "coffee";

var result = client.queryAsync(
    QueryPoints.newBuilder()
        .setCollectionName(collectionName)
        .setQuery(nearest(Document.newBuilder().setText(queryText).setModel(denseModel).build()))
        .setUsing("dense_vector")
        .setLimit(5)
        .setShardKeySelector(shardKeySelector("2026-04-07"))
        .build()
).get();

for (var hit : result) {
    System.out.println(hit);
}
string queryText = "coffee";

var result = await client.QueryAsync(
	collectionName: collectionName,
	query: new Document { Text = queryText, Model = denseModel },
	usingVector: "dense_vector",
	limit: 5,
	shardKeySelector: new ShardKeySelector
	{
		ShardKeys = { new List<ShardKey> { "2026-04-07" } }
	}
);

foreach (var hit in result)
	Console.WriteLine(hit);
queryText := "coffee"

result, err := client.Query(context.Background(), &qdrant.QueryPoints{
	CollectionName: collectionName,
	Query:          qdrant.NewQueryDocument(&qdrant.Document{Text: queryText, Model: denseModel}),
	Using:          qdrant.PtrOf("dense_vector"),
	Limit:          qdrant.PtrOf(uint64(5)),
	ShardKeySelector: &qdrant.ShardKeySelector{
		ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey("2026-04-07")},
	},
})

for _, hit := range result {
	fmt.Println(hit)
}

Query data across multiple days

To query multiple shards, set the shard key selector to a list of shard keys. For example, to query the last 2 days (April 6 to 7):

resp = client.query_points(
    collection_name=collection_name,
    query=Document(text=query_text, model=dense_model),
    using="dense_vector",
    limit=5,
    shard_key_selector=["2026-04-06","2026-04-07"]
)
print(resp)
const multiShardResult = await client.query(collectionName, {
    query: { text: queryText, model: denseModel },
    using: "dense_vector",
    limit: 5,
    shard_key: ["2026-04-06", "2026-04-07"],
});

for (const hit of multiShardResult.points) {
    console.log(hit);
}
let result = client
    .query(
        QueryPointsBuilder::new(collection_name)
            .query(Query::new_nearest(Document::new(query_text, dense_model)))
            .using("dense_vector")
            .limit(5)
            .shard_key_selector(ShardKeySelector {
                shard_keys: vec![
                    "2026-04-06".to_string().into(),
                    "2026-04-07".to_string().into(),
                ],
                fallback: None,
            }),
    )
    .await?;

for hit in result.result {
    println!("{:?}", hit);
}
result = client.queryAsync(
    QueryPoints.newBuilder()
        .setCollectionName(collectionName)
        .setQuery(nearest(Document.newBuilder().setText(queryText).setModel(denseModel).build()))
        .setUsing("dense_vector")
        .setLimit(5)
        .setShardKeySelector(ShardKeySelector.newBuilder()
            .addShardKeys(shardKey("2026-04-06"))
            .addShardKeys(shardKey("2026-04-07"))
            .build())
        .build()
).get();

for (var hit : result) {
    System.out.println(hit);
}
result = await client.QueryAsync(
	collectionName: collectionName,
	query: new Document { Text = queryText, Model = denseModel },
	usingVector: "dense_vector",
	limit: 5,
	shardKeySelector: new ShardKeySelector
	{
		ShardKeys = { new List<ShardKey> { "2026-04-06", "2026-04-07" } }
	}
);

foreach (var hit in result)
	Console.WriteLine(hit);
result, err = client.Query(context.Background(), &qdrant.QueryPoints{
	CollectionName: collectionName,
	Query:          qdrant.NewQueryDocument(&qdrant.Document{Text: queryText, Model: denseModel}),
	Using:          qdrant.PtrOf("dense_vector"),
	Limit:          qdrant.PtrOf(uint64(5)),
	ShardKeySelector: &qdrant.ShardKeySelector{
		ShardKeys: []*qdrant.ShardKey{
			qdrant.NewShardKey("2026-04-06"),
			qdrant.NewShardKey("2026-04-07"),
		},
	},
})

for _, hit := range result {
	fmt.Println(hit)
}
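The dates in these examples are hard-coded. The sketch below computes them for a rolling window instead, using a hypothetical helper. It returns the YYYY-MM-DD keys for the last N days ending on a given date, oldest first. The optional existing argument keeps only keys that actually exist, for example those returned by client.list_shard_keys. This assumes that selecting a shard key that was never created, or was already pruned, makes the request fail, so verify that behavior against your Qdrant version.

```python
from datetime import date, timedelta
from typing import Iterable, List, Optional


def shard_keys_for_last_days(
    today: date, days: int, existing: Optional[Iterable[str]] = None
) -> List[str]:
    """Return YYYY-MM-DD shard keys for the `days` days ending on `today`, oldest first."""
    if days < 1:
        raise ValueError("days must be at least 1")
    keys = [(today - timedelta(days=offset)).isoformat() for offset in range(days - 1, -1, -1)]
    if existing is not None:
        # Drop days without a shard (no data ingested, or already pruned).
        present = set(existing)
        keys = [k for k in keys if k in present]
    return keys


# Same window as the multi-day example: April 6 and 7, 2026.
print(shard_keys_for_last_days(date(2026, 4, 7), 2))
```

In Python, the returned list can be passed directly as shard_key_selector to client.query_points, as in the multi-day example.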

Query the full dataset

To query the entire dataset (all shards), omit the shard key selector parameter:

resp = client.query_points(
    collection_name=collection_name,
    query=Document(text=query_text, model=dense_model),
    using="dense_vector",
    limit=5,
)
print(resp)
const allShardsResult = await client.query(collectionName, {
    query: { text: queryText, model: denseModel },
    using: "dense_vector",
    limit: 5,
});

for (const hit of allShardsResult.points) {
    console.log(hit);
}
let result = client
    .query(
        QueryPointsBuilder::new(collection_name)
            .query(Query::new_nearest(Document::new(query_text, dense_model)))
            .using("dense_vector")
            .limit(5),
    )
    .await?;

for hit in result.result {
    println!("{:?}", hit);
}
result = client.queryAsync(
    QueryPoints.newBuilder()
        .setCollectionName(collectionName)
        .setQuery(nearest(Document.newBuilder().setText(queryText).setModel(denseModel).build()))
        .setUsing("dense_vector")
        .setLimit(5)
        .build()
).get();

for (var hit : result) {
    System.out.println(hit);
}
result = await client.QueryAsync(
	collectionName: collectionName,
	query: new Document { Text = queryText, Model = denseModel },
	usingVector: "dense_vector",
	limit: 5
);

foreach (var hit in result)
	Console.WriteLine(hit);
result, err = client.Query(context.Background(), &qdrant.QueryPoints{
	CollectionName: collectionName,
	Query:          qdrant.NewQueryDocument(&qdrant.Document{Text: queryText, Model: denseModel}),
	Using:          qdrant.PtrOf("dense_vector"),
	Limit:          qdrant.PtrOf(uint64(5)),
})

for _, hit := range result {
	fmt.Println(hit)
}

Prune shards

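Every day at midnight, create a new shard for the data that will be ingested that day. If you only query the last 7 days of data, you can also delete the oldest shard at the same time. You can automate this process with a cron job.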

from datetime import date, timedelta

today = "2026-04-08"
oldest_shard_key = (date.fromisoformat(today) - timedelta(days=7)).isoformat()

client.create_shard_key(collection_name, today)
client.delete_shard_key(collection_name, oldest_shard_key)
const today = "2026-04-08";
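const oldestDate = new Date(today); // "YYYY-MM-DD" is parsed as UTC midnight
oldestDate.setUTCDate(oldestDate.getUTCDate() - 7); // use UTC arithmetic so local time zones and DST cannot shift the date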
const oldestShardKey = oldestDate.toISOString().slice(0, 10);

await client.createShardKey(collectionName, { shard_key: today });
await client.deleteShardKey(collectionName, { shard_key: oldestShardKey });
let today = "2026-04-08";
let oldest_shard_key = (NaiveDate::parse_from_str(today, "%Y-%m-%d")?
    - chrono::Duration::days(7))
.to_string();

client
    .create_shard_key(
        CreateShardKeyRequestBuilder::new(collection_name)
            .request(CreateShardKeyBuilder::default().shard_key(today.to_string())),
    )
    .await?;

client
    .delete_shard_key(
        DeleteShardKeyRequestBuilder::new(collection_name)
            .key(shard_key::Key::Keyword(oldest_shard_key)),
    )
    .await?;
String today = "2026-04-08";
String oldestShardKey = LocalDate.parse(today).minusDays(7).toString();

client.createShardKeyAsync(
    CreateShardKeyRequest.newBuilder()
        .setCollectionName(collectionName)
        .setRequest(CreateShardKey.newBuilder()
            .setShardKey(shardKey(today))
            .build())
        .build()
).get();

client.deleteShardKeyAsync(
    DeleteShardKeyRequest.newBuilder()
        .setCollectionName(collectionName)
        .setRequest(DeleteShardKey.newBuilder()
            .setShardKey(shardKey(oldestShardKey))
            .build())
        .build()
).get();
string today = "2026-04-08";
string oldestShardKey = DateOnly.ParseExact(today, "yyyy-MM-dd")
	.AddDays(-7)
	.ToString("yyyy-MM-dd");

await client.CreateShardKeyAsync(
	collectionName,
	new CreateShardKey { ShardKey = new ShardKey { Keyword = today } }
);
await client.DeleteShardKeyAsync(
	collectionName,
	new DeleteShardKey { ShardKey = new ShardKey { Keyword = oldestShardKey } }
);
today := "2026-04-08"
t, _ := time.Parse("2006-01-02", today)
oldestShardKey := t.AddDate(0, 0, -7).Format("2006-01-02")

client.CreateShardKey(context.Background(), collectionName, &qdrant.CreateShardKey{
	ShardKey: qdrant.NewShardKey(today),
})
client.DeleteShardKey(context.Background(), collectionName, &qdrant.DeleteShardKey{
	ShardKey: qdrant.NewShardKey(oldestShardKey),
})
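The code above deletes exactly one key, today minus 7 days. If the cron job misses a run, older shards can accumulate. The following sketch works out everything to create and delete in one pass. It assumes the same daily ISO-date keys and a 7-day retention window that includes today. The helper plan_rotation is hypothetical, and how you obtain existing_keys (for example, from your own bookkeeping) is left open.

```python
from datetime import date, timedelta


def plan_rotation(existing_keys, today: str, retention_days: int = 7):
    """Return (to_create, to_delete) lists of daily shard keys.

    Hypothetical helper. existing_keys: ISO-date shard keys currently in
    the collection. retention_days: number of daily shards to keep,
    counting today.
    """
    if retention_days < 1:
        raise ValueError("retention_days must be at least 1")
    current = date.fromisoformat(today)
    # Oldest day that still falls inside the retention window.
    cutoff = current - timedelta(days=retention_days - 1)
    existing = set(existing_keys)
    to_create = [] if today in existing else [today]
    # Every key older than the window is removed, not only today - 7.
    to_delete = sorted(k for k in existing if date.fromisoformat(k) < cutoff)
    return to_create, to_delete
```

With today = "2026-04-08" and the default window, the cutoff is 2026-04-02. The function therefore selects 2026-04-01 for deletion, as the example above does, along with any older keys that were left behind. You would then call client.create_shard_key and client.delete_shard_key for each returned key.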

Ingest new data

When ingesting new data, set shard_key_selector to today's date so that the data goes into the correct shard:

client.upsert(
    collection_name=collection_name,
    points=[PointStruct(
        id=uuid.uuid4().hex,
        payload={"text": "The best way to start a Wednesday is with a cup of coffee", "datetime": "2026-04-08T07:57:47"},
        vector={
            "dense_vector": Document(text="The best way to start a Wednesday is with a cup of coffee", model=dense_model)
        })],
    shard_key_selector=today
)
await client.upsert(collectionName, {
    points: [
        {
            id: crypto.randomUUID(),
            vector: {
                dense_vector: {
                    text: "The best way to start a Wednesday is with a cup of coffee",
                    model: denseModel,
                },
            },
            payload: {
                text: "The best way to start a Wednesday is with a cup of coffee",
                datetime: "2026-04-08T07:57:47",
            },
        },
    ],
    shard_key: today,
});
client
    .upsert_points(
        UpsertPointsBuilder::new(
            collection_name,
            vec![PointStruct::new(
                uuid::Uuid::new_v4().to_string(),
                HashMap::from([(
                    "dense_vector".to_string(),
                    DocumentBuilder::new(
                        "The best way to start a Wednesday is with a cup of coffee",
                        dense_model,
                    )
                    .build(),
                )]),
                [
                    ("text", "The best way to start a Wednesday is with a cup of coffee".into()),
                    ("datetime", "2026-04-08T07:57:47".into()),
                ],
            )],
        )
        .shard_key_selector(today.to_string()),
    )
    .await?;
client.upsertAsync(
    UpsertPoints.newBuilder()
        .setCollectionName(collectionName)
        .addAllPoints(List.of(
            PointStruct.newBuilder()
                .setId(id(UUID.randomUUID()))
                .setVectors(namedVectors(Map.of(
                    "dense_vector",
                    vector(Document.newBuilder()
                        .setText("The best way to start a Wednesday is with a cup of coffee")
                        .setModel(denseModel)
                        .build()))))
                .putAllPayload(Map.of(
                    "text", value("The best way to start a Wednesday is with a cup of coffee"),
                    "datetime", value("2026-04-08T07:57:47")))
                .build()))
        .setShardKeySelector(shardKeySelector(today))
        .build()
).get();
await client.UpsertAsync(
	collectionName: collectionName,
	points: new List<PointStruct>
	{
		new()
		{
			Id = Guid.NewGuid(),
			Vectors = new Dictionary<string, Vector>
			{
				["dense_vector"] = new Document
				{
					Text = "The best way to start a Wednesday is with a cup of coffee",
					Model = denseModel
				}
			},
			Payload =
			{
				["text"] = "The best way to start a Wednesday is with a cup of coffee",
				["datetime"] = "2026-04-08T07:57:47"
			}
		}
	},
	shardKeySelector: new ShardKeySelector
	{
		ShardKeys = { new List<ShardKey> { today } }
	}
);
client.Upsert(context.Background(), &qdrant.UpsertPoints{
	CollectionName: collectionName,
	Points: []*qdrant.PointStruct{
		{
			Id: qdrant.NewID(uuid.New().String()),
			Vectors: qdrant.NewVectorsMap(map[string]*qdrant.Vector{
				"dense_vector": qdrant.NewVectorDocument(&qdrant.Document{
					Text:  "The best way to start a Wednesday is with a cup of coffee",
					Model: denseModel,
				}),
			}),
			Payload: qdrant.NewValueMap(map[string]any{
				"text":     "The best way to start a Wednesday is with a cup of coffee",
				"datetime": "2026-04-08T07:57:47",
			}),
		},
	},
	ShardKeySelector: &qdrant.ShardKeySelector{
		ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey(today)},
	},
})
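Points near midnight may carry a timestamp from the previous day. In that case, you can derive the shard key from the point's own datetime payload rather than from the wall clock. This minimal sketch assumes daily ISO-date keys. It also assumes that timezone-aware timestamps should be bucketed by their UTC day; choose whichever zone defines your day boundaries. The helper shard_key_for is hypothetical.

```python
from datetime import datetime, timezone


def shard_key_for(timestamp: str) -> str:
    """Map an ISO 8601 timestamp to its daily shard key ("YYYY-MM-DD").

    Hypothetical helper. Naive timestamps such as "2026-04-08T07:57:47"
    are used as-is; timezone-aware ones are converted to UTC first.
    """
    dt = datetime.fromisoformat(timestamp)
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc)
    return dt.date().isoformat()
```

In the Python upsert above, you could then pass shard_key_selector=shard_key_for(payload["datetime"]), where payload is the point's payload dictionary.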

Conclusion

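Time-based sharding is a powerful technique for managing large time-series datasets in Qdrant. Routing data to different shards based on timestamps lets you store and query recent data efficiently. It also lets you prune old data by deleting whole shard keys rather than individual points, so pruning does not affect performance. This approach is well suited to use cases such as social media analytics, where the relevance of data declines over time.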
