WIFI-3446 Initial testcases

This commit is contained in:
Jaspreet Sachdev
2021-08-12 09:12:59 -04:00
parent 8e968fc738
commit 003cc25a71
6 changed files with 264 additions and 37 deletions

Submodule lanforge/lanforge-scripts updated: 16276e080c...82a756834b

View File

@@ -34,7 +34,7 @@ class ConfigureController:
self.access_token = ""
# self.session = requests.Session()
self.login_resp = self.login()
self.gw_host = self.get_endpoint()
self.gw_host = self.get_gw_endpoint()
def build_uri_sec(self, path):
new_uri = 'https://%s:%d/api/v1/%s' % (self.host.hostname, self.host.port, path)
@@ -47,6 +47,31 @@ class ConfigureController:
print(new_uri)
return new_uri
def request(self, service, command, method, params, payload):
if service == "sec":
uri = self.build_uri_sec(command)
elif service == "gw":
uri = self.build_uri(command)
elif service == "fms":
uri = self.build_url_fms(command)
else:
raise NameError("Invalid service code for request.")
print(uri)
params = params
if method == "GET":
resp = requests.get(uri, headers=self.make_headers(), params=params, verify=False, timeout=100)
elif method == "POST":
resp = requests.post(uri, data=payload, verify=False, timeout=100)
elif method == "PUT":
resp = requests.put(uri, data=payload, verify=False, timeout=100)
elif method == "DELETE":
resp = requests.delete(uri, headers=self.make_headers(), params=params, verify=False, timeout=100)
self.check_response(method, resp, self.make_headers(), payload, uri)
print (resp)
return resp
def login(self):
uri = self.build_uri_sec("oauth2")
# self.session.mount(uri, HTTPAdapter(max_retries=15))
@@ -60,17 +85,18 @@ class ConfigureController:
# self.session.headers.update({'Authorization': self.access_token})
return resp
def get_endpoint(self):
def get_gw_endpoint(self):
uri = self.build_uri_sec("systemEndpoints")
print(uri)
resp = requests.get(uri, headers=self.make_headers(), verify=False, timeout=100)
print(resp)
self.check_response("GET", resp, self.make_headers(), "", uri)
devices = resp.json()
print(devices["endpoints"][0]["uri"])
gw_host = urlparse(devices["endpoints"][0]["uri"])
services = resp.json()
print(services["endpoints"][0]["uri"])
gw_host = urlparse(services["endpoints"][0]["uri"])
return gw_host
def logout(self):
uri = self.build_uri_sec('oauth2/%s' % self.access_token)
resp = requests.delete(uri, headers=self.make_headers(), verify=False, timeout=100)

View File

@@ -126,10 +126,10 @@ CONFIGURATION = {
'mode': 'wifi5',
'serial': '001122090801',
'jumphost': True,
'ip': "10.28.3.100",
'ip': "127.0.0.1",
'username': "lanforge",
'password': "pumpkin77",
'port': 22,
'port': 8833,
'jumphost_tty': '/dev/ttyAP3',
'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/uCentral/tplink_ec420/20210728-tplink_ec420-uCentral-trunk-12ad0d5-upgrade.bin"
}

View File

@@ -1,29 +0,0 @@
"""
UCI Rest API Tests: Test Login, Logout API's
"""
import pytest
class TestUCIAUTHDEAUTH(object):
"""
pytest -m "uci_login or uci_logout" --ucentral
"""
@pytest.mark.uci_login
def test_uci_auth(self, setup_controller):
"""
pytest -m "uci_login" --ucentral
"""
print(setup_controller.login_resp)
assert setup_controller.login_resp.status_code == 200
@pytest.mark.uci_logout
def test_uci_deauth(self, setup_controller):
"""
pytest -m "uci_logout" --ucentral
"""
resp = setup_controller.logout()
print(resp)
assert resp.status_code == 200

View File

