mirror of
https://github.com/Telecominfraproject/wlan-testing.git
synced 2025-11-02 03:48:09 +00:00
token timestamp fix
Signed-off-by: shivamcandela <shivam.thakur@candelatech.com>
This commit is contained in:
@@ -87,7 +87,9 @@ class Controller(ConfigureController):
|
||||
self.login_client = swagger_client.LoginApi(api_client=self.api_client)
|
||||
self.bearer = False
|
||||
self.disconnect = False
|
||||
self.start_time = time.time()
|
||||
# Token expiry in seconds
|
||||
self.token_expiry = 1000
|
||||
self.token_timestamp = time.time()
|
||||
try:
|
||||
|
||||
self.bearer = self.get_bearer_token()
|
||||
@@ -115,11 +117,12 @@ class Controller(ConfigureController):
|
||||
return self.login_client.get_access_token(request_body)
|
||||
|
||||
def refresh_instance(self):
|
||||
if (time.time() - self.start_time) >= 500:
|
||||
# Refresh token 10 seconds before it's expiry
|
||||
if time.time() - self.token_timestamp < (self.token_expiry-10):
|
||||
print("Refreshing the controller API token")
|
||||
self.api_client = swagger_client.ApiClient(self.configuration)
|
||||
self.login_client = swagger_client.LoginApi(api_client=self.api_client)
|
||||
self.bearer = self.get_bearer_token()
|
||||
self.start_time = time.time()
|
||||
self.api_client.default_headers['Authorization'] = "Bearer " + self.bearer._access_token
|
||||
self.status_client = swagger_client.StatusApi(api_client=self.api_client)
|
||||
self.equipment_client = swagger_client.EquipmentApi(self.api_client)
|
||||
@@ -129,13 +132,12 @@ class Controller(ConfigureController):
|
||||
}
|
||||
self.api_client.configuration.refresh_api_key_hook = self.refresh_instance
|
||||
self.ping_response = self.portal_ping()
|
||||
self.default_profiles = {}
|
||||
# print(self.bearer)
|
||||
if self.ping_response._application_name != 'PortalServer':
|
||||
print("Server not Reachable")
|
||||
exit()
|
||||
print("Connected to Controller Server")
|
||||
# self.semaphore = False
|
||||
self.token_timestamp = time.time()
|
||||
|
||||
def portal_ping(self):
|
||||
self.refresh_instance()
|
||||
@@ -148,6 +150,7 @@ class Controller(ConfigureController):
|
||||
|
||||
# Returns a List of All the Equipments that are available in the cloud instances
|
||||
def get_equipment_by_customer_id(self, max_items=10):
|
||||
self.refresh_instance()
|
||||
pagination_context = """{
|
||||
"model_type": "PaginationContext",
|
||||
"maxItemsPerPage": """ + str(max_items) + """
|
||||
@@ -185,6 +188,7 @@ class Controller(ConfigureController):
|
||||
self.refresh_instance()
|
||||
if equipment_id is None:
|
||||
return None
|
||||
self.refresh_instance()
|
||||
data = self.equipment_client.get_equipment_by_id(equipment_id=equipment_id)
|
||||
print(str(data._details._equipment_model))
|
||||
return str(data._details._equipment_model)
|
||||
|
||||
@@ -40,8 +40,9 @@ class RunTest:
|
||||
self.ax_prefix = lanforge_data["AX-Station-Name"]
|
||||
self.debug = debug
|
||||
self.staConnect = StaConnect2(self.lanforge_ip, self.lanforge_port, debug_=debug)
|
||||
print(lanforge_data)
|
||||
|
||||
def Client_Connectivity(self, ssid="[BLANK]", passkey="[BLANK]", security="open", station_name=[],
|
||||
def Client_Connectivity(self, ssid="[BLANK]", passkey="[BLANK]", security="open", extra_securities=[], station_name=[],
|
||||
mode="BRIDGE", vlan_id=1, band="twog"):
|
||||
'''SINGLE CLIENT CONNECTIVITY using test_connect2.py'''
|
||||
self.staConnect.sta_mode = 0
|
||||
@@ -67,7 +68,7 @@ class RunTest:
|
||||
self.staConnect.bringup_time_sec = 60
|
||||
self.staConnect.cleanup_on_exit = True
|
||||
# self.staConnect.cleanup()
|
||||
self.staConnect.setup()
|
||||
self.staConnect.setup(extra_securities=extra_securities)
|
||||
self.staConnect.start()
|
||||
print("napping %f sec" % self.staConnect.runtime_secs)
|
||||
time.sleep(self.staConnect.runtime_secs)
|
||||
@@ -86,7 +87,8 @@ class RunTest:
|
||||
time.sleep(3)
|
||||
return self.staConnect.passes(), result
|
||||
|
||||
def EAP_Connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", mode="BRIDGE", band="twog", vlan_id=100,
|
||||
def EAP_Connect(self, ssid="[BLANK]", passkey="[BLANK]", security="wpa2", extra_securities=[],
|
||||
mode="BRIDGE", band="twog", vlan_id=100,
|
||||
station_name=[], key_mgmt="WPA-EAP",
|
||||
pairwise="NA", group="NA", wpa_psk="DEFAULT",
|
||||
ttls_passwd="nolastart",
|
||||
@@ -97,11 +99,14 @@ class RunTest:
|
||||
self.eap_connect.station_profile.sta_mode = 0
|
||||
self.eap_connect.upstream_resource = 1
|
||||
if mode == "BRIDGE":
|
||||
self.eap_connect.upstream_port = self.upstream_port
|
||||
self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port
|
||||
self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port
|
||||
elif mode == "NAT":
|
||||
self.eap_connect.upstream_port = self.upstream_port
|
||||
self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port
|
||||
self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port
|
||||
else:
|
||||
self.eap_connect.upstream_port = self.upstream_port + "." + str(vlan_id)
|
||||
self.eap_connect.l3_cx_obj_udp.upstream = self.upstream_port + "." + str(vlan_id)
|
||||
self.eap_connect.l3_cx_obj_tcp.upstream = self.upstream_port + "." + str(vlan_id)
|
||||
if band == "twog":
|
||||
self.eap_connect.radio = self.twog_radios[0]
|
||||
# self.eap_connect.sta_prefix = self.twog_prefix
|
||||
@@ -126,7 +131,7 @@ class RunTest:
|
||||
self.eap_connect.password = passkey
|
||||
self.eap_connect.security = security
|
||||
self.eap_connect.sta_list = station_name
|
||||
self.eap_connect.build()
|
||||
self.eap_connect.build(extra_securities=extra_securities)
|
||||
self.eap_connect.start(station_name, True, True)
|
||||
self.eap_connect.stop()
|
||||
self.eap_connect.cleanup(station_name)
|
||||
|
||||
Reference in New Issue
Block a user