Add job update api

By using update api, users could pause/cancel/resume jobs, or change
job execution pattern.

Change-Id: I05103316a91ad79c4d0593c4a02a400aac45a762
This commit is contained in:
Lingxian Kong 2017-08-01 16:18:01 +12:00
parent a2213f3f6d
commit 16876649e5
9 changed files with 305 additions and 45 deletions

View File

@ -12,8 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import croniter
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from pecan import rest
import wsmeext.pecan as wsme_pecan
@ -30,6 +34,8 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
POST_REQUIRED = set(['function_id'])
UPDATE_ALLOWED = set(['name', 'function_input', 'status', 'pattern',
'next_execution_time'])
class JobsController(rest.RestController):
@ -49,6 +55,7 @@ class JobsController(rest.RestController):
'Required param is missing. Required: %s' % POST_REQUIRED
)
# Check the input params.
first_time, next_time, count = jobs.validate_job(params)
LOG.info("Creating %s, params: %s", self.type, params)
@ -101,3 +108,66 @@ class JobsController(rest.RestController):
for db_model in db_api.get_jobs()]
return resources.Jobs(jobs=jobs)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
resources.Job,
types.uuid,
body=resources.Job
)
def put(self, id, job):
"""Update job definition.
1. Can not update a finished job.
2. Can not change job type.
3. Allow to pause a one-shot job and resume before its first execution
time.
"""
values = {}
for key in UPDATE_ALLOWED:
if key in job.to_dict():
values.update({key: job.to_dict().get(key)})
LOG.info('Update resource, params: %s', values,
resource={'type': self.type, 'id': id})
new_status = values.get('status')
pattern = values.get('pattern')
next_execution_time = values.get('next_execution_time')
job_db = db_api.get_job(id)
if job_db.status in [status.DONE, status.CANCELLED]:
raise exc.InputException('Can not update a finished job.')
if pattern:
if not job_db.pattern:
raise exc.InputException('Can not change job type.')
jobs.validate_pattern(pattern)
elif pattern == '' and job_db.pattern:
raise exc.InputException('Can not change job type.')
valid_states = [status.RUNNING, status.CANCELLED, status.PAUSED]
if new_status and new_status not in valid_states:
raise exc.InputException('Invalid status.')
if next_execution_time:
values['next_execution_time'] = jobs.validate_next_time(
next_execution_time
)
elif (job_db.status == status.PAUSED and
new_status == status.RUNNING):
p = job_db.pattern or pattern
if not p:
# Check if the next execution time for one-shot job is still
# valid.
jobs.validate_next_time(job_db.next_execution_time)
else:
# Update next_execution_time for recurring job.
values['next_execution_time'] = croniter.croniter(
p, timeutils.utcnow()
).get_next(datetime.datetime)
updated_job = db_api.update_job(id, values)
return resources.Job.from_dict(updated_job.to_dict())

View File

@ -312,7 +312,7 @@ class Job(Resource):
name = wtypes.text
function_id = types.uuid
function_input = types.jsontype
status = wsme.wsattr(wtypes.text, readonly=True)
status = wtypes.text
pattern = wtypes.text
count = int
first_execution_time = wtypes.text

View File

@ -178,6 +178,10 @@ def delete_job(id):
return IMPL.delete_job(id)
def update_job(id, values):
return IMPL.update_job(id, values)
def get_jobs():
return IMPL.get_jobs()

View File

@ -430,12 +430,20 @@ def delete_job(id, session=None):
return result.rowcount
@db_base.session_aware()
def update_job(id, values, session=None):
job = get_job(id)
job.update(values.copy())
return job
@db_base.session_aware()
def get_next_jobs(before, session=None):
return _get_collection(
models.Job, insecure=True, sort_keys=['next_execution_time'],
sort_dirs=['asc'], next_execution_time={'lt': before},
status={'neq': status.DONE}
status=status.RUNNING
)

View File

