mirror of
https://github.com/outbackdingo/parodus.git
synced 2026-01-27 18:20:04 +00:00
Compare commits
22 Commits
xmidt_send
...
2.0.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9abb37a005 | ||
|
|
c0edaaf31c | ||
|
|
34802d329d | ||
|
|
0297d994b5 | ||
|
|
ee7033c36c | ||
|
|
e35221efd2 | ||
|
|
c898fe6d0e | ||
|
|
0017e48f7d | ||
|
|
285a461f42 | ||
|
|
bb0f49a8b3 | ||
|
|
ee20e1fcb8 | ||
|
|
becacd8990 | ||
|
|
b05bfbe1fa | ||
|
|
9fb3f118b7 | ||
|
|
a124aafc5f | ||
|
|
0c4a2f407a | ||
|
|
523c9cc0d2 | ||
|
|
58492950d6 | ||
|
|
29906511b8 | ||
|
|
c6fa4b2ce8 | ||
|
|
0640f3516f | ||
|
|
13309a737e |
@@ -143,7 +143,7 @@ void reset_cloud_disconnect_reason(ParodusCfg *cfg);
|
||||
char *getWebpaInterface(void);
|
||||
void set_cloud_status(char *status);
|
||||
char *get_cloud_status(void);
|
||||
|
||||
int get_parodus_init();
|
||||
/**
|
||||
* parse a webpa url. Extract the server address, the port
|
||||
* and return whether it's secure or not
|
||||
|
||||
@@ -113,10 +113,6 @@ void createSocketConnection(void (* initKeypress)())
|
||||
UpStreamMsgQ = NULL;
|
||||
StartThread(handle_upstream, &upstream_tid);
|
||||
StartThread(processUpstreamMessage, &upstream_msg_tid);
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
subscribeRBUSevent();
|
||||
regXmidtSendDataMethod();
|
||||
#endif
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
subscribeCurrentActiveInterfaceEvent();
|
||||
#endif
|
||||
|
||||
@@ -132,7 +132,10 @@ void set_cloud_disconnect_time(int disconnTime)
|
||||
cloud_disconnect_max_time = disconnTime;
|
||||
}
|
||||
|
||||
|
||||
int get_parodus_init()
|
||||
{
|
||||
return init;
|
||||
}
|
||||
//--------------------------------------------------------------------
|
||||
// createNopollConnection_logic:
|
||||
|
||||
@@ -772,7 +775,7 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
if(conn_ctx.current_server->allow_insecure <= 0)
|
||||
{
|
||||
ParodusInfo("Connected to server over SSL\n");
|
||||
@@ -783,9 +786,6 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
|
||||
ParodusInfo("Connected to server\n");
|
||||
OnboardLog("Connected to server\n");
|
||||
}
|
||||
|
||||
set_cloud_status(CLOUD_STATUS_ONLINE);
|
||||
ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status());
|
||||
|
||||
/* On initial connect success, invoke conn status change event as "success" */
|
||||
if((NULL != on_conn_status_change) && init)
|
||||
@@ -817,6 +817,9 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
|
||||
ParodusPrint("LastReasonStatus reset after successful connection\n");
|
||||
setMessageHandlers();
|
||||
stop_conn_in_progress ();
|
||||
ParodusPrint("set cloud_status\n");
|
||||
set_cloud_status(CLOUD_STATUS_ONLINE);
|
||||
ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status());
|
||||
return nopoll_true;
|
||||
}
|
||||
|
||||
|
||||
@@ -106,6 +106,10 @@ int main( int argc, char **argv)
|
||||
|
||||
ParodusInfo("********** Starting component: Parodus **********\n ");
|
||||
drop_root_privilege();
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
subscribeRBUSevent();
|
||||
regXmidtSendDataMethod();
|
||||
#endif
|
||||
setDefaultValuesToCfg(cfg);
|
||||
if (0 != parseCommandLine(argc,argv,cfg)) {
|
||||
abort();
|
||||
|
||||
@@ -27,6 +27,9 @@
|
||||
#include "upstream.h"
|
||||
#include "ParodusInternal.h"
|
||||
#include "partners_check.h"
|
||||
#include "close_retry.h"
|
||||
#include "connection.h"
|
||||
#include "heartBeat.h"
|
||||
|
||||
#define WEBCFG_UPSTREAM_EVENT "Webconfig.Upstream"
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
@@ -171,13 +174,38 @@ void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rb
|
||||
rbusValue_t newValue = rbusObject_GetValue(event->data, "value");
|
||||
rbusValue_t oldValue = rbusObject_GetValue(event->data, "oldValue");
|
||||
ParodusInfo("Consumer received ValueChange event for param %s\n", event->name);
|
||||
|
||||
|
||||
if(newValue) {
|
||||
interface = (char *) rbusValue_GetString(newValue, NULL);
|
||||
setWebpaInterface(interface);
|
||||
}
|
||||
}
|
||||
else {
|
||||
ParodusError("newValue is NULL\n");
|
||||
}
|
||||
|
||||
if(newValue !=NULL && oldValue!=NULL && interface!=NULL) {
|
||||
ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface);
|
||||
ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface);
|
||||
|
||||
// If interface is already down then reset it and reconnect cloud conn as wan failover event is received
|
||||
if(get_interface_down_event())
|
||||
{
|
||||
reset_interface_down_event();
|
||||
ParodusInfo("Interface_down_event is reset\n");
|
||||
resume_heartBeatTimer();
|
||||
}
|
||||
// Close cloud conn and reconnect with the new interface as wan failover event is received
|
||||
set_global_reconnect_reason("WAN_FAILOVER");
|
||||
set_global_reconnect_status(true);
|
||||
set_close_retry();
|
||||
|
||||
}
|
||||
else {
|
||||
if(oldValue == NULL) {
|
||||
ParodusError("oldValue is NULL\n");
|
||||
}
|
||||
if(interface == NULL) {
|
||||
ParodusError("interface is NULL\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -38,6 +38,17 @@ pthread_mutex_t xmidt_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
pthread_cond_t xmidt_con=PTHREAD_COND_INITIALIZER;
|
||||
|
||||
const char * contentTypeList[]={
|
||||
"application/json",
|
||||
"avro/binary",
|
||||
"application/msgpack",
|
||||
"application/binary"
|
||||
};
|
||||
|
||||
void printSendMsgData(char* status, int qos, char* dest, char* transaction_uuid) {
|
||||
ParodusInfo("status: %s, qos: %d, dest: %s, transaction_uuid: %s\n", (status!=NULL)?status:"NULL", qos, (dest!=NULL)?dest:"NULL", (transaction_uuid!=NULL)?transaction_uuid:"NULL");
|
||||
}
|
||||
|
||||
bool highQosValueCheck(int qos)
|
||||
{
|
||||
if(qos > 24)
|
||||
@@ -64,7 +75,7 @@ void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
|
||||
{
|
||||
char * errorMsg = strdup("Max Queue Size Exceeded");
|
||||
ParodusError("Queue Size Exceeded\n");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg , QUEUE_SIZE_EXCEEDED);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg , QUEUE_SIZE_EXCEEDED, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
||||
wrp_free_struct(msg);
|
||||
return;
|
||||
}
|
||||
@@ -105,7 +116,7 @@ void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
|
||||
{
|
||||
char * errorMsg = strdup("Unable to enqueue");
|
||||
ParodusError("failure in allocation for xmidt message\n");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg , ENQUEUE_FAILURE);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg , ENQUEUE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
||||
wrp_free_struct(msg);
|
||||
}
|
||||
return;
|
||||
@@ -133,6 +144,14 @@ void* processXmidtUpstreamMsg()
|
||||
int rv = 0;
|
||||
while(FOREVER())
|
||||
{
|
||||
if(get_parodus_init())
|
||||
{
|
||||
ParodusInfo("Initial cloud connection is not established, Xmidt wait till connection up\n");
|
||||
pthread_mutex_lock(get_global_cloud_status_mut());
|
||||
pthread_cond_wait(get_global_cloud_status_cond(), get_global_cloud_status_mut());
|
||||
pthread_mutex_unlock(get_global_cloud_status_mut());
|
||||
ParodusInfo("Received cloud status signal proceed to event processing\n");
|
||||
}
|
||||
pthread_mutex_lock (&xmidt_mut);
|
||||
ParodusPrint("mutex lock in xmidt consumer thread\n");
|
||||
if(XmidtMsgQ != NULL)
|
||||
@@ -182,7 +201,7 @@ int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
|
||||
{
|
||||
ParodusError("xmidtMsg is NULL\n");
|
||||
errorMsg = strdup("Unable to enqueue");
|
||||
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg, ENQUEUE_FAILURE);
|
||||
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg, ENQUEUE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
||||
xmidtQDequeue();
|
||||
return rv;
|
||||
}
|
||||
@@ -198,7 +217,7 @@ int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
|
||||
else
|
||||
{
|
||||
ParodusError("validation failed, send failure ack\n");
|
||||
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg , statuscode);
|
||||
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg , statuscode, RBUS_ERROR_INVALID_INPUT);
|
||||
xmidtQDequeue();
|
||||
}
|
||||
return rv;
|
||||
@@ -259,6 +278,28 @@ int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode)
|
||||
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
int i =0, count = 0, valid = 0;
|
||||
count = sizeof(contentTypeList)/sizeof(contentTypeList[0]);
|
||||
for(i = 0; i<count; i++)
|
||||
{
|
||||
if (strcmp(eventMsg->u.event.content_type, contentTypeList[i]) == 0)
|
||||
{
|
||||
ParodusPrint("content_type is valid %s\n", contentTypeList[i]);
|
||||
valid = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!valid)
|
||||
{
|
||||
ParodusError("content_type is not valid, %s\n", eventMsg->u.event.content_type);
|
||||
*errorMsg = strdup("Invalid content_type");
|
||||
*statusCode = INVALID_CONTENT_TYPE;
|
||||
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if(eventMsg->u.event.payload == NULL)
|
||||
{
|
||||
@@ -338,11 +379,8 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
|
||||
|
||||
if(msg->u.event.content_type != NULL)
|
||||
{
|
||||
if(strcmp(msg->u.event.content_type , "JSON") == 0)
|
||||
{
|
||||
notif_wrp_msg->u.event.content_type = strdup("application/json");
|
||||
}
|
||||
ParodusPrint("content_type is %s\n",notif_wrp_msg->u.event.content_type);
|
||||
notif_wrp_msg->u.event.content_type = msg->u.event.content_type;
|
||||
ParodusInfo("content_type is %s\n",notif_wrp_msg->u.event.content_type);
|
||||
}
|
||||
|
||||
if(msg->u.event.payload != NULL)
|
||||
@@ -365,13 +403,14 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
|
||||
if(msg_len > 0)
|
||||
{
|
||||
ParodusPrint("sendUpstreamMsgToServer\n");
|
||||
printSendMsgData("send to server", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
sendRetStatus = sendUpstreamMsgToServer(&msg_bytes, msg_len);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("wrp msg_len is zero\n");
|
||||
errorMsg = strdup("Wrp message encoding failed");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, WRP_ENCODE_FAILURE);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, WRP_ENCODE_FAILURE, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
||||
xmidtQDequeue();
|
||||
|
||||
ParodusPrint("wrp_free_struct\n");
|
||||
@@ -395,27 +434,29 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
|
||||
{
|
||||
ParodusPrint("The event is having high qos retry again\n");
|
||||
ParodusInfo("Wait till connection is Up\n");
|
||||
|
||||
pthread_mutex_lock(get_global_cloud_status_mut());
|
||||
|
||||
pthread_mutex_lock(get_global_cloud_status_mut());
|
||||
pthread_cond_wait(get_global_cloud_status_cond(), get_global_cloud_status_mut());
|
||||
pthread_mutex_unlock(get_global_cloud_status_mut());
|
||||
ParodusInfo("Received cloud status signal proceed to retry\n");
|
||||
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
}
|
||||
else
|
||||
{
|
||||
errorMsg = strdup("send failed due to client disconnect");
|
||||
ParodusInfo("The event is having low qos proceed to dequeue\n");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, CLIENT_DISCONNECT);
|
||||
printSendMsgData(errorMsg, notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, CLIENT_DISCONNECT, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
||||
xmidtQDequeue();
|
||||
break;
|
||||
}
|
||||
}
|
||||
sendRetStatus = sendUpstreamMsgToServer(&msg_bytes, msg_len);
|
||||
}
|
||||
|
||||
|
||||
if(sendRetStatus == 0)
|
||||
{
|
||||
errorMsg = strdup("send to server success");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, RBUS_ERROR_SUCCESS);
|
||||
xmidtQDequeue();
|
||||
}
|
||||
|
||||
@@ -436,7 +477,7 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
|
||||
{
|
||||
errorMsg = strdup("Memory allocation failed");
|
||||
ParodusError("Memory allocation failed\n");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, MSG_PROCESSING_FAILED);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, MSG_PROCESSING_FAILED, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
||||
xmidtQDequeue();
|
||||
}
|
||||
|
||||
@@ -445,14 +486,9 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
|
||||
free(msg->u.event.source);
|
||||
msg->u.event.source = NULL;
|
||||
}
|
||||
if(msg->u.event.content_type !=NULL)
|
||||
{
|
||||
free(msg->u.event.content_type);
|
||||
msg->u.event.content_type = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode)
|
||||
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, rbusError_t error)
|
||||
{
|
||||
rbusObject_t outParams;
|
||||
rbusError_t err;
|
||||
@@ -533,8 +569,7 @@ void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHand
|
||||
return;
|
||||
}
|
||||
|
||||
err = rbusMethod_SendAsyncResponse(asyncHandle, RBUS_ERROR_SUCCESS, outParams);
|
||||
//err = rbusMethod_SendAsyncResponse(asyncHandle, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION, outParams); //for negative case
|
||||
err = rbusMethod_SendAsyncResponse(asyncHandle, error, outParams);
|
||||
|
||||
if(err != RBUS_ERROR_SUCCESS)
|
||||
{
|
||||
|
||||
@@ -51,11 +51,12 @@ typedef enum
|
||||
MISSING_CONTENT_TYPE,
|
||||
MISSING_PAYLOAD,
|
||||
MISSING_PAYLOADLEN,
|
||||
QUEUE_SIZE_EXCEEDED,
|
||||
WRP_ENCODE_FAILURE,
|
||||
MSG_PROCESSING_FAILED,
|
||||
INVALID_CONTENT_TYPE,
|
||||
ENQUEUE_FAILURE = 100,
|
||||
CLIENT_DISCONNECT = 101
|
||||
CLIENT_DISCONNECT = 101,
|
||||
QUEUE_SIZE_EXCEEDED = 102,
|
||||
WRP_ENCODE_FAILURE = 103,
|
||||
MSG_PROCESSING_FAILED = 104
|
||||
} XMIDT_STATUS;
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Function Prototypes */
|
||||
@@ -70,9 +71,10 @@ void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle
|
||||
int checkInputParameters(rbusObject_t inParams);
|
||||
char* generate_transaction_uuid();
|
||||
void parseRbusInparamsToWrp(rbusObject_t inParams, char *trans_id, wrp_msg_t **eventMsg);
|
||||
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode);
|
||||
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, rbusError_t error);
|
||||
int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode);
|
||||
void xmidtQDequeue();
|
||||
void printSendMsgData(char* status, int qos, char* dest, char* transaction_uuid);
|
||||
bool highQosValueCheck(int qos);
|
||||
void waitTillConnectionIsUp();
|
||||
void printRBUSParams(rbusObject_t params, char* file_path);
|
||||
|
||||
Reference in New Issue
Block a user