From 52f29e5f4911a9b4b3c9149f276043503ac66164 Mon Sep 17 00:00:00 2001
From: Sviatoslav Boichuk
Date: Tue, 10 Dec 2024 12:37:50 +0200
Subject: [PATCH 1/6] Return error if msg relay failed

---
 src/cgw_remote_discovery.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs
index 5399d9d..8d4fe60 100644
--- a/src/cgw_remote_discovery.rs
+++ b/src/cgw_remote_discovery.rs
@@ -1115,6 +1115,8 @@ impl CGWRemoteDiscovery {
                 "Failed to relay message! CGW{} seems to be unreachable at [{}:{}]! Error: {}",
                 shard_id, cl.shard.server_host, cl.shard.server_port, e
             );
+
+            return Err(e);
         }
 
         return Ok(());
From b5ce07669ab164fa725e1a145a0a491b7072ed07 Mon Sep 17 00:00:00 2001
From: Sviatoslav Boichuk
Date: Tue, 10 Dec 2024 12:38:18 +0200
Subject: [PATCH 2/6] Removed unused variable from app args

---
 src/cgw_app_args.rs | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/cgw_app_args.rs b/src/cgw_app_args.rs
index 4f8004d..7ecc0e9 100644
--- a/src/cgw_app_args.rs
+++ b/src/cgw_app_args.rs
@@ -526,21 +526,25 @@ pub struct AppArgs {
     /// Topomap featue status (enabled/disabled)
     pub feature_topomap_enabled: bool,
 
-    /// Utilize TLS connection with NB infrastructure (Redis, PostgreSQL)
-    pub nb_infra_tls: bool,
-
+    /// CGW Websocket args
     pub wss_args: CGWWSSArgs,
+
     /// CGW GRPC args
     pub grpc_args: CGWGRPCArgs,
+
     /// CGW Kafka args
     pub kafka_args: CGWKafkaArgs,
+
     /// CGW DB args
     pub db_args: CGWDBArgs,
+
     /// CGW Redis args
     pub redis_args: CGWRedisArgs,
+
     /// CGW Metrics args
     pub metrics_args: CGWMetricsArgs,
+
     /// CGW Validation schema URI args
     pub validation_schema: CGWValidationSchemaArgs,
 }
@@ -632,7 +636,6 @@ impl AppArgs {
             log_level,
             cgw_id,
             feature_topomap_enabled,
-            nb_infra_tls,
             wss_args,
             grpc_args,
             kafka_args,
From be93ba2d631ac564c9a882e9f42d96a3666726b2 Mon Sep 17 00:00:00 2001
From: Sviatoslav Boichuk
Date: Tue, 10 Dec 2024 12:39:10 +0200
Subject: [PATCH 3/6] Removed unused conn index param

---
 src/cgw_connection_processor.rs | 4 +---
 src/cgw_connection_server.rs    | 5 ++---
 src/main.rs                     | 2 +-
 3 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/src/cgw_connection_processor.rs b/src/cgw_connection_processor.rs
index 90feab9..24e3a8a 100644
--- a/src/cgw_connection_processor.rs
+++ b/src/cgw_connection_processor.rs
@@ -79,19 +79,17 @@ pub struct CGWConnectionProcessor {
     cgw_server: Arc<CGWConnectionServer>,
     pub serial: MacAddress,
     pub addr: SocketAddr,
-    pub idx: i64,
     pub group_id: i32,
     pub feature_topomap_enabled: bool,
     pub device_type: CGWDeviceType,
 }
 
 impl CGWConnectionProcessor {
-    pub fn new(server: Arc<CGWConnectionServer>, conn_idx: i64, addr: SocketAddr) -> Self {
+    pub fn new(server: Arc<CGWConnectionServer>, addr: SocketAddr) -> Self {
         let conn_processor: CGWConnectionProcessor = CGWConnectionProcessor {
             cgw_server: server.clone(),
             serial: MacAddress::default(),
             addr,
-            idx: conn_idx,
             group_id: 0,
             feature_topomap_enabled: server.feature_topomap_enabled,
             // Default to AP, it's safe, as later-on it will be changed
diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs
index b5eac46..6fbc27f 100644
--- a/src/cgw_connection_server.rs
+++ b/src/cgw_connection_server.rs
@@ -1876,8 +1876,7 @@ impl CGWConnectionServer {
         self: Arc<Self>,
         socket: TcpStream,
         tls_acceptor: tokio_rustls::TlsAcceptor,
-        addr: SocketAddr,
-        conn_idx: i64,
+        addr: SocketAddr
     ) {
         // Only ACK connection. We will either drop it or accept it once processor starts
         // (we'll handle it via "mailbox" notify handle in process_internal_mbox)
@@ -1900,7 +1899,7 @@
         };
 
         let allow_mismatch = server_clone.allow_mismatch;
-        let conn_processor = CGWConnectionProcessor::new(server_clone, conn_idx, addr);
+        let conn_processor = CGWConnectionProcessor::new(server_clone, addr);
         if let Err(e) = conn_processor
             .start(tls_stream, client_cn, allow_mismatch)
             .await
diff --git a/src/main.rs b/src/main.rs
index 3c6c0f3..933ebf8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -293,7 +293,7 @@ async fn server_loop(app_core: Arc<AppCore>) -> Result<()> {
 
         app_core_clone.conn_ack_runtime_handle.spawn(async move {
             cgw_server_clone
-                .ack_connection(socket, tls_acceptor_clone, remote_addr, conn_idx)
+                .ack_connection(socket, tls_acceptor_clone, remote_addr)
                 .await;
         });
 
From da93161530b252b63c9f9e9b1523e2cc5ff2f46b Mon Sep 17 00:00:00 2001
From: Sviatoslav Boichuk
Date: Tue, 10 Dec 2024 13:15:44 +0200
Subject: [PATCH 4/6] Send infra enqueue response to NB if failed to parse msg

---
 src/cgw_connection_server.rs | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs
index 6fbc27f..9442ecc 100644
--- a/src/cgw_connection_server.rs
+++ b/src/cgw_connection_server.rs
@@ -783,6 +783,19 @@ impl CGWConnectionServer {
                 Some(val) => val,
                 None => {
                     warn!("Failed to parse recv msg with key {key}, discarded!");
+
+                    if let Ok(resp) = cgw_construct_infra_enqueue_response(
+                        self.local_cgw_id,
+                        Uuid::default(),
+                        false,
+                        Some(format!("Failed to parse NB API message with key {key}")),
+                        local_shard_partition_key.clone(),
+                    ) {
+                        self.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
+                    } else {
+                        error!("Failed to construct device_enqueue message!");
+                    }
+
                     continue;
                 }
             };
@@ -1529,6 +1542,18 @@
                     }
                 }
             } else {
+                if let Ok(resp) = cgw_construct_infra_enqueue_response(
+                    self.local_cgw_id,
+                    Uuid::default(),
+                    false,
+                    Some(format!("Failed to parse NB API message with key {key}")),
+                    local_shard_partition_key.clone(),
+                ) {
+                    self.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
+                } else {
+                    error!("Failed to construct device_enqueue message!");
+                }
+
                 error!("Failed to parse msg from NBAPI (malformed?)!");
                 continue;
             }
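
For context, the change above is observable from the NB side over Kafka. A minimal sketch of how a consumer could recognize the new parse-failure reply: the 'type', 'success', and nil-UUID values come from the tests added in patch 5 below; treating 'uuid' as a string field in the serialized reply is an assumption.

    import json

    def is_parse_failure_reply(raw: bytes) -> bool:
        # CGW now answers an unparsable NB API message with an enqueue
        # response carrying success=false and a default (nil) UUID.
        reply = json.loads(raw)
        return (reply.get('type') == 'infrastructure_group_infra_message_enqueue_response'
                and reply.get('success') is False
                and reply.get('uuid') == '00000000-0000-0000-0000-000000000000')
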
From 09e92207e98346adb26d5766ee1ae7a18658f7a4 Mon Sep 17 00:00:00 2001
From: Sviatoslav Boichuk
Date: Tue, 10 Dec 2024 14:00:29 +0200
Subject: [PATCH 5/6] Added malformed packets basic tests

---
 tests/test_cgw_malformed_packets.py | 200 ++++++++++++++++++
 .../malformed_message_template.json |  34 +++
 utils/kafka_producer/src/utils.py   |  68 ++++++
 3 files changed, 302 insertions(+)
 create mode 100644 tests/test_cgw_malformed_packets.py
 create mode 100644 utils/kafka_producer/kafka_data/malformed_message_template.json

diff --git a/tests/test_cgw_malformed_packets.py b/tests/test_cgw_malformed_packets.py
new file mode 100644
index 0000000..dec914f
--- /dev/null
+++ b/tests/test_cgw_malformed_packets.py
@@ -0,0 +1,200 @@
+import pytest
+import uuid
+
+from kafka_producer.src.utils import MalformedMessage
+
+class TestCgwMalformedPackets:
+    @pytest.mark.usefixtures("test_context",
+                             "cgw_probe",
+                             "kafka_probe",
+                             "redis_probe",
+                             "psql_probe")
+    def test_malformed_infra_group_add(self, test_context):
+        assert test_context.kafka_producer.is_connected(),\
+            f'Kafka producer is not connected to Kafka'
+
+        assert test_context.kafka_consumer.is_connected(),\
+            f'Kafka consumer is not connected to Kafka'
+
+        uuid_val = uuid.uuid4()
+        expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
+        group_id = 100
+
+        message = MalformedMessage()
+
+        # Attempt to create a single group using a malformed message
+        test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
+            message.group_create("cgw_default_group_name", uuid_val.int), bytes(str(group_id), encoding="utf-8"))
+        ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
+        if not ret_msg:
+            print('Failed to receive create group result, was expecting ' + str(expected_uuid.int) + ' uuid reply')
+            raise Exception('Failed to receive create group result when expected')
+
+        assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
+
+        if ret_msg.value['success'] is True:
+            raise Exception('Infra group create succeeded while it was expected to fail! Malformed packet was sent!')
+
+
+    @pytest.mark.usefixtures("test_context",
+                             "cgw_probe",
+                             "kafka_probe",
+                             "redis_probe",
+                             "psql_probe")
+    def test_malformed_infra_group_add_to_shard(self, test_context):
+        assert test_context.kafka_producer.is_connected(),\
+            f'Kafka producer is not connected to Kafka'
+
+        assert test_context.kafka_consumer.is_connected(),\
+            f'Kafka consumer is not connected to Kafka'
+
+        expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
+        uuid_val = uuid.uuid4()
+        group_id = 100
+
+        message = MalformedMessage()
+        default_shard_id = test_context.default_shard_id()
+
+        # Attempt to create a single group on a specific shard using a malformed message
+        test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
+            message.group_create_to_shard(default_shard_id, "cgw_default_group_name", uuid_val.int), bytes(str(group_id), encoding="utf-8"))
+        ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
+        if not ret_msg:
+            print('Failed to receive create group result, was expecting ' + str(expected_uuid.int) + ' uuid reply')
+            raise Exception('Failed to receive create group result when expected')
+
+        assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
+
+        if ret_msg.value['success'] is True:
+            raise Exception('Infra group create succeeded while it was expected to fail! Malformed packet was sent!')
+
+
+    @pytest.mark.usefixtures("test_context",
+                             "cgw_probe",
+                             "kafka_probe",
+                             "redis_probe",
+                             "psql_probe")
+    def test_malformed_infra_group_del(self, test_context):
+        assert test_context.kafka_producer.is_connected(),\
+            f'Kafka producer is not connected to Kafka'
+
+        assert test_context.kafka_consumer.is_connected(),\
+            f'Kafka consumer is not connected to Kafka'
+
+        expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
+        uuid_val = uuid.uuid4()
+        group_id = 100
+
+        message = MalformedMessage()
+
+        # Attempt to delete a single group using a malformed message
+        test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
+            message.group_delete(uuid_val.int), bytes(str(group_id), encoding="utf-8"))
+        ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
+        if not ret_msg:
+            print('Failed to receive delete group result, was expecting ' + str(expected_uuid.int) + ' uuid reply')
+            raise Exception('Failed to receive delete group result when expected')
+
+        assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
+
+        if ret_msg.value['success'] is True:
+            raise Exception('Infra group delete succeeded while it was expected to fail! Malformed packet was sent!')
+
+
+    @pytest.mark.usefixtures("test_context",
+                             "cgw_probe",
+                             "kafka_probe",
+                             "redis_probe",
+                             "psql_probe")
+    def test_malformed_infras_add(self, test_context):
+        assert test_context.kafka_producer.is_connected(),\
+            f'Kafka producer is not connected to Kafka'
+
+        assert test_context.kafka_consumer.is_connected(),\
+            f'Kafka consumer is not connected to Kafka'
+
+        expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
+        uuid_val = uuid.uuid4()
+        group_id = 100
+
+        message = MalformedMessage()
+        infra_mac = "11-22-33-44-55-66"
+
+        # Attempt to add an infra to a group using a malformed message
+        test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
+            message.add_dev_to_group([infra_mac], uuid_val.int), bytes(str(group_id), encoding="utf-8"))
+        ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
+        if not ret_msg:
+            print('Failed to receive infras add result, was expecting ' + str(expected_uuid.int) + ' uuid reply')
+            raise Exception('Failed to receive infras add result when expected')
+
+        assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
+
+        if ret_msg.value['success'] is True:
+            raise Exception('Infras add succeeded while it was expected to fail! Malformed packet was sent!')
+
+
+    @pytest.mark.usefixtures("test_context",
+                             "cgw_probe",
+                             "kafka_probe",
+                             "redis_probe",
+                             "psql_probe")
+    def test_malformed_infras_del(self, test_context):
+        assert test_context.kafka_producer.is_connected(),\
+            f'Kafka producer is not connected to Kafka'
+
+        assert test_context.kafka_consumer.is_connected(),\
+            f'Kafka consumer is not connected to Kafka'
+
+        expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
+        uuid_val = uuid.uuid4()
+        group_id = 100
+
+        message = MalformedMessage()
+        infra_mac = "11-22-33-44-55-66"
+
+        # Attempt to remove an infra from a group using a malformed message
+        test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
+            message.remove_dev_from_group([infra_mac], uuid_val.int), bytes(str(group_id), encoding="utf-8"))
+        ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
+        if not ret_msg:
+            print('Failed to receive infras del result, was expecting ' + str(expected_uuid.int) + ' uuid reply')
+            raise Exception('Failed to receive infras del result when expected')
+
+        assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
+
+        if ret_msg.value['success'] is True:
+            raise Exception('Infras del succeeded while it was expected to fail! Malformed packet was sent!')
+
+
+    @pytest.mark.usefixtures("test_context",
+                             "cgw_probe",
+                             "kafka_probe",
+                             "redis_probe",
+                             "psql_probe")
+    def test_malformed_infra_msg(self, test_context):
+        assert test_context.kafka_producer.is_connected(),\
+            f'Kafka producer is not connected to Kafka'
+
+        assert test_context.kafka_consumer.is_connected(),\
+            f'Kafka consumer is not connected to Kafka'
+
+        expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
+        uuid_val = uuid.uuid4()
+        group_id = 100
+
+        message = MalformedMessage()
+        infra_mac = "11-22-33-44-55-66"
+
+        # Attempt to enqueue a single infra message using a malformed message
+        test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
+            message.to_device(infra_mac, {}, uuid_val.int), bytes(str(group_id), encoding="utf-8"))
+        ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
+        if not ret_msg:
+            print('Failed to receive infra message result, was expecting ' + str(expected_uuid.int) + ' uuid reply')
+            raise Exception('Failed to receive infra message result when expected')
+
+        assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
+
+        if ret_msg.value['success'] is True:
+            raise Exception('Infra message enqueue succeeded while it was expected to fail! Malformed packet was sent!')
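
Each test above repeats the same send/receive/assert scaffolding. A hypothetical helper condensing that pattern — names mirror the fixtures used in the tests; this sketch is not part of the patch itself:

    import uuid

    NIL_UUID = uuid.UUID('00000000-0000-0000-0000-000000000000')

    def send_malformed_and_expect_failure(test_context, payload: bytes, group_id: int = 100):
        # Send the malformed payload keyed by group_id, then wait for the
        # enqueue response CGW emits under the default (nil) UUID.
        test_context.kafka_producer.conn.send(
            test_context.default_producer_topic(),
            payload,
            bytes(str(group_id), encoding="utf-8"))
        ret_msg = test_context.kafka_consumer.get_result_msg(NIL_UUID.int)
        if not ret_msg:
            raise Exception('Failed to receive result when expected')
        assert ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response'
        assert ret_msg.value['success'] is False, \
            'Request succeeded although a malformed packet was sent!'
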
diff --git a/utils/kafka_producer/kafka_data/malformed_message_template.json b/utils/kafka_producer/kafka_data/malformed_message_template.json
new file mode 100644
index 0000000..7d47b3a
--- /dev/null
+++ b/utils/kafka_producer/kafka_data/malformed_message_template.json
@@ -0,0 +1,34 @@
+{
+    "add_group": {
+        "type": "infrastructure_group_create",
+        "infra_name": "name",
+        "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
+    },
+    "add_group_to_shard": {
+        "type": "infrastructure_group_create_to_shard",
+        "infra_name": "name",
+        "infra_shard_id": 0,
+        "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
+    },
+    "del_group": {
+        "type": "infrastructure_group_delete",
+        "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
+    },
+    "add_to_group": {
+        "type": "infrastructure_group_infras_add",
+        "infra_group_infras": [],
+        "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
+    },
+    "del_from_group": {
+        "type": "infrastructure_group_infras_del",
+        "infra_group_infras": [],
+        "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
+    },
+    "message_infra": {
+        "type": "infrastructure_group_infra_message_enqueue",
+        "mac": "mac",
+        "msg": {},
+        "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff",
+        "timeout": 60
+    }
+}
\ No newline at end of file
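
Note that every template above is syntactically valid JSON; the malformation is semantic — placeholder values and missing required fields that CGW's message parsing presumably rejects, not broken JSON syntax. A quick sanity-check sketch:

    import json

    with open("utils/kafka_producer/kafka_data/malformed_message_template.json") as f:
        templates = json.load(f)  # parses fine: the JSON itself is well-formed

    assert set(templates) == {"add_group", "add_group_to_shard", "del_group",
                              "add_to_group", "del_from_group", "message_infra"}
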
diff --git a/utils/kafka_producer/src/utils.py b/utils/kafka_producer/src/utils.py
index 5111a24..3f56aa2 100644
--- a/utils/kafka_producer/src/utils.py
+++ b/utils/kafka_producer/src/utils.py
@@ -215,6 +215,74 @@ class Message:
         return json.dumps(msg).encode('utf-8')
 
 
+class MalformedMessage:
+    TEMPLATE_FILE = "./kafka_data/malformed_message_template.json"
+    GROUP_ADD = "add_group"
+    GROUP_ADD_TO_SHARD = "add_group_to_shard"
+    GROUP_DEL = "del_group"
+    DEV_TO_GROUP = "add_to_group"
+    DEV_FROM_GROUP = "del_from_group"
+    TO_DEVICE = "message_infra"
+    GROUP_NAME = "infra_name"
+    SHARD_ID = "infra_shard_id"
+    DEV_LIST = "infra_group_infras"
+    MAC = "mac"
+    DATA = "msg"
+    MSG_UUID = "uuid"
+
+    def __init__(self) -> None:
+        with open(self.TEMPLATE_FILE) as f:
+            self.templates = json.loads(f.read())
+
+    @staticmethod
+    def parse_uuid(uuid_val = None) -> str:
+        if uuid_val is None:
+            return str(uuid.uuid1())
+
+        return str(uuid.UUID(int=uuid_val))
+
+    def group_create(self, name: str, uuid_val: int = None) -> bytes:
+        msg = copy.copy(self.templates[self.GROUP_ADD])
+        msg[self.GROUP_NAME] = name
+        msg[self.MSG_UUID] = self.parse_uuid(uuid_val)
+        return json.dumps(msg).encode('utf-8')
+
+    def group_create_to_shard(self, shard_id: int, name: str, uuid_val: int = None) -> bytes:
+        msg = copy.copy(self.templates[self.GROUP_ADD_TO_SHARD])
+        msg[self.SHARD_ID] = shard_id
+        msg[self.GROUP_NAME] = name
+        msg[self.MSG_UUID] = self.parse_uuid(uuid_val)
+        return json.dumps(msg).encode('utf-8')
+
+    def group_delete(self, uuid_val: int = None) -> bytes:
+        msg = copy.copy(self.templates[self.GROUP_DEL])
+        msg[self.MSG_UUID] = self.parse_uuid(uuid_val)
+        return json.dumps(msg).encode('utf-8')
+
+    def add_dev_to_group(self, mac_range: MacRange, uuid_val: int = None) -> bytes:
+        msg = copy.copy(self.templates[self.DEV_TO_GROUP])
+        msg[self.DEV_LIST] = list(mac_range)
+        msg[self.MSG_UUID] = self.parse_uuid(uuid_val)
+        return json.dumps(msg).encode('utf-8')
+
+    def remove_dev_from_group(self, mac_range: MacRange, uuid_val: int = None) -> bytes:
+        msg = copy.copy(self.templates[self.DEV_FROM_GROUP])
+        msg[self.DEV_LIST] = list(mac_range)
+        msg[self.MSG_UUID] = self.parse_uuid(uuid_val)
+        return json.dumps(msg).encode('utf-8')
+
+    def to_device(self, mac: str, data, uuid_val: int = None) -> bytes:
+        msg = copy.copy(self.templates[self.TO_DEVICE])
+        msg[self.MAC] = mac
+        if type(data) is dict:
+            msg[self.DATA] = data
+        else:
+            msg[self.DATA] = {"data": data}
+
+        msg[self.MSG_UUID] = self.parse_uuid(uuid_val)
+        return json.dumps(msg).encode('utf-8')
+
+
 @dataclass
 class Args:
     add_groups: List[Tuple[str, int, str]]
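
A minimal standalone usage sketch for MalformedMessage, assuming the kafka-python client and a local broker; the topic name and key below are illustrative, and the tests instead go through test_context.kafka_producer:

    from kafka import KafkaProducer  # assumption: kafka-python is available

    from kafka_producer.src.utils import MalformedMessage

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    message = MalformedMessage()

    # uuid_val omitted, so parse_uuid() falls back to a random uuid1()
    payload = message.group_create("cgw_default_group_name")
    producer.send("CnC", payload, key=b"100")  # topic name "CnC" is an assumption
    producer.flush()
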
From 840b9a970b0a4c3344538643a6c02147f215c07d Mon Sep 17 00:00:00 2001
From: Sviatoslav Boichuk
Date: Tue, 10 Dec 2024 15:40:27 +0200
Subject: [PATCH 6/6] Fixed tests print messages

---
 tests/conftest.py              |  4 ++--
 tests/test_cgw_infra_groups.py | 32 ++++++++++++++++----------------
 tests/test_cgw_infras.py       | 28 ++++++++++++++--------------
 3 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index f4eca5b..707c5ba 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -216,10 +216,10 @@ def kafka_default_infra_group(test_context):
 @pytest.fixture(scope='function')
 def kafka_default_infra(test_context):
     assert test_context.kafka_producer.is_connected(),\
-        f'Cannot create default group: kafka producer is not connected to Kafka'
+        f'Cannot create default infra: kafka producer is not connected to Kafka'
 
     assert test_context.kafka_consumer.is_connected(),\
-        f'Cannot create default group: kafka consumer is not connected to Kafka'
+        f'Cannot create default infra: kafka consumer is not connected to Kafka'
 
     uuid_val = random.randint(1, 100)
     default_group = test_context.default_kafka_group()
diff --git a/tests/test_cgw_infra_groups.py b/tests/test_cgw_infra_groups.py
index b1dd278..78e1293 100644
--- a/tests/test_cgw_infra_groups.py
+++ b/tests/test_cgw_infra_groups.py
@@ -14,10 +14,10 @@ class TestCgwInfraGroup:
                            "psql_probe")
     def test_single_infra_group_add_del(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -113,10 +113,10 @@
                            "psql_probe")
     def test_multiple_infra_group_add_del(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -223,10 +223,10 @@
                            "psql_probe")
     def test_create_existing_infra_group(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -345,10 +345,10 @@
                            "psql_probe")
     def test_remove_not_existing_infra_group(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -394,10 +394,10 @@
                            "psql_probe")
     def test_single_infra_group_add_del_to_shard(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -493,10 +493,10 @@
                            "psql_probe")
     def test_multiple_infra_group_add_del_to_shard(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -603,10 +603,10 @@
                            "psql_probe")
     def test_single_infra_group_add_to_not_existing_shard(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -663,10 +663,10 @@
                            "psql_probe")
     def test_infra_group_capacity_overflow(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
diff --git a/tests/test_cgw_infras.py b/tests/test_cgw_infras.py
index b4ec1aa..9443ff0 100644
--- a/tests/test_cgw_infras.py
+++ b/tests/test_cgw_infras.py
@@ -16,10 +16,10 @@ class TestCgwInfra:
                            "psql_probe")
     def test_single_infra_add_del(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -201,10 +201,10 @@
                            "redis_probe")
     def test_single_infra_add_not_existing_group(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -253,10 +253,10 @@
                            "redis_probe")
     def test_single_infra_del_not_existing_group(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -306,10 +306,10 @@
                            "psql_probe")
     def test_single_infra_del_existing_group_not_existing_infra(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -437,10 +437,10 @@
                            "psql_probe")
     def test_multiple_infras_add_del(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -635,10 +635,10 @@
                            "psql_probe")
     def test_infras_capacity_overflow(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
@@ -887,10 +887,10 @@
                            "psql_probe")
     def test_partial_infras_add(self, test_context):
         assert test_context.kafka_producer.is_connected(),\
-            f'Cannot create default group: kafka producer is not connected to Kafka'
+            f'Kafka producer is not connected to Kafka'
 
         assert test_context.kafka_consumer.is_connected(),\
-            f'Cannot create default group: kafka consumer is not connected to Kafka'
+            f'Kafka consumer is not connected to Kafka'
 
         default_shard_id = test_context.default_shard_id()
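
With the message fixes in place, the new malformed-packet cases can be exercised on their own; a small driver sketch, assuming pytest is installed and the suite is launched from the repository's tests directory:

    import sys

    import pytest

    # Run only the malformed-packet tests added in patch 5.
    sys.exit(pytest.main(["-v", "test_cgw_malformed_packets.py"]))
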