Implement periodic task for jobs

Change-Id: I9093333cc9b1a64b5e386af87db80df5aa4dfb4b
This commit is contained in:
Lingxian Kong 2017-07-24 16:45:36 +12:00
parent 5baa3f1e45
commit 997c78131e
18 changed files with 483 additions and 133 deletions

View File

@ -13,11 +13,15 @@
# limitations under the License. # limitations under the License.
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging
import pecan import pecan
from qinling.api import access_control from qinling.api import access_control
from qinling import context as ctx from qinling import context as ctx
from qinling.db import api as db_api from qinling.db import api as db_api
from qinling.services import periodics
LOG = logging.getLogger(__name__)
def get_pecan_config(): def get_pecan_config():
@ -39,10 +43,13 @@ def get_pecan_config():
def setup_app(config=None): def setup_app(config=None):
if not config: if not config:
config = get_pecan_config() config = get_pecan_config()
app_conf = dict(config.app) app_conf = dict(config.app)
db_api.setup_db() db_api.setup_db()
LOG.info('Starting periodic tasks...')
periodics.start_job_handler()
app = pecan.make_app( app = pecan.make_app(
app_conf.pop('root'), app_conf.pop('root'),
hooks=lambda: [ctx.ContextHook(), ctx.AuthHook()], hooks=lambda: [ctx.ContextHook(), ctx.AuthHook()],

View File

@ -14,15 +14,13 @@
from oslo_log import log as logging from oslo_log import log as logging
from pecan import rest from pecan import rest
import requests
import wsmeext.pecan as wsme_pecan import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import resources from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types from qinling.api.controllers.v1 import types
from qinling.db import api as db_api from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling import rpc from qinling import rpc
from qinling import status from qinling.utils import executions
from qinling.utils import rest_utils from qinling.utils import rest_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -42,55 +40,10 @@ class ExecutionsController(rest.RestController):
) )
def post(self, execution): def post(self, execution):
params = execution.to_dict() params = execution.to_dict()
LOG.info("Creating execution. [execution=%s]", params) LOG.info("Creating execution. [execution=%s]", params)
function_id = params['function_id'] db_model = executions.create_execution(self.engine_client, params)
is_sync = params.get('sync', True) if params.get('sync', True):
func_url = None
with db_api.transaction():
func_db = db_api.get_function(function_id)
# Increase function invoke count, the updated_at field will be also
# updated.
func_db.count = func_db.count + 1
try:
# Check if the service url is existing.
mapping_db = db_api.get_function_service_mapping(function_id)
LOG.info('Found Service url for function: %s', function_id)
func_url = '%s/execute' % mapping_db.service_url
LOG.info('Invoke function %s, url: %s', function_id, func_url)
except exc.DBEntityNotFoundError:
pass
if func_url:
r = requests.post(func_url, json=params.get('input'))
params.update(
{'status': 'success', 'output': {'result': r.json()}}
)
else:
runtime_id = func_db.runtime_id
runtime_db = db_api.get_runtime(runtime_id)
if runtime_db.status != status.AVAILABLE:
raise exc.RuntimeNotAvailableException(
'Runtime %s is not available.' % runtime_id
)
params.update({'status': status.RUNNING})
db_model = db_api.create_execution(params)
if not func_url:
self.engine_client.create_execution(
db_model.id, function_id, runtime_id,
input=params.get('input'),
is_sync=is_sync
)
if is_sync:
db_model = db_api.get_execution(db_model.id) db_model = db_api.get_execution(db_model.id)
return resources.Execution.from_dict(db_model.to_dict()) return resources.Execution.from_dict(db_model.to_dict())

View File

