Allow chaining set calls on Configuration and TopicConfiguration

This commit is contained in:
Matias Fontanini
2016-06-22 20:03:28 -07:00
parent bba9043faf
commit c300a9bf35
6 changed files with 49 additions and 34 deletions

View File

@@ -90,47 +90,47 @@ public:
* \param name The name of the attribute * \param name The name of the attribute
* \param value The value of the attribute * \param value The value of the attribute
*/ */
void set(const std::string& name, const std::string& value); Configuration& set(const std::string& name, const std::string& value);
/** /**
* Sets the delivery report callback (invokes rd_kafka_conf_set_dr_msg_cb) * Sets the delivery report callback (invokes rd_kafka_conf_set_dr_msg_cb)
*/ */
void set_delivery_report_callback(DeliveryReportCallback callback); Configuration& set_delivery_report_callback(DeliveryReportCallback callback);
/** /**
* Sets the offset commit callback (invokes rd_kafka_conf_set_offset_commit_cb) * Sets the offset commit callback (invokes rd_kafka_conf_set_offset_commit_cb)
*/ */
void set_offset_commit_callback(OffsetCommitCallback callback); Configuration& set_offset_commit_callback(OffsetCommitCallback callback);
/** /**
* Sets the error callback (invokes rd_kafka_conf_set_error_cb) * Sets the error callback (invokes rd_kafka_conf_set_error_cb)
*/ */
void set_error_callback(ErrorCallback callback); Configuration& set_error_callback(ErrorCallback callback);
/** /**
* Sets the throttle callback (invokes rd_kafka_conf_set_throttle_cb) * Sets the throttle callback (invokes rd_kafka_conf_set_throttle_cb)
*/ */
void set_throttle_callback(ThrottleCallback callback); Configuration& set_throttle_callback(ThrottleCallback callback);
/** /**
* Sets the log callback (invokes rd_kafka_conf_set_log_cb) * Sets the log callback (invokes rd_kafka_conf_set_log_cb)
*/ */
void set_log_callback(LogCallback callback); Configuration& set_log_callback(LogCallback callback);
/** /**
* Sets the stats callback (invokes rd_kafka_conf_set_stats_cb) * Sets the stats callback (invokes rd_kafka_conf_set_stats_cb)
*/ */
void set_stats_callback(StatsCallback callback); Configuration& set_stats_callback(StatsCallback callback);
/** /**
* Sets the socket callback (invokes rd_kafka_conf_set_socket_cb) * Sets the socket callback (invokes rd_kafka_conf_set_socket_cb)
*/ */
void set_socket_callback(SocketCallback callback); Configuration& set_socket_callback(SocketCallback callback);
/** /**
* Sets the default topic configuration * Sets the default topic configuration
*/ */
void set_default_topic_configuration(boost::optional<TopicConfiguration> config); Configuration& set_default_topic_configuration(boost::optional<TopicConfiguration> config);
/** /**
* Returns true iff the given property name has been set * Returns true iff the given property name has been set

View File

@@ -45,8 +45,8 @@ public:
/** /**
* Sets a bool value * Sets a bool value
*/ */
void set(const std::string& name, bool value) { Concrete& set(const std::string& name, bool value) {
proxy_set(name, value ? "true" : "false"); return proxy_set(name, value ? "true" : "false");
} }
/** /**
@@ -54,15 +54,15 @@ public:
*/ */
template <typename T, template <typename T,
typename = typename std::enable_if<std::is_integral<T>::value>::type> typename = typename std::enable_if<std::is_integral<T>::value>::type>
void set(const std::string& name, T value) { Concrete& set(const std::string& name, T value) {
proxy_set(name, std::to_string(value)); return proxy_set(name, std::to_string(value));
} }
/** /**
* Sets a cstring value * Sets a cstring value
*/ */
void set(const std::string& name, const char* value) { Concrete& set(const std::string& name, const char* value) {
proxy_set(name, value); return proxy_set(name, value);
} }
/** /**
@@ -92,8 +92,8 @@ protected:
return output; return output;
} }
private: private:
void proxy_set(const std::string& name, const std::string& value) { Concrete& proxy_set(const std::string& name, const std::string& value) {
static_cast<Concrete&>(*this).set(name, value); return static_cast<Concrete&>(*this).set(name, value);
} }
static std::string convert(const std::string& value, Type2Type<std::string>) { static std::string convert(const std::string& value, Type2Type<std::string>) {

View File

@@ -77,14 +77,14 @@ public:
* \param name The name of the option * \param name The name of the option
* \param value The value of the option * \param value The value of the option
*/ */
void set(const std::string& name, const std::string& value); TopicConfiguration& set(const std::string& name, const std::string& value);
/** /**
* \brief Sets the partitioner callback * \brief Sets the partitioner callback
* *
* This translates into a call to rd_kafka_topic_conf_set_partitioner_cb * This translates into a call to rd_kafka_topic_conf_set_partitioner_cb
*/ */
void set_partitioner_callback(PartitionerCallback callback); TopicConfiguration& set_partitioner_callback(PartitionerCallback callback);
/** /**
* \brief Sets the "this" pointer as the opaque pointer for this handle * \brief Sets the "this" pointer as the opaque pointer for this handle
@@ -92,7 +92,7 @@ public:
* This method will be called by consumers/producers when the topic configuration object * This method will be called by consumers/producers when the topic configuration object
* has been put in a persistent memory location. Users of cppkafka do not need to use this. * has been put in a persistent memory location. Users of cppkafka do not need to use this.
*/ */
void set_as_opaque(); TopicConfiguration& set_as_opaque();
/** /**
* Gets the partitioner callback * Gets the partitioner callback

View File

@@ -125,7 +125,7 @@ Configuration::Configuration(rd_kafka_conf_t* ptr)
} }
void Configuration::set(const string& name, const string& value) { Configuration& Configuration::set(const string& name, const string& value) {
char error_buffer[512]; char error_buffer[512];
rd_kafka_conf_res_t result; rd_kafka_conf_res_t result;
result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
@@ -133,45 +133,55 @@ void Configuration::set(const string& name, const string& value) {
if (result != RD_KAFKA_CONF_OK) { if (result != RD_KAFKA_CONF_OK) {
throw ConfigException(name, error_buffer); throw ConfigException(name, error_buffer);
} }
return *this;
} }
void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) { Configuration& Configuration::set_delivery_report_callback(DeliveryReportCallback callback) {
delivery_report_callback_ = move(callback); delivery_report_callback_ = move(callback);
rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_callback_proxy); rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_callback_proxy);
return *this;
} }
void Configuration::set_offset_commit_callback(OffsetCommitCallback callback) { Configuration& Configuration::set_offset_commit_callback(OffsetCommitCallback callback) {
offset_commit_callback_ = move(callback); offset_commit_callback_ = move(callback);
rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_callback_proxy); rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_callback_proxy);
return *this;
} }
void Configuration::set_error_callback(ErrorCallback callback) { Configuration& Configuration::set_error_callback(ErrorCallback callback) {
error_callback_ = move(callback); error_callback_ = move(callback);
rd_kafka_conf_set_error_cb(handle_.get(), &error_callback_proxy); rd_kafka_conf_set_error_cb(handle_.get(), &error_callback_proxy);
return *this;
} }
void Configuration::set_throttle_callback(ThrottleCallback callback) { Configuration& Configuration::set_throttle_callback(ThrottleCallback callback) {
throttle_callback_ = move(callback); throttle_callback_ = move(callback);
rd_kafka_conf_set_throttle_cb(handle_.get(), &throttle_callback_proxy); rd_kafka_conf_set_throttle_cb(handle_.get(), &throttle_callback_proxy);
return *this;
} }
void Configuration::set_log_callback(LogCallback callback) { Configuration& Configuration::set_log_callback(LogCallback callback) {
log_callback_ = move(callback); log_callback_ = move(callback);
rd_kafka_conf_set_log_cb(handle_.get(), &log_callback_proxy); rd_kafka_conf_set_log_cb(handle_.get(), &log_callback_proxy);
return *this;
} }
void Configuration::set_stats_callback(StatsCallback callback) { Configuration& Configuration::set_stats_callback(StatsCallback callback) {
stats_callback_ = move(callback); stats_callback_ = move(callback);
rd_kafka_conf_set_stats_cb(handle_.get(), &stats_callback_proxy); rd_kafka_conf_set_stats_cb(handle_.get(), &stats_callback_proxy);
return *this;
} }
void Configuration::set_socket_callback(SocketCallback callback) { Configuration& Configuration::set_socket_callback(SocketCallback callback) {
socket_callback_ = move(callback); socket_callback_ = move(callback);
rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy); rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy);
return *this;
} }
void Configuration::set_default_topic_configuration(optional<TopicConfiguration> config) { Configuration&
Configuration::set_default_topic_configuration(optional<TopicConfiguration> config) {
default_topic_config_ = std::move(config); default_topic_config_ = std::move(config);
return *this;
} }
bool Configuration::has_property(const string& name) const { bool Configuration::has_property(const string& name) const {

View File

@@ -67,7 +67,7 @@ TopicConfiguration::TopicConfiguration(rd_kafka_topic_conf_t* ptr)
} }
void TopicConfiguration::set(const string& name, const string& value) { TopicConfiguration& TopicConfiguration::set(const string& name, const string& value) {
char error_buffer[512]; char error_buffer[512];
rd_kafka_conf_res_t result; rd_kafka_conf_res_t result;
result = rd_kafka_topic_conf_set(handle_.get(), name.data(), value.data(), error_buffer, result = rd_kafka_topic_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
@@ -75,15 +75,18 @@ void TopicConfiguration::set(const string& name, const string& value) {
if (result != RD_KAFKA_CONF_OK) { if (result != RD_KAFKA_CONF_OK) {
throw ConfigException(name, error_buffer); throw ConfigException(name, error_buffer);
} }
return *this;
} }
void TopicConfiguration::set_partitioner_callback(PartitionerCallback callback) { TopicConfiguration& TopicConfiguration::set_partitioner_callback(PartitionerCallback callback) {
partitioner_callback_ = move(callback); partitioner_callback_ = move(callback);
rd_kafka_topic_conf_set_partitioner_cb(handle_.get(), &partitioner_callback_proxy); rd_kafka_topic_conf_set_partitioner_cb(handle_.get(), &partitioner_callback_proxy);
return *this;
} }
void TopicConfiguration::set_as_opaque() { TopicConfiguration& TopicConfiguration::set_as_opaque() {
rd_kafka_topic_conf_set_opaque(handle_.get(), this); rd_kafka_topic_conf_set_opaque(handle_.get(), this);
return *this;
} }
const TopicConfiguration::PartitionerCallback& const TopicConfiguration::PartitionerCallback&

View File

@@ -13,8 +13,9 @@ public:
TEST_F(ConfigurationTest, GetSetConfig) { TEST_F(ConfigurationTest, GetSetConfig) {
Configuration config; Configuration config;
config.set("group.id", "foo"); config.set("group.id", "foo").set("metadata.broker.list", "asd:9092");
EXPECT_EQ("foo", config.get("group.id")); EXPECT_EQ("foo", config.get("group.id"));
EXPECT_EQ("asd:9092", config.get("metadata.broker.list"));
EXPECT_EQ("foo", config.get<string>("group.id")); EXPECT_EQ("foo", config.get<string>("group.id"));
EXPECT_THROW(config.get("asd"), ConfigOptionNotFound); EXPECT_THROW(config.get("asd"), ConfigOptionNotFound);
@@ -22,8 +23,9 @@ TEST_F(ConfigurationTest, GetSetConfig) {
TEST_F(ConfigurationTest, GetSetTopicConfig) { TEST_F(ConfigurationTest, GetSetTopicConfig) {
TopicConfiguration config; TopicConfiguration config;
config.set("auto.commit.enable", true); config.set("auto.commit.enable", true).set("offset.store.method", "broker");
EXPECT_EQ("true", config.get("auto.commit.enable")); EXPECT_EQ("true", config.get("auto.commit.enable"));
EXPECT_EQ("broker", config.get("offset.store.method"));
EXPECT_EQ(true, config.get<bool>("auto.commit.enable")); EXPECT_EQ(true, config.get<bool>("auto.commit.enable"));
EXPECT_THROW(config.get("asd"), ConfigOptionNotFound); EXPECT_THROW(config.get("asd"), ConfigOptionNotFound);