diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 9b73941..c089611 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -1,4 +1,4 @@
-link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread)
+link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread rt ssl crypto dl z)
 
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
 include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR})
diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h
index 61228fe..9e00932 100644
--- a/include/cppkafka/consumer.h
+++ b/include/cppkafka/consumer.h
@@ -38,6 +38,7 @@
 #include "queue.h"
 #include "macros.h"
 #include "error.h"
+#include "detail/callback_invoker.h"
 
 namespace cppkafka {
 
diff --git a/include/cppkafka/detail/callback_invoker.h b/include/cppkafka/detail/callback_invoker.h
new file mode 100644
index 0000000..8b2d8d5
--- /dev/null
+++ b/include/cppkafka/detail/callback_invoker.h
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2017, Matias Fontanini
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef CPPKAFKA_CALLBACK_INVOKER_H
+#define CPPKAFKA_CALLBACK_INVOKER_H
+
+#include <sstream>
+#include <functional>
+#include "../logging.h"
+#include "../kafka_handle_base.h"
+
+namespace cppkafka {
+
+// Error values
+template <typename T>
+T error_value() { return T{}; }
+
+template<> inline
+void error_value<void>() {};
+
+template<> inline
+bool error_value<bool>() { return false; }
+
+template<> inline
+int error_value<int>() { return -1; }
+
+/**
+ * \brief Wraps an std::function object and runs it while preventing all exceptions from escaping
+ * \tparam Func An std::function object
+ */
+template <typename Func>
+class CallbackInvoker
+{
+public:
+    using RetType = typename Func::result_type;
+    using LogCallback = std::function<void(KafkaHandleBase& handle,
+                                           int level,
+                                           const std::string& facility,
+                                           const std::string& message)>;
+    CallbackInvoker(const char* callback_name,
+                    const Func& callback,
+                    KafkaHandleBase* handle)
+    : callback_name_(callback_name),
+      callback_(callback),
+      handle_(handle) {
+    }
+
+    explicit operator bool() const {
+        return (bool)callback_;
+    }
+
+    template <typename ...Args>
+    RetType operator()(Args&&... args) const {
+        static const char* library_name = "cppkafka";
+        std::ostringstream error_msg;
+        try {
+            if (callback_) {
+                return callback_(std::forward<Args>(args)...);
+            }
+            return error_value<RetType>();
+        }
+        catch (const std::exception& ex) {
+            if (handle_) {
+                error_msg << "Caught exception in " << callback_name_ << " callback: " << ex.what();
+            }
+        }
+        catch (...) {
+            if (handle_) {
+                error_msg << "Caught unknown exception in " << callback_name_ << " callback";
+            }
+        }
+        // Log error
+        if (handle_) {
+            if (handle_->get_configuration().get_log_callback()) {
+                try {
+                    // Log it
+                    handle_->get_configuration().get_log_callback()(*handle_,
+                                                                    static_cast<int>(LogLevel::LOG_ERR),
+                                                                    library_name,
+                                                                    error_msg.str());
+                }
+                catch (...) {} // sink everything
+            }
+            else {
+                rd_kafka_log_print(handle_->get_handle(),
+                                   static_cast<int>(LogLevel::LOG_ERR),
+                                   library_name,
+                                   error_msg.str().c_str());
+            }
+        }
+        return error_value<RetType>();
+    }
+private:
+    const char* callback_name_;
+    const Func& callback_;
+    KafkaHandleBase* handle_;
+};
+
+}
+
+#endif
diff --git a/include/cppkafka/utils/backoff_committer.h b/include/cppkafka/utils/backoff_committer.h
index 6c25f62..907dd60 100644
--- a/include/cppkafka/utils/backoff_committer.h
+++ b/include/cppkafka/utils/backoff_committer.h
@@ -33,6 +33,7 @@
 #include <chrono>
 #include <functional>
 #include <thread>
+#include <sstream>
 #include "../consumer.h"
 #include "backoff_performer.h"
@@ -118,6 +119,7 @@ public:
      */
     void commit(const TopicPartitionList& topic_partitions);
 private:
+    // Return true to abort and false to continue committing
     template <typename T>
     bool do_commit(const T& object) {
         try {
@@ -131,13 +133,11 @@ private:
             if (ex.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET) {
                 return true;
             }
-            // If there's a callback and it returns false for this message, abort
-            if (callback_ && !callback_(ex.get_error())) {
-                return true;
-            }
+            // If there's a callback and it returns false for this message, abort.
+            // Otherwise keep committing.
+            CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
+            return callback && !callback(ex.get_error());
         }
-        // In any other case, we failed. Keep committing
-        return false;
     }
 
     Consumer& consumer_;
diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index 8454a4c..50c8cc7 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -364,9 +364,9 @@ void BufferedProducer<BufferType>::flush() {
             produce_message(flush_queue.front());
         }
         catch (const HandleException& ex) {
-            if (flush_failure_callback_ &&
-                flush_failure_callback_(flush_queue.front(), ex.get_error())) {
-                // retry again later
+            // If we have a flush failure callback and it returns true, we retry producing this message later
+            CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
+            if (callback && callback(flush_queue.front(), ex.get_error())) {
                 do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false);
             }
         }
@@ -519,19 +519,18 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
     --pending_acks_;
     assert(pending_acks_ != (size_t)-1); // Prevent underflow
 
-    // We should produce this message again if it has an error and we either don't have a
-    // produce failure callback or we have one but it returns true
-    bool should_produce = message.get_error() &&
-                          (!produce_failure_callback_ || produce_failure_callback_(message));
-    if (should_produce) {
-        // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
-        do_add_message(Builder(message), MessagePriority::High, false);
+    if (message.get_error()) {
+        // We should produce this message again if we don't have a produce failure callback
+        // or we have one but it returns true
+        CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_);
+        if (!callback || callback(message)) {
+            // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
+            do_add_message(Builder(message), MessagePriority::High, false);
+        }
     }
     else {
         // Successful delivery
-        if (produce_success_callback_) {
-            produce_success_callback_(message);
-        }
+        CallbackInvoker<ProduceSuccessCallback>("delivery success", produce_success_callback_, &producer_)(message);
         // Increment the total successful transmissions
         ++total_messages_produced_;
     }
diff --git a/src/configuration.cpp b/src/configuration.cpp
index 8f574b1..92a81df 100644
--- a/src/configuration.cpp
+++ b/src/configuration.cpp
@@ -52,66 +52,56 @@ namespace cppkafka {
 void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
     Producer* handle = static_cast<Producer*>(opaque);
     Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
-    const auto& callback = handle->get_configuration().get_delivery_report_callback();
-    if (callback) {
-        callback(*handle, message);
-    }
+    CallbackInvoker<Configuration::DeliveryReportCallback>
+        ("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
+        (*handle, message);
 }
 
 void offset_commit_callback_proxy(rd_kafka_t*, rd_kafka_resp_err_t err,
                                   rd_kafka_topic_partition_list_t *offsets, void *opaque) {
     Consumer* handle = static_cast<Consumer*>(opaque);
     TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{};
-    const auto& callback = handle->get_configuration().get_offset_commit_callback();
-    if (callback) {
-        callback(*handle, err, list);
-    }
+    CallbackInvoker<Configuration::OffsetCommitCallback>
+        ("offset commit", handle->get_configuration().get_offset_commit_callback(), handle)
+        (*handle, err, list);
 }
 
 void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque) {
     KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
-    const auto& callback = handle->get_configuration().get_error_callback();
-    if (callback) {
-        callback(*handle, err, reason);
-    }
+    CallbackInvoker<Configuration::ErrorCallback>
+        ("error", handle->get_configuration().get_error_callback(), handle)
+        (*handle, err, reason);
 }
 
 void throttle_callback_proxy(rd_kafka_t*, const char* broker_name,
                              int32_t broker_id, int throttle_time_ms, void *opaque) {
     KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
-    const auto& callback = handle->get_configuration().get_throttle_callback();
-    if (callback) {
-        callback(*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
-    }
+    CallbackInvoker<Configuration::ThrottleCallback>
+        ("throttle", handle->get_configuration().get_throttle_callback(), handle)
+        (*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
 }
 
 void log_callback_proxy(const rd_kafka_t* h, int level,
                         const char* facility, const char* message) {
     KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(rd_kafka_opaque(h));
-    const auto& callback = handle->get_configuration().get_log_callback();
-    if (callback) {
-        callback(*handle, level, facility, message);
-    }
+    CallbackInvoker<Configuration::LogCallback>
+        ("log", handle->get_configuration().get_log_callback(), nullptr)
+        (*handle, level, facility, message);
 }
 
 int stats_callback_proxy(rd_kafka_t*, char *json, size_t json_len, void *opaque) {
     KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
-    const auto& callback = handle->get_configuration().get_stats_callback();
-    if (callback) {
-        callback(*handle, string(json, json + json_len));
-    }
+    CallbackInvoker<Configuration::StatsCallback>
+        ("statistics", handle->get_configuration().get_stats_callback(), handle)
+        (*handle, string(json, json + json_len));
     return 0;
 }
 
 int socket_callback_proxy(int domain, int type, int protocol, void* opaque) {
     KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
-    const auto& callback = handle->get_configuration().get_socket_callback();
-    if (callback) {
-        return callback(domain, type, protocol);
-    }
-    else {
-        return -1;
-    }
+    return CallbackInvoker<Configuration::SocketCallback>
+        ("socket", handle->get_configuration().get_socket_callback(), handle)
+        (domain, type, protocol);
 }
 
 // Configuration
diff --git a/src/consumer.cpp b/src/consumer.cpp
index ce84092..2243400 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -34,6 +34,7 @@
 #include "logging.h"
 #include "configuration.h"
 #include "topic_partition_list.h"
+#include "detail/callback_invoker.h"
 
 using std::vector;
 using std::string;
@@ -79,12 +80,12 @@ Consumer::~Consumer() {
         close();
     }
     catch (const Exception& ex) {
-        constexpr const char* library_name = "cppkafka";
+        const char* library_name = "cppkafka";
         ostringstream error_msg;
         error_msg << "Failed to close consumer [" << get_name() << "]: " << ex.what();
-        const auto& callback = get_configuration().get_log_callback();
-        if (callback) {
-            callback(*this, static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str());
+        CallbackInvoker<Configuration::LogCallback> logger("log", get_configuration().get_log_callback(), nullptr);
+        if (logger) {
+            logger(*this, static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str());
         }
         else {
             rd_kafka_log_print(get_handle(), static_cast<int>(LogLevel::LOG_ERR), library_name, error_msg.str().c_str());
@@ -292,21 +293,15 @@ void Consumer::commit(const TopicPartitionList* topic_partitions, bool async) {
 
 void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
                                 TopicPartitionList& topic_partitions) {
     if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
-        if (assignment_callback_) {
-            assignment_callback_(topic_partitions);
-        }
+        CallbackInvoker<AssignmentCallback>("assignment", assignment_callback_, this)(topic_partitions);
         assign(topic_partitions);
     }
     else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
-        if (revocation_callback_) {
-            revocation_callback_(topic_partitions);
-        }
+        CallbackInvoker<RevocationCallback>("revocation", revocation_callback_, this)(topic_partitions);
         unassign();
     }
     else {
-        if (rebalance_error_callback_) {
-            rebalance_error_callback_(error);
-        }
+        CallbackInvoker<RebalanceErrorCallback>("rebalance error", rebalance_error_callback_, this)(error);
         unassign();
     }
 }
diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp
index 9db0a01..35512f2 100644
--- a/src/topic_configuration.cpp
+++ b/src/topic_configuration.cpp
@@ -33,6 +33,7 @@
 #include "exceptions.h"
 #include "topic.h"
 #include "buffer.h"
+#include "detail/callback_invoker.h"
 
 using std::string;
 using std::map;
@@ -49,7 +50,8 @@ int32_t partitioner_callback_proxy(const rd_kafka_topic_t* handle, const void *k
     if (callback) {
         Topic topic = Topic::make_non_owning(const_cast<rd_kafka_topic_t*>(handle));
         Buffer key(static_cast<const char*>(key_ptr), key_size);
-        return callback(topic, key, partition_count);
+        return CallbackInvoker<TopicConfiguration::PartitionerCallback>("topic partitioner", callback, nullptr)
+            (topic, key, partition_count);
     }
     else {
         return rd_kafka_msg_partitioner_consistent_random(handle, key_ptr, key_size,
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 01e06ed..c3234d0 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -23,6 +23,6 @@ add_executable(
     # Main file
     test_main.cpp
 )
-target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread)
+target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread rt ssl crypto dl z)
 add_dependencies(tests cppkafka_tests)
 add_test(cppkafka cppkafka_tests)