package main // #include // #include import "C" import ( "nsqclient/TST/test" "nsqclient/nsqcli" "sync" "time" "unsafe" ) //export createProducer func createProducer(addr string) unsafe.Pointer { n, _ := nsqcli.NewProducer(addr) return nsqcli.Save(n) } //export destroyProducer func destroyProducer(ph unsafe.Pointer) { nsqcli.Unref(ph) nsqcli.DestroyProducerPool() } func pcvt(ph unsafe.Pointer) nsqcli.Producer { return nsqcli.Restore(ph).(nsqcli.Producer) } //export publish func publish(ph unsafe.Pointer, topic string, msg []byte) bool { p := pcvt(ph) if err := p.Publish(topic, msg); err != nil { return false } return true } //export multiPublish func multiPublish(ph unsafe.Pointer, topic string, msg [][]byte) bool { p := pcvt(ph) if err := p.MultiPublish(topic, msg); err != nil { return false } return true } //export deferredPublish func deferredPublish(ph unsafe.Pointer, topic string, ms int, msg []byte) bool { p := pcvt(ph) if err := p.DeferredPublish(topic, time.Duration(ms)*time.Millisecond, msg); err != nil { return false } return true } ///////////////////////////////////////////////////////////// type consumer struct { nsqcon *nsqcli.NsqConsumer lck sync.Mutex msgs [][]byte } //export createConsumer func createConsumer(topic, channel string) unsafe.Pointer { if c, err := nsqcli.NewNsqConsumer(nil, topic, channel); err == nil { con := &consumer{ nsqcon: c, } return nsqcli.Save(con) } return nil } func ccvt(ch unsafe.Pointer) *consumer { return nsqcli.Restore(ch).(*consumer) } //export destroyConsumer func destroyConsumer(ch unsafe.Pointer) { nsqcli.DestroyNsqConsumer(ccvt(ch).nsqcon) nsqcli.Unref(ch) } //export Run func Run(ch unsafe.Pointer, addr string) { c := ccvt(ch) c.nsqcon.AddHandler(func(msg []byte) error { c.lck.Lock() defer c.lck.Unlock() c.msgs = append(c.msgs, msg) return nil }) c.nsqcon.Run(addr, 1) } //export RunLookupd func RunLookupd(ch unsafe.Pointer, lookAddr string) { c := ccvt(ch) c.nsqcon.AddHandler(func(msg []byte) error { c.lck.Lock() defer c.lck.Unlock() c.msgs = append(c.msgs, msg) return nil }) c.nsqcon.RunLookupd(lookAddr, 1) } //export getMessage func getMessage(ch unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) bool { c := ccvt(ch) c.lck.Lock() defer c.lck.Unlock() if len(c.msgs) == 0 { return false } msg := c.msgs[0] c.msgs = c.msgs[1:] *size = C.size_t(len(msg)) ptr := C.malloc(*size) C.memcpy(ptr, unsafe.Pointer(&msg[0]), *size) *data = ptr return true } //export relMessage func relMessage(msg unsafe.Pointer) { if msg != nil { C.free(msg) } } func main() { test.Test() }