用腾讯云批量计算(batch-compute)调度GPU分布式机器学习

个人博客 658 0

用腾讯云批量计算(batch-compute)调度GPU分布式机器学习 第1张

当用户提交一些机器学习任务时,往往需要大规模的计算资源,但是对于响应时间并没有严格的要求。在这种场景下,首先使用腾讯云的batch-compute(批量计算)产品来自动化提交用户的任务,然后使用分布式+gpu的方式解决算力问题,在任务完成后通知用户,是一个可行的解决方案。

本文将分成2部分:首先通过一个demo介绍上述过程的实现,从仅使用gpu、不考虑并行的简单情况开始,扩展至并行+gpu的情况,并简要介绍batch-compute的使用方法;然后介绍一些技术的实现原理(部分资料来源于知乎和博客,仅供参考)。

一个简单的Demo

使用pytorch,利用torch.Tensor对cuda的支持进行数据和模型的迁移。先不考虑并行,仅考虑如何将传统的基于cpu的机器学习任务迁移到gpu上。

1.定义一个简单的模型ConvNet:class ConvNet(nn.Module):

    def __init__(self, num_classes=10):
       super(ConvNet, self).__init__()
       self.layer1 = nn.Sequential(
           nn.Conv2d(1, 16, kernel_size=5, stride=1,             padding=2),
           nn.BatchNorm2d(16),
           nn.ReLU(),
           nn.MaxPool2d(kernel_size=2, stride=2))
       self.layer2 = nn.Sequential(
           nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
           nn.BatchNorm2d(32),
           nn.ReLU(),
           nn.MaxPool2d(kernel_size=2, stride=2))
       self.fc = nn.Linear(7*7*32, num_classes)

   def forward(self, x):
       out = self.layer1(x)
       out = self.layer2(out)
       out = out.reshape(out.size(0), -1)
       out = self.fc(out)
       return out

2.进行基于gpu的训练

def train(gpu, args):
   torch.manual_seed(0)
   model = ConvNet()
   torch.cuda.set_device(gpu)  # set default gpu
   model.cuda(gpu) # move model to gpu
   batch_size = 100
   criterion = nn.CrossEntropyLoss().cuda(gpu)     # move loss function to gpu
   optimizer = torch.optim.SGD(model.parameters(), 1e-4)
   # Data loading code
   train_dataset = torchvision.datasets.MNIST(root='./data',
                                              train=True,
                                     transform=transforms.ToTensor(),
                                              download=True)
   train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                              batch_size=batch_size,
                                              shuffle=True,
                                              num_workers=0,
                                              pin_memory=True)

   start = datetime.now()
   total_step = len(train_loader)
   for epoch in range(args.epochs):
       for i, (images, labels) in enumerate(train_loader):
           images = images.cuda(non_blocking=True)
           labels = labels.cuda(non_blocking=True)
           outputs = model(images)
           loss = criterion(outputs, labels)

           optimizer.zero_grad()
           loss.backward()
           optimizer.step()

上面代码中的train函数接收一个gpu的编号gpu作为参数,并且在第4行用其指定torch默认使用的gpu。在第5行,将模型迁移到gpu上。cuda()函数会返回将调用该函数的对象拷贝一份到cuda memory中并返回该拷贝。如果该对象已经存在cuda memory或是正确的gpu中,则直接返回原对象。在第7行,将损失函数迁移到gpu上。这样,机器学习任务就迁移到了gpu上。然后来考虑并行。这里假设有多个节点,每个节点上有多个gpu,每个进程使用一块gpu。pytorch提供了分布式训练的包torch.distributed,并且支持跨节点训练。

在脚本中设置master节点的ip和port

import torch.multiprocessing as mp
def main():
   ...
   args.world_size = args.gpus * args.nodes
   os.environ['MASTER_ADDR'] = 'xxx.xxx.xxx.xxx'
   os.environ['MASTER_PORT'] = '8888'
   mp.spawn(train, nprocs=args.gpus, args=(args,))

第5,6行通过环境变量的方式设置了master的ip和端口,之后master将在该端口监听worker的连接请求并完成初始化、广播等操作。

第7行通过spawn函数在本地启动了数量等于gpu数的进程,并且每个进程中运行相同的函数train。如果一个进程异常退出,那么其他进程也会被终止。

初始化本地进程,并等待其他进程初始化完毕

import torch.distributed as dist
def train(gpu, args):
   rank = args.nr * args.gpus + gpu
   print('starting making group.......')
   dist.init_process_group(
       backend='nccl', init_method='env://', world_size=args.world_size, rank=rank)
   print('all processes have been started!')
   torch.manual_seed(0)
   ...

