Allow setting background event callback on configuration handles

This commit is contained in:
Matias Fontanini
2018-10-27 09:46:13 -07:00
parent 19baa03cea
commit b242e2c35c
2 changed files with 36 additions and 1 deletions

View File

@@ -42,6 +42,7 @@
#include "clonable_ptr.h" #include "clonable_ptr.h"
#include "configuration_base.h" #include "configuration_base.h"
#include "macros.h" #include "macros.h"
#include "event.h"
namespace cppkafka { namespace cppkafka {
@@ -78,6 +79,7 @@ public:
const std::string& message)>; const std::string& message)>;
using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>; using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>;
using SocketCallback = std::function<int(int domain, int type, int protocol)>; using SocketCallback = std::function<int(int domain, int type, int protocol)>;
using BackgroundEventCallback = std::function<void(KafkaHandleBase& handle, Event)>;
using ConfigurationBase<Configuration>::set; using ConfigurationBase<Configuration>::set;
using ConfigurationBase<Configuration>::get; using ConfigurationBase<Configuration>::get;
@@ -142,6 +144,13 @@ public:
*/ */
Configuration& set_socket_callback(SocketCallback callback); Configuration& set_socket_callback(SocketCallback callback);
#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
/**
* Sets the background event callback (invokes rd_kafka_conf_set_background_event_cb)
*/
Configuration& set_background_event_callback(BackgroundEventCallback callback);
#endif
/** /**
* Sets the default topic configuration * Sets the default topic configuration
*/ */
@@ -204,6 +213,11 @@ public:
*/ */
const SocketCallback& get_socket_callback() const; const SocketCallback& get_socket_callback() const;
/**
* Gets the background event callback
*/
const BackgroundEventCallback& get_background_event_callback() const;
/** /**
* Gets the default topic configuration * Gets the default topic configuration
*/ */
@@ -229,6 +243,7 @@ private:
LogCallback log_callback_; LogCallback log_callback_;
StatsCallback stats_callback_; StatsCallback stats_callback_;
SocketCallback socket_callback_; SocketCallback socket_callback_;
BackgroundEventCallback background_event_callback_;
}; };
} // cppkafka } // cppkafka

View File

@@ -74,7 +74,7 @@ void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque
void throttle_callback_proxy(rd_kafka_t*, const char* broker_name, void throttle_callback_proxy(rd_kafka_t*, const char* broker_name,
int32_t broker_id, int throttle_time_ms, void *opaque) { int32_t broker_id, int throttle_time_ms, void *opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque); KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
CallbackInvoker<Configuration::ThrottleCallback> CallbackInvoker<Configuration::ThrottleCallback>
("throttle", handle->get_configuration().get_throttle_callback(), handle) ("throttle", handle->get_configuration().get_throttle_callback(), handle)
(*handle, broker_name, broker_id, milliseconds(throttle_time_ms)); (*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
} }
@@ -102,6 +102,13 @@ int socket_callback_proxy(int domain, int type, int protocol, void* opaque) {
(domain, type, protocol); (domain, type, protocol);
} }
void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, void *opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
CallbackInvoker<Configuration::BackgroundEventCallback>
("background_event", handle->get_configuration().get_background_event_callback(), handle)
(*handle, Event{event_ptr});
}
// Configuration // Configuration
Configuration::Configuration() Configuration::Configuration()
@@ -177,6 +184,14 @@ Configuration& Configuration::set_socket_callback(SocketCallback callback) {
return *this; return *this;
} }
#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION
Configuration& Configuration::set_background_event_callback(BackgroundEventCallback callback) {
background_event_callback_ = move(callback);
rd_kafka_conf_set_background_event_cb(handle_.get(), &background_event_callback_proxy);
return *this;
}
#endif
Configuration& Configuration&
Configuration::set_default_topic_configuration(TopicConfiguration config) { Configuration::set_default_topic_configuration(TopicConfiguration config) {
default_topic_config_ = std::move(config); default_topic_config_ = std::move(config);
@@ -239,6 +254,11 @@ const Configuration::SocketCallback& Configuration::get_socket_callback() const
return socket_callback_; return socket_callback_;
} }
const Configuration::BackgroundEventCallback&
Configuration::get_background_event_callback() const {
return background_event_callback_;
}
const optional<TopicConfiguration>& Configuration::get_default_topic_configuration() const { const optional<TopicConfiguration>& Configuration::get_default_topic_configuration() const {
return default_topic_config_; return default_topic_config_;
} }