Testing: introduce initial automated testing _core_ changes
Implement initial core for automated tests:
* mostly reuses/extends the existing simulator, kafka producer etc.;
* helper scripts added for manual testing (single infra/group add/del etc.);
* tests are composed either of simple premade steps (fixtures), or combine
  fixtures with custom code inside the test's function.

Signed-off-by: Oleksandr Mazur <oleksandr.mazur@plvision.eu>
11 README.md
@@ -162,3 +162,14 @@ There are several environment variables to configure certificate paths and names
 The infrastructure connectivity uses the root certs store - the directory with trusted certificates
 The environment variable to configure the certificates path:
 1. CGW_NB_INFRA_CERTS_PATH - path to certificates located on the host machine
+
+# Automated Testing
+Automated python-based tests are located inside the *tests* directory.
+Currently, tests should be run manually by changing PWD to *tests* and launching the helper script *run.sh*:
+```console
+cd ./tests
+./run.sh
+```
+*NOTE:* currently, tests are not run inside a container.
+This means it's up to the caller to make sure the tests can communicate with whatever CGW deployment is used, as well as with third-party services.
+E.g. tests running inside the *host* environment must be able to communicate with CGW, Redis, Kafka, PGSQL etc.
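
For example, when CGW and its services run inside a docker compose network, the Kafka broker may advertise itself under a compose-internal hostname such as <docker-broker-1> (see the kafka_producer helper scripts); one way to make it reachable from the host is a static hosts entry (hostname and address here are illustrative):
```console
echo "127.0.0.1 docker-broker-1" | sudo tee -a /etc/hosts
```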
@@ -142,6 +142,7 @@ echo "CGW UCENTRAL SWITCH DATAMODEL URI : $CGW_UCENTRAL_SWITCH_DATAMODEL_URI"
 docker run \
     -p 15002:15002 \
     -p 50051:50051 \
     -p 8080:8080 \
     --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
     -v $CGW_CERTS_PATH:$CONTAINTER_CERTS_VOLUME \
+    -v $CGW_NB_INFRA_CERTS_PATH:$CONTAINTER_NB_INFRA_CERTS_VOLUME \
168 tests/conftest.py (Normal file)
@@ -0,0 +1,168 @@
import pytest
import ssl
import json
import time
from client_simulator.src.simulation_runner import Device as DeviceSimulator
from kafka_producer.src.producer import Producer as KafkaProducer
from kafka_producer.src.consumer import Consumer as KafkaConsumer
import requests
from typing import List, Tuple
import random


# Device connection, kafka wrappers etc
class TestContext:
    @staticmethod
    def default_dev_sim_mac() -> str:
        return "02-00-00-00-00-00"

    @staticmethod
    def default_kafka_group() -> str:
        return '9999'

    def __init__(self):
        device = DeviceSimulator(
            mac=self.default_dev_sim_mac(),
            server='wss://localhost:15002',
            ca_cert='./ca-certs/ca.crt',
            msg_interval=10, msg_size=1024,
            client_cert='./certs/base.crt', client_key='./certs/base.key', check_cert=False,
            start_event=None, stop_event=None)

        # Server cert CN? Don't care, ignore.
        device.ssl_context.check_hostname = False
        device.ssl_context.verify_mode = ssl.CERT_NONE

        # Tweak the connect message to change the initial FW version:
        # any later steps might want to change it to something else
        # (to test capabilities change, for example);
        # however, we're making a fixture, hence all values must be the same
        # on the initial step.
        connect_msg = json.loads(device.messages.connect)
        connect_msg['params']['firmware'] = "Test_FW_A"
        connect_msg['params']['uuid'] = 1
        device.messages.connect = json.dumps(connect_msg)

        self.device_sim = device

        producer = KafkaProducer(db='localhost:9092', topic='CnC')
        consumer = KafkaConsumer(db='localhost:9092', topic='CnC_Res', consumer_timeout=3000)

        self.kafka_producer = producer
        self.kafka_consumer = consumer


@pytest.fixture(scope='function')
def test_context():
    ctx = TestContext()

    yield ctx

    ctx.device_sim.disconnect()

    # Let's make sure we destroy the default group after we're done with tests.
    if ctx.kafka_producer.is_connected():
        ctx.kafka_producer.handle_single_group_delete(ctx.default_kafka_group())

    # We have to clear any leftover messages once done working with kafka.
    if ctx.kafka_consumer.is_connected():
        ctx.kafka_consumer.flush()

    ctx.kafka_producer.disconnect()
    ctx.kafka_consumer.disconnect()


@pytest.fixture(scope='function')
def cgw_probe(test_context):
    try:
        r = requests.get("http://localhost:8080/health")
        print("CGW status: " + str(r.status_code) + ', txt:' + r.text)
        assert r is not None and r.status_code == 200, \
            f"CGW is in a bad state (health != 200), can't proceed"
    except:
        raise Exception('CGW health fetch failed (not running?)')


@pytest.fixture(scope='function')
def kafka_probe(test_context):
    try:
        test_context.kafka_producer.connect()
        test_context.kafka_consumer.connect()
    except:
        raise Exception('Failed to connect to kafka broker! Either the CnC / CnC_Res topics are unavailable, or the broker is down (not running)')

    # Let's make sure the default group is always deleted.
    test_context.kafka_producer.handle_single_group_delete(test_context.default_kafka_group())

    # We have to clear any stale messages before we can work with kafka.
    test_context.kafka_consumer.flush()


@pytest.fixture(scope='function')
def device_sim_connect(test_context):
    # Make sure we initiate the connect;
    # if this throws - any tests that depend on this fixture will fail.
    test_context.device_sim.connect()


@pytest.fixture(scope='function')
def device_sim_reconnect(test_context):
    assert test_context.device_sim._socket is not None, \
        f"Expected websocket to be connected before executing a reconnect!"

    time.sleep(1)
    test_context.device_sim.disconnect()
    assert test_context.device_sim._socket is None, \
        f"Expected websocket connection to be None after disconnect."
    time.sleep(1)

    test_context.device_sim.connect()
    assert test_context.device_sim._socket is not None, \
        f"Expected websocket connection NOT to be None after reconnect."


@pytest.fixture(scope='function')
def device_sim_send_ucentral_connect(test_context):
    assert test_context.device_sim._socket is not None, \
        f"Expected websocket to be connected before sending the uCentral connect event!"

    test_context.device_sim.send_hello(test_context.device_sim._socket)


@pytest.fixture(scope='function')
def kafka_default_infra_group(test_context):
    assert test_context.kafka_producer.is_connected(), \
        f'Cannot create default group: kafka producer is not connected to Kafka'

    assert test_context.kafka_consumer.is_connected(), \
        f'Cannot create default group: kafka consumer is not connected to Kafka'

    uuid_val = random.randint(1, 100)
    default_group = test_context.default_kafka_group()

    test_context.kafka_producer.handle_single_group_create(default_group, uuid_val)
    ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val)
    if not ret_msg:
        print('Failed to receive create group result, was expecting ' + str(uuid_val) + ' uuid reply')
        raise Exception('Failed to receive create group result when expected')

    if ret_msg.value['success'] is False:
        print(ret_msg.value['error_message'])
        raise Exception('Default infra group creation failed!')


@pytest.fixture(scope='function')
def kafka_default_infra(test_context):
    assert test_context.kafka_producer.is_connected(), \
        f'Cannot assign default infra: kafka producer is not connected to Kafka'

    assert test_context.kafka_consumer.is_connected(), \
        f'Cannot assign default infra: kafka consumer is not connected to Kafka'

    uuid_val = random.randint(1, 100)
    default_group = test_context.default_kafka_group()
    default_infra_mac = test_context.default_dev_sim_mac()

    test_context.kafka_producer.handle_single_device_assign(default_group, default_infra_mac, uuid_val)
    ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val)
    if ret_msg is None:
        print('Failed to receive infra assign result, was expecting ' + str(uuid_val) + ' uuid reply')
        raise Exception('Failed to receive infra assign result when expected')

    if ret_msg.value['success'] is False:
        print(ret_msg.value['error_message'])
        raise Exception('Default infra assignment failed!')
14 tests/run.sh (Executable file)
@@ -0,0 +1,14 @@
#!/bin/bash

# Separate exports for clearer visibility of _what exactly_
# we're putting in the python path
export PYTHONPATH="$PYTHONPATH:$PWD"
export PYTHONPATH="$PYTHONPATH:$PWD/../utils"

ln -sf ../utils/client_simulator/sim_data sim_data
ln -sf ../utils/kafka_producer/kafka_data kafka_data
ln -sf ../utils/cert_generator/certs/client/ certs
ln -sf ../utils/cert_generator/certs/ca/ ca-certs

pytest -v
#pytest -v -s .
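# e.g., run a single test case: pytest -v -k test_basic_probe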
132 tests/test_cgw_basic.py (Normal file)
@@ -0,0 +1,132 @@
import pytest
import json
import random


class TestCgwBasic:
    # Base test:
    # - test_context can be created - 'tests core' alloc / create
    # - tests can connect to the kafka broker
    # - CGW is up
    # - TODO:
    #   * redis is up
    #   * PGSQL is up
    @pytest.mark.usefixtures("test_context",
                             "cgw_probe",
                             "kafka_probe")
    def test_basic_probe(self, test_context):
        pass

    # Base test:
    # - the tests' kafka client can create / receive messages through the kafka bus
    # - a test infra group can be successfully created
    @pytest.mark.usefixtures("test_context",
                             "cgw_probe",
                             "kafka_probe",
                             "kafka_default_infra_group")
    def test_kafka_sanity(self, test_context):
        pass

    # Base test:
    # - a test infra can be added successfully to the default infra group
    @pytest.mark.usefixtures("test_context",
                             "cgw_probe",
                             "kafka_probe",
                             "kafka_default_infra_group",
                             "kafka_default_infra")
    def test_kafka_basic(self, test_context):
        pass

    # Base test:
    # - certificates can be found / used
    # - device sim can connect to CGW
    @pytest.mark.usefixtures("test_context",
                             "cgw_probe",
                             "device_sim_connect")
    def test_device_sim_sanity(self, test_context):
        pass

    # Base test:
    # - device sim can send a connect message to CGW
    # - kafka client can verify (by pulling msgs from the kafka bus)
    #   that the simulator is indeed connected
    @pytest.mark.usefixtures("test_context",
                             "cgw_probe",
                             "kafka_probe",
                             "device_sim_connect",
                             "device_sim_send_ucentral_connect")
    def test_device_sim_base(self, test_context):
        pass

    # Base test:
    # - an unassigned infra connects to CGW, and the kafka client can validate it
    #   through the <infra_join> msg
    @pytest.mark.usefixtures("test_context",
                             "cgw_probe",
                             "kafka_probe",
                             "device_sim_connect",
                             "device_sim_send_ucentral_connect")
    def test_unassigned_infra_base(self, test_context):
        join_message_received = False
        infra_is_unassigned = False
        messages = test_context.kafka_consumer.get_msgs()
        msg_mac = test_context.default_dev_sim_mac()

        assert messages, \
            f"Failed to receive any messages (events) from sim-device, while expecting connect / infra_join"

        # Expecting TWO messages to be present in the message list
        for message in messages:
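            # kafka key b'0' marks events from an infra that is not assigned
            # to any group (assigned infras carry the group id as the key,
            # see test_assigned_infra_base below)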
            if message.value['type'] == 'infra_join' and message.key == b'0' and message.value['infra_group_infra'] == msg_mac:
                join_message_received = True
                continue

            if message.value['type'] == 'unassigned_infra_connection' and message.key == b'0' and message.value['infra_group_infra'] == msg_mac:
                infra_is_unassigned = True
                continue

        assert join_message_received, \
            f"Failed to find 'infra_join' message for default infra MAC"

        assert infra_is_unassigned, \
            f"Failed to find 'unassigned_infra_connection' message for default infra MAC"

    # Base test:
    # - an assigned infra connects to CGW, and the kafka client can validate it
    #   through the <infra_join> msg + kafka key
    @pytest.mark.usefixtures("test_context",
                             "cgw_probe",
                             "kafka_probe",
                             "kafka_default_infra_group",
                             "kafka_default_infra",
                             "device_sim_connect",
                             "device_sim_send_ucentral_connect")
    def test_assigned_infra_base(self, test_context):
        join_message_received = False
        infra_is_assigned = False
        messages = test_context.kafka_consumer.get_msgs()
        msg_mac = test_context.default_dev_sim_mac()
        default_group = test_context.default_kafka_group().encode('utf-8')

        assert messages, \
            f"Failed to receive any messages (events) from sim-device, while expecting connect / infra_join"

        # We can deduce whether the infra is assigned by inspecting a single msg
        for message in messages:
            if message.value['type'] == 'infra_join' and message.value['infra_group_infra'] == msg_mac:
                join_message_received = True
                if message.key == default_group and str(message.value['infra_group_id']).encode('utf-8') == default_group:
                    infra_is_assigned = True
                    break

        assert join_message_received, \
            f"Failed to find 'infra_join' message for default infra MAC"

        assert infra_is_assigned, \
            f"Found join message for default infra MAC, but expected it to be assigned to the default group (key == group id)"
@@ -9,7 +9,7 @@ CA_CERT_PATH=./tipcerts
 CLIENT_CERT_PATH=$(pwd)/../cert_generator/certs/client
 CLIENT_CERT_PATH=./tipcerts
 #--no-cert-check
-python3 single.py --mac "$MAC" \
+PYTHONPATH="$PYTHONPATH:$PWD:$PWD/src/" python3 single.py --mac "$MAC" \
     --server "$URL" \
     --ca-cert "$CA_CERT_PATH/ca.crt" \
     --client-certs-path "$CLIENT_CERT_PATH" \
@@ -1,6 +1,6 @@
 #!/usr/bin/env python3
-from utils import get_msg_templates, Args
-from log import logger
+from .utils import get_msg_templates, Args
+from .log import logger
 from websockets.sync import client
 from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError, ConnectionClosed
 from websockets.frames import *
@@ -20,6 +20,7 @@ import ssl
 import os
 import re


 class Message:
     def __init__(self, mac: str, size: int):
         tmp_mac = copy.deepcopy(mac)
@@ -7,7 +7,7 @@ import re
 import os


-TEMPLATE_LOCATION = "./data/message_templates.json"
+TEMPLATE_LOCATION = "./sim_data/message_templates.json"


 @dataclass
3 utils/kafka_producer/run.sh (Executable file)
@@ -0,0 +1,3 @@
#!/bin/bash

PYTHONPATH="$PYTHONPATH:$PWD:$PWD/src/" python3 main.py "$@"
12 utils/kafka_producer/single_group_add.sh (Executable file)
@@ -0,0 +1,12 @@
#!/bin/bash

# Although we're looking for the broker at localhost,
# the broker can still redirect us to some <docker-broker-1>,
# so it's up to the caller to either run this script inside
# the same network as the broker, or create a static
# hostname entry that points <docker-broker-1>, for example,
# to wherever it resides.
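#
# Example (hypothetical address): to make <docker-broker-1>
# resolvable on the caller's machine:
#   echo "127.0.0.1 docker-broker-1" | sudo tee -a /etc/hosts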
#
# ARGS:
# $1 - group id
./run.sh -s localhost:9092 -c 1 --new-group $1 0 some_name_0
12 utils/kafka_producer/single_group_del.sh (Executable file)
@@ -0,0 +1,12 @@
#!/bin/bash

# Although we're looking for the broker at localhost,
# the broker can still redirect us to some <docker-broker-1>,
# so it's up to the caller to either run this script inside
# the same network as the broker, or create a static
# hostname entry that points <docker-broker-1>, for example,
# to wherever it resides.
#
# ARGS:
# $1 - group id
./run.sh -s localhost:9092 --rm-group $1
13 utils/kafka_producer/single_infra_add.sh (Executable file)
@@ -0,0 +1,13 @@
#!/bin/bash

# Although we're looking for the broker at localhost,
# the broker can still redirect us to some <docker-broker-1>,
# so it's up to the caller to either run this script inside
# the same network as the broker, or create a static
# hostname entry that points <docker-broker-1>, for example,
# to wherever it resides.
#
# ARGS:
# $1 - group id
# $2 - mac address
./run.sh -s localhost:9092 --assign-to-group $1 "$2^1"
13 utils/kafka_producer/single_infra_del.sh (Executable file)
@@ -0,0 +1,13 @@
#!/bin/bash

# Although we're looking for the broker at localhost,
# the broker can still redirect us to some <docker-broker-1>,
# so it's up to the caller to either run this script inside
# the same network as the broker, or create a static
# hostname entry that points <docker-broker-1>, for example,
# to wherever it resides.
#
# ARGS:
# $1 - group id
# $2 - mac address
./run.sh -s localhost:9092 --remove-from-group $1 "$2^4"
14 utils/kafka_producer/single_infra_msg.sh (Executable file)
@@ -0,0 +1,14 @@
#!/bin/sh

# Although we're looking for the broker at localhost,
# the broker can still redirect us to some <docker-broker-1>,
# so it's up to the caller to either run this script inside
# the same network as the broker, or create a static
# hostname entry that points <docker-broker-1>, for example,
# to wherever it resides.
#
# ARGS:
# $1 - group id
# $2 - mac address
# $3 - file name containing a complete uCentral request
./run.sh --send-to-group $1 --send-to-mac $2^1 -c 1 -m "`cat $3`" 2>/dev/null
98 utils/kafka_producer/src/consumer.py (Normal file)
@@ -0,0 +1,98 @@
from .utils import Message, MacRange
from .log import logger

from typing import List, Tuple
from kafka.structs import OffsetAndMetadata
import kafka
import time
import uuid
import sys
import re
import json


class Consumer:
    def __init__(self, db: str, topic: str, consumer_timeout: int) -> None:
        self.db = db
        self.conn = None
        self.topic = topic
        self.consumer_timeout = consumer_timeout
        self.message = Message()

    def __enter__(self) -> kafka.KafkaConsumer:
        return self.connect()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

    def connect(self) -> kafka.KafkaConsumer:
        if self.is_connected() is False:
            self.conn = kafka.KafkaConsumer(self.topic,
                                            bootstrap_servers=self.db,
                                            client_id="consumer_1",
                                            group_id="cgw_tests_consumer",
                                            auto_offset_reset='latest',
                                            enable_auto_commit=True,
                                            consumer_timeout_ms=self.consumer_timeout,
                                            value_deserializer=lambda m: json.loads(m.decode('utf-8')))
            logger.info("consumer: connected to kafka")
        else:
            logger.info("consumer: already connected to kafka")
        return self.conn

    def disconnect(self) -> None:
        if self.is_connected() is False:
            return
        self.conn.close()
        logger.info("consumer: disconnected from kafka")
        self.conn = None

    def is_connected(self) -> bool:
        return self.conn is not None

    def flush(self):
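        # Drains the topic: iteration over the consumer stops once
        # consumer_timeout_ms elapses with no new message arriving.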
        assert self.is_connected(), \
            f"consumer: Cannot flush kafka topic while not connected!"

        for message in self.conn:
            logger.debug("consumer: Flushed kafka msg: %s key=%s value=%s ts=%s" %
                         (message.topic, message.key, message.value, message.timestamp))

    def get_msgs(self):
        res_list = []

        assert self.is_connected(), \
            f"consumer: Cannot get Kafka result msg, not connected!"

        for message in self.conn:
            res_list.append(message)
            logger.debug("consumer: Recv kafka msg: %s key=%s value=%s ts=%s" %
                         (message.topic, message.key, message.value, message.timestamp))

        return res_list

    def get_result_msg(self, uuid_val: int):
        res_uuid = str(uuid.UUID(int=uuid_val))

        assert self.is_connected(), \
            f"consumer: Cannot get Kafka result msg, not connected!"

        for message in self.conn:
            logger.debug("consumer: Recv kafka msg: %s key=%s value=%s ts=%s" %
                         (message.topic, message.key, message.value, message.timestamp))
            if res_uuid == message.value['uuid']:
                return message

    def get_msg_by_substring(self, substring: str):
        assert self.is_connected(), \
            f"consumer: Cannot get Kafka result msg, not connected!"

        for message in self.conn:
            # value is deserialized json (a dict); search its serialized form
            if re.search(substring, json.dumps(message.value)):
                logger.debug("consumer: Found '%s' in kafka msg: %s key=%s value=%s ts=%s" %
                             (substring, message.topic, message.key, message.value, message.timestamp))
                return message
@@ -6,6 +6,7 @@ import kafka
 import time
 import uuid
 import sys
+import random


 class Producer:
@@ -22,20 +23,40 @@ class Producer:
         self.disconnect()

     def connect(self) -> kafka.KafkaProducer:
-        if self.conn is None:
+        if self.is_connected() is False:
             self.conn = kafka.KafkaProducer(bootstrap_servers=self.db, client_id="producer")
-            logger.info("connected to kafka")
+            logger.info("producer: connected to kafka")
         else:
-            logger.info("already connected to kafka")
+            logger.info("producer: already connected to kafka")
             raise Exception('producer: already connected to kafka')
         return self.conn

     def disconnect(self) -> None:
-        if self.conn is None:
+        if self.is_connected() is False:
             return
         self.conn.close()
-        logger.info("disconnected from kafka")
+        logger.info("producer: disconnected from kafka")
         self.conn = None

+    def is_connected(self) -> bool:
+        return self.conn is not None
+
+    def handle_single_group_delete(self, group: str, uuid_val: int = None):
+        if group is None:
+            raise Exception('producer: Cannot destroy group without group_id specified!')
+
+        self.conn.send(self.topic, self.message.group_delete(group, uuid_val),
+                       bytes(group, encoding="utf-8"))
+        self.conn.flush()
+
+    def handle_single_group_create(self, group: str, uuid_val: int = None):
+        if group is None:
+            raise Exception('producer: Cannot create new group without group id specified!')
+
+        self.conn.send(self.topic, self.message.group_create(group, 0, "cgw_default_group_name", uuid_val),
+                       bytes(group, encoding="utf-8"))
+        self.conn.flush()
+
     def handle_group_creation(self, create: List[Tuple[str, int, str]], delete: List[str]) -> None:
         with self as conn:
             for group, shard_id, name in create:
@@ -44,6 +65,33 @@ class Producer:
             for group in delete:
                 conn.send(self.topic, self.message.group_delete(group),
                           bytes(group, encoding="utf-8"))
             conn.flush()

+    def handle_single_device_assign(self, group: str, mac: str, uuid_val: int):
+        if group is None:
+            raise Exception('producer: Cannot assign infra to group without group id specified!')
+
+        if mac is None:
+            raise Exception('producer: Cannot assign infra to group without infra MAC specified!')
+
+        mac_range = MacRange(mac)
+
+        self.conn.send(self.topic, self.message.add_dev_to_group(group, mac_range, uuid_val),
+                       bytes(group, encoding="utf-8"))
+        self.conn.flush()
+
+    def handle_single_device_deassign(self, group: str, mac: str):
+        if group is None:
+            raise Exception('producer: Cannot deassign infra from group without group id specified!')
+
+        if mac is None:
+            raise Exception('producer: Cannot deassign infra from group without infra MAC specified!')
+
+        mac_range = MacRange(mac)
+
+        self.conn.send(self.topic, self.message.remove_dev_from_group(group, mac_range),
+                       bytes(group, encoding="utf-8"))
+        self.conn.flush()
+
     def handle_device_assignment(self, add: List[Tuple[str, MacRange]], remove: List[Tuple[str, MacRange]]) -> None:
         with self as conn:
@@ -54,6 +102,7 @@ class Producer:
             for group, mac_range in remove:
                 conn.send(self.topic, self.message.remove_dev_from_group(group, mac_range),
                           bytes(group, encoding="utf-8"))
             conn.flush()

     def handle_device_messages(self, message: dict, group: str, mac_range: MacRange,
                                count: int, time_s: int, interval_s: int) -> None:
@@ -69,6 +118,7 @@ class Producer:
             for mac in mac_range:
                 conn.send(self.topic, self.message.to_device(group, mac, message, seq),
                           bytes(group, encoding="utf-8"))
             conn.flush()
             #time.sleep(interval_s)
             #if time.time() > end:
             #    break
@@ -1,6 +1,5 @@
 from dataclasses import dataclass
-from typing import List, Tuple
+from typing import Tuple
 import copy
 import json
 import uuid
@@ -23,6 +22,8 @@ class MacRange:
     Raises ValueError
     """
     def __init__(self, input: str = "XX:XX:XX:XX:XX:XX") -> None:
+        input = input.replace("-", ":", 5)
+
         self.__base_as_num, self.__len = self.__parse_input(input.upper())
         self.__idx = 0

@@ -53,7 +54,9 @@ class MacRange:

     @staticmethod
     def mac2num(mac: str) -> int:
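         # accepts ':'- or '-'-separated macs,
         # e.g. mac2num("02-00-00-00-00-01") == 0x020000000001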
-        return int(mac.replace(":", ""), base=16)
+        mac = mac.replace(":", "", 5)
+        mac = mac.replace("-", "", 5)
+        return int(mac, base=16)

     @staticmethod
     def num2mac(mac: int) -> str:
@@ -78,7 +81,7 @@ class MacRange:


 class Message:
-    TEMPLATE_FILE = "./data/message_template.json"
+    TEMPLATE_FILE = "./kafka_data/message_template.json"
     GROUP_ADD = "add_group"
     GROUP_DEL = "del_group"
     DEV_TO_GROUP = "add_to_group"
@@ -96,35 +99,42 @@ class Message:
         with open(self.TEMPLATE_FILE) as f:
             self.templates = json.loads(f.read())

-    def group_create(self, id: str, shard_id: int, name: str) -> bytes:
+    @staticmethod
+    def parse_uuid(uuid_val = None) -> str:
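         # e.g. parse_uuid(5) -> '00000000-0000-0000-0000-000000000005';
         # tests pass a random int and later match the reply's uuid against it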
+        if uuid_val is None:
+            return str(uuid.uuid1())
+
+        return str(uuid.UUID(int=uuid_val))
+
+    def group_create(self, id: str, shard_id: int, name: str, uuid_val: int = None) -> bytes:
         msg = copy.copy(self.templates[self.GROUP_ADD])
         msg[self.GROUP_ID] = id
         msg[self.SHARD_ID] = shard_id
         msg[self.GROUP_NAME] = name
-        msg[self.MSG_UUID] = str(uuid.uuid1())
+        msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
         return json.dumps(msg).encode('utf-8')

-    def group_delete(self, id: str) -> bytes:
+    def group_delete(self, id: str, uuid_val: int = None) -> bytes:
         msg = copy.copy(self.templates[self.GROUP_DEL])
         msg[self.GROUP_ID] = id
-        msg[self.MSG_UUID] = str(uuid.uuid1())
+        msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
         return json.dumps(msg).encode('utf-8')

-    def add_dev_to_group(self, id: str, mac_range: MacRange) -> bytes:
+    def add_dev_to_group(self, id: str, mac_range: MacRange, uuid_val: int = None) -> bytes:
         msg = copy.copy(self.templates[self.DEV_TO_GROUP])
         msg[self.GROUP_ID] = id
         msg[self.DEV_LIST] = list(mac_range)
-        msg[self.MSG_UUID] = str(uuid.uuid1())
+        msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
         return json.dumps(msg).encode('utf-8')

-    def remove_dev_from_group(self, id: str, mac_range: MacRange) -> bytes:
+    def remove_dev_from_group(self, id: str, mac_range: MacRange, uuid_val: int = None) -> bytes:
         msg = copy.copy(self.templates[self.DEV_FROM_GROUP])
         msg[self.GROUP_ID] = id
         msg[self.DEV_LIST] = list(mac_range)
-        msg[self.MSG_UUID] = str(uuid.uuid1())
+        msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
         return json.dumps(msg).encode('utf-8')

-    def to_device(self, id: str, mac: str, data, sequence: int = 0):
+    def to_device(self, id: str, mac: str, data, sequence: int = 0, uuid_val: int = None):
         msg = copy.copy(self.templates[self.TO_DEVICE])
         msg[self.GROUP_ID] = id
         msg[self.MAC] = mac
@@ -132,7 +142,8 @@ class Message:
             msg[self.DATA] = data
         else:
             msg[self.DATA] = {"data": data}
-        msg[self.MSG_UUID] = str(uuid.uuid1(node=MacRange.mac2num(mac), clock_seq=sequence))
+        #msg[self.MSG_UUID] = str(uuid.uuid1(node=MacRange.mac2num(mac), clock_seq=sequence))
+        msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
         return json.dumps(msg).encode('utf-8')