优化 FastEmbed 吞吐量
默认情况下,FastEmbed 在主处理线程中按顺序处理文档。为了优化吞吐量,FastEmbed 支持并行处理文档。
当启用并行处理时,FastEmbed 会将数据集拆分给多个工作进程(worker),每个工作进程运行嵌入模型的独立副本。在内部,文档被拆分成批次并放入共享输入队列中。每个批次由其中一个工作进程处理,放入共享输出队列,然后进行收集并重新排序,以匹配原始输入的顺序。
要配置 FastEmbed 进行并行处理,请使用以下参数:
parallel:工作进程的数量。- 当设置为
None(默认值)时,嵌入模型在主进程中运行。 - 当设置为
0时,FastEmbed 会检测可用的 CPU 核心数,并使用相应数量的工作进程进行并行化。 - 当设置为
1或更大值时,FastEmbed 将使用指定数量的工作进程。
- 当设置为
批次大小(Batch size):每个工作进程在每个批次中处理的文档数量。调整此参数以平衡内存使用和处理速度。如果您在本地推理时内存不足,请调低该值。如果您有充足的内存且正在处理大型文档集,请调高该值以提高吞吐量。
要配置批次大小,请使用:
batch_size:用于批处理的独立 FastEmbed 参数。文本默认为256,图像默认为16。local_inference_batch_size:用于批处理的 Qdrant 客户端参数。默认为8。
lazy_load:设置为True可避免在需要推理之前加载嵌入模型。在使用多个工作进程时,启用懒加载(lazy loading)可以防止在主进程中加载模型,从而节省内存并缩短启动时间。
使用 Qdrant 客户端并行化 FastEmbed
在将 FastEmbed 与 Qdrant 客户端结合使用时,请在初始化客户端时指定 local_inference_batch_size 参数来配置批次大小。例如:
client = QdrantClient(
url=QDRANT_URL,
api_key=QDRANT_API_KEY,
local_inference_batch_size=256, # FastEmbed batch size
)
接下来,在创建点(points)时,在推理对象中将 lazy_load 设置为 True,以避免在主进程中加载嵌入模型:
point = models.PointStruct(
id=1,
vector=models.Document(
text="The text to embed",
model="BAAI/bge-small-en-v1.5",
options={
"lazy_load": True,
},
)
)
在使用 fastembed-gpu 时,还需要将 cuda 设置为 True 以启用 GPU 加速:
point = models.PointStruct(
id=1,
vector=models.Document(
text="The text to embed",
model="BAAI/bge-small-en-v1.5",
options={
"lazy_load": True,
"cuda": True,
},
)
)
最后,在上传点时,将 parallel 参数设置为所需的工作进程数量:
client.upload_points(
collection_name=COLLECTION_NAME,
points=points,
parallel=4 # use 4 workers to process documents in parallel
)
并行化独立版 FastEmbed
当将 FastEmbed 作为独立库使用时,首先启用嵌入模型的懒加载:
model = TextEmbedding(
model_name="BAAI/bge-small-en-v1.5",
lazy_load=True, # don't load the model until first embed call
)
FastEmbed 支持将工作负载分配到多个 GPU 设备上。要启用此功能:
- 安装
fastembed-gpu。 - 将
cuda设置为True以启用 GPU 加速。 - 使用 GPU 设备 ID 列表配置
device_ids,以便为工作进程指定设备。例如,device_ids=[0, 1]会将工作进程分配给 GPU 0 和 1。如果未指定,FastEmbed 会将所有工作进程分配给默认的 GPU 设备。
model = TextEmbedding(
model_name="BAAI/bge-small-en-v1.5",
lazy_load=True, # don't load the model until first embed call
cuda=True, # enable GPU acceleration
device_ids=[0, 1], # spread workers across GPUs 0 and 1
)
生成嵌入时,使用 batch_size 设置批次大小,并使用 parallel 设置工作进程数量:
embeddings = list(model.embed(docs, batch_size=256, parallel=4))