add remove key(s) and RequestWithTimeout
| | |
| | | return fn_dgram_socket_sub_nowait(s, topic, size, port); |
| | | } |
| | | |
| | | int wrap_fn_dgram_socket_desub(hcsoftbus lib, void *s, void *topic, int size, int port){ |
| | | if (!fn_dgram_socket_desub){ |
| | | fn_dgram_socket_desub = (tfn_dgram_socket_desub)dlsym(lib, l_dgram_socket_desub); |
| | | check_with_ret(fn_dgram_socket_desub, lib, -1); |
| | | } |
| | | return fn_dgram_socket_desub(s, topic, size, port); |
| | | } |
| | | |
| | | int wrap_fn_dgram_socket_desub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec){ |
| | | if (!fn_dgram_socket_desub_timeout){ |
| | | fn_dgram_socket_desub_timeout = (tfn_dgram_socket_desub_timeout)dlsym(lib, l_dgram_socket_desub_timeout); |
| | | check_with_ret(fn_dgram_socket_desub_timeout, lib, -1); |
| | | } |
| | | return fn_dgram_socket_desub_timeout(s, topic, size, port, sec, usec); |
| | | } |
| | | |
| | | int wrap_fn_dgram_socket_desub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port){ |
| | | if (!fn_dgram_socket_desub_nowait){ |
| | | fn_dgram_socket_desub_nowait = (tfn_dgram_socket_desub_nowait)dlsym(lib, l_dgram_socket_desub_nowait); |
| | | check_with_ret(fn_dgram_socket_desub_nowait, lib, -1); |
| | | } |
| | | return fn_dgram_socket_desub_nowait(s, topic, size, port); |
| | | } |
| | | |
| | | int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port){ |
| | | if (!fn_dgram_socket_pub){ |
| | | fn_dgram_socket_pub = (tfn_dgram_socket_pub)dlsym(lib, l_dgram_socket_pub); |
| | |
| | | } |
| | | } |
| | | |
| | | int wrap_fn_dgram_remove_key(hcsoftbus lib, int key){ |
| | | if (!fn_dgram_remove_key){ |
| | | fn_dgram_remove_key = (tfn_dgram_remove_key)dlsym(lib, l_dgram_remove_key); |
| | | check_with_ret(fn_dgram_remove_key, lib, -1); |
| | | } |
| | | return fn_dgram_remove_key(key); |
| | | } |
| | | |
| | | int wrap_fn_dgram_remove_keys(hcsoftbus lib, void *keys, int length){ |
| | | if (!fn_dgram_remove_keys){ |
| | | fn_dgram_remove_keys = (tfn_dgram_remove_keys)dlsym(lib, l_dgram_remove_keys); |
| | | check_with_ret(fn_dgram_remove_keys, lib, -1); |
| | | } |
| | | return fn_dgram_remove_keys((int*)keys, length); |
| | | } |
| | |
| | | static tfn_dgram_socket_sub fn_dgram_socket_sub = NULL; |
| | | static tfn_dgram_socket_sub_timeout fn_dgram_socket_sub_timeout = NULL; |
| | | static tfn_dgram_socket_sub_nowait fn_dgram_socket_sub_nowait = NULL; |
| | | static tfn_dgram_socket_desub fn_dgram_socket_desub = NULL; |
| | | static tfn_dgram_socket_desub_timeout fn_dgram_socket_desub_timeout = NULL; |
| | | static tfn_dgram_socket_desub_nowait fn_dgram_socket_desub_nowait = NULL; |
| | | static tfn_dgram_socket_pub fn_dgram_socket_pub = NULL; |
| | | static tfn_dgram_socket_pub_timeout fn_dgram_socket_pub_timeout = NULL; |
| | | static tfn_dgram_socket_pub_nowait fn_dgram_socket_pub_nowait = NULL; |
| | | static tfn_dgram_socket_port fn_dgram_socket_port = NULL; |
| | | static tfn_dgram_socket_free fn_dgram_socket_free = NULL; |
| | | |
| | | static tfn_dgram_remove_key fn_dgram_remove_key = NULL; |
| | | static tfn_dgram_remove_keys fn_dgram_remove_keys = NULL; |
| | | ////////////////////////////////////////////////////////////////////// |
| | | |
| | | // labels |
| | |
| | | const static char l_dgram_socket_sub[] = "dgram_mod_sub"; |
| | | const static char l_dgram_socket_sub_timeout[] = "dgram_mod_sub_timeout"; |
| | | const static char l_dgram_socket_sub_nowait[] = "dgram_mod_sub_nowait"; |
| | | const static char l_dgram_socket_desub[] = "dgram_mod_desub"; |
| | | const static char l_dgram_socket_desub_timeout[] = "dgram_mod_desub_timeout"; |
| | | const static char l_dgram_socket_desub_nowait[] = "dgram_mod_desub_nowait"; |
| | | const static char l_dgram_socket_pub[] = "dgram_mod_pub"; |
| | | const static char l_dgram_socket_pub_timeout[] = "dgram_mod_pub_timeout"; |
| | | const static char l_dgram_socket_pub_nowait[] = "dgram_mod_pub_nowait"; |
| | | const static char l_dgram_socket_port[] = "dgram_mod_get_port"; |
| | | const static char l_dgram_socket_free[] = "dgram_mod_free"; |
| | | |
| | | const static char l_dgram_remove_key[] = "dgram_mod_remove_key"; |
| | | const static char l_dgram_remove_keys[] = "dgram_mod_remove_keys"; |
| | | ////////////////////////////////////////////////////////////////////// |
| | | |
| | | // dlopen dynamic library |
| | |
| | | int wrap_fn_dgram_socket_sub(hcsoftbus lib, void *s, void *topic, int size, int port); |
| | | int wrap_fn_dgram_socket_sub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec); |
| | | int wrap_fn_dgram_socket_sub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port); |
| | | int wrap_fn_dgram_socket_desub(hcsoftbus lib, void *s, void *topic, int size, int port); |
| | | int wrap_fn_dgram_socket_desub_timeout(hcsoftbus lib, void *s, void *topic, int size, int port, int sec, int usec); |
| | | int wrap_fn_dgram_socket_desub_nowait(hcsoftbus lib, void *s, void *topic, int size, int port); |
| | | int wrap_fn_dgram_socket_pub(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port); |
| | | int wrap_fn_dgram_socket_pub_timeout(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port, int sec, int usec); |
| | | int wrap_fn_dgram_socket_pub_nowait(hcsoftbus lib, void *s, void *topic, int tsize, void *content, int csize, int port); |
| | | int wrap_fn_dgram_socket_port(hcsoftbus lib, void *s); |
| | | void wrap_fn_dgram_socket_free(hcsoftbus lib, void *buf); |
| | | int wrap_fn_dgram_remove_key(hcsoftbus lib, int key); |
| | | int wrap_fn_dgram_remove_keys(hcsoftbus lib, void *keys, int length); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | |
| | | typedef int(*tfn_dgram_socket_sub_timeout) (void*, void*, int, int, int, int); |
| | | typedef tfn_dgram_socket_sub tfn_dgram_socket_sub_nowait; |
| | | /** |
| | | * Unsubscribe from the specified topic |
| | | * @topic the topic |
| | | * @size length of the topic |
| | | * @port bus port |
| | | */ |
| | | typedef tfn_dgram_socket_sub tfn_dgram_socket_desub; |
| | | typedef tfn_dgram_socket_sub_timeout tfn_dgram_socket_desub_timeout; |
| | | typedef tfn_dgram_socket_sub_nowait tfn_dgram_socket_desub_nowait; |
| | | /** |
| | | * Publish a topic |
| | | * @topic the topic |
| | | * @content the topic content |
| | |
| | | */ |
| | | typedef void(*tfn_dgram_socket_free) (void*); |
| | | |
| | | /** |
| | | * Delete the shared queue associated with the key, and remove that key's subscription from the bus |
| | | */ |
| | | |
| | | typedef int(*tfn_dgram_remove_key) (int); |
| | | /** |
| | | * Batch-delete the shared queues associated with the keys, and remove those keys' subscriptions from the bus |
| | | */ |
| | | typedef int(*tfn_dgram_remove_keys) (int*, int); |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | |
| | | return &ret |
| | | } |
| | | |
| | | // RequestWithTimeout req sync |
| | | func (h *Handle) RequestWithTimeout(key int, info *MsgInfo, timeout int) *MsgInfo { |
| | | h.mtxWorker.Lock() |
| | | defer h.mtxWorker.Unlock() |
| | | |
| | | msg, err := proto.Marshal(info) |
| | | if err != nil { |
| | | return nil |
| | | } |
| | | |
| | | until := (float32)(timeout) |
| | | one := (float32)(timeoutSec) + ((float32)(timeoutUsec) / 1000000) |
| | | fc := until / one |
| | | |
| | | count := (int)(fc) |
| | | |
| | | try := 0 |
| | | |
| | | // 同步接口,需要等待返回值 |
| | | var ret MsgInfo |
| | | loop: |
| | | for { |
| | | select { |
| | | case <-h.ctx.Done(): |
| | | return nil |
| | | default: |
| | | if data, err := h.sockWorker.sock.SendAndRecvTimeout(msg, key, timeoutSec, timeoutUsec); err == nil { |
| | | if err := proto.Unmarshal(data, &ret); err == nil { |
| | | break loop |
| | | } else { |
| | | try++ |
| | | if try > count { |
| | | return nil |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return &ret |
| | | } |
| | | |
| | | // Reply request |
| | | func (h *Handle) Reply(key int, info *MsgInfo) error { |
| | | msg, err := proto.Marshal(info) |
| | |
| | | /* |
| | | #include <stdlib.h> |
| | | #include "libcsoftbus.h" |
| | | extern void *get_array(const int size); |
| | | extern void set_array(void *arr, const int index, const int value); |
| | | */ |
| | | import "C" |
| | | import ( |
| | |
| | | return int(r) |
| | | } |
| | | |
| | | // Desub remove sub |
| | | func (d *DgramSocket) Desub(topic string, port int) int { |
| | | if libsoftbus == nil { |
| | | return RETVAL |
| | | } |
| | | |
| | | ct := C.CString(topic) |
| | | defer C.free(unsafe.Pointer(ct)) |
| | | |
| | | r := C.wrap_fn_dgram_socket_desub(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port)) |
| | | return int(r) |
| | | } |
| | | |
| | | // DesubTimeout timeout |
| | | func (d *DgramSocket) DesubTimeout(topic string, port int, sec, usec int) int { |
| | | if libsoftbus == nil { |
| | | return RETVAL |
| | | } |
| | | |
| | | ct := C.CString(topic) |
| | | defer C.free(unsafe.Pointer(ct)) |
| | | |
| | | r := C.wrap_fn_dgram_socket_desub_timeout(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port), C.int(sec), C.int(usec)) |
| | | return int(r) |
| | | } |
| | | |
| | | // DesubNoWait remove sub |
| | | func (d *DgramSocket) DesubNoWait(topic string, port int) int { |
| | | if libsoftbus == nil { |
| | | return RETVAL |
| | | } |
| | | |
| | | ct := C.CString(topic) |
| | | defer C.free(unsafe.Pointer(ct)) |
| | | |
| | | r := C.wrap_fn_dgram_socket_desub_nowait(libsoftbus, d.dgram, unsafe.Pointer(ct), C.int(len(topic)), C.int(port)) |
| | | return int(r) |
| | | } |
| | | |
| | | // Pub bus |
| | | func (d *DgramSocket) Pub(topic, msg string, port int) int { |
| | | if libsoftbus == nil { |
| | |
| | | r := C.wrap_fn_dgram_socket_port(libsoftbus, d.dgram) |
| | | return int(r) |
| | | } |
| | | |
| | | // RemoveKey rm key |
| | | func (d *DgramSocket) RemoveKey(key int) int { |
| | | if libsoftbus == nil { |
| | | return RETVAL |
| | | } |
| | | |
| | | r := C.wrap_fn_dgram_remove_key(libsoftbus, C.int(key)) |
| | | return int(r) |
| | | } |
| | | |
| | | // RemoveKeys rm key |
| | | func (d *DgramSocket) RemoveKeys(keys []int) int { |
| | | if libsoftbus == nil { |
| | | return RETVAL |
| | | } |
| | | |
| | | l := len(keys) |
| | | ca := C.get_array(C.int(l)) |
| | | defer C.free(ca) |
| | | for k, v := range keys { |
| | | C.set_array(ca, C.int(k), C.int(v)) |
| | | } |
| | | |
| | | r := C.wrap_fn_dgram_remove_keys(libsoftbus, ca, C.int(l)) |
| | | return int(r) |
| | | } |