3

Pytorch DDP使用方法以及注意点

 2 years ago
source link: https://blog.kamino.link/2021/12/09/Pytorch%20DDP%E4%BD%BF%E7%94%A8%E6%96%B9%E6%B3%95%E4%BB%A5%E5%8F%8A%E6%B3%A8%E6%84%8F%E7%82%B9/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Pytorch DDP使用方法以及注意点

[TOC]

Pytorch的DDP指的是DistributedDataParallel,位于torch.nn.parallel中,用于多GPU的模型训练。相比于之前的DP,DDP的速度快了很多。DDP支持多卡多机器,但我没有多机器,所以本文针对最常用的单机器多卡。

DDP加速的原理是通过启动多个进程,提高同时训练的batch size来增加并行度的,每一个进程都会加载一个模型,用不同的数据进行训练之后得到各自的梯度,然后通过Ring-Reduce算法获得所有进程的梯度,然后进行相同的梯度下降。注意在训练前和训练后,所有进程的模型参数都是同步了的。

Ring-Reduce是很简单理解的一个算法:每个进程都从左手边获得一份梯度,然后从右手发送一份梯度(一份指的是一个GPU得出的梯度),经过n次迭代之后,所有进程都获得了相同的完整的梯度。如下图,假设梯度∇w是GPU0算出来的,第一次∇w被发送到GPU1,第二次被GPU1发送到GPU2,第三次被GPU2发送到GPU3,第四次被发送到GPU4,第五次被发回给GPU0。这样每个进程都只需要接收一个,发送一个,而且能够清楚知道什么时候结束。

先来了解基础概念:

  • world_size:并行数,即总共用的卡数。
  • rank:当前进程全局序号,范围0~world_size。
  • local_rank:本地序号,由于本文介绍单机器,所以和上面一样。
  • master_port:DDP需要进程间传递数据,所以需要使用端口。
  • master_address:同上,需要使用IP地址。

基本逻辑是这样:

初始化多进程并获取rank号
使用DistributedSampler提供数据
将数据和模型都放在GPU上
将模型用DDP包裹
loss.backward同步梯度

初始化多进程、获取rank号

# 使用这种方法获取local_rank
# 其他获取local_rank的方法已经被弃用
local_rank = int(os.environ["LOCAL_RANK"])
# 设置device
torch.cuda.set_device(local_rank)
# 用nccl后端初始化多进程,一般都用这个
dist.init_process_group(backend='nccl')
# 获取device,之后的模型和张量都.to(device)
device = torch.device("cuda", local_rank)

对于n张卡上的n个模型,我们当然希望将数据分成不同的n部分,分别送给它们训练,所以需要使用torch.utils.data.distributed.DistributedSampler帮助我们自动分配数据。

注意!只有训练集才要用Sampler!在train之后,经常使用验证集对数据集进行验证得到validation_loss,此时没有必要使用多卡,只需要在一个进程上进行验证。

在多卡模式下要进行只在一个进程上的操作,通过model.module(inputs)而不是model(inputs)来调用forward()前向传播,而其他进程通过torch.distributed.barrier()来等待主进程完成validate操作。

假如要多卡推理,参考这篇文章写一个新的sampler[原创][深度][PyTorch] DDP系列第三篇:实战与技巧 - 知乎 (zhihu.com)

train_dataset = MyDataset()
# shuffle不在dataloader中设置,而是在sampler中设置
train_sampler = DistributedSampler(train_dataset, shuffle=True)
# batch_size指的是一张卡上的batch_size,总batch_size应该是要乘并行数
train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=64,
sampler=train_sampler)

for epoch in range(EPOCHS):
# 每轮开始前需要调用这个方法进行正确的数据shuffle
# 否则每一轮用的都是相同的顺序
train_sampler.set_epoch(epoch)
...

把数据和模型放在GPU上

# 直接to(device)即可,注意要收到返回值
model = YourModel()
model = model.to(device)
your_data = your_data.to(device)

把模型用DDP包裹

# local_rank就是进程号
# 从此之后,调用model()就是用DDP模式的前向传播
# 要使用原始的前向传播,需要model.module()
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

训练、同步梯度

# 仍然可以直接调用模型的train()方法
# 但是假如要调用其他你自己写的方法,就得model.module.func()
model.train()
for data in dataloader:
loss = model(data)
optimizer.zero_grad()
loss.backward() # 这个操作自动同步梯度
optimizer.step()
# 但是仍然需要累加得到所有进程loss的值的和
dist.all_reduce(loss, op=dist.ReduceOp.SUM)
# 然后除以并行数,就是这个batch的loss值了
loss /= world_size
# 保存的是参数,不需要DDP包裹
torch.save(model.module.state_dict())
# 通过外部命令运行 
# 通过CUDA_VISIBLE_DEVICES控制可见的卡数
# 通过--nproc_per_node确定使用多少卡
CUDA_VISIBLE_DEVICES="0,1,2,3" python -m torch.distributed.run --nproc_per_node 4 train.py

DDP注意点复习!

  1. 要把模型和数据放在进程对应的那张卡上
  2. 要使用Sampler来分发训练数据,并且shuffle不设置在Dataloder中而是Sampler中,每个epoch还需要调用Sampler的set_epoch()方法。
  3. 训练和验证区分较大,验证一般在主进程中进行一次验证即可,不需要sampler,操作和单卡一样,之后将数据同步给其他进程。
  4. 在多卡时要调用模型的其他方法或者使用单卡的模式,需要用model.module来获得原始模型,同样保存参数时也保存的是model.module的参数而不是DDP包裹的。

DDP小技巧

使用dist.all_reduce(loss, op=dist.ReduceOp.SUM)可以同步tensor的数据,由于算法限制,要算平均值只能用求和运算dist.ReduceOp.SUM之后再除以world_size

假如要同步的不是tensor,可以创建Tensor然后放进对应的GPU,再同步。

假如需要获得每个进程的某个tensor的值(即有n个GPU就获得n个值),那么使用dist.all_gather可以获得tensor列表。

# 官网API文档
# All tensors below are of torch.int64 dtype.
# We have 2 process groups, 2 ranks.
tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)]
tensor_list
# [tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1
tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank
tensor
# tensor([1, 2]) # Rank 0
# tensor([3, 4]) # Rank 1
dist.all_gather(tensor_list, tensor)
tensor_list
# [tensor([1, 2]), tensor([3, 4])] # Rank 0
# [tensor([1, 2]), tensor([3, 4])] # Rank 1

同步数据时假如要控制所有进程同时,可以使用torch.distributed.barrier(),让快的进程等一下慢的进程,假如timeout了,可以看看代码是否能优化,或者在运行之前提供参数提高timeout的值。

假如dist.barrier()失效,可能是这种情况DistributedDataParallel barrier doesn’t work as expected during evaluation - distributed - PyTorch Forums

参考文献:

[原创][深度][PyTorch] DDP系列第一篇:入门教程 - 知乎 (zhihu.com)

ring allreduce和tree allreduce的具体区别是什么? - 知乎 (zhihu.com)

DistributedDataParallel barrier doesn’t work as expected during evaluation - distributed - PyTorch Forums


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK