Add Message::get_timestamp

This commit is contained in:
Matias Fontanini
2016-06-19 21:00:25 -07:00
parent 831111812e
commit c615664f12
3 changed files with 70 additions and 2 deletions

View File

@@ -32,6 +32,7 @@
#include <memory>
#include <cstdint>
#include <boost/optional.hpp>
#include <librdkafka/rdkafka.h>
#include "buffer.h"
#include "topic.h"
@@ -39,6 +40,8 @@
namespace cppkafka {
class MessageTimestamp;
/**
* \brief Thin wrapper over a rdkafka message handle
*
@@ -126,7 +129,14 @@ public:
* This should only be used on messages produced by a Producer that were set a private data
* attribute
*/
void* private_data() const;
void* get_private_data() const;
/**
* \brief Gets this Message's timestamp
*
* If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned.
*/
boost::optional<MessageTimestamp> get_timestamp() const;
/**
* Indicates whether this message is valid (not null)
@@ -150,6 +160,35 @@ private:
Buffer key_;
};
class CPPKAFKA_API MessageTimestamp {
public:
/**
* The timestamp type
*/
enum TimestampType {
CREATE_TIME = RD_KAFKA_TIMESTAMP_CREATE_TIME,
LOG_APPEND_TIME = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME
};
/**
* Constructs a timestamp object
*/
MessageTimestamp(int64_t timestamp, TimestampType type);
/**
* Gets the timestamp value
*/
int64_t get_timestamp() const;
/**
* Gets the timestamp type
*/
TimestampType get_type() const;
private:
int64_t timestamp_;
TimestampType type_;
};
} // cppkafka
#endif // CPPKAFKA_MESSAGE_H

View File

@@ -31,6 +31,9 @@
using std::string;
using boost::optional;
using boost::none_t;
namespace cppkafka {
void dummy_deleter(rd_kafka_message_t*) {
@@ -99,10 +102,19 @@ int64_t Message::get_offset() const {
return handle_->offset;
}
void* Message::private_data() const {
void* Message::get_private_data() const {
return handle_->_private;
}
optional<MessageTimestamp> Message::get_timestamp() const {
rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type);
if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
return none_t();
}
return MessageTimestamp(timestamp, static_cast<MessageTimestamp::TimestampType>(type));
}
Message::operator bool() const {
return handle_ != nullptr;
}
@@ -111,4 +123,19 @@ rd_kafka_message_t* Message::get_handle() const {
return handle_.get();
}
// MessageTimestamp
MessageTimestamp::MessageTimestamp(int64_t timestamp, TimestampType type)
: timestamp_(timestamp), type_(type) {
}
int64_t MessageTimestamp::get_timestamp() const {
return timestamp_;
}
MessageTimestamp::TimestampType MessageTimestamp::get_type() const {
return type_;
}
} // cppkafka

View File

@@ -156,6 +156,8 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition());
EXPECT_EQ(0, message.get_error());
// NOTE: if this line fails, then you're using kafka 0.10+ and that's okay
EXPECT_FALSE(message.get_timestamp());
}
TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {