mirror of
				https://github.com/Telecominfraproject/openafc_final.git
				synced 2025-11-04 03:57:48 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			968 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			968 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
''' Non-test support objects and classes used by the actual test cases.
 | 
						|
'''
 | 
						|
 | 
						|
import datetime
 | 
						|
from io import BytesIO
 | 
						|
import logging
 | 
						|
import lxml.etree as etree
 | 
						|
import os
 | 
						|
import re
 | 
						|
import requests
 | 
						|
import shutil
 | 
						|
import tempfile
 | 
						|
import unittest
 | 
						|
from urlparse import urljoin
 | 
						|
import werkzeug.datastructures
 | 
						|
import werkzeug.http
 | 
						|
import werkzeug.urls
 | 
						|
from nose import SkipTest
 | 
						|
 | 
						|
#: Logger for this module
 | 
						|
LOGGER = logging.getLogger(__name__)
 | 
						|
 | 
						|
#: Regex to match application/xml and applicaiton/sub+xml
 | 
						|
XML_CONTENT_RE = re.compile(r'^application/(.+\+)?xml$')
 | 
						|
 | 
						|
#: Time format for ISO 8601 "basic" time used by XML Schema.
 | 
						|
#: This string is usable by datetime.strftime and datetime.strptime
 | 
						|
TIME_FORMAT_BASIC = '%Y%m%dT%H%M%SZ'
 | 
						|
#: Time format for ISO 8601 "extended" time used by XML Schema.
 | 
						|
#: This string is usable by datetime.strftime and datetime.strptime
 | 
						|
TIME_FORMAT_EXTENDED = '%Y-%m-%dT%H:%M:%SZ'
 | 
						|
 | 
						|
#: Absolute path to this package directory
 | 
						|
PACKAGE_PATH = os.path.abspath(os.path.dirname(__file__))
 | 
						|
 | 
						|
 | 
						|
def get_xml_parser(schema):
 | 
						|
    ''' Generate a function to extract an XML DOM tree from an encoded document.
 | 
						|
 | 
						|
    :param schema: Iff not None, the document will be validated against
 | 
						|
        this schema object.
 | 
						|
    :type use_schema: bool
 | 
						|
    :return: The parser function which takes a file-like parameter and
 | 
						|
        returns a tree object of type :py:cls:`lxml.etree.ElementTree`.
 | 
						|
    '''
 | 
						|
    xmlparser = etree.XMLParser(schema=schema)
 | 
						|
 | 
						|
    def func(infile):
 | 
						|
        try:
 | 
						|
            return etree.parse(infile, parser=xmlparser)
 | 
						|
        except etree.XMLSyntaxError as err:
 | 
						|
            infile.seek(0)
 | 
						|
            with tempfile.NamedTemporaryFile(delete=False) as outfile:
 | 
						|
                shutil.copyfileobj(infile, outfile)
 | 
						|
                raise ValueError(
 | 
						|
                    'Failed to parse XML with error {0} in file {1}'.format(
 | 
						|
                        err, outfile.name))
 | 
						|
 | 
						|
    return func
 | 
						|
 | 
						|
 | 
						|
def extract_metadict(doc):
 | 
						|
    ''' Extract a server metadata dictionary from its parsed XML document.
 | 
						|
 | 
						|
    :param doc: The document to read from.
 | 
						|
    :return: The metadata URL map.
 | 
						|
    '''
 | 
						|
    metadict = {}
 | 
						|
    for el_a in doc.findall('//{http://www.w3.org/1999/xhtml}a'):
 | 
						|
        m_id = el_a.attrib.get('id')
 | 
						|
        m_href = el_a.attrib.get('href')
 | 
						|
        if m_id is None or m_href is None:
 | 
						|
            continue
 | 
						|
        metadict[m_id] = m_href
 | 
						|
    return metadict
 | 
						|
 | 
						|
 | 
						|
def merged(base, delta):
 | 
						|
    ''' Return a merged dictionary contents.
 | 
						|
 | 
						|
    :param base: The initial contents to merge.
 | 
						|
    :param delta: The modifications to apply.
 | 
						|
    :return: A dictionary containing the :py:obj:`base` updated
 | 
						|
        by the :py:obj:`delta`.
 | 
						|
    '''
 | 
						|
    mod = dict(base)
 | 
						|
    mod.update(delta)
 | 
						|
    return mod
 | 
						|
 | 
						|
 | 
						|
def limit_count(iterable, limit):
 | 
						|
    ''' Wrap an iterable/generator with a count limit to only yield the first
 | 
						|
    :py:obj:`count` number of items.
 | 
						|
 | 
						|
    :param iterable: The source iterable object.
 | 
						|
    :param limit: The maximum number of items available from the generator.
 | 
						|
    :return A generator with a count limit.
 | 
						|
    '''
 | 
						|
    count = 0
 | 
						|
    for item in iterable:
 | 
						|
        yield item
 | 
						|
        count += 1
 | 
						|
        if count >= limit:
 | 
						|
            return
 | 
						|
 | 
						|
 | 
						|
def modify_etag(orig):
 | 
						|
    ''' Given a base ETag value, generate a modified ETag which is
 | 
						|
    guaranteed to not match the original.
 | 
						|
 | 
						|
    :param str orig: The original ETag.
 | 
						|
    :return: A different ETag
 | 
						|
    '''
 | 
						|
    # Inject characters near the end
 | 
						|
    mod = orig[:-1] + '-eh' + orig[-1:]
 | 
						|
    return mod
 | 
						|
 | 
						|
 | 
						|
class ValidateJsonResponse(object):
 | 
						|
    ''' Validate an expected JSON file response.
 | 
						|
    '''
 | 
						|
 | 
						|
    def __call__(self, resp):
 | 
						|
        import json
 | 
						|
        json.loads(resp.content)
 | 
						|
 | 
						|
 | 
						|
