Add offset commit and delivery report callbacks

This commit is contained in:
Matias Fontanini
2016-05-31 07:26:31 -07:00
parent fd2d616506
commit 82393b558e
13 changed files with 236 additions and 34 deletions

View File

@@ -0,0 +1,39 @@
#ifndef CPPKAFKA_CLONABLE_PTR_H
#define CPPKAFKA_CLONABLE_PTR_H
#include <memory>
namespace cppkafka {
/**
 * Smart pointer that is copyable via a user supplied cloner functor.
 *
 * Ownership follows unique_ptr semantics: exactly one ClonablePtr owns the
 * pointee at any time. Copy construction/assignment deep-copies the pointee
 * by calling the cloner; move operations transfer ownership.
 *
 * T       - the pointed-to type
 * Deleter - callable that releases a T* (e.g. &rd_kafka_conf_destroy)
 * Cloner  - callable that duplicates a T*, returning a newly owned copy
 */
template <typename T, typename Deleter, typename Cloner>
class ClonablePtr {
public:
    /**
     * Takes ownership of ptr; deleter frees it, cloner duplicates it on copy.
     */
    ClonablePtr(T* ptr, const Deleter& deleter, const Cloner& cloner)
    : handle_(ptr, deleter), cloner_(cloner) {
    }

    /**
     * Copy constructor: deep-copies rhs's pointee using rhs's cloner and
     * deleter, and copies the cloner for use by future copies.
     */
    ClonablePtr(const ClonablePtr& rhs)
    : handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()),
      cloner_(rhs.cloner_) {
    }

    /**
     * Copy assignment: replaces the current pointee with a clone of rhs's
     * pointee and copies rhs's cloner. (The previous version omitted the
     * return statement — undefined behavior — and neither used nor copied
     * rhs's cloner.)
     */
    ClonablePtr& operator=(const ClonablePtr& rhs) {
        if (this != &rhs) {
            // Clone before reset so a throwing cloner leaves *this intact
            handle_.reset(rhs.cloner_(rhs.handle_.get()));
            cloner_ = rhs.cloner_;
        }
        return *this;
    }

    ClonablePtr(ClonablePtr&&) = default;
    ClonablePtr& operator=(ClonablePtr&&) = default;
    ~ClonablePtr() = default;

    /**
     * Returns the raw pointer; ownership is retained by this object.
     */
    T* get() const {
        return handle_.get();
    }
private:
    std::unique_ptr<T, Deleter> handle_;
    Cloner cloner_;
};
} // cppkafka
#endif // CPPKAFKA_CLONABLE_PTR_H

View File

@@ -3,28 +3,40 @@
#include <memory>
#include <string>
#include <functional>
#include <librdkafka/rdkafka.h>
#include "topic_partition_list.h"
#include "clonable_ptr.h"
namespace cppkafka {
class Message;
/**
 * Wrapper over an rd_kafka_conf_t handle. Each Configuration owns its own
 * handle; copies duplicate it via rd_kafka_conf_dup (see HandlePtr below).
 */
class Configuration {
public:
    // Invoked for each produced message once its delivery outcome is known
    using DeliveryReportCallback = std::function<void(const Message&)>;
    // Invoked when an offset commit completes; the error code reports its outcome
    using OffsetCommitCallback = std::function<void(rd_kafka_resp_err_t,
                                                    const TopicPartitionList& topic_partitions)>;

    Configuration();
    Configuration(const Configuration& rhs);
    Configuration(Configuration&& rhs) noexcept = default;
    Configuration& operator=(const Configuration& rhs);
    Configuration& operator=(Configuration&& rhs) noexcept = default;

    // Sets a librdkafka configuration attribute by name
    void set(const std::string& name, const std::string& value);
    // Stores the delivery report callback and installs its C-level proxy
    void set_delivery_report_callback(DeliveryReportCallback callback);
    // Stores the offset commit callback and installs its C-level proxy
    void set_offset_commit_callback(OffsetCommitCallback callback);

