视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-07 f963cd025c1aa88ac8b211e24f46ceb0eb64c418
decoder/work/service/ipcsender.go
@@ -7,6 +7,7 @@
   "decoder/valib/ipc"
   "encoding/gob"
   "fmt"
   "time"
)
// Sender decoder ingo
@@ -23,6 +24,8 @@
// NewSender Sender
func NewSender(cameraID, streamURL, ipcURL string) *Sender {
   ctx, cancel := context.WithCancel(context.Background())
   fmt.Printf("create ipc %s for decode : %s, on camera id %s\n", ipcURL, streamURL, cameraID)
   return &Sender{
      ctxIPC: &ContextIPC{ctx, cancel},
@@ -49,6 +52,7 @@
         }
         b := buf.Bytes()
         data <- b
         fmt.Printf("send %d length data ipc address %s\n", len(b), s.ipcURL)
      }
   }
@@ -58,20 +62,30 @@
   gf := goffmpeg.New()
   if gf != nil {
      s.ffmpeg = gf
   } else {
      fmt.Println("create decoder ffmpeg error")
      return
   }
   imageChan := make(chan ImageInfo)
   dataChan := make(chan []byte)
   go s.serializeImageInfo(imageChan, dataChan)
   gf.ActiveDecoder(func(d *[]byte, w, h *int) {
      if *w > 0 && *h > 0 {
         i := ImageInfo{*d, *w, *h, s.cameraID}
         imageChan <- i
      } else {
         fmt.Println("decode jpg error")
   gf.BuildDecoder()
   go func(f *goffmpeg.GoFFMPEG, ch chan<- ImageInfo) {
      for {
         data, wid, hei := f.GetPicDecoder()
         if wid > 0 && hei > 0 {
            img := ImageInfo{Data: data, Width: wid, Height: hei}
            ch <- img
         } else {
            time.Sleep(time.Millisecond * time.Duration(10))
         }
      }
   })
   }(gf, imageChan)
   gf.Run(s.streamURL)
   i.SendFromChannel(dataChan)
@@ -79,13 +93,17 @@
// RunAsServer run a IPC server
func (s *Sender) RunAsServer() {
   s.run(ipc.NewServer(s.ctxIPC.ctx, s.ipcURL))
   i := ipc.NewServer(s.ctxIPC.ctx, s.ipcURL)
   fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID)
   s.run(i)
}
// RunAsClient run as a IPC client
func (s *Sender) RunAsClient() {
   s.run(ipc.NewClient(s.ctxIPC.ctx, s.ipcURL))
   i := ipc.NewClient(s.ctxIPC.ctx, s.ipcURL)
   fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID)
   s.run(i)
}
// Stop stop run decoder, must run in goroutine