Refactor action execution controller and tests

* Fixing custom WSME type for json fields. Making it able to convert
  values in both direction (to base type and from base type).
* Changed type of ActionExecution resource fields 'input' and 'output'
  to new custom json type.
* Fixed action execution controller where necessary.
* Adjusted and refactored action execution controller tests.

Change-Id: I28d0f3204518a2688cb72acefd910d9d99ada66d
This commit is contained in:
Renat Akhmerov 2015-08-27 15:50:07 +06:00
parent d8a3e3fc75
commit a4175da422
5 changed files with 98 additions and 129 deletions

View File

@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_log import log as logging
from pecan import rest
from wsme import types as wtypes
@ -50,28 +48,12 @@ class ActionExecution(resource.Resource):
name = wtypes.text
description = wtypes.text
accepted = bool
input = wtypes.text
output = wtypes.text
input = types.jsontype
output = types.jsontype
created_at = wtypes.text
updated_at = wtypes.text
# Add this param to make Mistral API work with WSME 0.8.0 or higher version
params = types.jsontype
@classmethod
def from_dict(cls, d):
e = cls()
for key, val in d.items():
if hasattr(e, key):
# Nonetype check for dictionary must be explicit.
if val is not None and (
key == 'input' or key == 'output' or key == 'params'):
val = json.dumps(val)
setattr(e, key, val)
return e
@classmethod
def sample(cls):
return cls(
@ -86,8 +68,8 @@ class ActionExecution(resource.Resource):
name='std.echo',
description='My running action',
accepted=True,
input='{"first_name": "John", "last_name": "Doe"}',
output='{"some_output": "Hello, John Doe!"}',
input={'first_name': 'John', 'last_name': 'Doe'},
output={'some_output': 'Hello, John Doe!'},
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000',
params={'save_result': True}
@ -136,14 +118,12 @@ def _get_action_executions(task_execution_id=None):
if task_execution_id:
kwargs['task_execution_id'] = task_execution_id
action_executions = []
action_execs = [
_get_action_execution_resource(a_ex)
for a_ex in db_api.get_action_executions(**kwargs)
]
for action_ex in db_api.get_action_executions(**kwargs):
action_executions.append(
_get_action_execution_resource(action_ex)
)
return ActionExecutions(action_executions=action_executions)
return ActionExecutions(action_executions=action_execs)
class ActionExecutionsController(rest.RestController):
@ -156,29 +136,16 @@ class ActionExecutionsController(rest.RestController):
return _get_action_execution(id)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose({wtypes.text: wtypes.text},
@wsme_pecan.wsexpose(ActionExecution,
body=ActionExecution, status_code=201)
def post(self, action_execution):
def post(self, action_ex):
"""Create new action_execution."""
LOG.info("Create action_execution [action_execution=%s]" %
action_execution)
LOG.info("Create action_execution [action_execution=%s]" % action_ex)
action_input = action_execution.input or None
description = action_execution.description or None
params = action_execution.params or {}
name = action_execution.name
if action_input:
try:
action_input = json.loads(action_execution.input)
if not isinstance(action_input, dict):
raise TypeError("Input should be dict type.")
except (TypeError, ValueError) as e:
raise exc.InputException(
"Input should be JSON-serialized dict string. Actual: %s, "
"error: %s" % (action_execution.input, e)
)
name = action_ex.name
description = action_ex.description or None
action_input = action_ex.input or {}
params = action_ex.params or {}
if not name:
raise exc.InputException(
@ -192,35 +159,25 @@ class ActionExecutionsController(rest.RestController):
**params
)
return ActionExecution.from_dict(action_ex).to_dict()
return ActionExecution.from_dict(action_ex)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(ActionExecution, wtypes.text, body=ActionExecution)
def put(self, id, action_execution):
def put(self, id, action_ex):
"""Update the specified action_execution."""
LOG.info(
"Update action_execution [id=%s, action_execution=%s]"
% (id, action_execution)
% (id, action_ex)
)
# Client must provide a valid json. It shouldn't necessarily be an
# object but it should be json complaint so strings have to be escaped.
output = None
if action_execution.output:
try:
output = json.loads(action_execution.output)
except (ValueError, TypeError) as e:
raise exc.InvalidResultException(str(e))
if action_execution.state == states.SUCCESS:
result = wf_utils.Result(data=output)
elif action_execution.state == states.ERROR:
result = wf_utils.Result(error=output)
if action_ex.state == states.SUCCESS:
result = wf_utils.Result(data=action_ex.output)
elif action_ex.state == states.ERROR:
result = wf_utils.Result(error=action_ex.output)
else:
raise exc.InvalidResultException(
"Error. Expected on of %s, actual: %s" %
([states.SUCCESS, states.ERROR], action_execution.state)
([states.SUCCESS, states.ERROR], action_ex.state)
)
values = rpc.get_engine_client().on_action_complete(id, result)

View File

@ -95,18 +95,24 @@ class JsonType(wtypes.UserType):
basetype = wtypes.text
name = 'json'
@staticmethod
def validate(value):
try:
json.dumps(value)
except TypeError:
raise exc.InputException('%s is not JSON serializable' % value)
else:
return value
def validate(self, value):
if not value:
return {}
@staticmethod
def frombasetype(value):
return JsonType.validate(value) if value is not None else None
if not isinstance(value, dict):
raise exc.InputException(
'JsonType field value must be a dictionary [actual=%s]' % value
)
return value
def frombasetype(self, value):
# Value must a string.
return json.loads(value) if value is not None else None
def tobasetype(self, value):
# Value must a dict.
return json.dumps(value) if value is not None else None
uuid = UuidType()

View File

@ -1027,11 +1027,12 @@ class ActionExecutionTestsV2(base.TestCase):
{
'name': 'std.echo',
'input': '{"output": "Hello, Mistral!"}',
'params': {"save_result": True}
'params': '{"save_result": true}'
}
)
self.assertEqual(201, resp.status)
body = json.loads(body)
self.assertEqual('RUNNING', body['state'])

View File

@ -52,15 +52,15 @@ def find_items(items, **props):
class MistralClientBase(rest_client.RestClient):
def __init__(self, auth_provider, service_type):
super(MistralClientBase, self).__init__(
auth_provider=auth_provider,
service=service_type,
region=CONF.identity.region)
region=CONF.identity.region
)
if service_type not in ('workflow', 'workflowv2'):
msg = ("Invalid parameter 'service_type'. ")
msg = "Invalid parameter 'service_type'. "
raise exceptions.UnprocessableEntity(msg)
self.endpoint_url = 'publicURL'
@ -73,6 +73,7 @@ class MistralClientBase(rest_client.RestClient):
def get_list_obj(self, name):
resp, body = self.get(name)
return resp, json.loads(body)
def delete_obj(self, obj, name):
@ -80,6 +81,7 @@ class MistralClientBase(rest_client.RestClient):
def get_object(self, obj, id):
resp, body = self.get('{obj}/{id}'.format(obj=obj, id=id))
return resp, json.loads(body)
def wait_execution_success(self, ex_body, timeout=180):
@ -108,21 +110,19 @@ class MistralClientBase(rest_client.RestClient):
class MistralClientV2(MistralClientBase):
def post_request(self, url, file_name):
text = get_resource(file_name)
headers = {"headers": "Content-Type:text/plain"}
return self.post(url, text, headers=headers)
return self.post(url, get_resource(file_name), headers=headers)
def post_json(self, url, obj):
text = json.dumps(obj)
headers = {"Content-Type": "application/json"}
return self.post(url, text, headers=headers)
return self.post(url, json.dumps(obj), headers=headers)
def update_request(self, url, file_name):
text = get_resource(file_name)
headers = {"headers": "Content-Type:text/plain"}
resp, body = self.put(url, text, headers=headers)
resp, body = self.put(url, get_resource(file_name), headers=headers)
return resp, json.loads(body)
@ -138,6 +138,7 @@ class MistralClientV2(MistralClientBase):
self.workbooks.append(wb_name)
_, wfs = self.get_list_obj('workflows')
for wf in wfs['workflows']:
if wf['name'].startswith(wb_name):
self.workflows.append(wf['name'])
@ -154,6 +155,7 @@ class MistralClientV2(MistralClientBase):
def create_execution(self, wf_name, wf_input=None, params=None):
body = {"workflow_name": "%s" % wf_name}
if wf_input:
body.update({'input': json.dumps(wf_input)})
if params:
@ -179,6 +181,7 @@ class MistralClientV2(MistralClientBase):
'remaining_executions': count,
'first_execution_time': first_time
}
if wf_input:
post_body.update({'workflow_input': json.dumps(wf_input)})
@ -203,7 +206,6 @@ class MistralClientV2(MistralClientBase):
class AuthProv(auth.KeystoneV2AuthProvider):
def __init__(self):
self.alt_part = None
@ -221,7 +223,6 @@ class AuthProv(auth.KeystoneV2AuthProvider):
class TestCase(test.BaseTestCase):
@classmethod
def setUpClass(cls):
"""This method allows to initialize authentication before
@ -247,6 +248,7 @@ class TestCase(test.BaseTestCase):
for wb in self.client.workbooks:
self.client.delete_obj('workbooks', wb)
self.client.workbooks = []
@ -269,10 +271,12 @@ class TestCaseAdvanced(TestCase):
def tearDown(self):
for wb in self.client.workbooks:
self.client.delete_obj('workbooks', wb)
self.client.workbooks = []
for ex in self.client.executions:
self.client.delete_obj('executions', ex)
self.client.executions = []
super(TestCaseAdvanced, self).tearDown()

View File

@ -28,7 +28,7 @@ from mistral.workflow import states
from mistral.workflow import utils as wf_utils
action_ex = models.ActionExecution(
ACTION_EX_DB = models.ActionExecution(
id='123',
workflow_name='flow',
task_execution=models.TaskExecution(name='task1'),
@ -45,7 +45,7 @@ action_ex = models.ActionExecution(
updated_at=datetime.datetime(1970, 1, 1)
)
ACTION_EXEC = {
ACTION_EX = {
'id': '123',
'workflow_name': 'flow',
'task_execution_id': '333',
@ -62,25 +62,25 @@ ACTION_EXEC = {
'updated_at': '1970-01-01 00:00:00'
}
UPDATED_ACTION_EX = copy.copy(action_ex).to_dict()
UPDATED_ACTION_EX['state'] = 'SUCCESS'
UPDATED_ACTION_EX['task_name'] = 'task1'
UPDATED_ACTION = copy.copy(ACTION_EXEC)
UPDATED_ACTION_EX_DB = copy.copy(ACTION_EX_DB).to_dict()
UPDATED_ACTION_EX_DB['state'] = 'SUCCESS'
UPDATED_ACTION_EX_DB['task_name'] = 'task1'
UPDATED_ACTION = copy.copy(ACTION_EX)
UPDATED_ACTION['state'] = 'SUCCESS'
UPDATED_ACTION_OUTPUT = UPDATED_ACTION['output']
ERROR_ACTION_EX = copy.copy(action_ex).to_dict()
ERROR_ACTION_EX = copy.copy(ACTION_EX_DB).to_dict()
ERROR_ACTION_EX['state'] = 'ERROR'
ERROR_ACTION_EX['task_name'] = 'task1'
ERROR_ACTION = copy.copy(ACTION_EXEC)
ERROR_ACTION = copy.copy(ACTION_EX)
ERROR_ACTION['state'] = 'ERROR'
ERROR_ACTION_RES = ERROR_ACTION['output']
BROKEN_ACTION = copy.copy(ACTION_EXEC)
BROKEN_ACTION = copy.copy(ACTION_EX)
BROKEN_ACTION['output'] = 'string not escaped'
MOCK_ACTION = mock.MagicMock(return_value=action_ex)
MOCK_ACTIONS = mock.MagicMock(return_value=[action_ex])
MOCK_ACTION = mock.MagicMock(return_value=ACTION_EX_DB)
MOCK_ACTIONS = mock.MagicMock(return_value=[ACTION_EX_DB])
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
@ -91,7 +91,7 @@ class TestActionExecutionsController(base.FunctionalTest):
resp = self.app.get('/v2/action_executions/123')
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(ACTION_EXEC, resp.json)
self.assertDictEqual(ACTION_EX, resp.json)
@mock.patch.object(db_api, 'get_action_execution', MOCK_NOT_FOUND)
def test_get_not_found(self):
@ -101,35 +101,35 @@ class TestActionExecutionsController(base.FunctionalTest):
@mock.patch.object(rpc.EngineClient, 'start_action')
def test_post(self, f):
f.return_value = action_ex.to_dict()
f.return_value = ACTION_EX_DB.to_dict()
resp = self.app.post_json(
'/v2/action_executions',
{
'name': 'std.echo',
'input': '{}',
'params': {'save_result': True}
'input': "{}",
'params': '{"save_result": true}'
}
)
self.assertEqual(resp.status_int, 201)
action_exec = ACTION_EXEC
action_exec = ACTION_EX
del action_exec['task_name']
self.assertDictEqual(ACTION_EXEC, resp.json)
self.assertDictEqual(ACTION_EX, resp.json)
f.assert_called_once_with(
ACTION_EXEC['name'],
json.loads(ACTION_EXEC['input']),
ACTION_EX['name'],
json.loads(ACTION_EX['input']),
description=None,
save_result=True
)
@mock.patch.object(rpc.EngineClient, 'start_action')
def test_post_without_input(self, f):
f.return_value = action_ex.to_dict()
f.return_value['output'] = {'result': '"123"'}
f.return_value = ACTION_EX_DB.to_dict()
f.return_value['output'] = {'result': '123'}
resp = self.app.post_json(
'/v2/action_executions',
@ -137,16 +137,9 @@ class TestActionExecutionsController(base.FunctionalTest):
)
self.assertEqual(resp.status_int, 201)
self.assertDictEqual(
{'result': '"123"'},
json.loads(resp.json['output'])
)
self.assertEqual('{"result": "123"}', resp.json['output'])
f.assert_called_once_with(
'nova.servers_list',
None,
description=None
)
f.assert_called_once_with('nova.servers_list', {}, description=None)
def test_post_bad_result(self):
resp = self.app.post_json(
@ -168,7 +161,7 @@ class TestActionExecutionsController(base.FunctionalTest):
@mock.patch.object(rpc.EngineClient, 'on_action_complete')
def test_put(self, f):
f.return_value = UPDATED_ACTION_EX
f.return_value = UPDATED_ACTION_EX_DB
resp = self.app.put_json('/v2/action_executions/123', UPDATED_ACTION)
@ -177,7 +170,7 @@ class TestActionExecutionsController(base.FunctionalTest):
f.assert_called_once_with(
UPDATED_ACTION['id'],
wf_utils.Result(data=action_ex.output)
wf_utils.Result(data=ACTION_EX_DB.output)
)
@mock.patch.object(rpc.EngineClient, 'on_action_complete')
@ -191,7 +184,7 @@ class TestActionExecutionsController(base.FunctionalTest):
f.assert_called_once_with(
ERROR_ACTION['id'],
wf_utils.Result(error=action_ex.output)
wf_utils.Result(error=ACTION_EX_DB.output)
)
@mock.patch.object(
@ -200,14 +193,20 @@ class TestActionExecutionsController(base.FunctionalTest):
MOCK_NOT_FOUND
)
def test_put_no_action_ex(self):
resp = self.app.put_json('/v2/action_executions/123', UPDATED_ACTION,
expect_errors=True)
resp = self.app.put_json(
'/v2/action_executions/123',
UPDATED_ACTION,
expect_errors=True
)
self.assertEqual(resp.status_int, 404)
def test_put_bad_result(self):
resp = self.app.put_json('/v2/action_executions/123', BROKEN_ACTION,
expect_errors=True)
resp = self.app.put_json(
'/v2/action_executions/123',
BROKEN_ACTION,
expect_errors=True
)
self.assertEqual(resp.status_int, 400)
@ -215,7 +214,9 @@ class TestActionExecutionsController(base.FunctionalTest):
def test_put_without_result(self, f):
action_ex = copy.copy(UPDATED_ACTION)
del action_ex['output']
f.return_value = UPDATED_ACTION_EX
f.return_value = UPDATED_ACTION_EX_DB
resp = self.app.put_json('/v2/action_executions/123', action_ex)
self.assertEqual(resp.status_int, 200)
@ -227,7 +228,7 @@ class TestActionExecutionsController(base.FunctionalTest):
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['action_executions']), 1)
self.assertDictEqual(ACTION_EXEC, resp.json['action_executions'][0])
self.assertDictEqual(ACTION_EX, resp.json['action_executions'][0])
@mock.patch.object(db_api, 'get_action_executions', MOCK_EMPTY)
def test_get_all_empty(self):