/*
 * Demo client for the bus API declared in bh_api.h.
 *
 * The program fills in a ProcInfo descriptor, registers/unregisters it a few
 * times, registers and queries topics, starts a detached thread that prints
 * everything received through BHReadSub(), subscribes to topics, issues
 * asynchronous and synchronous requests, publishes two messages, queries the
 * known processes and finally idles forever.
 */
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include "mm.h"
#include "logger_factory.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "bus_error.h"
#include "bh_api.h"
#include "proc_def.h"

#define TOTAL_REG_UNREG 2
#define MAGIC_STR "INTER_"
#define STR_LEN 30
#define TEXT_BUF_LEN 200

pthread_t tids;
void *res;
static ProcInfo proc_desc;

/* Return a printable string for a possibly-NULL text buffer. */
static const char *text_or_null(const void *text)
{
    return (text != NULL) ? (const char *)text : "(null)";
}

/*
 * Copy at most dst_size - 1 bytes of a length-delimited buffer into dst and
 * always NUL-terminate it. A NULL source or a non-positive length yields "".
 */
static void copy_as_cstring(char *dst, size_t dst_size, const void *src, int src_len)
{
    size_t count = 0;

    if (dst_size == 0) {
        return;
    }
    if (src != NULL && src_len > 0) {
        count = (size_t)src_len;
        if (count > dst_size - 1) {
            count = dst_size - 1;
        }
        memcpy(dst, src, count);
    }
    dst[count] = '\0';
}

/* Free a reply buffer and clear the pointer so it can never be printed stale. */
static void release_reply(void **reply, int size)
{
    BHFree(*reply, size);
    *reply = NULL;
}

/*
 * Print "<prefix><last error>" (followed by "(<reply>)" when show_reply is
 * non-zero), then release the error text obtained from BHGetLastError().
 * When reply is not NULL the caller's pointer is cleared afterwards; the
 * buffer itself is not freed because its ownership on failure is not known.
 */
static void report_failure(const char *prefix, void **reply, int show_reply)
{
    void *err = NULL;
    int err_len = 0;

    BHGetLastError(&err, &err_len);
    if (show_reply && reply != NULL) {
        printf("%s%s(%s)\n", prefix, text_or_null(err), text_or_null(*reply));
    } else {
        printf("%s%s\n", prefix, text_or_null(err));
    }
    BHFree(err, err_len);

    if (reply != NULL) {
        *reply = NULL;
    }
}

/*
 * Build a heap-allocated string "INTER_" + buf + random alphanumeric padding.
 *
 * length: requested total length; raised to strlen(buf) + 10 when smaller.
 * buf:    text placed after the prefix (NULL is treated as "").
 * Returns a NUL-terminated string the caller must free(). Exits the process
 * when the allocation fails.
 *
 * The generator is seeded once, on the first call, so consecutive calls give
 * different strings without waiting for the clock to advance. The one-time
 * seeding uses a plain static flag and is not synchronized.
 */
char *genStr(int length, char *buf)
{
    static int seeded = 0;
    const char *suffix = (buf != NULL) ? buf : "";
    size_t prefix_len = strlen(MAGIC_STR);
    size_t suffix_len = strlen(suffix);
    size_t total = (length > 0) ? (size_t)length : 0;
    size_t pos;
    char *str;

    if (total < suffix_len + 10) {
        total = suffix_len + 10;
    }

    if (!seeded) {
        srand((unsigned)time(NULL) ^ (unsigned)getpid());
        seeded = 1;
    }

    str = (char *)malloc(total + 1);
    if (str == NULL) {
        printf("out of memory!\n");
        exit(EXIT_FAILURE);
    }

    /* total >= suffix_len + 10, so prefix (6 bytes) and suffix always fit. */
    memcpy(str, MAGIC_STR, prefix_len);
    memcpy(str + prefix_len, suffix, suffix_len);

    for (pos = prefix_len + suffix_len; pos < total; pos++) {
        switch (rand() % 3) {
        case 0:
            str[pos] = (char)('A' + rand() % 26);
            break;
        case 1:
            str[pos] = (char)('a' + rand() % 26);
            break;
        default:
            str[pos] = (char)('0' + rand() % 10);
            break;
        }
    }
    str[total] = '\0';

    return str;
}

