diff --git a/run_cgw.sh b/run_cgw.sh index 290cdf7..d3782a1 100755 --- a/run_cgw.sh +++ b/run_cgw.sh @@ -156,9 +156,9 @@ echo "CGW UCENTRAL AP DATAMODEL URI : $CGW_UCENTRAL_AP_DATAMODEL_URI" echo "CGW UCENTRAL SWITCH DATAMODEL URI : $CGW_UCENTRAL_SWITCH_DATAMODEL_URI" docker run \ - -p 15002:15002 \ - -p 50051:50051 \ - -p 8080:8080 \ + -p $CGW_WSS_PORT:$CGW_WSS_PORT \ + -p $CGW_GRPC_PUBLIC_PORT:$CGW_GRPC_PUBLIC_PORT \ + -p $CGW_METRICS_PORT:$CGW_METRICS_PORT \ --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ -v $CGW_CERTS_PATH:$CONTAINTER_CERTS_VOLUME \ -v $CGW_NB_INFRA_CERTS_PATH:$CONTAINTER_NB_INFRA_CERTS_VOLUME \ diff --git a/src/cgw_ucentral_ap_parser.rs b/src/cgw_ucentral_ap_parser.rs index 62534e9..cf8d4fd 100644 --- a/src/cgw_ucentral_ap_parser.rs +++ b/src/cgw_ucentral_ap_parser.rs @@ -894,21 +894,23 @@ pub fn cgw_ucentral_ap_parse_message( } } } else if map.contains_key("result") { - if !map.contains_key("id") { - warn!("Received JRPC without id!"); - return Err(Error::UCentralParser("Received JRPC without id")); + if let Value::Object(result) = &map["result"] { + if !result.contains_key("id") { + warn!("Received JRPC without id!"); + return Err(Error::UCentralParser("Received JRPC without id")); + } + + let id = result["id"] + .as_u64() + .ok_or_else(|| Error::UCentralParser("Failed to parse id"))?; + let reply_event = CGWUCentralEvent { + serial: Default::default(), + evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }), + decompressed: None, + }; + + return Ok(reply_event); } - - let id = map["id"] - .as_u64() - .ok_or_else(|| Error::UCentralParser("Failed to parse id"))?; - let reply_event = CGWUCentralEvent { - serial: Default::default(), - evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }), - decompressed: None, - }; - - return Ok(reply_event); } Err(Error::UCentralParser("Failed to parse event/method")) diff --git a/src/cgw_ucentral_switch_parser.rs b/src/cgw_ucentral_switch_parser.rs index f49748f..ece37cf 100644 --- a/src/cgw_ucentral_switch_parser.rs +++ b/src/cgw_ucentral_switch_parser.rs @@ -8,7 +8,7 @@ use crate::cgw_ucentral_parser::{ CGWUCentralEvent, CGWUCentralEventLog, CGWUCentralEventState, CGWUCentralEventStateClients, CGWUCentralEventStateClientsData, CGWUCentralEventStateClientsType, CGWUCentralEventStateLLDPData, CGWUCentralEventStateLinks, CGWUCentralEventStatePort, - CGWUCentralEventType, CGWUCentralJRPCMessage, + CGWUCentralEventType, CGWUCentralJRPCMessage, CGWUCentralEventReply }; fn parse_lldp_data( @@ -253,11 +253,25 @@ pub fn cgw_ucentral_switch_parse_message( } } } else if map.contains_key("result") { - info!("Processing JSONRPC msg"); - info!("{:?}", map); - return Err(Error::UCentralParser( - "Result handling is not yet implemented", - )); + // For now, let's mimic AP's basic reply / result + // format. + if let Value::Object(result) = &map["result"] { + if !result.contains_key("id") { + warn!("Received JRPC without id!"); + return Err(Error::UCentralParser("Received JRPC without id")); + } + + let id = result["id"] + .as_u64() + .ok_or_else(|| Error::UCentralParser("Failed to parse id"))?; + let reply_event = CGWUCentralEvent { + serial: Default::default(), + evt_type: CGWUCentralEventType::Reply(CGWUCentralEventReply { id }), + decompressed: None, + }; + + return Ok(reply_event); + } } Err(Error::UCentralParser("Failed to parse event/method")) diff --git a/tests/conftest.py b/tests/conftest.py index 64f1420..10c368c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,6 +45,7 @@ class TestContext: # However, we're making a fixture, hence all values must be the same # on the initial step. connect_msg = json.loads(device.messages.connect) + connect_msg['params']['capabilities']['platform'] = "ap" connect_msg['params']['firmware'] = "Test_FW_A" connect_msg['params']['uuid'] = 1 device.messages.connect = json.dumps(connect_msg) diff --git a/utils/client_simulator/src/simulation_runner.py b/utils/client_simulator/src/simulation_runner.py index 87081a4..dbcac5e 100644 --- a/utils/client_simulator/src/simulation_runner.py +++ b/utils/client_simulator/src/simulation_runner.py @@ -87,6 +87,15 @@ class Device: def send_leave(self, socket: client.ClientConnection): socket.send(self.messages.leave) + def get_single_message(self, socket: client.ClientConnection): + try: + msg = socket.recv(self.interval) + return self.messages.from_json(msg) + except TimeoutError: + return None + except: + raise + def handle_messages(self, socket: client.ClientConnection): try: msg = socket.recv(self.interval) diff --git a/utils/kafka_producer/src/consumer.py b/utils/kafka_producer/src/consumer.py index 0dbe570..0993ec4 100644 --- a/utils/kafka_producer/src/consumer.py +++ b/utils/kafka_producer/src/consumer.py @@ -86,6 +86,27 @@ class Consumer: return res_list + def get_infra_request_result_msg(self, uuid_val: int, timeout_ms: int = 12000): + res_uuid = str(uuid.UUID(int=uuid_val)) + + assert self.is_connected(),\ + f"consumer: Cannot get Kafka result msg, Not connected!" + + while True: + # We explicitly use get_single_msg instead of + # to make sure we return as soon as we find result, + # without waiting for potential T/O + message = self.get_single_msg(timeout_ms=timeout_ms) + if message is None: + break + + logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" % + (message.topic, message.key, message.value, message.timestamp)) + if 'uuid' in message.value.keys(): + if res_uuid == message.value['uuid'] and message.value['type'] == 'infra_request_result': + return message + return None + def get_result_msg(self, uuid_val: int, timeout_ms: int = 12000): res_uuid = str(uuid.UUID(int=uuid_val)) @@ -102,8 +123,9 @@ class Consumer: logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" % (message.topic, message.key, message.value, message.timestamp)) - if res_uuid == message.value['uuid']: - return message + if 'uuid' in message.value.keys(): + if res_uuid == message.value['uuid']: + return message return None def get_single_msg(self, timeout_ms: int = 12000): diff --git a/utils/kafka_producer/src/producer.py b/utils/kafka_producer/src/producer.py index 5b3abad..161f3ce 100644 --- a/utils/kafka_producer/src/producer.py +++ b/utils/kafka_producer/src/producer.py @@ -1,4 +1,4 @@ -from .utils import Message, MacRange +from .utils import Message, MacRange, UCentralConfigRequest from .log import logger from typing import List, Tuple @@ -7,14 +7,122 @@ import time import uuid import sys import random +import json class Producer: + @staticmethod + def device_message_reboot(mac: str, id: int = None): + msg = {} + params = {} + + if mac is None: + raise Exception('Cannot format message without MAC specified') + + if id is None: + id = 1 + + params["serial"] = mac + params["when"] = 0 + + msg["jsonrpc"] = "2.0" + msg["method"] = "reboot" + msg["params"] = params + msg["id"] = id + + return msg + + @staticmethod + def device_message_factory(mac: str, id: int = None, keep_rediretor: bool = None): + msg = {} + params = {} + + if mac is None: + raise Exception('Cannot format message without MAC specified') + + if id is None: + id = 1 + + if keep_rediretor is None: + keep_rediretor = True + + params["serial"] = mac + params["when"] = 0 + params["keep_rediretor"] = keep_rediretor + + msg["jsonrpc"] = "2.0" + msg["method"] = "factory" + msg["params"] = params + msg["id"] = id + + return msg + + @staticmethod + def device_message_ping(mac: str, id: int = None): + msg = {} + params = {} + + if mac is None: + raise Exception('Cannot format message without MAC specified') + + if id is None: + id = 1 + + params["serial"] = mac + + msg["jsonrpc"] = "2.0" + msg["method"] = "ping" + msg["params"] = params + msg["id"] = id + + return msg + + def device_message_config_ap_basic(self, mac: str, id: int = None) -> str: + if mac is None: + raise Exception('Cannot format message without MAC specified') + + if id is None: + id = 1 + + msg = self.ucentral_configs.get_ap_basic_cfg(mac, id); + return json.loads(msg) + + def device_message_config_ap_basic_invalid(self, mac: str, id: int = None) -> str: + if mac is None: + raise Exception('Cannot format message without MAC specified') + + if id is None: + id = 1 + + msg = self.ucentral_configs.get_ap_basic_invalid_cfg(mac, id); + return json.loads(msg) + + def device_message_config_switch_basic(self, mac: str, id: int = None) -> str: + if mac is None: + raise Exception('Cannot format message without MAC specified') + + if id is None: + id = 1 + + msg = self.ucentral_configs.get_switch_basic_cfg(mac, id); + return json.loads(msg) + + def device_message_config_switch_basic_invalid(self, mac: str, id: int = None) -> str: + if mac is None: + raise Exception('Cannot format message without MAC specified') + + if id is None: + id = 1 + + msg = self.ucentral_configs.get_switch_basic_invalid_cfg(mac, id); + return json.loads(msg) + def __init__(self, db: str, topic: str) -> None: self.db = db self.conn = None self.topic = topic self.message = Message() + self.ucentral_configs = UCentralConfigRequest() def __enter__(self) -> kafka.KafkaProducer: return self.connect() @@ -134,6 +242,11 @@ class Producer: bytes(group, encoding="utf-8")) conn.flush() + def handle_single_device_message(self, message: dict, group: str, mac: str, uuid_val: int) -> None: + self.conn.send(self.topic, self.message.to_device(group, mac, message, 0, uuid_val), + bytes(group, encoding="utf-8")) + self.conn.flush() + def handle_device_messages(self, message: dict, group: str, mac_range: MacRange, count: int, time_s: int, interval_s: int) -> None: if not time_s: diff --git a/utils/kafka_producer/src/utils.py b/utils/kafka_producer/src/utils.py index 3ddcaea..5111a24 100644 --- a/utils/kafka_producer/src/utils.py +++ b/utils/kafka_producer/src/utils.py @@ -79,6 +79,52 @@ class MacRange: return self.mac2num(base), int(count) return self.mac2num(input), 1 +class UCentralConfigRequest: + TEMPLATE_FILE_AP_BASIC = "./kafka_data/cfg_ap_basic.json" + TEMPLATE_FILE_AP_BASIC_INVALID = "./kafka_data/cfg_ap_basic_invalid.json" + TEMPLATE_FILE_SWITCH_BASIC = "./kafka_data/cfg_switch_basic.json" + TEMPLATE_FILE_SWITCH_BASIC_INVALID = "./kafka_data/cfg_switch_basic_invalid.json" + + @staticmethod + def parse_uuid(uuid_val = None) -> str: + if uuid_val is None: + return str(1) + + return str(uuid_val) + + def __init__(self) -> None: + with open(self.TEMPLATE_FILE_AP_BASIC) as f: + self.ap_basic = f.read() + with open(self.TEMPLATE_FILE_AP_BASIC_INVALID) as f: + self.ap_basic_invalid = f.read() + with open(self.TEMPLATE_FILE_SWITCH_BASIC) as f: + self.switch_basic = f.read() + with open(self.TEMPLATE_FILE_SWITCH_BASIC_INVALID) as f: + self.switch_basic_invalid = f.read() + + def get_ap_basic_cfg(self, mac: str, uuid_val = None): + req = copy.deepcopy(self.ap_basic); + req = req.replace("MAC_PLACEHOLDER", mac) + req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val)) + return req + + def get_ap_basic_invalid_cfg(self, mac: str, uuid_val = None): + req = copy.deepcopy(self.ap_basic_invalid); + req = req.replace("MAC_PLACEHOLDER", mac) + req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val)) + return req + + def get_switch_basic_cfg(self, mac: str, uuid_val = None): + req = copy.deepcopy(self.switch_basic); + req = req.replace("MAC_PLACEHOLDER", mac) + req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val)) + return req + + def get_switch_basic_invalid_cfg(self, mac: str, uuid_val = None): + req = copy.deepcopy(self.switch_basic_invalid); + req = req.replace("MAC_PLACEHOLDER", mac) + req = req.replace("UUID_PLACEHOLDER", UCentralConfigRequest.parse_uuid(uuid_val)) + return req class Message: TEMPLATE_FILE = "./kafka_data/message_template.json"