From 5606b81ecb09195b2f41749c969b4fb115503b65 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 21 May 2016 20:16:08 -0700 Subject: [PATCH] Add commit methods to Consumer --- include/cppkafka/consumer.h | 4 ++++ src/consumer.cpp | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 752b1a5..674396e 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -24,10 +24,14 @@ public: void assign(const TopicPartitionList& topic_partitions); + void commit(const Message& msg); + void async_commit(const Message& msg); + Message poll(); private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; + void commit(const Message& msg, bool async); void check_error(rd_kafka_resp_err_t error); std::chrono::milliseconds timeout_ms_; diff --git a/src/consumer.cpp b/src/consumer.cpp index b1e6829..f6cf4b3 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -45,11 +45,26 @@ void Consumer::assign(const TopicPartitionList& topic_partitions) { check_error(error); } +void Consumer::commit(const Message& msg) { + commit(msg, false); +} + +void Consumer::async_commit(const Message& msg) { + commit(msg, true); +} + Message Consumer::poll() { rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); return Message(message); } +void Consumer::commit(const Message& msg, bool async) { + rd_kafka_resp_err_t error; + error = rd_kafka_commit_message(get_handle(), msg.get_handle(), + async ? 1 : 0); + check_error(error); +} + void Consumer::check_error(rd_kafka_resp_err_t error) { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw HandleException(error);