package main
|
|
// #include <stdlib.h>
|
// #include <string.h>
|
import "C"
|
|
import (
|
"nsqCli/TST/test"
|
"nsqCli/nsqclient"
|
"sync"
|
"time"
|
"unsafe"
|
)
|
|
//export createProducer
|
func createProducer(addr string) unsafe.Pointer {
|
n, _ := nsqclient.NewProducer(addr)
|
return nsqclient.Save(n)
|
}
|
|
//export destroyProducer
|
func destroyProducer(ph unsafe.Pointer) {
|
nsqclient.Unref(ph)
|
nsqclient.DestroyProducerPool()
|
}
|
|
func pcvt(ph unsafe.Pointer) nsqclient.Producer {
|
return nsqclient.Restore(ph).(nsqclient.Producer)
|
}
|
|
//export publish
|
func publish(ph unsafe.Pointer, topic string, msg []byte) bool {
|
p := pcvt(ph)
|
if err := p.Publish(topic, msg); err != nil {
|
return false
|
}
|
return true
|
}
|
|
//export multiPublish
|
func multiPublish(ph unsafe.Pointer, topic string, msg [][]byte) bool {
|
p := pcvt(ph)
|
if err := p.MultiPublish(topic, msg); err != nil {
|
return false
|
}
|
return true
|
}
|
|
//export deferredPublish
|
func deferredPublish(ph unsafe.Pointer, topic string, ms int, msg []byte) bool {
|
p := pcvt(ph)
|
if err := p.DeferredPublish(topic, time.Duration(ms)*time.Millisecond, msg); err != nil {
|
return false
|
}
|
return true
|
}
|
|
/////////////////////////////////////////////////////////////
|
|
type consumer struct {
|
nsqcon *nsqclient.NsqConsumer
|
lck sync.Mutex
|
msgs [][]byte
|
}
|
|
//export createConsumer
|
func createConsumer(topic, channel string) unsafe.Pointer {
|
if c, err := nsqclient.NewNsqConsumer(nil, topic, channel); err == nil {
|
con := &consumer{
|
nsqcon: c,
|
}
|
return nsqclient.Save(con)
|
}
|
return nil
|
}
|
|
func ccvt(ch unsafe.Pointer) *consumer {
|
return nsqclient.Restore(ch).(*consumer)
|
}
|
|
//export destroyConsumer
|
func destroyConsumer(ch unsafe.Pointer) {
|
nsqclient.DestroyNsqConsumer(ccvt(ch).nsqcon)
|
nsqclient.Unref(ch)
|
}
|
|
//export Run
|
func Run(ch unsafe.Pointer, addr string) {
|
c := ccvt(ch)
|
c.nsqcon.AddHandler(func(msg []byte) error {
|
c.lck.Lock()
|
defer c.lck.Unlock()
|
c.msgs = append(c.msgs, msg)
|
return nil
|
})
|
|
c.nsqcon.Run(addr, 1)
|
}
|
|
//export RunLookupd
|
func RunLookupd(ch unsafe.Pointer, lookAddr string) {
|
c := ccvt(ch)
|
c.nsqcon.AddHandler(func(msg []byte) error {
|
c.lck.Lock()
|
defer c.lck.Unlock()
|
c.msgs = append(c.msgs, msg)
|
return nil
|
})
|
|
c.nsqcon.RunLookupd(lookAddr, 1)
|
}
|
|
//export getMessage
|
func getMessage(ch unsafe.Pointer, data *unsafe.Pointer, size *C.size_t) bool {
|
c := ccvt(ch)
|
c.lck.Lock()
|
defer c.lck.Unlock()
|
if len(c.msgs) == 0 {
|
return false
|
}
|
|
msg := c.msgs[0]
|
c.msgs = c.msgs[1:]
|
|
*size = C.size_t(len(msg))
|
ptr := C.malloc(*size)
|
C.memcpy(ptr, unsafe.Pointer(&msg[0]), *size)
|
*data = ptr
|
|
return true
|
}
|
|
//export relMessage
|
func relMessage(msg unsafe.Pointer) {
|
if msg != nil {
|
C.free(msg)
|
}
|
}
|
|
func main() {
|
test.Test()
|
}
|