Convergence: concurrent workflow

implements blueprint convergence-concurrent-workflow
Depends-On: I7586bc16ce492144f617ee3adc31a2bc19a62173
Change-Id: If968e46ed54f92c1bbfe3ce3bef241802a993ce6
This commit is contained in:
Angus Salkeld 2015-06-26 13:00:53 +10:00
parent 9711730a4b
commit 56012a3584
6 changed files with 176 additions and 89 deletions

View File

@ -166,7 +166,10 @@ def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None): expected_engine_id=None):
session = _session(context) session = _session(context)
with session.begin(): with session.begin():
values['atomic_key'] = atomic_key + 1 if atomic_key is None:
values['atomic_key'] = 1
else:
values['atomic_key'] = atomic_key + 1
rows_updated = session.query(models.Resource).filter_by( rows_updated = session.query(models.Resource).filter_by(
id=resource_id, engine_id=expected_engine_id, id=resource_id, engine_id=expected_engine_id,
atomic_key=atomic_key).update(values) atomic_key=atomic_key).update(values)

View File

@ -630,24 +630,21 @@ class Resource(object):
''' '''
return self return self
def create_convergence(self, template_id, resource_data): def create_convergence(self, template_id, resource_data, engine_id):
''' '''
Creates the resource by invoking the scheduler TaskRunner Creates the resource by invoking the scheduler TaskRunner
and it persists the resource's current_template_id to template_id and and it persists the resource's current_template_id to template_id and
resource's requires to list of the required resource id from the resource's requires to list of the required resource id from the
given resource_data. given resource_data.
''' '''
with self.lock(engine_id):
runner = scheduler.TaskRunner(self.create)
runner()
runner = scheduler.TaskRunner(self.create) # update the resource db record (stored in unlock())
runner() self.current_template_id = template_id
self.requires = list(
# update the resource db record {graph_key[0] for graph_key, data in resource_data.items()})
self.current_template_id = template_id
self.requires = (list({graph_key[0]
for graph_key, data in resource_data.items()}))
self._store_or_update(self.action,
self.status,
self.status_reason)
@scheduler.wrappertask @scheduler.wrappertask
def create(self): def create(self):
@ -790,31 +787,22 @@ class Resource(object):
except ValueError: except ValueError:
return True return True
def update_convergence(self, template_id, resource_data): def update_convergence(self, template_id, resource_data, engine_id):
''' '''
Updates the resource by invoking the scheduler TaskRunner Updates the resource by invoking the scheduler TaskRunner
and it persists the resource's current_template_id to template_id and and it persists the resource's current_template_id to template_id and
resource's requires to list of the required resource id from the resource's requires to list of the required resource id from the
given resource_data and existing resource's requires. given resource_data and existing resource's requires.
''' '''
with self.lock(engine_id):
runner = scheduler.TaskRunner(self.update, self.t)
runner()
if self.status == self.IN_PROGRESS: # update the resource db record (stored in unlock)
ex = UpdateInProgress(self.name) self.current_template_id = template_id
LOG.exception(ex) current_requires = set(
raise ex graph_key[0] for graph_key, data in resource_data.items())
self.requires = list(set(self.requires) | current_requires)
# update the resource
runner = scheduler.TaskRunner(self.update, self.t)
runner()
# update the resource db record
self.current_template_id = template_id
current_requires = {graph_key[0]
for graph_key, data in resource_data.items()}
self.requires = (list(set(self.requires) | current_requires))
self._store_or_update(self.action,
self.status,
self.status_reason)
@scheduler.wrappertask @scheduler.wrappertask
def update(self, after, before=None, prev_resource=None): def update(self, after, before=None, prev_resource=None):
@ -1006,30 +994,22 @@ class Resource(object):
msg = _('"%s" deletion policy not supported') % policy msg = _('"%s" deletion policy not supported') % policy
raise exception.StackValidationFailed(message=msg) raise exception.StackValidationFailed(message=msg)
def delete_convergence(self, template_id, resource_data): def delete_convergence(self, template_id, resource_data, engine_id):
''' '''
Deletes the resource by invoking the scheduler TaskRunner Deletes the resource by invoking the scheduler TaskRunner
and it persists the resource's current_template_id to template_id and and it persists the resource's current_template_id to template_id and
resource's requires to list of the required resource id from the resource's requires to list of the required resource id from the
given resource_data and existing resource's requires. given resource_data and existing resource's requires.
''' '''
if self.status == self.IN_PROGRESS: with self.lock(engine_id):
ex = UpdateInProgress(self.name) runner = scheduler.TaskRunner(self.delete)
LOG.exception(ex) runner()
raise ex
# delete the resource # update the resource db record
runner = scheduler.TaskRunner(self.delete) self.current_template_id = template_id
runner() current_requires = {graph_key[0]
for graph_key, data in resource_data.items()}
# update the resource db record self.requires = (list(set(self.requires) - current_requires))
self.current_template_id = template_id
current_requires = {graph_key[0]
for graph_key, data in resource_data.items()}
self.requires = (list(set(self.requires) - current_requires))
self._store_or_update(self.action,
self.status,
self.status_reason)
@scheduler.wrappertask @scheduler.wrappertask
def delete(self): def delete(self):
@ -1172,6 +1152,46 @@ class Resource(object):
LOG.warning(_LW('Resource "%s" not pre-stored in DB'), self) LOG.warning(_LW('Resource "%s" not pre-stored in DB'), self)
self._store(metadata) self._store(metadata)
@contextlib.contextmanager
def lock(self, engine_id):
    """Hold the per-resource engine lock around the body of a ``with``.

    The resource's DB object is loaded and claimed by writing
    ``engine_id`` to it through ``select_and_update``, passing
    ``expected_engine_id=None`` and the ``atomic_key`` that was just
    read. When the body finishes, normally or with an exception, the
    lock is released through ``unlock``.

    :param engine_id: id of the engine claiming the resource.
    :raises UpdateInProgress: if the conditional update did not succeed,
        i.e. the resource could not be claimed.
    """
    updated_ok = False
    try:
        rs = resource_objects.Resource.get_obj(self.context, self.id)
        updated_ok = rs.select_and_update(
            {'engine_id': engine_id},
            atomic_key=rs.atomic_key,
            expected_engine_id=None)
    except Exception as ex:
        LOG.error(_LE('DB error %s'), ex)
        raise

    if not updated_ok:
        # No exception is being handled at this point, so LOG.exception
        # would attach a meaningless traceback; failing to claim the row
        # is reported to the caller via UpdateInProgress instead.
        LOG.debug('atomic:%s engine_id:%s/%s',
                  rs.atomic_key, rs.engine_id, engine_id)
        raise UpdateInProgress(self.name)

    try:
        yield
    except:  # noqa
        # Release the lock even on failure, then re-raise the body's
        # original exception.
        with excutils.save_and_reraise_exception():
            self.unlock(rs, engine_id, rs.atomic_key)
    else:
        self.unlock(rs, engine_id, rs.atomic_key)
def unlock(self, rsrc, engine_id, atomic_key):
    """Release the engine lock and persist the convergence fields.

    Clears ``engine_id`` on the DB row and, in the same conditional
    update, stores ``current_template_id``, ``updated_at`` and
    ``requires`` from this resource.

    :param rsrc: DB resource object that was used to take the lock.
    :param engine_id: id of the engine expected to hold the lock.
    :param atomic_key: the row's atomic key as read *before* the lock
        was taken; ``None`` is treated as 0.
    """
    if atomic_key is None:
        atomic_key = 0

    # Taking the lock went through the same conditional update, which
    # bumps the stored atomic key by one, so the row is expected to be
    # at ``atomic_key + 1`` now.
    res = rsrc.select_and_update(
        {'engine_id': None,
         'current_template_id': self.current_template_id,
         'updated_at': self.updated_time,
         'requires': self.requires},
        expected_engine_id=engine_id,
        atomic_key=atomic_key + 1)

    if res != 1:
        # Best effort: a failed unlock is reported but not raised.
        LOG.warning(_LW('Failed to unlock resource %s'), rsrc.name)
def _resolve_attribute(self, name): def _resolve_attribute(self, name):
""" """
Default implementation; should be overridden by resources that expose Default implementation; should be overridden by resources that expose

View File

@ -28,6 +28,8 @@ from heat.engine import dependencies
from heat.engine import resource from heat.engine import resource
from heat.engine import stack as parser from heat.engine import stack as parser
from heat.engine import sync_point from heat.engine import sync_point
from heat.objects import resource as resource_objects
from heat.rpc import listener_client
from heat.rpc import worker_client as rpc_client from heat.rpc import worker_client as rpc_client
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -84,6 +86,18 @@ class WorkerService(service.Service):
super(WorkerService, self).stop() super(WorkerService, self).stop()
def _try_steal_engine_lock(self, cnxt, resource_id):
    """Clear a resource's engine lock if the engine holding it is gone.

    :param cnxt: request context used for the DB lookup and the
        liveness check.
    :param resource_id: id of the resource whose lock is inspected.
    :returns: True if the lock was held by another engine that did not
        respond as alive and the lock was cleared; False otherwise.
    """
    rs_obj = resource_objects.Resource.get_obj(cnxt, resource_id)
    holder = rs_obj.engine_id

    # Nothing to steal when the resource is unlocked or we hold the lock.
    if holder is None or holder == self.engine_id:
        return False

    listener = listener_client.EngineListnerClient(holder)
    if listener.is_alive(cnxt):
        return False

    # steal the lock.
    rs_obj.update_and_save({'engine_id': None})
    return True
def _trigger_rollback(self, cnxt, stack): def _trigger_rollback(self, cnxt, stack):
# TODO(ananta) convergence-rollback implementation # TODO(ananta) convergence-rollback implementation
pass pass
@ -136,7 +150,7 @@ class WorkerService(service.Service):
return return
try: try:
check_resource_update(rsrc, tmpl.id, data) check_resource_update(rsrc, tmpl.id, data, self.engine_id)
except resource.UpdateReplace: except resource.UpdateReplace:
new_res_id = rsrc.make_replacement() new_res_id = rsrc.make_replacement()
self._rpc_client.check_resource(cnxt, self._rpc_client.check_resource(cnxt,
@ -145,6 +159,11 @@ class WorkerService(service.Service):
data, is_update) data, is_update)
return return
except resource.UpdateInProgress: except resource.UpdateInProgress:
if self._try_steal_engine_lock(cnxt, resource_id):
self._rpc_client.check_resource(cnxt,
resource_id,
current_traversal,
data, is_update)
return return
except exception.ResourceFailure as e: except exception.ResourceFailure as e:
reason = six.text_type(e) reason = six.text_type(e)
@ -155,8 +174,13 @@ class WorkerService(service.Service):
input_data = construct_input_data(rsrc) input_data = construct_input_data(rsrc)
else: else:
try: try:
check_resource_cleanup(rsrc, tmpl.id, data) check_resource_cleanup(rsrc, tmpl.id, data, self.engine_id)
except resource.UpdateInProgress: except resource.UpdateInProgress:
if self._try_steal_engine_lock(cnxt, resource_id):
self._rpc_client.check_resource(cnxt,
resource_id,
current_traversal,
data, is_update)
return return
except exception.ResourceFailure as e: except exception.ResourceFailure as e:
reason = six.text_type(e) reason = six.text_type(e)
@ -184,9 +208,33 @@ class WorkerService(service.Service):
check_stack_complete(cnxt, rsrc.stack, current_traversal, check_stack_complete(cnxt, rsrc.stack, current_traversal,
rsrc.id, deps, is_update) rsrc.id, deps, is_update)
except sync_point.SyncPointNotFound: except sync_point.SyncPointNotFound:
# NOTE(sirushtim): Implemented by spec # Reload the stack to determine the current traversal, and check
# convergence-concurrent-workflow # the SyncPoint for the current node to determine if it is ready.
pass # If it is, then retrigger the current node with the appropriate
# data for the latest traversal.
stack = parser.Stack.load(cnxt, stack_id=rsrc.stack.id)
if current_traversal == rsrc.stack.current_traversal:
LOG.debug('[%s] Traversal sync point missing.',
current_traversal)
return
current_traversal = stack.current_traversal
current_deps = ([tuple(i), (tuple(j) if j is not None else None)]
for i, j in stack.current_deps['edges'])
deps = dependencies.Dependencies(edges=current_deps)
key = sync_point.make_key(resource_id, current_traversal,
is_update)
predecessors = deps.graph()[key]
def do_check(target_key, data):
self.check_resource(resource_id, current_traversal,
data)
try:
sync_point.sync(cnxt, resource_id, current_traversal,
is_update, do_check, predecessors, {key: None})
except sync_point.sync_points.NotFound:
pass
def construct_input_data(rsrc): def construct_input_data(rsrc):
@ -238,20 +286,20 @@ def propagate_check_resource(cnxt, rpc_client, next_res_id,
{sender_key: sender_data}) {sender_key: sender_data})
def check_resource_update(rsrc, template_id, data): def check_resource_update(rsrc, template_id, data, engine_id):
''' '''
Create or update the Resource if appropriate. Create or update the Resource if appropriate.
''' '''
if rsrc.resource_id is None: if rsrc.resource_id is None:
rsrc.create_convergence(template_id, data) rsrc.create_convergence(template_id, data, engine_id)
else: else:
rsrc.update_convergence(template_id, data) rsrc.update_convergence(template_id, data, engine_id)
def check_resource_cleanup(rsrc, template_id, data): def check_resource_cleanup(rsrc, template_id, data, engine_id):
''' '''
Delete the Resource if appropriate. Delete the Resource if appropriate.
''' '''
if rsrc.current_template_id != template_id: if rsrc.current_template_id != template_id:
rsrc.delete_convergence(template_id, data) rsrc.delete_convergence(template_id, data, engine_id)

