From 365c864a587365fe443b11cc0cd7cfc8f8f8eb81 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: Tue, 01 Jun 2021 11:19:22 +0800
Subject: [PATCH] refactor: clean up unused code.
---
src/robust.h | 187 ----------
 utest/speed_test.cpp | 220 ------------
utest/simple_tests.cpp | 18
src/shm_msg_queue.h | 32 -
utest/robust_test.cpp | 129 -------
utest/utest.cpp | 292 -----------------
src/robust.cpp | 71 ++--
src/shm_msg_queue.cpp | 24 -
8 files changed, 46 insertions(+), 927 deletions(-)
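
Note: the AtomicReqRep request/reply logic moves out of robust.h into
robust.cpp; the unused CasMutex/NullMutex/FMutex/SemMutex/Guard classes, the
non-atomic (BH_USE_ATOMIC_Q disabled) queue path, and the related speed and
mutex tests are dropped. A minimal usage sketch of the AtomicReqRep API that
survives this patch (the instance placement and the "+1" handler below are
illustrative only, not part of the patch):

    #include <atomic>
    #include <cassert>
    #include <thread>
    #include "robust.h"

    // Both sides must operate on the same AtomicReqRep instance
    // (in real use it sits in shared memory).
    void Sketch(robust::AtomicReqRep &rr)
    {
        std::atomic<bool> run(true);
        // Server side: poll the shared slot and answer any pending request.
        std::thread server([&] {
            while (run) { rr.ServerProcess([](int64_t req) { return req + 1; }); }
        });
        int64_t reply = 0;
        const bool ok = rr.ClientRequest(41, reply); // retries for up to ~3s
        run = false;
        server.join();
        assert(!ok || reply == 42); // reply is 42 when the server answered in time
    }
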
diff --git a/src/robust.cpp b/src/robust.cpp
index 4654652..015f97a 100644
--- a/src/robust.cpp
+++ b/src/robust.cpp
@@ -22,43 +22,46 @@
namespace robust
{
-namespace
+bool AtomicReqRep::ClientRequest(const Data request, Data &reply)
{
-static_assert(sizeof(steady_clock::duration) == sizeof(int64_t));
-
-auto Now() { return steady_clock::now().time_since_epoch(); }
-void Yield() { std::this_thread::sleep_for(10us); }
-
-} // namespace
-
-void QuickSleep() { Yield(); }
-
-bool FMutex::try_lock()
-{
- if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
- ++count_;
- if (mtx_.try_lock()) {
- return true;
- } else {
- if (--count_ == 0) {
- flock(fd_, LOCK_UN);
- }
+ auto end_time = now() + 3s;
+ do {
+ Data cur = data_.load();
+ if (GetState(cur) == eStateFree &&
+ DataCas(cur, Encode(request, eStateRequest))) {
+ do {
+ yield();
+ cur = data_.load();
+ if (GetState(cur) == eStateReply) {
+ DataCas(cur, Encode(0, eStateFree));
+ reply = Decode(cur);
+ return true;
+ }
+ } while (now() < end_time);
}
+ yield();
+ } while (now() < end_time);
+ return false;
+}
+
+bool AtomicReqRep::ServerProcess(Handler onReq)
+{
+ Data cur = data_.load();
+ switch (GetState(cur)) {
+ case eStateRequest:
+ if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
+ timestamp_ = now();
+ return true;
+ }
+ break;
+ case eStateReply:
+ if (timestamp_.load() + 3s < now()) {
+ DataCas(cur, Encode(0, eStateFree));
+ }
+ break;
+ case eStateFree:
+ default: break;
}
return false;
}
-void FMutex::lock()
-{
- flock(fd_, LOCK_EX);
- ++count_;
- mtx_.lock();
-}
-void FMutex::unlock()
-{
- mtx_.unlock();
- if (--count_ == 0) {
- flock(fd_, LOCK_UN);
- }
-}
-
} // namespace robust
\ No newline at end of file
diff --git a/src/robust.h b/src/robust.h
index b3b99c4..1bbe8fc 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -31,6 +31,7 @@
#include <sys/sem.h>
#include <sys/stat.h>
#include <sys/types.h>
+#include <thread>
#include <unistd.h>
namespace robust
@@ -38,144 +39,6 @@
using namespace std::chrono;
using namespace std::chrono_literals;
-void QuickSleep();
-
-class CasMutex
-{
- typedef uint64_t locker_t;
- static inline locker_t this_locker() { return pthread_self(); }
- static const uint64_t kLockerMask = MaskBits(63);
-
-public:
- CasMutex() :
- meta_(0) {}
- int try_lock()
- {
- auto old = meta_.load();
- int r = 0;
- if (!Locked(old)) {
- r = MetaCas(old, Meta(1, this_locker()));
- }
- return r;
- }
- int lock()
- {
- int r = 0;
- do {
- r = try_lock();
- } while (r == 0);
- return r;
- }
- void unlock()
- {
- auto old = meta_.load();
- if (Locked(old) && Locker(old) == this_locker()) {
- MetaCas(old, Meta(0, this_locker()));
- }
- }
-
-private:
- std::atomic<uint64_t> meta_;
- bool Locked(uint64_t meta) { return (meta >> 63) == 1; }
- locker_t Locker(uint64_t meta) { return meta & kLockerMask; }
- uint64_t Meta(uint64_t lk, locker_t lid) { return (lk << 63) | lid; }
- bool MetaCas(uint64_t exp, uint64_t val) { return meta_.compare_exchange_strong(exp, val); }
-};
-
-class NullMutex
-{
-public:
- template <class... T>
- explicit NullMutex(T &&...t) {} // easy test.
- bool try_lock() { return true; }
- void lock() {}
- void unlock() {}
-};
-
-// flock + mutex
-class FMutex
-{
-public:
- typedef uint64_t id_t;
- FMutex(id_t id) :
- id_(id), fd_(Open(id_)), count_(0)
- {
- if (fd_ == -1) { throw "error create mutex!"; }
- }
- ~FMutex() { Close(fd_); }
- bool try_lock();
- void lock();
- void unlock();
-
-private:
- static std::string GetPath(id_t id)
- {
- const std::string dir("/tmp/.bhome_mtx");
- mkdir(dir.c_str(), 0777);
- return dir + "/fm_" + std::to_string(id);
- }
- static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
- static int Close(int fd) { return close(fd); }
- id_t id_;
- int fd_;
- std::mutex mtx_;
- std::atomic<int32_t> count_;
-};
-
-union semun {
- int val; /* Value for SETVAL */
- struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
- unsigned short *array; /* Array for GETALL, SETALL */
- struct seminfo *__buf; /* Buffer for IPC_INFO
- (Linux-specific) */
-};
-
-class SemMutex
-{
-public:
- SemMutex(key_t key) :
- key_(key), sem_id_(semget(key, 1, 0666))
- {
- if (sem_id_ == -1) { throw "error create semaphore."; }
- }
- ~SemMutex() {}
-
- bool try_lock()
- {
- sembuf op = {0, -1, SEM_UNDO | IPC_NOWAIT};
- return semop(sem_id_, &op, 1) == 0;
- }
-
- void lock()
- {
- sembuf op = {0, -1, SEM_UNDO};
- semop(sem_id_, &op, 1);
- }
-
- void unlock()
- {
- sembuf op = {0, 1, SEM_UNDO};
- semop(sem_id_, &op, 1);
- }
-
-private:
- key_t key_;
- int sem_id_;
-};
-
-template <class Lock>
-class Guard
-{
-public:
- Guard(Lock &l) :
- l_(l) { l_.lock(); }
- ~Guard() { l_.unlock(); }
-
-private:
- Guard(const Guard &);
- Guard(Guard &&);
- Lock &l_;
-};
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
@@ -194,8 +57,6 @@
AtomicQueue() { memset(this, 0, sizeof(*this)); }
size_type head() const { return head_.load(); }
size_type tail() const { return tail_.load(); }
- bool like_empty() const { return head() == tail() && Empty(buf[head()]); }
- bool like_full() const { return head() == tail() && !Empty(buf[head()]); }
bool push(const Data d, bool try_more = false)
{
bool r = false;
@@ -276,48 +137,8 @@
public:
typedef int64_t Data;
typedef std::function<Data(const Data)> Handler;
- bool ClientRequest(const Data request, Data &reply)
- {
- auto end_time = now() + 3s;
- do {
- Data cur = data_.load();
- if (GetState(cur) == eStateFree &&
- DataCas(cur, Encode(request, eStateRequest))) {
- do {
- yield();
- cur = data_.load();
- if (GetState(cur) == eStateReply) {
- DataCas(cur, Encode(0, eStateFree));
- reply = Decode(cur);
- return true;
- }
- } while (now() < end_time);
- }
- yield();
- } while (now() < end_time);
- return false;
- }
-
- bool ServerProcess(Handler onReq)
- {
- Data cur = data_.load();
- switch (GetState(cur)) {
- case eStateRequest:
- if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
- timestamp_ = now();
- return true;
- }
- break;
- case eStateReply:
- if (timestamp_.load() + 3s < now()) {
- DataCas(cur, Encode(0, eStateFree));
- }
- break;
- case eStateFree:
- default: break;
- }
- return false;
- }
+ bool ClientRequest(const Data request, Data &reply);
+ bool ServerProcess(Handler onReq);
private:
enum State {
@@ -328,7 +149,7 @@
static int GetState(Data d) { return d & MaskBits(3); }
static Data Encode(Data d, State st) { return (d << 3) | st; }
static Data Decode(Data d) { return d >> 3; }
- static void yield() { QuickSleep(); }
+ static void yield() { std::this_thread::sleep_for(10us); }
typedef steady_clock::duration Duration;
Duration now() { return steady_clock::now().time_since_epoch(); }
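
Note: the slot layout that ClientRequest/ServerProcess rely on is visible in
the hunk above: the low 3 bits of the atomic word carry the state and the
payload is shifted left by 3. A small stand-alone sketch of that packing (the
state value 1 is only a placeholder for one of the State enum constants):

    #include <cassert>
    #include <cstdint>

    int main()
    {
        const int64_t payload = 1234;
        const int64_t state   = 1;                      // stands in for e.g. a request state
        const int64_t packed  = (payload << 3) | state; // Encode(payload, state)
        assert((packed & 7) == state);                  // GetState(packed)
        assert((packed >> 3) == payload);               // Decode(packed)
        return 0;
    }
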
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index be2d2a2..9db4c6b 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -53,22 +53,6 @@
ShmMsgQueue::~ShmMsgQueue() {}
-#ifndef BH_USE_ATOMIC_Q
-ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id)
-{
- static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm;
-
- static std::mutex mtx;
- std::lock_guard<std::mutex> lock(mtx);
- auto pos = imm.find(id);
- if (pos == imm.end()) {
- pos = imm.emplace(id, new Mutex(id)).first;
- // pos = imm.emplace(id, new Mutex()).first;
- }
- return *pos->second;
-}
-#endif
-
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
Queue *q = Find(shm, id);
@@ -95,17 +79,9 @@
try {
//TODO find from center, or use offset.
ShmMsgQueue dest(remote.offset_, shm, remote.id_);
-#ifndef BH_USE_ATOMIC_Q
- Guard lock(GetMutex(remote_id));
-#endif
return dest.queue().TryWrite(val);
} catch (...) {
// SetLastError(eNotFound, "remote not found");
return false;
}
}
-
-// Test shows that in the 2 cases:
-// 1) build msg first, then find remote queue;
-// 2) find remote queue first, then build msg;
-// 1 is about 50% faster than 2, maybe cache related.
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index 6d922aa..23faa24 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -25,23 +25,11 @@
using namespace bhome_shm;
using namespace bhome_msg;
-#define BH_USE_ATOMIC_Q
-
class ShmMsgQueue
{
public:
typedef int64_t RawData;
-
-#ifdef BH_USE_ATOMIC_Q
typedef ShmObject<SharedQ63<0>> Shmq;
-#else
- typedef ShmObject<SharedQueue<RawData>> Shmq;
- // typedef robust::FMutex Mutex;
- // typedef robust::SemMutex Mutex;
- typedef robust::NullMutex Mutex;
- typedef robust::Guard<Mutex> Guard;
-#endif
-
typedef Shmq::Data Queue;
typedef Shmq::ShmType ShmType;
typedef uint64_t MQId;
@@ -55,21 +43,8 @@
ShmType &shm() const { return queue_.shm(); }
int64_t AbsAddr() const { return queue_.offset(); }
- bool Recv(RawData &val, const int timeout_ms)
- {
-#ifndef BH_USE_ATOMIC_Q
- Guard lock(GetMutex(Id()));
-#endif
- return queue().Read(val, timeout_ms);
- }
-
- bool TryRecv(RawData &val)
- {
-#ifndef BH_USE_ATOMIC_Q
- Guard lock(GetMutex(Id()));
-#endif
- return queue().TryRead(val);
- }
+ bool Recv(RawData &val, const int timeout_ms) { return queue().Read(val, timeout_ms); }
+ bool TryRecv(RawData &val) { return queue().TryRead(val); }
bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
@@ -78,9 +53,6 @@
bool TrySend(const MQInfo &remote, const RawData val) { return TrySend(shm(), remote, val); }
private:
-#ifndef BH_USE_ATOMIC_Q
- static Mutex &GetMutex(const MQId id);
-#endif
MQId id_;
Queue &queue() { return *queue_.data(); }
Shmq queue_;
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
index ea6144c..3270481 100644
--- a/utest/robust_test.cpp
+++ b/utest/robust_test.cpp
@@ -20,7 +20,7 @@
{
AtomicReqRep rr;
auto client = [&]() {
- for (int i = 0; i < 20; ++i) {
+ for (int i = 0; i < 5; ++i) {
int64_t reply = 0;
bool r = rr.ClientRequest(i, reply);
printf("init request %d, %s, reply %d\n", i, (r ? "ok" : "failed"), reply);
@@ -196,130 +196,3 @@
printf("total: %ld, expected: %ld\n", total.load(), correct_total);
BOOST_CHECK_EQUAL(total.load(), correct_total);
}
-
-BOOST_AUTO_TEST_CASE(MutexTest)
-{
- {
- int sem_id = semget(100, 1, 0666 | IPC_CREAT);
- auto P = [&]() {
- sembuf op = {0, -1, SEM_UNDO};
- semop(sem_id, &op, 1);
- };
- auto V = [&]() {
- sembuf op = {0, 1, SEM_UNDO};
- semop(sem_id, &op, 1);
- };
- for (int i = 0; i < 10; ++i) {
- V();
- }
- Sleep(10s);
-
- return;
- }
-
- // typedef robust::MFMutex RobustMutex;
- typedef robust::SemMutex RobustMutex;
-
- for (int i = 0; i < 20; ++i) {
- int size = i;
- int left = size & 7;
- int rsize = size + ((8 - left) & 7);
- printf("size: %3d, rsize: %3d\n", size, rsize);
- }
- SharedMemory &shm = TestShm();
- // shm.Remove();
- // return;
- GlobalInit(shm);
-
- const std::string mtx_name("test_mutex");
- const std::string int_name("test_int");
- // auto mtx = shm.FindOrCreate<RobustMutex>(mtx_name, 12345);
- RobustMutex rmtx(12345);
- auto mtx = &rmtx;
- auto pi = shm.FindOrCreate<int>(int_name, 100);
-
- std::mutex m;
- typedef std::chrono::steady_clock Clock;
-
- if (pi) {
- auto old = *pi;
- printf("int : %d, add1: %d\n", old, ++*pi);
- }
-
- auto LockSpeed = [](auto &mutex, const std::string &name) {
- const int ntimes = 1000 * 1;
- auto Lock = [&]() {
- for (int i = 0; i < ntimes; ++i) {
- mutex.lock();
- mutex.unlock();
- }
- };
- printf("\nTesting %s lock/unlock %d times\n", name.c_str(), ntimes);
- {
- boost::timer::auto_cpu_timer timer;
- printf("1 thread: ");
- Lock();
- }
- auto InThread = [&](int nthread) {
- boost::timer::auto_cpu_timer timer;
- printf("%d threads: ", nthread);
- std::vector<std::thread> vt;
- for (int i = 0; i < nthread; ++i) {
- vt.emplace_back(Lock);
- }
- for (auto &t : vt) {
- t.join();
- }
- };
- InThread(4);
- InThread(16);
- InThread(100);
- InThread(1000);
- };
- NullMutex null_mtx;
- std::mutex std_mtx;
- CasMutex cas_mtx;
- FMutex mfmtx(3);
- boost::interprocess::interprocess_mutex ipc_mutex;
- SemMutex sem_mtx(3);
- LockSpeed(null_mtx, "null mutex");
- LockSpeed(std_mtx, "std::mutex");
- // LockSpeed(cas_mtx, "CAS mutex");
- LockSpeed(ipc_mutex, "boost ipc mutex");
- LockSpeed(mfmtx, "mutex+flock");
- LockSpeed(sem_mtx, "sem mutex");
-
- auto TryLock = [&]() {
- if (mtx->try_lock()) {
- printf("try_lock ok\n");
- return true;
- } else {
- printf("try_lock failed\n");
- return false;
- }
- };
- auto Unlock = [&]() {
- mtx->unlock();
- printf("unlocked\n");
- };
-
- if (mtx) {
- printf("mtx exists\n");
- if (TryLock()) {
- // Sleep(10s);
- auto op = [&]() {
- if (TryLock()) {
- Unlock();
- }
- };
- op();
- std::thread t(op);
- t.join();
- // Unlock();
- } else {
- // mtx->unlock();
- }
- } else {
- printf("mtx not exists\n");
- }
-}
\ No newline at end of file
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index e9131b0..6d3c7a2 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -104,21 +104,6 @@
}
}
-BOOST_AUTO_TEST_CASE(TimedWaitTest)
-{
- SharedMemory &shm = TestShm();
- GlobalInit(shm);
- ShmMsgQueue q(shm, NewSession(), 64);
- for (int i = 0; i < 2; ++i) {
- int ms = i * 100;
- printf("Timeout Test %4d: ", ms);
- boost::timer::auto_cpu_timer timer;
- MsgI msg(shm);
- bool r = q.Recv(msg, ms);
- BOOST_CHECK(!r);
- }
-}
-
BOOST_AUTO_TEST_CASE(RefCountTest)
{
SharedMemory &shm = TestShm();
@@ -126,7 +111,8 @@
GlobalInit(shm);
Msg m0(1000, shm);
- BOOST_CHECK(m0.valid());
+ BOOST_CHECK(!m0.valid());
+ m0.Make(100);
BOOST_CHECK_EQUAL(m0.Count(), 1);
Msg m1 = m0;
BOOST_CHECK(m1.valid());
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
deleted file mode 100644
index f33f0db..0000000
--- a/utest/speed_test.cpp
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: speed_test.cpp
- *
- * Description:
- *
- * Version: 1.0
- *        Created:  2021-03-30 11:35:21
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#include "robust.h"
-#include "util.h"
-
-using namespace robust;
-
-BOOST_AUTO_TEST_CASE(SpeedTest)
-{
- SharedMemory &shm = TestShm();
- GlobalInit(shm);
- MQId server_id = NewSession();
- ShmMsgQueue server(server_id, shm, 1000);
-
- const int timeout = 1000;
- const uint32_t data_size = 1001;
- const std::string proc_id = "demo_proc";
- std::atomic<int64_t> nwrite(0);
- std::atomic<int64_t> nread(0);
-
- std::string str(data_size, 'a');
- auto Writer = [&](int writer_id, uint64_t n) {
- MQId cli_id = NewSession();
-
- ShmMsgQueue mq(cli_id, shm, 64);
- MsgI msg(shm);
- MsgRequestTopic body;
- body.set_topic("topic");
- body.set_data(str);
- auto head(InitMsgHead(GetType(body), proc_id, mq.Id()));
- msg.Make(head, body);
- assert(msg.valid());
- DEFER1(msg.Release(););
-
- for (uint64_t i = 0; i < n; ++i) {
- msg.AddRef();
- while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {}
- ++nwrite;
- }
- };
- auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
- ShmMsgQueue &mq = server;
- auto now = []() { return steady_clock::now(); };
- auto tm = now();
- while (*run) {
- MsgI msg(shm);
- BHMsgHead head;
- if (mq.TryRecv(msg)) {
- DEFER1(msg.Release());
- tm = now();
- ++nread;
- } else if (isfork) {
- if (now() > tm + 1s) {
- exit(0); // for forked quit after 1s.
- }
- }
- }
- };
- auto State = [&](std::atomic<bool> *run) {
- auto init = shm.get_free_memory();
- printf("shm init : %ld\n", init);
- uint64_t last_read = 0;
- while (*run) {
- auto cur = shm.get_free_memory();
- auto cur_read = nread.load();
- printf("shm used : %8ld/%ld, write: %8ld, read: %8ld, speed: %8ld\n", init - cur, init, nwrite.load(), cur_read, cur_read - last_read);
- last_read = cur_read;
- std::this_thread::sleep_for(1s);
- }
- };
-
- int nwriters[] = {1, 10, 100, 1000};
- int nreaders[] = {2};
- const int64_t total_msg = 1000 * 1000;
-
- auto Test = [&](auto &www, auto &rrr, bool isfork) {
- for (auto nreader : nreaders) {
- for (auto nwriter : nwriters) {
- const uint64_t nmsg = total_msg / nwriter;
- std::atomic<bool> run(true);
- std::this_thread::sleep_for(10ms);
- boost::timer::auto_cpu_timer timer;
- for (int i = 0; i < nreader; ++i) {
- rrr.Launch(Reader, i, &run, isfork);
- }
- for (int i = 0; i < nwriter; ++i) {
- www.Launch(Writer, i, nmsg);
- }
- www.WaitAll();
- printf("writer finished\n");
- run.store(false);
- rrr.WaitAll();
- printf("Write %ld msg R(%3d) W(%3d), : ", total_msg, nreader, nwriter);
- }
- }
- };
-
- std::atomic<bool> run(true);
- ThreadManager state;
- state.Launch(State, &run);
- DEFER1(run.store(false););
-
- // typedef ProcessManager Manager;
- // typedef ThreadManager Manager;
- // const bool isfork = IsSameType<Manager, ProcessManager>::value;
-
- if (0) {
- ThreadManager tw, tr;
- printf("---------------- Testing thread io: -------------------------------------------------------\n");
- Test(tw, tr, false);
- }
-
- if (1) {
- ProcessManager pw, pr;
- printf("================ Testing process io: =======================================================\n");
- Test(pw, pr, true);
- }
-}
-
-// Send Recv Test
-BOOST_AUTO_TEST_CASE(SRTest)
-{
- const int qlen = 64;
- const size_t msg_length = 100;
- std::string msg_content(msg_length, 'a');
- msg_content[20] = '\0';
- const std::string client_proc_id = "client_proc";
- const std::string server_proc_id = "server_proc";
-
- SharedMemory &shm = TestShm();
- // shm.Remove();
- // return;
- GlobalInit(shm);
-
- auto Avail = [&]() { return shm.get_free_memory(); };
- auto init_avail = Avail();
- ShmSocket srv(shm, NewSession(), qlen);
- ShmSocket cli(shm, NewSession(), qlen);
-
- int ncli = 1;
- uint64_t nmsg = 1000 * 1000 * 1;
- std::atomic<uint64_t> count(0);
-
- std::atomic<int64_t> last_time(NowSec() - 1);
- std::atomic<uint64_t> last_count(0);
-
- auto PrintStatus = [&](int64_t cur) {
- std::cout << "time: " << cur;
- printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld\n",
- count.load(), count - last_count.exchange(count), init_avail - Avail());
- };
- auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
- ++count;
- auto cur = NowSec();
- if (last_time.exchange(cur) < cur) {
- PrintStatus(cur);
- }
- };
- cli.Start(onRecv, 2);
-
- auto Client = [&](int cli_id, int nmsg) {
- for (int i = 0; i < nmsg; ++i) {
- auto Req = [&]() {
- MsgRequestTopic req_body;
- req_body.set_topic("topic");
- req_body.set_data(msg_content);
- auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
- auto route = req_head.add_route();
- route->set_mq_id(cli.id());
- route->set_abs_addr(cli.AbsAddr());
- return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body);
- };
-
- Req();
- }
- };
- auto onRequest = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
- if (head.type() == kMsgTypeRequestTopic) {
- MQInfo src_mq = {head.route()[0].mq_id(), head.route()[0].abs_addr()};
-
- MsgRequestTopic reply_body;
- reply_body.set_topic("topic");
- reply_body.set_data(msg_content);
- auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), head.msg_id()));
- srv.Send(src_mq, reply_head, reply_body);
- }
- };
- srv.Start(onRequest);
-
- boost::timer::auto_cpu_timer timer;
- DEFER1(printf("Request Reply Test:"););
-
- ThreadManager clients;
-
- printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
- for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
- clients.WaitAll();
- printf("request ok: %ld\n", count.load());
- do {
- std::this_thread::sleep_for(100ms);
- } while (count.load() < ncli * nmsg);
- PrintStatus(NowSec());
- srv.Stop();
- // BOOST_CHECK_THROW(reply.Count(), int);
-}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 7cb9587..60b490f 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -56,294 +56,6 @@
}
}
printf("time: %ld ns\n", (tps.back() - tps.front()).count());
- return;
- // sub topic partial match.
- Topic topics[] = {
- "",
- ".",
- "a",
- "sp",
- "sport",
- "sport.",
- "sport.a",
- "sport.a.b.c",
- "sport.ab.c",
- "sport.basketball",
- "sport.football",
- };
- const char sep = '.';
- auto Adjust = [&](const std::string &user_topic) {
- if (user_topic.empty() || user_topic.back() == sep) {
- return user_topic;
- } else {
- return user_topic + sep;
- }
- };
-
- for (auto &t : topics) {
- const std::string &a = Adjust(t);
- printf("orig: %20s adjusted: %20s parts:[", ("'" + t + "'").c_str(), ('\'' + a + '\'').c_str());
-
- size_t pos = 0;
- while (true) {
- auto &topic = t;
- pos = topic.find(kTopicSep, pos);
- if (pos == topic.npos || ++pos == topic.size()) {
- // Find1(std::string()); // sub all.
- break;
- } else {
- printf("'%s',", topic.substr(0, pos).c_str());
- }
- }
- printf("]\n");
- }
-}
-
-BOOST_AUTO_TEST_CASE(PubSubTest)
-{
- SharedMemory &shm = TestShm();
- GlobalInit(shm);
-
- auto Avail = [&]() { return shm.get_free_memory(); };
- auto init_avail = Avail();
- int *flag = shm.FindOrCreate<int>("flag", 123);
- printf("flag = %d\n", *flag);
- ++*flag;
- const std::string sub_proc_id = "subscriber";
- const std::string pub_proc_id = "publisher";
-
- BHCenter center(shm);
- center.Start();
-
- Sleep(100ms);
-
- std::atomic<uint64_t> total_count(0);
- std::atomic<int64_t> last_time(NowSec() - 1);
- std::atomic<uint64_t> last_count(0);
-
- const uint64_t nmsg = 100 * 2;
- const int timeout = 1000;
- auto Sub = [&](int id, const std::vector<std::string> &topics) {
- DemoNode client("client_" + std::to_string(id), shm);
- MsgTopicList tlist;
- for (auto &t : topics) {
- tlist.add_topic_list(t);
- }
- MsgCommonReply reply_body;
- bool r = client.Subscribe(tlist, reply_body, timeout);
- if (!r) {
- printf("client subscribe failed.\n");
- }
- std::mutex mutex;
- std::condition_variable cv;
-
- std::atomic<uint64_t> n(0);
- auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) {
- ++total_count;
-
- auto cur = NowSec();
- if (last_time.exchange(cur) < cur) {
- std::cout << "time: " << cur;
- printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
- total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
- }
- if (++n >= nmsg * topics.size()) {
- cv.notify_one();
- }
- // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
- };
- client.SubscribeStartWorker(OnTopicData, 1);
-
- std::unique_lock<std::mutex> lk(mutex);
- cv.wait(lk);
- };
-
- auto Pub = [&](const std::string &topic) {
- DemoNode provider("server_" + topic, shm);
-
- for (unsigned i = 0; i < nmsg; ++i) {
- std::string data = topic + std::to_string(i) + std::string(1000, '-');
- MsgPublish pub;
- pub.set_topic(topic);
- pub.set_data(data);
- bool r = provider.Publish(pub, 0);
- if (!r) {
- static std::atomic<int> an(0);
- int n = ++an;
- printf("pub %d ret: %s\n", n, r ? "ok" : "fail");
- }
- }
- };
- ThreadManager threads;
- typedef std::vector<Topic> Topics;
- Topics topics;
- for (int i = 0; i < 100; ++i) {
- topics.push_back("t" + std::to_string(i));
- }
- Topics part;
- boost::timer::auto_cpu_timer pubsub_timer;
- for (size_t i = 0; i < topics.size(); ++i) {
- part.push_back(topics[i]);
- threads.Launch(Sub, i, topics);
- }
- Sleep(100ms);
- for (auto &topic : topics) {
- threads.Launch(Pub, topic);
- }
- threads.Launch(Pub, "some_else");
-
- threads.WaitAll();
-
- printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
- total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
-}
-
-namespace
-{
-struct C {
- C() { printf("+C\n"); }
- C(const C &c) { printf("+C(const C&)\n"); }
- void F() { printf("C::F()\n"); }
- ~C() { printf("-C\n"); }
- char arr[100];
-};
-int F(C &c) { return printf(":::::::::::::F()\n"); }
-} // namespace
-
-BOOST_AUTO_TEST_CASE(ReqRepTest)
-{
- SharedMemory &shm = TestShm();
- GlobalInit(shm);
-
- auto Avail = [&]() { return shm.get_free_memory(); };
- auto init_avail = Avail();
- int *flag = shm.FindOrCreate<int>("flag", 123);
- printf("flag = %d\n", *flag);
- ++*flag;
-
- const std::string client_proc_id = "client_proc_";
- const std::string server_proc_id = "server_proc_";
-
- BHCenter center(shm);
- center.Start();
- std::atomic<bool> run(true);
-
- auto Client = [&](const std::string &topic, const int nreq) {
- DemoNode client(client_proc_id + topic, shm);
-
- std::atomic<int> count(0);
- std::string reply;
- auto onRecv = [&](const BHMsgHead &head, const MsgRequestTopicReply &msg) {
- reply = msg.data();
- if (++count >= nreq) {
- printf("count: %d\n", count.load());
- }
- };
- MsgRequestTopic req;
- req.set_topic(topic);
- req.set_data("data " + std::string(100, 'a'));
-
- client.ClientStartWorker(onRecv, 2);
-
- boost::timer::auto_cpu_timer timer;
- for (int i = 0; i < nreq; ++i) {
- std::string msg_id;
- if (!client.ClientAsyncRequest(BHAddress(), req, msg_id)) {
- printf("client request failed\n");
- ++count;
- }
-
- // std::string proc_id;
- // MsgRequestTopicReply reply;
- // if (!client.ClientSyncRequest(req, proc_id, reply, 1000)) {
- // printf("client request failed\n");
- // }
- // ++count;
- }
- do {
- std::this_thread::sleep_for(100ms);
- } while (count.load() < nreq);
- client.Stop();
- printf("request %s %d done ", topic.c_str(), count.load());
- };
-
- std::atomic_uint64_t server_msg_count(0);
- auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
- DemoNode server(name, shm);
-
- auto onDataSync = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
- ++server_msg_count;
- reply.set_data(request.topic() + ':' + request.data());
- return true;
- };
- auto onDataAsync = [&](void *src, std::string &proc_id, MsgRequestTopic &request) {
- ++server_msg_count;
- MsgRequestTopicReply reply;
- reply.set_data(request.topic() + ':' + request.data());
- server.ServerSendReply(src, reply);
- };
- server.ServerStart(onDataAsync);
-
- MsgTopicList rpc;
- for (auto &topic : topics) {
- rpc.add_topic_list(topic);
- }
- MsgCommonReply reply_body;
- if (!server.ServerRegisterRPC(rpc, reply_body, 100)) {
- printf("server register topic failed\n");
- return;
- }
-
- while (run) {
- std::this_thread::sleep_for(100ms);
- }
- };
- ThreadManager clients, servers;
- std::vector<Topic> topics = {"topic1", "topic2"};
- servers.Launch(Server, "server", topics);
- Sleep(100ms);
- for (auto &t : topics) {
- clients.Launch(Client, t, 1000 * 100 * 2);
- }
- clients.WaitAll();
- printf("clients done, server replyed: %ld\n", server_msg_count.load());
- run = false;
- servers.WaitAll();
-}
-
-BOOST_AUTO_TEST_CASE(HeartbeatTest)
-{
- const std::string shm_name("ShmHeartbeat");
- ShmRemover auto_remove(shm_name);
- SharedMemory shm(shm_name, 1024 * 1024 * 50);
-
- BHCenter center(shm);
- center.Start();
-
- {
-
- DemoNode node("demo_node", shm);
- auto Check = [&]() {
- bool r = node.Heartbeat(100);
- printf("hearbeat ret : %s\n", r ? "ok" : "failed");
- };
- Check();
- for (int i = 0; i < 3; ++i) {
- Sleep(1s);
- Check();
- }
- Sleep(4s);
- for (int i = 0; i < 2; ++i) {
- Sleep(1s);
- Check();
- }
- }
- Sleep(8s);
-}
-inline int MyMin(int a, int b)
-{
- printf("MyMin\n");
- return a < b ? a : b;
}
int test_main(int argc, char *argv[])
@@ -352,10 +64,6 @@
int a = 0;
int b = 0;
BOOST_CHECK_EQUAL(a, b);
- int n = MyMin(4, 6);
- for (int i = 0; i < n; ++i) {
- printf("i = %d\n", i);
- }
return 0;
}
--
Gitblit v1.8.0