Trigger remaining-executions and first-exec-date
Adds the support of two optionals parameters for the cron-trigger resource: -first_execution_date : supersedes to the first calculation of "next_execution_time". -remaining_occurrences: number of occurrences after which the trigger should be deleted. The parameter "pattern" is now optional if first-execution-date only is used. Implements: blueprint mistral-cron-triggers-start-and-repeat Change-Id: I55bc28e98f89ffdfdce9cb3daa3848a17d85fd20
This commit is contained in:
parent
24f3c92caa
commit
f2cb7524e4
@ -32,13 +32,16 @@ class CronTrigger(resource.Resource):
|
||||
|
||||
id = wtypes.text
|
||||
name = wtypes.text
|
||||
pattern = wtypes.text
|
||||
workflow_name = wtypes.text
|
||||
workflow_input = wtypes.text
|
||||
|
||||
scope = SCOPE_TYPES
|
||||
|
||||
pattern = wtypes.text
|
||||
remaining_executions = wtypes.IntegerType(minimum=1)
|
||||
first_execution_time = wtypes.text
|
||||
next_execution_time = wtypes.text
|
||||
|
||||
created_at = wtypes.text
|
||||
updated_at = wtypes.text
|
||||
|
||||
@ -68,10 +71,11 @@ class CronTrigger(resource.Resource):
|
||||
def sample(cls):
|
||||
return cls(id='123e4567-e89b-12d3-a456-426655440000',
|
||||
name='my_trigger',
|
||||
pattern='* * * * *',
|
||||
workflow_name='my_wf',
|
||||
workflow_input={},
|
||||
scope='private',
|
||||
pattern='* * * * *',
|
||||
remaining_executions=42,
|
||||
created_at='1970-01-01T00:00:00.000000',
|
||||
updated_at='1970-01-01T00:00:00.000000')
|
||||
|
||||
@ -109,9 +113,11 @@ class CronTriggersController(rest.RestController):
|
||||
|
||||
db_model = triggers.create_cron_trigger(
|
||||
values['name'],
|
||||
values['pattern'],
|
||||
values['workflow_name'],
|
||||
values.get('workflow_input')
|
||||
values.get('workflow_input'),
|
||||
values.get('pattern'),
|
||||
values.get('first_execution_time'),
|
||||
values.get('remaining_executions')
|
||||
)
|
||||
|
||||
return CronTrigger.from_dict(db_model.to_dict())
|
||||
|
@ -256,6 +256,7 @@ class CronTrigger(mb.MistralSecureModelBase):
|
||||
pattern = sa.Column(sa.String(100))
|
||||
next_execution_time = sa.Column(sa.DateTime, nullable=False)
|
||||
workflow_name = sa.Column(sa.String(80))
|
||||
remaining_executions = sa.Column(sa.Integer)
|
||||
|
||||
workflow_id = sa.Column(
|
||||
sa.String(36),
|
||||
|
@ -81,6 +81,11 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
t.workflow_input
|
||||
)
|
||||
finally:
|
||||
if t.remaining_executions > 0:
|
||||
t.remaining_executions -= 1
|
||||
if t.remaining_executions == 0:
|
||||
db_api_v2.delete_cron_trigger(t.name)
|
||||
else: # if remaining execution = None or > 0
|
||||
next_time = triggers.get_next_execution_time(
|
||||
t.pattern,
|
||||
t.next_execution_time
|
||||
@ -88,7 +93,8 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks):
|
||||
|
||||
db_api_v2.update_cron_trigger(
|
||||
t.name,
|
||||
{'next_execution_time': next_time}
|
||||
{'next_execution_time': next_time,
|
||||
'remaining_executions': t.remaining_executions}
|
||||
)
|
||||
|
||||
auth_ctx.set_ctx(None)
|
||||
|
@ -17,6 +17,7 @@ import datetime
|
||||
|
||||
from mistral.db.v1 import api as db_api_v1
|
||||
from mistral.db.v2 import api as db_api_v2
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import security
|
||||
from mistral.workbook import parser as spec_parser
|
||||
|
||||
@ -80,20 +81,54 @@ def get_next_cron_triggers():
|
||||
)
|
||||
|
||||
|
||||
def create_cron_trigger(name, pattern, workflow_name, workflow_input,
|
||||
start_time=None):
|
||||
def validate_cron_trigger_input(pattern, first_time, count):
|
||||
if not (first_time or pattern):
|
||||
raise exc.InvalidModelException("Pattern or first_execution_time must"
|
||||
" be specified.")
|
||||
if first_time:
|
||||
if (datetime.datetime.now() + datetime.timedelta(0, 60)) > first_time:
|
||||
raise exc.InvalidModelException("First_execution_time must be at"
|
||||
" least one minute in the future.")
|
||||
if not pattern and count > 1:
|
||||
raise exc.InvalidModelException("Pattern must be provided if count"
|
||||
" is superior to 1.")
|
||||
if pattern:
|
||||
try:
|
||||
croniter(pattern)
|
||||
except (ValueError, KeyError):
|
||||
raise exc.InvalidModelException("The specified pattern is not"
|
||||
" valid: {}".format(pattern))
|
||||
|
||||
|
||||
def create_cron_trigger(name, workflow_name, workflow_input, pattern=None,
|
||||
first_time=None, count=None, start_time=None):
|
||||
if not start_time:
|
||||
start_time = datetime.datetime.now()
|
||||
|
||||
if type(first_time) in [str, unicode]:
|
||||
try:
|
||||
first_time = datetime.datetime.strptime(first_time,
|
||||
'%Y-%m-%d %H:%M')
|
||||
except ValueError as e:
|
||||
raise exc.InvalidModelException(e.message)
|
||||
|
||||
validate_cron_trigger_input(pattern, first_time, count)
|
||||
|
||||
if first_time:
|
||||
next_time = first_time
|
||||
if not (pattern and count):
|
||||
count = 1
|
||||
else:
|
||||
next_time = get_next_execution_time(pattern, start_time)
|
||||
|
||||
with db_api_v2.transaction():
|
||||
wf = db_api_v2.get_workflow_definition(workflow_name)
|
||||
|
||||
next_time = get_next_execution_time(pattern, start_time)
|
||||
|
||||
values = {
|
||||
'name': name,
|
||||
'pattern': pattern,
|
||||
'next_execution_time': next_time,
|
||||
'remaining_executions': count,
|
||||
'workflow_name': workflow_name,
|
||||
'workflow_id': wf.id,
|
||||
'workflow_input': workflow_input,
|
||||
|
@ -400,7 +400,7 @@ class CronTriggerTestsV2(base.TestCase):
|
||||
tr_name = 'trigger'
|
||||
|
||||
resp, body = self.client.create_cron_trigger(
|
||||
tr_name, '5 * * * *', self.wf_name)
|
||||
tr_name, self.wf_name, None, '5 * * * *')
|
||||
self.assertEqual(201, resp.status)
|
||||
self.assertEqual(tr_name, body['name'])
|
||||
|
||||
@ -418,18 +418,42 @@ class CronTriggerTestsV2(base.TestCase):
|
||||
trs_names = [tr['name'] for tr in body['cron_triggers']]
|
||||
self.assertNotIn(tr_name, trs_names)
|
||||
|
||||
@test.attr(type='sanity')
|
||||
def test_create_and_delete_oneshot_cron_triggers(self):
|
||||
tr_name = 'trigger'
|
||||
|
||||
resp, body = self.client.create_cron_trigger(
|
||||
tr_name, self.wf_name, None, None, "4242-12-25 13:37")
|
||||
self.assertEqual(201, resp.status)
|
||||
self.assertEqual(tr_name, body['name'])
|
||||
self.assertEqual("4242-12-25 13:37:00", body['next_execution_time'])
|
||||
|
||||
resp, body = self.client.get_list_obj('cron_triggers')
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
trs_names = [tr['name'] for tr in body['cron_triggers']]
|
||||
self.assertIn(tr_name, trs_names)
|
||||
|
||||
self.client.delete_obj('cron_triggers', tr_name)
|
||||
self.client.triggers.remove(tr_name)
|
||||
|
||||
_, body = self.client.get_list_obj('cron_triggers')
|
||||
|
||||
trs_names = [tr['name'] for tr in body['cron_triggers']]
|
||||
self.assertNotIn(tr_name, trs_names)
|
||||
|
||||
@test.attr(type='sanity')
|
||||
def test_create_two_cron_triggers_for_one_wf(self):
|
||||
tr_name_1 = 'trigger1'
|
||||
tr_name_2 = 'trigger2'
|
||||
|
||||
resp, body = self.client.create_cron_trigger(
|
||||
tr_name_1, '5 * * * *', self.wf_name)
|
||||
tr_name_1, self.wf_name, None, '5 * * * *')
|
||||
self.assertEqual(201, resp.status)
|
||||
self.assertEqual(tr_name_1, body['name'])
|
||||
|
||||
resp, body = self.client.create_cron_trigger(
|
||||
tr_name_2, '15 * * * *', self.wf_name)
|
||||
tr_name_2, self.wf_name, None, '15 * * * *')
|
||||
self.assertEqual(201, resp.status)
|
||||
self.assertEqual(tr_name_2, body['name'])
|
||||
|
||||
@ -444,7 +468,7 @@ class CronTriggerTestsV2(base.TestCase):
|
||||
def test_get_cron_trigger(self):
|
||||
tr_name = 'trigger'
|
||||
self.client.create_cron_trigger(
|
||||
tr_name, '5 * * * *', self.wf_name)
|
||||
tr_name, self.wf_name, None, '5 * * * *')
|
||||
|
||||
resp, body = self.client.get_object('cron_triggers', tr_name)
|
||||
|
||||
@ -455,7 +479,38 @@ class CronTriggerTestsV2(base.TestCase):
|
||||
def test_create_cron_trigger_nonexistent_wf(self):
|
||||
self.assertRaises(exceptions.NotFound,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger', '5 * * * *', 'nonexist')
|
||||
'trigger', 'nonexist', None, '5 * * * *')
|
||||
|
||||
@test.attr(type='negative')
|
||||
def test_create_cron_trigger_invalid_count(self):
|
||||
self.assertRaises(exceptions.ServerFault,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger', 'nonexist', None, '5 * * * *', None, "q")
|
||||
|
||||
@test.attr(type='negative')
|
||||
def test_create_cron_trigger_negative_count(self):
|
||||
self.assertRaises(exceptions.BadRequest,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger', 'nonexist', None, '5 * * * *', None, -1)
|
||||
|
||||
@test.attr(type='negative')
|
||||
def test_create_cron_trigger_invalid_first_date(self):
|
||||
self.assertRaises(exceptions.BadRequest,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger', 'nonexist', None, '5 * * * *', "q")
|
||||
|
||||
@test.attr(type='negative')
|
||||
def test_create_cron_trigger_count_only(self):
|
||||
self.assertRaises(exceptions.BadRequest,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger', 'nonexist', None, None, None, "42")
|
||||
|
||||
@test.attr(type='negative')
|
||||
def test_create_cron_trigger_date_and_count_without_pattern(self):
|
||||
self.assertRaises(exceptions.BadRequest,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger', 'nonexist', None, None,
|
||||
"4242-12-25 13:37", "42")
|
||||
|
||||
@test.attr(type='negative')
|
||||
def test_get_nonexistent_cron_trigger(self):
|
||||
@ -473,31 +528,31 @@ class CronTriggerTestsV2(base.TestCase):
|
||||
def test_create_two_cron_triggers_with_same_name(self):
|
||||
tr_name = 'trigger'
|
||||
self.client.create_cron_trigger(
|
||||
tr_name, '5 * * * *', self.wf_name)
|
||||
tr_name, self.wf_name, None, '5 * * * *')
|
||||
self.assertRaises(exceptions.Conflict,
|
||||
self.client.create_cron_trigger,
|
||||
tr_name, '5 * * * *', self.wf_name)
|
||||
tr_name, self.wf_name, None, '5 * * * *')
|
||||
|
||||
@decorators.skip_because(bug="1383146")
|
||||
@test.attr(type='negative')
|
||||
def test_create_two_cron_triggers_with_same_pattern(self):
|
||||
self.client.create_trigger(
|
||||
'trigger1', '5 * * * *', self.wf_name)
|
||||
'trigger1', self.wf_name, None, '5 * * * *')
|
||||
self.assertRaises(exceptions.Conflict,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger2', '5 * * * *', self.wf_name)
|
||||
'trigger2', self.wf_name, None, '5 * * * *')
|
||||
|
||||
@test.attr(type='nagative')
|
||||
@test.attr(type='negative')
|
||||
def test_invalid_cron_pattern_not_enough_params(self):
|
||||
self.assertRaises(exceptions.ServerFault,
|
||||
self.assertRaises(exceptions.BadRequest,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger', '5 *', self.wf_name)
|
||||
'trigger', self.wf_name, None, '5 *')
|
||||
|
||||
@test.attr(type='nagative')
|
||||
@test.attr(type='negative')
|
||||
def test_invalid_cron_pattern_out_of_range(self):
|
||||
self.assertRaises(exceptions.ServerFault,
|
||||
self.assertRaises(exceptions.BadRequest,
|
||||
self.client.create_cron_trigger,
|
||||
'trigger', '88 * * * *', self.wf_name)
|
||||
'trigger', self.wf_name, None, '88 * * * *')
|
||||
|
||||
|
||||
class ActionTestsV2(base.TestCase):
|
||||
|
@ -270,11 +270,14 @@ class MistralClientV2(MistralClientBase):
|
||||
|
||||
return resp, json.loads(body)
|
||||
|
||||
def create_cron_trigger(self, name, pattern, wf_name, wf_input=None):
|
||||
def create_cron_trigger(self, name, wf_name, wf_input=None, pattern=None,
|
||||
first_time=None, count=None):
|
||||
post_body = {
|
||||
'name': name,
|
||||
'pattern': pattern,
|
||||
'workflow_name': wf_name,
|
||||
'pattern': pattern,
|
||||
'remaining_executions': count,
|
||||
'first_execution_time': first_time
|
||||
}
|
||||
if wf_input:
|
||||
post_body.update({'workflow_input': json.dumps(wf_input)})
|
||||
|
@ -32,7 +32,8 @@ TRIGGER = {
|
||||
'pattern': '* * * * *',
|
||||
'workflow_name': WF.name,
|
||||
'workflow_input': '{}',
|
||||
'scope': 'private'
|
||||
'scope': 'private',
|
||||
'remaining_executions': 42
|
||||
}
|
||||
|
||||
trigger_values = copy.copy(TRIGGER)
|
||||
@ -91,6 +92,7 @@ class TestCronTriggerController(base.FunctionalTest):
|
||||
values = mock_mtd.call_args[0][0]
|
||||
|
||||
self.assertEqual('* * * * *', values['pattern'])
|
||||
self.assertEqual(42, values['remaining_executions'])
|
||||
|
||||
@mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
|
||||
@mock.patch.object(db_api, "create_cron_trigger", MOCK_DUPLICATE)
|
||||
|
@ -1083,6 +1083,7 @@ CRON_TRIGGERS = [
|
||||
'workflow_input': {},
|
||||
'next_execution_time':
|
||||
datetime.datetime.now() + datetime.timedelta(days=1),
|
||||
'remaining_executions': 42,
|
||||
'scope': 'private',
|
||||
'project_id': '<default-project>'
|
||||
},
|
||||
@ -1094,6 +1095,7 @@ CRON_TRIGGERS = [
|
||||
'workflow_input': {'param': 'val'},
|
||||
'next_execution_time':
|
||||
datetime.datetime.now() + datetime.timedelta(days=1),
|
||||
'remaining_executions': 42,
|
||||
'scope': 'private',
|
||||
'project_id': '<default-project>'
|
||||
},
|
||||
|
@ -95,9 +95,11 @@ class TriggerServiceV2Test(base.DbTestCase):
|
||||
def test_trigger_create(self):
|
||||
trigger = t_s.create_cron_trigger(
|
||||
'test',
|
||||
'*/5 * * * *',
|
||||
self.wf.name,
|
||||
{},
|
||||
'*/5 * * * *',
|
||||
None,
|
||||
None,
|
||||
datetime.datetime(2010, 8, 25)
|
||||
)
|
||||
|
||||
@ -113,6 +115,22 @@ class TriggerServiceV2Test(base.DbTestCase):
|
||||
|
||||
self.assertEqual(datetime.datetime(2010, 8, 25, 0, 10), next_time)
|
||||
|
||||
def test_oneshot_trigger_create(self):
|
||||
trigger = t_s.create_cron_trigger(
|
||||
'test',
|
||||
self.wf.name,
|
||||
{},
|
||||
None,
|
||||
"4242-12-25 13:37",
|
||||
None,
|
||||
datetime.datetime(2010, 8, 25)
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
datetime.datetime(4242, 12, 25, 13, 37),
|
||||
trigger.next_execution_time
|
||||
)
|
||||
|
||||
@mock.patch.object(security, 'create_trust',
|
||||
type('trust', (object,), {'id': 'my_trust_id'}))
|
||||
def test_create_trust_in_trigger(self):
|
||||
@ -124,9 +142,11 @@ class TriggerServiceV2Test(base.DbTestCase):
|
||||
|
||||
trigger = t_s.create_cron_trigger(
|
||||
'test',
|
||||
'*/2 * * * *',
|
||||
self.wf.name,
|
||||
{},
|
||||
'*/2 * * * *',
|
||||
None,
|
||||
None,
|
||||
datetime.datetime(2010, 8, 25)
|
||||
)
|
||||
|
||||
@ -135,33 +155,41 @@ class TriggerServiceV2Test(base.DbTestCase):
|
||||
def test_get_trigger_in_correct_orders(self):
|
||||
t_s.create_cron_trigger(
|
||||
'test1',
|
||||
'*/5 * * * *',
|
||||
self.wf.name,
|
||||
{},
|
||||
'*/5 * * * *',
|
||||
None,
|
||||
None,
|
||||
datetime.datetime(2010, 8, 25)
|
||||
)
|
||||
|
||||
t_s.create_cron_trigger(
|
||||
'test2',
|
||||
'*/1 * * * *',
|
||||
self.wf.name,
|
||||
{},
|
||||
'*/1 * * * *',
|
||||
None,
|
||||
None,
|
||||
datetime.datetime(2010, 8, 22)
|
||||
)
|
||||
|
||||
t_s.create_cron_trigger(
|
||||
'test3',
|
||||
'*/2 * * * *',
|
||||
self.wf.name,
|
||||
{},
|
||||
'*/2 * * * *',
|
||||
None,
|
||||
None,
|
||||
datetime.datetime(2010, 9, 21)
|
||||
)
|
||||
|
||||
t_s.create_cron_trigger(
|
||||
'test4',
|
||||
'*/3 * * * *',
|
||||
self.wf.name,
|
||||
{},
|
||||
'*/3 * * * *',
|
||||
None,
|
||||
None,
|
||||
datetime.datetime.now() + datetime.timedelta(0, 50)
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user