Add Error class

This commit is contained in:
Matias Fontanini
2016-06-27 21:43:20 -07:00
parent 6cfe01afb1
commit cf6ac1675b
14 changed files with 188 additions and 45 deletions

View File

@@ -83,9 +83,9 @@ int main(int argc, char* argv[]) {
Message msg = consumer.poll();
if (msg) {
// If we managed to get a message
if (msg.has_error()) {
if (msg.get_error()) {
if (msg.get_error() != RD_KAFKA_RESP_ERR__PARTITION_EOF) {
cout << "[+] Received error notification: " << msg.get_error_string() << endl;
cout << "[+] Received error notification: " << msg.get_error() << endl;
}
}
else {

View File

@@ -46,6 +46,7 @@
namespace cppkafka {
class Message;
class Error;
class Producer;
class Consumer;
class KafkaHandleBase;
@@ -61,7 +62,7 @@ class KafkaHandleBase;
class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
public:
using DeliveryReportCallback = std::function<void(Producer& producer, const Message&)>;
using OffsetCommitCallback = std::function<void(Consumer& consumer, rd_kafka_resp_err_t,
using OffsetCommitCallback = std::function<void(Consumer& consumer, Error,
const TopicPartitionList& topic_partitions)>;
using ErrorCallback = std::function<void(KafkaHandleBase& handle, int error,
const std::string& reason)>;

View File

@@ -37,6 +37,7 @@
#include "kafka_handle_base.h"
#include "message.h"
#include "macros.h"
#include "error.h"
namespace cppkafka {
@@ -82,7 +83,7 @@ class TopicConfiguration;
* Message msg = consumer.poll();
* if (msg) {
* // It's a valid message!
* if (!msg.has_error()) {
* if (!msg.get_error()) {
* // It's an actual message. Get the payload and print it to stdout
* cout << msg.get_payload().as_string() << endl;
* }
@@ -98,7 +99,7 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
public:
using AssignmentCallback = std::function<void(TopicPartitionList&)>;
using RevocationCallback = std::function<void(const TopicPartitionList&)>;
using RebalanceErrorCallback = std::function<void(rd_kafka_resp_err_t)>;
using RebalanceErrorCallback = std::function<void(Error)>;
/**
* \brief Creates an instance of a consumer.

85
include/cppkafka/error.h Normal file
View File

@@ -0,0 +1,85 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_ERROR_H
#define CPPKAFKA_ERROR_H
#include <string>
#include <iosfwd>
#include <librdkafka/rdkafka.h>
#include "macros.h"
namespace cppkafka {
/**
* Abstraction for an rdkafka error
*/
class CPPKAFKA_API Error {
public:
/**
* Constructs an error object
*/
Error(rd_kafka_resp_err_t error);
/**
* Gets the error value
*/
rd_kafka_resp_err_t get_error() const;
/**
* Gets the error string
*/
std::string to_string() const;
/**
* Checks whether this error contains an actual error (and not RD_KAFKA_RESP_ERR_NO_ERROR)
*/
explicit operator bool() const;
/**
* Compares this error for equality
*/
bool operator==(const Error& rhs) const;
/**
* Compares this error for inequality
*/
bool operator!=(const Error& rhs) const;
/**
* Writes this error's string representation into a stream
*/
friend std::ostream& operator<<(std::ostream& output, const Error& rhs);
private:
rd_kafka_resp_err_t error_;
};
} // cppkafka
#endif // CPPKAFKA_ERROR_H

View File

@@ -34,6 +34,7 @@
#include <string>
#include <librdkafka/rdkafka.h>
#include "macros.h"
#include "error.h"
namespace cppkafka {
@@ -78,11 +79,11 @@ public:
*/
class CPPKAFKA_API HandleException : public Exception {
public:
HandleException(rd_kafka_resp_err_t error_code);
HandleException(Error error);
rd_kafka_resp_err_t get_error_code() const;
Error get_error() const;
private:
rd_kafka_resp_err_t error_code_;
Error error_;
};
} // cppkafka

View File

@@ -37,6 +37,7 @@
#include "buffer.h"
#include "topic.h"
#include "macros.h"
#include "error.h"
namespace cppkafka {
@@ -78,20 +79,10 @@ public:
Message& operator=(const Message&) = delete;
Message& operator=(Message&& rhs) = default;
/**
* Indicates whether this is a message carrying an error notification
*/
bool has_error() const;
/**
* Gets the error attribute
*/
rd_kafka_resp_err_t get_error() const;
/**
* Gets the error as a string
*/
std::string get_error_string() const;
Error get_error() const;
/**
* Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF

View File

@@ -40,6 +40,8 @@
namespace cppkafka {
class Error;
/**
* Represents the metadata for a partition
*/
@@ -55,7 +57,7 @@ public:
/**
* Gets the partition error as reported by the broker
*/
rd_kafka_resp_err_t get_error() const;
Error get_error() const;
/**
* Gets the leader broker id
@@ -94,7 +96,7 @@ public:
/**
* Gets the topic error
*/
rd_kafka_resp_err_t get_error() const;
Error get_error() const;
/**
* Gets the partitions' metadata

View File

@@ -10,6 +10,7 @@ set(SOURCES
topic_partition.cpp
topic_partition_list.cpp
metadata.cpp
error.cpp
kafka_handle_base.cpp
producer.cpp

67
src/error.cpp Normal file
View File

@@ -0,0 +1,67 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <iostream>
#include "error.h"
using std::string;
using std::ostream;
namespace cppkafka {
Error::Error(rd_kafka_resp_err_t error)
: error_(error) {
}
rd_kafka_resp_err_t Error::get_error() const {
return error_;
}
string Error::to_string() const {
return rd_kafka_err2str(error_);
}
Error::operator bool() const {
return error_ != RD_KAFKA_RESP_ERR_NO_ERROR;
}
bool Error::operator==(const Error& rhs) const {
return error_ == rhs.error_;
}
bool Error::operator!=(const Error& rhs) const {
return !(*this == rhs);
}
ostream& operator<<(ostream& output, const Error& rhs) {
return output << rhs.to_string();
}
} // cppkafka

View File

@@ -67,13 +67,13 @@ InvalidConfigOptionType::InvalidConfigOptionType(const string& config_name, cons
// HandleException
HandleException::HandleException(rd_kafka_resp_err_t error_code)
: Exception(rd_kafka_err2str(error_code)), error_code_(error_code) {
HandleException::HandleException(Error error)
: Exception(error.to_string()), error_(error) {
}
rd_kafka_resp_err_t HandleException::get_error_code() const {
return error_code_;
Error HandleException::get_error() const {
return error_;
}
} // cppkafka

View File

@@ -66,18 +66,10 @@ Message::Message(HandlePtr handle)
}
bool Message::has_error() const {
return get_error() != RD_KAFKA_RESP_ERR_NO_ERROR;
}
rd_kafka_resp_err_t Message::get_error() const {
Error Message::get_error() const {
return handle_->err;
}
string Message::get_error_string() const {
return rd_kafka_err2str(handle_->err);
}
bool Message::is_eof() const {
return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF;
}

View File

@@ -28,6 +28,7 @@
*/
#include "metadata.h"
#include "error.h"
using std::string;
using std::vector;
@@ -51,7 +52,7 @@ uint32_t PartitionMetadata::get_id() const {
return id_;
}
rd_kafka_resp_err_t PartitionMetadata::get_error() const {
Error PartitionMetadata::get_error() const {
return error_;
}
@@ -80,7 +81,7 @@ const string& TopicMetadata::get_topic() const {
return topic_;
}
rd_kafka_resp_err_t TopicMetadata::get_error() const {
Error TopicMetadata::get_error() const {
return error_;
}

View File

@@ -45,7 +45,7 @@ public:
cond.notify_one();
}
}
else if (msg && msg.get_error() == 0 && number_eofs == partitions) {
else if (msg && !msg.get_error() && number_eofs == partitions) {
messages_.push_back(move(msg));
}
}
@@ -198,10 +198,10 @@ TEST_F(ConsumerTest, OffsetCommit) {
// Create a consumer and subscribe to the topic
Configuration config = make_consumer_config("offset_commit");
config.set_offset_commit_callback([&](Consumer&, rd_kafka_resp_err_t error,
config.set_offset_commit_callback([&](Consumer&, Error error,
const TopicPartitionList& topic_partitions) {
offset_commit_called = true;
EXPECT_EQ(0, error);
EXPECT_FALSE(error);
ASSERT_EQ(1, topic_partitions.size());
EXPECT_EQ(KAFKA_TOPIC, topic_partitions[0].get_topic());
EXPECT_EQ(0, topic_partitions[0].get_partition());

View File

@@ -37,7 +37,8 @@ public:
auto start = system_clock::now();
while (system_clock::now() - start < seconds(10) && messages_.size() < expected) {
Message msg = consumer_.poll();
if (msg && number_eofs != partitions && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
if (msg && number_eofs != partitions &&
msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
number_eofs++;
if (number_eofs == partitions) {
lock_guard<mutex> _(mtx);
@@ -45,7 +46,7 @@ public:
cond.notify_one();
}
}
else if (msg && msg.get_error() == 0) {
else if (msg && !msg.get_error()) {
messages_.push_back(move(msg));
}
}
@@ -124,7 +125,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) {
EXPECT_FALSE(message.get_key());
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition());
EXPECT_EQ(0, message.get_error());
EXPECT_FALSE(message.get_error());
int64_t low;
int64_t high;
@@ -155,7 +156,7 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
EXPECT_EQ(Buffer(key), message.get_key());
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition());
EXPECT_EQ(0, message.get_error());
EXPECT_FALSE(message.get_error());
// NOTE: if this line fails, then you're using kafka 0.10+ and that's okay
EXPECT_FALSE(message.get_timestamp());
}
@@ -186,7 +187,7 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {
for (const auto& message : messages) {
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(1, payloads.erase(message.get_payload()));
EXPECT_EQ(0, message.get_error());
EXPECT_FALSE(message.get_error());
EXPECT_FALSE(message.get_key());
EXPECT_GE(message.get_partition(), 0);
EXPECT_LT(message.get_partition(), 3);
@@ -233,7 +234,7 @@ TEST_F(ProducerTest, Callbacks) {
EXPECT_EQ(Buffer(key), message.get_key());
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition());
EXPECT_EQ(0, message.get_error());
EXPECT_FALSE(message.get_error());
EXPECT_TRUE(deliver_report_called);
}