/*
 * Thread entry point: detaches itself, then loops forever printing each
 * message delivered by BHReadSub() together with the sender's process id.
 * skptr is unused.
 */
void *client_recv(void *skptr)
{
    char proc_data[TEXT_BUF_LEN];
    char topics[TEXT_BUF_LEN];

    (void)skptr;
    pthread_detach(pthread_self());

    while (true) {
        void *recvbuf = NULL;
        void *proc_id = NULL;
        int recv_len = 0;
        int id_len = 0;

        if (BHReadSub(&proc_id, &id_len, &recvbuf, &recv_len, -1) == true) {
            /* Print bounded, NUL-terminated copies rather than the raw buffers. */
            copy_as_cstring(proc_data, sizeof(proc_data), proc_id, id_len);
            copy_as_cstring(topics, sizeof(topics), recvbuf, recv_len);
            printf("Get the sub topics data(%s) from proc id(%s)\n", topics, proc_data);

            BHFree(recvbuf, recv_len);
            BHFree(proc_id, id_len);
        } else {
            report_failure("the thread recv fail with error: ", NULL, 0);
        }
    }

    return NULL; /* not reached */
}

/*
 * Print the reply of a topic query.
 *
 * Layout, as implied by the parsing below: an int topic count followed by one
 * variable-sized record per topic. Each record is a ProcInfo_query (which
 * embeds the first ProcInfo) followed by num - 1 further ProcInfo entries.
 *
 * buf: reply buffer (NULL is ignored).
 * len: size of buf in bytes; parsing stops when a record would exceed it.
 */
void parseQueryTopicsBuf(void *buf, int len)
{
    char *base = (char *)buf;
    size_t avail;
    size_t offset;
    int total_topics;
    int i, j;

    if (buf == NULL || len < (int)sizeof(int)) {
        return;
    }

    avail = (size_t)len;
    memcpy(&total_topics, base, sizeof(total_topics));
    offset = sizeof(int);

    for (i = 0; i < total_topics; i++) {
        ProcInfo_query *topic;
        ProcInfo *proc;
        size_t record_size = sizeof(ProcInfo_query);

        if (avail - offset < record_size) {
            printf("the query reply is truncated\n");
            return;
        }
        topic = (ProcInfo_query *)(base + offset);

        if (topic->num > 1) {
            size_t extra = (size_t)(topic->num - 1);

            if (extra > (avail - offset - record_size) / sizeof(ProcInfo)) {
                printf("the query reply is truncated\n");
                return;
            }
            record_size += extra * sizeof(ProcInfo);
        }

        printf("topic %s:\n", topic->name);
        for (j = 0; j < topic->num; j++) {
            printf("the %dst process info:\n", j + 1);
            proc = &(topic->procData) + j;
            printf("proc_id: %s\n", proc->proc_id);
            printf("name: %s\n", proc->name);
            printf("public_info: %s\n", proc->public_info);
            printf("private_info: %s\n", proc->private_info);
        }

        /* Every record, not only the first, contributes its full size. */
        offset += record_size;
    }
}

/*
 * Print the reply of a process query: an int record count followed by that
 * many ProcInfo_sum records.
 *
 * buf: reply buffer (NULL is ignored).
 * len: size of buf in bytes; records that would lie beyond it are not read.
 */
