CSV to influx uses a while loop to write data to Influx

Signed-off-by: Matthew Stidham <stidmatt@protonmail.com>
This commit is contained in:
Matthew Stidham
2021-04-16 15:00:06 -07:00
parent fbd1046bc5
commit a9fb064002
2 changed files with 42 additions and 27 deletions

View File

@@ -19,7 +19,7 @@ if 'py-json' not in sys.path:
import argparse import argparse
from realm import Realm from realm import Realm
import pandas as pd import datetime
class CSVtoInflux(Realm): class CSVtoInflux(Realm):
@@ -32,7 +32,7 @@ class CSVtoInflux(Realm):
_proxy_str=None, _proxy_str=None,
_capture_signal_list=[], _capture_signal_list=[],
influxdb=None, influxdb=None,
#_influx_tag=[], _influx_tag=[],
target_csv=None): target_csv=None):
super().__init__(lfclient_host=lfclient_host, super().__init__(lfclient_host=lfclient_host,
lfclient_port=lfclient_port, lfclient_port=lfclient_port,
@@ -43,29 +43,44 @@ class CSVtoInflux(Realm):
_capture_signal_list=_capture_signal_list) _capture_signal_list=_capture_signal_list)
self.influxdb = influxdb self.influxdb = influxdb
self.target_csv = target_csv self.target_csv = target_csv
#self.influx_tag = _influx_tag self.influx_tag = _influx_tag
# Submit data to the influx db if configured to do so. # Submit data to the influx db if configured to do so.
def post_to_influx(self): def post_to_influx(self):
df = pd.read_csv(self.target_csv, sep='\t') with open(self.target_csv) as fp:
df['Date'] = pd.to_datetime(df['Date'], unit='ms') line = fp.readline()
df['Date'] = [str(timestamp.isoformat()) for timestamp in df['Date']] line = line.split('\t')
dates = list(set(df['Date'])) # indexes tell us where in the CSV our data is located. We do it this way so that even if the columns are moved around, as long as they are present, the script will still work.
scriptname=df['test-id'][0] numeric_score_index = line.index('numeric-score')
for date in dates: test_id_index = line.index('test-id')
kpi2 = df[df['Date'] == date][['Date', 'test details', 'numeric-score', 'test-id']] date_index = line.index('Date')
metrics = list(set(kpi2['test details'])) test_details_index = line.index('test details')
targets = dict() short_description_index = line.index('short-description')
for k in metrics: graph_group_index = line.index('Graph-Group')
targets[k] = [*kpi2[kpi2['test details'] == k]['numeric-score']][0] units_index = line.index('Units')
targets[k.replace(' ', '-').lower()] = targets.pop(k) line = fp.readline()
targets while line:
line = line.split('\t') #split the line by tabs to separate each item in the string
date = line[date_index]
date = datetime.datetime.utcfromtimestamp(int(date) / 1000).isoformat() #convert to datetime so influx can read it, this is required
numeric_score = line[numeric_score_index]
numeric_score = float(numeric_score) #convert to float, InfluxDB cannot
test_details = line[test_details_index]
short_description = line[short_description_index]
test_id = line[test_id_index]
tags = dict() tags = dict()
tags['script'] = scriptname tags['script'] = line[test_id_index]
#for item in self.influx_tag: tags['short-description'] = line[short_description_index]
#tags[item[0]] = item[1] tags['test_details'] = line[test_details_index]
for k in targets.keys(): tags['Graph-Group'] = line[graph_group_index]
self.influxdb.post_to_influx(k, targets[k], tags, date) tags['Units'] = line[units_index]
for item in self.influx_tag: # Every item in the influx_tag command needs to be added to the tags variable
tags[item[0]] = item[1]
self.influxdb.post_to_influx(short_description, numeric_score, tags, date)
line = fp.readline()
#influx wants to get data in the following format:
# variable name, value, tags, date
# total-download-mbps-speed-for-the-duration-of-this-iteration 171.085494 {'script': 'WiFi Capacity'} 2021-04-14T19:04:04.902000
def main(): def main():
@@ -116,7 +131,8 @@ python3 csv_to_influx.py --influx_host localhost --influx_org Candela --influx_t
parser.add_argument('--influx_token', help='Token for the Influx database') parser.add_argument('--influx_token', help='Token for the Influx database')
parser.add_argument('--influx_bucket', help='Name of the Influx bucket') parser.add_argument('--influx_bucket', help='Name of the Influx bucket')
parser.add_argument('--target_csv', help='CSV file to record to influx database', required=True) parser.add_argument('--target_csv', help='CSV file to record to influx database', required=True)
parser.add_argument('--influx_tag', action='append', nargs=2, help='--influx_tag <key> <val> Can add more than one of these.') parser.add_argument('--influx_tag', action='append', nargs=2,
help='--influx_tag <key> <val> Can add more than one of these.', default=[])
args = parser.parse_args() args = parser.parse_args()
@@ -130,10 +146,10 @@ python3 csv_to_influx.py --influx_host localhost --influx_org Candela --influx_t
_influx_org=args.influx_org, _influx_org=args.influx_org,
_influx_token=args.influx_token, _influx_token=args.influx_token,
_influx_bucket=args.influx_bucket) _influx_bucket=args.influx_bucket)
#_influx_tag=args.influx_tag)
csvtoinflux = CSVtoInflux(influxdb=influxdb, csvtoinflux = CSVtoInflux(influxdb=influxdb,
target_csv=args.target_csv) target_csv=args.target_csv,
_influx_tag=args.influx_tag)
csvtoinflux.post_to_influx() csvtoinflux.post_to_influx()

View File

@@ -53,7 +53,6 @@ class RecordInflux(LFCliBase):
print(tag_key, tag_value) print(tag_key, tag_value)
p.time(time) p.time(time)
p.field("value", value) p.field("value", value)
print(self.influx_bucket, self.influx_org, self.url, self.influx_port)
self.write_api.write(bucket=self.influx_bucket, org=self.influx_org, record=p) self.write_api.write(bucket=self.influx_bucket, org=self.influx_org, record=p)
def set_bucket(self, b): def set_bucket(self, b):