zhangmeng
2020-01-21 93d62ff96dafca109b426e397396ad3a46cf295d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main
 
import (
    "context"
    "time"
 
    "basic.com/valib/pubsub.git"
)
 
func wait(ctx context.Context, c chan pubsub.Message, out chan<- []byte) {
    for {
        select {
        case <-ctx.Done():
            return
        case msg := <-c:
            out <- msg.Msg
        default:
            time.Sleep(time.Second)
        }
    }
}
 
// Fetch Fetch from tcp://192.168.5.22:4005
func Fetch(ctx context.Context, url, heartBeatURL string, mode int, processID string, out chan<- []byte, fn func(...interface{})) {
    topics := []string{pubsub.Topic_Sdk}
    p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
    for {
        if err == nil {
            break
        }
        fn("libcomm.so, pubsub subscribe error: ", err)
        time.Sleep(time.Second)
        p, err = pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
    }
 
    c := p.Recv()
 
    go wait(ctx, c, out)
}