mirror of
https://github.com/Telecominfraproject/openafc_final.git
synced 2025-11-26 19:14:55 +00:00
Migrate final Open AFC Repository
This commit is contained in:
967
http-checkout/rathttpcheckout/base.py
Normal file
967
http-checkout/rathttpcheckout/base.py
Normal file
@@ -0,0 +1,967 @@
|
||||
''' Non-test support objects and classes used by the actual test cases.
|
||||
'''
|
||||
|
||||
import datetime
|
||||
from io import BytesIO
|
||||
import logging
|
||||
import lxml.etree as etree
|
||||
import os
|
||||
import re
|
||||
import requests
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
from urlparse import urljoin
|
||||
import werkzeug.datastructures
|
||||
import werkzeug.http
|
||||
import werkzeug.urls
|
||||
from nose import SkipTest
|
||||
|
||||
#: Logger for this module
LOGGER = logging.getLogger(__name__)

#: Regex to match application/xml and application/sub+xml media types
XML_CONTENT_RE = re.compile(r'^application/(.+\+)?xml$')

#: Time format for ISO 8601 "basic" time used by XML Schema.
#: This string is usable by datetime.strftime and datetime.strptime
TIME_FORMAT_BASIC = '%Y%m%dT%H%M%SZ'
#: Time format for ISO 8601 "extended" time used by XML Schema.
#: This string is usable by datetime.strftime and datetime.strptime
TIME_FORMAT_EXTENDED = '%Y-%m-%dT%H:%M:%SZ'

#: Absolute path to this package directory
PACKAGE_PATH = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
def get_xml_parser(schema):
    ''' Generate a function to extract an XML DOM tree from an encoded document.

    :param schema: If not None, parsed documents will be validated against
        this schema.
    :type schema: :py:cls:`lxml.etree.XMLSchema` or None
    :return: The parser function which takes a file-like parameter and
        returns a tree object of type :py:cls:`lxml.etree.ElementTree`.
        The returned function raises :py:cls:`ValueError` if parsing fails,
        after saving the unparsable input to a temporary file for inspection.
    '''
    xmlparser = etree.XMLParser(schema=schema)

    def func(infile):
        try:
            return etree.parse(infile, parser=xmlparser)
        except etree.XMLSyntaxError as err:
            # Preserve the offending input for post-mortem debugging; the
            # temporary file is intentionally not deleted so its name in the
            # error message stays valid.
            infile.seek(0)
            with tempfile.NamedTemporaryFile(delete=False) as outfile:
                shutil.copyfileobj(infile, outfile)
            raise ValueError(
                'Failed to parse XML with error {0} in file {1}'.format(
                    err, outfile.name))

    return func
|
||||
|
||||
|
||||
def extract_metadict(doc):
    ''' Extract a server metadata dictionary from its parsed XML document.

    Every XHTML anchor that carries both an ``id`` and an ``href``
    attribute contributes one entry to the map.

    :param doc: The document to read from.
    :return: The metadata URL map keyed by anchor ``id``.
    '''
    metadict = {}
    for anchor in doc.findall('//{http://www.w3.org/1999/xhtml}a'):
        ident = anchor.attrib.get('id')
        href = anchor.attrib.get('href')
        if ident is not None and href is not None:
            metadict[ident] = href
    return metadict
|
||||
|
||||
|
||||
def merged(base, delta):
    ''' Return a merged dictionary contents.

    Neither input mapping is modified.

    :param base: The initial contents to merge.
    :param delta: The modifications to apply.
    :return: A new dictionary containing the :py:obj:`base` updated
        by the :py:obj:`delta`.
    '''
    combined = dict(base)
    for key, value in delta.items():
        combined[key] = value
    return combined
|
||||
|
||||
|
||||
def limit_count(iterable, limit):
    ''' Wrap an iterable/generator with a count limit to only yield the first
    :py:obj:`limit` number of items.

    :param iterable: The source iterable object.
    :param limit: The maximum number of items available from the generator.
        A non-positive limit yields no items.
    :return: A generator with a count limit.
    '''
    # BUGFIX: the original yielded an item before checking the count, so a
    # limit of zero (or negative) still produced one item. Guard up front.
    if limit <= 0:
        return
    count = 0
    for item in iterable:
        yield item
        count += 1
        # Stop without consuming any further items from the source.
        if count >= limit:
            return
|
||||
|
||||
|
||||
def modify_etag(orig):
    ''' Given a base ETag value, generate a modified ETag which is
    guaranteed to not match the original.

    :param str orig: The original ETag.
    :return: A different ETag, built by injecting characters just
        before the final character of the original.
    '''
    head = orig[:-1]
    tail = orig[-1:]
    return head + '-eh' + tail
