diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h index beb50d9..e8d4928 100644 --- a/include/cppkafka/utils/poll_strategy_base.h +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -102,10 +102,17 @@ public: * belonging to the provided partition list and calls reset_state(). * To be used with static consumers. * - * \param partitions Revoked topic partitions. If the partition list is empty - * all partitions will be revoked. + * \param partitions Revoked topic partitions. */ - virtual void revoke(const TopicPartitionList& partitions = {}); + virtual void revoke(const TopicPartitionList& partitions); + + /** + * \brief Removes all partitions queues associated with the supplied partitions. + * + * This method contains a default implementation. It removes all the queues + * currently assigned and calls reset_state(). To be used with static consumers. + */ + virtual void revoke(); protected: /** diff --git a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp index 586581f..15f75d8 100644 --- a/src/utils/poll_strategy_base.cpp +++ b/src/utils/poll_strategy_base.cpp @@ -99,23 +99,17 @@ void PollStrategyBase::assign(TopicPartitionList& partitions) { } void PollStrategyBase::revoke(const TopicPartitionList& partitions) { - if (partitions.empty()) { - //revoke everything - partition_queues_.clear(); - } - else { - for (const auto &partition : partitions) { - // get the queue associated with this partition - auto toppar_it = partition_queues_.find(partition); - if (toppar_it != partition_queues_.end()) { - // remove this queue from the list - partition_queues_.erase(toppar_it); - } - } + for (const auto &partition : partitions) { + partition_queues_.erase(partition); } reset_state(); } +void PollStrategyBase::revoke() { + partition_queues_.clear(); + reset_state(); +} + void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { assign(partitions); // call original consumer callback if any