From 7708e945b7092eacd27e3362a4dd293ba9e550e7 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 5 Jun 2016 18:14:46 -0700 Subject: [PATCH] Add get/query offset wrappers --- include/cppkafka/consumer.h | 11 +++++++---- include/cppkafka/kafka_handle_base.h | 16 +++++++++++----- src/consumer.cpp | 24 ++++++++++++++++++++---- src/kafka_handle_base.cpp | 26 +++++++++++++++++++++----- tests/consumer_test.cpp | 7 +++++++ tests/producer_test.cpp | 6 ++++++ 6 files changed, 72 insertions(+), 18 deletions(-) diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index e279974..c340032 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -41,10 +41,13 @@ public: void commit(const TopicPartitionList& topic_partitions); void async_commit(const TopicPartitionList& topic_partitions); - TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions); - TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions); - TopicPartitionList get_subscription(); - TopicPartitionList get_assignment(); + OffsetTuple get_offsets(const std::string& topic, int partition) const; + + TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const; + TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const; + TopicPartitionList get_subscription() const; + TopicPartitionList get_assignment() const; + std::string get_member_id() const; Message poll(); private: diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 5cd6cbe..db8a3ce 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "metadata.h" #include "topic_partition.h" @@ -19,6 +20,8 @@ class Topic; class KafkaHandleBase { public: + using OffsetTuple = std::tuple; + virtual ~KafkaHandleBase() = default; KafkaHandleBase(const KafkaHandleBase&) = delete; KafkaHandleBase(KafkaHandleBase&&) = delete; @@ -30,18 +33,21 @@ public: void set_timeout(const std::chrono::milliseconds& timeout); - rd_kafka_t* get_handle(); + OffsetTuple query_offsets(const std::string& topic, int partition) const; + + rd_kafka_t* get_handle() const; Topic get_topic(const std::string& name); Topic get_topic(const std::string& name, TopicConfiguration config); - Metadata get_metadata(); - Metadata get_metadata(const Topic& topic); + Metadata get_metadata() const; + Metadata get_metadata(const Topic& topic) const; + std::string get_name() const; std::chrono::milliseconds get_timeout() const; const Configuration& get_configuration() const; protected: KafkaHandleBase(Configuration config); void set_handle(rd_kafka_t* handle); - void check_error(rd_kafka_resp_err_t error); + void check_error(rd_kafka_resp_err_t error) const; rd_kafka_conf_t* get_configuration_handle(); private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; @@ -50,7 +56,7 @@ private: using TopicConfigurationMap = std::unordered_map; Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); - Metadata get_metadata(rd_kafka_topic_t* topic_ptr); + Metadata get_metadata(rd_kafka_topic_t* topic_ptr) const; void save_topic_config(const std::string& topic_name, TopicConfiguration config); HandlePtr handle_; diff --git a/src/consumer.cpp b/src/consumer.cpp index f140893..7708f2e 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -6,6 +6,7 @@ using std::vector; using std::string; using std::move; +using std::make_tuple; using std::chrono::milliseconds; @@ -96,7 +97,17 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) { commit(topic_partitions, true); } -TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) { +KafkaHandleBase::OffsetTuple Consumer::get_offsets(const string& topic, int partition) const { + int64_t low; + int64_t high; + rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(), + partition, &low, &high); + check_error(result); + return make_tuple(low, high); +} + +TopicPartitionList +Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) const { TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(), get_timeout().count()); @@ -104,14 +115,15 @@ TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& top return convert(topic_list_handle); } -TopicPartitionList Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) { +TopicPartitionList +Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const { TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get()); check_error(error); return convert(topic_list_handle); } -TopicPartitionList Consumer::get_subscription() { +TopicPartitionList Consumer::get_subscription() const { rd_kafka_resp_err_t error; rd_kafka_topic_partition_list_t* list = nullptr; error = rd_kafka_subscription(get_handle(), &list); @@ -119,7 +131,7 @@ TopicPartitionList Consumer::get_subscription() { return convert(make_handle(list)); } -TopicPartitionList Consumer::get_assignment() { +TopicPartitionList Consumer::get_assignment() const { rd_kafka_resp_err_t error; rd_kafka_topic_partition_list_t* list = nullptr; error = rd_kafka_assignment(get_handle(), &list); @@ -127,6 +139,10 @@ TopicPartitionList Consumer::get_assignment() { return convert(make_handle(list)); } +string Consumer::get_member_id() const { + return rd_kafka_memberid(get_handle()); +} + Message Consumer::poll() { rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), get_timeout().count()); diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 9b8f0d5..3c6ba97 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -6,6 +6,7 @@ using std::string; using std::vector; using std::move; +using std::make_tuple; using std::lock_guard; using std::mutex; using std::chrono::milliseconds; @@ -42,7 +43,7 @@ void KafkaHandleBase::set_timeout(const milliseconds& timeout) { timeout_ms_ = timeout; } -rd_kafka_t* KafkaHandleBase::get_handle() { +rd_kafka_t* KafkaHandleBase::get_handle() const { return handle_.get(); } @@ -57,14 +58,29 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config) return get_topic(name, rd_kafka_topic_conf_dup(handle)); } -Metadata KafkaHandleBase::get_metadata() { +KafkaHandleBase::OffsetTuple KafkaHandleBase::query_offsets(const string& topic, + int partition) const { + int64_t low; + int64_t high; + rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(), + partition, &low, &high, + timeout_ms_.count()); + check_error(result); + return make_tuple(low, high); +} + +Metadata KafkaHandleBase::get_metadata() const { return get_metadata(nullptr); } -Metadata KafkaHandleBase::get_metadata(const Topic& topic) { +Metadata KafkaHandleBase::get_metadata(const Topic& topic) const { return get_metadata(topic.get_handle()); } +string KafkaHandleBase::get_name() const { + return rd_kafka_name(handle_.get()); +} + milliseconds KafkaHandleBase::get_timeout() const { return timeout_ms_; } @@ -85,7 +101,7 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf return Topic(topic); } -Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) { +Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) const { const rd_kafka_metadata_t* metadata; rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), topic_ptr != nullptr, topic_ptr, &metadata, timeout_ms_.count()); @@ -99,7 +115,7 @@ void KafkaHandleBase::save_topic_config(const string& topic_name, TopicConfigura iter->second.set_as_opaque(); } -void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) { +void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) const { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw HandleException(error); } diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 605bfc2..2d205aa 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -14,6 +14,7 @@ using std::string; using std::thread; using std::set; using std::mutex; +using std::tie; using std::condition_variable; using std::lock_guard; using std::unique_lock; @@ -131,6 +132,12 @@ TEST_F(ConsumerTest, AssignmentCallback) { assignment = consumer.get_assignment(); EXPECT_EQ(3, assignment.size()); + + int64_t low; + int64_t high; + tie(low, high) = consumer.get_offsets(KAFKA_TOPIC, partition); + EXPECT_GT(high, low); + EXPECT_EQ(high, runner.get_messages().back().get_offset() + 1); } TEST_F(ConsumerTest, Rebalance) { diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 5b80c27..86208f1 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -10,6 +10,7 @@ using std::string; using std::to_string; using std::set; +using std::tie; using std::move; using std::thread; using std::mutex; @@ -124,6 +125,11 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) { EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(0, message.get_error()); + + int64_t low; + int64_t high; + tie(low, high) = producer.query_offsets(KAFKA_TOPIC, partition); + EXPECT_GT(high, low); } TEST_F(ProducerTest, OneMessageUsingKey) {