From 5459ba1d3f7f944aa97923ed9c09a5dbc7663928 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 14 一月 2020 18:01:24 +0800
Subject: [PATCH] update
---
run.go | 9
go.sum | 2
common/recv.go | 116 ++++++++
common/torule.go | 128 +++++++++
common/helper.go | 174 ++++++++++++
go.mod | 3
common/send.go | 141 ++++++++++
common/flow.go | 124 ++++++++
common/lockList.go | 89 ++++++
9 files changed, 778 insertions(+), 8 deletions(-)
diff --git a/common/flow.go b/common/flow.go
new file mode 100644
index 0000000..4dfb11c
--- /dev/null
+++ b/common/flow.go
@@ -0,0 +1,124 @@
+package common
+
+import (
+ "context"
+ "time"
+
+ "basic.com/pubsub/protomsg.git"
+
+ "basic.com/libgowrapper/sdkstruct.git"
+)
+
+/////////////////////////////////////////////////////////////////
+
+// FlowCreate create flow
+func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int, fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) {
+
+ const (
+ postPull = `_1`
+ postPush = `_2`
+ )
+ ipcRcv := GetIpcAddress(shm, id+postPull)
+ ipcSnd := GetIpcAddress(shm, id+postPush)
+ chRcv := make(chan []byte, 3)
+ chSnd := make(chan sdkstruct.MsgSDK, 3)
+
+ rcver := NewReciever(ipcRcv, chRcv, shm, fn)
+ snder := NewSender(ipcSnd, chSnd, shm, fn)
+ torule := NewToRule(ipc2Rule, ruleCacheSize, fn)
+
+ snder.ApplyCallbackFunc(torule.Push)
+
+ go rcver.Run(ctx)
+ go snder.Run(ctx)
+ go torule.Run(ctx)
+
+ return chRcv, chSnd
+}
+
+// WorkFlowSimple work
+func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string,
+ fnConsume func() []interface{}, fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
+ fn func(...interface{})) {
+
+ tm := time.Now()
+ sc := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+
+ elems := fnConsume()
+ if elems == nil || len(elems) == 0 {
+ time.Sleep(10 * time.Millisecond)
+ continue
+ }
+
+ var msgs []protomsg.SdkMessage
+ for _, v := range elems {
+ msgs = append(msgs, v.(protomsg.SdkMessage))
+ }
+
+ fnRun(msgs, out, typ)
+
+ sc++
+ if sc == 25 {
+ fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
+ sc = 0
+ tm = time.Now()
+ }
+ if time.Since(tm) > time.Second {
+ fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm))
+ sc = 0
+ tm = time.Now()
+ }
+ }
+ }
+}
+
+// FlowSimple wrap
+func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
+ fnProduce func(interface{}), fnConsume func() []interface{},
+ fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
+ fnClose func(), fn func(...interface{})) {
+
+ cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) {
+ fnRun(msgs[0], ch, typ)
+ }
+
+ FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn)
+
+}
+
+// FlowBatch batch
+func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string,
+ fnProduce func(interface{}), fnConsume func() []interface{},
+ fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string),
+ fnClose func(), fn func(...interface{})) {
+
+ chMsg := make(chan protomsg.SdkMessage, 3)
+ go UnserilizeProto(ctx, in, chMsg, fn)
+
+ go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn)
+
+ for {
+ select {
+ case <-ctx.Done():
+ fnClose()
+ return
+ case rMsg := <-chMsg:
+ if !ValidRemoteMessage(rMsg, typ, fn) {
+ fn(typ, " validremotemessage invalid")
+ EjectResult(nil, rMsg, out)
+ continue
+ }
+ fnProduce(rMsg)
+
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+
+}
diff --git a/common/helper.go b/common/helper.go
new file mode 100644
index 0000000..0e016c3
--- /dev/null
+++ b/common/helper.go
@@ -0,0 +1,174 @@
+package common
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "strconv"
+ "time"
+
+ "basic.com/libgowrapper/sdkstruct.git"
+ "basic.com/pubsub/protomsg.git"
+ "basic.com/valib/deliver.git"
+ "github.com/gogo/protobuf/proto"
+)
+
+const mode = deliver.PushPull
+
+// GetIpcAddress get ipc
+func GetIpcAddress(shm bool, id string) string {
+ if shm {
+ return id
+ }
+ return `ipc:///tmp/` + id + `.ipc`
+}
+
+// SubConfig sub
+type SubConfig struct {
+ SoFile string `json:"so_file_path"`
+ Env string `json:"runtime"`
+ Param map[string]string `json:"param"`
+}
+
+// SdkConfig sdk
+type SdkConfig struct {
+ SoFile string `json:"so_file_path"`
+ Env string `json:"runtime"`
+ Param map[string]string `json:"param"`
+ Sub *SubConfig `json:"sub"`
+}
+
+// ReadConfig conf
+func ReadConfig(file string) (SdkConfig, error) {
+ data, err := ioutil.ReadFile(file)
+ if err != nil {
+ return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file)
+ }
+
+ //璇诲彇鐨勬暟鎹负json鏍煎紡锛岄渶瑕佽繘琛岃В鐮�
+ var v SdkConfig
+ err = json.Unmarshal(data, &v)
+
+ return v, err
+}
+
+// Atoi atoi
+func Atoi(s string) int {
+ i, _ := strconv.Atoi(s)
+ return i
+}
+
+// UnserilizeProto un
+func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case d := <-data:
+ if len(d) < 100 {
+ continue
+ }
+ msg := protomsg.SdkMessage{}
+ if err := proto.Unmarshal(d, &msg); err != nil {
+ fn(err, " msg 澶勭悊寮傚父")
+ continue
+ }
+
+ out <- msg
+
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+// Msg2MsgSDK msg->msgsdk
+func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK {
+
+ d, err := proto.Marshal(&msg)
+ if err != nil {
+ return nil
+ }
+
+ index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos)
+ if index >= count {
+ return &sdkstruct.MsgSDK{
+ MsgData: d,
+ SdkCount: count,
+ SdkIndex: index,
+ SdkDataLen: 0,
+ }
+ }
+
+ return &sdkstruct.MsgSDK{
+ MsgData: d,
+ SdkCount: count,
+ SdkIndex: index,
+ SdkDataLen: len(d),
+ }
+}
+
+// EjectResult eject
+func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) {
+
+ if res == nil {
+ s := Msg2MsgSDK(msg)
+ if s == nil {
+ return
+ }
+ out <- *s
+ return
+ }
+
+ msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res
+
+ s := Msg2MsgSDK(msg)
+ if s == nil {
+ return
+ }
+ out <- *s
+}
+
+/////////////////////////////////////////////////////////////
+
+// ValidRemoteMessage valid or not
+func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool {
+ if msg.Tasklab == nil {
+ fn(fnName, " recieve msg nil")
+ return false
+ }
+
+ sdkLen := len(msg.Tasklab.Sdkinfos)
+ if sdkLen == 0 {
+ fn(fnName, " has no sdk info")
+ return false
+ }
+
+ curIndex := int(msg.Tasklab.Index)
+ if curIndex < 0 || curIndex >= sdkLen {
+ fn(fnName, " tasklab index ", curIndex, " error")
+ return false
+ }
+ if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName {
+ fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype)
+ return false
+ }
+ return true
+}
+
+// UnpackImage unpack
+func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image {
+ // 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬
+ i := &protomsg.Image{}
+ err := proto.Unmarshal(msg.Data, i)
+ if err != nil {
+ fn(fnName, " protobuf decode CameraImage error: ", err.Error())
+ return nil
+ }
+ if i.Data == nil {
+ fn(fnName, " protomsg.Image data null")
+ return nil
+ }
+ return i
+}
diff --git a/common/lockList.go b/common/lockList.go
new file mode 100644
index 0000000..f630a6a
--- /dev/null
+++ b/common/lockList.go
@@ -0,0 +1,89 @@
+package common
+
+import (
+ "container/list"
+ "sync"
+)
+
+// LockList list
+type LockList struct {
+ cache *list.List
+ cv *sync.Cond
+ cond bool
+ size int
+}
+
+// NewLockList new
+func NewLockList(size int) *LockList {
+ return &LockList{
+ cache: list.New(),
+ cv: sync.NewCond(&sync.Mutex{}),
+ cond: false,
+ size: size,
+ }
+}
+
+// Push push
+func (l *LockList) Push(v interface{}) {
+ l.cv.L.Lock()
+ l.cache.PushBack(v)
+
+ for l.cache.Len() > l.size {
+ l.cache.Remove(l.cache.Front())
+ }
+
+ l.cond = true
+ l.cv.Signal()
+ l.cv.L.Unlock()
+}
+
+// Pop pop
+func (l *LockList) Pop() []interface{} {
+
+ var batch []interface{}
+
+ l.cv.L.Lock()
+
+ for !l.cond {
+ l.cv.Wait()
+ }
+
+ elem := l.cache.Front()
+ if elem != nil {
+ batch = append(batch, elem.Value)
+ l.cache.Remove(l.cache.Front())
+ }
+
+ l.cond = false
+ l.cv.L.Unlock()
+
+ return batch
+}
+
+// Drain drain all element
+func (l *LockList) Drain() []interface{} {
+
+ var batch []interface{}
+
+ l.cv.L.Lock()
+
+ for !l.cond {
+ l.cv.Wait()
+ }
+
+ for {
+
+ elem := l.cache.Front()
+ if elem == nil {
+ break
+ }
+
+ batch = append(batch, elem.Value)
+ l.cache.Remove(l.cache.Front())
+ }
+
+ l.cond = false
+ l.cv.L.Unlock()
+
+ return batch
+}
diff --git a/common/recv.go b/common/recv.go
new file mode 100644
index 0000000..fb31433
--- /dev/null
+++ b/common/recv.go
@@ -0,0 +1,116 @@
+package common
+
+import (
+ "context"
+
+ "time"
+
+ "basic.com/valib/deliver.git"
+)
+
+// Reciever recv from ipc
+type Reciever struct {
+ ctx context.Context
+ ipcURL string
+ out chan<- []byte
+
+ shm bool
+ fnLogger func(...interface{})
+}
+
+// NewReciever new recv
+func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+ return &Reciever{
+ ipcURL: url,
+ out: out,
+ shm: shm,
+ fnLogger: fn,
+ }
+}
+
+// Run run a IPC client
+func (r *Reciever) Run(ctx context.Context) {
+
+ if r.shm {
+ r.runShm(ctx)
+ } else {
+ r.run(ctx, deliver.NewClient(mode, r.ipcURL))
+ }
+}
+
+func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
+
+ // t := time.Now()
+ // sc := 0
+
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ i.Close()
+ return
+ default:
+
+ if r.shm {
+ if d, err := i.Recv(); err != nil {
+ i.Close()
+ r.fnLogger("ANALYSIS RECV ERROR: ", err)
+
+ c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ r.fnLogger("ANALYSIS CREATE FAILED : ", err)
+ }
+ i = c
+ r.fnLogger("ANALYSIS CREATE SHM")
+ } else {
+ if d != nil {
+ count++
+ if count > 10 {
+ count = 0
+ r.fnLogger("~~~shm recv image:", len(d))
+ }
+ r.out <- d
+ }
+ }
+ } else {
+ if msg, err := i.Recv(); err != nil {
+ // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
+ } else {
+ count++
+ if count > 10 {
+ count = 0
+ r.fnLogger("~~~mangos recv image:", len(msg))
+ }
+ r.out <- msg
+ }
+ }
+
+ // sc++
+ // if sc == 25 {
+ // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
+ // sc = 0
+ // t = time.Now()
+ // }
+
+ }
+ }
+}
+
+func (r *Reciever) runShm(ctx context.Context) {
+ c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
+ r.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ r.run(ctx, c)
+}
diff --git a/common/send.go b/common/send.go
new file mode 100644
index 0000000..a0e7cc4
--- /dev/null
+++ b/common/send.go
@@ -0,0 +1,141 @@
+package common
+
+import (
+ "context"
+ "time"
+
+ "basic.com/libgowrapper/sdkstruct.git"
+ "basic.com/valib/deliver.git"
+)
+
+// Sender decoder ingo
+type Sender struct {
+ ipcURL string
+ chMsg <-chan sdkstruct.MsgSDK
+ shm bool
+ fn func([]byte, bool)
+
+ fnLogger func(...interface{})
+}
+
+// ApplyCallbackFunc cb
+func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) {
+
+ if s.fn == nil {
+ s.fn = f
+ }
+}
+
+// NewSender Sender
+func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender {
+ // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
+ return &Sender{
+ ipcURL: ipcURL,
+ chMsg: chMsg,
+ shm: shm,
+ fn: nil,
+ fnLogger: fn,
+ }
+}
+
+// Run run a IPC producer
+func (s *Sender) Run(ctx context.Context) {
+
+ if s.shm {
+ s.runShm(ctx)
+ } else {
+ i := deliver.NewClient(mode, s.ipcURL)
+ if i == nil {
+ s.fnLogger("sender 2 pubsub nng create error")
+ return
+ }
+ s.run(ctx, i)
+ }
+}
+
+func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) {
+
+ for {
+ select {
+ case <-ctx.Done():
+ s.fnLogger("stop Sender")
+ return
+ case i := <-s.chMsg:
+
+ data <- i.MsgData
+
+ if int(i.SdkIndex+1) == i.SdkCount {
+ if s.fn != nil {
+
+ sFlag := true
+ if i.SdkDataLen < 2 {
+ sFlag = false
+ }
+ s.fn(i.MsgData, sFlag)
+
+ }
+ }
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
+
+ // go ruleserver.TimeTicker()
+
+ dataChan := make(chan []byte, 3)
+ go s.serializeProto(ctx, dataChan)
+
+ for {
+ select {
+ case <-ctx.Done():
+ i.Close()
+ return
+ case d := <-dataChan:
+
+ if s.shm {
+ if err := i.Send(d); err != nil {
+ i.Close()
+ s.fnLogger("ANALYSIS SENDER ERROR: ", err)
+
+ c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ s.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ i = c
+ } else {
+
+ }
+ } else {
+ err := i.Send(d)
+ if err != nil {
+ // logo.Errorln("error sender 2 pubsub: ", err)
+ } else {
+ s.fnLogger("mangos send to pubsub len: ", len(d))
+ }
+ }
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+func (s *Sender) runShm(ctx context.Context) {
+ c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ for {
+ if err == nil {
+ break
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
+ s.fnLogger("CLIENT CREATE FAILED : ", err)
+ }
+ s.run(ctx, c)
+}
diff --git a/common/torule.go b/common/torule.go
new file mode 100644
index 0000000..c8be086
--- /dev/null
+++ b/common/torule.go
@@ -0,0 +1,128 @@
+package common
+
+import (
+ "container/list"
+ "context"
+ "sync"
+ "time"
+
+ "basic.com/valib/deliver.git"
+ // "basic.com/pubsub/protomsg.git"
+ // "github.com/gogo/protobuf/proto"
+)
+
+type runResult struct {
+ data []byte
+ valid bool
+}
+
+// ToRule ipc
+type ToRule struct {
+ ipcURL string
+ maxSize int
+ cache *list.List
+ cv *sync.Cond
+ cond bool
+ fnLogger func(...interface{})
+}
+
+// NewToRule send to ruleprocess
+func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule {
+ return &ToRule{
+ ipcURL: ipcURL,
+ maxSize: maxSize,
+ cache: list.New(),
+ cv: sync.NewCond(&sync.Mutex{}),
+ cond: false,
+ fnLogger: fn,
+ }
+}
+
+// Push data
+func (t *ToRule) Push(data []byte, valid bool) {
+
+ t.cv.L.Lock()
+ result := runResult{data, valid}
+ t.cache.PushBack(result)
+ if t.cache.Len() > t.maxSize {
+ for i := 0; i < t.cache.Len(); {
+ d := t.cache.Front().Value.(runResult)
+ if d.valid == false {
+ t.cache.Remove(t.cache.Front())
+ i = i + 2
+ } else {
+ i = i + 1
+ }
+ }
+ }
+ if t.cache.Len() > t.maxSize {
+ for i := 0; i < t.cache.Len(); {
+ t.cache.Remove(t.cache.Front())
+ i = i + 2
+ }
+ }
+ // logo.Infof("push to cache count : %d\n", t.cache.Len())
+ t.cond = true
+ t.cv.Signal()
+ t.cv.L.Unlock()
+}
+
+// Run forever
+func (t *ToRule) Run(ctx context.Context) {
+
+ var i deliver.Deliver
+ var err error
+
+ for {
+ i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL)
+ if err != nil {
+ time.Sleep(time.Second)
+ t.fnLogger("wait create to rule ipc", err)
+ continue
+ }
+ break
+ }
+
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+
+ var d []byte
+ t.cv.L.Lock()
+
+ for !t.cond {
+ t.cv.Wait()
+ }
+
+ for j := 0; j < 8; j++ {
+ if t.cache.Len() <= 0 {
+ break
+ }
+
+ d = t.cache.Front().Value.(runResult).data
+ if i != nil && d != nil {
+
+ err := i.Send(d)
+ if err != nil {
+ t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
+ } else {
+ count++
+ if count > 5 {
+ count = 0
+ t.fnLogger("~~~~~~SEND TO RULE CORRECT")
+ }
+ }
+ }
+ t.cache.Remove(t.cache.Front())
+ }
+
+ t.cond = false
+ t.cv.L.Unlock()
+
+ }
+ }
+}
diff --git a/go.mod b/go.mod
index 5a3e3ff..ee5ce65 100644
--- a/go.mod
+++ b/go.mod
@@ -3,8 +3,7 @@
go 1.12
require (
- basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b
- basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c // indirect
+ basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573
basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3
basic.com/valib/godraw.git v0.0.0-20191122082247-26e9987cd183
diff --git a/go.sum b/go.sum
index 5217141..99a54be 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,3 @@
-basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b h1:CWneZPtnrpSv1SDYtB/D+kna3njXNWLExUfQ4xM3Lxc=
-basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I=
basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag=
basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 h1:0CkBMLT0OrdC6PIWnpkHxPBMPGayyjIN45NeEOeL+ZA=
diff --git a/run.go b/run.go
index 31d5a28..fd70be9 100644
--- a/run.go
+++ b/run.go
@@ -10,7 +10,8 @@
"time"
"unsafe"
- "basic.com/libgowrapper/sdkhelper.git"
+ "reid/common"
+
"basic.com/valib/gogpu.git"
"basic.com/pubsub/protomsg.git"
@@ -35,7 +36,7 @@
// Create Reid
func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} {
- cfg, err := sdkhelper.ReadConfig(config)
+ cfg, err := common.ReadConfig(config)
if err != nil {
fn("Reid SDK Create Error When Read Config: ", err)
return nil
@@ -55,7 +56,7 @@
}
}
- gpuM := sdkhelper.Atoi(cfg.Param[sGPU])
+ gpuM := common.Atoi(cfg.Param[sGPU])
rGPU := gpu
@@ -105,7 +106,7 @@
if p >= 0 {
file := s.ipc[p+len(string(suf)):]
os.Remove(file)
- s.fnLogger("remove:", file)
+ s.fnLogger("remove:", file)
}
s.fnLogger("can't listen on rep socket: ", err)
time.Sleep(5 * time.Millisecond)
--
Gitblit v1.8.0