mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-11-04 04:27:48 +00:00 
			
		
		
		
	Merge pull request #144 from accelerated/header_fix
Header fixes and header copy considerations
This commit is contained in:
		@@ -81,6 +81,17 @@ public:
 | 
			
		||||
            throw Exception("Invalid buffer configuration");
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * Constructs a buffer from two iterators in the range [first,last)
 | 
			
		||||
     *
 | 
			
		||||
     * \param first An iterator to the start of data
 | 
			
		||||
     * \param last An iterator to the end of data (not included)
 | 
			
		||||
     */
 | 
			
		||||
    template <typename Iter>
 | 
			
		||||
    Buffer(const Iter first, const Iter last)
 | 
			
		||||
    : Buffer(&*first, std::distance(first, last)) {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Constructs a buffer from a vector
 | 
			
		||||
@@ -93,7 +104,9 @@ public:
 | 
			
		||||
        static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Don't allow construction from temporary vectors
 | 
			
		||||
    /**
 | 
			
		||||
     * Don't allow construction from temporary vectors
 | 
			
		||||
     */
 | 
			
		||||
    template <typename T>
 | 
			
		||||
    Buffer(std::vector<T>&& data) = delete;
 | 
			
		||||
 | 
			
		||||
@@ -108,7 +121,9 @@ public:
 | 
			
		||||
        static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Don't allow construction from temporary arrays
 | 
			
		||||
    /**
 | 
			
		||||
     * Don't allow construction from temporary arrays
 | 
			
		||||
     */
 | 
			
		||||
    template <typename T, size_t N>
 | 
			
		||||
    Buffer(std::array<T, N>&& data) = delete;
 | 
			
		||||
 | 
			
		||||
@@ -120,9 +135,11 @@ public:
 | 
			
		||||
     */
 | 
			
		||||
    Buffer(const std::string& data); 
 | 
			
		||||
 | 
			
		||||
    // Don't allow construction from temporary strings
 | 
			
		||||
    /**
 | 
			
		||||
     * Don't allow construction from temporary strings
 | 
			
		||||
     */
 | 
			
		||||
    Buffer(std::string&&) = delete;
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
    Buffer(const Buffer&) = delete;
 | 
			
		||||
    Buffer(Buffer&&) = default;
 | 
			
		||||
    Buffer& operator=(const Buffer&) = delete;
 | 
			
		||||
 
 | 
			
		||||
@@ -60,9 +60,8 @@ public:
 | 
			
		||||
     * \param rhs The pointer to be copied
 | 
			
		||||
     */
 | 
			
		||||
    ClonablePtr(const ClonablePtr& rhs)
 | 
			
		||||
    : handle_(rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
 | 
			
		||||
                            std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter())),
 | 
			
		||||
      cloner_(rhs.cloner_) {
 | 
			
		||||
    : handle_(std::unique_ptr<T, Deleter>(rhs.try_clone(), rhs.get_deleter())),
 | 
			
		||||
      cloner_(rhs.get_cloner()) {
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -72,11 +71,10 @@ public:
 | 
			
		||||
     * \param rhs The pointer to be copied
 | 
			
		||||
     */
 | 
			
		||||
    ClonablePtr& operator=(const ClonablePtr& rhs) {
 | 
			
		||||
        if (this == &rhs) {
 | 
			
		||||
            return *this;
 | 
			
		||||
        if (this != &rhs) {
 | 
			
		||||
            handle_ = std::unique_ptr<T, Deleter>(rhs.try_clone(), rhs.get_deleter());
 | 
			
		||||
            cloner_ = rhs.get_cloner();
 | 
			
		||||
        }
 | 
			
		||||
        handle_ = rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
 | 
			
		||||
                                std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter());
 | 
			
		||||
        return *this;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -98,6 +96,27 @@ public:
 | 
			
		||||
        return handle_.release();
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * \brief Reset the internal pointer to a new one
 | 
			
		||||
     */
 | 
			
		||||
    void reset(T* ptr) {
 | 
			
		||||
        handle_.reset(ptr);
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * \brief Get the deleter
 | 
			
		||||
     */
 | 
			
		||||
    const Deleter& get_deleter() const {
 | 
			
		||||
        return handle_.get_deleter();
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * \brief Get the cloner
 | 
			
		||||
     */
 | 
			
		||||
    const Cloner& get_cloner() const {
 | 
			
		||||
        return cloner_;
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * \brief Indicates whether this ClonablePtr instance is valid (not null)
 | 
			
		||||
     */
 | 
			
		||||
@@ -105,6 +124,10 @@ public:
 | 
			
		||||
        return static_cast<bool>(handle_);
 | 
			
		||||
    }
 | 
			
		||||
private:
 | 
			
		||||
    T* try_clone() const {
 | 
			
		||||
        return cloner_ ? cloner_(get()) : get();
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    std::unique_ptr<T, Deleter> handle_;
 | 
			
		||||
    Cloner cloner_;
 | 
			
		||||
};
 | 
			
		||||
 
 | 
			
		||||
@@ -51,6 +51,9 @@ namespace cppkafka {
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
class HeaderList {
 | 
			
		||||
public:
 | 
			
		||||
    template <typename OtherHeaderType>
 | 
			
		||||
    friend class HeaderList;
 | 
			
		||||
    
 | 
			
		||||
    using BufferType = typename HeaderType::ValueType;
 | 
			
		||||
    using Iterator = HeaderIterator<HeaderType>;
 | 
			
		||||
    /**
 | 
			
		||||
@@ -75,6 +78,16 @@ public:
 | 
			
		||||
     */
 | 
			
		||||
    explicit HeaderList(rd_kafka_headers_t* handle);
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * \brief Create a header list from another header list type
 | 
			
		||||
     * \param other The other list
 | 
			
		||||
     */
 | 
			
		||||
    template <typename OtherHeaderType>
 | 
			
		||||
    HeaderList(const HeaderList<OtherHeaderType>& other);
 | 
			
		||||
 | 
			
		||||
    template <typename OtherHeaderType>
 | 
			
		||||
    HeaderList(HeaderList<OtherHeaderType>&& other);
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * \brief Add a header to the list. This translates to rd_kafka_header_add().
 | 
			
		||||
     * \param header The header.
 | 
			
		||||
@@ -162,7 +175,6 @@ public:
 | 
			
		||||
private:
 | 
			
		||||
    struct NonOwningTag { };
 | 
			
		||||
    static void dummy_deleter(rd_kafka_headers_t*) {}
 | 
			
		||||
    static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast<rd_kafka_headers_t*>(handle); }
 | 
			
		||||
    
 | 
			
		||||
    using HandlePtr = ClonablePtr<rd_kafka_headers_t, decltype(&rd_kafka_headers_destroy),
 | 
			
		||||
                                  decltype(&rd_kafka_headers_copy)>;
 | 
			
		||||
@@ -205,18 +217,32 @@ HeaderList<HeaderType>::HeaderList()
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
HeaderList<HeaderType>::HeaderList(size_t reserve)
 | 
			
		||||
: handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) {
 | 
			
		||||
 | 
			
		||||
    assert(reserve);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle)
 | 
			
		||||
: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy
 | 
			
		||||
 | 
			
		||||
    assert(handle);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle, NonOwningTag)
 | 
			
		||||
: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy.
 | 
			
		||||
: handle_(handle, &dummy_deleter, nullptr) { //if we don't own the header list, we forward the handle on copy.
 | 
			
		||||
    assert(handle);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
template <typename OtherHeaderType>
 | 
			
		||||
HeaderList<HeaderType>::HeaderList(const HeaderList<OtherHeaderType>& other)
 | 
			
		||||
: handle_(other.handle_) {
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
template <typename OtherHeaderType>
 | 
			
		||||
HeaderList<HeaderType>::HeaderList(HeaderList<OtherHeaderType>&& other)
 | 
			
		||||
: handle_(std::move(other.handle_)) {
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -254,7 +280,7 @@ HeaderType HeaderList<HeaderType>::at(size_t index) const {
 | 
			
		||||
    if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
 | 
			
		||||
        throw Exception(error.to_string());
 | 
			
		||||
    }
 | 
			
		||||
    return HeaderType(name, BufferType(value, size));
 | 
			
		||||
    return HeaderType(name, BufferType(value, value + size));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
@@ -269,8 +295,7 @@ HeaderType HeaderList<HeaderType>::back() const {
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
size_t HeaderList<HeaderType>::size() const {
 | 
			
		||||
    assert(handle_);
 | 
			
		||||
    return rd_kafka_header_cnt(handle_.get());
 | 
			
		||||
    return handle_ ? rd_kafka_header_cnt(handle_.get()) : 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
@@ -281,18 +306,13 @@ bool HeaderList<HeaderType>::empty() const {
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
typename HeaderList<HeaderType>::Iterator
 | 
			
		||||
HeaderList<HeaderType>::begin() const {
 | 
			
		||||
    assert(handle_);
 | 
			
		||||
    if (empty()) {
 | 
			
		||||
        return end();
 | 
			
		||||
    }
 | 
			
		||||
    return Iterator(make_non_owning(handle_.get()), 0);
 | 
			
		||||
    return Iterator(*this, 0);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
typename HeaderList<HeaderType>::Iterator
 | 
			
		||||
HeaderList<HeaderType>::end() const {
 | 
			
		||||
    assert(handle_);
 | 
			
		||||
    return Iterator(make_non_owning(handle_.get()), size());
 | 
			
		||||
    return Iterator(*this, size());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename HeaderType>
 | 
			
		||||
 
 | 
			
		||||
@@ -151,10 +151,9 @@ public:
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
private:
 | 
			
		||||
    HeaderIterator(HeaderListType headers,
 | 
			
		||||
    HeaderIterator(const HeaderListType& headers,
 | 
			
		||||
                   size_t index)
 | 
			
		||||
    : header_list_(std::move(headers)),
 | 
			
		||||
      header_(index == header_list_.size() ? HeaderType() : header_list_.at(index)),
 | 
			
		||||
    : header_list_(headers),
 | 
			
		||||
      index_(index) {
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
@@ -169,7 +168,7 @@ private:
 | 
			
		||||
                                     other.get_value().get_size()));
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    HeaderListType header_list_;
 | 
			
		||||
    const HeaderListType& header_list_;
 | 
			
		||||
    HeaderType header_;
 | 
			
		||||
    size_t index_;
 | 
			
		||||
};
 | 
			
		||||
 
 | 
			
		||||
@@ -127,6 +127,20 @@ public:
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
 | 
			
		||||
    /**
 | 
			
		||||
     * \brief Sets the message's header list.
 | 
			
		||||
     * \note This calls rd_kafka_message_set_headers.
 | 
			
		||||
     */
 | 
			
		||||
    void set_header_list(const HeaderListType& headers) {
 | 
			
		||||
        assert(handle_);
 | 
			
		||||
        if (!headers) {
 | 
			
		||||
            return; //nothing to set
 | 
			
		||||
        }
 | 
			
		||||
        rd_kafka_headers_t* handle_copy = rd_kafka_headers_copy(headers.get_handle());
 | 
			
		||||
        rd_kafka_message_set_headers(handle_.get(), handle_copy);
 | 
			
		||||
        header_list_ = HeaderListType::make_non_owning(handle_copy);
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
     * \brief Gets the message's header list
 | 
			
		||||
     */
 | 
			
		||||
 
 | 
			
		||||
@@ -70,7 +70,12 @@ public:
 | 
			
		||||
     */
 | 
			
		||||
    template <typename OtherBufferType, typename OtherConcrete>
 | 
			
		||||
    BasicMessageBuilder(const BasicMessageBuilder<OtherBufferType, OtherConcrete>& rhs);
 | 
			
		||||
    template <typename OtherBufferType, typename OtherConcrete>
 | 
			
		||||
    BasicMessageBuilder(BasicMessageBuilder<OtherBufferType, OtherConcrete>&& rhs);
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Default copy and move constructors and assignment operators
 | 
			
		||||
     */
 | 
			
		||||
    BasicMessageBuilder(BasicMessageBuilder&&) = default;
 | 
			
		||||
    BasicMessageBuilder(const BasicMessageBuilder&) = default;
 | 
			
		||||
    BasicMessageBuilder& operator=(BasicMessageBuilder&&) = default;
 | 
			
		||||
@@ -106,11 +111,13 @@ public:
 | 
			
		||||
 | 
			
		||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
 | 
			
		||||
    /**
 | 
			
		||||
     *  Add a header to the message
 | 
			
		||||
     *  Add a header(s) to the message
 | 
			
		||||
     *
 | 
			
		||||
     * \param header The header to be used
 | 
			
		||||
     */
 | 
			
		||||
    Concrete& header(const HeaderType& header);
 | 
			
		||||
    Concrete& headers(const HeaderListType& headers);
 | 
			
		||||
    Concrete& headers(HeaderListType&& headers);
 | 
			
		||||
#endif
 | 
			
		||||
    
 | 
			
		||||
    /**
 | 
			
		||||
@@ -208,10 +215,12 @@ public:
 | 
			
		||||
    Message::InternalPtr internal() const;
 | 
			
		||||
    Concrete& internal(Message::InternalPtr internal);
 | 
			
		||||
    
 | 
			
		||||
private:
 | 
			
		||||
protected:
 | 
			
		||||
    void construct_buffer(BufferType& lhs, const BufferType& rhs);
 | 
			
		||||
    Concrete& get_concrete();
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    Concrete& get_concrete();
 | 
			
		||||
    
 | 
			
		||||
    std::string topic_;
 | 
			
		||||
    int partition_{-1};
 | 
			
		||||
    BufferType key_;
 | 
			
		||||
@@ -234,23 +243,51 @@ template <typename T, typename C>
 | 
			
		||||
BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
 | 
			
		||||
: topic_(message.get_topic()),
 | 
			
		||||
  key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
 | 
			
		||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
 | 
			
		||||
  //Here we must copy explicitly the Message headers since they are non-owning and this class
 | 
			
		||||
  //assumes full ownership. Otherwise we will be holding an invalid handle when Message goes
 | 
			
		||||
  //out of scope and rdkafka frees its resource.
 | 
			
		||||
  header_list_(message.get_header_list() ?
 | 
			
		||||
               HeaderListType(rd_kafka_headers_copy(message.get_header_list().get_handle())) : HeaderListType()), //copy headers
 | 
			
		||||
#endif
 | 
			
		||||
  payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
 | 
			
		||||
  timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
 | 
			
		||||
                                       std::chrono::milliseconds(0)),
 | 
			
		||||
  user_data_(message.get_user_data()),
 | 
			
		||||
  internal_(message.internal()) {
 | 
			
		||||
  
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T, typename C>
 | 
			
		||||
template <typename U, typename V>
 | 
			
		||||
BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)
 | 
			
		||||
: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()),
 | 
			
		||||
: topic_(rhs.topic()),
 | 
			
		||||
  partition_(rhs.partition()),
 | 
			
		||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
 | 
			
		||||
  header_list_(rhs.header_list()), //copy headers
 | 
			
		||||
#endif
 | 
			
		||||
  timestamp_(rhs.timestamp()),
 | 
			
		||||
  user_data_(rhs.user_data()),
 | 
			
		||||
  internal_(rhs.internal()) {
 | 
			
		||||
    get_concrete().construct_buffer(key_, rhs.key());
 | 
			
		||||
    get_concrete().construct_buffer(payload_, rhs.payload());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T, typename C>
 | 
			
		||||
template <typename U, typename V>
 | 
			
		||||
BasicMessageBuilder<T, C>::BasicMessageBuilder(BasicMessageBuilder<U, V>&& rhs)
 | 
			
		||||
: topic_(rhs.topic()),
 | 
			
		||||
  partition_(rhs.partition()),
 | 
			
		||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
 | 
			
		||||
  header_list_(std::move(header_list())), //assume header ownership
 | 
			
		||||
#endif
 | 
			
		||||
  timestamp_(rhs.timestamp()),
 | 
			
		||||
  user_data_(rhs.user_data()),
 | 
			
		||||
  internal_(rhs.internal()) {
 | 
			
		||||
    get_concrete().construct_buffer(key_, std::move(rhs.key()));
 | 
			
		||||
    get_concrete().construct_buffer(payload_, std::move(rhs.payload()));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T, typename C>
 | 
			
		||||
C& BasicMessageBuilder<T, C>::topic(std::string value) {
 | 
			
		||||
    topic_ = std::move(value);
 | 
			
		||||
@@ -284,6 +321,18 @@ C& BasicMessageBuilder<T, C>::header(const HeaderType& header) {
 | 
			
		||||
    header_list_.add(header);
 | 
			
		||||
    return get_concrete();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T, typename C>
 | 
			
		||||
C& BasicMessageBuilder<T, C>::headers(const HeaderListType& headers) {
 | 
			
		||||
    header_list_ = headers;
 | 
			
		||||
    return get_concrete();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
template <typename T, typename C>
 | 
			
		||||
C& BasicMessageBuilder<T, C>::headers(HeaderListType&& headers) {
 | 
			
		||||
    header_list_ = std::move(headers);
 | 
			
		||||
    return get_concrete();
 | 
			
		||||
}
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
template <typename T, typename C>
 | 
			
		||||
@@ -410,7 +459,7 @@ C& BasicMessageBuilder<T, C>::get_concrete() {
 | 
			
		||||
class MessageBuilder : public BasicMessageBuilder<Buffer, MessageBuilder> {
 | 
			
		||||
public:
 | 
			
		||||
    using Base = BasicMessageBuilder<Buffer, MessageBuilder>;
 | 
			
		||||
    using BasicMessageBuilder::BasicMessageBuilder;
 | 
			
		||||
    using BasicMessageBuilder<Buffer, MessageBuilder>::BasicMessageBuilder;
 | 
			
		||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
 | 
			
		||||
    using HeaderType = Base::HeaderType;
 | 
			
		||||
    using HeaderListType = Base::HeaderListType;
 | 
			
		||||
@@ -421,17 +470,22 @@ public:
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    template <typename T>
 | 
			
		||||
    void construct_buffer(Buffer& lhs, const T& rhs) {
 | 
			
		||||
        lhs = Buffer(rhs);
 | 
			
		||||
    void construct_buffer(Buffer& lhs, T&& rhs) {
 | 
			
		||||
        lhs = Buffer(std::forward<T>(rhs));
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    MessageBuilder clone() const {
 | 
			
		||||
        return std::move(MessageBuilder(topic()).
 | 
			
		||||
                             key(Buffer(key().get_data(), key().get_size())).
 | 
			
		||||
                             payload(Buffer(payload().get_data(), payload().get_size())).
 | 
			
		||||
                             timestamp(timestamp()).
 | 
			
		||||
                             user_data(user_data()).
 | 
			
		||||
                             internal(internal()));
 | 
			
		||||
        MessageBuilder builder(topic());
 | 
			
		||||
        builder.key(Buffer(key().get_data(), key().get_size())).
 | 
			
		||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
 | 
			
		||||
            headers(header_list()).
 | 
			
		||||
#endif
 | 
			
		||||
            payload(Buffer(payload().get_data(), payload().get_size())).
 | 
			
		||||
            timestamp(timestamp()).
 | 
			
		||||
            user_data(user_data()).
 | 
			
		||||
            internal(internal());
 | 
			
		||||
        return builder;
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@@ -441,7 +495,12 @@ public:
 | 
			
		||||
template <typename T>
 | 
			
		||||
class ConcreteMessageBuilder : public BasicMessageBuilder<T, ConcreteMessageBuilder<T>> {
 | 
			
		||||
public:
 | 
			
		||||
    using Base = BasicMessageBuilder<T, ConcreteMessageBuilder<T>>;
 | 
			
		||||
    using BasicMessageBuilder<T, ConcreteMessageBuilder<T>>::BasicMessageBuilder;
 | 
			
		||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
 | 
			
		||||
    using HeaderType = typename Base::HeaderType;
 | 
			
		||||
    using HeaderListType = typename Base::HeaderListType;
 | 
			
		||||
#endif
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
} // cppkafka
 | 
			
		||||
 
 | 
			
		||||
@@ -70,7 +70,7 @@ void Producer::produce(const MessageBuilder& builder) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void Producer::produce(MessageBuilder&& builder) {
 | 
			
		||||
    do_produce(builder, MessageBuilder::HeaderListType(builder.header_list().release_handle())); //move headers
 | 
			
		||||
    do_produce(builder, std::move(builder.header_list())); //move headers
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void Producer::produce(const Message& message) {
 | 
			
		||||
 
 | 
			
		||||
@@ -39,7 +39,11 @@ TEST_CASE("construction", "[buffer]") {
 | 
			
		||||
    const string str_data = "Hello world!";
 | 
			
		||||
    const vector<uint8_t> data(str_data.begin(), str_data.end());
 | 
			
		||||
    const Buffer buffer(data);
 | 
			
		||||
    const Buffer buffer2(data.begin(), data.end());
 | 
			
		||||
    const Buffer buffer3(str_data.data(), str_data.data() + str_data.size());
 | 
			
		||||
    CHECK(str_data == buffer);
 | 
			
		||||
    CHECK(buffer == buffer2);
 | 
			
		||||
    CHECK(buffer == buffer3);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user