keras-autoencoder.py: fixed invalid preprocessing of received base64 packet data

* split logic into separate jobs: nDPIsrvd and Keras
 * nDPIsrvd: break event processing and re-run `epoll_wait()` after a client disconnects

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
Toni Uhlig
2023-08-15 11:21:46 +02:00
parent 2b881d56e7
commit 4b3031245d
2 changed files with 157 additions and 62 deletions

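In short, the commit splits the script into two cooperating processes: an nDPIsrvd worker that receives and preprocesses packet events, and a Keras worker that batches them for training. Below is a minimal, simplified sketch of that producer/consumer layout; the function names are hypothetical and the real packet and training logic is omitted.

# Simplified sketch of the two-process layout introduced by this commit:
# one process feeds packets into a shared queue, the other consumes them for training.
import multiprocessing as mp
import queue

def capture_worker(shutdown_event, packet_queue):
    # Stand-in for the nDPIsrvd socket loop: push preprocessed packet vectors.
    packet_queue.put(b'example-packet')
    shutdown_event.set()  # in the sketch the producer triggers shutdown itself

def training_worker(shutdown_event, packet_queue):
    # Stand-in for the Keras loop: batch packets and (in the real script) call autoencoder.fit().
    packets = []
    while not shutdown_event.is_set() or not packet_queue.empty():
        try:
            packets.append(packet_queue.get(timeout=1))
        except queue.Empty:
            continue

if __name__ == '__main__':
    mgr = mp.Manager()
    shutdown_event = mgr.Event()
    packet_queue = mgr.JoinableQueue()
    jobs = [mp.Process(target=w, args=(shutdown_event, packet_queue))
            for w in (capture_worker, training_worker)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()

In the actual script the workers additionally share a training event so that per-packet status output is suppressed while autoencoder.fit() runs.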

@@ -1,12 +1,12 @@
#!/usr/bin/env python3
import base64
import binascii
import joblib
import csv
import matplotlib.pyplot as plt
import multiprocessing as mp
import numpy as np
import os
import pandas as pd
import queue
import sys
sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
@@ -17,29 +17,37 @@ import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor
INPUT_SIZE = nDPIsrvd.nDPId_PACKETS_PLEN_MAX
LATENT_SIZE = 8
TRAINING_SIZE = 500
EPOCH_COUNT = 5
BATCH_SIZE = 10
def generate_autoencoder():
input_i = Input(shape=())
input_i = Embedding(input_dim=INPUT_SIZE, output_dim=INPUT_SIZE, mask_zero=True)(input_i)
encoded_h1 = Dense(1024, activation='relu', name='input_i')(input_i)
encoded_h2 = Dense(512, activation='relu', name='encoded_h1')(encoded_h1)
encoded_h3 = Dense(128, activation='relu', name='encoded_h2')(encoded_h2)
encoded_h4 = Dense(64, activation='relu', name='encoded_h3')(encoded_h3)
encoded_h5 = Dense(32, activation='relu', name='encoded_h4')(encoded_h4)
latent = Dense(2, activation='relu', name='encoded_h5')(encoded_h5)
decoder_h1 = Dense(32, activation='relu', name='latent')(latent)
decoder_h2 = Dense(64, activation='relu', name='decoder_h1')(decoder_h1)
decoder_h3 = Dense(128, activation='relu', name='decoder_h2')(decoder_h2)
decoder_h4 = Dense(512, activation='relu', name='decoder_h3')(decoder_h3)
decoder_h5 = Dense(1024, activation='relu', name='decoder_h4')(decoder_h4)
return input_i, Model(input_i, Dense(INPUT_SIZE, activation='sigmoid', name='decoder_h5')(decoder_h5))
input_i = Input(shape=(), name='input_i')
input_e = Embedding(input_dim=INPUT_SIZE, output_dim=INPUT_SIZE, mask_zero=True, name='input_e')(input_i)
encoded_h1 = Dense(1024, activation='relu', name='encoded_h1')(input_e)
encoded_h2 = Dense(512, activation='relu', name='encoded_h2')(encoded_h1)
encoded_h3 = Dense(128, activation='relu', name='encoded_h3')(encoded_h2)
encoded_h4 = Dense(64, activation='relu', name='encoded_h4')(encoded_h3)
encoded_h5 = Dense(32, activation='relu', name='encoded_h5')(encoded_h4)
latent = Dense(LATENT_SIZE, activation='relu', name='latent')(encoded_h5)
input_l = Input(shape=(LATENT_SIZE), name='input_l')
decoder_h1 = Dense(32, activation='relu', name='decoder_h1')(input_l)
decoder_h2 = Dense(64, activation='relu', name='decoder_h2')(decoder_h1)
decoder_h3 = Dense(128, activation='relu', name='decoder_h3')(decoder_h2)
decoder_h4 = Dense(512, activation='relu', name='decoder_h4')(decoder_h3)
decoder_h5 = Dense(1024, activation='relu', name='decoder_h5')(decoder_h4)
output_i = Dense(INPUT_SIZE, activation='sigmoid', name='output_i')(decoder_h5)
encoder = Model(input_e, latent, name='encoder')
decoder = Model(input_l, output_i, name='decoder')
return encoder, decoder, Model(input_e, decoder(encoder(input_e)), name='VAE')
def compile_autoencoder():
inp, autoencoder = generate_autoencoder()
encoder, decoder, autoencoder = generate_autoencoder()
autoencoder.compile(loss='mse', optimizer='adam', metrics=[tf.keras.metrics.Accuracy()])
return inp, autoencoder
return encoder, decoder, autoencoder
def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
if 'packet_event_name' not in json_dict:
@@ -49,50 +57,122 @@ def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
json_dict['packet_event_name'] != 'packet-flow':
return True
_, padded_pkts = global_user_data
buf = base64.b64decode(json_dict['pkt'], validate=True)
shutdown_event, training_event, padded_pkts = global_user_data
if shutdown_event.is_set():
return False
try:
buf = base64.b64decode(json_dict['pkt'], validate=True)
except binascii.Error as err:
sys.stderr.write('\nBase64 Exception: {}\n'.format(str(err)))
sys.stderr.write('Affected JSON: {}\n'.format(str(json_dict)))
sys.stderr.flush()
return False
# Generate a decimal byte buffer with values from 0-255
int_buf = []
for v in buf:
int_buf.append(int(v))
mat = np.array([int_buf])
mat = np.array([int_buf], dtype='float64')
# Normalize the values
mat = mat.astype('float32') / 255.
mat = mat.astype('float64') / 255.0
# Mean removal
matmean = np.mean(mat, axis=0)
matmean = np.mean(mat, dtype='float64')
mat -= matmean
# Pad the resulting matrix
buf = preprocessing.sequence.pad_sequences(mat, padding="post", maxlen=INPUT_SIZE, truncating='post')
padded_pkts.append(buf[0])
buf = preprocessing.sequence.pad_sequences(mat, padding="post", maxlen=INPUT_SIZE, truncating='post', dtype='float64')
padded_pkts.put(buf[0])
sys.stdout.write('.')
sys.stdout.flush()
if (len(padded_pkts) % TRAINING_SIZE == 0):
print('\nGot {} packets, training..'.format(len(padded_pkts)))
tmp = np.array(padded_pkts)
history = autoencoder.fit(
tmp, tmp, epochs=10, batch_size=BATCH_SIZE,
validation_split=0.2,
shuffle=True
)
padded_pkts.clear()
#print(list(buf[0]))
#plot_model(autoencoder, show_shapes=True, show_layer_names=True)
#plt.plot(history.history['loss'])
#plt.plot(history.history['val_loss'])
#plt.title('model loss')
#plt.xlabel('loss')
#plt.ylabel('val_loss')
#plt.legend(['loss', 'val_loss'], loc='upper left')
#plt.show()
if not training_event.is_set():
sys.stdout.write('.')
sys.stdout.flush()
return True
def nDPIsrvd_worker(address, shared_shutdown_event, shared_training_event, shared_packet_list):
nsock = nDPIsrvdSocket()
try:
nsock.connect(address)
padded_pkts = list()
nsock.loop(onJsonLineRecvd, None, (shared_shutdown_event, shared_training_event, shared_packet_list))
except nDPIsrvd.SocketConnectionBroken as err:
sys.stderr.write('\nnDPIsrvd-Worker Socket Error: {}\n'.format(err))
except KeyboardInterrupt:
sys.stderr.write('\n')
except Exception as err:
sys.stderr.write('\nnDPIsrvd-Worker Exception: {}\n'.format(str(err)))
sys.stderr.flush()
shared_shutdown_event.set()
def keras_worker(load_model, save_model, shared_shutdown_event, shared_training_event, shared_packet_queue):
shared_training_event.set()
if load_model is not None:
sys.stderr.write('Loading model from {}\n'.format(load_model))
sys.stderr.flush()
try:
encoder, decoder, autoencoder = joblib.load(load_model)
except:
sys.stderr.write('Could not load model from {}\n'.format(load_model))
sys.stderr.write('Compiling new Autoencoder..\n')
sys.stderr.flush()
encoder, decoder, autoencoder = compile_autoencoder()
else:
encoder, decoder, autoencoder = compile_autoencoder()
decoder.summary()
encoder.summary()
autoencoder.summary()
shared_training_event.clear()
try:
packets = list()
while not shared_shutdown_event.is_set():
try:
packet = shared_packet_queue.get(timeout=1)
except queue.Empty:
packet = None
if packet is None:
continue
packets.append(packet)
if len(packets) % TRAINING_SIZE == 0:
shared_training_event.set()
print('\nGot {} packets, training..'.format(len(packets)))
tmp = np.array(packets)
x_test_encoded = encoder.predict(tmp, batch_size=BATCH_SIZE)
history = autoencoder.fit(
tmp, tmp, epochs=EPOCH_COUNT, batch_size=BATCH_SIZE,
validation_split=0.2,
shuffle=True
)
packets.clear()
shared_training_event.clear()
except KeyboardInterrupt:
sys.stderr.write('\n')
except Exception as err:
if len(str(err)) == 0:
err = 'Unknown'
sys.stderr.write('\nKeras-Worker Exception: {}\n'.format(str(err)))
sys.stderr.flush()
if save_model is not None:
sys.stderr.write('Saving model to {}\n'.format(save_model))
sys.stderr.flush()
joblib.dump([encoder, decoder, autoencoder], save_model)
try:
shared_shutdown_event.set()
except:
pass
if __name__ == '__main__':
sys.stderr.write('\b\n***************\n')
sys.stderr.write('*** WARNING ***\n')
@@ -125,23 +205,38 @@ if __name__ == '__main__':
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.utils import plot_model
if args.load_model is not None:
sys.stderr.write('Loading model from {}\n'.format(args.load_model))
autoencoder, options = joblib.load(args.load_model)
else:
_, autoencoder = compile_autoencoder()
autoencoder.summary()
mgr = mp.Manager()
shared_training_event = mgr.Event()
shared_training_event.clear()
shared_shutdown_event = mgr.Event()
shared_shutdown_event.clear()
shared_packet_queue = mgr.JoinableQueue()
nDPIsrvd_job = mp.Process(target=nDPIsrvd_worker, args=(
address,
shared_shutdown_event,
shared_training_event,
shared_packet_queue
))
nDPIsrvd_job.start()
keras_job = mp.Process(target=keras_worker, args=(
args.load_model,
args.save_model,
shared_shutdown_event,
shared_training_event,
shared_packet_queue
))
keras_job.start()
nsock = nDPIsrvdSocket()
nsock.connect(address)
try:
padded_pkts = list()
nsock.loop(onJsonLineRecvd, None, (autoencoder, padded_pkts))
except nDPIsrvd.SocketConnectionBroken as err:
sys.stderr.write('\n{}\n'.format(err))
shared_shutdown_event.wait()
except KeyboardInterrupt:
print()
print('\nShutting down worker processes..')
if args.save_model is not None:
sys.stderr.write('Saving model to {}\n'.format(args.save_model))
joblib.dump([autoencoder, None], args.save_model)
nDPIsrvd_job.terminate()
nDPIsrvd_job.join()
keras_job.join()
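
For reference, the corrected per-packet preprocessing shown in the diff boils down to the standalone sketch below; INPUT_SIZE is a placeholder for nDPIsrvd.nDPId_PACKETS_PLEN_MAX and preprocess_packet() is a hypothetical helper, not part of the script.

import base64
import binascii
import numpy as np
from tensorflow.keras import preprocessing

INPUT_SIZE = 1024  # placeholder; the real script uses nDPIsrvd.nDPId_PACKETS_PLEN_MAX

def preprocess_packet(pkt_b64):
    # Reject invalid base64 data instead of letting the exception kill the worker.
    try:
        buf = base64.b64decode(pkt_b64, validate=True)
    except binascii.Error:
        return None
    # Byte values 0-255 as float64, normalized to [0, 1], followed by mean removal.
    mat = np.array([[int(v) for v in buf]], dtype='float64') / 255.0
    mat -= np.mean(mat, dtype='float64')
    # Pad/truncate to the fixed input size the autoencoder expects.
    return preprocessing.sequence.pad_sequences(
        mat, padding='post', maxlen=INPUT_SIZE, truncating='post', dtype='float64')[0]

Called as preprocess_packet(json_dict['pkt']), this would either return a padded float64 vector ready for the training queue or None for malformed input.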