@ -12,21 +12,17 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import croniter
import datetime
from dateutil import parser
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import timeutils
from pecan import rest from pecan import rest
import six
import wsmeext.pecan as wsme_pecan import wsmeext.pecan as wsme_pecan
from qinling.api.controllers.v1 import resources from qinling.api.controllers.v1 import resources
from qinling.api.controllers.v1 import types from qinling.api.controllers.v1 import types
from qinling.db import api as db_api from qinling.db import api as db_api
from qinling import exceptions as exc from qinling import exceptions as exc
from qinling import status
from qinling.utils import jobs
from qinling.utils.openstack import keystone as keystone_util from qinling.utils.openstack import keystone as keystone_util
from qinling.utils import rest_utils from qinling.utils import rest_utils
@ -51,52 +47,7 @@ class JobsController(rest.RestController):
'Required param is missing. Required: %s' % POST_REQUIRED 'Required param is missing. Required: %s' % POST_REQUIRED
) )
first_time = params.get('first_execution_time') first_time, next_time, count = jobs.validate_job(params)
pattern = params.get('pattern')
count = params.get('count')
start_time = timeutils.utcnow()
if isinstance(first_time, six.string_types):
try:
# We need naive datetime object.
first_time = parser.parse(first_time, ignoretz=True)
except ValueError as e:
raise exc.InputException(e.message)
if not (first_time or pattern):
raise exc.InputException(
'Pattern or first_execution_time must be specified.'
)
if first_time:
# first_time is assumed to be UTC time.
valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60)
if valid_min_time > first_time:
raise exc.InputException(
'first_execution_time must be at least 1 minute in the '
'future.'
)
if not pattern and count and count > 1:
raise exc.InputException(
'Pattern must be provided if count is greater than 1.'
)
next_time = first_time
if not (pattern or count):
count = 1
if pattern:
try:
croniter.croniter(pattern)
except (ValueError, KeyError):
raise exc.InputException(
'The specified pattern is not valid: {}'.format(pattern)
)
if not first_time:
next_time = croniter.croniter(pattern, start_time).get_next(
datetime.datetime
)
LOG.info("Creating job. [job=%s]", params) LOG.info("Creating job. [job=%s]", params)
with db_api.transaction(): with db_api.transaction():
@ -104,12 +55,13 @@ class JobsController(rest.RestController):
values = { values = {
'name': params.get('name'), 'name': params.get('name'),
'pattern': pattern, 'pattern': params.get('pattern'),
'first_execution_time': first_time, 'first_execution_time': first_time,
'next_execution_time': next_time, 'next_execution_time': next_time,
'count': count, 'count': count,
'function_id': params['function_id'], 'function_id': params['function_id'],
'function_input': params.get('function_input') or {} 'function_input': params.get('function_input') or {},
'status': status.RUNNING
} }
if cfg.CONF.pecan.auth_enable: if cfg.CONF.pecan.auth_enable:
@ -129,9 +81,22 @@ class JobsController(rest.RestController):
def delete(self, id): def delete(self, id):
"""Delete job.""" """Delete job."""
LOG.info("Delete job [id=%s]" % id) LOG.info("Delete job [id=%s]" % id)
jobs.delete_job(id)
job = db_api.get_job(id) @rest_utils.wrap_wsme_controller_exception
trust_id = job.trust_id @wsme_pecan.wsexpose(resources.Job, types.uuid)
def get(self, id):
LOG.info("Fetch job [id=%s]", id)
job_db = db_api.get_job(id)
keystone_util.delete_trust(trust_id) return resources.Job.from_dict(job_db.to_dict())
db_api.delete_job(id)
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.Jobs)
def get_all(self):
LOG.info("Get all jobs.")
jobs = [resources.Job.from_dict(db_model.to_dict())
for db_model in db_api.get_jobs()]
return resources.Jobs(jobs=jobs)

View File

