【PyTorch】多GPU并行训练DistributeDataParallel(Linux版)
目录前言一、DataParalled和DistributeDataParallel二、多GPU训练常见启动方式三、torch.distributed.launch代码讲解3.1、main中添加了几个新的变量3.2、初始化各进程环境3.3、调整学习率3.4、在第一个进程中进行打印和保存等操作3.5、DistributedSampler3.6、BatchSampler3.7、DataLoader3.8
前言
常见的多GPU并行的方法有model parallel 和 data parallel两种,这里主要讲的是data parallel:并行训练数据,相当于增大了batch_size.
主要需要注意的几点:
- 数据集如何在不同的设备间分配
- 误差梯度如何在不同的设备间通信
- BatchNormalization如何在不同的设备间同步(使用会提升一点,但是会降低一点并行速度)
一、DataParalled和DistributeDataParallel
- DataParalled只能用于单机多卡,而DistributeDataParallel可以用于单机多卡,多机多卡
- 单机情况下通常DataParalled要慢于DistributeDataParallel
二、多GPU训练常见启动方式
- torch.distributed.launch: 代码少,启动速度快(用的多)
注意: 如果开始训练后,手动强制终止程序,有小概率会出现进程没有杀掉的情况,最好看下GPU占用情况。
# 其中nproc_per_node为并行GPU的数量
python -m torch.distributed.launch --nproc_per_node=2 --use_env train_multi_gpu_using_launch.py
- torch.multiprocessing: 代码多点,速度慢点,但是拥有更好的控制和灵活性(这里不讲)
三、torch.distributed.launch代码讲解
3.1、main中添加了几个新的变量
注意: 这里一般只需要手动选择syncBN即可,后面三个参数不要动,系统会自动选择
# 是否启用SyncBatchNorm BN在多个GPU上同步
parser.add_argument('--syncBN', type=bool, default=True, help="同步BatchNormalization")
# 不要改该参数,系统会自动分配
parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
# 开启的进程数(注意不是线程),不用设置该参数,会根据nproc_per_node自动设置
parser.add_argument('--world-size', default=4, type=int,
help='number of distributed processes')
parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
3.2、初始化各进程环境
# 初始化各进程环境
init_distributed_mode(args=args)
def init_distributed_mode(args):
# 使用python -m torch.distributed.launch --nproc_per_node=2 --use_env train_multi_gpu_using_launch.py指令
# --use_env这个参数,他就会在我们os环境中(os.environ)存入RANK、WORLD_SIZE、LOCAL_RANK
if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
# 多机多卡:WORLD_SIZE代表使用几台机器,RANK代表第几台机器,LOCAL_RANK代表某台机器上第几块GPU设备
# 单机多卡:WORLD_SIZE代表有几块GPU,RANK=LOCAL_RANK代表哪块GPU
args.rank = int(os.environ["RANK"])
args.world_size = int(os.environ['WORLD_SIZE'])
args.gpu = int(os.environ['LOCAL_RANK'])
elif 'SLURM_PROCID' in os.environ: # 一般不执行
args.rank = int(os.environ['SLURM_PROCID'])
args.gpu = args.rank % torch.cuda.device_count()
else: # 一般不执行
print('Not using distributed mode')
args.distributed = False
return
args.distributed = True
torch.cuda.set_device(args.gpu)
args.dist_backend = 'nccl' # 通信后端,nvidia GPU推荐使用NCCL
print('| distributed init (rank {}): {}'.format(
args.rank, args.dist_url), flush=True)
# 创建进程组(重要)
# backend:通信后端 init_method:使用默认(env://) world_size:几块GPU rank:当前进程处于哪块GPU
# 注意:对于不同的进程而言,它的world_size是相同的,但是rank是不相同的
dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
world_size=args.world_size, rank=args.rank)
dist.barrier() # 等待每一块GPU都运行到这个地方之后再接着往下走
3.3、调整学习率
学习率要根据并行GPU的数量进行倍增,这里方法不一定,有很多种
这里暴力增加,直接乘以GPU个数
# 学习率要根据并行GPU的数量进行倍增 world_size = GPU数量
args.lr *= args.world_size
# 也可以写成这样
args.lr *= max(1., args.world_size * args.batch_size / 64)
3.4、在第一个进程中进行打印和保存等操作
if rank == 0: # 在第一个进程中打印信息,并实例化tensorboard
print(args)
print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
tb_writer = SummaryWriter()
if os.path.exists("./weights") is False:
os.makedirs("./weights")
3.5、DistributedSampler
DistributedSampler: 给每个rank(gpu)对应的进程分配训练的样本索引
train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)
这里简单讲下DistributedSampler到底要做什么事情:
如上图: 假设当前数据集总共有11个样本(0~10)DistributedSampler执行步骤:
- Shuffle处理:将数据集的样本随机打乱;
- 数据补充:假设现在有两块GPU,它会先用11/2向上取整=6,再乘以GPU的个数=12,少了(12-11)=1个数据,所以需要对数据进行补充。从头开始补,差一个就把第一个数据(6)补上,差几个补上前几个数据。这样就可以保证数据均衡的分配到每一个GPU当中;
- 分配数据:间隔的将数据分配到对应的GPU当中。
感兴趣的也可以读下它的源码(可读可不读):
class DistributedSampler(Sampler):
def __init__(self, dataset, num_replicas=None, rank=None, shuffle=True, seed=0):
if num_replicas is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
num_replicas = dist.get_world_size()
if rank is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
rank = dist.get_rank()
self.dataset = dataset # 数据集
self.num_replicas = num_replicas # 进程个数 默认等于world_size(GPU个数)
self.rank = rank # 当前属于哪个进程/哪块GPU
self.epoch = 0
self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas)) # 每个进程的样本个数
self.total_size = self.num_samples * self.num_replicas # 数据集总样本的个数
self.shuffle = shuffle # 是否要打乱数据集
self.seed = seed
def __iter__(self):
# 1、 Shuffle处理:打乱数据集顺序
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
# 这里self.seed是一个定值,通过set_epoch改变self.epoch可以改变我们的初始化种子
# 这就可以让我们在每一个epoch中数据集的打乱顺序不同,使我们每一个epoch中每一块GPU拿到的数据都不一样,这样可以有利于更好的训练
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist()
else:
indices = list(range(len(self.dataset)))
# 数据补充
indices += indices[:(self.total_size - len(indices))]
assert len(indices) == self.total_size
# 分配数据
indices = indices[self.rank:self.total_size:self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
def __len__(self):
return self.num_samples
def set_epoch(self, epoch):
r"""
Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
use a different random ordering for each epoch. Otherwise, the next iteration of this
sampler will yield the same ordering.
Arguments:
epoch (int): Epoch number.
"""
self.epoch = epoch
3.6、BatchSampler
BatchSampler: 将训练集打包成Batch的形式(将样本索引每batch_size个元素组成一个list)
train_batch_sampler = torch.utils.data.BatchSampler(
train_sampler, batch_size, drop_last=True)
简单讲下BatchSampler到底要做什么事情:
分组:通过DistributedSampler将训练集成功的分配到了两个GPU当中了,我们以第一块GPU为例,它分配到如上图的数据样本,假设传入的barch_size=2,它就会按顺序将当前数据组合成一组一组的形式(每batch_size个数据为一组)。
注:
- drop_last参数作用:假设batch_size=4, 那么我们这里只有前四个数据可以打包成一组,少了两个,如果drop_last为Ture,就直接将多余的数据直接丢掉,为False,则会将最后两个数据打包成一组(少了也没关系)。
- 为什么验证集不需要分组???不是很理解
3.7、DataLoader
nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8]) # number of workers
if rank == 0:
print('Using {} dataloader workers every process'.format(nw))
train_loader = torch.utils.data.DataLoader(train_data_set,
batch_sampler=train_batch_sampler,
pin_memory=True, # 将数据加载到GPU当中
num_workers=nw,
collate_fn=train_data_set.collate_fn)
val_loader = torch.utils.data.DataLoader(val_data_set,
batch_size=batch_size,
sampler=val_sampler,
pin_memory=True,
num_workers=nw,
collate_fn=val_data_set.collate_fn)
3.8、保证初始化权重一致
原因:如果不使用预训练权重的话,那么就必须保证每个设备(GPU)上初始的权重是一模一样的。如果不能保证一样的话,那我们训练的梯度就不是针对同一组参数而言了,那么不同设备之间的梯度通信也就没有了任何意义。
if os.path.exists(weights_path):
# 如果使用预训练权重则载入 这时候初始化权重肯定一致
weights_dict = torch.load(weights_path, map_location=device)
load_weights_dict = {k: v for k, v in weights_dict.items()
if model.state_dict()[k].numel() == v.numel()}
model.load_state_dict(load_weights_dict, strict=False)
else:
checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
# 如果不存在预训练权重,需要将第一个进程中的权重保存,然后其他进程载入,保持初始化权重一致
if rank == 0:
torch.save(model.state_dict(), checkpoint_path)
dist.barrier() # 等待每一块GPU都运行到这个地方之后再接着往下走
# 这里注意,一定要指定map_location参数,否则会导致第一块GPU占用更多资源
model.load_state_dict(torch.load(checkpoint_path, map_location=device))
3.9、SyncBatchNorm
# 是否冻结权重 如果冻结权重 那么没有BN层
if args.freeze_layers:
for name, para in model.named_parameters():
# 除最后的全连接层外,其他权重全部冻结
if "fc" not in name:
para.requires_grad_(False)
else:
# 只有训练带有BN结构的网络时使用SyncBatchNorm采用意义
if args.syncBN:
# 使用SyncBatchNorm后训练会更耗时
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
3.10、转为DDP模型
通过这个方法包装模型,再指认device_id后,就可以在各个设备间进行通信了。
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
3.11、DistributedSampler.set_epoch
for epoch in range(args.epochs):
train_sampler.set_epoch(epoch)
set_epoch这个方法是再我们的DistributedSampler当中的,主要是通过set_epoch这个方法来改变epoch,再间接的改变我们的初始化随机种子,让我们每一轮每块GPU上的数据都不一样,有利于我们的训练。具体的代码可以看第3.5节。
3.12、train_one_epoch
这个部分的代码和单GPU的代码基本相同,这里只是简单的讲下他们不同的部分:
1、打印信息
针对多GPU的情况,我们没必要打印每一个进程的训练信息,打印一下主进程的训练信息就够了。
# 在进程0中打印训练进度 只显示主进程的训练情况
if is_main_process():
data_loader = tqdm(data_loader)
# 在进程0中打印平均loss
if is_main_process():
data_loader.desc = "[epoch {}] mean loss {}".format(epoch, round(mean_loss.item(), 3))
2、loss计算
针对多GPU的情况,我们一般会求所有GPU上loss的均值
loss = reduce_value(loss, average=True) # 多GPU使用 loss=多GPU的loss的均值
mean_loss = (mean_loss * step + loss.detach()) / (step + 1) # update mean losses
def reduce_value(value, average=True):
world_size = get_world_size()
if world_size < 2: # 单GPU的情况
return value
with torch.no_grad():
dist.all_reduce(value) # 求和
if average: # 取平均
value /= world_size
return value
3、多GPU 需要同步一下进度
# 等待所有进程计算完毕 多GPU 需要同步一下进度
if device != torch.device("cpu"):
torch.cuda.synchronize(device)
3.13、evaluate
这个部分的代码和单GPU的代码基本相同,这里只是简单的讲下他们不同的部分:
1、打印信息
针对多GPU的情况,我们没必要打印每一个进程的训练信息,打印一下主进程的训练信息就够了。
# 在进程0中打印验证进度 在主进程中包装下data_loader
if is_main_process():
data_loader = tqdm(data_loader)
2、sum_num(预测对的个数)计算
针对多GPU的情况,我们一般会求所有GPU上sum_num的和
sum_num = reduce_value(sum_num, average=False) # 多GPU使用 返回多个设备的sum_num之和
# val_dataset经过DistributedSampler之后可能样本个数会增加,但是如果数据量很大的话影响也不是很大
acc = sum_num / val_sampler.total_size
def reduce_value(value, average=True):
world_size = get_world_size()
if world_size < 2: # 单GPU的情况
return value
with torch.no_grad():
dist.all_reduce(value) # 求和
if average: # 取平均
value /= world_size
return value
3、多GPU 需要同步一下进度
# 等待所有进程计算完毕
if device != torch.device("cpu"):
torch.cuda.synchronize(device)
3.14、保存到tensorboard、保存模型权重
只要在主进程中保存就可以了
if rank == 0: # 只在主进程
print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))
tags = ["loss", "accuracy", "learning_rate"]
tb_writer.add_scalar(tags[0], mean_loss, epoch)
tb_writer.add_scalar(tags[1], acc, epoch)
tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)
torch.save(model.module.state_dict(), "./weights/model-{}.pth".format(epoch))
3.15、删除主进程初始化权重临时缓存文件
# 删除临时缓存文件
# 讲刚才保存的主进程初始化权重的零时文件删除
if rank == 0: # 只在主进程
if os.path.exists(checkpoint_path) is True:
os.remove(checkpoint_path)
3.16、释放进程组资源
cleanup() # 释放进程组资源
def cleanup():
dist.destroy_process_group()
Reference
- https://www.bilibili.com/video/BV1yt4y1e7sZ?t=2923
更多推荐



所有评论(0)