diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index c412c11..05ae12a 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -629,6 +629,20 @@ impl CGWConnectionServer { }); } + async fn get_redis_last_update_timestamp(&self) -> i64 { + match self + .cgw_remote_discovery + .get_redis_last_update_timestamp() + .await + { + Ok(timestamp) => timestamp, + Err(e) => { + error!("{e}"); + 0i64 + } + } + } + async fn process_internal_nb_api_mbox( self: Arc, mut rx_mbox: CGWConnectionServerNBAPIMboxRx, @@ -643,17 +657,7 @@ impl CGWConnectionServer { // This only means that original capacity of all buffers is allocated to <100>, // it can still increase on demand or need automatically (upon insert, push_back etc) let cgw_buf_prealloc_size = 100; - let mut last_update_timestamp: i64 = match self - .cgw_remote_discovery - .get_redis_last_update_timestamp() - .await - { - Ok(timestamp) => timestamp, - Err(e) => { - error!("{e}"); - 0i64 - } - }; + let mut last_update_timestamp: i64 = self.get_redis_last_update_timestamp().await; let mut partition_array_idx: usize = 0; let mut local_shard_partition_key: Option; @@ -686,19 +690,9 @@ impl CGWConnectionServer { num_of_msg_read += rd_num; if rd_num == 0 { - let curretn_timestamp: i64 = match self - .cgw_remote_discovery - .get_redis_last_update_timestamp() - .await - { - Ok(timestamp) => timestamp, - Err(e) => { - error!("{e}"); - 0i64 - } - }; + let current_timestamp: i64 = self.get_redis_last_update_timestamp().await; - if last_update_timestamp != curretn_timestamp { + if last_update_timestamp != current_timestamp { if let Err(e) = self.cgw_remote_discovery.sync_gid_to_cgw_map().await { error!("process_internal_nb_api_mbox: failed to sync GID to CGW map! Error: {e}"); } @@ -715,7 +709,7 @@ impl CGWConnectionServer { error!("Failed to sync Device cache! Error: {e}"); } - last_update_timestamp = curretn_timestamp; + last_update_timestamp = current_timestamp; } } @@ -824,6 +818,16 @@ impl CGWConnectionServer { .await { Ok(_dst_cgw_id) => { + // We successfully updated both SQL and REDIS + // cache. In order to keep it in sync with local + // one, we have to make sure we latest + // update timestamp locally, to prevent CGW + // from trying to update it in next iteration + // of the main loop, while this very own + // local shard _is_ responsible for timestamp + // update. + last_update_timestamp = self.get_redis_last_update_timestamp().await; + if let Ok(resp) = cgw_construct_infra_group_create_response( gid, self.local_cgw_id, @@ -873,6 +877,16 @@ impl CGWConnectionServer { .await { Ok(_dst_cgw_id) => { + // We successfully updated both SQL and REDIS + // cache. In order to keep it in sync with local + // one, we have to make sure we latest + // update timestamp locally, to prevent CGW + // from trying to update it in next iteration + // of the main loop, while this very own + // local shard _is_ responsible for timestamp + // update. + last_update_timestamp = self.get_redis_last_update_timestamp().await; + if let Ok(resp) = cgw_construct_infra_group_create_response( gid, self.local_cgw_id, @@ -921,6 +935,16 @@ impl CGWConnectionServer { .await { Ok(()) => { + // We successfully updated both SQL and REDIS + // cache. In order to keep it in sync with local + // one, we have to make sure we latest + // update timestamp locally, to prevent CGW + // from trying to update it in next iteration + // of the main loop, while this very own + // local shard _is_ responsible for timestamp + // update. + last_update_timestamp = self.get_redis_last_update_timestamp().await; + // We try to help free topomap memory usage // by notifying it whenever GID get's destroyed. // Howover, for allocation we let topo map @@ -1125,7 +1149,8 @@ impl CGWConnectionServer { CGWNBApiParsedMsg { uuid, gid, - msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasAdd(infras_list), + msg_type: + CGWNBApiParsedMsgType::InfrastructureGroupInfrasAdd(infras_list), } => { if (self .cgw_remote_discovery @@ -1283,7 +1308,8 @@ impl CGWConnectionServer { CGWNBApiParsedMsg { uuid, gid, - msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(infras_list), + msg_type: + CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(infras_list), } => { if (self .cgw_remote_discovery