当前位置: 首页 > news >正文

Pytorch分布式训练/多卡训练(二) —— Data Parallel并行(DDP)(2.2)(代码示例)(BN同步主卡保存梯度累加多卡测试inference)

       DDP的使用非常简单,因为它不需要修改你网络的配置。其精髓只有一句话

model = DistributedDataPrallel(model, device_ids=[local_rank], output_device=local_rank)

      原本的model就是你的PyTorch模型,新得到的model,就是你的DDP模型。
最重要的是,后续的模型关于前向传播、后向传播的用法,和原来完全一致!DDP把分布式训练的细节都隐藏起来了,不需要暴露给用户,非常优雅!

       Data Parallel的多卡训练的BN是只在单卡上算的,相当于减小了批量大小(batch-size)

初始化

      在套model = DDP(model)之前,我们还是需要做一番准备功夫,把环境准备好的。这里需要注意的是,我们的程序虽然会在16个进程上跑起来,但是它们跑的是同一份代码,所以在写程序的时候要处理好不同进程的关系。

①设置local_rank, world_size

②dist.init_process_group

import torch
import argparse

# 新增1:依赖
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# 新增2:从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank

# 新增3:DDP backend初始化
#   a.根据local_rank来设定当前使用哪块GPU
torch.cuda.set_device(local_rank)
#   b.初始化DDP,使用默认backend(nccl)就行。如果是CPU模型运行,需要选择其他后端。
dist.init_process_group(backend='nccl')

# 新增4:定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前做哦。
#       如果要加载模型,也必须在这里做哦。
device = torch.device("cuda", local_rank)

前向与后向传播

      我们知道,DDP同时起了很多个进程,但是他们用的是同一份数据,那么就会有数据上的冗余性。也就是说,你平时一个epoch如果是一万份数据,现在就要变成1*16=16万份数据了。
      那么,我们需要使用一个特殊的sampler,来使得各个进程上的数据各不相同,进而让一个epoch还是1万份数据。幸福的是,DDP也帮我们做好了!

①model = DDP(model)

        模型套了DDP后,能帮我们为不同 GPU 上求得的梯度进行allreduce(即汇总不同 GPU 计算所得的梯度,并同步计算结果)。allreduce 后不同 GPU 中模型的梯度均为 allreduce 之前各 GPU 梯度的均值

②使用 DistributedSampler 初始化 DataLoader

model = nn.Linear(10, 10).to(device)


# 新增5:初始化DDP模型
#就是给原模型套一个DDP外层
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True)
# 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!
#       sampler的原理,后面也会介绍。
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size, sampler=train_sampler)


for epoch in range(num_epochs):
    # 新增2:设置sampler的epoch,DistributedSampler需要这个来维持各个进程之间的相同随机数种子
    trainloader.sampler.set_epoch(epoch)
    #test的时候也需要 test_dataloader.sampler.set_epoch(t)
    # 后面这部分,则与原来完全一致了。
    for data, label in trainloader:
        prediction = model(data)
        loss = loss_fn(prediction, label)
        loss.backward()
        optimizer = optim.SGD(model.parameters(), lr=0.001)
        optimizer.step()

注意test的时候也需要

test_dataloader.sampler.set_epoch(t)

保存参数

      save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。

      我只需要在进程0上保存一次就行了,避免多次保存重复的东西。

if dist.get_rank() == 0:
    torch.save(model.module, "saved_model.ckpt")

完整

import torch
import argparse
from torch import nn
from torch import optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import torchvision
from torchvision import transforms
 
# 新增1:从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int)
args = parser.parse_args()
# local_rank = int(FLAGS.local_rank)
 
# 新增2:DDP backend初始化
#   a.根据local_rank来设定当前使用哪块GPU
torch.cuda.set_device(args.local_rank)
#   b.初始化DDP, nccl是GPU设备上最快、最推荐的后端.如果是CPU模型运行,需要选择其他后端。
dist.init_process_group(backend='nccl')
dist.barrier() #等待每块GPU都运行到这个地方之后再继续往下走
world_size = torch.distributed.get_world_size()
print('全局进程数: ',world_size)
 
 
# 新增3:定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前做。
#       如果要加载模型,也必须在这里做。
device = torch.device("cuda", args.local_rank)
model = nn.Linear(3*32*32, 512).to(device)
# 可能的load模型...
 
