Merge pull request #238 from accelerated/poll_strategy

Added member functions for static consumers
This commit is contained in:
Matias Fontanini
2020-04-08 07:14:58 -07:00
committed by GitHub
2 changed files with 52 additions and 14 deletions

View File

@@ -84,6 +84,36 @@ public:
*/ */
Consumer& get_consumer() final; Consumer& get_consumer() final;
/**
* \brief Creates partitions queues associated with the supplied partitions.
*
* This method contains a default implementation. It adds all the new queues belonging
* to the provided partition list and calls reset_state().
* To be used with static consumers.
*
* \param partitions Assigned topic partitions.
*/
virtual void assign(TopicPartitionList& partitions);
/**
* \brief Removes partitions queues associated with the supplied partitions.
*
* This method contains a default implementation. It removes all the queues
* belonging to the provided partition list and calls reset_state().
* To be used with static consumers.
*
* \param partitions Revoked topic partitions.
*/
virtual void revoke(const TopicPartitionList& partitions);
/**
* \brief Removes all partitions queues associated with the supplied partitions.
*
* This method contains a default implementation. It removes all the queues
* currently assigned and calls reset_state(). To be used with static consumers.
*/
virtual void revoke();
protected: protected:
/** /**
* \brief Get the queues from all assigned partitions * \brief Get the queues from all assigned partitions
@@ -111,8 +141,8 @@ protected:
/** /**
* \brief Function to be called when a new partition assignment takes place * \brief Function to be called when a new partition assignment takes place
* *
* This method contains a default implementation. It adds all the new queues belonging * This method contains a default implementation. It calls assign()
* to the provided partition list and calls reset_state(). * and invokes the user assignment callback.
* *
* \param partitions Assigned topic partitions * \param partitions Assigned topic partitions
*/ */
@@ -121,8 +151,8 @@ protected:
/** /**
* \brief Function to be called when an old partition assignment gets revoked * \brief Function to be called when an old partition assignment gets revoked
* *
* This method contains a default implementation. It removes all the queues * This method contains a default implementation. It calls revoke()
* belonging to the provided partition list and calls reset_state(). * and invokes the user revocation callback.
* *
* \param partitions Revoked topic partitions * \param partitions Revoked topic partitions
*/ */

View File

@@ -89,13 +89,29 @@ void PollStrategyBase::reset_state() {
} }
void PollStrategyBase::on_assignment(TopicPartitionList& partitions) { void PollStrategyBase::assign(TopicPartitionList& partitions) {
// populate partition queues // populate partition queues
for (const auto& partition : partitions) { for (const auto& partition : partitions) {
// get the queue associated with this partition // get the queue associated with this partition
partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()}); partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()});
} }
reset_state(); reset_state();
}
void PollStrategyBase::revoke(const TopicPartitionList& partitions) {
for (const auto &partition : partitions) {
partition_queues_.erase(partition);
}
reset_state();
}
void PollStrategyBase::revoke() {
partition_queues_.clear();
reset_state();
}
void PollStrategyBase::on_assignment(TopicPartitionList& partitions) {
assign(partitions);
// call original consumer callback if any // call original consumer callback if any
if (assignment_callback_) { if (assignment_callback_) {
assignment_callback_(partitions); assignment_callback_(partitions);
@@ -103,15 +119,7 @@ void PollStrategyBase::on_assignment(TopicPartitionList& partitions) {
} }
void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) { void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) {
for (const auto& partition : partitions) { revoke(partitions);
// get the queue associated with this partition
auto toppar_it = partition_queues_.find(partition);
if (toppar_it != partition_queues_.end()) {
// remove this queue from the list
partition_queues_.erase(toppar_it);
}
}
reset_state();
// call original consumer callback if any // call original consumer callback if any
if (revocation_callback_) { if (revocation_callback_) {
revocation_callback_(partitions); revocation_callback_(partitions);