panlei
2019-09-10 542b2d8daae637a500ccc143d8e1449926703b9b
update call compare
2个文件已修改
72 ■■■■ 已修改文件
main.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruleserver/readyDataForRule.go 70 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go
@@ -49,7 +49,7 @@
    dbapi.Init(*dbIp, *dbPort)
    go cache.Init(initchan, *dbIp, *surveyPort, *pubPort)
    logger.Info("cache init completed!!!", <-initchan) //dbserver初始化完毕
    go ruleserver.Init()
    ruleserver.Init()
    go ruleserver.TimeTicker()
    go ruleserver.StartServer()
    nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)
ruleserver/readyDataForRule.go
@@ -51,16 +51,11 @@
    }
    logger.Debug("本机信息和server信息:", localConfig, serverIp, serverPort)
    bigCache.Init(dbTablePersons, serverIp, serverPort, localConfig.ServerId)
    sock, err = req.NewSocket();
    if err != nil {
        logger.Error("创建请求socket失败: %s", err.Error())
    }
    serverIP, _ := GetLocalIP()
    if err = sock.Dial("tcp://"+serverIP+":4010"); err != nil {
        logger.Error("请求socket拨号失败: %s", err.Error())
    }
    Push1()
}
var sender chan *protomsg.CompareArgs = make(chan *protomsg.CompareArgs)
var receiver chan []byte = make(chan []byte)
type BaseInfo struct {
    TableId      string  `json:"tableId"`
    TableName    string  `json:"tableName"`
@@ -180,7 +175,7 @@
        Source:false,
    }
    bytes := Push(comArg,sock)
    bytes := getCompareMsg(comArg)
    //bytes := []byte{}
    var scResult protomsg.SdkCompareResult
    err1 := proto.Unmarshal(bytes, &scResult)
@@ -351,6 +346,10 @@
        logger.Error("Failed set MaxRecvSize: %v", err)
        return nil
    }
    serverIP, _ := GetLocalIP()
    if err = sock.Dial("tcp://"+serverIP+":4010"); err != nil {
        logger.Error("请求socket拨号失败: %s", err.Error())
    }
    //sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
@@ -374,6 +373,59 @@
    return msg
}
func Push1(){
    //var sock mangos.Socket
    var err error
    var msg []byte
    if sock, err = req.NewSocket(); err != nil {
        logger.Error("创建请求socket失败: %s", err.Error())
    }
    errSize := sock.SetOption(mangos.OptionMaxRecvSize,5*1024*1024)
    if errSize != nil {
        logger.Error("Failed set MaxRecvSize: %v", err)
    }
    serverIP, _ := GetLocalIP()
    if err = sock.Dial("tcp://"+serverIP+":4010"); err != nil {
        logger.Error("请求socket拨号失败: %s", err.Error())
    }
    //sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    logger.Info("序列化数据")
    for {
        select {
        // case <-ctx.Done():
        //     return
        case data := <- sender:
            bytes,err1 := proto.Marshal(data)
            logger.Info("数据长度为:",len(bytes))
            if err1 != nil {
                logger.Info("序列化失败:",err1)
            }
            logger.Debug("推送数据")
            //bytes := []byte("ndfasojdfaidsos")
            if err = sock.Send(bytes); err != nil {
                logger.Error("推送socket发送数据失败: %s", err.Error())
                //os.Exit(1)
            }
            if msg, err = sock.Recv(); err != nil {
                logger.Error("接收响应失败: %s", err.Error())
                //os.Exit(1)
            }
            logger.Debug("数据推送成功!收到响应,数据长度为:",len(msg))
            receiver <- msg
        default:
        }
    }
}
func getCompareMsg(data *protomsg.CompareArgs) []byte{
    sender <- data
    return <-receiver
}
// 获取本机ip
func GetLocalIP() (ipv4 string, err error) {
    var (