diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index ee735f5..c1bb519 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -93,7 +93,9 @@ public: static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)"); } - // Don't allow construction from temporary vectors + /** + * Don't allow construction from temporary vectors + */ template Buffer(std::vector&& data) = delete; @@ -108,7 +110,9 @@ public: static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)"); } - // Don't allow construction from temporary arrays + /** + * Don't allow construction from temporary arrays + */ template Buffer(std::array&& data) = delete; @@ -120,9 +124,11 @@ public: */ Buffer(const std::string& data); - // Don't allow construction from temporary strings + /** + * Don't allow construction from temporary strings + */ Buffer(std::string&&) = delete; - + Buffer(const Buffer&) = delete; Buffer(Buffer&&) = default; Buffer& operator=(const Buffer&) = delete; diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 78ec0f0..6f175ff 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -60,9 +60,9 @@ public: * \param rhs The pointer to be copied */ ClonablePtr(const ClonablePtr& rhs) - : handle_(rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : - std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter())), - cloner_(rhs.cloner_) { + : handle_(rhs.get_cloner() ? std::unique_ptr(rhs.clone(), rhs.get_deleter()) : + std::unique_ptr(rhs.get(), rhs.get_deleter())), + cloner_(rhs.get_cloner()) { } @@ -72,11 +72,11 @@ public: * \param rhs The pointer to be copied */ ClonablePtr& operator=(const ClonablePtr& rhs) { - if (this == &rhs) { - return *this; + if (this != &rhs) { + handle_ = rhs.get_cloner() ? std::unique_ptr(rhs.clone(), rhs.get_deleter()) : + std::unique_ptr(rhs.get(), rhs.get_deleter()); + cloner_ = rhs.get_cloner(); } - handle_ = rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : - std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter()); return *this; } @@ -91,6 +91,13 @@ public: return handle_.get(); } + /** + * \brief Clones the internal pointer using the specified cloner function. + */ + T* clone() const { + return cloner_ ? cloner_(handle_.get()) : handle_.get(); + } + /** * \brief Releases ownership of the internal pointer */ @@ -98,6 +105,27 @@ public: return handle_.release(); } + /** + * \brief Reset the internal pointer to a new one + */ + void reset(T* ptr) { + handle_.reset(ptr); + } + + /** + * \brief Get the deleter + */ + const Deleter& get_deleter() const { + return handle_.get_deleter(); + } + + /** + * \brief Get the cloner + */ + const Cloner& get_cloner() const { + return cloner_; + } + /** * \brief Indicates whether this ClonablePtr instance is valid (not null) */ diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 3400b94..384fce5 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -146,6 +146,12 @@ public: */ rd_kafka_headers_t* get_handle() const; + /** + * \brief Clone the underlying header list handle. + * \return The handle. + */ + rd_kafka_headers_t* clone_handle() const; + /** * \brief Get the underlying header list handle and release its ownership. * \return The handle. @@ -159,10 +165,14 @@ public: */ explicit operator bool() const; + /** + * \brief Indicates if this list owns the underlying handle or not. + */ + bool is_owning() const; + private: struct NonOwningTag { }; static void dummy_deleter(rd_kafka_headers_t*) {} - static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast(handle); } using HandlePtr = ClonablePtr; @@ -193,7 +203,7 @@ bool operator!=(const HeaderList& lhs, const HeaderList template HeaderList HeaderList::make_non_owning(rd_kafka_headers_t* handle) { - return HeaderList(handle, NonOwningTag()); + return handle ? HeaderList(handle, NonOwningTag()) : HeaderList(); } template @@ -210,14 +220,16 @@ HeaderList::HeaderList(size_t reserve) template HeaderList::HeaderList(rd_kafka_headers_t* handle) -: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy - +: handle_(handle, + handle ? &rd_kafka_headers_destroy : nullptr, + handle ? &rd_kafka_headers_copy : nullptr) { //if we own the header list, we clone it on copy } template HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) -: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy. - +: handle_(handle, + handle ? &dummy_deleter : nullptr, + nullptr) { //if we don't own the header list, we forward the handle on copy. } // Methods @@ -254,7 +266,8 @@ HeaderType HeaderList::at(size_t index) const { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw Exception(error.to_string()); } - return HeaderType(name, BufferType(value, size)); + //Use 'Buffer' to implicitly convert to 'BufferType' + return HeaderType(name, Buffer(value, size)); } template @@ -269,8 +282,7 @@ HeaderType HeaderList::back() const { template size_t HeaderList::size() const { - assert(handle_); - return rd_kafka_header_cnt(handle_.get()); + return handle_ ? rd_kafka_header_cnt(handle_.get()) : 0; } template @@ -281,7 +293,6 @@ bool HeaderList::empty() const { template typename HeaderList::Iterator HeaderList::begin() const { - assert(handle_); if (empty()) { return end(); } @@ -291,7 +302,6 @@ HeaderList::begin() const { template typename HeaderList::Iterator HeaderList::end() const { - assert(handle_); return Iterator(make_non_owning(handle_.get()), size()); } @@ -300,6 +310,11 @@ rd_kafka_headers_t* HeaderList::get_handle() const { return handle_.get(); } +template +rd_kafka_headers_t* HeaderList::clone_handle() const { + return handle_.clone(); +} + template rd_kafka_headers_t* HeaderList::release_handle() { return handle_.release(); @@ -310,6 +325,11 @@ HeaderList::operator bool() const { return static_cast(handle_); } +template +bool HeaderList::is_owning() const { + return handle_.get_deleter() != &dummy_deleter; +} + } //namespace cppkafka #endif //RD_KAFKA_HEADERS_SUPPORT_VERSION diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index b59b98b..8007aee 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -127,6 +127,27 @@ public: } #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + /** + * \brief Sets the message's header list. + * \note After this call, the Message will take ownership of the header list. + */ + void set_header_list(const HeaderListType& headers) { + assert(handle_); + assert(!headers.is_owning()); + rd_kafka_message_set_headers(handle_.get(), headers.get_handle()); + header_list_ = HeaderListType::make_non_owning(headers.get_handle()); + } + + /** + * \brief Sets the message's header list. + * \note After this call, the Message will take ownership of the header list. + */ + void set_header_list(HeaderListType&& headers) { + assert(handle_); + rd_kafka_message_set_headers(handle_.get(), headers.get_handle()); + header_list_ = HeaderListType::make_non_owning(headers.release_handle()); + } + /** * \brief Gets the message's header list */ diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index df216d9..c889912 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -70,7 +70,12 @@ public: */ template BasicMessageBuilder(const BasicMessageBuilder& rhs); + template + BasicMessageBuilder(BasicMessageBuilder&& rhs); + /** + * Default copy and move constructors and assignment operators + */ BasicMessageBuilder(BasicMessageBuilder&&) = default; BasicMessageBuilder(const BasicMessageBuilder&) = default; BasicMessageBuilder& operator=(BasicMessageBuilder&&) = default; @@ -106,11 +111,13 @@ public: #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) /** - * Add a header to the message + * Add a header(s) to the message * * \param header The header to be used */ Concrete& header(const HeaderType& header); + Concrete& headers(const HeaderListType& headers); + Concrete& headers(HeaderListType&& headers); #endif /** @@ -208,10 +215,12 @@ public: Message::InternalPtr internal() const; Concrete& internal(Message::InternalPtr internal); -private: +protected: void construct_buffer(BufferType& lhs, const BufferType& rhs); - Concrete& get_concrete(); +private: + Concrete& get_concrete(); + std::string topic_; int partition_{-1}; BufferType key_; @@ -234,23 +243,51 @@ template BasicMessageBuilder::BasicMessageBuilder(const Message& message) : topic_(message.get_topic()), key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + header_list_(message.get_header_list() ? + rd_kafka_headers_copy(message.get_header_list().get_handle()) : + nullptr), //copy and assume ownership +#endif payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : std::chrono::milliseconds(0)), user_data_(message.get_user_data()), internal_(message.internal()) { + } template template BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& rhs) -: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()), +: topic_(rhs.topic()), + partition_(rhs.partition()), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + header_list_(rhs.header_list() ? + rd_kafka_headers_copy(rhs.header_list().get_handle()) : + nullptr), //copy headers +#endif + timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), internal_(rhs.internal()) { get_concrete().construct_buffer(key_, rhs.key()); get_concrete().construct_buffer(payload_, rhs.payload()); } +template +template +BasicMessageBuilder::BasicMessageBuilder(BasicMessageBuilder&& rhs) +: topic_(rhs.topic()), + partition_(rhs.partition()), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + header_list_(rhs.header_list().release_handle()), //assume ownership +#endif + timestamp_(rhs.timestamp()), + user_data_(rhs.user_data()), + internal_(rhs.internal()) { + get_concrete().construct_buffer(key_, std::move(rhs.key())); + get_concrete().construct_buffer(payload_, std::move(rhs.payload())); +} + template C& BasicMessageBuilder::topic(std::string value) { topic_ = std::move(value); @@ -284,6 +321,18 @@ C& BasicMessageBuilder::header(const HeaderType& header) { header_list_.add(header); return get_concrete(); } + +template +C& BasicMessageBuilder::headers(const HeaderListType& headers) { + header_list_ = headers; + return get_concrete(); +} + +template +C& BasicMessageBuilder::headers(HeaderListType&& headers) { + header_list_ = std::move(headers); + return get_concrete(); +} #endif template @@ -410,7 +459,7 @@ C& BasicMessageBuilder::get_concrete() { class MessageBuilder : public BasicMessageBuilder { public: using Base = BasicMessageBuilder; - using BasicMessageBuilder::BasicMessageBuilder; + using BasicMessageBuilder::BasicMessageBuilder; #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) using HeaderType = Base::HeaderType; using HeaderListType = Base::HeaderListType; @@ -421,17 +470,21 @@ public: } template - void construct_buffer(Buffer& lhs, const T& rhs) { - lhs = Buffer(rhs); + void construct_buffer(Buffer& lhs, T&& rhs) { + lhs = Buffer(std::forward(rhs)); } + MessageBuilder clone() const { return std::move(MessageBuilder(topic()). - key(Buffer(key().get_data(), key().get_size())). - payload(Buffer(payload().get_data(), payload().get_size())). - timestamp(timestamp()). - user_data(user_data()). - internal(internal())); + key(Buffer(key().get_data(), key().get_size())). +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + headers(header_list()). +#endif + payload(Buffer(payload().get_data(), payload().get_size())). + timestamp(timestamp()). + user_data(user_data()). + internal(internal())); } }; @@ -441,7 +494,12 @@ public: template class ConcreteMessageBuilder : public BasicMessageBuilder> { public: + using Base = BasicMessageBuilder>; using BasicMessageBuilder>::BasicMessageBuilder; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + using HeaderType = typename Base::HeaderType; + using HeaderListType = typename Base::HeaderListType; +#endif }; } // cppkafka