Move some small functions into header files

This commit is contained in:
Matias Fontanini
2017-06-10 19:15:53 -07:00
parent bb5fb490ce
commit 52822fdb61
4 changed files with 43 additions and 76 deletions

View File

@@ -82,37 +82,51 @@ public:
/**
* Gets the error attribute
*/
Error get_error() const;
Error get_error() const {
return handle_->err;
}
/**
* Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF
*/
bool is_eof() const;
bool is_eof() const {
return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF;
}
/**
* Gets the topic that this message belongs to
*/
std::string get_topic() const;
std::string get_topic() const {
return rd_kafka_topic_name(handle_->rkt);
}
/**
* Gets the partition that this message belongs to
*/
int get_partition() const;
int get_partition() const {
return handle_->partition;
}
/**
* Gets the message's payload
*/
const Buffer& get_payload() const;
const Buffer& get_payload() const {
return payload_;
}
/**
* Gets the message's key
*/
const Buffer& get_key() const;
const Buffer& get_key() const {
return key_;
}
/**
* Gets the message offset
*/
int64_t get_offset() const;
int64_t get_offset() const {
return handle_->offset;
}
/**
* \brief Gets the private data.
@@ -120,24 +134,30 @@ public:
* This should only be used on messages produced by a Producer that were set a private data
* attribute
*/
void* get_private_data() const;
void* get_private_data() const {
return handle_->_private;
}
/**
* \brief Gets this Message's timestamp
*
* If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned.
*/
boost::optional<MessageTimestamp> get_timestamp() const;
inline boost::optional<MessageTimestamp> get_timestamp() const;
/**
* Indicates whether this message is valid (not null)
*/
explicit operator bool() const;
explicit operator bool() const {
return handle_ != nullptr;
}
/**
* Gets the rdkafka message handle
*/
rd_kafka_message_t* get_handle() const;
rd_kafka_message_t* get_handle() const {
return handle_.get();
}
private:
using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;
@@ -183,6 +203,16 @@ private:
TimestampType type_;
};
boost::optional<MessageTimestamp> Message::get_timestamp() const {
rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type);
if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
return {};
}
return MessageTimestamp(std::chrono::milliseconds(timestamp),
static_cast<MessageTimestamp::TimestampType>(type));
}
} // cppkafka
#endif // CPPKAFKA_MESSAGE_H

View File

@@ -109,8 +109,8 @@ private:
using OnTimeoutArgs = std::tuple<Timeout>;
static void handle_error(Error error);
static void handle_eof(EndOfFile, const TopicPartition& topic_partition);
static void handle_timeout(Timeout);
static void handle_eof(EndOfFile, const TopicPartition& /*topic_partition*/) { }
static void handle_timeout(Timeout) { }
// Traits and template helpers

View File

@@ -29,13 +29,8 @@
#include "message.h"
using std::string;
using std::chrono::milliseconds;
using boost::optional;
using boost::none_t;
namespace cppkafka {
void dummy_deleter(rd_kafka_message_t*) {
@@ -68,56 +63,6 @@ Message::Message(HandlePtr handle)
}
Error Message::get_error() const {
return handle_->err;
}
bool Message::is_eof() const {
return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF;
}
int Message::get_partition() const {
return handle_->partition;
}
string Message::get_topic() const {
return rd_kafka_topic_name(handle_->rkt);
}
const Buffer& Message::get_payload() const {
return payload_;
}
const Buffer& Message::get_key() const {
return key_;
}
int64_t Message::get_offset() const {
return handle_->offset;
}
void* Message::get_private_data() const {
return handle_->_private;
}
optional<MessageTimestamp> Message::get_timestamp() const {
rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type);
if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
return {};
}
return MessageTimestamp(milliseconds(timestamp),
static_cast<MessageTimestamp::TimestampType>(type));
}
Message::operator bool() const {
return handle_ != nullptr;
}
rd_kafka_message_t* Message::get_handle() const {
return handle_.get();
}
// MessageTimestamp
MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type)

View File

@@ -44,12 +44,4 @@ void ConsumerDispatcher::handle_error(Error error) {
throw ConsumerException(error);
}
void ConsumerDispatcher::handle_eof(EndOfFile, const TopicPartition& /*topic_partition*/) {
}
void ConsumerDispatcher::handle_timeout(Timeout) {
}
} // cppkafka