From 244726c2516e27de7a92d99230e5b67aac8f9f82 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 22 May 2020 17:24:19 +0200 Subject: [PATCH] Style changes --- include/cppkafka/kafka_handle_base.h | 27 +++++---------------------- src/kafka_handle_base.cpp | 6 +++--- 2 files changed, 8 insertions(+), 25 deletions(-) diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 2c189d5..e6095f9 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -343,23 +343,11 @@ public: #if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION /** * \brief Sets flags for rd_kafka_destroy_flags() - * - * 0 (default) - calls consumer_close() during handle destruction - * to leave group and commit final offsets. - * - * RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE - don't call consumer_close() - * - * Details: see https://github.com/edenhill/librdkafka/blob/8fc678518738c6b80380326dad86ef62228f87f0/src/rdkafka.h#L2612-L2651 - * - * With default value some termination sequences can lead to hang - * during destruction, see: https://github.com/edenhill/librdkafka/issues/2077 - * */ void set_destroy_flags(int destroy_flags); /** - * \brief Returns destroy_flags - * + * \brief Returns flags for rd_kafka_destroy_flags() */ int get_destroy_flags() const; @@ -381,20 +369,14 @@ protected: private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; - // It seems that destroy_flags maybe RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE by default. - // - // All the consumer objects created by cppkafka call rd_kafka_consumer_close during destruction. - // So we don't want it to be called once again during handle destruction. - int destroy_flags_ = 0; - - struct handle_deleter { - handle_deleter(const KafkaHandleBase * handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {} + struct HandleDeleter { + explicit HandleDeleter(const KafkaHandleBase* handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {} void operator()(rd_kafka_t* handle); private: const KafkaHandleBase * handle_base_ptr_; }; - using HandlePtr = std::unique_ptr; + using HandlePtr = std::unique_ptr; using TopicConfigurationMap = std::unordered_map; Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); @@ -410,6 +392,7 @@ private: TopicConfigurationMap topic_configurations_; std::mutex topic_configurations_mutex_; HandlePtr handle_; + int destroy_flags_; }; } // cppkafka diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index b2a63d4..3d1dc7f 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -48,7 +48,7 @@ namespace cppkafka { const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000}; KafkaHandleBase::KafkaHandleBase(Configuration config) -: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, handle_deleter(this)) { +: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, HandleDeleter(this)), destroy_flags_(0) { auto& maybe_config = config_.get_default_topic_configuration(); if (maybe_config) { maybe_config->set_as_opaque(); @@ -213,7 +213,7 @@ void KafkaHandleBase::yield() const { } void KafkaHandleBase::set_handle(rd_kafka_t* handle) { - handle_ = HandlePtr(handle, handle_deleter(this)); + handle_ = HandlePtr(handle, HandleDeleter(this)); } Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) { @@ -298,7 +298,7 @@ int KafkaHandleBase::get_destroy_flags() const { #endif -void KafkaHandleBase::handle_deleter::operator()(rd_kafka_t* handle) { +void KafkaHandleBase::HandleDeleter::operator()(rd_kafka_t* handle) { #if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION rd_kafka_destroy_flags(handle, handle_base_ptr_->get_destroy_flags()); #else