基于serf的数据库同步模块库
4个文件已修改
2个文件已添加
310 ■■■■ 已修改文件
API.txt 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go 143 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent_test.go 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
searcher.go 110 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
API.txt
New file
@@ -0,0 +1,19 @@
1. func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error)
初始化:查询数据库,如果数据库有数据,将得到集群id,节点id,密码,其他节点ip,然后调用初始化函数,即可自动通过其他节点加入集群。若是空,则不初始化,等待页面填写参数并启动。注意事项:目前集群id并没有真正意义。
2. func (a *Agent) JoinByNodeIP(ips []string) error
加入集群:先初始化节点, 然后通过节点ip加入集群和密码。注意事项:集群id在初始化的时候就需要填写,但是还未加入集群,无法获取集群id。所以目前集群id未使用。
3. func (a *Agent) Stop()
退出集群:退出集群后,外部需要清空同步库的所有数据。
4. func (a *Agent) GetNodes() (nodes []Node)
获取集群节点列表:通过该接口获取节点列表,然后维护到数据库和页面展示。
5. func (a *Agent)GetDbFromCluster(dbPathWrite string)
获取数据库文件:新节点加入集群后,需要调用该接口去集群中任意一个结点获取一个数据库文件。数据库文件包含本地库和同步库的表结构,但是只有同步库有数据,本地库是空的。
6. func (a *Agent)SyncSql(sqlOp string)
同步数据到集群:所有操作同步库的SQL操作都需要同步到集群,集群其他节点收到后,调用数据库接口写入数据库。
7. 查找集群信息:未加入集群前,查询集群信息。
agent.go
@@ -18,17 +18,27 @@
import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "github.com/hashicorp/memberlist"
    "io/ioutil"
    "net"
    "os"
    //"os"
    "strings"
    "time"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
    //"github.com/apache/servicecomb-service-center/pkg/log"
    "log"
)
const (
    QueryEventGetDB        = "GetDatabase"
    QueryEventUpdateDBData = "UpdateDBData"
)
// Agent warps the serf agent
@@ -37,6 +47,13 @@
    conf    *Config
    readyCh chan struct{}
    errorCh chan error
}
type NodeInfo struct {
    ClusterID   string `json:"clusterID"`
    NodeID      string `json:"nodeID"`
    NodeAddress string `json:"nodeAddress"`
    IsAlive     int    `json:"isAlive"`
}
// Create create serf agent with config
@@ -88,6 +105,8 @@
            a.errorCh <- err
        }
    }
    go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
// HandleEvent Handles serf.EventMemberJoin events,
@@ -102,40 +121,69 @@
        var tmpstringslice []string
        tmpstringslice = append(tmpstringslice, string(ev.Payload))
        fmt.Println(tmpstringslice)
        results, err := DoExecute(tmpstringslice)
        results, err := ExecuteWriteSql(tmpstringslice)
        for _, result := range results {
            fmt.Println(result, "results err: ", err)
        }
    case *serf.Query:
        //bak file and send resp
        filename, err := BakDbFile()
        if err != nil {
            fmt.Println("bak db file error!")
            return
        }
        fmt.Println(filename)
        filebuf, err := ioutil.ReadFile(filename)
        fmt.Println("filebuf: ", len(filebuf))
        if err != nil {
            fmt.Printf("file to []bytes error: %s\n", err)
            return
        }
        err = os.Remove(filename)
        if err != nil {
            fmt.Printf("remove file%s\n failed", filename)
            return
        }
        fmt.Println("query payload: ", len(ev.Payload))
        if query, ok := event.(*serf.Query); ok {
            if err := query.Respond(filebuf); err != nil {
                fmt.Printf("err: %s\n", err)
        if ev.Name == QueryEventGetDB {
            //bak file and send resp
            filename, err := BakDbFile()
            if err != nil {
                fmt.Println("bak db file error!")
                return
            }
            fmt.Println(filename)
            filebuf, err := ioutil.ReadFile(filename)
            fmt.Println("filebuf: ", len(filebuf))
            if err != nil {
                fmt.Printf("file to []bytes error: %s\n", err)
                return
            }
            err = os.Remove(filename)
            if err != nil {
                fmt.Printf("remove file%s\n failed", filename)
                return
            }
            fmt.Println("query payload: ", len(ev.Payload))
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(filebuf); err != nil {
                    fmt.Printf("err: %s\n", err)
                    return
                }
            }
        } else if ev.Name == QueryEventUpdateDBData {
            fmt.Println(string(ev.Payload))
            var tmpstringslice []string
            tmpstringslice = append(tmpstringslice, string(ev.Payload))
            fmt.Println(tmpstringslice)
            rows, err := ExecuteQuerySql(tmpstringslice)
            if err != nil {
                fmt.Println("err: ", err)
                return
            }
            var rowsReturn []Rows
            for _, r := range rows {
                rowsReturn = append(rowsReturn, *r)
            }
            bytesReturn, err := json.Marshal(rowsReturn)
            fmt.Println("results: ", bytesReturn)
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(bytesReturn); err != nil {
                    fmt.Printf("err: %s\n", err)
                    return
                }
            }
            //var res []*Rows
            //json.Unmarshal(bytesReturn, &res)
        }
    default:
