#ifndef CPPKAFKA_BUFFERED_PRODUCER_H #define CPPKAFKA_BUFFERED_PRODUCER_H #include #include #include #include #include #include #include #include #include "../producer.h" #include "../message.h" namespace cppkafka { template class BufferedProducer { public: /** * Concrete builder */ using Builder = ConcreteMessageBuilder; /** * \brief Constructs a buffered producer using the provided configuration * * \param config The configuration to be used on the actual Producer object */ BufferedProducer(Configuration config); /** * \brief Adds a message to the producer's buffer. * * The message won't be sent until flush is called. * * \param builder The builder that contains the message to be added */ void add_message(const MessageBuilder& builder); /** * \brief Adds a message to the producer's buffer. * * The message won't be sent until flush is called. * * Using this overload, you can avoid copies and construct your builder using the type * you are actually using in this buffered producer. * * \param builder The builder that contains the message to be added */ void add_message(Builder builder); /** * \brief Flushes the buffered messages. * * This will send all messages and keep waiting until all of them are acknowledged. */ void flush(); /** * Gets the Producer object */ Producer& get_producer(); /** * Gets the Producer object */ const Producer& get_producer() const; /** * Simple helper to construct a builder object */ Builder make_builder(std::string topic); private: // Pick the most appropriate index type depending on the platform we're using using IndexType = std::conditional::type; template void do_add_message(BuilderType&& builder); void produce_message(IndexType index, Builder& message); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); Producer producer_; std::map messages_; std::vector failed_indexes_; IndexType current_index_{0}; std::unordered_map topic_mapping_; }; template BufferedProducer::BufferedProducer(Configuration config) : producer_(prepare_configuration(std::move(config))) { } template void BufferedProducer::add_message(const MessageBuilder& builder) { do_add_message(builder); } template void BufferedProducer::add_message(Builder builder) { do_add_message(move(builder)); } template void BufferedProducer::flush() { for (auto& message_pair : messages_) { produce_message(message_pair.first, message_pair.second); } while (!messages_.empty()) { producer_.poll(); if (!failed_indexes_.empty()) { for (const IndexType index : failed_indexes_) { produce_message(index, messages_.at(index)); } } failed_indexes_.clear(); } } template template void BufferedProducer::do_add_message(BuilderType&& builder) { IndexType index = messages_.size(); messages_.emplace(index, std::move(builder)); } template Producer& BufferedProducer::get_producer() { return producer_; } template const Producer& BufferedProducer::get_producer() const { return producer_; } template typename BufferedProducer::Builder BufferedProducer::make_builder(std::string topic) { return Builder(std::move(topic)); } template void BufferedProducer::produce_message(IndexType index, Builder& builder) { bool sent = false; MessageBuilder local_builder = builder; local_builder.user_data(reinterpret_cast(index)); while (!sent) { try { producer_.produce(local_builder); sent = true; } catch (const HandleException& ex) { const Error error = ex.get_error(); if (error == RD_KAFKA_RESP_ERR__QUEUE_FULL) { // If the output queue is full, then just poll producer_.poll(); } else { throw; } } } } template Configuration BufferedProducer::prepare_configuration(Configuration config) { using std::placeholders::_2; auto callback = std::bind(&BufferedProducer::on_delivery_report, this, _2); config.set_delivery_report_callback(std::move(callback)); return config; } template void BufferedProducer::on_delivery_report(const Message& message) { const IndexType index = reinterpret_cast(message.get_private_data()); auto iter = messages_.find(index); // Got an ACK for an unexpected message? if (iter == messages_.end()) { return; } // If there was an error sending this message, then we need to re-send it if (message.get_error()) { failed_indexes_.push_back(index); } else { messages_.erase(iter); } } } // cppkafka #endif // CPPKAFKA_BUFFERED_PRODUCER_H