#include "bipc.h" #include #include #include #include #include #include #include #include #include #include static void fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); exit(1); } int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod) { int rv; switch(mod) { case PULL_PUSH: rv = nng_pull0_open(sock); break; case PAIR: rv = nng_pair0_open(sock); break; case BUS: rv = nng_bus0_open(sock); break; case REQ_REP: rv = nng_rep0_open(sock); break; case SURVEY: rv = nng_surveyor0_open(sock); break; case PUB_SUB: rv = nng_pub0_open(sock); break; default: fprintf(stderr, "无法识别的模式"); return -1; } if (rv != 0) { // fatal("open", rv); return rv; } if ((rv = nng_listen(*sock, url, NULL, 0)) != 0) { // fatal("nng_listen", rv); return rv; } return 0; } int bipc_connect(bipc_socket_t *sock, const char *url, bipc_mod_t mod) { int rv = 0 ; switch(mod) { case PULL_PUSH: rv = nng_push0_open(sock); break; case PAIR: rv = nng_pair0_open(sock); break; case BUS: break; case REQ_REP: rv = nng_req0_open(sock); break; case SURVEY: rv = nng_respondent0_open(sock); break; case PUB_SUB: if ((rv = nng_sub0_open(sock)) != 0) { //fatal("sub0_open", rv); return rv; } if ((rv = nng_setopt(*sock, NNG_OPT_SUB_SUBSCRIBE, "", 0)) != 0) { //fatal("nng_setopt", rv); return rv; } break; default: fprintf(stderr, "无法识别的模式"); return rv; } if (rv != 0) { fatal("bipc_connect open socket", rv); return rv; } if ((rv = nng_dial(*sock, url, NULL, 0)) != 0) { // fatal("dial", rv); return rv; } return 0; } int bipc_send(bipc_socket_t *sock, const void *data, size_t size) { return nng_send(*sock, const_cast(data), size, 0); } int bipc_recv(bipc_socket_t *sock, void *data, size_t *sizep) { //int rv = nng_recv(*sock, data, sizep, 0); // char *buf = NULL; // int rv = nng_recv(*sock, &buf, sizep, NNG_FLAG_ALLOC); // memcpy(data, buf, *sizep); // nng_free(buf, *sizep); int rv = nng_recv(*sock, data, sizep, NNG_FLAG_ALLOC); if (rv == NNG_ETIMEDOUT) return BIPC_ETIMEDOUT ; return rv; } int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz) { const char *tmp_opt; if(strcmp(opt, BIPC_OPT_RECVTIMEO) == 0) { tmp_opt = NNG_OPT_RECVTIMEO; } return nng_setopt(*s, tmp_opt, val, valsz); } void bipc_free(void *ptr, size_t size) { nng_free(ptr, size); } int bipc_close(bipc_socket_t *s){ return nng_close(*s); } const char * bipc_strerror(int error) { return nng_strerror(error); }