diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index ee735f5..671160d 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -81,6 +81,17 @@ public: throw Exception("Invalid buffer configuration"); } } + + /** + * Constructs a buffer from two iterators in the range [first,last) + * + * \param first An iterator to the start of data + * \param last An iterator to the end of data (not included) + */ + template + Buffer(const Iter first, const Iter last) + : Buffer(&*first, std::distance(first, last)) { + } /** * Constructs a buffer from a vector @@ -93,7 +104,9 @@ public: static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)"); } - // Don't allow construction from temporary vectors + /** + * Don't allow construction from temporary vectors + */ template Buffer(std::vector&& data) = delete; @@ -108,7 +121,9 @@ public: static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)"); } - // Don't allow construction from temporary arrays + /** + * Don't allow construction from temporary arrays + */ template Buffer(std::array&& data) = delete; @@ -120,9 +135,11 @@ public: */ Buffer(const std::string& data); - // Don't allow construction from temporary strings + /** + * Don't allow construction from temporary strings + */ Buffer(std::string&&) = delete; - + Buffer(const Buffer&) = delete; Buffer(Buffer&&) = default; Buffer& operator=(const Buffer&) = delete; diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 78ec0f0..13d7b20 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -60,9 +60,8 @@ public: * \param rhs The pointer to be copied */ ClonablePtr(const ClonablePtr& rhs) - : handle_(rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : - std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter())), - cloner_(rhs.cloner_) { + : handle_(std::unique_ptr(rhs.try_clone(), rhs.get_deleter())), + cloner_(rhs.get_cloner()) { } @@ -72,11 +71,10 @@ public: * \param rhs The pointer to be copied */ ClonablePtr& operator=(const ClonablePtr& rhs) { - if (this == &rhs) { - return *this; + if (this != &rhs) { + handle_ = std::unique_ptr(rhs.try_clone(), rhs.get_deleter()); + cloner_ = rhs.get_cloner(); } - handle_ = rhs.cloner_ ? std::unique_ptr(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) : - std::unique_ptr(rhs.handle_.get(), rhs.handle_.get_deleter()); return *this; } @@ -98,6 +96,27 @@ public: return handle_.release(); } + /** + * \brief Reset the internal pointer to a new one + */ + void reset(T* ptr) { + handle_.reset(ptr); + } + + /** + * \brief Get the deleter + */ + const Deleter& get_deleter() const { + return handle_.get_deleter(); + } + + /** + * \brief Get the cloner + */ + const Cloner& get_cloner() const { + return cloner_; + } + /** * \brief Indicates whether this ClonablePtr instance is valid (not null) */ @@ -105,6 +124,10 @@ public: return static_cast(handle_); } private: + T* try_clone() const { + return cloner_ ? cloner_(get()) : get(); + } + std::unique_ptr handle_; Cloner cloner_; }; diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 3400b94..afb6210 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -51,6 +51,9 @@ namespace cppkafka { template class HeaderList { public: + template + friend class HeaderList; + using BufferType = typename HeaderType::ValueType; using Iterator = HeaderIterator; /** @@ -75,6 +78,16 @@ public: */ explicit HeaderList(rd_kafka_headers_t* handle); + /** + * \brief Create a header list from another header list type + * \param other The other list + */ + template + HeaderList(const HeaderList& other); + + template + HeaderList(HeaderList&& other); + /** * \brief Add a header to the list. This translates to rd_kafka_header_add(). * \param header The header. @@ -162,7 +175,6 @@ public: private: struct NonOwningTag { }; static void dummy_deleter(rd_kafka_headers_t*) {} - static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast(handle); } using HandlePtr = ClonablePtr; @@ -205,18 +217,32 @@ HeaderList::HeaderList() template HeaderList::HeaderList(size_t reserve) : handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { - + assert(reserve); } template HeaderList::HeaderList(rd_kafka_headers_t* handle) : handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy - + assert(handle); } template HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) -: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy. +: handle_(handle, &dummy_deleter, nullptr) { //if we don't own the header list, we forward the handle on copy. + assert(handle); +} + +template +template +HeaderList::HeaderList(const HeaderList& other) +: handle_(other.handle_) { + +} + +template +template +HeaderList::HeaderList(HeaderList&& other) +: handle_(std::move(other.handle_)) { } @@ -254,7 +280,7 @@ HeaderType HeaderList::at(size_t index) const { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw Exception(error.to_string()); } - return HeaderType(name, BufferType(value, size)); + return HeaderType(name, BufferType(value, value + size)); } template @@ -269,8 +295,7 @@ HeaderType HeaderList::back() const { template size_t HeaderList::size() const { - assert(handle_); - return rd_kafka_header_cnt(handle_.get()); + return handle_ ? rd_kafka_header_cnt(handle_.get()) : 0; } template @@ -281,18 +306,13 @@ bool HeaderList::empty() const { template typename HeaderList::Iterator HeaderList::begin() const { - assert(handle_); - if (empty()) { - return end(); - } - return Iterator(make_non_owning(handle_.get()), 0); + return Iterator(*this, 0); } template typename HeaderList::Iterator HeaderList::end() const { - assert(handle_); - return Iterator(make_non_owning(handle_.get()), size()); + return Iterator(*this, size()); } template diff --git a/include/cppkafka/header_list_iterator.h b/include/cppkafka/header_list_iterator.h index 226c3e2..b063294 100644 --- a/include/cppkafka/header_list_iterator.h +++ b/include/cppkafka/header_list_iterator.h @@ -151,10 +151,9 @@ public: } private: - HeaderIterator(HeaderListType headers, + HeaderIterator(const HeaderListType& headers, size_t index) - : header_list_(std::move(headers)), - header_(index == header_list_.size() ? HeaderType() : header_list_.at(index)), + : header_list_(headers), index_(index) { } @@ -169,7 +168,7 @@ private: other.get_value().get_size())); } - HeaderListType header_list_; + const HeaderListType& header_list_; HeaderType header_; size_t index_; }; diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index b59b98b..f8101a0 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -127,6 +127,20 @@ public: } #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + /** + * \brief Sets the message's header list. + * \note This calls rd_kafka_message_set_headers. + */ + void set_header_list(const HeaderListType& headers) { + assert(handle_); + if (!headers) { + return; //nothing to set + } + rd_kafka_headers_t* handle_copy = rd_kafka_headers_copy(headers.get_handle()); + rd_kafka_message_set_headers(handle_.get(), handle_copy); + header_list_ = HeaderListType::make_non_owning(handle_copy); + } + /** * \brief Gets the message's header list */ diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index df216d9..ba2b73e 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -70,7 +70,12 @@ public: */ template BasicMessageBuilder(const BasicMessageBuilder& rhs); + template + BasicMessageBuilder(BasicMessageBuilder&& rhs); + /** + * Default copy and move constructors and assignment operators + */ BasicMessageBuilder(BasicMessageBuilder&&) = default; BasicMessageBuilder(const BasicMessageBuilder&) = default; BasicMessageBuilder& operator=(BasicMessageBuilder&&) = default; @@ -106,11 +111,13 @@ public: #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) /** - * Add a header to the message + * Add a header(s) to the message * * \param header The header to be used */ Concrete& header(const HeaderType& header); + Concrete& headers(const HeaderListType& headers); + Concrete& headers(HeaderListType&& headers); #endif /** @@ -208,10 +215,12 @@ public: Message::InternalPtr internal() const; Concrete& internal(Message::InternalPtr internal); -private: +protected: void construct_buffer(BufferType& lhs, const BufferType& rhs); - Concrete& get_concrete(); +private: + Concrete& get_concrete(); + std::string topic_; int partition_{-1}; BufferType key_; @@ -234,23 +243,51 @@ template BasicMessageBuilder::BasicMessageBuilder(const Message& message) : topic_(message.get_topic()), key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + //Here we must copy explicitly the Message headers since they are non-owning and this class + //assumes full ownership. Otherwise we will be holding an invalid handle when Message goes + //out of scope and rdkafka frees its resource. + header_list_(message.get_header_list() ? + HeaderListType(rd_kafka_headers_copy(message.get_header_list().get_handle())) : HeaderListType()), //copy headers +#endif payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : std::chrono::milliseconds(0)), user_data_(message.get_user_data()), internal_(message.internal()) { + } template template BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& rhs) -: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()), +: topic_(rhs.topic()), + partition_(rhs.partition()), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + header_list_(rhs.header_list()), //copy headers +#endif + timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), internal_(rhs.internal()) { get_concrete().construct_buffer(key_, rhs.key()); get_concrete().construct_buffer(payload_, rhs.payload()); } +template +template +BasicMessageBuilder::BasicMessageBuilder(BasicMessageBuilder&& rhs) +: topic_(rhs.topic()), + partition_(rhs.partition()), +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + header_list_(std::move(header_list())), //assume header ownership +#endif + timestamp_(rhs.timestamp()), + user_data_(rhs.user_data()), + internal_(rhs.internal()) { + get_concrete().construct_buffer(key_, std::move(rhs.key())); + get_concrete().construct_buffer(payload_, std::move(rhs.payload())); +} + template C& BasicMessageBuilder::topic(std::string value) { topic_ = std::move(value); @@ -284,6 +321,18 @@ C& BasicMessageBuilder::header(const HeaderType& header) { header_list_.add(header); return get_concrete(); } + +template +C& BasicMessageBuilder::headers(const HeaderListType& headers) { + header_list_ = headers; + return get_concrete(); +} + +template +C& BasicMessageBuilder::headers(HeaderListType&& headers) { + header_list_ = std::move(headers); + return get_concrete(); +} #endif template @@ -410,7 +459,7 @@ C& BasicMessageBuilder::get_concrete() { class MessageBuilder : public BasicMessageBuilder { public: using Base = BasicMessageBuilder; - using BasicMessageBuilder::BasicMessageBuilder; + using BasicMessageBuilder::BasicMessageBuilder; #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) using HeaderType = Base::HeaderType; using HeaderListType = Base::HeaderListType; @@ -421,17 +470,22 @@ public: } template - void construct_buffer(Buffer& lhs, const T& rhs) { - lhs = Buffer(rhs); + void construct_buffer(Buffer& lhs, T&& rhs) { + lhs = Buffer(std::forward(rhs)); } + MessageBuilder clone() const { - return std::move(MessageBuilder(topic()). - key(Buffer(key().get_data(), key().get_size())). - payload(Buffer(payload().get_data(), payload().get_size())). - timestamp(timestamp()). - user_data(user_data()). - internal(internal())); + MessageBuilder builder(topic()); + builder.key(Buffer(key().get_data(), key().get_size())). +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + headers(header_list()). +#endif + payload(Buffer(payload().get_data(), payload().get_size())). + timestamp(timestamp()). + user_data(user_data()). + internal(internal()); + return builder; } }; @@ -441,7 +495,12 @@ public: template class ConcreteMessageBuilder : public BasicMessageBuilder> { public: + using Base = BasicMessageBuilder>; using BasicMessageBuilder>::BasicMessageBuilder; +#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) + using HeaderType = typename Base::HeaderType; + using HeaderListType = typename Base::HeaderListType; +#endif }; } // cppkafka diff --git a/src/producer.cpp b/src/producer.cpp index 815c75b..af138d0 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -70,7 +70,7 @@ void Producer::produce(const MessageBuilder& builder) { } void Producer::produce(MessageBuilder&& builder) { - do_produce(builder, MessageBuilder::HeaderListType(builder.header_list().release_handle())); //move headers + do_produce(builder, std::move(builder.header_list())); //move headers } void Producer::produce(const Message& message) { diff --git a/tests/buffer_test.cpp b/tests/buffer_test.cpp index 6659d3b..126f524 100644 --- a/tests/buffer_test.cpp +++ b/tests/buffer_test.cpp @@ -39,7 +39,11 @@ TEST_CASE("construction", "[buffer]") { const string str_data = "Hello world!"; const vector data(str_data.begin(), str_data.end()); const Buffer buffer(data); + const Buffer buffer2(data.begin(), data.end()); + const Buffer buffer3(str_data.data(), str_data.data() + str_data.size()); CHECK(str_data == buffer); + CHECK(buffer == buffer2); + CHECK(buffer == buffer3); }