class ValidateHtmlResponse(object):
 | 
						|
    ''' Validate an HTML response with a loose parser.
 | 
						|
    '''
 | 
						|
 | 
						|
    def __call__(self, resp):
 | 
						|
        import bs4
 | 
						|
        kwargs = dict(
 | 
						|
            markup=resp.content,
 | 
						|
            features='lxml',
 | 
						|
        )
 | 
						|
        bs4.BeautifulSoup(**kwargs)
 | 
						|
 | 
						|
 | 
						|
class ValidateXmlResponse(object):
 | 
						|
    ''' Validate an expected XML file response.
 | 
						|
 | 
						|
    :param parser: A function to take a file-like input and output an
 | 
						|
        XML element tree :py:cls:`lxml.etree.ElementTree`.
 | 
						|
    :param require_root: If not None, the root element must be this value.
 | 
						|
    '''
 | 
						|
 | 
						|
    def __init__(self, parser, require_root=None):
 | 
						|
        self._parser = parser
 | 
						|
        if not callable(self._parser):
 | 
						|
            raise ValueError('ValidateXmlResponse parser invalid')
 | 
						|
        self._require_root = require_root
 | 
						|
 | 
						|
    def __call__(self, resp):
 | 
						|
        xml_tree = self._parser(BytesIO(resp.content))
 | 
						|
        if self._require_root is not None:
 | 
						|
            root_tag = xml_tree.getroot().tag
 | 
						|
            if self._require_root != root_tag:
 | 
						|
                raise ValueError(
 | 
						|
                    'Required root element "{0}" not present, got "{1}"'.format(
 | 
						|
                        self._require_root, root_tag))
 | 
						|
 | 
						|
 | 
						|
