From 5b957de7e4c3b3bf7dc01bcce20b804b82d44c9a Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 28 May 2016 09:02:44 -0700 Subject: [PATCH] Replace TopicPartitionList class with a vector of TopicPartition --- include/cppkafka/consumer.h | 2 +- include/cppkafka/kafka_handle_base.h | 3 +- include/cppkafka/topic_partition.h | 8 +- include/cppkafka/topic_partition_list.h | 53 ++--------- src/consumer.cpp | 39 ++++---- src/kafka_handle_base.cpp | 8 +- src/topic_partition.cpp | 22 +++-- src/topic_partition_list.cpp | 114 ++++-------------------- tests/CMakeLists.txt | 1 + tests/topic_partition_list_test.cpp | 27 ++++++ 10 files changed, 106 insertions(+), 171 deletions(-) create mode 100644 tests/topic_partition_list_test.cpp diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index d0680e5..98ae3c9 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -54,7 +54,7 @@ private: void commit(const Message& msg, bool async); void commit(const TopicPartitionList& topic_partitions, bool async); - void handle_rebalance(rd_kafka_resp_err_t err, const TopicPartitionList& topic_partitions); + void handle_rebalance(rd_kafka_resp_err_t err, TopicPartitionList& topic_partitions); AssignmentCallback assignment_callback_; RevocationCallback revocation_callback_; diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index bedbc8c..d3f9aa7 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -5,8 +5,9 @@ #include #include #include -#include "topic_partition_list.h" #include "metadata.h" +#include "topic_partition.h" +#include "topic_partition_list.h" namespace cppkafka { diff --git a/include/cppkafka/topic_partition.h b/include/cppkafka/topic_partition.h index 4946980..24bf9c2 100644 --- a/include/cppkafka/topic_partition.h +++ b/include/cppkafka/topic_partition.h @@ -8,9 +8,11 @@ namespace cppkafka { class TopicPartition { public: - TopicPartition(const std::string& topic); - TopicPartition(const std::string& topic, int partition); - TopicPartition(const std::string& topic, int partition, int64_t offset); + TopicPartition(); + TopicPartition(const char* topic); + TopicPartition(std::string topic); + TopicPartition(std::string topic, int partition); + TopicPartition(std::string topic, int partition, int64_t offset); const std::string& get_topic() const; int get_partition() const; diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h index 1bc0f98..f94ac11 100644 --- a/include/cppkafka/topic_partition_list.h +++ b/include/cppkafka/topic_partition_list.h @@ -3,60 +3,19 @@ #include #include -#include #include -#include "topic_partition.h" namespace cppkafka { class TopicPartition; -class TopicPartitionList { -public: - static TopicPartitionList make_non_owning(rd_kafka_topic_partition_list_t* handle); +using TopicPartitionsListPtr = std::unique_ptr; +using TopicPartitionList = std::vector; - TopicPartitionList(); - TopicPartitionList(const std::initializer_list& topic_partitions); - TopicPartitionList(rd_kafka_topic_partition_list_t* handle); - TopicPartitionList(size_t size); - template - TopicPartitionList(ForwardIterator start, const ForwardIterator& end) - : TopicPartitionList(std::distance(start, end)) { - while (start != end) { - add(*start); - ++start; - } - } - TopicPartitionList(const TopicPartitionList& rhs); - TopicPartitionList(TopicPartitionList&&) = default; - TopicPartitionList& operator=(const TopicPartitionList& rhs); - TopicPartitionList& operator=(TopicPartitionList&&) = default; - - void add(const TopicPartition& topic_partition); - void update(const TopicPartition& topic_partition); - bool remove(const TopicPartition& topic_partition); - - bool contains(const TopicPartition& topic_partition) const; - size_t size() const; - bool empty() const; - - rd_kafka_topic_partition_list_t* get_handle() const; -private: - static const size_t DEFAULT_CONTAINER_SIZE; - - struct NonOwningTag { }; - - using HandlePtr = std::unique_ptr; - - static HandlePtr make_handle(rd_kafka_topic_partition_list_t* ptr); - - TopicPartitionList(rd_kafka_topic_partition_list_t* handle, NonOwningTag); - - rd_kafka_topic_partition_t* get_topic_partition(const TopicPartition& topic_partition) const; - - HandlePtr handle_; -}; +TopicPartitionsListPtr convert(const std::vector& topic_partitions); +std::vector convert(const TopicPartitionsListPtr& topic_partitions); +TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle); } // cppkafka diff --git a/src/consumer.cpp b/src/consumer.cpp index 8887855..c065322 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -10,9 +10,15 @@ using std::chrono::milliseconds; namespace cppkafka { +void dummy_topic_partition_list_deleter(rd_kafka_topic_partition_list_t*) { + +} + void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque) { - TopicPartitionList list = TopicPartitionList::make_non_owning(partitions); + // Build a dummy unique_ptr which won't actually delete the ptr + TopicPartitionsListPtr handle(partitions, &dummy_topic_partition_list_deleter); + TopicPartitionList list = convert(handle); static_cast(opaque)->handle_rebalance(error, list); } @@ -47,8 +53,9 @@ void Consumer::set_rebalance_error_callback(RebalanceErrorCallback callback) { } void Consumer::subscribe(const vector& topics) { - TopicPartitionList list(topics.begin(), topics.end()); - rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), list.get_handle()); + TopicPartitionList topic_partitions(topics.begin(), topics.end()); + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), topic_list_handle.get()); check_error(error); } @@ -58,8 +65,9 @@ void Consumer::unsubscribe() { } void Consumer::assign(const TopicPartitionList& topic_partitions) { + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); // If the list is empty, then we need to use a null pointer - auto handle = topic_partitions.empty() ? nullptr : topic_partitions.get_handle(); + auto handle = topic_partitions.empty() ? nullptr : topic_list_handle.get(); rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), handle); check_error(error); } @@ -91,20 +99,18 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) { } TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) { - // Copy the list, let rd_kafka change it and return it - TopicPartitionList output = topic_partitions; - rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), output.get_handle(), + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(), get_timeout().count()); check_error(error); - return output; + return convert(topic_list_handle); } TopicPartitionList Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) { - // Copy the list, let rd_kafka change it and return it - TopicPartitionList output = topic_partitions; - rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), output.get_handle()); + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get()); check_error(error); - return output; + return convert(topic_list_handle); } TopicPartitionList Consumer::get_subscription() { @@ -112,7 +118,7 @@ TopicPartitionList Consumer::get_subscription() { rd_kafka_topic_partition_list_t* list = nullptr; error = rd_kafka_subscription(get_handle(), &list); check_error(error); - return TopicPartitionList(list); + return convert(make_handle(list)); } TopicPartitionList Consumer::get_assignment() { @@ -120,7 +126,7 @@ TopicPartitionList Consumer::get_assignment() { rd_kafka_topic_partition_list_t* list = nullptr; error = rd_kafka_assignment(get_handle(), &list); check_error(error); - return TopicPartitionList(list); + return convert(make_handle(list)); } Message Consumer::poll() { @@ -137,13 +143,14 @@ void Consumer::commit(const Message& msg, bool async) { } void Consumer::commit(const TopicPartitionList& topic_partitions, bool async) { + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error; - error = rd_kafka_commit(get_handle(), topic_partitions.get_handle(), async ? 1 : 0); + error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0); check_error(error); } void Consumer::handle_rebalance(rd_kafka_resp_err_t error, - const TopicPartitionList& topic_partitions) { + TopicPartitionList& topic_partitions) { if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { if (assignment_callback_) { assignment_callback_(topic_partitions); diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index bd0265f..47d03b0 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -2,8 +2,10 @@ #include "exceptions.h" #include "topic_configuration.h" #include "topic.h" +#include "topic_partition_list.h" using std::string; +using std::vector; using std::chrono::milliseconds; namespace cppkafka { @@ -21,14 +23,16 @@ KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle) } void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partitions) { + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(), - topic_partitions.get_handle()); + topic_list_handle.get()); check_error(error); } void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) { + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(), - topic_partitions.get_handle()); + topic_list_handle.get()); check_error(error); } diff --git a/src/topic_partition.cpp b/src/topic_partition.cpp index 34bce0f..d2e0520 100644 --- a/src/topic_partition.cpp +++ b/src/topic_partition.cpp @@ -5,18 +5,28 @@ using std::string; namespace cppkafka { -TopicPartition::TopicPartition(const string& topic) -: TopicPartition(topic, RD_KAFKA_PARTITION_UA) { +TopicPartition::TopicPartition() +: TopicPartition("") { } -TopicPartition::TopicPartition(const string& topic, int partition) -: TopicPartition(topic, partition, RD_KAFKA_OFFSET_INVALID) { +TopicPartition::TopicPartition(const char* topic) +: TopicPartition(string(topic)) { } -TopicPartition::TopicPartition(const string& topic, int partition, int64_t offset) -: topic_(topic), partition_(partition), offset_(offset) { +TopicPartition::TopicPartition(string topic) +: TopicPartition(move(topic), RD_KAFKA_PARTITION_UA) { + +} + +TopicPartition::TopicPartition(string topic, int partition) +: TopicPartition(move(topic), partition, RD_KAFKA_OFFSET_INVALID) { + +} + +TopicPartition::TopicPartition(string topic, int partition, int64_t offset) +: topic_(move(topic)), partition_(partition), offset_(offset) { } diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp index c8b09d3..42d49df 100644 --- a/src/topic_partition_list.cpp +++ b/src/topic_partition_list.cpp @@ -2,110 +2,34 @@ #include "topic_partition.h" #include "exceptions.h" -using std::initializer_list; +using std::vector; namespace cppkafka { -const size_t TopicPartitionList::DEFAULT_CONTAINER_SIZE = 5; - -void dummy_deleter(rd_kafka_topic_partition_list_t*) { - -} - -TopicPartitionList -TopicPartitionList::make_non_owning(rd_kafka_topic_partition_list_t* handle) { - return TopicPartitionList(handle, NonOwningTag()); -} - -TopicPartitionList::TopicPartitionList() -: TopicPartitionList(DEFAULT_CONTAINER_SIZE) { - -} - -TopicPartitionList::TopicPartitionList(const initializer_list& topic_partitions) -: TopicPartitionList(topic_partitions.size()) { - for (const auto& value : topic_partitions) { - add(value); +TopicPartitionsListPtr convert(const vector& topic_partitions) { + TopicPartitionsListPtr handle(rd_kafka_topic_partition_list_new(topic_partitions.size()), + &rd_kafka_topic_partition_list_destroy); + for (const auto& item : topic_partitions) { + rd_kafka_topic_partition_t* new_item = nullptr; + new_item = rd_kafka_topic_partition_list_add(handle.get(), + item.get_topic().data(), + item.get_partition()); + new_item->offset = item.get_offset(); } + return handle; } -TopicPartitionList::TopicPartitionList(rd_kafka_topic_partition_list_t* handle) -: handle_(make_handle(handle)) { - -} - -TopicPartitionList::TopicPartitionList(rd_kafka_topic_partition_list_t* handle, - NonOwningTag) -: handle_(handle, &dummy_deleter) { - -} - -TopicPartitionList::TopicPartitionList(size_t size) -: handle_(make_handle(rd_kafka_topic_partition_list_new(size))) { - -} - -TopicPartitionList::TopicPartitionList(const TopicPartitionList& rhs) -: handle_(make_handle(rd_kafka_topic_partition_list_copy(rhs.get_handle()))) { - -} - -TopicPartitionList& TopicPartitionList::operator=(const TopicPartitionList& rhs) { - handle_.reset(rd_kafka_topic_partition_list_copy(rhs.get_handle())); - return *this; -} - -void TopicPartitionList::add(const TopicPartition& topic_partition) { - rd_kafka_topic_partition_t* element = nullptr; - element = rd_kafka_topic_partition_list_add(handle_.get(), - topic_partition.get_topic().data(), - topic_partition.get_partition()); - element->offset = topic_partition.get_offset(); -} - -void TopicPartitionList::update(const TopicPartition& topic_partition) { - rd_kafka_resp_err_t error; - error = rd_kafka_topic_partition_list_set_offset(get_handle(), - topic_partition.get_topic().data(), - topic_partition.get_partition(), - topic_partition.get_offset()); - if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { - throw HandleException(error); +vector convert(const TopicPartitionsListPtr& topic_partitions) { + vector output; + for (int i = 0; i < topic_partitions->cnt; ++i) { + const auto& elem = topic_partitions->elems[i]; + output.emplace_back(elem.topic, elem.partition, elem.offset); } + return output; } -bool TopicPartitionList::remove(const TopicPartition& topic_partition) { - return rd_kafka_topic_partition_list_del(get_handle(), - topic_partition.get_topic().data(), - topic_partition.get_partition()) == 1; -} - -bool TopicPartitionList::contains(const TopicPartition& topic_partition) const { - return get_topic_partition(topic_partition) != nullptr; -} - -size_t TopicPartitionList::size() const { - return handle_->cnt; -} - -bool TopicPartitionList::empty() const { - return size() == 0; -} - -rd_kafka_topic_partition_list_t* TopicPartitionList::get_handle() const { - return handle_.get(); -} - -TopicPartitionList::HandlePtr -TopicPartitionList::make_handle(rd_kafka_topic_partition_list_t* ptr) { - return HandlePtr(ptr, &rd_kafka_topic_partition_list_destroy); -} - -rd_kafka_topic_partition_t* -TopicPartitionList::get_topic_partition(const TopicPartition& topic_partition) const { - return rd_kafka_topic_partition_list_find(get_handle(), - topic_partition.get_topic().data(), - topic_partition.get_partition()); +TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) { + return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy); } } // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c7add58..866b41a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -16,3 +16,4 @@ add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") create_test(producer) create_test(kafka_handle_base) +create_test(topic_partition_list) diff --git a/tests/topic_partition_list_test.cpp b/tests/topic_partition_list_test.cpp new file mode 100644 index 0000000..82eee62 --- /dev/null +++ b/tests/topic_partition_list_test.cpp @@ -0,0 +1,27 @@ +#include +#include "cppkafka/topic_partition_list.h" +#include "cppkafka/topic_partition.h" + +using namespace cppkafka; + +class TopicPartitionListTest : public testing::Test { +public: + +}; + +TEST_F(TopicPartitionListTest, Conversion) { + TopicPartitionList list1; + list1.push_back("foo"); + list1.push_back({ "bar", 2 }); + + TopicPartitionList list2 = convert(convert(list1)); + + EXPECT_EQ(list1.size(), list2.size()); + for (size_t i = 0; i < list1.size(); ++i) { + const auto& item1 = list1[i]; + const auto& item2 = list2[i]; + EXPECT_EQ(item1.get_topic(), item2.get_topic()); + EXPECT_EQ(item1.get_partition(), item2.get_partition()); + EXPECT_EQ(item1.get_offset(), item2.get_offset()); + } +}