package sdk
|
|
import (
|
"context"
|
"encoding/json"
|
"fmt"
|
"github.com/long/test/httpclient"
|
"github.com/long/test/tasktag"
|
"github.com/long/test/util"
|
"time"
|
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/pair"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
// SocketManage maps an sdk id to the SocketContext that NewSdkListen
// registered for it.
var SocketManage = map[string]SocketContext{}
|
|
type SocketContext struct {
|
Sock mangos.Socket
|
Context context.Context
|
Cancel context.CancelFunc
|
}
|
|
func Init() {
|
|
sdklist := SdkAll() //获取所有sdk
|
fmt.Println("sdk list have: ", sdklist)
|
|
SdkCreateTopic(sdklist) // 创建主题
|
|
for _, sdkid := range sdklist { // 创建sdk server
|
sdkid, socket, err := NewSdkListen(sdkid, "tcp", "192.168.1.124", 0)
|
if err != nil {
|
continue
|
}
|
// 接受管道数据 ==》 发送给 对应的进程
|
go send(sdkid, socket, SdkMap[sdkid])
|
|
//从对应进程接受数据 == 》 重新送回到管道
|
go Recv(socket)
|
|
}
|
|
go es(SdkMap["es"])
|
|
}
|
|
//单独处理 es 主题的情况
|
func es(sdkmsgchan chan SdkMessage) {
|
for data := range sdkmsgchan {
|
fmt.Println("this data is finish all sdk! ", data)
|
}
|
|
}
|
|
//动态处理
|
func AutoDelSdk(Newsdklist []string) {
|
var oldSdk []string
|
for key, _ := range SdkMap {
|
oldSdk = append(oldSdk, key)
|
}
|
sdkChanDel := util.Difference(oldSdk, Newsdklist)
|
|
for key, op := range sdkChanDel {
|
if op == "add" {
|
SdkMap[key] = make(chan SdkMessage)
|
fmt.Println("创建主题 sdk: ", key)
|
} else {
|
close(SdkMap[key])
|
delete(SdkMap, key)
|
fmt.Println("删除主题 sdk: ", key)
|
}
|
}
|
}
|
|
//主题
|
var SdkMap = make(map[string]chan SdkMessage)
|
|
// 发送给算法进程的结构
|
type SdkMessage struct {
|
Cid string
|
Tasklab tasktag.TaskLabel
|
Data []byte
|
}
|
|
//sdk数据 加工器
|
func SdkData(cid string, taskid string, data []byte) (sdkmsg SdkMessage) {
|
sdkmsg.Cid = cid
|
sdkmsg.Tasklab = tasktag.TaskMapLab[taskid]
|
sdkmsg.Data = data
|
return
|
}
|
|
//sdk数据分发器
|
func SdkSendTopic(sdkmsg SdkMessage) (sdksend string) {
|
if sdkmsg.Tasklab.Index < len(sdkmsg.Tasklab.Sdkids) {
|
sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index]
|
} else {
|
sdksend = "es"
|
}
|
fmt.Println()
|
fmt.Println("分发的主题是: ", sdksend)
|
fmt.Println()
|
return
|
}
|
|
// 调用 http 借口获取摄像机信息
|
func SdkAll() (sdklist []string) {
|
sdklist = httpclient.GetSdk("http://127.0.0.1:8000/data/api-v/sdk/findskdid")
|
return
|
}
|
|
// 创建主题
|
func SdkCreateTopic(sdklist []string) (err error) {
|
for _, sdkid := range sdklist {
|
SdkMap[sdkid] = make(chan SdkMessage)
|
fmt.Println("create sdk channel: ", sdkid)
|
}
|
|
SdkMap["es"] = make(chan SdkMessage)
|
fmt.Println("create es channel: ")
|
|
return nil
|
}
|
|
// UrlPort is the TCP port NewSdkListen assigns when it is called with port 0;
// it is incremented after each such assignment.
var UrlPort = 9000
|
|
// 创建 sdk server listen
|
func NewSdkListen(sdkid string, protocol string, ip string, port int) (sid string, socket SocketContext, err error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
var url string
|
|
socket.Context = ctx
|
socket.Cancel = cancel
|
|
switch protocol {
|
case "tcp":
|
if port == 0 {
|
port = UrlPort
|
UrlPort++
|
}
|
url = fmt.Sprintf("%s://%s:%d", protocol, ip, port)
|
|
case "ipc":
|
url = fmt.Sprintf("%s://%s", sdkid)
|
}
|
fmt.Printf("sdkid= %s url=%s\n", sdkid, url)
|
|
if socket.Sock, err = pair.NewSocket(); err != nil {
|
fmt.Println(sdkid, "can't get new pair socket: ", err.Error())
|
return sdkid, socket, err
|
}
|
|
socket.Sock.SetOption(mangos.OptionMaxRecvSize, 32*1024*1024)
|
socket.Sock.SetOption(mangos.OptionWriteQLen, 10)
|
socket.Sock.SetOption(mangos.OptionReadQLen, 10)
|
|
socket.Sock.AddTransport(tcp.NewTransport())
|
socket.Sock.AddTransport(ipc.NewTransport())
|
|
if err = socket.Sock.Listen(url); err != nil {
|
fmt.Println("socket lisnte error ", sdkid)
|
}
|
SocketManage[sdkid] = socket
|
return sdkid, socket, err
|
}
|
|
func Recv(socket SocketContext) {
|
socket.Sock.SetOption(mangos.OptionRecvDeadline, 1*time.Second)
|
for {
|
select {
|
case <-socket.Context.Done():
|
fmt.Println("socket close")
|
return
|
default:
|
if msg, err := socket.Sock.Recv(); err != nil {
|
//fmt.Printf("%s ", err)
|
continue
|
} else {
|
|
var repsdkmsg SdkMessage
|
var reps interface{}
|
|
err = json.Unmarshal(msg, &reps)
|
if err != nil {
|
continue
|
}
|
|
switch v := reps.(type) {
|
case map[string]interface{}:
|
//调用计算函数, 分发给下一个主题
|
|
json.Unmarshal(msg, &repsdkmsg)
|
nexttopic := SdkSendTopic(repsdkmsg)
|
SdkMap[nexttopic] <- repsdkmsg
|
case string:
|
fmt.Println("this string is: ", v)
|
}
|
}
|
}
|
}
|
}
|
|
func send(sdkid string, socket SocketContext, in chan SdkMessage) {
|
var v SdkMessage
|
var b []byte
|
|
for {
|
select {
|
case <-socket.Context.Done():
|
fmt.Println("socket is close")
|
case v = <-in:
|
b, _ = json.Marshal(v)
|
fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(v.Data))
|
if err := socket.Sock.Send(b); err != nil {
|
fmt.Println("failed send")
|
}
|
fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(v.Data))
|
}
|
}
|
}
|