@ -261,6 +261,7 @@ class Runtimes(ResourceList):
class Execution(Resource): class Execution(Resource):
id = types.uuid id = types.uuid
function_id = wsme.wsattr(types.uuid, mandatory=True) function_id = wsme.wsattr(types.uuid, mandatory=True)
description = wtypes.text
status = wsme.wsattr(wtypes.text, readonly=True) status = wsme.wsattr(wtypes.text, readonly=True)
sync = bool sync = bool
input = types.jsontype input = types.jsontype
@ -274,6 +275,7 @@ class Execution(Resource):
return cls( return cls(
id='123e4567-e89b-12d3-a456-426655440000', id='123e4567-e89b-12d3-a456-426655440000',
function_id='123e4567-e89b-12d3-a456-426655440000', function_id='123e4567-e89b-12d3-a456-426655440000',
description='this is the first execution.',
status='success', status='success',
sync=True, sync=True,
input={'data': 'hello, world'}, input={'data': 'hello, world'},
@ -310,8 +312,9 @@ class Job(Resource):
name = wtypes.text name = wtypes.text
function_id = types.uuid function_id = types.uuid
function_input = types.jsontype function_input = types.jsontype
status = wsme.wsattr(wtypes.text, readonly=True)
pattern = wtypes.text pattern = wtypes.text
count = wtypes.IntegerType(minimum=1) count = int
first_execution_time = wtypes.text first_execution_time = wtypes.text
next_execution_time = wtypes.text next_execution_time = wtypes.text
project_id = wsme.wsattr(wtypes.text, readonly=True) project_id = wsme.wsattr(wtypes.text, readonly=True)
@ -325,8 +328,9 @@ class Job(Resource):
name='my_job', name='my_job',
function_id='123e4567-e89b-12d3-a456-426655440000', function_id='123e4567-e89b-12d3-a456-426655440000',
function_input={'data': 'hello, world'}, function_input={'data': 'hello, world'},
status='done',
pattern='* * * * *', pattern='* * * * *',
count=42, count=0,
first_execution_time='', first_execution_time='',
next_execution_time='', next_execution_time='',
project_id='default', project_id='default',

View File

@ -80,9 +80,11 @@ def set_ctx(new_ctx):
class Context(oslo_context.RequestContext): class Context(oslo_context.RequestContext):
_session = None def __init__(self, is_trust_scoped=False, trust_id=None, is_admin=False,
**kwargs):
self.is_trust_scoped = is_trust_scoped
self.trust_id = trust_id
def __init__(self, is_admin=False, **kwargs):
super(Context, self).__init__(is_admin=is_admin, **kwargs) super(Context, self).__init__(is_admin=is_admin, **kwargs)
@property @property
@ -92,6 +94,40 @@ class Context(oslo_context.RequestContext):
else: else:
return DEFAULT_PROJECT_ID return DEFAULT_PROJECT_ID
def convert_to_dict(self):
"""Return a dictionary of context attributes.
Use get_logging_values() instead of to_dict() from parent class to get
more information from the context. This method is not named "to_dict"
to avoid recursive calling.
"""
ctx_dict = self.get_logging_values()
ctx_dict.update(
{
'is_trust_scoped': self.is_trust_scoped,
'trust_id': self.trust_id,
}
)
return ctx_dict
@classmethod
def from_dict(cls, values, **kwargs):
"""Construct a context object from a provided dictionary."""
kwargs.setdefault(
'is_trust_scoped', values.get('is_trust_scoped', False)
)
kwargs.setdefault('trust_id', values.get('trust_id'))
return super(Context, cls).from_dict(values, **kwargs)
@classmethod
def from_environ(cls, env):
context = super(Context, cls).from_environ(env)
context.is_admin = True if 'admin' in context.roles else False
return context
class ContextHook(hooks.PecanHook): class ContextHook(hooks.PecanHook):
def before(self, state): def before(self, state):

View File

@ -59,6 +59,10 @@ def delete_all():
delete_runtimes(insecure=True) delete_runtimes(insecure=True)
def conditional_update(model, values, expected_values, **kwargs):
return IMPL.conditional_update(model, values, expected_values, **kwargs)
def get_function(id): def get_function(id):
return IMPL.get_function(id) return IMPL.get_function(id)
@ -155,5 +159,13 @@ def get_job(id):
return IMPL.get_job(id) return IMPL.get_job(id)
def get_next_jobs(before):
return IMPL.get_next_jobs(before)
def delete_job(id): def delete_job(id):
return IMPL.delete_job(id) return IMPL.delete_job(id)
def get_jobs():
return IMPL.get_jobs()

View File

@ -28,6 +28,7 @@ from qinling.db.sqlalchemy import filters as db_filters
from qinling.db.sqlalchemy import model_base from qinling.db.sqlalchemy import model_base
from qinling.db.sqlalchemy import models from qinling.db.sqlalchemy import models
from qinling import exceptions as exc from qinling import exceptions as exc
from qinling import status
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -190,6 +191,21 @@ def _delete_all(model, insecure=False, **kwargs):
query.filter_by(**kwargs).delete(synchronize_session=False) query.filter_by(**kwargs).delete(synchronize_session=False)
@db_base.session_aware()
def conditional_update(model, values, expected_values, insecure=False,
filters={}, session=None):
"""Compare-and-swap conditional update SQLAlchemy implementation."""
filters.update(expected_values)
query = (db_base.model_query(model) if insecure else _secure_query(model))
query = db_filters.apply_filters(query, model, **filters)
update_args = {'synchronize_session': False}
# Return True if we were able to change any DB entry, False otherwise
result = query.update(values, **update_args)
return 0 != result
@db_base.session_aware() @db_base.session_aware()
def get_function(id, session=None): def get_function(id, session=None):
function = _get_db_object_by_id(models.Function, id) function = _get_db_object_by_id(models.Function, id)
@ -394,6 +410,24 @@ def get_job(id, session=None):
@db_base.session_aware() @db_base.session_aware()
def delete_job(id, session=None): def delete_job(id, session=None):
job = get_job(id) get_job(id)
session.delete(job) # Delete the job by ID and get the affected row count.
table = models.Job.__table__
result = session.execute(table.delete().where(table.c.id == id))
return result.rowcount
@db_base.session_aware()
def get_next_jobs(before, session=None):
return _get_collection(
models.Job, insecure=True, sort_keys=['next_execution_time'],
sort_dirs=['asc'], next_execution_time={'lt': before},
status={'neq': status.DONE}
)
@db_base.session_aware()
def get_jobs(session=None, **kwargs):
return _get_collection_sorted_by_time(models.Job, **kwargs)

View File

@ -100,6 +100,7 @@ def upgrade():
sa.Column('project_id', sa.String(length=80), nullable=False), sa.Column('project_id', sa.String(length=80), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False), sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('status', sa.String(length=32), nullable=False), sa.Column('status', sa.String(length=32), nullable=False),
sa.Column('sync', sa.BOOLEAN, nullable=False), sa.Column('sync', sa.BOOLEAN, nullable=False),
sa.Column('input', st.JsonLongDictType(), nullable=True), sa.Column('input', st.JsonLongDictType(), nullable=True),
@ -116,6 +117,7 @@ def upgrade():
sa.Column('id', sa.String(length=36), nullable=False), sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('function_input', st.JsonLongDictType(), nullable=True), sa.Column('function_input', st.JsonLongDictType(), nullable=True),
sa.Column('status', sa.String(length=32), nullable=False),
sa.Column('name', sa.String(length=255), nullable=True), sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('pattern', sa.String(length=32), nullable=False), sa.Column('pattern', sa.String(length=32), nullable=False),
sa.Column('first_execution_time', sa.DateTime(), nullable=True), sa.Column('first_execution_time', sa.DateTime(), nullable=True),

View File

@ -73,6 +73,7 @@ class Execution(model_base.QinlingSecureModelBase):
sync = sa.Column(sa.BOOLEAN, default=True) sync = sa.Column(sa.BOOLEAN, default=True)
input = sa.Column(st.JsonLongDictType()) input = sa.Column(st.JsonLongDictType())
output = sa.Column(st.JsonLongDictType()) output = sa.Column(st.JsonLongDictType())
description = sa.Column(sa.String(255))
class Job(model_base.QinlingSecureModelBase): class Job(model_base.QinlingSecureModelBase):
@ -85,6 +86,7 @@ class Job(model_base.QinlingSecureModelBase):
# Set default to 'never'. # Set default to 'never'.
default='0 0 30 2 0' default='0 0 30 2 0'
) )
status = sa.Column(sa.String(32), nullable=False)
first_execution_time = sa.Column(sa.DateTime, nullable=True) first_execution_time = sa.Column(sa.DateTime, nullable=True)
next_execution_time = sa.Column(sa.DateTime, nullable=False) next_execution_time = sa.Column(sa.DateTime, nullable=False)
count = sa.Column(sa.Integer) count = sa.Column(sa.Integer)