# 新增4:之后才是初始化DDP模型
model = DDP(model, device_ids=[args.local_rank], output_device=args.local_rank)
 
batch_size = 1024
#要保证batch_size能整除卡数
assert batch_size % world_size == 0
batch_size_per_GPU = batch_size // world_size
my_trainset = torchvision.datasets.CIFAR10(
                    root='./dataset', 
                    train=True,
                    transform=transforms.Compose([
                        transforms.ToTensor(),
                        transforms.Normalize((0.5, 0.5, 0.5,), (0.5, 0.5, 0.5,))
                        ])
                    )
# 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!
#       sampler的原理,后面也会介绍。
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size_per_GPU, sampler=train_sampler, drop_last=True)
#单卡的时候dataloader经常会设置shuffle=True, 但是如果sampler不为None的时候,就不能设置shuffle=True了
#如果有testset也要同样进行上面的操作
 
loss_fn = nn.CrossEntropyLoss()
for epoch in range(5):
    # 新增2:设置sampler的epoch,DistributedSampler需要这个来维持各个进程之间的相同随机数种子
    trainloader.sampler.set_epoch(epoch)
    #test_dataloader.sampler.set_epoch(t)  #test的时候也需要
    # 后面这部分,则与原来完全一致了。
    for data, label in trainloader:
        data = data.to(device)
        label = label.to(device)
        prediction = model(data.view(-1, 3*32*32))
        loss = loss_fn(prediction, label)
        loss.backward()
        optimizer = optim.SGD(model.parameters(), lr=0.001)
        optimizer.step()
 
if dist.get_rank() == 0:
    torch.save(model.state_dict(), "saved_model.ckpt")

调用

python -m torch.distributed.launch --nproc_per_node 4 main.py

如果每一步算loss的时候,还想算一个各个卡loss的均值

import torch
import argparse
from torch import nn
from torch import optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import torchvision
from torchvision import transforms

def reduce_value(value, average=True):
    world_size = dist.get_world_size()
    if world_size < 2:  # 单GPU的情况
        return value

    with torch.no_grad():
        dist.all_reduce(value)
        if average:
            value /= world_size

        return value
    
# 新增1:从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int)
args = parser.parse_args()
# local_rank = int(FLAGS.local_rank)
 
# 新增2:DDP backend初始化
#   a.根据local_rank来设定当前使用哪块GPU
torch.cuda.set_device(args.local_rank)
#   b.初始化DDP, nccl是GPU设备上最快、最推荐的后端.如果是CPU模型运行,需要选择其他后端。
dist.init_process_group(backend='nccl')
dist.barrier() #等待每块GPU都运行到这个地方之后再继续往下走
world_size = torch.distributed.get_world_size()
print('全局进程数: ',world_size)
 
 
# 新增3:定义并把模型放置到单独的GPU上,需要在调用`model=DDP(model)`前做。
#       如果要加载模型,也必须在这里做。
device = torch.device("cuda", args.local_rank)
model = nn.Linear(3*32*32, 512).to(device)
# 可能的load模型...
 
# 新增4:之后才是初始化DDP模型
model = DDP(model, device_ids=[args.local_rank], output_device=args.local_rank)
 
batch_size = 1024
#要保证batch_size能整除卡数
assert batch_size % world_size == 0
batch_size_per_GPU = batch_size // world_size
my_trainset = torchvision.datasets.CIFAR10(
                    root='./', 
                    train=True,
                    transform=transforms.Compose([
                        transforms.ToTensor(),
                        transforms.Normalize((0.5, 0.5, 0.5,), (0.5, 0.5, 0.5,))
                        ])
                    )
# 新增1:使用DistributedSampler,DDP帮我们把细节都封装起来了。用,就完事儿!
#       sampler的原理,后面也会介绍。
train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset)
# 需要注意的是,这里的batch_size指的是每个进程下的batch_size。也就是说,总batch_size是这里的batch_size再乘以并行数(world_size)。
trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=batch_size_per_GPU, sampler=train_sampler, drop_last=True)
#单卡的时候dataloader经常会设置shuffle=True, 但是如果sampler不为None的时候,就不能设置shuffle=True了
#如果有testset也要同样进行上面的操作
 
