From 2f0244af1171e91aa7f7ff6a80c843f0ae10ebe2 Mon Sep 17 00:00:00 2001 From: Bill Williams Date: Fri, 30 Nov 2018 09:35:05 -0800 Subject: [PATCH 1/5] make client list and nn_sends thread safe --- src/client_list.c | 12 +++++++++++- src/client_list.h | 3 +++ src/downstream.c | 5 ++++- src/service_alive.c | 3 +++ src/upstream.c | 2 ++ tests/CMakeLists.txt | 3 ++- tests/test_downstream.c | 6 ++++++ tests/test_downstream_more.c | 7 +++++++ tests/test_upstream.c | 6 ++++++ 9 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/client_list.c b/src/client_list.c index 7012c52..21c666b 100644 --- a/src/client_list.c +++ b/src/client_list.c @@ -30,6 +30,7 @@ /*----------------------------------------------------------------------------*/ static int numOfClients = 0; static reg_list_item_t * g_head = NULL; +pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; /*----------------------------------------------------------------------------*/ /* External functions */ @@ -40,6 +41,11 @@ reg_list_item_t * get_global_node(void) return g_head; } +pthread_mutex_t *get_global_client_mut(void) +{ + return &client_mut; +} + int get_numOfClients() { return numOfClients; @@ -52,6 +58,7 @@ int addToList( wrp_msg_t **msg) int rc = -1; int sock; int retStatus = -1; + sock = nn_socket( AF_SP, NN_PUSH ); ParodusPrint("sock created for adding entries to list: %d\n", sock); if(sock >= 0) @@ -214,7 +221,7 @@ int deleteFromList(char* service_name) curr_node = NULL; ParodusInfo("Deleted successfully and returning..\n"); numOfClients =numOfClients - 1; - ParodusPrint("numOfClients after delte is %d\n", numOfClients); + ParodusPrint("numOfClients after delete is %d\n", numOfClients); return 0; } @@ -234,6 +241,7 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize) { int bytes =0; reg_list_item_t *temp = NULL; + pthread_mutex_lock (&client_mut); temp = get_global_node(); //Checking for individual clients & Sending msg to registered client while (NULL != temp) @@ -243,6 +251,7 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize) if( strcmp(dest, temp->service_name) == 0) { bytes = nn_send(temp->sock, *Msg, msgSize, 0); + pthread_mutex_unlock (&client_mut); ParodusInfo("sent downstream message to reg_client '%s'\n", temp->url); ParodusPrint("downstream bytes sent:%d\n", bytes); return 1; @@ -250,5 +259,6 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize) ParodusPrint("checking the next item in the list\n"); temp= temp->next; } + pthread_mutex_unlock (&client_mut); return 0; } diff --git a/src/client_list.h b/src/client_list.h index f3f2a33..4bd3745 100644 --- a/src/client_list.h +++ b/src/client_list.h @@ -23,6 +23,8 @@ #ifndef _CLIENTLIST_H_ #define _CLIENTLIST_H_ +#include + /*----------------------------------------------------------------------------*/ /* Data Structures */ /*----------------------------------------------------------------------------*/ @@ -53,6 +55,7 @@ int get_numOfClients(); int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize); reg_list_item_t * get_global_node(void); +pthread_mutex_t * get_global_client_mut(void); #ifdef __cplusplus } diff --git a/src/downstream.c b/src/downstream.c index aabb327..c7fadcf 100644 --- a/src/downstream.c +++ b/src/downstream.c @@ -120,7 +120,9 @@ void listenerOnMessage(void * msg, size_t msgSize) ((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid))); free(destVal); - temp = get_global_node(); + + pthread_mutex_lock (get_global_client_mut()); + temp = get_global_node(); //Checking for individual clients & Sending to each client while (NULL != temp) @@ -139,6 +141,7 @@ void listenerOnMessage(void * msg, size_t msgSize) ParodusPrint("checking the next item in the list\n"); temp= temp->next; } + pthread_mutex_unlock (get_global_client_mut()); /* check Downstream dest for CRUD requests */ if(destFlag ==0 && strcmp("parodus", dest)==0) diff --git a/src/service_alive.c b/src/service_alive.c index cb90a39..96dfc3c 100644 --- a/src/service_alive.c +++ b/src/service_alive.c @@ -61,6 +61,7 @@ void *serviceAliveTask() if(get_numOfClients() > 0) { //sending svc msg to all the clients every 30s + pthread_mutex_lock (get_global_client_mut()); temp = get_global_node(); size = (size_t) nbytes; while(NULL != temp) @@ -91,6 +92,8 @@ void *serviceAliveTask() temp= temp->next; } } + pthread_mutex_unlock (get_global_client_mut()); + ParodusPrint("Waiting for 30s to send keep alive msg \n"); sleep(KEEPALIVE_INTERVAL_SEC); } diff --git a/src/upstream.c b/src/upstream.c index 5fc16f4..f220249 100644 --- a/src/upstream.c +++ b/src/upstream.c @@ -226,6 +226,7 @@ void *processUpstreamMessage() { ParodusInfo("\n Nanomsg client Registration for Upstream\n"); //Extract serviceName and url & store it in a linked list for reg_clients + pthread_mutex_lock (get_global_client_mut()); if(get_numOfClients() !=0) { matchFlag = 0; @@ -289,6 +290,7 @@ void *processUpstreamMessage() ParodusPrint("sent auth status to reg client\n"); } } + pthread_mutex_unlock (get_global_client_mut()); } else if(msgType == WRP_MSG_TYPE__EVENT) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 699e38a..d067352 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -219,7 +219,8 @@ target_link_libraries (test_upstream -lcmocka gcov -lcunit -lcimplog # test_downstream #------------------------------------------------------------------------------- add_test(NAME test_downstream COMMAND ${MEMORY_CHECK} ./test_downstream) -add_executable(test_downstream test_downstream.c ../src/downstream.c ../src/string_helpers.c) +add_executable(test_downstream test_downstream.c ../src/downstream.c + ../src/string_helpers.c) target_link_libraries (test_downstream -lcmocka gcov -lcunit -lcimplog -lwrp-c -luuid -lpthread -lmsgpackc -lnopoll -Wl,--no-as-needed -lcjson -lcjwt -ltrower-base64 diff --git a/tests/test_downstream.c b/tests/test_downstream.c index fa239b6..c313883 100755 --- a/tests/test_downstream.c +++ b/tests/test_downstream.c @@ -33,6 +33,7 @@ ParodusMsg *ParodusMsgQ; pthread_mutex_t g_mutex; pthread_cond_t g_cond; +pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; int crud_test = 0; /*----------------------------------------------------------------------------*/ /* Mocks */ @@ -61,6 +62,11 @@ reg_list_item_t * get_global_node(void) return mock_ptr_type(reg_list_item_t *); } +pthread_mutex_t *get_global_client_mut(void) +{ + return &client_mut; +} + ssize_t wrp_to_struct( const void *bytes, const size_t length, const enum wrp_format fmt, wrp_msg_t **msg ) { diff --git a/tests/test_downstream_more.c b/tests/test_downstream_more.c index cfdef5e..820a4b0 100644 --- a/tests/test_downstream_more.c +++ b/tests/test_downstream_more.c @@ -40,6 +40,8 @@ typedef struct { /*----------------------------------------------------------------------------*/ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ +pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; + static test_t tests[] = { { .s.msg_type = WRP_MSG_TYPE__CREATE, @@ -244,6 +246,11 @@ reg_list_item_t *get_global_node(void) return NULL; } +pthread_mutex_t *get_global_client_mut(void) +{ + return &client_mut; +} + void wrp_free_struct( wrp_msg_t *msg ) { if( WRP_MSG_TYPE__EVENT == tests[i].s.msg_type ) { diff --git a/tests/test_upstream.c b/tests/test_upstream.c index 68ba3cf..97a2eae 100644 --- a/tests/test_upstream.c +++ b/tests/test_upstream.c @@ -45,6 +45,7 @@ int numLoops = 1; wrp_msg_t *temp = NULL; extern pthread_mutex_t nano_mut; extern pthread_cond_t nano_con; +pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; static int crud_test = 0; /*----------------------------------------------------------------------------*/ /* Mocks */ @@ -72,6 +73,11 @@ int get_numOfClients() return (int)mock(); } +pthread_mutex_t *get_global_client_mut(void) +{ + return &client_mut; +} + void addCRUDmsgToQueue(wrp_msg_t *crudMsg) { (void)crudMsg; From c2f6a645699b4c42b832f24444ed8d598b3552e1 Mon Sep 17 00:00:00 2001 From: Bill Williams Date: Fri, 30 Nov 2018 09:38:41 -0800 Subject: [PATCH 2/5] Updated change log for send_mutex change --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f241e29..fd74457 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Partner-id comparison made case insensitive - Reverted from NNG to nanomag (v1.1.2) - reverted temporary CMake reference to https://github.com/bill1600/seshat +- used mutex protection to make client list and nn_sends thread safe + ## [1.0.1] - 2018-07-18 ### Added From 0e6e8333011167506ed41c25468668f42c2e03b0 Mon Sep 17 00:00:00 2001 From: Bill Williams Date: Thu, 13 Dec 2018 13:33:33 -0800 Subject: [PATCH 3/5] put mutex lock in get_global_node --- src/ParodusInternal.h | 3 +- src/client_list.c | 11 ++-- src/client_list.h | 4 +- src/connection.c | 35 +++++++----- src/downstream.c | 3 +- src/partners_check.c | 45 +++++++++------ src/service_alive.c | 7 +-- src/upstream.c | 70 +++++++++++++----------- tests/CMakeLists.txt | 3 +- tests/test_client_list.c | 4 +- tests/test_connection.c | 26 +++++---- tests/test_downstream.c | 4 +- tests/test_downstream_more.c | 5 +- tests/test_upstream.c | 103 +++++++++++++++++++++++++++++++---- 14 files changed, 211 insertions(+), 112 deletions(-) diff --git a/src/ParodusInternal.h b/src/ParodusInternal.h index 2ff01ba..5d9fd11 100644 --- a/src/ParodusInternal.h +++ b/src/ParodusInternal.h @@ -106,7 +106,8 @@ typedef struct { //--- Used in connection.c for backoff delay timer typedef struct { - int max_delay; + int count; + int max_count; int delay; } backoff_timer_t; diff --git a/src/client_list.c b/src/client_list.c index 21c666b..f519a75 100644 --- a/src/client_list.c +++ b/src/client_list.c @@ -38,14 +38,16 @@ pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; reg_list_item_t * get_global_node(void) { + pthread_mutex_lock (&client_mut); return g_head; } -pthread_mutex_t *get_global_client_mut(void) +void release_global_node (void) { - return &client_mut; + pthread_mutex_unlock (&client_mut); } + int get_numOfClients() { return numOfClients; @@ -241,7 +243,6 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize) { int bytes =0; reg_list_item_t *temp = NULL; - pthread_mutex_lock (&client_mut); temp = get_global_node(); //Checking for individual clients & Sending msg to registered client while (NULL != temp) @@ -251,7 +252,7 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize) if( strcmp(dest, temp->service_name) == 0) { bytes = nn_send(temp->sock, *Msg, msgSize, 0); - pthread_mutex_unlock (&client_mut); + release_global_node (); ParodusInfo("sent downstream message to reg_client '%s'\n", temp->url); ParodusPrint("downstream bytes sent:%d\n", bytes); return 1; @@ -259,6 +260,6 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize) ParodusPrint("checking the next item in the list\n"); temp= temp->next; } - pthread_mutex_unlock (&client_mut); + release_global_node (); return 0; } diff --git a/src/client_list.h b/src/client_list.h index 4bd3745..5d58f57 100644 --- a/src/client_list.h +++ b/src/client_list.h @@ -23,8 +23,6 @@ #ifndef _CLIENTLIST_H_ #define _CLIENTLIST_H_ -#include - /*----------------------------------------------------------------------------*/ /* Data Structures */ /*----------------------------------------------------------------------------*/ @@ -55,7 +53,7 @@ int get_numOfClients(); int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize); reg_list_item_t * get_global_node(void); -pthread_mutex_t * get_global_client_mut(void); +void release_global_node (void); #ifdef __cplusplus } diff --git a/src/connection.c b/src/connection.c index 3213fb1..52b47fd 100644 --- a/src/connection.c +++ b/src/connection.c @@ -192,19 +192,20 @@ int check_timer_expired (expire_timer_t *timer, long timeout_ms) } //-------------------------------------------------------------------- -void init_backoff_timer (backoff_timer_t *timer, int max_delay) +void init_backoff_timer (backoff_timer_t *timer, int max_count) { - timer->max_delay = max_delay; + timer->count = 1; + timer->max_count = max_count; timer->delay = 1; } int update_backoff_delay (backoff_timer_t *timer) { - if (timer->delay < timer->max_delay) + if (timer->count < timer->max_count) { + timer->count += 1; timer->delay = timer->delay + timer->delay + 1; // 3,7,15,31 .. - if (timer->delay > timer->max_delay) - timer->delay = timer->max_delay; + } return timer->delay; } @@ -503,13 +504,10 @@ int connect_and_wait (create_connection_ctx_t *ctx) // a) success, or // b) need to requery dns int keep_trying_to_connect (create_connection_ctx_t *ctx, - int max_retry_sleep) + backoff_timer_t *backoff_timer) { - backoff_timer_t backoff_timer; int rtn; - init_backoff_timer (&backoff_timer, max_retry_sleep); - while (true) { rtn = connect_and_wait (ctx); @@ -517,7 +515,7 @@ int keep_trying_to_connect (create_connection_ctx_t *ctx, return true; if (rtn == CONN_WAIT_ACTION_RETRY) // if redirected or build_headers continue; - backoff_delay (&backoff_timer); // 3,7,15,31 .. + backoff_delay (backoff_timer); // 3,7,15,31 .. if (rtn == CONN_WAIT_RETRY_DNS) return false; //find_server again // else retry @@ -534,8 +532,11 @@ int keep_trying_to_connect (create_connection_ctx_t *ctx, int createNopollConnection(noPollCtx *ctx) { create_connection_ctx_t conn_ctx; - int max_retry_sleep; + int max_retry_count; int query_dns_status; + struct timespec connect_time,*connectTimePtr; + connectTimePtr = &connect_time; + backoff_timer_t backoff_timer; if(ctx == NULL) { return nopoll_false; @@ -545,14 +546,15 @@ int createNopollConnection(noPollCtx *ctx) ParodusInfo("Received reboot_reason as:%s\n", get_parodus_cfg()->hw_last_reboot_reason); ParodusInfo("Received reconnect_reason as:%s\n", reconnect_reason); - max_retry_sleep = (int) get_parodus_cfg()->webpa_backoff_max; - ParodusPrint("max_retry_sleep is %d\n", max_retry_sleep ); + max_retry_count = (int) get_parodus_cfg()->webpa_backoff_max; + ParodusPrint("max_retry_count is %d\n", max_retry_count ); conn_ctx.nopoll_ctx = ctx; init_expire_timer (&conn_ctx.connect_timer); init_header_info (&conn_ctx.header_info); set_extra_headers (&conn_ctx, false); set_server_list_null (&conn_ctx.server_list); + init_backoff_timer (&backoff_timer, max_retry_count); while (true) { @@ -560,7 +562,7 @@ int createNopollConnection(noPollCtx *ctx) if (query_dns_status == FIND_INVALID_DEFAULT) return nopoll_false; set_current_server (&conn_ctx); - if (keep_trying_to_connect (&conn_ctx, max_retry_sleep)) + if (keep_trying_to_connect (&conn_ctx, &backoff_timer)) break; // retry dns query } @@ -577,6 +579,11 @@ int createNopollConnection(noPollCtx *ctx) get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE; ParodusInfo("cloud_status set as %s after successful connection\n", get_parodus_cfg()->cloud_status); + if(get_parodus_cfg()->boot_time != 0) { + getCurrentTime(connectTimePtr); + ParodusInfo("connect_time-diff-boot_time=%d\n", connectTimePtr->tv_sec - get_parodus_cfg()->boot_time); + } + free_extra_headers (&conn_ctx); free_header_info (&conn_ctx.header_info); free_server_list (&conn_ctx.server_list); diff --git a/src/downstream.c b/src/downstream.c index c7fadcf..e3e4708 100644 --- a/src/downstream.c +++ b/src/downstream.c @@ -121,7 +121,6 @@ void listenerOnMessage(void * msg, size_t msgSize) free(destVal); - pthread_mutex_lock (get_global_client_mut()); temp = get_global_node(); //Checking for individual clients & Sending to each client @@ -141,7 +140,7 @@ void listenerOnMessage(void * msg, size_t msgSize) ParodusPrint("checking the next item in the list\n"); temp= temp->next; } - pthread_mutex_unlock (get_global_client_mut()); + release_global_node (); /* check Downstream dest for CRUD requests */ if(destFlag ==0 && strcmp("parodus", dest)==0) diff --git a/src/partners_check.c b/src/partners_check.c index 670687b..7aedcef 100644 --- a/src/partners_check.c +++ b/src/partners_check.c @@ -93,14 +93,18 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds) { for(j = 0; jcount; j++) { - ParodusPrint("partnersList->partner_ids[%lu] = %s\n",j, partnersList->partner_ids[j]); - ParodusPrint("msg->u.event.partner_ids->partner_ids[%lu] = %s\n",i, msg->u.event.partner_ids->partner_ids[i]); - if(strcasecmp(partnersList->partner_ids[j], msg->u.event.partner_ids->partner_ids[i]) == 0) - { - ParodusInfo("partner_id match found\n"); - matchFlag = 1; - break; - } + if(NULL != partnersList->partner_ids[j]) { + ParodusPrint("partnersList->partner_ids[%lu] = %s\n",j, partnersList->partner_ids[j]); + ParodusPrint("msg->u.event.partner_ids->partner_ids[%lu] = %s\n",i, msg->u.event.partner_ids->partner_ids[i]); + if(strcasecmp(partnersList->partner_ids[j], msg->u.event.partner_ids->partner_ids[i]) == 0) + { + ParodusInfo("partner_id match found\n"); + matchFlag = 1; + break; + } + } + else + ParodusError("partner Id in partnersList is NULL but count is not 0"); } /* Commandline input partner_ids matched with partner_ids from request */ if(matchFlag == 1) @@ -154,14 +158,19 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds) { for(j = 0; jcount; j++) { - ParodusPrint("partnersList->partner_ids[%lu] = %s\n",j, partnersList->partner_ids[j]); - ParodusPrint("msg->u.req.partner_ids->partner_ids[%lu] = %s\n",i, msg->u.req.partner_ids->partner_ids[i]); - if(strcasecmp(partnersList->partner_ids[j], msg->u.req.partner_ids->partner_ids[i]) == 0) - { - ParodusInfo("partner_id match found\n"); - matchFlag = 1; - break; - } + if(NULL != partnersList->partner_ids[j]) + { + ParodusPrint("partnersList->partner_ids[%lu] = %s\n",j, partnersList->partner_ids[j]); + ParodusPrint("msg->u.req.partner_ids->partner_ids[%lu] = %s\n",i, msg->u.req.partner_ids->partner_ids[i]); + if(strcasecmp(partnersList->partner_ids[j], msg->u.req.partner_ids->partner_ids[i]) == 0) + { + ParodusInfo("partner_id match found\n"); + matchFlag = 1; + break; + } + } + else + ParodusError("partner Id in partnersList is NULL but count is not 0"); } } @@ -173,7 +182,7 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds) { for(j=0; jcount; j++) { - if(partnersList->partner_ids[j] != NULL) + if(NULL != partnersList->partner_ids[j]) { free(partnersList->partner_ids[j]); } @@ -193,7 +202,7 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds) { for(j=0; jcount; j++) { - if(partnersList->partner_ids[j] != NULL) + if(NULL != partnersList->partner_ids[j]) { free(partnersList->partner_ids[j]); } diff --git a/src/service_alive.c b/src/service_alive.c index 96dfc3c..314634d 100644 --- a/src/service_alive.c +++ b/src/service_alive.c @@ -58,11 +58,10 @@ void *serviceAliveTask() while(1) { ParodusPrint("serviceAliveTask: numOfClients registered is %d\n", get_numOfClients()); + temp = get_global_node(); if(get_numOfClients() > 0) { //sending svc msg to all the clients every 30s - pthread_mutex_lock (get_global_client_mut()); - temp = get_global_node(); size = (size_t) nbytes; while(NULL != temp) { @@ -92,13 +91,13 @@ void *serviceAliveTask() temp= temp->next; } } - pthread_mutex_unlock (get_global_client_mut()); - + release_global_node (); ParodusPrint("Waiting for 30s to send keep alive msg \n"); sleep(KEEPALIVE_INTERVAL_SEC); } else { + release_global_node (); ParodusInfo("No clients are registered, waiting ..\n"); sleep(50); } diff --git a/src/upstream.c b/src/upstream.c index f220249..449ce46 100644 --- a/src/upstream.c +++ b/src/upstream.c @@ -226,12 +226,11 @@ void *processUpstreamMessage() { ParodusInfo("\n Nanomsg client Registration for Upstream\n"); //Extract serviceName and url & store it in a linked list for reg_clients - pthread_mutex_lock (get_global_client_mut()); + temp = get_global_node(); if(get_numOfClients() !=0) { matchFlag = 0; ParodusPrint("matchFlag reset to %d\n", matchFlag); - temp = get_global_node(); while(temp!=NULL) { if(strcmp(temp->service_name, msg->u.reg.service_name)==0) @@ -290,7 +289,7 @@ void *processUpstreamMessage() ParodusPrint("sent auth status to reg client\n"); } } - pthread_mutex_unlock (get_global_client_mut()); + release_global_node (); } else if(msgType == WRP_MSG_TYPE__EVENT) { @@ -347,48 +346,55 @@ void *processUpstreamMessage() Expecting dest format as mac:xxxxxxxxxxxx/parodus/cloud-status Parse dest field and check destService is "parodus" and destApplication is "cloud-status" */ - if(macId != NULL && destService != NULL && destApplication != NULL && strcmp(destService,"parodus")== 0 && strcmp(destApplication,"cloud-status")== 0) - { - retrieve_msg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) ); - memset(retrieve_msg, 0, sizeof(wrp_msg_t)); - retrieve_msg->msg_type = msg->msg_type; - retrieve_msg->u.crud.transaction_uuid = strdup(msg->u.crud.transaction_uuid); - retrieve_msg->u.crud.source = strdup(msg->u.crud.source); - retrieve_msg->u.crud.dest = strdup(msg->u.crud.dest); - addCRUDmsgToQueue(retrieve_msg); - } - else if(sourceService != NULL && sourceApplication != NULL && strcmp(sourceService,"parodus")== 0 && strcmp(sourceApplication,"cloud-status")== 0 && strncmp(msg->u.crud.dest,"mac:", 4)==0) - { - /* Handle cloud-status retrieve response here to send it to registered client - Expecting src format as mac:xxxxxxxxxxxx/parodus/cloud-status and dest as mac: - Parse src field and check sourceService is "parodus" and sourceApplication is "cloud-status" - */ - serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST); - if ( serviceName != NULL) + if(macId != NULL) + { + if(destService != NULL && destApplication != NULL && strcmp(destService,"parodus")== 0 && strcmp(destApplication,"cloud-status")== 0) { - //Send Client cloud-status response back to registered client - ParodusInfo("Sending cloud-status response to %s client\n",serviceName); - sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)&message->msg,message->len); - if(sendStatus ==1) + retrieve_msg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) ); + memset(retrieve_msg, 0, sizeof(wrp_msg_t)); + retrieve_msg->msg_type = msg->msg_type; + retrieve_msg->u.crud.transaction_uuid = strdup(msg->u.crud.transaction_uuid); + retrieve_msg->u.crud.source = strdup(msg->u.crud.source); + retrieve_msg->u.crud.dest = strdup(msg->u.crud.dest); + addCRUDmsgToQueue(retrieve_msg); + } + else if(sourceService != NULL && sourceApplication != NULL && strcmp(sourceService,"parodus")== 0 && strcmp(sourceApplication,"cloud-status")== 0 && strncmp(msg->u.crud.dest,"mac:", 4)==0) + { + /* Handle cloud-status retrieve response here to send it to registered client + Expecting src format as mac:xxxxxxxxxxxx/parodus/cloud-status and dest as mac: + Parse src field and check sourceService is "parodus" and sourceApplication is "cloud-status" + */ + serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST); + if ( serviceName != NULL) { - ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName); + //Send Client cloud-status response back to registered client + ParodusInfo("Sending cloud-status response to %s client\n",serviceName); + sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)&message->msg,message->len); + if(sendStatus ==1) + { + ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName); + } + else + { + ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName); + } + free(serviceName); + serviceName = NULL; } else { - ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName); + ParodusError("serviceName is NULL,not sending cloud-status response to client\n"); } - free(serviceName); - serviceName = NULL; } else { - ParodusError("serviceName is NULL,not sending cloud-status response to client\n"); + ParodusInfo("sendUpstreamMsgToServer \n"); + sendUpstreamMsgToServer(&message->msg, message->len); } } else { - ParodusInfo("sendUpstreamMsgToServer \n"); - sendUpstreamMsgToServer(&message->msg, message->len); + ParodusError("MAC is null, not handling retrieve wrp message \n"); } if(sourceService !=NULL) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d067352..699e38a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -219,8 +219,7 @@ target_link_libraries (test_upstream -lcmocka gcov -lcunit -lcimplog # test_downstream #------------------------------------------------------------------------------- add_test(NAME test_downstream COMMAND ${MEMORY_CHECK} ./test_downstream) -add_executable(test_downstream test_downstream.c ../src/downstream.c - ../src/string_helpers.c) +add_executable(test_downstream test_downstream.c ../src/downstream.c ../src/string_helpers.c) target_link_libraries (test_downstream -lcmocka gcov -lcunit -lcimplog -lwrp-c -luuid -lpthread -lmsgpackc -lnopoll -Wl,--no-as-needed -lcjson -lcjwt -ltrower-base64 diff --git a/tests/test_client_list.c b/tests/test_client_list.c index c7fcc48..ed6cad4 100644 --- a/tests/test_client_list.c +++ b/tests/test_client_list.c @@ -73,7 +73,7 @@ void test_client_addtolist() CU_ASSERT_STRING_EQUAL( temp->service_name, message->u.reg.service_name ); CU_ASSERT_STRING_EQUAL( temp->url, message->u.reg.url ); } - + release_global_node (); wrp_free_struct(message); free(bytes); ParodusInfo("test_client_addtolist done..\n"); @@ -206,7 +206,7 @@ void test_addtolist_multiple_clients() CU_ASSERT_STRING_EQUAL( temp->url, message->u.reg.url ); } - + release_global_node (); wrp_free_struct(message); free(bytes); ParodusInfo("test_addtolist_multiple_clients done..\n"); diff --git a/tests/test_connection.c b/tests/test_connection.c index 6cde3fc..0aace96 100644 --- a/tests/test_connection.c +++ b/tests/test_connection.c @@ -36,7 +36,7 @@ extern server_t *get_current_server (server_list_t *server_list); extern int parse_server_url (const char *full_url, server_t *server); extern void init_expire_timer (expire_timer_t *timer); extern int check_timer_expired (expire_timer_t *timer, long timeout_ms); -extern void init_backoff_timer (backoff_timer_t *timer, int max_delay); +extern void init_backoff_timer (backoff_timer_t *timer, int max_count); extern int update_backoff_delay (backoff_timer_t *timer); extern int init_header_info (header_info_t *header_info); extern void free_header_info (header_info_t *header_info); @@ -51,7 +51,7 @@ extern int nopoll_connect (create_connection_ctx_t *ctx, int is_ipv6); extern int wait_connection_ready (create_connection_ctx_t *ctx); extern int connect_and_wait (create_connection_ctx_t *ctx); extern int keep_trying_to_connect (create_connection_ctx_t *ctx, - int max_retry_sleep); + backoff_timer_t *backoff_timer); /*----------------------------------------------------------------------------*/ @@ -301,12 +301,12 @@ void test_expire_timer() void test_backoff_delay_timer() { backoff_timer_t btimer; - init_backoff_timer (&btimer, 30); + init_backoff_timer (&btimer, 5); assert_int_equal (3, update_backoff_delay (&btimer)); assert_int_equal (7, update_backoff_delay (&btimer)); assert_int_equal (15, update_backoff_delay (&btimer)); - assert_int_equal (30, update_backoff_delay (&btimer)); - assert_int_equal (30, update_backoff_delay (&btimer)); + assert_int_equal (31, update_backoff_delay (&btimer)); + assert_int_equal (31, update_backoff_delay (&btimer)); } @@ -791,6 +791,7 @@ void test_keep_trying () create_connection_ctx_t ctx; noPollCtx test_nopoll_ctx; server_t test_server; + backoff_timer_t backoff_timer; ParodusCfg Cfg; char *test_extra_headers = "\r\nAuthorization: Bearer SER_MAC Fer23u948590 123567892366" @@ -822,7 +823,8 @@ void test_keep_trying () expect_function_call (nopoll_conn_is_ok); will_return (nopoll_conn_wait_for_status_until_connection_ready, nopoll_true); expect_function_call (nopoll_conn_wait_for_status_until_connection_ready); - rtn = keep_trying_to_connect (&ctx, 30); + init_backoff_timer (&backoff_timer, 5); + rtn = keep_trying_to_connect (&ctx, &backoff_timer); assert_int_equal (rtn, true); test_server.allow_insecure = 0; @@ -845,7 +847,8 @@ void test_keep_trying () expect_function_call (nopoll_conn_is_ok); will_return (nopoll_conn_wait_for_status_until_connection_ready, nopoll_true); expect_function_call (nopoll_conn_wait_for_status_until_connection_ready); - rtn = keep_trying_to_connect (&ctx, 30); + init_backoff_timer (&backoff_timer, 5); + rtn = keep_trying_to_connect (&ctx, &backoff_timer); assert_int_equal (rtn, true); will_return (nopoll_conn_tls_new, &connection1); @@ -866,7 +869,8 @@ void test_keep_trying () expect_function_call (nopoll_conn_wait_for_status_until_connection_ready); will_return (nopoll_conn_ref_count, 0); expect_function_call (nopoll_conn_ref_count); - rtn = keep_trying_to_connect (&ctx, 30); + init_backoff_timer (&backoff_timer, 5); + rtn = keep_trying_to_connect (&ctx, &backoff_timer); assert_int_equal (rtn, false); mock_wait_status = 0; @@ -874,7 +878,8 @@ void test_keep_trying () expect_function_call (nopoll_conn_tls_new); will_return (checkHostIp, 0); expect_function_call (checkHostIp); - rtn = keep_trying_to_connect (&ctx, 30); + init_backoff_timer (&backoff_timer, 5); + rtn = keep_trying_to_connect (&ctx, &backoff_timer); assert_int_equal (rtn, false); mock_wait_status = 302; @@ -888,7 +893,8 @@ void test_keep_trying () expect_function_call (nopoll_conn_wait_for_status_until_connection_ready); will_return (nopoll_conn_ref_count, 0); expect_function_call (nopoll_conn_ref_count); - rtn = keep_trying_to_connect (&ctx, 30); + init_backoff_timer (&backoff_timer, 5); + rtn = keep_trying_to_connect (&ctx, &backoff_timer); assert_int_equal (rtn, false); } diff --git a/tests/test_downstream.c b/tests/test_downstream.c index c313883..4d38972 100755 --- a/tests/test_downstream.c +++ b/tests/test_downstream.c @@ -33,7 +33,6 @@ ParodusMsg *ParodusMsgQ; pthread_mutex_t g_mutex; pthread_cond_t g_cond; -pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; int crud_test = 0; /*----------------------------------------------------------------------------*/ /* Mocks */ @@ -62,9 +61,8 @@ reg_list_item_t * get_global_node(void) return mock_ptr_type(reg_list_item_t *); } -pthread_mutex_t *get_global_client_mut(void) +void release_global_node (void) { - return &client_mut; } ssize_t wrp_to_struct( const void *bytes, const size_t length, diff --git a/tests/test_downstream_more.c b/tests/test_downstream_more.c index 820a4b0..7fb63a0 100644 --- a/tests/test_downstream_more.c +++ b/tests/test_downstream_more.c @@ -40,8 +40,6 @@ typedef struct { /*----------------------------------------------------------------------------*/ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ -pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; - static test_t tests[] = { { .s.msg_type = WRP_MSG_TYPE__CREATE, @@ -246,9 +244,8 @@ reg_list_item_t *get_global_node(void) return NULL; } -pthread_mutex_t *get_global_client_mut(void) +void release_global_node (void) { - return &client_mut; } void wrp_free_struct( wrp_msg_t *msg ) diff --git a/tests/test_upstream.c b/tests/test_upstream.c index 97a2eae..7c2d747 100644 --- a/tests/test_upstream.c +++ b/tests/test_upstream.c @@ -45,7 +45,6 @@ int numLoops = 1; wrp_msg_t *temp = NULL; extern pthread_mutex_t nano_mut; extern pthread_cond_t nano_con; -pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; static int crud_test = 0; /*----------------------------------------------------------------------------*/ /* Mocks */ @@ -67,17 +66,16 @@ reg_list_item_t * get_global_node(void) return mock_ptr_type(reg_list_item_t *); } +void release_global_node (void) +{ +} + int get_numOfClients() { function_called(); return (int)mock(); } -pthread_mutex_t *get_global_client_mut(void) -{ - return &client_mut; -} - void addCRUDmsgToQueue(wrp_msg_t *crudMsg) { (void)crudMsg; @@ -420,12 +418,12 @@ void test_processUpstreamMessageRegMsg() will_return(wrp_to_struct, 12); expect_function_call(wrp_to_struct); - will_return(get_numOfClients, 1); - expect_function_call(get_numOfClients); - will_return(get_global_node, (intptr_t)head); expect_function_call(get_global_node); + will_return(get_numOfClients, 1); + expect_function_call(get_numOfClients); + will_return(nn_shutdown, 1); expect_function_call(nn_shutdown); @@ -479,6 +477,9 @@ void test_processUpstreamMessageRegMsgNoClients() will_return(wrp_to_struct, 12); expect_function_call(wrp_to_struct); + will_return(get_global_node, (intptr_t)head); + expect_function_call(get_global_node); + will_return(get_numOfClients, 0); expect_function_call(get_numOfClients); @@ -576,12 +577,12 @@ void err_processUpstreamMessageRegMsg() will_return(wrp_to_struct, 12); expect_function_call(wrp_to_struct); - will_return(get_numOfClients, 1); - expect_function_call(get_numOfClients); - will_return(get_global_node, (intptr_t)head); expect_function_call(get_global_node); + will_return(get_numOfClients, 1); + expect_function_call(get_numOfClients); + will_return(nn_shutdown, -1); expect_function_call(nn_shutdown); @@ -768,6 +769,81 @@ void test_processUpstreamMsg_sendToClient() free(UpStreamMsgQ); UpStreamMsgQ = NULL; } +void test_processUpstreamMessageNullCheck() +{ + numLoops = 1; + metaPackSize = 20; + UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg)); + UpStreamMsgQ->msg = strdup("First Message"); + UpStreamMsgQ->len = 13; + UpStreamMsgQ->next= NULL; + temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); + memset(temp,0,sizeof(wrp_msg_t)); + temp->msg_type = WRP_MSG_TYPE__RETREIVE; + temp->u.crud.dest = strdup("mac:14cfe2142xxx/parodus/cloud-status"); + temp->u.crud.source = strdup("mac:14cfe2142xxx/config"); + temp->u.crud.transaction_uuid = strdup("123"); + will_return(wrp_to_struct, 12); + expect_function_call(wrp_to_struct); + expect_function_call(addCRUDmsgToQueue); + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); + expect_function_call(wrp_free_struct); + processUpstreamMessage(); + free(temp); + free(UpStreamMsgQ); + UpStreamMsgQ = NULL; +} +void err_processUpstreamMessageNullCheck() +{ + numLoops = 1; + metaPackSize = 20; + UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg)); + UpStreamMsgQ->msg = strdup("First Message"); + UpStreamMsgQ->len = 13; + UpStreamMsgQ->next= NULL; + temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); + memset(temp,0,sizeof(wrp_msg_t)); + temp->msg_type = WRP_MSG_TYPE__RETREIVE; + temp->u.crud.dest = strdup("mac:/parodus/cloud-status"); + temp->u.crud.source = strdup("mac:14cfe2142xxx/config"); + temp->u.crud.transaction_uuid = strdup("123"); + will_return(wrp_to_struct, 12); + expect_function_call(wrp_to_struct); + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); + expect_function_call(wrp_free_struct); + + processUpstreamMessage(); + free(temp); + free(UpStreamMsgQ); + UpStreamMsgQ = NULL; +} +void err_processUpstreamMessageWithoutMac() +{ + numLoops = 1; + metaPackSize = 20; + UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg)); + UpStreamMsgQ->msg = strdup("First Message"); + UpStreamMsgQ->len = 13; + UpStreamMsgQ->next= NULL; + temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); + memset(temp,0,sizeof(wrp_msg_t)); + temp->msg_type = WRP_MSG_TYPE__RETREIVE; + temp->u.crud.dest = strdup("/parodus/cloud-status"); + temp->u.crud.source = strdup("mac:14cfe2142xxx/config"); + temp->u.crud.transaction_uuid = strdup("123"); + will_return(wrp_to_struct, 12); + expect_function_call(wrp_to_struct); + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); + expect_function_call(wrp_free_struct); + + processUpstreamMessage(); + free(temp); + free(UpStreamMsgQ); + UpStreamMsgQ = NULL; +} /*----------------------------------------------------------------------------*/ /* External Functions */ @@ -801,6 +877,9 @@ int main(void) cmocka_unit_test(test_processUpstreamMsgCrud_nnfree), cmocka_unit_test(test_processUpstreamMsg_cloud_status), cmocka_unit_test(test_processUpstreamMsg_sendToClient), + cmocka_unit_test(test_processUpstreamMessageNullCheck), + cmocka_unit_test(err_processUpstreamMessageNullCheck), + cmocka_unit_test(err_processUpstreamMessageWithoutMac), }; return cmocka_run_group_tests(tests, NULL, NULL); From 97da13ffcc34e2a23c8359d1117ef8220cb8e1ed Mon Sep 17 00:00:00 2001 From: Bill Williams Date: Thu, 13 Dec 2018 14:07:06 -0800 Subject: [PATCH 4/5] put mutex lock into get_global_node --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e965175..fc3e6ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - backoff retry to include find_servers in loop (connection.c) - backoff max is max count not max delay - used mutex protection to make client list and nn_sends thread safe - +- put mutex lock into get_global_node ## [1.0.1] - 2018-07-18 ### Added From a2d7dfeb95e533a1d6e1f357109c80275f2988b4 Mon Sep 17 00:00:00 2001 From: Shilpa Seshadri Date: Mon, 17 Dec 2018 23:21:04 -0800 Subject: [PATCH 5/5] Log time difference in connect time and boot time only during boot time and not during reconnect --- src/connection.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/connection.c b/src/connection.c index 52b47fd..eb095e1 100644 --- a/src/connection.c +++ b/src/connection.c @@ -46,6 +46,7 @@ static char *reconnect_reason = "webpa_process_starts"; static int cloud_disconnect_max_time = 5; static noPollConn *g_conn = NULL; static bool LastReasonStatus = false; +static int init = 1; static noPollConnOpts * createConnOpts (char * extra_headers, bool secure); static char* build_extra_headers( const char *auth, const char *device_id, const char *user_agent, const char *convey ); @@ -579,9 +580,10 @@ int createNopollConnection(noPollCtx *ctx) get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE; ParodusInfo("cloud_status set as %s after successful connection\n", get_parodus_cfg()->cloud_status); - if(get_parodus_cfg()->boot_time != 0) { + if((get_parodus_cfg()->boot_time != 0) && init) { getCurrentTime(connectTimePtr); ParodusInfo("connect_time-diff-boot_time=%d\n", connectTimePtr->tv_sec - get_parodus_cfg()->boot_time); + init = 0; //set init to 0 so that this is logged only during process start up and not during reconnect } free_extra_headers (&conn_ctx);