mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 11:07:56 +00:00
concurrency issues in MessageBuilder internal data
This commit is contained in:
@@ -348,6 +348,15 @@ public:
|
||||
void construct_buffer(Buffer& lhs, const T& rhs) {
|
||||
lhs = Buffer(rhs);
|
||||
}
|
||||
|
||||
MessageBuilder clone() const {
|
||||
return std::move(MessageBuilder(topic()).
|
||||
key(Buffer(key().get_data(), key().get_size())).
|
||||
payload(Buffer(payload().get_data(), payload().get_size())).
|
||||
timestamp(timestamp()).
|
||||
user_data(user_data()).
|
||||
internal(internal()));
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -36,7 +36,8 @@ namespace cppkafka {
|
||||
|
||||
class Message;
|
||||
|
||||
struct Internal {
|
||||
class Internal {
|
||||
public:
|
||||
virtual ~Internal() = default;
|
||||
};
|
||||
using InternalPtr = std::shared_ptr<Internal>;
|
||||
@@ -44,15 +45,20 @@ using InternalPtr = std::shared_ptr<Internal>;
|
||||
/**
|
||||
* \brief Private message data structure
|
||||
*/
|
||||
struct MessageInternal {
|
||||
class MessageInternal {
|
||||
public:
|
||||
MessageInternal(void* user_data, std::shared_ptr<Internal> internal);
|
||||
static std::unique_ptr<MessageInternal> load(Message& message);
|
||||
void* get_user_data() const;
|
||||
InternalPtr get_internal() const;
|
||||
private:
|
||||
void* user_data_;
|
||||
InternalPtr internal_;
|
||||
};
|
||||
|
||||
template <typename BuilderType>
|
||||
struct MessageInternalGuard {
|
||||
class MessageInternalGuard {
|
||||
public:
|
||||
MessageInternalGuard(BuilderType& builder)
|
||||
: builder_(builder),
|
||||
user_data_(builder.user_data()) {
|
||||
|
||||
@@ -366,9 +366,6 @@ private:
|
||||
|
||||
template <typename BuilderType>
|
||||
TrackerPtr add_tracker(BuilderType& builder) {
|
||||
if (!has_internal_data_ && (max_number_retries_ > 0)) {
|
||||
has_internal_data_ = true; //enable once
|
||||
}
|
||||
if (has_internal_data_ && !builder.internal()) {
|
||||
// Add message tracker only if it hasn't been added before
|
||||
TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
|
||||
@@ -426,8 +423,7 @@ BufferedProducer<BufferType>::BufferedProducer(Configuration config)
|
||||
|
||||
template <typename BufferType>
|
||||
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
|
||||
add_tracker(const_cast<MessageBuilder&>(builder));
|
||||
do_add_message(builder, MessagePriority::Low, true);
|
||||
add_message(Builder(builder)); //make ConcreteBuilder
|
||||
}
|
||||
|
||||
template <typename BufferType>
|
||||
@@ -438,19 +434,26 @@ void BufferedProducer<BufferType>::add_message(Builder builder) {
|
||||
|
||||
template <typename BufferType>
|
||||
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
|
||||
add_tracker(const_cast<MessageBuilder&>(builder));
|
||||
async_produce(builder, true);
|
||||
if (has_internal_data_) {
|
||||
MessageBuilder builder_copy(builder.clone());
|
||||
add_tracker(builder_copy);
|
||||
async_produce(builder_copy, true);
|
||||
}
|
||||
else {
|
||||
async_produce(builder, true);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename BufferType>
|
||||
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
|
||||
TrackerPtr tracker = add_tracker(const_cast<MessageBuilder&>(builder));
|
||||
if (tracker) {
|
||||
if (has_internal_data_) {
|
||||
MessageBuilder builder_copy(builder.clone());
|
||||
TrackerPtr tracker = add_tracker(builder_copy);
|
||||
// produce until we succeed or we reach max retry limit
|
||||
std::future<bool> should_retry;
|
||||
do {
|
||||
should_retry = tracker->get_new_future();
|
||||
produce_message(builder);
|
||||
produce_message(builder_copy);
|
||||
wait_for_acks();
|
||||
}
|
||||
while (should_retry.get());
|
||||
@@ -576,6 +579,9 @@ size_t BufferedProducer<BufferType>::get_flushes_in_progress() const {
|
||||
|
||||
template <typename BufferType>
|
||||
void BufferedProducer<BufferType>::set_max_number_retries(size_t max_number_retries) {
|
||||
if (!has_internal_data_ && (max_number_retries > 0)) {
|
||||
has_internal_data_ = true; //enable once
|
||||
}
|
||||
max_number_retries_ = max_number_retries;
|
||||
}
|
||||
|
||||
@@ -638,12 +644,12 @@ void BufferedProducer<BufferType>::async_produce(BuilderType&& builder, bool thr
|
||||
if (test_params && test_params->force_produce_error_) {
|
||||
throw HandleException(Error(RD_KAFKA_RESP_ERR_UNKNOWN));
|
||||
}
|
||||
produce_message(std::forward<BuilderType>(builder));
|
||||
produce_message(builder);
|
||||
}
|
||||
catch (const HandleException& ex) {
|
||||
// If we have a flush failure callback and it returns true, we retry producing this message later
|
||||
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
|
||||
if (!callback || callback(std::forward<BuilderType>(builder), ex.get_error())) {
|
||||
if (!callback || callback(builder, ex.get_error())) {
|
||||
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
|
||||
if (tracker && tracker->num_retries_ > 0) {
|
||||
--tracker->num_retries_;
|
||||
@@ -671,7 +677,7 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
|
||||
//Get tracker data
|
||||
TestParameters* test_params = get_test_parameters();
|
||||
TrackerPtr tracker = has_internal_data_ ?
|
||||
std::static_pointer_cast<Tracker>(MessageInternal::load(const_cast<Message&>(message))->internal_) : nullptr;
|
||||
std::static_pointer_cast<Tracker>(MessageInternal::load(const_cast<Message&>(message))->get_internal()) : nullptr;
|
||||
bool should_retry = false;
|
||||
if (message.get_error() || (test_params && test_params->force_delivery_error_)) {
|
||||
// We should produce this message again if we don't have a produce failure callback
|
||||
|
||||
@@ -68,8 +68,8 @@ Message::Message(HandlePtr handle)
|
||||
Message& Message::load_internal() {
|
||||
if (user_data_) {
|
||||
MessageInternal* mi = static_cast<MessageInternal*>(user_data_);
|
||||
user_data_ = mi->user_data_;
|
||||
internal_ = mi->internal_;
|
||||
user_data_ = mi->get_user_data();
|
||||
internal_ = mi->get_internal();
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
@@ -45,4 +45,12 @@ std::unique_ptr<MessageInternal> MessageInternal::load(Message& message) {
|
||||
static_cast<MessageInternal*>(message.get_handle()->_private) : nullptr);
|
||||
}
|
||||
|
||||
void* MessageInternal::get_user_data() const {
|
||||
return user_data_;
|
||||
}
|
||||
|
||||
InternalPtr MessageInternal::get_internal() const {
|
||||
return internal_;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user