diff --git a/py-json/l3_cxprofile.py b/py-json/l3_cxprofile.py new file mode 100644 index 00000000..0f7f6f04 --- /dev/null +++ b/py-json/l3_cxprofile.py @@ -0,0 +1,511 @@ +import LANforge.base_profile + + +class L3CXProfile(BaseProfile): + def __init__(self, + lfclient_host, + lfclient_port, + local_realm, + side_a_min_bps=None, + side_b_min_bps=None, + side_a_max_bps=0, + side_b_max_bps=0, + side_a_min_pdu=-1, + side_b_min_pdu=-1, + side_a_max_pdu=0, + side_b_max_pdu=0, + report_timer_=3000, + name_prefix_="Unset", + number_template_="00000", + debug_=False): + """ + :param lfclient_host: + :param lfclient_port: + :param local_realm: + :param side_a_min_bps: + :param side_b_min_bps: + :param side_a_max_bps: + :param side_b_max_bps: + :param side_a_min_pdu: + :param side_b_min_pdu: + :param side_a_max_pdu: + :param side_b_max_pdu: + :param name_prefix_: prefix string for connection + :param number_template_: how many zeros wide we padd, possibly a starting integer with left padding + :param debug_: + """ + super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) + self.debug = debug_ + self.local_realm = local_realm + self.side_a_min_pdu = side_a_min_pdu + self.side_b_min_pdu = side_b_min_pdu + self.side_a_max_pdu = side_a_max_pdu + self.side_b_max_pdu = side_b_max_pdu + self.side_a_min_bps = side_a_min_bps + self.side_b_min_bps = side_b_min_bps + self.side_a_max_bps = side_a_max_bps + self.side_b_max_bps = side_b_max_bps + self.report_timer = report_timer_ + self.created_cx = {} + self.created_endp = {} + self.name_prefix = name_prefix_ + self.number_template = number_template_ + + def get_cx_names(self): + return self.created_cx.keys() + + def get_cx_report(self): + self.data = {} + for cx_name in self.get_cx_names(): + self.data[cx_name] = self.json_get("/cx/" + cx_name).get(cx_name) + return self.data + + def __get_rx_values(self): + cx_list = self.json_get("endp?fields=name,rx+bytes") + if self.debug: + print(self.created_cx.values()) + print("==============\n", cx_list, "\n==============") + cx_rx_map = {} + for cx_name in cx_list['endpoint']: + if cx_name != 'uri' and cx_name != 'handler': + for item, value in cx_name.items(): + for value_name, value_rx in value.items(): + if value_name == 'rx bytes' and item in self.created_cx.values(): + cx_rx_map[item] = value_rx + return cx_rx_map + + def __compare_vals(self, old_list, new_list): + passes = 0 + expected_passes = 0 + if len(old_list) == len(new_list): + for item, value in old_list.items(): + expected_passes += 1 + if new_list[item] > old_list[item]: + passes += 1 + + if passes == expected_passes: + return True + else: + return False + else: + return False + + def instantiate_file(self, file_name, file_format): + pass + + def monitor(self, + duration_sec=60, + monitor_interval_ms=1, + sta_list=None, + layer3_cols=None, + port_mgr_cols=None, + created_cx=None, + monitor=True, + report_file=None, + output_format=None, + script_name=None, + arguments=None, + compared_report=None, + debug=False): + try: + duration_sec = self.parse_time(duration_sec).seconds + except: + if (duration_sec is None) or (duration_sec <= 1): + raise ValueError("L3CXProfile::monitor wants duration_sec > 1 second") + if (duration_sec <= monitor_interval_ms): + raise ValueError("L3CXProfile::monitor wants duration_sec > monitor_interval") + if report_file == None: + raise ValueError("Monitor requires an output file to be defined") + if created_cx == None: + raise ValueError("Monitor needs a list of Layer 3 connections") + if (monitor_interval_ms is None) or (monitor_interval_ms < 1): + raise ValueError("L3CXProfile::monitor wants monitor_interval >= 1 second") + if layer3_cols is None: + raise ValueError("L3CXProfile::monitor wants a list of column names to monitor") + if output_format is not None: + if output_format.lower() != report_file.split('.')[-1]: + raise ValueError('Filename %s has an extension that does not match output format %s .' % (report_file, output_format)) + else: + output_format = report_file.split('.')[-1] + + + #default save to csv first + if report_file.split('.')[-1] != 'csv': + report_file = report_file.replace(str(output_format),'csv',1) + print("Saving rolling data into..." + str(report_file)) + + #================== Step 1, set column names and header row + layer3_cols=[self.replace_special_char(x) for x in layer3_cols] + layer3_fields = ",".join(layer3_cols) + default_cols=['Timestamp','Timestamp milliseconds epoch','Duration elapsed'] + default_cols.extend(layer3_cols) + header_row=default_cols + + + #csvwriter.writerow([systeminfo['VersionInfo']['BuildVersion'], script_name, str(arguments)]) + + if port_mgr_cols is not None: + port_mgr_cols=[self.replace_special_char(x) for x in port_mgr_cols] + port_mgr_cols_labelled =[] + for col_name in port_mgr_cols: + port_mgr_cols_labelled.append("port mgr - " + col_name) + + port_mgr_fields=",".join(port_mgr_cols) + header_row.extend(port_mgr_cols_labelled) + #add sys info to header row + systeminfo = self.json_get('/') + header_row.extend([str("LANforge GUI Build: " + systeminfo['VersionInfo']['BuildVersion']), str("Script Name: " + script_name), str("Argument input: " + str(arguments))]) + sta_list_edit=[] + if sta_list is not None: + for sta in sta_list: + sta_list_edit.append(sta[4:]) + sta_list=",".join(sta_list_edit) + + #================== Step 2, monitor columns + start_time = datetime.datetime.now() + end_time = start_time + datetime.timedelta(seconds=duration_sec) + + passes = 0 + expected_passes = 0 + old_cx_rx_values = self.__get_rx_values() + + #instantiate csv file here, add specified column headers + csvfile=open(str(report_file),'w') + csvwriter = csv.writer(csvfile,delimiter=",") + csvwriter.writerow(header_row) + + #wait 10 seconds to get proper port data + time.sleep(10) + + # for x in range(0,int(round(iterations,0))): + initial_starttime= datetime.datetime.now() + while datetime.datetime.now() < end_time: + t = datetime.datetime.now() + timestamp= t.strftime("%m/%d/%Y %I:%M:%S") + t_to_millisec_epoch= int(self.get_milliseconds(t)) + time_elapsed=int(self.get_seconds(t))-int(self.get_seconds(initial_starttime)) + + layer_3_response = self.json_get("/endp/%s?fields=%s" % (created_cx, layer3_fields)) + if port_mgr_cols is not None: + port_mgr_response=self.json_get("/port/1/1/%s?fields=%s" % (sta_list, port_mgr_fields)) + #get info from port manager with list of values from cx_a_side_list + if "endpoint" not in layer_3_response or layer_3_response is None: + print(layer_3_response) + raise ValueError("Cannot find columns requested to be searched. Exiting script, please retry.") + if debug: + print("Json layer_3_response from LANforge... " + str(layer_3_response)) + if port_mgr_cols is not None: + if "interfaces" not in port_mgr_response or port_mgr_response is None: + print(port_mgr_response) + raise ValueError("Cannot find columns requested to be searched. Exiting script, please retry.") + if debug: + print("Json port_mgr_response from LANforge... " + str(port_mgr_response)) + + + + temp_list=[] + for endpoint in layer_3_response["endpoint"]: + if debug: + print("Current endpoint values list... ") + print(list(endpoint.values())[0]) + temp_endp_values=list(endpoint.values())[0] #dict + temp_list.extend([timestamp,t_to_millisec_epoch,time_elapsed]) + current_sta = temp_endp_values['name'] + merge={} + if port_mgr_cols is not None: + for sta_name in sta_list_edit: + if sta_name in current_sta: + for interface in port_mgr_response["interfaces"]: + if sta_name in list(interface.keys())[0]: + merge=temp_endp_values.copy() + #rename keys (separate port mgr 'rx bytes' from layer3 'rx bytes') + port_mgr_values_dict =list(interface.values())[0] + renamed_port_cols={} + for key in port_mgr_values_dict.keys(): + renamed_port_cols['port mgr - ' +key]=port_mgr_values_dict[key] + merge.update(renamed_port_cols) + for name in header_row[3:-3]: + temp_list.append(merge[name]) + csvwriter.writerow(temp_list) + temp_list.clear() + new_cx_rx_values = self.__get_rx_values() + if debug: + print(old_cx_rx_values, new_cx_rx_values) + print("\n-----------------------------------") + print(t) + print("-----------------------------------\n") + expected_passes += 1 + if self.__compare_vals(old_cx_rx_values, new_cx_rx_values): + passes += 1 + else: + self.fail("FAIL: Not all stations increased traffic") + self.exit_fail() + old_cx_rx_values = new_cx_rx_values + time.sleep(monitor_interval_ms) + csvfile.close() + + #comparison to last report / report inputted + if compared_report is not None: + compared_df = self.compare_two_df(dataframe_one=self.file_to_df(report_file), dataframe_two=self.file_to_df(compared_report)) + exit(1) + #append compared df to created one + if output_format.lower() != 'csv': + self.df_to_file(dataframe=pd.read_csv(report_file), output_f=output_format, save_path=report_file) + else: + if output_format.lower() != 'csv': + self.df_to_file(dataframe=pd.read_csv(report_file), output_f=output_format, save_path=report_file) + + + def refresh_cx(self): + for cx_name in self.created_cx.keys(): + self.json_post("/cli-json/show_cxe", { + "test_mgr": "ALL", + "cross_connect": cx_name + }, debug_=self.debug) + print(".", end='') + + def start_cx(self): + print("Starting CXs...") + for cx_name in self.created_cx.keys(): + if self.debug: + print("cx-name: %s" % (cx_name)) + self.json_post("/cli-json/set_cx_state", { + "test_mgr": "default_tm", + "cx_name": cx_name, + "cx_state": "RUNNING" + }, debug_=self.debug) + if self.debug: + print(".", end='') + if self.debug: + print("") + + def stop_cx(self): + print("Stopping CXs...") + for cx_name in self.created_cx.keys(): + self.local_realm.stop_cx(cx_name) + print(".", end='') + print("") + + def cleanup_prefix(self): + self.local_realm.cleanup_cxe_prefix(self.name_prefix) + + def cleanup(self): + print("Cleaning up cxs and endpoints") + if len(self.created_cx) != 0: + for cx_name in self.created_cx.keys(): + if self.debug: + print("Cleaning cx: %s"%(cx_name)) + self.local_realm.rm_cx(cx_name) + + for side in range(len(self.created_cx[cx_name])): + ename = self.created_cx[cx_name][side] + if self.debug: + print("Cleaning endpoint: %s"%(ename)) + self.local_realm.rm_endp(self.created_cx[cx_name][side]) + + def create(self, endp_type, side_a, side_b, sleep_time=0.03, suppress_related_commands=None, debug_=False, + tos=None): + if self.debug: + debug_ = True + + cx_post_data = [] + timer_post_data = [] + these_endp = [] + these_cx = [] + + # print(self.side_a_min_rate, self.side_a_max_rate) + # print(self.side_b_min_rate, self.side_b_max_rate) + if (self.side_a_min_bps is None) \ + or (self.side_a_max_bps is None) \ + or (self.side_b_min_bps is None) \ + or (self.side_b_max_bps is None): + raise ValueError( + "side_a_min_bps, side_a_max_bps, side_b_min_bps, and side_b_max_bps must all be set to a value") + + if type(side_a) == list and type(side_b) != list: + side_b_info = self.local_realm.name_to_eid(side_b) + side_b_shelf = side_b_info[0] + side_b_resource = side_b_info[1] + + for port_name in side_a: + side_a_info = self.local_realm.name_to_eid(port_name,debug=debug_) + side_a_shelf = side_a_info[0] + side_a_resource = side_a_info[1] + if port_name.find('.') < 0: + port_name = "%d.%s" % (side_a_info[1], port_name) + + cx_name = "%s%s-%i" % (self.name_prefix, side_a_info[2], len(self.created_cx)) + + endp_a_name = cx_name + "-A" + endp_b_name = cx_name + "-B" + self.created_cx[cx_name] = [endp_a_name, endp_b_name] + self.created_endp[endp_a_name] = endp_a_name + self.created_endp[endp_b_name] = endp_b_name + these_cx.append(cx_name) + these_endp.append(endp_a_name) + these_endp.append(endp_b_name) + endp_side_a = { + "alias": endp_a_name, + "shelf": side_a_shelf, + "resource": side_a_resource, + "port": side_a_info[2], + "type": endp_type, + "min_rate": self.side_a_min_bps, + "max_rate": self.side_a_max_bps, + "min_pkt": self.side_a_min_pdu, + "max_pkt": self.side_a_max_pdu, + "ip_port": -1 + } + endp_side_b = { + "alias": endp_b_name, + "shelf": side_b_shelf, + "resource": side_b_resource, + "port": side_b_info[2], + "type": endp_type, + "min_rate": self.side_b_min_bps, + "max_rate": self.side_b_max_bps, + "min_pkt": self.side_b_min_pdu, + "max_pkt": self.side_b_max_pdu, + "ip_port": -1 + } + + url = "/cli-json/add_endp" + self.local_realm.json_post(url, endp_side_a, debug_=debug_, suppress_related_commands_=suppress_related_commands) + self.local_realm.json_post(url, endp_side_b, debug_=debug_, suppress_related_commands_=suppress_related_commands) + #print("napping %f sec"%sleep_time) + time.sleep(sleep_time) + + url = "cli-json/set_endp_flag" + data = { + "name": endp_a_name, + "flag": "AutoHelper", + "val": 1 + } + self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) + data["name"] = endp_b_name + self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) + + if (endp_type == "lf_udp") or (endp_type == "udp") or (endp_type == "lf_udp6") or (endp_type == "udp6"): + data["name"] = endp_a_name + data["flag"] = "UseAutoNAT" + self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) + data["name"] = endp_b_name + self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) + + if tos != None: + self.local_realm.set_endp_tos(endp_a_name, tos) + self.local_realm.set_endp_tos(endp_b_name, tos) + + data = { + "alias": cx_name, + "test_mgr": "default_tm", + "tx_endp": endp_a_name, + "rx_endp": endp_b_name, + } + # pprint(data) + cx_post_data.append(data) + timer_post_data.append({ + "test_mgr": "default_tm", + "cx_name": cx_name, + "milliseconds": self.report_timer + }) + + elif type(side_b) == list and type(side_a) != list: + side_a_info = self.local_realm.name_to_eid(side_a,debug=debug_) + side_a_shelf = side_a_info[0] + side_a_resource = side_a_info[1] + # side_a_name = side_a_info[2] + + for port_name in side_b: + print(side_b) + side_b_info = self.local_realm.name_to_eid(port_name,debug=debug_) + side_b_shelf = side_b_info[0] + side_b_resource = side_b_info[1] + side_b_name = side_b_info[2] + + cx_name = "%s%s-%i" % (self.name_prefix, port_name, len(self.created_cx)) + endp_a_name = cx_name + "-A" + endp_b_name = cx_name + "-B" + self.created_cx[cx_name] = [endp_a_name, endp_b_name] + self.created_endp[endp_a_name] = endp_a_name + self.created_endp[endp_b_name] = endp_b_name + these_cx.append(cx_name) + these_endp.append(endp_a_name) + these_endp.append(endp_b_name) + endp_side_a = { + "alias": endp_a_name, + "shelf": side_a_shelf, + "resource": side_a_resource, + "port": side_a_info[2], + "type": endp_type, + "min_rate": self.side_a_min_bps, + "max_rate": self.side_a_max_bps, + "min_pkt": self.side_a_min_pdu, + "max_pkt": self.side_a_max_pdu, + "ip_port": -1 + } + endp_side_b = { + "alias": endp_b_name, + "shelf": side_b_shelf, + "resource": side_b_resource, + "port": side_b_info[2], + "type": endp_type, + "min_rate": self.side_b_min_bps, + "max_rate": self.side_b_max_bps, + "min_pkt": self.side_b_min_pdu, + "max_pkt": self.side_b_max_pdu, + "ip_port": -1 + } + + url = "/cli-json/add_endp" + self.local_realm.json_post(url, endp_side_a, debug_=debug_, suppress_related_commands_=suppress_related_commands) + self.local_realm.json_post(url, endp_side_b, debug_=debug_, suppress_related_commands_=suppress_related_commands) + #print("napping %f sec" %sleep_time ) + time.sleep(sleep_time) + + url = "cli-json/set_endp_flag" + data = { + "name": endp_a_name, + "flag": "autohelper", + "val": 1 + } + self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) + + url = "cli-json/set_endp_flag" + data = { + "name": endp_b_name, + "flag": "autohelper", + "val": 1 + } + self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) + #print("CXNAME451: %s" % cx_name) + data = { + "alias": cx_name, + "test_mgr": "default_tm", + "tx_endp": endp_a_name, + "rx_endp": endp_b_name, + } + cx_post_data.append(data) + timer_post_data.append({ + "test_mgr": "default_tm", + "cx_name": cx_name, + "milliseconds": self.report_timer + }) + else: + raise ValueError( + "side_a or side_b must be of type list but not both: side_a is type %s side_b is type %s" % ( + type(side_a), type(side_b))) + print("wait_until_endps_appear these_endp: {} debug_ {}".format(these_endp,debug_)) + self.local_realm.wait_until_endps_appear(these_endp, debug=debug_) + + for data in cx_post_data: + url = "/cli-json/add_cx" + self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) + time.sleep(0.01) + + self.local_realm.wait_until_cxs_appear(these_cx, debug=debug_) + + def to_string(self): + pprint.pprint(self) + + diff --git a/py-json/realm.py b/py-json/realm.py index 2339b122..9dbc7b35 100755 --- a/py-json/realm.py +++ b/py-json/realm.py @@ -14,6 +14,7 @@ from LANforge.lfcli_base import LFCliBase #from generic_cx import GenericCx from LANforge import add_monitor from LANforge.add_monitor import * +import LANforge.l3_cxprofile import os import datetime import base64 @@ -778,13 +779,15 @@ class Realm(LFCliBase): debug_=(self.debug or debug_)) return wifi_mon_prof - def new_l3_cx_profile(self): - cx_prof = L3CXProfile(self.lfclient_host, + def new_l3_cx_profile(self, ver = 1): + if ver == 1: + import l3_cxprofile + cx_prof = l3_cxprofile.L3CXProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug, report_timer_=3000) - return cx_prof + return cx_prof def new_l4_cx_profile(self): cx_prof = L4CXProfile(self.lfclient_host, self.lfclient_port, local_realm=self, debug_=self.debug) @@ -1062,774 +1065,6 @@ class MULTICASTProfile(LFCliBase): pprint.pprint(self) -class L3CXProfile(LFCliBase): - def __init__(self, - lfclient_host, - lfclient_port, - local_realm, - side_a_min_bps=None, - side_b_min_bps=None, - side_a_max_bps=0, - side_b_max_bps=0, - side_a_min_pdu=-1, - side_b_min_pdu=-1, - side_a_max_pdu=0, - side_b_max_pdu=0, - report_timer_=3000, - name_prefix_="Unset", - number_template_="00000", - debug_=False): - """ - :param lfclient_host: - :param lfclient_port: - :param local_realm: - :param side_a_min_bps: - :param side_b_min_bps: - :param side_a_max_bps: - :param side_b_max_bps: - :param side_a_min_pdu: - :param side_b_min_pdu: - :param side_a_max_pdu: - :param side_b_max_pdu: - :param name_prefix_: prefix string for connection - :param number_template_: how many zeros wide we padd, possibly a starting integer with left padding - :param debug_: - """ - super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) - self.debug = debug_ - self.local_realm = local_realm - self.side_a_min_pdu = side_a_min_pdu - self.side_b_min_pdu = side_b_min_pdu - self.side_a_max_pdu = side_a_max_pdu - self.side_b_max_pdu = side_b_max_pdu - self.side_a_min_bps = side_a_min_bps - self.side_b_min_bps = side_b_min_bps - self.side_a_max_bps = side_a_max_bps - self.side_b_max_bps = side_b_max_bps - self.report_timer = report_timer_ - self.created_cx = {} - self.created_endp = {} - self.name_prefix = name_prefix_ - self.number_template = number_template_ - - def get_cx_names(self): - return self.created_cx.keys() - - def get_cx_report(self): - self.data = {} - for cx_name in self.get_cx_names(): - self.data[cx_name] = self.json_get("/cx/" + cx_name).get(cx_name) - return self.data - - def __get_rx_values(self): - cx_list = self.json_get("endp?fields=name,rx+bytes") - if self.debug: - print(self.created_cx.values()) - print("==============\n", cx_list, "\n==============") - cx_rx_map = {} - for cx_name in cx_list['endpoint']: - if cx_name != 'uri' and cx_name != 'handler': - for item, value in cx_name.items(): - for value_name, value_rx in value.items(): - if value_name == 'rx bytes' and item in self.created_cx.values(): - cx_rx_map[item] = value_rx - return cx_rx_map - - def __compare_vals(self, old_list, new_list): - passes = 0 - expected_passes = 0 - if len(old_list) == len(new_list): - for item, value in old_list.items(): - expected_passes += 1 - if new_list[item] > old_list[item]: - passes += 1 - - if passes == expected_passes: - return True - else: - return False - else: - return False - - def instantiate_file(self, file_name, file_format): - pass - - def monitor(self, - duration_sec=60, - monitor_interval_ms=1, - sta_list=None, - layer3_cols=None, - port_mgr_cols=None, - created_cx=None, - monitor=True, - report_file=None, - output_format=None, - script_name=None, - arguments=None, - compared_report=None, - debug=False): - try: - duration_sec = self.parse_time(duration_sec).seconds - except: - if (duration_sec is None) or (duration_sec <= 1): - raise ValueError("L3CXProfile::monitor wants duration_sec > 1 second") - if (duration_sec <= monitor_interval_ms): - raise ValueError("L3CXProfile::monitor wants duration_sec > monitor_interval") - if report_file == None: - raise ValueError("Monitor requires an output file to be defined") - if created_cx == None: - raise ValueError("Monitor needs a list of Layer 3 connections") - if (monitor_interval_ms is None) or (monitor_interval_ms < 1): - raise ValueError("L3CXProfile::monitor wants monitor_interval >= 1 second") - if layer3_cols is None: - raise ValueError("L3CXProfile::monitor wants a list of column names to monitor") - if output_format is not None: - if output_format.lower() != report_file.split('.')[-1]: - raise ValueError('Filename %s has an extension that does not match output format %s .' % (report_file, output_format)) - else: - output_format = report_file.split('.')[-1] - - - #default save to csv first - if report_file.split('.')[-1] != 'csv': - report_file = report_file.replace(str(output_format),'csv',1) - print("Saving rolling data into..." + str(report_file)) - - #================== Step 1, set column names and header row - layer3_cols=[self.replace_special_char(x) for x in layer3_cols] - layer3_fields = ",".join(layer3_cols) - default_cols=['Timestamp','Timestamp milliseconds epoch','Duration elapsed'] - default_cols.extend(layer3_cols) - header_row=default_cols - - - #csvwriter.writerow([systeminfo['VersionInfo']['BuildVersion'], script_name, str(arguments)]) - - if port_mgr_cols is not None: - port_mgr_cols=[self.replace_special_char(x) for x in port_mgr_cols] - port_mgr_cols_labelled =[] - for col_name in port_mgr_cols: - port_mgr_cols_labelled.append("port mgr - " + col_name) - - port_mgr_fields=",".join(port_mgr_cols) - header_row.extend(port_mgr_cols_labelled) - #add sys info to header row - systeminfo = self.json_get('/') - header_row.extend([str("LANforge GUI Build: " + systeminfo['VersionInfo']['BuildVersion']), str("Script Name: " + script_name), str("Argument input: " + str(arguments))]) - sta_list_edit=[] - if sta_list is not None: - for sta in sta_list: - sta_list_edit.append(sta[4:]) - sta_list=",".join(sta_list_edit) - - #================== Step 2, monitor columns - start_time = datetime.datetime.now() - end_time = start_time + datetime.timedelta(seconds=duration_sec) - - passes = 0 - expected_passes = 0 - old_cx_rx_values = self.__get_rx_values() - - #instantiate csv file here, add specified column headers - csvfile=open(str(report_file),'w') - csvwriter = csv.writer(csvfile,delimiter=",") - csvwriter.writerow(header_row) - - #wait 10 seconds to get proper port data - time.sleep(10) - - # for x in range(0,int(round(iterations,0))): - initial_starttime= datetime.datetime.now() - while datetime.datetime.now() < end_time: - t = datetime.datetime.now() - timestamp= t.strftime("%m/%d/%Y %I:%M:%S") - t_to_millisec_epoch= int(self.get_milliseconds(t)) - time_elapsed=int(self.get_seconds(t))-int(self.get_seconds(initial_starttime)) - - layer_3_response = self.json_get("/endp/%s?fields=%s" % (created_cx, layer3_fields)) - if port_mgr_cols is not None: - port_mgr_response=self.json_get("/port/1/1/%s?fields=%s" % (sta_list, port_mgr_fields)) - #get info from port manager with list of values from cx_a_side_list - if "endpoint" not in layer_3_response or layer_3_response is None: - print(layer_3_response) - raise ValueError("Cannot find columns requested to be searched. Exiting script, please retry.") - if debug: - print("Json layer_3_response from LANforge... " + str(layer_3_response)) - if port_mgr_cols is not None: - if "interfaces" not in port_mgr_response or port_mgr_response is None: - print(port_mgr_response) - raise ValueError("Cannot find columns requested to be searched. Exiting script, please retry.") - if debug: - print("Json port_mgr_response from LANforge... " + str(port_mgr_response)) - - - - temp_list=[] - for endpoint in layer_3_response["endpoint"]: - if debug: - print("Current endpoint values list... ") - print(list(endpoint.values())[0]) - temp_endp_values=list(endpoint.values())[0] #dict - temp_list.extend([timestamp,t_to_millisec_epoch,time_elapsed]) - current_sta = temp_endp_values['name'] - merge={} - if port_mgr_cols is not None: - for sta_name in sta_list_edit: - if sta_name in current_sta: - for interface in port_mgr_response["interfaces"]: - if sta_name in list(interface.keys())[0]: - merge=temp_endp_values.copy() - #rename keys (separate port mgr 'rx bytes' from layer3 'rx bytes') - port_mgr_values_dict =list(interface.values())[0] - renamed_port_cols={} - for key in port_mgr_values_dict.keys(): - renamed_port_cols['port mgr - ' +key]=port_mgr_values_dict[key] - merge.update(renamed_port_cols) - for name in header_row[3:-3]: - temp_list.append(merge[name]) - csvwriter.writerow(temp_list) - temp_list.clear() - new_cx_rx_values = self.__get_rx_values() - if debug: - print(old_cx_rx_values, new_cx_rx_values) - print("\n-----------------------------------") - print(t) - print("-----------------------------------\n") - expected_passes += 1 - if self.__compare_vals(old_cx_rx_values, new_cx_rx_values): - passes += 1 - else: - self.fail("FAIL: Not all stations increased traffic") - self.exit_fail() - old_cx_rx_values = new_cx_rx_values - time.sleep(monitor_interval_ms) - csvfile.close() - - #comparison to last report / report inputted - if compared_report is not None: - compared_df = self.compare_two_df(dataframe_one=self.file_to_df(report_file), dataframe_two=self.file_to_df(compared_report)) - exit(1) - #append compared df to created one - if output_format.lower() != 'csv': - self.df_to_file(dataframe=pd.read_csv(report_file), output_f=output_format, save_path=report_file) - else: - if output_format.lower() != 'csv': - self.df_to_file(dataframe=pd.read_csv(report_file), output_f=output_format, save_path=report_file) - - - def refresh_cx(self): - for cx_name in self.created_cx.keys(): - self.json_post("/cli-json/show_cxe", { - "test_mgr": "ALL", - "cross_connect": cx_name - }, debug_=self.debug) - print(".", end='') - - def start_cx(self): - print("Starting CXs...") - for cx_name in self.created_cx.keys(): - if self.debug: - print("cx-name: %s" % (cx_name)) - self.json_post("/cli-json/set_cx_state", { - "test_mgr": "default_tm", - "cx_name": cx_name, - "cx_state": "RUNNING" - }, debug_=self.debug) - if self.debug: - print(".", end='') - if self.debug: - print("") - - def stop_cx(self): - print("Stopping CXs...") - for cx_name in self.created_cx.keys(): - self.local_realm.stop_cx(cx_name) - print(".", end='') - print("") - - def cleanup_prefix(self): - self.local_realm.cleanup_cxe_prefix(self.name_prefix) - - def cleanup(self): - print("Cleaning up cxs and endpoints") - if len(self.created_cx) != 0: - for cx_name in self.created_cx.keys(): - if self.debug: - print("Cleaning cx: %s"%(cx_name)) - self.local_realm.rm_cx(cx_name) - - for side in range(len(self.created_cx[cx_name])): - ename = self.created_cx[cx_name][side] - if self.debug: - print("Cleaning endpoint: %s"%(ename)) - self.local_realm.rm_endp(self.created_cx[cx_name][side]) - - def create(self, endp_type, side_a, side_b, sleep_time=0.03, suppress_related_commands=None, debug_=False, - tos=None): - if self.debug: - debug_ = True - - cx_post_data = [] - timer_post_data = [] - these_endp = [] - these_cx = [] - - # print(self.side_a_min_rate, self.side_a_max_rate) - # print(self.side_b_min_rate, self.side_b_max_rate) - if (self.side_a_min_bps is None) \ - or (self.side_a_max_bps is None) \ - or (self.side_b_min_bps is None) \ - or (self.side_b_max_bps is None): - raise ValueError( - "side_a_min_bps, side_a_max_bps, side_b_min_bps, and side_b_max_bps must all be set to a value") - - if type(side_a) == list and type(side_b) != list: - side_b_info = self.local_realm.name_to_eid(side_b) - side_b_shelf = side_b_info[0] - side_b_resource = side_b_info[1] - - for port_name in side_a: - side_a_info = self.local_realm.name_to_eid(port_name,debug=debug_) - side_a_shelf = side_a_info[0] - side_a_resource = side_a_info[1] - if port_name.find('.') < 0: - port_name = "%d.%s" % (side_a_info[1], port_name) - - cx_name = "%s%s-%i" % (self.name_prefix, side_a_info[2], len(self.created_cx)) - - endp_a_name = cx_name + "-A" - endp_b_name = cx_name + "-B" - self.created_cx[cx_name] = [endp_a_name, endp_b_name] - self.created_endp[endp_a_name] = endp_a_name - self.created_endp[endp_b_name] = endp_b_name - these_cx.append(cx_name) - these_endp.append(endp_a_name) - these_endp.append(endp_b_name) - endp_side_a = { - "alias": endp_a_name, - "shelf": side_a_shelf, - "resource": side_a_resource, - "port": side_a_info[2], - "type": endp_type, - "min_rate": self.side_a_min_bps, - "max_rate": self.side_a_max_bps, - "min_pkt": self.side_a_min_pdu, - "max_pkt": self.side_a_max_pdu, - "ip_port": -1 - } - endp_side_b = { - "alias": endp_b_name, - "shelf": side_b_shelf, - "resource": side_b_resource, - "port": side_b_info[2], - "type": endp_type, - "min_rate": self.side_b_min_bps, - "max_rate": self.side_b_max_bps, - "min_pkt": self.side_b_min_pdu, - "max_pkt": self.side_b_max_pdu, - "ip_port": -1 - } - - url = "/cli-json/add_endp" - self.local_realm.json_post(url, endp_side_a, debug_=debug_, suppress_related_commands_=suppress_related_commands) - self.local_realm.json_post(url, endp_side_b, debug_=debug_, suppress_related_commands_=suppress_related_commands) - #print("napping %f sec"%sleep_time) - time.sleep(sleep_time) - - url = "cli-json/set_endp_flag" - data = { - "name": endp_a_name, - "flag": "AutoHelper", - "val": 1 - } - self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - data["name"] = endp_b_name - self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - - if (endp_type == "lf_udp") or (endp_type == "udp") or (endp_type == "lf_udp6") or (endp_type == "udp6"): - data["name"] = endp_a_name - data["flag"] = "UseAutoNAT" - self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - data["name"] = endp_b_name - self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - - if tos != None: - self.local_realm.set_endp_tos(endp_a_name, tos) - self.local_realm.set_endp_tos(endp_b_name, tos) - - data = { - "alias": cx_name, - "test_mgr": "default_tm", - "tx_endp": endp_a_name, - "rx_endp": endp_b_name, - } - # pprint(data) - cx_post_data.append(data) - timer_post_data.append({ - "test_mgr": "default_tm", - "cx_name": cx_name, - "milliseconds": self.report_timer - }) - - elif type(side_b) == list and type(side_a) != list: - side_a_info = self.local_realm.name_to_eid(side_a,debug=debug_) - side_a_shelf = side_a_info[0] - side_a_resource = side_a_info[1] - # side_a_name = side_a_info[2] - - for port_name in side_b: - print(side_b) - side_b_info = self.local_realm.name_to_eid(port_name,debug=debug_) - side_b_shelf = side_b_info[0] - side_b_resource = side_b_info[1] - side_b_name = side_b_info[2] - - cx_name = "%s%s-%i" % (self.name_prefix, port_name, len(self.created_cx)) - endp_a_name = cx_name + "-A" - endp_b_name = cx_name + "-B" - self.created_cx[cx_name] = [endp_a_name, endp_b_name] - self.created_endp[endp_a_name] = endp_a_name - self.created_endp[endp_b_name] = endp_b_name - these_cx.append(cx_name) - these_endp.append(endp_a_name) - these_endp.append(endp_b_name) - endp_side_a = { - "alias": endp_a_name, - "shelf": side_a_shelf, - "resource": side_a_resource, - "port": side_a_info[2], - "type": endp_type, - "min_rate": self.side_a_min_bps, - "max_rate": self.side_a_max_bps, - "min_pkt": self.side_a_min_pdu, - "max_pkt": self.side_a_max_pdu, - "ip_port": -1 - } - endp_side_b = { - "alias": endp_b_name, - "shelf": side_b_shelf, - "resource": side_b_resource, - "port": side_b_info[2], - "type": endp_type, - "min_rate": self.side_b_min_bps, - "max_rate": self.side_b_max_bps, - "min_pkt": self.side_b_min_pdu, - "max_pkt": self.side_b_max_pdu, - "ip_port": -1 - } - - url = "/cli-json/add_endp" - self.local_realm.json_post(url, endp_side_a, debug_=debug_, suppress_related_commands_=suppress_related_commands) - self.local_realm.json_post(url, endp_side_b, debug_=debug_, suppress_related_commands_=suppress_related_commands) - #print("napping %f sec" %sleep_time ) - time.sleep(sleep_time) - - url = "cli-json/set_endp_flag" - data = { - "name": endp_a_name, - "flag": "autohelper", - "val": 1 - } - self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - - url = "cli-json/set_endp_flag" - data = { - "name": endp_b_name, - "flag": "autohelper", - "val": 1 - } - self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - #print("CXNAME451: %s" % cx_name) - data = { - "alias": cx_name, - "test_mgr": "default_tm", - "tx_endp": endp_a_name, - "rx_endp": endp_b_name, - } - cx_post_data.append(data) - timer_post_data.append({ - "test_mgr": "default_tm", - "cx_name": cx_name, - "milliseconds": self.report_timer - }) - else: - raise ValueError( - "side_a or side_b must be of type list but not both: side_a is type %s side_b is type %s" % ( - type(side_a), type(side_b))) - print("wait_until_endps_appear these_endp: {} debug_ {}".format(these_endp,debug_)) - self.local_realm.wait_until_endps_appear(these_endp, debug=debug_) - - for data in cx_post_data: - url = "/cli-json/add_cx" - self.local_realm.json_post(url, data, debug_=debug_, suppress_related_commands_=suppress_related_commands) - time.sleep(0.01) - - self.local_realm.wait_until_cxs_appear(these_cx, debug=debug_) - - def to_string(self): - pprint.pprint(self) - - -class L4CXProfile(LFCliBase): - def __init__(self, lfclient_host, lfclient_port, local_realm, debug_=False): - super().__init__(lfclient_host, lfclient_port, debug_, _halt_on_error=True) - self.lfclient_url = "http://%s:%s" % (lfclient_host, lfclient_port) - self.debug = debug_ - self.url = "http://localhost/" - self.requests_per_ten = 600 - self.local_realm = local_realm - self.created_cx = {} - self.created_endp = [] - self.lfclient_port = lfclient_port - self.lfclient_host = lfclient_host - - def check_errors(self, debug=False): - fields_list = ["!conn", "acc.+denied", "bad-proto", "bad-url", "other-err", "total-err", "rslv-p", "rslv-h", - "timeout", "nf+(4xx)", "http-r", "http-p", "http-t", "login-denied"] - endp_list = self.json_get("layer4/list?fields=%s" % ','.join(fields_list)) - debug_info = {} - if endp_list is not None and endp_list['endpoint'] is not None: - endp_list = endp_list['endpoint'] - expected_passes = len(endp_list) - passes = len(endp_list) - for item in range(len(endp_list)): - for name, info in endp_list[item].items(): - for field in fields_list: - if info[field.replace("+", " ")] > 0: - passes -= 1 - debug_info[name] = {field: info[field.replace("+", " ")]} - if debug: - print(debug_info) - if passes == expected_passes: - return True - else: - print(list(debug_info), " Endps in this list showed errors getting to %s " % self.url) - return False - - def start_cx(self): - print("Starting CXs...") - for cx_name in self.created_cx.keys(): - self.json_post("/cli-json/set_cx_state", { - "test_mgr": "default_tm", - "cx_name": self.created_cx[cx_name], - "cx_state": "RUNNING" - }, debug_=self.debug) - print(".", end='') - print("") - - def stop_cx(self): - print("Stopping CXs...") - for cx_name in self.created_cx.keys(): - self.json_post("/cli-json/set_cx_state", { - "test_mgr": "default_tm", - "cx_name": self.created_cx[cx_name], - "cx_state": "STOPPED" - }, debug_=self.debug) - print(".", end='') - print("") - - def check_request_rate(self): - endp_list = self.json_get("layer4/list?fields=urls/s") - expected_passes = 0 - passes = 0 - if endp_list is not None and endp_list['endpoint'] is not None: - endp_list = endp_list['endpoint'] - for item in endp_list: - for name, info in item.items(): - if name in self.created_cx.keys(): - expected_passes += 1 - if info['urls/s'] * self.requests_per_ten >= self.target_requests_per_ten * .9: - print(name, info['urls/s'], info['urls/s'] * self.requests_per_ten, self.target_requests_per_ten * .9) - passes += 1 - - return passes == expected_passes - - - def cleanup(self): - print("Cleaning up cxs and endpoints") - if len(self.created_cx) != 0: - for cx_name in self.created_cx.keys(): - req_url = "cli-json/rm_cx" - data = { - "test_mgr": "default_tm", - "cx_name": self.created_cx[cx_name] - } - self.json_post(req_url, data) - # pprint(data) - req_url = "cli-json/rm_endp" - data = { - "endp_name": cx_name - } - self.json_post(req_url, data) - # pprint(data) - - def create(self, ports=[], sleep_time=.5, debug_=False, suppress_related_commands_=None): - cx_post_data = [] - for port_name in ports: - if len(self.local_realm.name_to_eid(port_name)) == 3: - shelf = self.local_realm.name_to_eid(port_name)[0] - resource = self.local_realm.name_to_eid(port_name)[1] - name = self.local_realm.name_to_eid(port_name)[2] - else: - raise ValueError("Unexpected name for port_name %s" % port_name) - endp_data = { - "alias": name + "_l4", - "shelf": shelf, - "resource": resource, - "port": name, - "type": "l4_generic", - "timeout": 10, - "url_rate": self.requests_per_ten, - "url": self.url, - "proxy_auth_type": 0x200 - } - url = "cli-json/add_l4_endp" - self.local_realm.json_post(url, endp_data, debug_=debug_, - suppress_related_commands_=suppress_related_commands_) - time.sleep(sleep_time) - - endp_data = { - "alias": "CX_" + name + "_l4", - "test_mgr": "default_tm", - "tx_endp": name + "_l4", - "rx_endp": "NA" - } - cx_post_data.append(endp_data) - self.created_cx[name + "_l4"] = "CX_" + name + "_l4" - - for cx_data in cx_post_data: - url = "/cli-json/add_cx" - self.local_realm.json_post(url, cx_data, debug_=debug_, - suppress_related_commands_=suppress_related_commands_) - time.sleep(sleep_time) - - def monitor(self, - duration_sec=60, - monitor_interval=1, - col_names=None, - created_cx=None, - monitor=True, - report_file=None, - output_format=None, - script_name=None, - arguments=None, - iterations=0, - debug=False): - try: - duration_sec = self.parse_time(duration_sec).seconds - except: - if (duration_sec is None) or (duration_sec <= 1): - raise ValueError("L4CXProfile::monitor wants duration_sec > 1 second") - if (duration_sec <= monitor_interval): - raise ValueError("L4CXProfile::monitor wants duration_sec > monitor_interval") - if report_file == None: - raise ValueError("Monitor requires an output file to be defined") - if created_cx == None: - raise ValueError("Monitor needs a list of Layer 4 connections") - if (monitor_interval is None) or (monitor_interval < 1): - raise ValueError("L4CXProfile::monitor wants monitor_interval >= 1 second") - if output_format is not None: - if output_format.lower() != report_file.split('.')[-1]: - raise ValueError('Filename %s does not match output format %s' % (report_file, output_format)) - else: - output_format = report_file.split('.')[-1] - - # Step 1 - Assign column names - - if col_names is not None and len(col_names) > 0: - header_row=col_names - else: - header_row=list((list(self.json_get("/layer4/all")['endpoint'][0].values())[0].keys())) - if debug: - print(header_row) - - # Step 2 - Monitor columns - - start_time = datetime.datetime.now() - end_time = start_time + datetime.timedelta(seconds=duration_sec) - sleep_interval = round(duration_sec // 5) - if debug: - print("Sleep_interval is %s ", sleep_interval) - print("Start time is %s " , start_time) - print("End time is %s " ,end_time) - value_map = dict() - passes = 0 - expected_passes = 0 - timestamps = [] - for test in range(1+iterations): - while datetime.datetime.now() < end_time: - if col_names is None: - response = self.json_get("/layer4/all") - else: - fields = ",".join(col_names) - response = self.json_get("/layer4/%s?fields=%s" % (created_cx, fields)) - if debug: - print(response) - if response is None: - print(response) - raise ValueError("Cannot find any endpoints") - if monitor: - if debug: - print(response) - - time.sleep(sleep_interval) - t = datetime.datetime.now() - timestamps.append(t) - value_map[t] = response - expected_passes += 1 - if self.check_errors(debug): - if self.check_request_rate(): - passes += 1 - else: - self._fail("FAIL: Request rate did not exceed 90% target rate") - self.exit_fail() - else: - self._fail("FAIL: Errors found getting to %s " % self.url) - self.exit_fail() - time.sleep(monitor_interval) - print(value_map) - - #[further] post-processing data, after test completion - full_test_data_list = [] - for test_timestamp, data in value_map.items(): - #reduce the endpoint data to single dictionary of dictionaries - for datum in data["endpoint"]: - for endpoint_data in datum.values(): - if debug: - print(endpoint_data) - endpoint_data["Timestamp"] = test_timestamp - full_test_data_list.append(endpoint_data) - - - header_row.append("Timestamp") - header_row.append('Timestamp milliseconds') - df = pd.DataFrame(full_test_data_list) - - df["Timestamp milliseconds"] = [self.get_milliseconds(x) for x in df["Timestamp"]] - #round entire column - df["Timestamp milliseconds"]=df["Timestamp milliseconds"].astype(int) - df["Timestamp"]=df["Timestamp"].apply(lambda x:x.strftime("%m/%d/%Y %I:%M:%S")) - df=df[["Timestamp","Timestamp milliseconds", *header_row[:-2]]] - #compare previous data to current data - - systeminfo = ast.literal_eval(requests.get('http://'+str(self.lfclient_host)+':'+str(self.lfclient_port)).text) - - if output_format == 'hdf': - df.to_hdf(report_file, 'table', append=True) - if output_format == 'parquet': - df.to_parquet(report_file, engine='pyarrow') - if output_format == 'png': - fig = df.plot().get_figure() - fig.savefig(report_file) - if output_format.lower() in ['excel', 'xlsx'] or report_file.split('.')[-1] == 'xlsx': - df.to_excel(report_file, index=False) - if output_format == 'df': - return df - supported_formats = ['csv', 'json', 'stata', 'pickle','html'] - for x in supported_formats: - if output_format.lower() == x or report_file.split('.')[-1] == x: - exec('df.to_' + x + '("'+report_file+'")') - class GenCXProfile(LFCliBase): def __init__(self, lfclient_host, lfclient_port, local_realm, debug_=False):