fujuntang
2021-09-23 82b028cf63953d8080b63d85468eae488d212194
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#ifndef _BUS_SERVER_SOCKET_H_
#define _BUS_SERVER_SOCKET_H_
#include "usg_common.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "shm_mm.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
#include "socket_def.h"
#include <set>
 
 
 
 
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
 
typedef struct _LinkNode
{
  int data;
  int data_fix;
  int count;
    
  _LinkNode *next; 
} LinkNode;
 
class list
{
 
private:
 
  LinkNode *head;
 
public:
 
  list() {head = NULL;};
  
  void Insert(int aDate, int bDate);
  
  void Delete(int Data);
  
  int dataFixGet(int data);
  
  int dataGet(int data);
  
  void dataSet(int data, int val);
  
  int NodeNum(void);
 
  int nodeGet(int index);
  
  LinkNode *getHead() {return head;};
  
};
 
class BusServerSocket {
private:
    shm_socket_t *shm_socket;
  // pthread_t recv_thread;
  // <主题, 订阅者>
    SHMTopicSubMap *topic_sub_map;
 
private:
    int  destroy();
    void _proxy_sub( char *topic, int key);
    void _proxy_pub( char *topic, char *buf, size_t size, int key);
    int _run_proxy_();
    // int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
      
    void _proxy_desub( char *topic, int key);
    void _proxy_desub_all(int key);
 
  void _proxy_reg(const char *topic, size_t topic_size, const char *content, size_t content_size, int key, int flag);
 
  static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
    // static bool include_in_keys(int key, int keys[], size_t length);
 
public:
    static size_t remove_subscripters(int keys[], size_t length) ;
    
public:
    BusServerSocket();
    ~BusServerSocket();
     
    /**
     * 绑定端口到socket, 如果不绑定则系统自动分配一个
     * @return 0 成功, 其他值 失败的错误码
    */
    int bind(int key);
 
    /**
     * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
     * @return 0 成功, 其他值 失败的错误码
    */
    int force_bind(int key);
     
 
    /**
     * 启动bus
     * 
     * @return 0 成功, 其他值 失败的错误码
    */
    int  start();
    int get_data(int val);
 
    /**
     * 停止bus
     * 
     * @return 0 成功, 其他值 失败的错误码
    */
    int  stop();
    int check_proc(int val, const void *buf, int len, void **buf_ret, int *len_ret, \
                          const struct timespec *timeout, const int flag);
    void remove_proc(int val);
 
    /**
     * 获取soket key
     */
    int get_key() ;
 
  void _data_remove(int val);
 
};
 
#endif