From d0535521c744015fa589c972b6089811d3d6b69a Mon Sep 17 00:00:00 2001 From: skrishnamoorthy01 Date: Fri, 22 Jun 2018 13:11:31 +0530 Subject: [PATCH] Fix more leak for crud request --- src/crud_interface.c | 2 +- src/crud_internal.c | 174 ++++++++++++++++++++----------------- src/crud_tasks.c | 31 +------ src/upstream.c | 6 +- tests/test_crud_internal.c | 51 ++++++++++- tests/test_crud_tasks.c | 4 - tests/test_upstream.c | 21 +++-- 7 files changed, 168 insertions(+), 121 deletions(-) diff --git a/src/crud_interface.c b/src/crud_interface.c index 9d79dad..d6a5aea 100644 --- a/src/crud_interface.c +++ b/src/crud_interface.c @@ -116,7 +116,7 @@ void *CRUDHandlerTask() resp_size = wrp_struct_to( crud_response, WRP_BYTES, &resp_bytes ); ParodusPrint("Encoded CRUD resp_size :%lu\n", resp_size); - ParodusInfo("Adding CRUD response to upstreamQ\n"); + ParodusPrint("Adding CRUD response to upstreamQ\n"); addCRUDresponseToUpstreamQ(resp_bytes, resp_size); wrp_free_struct(crud_response); } diff --git a/src/crud_internal.c b/src/crud_internal.c index d86146b..204d14e 100644 --- a/src/crud_internal.c +++ b/src/crud_internal.c @@ -26,6 +26,7 @@ #include "config.h" static void freeObjArray(char *(*obj)[], int size); +static int writeIntoCrudJson(cJSON *res_obj, char * object, cJSON *objValue, int freeFlag); static int parse_dest_elements_to_string(wrp_msg_t *reqMsg, char *(*obj)[]); static char* strdupptr( const char *s, const char *e ); @@ -70,15 +71,42 @@ int readFromJSON(char **data) fclose(fp); return 1; } +/* +* @res_obj json object to add it in crud config json file +* @object parent json obj name i.e tags +* @objValue child json obj to be added to parent i.e test +* @freeFlag Based on this flag, writeIntoCrudJson decides to do free/cjson_delete for the json object +*/ +int writeIntoCrudJson(cJSON *res_obj, char * object, cJSON *objValue, int freeFlag) +{ + char *out = NULL; + int write_status = 0; + cJSON_AddItemToObject(res_obj , object, objValue); + out = cJSON_PrintUnformatted(res_obj ); + ParodusPrint("out : %s\n",out); + write_status = writeToJSON(out); + + if(out !=NULL) + { + free( out ); + } + if(res_obj !=NULL && freeFlag) + { + cJSON_Delete(res_obj); + } + else if(res_obj != NULL) + { + free(res_obj); + } + return write_status; +} int createObject( wrp_msg_t *reqMsg , wrp_msg_t **response) { - char *out = NULL; - cJSON *parameters = NULL; cJSON *json, *jsonPayload = NULL; char *obj[5]; - int objlevel = 0, i = 0, j=0; + int objlevel = 0, j=0; char *jsonData = NULL; cJSON *testObj1 = NULL; char *resPayload = NULL; @@ -90,7 +118,7 @@ int createObject( wrp_msg_t *reqMsg , wrp_msg_t **response) ParodusInfo("Processing createObject\n"); status = readFromJSON(&jsonData); - ParodusInfo("read status %d\n", status); + ParodusPrint("read status %d\n", status); if(status == 0) { @@ -145,10 +173,6 @@ int createObject( wrp_msg_t *reqMsg , wrp_msg_t **response) } ParodusInfo( "Number of object level %d\n", objlevel ); - for( i = 0; i <= objlevel; i++ ) - { - ParodusInfo("obj[%d] is %s \n", i, obj[i]); - } /* Valid request will be mac:14cfexxxx/parodus/tags/${name} which is objlevel 4 */ if(objlevel == 4) @@ -163,7 +187,7 @@ int createObject( wrp_msg_t *reqMsg , wrp_msg_t **response) ParodusPrint( "jsonPayloadSize is %d\n", jsonPayloadSize ); if(jsonPayloadSize) { - cJSON* res_obj = cJSON_CreateObject(); + cJSON* res_obj = NULL; key= cJSON_GetArrayItem( jsonPayload, 0 )->string; value = cJSON_GetArrayItem( jsonPayload, 0 )->valueint; ParodusInfo("key:%s value:%d\n", key, value); @@ -188,7 +212,12 @@ int createObject( wrp_msg_t *reqMsg , wrp_msg_t **response) { ParodusError( "Create is not allowed. testObj %s is already exists in json\n", testkey ); (*response)->u.crud.status = 409; - break; + cJSON_Delete( jsonPayload ); + jsonPayload = NULL; + cJSON_Delete(json); + json = NULL; + freeObjArray(&obj, objlevel); + return -1; } else { @@ -199,52 +228,52 @@ int createObject( wrp_msg_t *reqMsg , wrp_msg_t **response) else { ParodusInfo("testObj doesnot exists in json, adding it\n"); - cJSON *payloadObj = cJSON_CreateObject(); + res_obj = cJSON_CreateObject(); + //To add into crud json config file cJSON_AddItemToObject(tagObj, obj[objlevel], testObj1 = cJSON_CreateObject()); cJSON_AddNumberToObject(testObj1, key, value); - cJSON_AddItemToObject(payloadObj, obj[objlevel], testObj1); + //To add into response payload + cJSON *payloadObj = cJSON_CreateObject(); + cJSON * tmpObj = NULL; + cJSON_AddItemToObject(payloadObj, obj[objlevel], tmpObj = cJSON_CreateObject()); + cJSON_AddNumberToObject(tmpObj, key, value); resPayload = cJSON_PrintUnformatted(payloadObj); + cJSON_Delete( payloadObj ); (*response)->u.crud.payload = resPayload; + (*response)->u.crud.payload_size = strlen(resPayload); (*response)->u.crud.status = 201; + //Pass freeflag as 0 if you add new child obj to parent obj i.e test obj creation + create_status = writeIntoCrudJson(res_obj,"tags", tagObj, 0); } } else { ParodusInfo("tagObj doesnot exists in json, adding it\n"); - cJSON_AddItemToObject(res_obj , "tags", parameters = cJSON_CreateObject()); - cJSON_AddItemToObject(parameters, obj[objlevel], testObj1 = cJSON_CreateObject()); + //To add into crud json config file + res_obj = cJSON_CreateObject(); + tagObj = cJSON_CreateObject(); + cJSON_AddItemToObject(tagObj, obj[objlevel], testObj1 = cJSON_CreateObject()); cJSON_AddNumberToObject(testObj1, key, value); - resPayload = cJSON_PrintUnformatted(parameters); + //To add into response payload + resPayload = cJSON_PrintUnformatted(tagObj); (*response)->u.crud.payload = resPayload; + (*response)->u.crud.payload_size = strlen(resPayload); (*response)->u.crud.status = 201; + //Pass freeflag as 1 if you create new parent json object i.e tag obj creation + create_status = writeIntoCrudJson(res_obj,"tags", tagObj, 1); } - cJSON_AddItemToObject(res_obj , "tags", tagObj); - out = cJSON_PrintUnformatted(res_obj ); - ParodusInfo("out : %s\n",out); - - create_status = writeToJSON(out); - if(out !=NULL) - { - free( out ); - out = NULL; - } cJSON_Delete( jsonPayload ); jsonPayload = NULL; cJSON_Delete(json); json = NULL; - - if(res_obj != NULL) - { - free(res_obj); - } freeObjArray(&obj, objlevel); if(create_status == 1) { - ParodusInfo("Data is successfully added to JSON\n"); + ParodusPrint("Data is successfully added to JSON\n"); } else { @@ -458,10 +487,6 @@ int retrieveObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) } ParodusInfo( "Number of object level %d\n", objlevel ); - for( i = 0; i <= objlevel; i++ ) - { - ParodusInfo("obj[%d] is %s \n", i, obj[i]); - } if(objlevel == 3 && ((obj[3] !=NULL) && strstr(obj[3] ,"tags") == NULL)) { @@ -491,7 +516,7 @@ int retrieveObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) ParodusInfo("Processing CRUD external tag request \n"); status = readFromJSON(&jsonData); - ParodusInfo("read status %d\n", status); + ParodusPrint("read status %d\n", status); if(status) { @@ -636,11 +661,9 @@ int retrieveObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) int updateObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) { - char *out = NULL; - cJSON *parameters = NULL; cJSON *json, *jsonPayload = NULL; char *obj[5]; - int objlevel = 0, i = 0, j=0; + int objlevel = 0, j=0; char *jsonData = NULL; cJSON *testObj1 = NULL; int update_status = 0, jsonPayloadSize =0; @@ -649,10 +672,8 @@ int updateObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) const char *parse_error = NULL; int status =0; - ParodusInfo("Processing updateObject\n"); - status = readFromJSON(&jsonData); - ParodusInfo("read status %d\n", status); + ParodusPrint("read status %d\n", status); if(status == 0) { ParodusInfo("Proceed creating CRUD config %s\n", get_parodus_cfg()->crud_config_file ); @@ -705,10 +726,6 @@ int updateObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) } ParodusInfo( "Number of object level %d\n", objlevel ); - for( i = 0; i <= objlevel; i++ ) - { - ParodusInfo("obj[%d] is %s \n", i, obj[i]); - } /* Valid request will be mac:14cfexxxx/parodus/tags/${name} which is objlevel 4 */ if(objlevel == 4) @@ -724,7 +741,6 @@ int updateObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) if(jsonPayloadSize) { cJSON* res_obj = cJSON_CreateObject(); - cJSON *payloadObj = cJSON_CreateObject(); key = cJSON_GetArrayItem( jsonPayload, 0 )->string; value = cJSON_GetArrayItem( jsonPayload, 0 )->valueint; ParodusInfo("key:%s value:%d\n", key, value); @@ -743,7 +759,7 @@ int updateObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) ParodusPrint( "jsontagitemSize is %d\n", jsontagitemSize ); //traverse through each test objects to find match - for( i = 0 ; j < jsontagitemSize ; j++ ) + for( j = 0 ; j < jsontagitemSize ; j++ ) { testkey = cJSON_GetArrayItem( tagObj, j )->string; ParodusPrint("testkey is %s\n", testkey); @@ -751,8 +767,9 @@ int updateObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) { ParodusInfo( "testObj already exists in json. Update it\n" ); cJSON_ReplaceItemInObject(testObj,key,cJSON_CreateNumber(value)); - cJSON_AddItemToObject(payloadObj, obj[objlevel] , testObj); (*response)->u.crud.status = 200; + //Pass freeflag as 0 if you add new child obj to parent obj i.e test obj creation + update_status = writeIntoCrudJson(res_obj,"tags",tagObj,0); break; } else @@ -766,31 +783,23 @@ int updateObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) ParodusInfo("testObj doesnot exists in json, adding it\n"); cJSON_AddItemToObject(tagObj, obj[objlevel], testObj1 = cJSON_CreateObject()); cJSON_AddNumberToObject(testObj1, key, value); - cJSON_AddItemToObject(payloadObj, obj[objlevel], testObj1); (*response)->u.crud.status = 201; + //Pass freeflag as 0 if you add new child obj to parent obj i.e test obj creation + update_status = writeIntoCrudJson(res_obj,"tags",tagObj,0); } } else { ParodusInfo("tagObj doesnot exists in json, adding it\n"); - cJSON_AddItemToObject(res_obj , "tags", parameters = cJSON_CreateObject()); - cJSON_AddItemToObject(parameters, obj[objlevel], testObj1 = cJSON_CreateObject()); + + tagObj = cJSON_CreateObject(); + cJSON_AddItemToObject(tagObj, obj[objlevel], testObj1 = cJSON_CreateObject()); cJSON_AddNumberToObject(testObj1, key, value); - cJSON_AddItemToObject(payloadObj, obj[objlevel], testObj1); (*response)->u.crud.status = 201; + //Pass freeflag as 1 if you create new parent json object i.e tag obj creation + update_status = writeIntoCrudJson(res_obj,"tags",tagObj,1); } - cJSON_AddItemToObject(res_obj , "tags", tagObj); - out = cJSON_PrintUnformatted(res_obj ); - ParodusInfo("out : %s\n",out); - - update_status = writeToJSON(out); - - if(out !=NULL) - { - free( out ); - out = NULL; - } cJSON_Delete( jsonPayload ); jsonPayload = NULL; cJSON_Delete(json); @@ -798,7 +807,7 @@ int updateObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) freeObjArray(&obj, objlevel); if(update_status == 1) { - ParodusInfo("Data is successfully added to JSON\n"); + ParodusPrint("Data is successfully added to JSON\n"); } else { @@ -902,11 +911,6 @@ int deleteObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) } ParodusInfo( "Number of object level %d\n", objlevel ); - for( i = 0; i <= objlevel; i++ ) - { - ParodusInfo("obj[%d] is %s \n", i, obj[i]); - } - paramArray = cJSON_GetObjectItem( json, "tags" ); if( paramArray != NULL ) { @@ -938,7 +942,6 @@ int deleteObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) if( strcmp( cJSON_GetArrayItem( paramArray, i )->string, obj[objlevel] ) == 0 ) { ParodusInfo("Delete: requested object found \n"); - ParodusInfo("deleting from paramArray\n"); cJSON_DeleteItemFromArray(paramArray, i); found = 1; (*response)->u.crud.status = 200; @@ -989,12 +992,12 @@ int deleteObject( wrp_msg_t *reqMsg, wrp_msg_t **response ) (*response)->u.crud.status = 500; return -1; } - out = cJSON_Print( json ); + out = cJSON_PrintUnformatted( json ); ParodusPrint("%s\n",out); delete_status = writeToJSON(out); if(delete_status == 1) { - ParodusInfo("Deleted Data is successfully updated to JSON\n"); + ParodusPrint("Deleted Data is successfully updated to JSON\n"); } else { @@ -1044,8 +1047,11 @@ static int parse_dest_elements_to_string(wrp_msg_t *reqMsg, char *(*obj)[]) end = strchr(start, '/'); if (NULL != end) { - (*obj)[rv] = strdupptr(start, end); - ParodusPrint("obj[%d] is %s\n", rv, (*obj)[rv]); + char *testElem = NULL; + char *tagElem = NULL; + + tagElem = strdupptr(start, end); + ParodusPrint("tagElem is %s\n",tagElem); start = end; start++; @@ -1053,9 +1059,8 @@ static int parse_dest_elements_to_string(wrp_msg_t *reqMsg, char *(*obj)[]) end = strchr(start, '/'); if (NULL == end) { - rv = rv+1; - (*obj)[rv] = strdup(start); - ParodusPrint("obj[%d] is %s\n", rv, (*obj)[rv]); + testElem = strdup(start); + ParodusPrint("testElem is %s\n", testElem); } else { @@ -1063,6 +1068,19 @@ static int parse_dest_elements_to_string(wrp_msg_t *reqMsg, char *(*obj)[]) freeObjArray(obj, rv); rv = -1; } + + if(rv != -1) + { + //Reuse array of pointer + free((*obj)[rv]); + (*obj)[rv] = NULL; + (*obj)[rv] = tagElem; + (*obj)[++rv] = testElem; + } + else if (tagElem != NULL) + { + free(tagElem); + } } return rv; diff --git a/src/crud_tasks.c b/src/crud_tasks.c index 1081033..829ff68 100644 --- a/src/crud_tasks.c +++ b/src/crud_tasks.c @@ -10,7 +10,6 @@ int processCrudRequest( wrp_msg_t *reqMsg, wrp_msg_t **responseMsg) { wrp_msg_t *resp_msg = NULL; - char *str= NULL; int ret = -1; resp_msg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) ); @@ -29,20 +28,7 @@ int processCrudRequest( wrp_msg_t *reqMsg, wrp_msg_t **responseMsg) ret = createObject( reqMsg, &resp_msg ); - if(ret == 0) - { - cJSON *payloadObj = cJSON_Parse( (resp_msg)->u.crud.payload ); - str = cJSON_PrintUnformatted(payloadObj); - - resp_msg ->u.crud.payload = (void *)str; - if(str !=NULL) - { - ParodusInfo("Payload Response: %s\n", str); - resp_msg ->u.crud.payload_size = strlen(str); - } - cJSON_Delete( payloadObj ); - } - else + if(ret != 0) { ParodusError("Failed to create object in config JSON\n"); @@ -60,20 +46,7 @@ int processCrudRequest( wrp_msg_t *reqMsg, wrp_msg_t **responseMsg) ParodusInfo( "RETREIVE request\n" ); ret = retrieveObject( reqMsg, &resp_msg ); - if(ret == 0) - { - cJSON *payloadObj = cJSON_Parse( (resp_msg)->u.crud.payload ); - str = cJSON_PrintUnformatted(payloadObj); - - resp_msg ->u.crud.payload = (void *)str; - if((resp_msg)->u.crud.payload !=NULL) - { - ParodusInfo("Payload Response: %s\n", str); - resp_msg ->u.crud.payload_size = strlen((resp_msg)->u.crud.payload); - } - cJSON_Delete( payloadObj ); - } - else + if(ret != 0) { ParodusError("Failed to retrieve object \n"); diff --git a/src/upstream.c b/src/upstream.c index 6df5bbb..63475fb 100644 --- a/src/upstream.c +++ b/src/upstream.c @@ -336,7 +336,11 @@ void *processUpstreamMessage() } //nn_freemsg should not be done for parodus/tags/ CRUD requests as it is not received through nanomsg. - if ((msg->u.crud.source !=NULL) && strstr(msg->u.crud.source, "parodus") == NULL) + if ((msg->u.crud.source !=NULL) && strstr(msg->u.crud.source, "parodus") != NULL) + { + free(message->msg); + } + else { if(nn_freemsg (message->msg) < 0) { diff --git a/tests/test_crud_internal.c b/tests/test_crud_internal.c index e5e4d22..fbfb81d 100644 --- a/tests/test_crud_internal.c +++ b/tests/test_crud_internal.c @@ -450,6 +450,54 @@ void test_createObject_JsonParse() wrp_free_struct(reqMsg); wrp_free_struct(respMsg); +} + +void test_UnsupportedDestination() +{ + int ret = 0; + int write_ret = -1; + FILE *fp; + char *testdata = NULL; + + wrp_msg_t *reqMsg = NULL; + reqMsg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) ); + memset(reqMsg, 0, sizeof(wrp_msg_t)); + + wrp_msg_t *respMsg = NULL; + respMsg = ( wrp_msg_t *)malloc( sizeof( wrp_msg_t ) ); + memset(respMsg, 0, sizeof(wrp_msg_t)); + + ParodusCfg cfg; + memset(&cfg,0,sizeof(cfg)); + cfg.crud_config_file = strdup("parodus_cfg.json"); + set_parodus_cfg(&cfg); + testdata=strdup("{ \"expires\" : 1522451870 }"); + write_ret = writeToJSON(testdata); + assert_int_equal (write_ret, 1); + reqMsg->msg_type = 5; + reqMsg->u.crud.transaction_uuid = strdup("1234"); + reqMsg->u.crud.source = strdup("tag-update"); + reqMsg->u.crud.dest = strdup("mac:14xxx/parodus/tags/test1/test2"); + reqMsg->u.crud.payload = strdup("{ \"expires\" : 1522451870 }"); + + respMsg->msg_type = 5; + ret = createObject(reqMsg, &respMsg); + assert_int_equal (respMsg->u.crud.status, 400); + assert_int_equal (ret, -1); + + + fp = fopen(cfg.crud_config_file, "r"); + if (fp != NULL) + { + system("rm parodus_cfg.json"); + fclose(fp); + } + if(cfg.crud_config_file !=NULL) + free(cfg.crud_config_file); + + wrp_free_struct(reqMsg); + wrp_free_struct(respMsg); + } void test_createObject_withProperPayload() @@ -631,7 +679,7 @@ void test_createObject_existingObj() ret = createObject(reqMsg, &respMsg); assert_int_equal (respMsg->u.crud.status, 409); - assert_int_equal (ret, 0); + assert_int_equal (ret, -1); fp = fopen(cfg.crud_config_file, "r"); @@ -1901,6 +1949,7 @@ int main(void) cmocka_unit_test(test_createObjectInvalid_JsonParseErr), cmocka_unit_test(test_createObject_destNull), cmocka_unit_test(test_createObject_JsonParse), + cmocka_unit_test(test_UnsupportedDestination), cmocka_unit_test(test_createObject_withProperPayload), cmocka_unit_test(test_createObject_withWrongPayload), cmocka_unit_test(test_createObject_multipleObjects), diff --git a/tests/test_crud_tasks.c b/tests/test_crud_tasks.c index fb10da2..5a0ccd6 100644 --- a/tests/test_crud_tasks.c +++ b/tests/test_crud_tasks.c @@ -92,8 +92,6 @@ void test_processCrudRequestCreate() will_return(createObject, 0); expect_function_call(createObject); - will_return(cJSON_Parse, 0); - expect_function_call(cJSON_Parse); ret = processCrudRequest(reqMsg, &response); assert_int_equal(ret, 0); @@ -134,8 +132,6 @@ void test_processCrudRequestRetrieve() reqMsg->u.crud.dest = strdup("mac:14xxx/parodus/tags"); will_return(retrieveObject, 0); expect_function_call(retrieveObject); - will_return(cJSON_Parse, 0); - expect_function_call(cJSON_Parse); ret = processCrudRequest(reqMsg, &response); assert_int_equal(ret, 0); diff --git a/tests/test_upstream.c b/tests/test_upstream.c index ca90a25..af49c7a 100644 --- a/tests/test_upstream.c +++ b/tests/test_upstream.c @@ -295,7 +295,8 @@ void test_processUpstreamMessage() expect_function_call(appendEncodedData); expect_function_call(sendMessage); - + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); expect_function_call(wrp_free_struct); processUpstreamMessage(); @@ -330,7 +331,8 @@ void test_processUpstreamMessageInvalidPartner() expect_function_call(appendEncodedData); expect_function_call(sendMessage); - + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); expect_function_call(wrp_free_struct); processUpstreamMessage(); free(temp); @@ -386,7 +388,8 @@ void test_processUpstreamMessageRegMsg() will_return(get_numOfClients, 1); expect_function_call(get_numOfClients); - + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); expect_function_call(wrp_free_struct); processUpstreamMessage(); @@ -426,7 +429,8 @@ void test_processUpstreamMessageRegMsgNoClients() will_return(addToList, 0); expect_function_call(addToList); - + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); expect_function_call(wrp_free_struct); processUpstreamMessage(); @@ -456,9 +460,10 @@ void err_processUpstreamMessageDecodeErr() temp = (wrp_msg_t *) malloc(sizeof(wrp_msg_t)); memset(temp,0,sizeof(wrp_msg_t)); temp->msg_type = 3; - will_return(wrp_to_struct, -1); expect_function_call(wrp_to_struct); + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); expect_function_call(wrp_free_struct); processUpstreamMessage(); free(temp); @@ -480,7 +485,8 @@ void err_processUpstreamMessageMetapackFailure() will_return(wrp_to_struct, 15); expect_function_call(wrp_to_struct); - + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); expect_function_call(wrp_free_struct); processUpstreamMessage(); free(temp); @@ -541,7 +547,8 @@ void err_processUpstreamMessageRegMsg() will_return(addToList, -1); expect_function_call(addToList); - + will_return(nn_freemsg, 0); + expect_function_call(nn_freemsg); expect_function_call(wrp_free_struct); processUpstreamMessage();