Compare commits

...

24 Commits

Author SHA1 Message Date
Marcia Piccione
e66c984146 update host name of quic server 2023-04-25 13:41:09 -04:00
Marcia Piccione
fb24b899f7 update quic endpoints 2023-04-25 12:16:33 -04:00
Marcia Piccione
e04c5a106a push latest to try compiling from xmidt 2023-03-30 15:33:59 -04:00
Marcia Piccione
34cacec824 update cmake file for msquic and nghttp3 (too many issues building lightspeed) 2023-03-28 20:27:48 -04:00
Marcia Piccione
96403b76b6 update build with lightspeed just for reference... there seems to be a lot of out of date openssl stuff 2023-03-28 17:06:35 -04:00
Marcia Piccione
eb481cdd4b Merge branch 'master' into quic 2023-03-28 17:05:50 -04:00
Marcia Piccione
3077125fa9 initial commit 2023-03-24 14:39:06 -04:00
shilpa24balaji
1a7ae0b785 Merge pull request #417 from sadhyama/rbus_log
Enable rbus logs in parodus
2023-03-23 11:53:46 +05:30
Sadhyama Vengilat
bc655cf9ba Parodus to enable rbus ERROR level logging 2023-03-17 17:28:40 +05:30
Sadhyama Vengilat
ad0491179d Remove threadId from rbus log 2023-03-16 18:31:02 +05:30
Sadhyama Vengilat
fa49a52a94 Register rbus logger to enable more debug logs from rbus 2023-03-16 11:19:11 +05:30
shilpa24balaji
25baef78a8 Merge pull request #416 from sadhyama/max_queue
Fix parodus crash when xmidtSend reaches max queue size during WFO
2023-03-13 23:41:07 +05:30
Sadhyama Vengilat
9034ef9d10 To fix low qos msgs immediate delete during max queue size 2023-03-03 21:03:51 +05:30
Sadhyama Vengilat
9020089016 To skip max queue size callback for already processed msgs 2023-02-28 17:39:33 +05:30
sadhyama
664690e6a6 Merge pull request #415 from sadhyama/wfo_subscribe
Subscribe to CurrentActiveInterfaceEvent before initial cloud connection
2023-02-22 09:57:42 +05:30
shilpa24balaji
4aed47b730 Merge pull request #414 from Mike0893/master
RDKC-13073 : Remove camera specific patch from parodus
2023-02-21 13:22:16 +05:30
Sadhyama Vengilat
2f3f1424b4 Subscribe to CurrentActiveInterfaceEvent before initial cloud connection 2023-02-17 19:11:03 +05:30
mmikhi643
91ae0e82e2 RDKC-13073 : Remove camera specific patch from parodus 2023-02-15 16:20:20 +00:00
mmikhi643
bfc2659bbb RDKC-13073 : Remove camera specific patch from parodus 2023-02-14 14:43:29 +00:00
shilpa24balaji
f9c2878cbf Merge pull request #412 from vasuki01/null_check
"Added null check as fix for parodus crash"
2023-01-17 19:56:53 +05:30
Vasuki
c84a1bdfad "Added null check as fix for parodus crash" 2023-01-11 22:42:23 +05:30
shilpa24balaji
0b0309c3dd Merge pull request #410 from Thanusha-D/traceContext_api
Log traceParent & traceState value in upstream request & event messages
2022-12-22 08:57:51 +05:30
Thanusha D
f4e358c179 Log traceParent & traceState value in upstream request & event messages 2022-12-21 16:04:32 +05:30
Weston Schmidt
193706c21c Release 1.1.5 2021-06-02 18:46:53 -07:00
16 changed files with 389 additions and 41 deletions

View File

@@ -16,7 +16,6 @@ on:
- 'v[0-9]+.[0-9]+.[0-9]+'
branches:
- main
- master
jobs:
test:

View File

