Merge branch 'master' into untask_svc_alive

This commit is contained in:
shilpa24balaji
2018-12-18 14:08:50 -08:00
committed by GitHub
11 changed files with 49 additions and 13 deletions

View File

@@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- added NULL check for device mac id in upstream retrieve message handling
- backoff retry to include find_servers in loop (connection.c)
- backoff max is max count not max delay
- used mutex protection to make client list and nn_sends thread safe
- put mutex lock into get_global_node
- change svc alive from a thread to a function called every 30 sec from main
## [1.0.1] - 2018-07-18

View File

@@ -30,6 +30,7 @@
/*----------------------------------------------------------------------------*/
static int numOfClients = 0;
static reg_list_item_t * g_head = NULL;
pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER;
/*----------------------------------------------------------------------------*/
/* External functions */
@@ -37,9 +38,16 @@ static reg_list_item_t * g_head = NULL;
reg_list_item_t * get_global_node(void)
{
pthread_mutex_lock (&client_mut);
return g_head;
}
void release_global_node (void)
{
pthread_mutex_unlock (&client_mut);
}
int get_numOfClients()
{
return numOfClients;
@@ -52,6 +60,7 @@ int addToList( wrp_msg_t **msg)
int rc = -1;
int sock;
int retStatus = -1;
sock = nn_socket( AF_SP, NN_PUSH );
ParodusPrint("sock created for adding entries to list: %d\n", sock);
if(sock >= 0)
@@ -214,7 +223,7 @@ int deleteFromList(char* service_name)
curr_node = NULL;
ParodusInfo("Deleted successfully and returning..\n");
numOfClients =numOfClients - 1;
ParodusPrint("numOfClients after delte is %d\n", numOfClients);
ParodusPrint("numOfClients after delete is %d\n", numOfClients);
return 0;
}
@@ -243,6 +252,7 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize)
if( strcmp(dest, temp->service_name) == 0)
{
bytes = nn_send(temp->sock, *Msg, msgSize, 0);
release_global_node ();
ParodusInfo("sent downstream message to reg_client '%s'\n", temp->url);
ParodusPrint("downstream bytes sent:%d\n", bytes);
return 1;
@@ -250,5 +260,6 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize)
ParodusPrint("checking the next item in the list\n");
temp= temp->next;
}
release_global_node ();
return 0;
}

View File

@@ -53,6 +53,7 @@ int get_numOfClients();
int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize);
reg_list_item_t * get_global_node(void);
void release_global_node (void);
#ifdef __cplusplus
}

View File

@@ -46,6 +46,7 @@ static char *reconnect_reason = "webpa_process_starts";
static int cloud_disconnect_max_time = 5;
static noPollConn *g_conn = NULL;
static bool LastReasonStatus = false;
static int init = 1;
static noPollConnOpts * createConnOpts (char * extra_headers, bool secure);
static char* build_extra_headers( const char *auth, const char *device_id,
const char *user_agent, const char *convey );
@@ -579,9 +580,10 @@ int createNopollConnection(noPollCtx *ctx)
get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE;
ParodusInfo("cloud_status set as %s after successful connection\n", get_parodus_cfg()->cloud_status);
if(get_parodus_cfg()->boot_time != 0) {
if((get_parodus_cfg()->boot_time != 0) && init) {
getCurrentTime(connectTimePtr);
ParodusInfo("connect_time-diff-boot_time=%d\n", connectTimePtr->tv_sec - get_parodus_cfg()->boot_time);
init = 0; //set init to 0 so that this is logged only during process start up and not during reconnect
}
free_extra_headers (&conn_ctx);

View File

@@ -120,7 +120,8 @@ void listenerOnMessage(void * msg, size_t msgSize)
((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid)));
free(destVal);
temp = get_global_node();
temp = get_global_node();
//Checking for individual clients & Sending to each client
while (NULL != temp)
@@ -139,6 +140,7 @@ void listenerOnMessage(void * msg, size_t msgSize)
ParodusPrint("checking the next item in the list\n");
temp= temp->next;
}
release_global_node ();
/* check Downstream dest for CRUD requests */
if(destFlag ==0 && strcmp("parodus", dest)==0)

View File

