龙赣华
2019-05-20 c7c180fcbe012173e56bde6141ae4b2ed3827ab4
add common socket protocol
5个文件已修改
213 ■■■■■ 已修改文件
camera/camera.go 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sdk/sdk.go 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test 补丁 | 查看 | 原始文档 | blame | 历史
camera/camera.go
@@ -1,13 +1,11 @@
package camera
import (
    "errors"
    "github.com/long/test/deliver"
    "github.com/long/test/httpclient"
    "github.com/long/test/sdk"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pair"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "context"
    "encoding/json"
@@ -30,7 +28,7 @@
}
type SocketContext struct {
    Sock    mangos.Socket
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
}
@@ -38,7 +36,6 @@
func Taskdolist(cid string, taskid string, data []byte) {
    fmt.Println("======================================")
    fmt.Println()
    //  数据加工(打标签)
    sdkmsg := sdk.SdkData(cid, taskid, data)
    if sdkmsg.Tasklab == nil {
@@ -64,10 +61,12 @@
    fmt.Println()
    fmt.Println("cid,taskid , sid: ", CtsId)
    _, socket, err := NewCamerSocketListen("init", "tcp", "192.168.1.124", 0)
    url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPort)
    _, socket, err := NewCamerSocketListen(deliver.Pair, "init", url)
    if err != nil {
        return
    }
    UrlPort++
    go SendRecv(socket, Initchannel)
@@ -87,96 +86,37 @@
    }
}
func Recv(cameraid string, socket SocketContext) {
    socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
    var msg []byte
    var err error
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("listen recv quit")
            return
        default:
            if msg, err = socket.Sock.Recv(); err != nil {
                //fmt.Printf("%s ", err)
                continue
            } else {
                for _, taskid := range GetAlltask(cameraid) {
                    go Taskdolist(cameraid, taskid, msg)
                    fmt.Println("receive: ", len(msg), "cameraid: ", cameraid, "taskid: ", taskid)
                }
            }
        }
    }
}
func send(socket SocketContext, cam Camerdata) {
    b, err := json.Marshal(cam)
    if err != nil {
        fmt.Println("can not json convert !", cam)
    }
    if err := socket.Sock.Send(b); err != nil {
        fmt.Println("failed send")
        fmt.Println("camera info: failed send")
    }
}
func SendRecv(socket SocketContext, camera chan Camerdata) {
    socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
    //socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
    for cam := range camera {
        send(socket, cam)
        if _, ok := SocketManage[cam.Cameraid]; !ok {
            go func(id string) {
            go func(id string, urlport int) {
                fmt.Println("create cid server: ", id)
                cid, socketlisten, _ := NewCamerSocketListen(id, "tcp", "192.168.1.124", 0)
                url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", urlport)
                cid, socketlisten, err := NewCamerSocketListen(deliver.PushPull, id, url)
                if err != nil {
                    return
                }
                fmt.Println("input id: ", id, " output id :", cid)
                Recv(cid, socketlisten)
            }(cam.Cameraid)
        }
    }
}
            }(cam.Cameraid, UrlPort)