    // Returns the underlying handle; ownership is retained
    rd_kafka_conf_t* get_handle() const;
    const DeliveryReportCallback& get_delivery_report_callback() const;
    const OffsetCommitCallback& get_offset_commit_callback() const;
private:
    // ClonablePtr makes the handle copyable: rd_kafka_conf_dup clones it,
    // rd_kafka_conf_destroy frees it. (A stale duplicate unique_ptr alias
    // from before the ClonablePtr refactor has been removed.)
    using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
                                  decltype(&rd_kafka_conf_dup)>;

    Configuration(rd_kafka_conf_t* ptr);
    static HandlePtr make_handle(rd_kafka_conf_t* ptr);

    HandlePtr handle_;
    DeliveryReportCallback delivery_report_callback_;
    OffsetCommitCallback offset_commit_callback_;
};
} // cppkafka

View File

@@ -7,10 +7,10 @@
#include <functional>
#include "kafka_handle_base.h"
#include "message.h"
#include "configuration.h"
namespace cppkafka {
class Configuration;
class TopicConfiguration;
class Consumer : public KafkaHandleBase {
@@ -46,6 +46,7 @@ public:
TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions);
TopicPartitionList get_subscription();
TopicPartitionList get_assignment();
const Configuration& get_configuration() const;
Message poll();
private:
@@ -59,6 +60,7 @@ private:
AssignmentCallback assignment_callback_;
RevocationCallback revocation_callback_;
RebalanceErrorCallback rebalance_error_callback_;
Configuration config_;
};
} // cppkafka

View File

@@ -11,6 +11,8 @@ namespace cppkafka {
class Message {
public:
static Message make_non_owning(rd_kafka_message_t* handle);
Message();
Message(rd_kafka_message_t* handle);
Message(const Message&) = delete;
@@ -32,6 +34,11 @@ public:
private:
using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;
struct NonOwningTag { };
Message(rd_kafka_message_t* handle, NonOwningTag);
Message(HandlePtr handle);
HandlePtr handle_;
Buffer payload_;
Buffer key_;

View File

@@ -27,11 +27,15 @@ public:
void set_payload_policy(PayloadPolicy policy);
PayloadPolicy get_payload_policy() const;
const Configuration& get_configuration() const;
void produce(const Topic& topic, const Partition& partition, const Buffer& payload);
void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key);
void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key, void* user_data);
int poll();
private:
Configuration config_;
PayloadPolicy message_payload_policy_;

View File

@@ -15,6 +15,7 @@ using TopicPartitionList = std::vector<TopicPartition>;
TopicPartitionsListPtr convert(const std::vector<TopicPartition>& topic_partitions);
std::vector<TopicPartition> convert(const TopicPartitionsListPtr& topic_partitions);
std::vector<TopicPartition> convert(rd_kafka_topic_partition_list_t* topic_partitions);
TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);
} // cppkafka

View File

