#include #include #include "clib/libnsqclient.h" #include #include #include using namespace std; static void produce(int two){ char ip[] = "192.168.20.108:4150"; GoString addr = {ip, (ptrdiff_t)strlen(ip)}; void* p = createProducer(addr); string msg("cnsqclient dynamic library"); while(msg.size() < 32){ msg += msg; } // printf("msg %s\n", msg.c_str()); for(int i = 0; i < 1000000; i++){ GoString topic = {"test", 4}; string amsg = msg + "-x"; GoSlice data{(void*)amsg.data(), (GoInt)amsg.size(), (GoInt)amsg.size()}; if (!publish(p, topic, data)){ printf("publish msg failed topic %s\n", topic.p); exit(0); } if (two){ topic.p = "test2"; topic.n = 5; amsg = msg + "-y"; data.data = (void*)amsg.data(); if (!publish(p, topic, data)){ printf("publish msg failed topic %s\n", topic.p); exit(0); } } } destroyProducer(p); } static void consume(const char* topic, const char* channel){ GoString t = {topic, (ptrdiff_t)strlen(topic)}; GoString c = {channel, (ptrdiff_t)strlen(channel)}; void* con = createConsumer(t, c); char ip[] = "192.168.20.108:4150"; GoString addr = {ip, (ptrdiff_t)strlen(ip)}; // thread thread([&con,&addr]{ Run(con, addr); }).detach(); auto start = chrono::steady_clock::now(); int count = 0; while (true) { void* msg = NULL; size_t size = 0; GoUint8 ok = getMessage(con, &msg, &size); if (!ok){ this_thread::sleep_for(chrono::milliseconds(100)); continue; } count++; printf("======>> recv msg %s size %d\n", (char*)msg, count); relMessage(msg); if (count > 999000){ printf("======>> use time %d\n", chrono::duration_cast(chrono::steady_clock::now()-start).count()); } } printf("======>> recv all msg size %d\n", count); } int main(int argc, char const *argv[]) { thread([]{ produce(false); }).detach(); // thread([]{ // consume("test2", "sensor01"); // }).detach(); consume("test", "sensor01"); return 0; }