class BaseTestCase(unittest.TestCase):
 | 
						|
    ''' Common access and helper functions which use the :py:mod:`unittest`
 | 
						|
    framework but this class defines no test functions itself.
 | 
						|
 | 
						|
    :ivar httpsession: An :py:class:`requests.Session` instance for test use.
 | 
						|
    :ivar xmlparser: An :py:class:`etree.XMLParser` instance for test use.
 | 
						|
    '''
 | 
						|
 | 
						|
    #: Cached URL to start at and to resolve from
 | 
						|
    BASE_URL = os.environ.get('HTTPCHECKOUT_BASEURL')
 | 
						|
    #: True if the editing tests should be skipped
 | 
						|
    READONLY = bool(os.environ.get('HTTPCHECKOUT_READONLY'))
 | 
						|
    #: Keep any created resources in the tearDown() method which are left behind
 | 
						|
    KEEP_TESTITEMS = bool(os.environ.get('HTTPCHECKOUT_KEEP_TESTITEMS'))
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        unittest.TestCase.setUp(self)
 | 
						|
        self.maxDiff = 10e3
 | 
						|
 | 
						|
        self.assertIsNotNone(
 | 
						|
            self.BASE_URL, 'Missing environment HTTPCHECKOUT_BASEURL')
 | 
						|
        self.httpsession = requests.Session()
 | 
						|
 | 
						|
        ca_roots = os.environ.get('HTTPCHECKOUT_CACERTS')
 | 
						|
        LOGGER.info('HTTPCHECKOUT_CACERTS is "%s"', ca_roots)
 | 
						|
        if ca_roots == '':
 | 
						|
            import warnings
 | 
						|
            from urllib3.exceptions import InsecureRequestWarning
 | 
						|
            self.httpsession.verify = False
 | 
						|
            warnings.filterwarnings("ignore", category=InsecureRequestWarning)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.httpsession = None
 | 
						|
        unittest.TestCase.tearDown(self)
 | 
						|
 | 
						|
    def _assertWriteTest(self):
 | 
						|
        ''' Skip the current test if HTTPCHECKOUT_READONLY is set.
 | 
						|
        '''
 | 
						|
        if self.READONLY:
 | 
						|
            self.skipTest(
 | 
						|
                'Not running editing tests because of HTTPCHECKOUT_READONLY')
 | 
						|
 | 
						|
    def _resolve_url(self, url):
 | 
						|
        ''' Resolve a URL relative to the original base URL.
 | 
						|
 | 
						|
        :param url: The URL to resolve.
 | 
						|
        :type url: str
 | 
						|
        :return: The resolved absolute URL to request on.
 | 
						|
        :rtype: str
 | 
						|
        '''
 | 
						|
        return urljoin(self.BASE_URL, url)
 | 
						|
 | 
						|
    def assertSameUrlPath(self, first, second):
 | 
						|
        ''' Assert that two URLs are equal except for their query/fragment parts.
 | 
						|
        '''
 | 
						|
        f_url = werkzeug.urls.url_parse(first)
 | 
						|
        s_url = werkzeug.urls.url_parse(second)
 | 
						|
 | 
						|
        for attr in ('scheme', 'netloc', 'path'):
 | 
						|
            self.assertEqual(
 | 
						|
                getattr(f_url, attr),
 | 
						|
                getattr(s_url, attr),
 | 
						|
                'Mismatched URL {0}'.format(attr)
 | 
						|
            )
 | 
						|
 | 
						|
    def _get_xml_parser(self, use_schema=None):
 | 
						|
        ''' Generate a function to extract an XML DOM tree from an encoded document.
 | 
						|
 | 
						|
        :return: The parser function which takes a file-like parameter and
 | 
						|
            returns a tree object of type :py:cls:`lxml.etree.ElementTree`.
 | 
						|
        '''
 | 
						|
        return get_xml_parser(schema=use_schema)
 | 
						|
 | 
						|
    def _get_xml_encoder(self):
 | 
						|
        ''' Generate a function to encode a document from an XML DOM tree.
 | 
						|
 | 
						|
        :return: The parser function which takes a parameter of a
 | 
						|
            tree object of type :py:cls:`lxml.etree.ElementTree` and
 | 
						|
            returns a file-like object.
 | 
						|
        '''
 | 
						|
 | 
						|
        def func(doc, outfile=None):
 | 
						|
            ''' Encode a document.
 | 
						|
 | 
						|
            :parm doc: The document to encode.
 | 
						|
            :type doc: :py:cls:`lxml.etree.ElementTree`
 | 
						|
            :param outfile: An optional file-like object to encode into.
 | 
						|
                This must be None if the encoder is used multiple times.
 | 
						|
            :type outfile: file-like or None
 | 
						|
            :return: The encoded file-like object.
 | 
						|
            '''
 | 
						|
            if outfile is None:
 | 
						|
                outfile = BytesIO()
 | 
						|
 | 
						|
            doc.write(outfile, encoding='UTF-8', xml_declaration=True)
 | 
						|
            if hasattr(outfile, 'seek'):
 | 
						|
                try:
 | 
						|
                    outfile.seek(0)
 | 
						|
                except BaseException:
 | 
						|
                    pass
 | 
						|
            return outfile
 | 
						|
 | 
						|
        return func
 | 
						|
 | 
						|
    def _test_working_links(self, text):
 | 
						|
        ''' Verify that html has well formed links
 | 
						|
 | 
						|
        :param text: html doc with href's
 | 
						|
        '''
 | 
						|
 | 
						|
        import bs4
 | 
						|
        html = bs4.BeautifulSoup(text, 'html.parser')
 | 
						|
        for link in [a['href'] for a in html.find_all('a')]:
 | 
						|
            self._test_working_link(link)
 | 
						|
 | 
						|
    def _test_working_link(self, url):
 | 
						|
        ''' Verify that a url returns a 200 response
 | 
						|
 | 
						|
        :param url: The URL to be checked
 | 
						|
        '''
 | 
						|
 | 
						|
        resolved_url = self._resolve_url(url)
 | 
						|
        resp = self.httpsession.get(resolved_url)
 | 
						|
        self.assertEqual(200, resp.status_code)
 | 
						|
 | 
						|
    def _test_options_allow(self, url, methods):
 | 
						|
        ''' Verify that the OPTIONS response for a URL matches a specific set.
 | 
						|
 | 
						|
        :param url: The URL to pass to :py:mod:`requests`
 | 
						|
        :type url: str
 | 
						|
        :param methods: The method names which must be identical to the response.
 | 
						|
        :type methods: iterable
 | 
						|
        :param params: URL parameter dictionary to pass to :py:mod:`requests`.
 | 
						|
        :type params: dict or None
 | 
						|
        '''
 | 
						|
        methods = set([str(m).upper() for m in methods])
 | 
						|
        methods.add('OPTIONS')
 | 
						|
        if 'GET' in methods:
 | 
						|
            methods.add('HEAD')
 | 
						|
 | 
						|
        resolved_url = self._resolve_url(url)
 | 
						|
        resp = self.httpsession.options(resolved_url)
 | 
						|
        self.assertEqual(200, resp.status_code)
 | 
						|
        got_allow = werkzeug.http.parse_set_header(resp.headers['allow'])
 | 
						|
        self.assertEqual(methods, set(got_allow))
 | 
						|
 | 
						|
    def _test_path_contents(
 | 
						|
            self,
 | 
						|
            url,
 | 
						|
            params=None,
 | 
						|
            base_headers=None,
 | 
						|
            validate_response_pre=None,
 | 
						|
            validate_response_post=None,
 | 
						|
            must_authorize=True,
 | 
						|
            valid_status=None,
 | 
						|
            content_type=None,
 | 
						|
            valid_encodings=None,
 | 
						|
            require_length=True,
 | 
						|
            require_vary=None,
 | 
						|
            require_etag=True,
 | 
						|
            require_lastmod=True,
 | 
						|
            require_cacheable=True,
 | 
						|
            cache_must_revalidate=False):
 | 
						|
        ''' Common assertions for static resources.
 | 
						|
 | 
						|
        :param url: The URL to pass to :py:mod:`requests`
 | 
						|
        :type url: str
 | 
						|
        :param params: URL parameter dictionary to pass to :py:mod:`requests`.
 | 
						|
        :type params: dict or None
 | 
						|
        :param base_headers: A dictionary of headers to send with every request.
 | 
						|
        :type base_headers: dict or None
 | 
						|
        :param validate_response_pre: A callable which takes a single argument of
 | 
						|
            the response object and performs its own validation of the headers
 | 
						|
            and/or body for each non-cached response.
 | 
						|
            This is performed before any of the parametric checks.
 | 
						|
        :type validate_response_pre: callable or None
 | 
						|
        :param validate_response_post: A callable which takes a single argument of
 | 
						|
            the response object and performs its own validation of the headers
 | 
						|
            and/or body for each non-cached response.
 | 
						|
            This is performed after any of the parametric checks.
 | 
						|
        :type validate_response_post: callable or None
 | 
						|
        :param must_authorize: Access to the resource without an Authorization
 | 
						|
            header is attempted and compared against this value.
 | 
						|
        :type must_authorize: bool
 | 
						|
        :param valid_status: A set of valid status codes to allow.
 | 
						|
            If not provided, only code 200 is valid.
 | 
						|
        :type valid_status: set or None
 | 
						|
        :param content_type: If not None, the required Content-Type header.
 | 
						|
        :type content_type: bool or None
 | 
						|
        :param valid_encodings: If not None, a list of content encodings to check for.
 | 
						|
            The resource must provide each of the non-identity encodings listed.
 | 
						|
        :type valid_encodings: list or None
 | 
						|
        :param require_length: If either true or false, assert that the
 | 
						|
            content-length header is present or not.
 | 
						|
        :type require_length: bool or None
 | 
						|
        :param require_vary: A set of Vary reults required to be present
 | 
						|
            in the response.
 | 
						|
            If the :py:obj:`valid_encodings` list has more than the identity
 | 
						|
            encoding present, then 'accept-encoding' will be automatically
 | 
						|
            added to this vary list.
 | 
						|
        :type require_vary: list or None
 | 
						|
        :param require_etag: If not None, whether the ETag is required present
 | 
						|
            or not present (True or False) or a specific string value.
 | 
						|
        :type require_etag: str or bool or None
 | 
						|
        :param require_lastmod: If not None, whether the Last-Modified is
 | 
						|
            required present or not present (True or False) or a specific value.
 | 
						|
        :type require_lastmod: str or bool or None
 | 
						|
        :param require_cacheable: If true, the resource is checked for its cacheability.
 | 
						|
            Not all resources should be cacheable (even if not explicitly marked no-cache).
 | 
						|
        :type require_cacheable: bool
 | 
						|
        :param cache_must_revalidate: If True, the response must have its
 | 
						|
            'must-revalidate' cache control header set.
 | 
						|
        :type cache_must_revalidate: bool
 | 
						|
        :raises: raises unittest exceptions if an assertion fails
 | 
						|
        '''
 | 
						|
 | 
						|
        if base_headers is None:
 | 
						|
            base_headers = {}
 | 
						|
 | 
						|
        if valid_status is None:
 | 
						|
            valid_status = [200]
 | 
						|
        valid_status = set(valid_status)
 | 
						|
 | 
						|
        # Set of valid encodings to require
 | 
						|
        if valid_encodings is None:
 | 
						|
            valid_encodings = []
 | 
						|
        valid_encodings = set(valid_encodings)
 | 
						|
        valid_encodings.add('identity')
 | 
						|
        # Force as ordered list with identity encoding first and gzip always
 | 
						|
        # attempted
 | 
						|
        try_encodings = set(valid_encodings)
 | 
						|
        try_encodings.discard('identity')
 | 
						|
        try_encodings = sorted(list(try_encodings))
 | 
						|
        try_encodings.insert(0, 'identity')
 | 
						|
        # Cached identity-encoded contents
 | 
						|
        identity_body = None
 | 
						|
 | 
						|
        if require_vary is None:
 | 
						|
            require_vary = []
 | 
						|
        require_vary = set(require_vary)
 | 
						|
        if len(valid_encodings) > 1:
 | 
						|
            require_vary.add('accept-encoding')
 | 
						|
 | 
						|
        resolved_url = self._resolve_url(url)
 | 
						|
 | 
						|
        # Options on the resource itself
 | 
						|
        resp = self.httpsession.options(
 | 
						|
            resolved_url, params=params,
 | 
						|
            allow_redirects=False,
 | 
						|
            headers=base_headers,
 | 
						|
        )
 | 
						|
        self.assertEqual(200, resp.status_code)
 | 
						|
        got_allow = werkzeug.http.parse_set_header(resp.headers.get('allow'))
 | 
						|
        self.assertIn('options', got_allow)
 | 
						|
        if 404 not in valid_status:
 | 
						|
            self.assertIn('head', got_allow)
 | 
						|
            self.assertIn('get', got_allow)
 | 
						|
 | 
						|
        # Options without authentication
 | 
						|
        resp = self.httpsession.options(
 | 
						|
            resolved_url, params=params,
 | 
						|
            allow_redirects=False,
 | 
						|
            headers=merged(base_headers, {
 | 
						|
                'authorization': None,
 | 
						|
            }),
 | 
						|
        )
 | 
						|
        if must_authorize:
 | 
						|
            self.assertEqual(401, resp.status_code)
 | 
						|
        else:
 | 
						|
            self.assertEqual(200, resp.status_code)
 | 
						|
 | 
						|
        for try_encoding in try_encodings:
 | 
						|
            # initial non-cache response
 | 
						|
            enc_headers = merged(
 | 
						|
                base_headers,
 | 
						|
                {
 | 
						|
                    'accept-encoding': try_encoding,
 | 
						|
                }
 | 
						|
            )
 | 
						|
            resp = self.httpsession.get(
 | 
						|
                resolved_url, params=params,
 | 
						|
                allow_redirects=False,
 | 
						|
                headers=enc_headers,
 | 
						|
                stream=True,
 | 
						|
            )
 | 
						|
            # External validation first
 | 
						|
            if validate_response_pre:
 | 
						|
                try:
 | 
						|
                    validate_response_pre(resp)
 | 
						|
                except Exception as err:
 | 
						|
                    self.fail('Failed pre-validation: {0}'.format(err))
 | 
						|
 | 
						|
            # Now parametric validation
 | 
						|
            self.assertIn(resp.status_code, valid_status)
 | 
						|
 | 
						|
            got_content_type = werkzeug.http.parse_options_header(
 | 
						|
                resp.headers['content-type'])
 | 
						|
            if content_type is not None:
 | 
						|
                self.assertEqual(content_type.lower(),
 | 
						|
                                 got_content_type[0].lower())
 | 
						|
 | 
						|
            # Encoding comparison compared to valid
 | 
						|
            got_encoding = resp.headers.get('content-encoding', 'identity')
 | 
						|
            if try_encoding in valid_encodings:
 | 
						|
                self.assertEqual(try_encoding, got_encoding)
 | 
						|
            else:
 | 
						|
                self.assertEqual(
 | 
						|
                    'identity', got_encoding,
 | 
						|
                    msg='"{0}" was supposed to be a disallowed content-encoding but it was accepted'.format(
 | 
						|
                        try_encoding)
 | 
						|
                )
 | 
						|
 | 
						|
            got_length = resp.headers.get('content-length')
 | 
						|
            if require_length is True:
 | 
						|
                self.assertIsNotNone(got_length, msg='Content-Length missing')
 | 
						|
            elif require_length is False:
 | 
						|
                self.assertIsNone(
 | 
						|
                    got_length, msg='Content-Length should not be present')
 | 
						|
 | 
						|
            # Guarantee type is correct also
 | 
						|
            if got_length is not None:
 | 
						|
                try:
 | 
						|
                    got_length = int(got_length)
 | 
						|
                except ValueError:
 | 
						|
                    self.fail(
 | 
						|
                        'Got a non-integer Content-Length: {0}'.format(got_length))
 | 
						|
 | 
						|
            got_vary = werkzeug.http.parse_set_header(resp.headers.get('vary'))
 | 
						|
            for item in require_vary:
 | 
						|
                LOGGER.debug("headers: %s", resp.headers)
 | 
						|
                self.assertIn(
 | 
						|
                    item,
 | 
						|
                    got_vary,
 | 
						|
                    msg='Vary header missing item "{0}" got {1}'.format(
 | 
						|
                        item,
 | 
						|
                        got_vary))
 | 
						|
 | 
						|
            got_etag = resp.headers.get('etag')
 | 
						|
            got_lastmod = resp.headers.get('last-modified')
 | 
						|
            if resp.status_code != 204:
 | 
						|
                if require_etag is True:
 | 
						|
                    self.assertIsNotNone(got_etag, msg='ETag header missing')
 | 
						|
                elif require_etag is False:
 | 
						|
                    self.assertIsNone(
 | 
						|
                        got_etag, msg='ETag header should not be present')
 | 
						|
                elif require_etag is not None:
 | 
						|
                    self.assertEqual(require_etag, got_etag)
 | 
						|
 | 
						|
                if require_lastmod is True:
 | 
						|
                    self.assertIsNotNone(
 | 
						|
                        got_lastmod, msg='Last-Modified header missing')
 | 
						|
                elif require_lastmod is False:
 | 
						|
                    self.assertIsNone(
 | 
						|
                        got_lastmod, msg='Last-Modified header should not be present')
 | 
						|
                elif require_lastmod is not None:
 | 
						|
                    self.assertEqual(require_lastmod, got_lastmod)
 | 
						|
 | 
						|
            # Caching headers
 | 
						|
            cache_control = werkzeug.http.parse_cache_control_header(
 | 
						|
                resp.headers.get('cache-control'),
 | 
						|
                cls=werkzeug.datastructures.ResponseCacheControl,
 | 
						|
            )
 | 
						|
            # The resource must define its domain
 | 
						|
            if False:
 | 
						|
                self.assertTrue(
 | 
						|
                    cache_control.no_cache
 | 
						|
                    or cache_control.public  # pylint: disable=no-member
 | 
						|
                    or cache_control.private,  # pylint: disable=no-member
 | 
						|
                    msg='Missing cache public/private assertion for {0}'.format(
 | 
						|
                        resolved_url)
 | 
						|
                )
 | 
						|
            if require_cacheable is not False and cache_must_revalidate is True:
 | 
						|
                self.assertTrue(
 | 
						|
                    cache_control.must_revalidate)  # pylint: disable=no-member
 | 
						|
            if require_cacheable is True:
 | 
						|
                self.assertFalse(cache_control.no_cache)
 | 
						|
                self.assertFalse(cache_control.no_store)
 | 
						|
