From 83c1d304c628145764eafb907695e6b39645998c Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 22 May 2016 09:32:17 -0700 Subject: [PATCH] Add more wrappers for Consumer --- include/cppkafka/consumer.h | 8 ++++++-- include/cppkafka/topic_partition_list.h | 1 + src/consumer.cpp | 25 +++++++++++++++++++++++-- src/topic_partition_list.cpp | 5 +++++ 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 277b0c2..605af93 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -23,14 +23,18 @@ public: void unsubscribe(); void assign(const TopicPartitionList& topic_partitions); + void close(); void commit(const Message& msg); void async_commit(const Message& msg); void commit(const TopicPartitionList& topic_partitions); void async_commit(const TopicPartitionList& topic_partitions); - TopicPartitionList get_committed(const TopicPartitionList& topic_partitions); - TopicPartitionList get_position(const TopicPartitionList& topic_partitions); + TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions); + TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions); + TopicPartitionList get_subscription(); + TopicPartitionList get_assignment(); + Message poll(); private: diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h index 83f14dd..22ae27d 100644 --- a/include/cppkafka/topic_partition_list.h +++ b/include/cppkafka/topic_partition_list.h @@ -13,6 +13,7 @@ class TopicPartition; class TopicPartitionList { public: TopicPartitionList(); + TopicPartitionList(rd_kafka_topic_partition_list_t* handle); TopicPartitionList(size_t size); template TopicPartitionList(ForwardIterator start, const ForwardIterator& end) diff --git a/src/consumer.cpp b/src/consumer.cpp index 7d494bf..1d82fe1 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -45,6 +45,11 @@ void Consumer::assign(const TopicPartitionList& topic_partitions) { check_error(error); } +void Consumer::close() { + rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle()); + check_error(error); +} + void Consumer::commit(const Message& msg) { commit(msg, false); } @@ -61,7 +66,7 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) { commit(topic_partitions, true); } -TopicPartitionList Consumer::get_committed(const TopicPartitionList& topic_partitions) { +TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) { // Copy the list, let rd_kafka change it and return it TopicPartitionList output = topic_partitions; rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), output.get_handle(), @@ -70,7 +75,7 @@ TopicPartitionList Consumer::get_committed(const TopicPartitionList& topic_parti return output; } -TopicPartitionList Consumer::get_position(const TopicPartitionList& topic_partitions) { +TopicPartitionList Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) { // Copy the list, let rd_kafka change it and return it TopicPartitionList output = topic_partitions; rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), output.get_handle()); @@ -78,6 +83,22 @@ TopicPartitionList Consumer::get_position(const TopicPartitionList& topic_partit return output; } +TopicPartitionList Consumer::get_subscription() { + rd_kafka_resp_err_t error; + rd_kafka_topic_partition_list_t* list = nullptr; + error = rd_kafka_subscription(get_handle(), &list); + check_error(error); + return TopicPartitionList(list); +} + +TopicPartitionList Consumer::get_assignment() { + rd_kafka_resp_err_t error; + rd_kafka_topic_partition_list_t* list = nullptr; + error = rd_kafka_assignment(get_handle(), &list); + check_error(error); + return TopicPartitionList(list); +} + Message Consumer::poll() { rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); return Message(message); diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp index 689cddb..0981410 100644 --- a/src/topic_partition_list.cpp +++ b/src/topic_partition_list.cpp @@ -11,6 +11,11 @@ TopicPartitionList::TopicPartitionList() } +TopicPartitionList::TopicPartitionList(rd_kafka_topic_partition_list_t* handle) +: handle_(make_handle(handle)) { + +} + TopicPartitionList::TopicPartitionList(size_t size) : handle_(make_handle(rd_kafka_topic_partition_list_new(size))) {