Add jobs api

This is part-1 of jobs support in Qinling.

With job API, users can create a job with an existing function
that can be ran in cron job's fashion, but the job definition
has more capabilities than cron.

Change-Id: I0cb885dd6005ba024e3816bae9b79b0d3f35d335
This commit is contained in:
Lingxian Kong 2017-07-20 00:36:47 +12:00
parent ebe601b8f5
commit 5baa3f1e45
15 changed files with 471 additions and 64 deletions

View File

@ -0,0 +1,137 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import croniter
import datetime
from dateutil import parser
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from pecan import rest
import six
import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling.utils.openstack import keystone as keystone_util
from qinling.utils import rest_utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
POST_REQUIRED = set(['function_id'])
class JobsController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
resources.Job,
body=resources.Job,
status_code=201
)
def post(self, job):
"""Creates a new job."""
params = job.to_dict()
if not POST_REQUIRED.issubset(set(params.keys())):
raise exc.InputException(
'Required param is missing. Required: %s' % POST_REQUIRED
)
first_time = params.get('first_execution_time')
pattern = params.get('pattern')
count = params.get('count')
start_time = timeutils.utcnow()
if isinstance(first_time, six.string_types):
try:
# We need naive datetime object.
first_time = parser.parse(first_time, ignoretz=True)
except ValueError as e:
raise exc.InputException(e.message)
if not (first_time or pattern):
raise exc.InputException(
'Pattern or first_execution_time must be specified.'
)
if first_time:
# first_time is assumed to be UTC time.
valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60)
if valid_min_time > first_time:
raise exc.InputException(
'first_execution_time must be at least 1 minute in the '
'future.'
)
if not pattern and count and count > 1:
raise exc.InputException(
'Pattern must be provided if count is greater than 1.'
)
next_time = first_time
if not (pattern or count):
count = 1
if pattern:
try:
croniter.croniter(pattern)
except (ValueError, KeyError):
raise exc.InputException(
'The specified pattern is not valid: {}'.format(pattern)
)
if not first_time:
next_time = croniter.croniter(pattern, start_time).get_next(
datetime.datetime
)
LOG.info("Creating job. [job=%s]", params)
with db_api.transaction():
db_api.get_function(params['function_id'])
values = {
'name': params.get('name'),
'pattern': pattern,
'first_execution_time': first_time,
'next_execution_time': next_time,
'count': count,
'function_id': params['function_id'],
'function_input': params.get('function_input') or {}
}
if cfg.CONF.pecan.auth_enable:
values['trust_id'] = keystone_util.create_trust().id
try:
db_job = db_api.create_job(values)
except Exception:
# Delete trust before raising exception.
keystone_util.delete_trust(values.get('trust_id'))
raise
return resources.Job.from_dict(db_job.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, types.uuid, status_code=204)
def delete(self, id):
"""Delete job."""
LOG.info("Delete job [id=%s]" % id)
job = db_api.get_job(id)
trust_id = job.trust_id
keystone_util.delete_trust(trust_id)
db_api.delete_job(id)

View File

