mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-01 19:37:54 +00:00
AP Version pull from configuration file, openwrt_ctl.py auto add to jumphost, firmware upgrade fixes
This commit is contained in:
@@ -11,11 +11,12 @@ Currently Having Methods:
|
||||
"""
|
||||
|
||||
import paramiko
|
||||
|
||||
from scp import SCPClient
|
||||
import os
|
||||
|
||||
class APNOS:
|
||||
|
||||
def __init__(self, credentials=None):
|
||||
def __init__(self, credentials=None, pwd=os.getcwd()):
|
||||
self.owrt_args = "--prompt root@OpenAp -s serial --log stdout --user root --passwd openwifi"
|
||||
if credentials is None:
|
||||
print("No credentials Given")
|
||||
@@ -27,6 +28,25 @@ class APNOS:
|
||||
self.mode = credentials['jumphost'] # 1 for jumphost, 0 for direct ssh
|
||||
if self.mode:
|
||||
self.tty = credentials['jumphost_tty'] # /dev/ttyAP1
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = '[ -f ~/cicd-git/ ] && echo "True" || echo "False"'
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
if str(stdout.read()).__contains__("False"):
|
||||
cmd = 'mkdir ~/cicd-git/'
|
||||
client.exec_command(cmd)
|
||||
cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
if str(stdout.read()).__contains__("False"):
|
||||
print("Copying openwrt_ctl serial control Script...")
|
||||
with SCPClient(client.get_transport()) as scp:
|
||||
scp.put(pwd+'openwrt_ctl.py', '~/cicd-git/openwrt_ctl.py') # Copy my_file.txt to the server
|
||||
cmd = '[ -f ~/cicd-git/openwrt_ctl.py ] && echo "True" || echo "False"'
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
var = str(stdout.read())
|
||||
if var.__contains__("True"):
|
||||
print("APNOS Serial Setup OK")
|
||||
else:
|
||||
print("APNOS Serial Setup Fail")
|
||||
|
||||
# Method to connect AP-CLI/ JUMPHOST-CLI
|
||||
def ssh_cli_connect(self):
|
||||
@@ -44,7 +64,7 @@ class APNOS:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = 'iwinfo'
|
||||
if self.mode:
|
||||
cmd = f"cd /home/lanforge/lanforge-scripts/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
@@ -56,7 +76,7 @@ class APNOS:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "/usr/opensync/bin/ovsh s Wifi_VIF_Config -c"
|
||||
if self.mode:
|
||||
cmd = f"cd /home/lanforge/lanforge-scripts/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
@@ -68,7 +88,7 @@ class APNOS:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "/usr/opensync/bin/ovsh s Wifi_VIF_State -c"
|
||||
if self.mode:
|
||||
cmd = f"cd /home/lanforge/lanforge-scripts/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
@@ -101,7 +121,7 @@ class APNOS:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = '/usr/opensync/bin/ovsh s AWLAN_Node -c | grep FW_IMAGE_ACTIVE'
|
||||
if self.mode:
|
||||
cmd = f"cd /home/lanforge/lanforge-scripts/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \
|
||||
f" --action cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
@@ -121,15 +141,17 @@ class APNOS:
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = '/usr/opensync/bin/ovsh s Manager -c | grep status'
|
||||
if self.mode:
|
||||
cmd = f"cd /home/lanforge/lanforge-scripts/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty}" \
|
||||
f" --action cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
status = str(output.decode('utf-8').splitlines())
|
||||
print(output, stderr.read())
|
||||
# print(output, stderr.read())
|
||||
client.close()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
status = "Error"
|
||||
return status
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ CONFIGURATION = {
|
||||
'password': "lanforge",
|
||||
'port': 22,
|
||||
'jumphost_tty': '/dev/ttyAP1',
|
||||
'version': "version"
|
||||
'version': "ecw5410-2021-04-26-pending-3fc41fa"
|
||||
}
|
||||
],
|
||||
"traffic_generator": {
|
||||
@@ -56,7 +56,7 @@ CONFIGURATION = {
|
||||
'password': "lanforge",
|
||||
'port': 22,
|
||||
'jumphost_tty': '/dev/ttyAP1',
|
||||
'version': "version"
|
||||
'version': "ecw5410-2021-04-26-pending-3fc41fa"
|
||||
}
|
||||
],
|
||||
"traffic_generator": {
|
||||
@@ -91,9 +91,9 @@ CONFIGURATION = {
|
||||
'ip': "192.168.200.82",
|
||||
'username': "lanforge",
|
||||
'password': "lanforge",
|
||||
'port': 8809,
|
||||
'port': 22,
|
||||
'jumphost_tty': '/dev/ttyAP1',
|
||||
'version': "version"
|
||||
'version': "ecw5410-2021-04-26-pending-3fc41fa"
|
||||
}
|
||||
],
|
||||
"traffic_generator": {
|
||||
|
||||
@@ -108,6 +108,7 @@ Instantiate Objects for Test session
|
||||
def instantiate_controller(request, testbed):
|
||||
try:
|
||||
sdk_client = Controller(controller_data=CONFIGURATION[testbed]["controller"])
|
||||
|
||||
def teardown_session():
|
||||
print("\nTest session Completed")
|
||||
sdk_client.disconnect_Controller()
|
||||
@@ -131,7 +132,9 @@ def instantiate_testrail(request):
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def instantiate_firmware(instantiate_controller, instantiate_jFrog, testbed):
|
||||
firmware_client = FirmwareUtility(jfrog_credentials=instantiate_jFrog, sdk_client=instantiate_controller, model=CONFIGURATION[testbed]["access_point"][0]["model"])
|
||||
firmware_client = FirmwareUtility(jfrog_credentials=instantiate_jFrog, sdk_client=instantiate_controller,
|
||||
model=CONFIGURATION[testbed]["access_point"][0]["model"],
|
||||
version=CONFIGURATION[testbed]["access_point"][0]["version"])
|
||||
yield firmware_client
|
||||
|
||||
|
||||
@@ -170,8 +173,14 @@ def test_cases():
|
||||
yield TEST_CASES
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def instantiate_access_point(testbed):
|
||||
APNOS(CONFIGURATION[testbed]['access_point'][0], pwd="../libs/apnos/")
|
||||
yield True
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_access_point(testbed):
|
||||
def test_access_point(testbed, instantiate_access_point):
|
||||
ap_ssh = APNOS(CONFIGURATION[testbed]['access_point'][0])
|
||||
status = ap_ssh.get_manager_state()
|
||||
if "ACTIVE" not in status:
|
||||
@@ -238,7 +247,7 @@ def get_markers(request, get_security_flags):
|
||||
@pytest.fixture(scope="session")
|
||||
def get_latest_firmware(instantiate_firmware):
|
||||
# try:
|
||||
latest_firmware = instantiate_firmware.get_latest_fw_version()
|
||||
latest_firmware = instantiate_firmware.get_fw_version()
|
||||
# except:
|
||||
# latest_firmware = False
|
||||
yield latest_firmware
|
||||
|
||||
@@ -30,7 +30,7 @@ import pytest
|
||||
pytestmark = [pytest.mark.test_featureA, pytest.mark.bridge]
|
||||
|
||||
|
||||
@pytest.mark.wifi_capacity_test
|
||||
@pytest.mark.test_featureA
|
||||
@pytest.mark.wifi5
|
||||
@pytest.mark.wifi6
|
||||
@pytest.mark.parametrize(
|
||||
@@ -53,7 +53,7 @@ class TestFeatureABridge(object):
|
||||
security_key = profile_data["security_key"]
|
||||
security = "wpa"
|
||||
upstream = get_lanforge_data["lanforge_bridge_port"]
|
||||
radio = get_lanforge_data["lanforge_2g"]
|
||||
radio = get_lanforge_data["lanforge_2dot4g"]
|
||||
# Write Your test case Here
|
||||
PASS = True
|
||||
assert PASS
|
||||
@@ -84,7 +84,7 @@ class TestFeatureABridge(object):
|
||||
security_key = profile_data["security_key"]
|
||||
security = "wpa2"
|
||||
upstream = get_lanforge_data["lanforge_bridge_port"]
|
||||
radio = get_lanforge_data["lanforge_2g"]
|
||||
radio = get_lanforge_data["lanforge_2dot4g"]
|
||||
# Write Your test case Here
|
||||
PASS = True
|
||||
assert PASS
|
||||
|
||||
@@ -101,7 +101,7 @@ def upload_firmware(should_upload_firmware, instantiate_firmware, get_latest_fir
|
||||
yield firmware_id
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@pytest.fixture(scope="function")
|
||||
def upgrade_firmware(request, instantiate_firmware, get_equipment_id, check_ap_firmware_cloud, get_latest_firmware,
|
||||
should_upgrade_firmware):
|
||||
if get_latest_firmware != check_ap_firmware_cloud:
|
||||
@@ -119,7 +119,7 @@ def upgrade_firmware(request, instantiate_firmware, get_equipment_id, check_ap_f
|
||||
yield status
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@pytest.fixture(scope="function")
|
||||
def check_ap_firmware_cloud(instantiate_controller, get_equipment_id):
|
||||
yield instantiate_controller.get_ap_firmware_old_method(equipment_id=get_equipment_id)
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
import pytest
|
||||
import sys
|
||||
|
||||
pytestmark = [pytest.mark.test_connection]
|
||||
pytestmark = [pytest.mark.test_connectivity]
|
||||
|
||||
|
||||
@pytest.mark.sanity
|
||||
|
||||
Reference in New Issue
Block a user