mirror of
				https://github.com/Telecominfraproject/wlan-lanforge-scripts.git
				synced 2025-11-04 04:38:02 +00:00 
			
		
		
		
	test_status_msg: Check if message_list_response exists, otherwise pass. It will post a LANforge error output if you don't write your LANforge URL properly.
Signed-off-by: Matthew Stidham <stidmatt@gmail.com>
This commit is contained in:
		@@ -67,12 +67,13 @@ class TestStatusMessage(LFCliBase):
 | 
			
		||||
        message_response = self.json_get(self.session_url)
 | 
			
		||||
        if self.debug:
 | 
			
		||||
            pprint(message_response)
 | 
			
		||||
        if "empty" in message_response:
 | 
			
		||||
            self._pass("empty response, zero messages")
 | 
			
		||||
        elif "messages" in message_response:
 | 
			
		||||
            messages_a = message_response["messages"]
 | 
			
		||||
            if len(messages_a) > 0:
 | 
			
		||||
                self._fail("we should have zero messages")
 | 
			
		||||
        if message_response:
 | 
			
		||||
            if "empty" in message_response:
 | 
			
		||||
                self._pass("empty response, zero messages")
 | 
			
		||||
            elif "messages" in message_response:
 | 
			
		||||
                messages_a = message_response["messages"]
 | 
			
		||||
                if len(messages_a) > 0:
 | 
			
		||||
                    self._fail("we should have zero messages")
 | 
			
		||||
 | 
			
		||||
        for msg_num in (1, 2, 3, 4, 5):
 | 
			
		||||
            # print("----- ----- ----- ----- ----- ----- %s ----- ----- ----- ----- ----- " % msg_num)
 | 
			
		||||
@@ -84,9 +85,10 @@ class TestStatusMessage(LFCliBase):
 | 
			
		||||
                "message": "message %s" % msg_num
 | 
			
		||||
            })
 | 
			
		||||
            message_response = self.json_get(self.session_url)
 | 
			
		||||
            if len(message_response["messages"]) != msg_num:
 | 
			
		||||
                pprint(message_response)
 | 
			
		||||
                self._fail("we should have %s messages" % msg_num)
 | 
			
		||||
            if message_response:
 | 
			
		||||
                if len(message_response["messages"]) != msg_num:
 | 
			
		||||
                    pprint(message_response)
 | 
			
		||||
                    self._fail("we should have %s messages" % msg_num)
 | 
			
		||||
 | 
			
		||||
        self._pass("created and listed %s messages counted" % msg_num)
 | 
			
		||||
 | 
			
		||||
