wangzhengquan
2020-06-16 27a32410481fc10e789315b3a1dab88a33020270
finished bipc
1个文件已删除
12个文件已添加
12个文件已修改
446 ■■■■■ 已修改文件
bipc/Makefile 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/bipc.c 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/bipc.h 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/core 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_bus 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_bus.c 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_bus.sh 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pair 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pair.c 93 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pair.sh 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pubsub 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pullpush 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pullpush.c 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_pullpush.sh 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_reqrep 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_reqrep.c 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_reqrep.sh 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_survey 补丁 | 查看 | 原始文档 | blame | 历史
bipc/test_survey.c 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
device/libnetdisk.a 补丁 | 查看 | 原始文档 | blame | 历史
service/netdisk_service 补丁 | 查看 | 原始文档 | blame | 历史
service/test 补丁 | 查看 | 原始文档 | blame | 历史
service/test_client 补丁 | 查看 | 原始文档 | blame | 历史
service/test_properties 补丁 | 查看 | 原始文档 | blame | 历史
service/test_queue 补丁 | 查看 | 原始文档 | blame | 历史
bipc/Makefile
@@ -16,7 +16,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
 
PROGS = test_survey test_survey2 test_pubsub
PROGS = test_survey test_survey2 test_pubsub test_pullpush test_pair test_bus test_reqrep
build: $(PROGS)
@@ -26,6 +26,14 @@
 
test_pubsub:  test_pubsub.c bipc.c
test_pullpush:  test_pullpush.c bipc.c
test_pair: test_pair.c bipc.c
test_bus:  test_bus.c bipc.c
test_reqrep: test_reqrep.c bipc.c
clean:
    rm -f $(TEMPFILES) $(PROGS)
