wangzhengquan
2020-07-16 1a7a9bdc976e4496d739bc57053e613f993bd85b
update
1个文件已删除
5个文件已添加
8个文件已修改
453 ■■■■■ 已修改文件
queue/Makefile 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/hashtable.c 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/hashtable.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_queue.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/shm_queue_wrapper.h 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/include/socket.h 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
queue/libshm_queue.so 补丁 | 查看 | 原始文档 | blame | 历史
queue/shm_queue_wrapper.c 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/socket.c 234 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/pub_sub.c 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test2/req_rep 补丁 | 查看 | 原始文档 | blame | 历史
test2/req_rep.c 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
queue/Makefile
@@ -22,6 +22,12 @@
MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
ifeq ($(DEBUG),y)
  MYLIBS = $(LIBSQUEUE)
else
  MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
endif
all: build
 
queue/hashtable.c
@@ -15,8 +15,6 @@
  TAILQ_ENTRY(tailq_entry_t) joint;
} tailq_entry_t;
static int hashtable_mutex;
#define START_KEY 1000
typedef  TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t;
@@ -56,7 +54,7 @@
}
void* _hashtable_put(hashtable_t *hashtable, int key, void *value)
void * _hashtable_put(hashtable_t *hashtable, int key, void *value)
{
  size_t code = hashcode(key);
  void *oldvalue;
@@ -209,7 +207,7 @@
   return res;
}
void* hashtable_put(hashtable_t *hashtable, int key, void *value) {
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
  SemUtil::dec(hashtable->mutex);
  while (hashtable->readcnt > 0)
  {
@@ -262,4 +260,4 @@
    }
  }
  return keyset;
}
}
queue/hashtable.h
@@ -19,7 +19,7 @@
void hashtable_init(hashtable_t *hashtable);
void *hashtable_get(hashtable_t *hashtable, int key);
void* hashtable_put(hashtable_t *hashtable, int key, void *value);
void hashtable_put(hashtable_t *hashtable, int key, void *value);
void *hashtable_remove(hashtable_t *hashtable, int key);
void hashtable_removeall(hashtable_t *hashtable);
queue/include/shm_queue.h
@@ -64,7 +64,7 @@
    bool found;
    for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
        found = false;
        for(int i = 0; i < length; i++) {
        for(size_t i = 0; i < length; i++) {
            if(*keyItr == keys[i]) {
                found = true;
                break;
queue/include/shm_queue_wrapper.h
@@ -9,18 +9,6 @@
#ifdef __cplusplus
extern "C" {
#endif
/**
 * 初始化共享内存
 * @size 共享内存大小
 *
 */
void shm_init(int size);
/**
 * 销毁共享内存
 * 整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。
 */
void shm_destroy();
 
queue/include/socket.h
New file
@@ -0,0 +1,61 @@
#ifndef __BASIC_SHM_SOCKET_H__
#define __BASIC_SHM_SOCKET_H__
#include "usg_common.h"
#ifdef __cplusplus
extern "C" {
#endif
enum shm_mod_t
{
    PULL_PUSH = 1,
    REQ_REP = 2,
    PAIR = 3,
    PUB_SUB = 4,
    SURVEY = 5,
    BUS = 6
};
/**
 * 初始化共享内存
 * @size 共享内存大小, 单位M
 *
 */
void shm_init(int size);
/**
 * 销毁共享内存
 * 整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。
 */
void shm_destroy();
/**
 * 释放recv方法分配的buf
 */
void shm_free(void *buf);
void *shm_open_socket(int mod);
int shm_close_socket(void *socket) ;
int shm_bind(void* socket, int port) ;
int shm_listen(void* socket) ;
int shm_connect(void* socket, int port);
int shm_send(void *socket, void *buf, int size) ;
int shm_recv(void* socket, void **buf, int *size) ;
#ifdef __cplusplus
}
#endif
#endif
queue/libshm_queue.a
Binary files differ
queue/libshm_queue.so
Binary files differ
queue/shm_queue_wrapper.c
@@ -13,13 +13,6 @@
} shmqueue_t;
void shm_init(int size) {
    mem_pool_init(size);
}
void shm_destroy() {
    mem_pool_destroy();
}
//移除不包含在keys中的队列
void shm_remove_queues_exclue(void *keys, int length) {
queue/socket.c
New file
@@ -0,0 +1,234 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "socket.h"
#include <map>
enum shm_msg_type_t
{
    SHM_SOCKET_OPEN = 1,
    SHM_SOKET_CLOSE = 2,
    SHM_COMMON_MSG = 3
};
typedef struct shm_msg_t {
    int port;
    shm_msg_type_t type;
    size_t size;
    void * buf;
} shm_msg_t;
typedef struct shm_socket_t {
    int port;
    shm_mod_t mod;
    SHMQueue<shm_msg_t> *queue;
    std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
    int slots;
    int items;
    int is_server;
} shm_socket_t;
void shm_init(int size) {
    mem_pool_init(size);
}
void shm_destroy() {
    mem_pool_destroy();
}
void shm_free(void *buf) {
    free(buf);
}
void *shm_open_socket(int mod) {
    shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
    socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >;
    socket->port = -1;
    socket->mod = (shm_mod_t)mod;
printf("mod===%d\n", socket->mod);
    socket->is_server = 0;
    if (mod == REQ_REP) {
        socket->slots = SemUtil::get(IPC_PRIVATE, 1);
        socket->items = SemUtil::get(IPC_PRIVATE, 0);
    }
    return (void *)socket;
}
int shm_close_socket(void *socket) {
    shm_socket_t * _socket = (shm_socket_t *) socket;
    delete _socket->queue;
    std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
    for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
        delete iter->second;
    }
    delete _socket->remoteQueueMap;
    if (_socket->mod == REQ_REP) {
        SemUtil::remove(_socket->slots);
        SemUtil::remove(_socket->items);
    }
    free(socket);
    return 0;
}
int shm_bind(void* socket, int port) {
    shm_socket_t * _socket = (shm_socket_t *) socket;
    _socket -> port = port;
    return 0;
}
int shm_listen(void* socket) {
    shm_socket_t * _socket = (shm_socket_t *) socket;
    _socket->is_server = 1;
    int  port;
    hashtable_t *hashtable = mm_get_hashtable();
    if(_socket -> port == -1) {
        port = hashtable_alloc_key(hashtable);
        _socket -> port = port;
    } else {
        if(hashtable_get(hashtable, _socket->port)!= NULL) {
            err_exit(0, "key %d has already been in used!", _socket->port);
        }
    }
    _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
    return 0;
}
static int __shm_rev(shm_socket_t* _socket, void **buf, int *size) {
    shm_msg_t src;
    std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
    bool rv = _socket->queue->pop(src);
    if (rv) {
        if(src.type=="open")
        if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
         if(_socket->mod == REQ_REP)
              SemUtil::dec(_socket->slots);
          remoteQueueMap->insert({src.port,  new SHMQueue<shm_msg_t>(src.port, 0)});
          if(_socket->mod == REQ_REP)
              SemUtil::inc(_socket->items);
        }
        void * _buf = malloc(src.size);
        memcpy(_buf, src.buf, src.size);
        *buf = _buf;
        *size = src.size;
        mm_free(src.buf);
        return 0;
    } else {
        return 1;
    }
}
int shm_connect(void* socket, int port) {
    shm_socket_t * _socket = (shm_socket_t *) socket;
    hashtable_t *hashtable = mm_get_hashtable();
    if(hashtable_get(hashtable, port)== NULL) {
        err_exit(0, "shm_connect:connect at port %d  failed!", port);
    }
    if(_socket -> port == -1) {
        _socket -> port = hashtable_alloc_key(hashtable);
    } else {
        if(hashtable_get(hashtable, _socket->port)!= NULL) {
            err_exit(0, "key %d has already been in used!", _socket->port);
        }
    }
    _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
    _socket->remoteQueueMap->insert({port,  new SHMQueue<shm_msg_t>(port, 0)});
    return 0;
}
int shm_send(void *socket, void *buf, int size) {
    shm_socket_t * _socket = (shm_socket_t *) socket;
    hashtable_t *hashtable = mm_get_hashtable();
    shm_msg_t dest;
    dest.port = _socket->port;
    dest.size = size;
    dest.buf = mm_malloc(size);
    memcpy(dest.buf, buf, size);
    std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
    for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
        if(hashtable_get(hashtable, iter->first)== NULL) {
            err_msg(0, "shm_send:connect at port %d  failed, the other part has been closed!", iter->first);
            delete iter->second;
            remoteQueueMap->erase(iter);
            continue;
        }
        if(_socket->mod == REQ_REP && _socket->is_server == 1)
              SemUtil::dec(_socket->items);
        iter->second->push(dest);
        if( _socket->mod == REQ_REP && _socket->is_server == 1) {
            delete iter->second;
            remoteQueueMap->erase(iter);
            SemUtil::inc(_socket->slots);
        }
    }
    return 0;
}
int shm_recv(void* socket, void **buf, int *size) {
    shm_socket_t * _socket = (shm_socket_t *) socket;
    shm_msg_t src;
    std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
    bool rv = _socket->queue->pop(src);
    if (rv) {
        if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
         if(_socket->mod == REQ_REP)
              SemUtil::dec(_socket->slots);
          remoteQueueMap->insert({src.port,  new SHMQueue<shm_msg_t>(src.port, 0)});
          if(_socket->mod == REQ_REP)
              SemUtil::inc(_socket->items);
        }
        void * _buf = malloc(src.size);
        memcpy(_buf, src.buf, src.size);
        *buf = _buf;
        *size = src.size;
        mm_free(src.buf);
        return 0;
    } else {
        return 1;
    }
}
test2/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    test_queue_wrapper server client pub sub
PROGS =    req_rep
build: $(PROGS)
test2/pub_sub.c
New file
@@ -0,0 +1,60 @@
#include "socket.h"
void server(int port) {
    void *socket = shm_open_socket(PUB_SUB);
    shm_bind(socket, port);
    shm_listen(socket);
    int size;
    void *recvbuf;
    char sendbuf[512];
    while(true) {
        shm_recv(socket, &recvbuf, &size);
        sprintf(sendbuf, "pub: %s", recvbuf);
        puts(sendbuf);
        shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
        shm_free(recvbuf);
    }
    shm_close_socket(socket);
}
void client(int port) {
    void *socket = shm_open_socket(PUB_SUB);
    shm_connect(socket, port);
    int size;
    void *recvbuf;
    char sendbuf[512];
    sprintf(sendbuf, "sub");
    shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
    while(true) {
        shm_recv(socket, &recvbuf, &size);
        printf("received sub message: %s\n", (char *)recvbuf);
        shm_free(recvbuf);
    }
    shm_close_socket(socket);
}
int main(int argc, char *argv[]) {
  shm_init(512);
  int port;
  if (argc < 3) {
       fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
       return 1;
  }
  port = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0 ) {
     server(port);
  }
  if (strcmp("client", argv[1]) == 0)
     client(port);
 shm_destroy();
 // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
  return 0;
}
test2/req_rep
Binary files differ
test2/req_rep.c
New file
@@ -0,0 +1,59 @@
#include "socket.h"
void server(int port) {
    void *socket = shm_open_socket(REQ_REP);
    shm_bind(socket, port);
    shm_listen(socket);
    int size;
    void *recvbuf;
    char sendbuf[512];
    while(true) {
        shm_recv(socket, &recvbuf, &size);
        sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
        puts(sendbuf);
        shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
        shm_free(recvbuf);
    }
    shm_close_socket(socket);
}
void client(int port) {
    void *socket = shm_open_socket(REQ_REP);
    shm_connect(socket, port);
    int size;
    void *recvbuf;
    char sendbuf[512];
    while(true) {
        printf("request: ");
        scanf("%s", sendbuf);
        shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
        shm_recv(socket, &recvbuf, &size);
        printf("reply: %s\n", (char *)recvbuf);
        shm_free(recvbuf);
    }
    shm_close_socket(socket);
}
int main(int argc, char *argv[]) {
  shm_init(512);
  int port;
  if (argc < 3) {
       fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
       return 1;
  }
  port = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0 ) {
     server(port);
  }
  if (strcmp("client", argv[1]) == 0)
     client(port);
 shm_destroy();
 // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
  return 0;
}