[Trusts] Fix deleting trust
* Delete trust only after the last execution of cron-trigger or event-trigger is completed. Otherwise during workflow execution we get an error "Unauthorized error: No such trust <id>" Closes-Bug: #1708139 Change-Id: I42849d4f7c517f8a27e0d26c9cf0d98f9c7fb94a
This commit is contained in:
parent
75ed360270
commit
b8e16648eb
@ -504,6 +504,10 @@ def get_event_trigger(id, insecure=False):
|
|||||||
return IMPL.get_event_trigger(id, insecure)
|
return IMPL.get_event_trigger(id, insecure)
|
||||||
|
|
||||||
|
|
||||||
|
def load_event_trigger(id, insecure=False):
|
||||||
|
return IMPL.load_event_trigger(id, insecure)
|
||||||
|
|
||||||
|
|
||||||
def get_event_triggers(insecure=False, limit=None, marker=None, sort_keys=None,
|
def get_event_triggers(insecure=False, limit=None, marker=None, sort_keys=None,
|
||||||
sort_dirs=None, fields=None, **kwargs):
|
sort_dirs=None, fields=None, **kwargs):
|
||||||
return IMPL.get_event_triggers(
|
return IMPL.get_event_triggers(
|
||||||
|
@ -1505,6 +1505,11 @@ def get_event_trigger(id, insecure=False, session=None):
|
|||||||
return event_trigger
|
return event_trigger
|
||||||
|
|
||||||
|
|
||||||
|
@b.session_aware()
|
||||||
|
def load_event_trigger(id, insecure=False, session=None):
|
||||||
|
return _get_event_trigger(id, insecure)
|
||||||
|
|
||||||
|
|
||||||
@b.session_aware()
|
@b.session_aware()
|
||||||
def get_event_triggers(insecure=False, session=None, **kwargs):
|
def get_event_triggers(insecure=False, session=None, **kwargs):
|
||||||
return _get_collection_sorted_by_time(
|
return _get_collection_sorted_by_time(
|
||||||
|
@ -27,6 +27,7 @@ from mistral import exceptions as exc
|
|||||||
from mistral.lang import parser as spec_parser
|
from mistral.lang import parser as spec_parser
|
||||||
from mistral.rpc import clients as rpc
|
from mistral.rpc import clients as rpc
|
||||||
from mistral.services import scheduler
|
from mistral.services import scheduler
|
||||||
|
from mistral.services import triggers
|
||||||
from mistral.services import workflows as wf_service
|
from mistral.services import workflows as wf_service
|
||||||
from mistral import utils
|
from mistral import utils
|
||||||
from mistral.utils import merge_dicts
|
from mistral.utils import merge_dicts
|
||||||
@ -294,6 +295,8 @@ class Workflow(object):
|
|||||||
# lookup cache anymore.
|
# lookup cache anymore.
|
||||||
lookup_utils.invalidate_cached_task_executions(self.wf_ex.id)
|
lookup_utils.invalidate_cached_task_executions(self.wf_ex.id)
|
||||||
|
|
||||||
|
triggers.on_workflow_complete(self.wf_ex)
|
||||||
|
|
||||||
if recursive and self.wf_ex.task_execution_id:
|
if recursive and self.wf_ex.task_execution_id:
|
||||||
parent_task_ex = db_api.get_task_execution(
|
parent_task_ex = db_api.get_task_execution(
|
||||||
self.wf_ex.task_execution_id
|
self.wf_ex.task_execution_id
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
@ -237,12 +238,23 @@ class DefaultEventEngine(base.EventEngine):
|
|||||||
ctx = security.create_context(t['trust_id'], t['project_id'])
|
ctx = security.create_context(t['trust_id'], t['project_id'])
|
||||||
auth_ctx.set_ctx(ctx)
|
auth_ctx.set_ctx(ctx)
|
||||||
|
|
||||||
|
description = {
|
||||||
|
"description": (
|
||||||
|
"Workflow execution created by event"
|
||||||
|
" trigger '(%s)'." % t['id']
|
||||||
|
),
|
||||||
|
"triggered_by": {
|
||||||
|
"type": "event_trigger",
|
||||||
|
"id": t['id'],
|
||||||
|
"name": t['name']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.engine_client.start_workflow(
|
self.engine_client.start_workflow(
|
||||||
t['workflow_id'],
|
t['workflow_id'],
|
||||||
t['workflow_input'],
|
t['workflow_input'],
|
||||||
description="Workflow execution created by event "
|
description=json.dumps(description),
|
||||||
"trigger %s." % t['id'],
|
|
||||||
**workflow_params
|
**workflow_params
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -390,4 +402,6 @@ class DefaultEventEngine(base.EventEngine):
|
|||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
security.delete_trust(trigger['trust_id'])
|
||||||
|
|
||||||
self._add_event_listener(trigger['exchange'], trigger['topic'], events)
|
self._add_event_listener(trigger['exchange'], trigger['topic'], events)
|
||||||
|
@ -12,6 +12,8 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_service import periodic_task
|
from oslo_service import periodic_task
|
||||||
@ -59,11 +61,22 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
|||||||
trigger.name
|
trigger.name
|
||||||
)
|
)
|
||||||
|
|
||||||
|
description = {
|
||||||
|
"description": (
|
||||||
|
"Workflow execution created by cron"
|
||||||
|
" trigger '(%s)'." % trigger.id
|
||||||
|
),
|
||||||
|
"triggered_by": {
|
||||||
|
"type": "cron_trigger",
|
||||||
|
"id": trigger.id,
|
||||||
|
"name": trigger.name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rpc.get_engine_client().start_workflow(
|
rpc.get_engine_client().start_workflow(
|
||||||
trigger.workflow.name,
|
trigger.workflow.name,
|
||||||
trigger.workflow_input,
|
trigger.workflow_input,
|
||||||
description="Workflow execution created "
|
description=json.dumps(description),
|
||||||
"by cron trigger '(%s)'." % trigger.id,
|
|
||||||
**trigger.workflow_params
|
**trigger.workflow_params
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
@ -88,7 +101,8 @@ def advance_cron_trigger(t):
|
|||||||
if t.remaining_executions == 0:
|
if t.remaining_executions == 0:
|
||||||
modified_count = triggers.delete_cron_trigger(
|
modified_count = triggers.delete_cron_trigger(
|
||||||
t.name,
|
t.name,
|
||||||
trust_id=t.trust_id
|
trust_id=t.trust_id,
|
||||||
|
delete_trust=False
|
||||||
)
|
)
|
||||||
else: # if remaining execution = None or > 0.
|
else: # if remaining execution = None or > 0.
|
||||||
next_time = triggers.get_next_execution_time(
|
next_time = triggers.get_next_execution_time(
|
||||||
|
@ -81,7 +81,12 @@ def create_context(trust_id, project_id):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def delete_trust(trust_id):
|
def delete_trust(trust_id=None):
|
||||||
|
if not trust_id:
|
||||||
|
# Try to retrieve trust from context.
|
||||||
|
if auth_ctx.has_ctx():
|
||||||
|
trust_id = auth_ctx.ctx().trust_id
|
||||||
|
|
||||||
if not trust_id:
|
if not trust_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -14,8 +14,11 @@
|
|||||||
|
|
||||||
import croniter
|
import croniter
|
||||||
import datetime
|
import datetime
|
||||||
|
import json
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from mistral.db.v2 import api as db_api
|
from mistral.db.v2 import api as db_api
|
||||||
from mistral.engine import utils as eng_utils
|
from mistral.engine import utils as eng_utils
|
||||||
from mistral import exceptions as exc
|
from mistral import exceptions as exc
|
||||||
@ -24,6 +27,9 @@ from mistral.rpc import clients as rpc
|
|||||||
from mistral.services import security
|
from mistral.services import security
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def get_next_execution_time(pattern, start_time):
|
def get_next_execution_time(pattern, start_time):
|
||||||
return croniter.croniter(pattern, start_time).get_next(
|
return croniter.croniter(pattern, start_time).get_next(
|
||||||
datetime.datetime
|
datetime.datetime
|
||||||
@ -130,13 +136,14 @@ def create_cron_trigger(name, workflow_name, workflow_input,
|
|||||||
return trig
|
return trig
|
||||||
|
|
||||||
|
|
||||||
def delete_cron_trigger(name, trust_id=None):
|
def delete_cron_trigger(name, trust_id=None, delete_trust=True):
|
||||||
if not trust_id:
|
if not trust_id:
|
||||||
trigger = db_api.get_cron_trigger(name)
|
trigger = db_api.get_cron_trigger(name)
|
||||||
trust_id = trigger.trust_id
|
trust_id = trigger.trust_id
|
||||||
|
|
||||||
modified_count = db_api.delete_cron_trigger(name)
|
modified_count = db_api.delete_cron_trigger(name)
|
||||||
if modified_count:
|
|
||||||
|
if modified_count and delete_trust:
|
||||||
# Delete trust only together with deleting trigger.
|
# Delete trust only together with deleting trigger.
|
||||||
security.delete_trust(trust_id)
|
security.delete_trust(trust_id)
|
||||||
|
|
||||||
@ -217,3 +224,29 @@ def update_event_trigger(id, values):
|
|||||||
rpc.get_event_engine_client().update_event_trigger(trig.to_dict())
|
rpc.get_event_engine_client().update_event_trigger(trig.to_dict())
|
||||||
|
|
||||||
return trig
|
return trig
|
||||||
|
|
||||||
|
|
||||||
|
def on_workflow_complete(wf_ex):
|
||||||
|
if wf_ex.task_execution_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
description = json.loads(wf_ex.description)
|
||||||
|
except ValueError as e:
|
||||||
|
LOG.debug(str(e))
|
||||||
|
return
|
||||||
|
|
||||||
|
if not isinstance(description, dict):
|
||||||
|
return
|
||||||
|
|
||||||
|
triggered = description.get('triggered_by')
|
||||||
|
|
||||||
|
if not triggered:
|
||||||
|
return
|
||||||
|
|
||||||
|
if triggered['type'] == 'cron_trigger':
|
||||||
|
if not db_api.load_cron_trigger(triggered['name']):
|
||||||
|
security.delete_trust()
|
||||||
|
elif triggered['type'] == 'event_trigger':
|
||||||
|
if not db_api.load_event_trigger(triggered['id'], True):
|
||||||
|
security.delete_trust()
|
||||||
|
@ -234,7 +234,7 @@ class TriggerServiceV2Test(base.DbTestCase):
|
|||||||
mock.MagicMock(side_effect=new_advance_cron_trigger)
|
mock.MagicMock(side_effect=new_advance_cron_trigger)
|
||||||
)
|
)
|
||||||
@mock.patch.object(security, 'delete_trust')
|
@mock.patch.object(security, 'delete_trust')
|
||||||
def test_create_delete_trust_in_trigger(self, create_ctx, delete_trust):
|
def test_create_delete_trust_in_trigger(self, delete_trust, create_ctx):
|
||||||
create_ctx.return_value = self.ctx
|
create_ctx.return_value = self.ctx
|
||||||
cfg.CONF.set_default('auth_enable', True, group='pecan')
|
cfg.CONF.set_default('auth_enable', True, group='pecan')
|
||||||
trigger_thread = periodic.setup()
|
trigger_thread = periodic.setup()
|
||||||
@ -255,10 +255,8 @@ class TriggerServiceV2Test(base.DbTestCase):
|
|||||||
datetime.datetime(2010, 8, 25)
|
datetime.datetime(2010, 8, 25)
|
||||||
)
|
)
|
||||||
|
|
||||||
self._await(
|
eventlet.sleep(1)
|
||||||
lambda: delete_trust.call_count == 1, timeout=10
|
self.assertEqual(0, delete_trust.call_count)
|
||||||
)
|
|
||||||
self.assertEqual('my_trust_id', delete_trust.mock_calls[0][1][0])
|
|
||||||
|
|
||||||
def test_get_trigger_in_correct_orders(self):
|
def test_get_trigger_in_correct_orders(self):
|
||||||
t1_name = 'trigger-%s' % utils.generate_unicode_uuid()
|
t1_name = 'trigger-%s' % utils.generate_unicode_uuid()
|
||||||
|
Loading…
Reference in New Issue
Block a user