/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package syncdb import ( "context" "encoding/json" "errors" "fmt" "github.com/hashicorp/memberlist" "io/ioutil" "net" "os" "strconv" "sync" //"os" "strings" "time" "basic.com/valib/serf.git/serf" "basic.com/valib/serf.git/cmd/serf/command/agent" //"github.com/apache/servicecomb-service-center/pkg/log" "basic.com/valib/logger.git" ) const ( QueryEventGetDB = "GetDatabase" QueryEventUpdateDBData = "UpdateDBData" UserEventSyncSql = "SyncSql" UserEventSyncDbTablePersonCache = "SyncCache" ) // Agent warps the serf agent type Agent struct { *agent.Agent conf *Config readyCh chan struct{} errorCh chan error } type NodeInfo struct { ClusterID string `json:"clusterID"` NodeID string `json:"nodeID"` NodeAddress string `json:"nodeAddress"` IsAlive int `json:"isAlive"` } // Create create serf agent with config func Create(conf *Config, snapshotPath string) (*Agent, error) { // config cover to serf config serfConf, err := conf.convertToSerf(snapshotPath) if err != nil { return nil, err } // create serf agent with serf config logger.Info("conf.Config.EncryptKey:", conf.EncryptKey) serfAgent, err := agent.Create(conf.Config, serfConf, logger.GetLogFile()) if err != nil { return nil, err } // Create the keyring keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey)) if err != nil { logger.Error("Failed to restore keyring: %s", err) return nil, err } serfConf.MemberlistConfig.Keyring = keyring logger.Info("[INFO] agent: Restored keyring with %d keys from %s", len(conf.EncryptKey), conf.EncryptKey) return &Agent{ Agent: serfAgent, conf: conf, readyCh: make(chan struct{}), errorCh: make(chan error), }, nil } // Start agent func (a *Agent) Start(ctx context.Context) { err := a.Agent.Start() if err != nil { logger.Error(err, "start serf agent failed") a.errorCh <- err return } a.RegisterEventHandler(a) err = a.retryJoin(ctx) if err != nil { logger.Error(err, "start serf agent failed") if err != ctx.Err() && a.errorCh != nil { a.errorCh <- err } } go a.BroadcastMemberlist(BroadcastInterval * time.Second) } var SyncDbTablePersonCacheChan = make(chan []byte,512) // HandleEvent Handles serf.EventMemberJoin events, // which will wait for members to join until the number of group members is equal to "groupExpect" // when the startup mode is "ModeCluster", // used for logical grouping of serf nodes func (a *Agent) HandleEvent(event serf.Event) { switch ev := event.(type) { case serf.UserEvent: if ev.Name == UserEventSyncSql { logger.Info("receive a UserEventSyncSql event") var sqlUe SqlUserEvent err := json.Unmarshal(ev.Payload, &sqlUe) if err !=nil { logger.Error("sqlUe unmarshal err:",err) return } logger.Info("ev.LTime:", ev.LTime ,"owner:", sqlUe.Owner, "sql:", sqlUe.Sql) if sqlUe.Owner != a.conf.NodeName { go func() { flag, e := ExecuteSqlByGorm(sqlUe.Sql) logger.Info("ev.LTime:",ev.LTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e) }() } } else if ev.Name == UserEventSyncDbTablePersonCache { logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload)) SyncDbTablePersonCacheChan <- ev.Payload } case *serf.Query: if ev.Name == QueryEventGetDB { //bak file and send resp filename, err := BakDbFile() if err != nil { logger.Error("bak db file error!") return } logger.Info(filename) filebuf, err := ioutil.ReadFile(filename) logger.Info("filebuf: ", len(filebuf)) if err != nil { logger.Error("file to []bytes error: %s\n", err) return } err = os.Remove(filename) if err != nil { logger.Error("remove file%s\n failed", filename) return } logger.Info("query payload: ", len(ev.Payload)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(filebuf); err != nil { logger.Error("err: %s\n", err) return } } } else if ev.Name == QueryEventUpdateDBData { //logger.Info(string(ev.Payload)) //var tmpstringslice []string //tmpstringslice = append(tmpstringslice, string(ev.Payload)) //logger.Info(tmpstringslice) //rows, err := ExecuteQuerySql(tmpstringslice) //if err != nil { // logger.Error("err: ", err) // return //} //var rowsReturn []Rows //for _, r := range rows { // rowsReturn = append(rowsReturn, *r) //} logger.Info("receive QueryEventUpdateDBData, current node:", a.conf.NodeName) var fromP QueryTableDataParam err := json.Unmarshal(ev.Payload, &fromP) if err !=nil { logger.Error("Query tableNames unmarshal err") if query, ok := event.(*serf.Query); ok { if err := query.Respond([]byte("request unmarshal err")); err != nil { logger.Error("query.Respond err: %s\n", err) return } } return } logger.Info("Query tableNames:",fromP.Tables) datas, err := ExecuteQueryByGorm(fromP.Tables) if err !=nil { logger.Error("queryByGorm err:", err) if query, ok := event.(*serf.Query); ok { if err := query.Respond([]byte("queryByGorm err")); err != nil { logger.Error("query.Respond err: %s\n", err) return } } return } bytesReturn, err := json.Marshal(datas) logger.Info("results.len: ", len(bytesReturn)) var targetNode *memberlist.Node nodes := a.Serf().Memberlist().Members() if nodes != nil && len(nodes) > 0 { for _,n :=range nodes { if n.Name == fromP.From { targetNode = n break } } } logger.Debug("targetNode:",targetNode.Name) if targetNode !=nil { go func() { addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort) sendErr := rawSendTcpMsg(addr, bytesReturn) if sendErr != nil { logger.Debug("sendToTcp err:",sendErr) } else { logger.Debug("sendToTcp success") } }() } else { logger.Debug("targetNode is nil") } //if query, ok := event.(*serf.Query); ok { // if err := query.Respond(bytesReturn); err != nil { // logger.Error("err: %s\n", err) // return // } //} } case serf.MemberEvent: if event.EventType() == serf.EventMemberLeave { if ev.Members !=nil && len(ev.Members) ==1 { leaveMember := ev.Members[0] leaveSql := "update cluster_node set isDelete=1 where node_id='"+leaveMember.Name+"'" ExecuteSqlByGorm([]string{ leaveSql }) logger.Info("EventMemberLeave,current Members:",ev.Members) } return } else if event.EventType() == serf.EventMemberJoin { if ev.Members !=nil && len(ev.Members) ==1 { leaveMember := ev.Members[0] leaveSql := "update cluster_node set isDelete=0 where node_id='"+leaveMember.Name+"'" ExecuteSqlByGorm([]string{ leaveSql }) logger.Info("EventMemberJoin,current Members:",ev.Members) } return } default: logger.Warn("Unknown event type: %s\n", ev.EventType().String()) } //if event.EventType() != serf.EventMemberJoin { // logger.Info("event.EventType() != serf.EventMemberJoin") // return //} // //if a.conf.Mode == ModeCluster { // if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect { // logger.Error("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect") // return // } //} //a.DeregisterEventHandler(a) //close(a.readyCh) } func (a *Agent) BroadcastMemberlist(delay time.Duration) { //serf := a.serf serf := a.Agent.Serf() mb := serf.LocalMember() mblist := serf.Memberlist() logger.Info("mb:", mb) // copy local node localNode := *mblist.LocalNode() nodeID := a.conf.NodeName nodeAddress := localNode.Address() clusterID := mb.Tags[tagKeyClusterID] isAlive := int(mb.Status) message, _ := json.Marshal(NodeInfo{ clusterID, nodeID, nodeAddress, isAlive, }) // replace node address localNode.Addr = net.ParseIP(BroadcastIP) //localNode.Addr = net.IPv4(255,255,255,255) localNode.Port = BroadcastPort for { // logger.Info("localNode: %v %v\n", nodeName, nodeAddress) mblist.SendBestEffort(&localNode, []byte(message)) time.Sleep(delay) } } // Ready Returns a channel that will be closed when serf is ready func (a *Agent) Ready() <-chan struct{} { return a.readyCh } // Error Returns a channel that will be transmit a serf error func (a *Agent) Error() <-chan error { return a.errorCh } // Stop serf agent func (a *Agent) Stop() { if a.errorCh != nil { logger.Info("a.Shutdown()", a.Leave()) logger.Info("a.Shutdown()", a.Shutdown()) close(a.errorCh) a.errorCh = nil } } // LocalMember returns the Member information for the local node func (a *Agent) LocalMember() *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { member := serfAgent.LocalMember() return &member } return nil } // GroupMembers returns a point-in-time snapshot of the members of by clusterID func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) { serfAgent := a.Agent.Serf() if serfAgent != nil { for _, member := range serfAgent.Members() { logger.Info("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID]) if member.Tags[tagKeyClusterID] == clusterID { members = append(members, member) } } } return } // Member get member information with node func (a *Agent) Member(node string) *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { ms := serfAgent.Members() for _, m := range ms { if m.Name == node { return &m } } } return nil } // SerfConfig get serf config func (a *Agent) SerfConfig() *serf.Config { return a.Agent.SerfConfig() } // Join serf clusters through one or more members func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { return a.Agent.Join(addrs, replay) } // UserEvent sends a UserEvent on Serf func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error { return a.Agent.UserEvent(name, payload, coalesce) } // Query sends a Query on Serf func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { return a.Agent.Query(name, payload, params) } func (a *Agent) retryJoin(ctx context.Context) (err error) { if len(a.conf.RetryJoin) == 0 { logger.Error("retry join mumber %d", len(a.conf.RetryJoin)) return nil } // Count of attempts attempt := 0 ticker := time.NewTicker(a.conf.RetryInterval) for { logger.Info("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin) var n int // Try to join the specified serf nodes n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin) if err == nil { logger.Error("serf: Join completed. Synced with %d initial agents", n) break } attempt++ // If RetryMaxAttempts is greater than 0, agent will exit // and throw an error when the number of attempts exceeds RetryMaxAttempts, // else agent will try to join other nodes until successful always if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts { err = errors.New("serf: maximum retry join attempts made, exiting") logger.Error(err, err.Error()) break } select { case <-ctx.Done(): err = ctx.Err() goto done // Waiting for ticker to trigger case <-ticker.C: } } done: ticker.Stop() return } //GetDbFromCluster get the newest database after join cluster //dbPathWrite the path where to write after got a database, func (a *Agent) GetDbFromCluster(dbPathWrite string) { //members: get name of first member mbs := a.GroupMembers(a.conf.ClusterID) var specmembername string for _, m := range mbs { if m.Addr.String() != a.conf.BindAddr { specmembername = m.Name break } } logger.Info(specmembername) //query: get db file. params := serf.QueryParam{ FilterNodes: strings.Fields(specmembername), } resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { logger.Error("err: ", err) } go func() { respCh := resp.ResponseCh() for { select { case r := <-respCh: logger.Info("x length is: ", len(r.Payload)) // // byte to file. SerfDbConn.Close() SerfDbConn = nil err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644) if err != nil { logger.Error("query byte to file error!", err) } err := InitDbConn("") if err != nil { logger.Error("create db conn of test.db error: ", err) } return } } }() } type QueryTableDataParam struct { Tables []string `json:"tables"` From string `json:"from"` } var QueryTcpResponseChan = make(chan []byte) //GetDbFromCluster get the newest database after join cluster //dbPathWrite the path where to write after got a database, func (a *Agent) GetTableDataFromCluster(tableNames []string, timeout time.Duration) (*[]string,error) { //members: get name of first member mbs := a.GroupMembers(a.conf.ClusterID) var specmembername string for _, m := range mbs { logger.Info("m",m) if m.Name != a.conf.NodeName { //前缀:DSVAD:分析服务器 DSPAD:进出入pad if strings.HasPrefix(a.conf.NodeName, "DSVAD"){ if strings.HasPrefix(m.Name, "DSVAD") { specmembername = m.Name break } }else{ specmembername = m.Name break } } } logger.Info("mbs:",mbs,"a.conf.BindAddr:",a.conf.BindAddr,"specmembername:",specmembername) if specmembername == "" {//如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点 return nil,errors.New("specmembername not found") } //query: get db file. params := serf.QueryParam{ FilterNodes: strings.Fields(specmembername), } //get db tables var fromP = QueryTableDataParam{ Tables: tableNames, From: a.conf.NodeName, } tBytes, _ := json.Marshal(fromP) resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { logger.Error("err: ", err) } logger.Info("Query.resp.err:",err,"resp:",resp) var dumpSqls []string var wg sync.WaitGroup wg.Add(1) ticker := time.NewTicker(timeout) go func(tk *time.Ticker) { defer tk.Stop() defer wg.Done() for { select { case <-tk.C: return case msg := <- QueryTcpResponseChan: logger.Info("Query response's len:", len(msg)) err := json.Unmarshal(msg, &dumpSqls) if err == nil { logger.Error("dumpSql:", dumpSqls) logger.Error("data dump success") } return } } }(ticker) wg.Wait() return &dumpSqls,nil //r, err = c.Query([]string{query}, false, false) //if err != nil { // return err //} //for _, x := range r[0].Values { // y := logger.Info("%s;\n", x[0].(string)) // if _, err := w.Write([]byte(y)); err != nil { // return err // } //} } type SqlUserEvent struct { Owner string `json:"owner"` Sql []string `json:"sql"` } //SyncSql boardcast sql to cluster func (a *Agent) SyncSql(sqlOp []string) { // event : use to send command to operate db. var sqlUe = SqlUserEvent{ Owner: a.conf.NodeName, Sql: sqlOp, } ueB, err := json.Marshal(sqlUe) if err !=nil { logger.Error("sqlUE marshal err:",err) return } err = a.UserEvent(UserEventSyncSql, ueB, false) if err == nil || !strings.Contains(err.Error(), "cannot contain") { logger.Error("err: ", err) } } //更新同步库的比对缓存 func (a *Agent) SyncDbTablePersonCache(b []byte) { err := a.UserEvent(UserEventSyncDbTablePersonCache, b, false) if err !=nil{ logger.Error("UserEventSyncDbTablePersonCache err:",err) } } //Init serf Init func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) { agent, err := InitNode(clusterID, password, nodeID, snapshotPath) if err != nil { logger.Error("InitNode failed, error: %s", err) return agent, err } err = agent.JoinByNodeAddrs(addrs) if err != nil { logger.Error("JoinByNodeIP failed, error: %s", err) return agent, err } return agent, err } //InitNode web后台收到创建集群的请求, func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) { conf := DefaultConfig() logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) conf.ClusterID = clusterID conf.NodeName = nodeID if password == "" { conf.EncryptKey = DefaultEncryptKey } else { if len(password) >= 16 { password = password[:16] } else { password = fmt.Sprintf("%016s", password)[:16] //return nil, fmt.Errorf("error password") } conf.EncryptKey = password } agent, err := Create(conf, snapshotPath) if err != nil { logger.Error("create agent failed, error: %s", err) return agent, err } agent.Start(context.Background()) //<- agent.readyCh go func() { agent.ShutdownCh() }() time.Sleep(time.Second) logger.Info("Stats:", agent.Agent.Serf().Stats()) logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) logger.Info("create agent sucess!!") return agent, nil } func (a *Agent) JoinByNodeAddrs(addrs []string) error { var nodes []string if len(addrs) == 0 { return fmt.Errorf("No Nodes To Join!") } for _, addr := range addrs { nodes = append(nodes, addr) } a.Agent.Join(nodes, true) return nil } //func (a *Agent) JoinByNodeIP(ips []string) error { // var nodes []string // // if len(ips) == 0 { // return fmt.Errorf("No Nodes To Join!") // } // for _, ip := range ips { // node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) // nodes = append(nodes, node) // } // // n, err := a.Agent.Join(nodes, true) // if err != nil || n == 0 { // return fmt.Errorf("Error Encrypt Key!") // } // // return err //} func (a *Agent) GetNodes() (nodes []NodeInfo) { var node NodeInfo logger.Info("a.conf.ClusterID:", a.conf.ClusterID) mbs := a.GroupMembers(a.conf.ClusterID) for _, mb := range mbs { node.NodeID = mb.Name node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port)) node.IsAlive = int(mb.Status) node.ClusterID = mb.Tags[tagKeyClusterID] nodes = append(nodes, node) } return nodes }