Compare commits

...

21 Commits

Author SHA1 Message Date
Shilpa Seshadri
d8f9dc44a9 To fix webconfig upstream event crash 2023-01-30 19:27:01 +05:30
shilpa24balaji
34802d329d Merge pull request #394 from sadhyama/xmd_content
Add content_type validation for Xmidt method profiles
2022-05-27 23:07:53 +05:30
Sadhyama Vengilat
0297d994b5 Reduce debug log 2022-05-27 19:47:39 +05:30
Sadhyama Vengilat
ee7033c36c Add contentType list and validate 2022-05-27 12:59:38 +05:30
Sadhyama Vengilat
e35221efd2 Add content_type validation for Xmidt method profiles 2022-05-26 20:36:11 +05:30
sadhyama
c898fe6d0e Merge pull request #393 from sadhyama/def_profile
Handle Xmidt default profile events received before connection
2022-05-13 18:55:25 +05:30
Sadhyama Vengilat
0017e48f7d Remove content_type modification 2022-05-13 18:48:30 +05:30
Sadhyama Vengilat
285a461f42 Disable debug logs 2022-05-11 18:16:25 +05:30
Sadhyama Vengilat
bb0f49a8b3 Added delay on parodus init 2022-05-10 19:26:33 +05:30
Sadhyama Vengilat
ee20e1fcb8 Handle xmidt default profiles received before initial cloud connection 2022-05-09 20:31:07 +05:30
sadhyama
becacd8990 Merge pull request #392 from sadhyama/parodus_highqos
set cloud_status online after close_retry reset to fix Xmidt sendMsg failure
2022-04-20 16:13:53 +05:30
Sadhyama Vengilat
b05bfbe1fa Merge remote-tracking branch 'upstream/master' into parodus_highqos 2022-04-20 16:13:10 +05:30
sadhyama
9fb3f118b7 Merge pull request #391 from guruchandru/xmd_send
Error Number value change
2022-04-20 16:11:22 +05:30
Sadhyama Vengilat
a124aafc5f set cloud_status online after close_retry reset to fix sendMsg failure 2022-04-20 13:09:57 +05:30
Guru Chandru
0c4a2f407a Error Number value change 2022-04-18 12:25:20 +05:30
shilpa24balaji
523c9cc0d2 Merge pull request #390 from guruchandru/xmd_send
Changes to Send Success and Error ACK
2022-04-13 18:51:33 +05:30
Guru Chandru
58492950d6 Review comment changes 2022-04-13 13:18:21 +05:30
Guru Chandru
29906511b8 Changes to Send ACK for Success and Error Case 2022-04-12 17:56:34 +05:30
shilpa24balaji
c6fa4b2ce8 Merge pull request #388 from shilpa24balaji/wan_failover
Reconnect to cloud on every wan failover event
2022-04-12 16:43:59 +05:30
sadhyama
0640f3516f Merge pull request #386 from xmidt-org/xmidt_send
Xmidt send RBUS method to send events upstream
2022-04-08 20:35:23 +05:30
Shilpa Seshadri
13309a737e Reconnect to cloud on every wan failover event 2022-04-07 12:40:21 +05:30
8 changed files with 86 additions and 38 deletions

View File

@@ -143,7 +143,7 @@ void reset_cloud_disconnect_reason(ParodusCfg *cfg);
char *getWebpaInterface(void);
void set_cloud_status(char *status);
char *get_cloud_status(void);
int get_parodus_init();
/**
* parse a webpa url. Extract the server address, the port
* and return whether it's secure or not

View File

@@ -113,10 +113,6 @@ void createSocketConnection(void (* initKeypress)())
UpStreamMsgQ = NULL;
StartThread(handle_upstream, &upstream_tid);
StartThread(processUpstreamMessage, &upstream_msg_tid);
#ifdef ENABLE_WEBCFGBIN
subscribeRBUSevent();
regXmidtSendDataMethod();
#endif
#ifdef WAN_FAILOVER_SUPPORTED
subscribeCurrentActiveInterfaceEvent();
#endif

View File

@@ -132,7 +132,10 @@ void set_cloud_disconnect_time(int disconnTime)
cloud_disconnect_max_time = disconnTime;
}
int get_parodus_init()
{
return init;
}
//--------------------------------------------------------------------
// createNopollConnection_logic:
@@ -772,7 +775,7 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
}
#endif
}
if(conn_ctx.current_server->allow_insecure <= 0)
{
ParodusInfo("Connected to server over SSL\n");
@@ -783,9 +786,6 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
ParodusInfo("Connected to server\n");
OnboardLog("Connected to server\n");
}
set_cloud_status(CLOUD_STATUS_ONLINE);
ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status());
/* On initial connect success, invoke conn status change event as "success" */
if((NULL != on_conn_status_change) && init)
@@ -817,6 +817,9 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
ParodusPrint("LastReasonStatus reset after successful connection\n");
setMessageHandlers();
stop_conn_in_progress ();
ParodusPrint("set cloud_status\n");
set_cloud_status(CLOUD_STATUS_ONLINE);
ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status());
return nopoll_true;
}