@@ -155,6 +203,38 @@
    //}
    //a.DeregisterEventHandler(a)
    //close(a.readyCh)
}
func (a *Agent) BroadcastMemberlist(delay time.Duration) {
    //serf := a.serf
    serf := a.Agent.Serf()
    mb := serf.LocalMember()
    mblist := serf.Memberlist()
    fmt.Println("mb:", mb)
    // copy local node
    localNode := *mblist.LocalNode()
    nodeID := a.conf.NodeName
    nodeAddress := localNode.Address()
    clusterID := mb.Tags[tagKeyClusterID]
    isAlive := int(mb.Status)
    message, _ := json.Marshal(NodeInfo{
        clusterID,
        nodeID,
        nodeAddress,
        isAlive,
    })
    // replace node address
    localNode.Addr = net.ParseIP(BroadcastIP)
    //localNode.Addr = net.IPv4(255,255,255,255)
    localNode.Port = BroadcastPort
    for {
        // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress)
        mblist.SendBestEffort(&localNode, []byte(message))
        time.Sleep(delay)
    }
}
// Ready Returns a channel that will be closed when serf is ready
@@ -296,7 +376,7 @@
        FilterNodes: strings.Fields(specmembername),
    }
    resp, err := a.Query("getDatabase", []byte(""), &params)
    resp, err := a.Query(QueryEventGetDB, []byte(""), &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
    }
