Use atomic_key for deployment metadata updates
This change switches _push_metadata_software_deployments over to using db_api.resource_update for deployment resource metadata updates. Using the atomic_key filter of resource_update allows this code to detect if another transaction has updated the metadata, and the wrap_db_retry decorator is used to perform retry attempts. No actual resource locking is required since we just need to detect if another transaction performed an update, so the engine_id is not specified. This change uses db_api directly rather than the Resource object since this change will need to be backported to Kilo (db_api.resource_update was created for convergence, and is in Kilo, but Resource object using it is not). This change can switch back to using the Resource object once the backport is complete. This rows_updated/RetryRequest/wrap_db_retry technique is used by other OpenStack projects for concurrent transaction detection (such as nova, for floating IP allocation). Change-Id: Ibf9dd58a66a77d9ae9d4b519b0f11567977f416c Closes-Bug: #1477329
This commit is contained in:
parent
663acd3362
commit
acb2c56fe0
@ -464,6 +464,10 @@ class ReadOnlyFieldError(HeatException):
|
||||
msg_fmt = _('Cannot modify readonly field %(field)s')
|
||||
|
||||
|
||||
class DeploymentConcurrentTransaction(HeatException):
|
||||
msg_fmt = _('Concurrent transaction for deployments of server %(server)s')
|
||||
|
||||
|
||||
class ObjectFieldInvalid(HeatException):
|
||||
msg_fmt = _('Field %(field)s of %(objname)s is not an instance of Field')
|
||||
|
||||
|
@ -11,6 +11,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_service import service
|
||||
@ -19,10 +21,11 @@ import requests
|
||||
import six
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
from heat.common import exception
|
||||
from heat.common.i18n import _
|
||||
from heat.common.i18n import _LI
|
||||
from heat.db import api as db_api
|
||||
from heat.engine import api
|
||||
from heat.objects import resource as resource_object
|
||||
from heat.objects import software_config as software_config_object
|
||||
from heat.objects import software_deployment as software_deployment_object
|
||||
from heat.rpc import api as rpc_api
|
||||
@ -81,15 +84,19 @@ class SoftwareConfigService(service.Service):
|
||||
result = [api.format_software_config(sd.config) for sd in all_sd_s]
|
||||
return result
|
||||
|
||||
@oslo_db_api.wrap_db_retry(max_retries=10, retry_on_request=True)
|
||||
def _push_metadata_software_deployments(self, cnxt, server_id, sd):
|
||||
rs = (resource_object.Resource.
|
||||
get_by_physical_resource_id(cnxt, server_id))
|
||||
rs = db_api.resource_get_by_physical_resource_id(cnxt, server_id)
|
||||
if not rs:
|
||||
return
|
||||
deployments = self.metadata_software_deployments(cnxt, server_id)
|
||||
md = rs.rsrc_metadata or {}
|
||||
md['deployments'] = deployments
|
||||
rs.update_and_save({'rsrc_metadata': md})
|
||||
rows_updated = db_api.resource_update(
|
||||
cnxt, rs.id, {'rsrc_metadata': md}, rs.atomic_key)
|
||||
if not rows_updated:
|
||||
raise db_exc.RetryRequest(
|
||||
exception.DeploymentConcurrentTransaction(server=server_id))
|
||||
|
||||
metadata_put_url = None
|
||||
metadata_queue_id = None
|
||||
|
@ -22,6 +22,7 @@ import six
|
||||
|
||||
from heat.common import exception
|
||||
from heat.common import template_format
|
||||
from heat.db import api as db_api
|
||||
from heat.engine.clients.os import swift
|
||||
from heat.engine.clients.os import zaqar
|
||||
from heat.engine import service
|
||||
@ -559,14 +560,18 @@ class SoftwareConfigServiceTest(common.HeatTestCase):
|
||||
|
||||
@mock.patch.object(service_software_config.SoftwareConfigService,
|
||||
'metadata_software_deployments')
|
||||
@mock.patch.object(service_software_config.resource_object.Resource,
|
||||
'get_by_physical_resource_id')
|
||||
@mock.patch.object(db_api, 'resource_update')
|
||||
@mock.patch.object(db_api, 'resource_get_by_physical_resource_id')
|
||||
@mock.patch.object(service_software_config.requests, 'put')
|
||||
def test_push_metadata_software_deployments(self, put, res_get, md_sd):
|
||||
def test_push_metadata_software_deployments(
|
||||
self, put, res_get, res_upd, md_sd):
|
||||
rs = mock.Mock()
|
||||
rs.rsrc_metadata = {'original': 'metadata'}
|
||||
rs.id = '1234'
|
||||
rs.atomic_key = 1
|
||||
rs.data = []
|
||||
res_get.return_value = rs
|
||||
res_upd.return_value = 1
|
||||
|
||||
deployments = {'deploy': 'this'}
|
||||
md_sd.return_value = deployments
|
||||
@ -578,24 +583,56 @@ class SoftwareConfigServiceTest(common.HeatTestCase):
|
||||
|
||||
self.engine.software_config._push_metadata_software_deployments(
|
||||
self.ctx, '1234', None)
|
||||
rs.update_and_save.assert_called_once_with(
|
||||
{'rsrc_metadata': result_metadata})
|
||||
res_upd.assert_called_once_with(
|
||||
self.ctx, '1234', {'rsrc_metadata': result_metadata}, 1)
|
||||
put.side_effect = Exception('Unexpected requests.put')
|
||||
|
||||
@mock.patch.object(service_software_config.SoftwareConfigService,
|
||||
'metadata_software_deployments')
|
||||
@mock.patch.object(service_software_config.resource_object.Resource,
|
||||
'get_by_physical_resource_id')
|
||||
@mock.patch.object(db_api, 'resource_update')
|
||||
@mock.patch.object(db_api, 'resource_get_by_physical_resource_id')
|
||||
@mock.patch.object(service_software_config.requests, 'put')
|
||||
def test_push_metadata_software_deployments_temp_url(
|
||||
self, put, res_get, md_sd):
|
||||
def test_push_metadata_software_deployments_retry(
|
||||
self, put, res_get, res_upd, md_sd):
|
||||
rs = mock.Mock()
|
||||
rs.rsrc_metadata = {'original': 'metadata'}
|
||||
rs.id = '1234'
|
||||
rs.atomic_key = 1
|
||||
rs.data = []
|
||||
res_get.return_value = rs
|
||||
# zero update means another transaction updated
|
||||
res_upd.return_value = 0
|
||||
|
||||
deployments = {'deploy': 'this'}
|
||||
md_sd.return_value = deployments
|
||||
|
||||
self.assertRaises(
|
||||
exception.DeploymentConcurrentTransaction,
|
||||
self.engine.software_config._push_metadata_software_deployments,
|
||||
self.ctx,
|
||||
'1234',
|
||||
None)
|
||||
# retry ten times then the final failure
|
||||
self.assertEqual(11, res_upd.call_count)
|
||||
put.assert_not_called()
|
||||
|
||||
@mock.patch.object(service_software_config.SoftwareConfigService,
|
||||
'metadata_software_deployments')
|
||||
@mock.patch.object(db_api, 'resource_update')
|
||||
@mock.patch.object(db_api, 'resource_get_by_physical_resource_id')
|
||||
@mock.patch.object(service_software_config.requests, 'put')
|
||||
def test_push_metadata_software_deployments_temp_url(
|
||||
self, put, res_get, res_upd, md_sd):
|
||||
rs = mock.Mock()
|
||||
rs.rsrc_metadata = {'original': 'metadata'}
|
||||
rs.id = '1234'
|
||||
rs.atomic_key = 1
|
||||
rd = mock.Mock()
|
||||
rd.key = 'metadata_put_url'
|
||||
rd.value = 'http://192.168.2.2/foo/bar'
|
||||
rs.data = [rd]
|
||||
res_get.return_value = rs
|
||||
res_upd.return_value = 1
|
||||
|
||||
deployments = {'deploy': 'this'}
|
||||
md_sd.return_value = deployments
|
||||
@ -607,26 +644,29 @@ class SoftwareConfigServiceTest(common.HeatTestCase):
|
||||
|
||||
self.engine.software_config._push_metadata_software_deployments(
|
||||
self.ctx, '1234', None)
|
||||
rs.update_and_save.assert_called_once_with(
|
||||
{'rsrc_metadata': result_metadata})
|
||||
res_upd.assert_called_once_with(
|
||||
self.ctx, '1234', {'rsrc_metadata': result_metadata}, 1)
|
||||
|
||||
put.assert_called_once_with(
|
||||
'http://192.168.2.2/foo/bar', json.dumps(result_metadata))
|
||||
|
||||
@mock.patch.object(service_software_config.SoftwareConfigService,
|
||||
'metadata_software_deployments')
|
||||
@mock.patch.object(service_software_config.resource_object.Resource,
|
||||
'get_by_physical_resource_id')
|
||||
@mock.patch.object(db_api, 'resource_update')
|
||||
@mock.patch.object(db_api, 'resource_get_by_physical_resource_id')
|
||||
@mock.patch.object(zaqar.ZaqarClientPlugin, 'create_for_tenant')
|
||||
def test_push_metadata_software_deployments_queue(
|
||||
self, plugin, res_get, md_sd):
|
||||
self, plugin, res_get, res_upd, md_sd):
|
||||
rs = mock.Mock()
|
||||
rs.rsrc_metadata = {'original': 'metadata'}
|
||||
rs.id = '1234'
|
||||
rs.atomic_key = 1
|
||||
rd = mock.Mock()
|
||||
rd.key = 'metadata_queue_id'
|
||||
rd.value = '6789'
|
||||
rs.data = [rd]
|
||||
res_get.return_value = rs
|
||||
res_upd.return_value = 1
|
||||
sd = mock.Mock()
|
||||
sd.stack_user_project_id = 'project1'
|
||||
queue = mock.Mock()
|
||||
@ -644,8 +684,8 @@ class SoftwareConfigServiceTest(common.HeatTestCase):
|
||||
|
||||
self.engine.software_config._push_metadata_software_deployments(
|
||||
self.ctx, '1234', sd)
|
||||
rs.update_and_save.assert_called_once_with(
|
||||
{'rsrc_metadata': result_metadata})
|
||||
res_upd.assert_called_once_with(
|
||||
self.ctx, '1234', {'rsrc_metadata': result_metadata}, 1)
|
||||
|
||||
plugin.assert_called_once_with('project1')
|
||||
zaqar_client.queue.assert_called_once_with('6789')
|
||||
|
Loading…
x
Reference in New Issue
Block a user