| 2019-08-06 | chenshijun | ![]() |
| 2019-08-06 | chenshijun | ![]() |
| 2019-08-06 | chenshijun | ![]() |
| API.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| agent.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| agent_test.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| config.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| dbself.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| searcher.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
API.txt
New file @@ -0,0 +1,19 @@ 1. func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) 初始化:查询数据库,如果数据库有数据,将得到集群id,节点id,密码,其他节点ip,然后调用初始化函数,即可自动通过其他节点加入集群。若是空,则不初始化,等待页面填写参数并启动。注意事项:目前集群id并没有真正意义。 2. func (a *Agent) JoinByNodeIP(ips []string) error 加入集群:先初始化节点, 然后通过节点ip加入集群和密码。注意事项:集群id在初始化的时候就需要填写,但是还未加入集群,无法获取集群id。所以目前集群id未使用。 3. func (a *Agent) Stop() 退出集群:退出集群后,外部需要清空同步库的所有数据。 4. func (a *Agent) GetNodes() (nodes []Node) 获取集群节点列表:通过该接口获取节点列表,然后维护到数据库和页面展示。 5. func (a *Agent)GetDbFromCluster(dbPathWrite string) 获取数据库文件:新节点加入集群后,需要调用该接口去集群中任意一个结点获取一个数据库文件。数据库文件包含本地库和同步库的表结构,但是只有同步库有数据,本地库是空的。 6. func (a *Agent)SyncSql(sqlOp string) 同步数据到集群:所有操作同步库的SQL操作都需要同步到集群,集群其他节点收到后,调用数据库接口写入数据库。 7. 查找集群信息:未加入集群前,查询集群信息。 agent.go
@@ -18,17 +18,27 @@ import ( "context" "encoding/json" "errors" "fmt" "github.com/hashicorp/memberlist" "io/ioutil" "net" "os" //"os" "strings" "time" "github.com/hashicorp/serf/cmd/serf/command/agent" "github.com/hashicorp/serf/serf" //"github.com/apache/servicecomb-service-center/pkg/log" "log" ) const ( QueryEventGetDB = "GetDatabase" QueryEventUpdateDBData = "UpdateDBData" ) // Agent warps the serf agent @@ -37,6 +47,13 @@ conf *Config readyCh chan struct{} errorCh chan error } type NodeInfo struct { ClusterID string `json:"clusterID"` NodeID string `json:"nodeID"` NodeAddress string `json:"nodeAddress"` IsAlive int `json:"isAlive"` } // Create create serf agent with config @@ -88,6 +105,8 @@ a.errorCh <- err } } go a.BroadcastMemberlist(BroadcastInterval * time.Second) } // HandleEvent Handles serf.EventMemberJoin events, @@ -102,40 +121,69 @@ var tmpstringslice []string tmpstringslice = append(tmpstringslice, string(ev.Payload)) fmt.Println(tmpstringslice) results, err := DoExecute(tmpstringslice) results, err := ExecuteWriteSql(tmpstringslice) for _, result := range results { fmt.Println(result, "results err: ", err) } case *serf.Query: //bak file and send resp filename, err := BakDbFile() if err != nil { fmt.Println("bak db file error!") return } fmt.Println(filename) filebuf, err := ioutil.ReadFile(filename) fmt.Println("filebuf: ", len(filebuf)) if err != nil { fmt.Printf("file to []bytes error: %s\n", err) return } err = os.Remove(filename) if err != nil { fmt.Printf("remove file%s\n failed", filename) return } fmt.Println("query payload: ", len(ev.Payload)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(filebuf); err != nil { fmt.Printf("err: %s\n", err) if ev.Name == QueryEventGetDB { //bak file and send resp filename, err := BakDbFile() if err != nil { fmt.Println("bak db file error!") return } fmt.Println(filename) filebuf, err := ioutil.ReadFile(filename) fmt.Println("filebuf: ", len(filebuf)) if err != nil { fmt.Printf("file to []bytes error: %s\n", err) return } err = os.Remove(filename) if err != nil { fmt.Printf("remove file%s\n failed", filename) return } fmt.Println("query payload: ", len(ev.Payload)) if query, ok := event.(*serf.Query); ok { if err := query.Respond(filebuf); err != nil { fmt.Printf("err: %s\n", err) return } } } else if ev.Name == QueryEventUpdateDBData { fmt.Println(string(ev.Payload)) var tmpstringslice []string tmpstringslice = append(tmpstringslice, string(ev.Payload)) fmt.Println(tmpstringslice) rows, err := ExecuteQuerySql(tmpstringslice) if err != nil { fmt.Println("err: ", err) return } var rowsReturn []Rows for _, r := range rows { rowsReturn = append(rowsReturn, *r) } bytesReturn, err := json.Marshal(rowsReturn) fmt.Println("results: ", bytesReturn) if query, ok := event.(*serf.Query); ok { if err := query.Respond(bytesReturn); err != nil { fmt.Printf("err: %s\n", err) return } } //var res []*Rows //json.Unmarshal(bytesReturn, &res) } default: @@ -155,6 +203,38 @@ //} //a.DeregisterEventHandler(a) //close(a.readyCh) } func (a *Agent) BroadcastMemberlist(delay time.Duration) { //serf := a.serf serf := a.Agent.Serf() mb := serf.LocalMember() mblist := serf.Memberlist() fmt.Println("mb:", mb) // copy local node localNode := *mblist.LocalNode() nodeID := a.conf.NodeName nodeAddress := localNode.Address() clusterID := mb.Tags[tagKeyClusterID] isAlive := int(mb.Status) message, _ := json.Marshal(NodeInfo{ clusterID, nodeID, nodeAddress, isAlive, }) // replace node address localNode.Addr = net.ParseIP(BroadcastIP) //localNode.Addr = net.IPv4(255,255,255,255) localNode.Port = BroadcastPort for { // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress) mblist.SendBestEffort(&localNode, []byte(message)) time.Sleep(delay) } } // Ready Returns a channel that will be closed when serf is ready @@ -296,7 +376,7 @@ FilterNodes: strings.Fields(specmembername), } resp, err := a.Query("getDatabase", []byte(""), ¶ms) resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { fmt.Println("err: ", err) } @@ -355,7 +435,7 @@ func InitNode(clusterID string, password string, nodeID string) (*Agent, error) { conf := DefaultConfig() fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) //conf.ClusterID = clusterID conf.ClusterID = clusterID conf.NodeName = nodeID if password == "" { conf.EncryptKey = DefaultEncryptKey @@ -389,7 +469,10 @@ func (a *Agent) JoinByNodeIP(ips []string) error { var nodes []string fmt.Println("len(ips):", len(ips)) if len(ips) == 0 { return fmt.Errorf("No Nodes To Join!") } for _, ip := range ips { node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) nodes = append(nodes, node) @@ -397,7 +480,9 @@ n, err := a.Agent.Join(nodes, true) if err != nil || n == 0 { return fmt.Errorf("Error Agent.Join!") a.Stop() fmt.Println("Stop node") return fmt.Errorf("Error Encrypt Key!") } return err agent_test.go
@@ -35,7 +35,6 @@ fmt.Println("LocalMember1:", agent.LocalMember()) agent.Start(context.Background()) //<- agent.readyCh go func() { @@ -85,5 +84,3 @@ t.Errorf("angent shutdown failed, error: %s", err) } } config.go
@@ -35,13 +35,16 @@ ModeCluster = "cluster" retryMaxAttempts = 3 groupExpect = 3 DefaultEncryptKey = "bjbasic@aiotlink" tagKeyClusterID = "syncer-cluster-name" DefaultEncryptKey = "bjbasic@aiotlink" tagKeyClusterID = "syncer-cluster-name" TagKeyClusterPort = "syncer-cluster-port" TagKeyRPCPort = "syncer-rpc-port" MaxQueryRespSize = 50 * 1024 *1024 MaxQuerySize = 1024 *1024 MaxUserEventSize = 1024 BroadcastIP = "255.255.255.255" BroadcastPort = 30193 BroadcastInterval = 5 MaxQueryRespSize = 50 * 1024 * 1024 MaxQuerySize = 50 * 1024 * 1024 MaxUserEventSize = 5 * 1024 ) // DefaultConfig default config @@ -136,4 +139,3 @@ return addr.IP.String(), addr.Port, nil } dbself.go
@@ -10,18 +10,17 @@ "sync" ) const ( PersonSqliteDBPath = "/opt/workspace/DataBases/sync.db" PersonSqliteDBPath = "~/workspace/gitblit/dbserver/config/testdb.db" ) var syncMut sync.Mutex var syncMut sync.Mutex var SerfDbConn *Conn // get Conn of db for do execute. func InitDbConn(dbPath string) error { if dbPath == "" { if dbPath == "" { dbPath = PersonSqliteDBPath } @@ -73,7 +72,7 @@ } // do exet when get querystring. func DoExecute(sqlString []string) ([]*Result, error) { func ExecuteWriteSql(sqlString []string) ([]*Result, error) { syncMut.Lock() defer syncMut.Unlock() allResults, err := SerfDbConn.Execute(sqlString, false, false) @@ -84,6 +83,18 @@ return allResults, nil } // do exet when get querystring. func ExecuteQuerySql(sqlString []string) ([]*Rows, error) { syncMut.Lock() defer syncMut.Unlock() rows, err := SerfDbConn.Query(sqlString, false, false) if err != nil { fmt.Println("execute error!", err) return nil, err } return rows, nil } func Dumpdb() { var b strings.Builder searcher.go
New file @@ -0,0 +1,110 @@ package syncdb import ( "encoding/json" "fmt" "time" "github.com/hashicorp/memberlist" ) var ( members [][]byte delay time.Duration ) // delegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist. type delegate struct{} // NodeMeta is the delegate method, must implement. func (d *delegate) NodeMeta(limit int) []byte { return []byte{} } // LocalState is the delegate method, must implement. func (d *delegate) LocalState(join bool) []byte { return []byte{} } // MergeRemoteState is the delegate method, must implement. func (d *delegate) MergeRemoteState(buf []byte, join bool) { } // GetBroadcasts is the delegate method, must implement. func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { return [][]byte{} } // eventDelegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist. type eventDelegate struct{} // NotifyJoin is the eventDelegate method, must implement. func (ed *eventDelegate) NotifyJoin(node *memberlist.Node) { } // NotifyLeave is the eventDelegate method, must implement. func (ed *eventDelegate) NotifyLeave(node *memberlist.Node) { } // NotifyUpdate is the eventDelegate method, must implement. func (ed *eventDelegate) NotifyUpdate(node *memberlist.Node) { } // NotifyMsg is called when a user-data message is received func (d *delegate) NotifyMsg(b []byte) { // logMsg(b) members = append(members, b) } func logMsg(b []byte) { type nodeInfo struct { NodeName string `json:"name"` Address string `json:"address"` } node := nodeInfo{} if err := json.Unmarshal(b, &node); err != nil { fmt.Println("Umarshal failed:", err) return } fmt.Println(node) } func CreateSearchNode(key string) (*memberlist.Memberlist, error) { conf := memberlist.DefaultLocalConfig() conf.Events = &eventDelegate{} conf.Delegate = &delegate{} conf.BindAddr = BroadcastIP conf.BindPort = BroadcastPort conf.Name = "Cluster-Searcher" keyring, err := memberlist.NewKeyring(nil, []byte(key)) if err != nil { fmt.Printf("Failed to restore keyring: %s", err) return nil, err } conf.Keyring = keyring return memberlist.Create(conf) } func CreateSearchNodeWhitClose(key string, delay time.Duration) [][]byte { m, err := CreateSearchNode(key) if err == nil { // fmt.Printf("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port) time.Sleep(delay) m.Shutdown() } return members } func CloseSearchNode(m *memberlist.Memberlist) error { return m.Shutdown() } func GetSearchNodes() [][]byte { return members }