Replace TopicPartitionList class with a vector of TopicPartition

This commit is contained in:
Matias Fontanini
2016-05-28 09:02:44 -07:00
parent c31d00115a
commit 5b957de7e4
10 changed files with 106 additions and 171 deletions

View File

@@ -54,7 +54,7 @@ private:
void commit(const Message& msg, bool async); void commit(const Message& msg, bool async);
void commit(const TopicPartitionList& topic_partitions, bool async); void commit(const TopicPartitionList& topic_partitions, bool async);
void handle_rebalance(rd_kafka_resp_err_t err, const TopicPartitionList& topic_partitions); void handle_rebalance(rd_kafka_resp_err_t err, TopicPartitionList& topic_partitions);
AssignmentCallback assignment_callback_; AssignmentCallback assignment_callback_;
RevocationCallback revocation_callback_; RevocationCallback revocation_callback_;

View File

@@ -5,8 +5,9 @@
#include <memory> #include <memory>
#include <chrono> #include <chrono>
#include <librdkafka/rdkafka.h> #include <librdkafka/rdkafka.h>
#include "topic_partition_list.h"
#include "metadata.h" #include "metadata.h"
#include "topic_partition.h"
#include "topic_partition_list.h"
namespace cppkafka { namespace cppkafka {

View File

@@ -8,9 +8,11 @@ namespace cppkafka {
class TopicPartition { class TopicPartition {
public: public:
TopicPartition(const std::string& topic); TopicPartition();
TopicPartition(const std::string& topic, int partition); TopicPartition(const char* topic);
TopicPartition(const std::string& topic, int partition, int64_t offset); TopicPartition(std::string topic);
TopicPartition(std::string topic, int partition);
TopicPartition(std::string topic, int partition, int64_t offset);
const std::string& get_topic() const; const std::string& get_topic() const;
int get_partition() const; int get_partition() const;

View File

@@ -3,60 +3,19 @@
#include <memory> #include <memory>
#include <algorithm> #include <algorithm>
#include <initializer_list>
#include <librdkafka/rdkafka.h> #include <librdkafka/rdkafka.h>
#include "topic_partition.h"
namespace cppkafka { namespace cppkafka {
class TopicPartition; class TopicPartition;
class TopicPartitionList { using TopicPartitionsListPtr = std::unique_ptr<rd_kafka_topic_partition_list_t,
public: decltype(&rd_kafka_topic_partition_list_destroy)>;
static TopicPartitionList make_non_owning(rd_kafka_topic_partition_list_t* handle); using TopicPartitionList = std::vector<TopicPartition>;
TopicPartitionList(); TopicPartitionsListPtr convert(const std::vector<TopicPartition>& topic_partitions);
TopicPartitionList(const std::initializer_list<TopicPartition>& topic_partitions); std::vector<TopicPartition> convert(const TopicPartitionsListPtr& topic_partitions);
TopicPartitionList(rd_kafka_topic_partition_list_t* handle); TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);
TopicPartitionList(size_t size);
template <typename ForwardIterator>
TopicPartitionList(ForwardIterator start, const ForwardIterator& end)
: TopicPartitionList(std::distance(start, end)) {
while (start != end) {
add(*start);
++start;
}
}
TopicPartitionList(const TopicPartitionList& rhs);
TopicPartitionList(TopicPartitionList&&) = default;
TopicPartitionList& operator=(const TopicPartitionList& rhs);
TopicPartitionList& operator=(TopicPartitionList&&) = default;
void add(const TopicPartition& topic_partition);
void update(const TopicPartition& topic_partition);
bool remove(const TopicPartition& topic_partition);
bool contains(const TopicPartition& topic_partition) const;
size_t size() const;
bool empty() const;
rd_kafka_topic_partition_list_t* get_handle() const;
private:
static const size_t DEFAULT_CONTAINER_SIZE;
struct NonOwningTag { };
using HandlePtr = std::unique_ptr<rd_kafka_topic_partition_list_t,
decltype(&rd_kafka_topic_partition_list_destroy)>;
static HandlePtr make_handle(rd_kafka_topic_partition_list_t* ptr);
TopicPartitionList(rd_kafka_topic_partition_list_t* handle, NonOwningTag);
rd_kafka_topic_partition_t* get_topic_partition(const TopicPartition& topic_partition) const;
HandlePtr handle_;
};
} // cppkafka } // cppkafka

View File

@@ -10,9 +10,15 @@ using std::chrono::milliseconds;
namespace cppkafka { namespace cppkafka {
void dummy_topic_partition_list_deleter(rd_kafka_topic_partition_list_t*) {
}
void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque) { rd_kafka_topic_partition_list_t *partitions, void *opaque) {
TopicPartitionList list = TopicPartitionList::make_non_owning(partitions); // Build a dummy unique_ptr which won't actually delete the ptr
TopicPartitionsListPtr handle(partitions, &dummy_topic_partition_list_deleter);
TopicPartitionList list = convert(handle);
static_cast<Consumer*>(opaque)->handle_rebalance(error, list); static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
} }
@@ -47,8 +53,9 @@ void Consumer::set_rebalance_error_callback(RebalanceErrorCallback callback) {
} }
void Consumer::subscribe(const vector<string>& topics) { void Consumer::subscribe(const vector<string>& topics) {
TopicPartitionList list(topics.begin(), topics.end()); TopicPartitionList topic_partitions(topics.begin(), topics.end());
rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), list.get_handle()); TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), topic_list_handle.get());
check_error(error); check_error(error);
} }
@@ -58,8 +65,9 @@ void Consumer::unsubscribe() {
} }
void Consumer::assign(const TopicPartitionList& topic_partitions) { void Consumer::assign(const TopicPartitionList& topic_partitions) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
// If the list is empty, then we need to use a null pointer // If the list is empty, then we need to use a null pointer
auto handle = topic_partitions.empty() ? nullptr : topic_partitions.get_handle(); auto handle = topic_partitions.empty() ? nullptr : topic_list_handle.get();
rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), handle); rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), handle);
check_error(error); check_error(error);
} }
@@ -91,20 +99,18 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) {
} }
TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) { TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& topic_partitions) {
// Copy the list, let rd_kafka change it and return it TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
TopicPartitionList output = topic_partitions; rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), topic_list_handle.get(),
rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), output.get_handle(),
get_timeout().count()); get_timeout().count());
check_error(error); check_error(error);
return output; return convert(topic_list_handle);
} }
TopicPartitionList Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) { TopicPartitionList Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) {
// Copy the list, let rd_kafka change it and return it TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
TopicPartitionList output = topic_partitions; rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), topic_list_handle.get());
rd_kafka_resp_err_t error = rd_kafka_position(get_handle(), output.get_handle());
check_error(error); check_error(error);
return output; return convert(topic_list_handle);
} }
TopicPartitionList Consumer::get_subscription() { TopicPartitionList Consumer::get_subscription() {
@@ -112,7 +118,7 @@ TopicPartitionList Consumer::get_subscription() {
rd_kafka_topic_partition_list_t* list = nullptr; rd_kafka_topic_partition_list_t* list = nullptr;
error = rd_kafka_subscription(get_handle(), &list); error = rd_kafka_subscription(get_handle(), &list);
check_error(error); check_error(error);
return TopicPartitionList(list); return convert(make_handle(list));
} }
TopicPartitionList Consumer::get_assignment() { TopicPartitionList Consumer::get_assignment() {
@@ -120,7 +126,7 @@ TopicPartitionList Consumer::get_assignment() {
rd_kafka_topic_partition_list_t* list = nullptr; rd_kafka_topic_partition_list_t* list = nullptr;
error = rd_kafka_assignment(get_handle(), &list); error = rd_kafka_assignment(get_handle(), &list);
check_error(error); check_error(error);
return TopicPartitionList(list); return convert(make_handle(list));
} }
Message Consumer::poll() { Message Consumer::poll() {
@@ -137,13 +143,14 @@ void Consumer::commit(const Message& msg, bool async) {
} }
void Consumer::commit(const TopicPartitionList& topic_partitions, bool async) { void Consumer::commit(const TopicPartitionList& topic_partitions, bool async) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error; rd_kafka_resp_err_t error;
error = rd_kafka_commit(get_handle(), topic_partitions.get_handle(), async ? 1 : 0); error = rd_kafka_commit(get_handle(), topic_list_handle.get(), async ? 1 : 0);
check_error(error); check_error(error);
} }
void Consumer::handle_rebalance(rd_kafka_resp_err_t error, void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
const TopicPartitionList& topic_partitions) { TopicPartitionList& topic_partitions) {
if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
if (assignment_callback_) { if (assignment_callback_) {
assignment_callback_(topic_partitions); assignment_callback_(topic_partitions);

View File

@@ -2,8 +2,10 @@
#include "exceptions.h" #include "exceptions.h"
#include "topic_configuration.h" #include "topic_configuration.h"
#include "topic.h" #include "topic.h"
#include "topic_partition_list.h"
using std::string; using std::string;
using std::vector;
using std::chrono::milliseconds; using std::chrono::milliseconds;
namespace cppkafka { namespace cppkafka {
@@ -21,14 +23,16 @@ KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle)
} }
void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partitions) { void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partitions) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(), rd_kafka_resp_err_t error = rd_kafka_pause_partitions(get_handle(),
topic_partitions.get_handle()); topic_list_handle.get());
check_error(error); check_error(error);
} }
void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) { void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitions) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(), rd_kafka_resp_err_t error = rd_kafka_resume_partitions(get_handle(),
topic_partitions.get_handle()); topic_list_handle.get());
check_error(error); check_error(error);
} }

