Merge pull request #278 from sadhyama/master

Refactored Upstream Retrieve flow
This commit is contained in:
shilpa24balaji
2019-01-25 12:33:32 -08:00
committed by GitHub
4 changed files with 219 additions and 76 deletions

View File

@@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Fixed crash in CRUD request processing
- Fixed issue on RETRIEVE respone processing
- Enabled valgrind
- Refactored Upsteam RETRIEVE flow
## [1.0.1] - 2018-07-18
### Added

View File

@@ -34,7 +34,7 @@
/* Macros */
/*----------------------------------------------------------------------------*/
#define METADATA_COUNT 12
#define CLOUD_STATUS_FORMAT "parodus/cloud-status"
#define PARODUS_SERVICE_NAME "parodus"
/*----------------------------------------------------------------------------*/
/* File Scoped Variables */
/*----------------------------------------------------------------------------*/
@@ -210,10 +210,10 @@ void *processUpstreamMessage()
reg_list_item_t *temp = NULL;
int matchFlag = 0;
int status = -1;
char *serviceName = NULL;
char *destService, *destApplication =NULL;
char *sourceService, *sourceApplication =NULL;
int sendStatus =-1;
char *device_id = NULL;
size_t device_id_len = 0;
size_t parodus_len;
int ret = -1;
while(FOREVER())
{
@@ -360,79 +360,46 @@ void *processUpstreamMessage()
else
{
ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s status: %d\n",msgType, msg->u.crud.dest, msg->u.crud.transaction_uuid, msg->u.crud.status );
if(WRP_MSG_TYPE__RETREIVE == msgType && msg->u.crud.dest !=NULL && msg->u.crud.source != NULL)
if(WRP_MSG_TYPE__RETREIVE == msgType)
{
destService = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
destApplication = wrp_get_msg_element(WRP_ID_ELEMENT__APPLICATION, msg, DEST);
sourceService = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, SOURCE);
sourceApplication = wrp_get_msg_element(WRP_ID_ELEMENT__APPLICATION, msg, SOURCE);
/* Handle cloud-status retrieve request here
Expecting dest format as mac:xxxxxxxxxxxx/parodus/cloud-status
Parse dest field and check destService is "parodus" and destApplication is "cloud-status"
*/
if(destService != NULL && destApplication != NULL && strcmp(destService,"parodus")== 0 && strcmp(destApplication,"cloud-status")== 0)
{
retrieve_msg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) );
memset(retrieve_msg, 0, sizeof(wrp_msg_t));
retrieve_msg->msg_type = msg->msg_type;
retrieve_msg->u.crud.transaction_uuid = strdup(msg->u.crud.transaction_uuid);
retrieve_msg->u.crud.source = strdup(msg->u.crud.source);
retrieve_msg->u.crud.dest = strdup(msg->u.crud.dest);
addCRUDmsgToQueue(retrieve_msg);
}
else if(sourceService != NULL && sourceApplication != NULL && strcmp(sourceService,"parodus")== 0 && strcmp(sourceApplication,"cloud-status")== 0 && strncmp(msg->u.crud.dest,"mac:", 4)==0)
{
/* Handle cloud-status retrieve response here to send it to registered client
Expecting src format as mac:xxxxxxxxxxxx/parodus/cloud-status and dest as mac:
Parse src field and check sourceService is "parodus" and sourceApplication is "cloud-status"
*/
serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
if ( serviceName != NULL)
ret = getDeviceId(&device_id, &device_id_len);
if(ret == 0)
{
//Send Client cloud-status response back to registered client
ParodusInfo("Sending cloud-status response to %s client\n",serviceName);
sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)&message->msg,message->len);
if(sendStatus ==1)
ParodusPrint("device_id %s device_id_len %lu\n", device_id, device_id_len);
/* Match dest based on device_id. Check dest start with: "mac:112233445xxx" ? */
if( 0 == strncasecmp(device_id, msg->u.crud.dest, device_id_len-1) )
{
ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName);
/* For this device. */
parodus_len = strlen( PARODUS_SERVICE_NAME );
if( 0 == strncmp(PARODUS_SERVICE_NAME, &msg->u.crud.dest[device_id_len], parodus_len-1) )
{
/* For Parodus CRUD queue. */
ParodusInfo("Create RetrieveMsg and add to parodus CRUD queue\n");
createUpstreamRetrieveMsg(msg, &retrieve_msg);
addCRUDmsgToQueue(retrieve_msg);
}
else
{
/* For nanomsg clients. */
getServiceNameAndSendResponse(msg, &message->msg, message->len);
}
}
else
{
ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName);
/* Not for this device. Send upstream */
ParodusInfo("sendUpstreamMsgToServer \n");
sendUpstreamMsgToServer(&message->msg, message->len);
}
if(device_id != NULL)
{
free(device_id);
device_id = NULL;
}
free(serviceName);
serviceName = NULL;
}
else
{
ParodusError("serviceName is NULL,not sending cloud-status response to client\n");
ParodusError("Failed to get device_id\n");
}
}
else
{
ParodusInfo("sendUpstreamMsgToServer \n");
sendUpstreamMsgToServer(&message->msg, message->len);
}
if(sourceService !=NULL)
{
free(sourceService);
sourceService = NULL;
}
if(sourceApplication !=NULL)
{
free(sourceApplication);
sourceApplication = NULL;
}
if(destService !=NULL)
{
free(destService);
destService = NULL;
}
if(destApplication !=NULL)
{
free(destApplication);
destApplication = NULL;
}
}
else
{
@@ -481,6 +448,107 @@ void *processUpstreamMessage()
return NULL;
}
/**
* @brief getDeviceId function to create deviceId in the format "mac:112233445xxx"
*
* @param[out] device_id in the format "mac:112233445xxx"
* @param[out] total size of device_id
*/
int getDeviceId(char **device_id, size_t *device_id_len)
{
char *deviceID = NULL;
size_t len;
if((get_parodus_cfg()->hw_mac !=NULL) && (strlen(get_parodus_cfg()->hw_mac)!=0))
{
len = strlen(get_parodus_cfg()->hw_mac) + 5;
deviceID = (char *) malloc(sizeof(char)*64);
if(deviceID != NULL)
{
snprintf(deviceID, len, "mac:%s", get_parodus_cfg()->hw_mac);
*device_id = deviceID;
*device_id_len = len;
}
else
{
ParodusError("device_id allocation failed\n");
return -1;
}
}
else
{
ParodusError("device mac is NULL\n");
return -1;
}
return 0;
}
/**
* @brief createUpstreamRetrieveMsg function to create new message for processing Retrieve requests
*
* @param[in] message The upstream message received from cloud or internal clients
* @param[out] retrieve_msg New message for processing Retrieve requests
*/
void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg)
{
wrp_msg_t *msg;
msg = ( wrp_msg_t * ) malloc( sizeof( wrp_msg_t ) );
if(msg != NULL)
{
memset(msg, 0, sizeof(wrp_msg_t));
msg->msg_type = message->msg_type;
if(message->u.crud.transaction_uuid != NULL)
{
msg->u.crud.transaction_uuid = strdup(message->u.crud.transaction_uuid);
}
if(message->u.crud.source !=NULL)
{
msg->u.crud.source = strdup(message->u.crud.source);
}
if(message->u.crud.dest != NULL)
{
msg->u.crud.dest = strdup(message->u.crud.dest);
}
*retrieve_msg = msg;
}
}
/**
* @brief getServiceNameAndSendResponse function to fetch client service name and to send msg to it.
*
* @param[in] msg The decoded message to fetch client service name from its dest field
* @param[in] msg_bytes The encoded upstream msg to be sent to client
* @param[in] msg_size Total size of the msg to send to client
*/
void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size)
{
char *serviceName = NULL;
int sendStatus =-1;
serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
if ( serviceName != NULL)
{
sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)msg_bytes, msg_size);
if(sendStatus ==1)
{
ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName);
}
else
{
ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName);
}
free(serviceName);
serviceName = NULL;
}
else
{
ParodusError("serviceName is NULL,not sending retrieve response to client\n");
}
}
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
{
void *appendData;

View File

@@ -47,8 +47,10 @@ typedef struct UpStreamMsg__
void packMetaData();
void *handle_upstream();
void *processUpstreamMessage();
int getDeviceId(char **device_id, size_t *device_id_len);
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size);
void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size);
void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg);
void set_global_UpStreamMsgQ(UpStreamMsg * UpStreamQ);
UpStreamMsg * get_global_UpStreamMsgQ(void);
pthread_cond_t *get_global_nano_con(void);

