From 12621b11ad93b8bb7b482a08b68f32bd1b3526d6 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 21 May 2016 20:06:54 -0700 Subject: [PATCH] Add some other methods to TopicPartitionList --- include/cppkafka/topic_partition_list.h | 6 +++++ src/topic_partition_list.cpp | 29 +++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h index 8ff99d3..83f14dd 100644 --- a/include/cppkafka/topic_partition_list.h +++ b/include/cppkafka/topic_partition_list.h @@ -28,6 +28,10 @@ public: TopicPartitionList& operator=(TopicPartitionList&&) = default; void add(const TopicPartition& topic_partition); + void update(const TopicPartition& topic_partition); + bool remove(const TopicPartition& topic_partition); + + bool contains(const TopicPartition& topic_partition) const; size_t size() const; bool empty() const; @@ -40,6 +44,8 @@ private: static HandlePtr make_handle(rd_kafka_topic_partition_list_t* ptr); + rd_kafka_topic_partition_t* get_topic_partition(const TopicPartition& topic_partition) const; + HandlePtr handle_; }; diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp index 85445a8..689cddb 100644 --- a/src/topic_partition_list.cpp +++ b/src/topic_partition_list.cpp @@ -1,5 +1,6 @@ #include "topic_partition_list.h" #include "topic_partition.h" +#include "exceptions.h" namespace cppkafka { @@ -33,6 +34,27 @@ void TopicPartitionList::add(const TopicPartition& topic_partition) { element->offset = topic_partition.get_offset(); } +void TopicPartitionList::update(const TopicPartition& topic_partition) { + rd_kafka_resp_err_t error; + error = rd_kafka_topic_partition_list_set_offset(get_handle(), + topic_partition.get_topic().data(), + topic_partition.get_partition(), + topic_partition.get_offset()); + if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw HandleException(error); + } +} + +bool TopicPartitionList::remove(const TopicPartition& topic_partition) { + return rd_kafka_topic_partition_list_del(get_handle(), + topic_partition.get_topic().data(), + topic_partition.get_partition()) == 1; +} + +bool TopicPartitionList::contains(const TopicPartition& topic_partition) const { + return get_topic_partition(topic_partition) != nullptr; +} + size_t TopicPartitionList::size() const { return handle_->cnt; } @@ -50,4 +72,11 @@ TopicPartitionList::make_handle(rd_kafka_topic_partition_list_t* ptr) { return HandlePtr(ptr, &rd_kafka_topic_partition_list_destroy); } +rd_kafka_topic_partition_t* +TopicPartitionList::get_topic_partition(const TopicPartition& topic_partition) const { + return rd_kafka_topic_partition_list_find(get_handle(), + topic_partition.get_topic().data(), + topic_partition.get_partition()); +} + } // cppkafka