sunty
2020-04-16 b7ce0659f5e04952c4ea8a6ebb714c5615017c3b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package middleware
 
import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "strconv"
    "strings"
    "swfs/config"
    "swfs/tools/es"
    "swfs/tools/script"
    "sync"
    "time"
)
 
//作为主节点加入(默认包含数据节点)
func AsMaster(role string) bool {
    es.AddNewMasterToPeers()
    nowPeers := es.GetNowPeersList()
    fmt.Println("nowPeers: ", nowPeers)
    RequestNodesOperation(nowPeers, role)
    return true
}
 
//作为数据节点加入
func AsVolume() bool {
    nowPeers := es.GetNowPeersList()
    if nowPeers == nil || len(nowPeers) == 0 {
        return false
    }
    script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsVolume, config.Option, config.Server.ScriptPath, config.StartServerScript)
    script.ReplaceLineContentBySearch(es.GetNewPeers(), config.Peer, config.Server.ScriptPath, config.StartServerScript)
    script.StartServer(config.Server.ScriptPath)
    return true
}
 
//作为 主+数据 节点加入
func AsMaVo(role string) {
    AsMaster(role)
    script.ReplaceLineContentBySearch(config.Option+config.StartScriptAsMaVo, config.Option, config.Server.ScriptPath, config.StartServerScript)
}
 
//更新所有节点的脚本参数
func UpdateAllNodesScriptArgument(peersIp []string) {
    fmt.Println("开始更新本地配置文件")
    for _, ip := range peersIp {
        fmt.Println("ip: ", ip)
        url := "http://" + ip + ":7020/node/api-v/swfs/updateSWFSService"
        fmt.Println("url", url)
        resp, _ := http.Get(url)
        fmt.Println("更新返回状态:", resp.StatusCode)
        if resp.StatusCode == 200 {
            fmt.Println("请求完毕", resp.StatusCode)
        }
    }
}
 
//请求作为当前角色节点操作流程
func RequestNodesOperation(nowPeers []interface{}, role string) {
    peersIp := make([]string, 0)
    for _, val := range nowPeers {
        peersIp = append(peersIp, strings.Split(val.(string), ":")[0])
    }
    coreBaseUnit, _ := strconv.Atoi(config.Server.CoreBaseUnit)
    UpdateAllNodesScriptArgument(peersIp)
    replaceContent := ""
    if role == "master" {
        replaceContent = config.Option + config.StartScriptAsMaster
    } else if role == "master+volume" {
        replaceContent = config.Option + config.StartScriptAsMaVo
    }
    script.ReplaceLineContentBySearch(replaceContent, config.Option, config.Server.ScriptPath, config.StartServerScript)
    RestartAllServer(peersIp, coreBaseUnit)
}
 
//重启所有节点服务并验证
func Restart(ip string, timeOut int) {
    url := "http://" + ip + ":7020/node/api-v/swfs/restartServer"
    var info interface{}
    httpRes, _ := http.Get(url)
    body, _ := ioutil.ReadAll(httpRes.Body)
    json.Unmarshal(body, &info)
    res, ok := info.(map[string]interface{})
    if !ok {
        fmt.Println("http response interface can not change map[string]interface{}")
    }
    fmt.Println("res: ", res)
    startupItem := res["data"].(string)
    if httpRes.StatusCode != 200 {
        return
    }
    fmt.Println("Restart startupItem: ", startupItem)
    tick := time.Tick(1 * time.Second)
    fmt.Println("准备开始验证节点服务")
    for countdown := timeOut; countdown > 0; countdown-- {
        fmt.Println("第", countdown, "次验证")
        result := Verification(startupItem, ip)
        fmt.Println("第一次验证result结果:", result)
        if result == true {
            break
        }
        <-tick
    }
    fmt.Println("验证完毕")
}
 
//验证服务状态
func Verification(startupItem string, ip string) bool {
    resStatu := false
    fmt.Println("Verification startupItem: ", startupItem)
    switch startupItem {
    case config.StartScriptAsVolume:
        verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
        _, volume1Err := http.Get(verificationVolumeUrl)
        fmt.Println("volume1Err", volume1Err)
        if volume1Err == nil {
            resStatu = true
        }
    case config.StartScriptAsMaster:
        verificationMasterUrl := "http://" + ip + ":6333"
        fmt.Println("verificationMasterUrl: ", verificationMasterUrl)
        _, masterErr := http.Get(verificationMasterUrl)
        fmt.Println("masterErr", masterErr)
        if masterErr == nil {
            resStatu = true
        }
    case config.StartScriptAsMaVo:
        verificationMasterUrl := "http://" + ip + ":6333"
        verificationVolumeUrl := "http://" + ip + ":6700/ui/index.html"
        _, masterErr := http.Get(verificationMasterUrl)
        fmt.Println("masterErr", masterErr)
        _, volume1Err := http.Get(verificationVolumeUrl)
        fmt.Println("volume1Err", volume1Err)
        if masterErr == nil && volume1Err == nil {
            resStatu = true
        }
    }
    return resStatu
}
 
//获取本地启动项
func GetLocalStartupItem(scriptPath string, scriptFile string) string {
    startupItem := script.GetNowLineContent(scriptPath+"/"+scriptFile, config.Option)
    fmt.Println("startupItem: ", startupItem)
    return startupItem
}
 
//构建重启流程
func RestartAllServer(peersIp []string, coreBaseUnit int) {
    fmt.Println("开始构建重启流程")
    coreThread := len(peersIp)/coreBaseUnit + 1
    masterIp := make([]string, 0)
    timeOut, _ := strconv.Atoi(config.Server.TimeOut)
    var waitGroup sync.WaitGroup
    fmt.Println("当前并行度coreThread:", coreThread)
    for i, ip := range peersIp {
        fmt.Println("重启当前组服务" + ip)
        if (i+1)%coreThread == 0 {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            fmt.Println("加入第一组并开始验证第一组 masterIp: ", masterIp)
            for i := 0; i < len(masterIp); i++ {
                fmt.Println("len masterIp: ", len(masterIp))
                fmt.Println("第" + strconv.Itoa(i) + "个线程")
                fmt.Println("当前goroutinebe")
                waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
                fmt.Println("当前goroutineaf")
                go Restart(masterIp[i], timeOut)
                waitGroup.Done()
            }
            fmt.Println("这里为阻塞!!!!!")
            waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
            fmt.Println("当前组任务完成")
            masterIp = make([]string, 0)
            fmt.Println("清空当前组成员:", masterIp)
        } else {
            masterIp = append(masterIp, strings.Split(ip, ":")[0])
            if len(peersIp) == i+1 {
                var waitGroup sync.WaitGroup
                for i := 0; i < len(masterIp); i++ {
                    waitGroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1
                    go Restart(masterIp[i], timeOut)
                    waitGroup.Done()
                }
                waitGroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞
                masterIp = make([]string, 0)
                break
            }
        }
 
    }
    fmt.Println("服务流程执行完毕")
 
}