zhangzengfei
2023-10-19 bf2b61519fd0d79ddb19f0469749fbbe1d6c4ad8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package nsq
 
import (
    "apsClient/conf"
    "apsClient/pkg/logx"
    "apsClient/pkg/nsqclient"
)
 
var producer nsqclient.Producer
 
func GetProducer() nsqclient.Producer {
    return producer
}
 
func initProducer() (err error) {
    producer, err = nsqclient.NewProducer(conf.Conf.NsqConf.NsqdAddr)
    if err != nil {
        logx.Errorf("NewProducer err:%v", err)
        return err
    }
    ////测试发布数据
    //go func() {
    //    for {
    //        time.Sleep(time.Second * 2)
    //        _ = producer.Publish("test", []byte("123"))
    //    }
    //}()
    //
    //go func() {
    //    testH := &testHand{}
    //    c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel1")
    //    c.AddHandler(testH.HandleMessage)
    //    c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
    //    if err != nil {
    //        return
    //    }
    //}()
    //go func() {
    //    testH := &testHand2{}
    //    c, err := nsqclient.NewNsqConsumer(context.Background(), "test", "channel2")
    //    c.AddHandler(testH.HandleMessage)
    //    c.Run(conf.Conf.NsqConf.NsqdAddr, 1)
    //    if err != nil {
    //        return
    //    }
    //}()
    return nil
}
 
//// 测试消费数据
//type testHand struct {
//}
//
//func (slf *testHand) HandleMessage(data []byte) (err error) {
//    fmt.Println("testHand1", string(data))
//    return err
//}
//
//type testHand2 struct {
//}
//
//func (slf *testHand2) HandleMessage(data []byte) (err error) {
//    fmt.Println("testHand2", string(data))
//    return err
//}