View File

@ -40,7 +40,7 @@ class EngineService(service.Service):
db_api.setup_db() db_api.setup_db()
LOG.info('Starting periodic tasks...') LOG.info('Starting periodic tasks...')
periodics.start(orchestrator) periodics.start_function_mapping_handler(orchestrator)
topic = CONF.engine.topic topic = CONF.engine.topic
server = CONF.engine.host server = CONF.engine.host

View File

@ -101,7 +101,7 @@ class ContextSerializer(messaging.Serializer):
return self._base.deserialize_entity(context, entity) return self._base.deserialize_entity(context, entity)
def serialize_context(self, context): def serialize_context(self, context):
return context.to_dict() return context.convert_to_dict()
def deserialize_context(self, context): def deserialize_context(self, context):
qinling_ctx = ctx.Context.from_dict(context) qinling_ctx = ctx.Context.from_dict(context)

View File

@ -18,14 +18,21 @@ from datetime import timedelta
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_service import threadgroup from oslo_service import threadgroup
from oslo_utils import timeutils
from qinling import context from qinling import context
from qinling.db import api as db_api from qinling.db import api as db_api
from qinling.db.sqlalchemy import models
from qinling import rpc from qinling import rpc
from qinling import status
from qinling.utils import constants
from qinling.utils import executions
from qinling.utils import jobs
from qinling.utils.openstack import keystone as keystone_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
_THREAD_GROUP = None _periodic_tasks = {}
def handle_function_service_expiration(ctx, engine_client, orchestrator): def handle_function_service_expiration(ctx, engine_client, orchestrator):
@ -60,23 +67,119 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator):
db_api.delete_function_service_mapping(m.function_id) db_api.delete_function_service_mapping(m.function_id)
def start(orchestrator): def handle_job(engine_client):
global _THREAD_GROUP for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)):
_THREAD_GROUP = threadgroup.ThreadGroup() LOG.debug("Processing job: %s, function: %s", job.id, job.function_id)
try:
# Setup context before schedule job.
ctx = keystone_utils.create_trust_context(
job.trust_id, job.project_id
)
context.set_ctx(ctx)
if (job.count is not None and job.count > 0):
job.count -= 1
# Job delete/update is done using UPDATE ... FROM ... WHERE
# non-locking clause.
if job.count == 0:
modified = db_api.conditional_update(
models.Job,
{
'status': status.DONE,
'trust_id': '',
'count': 0
},
{
'id': job.id,
'status': status.RUNNING
},
insecure=True,
)
else:
next_time = jobs.get_next_execution_time(
job.pattern,
job.next_execution_time
)
modified = db_api.conditional_update(
models.Job,
{
'next_execution_time': next_time,
'count': job.count
},
{
'id': job.id,
'next_execution_time': job.next_execution_time
},
insecure=True,
)
if not modified:
LOG.warning(
'Job %s has been already handled by another periodic '
'task.', job.id
)
continue
LOG.debug(
"Starting to execute function %s by job %s",
job.function_id, job.id
)
exe_param = {
'function_id': job.function_id,
'input': job.function_input,
'sync': False,
'description': constants.EXECUTION_BY_JOB % job.id
}
executions.create_execution(engine_client, exe_param)
except Exception:
LOG.exception("Failed to process job %s", job.id)
finally:
context.set_ctx(None)
def start_function_mapping_handler(orchestrator):
tg = threadgroup.ThreadGroup(1)
engine_client = rpc.get_engine_client() engine_client = rpc.get_engine_client()
_THREAD_GROUP.add_timer( tg.add_timer(
300, 300,
handle_function_service_expiration, handle_function_service_expiration,
ctx=context.Context(), ctx=context.Context(),
engine_client=engine_client, engine_client=engine_client,
orchestrator=orchestrator orchestrator=orchestrator
) )
_periodic_tasks[constants.PERIODIC_FUNC_MAPPING_HANDLER] = tg
LOG.info('Function mapping handler started.')
def stop(): def start_job_handler():
global _THREAD_GROUP tg = threadgroup.ThreadGroup(1)
engine_client = rpc.get_engine_client()
if _THREAD_GROUP: tg.add_timer(
_THREAD_GROUP.stop() 3,
handle_job,
engine_client=engine_client,
)
_periodic_tasks[constants.PERIODIC_JOB_HANDLER] = tg
LOG.info('Job handler started.')
def stop(task=None):
if not task:
for name, tg in _periodic_tasks.items():
LOG.info('Stopping periodic task: %s', name)
tg.stop()
del _periodic_tasks[name]
else:
tg = _periodic_tasks.get(task)
if tg:
LOG.info('Stopping periodic task: %s', task)
tg.stop()
del _periodic_tasks[task]

