diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index c395264..73871cf 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -38,9 +38,9 @@ jobs: run: mkdir build - name: Get Sonarcloud Binaries - working-directory: build - run: | - ../.github/scripts/get_sonarcloud.sh + uses: xmidt-org/sonarcloud-installer-action@v1 + with: + working-directory: build - name: CMake working-directory: build diff --git a/CHANGELOG.md b/CHANGELOG.md index dff0a92..8600524 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Add additional HTTP headers for call to Themis from Convey +- Add callback handler for initial cloud connection status change event +- Fix Parodus connection stuck on interface up down received together +- Update to use nopoll version 1.0.3 ## [v1.1.5] - Add additional HTTP headers for call to Themis from Convey diff --git a/CMakeLists.txt b/CMakeLists.txt index a8e600c..be30c1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -43,6 +43,11 @@ include_directories(${INCLUDE_DIR} ${INCLUDE_DIR}/cjwt ) +if (ENABLE_WEBCFGBIN) +include_directories(${INCLUDE_DIR}/rbus) +endif (ENABLE_WEBCFGBIN) + + # Get git commit hash #------------------------------------------------------------------------------- execute_process( @@ -59,9 +64,18 @@ add_definitions("-DGIT_COMMIT_TAG=\"${GIT_COMMIT_TAG}\"") add_definitions(-std=c99) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_GNU_SOURCE -DNOPOLL_LOGGER ") -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Wall -Wno-missing-field-initializers") -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wall") +if (DEVICE_CAMERA) +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=all -Wno-missing-field-initializers") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=all") +add_definitions(-DDEVICE_CAMERA) +else () +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=all -Wno-missing-field-initializers") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=all") +endif (DEVICE_CAMERA) +if (INCLUDE_BREAKPAD) +add_definitions(-DINCLUDE_BREAKPAD) +endif (INCLUDE_BREAKPAD) # pthread external dependency #------------------------------------------------------------------------------- @@ -86,7 +100,7 @@ add_dependencies(libtrower-base64 trower-base64) ExternalProject_Add(nopoll PREFIX ${PREFIX_DIR}/nopoll GIT_REPOSITORY https://github.com/Comcast/nopoll.git - GIT_TAG "1.0.2" + GIT_TAG "1.0.3" CONFIGURE_COMMAND COMMAND /autogen.sh --prefix=${PREFIX} --includedir=${INCLUDE_DIR} --libdir=${LIBRARY_DIR} @@ -165,8 +179,8 @@ add_dependencies(libcimplog cimplog) ExternalProject_Add(wrp-c DEPENDS trower-base64 msgpack cimplog PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/wrp-c - GIT_REPOSITORY https://github.com/Comcast/wrp-c.git - GIT_TAG "1.0.1" + GIT_REPOSITORY https://github.com/xmidt-org/wrp-c.git + GIT_TAG "b5ef4d10cb39905908788bc89ab3e4dab201db8a" CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DMSGPACK_ENABLE_CXX=OFF -DMSGPACK_BUILD_EXAMPLES=OFF @@ -229,6 +243,26 @@ include_directories(${INCLUDE_DIR} endif (FEATURE_DNS_QUERY) +if (ENABLE_WEBCFGBIN) +# rbus external dependency +#------------------------------------------------------------------------------- +ExternalProject_Add(rbus + PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/rbus + GIT_REPOSITORY https://github.com/rdkcentral/rbus.git + GIT_TAG main + CMAKE_ARGS += -DBUILD_FOR_DESKTOP=ON -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF +) + +add_library(librbuscore STATIC SHARED IMPORTED) +add_dependencies(librbuscore rbuscore) + +add_library(librtMessage STATIC SHARED IMPORTED) +add_dependencies(librtMessage rtMessage) + +add_library(librbus STATIC SHARED IMPORTED) +add_dependencies(librbus rbus) +endif (ENABLE_WEBCFGBIN) + if (BUILD_TESTING) # cmocka external dependency #------------------------------------------------------------------------------- @@ -253,6 +287,14 @@ if (FEATURE_DNS_QUERY) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DFEATURE_DNS_QUERY ") endif (FEATURE_DNS_QUERY) +if (ENABLE_WEBCFGBIN) +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_WEBCFGBIN ") +endif (ENABLE_WEBCFGBIN) + +if (WAN_FAILOVER_SUPPORTED) +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DWAN_FAILOVER_SUPPORTED ") +endif (WAN_FAILOVER_SUPPORTED) + link_directories ( ${LIBRARY_DIR} ${COMMON_LIBRARY_DIR} ${LIBRARY_DIR64} ) add_subdirectory(src) if (BUILD_TESTING) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index aa3f66c..8b7af65 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -23,6 +23,15 @@ else() set(SOURCES ${SOURCES} seshat_interface_stub.c) endif (ENABLE_SESHAT) +if (ENABLE_WEBCFGBIN) +set(SOURCES ${SOURCES} upstream_rbus.c xmidtsend_rbus.c) +endif (ENABLE_WEBCFGBIN) + +if (WAN_FAILOVER_SUPPORTED) +message(STATUS "WAN_FAILOVER_SUPPORTED is supported") +else() +message(STATUS "WAN_FAILOVER_SUPPORTED is not supported") +endif (WAN_FAILOVER_SUPPORTED) add_executable(parodus ${SOURCES}) @@ -53,5 +62,7 @@ if (ENABLE_SESHAT) target_link_libraries (parodus -llibseshat) endif (ENABLE_SESHAT) - +if (ENABLE_WEBCFGBIN) +target_link_libraries (parodus -lrbus) +endif (ENABLE_WEBCFGBIN) install (TARGETS parodus DESTINATION bin) diff --git a/src/ParodusInternal.c b/src/ParodusInternal.c index 054f25a..cdbeff2 100644 --- a/src/ParodusInternal.c +++ b/src/ParodusInternal.c @@ -69,9 +69,9 @@ char* getWebpaConveyHeader() cJSON_AddStringToObject(response, WEBPA_PROTOCOL, get_parodus_cfg()->webpa_protocol); } - if(strlen(get_parodus_cfg()->webpa_interface_used)!=0) + if(strlen(getWebpaInterface())!=0) { - cJSON_AddStringToObject(response, WEBPA_INTERFACE, get_parodus_cfg()->webpa_interface_used); + cJSON_AddStringToObject(response, WEBPA_INTERFACE, getWebpaInterface()); } if(strlen(get_parodus_cfg()->hw_last_reboot_reason)!=0) diff --git a/src/ParodusInternal.h b/src/ParodusInternal.h index f8a1193..d261331 100644 --- a/src/ParodusInternal.h +++ b/src/ParodusInternal.h @@ -161,7 +161,14 @@ int readFromFile(const char *file_name, char **data); void timespec_diff(struct timespec *start, struct timespec *stop, struct timespec *result); - +#ifdef ENABLE_WEBCFGBIN +void subscribeRBUSevent(); +int regXmidtSendDataMethod(); +void registerRbusLogger(); +#endif +#ifdef WAN_FAILOVER_SUPPORTED +void setWebpaInterface(char *value); +#endif /*------------------------------------------------------------------------------*/ /* For interface_down_event Flag */ /*------------------------------------------------------------------------------*/ @@ -178,7 +185,12 @@ void set_interface_down_event(); pthread_cond_t *get_interface_down_con(); pthread_mutex_t *get_interface_down_mut(); - + +pthread_cond_t *get_global_cloud_status_cond(void); + +pthread_mutex_t *get_global_cloud_status_mut(void); + +int cloud_status_is_online (void); #ifdef __cplusplus } diff --git a/src/auth_token.c b/src/auth_token.c index 9bc694b..18468e8 100644 --- a/src/auth_token.c +++ b/src/auth_token.c @@ -64,6 +64,7 @@ int requestNewAuthToken(char *newToken, size_t len, int r_count) struct curl_slist *list = NULL; struct curl_slist *headers_list = NULL; + char webpa_interface[64]={'\0'}; double total; struct token_data data; @@ -80,9 +81,10 @@ int requestNewAuthToken(char *newToken, size_t len, int r_count) curl_easy_setopt(curl, CURLOPT_URL, get_parodus_cfg()->token_server_url); curl_easy_setopt(curl, CURLOPT_TIMEOUT, CURL_TIMEOUT_SEC); - if(get_parodus_cfg()->webpa_interface_used !=NULL && strlen(get_parodus_cfg()->webpa_interface_used) >0) + parStrncpy(webpa_interface, getWebpaInterface(), sizeof(webpa_interface)); + if(webpa_interface !=NULL && strlen(webpa_interface) >0) { - curl_easy_setopt(curl, CURLOPT_INTERFACE, get_parodus_cfg()->webpa_interface_used); + curl_easy_setopt(curl, CURLOPT_INTERFACE, webpa_interface); } /* set callback for writing received data */ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback_fn); @@ -221,8 +223,14 @@ void getAuthToken(ParodusCfg *cfg) * @param[in] nmemb size of delivered data * @param[out] data curl response data saved. */ +#ifndef DEVICE_CAMERA size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, struct token_data *data) { +#else +size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, void *datain) +{ + struct token_data *data = (struct token_data*) datain; +#endif //DEVICE_CAMERA ParodusCfg *cfg; size_t max_data_size = sizeof (cfg->webpa_auth_token); size_t index = data->size; @@ -304,7 +312,7 @@ void createCurlheader(struct curl_slist *list, struct curl_slist **header_list) snprintf(buf, MAX_BUF_SIZE, "X-Midt-Protocol: %s", get_parodus_cfg()->webpa_protocol); list = curl_slist_append(list, buf); - snprintf(buf, MAX_BUF_SIZE, "X-Midt-Interface-Used: %s", get_parodus_cfg()->webpa_interface_used); + snprintf(buf, MAX_BUF_SIZE, "X-Midt-Interface-Used: %s", getWebpaInterface()); list = curl_slist_append(list, buf); snprintf(buf, MAX_BUF_SIZE, "X-Midt-Last-Reboot-Reason: %s", get_parodus_cfg()->hw_last_reboot_reason); diff --git a/src/auth_token.h b/src/auth_token.h index 905d527..d5d0aac 100644 --- a/src/auth_token.h +++ b/src/auth_token.h @@ -47,7 +47,11 @@ struct token_data { int requestNewAuthToken(char *newToken, size_t len, int r_count); void getAuthToken(ParodusCfg *cfg); +#ifndef DEVICE_CAMERA size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, struct token_data *data); +#else +size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, void *data); +#endif char* generate_trans_uuid(); #ifdef __cplusplus diff --git a/src/config.c b/src/config.c index b710b8a..00b2975 100644 --- a/src/config.c +++ b/src/config.c @@ -33,6 +33,13 @@ /*----------------------------------------------------------------------------*/ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ +pthread_mutex_t config_mut=PTHREAD_MUTEX_INITIALIZER; + +//For sending cond signal when cloud status is ONLINE +pthread_mutex_t cloud_status_mut=PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t cloud_status_cond=PTHREAD_COND_INITIALIZER; + +char webpa_interface[64]={'\0'}; static ParodusCfg parodusCfg; static unsigned int rsa_algorithms = @@ -40,6 +47,15 @@ static unsigned int rsa_algorithms = /*----------------------------------------------------------------------------*/ /* External Functions */ /*----------------------------------------------------------------------------*/ +pthread_cond_t *get_global_cloud_status_cond(void) +{ + return &cloud_status_cond; +} + +pthread_mutex_t *get_global_cloud_status_mut(void) +{ + return &cloud_status_mut; +} ParodusCfg *get_parodus_cfg(void) { @@ -61,6 +77,31 @@ void reset_cloud_disconnect_reason(ParodusCfg *cfg) cfg->cloud_disconnect = NULL; } +void set_cloud_status(char *status) +{ + if(status != NULL) + { + pthread_mutex_lock(&config_mut); + get_parodus_cfg()->cloud_status = status; + if(strcmp (status, CLOUD_STATUS_ONLINE) == 0) + { + pthread_cond_signal(&cloud_status_cond); + } + pthread_mutex_unlock(&config_mut); + } +} + +char *get_cloud_status(void) +{ + char *status = NULL; + pthread_mutex_lock(&config_mut); + if(NULL != get_parodus_cfg()->cloud_status) + { + status = get_parodus_cfg()->cloud_status; + } + pthread_mutex_unlock(&config_mut); + return status; +} const char *get_tok (const char *src, int delim, char *result, int resultsize) { @@ -146,9 +187,9 @@ void read_key_from_file (const char *fname, char *buf, size_t buflen) int parse_mac_address (char *target, const char *arg) { int count = 0; - int i; + int i, j; char c; - + char *mac = target; for (i=0; (c=arg[i]) != 0; i++) { if (c !=':') count++; @@ -160,9 +201,48 @@ int parse_mac_address (char *target, const char *arg) *(target++) = c; } *target = 0; // terminating null + + //convert mac to lowercase + for(j = 0; mac[j]; j++) + { + mac[j] = tolower(mac[j]); + } + ParodusPrint("mac in lowercase is %s\n", mac); return 0; } +int parse_serial_num(char *target, const char *arg) +{ + char ch; + if(arg != NULL) + { + if(strlen(arg) == 0) + { + ParodusError("Empty serial number, setting to default unknown\n"); + strcpy(target,"unknown"); + } + for(int i=0; (ch = arg[i]) != '\0'; i++) + { + // check if character is ascii, a-z --> 97 to 122, A-Z --> 65 to 90, digits(0 to 9) --> 48 to 57 + if((ch >= 97 && ch <= 122) || (ch >= 65 && ch <= 90) || (ch >=48 && ch <= 57)) + { + target[i] = ch; + } + else + { + ParodusError("Invalid serial number, setting to default unknown\n"); + strcpy(target,"unknown"); + break; + } + } + } + else + { + ParodusError("serial number argument is NULL\n"); + } + return 0; +} + int server_is_http (const char *full_url, const char **server_ptr) { @@ -355,6 +435,9 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg) {"webpa-backoff-max", required_argument, 0, 'o'}, {"webpa-interface-used", required_argument, 0, 'i'}, {"parodus-local-url", required_argument, 0, 'l'}, +#ifdef ENABLE_WEBCFGBIN + {"max-queue-size", required_argument, 0, 'q'}, +#endif {"partner-id", required_argument, 0, 'p'}, #ifdef ENABLE_SESHAT {"seshat-url", required_argument, 0, 'e'}, @@ -402,7 +485,7 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg) /* getopt_long stores the option index here. */ int option_index = 0; c = getopt_long (argc, argv, - "m:s:f:d:r:n:b:u:t:o:i:l:p:e:D:j:a:k:c:T:w:J:46:C:S:R:K:M", + "m:s:f:d:r:n:b:u:t:o:i:l:q:p:e:D:j:a:k:c:T:w:J:46:C:S:R:K:M", long_options, &option_index); /* Detect the end of the options. */ @@ -417,8 +500,8 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg) break; case 's': - parStrncpy(cfg->hw_serial_number,optarg,sizeof(cfg->hw_serial_number)); - ParodusInfo("hw_serial_number is %s\n",cfg->hw_serial_number); + if(parse_serial_num(cfg->hw_serial_number, optarg) == 0) + ParodusInfo ("hw_serial_number is %s\n",cfg->hw_serial_number); break; case 'f': @@ -488,6 +571,16 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg) parStrncpy(cfg->local_url, optarg,sizeof(cfg->local_url)); ParodusInfo("parodus local_url is %s\n",cfg->local_url); break; + +#ifdef ENABLE_WEBCFGBIN + case 'q': + cfg->max_queue_size = parse_num_arg (optarg, "max-queue-size"); + if (cfg->max_queue_size == (unsigned int) -1) + return -1; + ParodusInfo("max_queue_size is %d\n",cfg->max_queue_size); + break; +#endif + case 'D': // like 'fabric' or 'test' // this parameter is used, along with the hw_mac parameter @@ -724,7 +817,7 @@ void loadParodusCfg(ParodusCfg * config,ParodusCfg *cfg) } if(strlen(config->webpa_interface_used )!=0) { - parStrncpy(cfg->webpa_interface_used, config->webpa_interface_used,sizeof(cfg->webpa_interface_used)); + parStrncpy(getWebpaInterface(), config->webpa_interface_used,sizeof(getWebpaInterface())); } else { @@ -792,7 +885,9 @@ void loadParodusCfg(ParodusCfg * config,ParodusCfg *cfg) parStrncpy(cfg->cert_path, "\0", sizeof(cfg->cert_path)); ParodusPrint("cert_path is NULL. set to empty\n"); } - + #ifdef ENABLE_WEBCFGBIN + cfg->max_queue_size = config->max_queue_size; + #endif cfg->boot_time = config->boot_time; cfg->webpa_ping_timeout = config->webpa_ping_timeout; cfg->webpa_backoff_max = config->webpa_backoff_max; @@ -848,4 +943,26 @@ void loadParodusCfg(ParodusCfg * config,ParodusCfg *cfg) } } +#ifdef WAN_FAILOVER_SUPPORTED +void setWebpaInterface(char *value) +{ + pthread_mutex_lock (&config_mut); + parStrncpy(get_parodus_cfg()->webpa_interface_used, value, sizeof(get_parodus_cfg()->webpa_interface_used)); + pthread_mutex_unlock (&config_mut); +} +#endif + +char *getWebpaInterface(void) +{ + #ifdef WAN_FAILOVER_SUPPORTED + ParodusPrint("WAN_FAILOVER_SUPPORTED mode \n"); + pthread_mutex_lock (&config_mut); + parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface)); + pthread_mutex_unlock (&config_mut); + #else + parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface)); + #endif + ParodusPrint("webpa_interface:%s\n", webpa_interface); + return webpa_interface; +} diff --git a/src/config.h b/src/config.h index f7fc56a..72770e8 100644 --- a/src/config.h +++ b/src/config.h @@ -24,6 +24,7 @@ #ifndef _CONFIG_H_ #define _CONFIG_H_ +#include #ifdef __cplusplus extern "C" { #endif @@ -87,6 +88,9 @@ typedef struct char webpa_uuid[64]; unsigned int flags; char local_url[124]; +#ifdef ENABLE_WEBCFGBIN + unsigned int max_queue_size; +#endif char partner_id[64]; #ifdef ENABLE_SESHAT char seshat_url[128]; @@ -139,6 +143,10 @@ void set_parodus_cfg(ParodusCfg *); char *get_token_application(void) ; void set_cloud_disconnect_reason(ParodusCfg *cfg, char *disconn_reason); void reset_cloud_disconnect_reason(ParodusCfg *cfg); +char *getWebpaInterface(void); +void set_cloud_status(char *status); +char *get_cloud_status(void); +int get_parodus_init(); /** * parse a webpa url. Extract the server address, the port * and return whether it's secure or not diff --git a/src/conn_interface.c b/src/conn_interface.c index 3456267..aadd177 100644 --- a/src/conn_interface.c +++ b/src/conn_interface.c @@ -77,6 +77,7 @@ void createSocketConnection(void (* initKeypress)()) server_list_t server_list; bool seshat_registered = false; int create_conn_rtn = 0; + int nopoll_returnvalue = 0; unsigned int webpa_ping_timeout_ms = 1000 * get_parodus_cfg()->webpa_ping_timeout; unsigned int heartBeatTimer = 0; struct timespec start_svc_alive_timer; @@ -98,7 +99,9 @@ void createSocketConnection(void (* initKeypress)()) #endif EventHandler(); - + #ifdef WAN_FAILOVER_SUPPORTED + subscribeCurrentActiveInterfaceEvent(); + #endif set_server_list_null (&server_list); create_conn_rtn = createNopollConnection(ctx, &server_list); if(!create_conn_rtn) @@ -131,14 +134,17 @@ void createSocketConnection(void (* initKeypress)()) struct timespec start, stop, diff; int time_taken_ms; - clock_gettime(CLOCK_REALTIME, &start); - nopoll_loop_wait(ctx, 5000000); - clock_gettime(CLOCK_REALTIME, &stop); + clock_gettime(CLOCK_MONOTONIC, &start); + nopoll_returnvalue = nopoll_loop_wait(ctx, 5000000); + clock_gettime(CLOCK_MONOTONIC, &stop); timespec_diff(&start, &stop, &diff); time_taken_ms = diff.tv_sec * 1000 + (diff.tv_nsec / 1000000); - - // ParodusInfo("nopoll_loop_wait() time %d msec\n", time_taken_ms); + if(time_taken_ms/1000 != 5) + { + ParodusInfo("nopoll_loop_wait value %d,nopoll_loop_wait() time %d msec\n",nopoll_returnvalue, time_taken_ms); + } + ParodusPrint("webpa_ping_timeout_ms %d msec\n", webpa_ping_timeout_ms); heartBeatTimer = get_heartBeatTimer(); if(heartBeatTimer >= webpa_ping_timeout_ms) { @@ -195,7 +201,8 @@ void createSocketConnection(void (* initKeypress)()) } createNopollConnection(ctx, &server_list); } - } while(!get_close_retry() && !g_shutdown); + //process exit only when g_shutdown is true. + } while(FOREVER() && !g_shutdown); pthread_mutex_lock (get_global_svc_mut()); pthread_cond_signal (get_global_svc_con()); diff --git a/src/connection.c b/src/connection.c index c679870..79ff389 100644 --- a/src/connection.c +++ b/src/connection.c @@ -62,6 +62,8 @@ enum { /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ +parodusOnConnStatusChangeHandler on_conn_status_change; + parodusOnPingStatusChangeHandler on_ping_status_change; pthread_mutex_t backoff_delay_mut=PTHREAD_MUTEX_INITIALIZER; @@ -130,7 +132,10 @@ void set_cloud_disconnect_time(int disconnTime) cloud_disconnect_max_time = disconnTime; } - +int get_parodus_init() +{ + return init; +} //-------------------------------------------------------------------- // createNopollConnection_logic: @@ -234,7 +239,7 @@ void init_backoff_timer (backoff_timer_t *timer, int max_count) timer->count = 1; timer->max_count = max_count; timer->delay = 1; - clock_gettime (CLOCK_REALTIME, &timer->ts); + clock_gettime (CLOCK_MONOTONIC, &timer->ts); timer->start_time = time(NULL); } @@ -320,8 +325,14 @@ static int backoff_delay (backoff_timer_t *timer) struct timespec ts; int rtn; + pthread_condattr_t backoff_delay_cond_attr; + + pthread_condattr_init (&backoff_delay_cond_attr); + pthread_condattr_setclock (&backoff_delay_cond_attr, CLOCK_MONOTONIC); + pthread_cond_init (&backoff_delay_con, &backoff_delay_cond_attr); + // periodically update the health file. - clock_gettime (CLOCK_REALTIME, &ts); + clock_gettime (CLOCK_MONOTONIC, &ts); if ((ts.tv_sec - timer->ts.tv_sec) >= UPDATE_HEALTH_FILE_INTERVAL_SECS) { start_conn_in_progress (timer->start_time); timer->ts.tv_sec += UPDATE_HEALTH_FILE_INTERVAL_SECS; @@ -334,6 +345,8 @@ static int backoff_delay (backoff_timer_t *timer) rtn = pthread_cond_timedwait (&backoff_delay_con, &backoff_delay_mut, &ts); pthread_mutex_unlock (&backoff_delay_mut); + pthread_condattr_destroy(&backoff_delay_cond_attr); + if (g_shutdown) return BACKOFF_SHUTDOWN; if ((rtn != 0) && (rtn != ETIMEDOUT)) { @@ -697,7 +710,8 @@ int wait_while_interface_down() if (rtn != 0) ParodusError ("Error on pthread_cond_wait (%d) in wait_while_interface_down\n", rtn); - if ((rtn != 0) || g_shutdown) { + if (g_shutdown) { + ParodusInfo("Received g_shutdown during interface down wait, returning\n"); return -1; } } @@ -714,9 +728,9 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list) { create_connection_ctx_t conn_ctx; int max_retry_count; - struct timespec connect_time,*connectTimePtr; - connectTimePtr = &connect_time; backoff_timer_t backoff_timer; + static int init_conn_failure=1; + struct sysinfo l_sSysInfo; if(ctx == NULL) { return nopoll_false; @@ -752,6 +766,14 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list) } /* if we failed to connect, don't reuse the redirect server */ free_server (&conn_ctx.server_list->redirect); + + /* On initial connect failure, invoke conn status change event as "failed" only 1 time*/ + if((NULL != on_conn_status_change) && init && init_conn_failure) + { + on_conn_status_change("failed"); + init_conn_failure=0; + } + #ifdef FEATURE_DNS_QUERY /* if we don't already have a valid jwt, look up server information */ if (server_is_null (&conn_ctx.server_list->jwt)) @@ -761,8 +783,7 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list) } #endif } - - if(conn_ctx.current_server->allow_insecure <= 0) + if(conn_ctx.current_server != NULL && conn_ctx.current_server->allow_insecure <= 0) { ParodusInfo("Connected to server over SSL\n"); OnboardLog("Connected to server over SSL\n"); @@ -772,9 +793,12 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list) ParodusInfo("Connected to server\n"); OnboardLog("Connected to server\n"); } - - get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE; - ParodusInfo("cloud_status set as %s after successful connection\n", get_parodus_cfg()->cloud_status); + + /* On initial connect success, invoke conn status change event as "success" */ + if((NULL != on_conn_status_change) && init) + { + on_conn_status_change("success"); + } // Invoke the ping status change event callback as "received" ping if(NULL != on_ping_status_change) @@ -782,9 +806,9 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list) on_ping_status_change("received"); } - if((get_parodus_cfg()->boot_time != 0) && init) { - getCurrentTime(connectTimePtr); - ParodusInfo("connect_time-diff-boot_time=%d\n", connectTimePtr->tv_sec - get_parodus_cfg()->boot_time); + if(init) { + sysinfo(&l_sSysInfo); + ParodusInfo("connect_time-diff-boot_time=%ld\n", l_sSysInfo.uptime); init = 0; //set init to 0 so that this is logged only during process start up and not during reconnect } @@ -800,6 +824,9 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list) ParodusPrint("LastReasonStatus reset after successful connection\n"); setMessageHandlers(); stop_conn_in_progress (); + ParodusPrint("set cloud_status\n"); + set_cloud_status(CLOUD_STATUS_ONLINE); + ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status()); return nopoll_true; } @@ -844,7 +871,7 @@ static noPollConnOpts * createConnOpts (char * extra_headers, bool secure) nopoll_conn_opts_ssl_peer_verify (opts, nopoll_true); nopoll_conn_opts_set_ssl_protocol (opts, NOPOLL_METHOD_TLSV1_2); } - nopoll_conn_opts_set_interface (opts,get_parodus_cfg()->webpa_interface_used); + nopoll_conn_opts_set_interface (opts,getWebpaInterface()); nopoll_conn_opts_set_extra_headers (opts,extra_headers); return opts; } @@ -884,8 +911,8 @@ void close_and_unref_connection(noPollConn *conn, bool is_shutting_down) { if (conn) { close_conn (conn, is_shutting_down); - get_parodus_cfg()->cloud_status = CLOUD_STATUS_OFFLINE; - ParodusInfo("cloud_status set as %s after connection close\n", get_parodus_cfg()->cloud_status); + set_cloud_status(CLOUD_STATUS_OFFLINE); + ParodusInfo("cloud_status set as %s after connection close\n", get_cloud_status()); } } @@ -934,3 +961,8 @@ void registerParodusOnPingStatusChangeHandler(parodusOnPingStatusChangeHandler c on_ping_status_change = callback_func; } +void registerParodusOnConnStatusChangeHandler(parodusOnConnStatusChangeHandler callback_func) +{ + on_conn_status_change = callback_func; +} + diff --git a/src/connection.h b/src/connection.h index d750829..adff9c2 100644 --- a/src/connection.h +++ b/src/connection.h @@ -37,6 +37,15 @@ extern "C" { #define SHUTDOWN_REASON_SYSTEM_RESTART "system_restarting" #define SHUTDOWN_REASON_SIGTERM "SIGTERM" +/** +* parodusOnConnStatusChangeHandler - Function pointer +* Used to define callback function to do additional processing +* when websocket cloud connection status change event +* i.e. "cloud-conn-status" as "fail" or "success" +*/ +typedef void (*parodusOnConnStatusChangeHandler) (char * status); +extern parodusOnConnStatusChangeHandler on_conn_status_change; + /** * parodusOnPingStatusChangeHandler - Function pointer * Used to define callback function to do additional processing @@ -79,6 +88,9 @@ void set_cloud_disconnect_time(int disconnTime); void start_conn_in_progress (unsigned long start_time); void stop_conn_in_progress (void); +// To Register parodusOnConnStatusChangeHandler Callback function +void registerParodusOnConnStatusChangeHandler(parodusOnConnStatusChangeHandler on_conn_status_change); + // To Register parodusOnPingStatusChangeHandler Callback function void registerParodusOnPingStatusChangeHandler(parodusOnPingStatusChangeHandler on_ping_status_change); diff --git a/src/crud_internal.c b/src/crud_internal.c index 0a77490..e11b2f9 100644 --- a/src/crud_internal.c +++ b/src/crud_internal.c @@ -537,13 +537,13 @@ int retrieveFromMemory(char *keyName, cJSON **jsonresponse) } else if(strcmp(WEBPA_INTERFACE, keyName)==0) { - if((get_parodus_cfg()->webpa_interface_used !=NULL)&& (strlen(get_parodus_cfg()->fw_name)==0)) + if((getWebpaInterface() !=NULL)&& (strlen(get_parodus_cfg()->fw_name)==0)) { ParodusError("retrieveFromMemory: webpa_interface_used value is NULL\n"); return -1; } - ParodusInfo("retrieveFromMemory: keyName:%s value:%s\n",keyName,get_parodus_cfg()->webpa_interface_used); - cJSON_AddItemToObject( *jsonresponse, WEBPA_INTERFACE , cJSON_CreateString(get_parodus_cfg()->webpa_interface_used)); + ParodusInfo("retrieveFromMemory: keyName:%s value:%s\n",keyName,getWebpaInterface()); + cJSON_AddItemToObject( *jsonresponse, WEBPA_INTERFACE , cJSON_CreateString(getWebpaInterface())); } else if(strcmp(WEBPA_URL, keyName)==0) { @@ -577,20 +577,20 @@ int retrieveFromMemory(char *keyName, cJSON **jsonresponse) } else if(strcmp(CLOUD_STATUS, keyName)==0) { - if(get_parodus_cfg()->cloud_status ==NULL) + if(get_cloud_status() ==NULL) { ParodusError("retrieveFromMemory: cloud_status value is NULL\n"); return -1; } - else if((get_parodus_cfg()->cloud_status !=NULL) && (strlen(get_parodus_cfg()->cloud_status)==0)) + else if((get_cloud_status() !=NULL) && (strlen(get_cloud_status())==0)) { ParodusError("retrieveFromMemory: cloud_status value is empty\n"); return -1; } else { - ParodusInfo("retrieveFromMemory: keyName:%s value:%s\n", keyName, get_parodus_cfg()->cloud_status); - cJSON_AddItemToObject( *jsonresponse, CLOUD_STATUS , cJSON_CreateString(get_parodus_cfg()->cloud_status)); + ParodusInfo("retrieveFromMemory: keyName:%s value:%s\n", keyName, get_cloud_status()); + cJSON_AddItemToObject( *jsonresponse, CLOUD_STATUS , cJSON_CreateString(get_cloud_status())); } } else if(strcmp(BOOT_TIME, keyName)==0) diff --git a/src/downstream.c b/src/downstream.c index 952a347..55e890d 100644 --- a/src/downstream.c +++ b/src/downstream.c @@ -27,6 +27,9 @@ #include "partners_check.h" #include "ParodusInternal.h" #include "crud_interface.h" +#ifdef ENABLE_WEBCFGBIN +#include "xmidtsend_rbus.h" +#endif /*----------------------------------------------------------------------------*/ /* Function Prototypes */ /*----------------------------------------------------------------------------*/ @@ -91,6 +94,7 @@ void listenerOnMessage(void * msg, size_t msgSize) { ParodusPrint("numOfClients registered is %d\n", get_numOfClients()); int ret = validate_partner_id(message, NULL); + ParodusPrint("validate_partner_id returns %d\n", ret); if(ret < 0) { response = cJSON_CreateObject(); @@ -98,7 +102,6 @@ void listenerOnMessage(void * msg, size_t msgSize) cJSON_AddStringToObject(response, "message", "Invalid partner_id"); } - destVal = strdup(((WRP_MSG_TYPE__EVENT == msgType) ? message->u.event.dest : ((WRP_MSG_TYPE__REQ == msgType) ? message->u.req.dest : message->u.crud.dest))); @@ -120,10 +123,10 @@ void listenerOnMessage(void * msg, size_t msgSize) } ParodusInfo("Received downstream dest as :%s and transaction_uuid :%s\n", dest, ((WRP_MSG_TYPE__REQ == msgType) ? message->u.req.transaction_uuid : - ((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid))); + ((WRP_MSG_TYPE__EVENT == msgType) ? message->u.event.transaction_uuid : message->u.crud.transaction_uuid))); OnboardLog("%s\n", ((WRP_MSG_TYPE__REQ == msgType) ? message->u.req.transaction_uuid : - ((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid))); + ((WRP_MSG_TYPE__EVENT == msgType) ? message->u.event.transaction_uuid : message->u.crud.transaction_uuid))); free(destVal); @@ -168,13 +171,16 @@ void listenerOnMessage(void * msg, size_t msgSize) destFlag =1; } //if any unknown dest received sending error response to server - if(destFlag ==0) - { - ParodusError("Unknown dest:%s\n", dest); - response = cJSON_CreateObject(); - cJSON_AddNumberToObject(response, "statusCode", 531); - cJSON_AddStringToObject(response, "message", "Service Unavailable"); - } + if (WRP_MSG_TYPE__EVENT != msgType) + { + if(destFlag ==0) + { + ParodusError("Unknown dest:%s\n", dest); + response = cJSON_CreateObject(); + cJSON_AddNumberToObject(response, "statusCode", 531); + cJSON_AddStringToObject(response, "message", "Service Unavailable"); + } + } } if( (WRP_MSG_TYPE__EVENT != msgType) && @@ -235,6 +241,37 @@ void listenerOnMessage(void * msg, size_t msgSize) } free(resp_msg); } + #ifdef ENABLE_WEBCFGBIN + //To handle cloud ack events received from server for the xmidt sent messages. + if((WRP_MSG_TYPE__EVENT == msgType) && (ret >= 0)) + { + if(get_parodus_cfg()->max_queue_size > 0) + { + //Process cloud ack only when qos > 24 + if(highQosValueCheck(message->u.event.qos)) + { + if(message->u.event.transaction_uuid !=NULL) + { + ParodusInfo("Received cloud ack from server: transaction_uuid %s qos %d, rdr %d source %s\n", message->u.event.transaction_uuid, message->u.event.qos, message->u.event.rdr, message->u.event.source); + addToCloudAckQ(message->u.event.transaction_uuid, message->u.event.qos, message->u.event.rdr, message->u.event.source); + ParodusPrint("Added to cloud ack Q\n"); + } + else + { + ParodusError("cloud ack transaction id is NULL\n"); + } + } + else + { + ParodusInfo("cloud ack received with low qos %d, ignoring it\n", message->u.event.qos); + } + } + else + { + ParodusInfo("cloud ack is ignored as max queue size is %d\n", get_parodus_cfg()->max_queue_size ); + } + } + #endif break; } diff --git a/src/heartBeat.c b/src/heartBeat.c index 41eeb37..e721cb5 100644 --- a/src/heartBeat.c +++ b/src/heartBeat.c @@ -22,12 +22,15 @@ */ #include "heartBeat.h" +#include "time.h" #include volatile unsigned int heartBeatTimer = 0; volatile bool paused = false; +volatile long long pingTimeStamp = 0; pthread_mutex_t heartBeat_mut=PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t ping_mut=PTHREAD_MUTEX_INITIALIZER; // Get value of heartBeatTimer unsigned int get_heartBeatTimer() @@ -73,5 +76,22 @@ void resume_heartBeatTimer() pthread_mutex_unlock (&heartBeat_mut); } +// Set ping received timeStamp +void set_pingTimeStamp() +{ + struct timespec ts; + getCurrentTime(&ts); + pthread_mutex_lock (&ping_mut); + pingTimeStamp = (long long)ts.tv_sec; + pthread_mutex_unlock (&ping_mut); +} - +// Get ping received timeStamp +long long get_pingTimeStamp() +{ + long long tmp = 0; + pthread_mutex_lock (&ping_mut); + tmp = pingTimeStamp; + pthread_mutex_unlock (&ping_mut); + return tmp; +} diff --git a/src/heartBeat.h b/src/heartBeat.h index 2986968..04a73c4 100644 --- a/src/heartBeat.h +++ b/src/heartBeat.h @@ -45,6 +45,10 @@ void pause_heartBeatTimer(); // Resume heartBeatTimer, i.e. resume incrementing void resume_heartBeatTimer(); +void set_pingTimeStamp(); + +long long get_pingTimeStamp(); + #ifdef __cplusplus } #endif diff --git a/src/main.c b/src/main.c index 30eb032..a1b4eef 100644 --- a/src/main.c +++ b/src/main.c @@ -24,7 +24,11 @@ #include "parodus_log.h" #include #ifdef INCLUDE_BREAKPAD +#ifndef DEVICE_CAMERA #include "breakpad_wrapper.h" +#else +#include "breakpadwrap.h" +#endif //DEVICE_CAMERA #endif #include "signal.h" #include "privilege.h" @@ -87,8 +91,17 @@ int main( int argc, char **argv) signal(SIGHUP, sig_handler); signal(SIGALRM, sig_handler); #ifdef INCLUDE_BREAKPAD +#ifndef DEVICE_CAMERA /* breakpad handles the signals SIGSEGV, SIGBUS, SIGFPE, and SIGILL */ breakpad_ExceptionHandler(); +#else + /* breakpad handles the signals SIGSEGV, SIGBUS, SIGFPE, and SIGILL */ + BreakPadWrapExceptionHandler eh; + eh = newBreakPadWrapExceptionHandler(); + if(NULL != eh) { + ParodusInfo("Breakpad Initialized\n"); + } +#endif //DEVICE_CAMERA #else signal(SIGSEGV, sig_handler); signal(SIGBUS, sig_handler); @@ -107,6 +120,11 @@ int main( int argc, char **argv) ParodusInfo("********** Starting component: Parodus **********\n "); drop_root_privilege(); + #ifdef ENABLE_WEBCFGBIN + registerRbusLogger(); + subscribeRBUSevent(); + regXmidtSendDataMethod(); + #endif setDefaultValuesToCfg(cfg); if (0 != parseCommandLine(argc,argv,cfg)) { abort(); diff --git a/src/nopoll_handlers.c b/src/nopoll_handlers.c index 6172e7d..e363b04 100644 --- a/src/nopoll_handlers.c +++ b/src/nopoll_handlers.c @@ -149,6 +149,7 @@ void listenerOnPingMessage (noPollCtx * ctx, noPollConn * conn, noPollMsg * msg, if (nopoll_msg_opcode(msg) == NOPOLL_PING_FRAME) { reset_heartBeatTimer(); + set_pingTimeStamp(); } } } diff --git a/src/nopoll_helpers.c b/src/nopoll_helpers.c index 70ebc0f..4b245a4 100644 --- a/src/nopoll_helpers.c +++ b/src/nopoll_helpers.c @@ -50,9 +50,9 @@ void setMessageHandlers() nopoll_conn_set_on_close(get_global_conn(), (noPollOnCloseHandler)listenerOnCloseMessage, NULL); } -static int cloud_status_is_online (void) +int cloud_status_is_online (void) { - const char *status = get_parodus_cfg()->cloud_status; + const char *status = get_cloud_status(); if (NULL == status) return false; return (strcmp (status, CLOUD_STATUS_ONLINE) == 0); @@ -60,14 +60,14 @@ static int cloud_status_is_online (void) /** To send upstream msgs to server ***/ -void sendMessage(noPollConn *conn, void *msg, size_t len) +int sendMessage(noPollConn *conn, void *msg, size_t len) { int bytesWritten = 0; if (!cloud_status_is_online ()) { ParodusError("Failed to send msg upstream as connection is not OK\n"); OnboardLog("Failed to send msg upstream as connection is not OK\n"); - return; + return 1; } ParodusInfo("sendMessage length %zu\n", len); @@ -77,7 +77,9 @@ void sendMessage(noPollConn *conn, void *msg, size_t len) if (bytesWritten != (int) len) { ParodusError("Failed to send bytes %zu, bytes written were=%d (errno=%d, %s)..\n", len, bytesWritten, errno, strerror(errno)); + return 1; } + return 0; } int sendResponse(noPollConn * conn, void * buffer, size_t length) diff --git a/src/nopoll_helpers.h b/src/nopoll_helpers.h index a17ddba..d6c8516 100644 --- a/src/nopoll_helpers.h +++ b/src/nopoll_helpers.h @@ -41,7 +41,7 @@ extern "C" { */ int sendResponse(noPollConn * conn,void *str, size_t bufferSize); void setMessageHandlers(); -void sendMessage(noPollConn *conn, void *msg, size_t len); +int sendMessage(noPollConn *conn, void *msg, size_t len); /** * @brief __report_log Nopoll log handler diff --git a/src/partners_check.c b/src/partners_check.c index 2c46d0c..7a88d66 100644 --- a/src/partners_check.c +++ b/src/partners_check.c @@ -114,7 +114,7 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds) } /* Commandline input partner_ids not matching with partner_ids from request, appending to request partner_ids*/ - if(matchFlag != 1) + if(matchFlag != 1 && partnerIds !=NULL) { (*partnerIds) = (partners_t *) malloc(sizeof(partners_t) + (sizeof(char *) * (count+partnersList->count))); (*partnerIds)->count = count+partnersList->count; @@ -132,6 +132,23 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds) i++; } } + else if (matchFlag != 1 && partnerIds == NULL) + { + ParodusError("partner_id match not found: command line input %s , msg partner_id %s\n", temp, msg->u.event.partner_ids->partner_ids[0]); + if(partnersList != NULL) + { + for(j=0; jcount; j++) + { + if(NULL != partnersList->partner_ids[j]) + { + free(partnersList->partner_ids[j]); + } + } + free(partnersList); + } + free(partnerId); + return 1; + } } else { diff --git a/src/time.c b/src/time.c index 0c81356..d6eb742 100644 --- a/src/time.c +++ b/src/time.c @@ -17,6 +17,7 @@ #include "time.h" #include "parodus_log.h" +#include /*----------------------------------------------------------------------------*/ /* External Functions */ @@ -24,7 +25,10 @@ void getCurrentTime(struct timespec *timer) { - clock_gettime(CLOCK_REALTIME, timer); + if( clock_gettime(CLOCK_REALTIME, timer) == -1 ) + { + ParodusError("clock gettime returns errno %d\n", errno ); + } } uint64_t getCurrentTimeInMicroSeconds(struct timespec *timer) @@ -34,7 +38,7 @@ uint64_t getCurrentTimeInMicroSeconds(struct timespec *timer) ParodusPrint("timer->tv_sec : %lu\n",timer->tv_sec); ParodusPrint("timer->tv_nsec : %lu\n",timer->tv_nsec); systime = (uint64_t)timer->tv_sec * 1000000L + timer->tv_nsec/ 1000; - return systime; + return systime; } long timeValDiff(struct timespec *starttime, struct timespec *finishtime) diff --git a/src/upstream.c b/src/upstream.c index 0f13df0..41ef2fa 100644 --- a/src/upstream.c +++ b/src/upstream.c @@ -40,7 +40,7 @@ /*----------------------------------------------------------------------------*/ void *metadataPack; -size_t metaPackSize=-1; +size_t metaPackSize=0; UpStreamMsg *UpStreamMsgQ = NULL; @@ -83,7 +83,6 @@ void packMetaData() //Pack the metadata initially to reuse for every upstream msg sending to server ParodusPrint("-------------- Packing metadata ----------------\n"); sprintf(boot_time, "%d", get_parodus_cfg()->boot_time); - struct data meta_pack[METADATA_COUNT] = { {HW_MODELNAME, get_parodus_cfg()->hw_model}, {HW_SERIALNUMBER, get_parodus_cfg()->hw_serial_number}, @@ -95,10 +94,9 @@ void packMetaData() {LAST_RECONNECT_REASON, get_global_reconnect_reason()}, {WEBPA_PROTOCOL, get_parodus_cfg()->webpa_protocol}, {WEBPA_UUID,get_parodus_cfg()->webpa_uuid}, - {WEBPA_INTERFACE, get_parodus_cfg()->webpa_interface_used}, + {WEBPA_INTERFACE, getWebpaInterface()}, {PARTNER_ID, get_parodus_cfg()->partner_id} }; - const data_t metapack = {METADATA_COUNT, meta_pack}; metaPackSize = wrp_pack_metadata( &metapack , &metadataPack ); @@ -322,7 +320,10 @@ void *processUpstreamMessage() } else if(msgType == WRP_MSG_TYPE__EVENT) { - ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest); + (msg->u.event.headers != NULL && msg->u.event.headers->headers[0] != NULL && msg->u.event.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream event data: dest '%s' traceParent: %s traceState: %s\n", msg->u.event.dest, msg->u.event.headers->headers[0], msg->u.event.headers->headers[1]) : ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest); + if(msg->u.event.transaction_uuid != NULL) { + ParodusInfo("transaction_uuid in event: %s\n", msg->u.event.transaction_uuid); + } partners_t *partnersList = NULL; int j = 0; @@ -330,6 +331,7 @@ void *processUpstreamMessage() if(ret == 1) { wrp_msg_t *eventMsg = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); + memset( eventMsg, 0, sizeof( wrp_msg_t ) ); eventMsg->msg_type = msgType; eventMsg->u.event.content_type=msg->u.event.content_type; eventMsg->u.event.source=msg->u.event.source; @@ -339,6 +341,15 @@ void *processUpstreamMessage() eventMsg->u.event.headers=msg->u.event.headers; eventMsg->u.event.metadata=msg->u.event.metadata; eventMsg->u.event.partner_ids = partnersList; + if(msg->u.event.transaction_uuid) + { + ParodusPrint("Inside Trans id in PARODUS\n"); + } + else + { + ParodusPrint("Assigning NULL to trans id\n"); + eventMsg->u.event.transaction_uuid = NULL; + } int size = wrp_struct_to( eventMsg, WRP_BYTES, &bytes ); if(size > 0) @@ -371,7 +382,7 @@ void *processUpstreamMessage() //Sending to server for msgTypes 3, 5, 6, 7, 8. if( WRP_MSG_TYPE__REQ == msgType ) { - ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid ); + (msg->u.req.headers != NULL && msg->u.req.headers->headers[0] != NULL && msg->u.req.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s traceParent: %s traceState: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid, msg->u.req.headers->headers[0], msg->u.req.headers->headers[1]) : ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid); sendUpstreamMsgToServer(&message->msg, message->len); } else @@ -596,11 +607,12 @@ void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_ } } -void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size) +int sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size) { void *appendData; size_t encodedSize; bool close_retry = false; + int sendRetStatus = 1; //appending response with metadata if(metaPackSize > 0) { @@ -615,12 +627,13 @@ void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size) //TODO: Upstream and downstream messages in queue should be handled and queue should be empty before parodus forcefully disconnect from cloud. if(!close_retry || (get_parodus_cfg()->cloud_disconnect !=NULL)) { - sendMessage(get_global_conn(),appendData, encodedSize); + sendRetStatus = sendMessage(get_global_conn(),appendData, encodedSize); } else { ParodusInfo("close_retry is %d, unable to send response as connection retry is in progress\n", close_retry); OnboardLog("close_retry is %d, unable to send response as connection retry is in progress\n", close_retry); + sendRetStatus = 1; } free(appendData); appendData =NULL; @@ -628,6 +641,9 @@ void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size) else { ParodusError("Failed to send upstream as metadata packing is not successful\n"); + sendRetStatus = 1; } + ParodusPrint("sendRetStatus is %d\n", sendRetStatus); + return sendRetStatus; } diff --git a/src/upstream.h b/src/upstream.h index adf222e..d686257 100644 --- a/src/upstream.h +++ b/src/upstream.h @@ -47,11 +47,15 @@ typedef struct UpStreamMsg__ void packMetaData(); void *handle_upstream(); void *processUpstreamMessage(); +void registerRBUSlistener(); int getDeviceId(char **device_id, size_t *device_id_len); -void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size); +int sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size); void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size); void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg); void set_global_UpStreamMsgQ(UpStreamMsg * UpStreamQ); +#ifdef WAN_FAILOVER_SUPPORTED +int subscribeCurrentActiveInterfaceEvent(); +#endif UpStreamMsg * get_global_UpStreamMsgQ(void); pthread_cond_t *get_global_nano_con(void); pthread_mutex_t *get_global_nano_mut(void); diff --git a/src/upstream_rbus.c b/src/upstream_rbus.c new file mode 100644 index 0000000..ba2b669 --- /dev/null +++ b/src/upstream_rbus.c @@ -0,0 +1,243 @@ +/** + * Copyright 2021 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * @file upstream_rbus.c + * + * @description This is used for parodus-RBUS communication + * to send notification events upstream to cloud. + * + */ + +#include +#include +#include "upstream.h" +#include "ParodusInternal.h" +#include "partners_check.h" +#include "close_retry.h" +#include "connection.h" +#include "heartBeat.h" + +#define WEBCFG_UPSTREAM_EVENT "Webconfig.Upstream" +#ifdef WAN_FAILOVER_SUPPORTED +#define WEBPA_INTERFACE "Device.X_RDK_WanManager.CurrentActiveInterface" +#endif + +rbusHandle_t rbus_Handle; +rbusError_t err; + +void processWebconfigUpstreamEvent(rbusHandle_t handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription); + +void subscribeAsyncHandler( rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error); + +rbusHandle_t get_parodus_rbus_Handle(void) +{ + return rbus_Handle; +} + +/* Enables rbus ERROR level logs in parodus. Modify RBUS_LOG_ERROR check if more debug logs are needed from rbus. */ +void rbus_log_handler( + rbusLogLevel level, + const char* file, + int line, + int threadId, + char* message) +{ + ParodusPrint("threadId %d\n", threadId); + const char* slevel = ""; + + if(level < RBUS_LOG_ERROR) + return; + + switch(level) + { + case RBUS_LOG_DEBUG: slevel = "DEBUG"; break; + case RBUS_LOG_INFO: slevel = "INFO"; break; + case RBUS_LOG_WARN: slevel = "WARN"; break; + case RBUS_LOG_ERROR: slevel = "ERROR"; break; + case RBUS_LOG_FATAL: slevel = "FATAL"; break; + } + ParodusInfo("%5s %s:%d -- %s\n", slevel, file, line, message); +} + +void registerRbusLogger() +{ + rbus_registerLogHandler(rbus_log_handler); + ParodusPrint("Registered rbus log handler\n"); +} + +#ifdef WAN_FAILOVER_SUPPORTED +void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription ); +#endif + +/* API to register RBUS listener to receive messages from webconfig */ +void subscribeRBUSevent() +{ + int rc = RBUS_ERROR_SUCCESS; + err = rbus_open(&rbus_Handle, "parodus"); + if (err) + { + ParodusError("rbus_open failed :%s\n", rbusError_ToString(err)); + return; + } + rc = rbusEvent_SubscribeAsync(rbus_Handle,WEBCFG_UPSTREAM_EVENT,processWebconfigUpstreamEvent,subscribeAsyncHandler,"parodus",10*60); + if(rc != RBUS_ERROR_SUCCESS) + ParodusError("rbusEvent_Subscribe failed: %d, %s\n", rc, rbusError_ToString(rc)); + else + ParodusInfo("rbusEvent_Subscribe was successful\n"); +} + +#ifdef WAN_FAILOVER_SUPPORTED +/* API to subscribe Active Interface name on value change event*/ +int subscribeCurrentActiveInterfaceEvent() +{ + int rc = RBUS_ERROR_SUCCESS; + ParodusInfo("Subscribing to Device.X_RDK_WanManager.CurrentActiveInterface Event\n"); + rc = rbusEvent_SubscribeAsync(rbus_Handle,WEBPA_INTERFACE,eventReceiveHandler,subscribeAsyncHandler,"parodusInterface",10*20); + if(rc != RBUS_ERROR_SUCCESS) + { + ParodusError("%s subscribe failed : %d - %s\n", WEBPA_INTERFACE, rc, rbusError_ToString(rc)); + } + return rc; +} +#endif + +void processWebconfigUpstreamEvent(rbusHandle_t handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription) +{ + (void)handle; + (void)subscription; + + int rv=-1; + wrp_msg_t *event_msg; + void *bytes; + const uint8_t* bytesVal = NULL; + int len; + rbusValue_t value = NULL; + + value = rbusObject_GetValue(event->data, "value"); + bytesVal = rbusValue_GetBytes(value, &len); + + bytes = (void*) bytesVal; + rv = wrp_to_struct( bytes, len, WRP_BYTES, &event_msg ); + if(rv > 0) + { + ParodusInfo(" Received upstream event data: dest '%s'\n", event_msg->u.event.dest); + partners_t *partnersList = NULL; + int j = 0; + + int ret = validate_partner_id(event_msg, &partnersList); + if(ret == 1) + { + wrp_msg_t *eventMsg = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); + memset( eventMsg, 0, sizeof( wrp_msg_t ) ); + eventMsg->msg_type = event_msg->msg_type; + eventMsg->u.event.content_type=event_msg->u.event.content_type; + eventMsg->u.event.source=event_msg->u.event.source; + eventMsg->u.event.dest=event_msg->u.event.dest; + eventMsg->u.event.payload=event_msg->u.event.payload; + eventMsg->u.event.payload_size=event_msg->u.event.payload_size; + eventMsg->u.event.headers=event_msg->u.event.headers; + eventMsg->u.event.metadata=event_msg->u.event.metadata; + eventMsg->u.event.partner_ids = partnersList; + if(event_msg->u.event.transaction_uuid) + { + ParodusPrint("Inside Trans id in PARODUS_rbus\n"); + } + else + { + ParodusPrint("Assigning NULL to trans id RBUS\n"); + eventMsg->u.event.transaction_uuid = NULL; + } + + int size = wrp_struct_to( eventMsg, WRP_BYTES, &bytes ); + if(size > 0) + { + sendUpstreamMsgToServer(&bytes, size); + } + free(eventMsg); + free(bytes); + bytes = NULL; + } + else + { + sendUpstreamMsgToServer((void **)(&bytes), len); + } + if(partnersList != NULL) + { + for(j=0; j<(int)partnersList->count; j++) + { + if(NULL != partnersList->partner_ids[j]) + { + free(partnersList->partner_ids[j]); + } + } + free(partnersList); + } + partnersList = NULL; + } +} + +void subscribeAsyncHandler( rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error) +{ + (void)handle; + ParodusInfo("subscribeAsyncHandler event %s, error %d - %s\n",subscription->eventName, error, rbusError_ToString(error)); +} + +#ifdef WAN_FAILOVER_SUPPORTED +void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription ) +{ + (void)subscription; + ParodusPrint("Handling event inside eventReceiveHandler\n"); + (void)rbus_Handle; + char * interface = NULL; + rbusValue_t newValue = rbusObject_GetValue(event->data, "value"); + rbusValue_t oldValue = rbusObject_GetValue(event->data, "oldValue"); + ParodusInfo("Consumer received ValueChange event for param %s\n", event->name); + + if(newValue) { + interface = (char *) rbusValue_GetString(newValue, NULL); + setWebpaInterface(interface); + } + else { + ParodusError("newValue is NULL\n"); + } + + if(newValue !=NULL && oldValue!=NULL && interface!=NULL) { + ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface); + + // If interface is already down then reset it and reconnect cloud conn as wan failover event is received + if(get_interface_down_event()) + { + reset_interface_down_event(); + ParodusInfo("Interface_down_event is reset\n"); + resume_heartBeatTimer(); + } + // Close cloud conn and reconnect with the new interface as wan failover event is received + set_global_reconnect_reason("WAN_FAILOVER"); + set_global_reconnect_status(true); + set_close_retry(); + + } + else { + if(oldValue == NULL) { + ParodusError("oldValue is NULL\n"); + } + if(interface == NULL) { + ParodusError("interface is NULL\n"); + } + } +} +#endif diff --git a/src/xmidtsend_rbus.c b/src/xmidtsend_rbus.c new file mode 100644 index 0000000..e0dd4b1 --- /dev/null +++ b/src/xmidtsend_rbus.c @@ -0,0 +1,1735 @@ +/** + * Copyright 2022 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * @file xmidtsend_rbus.c + * + * @ To provide Xmidt send RBUS method to send events upstream. + * + */ + +#include +#include +#include +#include "upstream.h" +#include "ParodusInternal.h" +#include "partners_check.h" +#include "xmidtsend_rbus.h" +#include "config.h" +#include "time.h" +#include "heartBeat.h" + +static pthread_t processThreadId = 0; +static unsigned int XmidtQsize = 0; +XmidtMsg *XmidtMsgQ = NULL; + +CloudAck *g_cloudackHead = NULL; + +pthread_mutex_t xmidt_mut=PTHREAD_MUTEX_INITIALIZER; + +pthread_cond_t xmidt_con=PTHREAD_COND_INITIALIZER; + +pthread_mutex_t cloudack_mut=PTHREAD_MUTEX_INITIALIZER; + +const char * contentTypeList[]={ +"application/json", +"avro/binary", +"application/msgpack", +"application/binary" +}; + +void printSendMsgData(char* status, int qos, char* dest, char* transaction_uuid) { + ParodusInfo("status: %s, qos: %d, dest: %s, transaction_uuid: %s\n", (status!=NULL)?status:"NULL", qos, (dest!=NULL)?dest:"NULL", (transaction_uuid!=NULL)?transaction_uuid:"NULL"); +} + +bool highQosValueCheck(int qos) +{ + if(qos > 24) + { + ParodusPrint("The qos value is high\n"); + return true; + } + else + { + ParodusPrint("The qos value is low\n"); + } + + return false; +} + +XmidtMsg * get_global_xmidthead(void) +{ + XmidtMsg *tmp = NULL; + pthread_mutex_lock (&xmidt_mut); + tmp = XmidtMsgQ; + pthread_mutex_unlock (&xmidt_mut); + return tmp; +} + +void set_global_xmidthead(XmidtMsg *new) +{ + pthread_mutex_lock (&xmidt_mut); + XmidtMsgQ = new; + pthread_mutex_unlock (&xmidt_mut); +} + +CloudAck * get_global_cloud_node(void) +{ + CloudAck * tmp = NULL; + pthread_mutex_lock (&cloudack_mut); + tmp = g_cloudackHead; + pthread_mutex_unlock (&cloudack_mut); + return tmp; +} + +unsigned int get_XmidtQsize() +{ + unsigned int tmp = 0; + pthread_mutex_lock (&xmidt_mut); + tmp = XmidtQsize; + pthread_mutex_unlock (&xmidt_mut); + return tmp; +} + +void increment_XmidtQsize() +{ + pthread_mutex_lock (&xmidt_mut); + XmidtQsize++; + ParodusInfo("XmidtQsize incremented to %d\n", XmidtQsize); + pthread_mutex_unlock (&xmidt_mut); +} + +void decrement_XmidtQsize() +{ + pthread_mutex_lock (&xmidt_mut); + XmidtQsize--; + pthread_mutex_unlock (&xmidt_mut); +} + +int checkCloudConn() +{ + int ret = 1; + if (!cloud_status_is_online ()) + { + ParodusInfo("cloud status is not online, wait till connection up\n"); + + int rv; + struct timespec ts; + + while (1) + { + pthread_mutex_lock(get_global_cloud_status_mut()); + getCurrentTime(&ts); + ts.tv_sec += EXPIRY_CHECK_TIME; + ParodusPrint("checkCloudConn timeout at %lld\n", (long long) ts.tv_sec); + rv = pthread_cond_timedwait(get_global_cloud_status_cond(), get_global_cloud_status_mut(), &ts); + if (rv == ETIMEDOUT) + { + ParodusInfo("Timedout. Cloud connection is down for %d minutes, check msg expiry\n", (EXPIRY_CHECK_TIME/60)); + pthread_mutex_unlock(get_global_cloud_status_mut()); + int opt = xmidtQOptmize(); + if(opt) + { + ret = 2; + ParodusInfo("xmidtQ is optimized during connection down %d, ret %d\n", opt, ret); + } + } + else + { + ParodusInfo("Received cloud status signal, proceed to event processing\n"); + pthread_mutex_unlock(get_global_cloud_status_mut()); + break; + } + } + } + ParodusPrint("checkCloudConn ret %d\n", ret); + return ret; +} + +//Delete all expired msgs from queue and low qos when max queue. +int xmidtQOptmize() +{ + long long currTime = 0; + struct timespec ts; + int rv = 0, status = 0; + XmidtMsg *next_node = NULL; + char *errorMsg = NULL; + + XmidtMsg *temp = NULL; + temp = get_global_xmidthead(); + + while(temp != NULL) + { + getCurrentTime(&ts); + currTime= (long long)ts.tv_sec; + int del = 0; + + wrp_msg_t * tempMsg = temp->msg; + ParodusPrint("qos %d currTime %lu enqueueTime %lu\n", tempMsg->u.event.qos, currTime, temp->enqueueTime); + if(tempMsg->u.event.qos > 74) + { + if((currTime - temp->enqueueTime) > CRITICAL_QOS_EXPIRE_TIME) + { + ParodusInfo("Critical qos 30 mins expired, delete qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + del = 1; + } + } + else if (tempMsg->u.event.qos > 49) + { + if((currTime - temp->enqueueTime) > HIGH_QOS_EXPIRE_TIME) + { + ParodusInfo("High qos 25 mins expired, delete qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + del = 1; + } + } + else if (tempMsg->u.event.qos > 24) + { + if((currTime - temp->enqueueTime) > MEDIUM_QOS_EXPIRE_TIME) + { + ParodusInfo("Medium qos 20 mins expired, delete qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + del = 1; + } + } + else if (tempMsg->u.event.qos >= 0) + { + if((currTime - temp->enqueueTime) > LOW_QOS_EXPIRE_TIME) + { + ParodusInfo("Low qos 15 mins expired, delete qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + del = 1; + } + else + { + if(get_XmidtQsize() > 0 && get_XmidtQsize() == get_parodus_cfg()->max_queue_size) + { + ParodusInfo("Max queue size reached, delete low qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + del = 2; + } + } + } + else + { + ParodusError("Invalid qos\n"); + } + + if(del) + { + ParodusPrint("msg expired, updateXmidtState to DELETE\n"); + updateXmidtState(temp, DELETE); + //rbus callback to caller + if(del == 1) + { + mapXmidtStatusToStatusMessage(MSG_EXPIRED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, MSG_EXPIRED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + } + else if(del == 2) + { + mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + } + status = deleteFromXmidtQ(&next_node); + temp = next_node; + if(status) + { + ParodusPrint("deleteFromXmidtQ success\n"); + rv = 1; + } + else + { + ParodusError("deleteFromXmidtQ failed\n"); + } + continue; + } + if(temp) + { + temp = temp->next; + } + } + ParodusPrint("xmidtQOptmize returns rv %d\n", rv); + return rv; +} + +/* + * @brief To handle xmidt rbus messages received from various components. + */ +void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle) +{ + XmidtMsg *message; + struct timespec times; + char * errorMsg = NULL; + + ParodusPrint("XmidtQsize is %d\n" , get_XmidtQsize()); + if( get_XmidtQsize() > 0 && get_XmidtQsize() == get_parodus_cfg()->max_queue_size) + { + ParodusInfo("queue size %d exceeded at producer, ignoring the event\n", get_XmidtQsize()); + mapXmidtStatusToStatusMessage(QUEUE_SIZE_EXCEEDED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(msg, asyncHandle, errorMsg , QUEUE_SIZE_EXCEEDED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + wrp_free_struct(msg); + return; + } + + ParodusPrint ("Add Xmidt Upstream message to queue\n"); + message = (XmidtMsg *)malloc(sizeof(XmidtMsg)); + + if(message) + { + message->msg = msg; + message->asyncHandle =asyncHandle; + message->state = PENDING; + getCurrentTime(×); + message->enqueueTime = (long long)times.tv_sec; + ParodusPrint("message->enqueueTime is %lld\n", message->enqueueTime); + message->sentTime = 0; + //Increment queue size to handle max queue limit + increment_XmidtQsize(); + message->next=NULL; + pthread_mutex_lock (&xmidt_mut); + //Producer adds the rbus msg into queue + if(XmidtMsgQ == NULL) + { + XmidtMsgQ = message; + + ParodusPrint("Producer added xmidt message\n"); + pthread_cond_signal(&xmidt_con); + pthread_mutex_unlock (&xmidt_mut); + ParodusPrint("mutex unlock in xmidt producer\n"); + } + else + { + XmidtMsg *temp = XmidtMsgQ; + while(temp->next) + { + temp = temp->next; + } + temp->next = message; + pthread_mutex_unlock (&xmidt_mut); + } + } + else + { + mapXmidtStatusToStatusMessage(ENQUEUE_FAILURE, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(msg, asyncHandle, errorMsg , ENQUEUE_FAILURE, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + wrp_free_struct(msg); + } + return; +} + +//Xmidt consumer thread to process the rbus events. +void processXmidtData() +{ + int err = 0; + err = pthread_create(&processThreadId, NULL, processXmidtUpstreamMsg, NULL); + if (err != 0) + { + ParodusError("Error creating processXmidtData thread :[%s]\n", strerror(err)); + } + else + { + ParodusInfo("processXmidtData thread created Successfully\n"); + } + +} + +//Consumer to Parse and process rbus data. +void* processXmidtUpstreamMsg() +{ + int rv = 0; + long long currTime = 0; + int ret = 0, status = 0; + struct timespec tms; + XmidtMsg *xmidtQ = NULL; + XmidtMsg *next_node = NULL; + int cv = 0; + + if(get_parodus_init()) + { + ParodusInfo("Initial cloud connection is not established, Xmidt wait till connection up\n"); + pthread_mutex_lock(get_global_cloud_status_mut()); + pthread_cond_wait(get_global_cloud_status_cond(), get_global_cloud_status_mut()); + pthread_mutex_unlock(get_global_cloud_status_mut()); + ParodusInfo("Received cloud status signal proceed to event processing\n"); + } + + xmidtQ = get_global_xmidthead(); + + while(FOREVER()) + { + pthread_mutex_lock (&xmidt_mut); + ParodusPrint("mutex lock in xmidt consumer thread\n"); + if (xmidtQ != NULL) + { + XmidtMsg *Data = xmidtQ; + pthread_mutex_unlock (&xmidt_mut); + ParodusPrint("mutex unlock in xmidt consumer\n"); + checkMsgExpiry(xmidtQ); + checkMaxQandOptimize(xmidtQ); + cv = 0; + + ParodusPrint("check state\n"); + switch(Data->state) + { + case PENDING: + ParodusPrint("state : PENDING\n"); + //send msg to server only when cloud connection is online. + cv = checkCloudConn(); + if (cv == 2) + { + ParodusInfo("queue is optimized, reset to head node\n"); + } + else + { + ParodusPrint("cloud status is online, processData\n"); + rv = processData(Data, Data->msg, Data->asyncHandle); + if(!rv) + { + ParodusPrint("processData failed\n"); + } + else + { + if(rv == 2) + { + ParodusInfo("queue is optimized, reset to head\n"); + } + else + { + ParodusPrint("processData success\n"); + } + } + } + break; + + case SENT: + ParodusPrint("state : SENT\n"); + ParodusPrint("Check cloud ack for matching transaction id\n"); + ret = checkCloudACK(Data, Data->asyncHandle); + if (ret) + { + ParodusPrint("cloud ack processed successfully\n"); + } + else + { + getCurrentTime(&tms); + currTime = (long long)tms.tv_sec; + long long timeout_secs = (Data->sentTime) + CLOUD_ACK_TIMEOUT_SEC; + ParodusPrint("currTime %lld sentTime %lld CLOUD_ACK_TIMEOUT_SEC %d, timeout_secs %lld trans_id %s\n", currTime, Data->sentTime, CLOUD_ACK_TIMEOUT_SEC, timeout_secs, Data->msg->u.event.transaction_uuid); + if (currTime > timeout_secs) //ack timeout case + { + ParodusPrint("transaction id %s match not found, cloud ack timed out. Need to retry\n", Data->msg->u.event.transaction_uuid); + cv = checkCloudConn(); + if (cv == 2) + { + ParodusInfo("queue is optimized, reset to head node and retry\n"); + break; + } + else + { + ParodusPrint("cloud status is online, check ping time\n"); + if(get_pingTimeStamp() > (Data->sentTime + CLOUD_ACK_TIMEOUT_SEC)) + { + ParodusPrint("Ping received at timestamp %lld, proceed to retry\n", get_pingTimeStamp()); + rv = processData(Data, Data->msg, Data->asyncHandle); + if(!rv) + { + ParodusPrint("processData retry failed\n"); + } + else + { + if(rv == 2) + { + ParodusInfo("queue is optimized, reset to head\n"); + } + else + { + ParodusPrint("processData success\n"); + } + } + } + } + } + } + break; + case DELETE: + ParodusPrint("state : DELETE\n"); + status = deleteFromXmidtQ(&next_node); + if(status) + { + ParodusPrint("deleteFromXmidtQ success\n"); + xmidtQ = next_node; + } + else + { + ParodusError("deleteFromXmidtQ failed\n"); + } + break; + } + //sleep of 1s to process each msg ack and to avoid cpu load. + sleep(1); + + if(cv !=2 && xmidtQ !=NULL) + { + ParodusPrint("Move to next node\n"); + xmidtQ = xmidtQ->next; + } + + // circling back to 1st node + if((cv ==2) || (xmidtQ == NULL && get_global_xmidthead() != NULL)) + { + ParodusPrint("circling back to 1st node, cv %d\n", cv); + xmidtQ = get_global_xmidthead(); + } + } + else + { + if (g_shutdown) + { + pthread_mutex_unlock (&xmidt_mut); + break; + } + ParodusPrint("Before cond wait in xmidt consumer thread\n"); + pthread_cond_wait(&xmidt_con, &xmidt_mut); + pthread_mutex_unlock (&xmidt_mut); + ParodusPrint("mutex unlock in xmidt thread after cond wait\n"); + xmidtQ = get_global_xmidthead(); + } + } + return NULL; +} + +//To validate and send events upstream +int processData(XmidtMsg *Datanode, wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle) +{ + int rv = 0; + char *errorMsg = "none"; + int statuscode =0; + + wrp_msg_t * xmidtMsg = msg; + if (xmidtMsg == NULL) + { + ParodusError("xmidtMsg is NULL\n"); + mapXmidtStatusToStatusMessage(ENQUEUE_FAILURE, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg, ENQUEUE_FAILURE, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(Datanode, DELETE); + return rv; + } + + rv = validateXmidtData(xmidtMsg, &errorMsg, &statuscode); + ParodusPrint("validateXmidtData, errorMsg %s statuscode %d\n", errorMsg, statuscode); + if(rv) + { + ParodusPrint("validation successful, send event to server\n"); + rv = sendXmidtEventToServer(Datanode, xmidtMsg, asyncHandle); + return rv; + } + else + { + ParodusError("validation failed, send failure ack\n"); + createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg , statuscode, NULL, RBUS_ERROR_INVALID_INPUT); + updateXmidtState(Datanode, DELETE); + } + return rv; +} + +int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode) +{ + if(eventMsg == NULL) + { + ParodusError("eventMsg is NULL\n"); + return 0; + } + + if(eventMsg->msg_type != WRP_MSG_TYPE__EVENT) + { + *errorMsg = strdup("Message format is invalid"); + *statusCode = INVALID_MSG_TYPE; + ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode); + return 0; + } + + if(eventMsg->u.event.source == NULL) + { + *errorMsg = strdup("Missing source"); + *statusCode = MISSING_SOURCE; + ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode); + return 0; + } + + if(eventMsg->u.event.dest == NULL) + { + *errorMsg = strdup("Missing dest"); + *statusCode = MISSING_DEST; + ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode); + return 0; + } + + if(eventMsg->u.event.content_type == NULL) + { + *errorMsg = strdup("Missing content_type"); + *statusCode = MISSING_CONTENT_TYPE; + ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode); + return 0; + } + else + { + int i =0, count = 0, valid = 0; + count = sizeof(contentTypeList)/sizeof(contentTypeList[0]); + for(i = 0; iu.event.content_type, contentTypeList[i]) == 0) + { + ParodusPrint("content_type is valid %s\n", contentTypeList[i]); + valid = 1; + break; + } + } + if (!valid) + { + ParodusError("content_type is not valid, %s\n", eventMsg->u.event.content_type); + *errorMsg = strdup("Invalid content_type"); + *statusCode = INVALID_CONTENT_TYPE; + ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode); + return 0; + } + } + + if(eventMsg->u.event.payload == NULL) + { + *errorMsg = strdup("Missing payload"); + *statusCode = MISSING_PAYLOAD; + ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode); + return 0; + } + + if(eventMsg->u.event.payload_size == 0) + { + *errorMsg = strdup("Missing payloadlen"); + *statusCode = MISSING_PAYLOADLEN; + ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode); + return 0; + } + + ParodusPrint("validateXmidtData success. errorMsg %s statusCode %d\n", *errorMsg, *statusCode); + return 1; +} + +int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle) +{ + wrp_msg_t *notif_wrp_msg = NULL; + ssize_t msg_len; + void *msg_bytes; + int ret = -1; + char sourceStr[64] = {'\0'}; + char *device_id = NULL; + size_t device_id_len = 0; + int sendRetStatus = 1; + char *errorMsg = NULL; + int qos = 0; + int rv = 0; + + ParodusPrint("MAX_QUEUE_SIZE: %d\n", get_parodus_cfg()->max_queue_size); + notif_wrp_msg = (wrp_msg_t *)malloc(sizeof(wrp_msg_t)); + if(notif_wrp_msg != NULL) + { + memset(notif_wrp_msg, 0, sizeof(wrp_msg_t)); + notif_wrp_msg->msg_type = WRP_MSG_TYPE__EVENT; + + ParodusPrint("msg->u.event.source: %s\n",msg->u.event.source); + + if(msg->u.event.source !=NULL) + { + //To get device_id in the format "mac:112233445xxx" + ret = getDeviceId(&device_id, &device_id_len); + if(ret == 0) + { + ParodusPrint("device_id %s device_id_len %lu\n", device_id, device_id_len); + snprintf(sourceStr, sizeof(sourceStr), "%s/%s", device_id, msg->u.event.source); + ParodusPrint("sourceStr formed is %s\n" , sourceStr); + notif_wrp_msg->u.event.source = strdup(sourceStr); + ParodusPrint("source:%s\n", notif_wrp_msg->u.event.source); + } + else + { + ParodusError("Failed to get device_id\n"); + } + if(device_id != NULL) + { + free(device_id); + device_id = NULL; + } + } + + if(msg->u.event.dest != NULL) + { + notif_wrp_msg->u.event.dest = msg->u.event.dest; + ParodusPrint("destination: %s\n", notif_wrp_msg->u.event.dest); + } + + if(msg->u.event.transaction_uuid != NULL) + { + notif_wrp_msg->u.event.transaction_uuid = msg->u.event.transaction_uuid; + ParodusPrint("Notification transaction_uuid %s\n", notif_wrp_msg->u.event.transaction_uuid); + } + + if(msg->u.event.content_type != NULL) + { + notif_wrp_msg->u.event.content_type = msg->u.event.content_type; + ParodusPrint("content_type is %s\n",notif_wrp_msg->u.event.content_type); + } + + if(msg->u.event.payload != NULL) + { + ParodusInfo("Notification payload: %s\n",msg->u.event.payload); + notif_wrp_msg->u.event.payload = (void *)msg->u.event.payload; + notif_wrp_msg->u.event.payload_size = msg->u.event.payload_size; + ParodusPrint("payload size %lu\n", notif_wrp_msg->u.event.payload_size); + } + + if(msg->u.event.qos != 0) + { + notif_wrp_msg->u.event.qos = msg->u.event.qos; + qos = notif_wrp_msg->u.event.qos; + ParodusPrint("Notification qos: %d\n",notif_wrp_msg->u.event.qos); + } + msg_len = wrp_struct_to (notif_wrp_msg, WRP_BYTES, &msg_bytes); + + ParodusPrint("Encoded xmidt wrp msg, msg_len %lu\n", msg_len); + if(msg_len > 0) + { + ParodusPrint("sendUpstreamMsgToServer\n"); + printSendMsgData("sending to server", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid); + sendRetStatus = sendUpstreamMsgToServer(&msg_bytes, msg_len); + } + else + { + ParodusError("wrp msg_len is zero\n"); + mapXmidtStatusToStatusMessage(WRP_ENCODE_FAILURE, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(msg, asyncHandle, errorMsg, WRP_ENCODE_FAILURE, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(msgnode, DELETE); + if(notif_wrp_msg !=NULL) + { + ParodusPrint("notif_wrp_msg->u.event.source free\n"); + if(notif_wrp_msg->u.event.source !=NULL) + { + free(notif_wrp_msg->u.event.source); + notif_wrp_msg->u.event.source = NULL; + } + ParodusPrint("notif_wrp_msg free\n"); + free(notif_wrp_msg); + notif_wrp_msg = NULL; + } + if(msg_bytes != NULL) + { + ParodusPrint("msg_bytes free\n"); + free(msg_bytes); + msg_bytes = NULL; + } + return rv; + } + + while(sendRetStatus) //If SendMessage is failed condition + { + ParodusError("sendXmidtEventToServer is Failed\n"); + if(highQosValueCheck(qos)) + { + ParodusPrint("The event is having high qos retry again\n"); + ParodusInfo("Wait till connection is Up\n"); + rv = checkCloudConn(); + if(rv == 2) + { + printSendMsgData("queue optimized during send. retry", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid); + break; + } + ParodusInfo("Received cloud status signal proceed to retry\n"); + printSendMsgData("send to server after cloud reconnect", notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid); + } + else + { + mapXmidtStatusToStatusMessage(CLIENT_DISCONNECT, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + ParodusInfo("The event is having low qos proceed to delete\n"); + printSendMsgData(errorMsg, notif_wrp_msg->u.event.qos, notif_wrp_msg->u.event.dest, notif_wrp_msg->u.event.transaction_uuid); + createOutParamsandSendAck(msg, asyncHandle, errorMsg, CLIENT_DISCONNECT, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(msgnode, DELETE); + break; + } + sendRetStatus = sendUpstreamMsgToServer(&msg_bytes, msg_len); + } + + if(sendRetStatus == 0) + { + if(highQosValueCheck(qos)) + { + ParodusPrint("High qos event send success\n"); + //when max_queue_size is 0, cloud acks will not be processed|qos is disabled. + if(get_parodus_cfg()->max_queue_size == 0 ) + { + ParodusInfo("max queue size is 0, qos semantics are disabled. send callback\n"); + mapXmidtStatusToStatusMessage(QOS_SEMANTICS_DISABLED, &errorMsg); + createOutParamsandSendAck(msg, asyncHandle, errorMsg, QOS_SEMANTICS_DISABLED, NULL, RBUS_ERROR_SUCCESS); + ParodusPrint("update state to DELETE\n"); + updateXmidtState(msgnode, DELETE); + print_xmidMsg_list(); + } + else + { + ParodusPrint("update state to SENT\n"); + //Update msg status from PENDING to SENT + updateXmidtState(msgnode, SENT); + } + print_xmidMsg_list(); + } + else + { + ParodusInfo("Low qos event, send success callback and delete\n"); + mapXmidtStatusToStatusMessage(DELIVERED_SUCCESS, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS, NULL, RBUS_ERROR_SUCCESS); + //print_xmidMsg_list(); + updateXmidtState(msgnode, DELETE); + print_xmidMsg_list(); + } + } + + if(msg_bytes != NULL) + { + free(msg_bytes); + msg_bytes = NULL; + } + ParodusPrint("sendXmidtEventToServer done\n"); + } + else + { + mapXmidtStatusToStatusMessage(MSG_PROCESSING_FAILED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + ParodusError("Memory allocation failed\n"); + createOutParamsandSendAck(msg, asyncHandle, errorMsg, MSG_PROCESSING_FAILED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(msgnode, DELETE); + } + + if(notif_wrp_msg !=NULL) + { + ParodusPrint("notif_wrp_msg->u.event.source free\n"); + if(notif_wrp_msg->u.event.source !=NULL) + { + free(notif_wrp_msg->u.event.source); + notif_wrp_msg->u.event.source = NULL; + } + ParodusPrint("notif_wrp_msg free\n"); + free(notif_wrp_msg); + notif_wrp_msg = NULL; + } + ParodusPrint("sendXmidtEventToServer done\n"); + return rv; +} + +void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, char *cloudsource, rbusError_t error) +{ + rbusObject_t outParams; + rbusError_t err; + rbusValue_t value; + char qosstring[20] = ""; + int ret = -1; + char sourceStr[64] = {'\0'}; + char *device_id = NULL; + size_t device_id_len = 0; + char *source = NULL; + + rbusValue_Init(&value); + rbusValue_SetString(value, "event"); + rbusObject_Init(&outParams, NULL); + rbusObject_SetValue(outParams, "msg_type", value); + rbusValue_Release(value); + + ParodusPrint("statuscode %d statusMsg %s\n", statuscode, errorMsg); + rbusValue_Init(&value); + rbusValue_SetInt32(value, statuscode); + rbusObject_SetValue(outParams, "status", value); + rbusValue_Release(value); + + if(errorMsg !=NULL) + { + rbusValue_Init(&value); + rbusValue_SetString(value, errorMsg); + rbusObject_SetValue(outParams, "error_message", value); + rbusValue_Release(value); + free(errorMsg); + } + + //update source based on status originated from cloud or parodus . + if (cloudsource != NULL) + { + source = strdup(cloudsource); + ParodusPrint("cloudsource is %s\n", cloudsource); + + } + else + { + //To get device_id in the format "mac:112233445xxx" + ret = getDeviceId(&device_id, &device_id_len); + if(ret == 0) + { + ParodusPrint("device_id %s device_id_len %lu\n", device_id, device_id_len); + snprintf(sourceStr, sizeof(sourceStr), "%s/%s", device_id, "parodus"); + ParodusPrint("sourceStr formed is %s\n" , sourceStr); + source = strdup(sourceStr); + } + else + { + ParodusError("Failed to get device_id\n"); + } + if(device_id != NULL) + { + free(device_id); + device_id = NULL; + } + } + + if(source != NULL) + { + ParodusInfo("source is %s\n" , source); + rbusValue_Init(&value); + rbusValue_SetString(value, source); + rbusObject_SetValue(outParams, "source", value); + rbusValue_Release(value); + free(source); + } + + if(msg != NULL) + { + if(msg->u.event.dest !=NULL) + { + rbusValue_Init(&value); + rbusValue_SetString(value, msg->u.event.dest); + rbusObject_SetValue(outParams, "dest", value); + rbusValue_Release(value); + } + + if(msg->u.event.content_type !=NULL) + { + rbusValue_Init(&value); + rbusValue_SetString(value, msg->u.event.content_type); + rbusObject_SetValue(outParams, "content_type", value); + rbusValue_Release(value); + } + + rbusValue_Init(&value); + snprintf(qosstring, sizeof(qosstring), "%d", msg->u.event.qos); + ParodusPrint("qosstring is %s\n", qosstring); + rbusValue_SetString(value, qosstring); + rbusObject_SetValue(outParams, "qos", value); + rbusValue_Release(value); + + if(msg->u.event.transaction_uuid !=NULL) + { + rbusValue_Init(&value); + rbusValue_SetString(value, msg->u.event.transaction_uuid); + rbusObject_SetValue(outParams, "transaction_uuid", value); + rbusValue_Release(value); + ParodusPrint("outParams msg->u.event.transaction_uuid %s\n", msg->u.event.transaction_uuid); + } + } + + if(outParams !=NULL) + { + //rbusObject_fwrite(outParams, 1, stdout); + if(asyncHandle == NULL) + { + ParodusError("asyncHandle is NULL\n"); + return; + } + + err = rbusMethod_SendAsyncResponse(asyncHandle, error, outParams); + + if(err != RBUS_ERROR_SUCCESS) + { + ParodusError("rbusMethod_SendAsyncResponse failed err: %d\n", err); + } + else + { + ParodusInfo("rbusMethod_SendAsyncResponse success: %d\n", err); + } + + rbusObject_Release(outParams); + } + else + { + ParodusError("Failed to create outParams\n"); + } + + return; +} + +/* +* @brief This function handles check method call from t2 before the actual inParams SET and to not proceed with inParams processing. +*/ +int checkInputParameters(rbusObject_t inParams) +{ + rbusValue_t check = rbusObject_GetValue(inParams, "check"); + if(check) + { + ParodusPrint("Rbus check method. Not proceeding to process this inparam\n"); + return 0; + } + return 1; +} + +//To generate unique transaction id for xmidt send requests +char* generate_transaction_uuid() +{ + char *transID = NULL; + uuid_t transaction_Id; + char *trans_id = NULL; + trans_id = (char *)malloc(37); + uuid_generate_random(transaction_Id); + uuid_unparse(transaction_Id, trans_id); + + if(trans_id !=NULL) + { + transID = trans_id; + } + return transID; +} + +void parseRbusInparamsToWrp(rbusObject_t inParams, char *trans_id, wrp_msg_t **eventMsg) +{ + char *msg_typeStr = NULL; + char *sourceVal = NULL; + char *destStr = NULL, *contenttypeStr = NULL; + char *payloadStr = NULL, *qosVal = NULL; + unsigned int payloadlength = 0; + wrp_msg_t *msg = NULL; + + msg = ( wrp_msg_t * ) malloc( sizeof( wrp_msg_t ) ); + if(msg == NULL) + { + ParodusError("Wrp msg allocation failed\n"); + return; + } + + memset( msg, 0, sizeof( wrp_msg_t ) ); + + rbusValue_t msg_type = rbusObject_GetValue(inParams, "msg_type"); + if(msg_type) + { + if(rbusValue_GetType(msg_type) == RBUS_STRING) + { + msg_typeStr = (char *) rbusValue_GetString(msg_type, NULL); + ParodusPrint("msg_type value received is %s\n", msg_typeStr); + if(msg_typeStr !=NULL) + { + if(strcmp(msg_typeStr, "event") ==0) + { + msg->msg_type = WRP_MSG_TYPE__EVENT; + } + else + { + ParodusError("msg_type received is not event : %s\n", msg_typeStr); + } + } + } + } + else + { + ParodusError("msg_type is empty\n"); + } + + rbusValue_t source = rbusObject_GetValue(inParams, "source"); + if(source) + { + if(rbusValue_GetType(source) == RBUS_STRING) + { + sourceVal = (char *)rbusValue_GetString(source, NULL); + if(sourceVal !=NULL) + { + ParodusInfo("source value received is %s\n", sourceVal); + msg->u.event.source = strdup(sourceVal); + } + ParodusPrint("msg->u.event.source is %s\n", msg->u.event.source); + } + } + else + { + ParodusError("source is empty\n"); + } + + rbusValue_t dest = rbusObject_GetValue(inParams, "dest"); + if(dest) + { + if(rbusValue_GetType(dest) == RBUS_STRING) + { + destStr = (char *)rbusValue_GetString(dest, NULL); + if(destStr !=NULL) + { + ParodusPrint("dest value received is %s\n", destStr); + msg->u.event.dest = strdup(destStr); + ParodusPrint("msg->u.event.dest is %s\n", msg->u.event.dest); + } + } + } + else + { + ParodusError("dest is empty\n"); + } + + rbusValue_t contenttype = rbusObject_GetValue(inParams, "content_type"); + if(contenttype) + { + if(rbusValue_GetType(contenttype) == RBUS_STRING) + { + contenttypeStr = (char *)rbusValue_GetString(contenttype, NULL); + if(contenttypeStr !=NULL) + { + ParodusPrint("contenttype value received is %s\n", contenttypeStr); + msg->u.event.content_type = strdup(contenttypeStr); + ParodusPrint("msg->u.event.content_type is %s\n", msg->u.event.content_type); + } + } + } + else + { + ParodusError("contenttype is empty\n"); + } + + rbusValue_t payload = rbusObject_GetValue(inParams, "payload"); + if(payload) + { + if((rbusValue_GetType(payload) == RBUS_STRING)) + { + payloadStr = (char *)rbusValue_GetString(payload, NULL); + if(payloadStr !=NULL) + { + ParodusPrint("payload received is %s\n", payloadStr); + msg->u.event.payload = strdup(payloadStr); + ParodusPrint("msg->u.event.payload is %s\n", msg->u.event.payload); + } + else + { + ParodusError("payloadStr is empty\n"); + } + } + } + else + { + ParodusError("payload is empty\n"); + } + + rbusValue_t payloadlen = rbusObject_GetValue(inParams, "payloadlen"); + if(payloadlen) + { + if(rbusValue_GetType(payloadlen) == RBUS_INT32) + { + ParodusPrint("payloadlen type %d RBUS_INT32 %d\n", rbusValue_GetType(payloadlen), RBUS_INT32); + payloadlength = rbusValue_GetInt32(payloadlen); + ParodusPrint("payloadlen received is %lu\n", payloadlength); + msg->u.event.payload_size = (size_t) payloadlength; + ParodusPrint("msg->u.event.payload_size is %lu\n", msg->u.event.payload_size); + } + } + else + { + ParodusError("payloadlen is empty\n"); + } + + ParodusPrint("check qos\n"); + rbusValue_t qos = rbusObject_GetValue(inParams, "qos"); + if(qos) + { + if(rbusValue_GetType(qos) == RBUS_STRING) + { + ParodusPrint("qos type %d RBUS_STRING %d\n", rbusValue_GetType(qos), RBUS_STRING); + qosVal = (char *)rbusValue_GetString(qos, NULL); + ParodusPrint("qos received is %s\n", qosVal); + if(qosVal !=NULL) + { + msg->u.event.qos = atoi(qosVal); + ParodusPrint("msg->u.event.qos is %d\n", msg->u.event.qos); + } + } + } + + if(trans_id !=NULL) + { + ParodusPrint("Add trans_id %s to wrp message\n", trans_id); + msg->u.event.transaction_uuid = strdup(trans_id); + free(trans_id); + trans_id = NULL; + ParodusPrint("msg->u.event.transaction_uuid is %s\n", msg->u.event.transaction_uuid); + } + else + { + ParodusError("transaction_uuid is empty\n"); + } + + *eventMsg = msg; + ParodusPrint("parseRbusInparamsToWrp End\n"); +} + +static rbusError_t sendDataHandler(rbusHandle_t handle, char const* methodName, rbusObject_t inParams, rbusObject_t outParams, rbusMethodAsyncHandle_t asyncHandle) +{ + (void) handle; + (void) outParams; + int inStatus = 0; + char *transaction_uuid = NULL; + wrp_msg_t *wrpMsg= NULL; + ParodusInfo("methodHandler called: %s\n", methodName); + //printRBUSParams(inParams, INPARAMS_PATH); + + if((methodName !=NULL) && (strcmp(methodName, XMIDT_SEND_METHOD) == 0)) + { + inStatus = checkInputParameters(inParams); + if(inStatus) + { + //generate transaction id to create outParams and send ack + transaction_uuid = generate_transaction_uuid(); + ParodusInfo("xmidt transaction_uuid generated is %s\n", transaction_uuid); + parseRbusInparamsToWrp(inParams, transaction_uuid, &wrpMsg); + + //xmidt send producer + addToXmidtUpstreamQ(wrpMsg, asyncHandle); + ParodusPrint("sendDataHandler returned %d\n", RBUS_ERROR_ASYNC_RESPONSE); + return RBUS_ERROR_ASYNC_RESPONSE; + } + else + { + ParodusPrint("check method call received, ignoring input\n"); + } + } + else + { + ParodusError("Method %s received is not supported\n", methodName); + return RBUS_ERROR_BUS_ERROR; + } + ParodusPrint("send RBUS_ERROR_SUCCESS\n"); + return RBUS_ERROR_SUCCESS; +} + + +int regXmidtSendDataMethod() +{ + int rc = RBUS_ERROR_SUCCESS; + rbusDataElement_t dataElements[1] = { { XMIDT_SEND_METHOD, RBUS_ELEMENT_TYPE_METHOD, { NULL, NULL, NULL, NULL, NULL, sendDataHandler } } }; + + rbusHandle_t rbus_handle = get_parodus_rbus_Handle(); + + ParodusPrint("Registering xmidt sendData method %s\n", XMIDT_SEND_METHOD); + if(!rbus_handle) + { + ParodusError("regXmidtSendDataMethod failed as rbus_handle is empty\n"); + return -1; + } + + rc = rbus_regDataElements(rbus_handle, 1, dataElements); + + if(rc != RBUS_ERROR_SUCCESS) + { + ParodusError("Register xmidt sendData method failed: %d\n", rc); + } + else + { + ParodusInfo("Register xmidt sendData method %s success\n", XMIDT_SEND_METHOD); + + //start xmidt queue consumer thread . + processXmidtData(); + } + return rc; +} + +//To print and store params output to a file +void printRBUSParams(rbusObject_t params, char* file_path) +{ + if( NULL != params ) + { + FILE *fd = fopen(file_path, "w+"); + rbusObject_fwrite(params, 1, fd); + fclose(fd); + } + else + { + ParodusError("Params is NULL\n"); + } +} + +/* + * @brief To store downstream cloud ack messages in a list for further processing. + */ +void addToCloudAckQ(char *trans_id, int qos, int rdr, char *source) +{ + CloudAck *ackmsg; + + ParodusPrint ("Add downstream message to CloudAck list\n"); + ackmsg = (CloudAck *)malloc(sizeof(CloudAck)); + + if(ackmsg) + { + ackmsg->transaction_id = strdup(trans_id); + ackmsg->qos = qos; + ackmsg->rdr =rdr; + ackmsg->source= strdup(source); + ParodusPrint("ackmsg->transaction_id %s ackmsg->qos %d ackmsg->rdr %d ackmsg->source %s\n", ackmsg->transaction_id,ackmsg->qos,ackmsg->rdr,ackmsg->source ); + ackmsg->next=NULL; + pthread_mutex_lock (&cloudack_mut); + if(g_cloudackHead == NULL) + { + g_cloudackHead = ackmsg; + + ParodusPrint("Producer added cloud ack msg to Q\n"); + pthread_mutex_unlock (&cloudack_mut); + ParodusPrint("mutex unlock in cloud ack producer\n"); + } + else + { + CloudAck *temp = g_cloudackHead; + while(temp->next) + { + temp = temp->next; + } + temp->next = ackmsg; + pthread_mutex_unlock (&cloudack_mut); + } + } + else + { + ParodusError("failure in allocation for cloud ack\n"); + } + return; +} + +//Check cloud ack and send rbus callback to caller based on transaction id. +int checkCloudACK(XmidtMsg *xmdnode, rbusMethodAsyncHandle_t asyncHandle) +{ + wrp_msg_t *xmdMsg; + char *xmdMsgTransID = NULL; + char * errorMsg = NULL; + CloudAck *cloudnode = NULL; + + if(xmdnode != NULL) + { + xmdMsg = xmdnode->msg; + if(xmdMsg != NULL) + { + xmdMsgTransID = xmdMsg->u.event.transaction_uuid; + ParodusPrint("xmdMsgTransID %s\n", xmdMsgTransID); + } + else + { + ParodusError("xmdMsg is NULL\n"); + return 0; + } + } + + cloudnode = get_global_cloud_node(); + + while (NULL != cloudnode) + { + ParodusPrint("cloudnode->transaction_id %s cloudnode->qos %d cloudnode->rdr %d cloudnode->source %s\n", cloudnode->transaction_id,cloudnode->qos,cloudnode->rdr, cloudnode->source); + if(xmdMsgTransID != NULL && cloudnode->transaction_id != NULL) + { + if( strcmp(xmdMsgTransID, cloudnode->transaction_id) == 0) + { + ParodusInfo("transaction_id %s is matching, send callback\n", xmdMsgTransID); + mapXmidtStatusToStatusMessage(cloudnode->rdr, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + if(cloudnode->rdr == 0) + { + createOutParamsandSendAck(xmdMsg, asyncHandle, errorMsg, cloudnode->rdr, cloudnode->source, RBUS_ERROR_SUCCESS); + } + else + { + createOutParamsandSendAck(xmdMsg, asyncHandle, errorMsg, cloudnode->rdr, cloudnode->source, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + } + ParodusPrint("set xmidt msg to DELETE state as cloud ack is processed\n"); + updateXmidtState(xmdnode, DELETE); + print_xmidMsg_list(); + ParodusPrint("delete cloudACK cloudnode\n"); + deleteCloudACKNode(cloudnode->transaction_id); + ParodusPrint("checkCloudACK returns success\n"); + return 1; + } + else + { + ParodusError("transaction_id %s match not found\n", xmdMsgTransID); + } + } + cloudnode= cloudnode->next; + } + ParodusPrint("checkCloudACK returns failure\n"); + return 0; +} + +//To update state of the msg node that is currently being processed. +int updateXmidtState(XmidtMsg * temp, int state) +{ + struct timespec ts; + if(temp == NULL) + { + ParodusError("XmidtMsg is NULL, updateXmidtState failed\n"); + return 0; + } + else + { + ParodusPrint("state to be updated %d\n", state); + ParodusPrint("node is pointing to temp->state %d\n",temp->state); + pthread_mutex_lock (&xmidt_mut); + temp->state = state; + if(state != DELETE) + { + getCurrentTime(&ts); + temp->sentTime = (long long)ts.tv_sec; + ParodusPrint("updated temp->sentTime %lld\n", temp->sentTime); + } + ParodusPrint("msgnode is updated with state %d sentTime %lld\n", temp->state, temp->sentTime); + pthread_mutex_unlock (&xmidt_mut); + return 1; + } +} + +void print_xmidMsg_list() +{ + XmidtMsg *temp = NULL; + temp = get_global_xmidthead(); + while (NULL != temp) + { + wrp_msg_t *xmdMsg = temp->msg; + ParodusPrint("node is pointing to xmdMsg transid %s temp->state %d temp->enqueueTime %lld temp->sentTime %lld\n", xmdMsg->u.event.transaction_uuid, temp->state, temp->enqueueTime, temp->sentTime); + temp= temp->next; + } + ParodusPrint("print_xmidMsg_list done\n"); + return; +} + +//delete matching cloud ack entry +int deleteCloudACKNode(char* trans_id) +{ + CloudAck *prev_node = NULL, *curr_node = NULL; + + if( NULL == trans_id ) + { + ParodusError("Invalid value for trans_id\n"); + return 0; + } + ParodusPrint("cloud ack to be deleted with trans_id: %s\n", trans_id); + + prev_node = NULL; + pthread_mutex_lock (&cloudack_mut); + curr_node = g_cloudackHead ; + // Traverse to get the msg to be deleted + while( NULL != curr_node ) + { + if(strcmp(curr_node->transaction_id, trans_id) == 0) + { + ParodusPrint("Found the node to delete\n"); + if( NULL == prev_node ) + { + ParodusPrint("need to delete first doc\n"); + g_cloudackHead = curr_node->next; + } + else + { + ParodusPrint("Traversing to find node\n"); + prev_node->next = curr_node->next; + + } + + ParodusPrint("Deleting the node entries\n"); + if(curr_node->transaction_id !=NULL) + { + free( curr_node->transaction_id ); + curr_node->transaction_id = NULL; + } + if(curr_node->source !=NULL) + { + free( curr_node->source ); + curr_node->source = NULL; + } + if(curr_node != NULL) + { + free( curr_node ); + } + curr_node = NULL; + ParodusPrint("Deleted successfully and returning..\n"); + pthread_mutex_unlock (&cloudack_mut); + return 1; + } + + prev_node = curr_node; + curr_node = curr_node->next; + } + pthread_mutex_unlock (&cloudack_mut); + ParodusError("Could not find the entry to delete from list\n"); + return 0; +} + +//Delete msg from XmidtQ +int deleteFromXmidtQ(XmidtMsg **next_node) +{ + XmidtMsg *prev_node = NULL, *curr_node = NULL; + + prev_node = NULL; + pthread_mutex_lock (&xmidt_mut); + curr_node = XmidtMsgQ ; + // Traverse to get the node with DELETE state which needs to be deleted + while( NULL != curr_node ) + { + ParodusPrint("curr_node->state %d\n" , curr_node->state); + if(curr_node->state == DELETE) + { + ParodusPrint("Found the node to delete\n"); + if( NULL == prev_node ) + { + ParodusPrint("need to delete first doc\n"); + XmidtMsgQ = curr_node->next; + } + else + { + ParodusPrint("Traversing to find node\n"); + prev_node->next = curr_node->next; + *next_node = curr_node->next; + + } + + wrp_msg_t *xmdMsg = curr_node->msg; + if (xmdMsg) + { + ParodusInfo("Delete xmidt node with transid %s\n", xmdMsg->u.event.transaction_uuid); + } + wrp_free_struct( curr_node->msg); + curr_node->msg = NULL; + if(curr_node !=NULL) + { + free( curr_node ); + curr_node = NULL; + } + ParodusPrint("Deleted successfully and returning..\n"); + pthread_mutex_unlock (&xmidt_mut); + decrement_XmidtQsize(); + ParodusInfo("XmidtQsize after delete is %d\n", get_XmidtQsize()); + return 1; + } + + prev_node = curr_node; + curr_node = curr_node->next; + } + pthread_mutex_unlock (&xmidt_mut); + ParodusError("Could not find the entry to delete from list\n"); + return 0; +} + +//check if message is expired based on each qos and set to delete state. +void checkMsgExpiry(XmidtMsg *xmdMsg) +{ + long long currTime = 0; + struct timespec ts; + char *errorMsg = NULL; + + XmidtMsg *temp = NULL; + temp = xmdMsg; + + if(temp != NULL) + { + getCurrentTime(&ts); + currTime= (long long)ts.tv_sec; + wrp_msg_t * tempMsg = temp->msg; + ParodusPrint("qos %d currTime %lu enqueueTime %lu\n", tempMsg->u.event.qos, currTime, temp->enqueueTime); + if(temp->state == DELETE) + { + ParodusPrint("msg is already in DELETE state and about to delete, skipping state update. transid %s\n", tempMsg->u.event.transaction_uuid); + return; + } + + if(tempMsg->u.event.qos > 74) + { + ParodusPrint("Critical Qos, check if expiry of 30 mins reached\n"); + if((currTime - temp->enqueueTime) > CRITICAL_QOS_EXPIRE_TIME) + { + ParodusInfo("Critical qos 30 mins expired, set to DELETE. qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + //rbus callback to caller + mapXmidtStatusToStatusMessage(MSG_EXPIRED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, MSG_EXPIRED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(temp, DELETE); + } + } + else if (tempMsg->u.event.qos > 49) + { + ParodusPrint("High Qos, check if expiry of 25 mins reached\n"); + if((currTime - temp->enqueueTime) > HIGH_QOS_EXPIRE_TIME) + { + ParodusInfo("High qos 25 mins expired, set to DELETE. qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + //rbus callback to caller + mapXmidtStatusToStatusMessage(MSG_EXPIRED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, MSG_EXPIRED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(temp, DELETE); + } + } + else if (tempMsg->u.event.qos > 24) + { + ParodusPrint("Medium Qos, check if expiry of 20 mins reached\n"); + if((currTime - temp->enqueueTime) > MEDIUM_QOS_EXPIRE_TIME) + { + ParodusInfo("Medium qos 20 mins expired, set to DELETE. qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + //rbus callback to caller + mapXmidtStatusToStatusMessage(MSG_EXPIRED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, MSG_EXPIRED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(temp, DELETE); + } + } + else if (tempMsg->u.event.qos >= 0) + { + ParodusPrint("Low Qos, check if expiry of 15 mins reached\n"); + if((currTime - temp->enqueueTime) > LOW_QOS_EXPIRE_TIME) + { + ParodusInfo("Low qos 15 mins expired, set to DELETE. qos %d transid %s\n", tempMsg->u.event.qos, tempMsg->u.event.transaction_uuid); + //rbus callback to caller + mapXmidtStatusToStatusMessage(MSG_EXPIRED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, MSG_EXPIRED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(temp, DELETE); + } + } + else + { + ParodusError("Invalid qos\n"); + } + } +} + +//To delete low qos messages from queue when max queue limit is reached. +void checkMaxQandOptimize(XmidtMsg *xmdMsg) +{ + int qos = 0; + + ParodusPrint("checkMaxQandOptimize . XmidtQsize is %d\n" , get_XmidtQsize()); + if(get_XmidtQsize() > 0 && get_XmidtQsize() == get_parodus_cfg()->max_queue_size) + { + ParodusPrint("Max Queue size reached, check and optimize\n"); + + //Traverse through XmidtMsgQ list and set low qos msgs to DELETE + XmidtMsg *temp = NULL; + temp = xmdMsg; + + if (temp != NULL) + { + wrp_msg_t * tempMsg = temp->msg; + qos = tempMsg->u.event.qos; + ParodusPrint("qos is %d\n", qos); + if(highQosValueCheck(qos)) + { + ParodusPrint("High qos msg, skip delete\n"); + } + else + { + //Skip max queue callback when msg is already in DELETE state. + if( temp->state == DELETE) + { + ParodusInfo("Msg is in DELETE state, skipped Max Queue size callback %s\n", tempMsg->u.event.transaction_uuid); + } + else + { + ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos); + //rbus callback to caller + char *errorMsg = NULL; + mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg); + ParodusPrint("statusMsg is %s\n",errorMsg); + createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION); + updateXmidtState(temp, DELETE); + } + } + } + } +} + +//map xmidt status and rdr response to status message +void mapXmidtStatusToStatusMessage(int status, char **message) +{ + char *result = NULL; + + if (status == DELIVERED_SUCCESS) + { + result = strdup("Delivered (success)"); + } + else if (status == INVALID_MSG_TYPE) + { + result = strdup("Message format is invalid"); + } + else if (status == MISSING_SOURCE) + { + result = strdup("Missing source"); + } + else if (status == MISSING_DEST) + { + result = strdup("Missing dest"); + } + else if (status == MISSING_CONTENT_TYPE) + { + result = strdup("Missing content_type"); + } + else if (status == MISSING_PAYLOAD) + { + result = strdup("Missing payload"); + } + else if (status == MISSING_PAYLOADLEN) + { + result = strdup("Missing payloadlen"); + } + else if (status == INVALID_CONTENT_TYPE) + { + result = strdup("Invalid content_type"); + } + else if (status == ENQUEUE_FAILURE) + { + result = strdup("Unable to enqueue"); + } + else if (status == CLIENT_DISCONNECT) + { + result = strdup("Send failed due to client disconnect"); + } + else if (status == QUEUE_SIZE_EXCEEDED) + { + result = strdup("Max Queue Size Exceeded"); + } + else if (status == WRP_ENCODE_FAILURE) + { + result = strdup("Wrp message encoding failed"); + } + else if (status == MSG_PROCESSING_FAILED) + { + result = strdup("Memory allocation failed"); + } + else if (status == QOS_SEMANTICS_DISABLED) + { + result = strdup("Send to server, qos semantics are disabled"); + } + else if (status == MSG_EXPIRED) + { + result = strdup("Message expired"); + } + else if (status == QUEUE_OPTIMIZED) + { + result = strdup("Message deleted after queue optimized"); + } + else + { + result = strdup("Unknown Error"); + } + ParodusInfo("Xmidt status message: %s\n", result); + *message = result; +} diff --git a/src/xmidtsend_rbus.h b/src/xmidtsend_rbus.h new file mode 100644 index 0000000..a190d2d --- /dev/null +++ b/src/xmidtsend_rbus.h @@ -0,0 +1,127 @@ +/** + * Copyright 2022 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * @file xmidtsend_rbus.h + * + * @description This header defines functions required to manage xmidt send messages via rbus. + * + */ + +#ifndef _XMIDTSEND_RBUS_H_ +#define _XMIDTSEND_RBUS_H_ +#include +#include "config.h" +#include +#ifdef __cplusplus +extern "C" { +#endif + +#define XMIDT_SEND_METHOD "Device.X_RDK_Xmidt.SendData" +#define INPARAMS_PATH "/tmp/inparams.txt" + +#define CLOUD_ACK_TIMEOUT_SEC 7 +#define CRITICAL_QOS_EXPIRE_TIME 30*60 +#define HIGH_QOS_EXPIRE_TIME 25*60 +#define MEDIUM_QOS_EXPIRE_TIME 20*60 +#define LOW_QOS_EXPIRE_TIME 15*60 + +#define EXPIRY_CHECK_TIME 5*60 //To check expiry in every 5 mins when cloud connection is down. +/*----------------------------------------------------------------------------*/ +/* Data Structures */ +/*----------------------------------------------------------------------------*/ +typedef struct XmidtMsg__ +{ + wrp_msg_t *msg; + rbusMethodAsyncHandle_t asyncHandle; + int state; + long long enqueueTime; + long long sentTime; + struct XmidtMsg__ *next; +} XmidtMsg; + +typedef struct CloudAck__ +{ + char *transaction_id; + int qos; + int rdr; + char *source; + struct CloudAck__ *next; +} CloudAck; + +typedef enum +{ + DELIVERED_SUCCESS = 0, + INVALID_MSG_TYPE, + MISSING_SOURCE, + MISSING_DEST, + MISSING_CONTENT_TYPE, + MISSING_PAYLOAD, + MISSING_PAYLOADLEN, + INVALID_CONTENT_TYPE, + ENQUEUE_FAILURE = 100, + CLIENT_DISCONNECT = 101, + QUEUE_SIZE_EXCEEDED = 102, + WRP_ENCODE_FAILURE = 103, + MSG_PROCESSING_FAILED = 104, + QOS_SEMANTICS_DISABLED = 105, + MSG_EXPIRED = 106, + QUEUE_OPTIMIZED = 107 +} XMIDT_STATUS; + +typedef enum +{ + PENDING = 0, + SENT, + DELETE +} MSG_STATUS; +/*----------------------------------------------------------------------------*/ +/* Function Prototypes */ +/*----------------------------------------------------------------------------*/ + +rbusHandle_t get_parodus_rbus_Handle(void); +void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle); +void* processXmidtUpstreamMsg(); +void processXmidtData(); +int processData(XmidtMsg *Datanode, wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle); +int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle); +int checkInputParameters(rbusObject_t inParams); +char* generate_transaction_uuid(); +void parseRbusInparamsToWrp(rbusObject_t inParams, char *trans_id, wrp_msg_t **eventMsg); +void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, char *cloudsource, rbusError_t error); +int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode); +void printSendMsgData(char* status, int qos, char* dest, char* transaction_uuid); +bool highQosValueCheck(int qos); +void waitTillConnectionIsUp(); +void printRBUSParams(rbusObject_t params, char* file_path); +void addToCloudAckQ(char *transaction_id, int qos, int rdr, char *source); +int checkCloudACK(XmidtMsg *xmdnode, rbusMethodAsyncHandle_t asyncHandle); +int updateXmidtState(XmidtMsg * temp, int state); +void print_xmidMsg_list(); +int deleteCloudACKNode(char* trans_id); +int deleteFromXmidtQ(XmidtMsg **next_node); +int checkCloudConn(); +void checkMaxQandOptimize(XmidtMsg *xmdMsg); +void checkMsgExpiry(XmidtMsg *xmdMsg); +void mapXmidtStatusToStatusMessage(int status, char **message); +int xmidtQOptmize(); +#ifdef __cplusplus +} +#endif + + +#endif /* _XMIDTSEND_RBUS_H_ */ + diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f1a9f9d..4c8a2c4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -32,6 +32,10 @@ if (FEATURE_DNS_QUERY) set (PARODUS_COMMON_LIBS ${PARODUS_COMMON_LIBS} -lucresolv -lresolv) endif (FEATURE_DNS_QUERY) +if (ENABLE_WEBCFGBIN) +set (PARODUS_COMMON_LIBS ${PARODUS_COMMON_LIBS} -lrbus) +endif (ENABLE_WEBCFGBIN) + if(NOT DISABLE_VALGRIND) set (MEMORY_CHECK valgrind --leak-check=full --show-reachable=yes -v) endif () @@ -42,7 +46,7 @@ link_directories ( ${LIBRARY_DIR} ) # test_heartBeatTimer #------------------------------------------------------------------------------- add_test(NAME test_heartBeatTimer COMMAND ${MEMORY_CHECK} ./test_heartBeatTimer) -add_executable(test_heartBeatTimer test_heartBeatTimer.c ../src/heartBeat.c) +add_executable(test_heartBeatTimer test_heartBeatTimer.c ../src/heartBeat.c ../src/time.c) target_link_libraries (test_heartBeatTimer ${PARODUS_COMMON_LIBS} -lcmocka) #------------------------------------------------------------------------------- @@ -57,7 +61,10 @@ target_link_libraries (test_close_retry ${PARODUS_COMMON_LIBS} -lcmocka) #------------------------------------------------------------------------------- add_test(NAME test_mutex COMMAND ${MEMORY_CHECK} ./test_mutex) add_executable(test_mutex test_mutex.c ../src/mutex.c) -target_link_libraries (test_mutex ${PARODUS_COMMON_LIBS} -lcmocka) +target_link_libraries (test_mutex gcov -lcunit -lcimplog -lwrp-c + -luuid -lmsgpackc -lnopoll -lnanomsg -lpthread + -Wl,--no-as-needed -lcjson -lcjwt -ltrower-base64 + -lssl -lcrypto -lrt -lm -lcmocka) #------------------------------------------------------------------------------- # test_networking @@ -70,8 +77,8 @@ target_link_libraries (test_networking ${PARODUS_COMMON_LIBS}) # test_nopoll_helpers #------------------------------------------------------------------------------- add_test(NAME test_nopoll_helpers COMMAND ${MEMORY_CHECK} ./test_nopoll_helpers) -add_executable(test_nopoll_helpers test_nopoll_helpers.c ../src/nopoll_helpers.c) -target_link_libraries (test_nopoll_helpers -Wl,--no-as-needed -lrt -lcmocka -lcimplog -lnopoll) +add_executable(test_nopoll_helpers test_nopoll_helpers.c ../src/nopoll_helpers.c ../src/string_helpers.c ../src/config.c) +target_link_libraries (test_nopoll_helpers -Wl,--no-as-needed -lrt -lcmocka -lcimplog -lnopoll ${PARODUS_COMMON_LIBS}) #------------------------------------------------------------------------------- # test_time @@ -105,7 +112,7 @@ target_link_libraries (test_spin_thread_e ${PARODUS_COMMON_LIBS} ) # test_nopoll_handlers #------------------------------------------------------------------------------- add_test(NAME test_nopoll_handlers COMMAND ${MEMORY_CHECK} ./test_nopoll_handlers) -add_executable(test_nopoll_handlers test_nopoll_handlers.c ../src/nopoll_handlers.c ../src/heartBeat.c ../src/close_retry.c) +add_executable(test_nopoll_handlers test_nopoll_handlers.c ../src/nopoll_handlers.c ../src/heartBeat.c ../src/close_retry.c ../src/time.c) target_link_libraries (test_nopoll_handlers -lnopoll -lcunit -lcimplog -Wl,--no-as-needed -lrt -lpthread -lm) @@ -113,7 +120,7 @@ target_link_libraries (test_nopoll_handlers -lnopoll -lcunit -lcimplog -Wl,--no- # test_nopoll_handlers_fragment #------------------------------------------------------------------------------- add_test(NAME test_nopoll_handlers_fragment COMMAND ${MEMORY_CHECK} ./test_nopoll_handlers_fragment) -add_executable(test_nopoll_handlers_fragment test_nopoll_handlers_fragment.c ../src/nopoll_handlers.c ../src/heartBeat.c ../src/close_retry.c) +add_executable(test_nopoll_handlers_fragment test_nopoll_handlers_fragment.c ../src/nopoll_handlers.c ../src/heartBeat.c ../src/close_retry.c ../src/time.c) target_link_libraries (test_nopoll_handlers_fragment -lnopoll -lcunit -lcimplog -Wl,--no-as-needed -lrt -lpthread -lm -lcmocka) #------------------------------------------------------------------------------- @@ -154,7 +161,11 @@ set(CLIST_SRC ${CLIST_SRC} ../src/seshat_interface.c) else() set(CLIST_SRC ${CLIST_SRC} ../src/seshat_interface_stub.c) endif (ENABLE_SESHAT) - + +if (ENABLE_WEBCFGBIN) +set(CLIST_SRC ${CLIST_SRC} ../src/upstream_rbus.c ../src/xmidtsend_rbus.c) +endif (ENABLE_WEBCFGBIN) + add_executable(test_client_list ${CLIST_SRC}) #target_link_libraries (test_client_list ${PARODUS_CONN_LIBS} ${PARODUS_COMMON_LIBS}) target_link_libraries (test_client_list ${PARODUS_COMMON_LIBS} -lcurl -luuid) @@ -172,6 +183,10 @@ else() set(SVA_SRC ${SVA_SRC} ../src/seshat_interface_stub.c) endif (ENABLE_SESHAT) +if (ENABLE_WEBCFGBIN) +set(SVA_SRC ${SVA_SRC} ../src/upstream_rbus.c ../src/xmidtsend_rbus.c) +endif (ENABLE_WEBCFGBIN) + add_executable(test_service_alive ${SVA_SRC}) #target_link_libraries (test_service_alive ${PARODUS_CONN_LIBS} ${PARODUS_COMMON_LIBS}) target_link_libraries (test_service_alive ${PARODUS_COMMON_LIBS} -lcurl -luuid) @@ -200,7 +215,7 @@ target_link_libraries (test_auth_token -lcmocka # test_auth_token_more #------------------------------------------------------------------------------- add_test(NAME test_auth_token_more COMMAND ${MEMORY_CHECK} ./test_auth_token_more) -add_executable(test_auth_token_more test_auth_token_more.c ../src/config.c ../src/auth_token.c ../src/string_helpers.c) +add_executable(test_auth_token_more test_auth_token_more.c ../src/config.c ../src/auth_token.c ../src/string_helpers.c ../src/config.c) target_link_libraries (test_auth_token_more -lcmocka -Wl,--no-as-needed -lcimplog -lcjson -lcjwt -ltrower-base64 -lssl -lcrypto -lrt -lm -lcurl -luuid @@ -253,7 +268,7 @@ target_link_libraries (test_upstream_sock -lcmocka gcov -lcunit -lcimplog # test_downstream #------------------------------------------------------------------------------- add_test(NAME test_downstream COMMAND ${MEMORY_CHECK} ./test_downstream) -add_executable(test_downstream test_downstream.c ../src/downstream.c ../src/string_helpers.c) +add_executable(test_downstream test_downstream.c ../src/downstream.c ../src/string_helpers.c ../src/config.c) target_link_libraries (test_downstream -lcmocka gcov -lcunit -lcimplog -lwrp-c -luuid -lpthread -lmsgpackc -lnopoll -Wl,--no-as-needed -lcjson -lcjwt -ltrower-base64 @@ -263,7 +278,7 @@ target_link_libraries (test_downstream -lcmocka gcov -lcunit -lcimplog # test_downstream_more #------------------------------------------------------------------------------- add_test(NAME test_downstream_more COMMAND ${MEMORY_CHECK} ./test_downstream_more) -add_executable(test_downstream_more test_downstream_more.c ../src/downstream.c ../src/string_helpers.c) +add_executable(test_downstream_more test_downstream_more.c ../src/downstream.c ../src/string_helpers.c ../src/config.c) target_link_libraries (test_downstream_more -lcmocka ${PARODUS_COMMON_LIBS} ) #------------------------------------------------------------------------------- @@ -284,6 +299,7 @@ set(CONIFC_SRC test_conn_interface.c ../src/token.c ../src/string_helpers.c ../src/mutex.c + ../src/time.c ../src/heartBeat.c ../src/close_retry.c ../src/event_handler.c @@ -294,6 +310,9 @@ set(CONIFC_SRC ${CONIFC_SRC} ../src/seshat_interface.c) else() set(CONIFC_SRC ${CONIFC_SRC} ../src/seshat_interface_stub.c) endif (ENABLE_SESHAT) +if (ENABLE_WEBCFGBIN) +set(CONIFC_SRC ${CONIFC_SRC} ../src/upstream_rbus.c ../src/xmidtsend_rbus.c) +endif (ENABLE_WEBCFGBIN) add_executable(test_conn_interface ${CONIFC_SRC}) target_link_libraries (test_conn_interface -lcmocka ${PARODUS_COMMON_LIBS} -lcurl -luuid ) @@ -340,6 +359,10 @@ else() set(TOKEN_SRC test_token_stub.c ${TOKEN_SRC}) endif (FEATURE_DNS_QUERY) +if (ENABLE_WEBCFGBIN) +set(TOKEN_SRC ${TOKEN_SRC} ../src/upstream_rbus.c ../src/xmidtsend_rbus.c) +endif (ENABLE_WEBCFGBIN) + add_executable(test_token ${TOKEN_SRC} ) #target_link_libraries (test_token ${PARODUS_COMMON_LIBS} ${PARODUS_JWT_LIBS} -lcmocka ) target_link_libraries (test_token ${PARODUS_COMMON_LIBS} -lcmocka -lcurl -luuid) diff --git a/tests/simple_connection.c b/tests/simple_connection.c index 4fcf5a5..35c6538 100644 --- a/tests/simple_connection.c +++ b/tests/simple_connection.c @@ -92,7 +92,7 @@ void test_set_parodus_cfg() CU_ASSERT_STRING_EQUAL(cfg.hw_last_reboot_reason,get_parodus_cfg()->hw_last_reboot_reason); CU_ASSERT_STRING_EQUAL(cfg.fw_name,get_parodus_cfg()->fw_name); CU_ASSERT_STRING_EQUAL(cfg.webpa_url, get_parodus_cfg()->webpa_url); - CU_ASSERT_STRING_EQUAL(cfg.webpa_interface_used , get_parodus_cfg()->webpa_interface_used); + CU_ASSERT_STRING_EQUAL(cfg.webpa_interface_used , getWebpaInterface()); CU_ASSERT_STRING_EQUAL(cfg.webpa_protocol, get_parodus_cfg()->webpa_protocol); CU_ASSERT_EQUAL(cfg.boot_time, get_parodus_cfg()->boot_time); CU_ASSERT_EQUAL(cfg.webpa_ping_timeout, get_parodus_cfg()->webpa_ping_timeout); @@ -113,7 +113,7 @@ void test_getWebpaConveyHeader() CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->hw_manufacturer, cJSON_GetObjectItem(payload, HW_MANUFACTURER)->valuestring); CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->hw_last_reboot_reason, cJSON_GetObjectItem(payload, HW_LAST_REBOOT_REASON)->valuestring); CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->fw_name, cJSON_GetObjectItem(payload, FIRMWARE_NAME)->valuestring); - CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->webpa_interface_used, cJSON_GetObjectItem(payload, WEBPA_INTERFACE)->valuestring); + CU_ASSERT_STRING_EQUAL(getWebpaInterface(), cJSON_GetObjectItem(payload, WEBPA_INTERFACE)->valuestring); CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->webpa_protocol, cJSON_GetObjectItem(payload, WEBPA_PROTOCOL)->valuestring); CU_ASSERT_EQUAL((int)get_parodus_cfg()->boot_time, cJSON_GetObjectItem(payload, BOOT_TIME)->valueint); diff --git a/tests/test_config.c b/tests/test_config.c index 2cad776..3bf4371 100644 --- a/tests/test_config.c +++ b/tests/test_config.c @@ -29,6 +29,7 @@ #include "../src/ParodusInternal.h" extern int parse_mac_address (char *target, const char *arg); +extern int parse_serial_num(char *target, const char *arg); extern int server_is_http (const char *full_url, const char **server_ptr); extern int parse_webpa_url__(const char *full_url, @@ -103,7 +104,7 @@ void test_setParodusConfig() assert_string_equal(cfg.hw_last_reboot_reason, temp->hw_last_reboot_reason); assert_string_equal(cfg.webpa_path_url, temp->webpa_path_url); assert_string_equal(cfg.webpa_url, temp->webpa_url); - assert_string_equal(cfg.webpa_interface_used, temp->webpa_interface_used); + assert_string_equal(cfg.webpa_interface_used, getWebpaInterface()); assert_string_equal(cfg.webpa_protocol, temp->webpa_protocol); assert_string_equal(cfg.webpa_uuid, temp->webpa_uuid); assert_string_equal(cfg.partner_id, temp->partner_id); @@ -208,13 +209,11 @@ void test_parseCommandLine() ParodusCfg parodusCfg; memset(&parodusCfg,0,sizeof(parodusCfg)); - #ifdef FEATURE_DNS_QUERY write_key_to_file ("../../tests/jwt_key.tst", jwt_key); #endif create_token_script("/tmp/token.sh"); assert_int_equal (parseCommandLine(argc,command,&parodusCfg), 0); - assert_string_equal( parodusCfg.hw_model, "TG1682"); assert_string_equal( parodusCfg.hw_serial_number, "Fer23u948590"); assert_string_equal( parodusCfg.hw_manufacturer, "ARRISGroup,Inc."); @@ -469,6 +468,14 @@ void test_parse_mac_address () assert_int_equal (parse_mac_address (result, ""), -1); } +void test_parse_serial_num() +{ + char result[14]; + assert_int_equal (parse_serial_num (result, "1234ABC00ab"), 0); + assert_int_equal (parse_serial_num (result, "$@@"), 0); + assert_int_equal (parse_serial_num (result, ""), 0); +} + void test_server_is_http () { const char *server_ptr; @@ -589,6 +596,7 @@ int main(void) cmocka_unit_test(err_loadParodusCfg), cmocka_unit_test(test_parse_num_arg), cmocka_unit_test(test_parse_mac_address), + cmocka_unit_test(test_parse_serial_num), cmocka_unit_test(test_get_algo_mask), cmocka_unit_test(test_server_is_http), cmocka_unit_test(test_parse_webpa_url__), diff --git a/tests/test_conn_interface.c b/tests/test_conn_interface.c index 5433e7c..380aaab 100644 --- a/tests/test_conn_interface.c +++ b/tests/test_conn_interface.c @@ -44,7 +44,7 @@ pthread_mutex_t svc_mut=PTHREAD_MUTEX_INITIALIZER; pthread_cond_t svc_con=PTHREAD_COND_INITIALIZER; int numLoops; parodusOnPingStatusChangeHandler on_ping_status_change; - + /*----------------------------------------------------------------------------*/ /* Mocks */ /*----------------------------------------------------------------------------*/ @@ -71,7 +71,10 @@ void nopoll_log_set_handler (noPollCtx *ctx, noPollLogHandler handler, noPollPtr UNUSED(ctx); UNUSED(handler); UNUSED(user_data); function_called(); } - +int cloud_status_is_online (void) +{ + return 0; +} void __report_log (noPollCtx * ctx, noPollDebugLevel level, const char * log_msg, noPollPtr user_data) { UNUSED(ctx); UNUSED(level); UNUSED(log_msg); UNUSED(user_data); @@ -103,6 +106,11 @@ void set_global_shutdown_reason(char *reason) UNUSED(reason); } +int getDeviceId(char **device_id, size_t *device_id_len) +{ + UNUSED(device_id); UNUSED(device_id_len); + return 0; +} void start_conn_in_progress (unsigned long start_time) { UNUSED(start_time); @@ -139,6 +147,10 @@ void packMetaData() function_called(); } +int get_parodus_init() +{ + return 0; +} int get_cloud_disconnect_time(void) { @@ -172,6 +184,18 @@ int serviceAliveTask() return 0; } +int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds) +{ + UNUSED(msg); UNUSED(partnerIds); + return 0; +} + +int sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size) +{ + UNUSED(resp_bytes); UNUSED(resp_size); + return 0; +} + int nopoll_loop_wait(noPollCtx * ctx,long timeout) { UNUSED(ctx); UNUSED(timeout); @@ -335,6 +359,7 @@ void test_createSocketConnection() void test_createSocketConnection1() { + numLoops =0; noPollCtx *ctx; ParodusCfg cfg; memset(&cfg,0, sizeof(ParodusCfg)); @@ -364,11 +389,11 @@ void test_createSocketConnection1() expect_function_call(nopoll_ctx_unref); expect_function_call(nopoll_cleanup_library); createSocketConnection(NULL); - } void test_PingMissIntervalTime() { + numLoops = 6; noPollCtx *ctx; ParodusCfg cfg; memset(&cfg,0,sizeof(ParodusCfg)); @@ -386,7 +411,6 @@ void test_PingMissIntervalTime() //Max ping timeout is 6 sec cfg.webpa_ping_timeout = 6; set_parodus_cfg(&cfg); - reset_close_retry(); expect_function_call(nopoll_thread_handlers); @@ -422,11 +446,11 @@ void test_PingMissIntervalTime() expect_function_call(nopoll_ctx_unref); expect_function_call(nopoll_cleanup_library); createSocketConnection(NULL); - } void err_createSocketConnection() { + numLoops =0; set_close_retry(); reset_heartBeatTimer(); expect_function_call(nopoll_thread_handlers); @@ -459,6 +483,7 @@ void err_createSocketConnection() void test_createSocketConnection_cloud_disconn() { + numLoops =0; ParodusCfg cfg; memset(&cfg,0,sizeof(ParodusCfg)); cfg.cloud_disconnect = strdup("XPC"); diff --git a/tests/test_createConnection.c b/tests/test_createConnection.c index 88d90ea..aa6b83e 100644 --- a/tests/test_createConnection.c +++ b/tests/test_createConnection.c @@ -341,7 +341,7 @@ void test_createConnection() int ret = createNopollConnection(ctx); assert_int_equal(ret, nopoll_true); - assert_string_equal(get_parodus_cfg()->cloud_status, CLOUD_STATUS_ONLINE); + assert_string_equal(get_cloud_status(), CLOUD_STATUS_ONLINE); free(cfg); if (g_jwt_server_ip !=NULL) { diff --git a/tests/test_crud_interface.c b/tests/test_crud_interface.c index 8649ff6..f848e03 100644 --- a/tests/test_crud_interface.c +++ b/tests/test_crud_interface.c @@ -125,6 +125,10 @@ int processCrudRequest(wrp_msg_t *reqMsg, wrp_msg_t **responseMsg ) return (int)mock(); } +char* getWebpaInterface(void) +{ + return NULL; +} /*----------------------------------------------------------------------------*/ /* Tests */ /*----------------------------------------------------------------------------*/ diff --git a/tests/test_crud_internal.c b/tests/test_crud_internal.c index 332d282..0cb7e47 100644 --- a/tests/test_crud_internal.c +++ b/tests/test_crud_internal.c @@ -1530,7 +1530,7 @@ void test_retrieveObject_cloud_status() ret = retrieveObject(reqMsg, &respMsg); assert_int_equal (respMsg->u.crud.status, 200); assert_int_equal (ret, 0); - assert_string_equal(get_parodus_cfg()->cloud_status, CLOUD_STATUS_ONLINE); + assert_string_equal(get_cloud_status(), CLOUD_STATUS_ONLINE); assert_int_equal (respMsg->u.crud.payload_size, 25); fp = fopen(cfg.crud_config_file, "r"); diff --git a/tests/test_downstream.c b/tests/test_downstream.c index 5b7961f..f43108f 100755 --- a/tests/test_downstream.c +++ b/tests/test_downstream.c @@ -64,7 +64,18 @@ reg_list_item_t * get_global_node(void) void release_global_node (void) { } - +void addToCloudAckQ(char *transaction_id, int qos, int rdr) +{ + (void)transaction_id; + (void)qos; + (void)rdr; + return; +} +bool highQosValueCheck(int qos) +{ + (void)qos; + return false; +} ssize_t wrp_to_struct( const void *bytes, const size_t length, const enum wrp_format fmt, wrp_msg_t **msg ) { diff --git a/tests/test_downstream_more.c b/tests/test_downstream_more.c index 7fb63a0..41e1528 100644 --- a/tests/test_downstream_more.c +++ b/tests/test_downstream_more.c @@ -192,6 +192,20 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds) return 1; } +void addToCloudAckQ(char *transaction_id, int qos, int rdr) +{ + (void)transaction_id; + (void)qos; + (void)rdr; + return; +} + +bool highQosValueCheck(int qos) +{ + (void)qos; + return false; +} + ssize_t wrp_to_struct( const void *bytes, const size_t length, const enum wrp_format fmt, wrp_msg_t **msg ) { diff --git a/tests/test_heartBeatTimer.c b/tests/test_heartBeatTimer.c index 48f401b..123cb6f 100644 --- a/tests/test_heartBeatTimer.c +++ b/tests/test_heartBeatTimer.c @@ -79,7 +79,6 @@ void test_mutexHeartBeatTimer() { ParodusInfo("heartBeatTimer reset to: %d\n", heartBeatTimer); assert_int_equal(heartBeatTimer, 0); } - /*----------------------------------------------------------------------------*/ /* External Functions */ /*----------------------------------------------------------------------------*/ diff --git a/tests/test_nopoll_handlers_fragment.c b/tests/test_nopoll_handlers_fragment.c index 4c79faf..2929edf 100644 --- a/tests/test_nopoll_handlers_fragment.c +++ b/tests/test_nopoll_handlers_fragment.c @@ -68,7 +68,6 @@ nopoll_bool nopoll_msg_is_final(noPollMsg *msg) function_called(); return (nopoll_bool) mock(); } - const unsigned char *nopoll_msg_get_payload(noPollMsg *msg) { (void)msg; diff --git a/tests/test_nopoll_helpers.c b/tests/test_nopoll_helpers.c index bebb956..f8862d5 100644 --- a/tests/test_nopoll_helpers.c +++ b/tests/test_nopoll_helpers.c @@ -24,6 +24,9 @@ #include "../src/parodus_log.h" #include "../src/nopoll_helpers.h" #include "../src/config.h" +#include +#include "../src/connection.h" +#include "../src/ParodusInternal.h" /*----------------------------------------------------------------------------*/ /* Macros */ @@ -34,7 +37,6 @@ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ static noPollConn *conn = NULL; - static ParodusCfg cfg; /*----------------------------------------------------------------------------*/ /* Mocks */ @@ -57,12 +59,6 @@ nopoll_bool nopoll_conn_is_ready( noPollConn *conn ) return (nopoll_bool)mock(); } -ParodusCfg *get_parodus_cfg(void) -{ - function_called(); - return &cfg; -} - int __nopoll_conn_send_common (noPollConn * conn, const char * content, long length, nopoll_bool has_fin, long sleep_in_header, noPollOpCode frame_type) { UNUSED(has_fin); UNUSED(sleep_in_header); UNUSED(frame_type); UNUSED(content); @@ -131,6 +127,7 @@ bool get_interface_down_event() return false; } + /*----------------------------------------------------------------------------*/ /* Tests */ /*----------------------------------------------------------------------------*/ @@ -221,8 +218,7 @@ void test_sendMessage() { int len = strlen("Hello Parodus!"); - cfg.cloud_status = CLOUD_STATUS_ONLINE; - expect_function_calls (get_parodus_cfg, 1); + get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE; expect_value(__nopoll_conn_send_common, (intptr_t)conn, (intptr_t)conn); expect_value(__nopoll_conn_send_common, length, len); @@ -236,8 +232,7 @@ void test_sendMessageOffline() { int len = strlen("Hello Parodus!"); - cfg.cloud_status = CLOUD_STATUS_OFFLINE; - expect_function_calls (get_parodus_cfg, 1); + get_parodus_cfg()->cloud_status = CLOUD_STATUS_OFFLINE; sendMessage(conn, "Hello Parodus!", len); } @@ -246,8 +241,7 @@ void err_sendMessage() { int len = strlen("Hello Parodus!"); - cfg.cloud_status = CLOUD_STATUS_ONLINE; - expect_function_calls (get_parodus_cfg, 1); + get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE; expect_value(__nopoll_conn_send_common, (intptr_t)conn,(intptr_t) conn); expect_value(__nopoll_conn_send_common, length, len); @@ -266,8 +260,7 @@ void err_sendMessageConnNull() { int len = strlen("Hello Parodus!"); - cfg.cloud_status = CLOUD_STATUS_ONLINE; - expect_function_calls (get_parodus_cfg, 1); + get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE; expect_value(__nopoll_conn_send_common, (intptr_t)conn, NULL); expect_value(__nopoll_conn_send_common, length, len); diff --git a/tests/test_upstream.c b/tests/test_upstream.c index 03bfe55..8f8283c 100644 --- a/tests/test_upstream.c +++ b/tests/test_upstream.c @@ -44,10 +44,12 @@ extern size_t metaPackSize; extern UpStreamMsg *UpStreamMsgQ; int numLoops = 1; int deviceIDNull =0; +char webpa_interface[64]={'\0'}; wrp_msg_t *temp = NULL; extern pthread_mutex_t nano_mut; extern pthread_cond_t nano_con; static int crud_test = 0; +pthread_mutex_t config_mut=PTHREAD_MUTEX_INITIALIZER; /*----------------------------------------------------------------------------*/ /* Mocks */ /*----------------------------------------------------------------------------*/ @@ -116,6 +118,27 @@ ParodusCfg *get_parodus_cfg(void) return &parodusCfg; } +char *getWebpaInterface(void) +{ + ParodusCfg cfg; + memset(&cfg,0,sizeof(cfg)); + #ifdef WAN_FAILOVER_SUPPORTED + parStrncpy(cfg.webpa_interface_used , "wl0", sizeof(cfg.webpa_interface_used)); + #else + parStrncpy(cfg.webpa_interface_used , "eth0", sizeof(cfg.webpa_interface_used)); + #endif + set_parodus_cfg(&cfg); + #ifdef WAN_FAILOVER_SUPPORTED + ParodusPrint("WAN_FAILOVER_SUPPORTED mode \n"); + pthread_mutex_lock (&config_mut); + parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface)); + pthread_mutex_unlock (&config_mut); + #else + parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface)); + #endif + return webpa_interface; +} + ssize_t wrp_pack_metadata( const data_t *packData, void **data ) { (void) packData; (void) data; diff --git a/tests/test_upstream_sock.c b/tests/test_upstream_sock.c index bcbccb8..d97281d 100644 --- a/tests/test_upstream_sock.c +++ b/tests/test_upstream_sock.c @@ -46,10 +46,11 @@ extern size_t metaPackSize; extern UpStreamMsg *UpStreamMsgQ; int numLoops = 1; int deviceIDNull =0; +char webpa_interface[64]={'\0'}; wrp_msg_t *reg_msg = NULL; extern pthread_mutex_t nano_mut; extern pthread_cond_t nano_con; - +pthread_mutex_t config_mut=PTHREAD_MUTEX_INITIALIZER; /*----------------------------------------------------------------------------*/ /* Mocks */ /*----------------------------------------------------------------------------*/ @@ -101,6 +102,27 @@ ParodusCfg *get_parodus_cfg(void) return &parodusCfg; } +char *getWebpaInterface(void) +{ + ParodusCfg cfg; + memset(&cfg,0,sizeof(cfg)); + #ifdef WAN_FAILOVER_SUPPORTED + parStrncpy(cfg.webpa_interface_used , "wl0", sizeof(cfg.webpa_interface_used)); + #else + parStrncpy(cfg.webpa_interface_used , "eth0", sizeof(cfg.webpa_interface_used)); + #endif + set_parodus_cfg(&cfg); + #ifdef WAN_FAILOVER_SUPPORTED + ParodusPrint("WAN_FAILOVER_SUPPORTED mode \n"); + pthread_mutex_lock (&config_mut); + parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface)); + pthread_mutex_unlock (&config_mut); + #else + parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface)); + #endif + return webpa_interface; +} + /*------------------------------------------- int nn_connect (int s, const char *addr) {