diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h index 6a13946..e8d4928 100644 --- a/include/cppkafka/utils/poll_strategy_base.h +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -84,6 +84,36 @@ public: */ Consumer& get_consumer() final; + /** + * \brief Creates partitions queues associated with the supplied partitions. + * + * This method contains a default implementation. It adds all the new queues belonging + * to the provided partition list and calls reset_state(). + * To be used with static consumers. + * + * \param partitions Assigned topic partitions. + */ + virtual void assign(TopicPartitionList& partitions); + + /** + * \brief Removes partitions queues associated with the supplied partitions. + * + * This method contains a default implementation. It removes all the queues + * belonging to the provided partition list and calls reset_state(). + * To be used with static consumers. + * + * \param partitions Revoked topic partitions. + */ + virtual void revoke(const TopicPartitionList& partitions); + + /** + * \brief Removes all partitions queues associated with the supplied partitions. + * + * This method contains a default implementation. It removes all the queues + * currently assigned and calls reset_state(). To be used with static consumers. + */ + virtual void revoke(); + protected: /** * \brief Get the queues from all assigned partitions @@ -111,8 +141,8 @@ protected: /** * \brief Function to be called when a new partition assignment takes place * - * This method contains a default implementation. It adds all the new queues belonging - * to the provided partition list and calls reset_state(). + * This method contains a default implementation. It calls assign() + * and invokes the user assignment callback. * * \param partitions Assigned topic partitions */ @@ -121,8 +151,8 @@ protected: /** * \brief Function to be called when an old partition assignment gets revoked * - * This method contains a default implementation. It removes all the queues - * belonging to the provided partition list and calls reset_state(). + * This method contains a default implementation. It calls revoke() + * and invokes the user revocation callback. * * \param partitions Revoked topic partitions */ diff --git a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp index 910ae6f..15f75d8 100644 --- a/src/utils/poll_strategy_base.cpp +++ b/src/utils/poll_strategy_base.cpp @@ -89,13 +89,29 @@ void PollStrategyBase::reset_state() { } -void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { +void PollStrategyBase::assign(TopicPartitionList& partitions) { // populate partition queues for (const auto& partition : partitions) { // get the queue associated with this partition partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()}); } reset_state(); +} + +void PollStrategyBase::revoke(const TopicPartitionList& partitions) { + for (const auto &partition : partitions) { + partition_queues_.erase(partition); + } + reset_state(); +} + +void PollStrategyBase::revoke() { + partition_queues_.clear(); + reset_state(); +} + +void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { + assign(partitions); // call original consumer callback if any if (assignment_callback_) { assignment_callback_(partitions); @@ -103,15 +119,7 @@ void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { } void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) { - for (const auto& partition : partitions) { - // get the queue associated with this partition - auto toppar_it = partition_queues_.find(partition); - if (toppar_it != partition_queues_.end()) { - // remove this queue from the list - partition_queues_.erase(toppar_it); - } - } - reset_state(); + revoke(partitions); // call original consumer callback if any if (revocation_callback_) { revocation_callback_(partitions);