Merge pull request #103 from Telecominfraproject/feat/test_initial_infra_message_impl

Initial infra message impl
This commit is contained in:
Sviatoslav Boichuk
2024-12-06 13:03:03 +02:00
committed by GitHub
8 changed files with 233 additions and 26 deletions

View File

@@ -156,9 +156,9 @@ echo "CGW UCENTRAL AP DATAMODEL URI : $CGW_UCENTRAL_AP_DATAMODEL_URI"
echo "CGW UCENTRAL SWITCH DATAMODEL URI : $CGW_UCENTRAL_SWITCH_DATAMODEL_URI"
docker run \
-p 15002:15002 \
-p 50051:50051 \
-p 8080:8080 \
-p $CGW_WSS_PORT:$CGW_WSS_PORT \
-p $CGW_GRPC_PUBLIC_PORT:$CGW_GRPC_PUBLIC_PORT \
-p $CGW_METRICS_PORT:$CGW_METRICS_PORT \
--cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
-v $CGW_CERTS_PATH:$CONTAINTER_CERTS_VOLUME \
-v $CGW_NB_INFRA_CERTS_PATH:$CONTAINTER_NB_INFRA_CERTS_VOLUME \

View File

@@ -894,21 +894,23 @@ pub fn cgw_ucentral_ap_parse_message(
}
}
} else if map.contains_key("result") {
if !map.contains_key("id") {
warn!("Received JRPC <result> without id!");
return Err(Error::UCentralParser("Received JRPC <result> without id"));
if let Value::Object(result) = &map["result"] {
if !result.contains_key("id") {
warn!("Received JRPC <result> without id!");
return Err(Error::UCentralParser("Received JRPC <result> without id"));
}
let id = result["id"]
.as_u64()
.ok_or_else(|| Error::UCentralParser("Failed to parse id"))?;
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};
return Ok(reply_event);
}
let id = map["id"]
.as_u64()
.ok_or_else(|| Error::UCentralParser("Failed to parse id"))?;
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};
return Ok(reply_event);
}
Err(Error::UCentralParser("Failed to parse event/method"))

View File

@@ -8,7 +8,7 @@ use crate::cgw_ucentral_parser::{
CGWUCentralEvent, CGWUCentralEventLog, CGWUCentralEventState, CGWUCentralEventStateClients,
CGWUCentralEventStateClientsData, CGWUCentralEventStateClientsType,
CGWUCentralEventStateLLDPData, CGWUCentralEventStateLinks, CGWUCentralEventStatePort,
CGWUCentralEventType, CGWUCentralJRPCMessage,
CGWUCentralEventType, CGWUCentralJRPCMessage, CGWUCentralEventReply
};
fn parse_lldp_data(
@@ -253,11 +253,25 @@ pub fn cgw_ucentral_switch_parse_message(
}
}
} else if map.contains_key("result") {
info!("Processing <result> JSONRPC msg");
info!("{:?}", map);
return Err(Error::UCentralParser(
"Result handling is not yet implemented",
));
// For now, let's mimic AP's basic reply / result
// format.
if let Value::Object(result) = &map["result"] {
if !result.contains_key("id") {
warn!("Received JRPC <result> without id!");
return Err(Error::UCentralParser("Received JRPC <result> without id"));
}
let id = result["id"]
.as_u64()
.ok_or_else(|| Error::UCentralParser("Failed to parse id"))?;
let reply_event = CGWUCentralEvent {
serial: Default::default(),
evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }),
decompressed: None,
};
return Ok(reply_event);
}
}
Err(Error::UCentralParser("Failed to parse event/method"))

View File

@@ -45,6 +45,7 @@ class TestContext:
# However, we're making a fixture, hence all values must be the same
# on the initial step.
connect_msg = json.loads(device.messages.connect)
connect_msg['params']['capabilities']['platform'] = "ap"
connect_msg['params']['firmware'] = "Test_FW_A"
connect_msg['params']['uuid'] = 1
device.messages.connect = json.dumps(connect_msg)

View File

@@ -87,6 +87,15 @@ class Device:
def send_leave(self, socket: client.ClientConnection):
socket.send(self.messages.leave)
def get_single_message(self, socket: client.ClientConnection):
try:
msg = socket.recv(self.interval)
return self.messages.from_json(msg)
except TimeoutError:
return None
except:
raise
def handle_messages(self, socket: client.ClientConnection):
try:
msg = socket.recv(self.interval)

View File

@@ -86,6 +86,27 @@ class Consumer:
return res_list
def get_infra_request_result_msg(self, uuid_val: int, timeout_ms: int = 12000):
res_uuid = str(uuid.UUID(int=uuid_val))
assert self.is_connected(),\
f"consumer: Cannot get Kafka result msg, Not connected!"
while True:
# We explicitly use get_single_msg instead of <get_msgs>
# to make sure we return as soon as we find result,
# without waiting for potential T/O
message = self.get_single_msg(timeout_ms=timeout_ms)
if message is None:
break
logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" %
(message.topic, message.key, message.value, message.timestamp))
if 'uuid' in message.value.keys():
if res_uuid == message.value['uuid'] and message.value['type'] == 'infra_request_result':
return message
return None
def get_result_msg(self, uuid_val: int, timeout_ms: int = 12000):
res_uuid = str(uuid.UUID(int=uuid_val))
@@ -102,8 +123,9 @@ class Consumer:
logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" %
(message.topic, message.key, message.value, message.timestamp))
if res_uuid == message.value['uuid']:
return message
if 'uuid' in message.value.keys():
if res_uuid == message.value['uuid']:
return message
return None
def get_single_msg(self, timeout_ms: int = 12000):

