diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 674396e..9a3abfd 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -26,12 +26,15 @@ public: void commit(const Message& msg); void async_commit(const Message& msg); + void commit(const TopicPartitionList& topic_partitions); + void async_commit(const TopicPartitionList& topic_partitions); Message poll(); private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; void commit(const Message& msg, bool async); + void commit(const TopicPartitionList& topic_partitions, bool async); void check_error(rd_kafka_resp_err_t error); std::chrono::milliseconds timeout_ms_; diff --git a/src/consumer.cpp b/src/consumer.cpp index f6cf4b3..cf035e4 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -23,7 +23,7 @@ Consumer::Consumer(const Configuration& config) set_handle(ptr); } -void Consumer::set_timeout(const std::chrono::milliseconds timeout) { +void Consumer::set_timeout(const milliseconds timeout) { timeout_ms_ = timeout; } @@ -53,6 +53,14 @@ void Consumer::async_commit(const Message& msg) { commit(msg, true); } +void Consumer::commit(const TopicPartitionList& topic_partitions) { + commit(topic_partitions, false); +} + +void Consumer::async_commit(const TopicPartitionList& topic_partitions) { + commit(topic_partitions, true); +} + Message Consumer::poll() { rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); return Message(message); @@ -61,7 +69,13 @@ Message Consumer::poll() { void Consumer::commit(const Message& msg, bool async) { rd_kafka_resp_err_t error; error = rd_kafka_commit_message(get_handle(), msg.get_handle(), - async ? 1 : 0); + async ? 1 : 0); + check_error(error); +} + +void Consumer::commit(const TopicPartitionList& topic_partitions, bool async) { + rd_kafka_resp_err_t error; + error = rd_kafka_commit(get_handle(), topic_partitions.get_handle(), async ? 1 : 0); check_error(error); }