/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package syncdb import ( "context" "encoding/json" "errors" "fmt" "github.com/hashicorp/memberlist" "io/ioutil" "net" "os" "strconv" "sync" //"os" "strings" "time" "github.com/hashicorp/serf/cmd/serf/command/agent" "github.com/hashicorp/serf/serf" //"github.com/apache/servicecomb-service-center/pkg/log" "log" "runtime/debug" ) const ( QueryEventGetDB = "GetDatabase" QueryEventUpdateDBData = "UpdateDBData" UserEventSyncSql = "SyncSql" UserEventSyncDbTablePersonCache = "SyncCache" ) // Agent warps the serf agent type Agent struct { *agent.Agent conf *Config readyCh chan struct{} errorCh chan error } type NodeInfo struct { ClusterID string `json:"clusterID"` NodeID string `json:"nodeID"` NodeAddress string `json:"nodeAddress"` IsAlive int `json:"isAlive"` } // Create create serf agent with config func Create(conf *Config, snapshotPath string) (*Agent, error) { // config cover to serf config serfConf, err := conf.convertToSerf(snapshotPath) if err != nil { return nil, err } // create serf agent with serf config //fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey) serfAgent, err := agent.Create(conf.Config, serfConf, nil) if err != nil { return nil, err } // Create the keyring keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey)) if err != nil { //fmt.Printf("Failed to restore keyring: %s", err) return nil, err } serfConf.MemberlistConfig.Keyring = keyring //fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s", // len(conf.EncryptKey), conf.EncryptKey) return &Agent{ Agent: serfAgent, conf: conf, readyCh: make(chan struct{}), errorCh: make(chan error), }, nil } // Start agent func (a *Agent) Start(ctx context.Context) { log.Println("aaaaaaaaa") log.Println(string(debug.Stack())) //debug.PrintStack() err := a.Agent.Start() if err != nil { log.Println(err, "start serf agent failed") a.errorCh <- err return } a.RegisterEventHandler(a) err = a.retryJoin(ctx) if err != nil { log.Println(err, "start serf agent failed") if err != ctx.Err() && a.errorCh != nil { a.errorCh <- err } } go a.BroadcastMemberlist(BroadcastInterval * time.Second) } // define call back interface type ReceiveSqlInterface interface { Forward(content string) } // callback instance var receiveSqlInterface ReceiveSqlInterface // save the callback instance func RegisterReceiveSqlInterface(c ReceiveSqlInterface) { receiveSqlInterface = c } type DbHandler interface { Execute(sql string) bool } type DbDumpHandler interface { Dump(tables string) string } //数据库execute句柄 var DbHandle DbHandler //数据库备份句柄,只在syncdb中用到 var dbDumpH DbDumpHandler func RegisterDbHandler(h DbHandler) { DbHandle = h } func RegisterDbDumpHandler(h DbDumpHandler) { dbDumpH = h } // HandleEvent Handles serf.EventMemberJoin events, // which will wait for members to join until the number of group members is equal to "groupExpect" // when the startup mode is "ModeCluster", // used for logical grouping of serf nodes func (a *Agent) HandleEvent(event serf.Event) { switch ev := event.(type) { case serf.UserEvent: if ev.Name == UserEventSyncSql { var sqlUe SqlUserEvent //fmt.Println(string(ev.Payload)) err := json.Unmarshal(ev.Payload, &sqlUe) if err != nil { //fmt.Println("sqlUe unmarshal err:", err) return } if sqlUe.Owner != a.conf.NodeName { //ExecuteWriteSql(sqlUe.Sql, true) //flag, _ := ExecuteSqlByGorm(sqlUe.Sql) //fmt.Println("userEvent exec 1 ", sqlUe.Sql, ",Result:", results) if receiveSqlInterface != nil && sqlUe.Sql !=nil && len(sqlUe.Sql) > 0 { receiveSqlInterface.Forward(sqlUe.Sql[0]) } } } case *serf.Query: if ev.Name == QueryEventGetDB { //bak file and send resp filename, err := BakDbFile() if err != nil { //fmt.Println("bak db file error!") return } //fmt.Println(filename) filebuf, err := ioutil.ReadFile(filename) //fmt.Println("filebuf: ", len(filebuf)) if err != nil { //fmt.Printf("file to []bytes error: %s\n", err) return } err = os.Remove(filename) if err != nil { //fmt.Printf("remove file%s\n failed", filename) return } //fmt.Println("query payload: ", len(ev.Payload)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(filebuf); err != nil { //fmt.Printf("err: %s\n", err) return } } } else if ev.Name == QueryEventUpdateDBData { //fmt.Println(string(ev.Payload)) var tableNames []string json.Unmarshal(ev.Payload, &tableNames) dumpData := dbDumpH.Dump(strings.Join(tableNames, ",")) bytesReturn, _ := json.Marshal(dumpData) //fmt.Println("results: ", bytesReturn) if query, ok := event.(*serf.Query); ok { if err := query.Respond(bytesReturn); err != nil { //fmt.Printf("err: %s\n", err) return } } } case serf.MemberEvent: if event.EventType() == serf.EventMemberLeave { if ev.Members != nil && len(ev.Members) == 1 { leaveMember := ev.Members[0] leaveSql := "delete from cluster_node where node_id='" + leaveMember.Name + "'" //fmt.Println("leaveSql:", leaveSql) _, err := ExecuteWriteSql([]string{leaveSql}, false) if err != nil { //fmt.Printf("err: %s\n", err) } //fmt.Println("EventMemberLeave,current Members:", ev.Members) } return } default: //fmt.Printf("Unknown event type: %s\n", ev.EventType().String()) } } func (a *Agent) BroadcastMemberlist(delay time.Duration) { //serf := a.serf serf := a.Agent.Serf() mb := serf.LocalMember() mblist := serf.Memberlist() //fmt.Println("mb:", mb) // copy local node localNode := *mblist.LocalNode() nodeID := a.conf.NodeName nodeAddress := localNode.Address() clusterID := mb.Tags[tagKeyClusterID] isAlive := int(mb.Status) message, _ := json.Marshal(NodeInfo{ clusterID, nodeID, nodeAddress, isAlive, }) // replace node address localNode.Addr = net.ParseIP(BroadcastIP) //localNode.Addr = net.IPv4(255,255,255,255) localNode.Port = BroadcastPort for { // //fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress) mblist.SendBestEffort(&localNode, []byte(message)) time.Sleep(delay) } } // Ready Returns a channel that will be closed when serf is ready func (a *Agent) Ready() <-chan struct{} { return a.readyCh } // Error Returns a channel that will be transmit a serf error func (a *Agent) Error() <-chan error { return a.errorCh } // Stop serf agent func (a *Agent) Stop() { if a.errorCh != nil { a.Leave() a.Shutdown() close(a.errorCh) a.errorCh = nil } } // LocalMember returns the Member information for the local node func (a *Agent) LocalMember() *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { member := serfAgent.LocalMember() return &member } return nil } // GroupMembers returns a point-in-time snapshot of the members of by clusterID func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) { serfAgent := a.Agent.Serf() if serfAgent != nil { for _, member := range serfAgent.Members() { log.Printf("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID]) if member.Tags[tagKeyClusterID] == clusterID { members = append(members, member) } } } return } // Member get member information with node func (a *Agent) Member(node string) *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { ms := serfAgent.Members() for _, m := range ms { if m.Name == node { return &m } } } return nil } // SerfConfig get serf config func (a *Agent) SerfConfig() *serf.Config { return a.Agent.SerfConfig() } // Join serf clusters through one or more members func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { return a.Agent.Join(addrs, replay) } // UserEvent sends a UserEvent on Serf func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error { return a.Agent.UserEvent(name, payload, coalesce) } // Query sends a Query on Serf func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { return a.Agent.Query(name, payload, params) } func (a *Agent) retryJoin(ctx context.Context) (err error) { if len(a.conf.RetryJoin) == 0 { log.Printf("retry join mumber %d", len(a.conf.RetryJoin)) return nil } // Count of attempts attempt := 0 ticker := time.NewTicker(a.conf.RetryInterval) for { log.Printf("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin) var n int // Try to join the specified serf nodes n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin) if err == nil { log.Printf("serf: Join completed. Synced with %d initial agents", n) break } attempt++ // If RetryMaxAttempts is greater than 0, agent will exit // and throw an error when the number of attempts exceeds RetryMaxAttempts, // else agent will try to join other nodes until successful always if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts { err = errors.New("serf: maximum retry join attempts made, exiting") log.Println(err, err.Error()) break } select { case <-ctx.Done(): err = ctx.Err() goto done // Waiting for ticker to trigger case <-ticker.C: } } done: ticker.Stop() return } //GetDbFromCluster get the newest database after join cluster //dbPathWrite the path where to write after got a database, func (a *Agent) GetDbFromCluster(dbPathWrite string) { //members: get name of first member mbs := a.GroupMembers(a.conf.ClusterID) var specmembername string for _, m := range mbs { if m.Addr.String() != a.conf.BindAddr { specmembername = m.Name break } } //fmt.Println(specmembername) //query: get db file. params := serf.QueryParam{ FilterNodes: strings.Fields(specmembername), } resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { //fmt.Println("err: ", err) } go func() { respCh := resp.ResponseCh() for { select { case r := <-respCh: //fmt.Println("x length is: ", len(r.Payload)) // // byte to file. SerfDbConn.Close() SerfDbConn = nil err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644) if err != nil { //fmt.Println("query byte to file error!", err) } err := InitDbConn("") if err != nil { //fmt.Println("create db conn of test.db error: ", err) } return } } }() } //GetDbFromCluster get the newest database after join cluster //dbPathWrite the path where to write after got a database, func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]string, error) { //members: get name of first member mbs := a.GroupMembers(a.conf.ClusterID) var specmembername string for _, m := range mbs { //fmt.Println("m", m) if m.Name != a.conf.NodeName { //前缀:DSVAD:分析服务器 DSPAD:进出入pad if strings.HasPrefix(a.conf.NodeName, "PSPAD"){ if strings.HasPrefix(m.Name, "PSPAD") { specmembername = m.Name break } }else{ specmembername = m.Name break } } } //fmt.Println("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "specmembername:", specmembername) //query: get db file. params := serf.QueryParam{ FilterNodes: strings.Fields(specmembername), } //SQL tBytes, _ := json.Marshal(tableNames) resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { //fmt.Println("err: ", err) } //fmt.Println("Query.resp.err:", err, "resp:", resp) var wg sync.WaitGroup wg.Add(1) var dumpSqls []string go func() { defer wg.Done() respCh := resp.ResponseCh() for { select { case r := <-respCh: //fmt.Println("Query response's len:", len(r.Payload)) err := json.Unmarshal(r.Payload, &dumpSqls) if err == nil { //fmt.Println("dumpSql:", dumpSqls) //fmt.Println("data dump success") } return } } }() wg.Wait() return &dumpSqls, nil } type SqlUserEvent struct { Owner string `json:"owner"` Sql []string `json:"sql"` } //SyncSql boardcast sql to cluster func (a *Agent) SyncSql(sqlOp []string) { // event : use to send command to operate db. var sqlUe = SqlUserEvent{ Owner: a.conf.NodeName, Sql: sqlOp, } ueB, err := json.Marshal(sqlUe) if err != nil { //fmt.Println("sqlUE marshal err:", err) return } err = a.UserEvent(UserEventSyncSql, ueB, false) if err == nil || !strings.Contains(err.Error(), "cannot contain") { //fmt.Println("err: ", err) } } //Init serf Init func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) { agent, err := InitNode(clusterID, password, nodeID, snapshotPath) if err != nil { //fmt.Printf("InitNode failed, error: %s", err) return agent, err } err = agent.JoinByNodeAddrs(addrs) if err != nil { //fmt.Printf("JoinByNodeIP failed, error: %s", err) return agent, err } return agent, err } //InitNode web后台收到创建集群的请求, func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) { conf := DefaultConfig() //fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) conf.ClusterID = clusterID conf.NodeName = nodeID if password == "" { conf.EncryptKey = DefaultEncryptKey } else { if len(password) >= 16 { password = password[:16] } else { password = fmt.Sprintf("%016s", password)[:16] //return nil, fmt.Errorf("error password") } conf.EncryptKey = password } agent, err := Create(conf, snapshotPath) if err != nil { //fmt.Printf("create agent failed, error: %s", err) return agent, err } agent.Start(context.Background()) //<- agent.readyCh go func() { agent.ShutdownCh() }() time.Sleep(time.Second) //fmt.Println("Stats:", agent.Agent.Serf().Stats()) //fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) //fmt.Println("create agent sucess!!") return agent, nil } func (a *Agent) JoinByNodeAddrs(addrs []string) error { var nodes []string if len(addrs) == 0 { return fmt.Errorf("No Nodes To Join!") } for _, addr := range addrs { nodes = append(nodes, addr) } a.Agent.Join(nodes, true) return nil } //func (a *Agent) JoinByNodeIP(ips []string) error { // var nodes []string // // if len(ips) == 0 { // return fmt.Errorf("No Nodes To Join!") // } // for _, ip := range ips { // node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) // nodes = append(nodes, node) // } // // n, err := a.Agent.Join(nodes, true) // if err != nil || n == 0 { // return fmt.Errorf("Error Encrypt Key!") // } // // return err //} func (a *Agent) GetNodes() (nodes []NodeInfo) { var node NodeInfo //fmt.Println("a.conf.ClusterID:", a.conf.ClusterID) mbs := a.GroupMembers(a.conf.ClusterID) for _, mb := range mbs { node.NodeID = mb.Name node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port)) node.IsAlive = int(mb.Status) node.ClusterID = mb.Tags[tagKeyClusterID] nodes = append(nodes, node) } return nodes }