Add support for uploading signed test results

Test results with digital signature can be uploaded into Refstack.
For correct signature verification request should contain extra headers
X-Signature and X-Public-Key. Public key saves into database as metadata
to test result. Results with broken signature are ejected with HTTP error 400.
Uploading of unsigned results are not affected.

Needed-by: https://review.openstack.org/#/c/169847/

Change-Id: Ia61715a45c99e7d674f804afb0f0287ed057b1e2
This commit is contained in:
sslypushenko 2015-04-01 20:16:33 +03:00
parent 649137a0b0
commit a0f6974cc2
9 changed files with 322 additions and 223 deletions

View File

@ -25,6 +25,8 @@ from oslo_log import loggers
import pecan import pecan
import webob import webob
from refstack.common import validators
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
PROJECT_ROOT = os.path.join(os.path.dirname(os.path.abspath(__file__)), PROJECT_ROOT = os.path.join(os.path.dirname(os.path.abspath(__file__)),
@ -81,24 +83,22 @@ class JSONErrorHook(pecan.hooks.PecanHook):
def on_error(self, state, exc): def on_error(self, state, exc):
"""Request error handler.""" """Request error handler."""
if isinstance(exc, webob.exc.HTTPError): if isinstance(exc, webob.exc.HTTPError):
body = {'code': exc.status_int, status_code = exc.status_int
'title': exc.title} body = {'title': exc.title}
if self.debug: elif isinstance(exc, validators.ValidationError):
body['detail'] = str(exc) status_code = 400
return webob.Response( body = {'title': exc.title}
body=json.dumps(body),
status=exc.status,
content_type='application/json'
)
else: else:
LOG.exception(exc) LOG.exception(exc)
body = {'code': 500, status_code = 500
'title': 'Internal Server Error'} body = {'title': 'Internal Server Error'}
body['code'] = status_code
if self.debug: if self.debug:
body['detail'] = str(exc) body['detail'] = str(exc)
return webob.Response( return webob.Response(
body=json.dumps(body), body=json.dumps(body),
status=500, status=status_code,
content_type='application/json' content_type='application/json'
) )

View File

@ -14,6 +14,8 @@
# under the License. # under the License.
"""Version 1 of the API.""" """Version 1 of the API."""
import json
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
import pecan import pecan
@ -53,16 +55,21 @@ class BaseRestControllerWithValidation(rest.RestController):
GET base_url/schema GET base_url/schema
""" """
def __init__(self, validator): __validator__ = None
self.validator = validator
def __init__(self): # pragma: no cover
if self.__validator__:
self.validator = self.__validator__()
else:
raise ValueError("__validator__ is not defined")
def get_item(self, item_id): # pragma: no cover def get_item(self, item_id): # pragma: no cover
"""Handler for getting item""" """Handler for getting item"""
raise NotImplemented raise NotImplementedError
def store_item(self, item_in_json): # pragma: no cover def store_item(self, item_in_json): # pragma: no cover
"""Handler for storing item. Should return new item id""" """Handler for storing item. Should return new item id"""
raise NotImplemented raise NotImplementedError
@pecan.expose('json') @pecan.expose('json')
def get_one(self, arg): def get_one(self, arg):
@ -81,7 +88,8 @@ class BaseRestControllerWithValidation(rest.RestController):
@pecan.expose('json') @pecan.expose('json')
def post(self, ): def post(self, ):
"""POST handler.""" """POST handler."""
item = validators.safe_load_json_body(self.validator) self.validator.validate(pecan.request)
item = json.loads(pecan.request.body)
item_id = self.store_item(item) item_id = self.store_item(item)
pecan.response.status = 201 pecan.response.status = 201
return item_id return item_id
@ -91,6 +99,8 @@ class ResultsController(BaseRestControllerWithValidation):
"""/v1/results handler.""" """/v1/results handler."""
__validator__ = validators.TestResultValidator
def get_item(self, item_id): def get_item(self, item_id):
"""Handler for getting item""" """Handler for getting item"""
test_info = db.get_test(item_id) test_info = db.get_test(item_id)
@ -105,7 +115,14 @@ class ResultsController(BaseRestControllerWithValidation):
def store_item(self, item_in_json): def store_item(self, item_in_json):
"""Handler for storing item. Should return new item id""" """Handler for storing item. Should return new item id"""
test_id = db.store_results(item_in_json) item = item_in_json.copy()
if pecan.request.headers.get('X-Public-Key'):
if 'metadata' not in item:
item['metadata'] = {}
item['metadata']['public_key'] = \
pecan.request.headers.get('X-Public-Key')
test_id = db.store_results(item)
LOG.debug(item)
return {'test_id': test_id} return {'test_id': test_id}
@pecan.expose('json') @pecan.expose('json')
@ -150,12 +167,11 @@ class ResultsController(BaseRestControllerWithValidation):
'cpid': r.cpid 'cpid': r.cpid
}) })
page = {} page = {'results': results,
page['results'] = results 'pagination': {
page['pagination'] = {
'current_page': page_number, 'current_page': page_number,
'total_pages': total_pages_number 'total_pages': total_pages_number
} }}
except Exception as ex: except Exception as ex:
LOG.debug('An error occurred during ' LOG.debug('An error occurred during '
'operation with database: %s' % ex) 'operation with database: %s' % ex)
@ -168,4 +184,4 @@ class V1Controller(object):
"""Version 1 API controller root.""" """Version 1 API controller root."""
results = ResultsController(validators.TestResultValidator()) results = ResultsController()

