Add initial tests
To make os-http testable we do a little bit of rearranging of the shell so that it returns strings and exceptions rather than printing directly. With this rearranging we can then add a couple of tests to verify basic functionality.
This commit is contained in:
parent
78dfff9891
commit
2477249a40
@ -45,8 +45,44 @@ try:
|
|||||||
except Exception:
|
except Exception:
|
||||||
_occ_version = "unknown"
|
_occ_version = "unknown"
|
||||||
|
|
||||||
|
formatter_name = 'console' if sys.stdout.isatty() else 'text'
|
||||||
|
|
||||||
def main(argv=sys.argv[1:]):
|
|
||||||
|
class ErrorExit(Exception):
|
||||||
|
|
||||||
|
def __init__(self, message, exit_code=1):
|
||||||
|
self.message = message
|
||||||
|
self.exit_code = exit_code
|
||||||
|
|
||||||
|
|
||||||
|
def format_resp(resp):
|
||||||
|
# I can see no way to get the HTTP version
|
||||||
|
headers = ["HTTP/1.1 %d %s" % (resp.status_code, resp.reason or '')]
|
||||||
|
headers.extend('%s: %s' % k for k in resp.headers.items())
|
||||||
|
headers = '\n'.join(headers)
|
||||||
|
|
||||||
|
if 'json' in resp.headers.get('Content-Type', '').lower():
|
||||||
|
body = json.dumps(resp.json(), sort_keys=True, indent=4)
|
||||||
|
else:
|
||||||
|
body = resp.content
|
||||||
|
|
||||||
|
if pygments:
|
||||||
|
mime = resp.headers.get('Content-Type')
|
||||||
|
http_lexer = pygments.lexers.get_lexer_by_name('http')
|
||||||
|
formatter = pygments.formatters.get_formatter_by_name(formatter_name)
|
||||||
|
|
||||||
|
try:
|
||||||
|
body_lexer = pygments.lexers.get_lexer_for_mimetype(mime)
|
||||||
|
except pygments.util.ClassNotFound:
|
||||||
|
body_lexer = pygments.lexers.get_lexer_by_name('text')
|
||||||
|
|
||||||
|
headers = pygments.highlight(headers, http_lexer, formatter)
|
||||||
|
body = pygments.highlight(body, body_lexer, formatter)
|
||||||
|
|
||||||
|
return '\n'.join([headers, '', body])
|
||||||
|
|
||||||
|
|
||||||
|
def run(argv):
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description='Simple HTTP testing for Openstack')
|
description='Simple HTTP testing for Openstack')
|
||||||
|
|
||||||
@ -69,7 +105,7 @@ def main(argv=sys.argv[1:]):
|
|||||||
nargs='*',
|
nargs='*',
|
||||||
help='Additional items')
|
help='Additional items')
|
||||||
|
|
||||||
opts = parser.parse_args()
|
opts = parser.parse_args(argv)
|
||||||
|
|
||||||
if opts.debug:
|
if opts.debug:
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
@ -95,8 +131,7 @@ def main(argv=sys.argv[1:]):
|
|||||||
key, val = item.split(':', 1)
|
key, val = item.split(':', 1)
|
||||||
headers[key] = val
|
headers[key] = val
|
||||||
else:
|
else:
|
||||||
LOG.error("Unknown item: %s", item)
|
raise ErrorExit("Unknown item: %s" % item)
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
resp = adap.request(opts.url,
|
resp = adap.request(opts.url,
|
||||||
@ -121,8 +156,7 @@ def main(argv=sys.argv[1:]):
|
|||||||
"result in an unscoped token. Please check your "
|
"result in an unscoped token. Please check your "
|
||||||
"authentication credentials.")
|
"authentication credentials.")
|
||||||
|
|
||||||
LOG.error(message)
|
raise ErrorExit(message)
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
except exceptions.EndpointNotFound:
|
except exceptions.EndpointNotFound:
|
||||||
service_params = ('service_type',
|
service_params = ('service_type',
|
||||||
@ -133,35 +167,17 @@ def main(argv=sys.argv[1:]):
|
|||||||
|
|
||||||
query = ", ".join("%s=%s" % (p, getattr(adap, p))
|
query = ", ".join("%s=%s" % (p, getattr(adap, p))
|
||||||
for p in service_params if getattr(adap, p))
|
for p in service_params if getattr(adap, p))
|
||||||
LOG.error("Failed to find an endpoint in the service catalog that "
|
raise ErrorExit("Failed to find an endpoint in the service catalog "
|
||||||
"matches your query: %s", query)
|
"that matches your query: %s" % query)
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
# I can see no way to get the HTTP version
|
return format_resp(resp)
|
||||||
headers = ["HTTP/1.1 %d %s" % (resp.status_code, resp.reason)]
|
|
||||||
headers.extend('%s: %s' % k for k in resp.headers.items())
|
|
||||||
headers = '\n'.join(headers)
|
|
||||||
|
|
||||||
if 'json' in resp.headers.get('Content-Type', '').lower():
|
|
||||||
body = json.dumps(resp.json(), sort_keys=True, indent=4)
|
def main(argv=sys.argv[1:]):
|
||||||
|
try:
|
||||||
|
output = run(argv)
|
||||||
|
except ErrorExit as e:
|
||||||
|
LOG.error(e.message)
|
||||||
|
sys.exit(e.exit_code)
|
||||||
else:
|
else:
|
||||||
body = resp.content
|
print(output)
|
||||||
|
|
||||||
if pygments:
|
|
||||||
mime = resp.headers.get('Content-Type')
|
|
||||||
http_lexer = pygments.lexers.get_lexer_by_name('http')
|
|
||||||
|
|
||||||
formatter_name = 'console' if sys.stdout.isatty() else 'text'
|
|
||||||
formatter = pygments.formatters.get_formatter_by_name(formatter_name)
|
|
||||||
|
|
||||||
try:
|
|
||||||
body_lexer = pygments.lexers.get_lexer_for_mimetype(mime)
|
|
||||||
except pygments.util.ClassNotFound:
|
|
||||||
body_lexer = pygments.lexers.get_lexer_by_name('text')
|
|
||||||
|
|
||||||
headers = pygments.highlight(headers, http_lexer, formatter)
|
|
||||||
body = pygments.highlight(body, body_lexer, formatter)
|
|
||||||
|
|
||||||
print(headers)
|
|
||||||
print('')
|
|
||||||
print(body)
|
|
||||||
|
@ -15,9 +15,14 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import fixtures
|
||||||
from oslotest import base
|
from oslotest import base
|
||||||
|
from requests_mock.contrib import fixture
|
||||||
|
|
||||||
|
|
||||||
class TestCase(base.BaseTestCase):
|
class TestCase(base.BaseTestCase):
|
||||||
|
|
||||||
"""Test case base class for all unit tests."""
|
def setUp(self):
|
||||||
|
super(TestCase, self).setUp()
|
||||||
|
self.requests_mock = self.useFixture(fixture.Fixture())
|
||||||
|
self.logger_mock = self.useFixture(fixtures.FakeLogger())
|
||||||
|
@ -19,10 +19,142 @@ test_os_http
|
|||||||
Tests for `os_http` module.
|
Tests for `os_http` module.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import fixtures
|
||||||
|
from keystoneauth1 import fixture
|
||||||
|
import requests_mock
|
||||||
|
from testtools import matchers
|
||||||
|
|
||||||
|
from os_http import shell
|
||||||
from os_http.tests import base
|
from os_http.tests import base
|
||||||
|
|
||||||
|
AUTH_URL = 'http://openstack.example.com:5000'
|
||||||
|
|
||||||
class TestOs_http(base.TestCase):
|
PUBLIC_SERVICE_URL = 'http://public.example.com:9292'
|
||||||
|
ADMIN_SERVICE_URL = 'http://admin.example.com:9292'
|
||||||
|
INTERNAL_SERVICE_URL = 'http://internal.example.com:9292'
|
||||||
|
SERVICE_REGION = uuid.uuid4().hex
|
||||||
|
|
||||||
def test_something(self):
|
|
||||||
pass
|
class TestInputs(base.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestInputs, self).setUp()
|
||||||
|
|
||||||
|
disc = fixture.DiscoveryList(href=AUTH_URL, v2=False)
|
||||||
|
|
||||||
|
self.service_type = uuid.uuid4().hex
|
||||||
|
self.service_id = uuid.uuid4().hex
|
||||||
|
self.service_name = uuid.uuid4().hex
|
||||||
|
|
||||||
|
self.user_id = uuid.uuid4().hex
|
||||||
|
self.username = uuid.uuid4().hex
|
||||||
|
self.project_id = uuid.uuid4().hex
|
||||||
|
self.project_name = uuid.uuid4().hex
|
||||||
|
|
||||||
|
self.token = fixture.V3Token(user_id=self.user_id,
|
||||||
|
user_name=self.username,
|
||||||
|
project_id=self.project_id,
|
||||||
|
project_name=self.project_name)
|
||||||
|
|
||||||
|
self.token.add_role()
|
||||||
|
self.token.add_role()
|
||||||
|
self.token_id = uuid.uuid4().hex
|
||||||
|
|
||||||
|
service = self.token.add_service(self.service_type,
|
||||||
|
id=self.service_id,
|
||||||
|
name=self.service_name)
|
||||||
|
|
||||||
|
service.add_standard_endpoints(public=PUBLIC_SERVICE_URL,
|
||||||
|
admin=ADMIN_SERVICE_URL,
|
||||||
|
internal=INTERNAL_SERVICE_URL,
|
||||||
|
region=SERVICE_REGION)
|
||||||
|
|
||||||
|
self.requests_mock.get(AUTH_URL, json=disc, status_code=300)
|
||||||
|
self.auth_mock = self.requests_mock.post(
|
||||||
|
AUTH_URL + '/v3/auth/tokens',
|
||||||
|
json=self.token,
|
||||||
|
headers={'X-Subject-Token': self.token_id})
|
||||||
|
|
||||||
|
# don't do any console formatting markup
|
||||||
|
m = fixtures.MockPatchObject(shell, 'formatter_name', 'text')
|
||||||
|
self.useFixture(m)
|
||||||
|
|
||||||
|
def shell(self, *args, **kwargs):
|
||||||
|
for k, v in kwargs.items():
|
||||||
|
args.append('--os-%s' % k.replace('_', '-'))
|
||||||
|
args.append(v)
|
||||||
|
|
||||||
|
return shell.run(args)
|
||||||
|
|
||||||
|
def test_simple_get(self):
|
||||||
|
path = '/%s' % uuid.uuid4().hex
|
||||||
|
public_url = '%s%s' % (PUBLIC_SERVICE_URL, path)
|
||||||
|
|
||||||
|
json_a = uuid.uuid4().hex
|
||||||
|
json_b = uuid.uuid4().hex
|
||||||
|
|
||||||
|
service_mock = self.requests_mock.get(
|
||||||
|
public_url,
|
||||||
|
json={json_a: json_b},
|
||||||
|
status_code=200,
|
||||||
|
reason='OK',
|
||||||
|
headers={'Content-Type': 'application/json'})
|
||||||
|
|
||||||
|
resp = self.shell('get', path,
|
||||||
|
'--os-service-type', self.service_type,
|
||||||
|
'--os-auth-type', 'password',
|
||||||
|
'--os-auth-url', AUTH_URL,
|
||||||
|
'--os-project-id', self.project_id,
|
||||||
|
'--os-user-id', self.user_id)
|
||||||
|
|
||||||
|
self.assertEqual('GET', self.requests_mock.last_request.method)
|
||||||
|
self.assertEqual(public_url, self.requests_mock.last_request.url)
|
||||||
|
|
||||||
|
self.assertTrue(service_mock.called)
|
||||||
|
|
||||||
|
self.assertThat(resp, matchers.StartsWith('HTTP/1.1 200 OK'))
|
||||||
|
self.assertIn('Content-Type: application/json', resp)
|
||||||
|
|
||||||
|
r = '.*{\s*"%s":\s*"%s"\s*}$' % (json_a, json_b)
|
||||||
|
self.assertThat(resp, matchers.MatchesRegex(r, re.M | re.S))
|
||||||
|
|
||||||
|
def test_endpoint_not_found(self):
|
||||||
|
path = '/%s' % uuid.uuid4().hex
|
||||||
|
public_url = '%s%s' % (PUBLIC_SERVICE_URL, path)
|
||||||
|
service_mock = self.requests_mock.get(public_url)
|
||||||
|
|
||||||
|
e = self.assertRaises(shell.ErrorExit,
|
||||||
|
self.shell,
|
||||||
|
'get', path,
|
||||||
|
'--os-service-type', uuid.uuid4().hex,
|
||||||
|
'--os-auth-type', 'password',
|
||||||
|
'--os-auth-url', AUTH_URL,
|
||||||
|
'--os-project-id', self.project_id,
|
||||||
|
'--os-user-id', self.user_id)
|
||||||
|
self.assertIn('Failed to find an endpoint in the service ', e.message)
|
||||||
|
|
||||||
|
def test_headers(self):
|
||||||
|
path = '/%s' % uuid.uuid4().hex
|
||||||
|
public_url = '%s%s' % (PUBLIC_SERVICE_URL, path)
|
||||||
|
|
||||||
|
json_a = uuid.uuid4().hex
|
||||||
|
json_b = uuid.uuid4().hex
|
||||||
|
|
||||||
|
service_mock = self.requests_mock.get(public_url)
|
||||||
|
|
||||||
|
header_key = uuid.uuid4().hex
|
||||||
|
header_val = uuid.uuid4().hex
|
||||||
|
|
||||||
|
self.shell('get', path,
|
||||||
|
'%s:%s' % (header_key, header_val),
|
||||||
|
'--os-service-type', self.service_type,
|
||||||
|
'--os-auth-type', 'password',
|
||||||
|
'--os-auth-url', AUTH_URL,
|
||||||
|
'--os-project-id', self.project_id,
|
||||||
|
'--os-user-id', self.user_id)
|
||||||
|
|
||||||
|
self.assertEqual(header_val,
|
||||||
|
self.requests_mock.last_request.headers[header_key])
|
||||||
|
@ -6,10 +6,12 @@ hacking<0.11,>=0.10.0
|
|||||||
|
|
||||||
coverage>=3.6
|
coverage>=3.6
|
||||||
discover
|
discover
|
||||||
|
fixtures>=3.0.0 # Apache-2.0/BSD
|
||||||
python-subunit>=0.0.18
|
python-subunit>=0.0.18
|
||||||
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
|
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
|
||||||
oslosphinx>=2.5.0 # Apache-2.0
|
oslosphinx>=2.5.0 # Apache-2.0
|
||||||
oslotest>=1.10.0 # Apache-2.0
|
oslotest>=1.10.0 # Apache-2.0
|
||||||
|
requests-mock>=0.7.0 # Apache-2.0
|
||||||
testrepository>=0.0.18
|
testrepository>=0.0.18
|
||||||
testscenarios>=0.4
|
testscenarios>=0.4
|
||||||
testtools>=1.4.0
|
testtools>=1.4.0
|
||||||
|
Loading…
Reference in New Issue
Block a user