From 93e066a1c100102c769b1d6eedbae1a65a6d5ba5 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Tue, 4 Dec 2018 11:12:28 -0500 Subject: [PATCH] * Added asserts when building a HeaderList and removed checks for handle validity. * Removed explicit move semantic when cloning a MessageBuilder. * Renamed clone() to try_clone() in ClonablePtr class. --- include/cppkafka/clonable_ptr.h | 9 +++------ include/cppkafka/header_list.h | 14 ++++++-------- include/cppkafka/message.h | 3 +++ include/cppkafka/message_builder.h | 16 ++++++++-------- src/producer.cpp | 2 +- 5 files changed, 21 insertions(+), 23 deletions(-) diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 7b25b17..13d7b20 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -60,7 +60,7 @@ public: * \param rhs The pointer to be copied */ ClonablePtr(const ClonablePtr& rhs) - : handle_(std::unique_ptr(rhs.clone(), rhs.get_deleter())), + : handle_(std::unique_ptr(rhs.try_clone(), rhs.get_deleter())), cloner_(rhs.get_cloner()) { } @@ -72,7 +72,7 @@ public: */ ClonablePtr& operator=(const ClonablePtr& rhs) { if (this != &rhs) { - handle_ = std::unique_ptr(rhs.clone(), rhs.get_deleter()); + handle_ = std::unique_ptr(rhs.try_clone(), rhs.get_deleter()); cloner_ = rhs.get_cloner(); } return *this; @@ -124,10 +124,7 @@ public: return static_cast(handle_); } private: - /** - * \brief Clones the internal pointer using the specified cloner function. - */ - T* clone() const { + T* try_clone() const { return cloner_ ? cloner_(get()) : get(); } diff --git a/include/cppkafka/header_list.h b/include/cppkafka/header_list.h index 313996f..b0d3ab4 100644 --- a/include/cppkafka/header_list.h +++ b/include/cppkafka/header_list.h @@ -192,7 +192,7 @@ bool operator!=(const HeaderList& lhs, const HeaderList template HeaderList HeaderList::make_non_owning(rd_kafka_headers_t* handle) { - return handle ? HeaderList(handle, NonOwningTag()) : HeaderList(); + return HeaderList(handle, NonOwningTag()); } template @@ -204,21 +204,19 @@ HeaderList::HeaderList() template HeaderList::HeaderList(size_t reserve) : handle_(rd_kafka_headers_new(reserve), &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { - + assert(reserve); } template HeaderList::HeaderList(rd_kafka_headers_t* handle) -: handle_(handle, - handle ? &rd_kafka_headers_destroy : nullptr, - handle ? &rd_kafka_headers_copy : nullptr) { //if we own the header list, we clone it on copy +: handle_(handle, &rd_kafka_headers_destroy, &rd_kafka_headers_copy) { //if we own the header list, we clone it on copy + assert(handle); } template HeaderList::HeaderList(rd_kafka_headers_t* handle, NonOwningTag) -: handle_(handle, - handle ? &dummy_deleter : nullptr, - nullptr) { //if we don't own the header list, we forward the handle on copy. +: handle_(handle, &dummy_deleter, nullptr) { //if we don't own the header list, we forward the handle on copy. + assert(handle); } // Methods diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 22a2880..f8101a0 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -133,6 +133,9 @@ public: */ void set_header_list(const HeaderListType& headers) { assert(handle_); + if (!headers) { + return; //nothing to set + } rd_kafka_headers_t* handle_copy = rd_kafka_headers_copy(headers.get_handle()); rd_kafka_message_set_headers(handle_.get(), handle_copy); header_list_ = HeaderListType::make_non_owning(handle_copy); diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index c889912..981e120 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -245,8 +245,7 @@ BasicMessageBuilder::BasicMessageBuilder(const Message& message) key_(Buffer(message.get_key().get_data(), message.get_key().get_size())), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) header_list_(message.get_header_list() ? - rd_kafka_headers_copy(message.get_header_list().get_handle()) : - nullptr), //copy and assume ownership + rd_kafka_headers_copy(message.get_header_list().get_handle()) : nullptr), //copy headers #endif payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : @@ -263,8 +262,7 @@ BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) header_list_(rhs.header_list() ? - rd_kafka_headers_copy(rhs.header_list().get_handle()) : - nullptr), //copy headers + rd_kafka_headers_copy(rhs.header_list().get_handle()) : nullptr), //copy headers #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), @@ -279,7 +277,8 @@ BasicMessageBuilder::BasicMessageBuilder(BasicMessageBuilder&& rhs) : topic_(rhs.topic()), partition_(rhs.partition()), #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) - header_list_(rhs.header_list().release_handle()), //assume ownership + header_list_(rhs.header_list() ? + rhs.header_list().release_handle() : nullptr), //assume header ownership #endif timestamp_(rhs.timestamp()), user_data_(rhs.user_data()), @@ -476,15 +475,16 @@ public: MessageBuilder clone() const { - return std::move(MessageBuilder(topic()). - key(Buffer(key().get_data(), key().get_size())). + MessageBuilder builder(topic()); + builder.key(Buffer(key().get_data(), key().get_size())). #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) headers(header_list()). #endif payload(Buffer(payload().get_data(), payload().get_size())). timestamp(timestamp()). user_data(user_data()). - internal(internal())); + internal(internal()); + return builder; } }; diff --git a/src/producer.cpp b/src/producer.cpp index 815c75b..af138d0 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -70,7 +70,7 @@ void Producer::produce(const MessageBuilder& builder) { } void Producer::produce(MessageBuilder&& builder) { - do_produce(builder, MessageBuilder::HeaderListType(builder.header_list().release_handle())); //move headers + do_produce(builder, std::move(builder.header_list())); //move headers } void Producer::produce(const Message& message) {