From 1c51bb72f416de3cf9dd0dd3bd1cb3c633c75304 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 21 May 2016 19:51:50 -0700 Subject: [PATCH] Add Consumer::poll --- include/cppkafka/buffer.h | 4 ++-- include/cppkafka/consumer.h | 10 ++++++++++ include/cppkafka/message.h | 4 ++++ src/consumer.cpp | 16 +++++++++++++++- 4 files changed, 31 insertions(+), 3 deletions(-) diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index 53974ce..dfa28b0 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -13,9 +13,9 @@ public: Buffer(const DataType* data, size_t size); Buffer(const Buffer&) = delete; - Buffer(Buffer&&) = delete; + Buffer(Buffer&&) = default; Buffer& operator=(const Buffer&) = delete; - Buffer& operator=(Buffer&&) = delete; + Buffer& operator=(Buffer&&) = default; const DataType* get_data() const; size_t get_size() const; diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index b8ebd9a..752b1a5 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -3,8 +3,10 @@ #include #include +#include #include "kafka_handle_base.h" #include "topic_partition_list.h" +#include "message.h" namespace cppkafka { @@ -15,12 +17,20 @@ class Consumer : public KafkaHandleBase { public: Consumer(const Configuration& config); + void set_timeout(const std::chrono::milliseconds timeout); + void subscribe(const std::vector& topics); void unsubscribe(); void assign(const TopicPartitionList& topic_partitions); + + Message poll(); private: + static const std::chrono::milliseconds DEFAULT_TIMEOUT; + void check_error(rd_kafka_resp_err_t error); + + std::chrono::milliseconds timeout_ms_; }; } // cppkafka diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index d004fde..19e9e9c 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -12,6 +12,10 @@ namespace cppkafka { class Message { public: Message(rd_kafka_message_t* handle); + Message(const Message&) = delete; + Message(Message&& rhs) = default; + Message& operator=(const Message&) = delete; + Message& operator=(Message&& rhs) = default; bool has_error() const; rd_kafka_resp_err_t get_error() const; diff --git a/src/consumer.cpp b/src/consumer.cpp index 046bc1a..b1e6829 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -6,9 +6,14 @@ using std::vector; using std::string; +using std::chrono::milliseconds; + namespace cppkafka { -Consumer::Consumer(const Configuration& config) { +const milliseconds Consumer::DEFAULT_TIMEOUT{1000}; + +Consumer::Consumer(const Configuration& config) +: timeout_ms_(DEFAULT_TIMEOUT) { char error_buffer[512]; rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, config.get_handle(), error_buffer, sizeof(error_buffer)); @@ -18,6 +23,10 @@ Consumer::Consumer(const Configuration& config) { set_handle(ptr); } +void Consumer::set_timeout(const std::chrono::milliseconds timeout) { + timeout_ms_ = timeout; +} + void Consumer::subscribe(const vector& topics) { TopicPartitionList list(topics.begin(), topics.end()); rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), list.get_handle()); @@ -36,6 +45,11 @@ void Consumer::assign(const TopicPartitionList& topic_partitions) { check_error(error); } +Message Consumer::poll() { + rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); + return Message(message); +} + void Consumer::check_error(rd_kafka_resp_err_t error) { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw HandleException(error);