diff --git a/CHANGELOG.md b/CHANGELOG.md index 33c3879..4b5cb9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - added NULL check for device mac id in upstream retrieve message handling - backoff retry to include find_servers in loop (connection.c) - backoff max is max count not max delay +- used mutex protection to make client list and nn_sends thread safe +- put mutex lock into get_global_node - change svc alive from a thread to a function called every 30 sec from main ## [1.0.1] - 2018-07-18 diff --git a/src/client_list.c b/src/client_list.c index 7012c52..f519a75 100644 --- a/src/client_list.c +++ b/src/client_list.c @@ -30,6 +30,7 @@ /*----------------------------------------------------------------------------*/ static int numOfClients = 0; static reg_list_item_t * g_head = NULL; +pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER; /*----------------------------------------------------------------------------*/ /* External functions */ @@ -37,9 +38,16 @@ static reg_list_item_t * g_head = NULL; reg_list_item_t * get_global_node(void) { + pthread_mutex_lock (&client_mut); return g_head; } +void release_global_node (void) +{ + pthread_mutex_unlock (&client_mut); +} + + int get_numOfClients() { return numOfClients; @@ -52,6 +60,7 @@ int addToList( wrp_msg_t **msg) int rc = -1; int sock; int retStatus = -1; + sock = nn_socket( AF_SP, NN_PUSH ); ParodusPrint("sock created for adding entries to list: %d\n", sock); if(sock >= 0) @@ -214,7 +223,7 @@ int deleteFromList(char* service_name) curr_node = NULL; ParodusInfo("Deleted successfully and returning..\n"); numOfClients =numOfClients - 1; - ParodusPrint("numOfClients after delte is %d\n", numOfClients); + ParodusPrint("numOfClients after delete is %d\n", numOfClients); return 0; } @@ -243,6 +252,7 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize) if( strcmp(dest, temp->service_name) == 0) { bytes = nn_send(temp->sock, *Msg, msgSize, 0); + release_global_node (); ParodusInfo("sent downstream message to reg_client '%s'\n", temp->url); ParodusPrint("downstream bytes sent:%d\n", bytes); return 1; @@ -250,5 +260,6 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize) ParodusPrint("checking the next item in the list\n"); temp= temp->next; } + release_global_node (); return 0; } diff --git a/src/client_list.h b/src/client_list.h index f3f2a33..5d58f57 100644 --- a/src/client_list.h +++ b/src/client_list.h @@ -53,6 +53,7 @@ int get_numOfClients(); int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize); reg_list_item_t * get_global_node(void); +void release_global_node (void); #ifdef __cplusplus } diff --git a/src/connection.c b/src/connection.c index 52b47fd..eb095e1 100644 --- a/src/connection.c +++ b/src/connection.c @@ -46,6 +46,7 @@ static char *reconnect_reason = "webpa_process_starts"; static int cloud_disconnect_max_time = 5; static noPollConn *g_conn = NULL; static bool LastReasonStatus = false; +static int init = 1; static noPollConnOpts * createConnOpts (char * extra_headers, bool secure); static char* build_extra_headers( const char *auth, const char *device_id, const char *user_agent, const char *convey ); @@ -579,9 +580,10 @@ int createNopollConnection(noPollCtx *ctx) get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE; ParodusInfo("cloud_status set as %s after successful connection\n", get_parodus_cfg()->cloud_status); - if(get_parodus_cfg()->boot_time != 0) { + if((get_parodus_cfg()->boot_time != 0) && init) { getCurrentTime(connectTimePtr); ParodusInfo("connect_time-diff-boot_time=%d\n", connectTimePtr->tv_sec - get_parodus_cfg()->boot_time); + init = 0; //set init to 0 so that this is logged only during process start up and not during reconnect } free_extra_headers (&conn_ctx); diff --git a/src/downstream.c b/src/downstream.c index aabb327..e3e4708 100644 --- a/src/downstream.c +++ b/src/downstream.c @@ -120,7 +120,8 @@ void listenerOnMessage(void * msg, size_t msgSize) ((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid))); free(destVal); - temp = get_global_node(); + + temp = get_global_node(); //Checking for individual clients & Sending to each client while (NULL != temp) @@ -139,6 +140,7 @@ void listenerOnMessage(void * msg, size_t msgSize) ParodusPrint("checking the next item in the list\n"); temp= temp->next; } + release_global_node (); /* check Downstream dest for CRUD requests */ if(destFlag ==0 && strcmp("parodus", dest)==0) diff --git a/src/service_alive.c b/src/service_alive.c index bacf55b..74cd02c 100644 --- a/src/service_alive.c +++ b/src/service_alive.c @@ -57,10 +57,10 @@ int serviceAliveTask() else { ParodusPrint("serviceAliveTask: numOfClients registered is %d\n", get_numOfClients()); + temp = get_global_node(); if(get_numOfClients() > 0) { //sending svc msg to all the clients every 30s - temp = get_global_node(); size = (size_t) nbytes; while(NULL != temp) { @@ -90,10 +90,12 @@ int serviceAliveTask() temp= temp->next; } } + release_global_node (); ParodusPrint("Waiting for 30s to send keep alive msg \n"); } else { + release_global_node (); ParodusInfo("No clients are registered, waiting ..\n"); } } diff --git a/src/upstream.c b/src/upstream.c index f494cdf..449ce46 100644 --- a/src/upstream.c +++ b/src/upstream.c @@ -226,11 +226,11 @@ void *processUpstreamMessage() { ParodusInfo("\n Nanomsg client Registration for Upstream\n"); //Extract serviceName and url & store it in a linked list for reg_clients + temp = get_global_node(); if(get_numOfClients() !=0) { matchFlag = 0; ParodusPrint("matchFlag reset to %d\n", matchFlag); - temp = get_global_node(); while(temp!=NULL) { if(strcmp(temp->service_name, msg->u.reg.service_name)==0) @@ -289,6 +289,7 @@ void *processUpstreamMessage() ParodusPrint("sent auth status to reg client\n"); } } + release_global_node (); } else if(msgType == WRP_MSG_TYPE__EVENT) { diff --git a/tests/test_client_list.c b/tests/test_client_list.c index c7fcc48..ed6cad4 100644 --- a/tests/test_client_list.c +++ b/tests/test_client_list.c @@ -73,7 +73,7 @@ void test_client_addtolist() CU_ASSERT_STRING_EQUAL( temp->service_name, message->u.reg.service_name ); CU_ASSERT_STRING_EQUAL( temp->url, message->u.reg.url ); } - + release_global_node (); wrp_free_struct(message); free(bytes); ParodusInfo("test_client_addtolist done..\n"); @@ -206,7 +206,7 @@ void test_addtolist_multiple_clients() CU_ASSERT_STRING_EQUAL( temp->url, message->u.reg.url ); } - + release_global_node (); wrp_free_struct(message); free(bytes); ParodusInfo("test_addtolist_multiple_clients done..\n"); diff --git a/tests/test_downstream.c b/tests/test_downstream.c index fa239b6..4d38972 100755 --- a/tests/test_downstream.c +++ b/tests/test_downstream.c @@ -61,6 +61,10 @@ reg_list_item_t * get_global_node(void) return mock_ptr_type(reg_list_item_t *); } +void release_global_node (void) +{ +} + ssize_t wrp_to_struct( const void *bytes, const size_t length, const enum wrp_format fmt, wrp_msg_t **msg ) { diff --git a/tests/test_downstream_more.c b/tests/test_downstream_more.c index cfdef5e..7fb63a0 100644 --- a/tests/test_downstream_more.c +++ b/tests/test_downstream_more.c @@ -244,6 +244,10 @@ reg_list_item_t *get_global_node(void) return NULL; } +void release_global_node (void) +{ +} + void wrp_free_struct( wrp_msg_t *msg ) { if( WRP_MSG_TYPE__EVENT == tests[i].s.msg_type ) { diff --git a/tests/test_upstream.c b/tests/test_upstream.c index 824aa9a..7c2d747 100644 --- a/tests/test_upstream.c +++ b/tests/test_upstream.c @@ -66,6 +66,10 @@ reg_list_item_t * get_global_node(void) return mock_ptr_type(reg_list_item_t *); } +void release_global_node (void) +{ +} + int get_numOfClients() { function_called(); @@ -414,12 +418,12 @@ void test_processUpstreamMessageRegMsg() will_return(wrp_to_struct, 12); expect_function_call(wrp_to_struct); - will_return(get_numOfClients, 1); - expect_function_call(get_numOfClients); - will_return(get_global_node, (intptr_t)head); expect_function_call(get_global_node); + will_return(get_numOfClients, 1); + expect_function_call(get_numOfClients); + will_return(nn_shutdown, 1); expect_function_call(nn_shutdown); @@ -473,6 +477,9 @@ void test_processUpstreamMessageRegMsgNoClients() will_return(wrp_to_struct, 12); expect_function_call(wrp_to_struct); + will_return(get_global_node, (intptr_t)head); + expect_function_call(get_global_node); + will_return(get_numOfClients, 0); expect_function_call(get_numOfClients); @@ -570,12 +577,12 @@ void err_processUpstreamMessageRegMsg() will_return(wrp_to_struct, 12); expect_function_call(wrp_to_struct); - will_return(get_numOfClients, 1); - expect_function_call(get_numOfClients); - will_return(get_global_node, (intptr_t)head); expect_function_call(get_global_node); + will_return(get_numOfClients, 1); + expect_function_call(get_numOfClients); + will_return(nn_shutdown, -1); expect_function_call(nn_shutdown);