Added access to the internal Consumer in the backoff committer (#75)

* Added access to the internal Consumer and provided non-default constructor for BackoffPerformer

* added sync_produce and deleted the value constructor

* removed sync_produce methods

* removed value constructor in backoff_performer class
This commit is contained in:
Alex Damian
2018-06-04 14:48:20 -04:00
committed by Matias Fontanini
parent 9714bec5bf
commit 5cad740aea
4 changed files with 19 additions and 10 deletions

View File

@@ -75,7 +75,7 @@ public:
/** /**
* \brief The error callback. * \brief The error callback.
* *
* Whenever an error occurs comitting an offset, this callback will be executed using * Whenever an error occurs committing an offset, this callback will be executed using
* the generated error. While the function returns true, then this is offset will be * the generated error. While the function returns true, then this is offset will be
* committed again until it either succeeds or the function returns false. * committed again until it either succeeds or the function returns false.
*/ */
@@ -98,7 +98,6 @@ public:
*/ */
void set_error_callback(ErrorCallback callback); void set_error_callback(ErrorCallback callback);
/** /**
* \brief Commits the given message synchronously * \brief Commits the given message synchronously
* *
@@ -118,6 +117,13 @@ public:
* \param topic_partitions The topic/partition list to be committed * \param topic_partitions The topic/partition list to be committed
*/ */
void commit(const TopicPartitionList& topic_partitions); void commit(const TopicPartitionList& topic_partitions);
/**
* \brief Get the internal Consumer object
*
* \return A reference to the Consumer
*/
Consumer& get_consumer();
private: private:
// Return true to abort and false to continue committing // Return true to abort and false to continue committing
template <typename T> template <typename T>

View File

@@ -57,7 +57,7 @@ public:
}; };
/** /**
* Constructs an instance of backoff perform * Constructs an instance of backoff performer
* *
* By default, the linear backoff policy is used * By default, the linear backoff policy is used
*/ */

View File

@@ -133,7 +133,7 @@ public:
void add_message(Builder builder); void add_message(Builder builder);
/** /**
* \brief Produces a message without buffering it * \brief Produces a message asynchronously without buffering it
* *
* The message will still be tracked so that a call to flush or wait_for_acks will actually * The message will still be tracked so that a call to flush or wait_for_acks will actually
* wait for it to be acknowledged. * wait for it to be acknowledged.
@@ -145,7 +145,7 @@ public:
void produce(const MessageBuilder& builder); void produce(const MessageBuilder& builder);
/** /**
* \brief Produces a message without buffering it * \brief Produces a message asynchronously without buffering it
* *
* The message will still be tracked so that a call to flush or wait_for_acks will actually * The message will still be tracked so that a call to flush or wait_for_acks will actually
* wait for it to be acknowledged. * wait for it to be acknowledged.
@@ -515,10 +515,6 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) { void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
// Decrement the expected acks
--pending_acks_;
assert(pending_acks_ != (size_t)-1); // Prevent underflow
if (message.get_error()) { if (message.get_error()) {
// We should produce this message again if we don't have a produce failure callback // We should produce this message again if we don't have a produce failure callback
// or we have one but it returns true // or we have one but it returns true
@@ -534,6 +530,9 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
// Increment the total successful transmissions // Increment the total successful transmissions
++total_messages_produced_; ++total_messages_produced_;
} }
// Decrement the expected acks
--pending_acks_;
assert(pending_acks_ != (size_t)-1); // Prevent underflow
} }
} // cppkafka } // cppkafka

View File

@@ -55,4 +55,8 @@ void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) {
}); });
} }
Consumer& BackoffCommitter::get_consumer() {
return consumer_;
}
} // cppkafka } // cppkafka