1. 前言

Accelerate 能帮助我们:
  • 方便用户在不同设备上 run Pytorch training script.
    • mixed precision
    • 不同的分布式训练场景, e.g., multi-GPU, TPUs, …
  • 提供了一些 CLI 工具方便用户更快的 configure & test 训练环境,launch the scripts.
方便使用:

用一个例子感受一下。传统的 PyTorch training loop一般长这样:

my_model.to(device)

for batch in my_training_dataloader:
    my_optimizer.zero_grad()
    inputs, targets = batch
    inputs = inputs.to(device)
    targets = targets.to(device)
    outputs = my_model(inputs)
    loss = my_loss_function(outputs, targets)
    loss.backward()
    my_optimizer.step()

Accelerate 配合使用,只需要增加一丢丢代码:

+ from accelerate import Accelerator

+ accelerator = Accelerator()
- # my_model.to(device)
  ## Pass every important object (model, optimizer, dataloader) to *accelerator.prepare*
+ my_model, my_optimizer, my_training_dataloader = accelerate.prepare(
+     my_model, my_optimizer, my_training_dataloader
+ )

  for batch in my_training_dataloader:
      my_optimizer.zero_grad()
      inputs, targets = batch
- #    inputs = inputs.to(device)
- #    targets = targets.to(device)
      outputs = my_model(inputs)
      loss = my_loss_function(outputs, targets)
      # Just a small change for the backward instruction
-     loss.backward()
+     accelerator.backward(loss)
      my_optimizer.step()
Script Launcher:

Accelerate 通过一个 CLI tool 使得用户不需要再去学习 torch.distributed.lauch,也不需要了解如何专门面向 TPU training 写 specific launcher. 具体操作为:

  • 在用户的 machine(s) 上面 run:
accelerate config
  • 回答相关问题。此时会生成 一个 config 文件,该文件将在用户执行以下命令时被自动调用,来进行正确的配置 (properly set the default options)
accelerate launch my_script.py --args_to_my_script
Accelerate 具体支持:
  • 仅 CPU 可用的设备
  • 单个 GPU
  • 单个 node 多个 GPU
  • 多个 nodes 上的多个 GPU
  • TPU
  • FP16 with native AMP (apex on the roadmap)
  • DeepSpeed (experimental support)

2. 入门小教程

2.1 典型使用方法

1. import Accelorator 并实例化

from accelerate import Accelerator

accelerator = Accelerator()
【注意】
- 越早越好,比如在进入 main 函数的第一步,此时会初始化(分布式)训练所需要的 everything.
- 不需要指明实验环境,accelerator 会自动检测。

2. 删除 model 和 data 对 .to(device).cuda() 的调用。accelerator 会帮你自动把模型和数据 place 到正确的 device 上面。

【注意】
- 可以保留对 .to(device) 的调用,但此时需保证 device = accelerator.device.
- 若不想采用 accelerator 的自动配置,则需要在初始化 Accelerator 的时候设置 device_placement=False.

3. 将所有 training 设计的对象 (optimizer, model, training_dataloader, learning rate scheduler) 送进 prepare().

model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
    model, optimizer, train_dataloader, lr_scheduler
)

此时,training dataloader 将在所有的 GPU/TPU 中间进行分享,也就是说,每个 device 拿到了 training dataset 的不同部分。
同时,所有 processes 的 random states 将在 dataloader 开始每个 iteration的时候进行同步,来确保 data 以相同的方式进行 shuffle (如果设置sampler shuffle=True 的话)。

【注意】
- 实际的 batch_size = number_of_devices * batch_size_set_in_script.
- 如果在 initialize Accelerator 的时候设置 split_batches=True,则 batch_size 不会随 device 的数量改变。
- 在所有关于 training 的 object 创建结束之后的第一时刻调用 prepare()。
- 对 len(training_dataloader) 的调用应当出现在 prepare() 之后。
- 如果不需要进行分布式 evaluation, 则无需将 eval_dataloader() 送入 prepare()。

4. 用 accelerator.backward(loss) 替换 loss.backward().

以上即为 accelerator 关于分布式训练的基本操作。Accelerate launcher 可以。

2.2 分布式 evaluation
  • 如何进行 regular evaluation?:不要将 eval_dataloader 送入 prepare(),但注意此时需要手动将 data 执行 .to(accelerator.device) 操作。
  • 如何进行 distributed evaluation?
validation_dataloader = accelerator.prepare(validation_dataloader)
for inputs, targets in validation_dataloader:
    predictions = model(inputs)
    # Gather all predictions and targets
    all_predictions = accelerator.gather(predictions)
    all_targets = accelerator.gather(targets)
    # Example of use with a *Datasets.Metric*
    metric.add_batch(all_predictions, all_targets)
