wangzhengquan
2020-07-17 5e3e6719f7d7922decdc16d2313baf2e94210750
pub_sub finished
1个文件已添加
6个文件已修改
109 ■■■■■ 已修改文件
queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
queue/mod_socket.c 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/shm_socket.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/pub_sub 补丁 | 查看 | 原始文档 | blame | 历史
test2/pub_sub.c 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/req_rep 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.a
Binary files differ
queue/mod_socket.c
@@ -112,14 +112,29 @@
int mod_send(void * _socket, void *buf, int size) {
    mod_socket_t * socket = (mod_socket_t *) _socket;
    if(!socket->is_server ) {
        return shm_send(socket->shm_socket, buf, size);
    std::map<int, shm_socket_t* > *clientSocketMap = socket->shm_socket->clientSocketMap;
    std::map<int, shm_socket_t* >::iterator iter;
    int rv;
    if(socket->is_server ) {
        switch(socket->mod) {
            case REQ_REP:
                SemUtil::dec(socket->items);
                rv = shm_send(socket->client_socket, buf, size);
                SemUtil::inc(socket->slots);
                break;
            case PUB_SUB:
                for(iter = clientSocketMap->begin(); iter != clientSocketMap->end(); iter++) {
                    rv = shm_send(iter->second, buf, size);
                }
                break;
            default:
                err_exit(0, "不支持的模式%d", socket->mod);
        }
        return rv;
    }
    else if(socket->mod == REQ_REP) {
        SemUtil::dec(socket->items);
        shm_send(socket->client_socket, buf, size);
        SemUtil::inc(socket->slots);
        return 0;
    else {
        return shm_send(socket->shm_socket, buf, size);
    }
    return -1;
    
@@ -128,22 +143,32 @@
int mod_recv(void * _socket, void **buf, int *size) {
    mod_socket_t * socket = (mod_socket_t *) _socket;
    mod_entry_t entry;
    int rv;
    if(!socket->is_server ) {
    if(socket->is_server ) {
        switch(socket->mod) {
            case REQ_REP:
                SemUtil::dec(socket->slots);
                rv = socket->recvQueue->pop(entry);
                *buf = entry.buf;
                *size = entry.size;
                socket->client_socket = entry.client_socket;
                SemUtil::inc(socket->items);
                break;
            case PUB_SUB:
                rv = 0;
                break;
            default:
                err_exit(0, "不支持的模式%d", socket->mod);
        }
        return rv;
    }
    else {
        return shm_recv(socket->shm_socket, buf, size);
    }
    else if(socket->mod == REQ_REP) {
        SemUtil::dec(socket->slots);
        socket->recvQueue->pop(entry);
        *buf = entry.buf;
        *size = entry.size;
        socket->client_socket = entry.client_socket;
        SemUtil::inc(socket->items);
        return 0;
    }
    return -1;
}
queue/shm_socket.c
@@ -308,6 +308,10 @@
int shm_send(shm_socket_t *socket, void *buf, int size) {
    // hashtable_t *hashtable = mm_get_hashtable();
    if(socket->remoteQueue == NULL) {
        err_msg(errno, "当前客户端无连接!");
        return -1;
    }
    shm_msg_t dest;
    dest.type=SHM_COMMON_MSG;
    dest.port = socket->port;
test2/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    req_rep
PROGS =    req_rep pub_sub
build: $(PROGS)
test2/pub_sub
Binary files differ
test2/pub_sub.c
@@ -1,41 +1,37 @@
#include "socket.h"
#include "mod_socket.h"
#include "shm_mm.h"
#include "usg_common.h"
void server(int port) {
    void *socket = shm_open_socket(PUB_SUB);
    shm_bind(socket, port);
    shm_listen(socket);
    void *socket = mod_open_socket(PUB_SUB);
    mod_socket_bind(socket, port);
    mod_listen(socket);
    int size;
    void *recvbuf;
    char sendbuf[512];
    while(true) {
        shm_recv(socket, &recvbuf, &size);
        sprintf(sendbuf, "pub: %s", recvbuf);
        puts(sendbuf);
        shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
        shm_free(recvbuf);
        printf("请输入发布消息:");
        scanf("%s", sendbuf);
        mod_send(socket, sendbuf, strlen(sendbuf)+1) ;
        free(recvbuf);
    }
    shm_close_socket(socket);
    mod_close_socket(socket);
}
void client(int port) {
    void *socket = shm_open_socket(PUB_SUB);
    shm_connect(socket, port);
    void *socket = mod_open_socket(PUB_SUB);
    mod_connect(socket, port);
    int size;
    void *recvbuf;
    char sendbuf[512];
    sprintf(sendbuf, "sub");
    shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
    while(true) {
        shm_recv(socket, &recvbuf, &size);
        printf("received sub message: %s\n", (char *)recvbuf);
        shm_free(recvbuf);
    while(mod_recv(socket, &recvbuf, &size) == 0) {
        printf("收到订阅消息: %s\n", (char *)recvbuf);
        free(recvbuf);
    }
    shm_close_socket(socket);
    mod_close_socket(socket);
}
int main(int argc, char *argv[]) {
test2/req_rep
Binary files differ