Changes per code review

This commit is contained in:
accelerated
2019-02-06 13:01:57 -05:00
parent 284e1c57a9
commit 4f4c9e9c91
2 changed files with 7 additions and 10 deletions

View File

@@ -298,9 +298,9 @@ public:
* This translates into a call to rd_kafka_offsets_store with the offsets prior to the current assignment positions.
* It is equivalent to calling rd_kafka_offsets_store(get_offsets_position(get_assignment())).
*
* \note When using this API it's recommended to set enable.auto.offset.store to false.
* \note When using this API it's recommended to set enable.auto.offset.store=false and enable.auto.commit=true.
*/
void store_offsets() const;
void store_consumed_offsets() const;
/**
* \brief Stores the offsets on the given topic/partitions (legacy).
@@ -309,7 +309,7 @@ public:
*
* \param topic_partitions The topic/partition list to be stored.
*
* \note When using this API it's recommended to set enable.auto.offset.store to false.
* \note When using this API it's recommended to set enable.auto.offset.store=false and enable.auto.commit=true.
*/
void store_offsets(const TopicPartitionList& topic_partitions) const;
@@ -320,7 +320,7 @@ public:
*
* \param msg The message whose offset will be stored.
*
* \note When using this API it's recommended to set enable.auto.offset.store to false.
* \note When using this API it's recommended to set enable.auto.offset.store=false and enable.auto.commit=true.
*/
void store_offset(const Message& msg) const;

View File

@@ -200,20 +200,17 @@ Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const
return convert(topic_list_handle);
}
void Consumer::store_offsets() const
{
void Consumer::store_consumed_offsets() const {
store_offsets(get_offsets_position(get_assignment()));
}
void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const
{
void Consumer::store_offsets(const TopicPartitionList& topic_partitions) const {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_offsets_store(get_handle(), topic_list_handle.get());
check_error(error, topic_list_handle.get());
}
void Consumer::store_offset(const Message& msg) const
{
void Consumer::store_offset(const Message& msg) const {
rd_kafka_resp_err_t error = rd_kafka_offset_store(msg.get_handle()->rkt, msg.get_partition(), msg.get_offset());
check_error(error);
}