mirror of
https://github.com/outbackdingo/parodus.git
synced 2026-01-27 10:20:04 +00:00
Merge branch 'master' into svcalive
This commit is contained in:
@@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
||||
- Enabled valgrind
|
||||
- Fixed main loop to keep calling svc_alive_task during a cloud disconnect and retry
|
||||
- change svc alive back to a separate thread. Shut it down with pthread_cond_timedwait
|
||||
- Refactored Upsteam RETRIEVE flow
|
||||
|
||||
## [1.0.1] - 2018-07-18
|
||||
### Added
|
||||
|
||||
202
src/upstream.c
202
src/upstream.c
@@ -34,7 +34,7 @@
|
||||
/* Macros */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
#define METADATA_COUNT 12
|
||||
#define CLOUD_STATUS_FORMAT "parodus/cloud-status"
|
||||
#define PARODUS_SERVICE_NAME "parodus"
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* File Scoped Variables */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -210,10 +210,10 @@ void *processUpstreamMessage()
|
||||
reg_list_item_t *temp = NULL;
|
||||
int matchFlag = 0;
|
||||
int status = -1;
|
||||
char *serviceName = NULL;
|
||||
char *destService, *destApplication =NULL;
|
||||
char *sourceService, *sourceApplication =NULL;
|
||||
int sendStatus =-1;
|
||||
char *device_id = NULL;
|
||||
size_t device_id_len = 0;
|
||||
size_t parodus_len;
|
||||
int ret = -1;
|
||||
|
||||
while(FOREVER())
|
||||
{
|
||||
@@ -360,79 +360,46 @@ void *processUpstreamMessage()
|
||||
else
|
||||
{
|
||||
ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s status: %d\n",msgType, msg->u.crud.dest, msg->u.crud.transaction_uuid, msg->u.crud.status );
|
||||
if(WRP_MSG_TYPE__RETREIVE == msgType && msg->u.crud.dest !=NULL && msg->u.crud.source != NULL)
|
||||
if(WRP_MSG_TYPE__RETREIVE == msgType)
|
||||
{
|
||||
destService = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
|
||||
destApplication = wrp_get_msg_element(WRP_ID_ELEMENT__APPLICATION, msg, DEST);
|
||||
sourceService = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, SOURCE);
|
||||
sourceApplication = wrp_get_msg_element(WRP_ID_ELEMENT__APPLICATION, msg, SOURCE);
|
||||
/* Handle cloud-status retrieve request here
|
||||
Expecting dest format as mac:xxxxxxxxxxxx/parodus/cloud-status
|
||||
Parse dest field and check destService is "parodus" and destApplication is "cloud-status"
|
||||
*/
|
||||
if(destService != NULL && destApplication != NULL && strcmp(destService,"parodus")== 0 && strcmp(destApplication,"cloud-status")== 0)
|
||||
{
|
||||
retrieve_msg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) );
|
||||
memset(retrieve_msg, 0, sizeof(wrp_msg_t));
|
||||
retrieve_msg->msg_type = msg->msg_type;
|
||||
retrieve_msg->u.crud.transaction_uuid = strdup(msg->u.crud.transaction_uuid);
|
||||
retrieve_msg->u.crud.source = strdup(msg->u.crud.source);
|
||||
retrieve_msg->u.crud.dest = strdup(msg->u.crud.dest);
|
||||
addCRUDmsgToQueue(retrieve_msg);
|
||||
}
|
||||
else if(sourceService != NULL && sourceApplication != NULL && strcmp(sourceService,"parodus")== 0 && strcmp(sourceApplication,"cloud-status")== 0 && strncmp(msg->u.crud.dest,"mac:", 4)==0)
|
||||
{
|
||||
/* Handle cloud-status retrieve response here to send it to registered client
|
||||
Expecting src format as mac:xxxxxxxxxxxx/parodus/cloud-status and dest as mac:
|
||||
Parse src field and check sourceService is "parodus" and sourceApplication is "cloud-status"
|
||||
*/
|
||||
serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
|
||||
if ( serviceName != NULL)
|
||||
ret = getDeviceId(&device_id, &device_id_len);
|
||||
if(ret == 0)
|
||||
{
|
||||
//Send Client cloud-status response back to registered client
|
||||
ParodusInfo("Sending cloud-status response to %s client\n",serviceName);
|
||||
sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)&message->msg,message->len);
|
||||
if(sendStatus ==1)
|
||||
ParodusPrint("device_id %s device_id_len %lu\n", device_id, device_id_len);
|
||||
/* Match dest based on device_id. Check dest start with: "mac:112233445xxx" ? */
|
||||
if( 0 == strncasecmp(device_id, msg->u.crud.dest, device_id_len-1) )
|
||||
{
|
||||
ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName);
|
||||
/* For this device. */
|
||||
parodus_len = strlen( PARODUS_SERVICE_NAME );
|
||||
if( 0 == strncmp(PARODUS_SERVICE_NAME, &msg->u.crud.dest[device_id_len], parodus_len-1) )
|
||||
{
|
||||
/* For Parodus CRUD queue. */
|
||||
ParodusInfo("Create RetrieveMsg and add to parodus CRUD queue\n");
|
||||
createUpstreamRetrieveMsg(msg, &retrieve_msg);
|
||||
addCRUDmsgToQueue(retrieve_msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* For nanomsg clients. */
|
||||
getServiceNameAndSendResponse(msg, &message->msg, message->len);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName);
|
||||
/* Not for this device. Send upstream */
|
||||
ParodusInfo("sendUpstreamMsgToServer \n");
|
||||
sendUpstreamMsgToServer(&message->msg, message->len);
|
||||
}
|
||||
if(device_id != NULL)
|
||||
{
|
||||
free(device_id);
|
||||
device_id = NULL;
|
||||
}
|
||||
free(serviceName);
|
||||
serviceName = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("serviceName is NULL,not sending cloud-status response to client\n");
|
||||
ParodusError("Failed to get device_id\n");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("sendUpstreamMsgToServer \n");
|
||||
sendUpstreamMsgToServer(&message->msg, message->len);
|
||||
}
|
||||
if(sourceService !=NULL)
|
||||
{
|
||||
free(sourceService);
|
||||
sourceService = NULL;
|
||||
}
|
||||
if(sourceApplication !=NULL)
|
||||
{
|
||||
free(sourceApplication);
|
||||
sourceApplication = NULL;
|
||||
}
|
||||
if(destService !=NULL)
|
||||
{
|
||||
free(destService);
|
||||
destService = NULL;
|
||||
}
|
||||
if(destApplication !=NULL)
|
||||
{
|
||||
free(destApplication);
|
||||
destApplication = NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -481,6 +448,107 @@ void *processUpstreamMessage()
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief getDeviceId function to create deviceId in the format "mac:112233445xxx"
|
||||
*
|
||||
* @param[out] device_id in the format "mac:112233445xxx"
|
||||
* @param[out] total size of device_id
|
||||
*/
|
||||
int getDeviceId(char **device_id, size_t *device_id_len)
|
||||
{
|
||||
char *deviceID = NULL;
|
||||
size_t len;
|
||||
|
||||
if((get_parodus_cfg()->hw_mac !=NULL) && (strlen(get_parodus_cfg()->hw_mac)!=0))
|
||||
{
|
||||
len = strlen(get_parodus_cfg()->hw_mac) + 5;
|
||||
|
||||
deviceID = (char *) malloc(sizeof(char)*64);
|
||||
if(deviceID != NULL)
|
||||
{
|
||||
snprintf(deviceID, len, "mac:%s", get_parodus_cfg()->hw_mac);
|
||||
*device_id = deviceID;
|
||||
*device_id_len = len;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("device_id allocation failed\n");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("device mac is NULL\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief createUpstreamRetrieveMsg function to create new message for processing Retrieve requests
|
||||
*
|
||||
* @param[in] message The upstream message received from cloud or internal clients
|
||||
* @param[out] retrieve_msg New message for processing Retrieve requests
|
||||
*/
|
||||
void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg)
|
||||
{
|
||||
wrp_msg_t *msg;
|
||||
msg = ( wrp_msg_t * ) malloc( sizeof( wrp_msg_t ) );
|
||||
if(msg != NULL)
|
||||
{
|
||||
memset(msg, 0, sizeof(wrp_msg_t));
|
||||
|
||||
msg->msg_type = message->msg_type;
|
||||
if(message->u.crud.transaction_uuid != NULL)
|
||||
{
|
||||
msg->u.crud.transaction_uuid = strdup(message->u.crud.transaction_uuid);
|
||||
}
|
||||
if(message->u.crud.source !=NULL)
|
||||
{
|
||||
msg->u.crud.source = strdup(message->u.crud.source);
|
||||
}
|
||||
if(message->u.crud.dest != NULL)
|
||||
{
|
||||
msg->u.crud.dest = strdup(message->u.crud.dest);
|
||||
}
|
||||
*retrieve_msg = msg;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief getServiceNameAndSendResponse function to fetch client service name and to send msg to it.
|
||||
*
|
||||
* @param[in] msg The decoded message to fetch client service name from its dest field
|
||||
* @param[in] msg_bytes The encoded upstream msg to be sent to client
|
||||
* @param[in] msg_size Total size of the msg to send to client
|
||||
*/
|
||||
void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size)
|
||||
{
|
||||
char *serviceName = NULL;
|
||||
int sendStatus =-1;
|
||||
|
||||
serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
|
||||
if ( serviceName != NULL)
|
||||
{
|
||||
sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)msg_bytes, msg_size);
|
||||
if(sendStatus ==1)
|
||||
{
|
||||
ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName);
|
||||
}
|
||||
free(serviceName);
|
||||
serviceName = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("serviceName is NULL,not sending retrieve response to client\n");
|
||||
}
|
||||
}
|
||||
|
||||
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
|
||||
{
|
||||
void *appendData;
|
||||
|
||||
@@ -47,8 +47,10 @@ typedef struct UpStreamMsg__
|
||||
void packMetaData();
|
||||
void *handle_upstream();
|
||||
void *processUpstreamMessage();
|
||||
|
||||
int getDeviceId(char **device_id, size_t *device_id_len);
|
||||
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size);
|
||||
void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size);
|
||||
void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg);
|
||||
void set_global_UpStreamMsgQ(UpStreamMsg * UpStreamQ);
|
||||
UpStreamMsg * get_global_UpStreamMsgQ(void);
|
||||
pthread_cond_t *get_global_nano_con(void);
|
||||
|
||||
@@ -43,6 +43,7 @@ static ParodusCfg parodusCfg;
|
||||
extern size_t metaPackSize;
|
||||
extern UpStreamMsg *UpStreamMsgQ;
|
||||
int numLoops = 1;
|
||||
int deviceIDNull =0;
|
||||
wrp_msg_t *temp = NULL;
|
||||
extern pthread_mutex_t nano_mut;
|
||||
extern pthread_cond_t nano_con;
|
||||
@@ -97,8 +98,21 @@ void sendMessage(noPollConn *conn, void *msg, size_t len)
|
||||
function_called();
|
||||
}
|
||||
|
||||
void set_parodus_cfg(ParodusCfg *cfg)
|
||||
{
|
||||
memcpy(&parodusCfg, cfg, sizeof(ParodusCfg));
|
||||
}
|
||||
|
||||
ParodusCfg *get_parodus_cfg(void)
|
||||
{
|
||||
ParodusCfg cfg;
|
||||
memset(&cfg,0,sizeof(cfg));
|
||||
parStrncpy(cfg.hw_mac , "14cfe2142xxx", sizeof(cfg.hw_mac));
|
||||
if(deviceIDNull)
|
||||
{
|
||||
parStrncpy(cfg.hw_mac , "", sizeof(cfg.hw_mac));
|
||||
}
|
||||
set_parodus_cfg(&cfg);
|
||||
return &parodusCfg;
|
||||
}
|
||||
|
||||
@@ -740,16 +754,16 @@ void test_processUpstreamMsg_cloud_status()
|
||||
|
||||
void test_processUpstreamMsg_sendToClient()
|
||||
{
|
||||
numLoops = 2;
|
||||
metaPackSize = 20;
|
||||
numLoops = 2;
|
||||
metaPackSize = 20;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = strdup("First Message");
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
UpStreamMsgQ->next = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->next->msg = strdup("Second Message");
|
||||
UpStreamMsgQ->next->len = 15;
|
||||
UpStreamMsgQ->next->next = NULL;
|
||||
UpStreamMsgQ->next->msg = strdup("Second Message");
|
||||
UpStreamMsgQ->next->len = 15;
|
||||
UpStreamMsgQ->next->next = NULL;
|
||||
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
@@ -772,9 +786,65 @@ void test_processUpstreamMsg_sendToClient()
|
||||
|
||||
expect_function_call(wrp_free_struct);
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
}
|
||||
|
||||
void test_processUpstreamMsg_serviceNameNULL()
|
||||
{
|
||||
numLoops = 1;
|
||||
metaPackSize = 20;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = strdup("First Message");
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
UpStreamMsgQ->next = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->next->msg = strdup("Second Message");
|
||||
UpStreamMsgQ->next->len = 15;
|
||||
UpStreamMsgQ->next->next = NULL;
|
||||
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
temp->msg_type = 6;
|
||||
temp->u.crud.dest = strdup("mac:14cfe2142xxx/");
|
||||
temp->u.crud.source = strdup("mac:14cfe2142xxx/parodus/cloud-status");
|
||||
temp->u.crud.transaction_uuid = strdup("123");
|
||||
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
expect_function_call(wrp_free_struct);
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
}
|
||||
|
||||
void err_processUpstreamMsg_deviceID()
|
||||
{
|
||||
numLoops = 1;
|
||||
metaPackSize = 20;
|
||||
deviceIDNull = 1;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = "First Message";
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
temp->msg_type = 6;
|
||||
temp->u.crud.dest = "mac:14cfe2142xxx/parodus/cloud-status";
|
||||
temp->u.crud.source = "mac:14cfe2142xxx/config";
|
||||
temp->u.crud.transaction_uuid = "123";
|
||||
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
will_return(nn_freemsg, 0);
|
||||
expect_function_call(nn_freemsg);
|
||||
expect_function_call(wrp_free_struct);
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
}
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
@@ -808,6 +878,8 @@ int main(void)
|
||||
cmocka_unit_test(test_processUpstreamMsgCrud_nnfree),
|
||||
cmocka_unit_test(test_processUpstreamMsg_cloud_status),
|
||||
cmocka_unit_test(test_processUpstreamMsg_sendToClient),
|
||||
cmocka_unit_test(test_processUpstreamMsg_serviceNameNULL),
|
||||
cmocka_unit_test(err_processUpstreamMsg_deviceID)
|
||||
};
|
||||
|
||||
return cmocka_run_group_tests(tests, NULL, NULL);
|
||||
|
||||
Reference in New Issue
Block a user