From 284e1c57a945fe41c134e069409e5dc0572b6c5f Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Mon, 4 Feb 2019 14:08:40 -0500 Subject: [PATCH] Changed store_offsets() to use the actual position from the assignment --- include/cppkafka/consumer.h | 4 ++-- src/consumer.cpp | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index ca3cd79..9ab95b0 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -295,8 +295,8 @@ public: /** * \brief Stores the offsets on the currently assigned topic/partitions (legacy). * - * This translates into a call to rd_kafka_offsets_store with the current partition assignment. - * It is equivalent to calling rd_kafka_offsets_store(get_assignment()). + * This translates into a call to rd_kafka_offsets_store with the offsets prior to the current assignment positions. + * It is equivalent to calling rd_kafka_offsets_store(get_offsets_position(get_assignment())). * * \note When using this API it's recommended to set enable.auto.offset.store to false. */ diff --git a/src/consumer.cpp b/src/consumer.cpp index eceb356..a1a9f43 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -202,11 +202,7 @@ Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const void Consumer::store_offsets() const { - rd_kafka_topic_partition_list_t* list = nullptr; - rd_kafka_resp_err_t error = rd_kafka_assignment(get_handle(), &list); - check_error(error); - error = rd_kafka_offsets_store(get_handle(), list); - check_error(error, list); + store_offsets(get_offsets_position(get_assignment())); } void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const