From 954aef5b16c944b0f27e03ccaafb1748b62f0d48 Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期二, 06 八月 2019 09:46:52 +0800
Subject: [PATCH] 增加接口,初始化自动加入集群
---
agent.go | 126 ++++++++++++++++++++++++++++++------------
1 files changed, 90 insertions(+), 36 deletions(-)
diff --git a/agent.go b/agent.go
index 9cea45c..2fb96b1 100644
--- a/agent.go
+++ b/agent.go
@@ -18,12 +18,12 @@
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"github.com/hashicorp/memberlist"
"io/ioutil"
"os"
- "strconv"
//"os"
"strings"
@@ -35,6 +35,10 @@
"log"
)
+const (
+ QueryEventGetDB = "GetDatabase"
+ QueryEventUpdateDBData = "UpdateDBData"
+)
// Agent warps the serf agent
type Agent struct {
*agent.Agent
@@ -106,41 +110,71 @@
var tmpstringslice []string
tmpstringslice = append(tmpstringslice, string(ev.Payload))
fmt.Println(tmpstringslice)
- results, err := DoExecute(tmpstringslice)
+ results, err := ExecuteWriteSql(tmpstringslice)
for _, result := range results {
fmt.Println(result, "results err: ", err)
}
case *serf.Query:
- //bak file and send resp
- filename, err := BakDbFile()
- if err != nil {
- fmt.Println("bak db file error!")
- return
- }
- fmt.Println(filename)
- filebuf, err := ioutil.ReadFile(filename)
- fmt.Println("filebuf: ", len(filebuf))
- if err != nil {
- fmt.Printf("file to []bytes error: %s\n", err)
- return
- }
-
- err = os.Remove(filename)
- if err != nil {
- fmt.Printf("remove file%s\n failed", filename)
- return
- }
-
- fmt.Println("query payload: ", len(ev.Payload))
- if query, ok := event.(*serf.Query); ok {
- if err := query.Respond(filebuf); err != nil {
- fmt.Printf("err: %s\n", err)
+ if ev.Name == QueryEventGetDB{
+ //bak file and send resp
+ filename, err := BakDbFile()
+ if err != nil {
+ fmt.Println("bak db file error!")
return
}
+ fmt.Println(filename)
+
+ filebuf, err := ioutil.ReadFile(filename)
+ fmt.Println("filebuf: ", len(filebuf))
+ if err != nil {
+ fmt.Printf("file to []bytes error: %s\n", err)
+ return
+ }
+
+ err = os.Remove(filename)
+ if err != nil {
+ fmt.Printf("remove file%s\n failed", filename)
+ return
+ }
+
+ fmt.Println("query payload: ", len(ev.Payload))
+ if query, ok := event.(*serf.Query); ok {
+ if err := query.Respond(filebuf); err != nil {
+ fmt.Printf("err: %s\n", err)
+ return
+ }
+ }
+ } else if ev.Name == QueryEventUpdateDBData {
+ fmt.Println(string(ev.Payload))
+ var tmpstringslice []string
+ tmpstringslice = append(tmpstringslice, string(ev.Payload))
+ fmt.Println(tmpstringslice)
+ rows, err := ExecuteQuerySql(tmpstringslice)
+ if err != nil {
+ fmt.Println("err: ", err)
+ return
+ }
+ var rowsReturn []Rows
+ for _,r := range rows {
+ rowsReturn = append(rowsReturn, *r)
+ }
+
+ bytesReturn, err := json.Marshal(rowsReturn)
+ fmt.Println("results: ", bytesReturn)
+ if query, ok := event.(*serf.Query); ok {
+ if err := query.Respond(bytesReturn); err != nil {
+ fmt.Printf("err: %s\n", err)
+ return
+ }
+ }
+
+ //var res []*Rows
+ //json.Unmarshal(bytesReturn, &res)
}
+
default:
fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
@@ -281,8 +315,6 @@
return
}
-
-
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent)GetDbFromCluster(dbPathWrite string) {
@@ -302,7 +334,7 @@
FilterNodes: strings.Fields(specmembername),
}
- resp, err := a.Query("getDatabase", []byte(""), ¶ms)
+ resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms)
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
fmt.Println("err: ", err)
}
@@ -315,13 +347,13 @@
fmt.Println("x length is: ", len(r.Payload))
// // byte to file.
- Dbconn.Close()
- Dbconn = nil
+ SerfDbConn.Close()
+ SerfDbConn = nil
err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
if err != nil {
fmt.Println("query byte to file error!", err)
}
- err := GetConn()
+ err := InitDbConn("")
if err != nil {
fmt.Println("create db conn of test.db error: ", err)
}
@@ -341,8 +373,24 @@
}
//Init serf Init
-//web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
-func Init(clusterID string, password string, nodeID string) (*Agent, error) {
+func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) {
+ agent, err := InitNode(clusterID, password, nodeID)
+ if err != nil {
+ fmt.Printf("InitNode failed, error: %s", err)
+ return agent, err
+ }
+
+ err = agent.JoinByNodeIP(ips)
+ if err != nil {
+ fmt.Printf("JoinByNodeIP failed, error: %s", err)
+ return agent, err
+ }
+
+ return agent, err
+}
+
+//InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
+func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
conf := DefaultConfig()
fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
//conf.ClusterID = clusterID
@@ -377,8 +425,14 @@
return agent, nil
}
-func (a *Agent) JoinByNodeIP(ip string) error {
- n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
+func (a *Agent) JoinByNodeIP(ips []string) error {
+ var nodes []string
+ for _, ip := range ips {
+ node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
+ nodes = append(nodes, node)
+ }
+
+ n, err := a.Agent.Join(nodes, true)
if err != nil || n == 0{
a.Stop()
fmt.Println("Stop node")
--
Gitblit v1.8.0