diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index bddc0d2..910cebb 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -42,6 +42,7 @@ #include "clonable_ptr.h" #include "configuration_base.h" #include "macros.h" +#include "event.h" namespace cppkafka { @@ -78,6 +79,7 @@ public: const std::string& message)>; using StatsCallback = std::function; using SocketCallback = std::function; + using BackgroundEventCallback = std::function; using ConfigurationBase::set; using ConfigurationBase::get; @@ -142,6 +144,13 @@ public: */ Configuration& set_socket_callback(SocketCallback callback); +#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION + /** + * Sets the background event callback (invokes rd_kafka_conf_set_background_event_cb) + */ + Configuration& set_background_event_callback(BackgroundEventCallback callback); +#endif + /** * Sets the default topic configuration */ @@ -204,6 +213,11 @@ public: */ const SocketCallback& get_socket_callback() const; + /** + * Gets the background event callback + */ + const BackgroundEventCallback& get_background_event_callback() const; + /** * Gets the default topic configuration */ @@ -229,6 +243,7 @@ private: LogCallback log_callback_; StatsCallback stats_callback_; SocketCallback socket_callback_; + BackgroundEventCallback background_event_callback_; }; } // cppkafka diff --git a/src/configuration.cpp b/src/configuration.cpp index 061adc7..fbb3d3a 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -74,7 +74,7 @@ void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque void throttle_callback_proxy(rd_kafka_t*, const char* broker_name, int32_t broker_id, int throttle_time_ms, void *opaque) { KafkaHandleBase* handle = static_cast(opaque); - CallbackInvoker + CallbackInvoker ("throttle", handle->get_configuration().get_throttle_callback(), handle) (*handle, broker_name, broker_id, milliseconds(throttle_time_ms)); } @@ -102,6 +102,13 @@ int socket_callback_proxy(int domain, int type, int protocol, void* opaque) { (domain, type, protocol); } +void background_event_callback_proxy(rd_kafka_t*, rd_kafka_event_t* event_ptr, void *opaque) { + KafkaHandleBase* handle = static_cast(opaque); + CallbackInvoker + ("background_event", handle->get_configuration().get_background_event_callback(), handle) + (*handle, Event{event_ptr}); +} + // Configuration Configuration::Configuration() @@ -177,6 +184,14 @@ Configuration& Configuration::set_socket_callback(SocketCallback callback) { return *this; } +#if RD_KAFKA_VERSION >= RD_KAFKA_ADMIN_API_SUPPORT_VERSION +Configuration& Configuration::set_background_event_callback(BackgroundEventCallback callback) { + background_event_callback_ = move(callback); + rd_kafka_conf_set_background_event_cb(handle_.get(), &background_event_callback_proxy); + return *this; +} +#endif + Configuration& Configuration::set_default_topic_configuration(TopicConfiguration config) { default_topic_config_ = std::move(config); @@ -239,6 +254,11 @@ const Configuration::SocketCallback& Configuration::get_socket_callback() const return socket_callback_; } +const Configuration::BackgroundEventCallback& +Configuration::get_background_event_callback() const { + return background_event_callback_; +} + const optional& Configuration::get_default_topic_configuration() const { return default_topic_config_; }