mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 19:18:04 +00:00
Style changes
This commit is contained in:
@@ -343,23 +343,11 @@ public:
|
|||||||
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
|
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
|
||||||
/**
|
/**
|
||||||
* \brief Sets flags for rd_kafka_destroy_flags()
|
* \brief Sets flags for rd_kafka_destroy_flags()
|
||||||
*
|
|
||||||
* 0 (default) - calls consumer_close() during handle destruction
|
|
||||||
* to leave group and commit final offsets.
|
|
||||||
*
|
|
||||||
* RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE - don't call consumer_close()
|
|
||||||
*
|
|
||||||
* Details: see https://github.com/edenhill/librdkafka/blob/8fc678518738c6b80380326dad86ef62228f87f0/src/rdkafka.h#L2612-L2651
|
|
||||||
*
|
|
||||||
* With default value some termination sequences can lead to hang
|
|
||||||
* during destruction, see: https://github.com/edenhill/librdkafka/issues/2077
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
void set_destroy_flags(int destroy_flags);
|
void set_destroy_flags(int destroy_flags);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Returns destroy_flags
|
* \brief Returns flags for rd_kafka_destroy_flags()
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
int get_destroy_flags() const;
|
int get_destroy_flags() const;
|
||||||
|
|
||||||
@@ -381,20 +369,14 @@ protected:
|
|||||||
private:
|
private:
|
||||||
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
|
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
|
||||||
|
|
||||||
// It seems that destroy_flags maybe RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE by default.
|
struct HandleDeleter {
|
||||||
//
|
explicit HandleDeleter(const KafkaHandleBase* handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {}
|
||||||
// All the consumer objects created by cppkafka call rd_kafka_consumer_close during destruction.
|
|
||||||
// So we don't want it to be called once again during handle destruction.
|
|
||||||
int destroy_flags_ = 0;
|
|
||||||
|
|
||||||
struct handle_deleter {
|
|
||||||
handle_deleter(const KafkaHandleBase * handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {}
|
|
||||||
void operator()(rd_kafka_t* handle);
|
void operator()(rd_kafka_t* handle);
|
||||||
private:
|
private:
|
||||||
const KafkaHandleBase * handle_base_ptr_;
|
const KafkaHandleBase * handle_base_ptr_;
|
||||||
};
|
};
|
||||||
|
|
||||||
using HandlePtr = std::unique_ptr<rd_kafka_t, handle_deleter>;
|
using HandlePtr = std::unique_ptr<rd_kafka_t, HandleDeleter>;
|
||||||
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
|
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
|
||||||
|
|
||||||
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
|
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
|
||||||
@@ -410,6 +392,7 @@ private:
|
|||||||
TopicConfigurationMap topic_configurations_;
|
TopicConfigurationMap topic_configurations_;
|
||||||
std::mutex topic_configurations_mutex_;
|
std::mutex topic_configurations_mutex_;
|
||||||
HandlePtr handle_;
|
HandlePtr handle_;
|
||||||
|
int destroy_flags_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // cppkafka
|
} // cppkafka
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ namespace cppkafka {
|
|||||||
const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000};
|
const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000};
|
||||||
|
|
||||||
KafkaHandleBase::KafkaHandleBase(Configuration config)
|
KafkaHandleBase::KafkaHandleBase(Configuration config)
|
||||||
: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, handle_deleter(this)) {
|
: timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)), handle_(nullptr, HandleDeleter(this)), destroy_flags_(0) {
|
||||||
auto& maybe_config = config_.get_default_topic_configuration();
|
auto& maybe_config = config_.get_default_topic_configuration();
|
||||||
if (maybe_config) {
|
if (maybe_config) {
|
||||||
maybe_config->set_as_opaque();
|
maybe_config->set_as_opaque();
|
||||||
@@ -213,7 +213,7 @@ void KafkaHandleBase::yield() const {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
|
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
|
||||||
handle_ = HandlePtr(handle, handle_deleter(this));
|
handle_ = HandlePtr(handle, HandleDeleter(this));
|
||||||
}
|
}
|
||||||
|
|
||||||
Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) {
|
Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) {
|
||||||
@@ -298,7 +298,7 @@ int KafkaHandleBase::get_destroy_flags() const {
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
void KafkaHandleBase::handle_deleter::operator()(rd_kafka_t* handle) {
|
void KafkaHandleBase::HandleDeleter::operator()(rd_kafka_t* handle) {
|
||||||
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
|
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
|
||||||
rd_kafka_destroy_flags(handle, handle_base_ptr_->get_destroy_flags());
|
rd_kafka_destroy_flags(handle, handle_base_ptr_->get_destroy_flags());
|
||||||
#else
|
#else
|
||||||
|
|||||||
Reference in New Issue
Block a user