zhangmeng
2019-08-27 fbafdc0bb9ace05f477a747e6c6744309008c027
update deliver
5个文件已修改
121 ■■■■ 已修改文件
deliver @ 020e17 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
profile/shmrecv.go 111 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
deliver
@@ -1 +1 @@
Subproject commit 20a4c4bfb5b9ea427f9117408ff0e4513ebef9eb
Subproject commit 020e17cc5311d091d713eb4fabae2a3d50944916
go.mod
@@ -4,6 +4,8 @@
require (
    basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93
    github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478
    github.com/funny/utest v0.0.0-20161029064919-43870a374500 // indirect
    github.com/gorilla/websocket v1.4.1 // indirect
    golang.org/x/sys v0.0.0-20190825160603-fb81701db80f
    nanomsg.org/go-mangos v1.4.0
go.sum
@@ -1,5 +1,9 @@
basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93 h1:KawyUKwFGIQIv043VGV1QvXOiU8aq7DEyB3QhAq+Syc=
basic.com/valib/shm.git v0.0.0-20190826090635-7db9aba5ca93/go.mod h1:yYRM7bM9y0KKd4IfNt3myjsvkFVFIIWNjsvK14tNbq4=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo=
github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg=
github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/sys v0.0.0-20190825160603-fb81701db80f h1:LCxigP8q3fPRGNVYndYsyHnF0zRrvcoVwZMfb8iQZe4=
main.go
@@ -64,7 +64,7 @@
func main() {
    flag.Parse()
    if server {
    if proto == recv {
        go func() {
            http.ListenAndServe("0.0.0.0:6061", nil)
        }()
profile/shmrecv.go
@@ -5,6 +5,7 @@
    "demo/deliver"
    "fmt"
    "os"
    "sync"
    "time"
)
@@ -54,8 +55,8 @@
            count++
            if elapse > 1e9 {
                fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
                    index, count, string(msg), len(msg), elapse)
                fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                    index, count, len(msg), elapse)
                elapse = 0
                count = 0
                t = 0
@@ -66,6 +67,72 @@
    }
}
func shmrecver2(ctx context.Context, c deliver.Deliver, index int, ch chan<- bool, pool *sync.Pool) {
    // var msg []byte
    // var err error
    var t int64
    var elapse int64
    count := 0
    for {
        select {
        case <-ctx.Done():
            return
        default:
            msg := *pool.Get().(*[]byte)
            err := c.Recv2(msg)
            if err != nil {
                c.Close()
                url := "hello"
                i, err := deliver.NewClientWithError(deliver.Shm, url)
                for {
                    if err == nil {
                        break
                    }
                    time.Sleep(1 * time.Second)
                    i, err = deliver.NewClientWithError(deliver.Shm, url)
                    fmt.Println("client create failed : ", err)
                }
                c = i
                pool.Put(&msg)
                fmt.Println("recv error : ", err)
                continue
            }
            if ch != nil {
                ch <- true
            }
            if t == 0 {
                t = time.Now().UnixNano()
            }
            elapse = time.Now().UnixNano() - t
            count++
            if elapse > 1e9 {
                fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
                    index, count, len(msg), elapse)
                elapse = 0
                count = 0
                t = 0
            }
            pool.Put(&msg)
        }
        // time.Sleep(10 * time.Millisecond)
    }
}
// chunkSize is 32 * 1024 * 1024 (33554432).
// NOTE(review): presumably the byte size of one receive buffer — the
// commented-out pool setup in this file allocates make([]byte, chunkSize);
// confirm against the live uses.
var chunkSize = 32 * 1024 * 1024
func Shmrecv(ctx context.Context, server bool, ipc string, count int) {
    blocks := 2
@@ -83,35 +150,6 @@
    } else {
        recvers(ctx, ipc, count, nil)
        return
        chWaiter := make(chan bool, count)
        cs := recvers(ctx, ipc, count, chWaiter)
        go func() {
            waitCount := 0
            for {
                select {
                case <-ctx.Done():
                    return
                case <-chWaiter:
                    waitCount = 0
                default:
                    if waitCount > 200*3 {
                        for _, v := range cs {
                            v.Close()
                        }
                        // cs = recvers(ctx, ipc, count, chWaiter)
                        // fmt.Println("restart recievers")
                        // waitCount = 0
                        // continue
                    }
                    time.Sleep(time.Millisecond * 5)
                    waitCount++
                }
            }
        }()
    }
}
@@ -125,6 +163,16 @@
        fmt.Println("wait for shm server start")
        time.Sleep(time.Second)
    }
    // pool := &sync.Pool{
    //     New: func() interface{} {
    //         b := make([]byte, chunkSize)
    //         return &b
    //     },
    // }
    // pool := slab.NewAtomPool(chunkSize, chunkSize, 2, chunkSize*8)
    var cs []deliver.Deliver
    for i := 0; i < count; i++ {
        c, err := deliver.NewClientWithError(deliver.Shm, url)
@@ -138,6 +186,7 @@
        }
        cs = append(cs, c)
        go shmrecver(ctx, c, i, ch)
        // go shmrecver2(ctx, c, i, ch, pool)
    }
    return cs
}