package work

import (
	"context"
	"time"

	"analysis/logo"
	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/deliver.git"
	"github.com/gogo/protobuf/proto"
)

// Reciever reads raw payloads from an IPC endpoint, decodes each one as a
// protomsg.SdkMessage and forwards the result on chMsg.
type Reciever struct {
	// ctx is not read by any code in this file; it is kept because other
	// files of the package may reference it.
	ctx context.Context
	// ipcURL is the endpoint handed to the deliver constructors.
	ipcURL string
	// chMsg receives every successfully decoded message.
	chMsg chan MsgRS
	// shm selects the deliver.Shm transport (with reconnect on receive
	// errors) when true.
	shm bool
}

// NewReciever returns a Reciever that reads from url, publishes decoded
// messages on chMsg, and uses the shared-memory transport when shm is true.
func NewReciever(url string, chMsg chan MsgRS, shm bool) *Reciever {
	return &Reciever{
		ipcURL: url,
		chMsg:  chMsg,
		shm:    shm,
	}
}

// unserilizeProto decodes payloads arriving on data into protomsg.SdkMessage
// values and forwards them on r.chMsg. It returns as soon as ctx is
// cancelled, whether it is waiting for input or for the consumer of r.chMsg.
func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) {
	// Payloads shorter than this are dropped without being decoded.
	// NOTE(review): presumably too small to be a valid SdkMessage - confirm.
	const minPayloadLen = 100

	for {
		select {
		case <-ctx.Done():
			return
		case d := <-data:
			if len(d) < minPayloadLen {
				continue
			}

			msg := protomsg.SdkMessage{}
			if err := proto.Unmarshal(d, &msg); err != nil {
				logo.Errorln(err, " msg 处理异常")
				continue
			}

			// Never block forever on a stalled consumer: give up on
			// cancellation so this goroutine cannot leak.
			select {
			case r.chMsg <- MsgRS{Msg: msg}:
			case <-ctx.Done():
				return
			}
		}
	}
}

// connectShm keeps trying to create a deliver.Shm client for r.ipcURL, waiting
// one second between attempts. Every failed attempt is logged with failLabel
// as the message prefix. It returns the client and true on success, or nil and
// false if ctx is cancelled while waiting to retry.
func (r *Reciever) connectShm(ctx context.Context, failLabel string) (deliver.Deliver, bool) {
	for {
		c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
		if err == nil {
			return c, true
		}
		logo.Infoln(failLabel, err)

		select {
		case <-ctx.Done():
			return nil, false
		case <-time.After(time.Second):
		}
	}
}

// run starts the decoder goroutine and then receives payloads from i,
// handing each one to the decoder, until ctx is cancelled. The current
// connection is closed before returning.
//
// In shared-memory mode a receive error closes the connection and a new
// deliver.Shm client is created in its place; in the other mode receive
// errors are ignored and the receive is retried.
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
	dataChan := make(chan []byte)
	go r.unserilizeProto(ctx, dataChan)

	for {
		select {
		case <-ctx.Done():
			i.Close()
			return
		default:
		}

		d, err := i.Recv()
		if err != nil {
			if !r.shm {
				// Deliberately silent: the original code had logging for
				// this case disabled.
				// NOTE(review): presumably Recv reports routine timeouts
				// here - confirm against the deliver package.
				continue
			}

			i.Close()
			logo.Infoln("ANALYSIS RECV ERROR: ", err)

			c, ok := r.connectShm(ctx, "ANALYSIS CREATE FAILED : ")
			if !ok {
				// Cancelled while reconnecting; the old connection has
				// already been closed above.
				return
			}
			i = c
			logo.Infoln("ANALYSIS CREATE SHM")
			continue
		}

		if r.shm {
			if d == nil {
				continue
			}
			logo.Infoln("~~~~~~shm recv image:", len(d))
		} else {
			logo.Infoln("~~~~~~mangos recv image:", len(d))
		}

		// The decoder exits on cancellation, so a bare send could block
		// forever and skip the Close below.
		select {
		case dataChan <- d:
		case <-ctx.Done():
			i.Close()
			return
		}
	}
}

// Run runs an IPC client until ctx is cancelled. It uses the shared-memory
// transport when the Reciever was created with shm set, and a client built
// with deliver.NewClient otherwise.
func (r *Reciever) Run(ctx context.Context) {
	if r.shm {
		r.runShm(ctx)
		return
	}
	// NOTE(review): mode is not declared in this file; presumably a
	// package-level value defined elsewhere - confirm.
	r.run(ctx, deliver.NewClient(mode, r.ipcURL))
}

// runShm creates a shared-memory client, retrying until it succeeds or ctx
// is cancelled, and then runs the receive loop on it.
func (r *Reciever) runShm(ctx context.Context) {
	c, ok := r.connectShm(ctx, "CLIENT CREATE FAILED : ")
	if !ok {
		return
	}
	r.run(ctx, c)
}

///////////////////////////////////////////////////
///////// test
///////////////////////////////////////////////////

// Run2 runs the receive loop over a deliver.PushPull endpoint, acting as the
// server when server is true and as a client otherwise.
func (r *Reciever) Run2(ctx context.Context, server bool) {
	if server {
		r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
		return
	}
	r.run(ctx, deliver.NewClient(deliver.PushPull, r.ipcURL))
}