fujuntang
2021-12-09 73689afc09ce346f9eb00e02faf7f242e55dc7ee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#ifndef _BUS_SERVER_SOCKET_H_
#define _BUS_SERVER_SOCKET_H_
#include "usg_common.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "shm_mm.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
#include "msg_mgr.h"
#include "socket_def.h"
#include <set>
 
 
 
 
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
 
typedef struct _LinkNode
{
  int data;
  int data_fix;
  int count;
 
  _LinkNode *next; 
} LinkNode;
 
class list
{
 
private:
 
  LinkNode *head;
 
public:
 
  list() {head = NULL;};
  
  void Insert(int aDate, int bDate);
  
  void Delete(int Data);
  
  int dataFixGet(int data);
  
  int dataGet(int data);
  
  void dataSet(int data, int val);
  
  int NodeNum(void);
 
  int nodeGet(int index);
  
  LinkNode *getHead() {return head;};
  
};
 
class BusServerSocket {
private:
    shm_socket_t *shm_socket;
  // pthread_t recv_thread;
  // <主题, 订阅者>
    SHMTopicSubMap *topic_sub_map;
  recvbuf_data recvBuf_data;
 
private:
    int  destroy();
    void _proxy_sub( char *topic, int key);
    void _proxy_pub( char *topic, char *buf, size_t size, int key);
    int _run_proxy_();
    // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
      
    void _proxy_desub( char *topic, int key);
    void _proxy_desub_all(int key);
 
  void _proxy_reg(const char *topic, size_t topic_size, const char *content, size_t content_size, int key, int flag);
 
  static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
    // static bool include_in_keys(int key, int keys[], size_t length);
 
public:
    static size_t remove_subscripters(int keys[], size_t length) ;
    
public:
    BusServerSocket();
    ~BusServerSocket();
     
    /**
     * 绑定端口到socket, 如果不绑定则系统自动分配一个
     * @return 0 成功, 其他值 失败的错误码
    */
    int bind(int key);
 
    /**
     * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
     * @return 0 成功, 其他值 失败的错误码
    */
    int force_bind(int key);
     
 
    /**
     * 启动bus
     * 
     * @return 0 成功, 其他值 失败的错误码
    */
    int  start();
    int get_data(int val);
 
    /**
     * 停止bus
     * 
     * @return 0 成功, 其他值 失败的错误码
    */
    int  stop();
    int check_proc(int val, const void *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag);
    void remove_proc(int val);
 
    /**
     * 获取soket key
     */
    int get_key() ;
 
  void _data_remove(int val);
  void buf_data_set(int data, int val);
  void buf_data_remove(int data);
 
};
 
#endif