Improved testing functionality

This commit is contained in:
LoganLipke
2020-10-12 15:24:11 -07:00
parent 78bad7d526
commit b0ffdb4e56

View File

@@ -231,33 +231,6 @@ class TIPStationPowersave(LFCliBase):
cx_rx_map[item] = value_rx
return cx_rx_map
def parse_text_dump(self, station):
dump_stats = {
"subtypes": {},
"macs": {},
"ps_vals_true": 0
}
with open("%s/%s.txt" % (self.pcap_save_path, station)) as file:
for line in file:
line_split = line.rstrip().split('\t')
subtype = line_split[0]
macs = line_split[1].split(',')
ps_val = line_split[2]
if subtype not in dump_stats['subtypes']:
dump_stats['subtypes'][subtype] = 1
else:
dump_stats['subtypes'][subtype] += 1
for mac in macs:
if mac != 'ff:ff:ff:ff:ff:ff':
if mac not in dump_stats['macs']:
dump_stats['macs'][mac] = 1
else:
dump_stats['macs'][mac] += 1
if int(ps_val) > 0:
dump_stats['ps_vals_true'] += 1
return dump_stats
def start(self, print_pass=False, print_fail=False):
"""
This method is intended to start the monitor, the normal station (without powersave),
@@ -352,33 +325,44 @@ class TIPStationPowersave(LFCliBase):
for eid,record in self.sta_mac_map.items():
interesting_macs[record['alias']] = [record['mac'], record['ap']]
# print(interesting_macs)
# mac_str = ""
# for mac in interesting_macs.keys():
# mac_str += "wlan.addr==%s or " % mac
# mac_str = mac_str[:-3]
results = {}
for station, info in interesting_macs.items():
assoc_filter = self.packet_filter.get_filter_wlan_assoc_packets(ap_mac=info[1], sta_mac=info[0])
null_filter = self.packet_filter.get_filter_wlan_null_packets(ap_mac=info[1], sta_mac=info[0])
if station not in self.normal_sta_list:
assoc_filter = self.packet_filter.get_filter_wlan_assoc_packets(ap_mac=info[1], sta_mac=info[0])
null_filter = self.packet_filter.get_filter_wlan_null_packets(ap_mac=info[1], sta_mac=info[0])
# print(assoc_filter)
# print(null_filter)
assoc_filter_results = self.packet_filter.run_filter(pcap_file=self.pcap_file, filter=assoc_filter)
null_filter_results = self.packet_filter.run_filter(pcap_file=self.pcap_file, filter=null_filter)
results[station] = [assoc_filter_results, null_filter_results]
print("\t%s %s" % (station, info[0]))
print("\t%s\n" % assoc_filter, self.packet_filter.run_filter(pcap_file=self.pcap_file, filter=assoc_filter))
print("\t%s\n" % null_filter, self.packet_filter.run_filter(pcap_file=self.pcap_file, filter=null_filter), "\n")
# mac_str = "wlan.addr==%s or wlan.addr==%s" % (info[0], info[1])
# tshark_filter = "tshark -r " + self.pcap_file + " -T fields -e wlan.fc.type_subtype -e wlan.addr -e " \
# "wlan.fc.pwrmgt " + mac_str
# # now check for the pcap file we just created
# print("TSHARK COMMAND: %s " % station + tshark_filter)
# os.system("%s > %s/%s.txt"%(tshark_filter, self.pcap_save_path, station))
# print(station)
# pprint.pprint(self.parse_text_dump(station))
# print("\t%s %s" % (station, info[0]))
# print("\t%s\n" % assoc_filter, assoc_filter_results)
# print("\t%s\n" % null_filter, null_filter_results, "\n")
total_fail = 0
total_pass = 0
for station,filters in results.items():
for filter in filters:
fail_count = 0
pass_count = 0
for line in filter:
# print(station, "LINE = ", line)
line_split = line.rstrip().split('\t')
ps_val = line_split[2]
if int(ps_val):
pass_count += 1
else:
fail_count += 1
if fail_count == len(filter) or pass_count < len(filter):
total_fail += 1
else:
total_pass += 1
# print(len(filter),"%s:%s, %s:%s" % (fail_count, total_fail, total_pass, pass_count))
print("%d stations had powersave flag set" % total_pass)
if total_fail >= total_pass:
self._fail("%d station(s) did not have powersave flag set" % total_fail)
self._fail("not done writing pcap logic", print_=True)
exit(1)
@@ -396,9 +380,9 @@ class TIPStationPowersave(LFCliBase):
def main():
lfjson_host = "localhost"
lfjson_port = 8080
#station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=4, padding_number_=10000)
#station_list = LFUtils.portNameSeries(prefix_="sta", start_id_=0, end_id_=4, padding_number_=10000)
normal_station_list = ["sta1000" ]
powersave_station_list = ["sta0001"] #,"sta0002","sta0003","sta0004"]
powersave_station_list = ["sta0001","sta0002","sta0003","sta0004"]
ip_powersave_test = TIPStationPowersave(lfjson_host, lfjson_port,
ssid="jedway-open-149",
password="[BLANK]",