From 86341ab9e3d7eda30fbb4df2a77b9419a89f97c1 Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期五, 02 八月 2019 17:14:27 +0800
Subject: [PATCH] 添加基础业务处理接口
---
agent.go | 241 ++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 222 insertions(+), 19 deletions(-)
diff --git a/agent.go b/agent.go
index f1b2e28..9cea45c 100644
--- a/agent.go
+++ b/agent.go
@@ -14,16 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package serf
+package syncdb
import (
"context"
"errors"
+ "fmt"
+ "github.com/hashicorp/memberlist"
+ "io/ioutil"
+ "os"
+ "strconv"
+
+ //"os"
+ "strings"
"time"
- "github.com/apache/servicecomb-service-center/pkg/log"
"github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
+ //"github.com/apache/servicecomb-service-center/pkg/log"
+ "log"
)
// Agent warps the serf agent
@@ -43,10 +52,21 @@
}
// create serf agent with serf config
+ fmt.Println("conf.Config.EncryptKey:",conf.EncryptKey)
serfAgent, err := agent.Create(conf.Config, serfConf, nil)
if err != nil {
return nil, err
}
+ // Create the keyring
+ keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey))
+ if err != nil {
+ fmt.Printf("Failed to restore keyring: %s", err)
+ return nil, err
+ }
+ serfConf.MemberlistConfig.Keyring = keyring
+ fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s",
+ len(conf.EncryptKey), conf.EncryptKey)
+
return &Agent{
Agent: serfAgent,
conf: conf,
@@ -59,7 +79,7 @@
func (a *Agent) Start(ctx context.Context) {
err := a.Agent.Start()
if err != nil {
- log.Errorf(err, "start serf agent failed")
+ log.Println(err, "start serf agent failed")
a.errorCh <- err
return
}
@@ -67,7 +87,7 @@
err = a.retryJoin(ctx)
if err != nil {
- log.Errorf(err, "start serf agent failed")
+ log.Println(err, "start serf agent failed")
if err != ctx.Err() && a.errorCh != nil {
a.errorCh <- err
}
@@ -79,17 +99,66 @@
// when the startup mode is "ModeCluster",
// used for logical grouping of serf nodes
func (a *Agent) HandleEvent(event serf.Event) {
- if event.EventType() != serf.EventMemberJoin {
- return
- }
- if a.conf.Mode == ModeCluster {
- if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect {
+ switch ev := event.(type) {
+ case serf.UserEvent:
+ fmt.Println(string(ev.Payload))
+ var tmpstringslice []string
+ tmpstringslice = append(tmpstringslice, string(ev.Payload))
+ fmt.Println(tmpstringslice)
+ results, err := DoExecute(tmpstringslice)
+
+ for _, result := range results {
+ fmt.Println(result, "results err: ", err)
+ }
+
+ case *serf.Query:
+ //bak file and send resp
+ filename, err := BakDbFile()
+ if err != nil {
+ fmt.Println("bak db file error!")
return
}
+ fmt.Println(filename)
+
+ filebuf, err := ioutil.ReadFile(filename)
+ fmt.Println("filebuf: ", len(filebuf))
+ if err != nil {
+ fmt.Printf("file to []bytes error: %s\n", err)
+ return
+ }
+
+ err = os.Remove(filename)
+ if err != nil {
+ fmt.Printf("remove file%s\n failed", filename)
+ return
+ }
+
+ fmt.Println("query payload: ", len(ev.Payload))
+ if query, ok := event.(*serf.Query); ok {
+ if err := query.Respond(filebuf); err != nil {
+ fmt.Printf("err: %s\n", err)
+ return
+ }
+ }
+
+ default:
+ fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
}
- a.DeregisterEventHandler(a)
- close(a.readyCh)
+
+ //if event.EventType() != serf.EventMemberJoin {
+ // fmt.Printf("event.EventType() != serf.EventMemberJoin")
+ // return
+ //}
+ //
+ //if a.conf.Mode == ModeCluster {
+ // if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect {
+ // fmt.Printf("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
+ // return
+ // }
+ //}
+ //a.DeregisterEventHandler(a)
+ //close(a.readyCh)
}
// Ready Returns a channel that will be closed when serf is ready
@@ -122,13 +191,13 @@
return nil
}
-// GroupMembers returns a point-in-time snapshot of the members of by groupName
-func (a *Agent) GroupMembers(groupName string) (members []serf.Member) {
+// GroupMembers returns a point-in-time snapshot of the members of by clusterID
+func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) {
serfAgent := a.Agent.Serf()
if serfAgent != nil {
for _, member := range serfAgent.Members() {
- log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName])
- if member.Tags[tagKeyClusterName] == groupName {
+ log.Printf("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID])
+ if member.Tags[tagKeyClusterID] == clusterID {
members = append(members, member)
}
}
@@ -172,7 +241,7 @@
func (a *Agent) retryJoin(ctx context.Context) (err error) {
if len(a.conf.RetryJoin) == 0 {
- log.Infof("retry join mumber %d", len(a.conf.RetryJoin))
+ log.Printf("retry join mumber %d", len(a.conf.RetryJoin))
return nil
}
@@ -180,13 +249,13 @@
attempt := 0
ticker := time.NewTicker(a.conf.RetryInterval)
for {
- log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
+ log.Printf("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
var n int
// Try to join the specified serf nodes
n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
if err == nil {
- log.Infof("serf: Join completed. Synced with %d initial agents", n)
+ log.Printf("serf: Join completed. Synced with %d initial agents", n)
break
}
attempt++
@@ -196,7 +265,7 @@
// else agent will try to join other nodes until successful always
if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
err = errors.New("serf: maximum retry join attempts made, exiting")
- log.Errorf(err, err.Error())
+ log.Println(err, err.Error())
break
}
select {
@@ -211,3 +280,137 @@
ticker.Stop()
return
}
+
+
+
+//GetDbFromCluster get the newest database after join cluster
+//dbPathWrite the path where to write after got a database,
+func (a *Agent)GetDbFromCluster(dbPathWrite string) {
+ //members: get name of first member
+ mbs := a.GroupMembers(a.conf.ClusterID)
+ var specmembername string
+ for _, m := range mbs {
+ if m.Addr.String() != a.conf.BindAddr {
+ specmembername = m.Name
+ break
+ }
+ }
+ fmt.Println(specmembername)
+
+ //query: get db file.
+ params := serf.QueryParam{
+ FilterNodes: strings.Fields(specmembername),
+ }
+
+ resp, err := a.Query("getDatabase", []byte(""), ¶ms)
+ if err == nil || !strings.Contains(err.Error(), "cannot contain") {
+ fmt.Println("err: ", err)
+ }
+
+ go func() {
+ respCh := resp.ResponseCh()
+ for {
+ select {
+ case r := <-respCh:
+ fmt.Println("x length is: ", len(r.Payload))
+
+ // // byte to file.
+ Dbconn.Close()
+ Dbconn = nil
+ err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
+ if err != nil {
+ fmt.Println("query byte to file error!", err)
+ }
+ err := GetConn()
+ if err != nil {
+ fmt.Println("create db conn of test.db error: ", err)
+ }
+ return
+ }
+ }
+ }()
+}
+
+//SyncSql boardcast sql to cluster
+func (a *Agent)SyncSql(sqlOp string) {
+ // event : use to send command to operate db.
+ err := a.UserEvent("SyncSql", []byte(sqlOp), false)
+ if err == nil || !strings.Contains(err.Error(), "cannot contain") {
+ fmt.Println("err: ", err)
+ }
+}
+
+//Init serf Init
+//web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
+func Init(clusterID string, password string, nodeID string) (*Agent, error) {
+ conf := DefaultConfig()
+ fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
+ //conf.ClusterID = clusterID
+ conf.NodeName = nodeID
+ if password == "" {
+ conf.EncryptKey = DefaultEncryptKey
+ }else{
+ if len(password) >= 16 {
+ password = password[:16]
+ }else{
+ password = fmt.Sprintf("%016s", password)[:16]
+ //return nil, fmt.Errorf("error password")
+ }
+ conf.EncryptKey = password
+ }
+ agent, err := Create(conf)
+ if err != nil {
+ fmt.Printf("create agent failed, error: %s", err)
+ return agent, err
+ }
+
+ agent.Start(context.Background())
+ //<- agent.readyCh
+ go func() {
+ agent.ShutdownCh()
+ }()
+ time.Sleep(time.Second)
+ fmt.Println("Stats:",agent.Agent.Serf().Stats())
+ fmt.Println("EncryptionEnabled:",agent.Agent.Serf().EncryptionEnabled())
+ fmt.Printf("create agent sucess!!")
+
+ return agent, nil
+}
+
+func (a *Agent) JoinByNodeIP(ip string) error {
+ n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
+ if err != nil || n == 0{
+ a.Stop()
+ fmt.Println("Stop node")
+ return fmt.Errorf("Error Encrypt Key!")
+ }
+
+ return err
+}
+
+type Node struct {
+ clusterID string
+ NodeID string
+ IP string
+ isAlive int //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
+}
+
+func (a *Agent) GetNodes() (nodes []Node) {
+ var node Node
+ fmt.Println("a.conf.ClusterID:",a.conf.ClusterID)
+ mbs := a.GroupMembers(a.conf.ClusterID)
+ for _, mb := range mbs {
+ node.NodeID = mb.Name
+ node.IP = mb.Addr.String()
+ node.isAlive = int(mb.Status)
+ node.clusterID = mb.Tags[tagKeyClusterID]
+
+ nodes = append(nodes, node)
+ }
+
+ return nodes
+}
+
+
+
+
--
Gitblit v1.8.0