Compare commits

...

7 Commits

Author SHA1 Message Date
Sadhyama Vengilat
27ba50a2f6 Xmidt send loop 2023-06-12 18:33:44 +05:30
Sadhyama Vengilat
c04ed33b20 To skip max queue size delete for higher priority low qos 2023-04-05 20:33:32 +05:30
Sadhyama Vengilat
47b0e335ae XmidtSendData to handle higher priority low qos msgs to confirm send success 2023-04-05 20:32:52 +05:30
Sadhyama Vengilat
f872fdcc44 To fix low qos msgs immediate delete during max queue size 2023-03-21 14:30:14 +05:30
Sadhyama Vengilat
0da6e73c90 To skip max queue size callback for already processed msgs 2023-03-21 14:29:56 +05:30
Vasuki
7c56dbb3f6 "Added null check as fix for parodus crash" 2023-03-21 14:28:19 +05:30
Sadhyama Vengilat
6952647ce3 Subscribe to CurrentActiveInterfaceEvent before initial cloud connection 2023-02-24 12:39:41 +05:30
5 changed files with 85 additions and 47 deletions

View File

@@ -99,7 +99,9 @@ void createSocketConnection(void (* initKeypress)())
#endif
EventHandler();
#ifdef WAN_FAILOVER_SUPPORTED
subscribeCurrentActiveInterfaceEvent();
#endif
set_server_list_null (&server_list);
create_conn_rtn = createNopollConnection(ctx, &server_list);
if(!create_conn_rtn)
@@ -113,9 +115,6 @@ void createSocketConnection(void (* initKeypress)())
UpStreamMsgQ = NULL;
StartThread(handle_upstream, &upstream_tid);
StartThread(processUpstreamMessage, &upstream_msg_tid);
#ifdef WAN_FAILOVER_SUPPORTED
subscribeCurrentActiveInterfaceEvent();
#endif
ParodusMsgQ = NULL;
StartThread(messageHandlerTask, &downstream_tid);
StartThread(serviceAliveTask, &svc_alive_tid);

View File