#                self.assertLessEqual(0, cache_control.max_age)
 | 
						|
            elif require_cacheable is False:
 | 
						|
                # FIXME not always true
 | 
						|
                self.assertTrue(cache_control.no_cache)
 | 
						|
 | 
						|
            # Actual body content itself
 | 
						|
            got_body = str(resp.content)
 | 
						|
            if resp.status_code == 204:
 | 
						|
                self.assertEqual('', got_body)
 | 
						|
            else:
 | 
						|
                # Ensure decoded body is identical
 | 
						|
                if got_encoding == 'identity':
 | 
						|
                    identity_body = got_body
 | 
						|
                    self.assertIsNotNone(identity_body)
 | 
						|
                    if got_length is not None:
 | 
						|
                        self.assertEqual(len(identity_body), got_length)
 | 
						|
                else:
 | 
						|
                    self.assertEqual(identity_body, got_body)
 | 
						|
 | 
						|
                # XML specific decoding
 | 
						|
                if XML_CONTENT_RE.match(
 | 
						|
                        got_content_type[0]) is not None and validate_response_post is None:
 | 
						|
                    validate_response_post = ValidateXmlResponse(
 | 
						|
                        self._get_xml_parser(use_schema=True))
 | 
						|
 | 
						|
                # After all parametric tests on this response
 | 
						|
                if validate_response_post:
 | 
						|
                    try:
 | 
						|
                        validate_response_post(resp)
 | 
						|
                    except Exception as err:
 | 
						|
                        self.fail('Failed post-validation: {0}'.format(err))
 | 
						|
 | 
						|
            # Check the unauthorized view of same URL
 | 
						|
            for method in ('GET', 'HEAD'):
 | 
						|
                resp = self.httpsession.request(
 | 
						|
                    method,
 | 
						|
                    resolved_url, params=params,
 | 
						|
                    allow_redirects=False,
 | 
						|
                    headers=merged(enc_headers, {
 | 
						|
                        'authorization': None,
 | 
						|
                    }),
 | 
						|
                )
 | 
						|
                if must_authorize:
 | 
						|
                    self.assertEqual(
 | 
						|
                        401,
 | 
						|
                        resp.status_code,
 | 
						|
                        msg='For {0} on {1}: Expected 401 status got {2}'.format(
 | 
						|
                            method,
 | 
						|
                            resolved_url,
 | 
						|
                            resp.status_code))
 | 
						|
                else:
 | 
						|
                    self.assertIn(
 | 
						|
                        resp.status_code,
 | 
						|
                        valid_status,
 | 
						|
                        msg='For {0} on {1}: Expected valid status got {2}'.format(
 | 
						|
                            method,
 | 
						|
                            resolved_url,
 | 
						|
                            resp.status_code))
 | 
						|
 | 
						|
            # Any resource with cache control header
 | 
						|
            resp = self.httpsession.head(
 | 
						|
                resolved_url, params=params,
 | 
						|
                allow_redirects=False,
 | 
						|
                headers=merged(enc_headers, {
 | 
						|
                    'if-match': '*',
 | 
						|
                }),
 | 
						|
            )
 | 
						|
            self.assertIn(resp.status_code, valid_status)
 | 
						|
            # Caching with ETag
 | 
						|
            if got_etag is not None:
 | 
						|
                self.assertIn(resp.status_code, valid_status)
 | 
						|
                # Existing resource
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params,
 | 
						|
                    allow_redirects=False,
 | 
						|
                    headers=merged(enc_headers, {
 | 
						|
                        'if-match': got_etag,
 | 
						|
                    }),
 | 
						|
                )
 | 
						|
                self.assertIn(resp.status_code, valid_status)
 | 
						|
                # Client cache response
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params,
 | 
						|
                    allow_redirects=False,
 | 
						|
                    headers=merged(enc_headers, {
 | 
						|
                        'if-none-match': got_etag,
 | 
						|
                    }),
 | 
						|
                )
 | 
						|
                self.assertIn(resp.status_code, [
 | 
						|
                              304] if require_cacheable else valid_status)
 | 
						|
                # With adjusted ETag
 | 
						|
                mod_etag = modify_etag(got_etag)
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params,
 | 
						|
                    allow_redirects=False,
 | 
						|
                    headers=merged(enc_headers, {
 | 
						|
                        'if-none-match': mod_etag,
 | 
						|
                    }),
 | 
						|
                )
 | 
						|
                self.assertIn(resp.status_code, valid_status)
 | 
						|
 | 
						|
            # Caching with Last-Modified
 | 
						|
            if got_lastmod is not None:
 | 
						|
                # No changes here so normal response
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params,
 | 
						|
                    allow_redirects=False,
 | 
						|
                    headers=merged(enc_headers, {
 | 
						|
                        'if-unmodified-since': got_lastmod,
 | 
						|
                    }),
 | 
						|
                )
 | 
						|
