mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 19:18:04 +00:00
Allow setting default topic config
This commit is contained in:
@@ -20,6 +20,7 @@ public:
|
|||||||
|
|
||||||
ClonablePtr& operator=(const ClonablePtr& rhs) {
|
ClonablePtr& operator=(const ClonablePtr& rhs) {
|
||||||
handle_.reset(cloner_(rhs.handle_.get()));
|
handle_.reset(cloner_(rhs.handle_.get()));
|
||||||
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
ClonablePtr(ClonablePtr&&) = default;
|
ClonablePtr(ClonablePtr&&) = default;
|
||||||
|
|||||||
@@ -5,8 +5,10 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <boost/optional.hpp>
|
||||||
#include <librdkafka/rdkafka.h>
|
#include <librdkafka/rdkafka.h>
|
||||||
#include "topic_partition_list.h"
|
#include "topic_partition_list.h"
|
||||||
|
#include "topic_configuration.h"
|
||||||
#include "clonable_ptr.h"
|
#include "clonable_ptr.h"
|
||||||
|
|
||||||
namespace cppkafka {
|
namespace cppkafka {
|
||||||
@@ -43,6 +45,7 @@ public:
|
|||||||
void set_log_callback(LogCallback callback);
|
void set_log_callback(LogCallback callback);
|
||||||
void set_stats_callback(StatsCallback callback);
|
void set_stats_callback(StatsCallback callback);
|
||||||
void set_socket_callback(SocketCallback callback);
|
void set_socket_callback(SocketCallback callback);
|
||||||
|
void set_default_topic_configuration(boost::optional<TopicConfiguration> config);
|
||||||
|
|
||||||
rd_kafka_conf_t* get_handle() const;
|
rd_kafka_conf_t* get_handle() const;
|
||||||
const DeliveryReportCallback& get_delivery_report_callback() const;
|
const DeliveryReportCallback& get_delivery_report_callback() const;
|
||||||
@@ -52,6 +55,8 @@ public:
|
|||||||
const LogCallback& get_log_callback() const;
|
const LogCallback& get_log_callback() const;
|
||||||
const StatsCallback& get_stats_callback() const;
|
const StatsCallback& get_stats_callback() const;
|
||||||
const SocketCallback& get_socket_callback() const;
|
const SocketCallback& get_socket_callback() const;
|
||||||
|
const boost::optional<TopicConfiguration>& get_default_topic_configuration() const;
|
||||||
|
boost::optional<TopicConfiguration>& get_default_topic_configuration();
|
||||||
private:
|
private:
|
||||||
using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
|
using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
|
||||||
decltype(&rd_kafka_conf_dup)>;
|
decltype(&rd_kafka_conf_dup)>;
|
||||||
@@ -60,6 +65,7 @@ private:
|
|||||||
static HandlePtr make_handle(rd_kafka_conf_t* ptr);
|
static HandlePtr make_handle(rd_kafka_conf_t* ptr);
|
||||||
|
|
||||||
HandlePtr handle_;
|
HandlePtr handle_;
|
||||||
|
boost::optional<TopicConfiguration> default_topic_config_;
|
||||||
DeliveryReportCallback delivery_report_callback_;
|
DeliveryReportCallback delivery_report_callback_;
|
||||||
OffsetCommitCallback offset_commit_callback_;
|
OffsetCommitCallback offset_commit_callback_;
|
||||||
ErrorCallback error_callback_;
|
ErrorCallback error_callback_;
|
||||||
|
|||||||
@@ -39,7 +39,6 @@ public:
|
|||||||
const Configuration& get_configuration() const;
|
const Configuration& get_configuration() const;
|
||||||
protected:
|
protected:
|
||||||
KafkaHandleBase(Configuration config);
|
KafkaHandleBase(Configuration config);
|
||||||
KafkaHandleBase(rd_kafka_t* handle);
|
|
||||||
|
|
||||||
void set_handle(rd_kafka_t* handle);
|
void set_handle(rd_kafka_t* handle);
|
||||||
void check_error(rd_kafka_resp_err_t error);
|
void check_error(rd_kafka_resp_err_t error);
|
||||||
|
|||||||
@@ -6,6 +6,9 @@
|
|||||||
#include "consumer.h"
|
#include "consumer.h"
|
||||||
|
|
||||||
using std::string;
|
using std::string;
|
||||||
|
using std::move;
|
||||||
|
|
||||||
|
using boost::optional;
|
||||||
|
|
||||||
using std::chrono::milliseconds;
|
using std::chrono::milliseconds;
|
||||||
|
|
||||||
@@ -135,6 +138,10 @@ void Configuration::set_socket_callback(SocketCallback callback) {
|
|||||||
rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy);
|
rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Configuration::set_default_topic_configuration(optional<TopicConfiguration> config) {
|
||||||
|
default_topic_config_ = move(config);
|
||||||
|
}
|
||||||
|
|
||||||
rd_kafka_conf_t* Configuration::get_handle() const {
|
rd_kafka_conf_t* Configuration::get_handle() const {
|
||||||
return handle_.get();
|
return handle_.get();
|
||||||
}
|
}
|
||||||
@@ -167,6 +174,14 @@ const Configuration::SocketCallback& Configuration::get_socket_callback() const
|
|||||||
return socket_callback_;
|
return socket_callback_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const optional<TopicConfiguration>& Configuration::get_default_topic_configuration() const {
|
||||||
|
return default_topic_config_;
|
||||||
|
}
|
||||||
|
|
||||||
|
optional<TopicConfiguration>& Configuration::get_default_topic_configuration() {
|
||||||
|
return default_topic_config_;
|
||||||
|
}
|
||||||
|
|
||||||
Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) {
|
Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) {
|
||||||
return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup);
|
return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,12 +16,12 @@ const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000};
|
|||||||
|
|
||||||
KafkaHandleBase::KafkaHandleBase(Configuration config)
|
KafkaHandleBase::KafkaHandleBase(Configuration config)
|
||||||
: handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)) {
|
: handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)) {
|
||||||
|
auto& maybe_config = config_.get_default_topic_configuration();
|
||||||
}
|
if (maybe_config) {
|
||||||
|
maybe_config->set_as_opaque();
|
||||||
KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle)
|
auto conf_handle = rd_kafka_topic_conf_dup(maybe_config->get_handle());
|
||||||
: handle_(handle, &rd_kafka_destroy), timeout_ms_(DEFAULT_TIMEOUT) {
|
rd_kafka_conf_set_default_topic_conf(config_.get_handle(), conf_handle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partitions) {
|
void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partitions) {
|
||||||
|
|||||||
@@ -112,7 +112,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) {
|
|||||||
// Now create a producer and produce a message
|
// Now create a producer and produce a message
|
||||||
Producer producer(make_producer_config());
|
Producer producer(make_producer_config());
|
||||||
Topic topic = producer.get_topic(KAFKA_TOPIC);
|
Topic topic = producer.get_topic(KAFKA_TOPIC);
|
||||||
string payload = "Hello world!";
|
string payload = "Hello world! 1";
|
||||||
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
|
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
|
||||||
runner.try_join();
|
runner.try_join();
|
||||||
|
|
||||||
@@ -137,7 +137,7 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
|
|||||||
// Now create a producer and produce a message
|
// Now create a producer and produce a message
|
||||||
Producer producer(make_producer_config());
|
Producer producer(make_producer_config());
|
||||||
Topic topic = producer.get_topic(KAFKA_TOPIC);
|
Topic topic = producer.get_topic(KAFKA_TOPIC);
|
||||||
string payload = "Hello world!";
|
string payload = "Hello world! 2";
|
||||||
string key = "such key";
|
string key = "such key";
|
||||||
producer.produce(topic, partition, Buffer(payload.data(), payload.size()),
|
producer.produce(topic, partition, Buffer(payload.data(), payload.size()),
|
||||||
Buffer(key.data(), key.size()));
|
Buffer(key.data(), key.size()));
|
||||||
@@ -196,7 +196,7 @@ TEST_F(ProducerTest, Callbacks) {
|
|||||||
ConsumerRunner runner(consumer, 1, 1);
|
ConsumerRunner runner(consumer, 1, 1);
|
||||||
|
|
||||||
// Now create a producer and produce a message
|
// Now create a producer and produce a message
|
||||||
string payload = "Hello world!";
|
string payload = "Hello world! 3";
|
||||||
string key = "hehe";
|
string key = "hehe";
|
||||||
bool deliver_report_called = false;
|
bool deliver_report_called = false;
|
||||||
Configuration config = make_producer_config();
|
Configuration config = make_producer_config();
|
||||||
@@ -231,3 +231,42 @@ TEST_F(ProducerTest, Callbacks) {
|
|||||||
EXPECT_EQ(0, message.get_error());
|
EXPECT_EQ(0, message.get_error());
|
||||||
EXPECT_TRUE(deliver_report_called);
|
EXPECT_TRUE(deliver_report_called);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) {
|
||||||
|
int partition = 0;
|
||||||
|
|
||||||
|
// Create a consumer and assign this topic/partition
|
||||||
|
Consumer consumer(make_consumer_config());
|
||||||
|
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
|
||||||
|
ConsumerRunner runner(consumer, 1, 1);
|
||||||
|
|
||||||
|
// Now create a producer and produce a message
|
||||||
|
string payload = "Hello world! 4";
|
||||||
|
string key = "hehe";
|
||||||
|
bool callback_called = false;
|
||||||
|
|
||||||
|
Configuration config = make_producer_config();
|
||||||
|
TopicConfiguration topic_config;
|
||||||
|
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
|
||||||
|
int32_t partition_count) {
|
||||||
|
EXPECT_EQ(key, msg_key.as_string());
|
||||||
|
EXPECT_EQ(3, partition_count);
|
||||||
|
EXPECT_EQ(KAFKA_TOPIC, topic.get_name());
|
||||||
|
callback_called = true;
|
||||||
|
return 0;
|
||||||
|
});
|
||||||
|
config.set_default_topic_configuration(topic_config);
|
||||||
|
|
||||||
|
Producer producer(move(config));
|
||||||
|
Topic topic = producer.get_topic(KAFKA_TOPIC);
|
||||||
|
producer.produce(topic, {}, Buffer(payload.data(), payload.size()),
|
||||||
|
Buffer(key.data(), key.size()));
|
||||||
|
producer.poll();
|
||||||
|
runner.try_join();
|
||||||
|
|
||||||
|
const auto& messages = runner.get_messages();
|
||||||
|
ASSERT_EQ(1, messages.size());
|
||||||
|
const auto& message = messages[0];
|
||||||
|
EXPECT_EQ(partition, message.get_partition());
|
||||||
|
EXPECT_TRUE(callback_called);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user