diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 9a3abfd..277b0c2 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -29,6 +29,9 @@ public: void commit(const TopicPartitionList& topic_partitions); void async_commit(const TopicPartitionList& topic_partitions); + TopicPartitionList get_committed(const TopicPartitionList& topic_partitions); + TopicPartitionList get_position(const TopicPartitionList& topic_partitions); + Message poll(); private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; diff --git a/src/consumer.cpp b/src/consumer.cpp index cf035e4..7d494bf 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -61,6 +61,23 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) { commit(topic_partitions, true); } +TopicPartitionList Consumer::get_committed(const TopicPartitionList& topic_partitions) { + // Copy the list, let rd_kafka change it and return it + TopicPartitionList output = topic_partitions; + rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), output.get_handle(), + timeout_ms_.count()); + check_error(error); + return output; +} + +TopicPartitionList Consumer::get_position(const TopicPartitionList& topic_partitions) { + // Copy the list, let rd_kafka change it and return it + TopicPartitionList output = topic_partitions; + rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), output.get_handle()); + check_error(error); + return output; +} + Message Consumer::poll() { rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); return Message(message);