Allow clearing buffered messages on buffered producer

This commit is contained in:
Matias Fontanini
2017-04-24 20:08:26 -07:00
parent 5e84da2458
commit bb900f21cb
2 changed files with 17 additions and 4 deletions

View File

@@ -3,8 +3,8 @@
#include <string> #include <string>
#include <queue> #include <queue>
#include <type_traits>
#include <cstdint> #include <cstdint>
#include <algorithm>
#include <unordered_set> #include <unordered_set>
#include <unordered_map> #include <unordered_map>
#include <map> #include <map>
@@ -93,6 +93,11 @@ public:
*/ */
void wait_for_acks(); void wait_for_acks();
/**
* Clears any buffered messages
*/
void clear();
/** /**
* Gets the Producer object * Gets the Producer object
*/ */
@@ -119,8 +124,7 @@ public:
*/ */
void set_produce_failure_callback(ProduceFailureCallback callback); void set_produce_failure_callback(ProduceFailureCallback callback);
private: private:
// Pick the most appropriate index type depending on the platform we're using using QueueType = std::queue<Builder>;
using IndexType = std::conditional<sizeof(void*) == 8, uint64_t, uint32_t>::type;
template <typename BuilderType> template <typename BuilderType>
void do_add_message(BuilderType&& builder); void do_add_message(BuilderType&& builder);
@@ -129,7 +133,7 @@ private:
void on_delivery_report(const Message& message); void on_delivery_report(const Message& message);
Producer producer_; Producer producer_;
std::queue<Builder> messages_; QueueType messages_;
ProduceFailureCallback produce_failure_callback_; ProduceFailureCallback produce_failure_callback_;
size_t expected_acks_{0}; size_t expected_acks_{0};
size_t messages_acked_{0}; size_t messages_acked_{0};
@@ -187,6 +191,12 @@ void BufferedProducer<BufferType>::wait_for_acks() {
messages_acked_ = 0; messages_acked_ = 0;
} }
template <typename BufferType>
void BufferedProducer<BufferType>::clear() {
QueueType tmp;
std::swap(tmp, messages_);
}
template <typename BufferType> template <typename BufferType>
template <typename BuilderType> template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) { void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {

View File

@@ -238,6 +238,9 @@ TEST_F(ProducerTest, BufferedProducer) {
producer.flush(); producer.flush();
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload)); producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.wait_for_acks(); producer.wait_for_acks();
// Add another one but then clear it
producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.clear();
runner.try_join(); runner.try_join();
const auto& messages = runner.get_messages(); const auto& messages = runner.get_messages();