diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index bfe9c63..80a9f67 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -178,6 +178,13 @@ public: */ void clear(); + /** + * \brief Get the number of messages in the buffer + * + * \return The number of messages + */ + size_t get_buffer_size() const; + /** * \brief Sets the maximum amount of messages to be enqueued in the buffer. * @@ -400,6 +407,11 @@ void BufferedProducer::clear() { std::swap(tmp, messages_); } +template +size_t BufferedProducer::get_buffer_size() const { + return messages_.size(); +} + template void BufferedProducer::set_max_buffer_size(ssize_t max_buffer_size) { if (max_buffer_size < -1) { @@ -413,11 +425,6 @@ ssize_t BufferedProducer::get_max_buffer_size() const { return max_buffer_size_; } -template -size_t BufferedProducer::get_buffer_size() const { - return messages_.size(); -} - template template void BufferedProducer::do_add_message(BuilderType&& builder, @@ -508,6 +515,7 @@ void BufferedProducer::produce_message(const MessageType& message) { template Configuration BufferedProducer::prepare_configuration(Configuration config) { using std::placeholders::_2; + delivery_report_callback_ = config.get_delivery_report_callback(); auto callback = std::bind(&BufferedProducer::on_delivery_report, this, _2); config.set_delivery_report_callback(std::move(callback)); return config;