wangzhengquan
2020-11-26 72b7aebb0022f8e391c999348763acd5f7a16133
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#ifndef __DMODE_SOCKET_H__
#define __DMODE_SOCKET_H__
#include "usg_common.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
#include <set>
 
#define ACTION_LIDENTIFIER "<**"
#define ACTION_RIDENTIFIER "**>"
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
 
static Logger *logger = LoggerFactory::getLogger();
#define BUS_MAP_KEY 1
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
 
enum socket_mod_t
{
    PULL_PUSH = 1,
    REQ_REP = 2,
    PAIR = 3,
    PUB_SUB = 4,
    SURVEY = 5,
    BUS = 6
    
};
 
class ShmModSocket {
private:
    shm_socket_t *shm_socket;
  socket_mod_t mod;
  // pthread_t recv_thread;
  // <主题, 订阅者>
    SHMTopicSubMap *topic_sub_map;
    std::set<int> *bus_set;
 
private:
    inline int _recvfrom_(void **buf, int *size, int *port,  struct timespec *timeout, int flags);
    void _proxy_sub( char *topic, int port);
    void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port);
    void *run_pubsub_proxy();
    int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
    int _sub_( char *topic, int size, int port, struct timespec *timeout, int flags);
    int _pub_( char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
 
    void _proxy_desub( char *topic, int port);
    void _proxy_desub_all(int port);
    int  _desub_( char *topic, int size, int port, struct timespec *timeout, int flags);
 
    static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
    static bool include_in_keys(int key, int keys[], size_t length);
    static size_t remove_subscripters(int keys[], size_t length) ;
public:
    static size_t remove_keys(int keys[], size_t length);
public:
    ShmModSocket();
    ~ShmModSocket();
     
    /**
     * 绑定端口到socket, 如果不绑定则系统自动分配一个
     * @return 0 成功, 其他值 失败的错误码
    */
    int bind(int port);
 
    /**
     * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
     * @return 0 成功, 其他值 失败的错误码
    */
    int force_bind(int port);
    /**
     * 发送信息
     * @port 发送给谁
     * @return 0 成功, 其他值 失败的错误码
     */
    int sendto(const void *buf, const int size, const int port);
    // 发送信息超时返回。 @sec 秒 , @nsec 纳秒
    int sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout);
    // 发送信息立刻返回。
    int sendto_nowait(const void *buf, const int size, const int port);
 
    /**
     * 接收信息
     * @port 从谁哪里收到的信息
     * @return 0 成功, 其他值 失败的错误码
    */
    int recvfrom(void **buf, int *size, int *port);
    // 接受信息超时返回。 @sec 秒 , @nsec 纳秒
    int recvfrom_timeout(void **buf, int *size, int *port,  struct timespec *timeout);
    int recvfrom_nowait(void **buf, int *size, int *port);
 
    /**
     * 发送请求信息并等待接收应答
     * @port 发送给谁
     * @return 0 成功, 其他值 失败的错误码
    */
    int sendandrecv(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
 
 
    int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
 
 
    /**
     * 启动bus
     * 
     * @return 0 成功, 其他值 失败的错误码
    */
    int  start_bus();
 
    /**
     * 订阅指定主题
     * @topic 主题
     * @size 主题长度
     * @port 总线端口
     */
    int  sub(char *topic, int size, int port);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int  sub_timeout(char *topic, int size, int port,  struct timespec *timeout);
    int  sub_nowait(char *topic, int size, int port);
 
 
     /**
     * 取消订阅指定主题
      * @topic 主题,主题为空时取消全部订阅
     * @size 主题长度
     * @port 总线端口
     */
    int desub( char *topic, int size, int port);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int desub_timeout(char *topic, int size, int port, struct timespec *timeout);
    int desub_nowait(char *topic, int size, int port) ;
 
    /**
     * 发布主题
     * @topic 主题
     * @content 主题内容
     * @port 总线端口
     */
    int  pub(char *topic, int topic_size, void *content, int content_size, int port);
    //  超时返回。 @sec 秒 , @nsec 纳秒
    int  pub_timeout(char *topic, int topic_size, void *content, int content_size, int port,  struct timespec *timeout);
    int  pub_nowait(char *topic, int topic_size, void *content, int content_size, int port);
 
 
    /**
     * 获取soket key
     */
    int get_key() ;
 
 
};
 
#endif