diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 7617ac7..9a622dc 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "topic_partition_list.h" #include "clonable_ptr.h" @@ -11,22 +12,46 @@ namespace cppkafka { class Message; +class Producer; +class Consumer; +class KafkaHandleBase; class Configuration { public: - using DeliveryReportCallback = std::function; - using OffsetCommitCallback = std::function; + using OffsetCommitCallback = std::function; + using ErrorCallback = std::function; + using ThrottleCallback = std::function; + using LogCallback = std::function; + using StatsCallback = std::function; + using SocketCallback = std::function; Configuration(); void set(const std::string& name, const std::string& value); void set_delivery_report_callback(DeliveryReportCallback callback); void set_offset_commit_callback(OffsetCommitCallback callback); + void set_error_callback(ErrorCallback callback); + void set_throttle_callback(ThrottleCallback callback); + void set_log_callback(LogCallback callback); + void set_stats_callback(StatsCallback callback); + void set_socket_callback(SocketCallback callback); rd_kafka_conf_t* get_handle() const; const DeliveryReportCallback& get_delivery_report_callback() const; const OffsetCommitCallback& get_offset_commit_callback() const; + const ErrorCallback& get_error_callback() const; + const ThrottleCallback& get_throttle_callback() const; + const LogCallback& get_log_callback() const; + const StatsCallback& get_stats_callback() const; + const SocketCallback& get_socket_callback() const; private: using HandlePtr = ClonablePtr; @@ -37,6 +62,11 @@ private: HandlePtr handle_; DeliveryReportCallback delivery_report_callback_; OffsetCommitCallback offset_commit_callback_; + ErrorCallback error_callback_; + ThrottleCallback throttle_callback_; + LogCallback log_callback_; + StatsCallback stats_callback_; + SocketCallback socket_callback_; }; } // cppkafka diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index e7f9325..e279974 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -7,7 +7,6 @@ #include #include "kafka_handle_base.h" #include "message.h" -#include "configuration.h" namespace cppkafka { @@ -46,7 +45,6 @@ public: TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions); TopicPartitionList get_subscription(); TopicPartitionList get_assignment(); - const Configuration& get_configuration() const; Message poll(); private: @@ -60,7 +58,6 @@ private: AssignmentCallback assignment_callback_; RevocationCallback revocation_callback_; RebalanceErrorCallback rebalance_error_callback_; - Configuration config_; }; } // cppkafka diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index d3f9aa7..fd304aa 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -8,6 +8,7 @@ #include "metadata.h" #include "topic_partition.h" #include "topic_partition_list.h" +#include "configuration.h" namespace cppkafka { @@ -33,12 +34,14 @@ public: Metadata get_metadata(); Metadata get_metadata(const Topic& topic); std::chrono::milliseconds get_timeout() const; + const Configuration& get_configuration() const; protected: - KafkaHandleBase(); + KafkaHandleBase(Configuration config); KafkaHandleBase(rd_kafka_t* handle); void set_handle(rd_kafka_t* handle); void check_error(rd_kafka_resp_err_t error); + rd_kafka_conf_t* get_configuration_handle(); private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; @@ -49,6 +52,7 @@ private: HandlePtr handle_; std::chrono::milliseconds timeout_ms_; + Configuration config_; }; } // cppkafka diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 9e2a1d9..f7be1c6 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -27,8 +27,6 @@ public: void set_payload_policy(PayloadPolicy policy); PayloadPolicy get_payload_policy() const; - const Configuration& get_configuration() const; - void produce(const Topic& topic, const Partition& partition, const Buffer& payload); void produce(const Topic& topic, const Partition& partition, const Buffer& payload, const Buffer& key); @@ -37,7 +35,6 @@ public: int poll(); private: - Configuration config_; PayloadPolicy message_payload_policy_; }; diff --git a/src/configuration.cpp b/src/configuration.cpp index 0057e25..71228b4 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -7,28 +7,74 @@ using std::string; +using std::chrono::milliseconds; + namespace cppkafka { // Callback proxies -void delivery_report_proxy(rd_kafka_t *rk, const rd_kafka_message_t* msg, void *opaque) { - const Producer* producer = static_cast(opaque); +void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) { + Producer* handle = static_cast(opaque); Message message = Message::make_non_owning((rd_kafka_message_t*)msg); - const auto& callback = producer->get_configuration().get_delivery_report_callback(); + const auto& callback = handle->get_configuration().get_delivery_report_callback(); if (callback) { - callback(message); + callback(*handle, message); } } -void offset_commit_proxy(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *offsets, - void *opaque) { - const Consumer* consumer = static_cast(opaque); +void offset_commit_callback_proxy(rd_kafka_t*, rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *offsets, void *opaque) { + Consumer* handle = static_cast(opaque); TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{}; - const auto& callback = consumer->get_configuration().get_offset_commit_callback(); + const auto& callback = handle->get_configuration().get_offset_commit_callback(); if (callback) { - callback(err, list); + callback(*handle, err, list); + } +} + +void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque) { + KafkaHandleBase* handle = static_cast(opaque); + const auto& callback = handle->get_configuration().get_error_callback(); + if (callback) { + callback(*handle, err, reason); + } +} + +void throttle_callback_proxy(rd_kafka_t*, const char* broker_name, + int32_t broker_id, int throttle_time_ms, void *opaque) { + KafkaHandleBase* handle = static_cast(opaque); + const auto& callback = handle->get_configuration().get_throttle_callback(); + if (callback) { + callback(*handle, broker_name, broker_id, milliseconds(throttle_time_ms)); + } +} + +void log_callback_proxy(const rd_kafka_t* h, int level, + const char* facility, const char* message) { + KafkaHandleBase* handle = static_cast(rd_kafka_opaque(h)); + const auto& callback = handle->get_configuration().get_log_callback(); + if (callback) { + callback(*handle, level, facility, message); + } +} + +int stats_callback_proxy(rd_kafka_t*, char *json, size_t json_len, void *opaque) { + KafkaHandleBase* handle = static_cast(opaque); + const auto& callback = handle->get_configuration().get_stats_callback(); + if (callback) { + callback(*handle, string(json, json + json_len)); + } + return 0; +} + +int socket_callback_proxy(int domain, int type, int protocol, void* opaque) { + KafkaHandleBase* handle = static_cast(opaque); + const auto& callback = handle->get_configuration().get_socket_callback(); + if (callback) { + return callback(domain, type, protocol); + } + else { + return -1; } } @@ -56,12 +102,37 @@ void Configuration::set(const string& name, const string& value) { void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) { delivery_report_callback_ = move(callback); - rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_proxy); + rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_callback_proxy); } void Configuration::set_offset_commit_callback(OffsetCommitCallback callback) { offset_commit_callback_ = move(callback); - rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_proxy); + rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_callback_proxy); +} + +void Configuration::set_error_callback(ErrorCallback callback) { + error_callback_ = move(callback); + rd_kafka_conf_set_error_cb(handle_.get(), &error_callback_proxy); +} + +void Configuration::set_throttle_callback(ThrottleCallback callback) { + throttle_callback_ = move(callback); + rd_kafka_conf_set_throttle_cb(handle_.get(), &throttle_callback_proxy); +} + +void Configuration::set_log_callback(LogCallback callback) { + log_callback_ = move(callback); + rd_kafka_conf_set_log_cb(handle_.get(), &log_callback_proxy); +} + +void Configuration::set_stats_callback(StatsCallback callback) { + stats_callback_ = move(callback); + rd_kafka_conf_set_stats_cb(handle_.get(), &stats_callback_proxy); +} + +void Configuration::set_socket_callback(SocketCallback callback) { + socket_callback_ = move(callback); + rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy); } rd_kafka_conf_t* Configuration::get_handle() const { @@ -76,6 +147,26 @@ const Configuration::OffsetCommitCallback& Configuration::get_offset_commit_call return offset_commit_callback_; } +const Configuration::ErrorCallback& Configuration::get_error_callback() const { + return error_callback_; +} + +const Configuration::ThrottleCallback& Configuration::get_throttle_callback() const { + return throttle_callback_; +} + +const Configuration::LogCallback& Configuration::get_log_callback() const { + return log_callback_; +} + +const Configuration::StatsCallback& Configuration::get_stats_callback() const { + return stats_callback_; +} + +const Configuration::SocketCallback& Configuration::get_socket_callback() const { + return socket_callback_; +} + Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) { return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup); } diff --git a/src/consumer.cpp b/src/consumer.cpp index 9bfcd39..f140893 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -18,13 +18,14 @@ void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, } Consumer::Consumer(Configuration config) -: config_(move(config)) { +: KafkaHandleBase(move(config)) { char error_buffer[512]; + rd_kafka_conf_t* config_handle = get_configuration_handle(); // Set ourselves as the opaque pointer - rd_kafka_conf_set_opaque(config_.get_handle(), this); - rd_kafka_conf_set_rebalance_cb(config_.get_handle(), &Consumer::rebalance_proxy); + rd_kafka_conf_set_opaque(config_handle, this); + rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy); rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, - rd_kafka_conf_dup(config_.get_handle()), + rd_kafka_conf_dup(config_handle), error_buffer, sizeof(error_buffer)); if (!ptr) { throw Exception("Failed to create consumer handle: " + string(error_buffer)); @@ -126,10 +127,6 @@ TopicPartitionList Consumer::get_assignment() { return convert(make_handle(list)); } -const Configuration& Consumer::get_configuration() const { - return config_; -} - Message Consumer::poll() { rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), get_timeout().count()); diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 47d03b0..5eb3e75 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -6,14 +6,15 @@ using std::string; using std::vector; +using std::move; using std::chrono::milliseconds; namespace cppkafka { const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000}; -KafkaHandleBase::KafkaHandleBase() -: handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT) { +KafkaHandleBase::KafkaHandleBase(Configuration config) +: handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)) { } @@ -64,6 +65,10 @@ milliseconds KafkaHandleBase::get_timeout() const { return timeout_ms_; } +const Configuration& KafkaHandleBase::get_configuration() const { + return config_; +} + void KafkaHandleBase::set_handle(rd_kafka_t* handle) { handle_ = HandlePtr(handle, &rd_kafka_destroy); } @@ -90,4 +95,8 @@ void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) { } } +rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() { + return config_.get_handle(); +} + } // cppkafka diff --git a/src/producer.cpp b/src/producer.cpp index 62d212f..869ec53 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -8,11 +8,12 @@ using std::string; namespace cppkafka { Producer::Producer(Configuration config) -: config_(move(config)) { +: KafkaHandleBase(move(config)) { char error_buffer[512]; - rd_kafka_conf_set_opaque(config_.get_handle(), this); + auto config_handle = get_configuration().get_handle(); + rd_kafka_conf_set_opaque(config_handle, this); rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER, - rd_kafka_conf_dup(config_.get_handle()), + rd_kafka_conf_dup(config_handle), error_buffer, sizeof(error_buffer)); if (!ptr) { throw Exception("Failed to create producer handle: " + string(error_buffer)); @@ -30,10 +31,6 @@ Producer::PayloadPolicy Producer::get_payload_policy() const { return message_payload_policy_; } -const Configuration& Producer::get_configuration() const { - return config_; -} - void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) { produce(topic, partition, payload, Buffer{} /*key*/, nullptr /*user_data*/); } diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 38ed0c7..c65a5b3 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -189,7 +189,7 @@ TEST_F(ConsumerTest, OffsetCommit) { // Create a consumer and subscribe to the topic Configuration config = make_consumer_config("offset_commit"); - config.set_offset_commit_callback([&](rd_kafka_resp_err_t error, + config.set_offset_commit_callback([&](Consumer&, rd_kafka_resp_err_t error, const TopicPartitionList& topic_partitions) { offset_commit_called = true; EXPECT_EQ(0, error); diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index b981a61..95517d8 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -199,7 +199,7 @@ TEST_F(ProducerTest, Callbacks) { string payload = "Hello world!"; bool deliver_report_called = false; Configuration config = make_producer_config(); - config.set_delivery_report_callback([&](const Message& msg) { + config.set_delivery_report_callback([&](Producer&, const Message& msg) { EXPECT_EQ(payload, msg.get_payload().as_string()); deliver_report_called = true; });