diff --git a/mistral/api/controllers/v2/action_execution.py b/mistral/api/controllers/v2/action_execution.py index 926aed7e..b2128ac0 100644 --- a/mistral/api/controllers/v2/action_execution.py +++ b/mistral/api/controllers/v2/action_execution.py @@ -43,9 +43,10 @@ def _load_deferred_output_field(action_ex): def _get_action_execution(id): - action_ex = db_api.get_action_execution(id) + with db_api.transaction(): + action_ex = db_api.get_action_execution(id) - return _get_action_execution_resource(action_ex) + return _get_action_execution_resource(action_ex) def _get_action_execution_resource(action_ex): @@ -275,17 +276,20 @@ class ActionExecutionsController(rest.RestController): raise exc.NotAllowedException("Action execution deletion is not " "allowed.") - action_ex = db_api.get_action_execution(id) + with db_api.transaction(): + action_ex = db_api.get_action_execution(id) - if action_ex.task_execution_id: - raise exc.NotAllowedException("Only ad-hoc action execution can " - "be deleted.") + if action_ex.task_execution_id: + raise exc.NotAllowedException( + "Only ad-hoc action execution can be deleted." + ) - if not states.is_completed(action_ex.state): - raise exc.NotAllowedException("Only completed action execution " - "can be deleted.") + if not states.is_completed(action_ex.state): + raise exc.NotAllowedException( + "Only completed action execution can be deleted." + ) - return db_api.delete_action_execution(id) + return db_api.delete_action_execution(id) class TasksActionExecutionController(rest.RestController): diff --git a/mistral/api/controllers/v2/event_trigger.py b/mistral/api/controllers/v2/event_trigger.py index 2346485a..607414a6 100644 --- a/mistral/api/controllers/v2/event_trigger.py +++ b/mistral/api/controllers/v2/event_trigger.py @@ -96,11 +96,12 @@ class EventTriggersController(rest.RestController): UPDATE_NOT_ALLOWED ) - db_api.ensure_event_trigger_exists(id) - LOG.info('Update event trigger: [id=%s, values=%s]', id, values) - db_model = triggers.update_event_trigger(id, values) + with db_api.transaction(): + db_api.ensure_event_trigger_exists(id) + + db_model = triggers.update_event_trigger(id, values) return resources.EventTrigger.from_dict(db_model.to_dict()) @@ -112,9 +113,10 @@ class EventTriggersController(rest.RestController): LOG.info("Delete event trigger [id=%s]", id) - event_trigger = db_api.get_event_trigger(id) + with db_api.transaction(): + event_trigger = db_api.get_event_trigger(id) - triggers.delete_event_trigger(event_trigger.to_dict()) + triggers.delete_event_trigger(event_trigger.to_dict()) @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.EventTriggers, types.uuid, int, diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index 528863ff..d0049bf0 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -95,49 +95,49 @@ class ExecutionsController(rest.RestController): LOG.info('Update execution [id=%s, execution=%s]' % (id, wf_ex)) - db_api.ensure_workflow_execution_exists(id) + with db_api.transaction(): + db_api.ensure_workflow_execution_exists(id) - delta = {} + delta = {} - if wf_ex.state: - delta['state'] = wf_ex.state + if wf_ex.state: + delta['state'] = wf_ex.state - if wf_ex.description: - delta['description'] = wf_ex.description + if wf_ex.description: + delta['description'] = wf_ex.description - if wf_ex.params and wf_ex.params.get('env'): - delta['env'] = wf_ex.params.get('env') + if wf_ex.params and wf_ex.params.get('env'): + delta['env'] = wf_ex.params.get('env') - # Currently we can change only state, description, or env. - if len(delta.values()) <= 0: - raise exc.InputException( - 'The property state, description, or env ' - 'is not provided for update.' - ) + # Currently we can change only state, description, or env. + if len(delta.values()) <= 0: + raise exc.InputException( + 'The property state, description, or env ' + 'is not provided for update.' + ) - # Description cannot be updated together with state. - if delta.get('description') and delta.get('state'): - raise exc.InputException( - 'The property description must be updated ' - 'separately from state.' - ) + # Description cannot be updated together with state. + if delta.get('description') and delta.get('state'): + raise exc.InputException( + 'The property description must be updated ' + 'separately from state.' + ) - # If state change, environment cannot be updated if not RUNNING. - if (delta.get('env') and - delta.get('state') and delta['state'] != states.RUNNING): - raise exc.InputException( - 'The property env can only be updated when workflow ' - 'execution is not running or on resume from pause.' - ) + # If state change, environment cannot be updated if not RUNNING. + if (delta.get('env') and + delta.get('state') and delta['state'] != states.RUNNING): + raise exc.InputException( + 'The property env can only be updated when workflow ' + 'execution is not running or on resume from pause.' + ) - if delta.get('description'): - wf_ex = db_api.update_workflow_execution( - id, - {'description': delta['description']} - ) + if delta.get('description'): + wf_ex = db_api.update_workflow_execution( + id, + {'description': delta['description']} + ) - if not delta.get('state') and delta.get('env'): - with db_api.transaction(): + if not delta.get('state') and delta.get('env'): wf_ex = db_api.get_workflow_execution(id) wf_ex = wf_service.update_workflow_execution_env( wf_ex, diff --git a/mistral/api/controllers/v2/member.py b/mistral/api/controllers/v2/member.py index e586d1ad..b71b70d8 100644 --- a/mistral/api/controllers/v2/member.py +++ b/mistral/api/controllers/v2/member.py @@ -121,20 +121,21 @@ class MembersController(rest.RestController): msg = "Member id must be provided." raise exc.WorkflowException(msg) - wf_db = db_api.get_workflow_definition(self.resource_id) + with db_api.transaction(): + wf_db = db_api.get_workflow_definition(self.resource_id) - if wf_db.scope != 'private': - msg = "Only private resource could be shared." - raise exc.WorkflowException(msg) + if wf_db.scope != 'private': + msg = "Only private resource could be shared." + raise exc.WorkflowException(msg) - resource_member = { - 'resource_id': self.resource_id, - 'resource_type': self.type, - 'member_id': member_info.member_id, - 'status': 'pending' - } + resource_member = { + 'resource_id': self.resource_id, + 'resource_type': self.type, + 'member_id': member_info.member_id, + 'status': 'pending' + } - db_member = db_api.create_resource_member(resource_member) + db_member = db_api.create_resource_member(resource_member) return resources.Member.from_dict(db_member.to_dict()) diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index c9a33b52..72b3a0f5 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -140,9 +140,10 @@ class TasksController(rest.RestController): acl.enforce('tasks:get', context.ctx()) LOG.info("Fetch task [id=%s]" % id) - task_ex = db_api.get_task_execution(id) + with db_api.transaction(): + task_ex = db_api.get_task_execution(id) - return _get_task_resource_with_result(task_ex) + return _get_task_resource_with_result(task_ex) @rest_utils.wrap_wsme_controller_exception @wsme_pecan.wsexpose(resources.Tasks, types.uuid, int, types.uniquelist, @@ -245,16 +246,20 @@ class TasksController(rest.RestController): LOG.info("Update task execution [id=%s, task=%s]" % (id, task)) - task_ex = db_api.get_task_execution(id) - task_spec = spec_parser.get_task_spec(task_ex.spec) - task_name = task.name or None - reset = task.reset - env = task.env or None + with db_api.transaction(): + task_ex = db_api.get_task_execution(id) + task_spec = spec_parser.get_task_spec(task_ex.spec) + task_name = task.name or None + reset = task.reset + env = task.env or None - if task_name and task_name != task_ex.name: - raise exc.WorkflowException('Task name does not match.') + if task_name and task_name != task_ex.name: + raise exc.WorkflowException('Task name does not match.') + + wf_ex = db_api.get_workflow_execution( + task_ex.workflow_execution_id + ) - wf_ex = db_api.get_workflow_execution(task_ex.workflow_execution_id) wf_name = task.workflow_name or None if wf_name and wf_name != wf_ex.name: @@ -262,7 +267,8 @@ class TasksController(rest.RestController): if task.state != states.RUNNING: raise exc.WorkflowException( - 'Invalid task state. Only updating task to rerun is supported.' + 'Invalid task state. ' + 'Only updating task to rerun is supported.' ) if task_ex.state != states.ERROR: @@ -282,9 +288,10 @@ class TasksController(rest.RestController): env=env ) - task_ex = db_api.get_task_execution(id) + with db_api.transaction(): + task_ex = db_api.get_task_execution(id) - return _get_task_resource_with_result(task_ex) + return _get_task_resource_with_result(task_ex) class ExecutionTasksController(rest.RestController): diff --git a/mistral/services/triggers.py b/mistral/services/triggers.py index 6605f471..ebefb41e 100644 --- a/mistral/services/triggers.py +++ b/mistral/services/triggers.py @@ -158,30 +158,28 @@ def create_event_trigger(name, exchange, topic, event, workflow_id, def delete_event_trigger(event_trigger): - with db_api.transaction(): - db_api.delete_event_trigger(event_trigger['id']) + db_api.delete_event_trigger(event_trigger['id']) - trigs = db_api.get_event_triggers( - insecure=True, - exchange=event_trigger['exchange'], - topic=event_trigger['topic'] - ) - events = set([t.event for t in trigs]) + trigs = db_api.get_event_triggers( + insecure=True, + exchange=event_trigger['exchange'], + topic=event_trigger['topic'] + ) + events = set([t.event for t in trigs]) - # NOTE(kong): Send RPC message within the db transaction, rollback if - # any error occurs. - rpc.get_event_engine_client().delete_event_trigger( - event_trigger, - list(events) - ) + # NOTE(kong): Send RPC message within the db transaction, rollback if + # any error occurs. + rpc.get_event_engine_client().delete_event_trigger( + event_trigger, + list(events) + ) def update_event_trigger(id, values): - with db_api.transaction(): - trig = db_api.update_event_trigger(id, values) + trig = db_api.update_event_trigger(id, values) - # NOTE(kong): Send RPC message within the db transaction, rollback if - # any error occurs. - rpc.get_event_engine_client().update_event_trigger(trig.to_dict()) + # NOTE(kong): Send RPC message within the db transaction, rollback if + # any error occurs. + rpc.get_event_engine_client().update_event_trigger(trig.to_dict()) return trig diff --git a/mistral/utils/rest_utils.py b/mistral/utils/rest_utils.py index 065be2c5..d2f854b4 100644 --- a/mistral/utils/rest_utils.py +++ b/mistral/utils/rest_utils.py @@ -25,6 +25,7 @@ import six import webob from wsme import exc as wsme_exc +from mistral.db.v2.sqlalchemy import api as db_api from mistral import exceptions as exc LOG = logging.getLogger(__name__) @@ -163,30 +164,31 @@ def get_all(list_cls, cls, get_all_function, get_function, list_to_return = [] if resource_function: - # do not filter fields yet, resource_function needs the ORM object - db_list = get_all_function( - limit=limit, - marker=marker_obj, - sort_keys=sort_keys, - sort_dirs=sort_dirs, - **filters - ) + with db_api.transaction(): + # do not filter fields yet, resource_function needs the ORM object + db_list = get_all_function( + limit=limit, + marker=marker_obj, + sort_keys=sort_keys, + sort_dirs=sort_dirs, + **filters + ) - for data in db_list: - obj = resource_function(data) + for data in db_list: + obj = resource_function(data) - # filter fields using a loop instead of the ORM - if fields: - data = [] - for f in fields: - if hasattr(obj, f): - data.append(getattr(obj, f)) + # filter fields using a loop instead of the ORM + if fields: + data = [] + for f in fields: + if hasattr(obj, f): + data.append(getattr(obj, f)) - dict_data = dict(zip(fields, data)) - else: - dict_data = obj.to_dict() + dict_data = dict(zip(fields, data)) + else: + dict_data = obj.to_dict() - list_to_return.append(cls.from_dict(dict_data)) + list_to_return.append(cls.from_dict(dict_data)) else: db_list = get_all_function( limit=limit,