mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-02 03:48:09 +00:00
2.x updates
setup_profiles for 2.x added functionality - bssid band ssid mapping for lf dut get_vif_state fixture adjustments for 2.x (posibbly changing the naming convention in next commit) lf_tests synced up with master branch lf_tools synced up with master added a ssid_list in lf_tools, it will be updated and used on a class level for verification of ssid availability in each test case Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
@@ -23,6 +23,7 @@ class APNOS:
|
||||
allure.attach(name="APNOS LIbrary: ", body=str(credentials))
|
||||
self.serial = credentials['serial']
|
||||
self.owrt_args = "--prompt root@OpenAp -s serial --log stdout --user root --passwd openwifi"
|
||||
self.sdk = sdk
|
||||
if sdk == "2.x":
|
||||
self.owrt_args = "--prompt root@" + self.serial + " -s serial --log stdout --user root --passwd openwifi"
|
||||
if credentials is None:
|
||||
@@ -85,18 +86,39 @@ class APNOS:
|
||||
return output
|
||||
|
||||
# Method to get the iwinfo status of AP using AP-CLI/ JUMPHOST-CLI
|
||||
def iwinfo_status(self):
|
||||
# Method to get the iwinfo status of AP using AP-CLI/ JUMPHOST-CLI
|
||||
def get_bssid_band_mapping(self):
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = 'iwinfo'
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
data = stdout.read()
|
||||
client.close()
|
||||
allure.attach(name="iwinfo Output Msg: ", body=str(output))
|
||||
allure.attach(name="iwinfo Output Msg: ", body=str(data))
|
||||
allure.attach(name="iwinfo config Err Msg: ", body=str(stderr))
|
||||
return output
|
||||
data = str(data).replace(" ", "").split("\\r\\n")
|
||||
band_info = []
|
||||
for i in data:
|
||||
tmp = []
|
||||
if i.__contains__("AccessPoint"):
|
||||
bssid = i.replace("AccessPoint:", "")
|
||||
tmp.append(bssid.casefold())
|
||||
elif i.__contains__("MasterChannel"):
|
||||
if i.split(":")[2].__contains__("2.4"):
|
||||
tmp.append("2G")
|
||||
else:
|
||||
tmp.append("5G")
|
||||
else:
|
||||
tmp = []
|
||||
if tmp != []:
|
||||
band_info.append(tmp)
|
||||
bssi_band_mapping = {}
|
||||
for i in range(len(band_info)):
|
||||
if (i % 2) == 0:
|
||||
bssi_band_mapping[band_info[i][0]] = band_info[i + 1][0]
|
||||
return bssi_band_mapping
|
||||
|
||||
# Method to get the vif_config of AP using AP-CLI/ JUMPHOST-CLI
|
||||
def get_vif_config(self):
|
||||
@@ -365,63 +387,86 @@ class APNOS:
|
||||
print(e)
|
||||
return json_output
|
||||
|
||||
def logread(self):
|
||||
def get_interface_details(self):
|
||||
r = self.get_wifi_status()
|
||||
print(r)
|
||||
wifi_info = {}
|
||||
if self.sdk == "1.x":
|
||||
for i in r:
|
||||
for j in r[i]["interfaces"]:
|
||||
encryption = j["config"]["encryption"]
|
||||
if encryption == "psk" or encryption == "psk2" or encryption == "psk-mixed" or \
|
||||
encryption == "sae" or encryption == "sae-mixed":
|
||||
wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], j["config"]["key"]]
|
||||
else:
|
||||
wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], ""]
|
||||
print(wifi_info)
|
||||
data = self.get_iwinfo()
|
||||
for i in wifi_info.keys():
|
||||
wifi_info[i].append(data[i])
|
||||
|
||||
return wifi_info
|
||||
if self.sdk == "2.x":
|
||||
for i in r:
|
||||
for j in r[i]["interfaces"]:
|
||||
encryption = j["config"]["encryption"]
|
||||
if encryption == "psk" or encryption == "psk2" or encryption == "psk-mixed" or \
|
||||
encryption == "sae" or encryption == "sae-mixed":
|
||||
wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], j["config"]["key"]]
|
||||
else:
|
||||
wifi_info[j["ifname"]] = [j["config"]["ssid"], j["config"]["encryption"], ""]
|
||||
data = self.get_iwinfo()
|
||||
for i in wifi_info.keys():
|
||||
wifi_info[i].append(data[i])
|
||||
return wifi_info
|
||||
|
||||
def get_wifi_status(self):
|
||||
try:
|
||||
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "logread"
|
||||
cmd = "wifi status"
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
status = output.decode('utf-8').splitlines()
|
||||
logread = status
|
||||
logs = ""
|
||||
for i in logread:
|
||||
logs = logs + i + "\n"
|
||||
|
||||
output = stdout.read().decode('utf-8')
|
||||
data = output.split()
|
||||
data.pop(0)
|
||||
data.pop(0)
|
||||
data.pop(0)
|
||||
OUT = "".join(data)
|
||||
json_output = json.loads(OUT)
|
||||
client.close()
|
||||
except Exception as e:
|
||||
json_output = False
|
||||
print(e)
|
||||
logs = ""
|
||||
return logs
|
||||
return json_output
|
||||
|
||||
def get_vifc(self):
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "vifC"
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
client.close()
|
||||
allure.attach(name="vif state Output Msg: ", body=str(output))
|
||||
allure.attach(name="vif state Err Msg: ", body=str(stderr))
|
||||
return output
|
||||
def get_iwinfo(self):
|
||||
try:
|
||||
|
||||
def get_vifs(self):
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "vifS"
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read()
|
||||
client.close()
|
||||
allure.attach(name="vif state Output Msg: ", body=str(output))
|
||||
allure.attach(name="vif state Err Msg: ", body=str(stderr))
|
||||
return output
|
||||
|
||||
def get_vlan(self):
|
||||
stdout = self.get_vifs()
|
||||
vlan_list = []
|
||||
for i in stdout.splitlines():
|
||||
vlan = str(i.strip()).replace("|", ".").split(".")
|
||||
try:
|
||||
if not vlan[0].find("b'vlan_id"):
|
||||
vlan_list.append(vlan[1].strip())
|
||||
except:
|
||||
pass
|
||||
return vlan_list
|
||||
client = self.ssh_cli_connect()
|
||||
cmd = "iwinfo"
|
||||
if self.mode:
|
||||
cmd = f"cd ~/cicd-git/ && ./openwrt_ctl.py {self.owrt_args} -t {self.tty} --action " \
|
||||
f"cmd --value \"{cmd}\" "
|
||||
stdin, stdout, stderr = client.exec_command(cmd)
|
||||
output = stdout.read().replace(b":~# iwinfo", b"").decode('utf-8')
|
||||
o = output.split()
|
||||
iwinfo_bssid_data = {}
|
||||
for i in range(len(o)):
|
||||
if o[i].__contains__("ESSID"):
|
||||
if o[i + 9].__contains__("2.4"):
|
||||
band = "2G"
|
||||
else:
|
||||
band = "5G"
|
||||
iwinfo_bssid_data[o[i - 1]] = [o[i + 4], band]
|
||||
client.close()
|
||||
except Exception as e:
|
||||
iwinfo_bssid_data = False
|
||||
print(e)
|
||||
return iwinfo_bssid_data
|
||||
|
||||
def logread(self):
|
||||
try:
|
||||
@@ -486,15 +531,15 @@ if __name__ == '__main__':
|
||||
obj = {
|
||||
'model': 'eap102',
|
||||
'mode': 'wifi6',
|
||||
'serial': '903cb39d6918',
|
||||
'serial': '0000c1018812',
|
||||
'jumphost': True,
|
||||
'ip': "10.28.3.103", # 10.28.3.103
|
||||
'ip': "10.28.3.100", # 10.28.3.103
|
||||
'username': "lanforge",
|
||||
'password': "pumpkin77",
|
||||
'port': 22, # 22
|
||||
'jumphost_tty': '/dev/ttyAP2',
|
||||
'jumphost_tty': '/dev/ttyAP1',
|
||||
'version': "https://tip.jfrog.io/artifactory/tip-wlan-ap-firmware/uCentral/edgecore_eap102/20210625-edgecore_eap102-uCentral-trunk-4225122-upgrade.bin"
|
||||
}
|
||||
var = APNOS(credentials=obj, sdk="2.x")
|
||||
r = var.get_uc_latest_config()
|
||||
print(r)
|
||||
var = APNOS(credentials=obj, sdk="1.x")
|
||||
x = var.get_interface_details()
|
||||
print(x)
|
||||
Reference in New Issue
Block a user