importtorch
importos
if__name__=='__main__':
rank=int(os.getenv('RANK','0'))
world_size=int(os.getenv('WORLD_SIZE','1'))
torch.distributed.init_process_group(rank=rank,world_size=world_size,backend='nccl')
devices=torch.cuda.device_count()
torch.cuda.set_device(rank%devices)
tensor = torch.tensor([rank +。1], dtype=torch.long, device='cuda')
torch.distributed.all_reduce(tensor)
print(f'rank:{rank}{tensor}')
rank0的tensor=1,rank1的tensor=2,求和结果为3,因此能看到这样的输出:规约操作基本可以算是最常用的通信算子。例如rank之间通过规约操作同步梯度、计算平均loss,再比如分布式softmax利用规约计算最大值、归一化分母等等。
聚集操作中最典型的是torch.distributed.all_gather(),能够把不同rank的数据收集到一起,特点是参与通信的rank有多少,就会得到多少个tensor。例如下面这段代码,作用是将所有rank的tensor收集到一起:
importtorch
importos
if__name__=='__main__':
rank=int(os.getenv('RANK','0'))
world_size=int(os.getenv('WORLD_SIZE','1'))
torch.distributed.init_process_group(rank=rank,world_size=world_size,backend='nccl')
devices=torch.cuda.device_count()
torch.cuda.set_device(rank%devices)
tensor=torch.tensor([rank+1],dtype=torch.long,device='cuda')
tensors=[torch.empty_like(tensor)for_inrange(world_size)]
torch.distributed.all_gather(tensors,tensor)
print(f'rank:{rank}{tensors}')
由于“所有”rank都收集了“所有”tensor,所以每个rank都会拥有其他所有rank和自己的tensor,因此会打印出如下内容:all_gather可以模拟all_reduce,比如sum(tensors)就等于all_reduce sum,max(tensors) 就是all_reduce max。还有一个需要注意的点是不能使用列表乘法创建tensors:
tensors=[torch.empty_like(tensor)]*world_size
因为除了all_gather_object等支持python变量的通信算子,大部分都是原地操作,而列表乘法创建的tensors每一个tensor都是指向同一个对象,与原地操作会发生冲突。聚集相比规约更加灵活,但是规约的效率和显存占用通常会更好。
广播的典型操作是 torch.distributed.broadcast(),作用是将指定rank的tensor发送给其他所有rank,比如下面这段代码,是将rank0的tensor发送给其他所有rank,这样最终所有rank的tensor都是一样的
importtorch
importos
if__name__=='__main__':
rank=int(os.getenv('RANK','0'))
world_size=int(os.getenv('WORLD_SIZE','1'))
torch.distributed.init_process_group(rank=rank,world_size=world_size,backend='nccl')
devices=torch.cuda.device_count()
torch.cuda.set_device(rank%devices)
tensor=torch.tensor([rank+1],dtype=torch.long,device='cuda')
torch.distributed.broadcast(tensor,0)
print(f'rank:{rank}{tensor}')
执行后你应该能看到下面这样的结果:广播的特点是数据由一个节点到所有节点,通常用于将只会出现在某一个rank的信号发送到全部rank。例如动态数据量训练,是否到达最后一个batch,会以最后一个数据并行rank能否获得一个完整的micro batch作为信号。此时可以利用广播操作,将最后一个数据并行rank的结束信号广播到其他所有rank。再比如在megatron中,当使用tensor并行时,仅第一个tensor rank会获取数据,其他tensor rank的数据是由第一个tensor rank广播来的。
除了上述几个所有rank之间的通信,有时我们也需要两个rank之间的两两通信,这时就会用到p2p通信。p2p通信的发送方使用torch.distributed.send(),接收方使用torch.distributed.recv()。比如下面这段代码,功能是所有偶数rank将tensor发送到下一个奇数rank:
importtorch
importos
if__name__=='__main__':
rank=int(os.getenv('RANK','0'))
world_size=int(os.getenv('WORLD_SIZE','1'))
torch.distributed.init_process_group(rank=rank,world_size=world_size,backend='nccl')
devices=torch.cuda.device_count()
torch.cuda.set_device(rank%devices)
ifrank%2==0:
tensor=torch.tensor([999],dtype=torch.long,device='cuda')
torch.distributed.send(tensor,rank+1)
else:
tensor=torch.empty(1,dtype=torch.long,device='cuda')
torch.distributed.recv(tensor,rank-1)
print(f'rank:{rank}{tensor}')
运行会得到以下结果上面大多数算子都是原地操作,这就带来一个问题,原地操作要求先创建一个空张量,等待通信算子把数据放进来。但是一个rank怎么知道被通信过来的张量shape是什么样的,怎么提前创建这个空张量?尤其是在广播和p2p通信时经常会碰到这个问题。一个常用的方案是在通信以前,先通信一个固定ndim的张量用来表示接下来要通信的张量的shape,然后再通信真正的数据,比如下面这样:
importtorch
importos
if__name__=='__main__':
rank=int(os.getenv('RANK','0'))
world_size=int(os.getenv('WORLD_SIZE','1'))
torch.distributed.init_process_group(rank=rank,world_size=world_size,backend='nccl')
devices=torch.cuda.device_count()
torch.cuda.set_device(rank%devices)
ifrank%2==0:
tensor=torch.randn(1,4,dtype=torch.float16,device='cuda')
shape_tensor=torch.tensor(tensor.size(),dtype=torch.long,device='cuda')
torch.distributed.send(shape_tensor,rank+1)
torch.distributed.send(tensor,rank+1)
else:
shape_tensor=torch.empty(2,dtype=torch.long,device='cuda')
torch.distributed.recv(shape_tensor,rank-1)
tensor=torch.empty(torch.Size(shape_tensor),dtype=torch.float16,device='cuda')
torch.distributed.recv(tensor,rank-1)
print(f'rank:{rank}{tensor}')
这种方法有点像定义一种通信协议,第一次握手通信shape,第二次通信数据。如果在使用时ndim也固定不下来,或者tensor的 dtype也需要通信,那么我们就可以像定义通信协议一样,定义一个长一点的定长shape tensor,比如shape=(10,) ,用前9位表示接下来要通信的数据张量的shape,不足的位置补0,最后一位用一个数字表示数据类型。点对点通信主要的应用场景一个是pipeline并行,由上一个pipe发送数据到下一个pipe,一个是蒸馏的teacher 模型发送 probabilities 给 student,以及类似的reference-polocy model。
还有一个经常使用的是同步屏障 torch.distributed.barrier(),这个操作不通信任何数据,作用是确保所有进程都运行到此处后再开始之后的动作。比如当存储checkpoint时,我们通常只会让第一个数据并行的rank进行保存,其他rank此时就应该使用同步屏障等待第一个rank保存结束。这样可以避免其他rank提前开始新的计算或提前结束导致保存失败,例如:
importtorch
importos
importtime
if__name__=='__main__':
rank=int(os.getenv('RANK','0'))
world_size=int(os.getenv('WORLD_SIZE','1'))
torch.distributed.init_process_group(rank=rank,world_size=world_size,backend='nccl')
devices=torch.cuda.device_count()
torch.cuda.set_device(rank%devices)
ifrank==0:
time.sleep(20)#用sleep20秒模拟rank0在做一些高耗时行为,比如存储checkpoint
else:
a=1+2
torch.distributed.barrier()
除了上面这几种,还有all-to-all算子和scatter算子。在LLM场景中经这两个算子经常出现在序列并行的正、反向传播中。
在使用通信算子时,需要确保所有相关rank都要执行到这一段代码,不然已经执行到这一步的rank会hang住一直等待,导致程序无法继续。
上面提到的通信算子基本都对应有自己特殊的通信模式,比如reduce算子的背后有tree-reduce和ring-reduce通信模式,广播、p2p等等也都分别对应自己的通信模式。在LLM训练框架开发这个层面,我们可能需要关注下ring-reduce和tree-reduce这两种通信模式。按理说这两种通信模式属于all_reduce的底层实现,torch这个层面应该不需要过多关注,但是我在实际开发中多次遇到相关问题,比如:
当你怀疑可能自己也遇到了这类问题时,可以考虑通过环境变量NCCL_ALGO强制指定使用tree还是ring。或者尝试用all_gather替换all_reduce,或者更新torch版本。
当你打算涉足通信,就意味着你即将离开torch稳定的后方,站上痛苦debug的前线。
上面的提到的算子,除了p2p通信,基本都是所有rank参与的通信,如果只能这样未免有些太死板了。想要前5个rank all_reduce,后5个 rank all_gather应该怎么做?广播只广播给奇数rank怎么做?一部分操作在gpu上用nccl通信,一部分在cpu上用gloo通信怎么实现?如果2个rank同时用p2p通信不同张量,怎么做区分?这里就轮到通信组登场了。
注:之后的例子会更复杂一点,之后的所有脚本都会启动4个rank,也就是在此之后的所有 --nproc-per-node都为4
通信组是通过下面这种形式创建的:
ranks=[0,1]
group=torch.distributed.new_group(ranks,backend='nccl')
上面这段代码的意思是创建1个通信组,包含 rank0和rank1,以nccl作为后端。在使用通信算子时,我们可以通过group参数指定通信组,这样数据交换就只会发生在组内。比如当你这样使用barrier时,可以指定通信组,这样只要组内的rank都到达barrier,就可以继续,而不是等待所有rank都到达barrier才能继续:
importtorch
importos
if__name__=='__main__':
rank=int(os.getenv('RANK','0'))
world_size=int(os.getenv('WORLD_SIZE','1'))
torch.distributed.init_process_group(rank=rank,world_size=world_size,backend='nccl')
devices=torch.cuda.device_count()
torch.cuda.set_device(rank%devices)
ranks=[0,1]
group=torch.distributed.new_group(ranks)
ifrankinranks:
torch.distributed.barrier(group=group)
print(f'rank:{rank}finish')
else:
torch.distributed.barrier()
print(f'rank:{rank}finish')
执行这段代码可以看到:因为rank0、1的barrier指定了通信组,所以只要0、1两个rank运行到barrier就可以继续,打印出finish。rank2、3的barrier没有指定通信组,因此他们会等待所有rank到达,但是rank0、1并不会进入这个else分支,所以rank2、3会卡住。
所有通信相关算子都支持指定通信组,作用都是类似的,就是把之前全部rank间的通信变为组内rank间的通信。对于p2p通信来说,group还有一个重要作用就是可以用来作为通信标识符。如果两个rank间同时进行多个p2p通信,不同的group可以用于区分不同的通信。
通信组可以创建多个,并且每个可以指定使用不同的后端,方便进行cuda或cpu张量的混合通信:
group1=torch.distributed.new_group([0,1])
group2=torch.distributed.new_group([2,3])
group3=torch.distributed.new_group([1,2,3])
group4=torch.distributed.new_group([0,3],backend='gloo')
在使用通信组时,rank自身必须是这个通信组的成员,比如rank3不能使用group1。那么既然rank3不能使用group1,那rank3能不能干脆就不创建group1,只创建group2、3、4呢?答案是不行。torch对创建通信组有两个要求:
比如在rank0创建的第一个通信组是[0, 1],那么rank1创建的第一个也必须这个,rank2、3也一样,即使他们不能用这个通信组。所以多通信组的创建代码一般是长这样的:
group=None
rank=torch.distributed.get_rank()
forranksin[[0,1],[2,3]]:
_group=torch.distributed.new_group(ranks)
ifrankinranks:
group=_group
这样保证group的创建顺序是一致的,并且只保留自己这个rank能用的组。想一想这个设计也是合理的,想象一下两个并行的进程,相互之间是访问不到对方变量的,那两个进程怎么知道对方是用哪个通信组呢?首先构成通信组的rank不行,因为可以两个组可以由相同的rank组成,这个不具有唯一性。创建时间也不行,因为每个rank并不是完全同步的。可以人为地为每个组指定一个唯一id,那自增id也可以。
说到这里,其实基础的torch分布式训练功能就都讲完了,下面我们做个把这些功能都用上的小demo。
我们以语言模型为背景,实现一个蒸馏框架,这里teacher和student模型是模拟语言模型输入输出的假模型。蒸馏框架支持数据并行,训练数据也是模拟的。分布式优化器要自己实现。框架要支持teacher和student的重叠计算和数据通信。(如果有兴趣的话,推荐在megatron框架里实现一个蒸馏框架)
下面我们一步一步来完成,部分上面已经实现过的函数这里就不再实现了。写demo我习惯讲一步写一步,并且不给出可以直接执行的完整代码。
模拟数据
我们写一个模拟语言模型输入的迭代器来生成假的数据。语言模型的输入是token序列,也就是shape = [batch_size, seq_length] 值为 0 - vocab_size 的整型张量,这里seq_length 我们每次随机一个值。
importtorch
classDataloader:
def__init__(self,batch_size,max_length,vocab_size):
self.batch_size=batch_size
self.max_length=max_length
self.vocab_size=vocab_size
def__iter__(self):
whileTrue:
length=torch.randint(2,self.max_length,size=(1,))
input_ids=torch.randint(0,self.vocab_size,size=(self.batch_size,length),device='cpu')
yieldinput_ids
teacher&student模型
teacher和student模型都是语言模型,输入是token序列,输出是token序列的词表概率,也就是输出一个 shape = [batch_size, seq_length, vocab_size] 的浮点张量。这里我们假设参数就假设只有一个lm head头。
classModel(torch.nn.Module):
def__init__(self,vocab_size):
super().__init__()
self.lm_head=torch.nn.Parameter(torch.randn(1,vocab_size,dtype=torch.float16))
defforward(self,input_ids:torch.Tensor):
logits=input_ids.unsqueeze(-1).to(self.lm_head.dtype)@self.lm_head
probs=logits.softmax(-1)
returnprobs
分布式优化器
我们要实现一个分布式优化器用来更新student模型的参数。因为我们使用的是数据并行,梯度分散在每张卡上,在更新模型参数前,需要先进行一次all_reduce,把所有梯度加在一起。这里优化器需要传入一个通信组参数,包含所有学生rank,因为teacher模型没有优化器,不参与all_reduce。
classDistrubutedAdam(torch.optim.Adam):
def__init__(self,*args,group=None,**kwargs):
self.group=group
super().__init__(*args,**kwargs)
defstep(self,closure=None):
ifclosureisnotNone:
closure.mean().backward()
forgroupinself.param_groups:
forparamingroup['params']:
ifparam.gradisnotNone:
torch.distributed.all_reduce(param.grad,group=self.group)
super().step()
创建通信组
根据我们的任务目标,我们需要2种通信组:
我们这样规定我们的rank划分方式:
通信组的创建方式如下:
data_parallel_group=None
teacher_student_group=None
defcreate_group():
rank=torch.distributed.get_rank()
world_size=torch.distributed.get_world_size()
assertworld_size%2==0
student_ranks=list(range(world_size//2))
teacher_ranks=list(range(world_size//2,world_size))
globaldata_parallel_group
globalteacher_student_group
forranksin[student_ranks,teacher_ranks]:
group=torch.distributed.new_group(ranks,backend='nccl')
ifrankinranks:
data_parallel_group=group
forranksinzip(student_ranks,teacher_ranks):
group=torch.distributed.new_group(ranks,backend='nccl')
ifrankinranks:
teacher_student_group=group
数据发送&接收
teacher模型计算出的prob要发送给student模型。每条数据只需被一个teacher模型计算,teacher模型计算结果也只需要发给一个student,所以这里用p2p通信。我们的输入序列长度是随机变化的,所以这里需要用到上面提到的动态 shape 的 p2p通信。之前说p2p算子用的是send和recv,但是这两个算子是同步算子,这里我们用异步算子。
tensor发送
先发送shape,再发送真实tensor:
defsend_tensor(tensor,dst):
shape=torch.tensor(tensor.shape,dtype=torch.int64,device='cuda')
ops=[]
ops.append(torch.distributed.P2POp(
torch.distributed.isend,
shape,
dst,
group=teacher_student_group
))
reqs=torch.distributed.batch_isend_irecv(ops)
forreqinreqs:
req.wait()
ops=[]
ops.append(torch.distributed.P2POp(
torch.distributed.isend,
tensor,
dst,
group=teacher_student_group
))
reqs=torch.distributed.batch_isend_irecv(ops)
forreqinreqs:
req.wait()
tensor接收
这里实现的简单点,tensor的ndim和dtype作为参数传入
defrecv_tensor(src,ndim,dtype):
shape=torch.empty(ndim,dtype=torch.int64,device='cuda')
ops=[]
ops.append(torch.distributed.P2POp(
torch.distributed.irecv,
shape,
src,
group=teacher_student_group
))
reqs=torch.distributed.batch_isend_irecv(ops)
forreqinreqs:
req.wait()
tensor=torch.empty(torch.Size(shape),dtype=dtype,device='cuda')
ops=[]
ops.append(torch.distributed.P2POp(
torch.distributed.irecv,
tensor,
src,
group=teacher_student_group
))
reqs=torch.distributed.batch_isend_irecv(ops)
forreqinreqs:
req.wait()
returntensor
注:异步p2p操作是先创建算子(op),再批量执行(batch_isend_irecv),可以增加并行度。比如在megatron的pp平行中,向后一张卡发送计算结果和接收后一张卡回传梯度是同时进行的。
主函数
上面各个重要模块的功能已经实现完了,最后还剩下主函数。首先还是环境初始化,设置默认cuda设备。然后创建通信组,定义vocab size为20,计算teacher rank的 offset:
if__name__=='__main__':
rank=int(os.getenv('RANK','0'))
world_size=int(os.getenv('WORLD_SIZE','1'))
torch.distributed.init_process_group(rank=rank,world_size=world_size,backend='nccl')
devices=torch.cuda.device_count()
torch.cuda.set_device(rank%devices)
create_group()
vocab_size=20
teacher_offset=world_size//2
下面是student的逻辑:
ifrank<teacher_offset:
#student
torch.random.manual_seed(1)
dataloader=Dataloader(1,200,vocab_size)
model=Model(vocab_size).half().cuda()
optimizer=DistrubutedAdam(model.parameters(),lr=1e-2,group=data_parallel_group,eps=1e-4)
fori,input_idsinenumerate(dataloader):
ifi%teacher_offset!=rank:
continue
optimizer.zero_grad()
input_ids=input_ids.cuda()
send_tensor(input_ids,rank+teacher_offset)
student_probs=model(input_ids)
teacher_probs=recv_tensor(rank+teacher_offset,3,torch.float16)
kl_loss=teacher_probs*((teacher_probs+1e-5).log()-(student_probs+1e-5).log())
kl_loss=kl_loss.sum(-1).mean()/torch.distributed.get_world_size(data_parallel_group)
kl_loss.backward()
optimizer.step()
reporting_kl_loss=kl_loss.clone()
torch.distributed.all_reduce(reporting_kl_loss,group=data_parallel_group)
print(f'rank:{rank}reportingklloss:{reporting_kl_loss}klloss:{kl_loss}weight:{model.lm_head.data[0,:2]}',flush=True)
torch.distributed.barrier(group=data_parallel_group)
ifi>=10:
break
else:
#teacher
下面是teacher
一般所有rank的随机种子我们都设成一样的就行,这里故意把teacher的随机种子设成不一样的,避免teacher和student计算出来的prob完全一致,kl始终为0。
ifrank<teacher_offset:
#student
else:
#teacher
torch.random.manual_seed(2)
model=Model(vocab_size).half().cuda()
model.eval()
whileTrue:
input_ids=recv_tensor(rank-teacher_offset,2,torch.int64)
teacher_probs=model(input_ids)
send_tensor(teacher_probs,rank-teacher_offset)
运行后观察到这样的输出:把训练10条数据退出的逻辑去掉,应该能看到loss很快降到1e-4以下,这样demo就算完成了。
这个demo并不是最优的蒸馏框架。首先teacher和student的参数量不一样,且teacher不进行反向传播,因此两者的计算速度不一样,一个teacher配一个student效率并不一定高。可能会需要几个teacher对1个student,或者1个teacher对几个student的情况,这是完全体框架要实现的。其次,目前主流模型的词表大小在15万左右,训练数据的长度一般是8k,也就是最后的teacher_probs 是一个 8k * 150k的float张量,通信成本太高。一种优化策略是sample一部分,比如取top-n,或者放回、不放回采样。另一个策略是把teacher的lm_head层放到student所在的rank。在计算logits之前,把teacher的hidden_states通信过来,在student本地乘lm_head算出logits。
register_hook虽然不是分布式相关的功能,但基本每个框架都会用到。register_hook 的作用是在参数或算子上注册一个回调函数,当该参数或算子的梯度计算完成,但还没有赋值给grad的时候调用。如果回调函数有返回值,会使用返回值替换原本的梯度。
importtorch
defprint_grad(grad):
print(grad)
returngrad/2
w=torch.nn.Parameter(torch.randn(2,2))
w.register_hook(print_grad)
loss=(w-1)**2
print('beforebackward')
loss.mean().backward()
print('afterbackward')
print(w.grad)
上面这段代码会打印出下面这样的内容:用0替换掉梯度里的nan值是一些文章介绍register_hook给出的例子,但是实际编程我不推荐这么做。我建议遇到nan直接抛出异常,不要改成某个安全值然后更新模型,否则模型出了问题完全无法定位。
register_hook不仅可以把回调函数注册在参数上,还可以注册在算子上,这也是各个框架对register_hook的主要用法。比如下面这个操作,就是注册在了加法算子上:
importtorch
defparameter_hook(grad):
print('parameterhook')
defoperator_hook(*grads):
print('operatorhook')
w=torch.nn.Parameter(torch.randn(2,2))
w.register_hook(parameter_hook)
print('first')
y=w+1
op1=y.grad_fn
print(op1)
op1.register_hook(operator_hook)
y.sum().backward()
print('second')
z=w+1
op2=z.grad_fn
print(op2)
z.sum().backward()
运行时你会看到如下结果:算子一般都是一次性的,且是先执行算子的回调再执行参数的回调。但是有一个特殊的算子是梯度累积算子,它的回调函数发生在参数的回调函数之后,且这个算子不会每次都创建新的。
importtorch
defparameter_hook(grad):
print('parameterhook')
defoperator_hook(*grads):
print('operatorhook')
w=torch.nn.Parameter(torch.randn(2,2))
w.register_hook(parameter_hook)
y=w+1
op=y.grad_fn.next_functions[0][0]
print(op)
op.register_hook(operator_hook)
print('first')
y.sum().backward()
print('second')
z=w+1
op2=z.grad_fn.next_functions[0][0]
print(op2)
z.sum().backward()
很多框架会围绕着梯度累计算子的这个特性展开。为了获得梯度累积算子,需要创建一个计算图。一般用expand_as,这个计算结果的grad_fn指向的是expand_as自己,next_functions指向的是上一个算子,也就是梯度累积算子:
grad_acc_op=w.expand_as(w).grad_fn.next_functions[0][0]
然后可以利用闭包注册一个hook,让hook能够直接访问参数而不仅仅是梯度。
defmake_grad_hook(param):
defhook(*grads):
print(param.grad)
returnhook
grad_acc_op.register_hook(make_grad_hook(w))
这样就可以做一些骚操作了。
我们可以简单看下megatron和deepspeed的代码,看看他们是怎么用的。下面这段是megatron的用法:megatron的优化器并不使用param.grad,而是自己在参数上注册了一个param.main_grad,用它来累积梯度。这个main_grad不会删除,只会累积和清0。然后在最后一个microbatch的时候做allreduce,而不是等优化器来做:deepspeed也有类似的逻辑:deepspeed因为要支持连续内存和cpu offload,逻辑更加复杂,会等积攒的grad数量够了一批一批的操作:会使用不同事件流(stream)来增加计算和梯度累积的重叠度:使用了锁页内存,并且会在cpu和gpu之间来回来去传递张量来进行计算。有兴趣的读者可以自己研究一下,deepspeed的代码我一直没机会仔细读一遍。
在理解torch分布式训练时,多进程这个概念是一直伴随我们左右的。使用torchrun启动脚本,就是以多进程方式启动脚本。这里我们还可以再深入了解一下torch与多进程。
首先来看看python原生的多进程启动方式:
importmultiprocessingasmp
defmain(rank,world):
print(rank,world)
if__name__=='__main__':
world_size=4
ps=[mp.Process(None,main,args=(rank,world_size))forrankinrange(world_size)]
forpinps:
p.start()
forpinps:
p.join()
这种多进程能不能拿来作为torch的分布式训练环境呢?当然是可以的,只需要这样操作:
importmultiprocessingasmp
importtorch
importtorch.distributed
defmain(rank,world_size,master_addr='127.0.0.1',master_port=29500):
init_method=f'tcp://{master_addr}:{master_port}'
torch.distributed.init_process_group(rank=rank,world_size=world_size,init_method=init_method,backend='nccl')
print(f'rank:{torch.distributed.get_rank()}world_size:{torch.distributed.get_world_size()}')
if__name__=='__main__':
world_size=4
ps=[mp.Process(None,main,args=(rank,world_size))forrankinrange(world_size)]
forpinps:
p.start()
forpinps:
p.join()
这里init_method就是用来在初始化阶段,实现进程发现的方法,除了tcp,还可以用本地文件发现或者环境变量。除了multiprocessing 的多进程,用subprocess 的多进程也是一样可以的。当然torch也提供了一种启动多进程的方法:
importtorch
defmain(rank,world_size,master_addr='127.0.0.1',master_port=29500):
init_method=f'tcp://{master_addr}:{master_port}'
torch.distributed.init_process_group(rank=rank,world_size=world_size,init_method=init_method,backend='nccl')
print(f'rank:{torch.distributed.get_rank()}world_size:{torch.distributed.get_world_size()}')
if__name__=='__main__':
world_size=4
torch.multiprocessing.spawn(main,args=(world_size,),nprocs=world_size)
你也可以在多进程里再创建子进程再初始化环境:
importmultiprocessingasmp
importtorch
importtorch.distributed
defsub_process(rank,world_size,master_addr='127.0.0.1',master_port=29500):
init_method=f'tcp://{master_addr}:{master_port}'
torch.distributed.init_process_group(rank=rank,world_size=world_size,init_method=init_method,backend='nccl')
torch.distributed.barrier()
print(f'rank:{torch.distributed.get_rank()}world_size:{torch.distributed.get_world_size()}')
defmain(rank,world_size,master_addr='127.0.0.1',master_port=29500):
init_method=f'tcp://{master_addr}:{master_port}'
process=mp.Process(None,sub_process,args=(rank+world_size,2*world_size,))
process.start()
torch.distributed.init_process_group(rank=rank,world_size=2*world_size,init_method=init_method,backend='nccl')
torch.distributed.barrier()
print(f'rank:{torch.distributed.get_rank()}world_size:{torch.distributed.get_world_size()}')
if__name__=='__main__':
world_size=2
ps=[mp.Process(None,main,args=(rank,world_size))forrankinrange(world_size)]
forpinps:
p.start()
forpinps:
p.join()
在torch的多进程中,再次启动子进程有一点需要注意的地方。那就是如果在启动子进程之前触发了任何与cuda相关的操作,比如使用了set_device,或者在cuda上创建了一个张量,那么子进程中就不能再使用cuda。比如下面这段代码:
importmultiprocessingasmp
importtorch
importtorch.distributed
defsub_process():
tensor=torch.tensor([2]).cuda(0)
if__name__=='__main__':
torch.cuda.set_device(0)
process=mp.Process(None,sub_process)
process.start()
process.join()
运行时会报错:这个报错的意思是,cuda环境只能初始化一次,并且与进程绑定。Linux上创建的子进程默认使用的是fork的方式。fork创建的子进程会继承父进程的内存空间,因此已经绑定了父进程的cuda环境被继承给了子进程,子进程使用cuda就会报错。报错中要求子进程以spawn方式启动,是因为spawn方式启动的子进程使用的是全新的解释器,cuda还处于未初始化的状态。
这里用的时候需要权衡清楚,究竟需不需要子进程继承父进程的内存,以及是否需要在子进程使用cuda。子进程如果用spawn方式启动不继承父进程,可能需要单独初始化分布式环境,父进程的全局变量子进程也用不了。如果用fork方式启动继承父进程内存,意味着继承了父进程创建的各种变量,以及父进程初始化过的分布式环境但是不能用cuda。另外需要注意的是,就算用fork方式启动,子进程也继承不了父进程创建的通信组,但是会继承“通信组的创建顺序”。意思是如果rank0顺序创建了5个group,rank1创建的3个group,然后用fork方式启动了一个子进程,子进程又创建了2个,这2个会去对应rank0的第4、5个group。
说了这么多,多进程好像挺麻烦,那他相比torchrun有啥好处呢?
好处就是可以更加灵活的使用init_process_group初始化环境,以区分不同角色。比如上面的我们的demo中的这个蒸馏场景,我们是4个rank,分成2个student2个teacher,通信还不是很复杂。那如果student和teacher不是各占一卡,而是用了3d混合并行占很多卡,相互之间还有tp、pp的通信,通信逻辑就很复杂了。我们可以考虑给teacher和student分别用一套不同的rank、world_size个init_method初始化,让他们在这个分布式环境中只能看见自己这个角色的进程,这样就只需要实现自己的3d混合并行就可以了。
再比如如果你想在自己的训练环境中引入一个VLLM模型。VLLM内部是会调用init_process_group创建自己的环境,使用自己的rank和world_size来实现TP并行的,和你训练环境的init_process_group是冲突的。这个时候使用自定义的多进程,就可以减小VLLM的干扰。
问题来了,每个角色都使用独立的分布式环境,相互之间怎么通信呢?这就是最后的部分,TorchRPC。
TorchRPC原本的用法是在本地创建远程变量的引用,在本地调用远程函数。但是我觉得这种编程不灵活且抽象,堪比tf的静态图,所以我这里不把rpc当作远程调用,只把他当作对p2p算子的封装,以及除init_process_group之外第二种建立rank间通信的方式。
rpc的初始化和分布式环境很像。rpc的初始化和init_process_group可以同时存在,且可以使用不同的rank和world_size:
importmultiprocessingasmp
importtorch
defmain(rank,world_size):
torch.distributed.init_process_group(rank=0,world_size=1,backend='nccl',init_method=f'tcp://127.0.0.1:{29500+rank}')
options=torch.distributed.rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30001')
torch.distributed.rpc.init_rpc(f'worker-{rank}',rank=rank,world_size=world_size,rpc_backend_options=options)
print(f'rank:{torch.distributed.get_rank()}',
f'world_size:{torch.distributed.get_world_size()}',
f'{torch.distributed.rpc.get_worker_info()}')
torch.distributed.rpc.shutdown()
if__name__=='__main__':
world_size=4
ps=[mp.Process(None,main,args=(rank,world_size))forrankinrange(world_size)]
forpinps:
p.start()
forpinps:
p.join()
直接用python运行,打印出如下内容:这里需要注意一点,因为我们每个rank都独立各自初始化分布式环境,互不干扰,因此init_method的port要换一下。
我们在简单重写一下之前的蒸馏demo,主要为了演示在这种用法下如何使用rpc通信。
首先定义模型和模型的调用函数,再定义一个全局变量
importmultiprocessingasmp
importtorch
importtorch.distributed
fromtorch.distributedimportrpc
classModel:
def__call__(self,tensor)->torch.Any:
returntensor+1
defcall_model(tensor):
returnmodel(tensor)
model=None
定义teacher的逻辑:
defteacher(rank,world_size):
torch.distributed.init_process_group(rank=0,world_size=1,backend='nccl',init_method=f'tcp://127.0.0.1:{29500+rank}')
options=rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000')
globalmodel
model=Model()
rpc.init_rpc('teacher',rank=rank,world_size=world_size,rpc_backend_options=options)
rpc.shutdown()
先初始化分布式环境,然后创建模型,赋值给全局变量,然后再初始化rpc,确保rpc初始化后模型一定已经准备好了,最后shutdown等待。这里初始化分布式环境只是模拟一下,RPC本身并不依赖分布式环境。
然后定义student的逻辑:
defstudent(rank,world_size):
torch.distributed.init_process_group(rank=0,world_size=1,backend='nccl',init_method=f'tcp://127.0.0.1:{29500+rank}')
options=rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000')
rpc.init_rpc('student',rank=rank,world_size=world_size,rpc_backend_options=options)
input_ids=torch.randn(4)
teacher_probs=rpc.rpc_async('teacher',call_model,args=(input_ids,))
#这里student计算
student_probs=input_ids
loss=teacher_probs.wait()-student_probs
print(loss)
rpc.shutdown()
student也是先初始化分布式环境和rpc,然后模拟一下输入数据input_ids。然后通过rpc.rpc_async异步远程调用teacher进程的call_model函数。此时teacher进程的全局变量应该已经有值了,call_model可以正常返回。这里使用异步调用,不需要等待结果,直接继续。下面就是假装student在计算,得到student_probs,然后计算一下差值,使用teacher_probs.wait() 等待远程调用的结果,不出意外应该等于全1向量。
最后是启动和划分角色的代码:
defmain(rank,world_size):
teacher_offset=world_size//2
ifrank<teacher_offset:
student(rank,world_size)
else:
teacher(rank,world_size)
if__name__=='__main__':
world_size=2
ps=[mp.Process(None,main,args=(rank,world_size))forrankinrange(world_size)]
forpinps:
p.start()
forpinps:
p.join()
RPC后端对NVlink、IB等都是支持的,也支持传递cuda张量,只需要在初始化rpc环境时,指定一下本机cuda和远程cuda的映射。
比如你可以在配置rpc后端时这样设置:
rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000',device_maps={'teacher':{0:1}})
这个表示把teacher进程的cuda1映射到本地的cuda0,这样本地cuda0张量通信到远端时就会被放到cuda1,不需要移动到cpu。
最后再说一下,RPC的官方用法是远程调用和远程引用,可以去看官网教程。
目前已经在Megatron-RPC框架下实现了SFT、DPO、Distillation和on-policy RS,性能持平P2P通信,远超一些非IB、NVlink的通信方案,证明RPC方案确实可行。RPC是torch1.4版本就引入的特性,支持远程引用(本地创建一个模型,但是占用远程机器的显存)、链式异步调用(以异步函数的方式调用远程模型,且调用过程中支持继续调用其他远程模型)和自动求导(远程调用返回的结果可以求导,梯度传递给远程模型)。不得不感慨torch确实有前瞻性,以前都没太关注过这个特性。