diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 910cebb..c97f5a8 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -149,6 +149,11 @@ public: * Sets the background event callback (invokes rd_kafka_conf_set_background_event_cb) */ Configuration& set_background_event_callback(BackgroundEventCallback callback); + + /** + * Sets the event mask (invokes rd_kafka_conf_set_events) + */ + Configuration& set_events(int events); #endif /** diff --git a/include/cppkafka/macros.h b/include/cppkafka/macros.h index 460b414..aa07a97 100644 --- a/include/cppkafka/macros.h +++ b/include/cppkafka/macros.h @@ -49,5 +49,6 @@ #define RD_KAFKA_ADMIN_API_SUPPORT_VERSION 0x000b0500 //v0.11.5.00 #define RD_KAFKA_MESSAGE_LATENCY_SUPPORT_VERSION 0x000b0000 //v0.11.0.00 #define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00 +#define RD_KAFKA_MESSAGE_STATUS_SUPPORT_VERSION 0x000b06ff //v0.11.6 #endif // CPPKAFKA_MACROS_H diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 2d82d4b..f85efa8 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -187,6 +187,13 @@ public: } #endif +#if (RD_KAFKA_VERSION >= RD_KAFKA_MESSAGE_STATUS_SUPPORT_VERSION) + rd_kafka_msg_status_t get_status() const { + assert(handle_); + return rd_kafka_message_status(handle_.get()); + } +#endif + /** * \brief Indicates whether this message is valid (not null) */ diff --git a/src/configuration.cpp b/src/configuration.cpp index fbb3d3a..5a59c51 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -190,6 +190,11 @@ Configuration& Configuration::set_background_event_callback(BackgroundEventCallb rd_kafka_conf_set_background_event_cb(handle_.get(), &background_event_callback_proxy); return *this; } + +Configuration& Configuration::set_events(int events) { + rd_kafka_conf_set_events(handle_.get(), events); + return *this; +} #endif Configuration&