From 7ecd6323ffedbfef92c87c02b2a8680dd53b772c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 06 五月 2021 19:37:50 +0800
Subject: [PATCH] rename atomic queue io function.
---
utest/api_test.cpp | 206 ++++-----------------------------------------------
1 files changed, 18 insertions(+), 188 deletions(-)
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index debe8ad..bd59c7f 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -16,8 +16,10 @@
* =====================================================================================
*/
#include "bh_api.h"
+#include "robust.h"
#include "util.h"
#include <atomic>
+#include <boost/lockfree/queue.hpp>
using namespace bhome_msg;
@@ -49,7 +51,6 @@
static MsgStatus st;
return st;
}
-} // namespace
void SubRecvProc(const void *proc_id,
const int proc_id_len,
@@ -59,7 +60,7 @@
std::string proc((const char *) proc_id, proc_id_len);
MsgPublish pub;
pub.ParseFromArray(data, data_len);
- // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
+ printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
}
void ServerProc(const void *proc_id,
@@ -96,180 +97,10 @@
// printf("client Recv reply : %s\n", reply.data().c_str());
}
-class TLMutex
-{
- // typedef boost::interprocess::interprocess_mutex MutexT;
- typedef CasMutex MutexT;
- // typedef std::mutex MutexT;
- typedef std::chrono::steady_clock Clock;
- typedef Clock::duration Duration;
- static Duration Now() { return Clock::now().time_since_epoch(); }
-
- const Duration limit_;
- std::atomic<Duration> last_lock_time_;
- MutexT mutex_;
-
-public:
- struct Status {
- int64_t nlock_ = 0;
- int64_t nupdate_time_fail = 0;
- int64_t nfail = 0;
- int64_t nexcept = 0;
- };
- Status st_;
-
- explicit TLMutex(Duration limit) :
- limit_(limit) {}
- TLMutex() :
- TLMutex(std::chrono::seconds(1)) {}
- ~TLMutex() { static_assert(std::is_pod<Duration>::value); }
- bool try_lock()
- {
- if (mutex_.try_lock()) {
- auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- return last_lock_time_.compare_exchange_strong(old_time, Now());
- } else {
- last_lock_time_.store(Now());
- return true;
- }
- } else {
- auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- return last_lock_time_.compare_exchange_strong(old_time, Now());
- } else {
- return false;
- }
- }
- }
- void lock()
- {
- int n = 0;
- while (!try_lock()) {
- n++;
- std::this_thread::yield();
- }
- st_.nlock_ += n;
- }
- void unlock()
- {
- auto old_time = last_lock_time_.load();
- if (Now() - old_time > limit_) {
- } else {
- if (last_lock_time_.compare_exchange_strong(old_time, Now())) {
- mutex_.unlock();
- }
- }
- }
-};
-
-namespace
-{
-typedef int64_t Offset;
-Offset Addr(void *ptr) { return reinterpret_cast<Offset>(ptr); }
-void *Ptr(const Offset offset) { return reinterpret_cast<void *>(offset); }
} // namespace
-
-class RobustMutex
-{
-public:
- RobustMutex()
- {
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_setrobust(&attr, 1);
- pthread_mutex_init(mtx(), &attr);
- if (!valid()) {
- throw("init mutex error.");
- }
- }
- int TryLock() { return pthread_mutex_trylock(mtx()); }
- int Lock() { return pthread_mutex_lock(mtx()); }
- int Unlock() { return pthread_mutex_unlock(mtx()); }
- bool valid() const { return false; }
-
-private:
- pthread_mutex_t *mtx() { return &mutex_; }
- pthread_mutex_t mutex_;
-};
-
-BOOST_AUTO_TEST_CASE(MutexTest)
-{
- SharedMemory &shm = TestShm();
- GlobalInit(shm);
-
- const std::string mtx_name("test_mutex");
- const std::string int_name("test_int");
- auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
- auto pi = shm.FindOrCreate<int>(int_name, 100);
-
- typedef std::chrono::steady_clock Clock;
- auto Now = []() { return Clock::now().time_since_epoch(); };
- if (pi) {
- auto old = *pi;
- printf("int : %d, add1: %d\n", old, ++*pi);
- }
-
- {
- boost::timer::auto_cpu_timer timer;
- printf("test time: ");
- TLMutex mutex;
- // CasMutex mutex;
- auto Lock = [&]() {
- for (int i = 0; i < 10; ++i) {
- mutex.lock();
- mutex.unlock();
- }
- };
- std::thread t1(Lock), t2(Lock);
- t1.join();
- t2.join();
- printf("mutex nlock: %ld, update time error: %ld, normal fail: %ld, error wait: %ld\n",
- mutex.st_.nlock_,
- mutex.st_.nupdate_time_fail,
- mutex.st_.nfail,
- mutex.st_.nexcept);
- }
-
- auto MSFromNow = [](const int ms) {
- using namespace boost::posix_time;
- ptime cur = boost::posix_time::microsec_clock::universal_time();
- return cur + millisec(ms);
- };
-
- auto TryLock = [&]() {
- if (mtx->try_lock()) {
- printf("try_lock ok\n");
- return true;
- } else {
- printf("try_lock failed\n");
- return false;
- }
- };
- auto Unlock = [&]() {
- mtx->unlock();
- printf("unlocked\n");
- };
-
- if (mtx) {
- printf("mtx exists\n");
- if (TryLock()) {
- auto op = [&]() {
- if (TryLock()) {
- Unlock();
- }
- };
- op();
- std::thread t(op);
- t.join();
- // Unlock();
- } else {
- // mtx->unlock();
- }
- } else {
- printf("mtx not exists\n");
- }
-}
+#include <chrono>
+using namespace std::chrono;
+// using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(ApiTest)
{
@@ -287,6 +118,8 @@
printf("maxsec: %ld\n", CountSeconds(max_time));
+ // BHCleanup();
+ // return;
bool reg = false;
for (int i = 0; i < 3 && !reg; ++i) {
ProcInfo proc;
@@ -318,7 +151,7 @@
bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
BHFree(reply, reply_len);
// printf("register topic : %s\n", r ? "ok" : "failed");
- Sleep(1s);
+ // Sleep(1s);
}
{ // Subscribe
@@ -334,7 +167,6 @@
printf("subscribe topic : %s\n", r ? "ok" : "failed");
}
- // BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
auto ServerLoop = [&](std::atomic<bool> *run) {
while (*run) {
void *proc_id = 0;
@@ -446,27 +278,23 @@
std::atomic<bool> run(true);
+ BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
ThreadManager threads;
boost::timer::auto_cpu_timer timer;
threads.Launch(hb, &run);
- threads.Launch(ServerLoop, &run);
threads.Launch(showStatus, &run);
int ncli = 10;
- const uint64_t nreq = 1000 * 1;
+ const int64_t nreq = 1000 * 100;
for (int i = 0; i < ncli; ++i) {
- // threads.Launch(asyncRequest, nreq);
+ threads.Launch(asyncRequest, nreq);
}
-
- for (int i = 0; i < 10; ++i) {
- SyncRequest(i);
- }
- // run.store(false);
- // server_thread.join();
- // return;
+ // for (int i = 0; i < 100; ++i) {
+ // SyncRequest(0);
+ // }
int same = 0;
int64_t last = 0;
- while (last < nreq * ncli && same < 1) {
+ while (last < nreq * ncli && same < 2) {
Sleep(1s, false);
auto cur = Status().nreply_.load();
if (last == cur) {
@@ -481,4 +309,6 @@
threads.WaitAll();
auto &st = Status();
printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
+ BHCleanup();
+ printf("after cleanup\n");
}
\ No newline at end of file
--
Gitblit v1.8.0