// // Copyright 2020 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2019 Nathan Kent // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // #include #include #include "core/nng_impl.h" #include "nng/protocol/pubsub0/sub.h" // Subscriber protocol. The SUB protocol receives messages sent to // it from publishers, and filters out those it is not interested in, // only passing up ones that match known subscriptions. #ifndef NNI_PROTO_SUB_V0 #define NNI_PROTO_SUB_V0 NNI_PROTO(2, 1) #endif #ifndef NNI_PROTO_PUB_V0 #define NNI_PROTO_PUB_V0 NNI_PROTO(2, 0) #endif // By default we accept 128 messages. #define SUB0_DEFAULT_RECV_BUF_LEN 128 // By default, prefer new messages when the queue is full. #define SUB0_DEFAULT_PREFER_NEW true typedef struct sub0_pipe sub0_pipe; typedef struct sub0_sock sub0_sock; typedef struct sub0_ctx sub0_ctx; typedef struct sub0_topic sub0_topic; static void sub0_recv_cb(void *); static void sub0_pipe_fini(void *); struct sub0_topic { nni_list_node node; size_t len; void * buf; }; // sub0_ctx is a context for a SUB socket. The advantage of contexts is // that different contexts can maintain different subscriptions. struct sub0_ctx { nni_list_node node; sub0_sock * sock; nni_list topics; // TODO: Consider patricia trie nni_list recv_queue; // can have multiple pending receives nni_lmq lmq; bool prefer_new; }; // sub0_sock is our per-socket protocol private structure. struct sub0_sock { nni_pollable readable; sub0_ctx master; // default context nni_list contexts; // all contexts int num_contexts; size_t recv_buf_len; bool prefer_new; nni_mtx lk; }; // sub0_pipe is our per-pipe protocol private structure. struct sub0_pipe { nni_pipe * pipe; sub0_sock *sub; nni_aio aio_recv; }; static void sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; nni_mtx_lock(&sock->lk); if (nni_list_active(&ctx->recv_queue, aio)) { nni_list_remove(&ctx->recv_queue, aio); nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&sock->lk); } static void sub0_ctx_recv(void *arg, nni_aio *aio) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; nni_msg * msg; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&sock->lk); again: if (nni_lmq_empty(&ctx->lmq)) { int rv; if ((rv = nni_aio_schedule(aio, sub0_ctx_cancel, ctx)) != 0) { nni_mtx_unlock(&sock->lk); nni_aio_finish_error(aio, rv); return; } nni_list_append(&ctx->recv_queue, aio); nni_mtx_unlock(&sock->lk); return; } (void) nni_lmq_getq(&ctx->lmq, &msg); if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { nni_pollable_clear(&sock->readable); } if ((msg = nni_msg_unique(msg)) == NULL) { goto again; } nni_aio_set_msg(aio, msg); nni_mtx_unlock(&sock->lk); nni_aio_finish(aio, 0, nni_msg_len(msg)); } static void sub0_ctx_send(void *arg, nni_aio *aio) { NNI_ARG_UNUSED(arg); if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } } static void sub0_ctx_close(void *arg) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; nni_aio * aio; nni_mtx_lock(&sock->lk); while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { nni_list_remove(&ctx->recv_queue, aio); nni_aio_finish_error(aio, NNG_ECLOSED); } nni_mtx_unlock(&sock->lk); } static void sub0_ctx_fini(void *arg) { sub0_ctx * ctx = arg; sub0_sock * sock = ctx->sock; sub0_topic *topic; sub0_ctx_close(ctx); nni_mtx_lock(&sock->lk); nni_list_remove(&sock->contexts, ctx); sock->num_contexts--; nni_mtx_unlock(&sock->lk); while ((topic = nni_list_first(&ctx->topics)) != 0) { nni_list_remove(&ctx->topics, topic); nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); } nni_lmq_fini(&ctx->lmq); } static int sub0_ctx_init(void *ctx_arg, void *sock_arg) { sub0_sock *sock = sock_arg; sub0_ctx * ctx = ctx_arg; size_t len; bool prefer_new; int rv; nni_mtx_lock(&sock->lk); len = sock->recv_buf_len; prefer_new = sock->prefer_new; if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { return (rv); } ctx->prefer_new = prefer_new; nni_aio_list_init(&ctx->recv_queue); NNI_LIST_INIT(&ctx->topics, sub0_topic, node); ctx->sock = sock; nni_list_append(&sock->contexts, ctx); sock->num_contexts++; nni_mtx_unlock(&sock->lk); return (0); } static void sub0_sock_fini(void *arg) { sub0_sock *sock = arg; sub0_ctx_fini(&sock->master); nni_pollable_fini(&sock->readable); nni_mtx_fini(&sock->lk); } static int sub0_sock_init(void *arg, nni_sock *unused) { sub0_sock *sock = arg; int rv; NNI_ARG_UNUSED(unused); NNI_LIST_INIT(&sock->contexts, sub0_ctx, node); nni_mtx_init(&sock->lk); sock->recv_buf_len = SUB0_DEFAULT_RECV_BUF_LEN; sock->prefer_new = SUB0_DEFAULT_PREFER_NEW; nni_pollable_init(&sock->readable); if ((rv = sub0_ctx_init(&sock->master, sock)) != 0) { sub0_sock_fini(sock); return (rv); } return (0); } static void sub0_sock_open(void *arg) { NNI_ARG_UNUSED(arg); } static void sub0_sock_close(void *arg) { sub0_sock *sock = arg; sub0_ctx_close(&sock->master); } static void sub0_pipe_stop(void *arg) { sub0_pipe *p = arg; nni_aio_stop(&p->aio_recv); } static void sub0_pipe_fini(void *arg) { sub0_pipe *p = arg; nni_aio_fini(&p->aio_recv); } static int sub0_pipe_init(void *arg, nni_pipe *pipe, void *s) { sub0_pipe *p = arg; nni_aio_init(&p->aio_recv, sub0_recv_cb, p); p->pipe = pipe; p->sub = s; return (0); } static int sub0_pipe_start(void *arg) { sub0_pipe *p = arg; if (nni_pipe_peer(p->pipe) != NNI_PROTO_PUB_V0) { // Peer protocol mismatch. return (NNG_EPROTO); } nni_pipe_recv(p->pipe, &p->aio_recv); return (0); } static void sub0_pipe_close(void *arg) { sub0_pipe *p = arg; nni_aio_close(&p->aio_recv); } static bool sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len) { sub0_topic *topic; // This is a naive and trivial matcher. Replace with a real // patricia trie later. NNI_LIST_FOREACH (&ctx->topics, topic) { if (len < topic->len) { continue; } if ((topic->len == 0) || (memcmp(topic->buf, body, topic->len) == 0)) { return (true); } } return (false); } static void sub0_recv_cb(void *arg) { sub0_pipe *p = arg; sub0_sock *sock = p->sub; sub0_ctx * ctx; nni_msg * msg; size_t len; uint8_t * body; nni_list finish; nng_aio * aio; nni_msg * dup_msg; if (nni_aio_result(&p->aio_recv) != 0) { nni_pipe_close(p->pipe); return; } nni_aio_list_init(&finish); msg = nni_aio_get_msg(&p->aio_recv); nni_aio_set_msg(&p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); body = nni_msg_body(msg); len = nni_msg_len(msg); dup_msg = NULL; nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. NNI_LIST_FOREACH (&sock->contexts, ctx) { bool queued = false; if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. continue; } if (!sub0_matches(ctx, body, len)) { continue; } // This is a performance optimization, that ensures we // do not duplicate a message in the common case, where there // is only a single context. if (sock->num_contexts > 1) { if (nni_msg_dup(&dup_msg, msg) != 0) { // if we cannot dup it, continue on continue; } } else { // We only have one context, so it's the only // possible message. dup_msg = msg; } if (!nni_list_empty(&ctx->recv_queue)) { aio = nni_list_first(&ctx->recv_queue); nni_list_remove(&ctx->recv_queue, aio); nni_aio_set_msg(aio, dup_msg); // Save for synchronous completion nni_list_append(&finish, aio); } else if (nni_lmq_full(&ctx->lmq)) { // Make space for the new message. nni_msg *old; (void) nni_lmq_getq(&ctx->lmq, &old); nni_msg_free(old); (void) nni_lmq_putq(&ctx->lmq, dup_msg); queued = true; } else { (void) nni_lmq_putq(&ctx->lmq, dup_msg); queued = true; } if (queued && ctx == &sock->master) { nni_pollable_raise(&sock->readable); } } nni_mtx_unlock(&sock->lk); // NB: This is slightly less efficient in that we may have // created an extra copy in the face of e.g. two subscriptions, // but optimizing this further would require checking the subscription // list twice, adding complexity. If this turns out to be a problem // we could probably add some other sophistication with a counter // and flags on the contexts themselves. if (msg != dup_msg) { // If we didn't just use the message, then free our copy. nni_msg_free(msg); } while ((aio = nni_list_first(&finish)) != NULL) { nni_list_remove(&finish, aio); nni_aio_finish_sync(aio, 0, len); } nni_pipe_recv(p->pipe, &p->aio_recv); } static int sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; int val; nni_mtx_lock(&sock->lk); val = (int) nni_lmq_cap(&ctx->lmq); nni_mtx_unlock(&sock->lk); return (nni_copyout_int(val, buf, szp, t)); } static int sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; int val; int rv; if ((rv = nni_copyin_int(&val, buf, sz, 1, 8192, t)) != 0) { return (rv); } nni_mtx_lock(&sock->lk); if ((rv = nni_lmq_resize(&ctx->lmq, (size_t) val)) != 0) { nni_mtx_unlock(&sock->lk); return (rv); } // If we change the socket, then this will change the queue for // any new contexts. (Previously constructed contexts are unaffected.) if (&sock->master == ctx) { sock->recv_buf_len = (size_t) val; } nni_mtx_unlock(&sock->lk); return (0); } // For now we maintain subscriptions on a sorted linked list. As we do not // expect to have huge numbers of subscriptions, and as the operation is // really O(n), we think this is acceptable. In the future we might decide // to replace this with a patricia trie, like old nanomsg had. static int sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_ctx * ctx = arg; sub0_sock * sock = ctx->sock; sub0_topic *topic; sub0_topic *new_topic; NNI_ARG_UNUSED(t); nni_mtx_lock(&sock->lk); NNI_LIST_FOREACH (&ctx->topics, topic) { if (topic->len != sz) { continue; } if (memcmp(topic->buf, buf, sz) == 0) { // Already have it. nni_mtx_unlock(&sock->lk); return (0); } } if ((new_topic = NNI_ALLOC_STRUCT(new_topic)) == NULL) { nni_mtx_unlock(&sock->lk); return (NNG_ENOMEM); } if ((sz > 0) && ((new_topic->buf = nni_alloc(sz)) == NULL)) { nni_mtx_unlock(&sock->lk); NNI_FREE_STRUCT(new_topic); return (NNG_ENOMEM); } if (buf && new_topic->buf) { memcpy(new_topic->buf, buf, sz); } new_topic->len = sz; nni_list_append(&ctx->topics, new_topic); nni_mtx_unlock(&sock->lk); return (0); } static int sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_ctx * ctx = arg; sub0_sock * sock = ctx->sock; sub0_topic *topic; size_t len; NNI_ARG_UNUSED(t); nni_mtx_lock(&sock->lk); NNI_LIST_FOREACH (&ctx->topics, topic) { if (topic->len != sz) { continue; } if (memcmp(topic->buf, buf, sz) == 0) { // Matched! break; } } if (topic == NULL) { nni_mtx_unlock(&sock->lk); return (NNG_ENOENT); } nni_list_remove(&ctx->topics, topic); // Now we need to make sure that any messages that are waiting still // match the subscription. We basically just run through the queue // and requeue those messages we need. len = nni_lmq_len(&ctx->lmq); for (size_t i = 0; i < len; i++) { nni_msg *msg; (void) nni_lmq_getq(&ctx->lmq, &msg); if (sub0_matches(ctx, nni_msg_body(msg), nni_msg_len(msg))) { (void) nni_lmq_putq(&ctx->lmq, msg); } else { nni_msg_free(msg); } } nni_mtx_unlock(&sock->lk); nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); return (0); } static int sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; bool val; nni_mtx_lock(&sock->lk); val = ctx->prefer_new; nni_mtx_unlock(&sock->lk); return (nni_copyout_bool(val, buf, szp, t)); } static int sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; bool val; int rv; if ((rv = nni_copyin_bool(&val, buf, sz, t)) != 0) { return (rv); } nni_mtx_lock(&sock->lk); ctx->prefer_new = val; if (&sock->master == ctx) { sock->prefer_new = val; } nni_mtx_unlock(&sock->lk); return (0); } static nni_option sub0_ctx_options[] = { { .o_name = NNG_OPT_RECVBUF, .o_get = sub0_ctx_get_recv_buf_len, .o_set = sub0_ctx_set_recv_buf_len, }, { .o_name = NNG_OPT_SUB_SUBSCRIBE, .o_set = sub0_ctx_subscribe, }, { .o_name = NNG_OPT_SUB_UNSUBSCRIBE, .o_set = sub0_ctx_unsubscribe, }, { .o_name = NNG_OPT_SUB_PREFNEW, .o_get = sub0_ctx_get_prefer_new, .o_set = sub0_ctx_set_prefer_new, }, { .o_name = NULL, }, }; static void sub0_sock_send(void *arg, nni_aio *aio) { NNI_ARG_UNUSED(arg); if (nni_aio_begin(aio) == 0) { nni_aio_finish_error(aio, NNG_ENOTSUP); } } static void sub0_sock_recv(void *arg, nni_aio *aio) { sub0_sock *sock = arg; sub0_ctx_recv(&sock->master, aio); } static int sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { sub0_sock *sock = arg; int rv; int fd; if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); } static int sub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; return (sub0_ctx_get_recv_buf_len(&sock->master, buf, szp, t)); } static int sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t)); } static int sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; return (sub0_ctx_subscribe(&sock->master, buf, sz, t)); } static int sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t)); } static int sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; return (sub0_ctx_get_prefer_new(&sock->master, buf, szp, t)); } static int sub0_sock_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; return (sub0_ctx_set_prefer_new(&sock->master, buf, sz, t)); } // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops sub0_pipe_ops = { .pipe_size = sizeof(sub0_pipe), .pipe_init = sub0_pipe_init, .pipe_fini = sub0_pipe_fini, .pipe_start = sub0_pipe_start, .pipe_close = sub0_pipe_close, .pipe_stop = sub0_pipe_stop, }; static nni_proto_ctx_ops sub0_ctx_ops = { .ctx_size = sizeof(sub0_ctx), .ctx_init = sub0_ctx_init, .ctx_fini = sub0_ctx_fini, .ctx_send = sub0_ctx_send, .ctx_recv = sub0_ctx_recv, .ctx_options = sub0_ctx_options, }; static nni_option sub0_sock_options[] = { { .o_name = NNG_OPT_SUB_SUBSCRIBE, .o_set = sub0_sock_subscribe, }, { .o_name = NNG_OPT_SUB_UNSUBSCRIBE, .o_set = sub0_sock_unsubscribe, }, { .o_name = NNG_OPT_RECVFD, .o_get = sub0_sock_get_recv_fd, }, { .o_name = NNG_OPT_RECVBUF, .o_get = sub0_sock_get_recv_buf_len, .o_set = sub0_sock_set_recv_buf_len, }, { .o_name = NNG_OPT_SUB_PREFNEW, .o_get = sub0_sock_get_prefer_new, .o_set = sub0_sock_set_prefer_new, }, // terminate list { .o_name = NULL, }, }; static nni_proto_sock_ops sub0_sock_ops = { .sock_size = sizeof(sub0_sock), .sock_init = sub0_sock_init, .sock_fini = sub0_sock_fini, .sock_open = sub0_sock_open, .sock_close = sub0_sock_close, .sock_send = sub0_sock_send, .sock_recv = sub0_sock_recv, .sock_options = sub0_sock_options, }; static nni_proto sub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SUB_V0, "sub" }, .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, .proto_flags = NNI_PROTO_FLAG_RCV, .proto_sock_ops = &sub0_sock_ops, .proto_pipe_ops = &sub0_pipe_ops, .proto_ctx_ops = &sub0_ctx_ops, }; int nng_sub0_open(nng_socket *sock) { return (nni_proto_open(sock, &sub0_proto)); }