diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h
new file mode 100644
index 0000000..7b6b92e
--- /dev/null
+++ b/include/cppkafka/clonable_ptr.h
@@ -0,0 +1,41 @@
+#ifndef CPPKAFKA_CLONABLE_PTR_H
+#define CPPKAFKA_CLONABLE_PTR_H
+
+#include <memory>
+
+namespace cppkafka {
+
+template <typename T, typename Deleter, typename Cloner>
+class ClonablePtr {
+public:
+    ClonablePtr(T* ptr, const Deleter& deleter, const Cloner& cloner)
+    : handle_(ptr, deleter), cloner_(cloner) {
+
+    }
+
+    ClonablePtr(const ClonablePtr& rhs)
+    : handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) {
+
+    }
+
+    ClonablePtr& operator=(const ClonablePtr& rhs) {
+        cloner_ = rhs.cloner_;
+        handle_.reset(cloner_(rhs.handle_.get()));
+        return *this;
+    }
+
+    ClonablePtr(ClonablePtr&&) = default;
+    ClonablePtr& operator=(ClonablePtr&&) = default;
+    ~ClonablePtr() = default;
+
+    T* get() const {
+        return handle_.get();
+    }
+private:
+    std::unique_ptr<T, Deleter> handle_;
+    Cloner cloner_;
+};
+
+} // cppkafka
+
+#endif // CPPKAFKA_CLONABLE_PTR_H
diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h
index 28290cd..7617ac7 100644
--- a/include/cppkafka/configuration.h
+++ b/include/cppkafka/configuration.h
@@ -3,28 +3,40 @@
 
 #include <string>
 #include <memory>
+#include <functional>
 #include <librdkafka/rdkafka.h>
+#include "topic_partition_list.h"
+#include "clonable_ptr.h"
 
 namespace cppkafka {
 
+class Message;
+
 class Configuration {
 public:
+    using DeliveryReportCallback = std::function<void(const Message&)>;
+    using OffsetCommitCallback = std::function<void(rd_kafka_resp_err_t,
+                                                    const TopicPartitionList&)>;
+
     Configuration();
-    Configuration(const Configuration& rhs);
-    Configuration(Configuration&& rhs) noexcept = default;
-    Configuration& operator=(const Configuration& rhs);
-    Configuration& operator=(Configuration&& rhs) noexcept = default;
 
     void set(const std::string& name, const std::string& value);
+    void set_delivery_report_callback(DeliveryReportCallback callback);
+    void set_offset_commit_callback(OffsetCommitCallback callback);
 
     rd_kafka_conf_t* get_handle() const;
+    const DeliveryReportCallback& get_delivery_report_callback() const;
+    const OffsetCommitCallback& get_offset_commit_callback() const;
 private:
-    using HandlePtr = std::unique_ptr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy)>;
+    using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
+                                  decltype(&rd_kafka_conf_dup)>;
 
     Configuration(rd_kafka_conf_t* ptr);
     static HandlePtr make_handle(rd_kafka_conf_t* ptr);
 
     HandlePtr handle_;
+    DeliveryReportCallback delivery_report_callback_;
+    OffsetCommitCallback offset_commit_callback_;
 };
 
 } // cppkafka
diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h
index c7a42e6..e7f9325 100644
--- a/include/cppkafka/consumer.h
+++ b/include/cppkafka/consumer.h
@@ -7,10 +7,10 @@
 #include <librdkafka/rdkafka.h>
 #include "kafka_handle_base.h"
 #include "message.h"
+#include "configuration.h"
 
 namespace cppkafka {
 
-class Configuration;
 class TopicConfiguration;
 
 class Consumer : public KafkaHandleBase {
@@ -46,6 +46,7 @@ public:
     TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions);
     TopicPartitionList get_subscription();
     TopicPartitionList get_assignment();
+    const Configuration& get_configuration() const;
 
     Message poll();
 private:
@@ -59,6 +60,7 @@
     AssignmentCallback assignment_callback_;
     RevocationCallback revocation_callback_;
    RebalanceErrorCallback rebalance_error_callback_;
+    Configuration config_;
 };
 
 } // cppkafka
diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h
index 4910175..739d38d 100644
--- a/include/cppkafka/message.h
+++ b/include/cppkafka/message.h
@@ -11,6 +11,8 @@ namespace cppkafka {
 
 class Message {
 public:
+    static Message make_non_owning(rd_kafka_message_t* handle);
+
     Message();
     Message(rd_kafka_message_t* handle);
     Message(const Message&) = delete;
@@ -32,6 +34,11 @@ public:
 private:
     using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;
 
+    struct NonOwningTag { };
+
+    Message(rd_kafka_message_t* handle, NonOwningTag);
+    Message(HandlePtr handle);
+
     HandlePtr handle_;
     Buffer payload_;
     Buffer key_;
diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h
index 8986089..9e2a1d9 100644
--- a/include/cppkafka/producer.h
+++ b/include/cppkafka/producer.h
@@ -27,11 +27,15 @@ public:
     void set_payload_policy(PayloadPolicy policy);
     PayloadPolicy get_payload_policy() const;
 
+    const Configuration& get_configuration() const;
+
     void produce(const Topic& topic, const Partition& partition, const Buffer& payload);
     void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
                  const Buffer& key);
     void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
                  const Buffer& key, void* user_data);
+
+    int poll();
 private:
     Configuration config_;
     PayloadPolicy message_payload_policy_;
diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h
index f94ac11..cd8e651 100644
--- a/include/cppkafka/topic_partition_list.h
+++ b/include/cppkafka/topic_partition_list.h
@@ -15,6 +15,7 @@ using TopicPartitionList = std::vector<TopicPartition>;
 
 TopicPartitionsListPtr convert(const std::vector<TopicPartition>& topic_partitions);
 std::vector<TopicPartition> convert(const TopicPartitionsListPtr& topic_partitions);
+std::vector<TopicPartition> convert(rd_kafka_topic_partition_list_t* topic_partitions);
 TopicPartitionsListPtr make_handle(rd_kafka_topic_partition_list_t* handle);
 
 } // cppkafka
diff --git a/src/configuration.cpp b/src/configuration.cpp
index ae91852..0057e25 100644
--- a/src/configuration.cpp
+++ b/src/configuration.cpp
@@ -1,11 +1,39 @@
 #include "configuration.h"
 #include <librdkafka/rdkafka.h>
 #include "exceptions.h"
+#include "message.h"
+#include "producer.h"
+#include "consumer.h"
 
 using std::string;
 
 namespace cppkafka {
 
+// Callback proxies
+
+void delivery_report_proxy(rd_kafka_t *rk, const rd_kafka_message_t* msg, void *opaque) {
+    const Producer* producer = static_cast<const Producer*>(opaque);
+    Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
+    const auto& callback = producer->get_configuration().get_delivery_report_callback();
+    if (callback) {
+        callback(message);
+    }
+}
+
+void offset_commit_proxy(rd_kafka_t *rk,
+                         rd_kafka_resp_err_t err,
+                         rd_kafka_topic_partition_list_t *offsets,
+                         void *opaque) {
+    const Consumer* consumer = static_cast<const Consumer*>(opaque);
+    TopicPartitionList list = offsets ? convert(offsets) : TopicPartitionList{};
+    const auto& callback = consumer->get_configuration().get_offset_commit_callback();
+    if (callback) {
+        callback(err, list);
+    }
+}
+
+// Configuration
+
 Configuration::Configuration() 
 : handle_(make_handle(rd_kafka_conf_new())) {
 
@@ -16,16 +44,6 @@ Configuration::Configuration(rd_kafka_conf_t* ptr)
 
 }
 
-Configuration::Configuration(const Configuration& rhs) 
-: handle_(make_handle(rd_kafka_conf_dup(rhs.handle_.get()))) {
-
-}
-
-Configuration& Configuration::operator=(const Configuration& rhs) {
-    handle_.reset(rd_kafka_conf_dup(rhs.handle_.get()));
-    return *this;
-}
-
 void Configuration::set(const string& name, const string& value) {
     char error_buffer[512];
     rd_kafka_conf_res_t result;
@@ -36,12 +54,30 @@ void Configuration::set(const string& name, const string& value) {
     }
 }
 
+void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) {
+    delivery_report_callback_ = move(callback);
+    rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_proxy);
+}
+
+void Configuration::set_offset_commit_callback(OffsetCommitCallback callback) {
+    offset_commit_callback_ = move(callback);
+    rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_proxy);
+}
+
 rd_kafka_conf_t* Configuration::get_handle() const {
     return handle_.get();
 }
 
+const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const {
+    return delivery_report_callback_;
+}
+
+const Configuration::OffsetCommitCallback& Configuration::get_offset_commit_callback() const {
+    return offset_commit_callback_;
+}
+
 Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) {
-    return HandlePtr(ptr, &rd_kafka_conf_destroy);
+    return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup);
 }
 
 } // cppkafka
