keras-autoencoder.py: Improved Model

* added initial learning rate for Adam
 * plot some metrics using pyplot

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig
2023-08-20 23:05:08 +02:00
parent 4b3031245d
commit 86ac09a8db

View File

@@ -2,13 +2,23 @@
import base64 import base64
import binascii import binascii
import joblib import datetime as dt
import math
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import multiprocessing as mp import multiprocessing as mp
import numpy as np import numpy as np
import os import os
import queue import queue
import sys import sys
import tensorflow as tf
from tensorflow.keras import models, layers, preprocessing
from tensorflow.keras.layers import Embedding, Masking, Input, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.optimizers import Adam
sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies') sys.path.append(os.path.dirname(sys.argv[0]) + '/../../dependencies')
sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId') sys.path.append(os.path.dirname(sys.argv[0]) + '/../share/nDPId')
sys.path.append(os.path.dirname(sys.argv[0])) sys.path.append(os.path.dirname(sys.argv[0]))
@@ -17,20 +27,28 @@ import nDPIsrvd
from nDPIsrvd import nDPIsrvdSocket, TermColor from nDPIsrvd import nDPIsrvdSocket, TermColor
INPUT_SIZE = nDPIsrvd.nDPId_PACKETS_PLEN_MAX INPUT_SIZE = nDPIsrvd.nDPId_PACKETS_PLEN_MAX
LATENT_SIZE = 8 LATENT_SIZE = 16
TRAINING_SIZE = 500 TRAINING_SIZE = 1024
EPOCH_COUNT = 5 EPOCH_COUNT = 50
BATCH_SIZE = 10 BATCH_SIZE = 256
LEARNING_RATE = 0.0000001
PLOT_HISTORY = 100
def generate_autoencoder(): def generate_autoencoder():
# TODO: The current model does handle *each* packet separatly.
# But in fact, depending on the nDPId settings (nDPId_PACKETS_PER_FLOW_TO_SEND), packets can be in relation to each other.
# The accuracy may (or may not) improve significantly, but some of changes in the code are required.
input_i = Input(shape=(), name='input_i') input_i = Input(shape=(), name='input_i')
input_e = Embedding(input_dim=INPUT_SIZE, output_dim=INPUT_SIZE, mask_zero=True, name='input_e')(input_i) input_e = Embedding(input_dim=INPUT_SIZE, output_dim=INPUT_SIZE, mask_zero=True, name='input_e')(input_i)
encoded_h1 = Dense(1024, activation='relu', name='encoded_h1')(input_e) masked_e = Masking(mask_value=0.0, name='masked_e')(input_e)
encoded_h2 = Dense(512, activation='relu', name='encoded_h2')(encoded_h1) encoded_h1 = Dense(4096, activation='relu', name='encoded_h1')(masked_e)
encoded_h3 = Dense(128, activation='relu', name='encoded_h3')(encoded_h2) encoded_h2 = Dense(2048, activation='relu', name='encoded_h2')(encoded_h1)
encoded_h4 = Dense(64, activation='relu', name='encoded_h4')(encoded_h3) encoded_h3 = Dense(1024, activation='relu', name='encoded_h3')(encoded_h2)
encoded_h5 = Dense(32, activation='relu', name='encoded_h5')(encoded_h4) encoded_h4 = Dense(512, activation='relu', name='encoded_h4')(encoded_h3)
latent = Dense(LATENT_SIZE, activation='relu', name='latent')(encoded_h5) encoded_h5 = Dense(128, activation='relu', name='encoded_h5')(encoded_h4)
encoded_h6 = Dense(64, activation='relu', name='encoded_h6')(encoded_h5)
encoded_h7 = Dense(32, activation='relu', name='encoded_h7')(encoded_h6)
latent = Dense(LATENT_SIZE, activation='relu', name='latent')(encoded_h7)
input_l = Input(shape=(LATENT_SIZE), name='input_l') input_l = Input(shape=(LATENT_SIZE), name='input_l')
decoder_h1 = Dense(32, activation='relu', name='decoder_h1')(input_l) decoder_h1 = Dense(32, activation='relu', name='decoder_h1')(input_l)
@@ -38,16 +56,28 @@ def generate_autoencoder():
decoder_h3 = Dense(128, activation='relu', name='decoder_h3')(decoder_h2) decoder_h3 = Dense(128, activation='relu', name='decoder_h3')(decoder_h2)
decoder_h4 = Dense(512, activation='relu', name='decoder_h4')(decoder_h3) decoder_h4 = Dense(512, activation='relu', name='decoder_h4')(decoder_h3)
decoder_h5 = Dense(1024, activation='relu', name='decoder_h5')(decoder_h4) decoder_h5 = Dense(1024, activation='relu', name='decoder_h5')(decoder_h4)
output_i = Dense(INPUT_SIZE, activation='sigmoid', name='output_i')(decoder_h5) decoder_h6 = Dense(2048, activation='relu', name='decoder_h6')(decoder_h5)
decoder_h7 = Dense(4096, activation='relu', name='decoder_h7')(decoder_h6)
output_i = Dense(INPUT_SIZE, activation='sigmoid', name='output_i')(decoder_h7)
encoder = Model(input_e, latent, name='encoder') encoder = Model(input_e, latent, name='encoder')
decoder = Model(input_l, output_i, name='decoder') decoder = Model(input_l, output_i, name='decoder')
return encoder, decoder, Model(input_e, decoder(encoder(input_e)), name='VAE') return Adam(learning_rate=LEARNING_RATE), Model(input_e, decoder(encoder(input_e)), name='VAE')
def compile_autoencoder(): def compile_autoencoder():
encoder, decoder, autoencoder = generate_autoencoder() optimizer, autoencoder = generate_autoencoder()
autoencoder.compile(loss='mse', optimizer='adam', metrics=[tf.keras.metrics.Accuracy()]) autoencoder.compile(loss='mean_squared_error', optimizer=optimizer, metrics=[])
return encoder, decoder, autoencoder return autoencoder
def get_autoencoder(load_from_file=None):
if load_from_file is None:
autoencoder = compile_autoencoder()
else:
autoencoder = models.load_model(load_from_file)
encoder_submodel = autoencoder.layers[1]
decoder_submodel = autoencoder.layers[2]
return encoder_submodel, decoder_submodel, autoencoder
def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data): def onJsonLineRecvd(json_dict, instance, current_flow, global_user_data):
if 'packet_event_name' not in json_dict: if 'packet_event_name' not in json_dict:
@@ -112,22 +142,14 @@ def nDPIsrvd_worker(address, shared_shutdown_event, shared_training_event, share
shared_shutdown_event.set() shared_shutdown_event.set()
def keras_worker(load_model, save_model, shared_shutdown_event, shared_training_event, shared_packet_queue): def keras_worker(load_model, save_model, shared_shutdown_event, shared_training_event, shared_packet_queue, shared_plot_queue):
shared_training_event.set() shared_training_event.set()
if load_model is not None:
sys.stderr.write('Loading model from {}\n'.format(load_model))
sys.stderr.flush()
try: try:
encoder, decoder, autoencoder = joblib.load(load_model) encoder, decoder, autoencoder = get_autoencoder(load_model)
except: except Exception as err:
sys.stderr.write('Could not load model from {}\n'.format(load_model)) sys.stderr.write('Could not load Keras model from file: {}\n'.format(str(err)))
sys.stderr.write('Compiling new Autoencoder..\n')
sys.stderr.flush() sys.stderr.flush()
encoder, decoder, autoencoder = compile_autoencoder() encoder, decoder, autoencoder = get_autoencoder()
else:
encoder, decoder, autoencoder = compile_autoencoder()
decoder.summary()
encoder.summary()
autoencoder.summary() autoencoder.summary()
shared_training_event.clear() shared_training_event.clear()
@@ -147,12 +169,17 @@ def keras_worker(load_model, save_model, shared_shutdown_event, shared_training_
shared_training_event.set() shared_training_event.set()
print('\nGot {} packets, training..'.format(len(packets))) print('\nGot {} packets, training..'.format(len(packets)))
tmp = np.array(packets) tmp = np.array(packets)
x_test_encoded = encoder.predict(tmp, batch_size=BATCH_SIZE)
history = autoencoder.fit( history = autoencoder.fit(
tmp, tmp, epochs=EPOCH_COUNT, batch_size=BATCH_SIZE, tmp, tmp, epochs=EPOCH_COUNT, batch_size=BATCH_SIZE,
validation_split=0.2, validation_split=0.2,
shuffle=True shuffle=True
) )
reconstructed_data = autoencoder.predict(tmp)
mse = np.mean(np.square(tmp - reconstructed_data))
reconstruction_accuracy = (1.0 / mse)
encoded_data = encoder.predict(tmp)
latent_activations = encoder.predict(tmp)
shared_plot_queue.put((reconstruction_accuracy, history.history['loss'], encoded_data[:, 0], encoded_data[:, 1], latent_activations))
packets.clear() packets.clear()
shared_training_event.clear() shared_training_event.clear()
except KeyboardInterrupt: except KeyboardInterrupt:
@@ -166,13 +193,80 @@ def keras_worker(load_model, save_model, shared_shutdown_event, shared_training_
if save_model is not None: if save_model is not None:
sys.stderr.write('Saving model to {}\n'.format(save_model)) sys.stderr.write('Saving model to {}\n'.format(save_model))
sys.stderr.flush() sys.stderr.flush()
joblib.dump([encoder, decoder, autoencoder], save_model) autoencoder.save(save_model)
try: try:
shared_shutdown_event.set() shared_shutdown_event.set()
except: except:
pass pass
def plot_animate(i, shared_plot_queue, ax, xs, ys):
if not shared_plot_queue.empty():
accuracy, loss, encoded_data0, encoded_data1, latent_activations = shared_plot_queue.get(timeout=1)
epochs = len(loss)
loss_mean = sum(loss) / epochs
else:
return
(ax1, ax2, ax3, ax4) = ax
(ys1, ys2, ys3, ys4) = ys
if len(xs) == 0:
xs.append(epochs)
else:
xs.append(xs[-1] + epochs)
ys1.append(accuracy)
ys2.append(loss_mean)
xs = xs[-PLOT_HISTORY:]
ys1 = ys1[-PLOT_HISTORY:]
ys2 = ys2[-PLOT_HISTORY:]
ax1.clear()
ax1.plot(xs, ys1, '-')
ax2.clear()
ax2.plot(xs, ys2, '-')
ax3.clear()
ax3.scatter(encoded_data0, encoded_data1, marker='.')
ax4.clear()
ax4.imshow(latent_activations, cmap='viridis', aspect='auto')
ax1.set_xlabel('Epoch Count')
ax1.set_ylabel('Accuracy')
ax2.set_xlabel('Epoch Count')
ax2.set_ylabel('Loss')
ax3.set_title('Latent Space')
ax4.set_title('Latent Space Heatmap')
ax4.set_xlabel('Latent Dimensions')
ax4.set_ylabel('Datapoints')
def plot_worker(shared_shutdown_event, shared_plot_queue):
try:
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2)
fig.tight_layout()
ax1.set_xlabel('Epoch Count')
ax1.set_ylabel('Accuracy')
ax2.set_xlabel('Epoch Count')
ax2.set_ylabel('Loss')
ax3.set_title('Latent Space')
ax4.set_title('Latent Space Heatmap')
ax4.set_xlabel('Latent Dimensions')
ax4.set_ylabel('Datapoints')
xs = []
ys1 = []
ys2 = []
ys3 = []
ys4 = []
x = 0
ani = animation.FuncAnimation(fig, plot_animate, fargs=(shared_plot_queue, (ax1, ax2, ax3, ax4), xs, (ys1, ys2, ys3, ys4)), interval=1000, cache_frame_data=False)
plt.subplots_adjust(left=0.05, right=0.95, top=0.95, bottom=0.05)
plt.show()
except Exception as err:
sys.stderr.write('\nPlot-Worker Exception: {}\n'.format(str(err)))
sys.stderr.flush()
shared_shutdown_event.set()
return
if __name__ == '__main__': if __name__ == '__main__':
sys.stderr.write('\b\n***************\n') sys.stderr.write('\b\n***************\n')
sys.stderr.write('*** WARNING ***\n') sys.stderr.write('*** WARNING ***\n')
@@ -189,21 +283,25 @@ if __name__ == '__main__':
help='Set the amount of captured packets required to start the training phase.') help='Set the amount of captured packets required to start the training phase.')
argparser.add_argument('--batch-size', action='store', type=int, argparser.add_argument('--batch-size', action='store', type=int,
help='Set the batch size used for the training phase.') help='Set the batch size used for the training phase.')
argparser.add_argument('--learning-rate', action='store', type=float,
help='Set the (initial!) learning rate for the Adam optimizer.')
argparser.add_argument('--plot', action='store_true', default=False,
help='Show some model metrics using pyplot.')
argparser.add_argument('--plot-history', action='store', type=int,
help='Set the history size of Line plots. Requires --plot')
args = argparser.parse_args() args = argparser.parse_args()
address = nDPIsrvd.validateAddress(args) address = nDPIsrvd.validateAddress(args)
LEARNING_RATE = args.learning_rate if args.learning_rate is not None else LEARNING_RATE
TRAINING_SIZE = args.training_size if args.training_size is not None else TRAINING_SIZE TRAINING_SIZE = args.training_size if args.training_size is not None else TRAINING_SIZE
BATCH_SIZE = args.batch_size if args.batch_size is not None else BATCH_SIZE BATCH_SIZE = args.batch_size if args.batch_size is not None else BATCH_SIZE
if args.plot is False and args.plot_history is not None:
raise RuntimeError('--plot-history requires --plot')
PLOT_HISTORY = args.plot_history if args.plot_history is not None else PLOT_HISTORY
sys.stderr.write('Recv buffer size: {}\n'.format(nDPIsrvd.NETWORK_BUFFER_MAX_SIZE)) sys.stderr.write('Recv buffer size: {}\n'.format(nDPIsrvd.NETWORK_BUFFER_MAX_SIZE))
sys.stderr.write('Connecting to {} ..\n'.format(address[0]+':'+str(address[1]) if type(address) is tuple else address)) sys.stderr.write('Connecting to {} ..\n'.format(address[0]+':'+str(address[1]) if type(address) is tuple else address))
sys.stderr.write('TRAINING_SIZE={}, BATCH_SIZE={}\n\n'.format(TRAINING_SIZE, BATCH_SIZE)) sys.stderr.write('PLOT={}, PLOT_HISTORY={}, LEARNING_RATE={}, TRAINING_SIZE={}, BATCH_SIZE={}\n\n'.format(args.plot, PLOT_HISTORY, LEARNING_RATE, TRAINING_SIZE, BATCH_SIZE))
import tensorflow as tf
from tensorflow.keras import layers, preprocessing
from tensorflow.keras.layers import Embedding, Input, Dense
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.utils import plot_model
mgr = mp.Manager() mgr = mp.Manager()
@@ -214,6 +312,7 @@ if __name__ == '__main__':
shared_shutdown_event.clear() shared_shutdown_event.clear()
shared_packet_queue = mgr.JoinableQueue() shared_packet_queue = mgr.JoinableQueue()
shared_plot_queue = mgr.JoinableQueue()
nDPIsrvd_job = mp.Process(target=nDPIsrvd_worker, args=( nDPIsrvd_job = mp.Process(target=nDPIsrvd_worker, args=(
address, address,
@@ -228,15 +327,23 @@ if __name__ == '__main__':
args.save_model, args.save_model,
shared_shutdown_event, shared_shutdown_event,
shared_training_event, shared_training_event,
shared_packet_queue shared_packet_queue,
shared_plot_queue
)) ))
keras_job.start() keras_job.start()
if args.plot is True:
plot_job = mp.Process(target=plot_worker, args=(shared_shutdown_event, shared_plot_queue))
plot_job.start()
try: try:
shared_shutdown_event.wait() shared_shutdown_event.wait()
except KeyboardInterrupt: except KeyboardInterrupt:
print('\nShutting down worker processess..') print('\nShutting down worker processess..')
if args.plot is True:
plot_job.terminate()
plot_job.join()
nDPIsrvd_job.terminate() nDPIsrvd_job.terminate()
nDPIsrvd_job.join() nDPIsrvd_job.join()
keras_job.join() keras_job.join()