// new camera server
func NewCamerSocketListen(cameraid string, protocol string, ip string, port int) (cid string, socket SocketContext, err error) {
    ctx, cancel := context.WithCancel(context.Background())
    var url string
    socket.Context = ctx
    socket.Cancel = cancel
    switch protocol {
    case "tcp":
        if port == 0 {
            port = UrlPort
            UrlPort++
        }
        url = fmt.Sprintf("%s://%s:%d", protocol, ip, port)
    case "ipc":
        url = fmt.Sprintf("%s://%s", cameraid)
    }
    fmt.Println(url)
    if socket.Sock, err = pair.NewSocket(); err != nil {
        fmt.Println(cameraid, "can't get new pair socket: ", err.Error())
        return cameraid, socket, err
    }
    socket.Sock.AddTransport(tcp.NewTransport())
    socket.Sock.AddTransport(ipc.NewTransport())
    socket.Sock.SetOption(mangos.OptionMaxRecvSize, 32*1024*1024)
    socket.Sock.SetOption(mangos.OptionWriteQLen, 10)
    socket.Sock.SetOption(mangos.OptionReadQLen, 10)
    if err = socket.Sock.Listen(url); err != nil {
        fmt.Println("socket lisnte error ", cameraid)
    }
    fmt.Println(cameraid, "success")
    SocketManage[cameraid] = socket
    return cameraid, socket, err
}
// 获取 cid , taskid, sdkid 关系
@@ -198,3 +138,46 @@
    }
    return
}
// create server
func NewCamerSocketListen(mode int, cameraid string, url string) (cid string, socket SocketContext, err error) {
    fmt.Println("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    socket.Context = ctx
    socket.Cancel = cancel
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    fmt.Println("new socket.Sock: ", socket.Sock)
    if socket.Sock == nil {
        return cameraid, socket, errors.New("create listen error")
    }
    SocketManage[cameraid] = socket
    return cameraid, socket, nil
}
func Recv(cameraid string, socket SocketContext) {
    //    socket.Sock.SetOption(mangos.OptionRecvDeadline, 5*time.Second)
    var msg []byte
    var err error
    for {
        select {
        case <-socket.Context.Done():
            fmt.Println("listen recv quit")
            return
        default:
            if msg, err = socket.Sock.Recv(); err != nil {
                //fmt.Printf("%s ", err)
                continue
            } else {
                fmt.Println("cameraid: ", len(msg))
                for _, taskid := range GetAlltask(cameraid) {
                    Taskdolist(cameraid, taskid, msg)
                    //    fmt.Println("receive: ", len(msg), "cameraid: ", cameraid, "taskid: ", taskid)
                }
            }
        }
    }
}
go.mod
@@ -5,6 +5,7 @@
require (
    github.com/Microsoft/go-winio v0.4.12 // indirect
    github.com/golang/protobuf v1.3.1
    github.com/gorilla/websocket v1.4.0 // indirect
    golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be // indirect
    nanomsg.org/go-mangos v1.4.0
)
go.sum
@@ -2,6 +2,8 @@
github.com/Microsoft/go-winio v0.4.12/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be h1:mI+jhqkn68ybP0ORJqunXn+fq+Eeb4hHKqLQcFICjAc=
golang.org/x/sys v0.0.0-20190418153312-f0ce4c0180be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
sdk/sdk.go
@@ -2,26 +2,23 @@
import (
    "context"
    "errors"
    "fmt"
    "time"
    "github.com/long/test/httpclient"
    "github.com/long/test/protomsg"
    "github.com/long/test/tasktag"
    "github.com/long/test/util"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pair"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "github.com/golang/protobuf/proto"
    "github.com/long/test/deliver"
)
//var doOnce sync.Once
var SocketManage = make(map[string]SocketContext)
type SocketContext struct {
    Sock    mangos.Socket
    Sock    deliver.Deliver
    Context context.Context
    Cancel  context.CancelFunc
}
@@ -34,16 +31,14 @@
    SdkCreateTopic(sdklist) // 创建主题
    for _, sdkid := range sdklist { // 创建sdk server
        sdkid, socket, err := NewSdkListen(sdkid, "tcp", "192.168.1.124", 0)
        url := fmt.Sprintf("%s%d", "tcp://192.168.1.124:", UrlPort)
        sdkid, socket, err := NewSdkSocketListen(deliver.PushPull, sdkid, url)
        if err != nil {
            continue
        }
        // 接受管道数据 ==》 发送给 对应的进程
        go send(sdkid, socket, SdkMap[sdkid])
        //从对应进程接受数据 == 》 重新送回到管道
        go Recv(socket)
        go Send(sdkid, socket, SdkMap[sdkid])
        //break
        //    Recv(socket)
    }
    go es(SdkMap["es"])
@@ -129,50 +124,28 @@
var UrlPort = 9000
// 创建 sdk server  listen
func NewSdkListen(sdkid string, protocol string, ip string, port int) (sid string, socket SocketContext, err error) {
// create server
func NewSdkSocketListen(mode int, sdkid string, url string) (sid string, socket SocketContext, err error) {
    fmt.Println("url is: ", url)
    ctx, cancel := context.WithCancel(context.Background())
    var url string
    socket.Context = ctx
    socket.Cancel = cancel
    switch protocol {
    case "tcp":
        if port == 0 {
            port = UrlPort
            UrlPort++
        }
        url = fmt.Sprintf("%s://%s:%d", protocol, ip, port)
    socket.Sock = deliver.NewServer(deliver.Mode(mode), url)
    fmt.Println(sdkid, socket.Sock)
    case "ipc":
        url = fmt.Sprintf("%s://%s", sdkid)
    }
    fmt.Printf("sdkid= %s  url=%s\n", sdkid, url)
    if socket.Sock, err = pair.NewSocket(); err != nil {
        fmt.Println(sdkid, "can't get new pair socket: ", err.Error())
        return sdkid, socket, err
    if socket.Sock == nil {
        return sdkid, socket, errors.New("create listen error")
    }
    socket.Sock.SetOption(mangos.OptionMaxRecvSize, 32*1024*1024)
    socket.Sock.SetOption(mangos.OptionWriteQLen, 10)
    socket.Sock.SetOption(mangos.OptionReadQLen, 10)
    socket.Sock.AddTransport(tcp.NewTransport())
    socket.Sock.AddTransport(ipc.NewTransport())
    if err = socket.Sock.Listen(url); err != nil {
        fmt.Println("socket lisnte error ", sdkid)
    }
    SocketManage[sdkid] = socket
    return sdkid, socket, err
    UrlPort++
    return sdkid, socket, nil
}
func Recv(socket SocketContext) {
    socket.Sock.SetOption(mangos.OptionRecvDeadline, 1*time.Second)
    //socket.Sock.SetOption(mangos.OptionRecvDeadline, 1*time.Second)
    var repsdkmsg = &protomsg.SdkMessage{}
    for {
@@ -198,7 +171,7 @@
    }
}
func send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
func Send(sdkid string, socket SocketContext, in chan *protomsg.SdkMessage) {
    var v *protomsg.SdkMessage
    for {
@@ -211,11 +184,17 @@
                fmt.Println("proto marshal error ", err)
            }
            fmt.Printf("从管道sdkid=%s 接受数据 %d\n", sdkid, len(data))
            fmt.Println()
            fmt.Println("send len of data: ", len(data))
            if err := socket.Sock.Send(data); err != nil {
                fmt.Println("failed send")
            }
            fmt.Printf("sdkid = %s ,send success:%d \n", sdkid, len(data))
            //    go func(sock SocketContext) {
            //        doOnce.Do(func() { Recv(sock) })
            //    }(socket)
        }
    }
}
test
Binary files differ