前言

常见的多GPU并行的方法有model parallel 和 data parallel两种,这里主要讲的是data parallel:并行训练数据,相当于增大了batch_size.

主要需要注意的几点:

  1. 数据集如何在不同的设备间分配
  2. 误差梯度如何在不同的设备间通信
  3. BatchNormalization如何在不同的设备间同步(使用会提升一点,但是会降低一点并行速度)

一、DataParalled和DistributeDataParallel

  1. DataParalled只能用于单机多卡,而DistributeDataParallel可以用于单机多卡,多机多卡
  2. 单机情况下通常DataParalled要慢于DistributeDataParallel

二、多GPU训练常见启动方式

  1. torch.distributed.launch: 代码少,启动速度快(用的多
    注意: 如果开始训练后,手动强制终止程序,有小概率会出现进程没有杀掉的情况,最好看下GPU占用情况。
# 其中nproc_per_node为并行GPU的数量
python -m torch.distributed.launch --nproc_per_node=2 --use_env train_multi_gpu_using_launch.py
  1. torch.multiprocessing: 代码多点,速度慢点,但是拥有更好的控制和灵活性(这里不讲)

三、torch.distributed.launch代码讲解

3.1、main中添加了几个新的变量

注意: 这里一般只需要手动选择syncBN即可,后面三个参数不要动,系统会自动选择

    # 是否启用SyncBatchNorm BN在多个GPU上同步
    parser.add_argument('--syncBN', type=bool, default=True, help="同步BatchNormalization")

    # 不要改该参数,系统会自动分配
    parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
    # 开启的进程数(注意不是线程),不用设置该参数,会根据nproc_per_node自动设置
    parser.add_argument('--world-size', default=4, type=int,
                        help='number of distributed processes')
    parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')

3.2、初始化各进程环境

# 初始化各进程环境
init_distributed_mode(args=args)
def init_distributed_mode(args):
    # 使用python -m torch.distributed.launch --nproc_per_node=2 --use_env train_multi_gpu_using_launch.py指令
    # --use_env这个参数,他就会在我们os环境中(os.environ)存入RANK、WORLD_SIZE、LOCAL_RANK
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        # 多机多卡:WORLD_SIZE代表使用几台机器,RANK代表第几台机器,LOCAL_RANK代表某台机器上第几块GPU设备
        # 单机多卡:WORLD_SIZE代表有几块GPU,RANK=LOCAL_RANK代表哪块GPU
        args.rank = int(os.environ["RANK"])
        args.world_size = int(os.environ['WORLD_SIZE'])
        args.gpu = int(os.environ['LOCAL_RANK'])
    elif 'SLURM_PROCID' in os.environ:  # 一般不执行
        args.rank = int(os.environ['SLURM_PROCID'])
        args.gpu = args.rank % torch.cuda.device_count()
    else:  # 一般不执行
        print('Not using distributed mode')
        args.distributed = False
        return

    args.distributed = True

    torch.cuda.set_device(args.gpu)
    args.dist_backend = 'nccl'  # 通信后端,nvidia GPU推荐使用NCCL
    print('| distributed init (rank {}): {}'.format(
        args.rank, args.dist_url), flush=True)

    # 创建进程组(重要)
    # backend:通信后端  init_method:使用默认(env://) world_size:几块GPU  rank:当前进程处于哪块GPU
    # 注意:对于不同的进程而言,它的world_size是相同的,但是rank是不相同的
    dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)
    dist.barrier()  # 等待每一块GPU都运行到这个地方之后再接着往下走

3.3、调整学习率

学习率要根据并行GPU的数量进行倍增,这里方法不一定,有很多种
这里暴力增加,直接乘以GPU个数

# 学习率要根据并行GPU的数量进行倍增  world_size = GPU数量
args.lr *= args.world_size 

# 也可以写成这样
args.lr *= max(1., args.world_size * args.batch_size / 64)

3.4、在第一个进程中进行打印和保存等操作

 if rank == 0:  # 在第一个进程中打印信息,并实例化tensorboard
        print(args)
        print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
        tb_writer = SummaryWriter()
        if os.path.exists("./weights") is False:
            os.makedirs("./weights")

3.5、DistributedSampler

DistributedSampler: 给每个rank(gpu)对应的进程分配训练的样本索引

 train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
 val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)

