视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-08 ce15b43db3e60acc65ddd25de253b8577c2693aa
analysis/main.go
@@ -2,26 +2,39 @@
import (
   "analysis/demo"
   "analysis/valib/ipc"
   srv "analysis/work/service"
   "bytes"
   "context"
   "encoding/gob"
   "encoding/json"
   "flag"
   "fmt"
   "strconv"
   "time"
)
var (
   streamURL string
   picFolder string
   asServer bool
   ipcURL    string
   streamURL string
   ipcServer bool
   procType  string
   procCount int
)
func init() {
   flag.StringVar(&streamURL, "i", "rtsp://192.168.1.203:8554/16.mkv", "input url")
   flag.StringVar(&picFolder, "f", ".", "test pic folder")
   flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/pic.ipc", "ipc label")
   flag.StringVar(&proc, "proc", "", "proc name")
   flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/piipc", "ipc label")
   flag.BoolVar(&asServer, "server", false, "run ipc as server")
   flag.BoolVar(&ipcServer, "server", false, "run ipc as server")
   flag.IntVar(&procCount, "c", 1, "proc run count")
}
const (
@@ -29,21 +42,96 @@
   reciever = "rcv"
)
//   每一个任务标签
type TaskLabel struct {
   Taskid string
   Sdkids []string
   Index  int
}
//每一条加工后的数据流
type SdkMessage struct {
   Cid     string
   Tasklab TaskLabel
   Data    []byte
}
type ChSdkMsg struct {
   sdkMsg SdkMessage
   ch     chan<- srv.ImageInfo
}
var mapSdkMsg = make(map[string]ChSdkMsg)
func recvSdkMsgInfoFromIPC(ctx context.Context, url string) {
   ipc := ipc.NewClient(ctx, url)
   fmt.Println("ipc address: ", url)
   for {
      msg := ipc.Recv()
      if msg != nil {
         var m SdkMessage
         if err := json.Unmarshal(msg, &m); err == nil {
            if v, ok := mapSdkMsg[m.Cid]; !ok {
               imageChan := make(chan srv.ImageInfo)
               mapSdkMsg[m.Cid] = ChSdkMsg{m, imageChan}
               y := demo.NewYolo()
               go y.ShowYoloWithName(imageChan, m.Cid)
            } else {
               fmt.Println("recv ipc : ", url, " cameraid : ", v.sdkMsg.Cid)
               var buf bytes.Buffer
               buf.Write(v.sdkMsg.Data)
               dec := gob.NewDecoder(&buf)
               var i srv.ImageInfo
               if err := dec.Decode(&i); err != nil {
                  fmt.Println("gob decode CameraImage error", err)
                  continue
               }
               v.ch <- i
            }
         } else {
            fmt.Println(err)
         }
      }
   }
}
func main() {
   flag.Parse()
   fmt.Println("start test, pic folder: ", picFolder)
   imageChan := make(chan srv.ImageInfo)
   d := srv.NewReciever(ipcURL, imageChan)
   tcp := "tcp://192.168.1.124:"
   port := 9000
   ctx, cancel := context.WithCancel(context.Background())
   for i := 0; i < procCount; i++ {
      go recvSdkMsgInfoFromIPC(ctx, tcp+strconv.Itoa(port))
      port++
   }
   for {
      time.Sleep(time.Duration(2) * time.Second)
   }
   cancel()
}
   if asServer {
func oneTest(ctx context.Context) {
   imageChan := make(chan srv.ImageInfo)
   d := srv.NewReciever(ctx, ipcURL, imageChan)
   if ipcServer {
      go d.RunAsServer()
   } else {
      go d.RunAsClient()
   }
   demo.ShowYolo(imageChan)
   y := demo.NewYolo()
   y.ShowYolo(imageChan)
   fakeStartProc()
}