View File

@@ -106,6 +106,10 @@ int main( int argc, char **argv)
ParodusInfo("********** Starting component: Parodus **********\n ");
drop_root_privilege();
#ifdef ENABLE_WEBCFGBIN
subscribeRBUSevent();
regXmidtSendDataMethod();
#endif
setDefaultValuesToCfg(cfg);
if (0 != parseCommandLine(argc,argv,cfg)) {
abort();

View File

@@ -40,7 +40,7 @@
/*----------------------------------------------------------------------------*/
void *metadataPack;
size_t metaPackSize=-1;
size_t metaPackSize=0;
UpStreamMsg *UpStreamMsgQ = NULL;

View File

@@ -27,6 +27,9 @@
#include "upstream.h"
#include "ParodusInternal.h"
#include "partners_check.h"
#include "close_retry.h"
#include "connection.h"
#include "heartBeat.h"
#define WEBCFG_UPSTREAM_EVENT "Webconfig.Upstream"
#ifdef WAN_FAILOVER_SUPPORTED
@@ -177,7 +180,20 @@ void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rb
setWebpaInterface(interface);
}
if(newValue !=NULL && oldValue!=NULL && interface!=NULL) {
ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface);
ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface);
// If interface is already down then reset it and reconnect cloud conn as wan failover event is received
if(get_interface_down_event())
{
reset_interface_down_event();
ParodusInfo("Interface_down_event is reset\n");
resume_heartBeatTimer();
}
// Close cloud conn and reconnect with the new interface as wan failover event is received
set_global_reconnect_reason("WAN_FAILOVER");
set_global_reconnect_status(true);
set_close_retry();
}
}
#endif

View File

