From 400044583627cc72a1a3071f1a389e18953cbba0 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 一月 2020 09:42:30 +0800
Subject: [PATCH] update

---
 /dev/null        |  128 -------------------------
 common/helper.go |  131 --------------------------
 2 files changed, 0 insertions(+), 259 deletions(-)

diff --git a/common/flow.go b/common/flow.go
deleted file mode 100644
index 4dfb11c..0000000
--- a/common/flow.go
+++ /dev/null
@@ -1,124 +0,0 @@
-package common
-
-import (
-	"context"
-	"time"
-
-	"basic.com/pubsub/protomsg.git"
-
-	"basic.com/libgowrapper/sdkstruct.git"
-)
-
-/////////////////////////////////////////////////////////////////
-
-// FlowCreate create flow
-func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int, fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) {
-
-	const (
-		postPull = `_1`
-		postPush = `_2`
-	)
-	ipcRcv := GetIpcAddress(shm, id+postPull)
-	ipcSnd := GetIpcAddress(shm, id+postPush)
-	chRcv := make(chan []byte, 3)
-	chSnd := make(chan sdkstruct.MsgSDK, 3)
-
-	rcver := NewReciever(ipcRcv, chRcv, shm, fn)
-	snder := NewSender(ipcSnd, chSnd, shm, fn)
-	torule := NewToRule(ipc2Rule, ruleCacheSize, fn)
-
-	snder.ApplyCallbackFunc(torule.Push)
-
-	go rcver.Run(ctx)
-	go snder.Run(ctx)
-	go torule.Run(ctx)
-
-	return chRcv, chSnd
-}
-
-// WorkFlowSimple work
-func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string,
-	fnConsume func() []interface{}, fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
-	fn func(...interface{})) {
-
-	tm := time.Now()
-	sc := 0
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-
-			elems := fnConsume()
-			if elems == nil || len(elems) == 0 {
-				time.Sleep(10 * time.Millisecond)
-				continue
-			}
-
-			var msgs []protomsg.SdkMessage
-			for _, v := range elems {
-				msgs = append(msgs, v.(protomsg.SdkMessage))
-			}
-
-			fnRun(msgs, out, typ)
-
-			sc++
-			if sc == 25 {
-				fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
-				sc = 0
-				tm = time.Now()
-			}
-			if time.Since(tm) > time.Second {
-				fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm))
-				sc = 0
-				tm = time.Now()
-			}
-		}
-	}
-}
-
-// FlowSimple wrap
-func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
-	fnProduce func(interface{}), fnConsume func() []interface{},
-	fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
-	fnClose func(), fn func(...interface{})) {
-
-	cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) {
-		fnRun(msgs[0], ch, typ)
-	}
-
-	FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn)
-
-}
-
-// FlowBatch batch
-func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
-	fnProduce func(interface{}), fnConsume func() []interface{},
-	fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
-	fnClose func(), fn func(...interface{})) {
-
-	chMsg := make(chan protomsg.SdkMessage, 3)
-	go UnserilizeProto(ctx, in, chMsg, fn)
-
-	go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn)
-
-	for {
-		select {
-		case <-ctx.Done():
-			fnClose()
-			return
-		case rMsg := <-chMsg:
-			if !ValidRemoteMessage(rMsg, typ, fn) {
-				fn(typ, " validremotemessage invalid")
-				EjectResult(nil, rMsg, out)
-				continue
-			}
-			fnProduce(rMsg)
-
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
-
-}
diff --git a/common/helper.go b/common/helper.go
index 0e016c3..dc6f7b0 100644
--- a/common/helper.go
+++ b/common/helper.go
@@ -1,28 +1,11 @@
 package common
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"strconv"
-	"time"
-
-	"basic.com/libgowrapper/sdkstruct.git"
-	"basic.com/pubsub/protomsg.git"
-	"basic.com/valib/deliver.git"
-	"github.com/gogo/protobuf/proto"
 )
