diff --git a/CHANGELOG.md b/CHANGELOG.md index aaf0d0f..1c0cb84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Fixed crash in CRUD request processing - Fixed issue on RETRIEVE respone processing - Enabled valgrind +- Refactored Upsteam RETRIEVE flow ## [1.0.1] - 2018-07-18 ### Added diff --git a/src/upstream.c b/src/upstream.c index 35ccd37..dea551d 100644 --- a/src/upstream.c +++ b/src/upstream.c @@ -34,7 +34,7 @@ /* Macros */ /*----------------------------------------------------------------------------*/ #define METADATA_COUNT 12 -#define CLOUD_STATUS_FORMAT "parodus/cloud-status" +#define PARODUS_SERVICE_NAME "parodus" /*----------------------------------------------------------------------------*/ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ @@ -210,10 +210,10 @@ void *processUpstreamMessage() reg_list_item_t *temp = NULL; int matchFlag = 0; int status = -1; - char *serviceName = NULL; - char *destService, *destApplication =NULL; - char *sourceService, *sourceApplication =NULL; - int sendStatus =-1; + char *device_id = NULL; + size_t device_id_len = 0; + size_t parodus_len; + int ret = -1; while(FOREVER()) { @@ -360,79 +360,46 @@ void *processUpstreamMessage() else { ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s status: %d\n",msgType, msg->u.crud.dest, msg->u.crud.transaction_uuid, msg->u.crud.status ); - if(WRP_MSG_TYPE__RETREIVE == msgType && msg->u.crud.dest !=NULL && msg->u.crud.source != NULL) + if(WRP_MSG_TYPE__RETREIVE == msgType) { - destService = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST); - destApplication = wrp_get_msg_element(WRP_ID_ELEMENT__APPLICATION, msg, DEST); - sourceService = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, SOURCE); - sourceApplication = wrp_get_msg_element(WRP_ID_ELEMENT__APPLICATION, msg, SOURCE); - /* Handle cloud-status retrieve request here - Expecting dest format as mac:xxxxxxxxxxxx/parodus/cloud-status - Parse dest field and check destService is "parodus" and destApplication is "cloud-status" - */ - if(destService != NULL && destApplication != NULL && strcmp(destService,"parodus")== 0 && strcmp(destApplication,"cloud-status")== 0) - { - retrieve_msg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) ); - memset(retrieve_msg, 0, sizeof(wrp_msg_t)); - retrieve_msg->msg_type = msg->msg_type; - retrieve_msg->u.crud.transaction_uuid = strdup(msg->u.crud.transaction_uuid); - retrieve_msg->u.crud.source = strdup(msg->u.crud.source); - retrieve_msg->u.crud.dest = strdup(msg->u.crud.dest); - addCRUDmsgToQueue(retrieve_msg); - } - else if(sourceService != NULL && sourceApplication != NULL && strcmp(sourceService,"parodus")== 0 && strcmp(sourceApplication,"cloud-status")== 0 && strncmp(msg->u.crud.dest,"mac:", 4)==0) - { - /* Handle cloud-status retrieve response here to send it to registered client - Expecting src format as mac:xxxxxxxxxxxx/parodus/cloud-status and dest as mac: - Parse src field and check sourceService is "parodus" and sourceApplication is "cloud-status" - */ - serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST); - if ( serviceName != NULL) + ret = getDeviceId(&device_id, &device_id_len); + if(ret == 0) { - //Send Client cloud-status response back to registered client - ParodusInfo("Sending cloud-status response to %s client\n",serviceName); - sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)&message->msg,message->len); - if(sendStatus ==1) + ParodusPrint("device_id %s device_id_len %lu\n", device_id, device_id_len); + /* Match dest based on device_id. Check dest start with: "mac:112233445xxx" ? */ + if( 0 == strncasecmp(device_id, msg->u.crud.dest, device_id_len-1) ) { - ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName); + /* For this device. */ + parodus_len = strlen( PARODUS_SERVICE_NAME ); + if( 0 == strncmp(PARODUS_SERVICE_NAME, &msg->u.crud.dest[device_id_len], parodus_len-1) ) + { + /* For Parodus CRUD queue. */ + ParodusInfo("Create RetrieveMsg and add to parodus CRUD queue\n"); + createUpstreamRetrieveMsg(msg, &retrieve_msg); + addCRUDmsgToQueue(retrieve_msg); + } + else + { + /* For nanomsg clients. */ + getServiceNameAndSendResponse(msg, &message->msg, message->len); + } } else { - ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName); + /* Not for this device. Send upstream */ + ParodusInfo("sendUpstreamMsgToServer \n"); + sendUpstreamMsgToServer(&message->msg, message->len); + } + if(device_id != NULL) + { + free(device_id); + device_id = NULL; } - free(serviceName); - serviceName = NULL; } else { - ParodusError("serviceName is NULL,not sending cloud-status response to client\n"); + ParodusError("Failed to get device_id\n"); } - } - else - { - ParodusInfo("sendUpstreamMsgToServer \n"); - sendUpstreamMsgToServer(&message->msg, message->len); - } - if(sourceService !=NULL) - { - free(sourceService); - sourceService = NULL; - } - if(sourceApplication !=NULL) - { - free(sourceApplication); - sourceApplication = NULL; - } - if(destService !=NULL) - { - free(destService); - destService = NULL; - } - if(destApplication !=NULL) - { - free(destApplication); - destApplication = NULL; - } } else { @@ -481,6 +448,107 @@ void *processUpstreamMessage() return NULL; } +/** + * @brief getDeviceId function to create deviceId in the format "mac:112233445xxx" + * + * @param[out] device_id in the format "mac:112233445xxx" + * @param[out] total size of device_id + */ +int getDeviceId(char **device_id, size_t *device_id_len) +{ + char *deviceID = NULL; + size_t len; + + if((get_parodus_cfg()->hw_mac !=NULL) && (strlen(get_parodus_cfg()->hw_mac)!=0)) + { + len = strlen(get_parodus_cfg()->hw_mac) + 5; + + deviceID = (char *) malloc(sizeof(char)*64); + if(deviceID != NULL) + { + snprintf(deviceID, len, "mac:%s", get_parodus_cfg()->hw_mac); + *device_id = deviceID; + *device_id_len = len; + } + else + { + ParodusError("device_id allocation failed\n"); + return -1; + } + } + else + { + ParodusError("device mac is NULL\n"); + return -1; + } + return 0; +} + + +/** + * @brief createUpstreamRetrieveMsg function to create new message for processing Retrieve requests + * + * @param[in] message The upstream message received from cloud or internal clients + * @param[out] retrieve_msg New message for processing Retrieve requests + */ +void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg) +{ + wrp_msg_t *msg; + msg = ( wrp_msg_t * ) malloc( sizeof( wrp_msg_t ) ); + if(msg != NULL) + { + memset(msg, 0, sizeof(wrp_msg_t)); + + msg->msg_type = message->msg_type; + if(message->u.crud.transaction_uuid != NULL) + { + msg->u.crud.transaction_uuid = strdup(message->u.crud.transaction_uuid); + } + if(message->u.crud.source !=NULL) + { + msg->u.crud.source = strdup(message->u.crud.source); + } + if(message->u.crud.dest != NULL) + { + msg->u.crud.dest = strdup(message->u.crud.dest); + } + *retrieve_msg = msg; + } +} + +/** + * @brief getServiceNameAndSendResponse function to fetch client service name and to send msg to it. + * + * @param[in] msg The decoded message to fetch client service name from its dest field + * @param[in] msg_bytes The encoded upstream msg to be sent to client + * @param[in] msg_size Total size of the msg to send to client + */ +void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size) +{ + char *serviceName = NULL; + int sendStatus =-1; + + serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST); + if ( serviceName != NULL) + { + sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)msg_bytes, msg_size); + if(sendStatus ==1) + { + ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName); + } + else + { + ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName); + } + free(serviceName); + serviceName = NULL; + } + else + { + ParodusError("serviceName is NULL,not sending retrieve response to client\n"); + } +} + void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size) { void *appendData; diff --git a/src/upstream.h b/src/upstream.h index bd27944..adf222e 100644 --- a/src/upstream.h +++ b/src/upstream.h @@ -47,8 +47,10 @@ typedef struct UpStreamMsg__ void packMetaData(); void *handle_upstream(); void *processUpstreamMessage(); - +int getDeviceId(char **device_id, size_t *device_id_len); void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size); +void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size); +void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg); void set_global_UpStreamMsgQ(UpStreamMsg * UpStreamQ); UpStreamMsg * get_global_UpStreamMsgQ(void); pthread_cond_t *get_global_nano_con(void); diff --git a/tests/test_upstream.c b/tests/test_upstream.c index cf6c2ce..f1601b4 100644 --- a/tests/test_upstream.c +++ b/tests/test_upstream.c @@ -43,6 +43,7 @@ static ParodusCfg parodusCfg; extern size_t metaPackSize; extern UpStreamMsg *UpStreamMsgQ; int numLoops = 1; +int deviceIDNull =0; wrp_msg_t *temp = NULL; extern pthread_mutex_t nano_mut; extern pthread_cond_t nano_con; @@ -97,8 +98,21 @@ void sendMessage(noPollConn *conn, void *msg, size_t len) function_called(); } +void set_parodus_cfg(ParodusCfg *cfg) +{ + memcpy(&parodusCfg, cfg, sizeof(ParodusCfg)); +} + ParodusCfg *get_parodus_cfg(void) { + ParodusCfg cfg; + memset(&cfg,0,sizeof(cfg)); + parStrncpy(cfg.hw_mac , "14cfe2142xxx", sizeof(cfg.hw_mac)); + if(deviceIDNull) + { + parStrncpy(cfg.hw_mac , "", sizeof(cfg.hw_mac)); + } + set_parodus_cfg(&cfg); return &parodusCfg; } @@ -740,16 +754,16 @@ void test_processUpstreamMsg_cloud_status() void test_processUpstreamMsg_sendToClient() { - numLoops = 2; - metaPackSize = 20; + numLoops = 2; + metaPackSize = 20; UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg)); UpStreamMsgQ->msg = strdup("First Message"); UpStreamMsgQ->len = 13; UpStreamMsgQ->next= NULL; UpStreamMsgQ->next = (UpStreamMsg *) malloc(sizeof(UpStreamMsg)); - UpStreamMsgQ->next->msg = strdup("Second Message"); - UpStreamMsgQ->next->len = 15; - UpStreamMsgQ->next->next = NULL; + UpStreamMsgQ->next->msg = strdup("Second Message"); + UpStreamMsgQ->next->len = 15; + UpStreamMsgQ->next->next = NULL; temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); memset(temp,0,sizeof(wrp_msg_t)); @@ -772,9 +786,65 @@ void test_processUpstreamMsg_sendToClient() expect_function_call(wrp_free_struct); processUpstreamMessage(); - free(temp); - free(UpStreamMsgQ); - UpStreamMsgQ = NULL; + free(temp); + free(UpStreamMsgQ); + UpStreamMsgQ = NULL; +} + +void test_processUpstreamMsg_serviceNameNULL() +{ + numLoops = 1; + metaPackSize = 20; + UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg)); + UpStreamMsgQ->msg = strdup("First Message"); + UpStreamMsgQ->len = 13; + UpStreamMsgQ->next= NULL; + UpStreamMsgQ->next = (UpStreamMsg *) malloc(sizeof(UpStreamMsg)); + UpStreamMsgQ->next->msg = strdup("Second Message"); + UpStreamMsgQ->next->len = 15; + UpStreamMsgQ->next->next = NULL; + + temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); + memset(temp,0,sizeof(wrp_msg_t)); + temp->msg_type = 6; + temp->u.crud.dest = strdup("mac:14cfe2142xxx/"); + temp->u.crud.source = strdup("mac:14cfe2142xxx/parodus/cloud-status"); + temp->u.crud.transaction_uuid = strdup("123"); + + will_return(wrp_to_struct, 12); + expect_function_call(wrp_to_struct); + expect_function_call(wrp_free_struct); + processUpstreamMessage(); + free(temp); + free(UpStreamMsgQ); + UpStreamMsgQ = NULL; +} + +void err_processUpstreamMsg_deviceID() +{ + numLoops = 1; + metaPackSize = 20; + deviceIDNull = 1; + UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg)); + UpStreamMsgQ->msg = "First Message"; + UpStreamMsgQ->len = 13; + UpStreamMsgQ->next= NULL; + temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); + memset(temp,0,sizeof(wrp_msg_t)); + temp->msg_type = 6; + temp->u.crud.dest = "mac:14cfe2142xxx/parodus/cloud-status"; + temp->u.crud.source = "mac:14cfe2142xxx/config"; + temp->u.crud.transaction_uuid = "123"; + + will_return(wrp_to_struct, 12); + expect_function_call(wrp_to_struct); + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); + expect_function_call(wrp_free_struct); + processUpstreamMessage(); + free(temp); + free(UpStreamMsgQ); + UpStreamMsgQ = NULL; } /*----------------------------------------------------------------------------*/ /* External Functions */ @@ -808,6 +878,8 @@ int main(void) cmocka_unit_test(test_processUpstreamMsgCrud_nnfree), cmocka_unit_test(test_processUpstreamMsg_cloud_status), cmocka_unit_test(test_processUpstreamMsg_sendToClient), + cmocka_unit_test(test_processUpstreamMsg_serviceNameNULL), + cmocka_unit_test(err_processUpstreamMsg_deviceID) }; return cmocka_run_group_tests(tests, NULL, NULL);