Fix possible DB race conditions in REST controller

Two (or more) consecutive DB API calls must use the
same transaction.

Change-Id: I68f7fd7b205818d3049c456b717beccf17153727
This commit is contained in:
Xavier Hardy 2016-11-15 14:26:01 +01:00
parent 9b39d7bc62
commit ae06418726
7 changed files with 126 additions and 112 deletions

View File

@ -43,9 +43,10 @@ def _load_deferred_output_field(action_ex):
def _get_action_execution(id):
action_ex = db_api.get_action_execution(id)
with db_api.transaction():
action_ex = db_api.get_action_execution(id)
return _get_action_execution_resource(action_ex)
return _get_action_execution_resource(action_ex)
def _get_action_execution_resource(action_ex):
@ -275,17 +276,20 @@ class ActionExecutionsController(rest.RestController):
raise exc.NotAllowedException("Action execution deletion is not "
"allowed.")
action_ex = db_api.get_action_execution(id)
with db_api.transaction():
action_ex = db_api.get_action_execution(id)
if action_ex.task_execution_id:
raise exc.NotAllowedException("Only ad-hoc action execution can "
"be deleted.")
if action_ex.task_execution_id:
raise exc.NotAllowedException(
"Only ad-hoc action execution can be deleted."
)
if not states.is_completed(action_ex.state):
raise exc.NotAllowedException("Only completed action execution "
"can be deleted.")
if not states.is_completed(action_ex.state):
raise exc.NotAllowedException(
"Only completed action execution can be deleted."
)
return db_api.delete_action_execution(id)
return db_api.delete_action_execution(id)
class TasksActionExecutionController(rest.RestController):

View File

