Changes per code review

This commit is contained in:
accelerated
2018-05-17 11:06:23 -04:00
parent ffc64b9a5a
commit ea9601ba1b
8 changed files with 29 additions and 46 deletions

View File

@@ -72,7 +72,7 @@ struct PollInterface {
* *
* Each call to poll() will first consume from the global event queue and if there are * Each call to poll() will first consume from the global event queue and if there are
* no pending events, will attempt to consume from all partitions until a valid message is found. * no pending events, will attempt to consume from all partitions until a valid message is found.
* The timeout used on this call will be the one configured via RoundRobinPollStrategy::set_timeout. * The timeout used on this call will be the one configured via PollInterface::set_timeout.
* *
* \return A message. The returned message *might* be empty. It's necessary to check * \return A message. The returned message *might* be empty. It's necessary to check
* that it's a valid one before using it (see example above). * that it's a valid one before using it (see example above).
@@ -86,7 +86,7 @@ struct PollInterface {
/** /**
* \brief Polls for new messages * \brief Polls for new messages
* *
* Same as the other overload of RoundRobinPollStrategy::poll but the provided * Same as the other overload of PollInterface::poll but the provided
* timeout will be used instead of the one configured on this Consumer. * timeout will be used instead of the one configured on this Consumer.
* *
* \param timeout The timeout to be used on this call * \param timeout The timeout to be used on this call
@@ -113,7 +113,7 @@ struct PollInterface {
/** /**
* \brief Polls all assigned partitions for a batch of new messages in round-robin fashion * \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
* *
* Same as the other overload of RoundRobinPollStrategy::poll_batch but the provided * Same as the other overload of PollInterface::poll_batch but the provided
* timeout will be used instead of the one configured on this Consumer. * timeout will be used instead of the one configured on this Consumer.
* *
* \param max_batch_size The maximum amount of messages expected * \param max_batch_size The maximum amount of messages expected

View File

@@ -43,8 +43,8 @@ namespace cppkafka {
* related (user-specific) information. * related (user-specific) information.
*/ */
struct QueueData { struct QueueData {
Queue queue_; Queue queue;
boost::any metadata_; boost::any metadata;
}; };
/** /**
@@ -52,8 +52,7 @@ struct QueueData {
* *
* \brief Base implementation of the PollInterface * \brief Base implementation of the PollInterface
*/ */
class PollStrategyBase : public PollInterface class PollStrategyBase : public PollInterface {
{
public: public:
using QueueMap = std::map<TopicPartition, QueueData>; using QueueMap = std::map<TopicPartition, QueueData>;
@@ -99,18 +98,6 @@ protected:
*/ */
QueueData& get_consumer_queue(); QueueData& get_consumer_queue();
/**
* \brief Return the next queue to be processed
*
* Depending on the polling strategy, each implementation must define it's own algorithm for
* determining the next queue to poll.
*
* \param opaque Application specific data which can help determine the next queue.
*
* \return A partition queue
*/
virtual QueueData& get_next_queue(void* opaque = nullptr) = 0;
/** /**
* \brief Reset the internal state of the queues. * \brief Reset the internal state of the queues.
* *

View File

@@ -83,8 +83,7 @@ namespace cppkafka {
* the Consumer instance it owns. * the Consumer instance it owns.
*/ */
class RoundRobinPollStrategy : public PollStrategyBase class RoundRobinPollStrategy : public PollStrategyBase {
{
public: public:
RoundRobinPollStrategy(Consumer& consumer); RoundRobinPollStrategy(Consumer& consumer);
@@ -112,16 +111,13 @@ public:
std::chrono::milliseconds timeout) override; std::chrono::milliseconds timeout) override;
protected: protected:
/**
* \sa PollStrategyBase::get_next_queue
*/
QueueData& get_next_queue(void* opaque = nullptr) final;
/** /**
* \sa PollStrategyBase::reset_state * \sa PollStrategyBase::reset_state
*/ */
void reset_state() final; void reset_state() final;
QueueData& get_next_queue();
private: private:
void consume_batch(Queue& queue, void consume_batch(Queue& queue,
MessageList& messages, MessageList& messages,

View File

@@ -51,20 +51,20 @@ Message RoundRobinPollStrategy::poll() {
Message RoundRobinPollStrategy::poll(milliseconds timeout) { Message RoundRobinPollStrategy::poll(milliseconds timeout) {
// Always give priority to group and global events // Always give priority to group and global events
Message message = get_consumer_queue().queue_.consume(milliseconds(0)); Message message = get_consumer_queue().queue.consume(milliseconds(0));
if (message) { if (message) {
return message; return message;
} }
size_t num_queues = get_partition_queues().size(); size_t num_queues = get_partition_queues().size();
while (num_queues--) { while (num_queues--) {
//consume the next partition (non-blocking) //consume the next partition (non-blocking)
message = get_next_queue().queue_.consume(milliseconds(0)); message = get_next_queue().queue.consume(milliseconds(0));
if (message) { if (message) {
return message; return message;
} }
} }
// We still don't have a valid message so we block on the event queue // We still don't have a valid message so we block on the event queue
return get_consumer_queue().queue_.consume(timeout); return get_consumer_queue().queue.consume(timeout);
} }
MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size) { MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size) {
@@ -76,16 +76,16 @@ MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, millisecon
ssize_t count = max_batch_size; ssize_t count = max_batch_size;
// batch from the group event queue first (non-blocking) // batch from the group event queue first (non-blocking)
consume_batch(get_consumer_queue().queue_, messages, count, milliseconds(0)); consume_batch(get_consumer_queue().queue, messages, count, milliseconds(0));
size_t num_queues = get_partition_queues().size(); size_t num_queues = get_partition_queues().size();
while ((count > 0) && (num_queues--)) { while ((count > 0) && (num_queues--)) {
// batch from the next partition (non-blocking) // batch from the next partition (non-blocking)
consume_batch(get_next_queue().queue_, messages, count, milliseconds(0)); consume_batch(get_next_queue().queue, messages, count, milliseconds(0));
} }
// we still have space left in the buffer // we still have space left in the buffer
if (count > 0) { if (count > 0) {
// wait on the event queue until timeout // wait on the event queue until timeout
consume_batch(get_consumer_queue().queue_, messages, count, timeout); consume_batch(get_consumer_queue().queue, messages, count, timeout);
} }
return messages; return messages;
} }
@@ -93,8 +93,7 @@ MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, millisecon
void RoundRobinPollStrategy::consume_batch(Queue& queue, void RoundRobinPollStrategy::consume_batch(Queue& queue,
MessageList& messages, MessageList& messages,
ssize_t& count, ssize_t& count,
milliseconds timeout) milliseconds timeout) {
{
MessageList queue_messages = queue.consume_batch(count, timeout); MessageList queue_messages = queue.consume_batch(count, timeout);
if (queue_messages.empty()) { if (queue_messages.empty()) {
return; return;
@@ -111,11 +110,11 @@ void RoundRobinPollStrategy::consume_batch(Queue& queue,
void RoundRobinPollStrategy::restore_forwarding() { void RoundRobinPollStrategy::restore_forwarding() {
// forward all partition queues // forward all partition queues
for (const auto& toppar : get_partition_queues()) { for (const auto& toppar : get_partition_queues()) {
toppar.second.queue_.forward_to_queue(get_consumer_queue().queue_); toppar.second.queue.forward_to_queue(get_consumer_queue().queue);
} }
} }
QueueData& RoundRobinPollStrategy::get_next_queue(void* opaque) { QueueData& RoundRobinPollStrategy::get_next_queue() {
if (get_partition_queues().empty()) { if (get_partition_queues().empty()) {
throw QueueException(RD_KAFKA_RESP_ERR__STATE); throw QueueException(RD_KAFKA_RESP_ERR__STATE);
} }

View File

@@ -108,9 +108,7 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
// Create a consumer and subscribe to the topic // Create a consumer and subscribe to the topic
PollStrategyAdapter consumer(make_consumer_config()); PollStrategyAdapter consumer(make_consumer_config());
TopicPartitionList partitions; consumer.subscribe({ KAFKA_TOPICS[0] });
for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace_back(KAFKA_TOPICS[0], i++));
consumer.assign(partitions);
consumer.add_polling_strategy(unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer))); consumer.add_polling_strategy(unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer)));
PollConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS); PollConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
@@ -130,10 +128,14 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
REQUIRE(runner.get_messages().size() == total_messages); REQUIRE(runner.get_messages().size() == total_messages);
// Check that we have one message from each partition in desired order // Check that we have one message from each partition in desired order
vector<int> partition_order = make_roundrobin_partition_vector(total_messages); vector<int> partition_order = make_roundrobin_partition_vector(total_messages+KAFKA_NUM_PARTITIONS);
int partition_idx;
for (int i = 0; i < total_messages; ++i) { for (int i = 0; i < total_messages; ++i) {
REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+1]); if (i == 0) {
// find first polled partition index
partition_idx = runner.get_messages()[i].get_partition();
}
REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+partition_idx]);
REQUIRE((string)runner.get_messages()[i].get_payload() == payload); REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
} }

View File

@@ -41,8 +41,7 @@ private:
* \brief Specific implementation which can be used with other * \brief Specific implementation which can be used with other
* util classes such as BasicConsumerDispatcher. * util classes such as BasicConsumerDispatcher.
*/ */
class PollStrategyAdapter : public Consumer class PollStrategyAdapter : public Consumer {
{
public: public:
PollStrategyAdapter(Configuration config); PollStrategyAdapter(Configuration config);
void add_polling_strategy(std::unique_ptr<PollInterface> poll_strategy); void add_polling_strategy(std::unique_ptr<PollInterface> poll_strategy);