From c7ba4785821253230eba06c312a36904434d6e00 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Wed, 28 Nov 2018 20:44:55 -0500 Subject: [PATCH 1/9] Header fixes --- include/cppkafka/buffer.h | 14 +++-- include/cppkafka/clonable_ptr.h | 42 ++++++++++++--- include/cppkafka/header_list.h | 42 +++++++++++---- include/cppkafka/message.h | 21 ++++++++ include/cppkafka/message_builder.h | 82 +++++++++++++++++++++++++----- 5 files changed, 167 insertions(+), 34 deletions(-) diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index ee735f5..c1bb519 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -93,7 +93,9 @@ public: static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)"); } - // Don't allow construction from temporary vectors + /** + * Don't allow construction from temporary vectors + */ template Buffer(std::vector&& data) = delete; @@ -108,7 +110,9 @@ public: static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)"); } - // Don't allow construction from temporary arrays + /** + * Don't allow construction from temporary arrays + */ template Buffer(std::array&& data) = delete; @@ -120,9 +124,11 @@ public: */ Buffer(const std::string& data); - // Don't allow construction from temporary strings + /** + * Don't allow construction from temporary strings + */ Buffer(std::string&&) = delete; - + Buffer(const Buffer&) = delete; Buffer(Buffer&&) = default; Buffer& operator=(const Buffer&) = delete; diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 78ec0f0..6f175ff 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -60,9 +60,9 @@ public: * \param rhs The pointer to be copied */ ClonablePtr(const ClonablePtr& rhs) - : handle_(rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : - std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter())), - cloner_(rhs.cloner_) { + : handle_(rhs.get_cloner() ? std::unique_ptr(rhs.clone(), rhs.get_deleter()) : + std::unique_ptr(rhs.get(), rhs.get_deleter())), + cloner_(rhs.get_cloner()) { } @@ -72,11 +72,11 @@ public: * \param rhs The pointer to be copied */ ClonablePtr& operator=(const ClonablePtr& rhs) { - if (this == &rhs) { - return *this; + if (this != &rhs) { + handle_ = rhs.get_cloner() ? std::unique_ptr(rhs.clone(), rhs.get_deleter()) : + std::unique_ptr(rhs.get(), rhs.get_deleter()); + cloner_ = rhs.get_cloner(); } - handle_ = rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : - std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter()); return *this; } @@ -91,6 +91,13 @@ public: return handle_.get(); } + /** + * \brief Clones the internal pointer using the specified cloner function. + */ + T* clone() const { + return cloner_ ? cloner_(handle_.get()) : handle_.get(); + } + /** * \brief Releases ownership of the internal pointer */ @@ -98,6 +105,27 @@ public: return handle_.release(); } + /** + * \brief Reset the internal pointer to a new one + */ + void reset(T* ptr) { + handle_.reset(ptr); + } + + /** + * \brief Get the deleter + */ + const Deleter& get_deleter() const { + return handle_.get_deleter(); + } + + /** + * \brief Get the cloner + */ + const Cloner& get_cloner() const { + return cloner_; + } + /** * \brief Indicates whether this ClonablePtr instance is valid (not null) */ diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 3400b94..384fce5 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -146,6 +146,12 @@ public: */ rd_kafka_headers_t* get_handle() const; + /** + * \brief Clone the underlying header list handle. + * \return The handle. + */ + rd_kafka_headers_t* clone_handle() const; + /** * \brief Get the underlying header list handle and release its ownership. * \return The handle. @@ -159,10 +165,14 @@ public: */ explicit operator bool() const; + /** + * \brief Indicates if this list owns the underlying handle or not. + */ + bool is_owning() const; + private: struct NonOwningTag { }; static void dummy_deleter(rd_kafka_headers_t*) {} - static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast(handle); } using HandlePtr = ClonablePtr; @@ -193,7 +203,7 @@ bool operator!=(const HeaderList& lhs, const HeaderList template HeaderList HeaderList::make_non_owning(rd_kafka_headers_t* handle) { - return HeaderList(handle, NonOwningTag()); + return handle ? HeaderList(handle, NonOwningTag()) : HeaderList(); } template @@ -210,14 +220,16 @@ HeaderList::HeaderList(size_t reserve) template HeaderList::HeaderList(rd_kafka_headers_t* handle) -: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy - +: handle_(handle, + handle ? &rd_kafka_headers_destroy : nullptr, + handle ? &rd_kafka_headers_copy : nullptr) { //if we own the header list, we clone it on copy } template HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) -: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy. - +: handle_(handle, + handle ? &dummy_deleter : nullptr, + nullptr) { //if we don't own the header list, we forward the handle on copy. } // Methods @@ -254,7 +266,8 @@ HeaderType HeaderList::at(size_t index) const { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw Exception(error.to_string()); } - return HeaderType(name, BufferType(value, size)); + //Use 'Buffer' to implicitly convert to 'BufferType' + return HeaderType(name, Buffer(value, size)); } template @@ -269,8 +282,7 @@ HeaderType HeaderList::back() const { template size_t HeaderList::size() const { - assert(handle_); - return rd_kafka_header_cnt(handle_.get()); + return handle_ ? rd_kafka_header_cnt(handle_.get()) : 0; } template @@ -281,7 +293,6 @@ bool HeaderList::empty() const { template typename HeaderList::Iterator HeaderList::begin() const { - assert(handle_); if (empty()) { return end(); } @@ -291,7 +302,6 @@ HeaderList::begin() const { template typename HeaderList::Iterator HeaderList::end() const { - assert(handle_); return Iterator(make_non_owning(handle_.get()), size()); } @@ -300,6 +310,11 @@ rd_kafka_headers_t* HeaderList::get_handle() const { return handle_.get(); } +template +rd_kafka_headers_t* HeaderList::clone_handle() const { + return handle_.clone(); +} + template rd_kafka_headers_t* HeaderList::release_handle() { return handle_.release(); @@ -310,6 +325,11 @@ HeaderList::operator bool() const { return static_cast(handle_); } +template +bool HeaderList::is_owning() const { + return handle_.get_deleter() != &dummy_deleter; +} + } //namespace cppkafka #endif //RD_KAFKA_HEADERS_SUPPORT_VERSION diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index b59b98b..8007aee 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -127,6 +127,27 @@ public: } #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + /** + * \brief Sets the message's header list. + * \note After this call, the Message will take ownership of the header list. + */ + void set_header_list(const HeaderListType& headers) { + assert(handle_); + assert(!headers.is_owning()); + rd_kafka_message_set_headers(handle_.get(), headers.get_handle()); + header_list_ = HeaderListType::make_non_owning(headers.get_handle()); + } + + /** + * \brief Sets the message's header list. + * \note After this call, the Message will take ownership of the header list. + */ + void set_header_list(HeaderListType&& headers) { + assert(handle_); + rd_kafka_message_set_headers(handle_.get(), headers.get_handle()); + header_list_ = HeaderListType::make_non_owning(headers.release_handle()); + } + /** * \brief Gets the message's header list */ diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index df216d9..c889912 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -70,7 +70,12 @@ public: */ template BasicMessageBuilder(const BasicMessageBuilder& rhs); + template + BasicMessageBuilder(BasicMessageBuilder&& rhs); + /** + * Default copy and move constructors and assignment operators + */ BasicMessageBuilder(BasicMessageBuilder&&) = default; BasicMessageBuilder(const BasicMessageBuilder&) = default; BasicMessageBuilder& operator=(BasicMessageBuilder&&) = default; @@ -106,11 +111,13 @@ public: #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) /** - * Add a header to the message + * Add a header(s) to the message * * \param header The header to be used */ Concrete& header(const HeaderType& header); + Concrete& headers(const HeaderListType& headers); + Concrete& headers(HeaderListType&& headers); #endif /** @@ -208,10 +215,12 @@ public: Message::InternalPtr internal() const; Concrete& internal(Message::InternalPtr internal); -private: +protected: void construct_buffer(BufferType& lhs, const BufferType& rhs); - Concrete& get_concrete(); +private: + Concrete& get_concrete(); + std::string topic_; int partition_{-1}; BufferType key_; @@ -234,23 +243,51 @@ template BasicMessageBuilder::BasicMessageBuilder(const Message& message) : topic_(message.get_topic()), key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + header_list_(message.get_header_list() ? + rd_kafka_headers_copy(message.get_header_list().get_handle()) : + nullptr), //copy and assume ownership +#endif payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : std::chrono::milliseconds(0)), user_data_(message.get_user_data()), internal_(message.internal()) { + } template template BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& rhs) -: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()), +: topic_(rhs.topic()), + partition_(rhs.partition()), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + header_list_(rhs.header_list() ? + rd_kafka_headers_copy(rhs.header_list().get_handle()) : + nullptr), //copy headers +#endif + timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), internal_(rhs.internal()) { get_concrete().construct_buffer(key_, rhs.key()); get_concrete().construct_buffer(payload_, rhs.payload()); } +template +template +BasicMessageBuilder::BasicMessageBuilder(BasicMessageBuilder&& rhs) +: topic_(rhs.topic()), + partition_(rhs.partition()), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + header_list_(rhs.header_list().release_handle()), //assume ownership +#endif + timestamp_(rhs.timestamp()), + user_data_(rhs.user_data()), + internal_(rhs.internal()) { + get_concrete().construct_buffer(key_, std::move(rhs.key())); + get_concrete().construct_buffer(payload_, std::move(rhs.payload())); +} + template C& BasicMessageBuilder::topic(std::string value) { topic_ = std::move(value); @@ -284,6 +321,18 @@ C& BasicMessageBuilder::header(const HeaderType& header) { header_list_.add(header); return get_concrete(); } + +template +C& BasicMessageBuilder::headers(const HeaderListType& headers) { + header_list_ = headers; + return get_concrete(); +} + +template +C& BasicMessageBuilder::headers(HeaderListType&& headers) { + header_list_ = std::move(headers); + return get_concrete(); +} #endif template @@ -410,7 +459,7 @@ C& BasicMessageBuilder::get_concrete() { class MessageBuilder : public BasicMessageBuilder { public: using Base = BasicMessageBuilder; - using BasicMessageBuilder::BasicMessageBuilder; + using BasicMessageBuilder::BasicMessageBuilder; #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) using HeaderType = Base::HeaderType; using HeaderListType = Base::HeaderListType; @@ -421,17 +470,21 @@ public: } template - void construct_buffer(Buffer& lhs, const T& rhs) { - lhs = Buffer(rhs); + void construct_buffer(Buffer& lhs, T&& rhs) { + lhs = Buffer(std::forward(rhs)); } + MessageBuilder clone() const { return std::move(MessageBuilder(topic()). - key(Buffer(key().get_data(), key().get_size())). - payload(Buffer(payload().get_data(), payload().get_size())). - timestamp(timestamp()). - user_data(user_data()). - internal(internal())); + key(Buffer(key().get_data(), key().get_size())). +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + headers(header_list()). +#endif + payload(Buffer(payload().get_data(), payload().get_size())). + timestamp(timestamp()). + user_data(user_data()). + internal(internal())); } }; @@ -441,7 +494,12 @@ public: template class ConcreteMessageBuilder : public BasicMessageBuilder> { public: + using Base = BasicMessageBuilder>; using BasicMessageBuilder>::BasicMessageBuilder; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + using HeaderType = typename Base::HeaderType; + using HeaderListType = typename Base::HeaderListType; +#endif }; } // cppkafka From 57bddabfd0f062c74da49ad69cd3e8b0be2aae46 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Sun, 2 Dec 2018 14:15:20 -0500 Subject: [PATCH 2/9] Removed clone_handle method and made ClonablePtr::clone private --- include/cppkafka/clonable_ptr.h | 14 +++++++------- include/cppkafka/header_list.h | 11 ----------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 6f175ff..4f0f323 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -91,13 +91,6 @@ public: return handle_.get(); } - /** - * \brief Clones the internal pointer using the specified cloner function. - */ - T* clone() const { - return cloner_ ? cloner_(handle_.get()) : handle_.get(); - } - /** * \brief Releases ownership of the internal pointer */ @@ -133,6 +126,13 @@ public: return static_cast(handle_); } private: + /** + * \brief Clones the internal pointer using the specified cloner function. + */ + T* clone() const { + return cloner_ ? cloner_(handle_.get()) : handle_.get(); + } + std::unique_ptr handle_; Cloner cloner_; }; diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 384fce5..1d3b3b0 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -146,12 +146,6 @@ public: */ rd_kafka_headers_t* get_handle() const; - /** - * \brief Clone the underlying header list handle. - * \return The handle. - */ - rd_kafka_headers_t* clone_handle() const; - /** * \brief Get the underlying header list handle and release its ownership. * \return The handle. @@ -310,11 +304,6 @@ rd_kafka_headers_t* HeaderList::get_handle() const { return handle_.get(); } -template -rd_kafka_headers_t* HeaderList::clone_handle() const { - return handle_.clone(); -} - template rd_kafka_headers_t* HeaderList::release_handle() { return handle_.release(); From 0b7931bfb8654ad3e5976e1608fb503531750f8b Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Sun, 2 Dec 2018 14:42:02 -0500 Subject: [PATCH 3/9] Added Buffer::Buffer(iter, iter) constructor overload --- include/cppkafka/buffer.h | 5 +++++ include/cppkafka/header_list.h | 3 +-- tests/buffer_test.cpp | 4 ++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index c1bb519..361e130 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -81,6 +81,11 @@ public: throw Exception("Invalid buffer configuration"); } } + + template + Buffer(const Iter first, Iter last) + : Buffer(&*first, std::distance(first, last)) { + } /** * Constructs a buffer from a vector diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 1d3b3b0..20e60a8 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -260,8 +260,7 @@ HeaderType HeaderList::at(size_t index) const { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw Exception(error.to_string()); } - //Use 'Buffer' to implicitly convert to 'BufferType' - return HeaderType(name, Buffer(value, size)); + return HeaderType(name, BufferType(value, value + size)); } template diff --git a/tests/buffer_test.cpp b/tests/buffer_test.cpp index 6659d3b..126f524 100644 --- a/tests/buffer_test.cpp +++ b/tests/buffer_test.cpp @@ -39,7 +39,11 @@ TEST_CASE("construction", "[buffer]") { const string str_data = "Hello world!"; const vector data(str_data.begin(), str_data.end()); const Buffer buffer(data); + const Buffer buffer2(data.begin(), data.end()); + const Buffer buffer3(str_data.data(), str_data.data() + str_data.size()); CHECK(str_data == buffer); + CHECK(buffer == buffer2); + CHECK(buffer == buffer3); } From e96dc6d1fccc89eb146fcb83e3a32b60c78791c6 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Sun, 2 Dec 2018 15:00:07 -0500 Subject: [PATCH 4/9] Added comments --- include/cppkafka/buffer.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index 361e130..504d083 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -82,6 +82,12 @@ public: } } + /** + * Constructs a buffer from two iterators in the range [first,last) + * + * \param first An iterator to the start of data + * \param last An iterator to the end of data (not included) + */ template Buffer(const Iter first, Iter last) : Buffer(&*first, std::distance(first, last)) { From 6bbddcd5d50eda782790b2bb660fcb94849fc88e Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Mon, 3 Dec 2018 08:53:25 -0500 Subject: [PATCH 5/9] Fixed Message::set_header_list as per review comments. Changed ClonablePtr to use clone() internally --- include/cppkafka/clonable_ptr.h | 8 +++----- include/cppkafka/header_list.h | 10 ---------- include/cppkafka/message.h | 18 ++++-------------- 3 files changed, 7 insertions(+), 29 deletions(-) diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 4f0f323..7b25b17 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -60,8 +60,7 @@ public: * \param rhs The pointer to be copied */ ClonablePtr(const ClonablePtr& rhs) - : handle_(rhs.get_cloner() ? std::unique_ptr(rhs.clone(), rhs.get_deleter()) : - std::unique_ptr(rhs.get(), rhs.get_deleter())), + : handle_(std::unique_ptr(rhs.clone(), rhs.get_deleter())), cloner_(rhs.get_cloner()) { } @@ -73,8 +72,7 @@ public: */ ClonablePtr& operator=(const ClonablePtr& rhs) { if (this != &rhs) { - handle_ = rhs.get_cloner() ? std::unique_ptr(rhs.clone(), rhs.get_deleter()) : - std::unique_ptr(rhs.get(), rhs.get_deleter()); + handle_ = std::unique_ptr(rhs.clone(), rhs.get_deleter()); cloner_ = rhs.get_cloner(); } return *this; @@ -130,7 +128,7 @@ private: * \brief Clones the internal pointer using the specified cloner function. */ T* clone() const { - return cloner_ ? cloner_(handle_.get()) : handle_.get(); + return cloner_ ? cloner_(get()) : get(); } std::unique_ptr handle_; diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 20e60a8..313996f 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -159,11 +159,6 @@ public: */ explicit operator bool() const; - /** - * \brief Indicates if this list owns the underlying handle or not. - */ - bool is_owning() const; - private: struct NonOwningTag { }; static void dummy_deleter(rd_kafka_headers_t*) {} @@ -313,11 +308,6 @@ HeaderList::operator bool() const { return static_cast(handle_); } -template -bool HeaderList::is_owning() const { - return handle_.get_deleter() != &dummy_deleter; -} - } //namespace cppkafka #endif //RD_KAFKA_HEADERS_SUPPORT_VERSION diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 8007aee..22a2880 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -129,23 +129,13 @@ public: #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) /** * \brief Sets the message's header list. - * \note After this call, the Message will take ownership of the header list. + * \note This calls rd_kafka_message_set_headers. */ void set_header_list(const HeaderListType& headers) { assert(handle_); - assert(!headers.is_owning()); - rd_kafka_message_set_headers(handle_.get(), headers.get_handle()); - header_list_ = HeaderListType::make_non_owning(headers.get_handle()); - } - - /** - * \brief Sets the message's header list. - * \note After this call, the Message will take ownership of the header list. - */ - void set_header_list(HeaderListType&& headers) { - assert(handle_); - rd_kafka_message_set_headers(handle_.get(), headers.get_handle()); - header_list_ = HeaderListType::make_non_owning(headers.release_handle()); + rd_kafka_headers_t* handle_copy = rd_kafka_headers_copy(headers.get_handle()); + rd_kafka_message_set_headers(handle_.get(), handle_copy); + header_list_ = HeaderListType::make_non_owning(handle_copy); } /** From 93e066a1c100102c769b1d6eedbae1a65a6d5ba5 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Tue, 4 Dec 2018 11:12:28 -0500 Subject: [PATCH 6/9] * Added asserts when building a HeaderList and removed checks for handle validity. * Removed explicit move semantic when cloning a MessageBuilder. * Renamed clone() to try_clone() in ClonablePtr class. --- include/cppkafka/clonable_ptr.h | 9 +++------ include/cppkafka/header_list.h | 14 ++++++-------- include/cppkafka/message.h | 3 +++ include/cppkafka/message_builder.h | 16 ++++++++-------- src/producer.cpp | 2 +- 5 files changed, 21 insertions(+), 23 deletions(-) diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 7b25b17..13d7b20 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -60,7 +60,7 @@ public: * \param rhs The pointer to be copied */ ClonablePtr(const ClonablePtr& rhs) - : handle_(std::unique_ptr(rhs.clone(), rhs.get_deleter())), + : handle_(std::unique_ptr(rhs.try_clone(), rhs.get_deleter())), cloner_(rhs.get_cloner()) { } @@ -72,7 +72,7 @@ public: */ ClonablePtr& operator=(const ClonablePtr& rhs) { if (this != &rhs) { - handle_ = std::unique_ptr(rhs.clone(), rhs.get_deleter()); + handle_ = std::unique_ptr(rhs.try_clone(), rhs.get_deleter()); cloner_ = rhs.get_cloner(); } return *this; @@ -124,10 +124,7 @@ public: return static_cast(handle_); } private: - /** - * \brief Clones the internal pointer using the specified cloner function. - */ - T* clone() const { + T* try_clone() const { return cloner_ ? cloner_(get()) : get(); } diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 313996f..b0d3ab4 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -192,7 +192,7 @@ bool operator!=(const HeaderList& lhs, const HeaderList template HeaderList HeaderList::make_non_owning(rd_kafka_headers_t* handle) { - return handle ? HeaderList(handle, NonOwningTag()) : HeaderList(); + return HeaderList(handle, NonOwningTag()); } template @@ -204,21 +204,19 @@ HeaderList::HeaderList() template HeaderList::HeaderList(size_t reserve) : handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { - + assert(reserve); } template HeaderList::HeaderList(rd_kafka_headers_t* handle) -: handle_(handle, - handle ? &rd_kafka_headers_destroy : nullptr, - handle ? &rd_kafka_headers_copy : nullptr) { //if we own the header list, we clone it on copy +: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy + assert(handle); } template HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) -: handle_(handle, - handle ? &dummy_deleter : nullptr, - nullptr) { //if we don't own the header list, we forward the handle on copy. +: handle_(handle, &dummy_deleter, nullptr) { //if we don't own the header list, we forward the handle on copy. + assert(handle); } // Methods diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 22a2880..f8101a0 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -133,6 +133,9 @@ public: */ void set_header_list(const HeaderListType& headers) { assert(handle_); + if (!headers) { + return; //nothing to set + } rd_kafka_headers_t* handle_copy = rd_kafka_headers_copy(headers.get_handle()); rd_kafka_message_set_headers(handle_.get(), handle_copy); header_list_ = HeaderListType::make_non_owning(handle_copy); diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index c889912..981e120 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -245,8 +245,7 @@ BasicMessageBuilder::BasicMessageBuilder(const Message& message) key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) header_list_(message.get_header_list() ? - rd_kafka_headers_copy(message.get_header_list().get_handle()) : - nullptr), //copy and assume ownership + rd_kafka_headers_copy(message.get_header_list().get_handle()) : nullptr), //copy headers #endif payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : @@ -263,8 +262,7 @@ BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) header_list_(rhs.header_list() ? - rd_kafka_headers_copy(rhs.header_list().get_handle()) : - nullptr), //copy headers + rd_kafka_headers_copy(rhs.header_list().get_handle()) : nullptr), //copy headers #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), @@ -279,7 +277,8 @@ BasicMessageBuilder::BasicMessageBuilder(BasicMessageBuilder&& rhs) : topic_(rhs.topic()), partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) - header_list_(rhs.header_list().release_handle()), //assume ownership + header_list_(rhs.header_list() ? + rhs.header_list().release_handle() : nullptr), //assume header ownership #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), @@ -476,15 +475,16 @@ public: MessageBuilder clone() const { - return std::move(MessageBuilder(topic()). - key(Buffer(key().get_data(), key().get_size())). + MessageBuilder builder(topic()); + builder.key(Buffer(key().get_data(), key().get_size())). #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) headers(header_list()). #endif payload(Buffer(payload().get_data(), payload().get_size())). timestamp(timestamp()). user_data(user_data()). - internal(internal())); + internal(internal()); + return builder; } }; diff --git a/src/producer.cpp b/src/producer.cpp index 815c75b..af138d0 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -70,7 +70,7 @@ void Producer::produce(const MessageBuilder& builder) { } void Producer::produce(MessageBuilder&& builder) { - do_produce(builder, MessageBuilder::HeaderListType(builder.header_list().release_handle())); //move headers + do_produce(builder, std::move(builder.header_list())); //move headers } void Producer::produce(const Message& message) { From fe0c7e7dd5b4890db24779cbf1298854fcaeed91 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Wed, 5 Dec 2018 12:15:25 -0500 Subject: [PATCH 7/9] Fixed end() iterator and also applied default copy-constructor instead of passing null handle in BasicMessageBuilder --- include/cppkafka/header_list.h | 7 ++----- include/cppkafka/header_list_iterator.h | 2 +- include/cppkafka/message_builder.h | 6 +++--- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index b0d3ab4..45115ee 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -279,16 +279,13 @@ bool HeaderList::empty() const { template typename HeaderList::Iterator HeaderList::begin() const { - if (empty()) { - return end(); - } - return Iterator(make_non_owning(handle_.get()), 0); + return empty() ? end() : Iterator(make_non_owning(handle_.get()), 0); } template typename HeaderList::Iterator HeaderList::end() const { - return Iterator(make_non_owning(handle_.get()), size()); + return Iterator(empty() ? HeaderList() : make_non_owning(handle_.get()), size()); } template diff --git a/include/cppkafka/header_list_iterator.h b/include/cppkafka/header_list_iterator.h index 226c3e2..e226b6d 100644 --- a/include/cppkafka/header_list_iterator.h +++ b/include/cppkafka/header_list_iterator.h @@ -154,7 +154,7 @@ private: HeaderIterator(HeaderListType headers, size_t index) : header_list_(std::move(headers)), - header_(index == header_list_.size() ? HeaderType() : header_list_.at(index)), + header_(header_list_.empty() ? HeaderType() : header_list_.at(index)), index_(index) { } diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 981e120..d7dd9b0 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -245,7 +245,7 @@ BasicMessageBuilder::BasicMessageBuilder(const Message& message) key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) header_list_(message.get_header_list() ? - rd_kafka_headers_copy(message.get_header_list().get_handle()) : nullptr), //copy headers + HeaderListType(rd_kafka_headers_copy(message.get_header_list().get_handle())) : HeaderListType()), //copy headers #endif payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : @@ -262,7 +262,7 @@ BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) header_list_(rhs.header_list() ? - rd_kafka_headers_copy(rhs.header_list().get_handle()) : nullptr), //copy headers + HeaderListType(rd_kafka_headers_copy(rhs.header_list().get_handle())) : HeaderListType()), //copy headers #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), @@ -278,7 +278,7 @@ BasicMessageBuilder::BasicMessageBuilder(BasicMessageBuilder&& rhs) partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) header_list_(rhs.header_list() ? - rhs.header_list().release_handle() : nullptr), //assume header ownership + HeaderListType(rhs.header_list().release_handle()) : HeaderListType()), //assume header ownership #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), From 1c80af9b68ceb558ac6ee99aa28511476e532d0d Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Wed, 5 Dec 2018 20:04:10 -0500 Subject: [PATCH 8/9] Added constructor from another HeaderList type --- include/cppkafka/buffer.h | 2 +- include/cppkafka/header_list.h | 33 ++++++++++++++++++++++++++++-- include/cppkafka/message_builder.h | 9 ++++---- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index 504d083..671160d 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -89,7 +89,7 @@ public: * \param last An iterator to the end of data (not included) */ template - Buffer(const Iter first, Iter last) + Buffer(const Iter first, const Iter last) : Buffer(&*first, std::distance(first, last)) { } diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 45115ee..e0b4589 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -51,6 +51,9 @@ namespace cppkafka { template class HeaderList { public: + template + friend class HeaderList; + using BufferType = typename HeaderType::ValueType; using Iterator = HeaderIterator; /** @@ -75,6 +78,16 @@ public: */ explicit HeaderList(rd_kafka_headers_t* handle); + /** + * \brief Create a header list from another header list type + * \param other The other list + */ + template + HeaderList(const HeaderList& other); + + template + HeaderList(HeaderList&& other); + /** * \brief Add a header to the list. This translates to rd_kafka_header_add(). * \param header The header. @@ -219,6 +232,20 @@ HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) assert(handle); } +template +template +HeaderList::HeaderList(const HeaderList& other) +: handle_(other.handle_) { + +} + +template +template +HeaderList::HeaderList(HeaderList&& other) +: handle_(std::move(other.handle_)) { + +} + // Methods template Error HeaderList::add(const HeaderType& header) { @@ -279,13 +306,15 @@ bool HeaderList::empty() const { template typename HeaderList::Iterator HeaderList::begin() const { - return empty() ? end() : Iterator(make_non_owning(handle_.get()), 0); + return empty() ? Iterator(HeaderList(), 0) : + Iterator(make_non_owning(handle_.get()), 0); } template typename HeaderList::Iterator HeaderList::end() const { - return Iterator(empty() ? HeaderList() : make_non_owning(handle_.get()), size()); + return empty() ? Iterator(HeaderList(), size()) : + Iterator(make_non_owning(handle_.get()), size()); } template diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index d7dd9b0..ba2b73e 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -244,6 +244,9 @@ BasicMessageBuilder::BasicMessageBuilder(const Message& message) : topic_(message.get_topic()), key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + //Here we must copy explicitly the Message headers since they are non-owning and this class + //assumes full ownership. Otherwise we will be holding an invalid handle when Message goes + //out of scope and rdkafka frees its resource. header_list_(message.get_header_list() ? HeaderListType(rd_kafka_headers_copy(message.get_header_list().get_handle())) : HeaderListType()), //copy headers #endif @@ -261,8 +264,7 @@ BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& : topic_(rhs.topic()), partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) - header_list_(rhs.header_list() ? - HeaderListType(rd_kafka_headers_copy(rhs.header_list().get_handle())) : HeaderListType()), //copy headers + header_list_(rhs.header_list()), //copy headers #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), @@ -277,8 +279,7 @@ BasicMessageBuilder::BasicMessageBuilder(BasicMessageBuilder&& rhs) : topic_(rhs.topic()), partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) - header_list_(rhs.header_list() ? - HeaderListType(rhs.header_list().release_handle()) : HeaderListType()), //assume header ownership + header_list_(std::move(header_list())), //assume header ownership #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), From 25c2eaa998a8f2d4be89b2c50c9df713eb6ed95e Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 6 Dec 2018 10:37:02 -0500 Subject: [PATCH 9/9] Changed iterator logic to capture header list by reference --- include/cppkafka/header_list.h | 6 ++---- include/cppkafka/header_list_iterator.h | 7 +++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index e0b4589..afb6210 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -306,15 +306,13 @@ bool HeaderList::empty() const { template typename HeaderList::Iterator HeaderList::begin() const { - return empty() ? Iterator(HeaderList(), 0) : - Iterator(make_non_owning(handle_.get()), 0); + return Iterator(*this, 0); } template typename HeaderList::Iterator HeaderList::end() const { - return empty() ? Iterator(HeaderList(), size()) : - Iterator(make_non_owning(handle_.get()), size()); + return Iterator(*this, size()); } template diff --git a/include/cppkafka/header_list_iterator.h b/include/cppkafka/header_list_iterator.h index e226b6d..b063294 100644 --- a/include/cppkafka/header_list_iterator.h +++ b/include/cppkafka/header_list_iterator.h @@ -151,10 +151,9 @@ public: } private: - HeaderIterator(HeaderListType headers, + HeaderIterator(const HeaderListType& headers, size_t index) - : header_list_(std::move(headers)), - header_(header_list_.empty() ? HeaderType() : header_list_.at(index)), + : header_list_(headers), index_(index) { } @@ -169,7 +168,7 @@ private: other.get_value().get_size())); } - HeaderListType header_list_; + const HeaderListType& header_list_; HeaderType header_; size_t index_; };