View File

@ -18,3 +18,4 @@ UPGRADING = 'upgrading'
ERROR = 'error' ERROR = 'error'
DELETING = 'deleting' DELETING = 'deleting'
RUNNING = 'running' RUNNING = 'running'
DONE = 'done'

View File

@ -200,6 +200,7 @@ class DbTestCase(BaseTest):
# 'auth_enable' is disabled by default, we create runtime for # 'auth_enable' is disabled by default, we create runtime for
# default tenant. # default tenant.
'project_id': DEFAULT_PROJECT_ID, 'project_id': DEFAULT_PROJECT_ID,
'status': status.RUNNING
} }
) )

View File

@ -0,0 +1,18 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
EXECUTION_BY_JOB = 'Created by Job %s'
PERIODIC_JOB_HANDLER = 'job_handler'
PERIODIC_FUNC_MAPPING_HANDLER = 'function_mapping_handler'

View File

@ -0,0 +1,70 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
import requests
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling import status
LOG = logging.getLogger(__name__)
def create_execution(engine_client, execution):
function_id = execution['function_id']
is_sync = execution.get('sync', True)
func_url = None
with db_api.transaction():
func_db = db_api.get_function(function_id)
# Increase function invoke count, the updated_at field will be also
# updated.
func_db.count = func_db.count + 1
try:
# Check if the service url is existing.
mapping_db = db_api.get_function_service_mapping(function_id)
LOG.info('Found Service url for function: %s', function_id)
func_url = '%s/execute' % mapping_db.service_url
LOG.info('Invoke function %s, url: %s', function_id, func_url)
except exc.DBEntityNotFoundError:
pass
if func_url:
r = requests.post(func_url, json=execution.get('input'))
execution.update(
{'status': 'success', 'output': {'result': r.json()}}
)
else:
runtime_id = func_db.runtime_id
runtime_db = db_api.get_runtime(runtime_id)
if runtime_db.status != status.AVAILABLE:
raise exc.RuntimeNotAvailableException(
'Runtime %s is not available.' % runtime_id
)
execution.update({'status': status.RUNNING})
db_model = db_api.create_execution(execution)
if not func_url:
engine_client.create_execution(
db_model.id, function_id, runtime_id,
input=execution.get('input'),
is_sync=is_sync
)
return db_model

