From 22feddeadc9b010237d91e1cc2ff6611ed8626aa Mon Sep 17 00:00:00 2001 From: Bill Williams Date: Mon, 7 Jan 2019 14:24:55 -0800 Subject: [PATCH] properly shut down tasks, and fix memory leaks --- CHANGELOG.md | 2 ++ CMakeLists.txt | 7 +++++-- src/ParodusInternal.h | 4 +++- src/client_list.c | 18 +++++++++++++++++- src/client_list.h | 1 + src/conn_interface.c | 35 +++++++++++++++++++++++++++++------ src/connection.c | 13 ++++++++----- src/crud_interface.c | 14 ++++++++++++++ src/crud_interface.h | 4 ++++ src/downstream.c | 30 ++++++++++++++---------------- src/service_alive.c | 1 + src/spin_thread.c | 10 +++++----- src/spin_thread.h | 4 +++- src/thread_tasks.c | 4 ++++ src/token.c | 4 +++- src/upstream.c | 36 +++++++++++++++++++++++++++++++++--- tests/test_client_list.c | 7 +++++-- tests/test_conn_interface.c | 10 ++++++++-- tests/test_connection.c | 1 + tests/test_crud_interface.c | 2 ++ tests/test_service_alive.c | 16 +++++++++++++++- tests/test_spin_thread_e.c | 4 +++- tests/test_spin_thread_s.c | 4 +++- tests/test_thread_tasks.c | 1 + tests/test_token.c | 13 +++++++++++++ tests/test_upstream.c | 1 + 26 files changed, 198 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b5cb9e..c81bf48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - used mutex protection to make client list and nn_sends thread safe - put mutex lock into get_global_node - change svc alive from a thread to a function called every 30 sec from main +- shut down tasks properly +- fix memory leaks ## [1.0.1] - 2018-07-18 ### Added diff --git a/CMakeLists.txt b/CMakeLists.txt index 78236f2..1514712 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,7 @@ set(INSTALL_DIR ${CMAKE_CURRENT_BINARY_DIR}/_install) set(PREFIX_DIR ${CMAKE_CURRENT_BINARY_DIR}/_prefix) set(INCLUDE_DIR ${INSTALL_DIR}/include) set(INCLUDE_UCRESOLV ${PREFIX_DIR}/ucresolv/src/ucresolv/include) +set(NOPOLL_SRC ${PREFIX_DIR}/nopoll/src/nopoll/src) set(PATCHES_DIR ${CMAKE_CURRENT_SOURCE_DIR}/patches) set(LIBRARY_DIR ${INSTALL_DIR}/lib) set(LIBRARY_DIR64 ${INSTALL_DIR}/lib64) @@ -85,6 +86,7 @@ ExternalProject_Add(nopoll PREFIX ${PREFIX_DIR}/nopoll GIT_REPOSITORY https://github.com/Comcast/nopoll.git GIT_TAG "1.0.1" + PATCH_COMMAND patch ${NOPOLL_SRC}/nopoll_conn.c < ${PATCHES_DIR}/nopoll_conn.patch CONFIGURE_COMMAND COMMAND /autogen.sh --prefix=${PREFIX} --includedir=${INCLUDE_DIR} --libdir=${LIBRARY_DIR} @@ -190,8 +192,9 @@ endif (ENABLE_SESHAT) #------------------------------------------------------------------------------- ExternalProject_Add(cjwt PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/cjwt - GIT_REPOSITORY https://github.com/Comcast/cjwt.git - GIT_TAG "1.0.1" + GIT_REPOSITORY https://github.com/bill1600/cjwt.git + GIT_TAG "7b3240f9278072ce0ec64c79fdf2bc38e20bda06" + #GIT_TAG "1.0.1" CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF ) add_library(libcjwt STATIC SHARED IMPORTED) diff --git a/src/ParodusInternal.h b/src/ParodusInternal.h index 5d9fd11..e88bb05 100644 --- a/src/ParodusInternal.h +++ b/src/ParodusInternal.h @@ -50,7 +50,8 @@ /* Macros */ /*----------------------------------------------------------------------------*/ #define UNUSED(x) (void )(x) -#define NANOMSG_SOCKET_TIMEOUT_MSEC 2000 +#define NANO_SOCKET_SEND_TIMEOUT_MS 2000 +#define NANO_SOCKET_RCV_TIMEOUT_MS 500 #ifndef TEST #define FOREVER() 1 @@ -133,6 +134,7 @@ typedef struct { /*----------------------------------------------------------------------------*/ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ +extern bool g_shutdown; extern ParodusMsg *ParodusMsgQ; int numLoops; /*----------------------------------------------------------------------------*/ diff --git a/src/client_list.c b/src/client_list.c index f519a75..3cf47f8 100644 --- a/src/client_list.c +++ b/src/client_list.c @@ -65,7 +65,7 @@ int addToList( wrp_msg_t **msg) ParodusPrint("sock created for adding entries to list: %d\n", sock); if(sock >= 0) { - int t = NANOMSG_SOCKET_TIMEOUT_MSEC; + int t = NANO_SOCKET_SEND_TIMEOUT_MS; rc = nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDTIMEO, &t, sizeof(t)); if(rc < 0) { @@ -234,6 +234,22 @@ int deleteFromList(char* service_name) return -1; } +void deleteAllClients (void) +{ + reg_list_item_t *next_node = NULL; + + while (NULL != g_head) + { + next_node = g_head->next; + free (g_head); + g_head = next_node; + } + if (numOfClients > 0) { + ParodusInfo ("Deleted %d clients\n", numOfClients); + numOfClients = 0; + } +} + /* *@dest : Client destination to send message *@Msg: Msg to send it to client (No free done here), user responsibilites to free the msg diff --git a/src/client_list.h b/src/client_list.h index 5d58f57..dd38c8d 100644 --- a/src/client_list.h +++ b/src/client_list.h @@ -50,6 +50,7 @@ int sendAuthStatus(reg_list_item_t *new_node); int deleteFromList(char* service_name); int get_numOfClients(); +void deleteAllClients (void); int sendMsgtoRegisteredClients(char *dest,const char **Msg,size_t msgSize); reg_list_item_t * get_global_node(void); diff --git a/src/conn_interface.c b/src/conn_interface.c index 9259f5f..4288055 100644 --- a/src/conn_interface.c +++ b/src/conn_interface.c @@ -52,6 +52,11 @@ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ bool g_shutdown = false; +pthread_t upstream_tid; +pthread_t upstream_msg_tid; +pthread_t downstream_tid; +pthread_t svc_alive_tid; +pthread_t crud_tid; /*----------------------------------------------------------------------------*/ /* Function Prototypes */ @@ -94,13 +99,13 @@ void createSocketConnection(void (* initKeypress)()) } packMetaData(); - UpStreamMsgQ = NULL; - StartThread(handle_upstream); - StartThread(processUpstreamMessage); + UpStreamMsgQ = NULL; + StartThread(handle_upstream, &upstream_tid); + StartThread(processUpstreamMessage, &upstream_msg_tid); ParodusMsgQ = NULL; - StartThread(messageHandlerTask); - /* StartThread(serviceAliveTask); */ - StartThread(CRUDHandlerTask); + StartThread(messageHandlerTask, &downstream_tid); + /*StartThread(serviceAliveTask, &svc_alive_tid;);*/ + StartThread(CRUDHandlerTask, &crud_tid); if (NULL != initKeypress) { @@ -180,6 +185,24 @@ void createSocketConnection(void (* initKeypress)()) } } while(!get_close_retry() && !g_shutdown); + pthread_mutex_lock (get_global_crud_mut()); + pthread_cond_signal (get_global_crud_con()); + pthread_mutex_unlock (get_global_crud_mut()); + pthread_mutex_lock (&g_mutex); + pthread_cond_signal (&g_cond); + pthread_mutex_unlock (&g_mutex); + pthread_mutex_lock (get_global_nano_mut ()); + pthread_cond_signal (get_global_nano_con()); + pthread_mutex_unlock (get_global_nano_mut()); + + ParodusInfo ("joining threads\n"); + pthread_join (upstream_tid, NULL); + pthread_join (downstream_tid, NULL); + pthread_join (upstream_msg_tid, NULL); + pthread_join (crud_tid, NULL); + + deleteAllClients (); + close_and_unref_connection(get_global_conn()); nopoll_ctx_unref(ctx); nopoll_cleanup_library(); diff --git a/src/connection.c b/src/connection.c index eb095e1..2764cb7 100644 --- a/src/connection.c +++ b/src/connection.c @@ -385,7 +385,7 @@ int nopoll_connect (create_connection_ctx_t *ctx, int is_ipv6) NULL, default_url,NULL,NULL); } } - if (NULL == connection) { + if ((NULL == connection) && (!is_ipv6)) { if((checkHostIp(server->server_addr) == -2)) { if (check_timer_expired (&ctx->connect_timer, 15*60*1000)) { ParodusError("WebPA unable to connect due to DNS resolving to 10.0.0.1 for over 15 minutes; crashing service.\n"); @@ -407,6 +407,8 @@ int nopoll_connect (create_connection_ctx_t *ctx, int is_ipv6) #define WAIT_ACTION_RETRY 1 // if wait_status is 307, 302, 303 or 403 #define WAIT_FAIL 2 +#define FREE_NON_NULL_PTR(ptr) if (NULL != ptr) free(ptr) + int wait_connection_ready (create_connection_ctx_t *ctx) { int wait_status; @@ -414,7 +416,10 @@ int wait_connection_ready (create_connection_ctx_t *ctx) if(nopoll_conn_wait_for_status_until_connection_ready(get_global_conn(), 10, &wait_status, &redirectURL)) + { + FREE_NON_NULL_PTR (redirectURL); return WAIT_SUCCESS; + } if(wait_status == 307 || wait_status == 302 || wait_status == 303) // only when there is a http redirect { char *redirect_ptr = redirectURL; @@ -432,9 +437,7 @@ int wait_connection_ready (create_connection_ctx_t *ctx) set_current_server (ctx); // set current server to redirect server return WAIT_ACTION_RETRY; } - if (NULL != redirectURL) { - free (redirectURL); - } + FREE_NON_NULL_PTR (redirectURL); if(wait_status == 403) { ParodusError("Received Unauthorized response with status: %d\n", wait_status); @@ -557,7 +560,7 @@ int createNopollConnection(noPollCtx *ctx) set_server_list_null (&conn_ctx.server_list); init_backoff_timer (&backoff_timer, max_retry_count); - while (true) + while (!g_shutdown) { query_dns_status = find_servers (&conn_ctx.server_list); if (query_dns_status == FIND_INVALID_DEFAULT) diff --git a/src/crud_interface.c b/src/crud_interface.c index d6a5aea..3994eec 100644 --- a/src/crud_interface.c +++ b/src/crud_interface.c @@ -44,6 +44,16 @@ CrudMsg *crudMsgQ = NULL; /* External functions */ /*----------------------------------------------------------------------------*/ +pthread_cond_t *get_global_crud_con(void) +{ + return &crud_con; +} + +pthread_mutex_t *get_global_crud_mut(void) +{ + return &crud_mut; +} + void addCRUDmsgToQueue(wrp_msg_t *crudMsg) { CrudMsg * crudMessage; @@ -122,6 +132,10 @@ void *CRUDHandlerTask() } else { + if (g_shutdown) { + pthread_mutex_unlock (&crud_mut); + break; + } pthread_cond_wait(&crud_con, &crud_mut); pthread_mutex_unlock (&crud_mut); } diff --git a/src/crud_interface.h b/src/crud_interface.h index b2fb2e4..04107cd 100644 --- a/src/crud_interface.h +++ b/src/crud_interface.h @@ -24,6 +24,8 @@ #ifndef _CRUD_INTERFACE_H_ #define _CRUD_INTERFACE_H_ +#include + #ifdef __cplusplus extern "C" { #endif @@ -43,6 +45,8 @@ typedef struct CrudMsg__ /*----------------------------------------------------------------------------*/ void addCRUDresponseToUpstreamQ(void *response_bytes, ssize_t response_size); +pthread_cond_t *get_global_crud_con(void); +pthread_mutex_t *get_global_crud_mut(void); #ifdef __cplusplus } diff --git a/src/downstream.c b/src/downstream.c index e3e4708..3a3a66a 100644 --- a/src/downstream.c +++ b/src/downstream.c @@ -75,7 +75,6 @@ void listenerOnMessage(void * msg, size_t msgSize) case WRP_MSG_TYPE__AUTH: { ParodusInfo("Authorization Status received with Status code :%d\n", message->u.auth.status); - wrp_free_struct(message); break; } @@ -142,9 +141,9 @@ void listenerOnMessage(void * msg, size_t msgSize) } release_global_node (); - /* check Downstream dest for CRUD requests */ - if(destFlag ==0 && strcmp("parodus", dest)==0) - { + /* check Downstream dest for CRUD requests */ + if(destFlag ==0 && strcmp("parodus", dest)==0) + { ParodusPrint("Received CRUD request : dest : %s\n", dest); if ((message->u.crud.source == NULL) || (message->u.crud.transaction_uuid == NULL)) { @@ -159,8 +158,8 @@ void listenerOnMessage(void * msg, size_t msgSize) addCRUDmsgToQueue(message); } destFlag =1; - } - //if any unknown dest received sending error response to server + } + //if any unknown dest received sending error response to server if(destFlag ==0) { ParodusError("Unknown dest:%s\n", dest); @@ -186,14 +185,14 @@ void listenerOnMessage(void * msg, size_t msgSize) else { resp_msg ->u.crud.source = message->u.crud.dest; - if(message->u.crud.source !=NULL) - { - resp_msg ->u.crud.dest = message->u.crud.source; - } - else - { - resp_msg ->u.crud.dest = "unknown"; - } + if(message->u.crud.source !=NULL) + { + resp_msg ->u.crud.dest = message->u.crud.source; + } + else + { + resp_msg ->u.crud.dest = "unknown"; + } resp_msg ->u.crud.transaction_uuid = message->u.crud.transaction_uuid; resp_msg ->u.crud.path = message->u.crud.path; } @@ -228,7 +227,6 @@ void listenerOnMessage(void * msg, size_t msgSize) } free(resp_msg); ParodusPrint("free for downstream decoded msg\n"); - wrp_free_struct(message); } break; } @@ -237,9 +235,9 @@ void listenerOnMessage(void * msg, size_t msgSize) case WRP_MSG_TYPE__SVC_ALIVE: case WRP_MSG_TYPE__UNKNOWN: default: - wrp_free_struct(message); break; } + wrp_free_struct(message); } else { diff --git a/src/service_alive.c b/src/service_alive.c index 74cd02c..cd667f6 100644 --- a/src/service_alive.c +++ b/src/service_alive.c @@ -99,5 +99,6 @@ int serviceAliveTask() ParodusInfo("No clients are registered, waiting ..\n"); } } + free (svc_bytes); return 0; } diff --git a/src/spin_thread.c b/src/spin_thread.c index e9e7a2e..465f52e 100644 --- a/src/spin_thread.c +++ b/src/spin_thread.c @@ -23,7 +23,6 @@ #include #include -#include #include "spin_thread.h" #include "parodus_log.h" @@ -31,12 +30,12 @@ /*----------------------------------------------------------------------------*/ /* External Functions */ /*----------------------------------------------------------------------------*/ -void StartThread(void *(*start_routine) (void *)) +void StartThread(void *(*start_routine) (void *), pthread_t *threadId) { int err = 0; - pthread_t threadId; + pthread_t __threadId; - err = pthread_create(&threadId, NULL, start_routine, NULL); + err = pthread_create(&__threadId, NULL, start_routine, NULL); if (err != 0) { ParodusError("Error creating thread :[%s]\n", strerror(err)); @@ -44,7 +43,8 @@ void StartThread(void *(*start_routine) (void *)) } else { - ParodusPrint("Thread created Successfully %lu\n", (unsigned long) threadId); + *threadId = __threadId; + ParodusPrint("Thread created Successfully %lu\n", (unsigned long) __threadId); } } diff --git a/src/spin_thread.h b/src/spin_thread.h index 61d2610..6c224b5 100644 --- a/src/spin_thread.h +++ b/src/spin_thread.h @@ -24,6 +24,8 @@ #ifndef _SPIN_THREAD_H_ #define _SPIN_THREAD_H_ +#include + #ifdef __cplusplus extern "C" { #endif @@ -32,7 +34,7 @@ extern "C" { /* Function Prototypes */ /*----------------------------------------------------------------------------*/ -void StartThread(void *(*start_routine) (void *)); +void StartThread(void *(*start_routine) (void *), pthread_t *threadId); #ifdef __cplusplus diff --git a/src/thread_tasks.c b/src/thread_tasks.c index 16e32d6..dfc9c17 100644 --- a/src/thread_tasks.c +++ b/src/thread_tasks.c @@ -43,6 +43,10 @@ void *messageHandlerTask() } else { + if (g_shutdown) { + pthread_mutex_unlock (&g_mutex); + break; + } ParodusPrint("Before pthread cond wait in consumer thread\n"); pthread_cond_wait(&g_cond, &g_mutex); pthread_mutex_unlock (&g_mutex); diff --git a/src/token.c b/src/token.c index 7da1d95..1dfc691 100644 --- a/src/token.c +++ b/src/token.c @@ -527,7 +527,9 @@ int allow_insecure_conn(char **server_addr, unsigned int *port) } if (insecure >= 0) { - ParodusInfo ("JWT claims: %s\n", cJSON_Print (jwt->private_claims)); + char *claim_str = cJSON_Print (jwt->private_claims); + ParodusInfo ("JWT claims: %s\n", claim_str); + free (claim_str); } cjwt_destroy(&jwt); diff --git a/src/upstream.c b/src/upstream.c index 449ce46..a5c17ce 100644 --- a/src/upstream.c +++ b/src/upstream.c @@ -129,6 +129,12 @@ void *handle_upstream() sock = nn_socket( AF_SP, NN_PULL ); if(sock >= 0) { + int t = NANO_SOCKET_RCV_TIMEOUT_MS; + int rc = nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &t, sizeof(t)); + if (rc < 0) + { + ParodusError ("Unable to set socket receive timeout (errno=%d, %s)\n",errno, strerror(errno)); + } ParodusPrint("Nanomsg bind with get_parodus_cfg()->local_url %s\n", get_parodus_cfg()->local_url); bind = nn_bind(sock, get_parodus_cfg()->local_url); if(bind < 0) @@ -137,11 +143,18 @@ void *handle_upstream() } else { + ParodusInfo("nanomsg server gone into the listening mode...\n"); while( FOREVER() ) { buf = NULL; - ParodusInfo("nanomsg server gone into the listening mode...\n"); bytes = nn_recv (sock, &buf, NN_MSG, 0); + if (g_shutdown) + break; + if (bytes < 0) { + if ((errno != EAGAIN) && (errno != ETIMEDOUT)) + ParodusInfo ("Error (%d) receiving message from nanomsg client\n", errno); + continue; + } ParodusInfo ("Upstream message received from nanomsg client\n"); message = (UpStreamMsg *)malloc(sizeof(UpStreamMsg)); @@ -245,11 +258,11 @@ void *processUpstreamMessage() temp->sock = nn_socket(AF_SP,NN_PUSH ); if(temp->sock >= 0) { - int t = NANOMSG_SOCKET_TIMEOUT_MSEC; + int t = NANO_SOCKET_SEND_TIMEOUT_MS; rc = nn_setsockopt(temp->sock, NN_SOL_SOCKET, NN_SNDTIMEO, &t, sizeof(t)); if(rc < 0) { - ParodusError ("Unable to set socket timeout (errno=%d, %s)\n",errno, strerror(errno)); + ParodusError ("Unable to set socket send timeout (errno=%d, %s)\n",errno, strerror(errno)); } rc = nn_connect(temp->sock, msg->u.reg.url); if(rc < 0) @@ -295,6 +308,7 @@ void *processUpstreamMessage() { ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest); partners_t *partnersList = NULL; + int j = 0; int ret = validate_partner_id(msg, &partnersList); if(ret == 1) @@ -323,6 +337,18 @@ void *processUpstreamMessage() { sendUpstreamMsgToServer(&message->msg, message->len); } + if(partnersList != NULL) + { + for(j=0; j<(int)partnersList->count; j++) + { + if(NULL != partnersList->partner_ids[j]) + { + free(partnersList->partner_ids[j]); + } + } + free(partnersList); + } + partnersList = NULL; } else { @@ -451,6 +477,10 @@ void *processUpstreamMessage() } else { + if (g_shutdown) { + pthread_mutex_unlock (&nano_mut); + break; + } ParodusPrint("Before pthread cond wait in consumer thread\n"); pthread_cond_wait(&nano_con, &nano_mut); pthread_mutex_unlock (&nano_mut); diff --git a/tests/test_client_list.c b/tests/test_client_list.c index ed6cad4..e38da34 100644 --- a/tests/test_client_list.c +++ b/tests/test_client_list.c @@ -26,6 +26,9 @@ #define TEST_CLIENT1_URL "tcp://127.0.0.1:6677" #define TEST_CLIENT2_URL "tcp://127.0.0.1:6655" +pthread_t test_tid; +pthread_t test_tid2; + static void *client_rcv_task(); static void *client2_rcv_task(); @@ -58,7 +61,7 @@ void test_client_addtolist() ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name); ParodusPrint("decoded dest:%s\n", message->u.reg.url); - StartThread(client_rcv_task); + StartThread(client_rcv_task, &test_tid); status = addToList(&message); ParodusPrint("addToList status is %d\n", status); @@ -189,7 +192,7 @@ void test_addtolist_multiple_clients() ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name); ParodusPrint("decoded dest:%s\n", message->u.reg.url); - StartThread(client2_rcv_task); + StartThread(client2_rcv_task, &test_tid2); status = addToList(&message); ParodusPrint("addToList status is %d\n", status); diff --git a/tests/test_conn_interface.c b/tests/test_conn_interface.c index f4bcf0f..571b043 100644 --- a/tests/test_conn_interface.c +++ b/tests/test_conn_interface.c @@ -35,8 +35,10 @@ /*----------------------------------------------------------------------------*/ UpStreamMsg *UpStreamMsgQ; ParodusMsg *ParodusMsgQ; -pthread_mutex_t nano_mut; -pthread_cond_t nano_con; +pthread_mutex_t g_mutex=PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t g_cond=PTHREAD_COND_INITIALIZER; +pthread_mutex_t nano_mut=PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t nano_con=PTHREAD_COND_INITIALIZER; /*----------------------------------------------------------------------------*/ @@ -205,6 +207,10 @@ void timespec_diff(struct timespec *start, struct timespec *stop, diff->tv_nsec = 1000; } +void deleteAllClients (void) +{ +} + /*----------------------------------------------------------------------------*/ /* Tests */ /*----------------------------------------------------------------------------*/ diff --git a/tests/test_connection.c b/tests/test_connection.c index 0aace96..dc00636 100644 --- a/tests/test_connection.c +++ b/tests/test_connection.c @@ -63,6 +63,7 @@ bool LastReasonStatus; pthread_mutex_t close_mut; // Mock values +bool g_shutdown = false; char *mock_server_addr; unsigned int mock_port; int mock_wait_status; diff --git a/tests/test_crud_interface.c b/tests/test_crud_interface.c index 39ec8c3..8649ff6 100644 --- a/tests/test_crud_interface.c +++ b/tests/test_crud_interface.c @@ -35,9 +35,11 @@ /*----------------------------------------------------------------------------*/ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ +bool g_shutdown = false; extern CrudMsg *crudMsgQ; int numLoops = 1; wrp_msg_t *temp = NULL; + /*----------------------------------------------------------------------------*/ /* Mocks */ /*----------------------------------------------------------------------------*/ diff --git a/tests/test_service_alive.c b/tests/test_service_alive.c index 26ba5ed..c1f5433 100644 --- a/tests/test_service_alive.c +++ b/tests/test_service_alive.c @@ -33,6 +33,18 @@ static void *keep_alive_thread(); static void add_client(); int sock1; pthread_t threadId; +pthread_mutex_t crud_mut=PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t crud_con=PTHREAD_COND_INITIALIZER; + +pthread_cond_t *get_global_crud_con(void) +{ + return &crud_con; +} + +pthread_mutex_t *get_global_crud_mut(void) +{ + return &crud_mut; +} /*----------------------------------------------------------------------------*/ /* Tests */ @@ -48,12 +60,14 @@ void *CRUDHandlerTask() { return NULL; } + static void add_client() { const wrp_msg_t reg = { .msg_type = WRP_MSG_TYPE__SVC_REGISTRATION, .u.reg.service_name = "service_client", .u.reg.url = TEST_SERVICE_URL}; + pthread_t test_tid; void *bytes; int size =0; int rv; @@ -71,7 +85,7 @@ static void add_client() ParodusPrint("decoded service_name:%s\n", message->u.reg.service_name); ParodusPrint("decoded dest:%s\n", message->u.reg.url); - StartThread(client_rcv_task); + StartThread(client_rcv_task, &test_tid); status = addToList(&message); ParodusPrint("addToList status is %d\n", status); diff --git a/tests/test_spin_thread_e.c b/tests/test_spin_thread_e.c index 77af77f..4171ea6 100644 --- a/tests/test_spin_thread_e.c +++ b/tests/test_spin_thread_e.c @@ -23,6 +23,8 @@ #include "../src/ParodusInternal.h" #include "../src/spin_thread.h" +pthread_t test_tid; + /*----------------------------------------------------------------------------*/ /* Mocks */ /*----------------------------------------------------------------------------*/ @@ -54,7 +56,7 @@ void *_routine(void *v) void test_StartThread_error() { - StartThread(&_routine); + StartThread(&_routine, &test_tid); } void add_suites( CU_pSuite *suite ) diff --git a/tests/test_spin_thread_s.c b/tests/test_spin_thread_s.c index 91a8897..8048684 100644 --- a/tests/test_spin_thread_s.c +++ b/tests/test_spin_thread_s.c @@ -23,6 +23,8 @@ #include "../src/ParodusInternal.h" #include "../src/spin_thread.h" +pthread_t test_tid; + /*----------------------------------------------------------------------------*/ /* Mocks */ /*----------------------------------------------------------------------------*/ @@ -44,7 +46,7 @@ void *_routine(void *v) void test_StartThread_success() { - StartThread(&_routine); + StartThread(&_routine, &test_tid); } void add_suites( CU_pSuite *suite ) diff --git a/tests/test_thread_tasks.c b/tests/test_thread_tasks.c index 9375ec0..c3a6102 100755 --- a/tests/test_thread_tasks.c +++ b/tests/test_thread_tasks.c @@ -30,6 +30,7 @@ /*----------------------------------------------------------------------------*/ /* File Scoped Variables */ /*----------------------------------------------------------------------------*/ +bool g_shutdown = false; ParodusMsg *ParodusMsgQ; pthread_mutex_t g_mutex; pthread_cond_t g_cond; diff --git a/tests/test_token.c b/tests/test_token.c index cb1827c..f6206af 100644 --- a/tests/test_token.c +++ b/tests/test_token.c @@ -158,6 +158,19 @@ extern void read_key_from_file (const char *fname, char *buf, size_t buflen); extern const char *get_tok (const char *src, int delim, char *result, int resultsize); extern unsigned int get_algo_mask (const char *algo_str); +pthread_mutex_t crud_mut=PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t crud_con=PTHREAD_COND_INITIALIZER; + +pthread_cond_t *get_global_crud_con(void) +{ + return &crud_con; +} + +pthread_mutex_t *get_global_crud_mut(void) +{ + return &crud_mut; +} + void addCRUDmsgToQueue(wrp_msg_t *crudMsg) { (void)crudMsg; diff --git a/tests/test_upstream.c b/tests/test_upstream.c index 7c2d747..2f99833 100644 --- a/tests/test_upstream.c +++ b/tests/test_upstream.c @@ -38,6 +38,7 @@ /*----------------------------------------------------------------------------*/ static noPollConn *conn; static char *reconnect_reason = "webpa_process_starts"; +bool g_shutdown = false; static ParodusCfg parodusCfg; extern size_t metaPackSize; extern UpStreamMsg *UpStreamMsgQ;