lichao
2021-05-20 6b96ee857e366af317ab9dc1b6b4f9c22576e83b
add sleeper, to reduce cpu usage.
2个文件已添加
4个文件已修改
91 ■■■■ 已修改文件
src/fib.h 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_socket.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sleeper.h 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/fib.h
New file
@@ -0,0 +1,43 @@
#ifndef FIB_COLLJGBP
#define FIB_COLLJGBP
// fibonacci sequence.
// 0,1,1,2,3,5,8,13,21,34,...
class FibSeq
{
public:
    typedef uint64_t int_type;
    FibSeq(int_type limit) :
        m_limit(limit), m_cur(0), m_next(1) {}
    void Reset()
    {
        m_cur = 0;
        m_next = 1;
    }
    void ResetLimit(const int_type v)
    {
        m_limit = v;
        Reset();
    }
    int_type Limit() const { return m_limit; }
    int_type Cur() const { return m_cur; }
    int_type Inc()
    {
        if (m_next < m_limit) {
            m_next = m_next + m_cur;
            m_cur = m_next - m_cur;
        } else {
            m_cur = m_limit;
        }
        return Cur();
    }
private:
    int_type m_limit;
    int_type m_cur;
    int_type m_next;
};
#endif // end of include guard: FIB_COLLJGBP
src/shm_queue.h
@@ -46,7 +46,7 @@
            if (TryRead(d)) {
                return true;
            } else {
                robust::QuickSleep();
                std::this_thread::sleep_for(1ms);
            }
        } while (steady_clock::now() < end_time);
        return false;
@@ -75,7 +75,6 @@
private:
    Circular<D> queue_;
    // bhome_shm::Mutex mutex_;
};
template <int Power = 4>
@@ -96,7 +95,7 @@
                    return true;
                }
            }
            robust::QuickSleep();
            std::this_thread::sleep_for(1ms);
        } while (steady_clock::now() < end_time);
        return false;
    }
src/shm_socket.cpp
@@ -20,7 +20,9 @@
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include "sleeper.h"
#include <chrono>
using namespace std::chrono;
using namespace std::chrono_literals;
@@ -90,11 +92,15 @@
        };
        try {
            thread_local FibUSleeper sleeper(1000 * 10);
            bool more_to_send = DoSend();
            bool more_to_recv = DoRecv();
            if (onIdle) { onIdle(*this); }
            if (!more_to_send && !more_to_recv) {
                robust::QuickSleep();
                sleeper.Sleep();
            } else {
                sleeper.Reset();
            }
        } catch (...) {
        }
src/sleeper.h
New file
@@ -0,0 +1,23 @@
#ifndef SLEEPER_BWPGKQCV
#define SLEEPER_BWPGKQCV
#include "fib.h"
#include <chrono>
#include <thread>
class FibUSleeper : private FibSeq
{
public:
    FibUSleeper(int_type limit) :
        FibSeq(limit) {}
    void Sleep() { std::this_thread::sleep_for(std::chrono::microseconds(Inc())); }
    using FibSeq::Cur;
    using FibSeq::Limit;
    using FibSeq::Reset;
    using FibSeq::ResetLimit;
protected:
    using FibSeq::Inc;
};
#endif // end of include guard: SLEEPER_BWPGKQCV
src/topic_node.cpp
@@ -17,6 +17,7 @@
 */
#include "topic_node.h"
#include "bh_util.h"
#include "sleeper.h"
#include <chrono>
#include <list>
@@ -51,7 +52,6 @@
TopicNode::~TopicNode()
{
    LOG_DEBUG() << "~TopicNode()";
    Stop();
}
@@ -140,9 +140,7 @@
}
void TopicNode::Stop()
{
    LOG_DEBUG() << "Node Stopping";
    for (auto &p : sockets_) { p->Stop(); }
    LOG_INFO() << "Node Stopped";
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
@@ -399,10 +397,11 @@
    }
    BHMsgHead head;
    std::string body;
    FibUSleeper sleeper(1000 * 10);
    auto end_time = steady_clock::now() + milliseconds(timeout_ms);
    while (!server_buffer_->Read(head, body)) {
        if (steady_clock::now() < end_time) {
            robust::QuickSleep();
            sleeper.Sleep();
        } else {
            return false;
        }
@@ -677,10 +676,11 @@
    BHMsgHead head;
    std::string body;
    FibUSleeper sleeper(1000 * 10);
    auto end_time = steady_clock::now() + milliseconds(timeout_ms);
    while (!sub_buffer_->Read(head, body)) {
        if (steady_clock::now() < end_time) {
            robust::QuickSleep();
            sleeper.Sleep();
        } else {
            return false;
        }
utest/api_test.cpp
@@ -372,7 +372,7 @@
    threads.Launch(hb, &run);
    threads.Launch(showStatus, &run);
    int ncli = 10;
    const int64_t nreq = 1; //000 * 100;
    const int64_t nreq = 1000 * 100;
    for (int i = 0; i < 10; ++i) {
        SyncRequest(i);