diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 4748f96..8c3c899 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -193,13 +193,6 @@ public: */ void unassign(); - /** - * \brief Closes the consumer session - * - * This translates into a call to rd_kafka_consumer_close - */ - void close(); - /** * \brief Commits the given message synchronously * @@ -323,6 +316,7 @@ private: static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque); + void close(); void commit(const Message& msg, bool async); void commit(const TopicPartitionList& topic_partitions, bool async); void handle_rebalance(rd_kafka_resp_err_t err, TopicPartitionList& topic_partitions); diff --git a/src/consumer.cpp b/src/consumer.cpp index 3d1e2ab..96955b0 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -105,11 +105,6 @@ void Consumer::unassign() { check_error(error); } -void Consumer::close() { - rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle()); - check_error(error); -} - void Consumer::commit(const Message& msg) { commit(msg, false); } @@ -198,6 +193,11 @@ Message Consumer::poll() { return message ? Message(message) : Message(); } +void Consumer::close() { + rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle()); + check_error(error); +} + void Consumer::commit(const Message& msg, bool async) { rd_kafka_resp_err_t error; error = rd_kafka_commit_message(get_handle(), msg.get_handle(),