From 542b2d8daae637a500ccc143d8e1449926703b9b Mon Sep 17 00:00:00 2001
From: panlei <2799247126@qq.com>
Date: 星期二, 10 九月 2019 18:02:41 +0800
Subject: [PATCH] update call compare
---
main.go | 2
ruleserver/readyDataForRule.go | 70 ++++++++++++++++++++++++++++++----
2 files changed, 62 insertions(+), 10 deletions(-)
diff --git a/main.go b/main.go
index 8212b3d..34393ee 100644
--- a/main.go
+++ b/main.go
@@ -49,7 +49,7 @@
dbapi.Init(*dbIp, *dbPort)
go cache.Init(initchan, *dbIp, *surveyPort, *pubPort)
logger.Info("cache init completed!!!", <-initchan) //dbserver鍒濆鍖栧畬姣�
- go ruleserver.Init()
+ ruleserver.Init()
go ruleserver.TimeTicker()
go ruleserver.StartServer()
nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)
diff --git a/ruleserver/readyDataForRule.go b/ruleserver/readyDataForRule.go
index cbf9a5a..a641688 100644
--- a/ruleserver/readyDataForRule.go
+++ b/ruleserver/readyDataForRule.go
@@ -51,16 +51,11 @@
}
logger.Debug("鏈満淇℃伅鍜宻erver淇℃伅锛�", localConfig, serverIp, serverPort)
bigCache.Init(dbTablePersons, serverIp, serverPort, localConfig.ServerId)
- sock, err = req.NewSocket();
- if err != nil {
- logger.Error("鍒涘缓璇锋眰socket澶辫触: %s", err.Error())
- }
- serverIP, _ := GetLocalIP()
- if err = sock.Dial("tcp://"+serverIP+":4010"); err != nil {
- logger.Error("璇锋眰socket鎷ㄥ彿澶辫触: %s", err.Error())
- }
+ Push1()
}
+var sender chan *protomsg.CompareArgs = make(chan *protomsg.CompareArgs)
+var receiver chan []byte = make(chan []byte)
type BaseInfo struct {
TableId string `json:"tableId"`
TableName string `json:"tableName"`
@@ -180,7 +175,7 @@
Source:false,
}
- bytes := Push(comArg,sock)
+ bytes := getCompareMsg(comArg)
//bytes := []byte{}
var scResult protomsg.SdkCompareResult
err1 := proto.Unmarshal(bytes, &scResult)
@@ -351,6 +346,10 @@
logger.Error("Failed set MaxRecvSize: %v", err)
return nil
}
+ serverIP, _ := GetLocalIP()
+ if err = sock.Dial("tcp://"+serverIP+":4010"); err != nil {
+ logger.Error("璇锋眰socket鎷ㄥ彿澶辫触: %s", err.Error())
+ }
//sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
@@ -374,6 +373,59 @@
return msg
}
+
+func Push1(){
+ //var sock mangos.Socket
+ var err error
+ var msg []byte
+
+ if sock, err = req.NewSocket(); err != nil {
+ logger.Error("鍒涘缓璇锋眰socket澶辫触: %s", err.Error())
+ }
+ errSize := sock.SetOption(mangos.OptionMaxRecvSize,5*1024*1024)
+ if errSize != nil {
+ logger.Error("Failed set MaxRecvSize: %v", err)
+ }
+ serverIP, _ := GetLocalIP()
+ if err = sock.Dial("tcp://"+serverIP+":4010"); err != nil {
+ logger.Error("璇锋眰socket鎷ㄥ彿澶辫触: %s", err.Error())
+ }
+ //sock.AddTransport(ipc.NewTransport())
+ sock.AddTransport(tcp.NewTransport())
+
+ logger.Info("搴忓垪鍖栨暟鎹�")
+
+ for {
+ select {
+ // case <-ctx.Done():
+ // return
+ case data := <- sender:
+ bytes,err1 := proto.Marshal(data)
+ logger.Info("鏁版嵁闀垮害涓猴細",len(bytes))
+ if err1 != nil {
+ logger.Info("搴忓垪鍖栧け璐ワ細",err1)
+ }
+ logger.Debug("鎺ㄩ�佹暟鎹�")
+ //bytes := []byte("ndfasojdfaidsos")
+ if err = sock.Send(bytes); err != nil {
+ logger.Error("鎺ㄩ�乻ocket鍙戦�佹暟鎹け璐�: %s", err.Error())
+ //os.Exit(1)
+ }
+ if msg, err = sock.Recv(); err != nil {
+ logger.Error("鎺ユ敹鍝嶅簲澶辫触: %s", err.Error())
+ //os.Exit(1)
+ }
+ logger.Debug("鏁版嵁鎺ㄩ�佹垚鍔燂紒鏀跺埌鍝嶅簲,鏁版嵁闀垮害涓猴細",len(msg))
+ receiver <- msg
+ default:
+
+ }
+ }
+}
+func getCompareMsg(data *protomsg.CompareArgs) []byte{
+ sender <- data
+ return <-receiver
+}
// 鑾峰彇鏈満ip
func GetLocalIP() (ipv4 string, err error) {
var (
--
Gitblit v1.8.0