mirror of
https://github.com/optim-enterprises-bv/openlan-cgw.git
synced 2025-11-01 02:37:46 +00:00
CGW: Connection server: update local redis timestamp after handling requests
Internally, group add / del requests do alter the Redis timestamp (lat update timestamp), however the main loop of connection_server that processes messages is _not_ aware of this change up untill the next iteration of the loop. Fix this to make sure we _save_ locally the timestamp that we get from redis once we know for sure that local shard modified it (e.g. after group add/del request handled). Signed-off-by: Oleksandr Mazur <oleksandr.mazur@plvision.eu>
This commit is contained in:
@@ -629,6 +629,20 @@ impl CGWConnectionServer {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_redis_last_update_timestamp(&self) -> i64 {
|
||||||
|
match self
|
||||||
|
.cgw_remote_discovery
|
||||||
|
.get_redis_last_update_timestamp()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(timestamp) => timestamp,
|
||||||
|
Err(e) => {
|
||||||
|
error!("{e}");
|
||||||
|
0i64
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn process_internal_nb_api_mbox(
|
async fn process_internal_nb_api_mbox(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut rx_mbox: CGWConnectionServerNBAPIMboxRx,
|
mut rx_mbox: CGWConnectionServerNBAPIMboxRx,
|
||||||
@@ -643,17 +657,7 @@ impl CGWConnectionServer {
|
|||||||
// This only means that original capacity of all buffers is allocated to <100>,
|
// This only means that original capacity of all buffers is allocated to <100>,
|
||||||
// it can still increase on demand or need automatically (upon insert, push_back etc)
|
// it can still increase on demand or need automatically (upon insert, push_back etc)
|
||||||
let cgw_buf_prealloc_size = 100;
|
let cgw_buf_prealloc_size = 100;
|
||||||
let mut last_update_timestamp: i64 = match self
|
let mut last_update_timestamp: i64 = self.get_redis_last_update_timestamp().await;
|
||||||
.cgw_remote_discovery
|
|
||||||
.get_redis_last_update_timestamp()
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(timestamp) => timestamp,
|
|
||||||
Err(e) => {
|
|
||||||
error!("{e}");
|
|
||||||
0i64
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut partition_array_idx: usize = 0;
|
let mut partition_array_idx: usize = 0;
|
||||||
let mut local_shard_partition_key: Option<String>;
|
let mut local_shard_partition_key: Option<String>;
|
||||||
@@ -686,19 +690,9 @@ impl CGWConnectionServer {
|
|||||||
num_of_msg_read += rd_num;
|
num_of_msg_read += rd_num;
|
||||||
|
|
||||||
if rd_num == 0 {
|
if rd_num == 0 {
|
||||||
let curretn_timestamp: i64 = match self
|
let current_timestamp: i64 = self.get_redis_last_update_timestamp().await;
|
||||||
.cgw_remote_discovery
|
|
||||||
.get_redis_last_update_timestamp()
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(timestamp) => timestamp,
|
|
||||||
Err(e) => {
|
|
||||||
error!("{e}");
|
|
||||||
0i64
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if last_update_timestamp != curretn_timestamp {
|
if last_update_timestamp != current_timestamp {
|
||||||
if let Err(e) = self.cgw_remote_discovery.sync_gid_to_cgw_map().await {
|
if let Err(e) = self.cgw_remote_discovery.sync_gid_to_cgw_map().await {
|
||||||
error!("process_internal_nb_api_mbox: failed to sync GID to CGW map! Error: {e}");
|
error!("process_internal_nb_api_mbox: failed to sync GID to CGW map! Error: {e}");
|
||||||
}
|
}
|
||||||
@@ -715,7 +709,7 @@ impl CGWConnectionServer {
|
|||||||
error!("Failed to sync Device cache! Error: {e}");
|
error!("Failed to sync Device cache! Error: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
last_update_timestamp = curretn_timestamp;
|
last_update_timestamp = current_timestamp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -824,6 +818,16 @@ impl CGWConnectionServer {
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(_dst_cgw_id) => {
|
Ok(_dst_cgw_id) => {
|
||||||
|
// We successfully updated both SQL and REDIS
|
||||||
|
// cache. In order to keep it in sync with local
|
||||||
|
// one, we have to make sure we <save> latest
|
||||||
|
// update timestamp locally, to prevent CGW
|
||||||
|
// from trying to update it in next iteration
|
||||||
|
// of the main loop, while this very own
|
||||||
|
// local shard _is_ responsible for timestamp
|
||||||
|
// update.
|
||||||
|
last_update_timestamp = self.get_redis_last_update_timestamp().await;
|
||||||
|
|
||||||
if let Ok(resp) = cgw_construct_infra_group_create_response(
|
if let Ok(resp) = cgw_construct_infra_group_create_response(
|
||||||
gid,
|
gid,
|
||||||
self.local_cgw_id,
|
self.local_cgw_id,
|
||||||
@@ -873,6 +877,16 @@ impl CGWConnectionServer {
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(_dst_cgw_id) => {
|
Ok(_dst_cgw_id) => {
|
||||||
|
// We successfully updated both SQL and REDIS
|
||||||
|
// cache. In order to keep it in sync with local
|
||||||
|
// one, we have to make sure we <save> latest
|
||||||
|
// update timestamp locally, to prevent CGW
|
||||||
|
// from trying to update it in next iteration
|
||||||
|
// of the main loop, while this very own
|
||||||
|
// local shard _is_ responsible for timestamp
|
||||||
|
// update.
|
||||||
|
last_update_timestamp = self.get_redis_last_update_timestamp().await;
|
||||||
|
|
||||||
if let Ok(resp) = cgw_construct_infra_group_create_response(
|
if let Ok(resp) = cgw_construct_infra_group_create_response(
|
||||||
gid,
|
gid,
|
||||||
self.local_cgw_id,
|
self.local_cgw_id,
|
||||||
@@ -921,6 +935,16 @@ impl CGWConnectionServer {
|
|||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
|
// We successfully updated both SQL and REDIS
|
||||||
|
// cache. In order to keep it in sync with local
|
||||||
|
// one, we have to make sure we <save> latest
|
||||||
|
// update timestamp locally, to prevent CGW
|
||||||
|
// from trying to update it in next iteration
|
||||||
|
// of the main loop, while this very own
|
||||||
|
// local shard _is_ responsible for timestamp
|
||||||
|
// update.
|
||||||
|
last_update_timestamp = self.get_redis_last_update_timestamp().await;
|
||||||
|
|
||||||
// We try to help free topomap memory usage
|
// We try to help free topomap memory usage
|
||||||
// by notifying it whenever GID get's destroyed.
|
// by notifying it whenever GID get's destroyed.
|
||||||
// Howover, for allocation we let topo map
|
// Howover, for allocation we let topo map
|
||||||
@@ -1125,7 +1149,8 @@ impl CGWConnectionServer {
|
|||||||
CGWNBApiParsedMsg {
|
CGWNBApiParsedMsg {
|
||||||
uuid,
|
uuid,
|
||||||
gid,
|
gid,
|
||||||
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasAdd(infras_list),
|
msg_type:
|
||||||
|
CGWNBApiParsedMsgType::InfrastructureGroupInfrasAdd(infras_list),
|
||||||
} => {
|
} => {
|
||||||
if (self
|
if (self
|
||||||
.cgw_remote_discovery
|
.cgw_remote_discovery
|
||||||
@@ -1283,7 +1308,8 @@ impl CGWConnectionServer {
|
|||||||
CGWNBApiParsedMsg {
|
CGWNBApiParsedMsg {
|
||||||
uuid,
|
uuid,
|
||||||
gid,
|
gid,
|
||||||
msg_type: CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(infras_list),
|
msg_type:
|
||||||
|
CGWNBApiParsedMsgType::InfrastructureGroupInfrasDel(infras_list),
|
||||||
} => {
|
} => {
|
||||||
if (self
|
if (self
|
||||||
.cgw_remote_discovery
|
.cgw_remote_discovery
|
||||||
|
|||||||
Reference in New Issue
Block a user