View File

@ -162,6 +162,12 @@ class Resource(
resource_db.update_and_save(values) resource_db.update_and_save(values)
return self._refresh() return self._refresh()
def select_and_update(self, values, expected_engine_id=None,
                      atomic_key=0):
    """Update this resource's row through ``db_api.resource_update``.

    :param values: dict of column values to write.
    :param expected_engine_id: engine id forwarded to the DB layer as
        the expected current holder.
    :param atomic_key: atomic key forwarded to the DB layer.
    :returns: whatever ``db_api.resource_update`` returns.
    """
    conditions = {
        'atomic_key': atomic_key,
        'expected_engine_id': expected_engine_id,
    }
    return db_api.resource_update(self._context, self.id, values,
                                  **conditions)
def _refresh(self): def _refresh(self):
return self.__class__._from_db_object( return self.__class__._from_db_object(
self, self,

View File

@ -142,7 +142,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self.is_update) self.is_update)
mock_cru.assert_called_once_with(self.resource, mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id, self.resource.stack.t.id,
{}) {}, self.worker.engine_id)
self.assertFalse(mock_crc.called) self.assertFalse(mock_crc.called)
expected_calls = [] expected_calls = []
@ -167,7 +167,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self.is_update) self.is_update)
mock_cru.assert_called_once_with(self.resource, mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id, self.resource.stack.t.id,
{}) {}, self.worker.engine_id)
self.assertTrue(mock_mr.called) self.assertTrue(mock_mr.called)
self.assertFalse(mock_crc.called) self.assertFalse(mock_crc.called)
self.assertFalse(mock_pcr.called) self.assertFalse(mock_pcr.called)
@ -182,7 +182,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self.is_update) self.is_update)
mock_cru.assert_called_once_with(self.resource, mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id, self.resource.stack.t.id,
{}) {}, self.worker.engine_id)
self.assertFalse(mock_mr.called) self.assertFalse(mock_mr.called)
self.assertFalse(mock_crc.called) self.assertFalse(mock_crc.called)
self.assertFalse(mock_pcr.called) self.assertFalse(mock_pcr.called)
@ -353,7 +353,7 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
self.assertFalse(mock_cru.called) self.assertFalse(mock_cru.called)
mock_crc.assert_called_once_with( mock_crc.assert_called_once_with(
self.resource, self.resource.stack.t.id, self.resource, self.resource.stack.t.id,
{}) {}, self.worker.engine_id)
def test_is_cleanup_traversal_raise_update_inprogress( def test_is_cleanup_traversal_raise_update_inprogress(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
@ -363,7 +363,7 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
self.is_update) self.is_update)
mock_crc.assert_called_once_with(self.resource, mock_crc.assert_called_once_with(self.resource,
self.resource.stack.t.id, self.resource.stack.t.id,
{}) {}, self.worker.engine_id)
self.assertFalse(mock_cru.called) self.assertFalse(mock_cru.called)
self.assertFalse(mock_pcr.called) self.assertFalse(mock_pcr.called)
self.assertFalse(mock_csc.called) self.assertFalse(mock_csc.called)
@ -416,25 +416,25 @@ class MiscMethodsTest(common.HeatTestCase):
@mock.patch.object(resource.Resource, 'create_convergence') @mock.patch.object(resource.Resource, 'create_convergence')
def test_check_resource_update_create(self, mock_create): def test_check_resource_update_create(self, mock_create):
worker.check_resource_update(self.resource, self.resource.stack.t.id, worker.check_resource_update(self.resource, self.resource.stack.t.id,
{}) {}, 'engine-id')
self.assertTrue(mock_create.called) self.assertTrue(mock_create.called)
@mock.patch.object(resource.Resource, 'update_convergence') @mock.patch.object(resource.Resource, 'update_convergence')
def test_check_resource_update_update(self, mock_update): def test_check_resource_update_update(self, mock_update):
self.resource.resource_id = 'physical-res-id' self.resource.resource_id = 'physical-res-id'
worker.check_resource_update(self.resource, self.resource.stack.t.id, worker.check_resource_update(self.resource, self.resource.stack.t.id,
{}) {}, 'engine-id')
self.assertTrue(mock_update.called) self.assertTrue(mock_update.called)
@mock.patch.object(resource.Resource, 'delete_convergence') @mock.patch.object(resource.Resource, 'delete_convergence')
def test_check_resource_cleanup_delete(self, mock_delete): def test_check_resource_cleanup_delete(self, mock_delete):
self.resource.current_template_id = 'new-template-id' self.resource.current_template_id = 'new-template-id'
worker.check_resource_cleanup(self.resource, self.resource.stack.t.id, worker.check_resource_cleanup(self.resource, self.resource.stack.t.id,
{}) {}, 'engine-id')
self.assertTrue(mock_delete.called) self.assertTrue(mock_delete.called)
@mock.patch.object(resource.Resource, 'delete_convergence') @mock.patch.object(resource.Resource, 'delete_convergence')
def test_check_resource_cleanup_nodelete(self, mock_delete): def test_check_resource_cleanup_nodelete(self, mock_delete):
worker.check_resource_cleanup(self.resource, self.resource.stack.t.id, worker.check_resource_cleanup(self.resource, self.resource.stack.t.id,
{}) {}, 'engine-id')
self.assertFalse(mock_delete.called) self.assertFalse(mock_delete.called)