loss_fn = nn.CrossEntropyLoss()
for epoch in range(5):
    # 新增2:设置sampler的epoch,DistributedSampler需要这个来维持各个进程之间的相同随机数种子
    trainloader.sampler.set_epoch(epoch)
    #test_dataloader.sampler.set_epoch(t)  #test的时候也需要
    # 后面这部分,则与原来完全一致了。
    for data, label in trainloader:
        data = data.to(device)
        label = label.to(device)
        prediction = model(data.view(-1, 3*32*32))
        loss = loss_fn(prediction, label)
        loss.backward()
        loss = reduce_value(loss, average=True)
        optimizer = optim.SGD(model.parameters(), lr=0.001)
        optimizer.step()
 
if dist.get_rank() == 0:
    torch.save(model.state_dict(), "saved_model.ckpt")

就是加了loss = reduce_value(loss, average=True) 这一步

是否要使用BN同步

只有DDP能使用BN同步,DP不行

具体有没有必要使用SyncBN,要看单卡batch_size的大小,如果单卡batch_size太小,使用SyncBN可以提高性能。但是如果batch_size较大的时候就不需要使用SyncBN, 因为这需要多卡之间通信,会导致训练速度变慢

如果要加的话

# DDP init
dist.init_process_group(backend='nccl')

# 按照原来的方式定义模型,这里的BN都使用普通BN就行了。
model = MyModel()
# 引入SyncBN,这句代码,会将普通BN替换成SyncBN。
if args.sync_bn:
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)

# 构造DDP模型
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

避免冗余输出

只规定在主卡(rank=0)的时候输出就行了

if dist.get_rank() == 0:
    torch.save(model.state_dict(), "saved_model.ckpt")
if args.local_rank == 0:
    wandb.log({'loss': loss_value})
    wandb.log(loss_dict_reduced_scaled)
    wandb.log(loss_dict_reduced_unscaled)

DDP下的梯度累加Gradient Accumulation

对前K-1次step取消其梯度同步。

幸运的是,DDP给我们提供了一个暂时取消梯度同步的context函数 no_sync()(源代码)。在这个context下,DDP不会进行梯度同步

from contextlib import nullcontext
# 如果你的python版本小于3.7,请注释掉上面一行,使用下面这个:
# from contextlib import suppress as nullcontext

if local_rank != -1:
    model = DDP(model)

optimizer.zero_grad()
for i, (data, label) in enumerate(dataloader):
    # 只在DDP模式下,轮数不是K整数倍的时候使用no_sync
    my_context = model.no_sync if local_rank != -1 and i % K != 0 else nullcontext
    with my_context():
        prediction = model(data)
        loss = loss_fn(prediction, label) / K
        loss.backward()  # 积累梯度,不应用梯度改变
    if i % K == 0:
        optimizer.step()
        optimizer.zero_grad()

多卡inference/测试

PyTorch多卡/多GPU/分布式DPP——多卡测试inference_hxxjxw的博客-CSDN博客

[原创][深度][PyTorch] DDP系列第三篇:实战与技巧 - 知乎

调用方式(torch.distributed.launch)

DP模型下,python源代码的调用方式和原来的不一样了。现在,需要用torch.distributed.launch来启动训练。

  • 分布式训练的重要参数
    • 高级参数(可以先不看,多机模式才会用到)
      • 通讯的address
      • 通讯的port
    • 每台机器有多少个进程?
      • --nproc_per_node
    • 当前是哪台机器?
      • --node_rank
    • 有多少台机器?
      • --nnodes
  • 实现方式
    • 多机多卡时,我们需要在每一台机子(总共m台)上都运行一次torch.distributed.launch
    • 每个torch.distributed.launch会启动n个进程,并给每个进程一个--local_rank=i的参数
      • 这就是之前需要"新增:从外面得到local_rank参数"的原因
    • 这样我们就得到n*m个进程,world_size=n*m

单机模式

假设我们只在一台机器上运行,可用卡数是8

python -m torch.distributed.launch --nproc_per_node 8 main.py

假如我们在一台机器上同时要跑两个程序

这时,为避免master_port冲突,我们需要指定一个新的

