mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 19:18:04 +00:00
Add more producer stuff
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
#ifndef CPPKAFKA_BUFFER_H
|
||||
#define CPPKAFKA_BUFFER_H
|
||||
|
||||
#include <cstddef>
|
||||
|
||||
namespace cppkafka {
|
||||
|
||||
class Buffer {
|
||||
@@ -8,7 +10,6 @@ public:
|
||||
using DataType = unsigned char;
|
||||
|
||||
Buffer();
|
||||
Buffer(const std::string& data);
|
||||
Buffer(const DataType* data, size_t size);
|
||||
|
||||
Buffer(const Buffer&) = delete;
|
||||
|
||||
@@ -16,7 +16,7 @@ public:
|
||||
|
||||
rd_kafka_t* get_handle();
|
||||
Topic get_topic(const std::string& name);
|
||||
Topic get_topic(const std::string& name, const TopicConfiguration& config);
|
||||
Topic get_topic(const std::string& name, TopicConfiguration config);
|
||||
protected:
|
||||
KafkaHandleBase();
|
||||
KafkaHandleBase(rd_kafka_t* handle);
|
||||
@@ -25,6 +25,8 @@ protected:
|
||||
private:
|
||||
using HandlePtr = std::unique_ptr<rd_kafka_t, decltype(&rd_kafka_destroy)>;
|
||||
|
||||
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
|
||||
|
||||
HandlePtr handle_;
|
||||
};
|
||||
|
||||
|
||||
@@ -10,11 +10,20 @@ namespace cppkafka {
|
||||
class Topic;
|
||||
class Buffer;
|
||||
class Partition;
|
||||
class TopicConfiguration;
|
||||
|
||||
class Producer : public KafkaHandleBase {
|
||||
public:
|
||||
enum PayloadPolicy {
|
||||
COPY_PAYLOAD = RD_KAFKA_MSG_F_COPY, ///< Means RD_KAFKA_MSG_F_COPY
|
||||
FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE
|
||||
};
|
||||
|
||||
Producer(Configuration config);
|
||||
|
||||
void set_payload_policy(PayloadPolicy policy);
|
||||
PayloadPolicy get_payload_policy() const;
|
||||
|
||||
void produce(const Topic& topic, const Partition& partition, const Buffer& payload);
|
||||
void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
|
||||
const Buffer& key);
|
||||
@@ -22,7 +31,7 @@ public:
|
||||
const Buffer& key, void* user_data);
|
||||
private:
|
||||
Configuration config_;
|
||||
int message_payload_policy_;
|
||||
PayloadPolicy message_payload_policy_;
|
||||
};
|
||||
|
||||
} // cppkafka
|
||||
|
||||
@@ -4,6 +4,7 @@ set(SOURCES
|
||||
exceptions.cpp
|
||||
topic.cpp
|
||||
partition.cpp
|
||||
buffer.cpp
|
||||
|
||||
kafka_handle_base.cpp
|
||||
producer.cpp
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
#include "buffer.h"
|
||||
|
||||
using std::string;
|
||||
|
||||
namespace cppkafka {
|
||||
|
||||
Buffer::Buffer()
|
||||
@@ -9,11 +7,6 @@ Buffer::Buffer()
|
||||
|
||||
}
|
||||
|
||||
Buffer::Buffer(const string& data)
|
||||
: data_(data.data()), size_(data.size()) {
|
||||
|
||||
}
|
||||
|
||||
Buffer::Buffer(const DataType* data, size_t size)
|
||||
: data_(data), size_(size) {
|
||||
|
||||
|
||||
@@ -17,27 +17,26 @@ KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle)
|
||||
|
||||
}
|
||||
|
||||
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
|
||||
handle_ = HandlePtr(handle, &rd_kafka_destroy);
|
||||
}
|
||||
|
||||
rd_kafka_t* KafkaHandleBase::get_handle() {
|
||||
return handle_.get();
|
||||
}
|
||||
|
||||
Topic KafkaHandleBase::get_topic(const string& name) {
|
||||
rd_kafka_topic_t* topic = rd_kafka_topic_new(handle_.get(), name.data(), nullptr);
|
||||
if (!topic) {
|
||||
throw Exception("Failed to create topic handle");
|
||||
}
|
||||
return Topic(topic);
|
||||
return get_topic(name, nullptr);
|
||||
}
|
||||
|
||||
Topic KafkaHandleBase::get_topic(const string& name, const TopicConfiguration& config) {
|
||||
rd_kafka_topic_t* topic = rd_kafka_topic_new(handle_.get(), name.data(),
|
||||
config.get_handle());
|
||||
Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration topicConfig) {
|
||||
return get_topic(name, topicConfig.get_handle());
|
||||
}
|
||||
|
||||
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
|
||||
handle_ = HandlePtr(handle, &rd_kafka_destroy);
|
||||
}
|
||||
|
||||
Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) {
|
||||
rd_kafka_topic_t* topic = rd_kafka_topic_new(get_handle(), name.data(), conf);
|
||||
if (!topic) {
|
||||
throw Exception("Failed to create topic handle");
|
||||
throw HandleException(rd_kafka_errno2err(errno));
|
||||
}
|
||||
return Topic(topic);
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ using std::string;
|
||||
namespace cppkafka {
|
||||
|
||||
Producer::Producer(Configuration config)
|
||||
: config_(move(config)), message_payload_policy_(RD_KAFKA_MSG_F_COPY) {
|
||||
: config_(move(config)) {
|
||||
char error_buffer[512];
|
||||
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER, config_.get_handle(),
|
||||
error_buffer, sizeof(error_buffer));
|
||||
@@ -19,6 +19,15 @@ Producer::Producer(Configuration config)
|
||||
throw Exception("Failed to create producer handle: " + string(error_buffer));
|
||||
}
|
||||
set_handle(ptr);
|
||||
set_payload_policy(Producer::COPY_PAYLOAD);
|
||||
}
|
||||
|
||||
void Producer::set_payload_policy(PayloadPolicy policy) {
|
||||
message_payload_policy_ = policy;
|
||||
}
|
||||
|
||||
Producer::PayloadPolicy Producer::get_payload_policy() const {
|
||||
return message_payload_policy_;
|
||||
}
|
||||
|
||||
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) {
|
||||
@@ -34,8 +43,9 @@ void Producer::produce(const Topic& topic, const Partition& partition, const Buf
|
||||
const Buffer& key, void* user_data) {
|
||||
void* payload_ptr = (void*)payload.get_data();
|
||||
void* key_ptr = (void*)key.get_data();
|
||||
const int policy = static_cast<int>(message_payload_policy_);
|
||||
int result = rd_kafka_produce(topic.get_handle(), partition.get_partition(),
|
||||
message_payload_policy_, payload_ptr, payload.get_size(),
|
||||
policy, payload_ptr, payload.get_size(),
|
||||
key_ptr, key.get_size(), user_data);
|
||||
if (result == -1) {
|
||||
throw HandleException(rd_kafka_errno2err(errno));
|
||||
|
||||
Reference in New Issue
Block a user