mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2026-01-27 02:22:36 +00:00
Fix for failover issue.
When the consumer enters the group and gets no assignment (for ex. there is not enough partitions in the topic), librdkafka waits for the rebalancing sequence to be finished by calling assign with the empty list of partitions (just as was passed by librdkafka to rebalance callback). But cppkafka instead pass nullptr instead of an empty list (which means unassign). And consumer stuck forever in that state, not being able to pick the partition during the next rebalance (failover), because the previous rebalance sequence was not finished. Fixes https://github.com/mfontanini/cppkafka/issues/273 , https://github.com/ClickHouse/ClickHouse/issues/21118 , etc.
This commit is contained in:
@@ -124,15 +124,9 @@ void Consumer::unsubscribe() {
|
||||
|
||||
void Consumer::assign(const TopicPartitionList& topic_partitions) {
|
||||
rd_kafka_resp_err_t error;
|
||||
if (topic_partitions.empty()) {
|
||||
error = rd_kafka_assign(get_handle(), nullptr);
|
||||
check_error(error);
|
||||
}
|
||||
else {
|
||||
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
|
||||
error = rd_kafka_assign(get_handle(), topic_list_handle.get());
|
||||
check_error(error, topic_list_handle.get());
|
||||
}
|
||||
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
|
||||
error = rd_kafka_assign(get_handle(), topic_list_handle.get());
|
||||
check_error(error, topic_list_handle.get());
|
||||
}
|
||||
|
||||
void Consumer::unassign() {
|
||||
|
||||
Reference in New Issue
Block a user