Allow constructing Buffer from a std::string

This commit is contained in:
Matias Fontanini
2016-06-14 19:15:46 -07:00
parent d90f039a87
commit c4054eaae0
5 changed files with 32 additions and 18 deletions

View File

@@ -40,6 +40,10 @@ namespace cppkafka {
*
* This is only a view, hence you should convert the contents of a buffer into
* some other container if you want to store it somewhere.
*
* If you're using this to produce a message, you *need* to guarantee that the
* pointer that this buffer points to will still until the call to Producer::produce
* returns.
*/
class Buffer {
public:
@@ -62,6 +66,17 @@ public:
static_assert(sizeof(T) == 1, "Buffer must point to elements of 1 byte");
}
/**
* \brief Construct a buffer from a const string ref
*
* Note that you *can't use temporaries* here as they would be destructed after
* the constructor finishes.
*/
Buffer(const std::string& data);
// Don't allow construction from temporaries
Buffer(std::string&&) = delete;
Buffer(const Buffer&) = delete;
Buffer(Buffer&&) = default;
Buffer& operator=(const Buffer&) = delete;

View File

@@ -70,11 +70,10 @@ class TopicConfiguration;
* string payload = "some payload";
*
* // Write a message into an unassigned partition
* producer.produce(topic, Partition(), Buffer(payload.data(), payload.size()));
* producer.produce(topic, Partition(), payload);
*
* // Write using a key
* producer.produce(topic, Partition(), Buffer(payload.data(), payload.size()),
* Buffer(key.data(), key.size()));
* // Write using a key on a fixed partition (42)
* producer.produce(topic, 42, payload, key);
*
* \endcode
*/

View File

@@ -38,6 +38,11 @@ Buffer::Buffer()
}
Buffer::Buffer(const string& data)
: Buffer(data.data(), data.size()) {
}
const Buffer::DataType* Buffer::get_data() const {
return data_;
}

View File

@@ -118,7 +118,7 @@ TEST_F(ConsumerTest, AssignmentCallback) {
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world!";
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
producer.produce(topic, partition, payload);
runner.try_join();
// All 3 partitions should be ours
@@ -173,7 +173,7 @@ TEST_F(ConsumerTest, Rebalance) {
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world!";
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
producer.produce(topic, partition, payload);
runner1.try_join();
runner2.try_join();
@@ -215,7 +215,7 @@ TEST_F(ConsumerTest, OffsetCommit) {
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world!";
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
producer.produce(topic, partition, payload);
runner.try_join();
ASSERT_EQ(1, runner.get_messages().size());

View File

@@ -114,7 +114,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) {
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world! 1";
producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
producer.produce(topic, partition, payload);
runner.try_join();
const auto& messages = runner.get_messages();
@@ -145,8 +145,7 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world! 2";
string key = "such key";
producer.produce(topic, partition, Buffer(payload.data(), payload.size()),
Buffer(key.data(), key.size()));
producer.produce(topic, partition, payload, key);
runner.try_join();
const auto& messages = runner.get_messages();
@@ -176,8 +175,7 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {
for (size_t i = 0; i < message_count; ++i) {
string payload = payload_base + to_string(i);
payloads.insert(payload);
producer.produce(topic, {} /*unassigned partition*/,
Buffer(payload.data(), payload.size()));
producer.produce(topic, {} /*unassigned partition*/, payload);
}
runner.try_join();
@@ -222,8 +220,7 @@ TEST_F(ProducerTest, Callbacks) {
Producer producer(move(config));
Topic topic = producer.get_topic(KAFKA_TOPIC, topic_config);
producer.produce(topic, {}, Buffer(payload.data(), payload.size()),
Buffer(key.data(), key.size()));
producer.produce(topic, {}, payload, key);
producer.poll();
runner.try_join();
@@ -265,8 +262,7 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) {
Producer producer(move(config));
Topic topic = producer.get_topic(KAFKA_TOPIC);
producer.produce(topic, {}, Buffer(payload.data(), payload.size()),
Buffer(key.data(), key.size()));
producer.produce(topic, {}, payload, key);
producer.poll();
runner.try_join();
@@ -301,8 +297,7 @@ TEST_F(ProducerTest, ConnectUsingZookeeper) {
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world! 2";
string key = "such key";
producer.produce(topic, partition, Buffer(payload.data(), payload.size()),
Buffer(key.data(), key.size()));
producer.produce(topic, partition, payload, key);
runner.try_join();
const auto& messages = runner.get_messages();