@@ -1,11 +1,39 @@
#include "configuration.h"
#include <librdkafka/rdkafka.h>
#include "exceptions.h"
#include "message.h"
#include "producer.h"
#include "consumer.h"
using std::string;
namespace cppkafka {
// Callback proxies
// Trampoline invoked by librdkafka for each delivery report. The opaque
// pointer is the Producer that owns the configuration (set via
// rd_kafka_conf_set_opaque in the Producer constructor); the user callback,
// if registered, receives a non-owning Message wrapper.
void delivery_report_proxy(rd_kafka_t *rk, const rd_kafka_message_t* msg, void *opaque) {
    const Producer* producer = static_cast<const Producer*>(opaque);
    // make_non_owning needs a mutable pointer but never destroys the message;
    // use an explicit const_cast rather than a C-style cast
    Message message = Message::make_non_owning(const_cast<rd_kafka_message_t*>(msg));
    const auto& callback = producer->get_configuration().get_delivery_report_callback();
    if (callback) {
        callback(message);
    }
}
// Trampoline invoked by librdkafka when an offset commit finishes. The
// opaque pointer is the Consumer whose configuration holds the user
// callback; the callback, if any, receives the error code and the list of
// committed partitions.
void offset_commit_proxy(rd_kafka_t *rk,
                         rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *offsets,
                         void *opaque) {
    const Consumer* consumer = static_cast<const Consumer*>(opaque);
    // A null offsets list is reported to the callback as an empty list
    TopicPartitionList list;
    if (offsets) {
        list = convert(offsets);
    }
    const auto& callback = consumer->get_configuration().get_offset_commit_callback();
    if (callback) {
        callback(err, list);
    }
}
// Configuration
Configuration::Configuration()
: handle_(make_handle(rd_kafka_conf_new())) {
@@ -16,16 +44,6 @@ Configuration::Configuration(rd_kafka_conf_t* ptr)
}
// Copy constructor: duplicates the librdkafka handle and copies the
// registered callbacks. (The previous version left the callbacks
// default-constructed, so copying a Configuration silently dropped them.)
Configuration::Configuration(const Configuration& rhs)
: handle_(make_handle(rd_kafka_conf_dup(rhs.handle_.get()))),
  delivery_report_callback_(rhs.delivery_report_callback_),
  offset_commit_callback_(rhs.offset_commit_callback_) {
}
// Copy assignment: replaces the handle with a duplicate of rhs's and copies
// the registered callbacks. (The previous version called handle_.reset(),
// which the ClonablePtr-based HandlePtr does not provide, and never copied
// the callbacks.)
Configuration& Configuration::operator=(const Configuration& rhs) {
    if (this != &rhs) {
        handle_ = make_handle(rd_kafka_conf_dup(rhs.handle_.get()));
        delivery_report_callback_ = rhs.delivery_report_callback_;
        offset_commit_callback_ = rhs.offset_commit_callback_;
    }
    return *this;
}
void Configuration::set(const string& name, const string& value) {
char error_buffer[512];
rd_kafka_conf_res_t result;
@@ -36,12 +54,30 @@ void Configuration::set(const string& name, const string& value) {
}
}
// Registers the C-level proxy with librdkafka and stores the user's
// delivery report callback for the proxy to invoke later.
void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) {
    rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_proxy);
    delivery_report_callback_ = std::move(callback);
}
// Registers the C-level proxy with librdkafka and stores the user's
// offset commit callback for the proxy to invoke later.
void Configuration::set_offset_commit_callback(OffsetCommitCallback callback) {
    rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_proxy);
    offset_commit_callback_ = std::move(callback);
}
// Returns the underlying rd_kafka_conf_t; the Configuration keeps ownership
rd_kafka_conf_t* Configuration::get_handle() const {
return handle_.get();
}
// Returns the stored delivery report callback (may be an empty function)
const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const {
return delivery_report_callback_;
}
// Returns the stored offset commit callback (may be an empty function)
const Configuration::OffsetCommitCallback& Configuration::get_offset_commit_callback() const {
return offset_commit_callback_;
}
// Wraps a raw config handle in a HandlePtr, wiring up destruction
// (rd_kafka_conf_destroy) and cloning (rd_kafka_conf_dup). The stale
// two-argument return statement left over from the pre-ClonablePtr version
// has been removed — two consecutive returns were present.
Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) {
    return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup);
}
} // cppkafka

View File

