fujuntang
2021-11-10 c479ef57baaaa28964fc3ec8d80ff99dffa7d49f
src/socket/bus_server_socket.h
@@ -3,11 +3,12 @@
#include "usg_common.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "shm_mm.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
#include "msg_mgr.h"
#include "socket_def.h"
#include <set>
@@ -18,6 +19,43 @@
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
typedef struct _LinkNode
{
  int data;
  int data_fix;
  int count;
  _LinkNode *next;
} LinkNode;
class list
{
private:
  LinkNode *head;
public:
  list() {head = NULL;};
  void Insert(int aDate, int bDate);
  void Delete(int Data);
  int dataFixGet(int data);
  int dataGet(int data);
  void dataSet(int data, int val);
  int NodeNum(void);
  int nodeGet(int index);
  LinkNode *getHead() {return head;};
};
class BusServerSocket {
private:
@@ -25,17 +63,21 @@
  // pthread_t recv_thread;
  // <主题, 订阅者>
   SHMTopicSubMap *topic_sub_map;
  recvbuf_data recvBuf_data;
private:
   int  destroy();
   void _proxy_sub( char *topic, int key);
   void _proxy_pub( char *topic, void *buf, size_t size, int key);
   void *run_pubsub_proxy();
   int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
   void _proxy_pub( char *topic, char *buf, size_t size, int key);
   int _run_proxy_();
   // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
   void _proxy_desub( char *topic, int key);
   void _proxy_desub_all(int key);
   static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
  void _proxy_reg(const char *topic, size_t topic_size, const char *content, size_t content_size, int key, int flag);
  static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
   // static bool include_in_keys(int key, int keys[], size_t length);
public:
@@ -64,6 +106,7 @@
    * @return 0 成功, 其他值 失败的错误码
   */
   int  start();
    int get_data(int val);
   /**
    * 停止bus
@@ -71,14 +114,18 @@
    * @return 0 成功, 其他值 失败的错误码
   */
   int  stop();
   int check_proc(int val, const void *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag);
    void remove_proc(int val);
   /**
    * 获取soket key
    */
   int get_key() ;
  void _data_remove(int val);
  void buf_data_set(int data, int val);
  void buf_data_remove(int data);
};