bipc/bipc.c
@@ -106,7 +106,7 @@
}
int bipc_recv(nng_socket *sock,  void *data, size_t *sizep) {
int bipc_recv(bipc_socket_t *sock,  void *data, size_t *sizep) {
    
    //int rv = nng_recv(*sock, data, sizep, 0);
    // char *buf = NULL;
@@ -120,12 +120,24 @@
    return rv;
}
int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz) {
    const char *tmp_opt;
    if(strcmp(opt, BIPC_OPT_RECVTIMEO) == 0) {
        tmp_opt = NNG_OPT_RECVTIMEO;
    }
    return nng_setopt(*s, tmp_opt, val, valsz);
}
void bipc_free(void *ptr, size_t size) {
     nng_free(ptr, size);
}
int bipc_close(bipc_socket_t *s){
    return nng_close(*s);
}
const char * bipc_strerror(int error) {
    return nng_strerror(error);
bipc/bipc.h
@@ -23,6 +23,27 @@
};
#define BIPC_OPT_SOCKNAME      "socket-name"
#define BIPC_OPT_RAW           "raw"
#define BIPC_OPT_PROTO         "protocol"
#define BIPC_OPT_PROTONAME     "protocol-name"
#define BIPC_OPT_PEER          "peer"
#define BIPC_OPT_PEERNAME      "peer-name"
#define BIPC_OPT_RECVBUF       "recv-buffer"
#define BIPC_OPT_SENDBUF       "send-buffer"
#define BIPC_OPT_RECVFD        "recv-fd"
#define BIPC_OPT_SENDFD        "send-fd"
#define BIPC_OPT_RECVTIMEO     "recv-timeout"
#define BIPC_OPT_SENDTIMEO     "send-timeout"
#define BIPC_OPT_LOCADDR       "local-address"
#define BIPC_OPT_REMADDR       "remote-address"
#define BIPC_OPT_URL           "url"
#define BIPC_OPT_MAXTTL        "ttl-max"
#define BIPC_OPT_RECVMAXSZ     "recv-size-max"
#define BIPC_OPT_RECONNMINT    "reconnect-time-min"
#define BIPC_OPT_RECONNMAXT    "reconnect-time-max"
typedef nng_socket bipc_socket_t;
int bipc_listen(bipc_socket_t *sock, const char *url, bipc_mod_t mod);
@@ -31,8 +52,12 @@
int bipc_send(bipc_socket_t *sock,  const void *data, size_t size);
int bipc_recv(nng_socket *sock,  void *data, size_t *sizep);
int bipc_recv(bipc_socket_t *sock,  void *data, size_t *sizep);
void bipc_free(void *ptr, size_t size) ;
int bipc_setopt(bipc_socket_t *s, const char *opt, const void *val, size_t valsz);
void bipc_free(void *ptr, size_t size);
int bipc_close(bipc_socket_t *s);
const char * bipc_strerror(int error);
bipc/core
Binary files differ
bipc/test_bus
Binary files differ
bipc/test_bus.c
New file
@@ -0,0 +1,74 @@
#include "bipc.h"
void
fatal(const char *func, int rv)
{
  fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
  exit(1);
}
int
node(int argc, char **argv)
{
  bipc_socket_t sock;
  int rv;
  size_t sz;
  if ((rv = bipc_listen(&sock, argv[2], BUS)) != 0)
  {
    fatal("nng_listen", rv);
  }
  sleep(1); // wait for peers to bind
  if (argc >= 3)
  {
    for (int x = 3; x < argc; x++)
    {
      if ((rv = bipc_connect(&sock, argv[x], BUS)) != 0)
      {
        fatal("nng_dial", rv);
      }
    }
  }
  sleep(1); // wait for connects to establish
  // SEND
  sz = strlen(argv[1]) + 1; // '\0' too
  printf("%s: SENDING '%s' ONTO BUS\n", argv[1], argv[1]);
  if ((rv = bipc_send(&sock, argv[1], sz)) != 0)
  {
    fatal("nng_send", rv);
  }
  // RECV
  for (;;)
  {
    char *buf = NULL;
    size_t sz;
    if ((rv = bipc_recv(&sock, &buf, &sz)) != 0)
    {
      if (rv == BIPC_ETIMEDOUT)
      {
        fatal("nng_recv", rv);
      }
    }
    printf("%s: RECEIVED '%s' FROM BUS\n", argv[1], buf);
    bipc_free(buf, sz);
  }
  bipc_close(&sock);
  return (0);
}
int
main(int argc, char **argv)
{
  if (argc >= 3)
  {
    return (node(argc, argv));
  }
  fprintf(stderr, "Usage: bus <NODE_NAME> <URL> <URL> ...\n");
  return 1;
}
bipc/test_bus.sh
New file
@@ -0,0 +1,6 @@
./test_bus node0 ipc:///tmp/node0.ipc ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc & node0=$!
./test_bus node1 ipc:///tmp/node1.ipc ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node1=$!
./test_bus node2 ipc:///tmp/node2.ipc ipc:///tmp/node3.ipc & node2=$!
./test_bus node3 ipc:///tmp/node3.ipc ipc:///tmp/node0.ipc & node3=$!
sleep 5
kill $node0 $node1 $node2 $node3
bipc/test_pair
Binary files differ
bipc/test_pair.c
New file
@@ -0,0 +1,93 @@
#include "bipc.h"
#define NODE0 "node0"
#define NODE1 "node1"
void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
        exit(1);
}
int
send_name(bipc_socket_t sock, const char *name)
{
        int rv;
        printf("%s: SENDING \"%s\"\n", name, name);
        if ((rv = bipc_send(&sock, name, strlen(name) + 1)) != 0) {
                fatal("bipc_send", rv);
        }
        return (rv);
}
int
recv_name(bipc_socket_t sock, const char *name)
{
        char *buf = NULL;
        int rv;
        size_t sz;
        if ((rv = bipc_recv(&sock, &buf, &sz)) == 0) {
                printf("%s: RECEIVED \"%s\"\n", name, buf);
                nng_free(buf, sz);
        }
        return (rv);
}
int
send_recv(bipc_socket_t sock, const char *name)
{
    int rv;
    int timeout = 100;
    // if ((rv = nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 100)) != 0) {
    if ((rv = bipc_setopt(&sock, BIPC_OPT_RECVTIMEO, &timeout, sizeof(int))) != 0) {
            fatal("nng_setopt_ms", rv);
    }
    for (;;) {
            recv_name(sock, name);
            sleep(1);
            send_name(sock, name);
    }
}
int
node0(const char *url)
{
    bipc_socket_t sock;
    int rv;
    // if ((rv = nng_pair0_open(&sock)) != 0) {
    //         fatal("nng_pair0_open", rv);
    // }
     if ((rv = bipc_listen(&sock, url, PAIR)) !=0) {
            fatal("nng_listen", rv);
    }
    return (send_recv(sock, NODE0));
}
int
node1(const char *url)
{
    bipc_socket_t sock;
    int rv;
    sleep(1);
    // if ((rv = nng_pair0_open(&sock)) != 0) {
    //         fatal("nng_pair0_open", rv);
    // }
    if ((rv = bipc_connect(&sock, url, PAIR)) != 0) {
            fatal("nng_dial", rv);
    }
    return (send_recv(sock, NODE1));
}
int
main(int argc, char **argv)
{
    if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
            return (node0(argv[2]));
    if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
            return (node1(argv[2]));
    fprintf(stderr, "Usage: pair %s|%s <URL> <ARG> ...\n", NODE0, NODE1);
    return 1;
}
bipc/test_pair.sh
New file
@@ -0,0 +1,4 @@
./test_pair node0 ipc:///tmp/test_pair.ipc & node0=$!
./test_pair node1 ipc:///tmp/test_pair.ipc & node1=$!
sleep 4
kill $node0 $node1
bipc/test_pubsub
Binary files differ
bipc/test_pullpush
Binary files differ
bipc/test_pullpush.c
New file
@@ -0,0 +1,71 @@
#include "bipc.h"
#define NODE0 "node0"
#define NODE1 "node1"
void
fatal(const char *func, int rv)
{
        fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
        exit(1);
}
int
node0(const char *url)
{
        nng_socket sock;
        int rv;
        // if ((rv = nng_pull0_open(&sock)) != 0) {
        //         fatal("nng_pull0_open", rv);
        // }
        if ((rv = bipc_listen(&sock, url, PULL_PUSH)) != 0) {
                fatal("nng_listen", rv);
        }
        for (;;) {
                char *buf = NULL;
                size_t sz;
                if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
                        fatal("nng_recv", rv);
                }
                printf("NODE0: RECEIVED \"%s\"\n", buf);
                bipc_free(buf, sz);
        }
}
int
node1(const char *url, char *msg)
{
        int sz_msg = strlen(msg) + 1; // '\0' too
        nng_socket sock;
        int rv;
        int bytes;
        // if ((rv = nng_push0_open(&sock)) != 0) {
        //         fatal("nng_push0_open", rv);
        // }
        if ((rv = bipc_connect(&sock, url, PULL_PUSH)) != 0) {
                fatal("nng_dial", rv);
        }
        printf("NODE1: SENDING \"%s\"\n", msg);
        if ((rv = bipc_send(&sock, msg, strlen(msg)+1)) != 0) {
                fatal("nng_send", rv);
        }
        sleep(1); // wait for messages to flush before shutting down
        bipc_close(&sock);
        return (0);
}
int
main(int argc, char **argv)
{
        if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
                return (node0(argv[2]));
        if ((argc > 2) && (strcmp(NODE1, argv[1]) == 0))
                return (node1(argv[2], argv[3]));
        fprintf(stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n",
                NODE0, NODE1);
        return (1);
}
bipc/test_pullpush.sh
New file
@@ -0,0 +1,4 @@
./test_pullpush node0 ipc:///tmp/test_pullpush.ipc & node0=$! && sleep 1
./test_pullpush node1 ipc:///tmp/test_pullpush.ipc "Hello, World!"
./test_pullpush node1 ipc:///tmp/test_pullpush.ipc "Goodbye."
kill $node0
bipc/test_reqrep
Binary files differ
bipc/test_reqrep.c
New file
@@ -0,0 +1,92 @@
#include "bipc.h"
#define NODE0 "node0"
#define NODE1 "node1"
#define DATE "DATE"
void
fatal(const char *func, int rv)
{
    fprintf(stderr, "%s: %s\n", func, bipc_strerror(rv));
    exit(1);
}
char *
date(void)
{
    time_t now = time(&now);
    struct tm *info = localtime(&now);
    char *text = asctime(info);
    text[strlen(text)-1] = '\0'; // remove '\n'
    return (text);
}
int
node0(const char *url)
{
    bipc_socket_t sock;
    int rv;
    // if ((rv = nng_rep0_open(&sock)) != 0) {
    //         fatal("nng_rep0_open", rv);
    // }
    if ((rv = bipc_listen(&sock, url, REQ_REP)) != 0) {
        fatal("nng_listen", rv);
    }
    for (;;) {
        char *buf = NULL;
        size_t sz;
        if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
                fatal("nng_recv", rv);
        }
        if ((sz == (strlen(DATE) + 1)) && (strcmp(DATE, buf) == 0)) {
            printf("NODE0: RECEIVED DATE REQUEST\n");
            char *d = date();
            printf("NODE0: SENDING DATE %s\n", d);
            if ((rv = bipc_send(&sock, d, strlen(d) + 1)) != 0) {
                    fatal("nng_send", rv);
            }
        }
        bipc_free(buf, sz);
    }
}
int
node1(const char *url)
{
    bipc_socket_t sock;
    int rv;
    size_t sz;
    char *buf = NULL;
    // if ((rv = nng_req0_open(&sock)) != 0) {
    //         fatal("bipc_socket_t", rv);
    // }
    if ((rv = bipc_connect(&sock, url, REQ_REP)) != 0) {
            fatal("nng_dial", rv);
    }
    printf("NODE1: SENDING DATE REQUEST %s\n", DATE);
    if ((rv = bipc_send(&sock, DATE, strlen(DATE)+1)) != 0) {
            fatal("nng_send", rv);
    }
    if ((rv = bipc_recv(&sock, &buf, &sz)) != 0) {
            fatal("nng_recv", rv);
    }
    printf("NODE1: RECEIVED DATE %s\n", buf);
    bipc_free(buf, sz);
    bipc_close(&sock);
    return (0);
}
int
main(const int argc, const char **argv)
{
    if ((argc > 1) && (strcmp(NODE0, argv[1]) == 0))
        return (node0(argv[2]));
    if ((argc > 1) && (strcmp(NODE1, argv[1]) == 0))
        return (node1(argv[2]));
    fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", NODE0, NODE1);
    return (1);
}
bipc/test_reqrep.sh
New file
@@ -0,0 +1,3 @@
./test_reqrep node0 ipc:///tmp/test_reqrep.ipc & node0=$! && sleep 1
./test_reqrep node1 ipc:///tmp/test_reqrep.ipc
kill $node0
bipc/test_survey
Binary files differ
bipc/test_survey.c
@@ -26,35 +26,35 @@
int
server(const char *url)
{
        bipc_socket_t sock;
        int rv;
    bipc_socket_t sock;
    int rv;
        if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) {
            fatal("nng_listen", rv);
    if ((rv = bipc_listen(&sock, url, SURVEY)) != 0) {
        fatal("nng_listen", rv);
    }
    for (;;) {
        printf("SERVER: SENDING DATE SURVEY REQUEST\n");
        if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) {
            fatal("nng_send", rv);
        }
        for (;;) {
            printf("SERVER: SENDING DATE SURVEY REQUEST\n");
            if ((rv = bipc_send(&sock, DATE, strlen(DATE) + 1)) != 0) {
                fatal("nng_send", rv);
            char *buf = NULL;
            size_t sz;
            rv = bipc_recv(&sock, &buf, &sz);
            if (rv == BIPC_ETIMEDOUT) {
                break;
            }
            for (;;) {
                char *buf = NULL;
                size_t sz;
                rv = bipc_recv(&sock, &buf, &sz);
                if (rv == BIPC_ETIMEDOUT) {
                    break;
                }
                if (rv != 0) {
                    fatal("nng_recv", rv);
                }
                printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
                bipc_free(buf, sz);
            if (rv != 0) {
                fatal("nng_recv", rv);
            }
            printf("SERVER: SURVEY COMPLETE\n");
            printf("SERVER: RECEIVED \"%s\" SURVEY RESPONSE\n", buf);
            bipc_free(buf, sz);
        }
        printf("SERVER: SURVEY COMPLETE\n");
    }
}
int
device/libnetdisk.a
Binary files differ
service/netdisk_service
Binary files differ
service/test
Binary files differ
service/test_client
Binary files differ
service/test_properties
Binary files differ
service/test_queue
Binary files differ