From 1c80af9b68ceb558ac6ee99aa28511476e532d0d Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Wed, 5 Dec 2018 20:04:10 -0500 Subject: [PATCH] Added constructor from another HeaderList type --- include/cppkafka/buffer.h | 2 +- include/cppkafka/header_list.h | 33 ++++++++++++++++++++++++++++-- include/cppkafka/message_builder.h | 9 ++++---- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index 504d083..671160d 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -89,7 +89,7 @@ public: * \param last An iterator to the end of data (not included) */ template - Buffer(const Iter first, Iter last) + Buffer(const Iter first, const Iter last) : Buffer(&*first, std::distance(first, last)) { } diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 45115ee..e0b4589 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -51,6 +51,9 @@ namespace cppkafka { template class HeaderList { public: + template + friend class HeaderList; + using BufferType = typename HeaderType::ValueType; using Iterator = HeaderIterator; /** @@ -75,6 +78,16 @@ public: */ explicit HeaderList(rd_kafka_headers_t* handle); + /** + * \brief Create a header list from another header list type + * \param other The other list + */ + template + HeaderList(const HeaderList& other); + + template + HeaderList(HeaderList&& other); + /** * \brief Add a header to the list. This translates to rd_kafka_header_add(). * \param header The header. @@ -219,6 +232,20 @@ HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) assert(handle); } +template +template +HeaderList::HeaderList(const HeaderList& other) +: handle_(other.handle_) { + +} + +template +template +HeaderList::HeaderList(HeaderList&& other) +: handle_(std::move(other.handle_)) { + +} + // Methods template Error HeaderList::add(const HeaderType& header) { @@ -279,13 +306,15 @@ bool HeaderList::empty() const { template typename HeaderList::Iterator HeaderList::begin() const { - return empty() ? end() : Iterator(make_non_owning(handle_.get()), 0); + return empty() ? Iterator(HeaderList(), 0) : + Iterator(make_non_owning(handle_.get()), 0); } template typename HeaderList::Iterator HeaderList::end() const { - return Iterator(empty() ? HeaderList() : make_non_owning(handle_.get()), size()); + return empty() ? Iterator(HeaderList(), size()) : + Iterator(make_non_owning(handle_.get()), size()); } template diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index d7dd9b0..ba2b73e 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -244,6 +244,9 @@ BasicMessageBuilder::BasicMessageBuilder(const Message& message) : topic_(message.get_topic()), key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + //Here we must copy explicitly the Message headers since they are non-owning and this class + //assumes full ownership. Otherwise we will be holding an invalid handle when Message goes + //out of scope and rdkafka frees its resource. header_list_(message.get_header_list() ? HeaderListType(rd_kafka_headers_copy(message.get_header_list().get_handle())) : HeaderListType()), //copy headers #endif @@ -261,8 +264,7 @@ BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& : topic_(rhs.topic()), partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) - header_list_(rhs.header_list() ? - HeaderListType(rd_kafka_headers_copy(rhs.header_list().get_handle())) : HeaderListType()), //copy headers + header_list_(rhs.header_list()), //copy headers #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), @@ -277,8 +279,7 @@ BasicMessageBuilder::BasicMessageBuilder(BasicMessageBuilder&& rhs) : topic_(rhs.topic()), partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) - header_list_(rhs.header_list() ? - HeaderListType(rhs.header_list().release_handle()) : HeaderListType()), //assume header ownership + header_list_(std::move(header_list())), //assume header ownership #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()),