Resolve race in providing deployment data to Swift
While we are careful to ensure there are no race conditions in updating the software deployment metadata in the Heat database, there was no such protection in place for the actual update of the metadata provided to the server via Swift. In legacy Heat, this race could only occur when software deployments were created from the REST API or from separate stacks deploying software to the same server, because resources in the same stack were handled by a single thread making synchronous calls to create the software deployments. However, with convergence, resources in the same stack not only can be processed concurrently but will tend to be synchronised if they have the same dependencies (since they'll be triggered by consecutive asynchronous RPC messages). This handles the race by retrying if a concurrent update is detected by the time the data has been provided to Swift. Note that this is only guaranteed to result in the latest data being available on the assumption that a later write to Swift cannot overwrite an earlier one that has been accepted. This is probably *not* guaranteed by Swift in the presence of clock skew. Nonetheless, this should drastically reduce the failure rate. Similar caveats would probably apply to Zaqar, although Zaqar nominally provides FIFO queues (in practice, it likely depends on the backend used). However, with Zaqar clients receive all updates, not just the latest one, and os-collect-config stores the deployments contained in every message. Thus, Zaqar is not affected even assuming out-of-order arrival. Change-Id: Ic9a447f27e8c51f91f47f93b0fd1e9710341ec38 Closes-Bug: #1731032
This commit is contained in:
parent
5595f86615
commit
46132999c0
@ -111,51 +111,47 @@ class SoftwareConfigService(object):
|
||||
|
||||
metadata_put_url = None
|
||||
metadata_queue_id = None
|
||||
metadata_headers = None
|
||||
for rd in rs.data:
|
||||
if rd.key == 'metadata_put_url':
|
||||
metadata_put_url = rd.value
|
||||
if rd.key == 'metadata_queue_id':
|
||||
metadata_queue_id = rd.value
|
||||
if metadata_put_url:
|
||||
data = requests.head(metadata_put_url)
|
||||
etag = data.headers.get('etag')
|
||||
if etag:
|
||||
metadata_headers = {'if-match': etag}
|
||||
else:
|
||||
LOG.warning("Couldn't find existing Swift metadata "
|
||||
"for server %s", server_id)
|
||||
|
||||
action = _('deployments of server %s') % server_id
|
||||
atomic_key = rs.atomic_key
|
||||
rows_updated = db_api.resource_update(
|
||||
cnxt, rs.id, {'rsrc_metadata': md}, rs.atomic_key)
|
||||
cnxt, rs.id, {'rsrc_metadata': md}, atomic_key)
|
||||
if not rows_updated:
|
||||
LOG.debug('Conflict on deployment metadata update for '
|
||||
'server %s; retrying', server_id)
|
||||
action = _('deployments of server %s') % server_id
|
||||
LOG.debug('Retrying server %s deployment metadata update',
|
||||
server_id)
|
||||
raise exception.ConcurrentTransaction(action=action)
|
||||
LOG.debug('Updated deployment metadata for server %s', server_id)
|
||||
|
||||
LOG.debug('Updated server %s deployment metadata', server_id)
|
||||
|
||||
if metadata_put_url:
|
||||
json_md = jsonutils.dumps(md)
|
||||
resp = requests.put(metadata_put_url, json_md,
|
||||
headers=metadata_headers)
|
||||
if resp.status_code == requests.codes.precondition_failed:
|
||||
LOG.debug('Conflict on Swift deployment update for '
|
||||
'server %s; retrying', server_id)
|
||||
action = _('deployments of server %s') % server_id
|
||||
raise exception.ConcurrentTransaction(action=action)
|
||||
else:
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except requests.HTTPError as exc:
|
||||
LOG.error('Failed to deliver deployment data to '
|
||||
'server %s: %s', server_id, exc)
|
||||
resp = requests.put(metadata_put_url, json_md)
|
||||
try:
|
||||
resp.raise_for_status()
|
||||
except requests.HTTPError as exc:
|
||||
LOG.error('Failed to deliver deployment data to '
|
||||
'server %s: %s', server_id, exc)
|
||||
if metadata_queue_id:
|
||||
project = stack_user_project_id
|
||||
queue = self._get_zaqar_queue(cnxt, rs, project, metadata_queue_id)
|
||||
zaqar_plugin = cnxt.clients.client_plugin('zaqar')
|
||||
queue.post({'body': md, 'ttl': zaqar_plugin.DEFAULT_TTL})
|
||||
|
||||
# Bump the atomic key again to serialise updates to the data sent to
|
||||
# the server via Swift.
|
||||
if metadata_put_url is not None:
|
||||
rows_updated = db_api.resource_update(cnxt, rs.id, {},
|
||||
atomic_key + 1)
|
||||
if not rows_updated:
|
||||
LOG.debug('Concurrent update to server %s deployments data '
|
||||
'detected - retrying.', server_id)
|
||||
raise exception.ConcurrentTransaction(action=action)
|
||||
|
||||
def _refresh_swift_software_deployment(self, cnxt, sd, deploy_signal_id):
|
||||
container, object_name = urlparse.urlparse(
|
||||
deploy_signal_id).path.split('/')[-2:]
|
||||
|
@ -693,9 +693,8 @@ class SoftwareConfigServiceTest(common.HeatTestCase):
|
||||
@mock.patch.object(db_api, 'resource_update')
|
||||
@mock.patch.object(db_api, 'resource_get_by_physical_resource_id')
|
||||
@mock.patch.object(service_software_config.requests, 'put')
|
||||
@mock.patch.object(service_software_config.requests, 'head')
|
||||
def test_push_metadata_software_deployments_temp_url(
|
||||
self, head, put, res_get, res_upd, md_sd):
|
||||
self, put, res_get, res_upd, md_sd):
|
||||
rs = mock.Mock()
|
||||
rs.rsrc_metadata = {'original': 'metadata'}
|
||||
rs.id = '1234'
|
||||
@ -707,14 +706,6 @@ class SoftwareConfigServiceTest(common.HeatTestCase):
|
||||
res_get.return_value = rs
|
||||
res_upd.return_value = 1
|
||||
|
||||
head_response = mock.Mock()
|
||||
head_response.headers = {'etag': '1234'}
|
||||
head_response.status_code = 200
|
||||
head.return_value = head_response
|
||||
|
||||
put_response = mock.Mock()
|
||||
put_response.status_code = 201
|
||||
|
||||
deployments = {'deploy': 'this'}
|
||||
md_sd.return_value = deployments
|
||||
|
||||
@ -725,12 +716,15 @@ class SoftwareConfigServiceTest(common.HeatTestCase):
|
||||
with mock.patch.object(self.ctx.session, 'refresh'):
|
||||
self.engine.software_config._push_metadata_software_deployments(
|
||||
self.ctx, '1234', None)
|
||||
res_upd.assert_called_once_with(
|
||||
self.ctx, '1234', {'rsrc_metadata': result_metadata}, 1)
|
||||
res_upd.has_calls(
|
||||
mock.call(self.ctx, '1234',
|
||||
{'rsrc_metadata': result_metadata}, 1),
|
||||
mock.call(self.ctx, '1234',
|
||||
{'rsrc_metadata': result_metadata}, 2),
|
||||
)
|
||||
|
||||
put.assert_called_once_with(
|
||||
'http://192.168.2.2/foo/bar', json.dumps(result_metadata),
|
||||
headers={'if-match': '1234'})
|
||||
'http://192.168.2.2/foo/bar', json.dumps(result_metadata))
|
||||
|
||||
@mock.patch.object(service_software_config.SoftwareConfigService,
|
||||
'metadata_software_deployments')
|
||||
|
Loading…
Reference in New Issue
Block a user