Compare commits

..

22 Commits

Author SHA1 Message Date
shilpa24balaji
9abb37a005 Merge pull request #398 from Thanusha-D/log_change
Added NULL checks in value change event & Logging transaction_uuid in msg send
2022-06-17 12:17:47 +05:30
Thanusha D
c0edaaf31c Added logs for debugging 2022-06-16 21:16:00 +05:30
shilpa24balaji
34802d329d Merge pull request #394 from sadhyama/xmd_content
Add content_type validation for Xmidt method profiles
2022-05-27 23:07:53 +05:30
Sadhyama Vengilat
0297d994b5 Reduce debug log 2022-05-27 19:47:39 +05:30
Sadhyama Vengilat
ee7033c36c Add contentType list and validate 2022-05-27 12:59:38 +05:30
Sadhyama Vengilat
e35221efd2 Add content_type validation for Xmidt method profiles 2022-05-26 20:36:11 +05:30
sadhyama
c898fe6d0e Merge pull request #393 from sadhyama/def_profile
Handle Xmidt default profile events received before connection
2022-05-13 18:55:25 +05:30
Sadhyama Vengilat
0017e48f7d Remove content_type modification 2022-05-13 18:48:30 +05:30
Sadhyama Vengilat
285a461f42 Disable debug logs 2022-05-11 18:16:25 +05:30
Sadhyama Vengilat
bb0f49a8b3 Added delay on parodus init 2022-05-10 19:26:33 +05:30
Sadhyama Vengilat
ee20e1fcb8 Handle xmidt default profiles received before initial cloud connection 2022-05-09 20:31:07 +05:30
sadhyama
becacd8990 Merge pull request #392 from sadhyama/parodus_highqos
set cloud_status online after close_retry reset to fix Xmidt sendMsg failure
2022-04-20 16:13:53 +05:30
Sadhyama Vengilat
b05bfbe1fa Merge remote-tracking branch 'upstream/master' into parodus_highqos 2022-04-20 16:13:10 +05:30
sadhyama
9fb3f118b7 Merge pull request #391 from guruchandru/xmd_send
Error Number value change
2022-04-20 16:11:22 +05:30
Sadhyama Vengilat
a124aafc5f set cloud_status online after close_retry reset to fix sendMsg failure 2022-04-20 13:09:57 +05:30
Guru Chandru
0c4a2f407a Error Number value change 2022-04-18 12:25:20 +05:30
shilpa24balaji
523c9cc0d2 Merge pull request #390 from guruchandru/xmd_send
Changes to Send Success and Error ACK
2022-04-13 18:51:33 +05:30
Guru Chandru
58492950d6 Review comment changes 2022-04-13 13:18:21 +05:30
Guru Chandru
29906511b8 Changes to Send ACK for Success and Error Case 2022-04-12 17:56:34 +05:30
shilpa24balaji
c6fa4b2ce8 Merge pull request #388 from shilpa24balaji/wan_failover
Reconnect to cloud on every wan failover event
2022-04-12 16:43:59 +05:30
sadhyama
0640f3516f Merge pull request #386 from xmidt-org/xmidt_send
Xmidt send RBUS method to send events upstream
2022-04-08 20:35:23 +05:30
Shilpa Seshadri
13309a737e Reconnect to cloud on every wan failover event 2022-04-07 12:40:21 +05:30
7 changed files with 111 additions and 43 deletions

View File

