diff --git a/include/cppkafka/utils/consumer_dispatcher.h b/include/cppkafka/utils/consumer_dispatcher.h index a1071e8..56c1a91 100644 --- a/include/cppkafka/utils/consumer_dispatcher.h +++ b/include/cppkafka/utils/consumer_dispatcher.h @@ -145,9 +145,10 @@ private: } // Simple RAII wrapper for pausing/resuming + template class Pauser { public: - Pauser(Consumer& consumer, const TopicPartitionList& topic_partitions) + Pauser(C& consumer, const TopicPartitionList& topic_partitions) : consumer_(consumer), topic_partitions_(topic_partitions) { consumer_.pause_partitions(topic_partitions_); } @@ -159,7 +160,7 @@ private: Pauser(const Pauser&) = delete; Pauser& operator=(const Pauser&) = delete; private: - Consumer& consumer_; + C& consumer_; TopicPartitionList topic_partitions_; }; @@ -299,7 +300,7 @@ private: // The callback rejected the message, start throttling if (msg) { // Pause consumption. When the pauser goes off scope, it will resume it - Pauser pauser(consumer_, consumer_.get_assignment()); + Pauser pauser(consumer_, consumer_.get_assignment()); // Handle throttling on this message on_throttle(Throttle{}, callback, std::move(msg));