From 8ea342b537f781fd3aae7264b016dfb122cd971f Mon Sep 17 00:00:00 2001 From: Sadhyama Vengilat Date: Tue, 21 Jun 2022 20:29:36 +0530 Subject: [PATCH] Xmidt SENT state to process cloud ACK --- src/xmidtsend_rbus.c | 129 ++++++++++++++++--------------------------- src/xmidtsend_rbus.h | 6 +- 2 files changed, 50 insertions(+), 85 deletions(-) diff --git a/src/xmidtsend_rbus.c b/src/xmidtsend_rbus.c index 08475c5..76be9e6 100644 --- a/src/xmidtsend_rbus.c +++ b/src/xmidtsend_rbus.c @@ -67,7 +67,7 @@ bool highQosValueCheck(int qos) return false; } -XmidtMsg * get_global_XmidtMsg(void) +XmidtMsg * get_global_xmidthead(void) { XmidtMsg *tmp = NULL; pthread_mutex_lock (&xmidt_mut); @@ -76,7 +76,7 @@ XmidtMsg * get_global_XmidtMsg(void) return tmp; } -void set_global_XmidtMsg(XmidtMsg *new) +void set_global_xmidthead(XmidtMsg *new) { pthread_mutex_lock (&xmidt_mut); XmidtMsgQ = new; @@ -175,6 +175,9 @@ void* processXmidtUpstreamMsg() { int rv = 0; int state = 0; + long long currTime = 0; + int match_found = 0; + while(FOREVER()) { if(get_parodus_init()) @@ -189,14 +192,15 @@ void* processXmidtUpstreamMsg() ParodusInfo("mutex lock in xmidt consumer thread\n"); if (XmidtMsgQ != NULL) { - int state = XmidtMsgQ->state; - switch(state) + XmidtMsg *Data = XmidtMsgQ; + XmidtMsgQ = XmidtMsgQ->next; + pthread_mutex_unlock (&xmidt_mut); + ParodusInfo("mutex unlock in xmidt consumer thread\n"); + + switch(Data->state) { case PENDING: ParodusInfo("state : PENDING\n"); - XmidtMsg *Data = XmidtMsgQ; - pthread_mutex_unlock (&xmidt_mut); - ParodusInfo("mutex unlock in xmidt consumer thread\n"); //Try sending msg to server only when cloud connection is up/online. if (!cloud_status_is_online ()) { @@ -223,18 +227,49 @@ void* processXmidtUpstreamMsg() break; case SENT: ParodusInfo("state : SENT\n"); + currTime = currentTime(); + ParodusInfo("currTime %d sentTime %lu\n",currTime, Data->sentTime); + if (currTime > (Data->sentTime + CLOUD_ACK_TIMEOUT_SEC)) + { + ParodusInfo("Check cloud ack for matching transaction id\n"); + match_found = checkCloudACK(); + ParodusInfo("match_found is %d\n", match_found); + if (match_found) + { + createOutParamsandSendAck(Data->msg, Data->asyncHandle, "Delivered success", DELIVERED_SUCCESS, RBUS_ERROR_SUCCESS); + ParodusInfo("updateStateAndTime to DELETE\n"); + int upRet = updateStateAndTime(Data, DELETE); + if(upRet) + { + ParodusInfo("updateStateAndTime success\n"); + } + else + { + ParodusError("updateStateAndTime failed\n"); + } + ParodusInfo("B4 print_xmidMsg_list\n"); + print_xmidMsg_list(); + ParodusInfo("print_xmidMsg_list done\n"); + //wrp_free_struct(Data->msg); + //free cloud ack node + } + else + { + ParodusInfo("transaction id match not found, cloud ack timed out. Need to retry\n"); + //Retry . + } + } break; case DELETE: ParodusInfo("state : DELETE\n"); break; } - XmidtMsgQ = XmidtMsgQ->next; // circling back to 1st node - if(XmidtMsgQ == NULL && headNode != NULL) + if(XmidtMsgQ == NULL && get_global_xmidthead() != NULL) { ParodusInfo("XmidtMsgQ is NULL, circling back to 1st node\n"); - XmidtMsgQ = headNode; + XmidtMsgQ = get_global_xmidthead(); } sleep(1); } @@ -519,8 +554,6 @@ void sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncH { if(highQosValueCheck(qos)) { - ParodusInfo("Start processCloudAck consumer\n"); - processCloudAck(); //update msg status from PENDING to SENT ParodusInfo("B4 updateStateAndTime\n"); int upRet = updateStateAndTime(msgnode, SENT); @@ -1017,81 +1050,15 @@ void addToCloudAckQ(char *trans_id, int qos, int rdr) return; } -//Consumer thread to process cloud ack. -void processCloudAck() -{ - int err = 0; - err = pthread_create(&cloudackThreadId, NULL, cloudAckHandler, NULL); - if (err != 0) - { - ParodusError("Error creating processCloudAck thread :[%s]\n", strerror(err)); - } - else - { - ParodusInfo("processCloudAck thread created Successfully\n"); - } -} - -//To handle downstream cloud ack and send callback to consumer component. -void* cloudAckHandler() -{ - int rv = 0; - while(FOREVER()) - { - pthread_mutex_lock (&cloudack_mut); - ParodusInfo("mutex lock in cloudack consumer thread\n"); - if(CloudAckQ != NULL) - { - CloudAck *Data = CloudAckQ; - CloudAckQ = CloudAckQ->next; - pthread_mutex_unlock (&cloudack_mut); - ParodusInfo("mutex unlock in cloudack consumer thread\n"); - ParodusInfo("Data->transaction_id %s Data->qos %d Data->rdr %d\n", Data->transaction_id,Data->qos,Data->rdr); - rv = processCloudAckMsg(Data->transaction_id, Data->qos, Data->rdr); - if(rv) - { - ParodusInfo("processCloudAckMsg success\n"); - } - else - { - ParodusError("processCloudAckMsg failed\n"); - } - ParodusInfo("Data->transaction_id free\n"); - if((Data !=NULL) && (Data->transaction_id !=NULL)) - { - free(Data->transaction_id); - Data->transaction_id = NULL; - ParodusInfo("Data free\n"); - free(Data); - Data = NULL; - } - ParodusInfo("processCloudAckMsg done\n"); - } - else - { - if (g_shutdown) - { - pthread_mutex_unlock (&cloudack_mut); - break; - } - ParodusInfo("Before cond wait in cloudack consumer thread\n"); - pthread_cond_wait(&cloudack_con, &cloudack_mut); - pthread_mutex_unlock (&cloudack_mut); - ParodusInfo("mutex unlock in cloudack thread after cond wait\n"); - } - } - return NULL; -} - //Check cloud ack and send rbus callback based on transaction id of xmidt send messages. -int processCloudAckMsg(char *cloud_transID, int qos, int rdr) +int checkCloudACK(char *cloud_transID, int qos, int rdr) { if(cloud_transID == NULL) { ParodusError("cloud_transID is NULL, failed to process cloud ack\n"); return 0; } - ParodusInfo("processCloudAckMsg cloud_transID %s, qos %d, rdr %d\n", cloud_transID, qos, rdr); + ParodusInfo("checkCloudACK cloud_transID %s, qos %d, rdr %d\n", cloud_transID, qos, rdr); XmidtMsg *temp = NULL; wrp_msg_t *xmdMsg = NULL; @@ -1118,7 +1085,7 @@ int processCloudAckMsg(char *cloud_transID, int qos, int rdr) ParodusInfo("free xmdMsg as callback is sent after cloud ack\n"); //wrp_free_struct(xmdMsg); //xmidtQDequeue(); delete this temp node instead of dequeue - ParodusInfo("processCloudAckMsg done\n"); + ParodusInfo("checkCloudACK done\n"); return 1; } else diff --git a/src/xmidtsend_rbus.h b/src/xmidtsend_rbus.h index 9fe12f4..0f8d85b 100644 --- a/src/xmidtsend_rbus.h +++ b/src/xmidtsend_rbus.h @@ -32,6 +32,7 @@ extern "C" { #define XMIDT_SEND_METHOD "Device.X_RDK_Xmidt.SendData" #define MAX_QUEUE_SIZE 10 #define INPARAMS_PATH "/tmp/inparams.txt" +#define CLOUD_ACK_TIMEOUT_SEC 7 /*----------------------------------------------------------------------------*/ /* Data Structures */ /*----------------------------------------------------------------------------*/ @@ -96,10 +97,7 @@ bool highQosValueCheck(int qos); void waitTillConnectionIsUp(); void printRBUSParams(rbusObject_t params, char* file_path); void addToCloudAckQ(char *transaction_id, int qos, int rdr); -void processCloudAck(); -void* cloudAckHandler(); -int processCloudAckMsg(char *trans_id, int qos, int rdr); -int checkCloudAckTimer(int startTime); +int checkCloudACK(char *trans_id, int qos, int rdr); int updateStateAndTime(XmidtMsg * temp, int state); void print_xmidMsg_list(); #ifdef __cplusplus