Merge "Handling concurrent transactions in metadata_set method"

This commit is contained in:
Jenkins 2016-03-23 21:06:47 +00:00 committed by Gerrit Code Review
commit 8021210417
3 changed files with 42 additions and 11 deletions

View File

@ -374,12 +374,28 @@ class Resource(object):
self._rsrc_metadata = rs.rsrc_metadata
return rs.rsrc_metadata
def metadata_set(self, metadata):
@resource_objects.retry_on_conflict
def metadata_set(self, metadata, merge_metadata=None):
"""Write new metadata to the database.
The caller may optionally provide a merge_metadata() function, which
takes two arguments - the metadata passed to metadata_set() and the
current metadata of the resource - and returns the merged metadata to
write. If merge_metadata is not provided, the metadata passed to
metadata_set() is written verbatim, overwriting any existing metadata.
If a race condition is detected, the write will be retried with the new
result of merge_metadata() (if it is supplied) or the verbatim data (if
it is not).
"""
if self.id is None or self.action == self.INIT:
raise exception.ResourceNotAvailable(resource_name=self.name)
LOG.debug('Setting metadata for %s', six.text_type(self))
resource_objects.Resource.update_by_id(
self.stack.context, self.id, {'rsrc_metadata': metadata})
db_res = resource_objects.Resource.get_obj(self.stack.context, self.id)
if merge_metadata is not None:
db_res = db_res.refresh(attrs=['rsrc_metadata'])
metadata = merge_metadata(metadata, db_res.rsrc_metadata)
db_res.update_metadata(metadata)
self._rsrc_metadata = metadata
@classmethod

View File

@ -18,7 +18,6 @@ from oslo_serialization import jsonutils
from oslo_service import service
from oslo_utils import timeutils
import requests
from retrying import retry
import six
from six.moves.urllib import parse as urlparse
@ -28,6 +27,7 @@ from heat.common.i18n import _
from heat.common.i18n import _LI
from heat.db import api as db_api
from heat.engine import api
from heat.objects import resource as resource_objects
from heat.objects import software_config as software_config_object
from heat.objects import software_deployment as software_deployment_object
from heat.rpc import api as rpc_api
@ -35,10 +35,6 @@ from heat.rpc import api as rpc_api
LOG = logging.getLogger(__name__)
def retry_if_concur_trans_error(ex):
return isinstance(ex, exception.ConcurrentTransaction)
class SoftwareConfigService(service.Service):
def show_software_config(self, cnxt, config_id):
@ -94,8 +90,7 @@ class SoftwareConfigService(service.Service):
result = [api.format_software_config(sd.config) for sd in flt_sd_s]
return result
@retry(stop_max_attempt_number=11,
retry_on_exception=retry_if_concur_trans_error)
@resource_objects.retry_on_conflict
def _push_metadata_software_deployments(
self, cnxt, server_id, stack_user_project_id):
rs = db_api.resource_get_by_physical_resource_id(cnxt, server_id)
@ -107,7 +102,7 @@ class SoftwareConfigService(service.Service):
rows_updated = db_api.resource_update(
cnxt, rs.id, {'rsrc_metadata': md}, rs.atomic_key)
if not rows_updated:
action = "deployments of server %s" % server_id
action = _('deployments of server %s') % server_id
raise exception.ConcurrentTransaction(action=action)
metadata_put_url = None

View File

@ -19,9 +19,12 @@ from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
import retrying
import six
from heat.common import crypt
from heat.common import exception
from heat.common.i18n import _
from heat.db import api as db_api
from heat.objects import base as heat_base
from heat.objects import fields as heat_fields
@ -31,6 +34,15 @@ from heat.objects import stack
cfg.CONF.import_opt('encrypt_parameters_and_properties', 'heat.common.config')
def retry_on_conflict(func):
def is_conflict(ex):
return isinstance(ex, exception.ConcurrentTransaction)
wrapper = retrying.retry(stop_max_attempt_number=11,
wait_random_min=0.0, wait_random_max=2.0,
retry_on_exception=is_conflict)
return wrapper(func)
class Resource(
heat_base.HeatObject,
base.VersionedObjectDictCompat,
@ -186,3 +198,11 @@ class Resource(
result[prop_name] = encrypted_value
return (True, result)
return (False, data)
def update_metadata(self, metadata):
if self.rsrc_metadata != metadata:
rows_updated = self.select_and_update(
{'rsrc_metadata': metadata}, self.engine_id, self.atomic_key)
if not rows_updated:
action = _('metadata setting for resource %s') % self.name
raise exception.ConcurrentTransaction(action=action)