From 954aef5b16c944b0f27e03ccaafb1748b62f0d48 Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期二, 06 八月 2019 09:46:52 +0800
Subject: [PATCH] 增加接口,初始化自动加入集群
---
agent.go | 126 ++++++++++++++++++++++---------
config.go | 4
dbself.go | 79 +++++++++++--------
3 files changed, 139 insertions(+), 70 deletions(-)
diff --git a/agent.go b/agent.go
index 9cea45c..2fb96b1 100644
--- a/agent.go
+++ b/agent.go
@@ -18,12 +18,12 @@
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"github.com/hashicorp/memberlist"
"io/ioutil"
"os"
- "strconv"
//"os"
"strings"
@@ -35,6 +35,10 @@
"log"
)
+const (
+ QueryEventGetDB = "GetDatabase"
+ QueryEventUpdateDBData = "UpdateDBData"
+)
// Agent warps the serf agent
type Agent struct {
*agent.Agent
@@ -106,41 +110,71 @@
var tmpstringslice []string
tmpstringslice = append(tmpstringslice, string(ev.Payload))
fmt.Println(tmpstringslice)
- results, err := DoExecute(tmpstringslice)
+ results, err := ExecuteWriteSql(tmpstringslice)
for _, result := range results {
fmt.Println(result, "results err: ", err)
}
case *serf.Query:
- //bak file and send resp
- filename, err := BakDbFile()
- if err != nil {
- fmt.Println("bak db file error!")
- return
- }
- fmt.Println(filename)
- filebuf, err := ioutil.ReadFile(filename)
- fmt.Println("filebuf: ", len(filebuf))
- if err != nil {
- fmt.Printf("file to []bytes error: %s\n", err)
- return
- }
-
- err = os.Remove(filename)
- if err != nil {
- fmt.Printf("remove file%s\n failed", filename)
- return
- }
-
- fmt.Println("query payload: ", len(ev.Payload))
- if query, ok := event.(*serf.Query); ok {
- if err := query.Respond(filebuf); err != nil {
- fmt.Printf("err: %s\n", err)
+ if ev.Name == QueryEventGetDB{
+ //bak file and send resp
+ filename, err := BakDbFile()
+ if err != nil {
+ fmt.Println("bak db file error!")
return
}
+ fmt.Println(filename)
+
+ filebuf, err := ioutil.ReadFile(filename)
+ fmt.Println("filebuf: ", len(filebuf))
+ if err != nil {
+ fmt.Printf("file to []bytes error: %s\n", err)
+ return
+ }
+
+ err = os.Remove(filename)
+ if err != nil {
+ fmt.Printf("remove file%s\n failed", filename)
+ return
+ }
+
+ fmt.Println("query payload: ", len(ev.Payload))
+ if query, ok := event.(*serf.Query); ok {
+ if err := query.Respond(filebuf); err != nil {
+ fmt.Printf("err: %s\n", err)
+ return
+ }
+ }
+ } else if ev.Name == QueryEventUpdateDBData {
+ fmt.Println(string(ev.Payload))
+ var tmpstringslice []string
+ tmpstringslice = append(tmpstringslice, string(ev.Payload))
+ fmt.Println(tmpstringslice)
+ rows, err := ExecuteQuerySql(tmpstringslice)
+ if err != nil {
+ fmt.Println("err: ", err)
+ return
+ }
+ var rowsReturn []Rows
+ for _,r := range rows {
+ rowsReturn = append(rowsReturn, *r)
+ }
+
+ bytesReturn, err := json.Marshal(rowsReturn)
+ fmt.Println("results: ", bytesReturn)
+ if query, ok := event.(*serf.Query); ok {
+ if err := query.Respond(bytesReturn); err != nil {
+ fmt.Printf("err: %s\n", err)
+ return
+ }
+ }
+
+ //var res []*Rows
+ //json.Unmarshal(bytesReturn, &res)
}
+
default:
fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
@@ -281,8 +315,6 @@
return
}
-
-
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent)GetDbFromCluster(dbPathWrite string) {
@@ -302,7 +334,7 @@
FilterNodes: strings.Fields(specmembername),
}
- resp, err := a.Query("getDatabase", []byte(""), ¶ms)
+ resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms)
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
fmt.Println("err: ", err)
}
@@ -315,13 +347,13 @@
fmt.Println("x length is: ", len(r.Payload))
// // byte to file.
- Dbconn.Close()
- Dbconn = nil
+ SerfDbConn.Close()
+ SerfDbConn = nil
err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
if err != nil {
fmt.Println("query byte to file error!", err)
}
- err := GetConn()
+ err := InitDbConn("")
if err != nil {
fmt.Println("create db conn of test.db error: ", err)
}
@@ -341,8 +373,24 @@
}
//Init serf Init
-//web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
-func Init(clusterID string, password string, nodeID string) (*Agent, error) {
+func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) {
+ agent, err := InitNode(clusterID, password, nodeID)
+ if err != nil {
+ fmt.Printf("InitNode failed, error: %s", err)
+ return agent, err
+ }
+
+ err = agent.JoinByNodeIP(ips)
+ if err != nil {
+ fmt.Printf("JoinByNodeIP failed, error: %s", err)
+ return agent, err
+ }
+
+ return agent, err
+}
+
+//InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
+func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
conf := DefaultConfig()
fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
//conf.ClusterID = clusterID
@@ -377,8 +425,14 @@
return agent, nil
}
-func (a *Agent) JoinByNodeIP(ip string) error {
- n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
+func (a *Agent) JoinByNodeIP(ips []string) error {
+ var nodes []string
+ for _, ip := range ips {
+ node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
+ nodes = append(nodes, node)
+ }
+
+ n, err := a.Agent.Join(nodes, true)
if err != nil || n == 0{
a.Stop()
fmt.Println("Stop node")
diff --git a/config.go b/config.go
index f24c38a..91ea106 100644
--- a/config.go
+++ b/config.go
@@ -28,8 +28,8 @@
)
const (
- DefaultBindPort = 5000//30190
- DefaultRPCPort = 7373//30191
+ DefaultBindPort = 30190
+ DefaultRPCPort = 30191
DefaultClusterPort = 30192
ModeSingle = "single"
ModeCluster = "cluster"
diff --git a/dbself.go b/dbself.go
index f6cb6e5..62f2293 100644
--- a/dbself.go
+++ b/dbself.go
@@ -10,45 +10,48 @@
"sync"
)
-var Dbconn *Conn
-var sy sync.Mutex
-func init() {
- GetConn()
-}
+const (
+ PersonSqliteDBPath = "~/workspace/gitblit/dbserver/config/testdb.db"
+)
+
+var syncMut sync.Mutex
+var SerfDbConn *Conn
// get Conn of db for do execute.
-func GetConn() error {
- var err error
- path, err := GetCurrentPath()
- if err != nil {
- return errors.New("get current path error")
+func InitDbConn(dbPath string) error {
+
+ if dbPath == "" {
+ dbPath = PersonSqliteDBPath
}
- filepath := fmt.Sprintf("%stest.db", path)
- fmt.Println("self: ========>", filepath)
- db, err := New(filepath, "", false)
+ fmt.Println("self: ========>", dbPath)
+ db, err := New(dbPath, "", false)
if err != nil {
fmt.Println("new db database: ", err)
return err
}
- Dbconn, err = db.Connect()
+ dbConn, err := db.Connect()
if err != nil {
fmt.Println("new db conn error; ", err)
return err
}
+
+ SerfDbConn = dbConn
return nil
}
//bak dbdata.
func BakDbFile() (string, error) {
- path, err := GetCurrentPath()
+ path, err := getCurrentPath()
if err != nil {
- return "", errors.New("get current path error")
+ fmt.Println("getCurrentPath error; ", err)
+ return "", err
}
- filepath := fmt.Sprintf("%stmptest.db", path)
+ filepath := path + "tmp.db"
+ fmt.Println("filepath:", filepath)
db, err := New(filepath, "", false)
if err != nil {
fmt.Println("new db database: ", err)
@@ -62,7 +65,7 @@
}
defer tmpconn.Close()
- err = Dbconn.Backup(tmpconn)
+ err = SerfDbConn.Backup(tmpconn)
if err != nil {
return "", err
}
@@ -70,10 +73,10 @@
}
// do exet when get querystring.
-func DoExecute(executestring []string) ([]*Result, error) {
- sy.Lock()
- defer sy.Unlock()
- allResults, err := Dbconn.Execute(executestring, false, false)
+func ExecuteWriteSql(sqlString []string) ([]*Result, error) {
+ syncMut.Lock()
+ defer syncMut.Unlock()
+ allResults, err := SerfDbConn.Execute(sqlString, false, false)
if err != nil {
fmt.Println("execute error!", err)
return nil, err
@@ -81,8 +84,29 @@
return allResults, nil
}
+// do exet when get querystring.
+func ExecuteQuerySql(sqlString []string) ([]*Rows, error) {
+ syncMut.Lock()
+ defer syncMut.Unlock()
+ rows, err := SerfDbConn.Query(sqlString, false, false)
+ if err != nil {
+ fmt.Println("execute error!", err)
+ return nil, err
+ }
+ return rows, nil
+}
+
+func Dumpdb() {
+
+ var b strings.Builder
+ if err := SerfDbConn.Dump(&b); err != nil {
+ fmt.Println("dump file ", err.Error())
+ }
+ fmt.Printf("%T\n", b)
+}
+
// get current path
-func GetCurrentPath() (string, error) {
+func getCurrentPath() (string, error) {
file, err := exec.LookPath(os.Args[0])
if err != nil {
return "", err
@@ -99,13 +123,4 @@
return "", errors.New(`error: Can't find "/" or "\".`)
}
return string(path[0 : i+1]), nil
-}
-
-func Dumpdb() {
-
- var b strings.Builder
- if err := Dbconn.Dump(&b); err != nil {
- fmt.Println("dump file ", err.Error())
- }
- fmt.Printf("%T\n", b)
}
--
Gitblit v1.8.0