remove github.com nsq depend
| | |
| | | int main(int argc, char const *argv[]) |
| | | { |
| | | thread([]{ |
| | | produce(false); |
| | | produce(true); |
| | | }).detach(); |
| | | |
| | | // thread([]{ |
| | | // consume("test2", "sensor01"); |
| | | // }).detach(); |
| | | thread([]{ consume("test2", "sensor01"); }).detach(); |
| | | |
| | | consume("test", "sensor01"); |
| | | |
| | |
| | | "log" |
| | | "nsqCli/nsqclient" |
| | | "time" |
| | | |
| | | "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | func produce(two bool) { |
| | |
| | | ch := make(chan struct{}) |
| | | |
| | | count := 0 |
| | | c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | c.AddHandler(func(data []byte) error { |
| | | count++ |
| | | fmt.Println("recv msg ", string(msg.Body), " size", count) |
| | | fmt.Println("recv msg ", string(data), " size", count) |
| | | if count > 999000 { |
| | | ch <- struct{}{} |
| | | } |
| | | return nil |
| | | })) |
| | | }) |
| | | // go c.Run("192.168.20.108:4150", 2) |
| | | go c.RunLookupd("192.168.20.108:4161", 2) |
| | | |
| | |
| | | } |
| | | |
| | | func Test() { |
| | | go produce(false) |
| | | go produce(true) |
| | | |
| | | // go consume("test2", "sensor01") |
| | | go consume("test2", "sensor01") |
| | | consume("test", "sensor01") |
| | | } |
| | |
| | | "sync" |
| | | "time" |
| | | "unsafe" |
| | | |
| | | "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | //export createProducer |
| | |
| | | type consumer struct { |
| | | nsqcon *nsqclient.NsqConsumer |
| | | lck sync.Mutex |
| | | msgs []*nsq.Message |
| | | msgs [][]byte |
| | | } |
| | | |
| | | //export createConsumer |
| | |
| | | //export Run |
| | | func Run(ch unsafe.Pointer, addr string) { |
| | | c := ccvt(ch) |
| | | c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | c.nsqcon.AddHandler(func(msg []byte) error { |
| | | c.lck.Lock() |
| | | defer c.lck.Unlock() |
| | | c.msgs = append(c.msgs, msg) |
| | | return nil |
| | | })) |
| | | }) |
| | | |
| | | c.nsqcon.Run(addr, 1) |
| | | } |
| | |
| | | //export RunLookupd |
| | | func RunLookupd(ch unsafe.Pointer, lookAddr string) { |
| | | c := ccvt(ch) |
| | | c.nsqcon.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | c.nsqcon.AddHandler(func(msg []byte) error { |
| | | c.lck.Lock() |
| | | defer c.lck.Unlock() |
| | | c.msgs = append(c.msgs, msg) |
| | | return nil |
| | | })) |
| | | }) |
| | | |
| | | c.nsqcon.RunLookupd(lookAddr, 1) |
| | | } |
| | |
| | | msg := c.msgs[0] |
| | | c.msgs = c.msgs[1:] |
| | | |
| | | *size = C.size_t(len(msg.Body)) |
| | | *size = C.size_t(len(msg)) |
| | | ptr := C.malloc(*size) |
| | | C.memcpy(ptr, unsafe.Pointer(&msg.Body[0]), *size) |
| | | C.memcpy(ptr, unsafe.Pointer(&msg[0]), *size) |
| | | *data = ptr |
| | | |
| | | return true |
| | |
| | | ) |
| | | |
| | | type NsqConsumer struct { |
| | | consumer *nsq.Consumer |
| | | handler nsq.Handler |
| | | consumer *nsq.Consumer |
| | | // handler nsq.Handler |
| | | handler func([]byte) error |
| | | ctx context.Context |
| | | ctxCancel context.CancelFunc |
| | | topic string |
| | |
| | | } |
| | | } |
| | | |
| | | func (n *NsqConsumer) AddHandler(handler nsq.Handler) { |
| | | // func (n *NsqConsumer) AddHandler(handler nsq.Handler) { |
| | | // n.handler = handler |
| | | // } |
| | | |
| | | func (n *NsqConsumer) AddHandler(handler func([]byte) error) { |
| | | n.handler = handler |
| | | } |
| | | |
| | |
| | | |
| | | func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { |
| | | n.consumer.ChangeMaxInFlight(concurrency) |
| | | n.consumer.AddConcurrentHandlers(n.handler, concurrency) |
| | | // n.consumer.AddConcurrentHandlers(n.handler, concurrency) |
| | | n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | return n.handler(msg.Body) |
| | | // return nil |
| | | }), concurrency) |
| | | |
| | | var err error |
| | | if len(qAddr) > 0 { |