@@ -143,7 +143,7 @@ void reset_cloud_disconnect_reason(ParodusCfg *cfg);
char *getWebpaInterface(void);
void set_cloud_status(char *status);
char *get_cloud_status(void);
int get_parodus_init();
/**
* parse a webpa url. Extract the server address, the port
* and return whether it's secure or not

View File

@@ -113,10 +113,6 @@ void createSocketConnection(void (* initKeypress)())
UpStreamMsgQ = NULL;
StartThread(handle_upstream, &upstream_tid);
StartThread(processUpstreamMessage, &upstream_msg_tid);
#ifdef ENABLE_WEBCFGBIN
subscribeRBUSevent();
regXmidtSendDataMethod();
#endif
#ifdef WAN_FAILOVER_SUPPORTED
subscribeCurrentActiveInterfaceEvent();
#endif

View File

@@ -132,7 +132,10 @@ void set_cloud_disconnect_time(int disconnTime)
cloud_disconnect_max_time = disconnTime;
}
int get_parodus_init()
{
return init;
}
//--------------------------------------------------------------------
// createNopollConnection_logic:
@@ -772,7 +775,7 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
}
#endif
}
if(conn_ctx.current_server->allow_insecure <= 0)
{
ParodusInfo("Connected to server over SSL\n");
@@ -783,9 +786,6 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
ParodusInfo("Connected to server\n");
OnboardLog("Connected to server\n");
}
set_cloud_status(CLOUD_STATUS_ONLINE);
ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status());
/* On initial connect success, invoke conn status change event as "success" */
if((NULL != on_conn_status_change) && init)
@@ -817,6 +817,9 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
ParodusPrint("LastReasonStatus reset after successful connection\n");
setMessageHandlers();
stop_conn_in_progress ();
ParodusPrint("set cloud_status\n");
set_cloud_status(CLOUD_STATUS_ONLINE);
ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status());
return nopoll_true;
}

View File

@@ -106,6 +106,10 @@ int main( int argc, char **argv)
ParodusInfo("********** Starting component: Parodus **********\n ");
drop_root_privilege();
#ifdef ENABLE_WEBCFGBIN
subscribeRBUSevent();
regXmidtSendDataMethod();
#endif
setDefaultValuesToCfg(cfg);
if (0 != parseCommandLine(argc,argv,cfg)) {
abort();

View File

@@ -27,6 +27,9 @@
#include "upstream.h"
#include "ParodusInternal.h"
#include "partners_check.h"
#include "close_retry.h"
#include "connection.h"
#include "heartBeat.h"
#define WEBCFG_UPSTREAM_EVENT "Webconfig.Upstream"
#ifdef WAN_FAILOVER_SUPPORTED
@@ -171,13 +174,38 @@ void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rb
rbusValue_t newValue = rbusObject_GetValue(event->data, "value");
rbusValue_t oldValue = rbusObject_GetValue(event->data, "oldValue");
ParodusInfo("Consumer received ValueChange event for param %s\n", event->name);
if(newValue) {
interface = (char *) rbusValue_GetString(newValue, NULL);
setWebpaInterface(interface);
}
}
else {
ParodusError("newValue is NULL\n");
}
if(newValue !=NULL && oldValue!=NULL && interface!=NULL) {
ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface);
ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface);
// If interface is already down then reset it and reconnect cloud conn as wan failover event is received
if(get_interface_down_event())
{
reset_interface_down_event();
ParodusInfo("Interface_down_event is reset\n");
resume_heartBeatTimer();
}
// Close cloud conn and reconnect with the new interface as wan failover event is received
set_global_reconnect_reason("WAN_FAILOVER");
set_global_reconnect_status(true);
set_close_retry();
}
else {
if(oldValue == NULL) {
ParodusError("oldValue is NULL\n");
}
if(interface == NULL) {
ParodusError("interface is NULL\n");
}
}
}
#endif

View File

