From 86341ab9e3d7eda30fbb4df2a77b9419a89f97c1 Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期五, 02 八月 2019 17:14:27 +0800
Subject: [PATCH] 添加基础业务处理接口
---
agent.go | 241 +++++++++++++++++++++++++++--
db.go | 2
config.go | 44 ++++-
dbself.go | 111 +++++++++++++
agent_test.go | 29 ++
db_test.go | 2
6 files changed, 392 insertions(+), 37 deletions(-)
diff --git a/agent.go b/agent.go
index f1b2e28..9cea45c 100644
--- a/agent.go
+++ b/agent.go
@@ -14,16 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package serf
+package syncdb
import (
"context"
"errors"
+ "fmt"
+ "github.com/hashicorp/memberlist"
+ "io/ioutil"
+ "os"
+ "strconv"
+
+ //"os"
+ "strings"
"time"
- "github.com/apache/servicecomb-service-center/pkg/log"
"github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
+ //"github.com/apache/servicecomb-service-center/pkg/log"
+ "log"
)
// Agent warps the serf agent
@@ -43,10 +52,21 @@
}
// create serf agent with serf config
+ fmt.Println("conf.Config.EncryptKey:",conf.EncryptKey)
serfAgent, err := agent.Create(conf.Config, serfConf, nil)
if err != nil {
return nil, err
}
+ // Create the keyring
+ keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey))
+ if err != nil {
+ fmt.Printf("Failed to restore keyring: %s", err)
+ return nil, err
+ }
+ serfConf.MemberlistConfig.Keyring = keyring
+ fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s",
+ len(conf.EncryptKey), conf.EncryptKey)
+
return &Agent{
Agent: serfAgent,
conf: conf,
@@ -59,7 +79,7 @@
func (a *Agent) Start(ctx context.Context) {
err := a.Agent.Start()
if err != nil {
- log.Errorf(err, "start serf agent failed")
+ log.Println(err, "start serf agent failed")
a.errorCh <- err
return
}
@@ -67,7 +87,7 @@
err = a.retryJoin(ctx)
if err != nil {
- log.Errorf(err, "start serf agent failed")
+ log.Println(err, "start serf agent failed")
if err != ctx.Err() && a.errorCh != nil {
a.errorCh <- err
}
@@ -79,17 +99,66 @@
// when the startup mode is "ModeCluster",
// used for logical grouping of serf nodes
func (a *Agent) HandleEvent(event serf.Event) {
- if event.EventType() != serf.EventMemberJoin {
- return
- }
- if a.conf.Mode == ModeCluster {
- if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect {
+ switch ev := event.(type) {
+ case serf.UserEvent:
+ fmt.Println(string(ev.Payload))
+ var tmpstringslice []string
+ tmpstringslice = append(tmpstringslice, string(ev.Payload))
+ fmt.Println(tmpstringslice)
+ results, err := DoExecute(tmpstringslice)
+
+ for _, result := range results {
+ fmt.Println(result, "results err: ", err)
+ }
+
+ case *serf.Query:
+ //bak file and send resp
+ filename, err := BakDbFile()
+ if err != nil {
+ fmt.Println("bak db file error!")
return
}
+ fmt.Println(filename)
+
+ filebuf, err := ioutil.ReadFile(filename)
+ fmt.Println("filebuf: ", len(filebuf))
+ if err != nil {
+ fmt.Printf("file to []bytes error: %s\n", err)
+ return
+ }
+
+ err = os.Remove(filename)
+ if err != nil {
+ fmt.Printf("remove file%s\n failed", filename)
+ return
+ }
+
+ fmt.Println("query payload: ", len(ev.Payload))
+ if query, ok := event.(*serf.Query); ok {
+ if err := query.Respond(filebuf); err != nil {
+ fmt.Printf("err: %s\n", err)
+ return
+ }
+ }
+
+ default:
+ fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
}
- a.DeregisterEventHandler(a)
- close(a.readyCh)
+
+ //if event.EventType() != serf.EventMemberJoin {
+ // fmt.Printf("event.EventType() != serf.EventMemberJoin")
+ // return
+ //}
+ //
+ //if a.conf.Mode == ModeCluster {
+ // if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect {
+ // fmt.Printf("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
+ // return
+ // }
+ //}
+ //a.DeregisterEventHandler(a)
+ //close(a.readyCh)
}
// Ready Returns a channel that will be closed when serf is ready
@@ -122,13 +191,13 @@
return nil
}
-// GroupMembers returns a point-in-time snapshot of the members of by groupName
-func (a *Agent) GroupMembers(groupName string) (members []serf.Member) {
+// GroupMembers returns a point-in-time snapshot of the members grouped by clusterID
+func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) {
serfAgent := a.Agent.Serf()
if serfAgent != nil {
for _, member := range serfAgent.Members() {
- log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName])
- if member.Tags[tagKeyClusterName] == groupName {
+ log.Printf("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID])
+ if member.Tags[tagKeyClusterID] == clusterID {
members = append(members, member)
}
}
@@ -172,7 +241,7 @@
func (a *Agent) retryJoin(ctx context.Context) (err error) {
if len(a.conf.RetryJoin) == 0 {
- log.Infof("retry join mumber %d", len(a.conf.RetryJoin))
+ log.Printf("retry join mumber %d", len(a.conf.RetryJoin))
return nil
}
@@ -180,13 +249,13 @@
attempt := 0
ticker := time.NewTicker(a.conf.RetryInterval)
for {
- log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
+ log.Printf("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
var n int
// Try to join the specified serf nodes
n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
if err == nil {
- log.Infof("serf: Join completed. Synced with %d initial agents", n)
+ log.Printf("serf: Join completed. Synced with %d initial agents", n)
break
}
attempt++
@@ -196,7 +265,7 @@
// else agent will try to join other nodes until successful always
if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
err = errors.New("serf: maximum retry join attempts made, exiting")
- log.Errorf(err, err.Error())
+ log.Println(err, err.Error())
break
}
select {
@@ -211,3 +280,137 @@
ticker.Stop()
return
}
+
+
+
+//GetDbFromCluster fetches the newest database from the cluster after joining it.
+//dbPathWrite is the path the received database file is written to.
+func (a *Agent)GetDbFromCluster(dbPathWrite string) {
+ //members: get name of first member
+ mbs := a.GroupMembers(a.conf.ClusterID)
+ var specmembername string
+ for _, m := range mbs {
+ if m.Addr.String() != a.conf.BindAddr {
+ specmembername = m.Name
+ break
+ }
+ }
+ fmt.Println(specmembername)
+
+ //query: get db file.
+ params := serf.QueryParam{
+ FilterNodes: strings.Fields(specmembername),
+ }
+
+ resp, err := a.Query("getDatabase", []byte(""), ¶ms)
+ if err == nil || !strings.Contains(err.Error(), "cannot contain") {
+ fmt.Println("err: ", err)
+ }
+
+ go func() {
+ respCh := resp.ResponseCh()
+ for {
+ select {
+ case r := <-respCh:
+ fmt.Println("x length is: ", len(r.Payload))
+
+ // // byte to file.
+ Dbconn.Close()
+ Dbconn = nil
+ err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
+ if err != nil {
+ fmt.Println("query byte to file error!", err)
+ }
+ err := GetConn()
+ if err != nil {
+ fmt.Println("create db conn of test.db error: ", err)
+ }
+ return
+ }
+ }
+ }()
+}
+
+//SyncSql broadcasts a SQL statement to the cluster
+func (a *Agent)SyncSql(sqlOp string) {
+ // event : use to send command to operate db.
+ err := a.UserEvent("SyncSql", []byte(sqlOp), false)
+ if err == nil || !strings.Contains(err.Error(), "cannot contain") {
+ fmt.Println("err: ", err)
+ }
+}
+
+//Init creates and starts a serf agent.
+//Called when the web backend receives a request to create a cluster.
+func Init(clusterID string, password string, nodeID string) (*Agent, error) {
+ conf := DefaultConfig()
+ fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
+ //conf.ClusterID = clusterID
+ conf.NodeName = nodeID
+ if password == "" {
+ conf.EncryptKey = DefaultEncryptKey
+ }else{
+ if len(password) >= 16 {
+ password = password[:16]
+ }else{
+ password = fmt.Sprintf("%016s", password)[:16]
+ //return nil, fmt.Errorf("error password")
+ }
+ conf.EncryptKey = password
+ }
+ agent, err := Create(conf)
+ if err != nil {
+ fmt.Printf("create agent failed, error: %s", err)
+ return agent, err
+ }
+
+ agent.Start(context.Background())
+ //<- agent.readyCh
+ go func() {
+ agent.ShutdownCh()
+ }()
+ time.Sleep(time.Second)
+ fmt.Println("Stats:",agent.Agent.Serf().Stats())
+ fmt.Println("EncryptionEnabled:",agent.Agent.Serf().EncryptionEnabled())
+ fmt.Printf("create agent sucess!!")
+
+ return agent, nil
+}
+
+func (a *Agent) JoinByNodeIP(ip string) error {
+ n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
+ if err != nil || n == 0{
+ a.Stop()
+ fmt.Println("Stop node")
+ return fmt.Errorf("Error Encrypt Key!")
+ }
+
+ return err
+}
+
+type Node struct {
+ clusterID string
+ NodeID string
+ IP string
+ isAlive int //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
+}
+
+func (a *Agent) GetNodes() (nodes []Node) {
+ var node Node
+ fmt.Println("a.conf.ClusterID:",a.conf.ClusterID)
+ mbs := a.GroupMembers(a.conf.ClusterID)
+ for _, mb := range mbs {
+ node.NodeID = mb.Name
+ node.IP = mb.Addr.String()
+ node.isAlive = int(mb.Status)
+ node.clusterID = mb.Tags[tagKeyClusterID]
+
+ nodes = append(nodes, node)
+ }
+
+ return nodes
+}
+
+
+
+
diff --git a/agent_test.go b/agent_test.go
index af68f8f..2b68982 100644
--- a/agent_test.go
+++ b/agent_test.go
@@ -14,28 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package serf
+package syncdb
import (
"context"
+ "fmt"
+ "github.com/hashicorp/serf/serf"
"testing"
"time"
-
- "github.com/hashicorp/serf/serf"
)
func TestAgent(t *testing.T) {
conf := DefaultConfig()
+ conf.ClusterID = "testCluster"
+ conf.NodeName = "testnode"
agent, err := Create(conf)
if err != nil {
t.Errorf("create agent failed, error: %s", err)
}
+
+ fmt.Println("LocalMember1:", agent.LocalMember())
+
+
agent.Start(context.Background())
- <- agent.readyCh
+ //<- agent.readyCh
go func() {
agent.ShutdownCh()
}()
time.Sleep(time.Second)
+
+ fmt.Println("LocalMember2:", agent.LocalMember())
+
+ fmt.Println("ClusterID:", agent.conf.ClusterID)
+ fmt.Println("NodeName:", agent.conf.NodeName)
err = agent.UserEvent("test", []byte("test"), true)
if err != nil {
@@ -46,13 +57,15 @@
if err != nil {
t.Errorf("query for other node failed, error: %s", err)
}
- agent.LocalMember()
+ fmt.Println("LocalMember:", agent.LocalMember())
+ //agent.LocalMember()
- agent.Member("testnode")
+ mb := agent.Member("testnode")
+ fmt.Println("mb:", mb)
agent.SerfConfig()
- _, err = agent.Join([]string{"127.0.0.1:9999"}, true)
+ _, err = agent.Join([]string{"192.168.1.123:5000"}, true)
if err != nil {
t.Logf("join to other node failed, error: %s", err)
}
@@ -72,3 +85,5 @@
t.Errorf("angent shutdown failed, error: %s", err)
}
}
+
+
diff --git a/config.go b/config.go
index 8a25908..f24c38a 100644
--- a/config.go
+++ b/config.go
@@ -14,27 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package serf
+package syncdb
import (
"fmt"
+ "net"
"strconv"
- "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
+ //"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
)
const (
- DefaultBindPort = 30190
- DefaultRPCPort = 30191
+ DefaultBindPort = 5000//30190
+ DefaultRPCPort = 7373//30191
DefaultClusterPort = 30192
ModeSingle = "single"
ModeCluster = "cluster"
retryMaxAttempts = 3
groupExpect = 3
- tagKeyClusterName = "syncer-cluster-name"
+ DefaultEncryptKey = "bjbasic@aiotlink"
+ tagKeyClusterID = "syncer-cluster-name"
TagKeyClusterPort = "syncer-cluster-port"
TagKeyRPCPort = "syncer-rpc-port"
)
@@ -42,6 +44,9 @@
// DefaultConfig default config
func DefaultConfig() *Config {
agentConf := agent.DefaultConfig()
+ agentConf.QueryResponseSizeLimit = 50 * 1024 *1024
+ agentConf.QuerySizeLimit = 50 * 1024 *1024
+ agentConf.UserEventSizeLimit = 1024
agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
return &Config{
@@ -58,7 +63,7 @@
Mode string `json:"mode"`
// name to group members into cluster
- ClusterName string `json:"cluster_name"`
+ ClusterID string `json:"cluster_name"`
// port to communicate between cluster members
ClusterPort int `yaml:"cluster_port"`
@@ -77,7 +82,7 @@
func (c *Config) convertToSerf() (*serf.Config, error) {
serfConf := serf.DefaultConfig()
- bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort)
+ bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort)
if err != nil {
return nil, fmt.Errorf("invalid bind address: %s", err)
}
@@ -96,10 +101,11 @@
serfConf.MemberlistConfig.BindAddr = bindIP
serfConf.MemberlistConfig.BindPort = bindPort
serfConf.NodeName = c.NodeName
+
serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
- if c.ClusterName != "" {
- serfConf.Tags[tagKeyClusterName] = c.ClusterName
+ if c.ClusterID != "" {
+ serfConf.Tags[tagKeyClusterID] = c.ClusterID
serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
}
@@ -108,3 +114,23 @@
}
return serfConf, nil
}
+
+// SplitHostPort returns the parts of the address and port. If the port does not exist, use defaultPort.
+func SplitHostPort(address string, defaultPort int) (string, int, error) {
+ _, _, err := net.SplitHostPort(address)
+ if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
+ address = fmt.Sprintf("%s:%d", address, defaultPort)
+ _, _, err = net.SplitHostPort(address)
+ }
+ if err != nil {
+ return "", 0, err
+ }
+
+ addr, err := net.ResolveTCPAddr("tcp", address)
+ if err != nil {
+ return "", 0, err
+ }
+
+ return addr.IP.String(), addr.Port, nil
+}
+
diff --git a/db.go b/db.go
index 5585515..cce6d76 100644
--- a/db.go
+++ b/db.go
@@ -1,6 +1,6 @@
// Package db exposes a lightweight abstraction over the SQLite code.
// It performs some basic mapping of lower-level types to rqlite types.
-package db
+package syncdb
import (
"database/sql/driver"
diff --git a/db_test.go b/db_test.go
index 01ace9a..42a7da3 100644
--- a/db_test.go
+++ b/db_test.go
@@ -1,4 +1,4 @@
-package db
+package syncdb
import (
"encoding/json"
diff --git a/dbself.go b/dbself.go
new file mode 100644
index 0000000..f6cb6e5
--- /dev/null
+++ b/dbself.go
@@ -0,0 +1,111 @@
+package syncdb
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+)
+
+var Dbconn *Conn
+var sy sync.Mutex
+
+func init() {
+ GetConn()
+}
+
+// GetConn opens the global database connection (Dbconn) used by DoExecute.
+func GetConn() error {
+ var err error
+ path, err := GetCurrentPath()
+ if err != nil {
+ return errors.New("get current path error")
+ }
+
+ filepath := fmt.Sprintf("%stest.db", path)
+ fmt.Println("self: ========>", filepath)
+ db, err := New(filepath, "", false)
+ if err != nil {
+ fmt.Println("new db database: ", err)
+ return err
+ }
+ Dbconn, err = db.Connect()
+ if err != nil {
+ fmt.Println("new db conn error; ", err)
+ return err
+ }
+ return nil
+}
+
+//BakDbFile backs the database up into a temporary file and returns its path.
+func BakDbFile() (string, error) {
+
+ path, err := GetCurrentPath()
+ if err != nil {
+ return "", errors.New("get current path error")
+ }
+
+ filepath := fmt.Sprintf("%stmptest.db", path)
+ db, err := New(filepath, "", false)
+ if err != nil {
+ fmt.Println("new db database: ", err)
+ return "", err
+ }
+
+ tmpconn, err := db.Connect()
+ if err != nil {
+ fmt.Println("new db conn error; ", err)
+ return "", err
+ }
+ defer tmpconn.Close()
+
+ err = Dbconn.Backup(tmpconn)
+ if err != nil {
+ return "", err
+ }
+ return filepath, nil
+}
+
+// DoExecute runs the given SQL statements against the database under a mutex.
+func DoExecute(executestring []string) ([]*Result, error) {
+ sy.Lock()
+ defer sy.Unlock()
+ allResults, err := Dbconn.Execute(executestring, false, false)
+ if err != nil {
+ fmt.Println("execute error!", err)
+ return nil, err
+ }
+ return allResults, nil
+}
+
+// GetCurrentPath returns the directory (with trailing separator) of the running executable.
+func GetCurrentPath() (string, error) {
+ file, err := exec.LookPath(os.Args[0])
+ if err != nil {
+ return "", err
+ }
+ path, err := filepath.Abs(file)
+ if err != nil {
+ return "", err
+ }
+ i := strings.LastIndex(path, "/")
+ if i < 0 {
+ i = strings.LastIndex(path, "\\")
+ }
+ if i < 0 {
+ return "", errors.New(`error: Can't find "/" or "\".`)
+ }
+ return string(path[0 : i+1]), nil
+}
+
+func Dumpdb() {
+
+ var b strings.Builder
+ if err := Dbconn.Dump(&b); err != nil {
+ fmt.Println("dump file ", err.Error())
+ }
+ fmt.Printf("%T\n", b)
+}
--
Gitblit v1.8.0