From a2d9de39e35663578861385fca686944857d473b Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期五, 27 九月 2019 19:03:58 +0800
Subject: [PATCH] add sync_serf init
---
agent.go | 141 ++++++++++++++++++++++++++--------------------
1 files changed, 80 insertions(+), 61 deletions(-)
diff --git a/agent.go b/agent.go
index 125f421..1994157 100644
--- a/agent.go
+++ b/agent.go
@@ -35,7 +35,7 @@
"github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
//"github.com/apache/servicecomb-service-center/pkg/log"
- "log"
+ "basic.com/valib/logger.git"
)
const (
@@ -69,7 +69,7 @@
}
// create serf agent with serf config
- fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey)
+ logger.Info("conf.Config.EncryptKey:", conf.EncryptKey)
serfAgent, err := agent.Create(conf.Config, serfConf, nil)
if err != nil {
return nil, err
@@ -77,12 +77,16 @@
// Create the keyring
keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey))
if err != nil {
- fmt.Printf("Failed to restore keyring: %s", err)
+ logger.Error("Failed to restore keyring: %s", err)
return nil, err
}
serfConf.MemberlistConfig.Keyring = keyring
- fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s",
+ logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
len(conf.EncryptKey), conf.EncryptKey)
+
+ ltLock.Lock()
+ curLTime = QueryLTimeFromDbByGorm()
+ ltLock.Unlock()
return &Agent{
Agent: serfAgent,
@@ -96,7 +100,7 @@
func (a *Agent) Start(ctx context.Context) {
err := a.Agent.Start()
if err != nil {
- log.Println(err, "start serf agent failed")
+ logger.Error(err, "start serf agent failed")
a.errorCh <- err
return
}
@@ -104,7 +108,7 @@
err = a.retryJoin(ctx)
if err != nil {
- log.Println(err, "start serf agent failed")
+ logger.Error(err, "start serf agent failed")
if err != ctx.Err() && a.errorCh != nil {
a.errorCh <- err
}
@@ -114,6 +118,8 @@
}
var SyncDbTablePersonCacheChan = make(chan []byte,0)
+var curLTime uint64
+var ltLock sync.RWMutex
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
@@ -123,19 +129,32 @@
switch ev := event.(type) {
case serf.UserEvent:
+ ltLock.Lock()
+ defer ltLock.Unlock()
if ev.Name == UserEventSyncSql {
var sqlUe SqlUserEvent
err := json.Unmarshal(ev.Payload, &sqlUe)
if err !=nil {
- fmt.Println("sqlUe unmarshal err:",err)
+ logger.Error("sqlUe unmarshal err:",err)
return
}
if sqlUe.Owner != a.conf.NodeName {
- //results, err := ExecuteWriteSql(sqlArr)
+ evTime := uint64(ev.LTime)
+ logger.Info("ev.LTime:",evTime,",curLTime:",curLTime,",SqlUserEvent.sql:",sqlUe.Sql)
+ if curLTime !=0 && evTime < curLTime{//鏄鐞嗚繃鐨勪簨浠�
+ logger.Info("already executed event,ev.LTime:",evTime,"SqlUserEvent.sql:",sqlUe.Sql)
+ return
+ }
flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
- fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",flag)
+ logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag)
+ if flag {
+ curLTime = evTime
+
+ ExecuteSqlByGorm([]string{"update sync_serf set lamport_time='"+strconv.FormatUint(curLTime,10)+"'"})
+ }
}
} else if ev.Name == UserEventSyncDbTablePersonCache {
+ logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
SyncDbTablePersonCacheChan <- ev.Payload
}
@@ -146,39 +165,39 @@
//bak file and send resp
filename, err := BakDbFile()
if err != nil {
- fmt.Println("bak db file error!")
+ logger.Error("bak db file error!")
return
}
- fmt.Println(filename)
+ logger.Info(filename)
filebuf, err := ioutil.ReadFile(filename)
- fmt.Println("filebuf: ", len(filebuf))
+ logger.Info("filebuf: ", len(filebuf))
if err != nil {
- fmt.Printf("file to []bytes error: %s\n", err)
+ logger.Error("file to []bytes error: %s\n", err)
return
}
err = os.Remove(filename)
if err != nil {
- fmt.Printf("remove file%s\n failed", filename)
+ logger.Error("remove file%s\n failed", filename)
return
}
- fmt.Println("query payload: ", len(ev.Payload))
+ logger.Info("query payload: ", len(ev.Payload))
if query, ok := event.(*serf.Query); ok {
if err := query.Respond(filebuf); err != nil {
- fmt.Printf("err: %s\n", err)
+ logger.Error("err: %s\n", err)
return
}
}
} else if ev.Name == QueryEventUpdateDBData {
- //fmt.Println(string(ev.Payload))
+ //logger.Info(string(ev.Payload))
//var tmpstringslice []string
//tmpstringslice = append(tmpstringslice, string(ev.Payload))
- //fmt.Println(tmpstringslice)
+ //logger.Info(tmpstringslice)
//rows, err := ExecuteQuerySql(tmpstringslice)
//if err != nil {
- // fmt.Println("err: ", err)
+ // logger.Error("err: ", err)
// return
//}
//var rowsReturn []Rows
@@ -188,20 +207,20 @@
var tableNames []string
err := json.Unmarshal(ev.Payload, &tableNames)
if err !=nil {
- fmt.Println("Query tableNames unmarshal err")
+ logger.Error("Query tableNames unmarshal err")
return
}
- fmt.Println("Query tableNames:",tableNames)
+ logger.Info("Query tableNames:",tableNames)
datas, err := ExecuteQueryByGorm(tableNames)
if err !=nil {
- fmt.Println("queryByGorm err")
+ logger.Error("queryByGorm err")
return
}
bytesReturn, err := json.Marshal(datas)
- fmt.Println("results.len: ", len(bytesReturn))
+ logger.Info("results.len: ", len(bytesReturn))
if query, ok := event.(*serf.Query); ok {
if err := query.Respond(bytesReturn); err != nil {
- fmt.Printf("err: %s\n", err)
+ logger.Error("err: %s\n", err)
return
}
}
@@ -216,24 +235,24 @@
leaveSql := "delete from cluster_node where node_id='"+leaveMember.Name+"'"
ExecuteSqlByGorm([]string{ leaveSql })
- fmt.Println("EventMemberLeave,current Members:",ev.Members)
+ logger.Info("EventMemberLeave,current Members:",ev.Members)
}
return
}
default:
- fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
+ logger.Warn("Unknown event type: %s\n", ev.EventType().String())
}
//if event.EventType() != serf.EventMemberJoin {
- // fmt.Printf("event.EventType() != serf.EventMemberJoin")
+ // logger.Info("event.EventType() != serf.EventMemberJoin")
// return
//}
//
//if a.conf.Mode == ModeCluster {
// if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect {
- // fmt.Printf("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
+ // logger.Error("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
// return
// }
//}
@@ -246,7 +265,7 @@
serf := a.Agent.Serf()
mb := serf.LocalMember()
mblist := serf.Memberlist()
- fmt.Println("mb:", mb)
+ logger.Info("mb:", mb)
// copy local node
localNode := *mblist.LocalNode()
@@ -267,7 +286,7 @@
//localNode.Addr = net.IPv4(255,255,255,255)
localNode.Port = BroadcastPort
for {
- // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress)
+ // logger.Info("localNode: %v %v\n", nodeName, nodeAddress)
mblist.SendBestEffort(&localNode, []byte(message))
time.Sleep(delay)
}
@@ -286,8 +305,8 @@
// Stop serf agent
func (a *Agent) Stop() {
if a.errorCh != nil {
- a.Leave()
- a.Shutdown()
+ logger.Info("a.Shutdown()", a.Leave())
+ logger.Info("a.Shutdown()", a.Shutdown())
close(a.errorCh)
a.errorCh = nil
}
@@ -308,7 +327,7 @@
serfAgent := a.Agent.Serf()
if serfAgent != nil {
for _, member := range serfAgent.Members() {
- log.Printf("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID])
+ logger.Info("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID])
if member.Tags[tagKeyClusterID] == clusterID {
members = append(members, member)
}
@@ -353,7 +372,7 @@
func (a *Agent) retryJoin(ctx context.Context) (err error) {
if len(a.conf.RetryJoin) == 0 {
- log.Printf("retry join mumber %d", len(a.conf.RetryJoin))
+ logger.Error("retry join mumber %d", len(a.conf.RetryJoin))
return nil
}
@@ -361,13 +380,13 @@
attempt := 0
ticker := time.NewTicker(a.conf.RetryInterval)
for {
- log.Printf("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
+ logger.Info("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
var n int
// Try to join the specified serf nodes
n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
if err == nil {
- log.Printf("serf: Join completed. Synced with %d initial agents", n)
+ logger.Error("serf: Join completed. Synced with %d initial agents", n)
break
}
attempt++
@@ -377,7 +396,7 @@
// else agent will try to join other nodes until successful always
if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
err = errors.New("serf: maximum retry join attempts made, exiting")
- log.Println(err, err.Error())
+ logger.Error(err, err.Error())
break
}
select {
@@ -405,7 +424,7 @@
break
}
}
- fmt.Println(specmembername)
+ logger.Info(specmembername)
//query: get db file.
params := serf.QueryParam{
@@ -414,7 +433,7 @@
resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms)
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
- fmt.Println("err: ", err)
+ logger.Error("err: ", err)
}
go func() {
@@ -422,18 +441,18 @@
for {
select {
case r := <-respCh:
- fmt.Println("x length is: ", len(r.Payload))
+ logger.Info("x length is: ", len(r.Payload))
// // byte to file.
SerfDbConn.Close()
SerfDbConn = nil
err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
if err != nil {
- fmt.Println("query byte to file error!", err)
+ logger.Error("query byte to file error!", err)
}
err := InitDbConn("")
if err != nil {
- fmt.Println("create db conn of test.db error: ", err)
+ logger.Error("create db conn of test.db error: ", err)
}
return
}
@@ -448,7 +467,7 @@
mbs := a.GroupMembers(a.conf.ClusterID)
var specmembername string
for _, m := range mbs {
- fmt.Println("m",m)
+ logger.Info("m",m)
if m.Name != a.conf.NodeName { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad
if strings.HasPrefix(a.conf.NodeName, "DSVAD"){
if strings.HasPrefix(m.Name, "DSVAD") {
@@ -461,7 +480,7 @@
}
}
}
- fmt.Println("mbs:",mbs,"a.conf.BindAddr:",a.conf.BindAddr,"specmembername:",specmembername)
+ logger.Info("mbs:",mbs,"a.conf.BindAddr:",a.conf.BindAddr,"specmembername:",specmembername)
//query: get db file.
params := serf.QueryParam{
@@ -473,9 +492,9 @@
resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms)
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
- fmt.Println("err: ", err)
+ logger.Error("err: ", err)
}
- fmt.Println("Query.resp.err:",err,"resp:",resp)
+ logger.Info("Query.resp.err:",err,"resp:",resp)
var wg sync.WaitGroup
wg.Add(1)
@@ -486,11 +505,11 @@
for {
select {
case r := <-respCh:
- fmt.Println("Query response's len:", len(r.Payload))
+ logger.Info("Query response's len:", len(r.Payload))
err := json.Unmarshal(r.Payload, &dumpSqls)
if err ==nil {
- fmt.Println("dumpSql:",dumpSqls)
- fmt.Println("data dump success")
+ logger.Error("dumpSql:",dumpSqls)
+ logger.Error("data dump success")
}
return
}
@@ -504,7 +523,7 @@
// return err
//}
//for _, x := range r[0].Values {
- // y := fmt.Sprintf("%s;\n", x[0].(string))
+ // y := logger.Info("%s;\n", x[0].(string))
// if _, err := w.Write([]byte(y)); err != nil {
// return err
// }
@@ -526,12 +545,12 @@
}
ueB, err := json.Marshal(sqlUe)
if err !=nil {
- fmt.Println("sqlUE marshal err:",err)
+ logger.Error("sqlUE marshal err:",err)
return
}
err = a.UserEvent(UserEventSyncSql, ueB, false)
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
- fmt.Println("err: ", err)
+ logger.Error("err: ", err)
}
}
@@ -539,7 +558,7 @@
func (a *Agent) SyncDbTablePersonCache(b []byte) {
err := a.UserEvent(UserEventSyncDbTablePersonCache, b, false)
if err !=nil{
- fmt.Println("UserEventSyncDbTablePersonCache err:",err)
+ logger.Error("UserEventSyncDbTablePersonCache err:",err)
}
}
@@ -547,13 +566,13 @@
func Init(clusterID string, password string, nodeID string, addrs []string) (*Agent, error) {
agent, err := InitNode(clusterID, password, nodeID)
if err != nil {
- fmt.Printf("InitNode failed, error: %s", err)
+ logger.Error("InitNode failed, error: %s", err)
return agent, err
}
err = agent.JoinByNodeAddrs(addrs)
if err != nil {
- fmt.Printf("JoinByNodeIP failed, error: %s", err)
+ logger.Error("JoinByNodeIP failed, error: %s", err)
return agent, err
}
@@ -563,7 +582,7 @@
//InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
conf := DefaultConfig()
- fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
+ logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
conf.ClusterID = clusterID
conf.NodeName = nodeID
if password == "" {
@@ -579,7 +598,7 @@
}
agent, err := Create(conf)
if err != nil {
- fmt.Printf("create agent failed, error: %s", err)
+ logger.Error("create agent failed, error: %s", err)
return agent, err
}
@@ -589,9 +608,9 @@
agent.ShutdownCh()
}()
time.Sleep(time.Second)
- fmt.Println("Stats:", agent.Agent.Serf().Stats())
- fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
- fmt.Println("create agent sucess!!")
+ logger.Info("Stats:", agent.Agent.Serf().Stats())
+ logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
+ logger.Info("create agent sucess!!")
return agent, nil
}
@@ -632,7 +651,7 @@
func (a *Agent) GetNodes() (nodes []NodeInfo) {
var node NodeInfo
- fmt.Println("a.conf.ClusterID:", a.conf.ClusterID)
+ logger.Info("a.conf.ClusterID:", a.conf.ClusterID)
mbs := a.GroupMembers(a.conf.ClusterID)
for _, mb := range mbs {
node.NodeID = mb.Name
--
Gitblit v1.8.0