From 101b5cf85397ef9350aaedd12cfcf9fd3d07a565 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 20 五月 2021 12:41:51 +0800
Subject: [PATCH] refactor node center.
---
src/shm.h | 233 ++++++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 177 insertions(+), 56 deletions(-)
diff --git a/src/shm.h b/src/shm.h
index 91a339d..b5ec2ea 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -19,84 +19,205 @@
#ifndef SHM_6CHO6D6C
#define SHM_6CHO6D6C
-#include <boost/noncopyable.hpp>
-#include <boost/uuid/uuid.hpp>
+#include "log.h"
+#include <atomic>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
-#include <boost/interprocess/sync/interprocess_condition.hpp>
-#include <boost/interprocess/sync/scoped_lock.hpp>
+#include <boost/noncopyable.hpp>
+#include <thread>
-namespace bhome_shm {
+namespace bhome_shm
+{
using namespace boost::interprocess;
typedef managed_shared_memory mshm_t;
+
+class MutexWithPidCheck
+{
+ typedef boost::interprocess::interprocess_mutex MutexT;
+ static pid_t pid()
+ {
+ static pid_t val = getpid();
+ return val;
+ }
+ static bool Killed(pid_t pid)
+ {
+ char buf[64] = {0};
+ snprintf(buf, sizeof(buf) - 1, "/proc/%d/stat", pid);
+ return access(buf, F_OK) != 0;
+ }
+ bool PidCas(pid_t exp, pid_t val) { return pid_.compare_exchange_strong(exp, val); }
+ MutexT mutex_;
+ std::atomic<pid_t> pid_;
+
+public:
+ typedef MutexT::internal_mutex_type internal_mutex_type;
+ const internal_mutex_type &internal_mutex() const { return mutex_.internal_mutex(); }
+ internal_mutex_type &internal_mutex() { return mutex_.internal_mutex(); }
+ MutexWithPidCheck() :
+ pid_(0) {}
+ bool try_lock()
+ {
+ bool r = false;
+ if (mutex_.try_lock()) {
+ auto old = pid_.load();
+ r = PidCas(old, pid());
+ } else {
+ auto old = pid_.load();
+ if (Killed(old)) {
+ r = PidCas(old, pid());
+ if (r) {
+ LOG_DEBUG() << "PidCheck captured pid " << old << " -> " << pid();
+ }
+ }
+ }
+ return r;
+ }
+
+ void lock()
+ {
+ while (!try_lock()) {
+ std::this_thread::yield();
+ }
+ }
+ void unlock()
+ {
+ auto old = pid_.load();
+ if (old == pid()) {
+ mutex_.unlock();
+ }
+ }
+};
+
typedef interprocess_mutex Mutex;
typedef scoped_lock<Mutex> Guard;
-typedef interprocess_condition Cond;
+// typedef robust::Guard<Mutex> Guard;
class SharedMemory : public mshm_t
{
- std::string name_;
+ std::string name_;
+ Mutex *pmutex_ = 0;
- static permissions AllowAll() {
- permissions perm;
- perm.set_unrestricted();
- return perm;
- }
+ static permissions AllowAll()
+ {
+ permissions perm;
+ perm.set_unrestricted();
+ return perm;
+ }
+ void Swap(SharedMemory &a);
+
public:
- static bool Remove(const std::string &name) {
- return shared_memory_object::remove(name.c_str());
- }
- SharedMemory(const std::string &name, const uint64_t size):
- mshm_t(open_or_create, name.c_str(), size, 0, AllowAll()),
- name_(name)
- {}
- std::string name() const { return name_; }
- bool Remove() { return Remove(name()); }
- template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
- template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; }
- template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); }
+ static bool Remove(const std::string &name) { return shared_memory_object::remove(name.c_str()); }
+ SharedMemory(const std::string &name, const uint64_t size);
+ ~SharedMemory();
+ std::string name() const { return name_; }
+ bool Remove() { return Remove(name()); }
+ template <class T, class... Params>
+ T *FindOrCreate(const std::string &name, Params &&...params)
+ {
+ return find_or_construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
+ }
+ template <class T, class... Params>
+ T *Create(const std::string &name, Params &&...params)
+ {
+ return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
+ }
+ void *Alloc(const size_t size)
+ {
+ Guard lock(*pmutex_);
+ return allocate(size, std::nothrow);
+ }
+ void Dealloc(void *p)
+ {
+ Guard lock(*pmutex_);
+ if (p) { deallocate(p); }
+ }
+ template <class T>
+ void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
+
+ template <class T, class... Params>
+ T *New(Params &&...params)
+ {
+ Guard lock(*pmutex_);
+ return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...);
+ }
+ template <class T>
+ void Delete(T *p)
+ {
+ Guard lock(*pmutex_);
+ if (p) { destroy_ptr<T>(p); };
+ }
+ template <class T>
+ void Delete(offset_ptr<T> p) { Delete(p.get()); }
+ template <class T>
+ T *Find(const std::string &name) { return find<T>(name.c_str()).first; }
};
+
+template <class D>
+using Allocator = allocator<D, SharedMemory::segment_manager>;
+template <class D>
+using Deleter = deleter<D, SharedMemory::segment_manager>;
+template <class D>
+using SharedPtr = shared_ptr<D, Allocator<void>, Deleter<D>>;
// ShmObject manages an object in shared memory, but ShmObject itself is not in shared memory.
-// works like a smart pointer of an object in shared memory.
template <class T>
-class ShmObject : private boost::noncopyable {
- static std::string ObjName(const std::string &name) { return "obj" + name; }
-protected:
- typedef T Data;
- typedef SharedMemory ShmType;
-private:
- ShmType &shm_;
- std::string name_;
- Data *pdata_ = nullptr;
+class ShmObject : private boost::noncopyable
+{
+ static std::string ObjName(const std::string &name) { return "obj" + name; }
- bool IsOk() const { return pdata_; }
-protected:
- ShmType &shm() { return shm_; }
public:
- template <class...Params>
- ShmObject(ShmType &segment, const std::string &name, Params&&...t):
- shm_(segment), name_(name)
- {
- pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
- if (!IsOk()) {
- throw("shm error: " + name_);
- }
- }
- Data *find(const std::string &name) { return shm_.find<Data>(ObjName(name).c_str()).first; }
- virtual ~ShmObject() {}
- std::string name() const { return name_; }
- Data* data() { return pdata_; }
- const Data* data() const { return pdata_; }
- Data* operator->() { return data(); }
- const Data* operator->() const { return data(); }
- virtual bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); }
-};
+ typedef T Data;
+ typedef SharedMemory ShmType;
+ ShmType &shm() const { return shm_; }
-template <class D> using Allocator = allocator<D, SharedMemory::segment_manager>;
+ template <class... Params>
+ ShmObject(ShmType &segment, const std::string &name, Params &&...t) :
+ shm_(segment), name_(name)
+ {
+ pdata_ = shm_.FindOrCreate<Data>(ObjName(name_), std::forward<decltype(t)>(t)...);
+ if (!IsOk()) {
+ throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
+ }
+ }
+ template <class... Params>
+ ShmObject(ShmType &segment, const bool create_or_else_find, const std::string &name, Params &&...t) :
+ shm_(segment), name_(name)
+ {
+ if (create_or_else_find) {
+ pdata_ = shm_.Create<Data>(ObjName(name_), std::forward<decltype(t)>(t)...);
+ } else {
+ pdata_ = shm_.Find<Data>(ObjName(name_));
+ }
+ }
+ ShmObject(const int64_t offset, ShmType &segment, const std::string &name) :
+ shm_(segment), name_(name)
+ {
+ pdata_ = reinterpret_cast<Data *>(Addr(shm_.get_address()) + offset);
+ }
+
+ bool IsOk() const { return pdata_; }
+
+ static bool Remove(SharedMemory &shm, const std::string &name) { return shm.destroy<Data>(ObjName(name).c_str()); }
+ static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
+ Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
+ virtual ~ShmObject() {}
+ std::string name() const { return name_; }
+ Data *data() { return pdata_; }
+ const Data *data() const { return pdata_; }
+ int64_t offset() const { return Addr(pdata_) - Addr(shm_.get_address()); }
+ Data *operator->() { return data(); }
+ const Data *operator->() const { return data(); }
+ bool Remove() { return Remove(shm_, name_); }
+
+private:
+ static int64_t Addr(const void *p) { return reinterpret_cast<int64_t>(p); }
+ ShmType &shm_;
+ std::string name_;
+ Data *pdata_ = nullptr;
+};
} // namespace bhome_shm
--
Gitblit v1.8.0