# 假设我们只用4,5,6,7号卡
CUDA_VISIBLE_DEVICES="4,5,6,7" python -m torch.distributed.launch --nproc_per_node 4 main.py
# 假如我们还有另外一个实验要跑,也就是同时跑两个不同实验。
#    这时,为避免master_port冲突,我们需要指定一个新的。这里我随便敲了一个。
CUDA_VISIBLE_DEVICES="4,5,6,7" python -m torch.distributed.launch --nproc_per_node 4 \
    --master_port 53453 main.py

多机模式

master进程就是rank=0的进程。

在使用多机模式前,需要介绍两个参数:

  • 通讯的address
    • --master_address
    • 也就是master进程的网络地址
    • 默认是:127.0.0.1,只能用于单机。
  • 通讯的port
    • --master_port
    • 也就是master进程的一个端口,要先确认这个端口没有被其他程序占用了哦。一般情况下用默认的就行
    • 默认是:29500

假设我们在2台机器上运行,每台可用卡数是8

#    机器1:
python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node 8 \
  --master_adderss $my_address --master_port $my_port main.py
#    机器2:
python -m torch.distributed.launch --nnodes=2 --node_rank=1 --nproc_per_node 8 \
  --master_adderss $my_address --master_port $my_port main.py

相关文章:

  • Python itertools库
  • Pytorch模型提速
  • batchsize大小对模型训练的影响
  • Pytorch混合精度(FP16FP32)(AMP自动混合精度)/半精度 训练(一) —— 原理(torch.half)
  • CUDA编程(一) —— 相关概念基础知识
  • CUDA编程(二) —— CUDA编程模型
  • Python Fastai框架
  • ubuntu安装docker
  • Linux(ubuntu)(十三) —— (系统)服务管理 (systemctlservicechkconfig)服务的运行级别(Runlevel)
  • linux 文件/目录名 颜色
  • nvcc(CUDA编译器)
  • docker使用GPU(nvidia-docker)
  • Pytorch分布式训练/多卡训练(二) —— Data Parallel并行(DDP)(2.3)(torch.multiprocessing(spawn) Apex)
  • OpenStack
  • Python logging日志模块
  • 【Under-the-hood-ReactJS-Part0】React源码解读
  • ➹使用webpack配置多页面应用(MPA)
  • 30秒的PHP代码片段(1)数组 - Array
  • co模块的前端实现
  • ES10 特性的完整指南
  • Java,console输出实时的转向GUI textbox
  • Laravel核心解读--Facades
  • Mysql优化
  • QQ浏览器x5内核的兼容性问题
  • Vue UI框架库开发介绍
  • Webpack 4x 之路 ( 四 )
  • Webpack入门之遇到的那些坑,系列示例Demo
  • 关于 Linux 进程的 UID、EUID、GID 和 EGID
  • 前端代码风格自动化系列(二)之Commitlint
  • 使用parted解决大于2T的磁盘分区
  • 用Visual Studio开发以太坊智能合约
  • 由插件封装引出的一丢丢思考
  • MPAndroidChart 教程:Y轴 YAxis
  • 树莓派用上kodexplorer也能玩成私有网盘
  • 我们雇佣了一只大猴子...
  • 移动端高清、多屏适配方案
  • 直播平台建设千万不要忘记流媒体服务器的存在 ...
  • #13 yum、编译安装与sed命令的使用
  • #Linux(帮助手册)
  • #多叉树深度遍历_结合深度学习的视频编码方法--帧内预测
  • $forceUpdate()函数
  • (8)STL算法之替换
  • (done) 两个矩阵 “相似” 是什么意思?
  • (Matalb回归预测)PSO-BP粒子群算法优化BP神经网络的多维回归预测
  • (附源码)spring boot车辆管理系统 毕业设计 031034
  • (附源码)spring boot建达集团公司平台 毕业设计 141538
  • (三)centos7案例实战—vmware虚拟机硬盘挂载与卸载
  • (原创)boost.property_tree解析xml的帮助类以及中文解析问题的解决
  • (转)树状数组
  • *ST京蓝入股力合节能 着力绿色智慧城市服务
  • .NET 8.0 发布到 IIS
  • .Net的DataSet直接与SQL2005交互
  • .net之微信企业号开发(一) 所使用的环境与工具以及准备工作
  • @RestController注解的使用
  • [ vulhub漏洞复现篇 ] AppWeb认证绕过漏洞(CVE-2018-8715)