跳至主要內容

一套用于处理有序数据集合的 Python multiprocessing 多进程数据分割使用范例

CK...大约 3 分钟技巧总结技巧Python多进程分布式开发

一套用于处理有序数据集合的 Python multiprocessing 多进程数据分割使用范例

前言

之前很多博文是写有关分布式训练过程,即将模型放到多个 GPU 上进行训练,而对于一些跑在 CPU 上的密集型任务处理时,应该使用 multiprocessing 模块进行处理。Python 的 multiprocessing 模块提供了一个与线程类似的 API,用于创建和管理进程。多进程可以利用多核 CPU 的优势,执行 CPU 密集型任务时可以显著提高性能,因为每个进程可以在不同的 CPU 核心上运行。

在一种使用场景中,假设我们需要对一套数据集进行处理,然而这一套数据集是有序的,也就是说,对于有序数据集,我们该如何并行的执行处理呢?本文聚焦这个使用场景,提出一套使用范例,希望可以以简单、高效、模块化的方式进行处理。

数据集分割思想

对于待处理的数据集,假设数据长度是 9 ,单线程串行处理如下所示:

image-20240407140337930
image-20240407140337930

如果采用多进程进行处理,那么首先需要使用将数据集分割给各个进程,一种简单有效的方式是,通过 取余 的方式将数据集分割到每个进程:

下面的图表示采用 取余 这种方式分割数据时,各个进程获得的数据表示,每个颜色表示一个进程,这里假设有 四个进程,数据集的长度是 9:

image-20240407140417587
image-20240407140417587
if __name__ == "__main__":
    # 设置进程数量
    num_processes = 4
    dataset_length = len(dataset['train'])

    # 计算每个进程需要处理的数据量
    chunk_size = dataset_length // num_processes

    # 创建进程
    processes = []
    for rank in range(num_processes):
        # 通过取余的方式来分割数据
        data_chunk = [data for i, data in enumerate(dataset['train']) 
                      if i % num_processes == rank ]
        
        process = Process(target=process_data_chunk, args=(rank, data_chunk))
        print(f"Process {rank} created")
        processes.append(process)

这个处理方式是简单直白的,但问题在于,如果整个数据集是有序的,我们通过这种方式处理的数据集在处理后进行整合时,应该按照原顺序进行整合,这个过程并没有那么方便,所以我们采取下面这种方式分割数据集会更加方便:

image-20240407140442430
image-20240407140442430

这样分割数据集后,只需要进程内按照数据集顺序排序,整体根据进程序号排序即可还原数据集的顺序了。

使用范例代码

下面阐述基于上述介绍的数据集分割代码,需要注意的是,如果整体数据集的长度不是 num_processes 的倍数,我们会将剩余部分的数据留给最后一个进程,这个操作过程是直白的:

import datasets
from multiprocessing import Process


# 加载数据集和分词器
dataset = datasets.load_dataset("")

# 定义多进程任务函数
def process_data_chunk(process_num, data_chunk):
    pass


# 运行多进程主函数
if __name__ == "__main__":
    # 设置进程数量
    num_processes = 4
    dataset_length = len(dataset['train'])

    # 计算每个进程需要处理的数据量
    chunk_size = dataset_length // num_processes

    # 创建进程
    processes = []
    for i in range(num_processes):
        # 做数据分割,这里将最后没有分尽的数据给了最后一个进程
        start = i * chunk_size
        end = (i+1) * chunk_size if i != num_processes - 1 else dataset_length

        data_chunk = dataset['train'][start:end]

        process = Process(target=process_data_chunk, args=(i, data_chunk))
        print(f"Process {i} created")
        processes.append(process)

    # 启动进程
    for process in processes:
        process.start()

    # 等待进程结束
    for process in processes:
        process.join()

注意:这里提供的代码只是一个切割数据集的框架和范例,其中的数据集可以只是一个数组或者文件列表或者其他表现形式,不一定是 datasets 同样,通过在 process_data_chunk 中实现相应的数据处理过程,可以实现多进程的数据处理。