From f62f2cd8fee44ecf7bc54f2635172e48dc348321 Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期五, 23 八月 2019 10:34:14 +0800
Subject: [PATCH] add shm

---
 /dev/null        |    0 
 .gitignore       |    2 
 go.sum           |    4 
 go.mod           |    3 
 main.go          |   20 +++++-
 sdk/sdk.go       |   34 +++++++++--
 util/util.go     |   55 ++++++++++-------
 camera/camera.go |   19 +++++
 8 files changed, 98 insertions(+), 39 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8365624..b825233 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,5 @@
 
 *.exe
 *.test
+
+taskpubsub
diff --git a/camera/camera.go b/camera/camera.go
index 5329bae..8bc26d0 100644
--- a/camera/camera.go
+++ b/camera/camera.go
@@ -10,6 +10,8 @@
 	"taskpubsub/sdk"
 	"taskpubsub/util"
 
+	// "golang.org/x/sys/unix"
+
 	"fmt"
 	//"time"
 )
@@ -21,6 +23,8 @@
 
 var SocketManage = make(map[string]util.SocketContext)
 
+var shm bool = false
+
 var innerRecvTopic = []string{
 	"virtual-faceextract-sdk-pull_2", //to web 浠ュ浘鎼滃浘
 }
@@ -31,7 +35,9 @@
 	}
 }
 
