/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package syncdb import ( "context" "encoding/json" "errors" "fmt" "github.com/hashicorp/memberlist" "io/ioutil" "net" "os" //"os" "strings" "time" "github.com/hashicorp/serf/cmd/serf/command/agent" "github.com/hashicorp/serf/serf" //"github.com/apache/servicecomb-service-center/pkg/log" "log" ) const ( QueryEventGetDB = "GetDatabase" QueryEventUpdateDBData = "UpdateDBData" ) // Agent warps the serf agent type Agent struct { *agent.Agent conf *Config readyCh chan struct{} errorCh chan error } type NodeInfo struct { ClusterID string `json:"clusterID"` NodeID string `json:"nodeID"` NodeAddress string `json:"nodeAddress"` IsAlive int `json:"isAlive"` } // Create create serf agent with config func Create(conf *Config) (*Agent, error) { // config cover to serf config serfConf, err := conf.convertToSerf() if err != nil { return nil, err } // create serf agent with serf config fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey) serfAgent, err := agent.Create(conf.Config, serfConf, nil) if err != nil { return nil, err } // Create the keyring keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey)) if err != nil { fmt.Printf("Failed to restore keyring: %s", err) return nil, err } serfConf.MemberlistConfig.Keyring = keyring fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s", len(conf.EncryptKey), conf.EncryptKey) return &Agent{ Agent: serfAgent, conf: conf, readyCh: make(chan struct{}), errorCh: make(chan error), }, nil } // Start agent func (a *Agent) Start(ctx context.Context) { err := a.Agent.Start() if err != nil { log.Println(err, "start serf agent failed") a.errorCh <- err return } a.RegisterEventHandler(a) err = a.retryJoin(ctx) if err != nil { log.Println(err, "start serf agent failed") if err != ctx.Err() && a.errorCh != nil { a.errorCh <- err } } go a.BroadcastMemberlist(BroadcastInterval * time.Second) } // HandleEvent Handles serf.EventMemberJoin events, // which will wait for members to join until the number of group members is equal to "groupExpect" // when the startup mode is "ModeCluster", // used for logical grouping of serf nodes func (a *Agent) HandleEvent(event serf.Event) { switch ev := event.(type) { case serf.UserEvent: var sqlUe SqlUserEvent fmt.Println(string(ev.Payload)) err := json.Unmarshal(ev.Payload, &sqlUe) if err !=nil { fmt.Println("sqlUe unmarshal err:",err) return } if sqlUe.Owner != a.conf.NodeName { results, _ := ExecuteWriteSql(sqlUe.Sql) //flag, _ := ExecuteSqlByGorm(sqlUe.Sql) fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",results) } case *serf.Query: if ev.Name == QueryEventGetDB { //bak file and send resp filename, err := BakDbFile() if err != nil { fmt.Println("bak db file error!") return } fmt.Println(filename) filebuf, err := ioutil.ReadFile(filename) fmt.Println("filebuf: ", len(filebuf)) if err != nil { fmt.Printf("file to []bytes error: %s\n", err) return } err = os.Remove(filename) if err != nil { fmt.Printf("remove file%s\n failed", filename) return } fmt.Println("query payload: ", len(ev.Payload)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(filebuf); err != nil { fmt.Printf("err: %s\n", err) return } } } else if ev.Name == QueryEventUpdateDBData { fmt.Println(string(ev.Payload)) var tmpstringslice []string tmpstringslice = append(tmpstringslice, string(ev.Payload)) fmt.Println(tmpstringslice) rows, err := ExecuteQuerySql(tmpstringslice) if err != nil { fmt.Println("err: ", err) return } var rowsReturn []Rows for _, r := range rows { rowsReturn = append(rowsReturn, *r) } bytesReturn, err := json.Marshal(rowsReturn) fmt.Println("results: ", bytesReturn) if query, ok := event.(*serf.Query); ok { if err := query.Respond(bytesReturn); err != nil { fmt.Printf("err: %s\n", err) return } } //var res []*Rows //json.Unmarshal(bytesReturn, &res) } case serf.MemberEvent: if event.EventType() == serf.EventMemberLeave { if ev.Members !=nil && len(ev.Members) ==1 { leaveMember := ev.Members[0] leaveSql := "delete from cluster_node where node_id='"+leaveMember.Name+"'" fmt.Println("leaveSql:",leaveSql) //ExecuteSqlByGorm([]string{ leaveSql }) fmt.Println("EventMemberLeave,current Members:",ev.Members) } return } default: fmt.Printf("Unknown event type: %s\n", ev.EventType().String()) } //if event.EventType() != serf.EventMemberJoin { // fmt.Printf("event.EventType() != serf.EventMemberJoin") // return //} // //if a.conf.Mode == ModeCluster { // if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect { // fmt.Printf("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect") // return // } //} //a.DeregisterEventHandler(a) //close(a.readyCh) } func (a *Agent) BroadcastMemberlist(delay time.Duration) { //serf := a.serf serf := a.Agent.Serf() mb := serf.LocalMember() mblist := serf.Memberlist() fmt.Println("mb:", mb) // copy local node localNode := *mblist.LocalNode() nodeID := a.conf.NodeName nodeAddress := localNode.Address() clusterID := mb.Tags[tagKeyClusterID] isAlive := int(mb.Status) message, _ := json.Marshal(NodeInfo{ clusterID, nodeID, nodeAddress, isAlive, }) // replace node address localNode.Addr = net.ParseIP(BroadcastIP) //localNode.Addr = net.IPv4(255,255,255,255) localNode.Port = BroadcastPort for { // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress) mblist.SendBestEffort(&localNode, []byte(message)) time.Sleep(delay) } } // Ready Returns a channel that will be closed when serf is ready func (a *Agent) Ready() <-chan struct{} { return a.readyCh } // Error Returns a channel that will be transmit a serf error func (a *Agent) Error() <-chan error { return a.errorCh } // Stop serf agent func (a *Agent) Stop() { if a.errorCh != nil { a.Leave() a.Shutdown() close(a.errorCh) a.errorCh = nil } } // LocalMember returns the Member information for the local node func (a *Agent) LocalMember() *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { member := serfAgent.LocalMember() return &member } return nil } // GroupMembers returns a point-in-time snapshot of the members of by clusterID func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) { serfAgent := a.Agent.Serf() if serfAgent != nil { for _, member := range serfAgent.Members() { log.Printf("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID]) if member.Tags[tagKeyClusterID] == clusterID { members = append(members, member) } } } return } // Member get member information with node func (a *Agent) Member(node string) *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { ms := serfAgent.Members() for _, m := range ms { if m.Name == node { return &m } } } return nil } // SerfConfig get serf config func (a *Agent) SerfConfig() *serf.Config { return a.Agent.SerfConfig() } // Join serf clusters through one or more members func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { return a.Agent.Join(addrs, replay) } // UserEvent sends a UserEvent on Serf func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error { return a.Agent.UserEvent(name, payload, coalesce) } // Query sends a Query on Serf func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { return a.Agent.Query(name, payload, params) } func (a *Agent) retryJoin(ctx context.Context) (err error) { if len(a.conf.RetryJoin) == 0 { log.Printf("retry join mumber %d", len(a.conf.RetryJoin)) return nil } // Count of attempts attempt := 0 ticker := time.NewTicker(a.conf.RetryInterval) for { log.Printf("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin) var n int // Try to join the specified serf nodes n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin) if err == nil { log.Printf("serf: Join completed. Synced with %d initial agents", n) break } attempt++ // If RetryMaxAttempts is greater than 0, agent will exit // and throw an error when the number of attempts exceeds RetryMaxAttempts, // else agent will try to join other nodes until successful always if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts { err = errors.New("serf: maximum retry join attempts made, exiting") log.Println(err, err.Error()) break } select { case <-ctx.Done(): err = ctx.Err() goto done // Waiting for ticker to trigger case <-ticker.C: } } done: ticker.Stop() return } //GetDbFromCluster get the newest database after join cluster //dbPathWrite the path where to write after got a database, func (a *Agent) GetDbFromCluster(dbPathWrite string) { //members: get name of first member mbs := a.GroupMembers(a.conf.ClusterID) var specmembername string for _, m := range mbs { if m.Addr.String() != a.conf.BindAddr { specmembername = m.Name break } } fmt.Println(specmembername) //query: get db file. params := serf.QueryParam{ FilterNodes: strings.Fields(specmembername), } resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { fmt.Println("err: ", err) } go func() { respCh := resp.ResponseCh() for { select { case r := <-respCh: fmt.Println("x length is: ", len(r.Payload)) // // byte to file. SerfDbConn.Close() SerfDbConn = nil err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644) if err != nil { fmt.Println("query byte to file error!", err) } err := InitDbConn("") if err != nil { fmt.Println("create db conn of test.db error: ", err) } return } } }() } type SqlUserEvent struct { Owner string `json:"owner"` Sql []string `json:"sql"` } //SyncSql boardcast sql to cluster func (a *Agent) SyncSql(sqlOp []string) { // event : use to send command to operate db. var sqlUe = SqlUserEvent{ Owner: a.conf.NodeName, Sql: sqlOp, } ueB, err := json.Marshal(sqlUe) if err !=nil { fmt.Println("sqlUE marshal err:",err) return } err = a.UserEvent("SyncSql", []byte(ueB), false) if err == nil || !strings.Contains(err.Error(), "cannot contain") { fmt.Println("err: ", err) } } //Init serf Init func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) { agent, err := InitNode(clusterID, password, nodeID) if err != nil { fmt.Printf("InitNode failed, error: %s", err) return agent, err } err = agent.JoinByNodeIP(ips) if err != nil { fmt.Printf("JoinByNodeIP failed, error: %s", err) return agent, err } return agent, err } //InitNode web后台收到创建集群的请求, func InitNode(clusterID string, password string, nodeID string) (*Agent, error) { conf := DefaultConfig() fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) conf.ClusterID = clusterID conf.NodeName = nodeID if password == "" { conf.EncryptKey = DefaultEncryptKey } else { if len(password) >= 16 { password = password[:16] } else { password = fmt.Sprintf("%016s", password)[:16] //return nil, fmt.Errorf("error password") } conf.EncryptKey = password } agent, err := Create(conf) if err != nil { fmt.Printf("create agent failed, error: %s", err) return agent, err } agent.Start(context.Background()) //<- agent.readyCh go func() { agent.ShutdownCh() }() time.Sleep(time.Second) fmt.Println("Stats:", agent.Agent.Serf().Stats()) fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) fmt.Println("create agent sucess!!") return agent, nil } func (a *Agent) JoinByNodeIP(ips []string) error { var nodes []string if len(ips) == 0 { return fmt.Errorf("No Nodes To Join!") } for _, ip := range ips { node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) nodes = append(nodes, node) } n, err := a.Agent.Join(nodes, true) if err != nil || n == 0 { //a.Stop() //fmt.Println("Stop node") return fmt.Errorf("Error Encrypt Key!") } return err } type Node struct { clusterID string NodeID string IP string isAlive int //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4 } func (a *Agent) GetNodes() (nodes []Node) { var node Node fmt.Println("a.conf.ClusterID:", a.conf.ClusterID) mbs := a.GroupMembers(a.conf.ClusterID) for _, mb := range mbs { node.NodeID = mb.Name node.IP = mb.Addr.String() node.isAlive = int(mb.Status) node.clusterID = mb.Tags[tagKeyClusterID] nodes = append(nodes, node) } return nodes }