From 68ae525eba5ff6778e7da21ba4567490d96713aa Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Sat, 8 Feb 2020 22:34:45 -0500 Subject: [PATCH 1/2] Added member functions for static consumers --- include/cppkafka/utils/poll_strategy_base.h | 31 ++++++++++++++++--- src/utils/poll_strategy_base.cpp | 34 +++++++++++++++------ 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h index 6a13946..beb50d9 100644 --- a/include/cppkafka/utils/poll_strategy_base.h +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -84,6 +84,29 @@ public: */ Consumer& get_consumer() final; + /** + * \brief Creates partitions queues associated with the supplied partitions. + * + * This method contains a default implementation. It adds all the new queues belonging + * to the provided partition list and calls reset_state(). + * To be used with static consumers. + * + * \param partitions Assigned topic partitions. + */ + virtual void assign(TopicPartitionList& partitions); + + /** + * \brief Removes partitions queues associated with the supplied partitions. + * + * This method contains a default implementation. It removes all the queues + * belonging to the provided partition list and calls reset_state(). + * To be used with static consumers. + * + * \param partitions Revoked topic partitions. If the partition list is empty + * all partitions will be revoked. + */ + virtual void revoke(const TopicPartitionList& partitions = {}); + protected: /** * \brief Get the queues from all assigned partitions @@ -111,8 +134,8 @@ protected: /** * \brief Function to be called when a new partition assignment takes place * - * This method contains a default implementation. It adds all the new queues belonging - * to the provided partition list and calls reset_state(). + * This method contains a default implementation. It calls assign() + * and invokes the user assignment callback. * * \param partitions Assigned topic partitions */ @@ -121,8 +144,8 @@ protected: /** * \brief Function to be called when an old partition assignment gets revoked * - * This method contains a default implementation. It removes all the queues - * belonging to the provided partition list and calls reset_state(). + * This method contains a default implementation. It calls revoke() + * and invokes the user revocation callback. * * \param partitions Revoked topic partitions */ diff --git a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp index 910ae6f..586581f 100644 --- a/src/utils/poll_strategy_base.cpp +++ b/src/utils/poll_strategy_base.cpp @@ -89,13 +89,35 @@ void PollStrategyBase::reset_state() { } -void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { +void PollStrategyBase::assign(TopicPartitionList& partitions) { // populate partition queues for (const auto& partition : partitions) { // get the queue associated with this partition partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()}); } reset_state(); +} + +void PollStrategyBase::revoke(const TopicPartitionList& partitions) { + if (partitions.empty()) { + //revoke everything + partition_queues_.clear(); + } + else { + for (const auto &partition : partitions) { + // get the queue associated with this partition + auto toppar_it = partition_queues_.find(partition); + if (toppar_it != partition_queues_.end()) { + // remove this queue from the list + partition_queues_.erase(toppar_it); + } + } + } + reset_state(); +} + +void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { + assign(partitions); // call original consumer callback if any if (assignment_callback_) { assignment_callback_(partitions); @@ -103,15 +125,7 @@ void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { } void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) { - for (const auto& partition : partitions) { - // get the queue associated with this partition - auto toppar_it = partition_queues_.find(partition); - if (toppar_it != partition_queues_.end()) { - // remove this queue from the list - partition_queues_.erase(toppar_it); - } - } - reset_state(); + revoke(partitions); // call original consumer callback if any if (revocation_callback_) { revocation_callback_(partitions); From a4532ed3366193bbc84cc2dc548bc47c4f797c47 Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Sun, 9 Feb 2020 21:17:09 -0500 Subject: [PATCH 2/2] Use erase directly Added revoke() member function --- include/cppkafka/utils/poll_strategy_base.h | 13 ++++++++++--- src/utils/poll_strategy_base.cpp | 20 +++++++------------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/include/cppkafka/utils/poll_strategy_base.h b/include/cppkafka/utils/poll_strategy_base.h index beb50d9..e8d4928 100644 --- a/include/cppkafka/utils/poll_strategy_base.h +++ b/include/cppkafka/utils/poll_strategy_base.h @@ -102,10 +102,17 @@ public: * belonging to the provided partition list and calls reset_state(). * To be used with static consumers. * - * \param partitions Revoked topic partitions. If the partition list is empty - * all partitions will be revoked. + * \param partitions Revoked topic partitions. */ - virtual void revoke(const TopicPartitionList& partitions = {}); + virtual void revoke(const TopicPartitionList& partitions); + + /** + * \brief Removes all partitions queues associated with the supplied partitions. + * + * This method contains a default implementation. It removes all the queues + * currently assigned and calls reset_state(). To be used with static consumers. + */ + virtual void revoke(); protected: /** diff --git a/src/utils/poll_strategy_base.cpp b/src/utils/poll_strategy_base.cpp index 586581f..15f75d8 100644 --- a/src/utils/poll_strategy_base.cpp +++ b/src/utils/poll_strategy_base.cpp @@ -99,23 +99,17 @@ void PollStrategyBase::assign(TopicPartitionList& partitions) { } void PollStrategyBase::revoke(const TopicPartitionList& partitions) { - if (partitions.empty()) { - //revoke everything - partition_queues_.clear(); - } - else { - for (const auto &partition : partitions) { - // get the queue associated with this partition - auto toppar_it = partition_queues_.find(partition); - if (toppar_it != partition_queues_.end()) { - // remove this queue from the list - partition_queues_.erase(toppar_it); - } - } + for (const auto &partition : partitions) { + partition_queues_.erase(partition); } reset_state(); } +void PollStrategyBase::revoke() { + partition_queues_.clear(); + reset_state(); +} + void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { assign(partitions); // call original consumer callback if any