properly shut down tasks, and fix memory leaks

This commit is contained in:
Bill Williams
2019-01-07 14:24:55 -08:00
parent 8a61e66456
commit 22feddeadc
26 changed files with 198 additions and 48 deletions

View File

@@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- used mutex protection to make client list and nn_sends thread safe
- put mutex lock into get_global_node
- change svc alive from a thread to a function called every 30 sec from main
- shut down tasks properly
- fix memory leaks
## [1.0.1] - 2018-07-18
### Added

View File

@@ -23,6 +23,7 @@ set(INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR}/_install)
set(PREFIX_DIR ${CMAKE_CURRENT_BINARY_DIR}/_prefix)
set(INCLUDE_DIR ${INSTALL_DIR}/include)
set(INCLUDE_UCRESOLV ${PREFIX_DIR}/ucresolv/src/ucresolv/include)
set(NOPOLL_SRC ${PREFIX_DIR}/nopoll/src/nopoll/src)
set(PATCHES_DIR ${CMAKE_CURRENT_SOURCE_DIR}/patches)
set(LIBRARY_DIR ${INSTALL_DIR}/lib)
set(LIBRARY_DIR64 ${INSTALL_DIR}/lib64)
@@ -85,6 +86,7 @@ ExternalProject_Add(nopoll
PREFIX ${PREFIX_DIR}/nopoll
GIT_REPOSITORY https://github.com/Comcast/nopoll.git
GIT_TAG "1.0.1"
PATCH_COMMAND patch ${NOPOLL_SRC}/nopoll_conn.c < ${PATCHES_DIR}/nopoll_conn.patch
CONFIGURE_COMMAND COMMAND <SOURCE_DIR>/autogen.sh --prefix=${PREFIX}
--includedir=${INCLUDE_DIR}
--libdir=${LIBRARY_DIR}
@@ -190,8 +192,9 @@ endif (ENABLE_SESHAT)
#-------------------------------------------------------------------------------
ExternalProject_Add(cjwt
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/cjwt
GIT_REPOSITORY https://github.com/Comcast/cjwt.git
GIT_TAG "1.0.1"
GIT_REPOSITORY https://github.com/bill1600/cjwt.git
GIT_TAG "7b3240f9278072ce0ec64c79fdf2bc38e20bda06"
#GIT_TAG "1.0.1"
CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF
)
add_library(libcjwt STATIC SHARED IMPORTED)

View File

@@ -50,7 +50,8 @@
/* Macros */
/*----------------------------------------------------------------------------*/
#define UNUSED(x) (void )(x)
#define NANOMSG_SOCKET_TIMEOUT_MSEC 2000
#define NANO_SOCKET_SEND_TIMEOUT_MS 2000
#define NANO_SOCKET_RCV_TIMEOUT_MS 500
#ifndef TEST
#define FOREVER() 1
@@ -133,6 +134,7 @@ typedef struct {
/*----------------------------------------------------------------------------*/
/* File Scoped Variables */
/*----------------------------------------------------------------------------*/
extern bool g_shutdown;
extern ParodusMsg *ParodusMsgQ;
int numLoops;
/*----------------------------------------------------------------------------*/

View File

@@ -65,7 +65,7 @@ int addToList( wrp_msg_t **msg)
ParodusPrint("sock created for adding entries to list: %d\n", sock);
if(sock >= 0)
{
int t = NANOMSG_SOCKET_TIMEOUT_MSEC;
int t = NANO_SOCKET_SEND_TIMEOUT_MS;
rc = nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDTIMEO, &t, sizeof(t));
if(rc < 0)
{
@@ -234,6 +234,22 @@ int deleteFromList(char* service_name)
return -1;
}
void deleteAllClients (void)
{
reg_list_item_t *next_node = NULL;
while (NULL != g_head)
{
next_node = g_head->next;
free (g_head);
g_head = next_node;
}
if (numOfClients > 0) {
ParodusInfo ("Deleted %d clients\n", numOfClients);
numOfClients = 0;
}
}
/*
*@dest : Client destination to send message
*@Msg: Msg to send it to client (No free done here), user responsibilites to free the msg

View File

@@ -50,6 +50,7 @@ int sendAuthStatus(reg_list_item_t *new_node);
int deleteFromList(char* service_name);
int get_numOfClients();
void deleteAllClients (void);
int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize);
reg_list_item_t * get_global_node(void);

View File

@@ -52,6 +52,11 @@
/* File Scoped Variables */
/*----------------------------------------------------------------------------*/
bool g_shutdown = false;
pthread_t upstream_tid;
pthread_t upstream_msg_tid;
pthread_t downstream_tid;
pthread_t svc_alive_tid;
pthread_t crud_tid;
/*----------------------------------------------------------------------------*/
/* Function Prototypes */
@@ -94,13 +99,13 @@ void createSocketConnection(void (* initKeypress)())
}
packMetaData();
UpStreamMsgQ = NULL;
StartThread(handle_upstream);
StartThread(processUpstreamMessage);
UpStreamMsgQ = NULL;
StartThread(handle_upstream, &upstream_tid);
StartThread(processUpstreamMessage, &upstream_msg_tid);
ParodusMsgQ = NULL;
StartThread(messageHandlerTask);
/* StartThread(serviceAliveTask); */
StartThread(CRUDHandlerTask);
StartThread(messageHandlerTask, &downstream_tid);
/*StartThread(serviceAliveTask, &svc_alive_tid;);*/
StartThread(CRUDHandlerTask, &crud_tid);
if (NULL != initKeypress)
{
@@ -180,6 +185,24 @@ void createSocketConnection(void (* initKeypress)())
}
} while(!get_close_retry() && !g_shutdown);
pthread_mutex_lock (get_global_crud_mut());
pthread_cond_signal (get_global_crud_con());
pthread_mutex_unlock (get_global_crud_mut());
pthread_mutex_lock (&g_mutex);
pthread_cond_signal (&g_cond);
pthread_mutex_unlock (&g_mutex);
pthread_mutex_lock (get_global_nano_mut ());
pthread_cond_signal (get_global_nano_con());
pthread_mutex_unlock (get_global_nano_mut());
ParodusInfo ("joining threads\n");
pthread_join (upstream_tid, NULL);
pthread_join (downstream_tid, NULL);
pthread_join (upstream_msg_tid, NULL);
pthread_join (crud_tid, NULL);
deleteAllClients ();
close_and_unref_connection(get_global_conn());
nopoll_ctx_unref(ctx);
nopoll_cleanup_library();

