Scheaven
2021-09-18 291deeb1fcf45dbf39a24aa72a213ff3fd6b3405
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
B
©r_ûã@sÔdZddlZddlZddlZddlZddlZddlmZ    dZ
e dœdd„Z e dœdd„Z e dœdd    „Ze dœd
d „Zedœd d „Zdd„Ze ¡dd„ƒZdd„Zdd„Zddd„Zd dd„Zdd„Zd!dd„ZdS)"zl
This file contains primitives for multi-gpu communication.
This is useful when doing distributed training.
éN)ÚreturncCs t ¡s dSt ¡sdSt ¡S)Né)ÚdistÚ is_availableÚis_initializedÚget_world_size©rrú./utils/comm.pyrs
rcCs t ¡s dSt ¡sdSt ¡S)Nr)rrrÚget_rankrrrr    r
s
r
cCs0t ¡s dSt ¡sdStdk    s$t‚tjtdS)zh
    Returns:
        The rank of the current process within the local (per-machine) process group.
    rN)Úgroup)rrrÚ_LOCAL_PROCESS_GROUPÚAssertionErrorr
rrrr    Úget_local_rank$s  rcCs$t ¡s dSt ¡sdStjtdS)zw
    Returns:
        The size of the per-machine process group,
        i.e. the number of processes per machine.
    r)r )rrrrr rrrr    Úget_local_size1s
rcCs
tƒdkS)Nr)r
rrrr    Úis_main_process>srcCs8t ¡s dSt ¡sdSt ¡}|dkr,dSt ¡dS)zj
    Helper function to synchronize (barrier) among all processes when
    using distributed training
    Nr)rrrrÚbarrier)Ú
world_sizerrr    Ú synchronizeBsrcCs$t ¡dkrtjddStjjSdS)zj
    Return a process group based on gloo backend, containing all the ranks
    The result is cached.
    ÚncclÚgloo)ÚbackendN)rÚ get_backendÚ    new_groupr ÚWORLDrrrr    Ú_get_global_gloo_groupQs  rcCsŒt |¡}|dkst‚t |dkr&dnd¡}t |¡}t|ƒdkrjt     t
¡}|  d  t ƒt|ƒd|¡¡tj |¡}t |¡j|d}|S)N)rrrÚcpuÚcudai@z;Rank {} trying to all-gather {:.2f} GB of data on device {})Údevice)rrr ÚtorchrÚpickleÚdumpsÚlenÚloggingÚ    getLoggerÚ__name__ÚwarningÚformatr
Ú ByteStorageÚ from_bufferÚ
ByteTensorÚto)Údatar rrÚbufferÚloggerÚstorageÚtensorrrr    Ú_serialize_to_tensor]s
 
 
 r0cs®tj|d}|dkstdƒ‚tjˆ ¡gtjˆjd}‡fdd„t|ƒDƒ}tj    |||ddd„|Dƒ}t
|ƒ}||kr¦tj ||ftj ˆjd}tj ˆ|fdd    ‰|ˆfS)
zz
    Returns:
        list[int]: size of the tensor, on each rank
        Tensor: padded tensor that has the max size
    )r rzHcomm.gather/all_gather must be called from ranks within the given group!)Údtypercs"g|]}tjdgtjˆjd‘qS)r)r1r)rÚzerosÚint64r)Ú.0Ú_)r/rr    ú
<listcomp>{sz*_pad_to_largest_tensor.<locals>.<listcomp>cSsg|]}t| ¡ƒ‘qSr)ÚintÚitem)r4Úsizerrr    r6~sr)Údim)rrr rr/Únumelr3rÚrangeÚ
all_gatherÚmaxr2Úuint8Úcat)r/r rZ
local_sizeÚ    size_listÚmax_sizeÚpaddingr)r/r    Ú_pad_to_largest_tensoros 
rDcs¾tƒdkr|gS|dkrtƒ}t |¡dkr2|gSt||ƒ‰tˆ|ƒ\}‰t|ƒ‰‡‡fdd„|Dƒ}tj|ˆ|dg}x>t||ƒD]0\}‰ˆ ¡     ¡ 
¡d|…}|  t   |¡¡q†W|S)a;
    Run all_gather on arbitrary picklable data (not necessarily tensors).
    Args:
        data: any picklable object
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.
    Returns:
        list[data]: list of data gathered from each rank
    rNcs"g|]}tjˆftjˆjd‘qS))r1r)rÚemptyr?r)r4r5)rBr/rr    r6¢szall_gather.<locals>.<listcomp>)r )rrrr0rDr>r=ÚziprÚnumpyÚtobytesÚappendrÚloads)r+r rAÚ tensor_listÚ    data_listr9r,r)rBr/r    r=Šs 
 
 
r=c    sðtƒdkr|gS|dkrtƒ}tj|ddkr4|gStj|d}t||ƒ‰tˆ|ƒ\}‰||krÖt|ƒ‰‡‡fdd„|Dƒ}tjˆ|||dg}x>t||ƒD]0\}‰ˆ     ¡ 
¡  ¡d|…}|  t  |¡¡qžW|Stjˆg||dgSdS)aŒ
    Run gather on arbitrary picklable data (not necessarily tensors).
    Args:
        data: any picklable object
        dst (int): destination rank
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.
    Returns:
        list[data]: on dst, a list of data gathered from each rank. Otherwise,
            an empty list.
    rN)r cs"g|]}tjˆftjˆjd‘qS))r1r)rrEr?r)r4r5)rBr/rr    r6Észgather.<locals>.<listcomp>)Údstr )rrrr
r0rDr>ÚgatherrFrrGrHrIrrJ)    r+rMr ÚrankrArKrLr9r,r)rBr/r    rN®s(
 
rNcCstj d¡}t|ƒ}|dS)zü
    Returns:
        int: a random number that is the same across all workers.
            If workers need a shared RNG, they can use this shared seed to
            create one.
    All workers must call this function, otherwise it will deadlock.
    lr)ÚnpÚrandomÚrandintr=)ÚintsZall_intsrrr    Úshared_random_seed×s rTTc    Cs¨tƒ}|dkr|St ¡„g}g}x,t| ¡ƒD]}| |¡| ||¡q2Wtj|dd}tj|ddt     ¡dkr†|r†||}dd„t
||ƒDƒ}WdQRX|S)ac
    Reduce the values in the dictionary from all processes so that process with rank
    0 has the reduced results.
    Args:
        input_dict (dict): inputs to be reduced. All the values must be scalar CUDA Tensor.
        average (bool): whether to do average or sum
    Returns:
        a dict with the same keys as input_dict, after reduction.
    ér)r:)rMcSsi|]\}}||“qSrr)r4ÚkÚvrrr    ú
<dictcomp>þszreduce_dict.<locals>.<dictcomp>N) rrÚno_gradÚsortedÚkeysrIÚstackrÚreducer
rF)Z
input_dictÚaveragerÚnamesÚvaluesrVZ reduced_dictrrr    Ú reduce_dictäs
 
 
ra)N)rN)T)Ú__doc__Ú    functoolsr"rGrPrrÚtorch.distributedÚ distributedrr r7rr
rrÚboolrrÚ    lru_cacherr0rDr=rNrTrarrrr    Ú<module>s(    
$
)