基于serf的数据库同步模块库
chenshijun
2019-08-06 954aef5b16c944b0f27e03ccaafb1748b62f0d48
agent.go
@@ -18,12 +18,12 @@
import (
   "context"
   "encoding/json"
   "errors"
   "fmt"
   "github.com/hashicorp/memberlist"
   "io/ioutil"
   "os"
   "strconv"
   //"os"
   "strings"
@@ -35,6 +35,10 @@
   "log"
)
const (
   QueryEventGetDB = "GetDatabase"
   QueryEventUpdateDBData = "UpdateDBData"
)
// Agent warps the serf agent
type Agent struct {
   *agent.Agent
@@ -106,13 +110,15 @@
      var tmpstringslice []string
      tmpstringslice = append(tmpstringslice, string(ev.Payload))
      fmt.Println(tmpstringslice)
      results, err := DoExecute(tmpstringslice)
      results, err := ExecuteWriteSql(tmpstringslice)
      for _, result := range results {
         fmt.Println(result, "results err: ", err)
      }
   case *serf.Query:
      if ev.Name == QueryEventGetDB{
      //bak file and send resp
      filename, err := BakDbFile()
      if err != nil {
@@ -141,6 +147,34 @@
            return
         }
      }
      } else if ev.Name == QueryEventUpdateDBData {
         fmt.Println(string(ev.Payload))
         var tmpstringslice []string
         tmpstringslice = append(tmpstringslice, string(ev.Payload))
         fmt.Println(tmpstringslice)
         rows, err := ExecuteQuerySql(tmpstringslice)
         if err != nil {
            fmt.Println("err: ", err)
            return
         }
         var rowsReturn []Rows
         for _,r := range rows {
            rowsReturn = append(rowsReturn, *r)
         }
         bytesReturn, err := json.Marshal(rowsReturn)
         fmt.Println("results: ", bytesReturn)
         if query, ok := event.(*serf.Query); ok {
            if err := query.Respond(bytesReturn); err != nil {
               fmt.Printf("err: %s\n", err)
               return
            }
         }
         //var res []*Rows
         //json.Unmarshal(bytesReturn, &res)
      }
   default:
      fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
@@ -281,8 +315,6 @@
   return
}
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent)GetDbFromCluster(dbPathWrite string) {
@@ -302,7 +334,7 @@
      FilterNodes: strings.Fields(specmembername),
   }
   resp, err := a.Query("getDatabase", []byte(""), &params)
   resp, err := a.Query(QueryEventGetDB, []byte(""), &params)
   if err == nil || !strings.Contains(err.Error(), "cannot contain") {
      fmt.Println("err: ", err)
   }
@@ -315,13 +347,13 @@
            fmt.Println("x length is: ", len(r.Payload))
            // // byte to file.
            Dbconn.Close()
            Dbconn = nil
            SerfDbConn.Close()
            SerfDbConn = nil
            err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
            if err != nil {
               fmt.Println("query byte to file error!", err)
            }
            err := GetConn()
            err := InitDbConn("")
            if err != nil {
               fmt.Println("create db conn of test.db error: ", err)
            }
@@ -341,8 +373,24 @@
}
//Init serf Init
//web后台收到创建集群的请求,
func Init(clusterID string, password string, nodeID string) (*Agent, error) {
func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) {
   agent, err := InitNode(clusterID, password, nodeID)
   if err != nil {
      fmt.Printf("InitNode failed, error: %s", err)
      return agent, err
   }
   err = agent.JoinByNodeIP(ips)
   if err != nil {
      fmt.Printf("JoinByNodeIP failed, error: %s", err)
      return agent, err
   }
   return agent, err
}
//InitNode web后台收到创建集群的请求,
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
   conf := DefaultConfig()
   fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
   //conf.ClusterID = clusterID
@@ -377,8 +425,14 @@
   return agent, nil
}
func (a *Agent) JoinByNodeIP(ip string) error {
   n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
func (a *Agent) JoinByNodeIP(ips []string) error {
   var nodes []string
   for _, ip := range ips {
      node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
      nodes = append(nodes, node)
   }
   n, err := a.Agent.Join(nodes, true)
   if err != nil || n == 0{
      a.Stop()
      fmt.Println("Stop node")