#                self.assertIn(resp.status_code, valid_status)
 | 
						|
 | 
						|
                # An earlier time will give a 412
 | 
						|
                new_time = werkzeug.http.parse_date(
 | 
						|
                    got_lastmod) - datetime.timedelta(seconds=5)
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params, allow_redirects=False, headers=merged(
 | 
						|
                        enc_headers, {
 | 
						|
                            'if-unmodified-since': werkzeug.http.http_date(new_time), }), )
 | 
						|
                self.assertIn(resp.status_code, [
 | 
						|
                              412] if require_cacheable else valid_status)
 | 
						|
 | 
						|
                # An later time is normal response
 | 
						|
                new_time = werkzeug.http.parse_date(
 | 
						|
                    got_lastmod) + datetime.timedelta(seconds=5)
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params, allow_redirects=False, headers=merged(
 | 
						|
                        enc_headers, {
 | 
						|
                            'if-unmodified-since': werkzeug.http.http_date(new_time), }), )
 | 
						|
                self.assertIn(resp.status_code, valid_status)
 | 
						|
 | 
						|
                # Client cache response
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params,
 | 
						|
                    allow_redirects=False,
 | 
						|
                    headers=merged(enc_headers, {
 | 
						|
                        'if-modified-since': got_lastmod,
 | 
						|
                    }),
 | 
						|
                )
 | 
						|
