Change internals of BufferedProducer

This commit is contained in:
Matias Fontanini
2017-04-23 11:01:21 -07:00
parent dbb547889b
commit e7db3df966
3 changed files with 82 additions and 33 deletions

View File

@@ -128,6 +128,24 @@ public:
* \param timeout The timeout used on this call
*/
int poll(std::chrono::milliseconds timeout);
/**
* \brief Flush all outstanding produce requests
*
* This translates into a call to rd_kafka_flush.
*
* The timeout used on this call is the one configured via Producer::set_timeout.
*/
void flush();
/**
* \brief Flush all outstanding produce requests
*
* This translates into a call to rd_kafka_flush
*
* \param timeout The timeout used on this call
*/
void flush(std::chrono::milliseconds timeout);
private:
PayloadPolicy message_payload_policy_;
};

View File

@@ -2,7 +2,7 @@
#define CPPKAFKA_BUFFERED_PRODUCER_H
#include <string>
#include <vector>
#include <queue>
#include <type_traits>
#include <cstdint>
#include <unordered_set>
@@ -22,6 +22,11 @@ public:
*/
using Builder = ConcreteMessageBuilder<BufferType>;
/**
* Callback to indicate a message failed to be produced.
*/
using ProduceFailureCallback = std::function<bool(const Message&)>;
/**
* \brief Constructs a buffered producer using the provided configuration
*
@@ -71,21 +76,32 @@ public:
* Simple helper to construct a builder object
*/
Builder make_builder(std::string topic);
/**
* \brief Sets the message produce failure callback
*
* This will be called when the delivery report callback is executed for a message having
* an error. The callback should return true if the message should be re-sent, otherwise
* false. Note that if the callback return false, then the message will be discarded.
*
* \param callback The callback to be set
*/
void set_produce_failure_callback(ProduceFailureCallback callback);
private:
// Pick the most appropriate index type depending on the platform we're using
using IndexType = std::conditional<sizeof(void*) == 8, uint64_t, uint32_t>::type;
template <typename BuilderType>
void do_add_message(BuilderType&& builder);
void produce_message(IndexType index, Builder& message);
void produce_message(const MessageBuilder& message);
Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message);
Producer producer_;
std::map<IndexType, Builder> messages_;
std::vector<IndexType> failed_indexes_;
IndexType current_index_{0};
std::unordered_map<std::string, unsigned> topic_mapping_;
std::queue<Builder> messages_;
ProduceFailureCallback produce_failure_callback_;
size_t expected_acks_{0};
size_t messages_acked_{0};
};
template <typename BufferType>
@@ -106,26 +122,22 @@ void BufferedProducer<BufferType>::add_message(Builder builder) {
template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
for (auto& message_pair : messages_) {
produce_message(message_pair.first, message_pair.second);
while (!messages_.empty()) {
produce_message(messages_.front());
messages_.pop();
}
while (!messages_.empty()) {
producer_.poll();
if (!failed_indexes_.empty()) {
for (const IndexType index : failed_indexes_) {
produce_message(index, messages_.at(index));
}
}
failed_indexes_.clear();
messages_acked_ = 0;
while (messages_acked_ != expected_acks_) {
producer_.flush();
}
}
template <typename BufferType>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
IndexType index = messages_.size();
messages_.emplace(index, std::move(builder));
expected_acks_++;
messages_.push(std::move(builder));
}
template <typename BufferType>
@@ -145,13 +157,16 @@ BufferedProducer<BufferType>::make_builder(std::string topic) {
}
template <typename BufferType>
void BufferedProducer<BufferType>::produce_message(IndexType index, Builder& builder) {
void BufferedProducer<BufferType>::set_produce_failure_callback(ProduceFailureCallback callback) {
produce_failure_callback_ = std::move(callback);
}
template <typename BufferType>
void BufferedProducer<BufferType>::produce_message(const MessageBuilder& builder) {
bool sent = false;
MessageBuilder local_builder = builder;
local_builder.user_data(reinterpret_cast<void*>(index));
while (!sent) {
try {
producer_.produce(local_builder);
producer_.produce(builder);
sent = true;
}
catch (const HandleException& ex) {
@@ -177,19 +192,26 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
template <typename BufferType>
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
const IndexType index = reinterpret_cast<IndexType>(message.get_private_data());
auto iter = messages_.find(index);
// Got an ACK for an unexpected message?
if (iter == messages_.end()) {
// We should produce this message again if it has an error and we either don't have a
// produce failure callback or we have one but it returns true
bool should_produce = message.get_error() &&
(!produce_failure_callback_ || produce_failure_callback_(message));
if (should_produce) {
MessageBuilder builder(message.get_topic());
const auto& key = message.get_key();
const auto& payload = message.get_payload();
builder.partition(message.get_partition())
.key(Buffer(key.get_data(), key.get_size()))
.payload(Buffer(payload.get_data(), payload.get_size()));
if (message.get_timestamp()) {
builder.timestamp(message.get_timestamp()->get_timestamp());
}
produce_message(builder);
return;
}
// If there was an error sending this message, then we need to re-send it
if (message.get_error()) {
failed_indexes_.push_back(index);
}
else {
messages_.erase(iter);
}
// If production was successful or the produce failure callback returned false, then
// let's consider it to be acked
messages_acked_++;
}
} // cppkafka

View File

@@ -86,4 +86,13 @@ int Producer::poll(milliseconds timeout) {
return rd_kafka_poll(get_handle(), timeout.count());
}
void Producer::flush() {
flush(get_timeout());
}
void Producer::flush(milliseconds timeout) {
auto result = rd_kafka_flush(get_handle(), timeout.count());
check_error(result);
}
} // cppkafka