View File

@@ -385,7 +385,7 @@ int nopoll_connect (create_connection_ctx_t *ctx, int is_ipv6)
NULL, default_url,NULL,NULL);
}
}
if (NULL == connection) {
if ((NULL == connection) && (!is_ipv6)) {
if((checkHostIp(server->server_addr) == -2)) {
if (check_timer_expired (&ctx->connect_timer, 15*60*1000)) {
ParodusError("WebPA unable to connect due to DNS resolving to 10.0.0.1 for over 15 minutes; crashing service.\n");
@@ -407,6 +407,8 @@ int nopoll_connect (create_connection_ctx_t *ctx, int is_ipv6)
#define WAIT_ACTION_RETRY 1 // if wait_status is 307, 302, 303 or 403
#define WAIT_FAIL 2
#define FREE_NON_NULL_PTR(ptr) if (NULL != ptr) free(ptr)
int wait_connection_ready (create_connection_ctx_t *ctx)
{
int wait_status;
@@ -414,7 +416,10 @@ int wait_connection_ready (create_connection_ctx_t *ctx)
if(nopoll_conn_wait_for_status_until_connection_ready(get_global_conn(), 10,
&wait_status, &redirectURL))
{
FREE_NON_NULL_PTR (redirectURL);
return WAIT_SUCCESS;
}
if(wait_status == 307 || wait_status == 302 || wait_status == 303) // only when there is a http redirect
{
char *redirect_ptr = redirectURL;
@@ -432,9 +437,7 @@ int wait_connection_ready (create_connection_ctx_t *ctx)
set_current_server (ctx); // set current server to redirect server
return WAIT_ACTION_RETRY;
}
if (NULL != redirectURL) {
free (redirectURL);
}
FREE_NON_NULL_PTR (redirectURL);
if(wait_status == 403)
{
ParodusError("Received Unauthorized response with status: %d\n", wait_status);
@@ -557,7 +560,7 @@ int createNopollConnection(noPollCtx *ctx)
set_server_list_null (&conn_ctx.server_list);
init_backoff_timer (&backoff_timer, max_retry_count);
while (true)
while (!g_shutdown)
{
query_dns_status = find_servers (&conn_ctx.server_list);
if (query_dns_status == FIND_INVALID_DEFAULT)

View File

@@ -44,6 +44,16 @@ CrudMsg *crudMsgQ = NULL;
/* External functions */
/*----------------------------------------------------------------------------*/
pthread_cond_t *get_global_crud_con(void)
{
return &crud_con;
}
pthread_mutex_t *get_global_crud_mut(void)
{
return &crud_mut;
}
void addCRUDmsgToQueue(wrp_msg_t *crudMsg)
{
CrudMsg * crudMessage;
@@ -122,6 +132,10 @@ void *CRUDHandlerTask()
}
else
{
if (g_shutdown) {
pthread_mutex_unlock (&crud_mut);
break;
}
pthread_cond_wait(&crud_con, &crud_mut);
pthread_mutex_unlock (&crud_mut);
}

View File

@@ -24,6 +24,8 @@
#ifndef _CRUD_INTERFACE_H_
#define _CRUD_INTERFACE_H_
#include <pthread.h>
#ifdef __cplusplus
extern "C" {
#endif
@@ -43,6 +45,8 @@ typedef struct CrudMsg__
/*----------------------------------------------------------------------------*/
void addCRUDresponseToUpstreamQ(void *response_bytes, ssize_t response_size);
pthread_cond_t *get_global_crud_con(void);
pthread_mutex_t *get_global_crud_mut(void);
#ifdef __cplusplus
}

View File

@@ -75,7 +75,6 @@ void listenerOnMessage(void * msg, size_t msgSize)
case WRP_MSG_TYPE__AUTH:
{
ParodusInfo("Authorization Status received with Status code :%d\n", message->u.auth.status);
wrp_free_struct(message);
break;
}
@@ -142,9 +141,9 @@ void listenerOnMessage(void * msg, size_t msgSize)
}
release_global_node ();
/* check Downstream dest for CRUD requests */
if(destFlag ==0 && strcmp("parodus", dest)==0)
{
/* check Downstream dest for CRUD requests */
if(destFlag ==0 && strcmp("parodus", dest)==0)
{
ParodusPrint("Received CRUD request : dest : %s\n", dest);
if ((message->u.crud.source == NULL) || (message->u.crud.transaction_uuid == NULL))
{
@@ -159,8 +158,8 @@ void listenerOnMessage(void * msg, size_t msgSize)
addCRUDmsgToQueue(message);
}
destFlag =1;
}
//if any unknown dest received sending error response to server
}
//if any unknown dest received sending error response to server
if(destFlag ==0)
{
ParodusError("Unknown dest:%s\n", dest);
@@ -186,14 +185,14 @@ void listenerOnMessage(void * msg, size_t msgSize)
else
{
resp_msg ->u.crud.source = message->u.crud.dest;
if(message->u.crud.source !=NULL)
{
resp_msg ->u.crud.dest = message->u.crud.source;
}
else
{
resp_msg ->u.crud.dest = "unknown";
}
if(message->u.crud.source !=NULL)
{
resp_msg ->u.crud.dest = message->u.crud.source;
}
else
{
resp_msg ->u.crud.dest = "unknown";
}
resp_msg ->u.crud.transaction_uuid = message->u.crud.transaction_uuid;
resp_msg ->u.crud.path = message->u.crud.path;
}
@@ -228,7 +227,6 @@ void listenerOnMessage(void * msg, size_t msgSize)
}
free(resp_msg);
ParodusPrint("free for downstream decoded msg\n");
wrp_free_struct(message);
}
break;
}
@@ -237,9 +235,9 @@ void listenerOnMessage(void * msg, size_t msgSize)
case WRP_MSG_TYPE__SVC_ALIVE:
case WRP_MSG_TYPE__UNKNOWN:
default:
wrp_free_struct(message);
break;
}
wrp_free_struct(message);
}
else
{

View File

@@ -99,5 +99,6 @@ int serviceAliveTask()
ParodusInfo("No clients are registered, waiting ..\n");
}
}
free (svc_bytes);
return 0;
}

