/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package syncdb

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "net"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/hashicorp/memberlist"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"

    "basic.com/valib/logger.git"
)

const (
    // QueryEventGetDB asks a peer node to respond with a backup of its database file.
    QueryEventGetDB = "GetDatabase"
    // QueryEventUpdateDBData asks a peer node to dump the requested tables and send
    // the result back over the raw TCP channel.
    QueryEventUpdateDBData = "UpdateDBData"
    // UserEventSyncSql broadcasts SQL statements to be executed on every other node.
    UserEventSyncSql = "SyncSql"
    // UserEventSyncDbTablePersonCache broadcasts the person-table comparison cache.
    UserEventSyncDbTablePersonCache = "SyncCache"
)

// Agent wraps the serf agent.
type Agent struct {
    *agent.Agent
    conf    *Config
    readyCh chan struct{}
    errorCh chan error
}

// NodeInfo describes a cluster member as broadcast by BroadcastMemberlist.
type NodeInfo struct {
    ClusterID   string `json:"clusterID"`
    NodeID      string `json:"nodeID"`
    NodeAddress string `json:"nodeAddress"`
    IsAlive     int    `json:"isAlive"`
}

// Create creates a serf agent from the given config.
func Create(conf *Config) (*Agent, error) {
    // Convert our config into a serf config.
    serfConf, err := conf.convertToSerf()
    if err != nil {
        return nil, err
    }

    // Create the serf agent with the serf config.
    serfAgent, err := agent.Create(conf.Config, serfConf, nil)
    if err != nil {
        return nil, err
    }

    // Build the keyring used to encrypt gossip traffic.
    keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey))
    if err != nil {
        logger.Error("failed to create keyring:", err)
        return nil, err
    }
    serfConf.MemberlistConfig.Keyring = keyring
    logger.Info("agent: keyring initialized, key length:", len(conf.EncryptKey))

    return &Agent{
        Agent:   serfAgent,
        conf:    conf,
        readyCh: make(chan struct{}),
        errorCh: make(chan error),
    }, nil
}
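
// A minimal usage sketch of the agent lifecycle, based only on the functions in
// this file (error handling elided, values illustrative; note that Ready() is
// currently never closed, so callers typically watch Error() instead):
//
//  conf := DefaultConfig()
//  conf.ClusterID = "cluster-1"
//  conf.NodeName = "node-1"
//  a, err := Create(conf)
//  if err != nil {
//      // handle error
//  }
//  go a.Start(context.Background())
//  go func() {
//      if err := <-a.Error(); err != nil {
//          // startup or join failure
//      }
//  }()
//  defer a.Stop()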

// Start starts the serf agent, registers this Agent as the serf event handler,
// retry-joins the configured nodes, and begins broadcasting membership info.
func (a *Agent) Start(ctx context.Context) {
    err := a.Agent.Start()
    if err != nil {
        logger.Error(err, "start serf agent failed")
        a.errorCh <- err
        return
    }
    a.RegisterEventHandler(a)

    err = a.retryJoin(ctx)
    if err != nil {
        logger.Error(err, "retry join failed")
        if err != ctx.Err() && a.errorCh != nil {
            a.errorCh <- err
        }
    }

    go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}

// SyncDbTablePersonCacheChan delivers the payload of every received
// UserEventSyncDbTablePersonCache event to the local consumer.
var SyncDbTablePersonCacheChan = make(chan []byte)

// HandleEvent is the serf event handler. It executes broadcast SQL user events,
// forwards cache-sync payloads to SyncDbTablePersonCacheChan, answers the
// database queries QueryEventGetDB and QueryEventUpdateDBData, and removes a
// member from the cluster_node table when it leaves.
func (a *Agent) HandleEvent(event serf.Event) {
    switch ev := event.(type) {
    case serf.UserEvent:
        if ev.Name == UserEventSyncSql {
            var sqlUe SqlUserEvent
            err := json.Unmarshal(ev.Payload, &sqlUe)
            if err != nil {
                logger.Error("sqlUe unmarshal err:", err)
                return
            }
            // Only execute SQL that originated on another node.
            if sqlUe.Owner != a.conf.NodeName {
                flag, e := ExecuteSqlByGorm(sqlUe.Sql)
                evTime := uint64(ev.LTime)
                logger.Info("ev.LTime:", evTime, "userEvent exec ", sqlUe.Sql, ",Result:", flag, ", err:", e)
            }
        } else if ev.Name == UserEventSyncDbTablePersonCache {
            logger.Info("LTime:", ev.LTime, ",ev.Payload.len:", len(ev.Payload))
            SyncDbTablePersonCacheChan <- ev.Payload
        }

    case *serf.Query:
        if ev.Name == QueryEventGetDB {
            // Back up the database file and respond with its contents.
            filename, err := BakDbFile()
            if err != nil {
                logger.Error("bak db file error!")
                return
            }
            logger.Info(filename)

            filebuf, err := ioutil.ReadFile(filename)
            if err != nil {
                logger.Error("read db backup file error:", err)
                return
            }
            logger.Info("filebuf: ", len(filebuf))

            if err = os.Remove(filename); err != nil {
                logger.Error("remove file failed:", filename)
                return
            }

            logger.Info("query payload: ", len(ev.Payload))
            if err := ev.Respond(filebuf); err != nil {
                logger.Error("respond err:", err)
                return
            }
        } else if ev.Name == QueryEventUpdateDBData {
            // Dump the requested tables and send the result back to the
            // requesting node over the raw TCP channel.
            var fromP QueryTableDataParam
            err := json.Unmarshal(ev.Payload, &fromP)
            if err != nil {
                logger.Error("Query tableNames unmarshal err")
                return
            }
            logger.Info("Query tableNames:", fromP.Tables)

            datas, err := ExecuteQueryByGorm(fromP.Tables)
            if err != nil {
                logger.Error("queryByGorm err")
                return
            }
            bytesReturn, err := json.Marshal(datas)
            if err != nil {
                logger.Error("marshal query result err:", err)
                return
            }
            logger.Info("results.len: ", len(bytesReturn))

            // Locate the requesting node in the member list.
            var targetNode *memberlist.Node
            for _, n := range a.Serf().Memberlist().Members() {
                if n.Name == fromP.From {
                    targetNode = n
                    break
                }
            }
            if targetNode != nil {
                logger.Debug("targetNode:", targetNode.Name)
                addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
                if sendErr := rawSendTcpMsg(addr, bytesReturn); sendErr != nil {
                    logger.Debug("sendToTcp err:", sendErr)
                } else {
                    logger.Debug("sendToTcp success")
                }
            } else {
                logger.Debug("targetNode is nil")
            }
        }
    case serf.MemberEvent:
        if ev.EventType() == serf.EventMemberLeave {
            // Remove the departed node from the cluster_node table.
            if len(ev.Members) == 1 {
                leaveMember := ev.Members[0]
                leaveSql := "delete from cluster_node where node_id='" + leaveMember.Name + "'"
                ExecuteSqlByGorm([]string{leaveSql})
                logger.Info("EventMemberLeave,current Members:", ev.Members)
            }
            return
        }

    default:
        logger.Warn("Unknown event type:", ev.EventType().String())
    }
}
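
// Illustrative sketch of the SyncSql flow handled above (the SQL statement shown
// is hypothetical; real statements come from the caller): the originating node
// broadcasts the statements and every other node executes them in HandleEvent:
//
//  a.SyncSql([]string{"insert into cluster_node(node_id) values('node-2')"})
//  // On each peer, HandleEvent receives UserEventSyncSql and, because
//  // sqlUe.Owner differs from its own NodeName, runs ExecuteSqlByGorm(sqlUe.Sql).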

// BroadcastMemberlist periodically sends this node's NodeInfo as a best-effort
// UDP message to the broadcast address (BroadcastIP:BroadcastPort), once per delay.
func (a *Agent) BroadcastMemberlist(delay time.Duration) {
    s := a.Agent.Serf()
    mb := s.LocalMember()
    mblist := s.Memberlist()
    logger.Info("mb:", mb)

    // Copy the local node so we can rewrite its target address.
    localNode := *mblist.LocalNode()
    nodeID := a.conf.NodeName
    nodeAddress := localNode.Address()
    clusterID := mb.Tags[tagKeyClusterID]
    isAlive := int(mb.Status)

    message, _ := json.Marshal(NodeInfo{
        ClusterID:   clusterID,
        NodeID:      nodeID,
        NodeAddress: nodeAddress,
        IsAlive:     isAlive,
    })

    // Replace the node address with the broadcast address.
    localNode.Addr = net.ParseIP(BroadcastIP)
    localNode.Port = BroadcastPort
    for {
        mblist.SendBestEffort(&localNode, message)
        time.Sleep(delay)
    }
}
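
// The broadcast payload is simply the JSON encoding of NodeInfo; for example
// (all values illustrative):
//
//  {"clusterID":"cluster-1","nodeID":"node-1","nodeAddress":"192.168.1.10:7946","isAlive":1}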

// Ready returns a channel that is closed when serf is ready.
func (a *Agent) Ready() <-chan struct{} {
    return a.readyCh
}

// Error returns a channel that transmits serf errors.
func (a *Agent) Error() <-chan error {
    return a.errorCh
}

// Stop leaves the cluster and shuts the serf agent down.
func (a *Agent) Stop() {
    if a.errorCh != nil {
        logger.Info("a.Leave()", a.Leave())
        logger.Info("a.Shutdown()", a.Shutdown())
        close(a.errorCh)
        a.errorCh = nil
    }
}

// LocalMember returns the Member information for the local node
func (a *Agent) LocalMember() *serf.Member {
    serfAgent := a.Agent.Serf()
    if serfAgent != nil {
        member := serfAgent.LocalMember()
        return &member
    }
    return nil
}

// GroupMembers returns a point-in-time snapshot of the members that belong to
// the given clusterID.
func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) {
    serfAgent := a.Agent.Serf()
    if serfAgent != nil {
        for _, member := range serfAgent.Members() {
            logger.Info("member =", member.Name, ", clusterID =", member.Tags[tagKeyClusterID])
            if member.Tags[tagKeyClusterID] == clusterID {
                members = append(members, member)
            }
        }
    }
    return
}

// Member returns the member information for the named node, or nil if unknown.
func (a *Agent) Member(node string) *serf.Member {
    serfAgent := a.Agent.Serf()
    if serfAgent != nil {
        ms := serfAgent.Members()
        for _, m := range ms {
            if m.Name == node {
                return &m
            }
        }
    }
    return nil
}

// SerfConfig returns the serf config.
func (a *Agent) SerfConfig() *serf.Config {
    return a.Agent.SerfConfig()
}

// Join joins a serf cluster through one or more members.
func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
    return a.Agent.Join(addrs, replay)
}

// UserEvent sends a UserEvent on Serf
func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
    return a.Agent.UserEvent(name, payload, coalesce)
}

// Query sends a Query on Serf
func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
    return a.Agent.Query(name, payload, params)
}

func (a *Agent) retryJoin(ctx context.Context) (err error) {
    if len(a.conf.RetryJoin) == 0 {
        logger.Info("no retry-join addresses configured")
        return nil
    }

    // Count of attempts
    attempt := 0
    ticker := time.NewTicker(a.conf.RetryInterval)
    for {
        logger.Info("serf: Joining cluster... replay:", a.conf.ReplayOnJoin)
        var n int

        // Try to join the specified serf nodes
        n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
        if err == nil {
            logger.Info("serf: Join completed. Synced with", n, "initial agents")
            break
        }
        attempt++

        // If RetryMaxAttempts is greater than 0, the agent gives up with an error
        // once the number of attempts exceeds RetryMaxAttempts; otherwise it keeps
        // trying until a join succeeds.
        if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
            err = errors.New("serf: maximum retry join attempts made, exiting")
            logger.Error(err, err.Error())
            break
        }
        select {
        case <-ctx.Done():
            err = ctx.Err()
            goto done
        // Wait for the ticker to trigger the next attempt.
        case <-ticker.C:
        }
    }
done:
    ticker.Stop()
    return
}

// GetDbFromCluster fetches the newest database from another cluster member
// after joining, and writes it to dbPathWrite.
func (a *Agent) GetDbFromCluster(dbPathWrite string) {
    // Pick the first group member that is not this node.
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
    for _, m := range mbs {
        if m.Addr.String() != a.conf.BindAddr {
            specmembername = m.Name
            break
        }
    }
    logger.Info(specmembername)

    // Query that member for its database file.
    params := serf.QueryParam{
        FilterNodes: strings.Fields(specmembername),
    }

    resp, err := a.Query(QueryEventGetDB, []byte(""), &params)
    if err != nil {
        if !strings.Contains(err.Error(), "cannot contain") {
            logger.Error("err: ", err)
        }
        return
    }

    go func() {
        respCh := resp.ResponseCh()
        for r := range respCh {
            logger.Info("response length is: ", len(r.Payload))

            // Close the current DB connection, overwrite the database file
            // with the received bytes, then reopen the connection.
            if SerfDbConn != nil {
                SerfDbConn.Close()
                SerfDbConn = nil
            }
            err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
            if err != nil {
                logger.Error("query byte to file error!", err)
            }
            if err := InitDbConn(""); err != nil {
                logger.Error("create db conn of test.db error: ", err)
            }
            return
        }
    }()
}
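
// A minimal sketch of how GetDbFromCluster pairs with the QueryEventGetDB
// handler in HandleEvent (the path is hypothetical; it is wherever the caller
// wants the received database written):
//
//  a.GetDbFromCluster("/path/to/local.db")
//  // The chosen peer backs up its database via BakDbFile and responds with the
//  // file contents; the payload is written to dbPathWrite and the database
//  // connection is reopened with InitDbConn("").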

// QueryTableDataParam is the payload of a QueryEventUpdateDBData query: the
// tables to dump and the name of the requesting node.
type QueryTableDataParam struct {
    Tables []string `json:"tables"`
    From   string   `json:"from"`
}

// QueryTcpResponseChan delivers table-dump responses received over the raw TCP
// channel back to GetTableDataFromCluster.
var QueryTcpResponseChan = make(chan []byte)

// GetTableDataFromCluster asks another cluster member to dump the given tables
// and waits (up to 300 seconds) for the dump SQL, which the peer sends back
// over the raw TCP channel.
func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]string, error) {
    // Pick a suitable peer. Node-name prefixes: DSVAD = analysis server,
    // DSPAD = entry/exit pad. An analysis server only asks another analysis server.
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
    for _, m := range mbs {
        logger.Info("m", m)
        if m.Name != a.conf.NodeName {
            if strings.HasPrefix(a.conf.NodeName, "DSVAD") {
                if strings.HasPrefix(m.Name, "DSVAD") {
                    specmembername = m.Name
                    break
                }
            } else {
                specmembername = m.Name
                break
            }
        }
    }
    logger.Info("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "specmembername:", specmembername)

    // Query that member for the table data.
    params := serf.QueryParam{
        FilterNodes: strings.Fields(specmembername),
    }

    var fromP = QueryTableDataParam{
        Tables: tableNames,
        From:   a.conf.NodeName,
    }
    tBytes, _ := json.Marshal(fromP)

    resp, err := a.Query(QueryEventUpdateDBData, tBytes, &params)
    if err != nil {
        if !strings.Contains(err.Error(), "cannot contain") {
            logger.Error("err: ", err)
        }
        return nil, err
    }
    logger.Info("Query.resp.err:", err, "resp:", resp)

    var dumpSqls []string

    var wg sync.WaitGroup
    wg.Add(1)
    // The ticker acts as a timeout: give up if no TCP response arrives in time.
    ticker := time.NewTicker(300 * time.Second)
    go func(tk *time.Ticker) {
        defer tk.Stop()
        defer wg.Done()
        for {
            select {
            case <-tk.C:
                return
            case msg := <-QueryTcpResponseChan:
                logger.Info("Query response's len:", len(msg))
                err := json.Unmarshal(msg, &dumpSqls)
                if err == nil {
                    logger.Info("dumpSql:", dumpSqls)
                    logger.Info("data dump success")
                }
                return
            }
        }
    }(ticker)
    wg.Wait()
    return &dumpSqls, nil
}
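
// Illustrative call (the table name is hypothetical): the peer answers the
// QueryEventUpdateDBData query by pushing dump SQL over raw TCP, which arrives
// on QueryTcpResponseChan and is returned here:
//
//  sqls, err := a.GetTableDataFromCluster([]string{"cluster_node"})
//  if err == nil && sqls != nil {
//      logger.Info("received", len(*sqls), "dump statements")
//  }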

// SqlUserEvent is the payload of a UserEventSyncSql broadcast: the originating
// node and the SQL statements to execute on every other node.
type SqlUserEvent struct {
    Owner string   `json:"owner"`
    Sql   []string `json:"sql"`
}

// SyncSql broadcasts SQL statements to the cluster so every other node executes them.
func (a *Agent) SyncSql(sqlOp []string) {
    var sqlUe = SqlUserEvent{
        Owner: a.conf.NodeName,
        Sql:   sqlOp,
    }
    ueB, err := json.Marshal(sqlUe)
    if err != nil {
        logger.Error("sqlUE marshal err:", err)
        return
    }
    err = a.UserEvent(UserEventSyncSql, ueB, false)
    if err != nil && !strings.Contains(err.Error(), "cannot contain") {
        logger.Error("err: ", err)
    }
}

// SyncDbTablePersonCache broadcasts the person-table comparison cache of the
// sync database to the cluster.
func (a *Agent) SyncDbTablePersonCache(b []byte) {
    err := a.UserEvent(UserEventSyncDbTablePersonCache, b, false)
    if err != nil {
        logger.Error("UserEventSyncDbTablePersonCache err:", err)
    }
}

// Init creates a serf node and joins it to the cluster at the given addresses.
func Init(clusterID string, password string, nodeID string, addrs []string) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID)
    if err != nil {
        logger.Error("InitNode failed, error:", err)
        return agent, err
    }

    err = agent.JoinByNodeAddrs(addrs)
    if err != nil {
        logger.Error("JoinByNodeAddrs failed, error:", err)
        return agent, err
    }

    return agent, err
}
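
// Illustrative use of Init to join an existing cluster (all values are
// hypothetical; addrs are "host:port" members of the running cluster):
//
//  a, err := Init("cluster-1", "secret-password", "node-2",
//      []string{"192.168.1.10:7946"})
//  if err != nil {
//      // could not create the node or join the cluster
//  }
//  _ = a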

// InitNode handles a cluster-creation request from the web backend: it builds
// the config, derives a 16-byte encryption key from the password, creates the
// agent, and starts it.
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
    conf := DefaultConfig()
    logger.Info("clusterID:", clusterID, "nodeID:", nodeID)
    conf.ClusterID = clusterID
    conf.NodeName = nodeID
    if password == "" {
        conf.EncryptKey = DefaultEncryptKey
    } else {
        if len(password) >= 16 {
            password = password[:16]
        } else {
            // Pad short passwords to the 16 bytes required for the key.
            password = fmt.Sprintf("%016s", password)[:16]
        }
        conf.EncryptKey = password
    }
    agent, err := Create(conf)
    if err != nil {
        logger.Error("create agent failed, error:", err)
        return agent, err
    }

    agent.Start(context.Background())
    go func() {
        agent.ShutdownCh()
    }()
    time.Sleep(time.Second)
    logger.Info("Stats:", agent.Agent.Serf().Stats())
    logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
    logger.Info("create agent success!!")

    return agent, nil
}

// JoinByNodeAddrs joins the cluster through the given node addresses.
func (a *Agent) JoinByNodeAddrs(addrs []string) error {
    if len(addrs) == 0 {
        return fmt.Errorf("no nodes to join")
    }

    _, err := a.Agent.Join(addrs, true)

    return err
}

// GetNodes returns a NodeInfo entry for every member of this agent's cluster group.
func (a *Agent) GetNodes() (nodes []NodeInfo) {
    var node NodeInfo
    logger.Info("a.conf.ClusterID:", a.conf.ClusterID)
    mbs := a.GroupMembers(a.conf.ClusterID)
    for _, mb := range mbs {
        node.NodeID = mb.Name
        node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port))
        node.IsAlive = int(mb.Status)
        node.ClusterID = mb.Tags[tagKeyClusterID]

        nodes = append(nodes, node)
    }

    return nodes
}