Merge "Convergence: Check-Resource skeleton"
This commit is contained in:
commit
00693a4e61
@ -55,6 +55,10 @@ def raw_template_update(context, template_id, values):
|
||||
return IMPL.raw_template_update(context, template_id, values)
|
||||
|
||||
|
||||
def raw_template_delete(context, template_id):
|
||||
return IMPL.raw_template_delete(context, template_id)
|
||||
|
||||
|
||||
def resource_data_get_all(resource, data=None):
|
||||
return IMPL.resource_data_get_all(resource, data)
|
||||
|
||||
|
@ -116,6 +116,11 @@ def raw_template_update(context, template_id, values):
|
||||
return raw_template_ref
|
||||
|
||||
|
||||
def raw_template_delete(context, template_id):
|
||||
raw_template = raw_template_get(context, template_id)
|
||||
raw_template.delete()
|
||||
|
||||
|
||||
def resource_get(context, resource_id):
|
||||
result = model_query(context, models.Resource).get(resource_id)
|
||||
|
||||
|
@ -41,6 +41,7 @@ from heat.engine import resources
|
||||
from heat.engine import rsrc_defn
|
||||
from heat.engine import scheduler
|
||||
from heat.engine import support
|
||||
from heat.engine import template
|
||||
from heat.objects import resource as resource_objects
|
||||
from heat.objects import resource_data as resource_data_objects
|
||||
from heat.rpc import client as rpc_client
|
||||
@ -86,6 +87,12 @@ class ResourceUnknownStatus(exception.HeatException):
|
||||
result=result, status_reason=status_reason, **kwargs)
|
||||
|
||||
|
||||
class UpdateInProgress(Exception):
|
||||
def __init__(self, resource_name='Unknown'):
|
||||
msg = _("The resource %s is already being updated.") % resource_name
|
||||
super(Exception, self).__init__(six.text_type(msg))
|
||||
|
||||
|
||||
class Resource(object):
|
||||
ACTIONS = (
|
||||
INIT, CREATE, DELETE, UPDATE, ROLLBACK,
|
||||
@ -228,6 +235,26 @@ class Resource(object):
|
||||
def stack(self, stack):
|
||||
self._stackref = weakref.ref(stack)
|
||||
|
||||
@classmethod
|
||||
def load(cls, context, resource_id, data):
|
||||
# FIXME(sirushtim): Import this in global space.
|
||||
from heat.engine import stack as stack_mod
|
||||
db_res = resource_objects.Resource.get_obj(context, resource_id)
|
||||
stack = stack_mod.Stack.load(context, db_res.stack_id, cache_data=data)
|
||||
# NOTE(sirushtim): Because on delete/cleanup operations, we simply
|
||||
# update with another template, the stack object won't have the
|
||||
# template of the previous stack-run.
|
||||
tmpl = template.Template.load(context, db_res.current_template_id)
|
||||
stack_res = tmpl.resource_definitions(stack)[db_res.name]
|
||||
resource = cls(db_res.name, stack_res, stack)
|
||||
resource._load_data(db_res)
|
||||
return resource, stack
|
||||
|
||||
def make_replacement(self):
|
||||
# NOTE(sirushtim): Used for mocking. Will be complete
|
||||
# once convergence-resource-replacement is implemented.
|
||||
pass
|
||||
|
||||
def reparse(self):
|
||||
self.properties = self.t.properties(self.properties_schema,
|
||||
self.context)
|
||||
@ -267,6 +294,13 @@ class Resource(object):
|
||||
rs.update_and_save({'rsrc_metadata': metadata})
|
||||
self._rsrc_metadata = metadata
|
||||
|
||||
def clear_requirers(self, gone_requires):
|
||||
self.requires = set(self.requires) - set(gone_requires)
|
||||
self.requires = list(self.requires)
|
||||
self._store_or_update(self.action,
|
||||
self.status,
|
||||
self.status_reason)
|
||||
|
||||
@classmethod
|
||||
def set_needed_by(cls, db_rsrc, needed_by):
|
||||
if db_rsrc:
|
||||
|
@ -44,12 +44,14 @@ from heat.engine import scheduler
|
||||
from heat.engine import sync_point
|
||||
from heat.engine import template as tmpl
|
||||
from heat.engine import update
|
||||
from heat.objects import raw_template as raw_template_object
|
||||
from heat.objects import resource as resource_objects
|
||||
from heat.objects import snapshot as snapshot_object
|
||||
from heat.objects import stack as stack_object
|
||||
from heat.objects import stack_tag as stack_tag_object
|
||||
from heat.objects import user_creds as ucreds_object
|
||||
from heat.rpc import api as rpc_api
|
||||
from heat.rpc import worker_client as rpc_worker_client
|
||||
|
||||
cfg.CONF.import_opt('error_wait_time', 'heat.common.config')
|
||||
|
||||
@ -140,6 +142,7 @@ class Stack(collections.Mapping):
|
||||
self.prev_raw_template_id = prev_raw_template_id
|
||||
self.current_deps = current_deps
|
||||
self.cache_data = cache_data
|
||||
self._worker_client = None
|
||||
|
||||
if use_stored_context:
|
||||
self.context = self.stored_context()
|
||||
@ -164,6 +167,13 @@ class Stack(collections.Mapping):
|
||||
else:
|
||||
self.outputs = {}
|
||||
|
||||
@property
|
||||
def worker_client(self):
|
||||
'''Return a client for making engine RPC calls.'''
|
||||
if not self._worker_client:
|
||||
self._worker_client = rpc_worker_client.WorkerClient()
|
||||
return self._worker_client
|
||||
|
||||
@property
|
||||
def env(self):
|
||||
"""This is a helper to allow resources to access stack.env."""
|
||||
@ -972,6 +982,10 @@ class Stack(collections.Mapping):
|
||||
LOG.info(_LI("Triggering resource %(rsrc_id)s "
|
||||
"for update=%(is_update)s"),
|
||||
{'rsrc_id': rsrc_id, 'is_update': is_update})
|
||||
self.worker_client.check_resource(self.context, rsrc_id,
|
||||
self.current_traversal,
|
||||
{}, is_update)
|
||||
|
||||
self.temp_update_requires(self.convergence_dependencies)
|
||||
|
||||
def _update_or_store_resources(self):
|
||||
@ -1583,3 +1597,29 @@ class Stack(collections.Mapping):
|
||||
def cache_data_resource_attribute(self, resource_name, attribute_key):
|
||||
return self.cache_data.get(
|
||||
resource_name, {}).get('attributes', {}).get(attribute_key)
|
||||
|
||||
def mark_complete(self, traversal_id):
|
||||
'''
|
||||
Mark the update as complete.
|
||||
|
||||
This currently occurs when all resources have been updated; there may
|
||||
still be resources being cleaned up, but the Stack should now be in
|
||||
service.
|
||||
'''
|
||||
if traversal_id != self.current_traversal:
|
||||
return
|
||||
|
||||
LOG.info('[%s(%s)] update traversal %s complete',
|
||||
self.name, self.id, traversal_id)
|
||||
|
||||
prev_prev_id = self.prev_raw_template_id
|
||||
self.prev_raw_template_id = self.t.id
|
||||
self.store()
|
||||
|
||||
if (prev_prev_id is not None and
|
||||
prev_prev_id != self.t.id):
|
||||
raw_template_object.RawTemplate.delete(self.context,
|
||||
prev_prev_id)
|
||||
|
||||
reason = 'Stack %s completed successfully' % self.action
|
||||
self.state_set(self.action, self.COMPLETE, reason)
|
||||
|
@ -12,8 +12,26 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from heat.common.i18n import _
|
||||
from heat.objects import sync_point as sync_point_object
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
KEY_SEPERATOR = ':'
|
||||
|
||||
|
||||
def _dump_list(items, separator=', '):
|
||||
return separator.join(map(str, items))
|
||||
|
||||
|
||||
def make_key(*components):
|
||||
assert len(components) >= 2
|
||||
return _dump_list(components, KEY_SEPERATOR)
|
||||
|
||||
|
||||
def create(context, entity_id, traversal_id, is_update, stack_id):
|
||||
"""
|
||||
@ -29,8 +47,14 @@ def get(context, entity_id, traversal_id, is_update):
|
||||
"""
|
||||
Retrieves a sync point entry from DB.
|
||||
"""
|
||||
return sync_point_object.SyncPoint.get_by_key(context, entity_id,
|
||||
traversal_id, is_update)
|
||||
sync_point = sync_point_object.SyncPoint.get_by_key(context, entity_id,
|
||||
traversal_id,
|
||||
is_update)
|
||||
if sync_point is None:
|
||||
key = (entity_id, traversal_id, is_update)
|
||||
raise SyncPointNotFound(key)
|
||||
|
||||
return sync_point
|
||||
|
||||
|
||||
def delete_all(context, stack_id, traversal_id):
|
||||
@ -40,3 +64,52 @@ def delete_all(context, stack_id, traversal_id):
|
||||
return sync_point_object.SyncPoint.delete_all_by_stack_and_traversal(
|
||||
context, stack_id, traversal_id
|
||||
)
|
||||
|
||||
|
||||
def update_input_data(context, entity_id, current_traversal,
|
||||
is_update, atomic_key, input_data):
|
||||
sync_point_object.SyncPoint.update_input_data(
|
||||
context, entity_id, current_traversal, is_update, atomic_key,
|
||||
input_data)
|
||||
|
||||
|
||||
def deserialize_input_data(db_input_data):
|
||||
db_input_data = db_input_data.get('input_data')
|
||||
if not db_input_data:
|
||||
return {}
|
||||
|
||||
return {tuple(i): j for i, j in db_input_data}
|
||||
|
||||
|
||||
def serialize_input_data(input_data):
|
||||
return {'input_data': [[list(i), j] for i, j in six.iteritems(input_data)]}
|
||||
|
||||
|
||||
def sync(cnxt, entity_id, current_traversal, is_update, propagate,
|
||||
predecessors, new_data):
|
||||
sync_point = get(cnxt, entity_id, current_traversal,
|
||||
is_update)
|
||||
input_data = dict(deserialize_input_data(sync_point.input_data))
|
||||
input_data.update(new_data)
|
||||
waiting = predecessors - set(input_data)
|
||||
|
||||
# Note: update must be atomic
|
||||
update_input_data(cnxt, entity_id, current_traversal,
|
||||
is_update, sync_point.atomic_key,
|
||||
serialize_input_data(input_data))
|
||||
|
||||
key = make_key(entity_id, current_traversal, is_update)
|
||||
if waiting:
|
||||
LOG.debug('[%s] Waiting %s: Got %s; still need %s',
|
||||
key, entity_id, _dump_list(input_data), _dump_list(waiting))
|
||||
else:
|
||||
LOG.debug('[%s] Ready %s: Got %s',
|
||||
key, entity_id, _dump_list(input_data))
|
||||
propagate(entity_id, input_data)
|
||||
|
||||
|
||||
class SyncPointNotFound(Exception):
|
||||
'''Raised when resource update requires replacement.'''
|
||||
def __init__(self, sync_point):
|
||||
msg = _("Sync Point %s not found") % (sync_point, )
|
||||
super(Exception, self).__init__(six.text_type(msg))
|
||||
|
@ -16,10 +16,16 @@
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from osprofiler import profiler
|
||||
import six
|
||||
|
||||
from heat.common import context
|
||||
from heat.common import exception
|
||||
from heat.common.i18n import _LE
|
||||
from heat.common.i18n import _LI
|
||||
from heat.common import messaging as rpc_messaging
|
||||
from heat.engine import dependencies
|
||||
from heat.engine import resource
|
||||
from heat.engine import sync_point
|
||||
from heat.openstack.common import service
|
||||
from heat.rpc import worker_client as rpc_client
|
||||
|
||||
@ -36,7 +42,7 @@ class WorkerService(service.Service):
|
||||
or expect replies from these messages.
|
||||
"""
|
||||
|
||||
RPC_API_VERSION = '1.0'
|
||||
RPC_API_VERSION = '1.1'
|
||||
|
||||
def __init__(self,
|
||||
host,
|
||||
@ -76,3 +82,146 @@ class WorkerService(service.Service):
|
||||
LOG.error(_LE("WorkerService is failed to stop, %s"), e)
|
||||
|
||||
super(WorkerService, self).stop()
|
||||
|
||||
@context.request_context
|
||||
def check_resource(self, cnxt, resource_id, current_traversal, data,
|
||||
is_update):
|
||||
'''
|
||||
Process a node in the dependency graph.
|
||||
|
||||
The node may be associated with either an update or a cleanup of its
|
||||
associated resource.
|
||||
'''
|
||||
try:
|
||||
rsrc, stack = resource.Resource.load(cnxt, resource_id, data)
|
||||
except exception.NotFound:
|
||||
return
|
||||
tmpl = stack.t
|
||||
|
||||
if current_traversal != rsrc.stack.current_traversal:
|
||||
LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal)
|
||||
return
|
||||
|
||||
current_deps = ([tuple(i), (tuple(j) if j is not None else None)]
|
||||
for i, j in rsrc.stack.current_deps['edges'])
|
||||
deps = dependencies.Dependencies(edges=current_deps)
|
||||
graph = deps.graph()
|
||||
|
||||
if is_update:
|
||||
if (rsrc.replaced_by is not None and
|
||||
rsrc.current_template_id != tmpl.id):
|
||||
return
|
||||
|
||||
try:
|
||||
check_resource_update(rsrc, tmpl.id, data)
|
||||
except resource.UpdateReplace:
|
||||
# NOTE(sirushtim): Implemented by spec
|
||||
# convergence-resource-replacement.
|
||||
rsrc.make_replacement()
|
||||
return
|
||||
except resource.UpdateInProgress:
|
||||
return
|
||||
|
||||
input_data = construct_input_data(rsrc)
|
||||
else:
|
||||
try:
|
||||
check_resource_cleanup(rsrc, tmpl.id, data)
|
||||
except resource.UpdateInProgress:
|
||||
return
|
||||
|
||||
graph_key = (rsrc.id, is_update)
|
||||
if graph_key not in graph and rsrc.replaces is not None:
|
||||
# If we are a replacement, impersonate the replaced resource for
|
||||
# the purposes of calculating whether subsequent resources are
|
||||
# ready, since everybody has to work from the same version of the
|
||||
# graph. Our real resource ID is sent in the input_data, so the
|
||||
# dependencies will get updated to point to this resource in time
|
||||
# for the next traversal.
|
||||
graph_key = (rsrc.replaces, is_update)
|
||||
|
||||
try:
|
||||
for req, fwd in deps.required_by(graph_key):
|
||||
propagate_check_resource(
|
||||
cnxt, self._rpc_client, req, current_traversal,
|
||||
set(graph[(req, fwd)]), graph_key,
|
||||
input_data if fwd else rsrc.id, fwd)
|
||||
|
||||
check_stack_complete(cnxt, rsrc.stack, current_traversal,
|
||||
rsrc.id, graph, is_update)
|
||||
except sync_point.SyncPointNotFound:
|
||||
# NOTE(sirushtim): Implemented by spec
|
||||
# convergence-concurrent-workflow
|
||||
pass
|
||||
|
||||
|
||||
def construct_input_data(rsrc):
|
||||
attributes = rsrc.stack.get_dep_attrs(
|
||||
six.itervalues(rsrc.stack.resources),
|
||||
rsrc.stack.outputs,
|
||||
rsrc.name)
|
||||
resolved_attributes = {attr: rsrc.FnGetAtt(attr) for attr in attributes}
|
||||
input_data = {'id': rsrc.id,
|
||||
'name': rsrc.name,
|
||||
'physical_resource_id': rsrc.resource_id,
|
||||
'attrs': resolved_attributes}
|
||||
return input_data
|
||||
|
||||
|
||||
def check_stack_complete(cnxt, stack, current_traversal, sender, graph,
|
||||
is_update):
|
||||
'''
|
||||
Mark the stack complete if the update is complete.
|
||||
|
||||
Complete is currently in the sense that all desired resources are in
|
||||
service, not that superfluous ones have been cleaned up.
|
||||
'''
|
||||
roots = set(key for (key, fwd), node in graph.items()
|
||||
if not any(f for k, f in node.required_by()))
|
||||
|
||||
if sender not in roots:
|
||||
return
|
||||
|
||||
def mark_complete(stack_id, data):
|
||||
stack.mark_complete(current_traversal)
|
||||
|
||||
sync_point.sync(cnxt, stack.id, current_traversal, is_update,
|
||||
mark_complete, roots, {sender: None})
|
||||
|
||||
|
||||
def propagate_check_resource(cnxt, rpc_client, next_res_id,
|
||||
current_traversal, predecessors, sender,
|
||||
sender_data, is_update):
|
||||
'''
|
||||
Trigger processing of a node if all of its dependencies are satisfied.
|
||||
'''
|
||||
def do_check(entity_id, data):
|
||||
rpc_client.check_resource(cnxt, entity_id, current_traversal,
|
||||
data, is_update)
|
||||
|
||||
sync_point.sync(cnxt, next_res_id, current_traversal,
|
||||
is_update, do_check, predecessors,
|
||||
{sender: sender_data})
|
||||
|
||||
|
||||
def check_resource_update(rsrc, template_id, data):
|
||||
'''
|
||||
Create or update the Resource if appropriate.
|
||||
'''
|
||||
input_data = {in_data.name: in_data for in_data in data.values()}
|
||||
|
||||
if rsrc.resource_id is None:
|
||||
rsrc.create(template_id, input_data)
|
||||
else:
|
||||
rsrc.update(template_id, input_data)
|
||||
|
||||
|
||||
def check_resource_cleanup(rsrc, template_id, data):
|
||||
'''
|
||||
Delete the Resource if appropriate.
|
||||
'''
|
||||
# Clear out deleted resources from the requirers list
|
||||
rsrc.clear_requirers(rsrc_id for rsrc_id, id in data.items()
|
||||
if id is None)
|
||||
|
||||
if rsrc.current_template_id != template_id:
|
||||
rsrc.delete(template_id, data)
|
||||
|
@ -87,3 +87,7 @@ class RawTemplate(
|
||||
@classmethod
|
||||
def update_by_id(cls, context, template_id, values):
|
||||
return db_api.raw_template_update(context, template_id, values)
|
||||
|
||||
@classmethod
|
||||
def delete(cls, context, template_id):
|
||||
return db_api.raw_template_delete(context, template_id)
|
||||
|
@ -27,6 +27,7 @@ class WorkerClient(object):
|
||||
API version history::
|
||||
|
||||
1.0 - Initial version.
|
||||
1.1 - Added check_resource.
|
||||
'''
|
||||
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
@ -47,3 +48,10 @@ class WorkerClient(object):
|
||||
else:
|
||||
client = self._client
|
||||
client.cast(ctxt, method, **kwargs)
|
||||
|
||||
def check_resource(self, ctxt, resource_id,
|
||||
current_traversal, data, is_update):
|
||||
self.cast(ctxt, self.make_msg(
|
||||
'check_resource', resource_id=resource_id,
|
||||
current_traversal=current_traversal, data=data,
|
||||
is_update=is_update))
|
||||
|
@ -1471,6 +1471,13 @@ class DBAPIRawTemplateTest(common.HeatTestCase):
|
||||
self.assertEqual(new_t, updated_tp.template)
|
||||
self.assertEqual(new_files, updated_tp.files)
|
||||
|
||||
def test_raw_template_delete(self):
|
||||
t = template_format.parse(wp_template)
|
||||
tp = create_raw_template(self.ctx, template=t)
|
||||
db_api.raw_template_delete(self.ctx, tp.id)
|
||||
self.assertRaises(exception.NotFound, db_api.raw_template_get,
|
||||
self.ctx, tp.id)
|
||||
|
||||
|
||||
class DBAPIUserCredsTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
|
@ -45,6 +45,45 @@ resources:
|
||||
UserData: wordpress
|
||||
'''
|
||||
|
||||
string_template_five = '''
|
||||
heat_template_version: 2013-05-23
|
||||
description: Random String templates
|
||||
|
||||
parameters:
|
||||
salt:
|
||||
type: string
|
||||
default: "quickbrownfox"
|
||||
|
||||
resources:
|
||||
A:
|
||||
type: OS::Heat::RandomString
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
B:
|
||||
type: OS::Heat::RandomString
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
C:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: [A, B]
|
||||
properties:
|
||||
salt: {get_attr: [A, value]}
|
||||
|
||||
D:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: C
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
E:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: C
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
'''
|
||||
|
||||
|
||||
def get_stack(stack_name, ctx, template=None, with_params=True,
|
||||
convergence=False):
|
||||
|
@ -51,6 +51,7 @@ from heat.objects import watch_rule as watch_rule_object
|
||||
from heat.openstack.common import threadgroup
|
||||
from heat.rpc import api as rpc_api
|
||||
from heat.rpc import worker_api
|
||||
from heat.rpc import worker_client
|
||||
from heat.tests import common
|
||||
from heat.tests.engine import tools
|
||||
from heat.tests import generic_resource as generic_rsrc
|
||||
@ -61,45 +62,6 @@ cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
|
||||
cfg.CONF.import_opt('enable_stack_abandon', 'heat.common.config')
|
||||
|
||||
|
||||
string_template_five = '''
|
||||
heat_template_version: 2013-05-23
|
||||
description: Random String templates
|
||||
|
||||
parameters:
|
||||
salt:
|
||||
type: string
|
||||
default: "quickbrownfox"
|
||||
|
||||
resources:
|
||||
A:
|
||||
type: OS::Heat::RandomString
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
B:
|
||||
type: OS::Heat::RandomString
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
C:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: [A, B]
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
D:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: C
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
|
||||
E:
|
||||
type: OS::Heat::RandomString
|
||||
depends_on: C
|
||||
properties:
|
||||
salt: {get_param: salt}
|
||||
'''
|
||||
|
||||
string_template_five_update = '''
|
||||
heat_template_version: 2013-05-23
|
||||
description: Random String templates
|
||||
@ -231,14 +193,16 @@ resources:
|
||||
'''
|
||||
|
||||
|
||||
@mock.patch.object(worker_client.WorkerClient, 'check_resource')
|
||||
class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
super(StackConvergenceCreateUpdateDeleteTest, self).setUp()
|
||||
cfg.CONF.set_override('convergence_engine', True)
|
||||
|
||||
def test_conv_wordpress_single_instance_stack_create(self):
|
||||
def test_conv_wordpress_single_instance_stack_create(self, mock_cr):
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
convergence=True)
|
||||
|
||||
stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
self.assertIsNone(stack.ext_rsrcs_db)
|
||||
self.assertEqual('Dependencies([((1, True), None)])',
|
||||
@ -252,11 +216,20 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
|
||||
|
||||
self.assertEqual(stack_db.convergence, True)
|
||||
self.assertEqual({'edges': [[[1, True], None]]}, stack_db.current_deps)
|
||||
leaves = stack.convergence_dependencies.leaves()
|
||||
expected_calls = []
|
||||
for rsrc_id, is_update in leaves:
|
||||
expected_calls.append(
|
||||
mock.call.worker_client.WorkerClient.check_resource(
|
||||
stack.context, rsrc_id, stack.current_traversal, {},
|
||||
is_update))
|
||||
self.assertEqual(expected_calls, mock_cr.mock_calls)
|
||||
|
||||
def test_conv_string_five_instance_stack_create(self):
|
||||
def test_conv_string_five_instance_stack_create(self, mock_cr):
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
template=string_template_five,
|
||||
template=tools.string_template_five,
|
||||
convergence=True)
|
||||
|
||||
stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
self.assertIsNone(stack.ext_rsrcs_db)
|
||||
self.assertEqual('Dependencies(['
|
||||
@ -299,9 +272,18 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
|
||||
self.assertIsNotNone(sync_point)
|
||||
self.assertEqual(stack_db.id, sync_point.stack_id)
|
||||
|
||||
def test_conv_string_five_instance_stack_update(self):
|
||||
leaves = stack.convergence_dependencies.leaves()
|
||||
expected_calls = []
|
||||
for rsrc_id, is_update in leaves:
|
||||
expected_calls.append(
|
||||
mock.call.worker_client.WorkerClient.check_resource(
|
||||
stack.context, rsrc_id, stack.current_traversal, {},
|
||||
is_update))
|
||||
self.assertEqual(expected_calls, mock_cr.mock_calls)
|
||||
|
||||
def test_conv_string_five_instance_stack_update(self, mock_cr):
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
template=string_template_five,
|
||||
template=tools.string_template_five,
|
||||
convergence=True)
|
||||
# create stack
|
||||
stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
@ -401,9 +383,25 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
|
||||
self.assertIsNotNone(sync_point)
|
||||
self.assertEqual(stack_db.id, sync_point.stack_id)
|
||||
|
||||
def test_conv_empty_template_stack_update_delete(self):
|
||||
leaves = stack.convergence_dependencies.leaves()
|
||||
expected_calls = []
|
||||
for rsrc_id, is_update in leaves:
|
||||
expected_calls.append(
|
||||
mock.call.worker_client.WorkerClient.check_resource(
|
||||
stack.context, rsrc_id, stack.current_traversal, {},
|
||||
is_update))
|
||||
|
||||
leaves = curr_stack.convergence_dependencies.leaves()
|
||||
for rsrc_id, is_update in leaves:
|
||||
expected_calls.append(
|
||||
mock.call.worker_client.WorkerClient.check_resource(
|
||||
curr_stack.context, rsrc_id, curr_stack.current_traversal,
|
||||
{}, is_update))
|
||||
self.assertEqual(expected_calls, mock_cr.mock_calls)
|
||||
|
||||
def test_conv_empty_template_stack_update_delete(self, mock_cr):
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
template=string_template_five,
|
||||
template=tools.string_template_five,
|
||||
convergence=True)
|
||||
# create stack
|
||||
stack.converge_stack(template=stack.t, action=stack.CREATE)
|
||||
@ -458,6 +456,22 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
|
||||
self.assertIsNotNone(sync_point)
|
||||
self.assertEqual(stack_db.id, sync_point.stack_id)
|
||||
|
||||
leaves = stack.convergence_dependencies.leaves()
|
||||
expected_calls = []
|
||||
for rsrc_id, is_update in leaves:
|
||||
expected_calls.append(
|
||||
mock.call.worker_client.WorkerClient.check_resource(
|
||||
stack.context, rsrc_id, stack.current_traversal, {},
|
||||
is_update))
|
||||
|
||||
leaves = curr_stack.convergence_dependencies.leaves()
|
||||
for rsrc_id, is_update in leaves:
|
||||
expected_calls.append(
|
||||
mock.call.worker_client.WorkerClient.check_resource(
|
||||
curr_stack.context, rsrc_id, curr_stack.current_traversal,
|
||||
{}, is_update))
|
||||
self.assertEqual(expected_calls, mock_cr.mock_calls)
|
||||
|
||||
|
||||
class StackCreateTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
@ -1342,7 +1356,7 @@ class StackConvergenceServiceCreateUpdateTest(common.HeatTestCase):
|
||||
template = '{ "Template": "data" }'
|
||||
|
||||
stack = tools.get_stack(stack_name, self.ctx,
|
||||
template=string_template_five,
|
||||
template=tools.string_template_five,
|
||||
convergence=True)
|
||||
|
||||
self.m.StubOutWithMock(templatem, 'Template')
|
||||
@ -1382,7 +1396,7 @@ class StackConvergenceServiceCreateUpdateTest(common.HeatTestCase):
|
||||
params = {'foo': 'bar'}
|
||||
template = '{ "Template": "data" }'
|
||||
old_stack = tools.get_stack(stack_name, self.ctx,
|
||||
template=string_template_five,
|
||||
template=tools.string_template_five,
|
||||
convergence=True)
|
||||
sid = old_stack.store()
|
||||
s = stack_object.Stack.get_by_id(self.ctx, sid)
|
||||
|
@ -15,22 +15,28 @@
|
||||
|
||||
import mock
|
||||
|
||||
from heat.engine import resource
|
||||
from heat.engine import stack
|
||||
from heat.engine import sync_point
|
||||
from heat.engine import worker
|
||||
from heat.rpc import worker_client
|
||||
from heat.tests import common
|
||||
from heat.tests.engine import tools
|
||||
from heat.tests import utils
|
||||
|
||||
|
||||
class WorkerServiceTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
super(WorkerServiceTest, self).setUp()
|
||||
thread_gruop_mgr = mock.Mock()
|
||||
thread_group_mgr = mock.Mock()
|
||||
self.worker = worker.WorkerService('host-1',
|
||||
'topic-1',
|
||||
'engine_id',
|
||||
thread_gruop_mgr)
|
||||
thread_group_mgr)
|
||||
|
||||
def test_make_sure_rpc_version(self):
|
||||
self.assertEqual(
|
||||
'1.0',
|
||||
'1.1',
|
||||
worker.WorkerService.RPC_API_VERSION,
|
||||
('RPC version is changed, please update this test to new version '
|
||||
'and make sure additional test cases are added for RPC APIs '
|
||||
@ -80,3 +86,223 @@ class WorkerServiceTest(common.HeatTestCase):
|
||||
self.worker.stop()
|
||||
mock_rpc_server.stop.assert_called_once_with()
|
||||
mock_rpc_server.wait.assert_called_once_with()
|
||||
|
||||
|
||||
@mock.patch.object(worker, 'construct_input_data')
|
||||
@mock.patch.object(worker, 'check_stack_complete')
|
||||
@mock.patch.object(worker, 'propagate_check_resource')
|
||||
@mock.patch.object(worker, 'check_resource_cleanup')
|
||||
@mock.patch.object(worker, 'check_resource_update')
|
||||
class CheckWorkflowUpdateTest(common.HeatTestCase):
|
||||
@mock.patch.object(worker_client.WorkerClient, 'check_resource',
|
||||
lambda *_: None)
|
||||
def setUp(self):
|
||||
super(CheckWorkflowUpdateTest, self).setUp()
|
||||
thread_group_mgr = mock.Mock()
|
||||
self.worker = worker.WorkerService('host-1',
|
||||
'topic-1',
|
||||
'engine_id',
|
||||
thread_group_mgr)
|
||||
self.worker._rpc_client = worker_client.WorkerClient()
|
||||
self.ctx = utils.dummy_context()
|
||||
self.stack = tools.get_stack(
|
||||
'check_workflow_create_stack', self.ctx,
|
||||
template=tools.string_template_five, convergence=True)
|
||||
self.stack.converge_stack(self.stack.t)
|
||||
self.resource = self.stack['A']
|
||||
self.is_update = True
|
||||
self.graph_key = (self.resource.id, self.is_update)
|
||||
|
||||
def test_resource_not_available(
|
||||
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
self.worker.check_resource(
|
||||
self.ctx, 'non-existant-id', self.stack.current_traversal, {},
|
||||
True)
|
||||
for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]:
|
||||
self.assertFalse(mocked.called)
|
||||
|
||||
def test_stale_traversal(
|
||||
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
self.worker.check_resource(self.ctx, self.resource.id,
|
||||
'stale-traversal', {}, True)
|
||||
for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]:
|
||||
self.assertFalse(mocked.called)
|
||||
|
||||
def test_is_update_traversal(
|
||||
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
self.worker.check_resource(
|
||||
self.ctx, self.resource.id, self.stack.current_traversal, {},
|
||||
self.is_update)
|
||||
mock_cru.assert_called_once_with(self.resource,
|
||||
self.resource.stack.t.id,
|
||||
{})
|
||||
self.assertFalse(mock_crc.called)
|
||||
|
||||
expected_calls = []
|
||||
for req, fwd in self.stack.convergence_dependencies.leaves():
|
||||
expected_calls.append(
|
||||
(mock.call.worker.propagate_check_resource.
|
||||
assert_called_once_with(
|
||||
self.ctx, mock.ANY, mock.ANY,
|
||||
self.stack.current_traversal, mock.ANY,
|
||||
self.graph_key, {}, self.is_update)))
|
||||
mock_csc.assert_called_once_with(
|
||||
self.ctx, mock.ANY, self.stack.current_traversal,
|
||||
self.resource.id,
|
||||
mock.ANY, True)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'make_replacement')
|
||||
def test_is_update_traversal_raise_update_replace(
|
||||
self, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
mock_cru.side_effect = resource.UpdateReplace
|
||||
self.worker.check_resource(
|
||||
self.ctx, self.resource.id, self.stack.current_traversal, {},
|
||||
self.is_update)
|
||||
mock_cru.assert_called_once_with(self.resource,
|
||||
self.resource.stack.t.id,
|
||||
{})
|
||||
self.assertTrue(mock_mr.called)
|
||||
self.assertFalse(mock_crc.called)
|
||||
self.assertFalse(mock_pcr.called)
|
||||
self.assertFalse(mock_csc.called)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'make_replacement')
|
||||
def test_is_update_traversal_raise_update_inprogress(
|
||||
self, mock_mr, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
mock_cru.side_effect = resource.UpdateInProgress
|
||||
self.worker.check_resource(
|
||||
self.ctx, self.resource.id, self.stack.current_traversal, {},
|
||||
self.is_update)
|
||||
mock_cru.assert_called_once_with(self.resource,
|
||||
self.resource.stack.t.id,
|
||||
{})
|
||||
self.assertFalse(mock_mr.called)
|
||||
self.assertFalse(mock_crc.called)
|
||||
self.assertFalse(mock_pcr.called)
|
||||
self.assertFalse(mock_csc.called)
|
||||
|
||||
|
||||
@mock.patch.object(worker, 'construct_input_data')
|
||||
@mock.patch.object(worker, 'check_stack_complete')
|
||||
@mock.patch.object(worker, 'propagate_check_resource')
|
||||
@mock.patch.object(worker, 'check_resource_cleanup')
|
||||
@mock.patch.object(worker, 'check_resource_update')
|
||||
class CheckWorkflowCleanupTest(common.HeatTestCase):
|
||||
@mock.patch.object(worker_client.WorkerClient, 'check_resource',
|
||||
lambda *_: None)
|
||||
def setUp(self):
|
||||
super(CheckWorkflowCleanupTest, self).setUp()
|
||||
thread_group_mgr = mock.Mock()
|
||||
self.worker = worker.WorkerService('host-1',
|
||||
'topic-1',
|
||||
'engine_id',
|
||||
thread_group_mgr)
|
||||
self.worker._rpc_client = worker_client.WorkerClient()
|
||||
self.ctx = utils.dummy_context()
|
||||
tstack = tools.get_stack(
|
||||
'check_workflow_create_stack', self.ctx,
|
||||
template=tools.string_template_five, convergence=True)
|
||||
tstack.converge_stack(tstack.t, action=tstack.CREATE)
|
||||
self.stack = stack.Stack.load(self.ctx, stack_id=tstack.id)
|
||||
self.stack.converge_stack(self.stack.t, action=self.stack.DELETE)
|
||||
self.resource = self.stack['A']
|
||||
self.is_update = False
|
||||
self.graph_key = (self.resource.id, self.is_update)
|
||||
|
||||
def test_is_cleanup_traversal(
|
||||
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
self.worker.check_resource(
|
||||
self.ctx, self.resource.id, self.stack.current_traversal, {},
|
||||
self.is_update)
|
||||
self.assertFalse(mock_cru.called)
|
||||
mock_crc.assert_called_once_with(
|
||||
self.resource, self.resource.stack.t.id,
|
||||
{})
|
||||
|
||||
def test_is_cleanup_traversal_raise_update_inprogress(
|
||||
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
|
||||
mock_crc.side_effect = resource.UpdateInProgress
|
||||
self.worker.check_resource(
|
||||
self.ctx, self.resource.id, self.stack.current_traversal, {},
|
||||
self.is_update)
|
||||
mock_crc.assert_called_once_with(self.resource,
|
||||
self.resource.stack.t.id,
|
||||
{})
|
||||
self.assertFalse(mock_cru.called)
|
||||
self.assertFalse(mock_pcr.called)
|
||||
self.assertFalse(mock_csc.called)
|
||||
|
||||
|
||||
class MiscMethodsTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
super(MiscMethodsTest, self).setUp()
|
||||
self.ctx = utils.dummy_context()
|
||||
self.stack = tools.get_stack(
|
||||
'check_workflow_create_stack', self.ctx,
|
||||
template=tools.string_template_five, convergence=True)
|
||||
self.stack.converge_stack(self.stack.t)
|
||||
self.resource = self.stack['A']
|
||||
|
||||
def test_construct_input_data(self):
|
||||
expected_input_data = {'attrs': {'value': None},
|
||||
'id': mock.ANY,
|
||||
'physical_resource_id': None,
|
||||
'name': 'A'}
|
||||
actual_input_data = worker.construct_input_data(self.resource)
|
||||
self.assertEqual(expected_input_data, actual_input_data)
|
||||
|
||||
@mock.patch.object(sync_point, 'sync')
|
||||
def test_check_stack_complete_root(self, mock_sync):
|
||||
worker.check_stack_complete(
|
||||
self.ctx, self.stack, self.stack.current_traversal,
|
||||
self.stack['E'].id, self.stack.convergence_dependencies.graph(),
|
||||
True)
|
||||
mock_sync.assert_called_once_with(
|
||||
self.ctx, self.stack.id, self.stack.current_traversal, True,
|
||||
mock.ANY, mock.ANY, {self.stack['E'].id: None})
|
||||
|
||||
@mock.patch.object(sync_point, 'sync')
|
||||
def test_check_stack_complete_child(self, mock_sync):
|
||||
worker.check_stack_complete(
|
||||
self.ctx, self.stack, self.stack.current_traversal,
|
||||
self.resource.id, self.stack.convergence_dependencies.graph(),
|
||||
True)
|
||||
self.assertFalse(mock_sync.called)
|
||||
|
||||
@mock.patch.object(sync_point, 'sync')
|
||||
def test_propagate_check_resource(self, mock_sync):
|
||||
worker.propagate_check_resource(
|
||||
self.ctx, mock.ANY, mock.ANY,
|
||||
self.stack.current_traversal, mock.ANY,
|
||||
mock.ANY, {}, True)
|
||||
self.assertTrue(mock_sync.called)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'create')
|
||||
def test_check_resource_update_create(self, mock_create):
|
||||
worker.check_resource_update(self.resource, self.resource.stack.t.id,
|
||||
{})
|
||||
self.assertTrue(mock_create.called)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'update')
|
||||
def test_check_resource_update_update(self, mock_update):
|
||||
self.resource.resource_id = 'physical-res-id'
|
||||
worker.check_resource_update(self.resource, self.resource.stack.t.id,
|
||||
{})
|
||||
self.assertTrue(mock_update.called)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'delete')
|
||||
@mock.patch.object(resource.Resource, 'clear_requirers')
|
||||
def test_check_resource_cleanup_delete(self, mock_cr, mock_delete):
|
||||
self.resource.current_template_id = 'new-template-id'
|
||||
worker.check_resource_cleanup(self.resource, self.resource.stack.t.id,
|
||||
{})
|
||||
self.assertTrue(mock_cr.called)
|
||||
self.assertTrue(mock_delete.called)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'delete')
|
||||
@mock.patch.object(resource.Resource, 'clear_requirers')
|
||||
def test_check_resource_cleanup_nodelete(self, mock_cr, mock_delete):
|
||||
worker.check_resource_cleanup(self.resource, self.resource.stack.t.id,
|
||||
{})
|
||||
self.assertTrue(mock_cr.called)
|
||||
self.assertFalse(mock_delete.called)
|
||||
|
@ -85,6 +85,21 @@ class ResourceTest(common.HeatTestCase):
|
||||
self.assertIsInstance(res, generic_rsrc.GenericResource)
|
||||
self.assertEqual("INIT", res.action)
|
||||
|
||||
def test_resource_load_with_state(self):
|
||||
self.stack = parser.Stack(utils.dummy_context(), 'test_stack',
|
||||
template.Template(empty_template))
|
||||
self.stack.store()
|
||||
snippet = rsrc_defn.ResourceDefinition('aresource',
|
||||
'GenericResourceType')
|
||||
# Store Resource
|
||||
res = resource.Resource('aresource', snippet, self.stack)
|
||||
res.current_template_id = self.stack.t.id
|
||||
res.state_set('CREATE', 'IN_PROGRESS')
|
||||
self.stack.add_resource(res)
|
||||
loaded_res, stack = resource.Resource.load(self.stack.context,
|
||||
res.id, {})
|
||||
self.assertEqual(loaded_res.id, res.id)
|
||||
|
||||
def test_resource_invalid_name(self):
|
||||
snippet = rsrc_defn.ResourceDefinition('wrong/name',
|
||||
'GenericResourceType')
|
||||
|
@ -32,6 +32,7 @@ from heat.engine import resource
|
||||
from heat.engine import scheduler
|
||||
from heat.engine import stack
|
||||
from heat.engine import template
|
||||
from heat.objects import raw_template as raw_template_object
|
||||
from heat.objects import stack as stack_object
|
||||
from heat.objects import stack_tag as stack_tag_object
|
||||
from heat.objects import user_creds as ucreds_object
|
||||
@ -2036,6 +2037,62 @@ class StackTest(common.HeatTestCase):
|
||||
self.assertEqual('foo', params.get('param1'))
|
||||
self.assertEqual('bar', params.get('param2'))
|
||||
|
||||
@mock.patch.object(raw_template_object.RawTemplate, 'delete')
|
||||
def test_mark_complete_create(self, mock_delete):
|
||||
tmpl = template.Template({
|
||||
'HeatTemplateFormatVersion': '2012-12-12',
|
||||
'Resources': {
|
||||
'foo': {'Type': 'GenericResourceType'}
|
||||
}
|
||||
})
|
||||
|
||||
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl)
|
||||
tmpl_stack.store()
|
||||
tmpl_stack.current_traversal = 'some-traversal'
|
||||
tmpl_stack.mark_complete('some-traversal')
|
||||
self.assertEqual(tmpl_stack.prev_raw_template_id,
|
||||
tmpl_stack.t.id)
|
||||
self.assertFalse(mock_delete.called)
|
||||
self.assertEqual(tmpl_stack.status, tmpl_stack.COMPLETE)
|
||||
|
||||
@mock.patch.object(raw_template_object.RawTemplate, 'delete')
|
||||
@mock.patch.object(stack.Stack, 'store')
|
||||
def test_mark_complete_update(self, mock_store, mock_delete):
|
||||
tmpl = template.Template({
|
||||
'HeatTemplateFormatVersion': '2012-12-12',
|
||||
'Resources': {
|
||||
'foo': {'Type': 'GenericResourceType'}
|
||||
}
|
||||
})
|
||||
|
||||
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl)
|
||||
tmpl_stack.id = 2
|
||||
tmpl_stack.t.id = 2
|
||||
tmpl_stack.prev_raw_template_id = 1
|
||||
tmpl_stack.current_traversal = 'some-traversal'
|
||||
tmpl_stack.mark_complete('some-traversal')
|
||||
self.assertEqual(tmpl_stack.prev_raw_template_id,
|
||||
tmpl_stack.t.id)
|
||||
mock_delete.assert_called_once_with(self.ctx, 1)
|
||||
self.assertEqual(tmpl_stack.status, tmpl_stack.COMPLETE)
|
||||
|
||||
@mock.patch.object(raw_template_object.RawTemplate, 'delete')
|
||||
@mock.patch.object(stack.Stack, 'store')
|
||||
def test_mark_complete_stale_traversal(self, mock_store, mock_delete):
|
||||
tmpl = template.Template({
|
||||
'HeatTemplateFormatVersion': '2012-12-12',
|
||||
'Resources': {
|
||||
'foo': {'Type': 'GenericResourceType'}
|
||||
}
|
||||
})
|
||||
|
||||
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl)
|
||||
tmpl_stack.current_traversal = 'new-traversal'
|
||||
tmpl_stack.mark_complete('old-traversal')
|
||||
self.assertFalse(mock_delete.called)
|
||||
self.assertIsNone(tmpl_stack.prev_raw_template_id)
|
||||
self.assertFalse(mock_store.called)
|
||||
|
||||
|
||||
class StackKwargsForCloningTest(common.HeatTestCase):
|
||||
scenarios = [
|
||||
|
64
heat/tests/test_sync_point.py
Normal file
64
heat/tests/test_sync_point.py
Normal file
@ -0,0 +1,64 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from heat.engine import sync_point
|
||||
from heat.tests import common
|
||||
from heat.tests.engine import tools
|
||||
from heat.tests import utils
|
||||
|
||||
|
||||
class SyncPointTestCase(common.HeatTestCase):
|
||||
def test_sync_waiting(self):
|
||||
ctx = utils.dummy_context()
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
template=tools.string_template_five,
|
||||
convergence=True)
|
||||
stack.converge_stack(stack.t, action=stack.CREATE)
|
||||
resource = stack['C']
|
||||
graph = stack.convergence_dependencies.graph()
|
||||
|
||||
sender = (4, True)
|
||||
mock_callback = mock.Mock()
|
||||
sync_point.sync(ctx, resource.id, stack.current_traversal, True,
|
||||
mock_callback, set(graph[(resource.id, True)]),
|
||||
{sender: None})
|
||||
updated_sync_point = sync_point.get(ctx, resource.id,
|
||||
stack.current_traversal, True)
|
||||
input_data = sync_point.deserialize_input_data(
|
||||
updated_sync_point.input_data)
|
||||
self.assertEqual({sender: None}, input_data)
|
||||
self.assertFalse(mock_callback.called)
|
||||
|
||||
def test_sync_non_waiting(self):
|
||||
ctx = utils.dummy_context()
|
||||
stack = tools.get_stack('test_stack', utils.dummy_context(),
|
||||
template=tools.string_template_five,
|
||||
convergence=True)
|
||||
stack.converge_stack(stack.t, action=stack.CREATE)
|
||||
resource = stack['A']
|
||||
graph = stack.convergence_dependencies.graph()
|
||||
|
||||
sender = (3, True)
|
||||
mock_callback = mock.Mock()
|
||||
sync_point.sync(ctx, resource.id, stack.current_traversal, True,
|
||||
mock_callback, set(graph[(resource.id, True)]),
|
||||
{sender: None})
|
||||
updated_sync_point = sync_point.get(ctx, resource.id,
|
||||
stack.current_traversal, True)
|
||||
input_data = sync_point.deserialize_input_data(
|
||||
updated_sync_point.input_data)
|
||||
self.assertEqual({sender: None}, input_data)
|
||||
self.assertTrue(mock_callback.called)
|
Loading…
Reference in New Issue
Block a user