diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index 6c6d0f9..83edd74 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -40,6 +40,10 @@ namespace cppkafka { * * This is only a view, hence you should convert the contents of a buffer into * some other container if you want to store it somewhere. + * + * If you're using this to produce a message, you *need* to guarantee that the + * pointer that this buffer points to will still until the call to Producer::produce + * returns. */ class Buffer { public: @@ -62,6 +66,17 @@ public: static_assert(sizeof(T) == 1, "Buffer must point to elements of 1 byte"); } + /** + * \brief Construct a buffer from a const string ref + * + * Note that you *can't use temporaries* here as they would be destructed after + * the constructor finishes. + */ + Buffer(const std::string& data); + + // Don't allow construction from temporaries + Buffer(std::string&&) = delete; + Buffer(const Buffer&) = delete; Buffer(Buffer&&) = default; Buffer& operator=(const Buffer&) = delete; diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index d7ad4dc..834e931 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -70,11 +70,10 @@ class TopicConfiguration; * string payload = "some payload"; * * // Write a message into an unassigned partition - * producer.produce(topic, Partition(), Buffer(payload.data(), payload.size())); + * producer.produce(topic, Partition(), payload); * - * // Write using a key - * producer.produce(topic, Partition(), Buffer(payload.data(), payload.size()), - * Buffer(key.data(), key.size())); + * // Write using a key on a fixed partition (42) + * producer.produce(topic, 42, payload, key); * * \endcode */ diff --git a/src/buffer.cpp b/src/buffer.cpp index b2b4d63..e4a1cc2 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -38,6 +38,11 @@ Buffer::Buffer() } +Buffer::Buffer(const string& data) +: Buffer(data.data(), data.size()) { + +} + const Buffer::DataType* Buffer::get_data() const { return data_; } diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index f5a6f59..0741644 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -118,7 +118,7 @@ TEST_F(ConsumerTest, AssignmentCallback) { Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world!"; - producer.produce(topic, partition, Buffer(payload.data(), payload.size())); + producer.produce(topic, partition, payload); runner.try_join(); // All 3 partitions should be ours @@ -173,7 +173,7 @@ TEST_F(ConsumerTest, Rebalance) { Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world!"; - producer.produce(topic, partition, Buffer(payload.data(), payload.size())); + producer.produce(topic, partition, payload); runner1.try_join(); runner2.try_join(); @@ -215,7 +215,7 @@ TEST_F(ConsumerTest, OffsetCommit) { Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world!"; - producer.produce(topic, partition, Buffer(payload.data(), payload.size())); + producer.produce(topic, partition, payload); runner.try_join(); ASSERT_EQ(1, runner.get_messages().size()); diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index b9abc03..a718874 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -114,7 +114,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) { Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world! 1"; - producer.produce(topic, partition, Buffer(payload.data(), payload.size())); + producer.produce(topic, partition, payload); runner.try_join(); const auto& messages = runner.get_messages(); @@ -145,8 +145,7 @@ TEST_F(ProducerTest, OneMessageUsingKey) { Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world! 2"; string key = "such key"; - producer.produce(topic, partition, Buffer(payload.data(), payload.size()), - Buffer(key.data(), key.size())); + producer.produce(topic, partition, payload, key); runner.try_join(); const auto& messages = runner.get_messages(); @@ -176,8 +175,7 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) { for (size_t i = 0; i < message_count; ++i) { string payload = payload_base + to_string(i); payloads.insert(payload); - producer.produce(topic, {} /*unassigned partition*/, - Buffer(payload.data(), payload.size())); + producer.produce(topic, {} /*unassigned partition*/, payload); } runner.try_join(); @@ -222,8 +220,7 @@ TEST_F(ProducerTest, Callbacks) { Producer producer(move(config)); Topic topic = producer.get_topic(KAFKA_TOPIC, topic_config); - producer.produce(topic, {}, Buffer(payload.data(), payload.size()), - Buffer(key.data(), key.size())); + producer.produce(topic, {}, payload, key); producer.poll(); runner.try_join(); @@ -265,8 +262,7 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) { Producer producer(move(config)); Topic topic = producer.get_topic(KAFKA_TOPIC); - producer.produce(topic, {}, Buffer(payload.data(), payload.size()), - Buffer(key.data(), key.size())); + producer.produce(topic, {}, payload, key); producer.poll(); runner.try_join(); @@ -301,8 +297,7 @@ TEST_F(ProducerTest, ConnectUsingZookeeper) { Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world! 2"; string key = "such key"; - producer.produce(topic, partition, Buffer(payload.data(), payload.size()), - Buffer(key.data(), key.size())); + producer.produce(topic, partition, payload, key); runner.try_join(); const auto& messages = runner.get_messages();