// Package sdk keeps one in-memory channel per topic (SdkMap) and moves
// protomsg.SdkMessage values between those channels and deliver sockets:
// Send drains a topic channel into a socket, Recv reads a socket and forwards
// each message to the channel of the next topic.
package sdk

import (
	"context"
	"errors"
	"fmt"

	// "github.com/long/test/httpclient"
	"github.com/long/test/tasktag"
	"github.com/long/test/util"

	"github.com/gogo/protobuf/proto"

	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/deliver.git"
)

const (
	// postPull is the suffix of the ipc endpoint whose socket is fed by Send.
	postPull = "_1.ipc"
	// postPush is the suffix of the ipc endpoint whose socket is read by Recv.
	postPush = "_2.ipc"

	// esTopic is the topic a message is routed to once its task label has no
	// further SDK entry (see SdkSendTopic).
	esTopic = "es"
	// webPullTopic is the hand-registered topic served by Createwebserver.
	webPullTopic = "virtual-faceextract-sdk-pull"
)

// SocketManage maps an sdk id to the socket context registered for it.
//
// NOTE(review): this is a plain map that is touched from several goroutines in
// this file without a lock — confirm whether callers serialize access.
var SocketManage = make(map[string]SocketContext)

// SdkMap maps a topic name to the channel carrying messages for that topic.
//
// NOTE(review): plain map, read by Recv goroutines and written by
// AutoDelSdk without a lock — confirm whether that is acceptable.
var SdkMap = make(map[string]chan protomsg.SdkMessage)

// SocketContext bundles a deliver socket with the context that controls the
// goroutine servicing it.
type SocketContext struct {
	Sock    deliver.Deliver
	Context context.Context
	Cancel  context.CancelFunc
}

// Init creates the channel and sockets for every id in util.Sdklist, registers
// the two built-in topics, and starts the Dealextern and AutoDelSdk goroutines.
func Init() {
	fmt.Println("============= init sdk info =====================")

	for _, sdkid := range util.Sdklist {
		// Create the topic channel and sockets for this sdk.
		CreatesdkTopicandServer(sdkid)
		fmt.Println()
	}

	// Topics registered by hand rather than taken from util.Sdklist.
	SdkMap[esTopic] = make(chan protomsg.SdkMessage)
	fmt.Println("create es channel: ")

	SdkMap[webPullTopic] = make(chan protomsg.SdkMessage)
	fmt.Println("create virtual-faceextract-sdk-pull")
	Createwebserver(webPullTopic)

	go Dealextern()
	go AutoDelSdk(util.Sdkflag)
}

// CreatesdkTopicandServer registers a channel for sdkid in SdkMap, opens the
// two ipc sockets for it, and starts one Send and one Recv goroutine.
//
// If either socket cannot be created the topic is removed again and nothing is
// left running. Errors are reported on stdout only.
func CreatesdkTopicandServer(sdkid string) {
	topic := make(chan protomsg.SdkMessage)
	SdkMap[sdkid] = topic
	fmt.Println("create sdk channel: ", sdkid)

	url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull)
	socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
	if err != nil {
		delete(SdkMap, sdkid)
		fmt.Println(sdkid, "create server error!")
		return
	}

	url = fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPush)
	socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
	if err != nil {
		// Undo the first half of the setup: release its context and drop the
		// registration that NewSdkSocketListen made for it.
		socketser.Cancel()
		delete(SocketManage, sdkid)
		delete(SdkMap, sdkid)
		fmt.Println(sdkid, "create dial error!")
		return
	}

	// Both sockets are registered under the same key, so the second call
	// replaced the first entry. Store a cancel func that stops both, otherwise
	// the first context could never be cancelled.
	registered := socketdial
	registered.Cancel = func() {
		socketser.Cancel()
		socketdial.Cancel()
	}
	SocketManage[sdkid] = registered

	// Start the workers only after both sockets exist so a failure above
	// cannot leave a Send goroutine behind.
	go Send(sdkid, socketser, topic)
	go Recv(socketdial)
}

