基于serf的数据库同步模块库
chenshijun
2019-08-02 86341ab9e3d7eda30fbb4df2a77b9419a89f97c1
添加基础业务处理接口
5个文件已修改
1个文件已添加
425 ■■■■■ 已修改文件
agent.go 237 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent_test.go 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
db_test.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 111 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -14,16 +14,25 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
package syncdb
import (
    "context"
    "errors"
    "fmt"
    "github.com/hashicorp/memberlist"
    "io/ioutil"
    "os"
    "strconv"
    //"os"
    "strings"
    "time"
    "github.com/apache/servicecomb-service-center/pkg/log"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
    //"github.com/apache/servicecomb-service-center/pkg/log"
    "log"
)
// Agent warps the serf agent
@@ -43,10 +52,21 @@
    }
    // create serf agent with serf config
    fmt.Println("conf.Config.EncryptKey:",conf.EncryptKey)
    serfAgent, err := agent.Create(conf.Config, serfConf, nil)
    if err != nil {
        return nil, err
    }
    // Create the keyring
    keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey))
    if err != nil {
        fmt.Printf("Failed to restore keyring: %s", err)
        return nil, err
    }
    serfConf.MemberlistConfig.Keyring = keyring
    fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s",
        len(conf.EncryptKey), conf.EncryptKey)
    return &Agent{
        Agent:   serfAgent,
        conf:    conf,
@@ -59,7 +79,7 @@
func (a *Agent) Start(ctx context.Context) {
    err := a.Agent.Start()
    if err != nil {
        log.Errorf(err, "start serf agent failed")
        log.Println(err, "start serf agent failed")
        a.errorCh <- err
        return
    }
@@ -67,7 +87,7 @@
    err = a.retryJoin(ctx)
    if err != nil {
        log.Errorf(err, "start serf agent failed")
        log.Println(err, "start serf agent failed")
        if err != ctx.Err() && a.errorCh != nil {
            a.errorCh <- err
        }
@@ -79,17 +99,66 @@
// when the startup mode is "ModeCluster",
// used for logical grouping of serf nodes
func (a *Agent) HandleEvent(event serf.Event) {
    if event.EventType() != serf.EventMemberJoin {
    switch ev := event.(type) {
    case serf.UserEvent:
        fmt.Println(string(ev.Payload))
        var tmpstringslice []string
        tmpstringslice = append(tmpstringslice, string(ev.Payload))
        fmt.Println(tmpstringslice)
        results, err := DoExecute(tmpstringslice)
        for _, result := range results {
            fmt.Println(result, "results err: ", err)
        }
    case *serf.Query:
        //bak file and send resp
        filename, err := BakDbFile()
        if err != nil {
            fmt.Println("bak db file error!")
            return
        }
        fmt.Println(filename)
        filebuf, err := ioutil.ReadFile(filename)
        fmt.Println("filebuf: ", len(filebuf))
        if err != nil {
            fmt.Printf("file to []bytes error: %s\n", err)
        return
    }
    if a.conf.Mode == ModeCluster {
        if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect {
        err = os.Remove(filename)
        if err != nil {
            fmt.Printf("remove file%s\n failed", filename)
            return
        }
        fmt.Println("query payload: ", len(ev.Payload))
        if query, ok := event.(*serf.Query); ok {
            if err := query.Respond(filebuf); err != nil {
                fmt.Printf("err: %s\n", err)
            return
        }
    }
    a.DeregisterEventHandler(a)
    close(a.readyCh)
    default:
        fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
    }
    //if event.EventType() != serf.EventMemberJoin {
    //    fmt.Printf("event.EventType() != serf.EventMemberJoin")
    //    return
    //}
    //
    //if a.conf.Mode == ModeCluster {
    //    if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect {
    //        fmt.Printf("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
    //        return
    //    }
    //}
    //a.DeregisterEventHandler(a)
    //close(a.readyCh)
}
// Ready Returns a channel that will be closed when serf is ready
@@ -122,13 +191,13 @@
    return nil
}
// GroupMembers returns a point-in-time snapshot of the members of by groupName
func (a *Agent) GroupMembers(groupName string) (members []serf.Member) {
// GroupMembers returns a point-in-time snapshot of the members of by clusterID
func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) {
    serfAgent := a.Agent.Serf()
    if serfAgent != nil {
        for _, member := range serfAgent.Members() {
            log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName])
            if member.Tags[tagKeyClusterName] == groupName {
            log.Printf("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID])
            if member.Tags[tagKeyClusterID] == clusterID {
                members = append(members, member)
            }
        }
@@ -172,7 +241,7 @@
func (a *Agent) retryJoin(ctx context.Context) (err error) {
    if len(a.conf.RetryJoin) == 0 {
        log.Infof("retry join mumber %d", len(a.conf.RetryJoin))
        log.Printf("retry join mumber %d", len(a.conf.RetryJoin))
        return nil
    }
@@ -180,13 +249,13 @@
    attempt := 0
    ticker := time.NewTicker(a.conf.RetryInterval)
    for {
        log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
        log.Printf("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
        var n int
        // Try to join the specified serf nodes
        n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
        if err == nil {
            log.Infof("serf: Join completed. Synced with %d initial agents", n)
            log.Printf("serf: Join completed. Synced with %d initial agents", n)
            break
        }
        attempt++
@@ -196,7 +265,7 @@
        // else agent will try to join other nodes until successful always
        if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
            err = errors.New("serf: maximum retry join attempts made, exiting")
            log.Errorf(err, err.Error())
            log.Println(err, err.Error())
            break
        }
        select {
@@ -211,3 +280,137 @@
    ticker.Stop()
    return
}
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent)GetDbFromCluster(dbPathWrite string) {
    //members: get name of first member
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
    for _, m := range mbs {
        if m.Addr.String() != a.conf.BindAddr {
            specmembername = m.Name
            break
        }
    }
    fmt.Println(specmembername)
    //query: get db file.
    params := serf.QueryParam{
        FilterNodes: strings.Fields(specmembername),
    }
    resp, err := a.Query("getDatabase", []byte(""), &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
    }
    go func() {
        respCh := resp.ResponseCh()
        for {
            select {
            case r := <-respCh:
                fmt.Println("x length is: ", len(r.Payload))
                // // byte to file.
                Dbconn.Close()
                Dbconn = nil
                err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
                if err != nil {
                    fmt.Println("query byte to file error!", err)
                }
                err := GetConn()
                if err != nil {
                    fmt.Println("create db conn of test.db error: ", err)
                }
                return
            }
        }
    }()
}
//SyncSql boardcast sql to cluster
func (a *Agent)SyncSql(sqlOp string) {
    // event : use to send command to operate db.
    err := a.UserEvent("SyncSql", []byte(sqlOp), false)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
    }
}
//Init serf Init
//web后台收到创建集群的请求,
func Init(clusterID string, password string, nodeID string) (*Agent, error) {
    conf := DefaultConfig()
    fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    //conf.ClusterID = clusterID
    conf.NodeName = nodeID
    if password == "" {
        conf.EncryptKey = DefaultEncryptKey
    }else{
        if len(password) >= 16 {
            password = password[:16]
        }else{
            password = fmt.Sprintf("%016s", password)[:16]
            //return nil, fmt.Errorf("error password")
        }
        conf.EncryptKey = password
    }
    agent, err := Create(conf)
    if err != nil {
        fmt.Printf("create agent failed, error: %s", err)
        return agent, err
    }
    agent.Start(context.Background())
    //<- agent.readyCh
    go func() {
        agent.ShutdownCh()
    }()
    time.Sleep(time.Second)
    fmt.Println("Stats:",agent.Agent.Serf().Stats())
    fmt.Println("EncryptionEnabled:",agent.Agent.Serf().EncryptionEnabled())
    fmt.Printf("create agent sucess!!")
    return agent, nil
}
func (a *Agent) JoinByNodeIP(ip string) error {
    n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
    if err != nil || n == 0{
        a.Stop()
        fmt.Println("Stop node")
        return fmt.Errorf("Error Encrypt Key!")
    }
    return err
}
type Node struct {
    clusterID string
    NodeID string
    IP string
    isAlive int   //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
}
func (a *Agent) GetNodes() (nodes []Node) {
    var node Node
    fmt.Println("a.conf.ClusterID:",a.conf.ClusterID)
    mbs := a.GroupMembers(a.conf.ClusterID)
    for _, mb := range mbs {
        node.NodeID = mb.Name
        node.IP = mb.Addr.String()
        node.isAlive = int(mb.Status)
        node.clusterID = mb.Tags[tagKeyClusterID]
        nodes = append(nodes, node)
    }
    return nodes
}
agent_test.go
@@ -14,28 +14,39 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
package syncdb
import (
    "context"
    "fmt"
    "github.com/hashicorp/serf/serf"
    "testing"
    "time"
    "github.com/hashicorp/serf/serf"
)
func TestAgent(t *testing.T) {
    conf := DefaultConfig()
    conf.ClusterID = "testCluster"
    conf.NodeName = "testnode"
    agent, err := Create(conf)
    if err != nil {
        t.Errorf("create agent failed, error: %s", err)
    }
    fmt.Println("LocalMember1:", agent.LocalMember())
    agent.Start(context.Background())
    <- agent.readyCh
    //<- agent.readyCh
    go func() {
        agent.ShutdownCh()
    }()
    time.Sleep(time.Second)
    fmt.Println("LocalMember2:", agent.LocalMember())
    fmt.Println("ClusterID:", agent.conf.ClusterID)
    fmt.Println("NodeName:", agent.conf.NodeName)
    err = agent.UserEvent("test", []byte("test"), true)
    if err != nil {
@@ -46,13 +57,15 @@
    if err != nil {
        t.Errorf("query for other node failed, error: %s", err)
    }
    agent.LocalMember()
    fmt.Println("LocalMember:", agent.LocalMember())
    //agent.LocalMember()
    agent.Member("testnode")
    mb := agent.Member("testnode")
    fmt.Println("mb:", mb)
    agent.SerfConfig()
    _, err = agent.Join([]string{"127.0.0.1:9999"}, true)
    _, err = agent.Join([]string{"192.168.1.123:5000"}, true)
    if err != nil {
        t.Logf("join to other node failed, error: %s", err)
    }
@@ -72,3 +85,5 @@
        t.Errorf("angent shutdown failed, error: %s", err)
    }
}
config.go
@@ -14,27 +14,29 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package serf
package syncdb
import (
    "fmt"
    "net"
    "strconv"
    "github.com/apache/servicecomb-service-center/syncer/pkg/utils"
    //"github.com/apache/servicecomb-service-center/syncer/pkg/utils"
    "github.com/hashicorp/memberlist"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
)
const (
    DefaultBindPort    = 30190
    DefaultRPCPort     = 30191
    DefaultBindPort    = 5000//30190
    DefaultRPCPort     = 7373//30191
    DefaultClusterPort = 30192
    ModeSingle         = "single"
    ModeCluster        = "cluster"
    retryMaxAttempts   = 3
    groupExpect        = 3
    tagKeyClusterName  = "syncer-cluster-name"
    DefaultEncryptKey   = "bjbasic@aiotlink"
    tagKeyClusterID  = "syncer-cluster-name"
    TagKeyClusterPort  = "syncer-cluster-port"
    TagKeyRPCPort      = "syncer-rpc-port"
)
@@ -42,6 +44,9 @@
// DefaultConfig default config
func DefaultConfig() *Config {
    agentConf := agent.DefaultConfig()
    agentConf.QueryResponseSizeLimit = 50 * 1024 *1024
    agentConf.QuerySizeLimit = 50 * 1024 *1024
    agentConf.UserEventSizeLimit = 1024
    agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
    agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
    return &Config{
@@ -58,7 +63,7 @@
    Mode string `json:"mode"`
    // name to group members into cluster
    ClusterName string `json:"cluster_name"`
    ClusterID string `json:"cluster_name"`
    // port to communicate between cluster members
    ClusterPort int `yaml:"cluster_port"`
@@ -77,7 +82,7 @@
func (c *Config) convertToSerf() (*serf.Config, error) {
    serfConf := serf.DefaultConfig()
    bindIP, bindPort, err := utils.SplitHostPort(c.BindAddr, DefaultBindPort)
    bindIP, bindPort, err := SplitHostPort(c.BindAddr, DefaultBindPort)
    if err != nil {
        return nil, fmt.Errorf("invalid bind address: %s", err)
    }
@@ -96,10 +101,11 @@
    serfConf.MemberlistConfig.BindAddr = bindIP
    serfConf.MemberlistConfig.BindPort = bindPort
    serfConf.NodeName = c.NodeName
    serfConf.Tags = map[string]string{TagKeyRPCPort: strconv.Itoa(c.RPCPort)}
    if c.ClusterName != "" {
        serfConf.Tags[tagKeyClusterName] = c.ClusterName
    if c.ClusterID != "" {
        serfConf.Tags[tagKeyClusterID] = c.ClusterID
        serfConf.Tags[TagKeyClusterPort] = strconv.Itoa(c.ClusterPort)
    }
@@ -108,3 +114,23 @@
    }
    return serfConf, nil
}
// SplitHostPort returns the parts of the address and port. If the port does not exist, use defaultPort.
func SplitHostPort(address string, defaultPort int) (string, int, error) {
    _, _, err := net.SplitHostPort(address)
    if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
        address = fmt.Sprintf("%s:%d", address, defaultPort)
        _, _, err = net.SplitHostPort(address)
    }
    if err != nil {
        return "", 0, err
    }
    addr, err := net.ResolveTCPAddr("tcp", address)
    if err != nil {
        return "", 0, err
    }
    return addr.IP.String(), addr.Port, nil
}
db.go
@@ -1,6 +1,6 @@
// Package db exposes a lightweight abstraction over the SQLite code.
// It performs some basic mapping of lower-level types to rqlite types.
package db
package syncdb
import (
    "database/sql/driver"
db_test.go
@@ -1,4 +1,4 @@
package db
package syncdb
import (
    "encoding/json"
dbself.go
New file
@@ -0,0 +1,111 @@
package syncdb
import (
    "errors"
    "fmt"
    "os"
    "os/exec"
    "path/filepath"
    "strings"
    "sync"
)
var Dbconn *Conn
var sy sync.Mutex
func init() {
    GetConn()
}
// get Conn of db for do execute.
func GetConn() error {
    var err error
    path, err := GetCurrentPath()
    if err != nil {
        return errors.New("get current path error")
    }
    filepath := fmt.Sprintf("%stest.db", path)
    fmt.Println("self: ========>", filepath)
    db, err := New(filepath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
        return err
    }
    Dbconn, err = db.Connect()
    if err != nil {
        fmt.Println("new db conn error; ", err)
        return err
    }
    return nil
}
//bak dbdata.
func BakDbFile() (string, error) {
    path, err := GetCurrentPath()
    if err != nil {
        return "", errors.New("get current path error")
    }
    filepath := fmt.Sprintf("%stmptest.db", path)
    db, err := New(filepath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
        return "", err
    }
    tmpconn, err := db.Connect()
    if err != nil {
        fmt.Println("new db conn error; ", err)
        return "", err
    }
    defer tmpconn.Close()
    err = Dbconn.Backup(tmpconn)
    if err != nil {
        return "", err
    }
    return filepath, nil
}
// do exet when get querystring.
func DoExecute(executestring []string) ([]*Result, error) {
    sy.Lock()
    defer sy.Unlock()
    allResults, err := Dbconn.Execute(executestring, false, false)
    if err != nil {
        fmt.Println("execute error!", err)
        return nil, err
    }
    return allResults, nil
}
// get current path
func GetCurrentPath() (string, error) {
    file, err := exec.LookPath(os.Args[0])
    if err != nil {
        return "", err
    }
    path, err := filepath.Abs(file)
    if err != nil {
        return "", err
    }
    i := strings.LastIndex(path, "/")
    if i < 0 {
        i = strings.LastIndex(path, "\\")
    }
    if i < 0 {
        return "", errors.New(`error: Can't find "/" or "\".`)
    }
    return string(path[0 : i+1]), nil
}
func Dumpdb() {
    var b strings.Builder
    if err := Dbconn.Dump(&b); err != nil {
        fmt.Println("dump file ", err.Error())
    }
    fmt.Printf("%T\n", b)
}