#                self.assertIn(resp.status_code, [304] if require_cacheable else valid_status)
 | 
						|
 | 
						|
                # A later time should also give a 304 response
 | 
						|
                new_time = werkzeug.http.parse_date(
 | 
						|
                    got_lastmod) + datetime.timedelta(seconds=5)
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params,
 | 
						|
                    allow_redirects=False,
 | 
						|
                    headers=merged(enc_headers, {
 | 
						|
                        'if-modified-since': werkzeug.http.http_date(new_time),
 | 
						|
                    }),
 | 
						|
                )
 | 
						|
                self.assertIn(resp.status_code, [
 | 
						|
                              304] if require_cacheable else valid_status)
 | 
						|
 | 
						|
                # An earlier time will give a 200 response
 | 
						|
                new_time = werkzeug.http.parse_date(
 | 
						|
                    got_lastmod) - datetime.timedelta(seconds=5)
 | 
						|
                resp = self.httpsession.head(
 | 
						|
                    resolved_url, params=params,
 | 
						|
                    allow_redirects=False,
 | 
						|
                    headers=merged(enc_headers, {
 | 
						|
                        'if-modified-since': werkzeug.http.http_date(new_time),
 | 
						|
                    }),
 | 
						|
                )
 | 
						|
                self.assertIn(resp.status_code, valid_status)
 | 
						|
 | 
						|
    def _test_modify_request(
 | 
						|
            self, url, up_data, method='POST', params=None, **kwargs):
 | 
						|
        ''' Common assertions for static resources.
 | 
						|
 | 
						|
        :param url: The URL to pass to :py:mod:`requests`
 | 
						|
        :type url: str
 | 
						|
        :param up_data: The request body data.
 | 
						|
        :type up_data: str or file-like
 | 
						|
        :param str method: The method to request the modification.
 | 
						|
        :param params: URL parameter dictionary to pass to :py:mod:`requests`.
 | 
						|
        :type params: dict or None
 | 
						|
        :param base_headers: A dictionary of headers to send with every request.
 | 
						|
        :type base_headers: dict or None
 | 
						|
        :param must_authorize: Access to the resource without an Authorization
 | 
						|
            header is attempted and compared against this value.
 | 
						|
        :type must_authorize: bool
 | 
						|
        :param valid_status: A set of valid status codes to allow.
 | 
						|
            If not provided, only code 201 is valid for new resources and 204 for existing ones.
 | 
						|
        :type valid_status: set or None
 | 
						|
        :param empty_is_valid: Whether or not an empty document is a valid modification.
 | 
						|
            The default is False.
 | 
						|
        :type empty_is_valid: bool
 | 
						|
        :param is_idempotent: Whether or not sending the same request should
 | 
						|
            not change the resource (except for the modify time).
 | 
						|
            The default is true if the method is PUT.
 | 
						|
        :type is_idempotent: bool
 | 
						|
        :param require_etag: If not None, whether the ETag is required present or not present.
 | 
						|
        :type require_etag: bool or None
 | 
						|
        '''
 | 
						|
        method = method.lower()
 | 
						|
        # Arguments not passed to _assert_modify_response()
 | 
						|
        base_headers = kwargs.pop('base_headers', None)
 | 
						|
        if base_headers is None:
 | 
						|
            base_headers = {}
 | 
						|
        must_authorize = kwargs.pop('must_authorize', True)
 | 
						|
        empty_is_valid = kwargs.pop('empty_is_valid', False)
 | 
						|
        is_idempotent = kwargs.pop('is_idempotent', method == 'put')
 | 
						|
 | 
						|
        resolved_url = self._resolve_url(url)
 | 
						|
 | 
						|
        if hasattr(up_data, 'seek'):
 | 
						|
 | 
						|
            def reset_up_data():
 | 
						|
                up_data.seek(0)
 | 
						|
                return up_data
 | 
						|
 | 
						|
        else:
 | 
						|
 | 
						|
            def reset_up_data():
 | 
						|
                return up_data
 | 
						|
 | 
						|
        # Options on the resource itself
 | 
						|
        resp = self.httpsession.options(
 | 
						|
            resolved_url, params=params,
 | 
						|
            allow_redirects=False,
 | 
						|
            headers=base_headers,
 | 
						|
        )
 | 
						|
        self.assertEqual(200, resp.status_code)
 | 
						|
        got_allow = werkzeug.http.parse_set_header(resp.headers['allow'])
 | 
						|
        self.assertIn('options', got_allow)
 | 
						|
        self.assertIn(method, got_allow)
 | 
						|
        # Options without authentication
 | 
						|
        resp = self.httpsession.options(
 | 
						|
            resolved_url, params=params,
 | 
						|
            allow_redirects=False,
 | 
						|
            headers=merged(base_headers, {
 | 
						|
                'authorization': None,
 | 
						|
            }),
 | 
						|
        )
 | 
						|
        if must_authorize:
 | 
						|
            self.assertEqual(401, resp.status_code)
 | 
						|
        else:
 | 
						|
            self.assertEqual(200, resp.status_code)
 | 
						|
 | 
						|
        # Initial state for conditions
 | 
						|
        resp_head = self.httpsession.head(
 | 
						|
            resolved_url, params=params,
 | 
						|
            headers={'accept-encoding': 'identity'},
 | 
						|
        )
 | 
						|
        init_status = resp_head.status_code
 | 
						|
        self.assertIn(init_status, {200, 404})
 | 
						|
        init_etag = resp_head.headers.get('etag')
 | 
						|
 | 
						|
        if init_status == 200:
 | 
						|
            # Replacing resource
 | 
						|
            if kwargs.get('valid_status') is None:
 | 
						|
                kwargs['valid_status'] = [204]
 | 
						|
            match_etag = init_etag if init_etag else '*'
 | 
						|
            add_headers_fail = {'if-none-match': match_etag}
 | 
						|
            add_headers_good = {'if-match': match_etag}
 | 
						|
 | 
						|
        elif init_status == 404:
 | 
						|
            # New resource
 | 
						|
            if kwargs.get('valid_status') is None:
 | 
						|
                kwargs['valid_status'] = [201]
 | 
						|
            add_headers_fail = {'if-match': '*'}
 | 
						|
            add_headers_good = {'if-none-match': '*'}
 | 
						|
 | 
						|
        if not empty_is_valid:
 | 
						|
            # Invalid header content
 | 
						|
            resp = self.httpsession.request(
 | 
						|
                method, resolved_url, params=params,
 | 
						|
            )
 | 
						|
            self.assertEqual(415, resp.status_code)
 | 
						|
            # Invalid (empty) body content
 | 
						|
            resp = self.httpsession.request(
 | 
						|
                method, resolved_url, params=params,
 | 
						|
                headers=base_headers,
 | 
						|
            )
 | 
						|
            self.assertEqual(415, resp.status_code)
 | 
						|
 | 
						|
        # Check precondition failure
 | 
						|
        resp = self.httpsession.request(
 | 
						|
            method, resolved_url, params=params,
 | 
						|
            headers=merged(base_headers, add_headers_fail),
 | 
						|
            data=reset_up_data(),
 | 
						|
        )
 | 
						|
        self.assertEqual(412, resp.status_code)
 | 
						|
 | 
						|
        if must_authorize:
 | 
						|
            # Unauthorized access with otherwise valid request
 | 
						|
            resp = self.httpsession.request(
 | 
						|
                method, resolved_url, params=params,
 | 
						|
                headers=merged(base_headers, {
 | 
						|
                    'authorization': None,
 | 
						|
                }),
 | 
						|
                data=reset_up_data(),
 | 
						|
            )
 | 
						|
            self.assertEqual(401, resp.status_code)
 | 
						|
 | 
						|
        # Actual modifying request
 | 
						|
        resp_mod = self.httpsession.request(
 | 
						|
            method, resolved_url, params=params,
 | 
						|
            headers=merged(base_headers, add_headers_good),
 | 
						|
            data=reset_up_data(),
 | 
						|
        )
 | 
						|
        self._assert_modify_response(resp_mod, **kwargs)
 | 
						|
        got_modtime = resp_mod.headers.get('last-modified')
 | 
						|
        got_etag = resp_mod.headers.get('etag')
 | 
						|
 | 
						|
        # Verify the same info is present in new HEAD reply
 | 
						|
        resp_head = self.httpsession.head(
 | 
						|
            resolved_url, params=params,
 | 
						|
            headers={'accept-encoding': 'identity'},
 | 
						|
        )
 | 
						|
        self.assertEqual(200, resp_head.status_code)
 | 
						|
        self.assertEqual(got_modtime, resp_head.headers.get('last-modified'))
 | 
						|
        self.assertEqual(got_etag, resp_head.headers.get('etag'))
 | 
						|
 | 
						|
        if is_idempotent:
 | 
						|
            # Check a duplicate request
 | 
						|
            add_headers_good = {'if-match': got_etag}
 | 
						|
            kwargs['valid_status'] = [204]
 | 
						|
            resp_mod = self.httpsession.request(
 | 
						|
                method, resolved_url, params=params,
 | 
						|
                headers=merged(base_headers, add_headers_good),
 | 
						|
                data=reset_up_data(),
 | 
						|
            )
 | 
						|
            self._assert_modify_response(resp_mod, **kwargs)
 | 
						|
            self.assertEqual(got_etag, resp_mod.headers.get('etag'))
 | 
						|
 | 
						|
        # Give back the final valid response
 | 
						|
        return resp_mod
 | 
						|
 | 
						|
    def _assert_modify_response(self, resp, valid_status=None,
 | 
						|
                                require_etag=True, require_lastmod=True,
 | 
						|
                                old_etag=None):
 | 
						|
        ''' Verify the contents of a response to HTTP modification with no body.
 | 
						|
 | 
						|
        :param resp: The response object to check.
 | 
						|
        :type resp: :py:cls:`requests.Response`
 | 
						|
        :param valid_status: A set of valid status codes to allow.
 | 
						|
            If not provided, only codes (200, 201, 204) are valid.
 | 
						|
        :type valid_status: set or None
 | 
						|
        :param require_etag: If not None, whether the ETag is required present
 | 
						|
            or not present (True or False) or a specific string value.
 | 
						|
        :type require_etag: str or bool or None
 | 
						|
        :param require_lastmod: If not None, whether the Last-Modified is
 | 
						|
            required present or not present (True or False) or a specific value.
 | 
						|
        :type require_lastmod: str or bool or None
 | 
						|
        :param old_etag: An optional old ETag value to compare against.
 | 
						|
            The new response must have a different ETag value than this.
 | 
						|
        :type old_etag: str or None
 | 
						|
        '''
 | 
						|
        if valid_status is None:
 | 
						|
            valid_status = [200, 201, 204]
 | 
						|
        valid_status = set(valid_status)
 | 
						|
 | 
						|
        self.assertIn(resp.status_code, valid_status)
 | 
						|
        got_lastmod = resp.headers.get('last-modified')
 | 
						|
        got_etag = resp.headers.get('etag')
 | 
						|
 | 
						|
        if require_etag is True:
 | 
						|
            self.assertIsNotNone(got_etag, msg='ETag header missing')
 | 
						|
        elif require_etag is False:
 | 
						|
            self.assertIsNone(
 | 
						|
                got_etag, msg='ETag header should not be present')
 | 
						|
        elif require_etag is not None:
 | 
						|
            self.assertEqual(require_etag, got_etag)
 | 
						|
 | 
						|
        if require_lastmod is True:
 | 
						|
            self.assertIsNotNone(
 | 
						|
                got_lastmod, msg='Last-Modified header missing')
 | 
						|
        elif require_lastmod is False:
 | 
						|
            self.assertIsNone(
 | 
						|
                got_lastmod, msg='Last-Modified header should not be present')
 | 
						|
        elif require_lastmod is not None:
 | 
						|
            self.assertEqual(require_lastmod, got_lastmod)
 | 
						|
 | 
						|
        if old_etag is not None:
 | 
						|
            self.assertNotEqual(old_etag, got_etag)
 | 
						|
 | 
						|
        # Empty body
 | 
						|
        self.assertFalse(bool(resp.content))
 | 
						|
 | 
						|
 | 
						|
