一套用于处理有序数据集合的 Python multiprocessing 多进程数据分割使用范例
一套用于处理有序数据集合的 Python multiprocessing 多进程数据分割使用范例
前言
之前很多博文是写有关分布式训练过程,即将模型放到多个 GPU 上进行训练,而对于一些跑在 CPU 上的密集型任务处理时,应该使用 multiprocessing
模块进行处理。Python 的 multiprocessing
模块提供了一个与线程类似的 API,用于创建和管理进程。多进程可以利用多核 CPU 的优势,执行 CPU 密集型任务时可以显著提高性能,因为每个进程可以在不同的 CPU 核心上运行。
在一种使用场景中,假设我们需要对一套数据集进行处理,然而这一套数据集是有序的,也就是说,对于有序数据集,我们该如何并行的执行处理呢?本文聚焦这个使用场景,提出一套使用范例,希望可以以简单、高效、模块化的方式进行处理。
数据集分割思想
对于待处理的数据集,假设数据长度是 9 ,单线程串行处理如下所示:
如果采用多进程进行处理,那么首先需要使用将数据集分割给各个进程,一种简单有效的方式是,通过 取余 的方式将数据集分割到每个进程:
下面的图表示采用 取余 这种方式分割数据时,各个进程获得的数据表示,每个颜色表示一个进程,这里假设有 四个进程,数据集的长度是 9:
if __name__ == "__main__":
# 设置进程数量
num_processes = 4
dataset_length = len(dataset['train'])
# 计算每个进程需要处理的数据量
chunk_size = dataset_length // num_processes
# 创建进程
processes = []
for rank in range(num_processes):
# 通过取余的方式来分割数据
data_chunk = [data for i, data in enumerate(dataset['train'])
if i % num_processes == rank ]
process = Process(target=process_data_chunk, args=(rank, data_chunk))
print(f"Process {rank} created")
processes.append(process)
这个处理方式是简单直白的,但问题在于,如果整个数据集是有序的,我们通过这种方式处理的数据集在处理后进行整合时,应该按照原顺序进行整合,这个过程并没有那么方便,所以我们采取下面这种方式分割数据集会更加方便:
这样分割数据集后,只需要进程内按照数据集顺序排序,整体根据进程序号排序即可还原数据集的顺序了。
使用范例代码
下面阐述基于上述介绍的数据集分割代码,需要注意的是,如果整体数据集的长度不是 num_processes 的倍数,我们会将剩余部分的数据留给最后一个进程,这个操作过程是直白的:
import datasets
from multiprocessing import Process
# 加载数据集和分词器
dataset = datasets.load_dataset("")
# 定义多进程任务函数
def process_data_chunk(process_num, data_chunk):
pass
# 运行多进程主函数
if __name__ == "__main__":
# 设置进程数量
num_processes = 4
dataset_length = len(dataset['train'])
# 计算每个进程需要处理的数据量
chunk_size = dataset_length // num_processes
# 创建进程
processes = []
for i in range(num_processes):
# 做数据分割,这里将最后没有分尽的数据给了最后一个进程
start = i * chunk_size
end = (i+1) * chunk_size if i != num_processes - 1 else dataset_length
data_chunk = dataset['train'][start:end]
process = Process(target=process_data_chunk, args=(i, data_chunk))
print(f"Process {i} created")
processes.append(process)
# 启动进程
for process in processes:
process.start()
# 等待进程结束
for process in processes:
process.join()
注意:这里提供的代码只是一个切割数据集的框架和范例,其中的数据集可以只是一个数组或者文件列表或者其他表现形式,不一定是 datasets
同样,通过在 process_data_chunk
中实现相应的数据处理过程,可以实现多进程的数据处理。