From 7bc03185a8c8f2e8d8f8a245687f7b9b98ec95d7 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Mon, 4 Feb 2019 12:10:45 -0500 Subject: [PATCH] Added legacy offset store API --- include/cppkafka/consumer.h | 32 ++++++++++++++++++++++++++++++++ src/consumer.cpp | 22 ++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 1be75c8..ca3cd79 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -291,6 +291,38 @@ public: * \return The topic partition list */ TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const; + + /** + * \brief Stores the offsets on the currently assigned topic/partitions (legacy). + * + * This translates into a call to rd_kafka_offsets_store with the current partition assignment. + * It is equivalent to calling rd_kafka_offsets_store(get_assignment()). + * + * \note When using this API it's recommended to set enable.auto.offset.store to false. + */ + void store_offsets() const; + + /** + * \brief Stores the offsets on the given topic/partitions (legacy). + * + * This translates into a call to rd_kafka_offsets_store. + * + * \param topic_partitions The topic/partition list to be stored. + * + * \note When using this API it's recommended to set enable.auto.offset.store to false. + */ + void store_offsets(const TopicPartitionList& topic_partitions) const; + + /** + * \brief Stores the offset for this message (legacy). + * + * This translates into a call to rd_kafka_offset_store. + * + * \param msg The message whose offset will be stored. + * + * \note When using this API it's recommended to set enable.auto.offset.store to false. + */ + void store_offset(const Message& msg) const; /** * \brief Gets the current topic subscription diff --git a/src/consumer.cpp b/src/consumer.cpp index 20e3dcc..eceb356 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -200,6 +200,28 @@ Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const return convert(topic_list_handle); } +void Consumer::store_offsets() const +{ + rd_kafka_topic_partition_list_t* list = nullptr; + rd_kafka_resp_err_t error = rd_kafka_assignment(get_handle(), &list); + check_error(error); + error = rd_kafka_offsets_store(get_handle(), list); + check_error(error, list); +} + +void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const +{ + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + rd_kafka_resp_err_t error = rd_kafka_offsets_store(get_handle(), topic_list_handle.get()); + check_error(error, topic_list_handle.get()); +} + +void Consumer::store_offset(const Message& msg) const +{ + rd_kafka_resp_err_t error = rd_kafka_offset_store(msg.get_handle()->rkt, msg.get_partition(), msg.get_offset()); + check_error(error); +} + vector Consumer::get_subscription() const { rd_kafka_resp_err_t error; rd_kafka_topic_partition_list_t* list = nullptr;