package sdk import ( "context" "errors" "fmt" "github.com/long/test/httpclient" "github.com/long/test/protomsg" "github.com/long/test/tasktag" "github.com/long/test/util" "github.com/golang/protobuf/proto" "github.com/long/test/deliver" ) //var doOnce sync.Once var SocketManage = make(map[string]SocketContext) type SocketContext struct { Sock deliver.Deliver Context context.Context Cancel context.CancelFunc } func Init() { sdklist := SdkAll() //获取所有sdk fmt.Println("sdk list have: ", sdklist) SdkCreateTopic(sdklist) // 创建主题 for _, sdkid := range sdklist { // 创建sdk server url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPort) sdkidser, socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url) if err != nil { continue } UrlPort++ go Send(sdkidser, socketser, SdkMap[sdkid]) url = fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPortR) _, socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url) if err != nil { continue } UrlPortR++ go Recv(socketdial) } go es(SdkMap["es"]) } //单独处理 es 主题的情况 func es(sdkmsgchan chan *protomsg.SdkMessage) { for _ = range sdkmsgchan { fmt.Println("this data is finish all sdk! ") } } //动态处理 func AutoDelSdk(Newsdklist []string) { var oldSdk []string for key, _ := range SdkMap { oldSdk = append(oldSdk, key) } sdkChanDel := util.Difference(oldSdk, Newsdklist) for key, op := range sdkChanDel { if op == "add" { SdkMap[key] = make(chan *protomsg.SdkMessage) fmt.Println("创建主题 sdk: ", key) } else { close(SdkMap[key]) delete(SdkMap, key) fmt.Println("删除主题 sdk: ", key) } } } //主题 var SdkMap = make(map[string]chan *protomsg.SdkMessage) //sdk数据 加工器 func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage { var sdkmsg = &protomsg.SdkMessage{} sdkmsg.Cid = cid if _, ok := tasktag.TaskMapLab[taskid]; !ok { sdkmsg.Tasklab = nil return sdkmsg } sdkmsg.Tasklab = tasktag.TaskMapLab[taskid] sdkmsg.Data = data return sdkmsg } //sdk数据分发器 func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) { if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) { sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index] } else { sdksend = "es" } fmt.Println() fmt.Println("分发的主题是: ", sdksend) fmt.Println() return } // 调用 http 借口获取摄像机信息 func SdkAll() (sdklist []string) { sdklist = httpclient.GetSdk("http://127.0.0.1:8000/data/api-v/sdk/findskdid") return } // 创建主题 func SdkCreateTopic(sdklist []string) (err error) { for _, sdkid := range sdklist { SdkMap[sdkid] = make(chan *protomsg.SdkMessage) fmt.Println("create sdk channel: ", sdkid) } SdkMap["es"] = make(chan *protomsg.SdkMessage) fmt.Println("create es channel: ") return nil } var UrlPort = 9000 var UrlPortR = 9500 // create server func NewSdkSocketListen(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) { fmt.Println("url is: ", url) ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx socket.Cancel = cancel socket.Sock = deliver.NewServer(deliver.Mode(mode), url) fmt.Println(sdkid, socket.Sock) if socket.Sock == nil { return sdkid, socket, errors.New("create listen error") } SocketManage[sdkid] = socket return sdkid, socket, nil } func NewSdkSocketDial(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) { fmt.Println("url is: ", url) ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx socket.Cancel = cancel socket.Sock = deliver.NewClient(deliver.Mode(mode), url) fmt.Println(sdkid, socket.Sock) if socket.Sock == nil { return sdkid, socket, errors.New("create listen error") } SocketManage[sdkid] = socket return sdkid, socket, nil } func Recv(socket SocketContext) { //socket.Sock.SetOption(mangos.OptionRecvDeadline, 1*time.Second) var repsdkmsg = &protomsg.SdkMessage{} for { select { case <-socket.Context.Done(): fmt.Println("socket close") return default: if msg, err := socket.Sock.Recv(); err != nil { //fmt.Printf("%s ", err) continue } else { err = proto.Unmarshal(msg, repsdkmsg) fmt.Println("receive len: ", len(msg)) if err != nil { fmt.Println("unmarshal error: ", err) continue } //调用计算函数, 分发给下一个主题 nexttopic := SdkSendTopic(repsdkmsg) SdkMap[nexttopic] <- repsdkmsg } } } } func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) { var v *protomsg.SdkMessage for { select { case <-socket.Context.Done(): fmt.Println("socket is close") case v = <-in: data, err := proto.Marshal(v) if err != nil { fmt.Println("proto marshal error ", err) continue } fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(data)) fmt.Println() fmt.Println("send len of data: ", len(data)) if err := socket.Sock.Send(data); err != nil { fmt.Println("failed send") continue } fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(data)) } } }