-
-const mode = deliver.PushPull
-
-// GetIpcAddress get ipc
-func GetIpcAddress(shm bool, id string) string {
-	if shm {
-		return id
-	}
-	return `ipc:///tmp/` + id + `.ipc`
-}
 
 // SubConfig sub
 type SubConfig struct {
@@ -56,119 +39,5 @@
 // Atoi atoi
 func Atoi(s string) int {
 	i, _ := strconv.Atoi(s)
-	return i
-}
-
-// UnserilizeProto un
-func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case d := <-data:
-			if len(d) < 100 {
-				continue
-			}
-			msg := protomsg.SdkMessage{}
-			if err := proto.Unmarshal(d, &msg); err != nil {
-				fn(err, " msg 澶勭悊寮傚父")
-				continue
-			}
-
-			out <- msg
-
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
-}
-
-// Msg2MsgSDK msg->msgsdk
-func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
-
-	d, err := proto.Marshal(&msg)
-	if err != nil {
-		return nil
-	}
-
-	index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
-	if index >= count {
-		return &sdkstruct.MsgSDK{
-			MsgData:    d,
-			SdkCount:   count,
-			SdkIndex:   index,
-			SdkDataLen: 0,
-		}
-	}
-
-	return &sdkstruct.MsgSDK{
-		MsgData:    d,
-		SdkCount:   count,
-		SdkIndex:   index,
-		SdkDataLen: len(d),
-	}
-}
-
-// EjectResult eject
-func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
-
-	if res == nil {
-		s := Msg2MsgSDK(msg)
-		if s == nil {
-			return
-		}
-		out <- *s
-		return
-	}
-
-	msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
-
-	s := Msg2MsgSDK(msg)
-	if s == nil {
-		return
-	}
-	out <- *s
-}
-
-/////////////////////////////////////////////////////////////
-
-// ValidRemoteMessage valid or not
-func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
-	if msg.Tasklab == nil {
-		fn(fnName, " recieve msg nil")
-		return false
-	}
-
-	sdkLen := len(msg.Tasklab.Sdkinfos)
-	if sdkLen == 0 {
-		fn(fnName, " has no sdk info")
-		return false
-	}
-
-	curIndex := int(msg.Tasklab.Index)
-	if curIndex < 0 || curIndex >= sdkLen {
-		fn(fnName, " tasklab index ", curIndex, " error")
-		return false
-	}
-	if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
-		fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
-		return false
-	}
-	return true
-}
-
-// UnpackImage unpack
-func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
-	// 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬
-	i := &protomsg.Image{}
-	err := proto.Unmarshal(msg.Data, i)
-	if err != nil {
-		fn(fnName, " protobuf decode CameraImage error: ", err.Error())
-		return nil
-	}
-	if i.Data == nil {
-		fn(fnName, " protomsg.Image data null")
-		return nil
-	}
 	return i
 }
