longevity: Add initial influx support.

Doesn't work yet, calling influx with wrong data structure it seems.
But, getting closer.

Signed-off-by: Ben Greear <greearb@candelatech.com>
This commit is contained in:
Ben Greear
2021-04-07 17:20:31 -07:00
parent 54f2fe70af
commit b4bead52ff
2 changed files with 116 additions and 43 deletions

View File

@@ -1,5 +1,7 @@
#!/usr/bin/env python3
# pip3 install influxdb
import sys
if sys.version_info[0] != 3:
@@ -16,62 +18,75 @@ import time
class RecordInflux(LFCliBase):
def __init__(self,
_lfjson_host="lanforge",
_lfjson_port=8080,
_influx_host="localhost",
_port=8080,
_influx_port=8086,
_influx_user=None,
_influx_passwd=None,
_influx_db=None,
_debug_on=False,
_exit_on_fail=False):
super().__init__(_influx_host,
_port,
super().__init__(_lfjson_host, _lfjson_port,
_debug=_debug_on,
_exit_on_fail=_exit_on_fail)
self.host = _influx_host
self.port = _port
self.influx_host = _influx_host
self.influx_port = _influx_port
self.influx_user = _influx_user
self.influx_passwd = _influx_passwd
self.influx_db = _influx_db
self.client = InfluxDBClient(self.host,
8086,
self.client = InfluxDBClient(self.influx_host,
self.influx_port,
self.influx_user,
self.influx_passwd,
self.influx_db)
def posttoinflux(self, station, key, value):
json_body = [
{
"measurement": station + ' ' + key,
"tags": {
"host": self.host,
"region": "us-west"
},
"time": str(datetime.datetime.utcnow().isoformat()),
"fields": {
"value": value
}
}
]
self.client.write_points(json_body)
def post_to_influx(self, key, value, tags):
data = {}
data['measurement'] = key
data['tags'] = {}
for t in tags:
data['tags'][t.key] = t.val
data['time'] = str(datetime.datetime.utcnow().isoformat())
data['fields'] = {}
data['fields']['value'] = value
def getdata(self,
devices=None,
target_kpi=None,
longevity=None,
monitor_interval=None):
url = 'http://' + self.host + ':8080/port/1/1/'
# json_body = json.dumps(data)
# json_body = [
# {
# "measurement": key,
# "tags": {
# "host": self.host,
# "region": "us-west"
# },
# "time": str(datetime.datetime.utcnow().isoformat()),
# "fields": {
# "value": value
# }
# }
# ]
self.client.write_points(data)
# Don't use this unless you are sure you want to.
# More likely you would want to generate KPI in the
# individual test cases and poke those relatively small bits of
# info into influxdb.
# This will not return until the 'longevity' timer has expired.
def monitor_port_data(self,
lanforge_host="localhost",
devices=None,
longevity=None,
monitor_interval=None):
url = 'http://' + lanforge_host + ':8080/port/1/1/'
end = datetime.datetime.now() + datetime.timedelta(0, longevity)
while datetime.datetime.now() < end:
for station in devices:
url1 = url + station
response = json.loads(requests.get(url1).text)
if target_kpi is None:
for key in response['interface'].keys():
self.posttoinflux(station, key, response['interface'][key])
else:
targets = target_kpi + ['ip', 'ipv6 address', 'alias', 'mac']
response['interface'] = {your_key: response['interface'][your_key] for your_key in targets}
for key in response['interface'].keys():
self.posttoinflux(station, key, response['interface'][key])
# Poke everything into influx db
for key in response['interface'].keys():
self.posttoinflux("%s-%s"%(station, key), response['interface'][key])
time.sleep(monitor_interval)

View File

@@ -56,6 +56,7 @@ class L3VariableTime(Realm):
_exit_on_error=False,
_exit_on_fail=False,
_proxy_str=None,
influxdb=None,
_capture_signal_list=[]):
super().__init__(lfclient_host=lfclient_host,
lfclient_port=lfclient_port,
@@ -64,6 +65,7 @@ class L3VariableTime(Realm):
_exit_on_fail=_exit_on_fail,
_proxy_str=_proxy_str,
_capture_signal_list=_capture_signal_list)
self.influxdb = influxdb
self.tos = tos.split()
self.endp_types = endp_types.split()
self.side_b = side_b
@@ -148,10 +150,14 @@ class L3VariableTime(Realm):
self.cx_profile.name_prefix = self.name_prefix
def __get_rx_values(self):
endp_list = self.json_get("endp?fields=name,rx+bytes,rx+drop+%25", debug_=False)
endp_list = self.json_get("endp?fields=name,rx+rate,rx+bytes,rx+drop+%25", debug_=False)
endp_rx_drop_map = {}
endp_rx_map = {}
our_endps = {}
total_ul = 0
total_dl = 0
for e in self.multicast_profile.get_mc_names():
our_endps[e] = e;
for e in self.cx_profile.created_endp.keys():
@@ -166,8 +172,15 @@ class L3VariableTime(Realm):
for value_name, value_rx_drop in value.items():
if value_name == 'rx drop %':
endp_rx_drop_map[item] = value_rx_drop
for value_name, value_rx_bps in value.items():
if value_name == 'rx rate':
# This hack breaks for mcast
if item.endswith("-A"):
total_dl += int(value_rx_bps)
else:
total_ul += int(value_rx_bps)
return endp_rx_map, endp_rx_drop_map
return endp_rx_map, endp_rx_drop_map, total_dl, total_ul
def time_stamp(self):
return time.strftime('%m_%d_%Y_%H_%M_%S', time.localtime(self.epoch_time))
@@ -522,7 +535,7 @@ class L3VariableTime(Realm):
cur_time = datetime.datetime.now()
print("Getting initial values.")
old_rx_values, rx_drop_percent = self.__get_rx_values()
old_rx_values, rx_drop_percent, total_dl_bps, total_ul_bps = self.__get_rx_values()
end_time = self.parse_time(self.test_duration) + cur_time
@@ -530,6 +543,8 @@ class L3VariableTime(Realm):
passes = 0
expected_passes = 0
total_dl_bps = 0
total_ul_bps = 0
while cur_time < end_time:
#interval_time = cur_time + datetime.timedelta(seconds=5)
interval_time = cur_time + datetime.timedelta(seconds=self.polling_interval_seconds)
@@ -540,7 +555,7 @@ class L3VariableTime(Realm):
time.sleep(1)
self.epoch_time = int(time.time())
new_rx_values, rx_drop_percent = self.__get_rx_values()
new_rx_values, rx_drop_percent, total_dl_bps, total_ul_bps = self.__get_rx_values()
expected_passes += 1
if self.__compare_vals(old_rx_values, new_rx_values):
@@ -552,6 +567,8 @@ class L3VariableTime(Realm):
self.__record_rx_dropped_percent(rx_drop_percent)
self.record_kpi(len(temp_stations_list), ul, dl, ul_pdu, dl_pdu, total_dl_bps, total_ul_bps)
self.cx_profile.stop_cx();
self.multicast_profile.stop_mc();
@@ -560,6 +577,30 @@ class L3VariableTime(Realm):
if passes == expected_passes:
self._pass("PASS: Requested-Rate: %s <-> %s PDU: %s <-> %s All tests passed" % (ul, dl, ul_pdu, dl_pdu), print_pass)
def record_kpi(self, sta_count, ul, dl, ul_pdu, dl_pdu, total_dl_bps, total_ul_bps):
if self.influxdb == None:
return
class MyKvPair:
key = ""
val = ""
tags = [MyKvPair] * 5
tags[0].key = "requested-ul-bps"
tags[0].val = ul
tags[1].key = "requested-dl-bps"
tags[1].val = dl
tags[2].key = "ul-pdu-size"
tags[2].val = ul_pdu
tags[3].key = "dl-pdu-size"
tags[3].val = dl_pdu
tags[4].key = "station-count"
tags[4].val = sta_count
self.influxdb.post_to_influx("total-download-bps", total_dl_bps, tags)
self.influxdb.post_to_influx("total-upload-bps", total_ul_bps, tags)
self.influxdb.post_to_influx("total-bi-directional-bps", total_ul_bps + total_dl_bps, tags)
def stop(self):
self.cx_profile.stop_cx()
self.multicast_profile.stop_mc()
@@ -779,6 +820,12 @@ python3 test_l3_longevity.py --cisco_ctlr 192.168.100.112 --cisco_dfs True --mgr
parser.add_argument('-bmr','--side_b_min_bps', help='--side_b_min_bps , upstream min tx rate default 256000', default="256000")
parser.add_argument('-bmp','--side_b_min_pdu', help='--side_b_min_pdu , upstream pdu size default 1518', default="MTU")
parser.add_argument('--influx_host', help='Hostname for the Influx database')
parser.add_argument('--influx_port', help='IP Port for the Influx database')
parser.add_argument('--influx_user', help='Username for the Influx database')
parser.add_argument('--influx_passwd', help='Password for the Influx database')
parser.add_argument('--influx_db', help='Name of the Influx database')
parser.add_argument("--cap_ctl_out", help="--cap_ctl_out , switch the cisco controller output will be captured", action='store_true')
parser.add_argument("--wait", help="--wait <time> , time to wait at the end of the test", default='0')
@@ -809,7 +856,18 @@ python3 test_l3_longevity.py --cisco_ctlr 192.168.100.112 --cisco_dfs True --mgr
current_time = time.strftime("%m_%d_%Y_%H_%M_%S", time.localtime())
csv_outfile = "{}_{}.csv".format(args.csv_outfile,current_time)
print("csv output file : {}".format(csv_outfile))
influxdb = None
if args.influx_db is not None:
from influx import RecordInflux
influxdb = RecordInflux(_lfjson_host=lfjson_host,
_lfjson_port=lfjson_port,
_influx_host=args.influx_host,
_influx_port=args.influx_port,
_influx_db=args.influx_db,
_influx_user=args.influx_user,
_influx_passwd=args.influx_passwd)
MAX_NUMBER_OF_STATIONS = 1000
@@ -859,7 +917,6 @@ python3 test_l3_longevity.py --cisco_ctlr 192.168.100.112 --cisco_dfs True --mgr
reset_port_time_max_list.append('0s')
index = 0
station_lists = []
for (radio_name_, number_of_stations_per_radio_) in zip(radio_name_list,number_of_stations_per_radio_list):
@@ -909,7 +966,8 @@ python3 test_l3_longevity.py --cisco_ctlr 192.168.100.112 --cisco_dfs True --mgr
side_a_min_pdu=ul_pdus,
side_b_min_pdu=dl_pdus,
debug=debug,
outfile=csv_outfile)
outfile=csv_outfile,
influxdb=influxdb)
ip_var_test.pre_cleanup()