diff --git a/src/conn_interface.c b/src/conn_interface.c index ff868fe..be67af7 100644 --- a/src/conn_interface.c +++ b/src/conn_interface.c @@ -113,7 +113,7 @@ void createSocketConnection(void (* initKeypress)()) StartThread(handle_upstream, &upstream_tid); StartThread(processUpstreamMessage, &upstream_msg_tid); #ifdef ENABLE_WEBCFGBIN - registerRBUSlistener(); + subscribeRBUSevent(); #endif ParodusMsgQ = NULL; StartThread(messageHandlerTask, &downstream_tid); diff --git a/src/upstream_rbus.c b/src/upstream_rbus.c index 69d19d0..a3cf009 100644 --- a/src/upstream_rbus.c +++ b/src/upstream_rbus.c @@ -10,32 +10,47 @@ #include "ParodusInternal.h" #include "partners_check.h" -void processWebconfigUpstreamMessage(rbusHandle_t handle, rbusMessage_t* msg, void * userData); +#define WEBCFG_UPSTREAM_EVENT "Webconfig.Upstream" + +void processWebconfigUpstreamEvent(rbusHandle_t handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription); + +void subscribeAsyncHandler( rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error); /* API to register RBUS listener to receive messages from webconfig */ -void registerRBUSlistener() +void subscribeRBUSevent() { rbusError_t err; + int rc = RBUS_ERROR_SUCCESS; rbusHandle_t rbus_Handle; err = rbus_open(&rbus_Handle, "parodus"); if (err) { - ParodusError("rbus_open:%s\n", rbusError_ToString(err)); + ParodusError("rbus_open failed :%s\n", rbusError_ToString(err)); return; } - ParodusInfo("B4 rbusMessage_AddListener\n"); - rbusMessage_AddListener(rbus_Handle, "webconfig.upstream", &processWebconfigUpstreamMessage, NULL); - ParodusInfo("After rbusMessage_AddListener\n"); + rc = rbusEvent_SubscribeAsync(rbus_Handle,WEBCFG_UPSTREAM_EVENT,processWebconfigUpstreamEvent,subscribeAsyncHandler,"parodus",10*60); + if(rc != RBUS_ERROR_SUCCESS) + ParodusError("rbusEvent_Subscribe failed: %d, %s\n", rc, rbusError_ToString(rc)); + else + ParodusInfo("rbusEvent_Subscribe was successful\n"); } -void processWebconfigUpstreamMessage(rbusHandle_t handle, rbusMessage_t* msg, void * userData) +void processWebconfigUpstreamEvent(rbusHandle_t handle, rbusEvent_t const* event, rbusEventSubscription_t* subscription) { - int rv=-1; + (void)handle; + (void)subscription; + + int rv=-1; wrp_msg_t *event_msg; void *bytes; + int len; + rbusValue_t value = NULL; - rv = wrp_to_struct( msg->data, msg->length, WRP_BYTES, &event_msg ); + value = rbusObject_GetValue(event->data, "value"); + bytes = rbusValue_GetBytes(value, &len); + + rv = wrp_to_struct( bytes, len, WRP_BYTES, &event_msg ); if(rv > 0) { ParodusInfo(" Received upstream event data: dest '%s'\n", event_msg->u.event.dest); @@ -67,7 +82,7 @@ void processWebconfigUpstreamMessage(rbusHandle_t handle, rbusMessage_t* msg, vo } else { - sendUpstreamMsgToServer((void **)(&msg->data), msg->length); + sendUpstreamMsgToServer((void **)(&bytes), len); } if(partnersList != NULL) { @@ -84,4 +99,7 @@ void processWebconfigUpstreamMessage(rbusHandle_t handle, rbusMessage_t* msg, vo } } - +void subscribeAsyncHandler( rbusHandle_t handle, rbusEventSubscription_t* subscription, rbusError_t error) +{ + ParodusInfo("subscribeAsyncHandler event %s, error %d - %s\n",subscription->eventName, error, rbusError_ToString(error)); +}