From a79ff0adc62e2d9e3e198d7299e7670063201607 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期四, 14 五月 2020 20:48:39 +0800 Subject: [PATCH] rm no use import --- agent.go | 357 +++++++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 234 insertions(+), 123 deletions(-) diff --git a/agent.go b/agent.go index 1f84e47..f5bb23b 100644 --- a/agent.go +++ b/agent.go @@ -25,6 +25,8 @@ "io/ioutil" "net" "os" + "strconv" + "sync" //"os" "strings" @@ -37,8 +39,10 @@ ) const ( - QueryEventGetDB = "GetDatabase" - QueryEventUpdateDBData = "UpdateDBData" + QueryEventGetDB = "GetDatabase" + QueryEventUpdateDBData = "UpdateDBData" + UserEventSyncSql = "SyncSql" + UserEventSyncDbTablePersonCache = "SyncCache" ) // Agent warps the serf agent @@ -57,15 +61,15 @@ } // Create create serf agent with config -func Create(conf *Config) (*Agent, error) { +func Create(conf *Config, snapshotPath string) (*Agent, error) { // config cover to serf config - serfConf, err := conf.convertToSerf() + serfConf, err := conf.convertToSerf(snapshotPath) if err != nil { return nil, err } // create serf agent with serf config - fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey) + //fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey) serfAgent, err := agent.Create(conf.Config, serfConf, nil) if err != nil { return nil, err @@ -73,12 +77,12 @@ // Create the keyring keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey)) if err != nil { - fmt.Printf("Failed to restore keyring: %s", err) + //fmt.Printf("Failed to restore keyring: %s", err) return nil, err } serfConf.MemberlistConfig.Keyring = keyring - fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s", - len(conf.EncryptKey), conf.EncryptKey) + //fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s", + // len(conf.EncryptKey), conf.EncryptKey) return &Agent{ Agent: serfAgent, @@ -90,6 +94,9 @@ // Start agent func (a *Agent) Start(ctx context.Context) { + log.Println("aaaaaaaaa") + //log.Println(string(debug.Stack())) + //debug.PrintStack() err := a.Agent.Start() if err != nil { log.Println(err, "start serf agent failed") @@ -109,6 +116,40 @@ go a.BroadcastMemberlist(BroadcastInterval * time.Second) } +// define call back interface +type ReceiveSqlInterface interface { + Forward(content string) +} + +// callback instance +var receiveSqlInterface ReceiveSqlInterface + +// save the callback instance +func RegisterReceiveSqlInterface(c ReceiveSqlInterface) { + receiveSqlInterface = c +} + +type DbHandler interface { + Execute(sql string) bool +} + +type DbDumpHandler interface { + Dump(tables string) string +} + +//鏁版嵁搴揺xecute鍙ユ焺 +var DbHandle DbHandler +//鏁版嵁搴撳浠藉彞鏌�,鍙湪syncdb涓敤鍒� +var dbDumpH DbDumpHandler + +func RegisterDbHandler(h DbHandler) { + DbHandle = h +} + +func RegisterDbDumpHandler(h DbDumpHandler) { + dbDumpH = h +} + // HandleEvent Handles serf.EventMemberJoin events, // which will wait for members to join until the number of group members is equal to "groupExpect" // when the startup mode is "ModeCluster", @@ -117,17 +158,19 @@ switch ev := event.(type) { case serf.UserEvent: - var sqlUe SqlUserEvent - fmt.Println(string(ev.Payload)) - err := json.Unmarshal(ev.Payload, &sqlUe) - if err !=nil { - fmt.Println("sqlUe unmarshal err:",err) - return - } - if sqlUe.Owner != a.conf.NodeName { - results, _ := ExecuteWriteSql(sqlUe.Sql) - //flag, _ := ExecuteSqlByGorm(sqlUe.Sql) - fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",results) + if ev.Name == UserEventSyncSql { + var sqlUe SqlUserEvent + //fmt.Println(string(ev.Payload)) + err := json.Unmarshal(ev.Payload, &sqlUe) + if err != nil { + //fmt.Println("sqlUe unmarshal err:", err) + return + } + if sqlUe.Owner != a.conf.NodeName { + if receiveSqlInterface != nil && sqlUe.Sql !=nil && len(sqlUe.Sql) > 0 { + receiveSqlInterface.Forward(sqlUe.Sql[0]) + } + } } case *serf.Query: @@ -136,89 +179,68 @@ //bak file and send resp filename, err := BakDbFile() if err != nil { - fmt.Println("bak db file error!") + //fmt.Println("bak db file error!") return } - fmt.Println(filename) + //fmt.Println(filename) filebuf, err := ioutil.ReadFile(filename) - fmt.Println("filebuf: ", len(filebuf)) + //fmt.Println("filebuf: ", len(filebuf)) if err != nil { - fmt.Printf("file to []bytes error: %s\n", err) + //fmt.Printf("file to []bytes error: %s\n", err) return } err = os.Remove(filename) if err != nil { - fmt.Printf("remove file%s\n failed", filename) + //fmt.Printf("remove file%s\n failed", filename) return } - fmt.Println("query payload: ", len(ev.Payload)) + //fmt.Println("query payload: ", len(ev.Payload)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(filebuf); err != nil { - fmt.Printf("err: %s\n", err) + //fmt.Printf("err: %s\n", err) return } } } else if ev.Name == QueryEventUpdateDBData { - fmt.Println(string(ev.Payload)) - var tmpstringslice []string - tmpstringslice = append(tmpstringslice, string(ev.Payload)) - fmt.Println(tmpstringslice) - rows, err := ExecuteQuerySql(tmpstringslice) - if err != nil { - fmt.Println("err: ", err) - return - } - var rowsReturn []Rows - for _, r := range rows { - rowsReturn = append(rowsReturn, *r) - } + //fmt.Println(string(ev.Payload)) + var tableNames []string + err := json.Unmarshal(ev.Payload, &tableNames) + fmt.Println("androidSync tableNames:", tableNames, "err:", err) - bytesReturn, err := json.Marshal(rowsReturn) - fmt.Println("results: ", bytesReturn) + dumpData := dbDumpH.Dump(strings.Join(tableNames, ",")) + + bytesReturn := []byte(dumpData) + fmt.Println("androidSync len(bytesReturn): ", len(bytesReturn)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(bytesReturn); err != nil { - fmt.Printf("err: %s\n", err) + fmt.Println("androidSync query.Respond err:", err) return + } else { + fmt.Println("androidSync query.Respond success") } } - - //var res []*Rows - //json.Unmarshal(bytesReturn, &res) } - case serf.MemberEvent: - if event.EventType() == serf.EventMemberLeave { - if ev.Members !=nil && len(ev.Members) ==1 { - leaveMember := ev.Members[0] - leaveSql := "delete from cluster_node where node_id='"+leaveMember.Name+"'" - fmt.Println("leaveSql:",leaveSql) - //ExecuteSqlByGorm([]string{ leaveSql }) - - fmt.Println("EventMemberLeave,current Members:",ev.Members) + case serf.MemberEvent: + if event.EventType() == serf.EventMemberLeave { + if ev.Members != nil && len(ev.Members) == 1 { + leaveMember := ev.Members[0] + leaveSql := "delete from cluster_node where node_id='" + leaveMember.Name + "'" + //fmt.Println("leaveSql:", leaveSql) + if !DbHandle.Execute(leaveSql) { + fmt.Println("DbHandle.Execute ret false") } - return - } + //fmt.Println("EventMemberLeave,current Members:", ev.Members) + } + return + } default: - fmt.Printf("Unknown event type: %s\n", ev.EventType().String()) + //fmt.Printf("Unknown event type: %s\n", ev.EventType().String()) } - - //if event.EventType() != serf.EventMemberJoin { - // fmt.Printf("event.EventType() != serf.EventMemberJoin") - // return - //} - // - //if a.conf.Mode == ModeCluster { - // if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect { - // fmt.Printf("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect") - // return - // } - //} - //a.DeregisterEventHandler(a) - //close(a.readyCh) } func (a *Agent) BroadcastMemberlist(delay time.Duration) { @@ -226,7 +248,7 @@ serf := a.Agent.Serf() mb := serf.LocalMember() mblist := serf.Memberlist() - fmt.Println("mb:", mb) + //fmt.Println("mb:", mb) // copy local node localNode := *mblist.LocalNode() @@ -247,7 +269,7 @@ //localNode.Addr = net.IPv4(255,255,255,255) localNode.Port = BroadcastPort for { - // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress) + // //fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress) mblist.SendBestEffort(&localNode, []byte(message)) time.Sleep(delay) } @@ -385,7 +407,7 @@ break } } - fmt.Println(specmembername) + //fmt.Println(specmembername) //query: get db file. params := serf.QueryParam{ @@ -394,7 +416,7 @@ resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { - fmt.Println("err: ", err) + //fmt.Println("err: ", err) } go func() { @@ -402,18 +424,18 @@ for { select { case r := <-respCh: - fmt.Println("x length is: ", len(r.Payload)) + //fmt.Println("x length is: ", len(r.Payload)) // // byte to file. SerfDbConn.Close() SerfDbConn = nil err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644) if err != nil { - fmt.Println("query byte to file error!", err) + //fmt.Println("query byte to file error!", err) } err := InitDbConn("") if err != nil { - fmt.Println("create db conn of test.db error: ", err) + //fmt.Println("create db conn of test.db error: ", err) } return } @@ -421,39 +443,122 @@ }() } -type SqlUserEvent struct { - Owner string `json:"owner"` - Sql []string `json:"sql"` +//GetDbFromCluster get the newest database after join cluster +//dbPathWrite the path where to write after got a database, +func (a *Agent) GetTableDataFromCluster(tableNames []string) (*string, error) { + //members: get name of first member + targetName := a.GetOneAliveNodeId(20 * time.Second) + + if targetName == "" { + return nil, errors.New("alive node not found in cluster") + } + //fmt.Println("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "targetName:", targetName) + + //query: get db file. + params := serf.QueryParam{ + FilterNodes: strings.Fields(targetName), + } + + //SQL + tBytes, _ := json.Marshal(tableNames) + + resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) + if err == nil || !strings.Contains(err.Error(), "cannot contain") { + //fmt.Println("QueryEventUpdateDBData err: ") + } + fmt.Println("Query.resp.err:", err, "resp:", resp) + + var wg sync.WaitGroup + wg.Add(1) + var dumpSqls string + go func() { + defer wg.Done() + respCh := resp.ResponseCh() + for { + select { + case r := <-respCh: + fmt.Println("Query response's len:", len(r.Payload)) + dumpSqls = string(r.Payload) + if len(dumpSqls) >0 { + fmt.Println("data dump success") + } else { + fmt.Println("Query response.len = 0") + } + return + } + } + }() + wg.Wait() + return &dumpSqls, nil } + +func (a *Agent) GetOneAliveNodeId(timeOut time.Duration) string { + nodeId := "" + sTk := time.NewTicker(1 * time.Second) + outTk := time.NewTicker(timeOut) + defer sTk.Stop() + defer outTk.Stop() + + for { + select { + case <- sTk.C: + mbs := a.GroupMembers(a.conf.ClusterID) + for _, m := range mbs { + //fmt.Println("m", m) + if m.Name != a.conf.NodeName { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad + if strings.HasPrefix(a.conf.NodeName, "PAD"){ + if strings.HasPrefix(m.Name, "PAD") { + nodeId = m.Name + goto Out + } + } + } + } + case <-outTk.C: + fmt.Println("cannot get alive node from cluster,time out") + goto Out + default: + + } + } +Out: + return nodeId +} + +type SqlUserEvent struct { + Owner string `json:"owner"` + Sql []string `json:"sql"` +} + //SyncSql boardcast sql to cluster func (a *Agent) SyncSql(sqlOp []string) { // event : use to send command to operate db. var sqlUe = SqlUserEvent{ Owner: a.conf.NodeName, - Sql: sqlOp, + Sql: sqlOp, } ueB, err := json.Marshal(sqlUe) - if err !=nil { - fmt.Println("sqlUE marshal err:",err) + if err != nil { + //fmt.Println("sqlUE marshal err:", err) return } - err = a.UserEvent("SyncSql", []byte(ueB), false) + err = a.UserEvent(UserEventSyncSql, ueB, false) if err == nil || !strings.Contains(err.Error(), "cannot contain") { - fmt.Println("err: ", err) + //fmt.Println("err: ", err) } } //Init serf Init -func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) { - agent, err := InitNode(clusterID, password, nodeID) +func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) { + agent, err := InitNode(clusterID, password, nodeID, snapshotPath) if err != nil { - fmt.Printf("InitNode failed, error: %s", err) + //fmt.Printf("InitNode failed, error: %s", err) return agent, err } - err = agent.JoinByNodeIP(ips) + err = agent.JoinByNodeAddrs(addrs) if err != nil { - fmt.Printf("JoinByNodeIP failed, error: %s", err) + //fmt.Printf("JoinByNodeIP failed, error: %s", err) return agent, err } @@ -461,9 +566,9 @@ } //InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝 -func InitNode(clusterID string, password string, nodeID string) (*Agent, error) { +func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) { conf := DefaultConfig() - fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) + //fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) conf.ClusterID = clusterID conf.NodeName = nodeID if password == "" { @@ -477,9 +582,9 @@ } conf.EncryptKey = password } - agent, err := Create(conf) + agent, err := Create(conf, snapshotPath) if err != nil { - fmt.Printf("create agent failed, error: %s", err) + //fmt.Printf("create agent failed, error: %s", err) return agent, err } @@ -489,53 +594,59 @@ agent.ShutdownCh() }() time.Sleep(time.Second) - fmt.Println("Stats:", agent.Agent.Serf().Stats()) - fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) - fmt.Println("create agent sucess!!") + //fmt.Println("Stats:", agent.Agent.Serf().Stats()) + //fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) + //fmt.Println("create agent sucess!!") return agent, nil } -func (a *Agent) JoinByNodeIP(ips []string) error { +func (a *Agent) JoinByNodeAddrs(addrs []string) error { var nodes []string - if len(ips) == 0 { + if len(addrs) == 0 { return fmt.Errorf("No Nodes To Join!") } - for _, ip := range ips { - node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) - nodes = append(nodes, node) + for _, addr := range addrs { + nodes = append(nodes, addr) } - n, err := a.Agent.Join(nodes, true) - if err != nil || n == 0 { - //a.Stop() - //fmt.Println("Stop node") - return fmt.Errorf("Error Encrypt Key!") - } + a.Agent.Join(nodes, true) - return err + return nil } -type Node struct { - clusterID string - NodeID string - IP string - isAlive int //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4 -} +//func (a *Agent) JoinByNodeIP(ips []string) error { +// var nodes []string +// +// if len(ips) == 0 { +// return fmt.Errorf("No Nodes To Join!") +// } +// for _, ip := range ips { +// node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) +// nodes = append(nodes, node) +// } +// +// n, err := a.Agent.Join(nodes, true) +// if err != nil || n == 0 { +// return fmt.Errorf("Error Encrypt Key!") +// } +// +// return err +//} -func (a *Agent) GetNodes() (nodes []Node) { - var node Node - fmt.Println("a.conf.ClusterID:", a.conf.ClusterID) +func (a *Agent) GetNodes() (nodes []NodeInfo) { + var node NodeInfo + //fmt.Println("a.conf.ClusterID:", a.conf.ClusterID) mbs := a.GroupMembers(a.conf.ClusterID) for _, mb := range mbs { node.NodeID = mb.Name - node.IP = mb.Addr.String() - node.isAlive = int(mb.Status) - node.clusterID = mb.Tags[tagKeyClusterID] + node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port)) + node.IsAlive = int(mb.Status) + node.ClusterID = mb.Tags[tagKeyClusterID] nodes = append(nodes, node) } return nodes -} +} \ No newline at end of file -- Gitblit v1.8.0