diff --git a/examples/kafka_consumer.cpp b/examples/kafka_consumer.cpp index 8555bbf..ee206a8 100644 --- a/examples/kafka_consumer.cpp +++ b/examples/kafka_consumer.cpp @@ -81,10 +81,10 @@ int main(int argc, char* argv[]) { else { // Print the key (if any) if (msg.get_key()) { - cout << msg.get_key().as_string() << " -> "; + cout << msg.get_key() << " -> "; } // Print the payload - cout << msg.get_payload().as_string() << endl; + cout << msg.get_payload() << endl; // Now commit the message consumer.commit(msg); } diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index a7c47cc..b7de88f 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -31,6 +31,8 @@ #define CPPKAFKA_BUFFER_H #include +#include +#include #include namespace cppkafka { @@ -63,7 +65,7 @@ public: template Buffer(const T* data, size_t size) : data_(reinterpret_cast(data)), size_(size) { - static_assert(sizeof(T) == 1, "Buffer must point to elements of 1 byte"); + static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)"); } /** @@ -100,7 +102,33 @@ public: /** * Converts the contents of the buffer into a string */ - std::string as_string() const; + operator std::string() const; + + /** + * \brief Converts the contents of the buffer into a vector. + * + * The vector must contain some type of size 1 (e.g. uint8_t, char, etc). + */ + template + operator std::vector() const { + static_assert(sizeof(T) == sizeof(DataType), "sizeof(T) != sizeof(DataType)"); + return std::vector(data_, data_ + size_); + } + + /** + * Compares this Buffer for equality + */ + bool operator==(const Buffer& rhs) const; + + /** + * Compares this Buffer for inequality + */ + bool operator!=(const Buffer& rhs) const; + + /** + * Output operator + */ + friend std::ostream& operator<<(std::ostream& output, const Buffer& rhs); private: const DataType* data_; size_t size_; diff --git a/src/buffer.cpp b/src/buffer.cpp index 7c281f7..58a61bc 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -27,9 +27,16 @@ * */ +#include +#include +#include #include "buffer.h" using std::string; +using std::equal; +using std::ostream; +using std::hex; +using std::dec; namespace cppkafka { @@ -55,8 +62,36 @@ Buffer::operator bool() const { return data_ != nullptr; } -string Buffer::as_string() const { +Buffer::operator string() const { return string(data_, data_ + size_); } +bool Buffer::operator==(const Buffer& rhs) const { + if (get_size() != rhs.get_size()) { + return false; + } + return equal(get_data(), get_data() + get_size(), rhs.get_data()); +} + +bool Buffer::operator!=(const Buffer& rhs) const { + return !(*this == rhs); +} + +ostream& operator<<(ostream& output, const Buffer& rhs) { + for (size_t i = 0; i < rhs.get_size(); ++i) { + char c = static_cast(rhs.get_data()[i]); + if (c >= ' ' && c < 127) { + output << c; + } + else { + output << "\\x"; + if (c < 16) { + output << '0'; + } + output << hex << (int)c << dec; + } + } + return output; +} + } // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c5787ac..6f941d6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -30,6 +30,8 @@ create_test(producer) create_test(kafka_handle_base) create_test(topic_partition_list) create_test(configuration) +create_test(buffer) + if (ENABLE_ZOOKEEPER) create_test(zookeeper_watcher) endif() diff --git a/tests/buffer_test.cpp b/tests/buffer_test.cpp new file mode 100644 index 0000000..85540a9 --- /dev/null +++ b/tests/buffer_test.cpp @@ -0,0 +1,62 @@ +#include +#include +#include +#include +#include "cppkafka/buffer.h" + +using std::string; +using std::vector; +using std::ostringstream; + +using namespace cppkafka; + +class BufferTest : public testing::Test { +public: + +}; + +TEST_F(BufferTest, StringConversion) { + string data = "Hello world!"; + Buffer buffer(data); + string buffer_as_string = buffer; + EXPECT_EQ(data, buffer_as_string); +} + +TEST_F(BufferTest, StringConversionOnEmptyBuffer) { + Buffer buffer; + EXPECT_EQ("", static_cast(buffer)); +} + +TEST_F(BufferTest, VectorConversion) { + string data = "Hello world!"; + Buffer buffer(data); + vector buffer_as_vector = buffer; + EXPECT_EQ(data, string(buffer_as_vector.begin(), buffer_as_vector.end())); +} + +TEST_F(BufferTest, Equality) { + string data = "Hello world!"; + Buffer buffer1(data); + Buffer buffer2(data); + + EXPECT_EQ(buffer1, buffer2); +} + +TEST_F(BufferTest, InEquality) { + string data1 = "Hello world!"; + string data2 = "Hello worldz"; + Buffer buffer1(data1); + Buffer buffer2(data2); + + EXPECT_NE(buffer1, buffer2); +} + +TEST_F(BufferTest, OutputOperator) { + string data = "Hello \x7fwor\x03ld!"; + string pretty_string = "Hello \\x7fwor\\x03ld!"; + Buffer buffer(data); + + ostringstream output; + output << buffer; + EXPECT_EQ(pretty_string, output.str()); +} diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index a718874..a7a08ce 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -120,8 +120,8 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) { const auto& messages = runner.get_messages(); ASSERT_EQ(1, messages.size()); const auto& message = messages[0]; - EXPECT_EQ(payload, message.get_payload().as_string()); - EXPECT_EQ("", message.get_key().as_string()); + EXPECT_EQ(Buffer(payload), message.get_payload()); + EXPECT_FALSE(message.get_key()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(0, message.get_error()); @@ -151,8 +151,8 @@ TEST_F(ProducerTest, OneMessageUsingKey) { const auto& messages = runner.get_messages(); ASSERT_EQ(1, messages.size()); const auto& message = messages[0]; - EXPECT_EQ(payload, message.get_payload().as_string()); - EXPECT_EQ(key, message.get_key().as_string()); + EXPECT_EQ(Buffer(payload), message.get_payload()); + EXPECT_EQ(Buffer(key), message.get_key()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(0, message.get_error()); @@ -183,9 +183,9 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) { ASSERT_EQ(message_count, messages.size()); for (const auto& message : messages) { EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); - EXPECT_EQ(1, payloads.erase(message.get_payload().as_string())); + EXPECT_EQ(1, payloads.erase(message.get_payload())); EXPECT_EQ(0, message.get_error()); - EXPECT_EQ("", message.get_key().as_string()); + EXPECT_FALSE(message.get_key()); EXPECT_GE(message.get_partition(), 0); EXPECT_LT(message.get_partition(), 3); } @@ -205,14 +205,14 @@ TEST_F(ProducerTest, Callbacks) { bool deliver_report_called = false; Configuration config = make_producer_config(); config.set_delivery_report_callback([&](Producer&, const Message& msg) { - EXPECT_EQ(payload, msg.get_payload().as_string()); + EXPECT_EQ(Buffer(payload), msg.get_payload()); deliver_report_called = true; }); TopicConfiguration topic_config; topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key, int32_t partition_count) { - EXPECT_EQ(key, msg_key.as_string()); + EXPECT_EQ(Buffer(key), msg_key); EXPECT_EQ(3, partition_count); EXPECT_EQ(KAFKA_TOPIC, topic.get_name()); return 0; @@ -227,8 +227,8 @@ TEST_F(ProducerTest, Callbacks) { const auto& messages = runner.get_messages(); ASSERT_EQ(1, messages.size()); const auto& message = messages[0]; - EXPECT_EQ(payload, message.get_payload().as_string()); - EXPECT_EQ(key, message.get_key().as_string()); + EXPECT_EQ(Buffer(payload), message.get_payload()); + EXPECT_EQ(Buffer(key), message.get_key()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(0, message.get_error()); @@ -252,7 +252,7 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) { TopicConfiguration topic_config; topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key, int32_t partition_count) { - EXPECT_EQ(key, msg_key.as_string()); + EXPECT_EQ(Buffer(key), msg_key); EXPECT_EQ(3, partition_count); EXPECT_EQ(KAFKA_TOPIC, topic.get_name()); callback_called = true; @@ -303,8 +303,8 @@ TEST_F(ProducerTest, ConnectUsingZookeeper) { const auto& messages = runner.get_messages(); ASSERT_EQ(1, messages.size()); const auto& message = messages[0]; - EXPECT_EQ(payload, message.get_payload().as_string()); - EXPECT_EQ(key, message.get_key().as_string()); + EXPECT_EQ(Buffer(payload), message.get_payload()); + EXPECT_EQ(Buffer(key), message.get_key()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(0, message.get_error());