diff --git a/src/consumer.cpp b/src/consumer.cpp
index df0b828..9bfcd39 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -5,30 +5,26 @@
 
 using std::vector;
 using std::string;
+using std::move;
 
 using std::chrono::milliseconds;
 
 namespace cppkafka {
 
-void dummy_topic_partition_list_deleter(rd_kafka_topic_partition_list_t*) {
-
-}
-
 void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
                                rd_kafka_topic_partition_list_t *partitions, void *opaque) {
-    // Build a dummy unique_ptr which won't actually delete the ptr
-    TopicPartitionsListPtr handle(partitions, &dummy_topic_partition_list_deleter);
-    TopicPartitionList list = convert(handle);
+    TopicPartitionList list = convert(partitions);
    static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
 }
 
-Consumer::Consumer(Configuration config) {
+Consumer::Consumer(Configuration config) 
+: config_(move(config)) {
     char error_buffer[512];
     // Set ourselves as the opaque pointer
-    rd_kafka_conf_set_opaque(config.get_handle(), this);
-    rd_kafka_conf_set_rebalance_cb(config.get_handle(), &Consumer::rebalance_proxy);
+    rd_kafka_conf_set_opaque(config_.get_handle(), this);
+    rd_kafka_conf_set_rebalance_cb(config_.get_handle(), &Consumer::rebalance_proxy);
     rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, 
-                                   rd_kafka_conf_dup(config.get_handle()),
+                                   rd_kafka_conf_dup(config_.get_handle()),
                                    error_buffer, sizeof(error_buffer));
     if (!ptr) {
         throw Exception("Failed to create consumer handle: " + string(error_buffer));
@@ -130,6 +126,10 @@ TopicPartitionList Consumer::get_assignment() {
     return convert(make_handle(list));
 }
 
+const Configuration& Consumer::get_configuration() const {
+    return config_;
+}
+
 Message Consumer::poll() {
     rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(),
                                                          get_timeout().count());
diff --git a/src/message.cpp b/src/message.cpp
index 2bef564..8f71e8e 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -4,15 +4,33 @@
 using std::string;
 
 namespace cppkafka {
 
+void dummy_deleter(rd_kafka_message_t*) {
+
+}
+
+Message Message::make_non_owning(rd_kafka_message_t* handle) {
+    return Message(handle, NonOwningTag());
+}
+
 Message::Message() 
 : handle_(nullptr, nullptr) {
 
 }
 
 Message::Message(rd_kafka_message_t* handle) 
-: handle_(handle, &rd_kafka_message_destroy),
-payload_((const Buffer::DataType*)handle_->payload, handle_->len),
-key_((const Buffer::DataType*)handle_->key, handle_->key_len) {
+: Message(HandlePtr(handle, &rd_kafka_message_destroy)) {
+
+}
+
+Message::Message(rd_kafka_message_t* handle, NonOwningTag) 
+: Message(HandlePtr(handle, &dummy_deleter)) {
+
+}
+
+Message::Message(HandlePtr handle) 
+: handle_(move(handle)),
+  payload_((const Buffer::DataType*)handle_->payload, handle_->len),
+  key_((const Buffer::DataType*)handle_->key, handle_->key_len) {
 
 }
diff --git a/src/producer.cpp b/src/producer.cpp
index db7e3dc..62d212f 100644
--- a/src/producer.cpp
+++ b/src/producer.cpp
@@ -10,6 +10,7 @@ namespace cppkafka {
 Producer::Producer(Configuration config) 
 : config_(move(config)) {
     char error_buffer[512];
+    rd_kafka_conf_set_opaque(config_.get_handle(), this);
     rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER,
                                    rd_kafka_conf_dup(config_.get_handle()),
                                    error_buffer, sizeof(error_buffer));
@@ -29,6 +30,10 @@ Producer::PayloadPolicy Producer::get_payload_policy() const {
     return message_payload_policy_;
 }
 
+const Configuration& Producer::get_configuration() const {
+    return config_;
+}
+
 void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) {
     produce(topic, partition, payload, Buffer{} /*key*/, nullptr /*user_data*/);
 }
@@ -51,4 +56,8 @@ void Producer::produce(const Topic& topic, const Partition& partition, const Buf
     }
 }
 
+int Producer::poll() {
+    return rd_kafka_poll(get_handle(), get_timeout().count());
+}
+
 } // cppkafka
diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp
index 42d49df..825a7de 100644
--- a/src/topic_partition_list.cpp
+++ b/src/topic_partition_list.cpp
@@ -20,6 +20,10 @@ TopicPartitionsListPtr convert(const vector<TopicPartition>& topic_partitions) {
 }
 
 vector<TopicPartition> convert(const TopicPartitionsListPtr& topic_partitions) {
+    return convert(topic_partitions.get());
+}
+
+vector<TopicPartition> convert(rd_kafka_topic_partition_list_t* topic_partitions) {
     vector<TopicPartition> output;
     for (int i = 0; i < topic_partitions->cnt; ++i) {
         const auto& elem = topic_partitions->elems[i];
diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp
index 03971e2..38ed0c7 100644
--- a/tests/consumer_test.cpp
+++ b/tests/consumer_test.cpp
@@ -44,7 +44,7 @@ public:
                     cond.notify_one();
                 }
             }
-            else if (msg && msg.get_error() == 0) {
+            else if (msg && msg.get_error() == 0 && number_eofs == partitions) {
                 messages_.push_back(move(msg));
             }
         }
@@ -90,11 +90,11 @@ public:
         return config;
     }
 
-    Configuration make_consumer_config() {
+    Configuration make_consumer_config(const string& group_id = "consumer_test") {
         Configuration config;
         config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
         config.set("enable.auto.commit", "false");
-        config.set("group.id", "consumer_test");
+        config.set("group.id", group_id);
         return config;
     }
 };
@@ -181,3 +181,40 @@
     }
     EXPECT_EQ(1, runner1.get_messages().size() + runner2.get_messages().size());
 }
