mirror of
https://github.com/optim-enterprises-bv/openlan-cgw.git
synced 2025-10-30 01:42:20 +00:00
Merge pull request #102 from Telecominfraproject/feat/cgw_kafka_key_in_replies
Feat/cgw kafka key in replies
This commit is contained in:
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -1523,6 +1523,12 @@ version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
|
||||
|
||||
[[package]]
|
||||
name = "murmur2"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fb585ade2549a017db2e35978b77c319214fa4b37cede841e27954dd6e8f3ca8"
|
||||
|
||||
[[package]]
|
||||
name = "native-tls"
|
||||
version = "0.2.12"
|
||||
@@ -3085,6 +3091,7 @@ dependencies = [
|
||||
"jsonschema",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"murmur2",
|
||||
"nix",
|
||||
"petgraph",
|
||||
"prometheus",
|
||||
|
||||
@@ -45,6 +45,7 @@ reqwest = { version = "0.12.5", features = ["json"] }
|
||||
jsonschema = { version = "0.18.0" }
|
||||
url = { version = "2.5.2" }
|
||||
nix = { version = "0.29.0", features = ["net"] }
|
||||
murmur2 = { version = "0.1.0" }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.11.0"
|
||||
|
||||
@@ -175,7 +175,7 @@ enum CGWNBApiParsedMsgType {
|
||||
InfrastructureGroupCreateToShard(i32),
|
||||
InfrastructureGroupDelete,
|
||||
InfrastructureGroupInfrasAdd(Vec<MacAddress>),
|
||||
InfrastructureGroupInfraDel(Vec<MacAddress>),
|
||||
InfrastructureGroupInfrasDel(Vec<MacAddress>),
|
||||
InfrastructureGroupInfraMsg(MacAddress, String, Option<u64>),
|
||||
RebalanceGroups,
|
||||
}
|
||||
@@ -573,7 +573,9 @@ impl CGWConnectionServer {
|
||||
return Some(CGWNBApiParsedMsg::new(
|
||||
json_msg.uuid,
|
||||
group_id,
|
||||
CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(json_msg.infra_group_infras),
|
||||
CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(
|
||||
json_msg.infra_group_infras,
|
||||
),
|
||||
));
|
||||
}
|
||||
"infrastructure_group_infra_message_enqueue" => {
|
||||
@@ -655,6 +657,9 @@ impl CGWConnectionServer {
|
||||
}
|
||||
};
|
||||
|
||||
let mut partition_array_idx: usize = 0;
|
||||
let mut local_shard_partition_key: Option<String>;
|
||||
|
||||
loop {
|
||||
if num_of_msg_read < buf_capacity {
|
||||
// Try to recv_many, but don't sleep too much
|
||||
@@ -727,6 +732,26 @@ impl CGWConnectionServer {
|
||||
|
||||
debug!("Received {num_of_msg_read} messages from NB API, processing...");
|
||||
|
||||
let partition_mapping = self.nb_api_client.get_partition_to_local_shard_mapping();
|
||||
debug!("Kafka partitions idx:key mapping info: {:?}", partition_mapping);
|
||||
if !partition_mapping.is_empty() {
|
||||
partition_array_idx += 1;
|
||||
if partition_array_idx >= partition_mapping.len() {
|
||||
partition_array_idx = 0;
|
||||
}
|
||||
local_shard_partition_key = Some(partition_mapping[partition_array_idx].1.clone());
|
||||
|
||||
debug!(
|
||||
"Using kafka key '{}' for kafka partition idx '{}'",
|
||||
partition_mapping[partition_array_idx].1,
|
||||
partition_mapping[partition_array_idx].0
|
||||
);
|
||||
} else {
|
||||
warn!("Cannot get partition to local shard mapping, won't be able to return kafka routing key in NB request replies!");
|
||||
// Clear previously used partition key
|
||||
local_shard_partition_key = None;
|
||||
}
|
||||
|
||||
// We rely on this map only for a single iteration of received messages:
|
||||
// say, we receive 10 messages but 20 in queue, this means that gid->cgw_id
|
||||
// cache is clear at first, the filled up when processing first 10 messages,
|
||||
@@ -751,16 +776,6 @@ impl CGWConnectionServer {
|
||||
origin,
|
||||
) = msg;
|
||||
|
||||
let gid_numeric = match key.parse::<i32>() {
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Invalid KEY received from KAFKA bus message, ignoring it. Error: {e}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Ok(v) => v,
|
||||
};
|
||||
|
||||
let parsed_msg = match self.parse_nbapi_msg(&payload) {
|
||||
Some(val) => val,
|
||||
None => {
|
||||
@@ -973,9 +988,17 @@ impl CGWConnectionServer {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We need to know which dst GID this request should be
|
||||
// forwarded to.
|
||||
// In order to get it, match to <any> parsed msg, and
|
||||
// get only gid field.
|
||||
let gid: i32 = match parsed_msg {
|
||||
CGWNBApiParsedMsg { gid, .. } => gid,
|
||||
};
|
||||
|
||||
match self
|
||||
.cgw_remote_discovery
|
||||
.get_infra_group_owner_id(gid_numeric)
|
||||
.get_infra_group_owner_id(gid)
|
||||
.await
|
||||
{
|
||||
Some(dst_cgw_id) => {
|
||||
@@ -1010,6 +1033,7 @@ impl CGWConnectionServer {
|
||||
|
||||
let discovery_clone = self.cgw_remote_discovery.clone();
|
||||
let self_clone = self.clone();
|
||||
let local_shard_partition_key_clone = local_shard_partition_key.clone();
|
||||
|
||||
// Future to Handle (relay) messages for remote CGW
|
||||
let relay_task_hdl = self.mbox_relay_msg_runtime_handle.spawn(async move {
|
||||
@@ -1045,6 +1069,7 @@ impl CGWConnectionServer {
|
||||
let cgw_id = value.0;
|
||||
let msg_stream = value.1;
|
||||
let self_clone = self_clone.clone();
|
||||
let local_shard_partition_key_clone = local_shard_partition_key_clone.clone();
|
||||
tokio::spawn(async move {
|
||||
if (discovery_clone
|
||||
.relay_request_stream_to_remote_cgw(cgw_id, msg_stream)
|
||||
@@ -1056,6 +1081,7 @@ impl CGWConnectionServer {
|
||||
Uuid::default(),
|
||||
false,
|
||||
Some(format!("Failed to relay MSG stream to remote CGW{cgw_id}")),
|
||||
local_shard_partition_key_clone,
|
||||
) {
|
||||
self_clone.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
|
||||
} else {
|
||||
@@ -1079,16 +1105,8 @@ impl CGWConnectionServer {
|
||||
_origin,
|
||||
) = msg;
|
||||
|
||||
let gid_numeric = match key.parse::<i32>() {
|
||||
Err(e) => {
|
||||
warn!("Invalid KEY received from KAFKA bus message, ignoring! Error: {e}");
|
||||
continue;
|
||||
}
|
||||
Ok(v) => v,
|
||||
};
|
||||
|
||||
debug!(
|
||||
"Received message for local CGW key: {key}, local id {}",
|
||||
"Received message for local CGW: key '{key}', local shard id '{}'",
|
||||
self.local_cgw_id
|
||||
);
|
||||
|
||||
@@ -1101,7 +1119,7 @@ impl CGWConnectionServer {
|
||||
} => {
|
||||
if (self
|
||||
.cgw_remote_discovery
|
||||
.get_infra_group_owner_id(gid_numeric)
|
||||
.get_infra_group_owner_id(gid)
|
||||
.await)
|
||||
.is_none()
|
||||
{
|
||||
@@ -1112,6 +1130,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
false,
|
||||
Some(format!("Failed to add infra list to nonexisting group, gid {gid}, uuid {uuid}")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1140,6 +1159,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
true,
|
||||
None,
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1233,6 +1253,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
false,
|
||||
Some(format!("Failed to create few MACs from infras list (partial create), gid {gid}, uuid {uuid}")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1250,11 +1271,11 @@ impl CGWConnectionServer {
|
||||
CGWNBApiParsedMsg {
|
||||
uuid,
|
||||
gid,
|
||||
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfraDel(mac_list),
|
||||
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(mac_list),
|
||||
} => {
|
||||
if (self
|
||||
.cgw_remote_discovery
|
||||
.get_infra_group_owner_id(gid_numeric)
|
||||
.get_infra_group_owner_id(gid)
|
||||
.await)
|
||||
.is_none()
|
||||
{
|
||||
@@ -1265,6 +1286,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
false,
|
||||
Some(format!("Failed to delete MACs from infra list, gid {gid}, uuid {uuid}: group does not exist")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1297,6 +1319,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
true,
|
||||
None,
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1331,6 +1354,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
false,
|
||||
Some(format!("Failed to destroy few MACs from infras list (partial delete), gid {gid}, uuid {uuid}")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1357,7 +1381,7 @@ impl CGWConnectionServer {
|
||||
} => {
|
||||
if (self
|
||||
.cgw_remote_discovery
|
||||
.get_infra_group_owner_id(gid_numeric)
|
||||
.get_infra_group_owner_id(gid)
|
||||
.await)
|
||||
.is_none()
|
||||
{
|
||||
@@ -1366,6 +1390,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
false,
|
||||
Some(format!("Failed to sink down msg to device of nonexisting group, gid {gid}, uuid {uuid}: group does not exist")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1397,6 +1422,7 @@ impl CGWConnectionServer {
|
||||
msg,
|
||||
uuid,
|
||||
timeout,
|
||||
local_shard_partition_key.clone(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -1407,6 +1433,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
false,
|
||||
Some(format!("Failed to validate config message! Invalid configure message for device: {device_mac}, uuid {uuid}\nError: {e}")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1424,6 +1451,7 @@ impl CGWConnectionServer {
|
||||
msg,
|
||||
uuid,
|
||||
timeout,
|
||||
local_shard_partition_key.clone(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
@@ -1439,6 +1467,7 @@ impl CGWConnectionServer {
|
||||
uuid,
|
||||
false,
|
||||
Some(format!("Failed to parse command message to device: {device_mac}, uuid {uuid}")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(gid, resp);
|
||||
} else {
|
||||
@@ -1891,6 +1920,7 @@ impl CGWConnectionServer {
|
||||
message: String,
|
||||
uuid: Uuid,
|
||||
timeout: Option<u64>,
|
||||
local_shard_partition_key: Option<String>,
|
||||
) {
|
||||
if (infra_state == CGWDeviceState::CGWDeviceConnected)
|
||||
|| (infra_state == CGWDeviceState::CGWDeviceDisconnected
|
||||
@@ -1908,16 +1938,22 @@ impl CGWConnectionServer {
|
||||
req.uuid,
|
||||
false,
|
||||
Some("Request replaced with new!".to_string()),
|
||||
local_shard_partition_key,
|
||||
),
|
||||
None => cgw_construct_infra_enqueue_response(
|
||||
self.local_cgw_id,
|
||||
uuid,
|
||||
true,
|
||||
None,
|
||||
local_shard_partition_key,
|
||||
),
|
||||
None => {
|
||||
cgw_construct_infra_enqueue_response(self.local_cgw_id, uuid, true, None)
|
||||
}
|
||||
},
|
||||
Err(e) => cgw_construct_infra_enqueue_response(
|
||||
self.local_cgw_id,
|
||||
uuid,
|
||||
false,
|
||||
Some(e.to_string()),
|
||||
local_shard_partition_key,
|
||||
),
|
||||
};
|
||||
|
||||
@@ -1935,6 +1971,7 @@ impl CGWConnectionServer {
|
||||
Some(format!(
|
||||
"Device {mac} is disconnected! Accepting only Configure and Upgrade requests!"
|
||||
)),
|
||||
local_shard_partition_key,
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(infra_gid, resp);
|
||||
} else {
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::cgw_metrics::{CGWMetrics, CGWMetricsHealthComponent, CGWMetricsHealth
|
||||
|
||||
use eui48::MacAddress;
|
||||
use futures::stream::TryStreamExt;
|
||||
use murmur2::murmur2;
|
||||
use rdkafka::client::ClientContext;
|
||||
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
|
||||
use rdkafka::error::KafkaResult;
|
||||
@@ -20,6 +21,7 @@ use rdkafka::{
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use tokio::{
|
||||
runtime::{Builder, Runtime},
|
||||
@@ -62,6 +64,8 @@ pub struct InfraGroupInfrasAddResponse {
|
||||
pub uuid: Uuid,
|
||||
pub success: bool,
|
||||
pub error_message: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub kafka_partition_key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -73,6 +77,8 @@ pub struct InfraGroupInfrasDelResponse {
|
||||
pub uuid: Uuid,
|
||||
pub success: bool,
|
||||
pub error_message: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub kafka_partition_key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -82,6 +88,8 @@ pub struct InfraGroupInfraMessageEnqueueResponse {
|
||||
pub uuid: Uuid,
|
||||
pub success: bool,
|
||||
pub error_message: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub kafka_partition_key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -222,6 +230,7 @@ pub fn cgw_construct_infra_group_infras_add_response(
|
||||
uuid: Uuid,
|
||||
success: bool,
|
||||
error_message: Option<String>,
|
||||
kafka_partition_key: Option<String>,
|
||||
) -> Result<String> {
|
||||
let dev_add = InfraGroupInfrasAddResponse {
|
||||
r#type: "infrastructure_group_infras_add_response",
|
||||
@@ -231,6 +240,7 @@ pub fn cgw_construct_infra_group_infras_add_response(
|
||||
uuid,
|
||||
success,
|
||||
error_message,
|
||||
kafka_partition_key,
|
||||
};
|
||||
|
||||
Ok(serde_json::to_string(&dev_add)?)
|
||||
@@ -243,6 +253,7 @@ pub fn cgw_construct_infra_group_infras_del_response(
|
||||
uuid: Uuid,
|
||||
success: bool,
|
||||
error_message: Option<String>,
|
||||
kafka_partition_key: Option<String>,
|
||||
) -> Result<String> {
|
||||
let dev_del = InfraGroupInfrasDelResponse {
|
||||
r#type: "infrastructure_group_infras_del_response",
|
||||
@@ -252,6 +263,7 @@ pub fn cgw_construct_infra_group_infras_del_response(
|
||||
uuid,
|
||||
success,
|
||||
error_message,
|
||||
kafka_partition_key,
|
||||
};
|
||||
|
||||
Ok(serde_json::to_string(&dev_del)?)
|
||||
@@ -262,6 +274,7 @@ pub fn cgw_construct_infra_enqueue_response(
|
||||
uuid: Uuid,
|
||||
success: bool,
|
||||
error_message: Option<String>,
|
||||
kafka_partition_key: Option<String>,
|
||||
) -> Result<String> {
|
||||
let dev_enq_resp = InfraGroupInfraMessageEnqueueResponse {
|
||||
r#type: "infrastructure_group_infra_message_enqueue_response",
|
||||
@@ -269,6 +282,7 @@ pub fn cgw_construct_infra_enqueue_response(
|
||||
uuid,
|
||||
success,
|
||||
error_message,
|
||||
kafka_partition_key,
|
||||
};
|
||||
|
||||
Ok(serde_json::to_string(&dev_enq_resp)?)
|
||||
@@ -456,11 +470,125 @@ pub fn cgw_construct_infra_request_result_msg(
|
||||
Ok(serde_json::to_string(&infra_request_result)?)
|
||||
}
|
||||
|
||||
struct CustomContext;
|
||||
struct CGWConsumerContextData {
|
||||
// Tuple consistion of physical partition id (0,1,2.. etc)
|
||||
// and the corresponding _kafka routing key_, or just kafka key,
|
||||
// that can be used with this topic to access specified topic.
|
||||
// It can be used to optimize CGW to GID to Kafka topic mapping,
|
||||
// e.g. cloud has knowledge of what kafka key to use, to direct
|
||||
// a NB message to specific exact CGW, without the need of
|
||||
// alway backing to the use of relaying mechanism.
|
||||
// P.S. this optimization technic does not necessarily
|
||||
// make relaying obsolete. Relaying is still used to
|
||||
// forward at least one (first) NB request from
|
||||
// the shard that received message to the designated
|
||||
// recipient. Whenever recipient shard receives the NB
|
||||
// request and sends response back to NB services,
|
||||
// shard should reply back with routing_key included.
|
||||
// It's up to cloud (NB services) then to use specified
|
||||
// kafka key to make sure the kafka message reaches
|
||||
// recipient shard in exactly one hop (direct forwarding),
|
||||
// or omit kafka key completely to once again use the
|
||||
// relaying mechanism.
|
||||
partition_mapping: HashMap<u32, String>,
|
||||
assigned_partition_list: Vec<u32>,
|
||||
last_used_key_idx: u32,
|
||||
partition_num: usize,
|
||||
|
||||
// A bit ugly, but we need a way to get
|
||||
// consumer (to retrieve patition num) whenever
|
||||
// client->context rebalance callback is being called.
|
||||
consumer_client: Option<Arc<CGWCNCConsumerType>>,
|
||||
}
|
||||
|
||||
struct CustomContext {
|
||||
ctx_data: std::sync::RwLock<CGWConsumerContextData>,
|
||||
}
|
||||
|
||||
impl CGWConsumerContextData {
|
||||
fn recalculate_partition_to_key_mapping(&mut self, partition_num: usize) {
|
||||
const DEFAULT_HASH_SEED: u32 = 0x9747b28c;
|
||||
|
||||
// The factor of 10 is selected to cover >=15000 of topics,
|
||||
// meaning with 15K partitions, this algorithm can still
|
||||
// confidently covert all 15K partitions with unique
|
||||
// kafka string-keys.
|
||||
// Even then, anything past 10K of partitions per topics
|
||||
// could be an overkill in the first place, hence
|
||||
// this algo should be sufficient.
|
||||
let loop_range = Range {
|
||||
start: 0,
|
||||
end: partition_num * 10,
|
||||
};
|
||||
let mut key_map: HashMap<u32, String> = HashMap::new();
|
||||
|
||||
for x in loop_range {
|
||||
let key_str = x.to_string();
|
||||
let key_bytes = key_str.as_bytes();
|
||||
|
||||
if key_map.len() == partition_num {
|
||||
break;
|
||||
}
|
||||
|
||||
// Default partitioner users the following formula:
|
||||
// toPositive(murmur2(keyBytes)) % numPartitions
|
||||
let hash_res = murmur2(key_bytes, DEFAULT_HASH_SEED) & 0x7fffffff;
|
||||
let part_idx = hash_res.rem_euclid(partition_num as u32);
|
||||
|
||||
if !key_map.contains_key(&part_idx) {
|
||||
debug!("Inserted key '{key_str}' for '{part_idx}' partition");
|
||||
key_map.insert(part_idx, key_str);
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Filled {} unique keys for {} of partitions",
|
||||
key_map.len(),
|
||||
partition_num
|
||||
);
|
||||
|
||||
if key_map.len() != partition_num {
|
||||
// All this means, is that if some partition X has
|
||||
// no corresponding 1:1 kafka key.
|
||||
// From CGW perspective, this means that application
|
||||
// will always instruct NB to use a set of keys that
|
||||
// we were able to map, ignoring any other un-mapped
|
||||
// partitions, rendering them unused completely.
|
||||
// But it's up to NB still to either use or not provided
|
||||
// routing kafka key by CGW.
|
||||
warn!("Filled fulfill all range of kafka topics for 1:1 mapping, some partitions will not be mapped!");
|
||||
}
|
||||
|
||||
self.partition_mapping = key_map;
|
||||
}
|
||||
|
||||
fn get_partition_info(&mut self) -> (Vec<u32>, HashMap<u32, String>) {
|
||||
(
|
||||
self.assigned_partition_list.clone(),
|
||||
self.partition_mapping.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientContext for CustomContext {}
|
||||
|
||||
impl ConsumerContext for CustomContext {
|
||||
fn pre_rebalance(&self, rebalance: &Rebalance<'_>) {
|
||||
debug!("Pre rebalance entry");
|
||||
|
||||
// We need to make sure the <before>
|
||||
// we're _actually_ assigned a partition list,
|
||||
// we don't fool any internal code that depends
|
||||
// on the topic list, and zero-out it when not
|
||||
// ready, and return anything only when it's
|
||||
// available.
|
||||
if let Ok(mut ctx) = self.ctx_data.write() {
|
||||
ctx.partition_mapping.clear();
|
||||
ctx.assigned_partition_list.clear();
|
||||
ctx.last_used_key_idx = 0;
|
||||
ctx.partition_num = 0;
|
||||
}
|
||||
|
||||
let mut part_list = String::new();
|
||||
if let rdkafka::consumer::Rebalance::Assign(partitions) = rebalance {
|
||||
for x in partitions.elements() {
|
||||
@@ -480,15 +608,52 @@ impl ConsumerContext for CustomContext {
|
||||
}
|
||||
|
||||
fn post_rebalance(&self, rebalance: &Rebalance<'_>) {
|
||||
let mut assigned_partition_list: Vec<u32> = Vec::new();
|
||||
let mut part_list = String::new();
|
||||
|
||||
if let rdkafka::consumer::Rebalance::Assign(partitions) = rebalance {
|
||||
for x in partitions.elements() {
|
||||
part_list += &(x.partition().to_string() + " ");
|
||||
assigned_partition_list.push(x.partition() as u32);
|
||||
}
|
||||
debug!("post_rebalance callback, assigned partition(s): {part_list}");
|
||||
}
|
||||
|
||||
if let Ok(mut ctx) = self.ctx_data.write() {
|
||||
if let Some(consumer) = &ctx.consumer_client {
|
||||
if let Ok(metadata) =
|
||||
consumer.fetch_metadata(Some(CONSUMER_TOPICS[0]), Duration::from_millis(2000))
|
||||
{
|
||||
let topic = &metadata.topics()[0];
|
||||
let partition_num: usize = topic.partitions().len();
|
||||
|
||||
debug!("topic: {}, partitions: {}", topic.name(), partition_num);
|
||||
|
||||
// We recalculate mapping only if the underlying
|
||||
// _number_ of partitions's changed.
|
||||
// Also, the underlying assignment to a specific
|
||||
// partitions is irrelevant itself,
|
||||
// as key:partition mapping changes only whenever
|
||||
// underlying number of partitions is altered.
|
||||
//
|
||||
// This also means that the underlying block
|
||||
// will get executed at least once throughout the
|
||||
// CGW lifetime - at least once upon startup,
|
||||
// whenever _this_ CGW consumer group
|
||||
// consumer instance - CGW shard - is being
|
||||
// assigned a list of partitions to consume from.
|
||||
if ctx.partition_num != partition_num {
|
||||
ctx.partition_num = partition_num;
|
||||
ctx.assigned_partition_list = assigned_partition_list;
|
||||
|
||||
ctx.recalculate_partition_to_key_mapping(partition_num);
|
||||
}
|
||||
} else {
|
||||
warn!("Tried to fetch consumer metadata but failed. CGW will not be able to reply with optimized Kafka key for efficient routing!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
part_list.clear();
|
||||
|
||||
if let rdkafka::consumer::Rebalance::Revoke(partitions) = rebalance {
|
||||
@@ -508,13 +673,13 @@ impl ConsumerContext for CustomContext {
|
||||
});
|
||||
}
|
||||
|
||||
fn commit_callback(&self, _result: KafkaResult<()>, _offsets: &TopicPartitionList) {
|
||||
fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
|
||||
let mut part_list = String::new();
|
||||
for x in _offsets.elements() {
|
||||
part_list += &(x.partition().to_string() + " ");
|
||||
}
|
||||
debug!("commit_callback callback, partition(s): {part_list}");
|
||||
debug!("Consumer callback: commited offset");
|
||||
debug!("Consumer callback: commiting offsets: {:?}", result);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -527,17 +692,25 @@ struct CGWCNCProducer {
|
||||
}
|
||||
|
||||
struct CGWCNCConsumer {
|
||||
c: CGWCNCConsumerType,
|
||||
c: Arc<CGWCNCConsumerType>,
|
||||
}
|
||||
|
||||
impl CGWCNCConsumer {
|
||||
pub fn new(cgw_id: i32, kafka_args: &CGWKafkaArgs) -> Result<Self> {
|
||||
let consum: CGWCNCConsumerType = Self::create_consumer(cgw_id, kafka_args)?;
|
||||
let consum = Self::create_consumer(cgw_id, kafka_args)?;
|
||||
Ok(CGWCNCConsumer { c: consum })
|
||||
}
|
||||
|
||||
fn create_consumer(cgw_id: i32, kafka_args: &CGWKafkaArgs) -> Result<CGWCNCConsumerType> {
|
||||
let context = CustomContext;
|
||||
fn create_consumer(cgw_id: i32, kafka_args: &CGWKafkaArgs) -> Result<Arc<CGWCNCConsumerType>> {
|
||||
let context = CustomContext {
|
||||
ctx_data: std::sync::RwLock::new(CGWConsumerContextData {
|
||||
partition_mapping: HashMap::new(),
|
||||
assigned_partition_list: Vec::new(),
|
||||
last_used_key_idx: 0u32,
|
||||
partition_num: 0usize,
|
||||
consumer_client: None,
|
||||
}),
|
||||
};
|
||||
|
||||
let consumer: CGWCNCConsumerType = match ClientConfig::new()
|
||||
.set("group.id", GROUP_ID)
|
||||
@@ -562,8 +735,12 @@ impl CGWCNCConsumer {
|
||||
}
|
||||
};
|
||||
|
||||
let consumer = Arc::new(consumer);
|
||||
// Need to set this guy for context
|
||||
let consumer_clone = consumer.clone();
|
||||
|
||||
debug!(
|
||||
"(consumer) (producer) Created lazy connection to kafka broker ({}:{})...",
|
||||
"(consumer) Created lazy connection to kafka broker ({}:{})...",
|
||||
kafka_args.kafka_host, kafka_args.kafka_port,
|
||||
);
|
||||
|
||||
@@ -575,6 +752,10 @@ impl CGWCNCConsumer {
|
||||
return Err(Error::Kafka(e));
|
||||
};
|
||||
|
||||
if let Ok(mut ctx) = consumer.context().ctx_data.write() {
|
||||
ctx.consumer_client = Some(consumer_clone);
|
||||
}
|
||||
|
||||
Ok(consumer)
|
||||
}
|
||||
}
|
||||
@@ -614,6 +795,7 @@ pub struct CGWNBApiClient {
|
||||
working_runtime_handle: Runtime,
|
||||
cgw_server_tx_mbox: CGWConnectionServerMboxTx,
|
||||
prod: CGWCNCProducer,
|
||||
consumer: Arc<CGWCNCConsumer>,
|
||||
// TBD: stplit different implementators through a defined trait,
|
||||
// that implements async R W operations?
|
||||
}
|
||||
@@ -631,14 +813,16 @@ impl CGWNBApiClient {
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
let consumer: Arc<CGWCNCConsumer> = Arc::new(CGWCNCConsumer::new(cgw_id, kafka_args)?);
|
||||
let consumer_clone = consumer.clone();
|
||||
let cl = Arc::new(CGWNBApiClient {
|
||||
working_runtime_handle: working_runtime_h,
|
||||
cgw_server_tx_mbox: cgw_tx.clone(),
|
||||
prod: CGWCNCProducer::new(kafka_args)?,
|
||||
consumer: consumer_clone,
|
||||
});
|
||||
|
||||
let cl_clone = cl.clone();
|
||||
let consumer: CGWCNCConsumer = CGWCNCConsumer::new(cgw_id, kafka_args)?;
|
||||
cl.working_runtime_handle.spawn(async move {
|
||||
loop {
|
||||
let cl_clone = cl_clone.clone();
|
||||
@@ -687,6 +871,26 @@ impl CGWNBApiClient {
|
||||
Ok(cl)
|
||||
}
|
||||
|
||||
pub fn get_partition_to_local_shard_mapping(&self) -> Vec<(u32, String)> {
|
||||
let mut return_vec: Vec<(u32, String)> = Vec::new();
|
||||
if let Ok(mut ctx) = self.consumer.c.context().ctx_data.write() {
|
||||
let (assigned_partition_list, mut partition_mapping) = ctx.get_partition_info();
|
||||
|
||||
if !partition_mapping.is_empty()
|
||||
&& ctx.partition_num > 0
|
||||
&& !assigned_partition_list.is_empty()
|
||||
{
|
||||
for x in assigned_partition_list {
|
||||
if let Some(key) = partition_mapping.remove(&x) {
|
||||
return_vec.push((x, key));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return_vec
|
||||
}
|
||||
|
||||
pub async fn enqueue_mbox_message_from_cgw_server(&self, key: String, payload: String) {
|
||||
let produce_future = self.prod.p.send(
|
||||
FutureRecord::to(PRODUCER_TOPICS)
|
||||
|
||||
1
utils/cert_generator/certs/client/macs.txt
Normal file
1
utils/cert_generator/certs/client/macs.txt
Normal file
File diff suppressed because one or more lines are too long
@@ -20,7 +20,7 @@ spawn:
|
||||
-v ${CA_CERT_PATH}:/etc/ca \
|
||||
-v ${CLIENT_CERT_PATH}:/etc/certs \
|
||||
${IMG_NAME} \
|
||||
PYTHONPATH="$PYTHONPATH:$PWD:$PWD/src/" python3 main.py -M ${MAC} -N ${COUNT} -s ${URL} \
|
||||
python3 main.py -M ${MAC} -N ${COUNT} -s ${URL} \
|
||||
--ca-cert /etc/ca/ca.crt \
|
||||
--client-certs-path /etc/certs \
|
||||
--msg-interval ${MSG_INTERVAL} \
|
||||
|
||||
@@ -1 +1 @@
|
||||
websockets==12.0
|
||||
websockets==13.1
|
||||
|
||||
@@ -9,4 +9,4 @@
|
||||
#
|
||||
# ARGS:
|
||||
# $1 - group id
|
||||
./run.sh -s localhost:9092 -c 1 --new-group $1 0 some_name_0
|
||||
./run.sh -s localhost:9092 -c 1 --new-group $1 some_name_0
|
||||
|
||||
@@ -30,8 +30,8 @@ def time(input: str) -> float:
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(description="Creates entries in kafka.")
|
||||
|
||||
parser.add_argument("-g", "--new-group", metavar=("GROUP-ID", "SHARD-ID", "NAME"),
|
||||
nargs=3, action="append",
|
||||
parser.add_argument("-g", "--new-group", metavar=("GROUP-ID", "NAME"),
|
||||
nargs=2, action="append",
|
||||
help="create a new group")
|
||||
parser.add_argument("-G", "--rm-group", metavar=("GROUP-ID"),
|
||||
nargs=1, action="append",
|
||||
@@ -92,11 +92,11 @@ def parse_args():
|
||||
send_to_macs=parsed_args.send_to_mac,
|
||||
)
|
||||
if parsed_args.new_group is not None:
|
||||
for group, shard, name in parsed_args.new_group:
|
||||
for group, name in parsed_args.new_group:
|
||||
try:
|
||||
args.add_groups.append((group, int(shard), name))
|
||||
args.add_groups.append((group, name))
|
||||
except ValueError:
|
||||
parser.error(f"--new-group: failed to parse shard id \"{shard}\"")
|
||||
parser.error(f"--new-group: failed to parse {group} / {name}")
|
||||
if parsed_args.rm_group is not None:
|
||||
for (group,) in parsed_args.rm_group:
|
||||
args.del_groups.append(group)
|
||||
|
||||
Reference in New Issue
Block a user