Add config callbacks

Matias Fontanini
2016-06-04 18:16:21 -07:00
parent a74d46094f
commit 8e37440f58
10 changed files with 163 additions and 41 deletions

View File

@@ -4,6 +4,7 @@
#include <memory>
#include <string>
#include <functional>
#include <chrono>
#include <librdkafka/rdkafka.h>
#include "topic_partition_list.h"
#include "clonable_ptr.h"
@@ -11,22 +12,46 @@
namespace cppkafka {
class Message;
class Producer;
class Consumer;
class KafkaHandleBase;
class Configuration {
public:
using DeliveryReportCallback = std::function<void(const Message&)>;
using OffsetCommitCallback = std::function<void(rd_kafka_resp_err_t,
using DeliveryReportCallback = std::function<void(Producer& producer, const Message&)>;
using OffsetCommitCallback = std::function<void(Consumer& consumer, rd_kafka_resp_err_t,
const TopicPartitionList& topic_partitions)>;
using ErrorCallback = std::function<void(KafkaHandleBase& handle, int error,
const std::string& reason)>;
using ThrottleCallback = std::function<void(KafkaHandleBase& handle,
const std::string& broker_name,
int32_t broker_id,
std::chrono::milliseconds throttle_time)>;
using LogCallback = std::function<void(KafkaHandleBase& handle, int level,
const std::string& facility,
const std::string& message)>;
using StatsCallback = std::function<void(KafkaHandleBase& handle, const std::string& json)>;
using SocketCallback = std::function<int(int domain, int type, int protocol)>;
Configuration();
void set(const std::string& name, const std::string& value);
void set_delivery_report_callback(DeliveryReportCallback callback);
void set_offset_commit_callback(OffsetCommitCallback callback);
void set_error_callback(ErrorCallback callback);
void set_throttle_callback(ThrottleCallback callback);
void set_log_callback(LogCallback callback);
void set_stats_callback(StatsCallback callback);
void set_socket_callback(SocketCallback callback);
rd_kafka_conf_t* get_handle() const;
const DeliveryReportCallback& get_delivery_report_callback() const;
const OffsetCommitCallback& get_offset_commit_callback() const;
const ErrorCallback& get_error_callback() const;
const ThrottleCallback& get_throttle_callback() const;
const LogCallback& get_log_callback() const;
const StatsCallback& get_stats_callback() const;
const SocketCallback& get_socket_callback() const;
private:
using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
decltype(&rd_kafka_conf_dup)>;
@@ -37,6 +62,11 @@ private:
HandlePtr handle_;
DeliveryReportCallback delivery_report_callback_;
OffsetCommitCallback offset_commit_callback_;
ErrorCallback error_callback_;
ThrottleCallback throttle_callback_;
LogCallback log_callback_;
StatsCallback stats_callback_;
SocketCallback socket_callback_;
};
} // cppkafka
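
The handle-level callbacks added here (error, throttle, log, stats, socket) all receive the KafkaHandleBase that triggered them. A minimal usage sketch based on the declarations above; the include path, broker address and statistics interval are assumptions for illustration, not part of this commit:

#include <cstdint>
#include <iostream>
#include <cppkafka/configuration.h>

using namespace cppkafka;

Configuration make_callback_config() {
    Configuration config;
    config.set("metadata.broker.list", "localhost:9092"); // placeholder broker
    config.set("statistics.interval.ms", "1000");         // enable periodic stats emission

    config.set_error_callback([](KafkaHandleBase&, int error, const std::string& reason) {
        std::cerr << "error " << error << ": " << reason << std::endl;
    });
    config.set_throttle_callback([](KafkaHandleBase&, const std::string& broker_name,
                                    int32_t broker_id, std::chrono::milliseconds throttle_time) {
        std::cerr << "throttled for " << throttle_time.count() << "ms by broker "
                  << broker_name << " (" << broker_id << ")" << std::endl;
    });
    config.set_log_callback([](KafkaHandleBase&, int level, const std::string& facility,
                               const std::string& message) {
        std::clog << level << " [" << facility << "] " << message << std::endl;
    });
    config.set_stats_callback([](KafkaHandleBase&, const std::string& json) {
        // json holds the statistics blob emitted by librdkafka
    });
    return config;
}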

View File

@@ -7,7 +7,6 @@
#include <functional>
#include "kafka_handle_base.h"
#include "message.h"
#include "configuration.h"
namespace cppkafka {
@@ -46,7 +45,6 @@ public:
TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions);
TopicPartitionList get_subscription();
TopicPartitionList get_assignment();
const Configuration& get_configuration() const;
Message poll();
private:
@@ -60,7 +58,6 @@ private:
AssignmentCallback assignment_callback_;
RevocationCallback revocation_callback_;
RebalanceErrorCallback rebalance_error_callback_;
Configuration config_;
};
} // cppkafka

View File

@@ -8,6 +8,7 @@
#include "metadata.h"
#include "topic_partition.h"
#include "topic_partition_list.h"
#include "configuration.h"
namespace cppkafka {
@@ -33,12 +34,14 @@ public:
Metadata get_metadata();
Metadata get_metadata(const Topic& topic);
std::chrono::milliseconds get_timeout() const;
const Configuration& get_configuration() const;
protected:
KafkaHandleBase();
KafkaHandleBase(Configuration config);
KafkaHandleBase(rd_kafka_t* handle);
void set_handle(rd_kafka_t* handle);
void check_error(rd_kafka_resp_err_t error);
rd_kafka_conf_t* get_configuration_handle();
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
@@ -49,6 +52,7 @@ private:
HandlePtr handle_;
std::chrono::milliseconds timeout_ms_;
Configuration config_;
};
} // cppkafka

View File

@@ -27,8 +27,6 @@ public:
void set_payload_policy(PayloadPolicy policy);
PayloadPolicy get_payload_policy() const;
const Configuration& get_configuration() const;
void produce(const Topic& topic, const Partition& partition, const Buffer& payload);
void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key);
@@ -37,7 +35,6 @@ public:
int poll();
private:
Configuration config_;
PayloadPolicy message_payload_policy_;
};

View File

@@ -7,28 +7,74 @@
using std::string;
using std::chrono::milliseconds;
namespace cppkafka {
// Callback proxies
void delivery_report_proxy(rd_kafka_t *rk, const rd_kafka_message_t* msg, void *opaque) {
const Producer* producer = static_cast<const Producer*>(opaque);
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
Producer* handle = static_cast<Producer*>(opaque);
Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
const auto& callback = producer->get_configuration().get_delivery_report_callback();
const auto& callback = handle->get_configuration().get_delivery_report_callback();
if (callback) {
callback(message);
callback(*handle, message);
}
}
void offset_commit_proxy(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets,
void *opaque) {
const Consumer* consumer = static_cast<const Consumer*>(opaque);
void offset_commit_callback_proxy(rd_kafka_t*, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *offsets, void *opaque) {
Consumer* handle = static_cast<Consumer*>(opaque);
TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{};
const auto& callback = consumer->get_configuration().get_offset_commit_callback();
const auto& callback = handle->get_configuration().get_offset_commit_callback();
if (callback) {
callback(err, list);
callback(*handle, err, list);
}
}
void error_callback_proxy(rd_kafka_t*, int err, const char *reason, void *opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
const auto& callback = handle->get_configuration().get_error_callback();
if (callback) {
callback(*handle, err, reason);
}
}
void throttle_callback_proxy(rd_kafka_t*, const char* broker_name,
int32_t broker_id, int throttle_time_ms, void *opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
const auto& callback = handle->get_configuration().get_throttle_callback();
if (callback) {
callback(*handle, broker_name, broker_id, milliseconds(throttle_time_ms));
}
}
void log_callback_proxy(const rd_kafka_t* h, int level,
const char* facility, const char* message) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(rd_kafka_opaque(h));
const auto& callback = handle->get_configuration().get_log_callback();
if (callback) {
callback(*handle, level, facility, message);
}
}
int stats_callback_proxy(rd_kafka_t*, char *json, size_t json_len, void *opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
const auto& callback = handle->get_configuration().get_stats_callback();
if (callback) {
callback(*handle, string(json, json + json_len));
}
return 0;
}
int socket_callback_proxy(int domain, int type, int protocol, void* opaque) {
KafkaHandleBase* handle = static_cast<KafkaHandleBase*>(opaque);
const auto& callback = handle->get_configuration().get_socket_callback();
if (callback) {
return callback(domain, type, protocol);
}
else {
return -1;
}
}
@@ -56,12 +102,37 @@ void Configuration::set(const string& name, const string& value) {
void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) {
delivery_report_callback_ = move(callback);
rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_proxy);
rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_callback_proxy);
}
void Configuration::set_offset_commit_callback(OffsetCommitCallback callback) {
offset_commit_callback_ = move(callback);
rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_proxy);
rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_callback_proxy);
}
void Configuration::set_error_callback(ErrorCallback callback) {
error_callback_ = move(callback);
rd_kafka_conf_set_error_cb(handle_.get(), &error_callback_proxy);
}
void Configuration::set_throttle_callback(ThrottleCallback callback) {
throttle_callback_ = move(callback);
rd_kafka_conf_set_throttle_cb(handle_.get(), &throttle_callback_proxy);
}
void Configuration::set_log_callback(LogCallback callback) {
log_callback_ = move(callback);
rd_kafka_conf_set_log_cb(handle_.get(), &log_callback_proxy);
}
void Configuration::set_stats_callback(StatsCallback callback) {
stats_callback_ = move(callback);
rd_kafka_conf_set_stats_cb(handle_.get(), &stats_callback_proxy);
}
void Configuration::set_socket_callback(SocketCallback callback) {
socket_callback_ = move(callback);
rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy);
}
rd_kafka_conf_t* Configuration::get_handle() const {
@@ -76,6 +147,26 @@ const Configuration::OffsetCommitCallback& Configuration::get_offset_commit_call
return offset_commit_callback_;
}
const Configuration::ErrorCallback& Configuration::get_error_callback() const {
return error_callback_;
}
const Configuration::ThrottleCallback& Configuration::get_throttle_callback() const {
return throttle_callback_;
}
const Configuration::LogCallback& Configuration::get_log_callback() const {
return log_callback_;
}
const Configuration::StatsCallback& Configuration::get_stats_callback() const {
return stats_callback_;
}
const Configuration::SocketCallback& Configuration::get_socket_callback() const {
return socket_callback_;
}
Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) {
return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup);
}
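
Every proxy above follows the same pattern: the plain C callback recovers the cppkafka handle from librdkafka's opaque pointer and forwards to the std::function stored in its Configuration. A self-contained sketch of that void*-opaque bridging technique, with illustrative types rather than cppkafka's own:

#include <functional>
#include <iostream>
#include <string>

// Stand-in for the C library side, which only understands plain function
// pointers plus a user-supplied void* opaque.
using c_error_cb = void (*)(int error, const char* reason, void* opaque);

struct Handle {
    std::function<void(Handle&, int, const std::string&)> error_callback;
};

// The proxy registered with the C library: cast the opaque back to the C++
// handle and forward to the stored std::function, if one was set.
void error_proxy(int error, const char* reason, void* opaque) {
    Handle* handle = static_cast<Handle*>(opaque);
    if (handle->error_callback) {
        handle->error_callback(*handle, error, reason);
    }
}

int main() {
    Handle handle;
    handle.error_callback = [](Handle&, int error, const std::string& reason) {
        std::cerr << "error " << error << ": " << reason << std::endl;
    };
    // The C library would store (error_proxy, &handle) and call it later;
    // simulate that invocation directly here.
    c_error_cb cb = &error_proxy;
    cb(42, "simulated broker failure", &handle);
}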

View File

@@ -18,13 +18,14 @@ void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
}
Consumer::Consumer(Configuration config)
: config_(move(config)) {
: KafkaHandleBase(move(config)) {
char error_buffer[512];
rd_kafka_conf_t* config_handle = get_configuration_handle();
// Set ourselves as the opaque pointer
rd_kafka_conf_set_opaque(config_.get_handle(), this);
rd_kafka_conf_set_rebalance_cb(config_.get_handle(), &Consumer::rebalance_proxy);
rd_kafka_conf_set_opaque(config_handle, this);
rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy);
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER,
rd_kafka_conf_dup(config_.get_handle()),
rd_kafka_conf_dup(config_handle),
error_buffer, sizeof(error_buffer));
if (!ptr) {
throw Exception("Failed to create consumer handle: " + string(error_buffer));
@@ -126,10 +127,6 @@ TopicPartitionList Consumer::get_assignment() {
return convert(make_handle(list));
}
const Configuration& Consumer::get_configuration() const {
return config_;
}
Message Consumer::poll() {
rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(),
get_timeout().count());
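
Because the consumer now registers itself as the opaque pointer before rd_kafka_new(), an offset commit callback set on its Configuration gets that Consumer back as its first argument. A hedged sketch; the broker, group id, topic name and the subscribe() call are assumptions drawn from the surrounding API rather than this diff:

#include <cppkafka/configuration.h>
#include <cppkafka/consumer.h>

using namespace cppkafka;

int main() {
    Configuration config;
    config.set("metadata.broker.list", "localhost:9092"); // placeholder broker
    config.set("group.id", "example-group");              // placeholder consumer group

    // New signature: the committing Consumer is passed to the callback
    config.set_offset_commit_callback([](Consumer&, rd_kafka_resp_err_t error,
                                         const TopicPartitionList& topic_partitions) {
        if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
            // inspect topic_partitions / react to the failed commit
        }
    });

    Consumer consumer(config);               // stores itself as the opaque pointer
    consumer.subscribe({ "example_topic" }); // subscribe() assumed from the existing Consumer API
    while (true) {
        Message message = consumer.poll();
        // process message; commits will eventually trigger the callback above
    }
}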

View File

@@ -6,14 +6,15 @@
using std::string;
using std::vector;
using std::move;
using std::chrono::milliseconds;
namespace cppkafka {
const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000};
KafkaHandleBase::KafkaHandleBase()
: handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT) {
KafkaHandleBase::KafkaHandleBase(Configuration config)
: handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)) {
}
@@ -64,6 +65,10 @@ milliseconds KafkaHandleBase::get_timeout() const {
return timeout_ms_;
}
const Configuration& KafkaHandleBase::get_configuration() const {
return config_;
}
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
handle_ = HandlePtr(handle, &rd_kafka_destroy);
}
@@ -90,4 +95,8 @@ void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) {
}
}
rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() {
return config_.get_handle();
}
} // cppkafka

View File

@@ -8,11 +8,12 @@ using std::string;
namespace cppkafka {
Producer::Producer(Configuration config)
: config_(move(config)) {
: KafkaHandleBase(move(config)) {
char error_buffer[512];
rd_kafka_conf_set_opaque(config_.get_handle(), this);
auto config_handle = get_configuration().get_handle();
rd_kafka_conf_set_opaque(config_handle, this);
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER,
rd_kafka_conf_dup(config_.get_handle()),
rd_kafka_conf_dup(config_handle),
error_buffer, sizeof(error_buffer));
if (!ptr) {
throw Exception("Failed to create producer handle: " + string(error_buffer));
@@ -30,10 +31,6 @@ Producer::PayloadPolicy Producer::get_payload_policy() const {
return message_payload_policy_;
}
const Configuration& Producer::get_configuration() const {
return config_;
}
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) {
produce(topic, partition, payload, Buffer{} /*key*/, nullptr /*user_data*/);
}
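
The producer side mirrors the consumer: it passes its Configuration to KafkaHandleBase and sets itself as the opaque, so the delivery report callback now receives the Producer that sent the message. A hedged sketch using only the declarations shown above (broker address assumed; the produce() call is elided):

#include <cppkafka/configuration.h>
#include <cppkafka/producer.h>

using namespace cppkafka;

int main() {
    Configuration config;
    config.set("metadata.broker.list", "localhost:9092"); // placeholder broker

    // New signature: the Producer that sent the message comes along with it
    config.set_delivery_report_callback([](Producer&, const Message& msg) {
        // msg carries the payload and delivery status reported by librdkafka
    });

    Producer producer(config); // stores itself as the opaque pointer
    // ... produce messages through the produce() overloads declared above ...
    producer.poll();           // serves pending delivery reports to the callback
}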

View File

@@ -189,7 +189,7 @@ TEST_F(ConsumerTest, OffsetCommit) {
// Create a consumer and subscribe to the topic
Configuration config = make_consumer_config("offset_commit");
config.set_offset_commit_callback([&](rd_kafka_resp_err_t error,
config.set_offset_commit_callback([&](Consumer&, rd_kafka_resp_err_t error,
const TopicPartitionList& topic_partitions) {
offset_commit_called = true;
EXPECT_EQ(0, error);

View File

@@ -199,7 +199,7 @@ TEST_F(ProducerTest, Callbacks) {
string payload = "Hello world!";
bool deliver_report_called = false;
Configuration config = make_producer_config();
config.set_delivery_report_callback([&](const Message& msg) {
config.set_delivery_report_callback([&](Producer&, const Message& msg) {
EXPECT_EQ(payload, msg.get_payload().as_string());
deliver_report_called = true;
});