|
|
|
|
@@ -31,6 +31,7 @@
|
|
|
|
|
#include "config.h"
|
|
|
|
|
#include "time.h"
|
|
|
|
|
#include "heartBeat.h"
|
|
|
|
|
#include "close_retry.h"
|
|
|
|
|
|
|
|
|
|
static pthread_t processThreadId = 0;
|
|
|
|
|
static unsigned int XmidtQsize = 0;
|
|
|
|
|
@@ -70,6 +71,22 @@ bool highQosValueCheck(int qos)
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//To handle high priority low qos message to confirm send success and ignore cloud ack.
|
|
|
|
|
bool higherPriorityLowQosCheck(int qos)
|
|
|
|
|
{
|
|
|
|
|
if(qos > 20 && qos < 25)
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("The low qos msg with higher priority\n");
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ParodusPrint("The qos is not higher priority low qos\n");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
XmidtMsg * get_global_xmidthead(void)
|
|
|
|
|
{
|
|
|
|
|
XmidtMsg *tmp = NULL;
|
|
|
|
|
@@ -122,9 +139,9 @@ void decrement_XmidtQsize()
|
|
|
|
|
int checkCloudConn()
|
|
|
|
|
{
|
|
|
|
|
int ret = 1;
|
|
|
|
|
if (!cloud_status_is_online ())
|
|
|
|
|
if (get_close_retry() || !cloud_status_is_online ())
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("cloud status is not online, wait till connection up\n");
|
|
|
|
|
ParodusInfo("close_retry is in progress or cloud status is not online, wait till connection up\n");
|
|
|
|
|
|
|
|
|
|
int rv;
|
|
|
|
|
struct timespec ts;
|
|
|
|
|
@@ -214,8 +231,15 @@ int xmidtQOptmize()
|
|
|
|
|
{
|
|
|
|
|
if(get_XmidtQsize() > 0 && get_XmidtQsize() == get_parodus_cfg()->max_queue_size)
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
|
|
|
|
del = 2;
|
|
|
|
|
if(higherPriorityLowQosCheck(tempMsg->u.event.qos))
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Skip max queue size delete for qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
|
|
|
|
del = 2;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -376,10 +400,9 @@ void* processXmidtUpstreamMsg()
|
|
|
|
|
{
|
|
|
|
|
XmidtMsg *Data = xmidtQ;
|
|
|
|
|
pthread_mutex_unlock (&xmidt_mut);
|
|
|
|
|
ParodusPrint("mutex unlock in xmidt consumer thread\n");
|
|
|
|
|
|
|
|
|
|
checkMsgExpiry();
|
|
|
|
|
checkMaxQandOptimize();
|
|
|
|
|
ParodusPrint("mutex unlock in xmidt consumer\n");
|
|
|
|
|
checkMsgExpiry(xmidtQ);
|
|
|
|
|
checkMaxQandOptimize(xmidtQ);
|
|
|
|
|
cv = 0;
|
|
|
|
|
|
|
|
|
|
ParodusPrint("check state\n");
|
|
|
|
|
@@ -747,10 +770,9 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
|
|
|
|
while(sendRetStatus) //If SendMessage is failed condition
|
|
|
|
|
{
|
|
|
|
|
ParodusError("sendXmidtEventToServer is Failed\n");
|
|
|
|
|
if(highQosValueCheck(qos))
|
|
|
|
|
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
|
|
|
|
|
{
|
|
|
|
|
ParodusPrint("The event is having high qos retry again\n");
|
|
|
|
|
ParodusInfo("Wait till connection is Up\n");
|
|
|
|
|
ParodusPrint("The event is having high qos retry again, wait till connection is Up\n");
|
|
|
|
|
rv = checkCloudConn();
|
|
|
|
|
if(rv == 2)
|
|
|
|
|
{
|
|
|
|
|
@@ -758,7 +780,7 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
ParodusInfo("Received cloud status signal proceed to retry\n");
|
|
|
|
|
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
|
|
|
|
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
@@ -798,13 +820,26 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Low qos event, send success callback and delete\n");
|
|
|
|
|
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
|
|
|
|
ParodusPrint("statusMsg is %s\n",errorMsg);
|
|
|
|
|
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
|
|
|
|
//print_xmidMsg_list();
|
|
|
|
|
updateXmidtState(msgnode, DELETE);
|
|
|
|
|
print_xmidMsg_list();
|
|
|
|
|
if(higherPriorityLowQosCheck(qos))
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Higher priority low qos send success, ignore cloud ack\n");
|
|
|
|
|
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
|
|
|
|
printSendMsgData(errorMsg, notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
|
|
|
|
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
|
|
|
|
ParodusPrint("update state to DELETE\n");
|
|
|
|
|
updateXmidtState(msgnode, DELETE);
|
|
|
|
|
print_xmidMsg_list();
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Low qos event, send success callback and delete\n");
|
|
|
|
|
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
|
|
|
|
ParodusPrint("statusMsg is %s\n",errorMsg);
|
|
|
|
|
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
|
|
|
|
//print_xmidMsg_list();
|
|
|
|
|
updateXmidtState(msgnode, DELETE);
|
|
|
|
|
print_xmidMsg_list();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1533,16 +1568,16 @@ int deleteFromXmidtQ(XmidtMsg **next_node)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//check if message is expired based on each qos and set to delete state.
|
|
|
|
|
void checkMsgExpiry()
|
|
|
|
|
void checkMsgExpiry(XmidtMsg *xmdMsg)
|
|
|
|
|
{
|
|
|
|
|
long long currTime = 0;
|
|
|
|
|
struct timespec ts;
|
|
|
|
|
char *errorMsg = NULL;
|
|
|
|
|
|
|
|
|
|
XmidtMsg *temp = NULL;
|
|
|
|
|
temp = get_global_xmidthead();
|
|
|
|
|
temp = xmdMsg;
|
|
|
|
|
|
|
|
|
|
while(temp != NULL)
|
|
|
|
|
if(temp != NULL)
|
|
|
|
|
{
|
|
|
|
|
getCurrentTime(&ts);
|
|
|
|
|
currTime= (long long)ts.tv_sec;
|
|
|
|
|
@@ -1551,8 +1586,7 @@ void checkMsgExpiry()
|
|
|
|
|
if(temp->state == DELETE)
|
|
|
|
|
{
|
|
|
|
|
ParodusPrint("msg is already in DELETE state and about to delete, skipping state update. transid %s\n", tempMsg->u.event.transaction_uuid);
|
|
|
|
|
temp = temp->next;
|
|
|
|
|
continue;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if(tempMsg->u.event.qos > 74)
|
|
|
|
|
@@ -1611,12 +1645,11 @@ void checkMsgExpiry()
|
|
|
|
|
{
|
|
|
|
|
ParodusError("Invalid qos\n");
|
|
|
|
|
}
|
|
|
|
|
temp = temp->next;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//To delete low qos messages from queue when max queue limit is reached.
|
|
|
|
|
void checkMaxQandOptimize()
|
|
|
|
|
void checkMaxQandOptimize(XmidtMsg *xmdMsg)
|
|
|
|
|
{
|
|
|
|
|
int qos = 0;
|
|
|
|
|
|
|
|
|
|
@@ -1627,28 +1660,35 @@ void checkMaxQandOptimize()
|
|
|
|
|
|
|
|
|
|
//Traverse through XmidtMsgQ list and set low qos msgs to DELETE
|
|
|
|
|
XmidtMsg *temp = NULL;
|
|
|
|
|
temp = get_global_xmidthead();
|
|
|
|
|
temp = xmdMsg;
|
|
|
|
|
|
|
|
|
|
while(temp != NULL)
|
|
|
|
|
{
|
|
|
|
|
if (temp != NULL)
|
|
|
|
|
{
|
|
|
|
|
wrp_msg_t * tempMsg = temp->msg;
|
|
|
|
|
qos = tempMsg->u.event.qos;
|
|
|
|
|
ParodusPrint("qos is %d\n", qos);
|
|
|
|
|
if(highQosValueCheck(qos))
|
|
|
|
|
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
|
|
|
|
|
{
|
|
|
|
|
ParodusPrint("High qos msg, skip delete\n");
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
|
|
|
|
|
//rbus callback to caller
|
|
|
|
|
char *errorMsg = NULL;
|
|
|
|
|
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
|
|
|
|
|
ParodusPrint("statusMsg is %s\n",errorMsg);
|
|
|
|
|
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
|
|
|
|
updateXmidtState(temp, DELETE);
|
|
|
|
|
//Skip max queue callback when msg is already in DELETE state.
|
|
|
|
|
if( temp->state == DELETE)
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Msg is in DELETE state, skipped Max Queue size callback %s\n", tempMsg->u.event.transaction_uuid);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
|
|
|
|
|
//rbus callback to caller
|
|
|
|
|
char *errorMsg = NULL;
|
|
|
|
|
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
|
|
|
|
|
ParodusPrint("statusMsg is %s\n",errorMsg);
|
|
|
|
|
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
|
|
|
|
updateXmidtState(temp, DELETE);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
temp = temp->next;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|