Xmidt SENT state to process cloud ACK

This commit is contained in:
Sadhyama Vengilat
2022-06-21 20:29:36 +05:30
parent 124fc11232
commit 8ea342b537
2 changed files with 50 additions and 85 deletions

View File

@@ -67,7 +67,7 @@ bool highQosValueCheck(int qos)
return false;
}
XmidtMsg * get_global_XmidtMsg(void)
XmidtMsg * get_global_xmidthead(void)
{
XmidtMsg *tmp = NULL;
pthread_mutex_lock (&xmidt_mut);
@@ -76,7 +76,7 @@ XmidtMsg * get_global_XmidtMsg(void)
return tmp;
}
void set_global_XmidtMsg(XmidtMsg *new)
void set_global_xmidthead(XmidtMsg *new)
{
pthread_mutex_lock (&xmidt_mut);
XmidtMsgQ = new;
@@ -175,6 +175,9 @@ void* processXmidtUpstreamMsg()
{
int rv = 0;
int state = 0;
long long currTime = 0;
int match_found = 0;
while(FOREVER())
{
if(get_parodus_init())
@@ -189,14 +192,15 @@ void* processXmidtUpstreamMsg()
ParodusInfo("mutex lock in xmidt consumer thread\n");
if (XmidtMsgQ != NULL)
{
int state = XmidtMsgQ->state;
switch(state)
XmidtMsg *Data = XmidtMsgQ;
XmidtMsgQ = XmidtMsgQ->next;
pthread_mutex_unlock (&xmidt_mut);
ParodusInfo("mutex unlock in xmidt consumer thread\n");
switch(Data->state)
{
case PENDING:
ParodusInfo("state : PENDING\n");
XmidtMsg *Data = XmidtMsgQ;
pthread_mutex_unlock (&xmidt_mut);
ParodusInfo("mutex unlock in xmidt consumer thread\n");
//Try sending msg to server only when cloud connection is up/online.
if (!cloud_status_is_online ())
{
@@ -223,18 +227,49 @@ void* processXmidtUpstreamMsg()
break;
case SENT:
ParodusInfo("state : SENT\n");
currTime = currentTime();
ParodusInfo("currTime %d sentTime %lu\n",currTime, Data->sentTime);
if (currTime > (Data->sentTime + CLOUD_ACK_TIMEOUT_SEC))
{
ParodusInfo("Check cloud ack for matching transaction id\n");
match_found = checkCloudACK();
ParodusInfo("match_found is %d\n", match_found);
if (match_found)
{
createOutParamsandSendAck(Data->msg, Data->asyncHandle, "Delivered success", DELIVERED_SUCCESS, RBUS_ERROR_SUCCESS);
ParodusInfo("updateStateAndTime to DELETE\n");
int upRet = updateStateAndTime(Data, DELETE);
if(upRet)
{
ParodusInfo("updateStateAndTime success\n");
}
else
{
ParodusError("updateStateAndTime failed\n");
}
ParodusInfo("B4 print_xmidMsg_list\n");
print_xmidMsg_list();
ParodusInfo("print_xmidMsg_list done\n");
//wrp_free_struct(Data->msg);
//free cloud ack node
}
else
{
ParodusInfo("transaction id match not found, cloud ack timed out. Need to retry\n");
//Retry .
}
}
break;
case DELETE:
ParodusInfo("state : DELETE\n");
break;
}
XmidtMsgQ = XmidtMsgQ->next;
// circling back to 1st node
if(XmidtMsgQ == NULL && headNode != NULL)
if(XmidtMsgQ == NULL && get_global_xmidthead() != NULL)
{
ParodusInfo("XmidtMsgQ is NULL, circling back to 1st node\n");
XmidtMsgQ = headNode;
XmidtMsgQ = get_global_xmidthead();
}
sleep(1);
}
@@ -519,8 +554,6 @@ void sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncH
{
if(highQosValueCheck(qos))
{
ParodusInfo("Start processCloudAck consumer\n");
processCloudAck();
//update msg status from PENDING to SENT
ParodusInfo("B4 updateStateAndTime\n");
int upRet = updateStateAndTime(msgnode, SENT);
@@ -1017,81 +1050,15 @@ void addToCloudAckQ(char *trans_id, int qos, int rdr)
return;
}
//Consumer thread to process cloud ack.
void processCloudAck()
{
int err = 0;
err = pthread_create(&cloudackThreadId, NULL, cloudAckHandler, NULL);
if (err != 0)
{
ParodusError("Error creating processCloudAck thread :[%s]\n", strerror(err));
}
else
{
ParodusInfo("processCloudAck thread created Successfully\n");
}
}
//To handle downstream cloud ack and send callback to consumer component.
void* cloudAckHandler()
{
int rv = 0;
while(FOREVER())
{
pthread_mutex_lock (&cloudack_mut);
ParodusInfo("mutex lock in cloudack consumer thread\n");
if(CloudAckQ != NULL)
{
CloudAck *Data = CloudAckQ;
CloudAckQ = CloudAckQ->next;
pthread_mutex_unlock (&cloudack_mut);
ParodusInfo("mutex unlock in cloudack consumer thread\n");
ParodusInfo("Data->transaction_id %s Data->qos %d Data->rdr %d\n", Data->transaction_id,Data->qos,Data->rdr);
rv = processCloudAckMsg(Data->transaction_id, Data->qos, Data->rdr);
if(rv)
{
ParodusInfo("processCloudAckMsg success\n");
}
else
{
ParodusError("processCloudAckMsg failed\n");
}
ParodusInfo("Data->transaction_id free\n");
if((Data !=NULL) && (Data->transaction_id !=NULL))
{
free(Data->transaction_id);
Data->transaction_id = NULL;
ParodusInfo("Data free\n");
free(Data);
Data = NULL;
}
ParodusInfo("processCloudAckMsg done\n");
}
else
{
if (g_shutdown)
{
pthread_mutex_unlock (&cloudack_mut);
break;
}
ParodusInfo("Before cond wait in cloudack consumer thread\n");
pthread_cond_wait(&cloudack_con, &cloudack_mut);
pthread_mutex_unlock (&cloudack_mut);
ParodusInfo("mutex unlock in cloudack thread after cond wait\n");
}
}
return NULL;
}
//Check cloud ack and send rbus callback based on transaction id of xmidt send messages.
int processCloudAckMsg(char *cloud_transID, int qos, int rdr)
int checkCloudACK(char *cloud_transID, int qos, int rdr)
{
if(cloud_transID == NULL)
{
ParodusError("cloud_transID is NULL, failed to process cloud ack\n");
return 0;
}
ParodusInfo("processCloudAckMsg cloud_transID %s, qos %d, rdr %d\n", cloud_transID, qos, rdr);
ParodusInfo("checkCloudACK cloud_transID %s, qos %d, rdr %d\n", cloud_transID, qos, rdr);
XmidtMsg *temp = NULL;
wrp_msg_t *xmdMsg = NULL;
@@ -1118,7 +1085,7 @@ int processCloudAckMsg(char *cloud_transID, int qos, int rdr)
ParodusInfo("free xmdMsg as callback is sent after cloud ack\n");
//wrp_free_struct(xmdMsg);
//xmidtQDequeue(); delete this temp node instead of dequeue
ParodusInfo("processCloudAckMsg done\n");
ParodusInfo("checkCloudACK done\n");
return 1;
}
else

View File

@@ -32,6 +32,7 @@ extern "C" {
#define XMIDT_SEND_METHOD "Device.X_RDK_Xmidt.SendData"
#define MAX_QUEUE_SIZE 10
#define INPARAMS_PATH "/tmp/inparams.txt"
#define CLOUD_ACK_TIMEOUT_SEC 7
/*----------------------------------------------------------------------------*/
/* Data Structures */
/*----------------------------------------------------------------------------*/
@@ -96,10 +97,7 @@ bool highQosValueCheck(int qos);
void waitTillConnectionIsUp();
void printRBUSParams(rbusObject_t params, char* file_path);
void addToCloudAckQ(char *transaction_id, int qos, int rdr);
void processCloudAck();
void* cloudAckHandler();
int processCloudAckMsg(char *trans_id, int qos, int rdr);
int checkCloudAckTimer(int startTime);
int checkCloudACK(char *trans_id, int qos, int rdr);
int updateStateAndTime(XmidtMsg * temp, int state);
void print_xmidMsg_list();
#ifdef __cplusplus