第5行的init_process_group是一个阻塞函数,在所有进程启动完毕且socket连接建立成功后返回。这里使用了nccl作为后端(也就是通信架构),可以参考pytorch官方给出的最佳指南;init_method参数表示通过环境变量发现master;rank表示当前进程在进程组中的优先级,rank=0的进程是master进程;world_size表示进程组中总共有多少进程。

模型梯度同步

mode = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

参与训练的数据集被分成多份,每个进程取一份input输入神经网络,独立计算,然梯度后将各个进程的梯度求平均值,用平均值更新模型参数。

将数据划分到各个gpu上

train_sampler = torch.utils.data.distributed.DistributedSampler(
   train_dataset,
   num_replicas=args.world_size,
   rank=rank)

DistributedSampler将输入按照batch_size划分到不同的gpu上,使得每个进程能读到不同的batch,且不同进程间不会读到重复的batch。

这样,机器学习任务就可以在不同节点的多个gpu上并行地执行,不同的进程只需指定不同的rank即可。

最后将任务通过batch-compute实现自动化的任务提交和执行。

首先介绍batch-compute的概念。现代云计算有多种形式,其中常见的2种是流式计算(stream computing)和批量计算(batch computing)。流式计算处理对实时性要求高的请求,具有低延迟、持续性等特征,一般用于实时推荐、监控等服务;批量计算处理对实时性要求低但需要大量计算资源的请求,往往是耗时较长的一次性作业。机器学习任务就是一种很典型的批量计算。

利用腾讯云的batch-compute(批量计算)产品,开发者需要提供计算执行的环境、命令和输入输出存放的位置,由该产品自动去根据负载获取腾讯云的弹性资源,并自动调度作业执行流程,将企业和科研机构的双手从架设和配置数据中心中解放出来。

本文中使用batch-compute的python SDK,分为2步:先创建计算环境,然后提交计算作业。

Step1 创建计算环境

req = batch_models.CreateComputeEnvRequest()
params = '{\"ComputeEnv\":{\"EnvName\":\"batch-concurrent-test\",\"EnvData\":{\"InstanceType\":\"GN10X.2XLARGE40\",\"ImageId\":\"img-xxxxxx\",\"SystemDisk\":{\"DiskSize\":120},\"DesiredComputeNodeCount\":2},\"Placement\":{\"Zone\":\"ap-guangzhou-3\"}}'
print(params)
req.from_json_string(params)
resp = self.batch_client.CreateComputeEnv(req)
self.computeEnvId = json.loads(resp.to_json_string())["EnvId"]

第2行指定了创建2个节点,使用带gpu的机型GN10X.2XLARGE40;通过ImageId指定cvm的镜像,在这个镜像中部署了anaconda,pytorch,nvidia driver,cuda等。

如果需要获取创建节点的ip地址,可以通过第6行获取计算环境的id查看环境的详细信息。

Step 2 提交计算作业

commands = [
   'sudo service docker restart',
   'sudo service docker status',
   'set -x',
   'docker run -t --network host --gpus all <image-name> bash concurrent/task.sh',
]
params = '{\"Placement\":{\"Zone\":\"ap-guangzhou-3\"},\"Job\":{\"JobName\":\"test-job\",\"Tasks\":[{\"TaskName\":\"concurrent-task\",\"InputMappings\":[{\"SourcePath\":\"%s\",\"DestinationPath\":\"%s\"}],\"TaskInstanceNum\":2,\"Application\":{\"Command\":\"%s\"},\"EnvId\":\"%s\",\"RedirectInfo\":{\"StdoutRedirectPath\":\"%s\",\"StderrRedirectPath\":\"%s\"}}]}}' % (self.inPath, self.destPath, " && ".join(commands), self.computeEnvId, self.outPath, self.errPath)
req.from_json_string(params)
resp = self.batch_client.SubmitJob(req)

但是不能设置-i参数,因为输入设备并不是一个真正的tty;设置cmd参数使得容器启动后执行task.sh脚本在第5行启动了一个docker容器并使用容器内装好的cuda。此处将网络设置为host模式使得可以在容器内通过host ip直接访问另一个节点上的容器;设置-t参数使得运行结果与在终端通过命令行手动执行的输出保持一致;

[[ $(hostname -I | cut -d ' ' -f 1) == "xxx.xxx.xxx.xxx" ]];
python3 concurrent/mnist-distributed.py -n 2 -nr $?

