diff --git a/include/cppkafka/utils/backoff_committer.h b/include/cppkafka/utils/backoff_committer.h index 907dd60..7c15b1a 100644 --- a/include/cppkafka/utils/backoff_committer.h +++ b/include/cppkafka/utils/backoff_committer.h @@ -75,7 +75,7 @@ public: /** * \brief The error callback. * - * Whenever an error occurs comitting an offset, this callback will be executed using + * Whenever an error occurs committing an offset, this callback will be executed using * the generated error. While the function returns true, then this is offset will be * committed again until it either succeeds or the function returns false. */ @@ -97,8 +97,7 @@ public: * \param callback The callback to be set */ void set_error_callback(ErrorCallback callback); - - + /** * \brief Commits the given message synchronously * @@ -118,6 +117,13 @@ public: * \param topic_partitions The topic/partition list to be committed */ void commit(const TopicPartitionList& topic_partitions); + + /** + * \brief Get the internal Consumer object + * + * \return A reference to the Consumer + */ + Consumer& get_consumer(); private: // Return true to abort and false to continue committing template diff --git a/include/cppkafka/utils/backoff_performer.h b/include/cppkafka/utils/backoff_performer.h index 7f8cc84..7641c9c 100644 --- a/include/cppkafka/utils/backoff_performer.h +++ b/include/cppkafka/utils/backoff_performer.h @@ -57,7 +57,7 @@ public: }; /** - * Constructs an instance of backoff perform + * Constructs an instance of backoff performer * * By default, the linear backoff policy is used */ diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 50c8cc7..74febbd 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -133,7 +133,7 @@ public: void add_message(Builder builder); /** - * \brief Produces a message without buffering it + * \brief Produces a message asynchronously without buffering it * * The message will still be tracked so that a call to flush or wait_for_acks will actually * wait for it to be acknowledged. @@ -145,7 +145,7 @@ public: void produce(const MessageBuilder& builder); /** - * \brief Produces a message without buffering it + * \brief Produces a message asynchronously without buffering it * * The message will still be tracked so that a call to flush or wait_for_acks will actually * wait for it to be acknowledged. @@ -515,10 +515,6 @@ Configuration BufferedProducer::prepare_configuration(Configuration template void BufferedProducer::on_delivery_report(const Message& message) { - // Decrement the expected acks - --pending_acks_; - assert(pending_acks_ != (size_t)-1); // Prevent underflow - if (message.get_error()) { // We should produce this message again if we don't have a produce failure callback // or we have one but it returns true @@ -534,6 +530,9 @@ void BufferedProducer::on_delivery_report(const Message& message) { // Increment the total successful transmissions ++total_messages_produced_; } + // Decrement the expected acks + --pending_acks_; + assert(pending_acks_ != (size_t)-1); // Prevent underflow } } // cppkafka diff --git a/src/utils/backoff_committer.cpp b/src/utils/backoff_committer.cpp index de02bc7..58fedc6 100644 --- a/src/utils/backoff_committer.cpp +++ b/src/utils/backoff_committer.cpp @@ -55,4 +55,8 @@ void BackoffCommitter::commit(const TopicPartitionList& topic_partitions) { }); } +Consumer& BackoffCommitter::get_consumer() { + return consumer_; +} + } // cppkafka