View File

@ -1414,74 +1414,84 @@ class ResourceTest(common.HeatTestCase):
res_obj = res_objs['test_res_enc'] res_obj = res_objs['test_res_enc']
self.assertEqual('string', res_obj.properties_data['prop1']) self.assertEqual('string', res_obj.properties_data['prop1'])
@mock.patch.object(resource.Resource, '_store_or_update') def _assert_resource_lock(self, res_id, engine_id, atomic_key):
rs = resource_objects.Resource.get_obj(self.stack.context, res_id)
self.assertEqual(engine_id, rs.engine_id)
self.assertEqual(atomic_key, rs.atomic_key)
@mock.patch.object(resource.Resource, 'create') @mock.patch.object(resource.Resource, 'create')
def test_create_convergence(self, def test_create_convergence(self, mock_create):
mock_create,
mock_store_update_method):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res._store()
self._assert_resource_lock(res.id, None, None)
res.create_convergence('template_key', {(1, True): {}, res.create_convergence('template_key', {(1, True): {},
(1, True): {}}) (1, True): {}},
'engine-007')
mock_create.assert_called_once_with() mock_create.assert_called_once_with()
self.assertEqual('template_key', res.current_template_id) self.assertEqual('template_key', res.current_template_id)
self.assertEqual([1], res.requires) self.assertEqual([1], res.requires)
self.assertTrue(mock_store_update_method.called) self._assert_resource_lock(res.id, None, 2)
@mock.patch.object(resource.Resource, '_store_or_update')
@mock.patch.object(resource.Resource, 'update') @mock.patch.object(resource.Resource, 'update')
def test_update_convergence(self, def test_update_convergence(self, mock_update):
mock_update,
mock_store_update_method
):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.requires = [2] res.requires = [2]
res._store()
self._assert_resource_lock(res.id, None, None)
res.update_convergence('template_key', {(1, True): {}, res.update_convergence('template_key', {(1, True): {},
(1, True): {}}) (1, True): {}}, 'engine-007')
mock_update.assert_called_once_with(res.t) mock_update.assert_called_once_with(res.t)
self.assertEqual('template_key', res.current_template_id) self.assertEqual('template_key', res.current_template_id)
self.assertEqual([1, 2], res.requires) self.assertEqual([1, 2], res.requires)
self.assertTrue(mock_store_update_method.called) self._assert_resource_lock(res.id, None, 2)
def test_update_in_progress_convergence(self): def test_update_in_progress_convergence(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.status = resource.Resource.IN_PROGRESS res._store()
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
rs.update_and_save({'engine_id': 'not-this'})
self._assert_resource_lock(res.id, 'not-this', None)
ex = self.assertRaises(resource.UpdateInProgress, ex = self.assertRaises(resource.UpdateInProgress,
res.update_convergence, res.update_convergence,
'template_key', 'template_key',
{}) {}, 'engine-007')
msg = ("The resource %s is already being updated." % msg = ("The resource %s is already being updated." %
res.name) res.name)
self.assertEqual(msg, six.text_type(ex)) self.assertEqual(msg, six.text_type(ex))
@mock.patch.object(resource.Resource, '_store_or_update')
@mock.patch.object(resource.Resource, 'delete') @mock.patch.object(resource.Resource, 'delete')
def test_delete_convergence(self, def test_delete_convergence(self, mock_delete):
mock_delete,
mock_store_update_method):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.requires = [1, 2] res.requires = [1, 2]
res._store()
self._assert_resource_lock(res.id, None, None)
res.delete_convergence('template_key', {(1, True): {}, res.delete_convergence('template_key', {(1, True): {},
(1, True): {}}) (1, True): {}},
'engine-007')
mock_delete.assert_called_once_with() mock_delete.assert_called_once_with()
self.assertEqual('template_key', res.current_template_id) self.assertEqual('template_key', res.current_template_id)
self.assertEqual([2], res.requires) self.assertEqual([2], res.requires)
self.assertTrue(mock_store_update_method.called) self._assert_resource_lock(res.id, None, 2)
def test_delete_in_progress_convergence(self): def test_delete_in_progress_convergence(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.status = resource.Resource.IN_PROGRESS res._store()
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
rs.update_and_save({'engine_id': 'not-this'})
self._assert_resource_lock(res.id, 'not-this', None)
ex = self.assertRaises(resource.UpdateInProgress, ex = self.assertRaises(resource.UpdateInProgress,
res.delete_convergence, res.delete_convergence,
'template_key', 'template_key',
{}) {}, 'engine-007')
msg = ("The resource %s is already being updated." % msg = ("The resource %s is already being updated." %
res.name) res.name)
self.assertEqual(msg, six.text_type(ex)) self.assertEqual(msg, six.text_type(ex))