mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-01 19:37:54 +00:00
client_connectivity updates
Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
@@ -17,6 +17,7 @@ import swagger_client
|
||||
from swagger_client import FirmwareManagementApi
|
||||
from swagger_client import EquipmentGatewayApi
|
||||
from bs4 import BeautifulSoup
|
||||
import threading
|
||||
|
||||
|
||||
class ConfigureController:
|
||||
@@ -75,31 +76,34 @@ class Controller(ConfigureController):
|
||||
if customer_id is None:
|
||||
self.customer_id = 2
|
||||
print("Setting to default Customer ID 2")
|
||||
|
||||
#
|
||||
# Setting the Controller Client Configuration
|
||||
self.select_controller_data(controller_data=controller_data)
|
||||
self.set_credentials(controller_data=controller_data)
|
||||
# self.configuration.refresh_api_key_hook = self.get_bearer_token
|
||||
self.configuration.refresh_api_key_hook = self.get_bearer_token
|
||||
|
||||
# Connecting to Controller
|
||||
self.api_client = swagger_client.ApiClient(self.configuration)
|
||||
self.login_client = swagger_client.LoginApi(api_client=self.api_client)
|
||||
self.bearer = self.get_bearer_token()
|
||||
self.bearer = False
|
||||
self.disconnect = False
|
||||
self.semaphore = False
|
||||
try:
|
||||
self.bearer = self.get_bearer_token()
|
||||
# t1 = threading.Thread(target=self.refresh_instance)
|
||||
# t1.start()
|
||||
self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token
|
||||
self.status_client = swagger_client.StatusApi(api_client=self.api_client)
|
||||
self.equipment_client = swagger_client.EquipmentApi(self.api_client)
|
||||
self.profile_client = swagger_client.ProfileApi(self.api_client)
|
||||
self.api_client.configuration.api_key_prefix = {
|
||||
"Authorization": "Bearer " + self.bearer._access_token
|
||||
}
|
||||
self.api_client.configuration.refresh_api_key_hook = self.refresh_instance
|
||||
except Exception as e:
|
||||
self.bearer = False
|
||||
print(e)
|
||||
|
||||
self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token
|
||||
self.status_client = swagger_client.StatusApi(api_client=self.api_client)
|
||||
self.equipment_client = swagger_client.EquipmentApi(self.api_client)
|
||||
self.profile_client = swagger_client.ProfileApi(self.api_client)
|
||||
self.api_client.configuration.api_key_prefix = {
|
||||
"Authorization": "Bearer " + self.bearer._access_token
|
||||
}
|
||||
self.api_client.configuration.refresh_api_key_hook = self.refresh_instance
|
||||
self.ping_response = self.portal_ping()
|
||||
self.default_profiles = {}
|
||||
# print(self.bearer)
|
||||
if self.ping_response._application_name != 'PortalServer':
|
||||
print("Server not Reachable")
|
||||
exit()
|
||||
print("Connected to Controller Server")
|
||||
|
||||
def get_bearer_token(self):
|
||||
@@ -111,6 +115,15 @@ class Controller(ConfigureController):
|
||||
|
||||
def refresh_instance(self):
|
||||
# Connecting to Controller
|
||||
# while True:
|
||||
# print("Controller Refresh Thread Started")
|
||||
# for i in range(0, 800):
|
||||
# if self.disconnect:
|
||||
# break
|
||||
# time.sleep(1)
|
||||
# if self.disconnect:
|
||||
# break
|
||||
# self.semaphore = True
|
||||
self.api_client = swagger_client.ApiClient(self.configuration)
|
||||
self.login_client = swagger_client.LoginApi(api_client=self.api_client)
|
||||
self.bearer = self.get_bearer_token()
|
||||
@@ -130,11 +143,13 @@ class Controller(ConfigureController):
|
||||
print("Server not Reachable")
|
||||
exit()
|
||||
print("Connected to Controller Server")
|
||||
# self.semaphore = False
|
||||
|
||||
def portal_ping(self):
|
||||
return self.login_client.portal_ping()
|
||||
|
||||
def disconnect_Controller(self):
|
||||
self.disconnect = True
|
||||
self.api_client.__del__()
|
||||
|
||||
# Returns a List of All the Equipments that are available in the cloud instances
|
||||
@@ -837,16 +852,21 @@ class FirmwareUtility(JFrogUtility):
|
||||
firmware_version = False
|
||||
print("firmware not available: ", firmware_version)
|
||||
return firmware_version
|
||||
controller = {
|
||||
'url': "https://wlan-portal-svc-nola-ext-03.cicd.lab.wlan.tip.build", # API base url for the controller
|
||||
'username': 'support@example.com',
|
||||
'password': 'support',
|
||||
'version': "1.1.0-SNAPSHOT",
|
||||
'commit_date': "2021-04-27"
|
||||
}
|
||||
api = Controller(controller_data=controller)
|
||||
for i in range(0, 2500):
|
||||
print(i)
|
||||
time.sleep(1)
|
||||
print(api.get_equipment_by_customer_id())
|
||||
api.disconnect_Controller()
|
||||
|
||||
# controller = {
|
||||
# 'url': "https://wlan-portal-svc-nola-ext-03.cicd.lab.wlan.tip.build", # API base url for the controller
|
||||
# 'username': 'support@example.com',
|
||||
# 'password': 'support',
|
||||
# 'version': "1.1.0-SNAPSHOT",
|
||||
# 'commit_date': "2021-04-27"
|
||||
# }
|
||||
# api = Controller(controller_data=controller)
|
||||
# # profile = ProfileUtility(sdk_client=api)
|
||||
# # profile.cleanup_profiles()
|
||||
# # # print(api.get_equipment_by_customer_id())
|
||||
# # #
|
||||
# for i in range(0, 300):
|
||||
# print(i)
|
||||
# time.sleep(1)
|
||||
# print(api.get_equipment_by_customer_id())
|
||||
# api.disconnect_Controller()
|
||||
|
||||
Reference in New Issue
Block a user