| | |
| | | UserEventSyncSql = "SyncSql" |
| | | UserEventSyncDbTablePersonCache = "SyncCache" |
| | | UserEventSyncVirtualIp = "SyncVirtualIp" //漂移ip修改 |
| | | UserEventChangeMaster = "ChangeMaster " //修改节点状态 |
| | | UserEventSyncRegisterInfo = "SyncRegisterInfo" //同步注册信息 |
| | | DataSystemSerfSubscribe = "data-system-serf-subscribe" //各app从serf订阅消息 |
| | | UserEventSyncMessage = "SyncMessageForProc" // 为其他进程同步消息 |
| | | TcpTransportPort = 30194 //tcp传输大数据量接口 |
| | | |
| | | SUserEventSyncMessage |
| | | ) |
| | | |
| | | var SyncDbTablePersonCacheChan = make(chan []byte, 512) |
| | | var SyncVirtualIpChan = make(chan []byte, 512) |
| | | var SyncProcMessageChan = make(chan []byte, 512) |
| | | |
| | | func HandleSerfEvent(event serf.Event) { |
| | | switch ev := event.(type) { |
| | |
| | | HandleUserEventSyncDbTablePersonCache(ev) |
| | | } else if ev.Name == UserEventSyncVirtualIp { |
| | | HandleUserEventSyncVirtualIp(ev) |
| | | } else if ev.Name == UserEventChangeMaster { |
| | | HandleUserEventChangeMaster(ev) |
| | | } else if ev.Name == UserEventSyncRegisterInfo { |
| | | HandleSyncRegisterInfo(ev) |
| | | } else if ev.Name == DataSystemSerfSubscribe { |
| | | HandleDataSystemSerfSub(ev) |
| | | } else if ev.Name == UserEventSyncMessage { |
| | | logger.Debug("接收到SyncMessageForProc") |
| | | HandleUserEventSyncMessage(ev) |
| | | } |
| | | case *serf.Query: |
| | | if ev.Name == QueryEventUpdateDBData { |
| | |
| | | case serf.MemberEvent: |
| | | if event.EventType() == serf.EventMemberLeave { |
| | | HandleEventMemberLeave(ev) |
| | | } else if event.EventType() == serf.EventMemberJoin { |
| | | } else if event.EventType() == serf.EventMemberJoin || event.EventType() == serf.EventMemberUpdate { |
| | | HandleEventMemberJoin(ev) |
| | | } else if event.EventType() == serf.EventMemberFailed { |
| | | HandleEventMemberFail(ev) |
| | | } |
| | | |
| | | logger.Error("serf MemberEvent ", event.EventType()) |
| | | default: |
| | | logger.Warn("Unknown event type: %s\n", ev.EventType().String()) |
| | | } |
| | |
| | | logger.Error("ExecuteSqlByGorm err:", err, ",sql:", sql) |
| | | return false, err |
| | | } |
| | | if result.RowsAffected == 0 { |
| | | logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) |
| | | err = errors.New("ExecuteSqlByGorm RowsAffected == 0") |
| | | return false, err |
| | | } |
| | | //if result.RowsAffected == 0 { |
| | | // logger.Debug("ExecuteSqlByGorm RowsAffected == 0", ",sql:", sql) |
| | | // err = errors.New("ExecuteSqlByGorm RowsAffected == 0") |
| | | // return false, err |
| | | //} |
| | | } |
| | | tx.Commit() |
| | | return true, nil |
| | |
| | | type SqlUserEvent struct { |
| | | Owner string `json:"owner"` |
| | | Sql []string `json:"sql"` |
| | | } |
| | | |
| | | type ProcMessageEvent struct { |
| | | Owner string `json:"owner"` // 发送者 |
| | | Target string `json:"target"` // 指定接收者 |
| | | Proc string `json:"procName"` // 进程名 |
| | | Topic string `json:"topic"` // 主题 |
| | | Payload []byte `json:"payload"` // 消息体,自行解析 |
| | | } |
| | | |
| | | type TableDesc struct { |
| | |
| | | mbs := a.GroupMembers(clusterId) |
| | | var specmembername string |
| | | for _, m := range mbs { |
| | | logger.Info("m", m) |
| | | logger.Info("member", m) |
| | | if m.Name != config.Server.AnalyServerId { //前缀:DSVAD:分析服务器 DSPAD:进出入pad |
| | | if strings.HasPrefix(config.Server.AnalyServerId, "DSVAD") { |
| | | if strings.HasPrefix(m.Name, "DSVAD") { |
| | |
| | | } |
| | | } |
| | | } |
| | | logger.Info("mbs:", mbs, "specmembername:", specmembername) |
| | | logger.Info("members:", mbs, "specmembername:", specmembername) |
| | | if specmembername == "" { //如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点 |
| | | return nil, errors.New("specmembername not found") |
| | | } |
| | |
| | | logger.Info("Query response's len:", len(msg)) |
| | | err := json.Unmarshal(msg, &dumpSqls) |
| | | if err == nil { |
| | | logger.Error("dumpSql:", dumpSqls) |
| | | logger.Error("data dump success") |
| | | //logger.Error("dumpSql:", dumpSqls) |
| | | logger.Debug("data dump success") |
| | | } |
| | | return |
| | | } |
| | |
| | | return |
| | | } |
| | | err = Agent.UserEvent(UserEventSyncSql, ueB, false) |
| | | if err == nil || !strings.Contains(err.Error(), "cannot contain") { |
| | | logger.Error("err: ", err) |
| | | if err != nil { |
| | | logger.Error("sending sync sql event err: ", err) |
| | | } |
| | | } |