Convergence: Refactor worker

Refactored worker to remove duplicate code. The check_resource is broken
down into many smaller methods for readability and unit tests.

Change-Id: Id32b9aa11ecb637bf737dc1d86261a9b78739535
This commit is contained in:
Anant Patil 2015-07-13 11:00:41 +05:30
parent c9adea34f0
commit c723362a2f
2 changed files with 105 additions and 80 deletions

View File

@ -116,85 +116,87 @@ class WorkerService(service.Service):
else:
stack.purge_db()
@context.request_context
def check_resource(self, cnxt, resource_id, current_traversal, data,
is_update):
'''
Process a node in the dependency graph.
The node may be associated with either an update or a cleanup of its
associated resource.
'''
def _load_resource(self, cnxt, resource_id, data, is_update):
adopt_data = data.get('adopt_stack_data')
data = dict(sync_point.deserialize_input_data(data))
cache_data = {in_data.get(
'name'): in_data for in_data in data.values()
if in_data is not None}
cache_data['adopt_stack_data'] = adopt_data
rsrc, stack = None, None
try:
cache_data = {in_data.get(
'name'): in_data for in_data in data.values()
if in_data is not None}
cache_data['adopt_stack_data'] = adopt_data
rsrc, stack = resource.Resource.load(cnxt, resource_id, is_update,
cache_data)
except (exception.ResourceNotFound, exception.NotFound):
return
tmpl = stack.t
pass # can be ignored
if current_traversal != rsrc.stack.current_traversal:
LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal)
return
return rsrc, stack
current_deps = ([tuple(i), (tuple(j) if j is not None else None)]
for i, j in rsrc.stack.current_deps['edges'])
deps = dependencies.Dependencies(edges=current_deps)
graph = deps.graph()
def _do_check_resource(self, cnxt, current_traversal, tmpl, data,
is_update, rsrc, stack_id):
try:
if is_update:
try:
check_resource_update(rsrc, tmpl.id, data, self.engine_id)
except resource.UpdateReplace:
new_res_id = rsrc.make_replacement(tmpl.id)
LOG.info("Replacing resource with new id %s", new_res_id)
data = sync_point.serialize_input_data(data)
self._rpc_client.check_resource(cnxt,
new_res_id,
current_traversal,
data, is_update)
return False
if is_update:
if (rsrc.replaced_by is not None and
rsrc.current_template_id != tmpl.id):
return
try:
check_resource_update(rsrc, tmpl.id, data, self.engine_id)
except resource.UpdateReplace:
new_res_id = rsrc.make_replacement(tmpl.id)
LOG.info("Replacing resource with new id %s", new_res_id)
data = sync_point.serialize_input_data(data)
else:
check_resource_cleanup(rsrc, tmpl.id, data, self.engine_id)
return True
except resource.UpdateInProgress:
if self._try_steal_engine_lock(cnxt, rsrc.id):
self._rpc_client.check_resource(cnxt,
new_res_id,
rsrc.id,
current_traversal,
data, is_update)
return
except resource.UpdateInProgress:
if self._try_steal_engine_lock(cnxt, resource_id):
self._rpc_client.check_resource(cnxt,
resource_id,
current_traversal,
data, is_update)
return
except exception.ResourceFailure as ex:
reason = 'Resource %s failed: %s' % (stack.action,
six.text_type(ex))
self._handle_resource_failure(
cnxt, stack.id, current_traversal, reason)
return
except exception.ResourceFailure as ex:
reason = 'Resource %s failed: %s' % (rsrc.action,
six.text_type(ex))
self._handle_resource_failure(
cnxt, stack_id, current_traversal, reason)
return False
def _compute_dependencies(self, stack):
current_deps = ([tuple(i), (tuple(j) if j is not None else None)]
for i, j in stack.current_deps['edges'])
return dependencies.Dependencies(edges=current_deps)
def _retrigger_check_resource(self, cnxt, is_update, resource_id, stack):
current_traversal = stack.current_traversal
graph = self._compute_dependencies(stack).graph()
key = sync_point.make_key(resource_id, current_traversal, is_update)
predecessors = graph[key]
def do_check(target_key, data):
self.check_resource(resource_id, current_traversal,
data)
try:
sync_point.sync(cnxt, resource_id, current_traversal, is_update,
do_check, predecessors, {key: None})
except sync_point.sync_points.NotFound:
pass
def _initiate_propagate_resource(self, cnxt, resource_id,
current_traversal, is_update, rsrc,
stack):
input_data = None
if is_update:
input_data = construct_input_data(rsrc)
else:
try:
check_resource_cleanup(rsrc, tmpl.id, data, self.engine_id)
except resource.UpdateInProgress:
if self._try_steal_engine_lock(cnxt, resource_id):
self._rpc_client.check_resource(cnxt,
resource_id,
current_traversal,
data, is_update)
return
except exception.ResourceFailure as ex:
reason = 'Resource %s failed: %s' % (stack.action,
six.text_type(ex))
self._handle_resource_failure(
cnxt, stack.id, current_traversal, reason)
return
deps = self._compute_dependencies(stack)
graph = deps.graph()
graph_key = (resource_id, is_update)
if graph_key not in graph and rsrc.replaces is not None:
# If we are a replacement, impersonate the replaced resource for
# the purposes of calculating whether subsequent resources are
@ -211,7 +213,7 @@ class WorkerService(service.Service):
set(graph[(req, fwd)]), graph_key,
input_data if fwd else None, fwd)
check_stack_complete(cnxt, rsrc.stack, current_traversal,
check_stack_complete(cnxt, stack, current_traversal,
resource_id, deps, is_update)
except sync_point.SyncPointNotFound:
# Reload the stack to determine the current traversal, and check
@ -224,23 +226,42 @@ class WorkerService(service.Service):
current_traversal)
return
current_traversal = stack.current_traversal
current_deps = ([tuple(i), (tuple(j) if j is not None else None)]
for i, j in stack.current_deps['edges'])
deps = dependencies.Dependencies(edges=current_deps)
key = sync_point.make_key(resource_id, current_traversal,
is_update)
predecessors = deps.graph()[key]
self._retrigger_check_resource(cnxt, is_update, resource_id, stack)
def do_check(target_key, data):
self.check_resource(resource_id, current_traversal,
data)
@context.request_context
def check_resource(self, cnxt, resource_id, current_traversal, data,
is_update):
'''
Process a node in the dependency graph.
try:
sync_point.sync(cnxt, resource_id, current_traversal,
is_update, do_check, predecessors, {key: None})
except sync_point.sync_points.NotFound:
pass
The node may be associated with either an update or a cleanup of its
associated resource.
'''
rsrc, stack = self._load_resource(cnxt, resource_id, data, is_update)
if rsrc is None:
return
if current_traversal != stack.current_traversal:
LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal)
return
tmpl = stack.t
if is_update:
if (rsrc.replaced_by is not None and
rsrc.current_template_id != tmpl.id):
return
check_resource_done = self._do_check_resource(cnxt, current_traversal,
tmpl, data, is_update,
rsrc, stack.id)
if check_resource_done:
# initiate check on next set of resources from graph
self._initiate_propagate_resource(cnxt, resource_id,
current_traversal, is_update,
rsrc, stack)
def construct_input_data(rsrc):

View File

@ -228,6 +228,8 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
def test_resource_update_failure_sets_stack_state_as_failed(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self.stack.state_set(self.stack.UPDATE, self.stack.IN_PROGRESS, '')
self.resource.state_set(self.resource.UPDATE,
self.resource.IN_PROGRESS)
self.worker._trigger_rollback = mock.Mock()
dummy_ex = exception.ResourceNotAvailable(
resource_name=self.resource.name)
@ -246,6 +248,8 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self.is_update = False # invokes check_resource_cleanup
self.stack.state_set(self.stack.UPDATE, self.stack.IN_PROGRESS, '')
self.resource.state_set(self.resource.UPDATE,
self.resource.IN_PROGRESS)
self.worker._trigger_rollback = mock.Mock()
dummy_ex = exception.ResourceNotAvailable(
resource_name=self.resource.name)