Replaced termination callback with throwing exception

This commit is contained in:
Alexander Damian
2019-04-24 10:30:32 -04:00
parent e8c4397b66
commit 0c1119727b
6 changed files with 73 additions and 34 deletions

View File

@@ -42,6 +42,10 @@ namespace cppkafka {
*/
class CPPKAFKA_API Error {
public:
/**
* @brief Constructs an error object with RD_KAFKA_RESP_ERR_NO_ERROR
*/
Error() = default;
/**
* Constructs an error object
*/
@@ -77,7 +81,7 @@ public:
*/
CPPKAFKA_API friend std::ostream& operator<<(std::ostream& output, const Error& rhs);
private:
rd_kafka_resp_err_t error_;
rd_kafka_resp_err_t error_{RD_KAFKA_RESP_ERR_NO_ERROR};
};
} // cppkafka

View File

@@ -100,10 +100,18 @@ public:
*/
void set_error_callback(ErrorCallback callback);
/**
* \brief Commits the current partition assignment synchronously
*
* This will call Consumer::commit() until either the message is successfully
* committed or the error callback returns false (if any is set).
*/
void commit();
/**
* \brief Commits the given message synchronously
*
* This will call Consumer::commit until either the message is successfully
* This will call Consumer::commit(msg) until either the message is successfully
* committed or the error callback returns false (if any is set).
*
* \param msg The message to be committed
@@ -113,7 +121,7 @@ public:
/**
* \brief Commits the offsets on the given topic/partitions synchronously
*
* This will call Consumer::commit until either the offsets are successfully
* This will call Consumer::commit(topic_partitions) until either the offsets are successfully
* committed or the error callback returns false (if any is set).
*
* \param topic_partitions The topic/partition list to be committed
@@ -127,25 +135,34 @@ public:
*/
Consumer& get_consumer();
private:
// Return true to abort and false to continue committing
// If the ReturnType contains 'true', we abort committing. Otherwise we continue.
// The second member of the ReturnType contains the RdKafka error if any.
template <typename T>
bool do_commit(const T& object) {
ReturnType do_commit(const T* object) {
ReturnType rt;
try {
consumer_.commit(object);
// If the commit succeeds, we're done
return true;
if (!object) {
consumer_.commit();
}
else {
consumer_.commit(*object);
}
// If the commit succeeds, we're done.
}
catch (const HandleException& ex) {
rt.error_ = ex.get_error();
// If there were actually no offsets to commit, return. Retrying won't solve
// anything here
if (ex.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET) {
return true;
if (rt.error_ != RD_KAFKA_RESP_ERR__NO_OFFSET) {
// If there's no callback or if returns true for this message, keep committing.
// Otherwise abort.
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
if (!callback || callback(rt.error_)) {
rt.abort_ = false; //continue retrying
}
}
// If there's a callback and it returns false for this message, abort.
// Otherwise keep committing.
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
return callback && !callback(ex.get_error());
}
return rt;
}
Consumer& consumer_;

View File

@@ -34,6 +34,7 @@
#include <functional>
#include <thread>
#include "../consumer.h"
#include "../exceptions.h"
namespace cppkafka {
@@ -47,6 +48,14 @@ public:
static const TimeUnit DEFAULT_BACKOFF_STEP;
static const TimeUnit DEFAULT_MAXIMUM_BACKOFF;
static const size_t DEFAULT_MAXIMUM_RETRIES;
/**
* @brief Type which any functor must return.
*/
struct ReturnType {
bool abort_{true};
Error error_;
};
/**
* The backoff policy to use
@@ -119,11 +128,16 @@ public:
void perform(const Functor& callback) {
TimeUnit backoff = initial_backoff_;
size_t retries = maximum_retries_;
ReturnType rt;
while (retries--) {
auto start = std::chrono::steady_clock::now();
// If the callback returns true, we're done
if (callback()) {
return;
rt = callback();
if (rt.abort_) {
if (rt.error_) {
break; //terminal error
}
return; //success
}
auto end = std::chrono::steady_clock::now();
auto time_elapsed = end - start;
@@ -134,6 +148,8 @@ public:
// Increase out backoff depending on the policy being used
backoff = increase_backoff(backoff);
}
// No more retries left or we have a terminal error.
throw ConsumerException(rt.error_);
}
private:
TimeUnit increase_backoff(TimeUnit backoff);