@ -83,12 +83,7 @@ class Job(model_base.QinlingSecureModelBase):
__tablename__ = 'job'
name = sa.Column(sa.String(255), nullable=True)
pattern = sa.Column(
sa.String(32),
nullable=True,
# Set default to 'never'.
default='0 0 30 2 0'
)
pattern = sa.Column(sa.String(32), nullable=True)
status = sa.Column(sa.String(32), nullable=False)
first_execution_time = sa.Column(sa.DateTime, nullable=True)
next_execution_time = sa.Column(sa.DateTime, nullable=False)

View File

@ -19,4 +19,6 @@ ERROR = 'error'
DELETING = 'deleting'
RUNNING = 'running'
DONE = 'done'
PAUSED = 'paused'
CANCELLED = 'cancelled'
SUCCESS = 'success'

View File

@ -15,6 +15,9 @@
from datetime import datetime
from datetime import timedelta
from qinling import context as auth_context
from qinling.db import api as db_api
from qinling import status
from qinling.tests.unit.api import base
@ -40,9 +43,183 @@ class TestJobController(base.APITest):
def test_delete(self):
job_id = self.create_job(
self.function_id, prefix='TestJobController'
self.function_id, prefix='TestJobController',
first_execution_time=datetime.utcnow(),
next_execution_time=datetime.utcnow() + timedelta(hours=1),
status=status.RUNNING,
count=1
).id
resp = self.app.delete('/v1/jobs/%s' % job_id)
self.assertEqual(204, resp.status_int)
def test_update_one_shot_job(self):
job_id = self.create_job(
self.function_id,
prefix='TestJobController',
first_execution_time=datetime.utcnow(),
next_execution_time=datetime.utcnow() + timedelta(hours=1),
status=status.RUNNING,
count=1
).id
req_body = {
'name': 'new_name',
'status': status.PAUSED
}
resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body)
self.assertEqual(200, resp.status_int)
self._assertDictContainsSubset(resp.json, req_body)
req_body = {
'status': status.RUNNING
}
resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body)
self.assertEqual(200, resp.status_int)
self._assertDictContainsSubset(resp.json, req_body)
def test_update_one_shot_job_failed(self):
job_id = self.create_job(
self.function_id,
prefix='TestJobController',
first_execution_time=datetime.utcnow(),
next_execution_time=datetime.utcnow() + timedelta(hours=1),
status=status.RUNNING,
count=1
).id
url = '/v1/jobs/%s' % job_id
# Try to change job type
resp = self.app.put_json(
url,
{'pattern': '*/1 * * * *'},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
self.assertIn('Can not change job type.', resp.json['faultstring'])
# Try to resume job but the execution time is invalid
auth_context.set_ctx(self.ctx)
self.addCleanup(auth_context.set_ctx, None)
db_api.update_job(
job_id,
{
'next_execution_time': datetime.utcnow() - timedelta(hours=1),
'status': status.PAUSED
}
)
resp = self.app.put_json(
url,
{'status': status.RUNNING},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
self.assertIn(
'Execution time must be at least 1 minute in the future',
resp.json['faultstring']
)
def test_update_recurring_job(self):
job_id = self.create_job(
self.function_id,
prefix='TestJobController',
first_execution_time=datetime.utcnow() + timedelta(hours=1),
next_execution_time=datetime.utcnow() + timedelta(hours=1),
pattern='0 */1 * * *',
status=status.RUNNING,
count=10
).id
req_body = {
'next_execution_time': str(
datetime.utcnow() + timedelta(hours=1.5)
),
'pattern': '1 */1 * * *'
}
resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body)
self.assertEqual(200, resp.status_int)
self._assertDictContainsSubset(resp.json, req_body)
# Pause the job and resume with a valid next_execution_time
req_body = {
'status': status.PAUSED
}
resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body)
self.assertEqual(200, resp.status_int)
self._assertDictContainsSubset(resp.json, req_body)
req_body = {
'status': status.RUNNING,
'next_execution_time': str(datetime.utcnow() + timedelta(hours=2)),
}
resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body)
self.assertEqual(200, resp.status_int)
self._assertDictContainsSubset(resp.json, req_body)
# Pause the job and resume without specifying next_execution_time
auth_context.set_ctx(self.ctx)
self.addCleanup(auth_context.set_ctx, None)
db_api.update_job(
job_id,
{
'next_execution_time': datetime.utcnow() - timedelta(hours=1),
'status': status.PAUSED
}
)
req_body = {'status': status.RUNNING}
resp = self.app.put_json('/v1/jobs/%s' % job_id, req_body)
self.assertEqual(200, resp.status_int)
self._assertDictContainsSubset(resp.json, req_body)
def test_update_recurring_job_failed(self):
job_id = self.create_job(
self.function_id,
prefix='TestJobController',
first_execution_time=datetime.utcnow() + timedelta(hours=1),
next_execution_time=datetime.utcnow() + timedelta(hours=1),
pattern='0 */1 * * *',
status=status.RUNNING,
count=10
).id
url = '/v1/jobs/%s' % job_id
# Try to change job type
resp = self.app.put_json(
url,
{'pattern': ''},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
self.assertIn('Can not change job type.', resp.json['faultstring'])
# Pause the job and try to resume with an invalid next_execution_time
auth_context.set_ctx(self.ctx)
self.addCleanup(auth_context.set_ctx, None)
db_api.update_job(job_id, {'status': status.PAUSED})
resp = self.app.put_json(
url,
{
'status': status.RUNNING,
'next_execution_time': str(
datetime.utcnow() - timedelta(hours=1)
)
},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
self.assertIn(
'Execution time must be at least 1 minute in the future',
resp.json['faultstring']
)

View File

@ -14,7 +14,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
import random
from oslo_config import cfg
@ -186,22 +185,18 @@ class DbTestCase(BaseTest):
return function
def create_job(self, function_id=None, prefix=None):
def create_job(self, function_id=None, prefix=None, **kwargs):
if not function_id:
function_id = self.create_function(prefix=prefix).id
job = db_api.create_job(
{
'name': self.rand_name('job', prefix=prefix),
'function_id': function_id,
'first_execution_time': datetime.utcnow(),
'next_execution_time': datetime.utcnow(),
'count': 1,
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': DEFAULT_PROJECT_ID,
'status': status.RUNNING
}
)
job_params = {
'name': self.rand_name('job', prefix=prefix),
'function_id': function_id,
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': DEFAULT_PROJECT_ID,
}
job_params.update(kwargs)
job = db_api.create_job(job_params)
return job

View File

@ -23,32 +23,46 @@ from qinling import exceptions as exc
from qinling.utils.openstack import keystone as keystone_utils
def validate_next_time(next_execution_time):
next_time = next_execution_time
if isinstance(next_execution_time, six.string_types):
try:
# We need naive datetime object.
next_time = parser.parse(next_execution_time, ignoretz=True)
except ValueError as e:
raise exc.InputException(str(e))
valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60)
if valid_min_time > next_time:
raise exc.InputException(
'Execution time must be at least 1 minute in the future.'
)
return next_time
def validate_pattern(pattern):
try:
croniter.croniter(pattern)
except (ValueError, KeyError):
raise exc.InputException(
'The specified pattern is not valid: {}'.format(pattern)
)
def validate_job(params):
first_time = params.get('first_execution_time')
pattern = params.get('pattern')
count = params.get('count')
start_time = timeutils.utcnow()
if isinstance(first_time, six.string_types):
try:
# We need naive datetime object.
first_time = parser.parse(first_time, ignoretz=True)
except ValueError as e:
raise exc.InputException(e.message)
if not (first_time or pattern):
raise exc.InputException(
'Pattern or first_execution_time must be specified.'
)
first_time = validate_next_time(first_time)
if first_time:
# first_time is assumed to be UTC time.
valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60)
if valid_min_time > first_time:
raise exc.InputException(
'first_execution_time must be at least 1 minute in the '
'future.'
)
if not pattern and count and count > 1:
raise exc.InputException(
'Pattern must be provided if count is greater than 1.'
@ -57,14 +71,9 @@ def validate_job(params):
next_time = first_time
if not (pattern or count):
count = 1
if pattern:
try:
croniter.croniter(pattern)
except (ValueError, KeyError):
raise exc.InputException(
'The specified pattern is not valid: {}'.format(pattern)
)
if pattern:
validate_pattern(pattern)
if not first_time:
next_time = croniter.croniter(pattern, start_time).get_next(
datetime.datetime