View File

@ -16,15 +16,36 @@
""" Validators module """ Validators module
""" """
import binascii
import uuid import uuid
import json import json
import jsonschema import jsonschema
import pecan from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
ext_format_checker = jsonschema.FormatChecker() ext_format_checker = jsonschema.FormatChecker()
class ValidationError(Exception):
def __init__(self, title, exc=None):
super(ValidationError, self).__init__(title)
self.exc = exc
self.title = title
self.details = "%s(%s: %s)" % (self.title,
self.exc.__class__.__name__,
str(self.exc)) \
if self.exc else self.title
def __repr__(self):
return self.details
def __str__(self):
return self.__repr__()
def is_uuid(inst): def is_uuid(inst):
""" Check that inst is a uuid_hex string. """ """ Check that inst is a uuid_hex string. """
try: try:
@ -45,12 +66,22 @@ def checker_uuid(inst):
class Validator(object): class Validator(object):
"""Base class for validators""" """Base class for validators"""
def __init__(self):
self.schema = {} # pragma: no cover
def validate(self, json_data): def validate(self, request):
""" """
:param json_data: data for validation :param json_data: data for validation
""" """
jsonschema.validate(json_data, self.schema) try:
body = json.loads(request.body)
except (ValueError, TypeError) as e:
raise ValidationError('Malformed request', e)
try:
jsonschema.validate(body, self.schema)
except jsonschema.ValidationError as e:
raise ValidationError('Request doesn''t correspond to schema', e)
class TestResultValidator(Validator): class TestResultValidator(Validator):
@ -90,31 +121,26 @@ class TestResultValidator(Validator):
format_checker=ext_format_checker format_checker=ext_format_checker
) )
def validate(self, request):
super(TestResultValidator, self).validate(request)
if request.headers.get('X-Signature') or \
request.headers.get('X-Public-Key'):
try:
sign = binascii.a2b_hex(request.headers.get('X-Signature', ''))
except (binascii.Error, TypeError) as e:
raise ValidationError('Malformed signature', e)
try:
key = RSA.importKey(request.headers.get('X-Public-Key', ''))
except ValueError as e:
raise ValidationError('Malformed public key', e)
signer = PKCS1_v1_5.new(key)
data_hash = SHA256.new()
data_hash.update(request.body.encode('utf-8'))
if not signer.verify(data_hash, sign):
raise ValidationError('Signature verification failed')
@staticmethod @staticmethod
def assert_id(_id): def assert_id(_id):
""" Check that _id is a valid uuid_hex string. """ """ Check that _id is a valid uuid_hex string. """
return is_uuid(_id) return is_uuid(_id)
def safe_load_json_body(validator):
"""
Helper for load validated request body
:param validator: instance of Validator class
:return validated body
:raise ValueError, jsonschema.ValidationError
"""
body = ''
try:
body = json.loads(pecan.request.body)
except (ValueError, TypeError) as e:
pecan.abort(400, detail=e.message)
try:
validator.validate(body)
except jsonschema.ValidationError as e:
pecan.abort(400,
detail=e.message,
title='Malformed json data, '
'see %s/schema' % pecan.request.path_url)
return body

