diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 9827265..0159816 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -128,6 +128,24 @@ public: * \param timeout The timeout used on this call */ int poll(std::chrono::milliseconds timeout); + + /** + * \brief Flush all outstanding produce requests + * + * This translates into a call to rd_kafka_flush. + * + * The timeout used on this call is the one configured via Producer::set_timeout. + */ + void flush(); + + /** + * \brief Flush all outstanding produce requests + * + * This translates into a call to rd_kafka_flush + * + * \param timeout The timeout used on this call + */ + void flush(std::chrono::milliseconds timeout); private: PayloadPolicy message_payload_policy_; }; diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 0c99d6a..1ddf65d 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -2,7 +2,7 @@ #define CPPKAFKA_BUFFERED_PRODUCER_H #include -#include +#include #include #include #include @@ -22,6 +22,11 @@ public: */ using Builder = ConcreteMessageBuilder; + /** + * Callback to indicate a message failed to be produced. + */ + using ProduceFailureCallback = std::function; + /** * \brief Constructs a buffered producer using the provided configuration * @@ -71,21 +76,32 @@ public: * Simple helper to construct a builder object */ Builder make_builder(std::string topic); + + /** + * \brief Sets the message produce failure callback + * + * This will be called when the delivery report callback is executed for a message having + * an error. The callback should return true if the message should be re-sent, otherwise + * false. Note that if the callback return false, then the message will be discarded. + * + * \param callback The callback to be set + */ + void set_produce_failure_callback(ProduceFailureCallback callback); private: // Pick the most appropriate index type depending on the platform we're using using IndexType = std::conditional::type; template void do_add_message(BuilderType&& builder); - void produce_message(IndexType index, Builder& message); + void produce_message(const MessageBuilder& message); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); Producer producer_; - std::map messages_; - std::vector failed_indexes_; - IndexType current_index_{0}; - std::unordered_map topic_mapping_; + std::queue messages_; + ProduceFailureCallback produce_failure_callback_; + size_t expected_acks_{0}; + size_t messages_acked_{0}; }; template @@ -106,26 +122,22 @@ void BufferedProducer::add_message(Builder builder) { template void BufferedProducer::flush() { - for (auto& message_pair : messages_) { - produce_message(message_pair.first, message_pair.second); + while (!messages_.empty()) { + produce_message(messages_.front()); + messages_.pop(); } - while (!messages_.empty()) { - producer_.poll(); - if (!failed_indexes_.empty()) { - for (const IndexType index : failed_indexes_) { - produce_message(index, messages_.at(index)); - } - } - failed_indexes_.clear(); + messages_acked_ = 0; + while (messages_acked_ != expected_acks_) { + producer_.flush(); } } template template void BufferedProducer::do_add_message(BuilderType&& builder) { - IndexType index = messages_.size(); - messages_.emplace(index, std::move(builder)); + expected_acks_++; + messages_.push(std::move(builder)); } template @@ -145,13 +157,16 @@ BufferedProducer::make_builder(std::string topic) { } template -void BufferedProducer::produce_message(IndexType index, Builder& builder) { +void BufferedProducer::set_produce_failure_callback(ProduceFailureCallback callback) { + produce_failure_callback_ = std::move(callback); +} + +template +void BufferedProducer::produce_message(const MessageBuilder& builder) { bool sent = false; - MessageBuilder local_builder = builder; - local_builder.user_data(reinterpret_cast(index)); while (!sent) { try { - producer_.produce(local_builder); + producer_.produce(builder); sent = true; } catch (const HandleException& ex) { @@ -177,19 +192,26 @@ Configuration BufferedProducer::prepare_configuration(Configuration template void BufferedProducer::on_delivery_report(const Message& message) { - const IndexType index = reinterpret_cast(message.get_private_data()); - auto iter = messages_.find(index); - // Got an ACK for an unexpected message? - if (iter == messages_.end()) { + // We should produce this message again if it has an error and we either don't have a + // produce failure callback or we have one but it returns true + bool should_produce = message.get_error() && + (!produce_failure_callback_ || produce_failure_callback_(message)); + if (should_produce) { + MessageBuilder builder(message.get_topic()); + const auto& key = message.get_key(); + const auto& payload = message.get_payload(); + builder.partition(message.get_partition()) + .key(Buffer(key.get_data(), key.get_size())) + .payload(Buffer(payload.get_data(), payload.get_size())); + if (message.get_timestamp()) { + builder.timestamp(message.get_timestamp()->get_timestamp()); + } + produce_message(builder); return; } - // If there was an error sending this message, then we need to re-send it - if (message.get_error()) { - failed_indexes_.push_back(index); - } - else { - messages_.erase(iter); - } + // If production was successful or the produce failure callback returned false, then + // let's consider it to be acked + messages_acked_++; } } // cppkafka diff --git a/src/producer.cpp b/src/producer.cpp index 7e4e8d2..ba65fe7 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -86,4 +86,13 @@ int Producer::poll(milliseconds timeout) { return rd_kafka_poll(get_handle(), timeout.count()); } +void Producer::flush() { + flush(get_timeout()); +} + +void Producer::flush(milliseconds timeout) { + auto result = rd_kafka_flush(get_handle(), timeout.count()); + check_error(result); +} + } // cppkafka