mirror of
https://github.com/outbackdingo/parodus.git
synced 2026-01-28 10:20:04 +00:00
Compare commits
31 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85a0d9b71d | ||
|
|
03a5b96e0c | ||
|
|
066bdc180d | ||
|
|
118e8ee32f | ||
|
|
f7c9f483f9 | ||
|
|
502f56400e | ||
|
|
3e557ae4b0 | ||
|
|
0602fb243b | ||
|
|
1a7ae0b785 | ||
|
|
bc655cf9ba | ||
|
|
ad0491179d | ||
|
|
fa49a52a94 | ||
|
|
25baef78a8 | ||
|
|
9034ef9d10 | ||
|
|
9020089016 | ||
|
|
664690e6a6 | ||
|
|
4aed47b730 | ||
|
|
2f3f1424b4 | ||
|
|
91ae0e82e2 | ||
|
|
bfc2659bbb | ||
|
|
f9c2878cbf | ||
|
|
c84a1bdfad | ||
|
|
0b0ba77bd6 | ||
|
|
0b0309c3dd | ||
|
|
f4e358c179 | ||
|
|
0561c67e5c | ||
|
|
c1fc8d877b | ||
|
|
581d7fc966 | ||
|
|
0673d2624e | ||
|
|
0f82e7c2d9 | ||
|
|
4acf7f63ba |
@@ -44,12 +44,10 @@ include_directories(${INCLUDE_DIR}
|
||||
)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
include_directories(${INCLUDE_DIR}/rbus
|
||||
${INCLUDE_DIR}/rbus-core
|
||||
${INCLUDE_DIR}/rtmessage
|
||||
)
|
||||
include_directories(${INCLUDE_DIR}/rbus)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
|
||||
|
||||
# Get git commit hash
|
||||
#-------------------------------------------------------------------------------
|
||||
execute_process(
|
||||
@@ -66,9 +64,18 @@ add_definitions("-DGIT_COMMIT_TAG=\"${GIT_COMMIT_TAG}\"")
|
||||
add_definitions(-std=c99)
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_GNU_SOURCE -DNOPOLL_LOGGER ")
|
||||
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Wall -Wno-missing-field-initializers")
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wall")
|
||||
if (DEVICE_CAMERA)
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=all -Wno-missing-field-initializers")
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=all")
|
||||
add_definitions(-DDEVICE_CAMERA)
|
||||
else ()
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=all -Wno-missing-field-initializers")
|
||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=all")
|
||||
endif (DEVICE_CAMERA)
|
||||
|
||||
if (INCLUDE_BREAKPAD)
|
||||
add_definitions(-DINCLUDE_BREAKPAD)
|
||||
endif (INCLUDE_BREAKPAD)
|
||||
|
||||
# pthread external dependency
|
||||
#-------------------------------------------------------------------------------
|
||||
@@ -237,46 +244,21 @@ include_directories(${INCLUDE_DIR}
|
||||
endif (FEATURE_DNS_QUERY)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
# rtMessage external dependency
|
||||
#-------------------------------------------------------------------------------
|
||||
ExternalProject_Add(rtMessage
|
||||
DEPENDS cJSON
|
||||
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/rtMessage
|
||||
GIT_REPOSITORY https://github.com/rdkcmf/rdk-rtmessage.git
|
||||
GIT_TAG rdk-next
|
||||
CMAKE_ARGS += -DBUILD_RTMESSAGE_LIB=ON
|
||||
-DBUILD_RTMESSAGE_SAMPLE_APP=ON
|
||||
-DBUILD_FOR_DESKTOP=OFF
|
||||
-DCJSON_BUILD=OFF
|
||||
-DBUILD_DATAPROVIDER_LIB=ON
|
||||
-DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF
|
||||
)
|
||||
add_library(librtMessage STATIC SHARED IMPORTED)
|
||||
add_dependencies(librtMessage rtMessage)
|
||||
|
||||
# rbus-core external dependency
|
||||
#-------------------------------------------------------------------------------
|
||||
ExternalProject_Add(rbus-core
|
||||
DEPENDS rtMessage
|
||||
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/rbus-core
|
||||
GIT_REPOSITORY https://github.com/rdkcmf/rbuscore.git
|
||||
GIT_TAG rdk-next
|
||||
CMAKE_ARGS += -DBUILD_FOR_DESKTOP=ON -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR}
|
||||
-DBUILD_TESTING=OFF
|
||||
)
|
||||
add_library(librbus-core STATIC SHARED IMPORTED)
|
||||
add_dependencies(librbus-core rbus-core)
|
||||
|
||||
# rbus external dependency
|
||||
#-------------------------------------------------------------------------------
|
||||
ExternalProject_Add(rbus
|
||||
DEPENDS rtMessage rbus-core
|
||||
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/rbus
|
||||
GIT_REPOSITORY https://github.com/rdkcmf/rbus.git
|
||||
GIT_TAG rdk-next
|
||||
GIT_REPOSITORY https://github.com/rdkcentral/rbus.git
|
||||
GIT_TAG main
|
||||
CMAKE_ARGS += -DBUILD_FOR_DESKTOP=ON -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF
|
||||
)
|
||||
|
||||
add_library(librbuscore STATIC SHARED IMPORTED)
|
||||
add_dependencies(librbuscore rbuscore)
|
||||
|
||||
add_library(librtMessage STATIC SHARED IMPORTED)
|
||||
add_dependencies(librtMessage rtMessage)
|
||||
|
||||
add_library(librbus STATIC SHARED IMPORTED)
|
||||
add_dependencies(librbus rbus)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
|
||||
@@ -63,6 +63,6 @@ target_link_libraries (parodus -llibseshat)
|
||||
endif (ENABLE_SESHAT)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
target_link_libraries (parodus -lrbus -lrbus-core -lrtMessage)
|
||||
target_link_libraries (parodus -lrbus)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
install (TARGETS parodus DESTINATION bin)
|
||||
|
||||
@@ -164,6 +164,7 @@ void timespec_diff(struct timespec *start, struct timespec *stop,
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
void subscribeRBUSevent();
|
||||
int regXmidtSendDataMethod();
|
||||
void registerRbusLogger();
|
||||
#endif
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
void setWebpaInterface(char *value);
|
||||
|
||||
@@ -223,8 +223,14 @@ void getAuthToken(ParodusCfg *cfg)
|
||||
* @param[in] nmemb size of delivered data
|
||||
* @param[out] data curl response data saved.
|
||||
*/
|
||||
#ifndef DEVICE_CAMERA
|
||||
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, struct token_data *data)
|
||||
{
|
||||
#else
|
||||
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, void *datain)
|
||||
{
|
||||
struct token_data *data = (struct token_data*) datain;
|
||||
#endif //DEVICE_CAMERA
|
||||
ParodusCfg *cfg;
|
||||
size_t max_data_size = sizeof (cfg->webpa_auth_token);
|
||||
size_t index = data->size;
|
||||
|
||||
@@ -47,7 +47,11 @@ struct token_data {
|
||||
|
||||
int requestNewAuthToken(char *newToken, size_t len, int r_count);
|
||||
void getAuthToken(ParodusCfg *cfg);
|
||||
#ifndef DEVICE_CAMERA
|
||||
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, struct token_data *data);
|
||||
#else
|
||||
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, void *data);
|
||||
#endif
|
||||
char* generate_trans_uuid();
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
||||
60
src/config.c
60
src/config.c
@@ -41,7 +41,6 @@ pthread_cond_t cloud_status_cond=PTHREAD_COND_INITIALIZER;
|
||||
|
||||
char webpa_interface[64]={'\0'};
|
||||
|
||||
char cloud_status[32]={'\0'};
|
||||
static ParodusCfg parodusCfg;
|
||||
static unsigned int rsa_algorithms =
|
||||
(1<<alg_rs256) | (1<<alg_rs384) | (1<<alg_rs512);
|
||||
@@ -94,10 +93,14 @@ void set_cloud_status(char *status)
|
||||
|
||||
char *get_cloud_status(void)
|
||||
{
|
||||
char *status = NULL;
|
||||
pthread_mutex_lock(&config_mut);
|
||||
parStrncpy(cloud_status, get_parodus_cfg()->cloud_status, sizeof(cloud_status));
|
||||
pthread_mutex_unlock(&config_mut);
|
||||
return cloud_status;
|
||||
if(NULL != get_parodus_cfg()->cloud_status)
|
||||
{
|
||||
status = get_parodus_cfg()->cloud_status;
|
||||
}
|
||||
pthread_mutex_unlock(&config_mut);
|
||||
return status;
|
||||
}
|
||||
|
||||
const char *get_tok (const char *src, int delim, char *result, int resultsize)
|
||||
@@ -498,7 +501,7 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg)
|
||||
|
||||
case 's':
|
||||
if(parse_serial_num(cfg->hw_serial_number, optarg) == 0)
|
||||
ParodusInfo ("hw_serial-number is %s\n",cfg->hw_serial_number);
|
||||
ParodusInfo ("hw_serial_number is %s\n",cfg->hw_serial_number);
|
||||
break;
|
||||
|
||||
case 'f':
|
||||
@@ -708,6 +711,53 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg)
|
||||
return 0;
|
||||
}
|
||||
|
||||
void free_cfg(ParodusCfg *cfg)
|
||||
{
|
||||
if(cfg != NULL)
|
||||
{
|
||||
if (cfg->mtls_client_cert_path != NULL )
|
||||
{
|
||||
free(cfg->mtls_client_cert_path);
|
||||
cfg->mtls_client_cert_path = NULL;
|
||||
}
|
||||
if(cfg->connection_health_file != NULL)
|
||||
{
|
||||
free(cfg->connection_health_file);
|
||||
cfg->connection_health_file = NULL;
|
||||
}
|
||||
if(cfg->token_server_url != NULL)
|
||||
{
|
||||
free(cfg->token_server_url );
|
||||
cfg->token_server_url = NULL;
|
||||
}
|
||||
if(cfg->mtls_client_key_path != NULL)
|
||||
{
|
||||
free(cfg->mtls_client_key_path);
|
||||
cfg->mtls_client_key_path = NULL;
|
||||
}
|
||||
if(cfg->client_cert_path != NULL)
|
||||
{
|
||||
free(cfg->client_cert_path);
|
||||
cfg->client_cert_path = NULL;
|
||||
}
|
||||
if(cfg->crud_config_file != NULL)
|
||||
{
|
||||
free(cfg->crud_config_file);
|
||||
cfg->crud_config_file = NULL;
|
||||
}
|
||||
if(cfg->close_reason_file != NULL)
|
||||
{
|
||||
free(cfg->close_reason_file);
|
||||
cfg->close_reason_file = NULL;
|
||||
}
|
||||
if(cfg->cloud_disconnect != NULL)
|
||||
{
|
||||
free(cfg->cloud_disconnect);
|
||||
cfg->cloud_disconnect = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void setDefaultValuesToCfg(ParodusCfg *cfg)
|
||||
{
|
||||
if(cfg == NULL)
|
||||
|
||||
@@ -135,7 +135,7 @@ void loadParodusCfg(ParodusCfg * config,ParodusCfg *cfg);
|
||||
* or -1 if error
|
||||
*/
|
||||
int parseCommandLine(int argc,char **argv,ParodusCfg * cfg);
|
||||
|
||||
void free_cfg(ParodusCfg *cfg);
|
||||
void setDefaultValuesToCfg(ParodusCfg *cfg);
|
||||
// Accessor for the global config structure.
|
||||
ParodusCfg *get_parodus_cfg(void);
|
||||
|
||||
@@ -99,7 +99,9 @@ void createSocketConnection(void (* initKeypress)())
|
||||
#endif
|
||||
|
||||
EventHandler();
|
||||
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
subscribeCurrentActiveInterfaceEvent();
|
||||
#endif
|
||||
set_server_list_null (&server_list);
|
||||
create_conn_rtn = createNopollConnection(ctx, &server_list);
|
||||
if(!create_conn_rtn)
|
||||
@@ -113,9 +115,6 @@ void createSocketConnection(void (* initKeypress)())
|
||||
UpStreamMsgQ = NULL;
|
||||
StartThread(handle_upstream, &upstream_tid);
|
||||
StartThread(processUpstreamMessage, &upstream_msg_tid);
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
subscribeCurrentActiveInterfaceEvent();
|
||||
#endif
|
||||
ParodusMsgQ = NULL;
|
||||
StartThread(messageHandlerTask, &downstream_tid);
|
||||
StartThread(serviceAliveTask, &svc_alive_tid);
|
||||
@@ -232,6 +231,9 @@ void createSocketConnection(void (* initKeypress)())
|
||||
nopoll_ctx_unref(ctx);
|
||||
nopoll_cleanup_library();
|
||||
curl_global_cleanup();
|
||||
clear_metadata();
|
||||
rdk_logger_deinit();
|
||||
free_server_list(&server_list);
|
||||
}
|
||||
|
||||
void shutdownSocketConnection(char *reason) {
|
||||
|
||||
16
src/main.c
16
src/main.c
@@ -24,7 +24,11 @@
|
||||
#include "parodus_log.h"
|
||||
#include <curl/curl.h>
|
||||
#ifdef INCLUDE_BREAKPAD
|
||||
#ifndef DEVICE_CAMERA
|
||||
#include "breakpad_wrapper.h"
|
||||
#else
|
||||
#include "breakpadwrap.h"
|
||||
#endif //DEVICE_CAMERA
|
||||
#endif
|
||||
#include "signal.h"
|
||||
#include "privilege.h"
|
||||
@@ -87,8 +91,17 @@ int main( int argc, char **argv)
|
||||
signal(SIGHUP, sig_handler);
|
||||
signal(SIGALRM, sig_handler);
|
||||
#ifdef INCLUDE_BREAKPAD
|
||||
#ifndef DEVICE_CAMERA
|
||||
/* breakpad handles the signals SIGSEGV, SIGBUS, SIGFPE, and SIGILL */
|
||||
breakpad_ExceptionHandler();
|
||||
#else
|
||||
/* breakpad handles the signals SIGSEGV, SIGBUS, SIGFPE, and SIGILL */
|
||||
BreakPadWrapExceptionHandler eh;
|
||||
eh = newBreakPadWrapExceptionHandler();
|
||||
if(NULL != eh) {
|
||||
ParodusInfo("Breakpad Initialized\n");
|
||||
}
|
||||
#endif //DEVICE_CAMERA
|
||||
#else
|
||||
signal(SIGSEGV, sig_handler);
|
||||
signal(SIGBUS, sig_handler);
|
||||
@@ -107,6 +120,7 @@ int main( int argc, char **argv)
|
||||
ParodusInfo("********** Starting component: Parodus **********\n ");
|
||||
drop_root_privilege();
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
registerRbusLogger();
|
||||
subscribeRBUSevent();
|
||||
regXmidtSendDataMethod();
|
||||
#endif
|
||||
@@ -117,7 +131,7 @@ int main( int argc, char **argv)
|
||||
curl_global_init(CURL_GLOBAL_DEFAULT);
|
||||
|
||||
createSocketConnection( NULL);
|
||||
|
||||
free_cfg(cfg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -110,6 +110,11 @@ void packMetaData()
|
||||
ParodusError("Failed to encode metadata\n");
|
||||
}
|
||||
}
|
||||
|
||||
void clear_metadata(){
|
||||
if(metadataPack != NULL)
|
||||
free(metadataPack);
|
||||
}
|
||||
|
||||
/*
|
||||
* @brief To handle UpStream messages which is received from nanomsg server socket
|
||||
@@ -320,7 +325,10 @@ void *processUpstreamMessage()
|
||||
}
|
||||
else if(msgType == WRP_MSG_TYPE__EVENT)
|
||||
{
|
||||
ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
|
||||
(msg->u.event.headers != NULL && msg->u.event.headers->headers[0] != NULL && msg->u.event.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream event data: dest '%s' traceParent: %s traceState: %s\n", msg->u.event.dest, msg->u.event.headers->headers[0], msg->u.event.headers->headers[1]) : ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
|
||||
if(msg->u.event.transaction_uuid != NULL) {
|
||||
ParodusInfo("transaction_uuid in event: %s\n", msg->u.event.transaction_uuid);
|
||||
}
|
||||
partners_t *partnersList = NULL;
|
||||
int j = 0;
|
||||
|
||||
@@ -379,7 +387,7 @@ void *processUpstreamMessage()
|
||||
//Sending to server for msgTypes 3, 5, 6, 7, 8.
|
||||
if( WRP_MSG_TYPE__REQ == msgType )
|
||||
{
|
||||
ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid );
|
||||
(msg->u.req.headers != NULL && msg->u.req.headers->headers[0] != NULL && msg->u.req.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s traceParent: %s traceState: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid, msg->u.req.headers->headers[0], msg->u.req.headers->headers[1]) : ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid);
|
||||
sendUpstreamMsgToServer(&message->msg, message->len);
|
||||
}
|
||||
else
|
||||
|
||||
@@ -59,6 +59,7 @@ int subscribeCurrentActiveInterfaceEvent();
|
||||
UpStreamMsg * get_global_UpStreamMsgQ(void);
|
||||
pthread_cond_t *get_global_nano_con(void);
|
||||
pthread_mutex_t *get_global_nano_mut(void);
|
||||
void clear_metadata();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
@@ -47,6 +47,38 @@ rbusHandle_t get_parodus_rbus_Handle(void)
|
||||
{
|
||||
return rbus_Handle;
|
||||
}
|
||||
|
||||
/* Enables rbus ERROR level logs in parodus. Modify RBUS_LOG_ERROR check if more debug logs are needed from rbus. */
|
||||
void rbus_log_handler(
|
||||
rbusLogLevel level,
|
||||
const char* file,
|
||||
int line,
|
||||
int threadId,
|
||||
char* message)
|
||||
{
|
||||
ParodusPrint("threadId %d\n", threadId);
|
||||
const char* slevel = "";
|
||||
|
||||
if(level < RBUS_LOG_ERROR)
|
||||
return;
|
||||
|
||||
switch(level)
|
||||
{
|
||||
case RBUS_LOG_DEBUG: slevel = "DEBUG"; break;
|
||||
case RBUS_LOG_INFO: slevel = "INFO"; break;
|
||||
case RBUS_LOG_WARN: slevel = "WARN"; break;
|
||||
case RBUS_LOG_ERROR: slevel = "ERROR"; break;
|
||||
case RBUS_LOG_FATAL: slevel = "FATAL"; break;
|
||||
}
|
||||
ParodusInfo("%5s %s:%d -- %s\n", slevel, file, line, message);
|
||||
}
|
||||
|
||||
void registerRbusLogger()
|
||||
{
|
||||
rbus_registerLogHandler(rbus_log_handler);
|
||||
ParodusPrint("Registered rbus log handler\n");
|
||||
}
|
||||
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription );
|
||||
#endif
|
||||
@@ -73,7 +105,7 @@ void subscribeRBUSevent()
|
||||
int subscribeCurrentActiveInterfaceEvent()
|
||||
{
|
||||
int rc = RBUS_ERROR_SUCCESS;
|
||||
ParodusPrint("Subscribing to Device.X_RDK_WanManager.CurrentActiveInterface Event\n");
|
||||
ParodusInfo("Subscribing to Device.X_RDK_WanManager.CurrentActiveInterface Event\n");
|
||||
rc = rbusEvent_SubscribeAsync(rbus_Handle,WEBPA_INTERFACE,eventReceiveHandler,subscribeAsyncHandler,"parodusInterface",10*20);
|
||||
if(rc != RBUS_ERROR_SUCCESS)
|
||||
{
|
||||
|
||||
@@ -31,6 +31,7 @@
|
||||
#include "config.h"
|
||||
#include "time.h"
|
||||
#include "heartBeat.h"
|
||||
#include "close_retry.h"
|
||||
|
||||
static pthread_t processThreadId = 0;
|
||||
static unsigned int XmidtQsize = 0;
|
||||
@@ -70,6 +71,22 @@ bool highQosValueCheck(int qos)
|
||||
return false;
|
||||
}
|
||||
|
||||
//To handle high priority low qos message to confirm send success and ignore cloud ack.
|
||||
bool higherPriorityLowQosCheck(int qos)
|
||||
{
|
||||
if(qos > 20 && qos < 25)
|
||||
{
|
||||
ParodusInfo("The low qos msg with higher priority\n");
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusPrint("The qos is not higher priority low qos\n");
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
XmidtMsg * get_global_xmidthead(void)
|
||||
{
|
||||
XmidtMsg *tmp = NULL;
|
||||
@@ -122,9 +139,9 @@ void decrement_XmidtQsize()
|
||||
int checkCloudConn()
|
||||
{
|
||||
int ret = 1;
|
||||
if (!cloud_status_is_online ())
|
||||
if (get_close_retry() || !cloud_status_is_online ())
|
||||
{
|
||||
ParodusInfo("cloud status is not online, wait till connection up\n");
|
||||
ParodusInfo("close_retry is in progress or cloud status is not online, wait till connection up\n");
|
||||
|
||||
int rv;
|
||||
struct timespec ts;
|
||||
@@ -214,8 +231,15 @@ int xmidtQOptmize()
|
||||
{
|
||||
if(get_XmidtQsize() > 0 && get_XmidtQsize() == get_parodus_cfg()->max_queue_size)
|
||||
{
|
||||
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
||||
del = 2;
|
||||
if(higherPriorityLowQosCheck(tempMsg->u.event.qos))
|
||||
{
|
||||
ParodusInfo("Skip max queue size delete for qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid);
|
||||
del = 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -376,10 +400,9 @@ void* processXmidtUpstreamMsg()
|
||||
{
|
||||
XmidtMsg *Data = xmidtQ;
|
||||
pthread_mutex_unlock (&xmidt_mut);
|
||||
ParodusPrint("mutex unlock in xmidt consumer thread\n");
|
||||
|
||||
checkMsgExpiry();
|
||||
checkMaxQandOptimize();
|
||||
ParodusPrint("mutex unlock in xmidt consumer\n");
|
||||
checkMsgExpiry(xmidtQ);
|
||||
checkMaxQandOptimize(xmidtQ);
|
||||
cv = 0;
|
||||
|
||||
ParodusPrint("check state\n");
|
||||
@@ -747,10 +770,9 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
||||
while(sendRetStatus) //If SendMessage is failed condition
|
||||
{
|
||||
ParodusError("sendXmidtEventToServer is Failed\n");
|
||||
if(highQosValueCheck(qos))
|
||||
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
|
||||
{
|
||||
ParodusPrint("The event is having high qos retry again\n");
|
||||
ParodusInfo("Wait till connection is Up\n");
|
||||
ParodusPrint("The event is having high qos retry again, wait till connection is Up\n");
|
||||
rv = checkCloudConn();
|
||||
if(rv == 2)
|
||||
{
|
||||
@@ -758,7 +780,7 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
||||
break;
|
||||
}
|
||||
ParodusInfo("Received cloud status signal proceed to retry\n");
|
||||
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -798,13 +820,26 @@ int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHa
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Low qos event, send success callback and delete\n");
|
||||
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
||||
ParodusPrint("statusMsg is %s\n",errorMsg);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
||||
//print_xmidMsg_list();
|
||||
updateXmidtState(msgnode, DELETE);
|
||||
print_xmidMsg_list();
|
||||
if(higherPriorityLowQosCheck(qos))
|
||||
{
|
||||
ParodusInfo("Higher priority low qos send success, ignore cloud ack\n");
|
||||
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
||||
printSendMsgData(errorMsg, notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
||||
ParodusPrint("update state to DELETE\n");
|
||||
updateXmidtState(msgnode, DELETE);
|
||||
print_xmidMsg_list();
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Low qos event, send success callback and delete\n");
|
||||
mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg);
|
||||
ParodusPrint("statusMsg is %s\n",errorMsg);
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS);
|
||||
//print_xmidMsg_list();
|
||||
updateXmidtState(msgnode, DELETE);
|
||||
print_xmidMsg_list();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1533,16 +1568,16 @@ int deleteFromXmidtQ(XmidtMsg **next_node)
|
||||
}
|
||||
|
||||
//check if message is expired based on each qos and set to delete state.
|
||||
void checkMsgExpiry()
|
||||
void checkMsgExpiry(XmidtMsg *xmdMsg)
|
||||
{
|
||||
long long currTime = 0;
|
||||
struct timespec ts;
|
||||
char *errorMsg = NULL;
|
||||
|
||||
XmidtMsg *temp = NULL;
|
||||
temp = get_global_xmidthead();
|
||||
temp = xmdMsg;
|
||||
|
||||
while(temp != NULL)
|
||||
if(temp != NULL)
|
||||
{
|
||||
getCurrentTime(&ts);
|
||||
currTime= (long long)ts.tv_sec;
|
||||
@@ -1551,8 +1586,7 @@ void checkMsgExpiry()
|
||||
if(temp->state == DELETE)
|
||||
{
|
||||
ParodusPrint("msg is already in DELETE state and about to delete, skipping state update. transid %s\n", tempMsg->u.event.transaction_uuid);
|
||||
temp = temp->next;
|
||||
continue;
|
||||
return;
|
||||
}
|
||||
|
||||
if(tempMsg->u.event.qos > 74)
|
||||
@@ -1611,12 +1645,11 @@ void checkMsgExpiry()
|
||||
{
|
||||
ParodusError("Invalid qos\n");
|
||||
}
|
||||
temp = temp->next;
|
||||
}
|
||||
}
|
||||
|
||||
//To delete low qos messages from queue when max queue limit is reached.
|
||||
void checkMaxQandOptimize()
|
||||
void checkMaxQandOptimize(XmidtMsg *xmdMsg)
|
||||
{
|
||||
int qos = 0;
|
||||
|
||||
@@ -1627,28 +1660,35 @@ void checkMaxQandOptimize()
|
||||
|
||||
//Traverse through XmidtMsgQ list and set low qos msgs to DELETE
|
||||
XmidtMsg *temp = NULL;
|
||||
temp = get_global_xmidthead();
|
||||
temp = xmdMsg;
|
||||
|
||||
while(temp != NULL)
|
||||
{
|
||||
if (temp != NULL)
|
||||
{
|
||||
wrp_msg_t * tempMsg = temp->msg;
|
||||
qos = tempMsg->u.event.qos;
|
||||
ParodusPrint("qos is %d\n", qos);
|
||||
if(highQosValueCheck(qos))
|
||||
if((highQosValueCheck(qos)) || (higherPriorityLowQosCheck(qos)))
|
||||
{
|
||||
ParodusPrint("High qos msg, skip delete\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
|
||||
//rbus callback to caller
|
||||
char *errorMsg = NULL;
|
||||
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
|
||||
ParodusPrint("statusMsg is %s\n",errorMsg);
|
||||
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
||||
updateXmidtState(temp, DELETE);
|
||||
//Skip max queue callback when msg is already in DELETE state.
|
||||
if( temp->state == DELETE)
|
||||
{
|
||||
ParodusInfo("Msg is in DELETE state, skipped Max Queue size callback %s\n", tempMsg->u.event.transaction_uuid);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
|
||||
//rbus callback to caller
|
||||
char *errorMsg = NULL;
|
||||
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
|
||||
ParodusPrint("statusMsg is %s\n",errorMsg);
|
||||
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
|
||||
updateXmidtState(temp, DELETE);
|
||||
}
|
||||
}
|
||||
temp = temp->next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,8 +114,8 @@ void print_xmidMsg_list();
|
||||
int deleteCloudACKNode(char* trans_id);
|
||||
int deleteFromXmidtQ(XmidtMsg **next_node);
|
||||
int checkCloudConn();
|
||||
void checkMaxQandOptimize();
|
||||
void checkMsgExpiry();
|
||||
void checkMaxQandOptimize(XmidtMsg *xmdMsg);
|
||||
void checkMsgExpiry(XmidtMsg *xmdMsg);
|
||||
void mapXmidtStatusToStatusMessage(int status, char **message);
|
||||
int xmidtQOptmize();
|
||||
#ifdef __cplusplus
|
||||
|
||||
@@ -33,7 +33,7 @@ set (PARODUS_COMMON_LIBS ${PARODUS_COMMON_LIBS} -lucresolv -lresolv)
|
||||
endif (FEATURE_DNS_QUERY)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
set (PARODUS_COMMON_LIBS ${PARODUS_COMMON_LIBS} -lrbus -lrbus-core)
|
||||
set (PARODUS_COMMON_LIBS ${PARODUS_COMMON_LIBS} -lrbus)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
|
||||
if(NOT DISABLE_VALGRIND)
|
||||
@@ -61,7 +61,10 @@ target_link_libraries (test_close_retry ${PARODUS_COMMON_LIBS} -lcmocka)
|
||||
#-------------------------------------------------------------------------------
|
||||
add_test(NAME test_mutex COMMAND ${MEMORY_CHECK} ./test_mutex)
|
||||
add_executable(test_mutex test_mutex.c ../src/mutex.c)
|
||||
target_link_libraries (test_mutex ${PARODUS_COMMON_LIBS} -lcmocka)
|
||||
target_link_libraries (test_mutex gcov -lcunit -lcimplog -lwrp-c
|
||||
-luuid -lmsgpackc -lnopoll -lnanomsg -lpthread
|
||||
-Wl,--no-as-needed -lcjson -lcjwt -ltrower-base64
|
||||
-lssl -lcrypto -lrt -lm -lcmocka)
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# test_networking
|
||||
@@ -74,8 +77,8 @@ target_link_libraries (test_networking ${PARODUS_COMMON_LIBS})
|
||||
# test_nopoll_helpers
|
||||
#-------------------------------------------------------------------------------
|
||||
add_test(NAME test_nopoll_helpers COMMAND ${MEMORY_CHECK} ./test_nopoll_helpers)
|
||||
add_executable(test_nopoll_helpers test_nopoll_helpers.c ../src/nopoll_helpers.c)
|
||||
target_link_libraries (test_nopoll_helpers -Wl,--no-as-needed -lrt -lcmocka -lcimplog -lnopoll)
|
||||
add_executable(test_nopoll_helpers test_nopoll_helpers.c ../src/nopoll_helpers.c ../src/string_helpers.c ../src/config.c)
|
||||
target_link_libraries (test_nopoll_helpers -Wl,--no-as-needed -lrt -lcmocka -lcimplog -lnopoll ${PARODUS_COMMON_LIBS})
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
# test_time
|
||||
|
||||
@@ -24,6 +24,9 @@
|
||||
#include "../src/parodus_log.h"
|
||||
#include "../src/nopoll_helpers.h"
|
||||
#include "../src/config.h"
|
||||
#include <cjwt/cjwt.h>
|
||||
#include "../src/connection.h"
|
||||
#include "../src/ParodusInternal.h"
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Macros */
|
||||
@@ -34,7 +37,6 @@
|
||||
/* File Scoped Variables */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
static noPollConn *conn = NULL;
|
||||
static ParodusCfg cfg;
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Mocks */
|
||||
@@ -57,12 +59,6 @@ nopoll_bool nopoll_conn_is_ready( noPollConn *conn )
|
||||
return (nopoll_bool)mock();
|
||||
}
|
||||
|
||||
ParodusCfg *get_parodus_cfg(void)
|
||||
{
|
||||
function_called();
|
||||
return &cfg;
|
||||
}
|
||||
|
||||
int __nopoll_conn_send_common (noPollConn * conn, const char * content, long length, nopoll_bool has_fin, long sleep_in_header, noPollOpCode frame_type)
|
||||
{
|
||||
UNUSED(has_fin); UNUSED(sleep_in_header); UNUSED(frame_type); UNUSED(content);
|
||||
@@ -131,10 +127,6 @@ bool get_interface_down_event()
|
||||
return false;
|
||||
}
|
||||
|
||||
char *get_cloud_status(void)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Tests */
|
||||
@@ -226,8 +218,7 @@ void test_sendMessage()
|
||||
{
|
||||
int len = strlen("Hello Parodus!");
|
||||
|
||||
cfg.cloud_status = CLOUD_STATUS_ONLINE;
|
||||
expect_function_calls (get_parodus_cfg, 1);
|
||||
get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE;
|
||||
|
||||
expect_value(__nopoll_conn_send_common, (intptr_t)conn, (intptr_t)conn);
|
||||
expect_value(__nopoll_conn_send_common, length, len);
|
||||
@@ -241,8 +232,7 @@ void test_sendMessageOffline()
|
||||
{
|
||||
int len = strlen("Hello Parodus!");
|
||||
|
||||
cfg.cloud_status = CLOUD_STATUS_OFFLINE;
|
||||
expect_function_calls (get_parodus_cfg, 1);
|
||||
get_parodus_cfg()->cloud_status = CLOUD_STATUS_OFFLINE;
|
||||
sendMessage(conn, "Hello Parodus!", len);
|
||||
|
||||
}
|
||||
@@ -251,8 +241,7 @@ void err_sendMessage()
|
||||
{
|
||||
int len = strlen("Hello Parodus!");
|
||||
|
||||
cfg.cloud_status = CLOUD_STATUS_ONLINE;
|
||||
expect_function_calls (get_parodus_cfg, 1);
|
||||
get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE;
|
||||
|
||||
expect_value(__nopoll_conn_send_common, (intptr_t)conn,(intptr_t) conn);
|
||||
expect_value(__nopoll_conn_send_common, length, len);
|
||||
@@ -271,8 +260,7 @@ void err_sendMessageConnNull()
|
||||
{
|
||||
int len = strlen("Hello Parodus!");
|
||||
|
||||
cfg.cloud_status = CLOUD_STATUS_ONLINE;
|
||||
expect_function_calls (get_parodus_cfg, 1);
|
||||
get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE;
|
||||
|
||||
expect_value(__nopoll_conn_send_common, (intptr_t)conn, NULL);
|
||||
expect_value(__nopoll_conn_send_common, length, len);
|
||||
|
||||
Reference in New Issue
Block a user