package main // #include // #include import "C" import ( "basic.com/aps/nsqclient.git" "gonsqcli/TST/test" "sync" "time" "unsafe" ) //export createProducer func createProducer(addr string) unsafe.Pointer { n, _ := nsqclient.NewProducer(addr) return nsqclient.Save(n) } //export destroyProducer func destroyProducer(ph unsafe.Pointer) { nsqclient.Unref(ph) nsqclient.DestroyProducerPool() } func pcvt(ph unsafe.Pointer) nsqclient.Producer { return nsqclient.Restore(ph).(nsqclient.Producer) } //export publish func publish(ph unsafe.Pointer, topic string, msg []byte) bool { p := pcvt(ph) if err := p.Publish(topic, msg); err != nil { return false } return true } //export multiPublish func multiPublish(ph unsafe.Pointer, topic string, msg [][]byte) bool { p := pcvt(ph) if err := p.MultiPublish(topic, msg); err != nil { return false } return true } //export deferredPublish func deferredPublish(ph unsafe.Pointer, topic string, ms int, msg []byte) bool { p := pcvt(ph) if err := p.DeferredPublish(topic, time.Duration(ms)*time.Millisecond, msg); err != nil { return false } return true } ///////////////////////////////////////////////////////////// type consumer struct { nsqcon *nsqclient.NsqConsumer lck sync.Mutex msgs [][]byte } //export createConsumer func createConsumer(topic, channel string) unsafe.Pointer { if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil { con := &consumer{ nsqcon: c, } return nsqclient.Save(con) } return nil } func ccvt(ch unsafe.Pointer) *consumer { return nsqclient.Restore(ch).(*consumer) } //export destroyConsumer func destroyConsumer(ch unsafe.Pointer) { nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon) nsqclient.Unref(ch) } //export Run func Run(ch unsafe.Pointer, addr string) { c := ccvt(ch) c.nsqcon.AddHandler(func(msg []byte) error { c.lck.Lock() defer c.lck.Unlock() c.msgs = append(c.msgs, msg) return nil }) c.nsqcon.Run(addr, 1) } //export RunLookupd func RunLookupd(ch unsafe.Pointer, lookAddr string) { c := ccvt(ch) c.nsqcon.AddHandler(func(msg []byte) error { c.lck.Lock() defer c.lck.Unlock() c.msgs = append(c.msgs, msg) return nil }) c.nsqcon.RunLookupd(lookAddr, 1) } //export getMessage func getMessage(ch unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) bool { c := ccvt(ch) c.lck.Lock() defer c.lck.Unlock() if len(c.msgs) == 0 { return false } msg := c.msgs[0] c.msgs = c.msgs[1:] *size = C.size_t(len(msg)) ptr := C.malloc(*size) C.memcpy(ptr, unsafe.Pointer(&msg[0]), *size) *data = ptr return true } //export relMessage func relMessage(msg unsafe.Pointer) { if msg != nil { C.free(msg) } } func main() { test.Test() }