Removed ReturnType. Throw on error from inside do_commit() as well as from perform()

This commit is contained in:
Alexander Damian
2019-04-24 16:42:56 -04:00
parent d84b75ca9d
commit effdf7fb95
3 changed files with 22 additions and 39 deletions

View File

@@ -137,32 +137,28 @@ public:
private:
// If the ReturnType contains 'true', we abort committing. Otherwise we continue.
// The second member of the ReturnType contains the RdKafka error if any.
template <typename T>
ReturnType do_commit(const T* object) {
ReturnType rt;
template <typename...Args>
bool do_commit(Args&&...args) {
try {
if (!object) {
consumer_.commit();
}
else {
consumer_.commit(*object);
}
consumer_.commit(std::forward<Args>(args)...);
// If the commit succeeds, we're done.
return true;
}
catch (const HandleException& ex) {
rt.error_ = ex.get_error();
Error error = ex.get_error();
// If there were actually no offsets to commit, return. Retrying won't solve
// anything here
if (rt.error_ != RD_KAFKA_RESP_ERR__NO_OFFSET) {
// If there's no callback or if returns true for this message, keep committing.
// Otherwise abort.
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
if (!callback || callback(rt.error_)) {
rt.abort_ = false; //continue retrying
}
if (error == RD_KAFKA_RESP_ERR__NO_OFFSET) {
throw ex; //abort
}
// If there's a callback and it returns false for this message, abort.
// Otherwise keep committing.
CallbackInvoker<ErrorCallback> callback("backoff committer", callback_, &consumer_);
if (callback && !callback(error)) {
throw ex; //abort
}
}
return rt;
return false; //continue
}
Consumer& consumer_;

View File

@@ -48,14 +48,6 @@ public:
static const TimeUnit DEFAULT_BACKOFF_STEP;
static const TimeUnit DEFAULT_MAXIMUM_BACKOFF;
static const size_t DEFAULT_MAXIMUM_RETRIES;
/**
* @brief Type which any functor must return.
*/
struct ReturnType {
bool abort_{true};
Error error_;
};
/**
* The backoff policy to use
@@ -128,15 +120,10 @@ public:
void perform(const Functor& callback) {
TimeUnit backoff = initial_backoff_;
size_t retries = maximum_retries_;
ReturnType rt;
while (retries--) {
auto start = std::chrono::steady_clock::now();
// If the callback returns true, we're done
rt = callback();
if (rt.abort_) {
if (rt.error_) {
break; //terminal error
}
if (callback()) {
return; //success
}
auto end = std::chrono::steady_clock::now();
@@ -149,7 +136,7 @@ public:
backoff = increase_backoff(backoff);
}
// No more retries left or we have a terminal error.
throw ConsumerException(rt.error_);
throw Exception("Commit failed: no more retries.");
}
private:
TimeUnit increase_backoff(TimeUnit backoff);