@@ -0,0 +1,104 @@
"""
UCentral Gateway Services Rest API Tests
"""
import pytest
import json
import allure
@allure.feature("SDK REST API")
class TestUcentralGatewayService(object):
"""
"""
@pytest.mark.sdk_restapi
def test_gwservice_listdevices(self, setup_controller):
"""
"""
resp = setup_controller.request("gw", "devices", "GET", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw list devices", body=body)
if resp.status_code != 200:
assert False
devices = json.loads(resp.text)
print (devices)
@pytest.mark.sdk_restapi
def test_gwservice_createdevice(self, setup_controller):
"""
"""
configuration = {'uuid': '1'}
payload = {'serialNumber': 'DEADBEEF0011',
'UUID': '123456',
'configuration': configuration,
'deviceType': 'AP',
'location': '',
'macAddress': 'DE:AD:BE:EF:00:11',
'manufacturer': 'Testing',
'owner': ''}
print(json.dumps(payload))
resp = setup_controller.request("gw", "device/DEADBEEF0011", "POST", None, json.dumps(payload))
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw create devices", body=body)
if resp.status_code != 200:
assert False
devices = json.loads(resp.text)
print (devices)
resp = setup_controller.request("gw", "device/DEADBEEF0011", "GET", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw create device verify", body=body)
if resp.status_code != 200:
assert False
resp = setup_controller.request("gw", "device/DEADBEEF0011", "DELETE", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw create device delete", body=body)
if resp.status_code != 200:
assert False
@pytest.mark.sdk_restapi
def test_gwservice_updatedevice(self, setup_controller):
"""
"""
configuration = {'uuid': '1'}
payload = {'serialNumber': 'DEADBEEF0011',
'UUID': '123456',
'configuration': configuration,
'deviceType': 'AP',
'location': '',
'macAddress': 'DE:AD:BE:EF:00:11',
'manufacturer': 'Testing',
'owner': ''}
resp = setup_controller.request("gw", "device/DEADBEEF0011", "POST", None, json.dumps(payload))
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw create devices", body=body)
if resp.status_code != 200:
assert False
devices = json.loads(resp.text)
print (devices)
payload = {'serialNumber': 'DEADBEEF0011',
'owner': 'pytest'}
resp = setup_controller.request("gw", "device/DEADBEEF0011", "PUT", None, json.dumps(payload))
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw get device", body=body)
if resp.status_code != 200:
assert False
resp = setup_controller.request("gw", "device/DEADBEEF0011", "GET", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw create device verify", body=body)
if resp.status_code != 200:
assert False
device = json.loads(resp.text)
print (device)
resp = setup_controller.request("gw", "device/DEADBEEF0011", "DELETE", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="gw get device", body=body)
if resp.status_code != 200:
assert False

View File

@@ -0,0 +1,126 @@
"""
UCentral Security Services Rest API Tests: Test Login, Logout API's
"""
import pytest
import json
import allure
@allure.feature("SDK REST API")
class TestUcentralSecService(object):
"""
pytest -m "uci_login or uci_logout"
"""
@pytest.mark.sdk_restapi
def test_secservice_oauth(self, setup_controller):
"""
pytest -m "uci_login"
"""
resp = setup_controller.login_resp
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="login", body=body)
assert setup_controller.login_resp.status_code == 200
@pytest.mark.sdk_restapi
def test_secservice_oauth_revoke(self, setup_controller):
"""
pytest -m "uci_logout"
"""
resp = setup_controller.logout()
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="oauth revoke", body=body)
assert resp.status_code == 204
setup_controller.login()
@pytest.mark.sdk_restapi
def test_secservice_system_endpoints(self, setup_controller):
"""
pytest -m "uci_endpoints"
look for ucentralgw and ucentralfms services for 2.1 release
"""
resp = setup_controller.request("sec", "systemEndpoints", "GET", None, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="security systemEndpoints", body=body)
if resp.status_code != 200:
assert False
services = json.loads(resp.text)
print (services)
if 'endpoints' not in services:
assert False
num_services = 0
uri_present = 0
authtype_present = 0
for item in services['endpoints']:
if item['type'] == 'ucentralgw':
num_services += 1
if item['uri'] is not None:
uri_present += 1
if item['authenticationType'] is not None:
authtype_present += 1
elif item['type'] == 'ucentralfms':
num_services += 1
if item['uri'] is not None:
uri_present += 1
if item['authenticationType'] is not None:
authtype_present += 1
assert (num_services == 2) and (uri_present == 2) and (authtype_present == 2)
@pytest.mark.sdk_restapi
def test_secservice_get_version(self, setup_controller):
"""
pytest -m "uci_endpoints"
look for ucentralgw and ucentralfms services for 2.1 release
"""
params = {'command': 'version'}
resp = setup_controller.request("sec", "system", "GET", params, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="security get version result", body=body)
if resp.status_code != 200:
assert False
system = json.loads(resp.text)
print (system)
if 'tag' not in system:
assert False
if system['tag'] != 'version':
assert False
if not system['value']:
assert False
@pytest.mark.sdk_restapi
def test_secservice_get_uptime(self, setup_controller):
"""
look for ucentralgw and ucentralfms services for 2.1 release
"""
params = {'command': 'times'}
resp = setup_controller.request("sec", "system", "GET", params, None)
body = resp.url + "," + str(resp.status_code) + ',' + resp.text
allure.attach(name="security get uptime", body=body)
if resp.status_code != 200:
assert False
system = json.loads(resp.text)
print (system)
if 'times' not in system:
assert False
valid_entities = 0
for item in system['times']:
if item['tag'] == 'uptime':
valid_entities += 1
if item['tag'] == 'start':
valid_entities += 1
if item['value'] is not None:
valid_entities += 1
assert (valid_entities == 4)