diff --git a/include/cppkafka/utils/roundrobin_poll_adapter.h b/include/cppkafka/utils/roundrobin_poll_adapter.h index dee42b1..1988328 100644 --- a/include/cppkafka/utils/roundrobin_poll_adapter.h +++ b/include/cppkafka/utils/roundrobin_poll_adapter.h @@ -30,7 +30,8 @@ #ifndef CPPKAFKA_ROUNDROBIN_POLL_ADAPTER_H #define CPPKAFKA_ROUNDROBIN_POLL_ADAPTER_H -#include +#include +#include #include "../exceptions.h" #include "../consumer.h" #include "../queue.h" @@ -173,12 +174,25 @@ public: */ MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout); + /** + * \brief Gets the number of assigned partitions that can be polled across all topics + * + * \return The number of partitions + */ + size_t get_num_partitions(); + private: class CircularBuffer { - using qlist = std::list; - using qiter = qlist::iterator; public: - qlist& ref() { return queues_; } + // typedefs + using toppar_t = std::pair; // + using qmap_t = std::map; + using qiter_t = qmap_t::iterator; + + qmap_t& ref() { + return queues_; + } + Queue& next() { if (queues_.empty()) { throw QueueException(RD_KAFKA_RESP_ERR__STATE); @@ -186,12 +200,13 @@ private: if (++iter_ == queues_.end()) { iter_ = queues_.begin(); } - return *iter_; + return iter_->second; } + void rewind() { iter_ = queues_.begin(); } private: - qlist queues_; - qiter iter_ = queues_.begin(); + qmap_t queues_; + qiter_t iter_ = queues_.begin(); }; void on_assignment(TopicPartitionList& partitions); diff --git a/src/utils/roundrobin_poll_adapter.cpp b/src/utils/roundrobin_poll_adapter.cpp index 83df923..64bb3fa 100644 --- a/src/utils/roundrobin_poll_adapter.cpp +++ b/src/utils/roundrobin_poll_adapter.cpp @@ -26,9 +26,11 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - + #include "utils/roundrobin_poll_adapter.h" +using std::string; +using std::make_pair; using std::chrono::milliseconds; using std::make_move_iterator; @@ -123,11 +125,19 @@ MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size, millisecond return messages; } +size_t RoundRobinPollAdapter::get_num_partitions() { + return partition_queues_.ref().size(); +} + void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) { - //populate partition queues + // populate partition queues for (const auto& partition : partitions) { - partition_queues_.ref().push_back(consumer_.get_partition_queue(partition)); + // get the queue associated with this partition + CircularBuffer::toppar_t key = make_pair(partition.get_topic(), partition.get_partition()); + partition_queues_.ref().emplace(key, consumer_.get_partition_queue(partition)); } + // reset the queue iterator + partition_queues_.rewind(); // call original consumer callback if any if (assignment_callback_) { assignment_callback_(partitions); @@ -135,10 +145,17 @@ void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) { } void RoundRobinPollAdapter::on_revocation(const TopicPartitionList& partitions) { - // put all partitions queues back to their initial state - restore_forwarding(); - // empty the circular queue list - partition_queues_.ref().clear(); + for (const auto& partition : partitions) { + // get the queue associated with this partition + CircularBuffer::toppar_t key = make_pair(partition.get_topic(), partition.get_partition()); + auto qit = partition_queues_.ref().find(key); + if (qit != partition_queues_.ref().end()) { + // restore forwarding on this queue + qit->second.forward_to_queue(consumer_queue_); + // remove this queue from the list + partition_queues_.ref().erase(qit); + } + } // reset the queue iterator partition_queues_.rewind(); // call original consumer callback if any @@ -148,6 +165,7 @@ void RoundRobinPollAdapter::on_revocation(const TopicPartitionList& partitions) } void RoundRobinPollAdapter::on_rebalance_error(Error error) { + // Todo : clear partition queues ? // call original consumer callback if any if (rebalance_error_callback_) { rebalance_error_callback_(error); @@ -156,8 +174,8 @@ void RoundRobinPollAdapter::on_rebalance_error(Error error) { void RoundRobinPollAdapter::restore_forwarding() { // forward all partition queues - for (const auto& queue : partition_queues_.ref()) { - queue.forward_to_queue(consumer_queue_); + for (const auto& toppar_queue : partition_queues_.ref()) { + toppar_queue.second.forward_to_queue(consumer_queue_); } }