From e9d37d1b8c8bba9ffba9a19bd52bd838ad5bff5d Mon Sep 17 00:00:00 2001 From: Oleksandr Mazur Date: Thu, 12 Dec 2024 16:52:01 +0200 Subject: [PATCH] Initial formalization of API in form of YAML files Add initial list of YAML files that formalize Kafka API: - requests list that CGW can handle - responses that CGW will generate - unsolicited events that CGW might generate Also a small cleanup of requests and responses was made, to align it with a common format (renamed some of the fields, added missing etc). Tests are tweaked to accomodate for changed field names. Signed-off-by: Oleksandr Mazur --- api/cnc_api.yaml | 204 ++++++++++++++ api/cnc_res_api.yaml | 253 ++++++++++++++++++ api/events.yaml | 194 ++++++++++++++ src/cgw_connection_server.rs | 17 +- src/cgw_nb_api_listener.rs | 12 +- tests/conftest.py | 2 +- tests/test_cgw_infra_groups.py | 18 +- tests/test_cgw_infras.py | 10 +- tests/test_cgw_multi_instances.py | 2 +- .../malformed_message_template.json | 4 +- .../kafka_data/message_template.json | 4 +- utils/kafka_producer/src/producer.py | 18 +- utils/kafka_producer/src/utils.py | 8 +- 13 files changed, 700 insertions(+), 46 deletions(-) create mode 100644 api/cnc_api.yaml create mode 100644 api/cnc_res_api.yaml create mode 100644 api/events.yaml diff --git a/api/cnc_api.yaml b/api/cnc_api.yaml new file mode 100644 index 0000000..9d6a1cd --- /dev/null +++ b/api/cnc_api.yaml @@ -0,0 +1,204 @@ +--- +# Kafka 'CnC' (default) topic API list to interact with CGW infrastructure +# The following objects define the layout of messages one can push into +# 'CnC' topic. +# The result messages for each of the request can be found in the +# 'cnc_res_api.yaml' file. +infrastructure_group_create: + description: + Create a single infrastracture group and assign it to any available shard. + The decision of assignment will be made by whatever shard that is processing request. + Request generates corresponding 'infrastructure_group_create_response' response. + type: + type: string + enum: + - infrastructure_group_create + infra_group_id: + description: + The 'infra_group_id' while is represented as string, is actually a stringified digit. + type: string + uuid: + description: + The underlying unique identifier of the request. + The caller can expect a response with the same UUID value, + effectively matching a request with a response, due to + async nature of the Kafka bus. + type: string + format: uuid + +infrastructure_group_create_to_shard: + description: + Create a single infrastracture group and assign it to specific shard. + While group's being assigned to specific shard, the handling of this request + will be made by whatever shard that's received the request. + Request generates corresponding 'infrastructure_group_create_response' response. + (same as for 'infrastructure_group_create'). + type: + type: string + enum: + - infrastructure_group_create_to_shard + infra_group_id: + description: + The 'infra_group_id' while is represented as string, is actually a stringified digit. + type: string + shard_id: + description: + Specific shard (CGW) id that this group should be assigned to. + If shard does not exist, request will fail. + type: integer + uuid: + description: + The underlying unique identifier of the request. + The caller can expect a response with the same UUID value, + effectively matching a request with a response, due to + async nature of the Kafka bus. + type: string + format: uuid + +infrastructure_group_delete: + description: + Destroy previously created infrastructure group. + NOTE - also deassigns (if any) underlying assigned infras. + Also works if any assigned infra is already connected to CGW. + Request generates corresponding 'infrastructure_group_delete_response' response. + type: + type: string + enum: + - infrastructure_group_delete + infra_group_id: + description: + The 'infra_group_id' while is represented as string, is actually a stringified digit. + type: string + uuid: + description: + The underlying unique identifier of the request. + The caller can expect a response with the same UUID value, + effectively matching a request with a response, due to + async nature of the Kafka bus. + type: string + format: uuid + +infrastructure_group_infras_add: + description: + Assign list of infras to specified group. + Request generates corresponding 'infrastructure_group_infras_add_response' response. + type: + type: string + enum: + - infrastructure_group_infras_add + infra_group_id: + description: + The 'infra_group_id' while is represented as string, is actually a stringified digit. + type: string + infra_group_infras: + description: + Array of infras (MAC address / mac serial, any form is accepted) that + should get assigned to specified infra group. + type: array + items: + type: string + uuid: + description: + The underlying unique identifier of the request. + The caller can expect a response with the same UUID value, + effectively matching a request with a response, due to + async nature of the Kafka bus. + type: string + format: uuid + +infrastructure_group_infras_del: + description: + De-assign list of infras from specified group. + Any connected infras will become un-assigned and thus - unaddressable. + It's up to the caller to make sure to reassign them (if needed). + Request generates corresponding 'infrastructure_group_infras_del_response' response. + type: + type: string + enum: + - infrastructure_group_infras_del + infra_group_id: + description: + The 'infra_group_id' while is represented as string, is actually a stringified digit. + type: string + infra_group_infras: + description: + Array of infras (MAC address / mac serial, any form is accepted) that + should get deassigned from specified infra group. + type: array + items: + type: string + uuid: + description: + The underlying unique identifier of the request. + The caller can expect a response with the same UUID value, + effectively matching a request with a response, due to + async nature of the Kafka bus. + type: string + format: uuid + +infrastructure_group_infra_message_enqueue: + description: + Enqueue a uCentral request for the specified infra device. + This does not result immediate execution of the underlying request, + but rather caches the request in internal message queue, and the request + will get executed whenever device is ready to receive one. + Request generates corresponding 'infrastructure_group_infra_message_enqueue_response' response. + Whenever request get's completed (executed by the device), + the corresponding 'infra_request_result' is also generated. + type: + type: string + enum: + - infrastructure_group_infra_message_enqueue + infra_group_id: + description: + The 'infra_group_id' while is represented as string, is actually a stringified digit. + type: string + infra_group_infra: + description: + MAC (serial) of the infra to sink message down to. + Must be a part of infra_group, either way request will fail + with corresponding fail message. + type: string + msg: + description: + Complete uCentral-formatted JSON document request to the uCentral device. + Shuld include method, ID. + type: object + timeout: + description: + Timeout value for how long the execution should take. + Whenever elapses, msg queue get's completely flushed + and the timeout messages - 'infra_request_result' with status 'failed' are generated. + type: string + uuid: + description: + The underlying unique identifier of the request. + The caller can expect a response with the same UUID value, + effectively matching a request with a response, due to + async nature of the Kafka bus. + type: string + format: uuid + +rebalance_groups: + description: + Rebalance all infrastructure groups among all currently running/available + CGW shards. + Request generates corresponding 'rebalance_groups_response' response. + type: + type: string + enum: + - rebalance_groups + infra_group_id: + description: + The 'infra_group_id' while is represented as string, is actually a stringified digit. + The 'infra_group_id' while is required, can be set to any value, as it's currently omitted + and ignored. + type: string + uuid: + description: + The underlying unique identifier of the request. + The caller can expect a response with the same UUID value, + effectively matching a request with a response, due to + async nature of the Kafka bus. + type: string + format: uuid diff --git a/api/cnc_res_api.yaml b/api/cnc_res_api.yaml new file mode 100644 index 0000000..7ce228a --- /dev/null +++ b/api/cnc_res_api.yaml @@ -0,0 +1,253 @@ +--- +# Kafka 'CnC_Res' (default) topic API list that defines response messages format +# for each of the CnC request. +# The request messages for which these results are generated can be found in the +# 'cnc_api.yaml' file. +infrastructure_group_create_response: + description: + Response to corresponding 'infrastructure_group_create' + or 'infrastructure_group_create_to_shard' request(s). + type: + type: string + enum: + - infrastructure_group_create_response + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + uuid: + description: + The underlying unique identifier of the request, to which + this response is being generated. + type: string + format: uuid + success: + type: boolean + error_message: + description: + Error message reporting why the request failed. + Non-empty only if 'success' is false (e.g. request failed). + type: string + +infrastructure_group_delete_response: + description: + Response to corresponding 'infrastructure_group_delete' request. + type: + type: string + enum: + - infrastructure_group_delete_response + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + uuid: + description: + The underlying unique identifier of the request, to which + this response is being generated. + type: string + format: uuid + success: + type: boolean + error_message: + type: string + description: + Error message reporting why the request failed. + Non-empty only if 'success' is false (e.g. request failed). + +infrastructure_group_infras_add_response: + description: + Response to corresponding 'infrastructure_group_infras_add' request. + type: + type: string + enum: + - infrastructure_group_infras_add_response + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + uuid: + type: string + format: uuid + description: + The underlying unique identifier of the request, to which + this response is being generated. + success: + type: boolean + error_message: + type: string + description: + Error message reporting why the request failed. + Non-empty only if 'success' is false (e.g. request failed). + failed_infras: + description: + Array of infras (MAC address / mac serial, any form is accepted) that + should were not added (failed to) to specified infra group. + Potential cause - infra is a duplicate (already member of specified group), + or it is already a member of some other group and should be removed + from old group first. + type: array + items: + type: string + kafka_partition_key: + description: + CGW can return a special string value - kafka partition key, + that can be used by generating consecutive CnC request, + that will result in direct addressing of the shard that replied + to the original request. + It's an optimization technique to overcome the need of + using relaying mechanism all the time. + NOTE - this kafka key in replies _could_ be used by the callers, + but it's not required. It's optional. + Can be empty. + Can be present even if request failed. + type: string + +infrastructure_group_infras_del_response: + description: + Response to corresponding 'infrastructure_group_infras_del' request. + type: + type: string + enum: + - infrastructure_group_infras_del_response + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + uuid: + type: string + format: uuid + description: + The underlying unique identifier of the request, to which + this response is being generated. + success: + type: boolean + error_message: + type: string + description: + Error message reporting why the request failed. + Non-empty only if 'success' is false (e.g. request failed). + failed_infras: + description: + Array of infras (MAC address / mac serial, any form is accepted) that + should were not removed (failed to) from specified infra group. + Potential cause - infra is not member of specified group. + type: array + items: + type: string + kafka_partition_key: + description: + CGW can return a special string value - kafka partition key, + that can be used by generating consecutive CnC request, + that will result in direct addressing of the shard that replied + to the original request. + It's an optimization technique to overcome the need of + using relaying mechanism all the time. + NOTE - this kafka key in replies _could_ be used by the callers, + but it's not required. It's optional. + Can be empty. + Can be present even if request failed. + type: string + +infrastructure_group_infra_message_enqueue_response: + description: + Response to corresponding 'infrastructure_group_infra_message_enqueue' request. + type: + type: string + enum: + - infrastructure_group_infra_message_enqueue_response + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + uuid: + type: string + format: uuid + description: + The underlying unique identifier of the request, to which + this response is being generated. + success: + type: boolean + error_message: + type: string + description: + Error message reporting why the request failed. + Non-empty only if 'success' is false (e.g. request failed). + kafka_partition_key: + description: + CGW can return a special string value - kafka partition key, + that can be used by generating consecutive CnC request, + that will result in direct addressing of the shard that replied + to the original request. + It's an optimization technique to overcome the need of + using relaying mechanism all the time. + NOTE - this kafka key in replies _could_ be used by the callers, + but it's not required. It's optional. + Can be empty. + Can be present even if request failed. + type: string + +infra_request_result: + description: + Result of the underlying 'infrastructure_group_infra_message_enqueue' request execution. + This result is generated whenever underlying infra finishes and responds + to the request with status 'result' message that CGW handles internally. + type: + type: string + enum: + - infra_request_result + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + uuid: + type: string + format: uuid + description: + The underlying unique identifier of the request, to which + this response is being generated. + success: + type: boolean + error_message: + type: string + description: + Error message reporting why the request failed. + Non-empty only if 'success' is false (e.g. request failed). + +rebalance_groups_response: + description: + Response to corresponding 'rebalance_groups' request. + type: + type: string + enum: + - rebalance_groups_response + reporter_shard_id: + type: integer + description: + ID of the shard that handled request and generated this response. + infra_group_id: + type: integer + uuid: + type: string + format: uuid + description: + The underlying unique identifier of the request, to which + this response is being generated. + success: + type: boolean + error_message: + type: string + description: + Error message reporting why the request failed. + Non-empty only if 'success' is false (e.g. request failed). diff --git a/api/events.yaml b/api/events.yaml new file mode 100644 index 0000000..1462bfa --- /dev/null +++ b/api/events.yaml @@ -0,0 +1,194 @@ +--- +# List of events and messages that CGW can raise as a reaction to different +# events that happen within CGW (for example infra device connection). +infra_join: + description: + Event, that CGW generates whenever assigned infra successfully connects to CGW. + type: + type: string + enum: + - infra_join + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + infra_group_infra: + description: + MAC (serial) of the infra that successfully connected to CGW. + type: string + infra_public_ip: + description: + Peer address of the connected infra, as seen on the socket level + of the CGW. + type: string + +infra_leave: + description: + Event, that CGW generates whenever assigned infra diconnects from the CGW. + type: + type: string + enum: + - infra_leave + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + infra_group_infra: + description: + MAC (serial) of the infra that successfully connected to CGW. + type: string + +unassigned_infra_connection: + description: + Event, that CGW generates whenever un-assigned infra successfully connects to CGW. + type: + type: string + enum: + - unassigned_infra_connection + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + group_owner_shard_id: + description: + ID of the shard that is the actual owner of the infra group. + type: integer + infra_group_infra: + description: + MAC (serial) of the infra that successfully connected to CGW. + type: string + +foreign_infra_connection: + description: + Event, that CGW generates whenever foreign (assigned, but connected + to the wrong CGW instance) infra successfully connects to CGW. + type: + type: string + enum: + - foreign_infra_connection + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + infra_group_infra: + description: + MAC (serial) of the infra that successfully connected to CGW. + type: string + +infrastructure_group_infra_capabilities_changed: + description: + Event, that CGW generates whenever CGW detects capabilities change + of the connected assigned infra. + type: + type: string + enum: + - infrastructure_group_infra_capabilities_changed + reporter_shard_id: + description: + ID of the shard that handled request and generated this response. + type: integer + infra_group_id: + type: integer + infra_group_infra: + description: + MAC (serial) of the infra that successfully connected to CGW. + type: string + changes: + description: + List of detected delta- changes / diff in capabilities; + type: array + items: + type: object + properties: + changed: + description: + String-value representing value that changed + type: string + old: + type: string + new: + type: string + +ap_client_join: + description: + Event, that CGW generates whenever it detects topology + change - a new WiFi client connection. + type: + type: string + enum: + - ap_client_join + infra_group_id: + type: integer + infra_group_infra: + description: + MAC (serial) of the infra that connected the WiFi ssid of the infra. + type: string + client: + description: + MAC (serial) of the infra client that joined. + type: string + ssid: + description: + SSID that the underlying infra client joined. + type: string + band: + description: + Band on which the underlying infra client joined. + type: string + +ap_client_leave: + description: + Event, that CGW generates whenever it detects topology + change - a WiFi client disconnect. + type: + type: string + enum: + - ap_client_leave + infra_group_id: + type: integer + infra_group_infra: + description: + MAC (serial) of the infra that disconnected the WiFi ssid of the infra. + type: string + client: + description: + MAC (serial) of the infra client that disconnected. + type: string + band: + description: + Band on which the underlying infra client disconnected. + type: string + +ap_client_migrate: + description: + Event, that CGW generates whenever it detects topology + change - existing WiFi client migrating from one infra to another + (wifi client connects to AP_1 while was connected to AP_0). + type: + type: string + enum: + - ap_client_migrate + infra_group_id: + type: integer + to_infra_group_infra_device: + description: + MAC (serial) of the destination infra to which the WiFi client is migrating to. + type: string + client: + description: + MAC (serial) of the infra client that joined. + type: string + to_ssid: + description: + Destination SSID that the underlying infra client is migrating to. + type: string + to_band: + description: + Destination band on which the underlying infra is migrating on. + type: string diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index 05ae12a..47f3ada 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -485,7 +485,7 @@ impl CGWConnectionServer { struct InfraGroupCreateToShard { r#type: String, infra_group_id: String, - infra_shard_id: i32, + shard_id: i32, uuid: Uuid, } #[derive(Debug, Serialize, Deserialize)] @@ -515,12 +515,19 @@ impl CGWConnectionServer { struct InfraGroupMsgJSON { r#type: String, infra_group_id: String, - mac: MacAddress, + infra_group_infra: MacAddress, msg: Map, uuid: Uuid, timeout: Option, } + #[derive(Debug, Serialize, Deserialize)] + struct RebalanceGroups { + r#type: String, + infra_group_id: String, + uuid: Uuid, + } + let map: Map = serde_json::from_str(pload).ok()?; let rc = map.get(&String::from("type"))?; @@ -544,7 +551,7 @@ impl CGWConnectionServer { json_msg.uuid, group_id, CGWNBApiParsedMsgType::InfrastructureGroupCreateToShard( - json_msg.infra_shard_id, + json_msg.shard_id, ), )); } @@ -583,14 +590,14 @@ impl CGWConnectionServer { json_msg.uuid, group_id, CGWNBApiParsedMsgType::InfrastructureGroupInfraMsg( - json_msg.mac, + json_msg.infra_group_infra, serde_json::to_string(&json_msg.msg).ok()?, json_msg.timeout, ), )); } "rebalance_groups" => { - let json_msg: InfraGroupMsgJSON = serde_json::from_str(pload).ok()?; + let json_msg: RebalanceGroups = serde_json::from_str(pload).ok()?; return Some(CGWNBApiParsedMsg::new( json_msg.uuid, group_id, diff --git a/src/cgw_nb_api_listener.rs b/src/cgw_nb_api_listener.rs index f0cffc3..b1eb984 100644 --- a/src/cgw_nb_api_listener.rs +++ b/src/cgw_nb_api_listener.rs @@ -138,7 +138,7 @@ pub struct APClientJoinMessage { pub r#type: &'static str, pub infra_group_id: i32, pub client: MacAddress, - pub infra_group_infra_device: MacAddress, + pub infra_group_infra: MacAddress, pub ssid: String, pub band: String, } @@ -148,7 +148,7 @@ pub struct APClientLeaveMessage { pub r#type: &'static str, pub infra_group_id: i32, pub client: MacAddress, - pub infra_group_infra_device: MacAddress, + pub infra_group_infra: MacAddress, pub band: String, } @@ -361,7 +361,7 @@ pub fn cgw_construct_foreign_infra_connection_msg( pub fn cgw_construct_client_join_msg( infra_group_id: i32, client: MacAddress, - infra_group_infra_device: MacAddress, + infra_group_infra: MacAddress, ssid: String, band: String, ) -> Result { @@ -369,7 +369,7 @@ pub fn cgw_construct_client_join_msg( r#type: "ap_client_join", infra_group_id, client, - infra_group_infra_device, + infra_group_infra, ssid, band, }; @@ -380,14 +380,14 @@ pub fn cgw_construct_client_join_msg( pub fn cgw_construct_client_leave_msg( infra_group_id: i32, client: MacAddress, - infra_group_infra_device: MacAddress, + infra_group_infra: MacAddress, band: String, ) -> Result { let client_join_msg = APClientLeaveMessage { r#type: "ap_client_leave", infra_group_id, client, - infra_group_infra_device, + infra_group_infra, band, }; diff --git a/tests/conftest.py b/tests/conftest.py index 707c5ba..9454239 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -189,7 +189,7 @@ def kafka_default_infra_group(test_context): default_group = test_context.default_kafka_group() default_shard_id = test_context.default_shard_id() - test_context.kafka_producer.handle_single_group_create_to_shard(default_group, default_shard_id, uuid_val) + test_context.kafka_producer.handle_single_group_create(default_group, uuid_val, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val) + ' uuid reply') diff --git a/tests/test_cgw_infra_groups.py b/tests/test_cgw_infra_groups.py index 78e1293..6bcd28b 100644 --- a/tests/test_cgw_infra_groups.py +++ b/tests/test_cgw_infra_groups.py @@ -34,7 +34,7 @@ class TestCgwInfraGroup: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -136,7 +136,7 @@ class TestCgwInfraGroup: group_id = (100 + group) # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -243,7 +243,7 @@ class TestCgwInfraGroup: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -281,7 +281,7 @@ class TestCgwInfraGroup: assert int(shard_info.get('assigned_groups_num')) == cgw_metrics_get_groups_assigned_num() == 1 # Try to create the same group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -414,7 +414,7 @@ class TestCgwInfraGroup: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -516,7 +516,7 @@ class TestCgwInfraGroup: group_id = (100 + group) # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -624,7 +624,7 @@ class TestCgwInfraGroup: shard_id = 2 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -689,7 +689,7 @@ class TestCgwInfraGroup: group_id = (100 + group) # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -732,7 +732,7 @@ class TestCgwInfraGroup: # Try to create additional group to simulate group capacity overflow group_to_fail_id = 2024 uuid_val = uuid.uuid4() - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_to_fail_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_to_fail_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') diff --git a/tests/test_cgw_infras.py b/tests/test_cgw_infras.py index 4eb32e1..6f44905 100644 --- a/tests/test_cgw_infras.py +++ b/tests/test_cgw_infras.py @@ -36,7 +36,7 @@ class TestCgwInfra: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -335,7 +335,7 @@ class TestCgwInfra: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -468,7 +468,7 @@ class TestCgwInfra: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -670,7 +670,7 @@ class TestCgwInfra: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') @@ -928,7 +928,7 @@ class TestCgwInfra: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') diff --git a/tests/test_cgw_multi_instances.py b/tests/test_cgw_multi_instances.py index 1eafbdc..34bfb41 100644 --- a/tests/test_cgw_multi_instances.py +++ b/tests/test_cgw_multi_instances.py @@ -59,7 +59,7 @@ class TestCgwMultiInstances: group_id = 100 # Create single group - test_context.kafka_producer.handle_single_group_create_to_shard(str(group_id), default_shard_id, uuid_val.int) + test_context.kafka_producer.handle_single_group_create(str(group_id), uuid_val.int, default_shard_id) ret_msg = test_context.kafka_consumer.get_result_msg(uuid_val.int) if not ret_msg: print('Failed to receive create group result, was expecting ' + str(uuid_val.int) + ' uuid reply') diff --git a/utils/kafka_producer/kafka_data/malformed_message_template.json b/utils/kafka_producer/kafka_data/malformed_message_template.json index 705f93b..4dd94b5 100644 --- a/utils/kafka_producer/kafka_data/malformed_message_template.json +++ b/utils/kafka_producer/kafka_data/malformed_message_template.json @@ -5,7 +5,7 @@ }, "add_group_to_shard": { "type": "infrastructure_group_create_to_shard", - "infra_shard_id": 0, + "shard_id": 0, "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff" }, "del_group": { @@ -24,7 +24,7 @@ }, "message_infra": { "type": "infrastructure_group_infra_message_enqueue", - "mac": "mac", + "infra_group_infra": "mac", "msg": {}, "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff", "timeout": 60 diff --git a/utils/kafka_producer/kafka_data/message_template.json b/utils/kafka_producer/kafka_data/message_template.json index 66874d1..aaaa8a5 100644 --- a/utils/kafka_producer/kafka_data/message_template.json +++ b/utils/kafka_producer/kafka_data/message_template.json @@ -7,7 +7,7 @@ "add_group_to_shard": { "type": "infrastructure_group_create_to_shard", "infra_group_id": "key", - "infra_shard_id": 0, + "shard_id": 0, "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff" }, "del_group": { @@ -30,7 +30,7 @@ "message_infra": { "type": "infrastructure_group_infra_message_enqueue", "infra_group_id": "key", - "mac": "mac", + "infra_group_infra": "mac", "msg": {}, "uuid": "290d06b6-8eba-11ee-8005-aabbccddeeff", "timeout": 60 diff --git a/utils/kafka_producer/src/producer.py b/utils/kafka_producer/src/producer.py index 10de967..e1e1e66 100644 --- a/utils/kafka_producer/src/producer.py +++ b/utils/kafka_producer/src/producer.py @@ -161,20 +161,16 @@ class Producer: bytes(group, encoding="utf-8")) self.conn.flush() - def handle_single_group_create(self, group: str, uuid_val: int = None): + def handle_single_group_create(self, group: str, uuid_val: int = None, shard_id: int = None): if group is None: raise Exception('producer: Cannot create new group without group id specified!') - self.conn.send(self.topic, self.message.group_create(group, uuid_val), - bytes(group, encoding="utf-8")) - self.conn.flush() - - def handle_single_group_create_to_shard(self, group: str, shard_id: int, uuid_val: int = None): - if group is None: - raise Exception('producer: Cannot create new group without group id specified!') - - self.conn.send(self.topic, self.message.group_create_to_shard(group, shard_id, uuid_val), - bytes(group, encoding="utf-8")) + if shard_id is None: + self.conn.send(self.topic, self.message.group_create(group, uuid_val), + bytes(group, encoding="utf-8")) + else: + self.conn.send(self.topic, self.message.group_create_to_shard(group, shard_id, uuid_val), + bytes(group, encoding="utf-8")) self.conn.flush() def handle_group_creation(self, create: List[str], delete: List[str]) -> None: diff --git a/utils/kafka_producer/src/utils.py b/utils/kafka_producer/src/utils.py index 37d4288..fd2f700 100644 --- a/utils/kafka_producer/src/utils.py +++ b/utils/kafka_producer/src/utils.py @@ -135,9 +135,9 @@ class Message: DEV_FROM_GROUP = "del_from_group" TO_DEVICE = "message_infra" GROUP_ID = "infra_group_id" - SHARD_ID = "infra_shard_id" + SHARD_ID = "shard_id" DEV_LIST = "infra_group_infras" - MAC = "mac" + MAC = "infra_group_infra" DATA = "msg" MSG_UUID = "uuid" @@ -220,9 +220,9 @@ class MalformedMessage: DEV_TO_GROUP = "add_to_group" DEV_FROM_GROUP = "del_from_group" TO_DEVICE = "message_infra" - SHARD_ID = "infra_shard_id" + SHARD_ID = "shard_id" DEV_LIST = "infra_group_infras" - MAC = "mac" + MAC = "infra_group_infra" DATA = "msg" MSG_UUID = "uuid"