diff --git a/libs/controller/controller_2x/controller.py b/libs/controller/controller_2x/controller.py index 3f606d9eb..ec48d78bb 100644 --- a/libs/controller/controller_2x/controller.py +++ b/libs/controller/controller_2x/controller.py @@ -297,6 +297,75 @@ class Controller(ConfigureController): self.check_response("POST", resp, self.make_headers(), payload, uri) return resp + def ping_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/ping") + print(uri) + print(payload) + payload = json.dumps(payload) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + print(resp) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def led_blink_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/leds") + print(uri) + print(payload) + payload = json.dumps(payload) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + print(resp) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def trace_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/trace") + print(uri) + print(payload) + payload = json.dumps(payload) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + print(resp) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def wifi_scan_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/wifiscan") + print(uri) + print(payload) + payload = json.dumps(payload) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + print(resp) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def request_specific_msg_from_device(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/request") + print(uri) + print(payload) + payload = json.dumps(payload) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + print(resp) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def event_queue(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/eventqueue") + print(uri) + print(payload) + payload = json.dumps(payload) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + print(resp) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp + + def telemetry(self, serial_number, payload): + uri = self.build_uri("device/" + serial_number + "/telemetry") + print(uri) + print(payload) + payload = json.dumps(payload) + resp = requests.post(uri, data=payload, headers=self.make_headers(), verify=False, timeout=100) + print(resp) + self.check_response("POST", resp, self.make_headers(), payload, uri) + return resp class FMSUtils: diff --git a/tests/controller_tests/ucentral_gateway/test_gatewayservice.py b/tests/controller_tests/ucentral_gateway/test_gatewayservice.py index c6cbbc9a1..e092f78c4 100644 --- a/tests/controller_tests/ucentral_gateway/test_gatewayservice.py +++ b/tests/controller_tests/ucentral_gateway/test_gatewayservice.py @@ -239,3 +239,127 @@ class TestUcentralGatewayService(object): print(resp.json()) allure.attach(name="Device status", body=str(resp.json()), attachment_type=allure.attachment_type.JSON) + @pytest.mark.gw_ping_device + def test_gw_service_ping_device(self, setup_controller, get_configuration): + """ + Test to Ping device present in Gateway UI + """ + device_name = get_configuration['access_point'][0]['serial'] + payload = { + "serialNumber": device_name + } + print(json.dumps(payload)) + resp = setup_controller.ping_device(device_name, payload) + print(resp.json()) + allure.attach(name="Device Ping status", body=str(resp.json()), attachment_type=allure.attachment_type.JSON) + + @pytest.mark.gw_led_blink_device + def test_gw_service_led_blink_device(self, setup_controller, get_configuration): + """ + Test to Blink led on device present in Gateway UI + """ + device_name = get_configuration['access_point'][0]['serial'] + payload = { + "serialNumber": device_name, + "when": 0, + "duration": 1, + "pattern": "on" + } + print(json.dumps(payload)) + resp = setup_controller.led_blink_device(device_name, payload) + print(resp.json()) + allure.attach(name="Device Blink led status", body=str(resp.json()), attachment_type=allure.attachment_type.JSON) + + @pytest.mark.gw_trace_device + def test_gw_service_trace_device(self, setup_controller, get_configuration): + """ + Test to trace device present in Gateway UI + """ + device_name = get_configuration['access_point'][0]['serial'] + payload = { + "serialNumber": device_name, + "when": 0, + "duration": 1, + "numberOfPackets": 0, + "network": "string", + "interface": "string" + } + print(json.dumps(payload)) + resp = setup_controller.trace_device(device_name, payload) + print(resp.json()) + allure.attach(name="Device trace status", body=str(resp.json()), attachment_type=allure.attachment_type.JSON) + + @pytest.mark.gw_wifi_scan_device + def test_gw_service_wifi_scan_device(self, setup_controller, get_configuration): + """ + Test to Wifi scan device present in Gateway UI + """ + device_name = get_configuration['access_point'][0]['serial'] + payload = { + "serialNumber": device_name, + "verbose": True, + "activeScan": True, + "selector": { + "bands": [ + "2" + ] + } + } + print(json.dumps(payload)) + resp = setup_controller.wifi_scan_device(device_name, payload) + print(resp.json()) + allure.attach(name="Device Wifi scan status", body=str(resp.json()), attachment_type=allure.attachment_type.JSON) + + @pytest.mark.gw_request_msg_device + def test_gw_service_request_msg_device(self, setup_controller, get_configuration): + """ + Test to Request specific msg from device present in Gateway UI + """ + device_name = get_configuration['access_point'][0]['serial'] + payload = { + "serialNumber": device_name, + "when": 0, + "message": "state" + } + print(json.dumps(payload)) + resp = setup_controller.request_specific_msg_from_device(device_name, payload) + print(resp.json()) + allure.attach(name="Device Request specific msg status", body=str(resp.json()), attachment_type=allure.attachment_type.JSON) + + @pytest.mark.gw_event_queue_device + def test_gw_service_event_queue_device(self, setup_controller, get_configuration): + """ + Test to Request Event Queue from device present in Gateway UI + """ + device_name = get_configuration['access_point'][0]['serial'] + payload = { + "serialNumber": device_name, + "types": [ + "dhcp" + ] + } + print(json.dumps(payload)) + resp = setup_controller.event_queue(device_name, payload) + print(resp.json()) + allure.attach(name="Device Request Event Queue status", body=str(resp.json()), attachment_type=allure.attachment_type.JSON) + + @pytest.mark.gw_telemetry_device + def test_gw_service_telemetry_device(self, setup_controller, get_configuration): + """ + Test to Request telemetry from device present in Gateway UI + """ + device_name = get_configuration['access_point'][0]['serial'] + payload = { + "serialNumber": device_name, + "interval": 0, + "lifetime": 0, + "kafka": False, + "types": [ + "dhcp-snooping" + ], + "uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6" + } + print(json.dumps(payload)) + resp = setup_controller.telemetry(device_name, payload) + print(resp.json()) + allure.attach(name="Device telemetry status", body=str(resp.json()), attachment_type=allure.attachment_type.JSON)