From 5848bccdb8919edda9b0edd9dcb17d3d7ccfb407 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Tue, 17 May 2016 18:51:30 -0700 Subject: [PATCH] Add more producer stuff --- include/cppkafka/buffer.h | 3 ++- include/cppkafka/kafka_handle_base.h | 4 +++- include/cppkafka/producer.h | 11 ++++++++++- src/CMakeLists.txt | 1 + src/buffer.cpp | 7 ------- src/kafka_handle_base.cpp | 25 ++++++++++++------------- src/producer.cpp | 14 ++++++++++++-- 7 files changed, 40 insertions(+), 25 deletions(-) diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index a84b337..53974ce 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -1,6 +1,8 @@ #ifndef CPPKAFKA_BUFFER_H #define CPPKAFKA_BUFFER_H +#include + namespace cppkafka { class Buffer { @@ -8,7 +10,6 @@ public: using DataType = unsigned char; Buffer(); - Buffer(const std::string& data); Buffer(const DataType* data, size_t size); Buffer(const Buffer&) = delete; diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 32bbeec..76b8606 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -16,7 +16,7 @@ public: rd_kafka_t* get_handle(); Topic get_topic(const std::string& name); - Topic get_topic(const std::string& name, const TopicConfiguration& config); + Topic get_topic(const std::string& name, TopicConfiguration config); protected: KafkaHandleBase(); KafkaHandleBase(rd_kafka_t* handle); @@ -25,6 +25,8 @@ protected: private: using HandlePtr = std::unique_ptr; + Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); + HandlePtr handle_; }; diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index b6abac1..7a51e1b 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -10,11 +10,20 @@ namespace cppkafka { class Topic; class Buffer; class Partition; +class TopicConfiguration; class Producer : public KafkaHandleBase { public: + enum PayloadPolicy { + COPY_PAYLOAD = RD_KAFKA_MSG_F_COPY, ///< Means RD_KAFKA_MSG_F_COPY + FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE + }; + Producer(Configuration config); + void set_payload_policy(PayloadPolicy policy); + PayloadPolicy get_payload_policy() const; + void produce(const Topic& topic, const Partition& partition, const Buffer& payload); void produce(const Topic& topic, const Partition& partition, const Buffer& payload, const Buffer& key); @@ -22,7 +31,7 @@ public: const Buffer& key, void* user_data); private: Configuration config_; - int message_payload_policy_; + PayloadPolicy message_payload_policy_; }; } // cppkafka diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0962acf..a2a4d25 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,6 +4,7 @@ set(SOURCES exceptions.cpp topic.cpp partition.cpp + buffer.cpp kafka_handle_base.cpp producer.cpp diff --git a/src/buffer.cpp b/src/buffer.cpp index 5c289d2..3e0136c 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -1,7 +1,5 @@ #include "buffer.h" -using std::string; - namespace cppkafka { Buffer::Buffer() @@ -9,11 +7,6 @@ Buffer::Buffer() } -Buffer::Buffer(const string& data) -: data_(data.data()), size_(data.size()) { - -} - Buffer::Buffer(const DataType* data, size_t size) : data_(data), size_(size) { diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 564bf60..3b38343 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -17,27 +17,26 @@ KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle) } -void KafkaHandleBase::set_handle(rd_kafka_t* handle) { - handle_ = HandlePtr(handle, &rd_kafka_destroy); -} - rd_kafka_t* KafkaHandleBase::get_handle() { return handle_.get(); } Topic KafkaHandleBase::get_topic(const string& name) { - rd_kafka_topic_t* topic = rd_kafka_topic_new(handle_.get(), name.data(), nullptr); - if (!topic) { - throw Exception("Failed to create topic handle"); - } - return Topic(topic); + return get_topic(name, nullptr); } -Topic KafkaHandleBase::get_topic(const string& name, const TopicConfiguration& config) { - rd_kafka_topic_t* topic = rd_kafka_topic_new(handle_.get(), name.data(), - config.get_handle()); +Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration topicConfig) { + return get_topic(name, topicConfig.get_handle()); +} + +void KafkaHandleBase::set_handle(rd_kafka_t* handle) { + handle_ = HandlePtr(handle, &rd_kafka_destroy); +} + +Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) { + rd_kafka_topic_t* topic = rd_kafka_topic_new(get_handle(), name.data(), conf); if (!topic) { - throw Exception("Failed to create topic handle"); + throw HandleException(rd_kafka_errno2err(errno)); } return Topic(topic); } diff --git a/src/producer.cpp b/src/producer.cpp index 07d44fa..a25e946 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -11,7 +11,7 @@ using std::string; namespace cppkafka { Producer::Producer(Configuration config) -: config_(move(config)), message_payload_policy_(RD_KAFKA_MSG_F_COPY) { +: config_(move(config)) { char error_buffer[512]; rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER, config_.get_handle(), error_buffer, sizeof(error_buffer)); @@ -19,6 +19,15 @@ Producer::Producer(Configuration config) throw Exception("Failed to create producer handle: " + string(error_buffer)); } set_handle(ptr); + set_payload_policy(Producer::COPY_PAYLOAD); +} + +void Producer::set_payload_policy(PayloadPolicy policy) { + message_payload_policy_ = policy; +} + +Producer::PayloadPolicy Producer::get_payload_policy() const { + return message_payload_policy_; } void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) { @@ -34,8 +43,9 @@ void Producer::produce(const Topic& topic, const Partition& partition, const Buf const Buffer& key, void* user_data) { void* payload_ptr = (void*)payload.get_data(); void* key_ptr = (void*)key.get_data(); + const int policy = static_cast(message_payload_policy_); int result = rd_kafka_produce(topic.get_handle(), partition.get_partition(), - message_payload_policy_, payload_ptr, payload.get_size(), + policy, payload_ptr, payload.get_size(), key_ptr, key.get_size(), user_data); if (result == -1) { throw HandleException(rd_kafka_errno2err(errno));