#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "clib/libnsqclient.h"

#include <chrono>
#include <mutex>
#include <string>
#include <thread>
|
|
using namespace std;
|
|
static void produce(int two){
|
char ip[] = "192.168.20.108:4150";
|
GoString addr = {ip, (ptrdiff_t)strlen(ip)};
|
void* p = createProducer(addr);
|
|
string msg("cnsqclient dynamic library");
|
while(msg.size() < 32){
|
msg += msg;
|
}
|
// printf("msg %s\n", msg.c_str());
|
|
for(int i = 0; i < 1000000; i++){
|
GoString topic = {"test", 4};
|
string amsg = msg + "-x";
|
GoSlice data{(void*)amsg.data(), (GoInt)amsg.size(), (GoInt)amsg.size()};
|
if (!publish(p, topic, data)){
|
printf("publish msg failed topic %s\n", topic.p);
|
exit(0);
|
}
|
|
if (two){
|
topic.p = "test2";
|
topic.n = 5;
|
amsg = msg + "-y";
|
data.data = (void*)amsg.data();
|
if (!publish(p, topic, data)){
|
printf("publish msg failed topic %s\n", topic.p);
|
exit(0);
|
}
|
}
|
|
}
|
destroyProducer(p);
|
}
|
|
|
static void consume(const char* topic, const char* channel){
|
GoString t = {topic, (ptrdiff_t)strlen(topic)};
|
GoString c = {channel, (ptrdiff_t)strlen(channel)};
|
|
void* con = createConsumer(t, c);
|
|
|
// thread
|
thread([&con]{
|
|
// char ip[] = "192.168.20.108:4150";
|
// GoString addr = {ip, (ptrdiff_t)strlen(ip)};
|
// Run(con, addr);
|
|
char lip[] = "192.168.20.108:4161";
|
GoString laddr = {lip, (ptrdiff_t)strlen(lip)};
|
RunLookupd(con, laddr);
|
|
}).detach();
|
|
auto start = chrono::steady_clock::now();
|
int count = 0;
|
while (true) {
|
void* msg = NULL;
|
size_t size = 0;
|
GoUint8 ok = getMessage(con, &msg, &size);
|
if (!ok){
|
this_thread::sleep_for(chrono::milliseconds(100));
|
continue;
|
}
|
count++;
|
printf("======>> recv msg %s size %d\n", (char*)msg, count);
|
relMessage(msg);
|
if (count > 999000){
|
printf("======>> use time %ld\n",
|
chrono::duration_cast<chrono::seconds>(chrono::steady_clock::now()-start).count());
|
}
|
}
|
printf("======>> recv all msg size %d\n", count);
|
}
|
|
int main(int argc, char const *argv[])
|
{
|
bool two = false;
|
|
thread([two]{
|
produce(two);
|
}).detach();
|
|
if (two) thread([]{ consume("test2", "sensor01"); }).detach();
|
|
consume("test", "sensor01");
|
|
return 0;
|
}
|