@@ -5,30 +5,26 @@
using std::vector;
using std::string;
using std::move;
using std::chrono::milliseconds;
namespace cppkafka {
// Intentional no-op deleter: lets a TopicPartitionsListPtr wrap a partition
// list it does not own, so the list is never freed by us
void dummy_topic_partition_list_deleter(rd_kafka_topic_partition_list_t*) {
}
// librdkafka rebalance trampoline: converts the raw partition list (owned
// by librdkafka, so no owning wrapper is built around it) and forwards the
// event to the Consumer stored in the opaque pointer. The leftover
// pre-change lines that built a dummy non-owning handle alongside the
// direct conversion (a duplicate definition of `list`) have been removed.
void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
                               rd_kafka_topic_partition_list_t *partitions, void *opaque) {
    TopicPartitionList list = convert(partitions);
    static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
}
Consumer::Consumer(Configuration config) {
Consumer::Consumer(Configuration config)
: config_(move(config)) {
char error_buffer[512];
// Set ourselves as the opaque pointer
rd_kafka_conf_set_opaque(config.get_handle(), this);
rd_kafka_conf_set_rebalance_cb(config.get_handle(), &Consumer::rebalance_proxy);
rd_kafka_conf_set_opaque(config_.get_handle(), this);
rd_kafka_conf_set_rebalance_cb(config_.get_handle(), &Consumer::rebalance_proxy);
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER,
rd_kafka_conf_dup(config.get_handle()),
rd_kafka_conf_dup(config_.get_handle()),
error_buffer, sizeof(error_buffer));
if (!ptr) {
throw Exception("Failed to create consumer handle: " + string(error_buffer));
@@ -130,6 +126,10 @@ TopicPartitionList Consumer::get_assignment() {
return convert(make_handle(list));
}
// Returns the configuration this consumer was constructed with
const Configuration& Consumer::get_configuration() const {
return config_;
}
Message Consumer::poll() {
rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(),
get_timeout().count());

View File

@@ -4,15 +4,33 @@ using std::string;
namespace cppkafka {
// Intentional no-op deleter used by the non-owning Message constructor so
// the wrapped rd_kafka_message_t is never destroyed by us
void dummy_deleter(rd_kafka_message_t*) {
}
// Factory for a Message that wraps handle WITHOUT taking ownership; the
// NonOwningTag overload installs a deleter that never frees the handle
Message Message::make_non_owning(rd_kafka_message_t* handle) {
return Message(handle, NonOwningTag());
}
// Default constructor: builds an empty Message holding a null handle (and
// a null deleter, which is never invoked for a null pointer)
Message::Message()
: handle_(nullptr, nullptr) {
}
// Owning constructor: delegates to the HandlePtr constructor using the real
// librdkafka destructor as the deleter. The stale pre-refactor initializer
// list (which initialized handle_/payload_/key_ directly and appeared
// alongside the delegation, making two initializer lists) has been removed.
Message::Message(rd_kafka_message_t* handle)
: Message(HandlePtr(handle, &rd_kafka_message_destroy)) {
}
// Non-owning constructor: wraps handle with the no-op deleter, so the
// caller retains ownership of the underlying message
Message::Message(rd_kafka_message_t* handle, NonOwningTag)
: Message(HandlePtr(handle, &dummy_deleter)) {
}
// Central constructor: takes ownership via the given HandlePtr and builds
// Buffer views over the message's payload and key. handle_ is declared
// before payload_/key_ in the class, so it is fully initialized (and safe
// to dereference) when the buffers are constructed.
Message::Message(HandlePtr handle)
: handle_(move(handle)),
payload_((const Buffer::DataType*)handle_->payload, handle_->len),
key_((const Buffer::DataType*)handle_->key, handle_->key_len) {
}

View File

@@ -10,6 +10,7 @@ namespace cppkafka {
Producer::Producer(Configuration config)
: config_(move(config)) {
char error_buffer[512];
rd_kafka_conf_set_opaque(config_.get_handle(), this);
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER,
rd_kafka_conf_dup(config_.get_handle()),
error_buffer, sizeof(error_buffer));
@@ -29,6 +30,10 @@ Producer::PayloadPolicy Producer::get_payload_policy() const {
return message_payload_policy_;
}
// Returns the configuration this producer was constructed with
const Configuration& Producer::get_configuration() const {
return config_;
}
// Convenience overload: produces with an empty key and no user data
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) {
produce(topic, partition, payload, Buffer{} /*key*/, nullptr /*user_data*/);
}
@@ -51,4 +56,8 @@ void Producer::produce(const Topic& topic, const Partition& partition, const Buf
}
}
// Runs rd_kafka_poll with the configured timeout so pending events
// (e.g. delivery reports) are dispatched to their callbacks; forwards
// rd_kafka_poll's return value
int Producer::poll() {
return rd_kafka_poll(get_handle(), get_timeout().count());
}
} // cppkafka

View File

@@ -20,6 +20,10 @@ TopicPartitionsListPtr convert(const vector<TopicPartition>& topic_partitions) {
}
// Converts an owned partition-list handle by delegating to the raw-pointer
// overload
vector<TopicPartition> convert(const TopicPartitionsListPtr& topic_partitions) {
return convert(topic_partitions.get());
}
vector<TopicPartition> convert(rd_kafka_topic_partition_list_t* topic_partitions) {
vector<TopicPartition> output;
for (int i = 0; i < topic_partitions->cnt; ++i) {
const auto& elem = topic_partitions->elems[i];

View File

@@ -44,7 +44,7 @@ public:
cond.notify_one();
}
}
else if (msg && msg.get_error() == 0) {
else if (msg && msg.get_error() == 0 && number_eofs == partitions) {
messages_.push_back(move(msg));
}
}
@@ -90,11 +90,11 @@ public:
return config;
}
// Builds a consumer Configuration pointing at the test broker. Auto commit
// is disabled so tests trigger commits explicitly; the group id defaults to
// "consumer_test" but can be overridden per test. The pre-change duplicate
// signature and hard-coded group.id lines left over from the diff have been
// removed.
Configuration make_consumer_config(const string& group_id = "consumer_test") {
    Configuration config;
    config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
    config.set("enable.auto.commit", "false");
    config.set("group.id", group_id);
    return config;
}
};
@@ -181,3 +181,40 @@ TEST_F(ConsumerTest, Rebalance) {
}
EXPECT_EQ(1, runner1.get_messages().size() + runner2.get_messages().size());
}
// Verifies that committing a consumed message's offset triggers the
// offset commit callback with the expected partition/offset information.
// Requires a live test broker (KAFKA_TEST_INSTANCE).
TEST_F(ConsumerTest, OffsetCommit) {
int partition = 0;
int64_t message_offset = 0;
bool offset_commit_called = false;
// Create a consumer and subscribe to the topic
Configuration config = make_consumer_config("offset_commit");
config.set_offset_commit_callback([&](rd_kafka_resp_err_t error,
const TopicPartitionList& topic_partitions) {
offset_commit_called = true;
EXPECT_EQ(0, error);
ASSERT_EQ(1, topic_partitions.size());
EXPECT_EQ(KAFKA_TOPIC, topic_partitions[0].get_topic());
EXPECT_EQ(0, topic_partitions[0].get_partition());
// The test expects the committed offset to be one past the consumed
// message's offset
EXPECT_EQ(message_offset + 1, topic_partitions[0].get_offset());
});
Consumer consumer(config);
consumer.assign({ { KAFKA_TOPIC, 0 } });
ConsumerRunner runner(consumer, 1, 1);
// Produce a message just so we stop the consumer
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world!";
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
runner.try_join();
ASSERT_EQ(1, runner.get_messages().size());
const Message& msg = runner.get_messages()[0];
message_offset = msg.get_offset();
consumer.commit(msg);
// Poll a bounded number of times to give the commit callback a chance to
// fire without hanging the test if it never does
for (size_t i = 0; i < 3 && !offset_commit_called; ++i) {
consumer.poll();
}
EXPECT_TRUE(offset_commit_called);
}

