package sdk
|
|
import (
|
"taskpubsub/tasktag"
|
"taskpubsub/util"
|
|
"github.com/gogo/protobuf/proto"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/deliver.git"
|
"basic.com/valib/logger.git"
|
)
|
|
// Suffixes appended to an SDK id to build the URL of each of its two sockets.
const (
	// postPush is the suffix of the socket served by Send.
	postPush = "_1"

	// postPull is the suffix of the socket served by Recv.
	postPull = "_2"
)
|
|
var SocketManage = make(map[string]util.SocketContext)
|
var SdkMap = make(map[string]chan protomsg.SdkMessage)
|
|
var shm bool = false
|
|
// Hand-maintained topics. In the code visible here they are referenced only
// from disabled (commented-out) loops.
var (
	// innerRecvTopic lists the topics that would get a receive server.
	innerRecvTopic = []string{
		"facedetect-sdk-no-track", // to sdk-no-track: search by image
	}

	// innerSendTopic lists the topics that would get a send server.
	innerSendTopic = []string{
		"facedetect-sdk-no-track",      // to sdk-no-track: search by image
		"virtual-faceextract-sdk-pull", // to web: search by image
	}
)
|
|
// initInnerTopic is the hook that would start servers for the hand-maintained
// topics in innerSendTopic and innerRecvTopic. Both registration loops are
// disabled, so calling it currently does nothing.
func initInnerTopic() {
	// Disabled registration of the send topics:
	//
	//	for _, sendTopic := range innerSendTopic {
	//		createSdkSendServerAndListen(sendTopic)
	//	}
	//
	// Disabled registration of the receive topics:
	//
	//	for _, recvTopic := range innerRecvTopic {
	//		createSdkRecvServerAndListen(recvTopic)
	//	}
}
|
|
func Init(useShm bool) {
|
|
shm = useShm
|
|
logger.Info("============= init sdk info =====================")
|
for _, sdkid := range util.Sdklist { // 创建sdk server
|
createSdkTopicAndServer(sdkid)
|
logger.Info()
|
}
|
|
// 手动输入的主题
|
initInnerTopic()
|
|
// es
|
SdkMap["es"] = make(chan protomsg.SdkMessage)
|
logger.Info("create es channel: ")
|
go DealEsTopic()
|
|
go autoUpdateSdk(util.Sdkflag)
|
}
|
|
func createSdkTopicAndServer(sdkid string) {
|
createSdkSendServerAndListen(sdkid)
|
createSdkRecvServerAndListen(sdkid)
|
}
|
|
func deleteSdkTopicAndServer(id string) {
|
if _, isExist := SdkMap[id]; isExist { //存在
|
close(SdkMap[id])
|
delete(SdkMap, id)
|
logger.Info("删除主题 sdk: ", id)
|
}
|
|
if _, isExist := SocketManage[id]; isExist { //存在
|
SocketManage[id].Cancel()
|
delete(SocketManage, id)
|
logger.Info("删除server sdk: ", id)
|
}
|
}
|
|
func createSdkSendServerAndListen(id string) {
|
if _, isExist := SdkMap[id]; !isExist { //不存在
|
SdkMap[id] = make(chan protomsg.SdkMessage)
|
logger.Info("create", id)
|
}
|
|
url := "ipc:///tmp/" + id + postPush + ".ipc"
|
m := deliver.PushPull
|
if shm {
|
m = deliver.Shm
|
url = id + postPush
|
}
|
|
logger.Info("SDK URL: ", url)
|
|
socket, err := util.NewSocketListen(int(m), url, shm)
|
if err != nil {
|
delete(SdkMap, id)
|
logger.Error(id, "create socket error!")
|
return
|
}
|
SocketManage[id] = socket
|
|
go Send(id, socket, SdkMap[id])
|
}
|
|
func createSdkRecvServerAndListen(id string) {
|
if _, isExist := SdkMap[id]; !isExist { //不存在
|
SdkMap[id] = make(chan protomsg.SdkMessage)
|
logger.Info("create", id)
|
}
|
|
url := "ipc:///tmp/" + id + postPull + ".ipc"
|
m := deliver.PushPull
|
if shm {
|
m = deliver.Shm
|
url = id + postPull
|
|
}
|
|
socket, err := util.NewSocketListen(int(m), url, shm)
|
|
if err != nil {
|
delete(SdkMap, id)
|
logger.Error(id, "create socket error!")
|
return
|
}
|
SocketManage[id] = socket
|
|
go Recv(socket)
|
}
|
|
//单独处理 es 主题的情况
|
func DealEsTopic() {
|
for {
|
select {
|
case <-SdkMap["es"]:
|
//logger.Info("es finanl sdk!")
|
}
|
}
|
}
|
|
//动态处理
|
func autoUpdateSdk(sdkflag chan bool) {
|
|
for _ = range sdkflag {
|
logger.Info("test autodelsdk")
|
var oldSdkList []string
|
for key := range SdkMap {
|
oldSdkList = append(oldSdkList, key)
|
}
|
|
newSdkList := util.Sdklist
|
|
// 手动添加的全部加上
|
// for _, sendTopic := range innerSendTopic {
|
// newSdkList = append(newSdkList, sendTopic)
|
// }
|
// for _, recvTopic := range innerRecvTopic {
|
// newSdkList = append(newSdkList, recvTopic)
|
// }
|
|
sdkListUpdate := util.Difference(oldSdkList, newSdkList)
|
logger.Info(sdkListUpdate)
|
|
for key, op := range sdkListUpdate {
|
if op == "add" {
|
createSdkTopicAndServer(key)
|
} else {
|
deleteSdkTopicAndServer(key)
|
}
|
}
|
}
|
}
|
|
//sdk数据 加工器
|
func ToSdkMsg(cid string, caddr string, taskid string, data []byte) protomsg.SdkMessage {
|
var sdkmsg = protomsg.SdkMessage{}
|
sdkmsg.Cid = cid
|
sdkmsg.Caddr = caddr
|
if val, ok := tasktag.TaskLabelMap.Load(taskid); !ok {
|
sdkmsg.Tasklab = nil
|
return sdkmsg
|
} else {
|
sdkmsg.Tasklab = val.(*protomsg.TaskLabel)
|
sdkmsg.Data = data
|
}
|
return sdkmsg
|
}
|
|
// logShouldUntil counts GetSdkSendTopic dispatches since the last dispatch
// log line; it throttles that logging.
var logShouldUntil int
|
|
//sdk数据分发器
|
func GetSdkSendTopic(sdkmsg protomsg.SdkMessage) (sendTopic string) {
|
if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkinfos) {
|
sendTopic = sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Ipcid
|
|
logShouldUntil++
|
if logShouldUntil > 68 {
|
logShouldUntil = 0
|
logger.Info("=========分发的主题=========")
|
logger.Info("分发的主题:", sendTopic, "!Sdktype:", sdkmsg.Tasklab.Sdkinfos[sdkmsg.Tasklab.Index].Sdktype)
|
}
|
} else {
|
sendTopic = "es"
|
}
|
|
return sendTopic
|
}
|
|
func Recv(socket util.SocketContext) {
|
|
tryCount := 0
|
|
var repsdkmsg = protomsg.SdkMessage{}
|
for {
|
select {
|
case <-socket.Context.Done():
|
socket.Sock.Close()
|
|
logger.Info("socket close")
|
return
|
default:
|
if msg, err := socket.Sock.Recv(); err != nil {
|
|
// tryCount++
|
// socket = util.MaybeRestartSocket(socket, &tryCount)
|
|
if socket.UseSHM {
|
if tryCount > util.ShmMaxTryCount {
|
socket.Sock.Close()
|
socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
|
logger.Info("=========SDK RECV SHM ERROR=========")
|
logger.Info("SDK RECV FROM:", socket.URL, " ERROR ", util.ShmMaxTryCount, " TIMES SO RESTART IT")
|
tryCount = 0
|
continue
|
}
|
tryCount++
|
}
|
continue
|
} else {
|
err = proto.Unmarshal(msg, &repsdkmsg)
|
if err != nil {
|
logger.Error("unmarshal error: ", err)
|
continue
|
}
|
repsdkmsg.Tasklab.Index++
|
//调用计算函数, 分发给下一个主题
|
nexttopic := GetSdkSendTopic(repsdkmsg)
|
if "facedetect-sdk-no-track" == nexttopic || "virtual-faceextract-sdk-pull" == nexttopic {
|
logger.Info("nexttopic:", nexttopic)
|
}
|
SdkMap[nexttopic] <- repsdkmsg
|
}
|
}
|
}
|
}
|
|
func Send(sdkid string, socket util.SocketContext, in chan protomsg.SdkMessage) {
|
|
tryCount := 0
|
|
for {
|
select {
|
case <-socket.Context.Done():
|
socket.Sock.Close()
|
|
logger.Info("socket is close")
|
return
|
case v, ok := <-in:
|
if ok {
|
data, err := v.Marshal()
|
if err != nil {
|
logger.Error("proto marshal error ", err)
|
continue
|
}
|
|
if err := socket.Sock.Send(data); err != nil {
|
if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid {
|
logger.Error("failed send:sdkid=", sdkid)
|
}
|
|
// tryCount++
|
// socket = util.MaybeRestartSocket(socket, &tryCount)
|
|
if socket.UseSHM {
|
if tryCount > util.ShmMaxTryCount {
|
socket.Sock.Close()
|
socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
|
logger.Info("=========SDK SEND SHM ERROR=========")
|
logger.Info("SDK SEND TO: ", socket.URL, " ERROR ", util.ShmMaxTryCount, " TIMES SO RESTART IT")
|
tryCount = 0
|
continue
|
}
|
tryCount++
|
}
|
|
continue
|
}
|
if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid {
|
logger.Info(sdkid, " send success: ", len(data))
|
}
|
} else {
|
logger.Debug(sdkid, " 主题关闭, 关闭send()")
|
return
|
}
|
}
|
}
|
}
|