Removed dependency on Producer and dr_callback_proxy

This commit is contained in:
accelerated
2018-06-06 19:15:34 -04:00
parent f746653841
commit 23810654ab
8 changed files with 72 additions and 80 deletions

View File

@@ -180,7 +180,7 @@ private:
Message(rd_kafka_message_t* handle, NonOwningTag);
Message(HandlePtr handle);
void load_internal(void* user_data, InternalPtr internal);
Message& load_internal();
HandlePtr handle_;
Buffer payload_;

View File

@@ -31,11 +31,10 @@
#define CPPKAFKA_MESSAGE_INTERNAL_H
#include <memory>
#include "message.h"
namespace cppkafka {
class Producer;
class Message;
struct Internal {
virtual ~Internal() = default;
@@ -45,16 +44,37 @@ using InternalPtr = std::shared_ptr<Internal>;
/**
* \brief Private message data structure
*/
class MessageInternal {
friend Producer;
public:
static std::unique_ptr<MessageInternal> load(const Producer& producer, Message& message);
private:
struct MessageInternal {
MessageInternal(void* user_data, std::shared_ptr<Internal> internal);
static std::unique_ptr<MessageInternal> load(Message& message);
void* user_data_;
InternalPtr internal_;
};
template <typename BuilderType>
struct MessageInternalGuard {
MessageInternalGuard(BuilderType& builder)
: builder_(builder),
user_data_(builder.user_data()) {
if (builder_.internal()) {
// Swap contents with user_data
ptr_.reset(new MessageInternal(user_data_, builder_.internal()));
builder_.user_data(ptr_.get()); //overwrite user data
}
}
~MessageInternalGuard() {
//Restore user data
builder_.user_data(user_data_);
}
void release() {
ptr_.release();
}
private:
BuilderType& builder_;
std::unique_ptr<MessageInternal> ptr_;
void* user_data_;
};
}
#endif //CPPKAFKA_MESSAGE_INTERNAL_H

View File

@@ -31,14 +31,12 @@
#define CPPKAFKA_PRODUCER_H
#include <memory>
#include <tuple>
#include "kafka_handle_base.h"
#include "configuration.h"
#include "buffer.h"
#include "topic.h"
#include "macros.h"
#include "message_builder.h"
#include "message_internal.h"
namespace cppkafka {
@@ -80,7 +78,6 @@ class Message;
*/
class CPPKAFKA_API Producer : public KafkaHandleBase {
public:
friend MessageInternal;
/**
* The policy to use for the payload. The default policy is COPY_PAYLOAD
*/
@@ -159,11 +156,7 @@ public:
*/
void flush(std::chrono::milliseconds timeout);
private:
using LoadResult = std::tuple<void*, std::unique_ptr<MessageInternal>>;
LoadResult load_internal(void* user_data, InternalPtr internal);
PayloadPolicy message_payload_policy_;
bool has_internal_data_;
};
} // cppkafka

View File

@@ -104,7 +104,7 @@ public:
/**
* Callback to indicate a message failed to be flushed
*/
using FlushFailureCallback = std::function<bool(const Builder&, Error error)>;
using FlushFailureCallback = std::function<bool(const MessageBuilder&, Error error)>;
/**
* \brief Constructs a buffered producer using the provided configuration
@@ -369,24 +369,22 @@ private:
if (!has_internal_data_ && (max_number_retries_ > 0)) {
has_internal_data_ = true; //enable once
}
if (has_internal_data_) {
// Add message tracker
if (has_internal_data_ && !builder.internal()) {
// Add message tracker only if it hasn't been added before
TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
builder.internal(tracker);
return tracker;
}
return nullptr;
}
template <typename BuilderType>
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
void do_add_message(const Message& message, MessagePriority priority, bool do_flush);
template <typename MessageType>
void produce_message(MessageType&& message);
template <typename BuilderType>
void produce_message(BuilderType&& builder);
Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message);
template <typename MessageType>
void async_produce(MessageType&& message, bool throw_on_error);
template <typename BuilderType>
void async_produce(BuilderType&& message, bool throw_on_error);
// Members
Producer producer_;
@@ -466,7 +464,7 @@ void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
template <typename BufferType>
void BufferedProducer<BufferType>::produce(const Message& message) {
async_produce(message, true);
async_produce(MessageBuilder(message), true);
}
template <typename BufferType>
@@ -546,13 +544,6 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
}
}
template <typename BufferType>
void BufferedProducer<BufferType>::do_add_message(const Message& message,
MessagePriority priority,
bool do_flush) {
do_add_messsage(MessageBuilder(message), priority, do_flush);
}
template <typename BufferType>
Producer& BufferedProducer<BufferType>::get_producer() {
return producer_;
@@ -615,11 +606,14 @@ void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallba
}
template <typename BufferType>
template <typename MessageType>
void BufferedProducer<BufferType>::produce_message(MessageType&& message) {
template <typename BuilderType>
void BufferedProducer<BufferType>::produce_message(BuilderType&& builder) {
using builder_type = typename std::decay<BuilderType>::type;
while (true) {
try {
producer_.produce(std::forward<MessageType>(message));
MessageInternalGuard<builder_type> internal_guard(const_cast<builder_type&>(builder));
producer_.produce(builder);
internal_guard.release();
// Sent successfully
++pending_acks_;
break;
@@ -637,23 +631,23 @@ void BufferedProducer<BufferType>::produce_message(MessageType&& message) {
}
template <typename BufferType>
template <typename MessageType>
void BufferedProducer<BufferType>::async_produce(MessageType&& message, bool throw_on_error) {
template <typename BuilderType>
void BufferedProducer<BufferType>::async_produce(BuilderType&& builder, bool throw_on_error) {
try {
TestParameters* test_params = get_test_parameters();
if (test_params && test_params->force_produce_error_) {
throw HandleException(Error(RD_KAFKA_RESP_ERR_UNKNOWN));
}
produce_message(std::forward<MessageType>(message));
produce_message(std::forward<BuilderType>(builder));
}
catch (const HandleException& ex) {
// If we have a flush failure callback and it returns true, we retry producing this message later
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (!callback || callback(std::forward<MessageType>(message), ex.get_error())) {
TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal());
if (!callback || callback(std::forward<BuilderType>(builder), ex.get_error())) {
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
if (tracker && tracker->num_retries_ > 0) {
--tracker->num_retries_;
do_add_message(std::forward<MessageType>(message), MessagePriority::High, false);
do_add_message(std::forward<BuilderType>(builder), MessagePriority::High, false);
return;
}
}
@@ -676,7 +670,8 @@ template <typename BufferType>
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
//Get tracker data
TestParameters* test_params = get_test_parameters();
TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal());
TrackerPtr tracker = has_internal_data_ ?
std::static_pointer_cast<Tracker>(MessageInternal::load(const_cast<Message&>(message))->internal_) : nullptr;
bool should_retry = false;
if (message.get_error() || (test_params && test_params->force_delivery_error_)) {
// We should produce this message again if we don't have a produce failure callback