Remove Partition class

This commit is contained in:
Matias Fontanini
2017-04-16 14:51:06 -07:00
parent f5c8c7c3c8
commit 9ecad71be1
11 changed files with 64 additions and 197 deletions

View File

@@ -48,11 +48,11 @@ int main(int argc, char* argv[]) {
return 1;
}
MessageBuilder builder(topic_name);
// Get the partition we want to write to. If no partition is provided, this will be
// an unassigned one
Partition partition;
if (partition_value != -1) {
partition = partition_value;
builder.partition(partition_value);
}
// Construct the configuration
@@ -62,15 +62,14 @@ int main(int argc, char* argv[]) {
// Create the producer
Producer producer(config);
// Get the topic we want
Topic topic = producer.get_topic(topic_name);
cout << "Producing messages into topic " << topic_name << endl;
// Now read lines and write them into kafka
string line;
while (getline(cin, line)) {
builder.payload(line);
// Write the string into the partition
producer.produce(MessageBuilder(topic).partition(partition).payload(line));
producer.produce(builder);
}
}

View File

@@ -30,10 +30,8 @@
#ifndef CPPKAFKA_MESSAGE_BUILDER_H
#define CPPKAFKA_MESSAGE_BUILDER_H
#include <boost/optional.hpp>
#include "buffer.h"
#include "topic.h"
#include "partition.h"
#include "macros.h"
namespace cppkafka {
@@ -49,14 +47,21 @@ public:
*
* \param topic The topic into which this message would be produced
*/
BasicMessageBuilder(const Topic& topic);
BasicMessageBuilder(std::string topic);
/**
* Sets the topic in which this message will be produced
*
* \param value The topic to be used
*/
Concrete& topic(std::string value);
/**
* Sets the partition into which this message will be produced
*
* \param value The partition to be used
*/
Concrete& partition(Partition value);
Concrete& partition(int value);
/**
* Sets the message's key
@@ -96,12 +101,12 @@ public:
/**
* Gets the topic this message will be produced into
*/
const Topic& topic() const;
const std::string& topic() const;
/**
* Gets the partition this message will be produced into
*/
const Partition& partition() const;
int partition() const;
/**
* Gets the message's key
@@ -130,20 +135,26 @@ public:
private:
void construct_buffer(BufferType& lhs, const BufferType& rhs);
const Topic& topic_;
Partition partition_;
std::string topic_;
int partition_{-1};
BufferType key_;
BufferType payload_;
void* user_data_;
};
template <typename T, typename C>
BasicMessageBuilder<T, C>::BasicMessageBuilder(const Topic& topic)
: topic_(topic) {
BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
: topic_(std::move(topic)) {
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::partition(Partition value) {
C& BasicMessageBuilder<T, C>::topic(std::string value) {
topic_ = std::move(value);
return static_cast<C&>(*this);
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::partition(int value) {
partition_ = value;
return static_cast<C&>(*this);
}
@@ -179,12 +190,12 @@ C& BasicMessageBuilder<T, C>::user_data(void* value) {
}
template <typename T, typename C>
const Topic& BasicMessageBuilder<T, C>::topic() const {
const std::string& BasicMessageBuilder<T, C>::topic() const {
return topic_;
}
template <typename T, typename C>
const Partition& BasicMessageBuilder<T, C>::partition() const {
int BasicMessageBuilder<T, C>::partition() const {
return partition_;
}
@@ -223,17 +234,13 @@ void BasicMessageBuilder<T, C>::construct_buffer(T& lhs, const T& rhs) {
*
* Allows building a message including topic, partition, key, payload, etc.
*
* The topic and buffer objects used <b>must</b> be kept alive while the message builder object
* is still being used.
*
* Example:
*
* \code
* Producer producer(...);
* Topic topic = producer.get_topic("test");
*
* string payload = "hello world";
* producer.produce(MessageBuilder(topic).partition(5).payload(payload));
* producer.produce(MessageBuilder("test").partition(5).payload(payload));
* \endcode
*/
class MessageBuilder : public BasicMessageBuilder<Buffer, MessageBuilder> {

View File

@@ -1,67 +0,0 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_PARTITION_H
#define CPPKAFKA_PARTITION_H
#include "macros.h"
namespace cppkafka {
/**
* \brief Dumb wrapper over a partition
*
* This class is basically a wrapper over an int that when default constructed will default
* to using RD_KAFKA_PARTITION_UA so you don't need to use the macro name.
*/
class CPPKAFKA_API Partition {
public:
/**
* \brief Constructs an unassigned partition
*
* The partition's value will be RD_KAFKA_PARTITION_UA
*/
Partition();
/**
* Construct a partition using the given partition value
*/
Partition(int partition);
/**
* Gets the partition value
*/
int get_partition() const;
private:
int partition_;
};
} // cppkafka
#endif // CPPKAFKA_PARTITION_H

View File

@@ -35,7 +35,6 @@
#include "configuration.h"
#include "buffer.h"
#include "topic.h"
#include "partition.h"
#include "macros.h"
#include "message_builder.h"
@@ -65,18 +64,15 @@ class TopicConfiguration;
* // Create a producer
* Producer producer(config);
*
* // Get the topic we'll write into
* Topic topic = producer.get_topic("foo");
*
* // Create some key and payload
* string key = "creative_key_name";
* string payload = "some payload";
*
* // Write a message into an unassigned partition
* producer.produce(MessageBuilder(topic).payload(payload));
* producer.produce(MessageBuilder("some_topic").payload(payload));
*
* // Write using a key on a fixed partition (42)
* producer.produce(MessageBuilder(topic).partition(42).key(key).payload(payload));
* producer.produce(MessageBuilder("some_topic").partition(42).key(key).payload(payload));
*
* \endcode
*/

View File

@@ -70,14 +70,13 @@ public:
/**
* Simple helper to construct a builder object
*/
Builder make_builder(const Topic& topic);
Builder make_builder(std::string topic);
private:
// Pick the most appropriate index type depending on the platform we're using
using IndexType = std::conditional<sizeof(void*) == 8, uint64_t, uint32_t>::type;
template <typename BuilderType>
void do_add_message(BuilderType&& builder);
const Topic& get_topic(const std::string& topic);
void produce_message(IndexType index, Builder& message);
Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message);
@@ -86,7 +85,6 @@ private:
std::map<IndexType, Builder> messages_;
std::vector<IndexType> failed_indexes_;
IndexType current_index_{0};
std::vector<Topic> topics_;
std::unordered_map<std::string, unsigned> topic_mapping_;
};
@@ -126,10 +124,10 @@ void BufferedProducer<BufferType>::flush() {
template <typename BufferType>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
Builder local_builder(get_topic(builder.topic().get_name()));
local_builder.partition(builder.partition());
local_builder.key(std::move(builder.key()));
local_builder.payload(std::move(builder.payload()));
Builder local_builder(builder.topic());
local_builder.partition(builder.partition())
.key(std::move(builder.key()))
.payload(std::move(builder.payload()));
IndexType index = messages_.size();
messages_.emplace(index, std::move(local_builder));
@@ -147,19 +145,8 @@ const Producer& BufferedProducer<BufferType>::get_producer() const {
template <typename BufferType>
typename BufferedProducer<BufferType>::Builder
BufferedProducer<BufferType>::make_builder(const Topic& topic) {
return Builder(topic);
}
template <typename BufferType>
const Topic& BufferedProducer<BufferType>::get_topic(const std::string& topic) {
auto iter = topic_mapping_.find(topic);
if (iter == topic_mapping_.end()) {
unsigned index = topics_.size();
topics_.push_back(producer_.get_topic(topic));
iter = topic_mapping_.emplace(topic, index).first;
}
return topics_[iter->second];
BufferedProducer<BufferType>::make_builder(std::string topic) {
return Builder(std::move(topic));
}
template <typename BufferType>

View File

@@ -4,7 +4,6 @@ set(SOURCES
configuration_option.cpp
exceptions.cpp
topic.cpp
partition.cpp
buffer.cpp
message.cpp
topic_partition.cpp

View File

@@ -1,49 +0,0 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "partition.h"
#include <librdkafka/rdkafka.h>
namespace cppkafka {
Partition::Partition()
: partition_(RD_KAFKA_PARTITION_UA) {
}
Partition::Partition(int partition)
: partition_(partition) {
}
int Partition::get_partition() const {
return partition_;
}
} // cppkafka

View File

@@ -63,16 +63,18 @@ Producer::PayloadPolicy Producer::get_payload_policy() const {
}
void Producer::produce(const MessageBuilder& builder) {
void* payload_ptr = (void*)builder.payload().get_data();
void* key_ptr = (void*)builder.key().get_data();
const Buffer& payload = builder.payload();
const Buffer& key = builder.key();
const int policy = static_cast<int>(message_payload_policy_);
int result = rd_kafka_produce(builder.topic().get_handle(),
builder.partition().get_partition(),
policy, payload_ptr, builder.payload().get_size(),
key_ptr, builder.key().get_size(), builder.user_data());
if (result == -1) {
throw HandleException(rd_kafka_errno2err(errno));
}
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(builder.topic().data()),
RD_KAFKA_V_PARTITION(builder.partition()),
RD_KAFKA_V_MSGFLAGS(policy),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_OPAQUE(builder.user_data()),
RD_KAFKA_V_END);
check_error(result);
}
int Producer::poll() {

View File

@@ -76,7 +76,6 @@ TEST_F(CompactedTopicProcessorTest, Consume) {
consumer.poll();
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
struct ElementType {
string value;
@@ -88,13 +87,13 @@ TEST_F(CompactedTopicProcessorTest, Consume) {
};
for (const auto& element_pair : elements) {
const ElementType& element = element_pair.second;
MessageBuilder builder(topic);
MessageBuilder builder(KAFKA_TOPIC);
builder.partition(element.partition).key(element_pair.first).payload(element.value);
producer.produce(builder);
}
// Now erase the first element
string deleted_key = "42";
producer.produce(MessageBuilder(topic).partition(0).key(deleted_key));
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(0).key(deleted_key));
for (size_t i = 0; i < 10; ++i) {
compacted_consumer.process_event();

View File

@@ -116,9 +116,8 @@ TEST_F(ConsumerTest, AssignmentCallback) {
// Produce a message just so we stop the consumer
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world!";
producer.produce(MessageBuilder(topic).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
runner.try_join();
// All 3 partitions should be ours
@@ -171,9 +170,8 @@ TEST_F(ConsumerTest, Rebalance) {
// Produce a message just so we stop the consumer
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world!";
producer.produce(MessageBuilder(topic).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
runner1.try_join();
runner2.try_join();
@@ -213,9 +211,8 @@ TEST_F(ConsumerTest, OffsetCommit) {
// Produce a message just so we stop the consumer
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world!";
producer.produce(MessageBuilder(topic).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
runner.try_join();
ASSERT_EQ(1, runner.get_messages().size());

View File

@@ -117,9 +117,8 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) {
// Now create a producer and produce a message
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world! 1";
producer.produce(MessageBuilder(topic).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
runner.try_join();
const auto& messages = runner.get_messages();
@@ -147,10 +146,9 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
// Now create a producer and produce a message
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world! 2";
string key = "such key";
producer.produce(MessageBuilder(topic).partition(partition).key(key).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload));
runner.try_join();
const auto& messages = runner.get_messages();
@@ -177,12 +175,11 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {
// Now create a producer and produce a message
Producer producer(make_producer_config());
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload_base = "Hello world ";
for (size_t i = 0; i < message_count; ++i) {
string payload = payload_base + to_string(i);
payloads.insert(payload);
producer.produce(MessageBuilder(topic).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).payload(payload));
}
runner.try_join();
@@ -224,10 +221,10 @@ TEST_F(ProducerTest, Callbacks) {
EXPECT_EQ(KAFKA_TOPIC, topic.get_name());
return 0;
});
config.set_default_topic_configuration(topic_config);
Producer producer(move(config));
Topic topic = producer.get_topic(KAFKA_TOPIC, topic_config);
producer.produce(MessageBuilder(topic).key(key).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload));
producer.poll();
runner.try_join();
@@ -268,8 +265,7 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) {
config.set_default_topic_configuration(topic_config);
Producer producer(move(config));
Topic topic = producer.get_topic(KAFKA_TOPIC);
producer.produce(MessageBuilder(topic).key(key).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload));
producer.poll();
runner.try_join();
@@ -292,9 +288,10 @@ TEST_F(ProducerTest, BufferedProducer) {
BufferedProducer<string> producer(make_producer_config());
string payload = "Hello world! 2";
string key = "such key";
Topic topic = producer.get_producer().get_topic(KAFKA_TOPIC);
producer.add_message(MessageBuilder(topic).partition(partition).key(key).payload(payload));
producer.add_message(producer.make_builder(topic).partition(partition).payload(payload));
producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition)
.key(key)
.payload(payload));
producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.flush();
runner.try_join();