| | |
| | | |
| | | "time" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/deliver.git" |
| | | "github.com/gogo/protobuf/proto" |
| | | ) |
| | | |
| | | // Reciever recv from ipc |
| | | type Reciever struct { |
| | | ctx context.Context |
| | | ipcURL string |
| | | out chan<- []byte |
| | | chMsg chan<- MsgRS |
| | | |
| | | shm bool |
| | | fnLogger func(...interface{}) |
| | | } |
| | | |
| | | // NewReciever new recv |
| | | func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { |
| | | func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever { |
| | | return &Reciever{ |
| | | ipcURL: url, |
| | | out: out, |
| | | chMsg: chMsg, |
| | | shm: shm, |
| | | fnLogger: fn, |
| | | } |
| | | } |
| | | |
| | | func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | d := <-data |
| | | if len(d) < 100 { |
| | | continue |
| | | } |
| | | // logo.Infoln(len(d), "reciver数据") |
| | | msg := protomsg.SdkMessage{} |
| | | if err := proto.Unmarshal(d, &msg); err != nil { |
| | | r.fnLogger(err, " msg 处理异常") |
| | | continue |
| | | } |
| | | outMsg := MsgRS{Msg: msg} |
| | | r.chMsg <- outMsg |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { |
| | | |
| | | // t := time.Now() |
| | | // sc := 0 |
| | | dataChan := make(chan []byte, 3) |
| | | |
| | | go r.unserilizeProto(ctx, dataChan) |
| | | |
| | | count := 0 |
| | | |
| | |
| | | count = 0 |
| | | r.fnLogger("~~~shm recv image:", len(d)) |
| | | } |
| | | r.out <- d |
| | | dataChan <- d |
| | | } |
| | | } |
| | | } else { |
| | |
| | | count = 0 |
| | | r.fnLogger("~~~mangos recv image:", len(msg)) |
| | | } |
| | | r.out <- msg |
| | | dataChan <- msg |
| | | } |
| | | } |
| | | |
| | | // sc++ |
| | | // if sc == 25 { |
| | | // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t)) |
| | | // sc = 0 |
| | | // t = time.Now() |
| | | // } |
| | | |
| | | } |
| | | } |