View File

@@ -23,7 +23,6 @@
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "spin_thread.h"
#include "parodus_log.h"
@@ -31,12 +30,12 @@
/*----------------------------------------------------------------------------*/
/* External Functions */
/*----------------------------------------------------------------------------*/
void StartThread(void *(*start_routine) (void *))
void StartThread(void *(*start_routine) (void *), pthread_t *threadId)
{
int err = 0;
pthread_t threadId;
pthread_t __threadId;
err = pthread_create(&threadId, NULL, start_routine, NULL);
err = pthread_create(&__threadId, NULL, start_routine, NULL);
if (err != 0)
{
ParodusError("Error creating thread :[%s]\n", strerror(err));
@@ -44,7 +43,8 @@ void StartThread(void *(*start_routine) (void *))
}
else
{
ParodusPrint("Thread created Successfully %lu\n", (unsigned long) threadId);
*threadId = __threadId;
ParodusPrint("Thread created Successfully %lu\n", (unsigned long) __threadId);
}
}

View File

@@ -24,6 +24,8 @@
#ifndef _SPIN_THREAD_H_
#define _SPIN_THREAD_H_
#include <pthread.h>
#ifdef __cplusplus
extern "C" {
#endif
@@ -32,7 +34,7 @@ extern "C" {
/* Function Prototypes */
/*----------------------------------------------------------------------------*/
void StartThread(void *(*start_routine) (void *));
void StartThread(void *(*start_routine) (void *), pthread_t *threadId);
#ifdef __cplusplus

View File

@@ -43,6 +43,10 @@ void *messageHandlerTask()
}
else
{
if (g_shutdown) {
pthread_mutex_unlock (&g_mutex);
break;
}
ParodusPrint("Before pthread cond wait in consumer thread\n");
pthread_cond_wait(&g_cond, &g_mutex);
pthread_mutex_unlock (&g_mutex);

View File

@@ -527,7 +527,9 @@ int allow_insecure_conn(char **server_addr, unsigned int *port)
}
if (insecure >= 0) {
ParodusInfo ("JWT claims: %s\n", cJSON_Print (jwt->private_claims));
char *claim_str = cJSON_Print (jwt->private_claims);
ParodusInfo ("JWT claims: %s\n", claim_str);
free (claim_str);
}
cjwt_destroy(&jwt);

View File

@@ -129,6 +129,12 @@ void *handle_upstream()
sock = nn_socket( AF_SP, NN_PULL );
if(sock >= 0)
{
int t = NANO_SOCKET_RCV_TIMEOUT_MS;
int rc = nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &t, sizeof(t));
if (rc < 0)
{
ParodusError ("Unable to set socket receive timeout (errno=%d, %s)\n",errno, strerror(errno));
}
ParodusPrint("Nanomsg bind with get_parodus_cfg()->local_url %s\n", get_parodus_cfg()->local_url);
bind = nn_bind(sock, get_parodus_cfg()->local_url);
if(bind < 0)
@@ -137,11 +143,18 @@ void *handle_upstream()
}
else
{
ParodusInfo("nanomsg server gone into the listening mode...\n");
while( FOREVER() )
{
buf = NULL;
ParodusInfo("nanomsg server gone into the listening mode...\n");
bytes = nn_recv (sock, &buf, NN_MSG, 0);
if (g_shutdown)
break;
if (bytes < 0) {
if ((errno != EAGAIN) && (errno != ETIMEDOUT))
ParodusInfo ("Error (%d) receiving message from nanomsg client\n", errno);
continue;
}
ParodusInfo ("Upstream message received from nanomsg client\n");
message = (UpStreamMsg *)malloc(sizeof(UpStreamMsg));
@@ -245,11 +258,11 @@ void *processUpstreamMessage()
temp->sock = nn_socket(AF_SP,NN_PUSH );
if(temp->sock >= 0)
{
int t = NANOMSG_SOCKET_TIMEOUT_MSEC;
int t = NANO_SOCKET_SEND_TIMEOUT_MS;
rc = nn_setsockopt(temp->sock, NN_SOL_SOCKET, NN_SNDTIMEO, &t, sizeof(t));
if(rc < 0)
{
ParodusError ("Unable to set socket timeout (errno=%d, %s)\n",errno, strerror(errno));
ParodusError ("Unable to set socket send timeout (errno=%d, %s)\n",errno, strerror(errno));
}
rc = nn_connect(temp->sock, msg->u.reg.url);
if(rc < 0)
@@ -295,6 +308,7 @@ void *processUpstreamMessage()
{
ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
partners_t *partnersList = NULL;
int j = 0;
int ret = validate_partner_id(msg, &partnersList);
if(ret == 1)
@@ -323,6 +337,18 @@ void *processUpstreamMessage()
{
sendUpstreamMsgToServer(&message->msg, message->len);
}
if(partnersList != NULL)
{
for(j=0; j<(int)partnersList->count; j++)
{
if(NULL != partnersList->partner_ids[j])
{
free(partnersList->partner_ids[j]);
}
}
free(partnersList);
}
partnersList = NULL;
}
else
{
@@ -451,6 +477,10 @@ void *processUpstreamMessage()
}
else
{
if (g_shutdown) {
pthread_mutex_unlock (&nano_mut);
break;
}
ParodusPrint("Before pthread cond wait in consumer thread\n");
pthread_cond_wait(&nano_con, &nano_mut);
pthread_mutex_unlock (&nano_mut);

View File

@@ -26,6 +26,9 @@
#define TEST_CLIENT1_URL "tcp://127.0.0.1:6677"
#define TEST_CLIENT2_URL "tcp://127.0.0.1:6655"
pthread_t test_tid;
pthread_t test_tid2;
static void *client_rcv_task();
static void *client2_rcv_task();
@@ -58,7 +61,7 @@ void test_client_addtolist()
ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name);
ParodusPrint("decoded dest:%s\n", message->u.reg.url);
StartThread(client_rcv_task);
StartThread(client_rcv_task, &test_tid);
status = addToList(&message);
ParodusPrint("addToList status is %d\n", status);
@@ -189,7 +192,7 @@ void test_addtolist_multiple_clients()
ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name);
ParodusPrint("decoded dest:%s\n", message->u.reg.url);
StartThread(client2_rcv_task);
StartThread(client2_rcv_task, &test_tid2);
status = addToList(&message);
ParodusPrint("addToList status is %d\n", status);

View File

@@ -35,8 +35,10 @@
/*----------------------------------------------------------------------------*/
UpStreamMsg *UpStreamMsgQ;
ParodusMsg *ParodusMsgQ;
pthread_mutex_t nano_mut;
pthread_cond_t nano_con;
pthread_mutex_t g_mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t g_cond=PTHREAD_COND_INITIALIZER;
pthread_mutex_t nano_mut=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t nano_con=PTHREAD_COND_INITIALIZER;
/*----------------------------------------------------------------------------*/
@@ -205,6 +207,10 @@ void timespec_diff(struct timespec *start, struct timespec *stop,
diff->tv_nsec = 1000;
}
void deleteAllClients (void)
{
}
/*----------------------------------------------------------------------------*/
/* Tests */
/*----------------------------------------------------------------------------*/

View File

@@ -63,6 +63,7 @@ bool LastReasonStatus;
pthread_mutex_t close_mut;
// Mock values
bool g_shutdown = false;
char *mock_server_addr;
unsigned int mock_port;
int mock_wait_status;

View File

@@ -35,9 +35,11 @@
/*----------------------------------------------------------------------------*/
/* File Scoped Variables */
/*----------------------------------------------------------------------------*/
bool g_shutdown = false;
extern CrudMsg *crudMsgQ;
int numLoops = 1;
wrp_msg_t *temp = NULL;
/*----------------------------------------------------------------------------*/
/* Mocks */
/*----------------------------------------------------------------------------*/

View File

@@ -33,6 +33,18 @@ static void *keep_alive_thread();
static void add_client();
int sock1;
pthread_t threadId;
pthread_mutex_t crud_mut=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t crud_con=PTHREAD_COND_INITIALIZER;
pthread_cond_t *get_global_crud_con(void)
{
return &crud_con;
}
pthread_mutex_t *get_global_crud_mut(void)
{
return &crud_mut;
}
/*----------------------------------------------------------------------------*/
/* Tests */
@@ -48,12 +60,14 @@ void *CRUDHandlerTask()
{
return NULL;
}
static void add_client()
{
const wrp_msg_t reg = { .msg_type = WRP_MSG_TYPE__SVC_REGISTRATION,
.u.reg.service_name = "service_client",
.u.reg.url = TEST_SERVICE_URL};
pthread_t test_tid;
void *bytes;
int size =0;
int rv;
@@ -71,7 +85,7 @@ static void add_client()
ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name);
ParodusPrint("decoded dest:%s\n", message->u.reg.url);
StartThread(client_rcv_task);
StartThread(client_rcv_task, &test_tid);
status = addToList(&message);
ParodusPrint("addToList status is %d\n", status);

View File

@@ -23,6 +23,8 @@
#include "../src/ParodusInternal.h"
#include "../src/spin_thread.h"
pthread_t test_tid;
/*----------------------------------------------------------------------------*/
/* Mocks */
/*----------------------------------------------------------------------------*/
@@ -54,7 +56,7 @@ void *_routine(void *v)
void test_StartThread_error()
{
StartThread(&_routine);
StartThread(&_routine, &test_tid);
}
void add_suites( CU_pSuite *suite )

View File

@@ -23,6 +23,8 @@
#include "../src/ParodusInternal.h"
#include "../src/spin_thread.h"
pthread_t test_tid;
/*----------------------------------------------------------------------------*/
/* Mocks */
/*----------------------------------------------------------------------------*/
@@ -44,7 +46,7 @@ void *_routine(void *v)
void test_StartThread_success()
{
StartThread(&_routine);
StartThread(&_routine, &test_tid);
}
void add_suites( CU_pSuite *suite )

View File

@@ -30,6 +30,7 @@
/*----------------------------------------------------------------------------*/
/* File Scoped Variables */
/*----------------------------------------------------------------------------*/
bool g_shutdown = false;
ParodusMsg *ParodusMsgQ;
pthread_mutex_t g_mutex;
pthread_cond_t g_cond;

View File

@@ -158,6 +158,19 @@ extern void read_key_from_file (const char *fname, char *buf, size_t buflen);
extern const char *get_tok (const char *src, int delim, char *result, int resultsize);
extern unsigned int get_algo_mask (const char *algo_str);
pthread_mutex_t crud_mut=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t crud_con=PTHREAD_COND_INITIALIZER;
pthread_cond_t *get_global_crud_con(void)
{
return &crud_con;
}
pthread_mutex_t *get_global_crud_mut(void)
{
return &crud_mut;
}
void addCRUDmsgToQueue(wrp_msg_t *crudMsg)
{
(void)crudMsg;

View File

@@ -38,6 +38,7 @@
/*----------------------------------------------------------------------------*/
static noPollConn *conn;
static char *reconnect_reason = "webpa_process_starts";
bool g_shutdown = false;
static ParodusCfg parodusCfg;
extern size_t metaPackSize;
extern UpStreamMsg *UpStreamMsgQ;