这里简单讲下DistributedSampler到底要做什么事情:
在这里插入图片描述
如上图: 假设当前数据集总共有11个样本(0~10)DistributedSampler执行步骤:

  1. Shuffle处理:将数据集的样本随机打乱;
  2. 数据补充:假设现在有两块GPU,它会先用11/2向上取整=6,再乘以GPU的个数=12,少了(12-11)=1个数据,所以需要对数据进行补充。从头开始补,差一个就把第一个数据(6)补上,差几个补上前几个数据。这样就可以保证数据均衡的分配到每一个GPU当中;
  3. 分配数据:间隔的将数据分配到对应的GPU当中。

感兴趣的也可以读下它的源码(可读可不读):

class DistributedSampler(Sampler):
    def __init__(self, dataset, num_replicas=None, rank=None, shuffle=True, seed=0):
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        self.dataset = dataset  # 数据集
        self.num_replicas = num_replicas  # 进程个数 默认等于world_size(GPU个数)
        self.rank = rank   # 当前属于哪个进程/哪块GPU
        self.epoch = 0
        self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))  # 每个进程的样本个数
        self.total_size = self.num_samples * self.num_replicas  # 数据集总样本的个数
        self.shuffle = shuffle  # 是否要打乱数据集
        self.seed = seed

    def __iter__(self):
        # 1、 Shuffle处理:打乱数据集顺序
        if self.shuffle:
            # deterministically shuffle based on epoch and seed
            g = torch.Generator()
            # 这里self.seed是一个定值,通过set_epoch改变self.epoch可以改变我们的初始化种子
            # 这就可以让我们在每一个epoch中数据集的打乱顺序不同,使我们每一个epoch中每一块GPU拿到的数据都不一样,这样可以有利于更好的训练
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        # 数据补充
        indices += indices[:(self.total_size - len(indices))]
        assert len(indices) == self.total_size

        # 分配数据
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        return iter(indices)

    def __len__(self):
        return self.num_samples

    def set_epoch(self, epoch):
        r"""
        Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.
        Arguments:
            epoch (int): Epoch number.
        """
        self.epoch = epoch

3.6、BatchSampler

BatchSampler: 将训练集打包成Batch的形式(将样本索引每batch_size个元素组成一个list)

train_batch_sampler = torch.utils.data.BatchSampler(
        train_sampler, batch_size, drop_last=True)

简单讲下BatchSampler到底要做什么事情:
在这里插入图片描述
分组:通过DistributedSampler将训练集成功的分配到了两个GPU当中了,我们以第一块GPU为例,它分配到如上图的数据样本,假设传入的barch_size=2,它就会按顺序将当前数据组合成一组一组的形式(每batch_size个数据为一组)。
注:

  1. drop_last参数作用:假设batch_size=4, 那么我们这里只有前四个数据可以打包成一组,少了两个,如果drop_last为Ture,就直接将多余的数据直接丢掉,为False,则会将最后两个数据打包成一组(少了也没关系)。
  2. 为什么验证集不需要分组???不是很理解

3.7、DataLoader

 nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
 if rank == 0:
     print('Using {} dataloader workers every process'.format(nw))
 train_loader = torch.utils.data.DataLoader(train_data_set,
                                            batch_sampler=train_batch_sampler,
                                            pin_memory=True, # 将数据加载到GPU当中
                                            num_workers=nw,
                                            collate_fn=train_data_set.collate_fn)

 val_loader = torch.utils.data.DataLoader(val_data_set,
                                          batch_size=batch_size,
                                          sampler=val_sampler,
                                          pin_memory=True,
                                          num_workers=nw,
                                          collate_fn=val_data_set.collate_fn)

3.8、保证初始化权重一致

原因:如果不使用预训练权重的话,那么就必须保证每个设备(GPU)上初始的权重是一模一样的。如果不能保证一样的话,那我们训练的梯度就不是针对同一组参数而言了,那么不同设备之间的梯度通信也就没有了任何意义。

 if os.path.exists(weights_path):
       # 如果使用预训练权重则载入  这时候初始化权重肯定一致
       weights_dict = torch.load(weights_path, map_location=device)
       load_weights_dict = {k: v for k, v in weights_dict.items()
                            if model.state_dict()[k].numel() == v.numel()}
       model.load_state_dict(load_weights_dict, strict=False)
 else:
      checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
      # 如果不存在预训练权重,需要将第一个进程中的权重保存,然后其他进程载入,保持初始化权重一致
      if rank == 0:
          torch.save(model.state_dict(), checkpoint_path)

      dist.barrier()  # 等待每一块GPU都运行到这个地方之后再接着往下走
      # 这里注意,一定要指定map_location参数,否则会导致第一块GPU占用更多资源
      model.load_state_dict(torch.load(checkpoint_path, map_location=device))

