mirror of
https://github.com/outbackdingo/parodus.git
synced 2026-01-28 02:20:02 +00:00
Compare commits
56 Commits
5.2_p1
...
xmidt_send
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
60c37daa18 | ||
|
|
f51b4ef9c2 | ||
|
|
254c9f6763 | ||
|
|
8876fa9239 | ||
|
|
42fc9eb26e | ||
|
|
e8e652b076 | ||
|
|
dc935f9a09 | ||
|
|
d1462de56a | ||
|
|
223e7db81c | ||
|
|
cd763823fc | ||
|
|
8537f27ece | ||
|
|
6381d9c55f | ||
|
|
980b7a88db | ||
|
|
e3c37b6764 | ||
|
|
67d58441ef | ||
|
|
f6ffda05ad | ||
|
|
6aee851944 | ||
|
|
aaa93e137d | ||
|
|
21e8c74f23 | ||
|
|
0248e7b241 | ||
|
|
da019382c6 | ||
|
|
aecbd885d5 | ||
|
|
0443c45924 | ||
|
|
314c840755 | ||
|
|
69f7e3eba0 | ||
|
|
e643f9e2fc | ||
|
|
7d2e9733d0 | ||
|
|
23c18da03e | ||
|
|
2218637f2c | ||
|
|
54f9983eff | ||
|
|
a577854172 | ||
|
|
e054ef580f | ||
|
|
79eb7ba6ef | ||
|
|
a291b047b6 | ||
|
|
656b1b0b58 | ||
|
|
22ecf0ef76 | ||
|
|
c97a3a3a20 | ||
|
|
46273d9766 | ||
|
|
7efed14c20 | ||
|
|
6d39163d47 | ||
|
|
5b0d3e19f3 | ||
|
|
0d8bb960f6 | ||
|
|
5d537fb139 | ||
|
|
47fcd6bdae | ||
|
|
24e1790ccb | ||
|
|
89fdd97d44 | ||
|
|
5ae2325fb6 | ||
|
|
9bad1142ab | ||
|
|
339b93cfd3 | ||
|
|
a34db44c90 | ||
|
|
bdec80dd07 | ||
|
|
1833c30e83 | ||
|
|
5b26fe38f5 | ||
|
|
f3a4daeb74 | ||
|
|
aaeb1f6612 | ||
|
|
0fb6288255 |
@@ -172,8 +172,8 @@ add_dependencies(libcimplog cimplog)
|
||||
ExternalProject_Add(wrp-c
|
||||
DEPENDS trower-base64 msgpack cimplog
|
||||
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/wrp-c
|
||||
GIT_REPOSITORY https://github.com/Comcast/wrp-c.git
|
||||
GIT_TAG "1.0.1"
|
||||
GIT_REPOSITORY https://github.com/xmidt-org/wrp-c.git
|
||||
GIT_TAG "71f8a39fe39f98da007ed4cdabbb192be1da1685"
|
||||
CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR}
|
||||
-DMSGPACK_ENABLE_CXX=OFF
|
||||
-DMSGPACK_BUILD_EXAMPLES=OFF
|
||||
@@ -240,7 +240,7 @@ if (ENABLE_WEBCFGBIN)
|
||||
# rtMessage external dependency
|
||||
#-------------------------------------------------------------------------------
|
||||
ExternalProject_Add(rtMessage
|
||||
#DEPENDS cJSON
|
||||
DEPENDS cJSON
|
||||
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/rtMessage
|
||||
GIT_REPOSITORY https://github.com/rdkcmf/rdk-rtmessage.git
|
||||
GIT_TAG rdk-next
|
||||
@@ -309,6 +309,10 @@ if (ENABLE_WEBCFGBIN)
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_WEBCFGBIN ")
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
|
||||
if (WAN_FAILOVER_SUPPORTED)
|
||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DWAN_FAILOVER_SUPPORTED ")
|
||||
endif (WAN_FAILOVER_SUPPORTED)
|
||||
|
||||
link_directories ( ${LIBRARY_DIR} ${COMMON_LIBRARY_DIR} ${LIBRARY_DIR64} )
|
||||
add_subdirectory(src)
|
||||
if (BUILD_TESTING)
|
||||
|
||||
@@ -24,9 +24,15 @@ set(SOURCES ${SOURCES} seshat_interface_stub.c)
|
||||
endif (ENABLE_SESHAT)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
set(SOURCES ${SOURCES} upstream_rbus.c)
|
||||
set(SOURCES ${SOURCES} upstream_rbus.c xmidtsend_rbus.c)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
|
||||
if (WAN_FAILOVER_SUPPORTED)
|
||||
message(STATUS "WAN_FAILOVER_SUPPORTED is supported")
|
||||
else()
|
||||
message(STATUS "WAN_FAILOVER_SUPPORTED is not supported")
|
||||
endif (WAN_FAILOVER_SUPPORTED)
|
||||
|
||||
add_executable(parodus ${SOURCES})
|
||||
|
||||
target_link_libraries (parodus
|
||||
@@ -57,6 +63,6 @@ target_link_libraries (parodus -llibseshat)
|
||||
endif (ENABLE_SESHAT)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
target_link_libraries (parodus -lrbus -lrbus-core)
|
||||
target_link_libraries (parodus -lrbus -lrbus-core -lrtMessage)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
install (TARGETS parodus DESTINATION bin)
|
||||
|
||||
@@ -69,9 +69,9 @@ char* getWebpaConveyHeader()
|
||||
cJSON_AddStringToObject(response, WEBPA_PROTOCOL, get_parodus_cfg()->webpa_protocol);
|
||||
}
|
||||
|
||||
if(strlen(get_parodus_cfg()->webpa_interface_used)!=0)
|
||||
if(strlen(getWebpaInterface())!=0)
|
||||
{
|
||||
cJSON_AddStringToObject(response, WEBPA_INTERFACE, get_parodus_cfg()->webpa_interface_used);
|
||||
cJSON_AddStringToObject(response, WEBPA_INTERFACE, getWebpaInterface());
|
||||
}
|
||||
|
||||
if(strlen(get_parodus_cfg()->hw_last_reboot_reason)!=0)
|
||||
|
||||
@@ -163,6 +163,10 @@ void timespec_diff(struct timespec *start, struct timespec *stop,
|
||||
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
void subscribeRBUSevent();
|
||||
int regXmidtSendDataMethod();
|
||||
#endif
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
void setWebpaInterface(char *value);
|
||||
#endif
|
||||
/*------------------------------------------------------------------------------*/
|
||||
/* For interface_down_event Flag */
|
||||
@@ -180,8 +184,11 @@ void set_interface_down_event();
|
||||
pthread_cond_t *get_interface_down_con();
|
||||
|
||||
pthread_mutex_t *get_interface_down_mut();
|
||||
|
||||
|
||||
pthread_cond_t *get_global_cloud_status_cond(void);
|
||||
|
||||
pthread_mutex_t *get_global_cloud_status_mut(void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -64,6 +64,7 @@ int requestNewAuthToken(char *newToken, size_t len, int r_count)
|
||||
struct curl_slist *list = NULL;
|
||||
struct curl_slist *headers_list = NULL;
|
||||
|
||||
char webpa_interface[64]={'\0'};
|
||||
double total;
|
||||
|
||||
struct token_data data;
|
||||
@@ -80,9 +81,10 @@ int requestNewAuthToken(char *newToken, size_t len, int r_count)
|
||||
curl_easy_setopt(curl, CURLOPT_URL, get_parodus_cfg()->token_server_url);
|
||||
curl_easy_setopt(curl, CURLOPT_TIMEOUT, CURL_TIMEOUT_SEC);
|
||||
|
||||
if(get_parodus_cfg()->webpa_interface_used !=NULL && strlen(get_parodus_cfg()->webpa_interface_used) >0)
|
||||
parStrncpy(webpa_interface, getWebpaInterface(), sizeof(webpa_interface));
|
||||
if(webpa_interface !=NULL && strlen(webpa_interface) >0)
|
||||
{
|
||||
curl_easy_setopt(curl, CURLOPT_INTERFACE, get_parodus_cfg()->webpa_interface_used);
|
||||
curl_easy_setopt(curl, CURLOPT_INTERFACE, webpa_interface);
|
||||
}
|
||||
/* set callback for writing received data */
|
||||
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback_fn);
|
||||
@@ -304,7 +306,7 @@ void createCurlheader(struct curl_slist *list, struct curl_slist **header_list)
|
||||
snprintf(buf, MAX_BUF_SIZE, "X-Midt-Protocol: %s", get_parodus_cfg()->webpa_protocol);
|
||||
list = curl_slist_append(list, buf);
|
||||
|
||||
snprintf(buf, MAX_BUF_SIZE, "X-Midt-Interface-Used: %s", get_parodus_cfg()->webpa_interface_used);
|
||||
snprintf(buf, MAX_BUF_SIZE, "X-Midt-Interface-Used: %s", getWebpaInterface());
|
||||
list = curl_slist_append(list, buf);
|
||||
|
||||
snprintf(buf, MAX_BUF_SIZE, "X-Midt-Last-Reboot-Reason: %s", get_parodus_cfg()->hw_last_reboot_reason);
|
||||
|
||||
105
src/config.c
105
src/config.c
@@ -33,13 +33,30 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* File Scoped Variables */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
pthread_mutex_t config_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
//For sending cond signal when cloud status is ONLINE
|
||||
pthread_mutex_t cloud_status_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_cond_t cloud_status_cond=PTHREAD_COND_INITIALIZER;
|
||||
|
||||
char webpa_interface[64]={'\0'};
|
||||
|
||||
char cloud_status[32]={'\0'};
|
||||
static ParodusCfg parodusCfg;
|
||||
static unsigned int rsa_algorithms =
|
||||
(1<<alg_rs256) | (1<<alg_rs384) | (1<<alg_rs512);
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
pthread_cond_t *get_global_cloud_status_cond(void)
|
||||
{
|
||||
return &cloud_status_cond;
|
||||
}
|
||||
|
||||
pthread_mutex_t *get_global_cloud_status_mut(void)
|
||||
{
|
||||
return &cloud_status_mut;
|
||||
}
|
||||
|
||||
ParodusCfg *get_parodus_cfg(void)
|
||||
{
|
||||
@@ -61,6 +78,27 @@ void reset_cloud_disconnect_reason(ParodusCfg *cfg)
|
||||
cfg->cloud_disconnect = NULL;
|
||||
}
|
||||
|
||||
void set_cloud_status(char *status)
|
||||
{
|
||||
if(status != NULL)
|
||||
{
|
||||
pthread_mutex_lock(&config_mut);
|
||||
get_parodus_cfg()->cloud_status = status;
|
||||
if(strcmp (status, CLOUD_STATUS_ONLINE) == 0)
|
||||
{
|
||||
pthread_cond_signal(&cloud_status_cond);
|
||||
}
|
||||
pthread_mutex_unlock(&config_mut);
|
||||
}
|
||||
}
|
||||
|
||||
char *get_cloud_status(void)
|
||||
{
|
||||
pthread_mutex_lock(&config_mut);
|
||||
parStrncpy(cloud_status, get_parodus_cfg()->cloud_status, sizeof(cloud_status));
|
||||
pthread_mutex_unlock(&config_mut);
|
||||
return cloud_status;
|
||||
}
|
||||
|
||||
const char *get_tok (const char *src, int delim, char *result, int resultsize)
|
||||
{
|
||||
@@ -146,9 +184,9 @@ void read_key_from_file (const char *fname, char *buf, size_t buflen)
|
||||
int parse_mac_address (char *target, const char *arg)
|
||||
{
|
||||
int count = 0;
|
||||
int i;
|
||||
int i, j;
|
||||
char c;
|
||||
|
||||
char *mac = target;
|
||||
for (i=0; (c=arg[i]) != 0; i++) {
|
||||
if (c !=':')
|
||||
count++;
|
||||
@@ -160,39 +198,14 @@ int parse_mac_address (char *target, const char *arg)
|
||||
*(target++) = c;
|
||||
}
|
||||
*target = 0; // terminating null
|
||||
return 0;
|
||||
}
|
||||
|
||||
int parse_serial_num(char *target, const char *arg)
|
||||
{
|
||||
char ch;
|
||||
if(arg != NULL)
|
||||
//convert mac to lowercase
|
||||
for(j = 0; mac[j]; j++)
|
||||
{
|
||||
if(strlen(arg) == 0)
|
||||
{
|
||||
ParodusError("Empty serial number, setting to default unknown\n");
|
||||
strcpy(target,"unknown");
|
||||
}
|
||||
for(int i=0; (ch = arg[i]) != '\0'; i++)
|
||||
{
|
||||
// check if character is ascii, a-z --> 97 to 122, A-Z --> 65 to 90, digits(0 to 9) --> 48 to 57
|
||||
if((ch >= 97 && ch <= 122) || (ch >= 65 && ch <= 90) || (ch >=48 && ch <= 57))
|
||||
{
|
||||
target[i] = ch;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Invalid serial number, setting to default unknown\n");
|
||||
strcpy(target,"unknown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
mac[j] = tolower(mac[j]);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("serial number argument is NULL\n");
|
||||
}
|
||||
return 0;
|
||||
ParodusPrint("mac in lowercase is %s\n", mac);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int server_is_http (const char *full_url,
|
||||
@@ -449,8 +462,8 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg)
|
||||
break;
|
||||
|
||||
case 's':
|
||||
if(parse_serial_num(cfg->hw_serial_number, optarg) == 0)
|
||||
ParodusInfo ("hw_serial-number is %s\n",cfg->hw_serial_number);
|
||||
parStrncpy(cfg->hw_serial_number,optarg,sizeof(cfg->hw_serial_number));
|
||||
ParodusInfo("hw_serial_number is %s\n",cfg->hw_serial_number);
|
||||
break;
|
||||
|
||||
case 'f':
|
||||
@@ -756,7 +769,7 @@ void loadParodusCfg(ParodusCfg * config,ParodusCfg *cfg)
|
||||
}
|
||||
if(strlen(config->webpa_interface_used )!=0)
|
||||
{
|
||||
parStrncpy(cfg->webpa_interface_used, config->webpa_interface_used,sizeof(cfg->webpa_interface_used));
|
||||
parStrncpy(getWebpaInterface(), config->webpa_interface_used,sizeof(getWebpaInterface()));
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -880,4 +893,26 @@ void loadParodusCfg(ParodusCfg * config,ParodusCfg *cfg)
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
void setWebpaInterface(char *value)
|
||||
{
|
||||
pthread_mutex_lock (&config_mut);
|
||||
parStrncpy(get_parodus_cfg()->webpa_interface_used, value, sizeof(get_parodus_cfg()->webpa_interface_used));
|
||||
pthread_mutex_unlock (&config_mut);
|
||||
}
|
||||
#endif
|
||||
|
||||
char *getWebpaInterface(void)
|
||||
{
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
ParodusPrint("WAN_FAILOVER_SUPPORTED mode \n");
|
||||
pthread_mutex_lock (&config_mut);
|
||||
parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface));
|
||||
pthread_mutex_unlock (&config_mut);
|
||||
#else
|
||||
parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface));
|
||||
#endif
|
||||
ParodusPrint("webpa_interface:%s\n", webpa_interface);
|
||||
return webpa_interface;
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#ifndef _CONFIG_H_
|
||||
#define _CONFIG_H_
|
||||
|
||||
#include <pthread.h>
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
@@ -139,6 +140,10 @@ void set_parodus_cfg(ParodusCfg *);
|
||||
char *get_token_application(void) ;
|
||||
void set_cloud_disconnect_reason(ParodusCfg *cfg, char *disconn_reason);
|
||||
void reset_cloud_disconnect_reason(ParodusCfg *cfg);
|
||||
char *getWebpaInterface(void);
|
||||
void set_cloud_status(char *status);
|
||||
char *get_cloud_status(void);
|
||||
|
||||
/**
|
||||
* parse a webpa url. Extract the server address, the port
|
||||
* and return whether it's secure or not
|
||||
|
||||
@@ -115,6 +115,10 @@ void createSocketConnection(void (* initKeypress)())
|
||||
StartThread(processUpstreamMessage, &upstream_msg_tid);
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
subscribeRBUSevent();
|
||||
regXmidtSendDataMethod();
|
||||
#endif
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
subscribeCurrentActiveInterfaceEvent();
|
||||
#endif
|
||||
ParodusMsgQ = NULL;
|
||||
StartThread(messageHandlerTask, &downstream_tid);
|
||||
|
||||
@@ -717,10 +717,9 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
|
||||
{
|
||||
create_connection_ctx_t conn_ctx;
|
||||
int max_retry_count;
|
||||
struct timespec connect_time,*connectTimePtr;
|
||||
connectTimePtr = &connect_time;
|
||||
backoff_timer_t backoff_timer;
|
||||
static int init_conn_failure=1;
|
||||
struct sysinfo l_sSysInfo;
|
||||
|
||||
if(ctx == NULL) {
|
||||
return nopoll_false;
|
||||
@@ -785,8 +784,8 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
|
||||
OnboardLog("Connected to server\n");
|
||||
}
|
||||
|
||||
get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE;
|
||||
ParodusInfo("cloud_status set as %s after successful connection\n", get_parodus_cfg()->cloud_status);
|
||||
set_cloud_status(CLOUD_STATUS_ONLINE);
|
||||
ParodusInfo("cloud_status set as %s after successful connection\n", get_cloud_status());
|
||||
|
||||
/* On initial connect success, invoke conn status change event as "success" */
|
||||
if((NULL != on_conn_status_change) && init)
|
||||
@@ -800,9 +799,9 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
|
||||
on_ping_status_change("received");
|
||||
}
|
||||
|
||||
if((get_parodus_cfg()->boot_time != 0) && init) {
|
||||
getCurrentTime(connectTimePtr);
|
||||
ParodusInfo("connect_time-diff-boot_time=%d\n", connectTimePtr->tv_sec - get_parodus_cfg()->boot_time);
|
||||
if(init) {
|
||||
sysinfo(&l_sSysInfo);
|
||||
ParodusInfo("connect_time-diff-boot_time=%ld\n", l_sSysInfo.uptime);
|
||||
init = 0; //set init to 0 so that this is logged only during process start up and not during reconnect
|
||||
}
|
||||
|
||||
@@ -862,7 +861,7 @@ static noPollConnOpts * createConnOpts (char * extra_headers, bool secure)
|
||||
nopoll_conn_opts_ssl_peer_verify (opts, nopoll_true);
|
||||
nopoll_conn_opts_set_ssl_protocol (opts, NOPOLL_METHOD_TLSV1_2);
|
||||
}
|
||||
nopoll_conn_opts_set_interface (opts,get_parodus_cfg()->webpa_interface_used);
|
||||
nopoll_conn_opts_set_interface (opts,getWebpaInterface());
|
||||
nopoll_conn_opts_set_extra_headers (opts,extra_headers);
|
||||
return opts;
|
||||
}
|
||||
@@ -902,8 +901,8 @@ void close_and_unref_connection(noPollConn *conn, bool is_shutting_down)
|
||||
{
|
||||
if (conn) {
|
||||
close_conn (conn, is_shutting_down);
|
||||
get_parodus_cfg()->cloud_status = CLOUD_STATUS_OFFLINE;
|
||||
ParodusInfo("cloud_status set as %s after connection close\n", get_parodus_cfg()->cloud_status);
|
||||
set_cloud_status(CLOUD_STATUS_OFFLINE);
|
||||
ParodusInfo("cloud_status set as %s after connection close\n", get_cloud_status());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -537,13 +537,13 @@ int retrieveFromMemory(char *keyName, cJSON **jsonresponse)
|
||||
}
|
||||
else if(strcmp(WEBPA_INTERFACE, keyName)==0)
|
||||
{
|
||||
if((get_parodus_cfg()->webpa_interface_used !=NULL)&& (strlen(get_parodus_cfg()->fw_name)==0))
|
||||
if((getWebpaInterface() !=NULL)&& (strlen(get_parodus_cfg()->fw_name)==0))
|
||||
{
|
||||
ParodusError("retrieveFromMemory: webpa_interface_used value is NULL\n");
|
||||
return -1;
|
||||
}
|
||||
ParodusInfo("retrieveFromMemory: keyName:%s value:%s\n",keyName,get_parodus_cfg()->webpa_interface_used);
|
||||
cJSON_AddItemToObject( *jsonresponse, WEBPA_INTERFACE , cJSON_CreateString(get_parodus_cfg()->webpa_interface_used));
|
||||
ParodusInfo("retrieveFromMemory: keyName:%s value:%s\n",keyName,getWebpaInterface());
|
||||
cJSON_AddItemToObject( *jsonresponse, WEBPA_INTERFACE , cJSON_CreateString(getWebpaInterface()));
|
||||
}
|
||||
else if(strcmp(WEBPA_URL, keyName)==0)
|
||||
{
|
||||
@@ -577,20 +577,20 @@ int retrieveFromMemory(char *keyName, cJSON **jsonresponse)
|
||||
}
|
||||
else if(strcmp(CLOUD_STATUS, keyName)==0)
|
||||
{
|
||||
if(get_parodus_cfg()->cloud_status ==NULL)
|
||||
if(get_cloud_status() ==NULL)
|
||||
{
|
||||
ParodusError("retrieveFromMemory: cloud_status value is NULL\n");
|
||||
return -1;
|
||||
}
|
||||
else if((get_parodus_cfg()->cloud_status !=NULL) && (strlen(get_parodus_cfg()->cloud_status)==0))
|
||||
else if((get_cloud_status() !=NULL) && (strlen(get_cloud_status())==0))
|
||||
{
|
||||
ParodusError("retrieveFromMemory: cloud_status value is empty\n");
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("retrieveFromMemory: keyName:%s value:%s\n", keyName, get_parodus_cfg()->cloud_status);
|
||||
cJSON_AddItemToObject( *jsonresponse, CLOUD_STATUS , cJSON_CreateString(get_parodus_cfg()->cloud_status));
|
||||
ParodusInfo("retrieveFromMemory: keyName:%s value:%s\n", keyName, get_cloud_status());
|
||||
cJSON_AddItemToObject( *jsonresponse, CLOUD_STATUS , cJSON_CreateString(get_cloud_status()));
|
||||
}
|
||||
}
|
||||
else if(strcmp(BOOT_TIME, keyName)==0)
|
||||
|
||||
@@ -52,7 +52,7 @@ void setMessageHandlers()
|
||||
|
||||
static int cloud_status_is_online (void)
|
||||
{
|
||||
const char *status = get_parodus_cfg()->cloud_status;
|
||||
const char *status = get_cloud_status();
|
||||
if (NULL == status)
|
||||
return false;
|
||||
return (strcmp (status, CLOUD_STATUS_ONLINE) == 0);
|
||||
@@ -60,14 +60,14 @@ static int cloud_status_is_online (void)
|
||||
|
||||
/** To send upstream msgs to server ***/
|
||||
|
||||
void sendMessage(noPollConn *conn, void *msg, size_t len)
|
||||
int sendMessage(noPollConn *conn, void *msg, size_t len)
|
||||
{
|
||||
int bytesWritten = 0;
|
||||
|
||||
if (!cloud_status_is_online ()) {
|
||||
ParodusError("Failed to send msg upstream as connection is not OK\n");
|
||||
OnboardLog("Failed to send msg upstream as connection is not OK\n");
|
||||
return;
|
||||
return 1;
|
||||
}
|
||||
|
||||
ParodusInfo("sendMessage length %zu\n", len);
|
||||
@@ -77,7 +77,9 @@ void sendMessage(noPollConn *conn, void *msg, size_t len)
|
||||
if (bytesWritten != (int) len)
|
||||
{
|
||||
ParodusError("Failed to send bytes %zu, bytes written were=%d (errno=%d, %s)..\n", len, bytesWritten, errno, strerror(errno));
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int sendResponse(noPollConn * conn, void * buffer, size_t length)
|
||||
|
||||
@@ -41,7 +41,7 @@ extern "C" {
|
||||
*/
|
||||
int sendResponse(noPollConn * conn,void *str, size_t bufferSize);
|
||||
void setMessageHandlers();
|
||||
void sendMessage(noPollConn *conn, void *msg, size_t len);
|
||||
int sendMessage(noPollConn *conn, void *msg, size_t len);
|
||||
|
||||
/**
|
||||
* @brief __report_log Nopoll log handler
|
||||
|
||||
@@ -83,7 +83,6 @@ void packMetaData()
|
||||
//Pack the metadata initially to reuse for every upstream msg sending to server
|
||||
ParodusPrint("-------------- Packing metadata ----------------\n");
|
||||
sprintf(boot_time, "%d", get_parodus_cfg()->boot_time);
|
||||
|
||||
struct data meta_pack[METADATA_COUNT] = {
|
||||
{HW_MODELNAME, get_parodus_cfg()->hw_model},
|
||||
{HW_SERIALNUMBER, get_parodus_cfg()->hw_serial_number},
|
||||
@@ -95,10 +94,9 @@ void packMetaData()
|
||||
{LAST_RECONNECT_REASON, get_global_reconnect_reason()},
|
||||
{WEBPA_PROTOCOL, get_parodus_cfg()->webpa_protocol},
|
||||
{WEBPA_UUID,get_parodus_cfg()->webpa_uuid},
|
||||
{WEBPA_INTERFACE, get_parodus_cfg()->webpa_interface_used},
|
||||
{WEBPA_INTERFACE, getWebpaInterface()},
|
||||
{PARTNER_ID, get_parodus_cfg()->partner_id}
|
||||
};
|
||||
|
||||
const data_t metapack = {METADATA_COUNT, meta_pack};
|
||||
|
||||
metaPackSize = wrp_pack_metadata( &metapack , &metadataPack );
|
||||
@@ -330,6 +328,7 @@ void *processUpstreamMessage()
|
||||
if(ret == 1)
|
||||
{
|
||||
wrp_msg_t *eventMsg = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset( eventMsg, 0, sizeof( wrp_msg_t ) );
|
||||
eventMsg->msg_type = msgType;
|
||||
eventMsg->u.event.content_type=msg->u.event.content_type;
|
||||
eventMsg->u.event.source=msg->u.event.source;
|
||||
@@ -339,6 +338,15 @@ void *processUpstreamMessage()
|
||||
eventMsg->u.event.headers=msg->u.event.headers;
|
||||
eventMsg->u.event.metadata=msg->u.event.metadata;
|
||||
eventMsg->u.event.partner_ids = partnersList;
|
||||
if(msg->u.event.transaction_uuid)
|
||||
{
|
||||
ParodusPrint("Inside Trans id in PARODUS\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusPrint("Assigning NULL to trans id\n");
|
||||
eventMsg->u.event.transaction_uuid = NULL;
|
||||
}
|
||||
|
||||
int size = wrp_struct_to( eventMsg, WRP_BYTES, &bytes );
|
||||
if(size > 0)
|
||||
@@ -596,11 +604,12 @@ void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_
|
||||
}
|
||||
}
|
||||
|
||||
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
|
||||
int sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
|
||||
{
|
||||
void *appendData;
|
||||
size_t encodedSize;
|
||||
bool close_retry = false;
|
||||
int sendRetStatus = 1;
|
||||
//appending response with metadata
|
||||
if(metaPackSize > 0)
|
||||
{
|
||||
@@ -615,12 +624,13 @@ void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
|
||||
//TODO: Upstream and downstream messages in queue should be handled and queue should be empty before parodus forcefully disconnect from cloud.
|
||||
if(!close_retry || (get_parodus_cfg()->cloud_disconnect !=NULL))
|
||||
{
|
||||
sendMessage(get_global_conn(),appendData, encodedSize);
|
||||
sendRetStatus = sendMessage(get_global_conn(),appendData, encodedSize);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("close_retry is %d, unable to send response as connection retry is in progress\n", close_retry);
|
||||
OnboardLog("close_retry is %d, unable to send response as connection retry is in progress\n", close_retry);
|
||||
sendRetStatus = 1;
|
||||
}
|
||||
free(appendData);
|
||||
appendData =NULL;
|
||||
@@ -628,6 +638,9 @@ void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
|
||||
else
|
||||
{
|
||||
ParodusError("Failed to send upstream as metadata packing is not successful\n");
|
||||
sendRetStatus = 1;
|
||||
}
|
||||
ParodusPrint("sendRetStatus is %d\n", sendRetStatus);
|
||||
|
||||
return sendRetStatus;
|
||||
}
|
||||
|
||||
@@ -49,10 +49,13 @@ void *handle_upstream();
|
||||
void *processUpstreamMessage();
|
||||
void registerRBUSlistener();
|
||||
int getDeviceId(char **device_id, size_t *device_id_len);
|
||||
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size);
|
||||
int sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size);
|
||||
void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size);
|
||||
void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg);
|
||||
void set_global_UpStreamMsgQ(UpStreamMsg * UpStreamQ);
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
int subscribeCurrentActiveInterfaceEvent();
|
||||
#endif
|
||||
UpStreamMsg * get_global_UpStreamMsgQ(void);
|
||||
pthread_cond_t *get_global_nano_con(void);
|
||||
pthread_mutex_t *get_global_nano_mut(void);
|
||||
|
||||
@@ -29,17 +29,29 @@
|
||||
#include "partners_check.h"
|
||||
|
||||
#define WEBCFG_UPSTREAM_EVENT "Webconfig.Upstream"
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
#define WEBPA_INTERFACE "Device.X_RDK_WanManager.CurrentActiveInterface"
|
||||
#endif
|
||||
|
||||
rbusHandle_t rbus_Handle;
|
||||
rbusError_t err;
|
||||
|
||||
void processWebconfigUpstreamEvent(rbusHandle_t handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription);
|
||||
|
||||
void subscribeAsyncHandler( rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error);
|
||||
|
||||
rbusHandle_t get_parodus_rbus_Handle(void)
|
||||
{
|
||||
return rbus_Handle;
|
||||
}
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription );
|
||||
#endif
|
||||
|
||||
/* API to register RBUS listener to receive messages from webconfig */
|
||||
void subscribeRBUSevent()
|
||||
{
|
||||
rbusError_t err;
|
||||
int rc = RBUS_ERROR_SUCCESS;
|
||||
rbusHandle_t rbus_Handle;
|
||||
err = rbus_open(&rbus_Handle, "parodus");
|
||||
if (err)
|
||||
{
|
||||
@@ -53,6 +65,21 @@ void subscribeRBUSevent()
|
||||
ParodusInfo("rbusEvent_Subscribe was successful\n");
|
||||
}
|
||||
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
/* API to subscribe Active Interface name on value change event*/
|
||||
int subscribeCurrentActiveInterfaceEvent()
|
||||
{
|
||||
int rc = RBUS_ERROR_SUCCESS;
|
||||
ParodusPrint("Subscribing to Device.X_RDK_WanManager.CurrentActiveInterface Event\n");
|
||||
rc = rbusEvent_SubscribeAsync(rbus_Handle,WEBPA_INTERFACE,eventReceiveHandler,subscribeAsyncHandler,"parodusInterface",10*20);
|
||||
if(rc != RBUS_ERROR_SUCCESS)
|
||||
{
|
||||
ParodusError("%s subscribe failed : %d - %s\n", WEBPA_INTERFACE, rc, rbusError_ToString(rc));
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
#endif
|
||||
|
||||
void processWebconfigUpstreamEvent(rbusHandle_t handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription)
|
||||
{
|
||||
(void)handle;
|
||||
@@ -80,6 +107,7 @@ void processWebconfigUpstreamEvent(rbusHandle_t handle, rbusEvent_t const* event
|
||||
if(ret == 1)
|
||||
{
|
||||
wrp_msg_t *eventMsg = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset( eventMsg, 0, sizeof( wrp_msg_t ) );
|
||||
eventMsg->msg_type = event_msg->msg_type;
|
||||
eventMsg->u.event.content_type=event_msg->u.event.content_type;
|
||||
eventMsg->u.event.source=event_msg->u.event.source;
|
||||
@@ -89,6 +117,15 @@ void processWebconfigUpstreamEvent(rbusHandle_t handle, rbusEvent_t const* event
|
||||
eventMsg->u.event.headers=event_msg->u.event.headers;
|
||||
eventMsg->u.event.metadata=event_msg->u.event.metadata;
|
||||
eventMsg->u.event.partner_ids = partnersList;
|
||||
if(event_msg->u.event.transaction_uuid)
|
||||
{
|
||||
ParodusPrint("Inside Trans id in PARODUS_rbus\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusPrint("Assigning NULL to trans id RBUS\n");
|
||||
eventMsg->u.event.transaction_uuid = NULL;
|
||||
}
|
||||
|
||||
int size = wrp_struct_to( eventMsg, WRP_BYTES, &bytes );
|
||||
if(size > 0)
|
||||
@@ -123,3 +160,24 @@ void subscribeAsyncHandler( rbusHandle_t handle, rbusEventSubscription_t* subscr
|
||||
(void)handle;
|
||||
ParodusInfo("subscribeAsyncHandler event %s, error %d - %s\n",subscription->eventName, error, rbusError_ToString(error));
|
||||
}
|
||||
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription )
|
||||
{
|
||||
(void)subscription;
|
||||
ParodusPrint("Handling event inside eventReceiveHandler\n");
|
||||
(void)rbus_Handle;
|
||||
char * interface = NULL;
|
||||
rbusValue_t newValue = rbusObject_GetValue(event->data, "value");
|
||||
rbusValue_t oldValue = rbusObject_GetValue(event->data, "oldValue");
|
||||
ParodusInfo("Consumer received ValueChange event for param %s\n", event->name);
|
||||
|
||||
if(newValue) {
|
||||
interface = (char *) rbusValue_GetString(newValue, NULL);
|
||||
setWebpaInterface(interface);
|
||||
}
|
||||
if(newValue !=NULL && oldValue!=NULL && interface!=NULL) {
|
||||
ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
846
src/xmidtsend_rbus.c
Normal file
846
src/xmidtsend_rbus.c
Normal file
@@ -0,0 +1,846 @@
|
||||
/**
|
||||
* Copyright 2022 Comcast Cable Communications Management, LLC
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
/**
|
||||
* @file xmidtsend_rbus.c
|
||||
*
|
||||
* @ To provide Xmidt send RBUS method to send events upstream.
|
||||
*
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <rbus.h>
|
||||
#include "upstream.h"
|
||||
#include "ParodusInternal.h"
|
||||
#include "partners_check.h"
|
||||
#include "xmidtsend_rbus.h"
|
||||
#include "config.h"
|
||||
|
||||
static pthread_t processThreadId = 0;
|
||||
static int XmidtQsize = 0;
|
||||
|
||||
XmidtMsg *XmidtMsgQ = NULL;
|
||||
|
||||
pthread_mutex_t xmidt_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
pthread_cond_t xmidt_con=PTHREAD_COND_INITIALIZER;
|
||||
|
||||
bool highQosValueCheck(int qos)
|
||||
{
|
||||
if(qos > 24)
|
||||
{
|
||||
ParodusInfo("The qos value is high\n");
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusPrint("The qos value is low\n");
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
/*
|
||||
* @brief To handle xmidt rbus messages received from various components.
|
||||
*/
|
||||
void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
|
||||
{
|
||||
XmidtMsg *message;
|
||||
|
||||
ParodusPrint("XmidtQsize is %d\n" , XmidtQsize);
|
||||
if(XmidtQsize == MAX_QUEUE_SIZE)
|
||||
{
|
||||
char * errorMsg = strdup("Max Queue Size Exceeded");
|
||||
ParodusError("Queue Size Exceeded\n");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg , QUEUE_SIZE_EXCEEDED);
|
||||
wrp_free_struct(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
ParodusPrint ("Add Xmidt Upstream message to queue\n");
|
||||
message = (XmidtMsg *)malloc(sizeof(XmidtMsg));
|
||||
|
||||
if(message)
|
||||
{
|
||||
message->msg = msg;
|
||||
message->asyncHandle =asyncHandle;
|
||||
//Increment queue size to handle max queue limit
|
||||
XmidtQsize++;
|
||||
message->next=NULL;
|
||||
pthread_mutex_lock (&xmidt_mut);
|
||||
//Producer adds the rbus msg into queue
|
||||
if(XmidtMsgQ == NULL)
|
||||
{
|
||||
XmidtMsgQ = message;
|
||||
|
||||
ParodusPrint("Producer added xmidt message\n");
|
||||
pthread_cond_signal(&xmidt_con);
|
||||
pthread_mutex_unlock (&xmidt_mut);
|
||||
ParodusPrint("mutex unlock in xmidt producer\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
XmidtMsg *temp = XmidtMsgQ;
|
||||
while(temp->next)
|
||||
{
|
||||
temp = temp->next;
|
||||
}
|
||||
temp->next = message;
|
||||
pthread_mutex_unlock (&xmidt_mut);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
char * errorMsg = strdup("Unable to enqueue");
|
||||
ParodusError("failure in allocation for xmidt message\n");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg , ENQUEUE_FAILURE);
|
||||
wrp_free_struct(msg);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
//Xmidt consumer thread to process the rbus events.
|
||||
void processXmidtData()
|
||||
{
|
||||
int err = 0;
|
||||
err = pthread_create(&processThreadId, NULL, processXmidtUpstreamMsg, NULL);
|
||||
if (err != 0)
|
||||
{
|
||||
ParodusError("Error creating processXmidtData thread :[%s]\n", strerror(err));
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("processXmidtData thread created Successfully\n");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//Consumer to Parse and process rbus data.
|
||||
void* processXmidtUpstreamMsg()
|
||||
{
|
||||
int rv = 0;
|
||||
while(FOREVER())
|
||||
{
|
||||
pthread_mutex_lock (&xmidt_mut);
|
||||
ParodusPrint("mutex lock in xmidt consumer thread\n");
|
||||
if(XmidtMsgQ != NULL)
|
||||
{
|
||||
XmidtMsg *Data = XmidtMsgQ;
|
||||
|
||||
pthread_mutex_unlock (&xmidt_mut);
|
||||
ParodusPrint("mutex unlock in xmidt consumer thread\n");
|
||||
rv = processData(Data->msg, Data->asyncHandle);
|
||||
if(!rv)
|
||||
{
|
||||
ParodusPrint("Data->msg wrp free\n");
|
||||
wrp_free_struct(Data->msg);
|
||||
}
|
||||
else
|
||||
{
|
||||
free(Data->msg);
|
||||
}
|
||||
free(Data);
|
||||
Data = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (g_shutdown)
|
||||
{
|
||||
pthread_mutex_unlock (&xmidt_mut);
|
||||
break;
|
||||
}
|
||||
ParodusPrint("Before cond wait in xmidt consumer thread\n");
|
||||
pthread_cond_wait(&xmidt_con, &xmidt_mut);
|
||||
pthread_mutex_unlock (&xmidt_mut);
|
||||
ParodusPrint("mutex unlock in xmidt thread after cond wait\n");
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
//To validate and send events upstream
|
||||
int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
|
||||
{
|
||||
int rv = 0;
|
||||
char *errorMsg = "none";
|
||||
int statuscode =0;
|
||||
|
||||
wrp_msg_t * xmidtMsg = msg;
|
||||
if (xmidtMsg == NULL)
|
||||
{
|
||||
ParodusError("xmidtMsg is NULL\n");
|
||||
errorMsg = strdup("Unable to enqueue");
|
||||
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg, ENQUEUE_FAILURE);
|
||||
xmidtQDequeue();
|
||||
return rv;
|
||||
}
|
||||
|
||||
rv = validateXmidtData(xmidtMsg, &errorMsg, &statuscode);
|
||||
ParodusPrint("validateXmidtData, errorMsg %s statuscode %d\n", errorMsg, statuscode);
|
||||
if(rv)
|
||||
{
|
||||
ParodusPrint("validation successful, send event to server\n");
|
||||
sendXmidtEventToServer(xmidtMsg, asyncHandle);
|
||||
return rv;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("validation failed, send failure ack\n");
|
||||
createOutParamsandSendAck(xmidtMsg, asyncHandle, errorMsg , statuscode);
|
||||
xmidtQDequeue();
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
//To remove an event from Queue
|
||||
void xmidtQDequeue()
|
||||
{
|
||||
pthread_mutex_lock (&xmidt_mut);
|
||||
if(XmidtMsgQ != NULL)
|
||||
{
|
||||
XmidtMsgQ = XmidtMsgQ->next;
|
||||
XmidtQsize -= 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("XmidtMsgQ is NULL\n");
|
||||
}
|
||||
pthread_mutex_unlock (&xmidt_mut);
|
||||
}
|
||||
|
||||
int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode)
|
||||
{
|
||||
if(eventMsg == NULL)
|
||||
{
|
||||
ParodusError("eventMsg is NULL\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(eventMsg->msg_type != WRP_MSG_TYPE__EVENT)
|
||||
{
|
||||
*errorMsg = strdup("Message format is invalid");
|
||||
*statusCode = INVALID_MSG_TYPE;
|
||||
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(eventMsg->u.event.source == NULL)
|
||||
{
|
||||
*errorMsg = strdup("Missing source");
|
||||
*statusCode = MISSING_SOURCE;
|
||||
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(eventMsg->u.event.dest == NULL)
|
||||
{
|
||||
*errorMsg = strdup("Missing dest");
|
||||
*statusCode = MISSING_DEST;
|
||||
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(eventMsg->u.event.content_type == NULL)
|
||||
{
|
||||
*errorMsg = strdup("Missing content_type");
|
||||
*statusCode = MISSING_CONTENT_TYPE;
|
||||
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(eventMsg->u.event.payload == NULL)
|
||||
{
|
||||
*errorMsg = strdup("Missing payload");
|
||||
*statusCode = MISSING_PAYLOAD;
|
||||
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(eventMsg->u.event.payload_size == 0)
|
||||
{
|
||||
*errorMsg = strdup("Missing payloadlen");
|
||||
*statusCode = MISSING_PAYLOADLEN;
|
||||
ParodusError("errorMsg: %s, statusCode: %d\n", *errorMsg, *statusCode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
ParodusPrint("validateXmidtData success. errorMsg %s statusCode %d\n", *errorMsg, *statusCode);
|
||||
return 1;
|
||||
}
|
||||
|
||||
void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle)
|
||||
{
|
||||
wrp_msg_t *notif_wrp_msg = NULL;
|
||||
ssize_t msg_len;
|
||||
void *msg_bytes;
|
||||
int ret = -1;
|
||||
char sourceStr[64] = {'\0'};
|
||||
char *device_id = NULL;
|
||||
size_t device_id_len = 0;
|
||||
int sendRetStatus = 1;
|
||||
char *errorMsg = NULL;
|
||||
int qos = 0;
|
||||
|
||||
notif_wrp_msg = (wrp_msg_t *)malloc(sizeof(wrp_msg_t));
|
||||
if(notif_wrp_msg != NULL)
|
||||
{
|
||||
memset(notif_wrp_msg, 0, sizeof(wrp_msg_t));
|
||||
notif_wrp_msg->msg_type = WRP_MSG_TYPE__EVENT;
|
||||
|
||||
ParodusPrint("msg->u.event.source: %s\n",msg->u.event.source);
|
||||
|
||||
if(msg->u.event.source !=NULL)
|
||||
{
|
||||
//To get device_id in the format "mac:112233445xxx"
|
||||
ret = getDeviceId(&device_id, &device_id_len);
|
||||
if(ret == 0)
|
||||
{
|
||||
ParodusPrint("device_id %s device_id_len %lu\n", device_id, device_id_len);
|
||||
snprintf(sourceStr, sizeof(sourceStr), "%s/%s", device_id, msg->u.event.source);
|
||||
ParodusPrint("sourceStr formed is %s\n" , sourceStr);
|
||||
notif_wrp_msg->u.event.source = strdup(sourceStr);
|
||||
ParodusInfo("source:%s\n", notif_wrp_msg->u.event.source);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Failed to get device_id\n");
|
||||
}
|
||||
if(device_id != NULL)
|
||||
{
|
||||
free(device_id);
|
||||
device_id = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if(msg->u.event.dest != NULL)
|
||||
{
|
||||
notif_wrp_msg->u.event.dest = msg->u.event.dest;
|
||||
ParodusInfo("destination: %s\n", notif_wrp_msg->u.event.dest);
|
||||
}
|
||||
|
||||
if(msg->u.event.transaction_uuid != NULL)
|
||||
{
|
||||
notif_wrp_msg->u.event.transaction_uuid = msg->u.event.transaction_uuid;
|
||||
ParodusPrint("Notification transaction_uuid %s\n", notif_wrp_msg->u.event.transaction_uuid);
|
||||
}
|
||||
|
||||
if(msg->u.event.content_type != NULL)
|
||||
{
|
||||
if(strcmp(msg->u.event.content_type , "JSON") == 0)
|
||||
{
|
||||
notif_wrp_msg->u.event.content_type = strdup("application/json");
|
||||
}
|
||||
ParodusPrint("content_type is %s\n",notif_wrp_msg->u.event.content_type);
|
||||
}
|
||||
|
||||
if(msg->u.event.payload != NULL)
|
||||
{
|
||||
ParodusInfo("Notification payload: %s\n",msg->u.event.payload);
|
||||
notif_wrp_msg->u.event.payload = (void *)msg->u.event.payload;
|
||||
notif_wrp_msg->u.event.payload_size = msg->u.event.payload_size;
|
||||
ParodusPrint("payload size %lu\n", notif_wrp_msg->u.event.payload_size);
|
||||
}
|
||||
|
||||
if(msg->u.event.qos != 0)
|
||||
{
|
||||
notif_wrp_msg->u.event.qos = msg->u.event.qos;
|
||||
qos = notif_wrp_msg->u.event.qos;
|
||||
ParodusInfo("Notification qos: %d\n",notif_wrp_msg->u.event.qos);
|
||||
}
|
||||
msg_len = wrp_struct_to (notif_wrp_msg, WRP_BYTES, &msg_bytes);
|
||||
|
||||
ParodusPrint("Encoded xmidt wrp msg, msg_len %lu\n", msg_len);
|
||||
if(msg_len > 0)
|
||||
{
|
||||
ParodusPrint("sendUpstreamMsgToServer\n");
|
||||
sendRetStatus = sendUpstreamMsgToServer(&msg_bytes, msg_len);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("wrp msg_len is zero\n");
|
||||
errorMsg = strdup("Wrp message encoding failed");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, WRP_ENCODE_FAILURE);
|
||||
xmidtQDequeue();
|
||||
|
||||
ParodusPrint("wrp_free_struct\n");
|
||||
if(notif_wrp_msg != NULL)
|
||||
{
|
||||
wrp_free_struct(notif_wrp_msg);
|
||||
}
|
||||
|
||||
if(msg_bytes != NULL)
|
||||
{
|
||||
free(msg_bytes);
|
||||
msg_bytes = NULL;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
while(sendRetStatus) //If SendMessage is failed condition
|
||||
{
|
||||
ParodusError("sendXmidtEventToServer is Failed\n");
|
||||
if(highQosValueCheck(qos))
|
||||
{
|
||||
ParodusPrint("The event is having high qos retry again\n");
|
||||
ParodusInfo("Wait till connection is Up\n");
|
||||
|
||||
pthread_mutex_lock(get_global_cloud_status_mut());
|
||||
pthread_cond_wait(get_global_cloud_status_cond(), get_global_cloud_status_mut());
|
||||
pthread_mutex_unlock(get_global_cloud_status_mut());
|
||||
ParodusInfo("Received cloud status signal proceed to retry\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
errorMsg = strdup("send failed due to client disconnect");
|
||||
ParodusInfo("The event is having low qos proceed to dequeue\n");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, CLIENT_DISCONNECT);
|
||||
xmidtQDequeue();
|
||||
break;
|
||||
}
|
||||
sendRetStatus = sendUpstreamMsgToServer(&msg_bytes, msg_len);
|
||||
}
|
||||
|
||||
if(sendRetStatus == 0)
|
||||
{
|
||||
errorMsg = strdup("send to server success");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, DELIVERED_SUCCESS);
|
||||
xmidtQDequeue();
|
||||
}
|
||||
|
||||
ParodusPrint("B4 notif wrp_free_struct\n");
|
||||
if(notif_wrp_msg != NULL)
|
||||
{
|
||||
wrp_free_struct(notif_wrp_msg);
|
||||
}
|
||||
|
||||
if(msg_bytes != NULL)
|
||||
{
|
||||
free(msg_bytes);
|
||||
msg_bytes = NULL;
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
errorMsg = strdup("Memory allocation failed");
|
||||
ParodusError("Memory allocation failed\n");
|
||||
createOutParamsandSendAck(msg, asyncHandle, errorMsg, MSG_PROCESSING_FAILED);
|
||||
xmidtQDequeue();
|
||||
}
|
||||
|
||||
if(msg->u.event.source !=NULL)
|
||||
{
|
||||
free(msg->u.event.source);
|
||||
msg->u.event.source = NULL;
|
||||
}
|
||||
if(msg->u.event.content_type !=NULL)
|
||||
{
|
||||
free(msg->u.event.content_type);
|
||||
msg->u.event.content_type = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode)
|
||||
{
|
||||
rbusObject_t outParams;
|
||||
rbusError_t err;
|
||||
rbusValue_t value;
|
||||
char qosstring[20] = "";
|
||||
|
||||
rbusValue_Init(&value);
|
||||
rbusValue_SetString(value, "event");
|
||||
rbusObject_Init(&outParams, NULL);
|
||||
rbusObject_SetValue(outParams, "msg_type", value);
|
||||
rbusValue_Release(value);
|
||||
|
||||
ParodusPrint("statuscode %d errorMsg %s\n", statuscode, errorMsg);
|
||||
rbusValue_Init(&value);
|
||||
rbusValue_SetInt32(value, statuscode);
|
||||
rbusObject_SetValue(outParams, "status", value);
|
||||
rbusValue_Release(value);
|
||||
|
||||
if(errorMsg !=NULL)
|
||||
{
|
||||
rbusValue_Init(&value);
|
||||
rbusValue_SetString(value, errorMsg);
|
||||
rbusObject_SetValue(outParams, "error_message", value);
|
||||
rbusValue_Release(value);
|
||||
free(errorMsg);
|
||||
}
|
||||
|
||||
if(msg != NULL)
|
||||
{
|
||||
if(msg->u.event.source !=NULL)
|
||||
{
|
||||
ParodusPrint("msg->u.event.source is %s\n", msg->u.event.source);
|
||||
rbusValue_Init(&value);
|
||||
rbusValue_SetString(value, msg->u.event.source);
|
||||
rbusObject_SetValue(outParams, "source", value);
|
||||
rbusValue_Release(value);
|
||||
}
|
||||
|
||||
if(msg->u.event.dest !=NULL)
|
||||
{
|
||||
rbusValue_Init(&value);
|
||||
rbusValue_SetString(value, msg->u.event.dest);
|
||||
rbusObject_SetValue(outParams, "dest", value);
|
||||
rbusValue_Release(value);
|
||||
}
|
||||
|
||||
if(msg->u.event.content_type !=NULL)
|
||||
{
|
||||
rbusValue_Init(&value);
|
||||
rbusValue_SetString(value, msg->u.event.content_type);
|
||||
rbusObject_SetValue(outParams, "content_type", value);
|
||||
rbusValue_Release(value);
|
||||
}
|
||||
|
||||
rbusValue_Init(&value);
|
||||
snprintf(qosstring, sizeof(qosstring), "%d", msg->u.event.qos);
|
||||
ParodusPrint("qosstring is %s\n", qosstring);
|
||||
rbusValue_SetString(value, qosstring);
|
||||
rbusObject_SetValue(outParams, "qos", value);
|
||||
rbusValue_Release(value);
|
||||
|
||||
if(msg->u.event.transaction_uuid !=NULL)
|
||||
{
|
||||
rbusValue_Init(&value);
|
||||
rbusValue_SetString(value, msg->u.event.transaction_uuid);
|
||||
rbusObject_SetValue(outParams, "transaction_uuid", value);
|
||||
rbusValue_Release(value);
|
||||
ParodusPrint("outParams msg->u.event.transaction_uuid %s\n", msg->u.event.transaction_uuid);
|
||||
}
|
||||
}
|
||||
|
||||
if(outParams !=NULL)
|
||||
{
|
||||
//rbusObject_fwrite(outParams, 1, stdout);
|
||||
if(asyncHandle == NULL)
|
||||
{
|
||||
ParodusError("asyncHandle is NULL\n");
|
||||
return;
|
||||
}
|
||||
|
||||
err = rbusMethod_SendAsyncResponse(asyncHandle, RBUS_ERROR_SUCCESS, outParams);
|
||||
//err = rbusMethod_SendAsyncResponse(asyncHandle, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION, outParams); //for negative case
|
||||
|
||||
if(err != RBUS_ERROR_SUCCESS)
|
||||
{
|
||||
ParodusError("rbusMethod_SendAsyncResponse failed err: %d\n", err);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("rbusMethod_SendAsyncResponse success: %d\n", err);
|
||||
}
|
||||
|
||||
rbusObject_Release(outParams);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Failed to create outParams\n");
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* @brief This function handles check method call from t2 before the actual inParams SET and to not proceed with inParams processing.
|
||||
*/
|
||||
int checkInputParameters(rbusObject_t inParams)
|
||||
{
|
||||
rbusValue_t check = rbusObject_GetValue(inParams, "check");
|
||||
if(check)
|
||||
{
|
||||
ParodusPrint("Rbus check method. Not proceeding to process this inparam\n");
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
//To generate unique transaction id for xmidt send requests
|
||||
char* generate_transaction_uuid()
|
||||
{
|
||||
char *transID = NULL;
|
||||
uuid_t transaction_Id;
|
||||
char *trans_id = NULL;
|
||||
trans_id = (char *)malloc(37);
|
||||
uuid_generate_random(transaction_Id);
|
||||
uuid_unparse(transaction_Id, trans_id);
|
||||
|
||||
if(trans_id !=NULL)
|
||||
{
|
||||
transID = trans_id;
|
||||
}
|
||||
return transID;
|
||||
}
|
||||
|
||||
void parseRbusInparamsToWrp(rbusObject_t inParams, char *trans_id, wrp_msg_t **eventMsg)
|
||||
{
|
||||
char *msg_typeStr = NULL;
|
||||
char *sourceVal = NULL;
|
||||
char *destStr = NULL, *contenttypeStr = NULL;
|
||||
char *payloadStr = NULL, *qosVal = NULL;
|
||||
unsigned int payloadlength = 0;
|
||||
wrp_msg_t *msg = NULL;
|
||||
|
||||
msg = ( wrp_msg_t * ) malloc( sizeof( wrp_msg_t ) );
|
||||
if(msg == NULL)
|
||||
{
|
||||
ParodusError("Wrp msg allocation failed\n");
|
||||
return;
|
||||
}
|
||||
|
||||
memset( msg, 0, sizeof( wrp_msg_t ) );
|
||||
|
||||
rbusValue_t msg_type = rbusObject_GetValue(inParams, "msg_type");
|
||||
if(msg_type)
|
||||
{
|
||||
if(rbusValue_GetType(msg_type) == RBUS_STRING)
|
||||
{
|
||||
msg_typeStr = (char *) rbusValue_GetString(msg_type, NULL);
|
||||
ParodusPrint("msg_type value received is %s\n", msg_typeStr);
|
||||
if(msg_typeStr !=NULL)
|
||||
{
|
||||
if(strcmp(msg_typeStr, "event") ==0)
|
||||
{
|
||||
msg->msg_type = WRP_MSG_TYPE__EVENT;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("msg_type received is not event : %s\n", msg_typeStr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("msg_type is empty\n");
|
||||
}
|
||||
|
||||
rbusValue_t source = rbusObject_GetValue(inParams, "source");
|
||||
if(source)
|
||||
{
|
||||
if(rbusValue_GetType(source) == RBUS_STRING)
|
||||
{
|
||||
sourceVal = (char *)rbusValue_GetString(source, NULL);
|
||||
if(sourceVal !=NULL)
|
||||
{
|
||||
ParodusInfo("source value received is %s\n", sourceVal);
|
||||
msg->u.event.source = strdup(sourceVal);
|
||||
}
|
||||
ParodusPrint("msg->u.event.source is %s\n", msg->u.event.source);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("source is empty\n");
|
||||
}
|
||||
|
||||
rbusValue_t dest = rbusObject_GetValue(inParams, "dest");
|
||||
if(dest)
|
||||
{
|
||||
if(rbusValue_GetType(dest) == RBUS_STRING)
|
||||
{
|
||||
destStr = (char *)rbusValue_GetString(dest, NULL);
|
||||
if(destStr !=NULL)
|
||||
{
|
||||
ParodusPrint("dest value received is %s\n", destStr);
|
||||
msg->u.event.dest = strdup(destStr);
|
||||
ParodusPrint("msg->u.event.dest is %s\n", msg->u.event.dest);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("dest is empty\n");
|
||||
}
|
||||
|
||||
rbusValue_t contenttype = rbusObject_GetValue(inParams, "content_type");
|
||||
if(contenttype)
|
||||
{
|
||||
if(rbusValue_GetType(contenttype) == RBUS_STRING)
|
||||
{
|
||||
contenttypeStr = (char *)rbusValue_GetString(contenttype, NULL);
|
||||
if(contenttypeStr !=NULL)
|
||||
{
|
||||
ParodusPrint("contenttype value received is %s\n", contenttypeStr);
|
||||
msg->u.event.content_type = strdup(contenttypeStr);
|
||||
ParodusPrint("msg->u.event.content_type is %s\n", msg->u.event.content_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("contenttype is empty\n");
|
||||
}
|
||||
|
||||
rbusValue_t payload = rbusObject_GetValue(inParams, "payload");
|
||||
if(payload)
|
||||
{
|
||||
if((rbusValue_GetType(payload) == RBUS_STRING))
|
||||
{
|
||||
payloadStr = (char *)rbusValue_GetString(payload, NULL);
|
||||
if(payloadStr !=NULL)
|
||||
{
|
||||
ParodusPrint("payload received is %s\n", payloadStr);
|
||||
msg->u.event.payload = strdup(payloadStr);
|
||||
ParodusPrint("msg->u.event.payload is %s\n", msg->u.event.payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("payloadStr is empty\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("payload is empty\n");
|
||||
}
|
||||
|
||||
rbusValue_t payloadlen = rbusObject_GetValue(inParams, "payloadlen");
|
||||
if(payloadlen)
|
||||
{
|
||||
if(rbusValue_GetType(payloadlen) == RBUS_INT32)
|
||||
{
|
||||
ParodusPrint("payloadlen type %d RBUS_INT32 %d\n", rbusValue_GetType(payloadlen), RBUS_INT32);
|
||||
payloadlength = rbusValue_GetInt32(payloadlen);
|
||||
ParodusPrint("payloadlen received is %lu\n", payloadlength);
|
||||
msg->u.event.payload_size = (size_t) payloadlength;
|
||||
ParodusPrint("msg->u.event.payload_size is %lu\n", msg->u.event.payload_size);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("payloadlen is empty\n");
|
||||
}
|
||||
|
||||
ParodusPrint("check qos\n");
|
||||
rbusValue_t qos = rbusObject_GetValue(inParams, "qos");
|
||||
if(qos)
|
||||
{
|
||||
if(rbusValue_GetType(qos) == RBUS_STRING)
|
||||
{
|
||||
ParodusPrint("qos type %d RBUS_STRING %d\n", rbusValue_GetType(qos), RBUS_STRING);
|
||||
qosVal = (char *)rbusValue_GetString(qos, NULL);
|
||||
ParodusPrint("qos received is %s\n", qosVal);
|
||||
if(qosVal !=NULL)
|
||||
{
|
||||
msg->u.event.qos = atoi(qosVal);
|
||||
ParodusPrint("msg->u.event.qos is %d\n", msg->u.event.qos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(trans_id !=NULL)
|
||||
{
|
||||
ParodusPrint("Add trans_id %s to wrp message\n", trans_id);
|
||||
msg->u.event.transaction_uuid = strdup(trans_id);
|
||||
free(trans_id);
|
||||
trans_id = NULL;
|
||||
ParodusPrint("msg->u.event.transaction_uuid is %s\n", msg->u.event.transaction_uuid);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("transaction_uuid is empty\n");
|
||||
}
|
||||
|
||||
*eventMsg = msg;
|
||||
ParodusPrint("parseRbusInparamsToWrp End\n");
|
||||
}
|
||||
|
||||
static rbusError_t sendDataHandler(rbusHandle_t handle, char const* methodName, rbusObject_t inParams, rbusObject_t outParams, rbusMethodAsyncHandle_t asyncHandle)
|
||||
{
|
||||
(void) handle;
|
||||
(void) outParams;
|
||||
int inStatus = 0;
|
||||
char *transaction_uuid = NULL;
|
||||
wrp_msg_t *wrpMsg= NULL;
|
||||
ParodusInfo("methodHandler called: %s\n", methodName);
|
||||
//printRBUSParams(inParams, INPARAMS_PATH);
|
||||
|
||||
if((methodName !=NULL) && (strcmp(methodName, XMIDT_SEND_METHOD) == 0))
|
||||
{
|
||||
inStatus = checkInputParameters(inParams);
|
||||
if(inStatus)
|
||||
{
|
||||
//generate transaction id to create outParams and send ack
|
||||
transaction_uuid = generate_transaction_uuid();
|
||||
ParodusInfo("xmidt transaction_uuid generated is %s\n", transaction_uuid);
|
||||
parseRbusInparamsToWrp(inParams, transaction_uuid, &wrpMsg);
|
||||
|
||||
//xmidt send producer
|
||||
addToXmidtUpstreamQ(wrpMsg, asyncHandle);
|
||||
ParodusInfo("sendDataHandler returned %d\n", RBUS_ERROR_ASYNC_RESPONSE);
|
||||
return RBUS_ERROR_ASYNC_RESPONSE;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusPrint("check method call received, ignoring input\n");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Method %s received is not supported\n", methodName);
|
||||
return RBUS_ERROR_BUS_ERROR;
|
||||
}
|
||||
ParodusPrint("send RBUS_ERROR_SUCCESS\n");
|
||||
return RBUS_ERROR_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int regXmidtSendDataMethod()
|
||||
{
|
||||
int rc = RBUS_ERROR_SUCCESS;
|
||||
rbusDataElement_t dataElements[1] = { { XMIDT_SEND_METHOD, RBUS_ELEMENT_TYPE_METHOD, { NULL, NULL, NULL, NULL, NULL, sendDataHandler } } };
|
||||
|
||||
rbusHandle_t rbus_handle = get_parodus_rbus_Handle();
|
||||
|
||||
ParodusPrint("Registering xmidt sendData method %s\n", XMIDT_SEND_METHOD);
|
||||
if(!rbus_handle)
|
||||
{
|
||||
ParodusError("regXmidtSendDataMethod failed as rbus_handle is empty\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
rc = rbus_regDataElements(rbus_handle, 1, dataElements);
|
||||
|
||||
if(rc != RBUS_ERROR_SUCCESS)
|
||||
{
|
||||
ParodusError("Register xmidt sendData method failed: %d\n", rc);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("Register xmidt sendData method %s success\n", XMIDT_SEND_METHOD);
|
||||
|
||||
//start xmidt queue consumer thread .
|
||||
processXmidtData();
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
//To print and store params output to a file
|
||||
void printRBUSParams(rbusObject_t params, char* file_path)
|
||||
{
|
||||
if( NULL != params )
|
||||
{
|
||||
FILE *fd = fopen(file_path, "w+");
|
||||
rbusObject_fwrite(params, 1, fd);
|
||||
fclose(fd);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Params is NULL\n");
|
||||
}
|
||||
}
|
||||
85
src/xmidtsend_rbus.h
Normal file
85
src/xmidtsend_rbus.h
Normal file
@@ -0,0 +1,85 @@
|
||||
/**
|
||||
* Copyright 2022 Comcast Cable Communications Management, LLC
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
/**
|
||||
* @file xmidtsend_rbus.h
|
||||
*
|
||||
* @description This header defines functions required to manage xmidt send messages via rbus.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef _XMIDTSEND_RBUS_H_
|
||||
#define _XMIDTSEND_RBUS_H_
|
||||
#include <rbus.h>
|
||||
#include <uuid/uuid.h>
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define XMIDT_SEND_METHOD "Device.X_RDK_Xmidt.SendData"
|
||||
#define MAX_QUEUE_SIZE 10
|
||||
#define INPARAMS_PATH "/tmp/inparams.txt"
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Data Structures */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
typedef struct XmidtMsg__
|
||||
{
|
||||
wrp_msg_t *msg;
|
||||
rbusMethodAsyncHandle_t asyncHandle;
|
||||
struct XmidtMsg__ *next;
|
||||
} XmidtMsg;
|
||||
|
||||
typedef enum
|
||||
{
|
||||
DELIVERED_SUCCESS = 0,
|
||||
INVALID_MSG_TYPE,
|
||||
MISSING_SOURCE,
|
||||
MISSING_DEST,
|
||||
MISSING_CONTENT_TYPE,
|
||||
MISSING_PAYLOAD,
|
||||
MISSING_PAYLOADLEN,
|
||||
QUEUE_SIZE_EXCEEDED,
|
||||
WRP_ENCODE_FAILURE,
|
||||
MSG_PROCESSING_FAILED,
|
||||
ENQUEUE_FAILURE = 100,
|
||||
CLIENT_DISCONNECT = 101
|
||||
} XMIDT_STATUS;
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Function Prototypes */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
rbusHandle_t get_parodus_rbus_Handle(void);
|
||||
void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle);
|
||||
void* processXmidtUpstreamMsg();
|
||||
void processXmidtData();
|
||||
int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle);
|
||||
void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle);
|
||||
int checkInputParameters(rbusObject_t inParams);
|
||||
char* generate_transaction_uuid();
|
||||
void parseRbusInparamsToWrp(rbusObject_t inParams, char *trans_id, wrp_msg_t **eventMsg);
|
||||
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode);
|
||||
int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode);
|
||||
void xmidtQDequeue();
|
||||
bool highQosValueCheck(int qos);
|
||||
void waitTillConnectionIsUp();
|
||||
void printRBUSParams(rbusObject_t params, char* file_path);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
#endif /* _XMIDTSEND_RBUS_H_ */
|
||||
|
||||
@@ -160,7 +160,7 @@ set(CLIST_SRC ${CLIST_SRC} ../src/seshat_interface_stub.c)
|
||||
endif (ENABLE_SESHAT)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
set(CLIST_SRC ${CLIST_SRC} ../src/upstream_rbus.c)
|
||||
set(CLIST_SRC ${CLIST_SRC} ../src/upstream_rbus.c ../src/xmidtsend_rbus.c)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
|
||||
add_executable(test_client_list ${CLIST_SRC})
|
||||
@@ -181,7 +181,7 @@ set(SVA_SRC ${SVA_SRC} ../src/seshat_interface_stub.c)
|
||||
endif (ENABLE_SESHAT)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
set(SVA_SRC ${SVA_SRC} ../src/upstream_rbus.c)
|
||||
set(SVA_SRC ${SVA_SRC} ../src/upstream_rbus.c ../src/xmidtsend_rbus.c)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
|
||||
add_executable(test_service_alive ${SVA_SRC})
|
||||
@@ -307,7 +307,7 @@ else()
|
||||
set(CONIFC_SRC ${CONIFC_SRC} ../src/seshat_interface_stub.c)
|
||||
endif (ENABLE_SESHAT)
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
set(CONIFC_SRC ${CONIFC_SRC} ../src/upstream_rbus.c)
|
||||
set(CONIFC_SRC ${CONIFC_SRC} ../src/upstream_rbus.c ../src/xmidtsend_rbus.c)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
add_executable(test_conn_interface ${CONIFC_SRC})
|
||||
target_link_libraries (test_conn_interface -lcmocka ${PARODUS_COMMON_LIBS} -lcurl -luuid )
|
||||
@@ -356,7 +356,7 @@ set(TOKEN_SRC test_token_stub.c ${TOKEN_SRC})
|
||||
endif (FEATURE_DNS_QUERY)
|
||||
|
||||
if (ENABLE_WEBCFGBIN)
|
||||
set(TOKEN_SRC ${TOKEN_SRC} ../src/upstream_rbus.c)
|
||||
set(TOKEN_SRC ${TOKEN_SRC} ../src/upstream_rbus.c ../src/xmidtsend_rbus.c)
|
||||
endif (ENABLE_WEBCFGBIN)
|
||||
|
||||
add_executable(test_token ${TOKEN_SRC} )
|
||||
|
||||
@@ -92,7 +92,7 @@ void test_set_parodus_cfg()
|
||||
CU_ASSERT_STRING_EQUAL(cfg.hw_last_reboot_reason,get_parodus_cfg()->hw_last_reboot_reason);
|
||||
CU_ASSERT_STRING_EQUAL(cfg.fw_name,get_parodus_cfg()->fw_name);
|
||||
CU_ASSERT_STRING_EQUAL(cfg.webpa_url, get_parodus_cfg()->webpa_url);
|
||||
CU_ASSERT_STRING_EQUAL(cfg.webpa_interface_used , get_parodus_cfg()->webpa_interface_used);
|
||||
CU_ASSERT_STRING_EQUAL(cfg.webpa_interface_used , getWebpaInterface());
|
||||
CU_ASSERT_STRING_EQUAL(cfg.webpa_protocol, get_parodus_cfg()->webpa_protocol);
|
||||
CU_ASSERT_EQUAL(cfg.boot_time, get_parodus_cfg()->boot_time);
|
||||
CU_ASSERT_EQUAL(cfg.webpa_ping_timeout, get_parodus_cfg()->webpa_ping_timeout);
|
||||
@@ -113,7 +113,7 @@ void test_getWebpaConveyHeader()
|
||||
CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->hw_manufacturer, cJSON_GetObjectItem(payload, HW_MANUFACTURER)->valuestring);
|
||||
CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->hw_last_reboot_reason, cJSON_GetObjectItem(payload, HW_LAST_REBOOT_REASON)->valuestring);
|
||||
CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->fw_name, cJSON_GetObjectItem(payload, FIRMWARE_NAME)->valuestring);
|
||||
CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->webpa_interface_used, cJSON_GetObjectItem(payload, WEBPA_INTERFACE)->valuestring);
|
||||
CU_ASSERT_STRING_EQUAL(getWebpaInterface(), cJSON_GetObjectItem(payload, WEBPA_INTERFACE)->valuestring);
|
||||
CU_ASSERT_STRING_EQUAL(get_parodus_cfg()->webpa_protocol, cJSON_GetObjectItem(payload, WEBPA_PROTOCOL)->valuestring);
|
||||
CU_ASSERT_EQUAL((int)get_parodus_cfg()->boot_time, cJSON_GetObjectItem(payload, BOOT_TIME)->valueint);
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include "../src/ParodusInternal.h"
|
||||
|
||||
extern int parse_mac_address (char *target, const char *arg);
|
||||
extern int parse_serial_num(char *target, const char *arg);
|
||||
extern int server_is_http (const char *full_url,
|
||||
const char **server_ptr);
|
||||
extern int parse_webpa_url__(const char *full_url,
|
||||
@@ -104,7 +103,7 @@ void test_setParodusConfig()
|
||||
assert_string_equal(cfg.hw_last_reboot_reason, temp->hw_last_reboot_reason);
|
||||
assert_string_equal(cfg.webpa_path_url, temp->webpa_path_url);
|
||||
assert_string_equal(cfg.webpa_url, temp->webpa_url);
|
||||
assert_string_equal(cfg.webpa_interface_used, temp->webpa_interface_used);
|
||||
assert_string_equal(cfg.webpa_interface_used, getWebpaInterface());
|
||||
assert_string_equal(cfg.webpa_protocol, temp->webpa_protocol);
|
||||
assert_string_equal(cfg.webpa_uuid, temp->webpa_uuid);
|
||||
assert_string_equal(cfg.partner_id, temp->partner_id);
|
||||
@@ -209,13 +208,11 @@ void test_parseCommandLine()
|
||||
|
||||
ParodusCfg parodusCfg;
|
||||
memset(&parodusCfg,0,sizeof(parodusCfg));
|
||||
|
||||
#ifdef FEATURE_DNS_QUERY
|
||||
write_key_to_file ("../../tests/jwt_key.tst", jwt_key);
|
||||
#endif
|
||||
create_token_script("/tmp/token.sh");
|
||||
assert_int_equal (parseCommandLine(argc,command,&parodusCfg), 0);
|
||||
|
||||
assert_string_equal( parodusCfg.hw_model, "TG1682");
|
||||
assert_string_equal( parodusCfg.hw_serial_number, "Fer23u948590");
|
||||
assert_string_equal( parodusCfg.hw_manufacturer, "ARRISGroup,Inc.");
|
||||
@@ -470,14 +467,6 @@ void test_parse_mac_address ()
|
||||
assert_int_equal (parse_mac_address (result, ""), -1);
|
||||
}
|
||||
|
||||
void test_parse_serial_num()
|
||||
{
|
||||
char result[14];
|
||||
assert_int_equal (parse_serial_num (result, "1234ABC00ab"), 0);
|
||||
assert_int_equal (parse_serial_num (result, "$@@"), 0);
|
||||
assert_int_equal (parse_serial_num (result, ""), 0);
|
||||
}
|
||||
|
||||
void test_server_is_http ()
|
||||
{
|
||||
const char *server_ptr;
|
||||
@@ -598,7 +587,6 @@ int main(void)
|
||||
cmocka_unit_test(err_loadParodusCfg),
|
||||
cmocka_unit_test(test_parse_num_arg),
|
||||
cmocka_unit_test(test_parse_mac_address),
|
||||
cmocka_unit_test(test_parse_serial_num),
|
||||
cmocka_unit_test(test_get_algo_mask),
|
||||
cmocka_unit_test(test_server_is_http),
|
||||
cmocka_unit_test(test_parse_webpa_url__),
|
||||
|
||||
@@ -103,6 +103,11 @@ void set_global_shutdown_reason(char *reason)
|
||||
UNUSED(reason);
|
||||
}
|
||||
|
||||
int getDeviceId(char **device_id, size_t *device_id_len)
|
||||
{
|
||||
UNUSED(device_id); UNUSED(device_id_len);
|
||||
return 0;
|
||||
}
|
||||
void start_conn_in_progress (unsigned long start_time)
|
||||
{
|
||||
UNUSED(start_time);
|
||||
@@ -178,10 +183,10 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds)
|
||||
return 0;
|
||||
}
|
||||
|
||||
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
|
||||
int sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
|
||||
{
|
||||
UNUSED(resp_bytes); UNUSED(resp_size);
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int nopoll_loop_wait(noPollCtx * ctx,long timeout)
|
||||
|
||||
@@ -341,7 +341,7 @@ void test_createConnection()
|
||||
|
||||
int ret = createNopollConnection(ctx);
|
||||
assert_int_equal(ret, nopoll_true);
|
||||
assert_string_equal(get_parodus_cfg()->cloud_status, CLOUD_STATUS_ONLINE);
|
||||
assert_string_equal(get_cloud_status(), CLOUD_STATUS_ONLINE);
|
||||
free(cfg);
|
||||
if (g_jwt_server_ip !=NULL)
|
||||
{
|
||||
|
||||
@@ -125,6 +125,10 @@ int processCrudRequest(wrp_msg_t *reqMsg, wrp_msg_t **responseMsg )
|
||||
return (int)mock();
|
||||
}
|
||||
|
||||
char* getWebpaInterface(void)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Tests */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
@@ -1530,7 +1530,7 @@ void test_retrieveObject_cloud_status()
|
||||
ret = retrieveObject(reqMsg, &respMsg);
|
||||
assert_int_equal (respMsg->u.crud.status, 200);
|
||||
assert_int_equal (ret, 0);
|
||||
assert_string_equal(get_parodus_cfg()->cloud_status, CLOUD_STATUS_ONLINE);
|
||||
assert_string_equal(get_cloud_status(), CLOUD_STATUS_ONLINE);
|
||||
assert_int_equal (respMsg->u.crud.payload_size, 25);
|
||||
|
||||
fp = fopen(cfg.crud_config_file, "r");
|
||||
|
||||
@@ -131,6 +131,11 @@ bool get_interface_down_event()
|
||||
return false;
|
||||
}
|
||||
|
||||
char *get_cloud_status(void)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Tests */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
@@ -44,10 +44,12 @@ extern size_t metaPackSize;
|
||||
extern UpStreamMsg *UpStreamMsgQ;
|
||||
int numLoops = 1;
|
||||
int deviceIDNull =0;
|
||||
char webpa_interface[64]={'\0'};
|
||||
wrp_msg_t *temp = NULL;
|
||||
extern pthread_mutex_t nano_mut;
|
||||
extern pthread_cond_t nano_con;
|
||||
static int crud_test = 0;
|
||||
pthread_mutex_t config_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Mocks */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -116,6 +118,27 @@ ParodusCfg *get_parodus_cfg(void)
|
||||
return &parodusCfg;
|
||||
}
|
||||
|
||||
char *getWebpaInterface(void)
|
||||
{
|
||||
ParodusCfg cfg;
|
||||
memset(&cfg,0,sizeof(cfg));
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
parStrncpy(cfg.webpa_interface_used , "wl0", sizeof(cfg.webpa_interface_used));
|
||||
#else
|
||||
parStrncpy(cfg.webpa_interface_used , "eth0", sizeof(cfg.webpa_interface_used));
|
||||
#endif
|
||||
set_parodus_cfg(&cfg);
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
ParodusPrint("WAN_FAILOVER_SUPPORTED mode \n");
|
||||
pthread_mutex_lock (&config_mut);
|
||||
parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface));
|
||||
pthread_mutex_unlock (&config_mut);
|
||||
#else
|
||||
parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface));
|
||||
#endif
|
||||
return webpa_interface;
|
||||
}
|
||||
|
||||
ssize_t wrp_pack_metadata( const data_t *packData, void **data )
|
||||
{
|
||||
(void) packData; (void) data;
|
||||
|
||||
@@ -46,10 +46,11 @@ extern size_t metaPackSize;
|
||||
extern UpStreamMsg *UpStreamMsgQ;
|
||||
int numLoops = 1;
|
||||
int deviceIDNull =0;
|
||||
char webpa_interface[64]={'\0'};
|
||||
wrp_msg_t *reg_msg = NULL;
|
||||
extern pthread_mutex_t nano_mut;
|
||||
extern pthread_cond_t nano_con;
|
||||
|
||||
pthread_mutex_t config_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Mocks */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -101,6 +102,27 @@ ParodusCfg *get_parodus_cfg(void)
|
||||
return &parodusCfg;
|
||||
}
|
||||
|
||||
char *getWebpaInterface(void)
|
||||
{
|
||||
ParodusCfg cfg;
|
||||
memset(&cfg,0,sizeof(cfg));
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
parStrncpy(cfg.webpa_interface_used , "wl0", sizeof(cfg.webpa_interface_used));
|
||||
#else
|
||||
parStrncpy(cfg.webpa_interface_used , "eth0", sizeof(cfg.webpa_interface_used));
|
||||
#endif
|
||||
set_parodus_cfg(&cfg);
|
||||
#ifdef WAN_FAILOVER_SUPPORTED
|
||||
ParodusPrint("WAN_FAILOVER_SUPPORTED mode \n");
|
||||
pthread_mutex_lock (&config_mut);
|
||||
parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface));
|
||||
pthread_mutex_unlock (&config_mut);
|
||||
#else
|
||||
parStrncpy(webpa_interface, get_parodus_cfg()->webpa_interface_used, sizeof(webpa_interface));
|
||||
#endif
|
||||
return webpa_interface;
|
||||
}
|
||||
|
||||
/*-------------------------------------------
|
||||
int nn_connect (int s, const char *addr)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user