@ -170,6 +170,7 @@ class Function(Resource):
code = types.jsontype
entry = wtypes.text
count = wsme.wsattr(int, readonly=True)
project_id = wsme.wsattr(wtypes.text, readonly=True)
created_at = wtypes.text
updated_at = wtypes.text
@ -185,6 +186,7 @@ class Function(Resource):
code={'zip': True},
entry='main',
count=10,
project_id='default',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
)
@ -263,6 +265,7 @@ class Execution(Resource):
sync = bool
input = types.jsontype
output = wsme.wsattr(types.jsontype, readonly=True)
project_id = wsme.wsattr(wtypes.text, readonly=True)
created_at = wsme.wsattr(wtypes.text, readonly=True)
updated_at = wsme.wsattr(wtypes.text, readonly=True)
@ -275,6 +278,7 @@ class Execution(Resource):
sync=True,
input={'data': 'hello, world'},
output={'result': 'hello, world'},
project_id='default',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
)
@ -299,3 +303,54 @@ class Executions(ResourceList):
)
return sample
class Job(Resource):
id = types.uuid
name = wtypes.text
function_id = types.uuid
function_input = types.jsontype
pattern = wtypes.text
count = wtypes.IntegerType(minimum=1)
first_execution_time = wtypes.text
next_execution_time = wtypes.text
project_id = wsme.wsattr(wtypes.text, readonly=True)
created_at = wsme.wsattr(wtypes.text, readonly=True)
updated_at = wsme.wsattr(wtypes.text, readonly=True)
@classmethod
def sample(cls):
return cls(
id='123e4567-e89b-12d3-a456-426655440000',
name='my_job',
function_id='123e4567-e89b-12d3-a456-426655440000',
function_input={'data': 'hello, world'},
pattern='* * * * *',
count=42,
first_execution_time='',
next_execution_time='',
project_id='default',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
)
class Jobs(ResourceList):
jobs = [Job]
def __init__(self, **kwargs):
self._type = 'jobs'
super(Jobs, self).__init__(**kwargs)
@classmethod
def sample(cls):
sample = cls()
sample.jobs = [Job.sample()]
sample.next = (
"http://localhost:7070/v1/jobs?"
"sort_keys=id,name&sort_dirs=asc,desc&limit=10&"
"marker=123e4567-e89b-12d3-a456-426655440000"
)
return sample

View File

@ -18,6 +18,7 @@ import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import execution
from qinling.api.controllers.v1 import function
from qinling.api.controllers.v1 import job
from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import runtime
@ -37,6 +38,7 @@ class Controller(object):
functions = function.FunctionsController()
runtimes = runtime.RuntimesController()
executions = execution.ExecutionsController()
jobs = job.JobsController()
@wsme_pecan.wsexpose(RootResource)
def index(self):

View File

@ -159,7 +159,11 @@ _DEFAULT_LOG_LEVELS = [
'oslo_service.periodic_task=INFO',
'oslo_service.loopingcall=INFO',
'oslo_db=WARN',
'oslo_concurrency.lockutils=WARN'
'oslo_concurrency.lockutils=WARN',
'kubernetes.client.rest=DEBUG',
'keystoneclient=INFO',
'requests.packages.urllib3.connectionpool=CRITICAL',
'urllib3.connectionpool=CRITICAL'
]

View File

@ -59,9 +59,6 @@ def delete_all():
delete_runtimes(insecure=True)
# Function
def get_function(id):
return IMPL.get_function(id)
@ -90,9 +87,6 @@ def delete_function(id):
IMPL.delete_function(id)
# Function
def create_runtime(values):
return IMPL.create_runtime(values)
@ -117,9 +111,6 @@ def delete_runtimes(**kwargs):
return IMPL.delete_runtimes(**kwargs)
# Execution
def create_execution(values):
return IMPL.create_execution(values)
@ -154,3 +145,15 @@ def get_function_service_mappings(**kwargs):
def delete_function_service_mapping(id):
return IMPL.delete_function_service_mapping(id)
def create_job(values):
return IMPL.create_job(values)
def get_job(id):
return IMPL.get_job(id)
def delete_job(id):
return IMPL.delete_job(id)

View File

@ -366,3 +366,34 @@ def delete_function_service_mapping(id, session=None):
return
session.delete(mapping)
@db_base.session_aware()
def create_job(values, session=None):
job = models.Job()
job.update(values)
try:
job.save(session=session)
except oslo_db_exc.DBDuplicateEntry as e:
raise exc.DBError(
"Duplicate entry for Job: %s" % e.columns
)
return job
@db_base.session_aware()
def get_job(id, session=None):
job = _get_db_object_by_id(models.Job, id)
if not job:
raise exc.DBEntityNotFoundError("Job not found [id=%s]" % id)
return job
@db_base.session_aware()
def delete_job(id, session=None):
job = get_job(id)
session.delete(job)

