diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 605af93..478cc5d 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -5,7 +5,6 @@ #include #include #include "kafka_handle_base.h" -#include "topic_partition_list.h" #include "message.h" namespace cppkafka { @@ -35,14 +34,12 @@ public: TopicPartitionList get_subscription(); TopicPartitionList get_assignment(); - Message poll(); private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; void commit(const Message& msg, bool async); void commit(const TopicPartitionList& topic_partitions, bool async); - void check_error(rd_kafka_resp_err_t error); std::chrono::milliseconds timeout_ms_; }; diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 76fad22..ee08739 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -4,6 +4,7 @@ #include #include #include +#include "topic_partition_list.h" namespace cppkafka { @@ -18,6 +19,9 @@ public: KafkaHandleBase& operator=(const KafkaHandleBase&) = delete; KafkaHandleBase& operator=(KafkaHandleBase&&) = delete; + void pause_partitions(const TopicPartitionList& topic_partitions); + void resume_partitions(const TopicPartitionList& topic_partitions); + rd_kafka_t* get_handle(); Topic get_topic(const std::string& name); Topic get_topic(const std::string& name, TopicConfiguration config); @@ -26,6 +30,7 @@ protected: KafkaHandleBase(rd_kafka_t* handle); void set_handle(rd_kafka_t* handle); + void check_error(rd_kafka_resp_err_t error); private: using HandlePtr = std::unique_ptr; diff --git a/src/consumer.cpp b/src/consumer.cpp index 1d82fe1..e5ea373 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -117,10 +117,4 @@ void Consumer::commit(const TopicPartitionList& topic_partitions, bool async) { check_error(error); } -void Consumer::check_error(rd_kafka_resp_err_t error) { - if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { - throw HandleException(error); - } -} - } // cppkafka diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 3b38343..58626e5 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -17,6 +17,18 @@ KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle) } +void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partitions) { + rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(), + topic_partitions.get_handle()); + check_error(error); +} + +void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) { + rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(), + topic_partitions.get_handle()); + check_error(error); +} + rd_kafka_t* KafkaHandleBase::get_handle() { return handle_.get(); } @@ -41,4 +53,10 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf return Topic(topic); } +void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) { + if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw HandleException(error); + } +} + } // cppkafka