-func Init() {
+func Init(useShm bool) {
+	shm = useShm
+
 	logger.Info("============ camera info ====================")
 	for _, cd := range util.CameraIds {
 		logger.Info(cd)
@@ -54,7 +60,16 @@
 	if _, isExist := SocketManage[id]; !isExist { //涓嶅瓨鍦�
 
 		url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
-		socket, err := util.NewSocketListen(int(deliver.PushPull), url)
+		m := deliver.PushPull
+		if shm{
+			m = deliver.Shm
+			url = id
+
+			// unix.Unlink("/dev/shm/" + url)
+		}
+		
+		fmt.Println("ipc url: ", url)
+		socket, err := util.NewSocketListen(int(m), url, shm)
 		if err != nil {
 			logger.Error("create socket error")
 			return
diff --git a/go.mod b/go.mod
index e962b32..98c90be 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@
 require (
 	basic.com/dbapi.git v0.0.0-20190709070522-8a9676731a65
 	basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2
-	basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce
+	basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09
 	basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
 	github.com/ajg/form v1.5.1 // indirect
 	github.com/gogo/protobuf v1.2.1
@@ -17,5 +17,6 @@
 	github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
 	github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 // indirect
 	golang.org/x/net v0.0.0-20190522155817-f3200d17e092
+	golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
 	nanomsg.org/go-mangos v1.4.0
 )
diff --git a/go.sum b/go.sum
index 9c009ab..147a0c2 100644
--- a/go.sum
+++ b/go.sum
@@ -14,8 +14,8 @@
 basic.com/pubsub/protomsg.git v0.0.0-20190705101637-65381a182a3c/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
 basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2 h1:ygh9CQPS48KmXv+PNUrOcrMqIiDZOs11apnQdu9oGEY=
 basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
-basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce h1:/D6k+FVN1sMqLz6tMlsIl9bKwE2Mpc5d4QfPh0y4DSQ=
-basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
+basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09 h1:NKyT8G/68gcBNSS3H3EElZ0v6oKWa/XUhHMykd6CJ9w=
+basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
 basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28 h1:3hejanzPEBvZSSvjIqayB83/6/6SLLrX9oNZAdiYELg=
 basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28/go.mod h1:CQ+UJyZV8MRzwwckncdUDu6/RDTKAzSIPCxc9tFcwPs=
 github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
diff --git a/main.go b/main.go
index 15a31ac..53572a8 100644
--- a/main.go
+++ b/main.go
@@ -5,6 +5,7 @@
     "flag"
 	_ "net/http/pprof"
     "time"
+	"fmt"
 
 	"taskpubsub/camera"
 	"taskpubsub/sdk"
@@ -16,6 +17,8 @@
 
 var initchan = make(chan bool)
 
+var useShm bool
+
 func init(){
     var logFile = "./taskpubsub.log"
     var logSaveDays    =    15
@@ -23,13 +26,22 @@
     // 鏃ュ織鍒濆鍖�
     logger.Config(logFile, logger.InfoLevel)
     logger.SetSaveDays(logSaveDays)
-    logger.Info("loginit success !")
+	logger.Info("loginit success !")
+	
+	flag.BoolVar(&useShm, "shm", false, "use shm for performance")
 }     
         
-func     main() {
+func main() {
     flag.Parse()
     time.Sleep(time.Second)
 
+	if useShm{
+		logger.Info("USE SHARE MEMORY")
+		fmt.Println("USE SHARE MEMORY")
+	}else{
+		logger.Info("USE PIPE")
+		fmt.Println("USE PIPE")
+	}
 	// pprof 鐢ㄤ簬鍒嗘瀽鎬ц兘
 	go func() {
 		logger.Info(http.ListenAndServe("0.0.0.0:6061", nil))
@@ -38,8 +50,8 @@
 	go util.Init(initchan)
 
 	logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
-	sdk.Init()        //  鑾峰彇鎵�鏈夌畻娉昳d ,寤虹珛 sdk 涓婚锛� 寤虹珛sdk server(send, recv 杩愯)
+	sdk.Init(useShm)        //  鑾峰彇鎵�鏈夌畻娉昳d ,寤虹珛 sdk 涓婚锛� 寤虹珛sdk server(send, recv 杩愯)
 	tasktag.Init()   // 鑾峰彇鎵�鏈変换鍔★紝寤虹珛浠诲姟鏍囩锛� 鍦ㄦ暟鎹繘鍏ユ椂锛� 鎵撴爣绛�
-	camera.Init()   //鑾峰彇cid, taskid, sdkid ,鍏崇郴
+	camera.Init(useShm)   //鑾峰彇cid, taskid, sdkid ,鍏崇郴
 	select {}
 }
diff --git a/sdk/sdk.go b/sdk/sdk.go
index 0998b97..8fdbc9c 100644
--- a/sdk/sdk.go
+++ b/sdk/sdk.go
@@ -2,6 +2,7 @@
 
 import (
 	"fmt"
+	// "golang.org/x/sys/unix"
 
 	"github.com/gogo/protobuf/proto"
 	"taskpubsub/tasktag"
@@ -13,12 +14,14 @@
 )
 
 const (
-	postPush = "_1.ipc"
-	postPull = "_2.ipc"
+	postPush = "_1"
+	postPull = "_2"
 )
 
 var SocketManage = make(map[string]util.SocketContext)
 var SdkMap = make(map[string]chan protomsg.SdkMessage)
+
+var shm bool = false
 
 var innerRecvTopic = []string{
 	"facedetect-sdk-no-track", //to sdk-no-track 浠ュ浘鎼滃浘
@@ -39,7 +42,9 @@
 	}
 }
 
-func Init() {
+func Init(useShm bool) {
+
+	shm = useShm
 
 	logger.Info("============= init sdk info =====================")
 	for _, sdkid := range util.Sdklist { // 鍒涘缓sdk server
@@ -83,8 +88,14 @@
 		logger.Info("create", id)
 	}
 
-	url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPush)
-	socket, err := util.NewSocketListen(int(deliver.PushPull), url)
+	url := fmt.Sprintf("ipc:///tmp/%s%s.ipc", id, postPush)
+	m := deliver.PushPull
+	if shm{
+		m = deliver.Shm
+		url = id + postPush
+		// unix.Unlink("/dev/shm/" + url)
+	}
+	socket, err := util.NewSocketListen(int(m), url, shm)
 	if err != nil {
 		delete(SdkMap, id)
 		logger.Error(id, "create socket error!")
@@ -101,8 +112,17 @@
 		logger.Info("create", id)
 	}
 
-	url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPull)
-	socket, err := util.NewSocketListen(int(deliver.PushPull), url)
+	url := fmt.Sprintf("ipc:///tmp/%s%s.ipc", id, postPull)
+	m := deliver.PushPull
+	if shm{
+		m = deliver.Shm
+		url = id + postPull
+
+		// unix.Unlink("/dev/shm/" + url)
+	}
+
+	socket, err := util.NewSocketListen(int(m), url, shm)
+
 	if err != nil {
 		delete(SdkMap, id)
 		logger.Error(id, "create socket error!")
diff --git a/taskpubsub b/taskpubsub
deleted file mode 100755
index cf091f9..0000000
--- a/taskpubsub
+++ /dev/null
Binary files differ
diff --git a/util/util.go b/util/util.go
index e19a6a3..4c92331 100644
--- a/util/util.go
+++ b/util/util.go
@@ -4,7 +4,7 @@
 	"basic.com/valib/deliver.git"
 	"context"
 	"errors"
-	"github.com/pierrec/lz4"
+	// "github.com/pierrec/lz4"
 	"taskpubsub/logger"
 )
 
@@ -48,40 +48,49 @@
 
 // UnCompress uncompress
 func UnCompress(in []byte) ([]byte, error) {
-	out := make([]byte, 10*len(in))
-	n, err := lz4.UncompressBlock(in, out)
-	if err != nil {
-		logger.Error("uncompress error: ", err)
-		return nil, err
-	}
-	out = out[:n] // uncompressed data
-	return out, nil
+	return in, nil
+
+	// out := make([]byte, 3*len(in))
+	// n, err := lz4.UncompressBlock(in, out)
+	// if err != nil {
+	// 	logger.Error("uncompress error: ", err)
+	// 	return nil, err
+	// }
+	// out = out[:n] // uncompressed data
+	// return out, nil
 }
 
 // Compress compress
 func Compress(in []byte) ([]byte, error) {
-	out := make([]byte, len(in))
-	ht := make([]int, 64<<10) // buffer for the compression table
-	n, err := lz4.CompressBlock(in, out, ht)
-	if err != nil {
-		logger.Error("compress: ", err)
-		return nil, err
-	}
-	if n >= len(in) {
-		logger.Error("image is not compressible")
-	}
-	out = out[:n] // compressed data
-	return out, nil
+	return in, nil
+
+	// out := make([]byte, len(in))
+	// ht := make([]int, 64<<10) // buffer for the compression table
+	// n, err := lz4.CompressBlock(in, out, ht)
+	// if err != nil {
+	// 	logger.Error("compress: ", err)
+	// 	return nil, err
+	// }
+	// if n >= len(in) {
+	// 	logger.Error("image is not compressible")
+	// }
+	// out = out[:n] // compressed data
+	// return out, nil
 }
 
 // create server
-func NewSocketListen(mode int, url string) (socket SocketContext, err error) {
+func NewSocketListen(mode int, url string, shm bool) (socket SocketContext, err error) {
 	logger.Info("url is: ", url)
 	ctx, cancel := context.WithCancel(context.Background())
 
 	socket.Context = ctx
 	socket.Cancel = cancel
-	socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
+
+	if shm{
+		socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
+	}else{
+		socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
+	}
 
 	if socket.Sock == nil {
 		return socket, errors.New("create listen error")

--
Gitblit v1.8.0