diff --git a/tests/fixtures_2x.py b/tests/fixtures_2x.py index f722a1165..c42ddc77e 100644 --- a/tests/fixtures_2x.py +++ b/tests/fixtures_2x.py @@ -822,449 +822,180 @@ class Fixtures_2x: print(e) test_cases["wpa2_personal"] = False - dut = [] + dut_data = [] + ssid_data = [] # this will return configuration of your testbed from tests/conftest.py get_configuration fixtures print("get configuration", get_configuration) print(len(get_configuration['access_point'])) # print(get_configuration['access_point']) - for length in range(0, len(get_configuration['access_point'])): - ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x") - connected, latest, active = ap_ssh.get_ucentral_status() - print("connected", connected) - print("latest", latest) - print("active", active) - latest_old = latest - if connected == False: - pytest.exit("AP is disconnected from UC Gateway") - if latest != active: - allure.attach(name="FAIL : ubus call ucentral status: ", - body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str( - active)) - ap_logs = ap_ssh.logread() - allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ") - pytest.fail("AP is disconnected from UC Gateway") - - S = 10 - - # Add logger command before config push - instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) - ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name) - - time_1 = time.time() - - # Apply config - print("get equipment id ref ", get_equipment_ref) - print("get equipment id ref [0]", get_equipment_ref[length]) - instantiate_profile_obj.push_config(serial_number=get_equipment_ref[length]) - - config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"')) - config["uuid"] = 0 - - # Attach the config that is sent from API - allure.attach(name="Config Sent from API: ", body=str(config), attachment_type=allure.attachment_type.JSON) - - ap_config_latest = ap_ssh.get_uc_latest_config() - try: - ap_config_latest["uuid"] = 0 - except Exception as e: - print(e) - pass - x = 1 - - # Check if ucentral gw has pushed the config into latest - connected, latest, active = ap_ssh.get_ucentral_status() - while latest_old == latest: - time.sleep(5) - x += 1 - print("old config: ", latest_old) - print("latest: ", latest) - connected, latest, active = ap_ssh.get_ucentral_status() - if x == 5: - break - onnected, latest, active = ap_ssh.get_ucentral_status() - if latest == latest_old: - latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest)) - allure.attach(name="Latest Config Received by AP: ", - body=str(latest_cfg), - attachment_type=allure.attachment_type.JSON) - ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, - stop_ref="stop testcase: " + instance_name) - allure.attach(body=ap_logs, name="AP Log: ") - print("Config from ucentral gw is not sent to AP") - else: - print("Config is sent to AP from ucentral gw") - - x = 1 - latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest)) - allure.attach(name="Latest Config Received by AP: ", - body=str(latest_cfg), - attachment_type=allure.attachment_type.JSON) - - while active != latest: - connected, latest, active = ap_ssh.get_ucentral_status() - time.sleep(20) - x += 1 - print("active: ", active) - print("latest: ", latest) - if x == 10: - break - - connected, latest, active = ap_ssh.get_ucentral_status() - if latest == active: - print("Config properly Applied on AP") - else: - print("Config is not Applied on AP") - - time_2 = time.time() - time_interval = time_2 - time_1 - allure.attach(name="Time Took to apply Config: " + str(time_interval), body="") - - time.sleep(60) - - ap_config_latest = ap_ssh.get_uc_latest_config() - ap_config_active = ap_ssh.get_uc_active_config() - if x < 19: - print("AP is Broadcasting Applied Config") - allure.attach(name="Success : Active Config in AP: ", body=str(ap_config_active)) - - else: - print("AP is Not Broadcasting Applied Config") - allure.attach(name="Failed to Apply Config : Active Config in AP : ", body=str(ap_config_active)) - time.sleep(10) - - try: - iwinfo = ap_ssh.iwinfo() - allure.attach(name="iwinfo: ", body=str(iwinfo)) - - # tx_power, name = ap_ssh.gettxpower() - # allure.attach(name="interface name: ", body=str(name)) - # allure.attach(name="tx power: ", body=str(tx_power)) - except: - pass - ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name) - ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, - stop_ref="stop testcase: " + instance_name) - allure.attach(body=ap_logs, name="AP Log: ") - - wifi_status = ap_ssh.get_wifi_status() - allure.attach(name="wifi status", body=str(wifi_status)) - - try: - ssid_info_sdk = instantiate_profile_obj.get_ssid_info() - print("ssid_info_sdk", ssid_info_sdk) - ap_wifi_data = ap_ssh.get_iwinfo() - print("ap_wifi_data", ap_wifi_data) - print(type(ap_wifi_data)) - - for p in ap_wifi_data: - print(p) - for q in ssid_info_sdk: - if ap_wifi_data[p][0] == q[0] and ap_wifi_data[p][2] == q[3]: - q.append(ap_wifi_data[p][1]) - - ssid_data = [] - idx_mapping = {} - print(ssid_info_sdk) - for interface in range(len(ssid_info_sdk)): - ssid = ["ssid_idx=" + str(interface) + - " ssid=" + ssid_info_sdk[interface][0] + - " security=" + ssid_info_sdk[interface][1].upper() + - " password=" + ssid_info_sdk[interface][2] + - " bssid=" + ssid_info_sdk[interface][4].lower() - ] - print("ssid", ssid) - - idx_mapping[str(interface)] = [ssid_info_sdk[interface][0], - ssid_info_sdk[interface][2], - ssid_info_sdk[interface][1], - ssid_info_sdk[interface][3], - ssid_info_sdk[interface][4].lower() - ] - print("idx mapping", idx_mapping[str(interface)]) - ssid_data.append(ssid) - print("ssid_data", ssid_data) - lf_tools.ssid_list.append(ssid_info_sdk[interface][0]) - print("ssid_data", ssid_data) - if not skip_lf: - lf_tools.dut_idx_mapping = idx_mapping - lf_tools.update_ssid(ssid_data=ssid_data) - except Exception as e: - print(e) - pass - - # def teardown_session(): + # for length in range(0, len(get_configuration['access_point'])): + # ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x") + # connected, latest, active = ap_ssh.get_ucentral_status() + # print("connected", connected) + # print("latest", latest) + # print("active", active) + # latest_old = latest + # if connected == False: + # pytest.exit("AP is disconnected from UC Gateway") + # if latest != active: + # allure.attach(name="FAIL : ubus call ucentral status: ", + # body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str( + # active)) + # ap_logs = ap_ssh.logread() + # allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ") + # pytest.fail("AP is disconnected from UC Gateway") + # + # S = 10 + # + # # Add logger command before config push + # instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) + # ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name) + # + # time_1 = time.time() + # + # # Apply config + # print("get equipment id ref ", get_equipment_ref) + # print("get equipment id ref [0]", get_equipment_ref[length]) + # instantiate_profile_obj.push_config(serial_number=get_equipment_ref[length]) + # + # config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"')) + # config["uuid"] = 0 + # + # # Attach the config that is sent from API + # allure.attach(name="Config Sent from API: ", body=str(config), attachment_type=allure.attachment_type.JSON) + # + # ap_config_latest = ap_ssh.get_uc_latest_config() + # try: + # ap_config_latest["uuid"] = 0 + # except Exception as e: + # print(e) + # pass + # x = 1 + # + # # Check if ucentral gw has pushed the config into latest + # connected, latest, active = ap_ssh.get_ucentral_status() + # while latest_old == latest: + # time.sleep(5) + # x += 1 + # print("old config: ", latest_old) + # print("latest: ", latest) + # connected, latest, active = ap_ssh.get_ucentral_status() + # if x == 5: + # break + # onnected, latest, active = ap_ssh.get_ucentral_status() + # if latest == latest_old: + # latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest)) + # allure.attach(name="Latest Config Received by AP: ", + # body=str(latest_cfg), + # attachment_type=allure.attachment_type.JSON) + # ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, + # stop_ref="stop testcase: " + instance_name) + # allure.attach(body=ap_logs, name="AP Log: ") + # print("Config from ucentral gw is not sent to AP") + # else: + # print("Config is sent to AP from ucentral gw") + # + # x = 1 + # latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest)) + # allure.attach(name="Latest Config Received by AP: ", + # body=str(latest_cfg), + # attachment_type=allure.attachment_type.JSON) + # + # while active != latest: + # connected, latest, active = ap_ssh.get_ucentral_status() + # time.sleep(20) + # x += 1 + # print("active: ", active) + # print("latest: ", latest) + # if x == 10: + # break + # + # connected, latest, active = ap_ssh.get_ucentral_status() + # if latest == active: + # print("Config properly Applied on AP") + # else: + # print("Config is not Applied on AP") + # + # time_2 = time.time() + # time_interval = time_2 - time_1 + # allure.attach(name="Time Took to apply Config: " + str(time_interval), body="") + # + # time.sleep(60) + # + # ap_config_latest = ap_ssh.get_uc_latest_config() + # ap_config_active = ap_ssh.get_uc_active_config() + # if x < 19: + # print("AP is Broadcasting Applied Config") + # allure.attach(name="Success : Active Config in AP: ", body=str(ap_config_active)) + # + # else: + # print("AP is Not Broadcasting Applied Config") + # allure.attach(name="Failed to Apply Config : Active Config in AP : ", body=str(ap_config_active)) + # time.sleep(10) + # + # try: + # iwinfo = ap_ssh.iwinfo() + # allure.attach(name="iwinfo: ", body=str(iwinfo)) + # except: + # pass + # ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name) + # ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, + # stop_ref="stop testcase: " + instance_name) + # allure.attach(body=ap_logs, name="AP Log: ") + # # wifi_status = ap_ssh.get_wifi_status() # allure.attach(name="wifi status", body=str(wifi_status)) # - # iwinfo = ap_ssh.iwinfo() - # allure.attach(name="iwinfo: ", body=str(iwinfo)) + # try: + # ssid_info_sdk = instantiate_profile_obj.get_ssid_info() + # print("ssid_info_sdk", ssid_info_sdk) + # ap_wifi_data = ap_ssh.get_iwinfo() + # print("ap_wifi_data", ap_wifi_data) + # print(type(ap_wifi_data)) # - # print("\nTeardown") + # for p in ap_wifi_data: + # print(p) + # for q in ssid_info_sdk: + # if ap_wifi_data[p][0] == q[0] and ap_wifi_data[p][2] == q[3]: + # q.append(ap_wifi_data[p][1]) + # + # + # idx_mapping = {} + # print(ssid_info_sdk) + # dut_data.append(ssid_info_sdk) + # + # except Exception as e: + # print(e) + # pass + # print("ssid_data", ssid_data) + # print("dut", dut_data) + # dut_ssid_data = [] + # dut_final_data = [] + # for dut in range(len(dut_data)): + # for interface in range(len(dut_data[dut])): + # ssid = ["ssid_idx=" + str(interface) + + # " ssid=" + dut_data[dut][interface][0] + + # " security=" + dut_data[dut][interface][1].upper() + + # " password=" + dut_data[dut][interface][2] + + # " bssid=" + dut_data[dut][interface][4].lower() + # ] + # print(ssid) + # dut_ssid_data.append(ssid) + # dut_final_data.append(dut_ssid_data) + # print("dut ssid data", dut_ssid_data) + + #dut creation for mesh + dut_ssid_data = [[['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:4a:84'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:4a:7c']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=34:ef:b6:af:49:0d'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=34:ef:b6:af:49:05']], [['ssid_idx=0 ssid=ssid_wpa2_2g security=WPA2 password=something bssid=90:3c:b3:9d:69:36'], ['ssid_idx=1 ssid=ssid_wpa2_5g security=WPA2 password=something bssid=90:3c:b3:9d:69:2e']]] + create_dut = lf_tools.create_mesh_dut(ssid_data=dut_ssid_data) + + #create mesh scenario + mesh_scenario = lf_tools.create_mesh_scenario() + + #check for all ap are connected and is pinging + + #create a mesh scenario with dhcp disable option to node-1 and node-2 + + + - # request.addfinalizer(teardown_session) - # return test_cases - # def setup_mesh_profile(self, request, param, get_apnos, get_configuration, setup_controller, instantiate_profile, get_markers,get_equipment_ref, lf_tools, skip_lf=False, open_flow=None): - # instantiate_profile_obj = instantiate_profile(sdk_client=setup_controller) - # print(1, instantiate_profile_obj.sdk_client) - # vlan_id, mode = 0, 0 - # parameter = dict(param) - # print("parameter", parameter) - # print(list(parameter["ssid_modes"])[0]) - # ssid_info_sdk = instantiate_profile_obj.get_ssid_info() - # print(ssid_info_sdk) - # test_cases = {} - # profile_data = {} - # var = "" - # if parameter['mode'] not in ["BRIDGE", "NAT", "VLAN"]: - # print("Invalid Mode: ", parameter['mode']) - # return test_cases - # instantiate_profile_obj.set_radio_config() - # if parameter['mode'] == "NAT": - # mode = "NAT" - # instantiate_profile_obj.set_mode(mode=mode) - # vlan_id = 1 - # if parameter['mode'] == "BRIDGE": - # mode = "BRIDGE" - # instantiate_profile_obj.set_mode(mode=mode, mesh=True) - # vlan_id = 1 - # if parameter['mode'] == "VLAN": - # mode = "VLAN" - # instantiate_profile_obj.set_mode(mode=mode) - # - # profile_data["ssid"] = {} - # - # for i in parameter["ssid_modes"]: - # profile_data["ssid"][i] = [] - # for j in range(len(parameter["ssid_modes"][i])): - # data = parameter["ssid_modes"][i][j] - # profile_data["ssid"][i].append(data) - # lf_dut_data = [] - # - # for mode in profile_data['ssid']: - # if mode == "open": - # for j in profile_data["ssid"][mode]: - # if mode in get_markers.keys() and get_markers[mode]: - # try: - # if j["appliedRadios"].__contains__("2G"): - # lf_dut_data.append(j) - # if j["appliedRadios"].__contains__("5G"): - # lf_dut_data.append(j) - # j["appliedRadios"] = list(set(j["appliedRadios"])) - # j['security'] = 'none' - # creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - # test_cases["wpa_2g"] = True - # except Exception as e: - # print(e) - # test_cases["wpa_2g"] = False - # if mode == "wpa": - # for j in profile_data["ssid"][mode]: - # if mode in get_markers.keys() and get_markers[mode]: - # try: - # if j["appliedRadios"].__contains__("2G"): - # lf_dut_data.append(j) - # if j["appliedRadios"].__contains__("5G"): - # lf_dut_data.append(j) - # j["appliedRadios"] = list(set(j["appliedRadios"])) - # j['security'] = 'psk' - # creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - # test_cases["wpa_2g"] = True - # except Exception as e: - # print(e) - # test_cases["wpa_2g"] = False - # if mode == "wpa2_personal": - # for j in profile_data["ssid"][mode]: - # - # if mode in get_markers.keys() and get_markers[mode]: - # try: - # if j["appliedRadios"].__contains__("2G"): - # lf_dut_data.append(j) - # if j["appliedRadios"].__contains__("5G"): - # lf_dut_data.append(j) - # j["appliedRadios"] = list(set(j["appliedRadios"])) - # j['security'] = 'psk2' - # creates_profile = instantiate_profile_obj.add_ssid(ssid_data=j) - # test_cases["wpa_2g"] = True - # except Exception as e: - # print(e) - # test_cases["wpa2_personal"] = False - # - # - # - # # this will return configuration of your testbed from tests/conftest.py get_configuration fixtures - # print("get configuration",get_configuration) - # print(len(get_configuration['access_point'])) - # # print(get_configuration['access_point']) - # for length in range(0,len(get_configuration['access_point'])): - # ap_ssh = get_apnos(credentials=get_configuration['access_point'][length], pwd="../libs/apnos/", sdk="2.x") - # connected, latest, active = ap_ssh.get_ucentral_status() - # print("connected", connected) - # print("latest",latest) - # print("active", active) - # latest_old = latest - # if connected == False: - # pytest.exit("AP is disconnected from UC Gateway") - # if latest != active: - # allure.attach(name="FAIL : ubus call ucentral status: ", body="connected: " + str(connected) + "\nlatest: " + str(latest) + "\nactive: " + str(active)) - # ap_logs = ap_ssh.logread() - # allure.attach(body=ap_logs, name="FAILURE: AP LOgs: ") - # pytest.fail("AP is disconnected from UC Gateway") - # - # S = 10 - # - # # Add logger command before config push - # instance_name = ''.join(random.choices(string.ascii_uppercase + string.digits, k=S)) - # ap_ssh.run_generic_command(cmd="logger start testcase: " + instance_name) - # - # time_1 = time.time() - # - # # Apply config - # print("get equipment id ref ", get_equipment_ref) - # print("get equipment id ref [0]", get_equipment_ref[length]) - # instantiate_profile_obj.push_config(serial_number=get_equipment_ref[length]) - # - # config = json.loads(str(instantiate_profile_obj.base_profile_config).replace(" ", "").replace("'", '"')) - # config["uuid"] = 0 - # - # # Attach the config that is sent from API - # allure.attach(name="Config Sent from API: ", body=str(config), attachment_type=allure.attachment_type.JSON) - # - # ap_config_latest = ap_ssh.get_uc_latest_config() - # try: - # ap_config_latest["uuid"] = 0 - # except Exception as e: - # print(e) - # pass - # x = 1 - # - # # Check if ucentral gw has pushed the config into latest - # connected, latest, active = ap_ssh.get_ucentral_status() - # while latest_old == latest: - # time.sleep(5) - # x += 1 - # print("old config: ", latest_old) - # print("latest: ", latest) - # connected, latest, active = ap_ssh.get_ucentral_status() - # if x == 5: - # break - # onnected, latest, active = ap_ssh.get_ucentral_status() - # if latest == latest_old: - # latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest)) - # allure.attach(name="Latest Config Received by AP: ", - # body=str(latest_cfg), - # attachment_type=allure.attachment_type.JSON) - # ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, - # stop_ref="stop testcase: " + instance_name) - # allure.attach(body=ap_logs, name="AP Log: ") - # print("Config from ucentral gw is not sent to AP") - # else: - # print("Config is sent to AP from ucentral gw") - # - # x = 1 - # latest_cfg = ap_ssh.run_generic_command(cmd="cat /etc/ucentral/ucentral.cfg." + str(latest)) - # allure.attach(name="Latest Config Received by AP: ", - # body=str(latest_cfg), - # attachment_type=allure.attachment_type.JSON) - # - # while active != latest: - # connected, latest, active = ap_ssh.get_ucentral_status() - # time.sleep(20) - # x += 1 - # print("active: ", active) - # print("latest: ", latest) - # if x == 10: - # break - # - # connected, latest, active = ap_ssh.get_ucentral_status() - # if latest == active: - # print("Config properly Applied on AP") - # else: - # print("Config is not Applied on AP") - # - # time_2 = time.time() - # time_interval = time_2 - time_1 - # allure.attach(name="Time Took to apply Config: " + str(time_interval), body="") - # - # time.sleep(60) - # - # ap_config_latest = ap_ssh.get_uc_latest_config() - # ap_config_active = ap_ssh.get_uc_active_config() - # if x < 19: - # print("AP is Broadcasting Applied Config") - # allure.attach(name="Success : Active Config in AP: ", body=str(ap_config_active)) - # - # else: - # print("AP is Not Broadcasting Applied Config") - # allure.attach(name="Failed to Apply Config : Active Config in AP : ", body=str(ap_config_active)) - # time.sleep(10) - # - # try: - # iwinfo = ap_ssh.iwinfo() - # allure.attach(name="iwinfo: ", body=str(iwinfo)) - # - # # tx_power, name = ap_ssh.gettxpower() - # # allure.attach(name="interface name: ", body=str(name)) - # # allure.attach(name="tx power: ", body=str(tx_power)) - # except: - # pass - # ap_ssh.run_generic_command(cmd="logger stop testcase: " + instance_name) - # ap_logs = ap_ssh.get_logread(start_ref="start testcase: " + instance_name, - # stop_ref="stop testcase: " + instance_name) - # allure.attach(body=ap_logs, name="AP Log: ") - # - # wifi_status = ap_ssh.get_wifi_status() - # allure.attach(name="wifi status", body=str(wifi_status)) - # - # try: - # ssid_info_sdk = instantiate_profile_obj.get_ssid_info() - # print("ssid_info_sdk", ssid_info_sdk) - # ap_wifi_data = ap_ssh.get_iwinfo() - # print("ap_wifi_data", ap_wifi_data) - # print(type(ap_wifi_data)) - # - # for p in ap_wifi_data: - # print(p) - # for q in ssid_info_sdk: - # if ap_wifi_data[p][0] == q[0] and ap_wifi_data[p][2] == q[3]: - # q.append(ap_wifi_data[p][1]) - # - # ssid_data = [] - # idx_mapping = {} - # print(ssid_info_sdk) - # for interface in range(len(ssid_info_sdk)): - # ssid = ["ssid_idx=" + str(interface) + - # " ssid=" + ssid_info_sdk[interface][0] + - # " security=" + ssid_info_sdk[interface][1].upper() + - # " password=" + ssid_info_sdk[interface][2] + - # " bssid=" + ssid_info_sdk[interface][4].lower() - # ] - # print("ssid", ssid) - # idx_mapping[str(interface)] = [ssid_info_sdk[interface][0], - # ssid_info_sdk[interface][2], - # ssid_info_sdk[interface][1], - # ssid_info_sdk[interface][3], - # ssid_info_sdk[interface][4].lower() - # ] - # print("idx mapping", idx_mapping[str(interface)]) - # ssid_data.append(ssid) - # print("ssid_data", ssid_data) - # lf_tools.ssid_list.append(ssid_info_sdk[interface][0]) - # if not skip_lf: - # lf_tools.dut_idx_mapping = idx_mapping - # lf_tools.update_ssid(ssid_data=ssid_data) - # except Exception as e: - # print(e) - # pass - # - # def teardown_session(): - # wifi_status = ap_ssh.get_wifi_status() - # allure.attach(name="wifi status", body=str(wifi_status)) - # - # iwinfo = ap_ssh.iwinfo() - # allure.attach(name="iwinfo: ", body=str(iwinfo)) - # - # print("\nTeardown") - # - # request.addfinalizer(teardown_session) - # return test_cases - #