91
qinling/utils/jobs.py Normal file
View File

@ -0,0 +1,91 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import croniter
from dateutil import parser
from oslo_utils import timeutils
import six
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling.utils.openstack import keystone as keystone_utils
def validate_job(params):
first_time = params.get('first_execution_time')
pattern = params.get('pattern')
count = params.get('count')
start_time = timeutils.utcnow()
if isinstance(first_time, six.string_types):
try:
# We need naive datetime object.
first_time = parser.parse(first_time, ignoretz=True)
except ValueError as e:
raise exc.InputException(e.message)
if not (first_time or pattern):
raise exc.InputException(
'Pattern or first_execution_time must be specified.'
)
if first_time:
# first_time is assumed to be UTC time.
valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60)
if valid_min_time > first_time:
raise exc.InputException(
'first_execution_time must be at least 1 minute in the '
'future.'
)
if not pattern and count and count > 1:
raise exc.InputException(
'Pattern must be provided if count is greater than 1.'
)
next_time = first_time
if not (pattern or count):
count = 1
if pattern:
try:
croniter.croniter(pattern)
except (ValueError, KeyError):
raise exc.InputException(
'The specified pattern is not valid: {}'.format(pattern)
)
if not first_time:
next_time = croniter.croniter(pattern, start_time).get_next(
datetime.datetime
)
return first_time, next_time, count
def delete_job(id, trust_id=None):
if not trust_id:
trust_id = db_api.get_job(id).trust_id
modified_count = db_api.delete_job(id)
if modified_count:
# Delete trust only together with deleting trigger.
keystone_utils.delete_trust(trust_id)
return 0 != modified_count
def get_next_execution_time(pattern, start_time):
return croniter.croniter(pattern, start_time).get_next(
datetime.datetime
)

