mirror of
https://github.com/outbackdingo/parodus.git
synced 2026-01-27 18:20:04 +00:00
Compare commits
69 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
90d6d6d9d5 | ||
|
|
df67b95244 | ||
|
|
e6e37053d3 | ||
|
|
d5a9ff36f6 | ||
|
|
f4536b778e | ||
|
|
b65b3ab3b0 | ||
|
|
954080bbd1 | ||
|
|
c8a2c95ae4 | ||
|
|
59bba50bee | ||
|
|
3090e50b02 | ||
|
|
af4eeb9199 | ||
|
|
2dd392dac6 | ||
|
|
7eae3da3f2 | ||
|
|
6e5ecc87f9 | ||
|
|
893c1c7547 | ||
|
|
3e30328738 | ||
|
|
f7412f9710 | ||
|
|
2e11d7e3ba | ||
|
|
5ddac7dc47 | ||
|
|
79d8a84c2e | ||
|
|
6f7332d054 | ||
|
|
26d409d884 | ||
|
|
d3e758ccb4 | ||
|
|
4af1aaf418 | ||
|
|
8e00ec8e5e | ||
|
|
43f55f9a99 | ||
|
|
bf3ee65360 | ||
|
|
9ebe011cb0 | ||
|
|
2bfc0a3c32 | ||
|
|
37bb19eeaa | ||
|
|
2c4980cf9e | ||
|
|
2248f31658 | ||
|
|
b97643ec42 | ||
|
|
66b16aa58f | ||
|
|
d3a56df184 | ||
|
|
8b5346e69b | ||
|
|
2a28e5d3e8 | ||
|
|
33af3f3eec | ||
|
|
c9b34d3c3f | ||
|
|
4add1c07ca | ||
|
|
75c833e3a8 | ||
|
|
5359fc79cf | ||
|
|
623192e301 | ||
|
|
b6490ba5b3 | ||
|
|
20ccd81086 | ||
|
|
f9ac95642c | ||
|
|
ae5d027612 | ||
|
|
e139a2d6b6 | ||
|
|
f655d6382b | ||
|
|
267e0eb796 | ||
|
|
00251153a4 | ||
|
|
d328a56e1a | ||
|
|
1a169561e4 | ||
|
|
8019489b86 | ||
|
|
ee3e006908 | ||
|
|
797c9de17f | ||
|
|
0bf4d754e5 | ||
|
|
3d98b4a80b | ||
|
|
80a50dead6 | ||
|
|
dd69d5ec5d | ||
|
|
1b23de5a12 | ||
|
|
8ea342b537 | ||
|
|
124fc11232 | ||
|
|
9abb37a005 | ||
|
|
c0edaaf31c | ||
|
|
32138613bb | ||
|
|
4cb167e8dd | ||
|
|
454ec26065 | ||
|
|
47af275929 |
6
.github/workflows/push.yml
vendored
6
.github/workflows/push.yml
vendored
@@ -39,9 +39,9 @@ jobs:
|
||||
run: mkdir build
|
||||
|
||||
- name: Get Sonarcloud Binaries
|
||||
working-directory: build
|
||||
run: |
|
||||
../.github/scripts/get_sonarcloud.sh
|
||||
uses: xmidt-org/sonarcloud-installer-action@v1
|
||||
with:
|
||||
working-directory: build
|
||||
|
||||
- name: CMake
|
||||
working-directory: build
|
||||
|
||||
@@ -173,7 +173,7 @@ ExternalProject_Add(wrp-c
|
||||
DEPENDS trower-base64 msgpack cimplog
|
||||
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/wrp-c
|
||||
GIT_REPOSITORY https://github.com/xmidt-org/wrp-c.git
|
||||
GIT_TAG "71f8a39fe39f98da007ed4cdabbb192be1da1685"
|
||||
GIT_TAG "b5ef4d10cb39905908788bc89ab3e4dab201db8a"
|
||||
CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR}
|
||||
-DMSGPACK_ENABLE_CXX=OFF
|
||||
-DMSGPACK_BUILD_EXAMPLES=OFF
|
||||
|
||||
@@ -188,7 +188,9 @@ pthread_mutex_t *get_interface_down_mut();
|
||||
pthread_cond_t *get_global_cloud_status_cond(void);
|
||||
|
||||
pthread_mutex_t *get_global_cloud_status_mut(void);
|
||||
|
||||
|
||||
int cloud_status_is_online (void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
55
src/config.c
55
src/config.c
@@ -208,6 +208,38 @@ int parse_mac_address (char *target, const char *arg)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int parse_serial_num(char *target, const char *arg)
|
||||
{
|
||||
char ch;
|
||||
if(arg != NULL)
|
||||
{
|
||||
if(strlen(arg) == 0)
|
||||
{
|
||||
ParodusError("Empty serial number, setting to default unknown\n");
|
||||
strcpy(target,"unknown");
|
||||
}
|
||||
for(int i=0; (ch = arg[i]) != '\0'; i++)
|
||||
{
|
||||
// check if character is ascii, a-z --> 97 to 122, A-Z --> 65 to 90, digits(0 to 9) --> 48 to 57
|
||||
if((ch >= 97 && ch <= 122) || (ch >= 65 && ch <= 90) || (ch >=48 && ch <= 57))
|
||||
{
|
||||
target[i] = ch;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Invalid serial number, setting to default unknown\n");
|
||||
strcpy(target,"unknown");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("serial number argument is NULL\n");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int server_is_http (const char *full_url,
|
||||
const char **server_ptr)
|
||||
{
|
||||
@@ -400,6 +432,9 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg)
|
||||
{"webpa-backoff-max", required_argument, 0, 'o'},
|
||||
{"webpa-interface-used", required_argument, 0, 'i'},
|
||||
{"parodus-local-url", required_argument, 0, 'l'},
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
{"max-queue-size", required_argument, 0, 'q'},
|
||||
#endif
|
||||
{"partner-id", required_argument, 0, 'p'},
|
||||
#ifdef ENABLE_SESHAT
|
||||
{"seshat-url", required_argument, 0, 'e'},
|
||||
@@ -447,7 +482,7 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg)
|
||||
/* getopt_long stores the option index here. */
|
||||
int option_index = 0;
|
||||
c = getopt_long (argc, argv,
|
||||
"m:s:f:d:r:n:b:u:t:o:i:l:p:e:D:j:a:k:c:T:w:J:46:C:S:R:K:M",
|
||||
"m:s:f:d:r:n:b:u:t:o:i:l:q:p:e:D:j:a:k:c:T:w:J:46:C:S:R:K:M",
|
||||
long_options, &option_index);
|
||||
|
||||
/* Detect the end of the options. */
|
||||
@@ -462,8 +497,8 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg)
|
||||
break;
|
||||
|
||||
case 's':
|
||||
parStrncpy(cfg->hw_serial_number,optarg,sizeof(cfg->hw_serial_number));
|
||||
ParodusInfo("hw_serial_number is %s\n",cfg->hw_serial_number);
|
||||
if(parse_serial_num(cfg->hw_serial_number, optarg) == 0)
|
||||
ParodusInfo ("hw_serial-number is %s\n",cfg->hw_serial_number);
|
||||
break;
|
||||
|
||||
case 'f':
|
||||
@@ -533,6 +568,16 @@ int parseCommandLine(int argc,char **argv,ParodusCfg * cfg)
|
||||
parStrncpy(cfg->local_url, optarg,sizeof(cfg->local_url));
|
||||
ParodusInfo("parodus local_url is %s\n",cfg->local_url);
|
||||
break;
|
||||
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
case 'q':
|
||||
cfg->max_queue_size = parse_num_arg (optarg, "max-queue-size");
|
||||
if (cfg->max_queue_size == (unsigned int) -1)
|
||||
return -1;
|
||||
ParodusInfo("max_queue_size is %d\n",cfg->max_queue_size);
|
||||
break;
|
||||
#endif
|
||||
|
||||
case 'D':
|
||||
// like 'fabric' or 'test'
|
||||
// this parameter is used, along with the hw_mac parameter
|
||||
@@ -837,7 +882,9 @@ void loadParodusCfg(ParodusCfg * config,ParodusCfg *cfg)
|
||||
parStrncpy(cfg->cert_path, "\0", sizeof(cfg->cert_path));
|
||||
ParodusPrint("cert_path is NULL. set to empty\n");
|
||||
}
|
||||
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
cfg->max_queue_size = config->max_queue_size;
|
||||
#endif
|
||||
cfg->boot_time = config->boot_time;
|
||||
cfg->webpa_ping_timeout = config->webpa_ping_timeout;
|
||||
cfg->webpa_backoff_max = config->webpa_backoff_max;
|
||||
|
||||
@@ -88,6 +88,9 @@ typedef struct
|
||||
char webpa_uuid[64];
|
||||
unsigned int flags;
|
||||
char local_url[124];
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
unsigned int max_queue_size;
|
||||
#endif
|
||||
char partner_id[64];
|
||||
#ifdef ENABLE_SESHAT
|
||||
char seshat_url[128];
|
||||
|
||||
@@ -239,7 +239,7 @@ void init_backoff_timer (backoff_timer_t *timer, int max_count)
|
||||
timer->count = 1;
|
||||
timer->max_count = max_count;
|
||||
timer->delay = 1;
|
||||
clock_gettime (CLOCK_REALTIME, &timer->ts);
|
||||
clock_gettime (CLOCK_MONOTONIC, &timer->ts);
|
||||
timer->start_time = time(NULL);
|
||||
}
|
||||
|
||||
@@ -325,8 +325,14 @@ static int backoff_delay (backoff_timer_t *timer)
|
||||
struct timespec ts;
|
||||
int rtn;
|
||||
|
||||
pthread_condattr_t backoff_delay_cond_attr;
|
||||
|
||||
pthread_condattr_init (&backoff_delay_cond_attr);
|
||||
pthread_condattr_setclock (&backoff_delay_cond_attr, CLOCK_MONOTONIC);
|
||||
pthread_cond_init (&backoff_delay_con, &backoff_delay_cond_attr);
|
||||
|
||||
// periodically update the health file.
|
||||
clock_gettime (CLOCK_REALTIME, &ts);
|
||||
clock_gettime (CLOCK_MONOTONIC, &ts);
|
||||
if ((ts.tv_sec - timer->ts.tv_sec) >= UPDATE_HEALTH_FILE_INTERVAL_SECS) {
|
||||
start_conn_in_progress (timer->start_time);
|
||||
timer->ts.tv_sec += UPDATE_HEALTH_FILE_INTERVAL_SECS;
|
||||
@@ -339,6 +345,8 @@ static int backoff_delay (backoff_timer_t *timer)
|
||||
rtn = pthread_cond_timedwait (&backoff_delay_con, &backoff_delay_mut, &ts);
|
||||
pthread_mutex_unlock (&backoff_delay_mut);
|
||||
|
||||
pthread_condattr_destroy(&backoff_delay_cond_attr);
|
||||
|
||||
if (g_shutdown)
|
||||
return BACKOFF_SHUTDOWN;
|
||||
if ((rtn != 0) && (rtn != ETIMEDOUT)) {
|
||||
@@ -775,8 +783,7 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
if(conn_ctx.current_server->allow_insecure <= 0)
|
||||
if(conn_ctx.current_server != NULL && conn_ctx.current_server->allow_insecure <= 0)
|
||||
{
|
||||
ParodusInfo("Connected to server over SSL\n");
|
||||
OnboardLog("Connected to server over SSL\n");
|
||||
|
||||
@@ -27,6 +27,9 @@
|
||||
#include "partners_check.h"
|
||||
#include "ParodusInternal.h"
|
||||
#include "crud_interface.h"
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
#include "xmidtsend_rbus.h"
|
||||
#endif
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Function Prototypes */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -91,6 +94,7 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
{
|
||||
ParodusPrint("numOfClients registered is %d\n", get_numOfClients());
|
||||
int ret = validate_partner_id(message, NULL);
|
||||
ParodusPrint("validate_partner_id returns %d\n", ret);
|
||||
if(ret < 0)
|
||||
{
|
||||
response = cJSON_CreateObject();
|
||||
@@ -98,7 +102,6 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
cJSON_AddStringToObject(response, "message", "Invalid partner_id");
|
||||
}
|
||||
|
||||
|
||||
destVal = strdup(((WRP_MSG_TYPE__EVENT == msgType) ? message->u.event.dest :
|
||||
((WRP_MSG_TYPE__REQ == msgType) ? message->u.req.dest : message->u.crud.dest)));
|
||||
|
||||
@@ -120,10 +123,10 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
}
|
||||
ParodusInfo("Received downstream dest as :%s and transaction_uuid :%s\n", dest,
|
||||
((WRP_MSG_TYPE__REQ == msgType) ? message->u.req.transaction_uuid :
|
||||
((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid)));
|
||||
((WRP_MSG_TYPE__EVENT == msgType) ? message->u.event.transaction_uuid : message->u.crud.transaction_uuid)));
|
||||
OnboardLog("%s\n",
|
||||
((WRP_MSG_TYPE__REQ == msgType) ? message->u.req.transaction_uuid :
|
||||
((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid)));
|
||||
((WRP_MSG_TYPE__EVENT == msgType) ? message->u.event.transaction_uuid : message->u.crud.transaction_uuid)));
|
||||
|
||||
free(destVal);
|
||||
|
||||
@@ -168,13 +171,16 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
destFlag =1;
|
||||
}
|
||||
//if any unknown dest received sending error response to server
|
||||
if(destFlag ==0)
|
||||
{
|
||||
ParodusError("Unknown dest:%s\n", dest);
|
||||
response = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(response, "statusCode", 531);
|
||||
cJSON_AddStringToObject(response, "message", "Service Unavailable");
|
||||
}
|
||||
if (WRP_MSG_TYPE__EVENT != msgType)
|
||||
{
|
||||
if(destFlag ==0)
|
||||
{
|
||||
ParodusError("Unknown dest:%s\n", dest);
|
||||
response = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(response, "statusCode", 531);
|
||||
cJSON_AddStringToObject(response, "message", "Service Unavailable");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if( (WRP_MSG_TYPE__EVENT != msgType) &&
|
||||
@@ -235,6 +241,37 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
}
|
||||
free(resp_msg);
|
||||
}
|
||||
#ifdef ENABLE_WEBCFGBIN
|
||||
//To handle cloud ack events received from server for the xmidt sent messages.
|
||||
if((WRP_MSG_TYPE__EVENT == msgType) && (ret >= 0))
|
||||
{
|
||||
if(get_parodus_cfg()->max_queue_size > 0)
|
||||
{
|
||||
//Process cloud ack only when qos > 24
|
||||
if(highQosValueCheck(message->u.event.qos))
|
||||
{
|
||||
if(message->u.event.transaction_uuid !=NULL)
|
||||
{
|
||||
ParodusInfo("Received cloud ack from server: transaction_uuid %s qos %d, rdr %d source %s\n", message->u.event.transaction_uuid, message->u.event.qos, message->u.event.rdr, message->u.event.source);
|
||||
addToCloudAckQ(message->u.event.transaction_uuid, message->u.event.qos, message->u.event.rdr, message->u.event.source);
|
||||
ParodusPrint("Added to cloud ack Q\n");
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("cloud ack transaction id is NULL\n");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("cloud ack received with low qos %d, ignoring it\n", message->u.event.qos);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("cloud ack is ignored as max queue size is %d\n", get_parodus_cfg()->max_queue_size );
|
||||
}
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
@@ -22,12 +22,15 @@
|
||||
*/
|
||||
|
||||
#include "heartBeat.h"
|
||||
#include "time.h"
|
||||
#include <stdbool.h>
|
||||
|
||||
volatile unsigned int heartBeatTimer = 0;
|
||||
volatile bool paused = false;
|
||||
volatile long long pingTimeStamp = 0;
|
||||
|
||||
pthread_mutex_t heartBeat_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_mutex_t ping_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
// Get value of heartBeatTimer
|
||||
unsigned int get_heartBeatTimer()
|
||||
@@ -73,5 +76,22 @@ void resume_heartBeatTimer()
|
||||
pthread_mutex_unlock (&heartBeat_mut);
|
||||
}
|
||||
|
||||
// Set ping received timeStamp
|
||||
void set_pingTimeStamp()
|
||||
{
|
||||
struct timespec ts;
|
||||
getCurrentTime(&ts);
|
||||
pthread_mutex_lock (&ping_mut);
|
||||
pingTimeStamp = (long long)ts.tv_sec;
|
||||
pthread_mutex_unlock (&ping_mut);
|
||||
}
|
||||
|
||||
|
||||
// Get ping received timeStamp
|
||||
long long get_pingTimeStamp()
|
||||
{
|
||||
long long tmp = 0;
|
||||
pthread_mutex_lock (&ping_mut);
|
||||
tmp = pingTimeStamp;
|
||||
pthread_mutex_unlock (&ping_mut);
|
||||
return tmp;
|
||||
}
|
||||
|
||||
@@ -45,6 +45,10 @@ void pause_heartBeatTimer();
|
||||
// Resume heartBeatTimer, i.e. resume incrementing
|
||||
void resume_heartBeatTimer();
|
||||
|
||||
void set_pingTimeStamp();
|
||||
|
||||
long long get_pingTimeStamp();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -149,6 +149,7 @@ void listenerOnPingMessage (noPollCtx * ctx, noPollConn * conn, noPollMsg * msg,
|
||||
if (nopoll_msg_opcode(msg) == NOPOLL_PING_FRAME)
|
||||
{
|
||||
reset_heartBeatTimer();
|
||||
set_pingTimeStamp();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,7 @@ void setMessageHandlers()
|
||||
nopoll_conn_set_on_close(get_global_conn(), (noPollOnCloseHandler)listenerOnCloseMessage, NULL);
|
||||
}
|
||||
|
||||
static int cloud_status_is_online (void)
|
||||
int cloud_status_is_online (void)
|
||||
{
|
||||
const char *status = get_cloud_status();
|
||||
if (NULL == status)
|
||||
|
||||
@@ -114,7 +114,7 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds)
|
||||
}
|
||||
|
||||
/* Commandline input partner_ids not matching with partner_ids from request, appending to request partner_ids*/
|
||||
if(matchFlag != 1)
|
||||
if(matchFlag != 1 && partnerIds !=NULL)
|
||||
{
|
||||
(*partnerIds) = (partners_t *) malloc(sizeof(partners_t) + (sizeof(char *) * (count+partnersList->count)));
|
||||
(*partnerIds)->count = count+partnersList->count;
|
||||
@@ -132,6 +132,23 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds)
|
||||
i++;
|
||||
}
|
||||
}
|
||||
else if (matchFlag != 1 && partnerIds == NULL)
|
||||
{
|
||||
ParodusError("partner_id match not found: command line input %s , msg partner_id %s\n", temp, msg->u.event.partner_ids->partner_ids[0]);
|
||||
if(partnersList != NULL)
|
||||
{
|
||||
for(j=0; j<partnersList->count; j++)
|
||||
{
|
||||
if(NULL != partnersList->partner_ids[j])
|
||||
{
|
||||
free(partnersList->partner_ids[j]);
|
||||
}
|
||||
}
|
||||
free(partnersList);
|
||||
}
|
||||
free(partnerId);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
#include "time.h"
|
||||
#include "parodus_log.h"
|
||||
#include <errno.h>
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
@@ -24,7 +25,10 @@
|
||||
|
||||
void getCurrentTime(struct timespec *timer)
|
||||
{
|
||||
clock_gettime(CLOCK_REALTIME, timer);
|
||||
if( clock_gettime(CLOCK_REALTIME, timer) == -1 )
|
||||
{
|
||||
ParodusError("clock gettime returns errno %d\n", errno );
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t getCurrentTimeInMicroSeconds(struct timespec *timer)
|
||||
|
||||
@@ -174,11 +174,15 @@ void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rb
|
||||
rbusValue_t newValue = rbusObject_GetValue(event->data, "value");
|
||||
rbusValue_t oldValue = rbusObject_GetValue(event->data, "oldValue");
|
||||
ParodusInfo("Consumer received ValueChange event for param %s\n", event->name);
|
||||
|
||||
|
||||
if(newValue) {
|
||||
interface = (char *) rbusValue_GetString(newValue, NULL);
|
||||
setWebpaInterface(interface);
|
||||
}
|
||||
}
|
||||
else {
|
||||
ParodusError("newValue is NULL\n");
|
||||
}
|
||||
|
||||
if(newValue !=NULL && oldValue!=NULL && interface!=NULL) {
|
||||
ParodusInfo("New Value: %s Old Value: %s New Interface Value: %s\n", rbusValue_GetString(newValue, NULL), rbusValue_GetString(oldValue, NULL), interface);
|
||||
|
||||
@@ -195,5 +199,13 @@ void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rb
|
||||
set_close_retry();
|
||||
|
||||
}
|
||||
else {
|
||||
if(oldValue == NULL) {
|
||||
ParodusError("oldValue is NULL\n");
|
||||
}
|
||||
if(interface == NULL) {
|
||||
ParodusError("interface is NULL\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
1069
src/xmidtsend_rbus.c
1069
src/xmidtsend_rbus.c
File diff suppressed because it is too large
Load Diff
@@ -24,14 +24,22 @@
|
||||
#ifndef _XMIDTSEND_RBUS_H_
|
||||
#define _XMIDTSEND_RBUS_H_
|
||||
#include <rbus.h>
|
||||
#include "config.h"
|
||||
#include <uuid/uuid.h>
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define XMIDT_SEND_METHOD "Device.X_RDK_Xmidt.SendData"
|
||||
#define MAX_QUEUE_SIZE 10
|
||||
#define INPARAMS_PATH "/tmp/inparams.txt"
|
||||
|
||||
#define CLOUD_ACK_TIMEOUT_SEC 7
|
||||
#define CRITICAL_QOS_EXPIRE_TIME 30*60
|
||||
#define HIGH_QOS_EXPIRE_TIME 25*60
|
||||
#define MEDIUM_QOS_EXPIRE_TIME 20*60
|
||||
#define LOW_QOS_EXPIRE_TIME 15*60
|
||||
|
||||
#define EXPIRY_CHECK_TIME 5*60 //To check expiry in every 5 mins when cloud connection is down.
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Data Structures */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -39,9 +47,21 @@ typedef struct XmidtMsg__
|
||||
{
|
||||
wrp_msg_t *msg;
|
||||
rbusMethodAsyncHandle_t asyncHandle;
|
||||
int state;
|
||||
long long enqueueTime;
|
||||
long long sentTime;
|
||||
struct XmidtMsg__ *next;
|
||||
} XmidtMsg;
|
||||
|
||||
typedef struct CloudAck__
|
||||
{
|
||||
char *transaction_id;
|
||||
int qos;
|
||||
int rdr;
|
||||
char *source;
|
||||
struct CloudAck__ *next;
|
||||
} CloudAck;
|
||||
|
||||
typedef enum
|
||||
{
|
||||
DELIVERED_SUCCESS = 0,
|
||||
@@ -56,8 +76,18 @@ typedef enum
|
||||
CLIENT_DISCONNECT = 101,
|
||||
QUEUE_SIZE_EXCEEDED = 102,
|
||||
WRP_ENCODE_FAILURE = 103,
|
||||
MSG_PROCESSING_FAILED = 104
|
||||
MSG_PROCESSING_FAILED = 104,
|
||||
QOS_SEMANTICS_DISABLED = 105,
|
||||
MSG_EXPIRED = 106,
|
||||
QUEUE_OPTIMIZED = 107
|
||||
} XMIDT_STATUS;
|
||||
|
||||
typedef enum
|
||||
{
|
||||
PENDING = 0,
|
||||
SENT,
|
||||
DELETE
|
||||
} MSG_STATUS;
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Function Prototypes */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -66,17 +96,28 @@ rbusHandle_t get_parodus_rbus_Handle(void);
|
||||
void addToXmidtUpstreamQ(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle);
|
||||
void* processXmidtUpstreamMsg();
|
||||
void processXmidtData();
|
||||
int processData(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle);
|
||||
void sendXmidtEventToServer(wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle);
|
||||
int processData(XmidtMsg *Datanode, wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle);
|
||||
int sendXmidtEventToServer(XmidtMsg *msgnode, wrp_msg_t * msg, rbusMethodAsyncHandle_t asyncHandle);
|
||||
int checkInputParameters(rbusObject_t inParams);
|
||||
char* generate_transaction_uuid();
|
||||
void parseRbusInparamsToWrp(rbusObject_t inParams, char *trans_id, wrp_msg_t **eventMsg);
|
||||
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, rbusError_t error);
|
||||
void createOutParamsandSendAck(wrp_msg_t *msg, rbusMethodAsyncHandle_t asyncHandle, char *errorMsg, int statuscode, char *cloudsource, rbusError_t error);
|
||||
int validateXmidtData(wrp_msg_t * eventMsg, char **errorMsg, int *statusCode);
|
||||
void xmidtQDequeue();
|
||||
void printSendMsgData(char* status, int qos, char* dest, char* transaction_uuid);
|
||||
bool highQosValueCheck(int qos);
|
||||
void waitTillConnectionIsUp();
|
||||
void printRBUSParams(rbusObject_t params, char* file_path);
|
||||
void addToCloudAckQ(char *transaction_id, int qos, int rdr, char *source);
|
||||
int checkCloudACK(XmidtMsg *xmdnode, rbusMethodAsyncHandle_t asyncHandle);
|
||||
int updateXmidtState(XmidtMsg * temp, int state);
|
||||
void print_xmidMsg_list();
|
||||
int deleteCloudACKNode(char* trans_id);
|
||||
int deleteFromXmidtQ(XmidtMsg **next_node);
|
||||
int checkCloudConn();
|
||||
void checkMaxQandOptimize();
|
||||
void checkMsgExpiry();
|
||||
void mapXmidtStatusToStatusMessage(int status, char **message);
|
||||
int xmidtQOptmize();
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -46,7 +46,7 @@ link_directories ( ${LIBRARY_DIR} )
|
||||
# test_heartBeatTimer
|
||||
#-------------------------------------------------------------------------------
|
||||
add_test(NAME test_heartBeatTimer COMMAND ${MEMORY_CHECK} ./test_heartBeatTimer)
|
||||
add_executable(test_heartBeatTimer test_heartBeatTimer.c ../src/heartBeat.c)
|
||||
add_executable(test_heartBeatTimer test_heartBeatTimer.c ../src/heartBeat.c ../src/time.c)
|
||||
target_link_libraries (test_heartBeatTimer ${PARODUS_COMMON_LIBS} -lcmocka)
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
@@ -109,7 +109,7 @@ target_link_libraries (test_spin_thread_e ${PARODUS_COMMON_LIBS} )
|
||||
# test_nopoll_handlers
|
||||
#-------------------------------------------------------------------------------
|
||||
add_test(NAME test_nopoll_handlers COMMAND ${MEMORY_CHECK} ./test_nopoll_handlers)
|
||||
add_executable(test_nopoll_handlers test_nopoll_handlers.c ../src/nopoll_handlers.c ../src/heartBeat.c ../src/close_retry.c)
|
||||
add_executable(test_nopoll_handlers test_nopoll_handlers.c ../src/nopoll_handlers.c ../src/heartBeat.c ../src/close_retry.c ../src/time.c)
|
||||
target_link_libraries (test_nopoll_handlers -lnopoll -lcunit -lcimplog -Wl,--no-as-needed -lrt -lpthread -lm)
|
||||
|
||||
|
||||
@@ -117,7 +117,7 @@ target_link_libraries (test_nopoll_handlers -lnopoll -lcunit -lcimplog -Wl,--no-
|
||||
# test_nopoll_handlers_fragment
|
||||
#-------------------------------------------------------------------------------
|
||||
add_test(NAME test_nopoll_handlers_fragment COMMAND ${MEMORY_CHECK} ./test_nopoll_handlers_fragment)
|
||||
add_executable(test_nopoll_handlers_fragment test_nopoll_handlers_fragment.c ../src/nopoll_handlers.c ../src/heartBeat.c ../src/close_retry.c)
|
||||
add_executable(test_nopoll_handlers_fragment test_nopoll_handlers_fragment.c ../src/nopoll_handlers.c ../src/heartBeat.c ../src/close_retry.c ../src/time.c)
|
||||
target_link_libraries (test_nopoll_handlers_fragment -lnopoll -lcunit -lcimplog -Wl,--no-as-needed -lrt -lpthread -lm -lcmocka)
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
@@ -212,7 +212,7 @@ target_link_libraries (test_auth_token -lcmocka
|
||||
# test_auth_token_more
|
||||
#-------------------------------------------------------------------------------
|
||||
add_test(NAME test_auth_token_more COMMAND ${MEMORY_CHECK} ./test_auth_token_more)
|
||||
add_executable(test_auth_token_more test_auth_token_more.c ../src/config.c ../src/auth_token.c ../src/string_helpers.c)
|
||||
add_executable(test_auth_token_more test_auth_token_more.c ../src/config.c ../src/auth_token.c ../src/string_helpers.c ../src/config.c)
|
||||
target_link_libraries (test_auth_token_more -lcmocka
|
||||
-Wl,--no-as-needed -lcimplog
|
||||
-lcjson -lcjwt -ltrower-base64 -lssl -lcrypto -lrt -lm -lcurl -luuid
|
||||
@@ -265,7 +265,7 @@ target_link_libraries (test_upstream_sock -lcmocka gcov -lcunit -lcimplog
|
||||
# test_downstream
|
||||
#-------------------------------------------------------------------------------
|
||||
add_test(NAME test_downstream COMMAND ${MEMORY_CHECK} ./test_downstream)
|
||||
add_executable(test_downstream test_downstream.c ../src/downstream.c ../src/string_helpers.c)
|
||||
add_executable(test_downstream test_downstream.c ../src/downstream.c ../src/string_helpers.c ../src/config.c)
|
||||
target_link_libraries (test_downstream -lcmocka gcov -lcunit -lcimplog
|
||||
-lwrp-c -luuid -lpthread -lmsgpackc -lnopoll
|
||||
-Wl,--no-as-needed -lcjson -lcjwt -ltrower-base64
|
||||
@@ -275,7 +275,7 @@ target_link_libraries (test_downstream -lcmocka gcov -lcunit -lcimplog
|
||||
# test_downstream_more
|
||||
#-------------------------------------------------------------------------------
|
||||
add_test(NAME test_downstream_more COMMAND ${MEMORY_CHECK} ./test_downstream_more)
|
||||
add_executable(test_downstream_more test_downstream_more.c ../src/downstream.c ../src/string_helpers.c)
|
||||
add_executable(test_downstream_more test_downstream_more.c ../src/downstream.c ../src/string_helpers.c ../src/config.c)
|
||||
target_link_libraries (test_downstream_more -lcmocka ${PARODUS_COMMON_LIBS} )
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
@@ -296,6 +296,7 @@ set(CONIFC_SRC test_conn_interface.c
|
||||
../src/token.c
|
||||
../src/string_helpers.c
|
||||
../src/mutex.c
|
||||
../src/time.c
|
||||
../src/heartBeat.c
|
||||
../src/close_retry.c
|
||||
../src/event_handler.c
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "../src/ParodusInternal.h"
|
||||
|
||||
extern int parse_mac_address (char *target, const char *arg);
|
||||
extern int parse_serial_num(char *target, const char *arg);
|
||||
extern int server_is_http (const char *full_url,
|
||||
const char **server_ptr);
|
||||
extern int parse_webpa_url__(const char *full_url,
|
||||
@@ -467,6 +468,14 @@ void test_parse_mac_address ()
|
||||
assert_int_equal (parse_mac_address (result, ""), -1);
|
||||
}
|
||||
|
||||
void test_parse_serial_num()
|
||||
{
|
||||
char result[14];
|
||||
assert_int_equal (parse_serial_num (result, "1234ABC00ab"), 0);
|
||||
assert_int_equal (parse_serial_num (result, "$@@"), 0);
|
||||
assert_int_equal (parse_serial_num (result, ""), 0);
|
||||
}
|
||||
|
||||
void test_server_is_http ()
|
||||
{
|
||||
const char *server_ptr;
|
||||
@@ -587,6 +596,7 @@ int main(void)
|
||||
cmocka_unit_test(err_loadParodusCfg),
|
||||
cmocka_unit_test(test_parse_num_arg),
|
||||
cmocka_unit_test(test_parse_mac_address),
|
||||
cmocka_unit_test(test_parse_serial_num),
|
||||
cmocka_unit_test(test_get_algo_mask),
|
||||
cmocka_unit_test(test_server_is_http),
|
||||
cmocka_unit_test(test_parse_webpa_url__),
|
||||
|
||||
@@ -71,7 +71,10 @@ void nopoll_log_set_handler (noPollCtx *ctx, noPollLogHandler handler, noPollPtr
|
||||
UNUSED(ctx); UNUSED(handler); UNUSED(user_data);
|
||||
function_called();
|
||||
}
|
||||
|
||||
int cloud_status_is_online (void)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
void __report_log (noPollCtx * ctx, noPollDebugLevel level, const char * log_msg, noPollPtr user_data)
|
||||
{
|
||||
UNUSED(ctx); UNUSED(level); UNUSED(log_msg); UNUSED(user_data);
|
||||
@@ -144,6 +147,10 @@ void packMetaData()
|
||||
function_called();
|
||||
}
|
||||
|
||||
int get_parodus_init()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
int get_cloud_disconnect_time(void)
|
||||
{
|
||||
|
||||
@@ -64,7 +64,18 @@ reg_list_item_t * get_global_node(void)
|
||||
void release_global_node (void)
|
||||
{
|
||||
}
|
||||
|
||||
void addToCloudAckQ(char *transaction_id, int qos, int rdr)
|
||||
{
|
||||
(void)transaction_id;
|
||||
(void)qos;
|
||||
(void)rdr;
|
||||
return;
|
||||
}
|
||||
bool highQosValueCheck(int qos)
|
||||
{
|
||||
(void)qos;
|
||||
return false;
|
||||
}
|
||||
ssize_t wrp_to_struct( const void *bytes, const size_t length,
|
||||
const enum wrp_format fmt, wrp_msg_t **msg )
|
||||
{
|
||||
|
||||
@@ -192,6 +192,20 @@ int validate_partner_id(wrp_msg_t *msg, partners_t **partnerIds)
|
||||
return 1;
|
||||
}
|
||||
|
||||
void addToCloudAckQ(char *transaction_id, int qos, int rdr)
|
||||
{
|
||||
(void)transaction_id;
|
||||
(void)qos;
|
||||
(void)rdr;
|
||||
return;
|
||||
}
|
||||
|
||||
bool highQosValueCheck(int qos)
|
||||
{
|
||||
(void)qos;
|
||||
return false;
|
||||
}
|
||||
|
||||
ssize_t wrp_to_struct( const void *bytes, const size_t length, const enum wrp_format fmt,
|
||||
wrp_msg_t **msg )
|
||||
{
|
||||
|
||||
@@ -79,7 +79,6 @@ void test_mutexHeartBeatTimer() {
|
||||
ParodusInfo("heartBeatTimer reset to: %d\n", heartBeatTimer);
|
||||
assert_int_equal(heartBeatTimer, 0);
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
@@ -68,7 +68,6 @@ nopoll_bool nopoll_msg_is_final(noPollMsg *msg)
|
||||
function_called();
|
||||
return (nopoll_bool) mock();
|
||||
}
|
||||
|
||||
const unsigned char *nopoll_msg_get_payload(noPollMsg *msg)
|
||||
{
|
||||
(void)msg;
|
||||
|
||||
Reference in New Issue
Block a user