CGW: provide optional kafka routing key in some replies

The kafka key provided in (some of the) replies can be used
by NB services to talk <directly> to _this_ local shard
that replied, using kafka partitioning to avoid
relying on the relaying mechanism.
The key list is recalculated upon each kafka partition rebalance,
whenever the number of partitions changes.

Signed-off-by: Oleksandr Mazur <oleksandr.mazur@plvision.eu>
Oleksandr Mazur
2024-12-03 15:11:23 +02:00
parent 8ce7104ef8
commit 723c2ffb1b
4 changed files with 287 additions and 38 deletions
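For illustration only: a minimal sketch of how a NB service could use the returned kafka_partition_key when producing its next request. It assumes the NB side also uses rdkafka; the topic name, producer setup, and function name are hypothetical, not part of this commit:

use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

// `key` is the `kafka_partition_key` taken from a previous CGW reply.
// Producing with it makes the default partitioner hash the request onto
// the partition consumed by the replying shard, skipping CGW-to-CGW relaying.
async fn send_to_shard(producer: &FutureProducer, key: &str, payload: &str) {
    let record = FutureRecord::to("CnC") // hypothetical NB->CGW topic name
        .key(key)
        .payload(payload);
    let _ = producer.send(record, Duration::from_secs(5)).await;
}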

Cargo.lock (generated)

@@ -1523,6 +1523,12 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "murmur2"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb585ade2549a017db2e35978b77c319214fa4b37cede841e27954dd6e8f3ca8"
[[package]]
name = "native-tls"
version = "0.2.12"
@@ -3085,6 +3091,7 @@ dependencies = [
"jsonschema",
"lazy_static",
"log",
"murmur2",
"nix",
"petgraph",
"prometheus",

Cargo.toml

@@ -45,6 +45,7 @@ reqwest = { version = "0.12.5", features = ["json"] }
jsonschema = { version = "0.18.0" }
url = { version = "2.5.2" }
nix = { version = "0.29.0", features = ["net"] }
murmur2 = { version = "0.1.0" }
[build-dependencies]
tonic-build = "0.11.0"

src/cgw_connection_server.rs

@@ -175,7 +175,7 @@ enum CGWNBApiParsedMsgType {
InfrastructureGroupCreateToShard(i32),
InfrastructureGroupDelete,
InfrastructureGroupInfrasAdd(Vec<MacAddress>),
InfrastructureGroupInfraDel(Vec<MacAddress>),
InfrastructureGroupInfrasDel(Vec<MacAddress>),
InfrastructureGroupInfraMsg(MacAddress, String, Option<u64>),
RebalanceGroups,
}
@@ -573,7 +573,9 @@ impl CGWConnectionServer {
return Some(CGWNBApiParsedMsg::new(
json_msg.uuid,
group_id,
CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(json_msg.infra_group_infras),
CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(
json_msg.infra_group_infras,
),
));
}
"infrastructure_group_infra_message_enqueue" => {
@@ -655,6 +657,9 @@ impl CGWConnectionServer {
}
};
let mut partition_array_idx: usize = 0;
let mut local_shard_partition_key: Option<String>;
loop {
if num_of_msg_read < buf_capacity {
// Try to recv_many, but don't sleep too much
@@ -727,6 +732,26 @@ impl CGWConnectionServer {
debug!("Received {num_of_msg_read} messages from NB API, processing...");
let partition_mapping = self.nb_api_client.get_partition_to_local_shard_mapping();
debug!("Kafka partitions idx:key mapping info: {:?}", partition_mapping);
if !partition_mapping.is_empty() {
partition_array_idx += 1;
if partition_array_idx >= partition_mapping.len() {
partition_array_idx = 0;
}
local_shard_partition_key = Some(partition_mapping[partition_array_idx].1.clone());
debug!(
"Using kafka key '{}' for kafka partition idx '{}'",
partition_mapping[partition_array_idx].1,
partition_mapping[partition_array_idx].0
);
} else {
warn!("Cannot get partition to local shard mapping, won't be able to return kafka routing key in NB request replies!");
// Clear previously used partition key
local_shard_partition_key = None;
}
// We rely on this map only for a single iteration of received messages:
// say, we receive 10 messages but have 20 in the queue; this means that the
// gid->cgw_id cache is clear at first, then filled up when processing the first 10 messages,
@@ -751,16 +776,6 @@ impl CGWConnectionServer {
origin,
) = msg;
let gid_numeric = match key.parse::<i32>() {
Err(e) => {
warn!(
"Invalid KEY received from KAFKA bus message, ignoring it. Error: {e}"
);
continue;
}
Ok(v) => v,
};
let parsed_msg = match self.parse_nbapi_msg(&payload) {
Some(val) => val,
None => {
@@ -973,9 +988,17 @@ impl CGWConnectionServer {
continue;
}
// We need to know which dst GID this request should be
// forwarded to.
// In order to get it, match on <any> parsed msg and
// take only the gid field.
let gid: i32 = match parsed_msg {
CGWNBApiParsedMsg { gid, .. } => gid,
};
match self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.get_infra_group_owner_id(gid)
.await
{
Some(dst_cgw_id) => {
@@ -1010,6 +1033,7 @@ impl CGWConnectionServer {
let discovery_clone = self.cgw_remote_discovery.clone();
let self_clone = self.clone();
let local_shard_partition_key_clone = local_shard_partition_key.clone();
// Future to Handle (relay) messages for remote CGW
let relay_task_hdl = self.mbox_relay_msg_runtime_handle.spawn(async move {
@@ -1045,6 +1069,7 @@ impl CGWConnectionServer {
let cgw_id = value.0;
let msg_stream = value.1;
let self_clone = self_clone.clone();
let local_shard_partition_key_clone = local_shard_partition_key_clone.clone();
tokio::spawn(async move {
if (discovery_clone
.relay_request_stream_to_remote_cgw(cgw_id, msg_stream)
@@ -1056,6 +1081,7 @@ impl CGWConnectionServer {
Uuid::default(),
false,
Some(format!("Failed to relay MSG stream to remote CGW{cgw_id}")),
local_shard_partition_key_clone,
) {
self_clone.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
} else {
@@ -1079,16 +1105,8 @@ impl CGWConnectionServer {
_origin,
) = msg;
let gid_numeric = match key.parse::<i32>() {
Err(e) => {
warn!("Invalid KEY received from KAFKA bus message, ignoring! Error: {e}");
continue;
}
Ok(v) => v,
};
debug!(
"Received message for local CGW key: {key}, local id {}",
"Received message for local CGW: key '{key}', local shard id '{}'",
self.local_cgw_id
);
@@ -1101,7 +1119,7 @@ impl CGWConnectionServer {
} => {
if (self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.get_infra_group_owner_id(gid)
.await)
.is_none()
{
@@ -1112,6 +1130,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to add infra list to nonexisting group, gid {gid}, uuid {uuid}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1140,6 +1159,7 @@ impl CGWConnectionServer {
uuid,
true,
None,
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1233,6 +1253,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to create few MACs from infras list (partial create), gid {gid}, uuid {uuid}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1250,11 +1271,11 @@ impl CGWConnectionServer {
CGWNBApiParsedMsg {
uuid,
gid,
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(mac_list),
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(mac_list),
} => {
if (self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.get_infra_group_owner_id(gid)
.await)
.is_none()
{
@@ -1265,6 +1286,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to delete MACs from infra list, gid {gid}, uuid {uuid}: group does not exist")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1297,6 +1319,7 @@ impl CGWConnectionServer {
uuid,
true,
None,
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1331,6 +1354,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to destroy few MACs from infras list (partial delete), gid {gid}, uuid {uuid}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1357,7 +1381,7 @@ impl CGWConnectionServer {
} => {
if (self
.cgw_remote_discovery
.get_infra_group_owner_id(gid_numeric)
.get_infra_group_owner_id(gid)
.await)
.is_none()
{
@@ -1366,6 +1390,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to sink down msg to device of nonexisting group, gid {gid}, uuid {uuid}: group does not exist")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1397,6 +1422,7 @@ impl CGWConnectionServer {
msg,
uuid,
timeout,
local_shard_partition_key.clone(),
)
.await;
}
@@ -1407,6 +1433,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\nError: {e}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1424,6 +1451,7 @@ impl CGWConnectionServer {
msg,
uuid,
timeout,
local_shard_partition_key.clone(),
)
.await;
}
@@ -1439,6 +1467,7 @@ impl CGWConnectionServer {
uuid,
false,
Some(format!("Failed to parse command message to device: {device_mac}, uuid {uuid}")),
local_shard_partition_key.clone(),
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
} else {
@@ -1891,6 +1920,7 @@ impl CGWConnectionServer {
message: String,
uuid: Uuid,
timeout: Option<u64>,
local_shard_partition_key: Option<String>,
) {
if (infra_state == CGWDeviceState::CGWDeviceConnected)
|| (infra_state == CGWDeviceState::CGWDeviceDisconnected
@@ -1908,16 +1938,22 @@ impl CGWConnectionServer {
req.uuid,
false,
Some("Request replaced with new!".to_string()),
local_shard_partition_key,
),
None => cgw_construct_infra_enqueue_response(
self.local_cgw_id,
uuid,
true,
None,
local_shard_partition_key,
),
None => {
cgw_construct_infra_enqueue_response(self.local_cgw_id, uuid, true, None)
}
},
Err(e) => cgw_construct_infra_enqueue_response(
self.local_cgw_id,
uuid,
false,
Some(e.to_string()),
local_shard_partition_key,
),
};
@@ -1935,6 +1971,7 @@ impl CGWConnectionServer {
Some(format!(
"Device {mac} is disconnected! Accepting only Configure and Upgrade requests!"
)),
local_shard_partition_key,
) {
self.enqueue_mbox_message_from_cgw_to_nb_api(infra_gid, resp);
} else {

src/cgw_nb_api_listener.rs

@@ -8,6 +8,7 @@ use crate::cgw_metrics::{CGWMetrics, CGWMetricsHealthComponent, CGWMetricsHealth
use eui48::MacAddress;
use futures::stream::TryStreamExt;
use murmur2::murmur2;
use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::error::KafkaResult;
@@ -20,6 +21,7 @@ use rdkafka::{
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::ops::Range;
use std::sync::Arc;
use tokio::{
runtime::{Builder, Runtime},
@@ -62,6 +64,8 @@ pub struct InfraGroupInfrasAddResponse {
pub uuid: Uuid,
pub success: bool,
pub error_message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub kafka_partition_key: Option<String>,
}
#[derive(Debug, Serialize)]
@@ -73,6 +77,8 @@ pub struct InfraGroupInfrasDelResponse {
pub uuid: Uuid,
pub success: bool,
pub error_message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub kafka_partition_key: Option<String>,
}
#[derive(Debug, Serialize)]
@@ -82,6 +88,8 @@ pub struct InfraGroupInfraMessageEnqueueResponse {
pub uuid: Uuid,
pub success: bool,
pub error_message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub kafka_partition_key: Option<String>,
}
#[derive(Debug, Serialize)]
@@ -222,6 +230,7 @@ pub fn cgw_construct_infra_group_infras_add_response(
uuid: Uuid,
success: bool,
error_message: Option<String>,
kafka_partition_key: Option<String>,
) -> Result<String> {
let dev_add = InfraGroupInfrasAddResponse {
r#type: "infrastructure_group_infras_add_response",
@@ -231,6 +240,7 @@ pub fn cgw_construct_infra_group_infras_add_response(
uuid,
success,
error_message,
kafka_partition_key,
};
Ok(serde_json::to_string(&dev_add)?)
@@ -243,6 +253,7 @@ pub fn cgw_construct_infra_group_infras_del_response(
uuid: Uuid,
success: bool,
error_message: Option<String>,
kafka_partition_key: Option<String>,
) -> Result<String> {
let dev_del = InfraGroupInfrasDelResponse {
r#type: "infrastructure_group_infras_del_response",
@@ -252,6 +263,7 @@ pub fn cgw_construct_infra_group_infras_del_response(
uuid,
success,
error_message,
kafka_partition_key,
};
Ok(serde_json::to_string(&dev_del)?)
@@ -262,6 +274,7 @@ pub fn cgw_construct_infra_enqueue_response(
uuid: Uuid,
success: bool,
error_message: Option<String>,
kafka_partition_key: Option<String>,
) -> Result<String> {
let dev_enq_resp = InfraGroupInfraMessageEnqueueResponse {
r#type: "infrastructure_group_infra_message_enqueue_response",
@@ -269,6 +282,7 @@ pub fn cgw_construct_infra_enqueue_response(
uuid,
success,
error_message,
kafka_partition_key,
};
Ok(serde_json::to_string(&dev_enq_resp)?)
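For illustration: with the new parameter, a hypothetical call site passing Some("3".to_string()) yields JSON containing "kafka_partition_key":"3", while passing None omits the field entirely thanks to skip_serializing_if; all values below are illustrative:

// Hypothetical call site; values are illustrative.
let resp = cgw_construct_infra_enqueue_response(
    0,                     // local shard id
    Uuid::new_v4(),
    true,                  // success
    None,                  // no error message
    Some("3".to_string()), // kafka routing key of this shard's partition
)?;
// resp: {"type":"infrastructure_group_infra_message_enqueue_response",...,"kafka_partition_key":"3"}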
@@ -456,11 +470,125 @@ pub fn cgw_construct_infra_request_result_msg(
Ok(serde_json::to_string(&infra_request_result)?)
}
struct CustomContext;
struct CGWConsumerContextData {
// Tuple consisting of a physical partition id (0, 1, 2, etc.)
// and the corresponding _kafka routing key_, or just kafka key,
// that can be used with this topic to reach that exact partition.
// It can be used to optimize the CGW/GID/kafka-topic mapping:
// the cloud knows which kafka key to use to direct
// a NB message to one exact CGW, without always
// falling back to the relaying mechanism.
// P.S. this optimization technique does not necessarily
// make relaying obsolete. Relaying is still used to
// forward at least one (the first) NB request from
// the shard that received the message to the designated
// recipient. Whenever the recipient shard receives the NB
// request and sends a response back to NB services,
// the shard should reply with the routing_key included.
// It's then up to the cloud (NB services) to use the
// provided kafka key to make sure the kafka message reaches
// the recipient shard in exactly one hop (direct forwarding),
// or to omit the kafka key completely and once again use the
// relaying mechanism.
partition_mapping: HashMap<u32, String>,
assigned_partition_list: Vec<u32>,
last_used_key_idx: u32,
partition_num: usize,
// A bit ugly, but we need a way to reach the
// consumer (to retrieve the partition count) whenever
// the client->context rebalance callback is called.
consumer_client: Option<Arc<CGWCNCConsumerType>>,
}
struct CustomContext {
ctx_data: std::sync::RwLock<CGWConsumerContextData>,
}
impl CGWConsumerContextData {
fn recalculate_partition_to_key_mapping(&mut self, partition_num: usize) {
const DEFAULT_HASH_SEED: u32 = 0x9747b28c;
// The factor of 10 is selected to cover >=15000 partitions,
// meaning that with 15K partitions this algorithm can still
// confidently cover all 15K partitions with unique
// kafka string keys.
// Even then, anything past 10K partitions per topic
// would be overkill in the first place, hence
// this algorithm should be sufficient.
let loop_range = Range {
start: 0,
end: partition_num * 10,
};
let mut key_map: HashMap<u32, String> = HashMap::new();
for x in loop_range {
let key_str = x.to_string();
let key_bytes = key_str.as_bytes();
if key_map.len() == partition_num {
break;
}
// The default partitioner uses the following formula:
// toPositive(murmur2(keyBytes)) % numPartitions
let hash_res = murmur2(key_bytes, DEFAULT_HASH_SEED) & 0x7fffffff;
let part_idx = hash_res.rem_euclid(partition_num as u32);
if !key_map.contains_key(&part_idx) {
debug!("Inserted key '{key_str}' for '{part_idx}' partition");
key_map.insert(part_idx, key_str);
}
}
info!(
"Filled {} unique keys for {} of partitions",
key_map.len(),
partition_num
);
if key_map.len() != partition_num {
// All this means is that some partition X has
// no corresponding 1:1 kafka key.
// From the CGW perspective, the application
// will always instruct NB to use the set of keys that
// we were able to map, ignoring any other unmapped
// partitions, rendering them completely unused.
// But it's still up to NB whether or not to use the
// routing kafka key provided by CGW.
warn!("Failed to fill the full range of kafka keys for 1:1 partition mapping, some partitions will not be mapped!");
}
self.partition_mapping = key_map;
}
fn get_partition_info(&mut self) -> (Vec<u32>, HashMap<u32, String>) {
(
self.assigned_partition_list.clone(),
self.partition_mapping.clone(),
)
}
}
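The key search above can be exercised in isolation; a minimal standalone sketch of the same technique (the helper name is illustrative; the seed and the search range mirror the commit's constants):

use murmur2::murmur2;
use std::collections::HashMap;

// For each partition, brute-force a string key such that the default
// (Java-client-compatible) partitioner formula,
// toPositive(murmur2(keyBytes)) % numPartitions, selects that partition.
fn keys_for_partitions(num_partitions: u32) -> HashMap<u32, String> {
    const DEFAULT_HASH_SEED: u32 = 0x9747b28c;
    let mut map: HashMap<u32, String> = HashMap::new();
    for x in 0..num_partitions * 10 {
        if map.len() as u32 == num_partitions {
            break;
        }
        let key = x.to_string();
        let hash = murmur2(key.as_bytes(), DEFAULT_HASH_SEED) & 0x7fff_ffff;
        map.entry(hash % num_partitions).or_insert(key);
    }
    map
}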
impl ClientContext for CustomContext {}
impl ConsumerContext for CustomContext {
fn pre_rebalance(&self, rebalance: &Rebalance<'_>) {
debug!("Pre rebalance entry");
// We need to make sure that <before>
// we're _actually_ assigned a partition list,
// we don't fool any internal code that depends
// on the topic list: zero it out while not
// ready, and return data only once it's
// available.
if let Ok(mut ctx) = self.ctx_data.write() {
ctx.partition_mapping.clear();
ctx.assigned_partition_list.clear();
ctx.last_used_key_idx = 0;
ctx.partition_num = 0;
}
let mut part_list = String::new();
if let rdkafka::consumer::Rebalance::Assign(partitions) = rebalance {
for x in partitions.elements() {
@@ -480,15 +608,52 @@ impl ConsumerContext for CustomContext {
}
fn post_rebalance(&self, rebalance: &Rebalance<'_>) {
let mut assigned_partition_list: Vec<u32> = Vec::new();
let mut part_list = String::new();
if let rdkafka::consumer::Rebalance::Assign(partitions) = rebalance {
for x in partitions.elements() {
part_list += &(x.partition().to_string() + " ");
assigned_partition_list.push(x.partition() as u32);
}
debug!("post_rebalance callback, assigned partition(s): {part_list}");
}
if let Ok(mut ctx) = self.ctx_data.write() {
if let Some(consumer) = &ctx.consumer_client {
if let Ok(metadata) =
consumer.fetch_metadata(Some(CONSUMER_TOPICS[0]), Duration::from_millis(2000))
{
let topic = &metadata.topics()[0];
let partition_num: usize = topic.partitions().len();
debug!("topic: {}, partitions: {}", topic.name(), partition_num);
// We recalculate the mapping only if the underlying
// _number_ of partitions has changed.
// The assignment to specific partitions is
// irrelevant by itself, as the key:partition
// mapping changes only when the underlying
// number of partitions is altered.
//
// This also means that the block below
// executes at least once during the CGW
// lifetime - upon startup, whenever _this_
// consumer group member - the CGW shard -
// is first assigned a list of partitions
// to consume from.
if ctx.partition_num != partition_num {
ctx.partition_num = partition_num;
ctx.assigned_partition_list = assigned_partition_list;
ctx.recalculate_partition_to_key_mapping(partition_num);
}
} else {
warn!("Tried to fetch consumer metadata but failed. CGW will not be able to reply with optimized Kafka key for efficient routing!");
}
}
}
part_list.clear();
if let rdkafka::consumer::Rebalance::Revoke(partitions) = rebalance {
@@ -508,13 +673,13 @@ impl ConsumerContext for CustomContext {
});
}
fn commit_callback(&self, _result: KafkaResult<()>, _offsets: &TopicPartitionList) {
fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
let mut part_list = String::new();
for x in _offsets.elements() {
part_list += &(x.partition().to_string() + " ");
}
debug!("commit_callback callback, partition(s): {part_list}");
debug!("Consumer callback: commited offset");
debug!("Consumer callback: commiting offsets: {:?}", result);
}
}
@@ -527,17 +692,25 @@ struct CGWCNCProducer {
}
struct CGWCNCConsumer {
c: CGWCNCConsumerType,
c: Arc<CGWCNCConsumerType>,
}
impl CGWCNCConsumer {
pub fn new(cgw_id: i32, kafka_args: &CGWKafkaArgs) -> Result<Self> {
let consum: CGWCNCConsumerType = Self::create_consumer(cgw_id, kafka_args)?;
let consum = Self::create_consumer(cgw_id, kafka_args)?;
Ok(CGWCNCConsumer { c: consum })
}
fn create_consumer(cgw_id: i32, kafka_args: &CGWKafkaArgs) -> Result<CGWCNCConsumerType> {
let context = CustomContext;
fn create_consumer(cgw_id: i32, kafka_args: &CGWKafkaArgs) -> Result<Arc<CGWCNCConsumerType>> {
let context = CustomContext {
ctx_data: std::sync::RwLock::new(CGWConsumerContextData {
partition_mapping: HashMap::new(),
assigned_partition_list: Vec::new(),
last_used_key_idx: 0u32,
partition_num: 0usize,
consumer_client: None,
}),
};
let consumer: CGWCNCConsumerType = match ClientConfig::new()
.set("group.id", GROUP_ID)
@@ -562,8 +735,12 @@ impl CGWCNCConsumer {
}
};
let consumer = Arc::new(consumer);
// Keep a clone so the consumer can be stored in its own context below
let consumer_clone = consumer.clone();
debug!(
"(consumer) (producer) Created lazy connection to kafka broker ({}:{})...",
"(consumer) Created lazy connection to kafka broker ({}:{})...",
kafka_args.kafka_host, kafka_args.kafka_port,
);
@@ -575,6 +752,10 @@ impl CGWCNCConsumer {
return Err(Error::Kafka(e));
};
if let Ok(mut ctx) = consumer.context().ctx_data.write() {
ctx.consumer_client = Some(consumer_clone);
}
Ok(consumer)
}
}
@@ -614,6 +795,7 @@ pub struct CGWNBApiClient {
working_runtime_handle: Runtime,
cgw_server_tx_mbox: CGWConnectionServerMboxTx,
prod: CGWCNCProducer,
consumer: Arc<CGWCNCConsumer>,
// TBD: split different implementations behind a defined trait
// that implements async R/W operations?
}
@@ -631,14 +813,16 @@ impl CGWNBApiClient {
.enable_all()
.build()?;
let consumer: Arc<CGWCNCConsumer> = Arc::new(CGWCNCConsumer::new(cgw_id, kafka_args)?);
let consumer_clone = consumer.clone();
let cl = Arc::new(CGWNBApiClient {
working_runtime_handle: working_runtime_h,
cgw_server_tx_mbox: cgw_tx.clone(),
prod: CGWCNCProducer::new(kafka_args)?,
consumer: consumer_clone,
});
let cl_clone = cl.clone();
let consumer: CGWCNCConsumer = CGWCNCConsumer::new(cgw_id, kafka_args)?;
cl.working_runtime_handle.spawn(async move {
loop {
let cl_clone = cl_clone.clone();
@@ -687,6 +871,26 @@ impl CGWNBApiClient {
Ok(cl)
}
pub fn get_partition_to_local_shard_mapping(&self) -> Vec<(u32, String)> {
let mut return_vec: Vec<(u32, String)> = Vec::new();
if let Ok(mut ctx) = self.consumer.c.context().ctx_data.write() {
let (assigned_partition_list, mut partition_mapping) = ctx.get_partition_info();
if !partition_mapping.is_empty()
&& ctx.partition_num > 0
&& !assigned_partition_list.is_empty()
{
for x in assigned_partition_list {
if let Some(key) = partition_mapping.remove(&x) {
return_vec.push((x, key));
}
}
}
}
return_vec
}
pub async fn enqueue_mbox_message_from_cgw_server(&self, key: String, payload: String) {
let produce_future = self.prod.p.send(
FutureRecord::to(PRODUCER_TOPICS)