diff --git a/qinling/api/app.py b/qinling/api/app.py index 4efe44e8..f5969f99 100644 --- a/qinling/api/app.py +++ b/qinling/api/app.py @@ -13,11 +13,15 @@ # limitations under the License. from oslo_config import cfg +from oslo_log import log as logging import pecan from qinling.api import access_control from qinling import context as ctx from qinling.db import api as db_api +from qinling.services import periodics + +LOG = logging.getLogger(__name__) def get_pecan_config(): @@ -39,10 +43,13 @@ def get_pecan_config(): def setup_app(config=None): if not config: config = get_pecan_config() - app_conf = dict(config.app) + db_api.setup_db() + LOG.info('Starting periodic tasks...') + periodics.start_job_handler() + app = pecan.make_app( app_conf.pop('root'), hooks=lambda: [ctx.ContextHook(), ctx.AuthHook()], diff --git a/qinling/api/controllers/v1/execution.py b/qinling/api/controllers/v1/execution.py index 92423f36..3f7dd879 100644 --- a/qinling/api/controllers/v1/execution.py +++ b/qinling/api/controllers/v1/execution.py @@ -14,15 +14,13 @@ from oslo_log import log as logging from pecan import rest -import requests import wsmeext.pecan as wsme_pecan from qinling.api.controllers.v1 import resources from qinling.api.controllers.v1 import types from qinling.db import api as db_api -from qinling import exceptions as exc from qinling import rpc -from qinling import status +from qinling.utils import executions from qinling.utils import rest_utils LOG = logging.getLogger(__name__) @@ -42,55 +40,10 @@ class ExecutionsController(rest.RestController): ) def post(self, execution): params = execution.to_dict() - LOG.info("Creating execution. [execution=%s]", params) - function_id = params['function_id'] - is_sync = params.get('sync', True) - func_url = None - - with db_api.transaction(): - func_db = db_api.get_function(function_id) - - # Increase function invoke count, the updated_at field will be also - # updated. - func_db.count = func_db.count + 1 - - try: - # Check if the service url is existing. - mapping_db = db_api.get_function_service_mapping(function_id) - LOG.info('Found Service url for function: %s', function_id) - - func_url = '%s/execute' % mapping_db.service_url - LOG.info('Invoke function %s, url: %s', function_id, func_url) - except exc.DBEntityNotFoundError: - pass - - if func_url: - r = requests.post(func_url, json=params.get('input')) - params.update( - {'status': 'success', 'output': {'result': r.json()}} - ) - else: - runtime_id = func_db.runtime_id - runtime_db = db_api.get_runtime(runtime_id) - if runtime_db.status != status.AVAILABLE: - raise exc.RuntimeNotAvailableException( - 'Runtime %s is not available.' % runtime_id - ) - - params.update({'status': status.RUNNING}) - - db_model = db_api.create_execution(params) - - if not func_url: - self.engine_client.create_execution( - db_model.id, function_id, runtime_id, - input=params.get('input'), - is_sync=is_sync - ) - - if is_sync: + db_model = executions.create_execution(self.engine_client, params) + if params.get('sync', True): db_model = db_api.get_execution(db_model.id) return resources.Execution.from_dict(db_model.to_dict()) diff --git a/qinling/api/controllers/v1/job.py b/qinling/api/controllers/v1/job.py index f054cc6f..eba32c40 100644 --- a/qinling/api/controllers/v1/job.py +++ b/qinling/api/controllers/v1/job.py @@ -12,21 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import croniter -import datetime - -from dateutil import parser from oslo_config import cfg from oslo_log import log as logging -from oslo_utils import timeutils from pecan import rest -import six import wsmeext.pecan as wsme_pecan from qinling.api.controllers.v1 import resources from qinling.api.controllers.v1 import types from qinling.db import api as db_api from qinling import exceptions as exc +from qinling import status +from qinling.utils import jobs from qinling.utils.openstack import keystone as keystone_util from qinling.utils import rest_utils @@ -51,52 +47,7 @@ class JobsController(rest.RestController): 'Required param is missing. Required: %s' % POST_REQUIRED ) - first_time = params.get('first_execution_time') - pattern = params.get('pattern') - count = params.get('count') - start_time = timeutils.utcnow() - - if isinstance(first_time, six.string_types): - try: - # We need naive datetime object. - first_time = parser.parse(first_time, ignoretz=True) - except ValueError as e: - raise exc.InputException(e.message) - - if not (first_time or pattern): - raise exc.InputException( - 'Pattern or first_execution_time must be specified.' - ) - - if first_time: - # first_time is assumed to be UTC time. - valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60) - if valid_min_time > first_time: - raise exc.InputException( - 'first_execution_time must be at least 1 minute in the ' - 'future.' - ) - if not pattern and count and count > 1: - raise exc.InputException( - 'Pattern must be provided if count is greater than 1.' - ) - - next_time = first_time - if not (pattern or count): - count = 1 - if pattern: - try: - croniter.croniter(pattern) - except (ValueError, KeyError): - raise exc.InputException( - 'The specified pattern is not valid: {}'.format(pattern) - ) - - if not first_time: - next_time = croniter.croniter(pattern, start_time).get_next( - datetime.datetime - ) - + first_time, next_time, count = jobs.validate_job(params) LOG.info("Creating job. [job=%s]", params) with db_api.transaction(): @@ -104,12 +55,13 @@ class JobsController(rest.RestController): values = { 'name': params.get('name'), - 'pattern': pattern, + 'pattern': params.get('pattern'), 'first_execution_time': first_time, 'next_execution_time': next_time, 'count': count, 'function_id': params['function_id'], - 'function_input': params.get('function_input') or {} + 'function_input': params.get('function_input') or {}, + 'status': status.RUNNING } if cfg.CONF.pecan.auth_enable: @@ -129,9 +81,22 @@ class JobsController(rest.RestController): def delete(self, id): """Delete job.""" LOG.info("Delete job [id=%s]" % id) + jobs.delete_job(id) - job = db_api.get_job(id) - trust_id = job.trust_id + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Job, types.uuid) + def get(self, id): + LOG.info("Fetch job [id=%s]", id) + job_db = db_api.get_job(id) - keystone_util.delete_trust(trust_id) - db_api.delete_job(id) + return resources.Job.from_dict(job_db.to_dict()) + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(resources.Jobs) + def get_all(self): + LOG.info("Get all jobs.") + + jobs = [resources.Job.from_dict(db_model.to_dict()) + for db_model in db_api.get_jobs()] + + return resources.Jobs(jobs=jobs) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 71903463..b075c5b9 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -261,6 +261,7 @@ class Runtimes(ResourceList): class Execution(Resource): id = types.uuid function_id = wsme.wsattr(types.uuid, mandatory=True) + description = wtypes.text status = wsme.wsattr(wtypes.text, readonly=True) sync = bool input = types.jsontype @@ -274,6 +275,7 @@ class Execution(Resource): return cls( id='123e4567-e89b-12d3-a456-426655440000', function_id='123e4567-e89b-12d3-a456-426655440000', + description='this is the first execution.', status='success', sync=True, input={'data': 'hello, world'}, @@ -310,8 +312,9 @@ class Job(Resource): name = wtypes.text function_id = types.uuid function_input = types.jsontype + status = wsme.wsattr(wtypes.text, readonly=True) pattern = wtypes.text - count = wtypes.IntegerType(minimum=1) + count = int first_execution_time = wtypes.text next_execution_time = wtypes.text project_id = wsme.wsattr(wtypes.text, readonly=True) @@ -325,8 +328,9 @@ class Job(Resource): name='my_job', function_id='123e4567-e89b-12d3-a456-426655440000', function_input={'data': 'hello, world'}, + status='done', pattern='* * * * *', - count=42, + count=0, first_execution_time='', next_execution_time='', project_id='default', diff --git a/qinling/context.py b/qinling/context.py index ab8e223a..0bcacd98 100644 --- a/qinling/context.py +++ b/qinling/context.py @@ -80,9 +80,11 @@ def set_ctx(new_ctx): class Context(oslo_context.RequestContext): - _session = None + def __init__(self, is_trust_scoped=False, trust_id=None, is_admin=False, + **kwargs): + self.is_trust_scoped = is_trust_scoped + self.trust_id = trust_id - def __init__(self, is_admin=False, **kwargs): super(Context, self).__init__(is_admin=is_admin, **kwargs) @property @@ -92,6 +94,40 @@ class Context(oslo_context.RequestContext): else: return DEFAULT_PROJECT_ID + def convert_to_dict(self): + """Return a dictionary of context attributes. + + Use get_logging_values() instead of to_dict() from parent class to get + more information from the context. This method is not named "to_dict" + to avoid recursive calling. + """ + ctx_dict = self.get_logging_values() + ctx_dict.update( + { + 'is_trust_scoped': self.is_trust_scoped, + 'trust_id': self.trust_id, + } + ) + + return ctx_dict + + @classmethod + def from_dict(cls, values, **kwargs): + """Construct a context object from a provided dictionary.""" + kwargs.setdefault( + 'is_trust_scoped', values.get('is_trust_scoped', False) + ) + kwargs.setdefault('trust_id', values.get('trust_id')) + + return super(Context, cls).from_dict(values, **kwargs) + + @classmethod + def from_environ(cls, env): + context = super(Context, cls).from_environ(env) + context.is_admin = True if 'admin' in context.roles else False + + return context + class ContextHook(hooks.PecanHook): def before(self, state): diff --git a/qinling/db/api.py b/qinling/db/api.py index 2ab2643d..3ab551ce 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -59,6 +59,10 @@ def delete_all(): delete_runtimes(insecure=True) +def conditional_update(model, values, expected_values, **kwargs): + return IMPL.conditional_update(model, values, expected_values, **kwargs) + + def get_function(id): return IMPL.get_function(id) @@ -155,5 +159,13 @@ def get_job(id): return IMPL.get_job(id) +def get_next_jobs(before): + return IMPL.get_next_jobs(before) + + def delete_job(id): return IMPL.delete_job(id) + + +def get_jobs(): + return IMPL.get_jobs() diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 2df0dbad..6a45c64c 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -28,6 +28,7 @@ from qinling.db.sqlalchemy import filters as db_filters from qinling.db.sqlalchemy import model_base from qinling.db.sqlalchemy import models from qinling import exceptions as exc +from qinling import status CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -190,6 +191,21 @@ def _delete_all(model, insecure=False, **kwargs): query.filter_by(**kwargs).delete(synchronize_session=False) +@db_base.session_aware() +def conditional_update(model, values, expected_values, insecure=False, + filters={}, session=None): + """Compare-and-swap conditional update SQLAlchemy implementation.""" + filters.update(expected_values) + query = (db_base.model_query(model) if insecure else _secure_query(model)) + query = db_filters.apply_filters(query, model, **filters) + update_args = {'synchronize_session': False} + + # Return True if we were able to change any DB entry, False otherwise + result = query.update(values, **update_args) + + return 0 != result + + @db_base.session_aware() def get_function(id, session=None): function = _get_db_object_by_id(models.Function, id) @@ -394,6 +410,24 @@ def get_job(id, session=None): @db_base.session_aware() def delete_job(id, session=None): - job = get_job(id) + get_job(id) - session.delete(job) + # Delete the job by ID and get the affected row count. + table = models.Job.__table__ + result = session.execute(table.delete().where(table.c.id == id)) + + return result.rowcount + + +@db_base.session_aware() +def get_next_jobs(before, session=None): + return _get_collection( + models.Job, insecure=True, sort_keys=['next_execution_time'], + sort_dirs=['asc'], next_execution_time={'lt': before}, + status={'neq': status.DONE} + ) + + +@db_base.session_aware() +def get_jobs(session=None, **kwargs): + return _get_collection_sorted_by_time(models.Job, **kwargs) diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 439cba8d..f905cbb8 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -100,6 +100,7 @@ def upgrade(): sa.Column('project_id', sa.String(length=80), nullable=False), sa.Column('id', sa.String(length=36), nullable=False), sa.Column('function_id', sa.String(length=36), nullable=False), + sa.Column('description', sa.String(length=255), nullable=True), sa.Column('status', sa.String(length=32), nullable=False), sa.Column('sync', sa.BOOLEAN, nullable=False), sa.Column('input', st.JsonLongDictType(), nullable=True), @@ -116,6 +117,7 @@ def upgrade(): sa.Column('id', sa.String(length=36), nullable=False), sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('function_input', st.JsonLongDictType(), nullable=True), + sa.Column('status', sa.String(length=32), nullable=False), sa.Column('name', sa.String(length=255), nullable=True), sa.Column('pattern', sa.String(length=32), nullable=False), sa.Column('first_execution_time', sa.DateTime(), nullable=True), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index f419815e..1c8e66f6 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -73,6 +73,7 @@ class Execution(model_base.QinlingSecureModelBase): sync = sa.Column(sa.BOOLEAN, default=True) input = sa.Column(st.JsonLongDictType()) output = sa.Column(st.JsonLongDictType()) + description = sa.Column(sa.String(255)) class Job(model_base.QinlingSecureModelBase): @@ -85,6 +86,7 @@ class Job(model_base.QinlingSecureModelBase): # Set default to 'never'. default='0 0 30 2 0' ) + status = sa.Column(sa.String(32), nullable=False) first_execution_time = sa.Column(sa.DateTime, nullable=True) next_execution_time = sa.Column(sa.DateTime, nullable=False) count = sa.Column(sa.Integer) diff --git a/qinling/engine/service.py b/qinling/engine/service.py index b3ece4c5..f3b4217a 100644 --- a/qinling/engine/service.py +++ b/qinling/engine/service.py @@ -40,7 +40,7 @@ class EngineService(service.Service): db_api.setup_db() LOG.info('Starting periodic tasks...') - periodics.start(orchestrator) + periodics.start_function_mapping_handler(orchestrator) topic = CONF.engine.topic server = CONF.engine.host diff --git a/qinling/rpc.py b/qinling/rpc.py index 4d7dbcec..e8835450 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -101,7 +101,7 @@ class ContextSerializer(messaging.Serializer): return self._base.deserialize_entity(context, entity) def serialize_context(self, context): - return context.to_dict() + return context.convert_to_dict() def deserialize_context(self, context): qinling_ctx = ctx.Context.from_dict(context) diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index 1353d22c..3bb11f28 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -18,14 +18,21 @@ from datetime import timedelta from oslo_config import cfg from oslo_log import log as logging from oslo_service import threadgroup +from oslo_utils import timeutils from qinling import context from qinling.db import api as db_api +from qinling.db.sqlalchemy import models from qinling import rpc +from qinling import status +from qinling.utils import constants +from qinling.utils import executions +from qinling.utils import jobs +from qinling.utils.openstack import keystone as keystone_utils LOG = logging.getLogger(__name__) CONF = cfg.CONF -_THREAD_GROUP = None +_periodic_tasks = {} def handle_function_service_expiration(ctx, engine_client, orchestrator): @@ -60,23 +67,119 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator): db_api.delete_function_service_mapping(m.function_id) -def start(orchestrator): - global _THREAD_GROUP - _THREAD_GROUP = threadgroup.ThreadGroup() +def handle_job(engine_client): + for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)): + LOG.debug("Processing job: %s, function: %s", job.id, job.function_id) + try: + # Setup context before schedule job. + ctx = keystone_utils.create_trust_context( + job.trust_id, job.project_id + ) + context.set_ctx(ctx) + + if (job.count is not None and job.count > 0): + job.count -= 1 + + # Job delete/update is done using UPDATE ... FROM ... WHERE + # non-locking clause. + if job.count == 0: + modified = db_api.conditional_update( + models.Job, + { + 'status': status.DONE, + 'trust_id': '', + 'count': 0 + }, + { + 'id': job.id, + 'status': status.RUNNING + }, + insecure=True, + ) + else: + next_time = jobs.get_next_execution_time( + job.pattern, + job.next_execution_time + ) + + modified = db_api.conditional_update( + models.Job, + { + 'next_execution_time': next_time, + 'count': job.count + }, + { + 'id': job.id, + 'next_execution_time': job.next_execution_time + }, + insecure=True, + ) + + if not modified: + LOG.warning( + 'Job %s has been already handled by another periodic ' + 'task.', job.id + ) + continue + + LOG.debug( + "Starting to execute function %s by job %s", + job.function_id, job.id + ) + + exe_param = { + 'function_id': job.function_id, + 'input': job.function_input, + 'sync': False, + 'description': constants.EXECUTION_BY_JOB % job.id + } + executions.create_execution(engine_client, exe_param) + except Exception: + LOG.exception("Failed to process job %s", job.id) + finally: + context.set_ctx(None) + + +def start_function_mapping_handler(orchestrator): + tg = threadgroup.ThreadGroup(1) engine_client = rpc.get_engine_client() - _THREAD_GROUP.add_timer( + tg.add_timer( 300, handle_function_service_expiration, ctx=context.Context(), engine_client=engine_client, orchestrator=orchestrator ) + _periodic_tasks[constants.PERIODIC_FUNC_MAPPING_HANDLER] = tg + + LOG.info('Function mapping handler started.') -def stop(): - global _THREAD_GROUP +def start_job_handler(): + tg = threadgroup.ThreadGroup(1) + engine_client = rpc.get_engine_client() - if _THREAD_GROUP: - _THREAD_GROUP.stop() + tg.add_timer( + 3, + handle_job, + engine_client=engine_client, + ) + _periodic_tasks[constants.PERIODIC_JOB_HANDLER] = tg + + LOG.info('Job handler started.') + + +def stop(task=None): + if not task: + for name, tg in _periodic_tasks.items(): + LOG.info('Stopping periodic task: %s', name) + tg.stop() + del _periodic_tasks[name] + else: + tg = _periodic_tasks.get(task) + if tg: + LOG.info('Stopping periodic task: %s', task) + tg.stop() + del _periodic_tasks[task] diff --git a/qinling/status.py b/qinling/status.py index 4dfac928..2c0f4876 100644 --- a/qinling/status.py +++ b/qinling/status.py @@ -18,3 +18,4 @@ UPGRADING = 'upgrading' ERROR = 'error' DELETING = 'deleting' RUNNING = 'running' +DONE = 'done' diff --git a/qinling/tests/unit/base.py b/qinling/tests/unit/base.py index 3a3f30c2..385516a7 100644 --- a/qinling/tests/unit/base.py +++ b/qinling/tests/unit/base.py @@ -200,6 +200,7 @@ class DbTestCase(BaseTest): # 'auth_enable' is disabled by default, we create runtime for # default tenant. 'project_id': DEFAULT_PROJECT_ID, + 'status': status.RUNNING } ) diff --git a/qinling/utils/constants.py b/qinling/utils/constants.py new file mode 100644 index 00000000..8255d6d1 --- /dev/null +++ b/qinling/utils/constants.py @@ -0,0 +1,18 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +EXECUTION_BY_JOB = 'Created by Job %s' + +PERIODIC_JOB_HANDLER = 'job_handler' +PERIODIC_FUNC_MAPPING_HANDLER = 'function_mapping_handler' diff --git a/qinling/utils/executions.py b/qinling/utils/executions.py new file mode 100644 index 00000000..01226caf --- /dev/null +++ b/qinling/utils/executions.py @@ -0,0 +1,70 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +import requests + +from qinling.db import api as db_api +from qinling import exceptions as exc +from qinling import status + +LOG = logging.getLogger(__name__) + + +def create_execution(engine_client, execution): + function_id = execution['function_id'] + is_sync = execution.get('sync', True) + func_url = None + + with db_api.transaction(): + func_db = db_api.get_function(function_id) + # Increase function invoke count, the updated_at field will be also + # updated. + func_db.count = func_db.count + 1 + + try: + # Check if the service url is existing. + mapping_db = db_api.get_function_service_mapping(function_id) + LOG.info('Found Service url for function: %s', function_id) + + func_url = '%s/execute' % mapping_db.service_url + LOG.info('Invoke function %s, url: %s', function_id, func_url) + except exc.DBEntityNotFoundError: + pass + + if func_url: + r = requests.post(func_url, json=execution.get('input')) + execution.update( + {'status': 'success', 'output': {'result': r.json()}} + ) + else: + runtime_id = func_db.runtime_id + runtime_db = db_api.get_runtime(runtime_id) + if runtime_db.status != status.AVAILABLE: + raise exc.RuntimeNotAvailableException( + 'Runtime %s is not available.' % runtime_id + ) + + execution.update({'status': status.RUNNING}) + + db_model = db_api.create_execution(execution) + + if not func_url: + engine_client.create_execution( + db_model.id, function_id, runtime_id, + input=execution.get('input'), + is_sync=is_sync + ) + + return db_model diff --git a/qinling/utils/jobs.py b/qinling/utils/jobs.py new file mode 100644 index 00000000..731a7214 --- /dev/null +++ b/qinling/utils/jobs.py @@ -0,0 +1,91 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import datetime + +import croniter +from dateutil import parser +from oslo_utils import timeutils +import six + +from qinling.db import api as db_api +from qinling import exceptions as exc +from qinling.utils.openstack import keystone as keystone_utils + + +def validate_job(params): + first_time = params.get('first_execution_time') + pattern = params.get('pattern') + count = params.get('count') + start_time = timeutils.utcnow() + + if isinstance(first_time, six.string_types): + try: + # We need naive datetime object. + first_time = parser.parse(first_time, ignoretz=True) + except ValueError as e: + raise exc.InputException(e.message) + + if not (first_time or pattern): + raise exc.InputException( + 'Pattern or first_execution_time must be specified.' + ) + + if first_time: + # first_time is assumed to be UTC time. + valid_min_time = timeutils.utcnow() + datetime.timedelta(0, 60) + if valid_min_time > first_time: + raise exc.InputException( + 'first_execution_time must be at least 1 minute in the ' + 'future.' + ) + if not pattern and count and count > 1: + raise exc.InputException( + 'Pattern must be provided if count is greater than 1.' + ) + + next_time = first_time + if not (pattern or count): + count = 1 + if pattern: + try: + croniter.croniter(pattern) + except (ValueError, KeyError): + raise exc.InputException( + 'The specified pattern is not valid: {}'.format(pattern) + ) + + if not first_time: + next_time = croniter.croniter(pattern, start_time).get_next( + datetime.datetime + ) + + return first_time, next_time, count + + +def delete_job(id, trust_id=None): + if not trust_id: + trust_id = db_api.get_job(id).trust_id + + modified_count = db_api.delete_job(id) + if modified_count: + # Delete trust only together with deleting trigger. + keystone_utils.delete_trust(trust_id) + + return 0 != modified_count + + +def get_next_execution_time(pattern, start_time): + return croniter.croniter(pattern, start_time).get_next( + datetime.datetime + ) diff --git a/qinling/utils/openstack/keystone.py b/qinling/utils/openstack/keystone.py index 4c091053..9eab9aa8 100644 --- a/qinling/utils/openstack/keystone.py +++ b/qinling/utils/openstack/keystone.py @@ -30,7 +30,7 @@ def _get_user_keystone_session(): ctx = context.get_ctx() auth = generic.Token( - auth_url=CONF.keystone_authtoken.auth_url, + auth_url=CONF.keystone_authtoken.auth_uri, token=ctx.auth_token, ) @@ -47,9 +47,20 @@ def get_swiftclient(): @common.disable_ssl_warnings -def get_keystone_client(): - session = _get_user_keystone_session() - keystone = ks_client.Client(session=session) +def get_keystone_client(use_session=True): + if use_session: + session = _get_user_keystone_session() + keystone = ks_client.Client(session=session) + else: + ctx = context.get_ctx() + auth_url = CONF.keystone_authtoken.auth_uri + keystone = ks_client.Client( + user_id=ctx.user, + token=ctx.auth_token, + tenant_id=ctx.projectid, + auth_url=auth_url + ) + keystone.management_url = auth_url return keystone @@ -67,6 +78,20 @@ def _get_admin_user_id(): return client.user_id +@common.disable_ssl_warnings +def _get_trust_client(trust_id): + """Get project keystone client using admin credential.""" + client = ks_client.Client( + username=CONF.keystone_authtoken.username, + password=CONF.keystone_authtoken.password, + auth_url=CONF.keystone_authtoken.auth_uri, + trust_id=trust_id + ) + client.management_url = CONF.keystone_authtoken.auth_uri + + return client + + @common.disable_ssl_warnings def create_trust(): client = get_keystone_client() @@ -84,11 +109,37 @@ def create_trust(): @common.disable_ssl_warnings def delete_trust(trust_id): + """Delete trust from keystone. + + The trust can only be deleted by original user(trustor) + """ if not trust_id: return - client = get_keystone_client() try: + client = get_keystone_client() client.trusts.delete(trust_id) - except Exception as e: - LOG.warning("Failed to delete trust [id=%s]: %s" % (trust_id, e)) + LOG.debug('Trust %s deleted.', trust_id) + except Exception: + LOG.exception("Failed to delete trust [id=%s]: %s", trust_id) + + +def create_trust_context(trust_id, project_id): + """Creates Qinling context on behalf of the project.""" + if CONF.pecan.auth_enable: + client = _get_trust_client(trust_id) + + return context.Context( + user=client.user_id, + tenant=project_id, + auth_token=client.auth_token, + is_trust_scoped=True, + trust_id=trust_id, + ) + + return context.Context( + user=None, + tenant=context.DEFAULT_PROJECT_ID, + auth_token=None, + is_admin=True + )