mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 11:07:56 +00:00
Add conversion/comparison operators to Buffer
This commit is contained in:
@@ -81,10 +81,10 @@ int main(int argc, char* argv[]) {
|
|||||||
else {
|
else {
|
||||||
// Print the key (if any)
|
// Print the key (if any)
|
||||||
if (msg.get_key()) {
|
if (msg.get_key()) {
|
||||||
cout << msg.get_key().as_string() << " -> ";
|
cout << msg.get_key() << " -> ";
|
||||||
}
|
}
|
||||||
// Print the payload
|
// Print the payload
|
||||||
cout << msg.get_payload().as_string() << endl;
|
cout << msg.get_payload() << endl;
|
||||||
// Now commit the message
|
// Now commit the message
|
||||||
consumer.commit(msg);
|
consumer.commit(msg);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,6 +31,8 @@
|
|||||||
#define CPPKAFKA_BUFFER_H
|
#define CPPKAFKA_BUFFER_H
|
||||||
|
|
||||||
#include <cstddef>
|
#include <cstddef>
|
||||||
|
#include <vector>
|
||||||
|
#include <iosfwd>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
namespace cppkafka {
|
namespace cppkafka {
|
||||||
@@ -63,7 +65,7 @@ public:
|
|||||||
template <typename T>
|
template <typename T>
|
||||||
Buffer(const T* data, size_t size)
|
Buffer(const T* data, size_t size)
|
||||||
: data_(reinterpret_cast<const DataType*>(data)), size_(size) {
|
: data_(reinterpret_cast<const DataType*>(data)), size_(size) {
|
||||||
static_assert(sizeof(T) == 1, "Buffer must point to elements of 1 byte");
|
static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -100,7 +102,33 @@ public:
|
|||||||
/**
|
/**
|
||||||
* Converts the contents of the buffer into a string
|
* Converts the contents of the buffer into a string
|
||||||
*/
|
*/
|
||||||
std::string as_string() const;
|
operator std::string() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Converts the contents of the buffer into a vector.
|
||||||
|
*
|
||||||
|
* The vector must contain some type of size 1 (e.g. uint8_t, char, etc).
|
||||||
|
*/
|
||||||
|
template <typename T>
|
||||||
|
operator std::vector<T>() const {
|
||||||
|
static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)");
|
||||||
|
return std::vector<T>(data_, data_ + size_);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compares this Buffer for equality
|
||||||
|
*/
|
||||||
|
bool operator==(const Buffer& rhs) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compares this Buffer for inequality
|
||||||
|
*/
|
||||||
|
bool operator!=(const Buffer& rhs) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Output operator
|
||||||
|
*/
|
||||||
|
friend std::ostream& operator<<(std::ostream& output, const Buffer& rhs);
|
||||||
private:
|
private:
|
||||||
const DataType* data_;
|
const DataType* data_;
|
||||||
size_t size_;
|
size_t size_;
|
||||||
|
|||||||
@@ -27,9 +27,16 @@
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <iostream>
|
||||||
|
#include <iomanip>
|
||||||
#include "buffer.h"
|
#include "buffer.h"
|
||||||
|
|
||||||
using std::string;
|
using std::string;
|
||||||
|
using std::equal;
|
||||||
|
using std::ostream;
|
||||||
|
using std::hex;
|
||||||
|
using std::dec;
|
||||||
|
|
||||||
namespace cppkafka {
|
namespace cppkafka {
|
||||||
|
|
||||||
@@ -55,8 +62,36 @@ Buffer::operator bool() const {
|
|||||||
return data_ != nullptr;
|
return data_ != nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
string Buffer::as_string() const {
|
Buffer::operator string() const {
|
||||||
return string(data_, data_ + size_);
|
return string(data_, data_ + size_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Buffer::operator==(const Buffer& rhs) const {
|
||||||
|
if (get_size() != rhs.get_size()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return equal(get_data(), get_data() + get_size(), rhs.get_data());
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Buffer::operator!=(const Buffer& rhs) const {
|
||||||
|
return !(*this == rhs);
|
||||||
|
}
|
||||||
|
|
||||||
|
ostream& operator<<(ostream& output, const Buffer& rhs) {
|
||||||
|
for (size_t i = 0; i < rhs.get_size(); ++i) {
|
||||||
|
char c = static_cast<char>(rhs.get_data()[i]);
|
||||||
|
if (c >= ' ' && c < 127) {
|
||||||
|
output << c;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
output << "\\x";
|
||||||
|
if (c < 16) {
|
||||||
|
output << '0';
|
||||||
|
}
|
||||||
|
output << hex << (int)c << dec;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return output;
|
||||||
|
}
|
||||||
|
|
||||||
} // cppkafka
|
} // cppkafka
|
||||||
|
|||||||
@@ -30,6 +30,8 @@ create_test(producer)
|
|||||||
create_test(kafka_handle_base)
|
create_test(kafka_handle_base)
|
||||||
create_test(topic_partition_list)
|
create_test(topic_partition_list)
|
||||||
create_test(configuration)
|
create_test(configuration)
|
||||||
|
create_test(buffer)
|
||||||
|
|
||||||
if (ENABLE_ZOOKEEPER)
|
if (ENABLE_ZOOKEEPER)
|
||||||
create_test(zookeeper_watcher)
|
create_test(zookeeper_watcher)
|
||||||
endif()
|
endif()
|
||||||
|
|||||||
62
tests/buffer_test.cpp
Normal file
62
tests/buffer_test.cpp
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
#include <sstream>
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include "cppkafka/buffer.h"
|
||||||
|
|
||||||
|
using std::string;
|
||||||
|
using std::vector;
|
||||||
|
using std::ostringstream;
|
||||||
|
|
||||||
|
using namespace cppkafka;
|
||||||
|
|
||||||
|
class BufferTest : public testing::Test {
|
||||||
|
public:
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(BufferTest, StringConversion) {
|
||||||
|
string data = "Hello world!";
|
||||||
|
Buffer buffer(data);
|
||||||
|
string buffer_as_string = buffer;
|
||||||
|
EXPECT_EQ(data, buffer_as_string);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(BufferTest, StringConversionOnEmptyBuffer) {
|
||||||
|
Buffer buffer;
|
||||||
|
EXPECT_EQ("", static_cast<string>(buffer));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(BufferTest, VectorConversion) {
|
||||||
|
string data = "Hello world!";
|
||||||
|
Buffer buffer(data);
|
||||||
|
vector<char> buffer_as_vector = buffer;
|
||||||
|
EXPECT_EQ(data, string(buffer_as_vector.begin(), buffer_as_vector.end()));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(BufferTest, Equality) {
|
||||||
|
string data = "Hello world!";
|
||||||
|
Buffer buffer1(data);
|
||||||
|
Buffer buffer2(data);
|
||||||
|
|
||||||
|
EXPECT_EQ(buffer1, buffer2);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(BufferTest, InEquality) {
|
||||||
|
string data1 = "Hello world!";
|
||||||
|
string data2 = "Hello worldz";
|
||||||
|
Buffer buffer1(data1);
|
||||||
|
Buffer buffer2(data2);
|
||||||
|
|
||||||
|
EXPECT_NE(buffer1, buffer2);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(BufferTest, OutputOperator) {
|
||||||
|
string data = "Hello \x7fwor\x03ld!";
|
||||||
|
string pretty_string = "Hello \\x7fwor\\x03ld!";
|
||||||
|
Buffer buffer(data);
|
||||||
|
|
||||||
|
ostringstream output;
|
||||||
|
output << buffer;
|
||||||
|
EXPECT_EQ(pretty_string, output.str());
|
||||||
|
}
|
||||||
@@ -120,8 +120,8 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) {
|
|||||||
const auto& messages = runner.get_messages();
|
const auto& messages = runner.get_messages();
|
||||||
ASSERT_EQ(1, messages.size());
|
ASSERT_EQ(1, messages.size());
|
||||||
const auto& message = messages[0];
|
const auto& message = messages[0];
|
||||||
EXPECT_EQ(payload, message.get_payload().as_string());
|
EXPECT_EQ(Buffer(payload), message.get_payload());
|
||||||
EXPECT_EQ("", message.get_key().as_string());
|
EXPECT_FALSE(message.get_key());
|
||||||
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
||||||
EXPECT_EQ(partition, message.get_partition());
|
EXPECT_EQ(partition, message.get_partition());
|
||||||
EXPECT_EQ(0, message.get_error());
|
EXPECT_EQ(0, message.get_error());
|
||||||
@@ -151,8 +151,8 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
|
|||||||
const auto& messages = runner.get_messages();
|
const auto& messages = runner.get_messages();
|
||||||
ASSERT_EQ(1, messages.size());
|
ASSERT_EQ(1, messages.size());
|
||||||
const auto& message = messages[0];
|
const auto& message = messages[0];
|
||||||
EXPECT_EQ(payload, message.get_payload().as_string());
|
EXPECT_EQ(Buffer(payload), message.get_payload());
|
||||||
EXPECT_EQ(key, message.get_key().as_string());
|
EXPECT_EQ(Buffer(key), message.get_key());
|
||||||
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
||||||
EXPECT_EQ(partition, message.get_partition());
|
EXPECT_EQ(partition, message.get_partition());
|
||||||
EXPECT_EQ(0, message.get_error());
|
EXPECT_EQ(0, message.get_error());
|
||||||
@@ -183,9 +183,9 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {
|
|||||||
ASSERT_EQ(message_count, messages.size());
|
ASSERT_EQ(message_count, messages.size());
|
||||||
for (const auto& message : messages) {
|
for (const auto& message : messages) {
|
||||||
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
||||||
EXPECT_EQ(1, payloads.erase(message.get_payload().as_string()));
|
EXPECT_EQ(1, payloads.erase(message.get_payload()));
|
||||||
EXPECT_EQ(0, message.get_error());
|
EXPECT_EQ(0, message.get_error());
|
||||||
EXPECT_EQ("", message.get_key().as_string());
|
EXPECT_FALSE(message.get_key());
|
||||||
EXPECT_GE(message.get_partition(), 0);
|
EXPECT_GE(message.get_partition(), 0);
|
||||||
EXPECT_LT(message.get_partition(), 3);
|
EXPECT_LT(message.get_partition(), 3);
|
||||||
}
|
}
|
||||||
@@ -205,14 +205,14 @@ TEST_F(ProducerTest, Callbacks) {
|
|||||||
bool deliver_report_called = false;
|
bool deliver_report_called = false;
|
||||||
Configuration config = make_producer_config();
|
Configuration config = make_producer_config();
|
||||||
config.set_delivery_report_callback([&](Producer&, const Message& msg) {
|
config.set_delivery_report_callback([&](Producer&, const Message& msg) {
|
||||||
EXPECT_EQ(payload, msg.get_payload().as_string());
|
EXPECT_EQ(Buffer(payload), msg.get_payload());
|
||||||
deliver_report_called = true;
|
deliver_report_called = true;
|
||||||
});
|
});
|
||||||
|
|
||||||
TopicConfiguration topic_config;
|
TopicConfiguration topic_config;
|
||||||
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
|
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
|
||||||
int32_t partition_count) {
|
int32_t partition_count) {
|
||||||
EXPECT_EQ(key, msg_key.as_string());
|
EXPECT_EQ(Buffer(key), msg_key);
|
||||||
EXPECT_EQ(3, partition_count);
|
EXPECT_EQ(3, partition_count);
|
||||||
EXPECT_EQ(KAFKA_TOPIC, topic.get_name());
|
EXPECT_EQ(KAFKA_TOPIC, topic.get_name());
|
||||||
return 0;
|
return 0;
|
||||||
@@ -227,8 +227,8 @@ TEST_F(ProducerTest, Callbacks) {
|
|||||||
const auto& messages = runner.get_messages();
|
const auto& messages = runner.get_messages();
|
||||||
ASSERT_EQ(1, messages.size());
|
ASSERT_EQ(1, messages.size());
|
||||||
const auto& message = messages[0];
|
const auto& message = messages[0];
|
||||||
EXPECT_EQ(payload, message.get_payload().as_string());
|
EXPECT_EQ(Buffer(payload), message.get_payload());
|
||||||
EXPECT_EQ(key, message.get_key().as_string());
|
EXPECT_EQ(Buffer(key), message.get_key());
|
||||||
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
||||||
EXPECT_EQ(partition, message.get_partition());
|
EXPECT_EQ(partition, message.get_partition());
|
||||||
EXPECT_EQ(0, message.get_error());
|
EXPECT_EQ(0, message.get_error());
|
||||||
@@ -252,7 +252,7 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) {
|
|||||||
TopicConfiguration topic_config;
|
TopicConfiguration topic_config;
|
||||||
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
|
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
|
||||||
int32_t partition_count) {
|
int32_t partition_count) {
|
||||||
EXPECT_EQ(key, msg_key.as_string());
|
EXPECT_EQ(Buffer(key), msg_key);
|
||||||
EXPECT_EQ(3, partition_count);
|
EXPECT_EQ(3, partition_count);
|
||||||
EXPECT_EQ(KAFKA_TOPIC, topic.get_name());
|
EXPECT_EQ(KAFKA_TOPIC, topic.get_name());
|
||||||
callback_called = true;
|
callback_called = true;
|
||||||
@@ -303,8 +303,8 @@ TEST_F(ProducerTest, ConnectUsingZookeeper) {
|
|||||||
const auto& messages = runner.get_messages();
|
const auto& messages = runner.get_messages();
|
||||||
ASSERT_EQ(1, messages.size());
|
ASSERT_EQ(1, messages.size());
|
||||||
const auto& message = messages[0];
|
const auto& message = messages[0];
|
||||||
EXPECT_EQ(payload, message.get_payload().as_string());
|
EXPECT_EQ(Buffer(payload), message.get_payload());
|
||||||
EXPECT_EQ(key, message.get_key().as_string());
|
EXPECT_EQ(Buffer(key), message.get_key());
|
||||||
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
||||||
EXPECT_EQ(partition, message.get_partition());
|
EXPECT_EQ(partition, message.get_partition());
|
||||||
EXPECT_EQ(0, message.get_error());
|
EXPECT_EQ(0, message.get_error());
|
||||||
|
|||||||
Reference in New Issue
Block a user