【注意】
- gather() 要求由不同 process 得到的 tensor 具有相同的 siz
- 若此条件不满足 (e.g., dynamic padding),则应当通过 pad_across_processes() 将不同 process 的 tensor pad 到相同 size.
model.eval()
samples_seen = 0
for step, batch in enumerate(eval_dataloader):
    with torch.no_grad():
        outputs = model(**batch)
    predictions = outputs.logits.argmax(dim=-1)
    labels = batch["labels"]
    if not args.pad_to_max_length:  # necessary to pad predictions and labels for being gathered
        predictions = accelerator.pad_across_processes(predictions, dim=1, pad_index=-100)
        labels = accelerator.pad_across_processes(labels, dim=1, pad_index=-100)
    predictions_gathered, labels_gathered = accelerator.gather((predictions, labels))
    # If we are in a multiprocess environment, the last batch has duplicates
    if accelerator.num_processes > 1:
        if step == len(eval_dataloader):
            predictions_gathered = predictions_gathered[: len(eval_dataloader.dataset) - samples_seen]
            labels_gathered = labels_gathered[: len(eval_dataloader.dataset) - samples_seen]
        else:
            samples_seen += labels_gathered.shape[0]
    preds, refs = get_labels(predictions_gathered, labels_gathered)
    metric.add_batch(
        predictions=preds,
        references=refs,
    )  # predictions and preferences are expected to be a nested list of labels, not label_ids
2.3 Launching your distributed script
2.4 Launching training from a notebook
2.5 Training on TPU
2.6 一些常见问题及解决方法
  • 2.6.1 仅在一个 process 中执行一些操作,如 download data, logging, progress bar…
if accelerator.is_local_main_process:
    # Is executed once per server
from tqdm.auto import tqdm

progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)

注意,上面 code 中的 local 指的是 per machine。对于一些只需要进行一次的操作,例如将 train 好的 model upload 到 huggingface model hub,则需要

if accelerator.is_main_process:
    # Is executed once only
    accelerator.print('Test!')
  • 2.6.2 当有多个process的时候,难免出现执行速度不一致的情况,只需要执行 (当只有一张 CPU/GPU 时,则无事发生……):
accelerator.wait_for_everyone()
  • 2.6.3 保存/载入模型
    当 model 被送入 prepare() 之后,可能会被 place 到一个更大的 model 之中来便于进行分布式训练。因此在 save model 之前首先需要进行 unwarp():
accelerator.wait_for_everyone()
unwrapped_model = accelerator.unwrap_model(model)
# save
accelerator.save(unwrapped_model.state_dict(), filename)

如果在模型通过prepare() 后使用load函数时,则需要:

unwrapped_model = accelerator.unwrap_model(model)
unwrapped_model.load_state_dict(torch.load(filename))
2.7 保存所有状态

当我们在训练模型时,我们可能想要保存 model, optimizer, random generators, learning rate schedulers 等的当前状态。此时我们可以通过调用 accelerator.save_state(path_str)accelerator.load_state(path_str) 来分别实现相关对象状态的存储和调用。

其他通过 accelerator.register_for_checkingpointing register 的涉及状态的 items,也将通过上述函数的调用被存储/调用。

注意,送入 register_for_checkpointing 对象必须具有 load_state_dictsave_dict 函数可以调用。

  • Gradient clipping:如果我们的 code 调用了 torch.nn.utils.clip_grad_norm_torch.nn.utils.clip_grad_value_,我们应当使用 accelerator.clip_grad_norm_accelerator.clip_grad_value_ 进行替换。
  • Mixed precision training:当基于 Accelerate 进行 Mixed Precision 训练时,我们对于 loss 的计算也是在模型内(inside your model)完成的,例如 Transformer 模型。模型之外的所有计算都将以full precision 执行(这通常是我们在计算loss所期望的精度,尤其涉及softmax时)。此时,我们可以通过 accelerator.autocast() 实现:
with accelerator.autocast():
    loss = complex_loss_function(outputs, target):

【注意】在 Mixed Precision training 中,起始阶段可能会 skip 几步梯度更新(由 dynamic loss scaling stratergy 造成:在训练过程中,有些点会出现梯度溢出现象,此时会减小 loss scaling factor 的取值来避免在后续更新中再次发生这种情况)。在这种情况之下,我们可能在没有梯度更新的时候更新 learning rate scheduler (该现象暂记为 mismatch)。一般情况下,出现 mismatch 现象无伤大雅,但当训练样本很少时可能存在一些问题。如果学习率的初始值对于我们的模型训练十分关键,此时我们可以通过 skip learning rate scheduler 的更新来避免 mismatch 的发生:

if not accelerator.optimizer_step_was_skipped:
    lr_scheduler.step()
2.8 内部机制

=> 本质上说,accelerate lib 首先会分析 launch 当前 script 的环境来决定 i) 具体采用的 distributed setup, ii) 不同 processes 的数目, iii) 当前 script 所在的 process. 相关信息均存储在 ~AcceleratorState 中。

当我们实例化 Accelerator (并根据我们所需的 distributed setup 执行个性化初始化) 时会进行第一次初始化,然后共享至 AcceleratorState 的所有实例。

=> 当我们调用 prepare() 时,accelerate lib 会

  • Wrap your model(s) in the container adapted for the distributed setup.
  • Wrap your optimizer(s) in a AcceleratedOptimizer.
  • Create a new version of your dataloader(s) in a DataLoaderShard.

=> 当把 model 和 optimizer 送入 wraper 后,lib 会重新 create dataloader(s)。这主要是因为 PyTorch dataloader 在实例化之后,用户无法更改 batch_sampler。而 lib 通过改变 batch_sampler 的方式来利用不同的 process 处理不同的数据分片。

DataLoader 的子类 DataLoaderShard 中新的功能主要包括:

  • 它在每个迭代中同步所有processes的随机数生成器,以确保所有随机化操作(例如 shuffle)在不同的 processes 之间以完全相同的方式完成。
  • 在生成batch之前,将batch放在合适的 device 上(除非我们设置 device\u placement=True)。
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