mirror of
https://github.com/outbackdingo/parodus.git
synced 2026-01-27 18:20:04 +00:00
Compare commits
1 Commits
3.4_p3xb3
...
3.3_p17xb3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b134a646e5 |
@@ -26,7 +26,7 @@ install:
|
||||
script:
|
||||
- mkdir build
|
||||
- cd build
|
||||
- cmake .. -DINTEGRATION_TESTING:BOOL=false -DDISABLE_VALGRIND:BOOL=false -DENABLE_SESHAT:BOOL=true -DFEATURE_DNS_QUERY:BOOL=true
|
||||
- cmake .. -DINTEGRATION_TESTING:BOOL=false -DDISABLE_VALGRIND:BOOL=true -DENABLE_SESHAT:BOOL=true -DFEATURE_DNS_QUERY:BOOL=true
|
||||
- make
|
||||
- export ARGS="-V"
|
||||
- make test
|
||||
|
||||
16
CHANGELOG.md
16
CHANGELOG.md
@@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file.
|
||||
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
||||
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [1.0.2] - 2019-02-08
|
||||
## [Unreleased]
|
||||
- Refactored connection.c and updated corresponding unit tests
|
||||
- Additional `/cloud-status` and `/cloud-disconnect` fields.
|
||||
- Switched from nanomsg (Release 1.1.2) to NNG (Release v1.0.1)
|
||||
@@ -17,19 +17,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
||||
- added NULL check for device mac id in upstream retrieve message handling
|
||||
- backoff retry to include find_servers in loop (connection.c)
|
||||
- backoff max is max count not max delay
|
||||
- used mutex protection to make client list and nn_sends thread safe
|
||||
- put mutex lock into get_global_node
|
||||
- change svc alive from a thread to a function called every 30 sec from main
|
||||
- shut down tasks properly
|
||||
- fix memory leaks
|
||||
- Fixed memory leak in upstream event message flow
|
||||
- Fixed crash in CRUD request processing
|
||||
- Fixed issue on RETRIEVE respone processing
|
||||
- Enabled valgrind
|
||||
- Fixed main loop to keep calling svc_alive_task during a cloud disconnect and retry
|
||||
- change svc alive back to a separate thread. Shut it down with pthread_cond_timedwait
|
||||
- Refactored Upsteam RETRIEVE flow
|
||||
- Fix re-registration to call nn_shutdown and nn_close, so we don't lose a socket.
|
||||
|
||||
## [1.0.1] - 2018-07-18
|
||||
### Added
|
||||
@@ -62,7 +49,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
|
||||
- Initial creation
|
||||
|
||||
[Unreleased]: https://github.com/Comcast/parodus/compare/1.0.1...HEAD
|
||||
[1.0.2]: https://github.com/Comcast/parodus/compare/1.0.1...1.0.2
|
||||
[1.0.1]: https://github.com/Comcast/parodus/compare/1.0.0...1.0.1
|
||||
[1.0.0]: https://github.com/Comcast/parodus/compare/79fa7438de2b14ae64f869d52f5c127497bf9c3f...1.0.0
|
||||
|
||||
|
||||
@@ -191,8 +191,7 @@ endif (ENABLE_SESHAT)
|
||||
ExternalProject_Add(cjwt
|
||||
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/cjwt
|
||||
GIT_REPOSITORY https://github.com/Comcast/cjwt.git
|
||||
GIT_TAG "1b023c41bb2d6dbbf493c202ed81f06c84d5b51b"
|
||||
#GIT_TAG "1.0.1"
|
||||
GIT_TAG "1.0.1"
|
||||
CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF
|
||||
)
|
||||
add_library(libcjwt STATIC SHARED IMPORTED)
|
||||
|
||||
@@ -50,8 +50,7 @@
|
||||
/* Macros */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
#define UNUSED(x) (void )(x)
|
||||
#define NANO_SOCKET_SEND_TIMEOUT_MS 2000
|
||||
#define NANO_SOCKET_RCV_TIMEOUT_MS 500
|
||||
#define NANOMSG_SOCKET_TIMEOUT_MSEC 2000
|
||||
|
||||
#ifndef TEST
|
||||
#define FOREVER() 1
|
||||
@@ -134,7 +133,6 @@ typedef struct {
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* File Scoped Variables */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
extern bool g_shutdown;
|
||||
extern ParodusMsg *ParodusMsgQ;
|
||||
int numLoops;
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
@@ -30,7 +30,6 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
static int numOfClients = 0;
|
||||
static reg_list_item_t * g_head = NULL;
|
||||
pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External functions */
|
||||
@@ -38,16 +37,9 @@ pthread_mutex_t client_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
reg_list_item_t * get_global_node(void)
|
||||
{
|
||||
pthread_mutex_lock (&client_mut);
|
||||
return g_head;
|
||||
}
|
||||
|
||||
void release_global_node (void)
|
||||
{
|
||||
pthread_mutex_unlock (&client_mut);
|
||||
}
|
||||
|
||||
|
||||
int get_numOfClients()
|
||||
{
|
||||
return numOfClients;
|
||||
@@ -60,12 +52,11 @@ int addToList( wrp_msg_t **msg)
|
||||
int rc = -1;
|
||||
int sock;
|
||||
int retStatus = -1;
|
||||
|
||||
sock = nn_socket( AF_SP, NN_PUSH );
|
||||
ParodusPrint("sock created for adding entries to list: %d\n", sock);
|
||||
if(sock >= 0)
|
||||
{
|
||||
int t = NANO_SOCKET_SEND_TIMEOUT_MS;
|
||||
int t = NANOMSG_SOCKET_TIMEOUT_MSEC;
|
||||
rc = nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDTIMEO, &t, sizeof(t));
|
||||
if(rc < 0)
|
||||
{
|
||||
@@ -76,11 +67,7 @@ int addToList( wrp_msg_t **msg)
|
||||
if(rc < 0)
|
||||
{
|
||||
ParodusError ("Unable to connect socket (errno=%d, %s)\n",errno, strerror(errno));
|
||||
if (nn_close (sock) < 0)
|
||||
{
|
||||
ParodusError ("nn_close socket=%d (err=%d, %s)\n",
|
||||
sock, errno, strerror(errno));
|
||||
}
|
||||
nn_close (sock);
|
||||
|
||||
}
|
||||
else
|
||||
@@ -91,7 +78,6 @@ int addToList( wrp_msg_t **msg)
|
||||
{
|
||||
memset( new_node, 0, sizeof( reg_list_item_t ) );
|
||||
new_node->sock = sock;
|
||||
new_node->endpoint = rc;
|
||||
ParodusPrint("new_node->sock is %d\n", new_node->sock);
|
||||
|
||||
|
||||
@@ -224,21 +210,11 @@ int deleteFromList(char* service_name)
|
||||
}
|
||||
|
||||
ParodusPrint("Deleting the node\n");
|
||||
if(nn_shutdown(curr_node->sock, curr_node->endpoint) < 0)
|
||||
{
|
||||
ParodusError ("nn_shutdown socket=%d endpt=%d, err=%d\n",
|
||||
curr_node->sock, curr_node->endpoint, errno);
|
||||
}
|
||||
if (nn_close (curr_node->sock) < 0)
|
||||
{
|
||||
ParodusError ("nn_close socket=%d err=%d\n",
|
||||
curr_node->sock, errno);
|
||||
}
|
||||
free( curr_node );
|
||||
curr_node = NULL;
|
||||
ParodusInfo("Deleted successfully and returning..\n");
|
||||
numOfClients =numOfClients - 1;
|
||||
ParodusPrint("numOfClients after delete is %d\n", numOfClients);
|
||||
ParodusPrint("numOfClients after delte is %d\n", numOfClients);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -249,22 +225,6 @@ int deleteFromList(char* service_name)
|
||||
return -1;
|
||||
}
|
||||
|
||||
void deleteAllClients (void)
|
||||
{
|
||||
reg_list_item_t *next_node = NULL;
|
||||
|
||||
while (NULL != g_head)
|
||||
{
|
||||
next_node = g_head->next;
|
||||
free (g_head);
|
||||
g_head = next_node;
|
||||
}
|
||||
if (numOfClients > 0) {
|
||||
ParodusInfo ("Deleted %d clients\n", numOfClients);
|
||||
numOfClients = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
*@dest : Client destination to send message
|
||||
*@Msg: Msg to send it to client (No free done here), user responsibilites to free the msg
|
||||
@@ -283,7 +243,6 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize)
|
||||
if( strcmp(dest, temp->service_name) == 0)
|
||||
{
|
||||
bytes = nn_send(temp->sock, *Msg, msgSize, 0);
|
||||
release_global_node ();
|
||||
ParodusInfo("sent downstream message to reg_client '%s'\n", temp->url);
|
||||
ParodusPrint("downstream bytes sent:%d\n", bytes);
|
||||
return 1;
|
||||
@@ -291,6 +250,5 @@ int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize)
|
||||
ParodusPrint("checking the next item in the list\n");
|
||||
temp= temp->next;
|
||||
}
|
||||
release_global_node ();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -30,7 +30,6 @@
|
||||
typedef struct reg_list_item
|
||||
{
|
||||
int sock;
|
||||
int endpoint;
|
||||
char service_name[32];
|
||||
char url[100];
|
||||
struct reg_list_item *next;
|
||||
@@ -51,11 +50,9 @@ int sendAuthStatus(reg_list_item_t *new_node);
|
||||
|
||||
int deleteFromList(char* service_name);
|
||||
int get_numOfClients();
|
||||
void deleteAllClients (void);
|
||||
int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize);
|
||||
|
||||
reg_list_item_t * get_global_node(void);
|
||||
void release_global_node (void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ extern "C" {
|
||||
#define BOOT_TIME "boot-time"
|
||||
#define LAST_RECONNECT_REASON "webpa-last-reconnect-reason"
|
||||
#define WEBPA_PROTOCOL "webpa-protocol"
|
||||
#define WEBPA_INTERFACE "webpa-interface-used"
|
||||
#define WEBPA_INTERFACE "webpa-inteface-used"
|
||||
#define WEBPA_UUID "webpa-uuid"
|
||||
#define WEBPA_URL "webpa-url"
|
||||
#define WEBPA_PING_TIMEOUT "webpa-ping-timeout"
|
||||
|
||||
@@ -52,18 +52,12 @@
|
||||
/* File Scoped Variables */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
bool g_shutdown = false;
|
||||
pthread_t upstream_tid;
|
||||
pthread_t upstream_msg_tid;
|
||||
pthread_t downstream_tid;
|
||||
pthread_t svc_alive_tid;
|
||||
pthread_t crud_tid;
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Function Prototypes */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
void timespec_diff(struct timespec *start, struct timespec *stop,
|
||||
struct timespec *result);
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -75,7 +69,6 @@ void createSocketConnection(void (* initKeypress)())
|
||||
bool seshat_registered = false;
|
||||
unsigned int webpa_ping_timeout_ms = 1000 * get_parodus_cfg()->webpa_ping_timeout;
|
||||
unsigned int heartBeatTimer = 0;
|
||||
struct timespec start_svc_alive_timer;
|
||||
|
||||
//loadParodusCfg(tmpCfg,get_parodus_cfg());
|
||||
#ifdef FEATURE_DNS_QUERY
|
||||
@@ -101,12 +94,12 @@ void createSocketConnection(void (* initKeypress)())
|
||||
packMetaData();
|
||||
|
||||
UpStreamMsgQ = NULL;
|
||||
StartThread(handle_upstream, &upstream_tid);
|
||||
StartThread(processUpstreamMessage, &upstream_msg_tid);
|
||||
StartThread(handle_upstream);
|
||||
StartThread(processUpstreamMessage);
|
||||
ParodusMsgQ = NULL;
|
||||
StartThread(messageHandlerTask, &downstream_tid);
|
||||
StartThread(serviceAliveTask, &svc_alive_tid);
|
||||
StartThread(CRUDHandlerTask, &crud_tid);
|
||||
StartThread(messageHandlerTask);
|
||||
StartThread(serviceAliveTask);
|
||||
StartThread(CRUDHandlerTask);
|
||||
|
||||
if (NULL != initKeypress)
|
||||
{
|
||||
@@ -115,8 +108,6 @@ void createSocketConnection(void (* initKeypress)())
|
||||
|
||||
seshat_registered = __registerWithSeshat();
|
||||
|
||||
clock_gettime(CLOCK_REALTIME, &start_svc_alive_timer);
|
||||
|
||||
do
|
||||
{
|
||||
struct timespec start, stop, diff;
|
||||
@@ -167,41 +158,19 @@ void createSocketConnection(void (* initKeypress)())
|
||||
ParodusInfo("cloud_status set as %s after connection close\n", get_parodus_cfg()->cloud_status);
|
||||
if(get_parodus_cfg()->cloud_disconnect !=NULL)
|
||||
{
|
||||
ParodusPrint("get_parodus_cfg()->cloud_disconnect is %s\n", get_parodus_cfg()->cloud_disconnect);
|
||||
set_cloud_disconnect_time(CLOUD_RECONNECT_TIME);
|
||||
ParodusInfo("Waiting for %d minutes for reconnecting .. \n", get_cloud_disconnect_time());
|
||||
ParodusPrint("get_parodus_cfg()->cloud_disconnect is %s\n", get_parodus_cfg()->cloud_disconnect);
|
||||
set_cloud_disconnect_time(CLOUD_RECONNECT_TIME);
|
||||
ParodusInfo("Waiting for %d minutes for reconnecting .. \n", get_cloud_disconnect_time());
|
||||
|
||||
sleep (get_cloud_disconnect_time() * 60);
|
||||
ParodusInfo("cloud-disconnect reason reset after %d minutes\n", get_cloud_disconnect_time());
|
||||
free(get_parodus_cfg()->cloud_disconnect);
|
||||
reset_cloud_disconnect_reason(get_parodus_cfg());
|
||||
sleep( get_cloud_disconnect_time() * 60 );
|
||||
ParodusInfo("cloud-disconnect reason reset after %d minutes\n", get_cloud_disconnect_time());
|
||||
free(get_parodus_cfg()->cloud_disconnect);
|
||||
reset_cloud_disconnect_reason(get_parodus_cfg());
|
||||
}
|
||||
createNopollConnection(ctx);
|
||||
}
|
||||
} while(!get_close_retry() && !g_shutdown);
|
||||
|
||||
pthread_mutex_lock (get_global_svc_mut());
|
||||
pthread_cond_signal (get_global_svc_con());
|
||||
pthread_mutex_unlock (get_global_svc_mut());
|
||||
pthread_mutex_lock (get_global_crud_mut());
|
||||
pthread_cond_signal (get_global_crud_con());
|
||||
pthread_mutex_unlock (get_global_crud_mut());
|
||||
pthread_mutex_lock (&g_mutex);
|
||||
pthread_cond_signal (&g_cond);
|
||||
pthread_mutex_unlock (&g_mutex);
|
||||
pthread_mutex_lock (get_global_nano_mut ());
|
||||
pthread_cond_signal (get_global_nano_con());
|
||||
pthread_mutex_unlock (get_global_nano_mut());
|
||||
|
||||
ParodusInfo ("joining threads\n");
|
||||
JoinThread (svc_alive_tid);
|
||||
JoinThread (upstream_tid);
|
||||
JoinThread (downstream_tid);
|
||||
JoinThread (upstream_msg_tid);
|
||||
JoinThread (crud_tid);
|
||||
|
||||
deleteAllClients ();
|
||||
|
||||
close_and_unref_connection(get_global_conn());
|
||||
nopoll_ctx_unref(ctx);
|
||||
nopoll_cleanup_library();
|
||||
|
||||
@@ -46,7 +46,6 @@ static char *reconnect_reason = "webpa_process_starts";
|
||||
static int cloud_disconnect_max_time = 5;
|
||||
static noPollConn *g_conn = NULL;
|
||||
static bool LastReasonStatus = false;
|
||||
static int init = 1;
|
||||
static noPollConnOpts * createConnOpts (char * extra_headers, bool secure);
|
||||
static char* build_extra_headers( const char *auth, const char *device_id,
|
||||
const char *user_agent, const char *convey );
|
||||
@@ -385,7 +384,7 @@ int nopoll_connect (create_connection_ctx_t *ctx, int is_ipv6)
|
||||
NULL, default_url,NULL,NULL);
|
||||
}
|
||||
}
|
||||
if ((NULL == connection) && (!is_ipv6)) {
|
||||
if (NULL == connection) {
|
||||
if((checkHostIp(server->server_addr) == -2)) {
|
||||
if (check_timer_expired (&ctx->connect_timer, 15*60*1000)) {
|
||||
ParodusError("WebPA unable to connect due to DNS resolving to 10.0.0.1 for over 15 minutes; crashing service.\n");
|
||||
@@ -407,8 +406,6 @@ int nopoll_connect (create_connection_ctx_t *ctx, int is_ipv6)
|
||||
#define WAIT_ACTION_RETRY 1 // if wait_status is 307, 302, 303 or 403
|
||||
#define WAIT_FAIL 2
|
||||
|
||||
#define FREE_NON_NULL_PTR(ptr) if (NULL != ptr) free(ptr)
|
||||
|
||||
int wait_connection_ready (create_connection_ctx_t *ctx)
|
||||
{
|
||||
int wait_status;
|
||||
@@ -416,10 +413,7 @@ int wait_connection_ready (create_connection_ctx_t *ctx)
|
||||
|
||||
if(nopoll_conn_wait_for_status_until_connection_ready(get_global_conn(), 10,
|
||||
&wait_status, &redirectURL))
|
||||
{
|
||||
FREE_NON_NULL_PTR (redirectURL);
|
||||
return WAIT_SUCCESS;
|
||||
}
|
||||
if(wait_status == 307 || wait_status == 302 || wait_status == 303) // only when there is a http redirect
|
||||
{
|
||||
char *redirect_ptr = redirectURL;
|
||||
@@ -437,7 +431,9 @@ int wait_connection_ready (create_connection_ctx_t *ctx)
|
||||
set_current_server (ctx); // set current server to redirect server
|
||||
return WAIT_ACTION_RETRY;
|
||||
}
|
||||
FREE_NON_NULL_PTR (redirectURL);
|
||||
if (NULL != redirectURL) {
|
||||
free (redirectURL);
|
||||
}
|
||||
if(wait_status == 403)
|
||||
{
|
||||
ParodusError("Received Unauthorized response with status: %d\n", wait_status);
|
||||
@@ -560,7 +556,7 @@ int createNopollConnection(noPollCtx *ctx)
|
||||
set_server_list_null (&conn_ctx.server_list);
|
||||
init_backoff_timer (&backoff_timer, max_retry_count);
|
||||
|
||||
while (!g_shutdown)
|
||||
while (true)
|
||||
{
|
||||
query_dns_status = find_servers (&conn_ctx.server_list);
|
||||
if (query_dns_status == FIND_INVALID_DEFAULT)
|
||||
@@ -583,10 +579,9 @@ int createNopollConnection(noPollCtx *ctx)
|
||||
get_parodus_cfg()->cloud_status = CLOUD_STATUS_ONLINE;
|
||||
ParodusInfo("cloud_status set as %s after successful connection\n", get_parodus_cfg()->cloud_status);
|
||||
|
||||
if((get_parodus_cfg()->boot_time != 0) && init) {
|
||||
if(get_parodus_cfg()->boot_time != 0) {
|
||||
getCurrentTime(connectTimePtr);
|
||||
ParodusInfo("connect_time-diff-boot_time=%d\n", connectTimePtr->tv_sec - get_parodus_cfg()->boot_time);
|
||||
init = 0; //set init to 0 so that this is logged only during process start up and not during reconnect
|
||||
}
|
||||
|
||||
free_extra_headers (&conn_ctx);
|
||||
|
||||
@@ -44,16 +44,6 @@ CrudMsg *crudMsgQ = NULL;
|
||||
/* External functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
pthread_cond_t *get_global_crud_con(void)
|
||||
{
|
||||
return &crud_con;
|
||||
}
|
||||
|
||||
pthread_mutex_t *get_global_crud_mut(void)
|
||||
{
|
||||
return &crud_mut;
|
||||
}
|
||||
|
||||
void addCRUDmsgToQueue(wrp_msg_t *crudMsg)
|
||||
{
|
||||
CrudMsg * crudMessage;
|
||||
@@ -132,10 +122,6 @@ void *CRUDHandlerTask()
|
||||
}
|
||||
else
|
||||
{
|
||||
if (g_shutdown) {
|
||||
pthread_mutex_unlock (&crud_mut);
|
||||
break;
|
||||
}
|
||||
pthread_cond_wait(&crud_con, &crud_mut);
|
||||
pthread_mutex_unlock (&crud_mut);
|
||||
}
|
||||
|
||||
@@ -24,8 +24,6 @@
|
||||
#ifndef _CRUD_INTERFACE_H_
|
||||
#define _CRUD_INTERFACE_H_
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
@@ -45,8 +43,6 @@ typedef struct CrudMsg__
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
void addCRUDresponseToUpstreamQ(void *response_bytes, ssize_t response_size);
|
||||
pthread_cond_t *get_global_crud_con(void);
|
||||
pthread_mutex_t *get_global_crud_mut(void);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
159
src/downstream.c
159
src/downstream.c
@@ -27,10 +27,7 @@
|
||||
#include "partners_check.h"
|
||||
#include "ParodusInternal.h"
|
||||
#include "crud_interface.h"
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Function Prototypes */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
static void createNewMsgForCRUD(wrp_msg_t *message, wrp_msg_t **crudMessage );
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -45,7 +42,6 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
{
|
||||
int rv =0;
|
||||
wrp_msg_t *message;
|
||||
wrp_msg_t *crudMessage= NULL;
|
||||
char* destVal = NULL;
|
||||
char dest[32] = {'\0'};
|
||||
int msgType;
|
||||
@@ -79,6 +75,7 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
case WRP_MSG_TYPE__AUTH:
|
||||
{
|
||||
ParodusInfo("Authorization Status received with Status code :%d\n", message->u.auth.status);
|
||||
wrp_free_struct(message);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -123,8 +120,7 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
((WRP_MSG_TYPE__EVENT == msgType) ? "NA" : message->u.crud.transaction_uuid)));
|
||||
|
||||
free(destVal);
|
||||
|
||||
temp = get_global_node();
|
||||
temp = get_global_node();
|
||||
//Checking for individual clients & Sending to each client
|
||||
|
||||
while (NULL != temp)
|
||||
@@ -143,11 +139,10 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
ParodusPrint("checking the next item in the list\n");
|
||||
temp= temp->next;
|
||||
}
|
||||
release_global_node ();
|
||||
|
||||
/* check Downstream dest for CRUD requests */
|
||||
if(destFlag ==0 && strcmp("parodus", dest)==0)
|
||||
{
|
||||
/* check Downstream dest for CRUD requests */
|
||||
if(destFlag ==0 && strcmp("parodus", dest)==0)
|
||||
{
|
||||
ParodusPrint("Received CRUD request : dest : %s\n", dest);
|
||||
if ((message->u.crud.source == NULL) || (message->u.crud.transaction_uuid == NULL))
|
||||
{
|
||||
@@ -159,12 +154,11 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
}
|
||||
else
|
||||
{
|
||||
createNewMsgForCRUD(message, &crudMessage);
|
||||
addCRUDmsgToQueue(crudMessage);
|
||||
addCRUDmsgToQueue(message);
|
||||
}
|
||||
destFlag =1;
|
||||
}
|
||||
//if any unknown dest received sending error response to server
|
||||
}
|
||||
//if any unknown dest received sending error response to server
|
||||
if(destFlag ==0)
|
||||
{
|
||||
ParodusError("Unknown dest:%s\n", dest);
|
||||
@@ -190,14 +184,14 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
else
|
||||
{
|
||||
resp_msg ->u.crud.source = message->u.crud.dest;
|
||||
if(message->u.crud.source !=NULL)
|
||||
{
|
||||
resp_msg ->u.crud.dest = message->u.crud.source;
|
||||
}
|
||||
else
|
||||
{
|
||||
resp_msg ->u.crud.dest = "unknown";
|
||||
}
|
||||
if(message->u.crud.source !=NULL)
|
||||
{
|
||||
resp_msg ->u.crud.dest = message->u.crud.source;
|
||||
}
|
||||
else
|
||||
{
|
||||
resp_msg ->u.crud.dest = "unknown";
|
||||
}
|
||||
resp_msg ->u.crud.transaction_uuid = message->u.crud.transaction_uuid;
|
||||
resp_msg ->u.crud.path = message->u.crud.path;
|
||||
}
|
||||
@@ -231,6 +225,8 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
resp_bytes = NULL;
|
||||
}
|
||||
free(resp_msg);
|
||||
ParodusPrint("free for downstream decoded msg\n");
|
||||
wrp_free_struct(message);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -239,11 +235,9 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
case WRP_MSG_TYPE__SVC_ALIVE:
|
||||
case WRP_MSG_TYPE__UNKNOWN:
|
||||
default:
|
||||
wrp_free_struct(message);
|
||||
break;
|
||||
}
|
||||
ParodusPrint("free for downstream decoded msg\n");
|
||||
wrp_free_struct(message);
|
||||
message = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -251,116 +245,3 @@ void listenerOnMessage(void * msg, size_t msgSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Internal Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/**
|
||||
* @brief createNewMsgForCRUD function to create new message for processing CRUD requests
|
||||
*
|
||||
* @param[in] message The message received from server
|
||||
* @param[out] crudMessage New message for processing CRUD requests
|
||||
*/
|
||||
static void createNewMsgForCRUD(wrp_msg_t *message, wrp_msg_t **crudMessage )
|
||||
{
|
||||
wrp_msg_t *msg;
|
||||
msg = ( wrp_msg_t * ) malloc( sizeof( wrp_msg_t ) );
|
||||
size_t i;
|
||||
if(msg != NULL)
|
||||
{
|
||||
memset( msg, 0, sizeof( wrp_msg_t ) );
|
||||
msg->msg_type = message->msg_type;
|
||||
if(message->u.crud.source != NULL)
|
||||
{
|
||||
ParodusPrint("message->u.crud.source = %s\n",message->u.crud.source);
|
||||
msg->u.crud.source = strdup(message->u.crud.source);
|
||||
}
|
||||
|
||||
if(message->u.crud.dest!= NULL)
|
||||
{
|
||||
ParodusPrint("message->u.crud.dest = %s\n",message->u.crud.dest);
|
||||
msg->u.crud.dest = strdup(message->u.crud.dest);
|
||||
}
|
||||
|
||||
if(message->u.crud.transaction_uuid != NULL)
|
||||
{
|
||||
ParodusPrint("message->u.crud.transaction_uuid = %s\n",message->u.crud.transaction_uuid);
|
||||
msg->u.crud.transaction_uuid = strdup(message->u.crud.transaction_uuid);
|
||||
}
|
||||
|
||||
if(message->u.crud.partner_ids!= NULL && message->u.crud.partner_ids->count >0)
|
||||
{
|
||||
msg->u.crud.partner_ids = ( partners_t * ) malloc( sizeof( partners_t ) +
|
||||
sizeof( char * ) * message->u.crud.partner_ids->count );
|
||||
if(msg->u.crud.partner_ids != NULL)
|
||||
{
|
||||
msg->u.crud.partner_ids->count = message->u.crud.partner_ids->count;
|
||||
for(i = 0; i<message->u.crud.partner_ids->count; i++)
|
||||
{
|
||||
ParodusPrint("message->u.crud.partner_ids->partner_ids[%d] = %s\n",i,message->u.crud.partner_ids->partner_ids[i]);
|
||||
msg->u.crud.partner_ids->partner_ids[i] = strdup(message->u.crud.partner_ids->partner_ids[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(message->u.crud.headers!= NULL && message->u.crud.headers->count >0)
|
||||
{
|
||||
msg->u.crud.headers = ( headers_t * ) malloc( sizeof( headers_t ) +
|
||||
sizeof( char * ) * message->u.crud.headers->count );
|
||||
if(msg->u.crud.headers != NULL)
|
||||
{
|
||||
msg->u.crud.headers->count = message->u.crud.headers->count;
|
||||
for(i = 0; i<message->u.crud.headers->count; i++)
|
||||
{
|
||||
ParodusPrint("message->u.crud.headers->headers[%d] = %s\n",i,message->u.crud.headers->headers[i]);
|
||||
msg->u.crud.headers->headers[i] = strdup(message->u.crud.headers->headers[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(message->u.crud.metadata != NULL && message->u.crud.metadata->count > 0)
|
||||
{
|
||||
msg->u.crud.metadata = (data_t *) malloc( sizeof( data_t ) );
|
||||
if(msg->u.crud.metadata != NULL)
|
||||
{
|
||||
memset( msg->u.crud.metadata, 0, sizeof( data_t ) );
|
||||
msg->u.crud.metadata->count = message->u.crud.metadata->count;
|
||||
msg->u.crud.metadata->data_items = ( struct data* )malloc( sizeof( struct data ) * ( message->u.crud.metadata->count ) );
|
||||
for(i=0; i<message->u.crud.metadata->count; i++)
|
||||
{
|
||||
if(message->u.crud.metadata->data_items[i].name != NULL)
|
||||
{
|
||||
ParodusPrint("message->u.crud.metadata->data_items[%d].name : %s\n",i,message->u.crud.metadata->data_items[i].name);
|
||||
msg->u.crud.metadata->data_items[i].name = strdup(message->u.crud.metadata->data_items[i].name);
|
||||
}
|
||||
if(message->u.crud.metadata->data_items[i].value != NULL)
|
||||
{
|
||||
ParodusPrint("message->u.crud.metadata->data_items[%d].value : %s\n",i,message->u.crud.metadata->data_items[i].value);
|
||||
msg->u.crud.metadata->data_items[i].value = strdup(message->u.crud.metadata->data_items[i].value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
msg->u.crud.include_spans = message->u.crud.include_spans;
|
||||
if(message->u.crud.content_type != NULL)
|
||||
{
|
||||
ParodusPrint("message->u.crud.content_type : %s\n",message->u.crud.content_type);
|
||||
msg->u.crud.content_type = strdup(message->u.crud.content_type);
|
||||
}
|
||||
msg->u.crud.spans.spans = NULL; /* not supported */
|
||||
msg->u.crud.spans.count = 0; /* not supported */
|
||||
msg->u.crud.status = message->u.crud.status;
|
||||
msg->u.crud.rdr = message->u.crud.rdr;
|
||||
if(message->u.crud.payload != NULL)
|
||||
{
|
||||
ParodusPrint("message->u.crud.payload = %s\n", (char *)message->u.crud.payload);
|
||||
msg->u.crud.payload = strdup((char *)message->u.crud.payload);
|
||||
}
|
||||
msg->u.crud.payload_size = message->u.crud.payload_size;
|
||||
if(message->u.crud.path != NULL)
|
||||
{
|
||||
ParodusPrint("message->u.crud.path = %s\n", message->u.crud.path);
|
||||
msg->u.crud.path = strdup(message->u.crud.path);
|
||||
}
|
||||
*crudMessage = msg;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,41 +31,9 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
#define KEEPALIVE_INTERVAL_SEC 30
|
||||
|
||||
pthread_mutex_t svc_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_cond_t svc_con=PTHREAD_COND_INITIALIZER;
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Utiliy Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
static int wait__ (unsigned int secs)
|
||||
{
|
||||
int shutdown_flag;
|
||||
struct timespec svc_alive_timer;
|
||||
|
||||
clock_gettime(CLOCK_REALTIME, &svc_alive_timer);
|
||||
svc_alive_timer.tv_sec += secs;
|
||||
pthread_mutex_lock(&svc_mut);
|
||||
pthread_cond_timedwait (&svc_con, &svc_mut, &svc_alive_timer);
|
||||
shutdown_flag = g_shutdown;
|
||||
pthread_mutex_unlock (&svc_mut);
|
||||
return shutdown_flag;
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
pthread_cond_t *get_global_svc_con(void)
|
||||
{
|
||||
return &svc_con;
|
||||
}
|
||||
|
||||
pthread_mutex_t *get_global_svc_mut(void)
|
||||
{
|
||||
return &svc_mut;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* @brief To handle registered services to indicate that the service is still alive.
|
||||
*/
|
||||
@@ -90,10 +58,10 @@ void *serviceAliveTask()
|
||||
while(1)
|
||||
{
|
||||
ParodusPrint("serviceAliveTask: numOfClients registered is %d\n", get_numOfClients());
|
||||
temp = get_global_node();
|
||||
if(get_numOfClients() > 0)
|
||||
{
|
||||
//sending svc msg to all the clients every 30s
|
||||
temp = get_global_node();
|
||||
size = (size_t) nbytes;
|
||||
while(NULL != temp)
|
||||
{
|
||||
@@ -114,9 +82,8 @@ void *serviceAliveTask()
|
||||
byte = 0;
|
||||
if(ret == 0)
|
||||
{
|
||||
release_global_node ();
|
||||
ParodusPrint("Deletion from list is success, doing resync with head\n");
|
||||
temp= get_global_node();
|
||||
ParodusInfo("Deletion from list is success, doing resync with head\n");
|
||||
ret = -1;
|
||||
}
|
||||
else
|
||||
@@ -124,20 +91,15 @@ void *serviceAliveTask()
|
||||
temp= temp->next;
|
||||
}
|
||||
}
|
||||
release_global_node ();
|
||||
ParodusPrint("Waiting for 30s to send keep alive msg \n");
|
||||
if (wait__ (KEEPALIVE_INTERVAL_SEC))
|
||||
break;
|
||||
sleep(KEEPALIVE_INTERVAL_SEC);
|
||||
}
|
||||
else
|
||||
{
|
||||
release_global_node ();
|
||||
ParodusInfo("No clients are registered, waiting ..\n");
|
||||
if (wait__ (50))
|
||||
break;
|
||||
sleep(50);
|
||||
}
|
||||
}
|
||||
free (svc_bytes);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -24,8 +24,6 @@
|
||||
#ifndef _SERVICE_ALIVE_H_
|
||||
#define _SERVICE_ALIVE_H_
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Function Prototypes */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -35,8 +33,7 @@ extern "C" {
|
||||
#endif
|
||||
|
||||
void *serviceAliveTask();
|
||||
pthread_cond_t *get_global_svc_con(void);
|
||||
pthread_mutex_t *get_global_svc_mut(void);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "spin_thread.h"
|
||||
#include "parodus_log.h"
|
||||
@@ -30,12 +31,12 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
void StartThread(void *(*start_routine) (void *), pthread_t *threadId)
|
||||
void StartThread(void *(*start_routine) (void *))
|
||||
{
|
||||
int err = 0;
|
||||
pthread_t __threadId;
|
||||
pthread_t threadId;
|
||||
|
||||
err = pthread_create(&__threadId, NULL, start_routine, NULL);
|
||||
err = pthread_create(&threadId, NULL, start_routine, NULL);
|
||||
if (err != 0)
|
||||
{
|
||||
ParodusError("Error creating thread :[%s]\n", strerror(err));
|
||||
@@ -43,13 +44,8 @@ void StartThread(void *(*start_routine) (void *), pthread_t *threadId)
|
||||
}
|
||||
else
|
||||
{
|
||||
*threadId = __threadId;
|
||||
ParodusPrint("Thread created Successfully %lu\n", (unsigned long) __threadId);
|
||||
ParodusPrint("Thread created Successfully %lu\n", (unsigned long) threadId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void JoinThread (pthread_t threadId)
|
||||
{
|
||||
pthread_join (threadId, NULL);
|
||||
}
|
||||
|
||||
|
||||
@@ -24,8 +24,6 @@
|
||||
#ifndef _SPIN_THREAD_H_
|
||||
#define _SPIN_THREAD_H_
|
||||
|
||||
#include <pthread.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
@@ -34,8 +32,8 @@ extern "C" {
|
||||
/* Function Prototypes */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
void StartThread(void *(*start_routine) (void *), pthread_t *threadId);
|
||||
void JoinThread (pthread_t threadId);
|
||||
void StartThread(void *(*start_routine) (void *));
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
@@ -43,10 +43,6 @@ void *messageHandlerTask()
|
||||
}
|
||||
else
|
||||
{
|
||||
if (g_shutdown) {
|
||||
pthread_mutex_unlock (&g_mutex);
|
||||
break;
|
||||
}
|
||||
ParodusPrint("Before pthread cond wait in consumer thread\n");
|
||||
pthread_cond_wait(&g_cond, &g_mutex);
|
||||
pthread_mutex_unlock (&g_mutex);
|
||||
|
||||
@@ -527,9 +527,7 @@ int allow_insecure_conn(char **server_addr, unsigned int *port)
|
||||
}
|
||||
|
||||
if (insecure >= 0) {
|
||||
char *claim_str = cJSON_Print (jwt->private_claims);
|
||||
ParodusInfo ("JWT claims: %s\n", claim_str);
|
||||
free (claim_str);
|
||||
ParodusInfo ("JWT claims: %s\n", cJSON_Print (jwt->private_claims));
|
||||
}
|
||||
cjwt_destroy(&jwt);
|
||||
|
||||
|
||||
270
src/upstream.c
270
src/upstream.c
@@ -34,7 +34,7 @@
|
||||
/* Macros */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
#define METADATA_COUNT 12
|
||||
#define PARODUS_SERVICE_NAME "parodus"
|
||||
#define CLOUD_STATUS_FORMAT "parodus/cloud-status"
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* File Scoped Variables */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -129,12 +129,6 @@ void *handle_upstream()
|
||||
sock = nn_socket( AF_SP, NN_PULL );
|
||||
if(sock >= 0)
|
||||
{
|
||||
int t = NANO_SOCKET_RCV_TIMEOUT_MS;
|
||||
int rc = nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &t, sizeof(t));
|
||||
if (rc < 0)
|
||||
{
|
||||
ParodusError ("Unable to set socket receive timeout (errno=%d, %s)\n",errno, strerror(errno));
|
||||
}
|
||||
ParodusPrint("Nanomsg bind with get_parodus_cfg()->local_url %s\n", get_parodus_cfg()->local_url);
|
||||
bind = nn_bind(sock, get_parodus_cfg()->local_url);
|
||||
if(bind < 0)
|
||||
@@ -143,18 +137,11 @@ void *handle_upstream()
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusInfo("nanomsg server gone into the listening mode...\n");
|
||||
while( FOREVER() )
|
||||
{
|
||||
buf = NULL;
|
||||
ParodusInfo("nanomsg server gone into the listening mode...\n");
|
||||
bytes = nn_recv (sock, &buf, NN_MSG, 0);
|
||||
if (g_shutdown)
|
||||
break;
|
||||
if (bytes < 0) {
|
||||
if ((errno != EAGAIN) && (errno != ETIMEDOUT))
|
||||
ParodusInfo ("Error (%d) receiving message from nanomsg client\n", errno);
|
||||
continue;
|
||||
}
|
||||
ParodusInfo ("Upstream message received from nanomsg client\n");
|
||||
message = (UpStreamMsg *)malloc(sizeof(UpStreamMsg));
|
||||
|
||||
@@ -190,16 +177,6 @@ void *handle_upstream()
|
||||
ParodusError("failure in allocation for message\n");
|
||||
}
|
||||
}
|
||||
if(nn_shutdown(sock, bind) < 0)
|
||||
{
|
||||
ParodusError ("nn_shutdown bind socket=%d endpt=%d, err=%d\n",
|
||||
sock, bind, errno);
|
||||
}
|
||||
if (nn_close (sock) < 0)
|
||||
{
|
||||
ParodusError ("nn_close bind socket=%d err=%d\n",
|
||||
sock, errno);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
@@ -220,10 +197,11 @@ void *processUpstreamMessage()
|
||||
reg_list_item_t *temp = NULL;
|
||||
int matchFlag = 0;
|
||||
int status = -1;
|
||||
char *device_id = NULL;
|
||||
size_t device_id_len = 0;
|
||||
size_t parodus_len;
|
||||
int ret = -1;
|
||||
char *serviceName = NULL;
|
||||
char *macId = NULL;
|
||||
char *destService, *destApplication =NULL;
|
||||
char *sourceService, *sourceApplication =NULL;
|
||||
int sendStatus =-1;
|
||||
|
||||
while(FOREVER())
|
||||
{
|
||||
@@ -244,40 +222,34 @@ void *processUpstreamMessage()
|
||||
if(rv > 0)
|
||||
{
|
||||
msgType = msg->msg_type;
|
||||
if(msgType == WRP_MSG_TYPE__SVC_REGISTRATION)
|
||||
if(msgType == 9)
|
||||
{
|
||||
ParodusInfo("\n Nanomsg client Registration for Upstream\n");
|
||||
//Extract serviceName and url & store it in a linked list for reg_clients
|
||||
temp = get_global_node();
|
||||
if(get_numOfClients() !=0)
|
||||
{
|
||||
matchFlag = 0;
|
||||
ParodusPrint("matchFlag reset to %d\n", matchFlag);
|
||||
temp = get_global_node();
|
||||
while(temp!=NULL)
|
||||
{
|
||||
if(strcmp(temp->service_name, msg->u.reg.service_name)==0)
|
||||
{
|
||||
ParodusInfo("match found, client is already registered\n");
|
||||
parStrncpy(temp->url,msg->u.reg.url, sizeof(temp->url));
|
||||
if(nn_shutdown(temp->sock, temp->endpoint) < 0)
|
||||
if(nn_shutdown(temp->sock, 0) < 0)
|
||||
{
|
||||
ParodusError ("nn_shutdown socket=%d endpt=%d, err=%d\n",
|
||||
temp->sock, temp->endpoint, errno);
|
||||
}
|
||||
if (nn_close (temp->sock) < 0)
|
||||
{
|
||||
ParodusError ("nn_close socket=%d err=%d\n",
|
||||
temp->sock, errno);
|
||||
ParodusError ("Failed to shutdown\n");
|
||||
}
|
||||
|
||||
temp->sock = nn_socket(AF_SP,NN_PUSH );
|
||||
if(temp->sock >= 0)
|
||||
{
|
||||
int t = NANO_SOCKET_SEND_TIMEOUT_MS;
|
||||
int t = NANOMSG_SOCKET_TIMEOUT_MSEC;
|
||||
rc = nn_setsockopt(temp->sock, NN_SOL_SOCKET, NN_SNDTIMEO, &t, sizeof(t));
|
||||
if(rc < 0)
|
||||
{
|
||||
ParodusError ("Unable to set socket send timeout (errno=%d, %s)\n",errno, strerror(errno));
|
||||
ParodusError ("Unable to set socket timeout (errno=%d, %s)\n",errno, strerror(errno));
|
||||
}
|
||||
rc = nn_connect(temp->sock, msg->u.reg.url);
|
||||
if(rc < 0)
|
||||
@@ -286,8 +258,7 @@ void *processUpstreamMessage()
|
||||
}
|
||||
else
|
||||
{
|
||||
temp->endpoint = rc;
|
||||
ParodusInfo("Client registered before. Sending ack on socket %d\n", temp->sock);
|
||||
ParodusInfo("Client registered before. Sending acknowledgement \n");
|
||||
status =sendAuthStatus(temp);
|
||||
|
||||
if(status == 0)
|
||||
@@ -318,13 +289,11 @@ void *processUpstreamMessage()
|
||||
ParodusPrint("sent auth status to reg client\n");
|
||||
}
|
||||
}
|
||||
release_global_node ();
|
||||
}
|
||||
else if(msgType == WRP_MSG_TYPE__EVENT)
|
||||
{
|
||||
ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
|
||||
partners_t *partnersList = NULL;
|
||||
int j = 0;
|
||||
|
||||
int ret = validate_partner_id(msg, &partnersList);
|
||||
if(ret == 1)
|
||||
@@ -353,18 +322,6 @@ void *processUpstreamMessage()
|
||||
{
|
||||
sendUpstreamMsgToServer(&message->msg, message->len);
|
||||
}
|
||||
if(partnersList != NULL)
|
||||
{
|
||||
for(j=0; j<(int)partnersList->count; j++)
|
||||
{
|
||||
if(NULL != partnersList->partner_ids[j])
|
||||
{
|
||||
free(partnersList->partner_ids[j]);
|
||||
}
|
||||
}
|
||||
free(partnersList);
|
||||
}
|
||||
partnersList = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -377,48 +334,90 @@ void *processUpstreamMessage()
|
||||
else
|
||||
{
|
||||
ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s status: %d\n",msgType, msg->u.crud.dest, msg->u.crud.transaction_uuid, msg->u.crud.status );
|
||||
if(WRP_MSG_TYPE__RETREIVE == msgType)
|
||||
if(WRP_MSG_TYPE__RETREIVE == msgType && msg->u.crud.dest !=NULL && msg->u.crud.source != NULL)
|
||||
{
|
||||
ret = getDeviceId(&device_id, &device_id_len);
|
||||
if(ret == 0)
|
||||
macId = wrp_get_msg_element(WRP_ID_ELEMENT__ID, msg, DEST);
|
||||
destService = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
|
||||
destApplication = wrp_get_msg_element(WRP_ID_ELEMENT__APPLICATION, msg, DEST);
|
||||
sourceService = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, SOURCE);
|
||||
sourceApplication = wrp_get_msg_element(WRP_ID_ELEMENT__APPLICATION, msg, SOURCE);
|
||||
/* Handle cloud-status retrieve request here
|
||||
Expecting dest format as mac:xxxxxxxxxxxx/parodus/cloud-status
|
||||
Parse dest field and check destService is "parodus" and destApplication is "cloud-status"
|
||||
*/
|
||||
if(macId != NULL)
|
||||
{
|
||||
if(destService != NULL && destApplication != NULL && strcmp(destService,"parodus")== 0 && strcmp(destApplication,"cloud-status")== 0)
|
||||
{
|
||||
ParodusPrint("device_id %s device_id_len %lu\n", device_id, device_id_len);
|
||||
/* Match dest based on device_id. Check dest start with: "mac:112233445xxx" ? */
|
||||
if( 0 == strncasecmp(device_id, msg->u.crud.dest, device_id_len-1) )
|
||||
retrieve_msg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) );
|
||||
memset(retrieve_msg, 0, sizeof(wrp_msg_t));
|
||||
retrieve_msg->msg_type = msg->msg_type;
|
||||
retrieve_msg->u.crud.transaction_uuid = strdup(msg->u.crud.transaction_uuid);
|
||||
retrieve_msg->u.crud.source = strdup(msg->u.crud.source);
|
||||
retrieve_msg->u.crud.dest = strdup(msg->u.crud.dest);
|
||||
addCRUDmsgToQueue(retrieve_msg);
|
||||
}
|
||||
else if(sourceService != NULL && sourceApplication != NULL && strcmp(sourceService,"parodus")== 0 && strcmp(sourceApplication,"cloud-status")== 0 && strncmp(msg->u.crud.dest,"mac:", 4)==0)
|
||||
{
|
||||
/* Handle cloud-status retrieve response here to send it to registered client
|
||||
Expecting src format as mac:xxxxxxxxxxxx/parodus/cloud-status and dest as mac:
|
||||
Parse src field and check sourceService is "parodus" and sourceApplication is "cloud-status"
|
||||
*/
|
||||
serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
|
||||
if ( serviceName != NULL)
|
||||
{
|
||||
/* For this device. */
|
||||
parodus_len = strlen( PARODUS_SERVICE_NAME );
|
||||
if( 0 == strncmp(PARODUS_SERVICE_NAME, &msg->u.crud.dest[device_id_len], parodus_len-1) )
|
||||
//Send Client cloud-status response back to registered client
|
||||
ParodusInfo("Sending cloud-status response to %s client\n",serviceName);
|
||||
sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)&message->msg,message->len);
|
||||
if(sendStatus ==1)
|
||||
{
|
||||
/* For Parodus CRUD queue. */
|
||||
ParodusInfo("Create RetrieveMsg and add to parodus CRUD queue\n");
|
||||
createUpstreamRetrieveMsg(msg, &retrieve_msg);
|
||||
addCRUDmsgToQueue(retrieve_msg);
|
||||
ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* For nanomsg clients. */
|
||||
getServiceNameAndSendResponse(msg, &message->msg, message->len);
|
||||
ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName);
|
||||
}
|
||||
free(serviceName);
|
||||
serviceName = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Not for this device. Send upstream */
|
||||
ParodusInfo("sendUpstreamMsgToServer \n");
|
||||
sendUpstreamMsgToServer(&message->msg, message->len);
|
||||
}
|
||||
if(device_id != NULL)
|
||||
{
|
||||
free(device_id);
|
||||
device_id = NULL;
|
||||
ParodusError("serviceName is NULL,not sending cloud-status response to client\n");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Failed to get device_id\n");
|
||||
ParodusInfo("sendUpstreamMsgToServer \n");
|
||||
sendUpstreamMsgToServer(&message->msg, message->len);
|
||||
}
|
||||
} else if (WRP_MSG_TYPE__SVC_ALIVE != msgType) {
|
||||
/* Don't reply to service alive message */
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("MAC is null, not handling retrieve wrp message \n");
|
||||
}
|
||||
if(sourceService !=NULL)
|
||||
{
|
||||
free(sourceService);
|
||||
sourceService = NULL;
|
||||
}
|
||||
if(sourceApplication !=NULL)
|
||||
{
|
||||
free(sourceApplication);
|
||||
sourceApplication = NULL;
|
||||
}
|
||||
if(destService !=NULL)
|
||||
{
|
||||
free(destService);
|
||||
destService = NULL;
|
||||
}
|
||||
if(destApplication !=NULL)
|
||||
{
|
||||
free(destApplication);
|
||||
destApplication = NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
sendUpstreamMsgToServer(&message->msg, message->len);
|
||||
}
|
||||
}
|
||||
@@ -451,10 +450,6 @@ void *processUpstreamMessage()
|
||||
}
|
||||
else
|
||||
{
|
||||
if (g_shutdown) {
|
||||
pthread_mutex_unlock (&nano_mut);
|
||||
break;
|
||||
}
|
||||
ParodusPrint("Before pthread cond wait in consumer thread\n");
|
||||
pthread_cond_wait(&nano_con, &nano_mut);
|
||||
pthread_mutex_unlock (&nano_mut);
|
||||
@@ -464,107 +459,6 @@ void *processUpstreamMessage()
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief getDeviceId function to create deviceId in the format "mac:112233445xxx"
|
||||
*
|
||||
* @param[out] device_id in the format "mac:112233445xxx"
|
||||
* @param[out] total size of device_id
|
||||
*/
|
||||
int getDeviceId(char **device_id, size_t *device_id_len)
|
||||
{
|
||||
char *deviceID = NULL;
|
||||
size_t len;
|
||||
|
||||
if((get_parodus_cfg()->hw_mac !=NULL) && (strlen(get_parodus_cfg()->hw_mac)!=0))
|
||||
{
|
||||
len = strlen(get_parodus_cfg()->hw_mac) + 5;
|
||||
|
||||
deviceID = (char *) malloc(sizeof(char)*64);
|
||||
if(deviceID != NULL)
|
||||
{
|
||||
snprintf(deviceID, len, "mac:%s", get_parodus_cfg()->hw_mac);
|
||||
*device_id = deviceID;
|
||||
*device_id_len = len;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("device_id allocation failed\n");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("device mac is NULL\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @brief createUpstreamRetrieveMsg function to create new message for processing Retrieve requests
|
||||
*
|
||||
* @param[in] message The upstream message received from cloud or internal clients
|
||||
* @param[out] retrieve_msg New message for processing Retrieve requests
|
||||
*/
|
||||
void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg)
|
||||
{
|
||||
wrp_msg_t *msg;
|
||||
msg = ( wrp_msg_t * ) malloc( sizeof( wrp_msg_t ) );
|
||||
if(msg != NULL)
|
||||
{
|
||||
memset(msg, 0, sizeof(wrp_msg_t));
|
||||
|
||||
msg->msg_type = message->msg_type;
|
||||
if(message->u.crud.transaction_uuid != NULL)
|
||||
{
|
||||
msg->u.crud.transaction_uuid = strdup(message->u.crud.transaction_uuid);
|
||||
}
|
||||
if(message->u.crud.source !=NULL)
|
||||
{
|
||||
msg->u.crud.source = strdup(message->u.crud.source);
|
||||
}
|
||||
if(message->u.crud.dest != NULL)
|
||||
{
|
||||
msg->u.crud.dest = strdup(message->u.crud.dest);
|
||||
}
|
||||
*retrieve_msg = msg;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief getServiceNameAndSendResponse function to fetch client service name and to send msg to it.
|
||||
*
|
||||
* @param[in] msg The decoded message to fetch client service name from its dest field
|
||||
* @param[in] msg_bytes The encoded upstream msg to be sent to client
|
||||
* @param[in] msg_size Total size of the msg to send to client
|
||||
*/
|
||||
void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size)
|
||||
{
|
||||
char *serviceName = NULL;
|
||||
int sendStatus =-1;
|
||||
|
||||
serviceName = wrp_get_msg_element(WRP_ID_ELEMENT__SERVICE, msg, DEST);
|
||||
if ( serviceName != NULL)
|
||||
{
|
||||
sendStatus=sendMsgtoRegisteredClients(serviceName,(const char **)msg_bytes, msg_size);
|
||||
if(sendStatus ==1)
|
||||
{
|
||||
ParodusInfo("Send upstreamMsg successfully to registered client %s\n", serviceName);
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("Failed to send upstreamMsg to registered client %s\n", serviceName);
|
||||
}
|
||||
free(serviceName);
|
||||
serviceName = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
ParodusError("serviceName is NULL,not sending retrieve response to client\n");
|
||||
}
|
||||
}
|
||||
|
||||
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size)
|
||||
{
|
||||
void *appendData;
|
||||
|
||||
@@ -47,10 +47,8 @@ typedef struct UpStreamMsg__
|
||||
void packMetaData();
|
||||
void *handle_upstream();
|
||||
void *processUpstreamMessage();
|
||||
int getDeviceId(char **device_id, size_t *device_id_len);
|
||||
|
||||
void sendUpstreamMsgToServer(void **resp_bytes, size_t resp_size);
|
||||
void getServiceNameAndSendResponse(wrp_msg_t *msg, void **msg_bytes, size_t msg_size);
|
||||
void createUpstreamRetrieveMsg(wrp_msg_t *message, wrp_msg_t **retrieve_msg);
|
||||
void set_global_UpStreamMsgQ(UpStreamMsg * UpStreamQ);
|
||||
UpStreamMsg * get_global_UpStreamMsgQ(void);
|
||||
pthread_cond_t *get_global_nano_con(void);
|
||||
|
||||
@@ -26,9 +26,6 @@
|
||||
#define TEST_CLIENT1_URL "tcp://127.0.0.1:6677"
|
||||
#define TEST_CLIENT2_URL "tcp://127.0.0.1:6655"
|
||||
|
||||
pthread_t test_tid;
|
||||
pthread_t test_tid2;
|
||||
|
||||
static void *client_rcv_task();
|
||||
static void *client2_rcv_task();
|
||||
|
||||
@@ -61,7 +58,7 @@ void test_client_addtolist()
|
||||
ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name);
|
||||
ParodusPrint("decoded dest:%s\n", message->u.reg.url);
|
||||
|
||||
StartThread(client_rcv_task, &test_tid);
|
||||
StartThread(client_rcv_task);
|
||||
|
||||
status = addToList(&message);
|
||||
ParodusPrint("addToList status is %d\n", status);
|
||||
@@ -76,7 +73,7 @@ void test_client_addtolist()
|
||||
CU_ASSERT_STRING_EQUAL( temp->service_name, message->u.reg.service_name );
|
||||
CU_ASSERT_STRING_EQUAL( temp->url, message->u.reg.url );
|
||||
}
|
||||
release_global_node ();
|
||||
|
||||
wrp_free_struct(message);
|
||||
free(bytes);
|
||||
ParodusInfo("test_client_addtolist done..\n");
|
||||
@@ -192,7 +189,7 @@ void test_addtolist_multiple_clients()
|
||||
ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name);
|
||||
ParodusPrint("decoded dest:%s\n", message->u.reg.url);
|
||||
|
||||
StartThread(client2_rcv_task, &test_tid2);
|
||||
StartThread(client2_rcv_task);
|
||||
|
||||
status = addToList(&message);
|
||||
ParodusPrint("addToList status is %d\n", status);
|
||||
@@ -209,7 +206,7 @@ void test_addtolist_multiple_clients()
|
||||
CU_ASSERT_STRING_EQUAL( temp->url, message->u.reg.url );
|
||||
|
||||
}
|
||||
release_global_node ();
|
||||
|
||||
wrp_free_struct(message);
|
||||
free(bytes);
|
||||
ParodusInfo("test_addtolist_multiple_clients done..\n");
|
||||
|
||||
@@ -35,12 +35,8 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
UpStreamMsg *UpStreamMsgQ;
|
||||
ParodusMsg *ParodusMsgQ;
|
||||
pthread_mutex_t g_mutex=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_cond_t g_cond=PTHREAD_COND_INITIALIZER;
|
||||
pthread_mutex_t nano_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_cond_t nano_con=PTHREAD_COND_INITIALIZER;
|
||||
pthread_mutex_t svc_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_cond_t svc_con=PTHREAD_COND_INITIALIZER;
|
||||
pthread_mutex_t nano_mut;
|
||||
pthread_cond_t nano_con;
|
||||
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -107,9 +103,9 @@ void *messageHandlerTask()
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int serviceAliveTask()
|
||||
void *serviceAliveTask()
|
||||
{
|
||||
return 0;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int nopoll_loop_wait(noPollCtx * ctx,long timeout)
|
||||
@@ -160,18 +156,12 @@ void set_global_conn(noPollConn *conn)
|
||||
function_called();
|
||||
}
|
||||
|
||||
void StartThread(void *(*start_routine) (void *), pthread_t *threadId)
|
||||
void StartThread(void *(*start_routine) (void *))
|
||||
{
|
||||
UNUSED(start_routine);
|
||||
UNUSED(threadId);
|
||||
function_called();
|
||||
}
|
||||
|
||||
void JoinThread (pthread_t threadId)
|
||||
{
|
||||
UNUSED(threadId);
|
||||
}
|
||||
|
||||
noPollCtx* nopoll_ctx_new(void)
|
||||
{
|
||||
function_called();
|
||||
@@ -202,16 +192,6 @@ pthread_mutex_t *get_global_nano_mut(void)
|
||||
return &nano_mut;
|
||||
}
|
||||
|
||||
pthread_cond_t *get_global_svc_con(void)
|
||||
{
|
||||
return &svc_con;
|
||||
}
|
||||
|
||||
pthread_mutex_t *get_global_svc_mut(void)
|
||||
{
|
||||
return &svc_mut;
|
||||
}
|
||||
|
||||
/*
|
||||
* Mock func to calculate time diff between start and stop time
|
||||
* This timespec_diff retuns 1 sec as diff time
|
||||
@@ -225,10 +205,6 @@ void timespec_diff(struct timespec *start, struct timespec *stop,
|
||||
diff->tv_nsec = 1000;
|
||||
}
|
||||
|
||||
void deleteAllClients (void)
|
||||
{
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Tests */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
@@ -63,7 +63,6 @@ bool LastReasonStatus;
|
||||
pthread_mutex_t close_mut;
|
||||
|
||||
// Mock values
|
||||
bool g_shutdown = false;
|
||||
char *mock_server_addr;
|
||||
unsigned int mock_port;
|
||||
int mock_wait_status;
|
||||
@@ -516,58 +515,34 @@ void test_nopoll_connect ()
|
||||
test_server.allow_insecure = 1;
|
||||
will_return (nopoll_conn_new_opts, NULL);
|
||||
expect_function_call (nopoll_conn_new_opts);
|
||||
//will_return (checkHostIp, 0);
|
||||
//expect_function_call (checkHostIp);
|
||||
will_return (checkHostIp, 0);
|
||||
expect_function_call (checkHostIp);
|
||||
assert_int_equal (nopoll_connect (&ctx, true), 0);
|
||||
assert_ptr_equal(NULL, get_global_conn());
|
||||
|
||||
test_server.allow_insecure = 0;
|
||||
will_return (nopoll_conn_tls_new6, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new6);
|
||||
//will_return (checkHostIp, 0);
|
||||
//expect_function_call (checkHostIp);
|
||||
assert_int_equal (nopoll_connect (&ctx, true), 0);
|
||||
assert_ptr_equal(NULL, get_global_conn());
|
||||
|
||||
will_return (nopoll_conn_tls_new, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new);
|
||||
will_return (checkHostIp, 0);
|
||||
expect_function_call (checkHostIp);
|
||||
assert_int_equal (nopoll_connect (&ctx, false), 0);
|
||||
assert_int_equal (nopoll_connect (&ctx, true), 0);
|
||||
assert_ptr_equal(NULL, get_global_conn());
|
||||
|
||||
will_return (nopoll_conn_tls_new6, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new6);
|
||||
//will_return (checkHostIp, -2);
|
||||
//expect_function_call (checkHostIp);
|
||||
assert_int_equal (nopoll_connect (&ctx, true), 0);
|
||||
assert_ptr_equal(NULL, get_global_conn());
|
||||
|
||||
will_return (nopoll_conn_tls_new, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new);
|
||||
will_return (checkHostIp, -2);
|
||||
expect_function_call (checkHostIp);
|
||||
assert_int_equal (nopoll_connect (&ctx, false), 0);
|
||||
assert_int_equal (nopoll_connect (&ctx, true), 0);
|
||||
assert_ptr_equal(NULL, get_global_conn());
|
||||
|
||||
will_return (nopoll_conn_tls_new6, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new6);
|
||||
//will_return (checkHostIp, -2);
|
||||
//expect_function_call (checkHostIp);
|
||||
//ctx.connect_timer.start_time.tv_sec -= (15*60);
|
||||
//will_return(kill, 1);
|
||||
//expect_function_call(kill);
|
||||
assert_int_equal (nopoll_connect (&ctx, true), 0);
|
||||
assert_ptr_equal(NULL, get_global_conn());
|
||||
|
||||
will_return (nopoll_conn_tls_new, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new);
|
||||
will_return (checkHostIp, -2);
|
||||
expect_function_call (checkHostIp);
|
||||
ctx.connect_timer.start_time.tv_sec -= (15*60);
|
||||
will_return(kill, 1);
|
||||
expect_function_call(kill);
|
||||
assert_int_equal (nopoll_connect (&ctx, false), 0);
|
||||
assert_int_equal (nopoll_connect (&ctx, true), 0);
|
||||
assert_ptr_equal(NULL, get_global_conn());
|
||||
|
||||
init_expire_timer (&ctx.connect_timer);
|
||||
@@ -739,8 +714,8 @@ void test_connect_and_wait ()
|
||||
test_server.allow_insecure = 0;
|
||||
will_return (nopoll_conn_tls_new6, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new6);
|
||||
//will_return (checkHostIp, 0);
|
||||
//expect_function_call (checkHostIp);
|
||||
will_return (checkHostIp, 0);
|
||||
expect_function_call (checkHostIp);
|
||||
assert_int_equal (connect_and_wait (&ctx), CONN_WAIT_RETRY_DNS);
|
||||
|
||||
Cfg.flags = 0;
|
||||
@@ -748,8 +723,8 @@ void test_connect_and_wait ()
|
||||
|
||||
will_return (nopoll_conn_tls_new6, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new6);
|
||||
//will_return (checkHostIp, 0);
|
||||
//expect_function_call (checkHostIp);
|
||||
will_return (checkHostIp, 0);
|
||||
expect_function_call (checkHostIp);
|
||||
will_return (nopoll_conn_tls_new, NULL);
|
||||
expect_function_call (nopoll_conn_tls_new);
|
||||
will_return (checkHostIp, 0);
|
||||
|
||||
@@ -35,11 +35,9 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* File Scoped Variables */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
bool g_shutdown = false;
|
||||
extern CrudMsg *crudMsgQ;
|
||||
int numLoops = 1;
|
||||
wrp_msg_t *temp = NULL;
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Mocks */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
@@ -165,7 +165,7 @@ void test_retrieveFromMemory()
|
||||
assert_int_equal (ret, 0);
|
||||
ret = retrieveFromMemory("webpa-protocol", &jsonresponse );
|
||||
assert_int_equal (ret, 0);
|
||||
ret = retrieveFromMemory("webpa-interface-used", &jsonresponse );
|
||||
ret = retrieveFromMemory("webpa-inteface-used", &jsonresponse );
|
||||
assert_int_equal (ret, 0);
|
||||
ret = retrieveFromMemory("webpa-backoff-max", &jsonresponse );
|
||||
assert_int_equal (ret, 0);
|
||||
@@ -200,7 +200,7 @@ void test_retrieveFromMemoryFailure()
|
||||
assert_int_equal (ret, -1);
|
||||
ret = retrieveFromMemory("webpa-protocol", &jsonresponse );
|
||||
assert_int_equal (ret, -1);
|
||||
ret = retrieveFromMemory("webpa-interface-used", &jsonresponse );
|
||||
ret = retrieveFromMemory("webpa-inteface-used", &jsonresponse );
|
||||
assert_int_equal (ret, -1);
|
||||
ret = retrieveFromMemory("webpa-backoff-max", &jsonresponse );
|
||||
assert_int_equal (ret, 0);
|
||||
|
||||
@@ -61,10 +61,6 @@ reg_list_item_t * get_global_node(void)
|
||||
return mock_ptr_type(reg_list_item_t *);
|
||||
}
|
||||
|
||||
void release_global_node (void)
|
||||
{
|
||||
}
|
||||
|
||||
ssize_t wrp_to_struct( const void *bytes, const size_t length,
|
||||
const enum wrp_format fmt, wrp_msg_t **msg )
|
||||
{
|
||||
@@ -72,7 +68,7 @@ ssize_t wrp_to_struct( const void *bytes, const size_t length,
|
||||
function_called();
|
||||
*msg = (wrp_msg_t*) malloc(sizeof(wrp_msg_t));
|
||||
memset(*msg, 0, sizeof(wrp_msg_t));
|
||||
(*msg)->msg_type = WRP_MSG_TYPE__REQ;
|
||||
(*msg)->msg_type = WRP_MSG_TYPE__REQ;
|
||||
(*msg)->u.req.dest = (char *) malloc(sizeof(char) *100);
|
||||
(*msg)->u.req.partner_ids = (partners_t *) malloc(sizeof(partners_t));
|
||||
(*msg)->u.req.partner_ids->count = 1;
|
||||
@@ -83,10 +79,8 @@ ssize_t wrp_to_struct( const void *bytes, const size_t length,
|
||||
{
|
||||
(*msg)->msg_type = WRP_MSG_TYPE__CREATE;
|
||||
parStrncpy((*msg)->u.crud.dest,"mac:1122334455/parodus", 100);
|
||||
(*msg)->u.crud.source = (char *) malloc(sizeof(char) *40);
|
||||
parStrncpy ((*msg)->u.crud.source, "tag-update", 40);
|
||||
(*msg)->u.crud.transaction_uuid = (char *) malloc(sizeof(char) *40);
|
||||
parStrncpy ((*msg)->u.crud.transaction_uuid, "1234", 40);
|
||||
(*msg)->u.crud.source = "tag-update";
|
||||
(*msg)->u.crud.transaction_uuid = "1234";
|
||||
}
|
||||
return (ssize_t) mock();
|
||||
}
|
||||
|
||||
@@ -244,10 +244,6 @@ reg_list_item_t *get_global_node(void)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void release_global_node (void)
|
||||
{
|
||||
}
|
||||
|
||||
void wrp_free_struct( wrp_msg_t *msg )
|
||||
{
|
||||
if( WRP_MSG_TYPE__EVENT == tests[i].s.msg_type ) {
|
||||
|
||||
@@ -33,18 +33,6 @@ static void *keep_alive_thread();
|
||||
static void add_client();
|
||||
int sock1;
|
||||
pthread_t threadId;
|
||||
pthread_mutex_t crud_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_cond_t crud_con=PTHREAD_COND_INITIALIZER;
|
||||
|
||||
pthread_cond_t *get_global_crud_con(void)
|
||||
{
|
||||
return &crud_con;
|
||||
}
|
||||
|
||||
pthread_mutex_t *get_global_crud_mut(void)
|
||||
{
|
||||
return &crud_mut;
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Tests */
|
||||
@@ -60,14 +48,12 @@ void *CRUDHandlerTask()
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void add_client()
|
||||
{
|
||||
const wrp_msg_t reg = { .msg_type = WRP_MSG_TYPE__SVC_REGISTRATION,
|
||||
.u.reg.service_name = "service_client",
|
||||
.u.reg.url = TEST_SERVICE_URL};
|
||||
|
||||
pthread_t test_tid;
|
||||
void *bytes;
|
||||
int size =0;
|
||||
int rv;
|
||||
@@ -85,7 +71,7 @@ static void add_client()
|
||||
ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name);
|
||||
ParodusPrint("decoded dest:%s\n", message->u.reg.url);
|
||||
|
||||
StartThread(client_rcv_task, &test_tid);
|
||||
StartThread(client_rcv_task);
|
||||
status = addToList(&message);
|
||||
ParodusPrint("addToList status is %d\n", status);
|
||||
|
||||
@@ -148,10 +134,7 @@ static void *keep_alive_thread()
|
||||
//ParodusPrint("keep_alive threadId is %d\n", threadId);
|
||||
sleep(2);
|
||||
ParodusPrint("Starting serviceAliveTask..\n");
|
||||
while (true) {
|
||||
serviceAliveTask();
|
||||
sleep (30);
|
||||
}
|
||||
serviceAliveTask();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -23,8 +23,6 @@
|
||||
#include "../src/ParodusInternal.h"
|
||||
#include "../src/spin_thread.h"
|
||||
|
||||
pthread_t test_tid;
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Mocks */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -56,7 +54,7 @@ void *_routine(void *v)
|
||||
|
||||
void test_StartThread_error()
|
||||
{
|
||||
StartThread(&_routine, &test_tid);
|
||||
StartThread(&_routine);
|
||||
}
|
||||
|
||||
void add_suites( CU_pSuite *suite )
|
||||
|
||||
@@ -23,8 +23,6 @@
|
||||
#include "../src/ParodusInternal.h"
|
||||
#include "../src/spin_thread.h"
|
||||
|
||||
pthread_t test_tid;
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* Mocks */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -46,7 +44,7 @@ void *_routine(void *v)
|
||||
|
||||
void test_StartThread_success()
|
||||
{
|
||||
StartThread(&_routine, &test_tid);
|
||||
StartThread(&_routine);
|
||||
}
|
||||
|
||||
void add_suites( CU_pSuite *suite )
|
||||
|
||||
@@ -30,7 +30,6 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* File Scoped Variables */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
bool g_shutdown = false;
|
||||
ParodusMsg *ParodusMsgQ;
|
||||
pthread_mutex_t g_mutex;
|
||||
pthread_cond_t g_cond;
|
||||
|
||||
@@ -158,19 +158,6 @@ extern void read_key_from_file (const char *fname, char *buf, size_t buflen);
|
||||
extern const char *get_tok (const char *src, int delim, char *result, int resultsize);
|
||||
extern unsigned int get_algo_mask (const char *algo_str);
|
||||
|
||||
pthread_mutex_t crud_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_cond_t crud_con=PTHREAD_COND_INITIALIZER;
|
||||
|
||||
pthread_cond_t *get_global_crud_con(void)
|
||||
{
|
||||
return &crud_con;
|
||||
}
|
||||
|
||||
pthread_mutex_t *get_global_crud_mut(void)
|
||||
{
|
||||
return &crud_mut;
|
||||
}
|
||||
|
||||
void addCRUDmsgToQueue(wrp_msg_t *crudMsg)
|
||||
{
|
||||
(void)crudMsg;
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
#include <cmocka.h>
|
||||
#include <assert.h>
|
||||
#include <wrp-c.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "../src/token.h"
|
||||
|
||||
|
||||
@@ -31,20 +31,6 @@
|
||||
/* Mocks */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
|
||||
|
||||
pthread_mutex_t crud_mut=PTHREAD_MUTEX_INITIALIZER;
|
||||
pthread_cond_t crud_con=PTHREAD_COND_INITIALIZER;
|
||||
|
||||
pthread_cond_t *get_global_crud_con(void)
|
||||
{
|
||||
return &crud_con;
|
||||
}
|
||||
|
||||
pthread_mutex_t *get_global_crud_mut(void)
|
||||
{
|
||||
return &crud_mut;
|
||||
}
|
||||
|
||||
void addCRUDmsgToQueue(wrp_msg_t *crudMsg)
|
||||
{
|
||||
(void)crudMsg;
|
||||
|
||||
@@ -38,12 +38,10 @@
|
||||
/*----------------------------------------------------------------------------*/
|
||||
static noPollConn *conn;
|
||||
static char *reconnect_reason = "webpa_process_starts";
|
||||
bool g_shutdown = false;
|
||||
static ParodusCfg parodusCfg;
|
||||
extern size_t metaPackSize;
|
||||
extern UpStreamMsg *UpStreamMsgQ;
|
||||
int numLoops = 1;
|
||||
int deviceIDNull =0;
|
||||
wrp_msg_t *temp = NULL;
|
||||
extern pthread_mutex_t nano_mut;
|
||||
extern pthread_cond_t nano_con;
|
||||
@@ -68,10 +66,6 @@ reg_list_item_t * get_global_node(void)
|
||||
return mock_ptr_type(reg_list_item_t *);
|
||||
}
|
||||
|
||||
void release_global_node (void)
|
||||
{
|
||||
}
|
||||
|
||||
int get_numOfClients()
|
||||
{
|
||||
function_called();
|
||||
@@ -98,21 +92,8 @@ void sendMessage(noPollConn *conn, void *msg, size_t len)
|
||||
function_called();
|
||||
}
|
||||
|
||||
void set_parodus_cfg(ParodusCfg *cfg)
|
||||
{
|
||||
memcpy(&parodusCfg, cfg, sizeof(ParodusCfg));
|
||||
}
|
||||
|
||||
ParodusCfg *get_parodus_cfg(void)
|
||||
{
|
||||
ParodusCfg cfg;
|
||||
memset(&cfg,0,sizeof(cfg));
|
||||
parStrncpy(cfg.hw_mac , "14cfe2142xxx", sizeof(cfg.hw_mac));
|
||||
if(deviceIDNull)
|
||||
{
|
||||
parStrncpy(cfg.hw_mac , "", sizeof(cfg.hw_mac));
|
||||
}
|
||||
set_parodus_cfg(&cfg);
|
||||
return &parodusCfg;
|
||||
}
|
||||
|
||||
@@ -215,13 +196,6 @@ int nn_shutdown (int s, int how)
|
||||
return (int)mock();
|
||||
}
|
||||
|
||||
int nn_close (int s)
|
||||
{
|
||||
UNUSED(s);
|
||||
function_called();
|
||||
return (int)mock();
|
||||
}
|
||||
|
||||
int nn_setsockopt (int s, int level, int option, const void *optval, size_t optvallen)
|
||||
{
|
||||
UNUSED(s); UNUSED(level); UNUSED(option); UNUSED(optval); UNUSED(optvallen);
|
||||
@@ -267,16 +241,10 @@ void test_handleUpstreamNull()
|
||||
UpStreamMsgQ = NULL;
|
||||
will_return(nn_socket, 1);
|
||||
expect_function_call(nn_socket);
|
||||
will_return(nn_setsockopt, 0);
|
||||
expect_function_call(nn_setsockopt);
|
||||
will_return(nn_bind, 1);
|
||||
expect_function_call(nn_bind);
|
||||
will_return(nn_recv, 12);
|
||||
expect_function_call(nn_recv);
|
||||
will_return(nn_shutdown, 0);
|
||||
expect_function_call(nn_shutdown);
|
||||
will_return(nn_close, 0);
|
||||
expect_function_call(nn_close);
|
||||
handle_upstream();
|
||||
}
|
||||
|
||||
@@ -292,16 +260,10 @@ void test_handle_upstream()
|
||||
UpStreamMsgQ->next->next = NULL;
|
||||
will_return(nn_socket, 1);
|
||||
expect_function_call(nn_socket);
|
||||
will_return(nn_setsockopt, 0);
|
||||
expect_function_call(nn_setsockopt);
|
||||
will_return(nn_bind, 1);
|
||||
expect_function_call(nn_bind);
|
||||
will_return(nn_recv, 12);
|
||||
expect_function_call(nn_recv);
|
||||
will_return(nn_shutdown, 0);
|
||||
expect_function_call(nn_shutdown);
|
||||
will_return(nn_close, 0);
|
||||
expect_function_call(nn_close);
|
||||
handle_upstream();
|
||||
free(UpStreamMsgQ->next);
|
||||
free(UpStreamMsgQ);
|
||||
@@ -311,8 +273,6 @@ void err_handleUpstreamBindFailure()
|
||||
{
|
||||
will_return(nn_socket, 1);
|
||||
expect_function_call(nn_socket);
|
||||
will_return(nn_setsockopt, 0);
|
||||
expect_function_call(nn_setsockopt);
|
||||
will_return(nn_bind, -1);
|
||||
expect_function_call(nn_bind);
|
||||
handle_upstream();
|
||||
@@ -454,18 +414,15 @@ void test_processUpstreamMessageRegMsg()
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
|
||||
will_return(get_global_node, (intptr_t)head);
|
||||
expect_function_call(get_global_node);
|
||||
|
||||
will_return(get_numOfClients, 1);
|
||||
expect_function_call(get_numOfClients);
|
||||
|
||||
will_return(get_global_node, (intptr_t)head);
|
||||
expect_function_call(get_global_node);
|
||||
|
||||
will_return(nn_shutdown, 1);
|
||||
expect_function_call(nn_shutdown);
|
||||
|
||||
will_return(nn_close, 0);
|
||||
expect_function_call(nn_close);
|
||||
|
||||
will_return(nn_socket, 1);
|
||||
expect_function_call(nn_socket);
|
||||
|
||||
@@ -516,9 +473,6 @@ void test_processUpstreamMessageRegMsgNoClients()
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
|
||||
will_return(get_global_node, (intptr_t)head);
|
||||
expect_function_call(get_global_node);
|
||||
|
||||
will_return(get_numOfClients, 0);
|
||||
expect_function_call(get_numOfClients);
|
||||
|
||||
@@ -616,27 +570,21 @@ void err_processUpstreamMessageRegMsg()
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
|
||||
will_return(get_global_node, (intptr_t)head);
|
||||
expect_function_call(get_global_node);
|
||||
|
||||
will_return(get_numOfClients, 1);
|
||||
expect_function_call(get_numOfClients);
|
||||
|
||||
will_return(get_global_node, (intptr_t)head);
|
||||
expect_function_call(get_global_node);
|
||||
|
||||
will_return(nn_shutdown, -1);
|
||||
expect_function_call(nn_shutdown);
|
||||
|
||||
will_return(nn_close, 0);
|
||||
expect_function_call(nn_close);
|
||||
|
||||
will_return(nn_socket, -1);
|
||||
expect_function_call(nn_socket);
|
||||
|
||||
will_return(nn_shutdown, 1);
|
||||
expect_function_call(nn_shutdown);
|
||||
|
||||
will_return(nn_close, 0);
|
||||
expect_function_call(nn_close);
|
||||
|
||||
will_return(nn_socket, 1);
|
||||
expect_function_call(nn_socket);
|
||||
|
||||
@@ -778,16 +726,16 @@ void test_processUpstreamMsg_cloud_status()
|
||||
|
||||
void test_processUpstreamMsg_sendToClient()
|
||||
{
|
||||
numLoops = 2;
|
||||
metaPackSize = 20;
|
||||
numLoops = 2;
|
||||
metaPackSize = 20;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = strdup("First Message");
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
UpStreamMsgQ->next = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->next->msg = strdup("Second Message");
|
||||
UpStreamMsgQ->next->len = 15;
|
||||
UpStreamMsgQ->next->next = NULL;
|
||||
UpStreamMsgQ->next->msg = strdup("Second Message");
|
||||
UpStreamMsgQ->next->len = 15;
|
||||
UpStreamMsgQ->next->next = NULL;
|
||||
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
@@ -810,66 +758,86 @@ void test_processUpstreamMsg_sendToClient()
|
||||
|
||||
expect_function_call(wrp_free_struct);
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
}
|
||||
|
||||
void test_processUpstreamMsg_serviceNameNULL()
|
||||
void test_processUpstreamMessageNullCheck()
|
||||
{
|
||||
numLoops = 1;
|
||||
metaPackSize = 20;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = strdup("First Message");
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
UpStreamMsgQ->next = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->next->msg = strdup("Second Message");
|
||||
UpStreamMsgQ->next->len = 15;
|
||||
UpStreamMsgQ->next->next = NULL;
|
||||
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
temp->msg_type = 6;
|
||||
temp->u.crud.dest = strdup("mac:14cfe2142xxx/");
|
||||
temp->u.crud.source = strdup("mac:14cfe2142xxx/parodus/cloud-status");
|
||||
temp->u.crud.transaction_uuid = strdup("123");
|
||||
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
expect_function_call(wrp_free_struct);
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
numLoops = 1;
|
||||
metaPackSize = 20;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = strdup("First Message");
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
temp->msg_type = WRP_MSG_TYPE__RETREIVE;
|
||||
temp->u.crud.dest = strdup("mac:14cfe2142xxx/parodus/cloud-status");
|
||||
temp->u.crud.source = strdup("mac:14cfe2142xxx/config");
|
||||
temp->u.crud.transaction_uuid = strdup("123");
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
expect_function_call(addCRUDmsgToQueue);
|
||||
will_return(nn_freemsg, 0);
|
||||
expect_function_call(nn_freemsg);
|
||||
expect_function_call(wrp_free_struct);
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
}
|
||||
|
||||
void err_processUpstreamMsg_deviceID()
|
||||
void err_processUpstreamMessageNullCheck()
|
||||
{
|
||||
numLoops = 1;
|
||||
metaPackSize = 20;
|
||||
deviceIDNull = 1;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = "First Message";
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
temp->msg_type = 6;
|
||||
temp->u.crud.dest = "mac:14cfe2142xxx/parodus/cloud-status";
|
||||
temp->u.crud.source = "mac:14cfe2142xxx/config";
|
||||
temp->u.crud.transaction_uuid = "123";
|
||||
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
will_return(nn_freemsg, 0);
|
||||
expect_function_call(nn_freemsg);
|
||||
expect_function_call(wrp_free_struct);
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
numLoops = 1;
|
||||
metaPackSize = 20;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = strdup("First Message");
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
temp->msg_type = WRP_MSG_TYPE__RETREIVE;
|
||||
temp->u.crud.dest = strdup("mac:/parodus/cloud-status");
|
||||
temp->u.crud.source = strdup("mac:14cfe2142xxx/config");
|
||||
temp->u.crud.transaction_uuid = strdup("123");
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
will_return(nn_freemsg, 0);
|
||||
expect_function_call(nn_freemsg);
|
||||
expect_function_call(wrp_free_struct);
|
||||
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
}
|
||||
void err_processUpstreamMessageWithoutMac()
|
||||
{
|
||||
numLoops = 1;
|
||||
metaPackSize = 20;
|
||||
UpStreamMsgQ = (UpStreamMsg *) malloc(sizeof(UpStreamMsg));
|
||||
UpStreamMsgQ->msg = strdup("First Message");
|
||||
UpStreamMsgQ->len = 13;
|
||||
UpStreamMsgQ->next= NULL;
|
||||
temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t));
|
||||
memset(temp,0,sizeof(wrp_msg_t));
|
||||
temp->msg_type = WRP_MSG_TYPE__RETREIVE;
|
||||
temp->u.crud.dest = strdup("/parodus/cloud-status");
|
||||
temp->u.crud.source = strdup("mac:14cfe2142xxx/config");
|
||||
temp->u.crud.transaction_uuid = strdup("123");
|
||||
will_return(wrp_to_struct, 12);
|
||||
expect_function_call(wrp_to_struct);
|
||||
will_return(nn_freemsg, 0);
|
||||
expect_function_call(nn_freemsg);
|
||||
expect_function_call(wrp_free_struct);
|
||||
|
||||
processUpstreamMessage();
|
||||
free(temp);
|
||||
free(UpStreamMsgQ);
|
||||
UpStreamMsgQ = NULL;
|
||||
}
|
||||
|
||||
/*----------------------------------------------------------------------------*/
|
||||
/* External Functions */
|
||||
/*----------------------------------------------------------------------------*/
|
||||
@@ -902,8 +870,9 @@ int main(void)
|
||||
cmocka_unit_test(test_processUpstreamMsgCrud_nnfree),
|
||||
cmocka_unit_test(test_processUpstreamMsg_cloud_status),
|
||||
cmocka_unit_test(test_processUpstreamMsg_sendToClient),
|
||||
cmocka_unit_test(test_processUpstreamMsg_serviceNameNULL),
|
||||
cmocka_unit_test(err_processUpstreamMsg_deviceID)
|
||||
cmocka_unit_test(test_processUpstreamMessageNullCheck),
|
||||
cmocka_unit_test(err_processUpstreamMessageNullCheck),
|
||||
cmocka_unit_test(err_processUpstreamMessageWithoutMac),
|
||||
};
|
||||
|
||||
return cmocka_run_group_tests(tests, NULL, NULL);
|
||||
|
||||
Reference in New Issue
Block a user