mirror of
https://github.com/outbackdingo/openlan-cgw.git
synced 2026-01-27 10:19:56 +00:00
Merge pull request #107 from Telecominfraproject/dev-enhance-nb-replys
Dev enhance nb replys
This commit is contained in:
@@ -526,21 +526,25 @@ pub struct AppArgs {
|
||||
/// Topomap featue status (enabled/disabled)
|
||||
pub feature_topomap_enabled: bool,
|
||||
|
||||
/// Utilize TLS connection with NB infrastructure (Redis, PostgreSQL)
|
||||
pub nb_infra_tls: bool,
|
||||
|
||||
/// CGW Websocket args
|
||||
pub wss_args: CGWWSSArgs,
|
||||
|
||||
/// CGW GRPC args
|
||||
pub grpc_args: CGWGRPCArgs,
|
||||
|
||||
/// CGW Kafka args
|
||||
pub kafka_args: CGWKafkaArgs,
|
||||
|
||||
/// CGW DB args
|
||||
pub db_args: CGWDBArgs,
|
||||
|
||||
/// CGW Redis args
|
||||
pub redis_args: CGWRedisArgs,
|
||||
|
||||
/// CGW Metrics args
|
||||
pub metrics_args: CGWMetricsArgs,
|
||||
|
||||
/// CGW Validation schema URI args
|
||||
pub validation_schema: CGWValidationSchemaArgs,
|
||||
}
|
||||
|
||||
@@ -632,7 +636,6 @@ impl AppArgs {
|
||||
log_level,
|
||||
cgw_id,
|
||||
feature_topomap_enabled,
|
||||
nb_infra_tls,
|
||||
wss_args,
|
||||
grpc_args,
|
||||
kafka_args,
|
||||
|
||||
@@ -79,19 +79,17 @@ pub struct CGWConnectionProcessor {
|
||||
cgw_server: Arc<CGWConnectionServer>,
|
||||
pub serial: MacAddress,
|
||||
pub addr: SocketAddr,
|
||||
pub idx: i64,
|
||||
pub group_id: i32,
|
||||
pub feature_topomap_enabled: bool,
|
||||
pub device_type: CGWDeviceType,
|
||||
}
|
||||
|
||||
impl CGWConnectionProcessor {
|
||||
pub fn new(server: Arc<CGWConnectionServer>, conn_idx: i64, addr: SocketAddr) -> Self {
|
||||
pub fn new(server: Arc<CGWConnectionServer>, addr: SocketAddr) -> Self {
|
||||
let conn_processor: CGWConnectionProcessor = CGWConnectionProcessor {
|
||||
cgw_server: server.clone(),
|
||||
serial: MacAddress::default(),
|
||||
addr,
|
||||
idx: conn_idx,
|
||||
group_id: 0,
|
||||
feature_topomap_enabled: server.feature_topomap_enabled,
|
||||
// Default to AP, it's safe, as later-on it will be changed
|
||||
|
||||
@@ -783,6 +783,19 @@ impl CGWConnectionServer {
|
||||
Some(val) => val,
|
||||
None => {
|
||||
warn!("Failed to parse recv msg with key {key}, discarded!");
|
||||
|
||||
if let Ok(resp) = cgw_construct_infra_enqueue_response(
|
||||
self.local_cgw_id,
|
||||
Uuid::default(),
|
||||
false,
|
||||
Some(format!("Failed to parse NB API message with key {key}")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
|
||||
} else {
|
||||
error!("Failed to construct device_enqueue message!");
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -1529,6 +1542,18 @@ impl CGWConnectionServer {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if let Ok(resp) = cgw_construct_infra_enqueue_response(
|
||||
self.local_cgw_id,
|
||||
Uuid::default(),
|
||||
false,
|
||||
Some(format!("Failed to parse NB API message with key {key}")),
|
||||
local_shard_partition_key.clone(),
|
||||
) {
|
||||
self.enqueue_mbox_message_from_cgw_to_nb_api(-1, resp);
|
||||
} else {
|
||||
error!("Failed to construct device_enqueue message!");
|
||||
}
|
||||
|
||||
error!("Failed to parse msg from NBAPI (malformed?)!");
|
||||
continue;
|
||||
}
|
||||
@@ -1876,8 +1901,7 @@ impl CGWConnectionServer {
|
||||
self: Arc<Self>,
|
||||
socket: TcpStream,
|
||||
tls_acceptor: tokio_rustls::TlsAcceptor,
|
||||
addr: SocketAddr,
|
||||
conn_idx: i64,
|
||||
addr: SocketAddr
|
||||
) {
|
||||
// Only ACK connection. We will either drop it or accept it once processor starts
|
||||
// (we'll handle it via "mailbox" notify handle in process_internal_mbox)
|
||||
@@ -1900,7 +1924,7 @@ impl CGWConnectionServer {
|
||||
};
|
||||
|
||||
let allow_mismatch = server_clone.allow_mismatch;
|
||||
let conn_processor = CGWConnectionProcessor::new(server_clone, conn_idx, addr);
|
||||
let conn_processor = CGWConnectionProcessor::new(server_clone, addr);
|
||||
if let Err(e) = conn_processor
|
||||
.start(tls_stream, client_cn, allow_mismatch)
|
||||
.await
|
||||
|
||||
@@ -1115,6 +1115,8 @@ impl CGWRemoteDiscovery {
|
||||
"Failed to relay message! CGW{} seems to be unreachable at [{}:{}]! Error: {}",
|
||||
shard_id, cl.shard.server_host, cl.shard.server_port, e
|
||||
);
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
|
||||
@@ -293,7 +293,7 @@ async fn server_loop(app_core: Arc<AppCore>) -> Result<()> {
|
||||
|
||||
app_core_clone.conn_ack_runtime_handle.spawn(async move {
|
||||
cgw_server_clone
|
||||
.ack_connection(socket, tls_acceptor_clone, remote_addr, conn_idx)
|
||||
.ack_connection(socket, tls_acceptor_clone, remote_addr)
|
||||
.await;
|
||||
});
|
||||
|
||||
|
||||
@@ -216,10 +216,10 @@ def kafka_default_infra_group(test_context):
|
||||
@pytest.fixture(scope='function')
|
||||
def kafka_default_infra(test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Cannot create default infra: kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Cannot create default infra: kafka consumer is not connected to Kafka'
|
||||
|
||||
uuid_val = random.randint(1, 100)
|
||||
default_group = test_context.default_kafka_group()
|
||||
|
||||
@@ -14,10 +14,10 @@ class TestCgwInfraGroup:
|
||||
"psql_probe")
|
||||
def test_single_infra_group_add_del(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -113,10 +113,10 @@ class TestCgwInfraGroup:
|
||||
"psql_probe")
|
||||
def test_multiple_infra_group_add_del(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -223,10 +223,10 @@ class TestCgwInfraGroup:
|
||||
"psql_probe")
|
||||
def test_create_existing_infra_group(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -345,10 +345,10 @@ class TestCgwInfraGroup:
|
||||
"psql_probe")
|
||||
def test_remove_not_existing_infra_group(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -394,10 +394,10 @@ class TestCgwInfraGroup:
|
||||
"psql_probe")
|
||||
def test_single_infra_group_add_del_to_shard(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -493,10 +493,10 @@ class TestCgwInfraGroup:
|
||||
"psql_probe")
|
||||
def test_multiple_infra_group_add_del_to_shard(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -603,10 +603,10 @@ class TestCgwInfraGroup:
|
||||
"psql_probe")
|
||||
def test_single_infra_group_add_to_not_existing_shard(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -663,10 +663,10 @@ class TestCgwInfraGroup:
|
||||
"psql_probe")
|
||||
def test_infra_group_capacity_overflow(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
|
||||
@@ -16,10 +16,10 @@ class TestCgwInfra:
|
||||
"psql_probe")
|
||||
def test_single_infra_add_del(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -201,10 +201,10 @@ class TestCgwInfra:
|
||||
"redis_probe")
|
||||
def test_single_infra_add_not_existing_group(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -253,10 +253,10 @@ class TestCgwInfra:
|
||||
"redis_probe")
|
||||
def test_single_infra_del_not_existing_group(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -306,10 +306,10 @@ class TestCgwInfra:
|
||||
"psql_probe")
|
||||
def test_single_infra_del_existing_group_not_existing_infra(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -437,10 +437,10 @@ class TestCgwInfra:
|
||||
"psql_probe")
|
||||
def test_multiple_infras_add_del(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -635,10 +635,10 @@ class TestCgwInfra:
|
||||
"psql_probe")
|
||||
def test_infras_capacity_overflow(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
@@ -887,10 +887,10 @@ class TestCgwInfra:
|
||||
"psql_probe")
|
||||
def test_partial_infras_add(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'Cannot create default group: kafka producer is not connected to Kafka'
|
||||
f'Kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'Cannot create default group: kafka consumer is not connected to Kafka'
|
||||
f'Kafka consumer is not connected to Kafka'
|
||||
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
|
||||
200
tests/test_cgw_malformed_packets.py
Normal file
200
tests/test_cgw_malformed_packets.py
Normal file
@@ -0,0 +1,200 @@
|
||||
import pytest
|
||||
import uuid
|
||||
|
||||
from kafka_producer.src.utils import MalformedMessage
|
||||
|
||||
class TestCgwMalformedPackets:
|
||||
@pytest.mark.usefixtures("test_context",
|
||||
"cgw_probe",
|
||||
"kafka_probe",
|
||||
"redis_probe",
|
||||
"psql_probe")
|
||||
def test_malformed_infra_group_add(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'kafka consumer is not connected to Kafka'
|
||||
|
||||
uuid_val = uuid.uuid4()
|
||||
expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
|
||||
group_id = 100
|
||||
|
||||
message = MalformedMessage()
|
||||
|
||||
# Create single group
|
||||
test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
|
||||
message.group_create("cgw_default_group_name", uuid_val.int), bytes(str(group_id), encoding="utf-8"))
|
||||
ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
|
||||
if not ret_msg:
|
||||
print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply')
|
||||
raise Exception('Failed to receive create group result when expected')
|
||||
|
||||
assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
|
||||
|
||||
if ret_msg.value['success'] is True:
|
||||
raise Exception('Infra group create passed while expected to be failed! Malformed packet was sent!')
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("test_context",
|
||||
"cgw_probe",
|
||||
"kafka_probe",
|
||||
"redis_probe",
|
||||
"psql_probe")
|
||||
def test_malformed_infra_group_add_to_shard(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'kafka consumer is not connected to Kafka'
|
||||
|
||||
expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
|
||||
uuid_val = uuid.uuid4()
|
||||
group_id = 100
|
||||
|
||||
message = MalformedMessage()
|
||||
default_shard_id = test_context.default_shard_id()
|
||||
|
||||
# Create single group
|
||||
test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
|
||||
message.group_create_to_shard(default_shard_id, "cgw_default_group_name", uuid_val.int), bytes(str(group_id), encoding="utf-8"))
|
||||
ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
|
||||
if not ret_msg:
|
||||
print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply')
|
||||
raise Exception('Failed to receive create group result when expected')
|
||||
|
||||
assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
|
||||
|
||||
if ret_msg.value['success'] is True:
|
||||
raise Exception('Infra group create passed while expected to be failed! Malformed packet was sent!')
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("test_context",
|
||||
"cgw_probe",
|
||||
"kafka_probe",
|
||||
"redis_probe",
|
||||
"psql_probe")
|
||||
def test_malformed_infra_group_del(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'kafka consumer is not connected to Kafka'
|
||||
|
||||
expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
|
||||
uuid_val = uuid.uuid4()
|
||||
group_id = 100
|
||||
|
||||
message = MalformedMessage()
|
||||
|
||||
# Create single group
|
||||
test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
|
||||
message.group_delete(uuid_val.int), bytes(str(group_id), encoding="utf-8"))
|
||||
ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
|
||||
if not ret_msg:
|
||||
print('Failed to receive delete group result, was expecting ' + str(uuid_val.int) + ' uuid reply')
|
||||
raise Exception('Failed to receive delete group result when expected')
|
||||
|
||||
assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
|
||||
|
||||
if ret_msg.value['success'] is True:
|
||||
raise Exception('Infra group delete passed while expected to be failed! Malformed packet was sent!')
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("test_context",
|
||||
"cgw_probe",
|
||||
"kafka_probe",
|
||||
"redis_probe",
|
||||
"psql_probe")
|
||||
def test_malformed_infras_add(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'kafka consumer is not connected to Kafka'
|
||||
|
||||
expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
|
||||
uuid_val = uuid.uuid4()
|
||||
group_id = 100
|
||||
|
||||
message = MalformedMessage()
|
||||
infra_mac = "11-22-33-44-55-66"
|
||||
|
||||
# Create single group
|
||||
test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
|
||||
message.add_dev_to_group([infra_mac], uuid_val.int), bytes(str(group_id), encoding="utf-8"))
|
||||
ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
|
||||
if not ret_msg:
|
||||
print('Failed to receive infas add result, was expecting ' + str(uuid_val.int) + ' uuid reply')
|
||||
raise Exception('Failed to receive infas add result when expected')
|
||||
|
||||
assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
|
||||
|
||||
if ret_msg.value['success'] is True:
|
||||
raise Exception('Infras add passed while expected to be failed! Malformed packet was sent!')
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("test_context",
|
||||
"cgw_probe",
|
||||
"kafka_probe",
|
||||
"redis_probe",
|
||||
"psql_probe")
|
||||
def test_malformed_infras_del(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'kafka consumer is not connected to Kafka'
|
||||
|
||||
expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
|
||||
uuid_val = uuid.uuid4()
|
||||
group_id = 100
|
||||
|
||||
message = MalformedMessage()
|
||||
infra_mac = "11-22-33-44-55-66"
|
||||
|
||||
# Create single group
|
||||
test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
|
||||
message.remove_dev_from_group([infra_mac], uuid_val.int), bytes(str(group_id), encoding="utf-8"))
|
||||
ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
|
||||
if not ret_msg:
|
||||
print('Failed to receive infas del result, was expecting ' + str(uuid_val.int) + ' uuid reply')
|
||||
raise Exception('Failed to receive infas del result when expected')
|
||||
|
||||
assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
|
||||
|
||||
if ret_msg.value['success'] is True:
|
||||
raise Exception('Infras del passed while expected to be failed! Malformed packet was sent!')
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("test_context",
|
||||
"cgw_probe",
|
||||
"kafka_probe",
|
||||
"redis_probe",
|
||||
"psql_probe")
|
||||
def test_malformed_infra_msg(self, test_context):
|
||||
assert test_context.kafka_producer.is_connected(),\
|
||||
f'kafka producer is not connected to Kafka'
|
||||
|
||||
assert test_context.kafka_consumer.is_connected(),\
|
||||
f'kafka consumer is not connected to Kafka'
|
||||
|
||||
expected_uuid = uuid.UUID('00000000-0000-0000-0000-000000000000')
|
||||
uuid_val = uuid.uuid4()
|
||||
group_id = 100
|
||||
|
||||
message = MalformedMessage()
|
||||
infra_mac = "11-22-33-44-55-66"
|
||||
|
||||
# Create single group
|
||||
test_context.kafka_producer.conn.send(test_context.default_producer_topic(), \
|
||||
message.to_device(infra_mac, uuid_val.int), bytes(str(group_id), encoding="utf-8"))
|
||||
ret_msg = test_context.kafka_consumer.get_result_msg(expected_uuid.int)
|
||||
if not ret_msg:
|
||||
print('Failed to receive infa message result, was expecting ' + str(uuid_val.int) + ' uuid reply')
|
||||
raise Exception('Failed to receive infa message result when expected')
|
||||
|
||||
assert (ret_msg.value['type'] == 'infrastructure_group_infra_message_enqueue_response')
|
||||
|
||||
if ret_msg.value['success'] is True:
|
||||
raise Exception('Infra message passed while expected to be failed! Malformed packet was sent!')
|
||||
@@ -0,0 +1,34 @@
|
||||
{
|
||||
"add_group": {
|
||||
"type": "infrastructure_group_create",
|
||||
"infra_name": "name",
|
||||
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
|
||||
},
|
||||
"add_group_to_shard": {
|
||||
"type": "infrastructure_group_create_to_shard",
|
||||
"infra_name": "name",
|
||||
"infra_shard_id": 0,
|
||||
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
|
||||
},
|
||||
"del_group": {
|
||||
"type": "infrastructure_group_delete",
|
||||
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
|
||||
},
|
||||
"add_to_group": {
|
||||
"type": "infrastructure_group_infras_add",
|
||||
"infra_group_infras": [],
|
||||
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
|
||||
},
|
||||
"del_from_group": {
|
||||
"type": "infrastructure_group_infras_del",
|
||||
"infra_group_infras": [],
|
||||
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff"
|
||||
},
|
||||
"message_infra": {
|
||||
"type": "infrastructure_group_infra_message_enqueue",
|
||||
"mac": "mac",
|
||||
"msg": {},
|
||||
"uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff",
|
||||
"timeout": 60
|
||||
}
|
||||
}
|
||||
@@ -215,6 +215,74 @@ class Message:
|
||||
return json.dumps(msg).encode('utf-8')
|
||||
|
||||
|
||||
class MalformedMessage:
|
||||
TEMPLATE_FILE = "./kafka_data/malformed_message_template.json"
|
||||
GROUP_ADD = "add_group"
|
||||
GROUP_ADD_TO_SHARD = "add_group_to_shard"
|
||||
GROUP_DEL = "del_group"
|
||||
DEV_TO_GROUP = "add_to_group"
|
||||
DEV_FROM_GROUP = "del_from_group"
|
||||
TO_DEVICE = "message_infra"
|
||||
GROUP_NAME = "infra_name"
|
||||
SHARD_ID = "infra_shard_id"
|
||||
DEV_LIST = "infra_group_infras"
|
||||
MAC = "mac"
|
||||
DATA = "msg"
|
||||
MSG_UUID = "uuid"
|
||||
|
||||
def __init__(self) -> None:
|
||||
with open(self.TEMPLATE_FILE) as f:
|
||||
self.templates = json.loads(f.read())
|
||||
|
||||
@staticmethod
|
||||
def parse_uuid(uuid_val = None) -> str:
|
||||
if uuid_val is None:
|
||||
return str(uuid.uuid1())
|
||||
|
||||
return str(uuid.UUID(int=uuid_val))
|
||||
|
||||
def group_create(self, name: str, uuid_val: int = None) -> bytes:
|
||||
msg = copy.copy(self.templates[self.GROUP_ADD])
|
||||
msg[self.GROUP_NAME] = name
|
||||
msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
|
||||
return json.dumps(msg).encode('utf-8')
|
||||
|
||||
def group_create_to_shard(self, shard_id: int, name: str, uuid_val: int = None) -> bytes:
|
||||
msg = copy.copy(self.templates[self.GROUP_ADD_TO_SHARD])
|
||||
msg[self.SHARD_ID] = shard_id
|
||||
msg[self.GROUP_NAME] = name
|
||||
msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
|
||||
return json.dumps(msg).encode('utf-8')
|
||||
|
||||
def group_delete(self, uuid_val: int = None) -> bytes:
|
||||
msg = copy.copy(self.templates[self.GROUP_DEL])
|
||||
msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
|
||||
return json.dumps(msg).encode('utf-8')
|
||||
|
||||
def add_dev_to_group(self, mac_range: MacRange, uuid_val: int = None) -> bytes:
|
||||
msg = copy.copy(self.templates[self.DEV_TO_GROUP])
|
||||
msg[self.DEV_LIST] = list(mac_range)
|
||||
msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
|
||||
return json.dumps(msg).encode('utf-8')
|
||||
|
||||
def remove_dev_from_group(self, mac_range: MacRange, uuid_val: int = None) -> bytes:
|
||||
msg = copy.copy(self.templates[self.DEV_FROM_GROUP])
|
||||
msg[self.DEV_LIST] = list(mac_range)
|
||||
msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
|
||||
return json.dumps(msg).encode('utf-8')
|
||||
|
||||
def to_device(self, mac: str, data, uuid_val: int = None):
|
||||
msg = copy.copy(self.templates[self.TO_DEVICE])
|
||||
msg[self.MAC] = mac
|
||||
if type(data) is dict:
|
||||
msg[self.DATA] = data
|
||||
else:
|
||||
msg[self.DATA] = {"data": data}
|
||||
|
||||
msg[self.MSG_UUID] = Message.parse_uuid(uuid_val)
|
||||
return json.dumps(msg).encode('utf-8')
|
||||
|
||||
|
||||
@dataclass
|
||||
class Args:
|
||||
add_groups: List[Tuple[str, int, str]]
|
||||
|
||||
Reference in New Issue
Block a user