zhangmeng
2023-05-16 4244acad88afc44ae8a34a8fbd9e296780a0cc64
remove github.com nsq depend
4个文件已修改
51 ■■■■ 已修改文件
TST/ctest/ctest.cpp 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
TST/test/test.go 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/consumer.go 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
TST/ctest/ctest.cpp
@@ -82,12 +82,10 @@
int main(int argc, char const *argv[])
{
    thread([]{
        produce(false);
        produce(true);
    }).detach();
    // thread([]{
    //     consume("test2", "sensor01");
    // }).detach();
    thread([]{ consume("test2", "sensor01"); }).detach();
    consume("test", "sensor01");
TST/test/test.go
@@ -6,8 +6,6 @@
    "log"
    "nsqCli/nsqclient"
    "time"
    "github.com/nsqio/go-nsq"
)
func produce(two bool) {
@@ -49,14 +47,14 @@
        ch := make(chan struct{})
        count := 0
        c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
        c.AddHandler(func(data []byte) error {
            count++
            fmt.Println("recv msg ", string(msg.Body), " size", count)
            fmt.Println("recv msg ", string(data), " size", count)
            if count > 999000 {
                ch <- struct{}{}
            }
            return nil
        }))
        })
        // go c.Run("192.168.20.108:4150", 2)
        go c.RunLookupd("192.168.20.108:4161", 2)
@@ -69,8 +67,8 @@
}
func Test() {
    go produce(false)
    go produce(true)
    // go consume("test2", "sensor01")
    go consume("test2", "sensor01")
    consume("test", "sensor01")
}
main.go
@@ -10,8 +10,6 @@
    "sync"
    "time"
    "unsafe"
    "github.com/nsqio/go-nsq"
)
//export createProducer
@@ -62,7 +60,7 @@
type consumer struct {
    nsqcon *nsqclient.NsqConsumer
    lck    sync.Mutex
    msgs   []*nsq.Message
    msgs   [][]byte
}
//export createConsumer
@@ -89,12 +87,12 @@
//export Run
func Run(ch unsafe.Pointer, addr string) {
    c := ccvt(ch)
    c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
    c.nsqcon.AddHandler(func(msg []byte) error {
        c.lck.Lock()
        defer c.lck.Unlock()
        c.msgs = append(c.msgs, msg)
        return nil
    }))
    })
    c.nsqcon.Run(addr, 1)
}
@@ -102,12 +100,12 @@
//export RunLookupd
func RunLookupd(ch unsafe.Pointer, lookAddr string) {
    c := ccvt(ch)
    c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
    c.nsqcon.AddHandler(func(msg []byte) error {
        c.lck.Lock()
        defer c.lck.Unlock()
        c.msgs = append(c.msgs, msg)
        return nil
    }))
    })
    c.nsqcon.RunLookupd(lookAddr, 1)
}
@@ -124,9 +122,9 @@
    msg := c.msgs[0]
    c.msgs = c.msgs[1:]
    *size = C.size_t(len(msg.Body))
    *size = C.size_t(len(msg))
    ptr := C.malloc(*size)
    C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size)
    C.memcpy(ptr, unsafe.Pointer(&msg[0]), *size)
    *data = ptr
    return true
nsqclient/consumer.go
@@ -9,8 +9,9 @@
)
type NsqConsumer struct {
    consumer  *nsq.Consumer
    handler   nsq.Handler
    consumer *nsq.Consumer
    // handler   nsq.Handler
    handler   func([]byte) error
    ctx       context.Context
    ctxCancel context.CancelFunc
    topic     string
@@ -46,7 +47,11 @@
    }
}
func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
//     n.handler = handler
// }
func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
    n.handler = handler
}
@@ -60,7 +65,11 @@
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
    n.consumer.ChangeMaxInFlight(concurrency)
    n.consumer.AddConcurrentHandlers(n.handler, concurrency)
    // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
    n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
        return n.handler(msg.Body)
        // return nil
    }), concurrency)
    var err error
    if len(qAddr) > 0 {