3.9、SyncBatchNorm

 # 是否冻结权重 如果冻结权重 那么没有BN层 
 if args.freeze_layers:
     for name, para in model.named_parameters():
         # 除最后的全连接层外,其他权重全部冻结
         if "fc" not in name:
             para.requires_grad_(False)
 else:
     # 只有训练带有BN结构的网络时使用SyncBatchNorm采用意义
     if args.syncBN:
         # 使用SyncBatchNorm后训练会更耗时
         model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)

3.10、转为DDP模型

通过这个方法包装模型,再指认device_id后,就可以在各个设备间进行通信了。

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

3.11、DistributedSampler.set_epoch

for epoch in range(args.epochs):
    train_sampler.set_epoch(epoch)

set_epoch这个方法是再我们的DistributedSampler当中的,主要是通过set_epoch这个方法来改变epoch,再间接的改变我们的初始化随机种子,让我们每一轮每块GPU上的数据都不一样,有利于我们的训练。具体的代码可以看第3.5节。

3.12、train_one_epoch

这个部分的代码和单GPU的代码基本相同,这里只是简单的讲下他们不同的部分:

1、打印信息

针对多GPU的情况,我们没必要打印每一个进程的训练信息,打印一下主进程的训练信息就够了。

 # 在进程0中打印训练进度  只显示主进程的训练情况
 if is_main_process():
     data_loader = tqdm(data_loader)
# 在进程0中打印平均loss
 if is_main_process():
     data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3))

2、loss计算

针对多GPU的情况,我们一般会求所有GPU上loss的均值

loss = reduce_value(loss, average=True)  # 多GPU使用 loss=多GPU的loss的均值
mean_loss = (mean_loss * step + loss.detach()) / (step + 1)  # update mean losses
def reduce_value(value, average=True):
    world_size = get_world_size()
    if world_size < 2:  # 单GPU的情况
        return value

    with torch.no_grad():
        dist.all_reduce(value) # 求和
        if average:  # 取平均
            value /= world_size

        return value

3、多GPU 需要同步一下进度

# 等待所有进程计算完毕  多GPU 需要同步一下进度
if device != torch.device("cpu"):
     torch.cuda.synchronize(device)

3.13、evaluate

这个部分的代码和单GPU的代码基本相同,这里只是简单的讲下他们不同的部分:

1、打印信息

针对多GPU的情况,我们没必要打印每一个进程的训练信息,打印一下主进程的训练信息就够了。

# 在进程0中打印验证进度  在主进程中包装下data_loader
if is_main_process():
     data_loader = tqdm(data_loader)

2、sum_num(预测对的个数)计算

针对多GPU的情况,我们一般会求所有GPU上sum_num的和

sum_num = reduce_value(sum_num, average=False)  # 多GPU使用 返回多个设备的sum_num之和
# val_dataset经过DistributedSampler之后可能样本个数会增加,但是如果数据量很大的话影响也不是很大
acc = sum_num / val_sampler.total_size  
def reduce_value(value, average=True):
    world_size = get_world_size()
    if world_size < 2:  # 单GPU的情况
        return value

    with torch.no_grad():
        dist.all_reduce(value) # 求和
        if average:  # 取平均
            value /= world_size
        return value

3、多GPU 需要同步一下进度

# 等待所有进程计算完毕
if device != torch.device("cpu"):
    torch.cuda.synchronize(device)

3.14、保存到tensorboard、保存模型权重

只要在主进程中保存就可以了

if rank == 0:   # 只在主进程
    print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))
    tags = ["loss", "accuracy", "learning_rate"]
    tb_writer.add_scalar(tags[0], mean_loss, epoch)
    tb_writer.add_scalar(tags[1], acc, epoch)
    tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)

    torch.save(model.module.state_dict(), "./weights/model-{}.pth".format(epoch))

3.15、删除主进程初始化权重临时缓存文件

# 删除临时缓存文件
# 讲刚才保存的主进程初始化权重的零时文件删除
if rank == 0:  # 只在主进程
     if os.path.exists(checkpoint_path) is True:
         os.remove(checkpoint_path)

3.16、释放进程组资源

cleanup()  # 释放进程组资源
def cleanup():
    dist.destroy_process_group()

Reference

  1. https://www.bilibili.com/video/BV1yt4y1e7sZ?t=2923
Logo

更多推荐