From 934c3f788266b32ef4e85b116c6686d1a08d76d7 Mon Sep 17 00:00:00 2001 From: chenshijun <csj_sky@126.com> Date: 星期五, 06 九月 2019 14:22:11 +0800 Subject: [PATCH] 完善一些业务处里 --- agent.go | 179 ++++++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 116 insertions(+), 63 deletions(-) diff --git a/agent.go b/agent.go index 0a6b6ac..f6657d6 100644 --- a/agent.go +++ b/agent.go @@ -26,6 +26,7 @@ "net" "os" "strconv" + "sync" //"os" "strings" @@ -38,8 +39,10 @@ ) const ( - QueryEventGetDB = "GetDatabase" - QueryEventUpdateDBData = "UpdateDBData" + QueryEventGetDB = "GetDatabase" + QueryEventUpdateDBData = "UpdateDBData" + UserEventSyncSql = "SyncSql" + UserEventSyncDbTablePersonCache = "SyncCache" ) // Agent warps the serf agent @@ -118,17 +121,19 @@ switch ev := event.(type) { case serf.UserEvent: - var sqlUe SqlUserEvent - fmt.Println(string(ev.Payload)) - err := json.Unmarshal(ev.Payload, &sqlUe) - if err !=nil { - fmt.Println("sqlUe unmarshal err:",err) - return - } - if sqlUe.Owner != a.conf.NodeName { - results, _ := ExecuteWriteSql(sqlUe.Sql) - //flag, _ := ExecuteSqlByGorm(sqlUe.Sql) - fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",results) + if ev.Name == UserEventSyncSql { + var sqlUe SqlUserEvent + fmt.Println(string(ev.Payload)) + err := json.Unmarshal(ev.Payload, &sqlUe) + if err != nil { + fmt.Println("sqlUe unmarshal err:", err) + return + } + if sqlUe.Owner != a.conf.NodeName { + results, _ := ExecuteWriteSql(sqlUe.Sql, true) + //flag, _ := ExecuteSqlByGorm(sqlUe.Sql) + fmt.Println("userEvent exec ", sqlUe.Sql, ",Result:", results) + } } case *serf.Query: @@ -189,37 +194,25 @@ //var res []*Rows //json.Unmarshal(bytesReturn, &res) } - case serf.MemberEvent: - if event.EventType() == serf.EventMemberLeave { - if ev.Members !=nil && len(ev.Members) ==1 { - leaveMember := ev.Members[0] - leaveSql := "delete from cluster_node where node_id='"+leaveMember.Name+"'" - fmt.Println("leaveSql:",leaveSql) - //ExecuteSqlByGorm([]string{ leaveSql }) - - fmt.Println("EventMemberLeave,current Members:",ev.Members) + case serf.MemberEvent: + if event.EventType() == serf.EventMemberLeave { + if ev.Members != nil && len(ev.Members) == 1 { + leaveMember := ev.Members[0] + leaveSql := "delete from cluster_node where node_id='" + leaveMember.Name + "'" + fmt.Println("leaveSql:", leaveSql) + _, err := ExecuteWriteSql([]string{leaveSql}, false) + if err != nil { + fmt.Printf("err: %s\n", err) } - return - } + fmt.Println("EventMemberLeave,current Members:", ev.Members) + } + return + } default: fmt.Printf("Unknown event type: %s\n", ev.EventType().String()) } - - //if event.EventType() != serf.EventMemberJoin { - // fmt.Printf("event.EventType() != serf.EventMemberJoin") - // return - //} - // - //if a.conf.Mode == ModeCluster { - // if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect { - // fmt.Printf("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect") - // return - // } - //} - //a.DeregisterEventHandler(a) - //close(a.readyCh) } func (a *Agent) BroadcastMemberlist(delay time.Duration) { @@ -422,23 +415,83 @@ }() } -type SqlUserEvent struct { - Owner string `json:"owner"` - Sql []string `json:"sql"` +//GetDbFromCluster get the newest database after join cluster +//dbPathWrite the path where to write after got a database, +func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]string, error) { + //members: get name of first member + mbs := a.GroupMembers(a.conf.ClusterID) + var specmembername string + for _, m := range mbs { + fmt.Println("m", m) + if m.Name != a.conf.NodeName { //鍓嶇紑锛欴SVAD:鍒嗘瀽鏈嶅姟鍣� DSPAD:杩涘嚭鍏ad + if strings.HasPrefix(a.conf.NodeName, "DSVAD"){ + if strings.HasPrefix(m.Name, "DSVAD") { + specmembername = m.Name + break + } + }else{ + specmembername = m.Name + break + } + } + } + fmt.Println("mbs:", mbs, "a.conf.BindAddr:", a.conf.BindAddr, "specmembername:", specmembername) + + //query: get db file. + params := serf.QueryParam{ + FilterNodes: strings.Fields(specmembername), + } + + //SQL + tBytes, _ := json.Marshal(tableNames) + + resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) + if err == nil || !strings.Contains(err.Error(), "cannot contain") { + fmt.Println("err: ", err) + } + fmt.Println("Query.resp.err:", err, "resp:", resp) + + var wg sync.WaitGroup + wg.Add(1) + var dumpSqls []string + go func() { + defer wg.Done() + respCh := resp.ResponseCh() + for { + select { + case r := <-respCh: + fmt.Println("Query response's len:", len(r.Payload)) + err := json.Unmarshal(r.Payload, &dumpSqls) + if err == nil { + fmt.Println("dumpSql:", dumpSqls) + fmt.Println("data dump success") + } + return + } + } + }() + wg.Wait() + return &dumpSqls, nil } + +type SqlUserEvent struct { + Owner string `json:"owner"` + Sql []string `json:"sql"` +} + //SyncSql boardcast sql to cluster func (a *Agent) SyncSql(sqlOp []string) { // event : use to send command to operate db. var sqlUe = SqlUserEvent{ Owner: a.conf.NodeName, - Sql: sqlOp, + Sql: sqlOp, } ueB, err := json.Marshal(sqlUe) - if err !=nil { - fmt.Println("sqlUE marshal err:",err) + if err != nil { + fmt.Println("sqlUE marshal err:", err) return } - err = a.UserEvent("SyncSql", []byte(ueB), false) + err = a.UserEvent(UserEventSyncSql, ueB, false) if err == nil || !strings.Contains(err.Error(), "cannot contain") { fmt.Println("err: ", err) } @@ -512,24 +565,24 @@ return nil } -func (a *Agent) JoinByNodeIP(ips []string) error { - var nodes []string - - if len(ips) == 0 { - return fmt.Errorf("No Nodes To Join!") - } - for _, ip := range ips { - node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) - nodes = append(nodes, node) - } - - n, err := a.Agent.Join(nodes, true) - if err != nil || n == 0 { - return fmt.Errorf("Error Encrypt Key!") - } - - return err -} +//func (a *Agent) JoinByNodeIP(ips []string) error { +// var nodes []string +// +// if len(ips) == 0 { +// return fmt.Errorf("No Nodes To Join!") +// } +// for _, ip := range ips { +// node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) +// nodes = append(nodes, node) +// } +// +// n, err := a.Agent.Join(nodes, true) +// if err != nil || n == 0 { +// return fmt.Errorf("Error Encrypt Key!") +// } +// +// return err +//} func (a *Agent) GetNodes() (nodes []NodeInfo) { var node NodeInfo -- Gitblit v1.8.0