From aaa2b8f734e8a2f8cc4d57a8e8adfc2fe5da77d9 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期三, 13 五月 2020 11:50:34 +0800 Subject: [PATCH] rm ExecuteSql when recv UserEvent --- agent.go | 144 +++++++++++++++++++++++++++-------------------- 1 files changed, 83 insertions(+), 61 deletions(-) diff --git a/agent.go b/agent.go index f6657d6..c3b488c 100644 --- a/agent.go +++ b/agent.go @@ -36,6 +36,7 @@ "github.com/hashicorp/serf/serf" //"github.com/apache/servicecomb-service-center/pkg/log" "log" + "runtime/debug" ) const ( @@ -61,15 +62,15 @@ } // Create create serf agent with config -func Create(conf *Config) (*Agent, error) { +func Create(conf *Config, snapshotPath string) (*Agent, error) { // config cover to serf config - serfConf, err := conf.convertToSerf() + serfConf, err := conf.convertToSerf(snapshotPath) if err != nil { return nil, err } // create serf agent with serf config - fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey) + //fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey) serfAgent, err := agent.Create(conf.Config, serfConf, nil) if err != nil { return nil, err @@ -77,12 +78,12 @@ // Create the keyring keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey)) if err != nil { - fmt.Printf("Failed to restore keyring: %s", err) + //fmt.Printf("Failed to restore keyring: %s", err) return nil, err } serfConf.MemberlistConfig.Keyring = keyring - fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s", - len(conf.EncryptKey), conf.EncryptKey) + //fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s", + // len(conf.EncryptKey), conf.EncryptKey) return &Agent{ Agent: serfAgent, @@ -94,6 +95,9 @@ // Start agent func (a *Agent) Start(ctx context.Context) { + log.Println("aaaaaaaaa") + log.Println(string(debug.Stack())) + //debug.PrintStack() err := a.Agent.Start() if err != nil { log.Println(err, "start serf agent failed") @@ -113,6 +117,20 @@ go a.BroadcastMemberlist(BroadcastInterval * time.Second) } +// define call back interface +type ReceiveSqlInterface interface { + Forward(content string) +} + +// callback instance +var receiveSqlInterface ReceiveSqlInterface + +// save the callback instance +func RegisterReceiveSqlInterface(c ReceiveSqlInterface) { + receiveSqlInterface = c +} + + // HandleEvent Handles serf.EventMemberJoin events, // which will wait for members to join until the number of group members is equal to "groupExpect" // when the startup mode is "ModeCluster", @@ -123,16 +141,19 @@ case serf.UserEvent: if ev.Name == UserEventSyncSql { var sqlUe SqlUserEvent - fmt.Println(string(ev.Payload)) + //fmt.Println(string(ev.Payload)) err := json.Unmarshal(ev.Payload, &sqlUe) if err != nil { - fmt.Println("sqlUe unmarshal err:", err) + //fmt.Println("sqlUe unmarshal err:", err) return } if sqlUe.Owner != a.conf.NodeName { - results, _ := ExecuteWriteSql(sqlUe.Sql, true) + //ExecuteWriteSql(sqlUe.Sql, true) //flag, _ := ExecuteSqlByGorm(sqlUe.Sql) - fmt.Println("userEvent exec ", sqlUe.Sql, ",Result:", results) + //fmt.Println("userEvent exec 1 ", sqlUe.Sql, ",Result:", results) + if receiveSqlInterface != nil && sqlUe.Sql !=nil && len(sqlUe.Sql) > 0 { + receiveSqlInterface.Forward(sqlUe.Sql[0]) + } } } @@ -142,39 +163,43 @@ //bak file and send resp filename, err := BakDbFile() if err != nil { - fmt.Println("bak db file error!") + //fmt.Println("bak db file error!") return } - fmt.Println(filename) + //fmt.Println(filename) filebuf, err := ioutil.ReadFile(filename) - fmt.Println("filebuf: ", len(filebuf)) + //fmt.Println("filebuf: ", len(filebuf)) if err != nil { - fmt.Printf("file to []bytes error: %s\n", err) + //fmt.Printf("file to []bytes error: %s\n", err) return } err = os.Remove(filename) if err != nil { - fmt.Printf("remove file%s\n failed", filename) + //fmt.Printf("remove file%s\n failed", filename) return } - fmt.Println("query payload: ", len(ev.Payload)) + //fmt.Println("query payload: ", len(ev.Payload)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(filebuf); err != nil { - fmt.Printf("err: %s\n", err) + //fmt.Printf("err: %s\n", err) return } } } else if ev.Name == QueryEventUpdateDBData { - fmt.Println(string(ev.Payload)) - var tmpstringslice []string - tmpstringslice = append(tmpstringslice, string(ev.Payload)) - fmt.Println(tmpstringslice) - rows, err := ExecuteQuerySql(tmpstringslice) + //fmt.Println(string(ev.Payload)) + var tableNames []string + err := json.Unmarshal(ev.Payload, &tableNames) + if err !=nil { + //fmt.Println("Query tableNames unmarshal err") + return + } + + rows, err := ExecuteQuerySql(tableNames) if err != nil { - fmt.Println("err: ", err) + //fmt.Println("err: ", err) return } var rowsReturn []Rows @@ -183,35 +208,32 @@ } bytesReturn, err := json.Marshal(rowsReturn) - fmt.Println("results: ", bytesReturn) + //fmt.Println("results: ", bytesReturn) if query, ok := event.(*serf.Query); ok { if err := query.Respond(bytesReturn); err != nil { - fmt.Printf("err: %s\n", err) + //fmt.Printf("err: %s\n", err) return } } - - //var res []*Rows - //json.Unmarshal(bytesReturn, &res) } case serf.MemberEvent: if event.EventType() == serf.EventMemberLeave { if ev.Members != nil && len(ev.Members) == 1 { leaveMember := ev.Members[0] leaveSql := "delete from cluster_node where node_id='" + leaveMember.Name + "'" - fmt.Println("leaveSql:", leaveSql) + //fmt.Println("leaveSql:", leaveSql) _, err := ExecuteWriteSql([]string{leaveSql}, false) if err != nil { - fmt.Printf("err: %s\n", err) + //fmt.Printf("err: %s\n", err) } - fmt.Println("EventMemberLeave,current Members:", ev.Members) + //fmt.Println("EventMemberLeave,current Members:", ev.Members) } return } default: - fmt.Printf("Unknown event type: %s\n", ev.EventType().String()) + //fmt.Printf("Unknown event type: %s\n", ev.EventType().String()) } } @@ -220,7 +242,7 @@ serf := a.Agent.Serf() mb := serf.LocalMember() mblist := serf.Memberlist() - fmt.Println("mb:", mb) + //fmt.Println("mb:", mb) // copy local node localNode := *mblist.LocalNode() @@ -241,7 +263,7 @@ //localNode.Addr = net.IPv4(255,255,255,255) localNode.Port = BroadcastPort for { - // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress) + // //fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress) mblist.SendBestEffort(&localNode, []byte(message)) time.Sleep(delay) } @@ -379,7 +401,7 @@ break } } - fmt.Println(specmembername) + //fmt.Println(specmembername) //query: get db file. params := serf.QueryParam{ @@ -388,7 +410,7 @@ resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { - fmt.Println("err: ", err) + //fmt.Println("err: ", err) } go func() { @@ -396,18 +418,18 @@ for { select { case r := <-respCh: - fmt.Println("x length is: ", len(r.Payload)) + //fmt.Println("x length is: ", len(r.Payload)) // // byte to file. SerfDbConn.Close() SerfDbConn = nil err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644) if err != nil { - fmt.Println("query byte to file error!", err) + //fmt.Println("query byte to file error!", err) } err := InitDbConn("") if err != nil { - fmt.Println("create db conn of test.db error: ", err) + //fmt.Println("create db conn of test.db error: ", err) } return } @@ -422,7 +444,7 @@ mbs := a.GroupMembers(a.conf.ClusterID) var specmembername string for _, m := range mbs { - fmt.Println("m", m) + //fmt.Println("m", m) if m.Name != a.conf.NodeName { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad if strings.HasPrefix(a.conf.NodeName, "DSVAD"){ if strings.HasPrefix(m.Name, "DSVAD") { @@ -435,7 +457,7 @@ } } } - fmt.Println("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "specmembername:", specmembername) + //fmt.Println("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "specmembername:", specmembername) //query: get db file. params := serf.QueryParam{ @@ -447,9 +469,9 @@ resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { - fmt.Println("err: ", err) + //fmt.Println("err: ", err) } - fmt.Println("Query.resp.err:", err, "resp:", resp) + //fmt.Println("Query.resp.err:", err, "resp:", resp) var wg sync.WaitGroup wg.Add(1) @@ -460,11 +482,11 @@ for { select { case r := <-respCh: - fmt.Println("Query response's len:", len(r.Payload)) + //fmt.Println("Query response's len:", len(r.Payload)) err := json.Unmarshal(r.Payload, &dumpSqls) if err == nil { - fmt.Println("dumpSql:", dumpSqls) - fmt.Println("data dump success") + //fmt.Println("dumpSql:", dumpSqls) + //fmt.Println("data dump success") } return } @@ -488,26 +510,26 @@ } ueB, err := json.Marshal(sqlUe) if err != nil { - fmt.Println("sqlUE marshal err:", err) + //fmt.Println("sqlUE marshal err:", err) return } err = a.UserEvent(UserEventSyncSql, ueB, false) if err == nil || !strings.Contains(err.Error(), "cannot contain") { - fmt.Println("err: ", err) + //fmt.Println("err: ", err) } } //Init serf Init -func Init(clusterID string, password string, nodeID string, addrs []string) (*Agent, error) { - agent, err := InitNode(clusterID, password, nodeID) +func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) { + agent, err := InitNode(clusterID, password, nodeID, snapshotPath) if err != nil { - fmt.Printf("InitNode failed, error: %s", err) + //fmt.Printf("InitNode failed, error: %s", err) return agent, err } err = agent.JoinByNodeAddrs(addrs) if err != nil { - fmt.Printf("JoinByNodeIP failed, error: %s", err) + //fmt.Printf("JoinByNodeIP failed, error: %s", err) return agent, err } @@ -515,9 +537,9 @@ } //InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝 -func InitNode(clusterID string, password string, nodeID string) (*Agent, error) { +func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) { conf := DefaultConfig() - fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) + //fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) conf.ClusterID = clusterID conf.NodeName = nodeID if password == "" { @@ -531,9 +553,9 @@ } conf.EncryptKey = password } - agent, err := Create(conf) + agent, err := Create(conf, snapshotPath) if err != nil { - fmt.Printf("create agent failed, error: %s", err) + //fmt.Printf("create agent failed, error: %s", err) return agent, err } @@ -543,9 +565,9 @@ agent.ShutdownCh() }() time.Sleep(time.Second) - fmt.Println("Stats:", agent.Agent.Serf().Stats()) - fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) - fmt.Println("create agent sucess!!") + //fmt.Println("Stats:", agent.Agent.Serf().Stats()) + //fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) + //fmt.Println("create agent sucess!!") return agent, nil } @@ -586,7 +608,7 @@ func (a *Agent) GetNodes() (nodes []NodeInfo) { var node NodeInfo - fmt.Println("a.conf.ClusterID:", a.conf.ClusterID) + //fmt.Println("a.conf.ClusterID:", a.conf.ClusterID) mbs := a.GroupMembers(a.conf.ClusterID) for _, mb := range mbs { node.NodeID = mb.Name @@ -598,4 +620,4 @@ } return nodes -} +} \ No newline at end of file -- Gitblit v1.8.0