diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 2188c91..2c189d5 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -340,6 +340,30 @@ public: */ int get_out_queue_length() const; +#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION + /** + * \brief Sets flags for rd_kafka_destroy_flags() + * + * 0 (default) - calls consumer_close() during handle destruction + * to leave group and commit final offsets. + * + * RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE - don't call consumer_close() + * + * Details: see https://github.com/edenhill/librdkafka/blob/8fc678518738c6b80380326dad86ef62228f87f0/src/rdkafka.h#L2612-L2651 + * + * With default value some termination sequences can lead to hang + * during destruction, see: https://github.com/edenhill/librdkafka/issues/2077 + * + */ + void set_destroy_flags(int destroy_flags); + + /** + * \brief Returns destroy_flags + * + */ + int get_destroy_flags() const; + +#endif /** * \brief Cancels the current callback dispatcher * @@ -357,7 +381,20 @@ protected: private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; - using HandlePtr = std::unique_ptr; + // It seems that destroy_flags maybe RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE by default. + // + // All the consumer objects created by cppkafka call rd_kafka_consumer_close during destruction. + // So we don't want it to be called once again during handle destruction. + int destroy_flags_ = 0; + + struct handle_deleter { + handle_deleter(const KafkaHandleBase * handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {} + void operator()(rd_kafka_t* handle); + private: + const KafkaHandleBase * handle_base_ptr_; + }; + + using HandlePtr = std::unique_ptr; using TopicConfigurationMap = std::unordered_map; Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); diff --git a/include/cppkafka/macros.h b/include/cppkafka/macros.h index 6baebf0..9037994 100644 --- a/include/cppkafka/macros.h +++ b/include/cppkafka/macros.h @@ -51,5 +51,6 @@ #define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00 #define RD_KAFKA_MESSAGE_STATUS_SUPPORT_VERSION 0x01000002 //v1.0.0.02 #define RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION 0x00090501 //v0.9.5.01 +#define RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION 0x000b0600 //v0.11.6 #endif // CPPKAFKA_MACROS_H diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index a4196e7..b2a63d4 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -48,7 +48,7 @@ namespace cppkafka { const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000}; KafkaHandleBase::KafkaHandleBase(Configuration config) -: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, nullptr) { +: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, handle_deleter(this)) { auto& maybe_config = config_.get_default_topic_configuration(); if (maybe_config) { maybe_config->set_as_opaque(); @@ -213,7 +213,7 @@ void KafkaHandleBase::yield() const { } void KafkaHandleBase::set_handle(rd_kafka_t* handle) { - handle_ = HandlePtr(handle, &rd_kafka_destroy); + handle_ = HandlePtr(handle, handle_deleter(this)); } Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) { @@ -285,4 +285,25 @@ rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() { return config_.get_handle(); } +#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION + +void KafkaHandleBase::set_destroy_flags(int destroy_flags) { + destroy_flags_ = destroy_flags; +}; + +int KafkaHandleBase::get_destroy_flags() const { + return destroy_flags_; +}; + +#endif + + +void KafkaHandleBase::handle_deleter::operator()(rd_kafka_t* handle) { +#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION + rd_kafka_destroy_flags(handle, handle_base_ptr_->get_destroy_flags()); +#else + rd_kafka_destroy(handle); +#endif +} + } // cppkafka