| | |
| | | } |
| | | serfConf.MemberlistConfig.Keyring = keyring |
| | | |
| | | serfConf.MemberlistConfig.Delegate = &UserDelegate{} |
| | | |
| | | logger.Info("[INFO] agent: Restored keyring with %d keys from %s", |
| | | len(conf.EncryptKey), conf.EncryptKey) |
| | | |
| | |
| | | } |
| | | } |
| | | } |
| | | logger.Debug("targetNode:",targetNode.Name) |
| | | if targetNode !=nil { |
| | | sendErr := a.Serf().Memberlist().SendToTCP(targetNode, bytesReturn) |
| | | addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort) |
| | | sendErr := rawSendTcpMsg(addr, bytesReturn) |
| | | |
| | | if sendErr != nil { |
| | | logger.Debug("sendToTcp err:",sendErr) |
| | | } else { |
| | | logger.Debug("sendToTcp success") |
| | | } |
| | | |
| | | } else { |
| | | logger.Debug("targetNode is nil") |
| | | } |
| | |
| | | //a.DeregisterEventHandler(a) |
| | | //close(a.readyCh) |
| | | } |
| | | |
| | | |
| | | func (a *Agent) BroadcastMemberlist(delay time.Duration) { |
| | | //serf := a.serf |
| | |
| | | wg.Add(1) |
| | | go func() { |
| | | defer wg.Done() |
| | | //respCh := resp.ResponseCh() |
| | | for { |
| | | select { |
| | | //case r := <-respCh: |
| | | // logger.Info("Query response's len:", len(r.Payload)) |
| | | // err := json.Unmarshal(r.Payload, &dumpSqls) |
| | | // if err ==nil { |
| | | // logger.Error("dumpSql:",dumpSqls) |
| | | // logger.Error("data dump success") |
| | | // } |
| | | // return |
| | | case msg := <-QueryTcpResponseChan: |
| | | logger.Debug("QueryTcpResponseChan receive msg len:",len(msg)) |
| | | logger.Info("Query response's len:", len(msg)) |
| | | err := json.Unmarshal(msg, &dumpSqls) |
| | | if err ==nil { |
| | | logger.Error("dumpSql success:",dumpSqls) |
| | | } else { |
| | | logger.Error("data dump err:",err) |
| | | logger.Error("dumpSql:", dumpSqls) |
| | | logger.Error("data dump success") |
| | | } |
| | | return |
| | | } |
| | | |
| | | } |
| | | }() |
| | | wg.Wait() |
| | |
| | | MaxUserEventSize = 5 * 1024 |
| | | ReplayOnJoinDefault = false |
| | | SnapshotPathDefault = "/opt/vasystem/serfSnapShot" |
| | | |
| | | TcpTransportPort = 30194 //tcp传输大数据量接口 |
| | | ) |
| | | |
| | | // DefaultConfig default config |
| New file |
| | |
| | | package syncdb |
| | | |
import (
	"io"
	"net"
	"strconv"

	"basic.com/valib/logger.git"
)
| | | |
| | | func rawSendTcpMsg(addr string, sendBuf []byte) error { |
| | | conn, err := net.Dial("tcp", addr) |
| | | if err != nil { |
| | | logger.Debug("net.Dialt err", err) |
| | | return err |
| | | } |
| | | |
| | | defer conn.Close() |
| | | |
| | | //发送 |
| | | _, err = conn.Write(sendBuf) |
| | | if err != nil { |
| | | logger.Debug("conn.Write err", err) |
| | | return err |
| | | } else { |
| | | logger.Debug("raw send success") |
| | | return nil |
| | | } |
| | | } |
| | | |
| | | func RawReceiveTcpMsg() { |
| | | var tcpAddr *net.TCPAddr |
| | | tcpAddr,_ = net.ResolveTCPAddr("tcp","127.0.0.1:"+strconv.Itoa(TcpTransportPort)) |
| | | |
| | | listener,_ := net.ListenTCP("tcp",tcpAddr) |
| | | defer listener.Close() |
| | | |
| | | for{ |
| | | conn,err := listener.AcceptTCP() |
| | | if err!=nil { |
| | | logger.Debug("listener.Accept err:", err) |
| | | continue |
| | | } |
| | | logger.Debug("A transport client connected :" +conn.RemoteAddr().String()) |
| | | go readStream(conn) |
| | | } |
| | | } |
| | | |
| | | func readStream(conn *net.TCPConn) { |
| | | data := make([]byte,0) |
| | | buf := make([]byte, 4096) |
| | | for { |
| | | n,err :=conn.Read(buf) |
| | | if n == 0{ |
| | | break |
| | | } |
| | | if err !=nil { |
| | | return |
| | | } |
| | | data = append(data,buf...) |
| | | } |
| | | QueryTcpResponseChan <- data |
| | | } |