View File

@ -30,7 +30,7 @@ def _get_user_keystone_session():
ctx = context.get_ctx() ctx = context.get_ctx()
auth = generic.Token( auth = generic.Token(
auth_url=CONF.keystone_authtoken.auth_url, auth_url=CONF.keystone_authtoken.auth_uri,
token=ctx.auth_token, token=ctx.auth_token,
) )
@ -47,9 +47,20 @@ def get_swiftclient():
@common.disable_ssl_warnings @common.disable_ssl_warnings
def get_keystone_client(): def get_keystone_client(use_session=True):
if use_session:
session = _get_user_keystone_session() session = _get_user_keystone_session()
keystone = ks_client.Client(session=session) keystone = ks_client.Client(session=session)
else:
ctx = context.get_ctx()
auth_url = CONF.keystone_authtoken.auth_uri
keystone = ks_client.Client(
user_id=ctx.user,
token=ctx.auth_token,
tenant_id=ctx.projectid,
auth_url=auth_url
)
keystone.management_url = auth_url
return keystone return keystone
@ -67,6 +78,20 @@ def _get_admin_user_id():
return client.user_id return client.user_id
@common.disable_ssl_warnings
def _get_trust_client(trust_id):
"""Get project keystone client using admin credential."""
client = ks_client.Client(
username=CONF.keystone_authtoken.username,
password=CONF.keystone_authtoken.password,
auth_url=CONF.keystone_authtoken.auth_uri,
trust_id=trust_id
)
client.management_url = CONF.keystone_authtoken.auth_uri
return client
@common.disable_ssl_warnings @common.disable_ssl_warnings
def create_trust(): def create_trust():
client = get_keystone_client() client = get_keystone_client()
@ -84,11 +109,37 @@ def create_trust():
@common.disable_ssl_warnings @common.disable_ssl_warnings
def delete_trust(trust_id): def delete_trust(trust_id):
"""Delete trust from keystone.
The trust can only be deleted by original user(trustor)
"""
if not trust_id: if not trust_id:
return return
client = get_keystone_client()
try: try:
client = get_keystone_client()
client.trusts.delete(trust_id) client.trusts.delete(trust_id)
except Exception as e: LOG.debug('Trust %s deleted.', trust_id)
LOG.warning("Failed to delete trust [id=%s]: %s" % (trust_id, e)) except Exception:
LOG.exception("Failed to delete trust [id=%s]: %s", trust_id)
def create_trust_context(trust_id, project_id):
"""Creates Qinling context on behalf of the project."""
if CONF.pecan.auth_enable:
client = _get_trust_client(trust_id)
return context.Context(
user=client.user_id,
tenant=project_id,
auth_token=client.auth_token,
is_trust_scoped=True,
trust_id=trust_id,
)
return context.Context(
user=None,
tenant=context.DEFAULT_PROJECT_ID,
auth_token=None,
is_admin=True
)