View File

@@ -43,6 +43,7 @@ static ParodusCfg parodusCfg;
extern size_t metaPackSize;
extern UpStreamMsg *UpStreamMsgQ;
int numLoops = 1;
int deviceIDNull =0;
wrp_msg_t *temp = NULL;
extern pthread_mutex_t nano_mut;
extern pthread_cond_t nano_con;
@@ -97,8 +98,21 @@ void sendMessage(noPollConn *conn, void *msg, size_t len)
function_called();
}
void set_parodus_cfg(ParodusCfg *cfg)
{
memcpy(&parodusCfg, cfg, sizeof(ParodusCfg));
}
ParodusCfg *get_parodus_cfg(void)
{
ParodusCfg cfg;
memset(&cfg,0,sizeof(cfg));
parStrncpy(cfg.hw_mac , "14cfe2142xxx", sizeof(cfg.hw_mac));
if(deviceIDNull)
{
parStrncpy(cfg.hw_mac , "", sizeof(cfg.hw_mac));
}
set_parodus_cfg(&cfg);
return &parodusCfg;
}
@@ -740,16 +754,16 @@ void test_processUpstreamMsg_cloud_status()
void test_processUpstreamMsg_sendToClient()
{
numLoops = 2;
metaPackSize = 20;
numLoops = 2;
metaPackSize = 20;
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
UpStreamMsgQ->msg = strdup("First Message");
UpStreamMsgQ->len = 13;
UpStreamMsgQ->next= NULL;
UpStreamMsgQ->next = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
UpStreamMsgQ->next->msg = strdup("Second Message");
UpStreamMsgQ->next->len = 15;
UpStreamMsgQ->next->next = NULL;
UpStreamMsgQ->next->msg = strdup("Second Message");
UpStreamMsgQ->next->len = 15;
UpStreamMsgQ->next->next = NULL;
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
memset(temp,0,sizeof(wrp_msg_t));
@@ -772,9 +786,65 @@ void test_processUpstreamMsg_sendToClient()
expect_function_call(wrp_free_struct);
processUpstreamMessage();
free(temp);
free(UpStreamMsgQ);
UpStreamMsgQ = NULL;
free(temp);
free(UpStreamMsgQ);
UpStreamMsgQ = NULL;
}
void test_processUpstreamMsg_serviceNameNULL()
{
numLoops = 1;
metaPackSize = 20;
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
UpStreamMsgQ->msg = strdup("First Message");
UpStreamMsgQ->len = 13;
UpStreamMsgQ->next= NULL;
UpStreamMsgQ->next = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
UpStreamMsgQ->next->msg = strdup("Second Message");
UpStreamMsgQ->next->len = 15;
UpStreamMsgQ->next->next = NULL;
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
memset(temp,0,sizeof(wrp_msg_t));
temp->msg_type = 6;
temp->u.crud.dest = strdup("mac:14cfe2142xxx/");
temp->u.crud.source = strdup("mac:14cfe2142xxx/parodus/cloud-status");
temp->u.crud.transaction_uuid = strdup("123");
will_return(wrp_to_struct, 12);
expect_function_call(wrp_to_struct);
expect_function_call(wrp_free_struct);
processUpstreamMessage();
free(temp);
free(UpStreamMsgQ);
UpStreamMsgQ = NULL;
}
void err_processUpstreamMsg_deviceID()
{
numLoops = 1;
metaPackSize = 20;
deviceIDNull = 1;
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
UpStreamMsgQ->msg = "First Message";
UpStreamMsgQ->len = 13;
UpStreamMsgQ->next= NULL;
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
memset(temp,0,sizeof(wrp_msg_t));
temp->msg_type = 6;
temp->u.crud.dest = "mac:14cfe2142xxx/parodus/cloud-status";
temp->u.crud.source = "mac:14cfe2142xxx/config";
temp->u.crud.transaction_uuid = "123";
will_return(wrp_to_struct, 12);
expect_function_call(wrp_to_struct);
will_return(nn_freemsg, 0);
expect_function_call(nn_freemsg);
expect_function_call(wrp_free_struct);
processUpstreamMessage();
free(temp);
free(UpStreamMsgQ);
UpStreamMsgQ = NULL;
}
/*----------------------------------------------------------------------------*/
/* External Functions */
@@ -808,6 +878,8 @@ int main(void)
cmocka_unit_test(test_processUpstreamMsgCrud_nnfree),
cmocka_unit_test(test_processUpstreamMsg_cloud_status),
cmocka_unit_test(test_processUpstreamMsg_sendToClient),
cmocka_unit_test(test_processUpstreamMsg_serviceNameNULL),
cmocka_unit_test(err_processUpstreamMsg_deviceID)
};
return cmocka_run_group_tests(tests, NULL, NULL);