Invoke error callback if present instead of log callback (#93)

This commit is contained in:
Alex Damian
2018-06-20 12:11:24 -04:00
committed by Matias Fontanini
parent eb46b8808e
commit c5aca985b8
2 changed files with 16 additions and 10 deletions

View File

@@ -62,19 +62,22 @@ class KafkaHandleBase;
class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> { class CPPKAFKA_API Configuration : public ConfigurationBase<Configuration> {
public: public:
using DeliveryReportCallback = std::function<void(Producer& producer, const Message&)>; using DeliveryReportCallback = std::function<void(Producer& producer, const Message&)>;
using OffsetCommitCallback = std::function<void(Consumer& consumer, Error, using OffsetCommitCallback = std::function<void(Consumer& consumer,
Error error,
const TopicPartitionList& topic_partitions)>; const TopicPartitionList& topic_partitions)>;
using ErrorCallback = std::function<void(KafkaHandleBase& handle, int error, using ErrorCallback = std::function<void(KafkaHandleBase& handle,
int error,
const std::string& reason)>; const std::string& reason)>;
using ThrottleCallback = std::function<void(KafkaHandleBase& handle, using ThrottleCallback = std::function<void(KafkaHandleBase& handle,
const std::string& broker_name, const std::string& broker_name,
int32_t broker_id, int32_t broker_id,
std::chrono::milliseconds throttle_time)>; std::chrono::milliseconds throttle_time)>;
using LogCallback = std::function<void(KafkaHandleBase& handle, int level, using LogCallback = std::function<void(KafkaHandleBase& handle,
int level,
const std::string& facility, const std::string& facility,
const std::string& message)>; const std::string& message)>;
using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>; using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>;
using SocketCallback = std::function<int(int domain, int type, int protoco)>; using SocketCallback = std::function<int(int domain, int type, int protocol)>;
using ConfigurationBase<Configuration>::set; using ConfigurationBase<Configuration>::set;
using ConfigurationBase<Configuration>::get; using ConfigurationBase<Configuration>::get;

View File

@@ -79,16 +79,19 @@ Consumer::~Consumer() {
rebalance_error_callback_ = nullptr; rebalance_error_callback_ = nullptr;
close(); close();
} }
catch (const Exception& ex) { catch (const HandleException& ex) {
const char* library_name = "cppkafka";
ostringstream error_msg; ostringstream error_msg;
error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what(); error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what();
CallbackInvoker<Configuration::LogCallback> logger("log", get_configuration().get_log_callback(), nullptr); CallbackInvoker<Configuration::ErrorCallback> error_cb("error", get_configuration().get_error_callback(), this);
if (logger) { CallbackInvoker<Configuration::LogCallback> logger_cb("log", get_configuration().get_log_callback(), nullptr);
logger(*this, static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str()); if (error_cb) {
error_cb(*this, static_cast<int>(ex.get_error().get_error()), error_msg.str());
}
else if (logger_cb) {
logger_cb(*this, static_cast<int>(LogLevel::LOG_ERR), "cppkafka", error_msg.str());
} }
else { else {
rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str().c_str()); rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LOG_ERR), "cppkafka", error_msg.str().c_str());
} }
} }
} }