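Time-Based Sharding in Qdrant
When working with massive, fast-changing datasets such as social media or image/video streams, efficient storage and retrieval are critical. Often only the most recent data is relevant, and older data can be archived or deleted. For example, in sentiment analysis of social media posts, you may need only the last 7 days of data to capture current trends, with most queries focused on the past 24 hours.
Storing all data in a Qdrant collection with default sharding can trigger costly reindexing of the entire dataset when old points are deleted, which hurts performance. A better solution is time-based sharding, where points are routed to a specific shard (or shards) based on their timestamp. For use cases with a natural time-to-live (TTL) partitioning, sharding by a timestamp-derived key enables efficient queries over recent data and lets you drop old data seamlessly.
For example, with daily sharding, today's data is stored in today's shard, yesterday's data in yesterday's shard, and so on. Queries can target a specific shard (for example, today's) or several shards to cover a date range.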

Depending on data volume and retention requirements, you can shard by hour, week, month, or any other interval that fits your use case.
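To make the effect of the interval choice concrete, the sketch below derives a shard key string from a timestamp at four granularities. The function name `shard_key_for` and the key formats (for example `2026-W15` for ISO weeks) are assumptions made for this illustration, not Qdrant conventions; any string that is unique per interval should work as a key.

```python
from datetime import datetime


def shard_key_for(ts: datetime, granularity: str = "day") -> str:
    """Return a shard key string for the interval that contains `ts`.

    Hypothetical helper: the key formats below are illustrative choices.
    """
    if granularity == "hour":
        return ts.strftime("%Y-%m-%dT%H")
    if granularity == "day":
        return ts.strftime("%Y-%m-%d")
    if granularity == "week":
        # ISO week numbering: weeks start on Monday
        iso_year, iso_week, _ = ts.isocalendar()
        return f"{iso_year}-W{iso_week:02d}"
    if granularity == "month":
        return ts.strftime("%Y-%m")
    raise ValueError(f"unsupported granularity: {granularity}")
```

Coarser intervals mean fewer, larger shards; finer intervals mean more shards and cheaper pruning, at the cost of more per-shard overhead.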
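This tutorial walks you through implementing time-based sharding. It covers:
- Creating a Qdrant collection with user-defined sharding
- Bulk-ingesting historical data into the correct shards based on timestamp
- Assigning new data to the latest shard
- Querying one or more shards
- Pruning older shards
Install and Initialize the Qdrant Client
First, install the Qdrant client: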
!pip install qdrant-client
Next, initialize the client:
from qdrant_client import QdrantClient, models
client = QdrantClient(
url=QDRANT_URL,
api_key=QDRANT_API_KEY,
cloud_inference=True
)
const client = new QdrantClient({
url: QDRANT_URL,
apiKey: QDRANT_API_KEY,
});
let client = Qdrant::from_url(QDRANT_URL)
.api_key(QDRANT_API_KEY)
.build()?;
QdrantClient client =
new QdrantClient(
QdrantGrpcClient.newBuilder(QDRANT_URL, 6334, true)
.withApiKey(QDRANT_API_KEY)
.build());
var client = new QdrantClient(
host: QDRANT_URL,
https: true,
apiKey: QDRANT_API_KEY
);
client, err := qdrant.NewClient(&qdrant.Config{
Host: QDRANT_URL,
APIKey: QDRANT_API_KEY,
UseTLS: true,
})
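This tutorial assumes you are using Qdrant Cloud Inference to generate vector embeddings. If you manage your own embedding infrastructure, the same principles apply, but you will need to adapt the code examples to use your embedding service.
Create a Collection
Create a collection with user-defined sharding by setting the sharding method to custom.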
from qdrant_client import models
collection_name = "my_collection"
if client.collection_exists(collection_name=collection_name):
    client.delete_collection(collection_name=collection_name)
client.create_collection(
collection_name=collection_name,
vectors_config={
"dense_vector": models.VectorParams(
size=384, distance=models.Distance.COSINE
)
},
sharding_method=models.ShardingMethod.CUSTOM
)
const collectionName = "my_collection";
if (await client.collectionExists(collectionName)) {
await client.deleteCollection(collectionName);
}
await client.createCollection(collectionName, {
vectors: {
dense_vector: {
size: 384,
distance: "Cosine",
},
},
sharding_method: "custom",
});
let collection_name = "my_collection";
if client.collection_exists(collection_name).await? {
client.delete_collection(collection_name).await?;
}
let mut vectors_config = VectorsConfigBuilder::default();
vectors_config.add_named_vector_params(
"dense_vector",
VectorParamsBuilder::new(384, Distance::Cosine),
);
client
.create_collection(
CreateCollectionBuilder::new(collection_name)
.vectors_config(vectors_config)
.sharding_method(ShardingMethod::Custom.into()),
)
.await?;
String collectionName = "my_collection";
if (client.collectionExistsAsync(collectionName).get()) {
client.deleteCollectionAsync(collectionName).get();
}
client.createCollectionAsync(
CreateCollection.newBuilder()
.setCollectionName(collectionName)
.setVectorsConfig(VectorsConfig.newBuilder().setParamsMap(
VectorParamsMap.newBuilder().putAllMap(Map.of(
"dense_vector",
VectorParams.newBuilder()
.setSize(384)
.setDistance(Distance.Cosine)
.build()))))
.setShardingMethod(ShardingMethod.Custom)
.build()
).get();
string collectionName = "my_collection";
if (await client.CollectionExistsAsync(collectionName))
await client.DeleteCollectionAsync(collectionName);
await client.CreateCollectionAsync(
collectionName: collectionName,
vectorsConfig: new VectorParamsMap
{
Map = {
["dense_vector"] = new VectorParams { Size = 384, Distance = Distance.Cosine }
}
},
shardingMethod: ShardingMethod.Custom
);
collectionName := "my_collection"
exists, err := client.CollectionExists(context.Background(), collectionName)
if exists {
client.DeleteCollection(context.Background(), collectionName)
}
client.CreateCollection(context.Background(), &qdrant.CreateCollection{
CollectionName: collectionName,
VectorsConfig: qdrant.NewVectorsConfigMap(
map[string]*qdrant.VectorParams{
"dense_vector": {
Size: 384,
Distance: qdrant.Distance_Cosine,
},
},
),
ShardingMethod: qdrant.ShardingMethod_Custom.Enum(),
})
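Custom shards are addressed through shard keys. In this tutorial, the shard key is the date in YYYY-MM-DD format, extracted from each point's timestamp.
The collection will have one shard per shard key (that is, one shard per day). For very large datasets, you can increase write throughput by configuring the collection's shard_number, which defaults to 1. Setting it higher creates multiple shards under each shard key, spreading the write load across multiple nodes in the cluster. Avoid creating too many shards, however: each shard consumes resources and adds overhead, which can degrade performance. Test to find the optimal number of shards for your dataset and cluster configuration.
Two things to note about collections with user-defined sharding compared with regular collections that use automatic sharding: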
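- For regular collections with automatic sharding, shard_number determines the total number of shards in the collection. With user-defined sharding, shard_number instead determines the number of shards per shard key: the total number of shards in the collection equals the number of shard keys (days) multiplied by shard_number.
- Collection-level configuration changes that you can apply to a regular collection (for example, HNSW parameters) can also be applied to a collection with user-defined sharding. These changes apply retroactively to existing shards and to new shards created later.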
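As a small worked example of the multiplication described above, the sketch below computes the total shard count for a collection with user-defined sharding. The helper name `total_shards` is hypothetical and exists only for this illustration.

```python
def total_shards(num_shard_keys: int, shard_number: int = 1) -> int:
    """Total shards in a collection with user-defined sharding.

    Hypothetical helper: each shard key gets `shard_number` shards.
    """
    if num_shard_keys < 0 or shard_number < 1:
        raise ValueError("num_shard_keys must be >= 0 and shard_number >= 1")
    return num_shard_keys * shard_number


# Seven daily shard keys with the default shard_number of 1
week_default = total_shards(7)
# The same seven keys with shard_number set to 3
week_scaled = total_shards(7, shard_number=3)
```

With a 7-day retention window, raising shard_number from 1 to 3 takes the collection from 7 shards to 21, which is worth keeping in mind when sizing the cluster.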
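Ingest Historical Data
Time-series data typically arrives as a stream, with new points added continuously. Each point may carry a timestamp indicating when it was created, and you can use that timestamp to determine which shard the point belongs to. If your data has no timestamp, you can use the current time.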
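The sketch below shows one way to turn an optional timestamp into a daily shard key, falling back to the current time when the timestamp is missing. The helper name `day_shard_key` is hypothetical, and normalizing timezone-aware timestamps to UTC is an assumption of this example: the tutorial's own code simply takes the first 10 characters of the datetime string, which is equivalent for timestamps without a UTC offset.

```python
from datetime import datetime, timezone
from typing import Optional


def day_shard_key(timestamp: Optional[str], now: Optional[datetime] = None) -> str:
    """Return a YYYY-MM-DD shard key for an ISO 8601 timestamp.

    Hypothetical helper. Falls back to `now` (or the current UTC time)
    when `timestamp` is missing or empty.
    """
    if timestamp:
        ts = datetime.fromisoformat(timestamp)
    else:
        ts = now or datetime.now(timezone.utc)
    # Assumption: timezone-aware values are bucketed by their UTC date;
    # naive values are used as they are.
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc)
    return ts.date().isoformat()
```

Bucketing by a single reference timezone keeps a post written late in the evening in one region from landing in a different day's shard than the one your queries expect.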
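This tutorial uses a sample dataset of social media posts with timestamps. Assume that today is April 7, 2026. You will start by ingesting some historical data from this week (April 1 to 7). Here are a few sample rows from the dataset:
| datetime | text |
|---|---|
| 2026-04-06T09:04:28 | April sunshine streaming into the office makes everything better. |
| 2026-04-06T09:04:32 | Morning stretch, good coffee, clear intentions. Monday: handled. |
| 2026-04-06T09:05:52 | Grateful for a productive start to the week. |
Upload the dataset, storing each day's data in its own shard: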
from qdrant_client.http.models import PointStruct, Document
import uuid
csv_url = 'https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv'
# Retrieve a list of existing shard keys in the collection
existing_shard_keys = list(client.list_shard_keys(collection_name=collection_name).shard_keys)
dense_model = "sentence-transformers/all-MiniLM-L6-v2"
batch_size = 100
current_date = None
buffer: list[PointStruct] = []
for row in parse_csv(csv_url):
    shard_date = row['datetime'][:10]  # Extract YYYY-MM-DD
    if shard_date != current_date:
        # Flush buffer for the previous date before switching
        if buffer:
            client.upload_points(
                collection_name=collection_name,
                points=buffer,
                shard_key_selector=current_date,
            )
            buffer = []
        # Create shard for the new date if it doesn't exist yet
        if shard_date not in existing_shard_keys:
            client.create_shard_key(collection_name, shard_date)
            existing_shard_keys.append(shard_date)
        current_date = shard_date
    # Add point to buffer
    buffer.append(PointStruct(
        id=uuid.uuid4().hex,
        payload={"text": row['text'], "datetime": row['datetime']},
        vector={"dense_vector": Document(text=row["text"], model=dense_model)}
    ))
    # Flush batch if buffer size exceeds batch size
    if len(buffer) >= batch_size:
        client.upload_points(
            collection_name=collection_name,
            points=buffer,
            shard_key_selector=current_date,
        )
        buffer = []
# Flush remaining partial batch
if buffer:
    client.upload_points(
        collection_name=collection_name,
        points=buffer,
        shard_key_selector=current_date,
    )
const csvUrl = "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv";
// Retrieve a list of existing shard keys in the collection
const shardKeysResult = await client.listShardKeys(collectionName);
const existingShardKeys = new Set((shardKeysResult.shard_keys ?? []).map((d) => String(d.key)));
const denseModel = "sentence-transformers/all-MiniLM-L6-v2";
const batchSize = 100;
let currentDate = "";
let buffer: Extract<Parameters<typeof client.upsert>[1], { points: unknown }>['points'] = [];
for await (const { text: postText, datetime } of parseCSV(csvUrl)) {
const shardDate = datetime.slice(0, 10); // Extract YYYY-MM-DD
if (shardDate !== currentDate) {
// Flush buffer for the previous date before switching
if (buffer.length > 0) {
await client.upsert(collectionName, { points: buffer, shard_key: currentDate });
buffer = [];
}
// Create shard for the new date if it doesn't exist yet
if (!existingShardKeys.has(shardDate)) {
await client.createShardKey(collectionName, { shard_key: shardDate });
existingShardKeys.add(shardDate);
}
currentDate = shardDate;
}
// Add point to buffer
buffer.push({
id: crypto.randomUUID(),
vector: { dense_vector: { text: postText, model: denseModel } },
payload: { text: postText, datetime },
});
// Flush batch if buffer size exceeds batch size
if (buffer.length >= batchSize) {
await client.upsert(collectionName, { points: buffer, shard_key: currentDate });
buffer = [];
}
}
// Flush remaining partial batch
if (buffer.length > 0) {
await client.upsert(collectionName, { points: buffer, shard_key: currentDate });
}
let csv_url = "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv";
// Retrieve a list of existing shard keys in the collection
let response = client.list_shard_keys(collection_name).await?;
let mut existing_shard_keys: HashSet<String> = response
.shard_keys
.into_iter()
.filter_map(|d| {
d.key?.key.and_then(|k| match k {
shard_key::Key::Keyword(s) => Some(s),
_ => None,
})
})
.collect();
let dense_model = "sentence-transformers/all-MiniLM-L6-v2";
let batch_size = 100;
let mut current_date = String::new();
let mut buffer: Vec<PointStruct> = Vec::new();
for row in parse_csv(csv_url)? {
let row = row?;
let text = row.text;
let datetime = row.datetime;
let shard_date = datetime[..10].to_string(); // Extract YYYY-MM-DD
if shard_date != current_date {
// Flush buffer for the previous date before switching
if !buffer.is_empty() {
client
.upsert_points(
UpsertPointsBuilder::new(
collection_name,
std::mem::take(&mut buffer),
)
.shard_key_selector(current_date.clone()),
)
.await?;
}
// Create shard for the new date if it doesn't exist yet
if !existing_shard_keys.contains(&shard_date) {
client
.create_shard_key(
CreateShardKeyRequestBuilder::new(collection_name).request(
CreateShardKeyBuilder::default().shard_key(shard_date.clone()),
),
)
.await?;
existing_shard_keys.insert(shard_date.clone());
}
current_date = shard_date;
}
// Add point to buffer
buffer.push(PointStruct::new(
uuid::Uuid::new_v4().to_string(),
HashMap::from([(
"dense_vector".to_string(),
DocumentBuilder::new(&text, dense_model).build(),
)]),
[("text", text.into()), ("datetime", datetime.into())],
));
// Flush batch if buffer size exceeds batch size
if buffer.len() >= batch_size {
client
.upsert_points(
UpsertPointsBuilder::new(collection_name, std::mem::take(&mut buffer))
.shard_key_selector(current_date.clone()),
)
.await?;
}
}
// Flush remaining partial batch
if !buffer.is_empty() {
client
.upsert_points(
UpsertPointsBuilder::new(collection_name, buffer)
.shard_key_selector(current_date.clone()),
)
.await?;
}
String csvUrl = "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv";
// Retrieve a list of existing shard keys in the collection
var shardKeyDescriptions = client.listShardKeysAsync(collectionName).get();
Set<String> existingShardKeys = new HashSet<>();
for (var desc : shardKeyDescriptions) {
existingShardKeys.add(desc.getKey().getKeyword());
}
String denseModel = "sentence-transformers/all-MiniLM-L6-v2";
int batchSize = 100;
String currentDate = null;
List<PointStruct> buffer = new ArrayList<>();
try (var stream = parseCSV(csvUrl)) {
for (var row : (Iterable<CsvRow>) stream::iterator) {
String text = row.text;
String datetime = row.datetime;
String shardDate = datetime.substring(0, 10); // Extract YYYY-MM-DD
if (!shardDate.equals(currentDate)) {
// Flush buffer for the previous date before switching
if (!buffer.isEmpty()) {
client.upsertAsync(
UpsertPoints.newBuilder()
.setCollectionName(collectionName)
.addAllPoints(buffer)
.setShardKeySelector(shardKeySelector(currentDate))
.build()
).get();
buffer.clear();
}
// Create shard for the new date if it doesn't exist yet
if (!existingShardKeys.contains(shardDate)) {
client.createShardKeyAsync(
CreateShardKeyRequest.newBuilder()
.setCollectionName(collectionName)
.setRequest(CreateShardKey.newBuilder()
.setShardKey(shardKey(shardDate))
.build())
.build()
).get();
existingShardKeys.add(shardDate);
}
currentDate = shardDate;
}
// Add point to buffer
buffer.add(
PointStruct.newBuilder()
.setId(id(UUID.randomUUID()))
.setVectors(namedVectors(Map.of(
"dense_vector",
vector(Document.newBuilder()
.setText(text)
.setModel(denseModel)
.build()))))
.putAllPayload(Map.of("text", value(text), "datetime", value(datetime)))
.build());
// Flush batch if buffer size exceeds batch size
if (buffer.size() >= batchSize) {
client.upsertAsync(
UpsertPoints.newBuilder()
.setCollectionName(collectionName)
.addAllPoints(buffer)
.setShardKeySelector(shardKeySelector(currentDate))
.build()
).get();
buffer.clear();
}
}
}
// Flush remaining partial batch
if (!buffer.isEmpty()) {
client.upsertAsync(
UpsertPoints.newBuilder()
.setCollectionName(collectionName)
.addAllPoints(buffer)
.setShardKeySelector(shardKeySelector(currentDate))
.build()
).get();
}
string csvUrl = "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv";
// Retrieve a list of existing shard keys in the collection
var existingShardKeys = (await client.ListShardKeysAsync(collectionName))
.Select(sk => sk.Key.Keyword)
.ToHashSet();
string denseModel = "sentence-transformers/all-MiniLM-L6-v2";
int batchSize = 100;
string? currentDate = null;
var buffer = new List<PointStruct>();
await foreach (var (text, datetime) in ParseCsv(csvUrl))
{
string shardDate = datetime[..10]; // Extract YYYY-MM-DD
if (shardDate != currentDate)
{
// Flush buffer for the previous date before switching
if (buffer.Count > 0)
{
await client.UpsertAsync(
collectionName: collectionName,
points: buffer,
shardKeySelector: new ShardKeySelector
{
ShardKeys = { new List<ShardKey> { currentDate! } }
}
);
buffer.Clear();
}
// Create shard for the new date if it doesn't exist yet
if (!existingShardKeys.Contains(shardDate))
{
await client.CreateShardKeyAsync(
collectionName,
new CreateShardKey { ShardKey = new ShardKey { Keyword = shardDate } }
);
existingShardKeys.Add(shardDate);
}
currentDate = shardDate;
}
// Add point to buffer
buffer.Add(new PointStruct
{
Id = Guid.NewGuid(),
Vectors = new Dictionary<string, Vector>
{
["dense_vector"] = new Document { Text = text, Model = denseModel }
},
Payload = { ["text"] = text, ["datetime"] = datetime }
});
// Flush batch if buffer size exceeds batch size
if (buffer.Count >= batchSize)
{
await client.UpsertAsync(
collectionName: collectionName,
points: buffer,
shardKeySelector: new ShardKeySelector
{
ShardKeys = { new List<ShardKey> { currentDate! } }
}
);
buffer.Clear();
}
}
// Flush remaining partial batch
if (buffer.Count > 0)
{
await client.UpsertAsync(
collectionName: collectionName,
points: buffer,
shardKeySelector: new ShardKeySelector
{
ShardKeys = { new List<ShardKey> { currentDate! } }
}
);
}
csvUrl := "https://raw.githubusercontent.com/qdrant/examples/refs/heads/master/time-based-sharding/social-media-posts.csv"
// Retrieve a list of existing shard keys in the collection
shardKeyDescriptions, err := client.ListShardKeys(context.Background(), collectionName)
existingShardKeys := make(map[string]bool)
for _, desc := range shardKeyDescriptions {
existingShardKeys[desc.Key.GetKeyword()] = true
}
denseModel := "sentence-transformers/all-MiniLM-L6-v2"
batchSize := 100
var currentDate string
var buffer []*qdrant.PointStruct
err = parseCSV(csvUrl, func(row CSVRow) {
text := row.Text
datetime := row.Datetime
shardDate := datetime[:10] // Extract YYYY-MM-DD
if shardDate != currentDate {
// Flush buffer for the previous date before switching
if len(buffer) > 0 {
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: collectionName,
Points: buffer,
ShardKeySelector: &qdrant.ShardKeySelector{
ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey(currentDate)},
},
})
buffer = nil
}
// Create shard for the new date if it doesn't exist yet
if !existingShardKeys[shardDate] {
client.CreateShardKey(context.Background(), collectionName, &qdrant.CreateShardKey{
ShardKey: qdrant.NewShardKey(shardDate),
})
existingShardKeys[shardDate] = true
}
currentDate = shardDate
}
// Add point to buffer
buffer = append(buffer, &qdrant.PointStruct{
Id: qdrant.NewID(uuid.New().String()),
Vectors: qdrant.NewVectorsMap(map[string]*qdrant.Vector{
"dense_vector": qdrant.NewVectorDocument(&qdrant.Document{
Text: text,
Model: denseModel,
}),
}),
Payload: qdrant.NewValueMap(map[string]any{
"text": text,
"datetime": datetime,
}),
})
// Flush batch if buffer size exceeds batch size
if len(buffer) >= batchSize {
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: collectionName,
Points: buffer,
ShardKeySelector: &qdrant.ShardKeySelector{
ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey(currentDate)},
},
})
buffer = nil
}
})
// Flush remaining partial batch
if len(buffer) > 0 {
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: collectionName,
Points: buffer,
ShardKeySelector: &qdrant.ShardKeySelector{
ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey(currentDate)},
},
})
}
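Let's break down the code:
- First, retrieve the list of existing shard keys in the collection. Because you just created the collection, the list should be empty, but in production this ensures that you account for any existing data.
- Next, stream and parse the CSV file from the URL using a helper function.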
import csv
import urllib.request

def parse_csv(url):
    with urllib.request.urlopen(url) as response:
        reader = csv.DictReader(line.decode('utf-8') for line in response)
        yield from reader

function parseCsvLine(line: string): string[] {
  const fields: string[] = [];
  let i = 0;
  while (i < line.length) {
    if (line[i] === '"') {
      i++;
      let field = "";
      while (i < line.length) {
        if (line[i] === '"' && line[i + 1] === '"') {
          field += '"';
          i += 2;
        } else if (line[i] === '"') {
          i++;
          break;
        } else {
          field += line[i++];
        }
      }
      fields.push(field);
      if (line[i] === ",") i++;
    } else {
      const start = i;
      while (i < line.length && line[i] !== ",") i++;
      fields.push(line.slice(start, i));
      if (i < line.length) i++;
    }
  }
  return fields;
}

async function* parseCSV(url: string): AsyncGenerator<{ text: string; datetime: string }> {
  const response = await fetch(url);
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let remainder = "";
  let headers: string[] | null = null;
  let textIdx = -1;
  let datetimeIdx = -1;
  while (true) {
    const { done, value } = await reader.read();
    const chunk = done ? "" : decoder.decode(value, { stream: true });
    const lines = (remainder + chunk).split("\n");
    remainder = done ? "" : lines.pop()!;
    for (const line of lines) {
      if (!line.trim()) continue;
      if (headers === null) {
        headers = line.split(",");
        textIdx = headers.indexOf("text");
        datetimeIdx = headers.indexOf("datetime");
        continue;
      }
      const fields = parseCsvLine(line);
      yield { text: fields[textIdx], datetime: fields[datetimeIdx] };
    }
    if (done) break;
  }
}

struct CsvRow {
    text: String,
    datetime: String,
}

fn parse_csv(url: &str) -> anyhow::Result<impl Iterator<Item = anyhow::Result<CsvRow>>> {
    let reader = ureq::get(url).call()?.into_body().into_reader();
    let mut rdr = csv::Reader::from_reader(reader);
    let headers = rdr.headers()?.clone();
    let text_idx = headers.iter().position(|h| h == "text").unwrap();
    let datetime_idx = headers.iter().position(|h| h == "datetime").unwrap();
    let iter = rdr.into_records().map(move |result| {
        let record = result?;
        Ok(CsvRow {
            text: record[text_idx].to_string(),
            datetime: record[datetime_idx].to_string(),
        })
    });
    Ok(iter)
}

static class CsvRow {
    final String text;
    final String datetime;

    CsvRow(String text, String datetime) {
        this.text = text;
        this.datetime = datetime;
    }
}

static Stream<CsvRow> parseCSV(String url) throws Exception {
    Function<String, List<String>> parseCsvLine = line -> {
        List<String> fields = new ArrayList<>();
        boolean inQuotes = false;
        var sb = new StringBuilder();
        for (char c : line.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                fields.add(sb.toString());
                sb.setLength(0);
            } else {
                sb.append(c);
            }
        }
        fields.add(sb.toString());
        return fields;
    };
    var reader = new BufferedReader(new InputStreamReader(new URL(url).openStream()));
    String headerLine = reader.readLine();
    List<String> headers = List.of(headerLine.split(","));
    int textIdx = headers.indexOf("text");
    int datetimeIdx = headers.indexOf("datetime");
    return reader.lines()
        .map(line -> {
            List<String> fields = parseCsvLine.apply(line);
            return new CsvRow(fields.get(textIdx), fields.get(datetimeIdx));
        })
        .onClose(() -> {
            try {
                reader.close();
            } catch (Exception ignored) {}
        });
}

async IAsyncEnumerable<(string text, string datetime)> ParseCsv(string url)
{
    using var httpClient = new HttpClient();
    using var stream = await httpClient.GetStreamAsync(url);
    using var parser = new TextFieldParser(new StreamReader(stream));
    parser.TextFieldType = Microsoft.VisualBasic.FileIO.FieldType.Delimited;
    parser.SetDelimiters(",");
    string[]? headers = parser.ReadFields();
    int textIdx = Array.IndexOf(headers!, "text");
    int datetimeIdx = Array.IndexOf(headers!, "datetime");
    while (!parser.EndOfData)
    {
        var fields = parser.ReadFields()!;
        yield return (fields[textIdx], fields[datetimeIdx]);
    }
}

type CSVRow struct {
    Text     string
    Datetime string
}

func parseCSV(url string, fn func(CSVRow)) error {
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    csvReader := csv.NewReader(resp.Body)
    headers, err := csvReader.Read()
    if err != nil {
        return err
    }
    textIdx, datetimeIdx := -1, -1
    for i, h := range headers {
        switch h {
        case "text":
            textIdx = i
        case "datetime":
            datetimeIdx = i
        }
    }
    for {
        row, err := csvReader.Read()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        fn(CSVRow{Text: row[textIdx], Datetime: row[datetimeIdx]})
    }
    return nil
}

- The CSV file is streamed row by row and buffered in batches of 100 points for efficient uploading. The best batch size depends on your data and cluster, so you may need to experiment with different sizes for the best performance.
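- The date (YYYY-MM-DD) is extracted from the datetime field of each row. This date is used as the shard key to route the data to the correct shard.
- If a new date is encountered and its shard does not exist yet, a new shard is created.
- Whenever the date changes partway through the stream, the buffer is flushed to the previous date's shard, so that no post is written to the wrong shard.
- When the data is written, each point gets:
  - A random UUID as the point ID
  - The post text and datetime as the payload
  - A dense vector embedding generated from the post text using sentence-transformers/all-MiniLM-L6-v2
- Each full batch of 100 points is uploaded to Qdrant, targeting the correct date shard through the shard key selector parameter.
- After the loop ends, any remaining points are uploaded as a final partial batch.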
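The buffering rules described above can be isolated from the Qdrant calls and checked on their own. The following is a minimal sketch of that logic as a generator, assuming rows are dictionaries with a `datetime` field sorted by time as in the tutorial's dataset; the name `batches_by_shard` is hypothetical and not part of any client library.

```python
from typing import Dict, Iterable, Iterator, List, Optional, Tuple

Row = Dict[str, str]


def batches_by_shard(
    rows: Iterable[Row], batch_size: int = 100
) -> Iterator[Tuple[str, List[Row]]]:
    """Yield (shard_key, batch) pairs; a batch never mixes two dates.

    Hypothetical helper that mirrors the flush rules of the ingestion loop.
    """
    current_date: Optional[str] = None
    buffer: List[Row] = []
    for row in rows:
        shard_date = row["datetime"][:10]  # Extract YYYY-MM-DD
        if shard_date != current_date:
            # Date changed: flush what belongs to the previous date
            if buffer:
                yield current_date, buffer
                buffer = []
            current_date = shard_date
        buffer.append(row)
        # Flush once the buffer reaches the batch size
        if len(buffer) >= batch_size:
            yield current_date, buffer
            buffer = []
    # Flush the remaining partial batch
    if buffer:
        yield current_date, buffer
```

Each yielded pair corresponds to one upload call with the shard key as the selector, which makes it easy to unit-test the routing without a running cluster.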
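Query the Data
Query Today's Data
You can now run semantic queries over the posts. Set the shard key selector to 2026-04-07 (assuming today is April 7, 2026) to restrict the query to the shard that holds today's data: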
query_text = "coffee"
resp = client.query_points(
collection_name=collection_name,
query=Document(text=query_text, model=dense_model),
using="dense_vector",
limit=5,
shard_key_selector="2026-04-07"
)
print(resp)
const queryText = "coffee";
const singleShardResult = await client.query(collectionName, {
query: { text: queryText, model: denseModel },
using: "dense_vector",
limit: 5,
shard_key: "2026-04-07",
});
for (const hit of singleShardResult.points) {
console.log(hit);
}
let query_text = "coffee";
let result = client
.query(
QueryPointsBuilder::new(collection_name)
.query(Query::new_nearest(Document::new(query_text, dense_model)))
.using("dense_vector")
.limit(5)
.shard_key_selector("2026-04-07".to_string()),
)
.await?;
for hit in result.result {
println!("{:?}", hit);
}
String queryText = "coffee";
var result = client.queryAsync(
QueryPoints.newBuilder()
.setCollectionName(collectionName)
.setQuery(nearest(Document.newBuilder().setText(queryText).setModel(denseModel).build()))
.setUsing("dense_vector")
.setLimit(5)
.setShardKeySelector(shardKeySelector("2026-04-07"))
.build()
).get();
for (var hit : result) {
System.out.println(hit);
}
string queryText = "coffee";
var result = await client.QueryAsync(
collectionName: collectionName,
query: new Document { Text = queryText, Model = denseModel },
usingVector: "dense_vector",
limit: 5,
shardKeySelector: new ShardKeySelector
{
ShardKeys = { new List<ShardKey> { "2026-04-07" } }
}
);
foreach (var hit in result)
Console.WriteLine(hit);
queryText := "coffee"
result, err := client.Query(context.Background(), &qdrant.QueryPoints{
CollectionName: collectionName,
Query: qdrant.NewQueryDocument(&qdrant.Document{Text: queryText, Model: denseModel}),
Using: qdrant.PtrOf("dense_vector"),
Limit: qdrant.PtrOf(uint64(5)),
ShardKeySelector: &qdrant.ShardKeySelector{
ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey("2026-04-07")},
},
})
for _, hit := range result {
fmt.Println(hit)
}
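Query Multiple Days of Data
To query multiple shards, set the shard key selector to a list of shard keys. For example, to query the last 2 days of data (April 6 to 7):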
resp = client.query_points(
collection_name=collection_name,
query=Document(text=query_text, model=dense_model),
using="dense_vector",
limit=5,
shard_key_selector=["2026-04-06","2026-04-07"]
)
print(resp)
const multiShardResult = await client.query(collectionName, {
query: { text: queryText, model: denseModel },
using: "dense_vector",
limit: 5,
shard_key: ["2026-04-06", "2026-04-07"],
});
for (const hit of multiShardResult.points) {
console.log(hit);
}
let result = client
.query(
QueryPointsBuilder::new(collection_name)
.query(Query::new_nearest(Document::new(query_text, dense_model)))
.using("dense_vector")
.limit(5)
.shard_key_selector(ShardKeySelector {
shard_keys: vec![
"2026-04-06".to_string().into(),
"2026-04-07".to_string().into(),
],
fallback: None,
}),
)
.await?;
for hit in result.result {
println!("{:?}", hit);
}
result = client.queryAsync(
QueryPoints.newBuilder()
.setCollectionName(collectionName)
.setQuery(nearest(Document.newBuilder().setText(queryText).setModel(denseModel).build()))
.setUsing("dense_vector")
.setLimit(5)
.setShardKeySelector(ShardKeySelector.newBuilder()
.addShardKeys(shardKey("2026-04-06"))
.addShardKeys(shardKey("2026-04-07"))
.build())
.build()
).get();
for (var hit : result) {
System.out.println(hit);
}
result = await client.QueryAsync(
collectionName: collectionName,
query: new Document { Text = queryText, Model = denseModel },
usingVector: "dense_vector",
limit: 5,
shardKeySelector: new ShardKeySelector
{
ShardKeys = { new List<ShardKey> { "2026-04-06", "2026-04-07" } }
}
);
foreach (var hit in result)
Console.WriteLine(hit);
result, err = client.Query(context.Background(), &qdrant.QueryPoints{
CollectionName: collectionName,
Query: qdrant.NewQueryDocument(&qdrant.Document{Text: queryText, Model: denseModel}),
Using: qdrant.PtrOf("dense_vector"),
Limit: qdrant.PtrOf(uint64(5)),
ShardKeySelector: &qdrant.ShardKeySelector{
ShardKeys: []*qdrant.ShardKey{
qdrant.NewShardKey("2026-04-06"),
qdrant.NewShardKey("2026-04-07"),
},
},
})
for _, hit := range result {
fmt.Println(hit)
}
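Rather than hard-coding the list of shard keys, you could derive it from the current date and the size of the window. The sketch below is one way to do that; the helper name `shard_keys_for_last_days` and the optional filtering against known keys are assumptions of this example, added in case a requested day has no shard.

```python
from datetime import date, timedelta
from typing import Iterable, List, Optional


def shard_keys_for_last_days(
    today: str, days: int, existing_keys: Optional[Iterable[str]] = None
) -> List[str]:
    """Return YYYY-MM-DD shard keys for the last `days` days, oldest first.

    Hypothetical helper. If `existing_keys` is given, keys that are not
    in it are dropped from the result.
    """
    if days < 1:
        raise ValueError("days must be at least 1")
    end = date.fromisoformat(today)
    keys = [
        (end - timedelta(days=offset)).isoformat()
        for offset in range(days - 1, -1, -1)
    ]
    if existing_keys is not None:
        known = set(existing_keys)
        keys = [key for key in keys if key in known]
    return keys
```

The returned list can be passed as the shard key selector in the same way as the two-element list in the multi-day query.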
Query the Full Dataset
To query the entire dataset (all shards), omit the shard key selector parameter:
resp = client.query_points(
collection_name=collection_name,
query=Document(text=query_text, model=dense_model),
using="dense_vector",
limit=5,
)
print(resp)
const allShardsResult = await client.query(collectionName, {
query: { text: queryText, model: denseModel },
using: "dense_vector",
limit: 5,
});
for (const hit of allShardsResult.points) {
console.log(hit);
}
let result = client
.query(
QueryPointsBuilder::new(collection_name)
.query(Query::new_nearest(Document::new(query_text, dense_model)))
.using("dense_vector")
.limit(5),
)
.await?;
for hit in result.result {
println!("{:?}", hit);
}
result = client.queryAsync(
QueryPoints.newBuilder()
.setCollectionName(collectionName)
.setQuery(nearest(Document.newBuilder().setText(queryText).setModel(denseModel).build()))
.setUsing("dense_vector")
.setLimit(5)
.build()
).get();
for (var hit : result) {
System.out.println(hit);
}
result = await client.QueryAsync(
collectionName: collectionName,
query: new Document { Text = queryText, Model = denseModel },
usingVector: "dense_vector",
limit: 5
);
foreach (var hit in result)
Console.WriteLine(hit);
result, err = client.Query(context.Background(), &qdrant.QueryPoints{
CollectionName: collectionName,
Query: qdrant.NewQueryDocument(&qdrant.Document{Text: queryText, Model: denseModel}),
Using: qdrant.PtrOf("dense_vector"),
Limit: qdrant.PtrOf(uint64(5)),
})
for _, hit := range result {
fmt.Println(hit)
}
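Prune Shards
Every day at midnight, create a new shard for the data to be ingested that day. If you only query the last 7 days of data, you can also delete the oldest shard. You can automate this process with a cron job.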
from datetime import date, timedelta
today = "2026-04-08"
oldest_shard_key = (date.fromisoformat(today) - timedelta(days=7)).isoformat()
client.create_shard_key(collection_name, today)
client.delete_shard_key(collection_name, oldest_shard_key)
const today = "2026-04-08";
const oldestDate = new Date(today);
// Use UTC accessors: a date-only string is parsed as UTC midnight
oldestDate.setUTCDate(oldestDate.getUTCDate() - 7);
const oldestShardKey = oldestDate.toISOString().slice(0, 10);
await client.createShardKey(collectionName, { shard_key: today });
await client.deleteShardKey(collectionName, { shard_key: oldestShardKey });
let today = "2026-04-08";
let oldest_shard_key = (NaiveDate::parse_from_str(today, "%Y-%m-%d")?
- chrono::Duration::days(7))
.to_string();
client
.create_shard_key(
CreateShardKeyRequestBuilder::new(collection_name)
.request(CreateShardKeyBuilder::default().shard_key(today.to_string())),
)
.await?;
client
.delete_shard_key(
DeleteShardKeyRequestBuilder::new(collection_name)
.key(shard_key::Key::Keyword(oldest_shard_key)),
)
.await?;
String today = "2026-04-08";
String oldestShardKey = LocalDate.parse(today).minusDays(7).toString();
client.createShardKeyAsync(
CreateShardKeyRequest.newBuilder()
.setCollectionName(collectionName)
.setRequest(CreateShardKey.newBuilder()
.setShardKey(shardKey(today))
.build())
.build()
).get();
client.deleteShardKeyAsync(
DeleteShardKeyRequest.newBuilder()
.setCollectionName(collectionName)
.setRequest(DeleteShardKey.newBuilder()
.setShardKey(shardKey(oldestShardKey))
.build())
.build()
).get();
string today = "2026-04-08";
string oldestShardKey = DateOnly.ParseExact(today, "yyyy-MM-dd")
.AddDays(-7)
.ToString("yyyy-MM-dd");
await client.CreateShardKeyAsync(
collectionName,
new CreateShardKey { ShardKey = new ShardKey { Keyword = today } }
);
await client.DeleteShardKeyAsync(
collectionName,
new DeleteShardKey { ShardKey = new ShardKey { Keyword = oldestShardKey } }
);
today := "2026-04-08"
t, _ := time.Parse("2006-01-02", today)
oldestShardKey := t.AddDate(0, 0, -7).Format("2006-01-02")
client.CreateShardKey(context.Background(), collectionName, &qdrant.CreateShardKey{
ShardKey: qdrant.NewShardKey(today),
})
client.DeleteShardKey(context.Background(), collectionName, &qdrant.DeleteShardKey{
ShardKey: qdrant.NewShardKey(oldestShardKey),
})
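A scheduled job may occasionally miss a run, in which case deleting only the single oldest shard would leave stale shards behind. The sketch below plans the rotation from the full list of existing keys instead. It is an illustration under stated assumptions: the helper name `plan_rotation` is hypothetical, and every shard key is assumed to be a YYYY-MM-DD date, so that string comparison matches date order.

```python
from datetime import date, timedelta
from typing import Iterable, List, Tuple


def plan_rotation(
    today: str, existing_keys: Iterable[str], retention_days: int = 7
) -> Tuple[List[str], List[str]]:
    """Return (keys_to_create, keys_to_delete) for a daily rotation.

    Hypothetical helper. Keeps `retention_days` days including today and
    marks every older key for deletion, even if earlier runs were missed.
    """
    if retention_days < 1:
        raise ValueError("retention_days must be at least 1")
    existing = set(existing_keys)
    end = date.fromisoformat(today)
    # Oldest date that stays inside the retention window
    cutoff = (end - timedelta(days=retention_days - 1)).isoformat()
    to_create = [] if today in existing else [today]
    # ISO dates sort lexicographically, so plain string comparison works
    to_delete = sorted(key for key in existing if key < cutoff)
    return to_create, to_delete
```

Each key in the first list would then go to the create-shard-key call and each key in the second list to the delete-shard-key call shown in the pruning code.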
Ingest New Data
When ingesting new data, set shard_key_selector to the current date so that the data lands in the correct shard:
client.upsert(
collection_name=collection_name,
points=[PointStruct(
id=uuid.uuid4().hex,
payload={"text": "The best way to start a Wednesday is with a cup of coffee", "datetime": "2026-04-08T07:57:47"},
vector={
"dense_vector": Document(text="The best way to start a Wednesday is with a cup of coffee", model=dense_model)
})],
shard_key_selector=today
)
await client.upsert(collectionName, {
points: [
{
id: crypto.randomUUID(),
vector: {
dense_vector: {
text: "The best way to start a Wednesday is with a cup of coffee",
model: denseModel,
},
},
payload: {
text: "The best way to start a Wednesday is with a cup of coffee",
datetime: "2026-04-08T07:57:47",
},
},
],
shard_key: today,
});
client
.upsert_points(
UpsertPointsBuilder::new(
collection_name,
vec![PointStruct::new(
uuid::Uuid::new_v4().to_string(),
HashMap::from([(
"dense_vector".to_string(),
DocumentBuilder::new(
"The best way to start a Wednesday is with a cup of coffee",
dense_model,
)
.build(),
)]),
[
("text", "The best way to start a Wednesday is with a cup of coffee".into()),
("datetime", "2026-04-08T07:57:47".into()),
],
)],
)
.shard_key_selector(today.to_string()),
)
.await?;
client.upsertAsync(
UpsertPoints.newBuilder()
.setCollectionName(collectionName)
.addAllPoints(List.of(
PointStruct.newBuilder()
.setId(id(UUID.randomUUID()))
.setVectors(namedVectors(Map.of(
"dense_vector",
vector(Document.newBuilder()
.setText("The best way to start a Wednesday is with a cup of coffee")
.setModel(denseModel)
.build()))))
.putAllPayload(Map.of(
"text", value("The best way to start a Wednesday is with a cup of coffee"),
"datetime", value("2026-04-08T07:57:47")))
.build()))
.setShardKeySelector(shardKeySelector(today))
.build()
).get();
await client.UpsertAsync(
collectionName: collectionName,
points: new List<PointStruct>
{
new()
{
Id = Guid.NewGuid(),
Vectors = new Dictionary<string, Vector>
{
["dense_vector"] = new Document
{
Text = "The best way to start a Wednesday is with a cup of coffee",
Model = denseModel
}
},
Payload =
{
["text"] = "The best way to start a Wednesday is with a cup of coffee",
["datetime"] = "2026-04-08T07:57:47"
}
}
},
shardKeySelector: new ShardKeySelector
{
ShardKeys = { new List<ShardKey> { today } }
}
);
client.Upsert(context.Background(), &qdrant.UpsertPoints{
CollectionName: collectionName,
Points: []*qdrant.PointStruct{
{
Id: qdrant.NewID(uuid.New().String()),
Vectors: qdrant.NewVectorsMap(map[string]*qdrant.Vector{
"dense_vector": qdrant.NewVectorDocument(&qdrant.Document{
Text: "The best way to start a Wednesday is with a cup of coffee",
Model: denseModel,
}),
}),
Payload: qdrant.NewValueMap(map[string]any{
"text": "The best way to start a Wednesday is with a cup of coffee",
"datetime": "2026-04-08T07:57:47",
}),
},
},
ShardKeySelector: &qdrant.ShardKeySelector{
ShardKeys: []*qdrant.ShardKey{qdrant.NewShardKey(today)},
},
})
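Conclusion
Time-based sharding is a powerful technique for managing large time-series datasets in Qdrant. By routing data to different shards based on timestamp, you can store and query recent data efficiently and prune old data easily without affecting performance. This approach is well suited to use cases where data relevance decays over time, such as social media analytics.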