派生自 libgowrapper/face

zhangmeng
2020-01-15 d85f3edab0d8c495cecd7a81f31a9ead1eb001ac
common/recv.go
@@ -5,26 +5,50 @@
   "time"
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/deliver.git"
   "github.com/gogo/protobuf/proto"
)
// Reciever recv from ipc
type Reciever struct {
   ctx    context.Context
   ipcURL string
   out    chan<- []byte
   chMsg  chan<- MsgRS
   shm      bool
   fnLogger func(...interface{})
}
// NewReciever new recv
func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever {
   return &Reciever{
      ipcURL:   url,
      out:      out,
      chMsg:    chMsg,
      shm:      shm,
      fnLogger: fn,
   }
}
func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) {
   for {
      select {
      case <-ctx.Done():
         return
      default:
         d := <-data
         if len(d) < 100 {
            continue
         }
         // logo.Infoln(len(d), "reciver数据")
         msg := protomsg.SdkMessage{}
         if err := proto.Unmarshal(d, &msg); err != nil {
            r.fnLogger(err, " msg 处理异常")
            continue
         }
         outMsg := MsgRS{Msg: msg}
         r.chMsg <- outMsg
      }
   }
}
@@ -40,8 +64,9 @@
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
   // t := time.Now()
   // sc := 0
   dataChan := make(chan []byte, 3)
   go r.unserilizeProto(ctx, dataChan)
   count := 0
@@ -75,7 +100,7 @@
                     count = 0
                     r.fnLogger("~~~shm recv image:", len(d))
                  }
                  r.out <- d
                  dataChan <- d
               }
            }
         } else {
@@ -87,16 +112,9 @@
                  count = 0
                  r.fnLogger("~~~mangos recv image:", len(msg))
               }
               r.out <- msg
               dataChan <- msg
            }
         }
         // sc++
         // if sc == 25 {
         //    logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
         //    sc = 0
         //    t = time.Now()
         // }
      }
   }