package sdk import ( "context" "errors" "fmt" // "github.com/long/test/httpclient" "github.com/long/test/tasktag" "github.com/long/test/util" "github.com/golang/protobuf/proto" "basic.com/pubsub/protomsg.git" "basic.com/valib/deliver.git" ) const ( postPull = "_1.ipc" postPush = "_2.ipc" ) var SocketManage = make(map[string]SocketContext) var SdkMap = make(map[string]chan *protomsg.SdkMessage) type SocketContext struct { Sock deliver.Deliver Context context.Context Cancel context.CancelFunc } func Init() { fmt.Println("============= init sdk info =====================") for _, sdkid := range util.Sdklist { // 创建sdk server CreatesdkTopicandServer(sdkid) fmt.Println() } SdkMap["es"] = make(chan *protomsg.SdkMessage) fmt.Println("create es channel: ") go es(SdkMap["es"]) go AutoDelSdk(util.Sdkflag) } func CreatesdkTopicandServer(sdkid string) { SdkMap[sdkid] = make(chan *protomsg.SdkMessage) fmt.Println("create sdk channel: ", sdkid) url := fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPull) sdkidser, socketser, err := NewSdkSocketListen(deliver.PushPull, sdkid, url) if err != nil { delete(SdkMap, sdkid) fmt.Println(sdkid, "create server error!") return } go Send(sdkidser, socketser, SdkMap[sdkid]) url = fmt.Sprintf("ipc:///tmp/%s%s", sdkid, postPush) _, socketdial, err := NewSdkSocketListen(deliver.PushPull, sdkid, url) if err != nil { delete(SdkMap, sdkid) fmt.Println(sdkid, "create dial error!") return } go Recv(socketdial) } func DeletesdkTopicandServer(sdkid string) { close(SdkMap[sdkid]) delete(SdkMap, sdkid) fmt.Println("删除主题 sdk: ", sdkid) SocketManage[sdkid].Cancel() delete(SocketManage, sdkid) fmt.Println("删除server sdk: ", sdkid) } //单独处理 es 主题的情况 func es(sdkmsgchan chan *protomsg.SdkMessage) { for _ = range sdkmsgchan { fmt.Println("this data is finish all sdk! ") } } //动态处理 func AutoDelSdk(sdkflag chan bool) { for _ = range sdkflag { fmt.Println("test autodelsdk") var oldSdk []string for key, _ := range SdkMap { oldSdk = append(oldSdk, key) } util.Sdklist = append(util.Sdklist, "es") sdkChanDel := util.Difference(oldSdk, util.Sdklist) fmt.Println(sdkChanDel) for key, op := range sdkChanDel { if op == "add" { CreatesdkTopicandServer(key) } else { DeletesdkTopicandServer(key) fmt.Println("删除主题 sdk: ", key) } } } } //主题 //sdk数据 加工器 func SdkData(cid string, taskid string, data []byte) *protomsg.SdkMessage { var sdkmsg = &protomsg.SdkMessage{} sdkmsg.Cid = cid //if _, ok := tasktag.TaskMapLab[taskid]; !ok { // sdkmsg.Tasklab = nil // return sdkmsg //} if val, ok := tasktag.TaskMapLab.Load(taskid); ok { sdkmsg.Tasklab = nil return sdkmsg } else { sdkmsg.Tasklab = val.(*protomsg.TaskLabel) sdkmsg.Data = data } //sdkmsg.Tasklab = tasktag.TaskMapLab[taskid] return sdkmsg } //sdk数据分发器 func SdkSendTopic(sdkmsg *protomsg.SdkMessage) (sdksend string) { if int(sdkmsg.Tasklab.Index) < len(sdkmsg.Tasklab.Sdkids) { sdksend = sdkmsg.Tasklab.Sdkids[sdkmsg.Tasklab.Index] } else { sdksend = "es" } fmt.Println("分发的主题是: ", sdksend) return } // create server func NewSdkSocketListen(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) { fmt.Println("url is: ", url) ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx socket.Cancel = cancel socket.Sock = deliver.NewServer(deliver.Mode(mode), url) if socket.Sock == nil { return sdkid, socket, errors.New("create listen error") } SocketManage[sdkid] = socket return sdkid, socket, nil } func NewSdkSocketDial(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) { fmt.Println("url is: ", url) ctx, cancel := context.WithCancel(context.Background()) socket.Context = ctx socket.Cancel = cancel socket.Sock = deliver.NewClient(deliver.Mode(mode), url) if socket.Sock == nil { return sdkid, socket, errors.New("create listen error") } SocketManage[sdkid] = socket return sdkid, socket, nil } func Recv(socket SocketContext) { var repsdkmsg = &protomsg.SdkMessage{} for { select { case <-socket.Context.Done(): fmt.Println("socket close") return default: if msg, err := socket.Sock.Recv(); err != nil { //fmt.Printf("%s ", err) continue } else { err = proto.Unmarshal(msg, repsdkmsg) fmt.Println("receive len: ", len(msg)) if err != nil { fmt.Println("unmarshal error: ", err) continue } //调用计算函数, 分发给下一个主题 nexttopic := SdkSendTopic(repsdkmsg) SdkMap[nexttopic] <- repsdkmsg } } } } func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) { var v *protomsg.SdkMessage var ok bool for { select { case <-socket.Context.Done(): fmt.Println("socket is close") return case v, ok = <-in: if ok { data, err := proto.Marshal(v) if err != nil { fmt.Println("proto marshal error ", err) continue } fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(data)) fmt.Println() fmt.Println("send len of data: ", len(data)) if err := socket.Sock.Send(data); err != nil { fmt.Println(socket.Sock) fmt.Println("failed send") continue } fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(data)) } else { fmt.Println(sdkid, " 主题关闭, 关闭send()") return } } } }