@@ -10,7 +10,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Fix Parodus connection stuck on interface up down received together
- Update to use nopoll version 1.0.3
## [1.1.4]
## [v1.1.5]
- Add additional HTTP headers for call to Themis from Convey
- Change default branch name to `main`
## [v1.1.4]
- on connect retry, requery jwt only if it failed before
- put two timestamps in connection health file; start conn and current
- change health file update interval to 240sec
@@ -105,8 +109,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added
- Initial creation
[Unreleased]: https://github.com/Comcast/parodus/compare/1.1.4...HEAD
[1.1.4]: https://github.com/Comcast/parodus/compare/1.1.3...1.1.4
[Unreleased]: https://github.com/Comcast/parodus/compare/v1.1.5...HEAD
[1.1.5]: https://github.com/Comcast/parodus/compare/v1.1.4...v1.1.5
[1.1.4]: https://github.com/Comcast/parodus/compare/1.1.3...v1.1.4
[1.1.3]: https://github.com/Comcast/parodus/compare/1.1.2...1.1.3
[1.1.2]: https://github.com/Comcast/parodus/compare/1.1.1...1.1.2
[1.1.1]: https://github.com/Comcast/parodus/compare/1.0.4...1.1.1

View File

@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
cmake_minimum_required(VERSION 2.8.7)
cmake_minimum_required(VERSION 3.10)
#project(parodus VERSION 1.1.15)
#project(parodus VERSION 1.1.5)
project(parodus)
include(ExternalProject)
@@ -47,6 +47,7 @@ if (ENABLE_WEBCFGBIN)
include_directories(${INCLUDE_DIR}/rbus)
endif (ENABLE_WEBCFGBIN)
# Get git commit hash
#-------------------------------------------------------------------------------
execute_process(
@@ -63,9 +64,18 @@ add_definitions("-DGIT_COMMIT_TAG=\"${GIT_COMMIT_TAG}\"")
add_definitions(-std=c99)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -D_GNU_SOURCE -DNOPOLL_LOGGER ")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Wall -Wno-missing-field-initializers")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wall")
if (DEVICE_CAMERA)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=all -Wno-missing-field-initializers")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=all")
add_definitions(-DDEVICE_CAMERA)
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=all -Wno-missing-field-initializers")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=all")
endif (DEVICE_CAMERA)
if (INCLUDE_BREAKPAD)
add_definitions(-DINCLUDE_BREAKPAD)
endif (INCLUDE_BREAKPAD)
# pthread external dependency
#-------------------------------------------------------------------------------
@@ -216,6 +226,42 @@ ExternalProject_Add(cjwt
add_library(libcjwt STATIC SHARED IMPORTED)
add_dependencies(libcjwt cjwt)
# libmsh3 external dependency
#-------------------------------------------------------------------------------
ExternalProject_Add(msh3
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/msh3
GIT_REPOSITORY https://github.com/nibanks/msh3.git
GIT_TAG "v0.6.0"
GIT_SUBMODULES_RECURSE true
CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -G "Unix Makefiles" -DBUILD_TESTING=OFF
)
add_library(libmsh3 STATIC SHARED IMPORTED)
add_dependencies(libmsh3 msh3)
# libmsquic external dependency
#-------------------------------------------------------------------------------
# ExternalProject_Add(msquic
# PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/msquic
# GIT_REPOSITORY https://github.com/microsoft/msquic.git
# GIT_TAG "v2.1.8"
# CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF
# )
# add_library(libmsquic STATIC SHARED IMPORTED)
# add_dependencies(libmsquic msquic)
# # libnghttp3 external dependency
# #-------------------------------------------------------------------------------
# ExternalProject_Add(nghttp3
# PREFIX ${CMAKE_CURRENT_BINARY_DIR}/_prefix/nghttp3
# GIT_REPOSITORY https://github.com/ngtcp2/nghttp3.git
# GIT_TAG "v0.10.0"
# CMAKE_ARGS += -DCMAKE_INSTALL_PREFIX=${INSTALL_DIR} -DBUILD_TESTING=OFF
# )
# add_library(libnghttp3 STATIC SHARED IMPORTED)
# add_dependencies(libnghttp3 nghttp3)
if (FEATURE_DNS_QUERY)
# libucresolv external dependency
#-------------------------------------------------------------------------------

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# build stage
FROM alpine:3.12
RUN \
apk add --no-cache bsd-compat-headers linux-headers cmake autoconf make musl-dev gcc g++ openssl openssl-dev git cunit cunit-dev automake libtool util-linux-dev && \
mkdir -p build
COPY src src
COPY patches patches
COPY tests tests
COPY CMakeLists.txt .
COPY run.sh .
RUN cd build && \
cmake .. && make
CMD ["build/src/parodus"]

78
run.sh Executable file
View File

@@ -0,0 +1,78 @@
#!/bin/sh
parodus_port=16014d
if [[ -z "${URL}" ]]; then
URL="http://petasos:6400"
fi
if [[ -z "${FIRMWARE}" ]]; then
FIRMWARE="mock-rdkb-firmware"
fi
if [[ -z "${BOOT_TIME}" ]]; then
BOOT_TIME=$(date +%s)
fi
if [[ -z "${HW_MANUFACTURER}" ]]; then
HW_MANUFACTURER="Example Inc."
fi
if [[ -z "${REBOOT_REASON}" ]]; then
REBOOT_REASON="unknown"
fi
if [[ -z "${SERIAL_NUMBER}" ]]; then
SERIAL_NUMBER="mock-rdkb-simulator"
fi
if [[ -z "${PARTNER_ID}" ]]; then
PARTNER_ID="comcast"
fi
if [[ -z "${CMAC}" ]]; then
CMAC="112233445566"
fi
if [[ -z "${TOKEN_SERVER_URL}" ]]; then
TOKEN_SERVER_URL="http://themis:6501/issue"
fi
if [[ -z "${SSL_CERT_PATH}" ]]; then
SSL_CERT_PATH="/etc/ssl/certs/ca-certificates.crt"
fi
#In this docker-compose cluster, themis has mtls disabled so
#feel free to ignore the --client-cert-path flag value
#it is required by parodus to fetch a token
if [[ -z "${CLIENT_CERT_PATH}" ]]; then
CLIENT_CERT_PATH="/etc/ssl/certs/ca-certificates.crt"
fi
# MTLS_CLIENT_* is used to authenticate with talaria.
if [[ -z "${MTLS_CLIENT_CERT_PATH}" ]]; then
MTLS_CLIENT_CERT_PATH=""
fi
if [[ -z "${MTLS_CLIENT_KEY_PATH}" ]]; then
MTLS_CLIENT_KEY_PATH=""
fi
build/src/parodus --hw-model=aker-testing \
--ssl-cert-path=$SSL_CERT_PATH \
--client-cert-path=$CLIENT_CERT_PATH \
--mtls-client-cert-path=$MTLS_CLIENT_CERT_PATH \
--mtls-client-key-path=$MTLS_CLIENT_KEY_PATH \
--hw-serial-number=$SERIAL_NUMBER \
--hw-manufacturer=$HW_MANUFACTURER \
--hw-mac=$CMAC \
--hw-last-reboot-reason=$REBOOT_REASON \
--fw-name=$FIRMWARE \
--boot-time=$BOOT_TIME \
--partner-id=$PARTNER_ID \
--parodus-local-url=tcp://127.0.0.1:$parodus_port \
--webpa-ping-timeout=60 \
--token-server-url=$TOKEN_SERVER_URL \
--webpa-backoff-max=2 \
--webpa-interface-used=eth0 \
--webpa-url=$URL \
--force-ipv4

View File

@@ -164,6 +164,7 @@ void timespec_diff(struct timespec *start, struct timespec *stop,
#ifdef ENABLE_WEBCFGBIN
void subscribeRBUSevent();
int regXmidtSendDataMethod();
void registerRbusLogger();
#endif
#ifdef WAN_FAILOVER_SUPPORTED
void setWebpaInterface(char *value);

View File

@@ -223,8 +223,14 @@ void getAuthToken(ParodusCfg *cfg)
* @param[in] nmemb size of delivered data
* @param[out] data curl response data saved.
*/
#ifndef DEVICE_CAMERA
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, struct token_data *data)
{
#else
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, void *datain)
{
struct token_data *data = (struct token_data*) datain;
#endif //DEVICE_CAMERA
ParodusCfg *cfg;
size_t max_data_size = sizeof (cfg->webpa_auth_token);
size_t index = data->size;

View File

@@ -47,7 +47,11 @@ struct token_data {
int requestNewAuthToken(char *newToken, size_t len, int r_count);
void getAuthToken(ParodusCfg *cfg);
#ifndef DEVICE_CAMERA
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, struct token_data *data);
#else
size_t write_callback_fn(void *buffer, size_t size, size_t nmemb, void *data);
#endif
char* generate_trans_uuid();
#ifdef __cplusplus

View File

@@ -99,7 +99,9 @@ void createSocketConnection(void (* initKeypress)())
#endif
EventHandler();
#ifdef WAN_FAILOVER_SUPPORTED
subscribeCurrentActiveInterfaceEvent();
#endif
set_server_list_null (&server_list);
create_conn_rtn = createNopollConnection(ctx, &server_list);
if(!create_conn_rtn)
@@ -113,9 +115,6 @@ void createSocketConnection(void (* initKeypress)())
UpStreamMsgQ = NULL;
StartThread(handle_upstream, &upstream_tid);
StartThread(processUpstreamMessage, &upstream_msg_tid);
#ifdef WAN_FAILOVER_SUPPORTED
subscribeCurrentActiveInterfaceEvent();
#endif
ParodusMsgQ = NULL;
StartThread(messageHandlerTask, &downstream_tid);
StartThread(serviceAliveTask, &svc_alive_tid);

View File

@@ -783,8 +783,7 @@ int createNopollConnection(noPollCtx *ctx, server_list_t *server_list)
}
#endif
}
if(conn_ctx.current_server->allow_insecure <= 0)
if(conn_ctx.current_server != NULL && conn_ctx.current_server->allow_insecure <= 0)
{
ParodusInfo("Connected to server over SSL\n");
OnboardLog("Connected to server over SSL\n");

138
src/connection_http3.cpp Normal file
View File

@@ -0,0 +1,138 @@
/*++
Copyright (c) Microsoft Corporation.
Licensed under the MIT License.
--*/
#include "msh3.hpp"
#include <vector>
#include <cstring>
using namespace std;
char * host = "nginx";
//char * path = "/api/v2/devices"
char * path = "/"
struct Arguments {
const char* Host { host };
MsH3Addr Address {443};
vector<const char*> Paths {path};
bool Unsecure { true };
bool Print { true };
uint32_t Count { 1 };
} Args;
void MSH3_CALL HeaderReceived(struct MsH3Request* , const MSH3_HEADER* Header) {
if (Args.Print) {
fwrite(Header->Name, 1, Header->NameLength, stdout);
printf(":");
fwrite(Header->Value, 1, Header->ValueLength, stdout);
printf("\n");
}
}
bool MSH3_CALL DataReceived(struct MsH3Request* , uint32_t* Length, const uint8_t* Data) {
if (Args.Print) fwrite(Data, 1, *Length, stdout);
return true;
}
void MSH3_CALL Complete(struct MsH3Request* Request, bool Aborted, uint64_t AbortError) {
const uint32_t Index = (uint32_t)(size_t)Request->AppContext;
if (Args.Print) printf("\n");
if (Aborted) printf("Request %u aborted: 0x%llx\n", Index, (long long unsigned)AbortError);
else printf("Request %u complete\n", Index);
}
void ParseArgs(int argc, char **argv) {
if (argc < 2 || !strcmp(argv[1], "-?") || !strcmp(argv[1], "-h") || !strcmp(argv[1], "--help")) {
printf("usage: %s <server[:port]> [options...]\n"
" -c, --count <num> The number of times to query each path (def=1)\n"
" -h, --help Prints this help text\n"
" -p, --path <path(s)> The paths to query\n"
" -u, --unsecure Allows unsecure connections\n"
" -v, --verbose Enables verbose output\n"
" -V, --version Prints out the version\n",
argv[0]);
exit(-1);
}
// Parse the server[:port] argument.
Args.Host = argv[1];
char *port = strrchr(argv[1], ':');
if (port) {
*port = 0; port++;
Args.Address.SetPort((uint16_t)atoi(port));
}
// Parse options.
for (int i = 2; i < argc; ++i) {
if (!strcmp(argv[i], "--count") || !strcmp(argv[i], "-c")) {
if (++i >= argc) { printf("Missing count value\n"); exit(-1); }
Args.Count = (uint32_t)atoi(argv[i]);
} else if (!strcmp(argv[i], "--path") || !strcmp(argv[i], "-p")) {
if (++i >= argc) { printf("Missing path value(s)\n"); exit(-1); }
char* Path = (char*)argv[i];
do {
char* End = strchr(Path, ',');
if (End) *End = 0;
Args.Paths.push_back(Path);
if (!End) break;
Path = End + 1;
} while (true);
} else if (!strcmp(argv[i], "--unsecure") || !strcmp(argv[i], "-u")) {
Args.Unsecure = true;
} else if (!strcmp(argv[i], "--verbose") || !strcmp(argv[i], "-v")) {
Args.Print = true;
} else if (!strcmp(argv[i], "--version") || !strcmp(argv[i], "-V")) {
uint32_t Version[4]; MsH3Version(Version);
printf("Using msh3 v%u.%u.%u.%u\n", Version[0], Version[1], Version[2], Version[3]);
}
}
if (Args.Paths.empty()) {
Args.Paths.push_back("/");
}
}
int MSH3_CALL init(int argc, char **argv) {
ParseArgs(argc, argv);
MSH3_HEADER Headers[] = {
{ ":method", 7, "GET", 3 },
{ ":path", 5, Args.Paths[0], strlen(Args.Paths[0]) },
{ ":scheme", 7, "https", 5 },
{ ":authority", 10, Args.Host, strlen(Args.Host) },
{ "user-agent", 10, "curl/7.82.0-DEV", 15 },
{ "accept", 6, "*/*", 3 },
};
const size_t HeadersCount = sizeof(Headers)/sizeof(MSH3_HEADER);
MsH3Api Api;
if (Api.IsValid()) {
MsH3Connection Connection(Api, Args.Host, Args.Address, Args.Unsecure);
if (Connection.IsValid()) {
for (auto Path : Args.Paths) {
printf("HTTP/3 GET https://%s%s\n", Args.Host, Path);
Headers[1].Value = Path;
Headers[1].ValueLength = strlen(Path);
for (uint32_t i = 0; i < Args.Count; ++i) {
auto Request = new (std::nothrow) MsH3Request(Connection, Headers, HeadersCount, MSH3_REQUEST_FLAG_FIN, (void*)(size_t)(i+1), HeaderReceived, DataReceived, Complete, CleanUpAutoDelete);
if (!Request || !Request->IsValid()) {
printf("Request %u failed to start\n", i+1);
break;
}
}
}
Connection.ShutdownComplete.Wait();
}
}
return 0;
}

View File

@@ -24,7 +24,11 @@
#include "parodus_log.h"
#include <curl/curl.h>
#ifdef INCLUDE_BREAKPAD
#ifndef DEVICE_CAMERA
#include "breakpad_wrapper.h"
#else
#include "breakpadwrap.h"
#endif //DEVICE_CAMERA
#endif
#include "signal.h"
#include "privilege.h"
@@ -87,8 +91,17 @@ int main( int argc, char **argv)
signal(SIGHUP, sig_handler);
signal(SIGALRM, sig_handler);
#ifdef INCLUDE_BREAKPAD
#ifndef DEVICE_CAMERA
/* breakpad handles the signals SIGSEGV, SIGBUS, SIGFPE, and SIGILL */
breakpad_ExceptionHandler();
#else
/* breakpad handles the signals SIGSEGV, SIGBUS, SIGFPE, and SIGILL */
BreakPadWrapExceptionHandler eh;
eh = newBreakPadWrapExceptionHandler();
if(NULL != eh) {
ParodusInfo("Breakpad Initialized\n");
}
#endif //DEVICE_CAMERA
#else
signal(SIGSEGV, sig_handler);
signal(SIGBUS, sig_handler);
@@ -98,6 +111,7 @@ int main( int argc, char **argv)
ParodusCfg *cfg;
ParodusInfo ("RAND_MAX is %ld (0x%lx)\n", RAND_MAX, RAND_MAX);
ParodusInfo("TEST PRINT");
srandom (getpid());
/* TODO not ideal, but it fixes a more major problem for now. */
@@ -107,6 +121,7 @@ int main( int argc, char **argv)
ParodusInfo("********** Starting component: Parodus **********\n ");
drop_root_privilege();
#ifdef ENABLE_WEBCFGBIN
registerRbusLogger();
subscribeRBUSevent();
regXmidtSendDataMethod();
#endif

View File

@@ -320,7 +320,10 @@ void *processUpstreamMessage()
}
else if(msgType == WRP_MSG_TYPE__EVENT)
{
ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
(msg->u.event.headers != NULL && msg->u.event.headers->headers[0] != NULL && msg->u.event.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream event data: dest '%s' traceParent: %s traceState: %s\n", msg->u.event.dest, msg->u.event.headers->headers[0], msg->u.event.headers->headers[1]) : ParodusInfo(" Received upstream event data: dest '%s'\n", msg->u.event.dest);
if(msg->u.event.transaction_uuid != NULL) {
ParodusInfo("transaction_uuid in event: %s\n", msg->u.event.transaction_uuid);
}
partners_t *partnersList = NULL;
int j = 0;
@@ -379,7 +382,7 @@ void *processUpstreamMessage()
//Sending to server for msgTypes 3, 5, 6, 7, 8.
if( WRP_MSG_TYPE__REQ == msgType )
{
ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid );
(msg->u.req.headers != NULL && msg->u.req.headers->headers[0] != NULL && msg->u.req.headers->headers[1] != NULL) ? ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s traceParent: %s traceState: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid, msg->u.req.headers->headers[0], msg->u.req.headers->headers[1]) : ParodusInfo(" Received upstream data with MsgType: %d dest: '%s' transaction_uuid: %s\n", msgType, msg->u.req.dest, msg->u.req.transaction_uuid);
sendUpstreamMsgToServer(&message->msg, message->len);
}
else