View File

@@ -186,3 +186,36 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {
EXPECT_LT(message.get_partition(), 3);
}
}
// Verifies that producing a message fires the delivery report callback and
// that the consumed message round-trips payload/key/topic/partition intact.
// Requires a live test broker (KAFKA_TEST_INSTANCE).
TEST_F(ProducerTest, Callbacks) {
int partition = 0;
// Create a consumer and assign this topic/partition
Consumer consumer(make_consumer_config());
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
ConsumerRunner runner(consumer, 1, 1);
// Now create a producer and produce a message
string payload = "Hello world!";
bool deliver_report_called = false;
Configuration config = make_producer_config();
config.set_delivery_report_callback([&](const Message& msg) {
EXPECT_EQ(payload, msg.get_payload().as_string());
deliver_report_called = true;
});
Producer producer(move(config));
Topic topic = producer.get_topic(KAFKA_TOPIC);
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
// poll() serves the delivery report callback registered above
producer.poll();
runner.try_join();
const auto& messages = runner.get_messages();
ASSERT_EQ(1, messages.size());
const auto& message = messages[0];
EXPECT_EQ(payload, message.get_payload().as_string());
// No key was supplied, so the consumed key must be empty
EXPECT_EQ("", message.get_key().as_string());
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition());
EXPECT_EQ(0, message.get_error());
EXPECT_TRUE(deliver_report_called);
}