From 400044583627cc72a1a3071f1a389e18953cbba0 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 一月 2020 09:42:30 +0800
Subject: [PATCH] update
---
/dev/null | 128 -------------------------
common/helper.go | 131 --------------------------
2 files changed, 0 insertions(+), 259 deletions(-)
diff --git a/common/flow.go b/common/flow.go
deleted file mode 100644
index 4dfb11c..0000000
--- a/common/flow.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package common
-
-import (
- "context"
- "time"
-
- "basic.com/pubsub/protomsg.git"
-
- "basic.com/libgowrapper/sdkstruct.git"
-)
-
-/////////////////////////////////////////////////////////////////
-
-// FlowCreate create flow
-func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int, fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) {
-
- const (
- postPull = `_1`
- postPush = `_2`
- )
- ipcRcv := GetIpcAddress(shm, id+postPull)
- ipcSnd := GetIpcAddress(shm, id+postPush)
- chRcv := make(chan []byte, 3)
- chSnd := make(chan sdkstruct.MsgSDK, 3)
-
- rcver := NewReciever(ipcRcv, chRcv, shm, fn)
- snder := NewSender(ipcSnd, chSnd, shm, fn)
- torule := NewToRule(ipc2Rule, ruleCacheSize, fn)
-
- snder.ApplyCallbackFunc(torule.Push)
-
- go rcver.Run(ctx)
- go snder.Run(ctx)
- go torule.Run(ctx)
-
- return chRcv, chSnd
-}
-
-// WorkFlowSimple work
-func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string,
- fnConsume func() []interface{}, fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
- fn func(...interface{})) {
-
- tm := time.Now()
- sc := 0
-
- for {
- select {
- case <-ctx.Done():
- return
- default:
-
- elems := fnConsume()
- if elems == nil || len(elems) == 0 {
- time.Sleep(10 * time.Millisecond)
- continue
- }
-
- var msgs []protomsg.SdkMessage
- for _, v := range elems {
- msgs = append(msgs, v.(protomsg.SdkMessage))
- }
-
- fnRun(msgs, out, typ)
-
- sc++
- if sc == 25 {
- fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
- sc = 0
- tm = time.Now()
- }
- if time.Since(tm) > time.Second {
- fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm))
- sc = 0
- tm = time.Now()
- }
- }
- }
-}
-
-// FlowSimple wrap
-func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
- fnProduce func(interface{}), fnConsume func() []interface{},
- fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
- fnClose func(), fn func(...interface{})) {
-
- cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) {
- fnRun(msgs[0], ch, typ)
- }
-
- FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn)
-
-}
-
-// FlowBatch batch
-func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
- fnProduce func(interface{}), fnConsume func() []interface{},
- fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
- fnClose func(), fn func(...interface{})) {
-
- chMsg := make(chan protomsg.SdkMessage, 3)
- go UnserilizeProto(ctx, in, chMsg, fn)
-
- go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn)
-
- for {
- select {
- case <-ctx.Done():
- fnClose()
- return
- case rMsg := <-chMsg:
- if !ValidRemoteMessage(rMsg, typ, fn) {
- fn(typ, " validremotemessage invalid")
- EjectResult(nil, rMsg, out)
- continue
- }
- fnProduce(rMsg)
-
- default:
- time.Sleep(10 * time.Millisecond)
- }
- }
-
-}
diff --git a/common/helper.go b/common/helper.go
index 0e016c3..dc6f7b0 100644
--- a/common/helper.go
+++ b/common/helper.go
@@ -1,28 +1,11 @@
package common
import (
- "context"
"encoding/json"
"fmt"
"io/ioutil"
"strconv"
- "time"
-
- "basic.com/libgowrapper/sdkstruct.git"
- "basic.com/pubsub/protomsg.git"
- "basic.com/valib/deliver.git"
- "github.com/gogo/protobuf/proto"
)
-
-const mode = deliver.PushPull
-
-// GetIpcAddress get ipc
-func GetIpcAddress(shm bool, id string) string {
- if shm {
- return id
- }
- return `ipc:///tmp/` + id + `.ipc`
-}
// SubConfig sub
type SubConfig struct {
@@ -56,119 +39,5 @@
// Atoi atoi
func Atoi(s string) int {
i, _ := strconv.Atoi(s)
- return i
-}
-
-// UnserilizeProto un
-func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
- for {
- select {
- case <-ctx.Done():
- return
- case d := <-data:
- if len(d) < 100 {
- continue
- }
- msg := protomsg.SdkMessage{}
- if err := proto.Unmarshal(d, &msg); err != nil {
- fn(err, " msg 澶勭悊寮傚父")
- continue
- }
-
- out <- msg
-
- default:
- time.Sleep(10 * time.Millisecond)
- }
- }
-}
-
-// Msg2MsgSDK msg->msgsdk
-func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
-
- d, err := proto.Marshal(&msg)
- if err != nil {
- return nil
- }
-
- index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
- if index >= count {
- return &sdkstruct.MsgSDK{
- MsgData: d,
- SdkCount: count,
- SdkIndex: index,
- SdkDataLen: 0,
- }
- }
-
- return &sdkstruct.MsgSDK{
- MsgData: d,
- SdkCount: count,
- SdkIndex: index,
- SdkDataLen: len(d),
- }
-}
-
-// EjectResult eject
-func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
-
- if res == nil {
- s := Msg2MsgSDK(msg)
- if s == nil {
- return
- }
- out <- *s
- return
- }
-
- msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
-
- s := Msg2MsgSDK(msg)
- if s == nil {
- return
- }
- out <- *s
-}
-
-/////////////////////////////////////////////////////////////
-
-// ValidRemoteMessage valid or not
-func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
- if msg.Tasklab == nil {
- fn(fnName, " recieve msg nil")
- return false
- }
-
- sdkLen := len(msg.Tasklab.Sdkinfos)
- if sdkLen == 0 {
- fn(fnName, " has no sdk info")
- return false
- }
-
- curIndex := int(msg.Tasklab.Index)
- if curIndex < 0 || curIndex >= sdkLen {
- fn(fnName, " tasklab index ", curIndex, " error")
- return false
- }
- if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
- fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
- return false
- }
- return true
-}
-
-// UnpackImage unpack
-func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
- // 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬
- i := &protomsg.Image{}
- err := proto.Unmarshal(msg.Data, i)
- if err != nil {
- fn(fnName, " protobuf decode CameraImage error: ", err.Error())
- return nil
- }
- if i.Data == nil {
- fn(fnName, " protomsg.Image data null")
- return nil
- }
return i
}
diff --git a/common/lockList.go b/common/lockList.go
deleted file mode 100644
index f630a6a..0000000
--- a/common/lockList.go
+++ /dev/null
@@ -1,89 +0,0 @@
-package common
-
-import (
- "container/list"
- "sync"
-)
-
-// LockList list
-type LockList struct {
- cache *list.List
- cv *sync.Cond
- cond bool
- size int
-}
-
-// NewLockList new
-func NewLockList(size int) *LockList {
- return &LockList{
- cache: list.New(),
- cv: sync.NewCond(&sync.Mutex{}),
- cond: false,
- size: size,
- }
-}
-
-// Push push
-func (l *LockList) Push(v interface{}) {
- l.cv.L.Lock()
- l.cache.PushBack(v)
-
- for l.cache.Len() > l.size {
- l.cache.Remove(l.cache.Front())
- }
-
- l.cond = true
- l.cv.Signal()
- l.cv.L.Unlock()
-}
-
-// Pop pop
-func (l *LockList) Pop() []interface{} {
-
- var batch []interface{}
-
- l.cv.L.Lock()
-
- for !l.cond {
- l.cv.Wait()
- }
-
- elem := l.cache.Front()
- if elem != nil {
- batch = append(batch, elem.Value)
- l.cache.Remove(l.cache.Front())
- }
-
- l.cond = false
- l.cv.L.Unlock()
-
- return batch
-}
-
-// Drain drain all element
-func (l *LockList) Drain() []interface{} {
-
- var batch []interface{}
-
- l.cv.L.Lock()
-
- for !l.cond {
- l.cv.Wait()
- }
-
- for {
-
- elem := l.cache.Front()
- if elem == nil {
- break
- }
-
- batch = append(batch, elem.Value)
- l.cache.Remove(l.cache.Front())
- }
-
- l.cond = false
- l.cv.L.Unlock()
-
- return batch
-}
diff --git a/common/recv.go b/common/recv.go
deleted file mode 100644
index fb31433..0000000
--- a/common/recv.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package common
-
-import (
- "context"
-
- "time"
-
- "basic.com/valib/deliver.git"
-)
-
-// Reciever recv from ipc
-type Reciever struct {
- ctx context.Context
- ipcURL string
- out chan<- []byte
-
- shm bool
- fnLogger func(...interface{})
-}
-
-// NewReciever new recv
-func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
- return &Reciever{
- ipcURL: url,
- out: out,
- shm: shm,
- fnLogger: fn,
- }
-}
-
-// Run run a IPC client
-func (r *Reciever) Run(ctx context.Context) {
-
- if r.shm {
- r.runShm(ctx)
- } else {
- r.run(ctx, deliver.NewClient(mode, r.ipcURL))
- }
-}
-
-func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
-
- // t := time.Now()
- // sc := 0
-
- count := 0
-
- for {
- select {
- case <-ctx.Done():
- i.Close()
- return
- default:
-
- if r.shm {
- if d, err := i.Recv(); err != nil {
- i.Close()
- r.fnLogger("ANALYSIS RECV ERROR: ", err)
-
- c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
- for {
- if err == nil {
- break
- }
- time.Sleep(time.Second)
- c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
- r.fnLogger("ANALYSIS CREATE FAILED : ", err)
- }
- i = c
- r.fnLogger("ANALYSIS CREATE SHM")
- } else {
- if d != nil {
- count++
- if count > 10 {
- count = 0
- r.fnLogger("~~~shm recv image:", len(d))
- }
- r.out <- d
- }
- }
- } else {
- if msg, err := i.Recv(); err != nil {
- // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
- } else {
- count++
- if count > 10 {
- count = 0
- r.fnLogger("~~~mangos recv image:", len(msg))
- }
- r.out <- msg
- }
- }
-
- // sc++
- // if sc == 25 {
- // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
- // sc = 0
- // t = time.Now()
- // }
-
- }
- }
-}
-
-func (r *Reciever) runShm(ctx context.Context) {
- c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
- for {
- if err == nil {
- break
- }
- time.Sleep(1 * time.Second)
- c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
- r.fnLogger("CLIENT CREATE FAILED : ", err)
- }
- r.run(ctx, c)
-}
diff --git a/common/send.go b/common/send.go
deleted file mode 100644
index a0e7cc4..0000000
--- a/common/send.go
+++ /dev/null
@@ -1,141 +0,0 @@
-package common
-
-import (
- "context"
- "time"
-
- "basic.com/libgowrapper/sdkstruct.git"
- "basic.com/valib/deliver.git"
-)
-
-// Sender decoder ingo
-type Sender struct {
- ipcURL string
- chMsg <-chan sdkstruct.MsgSDK
- shm bool
- fn func([]byte, bool)
-
- fnLogger func(...interface{})
-}
-
-// ApplyCallbackFunc cb
-func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) {
-
- if s.fn == nil {
- s.fn = f
- }
-}
-
-// NewSender Sender
-func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
- // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
- return &Sender{
- ipcURL: ipcURL,
- chMsg: chMsg,
- shm: shm,
- fn: nil,
- fnLogger: fn,
- }
-}
-
-// Run run a IPC producer
-func (s *Sender) Run(ctx context.Context) {
-
- if s.shm {
- s.runShm(ctx)
- } else {
- i := deliver.NewClient(mode, s.ipcURL)
- if i == nil {
- s.fnLogger("sender 2 pubsub nng create error")
- return
- }
- s.run(ctx, i)
- }
-}
-
-func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) {
-
- for {
- select {
- case <-ctx.Done():
- s.fnLogger("stop Sender")
- return
- case i := <-s.chMsg:
-
- data <- i.MsgData
-
- if int(i.SdkIndex+1) == i.SdkCount {
- if s.fn != nil {
-
- sFlag := true
- if i.SdkDataLen < 2 {
- sFlag = false
- }
- s.fn(i.MsgData, sFlag)
-
- }
- }
- default:
- time.Sleep(10 * time.Millisecond)
- }
- }
-}
-
-func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
-
- // go ruleserver.TimeTicker()
-
- dataChan := make(chan []byte, 3)
- go s.serializeProto(ctx, dataChan)
-
- for {
- select {
- case <-ctx.Done():
- i.Close()
- return
- case d := <-dataChan:
-
- if s.shm {
- if err := i.Send(d); err != nil {
- i.Close()
- s.fnLogger("ANALYSIS SENDER ERROR: ", err)
-
- c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
- for {
- if err == nil {
- break
- }
- time.Sleep(time.Second)
- c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
- s.fnLogger("CLIENT CREATE FAILED : ", err)
- }
- i = c
- } else {
-
- }
- } else {
- err := i.Send(d)
- if err != nil {
- // logo.Errorln("error sender 2 pubsub: ", err)
- } else {
- s.fnLogger("mangos send to pubsub len: ", len(d))
- }
- }
- default:
- time.Sleep(10 * time.Millisecond)
- }
- }
-}
-
-func (s *Sender) runShm(ctx context.Context) {
- c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
- for {
- if err == nil {
- break
- }
- time.Sleep(1 * time.Second)
- c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
- s.fnLogger("CLIENT CREATE FAILED : ", err)
- }
- s.run(ctx, c)
-}
diff --git a/common/torule.go b/common/torule.go
deleted file mode 100644
index c8be086..0000000
--- a/common/torule.go
+++ /dev/null
@@ -1,128 +0,0 @@
-package common
-
-import (
- "container/list"
- "context"
- "sync"
- "time"
-
- "basic.com/valib/deliver.git"
- // "basic.com/pubsub/protomsg.git"
- // "github.com/gogo/protobuf/proto"
-)
-
-type runResult struct {
- data []byte
- valid bool
-}
-
-// ToRule ipc
-type ToRule struct {
- ipcURL string
- maxSize int
- cache *list.List
- cv *sync.Cond
- cond bool
- fnLogger func(...interface{})
-}
-
-// NewToRule send to ruleprocess
-func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule {
- return &ToRule{
- ipcURL: ipcURL,
- maxSize: maxSize,
- cache: list.New(),
- cv: sync.NewCond(&sync.Mutex{}),
- cond: false,
- fnLogger: fn,
- }
-}
-
-// Push data
-func (t *ToRule) Push(data []byte, valid bool) {
-
- t.cv.L.Lock()
- result := runResult{data, valid}
- t.cache.PushBack(result)
- if t.cache.Len() > t.maxSize {
- for i := 0; i < t.cache.Len(); {
- d := t.cache.Front().Value.(runResult)
- if d.valid == false {
- t.cache.Remove(t.cache.Front())
- i = i + 2
- } else {
- i = i + 1
- }
- }
- }
- if t.cache.Len() > t.maxSize {
- for i := 0; i < t.cache.Len(); {
- t.cache.Remove(t.cache.Front())
- i = i + 2
- }
- }
- // logo.Infof("push to cache count : %d\n", t.cache.Len())
- t.cond = true
- t.cv.Signal()
- t.cv.L.Unlock()
-}
-
-// Run forever
-func (t *ToRule) Run(ctx context.Context) {
-
- var i deliver.Deliver
- var err error
-
- for {
- i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL)
- if err != nil {
- time.Sleep(time.Second)
- t.fnLogger("wait create to rule ipc", err)
- continue
- }
- break
- }
-
- count := 0
-
- for {
- select {
- case <-ctx.Done():
- return
- default:
-
- var d []byte
- t.cv.L.Lock()
-
- for !t.cond {
- t.cv.Wait()
- }
-
- for j := 0; j < 8; j++ {
- if t.cache.Len() <= 0 {
- break
- }
-
- d = t.cache.Front().Value.(runResult).data
- if i != nil && d != nil {
-
- err := i.Send(d)
- if err != nil {
- t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
- } else {
- count++
- if count > 5 {
- count = 0
- t.fnLogger("~~~~~~SEND TO RULE CORRECT")
- }
- }
- }
- t.cache.Remove(t.cache.Front())
- }
-
- t.cond = false
- t.cv.L.Unlock()
-
- }
- }
-}
--
Gitblit v1.8.0