554325746@qq.com
2019-05-28 70df7b912014201e271a2966599b84d77addd0f9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package camera
 
import (
        "errors"
 
        "basic.com/dbapi.git"
        "basic.com/valib/deliver.git"
        "basic.com/pubsub/protomsg.git"
 
        "github.com/long/test/sdk"
 
        "context"
        "fmt"
        "sync"
        "time"
       )
 
//var SocketManage = make(map[string]SocketContext)
var SocketManage sync.Map
 
var Initchannel = make(chan string)
 
    type SocketContext struct {
        Sock    deliver.Deliver
            Context context.Context
            Cancel  context.CancelFunc
    }
 
var camval dbapi.CameraApi
 
//get camera with task
var cameraTasks []protomsg.CameraAndTaskInfo
 
func Init() {
 
    fmt.Println("============ camera info ====================")
    ctsid := camval.FindAll()
 
    fmt.Println("==============camera camera with task ================")
    cameraTasks=camval.FindAllCameraAndTask()
 
    go CreateCamera(Initchannel)
 
    for _, cam := range ctsid {
        Initchannel <- cam.Id
    }
}
 
func CreateCamera(camera chan string) {
    for camid := range camera {
        if _, ok := SocketManage.Load(camid); !ok {
            url := fmt.Sprintf("ipc:///tmp/%s.ipc", camid)
 
            id, socketlisten, err := NewCamerSocketListen(deliver.PushPull, camid, url)
            if err != nil {
              fmt.Println("create socket error")
              continue 
            }
 
             go func(cid string, sock SocketContext ){
                  Recv(cid,sock)
             }(id,socketlisten)
        }
    }
}
 
// create server
func NewCamerSocketListen(mode int, cameraid string, url string) (cid string, socket SocketContext, err error) {
        ctx, cancel := context.WithCancel(context.Background())
 
        socket.Context = ctx
        socket.Cancel = cancel
 
        socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
        fmt.Println("new socket.Sock: ", socket.Sock)
 
        if socket.Sock == nil {
            return cameraid, socket, errors.New("create listen error")
        }
        SocketManage.Store(cameraid, socket)
        return cameraid, socket, nil
}
 
func Recv(cameraid string, socket SocketContext) {
        var msg []byte
        var err error
        for {
            select {
                case <-socket.Context.Done():
                    fmt.Println("listen recv quit")
                        return
                default:
                        if msg, err = socket.Sock.Recv(); err != nil {
                            fmt.Println("err is: ", cameraid, err)
                                continue
                        } else {
                            fmt.Println()
                            fmt.Println("============== one msg input ==========")
                            fmt.Println("cameraid: ",cameraid,  len(msg))
                            for _, taskid := range GetAlltask(cameraid) {
                                time.Sleep(5* time.Second)
                                fmt.Println("cameraid: ",cameraid," taskid: ", taskid)
                                Taskdolist(cameraid, taskid, msg)
                             }
                        }
            }
        }
}
 
//   根据cid 获取 所有的任务
func GetAlltask(cid string) (tasks []string) {
    for _, camsingle := range cameraTasks {
        if cid == camsingle.Camera.Id {
            for _, tasksingle := range camsingle.Tasks {
                tasks = append(tasks, tasksingle.Taskid)
            }
                return
        }
    }
    return
}
 
func Taskdolist(cid string, taskid string, data []byte) {
 
        //  数据加工(打标签)
        sdkmsg := sdk.SdkData(cid, taskid, data)
        if sdkmsg.Tasklab == nil {
            fmt.Println("cid:%s 没有任务%s", cid, taskid)
                return
        }
 
    //  计算分发的主题
            SendTopic := sdk.SdkSendTopic(sdkmsg)
               if _, ok := sdk.SdkMap[SendTopic]; ok {
                   fmt.Println("分发的主题存在")
                       sdk.SdkMap[SendTopic] <- sdkmsg
                       //fmt.Println("重新开始循环: ", sdk.SdkMap)
               } else {
                   fmt.Println("分发的主题不存在")
               }
}