chenshijun
2019-09-11 91ca2658d546bf5d9857b144e6beefa1019ba972
合并张蒙的共享内存,增加logger
7个文件已修改
1个文件已删除
294 ■■■■■ 已修改文件
.gitignore 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 67 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
taskpubsub 补丁 | 查看 | 原始文档 | blame | 历史
util/util.go 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -21,3 +21,6 @@
*.exe
*.test
taskpubsub
*.log*
camera/camera.go
@@ -16,6 +16,8 @@
var SocketManage = make(map[string]util.SocketContext)
var shm bool = false
var innerRecvTopic = []string{
    "virtual-faceextract-sdk-pull_2", //to web 以图搜图
}
@@ -26,7 +28,9 @@
    }
}
func Init() {
func Init(useShm bool) {
    shm = useShm
    logger.Info("============ camera info ====================")
    for _, cd := range util.CameraIds {
        logger.Info(cd)
@@ -49,7 +53,15 @@
    if _, isExist := SocketManage[id]; !isExist { //不存在
        url := "ipc:///tmp/" + id + ".ipc"
        socket, err := util.NewSocketListen(int(deliver.PushPull), url)
        m := deliver.PushPull
        if shm {
            m = deliver.Shm
            url = id
        }
        logger.Info("CAMERA URL : ", url)
        socket, err := util.NewSocketListen(int(m), url, shm)
        if err != nil {
            logger.Error("create socket error")
            return
@@ -104,6 +116,8 @@
func Recv(socket util.SocketContext) {
    tryCount := 0
    var recvmessage []byte
    var imagemsg protomsg.Image
    var err error
@@ -115,6 +129,18 @@
        default:
            if recvmessage, err = socket.Sock.Recv(); err != nil {
                //logger.Error("[camera] err is: ", err)
                if socket.UseSHM {
                    if tryCount > util.SHMMaxTryCount {
                        socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
                        logger.Info("CAMERA SHM TRY :", tryCount, " RESTART IT")
                        tryCount = 0
                        continue
                    }
                    tryCount++
                }
                continue
            }
@@ -134,7 +160,7 @@
            } else {
                taskIDs := GetAllTaskByID(imagemsg.Cid)
                for _, taskID := range taskIDs {
                    //logger.Info("id: ", imagemsg.Cid, " taskid: ", taskID)
                    logger.Debug("id: ", imagemsg.Cid, " taskid: ", taskID)
                    doTaskList(imagemsg.Cid, "", taskID, recvmessage)
                }
            }
@@ -147,6 +173,9 @@
    for _, camsingle := range util.CameraTasks {
        if cid == camsingle.Camera.Id {
            for _, tasksingle := range camsingle.Tasks {
                if !tasksingle.Enable {
                    continue
                }
                tasks = append(tasks, tasksingle.Taskid)
            }
            return
go.mod
@@ -3,18 +3,20 @@
go 1.12
require (
    basic.com/dbapi.git v0.0.0-20190724082851-b6ae90344405
    basic.com/pubsub/protomsg.git v0.0.0-20190724114524-fc1623501c41
    basic.com/valib/deliver.git v0.0.0-20190716093339-1de0716341cd
    basic.com/dbapi.git v0.0.0-20190831100542-bc0c5d2266dd
    basic.com/pubsub/protomsg.git v0.0.0-20190905061607-7b96dafe8f99
    basic.com/valib/deliver.git v0.0.0-20190830083657-47adbbbb6651
    basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
    basic.com/valib/logger.git v0.0.0-20190904090733-b737ad2f8f18
    basic.com/valib/shm.git v0.0.0-20190829074754-ad2e00879627 // indirect
    github.com/ajg/form v1.5.1 // indirect
    github.com/gogo/protobuf v1.2.1
    github.com/golang/protobuf v1.3.2 // indirect
    github.com/golang/protobuf v1.3.1
    github.com/gorilla/websocket v1.4.0 // indirect
    github.com/pierrec/lz4 v2.2.4+incompatible
    github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect
    github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
    github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 // indirect
    nanomsg.org/go-mangos v1.4.0 // indirect
    github.com/pierrec/lz4 v2.0.5+incompatible
    github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a
    golang.org/x/net v0.0.0-20190522155817-f3200d17e092
    golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
    google.golang.org/grpc v1.23.0 // indirect
    nanomsg.org/go-mangos v1.4.0
)
go.sum
@@ -1,68 +1,58 @@
basic.com/dbapi.git v0.0.0-20190724082851-b6ae90344405 h1:BJzdtGipKxQAaptrwUNOVQZ3Qx4jbeAf72wkqBmm5vE=
basic.com/dbapi.git v0.0.0-20190724082851-b6ae90344405/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
basic.com/pubsub/protomsg.git v0.0.0-20190724093546-c998030fd269 h1:gXyJVIGOaECCtqzmG27AZc8Af7KToWbsfL9ulZosJM0=
basic.com/pubsub/protomsg.git v0.0.0-20190724093546-c998030fd269/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190724113021-21f454bb4964 h1:VGvEMKQnCE1X0BG1zDn0oRfS/+lRv9Ue7ec1mCj1slY=
basic.com/pubsub/protomsg.git v0.0.0-20190724113021-21f454bb4964/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190724114524-fc1623501c41 h1:Myyy43oOGfmtGsCSfKixqhMAJnYax8LUskRS2+xAtSo=
basic.com/pubsub/protomsg.git v0.0.0-20190724114524-fc1623501c41/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/valib/deliver.git v0.0.0-20190716093339-1de0716341cd h1:xbHU3s8iyUbegjIOkrw2Z4rl+jGDPONKsqzdU6a+F+U=
basic.com/valib/deliver.git v0.0.0-20190716093339-1de0716341cd/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
basic.com/dbapi.git v0.0.0-20190831100542-bc0c5d2266dd h1:8Q5PUZqdRvyXZ6aTOaHP6ge8Ir2Qs/eD0r8Sv8P/8p4=
basic.com/dbapi.git v0.0.0-20190831100542-bc0c5d2266dd/go.mod h1:eDXPnxaz6jZPDvBSk7ya7oSASWPCuUEgRTJCjsfKt/Q=
basic.com/pubsub/protomsg.git v0.0.0-20190905061607-7b96dafe8f99 h1:YSmWZPp/mHoq+/L5d0iTsqjiCcVwZqEQRQAXxQFSbvY=
basic.com/pubsub/protomsg.git v0.0.0-20190905061607-7b96dafe8f99/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/valib/deliver.git v0.0.0-20190830083657-47adbbbb6651 h1:kZyOKcTe0MnN8WMjrbZ96VhFqQ13VUO9yL6p2lRv6J0=
basic.com/valib/deliver.git v0.0.0-20190830083657-47adbbbb6651/go.mod h1:bkYiTUGzckyNOjAgn9rB/DOjFzwoSHJlruuWQ6hu6IY=
basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28 h1:3hejanzPEBvZSSvjIqayB83/6/6SLLrX9oNZAdiYELg=
basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28/go.mod h1:CQ+UJyZV8MRzwwckncdUDu6/RDTKAzSIPCxc9tFcwPs=
basic.com/valib/logger.git v0.0.0-20190904090733-b737ad2f8f18 h1:lfBv29ApfEvecKkUIpKHHsS1qu74ZZGNmzZkazK9PJ8=
basic.com/valib/logger.git v0.0.0-20190904090733-b737ad2f8f18/go.mod h1:SPlOGUUlxCscwF1dkqmLb0oJXVqg1uJ8hsPXLFxrw1M=
code.cloudfoundry.org/bytefmt v0.0.0-20180906201452-2aa6f33b730c/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
basic.com/valib/shm.git v0.0.0-20190829074754-ad2e00879627 h1:y0t0XG2uPSygF+hiSdLP3Lr959tip/FDPYJpHhbKRec=
basic.com/valib/shm.git v0.0.0-20190829074754-ad2e00879627/go.mod h1:yYRM7bM9y0KKd4IfNt3myjsvkFVFIIWNjsvK14tNbq4=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.4.0/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pierrec/cmdflag v0.0.2/go.mod h1:a3zKGZ3cdQUfxjd0RGMLZr8xI3nvpJOB+m6o/1X5BmU=
github.com/pierrec/lz4 v2.2.4+incompatible h1:Nqu6xTyB8QULqQb4eoa7pB8mIOPXwyi47u+hn45ykxs=
github.com/pierrec/lz4 v2.2.4+incompatible/go.mod h1:i5iVM8Tm8BGXjrx6RR3Wz6sQZlZvYr/QQ+kwo1ocGwk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/schollz/progressbar/v2 v2.12.1/go.mod h1:fBI3onORwtNtwCWJHsrXtjE3QnJOtqIZrvr3rDaF7L0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U=
github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8=
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI=
github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs=
github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco=
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.23.0 h1:AzbTB6ux+okLTzP8Ru1Xs41C303zdcfEht7MQnYJt5A=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
main.go
@@ -5,7 +5,6 @@
    "flag"
    _ "net/http/pprof"
    "time"
    "taskpubsub/camera"
    "taskpubsub/sdk"
    "taskpubsub/tasktag"
@@ -16,6 +15,8 @@
var initchan = make(chan bool)
var useShm bool
func init(){
    var logFile = "./taskpubsub.log"
    var logSaveDays    =    15
@@ -23,13 +24,20 @@
    // 日志初始化
    logger.Config(logFile, logger.InfoLevel)
    logger.SetSaveDays(logSaveDays)
    logger.Info("loginit success !")
    logger.Info("loginit success !")
    flag.BoolVar(&useShm, "shm", false, "use shm for performance")
}     
        
func     main() {
func main() {
    flag.Parse()
    time.Sleep(time.Second)
    if useShm{
        logger.Info("USE SHARE MEMORY")
    }else{
        logger.Info("USE PIPE")
    }
    // pprof 用于分析性能
    go func() {
        logger.Info(http.ListenAndServe("0.0.0.0:6061", nil))
@@ -38,8 +46,8 @@
    go util.Init(initchan)
    logger.Info("init ok !!!!, start sdk, task, camera init process ....", <-initchan)
    sdk.Init()        //  获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行)
    sdk.Init(useShm)        //  获取所有算法id ,建立 sdk 主题, 建立sdk server(send, recv 运行)
    tasktag.Init()   // 获取所有任务,建立任务标签, 在数据进入时, 打标签
    camera.Init()   //获取cid, taskid, sdkid ,关系
    camera.Init(useShm)   //获取cid, taskid, sdkid ,关系
    select {}
}
sdk/sdk.go
@@ -1,8 +1,6 @@
package sdk
import (
    "fmt"
    "github.com/gogo/protobuf/proto"
    "taskpubsub/tasktag"
    "taskpubsub/util"
@@ -13,12 +11,14 @@
)
const (
    postPush = "_1.ipc"
    postPull = "_2.ipc"
    postPush = "_1"
    postPull = "_2"
)
var SocketManage = make(map[string]util.SocketContext)
var SdkMap = make(map[string]chan protomsg.SdkMessage)
var shm bool = false
var innerRecvTopic = []string{
    "facedetect-sdk-no-track", //to sdk-no-track 以图搜图
@@ -39,7 +39,9 @@
    }
}
func Init() {
func Init(useShm bool) {
    shm = useShm
    logger.Info("============= init sdk info =====================")
    for _, sdkid := range util.Sdklist { // 创建sdk server
@@ -53,8 +55,6 @@
    // es
    SdkMap["es"] = make(chan protomsg.SdkMessage)
    logger.Info("create es channel: ")
    logger.Info("SdkMap:", SdkMap)
    go DealEsTopic()
    go autoUpdateSdk(util.Sdkflag)
@@ -85,8 +85,16 @@
        logger.Info("create", id)
    }
    url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPush)
    socket, err := util.NewSocketListen(int(deliver.PushPull), url)
    url := "ipc:///tmp/" + id + postPush + ".ipc"
    m := deliver.PushPull
    if shm {
        m = deliver.Shm
        url = id + postPush
    }
    logger.Info("SDK URL: ", url)
    socket, err := util.NewSocketListen(int(m), url, shm)
    if err != nil {
        delete(SdkMap, id)
        logger.Error(id, "create socket error!")
@@ -103,8 +111,16 @@
        logger.Info("create", id)
    }
    url := fmt.Sprintf("ipc:///tmp/%s%s", id, postPull)
    socket, err := util.NewSocketListen(int(deliver.PushPull), url)
    url := "ipc:///tmp/" + id + postPull + ".ipc"
    m := deliver.PushPull
    if shm {
        m = deliver.Shm
        url = id + postPull
    }
    socket, err := util.NewSocketListen(int(m), url, shm)
    if err != nil {
        delete(SdkMap, id)
        logger.Error(id, "create socket error!")
@@ -192,6 +208,9 @@
}
func Recv(socket util.SocketContext) {
    tryCount := 0
    var repsdkmsg = protomsg.SdkMessage{}
    for {
        select {
@@ -200,6 +219,17 @@
            return
        default:
            if msg, err := socket.Sock.Recv(); err != nil {
                if socket.UseSHM {
                    if tryCount > util.SHMMaxTryCount {
                        socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
                        logger.Info("SDK RECV SHM TRY :", tryCount, " RESTART IT")
                        tryCount = 0
                        continue
                    }
                    tryCount++
                }
                continue
            } else {
                err = proto.Unmarshal(msg, &repsdkmsg)
@@ -221,6 +251,8 @@
func Send(sdkid string, socket util.SocketContext, in chan protomsg.SdkMessage) {
    tryCount := 0
    for {
        select {
        case <-socket.Context.Done():
@@ -238,6 +270,19 @@
                    if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid {
                        logger.Error("failed send:sdkid=", sdkid)
                    }
                    if socket.UseSHM {
                        if tryCount > util.SHMMaxTryCount {
                            socket, err = util.NewSocketListen(socket.Mode, socket.URL, socket.UseSHM)
                            logger.Info("SDK SEND SHM TRY :", util.SHMMaxTryCount, " RESTART IT")
                            tryCount = 0
                            continue
                        }
                        tryCount++
                    }
                    continue
                }
                if "facedetect-sdk-no-track" == sdkid || "virtual-faceextract-sdk-pull" == sdkid {
taskpubsub
Binary files differ
util/util.go
@@ -4,14 +4,20 @@
    "basic.com/valib/deliver.git"
    "context"
    "errors"
    "github.com/pierrec/lz4"
    "basic.com/valib/logger.git"
)
// SHMMaxTryCount is the error-count threshold used by the Recv/Send loops of
// sockets running in shared-memory mode: once a loop's local try counter
// exceeds this value, the loop re-creates the socket through NewSocketListen
// and resets the counter to zero.
const SHMMaxTryCount = 3
// SocketContext bundles a deliver server endpoint with its cancellation
// context and the parameters it was created with, so that a worker loop can
// re-create the endpoint with the same settings.
type SocketContext struct {
    // Sock is the underlying deliver endpoint; worker loops call Sock.Recv on it.
    Sock    deliver.Deliver
    // Context is created by NewSocketListen via context.WithCancel; worker
    // loops watch Context.Done() to know when to stop.
    Context context.Context
    // Cancel cancels Context.
    Cancel  context.CancelFunc
    // Mode is the integer deliver mode passed to NewSocketListen (it is
    // converted to deliver.Mode when the server is created).
    Mode   int
    // URL is the address passed to NewSocketListen.
    URL    string
    // UseSHM records whether the socket was created in shared-memory mode;
    // worker loops check it before attempting to re-create the socket after
    // repeated errors.
    UseSHM bool
}
//  1. oldstring element is not in new  : abandon(delete)
@@ -48,40 +54,57 @@
// UnCompress uncompress
func UnCompress(in []byte) ([]byte, error) {
    out := make([]byte, 10*len(in))
    n, err := lz4.UncompressBlock(in, out)
    if err != nil {
        logger.Error("uncompress error: ", err)
        return nil, err
    }
    out = out[:n] // uncompressed data
    return out, nil
    return in, nil
    // out := make([]byte, 3*len(in))
    // n, err := lz4.UncompressBlock(in, out)
    // if err != nil {
    //     logger.Error("uncompress error: ", err)
    //     return nil, err
    // }
    // out = out[:n] // uncompressed data
    // return out, nil
}
// Compress compress
func Compress(in []byte) ([]byte, error) {
    out := make([]byte, len(in))
    ht := make([]int, 64<<10) // buffer for the compression table
    n, err := lz4.CompressBlock(in, out, ht)
    if err != nil {
        logger.Error("compress: ", err)
        return nil, err
    }
    if n >= len(in) {
        logger.Error("image is not compressible")
    }
    out = out[:n] // compressed data
    return out, nil
    return in, nil
    // out := make([]byte, len(in))
    // ht := make([]int, 64<<10) // buffer for the compression table
    // n, err := lz4.CompressBlock(in, out, ht)
    // if err != nil {
    //     logger.Error("compress: ", err)
    //     return nil, err
    // }
    // if n >= len(in) {
    //     logger.Error("image is not compressible")
    // }
    // out = out[:n] // compressed data
    // return out, nil
}
// create server
func NewSocketListen(mode int, url string) (socket SocketContext, err error) {
func NewSocketListen(mode int, url string, shm bool) (socket SocketContext, err error) {
    logger.Info("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    socket.Mode = mode
    socket.URL = url
    socket.UseSHM = shm
    if shm {
        var err error
        socket.Sock, err = deliver.NewServerWithError(deliver.Mode(mode), url)
        if err != nil {
            logger.Info("TASK CREATE SERVER SHM ERROR: ", err)
        }
    } else {
        socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    }
    if socket.Sock == nil {
        return socket, errors.New("create listen error")