Compare commits

..

1 Commits

Author SHA1 Message Date
Sadhyama Vengilat
6952647ce3 Subscribe to CurrentActiveInterfaceEvent before initial cloud connection 2023-02-24 12:39:41 +05:30
14 changed files with 48 additions and 213 deletions

View File

@@ -47,7 +47,6 @@ if (ENABLE_WEBCFGBIN)
include_directories(${INCLUDE_DIR}/rbus)
endif (ENABLE_WEBCFGBIN)
# Get git commit hash
#-------------------------------------------------------------------------------
execute_process(
@@ -64,18 +63,9 @@ add_definitions("-DGIT_COMMIT_TAG=\"${GIT_COMMIT_TAG}\"")
add_definitions(-std=c99)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_GNU_SOURCE -DNOPOLL_LOGGER ")
if (DEVICE_CAMERA)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=all -Wno-missing-field-initializers")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=all")
add_definitions(-DDEVICE_CAMERA)
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=all -Wno-missing-field-initializers")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=all")
endif (DEVICE_CAMERA)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Wall -Wno-missing-field-initializers")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wall")
if (INCLUDE_BREAKPAD)
add_definitions(-DINCLUDE_BREAKPAD)
endif (INCLUDE_BREAKPAD)
# pthread external dependency
#-------------------------------------------------------------------------------

View File

@@ -164,7 +164,6 @@ void timespec_diff(struct timespec *start, struct timespec *stop,
#ifdef ENABLE_WEBCFGBIN
void subscribeRBUSevent();
int regXmidtSendDataMethod();
void registerRbusLogger();
#endif
#ifdef WAN_FAILOVER_SUPPORTED
void setWebpaInterface(char *value);

View File

@@ -223,14 +223,8 @@ void getAuthToken(ParodusCfg *cfg)
* @param[in] nmemb size of delivered data
* @param[out] data curl response data saved.
*/
#ifndef DEVICE_CAMERA
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, struct token_data *data)
{
#else
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, void *datain)
{
struct token_data *data = (struct token_data*) datain;
#endif //DEVICE_CAMERA
ParodusCfg *cfg;
size_t max_data_size = sizeof (cfg->webpa_auth_token);
size_t index = data->size;

View File

@@ -47,11 +47,7 @@ struct token_data {
int requestNewAuthToken(char *newToken, size_t len, int r_count);
void getAuthToken(ParodusCfg *cfg);
#ifndef DEVICE_CAMERA
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, struct token_data *data);
#else
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, void *data);
#endif
char* generate_trans_uuid();
#ifdef __cplusplus

View File

