package main /* #include "bh_api_go.h" */ // #cgo LDFLAGS: -L/home/lichao/code/shmsg/build/lib -L/usr/local/lib -lbhome_shmq -lbhome_msg -lprotobuf-lite -lstdc++ -lpthread -lrt import "C" import ( bh "bhome_node/bhome_msg" "fmt" "time" "unsafe" ) func BHApiIn1Out1(bhfunc C.FBHApiIn1Out1, data []byte, reply *bh.MsgCommonReply, timeout_ms int) bool { creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) r := C.BHApiIn1Out1Proxy(bhfunc, unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0 if r { reply.Unmarshal(C.GoBytes(creply, creply_len)) } return r } func Register(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool { data, _ := proc.Marshal() return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegister), data, reply, timeout_ms) } func RegisterTopics(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { data, _ := topics.Marshal() return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHRegisterTopics), data, reply, timeout_ms) } func Subscribe(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { data, _ := topics.Marshal() return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms) } func Heartbeat(topics *bh.MsgTopicList, reply *bh.MsgCommonReply, timeout_ms int) bool { data, _ := topics.Marshal() return BHApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms) } func HeartbeatEasy(timeout_ms int) bool { return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0 } func Publish(pub *bh.MsgPublish, timeout_ms int) bool { data, _ := pub.Marshal() return C.BHPublish(unsafe.Pointer(&data[0]), C.int(len(data)), C.int(timeout_ms)) > 0 } func ReadSub(proc_id *string, pub *bh.MsgPublish, timeout_ms int) bool { cpid := unsafe.Pointer(nil) cpid_len := C.int(0) defer C.BHFree(cpid, cpid_len) creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) r := C.BHReadSub(&cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0 *proc_id = string(C.GoBytes(cpid, cpid_len)) pub.Unmarshal(C.GoBytes(creply, creply_len)) return r } func AsyncRequest(req *bh.MsgRequestTopic, msg_id *[]byte) bool { data, _ := req.Marshal() creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) r := C.BHAsyncRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &creply, &creply_len) > 0 if r { *msg_id = C.GoBytes(creply, creply_len) } return r } func Request(req *bh.MsgRequestTopic, proc_id *string, reply *bh.MsgRequestTopicReply, timeout_ms int) bool { data, _ := req.Marshal() cpid := unsafe.Pointer(nil) cpid_len := C.int(0) defer C.BHFree(cpid, cpid_len) creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) r := C.BHRequest(unsafe.Pointer(&data[0]), C.int(len(data)), &cpid, &cpid_len, &creply, &creply_len, C.int(timeout_ms)) > 0 if r { *proc_id = string(C.GoBytes(cpid, cpid_len)) reply.Unmarshal(C.GoBytes(creply, creply_len)) } return r } func ReadRequest(proc_id *string, req *bh.MsgRequestTopic, psrc *unsafe.Pointer, timeout_ms int) bool { cpid := unsafe.Pointer(nil) cpid_len := C.int(0) defer C.BHFree(cpid, cpid_len) creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) r := C.BHReadRequest(&cpid, &cpid_len, &creply, &creply_len, psrc, C.int(timeout_ms)) > 0 if r { *proc_id = string(C.GoBytes(cpid, cpid_len)) req.Unmarshal(C.GoBytes(creply, creply_len)) } return r } func SendReply(src unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool { data, _ := rep.Marshal() return C.BHSendReply(src, unsafe.Pointer(&data[0]), C.int(len(data))) > 0 } func GetLastError() (int, string) { creply := unsafe.Pointer(nil) creply_len := C.int(0) defer C.BHFree(creply, creply_len) r := C.BHGetLastError(&creply, &creply_len) return int(r), string(C.GoBytes(creply, creply_len)) } type ServerCB func(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) type SubDataCB func(proc_id *string, pub *bh.MsgPublish) var cgoServerCB ServerCB var cgoClientCB ClientCB var cgoSubDataCB SubDataCB //export CGoSubDataCallback func CGoSubDataCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int) { proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len)) msg := bh.MsgPublish{} msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len)) cgoSubDataCB(&proc_id, &msg) } //export CGoServerCallback func CGoServerCallback(cpid C.PCVoid, pid_len C.int, data C.PCVoid, data_len C.int, src unsafe.Pointer) { proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len)) msg := bh.MsgRequestTopic{} msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len)) cgoServerCB(src, &proc_id, &msg) } //export CGoClientCallback func CGoClientCallback(cpid C.PCVoid, pid_len C.int, msgid C.PCVoid, msgid_len C.int, data C.PCVoid, data_len C.int) { proc_id := string(C.GoBytes(unsafe.Pointer(cpid), pid_len)) msg_id := C.GoBytes(unsafe.Pointer(msgid), msgid_len) var msg bh.MsgRequestTopicReply msg.Unmarshal(C.GoBytes(unsafe.Pointer(data), data_len)) cgoClientCB(&proc_id, &msg_id, &msg) } func StartWorker(c ClientCB, s ServerCB, sub SubDataCB) { cgoClientCB = c cgoServerCB = s cgoSubDataCB = sub C.CGoStartWorker() } ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // user code: func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) { fmt.Println("user server cb called, request topic: " + string(req.Topic) + ", data:" + string(req.Data)) reply := bh.MsgRequestTopicReply{} reply.Data = []byte("reply 1234") SendReply(src, &reply) } func SubDataCallback(proc_id *string, pub *bh.MsgPublish) { fmt.Println("user sub data cb called") } func ClientCallback(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply) { fmt.Println("user client cb reply: " + string(reply.Data)) } func main() { proc_id := "test_proc" proc := bh.ProcInfo{} proc.ProcId = []byte(proc_id) reply := bh.MsgCommonReply{} StartWorker(ClientCallback, ServerCallback, SubDataCallback) r := Register(&proc, &reply, 1000) if r { fmt.Println("register ok") } else { fmt.Println("register failed") return } r = HeartbeatEasy(1000) if r { fmt.Println("heartbeat ok") } else { fmt.Println("heartbeat failed") } topics := bh.MsgTopicList{} topics.TopicList = append(topics.TopicList, []byte("topic0"), []byte("topic1")) RegisterTopics(&topics, &reply, 0) if r { fmt.Println("reg topics ok") } else { fmt.Println("reg topics failed") } req := bh.MsgRequestTopic{} time.Sleep(time.Second * 1) req.Topic = []byte("topic0") req.Data = []byte("data0") // var msg_id []byte // AsyncRequest(&req, &msg_id) // fmt.Println(msg_id) // time.Sleep(time.Second * 5) pid := "" rr := bh.MsgRequestTopicReply{} for i := 0; i < 10000; i++ { if Request(&req, &pid, &rr, 3000) { fmt.Println("server:" + pid + ", reply:" + string(rr.Data)) } else { e, s := GetLastError() fmt.Println("ec:", e, ", msg:"+s) } } }