// Createwebserver opens the ipc socket for the web receiving side and starts a
// Send goroutine that drains SdkMap[webid] into it.
func Createwebserver(webid string) {
	url := fmt.Sprintf("ipc:///tmp/%s%s", webid, postPull)
	socketser, err := NewSdkSocketListen(deliver.PushPull, webid, url)
	if err != nil {
		fmt.Println(webid, "create server error!")
		return
	}
	go Send(webid, socketser, SdkMap[webid])
}

// DeletesdkTopicandServer closes and removes the topic channel for sdkid and
// cancels and removes its socket context. Entries that do not exist are
// skipped instead of panicking.
func DeletesdkTopicandServer(sdkid string) {
	if topic, ok := SdkMap[sdkid]; ok {
		// Unregister first so no new lookup can find a closed channel.
		delete(SdkMap, sdkid)
		close(topic)
	}
	fmt.Println("删除主题 sdk: ", sdkid)

	if sock, ok := SocketManage[sdkid]; ok {
		if sock.Cancel != nil {
			sock.Cancel()
		}
		delete(SocketManage, sdkid)
	}
	fmt.Println("删除server sdk: ", sdkid)
}

// Dealextern handles the "es" topic on its own: it prints every message
// received on that channel and returns when the channel is closed.
func Dealextern() {
	for esmsg := range SdkMap[esTopic] {
		fmt.Println(esmsg)
	}
}

// AutoDelSdk reconciles the running topics with util.Sdklist every time a value
// arrives on sdkflag: topics reported as "add" by util.Difference are created,
// all others in the result are deleted. It returns when sdkflag is closed.
func AutoDelSdk(sdkflag chan bool) {
	for range sdkflag {
		fmt.Println("test autodelsdk")

		var oldSdk []string
		for key := range SdkMap {
			oldSdk = append(oldSdk, key)
		}

		// The hand-registered topics must stay in the wanted list so they are
		// never reported for deletion. Add them once only; appending on every
		// signal would grow the list without bound.
		util.Sdklist = appendIfMissing(util.Sdklist, esTopic)
		util.Sdklist = appendIfMissing(util.Sdklist, webPullTopic)

		sdkChanDel := util.Difference(oldSdk, util.Sdklist)
		fmt.Println(sdkChanDel)

		for key, op := range sdkChanDel {
			if op == "add" {
				CreatesdkTopicandServer(key)
				continue
			}
			DeletesdkTopicandServer(key)
			fmt.Println("删除主题 sdk: ", key)
		}
	}
}

// appendIfMissing returns list with item appended unless it is already present.
func appendIfMissing(list []string, item string) []string {
	for _, have := range list {
		if have == item {
			return list
		}
	}
	return append(list, item)
}

// SdkData builds the message for one piece of sdk data.
//
// Cid and Caddr are always set. Tasklab and Data are filled in only when
// tasktag.TaskMapLab holds a *protomsg.TaskLabel for taskid; otherwise the
// returned message has a nil Tasklab and no Data.
func SdkData(cid string, caddr string, taskid string, data []byte) protomsg.SdkMessage {
	sdkmsg := protomsg.SdkMessage{Cid: cid, Caddr: caddr}

	val, found := tasktag.TaskMapLab.Load(taskid)
	if !found {
		return sdkmsg
	}
	label, isLabel := val.(*protomsg.TaskLabel)
	if !isLabel {
		return sdkmsg
	}

	sdkmsg.Tasklab = label
	sdkmsg.Data = data
	return sdkmsg
}

// SdkSendTopic picks the topic a message should be dispatched to: the Sdkid at
// position Tasklab.Index of Tasklab.Sdkinfos, or "es" once the index has moved
// past the last entry.
//
// sdkmsg.Tasklab must be non-nil; the function dereferences it.
func SdkSendTopic(sdkmsg protomsg.SdkMessage) (sdksend string) {
	label := sdkmsg.Tasklab
	pos := int(label.Index)
	total := len(label.Sdkinfos)

	if pos < total {
		sdksend = label.Sdkinfos[pos].Sdkid
	} else {
		sdksend = esTopic
	}

	fmt.Printf("分发的主题是:%s 位置 %d/%d\n ", sdksend, pos+1, total)
	return sdksend
}

