diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index 5bc218c..793a4aa 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include diff --git a/include/cppkafka/logging.h b/include/cppkafka/logging.h new file mode 100644 index 0000000..7d9737b --- /dev/null +++ b/include/cppkafka/logging.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_LOGGING_H +#define CPPKAFKA_LOGGING_H + +namespace cppkafka { + +// Based on syslog.h levels +enum class LogLevel : int { + LOG_EMERG = 0, /* system is unusable */ + LOG_ALERT = 1, /* action must be taken immediately */ + LOG_CRIT = 2, /* critical conditions */ + LOG_ERR = 3, /* error conditions */ + LOG_WARNING = 4, /* warning conditions */ + LOG_NOTICE = 5, /* normal but significant condition */ + LOG_INFO = 6, /* informational */ + LOG_DEBUG = 7 /* debug-level messages */ +}; + +} //cppkafka + +#endif //CPPKAFKA_LOGGING_H diff --git a/src/consumer.cpp b/src/consumer.cpp index 9333e7b..66f1803 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -26,9 +26,10 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ - +#include #include "consumer.h" #include "exceptions.h" +#include "logging.h" #include "configuration.h" #include "topic_partition_list.h" @@ -36,7 +37,7 @@ using std::vector; using std::string; using std::move; using std::make_tuple; - +using std::ostringstream; using std::chrono::milliseconds; namespace cppkafka { @@ -73,9 +74,17 @@ Consumer::~Consumer() { rebalance_error_callback_ = nullptr; close(); } - catch (const Exception&) { - // If close throws just silently ignore until there's some - // logging facility (if any) + catch (const Exception& ex) { + constexpr const char* library_name = "cppkafka"; + ostringstream error_msg; + error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what(); + const auto& callback = get_configuration().get_log_callback(); + if (callback) { + callback(*this, static_cast(LogLevel::LOG_ERR), library_name, error_msg.str()); + } + else { + rd_kafka_log_print(get_handle(), static_cast(LogLevel::LOG_ERR), library_name, error_msg.str().c_str()); + } } }