Take key before value on Producer::produce

This commit is contained in:
Matias Fontanini
2016-08-14 15:49:23 -07:00
parent af72add34c
commit b768d52791
3 changed files with 16 additions and 16 deletions

View File

@@ -75,7 +75,7 @@ class TopicConfiguration;
* producer.produce(topic, Partition(), payload); * producer.produce(topic, Partition(), payload);
* *
* // Write using a key on a fixed partition (42) * // Write using a key on a fixed partition (42)
* producer.produce(topic, 42, payload, key); * producer.produce(topic, 42, key, payload);
* *
* \endcode * \endcode
*/ */
@@ -119,23 +119,23 @@ public:
* *
* \param topic The topic to write the message to * \param topic The topic to write the message to
* \param partition The partition to write the message to * \param partition The partition to write the message to
* \param payload The message payload
* \param key The message key * \param key The message key
* \param payload The message payload
*/ */
void produce(const Topic& topic, const Partition& partition, const Buffer& payload, void produce(const Topic& topic, const Partition& partition, const Buffer& key,
const Buffer& key); const Buffer& payload);
/** /**
* Produces a message * Produces a message
* *
* \param topic The topic to write the message to * \param topic The topic to write the message to
* \param partition The partition to write the message to * \param partition The partition to write the message to
* \param payload The message payload
* \param key The message key * \param key The message key
* \param payload The message payload
* \param user_data The opaque data pointer to be used (accesible via Message::private_data) * \param user_data The opaque data pointer to be used (accesible via Message::private_data)
*/ */
void produce(const Topic& topic, const Partition& partition, const Buffer& payload, void produce(const Topic& topic, const Partition& partition, const Buffer& key,
const Buffer& key, void* user_data); const Buffer& payload, void* user_data);
/** /**
* \brief Polls on this handle * \brief Polls on this handle

View File

@@ -61,16 +61,16 @@ Producer::PayloadPolicy Producer::get_payload_policy() const {
} }
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) { void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) {
produce(topic, partition, payload, Buffer{} /*key*/, nullptr /*user_data*/); produce(topic, partition, Buffer{} /*key*/, payload, nullptr /*user_data*/);
} }
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload, void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& key,
const Buffer& key) { const Buffer& payload) {
produce(topic, partition, payload, key, nullptr /*user_data*/); produce(topic, partition, key, payload, nullptr /*user_data*/);
} }
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload, void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& key,
const Buffer& key, void* user_data) { const Buffer& payload, void* user_data) {
void* payload_ptr = (void*)payload.get_data(); void* payload_ptr = (void*)payload.get_data();
void* key_ptr = (void*)key.get_data(); void* key_ptr = (void*)key.get_data();
const int policy = static_cast<int>(message_payload_policy_); const int policy = static_cast<int>(message_payload_policy_);

View File

@@ -146,7 +146,7 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
Topic topic = producer.get_topic(KAFKA_TOPIC); Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world! 2"; string payload = "Hello world! 2";
string key = "such key"; string key = "such key";
producer.produce(topic, partition, payload, key); producer.produce(topic, partition, key, payload);
runner.try_join(); runner.try_join();
const auto& messages = runner.get_messages(); const auto& messages = runner.get_messages();
@@ -223,7 +223,7 @@ TEST_F(ProducerTest, Callbacks) {
Producer producer(move(config)); Producer producer(move(config));
Topic topic = producer.get_topic(KAFKA_TOPIC, topic_config); Topic topic = producer.get_topic(KAFKA_TOPIC, topic_config);
producer.produce(topic, {}, payload, key); producer.produce(topic, {}, key, payload);
producer.poll(); producer.poll();
runner.try_join(); runner.try_join();
@@ -265,7 +265,7 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) {
Producer producer(move(config)); Producer producer(move(config));
Topic topic = producer.get_topic(KAFKA_TOPIC); Topic topic = producer.get_topic(KAFKA_TOPIC);
producer.produce(topic, {}, payload, key); producer.produce(topic, {}, key, payload);
producer.poll(); producer.poll();
runner.try_join(); runner.try_join();