From 7f307880a58012077833061b5ff18ba63c1a2269 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 13 四月 2021 19:04:37 +0800
Subject: [PATCH] change timestamp to steady seconds.
---
src/proto.cpp | 14 ++++++-
utest/api_test.cpp | 32 ++++++++++++++++
box/center.cpp | 45 +++++++++++-----------
src/proto.h | 12 ++++++
.vscode/settings.json | 4 +
5 files changed, 82 insertions(+), 25 deletions(-)
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 6a4497a..97450e9 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -69,6 +69,8 @@
},
"cmake.configureOnOpen": false,
"C_Cpp.default.includePath": [
- "build/proto"
+ "build/proto",
+ "src",
+ "box"
]
}
\ No newline at end of file
diff --git a/box/center.cpp b/box/center.cpp
index cde865f..f9044d4 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -33,10 +33,6 @@
namespace
{
-typedef steady_clock::time_point TimePoint;
-typedef steady_clock::duration Duration;
-inline TimePoint Now() { return steady_clock::now(); };
-inline int64_t Seconds(const Duration &d) { return duration_cast<seconds>(d).count(); };
//TODO check proc_id
class NodeCenter
@@ -56,13 +52,13 @@
};
struct ProcState {
- TimePoint timestamp_;
+ int64_t timestamp_;
uint32_t flag_ = 0; // reserved
- void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time)
+ void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
{
auto diff = now - timestamp_;
#ifndef NDEBUG
- printf("diff: %ld\n", Seconds(diff));
+ printf("state %p diff: %ld\n", this, diff);
#endif
if (diff < offline_time) {
flag_ = kStateNormal;
@@ -93,12 +89,19 @@
inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
+ NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
+ id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {}
+
public:
typedef std::set<TopicDest> Clients;
- NodeCenter(const std::string &id, const Cleaner &cleaner, const Duration &offline_time, const Duration &kill_time) :
- id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(Now()) {}
- const std::string &id() const { return id_; } // no need to lock.
+ NodeCenter(const std::string &id, const Cleaner &cleaner, const steady_clock::duration offline_time, const steady_clock::duration kill_time) :
+ NodeCenter(id, cleaner, duration_cast<seconds>(offline_time).count(), duration_cast<seconds>(kill_time).count()) {}
+
+ const std::string &id() const
+ {
+ return id_;
+ } // no need to lock.
//TODO maybe just return serialized string.
MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -114,8 +117,8 @@
node->addrs_.insert(addr.mq_id());
}
node->proc_.Swap(msg.mutable_proc());
- node->state_.timestamp_ = Now();
- node->state_.flag_ = kStateNormal;
+ node->state_.timestamp_ = head.timestamp();
+ node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
nodes_[node->proc_.proc_id()] = node;
return MakeReply(eSuccess);
} catch (...) {
@@ -172,9 +175,8 @@
{
return HandleMsg(head, [&](Node node) {
NodeInfo &ni = *node;
- auto now = Now();
- ni.state_.timestamp_ = now;
- ni.state_.flag_ = kStateNormal;
+ ni.state_.timestamp_ = head.timestamp();
+ ni.state_.UpdateState(NowSec(), offline_time_, kill_time_);
auto &info = msg.proc();
if (!info.public_info().empty()) {
@@ -307,9 +309,8 @@
private:
void CheckNodes()
{
- auto now = Now();
- if (Seconds(now - last_check_time_) < 1) { return; }
-
+ auto now = NowSec();
+ if (now - last_check_time_ < 1) { return; }
last_check_time_ = now;
auto it = nodes_.begin();
@@ -348,9 +349,9 @@
std::unordered_map<Topic, Clients> subscribe_map_;
std::unordered_map<ProcId, Node> nodes_;
Cleaner cleaner_; // remove mqs.
- Duration offline_time_;
- Duration kill_time_;
- TimePoint last_check_time_;
+ int64_t offline_time_;
+ int64_t kill_time_;
+ int64_t last_check_time_;
};
template <class Body, class OnMsg, class Replyer>
@@ -383,7 +384,7 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
- auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3);
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s);
auto center_failed_q = std::make_shared<FailedMsgQ>();
auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
return [&](auto &&rep_body) {
diff --git a/src/proto.cpp b/src/proto.cpp
index 0ec894f..287924b 100644
--- a/src/proto.cpp
+++ b/src/proto.cpp
@@ -17,12 +17,18 @@
*/
#include "proto.h"
#include <boost/uuid/uuid_generators.hpp>
+#include <chrono>
+
+namespace
+{
std::string RandId()
{
boost::uuids::uuid id = boost::uuids::random_generator()();
return std::string((char *) &id, sizeof(id));
}
+
+} // namespace
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
{
@@ -35,7 +41,11 @@
msg.set_msg_id(msgid);
msg.set_type(type);
msg.set_proc_id(proc_id);
- time_t tm = 0;
- msg.set_timestamp(time(&tm));
+ msg.set_timestamp(NowSec());
return msg;
}
+
+bool IsMsgExpired(const BHMsgHead &head)
+{
+ return NowSec() > head.timestamp() + 10;
+}
\ No newline at end of file
diff --git a/src/proto.h b/src/proto.h
index fff19ac..42fe343 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -20,6 +20,7 @@
#include "bhome_msg.pb.h"
#include "bhome_msg_api.pb.h"
+#include <chrono>
using namespace bhome::msg;
@@ -76,4 +77,15 @@
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; }
+bool IsMsgExpired(const BHMsgHead &head);
+
+inline int64_t CountSeconds(const std::chrono::steady_clock::time_point tp)
+{
+ return std::chrono::duration_cast<std::chrono::seconds>(tp.time_since_epoch()).count();
+}
+inline int64_t NowSec()
+{
+ return CountSeconds(std::chrono::steady_clock::now());
+}
+
#endif // end of include guard: PROTO_UA9UWKL1
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 40ed2a1..113bb99 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -15,8 +15,40 @@
*
* =====================================================================================
*/
+#include "bh_api.h"
#include "util.h"
+
+class DemoClient
+{
+public:
+};
BOOST_AUTO_TEST_CASE(ApiTest)
{
+ auto max_time = std::chrono::steady_clock::time_point::max();
+ auto dur = max_time.time_since_epoch();
+ auto nsec = std::chrono::duration_cast<std::chrono::seconds>(dur).count();
+ auto nmin = nsec / 60;
+ auto nhour = nmin / 60;
+ auto nday = nhour / 24;
+ auto years = nday / 365;
+ printf("seconds: %ld, hours: %ld , days:%ld, years: %ld\n",
+ nsec, nhour, nday, years);
+ std::chrono::steady_clock::duration a(123456);
+ printf("nowsec: %ld\n", NowSec());
+ // for (int i = 0; i < 5; ++i) {
+ // std::this_thread::sleep_for(1s);
+ // printf("nowsec: %ld\n", NowSec());
+ // }
+
+ printf("maxsec: %ld\n", CountSeconds(max_time));
+
+ ProcInfo proc;
+ proc.set_proc_id("demo_client");
+ proc.set_public_info("public info of demo_client. etc...");
+ std::string proc_buf(proc.SerializeAsString());
+ void *reply = 0;
+ int reply_len = 0;
+ bool r = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 1000);
+ printf("register %s\n", r ? "ok" : "failed");
}
\ No newline at end of file
--
Gitblit v1.8.0