@@ -57,10 +57,10 @@ int serviceAliveTask()
else
{
ParodusPrint("serviceAliveTask: numOfClients registered is %d\n", get_numOfClients());
temp = get_global_node();
if(get_numOfClients() > 0)
{
//sending svc msg to all the clients every 30s
temp = get_global_node();
size = (size_t) nbytes;
while(NULL != temp)
{
@@ -90,10 +90,12 @@ int serviceAliveTask()
temp= temp->next;
}
}
release_global_node ();
ParodusPrint("Waiting for 30s to send keep alive msg \n");
}
else
{
release_global_node ();
ParodusInfo("No clients are registered, waiting ..\n");
}
}

View File

@@ -226,11 +226,11 @@ void *processUpstreamMessage()
{
ParodusInfo("\n Nanomsg client Registration for Upstream\n");
//Extract serviceName and url & store it in a linked list for reg_clients
temp = get_global_node();
if(get_numOfClients() !=0)
{
matchFlag = 0;
ParodusPrint("matchFlag reset to %d\n", matchFlag);
temp = get_global_node();
while(temp!=NULL)
{
if(strcmp(temp->service_name, msg->u.reg.service_name)==0)
@@ -289,6 +289,7 @@ void *processUpstreamMessage()
ParodusPrint("sent auth status to reg client\n");
}
}
release_global_node ();
}
else if(msgType == WRP_MSG_TYPE__EVENT)
{

View File

@@ -73,7 +73,7 @@ void test_client_addtolist()
CU_ASSERT_STRING_EQUAL( temp->service_name, message->u.reg.service_name );
CU_ASSERT_STRING_EQUAL( temp->url, message->u.reg.url );
}
release_global_node ();
wrp_free_struct(message);
free(bytes);
ParodusInfo("test_client_addtolist done..\n");
@@ -206,7 +206,7 @@ void test_addtolist_multiple_clients()
CU_ASSERT_STRING_EQUAL( temp->url, message->u.reg.url );
}
release_global_node ();
wrp_free_struct(message);
free(bytes);
ParodusInfo("test_addtolist_multiple_clients done..\n");

View File

@@ -61,6 +61,10 @@ reg_list_item_t * get_global_node(void)
return mock_ptr_type(reg_list_item_t *);
}
void release_global_node (void)
{
}
ssize_t wrp_to_struct( const void *bytes, const size_t length,
const enum wrp_format fmt, wrp_msg_t **msg )
{

View File

@@ -244,6 +244,10 @@ reg_list_item_t *get_global_node(void)
return NULL;
}
void release_global_node (void)
{
}
void wrp_free_struct( wrp_msg_t *msg )
{
if( WRP_MSG_TYPE__EVENT == tests[i].s.msg_type ) {

View File

@@ -66,6 +66,10 @@ reg_list_item_t * get_global_node(void)
return mock_ptr_type(reg_list_item_t *);
}
void release_global_node (void)
{
}
int get_numOfClients()
{
function_called();
@@ -414,12 +418,12 @@ void test_processUpstreamMessageRegMsg()
will_return(wrp_to_struct, 12);
expect_function_call(wrp_to_struct);
will_return(get_numOfClients, 1);
expect_function_call(get_numOfClients);
will_return(get_global_node, (intptr_t)head);
expect_function_call(get_global_node);
will_return(get_numOfClients, 1);
expect_function_call(get_numOfClients);
will_return(nn_shutdown, 1);
expect_function_call(nn_shutdown);
@@ -473,6 +477,9 @@ void test_processUpstreamMessageRegMsgNoClients()
will_return(wrp_to_struct, 12);
expect_function_call(wrp_to_struct);
will_return(get_global_node, (intptr_t)head);
expect_function_call(get_global_node);
will_return(get_numOfClients, 0);
expect_function_call(get_numOfClients);
@@ -570,12 +577,12 @@ void err_processUpstreamMessageRegMsg()
will_return(wrp_to_struct, 12);
expect_function_call(wrp_to_struct);
will_return(get_numOfClients, 1);
expect_function_call(get_numOfClients);
will_return(get_global_node, (intptr_t)head);
expect_function_call(get_global_node);
will_return(get_numOfClients, 1);
expect_function_call(get_numOfClients);
will_return(nn_shutdown, -1);
expect_function_call(nn_shutdown);