From dabb2d3aa86b8793cf31241e6a347f4b97894f4b Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Tue, 1 Jun 2021 23:36:25 +0200 Subject: [PATCH] Fix for failover issue. When the consumer enters the group and gets no assignment (for ex. there is not enough partitions in the topic), librdkafka waits for the rebalancing sequence to be finished by calling assign with the empty list of partitions (just as was passed by librdkafka to rebalance callback). But cppkafka instead pass nullptr instead of an empty list (which means unassign). And consumer stuck forever in that state, not being able to pick the partition during the next rebalance (failover), because the previous rebalance sequence was not finished. Fixes https://github.com/mfontanini/cppkafka/issues/273 , https://github.com/ClickHouse/ClickHouse/issues/21118 , etc. --- src/consumer.cpp | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/consumer.cpp b/src/consumer.cpp index 5522ca9..38f156a 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -124,15 +124,9 @@ void Consumer::unsubscribe() { void Consumer::assign(const TopicPartitionList& topic_partitions) { rd_kafka_resp_err_t error; - if (topic_partitions.empty()) { - error = rd_kafka_assign(get_handle(), nullptr); - check_error(error); - } - else { - TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); - error = rd_kafka_assign(get_handle(), topic_list_handle.get()); - check_error(error, topic_list_handle.get()); - } + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + error = rd_kafka_assign(get_handle(), topic_list_handle.get()); + check_error(error, topic_list_handle.get()); } void Consumer::unassign() {