a
554325746@qq.com
2019-10-24 c61e776980f038bb0e195f7753a3d7e127d6028f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package models
 
 
import (
    sdb "basic.com/syncdb.git"
    "dbserver/extend/config"
    "dbserver/extend/logger"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/req"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
 
var Agent *sdb.Agent
var SyncTables = []string{ "area", "camera_area", "cameras", "cluster", "cluster_node", "dictionary", "gb28181_config" }
 
var syncSdkCompareCacheChan = make(chan []byte,0)
 
func PublishSdkCompareCacheChange(b []byte) {
    syncSdkCompareCacheChan <- b
}
func InitAgent() {
 
    sdb.InitLocalDb(GetDB())
 
    var lc LocalConfig
    if lc.Select() != nil || lc.ServerId == "" {
        return
    }
    var clusterE Cluster
    clusters, err := clusterE.FindAll()
    if err ==nil && clusters !=nil && len(clusters) >0 {
        c := clusters[0]
        var nodeE Node
        nodes, e := nodeE.FindNodesByClusterId(c.ClusterId)
        if e == nil && nodes !=nil && len(nodes) >0 {
            var nodeIps []string
            for _,n :=range nodes {
                if n.NodeId != lc.ServerId {
                    nodeIps = append(nodeIps, n.NodeIp)
                }
            }
            Agent,err = sdb.Init(c.ClusterId, c.Password, lc.ServerId, nodeIps)
            if Agent != nil {
                logger.Debug("sync.Agent init success!")
            } else {
                logger.Debug("sync.Agent init fail!")
            }
 
        }
    }
    go func() {
        for {
            select {
            case scMsg := <-sdb.SyncDbTablePersonCacheChan:{
                PublishSdkCompareCacheChange(scMsg)
            }
            default:
 
            }
        }
    }()
    go func() {
        for {
            select {
            case b := <-syncSdkCompareCacheChan:{
                logger.Debug("SyncSdkCompareCache in,len(b):",len(b))
                doUpdateCompareCacheRequest(b)
            }
            default:
 
            }
        }
    }()
}
 
func doUpdateCompareCacheRequest(b []byte) {
    logger.Debug("syncDbTablePerson:recv doUpdateCompareCacheRequest msg")
    var sock mangos.Socket
    var err error
    if sock,err = req.NewSocket();err !=nil {
        logger.Debug("comp cache can't new req socket:%s",err.Error())
        return
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    var url = "tcp://"+config.DbPersonCompare.Url
    if err = sock.Dial(url);err !=nil {
        logger.Debug("comp cache can't dial on req socket:%s",err.Error())
        return
    }
    sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100)
    //sock.SetOption(mangos.OptionRecvDeadline, time.Second*10)
    if err = sock.Send(b);err !=nil {
        logger.Debug("comp cache can't send message on push socket:%s",err.Error())
        return
    }
    if msg,err := sock.Recv();err !=nil {
        logger.Debug("comp cache sock.Recv receive err:%s",err.Error())
        return
    } else {
        logger.Debug("comp cache sock.Recv msg:",string(msg))
    }
    sock.Close()
}