From 1b87a6926be1133e6deb7d9894b9e266486edfbf Mon Sep 17 00:00:00 2001 From: Dipti Date: Fri, 5 Mar 2021 23:38:40 -0800 Subject: [PATCH] v1 classes are taken out of realm.py, into their own files Signed-off-by: Dipti --- py-json/LANforge/lfcli_base.py | 26 + py-json/http_profile.py | 2 + py-json/l4_cxprofile.py | 5 +- py-json/multicast_profile.py | 17 +- py-json/realm.py | 2360 -------------------------------- 5 files changed, 39 insertions(+), 2371 deletions(-) diff --git a/py-json/LANforge/lfcli_base.py b/py-json/LANforge/lfcli_base.py index 905a5e89..42279b30 100644 --- a/py-json/LANforge/lfcli_base.py +++ b/py-json/LANforge/lfcli_base.py @@ -490,6 +490,32 @@ class LFCliBase: logging.warning(mesg) elif level == "error": logging.error(mesg) + + @staticmethod + def parse_time(time_string): + if isinstance(time_string, str): + pattern = re.compile("^(\d+)([dhms]$)") + td = pattern.match(time_string) + if td is not None: + dur_time = int(td.group(1)) + dur_measure = str(td.group(2)) + if dur_measure == "d": + duration_time = datetime.timedelta(days=dur_time) + elif dur_measure == "h": + duration_time = datetime.timedelta(hours=dur_time) + elif dur_measure == "m": + duration_time = datetime.timedelta(minutes=dur_time) + elif dur_measure == "ms": + duration_time = datetime.timedelta(milliseconds=dur_time) + elif dur_measure == "w": + duration_time = datetime.timedelta(weeks=dur_time) + else: + duration_time = datetime.timedelta(seconds=dur_time) + else: + raise ValueError("Cannot compute time string provided: %s" % time_string) + else: + raise ValueError("time_string must be of type str. Type %s provided" % type(time_string)) + return duration_time # This style of Action subclass for argparse can't do much unless we incorporate # our argparse as a member of LFCliBase. Then we can do something like automatically diff --git a/py-json/http_profile.py b/py-json/http_profile.py index d26e0246..86524c96 100644 --- a/py-json/http_profile.py +++ b/py-json/http_profile.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 from LANforge.lfcli_base import LFCliBase +import port_utils +from port_utils import PortUtils from pprint import pprint import pprint import time diff --git a/py-json/l4_cxprofile.py b/py-json/l4_cxprofile.py index 66c61406..69c63423 100644 --- a/py-json/l4_cxprofile.py +++ b/py-json/l4_cxprofile.py @@ -3,9 +3,12 @@ import re from LANforge.lfcli_base import LFCliBase from pprint import pprint +import requests import pprint +import pandas as pd import time import datetime +import ast import csv import os @@ -158,7 +161,7 @@ class L4CXProfile(LFCliBase): iterations=0, debug=False): try: - duration_sec = Realm.parse_time(duration_sec).seconds + duration_sec = LFCliBase.parse_time(duration_sec).seconds except: if (duration_sec is None) or (duration_sec <= 1): raise ValueError("L4CXProfile::monitor wants duration_sec > 1 second") diff --git a/py-json/multicast_profile.py b/py-json/multicast_profile.py index 947cf4f8..40c585f7 100644 --- a/py-json/multicast_profile.py +++ b/py-json/multicast_profile.py @@ -1,18 +1,15 @@ #!/usr/bin/env python3 -import re -import time -import pprint + + + from LANforge.lfcli_base import LFCliBase -import csv -import pandas as pd -import os from pprint import pprint -import time -import random -import string -import datetime +import pprint + + + class MULTICASTProfile(LFCliBase): diff --git a/py-json/realm.py b/py-json/realm.py index c269f2ac..416450b8 100755 --- a/py-json/realm.py +++ b/py-json/realm.py @@ -696,31 +696,6 @@ class Realm(LFCliBase): raise ValueError("time_string must be of type str. Type %s provided" % type(time_string)) return duration_sec - @staticmethod - def parse_time(time_string): - if isinstance(time_string, str): - pattern = re.compile("^(\d+)([dhms]$)") - td = pattern.match(time_string) - if td is not None: - dur_time = int(td.group(1)) - dur_measure = str(td.group(2)) - if dur_measure == "d": - duration_time = datetime.timedelta(days=dur_time) - elif dur_measure == "h": - duration_time = datetime.timedelta(hours=dur_time) - elif dur_measure == "m": - duration_time = datetime.timedelta(minutes=dur_time) - elif dur_measure == "ms": - duration_time = datetime.timedelta(milliseconds=dur_time) - elif dur_measure == "w": - duration_time = datetime.timedelta(weeks=dur_time) - else: - duration_time = datetime.timedelta(seconds=dur_time) - else: - raise ValueError("Cannot compute time string provided: %s" % time_string) - else: - raise ValueError("time_string must be of type str. Type %s provided" % type(time_string)) - return duration_time def remove_all_stations(self, resource): port_list = self.station_list() @@ -796,7 +771,6 @@ class Realm(LFCliBase): def new_station_profile(self, ver = 1): if ver == 1: - #import station_profile station_prof = StationProfile(self.lfclient_url, local_realm=self, debug_=self.debug, up=False) #elif ver == 2: # import station_profile2 @@ -805,7 +779,6 @@ class Realm(LFCliBase): def new_multicast_profile(self, ver = 1): if ver == 1: - #import multicast_profile multi_prof = MULTICASTProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug, report_timer_=3000) #elif ver == 2: @@ -816,7 +789,6 @@ class Realm(LFCliBase): def new_wifi_monitor_profile(self, resource_=1, debug_=False, up_=False, ver = 1): if ver == 1: - #import wifi_monitor_profile wifi_mon_prof = WifiMonitor(self.lfclient_url, local_realm=self, resource_=resource_, @@ -850,7 +822,6 @@ class Realm(LFCliBase): def new_l4_cx_profile(self, ver=1): if ver == 1 : - #import l4_cxprofile cx_prof = L4CXProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) #elif ver == 2: # import l4_cxprofile2 @@ -863,7 +834,6 @@ class Realm(LFCliBase): def new_generic_endp_profile(self, ver=1): if ver == 1 : - #import gen_cxprofile endp_prof = GenCXProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) #elif ver == 2: # import gen_cxprofile2 @@ -880,7 +850,6 @@ class Realm(LFCliBase): :return: new GenCXProfile """ if ver == 1: - #import gen_cxprofile cx_prof = GenCXProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) #elif ver == 2: # import gen_cxprofile2 @@ -893,7 +862,6 @@ class Realm(LFCliBase): def new_vap_profile(self, ver=1): if ver == 1: - #import vap_profile vap_prof = VAPProfile(lfclient_host=self.lfclient_host, lfclient_port=self.lfclient_port, local_realm=self, debug_=self.debug) # elif ver == 2: @@ -911,7 +879,6 @@ class Realm(LFCliBase): def new_http_profile(self, ver = 1): if ver == 1: - #import http_profile http_prof = HTTPProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) # elif ver == 2: # import http_profile2 @@ -920,7 +887,6 @@ class Realm(LFCliBase): def new_fio_endp_profile(self, ver = 1): if ver == 1: - #import fio_endp_profile cx_prof = FIOEndpProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) # elif ver == 2: # import fio_endp_profile2 @@ -929,7 +895,6 @@ class Realm(LFCliBase): def new_dut_profile(self, ver = 1): if ver == 1: - #import dut_profile dut_profile = dut_profile.DUTProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) # elif ver == 2: # import dut_profile2 @@ -938,7 +903,6 @@ class Realm(LFCliBase): def new_mvlan_profile(self, ver = 1): if ver == 1: - #import mac_vlan_profile mac_vlan_profile = MACVLANProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) # elif ver == 2: # import mac_vlan_profile2 @@ -947,1666 +911,12 @@ class Realm(LFCliBase): def new_test_group_profile(self, ver = 1): if ver == 1: - #import test_group_profile test_group_profile = TestGroupProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) # elif ver == 2: # import test_group_profile2 # test_group_profile = test_group_profile2.TestGroupProfile2(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) return test_group_profile - -# class MULTICASTProfile(LFCliBase): -# def __init__(self, lfclient_host, lfclient_port, local_realm, -# report_timer_=3000, name_prefix_="Unset", number_template_="00000", debug_=False): -# """ - -# :param lfclient_host: -# :param lfclient_port: -# :param local_realm: -# :param name_prefix_: prefix string for connection -# :param number_template_: how many zeros wide we padd, possibly a starting integer with left padding -# :param debug_: -# """ -# super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) -# self.lfclient_url = "http://%s:%s" % (lfclient_host, lfclient_port) -# self.debug = debug_ -# self.local_realm = local_realm -# self.report_timer = report_timer_ -# self.created_mc = {} -# self.name_prefix = name_prefix_ -# self.number_template = number_template_ - -# def get_mc_names(self): -# return self.created_mc.keys() - -# def refresh_mc(self, debug_=False): -# for endp_name in self.get_mc_names(): -# self.json_post("/cli-json/show_endpoints", { -# "endpoint": endp_name -# }, debug_=debug_) - -# def start_mc(self, suppress_related_commands=None, debug_=False): -# if self.debug: -# debug_ = True - -# for endp_name in self.get_mc_names(): -# print("Starting mcast endpoint: %s" % (endp_name)) -# json_data = { -# "endp_name": endp_name -# } -# url = "cli-json/start_endp" -# self.local_realm.json_post(url, json_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands) - -# pass - -# def stop_mc(self, suppress_related_commands=None, debug_=False): -# if self.debug: -# debug_ = True -# for endp_name in self.get_mc_names(): -# json_data = { -# "endp_name": endp_name -# } -# url = "cli-json/stop_endp" -# self.local_realm.json_post(url, json_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands) - -# pass - -# def cleanup_prefix(self): -# self.local_realm.cleanup_cxe_prefix(self.name_prefix) - -# def cleanup(self, suppress_related_commands=None, debug_ = False): -# if self.debug: -# debug_ = True - -# for endp_name in self.get_mc_names(): -# self.local_realm.rm_endp(endp_name, debug_=debug_, suppress_related_commands_=suppress_related_commands) - -# def create_mc_tx(self, endp_type, side_tx, suppress_related_commands=None, debug_=False): -# if self.debug: -# debug_ = True - -# side_tx_info = self.local_realm.name_to_eid(side_tx) -# side_tx_shelf = side_tx_info[0] -# side_tx_resource = side_tx_info[1] -# side_tx_port = side_tx_info[2] -# side_tx_name = "%smtx-%s-%i" % (self.name_prefix, side_tx_port, len(self.created_mc)) - -# json_data = [] - -# # add_endp mcast-xmit-sta 1 1 side_tx mc_udp -1 NO 4000000 0 NO 1472 0 INCREASING NO 32 0 0 -# json_data = { -# 'alias': side_tx_name, -# 'shelf': side_tx_shelf, -# 'resource': side_tx_resource, -# 'port': side_tx_port, -# 'type': endp_type, -# 'ip_port': -1, -# 'is_rate_bursty': -# 'NO', 'min_rate': 256000, -# 'max_rate': 0, -# 'is_pkt_sz_random': 'NO', -# 'min_pkt': 1472, -# 'max_pkt': 0, -# 'payload_pattern': 'INCREASING', -# 'use_checksum': 'NO', -# 'ttl': 32, -# 'send_bad_crc_per_million': 0, -# 'multi_conn': 0 -# } - -# url = "/cli-json/add_endp" -# self.local_realm.json_post(url, json_data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - -# # set_mc_endp mcast-xmit-sta 32 224.9.9.9 9999 No # critical -# json_data = { -# 'name': side_tx_name, -# 'ttl': 32, -# 'mcast_group': '224.9.9.9', -# 'mcast_dest_port': 9999, -# 'rcv_mcast': 'No' -# } - -# url = "cli-json/set_mc_endp" -# self.local_realm.json_post(url, json_data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - -# self.created_mc[side_tx_name] = side_tx_name - -# these_endp = [side_tx_name] -# self.local_realm.wait_until_endps_appear(these_endp, debug=debug_) - -# def create_mc_rx(self, endp_type, side_rx, suppress_related_commands=None, debug_=False): -# if self.debug: -# debug_ = True - -# these_endp = [] - -# for port_name in side_rx: -# side_rx_info = self.local_realm.name_to_eid(port_name) -# side_rx_shelf = side_rx_info[0] -# side_rx_resource = side_rx_info[1] -# side_rx_port = side_rx_info[2] -# side_rx_name = "%smrx-%s-%i" % (self.name_prefix, side_rx_port, len(self.created_mc)) -# # add_endp mcast-rcv-sta-001 1 1 sta0002 mc_udp 9999 NO 0 0 NO 1472 0 INCREASING NO 32 0 0 -# json_data = { -# 'alias': side_rx_name, -# 'shelf': side_rx_shelf, -# 'resource': side_rx_resource, -# 'port': side_rx_port, -# 'type': endp_type, -# 'ip_port': 9999, -# 'is_rate_bursty': 'NO', -# 'min_rate': 0, -# 'max_rate': 0, -# 'is_pkt_sz_random': 'NO', -# 'min_pkt': 1472, -# 'max_pkt': 0, -# 'payload_pattern': 'INCREASING', -# 'use_checksum': 'NO', -# 'ttl': 32, -# 'send_bad_crc_per_million': 0, -# 'multi_conn': 0 -# } - -# url = "cli-json/add_endp" -# self.local_realm.json_post(url, json_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands) - -# json_data = { -# 'name': side_rx_name, -# 'ttl': 32, -# 'mcast_group': '224.9.9.9', -# 'mcast_dest_port': 9999, -# 'rcv_mcast': 'Yes' -# } -# url = "cli-json/set_mc_endp" -# self.local_realm.json_post(url, json_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands) - -# self.created_mc[side_rx_name] = side_rx_name -# these_endp.append(side_rx_name) - -# self.local_realm.wait_until_endps_appear(these_endp, debug=debug_) - -# def to_string(self): -# pprint.pprint(self) - -# class L4CXProfile(LFCliBase): -# def __init__(self, lfclient_host, lfclient_port, local_realm, debug_=False): -# super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) -# self.lfclient_url = "http://%s:%s" % (lfclient_host, lfclient_port) -# self.debug = debug_ -# self.url = "http://localhost/" -# self.requests_per_ten = 600 -# self.local_realm = local_realm -# self.created_cx = {} -# self.created_endp = [] -# self.lfclient_port = lfclient_port -# self.lfclient_host = lfclient_host - -# def check_errors(self, debug=False): -# fields_list = ["!conn", "acc.+denied", "bad-proto", "bad-url", "other-err", "total-err", "rslv-p", "rslv-h", -# "timeout", "nf+(4xx)", "http-r", "http-p", "http-t", "login-denied"] -# endp_list = self.json_get("layer4/list?fields=%s" % ','.join(fields_list)) -# debug_info = {} -# if endp_list is not None and endp_list['endpoint'] is not None: -# endp_list = endp_list['endpoint'] -# expected_passes = len(endp_list) -# passes = len(endp_list) -# for item in range(len(endp_list)): -# for name, info in endp_list[item].items(): -# for field in fields_list: -# if info[field.replace("+", " ")] > 0: -# passes -= 1 -# debug_info[name] = {field: info[field.replace("+", " ")]} -# if debug: -# print(debug_info) -# if passes == expected_passes: -# return True -# else: -# print(list(debug_info), " Endps in this list showed errors getting to %s " % self.url) -# return False - -# def start_cx(self): -# print("Starting CXs...") -# for cx_name in self.created_cx.keys(): -# self.json_post("/cli-json/set_cx_state", { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name], -# "cx_state": "RUNNING" -# }, debug_=self.debug) -# print(".", end='') -# print("") - -# def stop_cx(self): -# print("Stopping CXs...") -# for cx_name in self.created_cx.keys(): -# self.json_post("/cli-json/set_cx_state", { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name], -# "cx_state": "STOPPED" -# }, debug_=self.debug) -# print(".", end='') -# print("") - -# def check_request_rate(self): -# endp_list = self.json_get("layer4/list?fields=urls/s") -# expected_passes = 0 -# passes = 0 -# # TODO: this might raise a nameerror lower down -# # if self.target_requests_per_ten is None: -# # raise NameError("check request rate: missing self.target_requests_per_ten") -# if endp_list is not None and endp_list['endpoint'] is not None: -# endp_list = endp_list['endpoint'] -# for item in endp_list: -# for name, info in item.items(): -# if name in self.created_cx.keys(): -# expected_passes += 1 -# if info['urls/s'] * self.requests_per_ten >= self.target_requests_per_ten * .9: -# print(name, info['urls/s'], info['urls/s'] * self.requests_per_ten, self.target_requests_per_ten * .9) -# passes += 1 - -# return passes == expected_passes - - -# def cleanup(self): -# print("Cleaning up cxs and endpoints") -# if len(self.created_cx) != 0: -# for cx_name in self.created_cx.keys(): -# req_url = "cli-json/rm_cx" -# data = { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name] -# } -# self.json_post(req_url, data) -# # pprint(data) -# req_url = "cli-json/rm_endp" -# data = { -# "endp_name": cx_name -# } -# self.json_post(req_url, data) -# # pprint(data) - -# def create(self, ports=[], sleep_time=.5, debug_=False, suppress_related_commands_=None): -# cx_post_data = [] -# for port_name in ports: -# if len(self.local_realm.name_to_eid(port_name)) == 3: -# shelf = self.local_realm.name_to_eid(port_name)[0] -# resource = self.local_realm.name_to_eid(port_name)[1] -# name = self.local_realm.name_to_eid(port_name)[2] -# else: -# raise ValueError("Unexpected name for port_name %s" % port_name) -# endp_data = { -# "alias": name + "_l4", -# "shelf": shelf, -# "resource": resource, -# "port": name, -# "type": "l4_generic", -# "timeout": 10, -# "url_rate": self.requests_per_ten, -# "url": self.url, -# "proxy_auth_type": 0x200 -# } -# url = "cli-json/add_l4_endp" -# self.local_realm.json_post(url, endp_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands_) -# time.sleep(sleep_time) - -# endp_data = { -# "alias": "CX_" + name + "_l4", -# "test_mgr": "default_tm", -# "tx_endp": name + "_l4", -# "rx_endp": "NA" -# } -# cx_post_data.append(endp_data) -# self.created_cx[name + "_l4"] = "CX_" + name + "_l4" - -# for cx_data in cx_post_data: -# url = "/cli-json/add_cx" -# self.local_realm.json_post(url, cx_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands_) -# time.sleep(sleep_time) - -# def monitor(self, -# duration_sec=60, -# monitor_interval=1, -# col_names=None, -# created_cx=None, -# monitor=True, -# report_file=None, -# output_format=None, -# script_name=None, -# arguments=None, -# iterations=0, -# debug=False): -# try: -# duration_sec = Realm.parse_time(duration_sec).seconds -# except: -# if (duration_sec is None) or (duration_sec <= 1): -# raise ValueError("L4CXProfile::monitor wants duration_sec > 1 second") -# if (duration_sec <= monitor_interval): -# raise ValueError("L4CXProfile::monitor wants duration_sec > monitor_interval") -# if report_file == None: -# raise ValueError("Monitor requires an output file to be defined") -# if created_cx == None: -# raise ValueError("Monitor needs a list of Layer 4 connections") -# if (monitor_interval is None) or (monitor_interval < 1): -# raise ValueError("L4CXProfile::monitor wants monitor_interval >= 1 second") -# if output_format is not None: -# if output_format.lower() != report_file.split('.')[-1]: -# raise ValueError('Filename %s does not match output format %s' % (report_file, output_format)) -# else: -# output_format = report_file.split('.')[-1] - -# # Step 1 - Assign column names - -# if col_names is not None and len(col_names) > 0: -# header_row=col_names -# else: -# header_row=list((list(self.json_get("/layer4/all")['endpoint'][0].values())[0].keys())) -# if debug: -# print(header_row) - -# # Step 2 - Monitor columns - -# start_time = datetime.datetime.now() -# end_time = start_time + datetime.timedelta(seconds=duration_sec) -# sleep_interval = round(duration_sec // 5) -# if debug: -# print("Sleep_interval is %s ", sleep_interval) -# print("Start time is %s " , start_time) -# print("End time is %s " ,end_time) -# value_map = dict() -# passes = 0 -# expected_passes = 0 -# timestamps = [] -# for test in range(1+iterations): -# while datetime.datetime.now() < end_time: -# if col_names is None: -# response = self.json_get("/layer4/all") -# else: -# fields = ",".join(col_names) -# response = self.json_get("/layer4/%s?fields=%s" % (created_cx, fields)) -# if debug: -# print(response) -# if response is None: -# print(response) -# raise ValueError("Cannot find any endpoints") -# if monitor: -# if debug: -# print(response) - -# time.sleep(sleep_interval) -# t = datetime.datetime.now() -# timestamps.append(t) -# value_map[t] = response -# expected_passes += 1 -# if self.check_errors(debug): -# if self.check_request_rate(): -# passes += 1 -# else: -# self._fail("FAIL: Request rate did not exceed 90% target rate") -# self.exit_fail() -# else: -# self._fail("FAIL: Errors found getting to %s " % self.url) -# self.exit_fail() -# time.sleep(monitor_interval) -# print(value_map) - -# #[further] post-processing data, after test completion -# full_test_data_list = [] -# for test_timestamp, data in value_map.items(): -# #reduce the endpoint data to single dictionary of dictionaries -# for datum in data["endpoint"]: -# for endpoint_data in datum.values(): -# if debug: -# print(endpoint_data) -# endpoint_data["Timestamp"] = test_timestamp -# full_test_data_list.append(endpoint_data) - - -# header_row.append("Timestamp") -# header_row.append('Timestamp milliseconds') -# df = pd.DataFrame(full_test_data_list) - -# df["Timestamp milliseconds"] = [self.get_milliseconds(x) for x in df["Timestamp"]] -# #round entire column -# df["Timestamp milliseconds"]=df["Timestamp milliseconds"].astype(int) -# df["Timestamp"]=df["Timestamp"].apply(lambda x:x.strftime("%m/%d/%Y %I:%M:%S")) -# df=df[["Timestamp","Timestamp milliseconds", *header_row[:-2]]] -# #compare previous data to current data - -# systeminfo = ast.literal_eval(requests.get('http://'+str(self.lfclient_host)+':'+str(self.lfclient_port)).text) - -# if output_format == 'hdf': -# df.to_hdf(report_file, 'table', append=True) -# if output_format == 'parquet': -# df.to_parquet(report_file, engine='pyarrow') -# if output_format == 'png': -# fig = df.plot().get_figure() -# fig.savefig(report_file) -# if output_format.lower() in ['excel', 'xlsx'] or report_file.split('.')[-1] == 'xlsx': -# df.to_excel(report_file, index=False) -# if output_format == 'df': -# return df -# supported_formats = ['csv', 'json', 'stata', 'pickle','html'] -# for x in supported_formats: -# if output_format.lower() == x or report_file.split('.')[-1] == x: -# exec('df.to_' + x + '("'+report_file+'")') - - -# class GenCXProfile(LFCliBase): -# def __init__(self, lfclient_host, lfclient_port, local_realm, debug_=False): -# super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) -# self.lfclient_host = lfclient_host -# self.lfclient_port = lfclient_port -# self.lfclient_url = "http://%s:%s" % (lfclient_host, lfclient_port) -# self.debug = debug_ -# self.type = "lfping" -# self.dest = "127.0.0.1" -# self.interval = 1 -# self.cmd = "" -# self.local_realm = local_realm -# self.name_prefix = "generic" -# self.created_cx = [] -# self.created_endp = [] -# self.file_output = "/dev/null" -# self.loop_count = 1 - -# def parse_command(self, sta_name): -# if self.type == "lfping": -# if ((self.dest is not None) or (self.dest != "")) and ((self.interval is not None) or (self.interval > 0)): -# self.cmd = "%s -i %s -I %s %s" % (self.type, self.interval, sta_name, self.dest) -# # print(self.cmd) -# else: -# raise ValueError("Please ensure dest and interval have been set correctly") -# elif self.type == "generic": -# if self.cmd == "": -# raise ValueError("Please ensure cmd has been set correctly") -# elif self.type == "speedtest": -# self.cmd = "vrf_exec.bash %s speedtest-cli --json --share" % (sta_name) -# elif self.type == "iperf3" and self.dest is not None: -# self.cmd = "iperf3 --forceflush --format k --precision 4 -c %s -t 60 --tos 0 -b 1K --bind_dev %s -i 1 " \ -# "--pidfile /tmp/lf_helper_iperf3_test.pid" % (self.dest, sta_name) -# elif self.type == "lfcurl": -# if self.file_output is not None: -# self.cmd = "./scripts/lf_curl.sh -p %s -i AUTO -o %s -n %s -d %s" % \ -# (sta_name, self.file_output, self.loop_count, self.dest) -# else: -# raise ValueError("Please ensure file_output has been set correctly") -# else: -# raise ValueError("Unknown command type") - -# def start_cx(self): -# print("Starting CXs...") -# # print(self.created_cx) -# # print(self.created_endp) -# for cx_name in self.created_cx: -# self.json_post("/cli-json/set_cx_state", { -# "test_mgr": "default_tm", -# "cx_name": cx_name, -# "cx_state": "RUNNING" -# }, debug_=self.debug) -# print(".", end='') -# print("") - -# def stop_cx(self): -# print("Stopping CXs...") -# for cx_name in self.created_cx: -# self.json_post("/cli-json/set_cx_state", { -# "test_mgr": "default_tm", -# "cx_name": cx_name, -# "cx_state": "STOPPED" -# }, debug_=self.debug) -# print(".", end='') -# print("") - -# def cleanup(self): -# print("Cleaning up cxs and endpoints") -# for cx_name in self.created_cx: -# req_url = "cli-json/rm_cx" -# data = { -# "test_mgr": "default_tm", -# "cx_name": cx_name -# } -# self.json_post(req_url, data) - -# for endp_name in self.created_endp: -# req_url = "cli-json/rm_endp" -# data = { -# "endp_name": endp_name -# } -# self.json_post(req_url, data) - -# def set_flags(self, endp_name, flag_name, val): -# data = { -# "name": endp_name, -# "flag": flag_name, -# "val": val -# } -# self.json_post("cli-json/set_endp_flag", data, debug_=self.debug) - -# def set_cmd(self, endp_name, cmd): -# data = { -# "name": endp_name, -# "command": cmd -# } -# self.json_post("cli-json/set_gen_cmd", data, debug_=self.debug) - -# def create(self, ports=[], sleep_time=.5, debug_=False, suppress_related_commands_=None): -# if self.debug: -# debug_ = True -# post_data = [] -# endp_tpls = [] -# for port_name in ports: -# port_info = self.local_realm.name_to_eid(port_name) -# if len(port_info) == 2: -# resource = 1 -# shelf = port_info[0] -# name = port_info[-1] -# elif len(port_info) == 3: -# resource = port_info[0] -# shelf = port_info[1] -# name = port_info[-1] -# else: -# raise ValueError("Unexpected name for port_name %s" % port_name) - -# # this naming convention follows what you see when you use -# # lf_firemod.pl --action list_endp after creating a generic endpoint -# gen_name_a = "%s-%s" % (self.name_prefix, name) -# gen_name_b = "D_%s-%s" % (self.name_prefix, name) -# endp_tpls.append((shelf, resource, name, gen_name_a, gen_name_b)) - -# for endp_tpl in endp_tpls: -# shelf = endp_tpl[0] -# resource = endp_tpl[1] -# name = endp_tpl[2] -# gen_name_a = endp_tpl[3] -# # gen_name_b = endp_tpl[3] -# # (self, alias=None, shelf=1, resource=1, port=None, type=None) - -# data = { -# "alias": gen_name_a, -# "shelf": shelf, -# "resource": resource, -# "port": name, -# "type": "gen_generic" -# } -# if self.debug: -# pprint(data) - -# self.json_post("cli-json/add_gen_endp", data, debug_=self.debug) - -# self.local_realm.json_post("/cli-json/nc_show_endpoints", {"endpoint": "all"}) -# time.sleep(sleep_time) - -# for endp_tpl in endp_tpls: -# gen_name_a = endp_tpl[3] -# gen_name_b = endp_tpl[4] -# self.set_flags(gen_name_a, "ClearPortOnStart", 1) -# time.sleep(sleep_time) - -# for endp_tpl in endp_tpls: -# name = endp_tpl[2] -# gen_name_a = endp_tpl[3] -# # gen_name_b = endp_tpl[4] -# self.parse_command(name) -# self.set_cmd(gen_name_a, self.cmd) -# time.sleep(sleep_time) - -# for endp_tpl in endp_tpls: -# name = endp_tpl[2] -# gen_name_a = endp_tpl[3] -# gen_name_b = endp_tpl[4] -# cx_name = "CX_%s-%s" % (self.name_prefix, name) -# data = { -# "alias": cx_name, -# "test_mgr": "default_tm", -# "tx_endp": gen_name_a, -# "rx_endp": gen_name_b -# } -# post_data.append(data) -# self.created_cx.append(cx_name) -# self.created_endp.append(gen_name_a) -# self.created_endp.append(gen_name_b) - -# time.sleep(sleep_time) - -# for data in post_data: -# url = "/cli-json/add_cx" -# if self.debug: -# pprint(data) -# self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands_) -# time.sleep(2) -# time.sleep(sleep_time) -# for data in post_data: -# self.local_realm.json_post("/cli-json/show_cx", { -# "test_mgr": "default_tm", -# "cross_connect": data["alias"] -# }) -# time.sleep(sleep_time) - - -# class WifiMonitor: -# def __init__(self, lfclient_url, local_realm, up=True, debug_=False, resource_=1): -# self.debug = debug_ -# self.lfclient_url = lfclient_url -# self.up = up -# self.local_realm = local_realm -# self.monitor_name = None -# self.resource = resource_ -# self.flag_names = [] -# self.flag_mask_names = [] -# self.flags_mask = add_monitor.default_flags_mask -# self.aid = "NA" # used when sniffing /ax radios -# self.bsssid = "00:00:00:00:00:00" # used when sniffing on /ax radios - -# def create(self, resource_=1, channel=None, radio_="wiphy0", name_="moni0"): -# print("Creating monitor " + name_) -# self.monitor_name = name_ -# computed_flags = 0 -# for flag_n in self.flag_names: -# computed_flags += add_monitor.flags[flag_n] - -# # we want to query the existing country code of the radio -# # there's no reason to change it but we get hollering from server -# # if we don't provide a value for the parameter -# jr = self.local_realm.json_get("/radiostatus/1/%s/%s?fields=channel,frequency,country" % (resource_, radio_), -# debug_=self.debug) -# if jr is None: -# raise ValueError("No radio %s.%s found" % (resource_, radio_)) - -# eid = "1.%s.%s" % (resource_, radio_) -# frequency = 0 -# country = 0 -# if eid in jr: -# country = jr[eid]["country"] - -# data = { -# "shelf": 1, -# "resource": resource_, -# "radio": radio_, -# "mode": 0, # "NA", #0 for AUTO or "NA" -# "channel": channel, -# "country": country, -# "frequency": self.local_realm.channel_freq(channel_=channel) -# } -# self.local_realm.json_post("/cli-json/set_wifi_radio", _data=data) -# time.sleep(1) -# self.local_realm.json_post("/cli-json/add_monitor", { -# "shelf": 1, -# "resource": resource_, -# "radio": radio_, -# "ap_name": self.monitor_name, -# "flags": computed_flags, -# "flags_mask": self.flags_mask -# }) - -# def set_flag(self, param_name, value): -# if (param_name not in add_monitor.flags): -# raise ValueError("Flag '%s' does not exist for add_monitor, consult add_monitor.py" % param_name) -# if (value == 1) and (param_name not in self.flag_names): -# self.flag_names.append(param_name) -# elif (value == 0) and (param_name in self.flag_names): -# del self.flag_names[param_name] -# self.flags_mask |= add_monitor.flags[param_name] - -# def cleanup(self, resource_=1, desired_ports=None): -# print("Cleaning up monitors") -# if (desired_ports is None) or (len(desired_ports) < 1): -# if (self.monitor_name is None) or (self.monitor_name == ""): -# print("No monitor name set to delete") -# return -# LFUtils.removePort(resource=resource_, -# port_name=self.monitor_name, -# baseurl=self.lfclient_url, -# debug=self.debug) -# else: -# names = ",".join(desired_ports) -# existing_ports = self.local_realm.json_get("/port/1/%d/%s?fields=alias" % (resource_, names), debug_=False) -# if (existing_ports is None) or ("interfaces" not in existing_ports) or ("interface" not in existing_ports): -# print("No monitor names found to delete") -# return -# if ("interfaces" in existing_ports): -# for eid, info in existing_ports["interfaces"].items(): -# LFUtils.removePort(resource=resource_, -# port_name=info["alias"], -# baseurl=self.lfclient_url, -# debug=self.debug) -# if ("interface" in existing_ports): -# for eid, info in existing_ports["interface"].items(): -# LFUtils.removePort(resource=resource_, -# port_name=info["alias"], -# baseurl=self.lfclient_url, -# debug=self.debug) - -# def admin_up(self): -# up_request = LFUtils.port_up_request(resource_id=self.resource, port_name=self.monitor_name) -# self.local_realm.json_post("/cli-json/set_port", up_request) - -# def admin_down(self): -# down_request = LFUtils.portDownRequest(resource_id=self.resource, port_name=self.monitor_name) -# self.local_realm.json_post("/cli-json/set_port", down_request) - -# def start_sniff(self, capname=None, duration_sec=60): -# if capname is None: -# raise ValueError("Need a capture file name") -# data = { -# "shelf": 1, -# "resource": 1, -# "port": self.monitor_name, -# "display": "NA", -# "flags": 0x2, -# "outfile": capname, -# "duration": duration_sec -# } -# self.local_realm.json_post("/cli-json/sniff_port", _data=data) - - -# class VAPProfile(LFCliBase): -# def __init__(self, lfclient_host, lfclient_port, local_realm, -# vap_name="", -# ssid="NA", -# ssid_pass="NA", -# mode=0, -# debug_=False): -# super().__init__(_lfjson_host=lfclient_host, _lfjson_port=lfclient_port, _debug=debug_) -# self.debug = debug_ -# # self.lfclient_url = lfclient_url # done in super() -# self.ssid = ssid -# self.ssid_pass = ssid_pass -# self.mode = mode -# self.local_realm = local_realm -# self.vap_name = vap_name -# self.COMMANDS = ["add_vap", "set_port"] -# self.desired_add_vap_flags = ["wpa2_enable", "80211u_enable", "create_admin_down"] -# self.desired_add_vap_flags_mask = ["wpa2_enable", "80211u_enable", "create_admin_down"] - -# self.add_vap_data = { -# "shelf": 1, -# "resource": 1, -# "radio": None, -# "ap_name": None, -# "flags": 0, -# "flags_mask": 0, -# "mode": 0, -# "ssid": None, -# "key": None, -# "mac": "xx:xx:xx:xx:*:xx" -# } - -# self.desired_set_port_cmd_flags = [] -# self.desired_set_port_current_flags = ["if_down"] -# self.desired_set_port_interest_flags = ["current_flags", "ifdown"] -# self.set_port_data = { -# "shelf": 1, -# "resource": 1, -# "port": None, -# "current_flags": 0, -# "interest": 0, # (0x2 + 0x4000 + 0x800000) # current, dhcp, down -# } -# self.wifi_extra_data_modified = False -# self.wifi_extra_data = { -# "shelf": 1, -# "resource": 1, -# "port": None, -# "key_mgmt": None, -# "eap": None, -# "hessid": None, -# "identity": None, -# "password": None, -# "realm": None, -# "domain": None -# } - -# def set_wifi_extra(self, -# key_mgmt="WPA-EAP", -# pairwise="DEFAULT", -# group="DEFAULT", -# psk="[BLANK]", -# eap="TTLS", -# identity="testuser", -# passwd="testpasswd", -# realm="localhost.localdomain", -# domain="localhost.localdomain", -# hessid="00:00:00:00:00:01"): -# self.wifi_extra_data_modified = True -# self.wifi_extra_data["key_mgmt"] = key_mgmt -# self.wifi_extra_data["eap"] = eap -# self.wifi_extra_data["identity"] = identity -# self.wifi_extra_data["password"] = passwd -# self.wifi_extra_data["realm"] = realm -# self.wifi_extra_data["domain"] = domain -# self.wifi_extra_data["hessid"] = hessid - -# def admin_up(self, resource): -# set_port_r = LFRequest.LFRequest(self.lfclient_url, "/cli-json/set_port", debug_=self.debug) -# req_json = LFUtils.portUpRequest(resource, None, debug_on=self.debug) -# req_json["port"] = self.vap_name -# set_port_r.addPostData(req_json) -# json_response = set_port_r.jsonPost(self.debug) -# time.sleep(0.03) - -# def admin_down(self, resource): -# set_port_r = LFRequest.LFRequest(self.lfclient_url, "/cli-json/set_port", debug_=self.debug) -# req_json = LFUtils.port_down_request(resource, None, debug_on=self.debug) -# req_json["port"] = self.vap_name -# set_port_r.addPostData(req_json) -# json_response = set_port_r.jsonPost(self.debug) -# time.sleep(0.03) - -# def use_security(self, security_type, ssid=None, passwd=None): -# types = {"wep": "wep_enable", "wpa": "wpa_enable", "wpa2": "wpa2_enable", "wpa3": "use-wpa3", "open": "[BLANK]"} -# self.add_vap_data["ssid"] = ssid -# if security_type in types.keys(): -# if (ssid is None) or (ssid == ""): -# raise ValueError("use_security: %s requires ssid" % security_type) -# if (passwd is None) or (passwd == ""): -# raise ValueError("use_security: %s requires passphrase or [BLANK]" % security_type) -# for name in types.values(): -# if name in self.desired_add_vap_flags and name in self.desired_add_vap_flags_mask: -# self.desired_add_vap_flags.remove(name) -# self.desired_add_vap_flags_mask.remove(name) -# if security_type != "open": -# self.desired_add_vap_flags.append(types[security_type]) -# self.desired_add_vap_flags_mask.append(types[security_type]) -# else: -# passwd = "[BLANK]" -# self.set_command_param("add_vap", "ssid", ssid) -# self.set_command_param("add_vap", "key", passwd) -# # unset any other security flag before setting our present flags -# if security_type == "wpa3": -# self.set_command_param("add_vap", "ieee80211w", 2) - -# def set_command_flag(self, command_name, param_name, value): -# # we have to check what the param name is -# if (command_name is None) or (command_name == ""): -# return -# if (param_name is None) or (param_name == ""): -# return -# if command_name not in self.COMMANDS: -# print("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS)) -# return -# if command_name == "add_vap": -# if (param_name not in add_vap.add_vap_flags): -# print("Parameter name [%s] not defined in add_vap.py" % param_name) -# if self.debug: -# pprint(add_vap.add_vap_flags) -# return -# if (value == 1) and (param_name not in self.desired_add_vap_flags): -# self.desired_add_vap_flags.append(param_name) -# self.desired_add_vap_flags_mask.append(param_name) -# elif value == 0: -# self.desired_add_vap_flags.remove(param_name) -# self.desired_add_vap_flags_mask.append(param_name) - -# elif command_name == "set_port": -# if (param_name not in set_port.set_port_current_flags) and ( -# param_name not in set_port.set_port_cmd_flags) and ( -# param_name not in set_port.set_port_interest_flags): -# print("Parameter name [%s] not defined in set_port.py" % param_name) -# if self.debug: -# pprint(set_port.set_port_cmd_flags) -# pprint(set_port.set_port_current_flags) -# pprint(set_port.set_port_interest_flags) -# return -# if param_name in set_port.set_port_cmd_flags: -# if (value == 1) and (param_name not in self.desired_set_port_cmd_flags): -# self.desired_set_port_cmd_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_cmd_flags.remove(param_name) -# elif param_name in set_port.set_port_current_flags: -# if (value == 1) and (param_name not in self.desired_set_port_current_flags): -# self.desired_set_port_current_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_current_flags.remove(param_name) -# elif param_name in set_port.set_port_interest_flags: -# if (value == 1) and (param_name not in self.desired_set_port_interest_flags): -# self.desired_set_port_interest_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_interest_flags.remove(param_name) -# else: -# raise ValueError("Unknown param name: " + param_name) - -# def set_command_param(self, command_name, param_name, param_value): -# # we have to check what the param name is -# if (command_name is None) or (command_name == ""): -# return -# if (param_name is None) or (param_name == ""): -# return -# if command_name not in self.COMMANDS: -# self.error("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS)) -# return -# if command_name == "add_vap": -# self.add_vap_data[param_name] = param_value -# elif command_name == "set_port": -# self.set_port_data[param_name] = param_value - -# def add_named_flags(self, desired_list, command_ref): -# if desired_list is None: -# raise ValueError("addNamedFlags wants a list of desired flag names") -# if len(desired_list) < 1: -# print("addNamedFlags: empty desired list") -# return 0 -# if (command_ref is None) or (len(command_ref) < 1): -# raise ValueError("addNamedFlags wants a maps of flag values") - -# result = 0 -# for name in desired_list: -# if (name is None) or (name == ""): -# continue -# if name not in command_ref: -# if self.debug: -# pprint(command_ref) -# raise ValueError("flag %s not in map" % name) -# # print("add-named-flags: %s %i"%(name, command_ref[name])) -# result |= command_ref[name] - -# return result - -# def create(self, resource, radio, channel=None, up_=None, debug=False, use_ht40=True, use_ht80=True, -# use_ht160=False, -# suppress_related_commands_=True, use_radius=False, hs20_enable=False): -# port_list = self.local_realm.json_get("port/1/1/list") -# if port_list is not None: -# port_list = port_list['interfaces'] -# for port in port_list: -# for k, v in port.items(): -# if v['alias'] == self.vap_name: -# self.local_realm.rm_port(k, check_exists=True) -# if use_ht160: -# self.desired_add_vap_flags.append("enable_80211d") -# self.desired_add_vap_flags_mask.append("enable_80211d") -# self.desired_add_vap_flags.append("80211h_enable") -# self.desired_add_vap_flags_mask.append("80211h_enable") -# self.desired_add_vap_flags.append("ht160_enable") -# self.desired_add_vap_flags_mask.append("ht160_enable") -# if not use_ht40: -# self.desired_add_vap_flags.append("disable_ht40") -# self.desired_add_vap_flags_mask.append("disable_ht40") -# if not use_ht80: -# self.desired_add_vap_flags.append("disable_ht80") -# self.desired_add_vap_flags_mask.append("disable_ht80") -# if use_radius: -# self.desired_add_vap_flags.append("8021x_radius") -# self.desired_add_vap_flags_mask.append("8021x_radius") -# if hs20_enable: -# self.desired_add_vap_flags.append("hs20_enable") -# self.desired_add_vap_flags_mask.append("hs20_enable") - -# # print("MODE ========= ", self.mode) - -# jr = self.local_realm.json_get("/radiostatus/1/%s/%s?fields=channel,frequency,country" % (resource, radio), -# debug_=self.debug) -# if jr is None: -# raise ValueError("No radio %s.%s found" % (resource, radio)) - -# eid = "1.%s.%s" % (resource, radio) -# frequency = 0 -# country = 0 -# if eid in jr: -# country = jr[eid]["country"] - -# data = { -# "shelf": 1, -# "resource": resource, -# "radio": radio, -# "mode": self.mode, # "NA", #0 for AUTO or "NA" -# "channel": channel, -# "country": country, -# "frequency": self.local_realm.channel_freq(channel_=channel) -# } -# self.local_realm.json_post("/cli-json/set_wifi_radio", _data=data) -# if up_ is not None: -# self.up = up_ -# if self.up: -# if "create_admin_down" in self.desired_add_vap_flags: -# del self.desired_add_vap_flags[self.desired_add_vap_flags.index("create_admin_down")] -# elif "create_admin_down" not in self.desired_add_vap_flags: -# self.desired_add_vap_flags.append("create_admin_down") - -# # create vaps down, do set_port on them, then set vaps up -# self.add_vap_data["mode"] = self.mode -# self.add_vap_data["flags"] = self.add_named_flags(self.desired_add_vap_flags, add_vap.add_vap_flags) -# self.add_vap_data["flags_mask"] = self.add_named_flags(self.desired_add_vap_flags_mask, add_vap.add_vap_flags) -# self.add_vap_data["radio"] = radio - -# self.add_vap_data["resource"] = resource -# self.set_port_data["current_flags"] = self.add_named_flags(self.desired_set_port_current_flags, -# set_port.set_port_current_flags) -# self.set_port_data["interest"] = self.add_named_flags(self.desired_set_port_interest_flags, -# set_port.set_port_interest_flags) -# # these are unactivated LFRequest objects that we can modify and -# # re-use inside a loop, reducing the number of object creations -# add_vap_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/add_vap") -# set_port_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/set_port") -# wifi_extra_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/set_wifi_extra") -# if suppress_related_commands_: -# self.add_vap_data["suppress_preexec_cli"] = "yes" -# self.add_vap_data["suppress_preexec_method"] = 1 -# self.set_port_data["suppress_preexec_cli"] = "yes" -# self.set_port_data["suppress_preexec_method"] = 1 - -# # pprint(self.station_names) -# # exit(1) -# self.set_port_data["port"] = self.vap_name -# self.add_vap_data["ap_name"] = self.vap_name -# add_vap_r.addPostData(self.add_vap_data) -# if debug: -# print("- 1502 - %s- - - - - - - - - - - - - - - - - - " % self.vap_name) -# pprint(self.add_vap_data) -# pprint(self.set_port_data) -# pprint(add_vap_r) -# print("- ~1502 - - - - - - - - - - - - - - - - - - - ") - -# json_response = add_vap_r.jsonPost(debug) -# # time.sleep(0.03) -# time.sleep(2) -# set_port_r.addPostData(self.set_port_data) -# json_response = set_port_r.jsonPost(debug) -# time.sleep(0.03) - -# self.wifi_extra_data["resource"] = resource -# self.wifi_extra_data["port"] = self.vap_name -# if self.wifi_extra_data_modified: -# wifi_extra_r.addPostData(self.wifi_extra_data) -# json_response = wifi_extra_r.jsonPost(debug) - -# port_list = self.local_realm.json_get("port/1/1/list") -# if port_list is not None: -# port_list = port_list['interfaces'] -# for port in port_list: -# for k, v in port.items(): -# if v['alias'] == 'br0': -# self.local_realm.rm_port(k, check_exists=True) -# time.sleep(5) - -# # create bridge -# data = { -# "shelf": 1, -# "resource": resource, -# "port": "br0", -# "network_devs": "eth1,%s" % self.vap_name -# } -# self.local_realm.json_post("cli-json/add_br", data) - -# bridge_set_port = { -# "shelf": 1, -# "resource": 1, -# "port": "br0", -# "current_flags": 0x80000000, -# "interest": 0x4000 # (0x2 + 0x4000 + 0x800000) # current, dhcp, down -# } -# self.local_realm.json_post("cli-json/set_port", bridge_set_port) - -# if (self.up): -# self.admin_up(1) - -# def cleanup(self, resource, delay=0.03): -# print("Cleaning up VAPs") -# desired_ports = ["1.%s.%s" % (resource, self.vap_name), "1.%s.br0" % resource] - -# del_count = len(desired_ports) - -# # First, request remove on the list. -# for port_eid in desired_ports: -# self.local_realm.rm_port(port_eid, check_exists=True) - -# # And now see if they are gone -# LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=desired_ports) - - - - -# class DUTProfile(LFCliBase): -# def __init__(self, lfclient_host, lfclient_port, local_realm, debug_=False): -# super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True, _local_realm=local_realm) -# self.name = "NA" -# self.flags = "NA" -# self.img_file = "NA" -# self.sw_version = "NA" -# self.hw_version = "NA" -# self.model_num = "NA" -# self.serial_num = "NA" -# self.serial_port = "NA" -# self.wan_port = "NA" -# self.lan_port = "NA" -# self.ssid1 = "NA" -# self.ssid2 = "NA" -# self.ssid3 = "NA" -# self.passwd1 = "NA" -# self.passwd2 = "NA" -# self.passwd3 = "NA" -# self.mgt_ip = "NA" -# self.api_id = "NA" -# self.flags_mask = "NA" -# self.antenna_count1 = "NA" -# self.antenna_count2 = "NA" -# self.antenna_count3 = "NA" -# self.bssid1 = "NA" -# self.bssid2 = "NA" -# self.bssid3 = "NA" -# self.top_left_x = "NA" -# self.top_left_y = "NA" -# self.eap_id = "NA" -# self.flags = 0 -# self.flags_mask = 0 -# self.notes = [] -# self.append = [] - -# def set_param(self, name, value): -# if (name in self.__dict__): -# self.__dict__[name] = value - -# def create(self, name=None, param_=None, flags=None, flags_mask=None, notes=None): -# data = {} -# if (name is not None) and (name != ""): -# data["name"] = name -# elif (self.name is not None) and (self.name != ""): -# data["name"] = self.name -# else: -# raise ValueError("cannot create/update DUT record lacking a name") - -# for param in add_dut.dut_params: -# if (param.name in self.__dict__): -# if (self.__dict__[param.name] is not None) \ -# and (self.__dict__[param.name] != "NA"): -# data[param.name] = self.__dict__[param.name] -# else: -# print("---------------------------------------------------------") -# pprint(self.__dict__[param.name]) -# print("---------------------------------------------------------") -# raise ValueError("parameter %s not in dut_profile" % param) - -# if (flags is not None) and (int(flags) > -1): -# data["flags"] = flags -# elif (self.flags is not None) and (self.flags > -1): -# data["flags"] = self.flags - -# if (flags_mask is not None) and (int(flags_mask) > -1): -# data["flags_mask"] = flags_mask -# elif (self.flags_mask is not None) and (int(self.flags_mask) > -1): -# data["flags_mask"] = self.flags_mask - -# url = "/cli-json/add_dut" -# if self.debug: -# print("---- DATA -----------------------------------------------") -# pprint(data) -# pprint(self.notes) -# pprint(self.append) -# print("---------------------------------------------------------") -# self.json_post(url, data, debug_=self.debug) - -# if (self.notes is not None) and (len(self.notes) > 0): -# self.json_post("/cli-json/add_dut_notes", { -# "dut": self.name, -# "text": "[BLANK]" -# }, self.debug) -# notebytes = None -# for line in self.notes: -# notebytes = base64.b64encode(line.encode('ascii')) -# if self.debug: -# print("------ NOTES ---------------------------------------------------") -# pprint(self.notes) -# pprint(str(notebytes)) -# print("---------------------------------------------------------") -# self.json_post("/cli-json/add_dut_notes", { -# "dut": self.name, -# "text-64": notebytes.decode('ascii') -# }, self.debug) -# if (self.append is not None) and (len(self.append) > 0): -# notebytes = None -# for line in self.append: -# notebytes = base64.b64encode(line.encode('ascii')) -# if self.debug: -# print("----- APPEND ----------------------------------------------------") -# pprint(line) -# pprint(str(notebytes)) -# print("---------------------------------------------------------") -# self.json_post("/cli-json/add_dut_notes", { -# "dut": self.name, -# "text-64": notebytes.decode('ascii') -# }, self.debug) - - -# class TestGroupProfile(LFCliBase): -# def __init__(self, lfclient_host, lfclient_port, local_realm, test_group_name=None, debug_=False): -# super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) -# self.local_realm = local_realm -# self.group_name = test_group_name -# self.cx_list = [] - -# def start_group(self): -# if self.group_name is not None: -# self.local_realm.json_post("/cli-json/start_group", {"name": self.group_name}) -# else: -# raise ValueError("test_group name must be set.") - -# def quiesce_group(self): -# if self.group_name is not None: -# self.local_realm.json_post("/cli-json/quiesce_group", {"name": self.group_name}) -# else: -# raise ValueError("test_group name must be set.") - -# def stop_group(self): -# if self.group_name is not None: -# self.local_realm.json_post("/cli-json/stop_group", {"name": self.group_name}) -# else: -# raise ValueError("test_group name must be set.") - -# def create_group(self): -# if self.group_name is not None: -# self.local_realm.json_post("/cli-json/add_group", {"name": self.group_name}) -# else: -# raise ValueError("test_group name must be set.") - -# def rm_group(self): -# if self.group_name is not None: -# self.local_realm.json_post("/cli-json/rm_group", {"name": self.group_name}) -# else: -# raise ValueError("test_group name must be set.") - -# def add_cx(self, cx_name): -# self.local_realm.json_post("/cli-json/add_tgcx", {"tgname": self.group_name, "cxname": cx_name}) - -# def rm_cx(self, cx_name): -# self.local_realm.json_post("/cli-json/rm_tgcx", {"tgname": self.group_name, "cxname": cx_name}) - -# def check_group_exists(self): -# test_groups = self.local_realm.json_get("/testgroups/all") -# if test_groups is not None and "groups" in test_groups: -# test_groups = test_groups["groups"] -# for group in test_groups: -# for k, v in group.items(): -# if v['name'] == self.group_name: -# return True -# else: -# return False - -# def list_groups(self): -# test_groups = self.local_realm.json_get("/testgroups/all") -# tg_list = [] -# if test_groups is not None: -# test_groups = test_groups["groups"] -# for group in test_groups: -# for k, v in group.items(): -# tg_list.append(v['name']) -# return tg_list - -# def list_cxs(self): -# test_groups = self.local_realm.json_get("/testgroups/all") -# if test_groups is not None: -# test_groups = test_groups["groups"] -# for group in test_groups: -# for k, v in group.items(): -# if v['name'] == self.group_name: -# return v['cross connects'] - -# else: -# return [] - - -# class FIOEndpProfile(LFCliBase): -# """ -# Very often you will create the FileIO writer profile first so that it creates the data -# that a reader profile will subsequently use. -# """ - -# def __init__(self, lfclient_host, lfclient_port, local_realm, io_direction="write", debug_=False): -# super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) -# self.local_realm = local_realm -# self.fs_type = "fe_nfsv4" -# self.min_rw_size = 128 * 1024 -# self.max_rw_size = 128 * 1024 -# self.min_file_size = 10 * 1024 * 1024 -# self.max_file_size = 10 * 1024 * 1024 - -# self.min_read_rate_bps = 10 * 1000 * 1000 -# self.max_read_rate_bps = 10 * 1000 * 1000 -# self.min_write_rate_bps = 1000 * 1000 * 1000 -# self.max_write_rate_bps = 1000 * 1000 * 1000 - -# self.file_num = 10 # number of files to write -# self.directory = None # directory like /mnt/lf/$endp_name - -# # this refers to locally mounted directories presently used for writing -# # you would set this when doing read tests simultaneously to write tests -# # so like, if your endpoint names are like wo_300GB_001, your Directory value -# # defaults to /mnt/lf/wo_300GB_001; but reader enpoint would be named -# # /mnt/lf/ro_300GB_001, this overwrites a readers directory name to wo_300GB_001 -# self.mount_dir = "AUTO" - -# self.server_mount = None # like cifs://10.0.0.1/bashful or 192.168.1.1:/var/tmp -# self.mount_options = None -# self.iscsi_vol = None -# self.retry_timer_ms = 2000 -# self.io_direction = io_direction # read / write -# self.quiesce_ms = 3000 -# self.pattern = "increasing" -# self.file_prefix = "AUTO" # defaults to endp_name -# self.cx_prefix = "wo_" - -# self.created_cx = {} -# self.created_endp = [] - -# def start_cx(self): -# print("Starting CXs...") -# for cx_name in self.created_cx.keys(): -# self.json_post("/cli-json/set_cx_state", { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name], -# "cx_state": "RUNNING" -# }, debug_=self.debug) -# print(".", end='') -# print("") - -# def stop_cx(self): -# print("Stopping CXs...") -# for cx_name in self.created_cx.keys(): -# self.json_post("/cli-json/set_cx_state", { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name], -# "cx_state": "STOPPED" -# }, debug_=self.debug) -# print(".", end='') -# print("") - -# def create_ro_profile(self): -# ro_profile = self.local_realm.new_fio_endp_profile() -# ro_profile.realm = self.local_realm - -# ro_profile.fs_type = self.fs_type -# ro_profile.min_read_rate_bps = self.min_write_rate_bps -# ro_profile.max_read_rate_bps = self.max_write_rate_bps -# ro_profile.min_write_rate_bps = self.min_read_rate_bps -# ro_profile.max_write_rate_bps = self.max_read_rate_bps -# ro_profile.file_num = self.file_num -# ro_profile.directory = self.directory -# ro_profile.mount_dir = self.directory -# ro_profile.server_mount = self.server_mount -# ro_profile.mount_options = self.mount_options -# ro_profile.iscsi_vol = self.iscsi_vol -# ro_profile.retry_timer_ms = self.retry_timer_ms -# ro_profile.io_direction = "read" -# ro_profile.quiesce_ms = self.quiesce_ms -# ro_profile.pattern = self.pattern -# ro_profile.file_prefix = self.file_prefix -# ro_profile.cx_prefix = "ro_" -# return ro_profile - -# def cleanup(self): -# print("Cleaning up cxs and endpoints") -# if len(self.created_cx) != 0: -# for cx_name in self.created_cx.keys(): -# req_url = "cli-json/rm_cx" -# data = { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name] -# } -# self.json_post(req_url, data) -# # pprint(data) -# req_url = "cli-json/rm_endp" -# data = { -# "endp_name": cx_name -# } -# self.json_post(req_url, data) -# # pprint(data) - -# def create(self, ports=[], connections_per_port=1, sleep_time=.5, debug_=False, suppress_related_commands_=None): -# cx_post_data = [] -# for port_name in ports: -# for num_connection in range(connections_per_port): -# if len(self.local_realm.name_to_eid(port_name)) == 3: -# shelf = self.local_realm.name_to_eid(port_name)[0] -# resource = self.local_realm.name_to_eid(port_name)[1] -# name = self.local_realm.name_to_eid(port_name)[2] -# else: -# raise ValueError("Unexpected name for port_name %s" % port_name) -# if self.directory is None or self.server_mount is None or self.fs_type is None: -# raise ValueError("directory [%s], server_mount [%s], and type [%s] must not be None" % ( -# self.directory, self.server_mount, self.fs_type)) -# endp_data = { -# "alias": self.cx_prefix + name + "_" + str(num_connection) + "_fio", -# "shelf": shelf, -# "resource": resource, -# "port": name, -# "type": self.fs_type, -# "min_read_rate": self.min_read_rate_bps, -# "max_read_rate": self.max_read_rate_bps, -# "min_write_rate": self.min_write_rate_bps, -# "max_write_rate": self.max_write_rate_bps, -# "directory": self.directory, -# "server_mount": self.server_mount, -# "mount_dir": self.mount_dir, -# "prefix": self.file_prefix, -# "payload_pattern": self.pattern, - -# } -# # Read direction is copy of write only directory -# if self.io_direction == "read": -# endp_data["prefix"] = "wo_" + name + "_" + str(num_connection) + "_fio" -# endp_data["directory"] = "/mnt/lf/wo_" + name + "_" + str(num_connection) + "_fio" - -# url = "cli-json/add_file_endp" -# self.local_realm.json_post(url, endp_data, debug_=False, -# suppress_related_commands_=suppress_related_commands_) -# time.sleep(sleep_time) - -# data = { -# "name": self.cx_prefix + name + "_" + str(num_connection) + "_fio", -# "io_direction": self.io_direction, -# "num_files": 5 -# } -# self.local_realm.json_post("cli-json/set_fe_info", data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands_) - -# self.local_realm.json_post("/cli-json/nc_show_endpoints", {"endpoint": "all"}) -# for port_name in ports: -# for num_connection in range(connections_per_port): -# if len(self.local_realm.name_to_eid(port_name)) == 3: -# shelf = self.local_realm.name_to_eid(port_name)[0] -# resource = self.local_realm.name_to_eid(port_name)[1] -# name = self.local_realm.name_to_eid(port_name)[2] - -# endp_data = { -# "alias": "CX_" + self.cx_prefix + name + "_" + str(num_connection) + "_fio", -# "test_mgr": "default_tm", -# "tx_endp": self.cx_prefix + name + "_" + str(num_connection) + "_fio", -# "rx_endp": "NA" -# } -# cx_post_data.append(endp_data) -# self.created_cx[self.cx_prefix + name + "_" + str( -# num_connection) + "_fio"] = "CX_" + self.cx_prefix + name + "_" + str(num_connection) + "_fio" - -# for cx_data in cx_post_data: -# url = "/cli-json/add_cx" -# self.local_realm.json_post(url, cx_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands_) -# time.sleep(sleep_time) - - -# class MACVLANProfile(LFCliBase): -# def __init__(self, lfclient_host, lfclient_port, -# local_realm, -# macvlan_parent="eth1", -# num_macvlans=1, -# admin_down=False, -# dhcp=False, -# debug_=False): -# super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) -# self.local_realm = local_realm -# self.num_macvlans = num_macvlans -# self.macvlan_parent = macvlan_parent -# self.resource = 1 -# self.shelf = 1 -# self.desired_macvlans = [] -# self.created_macvlans = [] -# self.dhcp = dhcp -# self.netmask = None -# self.first_ip_addr = None -# self.gateway = None -# self.ip_list = [] -# self.COMMANDS = ["set_port"] -# self.desired_set_port_cmd_flags = [] -# self.desired_set_port_current_flags = [] # do not default down, "if_down" -# self.desired_set_port_interest_flags = ["current_flags"] # do not default down, "ifdown" -# self.set_port_data = { -# "shelf": 1, -# "resource": 1, -# "port": None, -# "current_flags": 0, -# "interest": 0, # (0x2 + 0x4000 + 0x800000) # current, dhcp, down, -# } - -# def add_named_flags(self, desired_list, command_ref): -# if desired_list is None: -# raise ValueError("addNamedFlags wants a list of desired flag names") -# if len(desired_list) < 1: -# print("addNamedFlags: empty desired list") -# return 0 -# if (command_ref is None) or (len(command_ref) < 1): -# raise ValueError("addNamedFlags wants a maps of flag values") - -# result = 0 -# for name in desired_list: -# if (name is None) or (name == ""): -# continue -# if name not in command_ref: -# if self.debug: -# pprint(command_ref) -# raise ValueError("flag %s not in map" % name) -# result += command_ref[name] - -# return result - -# def set_command_param(self, command_name, param_name, param_value): -# # we have to check what the param name is -# if (command_name is None) or (command_name == ""): -# return -# if (param_name is None) or (param_name == ""): -# return -# if command_name not in self.COMMANDS: -# raise ValueError("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS)) -# # return -# if command_name == "set_port": -# self.set_port_data[param_name] = param_value - -# def set_command_flag(self, command_name, param_name, value): -# # we have to check what the param name is -# if (command_name is None) or (command_name == ""): -# return -# if (param_name is None) or (param_name == ""): -# return -# if command_name not in self.COMMANDS: -# print("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS)) -# return - -# elif command_name == "set_port": -# if (param_name not in set_port.set_port_current_flags) and ( -# param_name not in set_port.set_port_cmd_flags) and ( -# param_name not in set_port.set_port_interest_flags): -# print("Parameter name [%s] not defined in set_port.py" % param_name) -# if self.debug: -# pprint(set_port.set_port_cmd_flags) -# pprint(set_port.set_port_current_flags) -# pprint(set_port.set_port_interest_flags) -# return -# if (param_name in set_port.set_port_cmd_flags): -# if (value == 1) and (param_name not in self.desired_set_port_cmd_flags): -# self.desired_set_port_cmd_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_cmd_flags.remove(param_name) -# elif (param_name in set_port.set_port_current_flags): -# if (value == 1) and (param_name not in self.desired_set_port_current_flags): -# self.desired_set_port_current_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_current_flags.remove(param_name) -# elif (param_name in set_port.set_port_interest_flags): -# if (value == 1) and (param_name not in self.desired_set_port_interest_flags): -# self.desired_set_port_interest_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_interest_flags.remove(param_name) -# else: -# raise ValueError("Unknown param name: " + param_name) - -# def create(self, admin_down=False, debug=False, sleep_time=1): -# print("Creating MACVLANs...") -# req_url = "/cli-json/add_mvlan" - -# if not self.dhcp and self.first_ip_addr is not None and self.netmask is not None and self.gateway is not None: -# self.desired_set_port_interest_flags.append("ip_address") -# self.desired_set_port_interest_flags.append("ip_Mask") -# self.desired_set_port_interest_flags.append("ip_gateway") -# self.ip_list = LFUtils.gen_ip_series(ip_addr=self.first_ip_addr, netmask=self.netmask, -# num_ips=self.num_macvlans) - -# self.set_port_data["current_flags"] = self.add_named_flags(self.desired_set_port_current_flags, -# set_port.set_port_current_flags) -# self.set_port_data["interest"] = self.add_named_flags(self.desired_set_port_interest_flags, -# set_port.set_port_interest_flags) -# set_port_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/set_port") - -# if self.dhcp: -# print("Using DHCP") -# self.desired_set_port_current_flags.append("use_dhcp") -# self.desired_set_port_interest_flags.append("dhcp") - -# for i in range(len(self.desired_macvlans)): -# data = { -# "shelf": self.shelf, -# "resource": self.resource, -# "mac": "xx:xx:xx:*:*:xx", -# "port": self.local_realm.name_to_eid(self.macvlan_parent)[2], -# "index": int(self.desired_macvlans[i][self.desired_macvlans[i].index('#') + 1:]), -# "flags": None -# } -# if admin_down: -# data["flags"] = 1 -# else: -# data["flags"] = 0 -# self.created_macvlans.append("%s.%s.%s#%d" % (self.shelf, self.resource, -# self.macvlan_parent, int( -# self.desired_macvlans[i][self.desired_macvlans[i].index('#') + 1:]))) -# self.local_realm.json_post(req_url, data) -# time.sleep(sleep_time) - -# LFUtils.wait_until_ports_appear(base_url=self.lfclient_url, port_list=self.created_macvlans) -# print(self.created_macvlans) - -# time.sleep(5) - -# for i in range(len(self.created_macvlans)): -# eid = self.local_realm.name_to_eid(self.created_macvlans[i]) -# name = eid[2] -# self.set_port_data["port"] = name # for set_port calls. -# if not self.dhcp and self.first_ip_addr is not None and self.netmask is not None \ -# and self.gateway is not None: -# self.set_port_data["ip_addr"] = self.ip_list[i] -# self.set_port_data["netmask"] = self.netmask -# self.set_port_data["gateway"] = self.gateway -# set_port_r.addPostData(self.set_port_data) -# json_response = set_port_r.jsonPost(debug) -# time.sleep(sleep_time) - -# def cleanup(self): -# print("Cleaning up MACVLANs...") -# print(self.created_macvlans) -# for port_eid in self.created_macvlans: -# self.local_realm.rm_port(port_eid, check_exists=True) -# time.sleep(.02) -# # And now see if they are gone -# LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=self.created_macvlans) - -# def admin_up(self): -# for macvlan in self.created_macvlans: -# self.local_realm.admin_up(macvlan) - -# def admin_down(self): -# for macvlan in self.created_macvlans: -# self.local_realm.admin_down(macvlan) - - class PacketFilter(): def get_filter_wlan_assoc_packets(self, ap_mac, sta_mac): @@ -2630,673 +940,3 @@ class PacketFilter(): lines.append(line.rstrip()) return lines - - -# class PortUtils(): -# def __init__(self, local_realm): -# self.local_realm = local_realm - -# def set_ftp(self, port_name="", resource=1, on=False): -# if port_name != "": -# data = { -# "shelf": 1, -# "resource": resource, -# "port": port_name, -# "current_flags": 0, -# "interest": 0 -# } - -# if on: -# data["current_flags"] = 0x400000000000 -# data["interest"] = 0x10000000 -# else: -# data["interest"] = 0x10000000 - -# self.local_realm.json_post("cli-json/set_port", data) -# else: -# raise ValueError("Port name required") - -# def set_http(self, port_name="", resource=1, on=False): -# if port_name != "": -# data = { -# "shelf": 1, -# "resource": resource, -# "port": port_name, -# "current_flags": 0, -# "interest": 0 -# } - -# if on: -# data["current_flags"] = 0x200000000000 -# data["interest"] = 0x8000000 -# else: -# data["interest"] = 0x8000000 - -# self.local_realm.json_post("cli-json/set_port", data) -# else: -# raise ValueError("Port name required") - - -# class HTTPProfile(LFCliBase): -# def __init__(self, lfclient_host, lfclient_port, local_realm, debug_=False): -# super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) -# self.lfclient_url = "http://%s:%s" % (lfclient_host, lfclient_port) -# self.debug = debug_ -# self.requests_per_ten = 600 -# self.local_realm = local_realm -# self.created_cx = {} -# self.created_endp = [] -# self.ip_map = {} -# self.direction = "dl" -# self.dest = "/dev/null" -# self.port_util = PortUtils(self.local_realm) - -# def check_errors(self, debug=False): -# fields_list = ["!conn", "acc.+denied", "bad-proto", "bad-url", "other-err", "total-err", "rslv-p", "rslv-h", -# "timeout", "nf+(4xx)", "http-r", "http-p", "http-t", "login-denied"] -# endp_list = self.json_get("layer4/list?fields=%s" % ','.join(fields_list)) -# debug_info = {} -# if endp_list is not None and endp_list['endpoint'] is not None: -# endp_list = endp_list['endpoint'] -# expected_passes = len(endp_list) -# passes = len(endp_list) -# for item in range(len(endp_list)): -# for name, info in endp_list[item].items(): -# for field in fields_list: -# if info[field.replace("+", " ")] > 0: -# passes -= 1 -# debug_info[name] = {field: info[field.replace("+", " ")]} -# if debug: -# print(debug_info) -# if passes == expected_passes: -# return True -# else: -# print(list(debug_info), " Endps in this list showed errors getting to its URL") # %s") % self.url) -# return False - -# def start_cx(self): -# print("Starting CXs...") -# for cx_name in self.created_cx.keys(): -# self.json_post("/cli-json/set_cx_state", { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name], -# "cx_state": "RUNNING" -# }, debug_=self.debug) -# print(".", end='') -# print("") - -# def stop_cx(self): -# print("Stopping CXs...") -# for cx_name in self.created_cx.keys(): -# self.json_post("/cli-json/set_cx_state", { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name], -# "cx_state": "STOPPED" -# }, debug_=self.debug) -# print(".", end='') -# print("") - -# def cleanup(self): -# print("Cleaning up cxs and endpoints") -# if len(self.created_cx) != 0: -# for cx_name in self.created_cx.keys(): -# req_url = "cli-json/rm_cx" -# data = { -# "test_mgr": "default_tm", -# "cx_name": self.created_cx[cx_name] -# } -# self.json_post(req_url, data) -# # pprint(data) -# req_url = "cli-json/rm_endp" -# data = { -# "endp_name": cx_name -# } -# self.json_post(req_url, data) -# # pprint(data) - -# def map_sta_ips(self, sta_list=[]): -# for sta_eid in sta_list: -# eid = self.local_realm.name_to_eid(sta_eid) -# sta_list = self.json_get("/port/%s/%s/%s?fields=alias,ip" % -# (eid[0], eid[1], eid[2])) -# if sta_list['interface'] is not None: -# self.ip_map[sta_list['interface']['alias']] = sta_list['interface']['ip'] - -# def create(self, ports=[], sleep_time=.5, debug_=False, suppress_related_commands_=None, http=False, ftp=False, -# user=None, passwd=None, source=None): -# cx_post_data = [] -# self.map_sta_ips(ports) -# for i in range(len(list(self.ip_map))): -# url = None -# if i != len(list(self.ip_map)) - 1: -# port_name = list(self.ip_map)[i] -# ip_addr = self.ip_map[list(self.ip_map)[i + 1]] -# else: -# port_name = list(self.ip_map)[i] -# ip_addr = self.ip_map[list(self.ip_map)[0]] - -# if (ip_addr is None) or (ip_addr == ""): -# raise ValueError("HTTPProfile::create encountered blank ip/hostname") - -# if len(self.local_realm.name_to_eid(port_name)) == 3: -# shelf = self.local_realm.name_to_eid(port_name)[0] -# resource = self.local_realm.name_to_eid(port_name)[1] -# name = self.local_realm.name_to_eid(port_name)[2] -# else: -# raise ValueError("Unexpected name for port_name %s" % port_name) - -# if http: -# self.port_util.set_http(port_name=name, resource=resource, on=True) -# url = "%s http://%s/ %s" % (self.direction, ip_addr, self.dest) -# if ftp: -# self.port_util.set_ftp(port_name=name, resource=resource, on=True) -# if user is not None and passwd is not None and source is not None: -# url = "%s ftp://%s:%s@%s%s %s" % (self.direction, user, passwd, ip_addr, source, self.dest) -# else: -# raise ValueError("user: %s, passwd: %s, and source: %s must all be set" % (user, passwd, source)) -# if not http and not ftp: -# raise ValueError("Please specify ftp and/or http") - -# if (url is None) or (url == ""): -# raise ValueError("HTTPProfile::create: url unset") - -# endp_data = { -# "alias": name + "_l4", -# "shelf": shelf, -# "resource": resource, -# "port": name, -# "type": "l4_generic", -# "timeout": 10, -# "url_rate": self.requests_per_ten, -# "url": url, -# "proxy_auth_type": 0x200 -# } -# url = "cli-json/add_l4_endp" -# self.local_realm.json_post(url, endp_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands_) -# time.sleep(sleep_time) - -# endp_data = { -# "alias": "CX_" + name + "_l4", -# "test_mgr": "default_tm", -# "tx_endp": name + "_l4", -# "rx_endp": "NA" -# } -# cx_post_data.append(endp_data) -# self.created_cx[name + "_l4"] = "CX_" + name + "_l4" - -# for cx_data in cx_post_data: -# url = "/cli-json/add_cx" -# self.local_realm.json_post(url, cx_data, debug_=debug_, -# suppress_related_commands_=suppress_related_commands_) -# time.sleep(sleep_time) - - -# # use the station profile to set the combination of features you want on your stations -# # once this combination is configured, build the stations with the build(resource, radio, number) call -# # build() calls will fail if the station already exists. Please survey and clean your resource -# # before calling build() -# # survey = Realm.findStations(resource=1) -# # Realm.removeStations(survey) -# # profile = Realm.newStationProfile() -# # profile.set... -# # profile.build(resource, radio, 64) -# # -# class StationProfile: -# def __init__(self, lfclient_url, local_realm, -# ssid="NA", -# ssid_pass="NA", -# security="open", -# number_template_="00000", -# mode=0, -# up=True, -# resource=1, -# shelf=1, -# dhcp=True, -# debug_=False, -# use_ht160=False): -# self.debug = debug_ -# self.lfclient_url = lfclient_url -# self.ssid = ssid -# self.ssid_pass = ssid_pass -# self.mode = mode -# self.up = up -# self.resource = resource -# self.shelf = shelf -# self.dhcp = dhcp -# self.security = security -# self.local_realm = local_realm -# self.use_ht160 = use_ht160 -# self.COMMANDS = ["add_sta", "set_port"] -# self.desired_add_sta_flags = ["wpa2_enable", "80211u_enable", "create_admin_down"] -# self.desired_add_sta_flags_mask = ["wpa2_enable", "80211u_enable", "create_admin_down"] -# self.number_template = number_template_ -# self.station_names = [] # eids, these are created station names -# self.add_sta_data = { -# "shelf": 1, -# "resource": 1, -# "radio": None, -# "sta_name": None, -# "ssid": None, -# "key": None, -# "mode": 0, -# "mac": "xx:xx:xx:xx:*:xx", -# "flags": 0, # (0x400 + 0x20000 + 0x1000000000) # create admin down -# } -# self.desired_set_port_cmd_flags = [] -# self.desired_set_port_current_flags = ["if_down"] -# self.desired_set_port_interest_flags = ["current_flags", "ifdown"] -# if self.dhcp: -# self.desired_set_port_current_flags.append("use_dhcp") -# self.desired_set_port_interest_flags.append("dhcp") - -# self.set_port_data = { -# "shelf": 1, -# "resource": 1, -# "port": None, -# "current_flags": 0, -# "interest": 0, # (0x2 + 0x4000 + 0x800000) # current, dhcp, down, -# } -# self.wifi_extra_data_modified = False -# self.wifi_extra_data = { -# "shelf": 1, -# "resource": 1, -# "port": None, -# "key_mgmt": None, -# "eap": None, -# "hessid": None, -# "identity": None, -# "password": None, -# "realm": None, -# "domain": None -# } - -# self.reset_port_extra_data = { -# "shelf": 1, -# "resource": 1, -# "port": None, -# "test_duration": 0, -# "reset_port_enable": False, -# "reset_port_time_min": 0, -# "reset_port_time_max": 0, -# "reset_port_timer_started": False, -# "port_to_reset": 0, -# "seconds_till_reset": 0 -# } - -# def set_wifi_extra(self, key_mgmt="WPA-EAP", -# pairwise="CCMP TKIP", -# group="CCMP TKIP", -# psk="[BLANK]", -# wep_key="[BLANK]", # wep key -# ca_cert="[BLANK]", -# eap="TTLS", -# identity="testuser", -# anonymous_identity="[BLANK]", -# phase1="NA", # outter auth -# phase2="NA", # inner auth -# passwd="testpasswd", # eap passphrase -# pin="NA", -# pac_file="NA", -# private_key="NA", -# pk_password="NA", # priv key password -# hessid="00:00:00:00:00:01", -# realm="localhost.localdomain", -# client_cert="NA", -# imsi="NA", -# milenage="NA", -# domain="localhost.localdomain", -# roaming_consortium="NA", -# venue_group="NA", -# network_type="NA", -# ipaddr_type_avail="NA", -# network_auth_type="NA", -# anqp_3gpp_cell_net="NA" -# ): -# self.wifi_extra_data_modified = True -# self.wifi_extra_data["key_mgmt"] = key_mgmt -# self.wifi_extra_data["pairwise"] = pairwise -# self.wifi_extra_data["group"] = group -# self.wifi_extra_data["psk"] = psk -# self.wifi_extra_data["key"] = wep_key -# self.wifi_extra_data["ca_cert"] = ca_cert -# self.wifi_extra_data["eap"] = eap -# self.wifi_extra_data["identity"] = identity -# self.wifi_extra_data["anonymous_identity"] = anonymous_identity -# self.wifi_extra_data["phase1"] = phase1 -# self.wifi_extra_data["phase2"] = phase2 -# self.wifi_extra_data["password"] = passwd -# self.wifi_extra_data["pin"] = pin -# self.wifi_extra_data["pac_file"] = pac_file -# self.wifi_extra_data["private_key"] = private_key -# self.wifi_extra_data["pk_passwd"] = pk_password -# self.wifi_extra_data["hessid"] = hessid -# self.wifi_extra_data["realm"] = realm -# self.wifi_extra_data["client_cert"] = client_cert -# self.wifi_extra_data["imsi"] = imsi -# self.wifi_extra_data["milenage"] = milenage -# self.wifi_extra_data["domain"] = domain -# self.wifi_extra_data["roaming_consortium"] = roaming_consortium -# self.wifi_extra_data["venue_group"] = venue_group -# self.wifi_extra_data["network_type"] = network_type -# self.wifi_extra_data["ipaddr_type_avail"] = ipaddr_type_avail -# self.wifi_extra_data["network_auth_type"] = network_auth_type -# self.wifi_extra_data["anqp_3gpp_cell_net"] = anqp_3gpp_cell_net - -# def set_reset_extra(self, reset_port_enable=False, test_duration=0, reset_port_min_time=0, reset_port_max_time=0, -# reset_port_timer_start=False, port_to_reset=0, time_till_reset=0): -# self.reset_port_extra_data["reset_port_enable"] = reset_port_enable -# self.reset_port_extra_data["test_duration"] = test_duration -# self.reset_port_extra_data["reset_port_time_min"] = reset_port_min_time -# self.reset_port_extra_data["reset_port_time_max"] = reset_port_max_time - -# def use_security(self, security_type, ssid=None, passwd=None): -# types = {"wep": "wep_enable", "wpa": "wpa_enable", "wpa2": "wpa2_enable", "wpa3": "use-wpa3", "open": "[BLANK]"} -# self.add_sta_data["ssid"] = ssid -# if security_type in types.keys(): -# if (ssid is None) or (ssid == ""): -# raise ValueError("use_security: %s requires ssid" % security_type) -# if (passwd is None) or (passwd == ""): -# raise ValueError("use_security: %s requires passphrase or [BLANK]" % security_type) -# for name in types.values(): -# if name in self.desired_add_sta_flags and name in self.desired_add_sta_flags_mask: -# self.desired_add_sta_flags.remove(name) -# self.desired_add_sta_flags_mask.remove(name) -# if security_type != "open": -# self.desired_add_sta_flags.append(types[security_type]) -# # self.set_command_flag("add_sta", types[security_type], 1) -# self.desired_add_sta_flags_mask.append(types[security_type]) -# else: -# passwd = "[BLANK]" -# self.set_command_param("add_sta", "ssid", ssid) -# self.set_command_param("add_sta", "key", passwd) -# # unset any other security flag before setting our present flags -# if security_type == "wpa3": -# self.set_command_param("add_sta", "ieee80211w", 2) - -# # self.add_sta_data["key"] = passwd - -# def set_command_param(self, command_name, param_name, param_value): -# # we have to check what the param name is -# if (command_name is None) or (command_name == ""): -# return -# if (param_name is None) or (param_name == ""): -# return -# if command_name not in self.COMMANDS: -# raise ValueError("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS)) -# # return -# if command_name == "add_sta": -# self.add_sta_data[param_name] = param_value -# elif command_name == "set_port": -# self.set_port_data[param_name] = param_value - -# def set_command_flag(self, command_name, param_name, value): -# # we have to check what the param name is -# if (command_name is None) or (command_name == ""): -# return -# if (param_name is None) or (param_name == ""): -# return -# if command_name not in self.COMMANDS: -# print("Command name name [%s] not defined in %s" % (command_name, self.COMMANDS)) -# return -# if command_name == "add_sta": -# if (param_name not in add_sta.add_sta_flags) and (param_name not in add_sta.add_sta_modes): -# print("Parameter name [%s] not defined in add_sta.py" % param_name) -# if self.debug: -# pprint(add_sta.add_sta_flags) -# return -# if (value == 1) and (param_name not in self.desired_add_sta_flags): -# self.desired_add_sta_flags.append(param_name) -# self.desired_add_sta_flags_mask.append(param_name) -# elif value == 0: -# self.desired_add_sta_flags.remove(param_name) -# self.desired_add_sta_flags_mask.append(param_name) - -# elif command_name == "set_port": -# if (param_name not in set_port.set_port_current_flags) and ( -# param_name not in set_port.set_port_cmd_flags) and ( -# param_name not in set_port.set_port_interest_flags): -# print("Parameter name [%s] not defined in set_port.py" % param_name) -# if self.debug: -# pprint(set_port.set_port_cmd_flags) -# pprint(set_port.set_port_current_flags) -# pprint(set_port.set_port_interest_flags) -# return -# if (param_name in set_port.set_port_cmd_flags): -# if (value == 1) and (param_name not in self.desired_set_port_cmd_flags): -# self.desired_set_port_cmd_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_cmd_flags.remove(param_name) -# elif (param_name in set_port.set_port_current_flags): -# if (value == 1) and (param_name not in self.desired_set_port_current_flags): -# self.desired_set_port_current_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_current_flags.remove(param_name) -# elif (param_name in set_port.set_port_interest_flags): -# if (value == 1) and (param_name not in self.desired_set_port_interest_flags): -# self.desired_set_port_interest_flags.append(param_name) -# elif value == 0: -# self.desired_set_port_interest_flags.remove(param_name) -# else: -# raise ValueError("Unknown param name: " + param_name) - -# # use this for hinting station name; stations begin with 'sta', the -# # stations created with a prefix '0100' indicate value 10100 + n with -# # resulting substring(1,) applied; station 900 becomes 'sta1000' -# def set_number_template(self, pref): -# self.number_template = pref - -# def add_named_flags(self, desired_list, command_ref): -# if desired_list is None: -# raise ValueError("addNamedFlags wants a list of desired flag names") -# if len(desired_list) < 1: -# print("addNamedFlags: empty desired list") -# return 0 -# if (command_ref is None) or (len(command_ref) < 1): -# raise ValueError("addNamedFlags wants a maps of flag values") - -# result = 0 -# for name in desired_list: -# if (name is None) or (name == ""): -# continue -# if name not in command_ref: -# if self.debug: -# pprint(command_ref) -# raise ValueError("flag %s not in map" % name) -# result += command_ref[name] - -# return result - -# def admin_up(self): -# for eid in self.station_names: -# # print("3139: admin_up sta "+eid) -# # time.sleep(2) -# self.local_realm.admin_up(eid) -# time.sleep(0.005) - -# def admin_down(self): -# for sta_name in self.station_names: -# self.local_realm.admin_down(sta_name) - -# def cleanup(self, desired_stations=None, delay=0.03, debug_=False): -# print("Cleaning up stations") - -# if (desired_stations is None): -# desired_stations = self.station_names - -# if len(desired_stations) < 1: -# print("ERROR: StationProfile cleanup, list is empty") -# return - -# # First, request remove on the list. -# for port_eid in desired_stations: -# self.local_realm.rm_port(port_eid, check_exists=True, debug_=debug_) -# time.sleep(delay) -# # And now see if they are gone -# LFUtils.wait_until_ports_disappear(base_url=self.lfclient_url, port_list=desired_stations) - -# # Checks for errors in initialization values and creates specified number of stations using init parameters -# def create(self, radio, -# num_stations=0, -# sta_names_=None, -# dry_run=False, -# up_=None, -# debug=False, -# suppress_related_commands_=True, -# use_radius=False, -# hs20_enable=False, -# sleep_time=0.02): -# if (radio is None) or (radio == ""): -# raise ValueError("station_profile.create: will not create stations without radio") -# radio_eid = self.local_realm.name_to_eid(radio) -# radio_shelf = radio_eid[0] -# radio_resource = radio_eid[1] -# radio_port = radio_eid[2] - -# if self.use_ht160: -# self.desired_add_sta_flags.append("ht160_enable") -# self.desired_add_sta_flags_mask.append("ht160_enable") -# if self.mode is not None: -# self.add_sta_data["mode"] = self.mode -# if use_radius: -# self.desired_add_sta_flags.append("8021x_radius") -# self.desired_add_sta_flags_mask.append("8021x_radius") -# if hs20_enable: -# self.desired_add_sta_flags.append("hs20_enable") -# self.desired_add_sta_flags_mask.append("hs20_enable") -# if up_ is not None: -# self.up = up_ - -# if (sta_names_ is None) and (num_stations == 0): -# raise ValueError("StationProfile.create needs either num_stations= or sta_names_= specified") - -# if self.up: -# if "create_admin_down" in self.desired_add_sta_flags: -# del self.desired_add_sta_flags[self.desired_add_sta_flags.index("create_admin_down")] -# elif "create_admin_down" not in self.desired_add_sta_flags: -# self.desired_add_sta_flags.append("create_admin_down") - -# # create stations down, do set_port on them, then set stations up -# self.add_sta_data["flags"] = self.add_named_flags(self.desired_add_sta_flags, add_sta.add_sta_flags) -# self.add_sta_data["flags_mask"] = self.add_named_flags(self.desired_add_sta_flags_mask, add_sta.add_sta_flags) -# self.add_sta_data["radio"] = radio_port - -# self.add_sta_data["resource"] = radio_resource -# self.add_sta_data["shelf"] = radio_shelf -# self.set_port_data["resource"] = radio_resource -# self.set_port_data["shelf"] = radio_shelf -# self.set_port_data["current_flags"] = self.add_named_flags(self.desired_set_port_current_flags, -# set_port.set_port_current_flags) -# self.set_port_data["interest"] = self.add_named_flags(self.desired_set_port_interest_flags, -# set_port.set_port_interest_flags) -# self.wifi_extra_data["resource"] = radio_resource -# self.wifi_extra_data["shelf"] = radio_shelf -# self.reset_port_extra_data["resource"] = radio_resource -# self.reset_port_extra_data["shelf"] = radio_shelf - -# # these are unactivated LFRequest objects that we can modify and -# # re-use inside a loop, reducing the number of object creations -# add_sta_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/add_sta", debug_=debug) -# set_port_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/set_port", debug_=debug) -# wifi_extra_r = LFRequest.LFRequest(self.lfclient_url + "/cli-json/set_wifi_extra", debug_=debug) -# my_sta_names = [] -# # add radio here -# if (num_stations > 0) and (len(sta_names_) < 1): -# # print("CREATING MORE STA NAMES == == == == == == == == == == == == == == == == == == == == == == == ==") -# my_sta_names = LFUtils.portNameSeries("sta", 0, num_stations - 1, int("1" + self.number_template)) -# # print("CREATING MORE STA NAMES == == == == == == == == == == == == == == == == == == == == == == == ==") -# else: -# my_sta_names = sta_names_ - -# if (len(my_sta_names) >= 15) or (suppress_related_commands_ == True): -# self.add_sta_data["suppress_preexec_cli"] = "yes" -# self.add_sta_data["suppress_preexec_method"] = 1 -# self.set_port_data["suppress_preexec_cli"] = "yes" -# self.set_port_data["suppress_preexec_method"] = 1 - -# num = 0 -# if debug: -# print("== == Created STA names == == == == == == == == == == == == == == == == == == == == == == == ==") -# pprint(self.station_names) -# print("== == vs Pending STA names == ==") -# pprint(my_sta_names) -# print("== == == == == == == == == == == == == == == == == == == == == == == == == ==") - -# # track the names of stations in case we have stations added multiple times -# finished_sta = [] - -# for eidn in my_sta_names: -# if eidn in self.station_names: -# print("Station %s already created, skipping." % eidn) -# continue - -# # print (" EIDN "+eidn); -# if eidn in finished_sta: -# # pprint(my_sta_names) -# # raise ValueError("************ duplicate ****************** "+eidn) -# if self.debug: -# print("Station %s already created" % eidn) -# continue - -# eid = self.local_realm.name_to_eid(eidn) -# name = eid[2] -# num += 1 -# self.add_sta_data["shelf"] = radio_shelf -# self.add_sta_data["resource"] = radio_resource -# self.add_sta_data["radio"] = radio_port -# self.add_sta_data["sta_name"] = name # for create station calls -# self.set_port_data["port"] = name # for set_port calls. -# self.set_port_data["shelf"] = radio_shelf -# self.set_port_data["resource"] = radio_resource - -# add_sta_r.addPostData(self.add_sta_data) -# if debug: -# print("- 3254 - %s- - - - - - - - - - - - - - - - - - " % eidn) -# pprint(add_sta_r.requested_url) -# pprint(add_sta_r.proxies) -# pprint(self.add_sta_data) -# pprint(self.set_port_data) -# print("- ~3254 - - - - - - - - - - - - - - - - - - - ") -# if dry_run: -# print("dry run: not creating " + eidn) -# continue - -# # print("- 3264 - ## %s ## add_sta_r.jsonPost - - - - - - - - - - - - - - - - - - "%eidn) -# json_response = add_sta_r.jsonPost(debug=self.debug) -# finished_sta.append(eidn) -# # print("- ~3264 - %s - add_sta_r.jsonPost - - - - - - - - - - - - - - - - - - "%eidn) -# time.sleep(0.01) -# set_port_r.addPostData(self.set_port_data) -# # print("- 3270 -- %s -- set_port_r.jsonPost - - - - - - - - - - - - - - - - - - "%eidn) -# json_response = set_port_r.jsonPost(debug) -# # print("- ~3270 - %s - set_port_r.jsonPost - - - - - - - - - - - - - - - - - - "%eidn) -# time.sleep(0.01) - -# self.wifi_extra_data["resource"] = radio_resource -# self.wifi_extra_data["port"] = name -# if self.wifi_extra_data_modified: -# wifi_extra_r.addPostData(self.wifi_extra_data) -# json_response = wifi_extra_r.jsonPost(debug) - -# # append created stations to self.station_names -# self.station_names.append("%s.%s.%s" % (radio_shelf, radio_resource, name)) -# time.sleep(sleep_time) - -# # print("- ~3287 - waitUntilPortsAppear - - - - - - - - - - - - - - - - - - "%eidn) -# LFUtils.wait_until_ports_appear(self.lfclient_url, my_sta_names) - -# # and set ports up -# if dry_run: -# return -# if (self.up): -# self.admin_up() - -# # for sta_name in self.station_names: -# # req = LFUtils.portUpRequest(resource, sta_name, debug_on=False) -# # set_port_r.addPostData(req) -# # json_response = set_port_r.jsonPost(debug) -# # time.sleep(0.03) -# if self.debug: -# print("created %s stations" % num) - -# #