From f62f2cd8fee44ecf7bc54f2635172e48dc348321 Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期五, 23 八月 2019 10:34:14 +0800
Subject: [PATCH] add shm
---
/dev/null | 0
.gitignore | 2
go.sum | 4
go.mod | 3
main.go | 20 +++++-
sdk/sdk.go | 34 +++++++++--
util/util.go | 55 ++++++++++-------
camera/camera.go | 19 +++++
8 files changed, 98 insertions(+), 39 deletions(-)
diff --git a/.gitignore b/.gitignore
index 8365624..b825233 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,5 @@
*.exe
*.test
+
+taskpubsub
diff --git a/camera/camera.go b/camera/camera.go
index 5329bae..8bc26d0 100644
--- a/camera/camera.go
+++ b/camera/camera.go
@@ -10,6 +10,8 @@
"taskpubsub/sdk"
"taskpubsub/util"
+ // "golang.org/x/sys/unix"
+
"fmt"
//"time"
)
@@ -21,6 +23,8 @@
var SocketManage = make(map[string]util.SocketContext)
+var shm bool = false
+
var innerRecvTopic = []string{
"virtual-faceextract-sdk-pull_2", //to web 浠ュ浘鎼滃浘
}
@@ -31,7 +35,9 @@
}
}
-func Init() {
+func Init(useShm bool) {
+ shm = useShm
+
logger.Info("============ camera info ====================")
for _, cd := range util.CameraIds {
logger.Info(cd)
@@ -54,7 +60,16 @@
if _, isExist := SocketManage[id]; !isExist { //涓嶅瓨鍦�
url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
- socket, err := util.NewSocketListen(int(deliver.PushPull), url)
+ m := deliver.PushPull
+ if shm{
+ m = deliver.Shm
+ url = id
+
+ // unix.Unlink("/dev/shm/" + url)
+ }
+
+ fmt.Println("ipc url: ", url)
+ socket, err := util.NewSocketListen(int(m), url, shm)
if err != nil {
logger.Error("create socket error")
return
diff --git a/go.mod b/go.mod
index e962b32..98c90be 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@
require (
basic.com/dbapi.git v0.0.0-20190709070522-8a9676731a65
basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2
- basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce
+ basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09
basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
github.com/ajg/form v1.5.1 // indirect
github.com/gogo/protobuf v1.2.1
@@ -17,5 +17,6 @@
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 // indirect
golang.org/x/net v0.0.0-20190522155817-f3200d17e092
+ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
nanomsg.org/go-mangos v1.4.0
)
diff --git a/go.sum b/go.sum
index 9c009ab..147a0c2 100644
--- a/go.sum
+++ b/go.sum
@@ -14,8 +14,8 @@
basic.com/pubsub/protomsg.git v0.0.0-20190705101637-65381a182a3c/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2 h1:ygh9CQPS48KmXv+PNUrOcrMqIiDZOs11apnQdu9oGEY=
basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
-basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce h1:/D6k+FVN1sMqLz6tMlsIl9bKwE2Mpc5d4QfPh0y4DSQ=
-basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
+basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09 h1:NKyT8G/68gcBNSS3H3EElZ0v6oKWa/XUhHMykd6CJ9w=
+basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28 h1:3hejanzPEBvZSSvjIqayB83/6/6SLLrX9oNZAdiYELg=
basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28/go.mod h1:CQ+UJyZV8MRzwwckncdUDu6/RDTKAzSIPCxc9tFcwPs=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
diff --git a/main.go b/main.go
index 15a31ac..53572a8 100644
--- a/main.go
+++ b/main.go
@@ -5,6 +5,7 @@
"flag"
_ "net/http/pprof"
"time"
+ "fmt"
"taskpubsub/camera"
"taskpubsub/sdk"
@@ -16,6 +17,8 @@
var initchan = make(chan bool)
+var useShm bool
+
func init(){
var logFile = "./taskpubsub.log"
var logSaveDays = 15
@@ -23,13 +26,22 @@
// 鏃ュ織鍒濆鍖�
logger.Config(logFile, logger.InfoLevel)
logger.SetSaveDays(logSaveDays)
- logger.Info("loginit success !")
+ logger.Info("loginit success !")
+
+ flag.BoolVar(&useShm, "shm", false, "use shm for performance")
}
-func main() {
+func main() {
flag.Parse()
time.Sleep(time.Second)
+ if useShm{
+ logger.Info("USE SHARE MEMORY")
+ fmt.Println("USE SHARE MEMORY")
+ }else{
+ logger.Info("USE PIPE")
+ fmt.Println("USE PIPE")
+ }
// pprof 鐢ㄤ簬鍒嗘瀽鎬ц兘
go func() {
logger.Info(http.ListenAndServe("0.0.0.0:6061", nil))
@@ -38,8 +50,8 @@
go util.Init(initchan)
logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
- sdk.Init() // 鑾峰彇鎵�鏈夌畻娉昳d ,寤虹珛 sdk 涓婚锛� 寤虹珛sdk server(send, recv 杩愯)
+ sdk.Init(useShm) // 鑾峰彇鎵�鏈夌畻娉昳d ,寤虹珛 sdk 涓婚锛� 寤虹珛sdk server(send, recv 杩愯)
tasktag.Init() // 鑾峰彇鎵�鏈変换鍔★紝寤虹珛浠诲姟鏍囩锛� 鍦ㄦ暟鎹繘鍏ユ椂锛� 鎵撴爣绛�
- camera.Init() //鑾峰彇cid, taskid, sdkid ,鍏崇郴
+ camera.Init(useShm) //鑾峰彇cid, taskid, sdkid ,鍏崇郴
select {}
}
diff --git a/sdk/sdk.go b/sdk/sdk.go
index 0998b97..8fdbc9c 100644
--- a/sdk/sdk.go
+++ b/sdk/sdk.go
@@ -2,6 +2,7 @@
import (
"fmt"
+ // "golang.org/x/sys/unix"
"github.com/gogo/protobuf/proto"
"taskpubsub/tasktag"
@@ -13,12 +14,14 @@
)
const (
- postPush = "_1.ipc"
- postPull = "_2.ipc"
+ postPush = "_1"
+ postPull = "_2"
)
var SocketManage = make(map[string]util.SocketContext)
var SdkMap = make(map[string]chan protomsg.SdkMessage)
+
+var shm bool = false
var innerRecvTopic = []string{
"facedetect-sdk-no-track", //to sdk-no-track 浠ュ浘鎼滃浘
@@ -39,7 +42,9 @@
}
}
-func Init() {
+func Init(useShm bool) {
+
+ shm = useShm
logger.Info("============= init sdk info =====================")
for _, sdkid := range util.Sdklist { // 鍒涘缓sdk server
@@ -83,8 +88,14 @@
logger.Info("create", id)
}
- url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPush)
- socket, err := util.NewSocketListen(int(deliver.PushPull), url)
+ url := fmt.Sprintf("ipc:///tmp/%s%s.ipc", id, postPush)
+ m := deliver.PushPull
+ if shm{
+ m = deliver.Shm
+ url = id + postPush
+ // unix.Unlink("/dev/shm/" + url)
+ }
+ socket, err := util.NewSocketListen(int(m), url, shm)
if err != nil {
delete(SdkMap, id)
logger.Error(id, "create socket error!")
@@ -101,8 +112,17 @@
logger.Info("create", id)
}
- url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPull)
- socket, err := util.NewSocketListen(int(deliver.PushPull), url)
+ url := fmt.Sprintf("ipc:///tmp/%s%s.ipc", id, postPull)
+ m := deliver.PushPull
+ if shm{
+ m = deliver.Shm
+ url = id + postPull
+
+ // unix.Unlink("/dev/shm/" + url)
+ }
+
+ socket, err := util.NewSocketListen(int(m), url, shm)
+
if err != nil {
delete(SdkMap, id)
logger.Error(id, "create socket error!")
diff --git a/taskpubsub b/taskpubsub
deleted file mode 100755
index cf091f9..0000000
--- a/taskpubsub
+++ /dev/null
Binary files differ
diff --git a/util/util.go b/util/util.go
index e19a6a3..4c92331 100644
--- a/util/util.go
+++ b/util/util.go
@@ -4,7 +4,7 @@
"basic.com/valib/deliver.git"
"context"
"errors"
- "github.com/pierrec/lz4"
+ // "github.com/pierrec/lz4"
"taskpubsub/logger"
)
@@ -48,40 +48,49 @@
// UnCompress uncompress
func UnCompress(in []byte) ([]byte, error) {
- out := make([]byte, 10*len(in))
- n, err := lz4.UncompressBlock(in, out)
- if err != nil {
- logger.Error("uncompress error: ", err)
- return nil, err
- }
- out = out[:n] // uncompressed data
- return out, nil
+ return in, nil
+
+ // out := make([]byte, 3*len(in))
+ // n, err := lz4.UncompressBlock(in, out)
+ // if err != nil {
+ // logger.Error("uncompress error: ", err)
+ // return nil, err
+ // }
+ // out = out[:n] // uncompressed data
+ // return out, nil
}
// Compress compress
func Compress(in []byte) ([]byte, error) {
- out := make([]byte, len(in))
- ht := make([]int, 64<<10) // buffer for the compression table
- n, err := lz4.CompressBlock(in, out, ht)
- if err != nil {
- logger.Error("compress: ", err)
- return nil, err
- }
- if n >= len(in) {
- logger.Error("image is not compressible")
- }
- out = out[:n] // compressed data
- return out, nil
+ return in, nil
+
+ // out := make([]byte, len(in))
+ // ht := make([]int, 64<<10) // buffer for the compression table
+ // n, err := lz4.CompressBlock(in, out, ht)
+ // if err != nil {
+ // logger.Error("compress: ", err)
+ // return nil, err
+ // }
+ // if n >= len(in) {
+ // logger.Error("image is not compressible")
+ // }
+ // out = out[:n] // compressed data
+ // return out, nil
}
// create server
-func NewSocketListen(mode int, url string) (socket SocketContext, err error) {
+func NewSocketListen(mode int, url string, shm bool) (socket SocketContext, err error) {
logger.Info("url is: ", url)
ctx, cancel := context.WithCancel(context.Background())
socket.Context = ctx
socket.Cancel = cancel
- socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
+
+ if shm{
+ socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
+ }else{
+ socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
+ }
if socket.Sock == nil {
return socket, errors.New("create listen error")
--
Gitblit v1.8.0