第1行判断当前节点的ip是否为master节点的ip;第二行运行执行机器学习任务的python脚本,并传入rank参数,如果是master节点则传入0,否则,传入1

运行结果

为了直观地演示并行机器学习的输出结果,笔者在两台cvm上手动执行了脚本:

用腾讯云批量计算(batch-compute)调度GPU分布式机器学习 第2张

如图,首先通过ip地址判断脚本输入参数中的rank值,并且等待所有进程启动成功。

用腾讯云批量计算(batch-compute)调度GPU分布式机器学习 第3张

然后开始训练,可以看见每个节点上进行了3个epoch,batch_size为300,耗时8秒左右。

用腾讯云批量计算(batch-compute)调度GPU分布式机器学习 第4张

为了对比使用并行前后的差距,在一个节点上启动任务。

如图,进行了3个epoch,batch_size为600,耗时为12秒左右。

至此,机器学习的任务就通过batch-compute产品提交并且在2台云服务器上并行地执行了,以下搬运一些pytorch文档/博客/知乎上关于分布式训练的原理实现。

原理

DDP(DistributedDataParallel)的构造函数

每个进程都有一个模型(module)。在构造函数中,DDP首先获得该module的引用,然后将module.state_dict()从master进程广播到全体进程,使得所有进程具有相同的初始状态。state_dict的返回值是buffer等不在参数列表中但是代表了网络状态的数据,例如batch normalization中的running_mean。不同进程间梯度的汇总、求和和同步是通过一个Reducer类实现的。在构造函数中初始化了一个Reducer对象,并通过该对象管理梯度计算。在Reducer对象的构造函数中,首先将所有的参数装进若干个bucket(桶),之后一桶一桶地计算可以提高效率。参数进入桶的顺序和其在数组Model.parameters中的顺序相反,后向传播中最后一层的梯度是最先被计算完毕的,因此应该最先参加求和。然后,Reducer为每个参数注册了一个autograd_hook,在该参数被计算完毕后触发。

前向传播

前向传播没有涉及梯度计算,但是设计一个corner case——如果用户定义了某些参数但是没有将其加入模型之中(即神经网络中存在孤立节点),那么autograd_hook永远不会被触发。为此,DDP的构造函数中提供了find_unused_parameters,如果被设置为True,则在前向传播完毕后会找出这些节点并直接将其标记为已完成计算。当然这一操作会引入额外的开销,因此作为一个参数。

后向传播

当所有节点上的同一编号的bucket中所有梯度均计算完成后,启动异步函数all_reduce求和。本地计算梯度和跨节点求平均值可以并行地进行,因为后向传播中用到的只是本地的计算结果(因为前向传播中的output就是只用local input算出来的)。

all_reduce实现细节

all_reduce实现了跨节点的求和计算。一种主流的实现方式是Parameter Server,即一个master节点接收其他节点发送的数值并求和,然后将结果发送给其他节点。

用腾讯云批量计算(batch-compute)调度GPU分布式机器学习 第5张

但是这样会引入单点故障,因此Pytorch 1.x使用了一种名为Ring AllReduce的算法(Uber的开源分布式框架Horovord也采用了这一算法)。正如其名字所表现的,所有节点排成一个环,每个节点从作邻居接收数据,在本地完成一部分求和工作,然后向右邻居发送数据。所有节点是平等的,没有master节点。

Ring AllReduce算法分成2个阶段:Share-Reduce阶段和Share-Only阶段。

在Share-Reduce阶段结束后,每个节点上会得到一部分位置的求和结果。

用腾讯云批量计算(batch-compute)调度GPU分布式机器学习 第6张

Master进程有何意义?

既然使用了Ring AllReduce算法,那么在使用torch.distributed包时一定要指定的master ip&port有什么作用呢?

Master的主要作用时在初始化时为各个进程建立连接。具体而言,Master会创建一个守护线程,在这个线程中为所有worker各自创建一个socket,然后等待worker的连接,并在连上后发送其他进程所在的位置。

Worker则创建和master通信的socket,并主动连接master,在连上后获取其他进程的位置信息并报告自己的位置,然后和其他进程建立连接。

参考文献

pytorch.org/docs/stable/data.html

yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html

pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html

pytorch.org/docs/stable/notes/ddp.html

towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da

zhuanlan.zhihu.com/p/76638962

www.zhihu.com/question/306242771/answer/825668022


标签: 腾讯云 云服务器

发表评论 (已有0条评论)

还木有评论哦,快来抢沙发吧~