Add Message class

This commit is contained in:
Matias Fontanini
2016-05-17 19:25:49 -07:00
parent 5848bccdb8
commit 61253e4f42
3 changed files with 87 additions and 0 deletions

View File

@@ -0,0 +1,36 @@
#ifndef CPPKAFKA_MESSAGE_H
#define CPPKAFKA_MESSAGE_H
#include <memory>
#include <cstdint>
#include <librdkafka/rdkafka.h>
#include "buffer.h"
#include "topic.h"
namespace cppkafka {
class Message {
public:
Message(rd_kafka_message_t* handle);
bool has_error() const;
rd_kafka_resp_err_t get_error() const;
std::string get_topic() const;
int get_partition() const;
const Buffer& get_payload() const;
const Buffer& get_key() const;
int64_t get_offset() const;
void* private_data();
rd_kafka_message_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;
HandlePtr handle_;
Buffer payload_;
Buffer key_;
};
} // cppkafka
#endif // CPPKAFKA_MESSAGE_H

View File

@@ -5,6 +5,7 @@ set(SOURCES
topic.cpp
partition.cpp
buffer.cpp
message.cpp
kafka_handle_base.cpp
producer.cpp

50
src/message.cpp Normal file
View File

@@ -0,0 +1,50 @@
#include "message.h"
using std::string;
namespace cppkafka {
Message::Message(rd_kafka_message_t* handle)
: handle_(handle, &rd_kafka_message_destroy),
payload_((const Buffer::DataType*)handle_->payload, handle_->len),
key_((const Buffer::DataType*)handle_->key, handle_->key_len) {
}
bool Message::has_error() const {
return get_error() != RD_KAFKA_RESP_ERR_NO_ERROR;
}
rd_kafka_resp_err_t Message::get_error() const {
return handle_->err;
}
int Message::get_partition() const {
return handle_->partition;
}
string Message::get_topic() const {
return rd_kafka_topic_name(handle_->rkt);
}
const Buffer& Message::get_payload() const {
return payload_;
}
const Buffer& Message::get_key() const {
return key_;
}
int64_t Message::get_offset() const {
return handle_->offset;
}
void* Message::private_data() {
return handle_->_private;
}
rd_kafka_message_t* Message::get_handle() const {
return handle_.get();
}
} // cppkafka