wangzhengquan
2021-02-05 e15d6fe15898caab7180b2065fea3382ecabd3e0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#ifndef __DMODE_SOCKET_H__
#define __DMODE_SOCKET_H__
#include "usg_common.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
#include <set>
#include "socket_def.h"
 
#define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t))
class BusServerSocket;
 
struct bus_head_t
{
    char action[64];
    uint32_t topic_size;
    uint32_t content_size;
};
 
 
class ShmModSocket {
friend class BusServerSocket;
private:
    shm_socket_t *shm_socket;
  
    std::set<int> *bus_set;
 
private:
     
     
 
    static int get_bus_sendbuf(bus_head_t &request_head, const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf);
 
public:
    // static size_t remove_keys(int keys[], size_t length);
    // static size_t remove_keys_exclude(int keys[], size_t length);
 
  // bus header 编码为网络传输的字节
  static void * encode_bus_head(bus_head_t & bushead);
  // 解码 bus  header
  static bus_head_t  decode_bus_head(void *headbs); 
  
public:
    ShmModSocket();
    ~ShmModSocket();
     
    int stop();
    /**
     * 绑定端口到socket, 如果不绑定则系统自动分配一个
     * @return 0 成功, 其他值 失败的错误码
    */
    int bind(int key);
 
    /**
     * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
     * @return 0 成功, 其他值 失败的错误码
    */
    int force_bind(int key);
    /**
     * 发送信息
     * @key 发送给谁
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     * @return 0 成功, 其他值 失败的错误码
     */
 
    int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0);
 
 
    /**
     * 接收信息
     * @key 从谁哪里收到的信息
     * @return 0 成功, 其他值 失败的错误码
    */
 
    int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0);
 
    /**
     * 发送请求信息并等待接收应答
     * @key 发送给谁
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     * @return 0 成功, 其他值 失败的错误码
    */
 
    int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,
     const struct timespec *timeout = NULL, int flag = 0);
 
 
    /**
     * 
     */
  int recvandsend( recvandsend_callback_fn callback, const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL);
 
    /**
     * 订阅指定主题
     * @topic 主题
     * @size 主题长度
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int  sub(const char *topic, int size, int key,  const struct timespec *timeout = NULL, int flag = 0);
 
 
     /**
     * 取消订阅指定主题
      * @topic 主题,主题为空时取消全部订阅
     * @size 主题长度
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int desub(const char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0);
 
    /**
     * 发布主题
     * @topic 主题
     * @content 主题内容
     * @key 总线端口
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     */
    int  pub(const char *topic, int topic_size, const void *content, int content_size, int key, const  struct timespec *timeout = NULL, int flag = 0);
 
 
    /**
     * 获取soket key
     */
    int get_key() ;
 
 
};
 
#endif