@@ -97,81 +99,84 @@ class TestStatusMessage(LFCliBase):
 | 
			
		||||
        """
 | 
			
		||||
 | 
			
		||||
        message_list_response = self.json_get(self.session_url)
 | 
			
		||||
        if "empty" in message_list_response:
 | 
			
		||||
            self._fail("empty response, we expect 1 or more messages")
 | 
			
		||||
        msg_num = 0
 | 
			
		||||
        for message_o in message_list_response["messages"]:
 | 
			
		||||
            msg_url = message_o["_links"]
 | 
			
		||||
            print("Message url: " + msg_url)
 | 
			
		||||
            message_response = self.json_get(msg_url)
 | 
			
		||||
            if self.debug:
 | 
			
		||||
                pprint(message_response)
 | 
			
		||||
            for content_o in message_response["messages"]:
 | 
			
		||||
                msg_num += 1
 | 
			
		||||
                print("id %s" % content_o["message_id"])
 | 
			
		||||
                print("key %s" % content_o["message"]["key"])
 | 
			
		||||
                print("content-type %s" % content_o["message"]["content-type"])
 | 
			
		||||
                print("message %s" % content_o["message"]["message"])
 | 
			
		||||
        if message_list_response:
 | 
			
		||||
            if "empty" in message_list_response:
 | 
			
		||||
                self._fail("empty response, we expect 1 or more messages")
 | 
			
		||||
            msg_num = 0
 | 
			
		||||
            for message_o in message_list_response["messages"]:
 | 
			
		||||
                msg_url = message_o["_links"]
 | 
			
		||||
                print("Message url: " + msg_url)
 | 
			
		||||
                message_response = self.json_get(msg_url)
 | 
			
		||||
                if self.debug:
 | 
			
		||||
                    pprint(message_response)
 | 
			
		||||
                for content_o in message_response["messages"]:
 | 
			
		||||
                    msg_num += 1
 | 
			
		||||
                    print("id %s" % content_o["message_id"])
 | 
			
		||||
                    print("key %s" % content_o["message"]["key"])
 | 
			
		||||
                    print("content-type %s" % content_o["message"]["content-type"])
 | 
			
		||||
                    print("message %s" % content_o["message"]["message"])
 | 
			
		||||
 | 
			
		||||
        if msg_num != self.msg_count:
 | 
			
		||||
            self._fail("(stop) expected %s messages, saw %s" % (self.msg_count, msg_num))
 | 
			
		||||
        else:
 | 
			
		||||
            self._pass("saw correct number of messages")
 | 
			
		||||
            if msg_num != self.msg_count:
 | 
			
		||||
                self._fail("(stop) expected %s messages, saw %s" % (self.msg_count, msg_num))
 | 
			
		||||
            else:
 | 
			
		||||
                self._pass("saw correct number of messages")
 | 
			
		||||
 | 
			
		||||
    def cleanup(self):
 | 
			
		||||
        """delete messages and delete the session"""
 | 
			
		||||
 | 
			
		||||
        message_list_response = self.json_get(self.session_url)
 | 
			
		||||
        if "empty" in message_list_response:
 | 
			
		||||
            self._fail("empty response, we expect 1 or more messages")
 | 
			
		||||
        last_link = ""
 | 
			
		||||
        msg_num = 0
 | 
			
		||||
        for message_o in message_list_response["messages"]:
 | 
			
		||||
            # print("Delete Message url: "+msg_url)
 | 
			
		||||
            last_link = message_o["_links"]
 | 
			
		||||
            msg_num += 1
 | 
			
		||||
        if message_list_response:
 | 
			
		||||
            if "empty" in message_list_response:
 | 
			
		||||
                self._fail("empty response, we expect 1 or more messages")
 | 
			
		||||
            last_link = ""
 | 
			
		||||
            msg_num = 0
 | 
			
		||||
            for message_o in message_list_response["messages"]:
 | 
			
		||||
                # print("Delete Message url: "+msg_url)
 | 
			
		||||
                last_link = message_o["_links"]
 | 
			
		||||
                msg_num += 1
 | 
			
		||||
 | 
			
		||||
        if msg_num != self.msg_count:
 | 
			
		||||
            self._fail("(cleanup) expected %s messages, saw %s" % (self.msg_count, msg_num))
 | 
			
		||||
        message_response = self.json_delete(last_link)
 | 
			
		||||
        if self.debug:
 | 
			
		||||
            pprint(message_response)
 | 
			
		||||
            if msg_num != self.msg_count:
 | 
			
		||||
                self._fail("(cleanup) expected %s messages, saw %s" % (self.msg_count, msg_num))
 | 
			
		||||
            message_response = self.json_delete(last_link)
 | 
			
		||||
            if self.debug:
 | 
			
		||||
                pprint(message_response)
 | 
			
		||||
 | 
			
		||||
        # check message removal
 | 
			
		||||
        message_list_response = self.json_get(self.session_url)
 | 
			
		||||
        msg_num = len(message_list_response["messages"])
 | 
			
		||||
        if msg_num != (self.msg_count - 1):
 | 
			
		||||
            self._fail("(cleanup) expected %s messages, saw %s" % ((self.msg_count - 1), msg_num))
 | 
			
		||||
        else:
 | 
			
		||||
            self._pass("(cleanup) messages decreased by one")
 | 
			
		||||
 | 
			
		||||
        all_url = self.session_url + "/all"
 | 
			
		||||
        message_response = self.json_delete(all_url)
 | 
			
		||||
        if self.debug:
 | 
			
		||||
            pprint(message_response)
 | 
			
		||||
 | 
			
		||||
        message_list_response = self.json_get(self.session_url)
 | 
			
		||||
        if self.debug:
 | 
			
		||||
            pprint(message_list_response)
 | 
			
		||||
        if "messages" in message_list_response:
 | 
			
		||||
        if message_list_response:
 | 
			
		||||
            msg_num = len(message_list_response["messages"])
 | 
			
		||||
        elif "empty" in message_list_response:
 | 
			
		||||
            msg_num = 0
 | 
			
		||||
            if msg_num != (self.msg_count - 1):
 | 
			
		||||
                self._fail("(cleanup) expected %s messages, saw %s" % ((self.msg_count - 1), msg_num))
 | 
			
		||||
            else:
 | 
			
		||||
                self._pass("(cleanup) messages decreased by one")
 | 
			
		||||
 | 
			
		||||
        if msg_num == 0:
 | 
			
		||||
            return "deleted all messages in session"
 | 
			
		||||
        else:
 | 
			
		||||
            self._fail("failed to delete all messages in session")
 | 
			
		||||
            all_url = self.session_url + "/all"
 | 
			
		||||
            message_response = self.json_delete(all_url)
 | 
			
		||||
            if self.debug:
 | 
			
		||||
                pprint(message_response)
 | 
			
		||||
 | 
			
		||||
        if 'empty' in self.json_get(self.session_url).keys():
 | 
			
		||||
            message_list_response = self.json_get(self.session_url)
 | 
			
		||||
            if self.debug:
 | 
			
		||||
                print("--- del -------------------- -------------------- --------------------")
 | 
			
		||||
            self.exit_on_error = False
 | 
			
		||||
            self.json_delete("%s/this" % self.session_url, debug_=False)
 | 
			
		||||
            if self.debug:
 | 
			
		||||
                print("--- ~del -------------------- -------------------- --------------------")
 | 
			
		||||
        else:
 | 
			
		||||
            return 'ports deleted successfully'
 | 
			
		||||
                pprint(message_list_response)
 | 
			
		||||
            if "messages" in message_list_response:
 | 
			
		||||
                msg_num = len(message_list_response["messages"])
 | 
			
		||||
            elif "empty" in message_list_response:
 | 
			
		||||
                msg_num = 0
 | 
			
		||||
 | 
			
		||||
            if msg_num == 0:
 | 
			
		||||
                return "deleted all messages in session"
 | 
			
		||||
            else:
 | 
			
		||||
                self._fail("failed to delete all messages in session")
 | 
			
		||||
 | 
			
		||||
            if 'empty' in message_list_response.keys():
 | 
			
		||||
                if self.debug:
 | 
			
		||||
                    print("--- del -------------------- -------------------- --------------------")
 | 
			
		||||
                self.exit_on_error = False
 | 
			
		||||
                self.json_delete("%s/this" % self.session_url, debug_=False)
 | 
			
		||||
                if self.debug:
 | 
			
		||||
                    print("--- ~del -------------------- -------------------- --------------------")
 | 
			
		||||
            else:
 | 
			
		||||
                return 'ports deleted successfully'
 | 
			
		||||
 | 
			
		||||
        sessions_list_response = self.json_get("/status-msg")
 | 
			
		||||
        if self.debug:
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user