/*
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
* contributor license agreements. See the NOTICE file distributed with
|
* this work for additional information regarding copyright ownership.
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* (the "License"); you may not use this file except in compliance with
|
* the License. You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package syncdb
|
|
import (
|
"context"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"github.com/hashicorp/memberlist"
|
"io/ioutil"
|
"net"
|
"os"
|
"strconv"
|
"sync"
|
|
//"os"
|
"strings"
|
"time"
|
|
"github.com/hashicorp/serf/cmd/serf/command/agent"
|
"github.com/hashicorp/serf/serf"
|
//"github.com/apache/servicecomb-service-center/pkg/log"
|
"log"
|
)
|
|
// Names of the serf queries and user events used by this package.
const (
	// QueryEventGetDB names the query answered by HandleEvent with the
	// contents of a database backup file.
	QueryEventGetDB = "GetDatabase"

	// QueryEventUpdateDBData names the query answered by HandleEvent with a
	// dump of the tables listed in the query payload.
	QueryEventUpdateDBData = "UpdateDBData"

	// UserEventSyncSql names the user event that carries a SqlUserEvent.
	UserEventSyncSql = "SyncSql"

	// UserEventSyncDbTablePersonCache is not referenced elsewhere in this file.
	UserEventSyncDbTablePersonCache = "SyncCache"
)
|
|
// Agent warps the serf agent
|
type Agent struct {
|
*agent.Agent
|
conf *Config
|
readyCh chan struct{}
|
errorCh chan error
|
}
|
|
// NodeInfo is the JSON description of one cluster node.
//
// The field order matters: BroadcastMemberlist builds a NodeInfo with an
// unkeyed composite literal.
type NodeInfo struct {
	// ClusterID is the value of the node's cluster-id tag.
	ClusterID string `json:"clusterID"`
	// NodeID is the serf node name.
	NodeID string `json:"nodeID"`
	// NodeAddress is the node's network address.
	NodeAddress string `json:"nodeAddress"`
	// IsAlive holds the numeric serf member status (int(member.Status)); it
	// is not a boolean flag.
	IsAlive int `json:"isAlive"`
}
|
|
// Create create serf agent with config
|
func Create(conf *Config, snapshotPath string) (*Agent, error) {
|
// config cover to serf config
|
serfConf, err := conf.convertToSerf(snapshotPath)
|
if err != nil {
|
return nil, err
|
}
|
|
// create serf agent with serf config
|
//fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey)
|
serfAgent, err := agent.Create(conf.Config, serfConf, nil)
|
if err != nil {
|
return nil, err
|
}
|
// Create the keyring
|
keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey))
|
if err != nil {
|
//fmt.Printf("Failed to restore keyring: %s", err)
|
return nil, err
|
}
|
serfConf.MemberlistConfig.Keyring = keyring
|
//fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s",
|
// len(conf.EncryptKey), conf.EncryptKey)
|
|
return &Agent{
|
Agent: serfAgent,
|
conf: conf,
|
readyCh: make(chan struct{}),
|
errorCh: make(chan error),
|
}, nil
|
}
|
|
// Start agent
|
func (a *Agent) Start(ctx context.Context) {
|
log.Println("aaaaaaaaa")
|
//log.Println(string(debug.Stack()))
|
//debug.PrintStack()
|
err := a.Agent.Start()
|
if err != nil {
|
log.Println(err, "start serf agent failed")
|
a.errorCh <- err
|
return
|
}
|
a.RegisterEventHandler(a)
|
|
err = a.retryJoin(ctx)
|
if err != nil {
|
log.Println(err, "start serf agent failed")
|
if err != ctx.Err() && a.errorCh != nil {
|
a.errorCh <- err
|
}
|
}
|
|
go a.BroadcastMemberlist(BroadcastInterval * time.Second)
|
}
|
|
// ReceiveSqlInterface is the callback through which SQL received from other
// nodes is handed to the application.
type ReceiveSqlInterface interface {
	// Forward is called with the SQL text taken from a received event.
	Forward(content string)
}

// receiveSqlInterface is the registered callback; it stays nil until
// RegisterReceiveSqlInterface is called.
var receiveSqlInterface ReceiveSqlInterface

// RegisterReceiveSqlInterface stores c as the package-wide SQL callback,
// replacing any previously registered one.
func RegisterReceiveSqlInterface(c ReceiveSqlInterface) {
	receiveSqlInterface = c
}
|
|
// DbHandler executes a SQL statement and reports the outcome as a bool.
type DbHandler interface {
	Execute(sql string) bool
}

// DbDumpHandler produces a dump for the given tables. HandleEvent passes the
// table names joined with commas.
type DbDumpHandler interface {
	Dump(tables string) string
}

var (
	// DbHandle is the database execute handle.
	DbHandle DbHandler

	// dbDumpH is the database backup handle; it is only used inside syncdb.
	dbDumpH DbDumpHandler
)

// RegisterDbHandler stores h as the package-wide execute handle.
func RegisterDbHandler(h DbHandler) {
	DbHandle = h
}

// RegisterDbDumpHandler stores h as the package-wide dump handle.
func RegisterDbDumpHandler(h DbDumpHandler) {
	dbDumpH = h
}
|
|
// HandleEvent Handles serf.EventMemberJoin events,
|
// which will wait for members to join until the number of group members is equal to "groupExpect"
|
// when the startup mode is "ModeCluster",
|
// used for logical grouping of serf nodes
|
func (a *Agent) HandleEvent(event serf.Event) {
|
|
switch ev := event.(type) {
|
case serf.UserEvent:
|
if ev.Name == UserEventSyncSql {
|
var sqlUe SqlUserEvent
|
//fmt.Println(string(ev.Payload))
|
err := json.Unmarshal(ev.Payload, &sqlUe)
|
if err != nil {
|
//fmt.Println("sqlUe unmarshal err:", err)
|
return
|
}
|
if sqlUe.Owner != a.conf.NodeName {
|
if receiveSqlInterface != nil && sqlUe.Sql !=nil && len(sqlUe.Sql) > 0 {
|
receiveSqlInterface.Forward(sqlUe.Sql[0])
|
}
|
}
|
}
|
|
case *serf.Query:
|
|
if ev.Name == QueryEventGetDB {
|
//bak file and send resp
|
filename, err := BakDbFile()
|
if err != nil {
|
//fmt.Println("bak db file error!")
|
return
|
}
|
//fmt.Println(filename)
|
|
filebuf, err := ioutil.ReadFile(filename)
|
//fmt.Println("filebuf: ", len(filebuf))
|
if err != nil {
|
//fmt.Printf("file to []bytes error: %s\n", err)
|
return
|
}
|
|
err = os.Remove(filename)
|
if err != nil {
|
//fmt.Printf("remove file%s\n failed", filename)
|
return
|
}
|
|
//fmt.Println("query payload: ", len(ev.Payload))
|
if query, ok := event.(*serf.Query); ok {
|
if err := query.Respond(filebuf); err != nil {
|
//fmt.Printf("err: %s\n", err)
|
return
|
}
|
}
|
} else if ev.Name == QueryEventUpdateDBData {
|
//fmt.Println(string(ev.Payload))
|
var tableNames []string
|
err := json.Unmarshal(ev.Payload, &tableNames)
|
fmt.Println("androidSync tableNames:", tableNames, "err:", err)
|
|
dumpData := dbDumpH.Dump(strings.Join(tableNames, ","))
|
|
bytesReturn := []byte(dumpData)
|
fmt.Println("androidSync len(bytesReturn): ", len(bytesReturn))
|
if query, ok := event.(*serf.Query); ok {
|
if err := query.Respond(bytesReturn); err != nil {
|
fmt.Println("androidSync query.Respond err:", err)
|
return
|
} else {
|
fmt.Println("androidSync query.Respond success")
|
}
|
}
|
}
|
case serf.MemberEvent:
|
if event.EventType() == serf.EventMemberLeave {
|
if ev.Members != nil && len(ev.Members) == 1 {
|
leaveMember := ev.Members[0]
|
leaveSql := "delete from cluster_node where node_id='" + leaveMember.Name + "'"
|
//fmt.Println("leaveSql:", leaveSql)
|
if !DbHandle.Execute(leaveSql) {
|
fmt.Println("DbHandle.Execute ret false")
|
}
|
|
//fmt.Println("EventMemberLeave,current Members:", ev.Members)
|
}
|
return
|
}
|
|
default:
|
//fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
|
}
|
}
|
|
func (a *Agent) BroadcastMemberlist(delay time.Duration) {
|
//serf := a.serf
|
serf := a.Agent.Serf()
|
mb := serf.LocalMember()
|
mblist := serf.Memberlist()
|
//fmt.Println("mb:", mb)
|
|
// copy local node
|
localNode := *mblist.LocalNode()
|
nodeID := a.conf.NodeName
|
nodeAddress := localNode.Address()
|
clusterID := mb.Tags[tagKeyClusterID]
|
isAlive := int(mb.Status)
|
|
message, _ := json.Marshal(NodeInfo{
|
clusterID,
|
nodeID,
|
nodeAddress,
|
isAlive,
|
})
|
|
// replace node address
|
localNode.Addr = net.ParseIP(BroadcastIP)
|
//localNode.Addr = net.IPv4(255,255,255,255)
|
localNode.Port = BroadcastPort
|
for {
|
// //fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress)
|
mblist.SendBestEffort(&localNode, []byte(message))
|
time.Sleep(delay)
|
}
|
}
|
|
// Ready Returns a channel that will be closed when serf is ready
|
func (a *Agent) Ready() <-chan struct{} {
|
return a.readyCh
|
}
|
|
// Error Returns a channel that will be transmit a serf error
|
func (a *Agent) Error() <-chan error {
|
return a.errorCh
|
}
|
|
// Stop serf agent
|
func (a *Agent) Stop() {
|
if a.errorCh != nil {
|
a.Leave()
|
a.Shutdown()
|
close(a.errorCh)
|
a.errorCh = nil
|
}
|
}
|
|
// LocalMember returns the Member information for the local node
|
func (a *Agent) LocalMember() *serf.Member {
|
serfAgent := a.Agent.Serf()
|
if serfAgent != nil {
|
member := serfAgent.LocalMember()
|
return &member
|
}
|
return nil
|
}
|
|
// GroupMembers returns a point-in-time snapshot of the members of by clusterID
|
func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) {
|
serfAgent := a.Agent.Serf()
|
if serfAgent != nil {
|
for _, member := range serfAgent.Members() {
|
log.Printf("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID])
|
if member.Tags[tagKeyClusterID] == clusterID {
|
members = append(members, member)
|
}
|
}
|
}
|
return
|
}
|
|
// Member get member information with node
|
func (a *Agent) Member(node string) *serf.Member {
|
serfAgent := a.Agent.Serf()
|
if serfAgent != nil {
|
ms := serfAgent.Members()
|
for _, m := range ms {
|
if m.Name == node {
|
return &m
|
}
|
}
|
}
|
return nil
|
}
|
|
// SerfConfig get serf config
|
func (a *Agent) SerfConfig() *serf.Config {
|
return a.Agent.SerfConfig()
|
}
|
|
// Join serf clusters through one or more members
|
func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
|
return a.Agent.Join(addrs, replay)
|
}
|
|
// UserEvent sends a UserEvent on Serf
|
func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
|
return a.Agent.UserEvent(name, payload, coalesce)
|
}
|
|
// Query sends a Query on Serf
|
func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
|
return a.Agent.Query(name, payload, params)
|
}
|
|
func (a *Agent) retryJoin(ctx context.Context) (err error) {
|
if len(a.conf.RetryJoin) == 0 {
|
log.Printf("retry join mumber %d", len(a.conf.RetryJoin))
|
return nil
|
}
|
|
// Count of attempts
|
attempt := 0
|
ticker := time.NewTicker(a.conf.RetryInterval)
|
for {
|
log.Printf("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
|
var n int
|
|
// Try to join the specified serf nodes
|
n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
|
if err == nil {
|
log.Printf("serf: Join completed. Synced with %d initial agents", n)
|
break
|
}
|
attempt++
|
|
// If RetryMaxAttempts is greater than 0, agent will exit
|
// and throw an error when the number of attempts exceeds RetryMaxAttempts,
|
// else agent will try to join other nodes until successful always
|
if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
|
err = errors.New("serf: maximum retry join attempts made, exiting")
|
log.Println(err, err.Error())
|
break
|
}
|
select {
|
case <-ctx.Done():
|
err = ctx.Err()
|
goto done
|
// Waiting for ticker to trigger
|
case <-ticker.C:
|
}
|
}
|
done:
|
ticker.Stop()
|
return
|
}
|
|
//GetDbFromCluster get the newest database after join cluster
|
//dbPathWrite the path where to write after got a database,
|
func (a *Agent) GetDbFromCluster(dbPathWrite string) {
|
//members: get name of first member
|
mbs := a.GroupMembers(a.conf.ClusterID)
|
var specmembername string
|
for _, m := range mbs {
|
if m.Addr.String() != a.conf.BindAddr {
|
specmembername = m.Name
|
break
|
}
|
}
|
//fmt.Println(specmembername)
|
|
//query: get db file.
|
params := serf.QueryParam{
|
FilterNodes: strings.Fields(specmembername),
|
}
|
|
resp, err := a.Query(QueryEventGetDB, []byte(""), ¶ms)
|
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
|
//fmt.Println("err: ", err)
|
}
|
|
go func() {
|
respCh := resp.ResponseCh()
|
for {
|
select {
|
case r := <-respCh:
|
//fmt.Println("x length is: ", len(r.Payload))
|
|
// // byte to file.
|
SerfDbConn.Close()
|
SerfDbConn = nil
|
err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
|
if err != nil {
|
//fmt.Println("query byte to file error!", err)
|
}
|
err := InitDbConn("")
|
if err != nil {
|
//fmt.Println("create db conn of test.db error: ", err)
|
}
|
return
|
}
|
}
|
}()
|
}
|
|
//GetDbFromCluster get the newest database after join cluster
|
//dbPathWrite the path where to write after got a database,
|
func (a *Agent) GetTableDataFromCluster(tableNames []string) (*string, error) {
|
//members: get name of first member
|
mbs := a.GroupMembers(a.conf.ClusterID)
|
specmembername := ""
|
for _, m := range mbs {
|
//fmt.Println("m", m)
|
if m.Name != a.conf.NodeName { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
|
if strings.HasPrefix(a.conf.NodeName, "PAD"){
|
if strings.HasPrefix(m.Name, "PAD") {
|
specmembername = m.Name
|
break
|
}
|
}
|
}
|
}
|
if specmembername == "" {
|
return nil, errors.New("alive node not found in cluster")
|
}
|
//fmt.Println("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "specmembername:", specmembername)
|
|
//query: get db file.
|
params := serf.QueryParam{
|
FilterNodes: strings.Fields(specmembername),
|
}
|
|
//SQL
|
tBytes, _ := json.Marshal(tableNames)
|
|
resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms)
|
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
|
//fmt.Println("QueryEventUpdateDBData err: ")
|
}
|
fmt.Println("Query.resp.err:", err, "resp:", resp)
|
|
var wg sync.WaitGroup
|
wg.Add(1)
|
var dumpSqls string
|
go func() {
|
defer wg.Done()
|
respCh := resp.ResponseCh()
|
for {
|
select {
|
case r := <-respCh:
|
fmt.Println("Query response's len:", len(r.Payload))
|
dumpSqls = string(r.Payload)
|
if len(dumpSqls) >0 {
|
fmt.Println("data dump success")
|
} else {
|
fmt.Println("Query response.len = 0")
|
}
|
return
|
}
|
}
|
}()
|
wg.Wait()
|
return &dumpSqls, nil
|
}
|
|
// SqlUserEvent is the JSON payload of a UserEventSyncSql user event.
type SqlUserEvent struct {
	// Owner is the node name of the sender; HandleEvent uses it to skip
	// events the local node sent itself.
	Owner string `json:"owner"`
	// Sql holds the SQL statements being broadcast.
	Sql []string `json:"sql"`
}
|
|
//SyncSql boardcast sql to cluster
|
func (a *Agent) SyncSql(sqlOp []string) {
|
// event : use to send command to operate db.
|
var sqlUe = SqlUserEvent{
|
Owner: a.conf.NodeName,
|
Sql: sqlOp,
|
}
|
ueB, err := json.Marshal(sqlUe)
|
if err != nil {
|
//fmt.Println("sqlUE marshal err:", err)
|
return
|
}
|
err = a.UserEvent(UserEventSyncSql, ueB, false)
|
if err == nil || !strings.Contains(err.Error(), "cannot contain") {
|
//fmt.Println("err: ", err)
|
}
|
}
|
|
//Init serf Init
|
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) {
|
agent, err := InitNode(clusterID, password, nodeID, snapshotPath)
|
if err != nil {
|
//fmt.Printf("InitNode failed, error: %s", err)
|
return agent, err
|
}
|
|
err = agent.JoinByNodeAddrs(addrs)
|
if err != nil {
|
//fmt.Printf("JoinByNodeIP failed, error: %s", err)
|
return agent, err
|
}
|
|
return agent, err
|
}
|
|
//InitNode web后台收到创建集群的请求,
|
func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) {
|
conf := DefaultConfig()
|
//fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
|
conf.ClusterID = clusterID
|
conf.NodeName = nodeID
|
if password == "" {
|
conf.EncryptKey = DefaultEncryptKey
|
} else {
|
if len(password) >= 16 {
|
password = password[:16]
|
} else {
|
password = fmt.Sprintf("%016s", password)[:16]
|
//return nil, fmt.Errorf("error password")
|
}
|
conf.EncryptKey = password
|
}
|
agent, err := Create(conf, snapshotPath)
|
if err != nil {
|
//fmt.Printf("create agent failed, error: %s", err)
|
return agent, err
|
}
|
|
agent.Start(context.Background())
|
//<- agent.readyCh
|
go func() {
|
agent.ShutdownCh()
|
}()
|
time.Sleep(time.Second)
|
//fmt.Println("Stats:", agent.Agent.Serf().Stats())
|
//fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
|
//fmt.Println("create agent sucess!!")
|
|
return agent, nil
|
}
|
|
func (a *Agent) JoinByNodeAddrs(addrs []string) error {
|
var nodes []string
|
|
if len(addrs) == 0 {
|
return fmt.Errorf("No Nodes To Join!")
|
}
|
for _, addr := range addrs {
|
nodes = append(nodes, addr)
|
}
|
|
a.Agent.Join(nodes, true)
|
|
return nil
|
}
|
|
//func (a *Agent) JoinByNodeIP(ips []string) error {
|
// var nodes []string
|
//
|
// if len(ips) == 0 {
|
// return fmt.Errorf("No Nodes To Join!")
|
// }
|
// for _, ip := range ips {
|
// node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
|
// nodes = append(nodes, node)
|
// }
|
//
|
// n, err := a.Agent.Join(nodes, true)
|
// if err != nil || n == 0 {
|
// return fmt.Errorf("Error Encrypt Key!")
|
// }
|
//
|
// return err
|
//}
|
|
func (a *Agent) GetNodes() (nodes []NodeInfo) {
|
var node NodeInfo
|
//fmt.Println("a.conf.ClusterID:", a.conf.ClusterID)
|
mbs := a.GroupMembers(a.conf.ClusterID)
|
for _, mb := range mbs {
|
node.NodeID = mb.Name
|
node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port))
|
node.IsAlive = int(mb.Status)
|
node.ClusterID = mb.Tags[tagKeyClusterID]
|
|
nodes = append(nodes, node)
|
}
|
|
return nodes
|
}
|