|
||||
|
||||
|
||||
class ValidateJsonResponse(object):
    ''' Validate that a response body is a well-formed JSON document.
    '''

    def __call__(self, resp):
        import json
        body = resp.content
        # Parsing raises ValueError if the body is not valid JSON.
        json.loads(body)
|
||||
|
||||
|
||||
class ValidateHtmlResponse(object):
    ''' Validate an HTML response with a loose parser.
    '''

    def __call__(self, resp):
        import bs4
        # Constructing the soup raises if the markup cannot be handled.
        bs4.BeautifulSoup(markup=resp.content, features='lxml')
|
||||
|
||||
|
||||
class ValidateXmlResponse(object):
    ''' Validate an expected XML file response.

    :param parser: A function to take a file-like input and output an
        XML element tree :py:cls:`lxml.etree.ElementTree`.
    :param require_root: If not None, the root element must be this value.
    :raises ValueError: If :py:obj:`parser` is not callable.
    '''

    def __init__(self, parser, require_root=None):
        self._parser = parser
        if not callable(parser):
            raise ValueError('ValidateXmlResponse parser invalid')
        self._require_root = require_root

    def __call__(self, resp):
        tree = self._parser(BytesIO(resp.content))
        expected = self._require_root
        if expected is None:
            return
        actual = tree.getroot().tag
        if expected != actual:
            raise ValueError(
                'Required root element "{0}" not present, got "{1}"'.format(
                    expected, actual))
|
||||
|
||||
|
||||
class BaseTestCase(unittest.TestCase):
    ''' Common access and helper functions which use the :py:mod:`unittest`
    framework but this class defines no test functions itself.

    :ivar httpsession: An :py:class:`requests.Session` instance for test use.
    :ivar xmlparser: An :py:class:`etree.XMLParser` instance for test use.
    '''

    #: Cached URL to start at and to resolve from
    BASE_URL = os.environ.get('HTTPCHECKOUT_BASEURL')
    #: True if the editing tests should be skipped.
    #: NOTE(review): any non-empty value (including "0") is truthy — confirm.
    READONLY = bool(os.environ.get('HTTPCHECKOUT_READONLY'))
    #: Keep any created resources in the tearDown() method which are left behind
    KEEP_TESTITEMS = bool(os.environ.get('HTTPCHECKOUT_KEEP_TESTITEMS'))
|
||||
|
||||
def setUp(self):
|
||||
unittest.TestCase.setUp(self)
|
||||
self.maxDiff = 10e3
|
||||
|
||||
self.assertIsNotNone(
|
||||
self.BASE_URL, 'Missing environment HTTPCHECKOUT_BASEURL')
|
||||
self.httpsession = requests.Session()
|
||||
|
||||
ca_roots = os.environ.get('HTTPCHECKOUT_CACERTS')
|
||||
LOGGER.info('HTTPCHECKOUT_CACERTS is "%s"', ca_roots)
|
||||
if ca_roots == '':
|
||||
import warnings
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
self.httpsession.verify = False
|
||||
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
|
||||
|
||||
    def tearDown(self):
        ''' Release the HTTP session and chain to the base-class teardown. '''
        self.httpsession = None
        unittest.TestCase.tearDown(self)
|
||||
|
||||
def _assertWriteTest(self):
|
||||
''' Skip the current test if HTTPCHECKOUT_READONLY is set.
|
||||
'''
|
||||
if self.READONLY:
|
||||
self.skipTest(
|
||||
'Not running editing tests because of HTTPCHECKOUT_READONLY')
|
||||
|
||||
def _resolve_url(self, url):
|
||||
''' Resolve a URL relative to the original base URL.
|
||||
|
||||
:param url: The URL to resolve.
|
||||
:type url: str
|
||||
:return: The resolved absolute URL to request on.
|
||||
:rtype: str
|
||||
'''
|
||||
return urljoin(self.BASE_URL, url)
|
||||
|
||||
def assertSameUrlPath(self, first, second):
|
||||
''' Assert that two URLs are equal except for their query/fragment parts.
|
||||
'''
|
||||
f_url = werkzeug.urls.url_parse(first)
|
||||
s_url = werkzeug.urls.url_parse(second)
|
||||
|
||||
for attr in ('scheme', 'netloc', 'path'):
|
||||
self.assertEqual(
|
||||
getattr(f_url, attr),
|
||||
getattr(s_url, attr),
|
||||
'Mismatched URL {0}'.format(attr)
|
||||
)
|
||||
|
||||
def _get_xml_parser(self, use_schema=None):
|
||||
''' Generate a function to extract an XML DOM tree from an encoded document.
|
||||
|
||||
:return: The parser function which takes a file-like parameter and
|
||||
returns a tree object of type :py:cls:`lxml.etree.ElementTree`.
|
||||
'''
|
||||
return get_xml_parser(schema=use_schema)
|
||||
|
||||
def _get_xml_encoder(self):
|
||||
''' Generate a function to encode a document from an XML DOM tree.
|
||||
|
||||
:return: The parser function which takes a parameter of a
|
||||
tree object of type :py:cls:`lxml.etree.ElementTree` and
|
||||
returns a file-like object.
|
||||
'''
|
||||
|
||||
def func(doc, outfile=None):
|
||||
''' Encode a document.
|
||||
|
||||
:parm doc: The document to encode.
|
||||
:type doc: :py:cls:`lxml.etree.ElementTree`
|
||||
:param outfile: An optional file-like object to encode into.
|
||||
This must be None if the encoder is used multiple times.
|
||||
:type outfile: file-like or None
|
||||
:return: The encoded file-like object.
|
||||
'''
|
||||
if outfile is None:
|
||||
outfile = BytesIO()
|
||||
|
||||
doc.write(outfile, encoding='UTF-8', xml_declaration=True)
|
||||
if hasattr(outfile, 'seek'):
|
||||
try:
|
||||
outfile.seek(0)
|
||||
except BaseException:
|
||||
pass
|
||||
return outfile
|
||||
|
||||
return func
|
||||
|
||||
def _test_working_links(self, text):
|
||||
''' Verify that html has well formed links
|
||||
|
||||
:param text: html doc with href's
|
||||
'''
|
||||
|
||||
import bs4
|
||||
html = bs4.BeautifulSoup(text, 'html.parser')
|
||||
for link in [a['href'] for a in html.find_all('a')]:
|
||||
self._test_working_link(link)
|
||||
|
||||
def _test_working_link(self, url):
|
||||
''' Verify that a url returns a 200 response
|
||||
|
||||
:param url: The URL to be checked
|
||||
'''
|
||||
|
||||
resolved_url = self._resolve_url(url)
|
||||
resp = self.httpsession.get(resolved_url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
def _test_options_allow(self, url, methods):
|
||||
''' Verify that the OPTIONS response for a URL matches a specific set.
|
||||
|
||||
:param url: The URL to pass to :py:mod:`requests`
|
||||
:type url: str
|
||||
:param methods: The method names which must be identical to the response.
|
||||
:type methods: iterable
|
||||
:param params: URL parameter dictionary to pass to :py:mod:`requests`.
|
||||
:type params: dict or None
|
||||
'''
|
||||
methods = set([str(m).upper() for m in methods])
|
||||
methods.add('OPTIONS')
|
||||
if 'GET' in methods:
|
||||
methods.add('HEAD')
|
||||
|
||||
resolved_url = self._resolve_url(url)
|
||||
resp = self.httpsession.options(resolved_url)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
got_allow = werkzeug.http.parse_set_header(resp.headers['allow'])
|
||||
self.assertEqual(methods, set(got_allow))
|
||||
|
||||
    def _test_path_contents(
            self,
            url,
            params=None,
            base_headers=None,
            validate_response_pre=None,
            validate_response_post=None,
            must_authorize=True,
            valid_status=None,
            content_type=None,
            valid_encodings=None,
            require_length=True,
            require_vary=None,
            require_etag=True,
            require_lastmod=True,
            require_cacheable=True,
            cache_must_revalidate=False):
        ''' Common assertions for static resources.

        :param url: The URL to pass to :py:mod:`requests`
        :type url: str
        :param params: URL parameter dictionary to pass to :py:mod:`requests`.
        :type params: dict or None
        :param base_headers: A dictionary of headers to send with every request.
        :type base_headers: dict or None
        :param validate_response_pre: A callable which takes a single argument of
            the response object and performs its own validation of the headers
            and/or body for each non-cached response.
            This is performed before any of the parametric checks.
        :type validate_response_pre: callable or None
        :param validate_response_post: A callable which takes a single argument of
            the response object and performs its own validation of the headers
            and/or body for each non-cached response.
            This is performed after any of the parametric checks.
        :type validate_response_post: callable or None
        :param must_authorize: Access to the resource without an Authorization
            header is attempted and compared against this value.
        :type must_authorize: bool
        :param valid_status: A set of valid status codes to allow.
            If not provided, only code 200 is valid.
        :type valid_status: set or None
        :param content_type: If not None, the required Content-Type header.
        :type content_type: bool or None
        :param valid_encodings: If not None, a list of content encodings to check for.
            The resource must provide each of the non-identity encodings listed.
        :type valid_encodings: list or None
        :param require_length: If either true or false, assert that the
            content-length header is present or not.
        :type require_length: bool or None
        :param require_vary: A set of Vary results required to be present
            in the response.
            If the :py:obj:`valid_encodings` list has more than the identity
            encoding present, then 'accept-encoding' will be automatically
            added to this vary list.
        :type require_vary: list or None
        :param require_etag: If not None, whether the ETag is required present
            or not present (True or False) or a specific string value.
        :type require_etag: str or bool or None
        :param require_lastmod: If not None, whether the Last-Modified is
            required present or not present (True or False) or a specific value.
        :type require_lastmod: str or bool or None
        :param require_cacheable: If true, the resource is checked for its cacheability.
            Not all resources should be cacheable (even if not explicitly marked no-cache).
        :type require_cacheable: bool
        :param cache_must_revalidate: If True, the response must have its
            'must-revalidate' cache control header set.
        :type cache_must_revalidate: bool
        :raises: raises unittest exceptions if an assertion fails
        '''

        if base_headers is None:
            base_headers = {}

        if valid_status is None:
            valid_status = [200]
        valid_status = set(valid_status)

        # Set of valid encodings to require
        if valid_encodings is None:
            valid_encodings = []
        valid_encodings = set(valid_encodings)
        valid_encodings.add('identity')
        # Force as ordered list with identity encoding first and gzip always
        # attempted
        try_encodings = set(valid_encodings)
        try_encodings.discard('identity')
        try_encodings = sorted(list(try_encodings))
        try_encodings.insert(0, 'identity')
        # Cached identity-encoded contents
        identity_body = None

        if require_vary is None:
            require_vary = []
        require_vary = set(require_vary)
        if len(valid_encodings) > 1:
            require_vary.add('accept-encoding')

        resolved_url = self._resolve_url(url)

        # Options on the resource itself
        resp = self.httpsession.options(
            resolved_url, params=params,
            allow_redirects=False,
            headers=base_headers,
        )
        self.assertEqual(200, resp.status_code)
        got_allow = werkzeug.http.parse_set_header(resp.headers.get('allow'))
        self.assertIn('options', got_allow)
        if 404 not in valid_status:
            self.assertIn('head', got_allow)
            self.assertIn('get', got_allow)

        # Options without authentication
        resp = self.httpsession.options(
            resolved_url, params=params,
            allow_redirects=False,
            headers=merged(base_headers, {
                'authorization': None,
            }),
        )
        if must_authorize:
            self.assertEqual(401, resp.status_code)
        else:
            self.assertEqual(200, resp.status_code)

        for try_encoding in try_encodings:
            # initial non-cache response
            enc_headers = merged(
                base_headers,
                {
                    'accept-encoding': try_encoding,
                }
            )
            resp = self.httpsession.get(
                resolved_url, params=params,
                allow_redirects=False,
                headers=enc_headers,
                stream=True,
            )
            # External validation first
            if validate_response_pre:
                try:
                    validate_response_pre(resp)
                except Exception as err:
                    self.fail('Failed pre-validation: {0}'.format(err))

            # Now parametric validation
            self.assertIn(resp.status_code, valid_status)

            got_content_type = werkzeug.http.parse_options_header(
                resp.headers['content-type'])
            if content_type is not None:
                self.assertEqual(content_type.lower(),
                                 got_content_type[0].lower())

            # Encoding comparison compared to valid
            got_encoding = resp.headers.get('content-encoding', 'identity')
            if try_encoding in valid_encodings:
                self.assertEqual(try_encoding, got_encoding)
            else:
                self.assertEqual(
                    'identity', got_encoding,
                    msg='"{0}" was supposed to be a disallowed content-encoding but it was accepted'.format(
                        try_encoding)
                )

            got_length = resp.headers.get('content-length')
            if require_length is True:
                self.assertIsNotNone(got_length, msg='Content-Length missing')
            elif require_length is False:
                self.assertIsNone(
                    got_length, msg='Content-Length should not be present')

            # Guarantee type is correct also
            if got_length is not None:
                try:
                    got_length = int(got_length)
                except ValueError:
                    self.fail(
                        'Got a non-integer Content-Length: {0}'.format(got_length))

            got_vary = werkzeug.http.parse_set_header(resp.headers.get('vary'))
            for item in require_vary:
                LOGGER.debug("headers: %s", resp.headers)
                self.assertIn(
                    item,
                    got_vary,
                    msg='Vary header missing item "{0}" got {1}'.format(
                        item,
                        got_vary))

            got_etag = resp.headers.get('etag')
            got_lastmod = resp.headers.get('last-modified')
            # 204 responses carry no representation, so skip validator checks
            if resp.status_code != 204:
                if require_etag is True:
                    self.assertIsNotNone(got_etag, msg='ETag header missing')
                elif require_etag is False:
                    self.assertIsNone(
                        got_etag, msg='ETag header should not be present')
                elif require_etag is not None:
                    self.assertEqual(require_etag, got_etag)

                if require_lastmod is True:
                    self.assertIsNotNone(
                        got_lastmod, msg='Last-Modified header missing')
                elif require_lastmod is False:
                    self.assertIsNone(
                        got_lastmod, msg='Last-Modified header should not be present')
                elif require_lastmod is not None:
                    self.assertEqual(require_lastmod, got_lastmod)

            # Caching headers
            cache_control = werkzeug.http.parse_cache_control_header(
                resp.headers.get('cache-control'),
                cls=werkzeug.datastructures.ResponseCacheControl,
            )
            # The resource must define its domain
            # NOTE(review): this check is intentionally disabled (`if False`);
            # confirm whether it should be removed or re-enabled.
            if False:
                self.assertTrue(
                    cache_control.no_cache
                    or cache_control.public  # pylint: disable=no-member
                    or cache_control.private,  # pylint: disable=no-member
                    msg='Missing cache public/private assertion for {0}'.format(
                        resolved_url)
                )
            if require_cacheable is not False and cache_must_revalidate is True:
                self.assertTrue(
                    cache_control.must_revalidate)  # pylint: disable=no-member
            if require_cacheable is True:
                self.assertFalse(cache_control.no_cache)
                self.assertFalse(cache_control.no_store)
                # self.assertLessEqual(0, cache_control.max_age)
            elif require_cacheable is False:
                # FIXME not always true
                self.assertTrue(cache_control.no_cache)

            # Actual body content itself
            # NOTE(review): str(resp.content) assumes Python 2 (bytes == str);
            # under Python 3 this would produce a "b'...'" repr — confirm.
            got_body = str(resp.content)
            if resp.status_code == 204:
                self.assertEqual('', got_body)
            else:
                # Ensure decoded body is identical
                if got_encoding == 'identity':
                    identity_body = got_body
                    self.assertIsNotNone(identity_body)
                    if got_length is not None:
                        self.assertEqual(len(identity_body), got_length)
                else:
                    self.assertEqual(identity_body, got_body)

            # XML specific decoding
            # NOTE(review): use_schema=True passes True where the module-level
            # factory expects a schema object (or None) — confirm intent.
            if XML_CONTENT_RE.match(
                    got_content_type[0]) is not None and validate_response_post is None:
                validate_response_post = ValidateXmlResponse(
                    self._get_xml_parser(use_schema=True))

            # After all parametric tests on this response
            if validate_response_post:
                try:
                    validate_response_post(resp)
                except Exception as err:
                    self.fail('Failed post-validation: {0}'.format(err))

            # Check the unauthorized view of same URL
            for method in ('GET', 'HEAD'):
                resp = self.httpsession.request(
                    method,
                    resolved_url, params=params,
                    allow_redirects=False,
                    headers=merged(enc_headers, {
                        'authorization': None,
                    }),
                )
                if must_authorize:
                    self.assertEqual(
                        401,
                        resp.status_code,
                        msg='For {0} on {1}: Expected 401 status got {2}'.format(
                            method,
                            resolved_url,
                            resp.status_code))
                else:
                    self.assertIn(
                        resp.status_code,
                        valid_status,
                        msg='For {0} on {1}: Expected valid status got {2}'.format(
                            method,
                            resolved_url,
                            resp.status_code))

            # Any resource with cache control header
            resp = self.httpsession.head(
                resolved_url, params=params,
                allow_redirects=False,
                headers=merged(enc_headers, {
                    'if-match': '*',
                }),
            )
            self.assertIn(resp.status_code, valid_status)
            # Caching with ETag
            if got_etag is not None:
                self.assertIn(resp.status_code, valid_status)
                # Existing resource
                resp = self.httpsession.head(
                    resolved_url, params=params,
                    allow_redirects=False,
                    headers=merged(enc_headers, {
                        'if-match': got_etag,
                    }),
                )
                self.assertIn(resp.status_code, valid_status)
                # Client cache response
                resp = self.httpsession.head(
                    resolved_url, params=params,
                    allow_redirects=False,
                    headers=merged(enc_headers, {
                        'if-none-match': got_etag,
                    }),
                )
                self.assertIn(resp.status_code, [
                    304] if require_cacheable else valid_status)
                # With adjusted ETag
                mod_etag = modify_etag(got_etag)
                resp = self.httpsession.head(
                    resolved_url, params=params,
                    allow_redirects=False,
                    headers=merged(enc_headers, {
                        'if-none-match': mod_etag,
                    }),
                )
                self.assertIn(resp.status_code, valid_status)

            # Caching with Last-Modified
            if got_lastmod is not None:
                # No changes here so normal response
                resp = self.httpsession.head(
                    resolved_url, params=params,
                    allow_redirects=False,
                    headers=merged(enc_headers, {
                        'if-unmodified-since': got_lastmod,
                    }),
                )
                # self.assertIn(resp.status_code, valid_status)

                # An earlier time will give a 412
                new_time = werkzeug.http.parse_date(
                    got_lastmod) - datetime.timedelta(seconds=5)
                resp = self.httpsession.head(
                    resolved_url, params=params, allow_redirects=False, headers=merged(
                        enc_headers, {
                            'if-unmodified-since': werkzeug.http.http_date(new_time), }), )
                self.assertIn(resp.status_code, [
                    412] if require_cacheable else valid_status)

                # An later time is normal response
                new_time = werkzeug.http.parse_date(
                    got_lastmod) + datetime.timedelta(seconds=5)
                resp = self.httpsession.head(
                    resolved_url, params=params, allow_redirects=False, headers=merged(
                        enc_headers, {
                            'if-unmodified-since': werkzeug.http.http_date(new_time), }), )
                self.assertIn(resp.status_code, valid_status)

                # Client cache response
                resp = self.httpsession.head(
                    resolved_url, params=params,
                    allow_redirects=False,
                    headers=merged(enc_headers, {
                        'if-modified-since': got_lastmod,
                    }),
                )
                # self.assertIn(resp.status_code, [304] if require_cacheable else valid_status)

                # A later time should also give a 304 response
                new_time = werkzeug.http.parse_date(
                    got_lastmod) + datetime.timedelta(seconds=5)
                resp = self.httpsession.head(
                    resolved_url, params=params,
                    allow_redirects=False,
                    headers=merged(enc_headers, {
                        'if-modified-since': werkzeug.http.http_date(new_time),
                    }),
                )
                self.assertIn(resp.status_code, [
                    304] if require_cacheable else valid_status)

                # An earlier time will give a 200 response
                new_time = werkzeug.http.parse_date(
                    got_lastmod) - datetime.timedelta(seconds=5)
                resp = self.httpsession.head(
                    resolved_url, params=params,
                    allow_redirects=False,
                    headers=merged(enc_headers, {
                        'if-modified-since': werkzeug.http.http_date(new_time),
                    }),
                )
                self.assertIn(resp.status_code, valid_status)
|
||||
|
||||
    def _test_modify_request(
            self, url, up_data, method='POST', params=None, **kwargs):
        ''' Common assertions for requests which modify a resource.

        :param url: The URL to pass to :py:mod:`requests`
        :type url: str
        :param up_data: The request body data.
        :type up_data: str or file-like
        :param str method: The method to request the modification.
        :param params: URL parameter dictionary to pass to :py:mod:`requests`.
        :type params: dict or None
        :param base_headers: A dictionary of headers to send with every request.
        :type base_headers: dict or None
        :param must_authorize: Access to the resource without an Authorization
            header is attempted and compared against this value.
        :type must_authorize: bool
        :param valid_status: A set of valid status codes to allow.
            If not provided, only code 201 is valid for new resources and 204 for existing ones.
        :type valid_status: set or None
        :param empty_is_valid: Whether or not an empty document is a valid modification.
            The default is False.
        :type empty_is_valid: bool
        :param is_idempotent: Whether or not sending the same request should
            not change the resource (except for the modify time).
            The default is true if the method is PUT.
        :type is_idempotent: bool
        :param require_etag: If not None, whether the ETag is required present or not present.
        :type require_etag: bool or None
        :return: The final valid modification response.
        '''
        method = method.lower()
        # Arguments not passed to _assert_modify_response()
        base_headers = kwargs.pop('base_headers', None)
        if base_headers is None:
            base_headers = {}
        must_authorize = kwargs.pop('must_authorize', True)
        empty_is_valid = kwargs.pop('empty_is_valid', False)
        is_idempotent = kwargs.pop('is_idempotent', method == 'put')

        resolved_url = self._resolve_url(url)

        # reset_up_data() returns the body for each attempt, rewinding
        # file-like data so it can be re-sent on subsequent requests.
        if hasattr(up_data, 'seek'):

            def reset_up_data():
                up_data.seek(0)
                return up_data

        else:

            def reset_up_data():
                return up_data

        # Options on the resource itself
        resp = self.httpsession.options(
            resolved_url, params=params,
            allow_redirects=False,
            headers=base_headers,
        )
        self.assertEqual(200, resp.status_code)
        got_allow = werkzeug.http.parse_set_header(resp.headers['allow'])
        self.assertIn('options', got_allow)
        self.assertIn(method, got_allow)
        # Options without authentication
        resp = self.httpsession.options(
            resolved_url, params=params,
            allow_redirects=False,
            headers=merged(base_headers, {
                'authorization': None,
            }),
        )
        if must_authorize:
            self.assertEqual(401, resp.status_code)
        else:
            self.assertEqual(200, resp.status_code)

        # Initial state for conditions
        resp_head = self.httpsession.head(
            resolved_url, params=params,
            headers={'accept-encoding': 'identity'},
        )
        init_status = resp_head.status_code
        # Any other status fails here, so the branches below are exhaustive.
        self.assertIn(init_status, {200, 404})
        init_etag = resp_head.headers.get('etag')

        if init_status == 200:
            # Replacing resource
            if kwargs.get('valid_status') is None:
                kwargs['valid_status'] = [204]
            match_etag = init_etag if init_etag else '*'
            add_headers_fail = {'if-none-match': match_etag}
            add_headers_good = {'if-match': match_etag}

        elif init_status == 404:
            # New resource
            if kwargs.get('valid_status') is None:
                kwargs['valid_status'] = [201]
            add_headers_fail = {'if-match': '*'}
            add_headers_good = {'if-none-match': '*'}

        if not empty_is_valid:
            # Invalid header content
            resp = self.httpsession.request(
                method, resolved_url, params=params,
            )
            self.assertEqual(415, resp.status_code)
            # Invalid (empty) body content
            resp = self.httpsession.request(
                method, resolved_url, params=params,
                headers=base_headers,
            )
            self.assertEqual(415, resp.status_code)

        # Check precondition failure
        resp = self.httpsession.request(
            method, resolved_url, params=params,
            headers=merged(base_headers, add_headers_fail),
            data=reset_up_data(),
        )
        self.assertEqual(412, resp.status_code)

        if must_authorize:
            # Unauthorized access with otherwise valid request
            resp = self.httpsession.request(
                method, resolved_url, params=params,
                headers=merged(base_headers, {
                    'authorization': None,
                }),
                data=reset_up_data(),
            )
            self.assertEqual(401, resp.status_code)

        # Actual modifying request
        resp_mod = self.httpsession.request(
            method, resolved_url, params=params,
            headers=merged(base_headers, add_headers_good),
            data=reset_up_data(),
        )
        self._assert_modify_response(resp_mod, **kwargs)
        got_modtime = resp_mod.headers.get('last-modified')
        got_etag = resp_mod.headers.get('etag')

        # Verify the same info is present in new HEAD reply
        resp_head = self.httpsession.head(
            resolved_url, params=params,
            headers={'accept-encoding': 'identity'},
        )
        self.assertEqual(200, resp_head.status_code)
        self.assertEqual(got_modtime, resp_head.headers.get('last-modified'))
        self.assertEqual(got_etag, resp_head.headers.get('etag'))

        if is_idempotent:
            # Check a duplicate request: the resource now exists, so a
            # replay conditioned on the new ETag must report 204 and keep
            # the same ETag.
            add_headers_good = {'if-match': got_etag}
            kwargs['valid_status'] = [204]
            resp_mod = self.httpsession.request(
                method, resolved_url, params=params,
                headers=merged(base_headers, add_headers_good),
                data=reset_up_data(),
            )
            self._assert_modify_response(resp_mod, **kwargs)
            self.assertEqual(got_etag, resp_mod.headers.get('etag'))

        # Give back the final valid response
        return resp_mod
|
||||
|
||||
def _assert_modify_response(self, resp, valid_status=None,
|
||||
require_etag=True, require_lastmod=True,
|
||||
old_etag=None):
|
||||
''' Verify the contents of a response to HTTP modification with no body.
|
||||
|
||||
:param resp: The response object to check.
|
||||
:type resp: :py:cls:`requests.Response`
|
||||
:param valid_status: A set of valid status codes to allow.
|
||||
If not provided, only codes (200, 201, 204) are valid.
|
||||
:type valid_status: set or None
|
||||
:param require_etag: If not None, whether the ETag is required present
|
||||
or not present (True or False) or a specific string value.
|
||||
:type require_etag: str or bool or None
|
||||
:param require_lastmod: If not None, whether the Last-Modified is
|
||||
required present or not present (True or False) or a specific value.
|
||||
:type require_lastmod: str or bool or None
|
||||
:param old_etag: An optional old ETag value to compare against.
|
||||
The new response must have a different ETag value than this.
|
||||
:type old_etag: str or None
|
||||
'''
|
||||
if valid_status is None:
|
||||
valid_status = [200, 201, 204]
|
||||
valid_status = set(valid_status)
|
||||
|
||||
self.assertIn(resp.status_code, valid_status)
|
||||
got_lastmod = resp.headers.get('last-modified')
|
||||
got_etag = resp.headers.get('etag')
|
||||
|
||||
if require_etag is True:
|
||||
self.assertIsNotNone(got_etag, msg='ETag header missing')
|
||||
elif require_etag is False:
|
||||
self.assertIsNone(
|
||||
got_etag, msg='ETag header should not be present')
|
||||
elif require_etag is not None:
|
||||
self.assertEqual(require_etag, got_etag)
|
||||
|
||||
if require_lastmod is True:
|
||||
self.assertIsNotNone(
|
||||
got_lastmod, msg='Last-Modified header missing')
|
||||
elif require_lastmod is False:
|
||||
self.assertIsNone(
|
||||
got_lastmod, msg='Last-Modified header should not be present')
|
||||
elif require_lastmod is not None:
|
||||
self.assertEqual(require_lastmod, got_lastmod)
|
||||
|
||||
if old_etag is not None:
|
||||
self.assertNotEqual(old_etag, got_etag)
|
||||
|
||||
# Empty body
|
||||
self.assertFalse(bool(resp.content))
|
||||
|
||||
|
||||
class UserLoginBaseTestCase(BaseTestCase):
    """Wraps tests in login/logout flow

    Encapsulates login/logout wrapping of tests.
    Tests that require authentication will need to use the saved login_token and cookies as headers in their requests
    """
    #: User name to test as this assumes a user HTTPCHECKOUT_ACCTNAME already exists in the User DB
    #: NOTE(review): str.decode() only exists on Python 2 byte strings; this
    #: class body fails at import under Python 3 — confirm target runtime.
    VALID_ACCTNAME = os.environ.get(
        'HTTPCHECKOUT_ACCTNAME', 'admin').decode('utf8')
    #: Passphrase matching VALID_ACCTNAME, defaulting like the account name.
    VALID_PASSPHRASE = os.environ.get(
        'HTTPCHECKOUT_PASSPHRASE', 'admin').decode('utf8')

    def __init__(self, *args, **kwargs):
        BaseTestCase.__init__(self, *args, **kwargs)
        # Session token returned by the auth service; None when logged out.
        self.login_token = None
        # Cookie jar captured from the login response; None when logged out.
        self.cookies = None

    def setUp(self):
        ''' Log in via the auth service and capture the session token/cookies.

        Skips the test (rather than failing) when login does not yield a
        token; fails outright when the login endpoint is absent (404).
        '''
        BaseTestCase.setUp(self)
        resp = self.httpsession.post(
            self._resolve_url('auth/login'),
            json={
                'email': self.VALID_ACCTNAME,
                'password': self.VALID_PASSPHRASE})
        LOGGER.debug('code: %s, login headers: %s',
                     resp.status_code, resp.content)
        self.cookies = resp.cookies
        if resp.status_code == 404:
            self.fail(msg="{} not found on this server.".format(
                self._resolve_url('auth/login')))
        try:
            self.login_token = resp.json()["token"]
        except ValueError:
            raise SkipTest("Could not login as {}".format(self.VALID_ACCTNAME))

    def tearDown(self):
        ''' Log out of the auth service and clear the captured credentials.

        NOTE(review): the token is sent as a URL parameter named
        'Authorization' rather than a header — confirm the service expects
        this.
        '''
        resp = self.httpsession.post(self._resolve_url(
            'auth/logout'), params={'Authorization': self.login_token})
        LOGGER.debug('response code: %d\nbody: %s',
                     resp.status_code, resp.content)
        self.login_token = None
        self.cookies = None
        BaseTestCase.tearDown(self)
|
||||
Reference in New Issue
Block a user