From 6eefba812ede29549af3633c490f2e85a4805524 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 31 三月 2021 11:24:20 +0800
Subject: [PATCH] format code style.

---
 src/pubsub.cpp |  248 ++++++++++++++++++++++++------------------------
 1 files changed, 124 insertions(+), 124 deletions(-)

diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index a0dc4e9..d5c7dd2 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -16,170 +16,170 @@
  * =====================================================================================
  */
 #include "pubsub.h"
-#include <chrono>
 #include "bh_util.h"
 #include "defs.h"
+#include <chrono>
 
-namespace bhome_shm {
+namespace bhome_shm
+{
 
 using namespace std::chrono_literals;
 const int kMaxWorker = 16;
 using namespace bhome_msg;
 
-BusManager::BusManager(SharedMemory &shm):
-shm_(shm),
-busq_(kBHBusQueueId, shm, 16),
-run_(false)
+BusManager::BusManager(SharedMemory &shm) :
+    shm_(shm),
+    busq_(kBHBusQueueId, shm, 16),
+    run_(false)
 {
 }
-	
+
 BusManager::~BusManager()
 {
-    Stop();
+	Stop();
 }
 
 bool BusManager::Start(const int nworker)
 {
-    std::lock_guard<std::mutex> guard(mutex_);
-    StopNoLock();
-    // start
-    auto Worker = [&](){
-        while (this->run_) {
-            BusManager &self = *this;
-            MsgI msg;
-            const int timeout_ms = 100;
-            if (self.busq_.Recv(msg, timeout_ms)) {
-                self.OnMsg(msg);
-            }
-        }
-    };
+	std::lock_guard<std::mutex> guard(mutex_);
+	StopNoLock();
+	// start
+	auto Worker = [&]() {
+		while (this->run_) {
+			BusManager &self = *this;
+			MsgI msg;
+			const int timeout_ms = 100;
+			if (self.busq_.Recv(msg, timeout_ms)) {
+				self.OnMsg(msg);
+			}
+		}
+	};
 
-    run_.store(true);
-    const int n = std::min(nworker, kMaxWorker);
-    for (int i = 0; i < n; ++i) {
-        workers_.emplace_back(Worker);
-    }
-    return true;
+	run_.store(true);
+	const int n = std::min(nworker, kMaxWorker);
+	for (int i = 0; i < n; ++i) {
+		workers_.emplace_back(Worker);
+	}
+	return true;
 }
 
 bool BusManager::Stop()
 {
-    std::lock_guard<std::mutex> guard(mutex_);
-    return StopNoLock();
+	std::lock_guard<std::mutex> guard(mutex_);
+	return StopNoLock();
 }
 
 bool BusManager::StopNoLock()
 {
-    if (run_.exchange(false)) {
-        for (auto &w: workers_) {
-            if (w.joinable()) {
-                w.join();
-            }
-        }
-        return true;
-    }    
-    return false;
+	if (run_.exchange(false)) {
+		for (auto &w : workers_) {
+			if (w.joinable()) {
+				w.join();
+			}
+		}
+		return true;
+	}
+	return false;
 }
 
 void BusManager::OnMsg(MsgI &imsg)
 {
-    DEFER1(imsg.Release(shm_));
+	DEFER1(imsg.Release(shm_));
 
-    BHMsg msg;
-    if (!imsg.Unpack(msg)) {
-        return;
-    }
+	BHMsg msg;
+	if (!imsg.Unpack(msg)) {
+		return;
+	}
 
-    auto OnSubChange = [&](auto &&update) {
-        DataSub sub;
-        if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
-            assert(sizeof(MQId) == msg.route(0).mq_id().size());
-            MQId client;
-            memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
+	auto OnSubChange = [&](auto &&update) {
+		DataSub sub;
+		if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
+			assert(sizeof(MQId) == msg.route(0).mq_id().size());
+			MQId client;
+			memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
 
-            std::lock_guard<std::mutex> guard(mutex_);
-            auto &topics = sub.topics();
-            for (auto &topic : topics) {
-                try {
-                    update(topic, client);
-                } catch(...) {
-                    //TODO log error
-                }
-            }
-        }
-    };
+			std::lock_guard<std::mutex> guard(mutex_);
+			auto &topics = sub.topics();
+			for (auto &topic : topics) {
+				try {
+					update(topic, client);
+				} catch (...) {
+					//TODO log error
+				}
+			}
+		}
+	};
 
-    auto Sub1 = [this](const std::string &topic, const MQId &id) {
-        records_[topic].insert(id);
-    };
+	auto Sub1 = [this](const std::string &topic, const MQId &id) {
+		records_[topic].insert(id);
+	};
 
-    auto Unsub1 = [this](const std::string &topic, const MQId &id) {
-        auto pos = records_.find(topic);
-        if (pos != records_.end()) {
-            if (pos->second.erase(id) && pos->second.empty()) {
-                records_.erase(pos);
-            }
-        }
-    };
+	auto Unsub1 = [this](const std::string &topic, const MQId &id) {
+		auto pos = records_.find(topic);
+		if (pos != records_.end()) {
+			if (pos->second.erase(id) && pos->second.empty()) {
+				records_.erase(pos);
+			}
+		}
+	};
 
-    auto OnPublish = [&]() {
-        DataPub pub;
-        if (!pub.ParseFromString(msg.body())) {
-            return;
-        }
-        auto FindClients = [&](const std::string &topic){
-            Clients dests;
-            std::lock_guard<std::mutex> guard(mutex_);
-            auto Find1 = [&](const std::string &t) {
-                auto pos = records_.find(topic);
-                if (pos != records_.end() && !pos->second.empty()) {
-                    auto &clients = pos->second;
-                    for (auto &cli : clients) {
-                        dests.insert(cli);
-                    }
-                }
-            };
-            Find1(topic);
+	auto OnPublish = [&]() {
+		DataPub pub;
+		if (!pub.ParseFromString(msg.body())) {
+			return;
+		}
+		auto FindClients = [&](const std::string &topic) {
+			Clients dests;
+			std::lock_guard<std::mutex> guard(mutex_);
+			auto Find1 = [&](const std::string &t) {
+				auto pos = records_.find(topic);
+				if (pos != records_.end() && !pos->second.empty()) {
+					auto &clients = pos->second;
+					for (auto &cli : clients) {
+						dests.insert(cli);
+					}
+				}
+			};
+			Find1(topic);
 
-            //TODO check and adjust topic on client side sub/pub.
-            size_t pos = 0;
-            while (true) {
-                pos = topic.find(kTopicSep, pos);
-                if (pos == topic.npos || ++pos == topic.size()) {
-                    // Find1(std::string()); // sub all.
-                    break;
-                } else {
-                    Find1(topic.substr(0, pos));
-                }
-            }
-            return dests;
-        };
+			//TODO check and adjust topic on client side sub/pub.
+			size_t pos = 0;
+			while (true) {
+				pos = topic.find(kTopicSep, pos);
+				if (pos == topic.npos || ++pos == topic.size()) {
+					// Find1(std::string()); // sub all.
+					break;
+				} else {
+					Find1(topic.substr(0, pos));
+				}
+			}
+			return dests;
+		};
 
-        auto Dispatch = [&](auto &&send1) {
-            const Clients &clients(FindClients(pub.topic()));
-            for (auto &cli : clients) {
-                send1(cli);
-            }
-        };
+		auto Dispatch = [&](auto &&send1) {
+			const Clients &clients(FindClients(pub.topic()));
+			for (auto &cli : clients) {
+				send1(cli);
+			}
+		};
 
-        if (imsg.IsCounted()) {
-            Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
-        } else {
-            MsgI pubmsg;
-            if (!pubmsg.MakeRC(shm_, msg)) { return; }
-            DEFER1(pubmsg.Release(shm_));
+		if (imsg.IsCounted()) {
+			Dispatch([&](const MQId &cli) { busq_.Send(cli, imsg, 100); });
+		} else {
+			MsgI pubmsg;
+			if (!pubmsg.MakeRC(shm_, msg)) { return; }
+			DEFER1(pubmsg.Release(shm_));
 
-            Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
-        }
-    };
+			Dispatch([&](const MQId &cli) { busq_.Send(cli, pubmsg, 100); });
+		}
+	};
 
-    switch (msg.type()) {
-        case kMsgTypeSubscribe: OnSubChange(Sub1); break;
-        case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
-        case kMsgTypePublish : OnPublish(); break;
-        default: break;
-    }
+	switch (msg.type()) {
+	case kMsgTypeSubscribe: OnSubChange(Sub1); break;
+	case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
+	case kMsgTypePublish: OnPublish(); break;
+	default: break;
+	}
 }
 
 } // namespace bhome_shm
-

--
Gitblit v1.8.0