diff --git a/common/lockList.go b/common/lockList.go
deleted file mode 100644
index f630a6a..0000000
--- a/common/lockList.go
+++ /dev/null
@@ -1,89 +0,0 @@
-package common
-
-import (
-	"container/list"
-	"sync"
-)
-
-// LockList list
-type LockList struct {
-	cache *list.List
-	cv    *sync.Cond
-	cond  bool
-	size  int
-}
-
-// NewLockList new
-func NewLockList(size int) *LockList {
-	return &LockList{
-		cache: list.New(),
-		cv:    sync.NewCond(&sync.Mutex{}),
-		cond:  false,
-		size:  size,
-	}
-}
-
-// Push push
-func (l *LockList) Push(v interface{}) {
-	l.cv.L.Lock()
-	l.cache.PushBack(v)
-
-	for l.cache.Len() > l.size {
-		l.cache.Remove(l.cache.Front())
-	}
-
-	l.cond = true
-	l.cv.Signal()
-	l.cv.L.Unlock()
-}
-
-// Pop pop
-func (l *LockList) Pop() []interface{} {
-
-	var batch []interface{}
-
-	l.cv.L.Lock()
-
-	for !l.cond {
-		l.cv.Wait()
-	}
-
-	elem := l.cache.Front()
-	if elem != nil {
-		batch = append(batch, elem.Value)
-		l.cache.Remove(l.cache.Front())
-	}
-
-	l.cond = false
-	l.cv.L.Unlock()
-
-	return batch
-}
-
-// Drain drain all element
-func (l *LockList) Drain() []interface{} {
-
-	var batch []interface{}
-
-	l.cv.L.Lock()
-
-	for !l.cond {
-		l.cv.Wait()
-	}
-
-	for {
-
-		elem := l.cache.Front()
-		if elem == nil {
-			break
-		}
-
-		batch = append(batch, elem.Value)
-		l.cache.Remove(l.cache.Front())
-	}
-
-	l.cond = false
-	l.cv.L.Unlock()
-
-	return batch
-}
diff --git a/common/recv.go b/common/recv.go
deleted file mode 100644
index fb31433..0000000
--- a/common/recv.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package common
-
-import (
-	"context"
-
-	"time"
-
-	"basic.com/valib/deliver.git"
-)
-
-// Reciever recv from ipc
-type Reciever struct {
-	ctx    context.Context
-	ipcURL string
-	out    chan<- []byte
-
-	shm      bool
-	fnLogger func(...interface{})
-}
-
-// NewReciever new recv
-func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
-	return &Reciever{
-		ipcURL:   url,
-		out:      out,
-		shm:      shm,
-		fnLogger: fn,
-	}
-}
-
-// Run run a IPC client
-func (r *Reciever) Run(ctx context.Context) {
-
-	if r.shm {
-		r.runShm(ctx)
-	} else {
-		r.run(ctx, deliver.NewClient(mode, r.ipcURL))
-	}
-}
-
-func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
-
-	// t := time.Now()
-	// sc := 0
-
-	count := 0
-
-	for {
-		select {
-		case <-ctx.Done():
-			i.Close()
-			return
-		default:
-
-			if r.shm {
-				if d, err := i.Recv(); err != nil {
-					i.Close()
-					r.fnLogger("ANALYSIS RECV ERROR: ", err)
-
-					c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
-					for {
-						if err == nil {
-							break
-						}
-						time.Sleep(time.Second)
-						c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
-						r.fnLogger("ANALYSIS CREATE FAILED : ", err)
-					}
-					i = c
-					r.fnLogger("ANALYSIS CREATE SHM")
-				} else {
-					if d != nil {
-						count++
-						if count > 10 {
-							count = 0
-							r.fnLogger("~~~shm recv image:", len(d))
-						}
-						r.out <- d
-					}
-				}
-			} else {
-				if msg, err := i.Recv(); err != nil {
-					// logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
-				} else {
-					count++
-					if count > 10 {
-						count = 0
-						r.fnLogger("~~~mangos recv image:", len(msg))
-					}
-					r.out <- msg
-				}
-			}
-
-			// sc++
-			// if sc == 25 {
-			// 	logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
-			// 	sc = 0
-			// 	t = time.Now()
-			// }
-
-		}
-	}
-}
-
-func (r *Reciever) runShm(ctx context.Context) {
-	c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
-	for {
-		if err == nil {
-			break
-		}
-		time.Sleep(1 * time.Second)
-		c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
-		r.fnLogger("CLIENT CREATE FAILED : ", err)
-	}
-	r.run(ctx, c)
-}
diff --git a/common/send.go b/common/send.go
deleted file mode 100644
index a0e7cc4..0000000
--- a/common/send.go
+++ /dev/null
@@ -1,141 +0,0 @@
-package common
-
-import (
-	"context"
-	"time"
-
-	"basic.com/libgowrapper/sdkstruct.git"
-	"basic.com/valib/deliver.git"
-)
-
-// Sender decoder ingo
-type Sender struct {
-	ipcURL string
-	chMsg  <-chan sdkstruct.MsgSDK
-	shm    bool
-	fn     func([]byte, bool)
-
-	fnLogger func(...interface{})
-}
-
-// ApplyCallbackFunc cb
-func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) {
-
-	if s.fn == nil {
-		s.fn = f
-	}
-}
-
-// NewSender Sender
-func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
-	// logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
-	return &Sender{
-		ipcURL:   ipcURL,
-		chMsg:    chMsg,
-		shm:      shm,
-		fn:       nil,
-		fnLogger: fn,
-	}
-}
-
-// Run run a IPC producer
-func (s *Sender) Run(ctx context.Context) {
-
-	if s.shm {
-		s.runShm(ctx)
-	} else {
-		i := deliver.NewClient(mode, s.ipcURL)
-		if i == nil {
-			s.fnLogger("sender 2 pubsub nng create error")
-			return
-		}
-		s.run(ctx, i)
-	}
-}
-
-func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) {
-
-	for {
-		select {
-		case <-ctx.Done():
-			s.fnLogger("stop Sender")
-			return
-		case i := <-s.chMsg:
-
-			data <- i.MsgData
-
-			if int(i.SdkIndex+1) == i.SdkCount {
-				if s.fn != nil {
-
-					sFlag := true
-					if i.SdkDataLen < 2 {
-						sFlag = false
-					}
-					s.fn(i.MsgData, sFlag)
-
-				}
-			}
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
-}
-
-func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
-
-	// go ruleserver.TimeTicker()
-
-	dataChan := make(chan []byte, 3)
-	go s.serializeProto(ctx, dataChan)
-
-	for {
-		select {
-		case <-ctx.Done():
-			i.Close()
-			return
-		case d := <-dataChan:
-
-			if s.shm {
-				if err := i.Send(d); err != nil {
-					i.Close()
-					s.fnLogger("ANALYSIS SENDER ERROR: ", err)
-
-					c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
-					for {
-						if err == nil {
-							break
-						}
-						time.Sleep(time.Second)
-						c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
-						s.fnLogger("CLIENT CREATE FAILED : ", err)
-					}
-					i = c
-				} else {
-
-				}
-			} else {
-				err := i.Send(d)
-				if err != nil {
-					// logo.Errorln("error sender 2 pubsub: ", err)
-				} else {
-					s.fnLogger("mangos send to pubsub len: ", len(d))
-				}
-			}
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
-}
-
-func (s *Sender) runShm(ctx context.Context) {
-	c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
-	for {
-		if err == nil {
-			break
-		}
-		time.Sleep(1 * time.Second)
-		c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
-		s.fnLogger("CLIENT CREATE FAILED : ", err)
-	}
-	s.run(ctx, c)
-}
diff --git a/common/torule.go b/common/torule.go
deleted file mode 100644
index c8be086..0000000
--- a/common/torule.go
+++ /dev/null
@@ -1,128 +0,0 @@
-package common
-
-import (
-	"container/list"
-	"context"
-	"sync"
-	"time"
-
-	"basic.com/valib/deliver.git"
-	// "basic.com/pubsub/protomsg.git"
-	// "github.com/gogo/protobuf/proto"
-)
-
-type runResult struct {
-	data  []byte
-	valid bool
-}
-
-// ToRule ipc
-type ToRule struct {
-	ipcURL   string
-	maxSize  int
-	cache    *list.List
-	cv       *sync.Cond
-	cond     bool
-	fnLogger func(...interface{})
-}
-
-// NewToRule send to ruleprocess
-func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule {
-	return &ToRule{
-		ipcURL:   ipcURL,
-		maxSize:  maxSize,
-		cache:    list.New(),
-		cv:       sync.NewCond(&sync.Mutex{}),
-		cond:     false,
-		fnLogger: fn,
-	}
-}
-
-// Push data
-func (t *ToRule) Push(data []byte, valid bool) {
-
-	t.cv.L.Lock()
-	result := runResult{data, valid}
-	t.cache.PushBack(result)
-	if t.cache.Len() > t.maxSize {
-		for i := 0; i < t.cache.Len(); {
-			d := t.cache.Front().Value.(runResult)
-			if d.valid == false {
-				t.cache.Remove(t.cache.Front())
-				i = i + 2
-			} else {
-				i = i + 1
-			}
-		}
-	}
-	if t.cache.Len() > t.maxSize {
-		for i := 0; i < t.cache.Len(); {
-			t.cache.Remove(t.cache.Front())
-			i = i + 2
-		}
-	}
-	// logo.Infof("push to cache count : %d\n", t.cache.Len())
-	t.cond = true
-	t.cv.Signal()
-	t.cv.L.Unlock()
-}
-
-// Run forever
-func (t *ToRule) Run(ctx context.Context) {
-
-	var i deliver.Deliver
-	var err error
-
-	for {
-		i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL)
-		if err != nil {
-			time.Sleep(time.Second)
-			t.fnLogger("wait create to rule ipc", err)
-			continue
-		}
-		break
-	}
-
-	count := 0
-
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-
-			var d []byte
-			t.cv.L.Lock()
-
-			for !t.cond {
-				t.cv.Wait()
-			}
-
-			for j := 0; j < 8; j++ {
-				if t.cache.Len() <= 0 {
-					break
-				}
-
-				d = t.cache.Front().Value.(runResult).data
-				if i != nil && d != nil {
-
-					err := i.Send(d)
-					if err != nil {
-						t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
-					} else {
-						count++
-						if count > 5 {
-							count = 0
-							t.fnLogger("~~~~~~SEND TO RULE CORRECT")
-						}
-					}
-				}
-				t.cache.Remove(t.cache.Front())
-			}
-
-			t.cond = false
-			t.cv.L.Unlock()
-
-		}
-	}
-}

--
Gitblit v1.8.0