Implement workflow execution environment - part 1

First part of a series to implement the workflow execution environment.
Implement REST API for the Environment model.

Change-Id: Ic6420ba5699688c64649ed0771aee88c0c0dec82
Implements: blueprint mistral-execution-environment
This commit is contained in:
W Chan 2015-01-13 15:07:21 -08:00
parent 53cb213254
commit a98e8016c6
7 changed files with 656 additions and 10 deletions

View File

@ -0,0 +1,148 @@
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import uuid
from pecan import rest
import six
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.db.v2 import api as db_api
from mistral.openstack.common import log as logging
from mistral.utils import rest_utils
LOG = logging.getLogger(__name__)
SAMPLE = {
'server': 'localhost',
'database': 'temp',
'timeout': 600,
'verbose': True
}
class Environment(resource.Resource):
"""Environment resource."""
id = wtypes.text
name = wtypes.text
description = wtypes.text
variables = wtypes.text
scope = wtypes.Enum(str, 'private', 'public')
created_at = wtypes.text
updated_at = wtypes.text
def __init__(self, *args, **kwargs):
super(Environment, self).__init__()
for key, val in six.iteritems(kwargs):
if key == 'variables' and val is not None:
val = json.dumps(val)
setattr(self, key, val)
def to_dict(self):
d = super(Environment, self).to_dict()
if d.get('variables'):
d['variables'] = json.loads(d['variables'])
return d
@classmethod
def from_dict(cls, d):
return cls(**d)
@classmethod
def sample(cls):
return cls(id=str(uuid.uuid4()),
name='sample',
description='example environment entry',
variables=json.dumps(SAMPLE),
scope='private',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000')
class Environments(resource.Resource):
"""A collection of Environment resources."""
environments = [Environment]
@classmethod
def sample(cls):
return cls(environments=[Environment.sample()])
class EnvironmentController(rest.RestController):
@wsme_pecan.wsexpose(Environments)
def get_all(self):
"""Return all environments.
Where project_id is the same as the requestor or
project_id is different but the scope is public.
"""
LOG.info("Fetch environments.")
environments = [Environment(**db_model.to_dict())
for db_model in db_api.get_environments()]
return Environments(environments=environments)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Environment, wtypes.text)
def get(self, name):
"""Return the named environment."""
LOG.info("Fetch environment [name=%s]" % name)
db_model = db_api.get_environment(name)
return Environment(**db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Environment, body=Environment, status_code=201)
def post(self, environment):
"""Create a new environment."""
LOG.info("Create environment [env=%s]" % environment)
db_model = db_api.create_environment(environment.to_dict())
return Environment(**db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(Environment, body=Environment)
def put(self, environment):
"""Update an environment."""
if not environment.name:
raise ValueError('Name of the environment is not provided.')
LOG.info("Update environment [name=%s, env=%s]" %
(environment.name, environment))
db_model = db_api.update_environment(environment.name,
environment.to_dict())
return Environment(**db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, wtypes.text, status_code=204)
def delete(self, name):
"""Delete the named environment."""
LOG.info("Delete environment [name=%s]" % name)
db_api.delete_environment(name)

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -21,6 +20,7 @@ import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.api.controllers.v2 import action
from mistral.api.controllers.v2 import cron_trigger
from mistral.api.controllers.v2 import environment
from mistral.api.controllers.v2 import execution
from mistral.api.controllers.v2 import task
from mistral.api.controllers.v2 import workbook
@ -49,6 +49,7 @@ class Controller(object):
executions = execution.ExecutionsController()
tasks = task.TasksController()
cron_triggers = cron_trigger.CronTriggersController()
environments = environment.EnvironmentController()
@wsme_pecan.wsexpose(RootResource)
def index(self):

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -292,3 +291,38 @@ def delete_cron_trigger(name):
def delete_cron_triggers(**kwargs):
return IMPL.delete_cron_triggers(**kwargs)
# Environments.
def get_environment(name):
return IMPL.get_environment(name)
def load_environment(name):
"""Unlike get_environment this method is allowed to return None."""
return IMPL.load_environment(name)
def get_environments():
return IMPL.get_environments()
def create_environment(values):
return IMPL.create_environment(values)
def update_environment(name, values):
return IMPL.update_environment(name, values)
def create_or_update_environment(name, values):
return IMPL.create_or_update_environment(name, values)
def delete_environment(name):
IMPL.delete_environment(name)
def delete_environments(**kwargs):
IMPL.delete_environments(**kwargs)

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -669,3 +668,95 @@ def _get_cron_triggers(**kwargs):
query = b.model_query(models.CronTrigger)
return query.filter_by(**kwargs).all()
# Environments.
def get_environment(name):
env = _get_environment(name)
if not env:
raise exc.NotFoundException(
"Environment not found [environment_name=%s]" % name)
return env
def load_environment(name):
return _get_environment(name)
def get_environments(**kwargs):
return _get_collection_sorted_by_name(models.Environment, **kwargs)
@b.session_aware()
def create_environment(values, session=None):
env = models.Environment()
env.update(values)
# Default environment to private unless specified.
if not getattr(env, 'scope', None):
env['scope'] = 'private'
if context.ctx().project_id:
env['project_id'] = context.ctx().project_id
try:
env.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for Environment: %s"
% e.columns)
return env
@b.session_aware()
def update_environment(name, values, session=None):
env = _get_environment(name)
if not env:
raise exc.NotFoundException(
"Environment not found [environment_name=%s]" % name)
env.update(values)
# Default environment to private unless specified.
if not getattr(env, 'scope', None):
env['scope'] = 'private'
if context.ctx().project_id:
env['project_id'] = context.ctx().project_id
return env
@b.session_aware()
def create_or_update_environment(name, values, session=None):
env = _get_environment(name)
if not env:
return create_environment(values)
else:
return update_environment(name, values)
@b.session_aware()
def delete_environment(name, session=None):
env = _get_environment(name)
if not env:
raise exc.NotFoundException(
"Environment not found [environment_name=%s]" % name)
session.delete(env)
def _get_environment(name):
return _get_db_object_by_name(models.Environment, name)
@b.session_aware()
def delete_environments(**kwargs):
return _delete_all(models.Environment, **kwargs)

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -174,6 +173,27 @@ class Action(mb.MistralModelBase):
trust_id = sa.Column(sa.String(80))
class Environment(mb.MistralModelBase):
"""Contains environment variables for workflow execution."""
__tablename__ = 'environments_v2'
__table_args__ = (
sa.UniqueConstraint('name', 'project_id'),
)
# Main properties.
id = mb.id_column()
name = sa.Column(sa.String(200))
description = sa.Column(sa.Text())
variables = sa.Column(st.JsonDictType())
# Security properties.
scope = sa.Column(sa.String(80))
project_id = sa.Column(sa.String(80), default=db_base.DEFAULT_PROJECT_ID)
trust_id = sa.Column(sa.String(80))
def calc_hash(context):
d = context.current_parameters['workflow_input'] or {}

