| | |
| | | } |
| | | |
| | | // Create creates a serf agent from the given config. |
| | | func Create(conf *Config) (*Agent, error) { |
| | | func Create(conf *Config, snapshotPath string) (*Agent, error) { |
| | | // config cover to serf config |
| | | serfConf, err := conf.convertToSerf() |
| | | serfConf, err := conf.convertToSerf(snapshotPath) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | |
| | | if event.EventType() == serf.EventMemberLeave { |
| | | if ev.Members !=nil && len(ev.Members) ==1 { |
| | | leaveMember := ev.Members[0] |
| | | leaveSql := "delete from cluster_node where node_id='"+leaveMember.Name+"'" |
| | | leaveSql := "update cluster_node set isDelete=1 where node_id='"+leaveMember.Name+"'" |
| | | ExecuteSqlByGorm([]string{ leaveSql }) |
| | | |
| | | logger.Info("EventMemberLeave,current Members:",ev.Members) |
| | | } |
| | | return |
| | | } else if event.EventType() == serf.EventMemberJoin { |
| | | if ev.Members !=nil && len(ev.Members) ==1 { |
| | | leaveMember := ev.Members[0] |
| | | leaveSql := "update cluster_node set isDelete=0 where node_id='"+leaveMember.Name+"'" |
| | | ExecuteSqlByGorm([]string{ leaveSql }) |
| | | |
| | | logger.Info("EventMemberJoin,current Members:",ev.Members) |
| | | } |
| | | return |
| | | } |
| | |
| | | } |
| | | } |
| | | logger.Info("mbs:",mbs,"a.conf.BindAddr:",a.conf.BindAddr,"specmembername:",specmembername) |
| | | if specmembername == "" {//如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点 |
| | | return nil,errors.New("specmembername not found") |
| | | } |
| | | |
| | | //query: get db file. |
| | | params := serf.QueryParam{ |
| | |
| | | |
| | | var wg sync.WaitGroup |
| | | wg.Add(1) |
| | | go func() { |
| | | ticker := time.NewTicker(300*time.Second) |
| | | go func(tk *time.Ticker) { |
| | | defer tk.Stop() |
| | | defer wg.Done() |
| | | for { |
| | | select { |
| | | case <-tk.C: |
| | | return |
| | | case msg := <- QueryTcpResponseChan: |
| | | logger.Info("Query response's len:", len(msg)) |
| | | err := json.Unmarshal(msg, &dumpSqls) |
| | |
| | | return |
| | | } |
| | | } |
| | | }() |
| | | }(ticker) |
| | | wg.Wait() |
| | | return &dumpSqls,nil |
| | | |
| | |
| | | } |
| | | |
| | | // Init initializes the serf agent for this node by calling InitNode. |
| | | func Init(clusterID string, password string, nodeID string, addrs []string) (*Agent, error) { |
| | | agent, err := InitNode(clusterID, password, nodeID) |
| | | func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) { |
| | | agent, err := InitNode(clusterID, password, nodeID, snapshotPath) |
| | | if err != nil { |
| | | logger.Error("InitNode failed, error: %s", err) |
| | | return agent, err |
| | |
| | | } |
| | | |
| | | // InitNode is invoked when the web backend receives a request to create a cluster. |
| | | func InitNode(clusterID string, password string, nodeID string) (*Agent, error) { |
| | | func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) { |
| | | conf := DefaultConfig() |
| | | logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) |
| | | conf.ClusterID = clusterID |
| | |
| | | } |
| | | conf.EncryptKey = password |
| | | } |
| | | agent, err := Create(conf) |
| | | agent, err := Create(conf, snapshotPath) |
| | | if err != nil { |
| | | logger.Error("create agent failed, error: %s", err) |
| | | return agent, err |