From db322f33ba13592f2492317e3f1a070454c97059 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 13 五月 2021 19:34:46 +0800
Subject: [PATCH] center alloc all msgs.

---
 src/socket.h |   90 ++++++++++++++++++++++++++++++++------------
 1 files changed, 65 insertions(+), 25 deletions(-)

diff --git a/src/socket.h b/src/socket.h
index 981677f..d69b8d4 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -42,7 +42,7 @@
 public:
 	typedef ShmMsgQueue::MQId MQId;
 	typedef bhome_shm::SharedMemory Shm;
-	typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB;
+	typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB;
 	typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
 	typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
 	typedef std::function<void(ShmSocket &sock)> IdleCB;
@@ -54,39 +54,74 @@
 	static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
 	bool Remove() { return Remove(shm(), id()); }
 	MQId id() const { return mq().Id(); }
+	void SetNodeProc(const int proc_index, const int socket_index)
+	{
+		node_proc_index_ = proc_index;
+		socket_index_ = socket_index;
+		LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
+	}
 	// start recv.
-	bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1);
-	bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
-	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
+	bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
+	bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); }
 	bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
 	bool Stop();
 
 	template <class Body>
-	bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+	bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
 	{
 		try {
-			if (!cb) {
-				return SendImpl(remote, MsgI::Serialize(head, body));
-			} else {
-				std::string msg_id(head.msg_id());
-				per_msg_cbs_->Store(msg_id, std::move(cb));
-				auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
-					RecvCB cb_no_use;
-					per_msg_cbs_->Pick(msg_id, cb_no_use);
-				};
-				return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB);
-			}
+			//TODO alloc outsiez and use send.
+			MsgI msg;
+			if (!msg.Make(head, body)) { return false; }
+			DEFER1(msg.Release());
+
+			return Send(remote, msg);
 		} catch (...) {
 			SetLastError(eError, "Send internal error.");
 			return false;
 		}
 	}
 
-	bool Send(const MQId remote, const MsgI &imsg)
-	{
-		return SendImpl(remote, imsg);
-	}
+	bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
 
+	template <class Body>
+	bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+	{
+		std::string msg_id(head.msg_id());
+		std::string content(MsgI::Serialize(head, body));
+		size_t size = content.size();
+		auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+			if (!msg.Fill(content)) { return; }
+
+			try {
+				if (!cb) {
+					Send(remote, msg);
+				} else {
+					per_msg_cbs_->Store(msg_id, std::move(cb));
+					auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
+						RecvCB cb_no_use;
+						per_msg_cbs_->Pick(msg_id, cb_no_use);
+					};
+					Send(remote, msg, onExpireRemoveCB);
+				}
+			} catch (...) {
+				SetLastError(eError, "Send internal error.");
+			}
+		};
+
+		return RequestAlloc(size, OnResult);
+	}
+	template <class... T>
+	bool Send(const MQId remote, const MsgI &imsg, T &&...t)
+	{
+		return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
+	}
+	template <class... T>
+	bool Send(const MQId remote, const int64_t cmd, T &&...t)
+	{
+		return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
+	}
+	bool SyncRecv(int64_t &cmd, const int timeout_ms);
 	bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
 
 	template <class Body>
@@ -153,15 +188,15 @@
 	std::atomic<bool> run_;
 
 	Queue mq_;
-	template <class Key>
+	template <class Key, class CB>
 	class CallbackRecords
 	{
-		std::unordered_map<Key, RecvCB> store_;
+		std::unordered_map<Key, CB> store_;
 
 	public:
 		bool empty() const { return store_.empty(); }
-		bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
-		bool Pick(const Key &id, RecvCB &cb)
+		bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; }
+		bool Pick(const Key &id, CB &cb)
 		{
 			auto pos = store_.find(id);
 			if (pos != store_.end()) {
@@ -174,9 +209,14 @@
 		}
 	};
 
-	Synced<CallbackRecords<std::string>> per_msg_cbs_;
+	Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
+	Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
 
 	SendQ send_buffer_;
+	// node request center alloc memory.
+	int node_proc_index_ = -1;
+	int socket_index_ = -1;
+	std::atomic<int> alloc_id_;
 };
 
 #endif // end of include guard: SOCKET_GWTJHBPO

--
Gitblit v1.8.0