View File

@ -0,0 +1,226 @@
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
import json
import uuid
import mock
import six
from mistral.api.controllers.v2 import environment as api
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models as db
from mistral import exceptions as exc
from mistral.tests.unit.api import base
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
VARIABLES = {
'host': 'localhost',
'db': 'test',
'timeout': 600,
'verbose': True,
'__actions': {
'std.sql': {
'conn': 'mysql://admin:secrete@{$.__env.host}/{$.__env.db}'
}
}
}
ENVIRONMENT = {
'id': str(uuid.uuid4()),
'name': 'test',
'description': 'my test settings',
'variables': json.dumps(VARIABLES),
'scope': 'private',
'created_at': '2015-01-01 00:00:00',
'updated_at': '2015-01-01 00:00:00'
}
ENVIRONMENT_DB = db.Environment(
id=ENVIRONMENT['id'],
name=ENVIRONMENT['name'],
description=ENVIRONMENT['description'],
variables=copy.deepcopy(VARIABLES),
scope=ENVIRONMENT['scope'],
created_at=datetime.datetime.strptime(ENVIRONMENT['created_at'],
DATETIME_FORMAT),
updated_at=datetime.datetime.strptime(ENVIRONMENT['updated_at'],
DATETIME_FORMAT)
)
ENVIRONMENT_DB_DICT = {k: v for k, v in six.iteritems(ENVIRONMENT_DB)}
UPDATED_VARIABLES = copy.deepcopy(VARIABLES)
UPDATED_VARIABLES['host'] = '127.0.0.1'
UPDATED_ENVIRONMENT = copy.deepcopy(ENVIRONMENT)
UPDATED_ENVIRONMENT['variables'] = json.dumps(UPDATED_VARIABLES)
UPDATED_ENVIRONMENT_DB = db.Environment(**ENVIRONMENT_DB_DICT)
UPDATED_ENVIRONMENT_DB.variables = copy.deepcopy(UPDATED_VARIABLES)
MOCK_ENVIRONMENT = mock.MagicMock(return_value=ENVIRONMENT_DB)
MOCK_ENVIRONMENTS = mock.MagicMock(return_value=[ENVIRONMENT_DB])
MOCK_UPDATED_ENVIRONMENT = mock.MagicMock(return_value=UPDATED_ENVIRONMENT_DB)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntry())
MOCK_DELETE = mock.MagicMock(return_value=None)
class TestEnvironmentController(base.FunctionalTest):
def _assert_dict_equal(self, actual, expected):
self.assertIsInstance(actual, dict)
self.assertIsInstance(expected, dict)
if (actual.get('variables') and
isinstance(actual.get('variables'), basestring)):
actual['variables'] = json.loads(actual['variables'])
if (expected.get('variables') and
isinstance(expected.get('variables'), basestring)):
expected['variables'] = json.loads(expected['variables'])
self.assertDictEqual(actual, expected)
def test_resource(self):
resource = api.Environment(**copy.deepcopy(ENVIRONMENT))
actual = resource.to_dict()
expected = copy.deepcopy(ENVIRONMENT)
self._assert_dict_equal(actual, expected)
def test_resource_to_db_model(self):
resource = api.Environment(**copy.deepcopy(ENVIRONMENT))
values = resource.to_dict()
values['variables'] = json.loads(values['variables'])
values['created_at'] = datetime.datetime.strptime(
values['created_at'], DATETIME_FORMAT)
values['updated_at'] = datetime.datetime.strptime(
values['updated_at'], DATETIME_FORMAT)
db_model = db.Environment(**values)
with db_api.transaction():
db_api.create_environment(db_model)
self.assertEqual(db_model.id, values['id'])
self.assertEqual(db_model.name, values['name'])
self.assertIsNone(db_model.project_id)
self.assertEqual(db_model.description, values['description'])
self.assertDictEqual(db_model.variables, values['variables'])
self.assertEqual(db_model.created_at, values['created_at'])
self.assertEqual(db_model.updated_at, values['updated_at'])
@mock.patch.object(db_api, 'get_environments', MOCK_ENVIRONMENTS)
def test_get_all(self):
resp = self.app.get('/v2/environments')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['environments']), 1)
def test_get_all_empty(self):
resp = self.app.get('/v2/environments')
self.assertEqual(resp.status_int, 200)
self.assertEqual(len(resp.json['environments']), 0)
@mock.patch.object(db_api, 'get_environment', MOCK_ENVIRONMENT)
def test_get(self):
resp = self.app.get('/v2/environments/123')
self.assertEqual(resp.status_int, 200)
self.assertDictEqual(ENVIRONMENT, resp.json)
@mock.patch.object(db_api, "get_environment", MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get('/v2/environments/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, 'create_environment', MOCK_ENVIRONMENT)
def test_post(self):
resp = self.app.post_json(
'/v2/environments',
copy.deepcopy(ENVIRONMENT))
self.assertEqual(resp.status_int, 201)
self._assert_dict_equal(resp.json, copy.deepcopy(ENVIRONMENT))
@mock.patch.object(db_api, 'create_environment', MOCK_DUPLICATE)
def test_post_dup(self):
resp = self.app.post_json(
'/v2/environments',
copy.deepcopy(ENVIRONMENT),
expect_errors=True)
self.assertEqual(resp.status_int, 409)
@mock.patch.object(db_api, 'create_environment', MOCK_ENVIRONMENT)
def test_post_default_scope(self):
env = copy.deepcopy(ENVIRONMENT)
del env['scope']
resp = self.app.post_json('/v2/environments', env)
self.assertEqual(resp.status_int, 201)
self._assert_dict_equal(resp.json, copy.deepcopy(ENVIRONMENT))
@mock.patch.object(db_api, 'update_environment', MOCK_UPDATED_ENVIRONMENT)
def test_put(self):
resp = self.app.put_json(
'/v2/environments',
copy.deepcopy(UPDATED_ENVIRONMENT))
self.assertEqual(resp.status_int, 200)
self._assert_dict_equal(resp.json, copy.deepcopy(UPDATED_ENVIRONMENT))
@mock.patch.object(db_api, 'update_environment', MOCK_UPDATED_ENVIRONMENT)
def test_put_default_scope(self):
env = copy.deepcopy(UPDATED_ENVIRONMENT)
env['scope'] = None
resp = self.app.put_json('/v2/environments', env)
self.assertEqual(resp.status_int, 200)
self._assert_dict_equal(resp.json, copy.deepcopy(UPDATED_ENVIRONMENT))
@mock.patch.object(db_api, 'update_environment', MOCK_NOT_FOUND)
def test_put_not_found(self):
resp = self.app.put_json(
'/v2/environments/test',
copy.deepcopy(UPDATED_ENVIRONMENT),
expect_errors=True)
self.assertEqual(resp.status_int, 404)
@mock.patch.object(db_api, 'delete_environment', MOCK_DELETE)
def test_delete(self):
resp = self.app.delete('/v2/environments/123')
self.assertEqual(resp.status_int, 204)
@mock.patch.object(db_api, 'delete_environment', MOCK_NOT_FOUND)
def test_delete_not_found(self):
resp = self.app.delete('/v2/environments/123', expect_errors=True)
self.assertEqual(resp.status_int, 404)

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -925,6 +924,133 @@ class CronTriggerTest(SQLAlchemyTest):
self.assertIn("'name': 'trigger1'", s)
ENVIRONMENTS = [
{
'name': 'env1',
'description': 'Test Environment #1',
'scope': 'private',
'variables': {
'server': 'localhost',
'database': 'test',
'timeout': 600,
'verbose': True
}
},
{
'name': 'env2',
'description': 'Test Environment #2',
'scope': 'public',
'variables': {
'server': '127.0.0.1',
'database': 'temp',
'timeout': 300,
'verbose': False
}
}
]
class EnvironmentTest(SQLAlchemyTest):
def setUp(self):
super(EnvironmentTest, self).setUp()
db_api.delete_environments()
def test_create_and_get_and_load_environment(self):
created = db_api.create_environment(ENVIRONMENTS[0])
fetched = db_api.get_environment(created.name)
self.assertEqual(created, fetched)
fetched = db_api.load_environment(created.name)
self.assertEqual(created, fetched)
self.assertIsNone(db_api.load_environment("not-existing-id"))
def test_create_environment_duplicate_without_auth(self):
cfg.CONF.set_default('auth_enable', False, group='pecan')
db_api.create_environment(ENVIRONMENTS[0])
self.assertRaises(
exc.DBDuplicateEntry,
db_api.create_environment,
ENVIRONMENTS[0]
)
def test_update_environment(self):
created = db_api.create_environment(ENVIRONMENTS[0])
updated = db_api.update_environment(
created.name,
{'description': 'my new desc'}
)
self.assertEqual('my new desc', updated.description)
fetched = db_api.get_environment(created.name)
self.assertEqual(updated, fetched)
def test_create_or_update_environment(self):
name = 'not-existing-id'
self.assertIsNone(db_api.load_environment(name))
created = db_api.create_or_update_environment(name, ENVIRONMENTS[0])
self.assertIsNotNone(created)
self.assertIsNotNone(created.name)
updated = db_api.create_or_update_environment(
created.name,
{'description': 'my new desc'}
)
self.assertEqual('my new desc', updated.description)
self.assertEqual(
'my new desc',
db_api.load_environment(updated.name).description
)
fetched = db_api.get_environment(created.name)
self.assertEqual(updated, fetched)
def test_get_environments(self):
created0 = db_api.create_environment(ENVIRONMENTS[0])
created1 = db_api.create_environment(ENVIRONMENTS[1])
fetched = db_api.get_environments()
self.assertEqual(2, len(fetched))
self.assertEqual(created0, fetched[0])
self.assertEqual(created1, fetched[1])
def test_delete_environment(self):
created = db_api.create_environment(ENVIRONMENTS[0])
fetched = db_api.get_environment(created.name)
self.assertEqual(created, fetched)
db_api.delete_environment(created.name)
self.assertRaises(
exc.NotFoundException,
db_api.get_environment,
created.name
)
def test_environment_repr(self):
s = db_api.create_environment(ENVIRONMENTS[0]).__repr__()
self.assertIn('Environment ', s)
self.assertIn("'description': 'Test Environment #1'", s)
self.assertIn("'name': 'env1'", s)
class TXTest(SQLAlchemyTest):
def test_rollback(self):
db_api.start_tx()