From cf6ac1675b5ab69147e4f60665cb66b60f89c91b Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Mon, 27 Jun 2016 21:43:20 -0700 Subject: [PATCH] Add Error class --- examples/kafka_consumer.cpp | 4 +- include/cppkafka/configuration.h | 3 +- include/cppkafka/consumer.h | 5 +- include/cppkafka/error.h | 85 ++++++++++++++++++++++++++++++++ include/cppkafka/exceptions.h | 7 +-- include/cppkafka/message.h | 13 +---- include/cppkafka/metadata.h | 6 ++- src/CMakeLists.txt | 1 + src/error.cpp | 67 +++++++++++++++++++++++++ src/exceptions.cpp | 8 +-- src/message.cpp | 10 +--- src/metadata.cpp | 5 +- tests/consumer_test.cpp | 6 +-- tests/producer_test.cpp | 13 ++--- 14 files changed, 188 insertions(+), 45 deletions(-) create mode 100644 include/cppkafka/error.h create mode 100644 src/error.cpp diff --git a/examples/kafka_consumer.cpp b/examples/kafka_consumer.cpp index 59bf2b0..ba8bf39 100644 --- a/examples/kafka_consumer.cpp +++ b/examples/kafka_consumer.cpp @@ -83,9 +83,9 @@ int main(int argc, char* argv[]) { Message msg = consumer.poll(); if (msg) { // If we managed to get a message - if (msg.has_error()) { + if (msg.get_error()) { if (msg.get_error() != RD_KAFKA_RESP_ERR__PARTITION_EOF) { - cout << "[+] Received error notification: " << msg.get_error_string() << endl; + cout << "[+] Received error notification: " << msg.get_error() << endl; } } else { diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index df53254..2c904de 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -46,6 +46,7 @@ namespace cppkafka { class Message; +class Error; class Producer; class Consumer; class KafkaHandleBase; @@ -61,7 +62,7 @@ class KafkaHandleBase; class CPPKAFKA_API Configuration : public ConfigurationBase { public: using DeliveryReportCallback = std::function; - using OffsetCommitCallback = std::function; using ErrorCallback = std::function; diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index b364b17..4748f96 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -37,6 +37,7 @@ #include "kafka_handle_base.h" #include "message.h" #include "macros.h" +#include "error.h" namespace cppkafka { @@ -82,7 +83,7 @@ class TopicConfiguration; * Message msg = consumer.poll(); * if (msg) { * // It's a valid message! - * if (!msg.has_error()) { + * if (!msg.get_error()) { * // It's an actual message. Get the payload and print it to stdout * cout << msg.get_payload().as_string() << endl; * } @@ -98,7 +99,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { public: using AssignmentCallback = std::function; using RevocationCallback = std::function; - using RebalanceErrorCallback = std::function; + using RebalanceErrorCallback = std::function; /** * \brief Creates an instance of a consumer. diff --git a/include/cppkafka/error.h b/include/cppkafka/error.h new file mode 100644 index 0000000..1165dba --- /dev/null +++ b/include/cppkafka/error.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_ERROR_H +#define CPPKAFKA_ERROR_H + +#include +#include +#include +#include "macros.h" + +namespace cppkafka { + +/** + * Abstraction for an rdkafka error + */ +class CPPKAFKA_API Error { +public: + /** + * Constructs an error object + */ + Error(rd_kafka_resp_err_t error); + + /** + * Gets the error value + */ + rd_kafka_resp_err_t get_error() const; + + /** + * Gets the error string + */ + std::string to_string() const; + + /** + * Checks whether this error contains an actual error (and not RD_KAFKA_RESP_ERR_NO_ERROR) + */ + explicit operator bool() const; + + /** + * Compares this error for equality + */ + bool operator==(const Error& rhs) const; + + /** + * Compares this error for inequality + */ + bool operator!=(const Error& rhs) const; + + /** + * Writes this error's string representation into a stream + */ + friend std::ostream& operator<<(std::ostream& output, const Error& rhs); +private: + rd_kafka_resp_err_t error_; +}; + +} // cppkafka + +#endif // CPPKAFKA_ERROR_H diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h index 5fffad3..bd52865 100644 --- a/include/cppkafka/exceptions.h +++ b/include/cppkafka/exceptions.h @@ -34,6 +34,7 @@ #include #include #include "macros.h" +#include "error.h" namespace cppkafka { @@ -78,11 +79,11 @@ public: */ class CPPKAFKA_API HandleException : public Exception { public: - HandleException(rd_kafka_resp_err_t error_code); + HandleException(Error error); - rd_kafka_resp_err_t get_error_code() const; + Error get_error() const; private: - rd_kafka_resp_err_t error_code_; + Error error_; }; } // cppkafka diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index c910f5e..b37ef44 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -37,6 +37,7 @@ #include "buffer.h" #include "topic.h" #include "macros.h" +#include "error.h" namespace cppkafka { @@ -78,20 +79,10 @@ public: Message& operator=(const Message&) = delete; Message& operator=(Message&& rhs) = default; - /** - * Indicates whether this is a message carrying an error notification - */ - bool has_error() const; - /** * Gets the error attribute */ - rd_kafka_resp_err_t get_error() const; - - /** - * Gets the error as a string - */ - std::string get_error_string() const; + Error get_error() const; /** * Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF diff --git a/include/cppkafka/metadata.h b/include/cppkafka/metadata.h index 7aca711..05a948d 100644 --- a/include/cppkafka/metadata.h +++ b/include/cppkafka/metadata.h @@ -40,6 +40,8 @@ namespace cppkafka { +class Error; + /** * Represents the metadata for a partition */ @@ -55,7 +57,7 @@ public: /** * Gets the partition error as reported by the broker */ - rd_kafka_resp_err_t get_error() const; + Error get_error() const; /** * Gets the leader broker id @@ -94,7 +96,7 @@ public: /** * Gets the topic error */ - rd_kafka_resp_err_t get_error() const; + Error get_error() const; /** * Gets the partitions' metadata diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b3319f5..cc45616 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,6 +10,7 @@ set(SOURCES topic_partition.cpp topic_partition_list.cpp metadata.cpp + error.cpp kafka_handle_base.cpp producer.cpp diff --git a/src/error.cpp b/src/error.cpp new file mode 100644 index 0000000..d2479e9 --- /dev/null +++ b/src/error.cpp @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include "error.h" + +using std::string; +using std::ostream; + +namespace cppkafka { + +Error::Error(rd_kafka_resp_err_t error) +: error_(error) { + +} + +rd_kafka_resp_err_t Error::get_error() const { + return error_; +} + +string Error::to_string() const { + return rd_kafka_err2str(error_); +} + +Error::operator bool() const { + return error_ != RD_KAFKA_RESP_ERR_NO_ERROR; +} + +bool Error::operator==(const Error& rhs) const { + return error_ == rhs.error_; +} + +bool Error::operator!=(const Error& rhs) const { + return !(*this == rhs); +} + +ostream& operator<<(ostream& output, const Error& rhs) { + return output << rhs.to_string(); +} + +} // cppkafka diff --git a/src/exceptions.cpp b/src/exceptions.cpp index 7603eb0..616675e 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -67,13 +67,13 @@ InvalidConfigOptionType::InvalidConfigOptionType(const string& config_name, cons // HandleException -HandleException::HandleException(rd_kafka_resp_err_t error_code) -: Exception(rd_kafka_err2str(error_code)), error_code_(error_code) { +HandleException::HandleException(Error error) +: Exception(error.to_string()), error_(error) { } -rd_kafka_resp_err_t HandleException::get_error_code() const { - return error_code_; +Error HandleException::get_error() const { + return error_; } } // cppkafka diff --git a/src/message.cpp b/src/message.cpp index bebd0ba..ecc71be 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -66,18 +66,10 @@ Message::Message(HandlePtr handle) } -bool Message::has_error() const { - return get_error() != RD_KAFKA_RESP_ERR_NO_ERROR; -} - -rd_kafka_resp_err_t Message::get_error() const { +Error Message::get_error() const { return handle_->err; } -string Message::get_error_string() const { - return rd_kafka_err2str(handle_->err); -} - bool Message::is_eof() const { return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF; } diff --git a/src/metadata.cpp b/src/metadata.cpp index a3e0526..4d98dd4 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -28,6 +28,7 @@ */ #include "metadata.h" +#include "error.h" using std::string; using std::vector; @@ -51,7 +52,7 @@ uint32_t PartitionMetadata::get_id() const { return id_; } -rd_kafka_resp_err_t PartitionMetadata::get_error() const { +Error PartitionMetadata::get_error() const { return error_; } @@ -80,7 +81,7 @@ const string& TopicMetadata::get_topic() const { return topic_; } -rd_kafka_resp_err_t TopicMetadata::get_error() const { +Error TopicMetadata::get_error() const { return error_; } diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 0741644..558f144 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -45,7 +45,7 @@ public: cond.notify_one(); } } - else if (msg && msg.get_error() == 0 && number_eofs == partitions) { + else if (msg && !msg.get_error() && number_eofs == partitions) { messages_.push_back(move(msg)); } } @@ -198,10 +198,10 @@ TEST_F(ConsumerTest, OffsetCommit) { // Create a consumer and subscribe to the topic Configuration config = make_consumer_config("offset_commit"); - config.set_offset_commit_callback([&](Consumer&, rd_kafka_resp_err_t error, + config.set_offset_commit_callback([&](Consumer&, Error error, const TopicPartitionList& topic_partitions) { offset_commit_called = true; - EXPECT_EQ(0, error); + EXPECT_FALSE(error); ASSERT_EQ(1, topic_partitions.size()); EXPECT_EQ(KAFKA_TOPIC, topic_partitions[0].get_topic()); EXPECT_EQ(0, topic_partitions[0].get_partition()); diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 93f2bf5..98074bf 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -37,7 +37,8 @@ public: auto start = system_clock::now(); while (system_clock::now() - start < seconds(10) && messages_.size() < expected) { Message msg = consumer_.poll(); - if (msg && number_eofs != partitions && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) { + if (msg && number_eofs != partitions && + msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) { number_eofs++; if (number_eofs == partitions) { lock_guard _(mtx); @@ -45,7 +46,7 @@ public: cond.notify_one(); } } - else if (msg && msg.get_error() == 0) { + else if (msg && !msg.get_error()) { messages_.push_back(move(msg)); } } @@ -124,7 +125,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) { EXPECT_FALSE(message.get_key()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); - EXPECT_EQ(0, message.get_error()); + EXPECT_FALSE(message.get_error()); int64_t low; int64_t high; @@ -155,7 +156,7 @@ TEST_F(ProducerTest, OneMessageUsingKey) { EXPECT_EQ(Buffer(key), message.get_key()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); - EXPECT_EQ(0, message.get_error()); + EXPECT_FALSE(message.get_error()); // NOTE: if this line fails, then you're using kafka 0.10+ and that's okay EXPECT_FALSE(message.get_timestamp()); } @@ -186,7 +187,7 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) { for (const auto& message : messages) { EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(1, payloads.erase(message.get_payload())); - EXPECT_EQ(0, message.get_error()); + EXPECT_FALSE(message.get_error()); EXPECT_FALSE(message.get_key()); EXPECT_GE(message.get_partition(), 0); EXPECT_LT(message.get_partition(), 3); @@ -233,7 +234,7 @@ TEST_F(ProducerTest, Callbacks) { EXPECT_EQ(Buffer(key), message.get_key()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); - EXPECT_EQ(0, message.get_error()); + EXPECT_FALSE(message.get_error()); EXPECT_TRUE(deliver_report_called); }