@@ -711,53 +711,6 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg)
return 0;
}
void free_cfg(ParodusCfg *cfg)
{
if(cfg != NULL)
{
if (cfg->mtls_client_cert_path != NULL )
{
free(cfg->mtls_client_cert_path);
cfg->mtls_client_cert_path = NULL;
}
if(cfg->connection_health_file != NULL)
{
free(cfg->connection_health_file);
cfg->connection_health_file = NULL;
}
if(cfg->token_server_url != NULL)
{
free(cfg->token_server_url );
cfg->token_server_url = NULL;
}
if(cfg->mtls_client_key_path != NULL)
{
free(cfg->mtls_client_key_path);
cfg->mtls_client_key_path = NULL;
}
if(cfg->client_cert_path != NULL)
{
free(cfg->client_cert_path);
cfg->client_cert_path = NULL;
}
if(cfg->crud_config_file != NULL)
{
free(cfg->crud_config_file);
cfg->crud_config_file = NULL;
}
if(cfg->close_reason_file != NULL)
{
free(cfg->close_reason_file);
cfg->close_reason_file = NULL;
}
if(cfg->cloud_disconnect != NULL)
{
free(cfg->cloud_disconnect);
cfg->cloud_disconnect = NULL;
}
}
}
void setDefaultValuesToCfg(ParodusCfg *cfg)
{
if(cfg == NULL)

View File

@@ -135,7 +135,7 @@ void loadParodusCfg(ParodusCfg * config,ParodusCfg *cfg);
* or -1 if error
*/
int parseCommandLine(int argc,char **argv,ParodusCfg * cfg);
void free_cfg(ParodusCfg *cfg);
void setDefaultValuesToCfg(ParodusCfg *cfg);
// Accessor for the global config structure.
ParodusCfg *get_parodus_cfg(void);

View File

@@ -231,9 +231,6 @@ void createSocketConnection(void (* initKeypress)())
nopoll_ctx_unref(ctx);
nopoll_cleanup_library();
curl_global_cleanup();
clear_metadata();
rdk_logger_deinit();
free_server_list(&server_list);
}
void shutdownSocketConnection(char *reason) {

View File

@@ -783,7 +783,8 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
}
#endif
}
if(conn_ctx.current_server != NULL && conn_ctx.current_server->allow_insecure <= 0)
if(conn_ctx.current_server->allow_insecure <= 0)
{
ParodusInfo("Connected to server over SSL\n");
OnboardLog("Connected to server over SSL\n");

View File

@@ -24,11 +24,7 @@
#include "parodus_log.h"
#include <curl/curl.h>
#ifdef INCLUDE_BREAKPAD
#ifndef DEVICE_CAMERA
#include "breakpad_wrapper.h"
#else
#include "breakpadwrap.h"
#endif //DEVICE_CAMERA
#endif
#include "signal.h"
#include "privilege.h"
@@ -91,17 +87,8 @@ int main( int argc, char **argv)
signal(SIGHUP, sig_handler);
signal(SIGALRM, sig_handler);
#ifdef INCLUDE_BREAKPAD
#ifndef DEVICE_CAMERA
/* breakpad handles the signals SIGSEGV, SIGBUS, SIGFPE, and SIGILL */
breakpad_ExceptionHandler();
#else
/* breakpad handles the signals SIGSEGV, SIGBUS, SIGFPE, and SIGILL */
BreakPadWrapExceptionHandler eh;
eh = newBreakPadWrapExceptionHandler();
if(NULL != eh) {
ParodusInfo("Breakpad Initialized\n");
}
#endif //DEVICE_CAMERA
#else
signal(SIGSEGV, sig_handler);
signal(SIGBUS, sig_handler);
@@ -120,7 +107,6 @@ int main( int argc, char **argv)
ParodusInfo("********** Starting component: Parodus **********\n ");
drop_root_privilege();
#ifdef ENABLE_WEBCFGBIN
registerRbusLogger();
subscribeRBUSevent();
regXmidtSendDataMethod();
#endif
@@ -131,7 +117,7 @@ int main( int argc, char **argv)
curl_global_init(CURL_GLOBAL_DEFAULT);
createSocketConnection( NULL);
free_cfg(cfg);
return 0;
}

View File

