Buffered producer thread safe (#72)

* Thread safe buffered producer

* Using single mutex version

* Changed based on feedback

* Changes based on latest review

* Added flush counter
Alex Damian, 2018-05-28 21:33:36 -04:00; committed by Matias Fontanini
parent f543810515, commit 429ec92369
8 changed files with 387 additions and 88 deletions

View File

@@ -24,8 +24,18 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib)
 set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib)

 # Build output checks
+option(CPPKAFKA_CMAKE_VERBOSE "Generate verbose output." OFF)
 option(CPPKAFKA_BUILD_SHARED "Build cppkafka as a shared library." ON)
 option(CPPKAFKA_DISABLE_TESTS "Disable build of cppkafka tests." OFF)
+option(CPPKAFKA_DISABLE_EXAMPLES "Disable build of cppkafka examples." OFF)
+option(CPPKAFKA_BOOST_STATIC_LIBS "Link with Boost static libraries." ON)
+option(CPPKAFKA_BOOST_USE_MULTITHREADED "Use Boost multithreaded libraries." ON)
+
+# Disable output from find_package macro
+if (NOT CPPKAFKA_CMAKE_VERBOSE)
+    set(FIND_PACKAGE_QUIET QUIET)
+endif()
+
 if(CPPKAFKA_BUILD_SHARED)
     message(STATUS "Build will generate a shared library. "
                    "Use CPPKAFKA_BUILD_SHARED=0 to perform a static build")
@@ -37,16 +47,36 @@ else()
 endif()

 # Look for Boost (just need boost.optional headers here)
-find_package(Boost REQUIRED)
-find_package(RdKafka REQUIRED)
+find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET})
+find_package(RdKafka REQUIRED ${FIND_PACKAGE_QUIET})
+
+if (Boost_FOUND)
+    find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET})
+    set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS})
+    set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED})
+    include_directories(${Boost_INCLUDE_DIRS})
+    link_directories(${Boost_LIBRARY_DIRS})
+    if (CPPKAFKA_CMAKE_VERBOSE)
+        message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}")
+        message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}")
+        message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}")
+        message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}")
+        message(STATUS "Boost libraries: ${Boost_LIBRARIES}")
+    endif()
+endif()

 add_subdirectory(src)
 add_subdirectory(include)
-add_subdirectory(examples)
+
+# Examples target
+if (NOT CPPKAFKA_DISABLE_EXAMPLES AND Boost_PROGRAM_OPTIONS_FOUND)
+    add_subdirectory(examples)
+else()
+    message(STATUS "Disabling examples")
+endif()

 # Add a target to generate API documentation using Doxygen
-find_package(Doxygen QUIET)
+find_package(Doxygen ${FIND_PACKAGE_QUIET})
 if(DOXYGEN_FOUND)
     configure_file(
         ${CMAKE_CURRENT_SOURCE_DIR}/docs/Doxyfile.in
@@ -65,12 +95,13 @@ if(NOT CPPKAFKA_DISABLE_TESTS)
     set(CATCH_ROOT ${CMAKE_SOURCE_DIR}/third_party/Catch2)
     if(EXISTS ${CATCH_ROOT}/CMakeLists.txt)
         set(CATCH_INCLUDE ${CATCH_ROOT}/single_include)
         enable_testing()
         add_subdirectory(tests)
     else()
         message(STATUS "Disabling tests because submodule Catch2 isn't checked out")
     endif()
+else()
+    message(STATUS "Disabling tests")
 endif()

 if(NOT TARGET uninstall)

View File

@@ -55,25 +55,32 @@ In order to compile _cppkafka_ you need:
 * _CMake_
 * A compiler with good C++11 support (e.g. gcc >= 4.8). This was tested successfully on
 _g++ 4.8.3_.
-* The boost library. _cppkafka_ only requires boost.optional, which is a header only library,
-so this doesn't add any additional runtime dependencies.
+* The boost library.

 Now, in order to build, just run:

 ```Shell
 mkdir build
 cd build
-cmake ..
+cmake <OPTIONS> ..
 make
 ```

 ## CMake options

-If you have installed _librdkafka_ on a non standard directory, you can use the
-`RDKAFKA_ROOT_DIR` cmake parameter when configuring the project:
+The following cmake options can be specified:
+* `RDKAFKA_ROOT_DIR` : Specify a different librdkafka install directory.
+* `BOOST_ROOT` : Specify a different Boost install directory.
+* `CPPKAFKA_CMAKE_VERBOSE` : Generate verbose output. Default is `OFF`.
+* `CPPKAFKA_BUILD_SHARED` : Build cppkafka as a shared library. Default is `ON`.
+* `CPPKAFKA_DISABLE_TESTS` : Disable build of cppkafka tests. Default is `OFF`.
+* `CPPKAFKA_DISABLE_EXAMPLES` : Disable build of cppkafka examples. Default is `OFF`.
+* `CPPKAFKA_BOOST_STATIC_LIBS` : Link with Boost static libraries. Default is `ON`.
+* `CPPKAFKA_BOOST_USE_MULTITHREADED` : Use Boost multi-threaded libraries. Default is `ON`.
+
+Example:
 ```Shell
-cmake .. -DRDKAFKA_ROOT_DIR=/some/other/dir
+cmake -DRDKAFKA_ROOT_DIR=/some/other/dir -DCPPKAFKA_BUILD_SHARED=OFF ...
 ```

 Note that the `RDKAFKA_ROOT_DIR` must contain the following structure:
@@ -86,13 +93,6 @@ ${RDKAFKA_ROOT_DIR}/
 + lib/librdkafka.a
 ```

-By default, a shared library will be built. If you want to perform a static build,
-use the `CPPKAFKA_BUILD_SHARED` parameter:
-
-```Shell
-cmake .. -DCPPKAFKA_BUILD_SHARED=0
-```
-
 # Using

 If you want to use _cppkafka_, you'll need to link your application with:
@@ -108,4 +108,3 @@ _Doxygen_ to be installed. The documentation will be written in html format at
 Make sure to check the [wiki](https://github.com/mfontanini/cppkafka/wiki) which includes
 some documentation about the project and some of its features.

View File

@@ -1,22 +1,15 @@
-find_package(Boost COMPONENTS program_options)
-
-if (Boost_PROGRAM_OPTIONS_FOUND)
-    link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY})
-
-    include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
-    include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR})
-
-    add_custom_target(examples)
-    macro(create_example example_name)
-        add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp")
-        add_dependencies(examples ${example_name})
-    endmacro()
-
-    create_example(kafka_producer)
-    create_example(kafka_consumer)
-    create_example(kafka_consumer_dispatcher)
-    create_example(metadata)
-    create_example(consumers_information)
-else()
-    message(STATUS "Disabling examples since boost.program_options was not found")
-endif()
+link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
+include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR})
+
+add_custom_target(examples)
+macro(create_example example_name)
+    add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp")
+    add_dependencies(examples ${example_name})
+endmacro()
+
+create_example(kafka_producer)
+create_example(kafka_consumer)
+create_example(kafka_consumer_dispatcher)
+create_example(metadata)
+create_example(consumers_information)

View File

@@ -34,6 +34,7 @@
#include "buffer.h" #include "buffer.h"
#include "topic.h" #include "topic.h"
#include "macros.h" #include "macros.h"
#include "message.h"
namespace cppkafka { namespace cppkafka {
@@ -50,6 +51,11 @@ public:
      */
     BasicMessageBuilder(std::string topic);

+    /**
+     * Construct a BasicMessageBuilder from a Message object
+     */
+    BasicMessageBuilder(const Message& message);
+
     /**
      * \brief Construct a message builder from another one that uses a different buffer type
      *
@@ -177,6 +183,18 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
 : topic_(std::move(topic)) {
 }

+template <typename T, typename C>
+BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
+: topic_(message.get_topic()),
+  key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
+  payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
+  timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
+                                       std::chrono::milliseconds(0)),
+  user_data_(message.get_user_data())
+{
+}
+
 template <typename T, typename C>
 template <typename U, typename V>
 BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)

View File

@@ -82,6 +82,7 @@ public:
      * The policy to use for the payload. The default policy is COPY_PAYLOAD
      */
     enum class PayloadPolicy {
+        PASSTHROUGH_PAYLOAD = 0,            ///< Rdkafka will not copy nor free the payload.
         COPY_PAYLOAD = RD_KAFKA_MSG_F_COPY, ///< Means RD_KAFKA_MSG_F_COPY
         FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE  ///< Means RD_KAFKA_MSG_F_FREE
     };
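To illustrate how the new PASSTHROUGH_PAYLOAD value is meant to be used, here is a minimal sketch; the broker address and topic name are placeholders and the surrounding program is not part of this commit. With this policy, rdkafka neither copies nor frees the payload, so the application must keep the buffer alive until delivery completes.

```c++
#include <string>
#include <cppkafka/configuration.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/producer.h>

using namespace cppkafka;

int main() {
    Configuration config = { { "metadata.broker.list", "127.0.0.1:9092" } };
    Producer producer(config);
    // With PASSTHROUGH_PAYLOAD, rdkafka neither copies nor frees the payload,
    // so `payload` must outlive the delivery of the message.
    producer.set_payload_policy(Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD);

    std::string payload = "payload owned by the application";
    MessageBuilder builder("my_topic");
    builder.payload(payload); // Buffer view over the string; no copy is made
    producer.produce(builder);

    // Only after all outstanding deliveries complete is it safe to release `payload`
    producer.flush();
}
```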

View File

@@ -31,12 +31,14 @@
 #define CPPKAFKA_BUFFERED_PRODUCER_H

 #include <string>
-#include <queue>
+#include <deque>
 #include <cstdint>
 #include <algorithm>
 #include <unordered_set>
 #include <unordered_map>
 #include <map>
+#include <mutex>
+#include <atomic>
 #include <boost/optional.hpp>
 #include "../producer.h"
 #include "../message.h"
@@ -50,13 +52,34 @@ namespace cppkafka {
  * to produce them just as you would using the Producer class.
  *
  * When calling either flush or wait_for_acks, the buffered producer will block until all
- * produced messages (either in a buffer or non buffered way) are acknowledged by the kafka
- * brokers.
+ * produced messages (either buffered or sent directly) are acknowledged by the kafka brokers.
  *
- * When producing messages, this class will handle cases where the producer's queue is full so it\
+ * When producing messages, this class will handle cases where the producer's queue is full so it
  * will poll until the production is successful.
  *
- * This class is not thread safe.
+ * \remark This class is thread safe.
+ *
+ * \remark Releasing buffers: For high-performance applications preferring a zero-copy approach
+ * (using PayloadPolicy::PASSTHROUGH_PAYLOAD - see warning below) it is very important to know when
+ * it is safe to release owned message buffers. One way is to perform individual cleanup when
+ * ProduceSuccessCallback is called. If the application produces messages in batches or has
+ * bursty behavior, another way is to check when flush operations have fully completed, i.e. when
+ * get_buffer_size()==0 && get_flushes_in_progress()==0. Note that get_pending_acks()==0
+ * is not always a guarantee, as there is a very small window when flush() starts in which
+ * get_buffer_size()==0 && get_pending_acks()==0 but messages have not yet been sent to the
+ * remote broker. For applications producing messages without buffering, get_pending_acks()==0
+ * is sufficient.
+ *
+ * \warning Delivery Report Callback: This class makes internal use of this function and will
+ * overwrite anything the user has supplied as part of the configuration options. Instead, the user
+ * should call set_produce_success_callback() and set_produce_failure_callback() respectively.
+ *
+ * \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector<char>
+ * the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload-owning
+ * type cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka
+ * shall not make any internal copies of the message and it is the application's responsibility to free
+ * the message payloads *after* the ProduceSuccessCallback has reported a successful delivery, to avoid
+ * memory corruption.
  */
 template <typename BufferType>
 class CPPKAFKA_API BufferedProducer {
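As a hedged illustration of the behavior described in the comment block above (the broker address, topic name and payload are placeholders, not part of this commit), a typical single-threaded setup looks roughly like this:

```c++
#include <iostream>
#include <string>
#include <cppkafka/configuration.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/utils/buffered_producer.h>

using namespace cppkafka;

int main() {
    Configuration config = { { "metadata.broker.list", "127.0.0.1:9092" } };
    BufferedProducer<std::string> producer(config);

    // Use these callbacks rather than a delivery report callback, which the
    // class overwrites internally (see the warning above).
    producer.set_produce_success_callback([](const Message& msg) {
        // Safe point to release any application-owned buffer tied to this message
        std::cout << "delivered to partition " << msg.get_partition() << std::endl;
    });
    producer.set_produce_failure_callback([](const Message& msg) {
        std::cout << "delivery failed: " << msg.get_error().to_string() << std::endl;
        return false; // drop the message instead of retrying it forever
    });

    MessageBuilder builder("my_topic");
    std::string payload = "hello";
    builder.payload(payload);
    producer.add_message(builder); // buffered; nothing is sent yet
    producer.flush();              // send everything and wait for the acks
}
```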
@@ -67,10 +90,20 @@ public:
     using Builder = ConcreteMessageBuilder<BufferType>;

     /**
-     * Callback to indicate a message failed to be produced.
+     * Callback to indicate a message was delivered to the broker
+     */
+    using ProduceSuccessCallback = std::function<void(const Message&)>;
+
+    /**
+     * Callback to indicate a message failed to be produced by the broker
      */
     using ProduceFailureCallback = std::function<bool(const Message&)>;

+    /**
+     * Callback to indicate a message failed to be flushed
+     */
+    using FlushFailureCallback = std::function<bool(const Builder&, Error error)>;
+
     /**
      * \brief Constructs a buffered producer using the provided configuration
      *
@@ -106,6 +139,8 @@ public:
      * wait for it to be acknowledged.
      *
      * \param builder The builder that contains the message to be produced
+     *
+     * \remark This method throws cppkafka::HandleException on failure
      */
     void produce(const MessageBuilder& builder);
@@ -116,6 +151,8 @@ public:
      * wait for it to be acknowledged.
      *
      * \param message The message to be produced
+     *
+     * \remark This method throws cppkafka::HandleException on failure
      */
     void produce(const Message& message);
@@ -124,6 +161,10 @@ public:
      *
      * This will send all messages and keep waiting until all of them are acknowledged (this is
      * done by calling wait_for_acks).
+     *
+     * \remark Although it is possible to call flush from multiple threads concurrently, better
+     *         performance is achieved when called from the same thread or when serialized
+     *         with respect to other threads.
      */
     void flush();
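A hedged sketch of the pattern suggested by the remark above: producing threads only add messages, while a single dedicated thread performs the flushes. The threshold, topic and broker address are illustrative, not part of this commit.

```c++
#include <atomic>
#include <chrono>
#include <string>
#include <thread>
#include <cppkafka/configuration.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/utils/buffered_producer.h>

using namespace cppkafka;

int main() {
    Configuration config = { { "metadata.broker.list", "127.0.0.1:9092" } };
    BufferedProducer<std::string> producer(config);
    std::atomic<bool> done{false};

    // Single flusher thread: flush() is effectively serialized with respect
    // to the producing thread, as the remark above recommends.
    std::thread flusher([&] {
        while (!done) {
            if (producer.get_buffer_size() >= 100) {
                producer.flush();
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        producer.flush(); // drain whatever is left
    });

    MessageBuilder builder("my_topic");
    std::string payload = "hello";
    builder.payload(payload);
    for (int i = 0; i < 1000; ++i) {
        producer.add_message(builder);
    }

    done = true;
    flusher.join();
}
```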
@@ -159,12 +200,36 @@ public:
     ssize_t get_max_buffer_size() const;

     /**
-     * \brief Get the number of messages in the buffer
+     * \brief Get the number of unsent messages in the buffer
      *
      * \return The number of messages
      */
     size_t get_buffer_size() const;

+    /**
+     * \brief Get the number of messages not yet acked by the broker
+     *
+     * \return The number of messages
+     */
+    size_t get_pending_acks() const;
+
+    /**
+     * \brief Get the total number of messages successfully produced since the beginning
+     *
+     * \return The number of messages
+     */
+    size_t get_total_messages_produced() const;
+
+    /**
+     * \brief Get the total outstanding flush operations in progress
+     *
+     * Since flush can be called from multiple threads concurrently, this counter indicates
+     * how many operations are currently in progress.
+     *
+     * \return The number of outstanding flush operations.
+     */
+    size_t get_flushes_in_progress() const;
+
     /**
      * Gets the Producer object
      */
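The release condition spelled out in the class comment can be wrapped in a small helper. This is a hedged sketch built only on the getters introduced above; the helper name is illustrative and not part of this commit.

```c++
#include <cppkafka/utils/buffered_producer.h>

// Returns true once the buffer is empty and no flush is in flight, i.e. the
// point at which application-owned payload buffers may safely be released.
template <typename BufferType>
bool all_flushes_completed(const cppkafka::BufferedProducer<BufferType>& producer) {
    return producer.get_buffer_size() == 0 &&
           producer.get_flushes_in_progress() == 0;
}
```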
@@ -188,69 +253,131 @@ public:
      * false. Note that if the callback return false, then the message will be discarded.
      *
      * \param callback The callback to be set
+     *
+     * \remark It is *highly* recommended to set this callback as your message may be produced
+     *         indefinitely if there's a remote error.
+     *
+     * \warning Do not call any method on the BufferedProducer while inside this callback.
      */
     void set_produce_failure_callback(ProduceFailureCallback callback);

+    /**
+     * \brief Sets the successful delivery callback
+     *
+     * The user can use this function to cleanup any application-owned message buffers.
+     *
+     * \param callback The callback to be set
+     */
+    void set_produce_success_callback(ProduceSuccessCallback callback);
+
+    /**
+     * \brief Sets the local message produce failure callback
+     *
+     * This callback will be called when local message production fails during a flush() operation.
+     * Failure errors are typically payload too large, unknown topic or unknown partition.
+     * Note that if the callback returns false, the message will be dropped from the buffer,
+     * otherwise it will be re-enqueued for later retry.
+     *
+     * \param callback
+     *
+     * \warning Do not call any method on the BufferedProducer while inside this callback
+     */
+    void set_flush_failure_callback(FlushFailureCallback callback);
+
 private:
-    using QueueType = std::queue<Builder>;
+    using QueueType = std::deque<Builder>;
+    enum class MessagePriority { Low, High };
+
+    template <typename T>
+    struct CounterGuard{
+        CounterGuard(std::atomic<T>& counter) : counter_(counter) { ++counter_; }
+        ~CounterGuard() { --counter_; }
+        std::atomic<T>& counter_;
+    };

     template <typename BuilderType>
-    void do_add_message(BuilderType&& builder);
+    void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
     template <typename MessageType>
     void produce_message(const MessageType& message);
     Configuration prepare_configuration(Configuration config);
     void on_delivery_report(const Message& message);

-    Configuration::DeliveryReportCallback delivery_report_callback_;
+    // Members
     Producer producer_;
     QueueType messages_;
+    mutable std::mutex mutex_;
+    ProduceSuccessCallback produce_success_callback_;
     ProduceFailureCallback produce_failure_callback_;
-    size_t expected_acks_{0};
-    size_t messages_acked_{0};
+    FlushFailureCallback flush_failure_callback_;
     ssize_t max_buffer_size_{-1};
+    std::atomic<size_t> pending_acks_{0};
+    std::atomic<size_t> flushes_in_progress_{0};
+    std::atomic<size_t> total_messages_produced_{0};
 };

 template <typename BufferType>
-BufferedProducer<BufferType>::BufferedProducer(Configuration config)
-: delivery_report_callback_(config.get_delivery_report_callback()),
-  producer_(prepare_configuration(std::move(config))) {
+Producer::PayloadPolicy get_default_payload_policy() {
+    return Producer::PayloadPolicy::COPY_PAYLOAD;
+}
+
+template <> inline
+Producer::PayloadPolicy get_default_payload_policy<Buffer>() {
+    return Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD;
+}
+
+template <typename BufferType>
+BufferedProducer<BufferType>::BufferedProducer(Configuration config)
+: producer_(prepare_configuration(std::move(config))) {
+    producer_.set_payload_policy(get_default_payload_policy<BufferType>());
 }

 template <typename BufferType>
 void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
-    do_add_message(builder);
+    do_add_message(builder, MessagePriority::Low, true);
 }

 template <typename BufferType>
 void BufferedProducer<BufferType>::add_message(Builder builder) {
-    do_add_message(move(builder));
+    do_add_message(move(builder), MessagePriority::Low, true);
 }

 template <typename BufferType>
 void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
     produce_message(builder);
-    expected_acks_++;
 }

 template <typename BufferType>
 void BufferedProducer<BufferType>::produce(const Message& message) {
     produce_message(message);
-    expected_acks_++;
 }

 template <typename BufferType>
 void BufferedProducer<BufferType>::flush() {
-    while (!messages_.empty()) {
-        produce_message(messages_.front());
-        messages_.pop();
+    CounterGuard<size_t> counter_guard(flushes_in_progress_);
+    QueueType flush_queue; // flush from temporary queue
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        std::swap(messages_, flush_queue);
+    }
+    while (!flush_queue.empty()) {
+        try {
+            produce_message(flush_queue.front());
+        }
+        catch (const HandleException& ex) {
+            if (flush_failure_callback_ &&
+                flush_failure_callback_(flush_queue.front(), ex.get_error())) {
+                // retry again later
+                do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false);
+            }
+        }
+        flush_queue.pop_front();
     }
     wait_for_acks();
 }

 template <typename BufferType>
 void BufferedProducer<BufferType>::wait_for_acks() {
-    while (messages_acked_ < expected_acks_) {
+    while (pending_acks_ > 0) {
         try {
             producer_.flush();
         }
@@ -264,16 +391,13 @@ void BufferedProducer<BufferType>::wait_for_acks() {
             }
         }
     }
-    expected_acks_ = 0;
-    messages_acked_ = 0;
 }

 template <typename BufferType>
 void BufferedProducer<BufferType>::clear() {
+    std::lock_guard<std::mutex> lock(mutex_);
     QueueType tmp;
     std::swap(tmp, messages_);
-    expected_acks_ = 0;
-    messages_acked_ = 0;
 }

 template <typename BufferType>
@@ -296,10 +420,19 @@ size_t BufferedProducer<BufferType>::get_buffer_size() const {
 template <typename BufferType>
 template <typename BuilderType>
-void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
-    expected_acks_++;
-    messages_.push(std::forward<BuilderType>(builder));
-    if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
+void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
+                                                  MessagePriority priority,
+                                                  bool do_flush) {
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (priority == MessagePriority::High) {
+            messages_.emplace_front(std::move(builder));
+        }
+        else {
+            messages_.emplace_back(std::move(builder));
+        }
+    }
+    if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
         flush();
     }
 }
@@ -314,6 +447,21 @@ const Producer& BufferedProducer<BufferType>::get_producer() const {
     return producer_;
 }

+template <typename BufferType>
+size_t BufferedProducer<BufferType>::get_pending_acks() const {
+    return pending_acks_;
+}
+
+template <typename BufferType>
+size_t BufferedProducer<BufferType>::get_total_messages_produced() const {
+    return total_messages_produced_;
+}
+
+template <typename BufferType>
+size_t BufferedProducer<BufferType>::get_flushes_in_progress() const {
+    return flushes_in_progress_;
+}
+
 template <typename BufferType>
 typename BufferedProducer<BufferType>::Builder
 BufferedProducer<BufferType>::make_builder(std::string topic) {
@@ -325,18 +473,28 @@ void BufferedProducer<BufferType>::set_produce_failure_callback(ProduceFailureCa
     produce_failure_callback_ = std::move(callback);
 }

+template <typename BufferType>
+void BufferedProducer<BufferType>::set_produce_success_callback(ProduceSuccessCallback callback) {
+    produce_success_callback_ = std::move(callback);
+}
+
+template <typename BufferType>
+void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallback callback) {
+    flush_failure_callback_ = std::move(callback);
+}
+
 template <typename BufferType>
 template <typename MessageType>
 void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
-    bool sent = false;
-    while (!sent) {
+    while (true) {
         try {
             producer_.produce(message);
-            sent = true;
+            // Sent successfully
+            ++pending_acks_;
+            break;
         }
         catch (const HandleException& ex) {
-            const Error error = ex.get_error();
-            if (error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
+            if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                 // If the output queue is full, then just poll
                 producer_.poll();
             }
@@ -357,21 +515,26 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
 template <typename BufferType>
 void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
-    // Call the user-supplied delivery report callback if any
-    if (delivery_report_callback_) {
-        delivery_report_callback_(producer_, message);
-    }
+    // Decrement the expected acks
+    --pending_acks_;
+    assert(pending_acks_ != (size_t)-1); // Prevent underflow
+
     // We should produce this message again if it has an error and we either don't have a
     // produce failure callback or we have one but it returns true
     bool should_produce = message.get_error() &&
                           (!produce_failure_callback_ || produce_failure_callback_(message));
     if (should_produce) {
-        produce_message(message);
-        return;
+        // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
+        do_add_message(Builder(message), MessagePriority::High, false);
+    }
+    else {
+        // Successful delivery
+        if (produce_success_callback_) {
+            produce_success_callback_(message);
+        }
+        // Increment the total successful transmissions
+        ++total_messages_produced_;
     }
-    // If production was successful or the produce failure callback returned false, then
-    // let's consider it to be acked
-    messages_acked_++;
 }

 } // cppkafka

View File

@@ -1,6 +1,6 @@
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/)
 include_directories(SYSTEM ${CATCH_INCLUDE})
-include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR})
+include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR})

 set(KAFKA_TEST_INSTANCE "kafka-vm:9092"
     CACHE STRING "The kafka instance to which to connect to run tests")

View File

@@ -12,17 +12,19 @@
 using std::string;
 using std::to_string;
 using std::set;
+using std::vector;
 using std::tie;
 using std::move;
 using std::thread;
+namespace this_thread = std::this_thread;
 using std::mutex;
 using std::unique_lock;
 using std::lock_guard;
 using std::condition_variable;
 using std::chrono::system_clock;
 using std::chrono::seconds;
 using std::chrono::milliseconds;
+using std::ref;

 using namespace cppkafka;
@@ -48,6 +50,44 @@ static Configuration make_consumer_config() {
     return config;
 }

+void producer_run(BufferedProducer<string>& producer,
+                  int& exit_flag, condition_variable& clear,
+                  int num_messages,
+                  int partition) {
+    MessageBuilder builder(KAFKA_TOPIC);
+    string key("wassup?");
+    string payload("nothing much!");
+    builder.partition(partition).key(key).payload(payload);
+    for (int i = 0; i < num_messages; ++i) {
+        if (i == num_messages/2) {
+            clear.notify_one();
+        }
+        producer.add_message(builder);
+        this_thread::sleep_for(milliseconds(10));
+    }
+    exit_flag = 1;
+}
+
+void flusher_run(BufferedProducer<string>& producer,
+                 int& exit_flag,
+                 int num_flush) {
+    while (!exit_flag) {
+        if (producer.get_buffer_size() >= (size_t)num_flush) {
+            producer.flush();
+        }
+    }
+    producer.flush();
+}
+
+void clear_run(BufferedProducer<string>& producer,
+               condition_variable& clear) {
+    mutex m;
+    unique_lock<mutex> lock(m);
+    clear.wait(lock);
+    producer.clear();
+}
+
 TEST_CASE("simple production", "[producer]") {
     int partition = 0;
@@ -234,7 +274,7 @@ TEST_CASE("multiple messages", "[producer]") {
     }
 }

-TEST_CASE("buffered producer", "[producer]") {
+TEST_CASE("buffered producer", "[producer][buffered_producer]") {
     int partition = 0;

     // Create a consumer and assign this topic/partition
@@ -303,3 +343,57 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") {
     const auto& messages = runner.get_messages();
     REQUIRE(messages.size() == producer.get_max_buffer_size());
 }
+
+TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") {
+    int partition = 0;
+    vector<thread> threads;
+    int num_messages = 50;
+    int num_flush = 10;
+    int exit_flag = 0;
+    condition_variable clear;
+
+    // Create a consumer and assign this topic/partition
+    Consumer consumer(make_consumer_config());
+    consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
+    ConsumerRunner runner(consumer, num_messages, 1);
+
+    BufferedProducer<string> producer(make_producer_config());
+    threads.push_back(thread(producer_run, ref(producer), ref(exit_flag), ref(clear), num_messages, partition));
+    threads.push_back(thread(flusher_run, ref(producer), ref(exit_flag), num_flush));
+
+    // Wait for completion
+    runner.try_join();
+    for (auto&& thread : threads) {
+        thread.join();
+    }
+
+    const auto& messages = runner.get_messages();
+    REQUIRE(messages.size() == num_messages);
+    REQUIRE(producer.get_flushes_in_progress() == 0);
+    REQUIRE(producer.get_pending_acks() == 0);
+    REQUIRE(producer.get_total_messages_produced() == num_messages);
+    REQUIRE(producer.get_buffer_size() == 0);
+}
+
+TEST_CASE("clear multi-threaded buffered producer", "[producer][buffered_producer]") {
+    int partition = 0;
+    vector<thread> threads;
+    int num_messages = 50;
+    int exit_flag = 0;
+    condition_variable clear;
+
+    BufferedProducer<string> producer(make_producer_config());
+    threads.push_back(thread(producer_run, ref(producer), ref(exit_flag), ref(clear), num_messages, partition));
+    threads.push_back(thread(clear_run, ref(producer), ref(clear)));
+
+    // Wait for completion
+    for (auto&& thread : threads) {
+        thread.join();
+    }
+
+    REQUIRE(producer.get_total_messages_produced() == 0);
+    REQUIRE(producer.get_flushes_in_progress() == 0);
+    REQUIRE(producer.get_pending_acks() == 0);
+    REQUIRE(producer.get_buffer_size() < num_messages);
+}