Merge pull request #7 from Telecominfraproject/fix/invalid_mac_report_fix

Fix invalid device MAC reporting upon connect
This commit is contained in:
Olexandr, Mazur
2024-04-04 11:02:49 +03:00
committed by GitHub
2 changed files with 66 additions and 107 deletions

View File

@@ -1,12 +1,14 @@
use crate::cgw_connection_server::{CGWConnectionServer, CGWConnectionServerReqMsg};
use eui48::{MacAddress, Eui48};
use futures_util::{
stream::{SplitSink, SplitStream},
FutureExt, SinkExt, StreamExt,
};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, str::FromStr};
use tokio::{
net::TcpStream,
sync::mpsc::{unbounded_channel, UnboundedReceiver},
@@ -37,19 +39,14 @@ enum CGWConnectionState {
ClosedGracefully,
}
#[derive(Deserialize, Serialize, Debug, Default)]
struct CGWEventLogParams {
#[derive(Debug, Default)]
struct CGWEventLog {
serial: String,
log: String,
severity: i64,
}
#[derive(Deserialize, Serialize, Debug, Default)]
struct CGWEventLog {
params: CGWEventLogParams,
}
#[derive(Deserialize, Serialize, Debug, Default)]
#[derive(Debug, Default)]
struct CGWEventConnectParamsCaps {
compatible: String,
model: String,
@@ -57,167 +54,122 @@ struct CGWEventConnectParamsCaps {
label_macaddr: String,
}
#[derive(Deserialize, Serialize, Debug, Default)]
struct CGWEventConnectParams {
#[derive(Debug, Default)]
struct CGWEventConnect {
serial: String,
firmware: String,
uuid: u64,
capabilities: CGWEventConnectParamsCaps,
}
#[derive(Deserialize, Serialize, Debug, Default)]
struct CGWEventConnect {
params: CGWEventConnectParams,
}
#[derive(Deserialize, Serialize, Debug)]
enum CGWEvent {
#[derive(Debug)]
enum CGWEventType {
Connect(CGWEventConnect),
Log(CGWEventLog),
Empty,
}
fn cgw_parse_jrpc_event(map: &Map<String, Value>, method: String) -> CGWEvent {
#[derive(Debug)]
struct CGWEvent {
serial: String,
evt_type: CGWEventType,
}
fn cgw_parse_jrpc_event(map: &Map<String, Value>, method: &str) -> CGWEvent {
if method == "log" {
let params = map.get("params").expect("Params are missing");
return CGWEvent::Log(CGWEventLog {
params: CGWEventLogParams {
serial: params["serial"].to_string(),
let mac_serial = MacAddress::from_str(params["serial"].as_str().unwrap()).unwrap();
return CGWEvent {
serial: mac_serial.to_hex_string().to_uppercase(),
evt_type: CGWEventType::Log(CGWEventLog {
serial: mac_serial.to_hex_string().to_uppercase(),
log: params["log"].to_string(),
severity: serde_json::from_value(params["severity"].clone()).unwrap(),
},
});
})
};
} else if method == "connect" {
let params = map.get("params").expect("Params are missing");
return CGWEvent::Connect(CGWEventConnect {
params: CGWEventConnectParams {
serial: params["serial"].to_string(),
let mac_serial = MacAddress::from_str(params["serial"].as_str().unwrap()).unwrap();
let label = MacAddress::from_str(params["capabilities"]["label_macaddr"].as_str().unwrap()).unwrap();
return CGWEvent {
serial: mac_serial.to_hex_string().to_uppercase(),
evt_type: CGWEventType::Connect(CGWEventConnect {
serial: mac_serial.to_hex_string().to_uppercase(),
firmware: params["firmware"].to_string(),
uuid: 1,
capabilities: CGWEventConnectParamsCaps {
compatible: params["capabilities"]["compatible"].to_string(),
model: params["capabilities"]["model"].to_string(),
platform: params["capabilities"]["platform"].to_string(),
label_macaddr: params["capabilities"]["label_macaddr"].to_string(),
label_macaddr: label.to_hex_string().to_uppercase(),
},
},
});
}),
};
}
CGWEvent::Empty
}
async fn cgw_process_jrpc_event(event: &CGWEvent) -> Result<(), String> {
// TODO
if let CGWEvent::Connect(_c) = event {
/*
info!(
"Requesting {} to reboot (immediate request)",
c.params.serial
);
let req = json!({
"jsonrpc": "2.0",
"method": "reboot",
"params": {
"serial": c.params.serial,
"when": 0
},
"id": 1
});
info!("Received connect msg {}", c.params.serial);
sender.send(Message::text(req.to_string())).await.ok();
*/
CGWEvent {
serial: String::from(""),
evt_type: CGWEventType::Empty,
}
Ok(())
}
// TODO: heavy rework to enum-variant struct-based
async fn cgw_process_jrpc_message(message: Message) -> Result<CGWUcentralJRPCMessage, String> {
//let rpcmsg: CGWMethodConnect = CGWMethodConnect::default();
//serde_json::from_str(method).unwrap();
//
async fn cgw_parse_jrpc_message(message: Message) -> Result<CGWEvent, &'static str> {
let msg = if let Ok(s) = message.into_text() {
s
} else {
return Err("Message to string cast failed".to_string());
return Err("Message to string cast failed");
};
let map: CGWUcentralJRPCMessage = match serde_json::from_str(&msg) {
Ok(m) => m,
Err(e) => {
error!("Failed to parse input json {e}");
return Err("Failed to parse input json".to_string());
return Err("Failed to parse input json");
}
};
//.expect("Failed to parse input json");
if !map.contains_key("jsonrpc") {
warn!("Received malformed JSONRPC msg");
return Err("JSONRPC field is missing in message".to_string());
return Err("JSONRPC field is missing in message");
}
if map.contains_key("method") {
if !map.contains_key("params") {
warn!("Received JRPC <method> without params.");
return Err("Received JRPC <method> without params".to_string());
return Err("Received JRPC <method> without params");
}
// unwrap can panic
let method = map["method"].as_str().unwrap();
let event: CGWEvent = cgw_parse_jrpc_event(&map, method.to_string());
let event: CGWEvent = cgw_parse_jrpc_event(&map, method);
match &event {
CGWEvent::Log(l) => {
match &event.evt_type {
CGWEventType::Log(l) => {
debug!(
"Received LOG evt from device {}: {}",
l.params.serial, l.params.log
l.serial, l.log
);
}
CGWEvent::Connect(c) => {
CGWEventType::Connect(c) => {
debug!(
"Received connect evt from device {}: type {}, fw {}",
c.params.serial, c.params.capabilities.platform, c.params.firmware
c.serial, c.capabilities.platform, c.firmware
);
}
_ => {
warn!("received not yet implemented method {}", method);
return Err(format!("received not yet implemented method {}", method));
return Err("received not yet implemented method");
}
};
if let Err(e) = cgw_process_jrpc_event(&event).await {
warn!(
"Failed to process jrpc event (unmatched) {}",
method.to_string()
);
return Err(e);
}
// TODO
return Ok(event);
} else if map.contains_key("result") {
info!("Processing <result> JSONRPC msg");
info!("{:?}", map);
return Err("Result handling is not yet implemented".to_string());
return Err("Result handling is not yet implemented");
}
/*
match map.get_mut("jsonrpc") {
Some(value) => info!("Got value {:?}", value),
None => info!("Got no value"),
}
*/
/*
if let CGWMethod::Connect { ref someint, .. } = &rpcmsg {
info!("secondmatch {}", *someint);
return Some(rpcmsg);
} else {
return None;
}
*/
//return Some(CGWMethodConnect::default());
Ok(map)
Err("Failed to parse event/method")
}
pub struct CGWConnectionProcessor {
@@ -279,8 +231,8 @@ impl CGWConnectionProcessor {
}
};
let map = match cgw_process_jrpc_message(message).await {
Ok(val) => val,
let evt = match cgw_parse_jrpc_message(message).await {
Ok(e) => e,
Err(_e) => {
error!(
"failed to recv connect message from {}, closing connection",
@@ -290,8 +242,12 @@ impl CGWConnectionProcessor {
}
};
let serial = map["params"]["serial"].as_str().unwrap();
self.serial = Some(serial.to_string());
match evt.evt_type {
CGWEventType::Connect(c) => (),
_ => warn!("Device {} is not abiding the protocol: first message - CONNECT - expected", evt.serial),
}
self.serial = Some(evt.serial.clone());
// TODO: we accepted tls stream and split the WS into RX TX part,
// now we have to ASK cgw_connection_server's permission whether
@@ -299,7 +255,7 @@ impl CGWConnectionProcessor {
// cgw_connection_server has an authorative decision whether
// we can proceed.
let (mbox_tx, mut mbox_rx) = unbounded_channel::<CGWConnectionProcessorReqMsg>();
let msg = CGWConnectionServerReqMsg::AddNewConnection(serial.to_string(), mbox_tx);
let msg = CGWConnectionServerReqMsg::AddNewConnection(evt.serial.clone(), mbox_tx);
self.cgw_server
.enqueue_mbox_message_to_cgw_server(msg)
.await;
@@ -308,13 +264,13 @@ impl CGWConnectionProcessor {
if let Some(m) = ack {
match m {
CGWConnectionProcessorReqMsg::AddNewConnectionAck => {
debug!("websocket connection established: {} {}", self.addr, serial);
debug!("websocket connection established: {} {}", self.addr, evt.serial);
}
_ => panic!("Unexpected response from server, expected ACK/NOT ACK)"),
}
} else {
info!("connection server declined connection, websocket connection {} {} cannot be established",
self.addr, serial);
self.addr, evt.serial);
return;
}
@@ -381,6 +337,7 @@ impl CGWConnectionProcessor {
last_contact: Instant,
) -> Result<CGWConnectionState, &str> {
// TODO: configurable duration (upon server creation)
/*
if Instant::now().duration_since(last_contact) > Duration::from_secs(70) {
warn!(
"Closing connection {} (idle for too long, stale)",
@@ -390,6 +347,8 @@ impl CGWConnectionProcessor {
} else {
Ok(CGWConnectionState::IsActive)
}
*/
Ok(CGWConnectionState::IsActive)
}
async fn process_connection(

View File

@@ -366,7 +366,7 @@ impl CGWConnectionServer {
}
"infrastructure_group_device_message" => {
let json_msg: InfraGroupMsgJSON = serde_json::from_str(&pload).unwrap();
//debug!("{:?}", json_msg);
debug!("{:?}", json_msg);
return Some(CGWNBApiParsedMsg::InfrastructureGroupInfraMsg(
json_msg.uuid,
group_id,