Scheaven
2021-09-18 291deeb1fcf45dbf39a24aa72a213ff3fd6b3405
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
U
©r_ûã@sÔdZddlZddlZddlZddlZddlZddlmZ    dZ
e dœdd„Z e dœdd„Z e dœdd    „Ze dœd
d „Zedœd d „Zdd„Ze ¡dd„ƒZdd„Zdd„Zddd„Zd dd„Zdd„Zd!dd„ZdS)"zl
This file contains primitives for multi-gpu communication.
This is useful when doing distributed training.
éN)ÚreturncCs t ¡s dSt ¡sdSt ¡S)Né)ÚdistÚ is_availableÚis_initializedÚget_world_size©rrú./utils/comm.pyrs
rcCs t ¡s dSt ¡sdSt ¡S©Nr)rrrÚget_rankrrrr    r s
r cCs0t ¡s dSt ¡sdStdk    s$t‚tjtdS)zh
    Returns:
        The rank of the current process within the local (per-machine) process group.
    rN©Úgroup)rrrÚ_LOCAL_PROCESS_GROUPÚAssertionErrorr rrrr    Úget_local_rank$s  rcCs$t ¡s dSt ¡sdStjtdS)zw
    Returns:
        The size of the per-machine process group,
        i.e. the number of processes per machine.
    rr )rrrrrrrrr    Úget_local_size1s
rcCs
tƒdkSr
)r rrrr    Úis_main_process>srcCs8t ¡s dSt ¡sdSt ¡}|dkr,dSt ¡dS)zj
    Helper function to synchronize (barrier) among all processes when
    using distributed training
    Nr)rrrrÚbarrier)Ú
world_sizerrr    Ú synchronizeBsrcCs$t ¡dkrtjddStjjSdS)zj
    Return a process group based on gloo backend, containing all the ranks
    The result is cached.
    ÚncclÚgloo)ÚbackendN)rÚ get_backendÚ    new_groupr ÚWORLDrrrr    Ú_get_global_gloo_groupQs  rcCsŒt |¡}|dkst‚t |dkr&dnd¡}t |¡}t|ƒdkrjt     t
¡}|  d  t ƒt|ƒd|¡¡tj |¡}t |¡j|d}|S)N)rrrÚcpuÚcudai@z;Rank {} trying to all-gather {:.2f} GB of data on device {})Údevice)rrrÚtorchrÚpickleÚdumpsÚlenÚloggingÚ    getLoggerÚ__name__ÚwarningÚformatr Ú ByteStorageÚ from_bufferÚ
ByteTensorÚto)Údatar rrÚbufferÚloggerÚstorageÚtensorrrr    Ú_serialize_to_tensor]s 
 
 
 
ÿÿ r2cs®tj|d}|dkstdƒ‚tjˆ ¡gtjˆjd}‡fdd„t|ƒDƒ}tj    |||ddd„|Dƒ}t
|ƒ}||kr¦tj ||ftj ˆjd}tj ˆ|fdd    ‰|ˆfS)
zz
    Returns:
        list[int]: size of the tensor, on each rank
        Tensor: padded tensor that has the max size
    r rzHcomm.gather/all_gather must be called from ranks within the given group!©Údtypercs"g|]}tjdgtjˆjd‘qS)rr3)r ÚzerosÚint64r©Ú.0Ú_©r1rr    Ú
<listcomp>zsz*_pad_to_largest_tensor.<locals>.<listcomp>cSsg|]}t| ¡ƒ‘qSr)ÚintÚitem)r8Úsizerrr    r;~sr©Údim)rrrr r1Únumelr6rÚrangeÚ
all_gatherÚmaxr5Úuint8Úcat)r1r rZ
local_sizeÚ    size_listÚmax_sizeÚpaddingrr:r    Ú_pad_to_largest_tensoros  ÿþ
ÿrJcsºtƒdkr|gS|dkrtƒ}t |¡dkr2|gSt||ƒ‰tˆ|ƒ\}‰t|ƒ‰‡‡fdd„|Dƒ}tj|ˆ|dg}t||ƒD]0\}‰ˆ ¡     ¡ 
¡d|…}|  t   |¡¡q„|S)a;
    Run all_gather on arbitrary picklable data (not necessarily tensors).
    Args:
        data: any picklable object
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.
    Returns:
        list[data]: list of data gathered from each rank
    rNcs"g|]}tjˆftjˆjd‘qS©r3©r ÚemptyrErr7©rHr1rr    r;¡szall_gather.<locals>.<listcomp>r )rrrr2rJrDrCÚziprÚnumpyÚtobytesÚappendr!Úloads)r-r rGÚ tensor_listÚ    data_listr>r.rrNr    rCŠs$
 
 
 ÿrCc    sìtƒdkr|gS|dkrtƒ}tj|ddkr4|gStj|d}t||ƒ‰tˆ|ƒ\}‰||krÒt|ƒ‰‡‡fdd„|Dƒ}tjˆ|||dg}t||ƒD]0\}‰ˆ     ¡ 
¡  ¡d|…}|  t  |¡¡qœ|Stjˆg||dgSdS)aŒ
    Run gather on arbitrary picklable data (not necessarily tensors).
    Args:
        data: any picklable object
        dst (int): destination rank
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.
    Returns:
        list[data]: on dst, a list of data gathered from each rank. Otherwise,
            an empty list.
    rNr cs"g|]}tjˆftjˆjd‘qSrKrLr7rNrr    r;Èszgather.<locals>.<listcomp>)Údstr )rrrr r2rJrDÚgatherrOrrPrQrRr!rS)    r-rVr ÚrankrGrTrUr>r.rrNr    rW®s,
 
 ÿrWcCstj d¡}t|ƒ}|dS)zü
    Returns:
        int: a random number that is the same across all workers.
            If workers need a shared RNG, they can use this shared seed to
            create one.
    All workers must call this function, otherwise it will deadlock.
    lr)ÚnpÚrandomÚrandintrC)ÚintsZall_intsrrr    Úshared_random_seed×s r]Tc    Cs¤tƒ}|dkr|St ¡€g}g}t| ¡ƒD]}| |¡| ||¡q0tj|dd}tj|ddt     ¡dkr‚|r‚||}dd„t
||ƒDƒ}W5QRX|S)ac
    Reduce the values in the dictionary from all processes so that process with rank
    0 has the reduced results.
    Args:
        input_dict (dict): inputs to be reduced. All the values must be scalar CUDA Tensor.
        average (bool): whether to do average or sum
    Returns:
        a dict with the same keys as input_dict, after reduction.
    érr?)rVcSsi|]\}}||“qSrr)r8ÚkÚvrrr    Ú
<dictcomp>þszreduce_dict.<locals>.<dictcomp>) rr Úno_gradÚsortedÚkeysrRÚstackrÚreducer rO)Z
input_dictÚaveragerÚnamesÚvaluesr_Z reduced_dictrrr    Ú reduce_dictäs
 
 
rj)N)rN)T)Ú__doc__Ú    functoolsr$rPrYr!r Ztorch.distributedÚ distributedrrr<rr rrÚboolrrÚ    lru_cacherr2rJrCrWr]rjrrrr    Ú<module>s*   
 
$
)