Bug fixes for sync flush and add_tracker (#91)

* fixes for sync flush and also add_tracker

* added flag for flush
This commit is contained in:
Alex Damian
2018-06-18 17:46:31 -04:00
committed by Matias Fontanini
parent b8f4be5e1b
commit eb46b8808e
2 changed files with 73 additions and 18 deletions

View File

@@ -185,20 +185,28 @@ public:
* This will send all messages and keep waiting until all of them are acknowledged (this is
* done by calling wait_for_acks).
*
* \param preserve_order If set to True, each message in the queue will be flushed only when the previous
* message ack is received. This may result in performance degradation as messages
* are sent one at a time. This calls sync_produce() on each message in the buffer.
* If set to False, all messages are flushed in one batch before waiting for acks,
* however message reordering may occur if the librdkafka setting 'message.send.max.retries' is > 0.
*
* \remark Although it is possible to call flush from multiple threads concurrently, better
* performance is achieved when called from the same thread or when serialized
* with respect to other threads.
*/
void flush();
void flush(bool preserve_order = false);
/**
* \brief Flushes the buffered messages and waits up to 'timeout'
*
* \param timeout The maximum time to wait until all acks are received
*
* \param preserve_order True to preserve message ordering, False otherwise. See flush above for more details.
*
* \return True if the operation completes and all acks have been received.
*/
bool flush(std::chrono::milliseconds timeout);
bool flush(std::chrono::milliseconds timeout, bool preserve_order = false);
/**
* Waits for produced message's acknowledgements from the brokers
@@ -404,13 +412,15 @@ private:
};
using TrackerPtr = std::shared_ptr<Tracker>;
// Returns existing tracker or creates new one
template <typename BuilderType>
TrackerPtr add_tracker(BuilderType& builder) {
if (has_internal_data_ && !builder.internal()) {
// Add message tracker only if it hasn't been added before
TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
builder.internal(tracker);
return tracker;
TrackerPtr add_tracker(SenderType sender, BuilderType& builder) {
    // Trackers are only used when internal per-message data is enabled.
    if (!has_internal_data_) {
        return nullptr;
    }
    // Attach a tracker only once per message; reuse it on retries.
    if (!builder.internal()) {
        builder.internal(std::make_shared<Tracker>(sender, max_number_retries_));
    }
    return std::static_pointer_cast<Tracker>(builder.internal());
}
@@ -469,7 +479,7 @@ void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
template <typename BufferType>
void BufferedProducer<BufferType>::add_message(Builder builder) {
add_tracker(builder);
add_tracker(SenderType::Async, builder);
do_add_message(move(builder), MessagePriority::Low, true);
}
@@ -477,7 +487,7 @@ template <typename BufferType>
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
if (has_internal_data_) {
MessageBuilder builder_clone(builder.clone());
add_tracker(builder_clone);
add_tracker(SenderType::Async, builder_clone);
async_produce(builder_clone, true);
}
else {
@@ -489,7 +499,7 @@ template <typename BufferType>
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
if (has_internal_data_) {
MessageBuilder builder_clone(builder.clone());
TrackerPtr tracker = add_tracker(builder_clone);
TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone);
// produce until we succeed or we reach max retry limit
std::future<bool> should_retry;
do {
@@ -526,15 +536,47 @@ void BufferedProducer<BufferType>::async_flush() {
}
template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
async_flush();
wait_for_acks();
void BufferedProducer<BufferType>::flush(bool preserve_order) {
    // Fast path: batch-flush everything, then block until all acks arrive.
    if (!preserve_order) {
        async_flush();
        wait_for_acks();
        return;
    }
    // Ordered path: produce synchronously, one message at a time, so each
    // message is only sent after the previous one has been acknowledged.
    CounterGuard<size_t> counter_guard(flushes_in_progress_);
    // Swap the pending messages into a local queue under the lock so other
    // threads can keep adding messages while we flush.
    QueueType pending;
    {
        std::lock_guard<std::mutex> lock(mutex_);
        std::swap(messages_, pending);
    }
    while (!pending.empty()) {
        sync_produce(pending.front());
        pending.pop_front();
    }
}
template <typename BufferType>
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout) {
async_flush();
return wait_for_acks(timeout);
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout,
                                         bool preserve_order) {
    if (preserve_order) {
        CounterGuard<size_t> counter_guard(flushes_in_progress_);
        // Swap pending messages into a temporary queue under the lock so other
        // threads can keep producing while we flush.
        QueueType flush_queue; // flush from temporary queue
        {
            std::lock_guard<std::mutex> lock(mutex_);
            std::swap(messages_, flush_queue);
        }
        // Use steady_clock for the deadline: it is guaranteed monotonic,
        // unlike high_resolution_clock which may be an alias of system_clock.
        auto start_time = std::chrono::steady_clock::now();
        while (!flush_queue.empty() &&
               (std::chrono::duration_cast<std::chrono::milliseconds>
               (std::chrono::steady_clock::now() - start_time) < timeout)) {
            // sync_produce waits for each message's ack before returning.
            sync_produce(flush_queue.front());
            flush_queue.pop_front();
        }
        // BUG FIX: the original fell off the end of a non-void function here
        // (undefined behavior). Success means every message was produced and
        // acknowledged before the timeout, i.e. the queue was fully drained.
        return flush_queue.empty();
    }
    else {
        async_flush();
        return wait_for_acks(timeout);
    }
}
template <typename BufferType>

View File

@@ -79,6 +79,19 @@ void flusher_run(BufferedProducer<string>& producer,
producer.flush();
}
// Test helper: repeatedly async-flush the producer whenever the buffer grows
// past 'num_flush' messages, until 'exit_flag' becomes non-zero; then drain
// the remaining messages and wait for every acknowledgement.
// NOTE(review): 'exit_flag' is a plain int read from another thread without
// synchronization — presumably acceptable for this test; confirm.
void async_flusher_run(BufferedProducer<string>& producer,
                       int& exit_flag,
                       int num_flush) {
    const size_t threshold = static_cast<size_t>(num_flush);
    while (!exit_flag) {
        if (producer.get_buffer_size() >= threshold) {
            producer.async_flush();
        }
        this_thread::sleep_for(milliseconds(10));
    }
    // Final drain: flush whatever is left and block until all acks arrive.
    producer.async_flush();
    producer.wait_for_acks();
}
void clear_run(BufferedProducer<string>& producer,
condition_variable& clear) {
mutex m;
@@ -377,7 +390,7 @@ TEST_CASE("replay async messages with errors", "[producer][buffered_producer][as
ErrorProducer<string> producer(make_producer_config(),
BufferedProducer<string>::TestParameters{false, true});
producer.set_max_number_retries(num_retries);
thread flusher_thread(flusher_run, ref(producer), ref(exit_flag), 0);
thread flusher_thread(async_flusher_run, ref(producer), ref(exit_flag), 0);
string payload = "Hello world";
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
this_thread::sleep_for(milliseconds(2000));