@@ -38,6 +38,13 @@ pthread_mutex_t xmidt_mut=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t xmidt_con=PTHREAD_COND_INITIALIZER;
const char * contentTypeList[]={
"application/json",
"avro/binary",
"application/msgpack",
"application/binary"
};
bool highQosValueCheck(int qos)
{
if(qos > 24)
@@ -64,7 +71,7 @@ void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
{
char * errorMsg = strdup("Max Queue Size Exceeded");
ParodusError("Queue Size Exceeded\n");
createOutParamsandSendAck(msg, asyncHandle, errorMsg , QUEUE_SIZE_EXCEEDED);
createOutParamsandSendAck(msg, asyncHandle, errorMsg , QUEUE_SIZE_EXCEEDED, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
wrp_free_struct(msg);
return;
}
@@ -105,7 +112,7 @@ void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
{
char * errorMsg = strdup("Unable to enqueue");
ParodusError("failure in allocation for xmidt message\n");
createOutParamsandSendAck(msg, asyncHandle, errorMsg , ENQUEUE_FAILURE);
createOutParamsandSendAck(msg, asyncHandle, errorMsg , ENQUEUE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
wrp_free_struct(msg);
}
return;
@@ -133,6 +140,14 @@ void* processXmidtUpstreamMsg()
int rv = 0;
while(FOREVER())
{
if(get_parodus_init())
{
ParodusInfo("Initial cloud connection is not established, Xmidt wait till connection up\n");
pthread_mutex_lock(get_global_cloud_status_mut());
pthread_cond_wait(get_global_cloud_status_cond(), get_global_cloud_status_mut());
pthread_mutex_unlock(get_global_cloud_status_mut());
ParodusInfo("Received cloud status signal proceed to event processing\n");
}
pthread_mutex_lock (&xmidt_mut);
ParodusPrint("mutex lock in xmidt consumer thread\n");
if(XmidtMsgQ != NULL)
@@ -182,7 +197,7 @@ int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
{
ParodusError("xmidtMsg is NULL\n");
errorMsg = strdup("Unable to enqueue");
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg, ENQUEUE_FAILURE);
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg, ENQUEUE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
xmidtQDequeue();
return rv;
}
@@ -198,7 +213,7 @@ int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
else
{
ParodusError("validation failed, send failure ack\n");
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg , statuscode);
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg , statuscode, RBUS_ERROR_INVALID_INPUT);
xmidtQDequeue();
}
return rv;
@@ -259,6 +274,28 @@ int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode)
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
return 0;
}
else
{
int i =0, count = 0, valid = 0;
count = sizeof(contentTypeList)/sizeof(contentTypeList[0]);
for(i = 0; i<count; i++)
{
if (strcmp(eventMsg->u.event.content_type, contentTypeList[i]) == 0)
{
ParodusPrint("content_type is valid %s\n", contentTypeList[i]);
valid = 1;
break;
}
}
if (!valid)
{
ParodusError("content_type is not valid, %s\n", eventMsg->u.event.content_type);
*errorMsg = strdup("Invalid content_type");
*statusCode = INVALID_CONTENT_TYPE;
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
return 0;
}
}
if(eventMsg->u.event.payload == NULL)
{
@@ -338,11 +375,8 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
if(msg->u.event.content_type != NULL)
{
if(strcmp(msg->u.event.content_type , "JSON") == 0)
{
notif_wrp_msg->u.event.content_type = strdup("application/json");
}
ParodusPrint("content_type is %s\n",notif_wrp_msg->u.event.content_type);
notif_wrp_msg->u.event.content_type = msg->u.event.content_type;
ParodusInfo("content_type is %s\n",notif_wrp_msg->u.event.content_type);
}
if(msg->u.event.payload != NULL)
@@ -371,7 +405,7 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
{
ParodusError("wrp msg_len is zero\n");
errorMsg = strdup("Wrp message encoding failed");
createOutParamsandSendAck(msg, asyncHandle, errorMsg, WRP_ENCODE_FAILURE);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, WRP_ENCODE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
xmidtQDequeue();
ParodusPrint("wrp_free_struct\n");
@@ -405,7 +439,7 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
{
errorMsg = strdup("send failed due to client disconnect");
ParodusInfo("The event is having low qos proceed to dequeue\n");
createOutParamsandSendAck(msg, asyncHandle, errorMsg, CLIENT_DISCONNECT);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, CLIENT_DISCONNECT, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
xmidtQDequeue();
break;
}
@@ -415,7 +449,7 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
if(sendRetStatus == 0)
{
errorMsg = strdup("send to server success");
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, RBUS_ERROR_SUCCESS);
xmidtQDequeue();
}
@@ -436,7 +470,7 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
{
errorMsg = strdup("Memory allocation failed");
ParodusError("Memory allocation failed\n");
createOutParamsandSendAck(msg, asyncHandle, errorMsg, MSG_PROCESSING_FAILED);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, MSG_PROCESSING_FAILED, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
xmidtQDequeue();
}
@@ -445,14 +479,9 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
free(msg->u.event.source);
msg->u.event.source = NULL;
}
if(msg->u.event.content_type !=NULL)
{
free(msg->u.event.content_type);
msg->u.event.content_type = NULL;
}
}
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode)
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, rbusError_t error)
{
rbusObject_t outParams;
rbusError_t err;
@@ -533,8 +562,7 @@ void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHand
return;
}
err = rbusMethod_SendAsyncResponse(asyncHandle, RBUS_ERROR_SUCCESS, outParams);
//err = rbusMethod_SendAsyncResponse(asyncHandle, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION, outParams); //for negative case
err = rbusMethod_SendAsyncResponse(asyncHandle, error, outParams);
if(err != RBUS_ERROR_SUCCESS)
{

View File

@@ -51,11 +51,12 @@ typedef enum
MISSING_CONTENT_TYPE,
MISSING_PAYLOAD,
MISSING_PAYLOADLEN,
QUEUE_SIZE_EXCEEDED,
WRP_ENCODE_FAILURE,
MSG_PROCESSING_FAILED,
INVALID_CONTENT_TYPE,
ENQUEUE_FAILURE = 100,
CLIENT_DISCONNECT = 101
CLIENT_DISCONNECT = 101,
QUEUE_SIZE_EXCEEDED = 102,
WRP_ENCODE_FAILURE = 103,
MSG_PROCESSING_FAILED = 104
} XMIDT_STATUS;
/*----------------------------------------------------------------------------*/
/* Function Prototypes */
@@ -70,7 +71,7 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
int checkInputParameters(rbusObject_t inParams);
char* generate_transaction_uuid();
void parseRbusInparamsToWrp(rbusObject_t inParams, char *trans_id, wrp_msg_t **eventMsg);
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode);
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, rbusError_t error);
int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode);
void xmidtQDequeue();
bool highQosValueCheck(int qos);