void parseQueryProcBuf(void *buf, int len)
{
    char *base = (char *)buf;
    ProcInfo_sum *procs;
    size_t capacity;
    int total;
    int i;

    if (buf == NULL || len < (int)sizeof(int)) {
        return;
    }

    memcpy(&total, base, sizeof(total));
    procs = (ProcInfo_sum *)(base + sizeof(int));
    capacity = ((size_t)len - sizeof(int)) / sizeof(ProcInfo_sum);

    for (i = 0; i < total; i++) {
        if ((size_t)i >= capacity) {
            printf("the query reply is truncated\n");
            return;
        }
        printf("proc_id: %s\n", procs[i].procData.proc_id);
        printf("name: %s\n", procs[i].procData.name);
        printf("public_info: %s\n", procs[i].procData.public_info);
        printf("private_info: %s\n", procs[i].procData.private_info);
        printf("service: %s\n", procs[i].reg_info);
        printf("sub: %s\n", procs[i].local_info);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    int rc;
    char *ptr = NULL;
    char buf[] = "Process";
    void *buf_temp = NULL;
    void *proc_id = NULL;
    void *msgID = NULL;
    int size = 0;
    int id_len = 0;
    int msg_id_len = 0;
    int i;
    char data_buf[TEXT_BUF_LEN] = { 0x00 };
    char proc_text[TEXT_BUF_LEN] = { 0x00 };
    const int timeout_ms = 3000;

    const char *topics_reg_buf1 = "topics demo1";
    const char *topics_reg_buf2 = "Hello World,So,Great,Good";
    const char *topics_query_buf1 = "topics demo1";
    const char *topics_query_buf2 = "Hello World,So,Great,Good"; /* No space between each other */
    const char *topics_sub_buf1 = "news";
    const char *topics_sub_buf2 = "sports,balls,topics demo1";
    const char *topics_pub_topic1 = "news";
    const char *topics_pub_topic1_data = "boob news";
    const char *topics_pub_topic2 = "balls";
    const char *topics_pub_topic2_data = "Great volleyballs and basketballs";
    const char *topics_server_specific_reg_buf1 = "Server Specific topics demo1";
    const char *topics_server_specific_reg_buf2 = "Server Specific Hello World";

    (void)argc;
    (void)argv;

    /*
     * Describe this process. The ProcInfo text fields are treated as
     * fixed-size char arrays, so every copy is bounded by the field size.
     * private_info is left empty.
     */
    memset(&proc_desc, 0x00, sizeof(ProcInfo));

    ptr = genStr(STR_LEN, buf);
    snprintf(proc_desc.proc_id, sizeof(proc_desc.proc_id), "%s", ptr);
    free(ptr);

    ptr = genStr(STR_LEN, buf);
    snprintf(proc_desc.name, sizeof(proc_desc.name), "%s", ptr);
    free(ptr);

    snprintf(proc_desc.public_info, sizeof(proc_desc.public_info), "%s", "Good");

    printf("before the registered, process info:\n");
    printf("proc_id: %s\n", proc_desc.proc_id);
    printf("name: %s\n", proc_desc.name);
    printf("public_info: %s\n", proc_desc.public_info);
    printf("private_info: %s\n", proc_desc.private_info);

    /* Exercise repeated register/unregister cycles. */
    for (i = 0; i < TOTAL_REG_UNREG; i++) {
        ret = BHRegister(&proc_desc, (int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
        if (ret == true) {
            printf("the process registered OKay\n");
            release_reply(&buf_temp, size);
        } else {
            report_failure("the process registered fail with error: ", NULL, 0);
            printf("the second way to get the error log: %s\n", text_or_null(buf_temp));
            buf_temp = NULL;
        }

        ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms);
        if (ret == true) {
            printf("the process unregistered OKay\n");
            release_reply(&buf_temp, size);
        } else {
            report_failure("the process unregistered fail with error: ", NULL, 0);
            printf("the second way to get the error log: %s\n", text_or_null(buf_temp));
            buf_temp = NULL;
        }
    }

    ret = BHRegister(&proc_desc, (int)sizeof(ProcInfo), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process registered OKay\n");
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process registered fail with error: ", &buf_temp, 1);
    }

    ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process registered topics OKay\n");
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process registered1 fail with errorL ", &buf_temp, 1);
    }

    ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process registered topics OKay\n");
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process registered2 fail with error: ", &buf_temp, 1);
    }

    ret = BHQueryTopicAddress(NULL, 0, topics_query_buf1, strlen(topics_query_buf1), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process query topics OKay\n");
        parseQueryTopicsBuf(buf_temp, size);
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process query3 fail with error: ", &buf_temp, 1);
    }

    ret = BHQueryTopicAddress(NULL, 0, topics_query_buf2, strlen(topics_query_buf2), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process query topics OKay\n");
        parseQueryTopicsBuf(buf_temp, size);
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process query4 fail with error: ", &buf_temp, 1);
    }

    /* Start the detached receiver before subscribing. */
    rc = pthread_create(&tids, NULL, client_recv, NULL);
    if (rc != 0) {
        printf("create the receive thread fail: %s\n", strerror(rc));
    }

    ret = BHRegisterTopics(topics_reg_buf1, strlen(topics_reg_buf1), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process registered topics OKay\n");
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process registered1 fail with errorL ", &buf_temp, 1);
    }

    ret = BHSubscribeTopics(topics_sub_buf1, strlen(topics_sub_buf1), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process subscribe topics OKay\n");
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process sub1 fail with error: ", &buf_temp, 1);
    }

    ret = BHSubscribeTopics(topics_sub_buf2, strlen(topics_sub_buf2), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("tthe process subscribe topics OKay\n");
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process sub2 fail with error: ", &buf_temp, 0);
    }

    ret = BHRegisterTopics(topics_reg_buf2, strlen(topics_reg_buf2), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process registered topics OKay\n");
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process registered10 fail with errorL ", &buf_temp, 0);
    }

    ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &msgID, &msg_id_len);
    if (ret == true) {
        printf("the process BHAsyncRequest topics OKay\n");
        BHFree(msgID, msg_id_len);
        msgID = NULL;
    } else {
        report_failure("the process BHAsyncRequest1 topics fail with error: ", NULL, 0);
    }

    ret = BHAsyncRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), NULL, 0);
    if (ret == true) {
        printf("the process BHAsyncRequest topics OKay\n");
    } else {
        report_failure("the process BHAsyncRequest2 topics fail with error: ", NULL, 0);
    }

    /*
     * NOTE(review): proc_id returned by BHRequest is not released here;
     * client_recv frees the one from BHReadSub with BHFree - confirm whether
     * the same applies to BHRequest.
     */
    proc_id = NULL;
    id_len = 0;
    ret = BHRequest(NULL, 0, topics_server_specific_reg_buf1, strlen(topics_server_specific_reg_buf1), &proc_id, &id_len, &buf_temp, &size, timeout_ms);
    if (ret == true) {
        copy_as_cstring(data_buf, sizeof(data_buf), buf_temp, size);
        copy_as_cstring(proc_text, sizeof(proc_text), proc_id, id_len);
        printf("the process BHRequest topics OKay\n");
        printf("the response data(%s) from procid(%s)\n", data_buf, proc_text);
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process BHRequest topics fail with error: ", &buf_temp, 0);
    }

    proc_id = NULL;
    id_len = 0;
    ret = BHRequest(NULL, 0, topics_server_specific_reg_buf2, strlen(topics_server_specific_reg_buf2), &proc_id, &id_len, &buf_temp, &size, timeout_ms);
    if (ret == true) {
        copy_as_cstring(data_buf, sizeof(data_buf), buf_temp, size);
        copy_as_cstring(proc_text, sizeof(proc_text), proc_id, id_len);
        printf("the process BHRequest topics OKay\n");
        printf("the response data(%s) from procid(%s)\n", data_buf, proc_text);
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process BHRequest2 topics fail with error: ", &buf_temp, 0);
    }

