diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index c717cc5..b703b91 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -3,8 +3,8 @@ #include #include -#include #include +#include #include #include #include @@ -93,6 +93,11 @@ public: */ void wait_for_acks(); + /** + * Clears any buffered messages + */ + void clear(); + /** * Gets the Producer object */ @@ -119,8 +124,7 @@ public: */ void set_produce_failure_callback(ProduceFailureCallback callback); private: - // Pick the most appropriate index type depending on the platform we're using - using IndexType = std::conditional::type; + using QueueType = std::queue; template void do_add_message(BuilderType&& builder); @@ -129,7 +133,7 @@ private: void on_delivery_report(const Message& message); Producer producer_; - std::queue messages_; + QueueType messages_; ProduceFailureCallback produce_failure_callback_; size_t expected_acks_{0}; size_t messages_acked_{0}; @@ -187,6 +191,12 @@ void BufferedProducer::wait_for_acks() { messages_acked_ = 0; } +template +void BufferedProducer::clear() { + QueueType tmp; + std::swap(tmp, messages_); +} + template template void BufferedProducer::do_add_message(BuilderType&& builder) { diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index be0c4c6..64ad0b6 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -238,6 +238,9 @@ TEST_F(ProducerTest, BufferedProducer) { producer.flush(); producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload)); producer.wait_for_acks(); + // Add another one but then clear it + producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload)); + producer.clear(); runner.try_join(); const auto& messages = runner.get_messages();