Changed store_offsets() to use the actual position from the assignment

This commit is contained in:
accelerated
2019-02-04 14:08:40 -05:00
parent 7bc03185a8
commit 284e1c57a9
2 changed files with 3 additions and 7 deletions

View File

@@ -295,8 +295,8 @@ public:
/** /**
* \brief Stores the offsets on the currently assigned topic/partitions (legacy). * \brief Stores the offsets on the currently assigned topic/partitions (legacy).
* *
* This translates into a call to rd_kafka_offsets_store with the current partition assignment. * This translates into a call to rd_kafka_offsets_store with the offsets prior to the current assignment positions.
* It is equivalent to calling rd_kafka_offsets_store(get_assignment()). * It is equivalent to calling rd_kafka_offsets_store(get_offsets_position(get_assignment())).
* *
* \note When using this API it's recommended to set enable.auto.offset.store to false. * \note When using this API it's recommended to set enable.auto.offset.store to false.
*/ */

View File

@@ -202,11 +202,7 @@ Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const
void Consumer::store_offsets() const void Consumer::store_offsets() const
{ {
rd_kafka_topic_partition_list_t* list = nullptr; store_offsets(get_offsets_position(get_assignment()));
rd_kafka_resp_err_t error = rd_kafka_assignment(get_handle(), &list);
check_error(error);
error = rd_kafka_offsets_store(get_handle(), list);
check_error(error, list);
} }
void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const