diff --git a/README.md b/README.md index 21757c6..efdd266 100644 --- a/README.md +++ b/README.md @@ -162,3 +162,14 @@ There are several environment variable to configure certificates path and names The infrastructure connectivity use root certs store - the directory with trusted certificates The environemt variable to configure certificates path: 1. CGW_NB_INFRA_CERTS_PATH - path to certificates located on host machine + +# Automated Testing +Automated python-based tests are located inside the *tests* directory. +Currently, tests should be run manually by changin PWD to *tests* and launching helper script *run.sh*: +```console +cd ./test +./run.sh +``` +*NOTE:* currently, tests are not running inside a container. +This means, that it's up to the caller make sure tests can communicate with whatever CGW's deployment as well as thirdparty services. +E.g. tests inside running *host* enviroment must be able to communicate with CGW, Redis, Kafka, PGSQL etc. diff --git a/run_cgw.sh b/run_cgw.sh index 7f9f2a8..846e7f6 100755 --- a/run_cgw.sh +++ b/run_cgw.sh @@ -142,6 +142,7 @@ echo "CGW UCENTRAL SWITCH DATAMODEL URI : $CGW_UCENTRAL_SWITCH_DATAMODEL_URI" docker run \ -p 15002:15002 \ -p 50051:50051 \ + -p 8080:8080 \ --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ -v $CGW_CERTS_PATH:$CONTAINTER_CERTS_VOLUME \ -v $CGW_NB_INFRA_CERTS_PATH:$CONTAINTER_NB_INFRA_CERTS_VOLUME \ diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..9e99eff --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,168 @@ +import pytest +import ssl +import json +import time +from client_simulator.src.simulation_runner import Device as DeviceSimulator +from kafka_producer.src.producer import Producer as KafkaProducer +from kafka_producer.src.consumer import Consumer as KafkaConsumer +import requests +from typing import List, Tuple +import random + + +# Device connection, kafka wrappers etc +class TestContext: + @staticmethod + def default_dev_sim_mac() -> str: + return "02-00-00-00-00-00" + + @staticmethod + def default_kafka_group() -> str: + return '9999' + + def __init__(self): + device = DeviceSimulator( + mac=self.default_dev_sim_mac(), + server='wss://localhost:15002', + ca_cert='./ca-certs/ca.crt', + msg_interval=10, msg_size=1024, + client_cert='./certs/base.crt', client_key='./certs/base.key', check_cert=False, + start_event=None, stop_event=None) + + # Server cert CN? don't care, ignore + device.ssl_context.check_hostname = False + device.ssl_context.verify_mode = ssl.CERT_NONE + + # Tweak connect message to change initial FW version: + # Any latter steps might want to change it to something else + # (test capabilities change, for example); + # However, we're making a fixture, hence all values must be the same + # on the initial step. + connect_msg = json.loads(device.messages.connect) + connect_msg['params']['firmware'] = "Test_FW_A" + connect_msg['params']['uuid'] = 1 + device.messages.connect = json.dumps(connect_msg) + + self.device_sim = device + + producer = KafkaProducer(db='localhost:9092', topic='CnC') + consumer = KafkaConsumer(db='localhost:9092', topic='CnC_Res', consumer_timeout=3000) + + self.kafka_producer = producer + self.kafka_consumer = consumer + +@pytest.fixture(scope='function') +def test_context(): + ctx = TestContext() + + yield ctx + + ctx.device_sim.disconnect() + + # Let's make sure we destroy default group after we're done with tests. + if ctx.kafka_producer.is_connected(): + ctx.kafka_producer.handle_single_group_delete(ctx.default_kafka_group()) + + # We have to clear any messages after done working with kafka + if ctx.kafka_consumer.is_connected(): + ctx.kafka_consumer.flush() + + ctx.kafka_producer.disconnect() + ctx.kafka_consumer.disconnect() + +@pytest.fixture(scope='function') +def cgw_probe(test_context): + try: + r = requests.get("http://localhost:8080/health") + print("CGW status: " + str(r.status_code) + ', txt:' + r.text) + assert r is not None and r.status_code == 200, \ + f"CGW is in a bad state (health != 200), can't proceed" + except: + raise Exception('CGW health fetch failed (Not running?)') + +@pytest.fixture(scope='function') +def kafka_probe(test_context): + try: + test_context.kafka_producer.connect() + test_context.kafka_consumer.connect() + except: + raise Exception('Failed to connect to kafka broker! Either CnC, CnC_Res topics are unavailable, or broker is down (not running)') + + # Let's make sure default group is always deleted. + test_context.kafka_producer.handle_single_group_delete(test_context.default_kafka_group()) + + # We have to clear any messages before we can work with kafka + test_context.kafka_consumer.flush() + +@pytest.fixture(scope='function') +def device_sim_connect(test_context): + # Make sure we initiate connect; + # If this thing throws - any tests that depend on this ficture would fail. + test_context.device_sim.connect() + +@pytest.fixture(scope='function') +def device_sim_reconnect(test_context): + assert test_context.device_sim._socket is not None, \ + f"Expected websocket connection to execute a reconnect while socket is not connected!" + + time.sleep(1) + test_context.device_sim.disconnect() + assert test_context.device_sim._socket is None, \ + f"Expected websocket connection to be NULL after disconnect." + time.sleep(1) + + test_context.device_sim.connect() + assert test_context.device_sim._socket is not None, \ + f"Expected websocket connection NOT to be NULL after reconnect." + +@pytest.fixture(scope='function') +def device_sim_send_ucentral_connect(test_context): + assert test_context.device_sim._socket is not None, \ + f"Expected websocket connection to send a connect ucentral event while socket is not connected!" + + test_context.device_sim.send_hello(test_context.device_sim._socket) + + +@pytest.fixture(scope='function') +def kafka_default_infra_group(test_context): + assert test_context.kafka_producer.is_connected(),\ + f'Cannot create default group: kafka producer is not connected to Kafka' + + assert test_context.kafka_consumer.is_connected(),\ + f'Cannot create default group: kafka consumer is not connected to Kafka' + + uuid_val = random.randint(1, 100) + default_group = test_context.default_kafka_group() + + test_context.kafka_producer.handle_single_group_create(test_context.default_kafka_group(), uuid_val) + ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val) + if not ret_msg: + print('Failed to receive create group result, was expecting ' + str(uuid_val) + ' uuid reply') + raise Exception('Failed to receive create group result when expected') + + if ret_msg.value['success'] is False: + print(ret_msg.value['error_message']) + raise Exception('Default infra group creation failed!') + + +@pytest.fixture(scope='function') +def kafka_default_infra(test_context): + assert test_context.kafka_producer.is_connected(),\ + f'Cannot create default group: kafka producer is not connected to Kafka' + + assert test_context.kafka_consumer.is_connected(),\ + f'Cannot create default group: kafka consumer is not connected to Kafka' + + uuid_val = random.randint(1, 100) + default_group = test_context.default_kafka_group() + default_infra_mac = test_context.default_dev_sim_mac() + + test_context.kafka_producer.handle_single_device_assign(default_group, default_infra_mac, uuid_val) + ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val) + if ret_msg is None: + print('Failed to receive infra assign result, was expecting ' + str(uuid_val) + ' uuid reply') + raise Exception('Failed to receive infra assign result when expected') + + if ret_msg.value['success'] is False: + print(ret_msg.value['error_message']) + raise Exception('Default infra group creation failed!') diff --git a/tests/run.sh b/tests/run.sh new file mode 100755 index 0000000..e7d6344 --- /dev/null +++ b/tests/run.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +# Separate exports for clearer visibility of _what exactly_ +# we're putting in python path +export PYTHONPATH="$PYTHONPATH:$PWD" +export PYTHONPATH="$PYTHONPATH:$PWD/../utils" + +ln -sf ../utils/client_simulator/sim_data sim_data +ln -sf ../utils/kafka_producer/kafka_data kafka_data +ln -sf ../utils/cert_generator/certs/client/ certs +ln -sf ../utils/cert_generator/certs/ca/ ca-certs + +pytest -v +#pytest -v -s . diff --git a/tests/test_cgw_basic.py b/tests/test_cgw_basic.py new file mode 100644 index 0000000..e0c5836 --- /dev/null +++ b/tests/test_cgw_basic.py @@ -0,0 +1,132 @@ +import pytest +import json +import random + +class TestCgwBasic: + # Base test: + # - test_context can be created - 'tests core' alloc / create + # - tests can connect to kafka broker + # - CGW is up + # - TODO: + # * redis is up + # * PGSQL is up + @pytest.mark.usefixtures("test_context", + "cgw_probe", + "kafka_probe") + def test_basic_probe(self, test_context): + pass + + # Base test: + # - tests kafka client can create / receive messages through kafka bus + # - test infra group can be successfully created + @pytest.mark.usefixtures("test_context", + "cgw_probe", + "kafka_probe", + "kafka_default_infra_group") + def test_kafka_sanity(self, test_context): + pass + + # Base test: + # - test infra can be addded successfully to the default infra group + @pytest.mark.usefixtures("test_context", + "cgw_probe", + "kafka_probe", + "kafka_default_infra_group", + "kafka_default_infra") + def test_kafka_basic(self, test_context): + pass + + # Base test: + # - certificates can be found / Used + # - device sim can connect to CGW + @pytest.mark.usefixtures("test_context", + "cgw_probe", + "device_sim_connect") + def test_device_sim_sanity(self, test_context): + pass + + # Base test: + # - device sim can send connect message to cgw + # - kafka client can verify (pull msgs from kafka bus) + # that simulator's indeed connected + @pytest.mark.usefixtures("test_context", + "cgw_probe", + "kafka_probe", + "device_sim_connect", + "device_sim_send_ucentral_connect") + def test_device_sim_base(self, test_context): + pass + + # Base test: + # - unassigned infra connects to CGW, and kafka sim can validate it + # through the msg + @pytest.mark.usefixtures("test_context", + "cgw_probe", + "kafka_probe", + "device_sim_connect", + "device_sim_send_ucentral_connect") + def test_unassigned_infra_base(self, test_context): + join_message_received = False + infra_is_unassigned = False + messages = test_context.kafka_consumer.get_msgs() + msg_mac = test_context.default_dev_sim_mac() + + assert messages,\ + f"Failed to receive any messages (events) from sim-device, while expected connect / infra_join" + + if not messages: + raise Exception('Failed to receive infra assign result when expected') + + # Expecting TWO messages to be present in the message list + for message in messages: + if message.value['type'] == 'infra_join' and message.key == b'0' and message.value['infra_group_infra'] == msg_mac: + join_message_received = True + continue + + if message.value['type'] == 'unassigned_infra_connection' and message.key == b'0' and message.value['infra_group_infra'] == msg_mac: + infra_is_unassigned = True + continue + + assert join_message_received,\ + f"Failed to find 'infra_join' message for default infra MAC" + + assert infra_is_unassigned,\ + f"Failed to find unassigned 'unassigned_infra_connection' message for default infra MAC" + + + # Base test: + # - assigned infra connects to CGW, and kafka sim can validate it + # through the msg + kafka key + @pytest.mark.usefixtures("test_context", + "cgw_probe", + "kafka_probe", + "kafka_default_infra_group", + "kafka_default_infra", + "device_sim_connect", + "device_sim_send_ucentral_connect") + def test_assigned_infra_base(self, test_context): + join_message_received = False + infra_is_assigned = False + messages = test_context.kafka_consumer.get_msgs() + msg_mac = test_context.default_dev_sim_mac() + default_group = test_context.default_kafka_group().encode('utf-8') + + assert messages,\ + f"Failed to receive any messages (events) from sim-device, while expected connect / infra_join" + + if not messages: + raise Exception('Failed to receive infra assign result when expected') + + # We can deduce whether infra's assigned by inspecting a single msg + for message in messages: + if message.value['type'] == 'infra_join' and message.value['infra_group_infra'] == msg_mac: + join_message_received = True + if message.key == default_group and str(message.value['infra_group_id']).encode('utf-8') == default_group: + infra_is_assigned = True + break + + assert join_message_received,\ + f"Failed to find 'infra_join' message for default infra MAC" + + assert infra_is_assigned,\ + f"While detected join message for default infra MAC, expected it to be assigned to group (key != default group id)" diff --git a/utils/client_simulator/runsingle b/utils/client_simulator/runsingle index 611bb15..eb13033 100755 --- a/utils/client_simulator/runsingle +++ b/utils/client_simulator/runsingle @@ -9,7 +9,7 @@ CA_CERT_PATH=./tipcerts CLIENT_CERT_PATH=$(pwd)/../cert_generator/certs/client CLIENT_CERT_PATH=./tipcerts #--no-cert-check -python3 single.py --mac "$MAC" \ +PYTHONPATH="$PYTHONPATH:$PWD:$PWD/src/" python3 single.py --mac "$MAC" \ --server "$URL" \ --ca-cert "$CA_CERT_PATH/ca.crt" \ --client-certs-path "$CLIENT_CERT_PATH" \ diff --git a/utils/client_simulator/data/message_templates.json b/utils/client_simulator/sim_data/message_templates.json similarity index 100% rename from utils/client_simulator/data/message_templates.json rename to utils/client_simulator/sim_data/message_templates.json diff --git a/utils/client_simulator/src/simulation_runner.py b/utils/client_simulator/src/simulation_runner.py index b131fe9..14ba19f 100644 --- a/utils/client_simulator/src/simulation_runner.py +++ b/utils/client_simulator/src/simulation_runner.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from utils import get_msg_templates, Args -from log import logger +from .utils import get_msg_templates, Args +from .log import logger from websockets.sync import client from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError, ConnectionClosed from websockets.frames import * @@ -20,6 +20,7 @@ import ssl import os import re + class Message: def __init__(self, mac: str, size: int): tmp_mac = copy.deepcopy(mac) diff --git a/utils/client_simulator/src/utils.py b/utils/client_simulator/src/utils.py index ee1f5fc..f2fa25a 100644 --- a/utils/client_simulator/src/utils.py +++ b/utils/client_simulator/src/utils.py @@ -7,7 +7,7 @@ import re import os -TEMPLATE_LOCATION = "./data/message_templates.json" +TEMPLATE_LOCATION = "./sim_data/message_templates.json" @dataclass diff --git a/utils/kafka_producer/data/message_template.json b/utils/kafka_producer/kafka_data/message_template.json similarity index 100% rename from utils/kafka_producer/data/message_template.json rename to utils/kafka_producer/kafka_data/message_template.json diff --git a/utils/kafka_producer/run.sh b/utils/kafka_producer/run.sh new file mode 100755 index 0000000..ea789f0 --- /dev/null +++ b/utils/kafka_producer/run.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +PYTHONPATH="$PYTHONPATH:$PWD:$PWD/src/" python3 main.py $@ diff --git a/utils/kafka_producer/single_group_add.sh b/utils/kafka_producer/single_group_add.sh new file mode 100755 index 0000000..5d0bd17 --- /dev/null +++ b/utils/kafka_producer/single_group_add.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Although we're looking for broker at localhost, +# broker can still direct us to some +# so it's up to the caller to either run this script inside +# the same network instance as broker, or create a static +# hostname entry to point , for example, +# to whenever it resides. +# +# ARGS: +# $1 - group id +./run.sh -s localhost:9092 -c 1 --new-group $1 0 some_name_0 diff --git a/utils/kafka_producer/single_group_del.sh b/utils/kafka_producer/single_group_del.sh new file mode 100755 index 0000000..c95a392 --- /dev/null +++ b/utils/kafka_producer/single_group_del.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Although we're looking for broker at localhost, +# broker can still direct us to some +# so it's up to the caller to either run this script inside +# the same network instance as broker, or create a static +# hostname entry to point , for example, +# to whenever it resides. +# +# ARGS: +# $1 - group id +./run.sh -s localhost:9092 --rm-group $1 diff --git a/utils/kafka_producer/single_infra_add.sh b/utils/kafka_producer/single_infra_add.sh new file mode 100755 index 0000000..b33de2a --- /dev/null +++ b/utils/kafka_producer/single_infra_add.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Although we're looking for broker at localhost, +# broker can still direct us to some +# so it's up to the caller to either run this script inside +# the same network instance as broker, or create a static +# hostname entry to point , for example, +# to whenever it resides. +# +# ARGS: +# $1 - group id +# $2 - mac address +./run.sh -s localhost:9092 --assign-to-group $1 "$2^1" diff --git a/utils/kafka_producer/single_infra_del.sh b/utils/kafka_producer/single_infra_del.sh new file mode 100755 index 0000000..f66625c --- /dev/null +++ b/utils/kafka_producer/single_infra_del.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Although we're looking for broker at localhost, +# broker can still direct us to some +# so it's up to the caller to either run this script inside +# the same network instance as broker, or create a static +# hostname entry to point , for example, +# to whenever it resides. +# +# ARGS: +# $1 - group id +# $2 - mac address +./run.sh -s localhost:9092 --remove-from-group $1 "$2^4" diff --git a/utils/kafka_producer/single_infra_msg.sh b/utils/kafka_producer/single_infra_msg.sh new file mode 100755 index 0000000..31e6f28 --- /dev/null +++ b/utils/kafka_producer/single_infra_msg.sh @@ -0,0 +1,14 @@ +#!/bin/sh + +# Although we're looking for broker at localhost, +# broker can still direct us to some +# so it's up to the caller to either run this script inside +# the same network instance as broker, or create a static +# hostname entry to point , for example, +# to whenever it resides. +# +# ARGS: +# $1 - group id +# $2 - mac address +# $3 - file name containing complete uCentral request +./run.sh --send-to-group $1 --send-to-mac $2^1 -c 1 -m "`cat $3`" 2>/dev/null diff --git a/utils/kafka_producer/src/consumer.py b/utils/kafka_producer/src/consumer.py new file mode 100644 index 0000000..a669d98 --- /dev/null +++ b/utils/kafka_producer/src/consumer.py @@ -0,0 +1,98 @@ +from .utils import Message, MacRange +from .log import logger + +from typing import List, Tuple +from kafka.structs import OffsetAndMetadata +import kafka +import time +import uuid +import sys +import re +import json + + +class Consumer: + def __init__(self, db: str, topic: str, consumer_timeout: int) -> None: + self.db = db + self.conn = None + self.topic = topic + self.consumer_timeout = consumer_timeout + self.message = Message() + + def __enter__(self) -> kafka.KafkaConsumer: + return self.connect() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.disconnect() + + + def connect(self) -> kafka.KafkaConsumer: + if self.is_connected() is False: + self.conn = kafka.KafkaConsumer(self.topic, + bootstrap_servers=self.db, + client_id="consumer_1", + group_id="cgw_tests_consumer", + auto_offset_reset='latest', + enable_auto_commit=True, + consumer_timeout_ms=self.consumer_timeout, + value_deserializer=lambda m: json.loads(m.decode('utf-8'))) + logger.info("consumer: connected to kafka") + else: + logger.info("consumer: already connected to kafka") + return self.conn + + def disconnect(self) -> None: + if self.is_connected() is False: + return + self.conn.close() + logger.info("consumer: disconnected from kafka") + self.conn = None + + def is_connected(self) -> bool: + return self.conn is not None + + def flush(self): + assert self.is_connected(), \ + f"consumer: Cannot flush kafka topic while not connected!" + + for message in self.conn: + logger.debug("consumer: Flushed kafka msg: %s key=%s value=%s ts=%s" % + (message.topic,message.key, message.value, message.timestamp)) + + def get_msgs(self): + res_list = [] + + assert self.is_connected(),\ + f"consumer: Cannot get Kafka result msg, Not connected!" + + for message in self.conn: + res_list.append(message) + logger.debug("consumer: Recv kafka msg: %s key=%s value=%s ts=%s" % + (message.topic, message.key, message.value, message.timestamp)) + + return res_list + + def get_result_msg(self, uuid_val: int): + res_uuid = str(uuid.UUID(int=uuid_val)) + + assert self.is_connected(),\ + f"consumer: Cannot get Kafka result msg, Not connected!" + + for message in self.conn: + logger.debug("Flushed kafka msg: %s key=%s value=%s ts=%s" % + (message.topic, message.key, message.value, message.timestamp)) + if res_uuid == message.value['uuid']: + return message + + def get_msg_by_substring(self, substring: int): + res_uuid = uuid.UUID(int=uuid_val) + + assert self.is_connected(),\ + f"Cannot get Kafka result msg, Not connected!" + + for message in self.conn: + res_list.append(message) + if re.search(substring, message.value): + logger.debug("Found '%s' in kafka msg: %s key=%s value=%s ts=%s" % + (substring, message.topic, message.key, message.value, message.timestamp)) + return message diff --git a/utils/kafka_producer/src/producer.py b/utils/kafka_producer/src/producer.py index 83332b3..74805a9 100644 --- a/utils/kafka_producer/src/producer.py +++ b/utils/kafka_producer/src/producer.py @@ -6,6 +6,7 @@ import kafka import time import uuid import sys +import random class Producer: @@ -22,20 +23,40 @@ class Producer: self.disconnect() def connect(self) -> kafka.KafkaProducer: - if self.conn is None: + if self.is_connected() is False: self.conn = kafka.KafkaProducer(bootstrap_servers=self.db, client_id="producer") - logger.info("connected to kafka") + logger.info("producer: connected to kafka") else: - logger.info("already connected to kafka") + logger.info("producer: already connected to kafka") + raise Exception('') return self.conn def disconnect(self) -> None: - if self.conn is None: + if self.is_connected() is False: return self.conn.close() - logger.info("disconnected from kafka") + logger.info("producer: disconnected from kafka") self.conn = None + def is_connected(self) -> bool: + return self.conn is not None + + def handle_single_group_delete(self, group: str, uuid_val: int = None): + if group is None: + raise Exception('producer: Cannot destroy group without group_id specified!') + + self.conn.send(self.topic, self.message.group_delete(group, uuid_val), + bytes(group, encoding="utf-8")) + self.conn.flush() + + def handle_single_group_create(self, group: str, uuid_val: int = None): + if group is None: + raise Exception('producer: Cannot create new group without group id specified!') + + self.conn.send(self.topic, self.message.group_create(group, 0, "cgw_default_group_name", uuid_val), + bytes(group, encoding="utf-8")) + self.conn.flush() + def handle_group_creation(self, create: List[Tuple[str, int, str]], delete: List[str]) -> None: with self as conn: for group, shard_id, name in create: @@ -44,6 +65,33 @@ class Producer: for group in delete: conn.send(self.topic, self.message.group_delete(group), bytes(group, encoding="utf-8")) + conn.flush() + + def handle_single_device_assign(self, group: str, mac: str, uuid_val: int): + if group is None: + raise Exception('producer: Cannot assign infra to group without group id specified!') + + if mac is None: + raise Exception('producer: Cannot assign infra to group without infra MAC specified!') + + mac_range = MacRange(mac) + + self.conn.send(self.topic, self.message.add_dev_to_group(group, mac_range, uuid_val), + bytes(group, encoding="utf-8")) + self.conn.flush() + + def handle_single_device_deassign(self, group: str, mac: str): + if group is None: + raise Exception('Cannot deassign infra from group without group id specified!') + + if mac is None: + raise Exception('Cannot deassign infra from group without infra MAC specified!') + + mac_range = MacRange(mac) + + self.conn.send(self.topic, self.message.remove_dev_from_group(group, mac_range), + bytes(group, encoding="utf-8")) + conn.flush() def handle_device_assignment(self, add: List[Tuple[str, MacRange]], remove: List[Tuple[str, MacRange]]) -> None: with self as conn: @@ -54,6 +102,7 @@ class Producer: for group, mac_range in remove: conn.send(self.topic, self.message.remove_dev_from_group(group, mac_range), bytes(group, encoding="utf-8")) + conn.flush() def handle_device_messages(self, message: dict, group: str, mac_range: MacRange, count: int, time_s: int, interval_s: int) -> None: @@ -69,6 +118,7 @@ class Producer: for mac in mac_range: conn.send(self.topic, self.message.to_device(group, mac, message, seq), bytes(group, encoding="utf-8")) + conn.flush() #time.sleep(interval_s) #if time.time() > end: # break diff --git a/utils/kafka_producer/src/utils.py b/utils/kafka_producer/src/utils.py index 6d88357..7fce700 100644 --- a/utils/kafka_producer/src/utils.py +++ b/utils/kafka_producer/src/utils.py @@ -1,6 +1,5 @@ from dataclasses import dataclass from typing import List, Tuple -from typing import Tuple import copy import json import uuid @@ -23,6 +22,8 @@ class MacRange: Raises ValueError """ def __init__(self, input: str = "XX:XX:XX:XX:XX:XX") -> None: + input = input.replace("-", ":", 5) + self.__base_as_num, self.__len = self.__parse_input(input.upper()) self.__idx = 0 @@ -53,7 +54,9 @@ class MacRange: @staticmethod def mac2num(mac: str) -> int: - return int(mac.replace(":", ""), base=16) + mac = mac.replace(":", "", 5) + mac = mac.replace("-", "", 5) + return int(mac, base=16) @staticmethod def num2mac(mac: int) -> str: @@ -78,7 +81,7 @@ class MacRange: class Message: - TEMPLATE_FILE = "./data/message_template.json" + TEMPLATE_FILE = "./kafka_data/message_template.json" GROUP_ADD = "add_group" GROUP_DEL = "del_group" DEV_TO_GROUP = "add_to_group" @@ -96,35 +99,42 @@ class Message: with open(self.TEMPLATE_FILE) as f: self.templates = json.loads(f.read()) - def group_create(self, id: str, shard_id: int, name: str) -> bytes: + @staticmethod + def parse_uuid(uuid_val = None) -> str: + if uuid_val is None: + return str(uuid.uuid1()) + + return str(uuid.UUID(int=uuid_val)) + + def group_create(self, id: str, shard_id: int, name: str, uuid_val: int = None) -> bytes: msg = copy.copy(self.templates[self.GROUP_ADD]) msg[self.GROUP_ID] = id msg[self.SHARD_ID] = shard_id msg[self.GROUP_NAME] = name - msg[self.MSG_UUID] = str(uuid.uuid1()) + msg[self.MSG_UUID] = Message.parse_uuid(uuid_val) return json.dumps(msg).encode('utf-8') - def group_delete(self, id: str) -> bytes: + def group_delete(self, id: str, uuid_val: int = None) -> bytes: msg = copy.copy(self.templates[self.GROUP_DEL]) msg[self.GROUP_ID] = id - msg[self.MSG_UUID] = str(uuid.uuid1()) + msg[self.MSG_UUID] = Message.parse_uuid(uuid_val) return json.dumps(msg).encode('utf-8') - def add_dev_to_group(self, id: str, mac_range: MacRange) -> bytes: + def add_dev_to_group(self, id: str, mac_range: MacRange, uuid_val: int = None) -> bytes: msg = copy.copy(self.templates[self.DEV_TO_GROUP]) msg[self.GROUP_ID] = id msg[self.DEV_LIST] = list(mac_range) - msg[self.MSG_UUID] = str(uuid.uuid1()) + msg[self.MSG_UUID] = Message.parse_uuid(uuid_val) return json.dumps(msg).encode('utf-8') - def remove_dev_from_group(self, id: str, mac_range: MacRange) -> bytes: + def remove_dev_from_group(self, id: str, mac_range: MacRange, uuid_val: int = None) -> bytes: msg = copy.copy(self.templates[self.DEV_FROM_GROUP]) msg[self.GROUP_ID] = id msg[self.DEV_LIST] = list(mac_range) - msg[self.MSG_UUID] = str(uuid.uuid1()) + msg[self.MSG_UUID] = Message.parse_uuid(uuid_val) return json.dumps(msg).encode('utf-8') - def to_device(self, id: str, mac: str, data, sequence: int = 0): + def to_device(self, id: str, mac: str, data, sequence: int = 0, uuid_val: int = None): msg = copy.copy(self.templates[self.TO_DEVICE]) msg[self.GROUP_ID] = id msg[self.MAC] = mac @@ -132,7 +142,8 @@ class Message: msg[self.DATA] = data else: msg[self.DATA] = {"data": data} - msg[self.MSG_UUID] = str(uuid.uuid1(node=MacRange.mac2num(mac), clock_seq=sequence)) + #msg[self.MSG_UUID] = str(uuid.uuid1(node=MacRange.mac2num(mac), clock_seq=sequence)) + msg[self.MSG_UUID] = Message.parse_uuid(uuid_val) return json.dumps(msg).encode('utf-8')