@@ -783,8 +783,7 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
}
#endif
}
if(conn_ctx.current_server->allow_insecure <= 0)
if(conn_ctx.current_server != NULL && conn_ctx.current_server->allow_insecure <= 0)
{
ParodusInfo("Connected to server over SSL\n");
OnboardLog("Connected to server over SSL\n");

View File

@@ -73,7 +73,7 @@ void subscribeRBUSevent()
int subscribeCurrentActiveInterfaceEvent()
{
int rc = RBUS_ERROR_SUCCESS;
ParodusPrint("Subscribing to Device.X_RDK_WanManager.CurrentActiveInterface Event\n");
ParodusInfo("Subscribing to Device.X_RDK_WanManager.CurrentActiveInterface Event\n");
rc = rbusEvent_SubscribeAsync(rbus_Handle,WEBPA_INTERFACE,eventReceiveHandler,subscribeAsyncHandler,"parodusInterface",10*20);
if(rc != RBUS_ERROR_SUCCESS)
{

View File

@@ -31,6 +31,7 @@
#include "config.h"
#include "time.h"
#include "heartBeat.h"
#include "close_retry.h"
static pthread_t processThreadId = 0;
static unsigned int XmidtQsize = 0;
@@ -70,6 +71,22 @@ bool highQosValueCheck(int qos)
return false;
}
//To handle high priority low qos message to confirm send success and ignore cloud ack.
bool higherPriorityLowQosCheck(int qos)
{
if(qos > 20 && qos < 25)
{
ParodusInfo("The low qos msg with higher priority\n");
return true;
}
else
{
ParodusPrint("The qos is not higher priority low qos\n");
}
return false;
}
XmidtMsg * get_global_xmidthead(void)
{
XmidtMsg *tmp = NULL;
@@ -122,9 +139,9 @@ void decrement_XmidtQsize()
int checkCloudConn()
{
int ret = 1;
if (!cloud_status_is_online ())
if (get_close_retry() || !cloud_status_is_online ())
{
ParodusInfo("cloud status is not online, wait till connection up\n");
ParodusInfo("close_retry is in progress or cloud status is not online, wait till connection up\n");
int rv;
struct timespec ts;
@@ -214,8 +231,15 @@ int xmidtQOptmize()
{
if(get_XmidtQsize() > 0 && get_XmidtQsize() == get_parodus_cfg()->max_queue_size)
{
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
del = 2;
if(higherPriorityLowQosCheck(tempMsg->u.event.qos))
{
ParodusInfo("Skip max queue size delete for qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
}
else
{
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
del = 2;
}
}
}
}
@@ -376,10 +400,9 @@ void* processXmidtUpstreamMsg()
{
XmidtMsg *Data = xmidtQ;
pthread_mutex_unlock (&xmidt_mut);
ParodusPrint("mutex unlock in xmidt consumer thread\n");
checkMsgExpiry();
checkMaxQandOptimize();
ParodusPrint("mutex unlock in xmidt consumer\n");
checkMsgExpiry(xmidtQ);
checkMaxQandOptimize(xmidtQ);
cv = 0;
ParodusPrint("check state\n");
@@ -747,10 +770,9 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
while(sendRetStatus) //If SendMessage is failed condition
{
ParodusError("sendXmidtEventToServer is Failed\n");
if(highQosValueCheck(qos))
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
{
ParodusPrint("The event is having high qos retry again\n");
ParodusInfo("Wait till connection is Up\n");
ParodusPrint("The event is having high qos retry again, wait till connection is Up\n");
rv = checkCloudConn();
if(rv == 2)
{
@@ -758,7 +780,7 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
break;
}
ParodusInfo("Received cloud status signal proceed to retry\n");
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
}
else
{
@@ -798,13 +820,26 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
}
else
{
ParodusInfo("Low qos event, send success callback and delete\n");
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
//print_xmidMsg_list();
updateXmidtState(msgnode, DELETE);
print_xmidMsg_list();
if(higherPriorityLowQosCheck(qos))
{
ParodusInfo("Higher priority low qos send success, ignore cloud ack\n");
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
printSendMsgData(errorMsg, notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
ParodusPrint("update state to DELETE\n");
updateXmidtState(msgnode, DELETE);
print_xmidMsg_list();
}
else
{
ParodusInfo("Low qos event, send success callback and delete\n");
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
//print_xmidMsg_list();
updateXmidtState(msgnode, DELETE);
print_xmidMsg_list();
}
}
}
@@ -1533,16 +1568,16 @@ int deleteFromXmidtQ(XmidtMsg **next_node)
}
//check if message is expired based on each qos and set to delete state.
void checkMsgExpiry()
void checkMsgExpiry(XmidtMsg *xmdMsg)
{
long long currTime = 0;
struct timespec ts;
char *errorMsg = NULL;
XmidtMsg *temp = NULL;
temp = get_global_xmidthead();
temp = xmdMsg;
while(temp != NULL)
if(temp != NULL)
{
getCurrentTime(&ts);
currTime= (long long)ts.tv_sec;
@@ -1551,8 +1586,7 @@ void checkMsgExpiry()
if(temp->state == DELETE)
{
ParodusPrint("msg is already in DELETE state and about to delete, skipping state update. transid %s\n", tempMsg->u.event.transaction_uuid);
temp = temp->next;
continue;
return;
}
if(tempMsg->u.event.qos > 74)
@@ -1611,12 +1645,11 @@ void checkMsgExpiry()
{
ParodusError("Invalid qos\n");
}
temp = temp->next;
}
}
//To delete low qos messages from queue when max queue limit is reached.
void checkMaxQandOptimize()
void checkMaxQandOptimize(XmidtMsg *xmdMsg)
{
int qos = 0;
@@ -1627,28 +1660,35 @@ void checkMaxQandOptimize()
//Traverse through XmidtMsgQ list and set low qos msgs to DELETE
XmidtMsg *temp = NULL;
temp = get_global_xmidthead();
temp = xmdMsg;
while(temp != NULL)
{
if (temp != NULL)
{
wrp_msg_t * tempMsg = temp->msg;
qos = tempMsg->u.event.qos;
ParodusPrint("qos is %d\n", qos);
if(highQosValueCheck(qos))
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
{
ParodusPrint("High qos msg, skip delete\n");
}
else
{
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
//rbus callback to caller
char *errorMsg = NULL;
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
updateXmidtState(temp, DELETE);
//Skip max queue callback when msg is already in DELETE state.
if( temp->state == DELETE)
{
ParodusInfo("Msg is in DELETE state, skipped Max Queue size callback %s\n", tempMsg->u.event.transaction_uuid);
}
else
{
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
//rbus callback to caller
char *errorMsg = NULL;
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
updateXmidtState(temp, DELETE);
}
}
temp = temp->next;
}
}
}

View File

@@ -114,8 +114,8 @@ void print_xmidMsg_list();
int deleteCloudACKNode(char* trans_id);
int deleteFromXmidtQ(XmidtMsg **next_node);
int checkCloudConn();
void checkMaxQandOptimize();
void checkMsgExpiry();
void checkMaxQandOptimize(XmidtMsg *xmdMsg);
void checkMsgExpiry(XmidtMsg *xmdMsg);
void mapXmidtStatusToStatusMessage(int status, char **message);
int xmidtQOptmize();
#ifdef __cplusplus