View File

@@ -5,18 +5,28 @@ using std::string;
namespace cppkafka { namespace cppkafka {
TopicPartition::TopicPartition(const string& topic) TopicPartition::TopicPartition()
: TopicPartition(topic, RD_KAFKA_PARTITION_UA) { : TopicPartition("") {
} }
TopicPartition::TopicPartition(const string& topic, int partition) TopicPartition::TopicPartition(const char* topic)
: TopicPartition(topic, partition, RD_KAFKA_OFFSET_INVALID) { : TopicPartition(string(topic)) {
} }
TopicPartition::TopicPartition(const string& topic, int partition, int64_t offset) TopicPartition::TopicPartition(string topic)
: topic_(topic), partition_(partition), offset_(offset) { : TopicPartition(move(topic), RD_KAFKA_PARTITION_UA) {
}
TopicPartition::TopicPartition(string topic, int partition)
: TopicPartition(move(topic), partition, RD_KAFKA_OFFSET_INVALID) {
}
TopicPartition::TopicPartition(string topic, int partition, int64_t offset)
: topic_(move(topic)), partition_(partition), offset_(offset) {
} }

View File

@@ -2,110 +2,34 @@
#include "topic_partition.h" #include "topic_partition.h"
#include "exceptions.h" #include "exceptions.h"
using std::initializer_list; using std::vector;
namespace cppkafka { namespace cppkafka {
const size_t TopicPartitionList::DEFAULT_CONTAINER_SIZE = 5; TopicPartitionsListPtr convert(const vector<TopicPartition>& topic_partitions) {
TopicPartitionsListPtr handle(rd_kafka_topic_partition_list_new(topic_partitions.size()),
void dummy_deleter(rd_kafka_topic_partition_list_t*) { &rd_kafka_topic_partition_list_destroy);
for (const auto& item : topic_partitions) {
} rd_kafka_topic_partition_t* new_item = nullptr;
new_item = rd_kafka_topic_partition_list_add(handle.get(),
TopicPartitionList item.get_topic().data(),
TopicPartitionList::make_non_owning(rd_kafka_topic_partition_list_t* handle) { item.get_partition());
return TopicPartitionList(handle, NonOwningTag()); new_item->offset = item.get_offset();
}
TopicPartitionList::TopicPartitionList()
: TopicPartitionList(DEFAULT_CONTAINER_SIZE) {
}
TopicPartitionList::TopicPartitionList(const initializer_list<TopicPartition>& topic_partitions)
: TopicPartitionList(topic_partitions.size()) {
for (const auto& value : topic_partitions) {
add(value);
} }
return handle;
} }
TopicPartitionList::TopicPartitionList(rd_kafka_topic_partition_list_t* handle) vector<TopicPartition> convert(const TopicPartitionsListPtr& topic_partitions) {
: handle_(make_handle(handle)) { vector<TopicPartition> output;
for (int i = 0; i < topic_partitions->cnt; ++i) {
} const auto& elem = topic_partitions->elems[i];
output.emplace_back(elem.topic, elem.partition, elem.offset);
TopicPartitionList::TopicPartitionList(rd_kafka_topic_partition_list_t* handle,
NonOwningTag)
: handle_(handle, &dummy_deleter) {
}
TopicPartitionList::TopicPartitionList(size_t size)
: handle_(make_handle(rd_kafka_topic_partition_list_new(size))) {
}
TopicPartitionList::TopicPartitionList(const TopicPartitionList& rhs)
: handle_(make_handle(rd_kafka_topic_partition_list_copy(rhs.get_handle()))) {
}
TopicPartitionList& TopicPartitionList::operator=(const TopicPartitionList& rhs) {
handle_.reset(rd_kafka_topic_partition_list_copy(rhs.get_handle()));
return *this;
}
void TopicPartitionList::add(const TopicPartition& topic_partition) {
rd_kafka_topic_partition_t* element = nullptr;
element = rd_kafka_topic_partition_list_add(handle_.get(),
topic_partition.get_topic().data(),
topic_partition.get_partition());
element->offset = topic_partition.get_offset();
}
void TopicPartitionList::update(const TopicPartition& topic_partition) {
rd_kafka_resp_err_t error;
error = rd_kafka_topic_partition_list_set_offset(get_handle(),
topic_partition.get_topic().data(),
topic_partition.get_partition(),
topic_partition.get_offset());
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw HandleException(error);
} }
return output;
} }
bool TopicPartitionList::remove(const TopicPartition& topic_partition) { TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle) {
return rd_kafka_topic_partition_list_del(get_handle(), return TopicPartitionsListPtr(handle, &rd_kafka_topic_partition_list_destroy);
topic_partition.get_topic().data(),
topic_partition.get_partition()) == 1;
}
bool TopicPartitionList::contains(const TopicPartition& topic_partition) const {
return get_topic_partition(topic_partition) != nullptr;
}
size_t TopicPartitionList::size() const {
return handle_->cnt;
}
bool TopicPartitionList::empty() const {
return size() == 0;
}
rd_kafka_topic_partition_list_t* TopicPartitionList::get_handle() const {
return handle_.get();
}
TopicPartitionList::HandlePtr
TopicPartitionList::make_handle(rd_kafka_topic_partition_list_t* ptr) {
return HandlePtr(ptr, &rd_kafka_topic_partition_list_destroy);
}
rd_kafka_topic_partition_t*
TopicPartitionList::get_topic_partition(const TopicPartition& topic_partition) const {
return rd_kafka_topic_partition_list_find(get_handle(),
topic_partition.get_topic().data(),
topic_partition.get_partition());
} }
} // cppkafka } // cppkafka

View File

@@ -16,3 +16,4 @@ add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
create_test(producer) create_test(producer)
create_test(kafka_handle_base) create_test(kafka_handle_base)
create_test(topic_partition_list)

View File

@@ -0,0 +1,27 @@
#include <gtest/gtest.h>
#include "cppkafka/topic_partition_list.h"
#include "cppkafka/topic_partition.h"
using namespace cppkafka;
class TopicPartitionListTest : public testing::Test {
public:
};
TEST_F(TopicPartitionListTest, Conversion) {
TopicPartitionList list1;
list1.push_back("foo");
list1.push_back({ "bar", 2 });
TopicPartitionList list2 = convert(convert(list1));
EXPECT_EQ(list1.size(), list2.size());
for (size_t i = 0; i < list1.size(); ++i) {
const auto& item1 = list1[i];
const auto& item2 = list2[i];
EXPECT_EQ(item1.get_topic(), item2.get_topic());
EXPECT_EQ(item1.get_partition(), item2.get_partition());
EXPECT_EQ(item1.get_offset(), item2.get_offset());
}
}