@@ -110,11 +110,6 @@ void packMetaData()
ParodusError("Failed to encode metadata\n");
}
}
void clear_metadata(){
if(metadataPack != NULL)
free(metadataPack);
}
/*
* @brief To handle UpStream messages which is received from nanomsg server socket
@@ -325,10 +320,7 @@ void *processUpstreamMessage()
}
else if(msgType == WRP_MSG_TYPE__EVENT)
{
(msg->u.event.headers != NULL && msg->u.event.headers->headers[0] != NULL && msg->u.event.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream event data: dest '%s' traceParent: %s traceState: %s\n", msg->u.event.dest, msg->u.event.headers->headers[0], msg->u.event.headers->headers[1]) : ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
if(msg->u.event.transaction_uuid != NULL) {
ParodusInfo("transaction_uuid in event: %s\n", msg->u.event.transaction_uuid);
}
ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
partners_t *partnersList = NULL;
int j = 0;
@@ -387,7 +379,7 @@ void *processUpstreamMessage()
//Sending to server for msgTypes 3, 5, 6, 7, 8.
if( WRP_MSG_TYPE__REQ == msgType )
{
(msg->u.req.headers != NULL && msg->u.req.headers->headers[0] != NULL && msg->u.req.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s traceParent: %s traceState: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid, msg->u.req.headers->headers[0], msg->u.req.headers->headers[1]) : ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid);
ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid );
sendUpstreamMsgToServer(&message->msg, message->len);
}
else

View File

@@ -59,7 +59,6 @@ int subscribeCurrentActiveInterfaceEvent();
UpStreamMsg * get_global_UpStreamMsgQ(void);
pthread_cond_t *get_global_nano_con(void);
pthread_mutex_t *get_global_nano_mut(void);
void clear_metadata();
#ifdef __cplusplus
}

View File

@@ -47,38 +47,6 @@ rbusHandle_t get_parodus_rbus_Handle(void)
{
return rbus_Handle;
}
/* Enables rbus ERROR level logs in parodus. Modify RBUS_LOG_ERROR check if more debug logs are needed from rbus. */
void rbus_log_handler(
rbusLogLevel level,
const char* file,
int line,
int threadId,
char* message)
{
ParodusPrint("threadId %d\n", threadId);
const char* slevel = "";
if(level < RBUS_LOG_ERROR)
return;
switch(level)
{
case RBUS_LOG_DEBUG: slevel = "DEBUG"; break;
case RBUS_LOG_INFO: slevel = "INFO"; break;
case RBUS_LOG_WARN: slevel = "WARN"; break;
case RBUS_LOG_ERROR: slevel = "ERROR"; break;
case RBUS_LOG_FATAL: slevel = "FATAL"; break;
}
ParodusInfo("%5s %s:%d -- %s\n", slevel, file, line, message);
}
void registerRbusLogger()
{
rbus_registerLogHandler(rbus_log_handler);
ParodusPrint("Registered rbus log handler\n");
}
#ifdef WAN_FAILOVER_SUPPORTED
void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription );
#endif

View File

