Merge "Convergence prepare traversal"
This commit is contained in:
commit
e4b959d75c
|
@ -53,9 +53,14 @@ class Node(object):
|
|||
self.satisfy.add(source)
|
||||
return iter(self.satisfy)
|
||||
|
||||
def requires(self, target):
|
||||
'''Add a key that this node requires.'''
|
||||
self.require.add(target)
|
||||
def requires(self, target=None):
|
||||
'''
|
||||
Add a key that this node requires, and optionally add a
|
||||
new one.
|
||||
'''
|
||||
if target is not None:
|
||||
self.require.add(target)
|
||||
return iter(self.require)
|
||||
|
||||
def __isub__(self, target):
|
||||
'''Remove a key that this node requires.'''
|
||||
|
@ -153,6 +158,13 @@ class Graph(collections.defaultdict):
|
|||
text = '{%s}' % ', '.join(pairs)
|
||||
return encodeutils.safe_decode(text)
|
||||
|
||||
def leaves(self):
|
||||
'''
|
||||
Return an iterator over all of the leaf nodes in the graph.
|
||||
'''
|
||||
return (requirer for requirer, required in self.items()
|
||||
if not required)
|
||||
|
||||
@staticmethod
|
||||
def toposort(graph):
|
||||
'''
|
||||
|
@ -207,6 +219,15 @@ class Dependencies(object):
|
|||
|
||||
return self._graph[last].required_by()
|
||||
|
||||
def requires(self, target):
|
||||
'''
|
||||
List the keys that require the specified node.
|
||||
'''
|
||||
if target not in self._graph:
|
||||
raise KeyError
|
||||
|
||||
return self._graph[target].requires()
|
||||
|
||||
def __getitem__(self, last):
|
||||
'''
|
||||
Return a partial dependency graph consisting of the specified node and
|
||||
|
@ -235,6 +256,18 @@ class Dependencies(object):
|
|||
|
||||
return Dependencies(edges)
|
||||
|
||||
def translate(self, transform):
|
||||
'''
|
||||
Translate all of the nodes using a transform function.
|
||||
|
||||
Returns a new Dependencies object.
|
||||
'''
|
||||
def transform_key(key):
|
||||
return transform(key) if key is not None else None
|
||||
|
||||
edges = self._graph.edges()
|
||||
return type(self)(tuple(map(transform_key, e)) for e in edges)
|
||||
|
||||
def __str__(self):
|
||||
'''
|
||||
Return a human-readable string representation of the dependency graph
|
||||
|
|
|
@ -174,11 +174,11 @@ class Resource(object):
|
|||
self.created_time = None
|
||||
self.updated_time = None
|
||||
self._rpc_client = None
|
||||
self.needed_by = None
|
||||
self.requires = None
|
||||
self.needed_by = []
|
||||
self.requires = []
|
||||
self.replaces = None
|
||||
self.replaced_by = None
|
||||
self.current_template_id = stack.t.id
|
||||
self.current_template_id = None
|
||||
|
||||
resource = stack.db_resource_get(name)
|
||||
if resource:
|
||||
|
@ -268,6 +268,20 @@ class Resource(object):
|
|||
rs.update_and_save({'rsrc_metadata': metadata})
|
||||
self._rsrc_metadata = metadata
|
||||
|
||||
@classmethod
|
||||
def set_needed_by(cls, db_rsrc, needed_by):
|
||||
if db_rsrc:
|
||||
db_rsrc.update_and_save(
|
||||
{'needed_by': needed_by}
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def set_requires(cls, db_rsrc, requires):
|
||||
if db_rsrc:
|
||||
db_rsrc.update_and_save(
|
||||
{'requires': requires}
|
||||
)
|
||||
|
||||
def _break_if_required(self, action, hook):
|
||||
'''Block the resource until the hook is cleared if there is one.'''
|
||||
if self.stack.env.registry.matches_hook(self.name, hook):
|
||||
|
|
|
@ -36,6 +36,7 @@ from heat.common.i18n import _LW
|
|||
from heat.common import identifier
|
||||
from heat.common import messaging as rpc_messaging
|
||||
from heat.common import service_utils
|
||||
from heat.common import template_format
|
||||
from heat.engine import api
|
||||
from heat.engine import attributes
|
||||
from heat.engine import clients
|
||||
|
@ -668,8 +669,7 @@ class EngineService(service.Service):
|
|||
"""
|
||||
LOG.info(_LI('Creating stack %s'), stack_name)
|
||||
|
||||
def _stack_create(stack):
|
||||
|
||||
def _create_stack_user(stack):
|
||||
if not stack.stack_user_project_id:
|
||||
try:
|
||||
stack.create_stack_user_project_id()
|
||||
|
@ -677,6 +677,8 @@ class EngineService(service.Service):
|
|||
stack.state_set(stack.action, stack.FAILED,
|
||||
six.text_type(ex))
|
||||
|
||||
def _stack_create(stack):
|
||||
_create_stack_user(stack)
|
||||
# Create/Adopt a stack, and create the periodic task if successful
|
||||
if stack.adopt_stack_data:
|
||||
stack.adopt()
|
||||
|
@ -692,18 +694,22 @@ class EngineService(service.Service):
|
|||
LOG.info(_LI("Stack create failed, status %s"), stack.status)
|
||||
|
||||
convergence = cfg.CONF.convergence_engine
|
||||
if convergence:
|
||||
raise exception.NotSupported(feature=_('Convergence engine'))
|
||||
|
||||
stack = self._parse_template_and_validate_stack(
|
||||
cnxt, stack_name, template, params, files, args, owner_id,
|
||||
nested_depth, user_creds_id, stack_user_project_id, convergence,
|
||||
parent_resource_name)
|
||||
|
||||
stack.store()
|
||||
|
||||
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
|
||||
_stack_create, stack)
|
||||
# once validations are done
|
||||
# if convergence is enabled, take convergence path
|
||||
if convergence:
|
||||
# TODO(later): call _create_stack_user(stack)
|
||||
# call stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
raise exception.NotSupported(feature=_('Convergence engine'))
|
||||
else:
|
||||
stack.store()
|
||||
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
|
||||
_stack_create, stack)
|
||||
|
||||
return dict(stack.identifier())
|
||||
|
||||
|
@ -765,14 +771,20 @@ class EngineService(service.Service):
|
|||
self._validate_deferred_auth_context(cnxt, updated_stack)
|
||||
updated_stack.validate()
|
||||
|
||||
event = eventlet.event.Event()
|
||||
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
|
||||
self.engine_id,
|
||||
current_stack.update,
|
||||
updated_stack,
|
||||
event=event)
|
||||
th.link(self.thread_group_mgr.remove_event, current_stack.id, event)
|
||||
self.thread_group_mgr.add_event(current_stack.id, event)
|
||||
# Once all the validations are done
|
||||
# if convergence is enabled, take the convergence path
|
||||
if current_kwargs['convergence']:
|
||||
current_stack.converge_stack(template=tmpl)
|
||||
else:
|
||||
event = eventlet.event.Event()
|
||||
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
|
||||
self.engine_id,
|
||||
current_stack.update,
|
||||
updated_stack,
|
||||
event=event)
|
||||
th.link(self.thread_group_mgr.remove_event,
|
||||
current_stack.id, event)
|
||||
self.thread_group_mgr.add_event(current_stack.id, event)
|
||||
return dict(current_stack.identifier())
|
||||
|
||||
@context.request_context
|
||||
|
@ -927,6 +939,16 @@ class EngineService(service.Service):
|
|||
LOG.info(_LI('Deleting stack %s'), st.name)
|
||||
stack = parser.Stack.load(cnxt, stack=st)
|
||||
|
||||
if stack.convergence:
|
||||
empty_template = '''
|
||||
heat_template_version: 2013-05-23
|
||||
description: Empty Template
|
||||
'''
|
||||
tmpl = template_format.parse(empty_template)
|
||||
template = templatem.Template(tmpl)
|
||||
stack.converge_stack(template=template, action=stack.DELETE)
|
||||
return
|
||||
|
||||
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
|
||||
with lock.try_thread_lock() as acquire_result:
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import warnings
|
|||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import encodeutils
|
||||
from oslo_utils import uuidutils
|
||||
from osprofiler import profiler
|
||||
import six
|
||||
|
||||
|
@ -41,6 +42,7 @@ from heat.engine import parameter_groups as param_groups
|
|||
from heat.engine import resource
|
||||
from heat.engine import resources
|
||||
from heat.engine import scheduler
|
||||
from heat.engine import sync_point
|
||||
from heat.engine import template as tmpl
|
||||
from heat.engine import update
|
||||
from heat.objects import resource as resource_objects
|
||||
|
@ -86,7 +88,8 @@ class Stack(collections.Mapping):
|
|||
user_creds_id=None, tenant_id=None,
|
||||
use_stored_context=False, username=None,
|
||||
nested_depth=0, strict_validate=True, convergence=False,
|
||||
current_traversal=None, tags=None):
|
||||
current_traversal=None, tags=None, prev_raw_template_id=None,
|
||||
current_deps=None):
|
||||
'''
|
||||
Initialise from a context, name, Template object and (optionally)
|
||||
Environment object. The database ID may also be initialised, if the
|
||||
|
@ -130,6 +133,8 @@ class Stack(collections.Mapping):
|
|||
self.convergence = convergence
|
||||
self.current_traversal = current_traversal
|
||||
self.tags = tags
|
||||
self.prev_raw_template_id = prev_raw_template_id
|
||||
self.current_deps = current_deps
|
||||
|
||||
if use_stored_context:
|
||||
self.context = self.stored_context()
|
||||
|
@ -400,7 +405,9 @@ class Stack(collections.Mapping):
|
|||
user_creds_id=stack.user_creds_id, tenant_id=stack.tenant,
|
||||
use_stored_context=use_stored_context,
|
||||
username=stack.username, convergence=stack.convergence,
|
||||
current_traversal=stack.current_traversal, tags=tags)
|
||||
current_traversal=stack.current_traversal, tags=tags,
|
||||
prev_raw_template_id=stack.prev_raw_template_id,
|
||||
current_deps=stack.current_deps)
|
||||
|
||||
def get_kwargs_for_cloning(self, keep_status=False, only_db=False):
|
||||
"""Get common kwargs for calling Stack() for cloning.
|
||||
|
@ -426,6 +433,8 @@ class Stack(collections.Mapping):
|
|||
'nested_depth': self.nested_depth,
|
||||
'convergence': self.convergence,
|
||||
'current_traversal': self.current_traversal,
|
||||
'prev_raw_template_id': self.prev_raw_template_id,
|
||||
'current_deps': self.current_deps
|
||||
}
|
||||
if keep_status:
|
||||
stack.update({
|
||||
|
@ -908,6 +917,155 @@ class Stack(collections.Mapping):
|
|||
event=event)
|
||||
updater()
|
||||
|
||||
@profiler.trace('Stack.converge_stack', hide_args=False)
|
||||
def converge_stack(self, template, action=UPDATE):
|
||||
"""
|
||||
Updates the stack and triggers convergence for resources
|
||||
"""
|
||||
self.prev_raw_template_id = getattr(self.t, 'id', None)
|
||||
self.t = template
|
||||
previous_traversal = self.current_traversal
|
||||
self.current_traversal = uuidutils.generate_uuid()
|
||||
self.store()
|
||||
|
||||
# TODO(later): lifecycle_plugin_utils.do_pre_ops
|
||||
self.state_set(action, self.IN_PROGRESS,
|
||||
'Stack %s started' % action)
|
||||
|
||||
# delete the prev traversal sync_points
|
||||
sync_point.delete_all(self.context, self.id, previous_traversal)
|
||||
self._converge_create_or_update()
|
||||
|
||||
def _converge_create_or_update(self):
|
||||
self._update_or_store_resources()
|
||||
self.convergence_dependencies = self._convergence_dependencies(
|
||||
self.ext_rsrcs_db, self.dependencies)
|
||||
LOG.info(_LI('convergence_dependencies: %s'),
|
||||
self.convergence_dependencies)
|
||||
|
||||
# create sync_points for resources in DB
|
||||
for rsrc_id, is_update in self.convergence_dependencies:
|
||||
sync_point.create(self.context, rsrc_id,
|
||||
self.current_traversal, is_update,
|
||||
self.id)
|
||||
# create sync_point entry for stack
|
||||
sync_point.create(
|
||||
self.context, self.id, self.current_traversal,
|
||||
False if self.action in (self.DELETE, self.SUSPEND) else True,
|
||||
self.id)
|
||||
|
||||
# Store list of edges
|
||||
self.current_deps = {
|
||||
'edges': [[rqr, rqd] for rqr, rqd in
|
||||
self.convergence_dependencies.graph().edges()]}
|
||||
self.store()
|
||||
|
||||
leaves = (self.convergence_dependencies.graph(reverse=True).leaves()
|
||||
if self.action in (self.DELETE, self.SUSPEND)
|
||||
else self.convergence_dependencies.graph().leaves())
|
||||
|
||||
for rsrc_id, is_update in leaves:
|
||||
LOG.info(_LI("Triggering resource %(rsrc_id)s "
|
||||
"for update=%(is_update)s"),
|
||||
{'rsrc_id': rsrc_id, 'is_update': is_update})
|
||||
self.temp_update_requires(self.convergence_dependencies)
|
||||
|
||||
def _update_or_store_resources(self):
|
||||
try:
|
||||
ext_rsrcs_db = resource_objects.Resource.get_all_by_stack(
|
||||
self.context, self.id)
|
||||
except exception.NotFound:
|
||||
self.ext_rsrcs_db = None
|
||||
else:
|
||||
self.ext_rsrcs_db = {res.id: res
|
||||
for res_name, res in ext_rsrcs_db.items()}
|
||||
|
||||
def get_existing_rsrc_db(rsrc_name):
|
||||
candidate = None
|
||||
if self.ext_rsrcs_db:
|
||||
for id, ext_rsrc in self.ext_rsrcs_db.items():
|
||||
if ext_rsrc.name != rsrc_name:
|
||||
continue
|
||||
if ext_rsrc.current_template_id == self.t.id:
|
||||
# Rollback where the previous resource still exists
|
||||
candidate = ext_rsrc
|
||||
break
|
||||
elif (ext_rsrc.current_template_id ==
|
||||
self.prev_raw_template_id):
|
||||
# Current resource is otherwise a good candidate
|
||||
candidate = ext_rsrc
|
||||
break
|
||||
return candidate
|
||||
|
||||
curr_name_translated_dep = self.dependencies.translate(lambda res:
|
||||
res.name)
|
||||
rsrcs = {}
|
||||
|
||||
def update_needed_by(res):
|
||||
new_requirers = set(
|
||||
rsrcs[rsrc_name].id for rsrc_name in
|
||||
curr_name_translated_dep.required_by(res.name)
|
||||
)
|
||||
old_requirers = set(res.needed_by) if res.needed_by else set()
|
||||
needed_by = old_requirers | new_requirers
|
||||
res.needed_by = list(needed_by)
|
||||
|
||||
for rsrc in reversed(self.dependencies):
|
||||
existing_rsrc_db = get_existing_rsrc_db(rsrc.name)
|
||||
if existing_rsrc_db is None:
|
||||
update_needed_by(rsrc)
|
||||
rsrc.current_template_id = self.t.id
|
||||
rsrc._store()
|
||||
rsrcs[rsrc.name] = rsrc
|
||||
else:
|
||||
update_needed_by(existing_rsrc_db)
|
||||
resource.Resource.set_needed_by(
|
||||
existing_rsrc_db, existing_rsrc_db.needed_by
|
||||
)
|
||||
rsrcs[existing_rsrc_db.name] = existing_rsrc_db
|
||||
|
||||
def _convergence_dependencies(self, existing_resources,
|
||||
curr_template_dep):
|
||||
dep = curr_template_dep.translate(lambda res: (res.id, True))
|
||||
if existing_resources:
|
||||
for rsrc_id, rsrc in existing_resources.items():
|
||||
dep += (rsrc_id, False), None
|
||||
|
||||
for requirement in rsrc.requires:
|
||||
if requirement in existing_resources:
|
||||
dep += (requirement, False), (rsrc_id, False)
|
||||
if rsrc.replaces in existing_resources:
|
||||
dep += (rsrc.replaces, False), (rsrc_id, False)
|
||||
|
||||
if (rsrc.id, True) in dep:
|
||||
dep += (rsrc_id, False), (rsrc_id, True)
|
||||
return dep
|
||||
|
||||
def temp_update_requires(self, conv_deps):
|
||||
'''updates requires column of resources'''
|
||||
# This functions should be removed once the dependent patches
|
||||
# are implemented.
|
||||
if self.action in (self.CREATE, self.UPDATE):
|
||||
requires = dict()
|
||||
for rsrc_id, is_update in conv_deps:
|
||||
reqs = conv_deps.requires((rsrc_id, is_update))
|
||||
requires[rsrc_id] = list({id for id, is_update in reqs})
|
||||
|
||||
try:
|
||||
rsrcs_db = resource_objects.Resource.get_all_by_stack(
|
||||
self.context, self.id)
|
||||
except exception.NotFound:
|
||||
rsrcs_db = None
|
||||
else:
|
||||
rsrcs_db = {res.id: res for res_name, res in rsrcs_db.items()}
|
||||
|
||||
if rsrcs_db:
|
||||
for id, db_rsrc in rsrcs_db.items():
|
||||
if id in requires:
|
||||
resource.Resource.set_requires(
|
||||
db_rsrc, requires[id]
|
||||
)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def update_task(self, newstack, action=UPDATE, event=None):
|
||||
if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from heat.objects import sync_point as sync_point_object
|
||||
|
||||
|
||||
def create(context, entity_id, traversal_id, is_update, stack_id):
|
||||
"""
|
||||
Creates an sync point entry in DB.
|
||||
"""
|
||||
values = {'entity_id': entity_id, 'traversal_id': traversal_id,
|
||||
'is_update': is_update, 'atomic_key': 0,
|
||||
'stack_id': stack_id, 'input_data': {}}
|
||||
return sync_point_object.SyncPoint.create(context, values)
|
||||
|
||||
|
||||
def get(context, entity_id, traversal_id, is_update):
|
||||
"""
|
||||
Retrieves a sync point entry from DB.
|
||||
"""
|
||||
return sync_point_object.SyncPoint.get_by_key(context, entity_id,
|
||||
traversal_id, is_update)
|
||||
|
||||
|
||||
def delete_all(context, stack_id, traversal_id):
|
||||
"""
|
||||
Deletes all sync points of a stack associated with a particular traversal.
|
||||
"""
|
||||
return sync_point_object.SyncPoint.delete_all_by_stack_and_traversal(
|
||||
context, stack_id, traversal_id
|
||||
)
|
|
@ -46,7 +46,8 @@ resources:
|
|||
'''
|
||||
|
||||
|
||||
def get_stack(stack_name, ctx, template=None, with_params=True):
|
||||
def get_stack(stack_name, ctx, template=None, with_params=True,
|
||||
convergence=False):
|
||||
if template is None:
|
||||
t = template_format.parse(wp_template)
|
||||
if with_params:
|
||||
|
@ -57,7 +58,7 @@ def get_stack(stack_name, ctx, template=None, with_params=True):
|
|||
else:
|
||||
t = template_format.parse(template)
|
||||
tmpl = templatem.Template(t)
|
||||
stack = parser.Stack(ctx, stack_name, tmpl)
|
||||
stack = parser.Stack(ctx, stack_name, tmpl, convergence=convergence)
|
||||
return stack
|
||||
|
||||
|
||||
|
|
|
@ -226,3 +226,11 @@ class dependenciesTest(common.HeatTestCase):
|
|||
"'%s' not found in required_by" % n)
|
||||
|
||||
self.assertRaises(KeyError, d.required_by, 'foo')
|
||||
|
||||
def test_graph_leaves(self):
|
||||
d = dependencies.Dependencies([('last1', 'mid'), ('last2', 'mid'),
|
||||
('mid', 'first1'), ('mid', 'first2')])
|
||||
|
||||
leaves = sorted(list(d._graph.leaves()))
|
||||
|
||||
self.assertEqual(['first1', 'first2'], leaves)
|
||||
|
|
|
@ -50,6 +50,7 @@ from heat.objects import service as service_objects
|
|||
from heat.objects import software_deployment as software_deployment_object
|
||||
from heat.objects import stack as stack_object
|
||||
from heat.objects import stack_lock as stack_lock_object
|
||||
from heat.objects import sync_point as sync_point_object
|
||||
from heat.objects import watch_data as watch_data_object
|
||||
from heat.objects import watch_rule as watch_rule_object
|
||||
from heat.openstack.common import threadgroup
|
||||
|
@ -65,6 +66,89 @@ cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
|
|||
cfg.CONF.import_opt('enable_stack_abandon', 'heat.common.config')
|
||||
|
||||
|
||||
string_template_five = '''
|
||||
heat_template_version: 2013-05-23
|
||||
description: Random String templates
|
||||
|
||||
parameters:
|
||||
salt:
|
||||
type: string
|
||||
default: "quickbrownfox"
|
||||
|
||||
resources:
|
||||
A:
|
||||
type: OS::Heat::RandomString
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
B:
|
||||
type: OS::Heat::RandomString
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
C:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: [A, B]
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
D:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: C
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
E:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: C
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
'''
|
||||
|
||||
string_template_five_update = '''
|
||||
heat_template_version: 2013-05-23
|
||||
description: Random String templates
|
||||
|
||||
parameters:
|
||||
salt:
|
||||
type: string
|
||||
default: "quickbrownfox123"
|
||||
|
||||
resources:
|
||||
A:
|
||||
type: OS::Heat::RandomString
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
B:
|
||||
type: OS::Heat::RandomString
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
F:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: [A, B]
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
G:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: F
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
H:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: F
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
'''
|
||||
|
||||
empty_template = '''
|
||||
heat_template_version: 2013-05-23
|
||||
description: Empty Template
|
||||
'''
|
||||
|
||||
wp_template_no_default = '''
|
||||
{
|
||||
"AWSTemplateFormatVersion" : "2010-09-09",
|
||||
|
@ -152,6 +236,234 @@ resources:
|
|||
'''
|
||||
|
||||
|
||||
class StackConvergenceCreateUpdateTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
super(StackConvergenceCreateUpdateTest, self).setUp()
|
||||
cfg.CONF.set_override('convergence_engine', True)
|
||||
|
||||
def test_conv_wordpress_single_instance_stack_create(self):
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
convergence=True)
|
||||
stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
self.assertIsNone(stack.ext_rsrcs_db)
|
||||
self.assertEqual('Dependencies([((1, True), None)])',
|
||||
repr(stack.convergence_dependencies))
|
||||
|
||||
stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
|
||||
self.assertIsNotNone(stack_db.current_traversal)
|
||||
self.assertIsNotNone(stack_db.raw_template_id)
|
||||
|
||||
self.assertIsNone(stack_db.prev_raw_template_id)
|
||||
|
||||
self.assertEqual(stack_db.convergence, True)
|
||||
self.assertEqual({'edges': [[[1, True], None]]}, stack_db.current_deps)
|
||||
|
||||
def test_conv_string_five_instance_stack_create(self):
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
template=string_template_five,
|
||||
convergence=True)
|
||||
stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
self.assertIsNone(stack.ext_rsrcs_db)
|
||||
self.assertEqual('Dependencies(['
|
||||
'((3, True), (5, True)), '
|
||||
'((3, True), (4, True)), '
|
||||
'((1, True), (3, True)), '
|
||||
'((2, True), (3, True))])',
|
||||
repr(stack.convergence_dependencies))
|
||||
|
||||
stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
|
||||
self.assertIsNotNone(stack_db.current_traversal)
|
||||
self.assertIsNotNone(stack_db.raw_template_id)
|
||||
self.assertIsNone(stack_db.prev_raw_template_id)
|
||||
self.assertEqual(stack_db.convergence, True)
|
||||
self.assertEqual(sorted([[[3, True], [5, True]], # C, A
|
||||
[[3, True], [4, True]], # C, B
|
||||
[[1, True], [3, True]], # E, C
|
||||
[[2, True], [3, True]]]), # D, C
|
||||
sorted(stack_db.current_deps['edges']))
|
||||
|
||||
# check if needed_by is stored properly
|
||||
expected_needed_by = {'A': [3], 'B': [3],
|
||||
'C': [1, 2],
|
||||
'D': [], 'E': []}
|
||||
rsrcs_db = resource_objects.Resource.get_all_by_stack(
|
||||
stack_db._context, stack_db.id
|
||||
)
|
||||
self.assertEqual(5, len(rsrcs_db))
|
||||
for rsrc_name, rsrc_obj in rsrcs_db.items():
|
||||
self.assertEqual(sorted(expected_needed_by[rsrc_name]),
|
||||
sorted(rsrc_obj.needed_by))
|
||||
self.assertEqual(stack_db.raw_template_id,
|
||||
rsrc_obj.current_template_id)
|
||||
|
||||
# check if sync_points were stored
|
||||
for entity_id in [5, 4, 3, 2, 1, stack_db.id]:
|
||||
sync_point = sync_point_object.SyncPoint.get_by_key(
|
||||
stack_db._context, entity_id, stack_db.current_traversal, True
|
||||
)
|
||||
self.assertIsNotNone(sync_point)
|
||||
self.assertEqual(stack_db.id, sync_point.stack_id)
|
||||
|
||||
def test_conv_string_five_instance_stack_update(self):
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
template=string_template_five,
|
||||
convergence=True)
|
||||
# create stack
|
||||
stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
|
||||
curr_stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
|
||||
curr_stack = parser.Stack.load(curr_stack_db._context,
|
||||
stack=curr_stack_db)
|
||||
# update stack with new template
|
||||
t2 = template_format.parse(string_template_five_update)
|
||||
template2 = templatem.Template(
|
||||
t2, env=environment.Environment({'KeyName2': 'test2'}))
|
||||
curr_stack.converge_stack(template=template2, action=stack.UPDATE)
|
||||
|
||||
self.assertIsNotNone(curr_stack.ext_rsrcs_db)
|
||||
self.assertEqual('Dependencies(['
|
||||
'((7, True), (8, True)), '
|
||||
'((8, True), (5, True)), '
|
||||
'((8, True), (4, True)), '
|
||||
'((6, True), (8, True)), '
|
||||
'((3, False), (2, False)), '
|
||||
'((3, False), (1, False)), '
|
||||
'((5, False), (3, False)), '
|
||||
'((5, False), (5, True)), '
|
||||
'((4, False), (3, False)), '
|
||||
'((4, False), (4, True))])',
|
||||
repr(curr_stack.convergence_dependencies))
|
||||
|
||||
stack_db = stack_object.Stack.get_by_id(curr_stack.context,
|
||||
curr_stack.id)
|
||||
self.assertIsNotNone(stack_db.raw_template_id)
|
||||
self.assertIsNotNone(stack_db.current_traversal)
|
||||
self.assertIsNotNone(stack_db.prev_raw_template_id)
|
||||
self.assertEqual(True, stack_db.convergence)
|
||||
self.assertEqual(sorted([[[7, True], [8, True]],
|
||||
[[8, True], [5, True]],
|
||||
[[8, True], [4, True]],
|
||||
[[6, True], [8, True]],
|
||||
[[3, False], [2, False]],
|
||||
[[3, False], [1, False]],
|
||||
[[5, False], [3, False]],
|
||||
[[5, False], [5, True]],
|
||||
[[4, False], [3, False]],
|
||||
[[4, False], [4, True]]]),
|
||||
sorted(stack_db.current_deps['edges']))
|
||||
'''
|
||||
To visualize:
|
||||
|
||||
G(7, True) H(6, True)
|
||||
\ /
|
||||
\ / B(4, False) A(5, False)
|
||||
\ / / \ / /
|
||||
\ / / /
|
||||
F(8, True) / / \ /
|
||||
/ \ / / C(3, False)
|
||||
/ \ / / \
|
||||
/ / \ /
|
||||
/ / \ / / \
|
||||
B(4, True) A(5, True) D(2, False) E(1, False)
|
||||
|
||||
Leaves are at the bottom
|
||||
'''
|
||||
|
||||
# check if needed_by are stored properly
|
||||
# For A & B:
|
||||
# needed_by=C, F
|
||||
# TODO(later): when worker is implemented test for current_template_id
|
||||
# Also test for requires
|
||||
|
||||
expected_needed_by = {'A': [3, 8], 'B': [3, 8],
|
||||
'C': [1, 2],
|
||||
'D': [], 'E': [],
|
||||
'F': [6, 7],
|
||||
'G': [], 'H': []}
|
||||
rsrcs_db = resource_objects.Resource.get_all_by_stack(
|
||||
stack_db._context, stack_db.id
|
||||
)
|
||||
self.assertEqual(8, len(rsrcs_db))
|
||||
for rsrc_name, rsrc_obj in rsrcs_db.items():
|
||||
self.assertEqual(sorted(expected_needed_by[rsrc_name]),
|
||||
sorted(rsrc_obj.needed_by))
|
||||
|
||||
# check if sync_points are created for forward traversal
|
||||
# [F, H, G, A, B, Stack]
|
||||
for entity_id in [8, 7, 6, 5, 4, stack_db.id]:
|
||||
sync_point = sync_point_object.SyncPoint.get_by_key(
|
||||
stack_db._context, entity_id, stack_db.current_traversal, True
|
||||
)
|
||||
self.assertIsNotNone(sync_point)
|
||||
self.assertEqual(stack_db.id, sync_point.stack_id)
|
||||
|
||||
# check if sync_points are created for cleanup traversal
|
||||
# [A, B, C, D, E]
|
||||
for entity_id in [5, 4, 3, 2, 1]:
|
||||
sync_point = sync_point_object.SyncPoint.get_by_key(
|
||||
stack_db._context, entity_id, stack_db.current_traversal, False
|
||||
)
|
||||
self.assertIsNotNone(sync_point)
|
||||
self.assertEqual(stack_db.id, sync_point.stack_id)
|
||||
|
||||
def test_conv_empty_template_stack_update_delete(self):
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
template=string_template_five,
|
||||
convergence=True)
|
||||
# create stack
|
||||
stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
|
||||
# update stack with new template
|
||||
t2 = template_format.parse(empty_template)
|
||||
template2 = templatem.Template(
|
||||
t2, env=environment.Environment({'KeyName2': 'test2'}))
|
||||
|
||||
curr_stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
|
||||
curr_stack = parser.Stack.load(curr_stack_db._context,
|
||||
stack=curr_stack_db)
|
||||
curr_stack.converge_stack(template=template2, action=stack.DELETE)
|
||||
|
||||
self.assertIsNotNone(curr_stack.ext_rsrcs_db)
|
||||
self.assertEqual('Dependencies(['
|
||||
'((3, False), (2, False)), '
|
||||
'((3, False), (1, False)), '
|
||||
'((5, False), (3, False)), '
|
||||
'((4, False), (3, False))])',
|
||||
repr(curr_stack.convergence_dependencies))
|
||||
|
||||
stack_db = stack_object.Stack.get_by_id(curr_stack.context,
|
||||
curr_stack.id)
|
||||
self.assertIsNotNone(stack_db.current_traversal)
|
||||
self.assertIsNotNone(stack_db.prev_raw_template_id)
|
||||
self.assertEqual(sorted([[[3, False], [2, False]],
|
||||
[[3, False], [1, False]],
|
||||
[[5, False], [3, False]],
|
||||
[[4, False], [3, False]]]),
|
||||
sorted(stack_db.current_deps['edges']))
|
||||
|
||||
# TODO(later): when worker is implemented test for current_template_id
|
||||
# Also test for requires
|
||||
expected_needed_by = {'A': [3], 'B': [3],
|
||||
'C': [1, 2],
|
||||
'D': [], 'E': []}
|
||||
rsrcs_db = resource_objects.Resource.get_all_by_stack(
|
||||
stack_db._context, stack_db.id
|
||||
)
|
||||
self.assertEqual(5, len(rsrcs_db))
|
||||
for rsrc_name, rsrc_obj in rsrcs_db.items():
|
||||
self.assertEqual(sorted(expected_needed_by[rsrc_name]),
|
||||
sorted(rsrc_obj.needed_by))
|
||||
|
||||
# check if sync_points are created for cleanup traversal
|
||||
# [A, B, C, D, E, Stack]
|
||||
for entity_id in [5, 4, 3, 2, 1, stack_db.id]:
|
||||
sync_point = sync_point_object.SyncPoint.get_by_key(
|
||||
stack_db._context, entity_id, stack_db.current_traversal, False
|
||||
)
|
||||
self.assertIsNotNone(sync_point)
|
||||
self.assertEqual(stack_db.id, sync_point.stack_id)
|
||||
|
||||
|
||||
class StackCreateTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
super(StackCreateTest, self).setUp()
|
||||
|
@ -444,15 +756,6 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
|
|||
self.man.create_stack,
|
||||
self.ctx, stack_name, stack.t.t, {}, None, {})
|
||||
|
||||
def test_stack_create_enabled_convergence_engine(self):
|
||||
cfg.CONF.set_override('convergence_engine', True)
|
||||
ex = self.assertRaises(dispatcher.ExpectedException,
|
||||
self.man.create_stack, self.ctx, 'test',
|
||||
tools.wp_template, {}, None, {})
|
||||
self.assertEqual(exception.NotSupported, ex.exc_info[0])
|
||||
self.assertEqual('Convergence engine is not supported.',
|
||||
six.text_type(ex.exc_info[1]))
|
||||
|
||||
def test_stack_create_invalid_resource_name(self):
|
||||
stack_name = 'service_create_test_stack_invalid_res'
|
||||
stack = tools.get_stack(stack_name, self.ctx)
|
||||
|
@ -836,6 +1139,8 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
|
|||
stack.t,
|
||||
convergence=False,
|
||||
current_traversal=None,
|
||||
prev_raw_template_id=None,
|
||||
current_deps=None,
|
||||
disable_rollback=True,
|
||||
nested_depth=0,
|
||||
owner_id=None,
|
||||
|
@ -895,6 +1200,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
|
|||
parser.Stack(self.ctx, stack.name,
|
||||
stack.t,
|
||||
convergence=False, current_traversal=None,
|
||||
prev_raw_template_id=None, current_deps=None,
|
||||
disable_rollback=True, nested_depth=0,
|
||||
owner_id=None, parent_resource=None,
|
||||
stack_user_project_id='1234',
|
||||
|
@ -947,6 +1253,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
|
|||
parser.Stack(self.ctx, stack.name,
|
||||
stack.t,
|
||||
convergence=False, current_traversal=None,
|
||||
prev_raw_template_id=None, current_deps=None,
|
||||
disable_rollback=False, nested_depth=0,
|
||||
owner_id=None, parent_resource=None,
|
||||
stack_user_project_id='1234',
|
||||
|
@ -1056,6 +1363,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
|
|||
parser.Stack(self.ctx, stack.name,
|
||||
stack.t,
|
||||
convergence=False, current_traversal=None,
|
||||
prev_raw_template_id=None, current_deps=None,
|
||||
disable_rollback=True, nested_depth=0,
|
||||
owner_id=None, parent_resource=None,
|
||||
stack_user_project_id='1234', strict_validate=True,
|
||||
|
@ -1185,6 +1493,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
|
|||
parser.Stack(self.ctx, stack.name,
|
||||
stack.t,
|
||||
convergence=False, current_traversal=None,
|
||||
prev_raw_template_id=None, current_deps=None,
|
||||
disable_rollback=True, nested_depth=0,
|
||||
owner_id=None, parent_resource=None,
|
||||
stack_user_project_id='1234', strict_validate=True,
|
||||
|
@ -1251,6 +1560,8 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
|
|||
old_stack.t,
|
||||
convergence=False,
|
||||
current_traversal=None,
|
||||
prev_raw_template_id=None,
|
||||
current_deps=None,
|
||||
disable_rollback=True,
|
||||
nested_depth=0,
|
||||
owner_id=None,
|
||||
|
@ -1317,6 +1628,116 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
|
|||
six.text_type(ex))
|
||||
|
||||
|
||||
class StackConvergenceServiceCreateUpdateTest(common.HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(StackConvergenceServiceCreateUpdateTest, self).setUp()
|
||||
cfg.CONF.set_override('convergence_engine', True)
|
||||
self.ctx = utils.dummy_context()
|
||||
self.patch('heat.engine.service.warnings')
|
||||
self.man = service.EngineService('a-host', 'a-topic')
|
||||
self.man.create_periodic_tasks()
|
||||
|
||||
def _stub_update_mocks(self, stack_to_load, stack_to_return):
|
||||
self.m.StubOutWithMock(parser, 'Stack')
|
||||
self.m.StubOutWithMock(parser.Stack, 'load')
|
||||
parser.Stack.load(self.ctx, stack=stack_to_load
|
||||
).AndReturn(stack_to_return)
|
||||
|
||||
self.m.StubOutWithMock(templatem, 'Template')
|
||||
self.m.StubOutWithMock(environment, 'Environment')
|
||||
|
||||
def _test_stack_create_convergence(self, stack_name):
|
||||
params = {'foo': 'bar'}
|
||||
template = '{ "Template": "data" }'
|
||||
|
||||
stack = tools.get_stack(stack_name, self.ctx,
|
||||
template=string_template_five,
|
||||
convergence=True)
|
||||
|
||||
self.m.StubOutWithMock(templatem, 'Template')
|
||||
self.m.StubOutWithMock(environment, 'Environment')
|
||||
self.m.StubOutWithMock(parser, 'Stack')
|
||||
|
||||
templatem.Template(template, files=None,
|
||||
env=stack.env).AndReturn(stack.t)
|
||||
environment.Environment(params).AndReturn(stack.env)
|
||||
parser.Stack(self.ctx, stack.name,
|
||||
stack.t, owner_id=None,
|
||||
parent_resource=None,
|
||||
nested_depth=0, user_creds_id=None,
|
||||
stack_user_project_id=None,
|
||||
convergence=True).AndReturn(stack)
|
||||
|
||||
self.m.StubOutWithMock(stack, 'validate')
|
||||
stack.validate().AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
# TODO(later): Remove exception once convergence is supported.
|
||||
ex = self.assertRaises(dispatcher.ExpectedException,
|
||||
self.man.create_stack, self.ctx, stack_name,
|
||||
template, params, None, {})
|
||||
self.assertEqual(exception.NotSupported, ex.exc_info[0])
|
||||
self.assertEqual('Convergence engine is not supported.',
|
||||
six.text_type(ex.exc_info[1]))
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_stack_create_enabled_convergence_engine(self):
|
||||
stack_name = 'service_create_test_stack'
|
||||
self._test_stack_create_convergence(stack_name)
|
||||
|
||||
def test_stack_update_enabled_convergence_engine(self):
|
||||
stack_name = 'service_update_test_stack'
|
||||
params = {'foo': 'bar'}
|
||||
template = '{ "Template": "data" }'
|
||||
old_stack = tools.get_stack(stack_name, self.ctx,
|
||||
template=string_template_five,
|
||||
convergence=True)
|
||||
sid = old_stack.store()
|
||||
s = stack_object.Stack.get_by_id(self.ctx, sid)
|
||||
|
||||
stack = tools.get_stack(stack_name, self.ctx,
|
||||
template=string_template_five_update,
|
||||
convergence=True)
|
||||
|
||||
self._stub_update_mocks(s, old_stack)
|
||||
|
||||
templatem.Template(template, files=None,
|
||||
env=stack.env).AndReturn(stack.t)
|
||||
environment.Environment(params).AndReturn(stack.env)
|
||||
parser.Stack(self.ctx, stack.name,
|
||||
stack.t,
|
||||
owner_id=old_stack.owner_id,
|
||||
nested_depth=old_stack.nested_depth,
|
||||
user_creds_id=old_stack.user_creds_id,
|
||||
stack_user_project_id=old_stack.stack_user_project_id,
|
||||
timeout_mins=60,
|
||||
disable_rollback=True,
|
||||
parent_resource=None,
|
||||
strict_validate=True,
|
||||
tenant_id=old_stack.tenant_id,
|
||||
username=old_stack.username,
|
||||
convergence=old_stack.convergence,
|
||||
current_traversal=old_stack.current_traversal,
|
||||
prev_raw_template_id=old_stack.prev_raw_template_id,
|
||||
current_deps=old_stack.current_deps).AndReturn(stack)
|
||||
|
||||
self.m.StubOutWithMock(stack, 'validate')
|
||||
stack.validate().AndReturn(None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
|
||||
api_args = {'timeout_mins': 60}
|
||||
result = self.man.update_stack(self.ctx, old_stack.identifier(),
|
||||
template, params, None, api_args)
|
||||
self.assertEqual(old_stack.convergence, True)
|
||||
self.assertEqual(old_stack.identifier(), result)
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertTrue(result['stack_id'])
|
||||
self.m.VerifyAll()
|
||||
|
||||
|
||||
class StackServiceAuthorizeTest(common.HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
|
|
@ -293,7 +293,9 @@ class StackTest(common.HeatTestCase):
|
|||
username=mox.IgnoreArg(),
|
||||
convergence=False,
|
||||
current_traversal=None,
|
||||
tags=mox.IgnoreArg())
|
||||
tags=mox.IgnoreArg(),
|
||||
prev_raw_template_id=None,
|
||||
current_deps=None)
|
||||
|
||||
self.m.ReplayAll()
|
||||
stack.Stack.load(self.ctx, stack_id=self.stack.id)
|
||||
|
|
Loading…
Reference in New Issue