View File

@@ -1,4 +1,4 @@
from .utils import Message, MacRange
from .utils import Message, MacRange, UCentralConfigRequest
from .log import logger
from typing import List, Tuple
@@ -7,14 +7,122 @@ import time
import uuid
import sys
import random
import json
class Producer:
@staticmethod
def device_message_reboot(mac: str, id: int = None):
msg = {}
params = {}
if mac is None:
raise Exception('Cannot format message without MAC specified')
if id is None:
id = 1
params["serial"] = mac
params["when"] = 0
msg["jsonrpc"] = "2.0"
msg["method"] = "reboot"
msg["params"] = params
msg["id"] = id
return msg
@staticmethod
def device_message_factory(mac: str, id: int = None, keep_rediretor: bool = None):
msg = {}
params = {}
if mac is None:
raise Exception('Cannot format message without MAC specified')
if id is None:
id = 1
if keep_rediretor is None:
keep_rediretor = True
params["serial"] = mac
params["when"] = 0
params["keep_rediretor"] = keep_rediretor
msg["jsonrpc"] = "2.0"
msg["method"] = "factory"
msg["params"] = params
msg["id"] = id
return msg
@staticmethod
def device_message_ping(mac: str, id: int = None):
msg = {}
params = {}
if mac is None:
raise Exception('Cannot format message without MAC specified')
if id is None:
id = 1
params["serial"] = mac
msg["jsonrpc"] = "2.0"
msg["method"] = "ping"
msg["params"] = params
msg["id"] = id
return msg
def device_message_config_ap_basic(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')
if id is None:
id = 1
msg = self.ucentral_configs.get_ap_basic_cfg(mac, id);
return json.loads(msg)
def device_message_config_ap_basic_invalid(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')
if id is None:
id = 1
msg = self.ucentral_configs.get_ap_basic_invalid_cfg(mac, id);
return json.loads(msg)
def device_message_config_switch_basic(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')
if id is None:
id = 1
msg = self.ucentral_configs.get_switch_basic_cfg(mac, id);
return json.loads(msg)
def device_message_config_switch_basic_invalid(self, mac: str, id: int = None) -> str:
if mac is None:
raise Exception('Cannot format message without MAC specified')
if id is None:
id = 1
msg = self.ucentral_configs.get_switch_basic_invalid_cfg(mac, id);
return json.loads(msg)
def __init__(self, db: str, topic: str) -> None:
self.db = db
self.conn = None
self.topic = topic
self.message = Message()
self.ucentral_configs = UCentralConfigRequest()
def __enter__(self) -> kafka.KafkaProducer:
return self.connect()
@@ -134,6 +242,11 @@ class Producer:
bytes(group, encoding="utf-8"))
conn.flush()
def handle_single_device_message(self, message: dict, group: str, mac: str, uuid_val: int) -> None:
self.conn.send(self.topic, self.message.to_device(group, mac, message, 0, uuid_val),
bytes(group, encoding="utf-8"))
self.conn.flush()
def handle_device_messages(self, message: dict, group: str, mac_range: MacRange,
count: int, time_s: int, interval_s: int) -> None:
if not time_s:

View File

@@ -79,6 +79,52 @@ class MacRange:
return self.mac2num(base), int(count)
return self.mac2num(input), 1
class UCentralConfigRequest:
TEMPLATE_FILE_AP_BASIC = "./kafka_data/cfg_ap_basic.json"
TEMPLATE_FILE_AP_BASIC_INVALID = "./kafka_data/cfg_ap_basic_invalid.json"
TEMPLATE_FILE_SWITCH_BASIC = "./kafka_data/cfg_switch_basic.json"
TEMPLATE_FILE_SWITCH_BASIC_INVALID = "./kafka_data/cfg_switch_basic_invalid.json"
@staticmethod
def parse_uuid(uuid_val = None) -> str:
if uuid_val is None:
return str(1)
return str(uuid_val)
def __init__(self) -> None:
with open(self.TEMPLATE_FILE_AP_BASIC) as f:
self.ap_basic = f.read()
with open(self.TEMPLATE_FILE_AP_BASIC_INVALID) as f:
self.ap_basic_invalid = f.read()
with open(self.TEMPLATE_FILE_SWITCH_BASIC) as f:
self.switch_basic = f.read()
with open(self.TEMPLATE_FILE_SWITCH_BASIC_INVALID) as f:
self.switch_basic_invalid = f.read()
def get_ap_basic_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.ap_basic);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req
def get_ap_basic_invalid_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.ap_basic_invalid);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req
def get_switch_basic_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.switch_basic);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req
def get_switch_basic_invalid_cfg(self, mac: str, uuid_val = None):
req = copy.deepcopy(self.switch_basic_invalid);
req = req.replace("MAC_PLACEHOLDER", mac)
req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val))
return req
class Message:
TEMPLATE_FILE = "./kafka_data/message_template.json"