@@ -355,7 +435,7 @@
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
    conf := DefaultConfig()
    fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    //conf.ClusterID = clusterID
    conf.ClusterID = clusterID
    conf.NodeName = nodeID
    if password == "" {
        conf.EncryptKey = DefaultEncryptKey
@@ -389,7 +469,10 @@
func (a *Agent) JoinByNodeIP(ips []string) error {
    var nodes []string
    fmt.Println("len(ips):", len(ips))
    if len(ips) == 0 {
        return fmt.Errorf("No Nodes To Join!")
    }
    for _, ip := range ips {
        node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
        nodes = append(nodes, node)
@@ -397,7 +480,9 @@
    n, err := a.Agent.Join(nodes, true)
    if err != nil || n == 0 {
        return fmt.Errorf("Error Agent.Join!")
        a.Stop()
        fmt.Println("Stop node")
        return fmt.Errorf("Error Encrypt Key!")
    }
    return err
agent_test.go
@@ -35,7 +35,6 @@
    fmt.Println("LocalMember1:", agent.LocalMember())
    agent.Start(context.Background())
    //<- agent.readyCh
    go func() {
@@ -85,5 +84,3 @@
        t.Errorf("angent shutdown failed, error: %s", err)
    }
}
config.go
@@ -35,13 +35,16 @@
    ModeCluster        = "cluster"
    retryMaxAttempts   = 3
    groupExpect        = 3
    DefaultEncryptKey   = "bjbasic@aiotlink"
    tagKeyClusterID  = "syncer-cluster-name"
    DefaultEncryptKey  = "bjbasic@aiotlink"
    tagKeyClusterID    = "syncer-cluster-name"
    TagKeyClusterPort  = "syncer-cluster-port"
    TagKeyRPCPort      = "syncer-rpc-port"
    MaxQueryRespSize = 50 * 1024 *1024
    MaxQuerySize = 1024 *1024
    MaxUserEventSize = 1024
    BroadcastIP        = "255.255.255.255"
    BroadcastPort      = 30193
    BroadcastInterval  = 5
    MaxQueryRespSize   = 50 * 1024 * 1024
    MaxQuerySize       = 50 * 1024 * 1024
    MaxUserEventSize   = 5 * 1024
)
// DefaultConfig default config
@@ -136,4 +139,3 @@
    return addr.IP.String(), addr.Port, nil
}
dbself.go
@@ -10,18 +10,17 @@
    "sync"
)
const (
    PersonSqliteDBPath = "/opt/workspace/DataBases/sync.db"
    PersonSqliteDBPath = "~/workspace/gitblit/dbserver/config/testdb.db"
)
var syncMut    sync.Mutex
var syncMut sync.Mutex
var SerfDbConn *Conn
// get Conn of db for do execute.
func InitDbConn(dbPath string) error {
    if dbPath == ""    {
    if dbPath == "" {
        dbPath = PersonSqliteDBPath
    }
@@ -73,7 +72,7 @@
}
// do exet when get querystring.
func DoExecute(sqlString []string) ([]*Result, error) {
func ExecuteWriteSql(sqlString []string) ([]*Result, error) {
    syncMut.Lock()
    defer syncMut.Unlock()
    allResults, err := SerfDbConn.Execute(sqlString, false, false)
@@ -84,6 +83,18 @@
    return allResults, nil
}
// do exet when get querystring.
func ExecuteQuerySql(sqlString []string) ([]*Rows, error) {
    syncMut.Lock()
    defer syncMut.Unlock()
    rows, err := SerfDbConn.Query(sqlString, false, false)
    if err != nil {
        fmt.Println("execute error!", err)
        return nil, err
    }
    return rows, nil
}
func Dumpdb() {
    var b strings.Builder
searcher.go
New file
@@ -0,0 +1,110 @@
package syncdb
import (
    "encoding/json"
    "fmt"
    "time"
    "github.com/hashicorp/memberlist"
)
var (
    members [][]byte
    delay   time.Duration
)
// delegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist.
type delegate struct{}
// NodeMeta is the delegate method, must implement.
func (d *delegate) NodeMeta(limit int) []byte {
    return []byte{}
}
// LocalState is the delegate method, must implement.
func (d *delegate) LocalState(join bool) []byte {
    return []byte{}
}
// MergeRemoteState is the delegate method, must implement.
func (d *delegate) MergeRemoteState(buf []byte, join bool) {
}
// GetBroadcasts is the delegate method, must implement.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
    return [][]byte{}
}
// eventDelegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist.
type eventDelegate struct{}
// NotifyJoin is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyJoin(node *memberlist.Node) {
}
// NotifyLeave is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyLeave(node *memberlist.Node) {
}
// NotifyUpdate is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyUpdate(node *memberlist.Node) {
}
// NotifyMsg is called when a user-data message is received
func (d *delegate) NotifyMsg(b []byte) {
    // logMsg(b)
    members = append(members, b)
}
func logMsg(b []byte) {
    type nodeInfo struct {
        NodeName string `json:"name"`
        Address  string `json:"address"`
    }
    node := nodeInfo{}
    if err := json.Unmarshal(b, &node); err != nil {
        fmt.Println("Umarshal failed:", err)
        return
    }
    fmt.Println(node)
}
func CreateSearchNode(key string) (*memberlist.Memberlist, error) {
    conf := memberlist.DefaultLocalConfig()
    conf.Events = &eventDelegate{}
    conf.Delegate = &delegate{}
    conf.BindAddr = BroadcastIP
    conf.BindPort = BroadcastPort
    conf.Name = "Cluster-Searcher"
    keyring, err := memberlist.NewKeyring(nil, []byte(key))
    if err != nil {
        fmt.Printf("Failed to restore keyring: %s", err)
        return nil, err
    }
    conf.Keyring = keyring
    return memberlist.Create(conf)
}
func CreateSearchNodeWhitClose(key string, delay time.Duration) [][]byte {
    m, err := CreateSearchNode(key)
    if err == nil {
        // fmt.Printf("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
        time.Sleep(delay)
        m.Shutdown()
    }
    return members
}
func CloseSearchNode(m *memberlist.Memberlist) error {
    return m.Shutdown()
}
func GetSearchNodes() [][]byte {
    return members
}