mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 11:07:56 +00:00
Header fixes
This commit is contained in:
@@ -93,7 +93,9 @@ public:
|
||||
static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)");
|
||||
}
|
||||
|
||||
// Don't allow construction from temporary vectors
|
||||
/**
|
||||
* Don't allow construction from temporary vectors
|
||||
*/
|
||||
template <typename T>
|
||||
Buffer(std::vector<T>&& data) = delete;
|
||||
|
||||
@@ -108,7 +110,9 @@ public:
|
||||
static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)");
|
||||
}
|
||||
|
||||
// Don't allow construction from temporary arrays
|
||||
/**
|
||||
* Don't allow construction from temporary arrays
|
||||
*/
|
||||
template <typename T, size_t N>
|
||||
Buffer(std::array<T, N>&& data) = delete;
|
||||
|
||||
@@ -120,7 +124,9 @@ public:
|
||||
*/
|
||||
Buffer(const std::string& data);
|
||||
|
||||
// Don't allow construction from temporary strings
|
||||
/**
|
||||
* Don't allow construction from temporary strings
|
||||
*/
|
||||
Buffer(std::string&&) = delete;
|
||||
|
||||
Buffer(const Buffer&) = delete;
|
||||
|
||||
@@ -60,9 +60,9 @@ public:
|
||||
* \param rhs The pointer to be copied
|
||||
*/
|
||||
ClonablePtr(const ClonablePtr& rhs)
|
||||
: handle_(rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
|
||||
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter())),
|
||||
cloner_(rhs.cloner_) {
|
||||
: handle_(rhs.get_cloner() ? std::unique_ptr<T, Deleter>(rhs.clone(), rhs.get_deleter()) :
|
||||
std::unique_ptr<T, Deleter>(rhs.get(), rhs.get_deleter())),
|
||||
cloner_(rhs.get_cloner()) {
|
||||
|
||||
}
|
||||
|
||||
@@ -72,11 +72,11 @@ public:
|
||||
* \param rhs The pointer to be copied
|
||||
*/
|
||||
ClonablePtr& operator=(const ClonablePtr& rhs) {
|
||||
if (this == &rhs) {
|
||||
return *this;
|
||||
if (this != &rhs) {
|
||||
handle_ = rhs.get_cloner() ? std::unique_ptr<T, Deleter>(rhs.clone(), rhs.get_deleter()) :
|
||||
std::unique_ptr<T, Deleter>(rhs.get(), rhs.get_deleter());
|
||||
cloner_ = rhs.get_cloner();
|
||||
}
|
||||
handle_ = rhs.cloner_ ? std::unique_ptr<T, Deleter>(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()) :
|
||||
std::unique_ptr<T, Deleter>(rhs.handle_.get(), rhs.handle_.get_deleter());
|
||||
return *this;
|
||||
}
|
||||
|
||||
@@ -91,6 +91,13 @@ public:
|
||||
return handle_.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Clones the internal pointer using the specified cloner function.
|
||||
*/
|
||||
T* clone() const {
|
||||
return cloner_ ? cloner_(handle_.get()) : handle_.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Releases ownership of the internal pointer
|
||||
*/
|
||||
@@ -98,6 +105,27 @@ public:
|
||||
return handle_.release();
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Reset the internal pointer to a new one
|
||||
*/
|
||||
void reset(T* ptr) {
|
||||
handle_.reset(ptr);
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Get the deleter
|
||||
*/
|
||||
const Deleter& get_deleter() const {
|
||||
return handle_.get_deleter();
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Get the cloner
|
||||
*/
|
||||
const Cloner& get_cloner() const {
|
||||
return cloner_;
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Indicates whether this ClonablePtr instance is valid (not null)
|
||||
*/
|
||||
|
||||
@@ -146,6 +146,12 @@ public:
|
||||
*/
|
||||
rd_kafka_headers_t* get_handle() const;
|
||||
|
||||
/**
|
||||
* \brief Clone the underlying header list handle.
|
||||
* \return The handle.
|
||||
*/
|
||||
rd_kafka_headers_t* clone_handle() const;
|
||||
|
||||
/**
|
||||
* \brief Get the underlying header list handle and release its ownership.
|
||||
* \return The handle.
|
||||
@@ -159,10 +165,14 @@ public:
|
||||
*/
|
||||
explicit operator bool() const;
|
||||
|
||||
/**
|
||||
* \brief Indicates if this list owns the underlying handle or not.
|
||||
*/
|
||||
bool is_owning() const;
|
||||
|
||||
private:
|
||||
struct NonOwningTag { };
|
||||
static void dummy_deleter(rd_kafka_headers_t*) {}
|
||||
static rd_kafka_headers_t* dummy_cloner(const rd_kafka_headers_t* handle) { return const_cast<rd_kafka_headers_t*>(handle); }
|
||||
|
||||
using HandlePtr = ClonablePtr<rd_kafka_headers_t, decltype(&rd_kafka_headers_destroy),
|
||||
decltype(&rd_kafka_headers_copy)>;
|
||||
@@ -193,7 +203,7 @@ bool operator!=(const HeaderList<HeaderType>& lhs, const HeaderList<HeaderType>
|
||||
|
||||
template <typename HeaderType>
|
||||
HeaderList<HeaderType> HeaderList<HeaderType>::make_non_owning(rd_kafka_headers_t* handle) {
|
||||
return HeaderList(handle, NonOwningTag());
|
||||
return handle ? HeaderList(handle, NonOwningTag()) : HeaderList();
|
||||
}
|
||||
|
||||
template <typename HeaderType>
|
||||
@@ -210,14 +220,16 @@ HeaderList<HeaderType>::HeaderList(size_t reserve)
|
||||
|
||||
template <typename HeaderType>
|
||||
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle)
|
||||
: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy
|
||||
|
||||
: handle_(handle,
|
||||
handle ? &rd_kafka_headers_destroy : nullptr,
|
||||
handle ? &rd_kafka_headers_copy : nullptr) { //if we own the header list, we clone it on copy
|
||||
}
|
||||
|
||||
template <typename HeaderType>
|
||||
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle, NonOwningTag)
|
||||
: handle_(HandlePtr(handle, &dummy_deleter, &dummy_cloner)) { //if we don't own the header list, we forward the handle on copy.
|
||||
|
||||
: handle_(handle,
|
||||
handle ? &dummy_deleter : nullptr,
|
||||
nullptr) { //if we don't own the header list, we forward the handle on copy.
|
||||
}
|
||||
|
||||
// Methods
|
||||
@@ -254,7 +266,8 @@ HeaderType HeaderList<HeaderType>::at(size_t index) const {
|
||||
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
|
||||
throw Exception(error.to_string());
|
||||
}
|
||||
return HeaderType(name, BufferType(value, size));
|
||||
//Use 'Buffer' to implicitly convert to 'BufferType'
|
||||
return HeaderType(name, Buffer(value, size));
|
||||
}
|
||||
|
||||
template <typename HeaderType>
|
||||
@@ -269,8 +282,7 @@ HeaderType HeaderList<HeaderType>::back() const {
|
||||
|
||||
template <typename HeaderType>
|
||||
size_t HeaderList<HeaderType>::size() const {
|
||||
assert(handle_);
|
||||
return rd_kafka_header_cnt(handle_.get());
|
||||
return handle_ ? rd_kafka_header_cnt(handle_.get()) : 0;
|
||||
}
|
||||
|
||||
template <typename HeaderType>
|
||||
@@ -281,7 +293,6 @@ bool HeaderList<HeaderType>::empty() const {
|
||||
template <typename HeaderType>
|
||||
typename HeaderList<HeaderType>::Iterator
|
||||
HeaderList<HeaderType>::begin() const {
|
||||
assert(handle_);
|
||||
if (empty()) {
|
||||
return end();
|
||||
}
|
||||
@@ -291,7 +302,6 @@ HeaderList<HeaderType>::begin() const {
|
||||
template <typename HeaderType>
|
||||
typename HeaderList<HeaderType>::Iterator
|
||||
HeaderList<HeaderType>::end() const {
|
||||
assert(handle_);
|
||||
return Iterator(make_non_owning(handle_.get()), size());
|
||||
}
|
||||
|
||||
@@ -300,6 +310,11 @@ rd_kafka_headers_t* HeaderList<HeaderType>::get_handle() const {
|
||||
return handle_.get();
|
||||
}
|
||||
|
||||
template <typename HeaderType>
|
||||
rd_kafka_headers_t* HeaderList<HeaderType>::clone_handle() const {
|
||||
return handle_.clone();
|
||||
}
|
||||
|
||||
template <typename HeaderType>
|
||||
rd_kafka_headers_t* HeaderList<HeaderType>::release_handle() {
|
||||
return handle_.release();
|
||||
@@ -310,6 +325,11 @@ HeaderList<HeaderType>::operator bool() const {
|
||||
return static_cast<bool>(handle_);
|
||||
}
|
||||
|
||||
template <typename HeaderType>
|
||||
bool HeaderList<HeaderType>::is_owning() const {
|
||||
return handle_.get_deleter() != &dummy_deleter;
|
||||
}
|
||||
|
||||
} //namespace cppkafka
|
||||
|
||||
#endif //RD_KAFKA_HEADERS_SUPPORT_VERSION
|
||||
|
||||
@@ -127,6 +127,27 @@ public:
|
||||
}
|
||||
|
||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||
/**
|
||||
* \brief Sets the message's header list.
|
||||
* \note After this call, the Message will take ownership of the header list.
|
||||
*/
|
||||
void set_header_list(const HeaderListType& headers) {
|
||||
assert(handle_);
|
||||
assert(!headers.is_owning());
|
||||
rd_kafka_message_set_headers(handle_.get(), headers.get_handle());
|
||||
header_list_ = HeaderListType::make_non_owning(headers.get_handle());
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Sets the message's header list.
|
||||
* \note After this call, the Message will take ownership of the header list.
|
||||
*/
|
||||
void set_header_list(HeaderListType&& headers) {
|
||||
assert(handle_);
|
||||
rd_kafka_message_set_headers(handle_.get(), headers.get_handle());
|
||||
header_list_ = HeaderListType::make_non_owning(headers.release_handle());
|
||||
}
|
||||
|
||||
/**
|
||||
* \brief Gets the message's header list
|
||||
*/
|
||||
|
||||
@@ -70,7 +70,12 @@ public:
|
||||
*/
|
||||
template <typename OtherBufferType, typename OtherConcrete>
|
||||
BasicMessageBuilder(const BasicMessageBuilder<OtherBufferType, OtherConcrete>& rhs);
|
||||
template <typename OtherBufferType, typename OtherConcrete>
|
||||
BasicMessageBuilder(BasicMessageBuilder<OtherBufferType, OtherConcrete>&& rhs);
|
||||
|
||||
/**
|
||||
* Default copy and move constructors and assignment operators
|
||||
*/
|
||||
BasicMessageBuilder(BasicMessageBuilder&&) = default;
|
||||
BasicMessageBuilder(const BasicMessageBuilder&) = default;
|
||||
BasicMessageBuilder& operator=(BasicMessageBuilder&&) = default;
|
||||
@@ -106,11 +111,13 @@ public:
|
||||
|
||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||
/**
|
||||
* Add a header to the message
|
||||
* Add a header(s) to the message
|
||||
*
|
||||
* \param header The header to be used
|
||||
*/
|
||||
Concrete& header(const HeaderType& header);
|
||||
Concrete& headers(const HeaderListType& headers);
|
||||
Concrete& headers(HeaderListType&& headers);
|
||||
#endif
|
||||
|
||||
/**
|
||||
@@ -208,8 +215,10 @@ public:
|
||||
Message::InternalPtr internal() const;
|
||||
Concrete& internal(Message::InternalPtr internal);
|
||||
|
||||
private:
|
||||
protected:
|
||||
void construct_buffer(BufferType& lhs, const BufferType& rhs);
|
||||
|
||||
private:
|
||||
Concrete& get_concrete();
|
||||
|
||||
std::string topic_;
|
||||
@@ -234,23 +243,51 @@ template <typename T, typename C>
|
||||
BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
|
||||
: topic_(message.get_topic()),
|
||||
key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
|
||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||
header_list_(message.get_header_list() ?
|
||||
rd_kafka_headers_copy(message.get_header_list().get_handle()) :
|
||||
nullptr), //copy and assume ownership
|
||||
#endif
|
||||
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
|
||||
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
|
||||
std::chrono::milliseconds(0)),
|
||||
user_data_(message.get_user_data()),
|
||||
internal_(message.internal()) {
|
||||
|
||||
}
|
||||
|
||||
template <typename T, typename C>
|
||||
template <typename U, typename V>
|
||||
BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)
|
||||
: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()),
|
||||
: topic_(rhs.topic()),
|
||||
partition_(rhs.partition()),
|
||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||
header_list_(rhs.header_list() ?
|
||||
rd_kafka_headers_copy(rhs.header_list().get_handle()) :
|
||||
nullptr), //copy headers
|
||||
#endif
|
||||
timestamp_(rhs.timestamp()),
|
||||
user_data_(rhs.user_data()),
|
||||
internal_(rhs.internal()) {
|
||||
get_concrete().construct_buffer(key_, rhs.key());
|
||||
get_concrete().construct_buffer(payload_, rhs.payload());
|
||||
}
|
||||
|
||||
template <typename T, typename C>
|
||||
template <typename U, typename V>
|
||||
BasicMessageBuilder<T, C>::BasicMessageBuilder(BasicMessageBuilder<U, V>&& rhs)
|
||||
: topic_(rhs.topic()),
|
||||
partition_(rhs.partition()),
|
||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||
header_list_(rhs.header_list().release_handle()), //assume ownership
|
||||
#endif
|
||||
timestamp_(rhs.timestamp()),
|
||||
user_data_(rhs.user_data()),
|
||||
internal_(rhs.internal()) {
|
||||
get_concrete().construct_buffer(key_, std::move(rhs.key()));
|
||||
get_concrete().construct_buffer(payload_, std::move(rhs.payload()));
|
||||
}
|
||||
|
||||
template <typename T, typename C>
|
||||
C& BasicMessageBuilder<T, C>::topic(std::string value) {
|
||||
topic_ = std::move(value);
|
||||
@@ -284,6 +321,18 @@ C& BasicMessageBuilder<T, C>::header(const HeaderType& header) {
|
||||
header_list_.add(header);
|
||||
return get_concrete();
|
||||
}
|
||||
|
||||
template <typename T, typename C>
|
||||
C& BasicMessageBuilder<T, C>::headers(const HeaderListType& headers) {
|
||||
header_list_ = headers;
|
||||
return get_concrete();
|
||||
}
|
||||
|
||||
template <typename T, typename C>
|
||||
C& BasicMessageBuilder<T, C>::headers(HeaderListType&& headers) {
|
||||
header_list_ = std::move(headers);
|
||||
return get_concrete();
|
||||
}
|
||||
#endif
|
||||
|
||||
template <typename T, typename C>
|
||||
@@ -410,7 +459,7 @@ C& BasicMessageBuilder<T, C>::get_concrete() {
|
||||
class MessageBuilder : public BasicMessageBuilder<Buffer, MessageBuilder> {
|
||||
public:
|
||||
using Base = BasicMessageBuilder<Buffer, MessageBuilder>;
|
||||
using BasicMessageBuilder::BasicMessageBuilder;
|
||||
using BasicMessageBuilder<Buffer, MessageBuilder>::BasicMessageBuilder;
|
||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||
using HeaderType = Base::HeaderType;
|
||||
using HeaderListType = Base::HeaderListType;
|
||||
@@ -421,13 +470,17 @@ public:
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void construct_buffer(Buffer& lhs, const T& rhs) {
|
||||
lhs = Buffer(rhs);
|
||||
void construct_buffer(Buffer& lhs, T&& rhs) {
|
||||
lhs = Buffer(std::forward<T>(rhs));
|
||||
}
|
||||
|
||||
|
||||
MessageBuilder clone() const {
|
||||
return std::move(MessageBuilder(topic()).
|
||||
key(Buffer(key().get_data(), key().get_size())).
|
||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||
headers(header_list()).
|
||||
#endif
|
||||
payload(Buffer(payload().get_data(), payload().get_size())).
|
||||
timestamp(timestamp()).
|
||||
user_data(user_data()).
|
||||
@@ -441,7 +494,12 @@ public:
|
||||
template <typename T>
|
||||
class ConcreteMessageBuilder : public BasicMessageBuilder<T, ConcreteMessageBuilder<T>> {
|
||||
public:
|
||||
using Base = BasicMessageBuilder<T, ConcreteMessageBuilder<T>>;
|
||||
using BasicMessageBuilder<T, ConcreteMessageBuilder<T>>::BasicMessageBuilder;
|
||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||
using HeaderType = typename Base::HeaderType;
|
||||
using HeaderListType = typename Base::HeaderListType;
|
||||
#endif
|
||||
};
|
||||
|
||||
} // cppkafka
|
||||
|
||||
Reference in New Issue
Block a user