From 7f307880a58012077833061b5ff18ba63c1a2269 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期二, 13 四月 2021 19:04:37 +0800 Subject: [PATCH] change timestamp to steady seconds. --- src/proto.cpp | 14 ++++++- utest/api_test.cpp | 32 ++++++++++++++++ box/center.cpp | 45 +++++++++++----------- src/proto.h | 12 ++++++ .vscode/settings.json | 4 + 5 files changed, 82 insertions(+), 25 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 6a4497a..97450e9 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -69,6 +69,8 @@ }, "cmake.configureOnOpen": false, "C_Cpp.default.includePath": [ - "build/proto" + "build/proto", + "src", + "box" ] } \ No newline at end of file diff --git a/box/center.cpp b/box/center.cpp index cde865f..f9044d4 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -33,10 +33,6 @@ namespace { -typedef steady_clock::time_point TimePoint; -typedef steady_clock::duration Duration; -inline TimePoint Now() { return steady_clock::now(); }; -inline int64_t Seconds(const Duration &d) { return duration_cast<seconds>(d).count(); }; //TODO check proc_id class NodeCenter @@ -56,13 +52,13 @@ }; struct ProcState { - TimePoint timestamp_; + int64_t timestamp_; uint32_t flag_ = 0; // reserved - void UpdateState(TimePoint now, const Duration &offline_time, const Duration &kill_time) + void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time) { auto diff = now - timestamp_; #ifndef NDEBUG - printf("diff: %ld\n", Seconds(diff)); + printf("state %p diff: %ld\n", this, diff); #endif if (diff < offline_time) { flag_ = kStateNormal; @@ -93,12 +89,19 @@ inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } + NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) : + id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {} + public: typedef std::set<TopicDest> Clients; - NodeCenter(const std::string &id, const Cleaner &cleaner, const Duration &offline_time, const Duration &kill_time) : - id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(Now()) {} - const std::string &id() const { return id_; } // no need to lock. + NodeCenter(const std::string &id, const Cleaner &cleaner, const steady_clock::duration offline_time, const steady_clock::duration kill_time) : + NodeCenter(id, cleaner, duration_cast<seconds>(offline_time).count(), duration_cast<seconds>(kill_time).count()) {} + + const std::string &id() const + { + return id_; + } // no need to lock. //TODO maybe just return serialized string. MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) @@ -114,8 +117,8 @@ node->addrs_.insert(addr.mq_id()); } node->proc_.Swap(msg.mutable_proc()); - node->state_.timestamp_ = Now(); - node->state_.flag_ = kStateNormal; + node->state_.timestamp_ = head.timestamp(); + node->state_.UpdateState(NowSec(), offline_time_, kill_time_); nodes_[node->proc_.proc_id()] = node; return MakeReply(eSuccess); } catch (...) { @@ -172,9 +175,8 @@ { return HandleMsg(head, [&](Node node) { NodeInfo &ni = *node; - auto now = Now(); - ni.state_.timestamp_ = now; - ni.state_.flag_ = kStateNormal; + ni.state_.timestamp_ = head.timestamp(); + ni.state_.UpdateState(NowSec(), offline_time_, kill_time_); auto &info = msg.proc(); if (!info.public_info().empty()) { @@ -307,9 +309,8 @@ private: void CheckNodes() { - auto now = Now(); - if (Seconds(now - last_check_time_) < 1) { return; } - + auto now = NowSec(); + if (now - last_check_time_ < 1) { return; } last_check_time_ = now; auto it = nodes_.begin(); @@ -348,9 +349,9 @@ std::unordered_map<Topic, Clients> subscribe_map_; std::unordered_map<ProcId, Node> nodes_; Cleaner cleaner_; // remove mqs. - Duration offline_time_; - Duration kill_time_; - TimePoint last_check_time_; + int64_t offline_time_; + int64_t kill_time_; + int64_t last_check_time_; }; template <class Body, class OnMsg, class Replyer> @@ -383,7 +384,7 @@ bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) { - auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 3); + auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s); auto center_failed_q = std::make_shared<FailedMsgQ>(); auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) { return [&](auto &&rep_body) { diff --git a/src/proto.cpp b/src/proto.cpp index 0ec894f..287924b 100644 --- a/src/proto.cpp +++ b/src/proto.cpp @@ -17,12 +17,18 @@ */ #include "proto.h" #include <boost/uuid/uuid_generators.hpp> +#include <chrono> + +namespace +{ std::string RandId() { boost::uuids::uuid id = boost::uuids::random_generator()(); return std::string((char *) &id, sizeof(id)); } + +} // namespace BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id) { @@ -35,7 +41,11 @@ msg.set_msg_id(msgid); msg.set_type(type); msg.set_proc_id(proc_id); - time_t tm = 0; - msg.set_timestamp(time(&tm)); + msg.set_timestamp(NowSec()); return msg; } + +bool IsMsgExpired(const BHMsgHead &head) +{ + return NowSec() > head.timestamp() + 10; +} \ No newline at end of file diff --git a/src/proto.h b/src/proto.h index fff19ac..42fe343 100644 --- a/src/proto.h +++ b/src/proto.h @@ -20,6 +20,7 @@ #include "bhome_msg.pb.h" #include "bhome_msg_api.pb.h" +#include <chrono> using namespace bhome::msg; @@ -76,4 +77,15 @@ BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; } +bool IsMsgExpired(const BHMsgHead &head); + +inline int64_t CountSeconds(const std::chrono::steady_clock::time_point tp) +{ + return std::chrono::duration_cast<std::chrono::seconds>(tp.time_since_epoch()).count(); +} +inline int64_t NowSec() +{ + return CountSeconds(std::chrono::steady_clock::now()); +} + #endif // end of include guard: PROTO_UA9UWKL1 diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 40ed2a1..113bb99 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -15,8 +15,40 @@ * * ===================================================================================== */ +#include "bh_api.h" #include "util.h" + +class DemoClient +{ +public: +}; BOOST_AUTO_TEST_CASE(ApiTest) { + auto max_time = std::chrono::steady_clock::time_point::max(); + auto dur = max_time.time_since_epoch(); + auto nsec = std::chrono::duration_cast<std::chrono::seconds>(dur).count(); + auto nmin = nsec / 60; + auto nhour = nmin / 60; + auto nday = nhour / 24; + auto years = nday / 365; + printf("seconds: %ld, hours: %ld , days:%ld, years: %ld\n", + nsec, nhour, nday, years); + std::chrono::steady_clock::duration a(123456); + printf("nowsec: %ld\n", NowSec()); + // for (int i = 0; i < 5; ++i) { + // std::this_thread::sleep_for(1s); + // printf("nowsec: %ld\n", NowSec()); + // } + + printf("maxsec: %ld\n", CountSeconds(max_time)); + + ProcInfo proc; + proc.set_proc_id("demo_client"); + proc.set_public_info("public info of demo_client. etc..."); + std::string proc_buf(proc.SerializeAsString()); + void *reply = 0; + int reply_len = 0; + bool r = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 1000); + printf("register %s\n", r ? "ok" : "failed"); } \ No newline at end of file -- Gitblit v1.8.0