Allow making no key/payload copies in BufferedProducer

This commit is contained in:
Matias Fontanini
2017-04-16 10:13:14 -07:00
parent f781afe5cf
commit f924eb68e7
3 changed files with 63 additions and 7 deletions

View File

@@ -108,11 +108,21 @@ public:
*/
const BufferType& key() const;
/**
* Gets the message's key
*/
BufferType& key();
/**
* Gets the message's payload
*/
const BufferType& payload() const;
/**
* Gets the message's payload
*/
BufferType& payload();
/**
* Gets the message's user data pointer
*/
@@ -183,11 +193,21 @@ const T& BasicMessageBuilder<T, C>::key() const {
return key_;
}
template <typename T, typename C>
T& BasicMessageBuilder<T, C>::key() {
return key_;
}
template <typename T, typename C>
const T& BasicMessageBuilder<T, C>::payload() const {
return payload_;
}
template <typename T, typename C>
T& BasicMessageBuilder<T, C>::payload() {
return payload_;
}
template <typename T, typename C>
void* BasicMessageBuilder<T, C>::user_data() const {
return user_data_;

View File

@@ -38,6 +38,18 @@ public:
*/
void add_message(const MessageBuilder& builder);
/**
* \brief Adds a message to the producer's buffer.
*
* The message won't be sent until flush is called.
*
* Using this overload, you can avoid copies and construct your builder using the type
* you are actually using in this buffered producer.
*
* \param builder The builder that contains the message to be added
*/
void add_message(Builder builder);
/**
* \brief Flushes the buffered messages.
*
@@ -54,10 +66,17 @@ public:
* Gets the Producer object
*/
const Producer& get_producer() const;
/**
* Simple helper to construct a builder object
*/
Builder make_builder(const Topic& topic);
private:
// Pick the most appropriate index type depending on the platform we're using
using IndexType = std::conditional<sizeof(void*) == 8, uint64_t, uint32_t>::type;
template <typename BuilderType>
void do_add_message(BuilderType&& builder);
const Topic& get_topic(const std::string& topic);
void produce_message(IndexType index, Builder& message);
Configuration prepare_configuration(Configuration config);
@@ -79,13 +98,12 @@ BufferedProducer<BufferType>::BufferedProducer(Configuration config)
template <typename BufferType>
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
Builder local_builder(get_topic(builder.topic().get_name()));
local_builder.partition(builder.partition());
local_builder.key(builder.key());
local_builder.payload(builder.payload());
do_add_message(builder);
}
IndexType index = messages_.size();
messages_.emplace(index, std::move(local_builder));
template <typename BufferType>
void BufferedProducer<BufferType>::add_message(Builder builder) {
do_add_message(move(builder));
}
template <typename BufferType>
@@ -105,6 +123,18 @@ void BufferedProducer<BufferType>::flush() {
}
}
template <typename BufferType>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
Builder local_builder(get_topic(builder.topic().get_name()));
local_builder.partition(builder.partition());
local_builder.key(std::move(builder.key()));
local_builder.payload(std::move(builder.payload()));
IndexType index = messages_.size();
messages_.emplace(index, std::move(local_builder));
}
template <typename BufferType>
Producer& BufferedProducer<BufferType>::get_producer() {
return producer_;
@@ -115,6 +145,12 @@ const Producer& BufferedProducer<BufferType>::get_producer() const {
return producer_;
}
template <typename BufferType>
typename BufferedProducer<BufferType>::Builder
BufferedProducer<BufferType>::make_builder(const Topic& topic) {
return Builder(topic);
}
template <typename BufferType>
const Topic& BufferedProducer<BufferType>::get_topic(const std::string& topic) {
auto iter = topic_mapping_.find(topic);