Add support for rd_kafka_destroy_flags.

This commit is contained in:
Mikhail Filimonov
2020-05-21 12:32:24 +02:00
parent 006642cdb2
commit 3b67ba072a
3 changed files with 62 additions and 3 deletions

View File

@@ -340,6 +340,30 @@ public:
*/
int get_out_queue_length() const;
#if RD_KAFKA_VERSION >= RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION
/**
* \brief Sets flags for rd_kafka_destroy_flags()
*
* 0 (default) - calls consumer_close() during handle destruction
* to leave group and commit final offsets.
*
* RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE - don't call consumer_close()
*
* Details: see https://github.com/edenhill/librdkafka/blob/8fc678518738c6b80380326dad86ef62228f87f0/src/rdkafka.h#L2612-L2651
*
* With default value some termination sequences can lead to hang
* during destruction, see: https://github.com/edenhill/librdkafka/issues/2077
*
*/
void set_destroy_flags(int destroy_flags);
/**
* \brief Returns destroy_flags
*
*/
int get_destroy_flags() const;
#endif
/**
* \brief Cancels the current callback dispatcher
*
@@ -357,7 +381,20 @@ protected:
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
using HandlePtr = std::unique_ptr<rd_kafka_t, decltype(&rd_kafka_destroy)>;
// It seems that destroy_flags maybe RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE by default.
//
// All the consumer objects created by cppkafka call rd_kafka_consumer_close during destruction.
// So we don't want it to be called once again during handle destruction.
int destroy_flags_ = 0;
struct handle_deleter {
handle_deleter(const KafkaHandleBase * handle_base_ptr) : handle_base_ptr_{handle_base_ptr} {}
void operator()(rd_kafka_t* handle);
private:
const KafkaHandleBase * handle_base_ptr_;
};
using HandlePtr = std::unique_ptr<rd_kafka_t, handle_deleter>;
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);

View File

@@ -51,5 +51,6 @@
#define RD_KAFKA_EVENT_STATS_SUPPORT_VERSION 0x000b0000 //v0.11.0.00
#define RD_KAFKA_MESSAGE_STATUS_SUPPORT_VERSION 0x01000002 //v1.0.0.02
#define RD_KAFKA_STORE_OFFSETS_SUPPORT_VERSION 0x00090501 //v0.9.5.01
#define RD_KAFKA_DESTROY_FLAGS_SUPPORT_VERSION 0x000b0600 //v0.11.6
#endif // CPPKAFKA_MACROS_H