View File

@ -107,3 +107,22 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
info={"check_ifexists": True}
)
op.create_table(
'job',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('function_input', st.JsonLongDictType(), nullable=True),
sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('pattern', sa.String(length=32), nullable=False),
sa.Column('first_execution_time', sa.DateTime(), nullable=True),
sa.Column('next_execution_time', sa.DateTime(), nullable=False),
sa.Column('count', sa.Integer(), nullable=True),
sa.Column('trust_id', sa.String(length=80), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['function_id'], [u'function.id']),
info={"check_ifexists": True}
)

View File

@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from oslo_db.sqlalchemy import models as oslo_models
import sqlalchemy as sa
from sqlalchemy.ext import declarative
@ -76,8 +74,8 @@ class _QinlingModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
if col.name not in unloaded and hasattr(self, col.name):
d[col.name] = getattr(self, col.name)
datetime_to_str(d, 'created_at')
datetime_to_str(d, 'updated_at')
common.datetime_to_str(d, 'created_at')
common.datetime_to_str(d, 'updated_at')
return d
@ -101,12 +99,6 @@ class _QinlingModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
return '%s %s' % (type(self).__name__, self.to_dict().__repr__())
def datetime_to_str(dct, attr_name):
if (dct.get(attr_name) is not None and
not isinstance(dct.get(attr_name), six.string_types)):
dct[attr_name] = dct[attr_name].isoformat(' ')
QinlingModelBase = declarative.declarative_base(cls=_QinlingModelBase)

View File

@ -13,9 +13,11 @@
# limitations under the License.
import sqlalchemy as sa
from sqlalchemy.orm import relationship
from qinling.db.sqlalchemy import model_base
from qinling.db.sqlalchemy import types as st
from qinling.utils import common
class Function(model_base.QinlingSecureModelBase):
@ -71,3 +73,31 @@ class Execution(model_base.QinlingSecureModelBase):
sync = sa.Column(sa.BOOLEAN, default=True)
input = sa.Column(st.JsonLongDictType())
output = sa.Column(st.JsonLongDictType())
class Job(model_base.QinlingSecureModelBase):
__tablename__ = 'job'
name = sa.Column(sa.String(255), nullable=True)
pattern = sa.Column(
sa.String(32),
nullable=True,
# Set default to 'never'.
default='0 0 30 2 0'
)
first_execution_time = sa.Column(sa.DateTime, nullable=True)
next_execution_time = sa.Column(sa.DateTime, nullable=False)
count = sa.Column(sa.Integer)
function_id = sa.Column(
sa.String(36),
sa.ForeignKey(Function.id)
)
function = relationship('Function', lazy='joined')
function_input = sa.Column(st.JsonDictType())
trust_id = sa.Column(sa.String(80))
def to_dict(self):
d = super(Job, self).to_dict()
common.datetime_to_str(d, 'first_execution_time')
common.datetime_to_str(d, 'next_execution_time')
return d

View File

@ -0,0 +1,48 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from datetime import timedelta
from qinling.tests.unit.api import base
class TestJobController(base.APITest):
def setUp(self):
super(TestJobController, self).setUp()
# Insert a function record in db for each test case. The data will be
# removed automatically in db clean up.
db_function = self.create_function(prefix='TestJobController')
self.function_id = db_function.id
def test_post(self):
body = {
'name': self.rand_name('job', prefix='TestJobController'),
'first_execution_time': str(
datetime.utcnow() + timedelta(hours=1)),
'function_id': self.function_id
}
resp = self.app.post_json('/v1/jobs', body)
self.assertEqual(201, resp.status_int)
def test_delete(self):
job_id = self.create_job(
self.function_id, prefix='TestJobController'
).id
resp = self.app.delete('/v1/jobs/%s' % job_id)
self.assertEqual(204, resp.status_int)

