diff --git a/py-scripts/lf_hard_roam_test.py b/py-scripts/lf_hard_roam_test.py index 2b6ba262..0ab79c47 100644 --- a/py-scripts/lf_hard_roam_test.py +++ b/py-scripts/lf_hard_roam_test.py @@ -38,7 +38,8 @@ sniff_radio = importlib.import_module("py-scripts.lf_sniff_radio") sta_connect = importlib.import_module("py-scripts.sta_connect2") lf_clean = importlib.import_module("py-scripts.lf_cleanup") series = importlib.import_module("cc_module_9800_3504") - +attenuator = importlib.import_module("py-scripts.attenuator_serial") +modify = importlib.import_module("py-scripts.lf_atten_mod_test") class HardRoam(Realm): @@ -79,7 +80,8 @@ class HardRoam(Realm): timeout="10", identity=None, ttls_pass=None, - debug=False + debug=False, + soft_roam=False ): super().__init__(lanforge_ip, lanforge_port) @@ -137,6 +139,7 @@ class HardRoam(Realm): self.ttls_pass = ttls_pass self.debug = debug self.mac_data = None + self.soft_roam = soft_roam # start debugger of controller def start_debug_(self, mac_list): @@ -479,6 +482,19 @@ class HardRoam(Realm): print("query", query_reasso_response) return query_reasso_response + # get attenuator serial number + def attenuator_serial(self): + obj = attenuator.AttenuatorSerial( + lfclient_host=self.lanforge_ip, + lfclient_port=self.lanforge_port + ) + val = obj.show() + return val + + def attenuator_modify(self, serno, idx, val): + atten_obj = modify.CreateAttenuator(self.lanforge_ip, self.lanforge_port, serno, idx, val) + atten_obj.build() + # main roam fun def run(self, file_n=None): kernel_log = [] @@ -493,7 +509,23 @@ class HardRoam(Realm): self.precleanup() message = None, None + # if soft roam is selected set attenuator to zero initiallly + if self.soft_roam: + print("make both attenuators set to zero attenuation at the begining") + ser_no = self.attenuator_serial() + print(ser_no[0], ser_no[1]) + ser_1 = ser_no[0].split(".")[2] + ser_2 = ser_no[1].split(".")[2] + # set attenuation to zero in first attenuator and high in second attenuator + self.attenuator_modify(ser_1, "all", 0) + self.attenuator_modify(ser_2, "all", 0) + + + # create clients with respect to bands + self.start_sniffer(radio_channel=int(self.channel), radio=self.sniff_radio, + test_name="roam_11r_" + str(self.option) + "start" + "_", + duration=3600) if self.band == "twog": self.local_realm.reset_port(self.twog_radios) self.create_n_clients(sta_prefix="wlan1", num_sta=self.num_sta, dut_ssid=self.ssid_name, @@ -518,6 +550,22 @@ class HardRoam(Realm): print("no response") else: val = self.wait_for_ip(sta_list) + for sta_name in sta_list: + sta = sta_name.split(".")[2] + # wpa_cmd = "roam " + str(checker2) + + bgscan = { + "shelf": 1, + "resource": 1, + "port": str(sta), + "type": 'NA', + "text": 'bgscan="simple:30:-65:300"' + } + + print(bgscan) + self.local_realm.json_post("/cli-json/set_wifi_custom", bgscan) + time.sleep(2) + mac_list = [] for sta_name in sta_list: sta = sta_name.split(".")[2] @@ -527,6 +575,23 @@ class HardRoam(Realm): print("mac address list of all stations", mac_list) self.mac_data = mac_list + # if self.debug: + # print("start debug") + # self.start_debug_(mac_list=mac_list) + # print("check for 30 min") + # time.sleep(1800) + # print("stop sniff") + # file_name_ = self.stop_sniffer() + # file_name = "./pcap/" + str(file_name_) + # print("pcap file name", file_name) + # if self.debug: + # print("stop debugger") + # self.stop_debug_(mac_list=mac_list) + # # time.sleep(40) + # exit() + + + # if all stations got ip check mac address if val: print("all stations got ip") @@ -537,7 +602,6 @@ class HardRoam(Realm): check = [] for sta_name in sta_list: sta = sta_name.split(".")[2] - time.sleep(5) bssid = self.station_data_query(station_name=str(sta), query="ap") print(bssid) check.append(bssid) @@ -548,18 +612,35 @@ class HardRoam(Realm): result = all(element == check[0] for element in check) # if yes if result: - self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, - side_b_max_rate=0, - sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, - side_b_min_pdu=1250) + pass + # self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, + # side_b_max_rate=0, + # sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, + # side_b_min_pdu=1250) else: # if bssid's are not same move try to move all clients to one ap print("try to move all clients to one ap before roam ") - for sta_name in sta_list: + count1 = check.count(self.c1_bssid.upper()) + count2 = check.count(self.c2_bssid.upper()) + checker, new_sta_list, checker2 = None, [], None + if count1 > count2: + print("station connected mostly to ap1") + checker = self.c2_bssid.upper() + checker2 = self.c1_bssid.upper() + else: + checker = self.c1_bssid.upper() + checker2 = self.c2_bssid.upper() + index_count = [i for i, x in enumerate(check) if x == checker] + print(index_count) + for i in index_count: + new_sta_list.append(sta_list[i]) + print("new_sta_list", new_sta_list) + + for sta_name in new_sta_list: sta = sta_name.split(".")[2] print(sta) - wpa_cmd = "roam " + str(check[0]) + wpa_cmd = "roam " + str(checker2) wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, @@ -576,10 +657,10 @@ class HardRoam(Realm): self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) time.sleep(2) self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) - self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, - side_b_max_rate=0, - sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, - side_b_min_pdu=1250) + # self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000, + # side_b_max_rate=0, + # sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250, + # side_b_min_pdu=1250) timeout, variable, iterable_var = None, None, None @@ -596,7 +677,7 @@ class HardRoam(Realm): # roam iteration loop starts here while variable: print("variable", variable) - iter = None + iter, number, ser_1, ser_2 = None, None, None, None if variable != -1: iter = iterable_var - variable variable = variable - 1 @@ -608,6 +689,22 @@ class HardRoam(Realm): if time.time() > timeout: break time.sleep(1) + if self.soft_roam: + if iter % 2 == 0: + print("even set c1 to lowest and c2 to highest attenuation ") + number = "even" + # get serial nummber of attenuators from lf + ser_no = self.attenuator_serial() + print(ser_no[0]) + ser_1 = ser_no[0].split(".")[2] + ser_2 = ser_no[1].split(".")[2] + + # set attenuation to zero in first attenuator and high in second attenuator + self.attenuator_modify(ser_1, "all", 950) + self.attenuator_modify(ser_2, "all", 100) + else: + print("odd, c1 is already at highest and c2 is at lowest") + number = "odd" try: # define ro list per iteration row_list = [] @@ -628,7 +725,6 @@ class HardRoam(Realm): bssid_list = [] for sta_name in sta_list: sta = sta_name.split(".")[2] - time.sleep(5) bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list.append(bssid) print(bssid_list) @@ -646,13 +742,34 @@ class HardRoam(Realm): # for a iteration if client is not connected to same ap try to connect them print("giving a try to connect") print("move all clients to one ap") - for sta_name in sta_list: + count3 = bssid_list.count(self.c1_bssid.upper()) + count4 = bssid_list.count(self.c2_bssid.upper()) + print("count3", count3) + print("count4", count4) + checker, new_sta_list, checker2 = None, [], None + if count3 > count4: + print("station connected mostly to ap1") + checker = self.c2_bssid.upper() + checker2 = self.c1_bssid.upper() + else: + checker = self.c1_bssid.upper() + checker2 = self.c2_bssid.upper() + index_count = [i for i, x in enumerate(bssid_list) if x == checker] + print(index_count) + for i in index_count: + new_sta_list.append(sta_list[i]) + print("new_sta_list", new_sta_list) + # for i, x in zip(bssid_list, sta_list): + # if i == checker: + # index_count = bssid_list.index(checker) + # new_sta_list.append(sta_list[index_count]) + # print("new_sta_list", new_sta_list) + + for sta_name in new_sta_list: sta = sta_name.split(".")[2] print(sta) - if iter == 0: - wpa_cmd = "roam " + str(bssid_list[0]) - else: - wpa_cmd = "roam " + str(post_bssid) + wpa_cmd = "roam " + str(checker2) + wifi_cli_cmd_data1 = { "shelf": 1, "resource": 1, @@ -673,10 +790,7 @@ class HardRoam(Realm): # check bssid before before_bssid = [] for sta_name in sta_list: - # local_row_list = [0, "68"] - # local_row_list = [str(iter)] sta = sta_name.split(".")[2] - time.sleep(5) before_bss = self.station_data_query(station_name=str(sta), query="ap") print(before_bss) before_bssid.append(before_bss) @@ -714,63 +828,124 @@ class HardRoam(Realm): test_name="roam_11r_" + str(self.option) + "_iteration_" + str( iter) + "_", duration=3600) + if self.soft_roam: + ser_num = None + ser_num2 = None + if number == "even": + ser_num = ser_1 + ser_num2 = ser_2 + elif number == "odd": + ser_num = ser_2 + ser_num2 = ser_1 + # logic to decrease c2 attenuation till 10 db + status = None + for atten_val2 in range(900, 100, -10): + self.attenuator_modify(int(ser_num), "all", atten_val2) + print("wait for 30 secs") + time.sleep(30) + # query bssid's of all stations + bssid_check = [] + for sta_name in sta_list: + sta = sta_name.split(".")[2] + bssid = self.station_data_query(station_name=str(sta), query="ap") + bssid_check.append(bssid) + print(bssid_check) + # check if all are equal + resulta = all(element == bssid_check[0] for element in bssid_check) + if resulta: + station_after = bssid_check[0].lower() + if station_after == station_before: + status = "station did not roamed" + print("station did not roamed") + continue + elif station_after != station_before: + print("client performed roam") + break + + if status == "station did not roamed": + # set c1 to high + for atten_val1 in (range(150, 950, 10)): + print(atten_val1) + self.attenuator_modify(int(ser_num2), "all", atten_val1) + print("wait for 30 secs") + time.sleep(30) + bssid_check2 = [] + for sta_name in sta_list: + sta = sta_name.split(".")[2] + bssid = self.station_data_query(station_name=str(sta), + query="ap") + bssid_check2.append(bssid) + print(bssid_check2) + # check if all are equal + result = all(element == bssid_check2[0] for element in bssid_check2) + if result: + station_after = bssid_check2[0].lower() + if station_after == station_before: + status = "station did not roamed" + print("station did not roamed") + continue + else: + print('station roamed') + break - if station_before == self.final_bssid[0]: - print("connected stations bssid is same to bssid list first element") - for sta_name in sta_list: - sta = sta_name.split(".")[2] - print(sta) - wpa_cmd = "" - if self.option == "ota": - wpa_cmd = "roam " + str(self.final_bssid[1]) - if self.option == "otds": - wpa_cmd = "ft_ds " + str(self.final_bssid[1]) - # wpa_cmd = "roam " + str(self.final_bssid[1]) - wifi_cli_cmd_data1 = { - "shelf": 1, - "resource": 1, - "port": str(sta), - "wpa_cli_cmd": 'scan trigger freq 5180 5300' - } - wifi_cli_cmd_data = { - "shelf": 1, - "resource": 1, - "port": str(sta), - "wpa_cli_cmd": wpa_cmd - } - print(wifi_cli_cmd_data) - self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) - time.sleep(2) - self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) - time.sleep(2) else: - print("connected stations bssid is same to bssid list second element") - for sta_name in sta_list: - sta = sta_name.split(".")[2] - wifi_cmd = "" - if self.option == "ota": - wifi_cmd = "roam " + str(self.final_bssid[0]) - if self.option == "otds": - wifi_cmd = "ft_ds " + str(self.final_bssid[0]) - print(sta) - wifi_cli_cmd_data1 = { - "shelf": 1, - "resource": 1, - "port": str(sta), - "wpa_cli_cmd": 'scan trigger freq 5180 5300' - } - wifi_cli_cmd_data = { - "shelf": 1, - "resource": 1, - "port": str(sta), - "wpa_cli_cmd": wifi_cmd - } - print(wifi_cli_cmd_data) - self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) - time.sleep(2) - self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) - time.sleep(2) + if station_before == self.final_bssid[0]: + print("connected stations bssid is same to bssid list first element") + for sta_name in sta_list: + sta = sta_name.split(".")[2] + print(sta) + wpa_cmd = "" + if self.option == "ota": + wpa_cmd = "roam " + str(self.final_bssid[1]) + if self.option == "otds": + wpa_cmd = "ft_ds " + str(self.final_bssid[1]) + # wpa_cmd = "roam " + str(self.final_bssid[1]) + wifi_cli_cmd_data1 = { + "shelf": 1, + "resource": 1, + "port": str(sta), + "wpa_cli_cmd": 'scan trigger freq 5180 5300' + } + wifi_cli_cmd_data = { + "shelf": 1, + "resource": 1, + "port": str(sta), + "wpa_cli_cmd": wpa_cmd + } + print(wifi_cli_cmd_data) + self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) + time.sleep(2) + self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) + time.sleep(2) + + else: + print("connected stations bssid is same to bssid list second element") + for sta_name in sta_list: + sta = sta_name.split(".")[2] + wifi_cmd = "" + if self.option == "ota": + wifi_cmd = "roam " + str(self.final_bssid[0]) + if self.option == "otds": + wifi_cmd = "ft_ds " + str(self.final_bssid[0]) + print(sta) + wifi_cli_cmd_data1 = { + "shelf": 1, + "resource": 1, + "port": str(sta), + "wpa_cli_cmd": 'scan trigger freq 5180 5300' + } + wifi_cli_cmd_data = { + "shelf": 1, + "resource": 1, + "port": str(sta), + "wpa_cli_cmd": wifi_cmd + } + print(wifi_cli_cmd_data) + self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data1) + time.sleep(2) + self.local_realm.json_post("/cli-json/wifi_cli_cmd", wifi_cli_cmd_data) + time.sleep(2) # krnel logs kernel = self.journal_ctl_logs(file=str(iter)) @@ -793,7 +968,6 @@ class HardRoam(Realm): bssid_list_1 = [] for sta_name in sta_list: sta = sta_name.split(".")[2] - time.sleep(5) bssid = self.station_data_query(station_name=str(sta), query="ap") bssid_list_1.append(bssid) print(bssid_list_1) @@ -979,12 +1153,14 @@ class HardRoam(Realm): duration=3600) print("stop sniff") self.stop_sniffer() + kernel = self.journal_ctl_logs(file=str(iter)) + for i in kernel: + kernel_log.append(i) bssid_list2 = [] for sta_name in sta_list: # local_row_list = [0, "68"] local_row_list = [str(iter)] sta = sta_name.split(".")[2] - time.sleep(5) before_bssid_ = self.station_data_query(station_name=str(sta), query="ap") print(before_bssid_) bssid_list2.append(before_bssid_) @@ -1034,7 +1210,7 @@ class HardRoam(Realm): if time.time() > timeout: break except Exception as e: - print(e) + # print(e) pass else: