Files
2024-03-25 10:11:24 -04:00

968 lines
37 KiB
Python

''' Non-test support objects and classes used by the actual test cases.
'''
import datetime
from io import BytesIO
import logging
import lxml.etree as etree
import os
import re
import requests
import shutil
import tempfile
import unittest
from urlparse import urljoin
import werkzeug.datastructures
import werkzeug.http
import werkzeug.urls
from nose import SkipTest
#: Logger for this module
LOGGER = logging.getLogger(__name__)
#: Regex to match application/xml and applicaiton/sub+xml
XML_CONTENT_RE = re.compile(r'^application/(.+\+)?xml$')
#: Time format for ISO 8601 "basic" time used by XML Schema.
#: This string is usable by datetime.strftime and datetime.strptime
TIME_FORMAT_BASIC = '%Y%m%dT%H%M%SZ'
#: Time format for ISO 8601 "extended" time used by XML Schema.
#: This string is usable by datetime.strftime and datetime.strptime
TIME_FORMAT_EXTENDED = '%Y-%m-%dT%H:%M:%SZ'
#: Absolute path to this package directory
PACKAGE_PATH = os.path.abspath(os.path.dirname(__file__))
def get_xml_parser(schema):
''' Generate a function to extract an XML DOM tree from an encoded document.
:param schema: Iff not None, the document will be validated against
this schema object.
:type use_schema: bool
:return: The parser function which takes a file-like parameter and
returns a tree object of type :py:cls:`lxml.etree.ElementTree`.
'''
xmlparser = etree.XMLParser(schema=schema)
def func(infile):
try:
return etree.parse(infile, parser=xmlparser)
except etree.XMLSyntaxError as err:
infile.seek(0)
with tempfile.NamedTemporaryFile(delete=False) as outfile:
shutil.copyfileobj(infile, outfile)
raise ValueError(
'Failed to parse XML with error {0} in file {1}'.format(
err, outfile.name))
return func
def extract_metadict(doc):
''' Extract a server metadata dictionary from its parsed XML document.
:param doc: The document to read from.
:return: The metadata URL map.
'''
metadict = {}
for el_a in doc.findall('//{http://www.w3.org/1999/xhtml}a'):
m_id = el_a.attrib.get('id')
m_href = el_a.attrib.get('href')
if m_id is None or m_href is None:
continue
metadict[m_id] = m_href
return metadict
def merged(base, delta):
''' Return a merged dictionary contents.
:param base: The initial contents to merge.
:param delta: The modifications to apply.
:return: A dictionary containing the :py:obj:`base` updated
by the :py:obj:`delta`.
'''
mod = dict(base)
mod.update(delta)
return mod
def limit_count(iterable, limit):
''' Wrap an iterable/generator with a count limit to only yield the first
:py:obj:`count` number of items.
:param iterable: The source iterable object.
:param limit: The maximum number of items available from the generator.
:return A generator with a count limit.
'''
count = 0
for item in iterable:
yield item
count += 1
if count >= limit:
return
def modify_etag(orig):
''' Given a base ETag value, generate a modified ETag which is
guaranteed to not match the original.
:param str orig: The original ETag.
:return: A different ETag
'''
# Inject characters near the end
mod = orig[:-1] + '-eh' + orig[-1:]
return mod
class ValidateJsonResponse(object):
''' Validate an expected JSON file response.
'''
def __call__(self, resp):
import json
json.loads(resp.content)
class ValidateHtmlResponse(object):
''' Validate an HTML response with a loose parser.
'''
def __call__(self, resp):
import bs4
kwargs = dict(
markup=resp.content,
features='lxml',
)
bs4.BeautifulSoup(**kwargs)
class ValidateXmlResponse(object):
''' Validate an expected XML file response.
:param parser: A function to take a file-like input and output an
XML element tree :py:cls:`lxml.etree.ElementTree`.
:param require_root: If not None, the root element must be this value.
'''
def __init__(self, parser, require_root=None):
self._parser = parser
if not callable(self._parser):
raise ValueError('ValidateXmlResponse parser invalid')
self._require_root = require_root
def __call__(self, resp):
xml_tree = self._parser(BytesIO(resp.content))
if self._require_root is not None:
root_tag = xml_tree.getroot().tag
if self._require_root != root_tag:
raise ValueError(
'Required root element "{0}" not present, got "{1}"'.format(
self._require_root, root_tag))
class BaseTestCase(unittest.TestCase):
''' Common access and helper functions which use the :py:mod:`unittest`
framework but this class defines no test functions itself.
:ivar httpsession: An :py:class:`requests.Session` instance for test use.
:ivar xmlparser: An :py:class:`etree.XMLParser` instance for test use.
'''
#: Cached URL to start at and to resolve from
BASE_URL = os.environ.get('HTTPCHECKOUT_BASEURL')
#: True if the editing tests should be skipped
READONLY = bool(os.environ.get('HTTPCHECKOUT_READONLY'))
#: Keep any created resources in the tearDown() method which are left behind
KEEP_TESTITEMS = bool(os.environ.get('HTTPCHECKOUT_KEEP_TESTITEMS'))
def setUp(self):
unittest.TestCase.setUp(self)
self.maxDiff = 10e3
self.assertIsNotNone(
self.BASE_URL, 'Missing environment HTTPCHECKOUT_BASEURL')
self.httpsession = requests.Session()
ca_roots = os.environ.get('HTTPCHECKOUT_CACERTS')
LOGGER.info('HTTPCHECKOUT_CACERTS is "%s"', ca_roots)
if ca_roots == '':
import warnings
from urllib3.exceptions import InsecureRequestWarning
self.httpsession.verify = False
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
def tearDown(self):
self.httpsession = None
unittest.TestCase.tearDown(self)
def _assertWriteTest(self):
''' Skip the current test if HTTPCHECKOUT_READONLY is set.
'''
if self.READONLY:
self.skipTest(
'Not running editing tests because of HTTPCHECKOUT_READONLY')
def _resolve_url(self, url):
''' Resolve a URL relative to the original base URL.
:param url: The URL to resolve.
:type url: str
:return: The resolved absolute URL to request on.
:rtype: str
'''
return urljoin(self.BASE_URL, url)
def assertSameUrlPath(self, first, second):
''' Assert that two URLs are equal except for their query/fragment parts.
'''
f_url = werkzeug.urls.url_parse(first)
s_url = werkzeug.urls.url_parse(second)
for attr in ('scheme', 'netloc', 'path'):
self.assertEqual(
getattr(f_url, attr),
getattr(s_url, attr),
'Mismatched URL {0}'.format(attr)
)
def _get_xml_parser(self, use_schema=None):
''' Generate a function to extract an XML DOM tree from an encoded document.
:return: The parser function which takes a file-like parameter and
returns a tree object of type :py:cls:`lxml.etree.ElementTree`.
'''
return get_xml_parser(schema=use_schema)
def _get_xml_encoder(self):
''' Generate a function to encode a document from an XML DOM tree.
:return: The parser function which takes a parameter of a
tree object of type :py:cls:`lxml.etree.ElementTree` and
returns a file-like object.
'''
def func(doc, outfile=None):
''' Encode a document.
:parm doc: The document to encode.
:type doc: :py:cls:`lxml.etree.ElementTree`
:param outfile: An optional file-like object to encode into.
This must be None if the encoder is used multiple times.
:type outfile: file-like or None
:return: The encoded file-like object.
'''
if outfile is None:
outfile = BytesIO()
doc.write(outfile, encoding='UTF-8', xml_declaration=True)
if hasattr(outfile, 'seek'):
try:
outfile.seek(0)
except BaseException:
pass
return outfile
return func
def _test_working_links(self, text):
''' Verify that html has well formed links
:param text: html doc with href's
'''
import bs4
html = bs4.BeautifulSoup(text, 'html.parser')
for link in [a['href'] for a in html.find_all('a')]:
self._test_working_link(link)
def _test_working_link(self, url):
''' Verify that a url returns a 200 response
:param url: The URL to be checked
'''
resolved_url = self._resolve_url(url)
resp = self.httpsession.get(resolved_url)
self.assertEqual(200, resp.status_code)
def _test_options_allow(self, url, methods):
''' Verify that the OPTIONS response for a URL matches a specific set.
:param url: The URL to pass to :py:mod:`requests`
:type url: str
:param methods: The method names which must be identical to the response.
:type methods: iterable
:param params: URL parameter dictionary to pass to :py:mod:`requests`.
:type params: dict or None
'''
methods = set([str(m).upper() for m in methods])
methods.add('OPTIONS')
if 'GET' in methods:
methods.add('HEAD')
resolved_url = self._resolve_url(url)
resp = self.httpsession.options(resolved_url)
self.assertEqual(200, resp.status_code)
got_allow = werkzeug.http.parse_set_header(resp.headers['allow'])
self.assertEqual(methods, set(got_allow))
def _test_path_contents(
self,
url,
params=None,
base_headers=None,
validate_response_pre=None,
validate_response_post=None,
must_authorize=True,
valid_status=None,
content_type=None,
valid_encodings=None,
require_length=True,
require_vary=None,
require_etag=True,
require_lastmod=True,
require_cacheable=True,
cache_must_revalidate=False):
''' Common assertions for static resources.
:param url: The URL to pass to :py:mod:`requests`
:type url: str
:param params: URL parameter dictionary to pass to :py:mod:`requests`.
:type params: dict or None
:param base_headers: A dictionary of headers to send with every request.
:type base_headers: dict or None
:param validate_response_pre: A callable which takes a single argument of
the response object and performs its own validation of the headers
and/or body for each non-cached response.
This is performed before any of the parametric checks.
:type validate_response_pre: callable or None
:param validate_response_post: A callable which takes a single argument of
the response object and performs its own validation of the headers
and/or body for each non-cached response.
This is performed after any of the parametric checks.
:type validate_response_post: callable or None
:param must_authorize: Access to the resource without an Authorization
header is attempted and compared against this value.
:type must_authorize: bool
:param valid_status: A set of valid status codes to allow.
If not provided, only code 200 is valid.
:type valid_status: set or None
:param content_type: If not None, the required Content-Type header.
:type content_type: bool or None
:param valid_encodings: If not None, a list of content encodings to check for.
The resource must provide each of the non-identity encodings listed.
:type valid_encodings: list or None
:param require_length: If either true or false, assert that the
content-length header is present or not.
:type require_length: bool or None
:param require_vary: A set of Vary reults required to be present
in the response.
If the :py:obj:`valid_encodings` list has more than the identity
encoding present, then 'accept-encoding' will be automatically
added to this vary list.
:type require_vary: list or None
:param require_etag: If not None, whether the ETag is required present
or not present (True or False) or a specific string value.
:type require_etag: str or bool or None
:param require_lastmod: If not None, whether the Last-Modified is
required present or not present (True or False) or a specific value.
:type require_lastmod: str or bool or None
:param require_cacheable: If true, the resource is checked for its cacheability.
Not all resources should be cacheable (even if not explicitly marked no-cache).
:type require_cacheable: bool
:param cache_must_revalidate: If True, the response must have its
'must-revalidate' cache control header set.
:type cache_must_revalidate: bool
:raises: raises unittest exceptions if an assertion fails
'''
if base_headers is None:
base_headers = {}
if valid_status is None:
valid_status = [200]
valid_status = set(valid_status)
# Set of valid encodings to require
if valid_encodings is None:
valid_encodings = []
valid_encodings = set(valid_encodings)
valid_encodings.add('identity')
# Force as ordered list with identity encoding first and gzip always
# attempted
try_encodings = set(valid_encodings)
try_encodings.discard('identity')
try_encodings = sorted(list(try_encodings))
try_encodings.insert(0, 'identity')
# Cached identity-encoded contents
identity_body = None
if require_vary is None:
require_vary = []
require_vary = set(require_vary)
if len(valid_encodings) > 1:
require_vary.add('accept-encoding')
resolved_url = self._resolve_url(url)
# Options on the resource itself
resp = self.httpsession.options(
resolved_url, params=params,
allow_redirects=False,
headers=base_headers,
)
self.assertEqual(200, resp.status_code)
got_allow = werkzeug.http.parse_set_header(resp.headers.get('allow'))
self.assertIn('options', got_allow)
if 404 not in valid_status:
self.assertIn('head', got_allow)
self.assertIn('get', got_allow)
# Options without authentication
resp = self.httpsession.options(
resolved_url, params=params,
allow_redirects=False,
headers=merged(base_headers, {
'authorization': None,
}),
)
if must_authorize:
self.assertEqual(401, resp.status_code)
else:
self.assertEqual(200, resp.status_code)
for try_encoding in try_encodings:
# initial non-cache response
enc_headers = merged(
base_headers,
{
'accept-encoding': try_encoding,
}
)
resp = self.httpsession.get(
resolved_url, params=params,
allow_redirects=False,
headers=enc_headers,
stream=True,
)
# External validation first
if validate_response_pre:
try:
validate_response_pre(resp)
except Exception as err:
self.fail('Failed pre-validation: {0}'.format(err))
# Now parametric validation
self.assertIn(resp.status_code, valid_status)
got_content_type = werkzeug.http.parse_options_header(
resp.headers['content-type'])
if content_type is not None:
self.assertEqual(content_type.lower(),
got_content_type[0].lower())
# Encoding comparison compared to valid
got_encoding = resp.headers.get('content-encoding', 'identity')
if try_encoding in valid_encodings:
self.assertEqual(try_encoding, got_encoding)
else:
self.assertEqual(
'identity', got_encoding,
msg='"{0}" was supposed to be a disallowed content-encoding but it was accepted'.format(
try_encoding)
)
got_length = resp.headers.get('content-length')
if require_length is True:
self.assertIsNotNone(got_length, msg='Content-Length missing')
elif require_length is False:
self.assertIsNone(
got_length, msg='Content-Length should not be present')
# Guarantee type is correct also
if got_length is not None:
try:
got_length = int(got_length)
except ValueError:
self.fail(
'Got a non-integer Content-Length: {0}'.format(got_length))
got_vary = werkzeug.http.parse_set_header(resp.headers.get('vary'))
for item in require_vary:
LOGGER.debug("headers: %s", resp.headers)
self.assertIn(
item,
got_vary,
msg='Vary header missing item "{0}" got {1}'.format(
item,
got_vary))
got_etag = resp.headers.get('etag')
got_lastmod = resp.headers.get('last-modified')
if resp.status_code != 204:
if require_etag is True:
self.assertIsNotNone(got_etag, msg='ETag header missing')
elif require_etag is False:
self.assertIsNone(
got_etag, msg='ETag header should not be present')
elif require_etag is not None:
self.assertEqual(require_etag, got_etag)
if require_lastmod is True:
self.assertIsNotNone(
got_lastmod, msg='Last-Modified header missing')
elif require_lastmod is False:
self.assertIsNone(
got_lastmod, msg='Last-Modified header should not be present')
elif require_lastmod is not None:
self.assertEqual(require_lastmod, got_lastmod)
# Caching headers
cache_control = werkzeug.http.parse_cache_control_header(
resp.headers.get('cache-control'),
cls=werkzeug.datastructures.ResponseCacheControl,
)
# The resource must define its domain
if False:
self.assertTrue(
cache_control.no_cache
or cache_control.public # pylint: disable=no-member
or cache_control.private, # pylint: disable=no-member
msg='Missing cache public/private assertion for {0}'.format(
resolved_url)
)
if require_cacheable is not False and cache_must_revalidate is True:
self.assertTrue(
cache_control.must_revalidate) # pylint: disable=no-member
if require_cacheable is True:
self.assertFalse(cache_control.no_cache)
self.assertFalse(cache_control.no_store)
# self.assertLessEqual(0, cache_control.max_age)
elif require_cacheable is False:
# FIXME not always true
self.assertTrue(cache_control.no_cache)
# Actual body content itself
got_body = str(resp.content)
if resp.status_code == 204:
self.assertEqual('', got_body)
else:
# Ensure decoded body is identical
if got_encoding == 'identity':
identity_body = got_body
self.assertIsNotNone(identity_body)
if got_length is not None:
self.assertEqual(len(identity_body), got_length)
else:
self.assertEqual(identity_body, got_body)
# XML specific decoding
if XML_CONTENT_RE.match(
got_content_type[0]) is not None and validate_response_post is None:
validate_response_post = ValidateXmlResponse(
self._get_xml_parser(use_schema=True))
# After all parametric tests on this response
if validate_response_post:
try:
validate_response_post(resp)
except Exception as err:
self.fail('Failed post-validation: {0}'.format(err))
# Check the unauthorized view of same URL
for method in ('GET', 'HEAD'):
resp = self.httpsession.request(
method,
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'authorization': None,
}),
)
if must_authorize:
self.assertEqual(
401,
resp.status_code,
msg='For {0} on {1}: Expected 401 status got {2}'.format(
method,
resolved_url,
resp.status_code))
else:
self.assertIn(
resp.status_code,
valid_status,
msg='For {0} on {1}: Expected valid status got {2}'.format(
method,
resolved_url,
resp.status_code))
# Any resource with cache control header
resp = self.httpsession.head(
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'if-match': '*',
}),
)
self.assertIn(resp.status_code, valid_status)
# Caching with ETag
if got_etag is not None:
self.assertIn(resp.status_code, valid_status)
# Existing resource
resp = self.httpsession.head(
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'if-match': got_etag,
}),
)
self.assertIn(resp.status_code, valid_status)
# Client cache response
resp = self.httpsession.head(
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'if-none-match': got_etag,
}),
)
self.assertIn(resp.status_code, [
304] if require_cacheable else valid_status)
# With adjusted ETag
mod_etag = modify_etag(got_etag)
resp = self.httpsession.head(
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'if-none-match': mod_etag,
}),
)
self.assertIn(resp.status_code, valid_status)
# Caching with Last-Modified
if got_lastmod is not None:
# No changes here so normal response
resp = self.httpsession.head(
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'if-unmodified-since': got_lastmod,
}),
)
# self.assertIn(resp.status_code, valid_status)
# An earlier time will give a 412
new_time = werkzeug.http.parse_date(
got_lastmod) - datetime.timedelta(seconds=5)
resp = self.httpsession.head(
resolved_url, params=params, allow_redirects=False, headers=merged(
enc_headers, {
'if-unmodified-since': werkzeug.http.http_date(new_time), }), )
self.assertIn(resp.status_code, [
412] if require_cacheable else valid_status)
# An later time is normal response
new_time = werkzeug.http.parse_date(
got_lastmod) + datetime.timedelta(seconds=5)
resp = self.httpsession.head(
resolved_url, params=params, allow_redirects=False, headers=merged(
enc_headers, {
'if-unmodified-since': werkzeug.http.http_date(new_time), }), )
self.assertIn(resp.status_code, valid_status)
# Client cache response
resp = self.httpsession.head(
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'if-modified-since': got_lastmod,
}),
)
# self.assertIn(resp.status_code, [304] if require_cacheable else valid_status)
# A later time should also give a 304 response
new_time = werkzeug.http.parse_date(
got_lastmod) + datetime.timedelta(seconds=5)
resp = self.httpsession.head(
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'if-modified-since': werkzeug.http.http_date(new_time),
}),
)
self.assertIn(resp.status_code, [
304] if require_cacheable else valid_status)
# An earlier time will give a 200 response
new_time = werkzeug.http.parse_date(
got_lastmod) - datetime.timedelta(seconds=5)
resp = self.httpsession.head(
resolved_url, params=params,
allow_redirects=False,
headers=merged(enc_headers, {
'if-modified-since': werkzeug.http.http_date(new_time),
}),
)
self.assertIn(resp.status_code, valid_status)
def _test_modify_request(
self, url, up_data, method='POST', params=None, **kwargs):
''' Common assertions for static resources.
:param url: The URL to pass to :py:mod:`requests`
:type url: str
:param up_data: The request body data.
:type up_data: str or file-like
:param str method: The method to request the modification.
:param params: URL parameter dictionary to pass to :py:mod:`requests`.
:type params: dict or None
:param base_headers: A dictionary of headers to send with every request.
:type base_headers: dict or None
:param must_authorize: Access to the resource without an Authorization
header is attempted and compared against this value.
:type must_authorize: bool
:param valid_status: A set of valid status codes to allow.
If not provided, only code 201 is valid for new resources and 204 for existing ones.
:type valid_status: set or None
:param empty_is_valid: Whether or not an empty document is a valid modification.
The default is False.
:type empty_is_valid: bool
:param is_idempotent: Whether or not sending the same request should
not change the resource (except for the modify time).
The default is true if the method is PUT.
:type is_idempotent: bool
:param require_etag: If not None, whether the ETag is required present or not present.
:type require_etag: bool or None
'''
method = method.lower()
# Arguments not passed to _assert_modify_response()
base_headers = kwargs.pop('base_headers', None)
if base_headers is None:
base_headers = {}
must_authorize = kwargs.pop('must_authorize', True)
empty_is_valid = kwargs.pop('empty_is_valid', False)
is_idempotent = kwargs.pop('is_idempotent', method == 'put')
resolved_url = self._resolve_url(url)
if hasattr(up_data, 'seek'):
def reset_up_data():
up_data.seek(0)
return up_data
else:
def reset_up_data():
return up_data
# Options on the resource itself
resp = self.httpsession.options(
resolved_url, params=params,
allow_redirects=False,
headers=base_headers,
)
self.assertEqual(200, resp.status_code)
got_allow = werkzeug.http.parse_set_header(resp.headers['allow'])
self.assertIn('options', got_allow)
self.assertIn(method, got_allow)
# Options without authentication
resp = self.httpsession.options(
resolved_url, params=params,
allow_redirects=False,
headers=merged(base_headers, {
'authorization': None,
}),
)
if must_authorize:
self.assertEqual(401, resp.status_code)
else:
self.assertEqual(200, resp.status_code)
# Initial state for conditions
resp_head = self.httpsession.head(
resolved_url, params=params,
headers={'accept-encoding': 'identity'},
)
init_status = resp_head.status_code
self.assertIn(init_status, {200, 404})
init_etag = resp_head.headers.get('etag')
if init_status == 200:
# Replacing resource
if kwargs.get('valid_status') is None:
kwargs['valid_status'] = [204]
match_etag = init_etag if init_etag else '*'
add_headers_fail = {'if-none-match': match_etag}
add_headers_good = {'if-match': match_etag}
elif init_status == 404:
# New resource
if kwargs.get('valid_status') is None:
kwargs['valid_status'] = [201]
add_headers_fail = {'if-match': '*'}
add_headers_good = {'if-none-match': '*'}
if not empty_is_valid:
# Invalid header content
resp = self.httpsession.request(
method, resolved_url, params=params,
)
self.assertEqual(415, resp.status_code)
# Invalid (empty) body content
resp = self.httpsession.request(
method, resolved_url, params=params,
headers=base_headers,
)
self.assertEqual(415, resp.status_code)
# Check precondition failure
resp = self.httpsession.request(
method, resolved_url, params=params,
headers=merged(base_headers, add_headers_fail),
data=reset_up_data(),
)
self.assertEqual(412, resp.status_code)
if must_authorize:
# Unauthorized access with otherwise valid request
resp = self.httpsession.request(
method, resolved_url, params=params,
headers=merged(base_headers, {
'authorization': None,
}),
data=reset_up_data(),
)
self.assertEqual(401, resp.status_code)
# Actual modifying request
resp_mod = self.httpsession.request(
method, resolved_url, params=params,
headers=merged(base_headers, add_headers_good),
data=reset_up_data(),
)
self._assert_modify_response(resp_mod, **kwargs)
got_modtime = resp_mod.headers.get('last-modified')
got_etag = resp_mod.headers.get('etag')
# Verify the same info is present in new HEAD reply
resp_head = self.httpsession.head(
resolved_url, params=params,
headers={'accept-encoding': 'identity'},
)
self.assertEqual(200, resp_head.status_code)
self.assertEqual(got_modtime, resp_head.headers.get('last-modified'))
self.assertEqual(got_etag, resp_head.headers.get('etag'))
if is_idempotent:
# Check a duplicate request
add_headers_good = {'if-match': got_etag}
kwargs['valid_status'] = [204]
resp_mod = self.httpsession.request(
method, resolved_url, params=params,
headers=merged(base_headers, add_headers_good),
data=reset_up_data(),
)
self._assert_modify_response(resp_mod, **kwargs)
self.assertEqual(got_etag, resp_mod.headers.get('etag'))
# Give back the final valid response
return resp_mod
def _assert_modify_response(self, resp, valid_status=None,
require_etag=True, require_lastmod=True,
old_etag=None):
''' Verify the contents of a response to HTTP modification with no body.
:param resp: The response object to check.
:type resp: :py:cls:`requests.Response`
:param valid_status: A set of valid status codes to allow.
If not provided, only codes (200, 201, 204) are valid.
:type valid_status: set or None
:param require_etag: If not None, whether the ETag is required present
or not present (True or False) or a specific string value.
:type require_etag: str or bool or None
:param require_lastmod: If not None, whether the Last-Modified is
required present or not present (True or False) or a specific value.
:type require_lastmod: str or bool or None
:param old_etag: An optional old ETag value to compare against.
The new response must have a different ETag value than this.
:type old_etag: str or None
'''
if valid_status is None:
valid_status = [200, 201, 204]
valid_status = set(valid_status)
self.assertIn(resp.status_code, valid_status)
got_lastmod = resp.headers.get('last-modified')
got_etag = resp.headers.get('etag')
if require_etag is True:
self.assertIsNotNone(got_etag, msg='ETag header missing')
elif require_etag is False:
self.assertIsNone(
got_etag, msg='ETag header should not be present')
elif require_etag is not None:
self.assertEqual(require_etag, got_etag)
if require_lastmod is True:
self.assertIsNotNone(
got_lastmod, msg='Last-Modified header missing')
elif require_lastmod is False:
self.assertIsNone(
got_lastmod, msg='Last-Modified header should not be present')
elif require_lastmod is not None:
self.assertEqual(require_lastmod, got_lastmod)
if old_etag is not None:
self.assertNotEqual(old_etag, got_etag)
# Empty body
self.assertFalse(bool(resp.content))
class UserLoginBaseTestCase(BaseTestCase):
"""Wraps tests in login/logout flow
Encapsulates login/logout wrapping of tests.
Tests that require authentication will need to use the saved login_token and cookies as headers in their requests
"""
#: User name to test as this assumes a user HTTPCHECKOUT_ACCTNAME already exists in the User DB
VALID_ACCTNAME = os.environ.get(
'HTTPCHECKOUT_ACCTNAME', 'admin').decode('utf8')
VALID_PASSPHRASE = os.environ.get(
'HTTPCHECKOUT_PASSPHRASE', 'admin').decode('utf8')
def __init__(self, *args, **kwargs):
BaseTestCase.__init__(self, *args, **kwargs)
self.login_token = None
self.cookies = None
def setUp(self):
BaseTestCase.setUp(self)
resp = self.httpsession.post(
self._resolve_url('auth/login'),
json={
'email': self.VALID_ACCTNAME,
'password': self.VALID_PASSPHRASE})
LOGGER.debug('code: %s, login headers: %s',
resp.status_code, resp.content)
self.cookies = resp.cookies
if resp.status_code == 404:
self.fail(msg="{} not found on this server.".format(
self._resolve_url('auth/login')))
try:
self.login_token = resp.json()["token"]
except ValueError:
raise SkipTest("Could not login as {}".format(self.VALID_ACCTNAME))
def tearDown(self):
resp = self.httpsession.post(self._resolve_url(
'auth/logout'), params={'Authorization': self.login_token})
LOGGER.debug('response code: %d\nbody: %s',
resp.status_code, resp.content)
self.login_token = None
self.cookies = None
BaseTestCase.tearDown(self)