#if !defined(PRO_DE_SERIALIZE)
    ret = BHPublish(topics_pub_topic1, topics_pub_topic1_data, timeout_ms);
    if (ret == true) {
        printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic1, topics_pub_topic1_data);
    } else {
        printf("the process published1 fail\n");
    }

    ret = BHPublish(topics_pub_topic2, topics_pub_topic2_data, timeout_ms);
    if (ret == true) {
        printf("the process publish topic(%s) and content(%s) OKay\n", topics_pub_topic2, topics_pub_topic2_data);
    } else {
        printf("the process published2 fail\n");
    }
#endif

    memset(data_buf, 0x00, sizeof(data_buf));
    strcpy(data_buf, "query the process");
    ret = BHQueryProcs(NULL, 0, data_buf, strlen(data_buf), &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process query all the process data OKay\n");
        parseQueryProcBuf(buf_temp, size);
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process query proc fail with error: ", &buf_temp, 0);
    }

#if 1
    /* Keep the process alive so the receiver thread can go on printing. */
    while (1) {
        sleep(1);
    }
#else
    /* if the process will exit finally, we must call BHUnregister to release the resources */
    ret = BHUnregister(NULL, 0, &buf_temp, &size, timeout_ms);
    if (ret == true) {
        printf("the process unregistered OKay\n");
        release_reply(&buf_temp, size);
    } else {
        report_failure("the process unregistered fail with error: ", &buf_temp, 0);
    }
#endif

    return 0;
}