mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 11:07:56 +00:00
* Added asserts when building a HeaderList and removed checks for handle
validity. * Removed explicit move semantic when cloning a MessageBuilder. * Renamed clone() to try_clone() in ClonablePtr class.
This commit is contained in:
@@ -60,7 +60,7 @@ public:
|
|||||||
* \param rhs The pointer to be copied
|
* \param rhs The pointer to be copied
|
||||||
*/
|
*/
|
||||||
ClonablePtr(const ClonablePtr& rhs)
|
ClonablePtr(const ClonablePtr& rhs)
|
||||||
: handle_(std::unique_ptr<T, Deleter>(rhs.clone(), rhs.get_deleter())),
|
: handle_(std::unique_ptr<T, Deleter>(rhs.try_clone(), rhs.get_deleter())),
|
||||||
cloner_(rhs.get_cloner()) {
|
cloner_(rhs.get_cloner()) {
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -72,7 +72,7 @@ public:
|
|||||||
*/
|
*/
|
||||||
ClonablePtr& operator=(const ClonablePtr& rhs) {
|
ClonablePtr& operator=(const ClonablePtr& rhs) {
|
||||||
if (this != &rhs) {
|
if (this != &rhs) {
|
||||||
handle_ = std::unique_ptr<T, Deleter>(rhs.clone(), rhs.get_deleter());
|
handle_ = std::unique_ptr<T, Deleter>(rhs.try_clone(), rhs.get_deleter());
|
||||||
cloner_ = rhs.get_cloner();
|
cloner_ = rhs.get_cloner();
|
||||||
}
|
}
|
||||||
return *this;
|
return *this;
|
||||||
@@ -124,10 +124,7 @@ public:
|
|||||||
return static_cast<bool>(handle_);
|
return static_cast<bool>(handle_);
|
||||||
}
|
}
|
||||||
private:
|
private:
|
||||||
/**
|
T* try_clone() const {
|
||||||
* \brief Clones the internal pointer using the specified cloner function.
|
|
||||||
*/
|
|
||||||
T* clone() const {
|
|
||||||
return cloner_ ? cloner_(get()) : get();
|
return cloner_ ? cloner_(get()) : get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -192,7 +192,7 @@ bool operator!=(const HeaderList<HeaderType>& lhs, const HeaderList<HeaderType>
|
|||||||
|
|
||||||
template <typename HeaderType>
|
template <typename HeaderType>
|
||||||
HeaderList<HeaderType> HeaderList<HeaderType>::make_non_owning(rd_kafka_headers_t* handle) {
|
HeaderList<HeaderType> HeaderList<HeaderType>::make_non_owning(rd_kafka_headers_t* handle) {
|
||||||
return handle ? HeaderList(handle, NonOwningTag()) : HeaderList();
|
return HeaderList(handle, NonOwningTag());
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HeaderType>
|
template <typename HeaderType>
|
||||||
@@ -204,21 +204,19 @@ HeaderList<HeaderType>::HeaderList()
|
|||||||
template <typename HeaderType>
|
template <typename HeaderType>
|
||||||
HeaderList<HeaderType>::HeaderList(size_t reserve)
|
HeaderList<HeaderType>::HeaderList(size_t reserve)
|
||||||
: handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) {
|
: handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) {
|
||||||
|
assert(reserve);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HeaderType>
|
template <typename HeaderType>
|
||||||
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle)
|
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle)
|
||||||
: handle_(handle,
|
: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy
|
||||||
handle ? &rd_kafka_headers_destroy : nullptr,
|
assert(handle);
|
||||||
handle ? &rd_kafka_headers_copy : nullptr) { //if we own the header list, we clone it on copy
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename HeaderType>
|
template <typename HeaderType>
|
||||||
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle, NonOwningTag)
|
HeaderList<HeaderType>::HeaderList(rd_kafka_headers_t* handle, NonOwningTag)
|
||||||
: handle_(handle,
|
: handle_(handle, &dummy_deleter, nullptr) { //if we don't own the header list, we forward the handle on copy.
|
||||||
handle ? &dummy_deleter : nullptr,
|
assert(handle);
|
||||||
nullptr) { //if we don't own the header list, we forward the handle on copy.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Methods
|
// Methods
|
||||||
|
|||||||
@@ -133,6 +133,9 @@ public:
|
|||||||
*/
|
*/
|
||||||
void set_header_list(const HeaderListType& headers) {
|
void set_header_list(const HeaderListType& headers) {
|
||||||
assert(handle_);
|
assert(handle_);
|
||||||
|
if (!headers) {
|
||||||
|
return; //nothing to set
|
||||||
|
}
|
||||||
rd_kafka_headers_t* handle_copy = rd_kafka_headers_copy(headers.get_handle());
|
rd_kafka_headers_t* handle_copy = rd_kafka_headers_copy(headers.get_handle());
|
||||||
rd_kafka_message_set_headers(handle_.get(), handle_copy);
|
rd_kafka_message_set_headers(handle_.get(), handle_copy);
|
||||||
header_list_ = HeaderListType::make_non_owning(handle_copy);
|
header_list_ = HeaderListType::make_non_owning(handle_copy);
|
||||||
|
|||||||
@@ -245,8 +245,7 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
|
|||||||
key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
|
key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
|
||||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||||
header_list_(message.get_header_list() ?
|
header_list_(message.get_header_list() ?
|
||||||
rd_kafka_headers_copy(message.get_header_list().get_handle()) :
|
rd_kafka_headers_copy(message.get_header_list().get_handle()) : nullptr), //copy headers
|
||||||
nullptr), //copy and assume ownership
|
|
||||||
#endif
|
#endif
|
||||||
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
|
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
|
||||||
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
|
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
|
||||||
@@ -263,8 +262,7 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>&
|
|||||||
partition_(rhs.partition()),
|
partition_(rhs.partition()),
|
||||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||||
header_list_(rhs.header_list() ?
|
header_list_(rhs.header_list() ?
|
||||||
rd_kafka_headers_copy(rhs.header_list().get_handle()) :
|
rd_kafka_headers_copy(rhs.header_list().get_handle()) : nullptr), //copy headers
|
||||||
nullptr), //copy headers
|
|
||||||
#endif
|
#endif
|
||||||
timestamp_(rhs.timestamp()),
|
timestamp_(rhs.timestamp()),
|
||||||
user_data_(rhs.user_data()),
|
user_data_(rhs.user_data()),
|
||||||
@@ -279,7 +277,8 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(BasicMessageBuilder<U, V>&& rhs)
|
|||||||
: topic_(rhs.topic()),
|
: topic_(rhs.topic()),
|
||||||
partition_(rhs.partition()),
|
partition_(rhs.partition()),
|
||||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||||
header_list_(rhs.header_list().release_handle()), //assume ownership
|
header_list_(rhs.header_list() ?
|
||||||
|
rhs.header_list().release_handle() : nullptr), //assume header ownership
|
||||||
#endif
|
#endif
|
||||||
timestamp_(rhs.timestamp()),
|
timestamp_(rhs.timestamp()),
|
||||||
user_data_(rhs.user_data()),
|
user_data_(rhs.user_data()),
|
||||||
@@ -476,15 +475,16 @@ public:
|
|||||||
|
|
||||||
|
|
||||||
MessageBuilder clone() const {
|
MessageBuilder clone() const {
|
||||||
return std::move(MessageBuilder(topic()).
|
MessageBuilder builder(topic());
|
||||||
key(Buffer(key().get_data(), key().get_size())).
|
builder.key(Buffer(key().get_data(), key().get_size())).
|
||||||
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)
|
||||||
headers(header_list()).
|
headers(header_list()).
|
||||||
#endif
|
#endif
|
||||||
payload(Buffer(payload().get_data(), payload().get_size())).
|
payload(Buffer(payload().get_data(), payload().get_size())).
|
||||||
timestamp(timestamp()).
|
timestamp(timestamp()).
|
||||||
user_data(user_data()).
|
user_data(user_data()).
|
||||||
internal(internal()));
|
internal(internal());
|
||||||
|
return builder;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ void Producer::produce(const MessageBuilder& builder) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Producer::produce(MessageBuilder&& builder) {
|
void Producer::produce(MessageBuilder&& builder) {
|
||||||
do_produce(builder, MessageBuilder::HeaderListType(builder.header_list().release_handle())); //move headers
|
do_produce(builder, std::move(builder.header_list())); //move headers
|
||||||
}
|
}
|
||||||
|
|
||||||
void Producer::produce(const Message& message) {
|
void Producer::produce(const Message& message) {
|
||||||
|
|||||||
Reference in New Issue
Block a user