View File

@@ -47,6 +47,38 @@ rbusHandle_t get_parodus_rbus_Handle(void)
{
return rbus_Handle;
}
/* Enables rbus ERROR level logs in parodus. Modify RBUS_LOG_ERROR check if more debug logs are needed from rbus. */
void rbus_log_handler(
rbusLogLevel level,
const char* file,
int line,
int threadId,
char* message)
{
ParodusPrint("threadId %d\n", threadId);
const char* slevel = "";
if(level < RBUS_LOG_ERROR)
return;
switch(level)
{
case RBUS_LOG_DEBUG: slevel = "DEBUG"; break;
case RBUS_LOG_INFO: slevel = "INFO"; break;
case RBUS_LOG_WARN: slevel = "WARN"; break;
case RBUS_LOG_ERROR: slevel = "ERROR"; break;
case RBUS_LOG_FATAL: slevel = "FATAL"; break;
}
ParodusInfo("%5s %s:%d -- %s\n", slevel, file, line, message);
}
void registerRbusLogger()
{
rbus_registerLogHandler(rbus_log_handler);
ParodusPrint("Registered rbus log handler\n");
}
#ifdef WAN_FAILOVER_SUPPORTED
void eventReceiveHandler( rbusHandle_t rbus_Handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription );
#endif
@@ -73,7 +105,7 @@ void subscribeRBUSevent()
int subscribeCurrentActiveInterfaceEvent()
{
int rc = RBUS_ERROR_SUCCESS;
ParodusPrint("Subscribing to Device.X_RDK_WanManager.CurrentActiveInterface Event\n");
ParodusInfo("Subscribing to Device.X_RDK_WanManager.CurrentActiveInterface Event\n");
rc = rbusEvent_SubscribeAsync(rbus_Handle,WEBPA_INTERFACE,eventReceiveHandler,subscribeAsyncHandler,"parodusInterface",10*20);
if(rc != RBUS_ERROR_SUCCESS)
{

View File

@@ -376,10 +376,9 @@ void* processXmidtUpstreamMsg()
{
XmidtMsg *Data = xmidtQ;
pthread_mutex_unlock (&xmidt_mut);
ParodusPrint("mutex unlock in xmidt consumer thread\n");
checkMsgExpiry();
checkMaxQandOptimize();
ParodusPrint("mutex unlock in xmidt consumer\n");
checkMsgExpiry(xmidtQ);
checkMaxQandOptimize(xmidtQ);
cv = 0;
ParodusPrint("check state\n");
@@ -1533,16 +1532,16 @@ int deleteFromXmidtQ(XmidtMsg **next_node)
}
//check if message is expired based on each qos and set to delete state.
void checkMsgExpiry()
void checkMsgExpiry(XmidtMsg *xmdMsg)
{
long long currTime = 0;
struct timespec ts;
char *errorMsg = NULL;
XmidtMsg *temp = NULL;
temp = get_global_xmidthead();
temp = xmdMsg;
while(temp != NULL)
if(temp != NULL)
{
getCurrentTime(&ts);
currTime= (long long)ts.tv_sec;
@@ -1551,8 +1550,7 @@ void checkMsgExpiry()
if(temp->state == DELETE)
{
ParodusPrint("msg is already in DELETE state and about to delete, skipping state update. transid %s\n", tempMsg->u.event.transaction_uuid);
temp = temp->next;
continue;
return;
}
if(tempMsg->u.event.qos > 74)
@@ -1611,12 +1609,11 @@ void checkMsgExpiry()
{
ParodusError("Invalid qos\n");
}
temp = temp->next;
}
}
//To delete low qos messages from queue when max queue limit is reached.
void checkMaxQandOptimize()
void checkMaxQandOptimize(XmidtMsg *xmdMsg)
{
int qos = 0;
@@ -1627,10 +1624,10 @@ void checkMaxQandOptimize()
//Traverse through XmidtMsgQ list and set low qos msgs to DELETE
XmidtMsg *temp = NULL;
temp = get_global_xmidthead();
temp = xmdMsg;
while(temp != NULL)
{
if (temp != NULL)
{
wrp_msg_t * tempMsg = temp->msg;
qos = tempMsg->u.event.qos;
ParodusPrint("qos is %d\n", qos);
@@ -1640,15 +1637,22 @@ void checkMaxQandOptimize()
}
else
{
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
//rbus callback to caller
char *errorMsg = NULL;
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
updateXmidtState(temp, DELETE);
//Skip max queue callback when msg is already in DELETE state.
if( temp->state == DELETE)
{
ParodusInfo("Msg is in DELETE state, skipped Max Queue size callback %s\n", tempMsg->u.event.transaction_uuid);
}
else
{
ParodusInfo("Max Queue size reached. Low qos %d, set to DELETE state\n", qos);
//rbus callback to caller
char *errorMsg = NULL;
mapXmidtStatusToStatusMessage(QUEUE_OPTIMIZED, &errorMsg);
ParodusPrint("statusMsg is %s\n",errorMsg);
createOutParamsandSendAck(temp->msg, temp->asyncHandle, errorMsg, QUEUE_OPTIMIZED, NULL, RBUS_ERROR_INVALID_RESPONSE_FROM_DESTINATION);
updateXmidtState(temp, DELETE);
}
}
temp = temp->next;
}
}
}

View File

@@ -114,8 +114,8 @@ void print_xmidMsg_list();
int deleteCloudACKNode(char* trans_id);
int deleteFromXmidtQ(XmidtMsg **next_node);
int checkCloudConn();
void checkMaxQandOptimize();
void checkMsgExpiry();
void checkMaxQandOptimize(XmidtMsg *xmdMsg);
void checkMsgExpiry(XmidtMsg *xmdMsg);
void mapXmidtStatusToStatusMessage(int status, char **message);
int xmidtQOptmize();
#ifdef __cplusplus