mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-03 04:07:52 +00:00
added comments
This commit is contained in:
@@ -1,5 +1,7 @@
|
|||||||
"""
|
"""
|
||||||
Note : please do not overwrite script under progress and is used for cisco
|
Note : lf_hard_roam_test works on forced roaming specific to 11r
|
||||||
|
- This script will give results of 11r forced roam test pdf along with all the packet captures genersted after the roam
|
||||||
|
- still under progress do not overwrite
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
@@ -76,7 +78,8 @@ class HardRoam(Realm):
|
|||||||
band_cc="5g",
|
band_cc="5g",
|
||||||
timeout="10",
|
timeout="10",
|
||||||
identity=None,
|
identity=None,
|
||||||
ttls_pass=None
|
ttls_pass=None,
|
||||||
|
debug=False
|
||||||
):
|
):
|
||||||
super().__init__(lanforge_ip,
|
super().__init__(lanforge_ip,
|
||||||
lanforge_port)
|
lanforge_port)
|
||||||
@@ -132,19 +135,24 @@ class HardRoam(Realm):
|
|||||||
self.end_time = None
|
self.end_time = None
|
||||||
self.identity = identity
|
self.identity = identity
|
||||||
self.ttls_pass = ttls_pass
|
self.ttls_pass = ttls_pass
|
||||||
|
self.debug = debug
|
||||||
|
self.mac_data = None
|
||||||
|
|
||||||
|
# start debugger of controller
|
||||||
def start_debug_(self, mac_list):
|
def start_debug_(self, mac_list):
|
||||||
mac = mac_list
|
mac = mac_list
|
||||||
for i in mac:
|
for i in mac:
|
||||||
y = self.cc.debug_wireless_mac_cc(mac=str(i))
|
y = self.cc.debug_wireless_mac_cc(mac=str(i))
|
||||||
print(y)
|
print(y)
|
||||||
|
|
||||||
|
# stop debugger of controller
|
||||||
def stop_debug_(self, mac_list):
|
def stop_debug_(self, mac_list):
|
||||||
mac = mac_list
|
mac = mac_list
|
||||||
for i in mac:
|
for i in mac:
|
||||||
y = self.cc.no_debug_wireless_mac_cc(mac=str(i))
|
y = self.cc.no_debug_wireless_mac_cc(mac=str(i))
|
||||||
print(y)
|
print(y)
|
||||||
|
|
||||||
|
# get trace file names from controller
|
||||||
def get_ra_trace_file(self):
|
def get_ra_trace_file(self):
|
||||||
ra = self.cc.get_ra_trace_files__cc()
|
ra = self.cc.get_ra_trace_files__cc()
|
||||||
print(ra)
|
print(ra)
|
||||||
@@ -152,39 +160,66 @@ class HardRoam(Realm):
|
|||||||
print(ele_list)
|
print(ele_list)
|
||||||
return ele_list
|
return ele_list
|
||||||
|
|
||||||
|
# get trace file names from controller with respect to number of clients
|
||||||
def get_file_name(self, client):
|
def get_file_name(self, client):
|
||||||
file = self.get_ra_trace_file()
|
|
||||||
indices = [i for i, s in enumerate(file) if 'dir bootflash: | i ra_trace' in s]
|
|
||||||
# print(indices)
|
|
||||||
y = indices[-1]
|
|
||||||
file_name = []
|
file_name = []
|
||||||
if client == 1:
|
if not self.debug:
|
||||||
z = file[y + 1]
|
|
||||||
list_ = []
|
|
||||||
list_.append(z)
|
|
||||||
m = list_[0].split(" ")
|
|
||||||
print(m)
|
|
||||||
print(len(m))
|
|
||||||
print(m[-1])
|
|
||||||
file_name.append(m[-1])
|
|
||||||
else:
|
|
||||||
for i in range(client):
|
for i in range(client):
|
||||||
z = file[y + (int(i)+1)]
|
file_name.append("debug disabled")
|
||||||
|
else:
|
||||||
|
file = self.get_ra_trace_file()
|
||||||
|
indices = [i for i, s in enumerate(file) if 'dir bootflash: | i ra_trace' in s]
|
||||||
|
# print(indices)
|
||||||
|
y = indices[-1]
|
||||||
|
|
||||||
|
if client == 1:
|
||||||
|
z = file[y + 1]
|
||||||
list_ = []
|
list_ = []
|
||||||
list_.append(z)
|
list_.append(z)
|
||||||
m = list_[0].split(" ")
|
m = list_[0].split(" ")
|
||||||
print(m)
|
print(m)
|
||||||
print(len(m))
|
print(len(m))
|
||||||
print(m[-1])
|
print(m[-1])
|
||||||
|
if m[-1].isnumeric():
|
||||||
|
print("log file not available")
|
||||||
|
file_name.append("file not found")
|
||||||
file_name.append(m[-1])
|
file_name.append(m[-1])
|
||||||
print("file_name", file_name)
|
else:
|
||||||
file_name.reverse()
|
z = file[y + (int(0) + 1)]
|
||||||
|
list_ = []
|
||||||
|
list_.append(z)
|
||||||
|
m = list_[0].split(" ")
|
||||||
|
print(m)
|
||||||
|
print(len(m))
|
||||||
|
print(m[-1])
|
||||||
|
if m[-1].isnumeric():
|
||||||
|
print("log file not available")
|
||||||
|
for i in range(client):
|
||||||
|
file_name.append("file not found")
|
||||||
|
else:
|
||||||
|
for i in range(client):
|
||||||
|
z = file[y + (int(i)+1)]
|
||||||
|
list_ = []
|
||||||
|
list_.append(z)
|
||||||
|
m = list_[0].split(" ")
|
||||||
|
print(m)
|
||||||
|
print(len(m))
|
||||||
|
print(m[-1])
|
||||||
|
if m[-1].isnumeric():
|
||||||
|
print("log file not available")
|
||||||
|
file_name.append("file not found")
|
||||||
|
file_name.append(m[-1])
|
||||||
|
print("file_name", file_name)
|
||||||
|
file_name.reverse()
|
||||||
|
|
||||||
return file_name
|
return file_name
|
||||||
|
|
||||||
|
# delete trace file from controller
|
||||||
def delete_trace_file(self, file):
|
def delete_trace_file(self, file):
|
||||||
# file = self.get_file_name()
|
# file = self.get_file_name()
|
||||||
self.cc.del_ra_trace_file_cc(file=file)
|
self.cc.del_ra_trace_file_cc(file=file)
|
||||||
|
|
||||||
|
# get station list from lf
|
||||||
def get_station_list(self):
|
def get_station_list(self):
|
||||||
sta = self.staConnect.station_list()
|
sta = self.staConnect.station_list()
|
||||||
if sta == "no response":
|
if sta == "no response":
|
||||||
@@ -195,6 +230,7 @@ class HardRoam(Realm):
|
|||||||
sta_list.append(j)
|
sta_list.append(j)
|
||||||
return sta_list
|
return sta_list
|
||||||
|
|
||||||
|
# create n number of clients of advanced configuration on lf
|
||||||
def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None,
|
def create_n_clients(self, start_id=0, sta_prefix=None, num_sta=None, dut_ssid=None,
|
||||||
dut_security=None, dut_passwd=None, radio=None, type=None):
|
dut_security=None, dut_passwd=None, radio=None, type=None):
|
||||||
|
|
||||||
@@ -294,6 +330,7 @@ class HardRoam(Realm):
|
|||||||
print("Stations failed to get IPs")
|
print("Stations failed to get IPs")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# create layer3 traffic on clients
|
||||||
def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, side_a_min_pdu,
|
def create_layer3(self, side_a_min_rate, side_a_max_rate, side_b_min_rate, side_b_max_rate, side_a_min_pdu,
|
||||||
side_b_min_pdu,
|
side_b_min_pdu,
|
||||||
traffic_type, sta_list):
|
traffic_type, sta_list):
|
||||||
@@ -317,18 +354,21 @@ class HardRoam(Realm):
|
|||||||
side_b=self.upstream, sleep_time=0)
|
side_b=self.upstream, sleep_time=0)
|
||||||
self.cx_profile.start_cx()
|
self.cx_profile.start_cx()
|
||||||
|
|
||||||
|
# get layer3 values
|
||||||
def get_layer3_values(self, cx_name=None, query=None):
|
def get_layer3_values(self, cx_name=None, query=None):
|
||||||
url = f"/cx/{cx_name}"
|
url = f"/cx/{cx_name}"
|
||||||
response = self.json_get(_req_url=url)
|
response = self.json_get(_req_url=url)
|
||||||
result = response[str(cx_name)][str(query)]
|
result = response[str(cx_name)][str(query)]
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
# get cross connect names
|
||||||
def get_cx_list(self):
|
def get_cx_list(self):
|
||||||
layer3_result = self.local_realm.cx_list()
|
layer3_result = self.local_realm.cx_list()
|
||||||
layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item]
|
layer3_names = [item["name"] for item in layer3_result.values() if "_links" in item]
|
||||||
print(layer3_names)
|
print(layer3_names)
|
||||||
return layer3_names
|
return layer3_names
|
||||||
|
|
||||||
|
# get endpoint values
|
||||||
def get_endp_values(self, endp="A", cx_name="niki", query="tx bytes"):
|
def get_endp_values(self, endp="A", cx_name="niki", query="tx bytes"):
|
||||||
# self.get_cx_list()
|
# self.get_cx_list()
|
||||||
# self.json_get("http://192.168.100.131:8080/endp/Unsetwlan000-0-B?fields=rx%20rate")
|
# self.json_get("http://192.168.100.131:8080/endp/Unsetwlan000-0-B?fields=rx%20rate")
|
||||||
@@ -342,6 +382,7 @@ class HardRoam(Realm):
|
|||||||
print(final)
|
print(final)
|
||||||
return final
|
return final
|
||||||
|
|
||||||
|
# precleanup on lanforge
|
||||||
def precleanup(self):
|
def precleanup(self):
|
||||||
obj = lf_clean.lf_clean(host=self.lanforge_ip,
|
obj = lf_clean.lf_clean(host=self.lanforge_ip,
|
||||||
port=self.lanforge_port,
|
port=self.lanforge_port,
|
||||||
@@ -351,6 +392,7 @@ class HardRoam(Realm):
|
|||||||
obj.cxs_clean()
|
obj.cxs_clean()
|
||||||
obj.endp_clean()
|
obj.endp_clean()
|
||||||
|
|
||||||
|
# get client data from lf
|
||||||
def station_data_query(self, station_name="wlan0", query="channel"):
|
def station_data_query(self, station_name="wlan0", query="channel"):
|
||||||
url = f"/port/{1}/{1}/{station_name}?fields={query}"
|
url = f"/port/{1}/{1}/{station_name}?fields={query}"
|
||||||
# print("url//////", url)
|
# print("url//////", url)
|
||||||
@@ -363,6 +405,7 @@ class HardRoam(Realm):
|
|||||||
y = response["interface"][query]
|
y = response["interface"][query]
|
||||||
return y
|
return y
|
||||||
|
|
||||||
|
# start packet capture on lf
|
||||||
def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60):
|
def start_sniffer(self, radio_channel=None, radio=None, test_name="sniff_radio", duration=60):
|
||||||
self.pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap"
|
self.pcap_name = test_name + str(datetime.now().strftime("%Y-%m-%d-%H-%M")).replace(':', '-') + ".pcap"
|
||||||
self.pcap_obj_2 = sniff_radio.SniffRadio(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port,
|
self.pcap_obj_2 = sniff_radio.SniffRadio(lfclient_host=self.lanforge_ip, lfclient_port=self.lanforge_port,
|
||||||
@@ -373,6 +416,7 @@ class HardRoam(Realm):
|
|||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
self.pcap_obj_2.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration)
|
self.pcap_obj_2.monitor.start_sniff(capname=self.pcap_name, duration_sec=duration)
|
||||||
|
|
||||||
|
# stop packet capture and get file name
|
||||||
def stop_sniffer(self):
|
def stop_sniffer(self):
|
||||||
directory = None
|
directory = None
|
||||||
directory_name = "pcap"
|
directory_name = "pcap"
|
||||||
@@ -396,6 +440,7 @@ class HardRoam(Realm):
|
|||||||
|
|
||||||
return self.pcap_name
|
return self.pcap_name
|
||||||
|
|
||||||
|
# generate csv files at the begining
|
||||||
def generate_csv(self):
|
def generate_csv(self):
|
||||||
file_name = []
|
file_name = []
|
||||||
for i in range(self.num_sta):
|
for i in range(self.num_sta):
|
||||||
@@ -407,6 +452,7 @@ class HardRoam(Realm):
|
|||||||
lf_csv_obj.generate_csv()
|
lf_csv_obj.generate_csv()
|
||||||
return file_name
|
return file_name
|
||||||
|
|
||||||
|
# get journal ctl logs/ kernel logs
|
||||||
def journal_ctl_logs(self, file):
|
def journal_ctl_logs(self, file):
|
||||||
jor_lst = []
|
jor_lst = []
|
||||||
name = "kernel_log" + file + ".txt"
|
name = "kernel_log" + file + ".txt"
|
||||||
@@ -426,23 +472,28 @@ class HardRoam(Realm):
|
|||||||
print(e)
|
print(e)
|
||||||
return jor_lst
|
return jor_lst
|
||||||
|
|
||||||
|
# gives wlan management status of pcap file
|
||||||
def get_wlan_mgt_status(self, file_name, filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55)"):
|
def get_wlan_mgt_status(self, file_name, filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55)"):
|
||||||
query_reasso_response = self.pcap_obj.get_wlan_mgt_status_code(pcap_file=str(file_name),
|
query_reasso_response = self.pcap_obj.get_wlan_mgt_status_code(pcap_file=str(file_name),
|
||||||
filter=filter)
|
filter=filter)
|
||||||
print("query", query_reasso_response)
|
print("query", query_reasso_response)
|
||||||
return query_reasso_response
|
return query_reasso_response
|
||||||
|
|
||||||
|
# main roam fun
|
||||||
def run(self, file_n=None):
|
def run(self, file_n=None):
|
||||||
kernel_log = []
|
kernel_log = []
|
||||||
|
# start timer
|
||||||
test_time = datetime.now()
|
test_time = datetime.now()
|
||||||
test_time = test_time.strftime("%b %d %H:%M:%S")
|
test_time = test_time.strftime("%b %d %H:%M:%S")
|
||||||
print("Test started at ", test_time)
|
print("Test started at ", test_time)
|
||||||
self.start_time = test_time
|
self.start_time = test_time
|
||||||
|
# get two bssids for roam
|
||||||
self.final_bssid.extend([self.c1_bssid, self.c2_bssid])
|
self.final_bssid.extend([self.c1_bssid, self.c2_bssid])
|
||||||
print("final bssid", self.final_bssid)
|
print("final bssid", self.final_bssid)
|
||||||
self.precleanup()
|
self.precleanup()
|
||||||
|
|
||||||
message = None, None
|
message = None, None
|
||||||
|
|
||||||
|
# create clients with respect to bands
|
||||||
if self.band == "twog":
|
if self.band == "twog":
|
||||||
self.local_realm.reset_port(self.twog_radios)
|
self.local_realm.reset_port(self.twog_radios)
|
||||||
self.create_n_clients(sta_prefix="wlan1", num_sta=self.num_sta, dut_ssid=self.ssid_name,
|
self.create_n_clients(sta_prefix="wlan1", num_sta=self.num_sta, dut_ssid=self.ssid_name,
|
||||||
@@ -462,7 +513,7 @@ class HardRoam(Realm):
|
|||||||
|
|
||||||
# check if all stations have ip
|
# check if all stations have ip
|
||||||
sta_list = self.get_station_list()
|
sta_list = self.get_station_list()
|
||||||
print(sta_list)
|
print("station list", sta_list)
|
||||||
if sta_list == "no response":
|
if sta_list == "no response":
|
||||||
print("no response")
|
print("no response")
|
||||||
else:
|
else:
|
||||||
@@ -470,14 +521,19 @@ class HardRoam(Realm):
|
|||||||
mac_list = []
|
mac_list = []
|
||||||
for sta_name in sta_list:
|
for sta_name in sta_list:
|
||||||
sta = sta_name.split(".")[2]
|
sta = sta_name.split(".")[2]
|
||||||
time.sleep(5)
|
time.sleep(0.5)
|
||||||
mac = self.station_data_query(station_name=str(sta), query="mac")
|
mac = self.station_data_query(station_name=str(sta), query="mac")
|
||||||
mac_list.append(mac)
|
mac_list.append(mac)
|
||||||
print(mac_list)
|
print("mac address list of all stations", mac_list)
|
||||||
|
self.mac_data = mac_list
|
||||||
|
|
||||||
|
# if all stations got ip check mac address
|
||||||
if val:
|
if val:
|
||||||
print("all stations got ip")
|
print("all stations got ip")
|
||||||
print("check if all tations are conncted one ap")
|
print("check if all stations are connected one ap")
|
||||||
|
|
||||||
|
# get bssids of all stations
|
||||||
|
print("get bssids of all stations")
|
||||||
check = []
|
check = []
|
||||||
for sta_name in sta_list:
|
for sta_name in sta_list:
|
||||||
sta = sta_name.split(".")[2]
|
sta = sta_name.split(".")[2]
|
||||||
@@ -488,14 +544,18 @@ class HardRoam(Realm):
|
|||||||
print(check)
|
print(check)
|
||||||
|
|
||||||
# check if all element of bssid list has same bssid's
|
# check if all element of bssid list has same bssid's
|
||||||
|
print("check if all bssids are same or not")
|
||||||
result = all(element == check[0] for element in check)
|
result = all(element == check[0] for element in check)
|
||||||
|
# if yes
|
||||||
if result:
|
if result:
|
||||||
self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000,
|
self.create_layer3(side_a_min_rate=1000000, side_a_max_rate=0, side_b_min_rate=1000000,
|
||||||
side_b_max_rate=0,
|
side_b_max_rate=0,
|
||||||
sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250,
|
sta_list=sta_list, traffic_type=self.traffic_type, side_a_min_pdu=1250,
|
||||||
side_b_min_pdu=1250)
|
side_b_min_pdu=1250)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print("move all clients to one ap")
|
# if bssid's are not same move try to move all clients to one ap
|
||||||
|
print("try to move all clients to one ap before roam ")
|
||||||
for sta_name in sta_list:
|
for sta_name in sta_list:
|
||||||
sta = sta_name.split(".")[2]
|
sta = sta_name.split(".")[2]
|
||||||
print(sta)
|
print(sta)
|
||||||
@@ -532,7 +592,8 @@ class HardRoam(Realm):
|
|||||||
if self.iteration_based:
|
if self.iteration_based:
|
||||||
variable = self.iteration
|
variable = self.iteration
|
||||||
iterable_var = self.iteration
|
iterable_var = self.iteration
|
||||||
|
post_bssid = None
|
||||||
|
# roam iteration loop starts here
|
||||||
while variable:
|
while variable:
|
||||||
print("variable", variable)
|
print("variable", variable)
|
||||||
iter = None
|
iter = None
|
||||||
@@ -558,8 +619,9 @@ class HardRoam(Realm):
|
|||||||
else:
|
else:
|
||||||
station = self.wait_for_ip(sta_list)
|
station = self.wait_for_ip(sta_list)
|
||||||
time.sleep(20)
|
time.sleep(20)
|
||||||
print("start debug")
|
if self.debug:
|
||||||
self.start_debug_(mac_list=mac_list)
|
print("start debug")
|
||||||
|
self.start_debug_(mac_list=mac_list)
|
||||||
if station:
|
if station:
|
||||||
print("all stations got ip")
|
print("all stations got ip")
|
||||||
# get bssid's of all stations connected
|
# get bssid's of all stations connected
|
||||||
@@ -570,18 +632,6 @@ class HardRoam(Realm):
|
|||||||
bssid = self.station_data_query(station_name=str(sta), query="ap")
|
bssid = self.station_data_query(station_name=str(sta), query="ap")
|
||||||
bssid_list.append(bssid)
|
bssid_list.append(bssid)
|
||||||
print(bssid_list)
|
print(bssid_list)
|
||||||
|
|
||||||
# for sta_name in sta_list:
|
|
||||||
# # local_row_list = [0, "68"]
|
|
||||||
# local_row_list = [str(iter)]
|
|
||||||
# sta = sta_name.split(".")[2]
|
|
||||||
# time.sleep(5)
|
|
||||||
# before_bssid = self.station_data_query(station_name=str(sta), query="ap")
|
|
||||||
# print(before_bssid)
|
|
||||||
# local_row_list.append(before_bssid)
|
|
||||||
# print(local_row_list)
|
|
||||||
# row_list.append(local_row_list)
|
|
||||||
# print(row_list)
|
|
||||||
pass_fail_list = []
|
pass_fail_list = []
|
||||||
pcap_file_list = []
|
pcap_file_list = []
|
||||||
roam_time1 = []
|
roam_time1 = []
|
||||||
@@ -593,12 +643,16 @@ class HardRoam(Realm):
|
|||||||
result = all(element == bssid_list[0] for element in bssid_list)
|
result = all(element == bssid_list[0] for element in bssid_list)
|
||||||
|
|
||||||
if not result:
|
if not result:
|
||||||
|
# for a iteration if client is not connected to same ap try to connect them
|
||||||
print("giving a try to connect")
|
print("giving a try to connect")
|
||||||
print("move all clients to one ap")
|
print("move all clients to one ap")
|
||||||
for sta_name in sta_list:
|
for sta_name in sta_list:
|
||||||
sta = sta_name.split(".")[2]
|
sta = sta_name.split(".")[2]
|
||||||
print(sta)
|
print(sta)
|
||||||
wpa_cmd = "roam " + str(bssid_list[0])
|
if iter == 0:
|
||||||
|
wpa_cmd = "roam " + str(bssid_list[0])
|
||||||
|
else:
|
||||||
|
wpa_cmd = "roam " + str(post_bssid)
|
||||||
wifi_cli_cmd_data1 = {
|
wifi_cli_cmd_data1 = {
|
||||||
"shelf": 1,
|
"shelf": 1,
|
||||||
"resource": 1,
|
"resource": 1,
|
||||||
@@ -627,10 +681,12 @@ class HardRoam(Realm):
|
|||||||
print(before_bss)
|
print(before_bss)
|
||||||
before_bssid.append(before_bss)
|
before_bssid.append(before_bss)
|
||||||
print("bssid 1", before_bssid)
|
print("bssid 1", before_bssid)
|
||||||
# local_row_list.append(before_bss)
|
|
||||||
# print(local_row_list)
|
if before_bssid[0] == str(self.c1_bssid.upper()):
|
||||||
# row_list.append(local_row_list)
|
post_bssid = self.c2_bssid.upper()
|
||||||
# print(row_list)
|
else:
|
||||||
|
post_bssid = self.c1_bssid.upper()
|
||||||
|
print("station are connected to post bssid", post_bssid)
|
||||||
result1 = all(element == before_bssid[0] for element in before_bssid)
|
result1 = all(element == before_bssid[0] for element in before_bssid)
|
||||||
|
|
||||||
|
|
||||||
@@ -726,35 +782,13 @@ class HardRoam(Realm):
|
|||||||
file_name_ = self.stop_sniffer()
|
file_name_ = self.stop_sniffer()
|
||||||
file_name = "./pcap/" + str(file_name_)
|
file_name = "./pcap/" + str(file_name_)
|
||||||
print("pcap file name", file_name)
|
print("pcap file name", file_name)
|
||||||
|
if self.debug:
|
||||||
|
print("stop debugger")
|
||||||
|
self.stop_debug_(mac_list=mac_list)
|
||||||
|
time.sleep(40)
|
||||||
|
else:
|
||||||
|
print("debug is disabled")
|
||||||
|
|
||||||
self.stop_debug_(mac_list=mac_list)
|
|
||||||
|
|
||||||
time.sleep(60)
|
|
||||||
trace = self.get_file_name(client=self.num_sta)
|
|
||||||
log_file.append(trace)
|
|
||||||
|
|
||||||
# cx_list = self.get_cx_list()
|
|
||||||
# print("quiece layer3")
|
|
||||||
# self.local_realm.drain_stop_cx(cx_name=cx_list[0])
|
|
||||||
# time.sleep(10)
|
|
||||||
# self.cx_profile.start_cx()
|
|
||||||
# time.sleep(10)
|
|
||||||
#
|
|
||||||
# print("quiece layer3")
|
|
||||||
#
|
|
||||||
# cx_list = self.get_cx_list()
|
|
||||||
# print(cx_list)
|
|
||||||
# self.local_realm.drain_stop_cx(cx_name=cx_list[0])
|
|
||||||
# time.sleep(30)
|
|
||||||
# tx_b = self.get_endp_values(cx_name=cx_list[0], query="tx bytes", endp="B")
|
|
||||||
# rx_a = self.get_endp_values(cx_name=cx_list[0], query="rx bytes", endp="A")
|
|
||||||
# packet_loss = int(tx_b) - int(rx_a)
|
|
||||||
# print(packet_loss)
|
|
||||||
# packet_loss_lst.append(packet_loss)
|
|
||||||
# print("start cx again")
|
|
||||||
# self.cx_profile.start_cx()
|
|
||||||
|
|
||||||
time.sleep(40)
|
|
||||||
self.wait_for_ip(sta_list)
|
self.wait_for_ip(sta_list)
|
||||||
bssid_list_1 = []
|
bssid_list_1 = []
|
||||||
for sta_name in sta_list:
|
for sta_name in sta_list:
|
||||||
@@ -766,13 +800,17 @@ class HardRoam(Realm):
|
|||||||
for i, x in zip(row_list, bssid_list_1):
|
for i, x in zip(row_list, bssid_list_1):
|
||||||
i.append(x)
|
i.append(x)
|
||||||
print("row list", row_list)
|
print("row list", row_list)
|
||||||
|
time.sleep(60)
|
||||||
|
trace = self.get_file_name(client=self.num_sta)
|
||||||
|
log_file.append(trace)
|
||||||
# check if all are equal
|
# check if all are equal
|
||||||
result = all(element == bssid_list_1[0] for element in bssid_list_1)
|
all(element == bssid_list_1[0] for element in bssid_list_1)
|
||||||
|
|
||||||
res = ""
|
res = ""
|
||||||
station_before_ = before_bssid
|
station_before_ = before_bssid
|
||||||
print("station before", station_before_)
|
print("station before", station_before_)
|
||||||
for i,x in zip( mac_list, range(len(station_before_))):
|
# for each mac address query data from pcap
|
||||||
|
for i, x in zip(mac_list, range(len(station_before_))):
|
||||||
print("mac ", i)
|
print("mac ", i)
|
||||||
print(x)
|
print(x)
|
||||||
print(bssid_list_1)
|
print(bssid_list_1)
|
||||||
@@ -786,12 +824,7 @@ class HardRoam(Realm):
|
|||||||
res = "PASS"
|
res = "PASS"
|
||||||
if res == "FAIL":
|
if res == "FAIL":
|
||||||
res = "FAIL"
|
res = "FAIL"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if res == "PASS":
|
if res == "PASS":
|
||||||
|
|
||||||
query_reasso_response = self.get_wlan_mgt_status(file_name=file_name,
|
query_reasso_response = self.get_wlan_mgt_status(file_name=file_name,
|
||||||
filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % (
|
filter="(wlan.fc.type_subtype eq 3 && wlan.fixed.status_code == 0x0000 && wlan.tag.number == 55) && (wlan.da == %s)" % (
|
||||||
str(i)))
|
str(i)))
|
||||||
@@ -939,7 +972,13 @@ class HardRoam(Realm):
|
|||||||
|
|
||||||
message = "all stations are not connected to same ap for iteration " + str(iter)
|
message = "all stations are not connected to same ap for iteration " + str(iter)
|
||||||
print("all stations are not connected to same ap")
|
print("all stations are not connected to same ap")
|
||||||
|
print("starting snifer")
|
||||||
|
self.start_sniffer(radio_channel=int(self.channel), radio=self.sniff_radio,
|
||||||
|
test_name="roam_11r_" + str(self.option) + "_iteration_" + str(
|
||||||
|
iter) + "_",
|
||||||
|
duration=3600)
|
||||||
|
print("stop sniff")
|
||||||
|
self.stop_sniffer()
|
||||||
bssid_list2 = []
|
bssid_list2 = []
|
||||||
for sta_name in sta_list:
|
for sta_name in sta_list:
|
||||||
# local_row_list = [0, "68"]
|
# local_row_list = [0, "68"]
|
||||||
@@ -966,7 +1005,13 @@ class HardRoam(Realm):
|
|||||||
for i in row_list:
|
for i in row_list:
|
||||||
i.append("N/A")
|
i.append("N/A")
|
||||||
print("row list", row_list)
|
print("row list", row_list)
|
||||||
self.stop_debug_(mac_list=mac_list)
|
if self.debug:
|
||||||
|
print("stop debugger")
|
||||||
|
self.stop_debug_(mac_list=mac_list)
|
||||||
|
|
||||||
|
else:
|
||||||
|
print("debug is disabled")
|
||||||
|
|
||||||
time.sleep(60)
|
time.sleep(60)
|
||||||
trace = self.get_file_name(client=self.num_sta)
|
trace = self.get_file_name(client=self.num_sta)
|
||||||
log_file.append(trace)
|
log_file.append(trace)
|
||||||
@@ -982,7 +1027,6 @@ class HardRoam(Realm):
|
|||||||
for i, x in zip(file_n, row_list):
|
for i, x in zip(file_n, row_list):
|
||||||
self.lf_csv_obj.open_csv_append(fields=x, name=i)
|
self.lf_csv_obj.open_csv_append(fields=x, name=i)
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
message = "station's failed to get ip after the test start"
|
message = "station's failed to get ip after the test start"
|
||||||
print("station's failed to get ip after test starts")
|
print("station's failed to get ip after test starts")
|
||||||
@@ -990,11 +1034,14 @@ class HardRoam(Realm):
|
|||||||
if time.time() > timeout:
|
if time.time() > timeout:
|
||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
pass
|
print(e)
|
||||||
|
pass
|
||||||
|
|
||||||
else:
|
else:
|
||||||
message = "station's failed to get ip at the begining"
|
message = "station's failed to get ip at the begining"
|
||||||
print("station's failed to get associate at the begining")
|
print("station's failed to get associate at the begining")
|
||||||
|
else:
|
||||||
|
print("stations failed to get ip")
|
||||||
|
|
||||||
test_end = datetime.now()
|
test_end = datetime.now()
|
||||||
test_end = test_end.strftime("%b %d %H:%M:%S")
|
test_end = test_end.strftime("%b %d %H:%M:%S")
|
||||||
@@ -1006,6 +1053,7 @@ class HardRoam(Realm):
|
|||||||
self.test_duration = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT)
|
self.test_duration = datetime.strptime(s2, FMT) - datetime.strptime(s1, FMT)
|
||||||
return kernel_log, message
|
return kernel_log, message
|
||||||
|
|
||||||
|
# graph function
|
||||||
def generate_client_pass_fail_graph(self, csv_list=None):
|
def generate_client_pass_fail_graph(self, csv_list=None):
|
||||||
print("csv_list", csv_list)
|
print("csv_list", csv_list)
|
||||||
x_axis_category = []
|
x_axis_category = []
|
||||||
@@ -1049,6 +1097,7 @@ class HardRoam(Realm):
|
|||||||
print("graph name {}".format(graph_png))
|
print("graph name {}".format(graph_png))
|
||||||
return graph_png
|
return graph_png
|
||||||
|
|
||||||
|
# report generation function
|
||||||
def generate_report(self, csv_list, kernel_lst, current_path=None):
|
def generate_report(self, csv_list, kernel_lst, current_path=None):
|
||||||
report = lf_report_pdf.lf_report(_path= "", _results_dir_name="Hard Roam Test", _output_html="hard_roam.html",
|
report = lf_report_pdf.lf_report(_path= "", _results_dir_name="Hard Roam Test", _output_html="hard_roam.html",
|
||||||
_output_pdf="Hard_roam_test.pdf")
|
_output_pdf="Hard_roam_test.pdf")
|
||||||
@@ -1099,7 +1148,7 @@ class HardRoam(Realm):
|
|||||||
# report.set_table_title("Client information " + str(i))
|
# report.set_table_title("Client information " + str(i))
|
||||||
# report.build_table_title()
|
# report.build_table_title()
|
||||||
report.set_obj_html("Client " + str(i+1) + " Information", "This Table gives detailed information "
|
report.set_obj_html("Client " + str(i+1) + " Information", "This Table gives detailed information "
|
||||||
"of client " + str(i+1) + " like the bssid it was before roam," +
|
"of client " + str(i+1) + " like the bssid it was before roam," +
|
||||||
" bssid it was after roam, " +
|
" bssid it was after roam, " +
|
||||||
"roam time, capture file name and ra_trace file name along with remarks ")
|
"roam time, capture file name and ra_trace file name along with remarks ")
|
||||||
|
|
||||||
@@ -1139,6 +1188,7 @@ class HardRoam(Realm):
|
|||||||
"iterations": self.iteration,
|
"iterations": self.iteration,
|
||||||
"SSID": self.ssid_name,
|
"SSID": self.ssid_name,
|
||||||
"Security": self.security,
|
"Security": self.security,
|
||||||
|
"Client mac": self.mac_data,
|
||||||
"Contact": "support@candelatech.com"
|
"Contact": "support@candelatech.com"
|
||||||
}
|
}
|
||||||
report.set_table_title("Test basic Information")
|
report.set_table_title("Test basic Information")
|
||||||
|
|||||||
Reference in New Issue
Block a user