View File

@ -21,6 +21,7 @@ import uuid
from oslo_config import cfg from oslo_config import cfg
from oslo_db import options as db_options from oslo_db import options as db_options
from oslo_db.sqlalchemy import session as db_session from oslo_db.sqlalchemy import session as db_session
import six
from refstack.api import constants as api_const from refstack.api import constants as api_const
from refstack.db.sqlalchemy import models from refstack.db.sqlalchemy import models
@ -65,17 +66,19 @@ def store_results(results):
test.id = test_id test.id = test_id
test.cpid = results.get('cpid') test.cpid = results.get('cpid')
test.duration_seconds = results.get('duration_seconds') test.duration_seconds = results.get('duration_seconds')
received_test_results = results.get('results', [])
session = get_session() session = get_session()
with session.begin(): with session.begin():
test.save(session) for result in results.get('results', []):
for result in received_test_results:
test_result = models.TestResults() test_result = models.TestResults()
test_result.test_id = test_id test_result.test_id = test_id
test_result.name = result['name'] test_result.name = result['name']
test_result.uuid = result.get('uuid', None) test_result.uid = result.get('uuid', None)
test_result.save(session) test.results.append(test_result)
for k, v in six.iteritems(results.get('metadata', {})):
meta = models.TestMeta()
meta.meta_key, meta.value = k, v
test.meta.append(meta)
test.save(session)
return test_id return test_id

View File

@ -38,14 +38,16 @@ class ResultsControllerTestCase(base.BaseTestCase):
def setUp(self): def setUp(self):
super(ResultsControllerTestCase, self).setUp() super(ResultsControllerTestCase, self).setUp()
self.validator = mock.Mock() self.validator = mock.Mock()
self.controller = v1.ResultsController(self.validator) v1.ResultsController.__validator__ = \
mock.Mock(return_value=self.validator)
self.controller = v1.ResultsController()
self.config_fixture = config_fixture.Config() self.config_fixture = config_fixture.Config()
self.CONF = self.useFixture(self.config_fixture).conf self.CONF = self.useFixture(self.config_fixture).conf
@mock.patch('refstack.db.get_test') @mock.patch('refstack.db.get_test')
@mock.patch('refstack.db.get_test_results') @mock.patch('refstack.db.get_test_results')
def test_get(self, mock_get_test_res, mock_get_test): def test_get(self, mock_get_test_res, mock_get_test):
self.validator.assert_id.return_value = True self.validator.assert_id = mock.Mock(return_value=True)
test_info = mock.Mock() test_info = mock.Mock()
test_info.cpid = 'foo' test_info.cpid = 'foo'
@ -70,17 +72,34 @@ class ResultsControllerTestCase(base.BaseTestCase):
@mock.patch('refstack.db.store_results') @mock.patch('refstack.db.store_results')
@mock.patch('pecan.response') @mock.patch('pecan.response')
@mock.patch('refstack.common.validators.safe_load_json_body') @mock.patch('pecan.request')
def test_post(self, mock_safe_load, mock_response, mock_store_results): def test_post(self, mock_request, mock_response, mock_store_results):
mock_safe_load.return_value = 'fake_item' mock_request.body = '{"answer": 42}'
mock_request.headers = {}
mock_store_results.return_value = 'fake_test_id' mock_store_results.return_value = 'fake_test_id'
result = self.controller.post() result = self.controller.post()
self.assertEqual(result, {'test_id': 'fake_test_id'}) self.assertEqual(result, {'test_id': 'fake_test_id'})
self.assertEqual(mock_response.status, 201) self.assertEqual(mock_response.status, 201)
mock_safe_load.assert_called_once_with(self.validator) mock_store_results.assert_called_once_with({'answer': 42})
mock_store_results.assert_called_once_with('fake_item')
@mock.patch('refstack.db.store_results')
@mock.patch('pecan.response')
@mock.patch('pecan.request')
def test_post_with_sign(self, mock_request,
mock_response,
mock_store_results):
mock_request.body = '{"answer": 42}'
mock_request.headers = {
'X-Signature': 'fake-sign',
'X-Public-Key': 'fake-key'
}
mock_store_results.return_value = 'fake_test_id'
result = self.controller.post()
self.assertEqual(result, {'test_id': 'fake_test_id'})
self.assertEqual(mock_response.status, 201)
mock_store_results.assert_called_once_with(
{'answer': 42, 'metadata': {'public_key': 'fake-key'}}
)
@mock.patch('pecan.abort') @mock.patch('pecan.abort')
@mock.patch('refstack.db.get_test') @mock.patch('refstack.db.get_test')
@ -91,35 +110,6 @@ class ResultsControllerTestCase(base.BaseTestCase):
self.controller.get_item, self.controller.get_item,
'fake_id') 'fake_id')
@mock.patch('refstack.db.get_test')
@mock.patch('refstack.db.get_test_results')
def test_get_item(self, mock_get_test_res, mock_get_test):
test_info = mock.Mock()
test_info.cpid = 'foo'
test_info.created_at = 'bar'
test_info.duration_seconds = 999
mock_get_test.return_value = test_info
mock_get_test_res.return_value = [('test1',), ('test2',), ('test3',)]
actual_result = self.controller.get_item('fake_id')
expected_result = {
'cpid': 'foo',
'created_at': 'bar',
'duration_seconds': 999,
'results': ['test1', 'test2', 'test3']
}
self.assertEqual(actual_result, expected_result)
mock_get_test_res.assert_called_once_with('fake_id')
mock_get_test.assert_called_once_with('fake_id')
@mock.patch('refstack.db.store_results')
def test_store_item(self, mock_store_item):
mock_store_item.return_value = 'fake_result'
result = self.controller.store_item('fake_item')
self.assertEqual(result, {'test_id': 'fake_result'})
mock_store_item.assert_called_once_with('fake_item')
@mock.patch('pecan.abort') @mock.patch('pecan.abort')
@mock.patch('refstack.api.utils.parse_input_params') @mock.patch('refstack.api.utils.parse_input_params')
def test_get_failed_in_parse_input_params(self, def test_get_failed_in_parse_input_params(self,
@ -236,20 +226,21 @@ class BaseRestControllerWithValidationTestCase(base.BaseTestCase):
def setUp(self): def setUp(self):
super(BaseRestControllerWithValidationTestCase, self).setUp() super(BaseRestControllerWithValidationTestCase, self).setUp()
self.validator = mock.Mock() self.validator = mock.Mock()
self.controller = v1.BaseRestControllerWithValidation(self.validator) v1.BaseRestControllerWithValidation.__validator__ = \
mock.Mock(return_value=self.validator)
self.controller = v1.BaseRestControllerWithValidation()
@mock.patch('pecan.response') @mock.patch('pecan.response')
@mock.patch('refstack.common.validators.safe_load_json_body') @mock.patch('pecan.request')
def test_post(self, mock_safe_load, mock_response): def test_post(self, mock_request, mock_response):
mock_safe_load.return_value = 'fake_item' mock_request.body = '[42]'
self.controller.store_item = mock.Mock(return_value='fake_id') self.controller.store_item = mock.Mock(return_value='fake_id')
result = self.controller.post() result = self.controller.post()
self.assertEqual(result, 'fake_id') self.assertEqual(result, 'fake_id')
self.assertEqual(mock_response.status, 201) self.assertEqual(mock_response.status, 201)
mock_safe_load.assert_called_once_with(self.validator) self.controller.store_item.assert_called_once_with([42])
self.controller.store_item.assert_called_once_with('fake_item')
def test_get_one_return_item(self): def test_get_one_return_item(self):
self.validator.assert_id.return_value = True self.validator.assert_id.return_value = True
@ -268,7 +259,7 @@ class BaseRestControllerWithValidationTestCase(base.BaseTestCase):
self.assertEqual(result, 'fake_schema') self.assertEqual(result, 'fake_schema')
@mock.patch('pecan.abort') @mock.patch('pecan.abort')
def test_get_one_aborut(self, mock_abort): def test_get_one_abort(self, mock_abort):
self.validator.assert_id.return_value = False self.validator.assert_id = mock.Mock(return_value=False)
self.controller.get_one('fake_arg') self.controller.get_one('fake_arg')
mock_abort.assert_called_once_with(404) mock_abort.assert_called_once_with(404)

View File

@ -23,6 +23,14 @@ from oslotest import base
import webob import webob
from refstack.api import app from refstack.api import app
from refstack.common import validators
def get_response_kwargs(response_mock):
_, kwargs = response_mock.call_args
if kwargs['body']:
kwargs['body'] = json.loads(kwargs.get('body', ''))
return kwargs
class JSONErrorHookTestCase(base.BaseTestCase): class JSONErrorHookTestCase(base.BaseTestCase):
@ -32,89 +40,71 @@ class JSONErrorHookTestCase(base.BaseTestCase):
self.config_fixture = config_fixture.Config() self.config_fixture = config_fixture.Config()
self.CONF = self.useFixture(self.config_fixture).conf self.CONF = self.useFixture(self.config_fixture).conf
def test_on_error_with_webob_instance(self): def _on_error(self, response, exc, expected_status_code, expected_body):
self.CONF.set_override('app_dev_mode',
False,
'api')
exc = mock.Mock(spec=webob.exc.HTTPError)
exc.status_int = 999
exc.status = 111
exc.title = 'fake_title'
with mock.patch.object(webob, 'Response') as response:
response.return_value = 'fake_value' response.return_value = 'fake_value'
hook = app.JSONErrorHook() hook = app.JSONErrorHook()
result = hook.on_error(mock.Mock(), exc) result = hook.on_error(mock.Mock(), exc)
self.assertEqual(result, 'fake_value') self.assertEqual(result, 'fake_value')
body = {'code': exc.status_int, 'title': exc.title} self.assertEqual(
response.assert_called_once_with(body=json.dumps(body), dict(body=expected_body,
status=exc.status, status=expected_status_code,
content_type='application/json') content_type='application/json'),
get_response_kwargs(response)
def test_on_error_with_webob_instance_with_debug(self): )
self.CONF.set_override('app_dev_mode',
True,
'api')
exc = mock.Mock(spec=webob.exc.HTTPError)
exc.status_int = 999
exc.status = 111
exc.title = 'fake_title'
with mock.patch.object(webob, 'Response') as response:
response.return_value = 'fake_value'
hook = app.JSONErrorHook()
result = hook.on_error(mock.Mock(), exc)
self.assertEqual(result, 'fake_value')
body = {
'code': exc.status_int,
'title': exc.title,
'detail': str(exc)
}
response.assert_called_once_with(body=json.dumps(body),
status=exc.status,
content_type='application/json')
@mock.patch.object(webob, 'Response') @mock.patch.object(webob, 'Response')
def test_on_error_not_webob_instance(self, response): def test_on_error_with_webob_instance(self, response):
self.CONF.set_override('app_dev_mode', self.CONF.set_override('app_dev_mode', False, 'api')
False, exc = mock.Mock(spec=webob.exc.HTTPError,
'api') status=418, status_int=418,
response.return_value = 'fake_value' title='fake_title')
exc = mock.Mock()
hook = app.JSONErrorHook() self._on_error(
result = hook.on_error(mock.Mock(), exc) response, exc, expected_status_code=exc.status,
expected_body={'code': exc.status_int, 'title': exc.title}
)
self.assertEqual(result, 'fake_value') self.CONF.set_override('app_dev_mode', True, 'api')
body = {'code': 500, 'title': 'Internal Server Error'} self._on_error(
response.assert_called_once_with(body=json.dumps(body), response, exc, expected_status_code=exc.status,
status=500, expected_body={'code': exc.status_int, 'title': exc.title,
content_type='application/json') 'detail': str(exc)}
)
@mock.patch.object(webob, 'Response') @mock.patch.object(webob, 'Response')
def test_on_error_not_webob_instance_with_debug(self, response): def test_on_error_with_validation_error(self, response):
self.CONF.set_override('app_dev_mode', self.CONF.set_override('app_dev_mode', False, 'api')
True, exc = mock.Mock(spec=validators.ValidationError,
'api') title='No No No!')
response.return_value = 'fake_value'
exc = mock.Mock()
hook = app.JSONErrorHook() self._on_error(
result = hook.on_error(mock.Mock(), exc) response, exc, expected_status_code=400,
expected_body={'code': 400, 'title': exc.title}
)
self.assertEqual(result, 'fake_value') self.CONF.set_override('app_dev_mode', True, 'api')
body = { self._on_error(
'code': 500, response, exc, expected_status_code=400,
'title': 'Internal Server Error', expected_body={'code': 400, 'title': exc.title,
'detail': str(exc) 'detail': str(exc)}
} )
response.assert_called_once_with(body=json.dumps(body),
status=500, @mock.patch.object(webob, 'Response')
content_type='application/json') def test_on_error_with_other_exceptions(self, response):
self.CONF.set_override('app_dev_mode', False, 'api')
exc = mock.Mock(status=500)
self._on_error(
response, exc, expected_status_code=500,
expected_body={'code': 500, 'title': 'Internal Server Error'}
)
self.CONF.set_override('app_dev_mode', True, 'api')
self._on_error(
response, exc, expected_status_code=500,
expected_body={'code': 500, 'title': 'Internal Server Error',
'detail': str(exc)}
)
class SetupAppTestCase(base.BaseTestCase): class SetupAppTestCase(base.BaseTestCase):

View File

@ -99,8 +99,9 @@ class DBBackendTestCase(base.BaseTestCase):
@mock.patch.object(api, 'get_session') @mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.models.TestResults') @mock.patch('refstack.db.sqlalchemy.models.TestResults')
@mock.patch('refstack.db.sqlalchemy.models.Test') @mock.patch('refstack.db.sqlalchemy.models.Test')
@mock.patch('refstack.db.sqlalchemy.models.TestMeta')
@mock.patch('uuid.uuid4') @mock.patch('uuid.uuid4')
def test_store_results(self, mock_uuid, mock_test, def test_store_results(self, mock_uuid, mock_test_meta, mock_test,
mock_test_result, mock_get_session): mock_test_result, mock_get_session):
fake_tests_result = { fake_tests_result = {
'cpid': 'foo', 'cpid': 'foo',
@ -108,7 +109,8 @@ class DBBackendTestCase(base.BaseTestCase):
'results': [ 'results': [
{'name': 'tempest.some.test'}, {'name': 'tempest.some.test'},
{'name': 'tempest.test', 'uid': '12345678'} {'name': 'tempest.test', 'uid': '12345678'}
] ],
'metadata': {'answer': 42}
} }
_id = 12345 _id = 12345
@ -130,14 +132,11 @@ class DBBackendTestCase(base.BaseTestCase):
session.begin.assert_called_once_with() session.begin.assert_called_once_with()
self.assertEqual(test_id, six.text_type(_id)) self.assertEqual(test_id, six.text_type(_id))
self.assertEqual(test.id, six.text_type(_id))
self.assertEqual(test.cpid, fake_tests_result['cpid']) self.assertEqual(test.cpid, fake_tests_result['cpid'])
self.assertEqual(test.duration_seconds, self.assertEqual(test.duration_seconds,
fake_tests_result['duration_seconds']) fake_tests_result['duration_seconds'])
self.assertEqual(mock_test_result.call_count, self.assertEqual(mock_test_result.call_count,
len(fake_tests_result['results'])) len(fake_tests_result['results']))
self.assertEqual(test_result.save.call_count,
len(fake_tests_result['results']))
@mock.patch.object(api, 'get_session') @mock.patch.object(api, 'get_session')
@mock.patch('refstack.db.sqlalchemy.models.Test') @mock.patch('refstack.db.sqlalchemy.models.Test')

View File

@ -14,12 +14,16 @@
# under the License. # under the License.
"""Tests for validators.""" """Tests for validators."""
import binascii
import json import json
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
import jsonschema import jsonschema
import mock import mock
from oslotest import base from oslotest import base
import six
from refstack.common import validators from refstack.common import validators
@ -27,6 +31,22 @@ from refstack.common import validators
class ValidatorsTestCase(base.BaseTestCase): class ValidatorsTestCase(base.BaseTestCase):
"""Test case for validator's helpers.""" """Test case for validator's helpers."""
def test_str_validation_error(self):
err = validators.ValidationError(
'Something went wrong!',
AttributeError("'NoneType' object has no attribute 'a'")
)
self.assertEqual(err.title, 'Something went wrong!')
self.assertEqual("%s(%s: %s)" % (
'Something went wrong!',
'AttributeError',
"'NoneType' object has no attribute 'a'"
), str(err))
err = validators.ValidationError(
'Something went wrong again!'
)
self.assertEqual('Something went wrong again!', str(err))
def test_is_uuid(self): def test_is_uuid(self):
self.assertTrue(validators.is_uuid('12345678123456781234567812345678')) self.assertTrue(validators.is_uuid('12345678123456781234567812345678'))
@ -42,16 +62,16 @@ class ValidatorsTestCase(base.BaseTestCase):
class TestResultValidatorTestCase(base.BaseTestCase): class TestResultValidatorTestCase(base.BaseTestCase):
"""Test case for database TestResultValidator.""" """Test case for TestResultValidator."""
FAKE_TESTS_RESULTS_JSON = json.dumps({ FAKE_TESTS_RESULTS_JSON = {
'cpid': 'foo', 'cpid': 'foo',
'duration_seconds': 10, 'duration_seconds': 10,
'results': [ 'results': [
{'name': 'tempest.some.test'}, {'name': 'tempest.some.test'},
{'name': 'tempest.test', 'uid': '12345678'} {'name': 'tempest.test', 'uid': '12345678'}
] ]
}) }
def setUp(self): def setUp(self):
super(TestResultValidatorTestCase, self).setUp() super(TestResultValidatorTestCase, self).setUp()
@ -66,41 +86,95 @@ class TestResultValidatorTestCase(base.BaseTestCase):
def test_validation(self): def test_validation(self):
with mock.patch('jsonschema.validate') as mock_validate: with mock.patch('jsonschema.validate') as mock_validate:
self.validator.validate(self.FAKE_TESTS_RESULTS_JSON) request = mock.Mock()
request.body = json.dumps(self.FAKE_TESTS_RESULTS_JSON)
request.headers = {}
self.validator.validate(request)
mock_validate.assert_called_once_with(self.FAKE_TESTS_RESULTS_JSON, mock_validate.assert_called_once_with(self.FAKE_TESTS_RESULTS_JSON,
self.validator.schema) self.validator.schema)
@mock.patch('jsonschema.validate')
def test_validation_with_signature(self, mock_validate):
if six.PY3:
self.skip('https://github.com/dlitz/pycrypto/issues/99')
request = mock.Mock()
request.body = json.dumps(self.FAKE_TESTS_RESULTS_JSON)
data_hash = SHA256.new()
data_hash.update(request.body.encode('utf-8'))
key = RSA.generate(4096)
signer = PKCS1_v1_5.new(key)
sign = signer.sign(data_hash)
request.headers = {
'X-Signature': binascii.b2a_hex(sign),
'X-Public-Key': key.publickey().exportKey('OpenSSH')
}
self.validator.validate(request)
mock_validate.assert_called_once_with(self.FAKE_TESTS_RESULTS_JSON,
self.validator.schema)
def test_validation_fail_no_json(self):
wrong_request = mock.Mock()
wrong_request.body = 'foo'
self.assertRaises(validators.ValidationError,
self.validator.validate,
wrong_request)
try:
self.validator.validate(wrong_request)
except validators.ValidationError as e:
self.assertIsInstance(e.exc, ValueError)
def test_validation_fail(self): def test_validation_fail(self):
wrong_tests_result = json.dumps({ wrong_request = mock.Mock()
wrong_request.body = json.dumps({
'foo': 'bar' 'foo': 'bar'
}) })
self.assertRaises(jsonschema.ValidationError, self.assertRaises(validators.ValidationError,
self.validator.validate, self.validator.validate,
wrong_tests_result) wrong_request)
try:
self.validator.validate(wrong_request)
except validators.ValidationError as e:
self.assertIsInstance(e.exc, jsonschema.ValidationError)
@mock.patch('pecan.request') @mock.patch('jsonschema.validate')
def test_safe_load_json_body(self, mock_request): def test_validation_with_broken_signature(self, mock_validate):
mock_request.body = self.FAKE_TESTS_RESULTS_JSON if six.PY3:
actual_result = validators.safe_load_json_body(self.validator) self.skip('https://github.com/dlitz/pycrypto/issues/99')
self.assertEqual(actual_result,
json.loads(self.FAKE_TESTS_RESULTS_JSON))
@mock.patch('pecan.abort') request = mock.Mock()
@mock.patch('pecan.request') request.body = json.dumps(self.FAKE_TESTS_RESULTS_JSON)
def test_safe_load_json_body_invalid_json(self, mock_request, mock_abort): key = RSA.generate(2048)
mock_request.body = {} request.headers = {
mock_abort.side_effect = Exception() 'X-Signature': binascii.b2a_hex('fake_sign'.encode('utf-8')),
self.assertRaises(Exception, 'X-Public-Key': key.publickey().exportKey('OpenSSH')
validators.safe_load_json_body, }
self.validator) self.assertRaises(validators.ValidationError,
self.validator.validate,
request)
request.headers = {
'X-Signature': binascii.b2a_hex('fake_sign'.encode('utf-8')),
'X-Public-Key': key.publickey().exportKey('OpenSSH')
}
try:
self.validator.validate(request)
except validators.ValidationError as e:
self.assertEqual(e.title,
'Signature verification failed')
@mock.patch('pecan.abort') request.headers = {
@mock.patch('pecan.request') 'X-Signature': 'z-z-z-z!!!',
def test_safe_load_json_body_invalid_schema(self, 'X-Public-Key': key.publickey().exportKey('OpenSSH')
mock_request, }
mock_abort): try:
mock_request.body = json.dumps({'foo': 'bar'}) self.validator.validate(request)
mock_abort.side_effect = Exception() except validators.ValidationError as e:
self.assertRaises(Exception, self.assertIsInstance(e.exc, TypeError)
validators.safe_load_json_body,
self.validator) request.headers = {
'X-Signature': binascii.b2a_hex('fake_sign'),
'X-Public-Key': 'H--0'
}
try:
self.validator.validate(request)
except validators.ValidationError as e:
self.assertIsInstance(e.exc, ValueError)

View File

@ -7,6 +7,6 @@ oslo.db>=1.4.1 # Apache-2.0
oslo.log oslo.log
pecan>=0.8.2 pecan>=0.8.2
pyOpenSSL==0.13 pyOpenSSL==0.13
pycrypto==2.6 pycrypto>=2.6
requests==1.2.3 requests==1.2.3
jsonschema>=2.0.0,<3.0.0 jsonschema>=2.0.0,<3.0.0