// NewSdkSocketListen creates a server socket on url with the given mode and, on
// success, registers it in SocketManage under sdkid.
//
// On failure the returned context is already cancelled and nothing is
// registered.
func NewSdkSocketListen(mode int, sdkid string, url string) (socket SocketContext, err error) {
	fmt.Println("url is: ", url)

	ctx, cancel := context.WithCancel(context.Background())
	socket.Context = ctx
	socket.Cancel = cancel

	socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
	if socket.Sock == nil {
		cancel() // nothing will ever use this context; release it
		return socket, errors.New("create listen error")
	}

	SocketManage[sdkid] = socket
	return socket, nil
}

// NewSdkSocketDial creates a client socket for url with the given mode and, on
// success, registers it in SocketManage under sdkid. The sdkid is echoed back
// as the first result.
//
// On failure the returned context is already cancelled and nothing is
// registered.
func NewSdkSocketDial(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) {
	fmt.Println("url is: ", url)

	ctx, cancel := context.WithCancel(context.Background())
	socket.Context = ctx
	socket.Cancel = cancel

	socket.Sock = deliver.NewClient(deliver.Mode(mode), url)
	if socket.Sock == nil {
		cancel() // nothing will ever use this context; release it
		return sdkid, socket, errors.New("create dial error")
	}

	SocketManage[sdkid] = socket
	return sdkid, socket, nil
}

// Recv reads messages from socket until its context is cancelled. Each message
// is decoded, its Tasklab.Index is advanced by one, and the message is
// forwarded to the channel of the topic chosen by SdkSendTopic.
//
// Messages that fail to decode, carry no task label, or name a topic with no
// channel in SdkMap are logged and dropped.
func Recv(socket SocketContext) {
	for {
		select {
		case <-socket.Context.Done():
			fmt.Println("socket close")
			return
		default:
		}

		msg, err := socket.Sock.Recv()
		if err != nil {
			// NOTE(review): presumably Recv blocks or times out; if it can
			// fail immediately and permanently this loop spins — confirm.
			continue
		}

		var repsdkmsg protomsg.SdkMessage
		err = proto.Unmarshal(msg, &repsdkmsg)
		fmt.Println("receive len: ", len(msg))
		if err != nil {
			fmt.Println("unmarshal error: ", err)
			continue
		}
		if repsdkmsg.Tasklab == nil {
			fmt.Println("receive message without task label, dropped")
			continue
		}

		// Advance to the next stage and work out which topic handles it.
		repsdkmsg.Tasklab.Index++
		nexttopic := SdkSendTopic(repsdkmsg)

		out, ok := SdkMap[nexttopic]
		if !ok {
			// Sending on the nil channel a missing key yields would block
			// this goroutine forever.
			fmt.Println("no channel for topic, message dropped: ", nexttopic)
			continue
		}

		select {
		case out <- repsdkmsg:
		case <-socket.Context.Done():
			fmt.Println("socket close")
			return
		}
	}
}

// Send marshals every message received on in and writes it to socket. It
// returns when the socket's context is cancelled or when in is closed.
// Messages that fail to marshal or send are logged and skipped.
func Send(sdkid string, socket SocketContext, in chan protomsg.SdkMessage) {
	for {
		select {
		case <-socket.Context.Done():
			fmt.Println("socket is close")
			return

		case v, ok := <-in:
			if !ok {
				fmt.Println(sdkid, " 主题关闭, 关闭send()")
				return
			}

			data, err := v.Marshal()
			if err != nil {
				fmt.Println("proto marshal error ", err)
				continue
			}

			fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(data))
			fmt.Println()

			if err := socket.Sock.Send(data); err != nil {
				fmt.Println(socket.Sock)
				fmt.Println("failed send")
				continue
			}

			fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(data))
		}
	}
}