| agent.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| config.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| transport.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| userDelegate.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
agent.go
@@ -82,8 +82,6 @@ } serfConf.MemberlistConfig.Keyring = keyring serfConf.MemberlistConfig.Delegate = &UserDelegate{} logger.Info("[INFO] agent: Restored keyring with %d keys from %s", len(conf.EncryptKey), conf.EncryptKey) @@ -216,11 +214,17 @@ } } } logger.Debug("targetNode:",targetNode.Name) if targetNode !=nil { sendErr := a.Serf().Memberlist().SendToTCP(targetNode, bytesReturn) addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort) sendErr := rawSendTcpMsg(addr, bytesReturn) if sendErr != nil { logger.Debug("sendToTcp err:",sendErr) } else { logger.Debug("sendToTcp success") } } else { logger.Debug("targetNode is nil") } @@ -263,6 +267,7 @@ //a.DeregisterEventHandler(a) //close(a.readyCh) } func (a *Agent) BroadcastMemberlist(delay time.Duration) { //serf := a.serf @@ -516,28 +521,17 @@ wg.Add(1) go func() { defer wg.Done() //respCh := resp.ResponseCh() for { select { //case r := <-respCh: // logger.Info("Query response's len:", len(r.Payload)) // err := json.Unmarshal(r.Payload, &dumpSqls) // if err ==nil { // logger.Error("dumpSql:",dumpSqls) // logger.Error("data dump success") // } // return case msg := <-QueryTcpResponseChan: logger.Debug("QueryTcpResponseChan receive msg len:",len(msg)) case msg := <- QueryTcpResponseChan: logger.Info("Query response's len:", len(msg)) err := json.Unmarshal(msg, &dumpSqls) if err ==nil { logger.Error("dumpSql success:",dumpSqls) } else { logger.Error("data dump err:",err) if err == nil { logger.Error("dumpSql:", dumpSqls) logger.Error("data dump success") } return } } }() wg.Wait() config.go
@@ -47,6 +47,8 @@ MaxUserEventSize = 5 * 1024 ReplayOnJoinDefault = false SnapshotPathDefault = "/opt/vasystem/serfSnapShot" TcpTransportPort = 30194 //tcp传输大数据量接口 ) // DefaultConfig default config transport.go
New file @@ -0,0 +1,61 @@ package syncdb import ( "basic.com/valib/logger.git" "net" "strconv" ) func rawSendTcpMsg(addr string, sendBuf []byte) error { conn, err := net.Dial("tcp", addr) if err != nil { logger.Debug("net.Dialt err", err) return err } defer conn.Close() //发送 _, err = conn.Write(sendBuf) if err != nil { logger.Debug("conn.Write err", err) return err } else { logger.Debug("raw send success") return nil } } func RawReceiveTcpMsg() { var tcpAddr *net.TCPAddr tcpAddr,_ = net.ResolveTCPAddr("tcp","127.0.0.1:"+strconv.Itoa(TcpTransportPort)) listener,_ := net.ListenTCP("tcp",tcpAddr) defer listener.Close() for{ conn,err := listener.AcceptTCP() if err!=nil { logger.Debug("listener.Accept err:", err) continue } logger.Debug("A transport client connected :" +conn.RemoteAddr().String()) go readStream(conn) } } func readStream(conn *net.TCPConn) { data := make([]byte,0) buf := make([]byte, 4096) for { n,err :=conn.Read(buf) if n == 0{ break } if err !=nil { return } data = append(data,buf...) } QueryTcpResponseChan <- data } userDelegate.go
File was deleted