wangzhengquan
2020-09-10 591aacee97f4a6486631c38a6b418e20b2c4109c
test/nng/pubsub.c
@@ -4,16 +4,17 @@
#include <time.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#define SERVER "server"
#define CLIENT "client"
void
fatal(const char *func)
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
        fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
}
char *
@@ -29,21 +30,22 @@
int
server(const char *url)
{
        int sock;
        nng_socket sock;
        int rv;
        char buf[1024];
        if ((sock = nn_socket(AF_SP, NN_PUB)) < 0) {
                fatal("nn_socket");
        if ((rv = nng_pub0_open(&sock)) != 0) {
                fatal("nng_pub0_open", rv);
        }
          if (nn_bind(sock, url) < 0) {
                fatal("nn_bind");
        if ((rv = nng_listen(sock, url, NULL, 0)) < 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *d = date();
                int sz_d = strlen(d) + 1; // '\0' too
                snprintf(buf, 1024, "time:%s", d);
                printf("SERVER: PUBLISHING DATE %s\n", d);
                int bytes = nn_send(sock, d, sz_d, 0);
                if (bytes < 0) {
                        fatal("nn_send");
                if ((rv = nng_send(sock, buf, strlen(buf) + 1, 0)) != 0) {
                        fatal("nng_send", rv);
                }
                sleep(1);
        }
@@ -52,27 +54,28 @@
int
client(const char *url, const char *name)
{
        int sock;
        nng_socket sock;
        int rv;
        if ((sock = nn_socket(AF_SP, NN_SUB)) < 0) {
                fatal("nn_socket");
        if ((rv = nng_sub0_open(&sock)) != 0) {
                fatal("nng_sub0_open", rv);
        }
        // subscribe to everything ("" means all topics)
        if (nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
                fatal("nn_setsockopt");
        // subscribe to everything (empty means all topics)
        if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "nnnnnnnnn", 0)) != 0) {
                fatal("nng_setopt", rv);
        }
        if (nn_connect(sock, url) < 0) {
                fatal("nn_connet");
        if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
                fatal("nng_dial", rv);
        }
        for (;;) {
                char *buf = NULL;
                int bytes = nn_recv(sock, &buf, NN_MSG, 0);
                if (bytes < 0) {
                        fatal("nn_recv");
                size_t sz;
                if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
                        fatal("nng_recv", rv);
                }
                printf("CLIENT (%s): RECEIVED %s\n", name, buf); 
                nn_freemsg(buf);
                nng_free(buf, sz);
        }
}