@ -96,11 +96,12 @@ class EventTriggersController(rest.RestController):
UPDATE_NOT_ALLOWED
)
db_api.ensure_event_trigger_exists(id)
LOG.info('Update event trigger: [id=%s, values=%s]', id, values)
db_model = triggers.update_event_trigger(id, values)
with db_api.transaction():
db_api.ensure_event_trigger_exists(id)
db_model = triggers.update_event_trigger(id, values)
return resources.EventTrigger.from_dict(db_model.to_dict())
@ -112,9 +113,10 @@ class EventTriggersController(rest.RestController):
LOG.info("Delete event trigger [id=%s]", id)
event_trigger = db_api.get_event_trigger(id)
with db_api.transaction():
event_trigger = db_api.get_event_trigger(id)
triggers.delete_event_trigger(event_trigger.to_dict())
triggers.delete_event_trigger(event_trigger.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.EventTriggers, types.uuid, int,

View File

@ -95,49 +95,49 @@ class ExecutionsController(rest.RestController):
LOG.info('Update execution [id=%s, execution=%s]' % (id, wf_ex))
db_api.ensure_workflow_execution_exists(id)
with db_api.transaction():
db_api.ensure_workflow_execution_exists(id)
delta = {}
delta = {}
if wf_ex.state:
delta['state'] = wf_ex.state
if wf_ex.state:
delta['state'] = wf_ex.state
if wf_ex.description:
delta['description'] = wf_ex.description
if wf_ex.description:
delta['description'] = wf_ex.description
if wf_ex.params and wf_ex.params.get('env'):
delta['env'] = wf_ex.params.get('env')
if wf_ex.params and wf_ex.params.get('env'):
delta['env'] = wf_ex.params.get('env')
# Currently we can change only state, description, or env.
if len(delta.values()) <= 0:
raise exc.InputException(
'The property state, description, or env '
'is not provided for update.'
)
# Currently we can change only state, description, or env.
if len(delta.values()) <= 0:
raise exc.InputException(
'The property state, description, or env '
'is not provided for update.'
)
# Description cannot be updated together with state.
if delta.get('description') and delta.get('state'):
raise exc.InputException(
'The property description must be updated '
'separately from state.'
)
# Description cannot be updated together with state.
if delta.get('description') and delta.get('state'):
raise exc.InputException(
'The property description must be updated '
'separately from state.'
)
# If state change, environment cannot be updated if not RUNNING.
if (delta.get('env') and
delta.get('state') and delta['state'] != states.RUNNING):
raise exc.InputException(
'The property env can only be updated when workflow '
'execution is not running or on resume from pause.'
)
# If state change, environment cannot be updated if not RUNNING.
if (delta.get('env') and
delta.get('state') and delta['state'] != states.RUNNING):
raise exc.InputException(
'The property env can only be updated when workflow '
'execution is not running or on resume from pause.'
)
if delta.get('description'):
wf_ex = db_api.update_workflow_execution(
id,
{'description': delta['description']}
)
if delta.get('description'):
wf_ex = db_api.update_workflow_execution(
id,
{'description': delta['description']}
)
if not delta.get('state') and delta.get('env'):
with db_api.transaction():
if not delta.get('state') and delta.get('env'):
wf_ex = db_api.get_workflow_execution(id)
wf_ex = wf_service.update_workflow_execution_env(
wf_ex,

View File

@ -121,20 +121,21 @@ class MembersController(rest.RestController):
msg = "Member id must be provided."
raise exc.WorkflowException(msg)
wf_db = db_api.get_workflow_definition(self.resource_id)
with db_api.transaction():
wf_db = db_api.get_workflow_definition(self.resource_id)
if wf_db.scope != 'private':
msg = "Only private resource could be shared."
raise exc.WorkflowException(msg)
if wf_db.scope != 'private':
msg = "Only private resource could be shared."
raise exc.WorkflowException(msg)
resource_member = {
'resource_id': self.resource_id,
'resource_type': self.type,
'member_id': member_info.member_id,
'status': 'pending'
}
resource_member = {
'resource_id': self.resource_id,
'resource_type': self.type,
'member_id': member_info.member_id,
'status': 'pending'
}
db_member = db_api.create_resource_member(resource_member)
db_member = db_api.create_resource_member(resource_member)
return resources.Member.from_dict(db_member.to_dict())

View File

@ -140,9 +140,10 @@ class TasksController(rest.RestController):
acl.enforce('tasks:get', context.ctx())
LOG.info("Fetch task [id=%s]" % id)
task_ex = db_api.get_task_execution(id)
with db_api.transaction():
task_ex = db_api.get_task_execution(id)
return _get_task_resource_with_result(task_ex)
return _get_task_resource_with_result(task_ex)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Tasks, types.uuid, int, types.uniquelist,
@ -245,16 +246,20 @@ class TasksController(rest.RestController):
LOG.info("Update task execution [id=%s, task=%s]" % (id, task))
task_ex = db_api.get_task_execution(id)
task_spec = spec_parser.get_task_spec(task_ex.spec)
task_name = task.name or None
reset = task.reset
env = task.env or None
with db_api.transaction():
task_ex = db_api.get_task_execution(id)
task_spec = spec_parser.get_task_spec(task_ex.spec)
task_name = task.name or None
reset = task.reset
env = task.env or None
if task_name and task_name != task_ex.name:
raise exc.WorkflowException('Task name does not match.')
if task_name and task_name != task_ex.name:
raise exc.WorkflowException('Task name does not match.')
wf_ex = db_api.get_workflow_execution(
task_ex.workflow_execution_id
)
wf_ex = db_api.get_workflow_execution(task_ex.workflow_execution_id)
wf_name = task.workflow_name or None
if wf_name and wf_name != wf_ex.name:
@ -262,7 +267,8 @@ class TasksController(rest.RestController):
if task.state != states.RUNNING:
raise exc.WorkflowException(
'Invalid task state. Only updating task to rerun is supported.'
'Invalid task state. '
'Only updating task to rerun is supported.'
)
if task_ex.state != states.ERROR:
@ -282,9 +288,10 @@ class TasksController(rest.RestController):
env=env
)
task_ex = db_api.get_task_execution(id)
with db_api.transaction():
task_ex = db_api.get_task_execution(id)
return _get_task_resource_with_result(task_ex)
return _get_task_resource_with_result(task_ex)
class ExecutionTasksController(rest.RestController):

View File

@ -158,30 +158,28 @@ def create_event_trigger(name, exchange, topic, event, workflow_id,
def delete_event_trigger(event_trigger):
with db_api.transaction():
db_api.delete_event_trigger(event_trigger['id'])
db_api.delete_event_trigger(event_trigger['id'])
trigs = db_api.get_event_triggers(
insecure=True,
exchange=event_trigger['exchange'],
topic=event_trigger['topic']
)
events = set([t.event for t in trigs])
trigs = db_api.get_event_triggers(
insecure=True,
exchange=event_trigger['exchange'],
topic=event_trigger['topic']
)
events = set([t.event for t in trigs])
# NOTE(kong): Send RPC message within the db transaction, rollback if
# any error occurs.
rpc.get_event_engine_client().delete_event_trigger(
event_trigger,
list(events)
)
# NOTE(kong): Send RPC message within the db transaction, rollback if
# any error occurs.
rpc.get_event_engine_client().delete_event_trigger(
event_trigger,
list(events)
)
def update_event_trigger(id, values):
with db_api.transaction():
trig = db_api.update_event_trigger(id, values)
trig = db_api.update_event_trigger(id, values)
# NOTE(kong): Send RPC message within the db transaction, rollback if
# any error occurs.
rpc.get_event_engine_client().update_event_trigger(trig.to_dict())
# NOTE(kong): Send RPC message within the db transaction, rollback if
# any error occurs.
rpc.get_event_engine_client().update_event_trigger(trig.to_dict())
return trig

View File

@ -25,6 +25,7 @@ import six
import webob
from wsme import exc as wsme_exc
from mistral.db.v2.sqlalchemy import api as db_api
from mistral import exceptions as exc
LOG = logging.getLogger(__name__)
@ -163,30 +164,31 @@ def get_all(list_cls, cls, get_all_function, get_function,
list_to_return = []
if resource_function:
# do not filter fields yet, resource_function needs the ORM object
db_list = get_all_function(
limit=limit,
marker=marker_obj,
sort_keys=sort_keys,
sort_dirs=sort_dirs,
**filters
)
with db_api.transaction():
# do not filter fields yet, resource_function needs the ORM object
db_list = get_all_function(
limit=limit,
marker=marker_obj,
sort_keys=sort_keys,
sort_dirs=sort_dirs,
**filters
)
for data in db_list:
obj = resource_function(data)
for data in db_list:
obj = resource_function(data)
# filter fields using a loop instead of the ORM
if fields:
data = []
for f in fields:
if hasattr(obj, f):
data.append(getattr(obj, f))
# filter fields using a loop instead of the ORM
if fields:
data = []
for f in fields:
if hasattr(obj, f):
data.append(getattr(obj, f))
dict_data = dict(zip(fields, data))
else:
dict_data = obj.to_dict()
dict_data = dict(zip(fields, data))
else:
dict_data = obj.to_dict()
list_to_return.append(cls.from_dict(dict_data))
list_to_return.append(cls.from_dict(dict_data))
else:
db_list = get_all_function(
limit=limit,