wangzhengquan
2020-07-27 aeaeda81cfe398081a7c1a5c287981c8df974aa2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#include "usg_common.h"
#include "dgram_mod_socket.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
#include <set>
 
#define ACTION_LIDENTIFIER "<**"
#define ACTION_RIDENTIFIER "**>"
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
 
 
static Logger logger = LoggerFactory::getLogger();
#define BUS_MAP_KEY 1
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<SHMString, SHMKeySet *> > > SHMTopicSubMap;
 
typedef struct dgram_mod_socket_t {
  shm_socket_t *shm_socket;
  // pthread_t recv_thread;
  // <主题, 订阅者>
    SHMTopicSubMap *topic_sub_map;
} dgram_mod_socket_t;
 
static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
void * run_pubsub_proxy(dgram_mod_socket_t * socket) ;
 
void *dgram_mod_open_socket() {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t));
    // socket->mod = (socket_mod_t)mod;
    socket->shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    return (void *)socket;
}
 
 
int dgram_mod_close_socket(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
 
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            delete subscripter_set;
        }
        mem_pool_free_by_key(BUS_MAP_KEY);
    }
    
 
    shm_close_socket(socket->shm_socket);
    free(_socket);
}
 
 
int dgram_mod_bind(void * _socket, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return  shm_socket_bind(socket->shm_socket, port);
}
 
 
int dgram_mod_force_bind(void * _socket, int port) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_socket_force_bind(socket->shm_socket, port);
}
 
int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_sendto(socket->shm_socket, buf, size, port);
 
}
 
int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) {
 
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
// printf("dgram_mod_recvfrom  before\n");
    int rv = shm_recvfrom(socket->shm_socket, buf, size, port);
// printf("dgram_mod_recvfrom  after\n");
    return rv;
}
 
 
 
int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size);
 
}
 
 
 
int dgram_mod_get_port(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->shm_socket->port;
}
 
 
void dgram_mod_free(void *buf) {
    free(buf);
}
 
int  dgram_mod_start_bus(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
printf("mem_pool_malloc_by_key before\n");
    // void *map_ptr = mem_pool_malloc_by_key(1, sizeof(SHMTopicSubMap));
    socket->topic_sub_map =    mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
printf("mem_pool_malloc_by_key after\n");
 
    // socket->topic_sub_map = new(map_ptr) SHMTopicSubMap;
 
    //socket->topic_sub_map = new SHMTopicSubMap;
    run_pubsub_proxy(socket);
    // pthread_t tid;
    // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
    return 0;
 
}
 
/**
 * @port 总线端口
 */
int  dgram_mod_sub(void * _socket, void *topic, int size, int port) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    char buf[8192];
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    return shm_sendto(socket->shm_socket, buf, strlen(buf) + 1, port);
}
 
/**
 * @port 总线端口
 */
int  dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) {
 
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    int head_len;
    char buf[8192+content_size];
    snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    head_len = strlen(buf);
    memcpy(buf+head_len, content, content_size);
    return shm_sendto(socket->shm_socket, buf, head_len+content_size, port);
 
}
 
 
//==========================================================================================================================
 
/*
 * 处理订阅
*/
void _proxy_sub(dgram_mod_socket_t *socket, char *topic, int port) {
    SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
    SHMKeySet *subscripter_set;
 
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
 
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
    } else {
        void *set_ptr = mm_malloc(sizeof(SHMKeySet));
        subscripter_set = new(set_ptr) SHMKeySet;
        topic_sub_map->insert({topic, subscripter_set});
    }
    subscripter_set->insert(port);
}
 
/*
 * 处理发布,代理转发
*/
void _proxy_pub(dgram_mod_socket_t * socket, char *topic, size_t head_len, void *buf, size_t size, int port) {
    SHMTopicSubMap *topic_sub_map = socket->topic_sub_map;
    SHMKeySet *subscripter_set;
 
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
 
    std::vector<int> subscripter_to_del;
    std::vector<int>::iterator vector_iter;
 
    int send_port;
    struct timespec timeout = {1,0};
 
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
            send_port = *set_iter;
 printf("_proxy_pub send before %d \n", send_port);
            if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) {
                //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
                subscripter_to_del.push_back(send_port);
            } else {
printf("_proxy_pub send after: %d \n", send_port);
            }
 
            
        }
 
        // 删除已关闭的端
        for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
            if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
                subscripter_set->erase(set_iter);
                printf("remove closed subscripter %d \n", send_port);
            }
        }
        subscripter_to_del.clear();
 
    }
}
 
void * run_pubsub_proxy(dgram_mod_socket_t * socket) {
    // pthread_detach(pthread_self());
    int size;
    int port;
    char * action, *topic, *topics, *buf;
    size_t head_len;
 
    const char *topic_delim = ",";
printf("run_pubsub_proxy server receive before\n");
    while(shm_recvfrom(socket->shm_socket, (void **)&buf, &size, &port) == 0) {
printf("run_pubsub_proxy server recv after: %s \n", buf);
        if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
            if(strcmp(action, "sub") == 0) {
                // 订阅支持多主题订阅
                topic = trim(strtok(topics, topic_delim), NULL);
              while(topic) {
           _proxy_sub(socket, topic, port);
            topic = trim(strtok(NULL, topic_delim), NULL);
              }
 
            } else if(strcmp(action, "pub") == 0) {
                _proxy_pub(socket, topics, head_len, buf, size, port);
            }
            
            free(action);
            free(topics);
        } else {
            err_msg(0, "incorrect format msg");
        }
        free(buf);
    }
    return NULL;
}
 
 
/**
 * @str "<**sub**>{经济}"
 */
 
static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) {
 char *ptr = str;
 char *str_end_ptr = str + size;
 char *action_start_ptr;
 char *action_end_ptr;
 size_t action_len = 0;
 
 char *topic_start_ptr;
 char *topic_end_ptr;
 size_t topic_len = 0;
 
 // if (strlen(identifier) > strlen(str)) {
 //  return 0;
 // }
 
 if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) {
  ptr += strlen(ACTION_LIDENTIFIER);
  action_start_ptr = ptr;
  while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) {
    if(ptr >= str_end_ptr) {
      return 0;
    }
  }
// printf("%s\n", ptr);
  action_end_ptr = ptr;
  action_len = action_end_ptr - action_start_ptr;
  ptr += strlen(ACTION_RIDENTIFIER);
// printf("%s\n", ptr);
// printf("%s\n", str_end_ptr-1);
  if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) {
    topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER);
   
 
    while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) {
      if(ptr >= str_end_ptr) {
        return 0;
      }
    }
    topic_end_ptr = ptr;
    topic_len = topic_end_ptr - topic_start_ptr;
    
    ptr += strlen(TOPIC_RIDENTIFIER);
   
  } else {
    return 0;
  }
 } else {
  return 0;
 }
 
 char *topic = (char *)calloc(1, topic_len+1);
 strncpy(topic, topic_start_ptr, topic_len);
 *_topic = topic;
 
 char *action = (char *)calloc(1, action_len+1);
 strncpy(action, action_start_ptr, action_len);
 *_action = action;
 *head_len = ptr-str;
 
 return 1;
}