From b8e16648ebd0a9a549d3eb49607672e412b3505c Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Wed, 2 Aug 2017 17:46:34 +0300 Subject: [PATCH] [Trusts] Fix deleting trust * Delete trust only after the last execution of cron-trigger or event-trigger is completed. Otherwise during workflow execution we get an error "Unauthorized error: No such trust " Closes-Bug: #1708139 Change-Id: I42849d4f7c517f8a27e0d26c9cf0d98f9c7fb94a --- mistral/db/v2/api.py | 4 ++ mistral/db/v2/sqlalchemy/api.py | 5 +++ mistral/engine/workflows.py | 3 ++ mistral/event_engine/default_event_engine.py | 18 ++++++++- mistral/services/periodic.py | 20 ++++++++-- mistral/services/security.py | 7 +++- mistral/services/triggers.py | 37 ++++++++++++++++++- .../unit/services/test_trigger_service.py | 8 ++-- 8 files changed, 89 insertions(+), 13 deletions(-) diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index b9339a89b..c9cdd6af4 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -504,6 +504,10 @@ def get_event_trigger(id, insecure=False): return IMPL.get_event_trigger(id, insecure) +def load_event_trigger(id, insecure=False): + return IMPL.load_event_trigger(id, insecure) + + def get_event_triggers(insecure=False, limit=None, marker=None, sort_keys=None, sort_dirs=None, fields=None, **kwargs): return IMPL.get_event_triggers( diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 28b2bb83c..299160259 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -1505,6 +1505,11 @@ def get_event_trigger(id, insecure=False, session=None): return event_trigger +@b.session_aware() +def load_event_trigger(id, insecure=False, session=None): + return _get_event_trigger(id, insecure) + + @b.session_aware() def get_event_triggers(insecure=False, session=None, **kwargs): return _get_collection_sorted_by_time( diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 82f42864b..e2bca847d 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -27,6 +27,7 @@ from mistral import exceptions as exc from mistral.lang import parser as spec_parser from mistral.rpc import clients as rpc from mistral.services import scheduler +from mistral.services import triggers from mistral.services import workflows as wf_service from mistral import utils from mistral.utils import merge_dicts @@ -294,6 +295,8 @@ class Workflow(object): # lookup cache anymore. lookup_utils.invalidate_cached_task_executions(self.wf_ex.id) + triggers.on_workflow_complete(self.wf_ex) + if recursive and self.wf_ex.task_execution_id: parent_task_ex = db_api.get_task_execution( self.wf_ex.task_execution_id diff --git a/mistral/event_engine/default_event_engine.py b/mistral/event_engine/default_event_engine.py index ac9754b59..33fc30257 100644 --- a/mistral/event_engine/default_event_engine.py +++ b/mistral/event_engine/default_event_engine.py @@ -14,6 +14,7 @@ # under the License. from collections import defaultdict +import json import os import threading @@ -237,12 +238,23 @@ class DefaultEventEngine(base.EventEngine): ctx = security.create_context(t['trust_id'], t['project_id']) auth_ctx.set_ctx(ctx) + description = { + "description": ( + "Workflow execution created by event" + " trigger '(%s)'." % t['id'] + ), + "triggered_by": { + "type": "event_trigger", + "id": t['id'], + "name": t['name'] + } + } + try: self.engine_client.start_workflow( t['workflow_id'], t['workflow_input'], - description="Workflow execution created by event " - "trigger %s." % t['id'], + description=json.dumps(description), **workflow_params ) except Exception as e: @@ -390,4 +402,6 @@ class DefaultEventEngine(base.EventEngine): return + security.delete_trust(trigger['trust_id']) + self._add_event_listener(trigger['exchange'], trigger['topic'], events) diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index 0bf0608c5..5ac965f11 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json + from oslo_config import cfg from oslo_log import log as logging from oslo_service import periodic_task @@ -59,11 +61,22 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks): trigger.name ) + description = { + "description": ( + "Workflow execution created by cron" + " trigger '(%s)'." % trigger.id + ), + "triggered_by": { + "type": "cron_trigger", + "id": trigger.id, + "name": trigger.name, + } + } + rpc.get_engine_client().start_workflow( trigger.workflow.name, trigger.workflow_input, - description="Workflow execution created " - "by cron trigger '(%s)'." % trigger.id, + description=json.dumps(description), **trigger.workflow_params ) except Exception: @@ -88,7 +101,8 @@ def advance_cron_trigger(t): if t.remaining_executions == 0: modified_count = triggers.delete_cron_trigger( t.name, - trust_id=t.trust_id + trust_id=t.trust_id, + delete_trust=False ) else: # if remaining execution = None or > 0. next_time = triggers.get_next_execution_time( diff --git a/mistral/services/security.py b/mistral/services/security.py index d389200ad..6fd4913fd 100644 --- a/mistral/services/security.py +++ b/mistral/services/security.py @@ -81,7 +81,12 @@ def create_context(trust_id, project_id): ) -def delete_trust(trust_id): +def delete_trust(trust_id=None): + if not trust_id: + # Try to retrieve trust from context. + if auth_ctx.has_ctx(): + trust_id = auth_ctx.ctx().trust_id + if not trust_id: return diff --git a/mistral/services/triggers.py b/mistral/services/triggers.py index da3ca6e6e..c32bc3430 100644 --- a/mistral/services/triggers.py +++ b/mistral/services/triggers.py @@ -14,8 +14,11 @@ import croniter import datetime +import json import six +from oslo_log import log as logging + from mistral.db.v2 import api as db_api from mistral.engine import utils as eng_utils from mistral import exceptions as exc @@ -24,6 +27,9 @@ from mistral.rpc import clients as rpc from mistral.services import security +LOG = logging.getLogger(__name__) + + def get_next_execution_time(pattern, start_time): return croniter.croniter(pattern, start_time).get_next( datetime.datetime @@ -130,13 +136,14 @@ def create_cron_trigger(name, workflow_name, workflow_input, return trig -def delete_cron_trigger(name, trust_id=None): +def delete_cron_trigger(name, trust_id=None, delete_trust=True): if not trust_id: trigger = db_api.get_cron_trigger(name) trust_id = trigger.trust_id modified_count = db_api.delete_cron_trigger(name) - if modified_count: + + if modified_count and delete_trust: # Delete trust only together with deleting trigger. security.delete_trust(trust_id) @@ -217,3 +224,29 @@ def update_event_trigger(id, values): rpc.get_event_engine_client().update_event_trigger(trig.to_dict()) return trig + + +def on_workflow_complete(wf_ex): + if wf_ex.task_execution_id: + return + + try: + description = json.loads(wf_ex.description) + except ValueError as e: + LOG.debug(str(e)) + return + + if not isinstance(description, dict): + return + + triggered = description.get('triggered_by') + + if not triggered: + return + + if triggered['type'] == 'cron_trigger': + if not db_api.load_cron_trigger(triggered['name']): + security.delete_trust() + elif triggered['type'] == 'event_trigger': + if not db_api.load_event_trigger(triggered['id'], True): + security.delete_trust() diff --git a/mistral/tests/unit/services/test_trigger_service.py b/mistral/tests/unit/services/test_trigger_service.py index eddf29ca5..8f965fc86 100644 --- a/mistral/tests/unit/services/test_trigger_service.py +++ b/mistral/tests/unit/services/test_trigger_service.py @@ -234,7 +234,7 @@ class TriggerServiceV2Test(base.DbTestCase): mock.MagicMock(side_effect=new_advance_cron_trigger) ) @mock.patch.object(security, 'delete_trust') - def test_create_delete_trust_in_trigger(self, create_ctx, delete_trust): + def test_create_delete_trust_in_trigger(self, delete_trust, create_ctx): create_ctx.return_value = self.ctx cfg.CONF.set_default('auth_enable', True, group='pecan') trigger_thread = periodic.setup() @@ -255,10 +255,8 @@ class TriggerServiceV2Test(base.DbTestCase): datetime.datetime(2010, 8, 25) ) - self._await( - lambda: delete_trust.call_count == 1, timeout=10 - ) - self.assertEqual('my_trust_id', delete_trust.mock_calls[0][1][0]) + eventlet.sleep(1) + self.assertEqual(0, delete_trust.call_count) def test_get_trigger_in_correct_orders(self): t1_name = 'trigger-%s' % utils.generate_unicode_uuid()