From a79ff0adc62e2d9e3e198d7299e7670063201607 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期四, 14 五月 2020 20:48:39 +0800 Subject: [PATCH] rm no use import --- agent.go | 141 ++++++++++++++++++++++++++++------------------ 1 files changed, 85 insertions(+), 56 deletions(-) diff --git a/agent.go b/agent.go index 45f9ab7..f5bb23b 100644 --- a/agent.go +++ b/agent.go @@ -36,7 +36,6 @@ "github.com/hashicorp/serf/serf" //"github.com/apache/servicecomb-service-center/pkg/log" "log" - "runtime/debug" ) const ( @@ -62,9 +61,9 @@ } // Create create serf agent with config -func Create(conf *Config) (*Agent, error) { +func Create(conf *Config, snapshotPath string) (*Agent, error) { // config cover to serf config - serfConf, err := conf.convertToSerf() + serfConf, err := conf.convertToSerf(snapshotPath) if err != nil { return nil, err } @@ -96,7 +95,7 @@ // Start agent func (a *Agent) Start(ctx context.Context) { log.Println("aaaaaaaaa") - log.Println(string(debug.Stack())) + //log.Println(string(debug.Stack())) //debug.PrintStack() err := a.Agent.Start() if err != nil { @@ -130,6 +129,26 @@ receiveSqlInterface = c } +type DbHandler interface { + Execute(sql string) bool +} + +type DbDumpHandler interface { + Dump(tables string) string +} + +//鏁版嵁搴揺xecute鍙ユ焺 +var DbHandle DbHandler +//鏁版嵁搴撳浠藉彞鏌�,鍙湪syncdb涓敤鍒� +var dbDumpH DbDumpHandler + +func RegisterDbHandler(h DbHandler) { + DbHandle = h +} + +func RegisterDbDumpHandler(h DbDumpHandler) { + dbDumpH = h +} // HandleEvent Handles serf.EventMemberJoin events, // which will wait for members to join until the number of group members is equal to "groupExpect" @@ -148,9 +167,6 @@ return } if sqlUe.Owner != a.conf.NodeName { - ExecuteWriteSql(sqlUe.Sql, true) - //flag, _ := ExecuteSqlByGorm(sqlUe.Sql) - //fmt.Println("userEvent exec 1 ", sqlUe.Sql, ",Result:", results) if receiveSqlInterface != nil && sqlUe.Sql !=nil && len(sqlUe.Sql) > 0 { receiveSqlInterface.Forward(sqlUe.Sql[0]) } @@ -192,27 +208,18 @@ //fmt.Println(string(ev.Payload)) var tableNames []string err := json.Unmarshal(ev.Payload, &tableNames) - if err !=nil { - //fmt.Println("Query tableNames unmarshal err") - return - } + fmt.Println("androidSync tableNames:", tableNames, "err:", err) - rows, err := ExecuteQuerySql(tableNames) - if err != nil { - //fmt.Println("err: ", err) - return - } - var rowsReturn []Rows - for _, r := range rows { - rowsReturn = append(rowsReturn, *r) - } + dumpData := dbDumpH.Dump(strings.Join(tableNames, ",")) - bytesReturn, err := json.Marshal(rowsReturn) - //fmt.Println("results: ", bytesReturn) + bytesReturn := []byte(dumpData) + fmt.Println("androidSync len(bytesReturn): ", len(bytesReturn)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(bytesReturn); err != nil { - //fmt.Printf("err: %s\n", err) + fmt.Println("androidSync query.Respond err:", err) return + } else { + fmt.Println("androidSync query.Respond success") } } } @@ -222,9 +229,8 @@ leaveMember := ev.Members[0] leaveSql := "delete from cluster_node where node_id='" + leaveMember.Name + "'" //fmt.Println("leaveSql:", leaveSql) - _, err := ExecuteWriteSql([]string{leaveSql}, false) - if err != nil { - //fmt.Printf("err: %s\n", err) + if !DbHandle.Execute(leaveSql) { + fmt.Println("DbHandle.Execute ret false") } //fmt.Println("EventMemberLeave,current Members:", ev.Members) @@ -439,29 +445,18 @@ //GetDbFromCluster get the newest database after join cluster //dbPathWrite the path where to write after got a database, -func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]string, error) { +func (a *Agent) GetTableDataFromCluster(tableNames []string) (*string, error) { //members: get name of first member - mbs := a.GroupMembers(a.conf.ClusterID) - var specmembername string - for _, m := range mbs { - //fmt.Println("m", m) - if m.Name != a.conf.NodeName { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad - if strings.HasPrefix(a.conf.NodeName, "DSVAD"){ - if strings.HasPrefix(m.Name, "DSVAD") { - specmembername = m.Name - break - } - }else{ - specmembername = m.Name - break - } - } + targetName := a.GetOneAliveNodeId(20 * time.Second) + + if targetName == "" { + return nil, errors.New("alive node not found in cluster") } - //fmt.Println("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "specmembername:", specmembername) + //fmt.Println("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "targetName:", targetName) //query: get db file. params := serf.QueryParam{ - FilterNodes: strings.Fields(specmembername), + FilterNodes: strings.Fields(targetName), } //SQL @@ -469,24 +464,25 @@ resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { - //fmt.Println("err: ", err) + //fmt.Println("QueryEventUpdateDBData err: ") } - //fmt.Println("Query.resp.err:", err, "resp:", resp) + fmt.Println("Query.resp.err:", err, "resp:", resp) var wg sync.WaitGroup wg.Add(1) - var dumpSqls []string + var dumpSqls string go func() { defer wg.Done() respCh := resp.ResponseCh() for { select { case r := <-respCh: - //fmt.Println("Query response's len:", len(r.Payload)) - err := json.Unmarshal(r.Payload, &dumpSqls) - if err == nil { - //fmt.Println("dumpSql:", dumpSqls) - //fmt.Println("data dump success") + fmt.Println("Query response's len:", len(r.Payload)) + dumpSqls = string(r.Payload) + if len(dumpSqls) >0 { + fmt.Println("data dump success") + } else { + fmt.Println("Query response.len = 0") } return } @@ -494,6 +490,39 @@ }() wg.Wait() return &dumpSqls, nil +} + +func (a *Agent) GetOneAliveNodeId(timeOut time.Duration) string { + nodeId := "" + sTk := time.NewTicker(1 * time.Second) + outTk := time.NewTicker(timeOut) + defer sTk.Stop() + defer outTk.Stop() + + for { + select { + case <- sTk.C: + mbs := a.GroupMembers(a.conf.ClusterID) + for _, m := range mbs { + //fmt.Println("m", m) + if m.Name != a.conf.NodeName { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad + if strings.HasPrefix(a.conf.NodeName, "PAD"){ + if strings.HasPrefix(m.Name, "PAD") { + nodeId = m.Name + goto Out + } + } + } + } + case <-outTk.C: + fmt.Println("cannot get alive node from cluster,time out") + goto Out + default: + + } + } +Out: + return nodeId } type SqlUserEvent struct { @@ -520,8 +549,8 @@ } //Init serf Init -func Init(clusterID string, password string, nodeID string, addrs []string) (*Agent, error) { - agent, err := InitNode(clusterID, password, nodeID) +func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) { + agent, err := InitNode(clusterID, password, nodeID, snapshotPath) if err != nil { //fmt.Printf("InitNode failed, error: %s", err) return agent, err @@ -537,7 +566,7 @@ } //InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝 -func InitNode(clusterID string, password string, nodeID string) (*Agent, error) { +func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) { conf := DefaultConfig() //fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) conf.ClusterID = clusterID @@ -553,7 +582,7 @@ } conf.EncryptKey = password } - agent, err := Create(conf) + agent, err := Create(conf, snapshotPath) if err != nil { //fmt.Printf("create agent failed, error: %s", err) return agent, err -- Gitblit v1.8.0