class UserLoginBaseTestCase(BaseTestCase):
 | 
						|
    """Wraps tests in login/logout flow
 | 
						|
 | 
						|
    Encapsulates login/logout wrapping of tests.
 | 
						|
    Tests that require authentication will need to use the saved login_token and cookies as headers in their requests
 | 
						|
    """
 | 
						|
    #: User name to test as this assumes a user HTTPCHECKOUT_ACCTNAME already exists in the User DB
 | 
						|
    VALID_ACCTNAME = os.environ.get(
 | 
						|
        'HTTPCHECKOUT_ACCTNAME', 'admin').decode('utf8')
 | 
						|
    VALID_PASSPHRASE = os.environ.get(
 | 
						|
        'HTTPCHECKOUT_PASSPHRASE', 'admin').decode('utf8')
 | 
						|
 | 
						|
    def __init__(self, *args, **kwargs):
 | 
						|
        BaseTestCase.__init__(self, *args, **kwargs)
 | 
						|
        self.login_token = None
 | 
						|
        self.cookies = None
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        BaseTestCase.setUp(self)
 | 
						|
        resp = self.httpsession.post(
 | 
						|
            self._resolve_url('auth/login'),
 | 
						|
            json={
 | 
						|
                'email': self.VALID_ACCTNAME,
 | 
						|
                'password': self.VALID_PASSPHRASE})
 | 
						|
        LOGGER.debug('code: %s, login headers: %s',
 | 
						|
                     resp.status_code, resp.content)
 | 
						|
        self.cookies = resp.cookies
 | 
						|
        if resp.status_code == 404:
 | 
						|
            self.fail(msg="{} not found on this server.".format(
 | 
						|
                self._resolve_url('auth/login')))
 | 
						|
        try:
 | 
						|
            self.login_token = resp.json()["token"]
 | 
						|
        except ValueError:
 | 
						|
            raise SkipTest("Could not login as {}".format(self.VALID_ACCTNAME))
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        resp = self.httpsession.post(self._resolve_url(
 | 
						|
            'auth/logout'), params={'Authorization': self.login_token})
 | 
						|
        LOGGER.debug('response code: %d\nbody: %s',
 | 
						|
                     resp.status_code, resp.content)
 | 
						|
        self.login_token = None
 | 
						|
        self.cookies = None
 | 
						|
        BaseTestCase.tearDown(self)
 |