mirror of
https://github.com/outbackdingo/parodus.git
synced 2026-01-27 18:20:04 +00:00
Compare commits
8 Commits
quic
...
trace_reve
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8957afc6bc | ||
|
|
118e8ee32f | ||
|
|
f7c9f483f9 | ||
|
|
502f56400e | ||
|
|
3e557ae4b0 | ||
|
|
0602fb243b | ||
|
|
0b0ba77bd6 | ||
|
|
0561c67e5c |
@@ -320,10 +320,7 @@ void *processUpstreamMessage()
|
||||
}
|
||||
else if(msgType == WRP_MSG_TYPE__EVENT)
|
||||
{
|
||||
(msg->u.event.headers != NULL && msg->u.event.headers->headers[0] != NULL && msg->u.event.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream event data: dest '%s' traceParent: %s traceState: %s\n", msg->u.event.dest, msg->u.event.headers->headers[0], msg->u.event.headers->headers[1]) : ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
|
||||
if(msg->u.event.transaction_uuid != NULL) {
|
||||
ParodusInfo("transaction_uuid in event: %s\n", msg->u.event.transaction_uuid);
|
||||
}
|
||||
ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
|
||||
partners_t *partnersList = NULL;
|
||||
int j = 0;
|
||||
|
||||
@@ -382,7 +379,7 @@ void *processUpstreamMessage()
|
||||
//Sending to server for msgTypes 3, 5, 6, 7, 8.
|
||||
if( WRP_MSG_TYPE__REQ == msgType )
|
||||
{
|
||||
(msg->u.req.headers != NULL && msg->u.req.headers->headers[0] != NULL && msg->u.req.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s traceParent: %s traceState: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid, msg->u.req.headers->headers[0], msg->u.req.headers->headers[1]) : ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid);
|
||||
ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid );
|
||||
sendUpstreamMsgToServer(&message->msg, message->len);
|
||||
}
|
||||
else
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "config.h"
|
||||
#include "time.h"
|
||||
#include "heartBeat.h"
|
||||
#include "close_retry.h"
|
||||
|
||||
static pthread_t processThreadId = 0;
|
||||
static unsigned int XmidtQsize = 0;
|
||||
@@ -70,6 +71,22 @@ bool highQosValueCheck(int qos)
|
||||
return false;
|
||||
}
|
||||
|
||||
//To handle high priority low qos message to confirm send success and ignore cloud ack.
|
||||
bool higherPriorityLowQosCheck(int qos)
|
||||
{
|
||||
if(qos > 20 && qos < 25)
|
||||
{
|
||||
ParodusInfo("The low qos msg with higher priority\n");
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusPrint("The qos is not higher priority low qos\n");
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
XmidtMsg * get_global_xmidthead(void)
|
||||
{
|
||||
XmidtMsg *tmp = NULL;
|
||||
@@ -122,9 +139,9 @@ void decrement_XmidtQsize()
|
||||
int checkCloudConn()
|
||||
{
|
||||
int ret = 1;
|
||||
if (!cloud_status_is_online ())
|
||||
if (get_close_retry() || !cloud_status_is_online ())
|
||||
{
|
||||
ParodusInfo("cloud status is not online, wait till connection up\n");
|
||||
ParodusInfo("close_retry is in progress or cloud status is not online, wait till connection up\n");
|
||||
|
||||
int rv;
|
||||
struct timespec ts;
|
||||
@@ -214,8 +231,15 @@ int xmidtQOptmize()
|
||||
{
|
||||
if(get_XmidtQsize() > 0 && get_XmidtQsize() == get_parodus_cfg()->max_queue_size)
|
||||
{
|
||||
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
||||
del = 2;
|
||||
if(higherPriorityLowQosCheck(tempMsg->u.event.qos))
|
||||
{
|
||||
ParodusInfo("Skip max queue size delete for qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
||||
del = 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -746,10 +770,9 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
||||
while(sendRetStatus) //If SendMessage is failed condition
|
||||
{
|
||||
ParodusError("sendXmidtEventToServer is Failed\n");
|
||||
if(highQosValueCheck(qos))
|
||||
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
|
||||
{
|
||||
ParodusPrint("The event is having high qos retry again\n");
|
||||
ParodusInfo("Wait till connection is Up\n");
|
||||
ParodusPrint("The event is having high qos retry again, wait till connection is Up\n");
|
||||
rv = checkCloudConn();
|
||||
if(rv == 2)
|
||||
{
|
||||
@@ -757,7 +780,7 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
||||
break;
|
||||
}
|
||||
ParodusInfo("Received cloud status signal proceed to retry\n");
|
||||
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -797,13 +820,26 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Low qos event, send success callback and delete\n");
|
||||
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
||||
ParodusPrint("statusMsg is %s\n",errorMsg);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
||||
//print_xmidMsg_list();
|
||||
updateXmidtState(msgnode, DELETE);
|
||||
print_xmidMsg_list();
|
||||
if(higherPriorityLowQosCheck(qos))
|
||||
{
|
||||
ParodusInfo("Higher priority low qos send success, ignore cloud ack\n");
|
||||
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
||||
printSendMsgData(errorMsg, notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
||||
ParodusPrint("update state to DELETE\n");
|
||||
updateXmidtState(msgnode, DELETE);
|
||||
print_xmidMsg_list();
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Low qos event, send success callback and delete\n");
|
||||
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
||||
ParodusPrint("statusMsg is %s\n",errorMsg);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
||||
//print_xmidMsg_list();
|
||||
updateXmidtState(msgnode, DELETE);
|
||||
print_xmidMsg_list();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1631,7 +1667,7 @@ void checkMaxQandOptimize(XmidtMsg *xmdMsg)
|
||||
wrp_msg_t * tempMsg = temp->msg;
|
||||
qos = tempMsg->u.event.qos;
|
||||
ParodusPrint("qos is %d\n", qos);
|
||||
if(highQosValueCheck(qos))
|
||||
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
|
||||
{
|
||||
ParodusPrint("High qos msg, skip delete\n");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user