From 7d1bbdc8a9a4854edb6d45877fb0d2b9ce12478b Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期一, 05 八月 2019 09:46:29 +0800
Subject: [PATCH] 重构之前的db操作的代码
---
agent.go | 77 ++++++++++++++-----------
config.go | 13 ++-
dbself.go | 67 +++++++++++----------
3 files changed, 87 insertions(+), 70 deletions(-)
diff --git a/agent.go b/agent.go
index 9cea45c..1b4d4b5 100644
--- a/agent.go
+++ b/agent.go
@@ -23,15 +23,11 @@
"github.com/hashicorp/memberlist"
"io/ioutil"
"os"
- "strconv"
-
- //"os"
"strings"
"time"
"github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
- //"github.com/apache/servicecomb-service-center/pkg/log"
"log"
)
@@ -52,7 +48,7 @@
}
// create serf agent with serf config
- fmt.Println("conf.Config.EncryptKey:",conf.EncryptKey)
+ fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey)
serfAgent, err := agent.Create(conf.Config, serfConf, nil)
if err != nil {
return nil, err
@@ -281,11 +277,9 @@
return
}
-
-
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
-func (a *Agent)GetDbFromCluster(dbPathWrite string) {
+func (a *Agent) GetDbFromCluster(dbPathWrite string) {
//members: get name of first member
mbs := a.GroupMembers(a.conf.ClusterID)
var specmembername string
@@ -315,13 +309,13 @@
fmt.Println("x length is: ", len(r.Payload))
// // byte to file.
- Dbconn.Close()
- Dbconn = nil
+ SerfDbConn.Close()
+ SerfDbConn = nil
err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
if err != nil {
fmt.Println("query byte to file error!", err)
}
- err := GetConn()
+ err := InitDbConn("")
if err != nil {
fmt.Println("create db conn of test.db error: ", err)
}
@@ -332,7 +326,7 @@
}
//SyncSql boardcast sql to cluster
-func (a *Agent)SyncSql(sqlOp string) {
+func (a *Agent) SyncSql(sqlOp string) {
// event : use to send command to operate db.
err := a.UserEvent("SyncSql", []byte(sqlOp), false)
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
@@ -341,18 +335,34 @@
}
//Init serf Init
-//web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
-func Init(clusterID string, password string, nodeID string) (*Agent, error) {
+func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) {
+ agent, err := InitNode(clusterID, password, nodeID)
+ if err != nil {
+ fmt.Printf("InitNode failed, error: %s", err)
+ return agent, err
+ }
+
+ err = agent.JoinByNodeIP(ips)
+ if err != nil {
+ fmt.Printf("JoinByNodeIP failed, error: %s", err)
+ return agent, err
+ }
+
+ return agent, err
+}
+
+//InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
+func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
conf := DefaultConfig()
fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
//conf.ClusterID = clusterID
conf.NodeName = nodeID
if password == "" {
conf.EncryptKey = DefaultEncryptKey
- }else{
+ } else {
if len(password) >= 16 {
password = password[:16]
- }else{
+ } else {
password = fmt.Sprintf("%016s", password)[:16]
//return nil, fmt.Errorf("error password")
}
@@ -370,19 +380,24 @@
agent.ShutdownCh()
}()
time.Sleep(time.Second)
- fmt.Println("Stats:",agent.Agent.Serf().Stats())
- fmt.Println("EncryptionEnabled:",agent.Agent.Serf().EncryptionEnabled())
- fmt.Printf("create agent sucess!!")
+ fmt.Println("Stats:", agent.Agent.Serf().Stats())
+ fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
+ fmt.Println("create agent sucess!!")
return agent, nil
}
-func (a *Agent) JoinByNodeIP(ip string) error {
- n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
- if err != nil || n == 0{
- a.Stop()
- fmt.Println("Stop node")
- return fmt.Errorf("Error Encrypt Key!")
+func (a *Agent) JoinByNodeIP(ips []string) error {
+ var nodes []string
+ fmt.Println("len(ips):", len(ips))
+ for _, ip := range ips {
+ node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
+ nodes = append(nodes, node)
+ }
+
+ n, err := a.Agent.Join(nodes, true)
+ if err != nil || n == 0 {
+ return fmt.Errorf("Error Agent.Join!")
}
return err
@@ -390,14 +405,14 @@
type Node struct {
clusterID string
- NodeID string
- IP string
- isAlive int //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
+ NodeID string
+ IP string
+ isAlive int //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
}
func (a *Agent) GetNodes() (nodes []Node) {
var node Node
- fmt.Println("a.conf.ClusterID:",a.conf.ClusterID)
+ fmt.Println("a.conf.ClusterID:", a.conf.ClusterID)
mbs := a.GroupMembers(a.conf.ClusterID)
for _, mb := range mbs {
node.NodeID = mb.Name
@@ -410,7 +425,3 @@
return nodes
}
-
-
-
-
diff --git a/config.go b/config.go
index f24c38a..950e3a1 100644
--- a/config.go
+++ b/config.go
@@ -28,8 +28,8 @@
)
const (
- DefaultBindPort = 5000//30190
- DefaultRPCPort = 7373//30191
+ DefaultBindPort = 30190
+ DefaultRPCPort = 30191
DefaultClusterPort = 30192
ModeSingle = "single"
ModeCluster = "cluster"
@@ -39,14 +39,17 @@
tagKeyClusterID = "syncer-cluster-name"
TagKeyClusterPort = "syncer-cluster-port"
TagKeyRPCPort = "syncer-rpc-port"
+ MaxQueryRespSize = 50 * 1024 *1024
+ MaxQuerySize = 1024 *1024
+ MaxUserEventSize = 1024
)
// DefaultConfig default config
func DefaultConfig() *Config {
agentConf := agent.DefaultConfig()
- agentConf.QueryResponseSizeLimit = 50 * 1024 *1024
- agentConf.QuerySizeLimit = 50 * 1024 *1024
- agentConf.UserEventSizeLimit = 1024
+ agentConf.QueryResponseSizeLimit = MaxQueryRespSize
+ agentConf.QuerySizeLimit = MaxQuerySize
+ agentConf.UserEventSizeLimit = MaxUserEventSize
agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
return &Config{
diff --git a/dbself.go b/dbself.go
index f6cb6e5..a264aa9 100644
--- a/dbself.go
+++ b/dbself.go
@@ -10,45 +10,48 @@
"sync"
)
-var Dbconn *Conn
-var sy sync.Mutex
-func init() {
- GetConn()
-}
+const (
+ PersonSqliteDBPath = "/opt/workspace/DataBases/sync.db"
+)
+
+var syncMut sync.Mutex
+var SerfDbConn *Conn
// get Conn of db for do execute.
-func GetConn() error {
- var err error
- path, err := GetCurrentPath()
- if err != nil {
- return errors.New("get current path error")
+func InitDbConn(dbPath string) error {
+
+ if dbPath == "" {
+ dbPath = PersonSqliteDBPath
}
- filepath := fmt.Sprintf("%stest.db", path)
- fmt.Println("self: ========>", filepath)
- db, err := New(filepath, "", false)
+ fmt.Println("self: ========>", dbPath)
+ db, err := New(dbPath, "", false)
if err != nil {
fmt.Println("new db database: ", err)
return err
}
- Dbconn, err = db.Connect()
+ dbConn, err := db.Connect()
if err != nil {
fmt.Println("new db conn error; ", err)
return err
}
+
+ SerfDbConn = dbConn
return nil
}
//bak dbdata.
func BakDbFile() (string, error) {
- path, err := GetCurrentPath()
+ path, err := getCurrentPath()
if err != nil {
- return "", errors.New("get current path error")
+ fmt.Println("getCurrentPath error; ", err)
+ return "", err
}
- filepath := fmt.Sprintf("%stmptest.db", path)
+ filepath := path + "tmp.db"
+ fmt.Println("filepath:", filepath)
db, err := New(filepath, "", false)
if err != nil {
fmt.Println("new db database: ", err)
@@ -62,7 +65,7 @@
}
defer tmpconn.Close()
- err = Dbconn.Backup(tmpconn)
+ err = SerfDbConn.Backup(tmpconn)
if err != nil {
return "", err
}
@@ -70,10 +73,10 @@
}
// do exet when get querystring.
-func DoExecute(executestring []string) ([]*Result, error) {
- sy.Lock()
- defer sy.Unlock()
- allResults, err := Dbconn.Execute(executestring, false, false)
+func DoExecute(sqlString []string) ([]*Result, error) {
+ syncMut.Lock()
+ defer syncMut.Unlock()
+ allResults, err := SerfDbConn.Execute(sqlString, false, false)
if err != nil {
fmt.Println("execute error!", err)
return nil, err
@@ -81,8 +84,17 @@
return allResults, nil
}
+func Dumpdb() {
+
+ var b strings.Builder
+ if err := SerfDbConn.Dump(&b); err != nil {
+ fmt.Println("dump file ", err.Error())
+ }
+ fmt.Printf("%T\n", b)
+}
+
// get current path
-func GetCurrentPath() (string, error) {
+func getCurrentPath() (string, error) {
file, err := exec.LookPath(os.Args[0])
if err != nil {
return "", err
@@ -99,13 +111,4 @@
return "", errors.New(`error: Can't find "/" or "\".`)
}
return string(path[0 : i+1]), nil
-}
-
-func Dumpdb() {
-
- var b strings.Builder
- if err := Dbconn.Dump(&b); err != nil {
- fmt.Println("dump file ", err.Error())
- }
- fmt.Printf("%T\n", b)
}
--
Gitblit v1.8.0