diff --git a/qinling/api/controllers/v1/execution.py b/qinling/api/controllers/v1/execution.py index dffaa0d7..7a923450 100644 --- a/qinling/api/controllers/v1/execution.py +++ b/qinling/api/controllers/v1/execution.py @@ -46,33 +46,39 @@ class ExecutionsController(rest.RestController): function_id = params['function_id'] is_sync = params.get('sync', True) + func_url = None - # Check if the service url is existing. - try: - mapping = db_api.get_function_service_mapping(function_id) - LOG.debug('Found Service url for function: %s', function_id) + with db_api.transaction(): + func_db = db_api.get_function(function_id) - func_url = '%s/execute' % mapping.service_url - LOG.info('Invoke function %s, url: %s', function_id, func_url) + # Increase function invoke count, the updated_at field will be also + # updated. + func_db.count = func_db.count + 1 + + try: + # Check if the service url is existing. + mapping_db = db_api.get_function_service_mapping(function_id) + LOG.info('Found Service url for function: %s', function_id) + + func_url = '%s/execute' % mapping_db.service_url + LOG.info('Invoke function %s, url: %s', function_id, func_url) + except exc.DBEntityNotFoundError: + pass + + if func_url: + r = requests.post(func_url, data=params.get('input')) + params.update( + {'status': 'success', 'output': {'result': r.json()}} + ) + else: + runtime_id = func_db.runtime_id + params.update({'status': 'running'}) - r = requests.post(func_url, data=params.get('input')) - params.update( - {'status': 'success', 'output': {'result': r.json()}} - ) db_model = db_api.create_execution(params) - return resources.Execution.from_dict(db_model.to_dict()) - except exc.DBEntityNotFoundError: - pass - - func = db_api.get_function(function_id) - runtime_id = func.runtime_id - params.update({'status': 'running'}) - - db_model = db_api.create_execution(params) - self.engine_client.create_execution( - db_model.id, function_id, runtime_id, input=params.get('input'), + db_model.id, function_id, runtime_id, + input=params.get('input'), is_sync=is_sync ) diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index ae23487a..2ab5abf1 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -177,7 +177,9 @@ class FunctionsController(rest.RestController): if source == 'package': self.storage_provider.delete(context.get_ctx().projectid, id) if source == 'image': - self.engine_client.delete_function(id, func_db.name) + # If it's image function, need to delete all resources created + # by orchestrator asynchronously. + self.engine_client.delete_function(id) # This will also delete function service mapping as well. db_api.delete_function(id) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 83171a57..2ba30221 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -169,6 +169,7 @@ class Function(Resource): runtime_id = types.uuid code = types.jsontype entry = wtypes.text + count = int created_at = wtypes.text updated_at = wtypes.text @@ -183,6 +184,7 @@ class Function(Resource): runtime_id='123e4567-e89b-12d3-a456-426655440001', code={'zip': True}, entry='main', + count=10, created_at='1970-01-01T00:00:00.000000', updated_at='1970-01-01T00:00:00.000000' ) diff --git a/qinling/config.py b/qinling/config.py index b69efa86..367e9bc0 100644 --- a/qinling/config.py +++ b/qinling/config.py @@ -91,6 +91,11 @@ engine_opts = [ choices=['kubernetes', 'swarm'], help='The container orchestrator.' ), + cfg.IntOpt( + 'function_service_expiration', + default=300, + help='Maximum service time for function in orchestrator.' + ) ] STORAGE_GROUP = 'storage' diff --git a/qinling/db/api.py b/qinling/db/api.py index cf7d009c..af02fb66 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -137,3 +137,11 @@ def create_function_service_mapping(values): def get_function_service_mapping(function_id): return IMPL.get_function_service_mapping(function_id) + + +def get_function_service_mappings(**kwargs): + return IMPL.get_function_service_mappings(**kwargs) + + +def delete_function_service_mapping(id): + return IMPL.delete_function_service_mapping(id) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index 87cb6773..580f1e75 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -143,7 +143,7 @@ def _get_collection(model, insecure=False, limit=None, marker=None, if fields else () ) - query = (db_base.model_query(model, *columns) if insecure + query = (db_base.model_query(model, columns) if insecure else _secure_query(model, *columns)) query = db_filters.apply_filters(query, model, **filters) @@ -325,3 +325,17 @@ def get_function_service_mapping(function_id, session=None): ) return mapping + + +@db_base.session_aware() +def get_function_service_mappings(session=None, **kwargs): + return _get_collection_sorted_by_time( + models.FunctionServiceMapping, **kwargs + ) + + +@db_base.session_aware() +def delete_function_service_mapping(id, session=None): + mapping = get_function_service_mapping(id) + + session.delete(mapping) diff --git a/qinling/db/sqlalchemy/filters.py b/qinling/db/sqlalchemy/filters.py index 84faa920..029c29fd 100644 --- a/qinling/db/sqlalchemy/filters.py +++ b/qinling/db/sqlalchemy/filters.py @@ -12,8 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import sqlalchemy as sa - def apply_filters(query, model, **filters): filter_dict = {} @@ -45,22 +43,6 @@ def apply_filters(query, model, **filters): else: filter_dict[key] = value - # We need to handle tag case seprately. As tag datatype is MutableList. - # TODO(hparekh): Need to think how can we get rid of this. - tags = filters.pop('tags', None) - - # To match the tag list, a resource must contain at least all of the - # tags present in the filter parameter. - if tags: - tag_attr = getattr(model, 'tags') - - if not isinstance(tags, list): - expr = tag_attr.contains(tags) - else: - expr = sa.and_(*[tag_attr.contains(tag) for tag in tags]) - - query = query.filter(expr) - if filter_dict: query = query.filter_by(**filter_dict) diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py index 8b394cd1..fc7225c9 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/001_pike.py @@ -57,6 +57,7 @@ def upgrade(): sa.Column('timeout', sa.Integer, nullable=True), sa.Column('code', st.JsonLongDictType(), nullable=False), sa.Column('entry', sa.String(length=80), nullable=False), + sa.Column('count', sa.Integer, nullable=False), sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('name', 'project_id'), info={"check_ifexists": True} @@ -64,11 +65,12 @@ def upgrade(): op.create_table( 'function_service_mapping', + sa.Column('id', sa.String(length=36), nullable=False), sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('service_url', sa.String(length=255), nullable=False), - sa.PrimaryKeyConstraint('function_id'), + sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('function_id', 'service_url'), sa.ForeignKeyConstraint( ['function_id'], [u'function.id'], ondelete='CASCADE' diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index e6010533..5d700d88 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -32,6 +32,7 @@ class Function(model_base.QinlingSecureModelBase): timeout = sa.Column(sa.Integer) code = sa.Column(st.JsonLongDictType(), nullable=False) entry = sa.Column(sa.String(80), nullable=False) + count = sa.Column(sa.Integer, default=0) class FunctionServiceMapping(model_base.QinlingModelBase): @@ -41,10 +42,10 @@ class FunctionServiceMapping(model_base.QinlingModelBase): sa.UniqueConstraint('function_id', 'service_url'), ) + id = model_base.id_column() function_id = sa.Column( sa.String(36), sa.ForeignKey(Function.id, ondelete='CASCADE'), - primary_key=True, ) service_url = sa.Column(sa.String(255), nullable=False) diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index cb3d8d3e..2751ce9d 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -127,10 +127,9 @@ class DefaultEngine(object): } db_api.create_function_service_mapping(mapping) - def delete_function(self, ctx, function_id, function_name): + def delete_function(self, ctx, function_id): LOG.info('Start to delete function, id=%s', function_id) - labels = { - 'function_name': function_name, 'function_id': function_id - } - self.orchestrator.delete_function(labels) + labels = {'function_id': function_id} + + self.orchestrator.delete_function(function_id, labels=labels) diff --git a/qinling/engine/service.py b/qinling/engine/service.py index f0d6c98d..b3ece4c5 100644 --- a/qinling/engine/service.py +++ b/qinling/engine/service.py @@ -15,12 +15,14 @@ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging +from oslo_messaging.rpc import dispatcher from oslo_service import service from qinling.db import api as db_api from qinling.engine import default_engine as engine from qinling.orchestrator import base as orchestra_base from qinling import rpc +from qinling.services import periodics LOG = logging.getLogger(__name__) CONF = cfg.CONF @@ -35,34 +37,41 @@ class EngineService(service.Service): def start(self): orchestrator = orchestra_base.load_orchestrator(CONF) + db_api.setup_db() + + LOG.info('Starting periodic tasks...') + periodics.start(orchestrator) + topic = CONF.engine.topic server = CONF.engine.host transport = messaging.get_transport(CONF) target = messaging.Target(topic=topic, server=server, fanout=False) endpoints = [engine.DefaultEngine(orchestrator)] + access_policy = dispatcher.DefaultRPCAccessPolicy self.server = messaging.get_rpc_server( transport, target, endpoints, executor='eventlet', + access_policy=access_policy, serializer=rpc.ContextSerializer( messaging.serializer.JsonPayloadSerializer()) ) - db_api.setup_db() - LOG.info('Starting engine...') self.server.start() super(EngineService, self).start() def stop(self, graceful=False): + periodics.stop() + if self.server: LOG.info('Stopping engine...') self.server.stop() if graceful: LOG.info( - 'Consumer successfully stopped. Waiting for final ' + 'Consumer successfully stopped. Waiting for final ' 'messages to be processed...' ) self.server.wait() diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index 9ff083d0..d5146e59 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -43,7 +43,7 @@ class OrchestratorBase(object): raise NotImplementedError @abc.abstractmethod - def delete_function(self, labels): + def delete_function(self, function_id, **kwargs): raise NotImplementedError diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 529323d1..1e58267f 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -157,7 +157,7 @@ class KubernetesManager(base.OrchestratorBase): return pod - def _prepare_pod(self, pod, deployment_name, function_id, service_labels): + def _prepare_pod(self, pod, deployment_name, function_id, labels): """Pod preparation. 1. Update pod labels. @@ -186,10 +186,11 @@ class KubernetesManager(base.OrchestratorBase): # Create service for the choosen pod. service_name = "service-%s" % function_id + labels.update({'function_id': function_id}) service_body = self.service_template.render( { "service_name": service_name, - "labels": service_labels, + "labels": labels, "selector": pod_labels } ) @@ -322,10 +323,24 @@ class KubernetesManager(base.OrchestratorBase): return {'result': output} - def delete_function(self, labels): + def delete_function(self, function_id, labels=[]): selector = common.convert_dict_to_string(labels) + ret = self.v1.list_namespaced_service( + self.conf.kubernetes.namespace, label_selector=selector + ) + names = [i.metadata.name for i in ret.items] + for svc_name in names: + self.v1.delete_namespaced_service( + svc_name, + self.conf.kubernetes.namespace, + ) + + LOG.info("Services for function %s deleted.", function_id) + self.v1.delete_collection_namespaced_pod( self.conf.kubernetes.namespace, label_selector=selector ) + + LOG.info("Pod for function %s deleted.", function_id) diff --git a/qinling/rpc.py b/qinling/rpc.py index 9df2a53c..54a72ecf 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -170,10 +170,9 @@ class EngineClient(object): ) @wrap_messaging_exception - def delete_function(self, id, name): + def delete_function(self, id): return self._client.prepare(topic=self.topic, server=None).cast( ctx.get_ctx(), 'delete_function', - function_id=id, - function_name=name + function_id=id ) diff --git a/qinling/services/__init__.py b/qinling/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py new file mode 100644 index 00000000..eb713fc3 --- /dev/null +++ b/qinling/services/periodics.py @@ -0,0 +1,82 @@ +# Copyright 2017 Catalyst IT Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +from datetime import timedelta + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_service import threadgroup + +from qinling import context +from qinling.db import api as db_api +from qinling import rpc + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF +_THREAD_GROUP = None + + +def handle_function_service_expiration(ctx, engine_client, orchestrator): + context.set_ctx(ctx) + + delta = timedelta(seconds=CONF.engine.function_service_expiration) + expiry_time = datetime.utcnow() - delta + + results = db_api.get_functions( + fields=['id'], + sort_keys=['updated_at'], + insecure=True, + updated_at={'lte': expiry_time} + ) + + expiry_ids = [ret.id for ret in results] + + if not expiry_ids: + return + + mappings = db_api.get_function_service_mappings( + function_id={'in': expiry_ids} + ) + + LOG.info('Found total expiry function mapping numbers: %s', len(mappings)) + + with db_api.transaction(): + for m in mappings: + LOG.info('Deleting service mapping for function %s', m.function_id) + + engine_client.delete_function(m.function_id) + db_api.delete_function_service_mapping(m.function_id) + + +def start(orchestrator): + global _THREAD_GROUP + _THREAD_GROUP = threadgroup.ThreadGroup() + + engine_client = rpc.get_engine_client() + + _THREAD_GROUP.add_timer( + 300, + handle_function_service_expiration, + ctx=context.Context(), + engine_client=engine_client, + orchestrator=orchestrator + ) + + +def stop(): + global _THREAD_GROUP + + if _THREAD_GROUP: + _THREAD_GROUP.stop()