@@ -31,7 +31,6 @@
#include "config.h"
#include "time.h"
#include "heartBeat.h"
#include "close_retry.h"
static pthread_t processThreadId = 0;
static unsigned int XmidtQsize = 0;
@@ -71,22 +70,6 @@ bool highQosValueCheck(int qos)
return false;
}
//To handle high priority low qos message to confirm send success and ignore cloud ack.
bool higherPriorityLowQosCheck(int qos)
{
if(qos > 20 && qos < 25)
{
ParodusInfo("The low qos msg with higher priority\n");
return true;
}
else
{
ParodusPrint("The qos is not higher priority low qos\n");
}
return false;
}
XmidtMsg * get_global_xmidthead(void)
{
XmidtMsg *tmp = NULL;
@@ -139,9 +122,9 @@ void decrement_XmidtQsize()
int checkCloudConn()
{
int ret = 1;
if (get_close_retry() || !cloud_status_is_online ())
if (!cloud_status_is_online ())
{
ParodusInfo("close_retry is in progress or cloud status is not online, wait till connection up\n");
ParodusInfo("cloud status is not online, wait till connection up\n");
int rv;
struct timespec ts;
@@ -231,15 +214,8 @@ int xmidtQOptmize()
{
if(get_XmidtQsize() > 0 && get_XmidtQsize() == get_parodus_cfg()->max_queue_size)
{
if(higherPriorityLowQosCheck(tempMsg->u.event.qos))
{
ParodusInfo("Skip max queue size delete for qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
}
else
{
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
del = 2;
}
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
del = 2;
}
}
}
@@ -400,9 +376,10 @@ void* processXmidtUpstreamMsg()
{
XmidtMsg *Data = xmidtQ;
pthread_mutex_unlock (&xmidt_mut);
ParodusPrint("mutex unlock in xmidt consumer\n");
checkMsgExpiry(xmidtQ);
checkMaxQandOptimize(xmidtQ);
ParodusPrint("mutex unlock in xmidt consumer thread\n");
checkMsgExpiry();
checkMaxQandOptimize();
cv = 0;
ParodusPrint("check state\n");
@@ -770,9 +747,10 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
while(sendRetStatus) //If SendMessage is failed condition
{
ParodusError("sendXmidtEventToServer is Failed\n");
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
if(highQosValueCheck(qos))
{
ParodusPrint("The event is having high qos retry again, wait till connection is Up\n");
ParodusPrint("The event is having high qos retry again\n");
ParodusInfo("Wait till connection is Up\n");
rv = checkCloudConn();
if(rv == 2)
{
@@ -780,7 +758,7 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
break;
}
ParodusInfo("Received cloud status signal proceed to retry\n");
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
}
else
{
@@ -820,26 +798,13 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
}
else
{
if(higherPriorityLowQosCheck(qos))
{
ParodusInfo("Higher priority low qos send success, ignore cloud ack\n");
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
printSendMsgData(errorMsg, notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
ParodusPrint("update state to DELETE\n");
updateXmidtState(msgnode, DELETE);
print_xmidMsg_list();
}
else
{
ParodusInfo("Low qos event, send success callback and delete\n");
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
//print_xmidMsg_list();
updateXmidtState(msgnode, DELETE);
print_xmidMsg_list();
}
ParodusInfo("Low qos event, send success callback and delete\n");
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
//print_xmidMsg_list();
updateXmidtState(msgnode, DELETE);
print_xmidMsg_list();
}
}
@@ -1568,16 +1533,16 @@ int deleteFromXmidtQ(XmidtMsg **next_node)
}
//check if message is expired based on each qos and set to delete state.
void checkMsgExpiry(XmidtMsg *xmdMsg)
void checkMsgExpiry()
{
long long currTime = 0;
struct timespec ts;
char *errorMsg = NULL;
XmidtMsg *temp = NULL;
temp = xmdMsg;
temp = get_global_xmidthead();
if(temp != NULL)
while(temp != NULL)
{
getCurrentTime(&ts);
currTime= (long long)ts.tv_sec;
@@ -1586,7 +1551,8 @@ void checkMsgExpiry(XmidtMsg *xmdMsg)
if(temp->state == DELETE)
{
ParodusPrint("msg is already in DELETE state and about to delete, skipping state update. transid %s\n", tempMsg->u.event.transaction_uuid);
return;
temp = temp->next;
continue;
}
if(tempMsg->u.event.qos > 74)
@@ -1645,11 +1611,12 @@ void checkMsgExpiry(XmidtMsg *xmdMsg)
{
ParodusError("Invalid qos\n");
}
temp = temp->next;
}
}
//To delete low qos messages from queue when max queue limit is reached.
void checkMaxQandOptimize(XmidtMsg *xmdMsg)
void checkMaxQandOptimize()
{
int qos = 0;
@@ -1660,35 +1627,28 @@ void checkMaxQandOptimize(XmidtMsg *xmdMsg)
//Traverse through XmidtMsgQ list and set low qos msgs to DELETE
XmidtMsg *temp = NULL;
temp = xmdMsg;
temp = get_global_xmidthead();
if (temp != NULL)
{
while(temp != NULL)
{
wrp_msg_t * tempMsg = temp->msg;
qos = tempMsg->u.event.qos;
ParodusPrint("qos is %d\n", qos);
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
if(highQosValueCheck(qos))
{
ParodusPrint("High qos msg, skip delete\n");
}
else
{
//Skip max queue callback when msg is already in DELETE state.
if( temp->state == DELETE)
{
ParodusInfo("Msg is in DELETE state, skipped Max Queue size callback %s\n", tempMsg->u.event.transaction_uuid);
}
else
{
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
//rbus callback to caller
char *errorMsg = NULL;
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
updateXmidtState(temp, DELETE);
}
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
//rbus callback to caller
char *errorMsg = NULL;
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
updateXmidtState(temp, DELETE);
}
temp = temp->next;
}
}
}

View File

@@ -114,8 +114,8 @@ void print_xmidMsg_list();
int deleteCloudACKNode(char* trans_id);
int deleteFromXmidtQ(XmidtMsg **next_node);
int checkCloudConn();
void checkMaxQandOptimize(XmidtMsg *xmdMsg);
void checkMsgExpiry(XmidtMsg *xmdMsg);
void checkMaxQandOptimize();
void checkMsgExpiry();
void mapXmidtStatusToStatusMessage(int status, char **message);
int xmidtQOptmize();
#ifdef __cplusplus