Add partitioner callback to topic configuration

This commit is contained in:
Matias Fontanini
2016-06-04 19:15:32 -07:00
parent 8e37440f58
commit 4fccf277e0
7 changed files with 102 additions and 9 deletions

View File

@@ -4,16 +4,18 @@
#include <string>
#include <memory>
#include <chrono>
#include <unordered_map>
#include <mutex>
#include <librdkafka/rdkafka.h>
#include "metadata.h"
#include "topic_partition.h"
#include "topic_partition_list.h"
#include "topic_configuration.h"
#include "configuration.h"
namespace cppkafka {
class Topic;
class TopicConfiguration;
class KafkaHandleBase {
public:
@@ -46,13 +48,17 @@ private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
using HandlePtr = std::unique_ptr<rd_kafka_t, decltype(&rd_kafka_destroy)>;
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
Metadata get_metadata(rd_kafka_topic_t* topic_ptr);
void save_topic_config(const std::string& topic_name, TopicConfiguration config);
HandlePtr handle_;
std::chrono::milliseconds timeout_ms_;
Configuration config_;
TopicConfigurationMap topic_configurations_;
std::mutex topic_configurations_mutex_;
};
} // cppkafka

View File

@@ -10,14 +10,19 @@ namespace cppkafka {
class Topic {
public:
static Topic make_non_owning(rd_kafka_topic_t* handle);
Topic(rd_kafka_topic_t* handle);
std::string get_name() const;
rd_kafka_topic_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<rd_kafka_topic_t, decltype(&rd_kafka_topic_destroy)>;
struct NonOwningTag { };
Topic(rd_kafka_topic_t* handle, NonOwningTag);
HandlePtr handle_;
};

View File

@@ -2,26 +2,40 @@
#define CPPKAFKA_TOPIC_CONFIGURATION_H
#include <string>
#include <functional>
#include <librdkafka/rdkafka.h>
#include "clonable_ptr.h"
namespace cppkafka {
class Topic;
class Buffer;
class TopicConfiguration {
public:
using PartitionerCallback = std::function<int32_t(const Topic&, const Buffer& key,
int32_t partition_count)>;
TopicConfiguration();
void set(const std::string& name, const std::string& value);
void set_partitioner_callback(PartitionerCallback callback);
void set_as_opaque();
const PartitionerCallback& get_partitioner_callback() const;
rd_kafka_topic_conf_t* get_handle() const;
private:
using HandlePtr = ClonablePtr<rd_kafka_topic_conf_t, decltype(&rd_kafka_topic_conf_destroy),
using HandlePtr = ClonablePtr<rd_kafka_topic_conf_t,
decltype(&rd_kafka_topic_conf_destroy),
decltype(&rd_kafka_topic_conf_dup)>;
TopicConfiguration(rd_kafka_topic_conf_t* ptr);
static HandlePtr make_handle(rd_kafka_topic_conf_t* ptr);
HandlePtr handle_;
PartitionerCallback partitioner_callback_;
};
} // cppkafka

View File

@@ -1,12 +1,13 @@
#include "kafka_handle_base.h"
#include "exceptions.h"
#include "topic_configuration.h"
#include "topic.h"
#include "topic_partition_list.h"
using std::string;
using std::vector;
using std::move;
using std::lock_guard;
using std::mutex;
using std::chrono::milliseconds;
namespace cppkafka {
@@ -46,11 +47,14 @@ rd_kafka_t* KafkaHandleBase::get_handle() {
}
Topic KafkaHandleBase::get_topic(const string& name) {
save_topic_config(name, TopicConfiguration{});
return get_topic(name, nullptr);
}
Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration topicConfig) {
return get_topic(name, topicConfig.get_handle());
Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config) {
auto handle = config.get_handle();
save_topic_config(name, move(config));
return get_topic(name, rd_kafka_topic_conf_dup(handle));
}
Metadata KafkaHandleBase::get_metadata() {
@@ -89,6 +93,12 @@ Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) {
return Metadata(metadata);
}
void KafkaHandleBase::save_topic_config(const string& topic_name, TopicConfiguration config) {
lock_guard<mutex> _(topic_configurations_mutex_);
auto iter = topic_configurations_.emplace(topic_name, move(config)).first;
iter->second.set_as_opaque();
}
void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) {
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw HandleException(error);

View File

@@ -5,11 +5,24 @@ using std::string;
namespace cppkafka {
void dummy_topic_destroyer(rd_kafka_topic_t*) {
}
Topic Topic::make_non_owning(rd_kafka_topic_t* handle) {
return Topic(handle, NonOwningTag{});
}
Topic::Topic(rd_kafka_topic_t* handle)
: handle_(handle, &rd_kafka_topic_destroy) {
}
Topic::Topic(rd_kafka_topic_t* handle, NonOwningTag)
: handle_(handle, &dummy_topic_destroyer) {
}
string Topic::get_name() const {
return rd_kafka_topic_name(handle_.get());
}

View File

@@ -1,11 +1,30 @@
#include "topic_configuration.h"
#include <librdkafka/rdkafka.h>
#include "exceptions.h"
#include "topic.h"
#include "buffer.h"
using std::string;
namespace cppkafka {
int32_t partitioner_callback_proxy(const rd_kafka_topic_t* handle, const void *key_ptr,
size_t key_size, int32_t partition_count,
void* topic_opaque, void* message_opaque) {
const TopicConfiguration* config = static_cast<TopicConfiguration*>(topic_opaque);
const auto& callback = config->get_partitioner_callback();
if (callback) {
Topic topic = Topic::make_non_owning(const_cast<rd_kafka_topic_t*>(handle));
Buffer key(static_cast<const char*>(key_ptr), key_size);
return callback(topic, key, partition_count);
}
else {
return rd_kafka_msg_partitioner_consistent_random(handle, key_ptr, key_size,
partition_count, topic_opaque,
message_opaque);
}
}
TopicConfiguration::TopicConfiguration()
: handle_(make_handle(rd_kafka_topic_conf_new())) {
@@ -26,6 +45,20 @@ void TopicConfiguration::set(const string& name, const string& value) {
}
}
void TopicConfiguration::set_partitioner_callback(PartitionerCallback callback) {
partitioner_callback_ = move(callback);
rd_kafka_topic_conf_set_partitioner_cb(handle_.get(), &partitioner_callback_proxy);
}
void TopicConfiguration::set_as_opaque() {
rd_kafka_topic_conf_set_opaque(handle_.get(), this);
}
const TopicConfiguration::PartitionerCallback&
TopicConfiguration::get_partitioner_callback() const {
return partitioner_callback_;
}
rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const {
return handle_.get();
}

View File

@@ -197,15 +197,27 @@ TEST_F(ProducerTest, Callbacks) {
// Now create a producer and produce a message
string payload = "Hello world!";
string key = "hehe";
bool deliver_report_called = false;
Configuration config = make_producer_config();
config.set_delivery_report_callback([&](Producer&, const Message& msg) {
EXPECT_EQ(payload, msg.get_payload().as_string());
deliver_report_called = true;
});
TopicConfiguration topic_config;
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
int32_t partition_count) {
EXPECT_EQ(key, msg_key.as_string());
EXPECT_EQ(3, partition_count);
EXPECT_EQ(KAFKA_TOPIC, topic.get_name());
return 0;
});
Producer producer(move(config));
Topic topic = producer.get_topic(KAFKA_TOPIC);
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
Topic topic = producer.get_topic(KAFKA_TOPIC, topic_config);
producer.produce(topic, {}, Buffer(payload.data(), payload.size()),
Buffer(key.data(), key.size()));
producer.poll();
runner.try_join();
@@ -213,7 +225,7 @@ TEST_F(ProducerTest, Callbacks) {
ASSERT_EQ(1, messages.size());
const auto& message = messages[0];
EXPECT_EQ(payload, message.get_payload().as_string());
EXPECT_EQ("", message.get_key().as_string());
EXPECT_EQ(key, message.get_key().as_string());
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition());
EXPECT_EQ(0, message.get_error());