wangzhengquan
2020-07-23 b6043642f60ef23a7a100418cd4fec1251a98ad9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#include "usg_common.h"
#include "dgram_mod_socket.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
 
typedef struct dgram_mod_socket_t {
    socket_mod_t mod;
  shm_socket_t *shm_socket;
  // pthread_t recv_thread;
    // std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * > *recv_queue_map;
} dgram_mod_socket_t;
 
 
void *dgram_mod_open_socket(int mod) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t));
    socket->mod = (socket_mod_t)mod;
    // socket->recv_thread = 0;
    // socket->recv_queue_map = NULL;
    socket->shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    return (void *)socket;
}
 
 
int dgram_mod_close_socket(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    shm_close_socket(socket->shm_socket);
    // if(socket->recv_queue_map != NULL) {
    //     for(auto iter = socket->recv_queue_map->begin(); iter != socket->recv_queue_map->end(); iter++) {
    //         delete iter->second;
    //         socket->recv_queue_map->erase(iter);
            
    //     }
    //     delete socket->recv_queue_map;
    // }
 
 
    // if(socket->recv_thread != 0)
    //     pthread_cancel(socket->recv_thread);
    free(_socket);
}
 
 
int dgram_mod_bind(void * _socket, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return  shm_socket_bind(socket->shm_socket, port);
}
 
int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_sendto(socket->shm_socket, buf, size, port);
 
}
 
int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) {
 
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    // if(socket->mod == REQ_REP && socket->recv_thread != 0) {
    //     err_exit(0, "you have used sendandrecv method os you can not use recvfrom method any more. these two method can not be used at the same time.");
    //     return -1;
    // }
    return shm_recvfrom(socket->shm_socket, buf, size, port);
}
 
// void *_dgram_mod_run_recv(void * _socket) {
//     pthread_detach(pthread_self());
//     dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
//     void *buf;
//     int size;
//     int port;
//     shm_msg_t msg;
//     LockFreeQueue<shm_msg_t, DM_Allocator> *queue;
//     std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter;
// // printf("==============_dgram_mod_run_recv recv before\n");
//     while(shm_recvfrom(socket->shm_socket, &buf, &size, &port) == 0) {
//         if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) {
//             queue = iter->second;
//         } else {
//             queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
//             socket->recv_queue_map->insert({port, queue});
//         }
 
//         msg.buf = buf;
//         msg.size = size;
//         msg.port = port;
// // printf("==============_dgram_mod_run_recv push before\n");
//         queue->push(msg);
// // printf("==============_dgram_mod_run_recv push after\n");
     
//     }
//     return NULL;
 
 
 
// }
 
 
 
int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) {
    
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    if(socket->mod != REQ_REP) {
        err_exit(0, "you can't use this method other than REQ_REP mod!");
    }
    // if(socket->recv_queue_map == NULL) {
    //     socket->recv_queue_map = new std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >;
    // }
 
    // std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter;
    // LockFreeQueue<shm_msg_t, DM_Allocator> *queue;
    // if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) {
    //     queue = iter->second;
    // } else {
    //     queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
    //     socket->recv_queue_map->insert({port, queue});
    // }
 
    // if (socket->recv_thread == 0) {
        
    //     pthread_create(&(socket->recv_thread ), NULL, _dgram_mod_run_recv , _socket);
        
    // }
 
    // shm_msg_t msg;
    // if(queue->pop(msg)) {
    //     *recv_buf = msg.buf;
    //     *recv_size = msg.size;
    //     return 0;
    // }
 
    int recv_port;
    int rv;
 
    shm_socket_t *shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    if (shm_sendto(shm_socket, send_buf, send_size, send_port) == 0) {
        rv = shm_recvfrom(shm_socket, recv_buf, recv_size, &recv_port);
        shm_close_socket(shm_socket);
        return rv;
    }
    
    
    return -1;
 
}