View File

@ -26,16 +26,7 @@ class TestRuntimeController(base.APITest):
# Insert a runtime record in db. The data will be removed in db clean
# up.
self.db_runtime = db_api.create_runtime(
{
'name': 'test_runtime',
'image': 'python2.7',
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': test_base.DEFAULT_PROJECT_ID,
'status': status.AVAILABLE
}
)
self.db_runtime = self.create_runtime(prefix='TestRuntimeController')
self.runtime_id = self.db_runtime.id
def test_get(self):
@ -43,8 +34,8 @@ class TestRuntimeController(base.APITest):
expected = {
'id': self.runtime_id,
"image": "python2.7",
"name": "test_runtime",
"image": self.db_runtime.image,
"name": self.db_runtime.name,
"project_id": test_base.DEFAULT_PROJECT_ID,
"status": status.AVAILABLE
}
@ -57,8 +48,8 @@ class TestRuntimeController(base.APITest):
expected = {
'id': self.runtime_id,
"image": "python2.7",
"name": "test_runtime",
"image": self.db_runtime.image,
"name": self.db_runtime.name,
"project_id": test_base.DEFAULT_PROJECT_ID,
"status": status.AVAILABLE
}
@ -72,8 +63,8 @@ class TestRuntimeController(base.APITest):
@mock.patch('qinling.rpc.EngineClient.create_runtime')
def test_post(self, mock_create_time):
body = {
'name': self.rand_name('runtime', prefix='APITest'),
'image': self.rand_name('image', prefix='APITest'),
'name': self.rand_name('runtime', prefix='TestRuntimeController'),
'image': self.rand_name('image', prefix='TestRuntimeController'),
}
resp = self.app.post_json('/v1/runtimes', body)
@ -83,36 +74,13 @@ class TestRuntimeController(base.APITest):
@mock.patch('qinling.rpc.EngineClient.delete_runtime')
def test_delete(self, mock_delete_runtime):
db_runtime = db_api.create_runtime(
{
'name': self.rand_name('runtime', prefix='APITest'),
'image': self.rand_name('image', prefix='APITest'),
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': test_base.DEFAULT_PROJECT_ID,
'status': status.AVAILABLE
}
)
runtime_id = db_runtime.id
resp = self.app.delete('/v1/runtimes/%s' % runtime_id)
resp = self.app.delete('/v1/runtimes/%s' % self.runtime_id)
self.assertEqual(204, resp.status_int)
mock_delete_runtime.assert_called_once_with(runtime_id)
mock_delete_runtime.assert_called_once_with(self.runtime_id)
def test_delete_runtime_with_function_associated(self):
db_api.create_function(
{
'name': self.rand_name('function', prefix='APITest'),
'runtime_id': self.runtime_id,
'code': {},
'entry': 'main.main',
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': test_base.DEFAULT_PROJECT_ID,
}
)
self.create_function(self.runtime_id, prefix='TestRuntimeController')
resp = self.app.delete(
'/v1/runtimes/%s' % self.runtime_id, expect_errors=True
)
@ -130,8 +98,10 @@ class TestRuntimeController(base.APITest):
def test_put_image_runtime_not_available(self):
db_runtime = db_api.create_runtime(
{
'name': self.rand_name('runtime', prefix='APITest'),
'image': self.rand_name('image', prefix='APITest'),
'name': self.rand_name(
'runtime', prefix='TestRuntimeController'),
'image': self.rand_name(
'image', prefix='TestRuntimeController'),
'project_id': test_base.DEFAULT_PROJECT_ID,
'status': status.CREATING
}

View File

@ -14,6 +14,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
import random
from oslo_config import cfg
@ -22,6 +23,7 @@ from oslotest import base
from qinling import context as auth_context
from qinling.db import api as db_api
from qinling.db.sqlalchemy import sqlite_lock
from qinling import status
from qinling.tests.unit import config as test_config
test_config.parse_args()
@ -151,3 +153,54 @@ class DbTestCase(BaseTest):
def _clean_db(self):
db_api.delete_all()
sqlite_lock.cleanup()
def create_runtime(self, prefix=None):
runtime = db_api.create_runtime(
{
'name': self.rand_name('runtime', prefix=prefix),
'image': self.rand_name('image', prefix=prefix),
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': DEFAULT_PROJECT_ID,
'status': status.AVAILABLE
}
)
return runtime
def create_function(self, runtime_id=None, prefix=None):
if not runtime_id:
runtime_id = self.create_runtime(prefix).id
function = db_api.create_function(
{
'name': self.rand_name('function', prefix=prefix),
'runtime_id': runtime_id,
'code': {"source": "package"},
'entry': 'main.main',
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': DEFAULT_PROJECT_ID,
}
)
return function
def create_job(self, function_id=None, prefix=None):
if not function_id:
function_id = self.create_function(prefix=prefix).id
job = db_api.create_job(
{
'name': self.rand_name('job', prefix=prefix),
'function_id': function_id,
'first_execution_time': datetime.utcnow(),
'next_execution_time': datetime.utcnow(),
'count': 1,
# 'auth_enable' is disabled by default, we create runtime for
# default tenant.
'project_id': DEFAULT_PROJECT_ID,
}
)
return job

View File

@ -15,6 +15,7 @@ import functools
import warnings
from oslo_utils import uuidutils
import six
def convert_dict_to_string(d):
@ -23,6 +24,13 @@ def convert_dict_to_string(d):
return ','.join(temp_list)
def datetime_to_str(dct, attr_name):
"""Convert datetime object in dict to string."""
if (dct.get(attr_name) is not None and
not isinstance(dct.get(attr_name), six.string_types)):
dct[attr_name] = dct[attr_name].isoformat(' ')
def generate_unicode_uuid(dashed=True):
return uuidutils.generate_uuid(dashed=dashed)

View File

@ -14,11 +14,15 @@
from keystoneauth1.identity import generic
from keystoneauth1 import session
from keystoneclient.v3 import client as ks_client
from oslo_config import cfg
from oslo_log import log as logging
import swiftclient
from qinling import context
from qinling.utils import common
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -33,9 +37,58 @@ def _get_user_keystone_session():
return session.Session(auth=auth, verify=False)
@common.disable_ssl_warnings
def get_swiftclient():
session = _get_user_keystone_session()
conn = swiftclient.Connection(session=session)
return conn
@common.disable_ssl_warnings
def get_keystone_client():
session = _get_user_keystone_session()
keystone = ks_client.Client(session=session)
return keystone
@common.disable_ssl_warnings
def _get_admin_user_id():
auth_url = CONF.keystone_authtoken.auth_uri
client = ks_client.Client(
username=CONF.keystone_authtoken.username,
password=CONF.keystone_authtoken.password,
project_name=CONF.keystone_authtoken.project_name,
auth_url=auth_url,
)
return client.user_id
@common.disable_ssl_warnings
def create_trust():
client = get_keystone_client()
ctx = context.get_ctx()
trustee_id = _get_admin_user_id()
return client.trusts.create(
trustor_user=ctx.user,
trustee_user=trustee_id,
impersonation=True,
role_names=ctx.roles,
project=ctx.tenant
)
@common.disable_ssl_warnings
def delete_trust(trust_id):
if not trust_id:
return
client = get_keystone_client()
try:
client.trusts.delete(trust_id)
except Exception as e:
LOG.warning("Failed to delete trust [id=%s]: %s" % (trust_id, e))

View File

@ -26,3 +26,5 @@ WSME>=0.8 # MIT
kubernetes>=1.0.0b1 # Apache-2.0
PyYAML>=3.10.0 # MIT
python-swiftclient>=3.2.0 # Apache-2.0
croniter>=0.3.4 # MIT License
python-dateutil>=2.4.2 # BSD