@@ -38,6 +38,17 @@ pthread_mutex_t xmidt_mut=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t xmidt_con=PTHREAD_COND_INITIALIZER;
const char * contentTypeList[]={
"application/json",
"avro/binary",
"application/msgpack",
"application/binary"
};
void printSendMsgData(char* status, int qos, char* dest, char* transaction_uuid) {
ParodusInfo("status: %s, qos: %d, dest: %s, transaction_uuid: %s\n", (status!=NULL)?status:"NULL", qos, (dest!=NULL)?dest:"NULL", (transaction_uuid!=NULL)?transaction_uuid:"NULL");
}
bool highQosValueCheck(int qos)
{
if(qos > 24)
@@ -64,7 +75,7 @@ void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
{
char * errorMsg = strdup("Max Queue Size Exceeded");
ParodusError("Queue Size Exceeded\n");
createOutParamsandSendAck(msg, asyncHandle, errorMsg , QUEUE_SIZE_EXCEEDED);
createOutParamsandSendAck(msg, asyncHandle, errorMsg , QUEUE_SIZE_EXCEEDED, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
wrp_free_struct(msg);
return;
}
@@ -105,7 +116,7 @@ void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
{
char * errorMsg = strdup("Unable to enqueue");
ParodusError("failure in allocation for xmidt message\n");
createOutParamsandSendAck(msg, asyncHandle, errorMsg , ENQUEUE_FAILURE);
createOutParamsandSendAck(msg, asyncHandle, errorMsg , ENQUEUE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
wrp_free_struct(msg);
}
return;
@@ -133,6 +144,14 @@ void* processXmidtUpstreamMsg()
int rv = 0;
while(FOREVER())
{
if(get_parodus_init())
{
ParodusInfo("Initial cloud connection is not established, Xmidt wait till connection up\n");
pthread_mutex_lock(get_global_cloud_status_mut());
pthread_cond_wait(get_global_cloud_status_cond(), get_global_cloud_status_mut());
pthread_mutex_unlock(get_global_cloud_status_mut());
ParodusInfo("Received cloud status signal proceed to event processing\n");
}
pthread_mutex_lock (&xmidt_mut);
ParodusPrint("mutex lock in xmidt consumer thread\n");
if(XmidtMsgQ != NULL)
@@ -182,7 +201,7 @@ int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
{
ParodusError("xmidtMsg is NULL\n");
errorMsg = strdup("Unable to enqueue");
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg, ENQUEUE_FAILURE);
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg, ENQUEUE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
xmidtQDequeue();
return rv;
}
@@ -198,7 +217,7 @@ int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
else
{
ParodusError("validation failed, send failure ack\n");
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg , statuscode);
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg , statuscode, RBUS_ERROR_INVALID_INPUT);
xmidtQDequeue();
}
return rv;
@@ -259,6 +278,28 @@ int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode)
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
return 0;
}
else
{
int i =0, count = 0, valid = 0;
count = sizeof(contentTypeList)/sizeof(contentTypeList[0]);
for(i = 0; i<count; i++)
{
if (strcmp(eventMsg->u.event.content_type, contentTypeList[i]) == 0)
{
ParodusPrint("content_type is valid %s\n", contentTypeList[i]);
valid = 1;
break;
}
}
if (!valid)
{
ParodusError("content_type is not valid, %s\n", eventMsg->u.event.content_type);
*errorMsg = strdup("Invalid content_type");
*statusCode = INVALID_CONTENT_TYPE;
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
return 0;
}
}
if(eventMsg->u.event.payload == NULL)
{
@@ -338,11 +379,8 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
if(msg->u.event.content_type != NULL)
{
if(strcmp(msg->u.event.content_type , "JSON") == 0)
{
notif_wrp_msg->u.event.content_type = strdup("application/json");
}
ParodusPrint("content_type is %s\n",notif_wrp_msg->u.event.content_type);
notif_wrp_msg->u.event.content_type = msg->u.event.content_type;
ParodusInfo("content_type is %s\n",notif_wrp_msg->u.event.content_type);
}
if(msg->u.event.payload != NULL)
@@ -365,13 +403,14 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
if(msg_len > 0)
{
ParodusPrint("sendUpstreamMsgToServer\n");
printSendMsgData("send to server", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
sendRetStatus = sendUpstreamMsgToServer(&msg_bytes, msg_len);
}
else
{
ParodusError("wrp msg_len is zero\n");
errorMsg = strdup("Wrp message encoding failed");
createOutParamsandSendAck(msg, asyncHandle, errorMsg, WRP_ENCODE_FAILURE);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, WRP_ENCODE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
xmidtQDequeue();
ParodusPrint("wrp_free_struct\n");
@@ -395,27 +434,29 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
{
ParodusPrint("The event is having high qos retry again\n");
ParodusInfo("Wait till connection is Up\n");
pthread_mutex_lock(get_global_cloud_status_mut());
pthread_mutex_lock(get_global_cloud_status_mut());
pthread_cond_wait(get_global_cloud_status_cond(), get_global_cloud_status_mut());
pthread_mutex_unlock(get_global_cloud_status_mut());
ParodusInfo("Received cloud status signal proceed to retry\n");
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
}
else
{
errorMsg = strdup("send failed due to client disconnect");
ParodusInfo("The event is having low qos proceed to dequeue\n");
createOutParamsandSendAck(msg, asyncHandle, errorMsg, CLIENT_DISCONNECT);
printSendMsgData(errorMsg, notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, CLIENT_DISCONNECT, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
xmidtQDequeue();
break;
}
}
sendRetStatus = sendUpstreamMsgToServer(&msg_bytes, msg_len);
}
if(sendRetStatus == 0)
{
errorMsg = strdup("send to server success");
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, RBUS_ERROR_SUCCESS);
xmidtQDequeue();
}
@@ -436,7 +477,7 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
{
errorMsg = strdup("Memory allocation failed");
ParodusError("Memory allocation failed\n");
createOutParamsandSendAck(msg, asyncHandle, errorMsg, MSG_PROCESSING_FAILED);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, MSG_PROCESSING_FAILED, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
xmidtQDequeue();
}
@@ -445,14 +486,9 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
free(msg->u.event.source);
msg->u.event.source = NULL;
}
if(msg->u.event.content_type !=NULL)
{
free(msg->u.event.content_type);
msg->u.event.content_type = NULL;
}
}
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode)
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, rbusError_t error)
{
rbusObject_t outParams;
rbusError_t err;
@@ -533,8 +569,7 @@ void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHand
return;
}
err = rbusMethod_SendAsyncResponse(asyncHandle, RBUS_ERROR_SUCCESS, outParams);
//err = rbusMethod_SendAsyncResponse(asyncHandle, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION, outParams); //for negative case
err = rbusMethod_SendAsyncResponse(asyncHandle, error, outParams);
if(err != RBUS_ERROR_SUCCESS)
{

View File

@@ -51,11 +51,12 @@ typedef enum
MISSING_CONTENT_TYPE,
MISSING_PAYLOAD,
MISSING_PAYLOADLEN,
QUEUE_SIZE_EXCEEDED,
WRP_ENCODE_FAILURE,
MSG_PROCESSING_FAILED,
INVALID_CONTENT_TYPE,
ENQUEUE_FAILURE = 100,
CLIENT_DISCONNECT = 101
CLIENT_DISCONNECT = 101,
QUEUE_SIZE_EXCEEDED = 102,
WRP_ENCODE_FAILURE = 103,
MSG_PROCESSING_FAILED = 104
} XMIDT_STATUS;
/*----------------------------------------------------------------------------*/
/* Function Prototypes */
@@ -70,9 +71,10 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
int checkInputParameters(rbusObject_t inParams);
char* generate_transaction_uuid();
void parseRbusInparamsToWrp(rbusObject_t inParams, char *trans_id, wrp_msg_t **eventMsg);
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode);
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, rbusError_t error);
int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode);
void xmidtQDequeue();
void printSendMsgData(char* status, int qos, char* dest, char* transaction_uuid);
bool highQosValueCheck(int qos);
void waitTillConnectionIsUp();
void printRBUSParams(rbusObject_t params, char* file_path);