mirror of
https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
synced 2025-11-01 03:07:56 +00:00
469 lines
18 KiB
Python
469 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
# Class holds default settings for json requests to Grafana -
|
|
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
|
|
import sys
|
|
|
|
if sys.version_info[0] != 3:
|
|
print("This script requires Python 3")
|
|
exit()
|
|
|
|
import requests
|
|
|
|
import json
|
|
import string
|
|
import random
|
|
|
|
|
|
class CSVReader:
|
|
def __init__(self):
|
|
self.shape = None
|
|
|
|
def read_csv(self,
|
|
file,
|
|
sep='\t'):
|
|
df = open(file).read().split('\n')
|
|
rows = list()
|
|
for x in df:
|
|
if len(x) > 0:
|
|
rows.append(x.split(sep))
|
|
length = list(range(0, len(df[0])))
|
|
columns = dict(zip(df[0], length))
|
|
self.shape = (length, columns)
|
|
return rows
|
|
|
|
def get_column(self,
|
|
df,
|
|
value):
|
|
index = df[0].index(value)
|
|
values = []
|
|
for row in df[1:]:
|
|
values.append(row[index])
|
|
return values
|
|
|
|
|
|
class GrafanaRequest:
|
|
def __init__(self,
|
|
_grafana_token,
|
|
_grafanajson_host,
|
|
grafanajson_port=3000,
|
|
_folderID=0,
|
|
_headers=dict(),
|
|
_overwrite='false',
|
|
debug_=False,
|
|
die_on_error_=False):
|
|
self.debug = debug_
|
|
self.die_on_error = die_on_error_
|
|
self.headers = _headers
|
|
self.headers['Authorization'] = 'Bearer ' + _grafana_token
|
|
self.headers['Content-Type'] = 'application/json'
|
|
self.grafanajson_host = _grafanajson_host
|
|
self.grafanajson_port = grafanajson_port
|
|
self.grafanajson_token = _grafana_token
|
|
self.grafanajson_url = "http://%s:%s" % (_grafanajson_host, grafanajson_port)
|
|
self.data = dict()
|
|
self.data['overwrite'] = _overwrite
|
|
self.csvreader = CSVReader()
|
|
self.units = dict()
|
|
|
|
def create_bucket(self,
|
|
bucket_name=None):
|
|
# Create a bucket in Grafana
|
|
if bucket_name is not None:
|
|
pass
|
|
|
|
def list_dashboards(self):
|
|
url = self.grafanajson_url + '/api/search'
|
|
print(url)
|
|
return json.loads(requests.get(url, headers=self.headers).text)
|
|
|
|
def create_dashboard(self,
|
|
dashboard_name=None,
|
|
):
|
|
grafanajson_url = self.grafanajson_url + "/api/dashboards/db"
|
|
datastore = dict()
|
|
dashboard = dict()
|
|
dashboard['id'] = None
|
|
dashboard['title'] = dashboard_name
|
|
dashboard['tags'] = ['templated']
|
|
dashboard['timezone'] = 'browser'
|
|
dashboard['schemaVersion'] = 27
|
|
dashboard['version'] = 4
|
|
datastore['dashboard'] = dashboard
|
|
datastore['overwrite'] = False
|
|
data = json.dumps(datastore, indent=4)
|
|
return requests.post(grafanajson_url, headers=self.headers, data=data, verify=False)
|
|
|
|
def delete_dashboard(self,
|
|
dashboard_uid=None):
|
|
grafanajson_url = self.grafanajson_url + "/api/dashboards/uid/" + dashboard_uid
|
|
return requests.post(grafanajson_url, headers=self.headers, verify=False)
|
|
|
|
def create_dashboard_from_data(self,
|
|
json_file=None):
|
|
grafanajson_url = self.grafanajson_url + '/api/dashboards/db'
|
|
datastore = dict()
|
|
dashboard = dict(json.loads(open(json_file).read()))
|
|
datastore['dashboard'] = dashboard
|
|
datastore['overwrite'] = False
|
|
data = json.dumps(datastore, indent=4)
|
|
return requests.post(grafanajson_url, headers=self.headers, data=data, verify=False)
|
|
|
|
def create_dashboard_from_dict(self,
|
|
dictionary=None,
|
|
overwrite=False):
|
|
grafanajson_url = self.grafanajson_url + '/api/dashboards/db'
|
|
datastore = dict()
|
|
dashboard = dict(json.loads(dictionary))
|
|
datastore['dashboard'] = dashboard
|
|
datastore['overwrite'] = overwrite
|
|
data = json.dumps(datastore, indent=4)
|
|
return requests.post(grafanajson_url, headers=self.headers, data=data, verify=False)
|
|
|
|
def get_graph_groups(self, target_csvs): # Get the unique values in the Graph-Group column
|
|
dictionary = dict()
|
|
for target_csv in target_csvs:
|
|
if len(target_csv) > 1:
|
|
csv = self.csvreader.read_csv(target_csv)
|
|
# Unique values in the test-id column
|
|
scripts = list(set(self.csvreader.get_column(csv, 'test-id')))
|
|
# we need to make sure we match each Graph Group to the script it occurs in
|
|
for script in scripts:
|
|
# Unique Graph Groups for each script
|
|
graph_groups = self.csvreader.get_column(csv, 'Graph-Group')
|
|
dictionary[script] = list(set(graph_groups))
|
|
units = self.csvreader.get_column(csv, 'Units')
|
|
self.units[script] = dict()
|
|
for index in range(0, len(graph_groups)):
|
|
self.units[script][graph_groups[index]] = units[index]
|
|
subtests = 0
|
|
for score in list(self.csvreader.get_column(csv, 'Subtest-Pass')):
|
|
subtests += int(score)
|
|
for score in list(self.csvreader.get_column(csv, 'Subtest-Fail')):
|
|
subtests += int(score)
|
|
if subtests > 0:
|
|
dictionary[script].append('Subtests passed')
|
|
dictionary[script].append('Subtests failed')
|
|
print(subtests)
|
|
for item in dictionary[script]:
|
|
print('%s, %s' % (item, type(item)))
|
|
print(dictionary)
|
|
return dictionary
|
|
|
|
def maketargets(self,
|
|
bucket,
|
|
scriptname,
|
|
groupBy,
|
|
index,
|
|
graph_group,
|
|
testbed,
|
|
test_tag=None):
|
|
query = (
|
|
'from(bucket: "%s")\n '
|
|
'|> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n '
|
|
'|> filter(fn: (r) => r["script"] == "%s")\n '
|
|
'|> group(columns: ["_measurement"])\n '
|
|
% (bucket, scriptname))
|
|
queryend = ('|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)\n '
|
|
'|> yield(name: "mean")\n ')
|
|
if graph_group is not None:
|
|
graphgroup = ('|> filter(fn: (r) => r["Graph-Group"] == "%s")\n' % graph_group)
|
|
query += graphgroup
|
|
if test_tag is not None and len(test_tag) > 0:
|
|
graphgroup = ('|> filter(fn: (r) => r["Test Tag"] == "%s")\n' % test_tag)
|
|
query += graphgroup
|
|
if testbed is not None:
|
|
query += ('|> filter(fn: (r) => r["testbed"] == "%s")\n' % testbed)
|
|
targets = dict()
|
|
targets['delimiter'] = ','
|
|
targets['groupBy'] = groupBy
|
|
targets['header'] = True
|
|
targets['ignoreUnknown'] = False
|
|
targets['orderByTime'] = 'ASC'
|
|
targets['policy'] = 'default'
|
|
targets['query'] = query + queryend
|
|
targets['refId'] = dict(enumerate(string.ascii_uppercase, 1))[index + 1]
|
|
targets['resultFormat'] = "time_series"
|
|
targets['schema'] = list()
|
|
targets['skipRows'] = 0
|
|
targets['tags'] = list()
|
|
return targets
|
|
|
|
def groupby(self, params, grouptype):
|
|
dic = dict()
|
|
dic['params'] = list()
|
|
dic['params'].append(params)
|
|
dic['type'] = grouptype
|
|
return dic
|
|
|
|
def create_custom_dashboard(self,
|
|
scripts=None,
|
|
title=None,
|
|
bucket=None,
|
|
graph_groups=None,
|
|
graph_groups_file=None,
|
|
target_csvs=None,
|
|
testbed=None,
|
|
datasource='InfluxDB',
|
|
from_date='now-1y',
|
|
to_date='now',
|
|
graph_height=8,
|
|
graph_width=12,
|
|
pass_fail=None,
|
|
test_tag=None):
|
|
options = string.ascii_lowercase + string.ascii_uppercase + string.digits
|
|
uid = ''.join(random.choice(options) for i in range(9))
|
|
input1 = dict()
|
|
annotations = dict()
|
|
annotations['builtIn'] = 1
|
|
annotations['datasource'] = '-- Grafana --'
|
|
annotations['enable'] = True
|
|
annotations['hide'] = True
|
|
annotations['iconColor'] = 'rgba(0, 211, 255, 1)'
|
|
annotations['name'] = 'Annotations & Alerts'
|
|
annotations['type'] = 'dashboard'
|
|
annot = dict()
|
|
annot['list'] = list()
|
|
annot['list'].append(annotations)
|
|
|
|
templating = dict()
|
|
templating['list'] = list()
|
|
|
|
timedict = dict()
|
|
timedict['from'] = from_date
|
|
timedict['to'] = to_date
|
|
|
|
panels = list()
|
|
index = 1
|
|
if graph_groups_file:
|
|
print("graph_groups_file: %s" % graph_groups_file)
|
|
target_csvs = open(graph_groups_file).read().split('\n')
|
|
graph_groups = self.get_graph_groups(
|
|
target_csvs) # Get the list of graph groups which are in the tests we ran
|
|
if target_csvs:
|
|
print('Target CSVs: %s' % target_csvs)
|
|
graph_groups = self.get_graph_groups(
|
|
target_csvs) # Get the list of graph groups which are in the tests we ran
|
|
if pass_fail is not None:
|
|
graph_groups[pass_fail] = ['PASS', 'FAIL']
|
|
|
|
print('Test Tag in Grafana: %s' % test_tag)
|
|
|
|
for scriptname in graph_groups.keys():
|
|
print(scriptname)
|
|
if scriptname in test_tag.keys():
|
|
for tag in test_tag[scriptname]:
|
|
print('Script: %s, Tag: %s' % (scriptname, tag))
|
|
panel = self.create_panel(graph_groups,
|
|
graph_height,
|
|
graph_width,
|
|
scriptname,
|
|
bucket,
|
|
testbed,
|
|
tag,
|
|
datasource,
|
|
index)
|
|
panels.append(panel)
|
|
index = index + 1
|
|
else:
|
|
panel = self.create_panel(graph_groups,
|
|
graph_height,
|
|
graph_width,
|
|
scriptname,
|
|
bucket,
|
|
testbed,
|
|
None,
|
|
datasource,
|
|
index)
|
|
panels.append(panel)
|
|
index = index + 1
|
|
|
|
input1['annotations'] = annot
|
|
input1['editable'] = True
|
|
input1['gnetId'] = None
|
|
input1['graphTooltip'] = 0
|
|
input1['links'] = list()
|
|
input1['panels'] = panels
|
|
input1['refresh'] = False
|
|
input1['schemaVersion'] = 27
|
|
input1['style'] = 'dark'
|
|
input1['tags'] = list()
|
|
input1['templating'] = templating
|
|
input1['time'] = timedict
|
|
input1['timepicker'] = dict()
|
|
input1['timezone'] = ''
|
|
input1['title'] = ("Testbed: %s" % title)
|
|
input1['uid'] = uid
|
|
input1['version'] = 11
|
|
return self.create_dashboard_from_dict(dictionary=json.dumps(input1))
|
|
|
|
def create_panel(self,
|
|
graph_groups,
|
|
graph_height,
|
|
graph_width,
|
|
scriptname,
|
|
bucket,
|
|
testbed,
|
|
test_tag,
|
|
datasource,
|
|
index):
|
|
print('Test Tag: %s' % test_tag)
|
|
for graph_group in graph_groups[scriptname]:
|
|
panel = dict()
|
|
|
|
gridpos = dict()
|
|
gridpos['h'] = graph_height
|
|
gridpos['w'] = graph_width
|
|
gridpos['x'] = 0
|
|
gridpos['y'] = 0
|
|
|
|
legend = dict()
|
|
legend['avg'] = False
|
|
legend['current'] = False
|
|
legend['max'] = False
|
|
legend['min'] = False
|
|
legend['show'] = True
|
|
legend['total'] = False
|
|
legend['values'] = False
|
|
|
|
options = dict()
|
|
options['alertThreshold'] = True
|
|
|
|
groupBy = list()
|
|
groupBy.append(self.groupby('$__interval', 'time'))
|
|
groupBy.append(self.groupby('null', 'fill'))
|
|
|
|
targets = list()
|
|
counter = 0
|
|
try:
|
|
new_target = self.maketargets(bucket,
|
|
scriptname,
|
|
groupBy,
|
|
counter,
|
|
graph_group,
|
|
testbed,
|
|
test_tag=test_tag)
|
|
except:
|
|
new_target = self.maketargets(bucket, scriptname, groupBy, counter, graph_group, testbed)
|
|
targets.append(new_target)
|
|
|
|
fieldConfig = dict()
|
|
fieldConfig['defaults'] = dict()
|
|
fieldConfig['overrides'] = list()
|
|
|
|
transformation = dict()
|
|
transformation['id'] = "renameByRegex"
|
|
transformation_options = dict()
|
|
transformation_options['regex'] = "(.*) value.*"
|
|
transformation_options['renamePattern'] = "$1"
|
|
transformation['options'] = transformation_options
|
|
|
|
xaxis = dict()
|
|
xaxis['buckets'] = None
|
|
xaxis['mode'] = "time"
|
|
xaxis['name'] = None
|
|
xaxis['show'] = True
|
|
xaxis['values'] = list()
|
|
|
|
yaxis = dict()
|
|
yaxis['format'] = 'short'
|
|
try:
|
|
yaxis['label'] = self.units[scriptname][graph_group]
|
|
except:
|
|
pass
|
|
yaxis['logBase'] = 1
|
|
yaxis['max'] = None
|
|
yaxis['min'] = None
|
|
yaxis['show'] = True
|
|
|
|
yaxis1 = dict()
|
|
yaxis1['align'] = False
|
|
yaxis1['alignLevel'] = None
|
|
|
|
panel['aliasColors'] = dict()
|
|
panel['bars'] = False
|
|
panel['dashes'] = False
|
|
panel['dashLength'] = 10
|
|
panel['datasource'] = datasource
|
|
panel['fieldConfig'] = fieldConfig
|
|
panel['fill'] = 0
|
|
panel['fillGradient'] = 0
|
|
panel['gridPos'] = gridpos
|
|
panel['hiddenSeries'] = False
|
|
panel['id'] = index
|
|
panel['legend'] = legend
|
|
panel['lines'] = True
|
|
panel['linewidth'] = 1
|
|
panel['nullPointMode'] = 'null'
|
|
panel['options'] = options
|
|
panel['percentage'] = False
|
|
panel['pluginVersion'] = '7.5.4'
|
|
panel['pointradius'] = 2
|
|
panel['points'] = True
|
|
panel['renderer'] = 'flot'
|
|
panel['seriesOverrides'] = list()
|
|
panel['spaceLength'] = 10
|
|
panel['stack'] = False
|
|
panel['steppedLine'] = False
|
|
panel['targets'] = targets
|
|
panel['thresholds'] = list()
|
|
panel['timeFrom'] = None
|
|
panel['timeRegions'] = list()
|
|
panel['timeShift'] = None
|
|
|
|
if graph_group is not None:
|
|
scriptname = '%s: %s' % (scriptname, graph_group)
|
|
if test_tag is not None:
|
|
scriptname = '%s: %s' % (scriptname, test_tag)
|
|
scriptname = '%s: %s' % (scriptname, testbed)
|
|
panel['title'] = scriptname
|
|
|
|
if self.debug:
|
|
print(panel['title'])
|
|
panel['transformations'] = list()
|
|
panel['transformations'].append(transformation)
|
|
panel['type'] = "graph"
|
|
panel['xaxis'] = xaxis
|
|
panel['yaxes'] = list()
|
|
panel['yaxes'].append(yaxis)
|
|
panel['yaxes'].append(yaxis)
|
|
panel['yaxis'] = yaxis1
|
|
return panel
|
|
|
|
def create_snapshot(self, title):
|
|
print('create snapshot')
|
|
grafanajson_url = self.grafanajson_url + '/api/snapshots'
|
|
data = self.get_dashboard(title)
|
|
data['expires'] = False
|
|
data['external'] = False
|
|
data['timeout'] = 15
|
|
if self.debug:
|
|
print(data)
|
|
return requests.post(grafanajson_url, headers=self.headers, json=data, verify=False).text
|
|
|
|
def list_snapshots(self):
|
|
grafanajson_url = self.grafanajson_url + '/api/dashboard/snapshots'
|
|
print(grafanajson_url)
|
|
return json.loads(requests.get(grafanajson_url, headers=self.headers, verify=False).text)
|
|
|
|
def get_dashboard(self, target):
|
|
dashboards = self.list_dashboards()
|
|
print(target)
|
|
for dashboard in dashboards:
|
|
if dashboard['title'] == target:
|
|
uid = dashboard['uid']
|
|
grafanajson_url = self.grafanajson_url + '/api/dashboards/uid/' + uid
|
|
print(grafanajson_url)
|
|
return json.loads(requests.get(grafanajson_url, headers=self.headers, verify=False).text)
|
|
|
|
def get_units(self, csv):
|
|
df = self.csvreader.read_csv(csv)
|
|
units = self.csvreader.get_column(df, 'Units')
|
|
test_id = self.csvreader.get_column(df, 'test-id')
|
|
maxunit = max(set(units), key=units.count)
|
|
maxtest = max(set(test_id), key=test_id.count)
|
|
d = dict()
|
|
d[maxunit] = maxtest
|
|
print(maxunit, maxtest)
|
|
return d |