+
+TEST_F(ConsumerTest, OffsetCommit) {
+    int partition = 0;
+    int64_t message_offset = 0;
+    bool offset_commit_called = false;
+
+    // Create a consumer and subscribe to the topic
+    Configuration config = make_consumer_config("offset_commit");
+    config.set_offset_commit_callback([&](rd_kafka_resp_err_t error,
+                                          const TopicPartitionList& topic_partitions) {
+        offset_commit_called = true;
+        EXPECT_EQ(0, error);
+        ASSERT_EQ(1, topic_partitions.size());
+        EXPECT_EQ(KAFKA_TOPIC, topic_partitions[0].get_topic());
+        EXPECT_EQ(0, topic_partitions[0].get_partition());
+        EXPECT_EQ(message_offset + 1, topic_partitions[0].get_offset());
+    });
+    Consumer consumer(config);
+    consumer.assign({ { KAFKA_TOPIC, 0 } });
+    ConsumerRunner runner(consumer, 1, 1);
+
+    // Produce a message just so we stop the consumer
+    Producer producer(make_producer_config());
+    Topic topic = producer.get_topic(KAFKA_TOPIC);
+    string payload = "Hello world!";
+    producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
+    runner.try_join();
+
+    ASSERT_EQ(1, runner.get_messages().size());
+    const Message& msg = runner.get_messages()[0];
+    message_offset = msg.get_offset();
+    consumer.commit(msg);
+    for (size_t i = 0; i < 3 && !offset_commit_called; ++i) {
+        consumer.poll();
+    }
+    EXPECT_TRUE(offset_commit_called);
+}
diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp
index 1cd7023..b981a61 100644
--- a/tests/producer_test.cpp
+++ b/tests/producer_test.cpp
@@ -186,3 +186,36 @@
         EXPECT_LT(message.get_partition(), 3);
     }
 }
+
+TEST_F(ProducerTest, Callbacks) {
+    int partition = 0;
+
+    // Create a consumer and assign this topic/partition
+    Consumer consumer(make_consumer_config());
+    consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
+    ConsumerRunner runner(consumer, 1, 1);
+
+    // Now create a producer and produce a message
+    string payload = "Hello world!";
+    bool deliver_report_called = false;
+    Configuration config = make_producer_config();
+    config.set_delivery_report_callback([&](const Message& msg) {
+        EXPECT_EQ(payload, msg.get_payload().as_string());
+        deliver_report_called = true;
+    });
+    Producer producer(move(config));
+    Topic topic = producer.get_topic(KAFKA_TOPIC);
+    producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
+    producer.poll();
+    runner.try_join();
+
+    const auto& messages = runner.get_messages();
+    ASSERT_EQ(1, messages.size());
+    const auto& message = messages[0];
+    EXPECT_EQ(payload, message.get_payload().as_string());
+    EXPECT_EQ("", message.get_key().as_string());
+    EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
+    EXPECT_EQ(partition, message.get_partition());
+    EXPECT_EQ(0, message.get_error());
+    EXPECT_TRUE(deliver_report_called);
+}