package serf import ( "basic.com/syncdb.git" "basic.com/valib/logger.git" "basic.com/valib/serf.git/serf" "encoding/json" "errors" "fmt" "strings" "sync" "time" "vamicro/config" "vamicro/system-service/models" ) const ( QueryEventUpdateDBData = "UpdateDBData" QueryNodesByTopic = "queryNodeByTopic" QueryRpc = "queryRpc" UserEventSyncSql = "SyncSql" UserEventSyncDbTablePersonCache = "SyncCache" UserEventSyncVirtualIp = "SyncVirtualIp" //漂移ip修改 UserEventChangeMaster = "ChangeMaster " //修改节点状态 UserEventSyncRegisterInfo = "SyncRegisterInfo" //同步注册信息 DataSystemSerfSubscribe = "data-system-serf-subscribe" //各app从serf订阅消息 UserEventSyncMessage = "SyncMessageForProc" // 为其他进程同步消息 TcpTransportPort = 30194 //tcp传输大数据量接口 SUserEventSyncMessage ) var SyncDbTablePersonCacheChan = make(chan []byte, 512) var SyncVirtualIpChan = make(chan []byte, 512) var SyncProcMessageChan = make(chan []byte, 512) func HandleSerfEvent(event serf.Event) { switch ev := event.(type) { case serf.UserEvent: if ev.Name == UserEventSyncSql { HandleUserEventSyncSql(ev) } else if ev.Name == UserEventSyncDbTablePersonCache { HandleUserEventSyncDbTablePersonCache(ev) } else if ev.Name == UserEventSyncVirtualIp { HandleUserEventSyncVirtualIp(ev) } else if ev.Name == UserEventChangeMaster { HandleUserEventChangeMaster(ev) } else if ev.Name == UserEventSyncRegisterInfo { HandleSyncRegisterInfo(ev) } else if ev.Name == DataSystemSerfSubscribe { HandleDataSystemSerfSub(ev) } else if ev.Name == UserEventSyncMessage { logger.Debug("接收到SyncMessageForProc") HandleUserEventSyncMessage(ev) } case *serf.Query: if ev.Name == QueryEventUpdateDBData { HandleQueryEventUpdateDBData(ev) } else if ev.Name == QueryNodesByTopic { HandleOtherQuery(ev) } else if ev.Name == QueryRpc { HandleQueryRpc(ev) } case serf.MemberEvent: if event.EventType() == serf.EventMemberLeave { HandleEventMemberLeave(ev) } else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate { HandleEventMemberJoin(ev) } else if event.EventType() == serf.EventMemberFailed { HandleEventMemberFail(ev) } logger.Error("serf MemberEvent ", event.EventType()) default: logger.Warn("Unknown event type: %s\n", ev.EventType().String()) } } func executeSqlByGorm(sqls []string) (bool, error) { if len(sqls) > 0 { db := models.GetDB() if db != nil { db.LogMode(false) defer db.LogMode(true) var err error tx := db.Begin() defer func() { if err != nil && tx != nil { tx.Rollback() } }() for _, sql := range sqls { result := tx.Exec(sql) err = result.Error if err != nil { logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql) return false, err } //if result.RowsAffected == 0 { // logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) // err = errors.New("ExecuteSqlByGorm RowsAffected == 0") // return false, err //} } tx.Commit() return true, nil } return false, errors.New("db handle is nil") } return true, nil } type SqlUserEvent struct { Owner string `json:"owner"` Sql []string `json:"sql"` } type ProcMessageEvent struct { Owner string `json:"owner"` // 发送者 Target string `json:"target"` // 指定接收者 Proc string `json:"procName"` // 进程名 Topic string `json:"topic"` // 主题 Payload []byte `json:"payload"` // 消息体,自行解析 } type TableDesc struct { Cid int `json:"cid"` Name string `json:"name"` Type string `json:"type"` Notnull bool `json:"notnull"` DFltValue interface{} `json:"dflt_value"` Pk int `json:"pk"` } type DumpSql struct { Sql string `json:"sql"` } const ( DbT_TableName = "dbtables" DBP_TableName = "dbtablepersons" ) func DumpTables(tableNames []string) ([]string, error) { db := models.GetDB() db.LogMode(false) defer db.LogMode(true) if tableNames != nil { var arr []string var dumpSql []DumpSql for _, table := range tableNames { logger.Info("dump current tableName:", table) dumpSql = make([]DumpSql, 0) var tDescArr []TableDesc tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table) err := db.Raw(tSql).Scan(&tDescArr).Error logger.Debug("tDescArr err:", err, "len(tDescArr)=", len(tDescArr)) if err != nil { return nil, errors.New("tableDesc err") } logger.Info(table, "'Columns is:", tDescArr) if tDescArr == nil || len(tDescArr) == 0 { return nil, errors.New(table + " has no column") } var columnNames []string for _, col := range tDescArr { columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name)) } if table == DbT_TableName { tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`, table, strings.Join(columnNames, ","), table) } else if table == DBP_TableName { tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`, table, strings.Join(columnNames, ","), table) } else { tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`, table, strings.Join(columnNames, ","), table) } logger.Info("tSql:", tSql) err = db.Raw(tSql).Scan(&dumpSql).Error logger.Debug("dump err:", err) if err != nil { return nil, errors.New("dump err") } if len(dumpSql) > 0 { for _, d := range dumpSql { arr = append(arr, d.Sql) } } } return arr, nil } return nil, errors.New("tableNames is nil") } type QueryTableDataParam struct { Tables []string `json:"tables"` From string `json:"from"` } var QueryTcpResponseChan = make(chan []byte) func GetTableDataFromCluster(a *syncdb.Agent, clusterId string, tableNames []string, timeout time.Duration) (*[]string, error) { //members: get name of first member mbs := a.GroupMembers(clusterId) var specmembername string for _, m := range mbs { logger.Info("member", m) if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") { if strings.HasPrefix(m.Name, "DSVAD") { specmembername = m.Name break } } else { specmembername = m.Name break } } } logger.Info("members:", mbs, "specmembername:", specmembername) if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点 return nil, errors.New("specmembername not found") } //query: get db file. params := serf.QueryParam{ FilterNodes: strings.Fields(specmembername), } //get db tables var fromP = QueryTableDataParam{ Tables: tableNames, From: config.Server.AnalyServerId, } tBytes, _ := json.Marshal(fromP) resp, err := a.Query(QueryEventUpdateDBData, tBytes, ¶ms) if err == nil || !strings.Contains(err.Error(), "cannot contain") { logger.Error("err: ", err) } logger.Info("Query.resp.err:", err, "resp:", resp) var dumpSqls []string var wg sync.WaitGroup wg.Add(1) ticker := time.NewTicker(timeout) go func(tk *time.Ticker) { defer tk.Stop() defer wg.Done() for { select { case <-tk.C: return case msg := <-QueryTcpResponseChan: logger.Info("Query response's len:", len(msg)) err := json.Unmarshal(msg, &dumpSqls) if err == nil { //logger.Error("dumpSql:", dumpSqls) logger.Debug("data dump success") } return } } }(ticker) wg.Wait() return &dumpSqls, nil } func SyncSql(sqlOp []string) { var sqlUe = SqlUserEvent{ Owner: config.Server.AnalyServerId, Sql: sqlOp, } ueB, err := json.Marshal(sqlUe) if err != nil { logger.Error("sqlUE marshal err:", err) return } err = Agent.UserEvent(UserEventSyncSql, ueB, false) if err != nil { logger.Error("sending sync sql event err: ", err) } }