mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-10-31 18:47:48 +00:00 
			
		
		
		
	Add Consumer::poll
This commit is contained in:
		| @@ -13,9 +13,9 @@ public: | ||||
|     Buffer(const DataType* data, size_t size); | ||||
|  | ||||
|     Buffer(const Buffer&) = delete; | ||||
|     Buffer(Buffer&&) = delete; | ||||
|     Buffer(Buffer&&) = default; | ||||
|     Buffer& operator=(const Buffer&) = delete; | ||||
|     Buffer& operator=(Buffer&&) = delete; | ||||
|     Buffer& operator=(Buffer&&) = default; | ||||
|  | ||||
|     const DataType* get_data() const; | ||||
|     size_t get_size() const; | ||||
|   | ||||
| @@ -3,8 +3,10 @@ | ||||
|  | ||||
| #include <vector> | ||||
| #include <string> | ||||
| #include <chrono> | ||||
| #include "kafka_handle_base.h" | ||||
| #include "topic_partition_list.h" | ||||
| #include "message.h" | ||||
|  | ||||
| namespace cppkafka { | ||||
|  | ||||
| @@ -15,12 +17,20 @@ class Consumer : public KafkaHandleBase { | ||||
| public: | ||||
|     Consumer(const Configuration& config); | ||||
|  | ||||
|     void set_timeout(const std::chrono::milliseconds timeout); | ||||
|  | ||||
|     void subscribe(const std::vector<std::string>& topics); | ||||
|     void unsubscribe(); | ||||
|  | ||||
|     void assign(const TopicPartitionList& topic_partitions); | ||||
|  | ||||
|     Message poll(); | ||||
| private: | ||||
|     static const std::chrono::milliseconds DEFAULT_TIMEOUT; | ||||
|  | ||||
|     void check_error(rd_kafka_resp_err_t error); | ||||
|  | ||||
|     std::chrono::milliseconds timeout_ms_; | ||||
| }; | ||||
|  | ||||
| } // cppkafka | ||||
|   | ||||
| @@ -12,6 +12,10 @@ namespace cppkafka { | ||||
| class Message { | ||||
| public: | ||||
|     Message(rd_kafka_message_t* handle); | ||||
|     Message(const Message&) = delete; | ||||
|     Message(Message&& rhs) = default; | ||||
|     Message& operator=(const Message&) = delete; | ||||
|     Message& operator=(Message&& rhs) = default; | ||||
|  | ||||
|     bool has_error() const; | ||||
|     rd_kafka_resp_err_t get_error() const; | ||||
|   | ||||
| @@ -6,9 +6,14 @@ | ||||
| using std::vector; | ||||
| using std::string; | ||||
|  | ||||
| using std::chrono::milliseconds; | ||||
|  | ||||
| namespace cppkafka { | ||||
|  | ||||
| Consumer::Consumer(const Configuration& config) { | ||||
| const milliseconds Consumer::DEFAULT_TIMEOUT{1000}; | ||||
|  | ||||
| Consumer::Consumer(const Configuration& config)  | ||||
| : timeout_ms_(DEFAULT_TIMEOUT) { | ||||
|     char error_buffer[512]; | ||||
|     rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, config.get_handle(), | ||||
|                                    error_buffer, sizeof(error_buffer)); | ||||
| @@ -18,6 +23,10 @@ Consumer::Consumer(const Configuration& config) { | ||||
|     set_handle(ptr); | ||||
| } | ||||
|  | ||||
| void Consumer::set_timeout(const std::chrono::milliseconds timeout) { | ||||
|     timeout_ms_ = timeout; | ||||
| } | ||||
|  | ||||
| void Consumer::subscribe(const vector<string>& topics) { | ||||
|     TopicPartitionList list(topics.begin(), topics.end()); | ||||
|     rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), list.get_handle()); | ||||
| @@ -36,6 +45,11 @@ void Consumer::assign(const TopicPartitionList& topic_partitions) { | ||||
|     check_error(error); | ||||
| } | ||||
|  | ||||
| Message Consumer::poll() { | ||||
|     rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); | ||||
|     return Message(message); | ||||
| } | ||||
|  | ||||
| void Consumer::check_error(rd_kafka_resp_err_t error) { | ||||
|     if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { | ||||
|         throw HandleException(error); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Matias Fontanini
					Matias Fontanini