554325746@qq.com
2019-08-23 f62f2cd8fee44ecf7bc54f2635172e48dc348321
add shm
7个文件已修改
1个文件已删除
137 ■■■■ 已修改文件
.gitignore 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 34 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
taskpubsub 补丁 | 查看 | 原始文档 | blame | 历史
util/util.go 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -21,3 +21,5 @@
*.exe
*.test
taskpubsub
camera/camera.go
@@ -10,6 +10,8 @@
    "taskpubsub/sdk"
    "taskpubsub/util"
    // "golang.org/x/sys/unix"
    "fmt"
    //"time"
)
@@ -21,6 +23,8 @@
var SocketManage = make(map[string]util.SocketContext)
var shm bool = false
var innerRecvTopic = []string{
    "virtual-faceextract-sdk-pull_2", //to web 以图搜图
}
@@ -31,7 +35,9 @@
    }
}
func Init() {
func Init(useShm bool) {
    shm = useShm
    logger.Info("============ camera info ====================")
    for _, cd := range util.CameraIds {
        logger.Info(cd)
@@ -54,7 +60,16 @@
    if _, isExist := SocketManage[id]; !isExist { //不存在
        url := fmt.Sprintf("ipc:///tmp/%s.ipc", id)
        socket, err := util.NewSocketListen(int(deliver.PushPull), url)
        m := deliver.PushPull
        if shm{
            m = deliver.Shm
            url = id
            // unix.Unlink("/dev/shm/" + url)
        }
        fmt.Println("ipc url: ", url)
        socket, err := util.NewSocketListen(int(m), url, shm)
        if err != nil {
            logger.Error("create socket error")
            return
go.mod
@@ -5,7 +5,7 @@
require (
    basic.com/dbapi.git v0.0.0-20190709070522-8a9676731a65
    basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2
    basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce
    basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09
    basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
    github.com/ajg/form v1.5.1 // indirect
    github.com/gogo/protobuf v1.2.1
@@ -17,5 +17,6 @@
    github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
    github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 // indirect
    golang.org/x/net v0.0.0-20190522155817-f3200d17e092
    golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
    nanomsg.org/go-mangos v1.4.0
)
go.sum
@@ -14,8 +14,8 @@
basic.com/pubsub/protomsg.git v0.0.0-20190705101637-65381a182a3c/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2 h1:ygh9CQPS48KmXv+PNUrOcrMqIiDZOs11apnQdu9oGEY=
basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce h1:/D6k+FVN1sMqLz6tMlsIl9bKwE2Mpc5d4QfPh0y4DSQ=
basic.com/valib/deliver.git v0.0.0-20190529080650-3e64847c9bce/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09 h1:NKyT8G/68gcBNSS3H3EElZ0v6oKWa/XUhHMykd6CJ9w=
basic.com/valib/deliver.git v0.0.0-20190823023101-df7358b07a09/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28 h1:3hejanzPEBvZSSvjIqayB83/6/6SLLrX9oNZAdiYELg=
basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28/go.mod h1:CQ+UJyZV8MRzwwckncdUDu6/RDTKAzSIPCxc9tFcwPs=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
main.go
@@ -5,6 +5,7 @@
    "flag"
    _ "net/http/pprof"
    "time"
    "fmt"
    "taskpubsub/camera"
    "taskpubsub/sdk"
@@ -16,6 +17,8 @@
var initchan = make(chan bool)
var useShm bool
func init(){
    var logFile = "./taskpubsub.log"
    var logSaveDays    =    15
@@ -23,13 +26,22 @@
    // 日志初始化
    logger.Config(logFile, logger.InfoLevel)
    logger.SetSaveDays(logSaveDays)
    logger.Info("loginit success !")
    logger.Info("loginit success !")
    flag.BoolVar(&useShm, "shm", false, "use shm for performance")
}     
        
func     main() {
func main() {
    flag.Parse()
    time.Sleep(time.Second)
    if useShm{
        logger.Info("USE SHARE MEMORY")
        fmt.Println("USE SHARE MEMORY")
    }else{
        logger.Info("USE PIPE")
        fmt.Println("USE PIPE")
    }
    // pprof 用于分析性能
    go func() {
        logger.Info(http.ListenAndServe("0.0.0.0:6061", nil))
@@ -38,8 +50,8 @@
    go util.Init(initchan)
    logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
    sdk.Init()        //  获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行)
    sdk.Init(useShm)        //  获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行)
    tasktag.Init()   // 获取所有任务,建立任务标签, 在数据进入时, 打标签
    camera.Init()   //获取cid, taskid, sdkid ,关系
    camera.Init(useShm)   //获取cid, taskid, sdkid ,关系
    select {}
}
sdk/sdk.go
@@ -2,6 +2,7 @@
import (
    "fmt"
    // "golang.org/x/sys/unix"
    "github.com/gogo/protobuf/proto"
    "taskpubsub/tasktag"
@@ -13,12 +14,14 @@
)
const (
    postPush = "_1.ipc"
    postPull = "_2.ipc"
    postPush = "_1"
    postPull = "_2"
)
var SocketManage = make(map[string]util.SocketContext)
var SdkMap = make(map[string]chan protomsg.SdkMessage)
var shm bool = false
var innerRecvTopic = []string{
    "facedetect-sdk-no-track", //to sdk-no-track 以图搜图
@@ -39,7 +42,9 @@
    }
}
func Init() {
func Init(useShm bool) {
    shm = useShm
    logger.Info("============= init sdk info =====================")
    for _, sdkid := range util.Sdklist { // 创建sdk server
@@ -83,8 +88,14 @@
        logger.Info("create", id)
    }
    url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPush)
    socket, err := util.NewSocketListen(int(deliver.PushPull), url)
    url := fmt.Sprintf("ipc:///tmp/%s%s.ipc", id, postPush)
    m := deliver.PushPull
    if shm{
        m = deliver.Shm
        url = id + postPush
        // unix.Unlink("/dev/shm/" + url)
    }
    socket, err := util.NewSocketListen(int(m), url, shm)
    if err != nil {
        delete(SdkMap, id)
        logger.Error(id, "create socket error!")
@@ -101,8 +112,17 @@
        logger.Info("create", id)
    }
    url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPull)
    socket, err := util.NewSocketListen(int(deliver.PushPull), url)
    url := fmt.Sprintf("ipc:///tmp/%s%s.ipc", id, postPull)
    m := deliver.PushPull
    if shm{
        m = deliver.Shm
        url = id + postPull
        // unix.Unlink("/dev/shm/" + url)
    }
    socket, err := util.NewSocketListen(int(m), url, shm)
    if err != nil {
        delete(SdkMap, id)
        logger.Error(id, "create socket error!")
taskpubsub
Binary files differ
util/util.go
@@ -4,7 +4,7 @@
    "basic.com/valib/deliver.git"
    "context"
    "errors"
    "github.com/pierrec/lz4"
    // "github.com/pierrec/lz4"
    "taskpubsub/logger"
)
@@ -48,40 +48,49 @@
// UnCompress uncompress
func UnCompress(in []byte) ([]byte, error) {
    out := make([]byte, 10*len(in))
    n, err := lz4.UncompressBlock(in, out)
    if err != nil {
        logger.Error("uncompress error: ", err)
        return nil, err
    }
    out = out[:n] // uncompressed data
    return out, nil
    return in, nil
    // out := make([]byte, 3*len(in))
    // n, err := lz4.UncompressBlock(in, out)
    // if err != nil {
    //     logger.Error("uncompress error: ", err)
    //     return nil, err
    // }
    // out = out[:n] // uncompressed data
    // return out, nil
}
// Compress compress
func Compress(in []byte) ([]byte, error) {
    out := make([]byte, len(in))
    ht := make([]int, 64<<10) // buffer for the compression table
    n, err := lz4.CompressBlock(in, out, ht)
    if err != nil {
        logger.Error("compress: ", err)
        return nil, err
    }
    if n >= len(in) {
        logger.Error("image is not compressible")
    }
    out = out[:n] // compressed data
    return out, nil
    return in, nil
    // out := make([]byte, len(in))
    // ht := make([]int, 64<<10) // buffer for the compression table
    // n, err := lz4.CompressBlock(in, out, ht)
    // if err != nil {
    //     logger.Error("compress: ", err)
    //     return nil, err
    // }
    // if n >= len(in) {
    //     logger.Error("image is not compressible")
    // }
    // out = out[:n] // compressed data
    // return out, nil
}
// create server
func NewSocketListen(mode int, url string) (socket SocketContext, err error) {
func NewSocketListen(mode int, url string, shm bool) (socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    if shm{
        socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    }else{
        socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    }
    if socket.Sock == nil {
        return socket, errors.New("create listen error")