#include "bipc.h"
|
#include <nng/protocol/pipeline0/pull.h>
|
#include <nng/protocol/pipeline0/push.h>
|
#include <nng/protocol/survey0/survey.h>
|
#include <nng/protocol/survey0/respond.h>
|
#include <nng/protocol/pubsub0/pub.h>
|
#include <nng/protocol/pubsub0/sub.h>
|
#include <nng/protocol/pair0/pair.h>
|
#include <nng/protocol/bus0/bus.h>
|
#include <nng/protocol/reqrep0/rep.h>
|
#include <nng/protocol/reqrep0/req.h>
|
|
void
|
static fatal(const char *func, int rv)
|
{
|
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
|
exit(1);
|
}
|
|
// Open the listening-side socket for the requested mode and start
// listening on url.
//
// Socket opened per mode:
//   PULL_PUSH -> pull0      PAIR   -> pair0       BUS     -> bus0
//   REQ_REP   -> rep0       SURVEY -> surveyor0   PUB_SUB -> pub0
//
// sock: receives the opened socket.
// url:  address handed to nng_listen().
// mod:  messaging pattern selector.
//
// Returns 0 on success, -1 if mod is not one of the modes above (a message
// is written to stderr), otherwise the nonzero code returned by the failing
// open or nng_listen() call. When nng_listen() fails the socket opened here
// is closed again before returning.
int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod) {
    int rv;

    switch (mod) {
    case PULL_PUSH:
        rv = nng_pull0_open(sock);
        break;
    case PAIR:
        rv = nng_pair0_open(sock);
        break;
    case BUS:
        rv = nng_bus0_open(sock);
        break;
    case REQ_REP:
        rv = nng_rep0_open(sock);
        break;
    case SURVEY:
        rv = nng_surveyor0_open(sock);
        break;
    case PUB_SUB:
        rv = nng_pub0_open(sock);
        break;
    default:
        fprintf(stderr, "无法识别的模式");
        return -1;
    }

    if (rv != 0) {
        return rv;
    }

    rv = nng_listen(*sock, url, NULL, 0);
    if (rv != 0) {
        // The open above succeeded, so release the socket here; otherwise
        // every failed (or retried) listen would leak one.
        (void) nng_close(*sock);
        return rv;
    }

    return 0;
}
|
|
// Open the connecting-side socket for the requested mode and dial url.
//
// Socket opened per mode:
//   PULL_PUSH -> push0      PAIR   -> pair0        BUS     -> bus0
//   REQ_REP   -> req0       SURVEY -> respondent0  PUB_SUB -> sub0
// For PUB_SUB the socket is additionally subscribed with an empty,
// zero-length topic via NNG_OPT_SUB_SUBSCRIBE.
//
// sock: receives the opened socket.
// url:  address handed to nng_dial().
// mod:  messaging pattern selector.
//
// Returns 0 on success, -1 if mod is not one of the modes above (a message
// is written to stderr), otherwise the nonzero code returned by the failing
// open, nng_setopt() or nng_dial() call. On any failure after a successful
// open, the socket is closed again before returning.
int bipc_connect(bipc_socket_t *sock, const char *url, bipc_mod_t mod) {
    int rv;

    switch (mod) {
    case PULL_PUSH:
        rv = nng_push0_open(sock);
        break;
    case PAIR:
        rv = nng_pair0_open(sock);
        break;
    case BUS:
        // A bus socket must be opened before it can be dialed.
        rv = nng_bus0_open(sock);
        break;
    case REQ_REP:
        rv = nng_req0_open(sock);
        break;
    case SURVEY:
        rv = nng_respondent0_open(sock);
        break;
    case PUB_SUB:
        rv = nng_sub0_open(sock);
        if (rv == 0) {
            rv = nng_setopt(*sock, NNG_OPT_SUB_SUBSCRIBE, "", 0);
            if (rv != 0) {
                // Subscribe failed on an already-open socket: release it.
                (void) nng_close(*sock);
            }
        }
        break;
    default:
        fprintf(stderr, "无法识别的模式");
        // Report the unrecognized mode as a failure instead of returning 0.
        return -1;
    }

    // Hand open/subscribe failures back to the caller rather than
    // terminating the process.
    if (rv != 0) {
        return rv;
    }

    rv = nng_dial(*sock, url, NULL, 0);
    if (rv != 0) {
        // Release the socket so a failed (or retried) dial does not leak it.
        (void) nng_close(*sock);
        return rv;
    }

    return 0;
}
|
|
// Send size bytes starting at data on *sock using nng_send() with flags 0.
//
// Returns whatever nng_send() returns.
int bipc_send(bipc_socket_t *sock, const void *data, size_t size)
{
    // The cast only drops const so the pointer can be passed to nng_send().
    void *payload = const_cast<void *>(data);
    const int flags = 0;

    return nng_send(*sock, payload, size, flags);
}
|
|
|
// Receive one message from *sock via nng_recv() with NNG_FLAG_ALLOC.
//
// data and sizep are forwarded to nng_recv() unchanged.
// NOTE(review): per the NNG documentation for NNG_FLAG_ALLOC, data is
// expected to be the address of a pointer that receives a library-allocated
// buffer, with its length stored in *sizep - confirm that callers pass a
// pointer-to-pointer and release the buffer with bipc_free().
//
// Returns BIPC_ETIMEDOUT when nng_recv() reports NNG_ETIMEDOUT; any other
// result of nng_recv() is returned as is.
int bipc_recv(bipc_socket_t *sock, void *data, size_t *sizep)
{
    const int rv = nng_recv(*sock, data, sizep, NNG_FLAG_ALLOC);

    return (rv == NNG_ETIMEDOUT) ? BIPC_ETIMEDOUT : rv;
}
|
|
// Set an option on *s, translating the bipc option name to its NNG name.
//
// s:     socket to configure.
// opt:   bipc option name; only BIPC_OPT_RECVTIMEO is recognized, and it is
//        mapped to NNG_OPT_RECVTIMEO.
// val:   option value, forwarded to nng_setopt().
// valsz: size of the value in bytes, forwarded to nng_setopt().
//
// Returns the result of nng_setopt() for a recognized option, NNG_EINVAL
// when opt is NULL, and NNG_ENOTSUP for any other option name.
int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz) {
    if (opt == NULL) {
        return NNG_EINVAL;
    }

    if (strcmp(opt, BIPC_OPT_RECVTIMEO) == 0) {
        return nng_setopt(*s, NNG_OPT_RECVTIMEO, val, valsz);
    }

    // No mapping exists for this name; fail explicitly rather than calling
    // nng_setopt() with an indeterminate option pointer.
    return NNG_ENOTSUP;
}
|
|
// Release a buffer by passing ptr and size straight through to nng_free().
void bipc_free(void *ptr, size_t size)
{
    nng_free(ptr, size);
}
|
|
|
// Close the socket *s with nng_close() and return that call's result.
int bipc_close(bipc_socket_t *s)
{
    const int rv = nng_close(*s);

    return rv;
}
|
|
|
|
const char * bipc_strerror(int error) {
|
return nng_strerror(error);
|
}
|