Add expiry handler for function service

When a package function is executed, a service url is cached so it's
faster when function is executed again. But this will lead to problem
if function is never executed for a long time.

This patch add an expiry machanism which will release resources in
orchestration platform.

Change-Id: I754ebe314d87f6d7c194d9b9c061316f7ba74245
This commit is contained in:
Lingxian Kong 2017-05-22 23:22:34 +12:00
parent d80f3acf35
commit 75efc2b1ad
16 changed files with 184 additions and 58 deletions

View File

@ -46,33 +46,39 @@ class ExecutionsController(rest.RestController):
function_id = params['function_id'] function_id = params['function_id']
is_sync = params.get('sync', True) is_sync = params.get('sync', True)
func_url = None
# Check if the service url is existing. with db_api.transaction():
try: func_db = db_api.get_function(function_id)
mapping = db_api.get_function_service_mapping(function_id)
LOG.debug('Found Service url for function: %s', function_id)
func_url = '%s/execute' % mapping.service_url # Increase function invoke count, the updated_at field will be also
LOG.info('Invoke function %s, url: %s', function_id, func_url) # updated.
func_db.count = func_db.count + 1
try:
# Check if the service url is existing.
mapping_db = db_api.get_function_service_mapping(function_id)
LOG.info('Found Service url for function: %s', function_id)
func_url = '%s/execute' % mapping_db.service_url
LOG.info('Invoke function %s, url: %s', function_id, func_url)
except exc.DBEntityNotFoundError:
pass
if func_url:
r = requests.post(func_url, data=params.get('input'))
params.update(
{'status': 'success', 'output': {'result': r.json()}}
)
else:
runtime_id = func_db.runtime_id
params.update({'status': 'running'})
r = requests.post(func_url, data=params.get('input'))
params.update(
{'status': 'success', 'output': {'result': r.json()}}
)
db_model = db_api.create_execution(params) db_model = db_api.create_execution(params)
return resources.Execution.from_dict(db_model.to_dict())
except exc.DBEntityNotFoundError:
pass
func = db_api.get_function(function_id)
runtime_id = func.runtime_id
params.update({'status': 'running'})
db_model = db_api.create_execution(params)
self.engine_client.create_execution( self.engine_client.create_execution(
db_model.id, function_id, runtime_id, input=params.get('input'), db_model.id, function_id, runtime_id,
input=params.get('input'),
is_sync=is_sync is_sync=is_sync
) )

View File

@ -177,7 +177,9 @@ class FunctionsController(rest.RestController):
if source == 'package': if source == 'package':
self.storage_provider.delete(context.get_ctx().projectid, id) self.storage_provider.delete(context.get_ctx().projectid, id)
if source == 'image': if source == 'image':
self.engine_client.delete_function(id, func_db.name) # If it's image function, need to delete all resources created
# by orchestrator asynchronously.
self.engine_client.delete_function(id)
# This will also delete function service mapping as well. # This will also delete function service mapping as well.
db_api.delete_function(id) db_api.delete_function(id)

View File

@ -169,6 +169,7 @@ class Function(Resource):
runtime_id = types.uuid runtime_id = types.uuid
code = types.jsontype code = types.jsontype
entry = wtypes.text entry = wtypes.text
count = int
created_at = wtypes.text created_at = wtypes.text
updated_at = wtypes.text updated_at = wtypes.text
@ -183,6 +184,7 @@ class Function(Resource):
runtime_id='123e4567-e89b-12d3-a456-426655440001', runtime_id='123e4567-e89b-12d3-a456-426655440001',
code={'zip': True}, code={'zip': True},
entry='main', entry='main',
count=10,
created_at='1970-01-01T00:00:00.000000', created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000' updated_at='1970-01-01T00:00:00.000000'
) )

View File

@ -91,6 +91,11 @@ engine_opts = [
choices=['kubernetes', 'swarm'], choices=['kubernetes', 'swarm'],
help='The container orchestrator.' help='The container orchestrator.'
), ),
cfg.IntOpt(
'function_service_expiration',
default=300,
help='Maximum service time for function in orchestrator.'
)
] ]
STORAGE_GROUP = 'storage' STORAGE_GROUP = 'storage'

View File

@ -137,3 +137,11 @@ def create_function_service_mapping(values):
def get_function_service_mapping(function_id): def get_function_service_mapping(function_id):
return IMPL.get_function_service_mapping(function_id) return IMPL.get_function_service_mapping(function_id)
def get_function_service_mappings(**kwargs):
return IMPL.get_function_service_mappings(**kwargs)
def delete_function_service_mapping(id):
return IMPL.delete_function_service_mapping(id)

View File

@ -143,7 +143,7 @@ def _get_collection(model, insecure=False, limit=None, marker=None,
if fields else () if fields else ()
) )
query = (db_base.model_query(model, *columns) if insecure query = (db_base.model_query(model, columns) if insecure
else _secure_query(model, *columns)) else _secure_query(model, *columns))
query = db_filters.apply_filters(query, model, **filters) query = db_filters.apply_filters(query, model, **filters)
@ -325,3 +325,17 @@ def get_function_service_mapping(function_id, session=None):
) )
return mapping return mapping
@db_base.session_aware()
def get_function_service_mappings(session=None, **kwargs):
return _get_collection_sorted_by_time(
models.FunctionServiceMapping, **kwargs
)
@db_base.session_aware()
def delete_function_service_mapping(id, session=None):
mapping = get_function_service_mapping(id)
session.delete(mapping)

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import sqlalchemy as sa
def apply_filters(query, model, **filters): def apply_filters(query, model, **filters):
filter_dict = {} filter_dict = {}
@ -45,22 +43,6 @@ def apply_filters(query, model, **filters):
else: else:
filter_dict[key] = value filter_dict[key] = value
# We need to handle tag case seprately. As tag datatype is MutableList.
# TODO(hparekh): Need to think how can we get rid of this.
tags = filters.pop('tags', None)
# To match the tag list, a resource must contain at least all of the
# tags present in the filter parameter.
if tags:
tag_attr = getattr(model, 'tags')
if not isinstance(tags, list):
expr = tag_attr.contains(tags)
else:
expr = sa.and_(*[tag_attr.contains(tag) for tag in tags])
query = query.filter(expr)
if filter_dict: if filter_dict:
query = query.filter_by(**filter_dict) query = query.filter_by(**filter_dict)

View File

@ -57,6 +57,7 @@ def upgrade():
sa.Column('timeout', sa.Integer, nullable=True), sa.Column('timeout', sa.Integer, nullable=True),
sa.Column('code', st.JsonLongDictType(), nullable=False), sa.Column('code', st.JsonLongDictType(), nullable=False),
sa.Column('entry', sa.String(length=80), nullable=False), sa.Column('entry', sa.String(length=80), nullable=False),
sa.Column('count', sa.Integer, nullable=False),
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'project_id'), sa.UniqueConstraint('name', 'project_id'),
info={"check_ifexists": True} info={"check_ifexists": True}
@ -64,11 +65,12 @@ def upgrade():
op.create_table( op.create_table(
'function_service_mapping', 'function_service_mapping',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True), sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True), sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('service_url', sa.String(length=255), nullable=False), sa.Column('service_url', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('function_id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('function_id', 'service_url'), sa.UniqueConstraint('function_id', 'service_url'),
sa.ForeignKeyConstraint( sa.ForeignKeyConstraint(
['function_id'], [u'function.id'], ondelete='CASCADE' ['function_id'], [u'function.id'], ondelete='CASCADE'

View File

@ -32,6 +32,7 @@ class Function(model_base.QinlingSecureModelBase):
timeout = sa.Column(sa.Integer) timeout = sa.Column(sa.Integer)
code = sa.Column(st.JsonLongDictType(), nullable=False) code = sa.Column(st.JsonLongDictType(), nullable=False)
entry = sa.Column(sa.String(80), nullable=False) entry = sa.Column(sa.String(80), nullable=False)
count = sa.Column(sa.Integer, default=0)
class FunctionServiceMapping(model_base.QinlingModelBase): class FunctionServiceMapping(model_base.QinlingModelBase):
@ -41,10 +42,10 @@ class FunctionServiceMapping(model_base.QinlingModelBase):
sa.UniqueConstraint('function_id', 'service_url'), sa.UniqueConstraint('function_id', 'service_url'),
) )
id = model_base.id_column()
function_id = sa.Column( function_id = sa.Column(
sa.String(36), sa.String(36),
sa.ForeignKey(Function.id, ondelete='CASCADE'), sa.ForeignKey(Function.id, ondelete='CASCADE'),
primary_key=True,
) )
service_url = sa.Column(sa.String(255), nullable=False) service_url = sa.Column(sa.String(255), nullable=False)

View File

@ -127,10 +127,9 @@ class DefaultEngine(object):
} }
db_api.create_function_service_mapping(mapping) db_api.create_function_service_mapping(mapping)
def delete_function(self, ctx, function_id, function_name): def delete_function(self, ctx, function_id):
LOG.info('Start to delete function, id=%s', function_id) LOG.info('Start to delete function, id=%s', function_id)
labels = { labels = {'function_id': function_id}
'function_name': function_name, 'function_id': function_id
} self.orchestrator.delete_function(function_id, labels=labels)
self.orchestrator.delete_function(labels)

View File

@ -15,12 +15,14 @@
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from oslo_service import service from oslo_service import service
from qinling.db import api as db_api from qinling.db import api as db_api
from qinling.engine import default_engine as engine from qinling.engine import default_engine as engine
from qinling.orchestrator import base as orchestra_base from qinling.orchestrator import base as orchestra_base
from qinling import rpc from qinling import rpc
from qinling.services import periodics
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
@ -35,34 +37,41 @@ class EngineService(service.Service):
def start(self): def start(self):
orchestrator = orchestra_base.load_orchestrator(CONF) orchestrator = orchestra_base.load_orchestrator(CONF)
db_api.setup_db()
LOG.info('Starting periodic tasks...')
periodics.start(orchestrator)
topic = CONF.engine.topic topic = CONF.engine.topic
server = CONF.engine.host server = CONF.engine.host
transport = messaging.get_transport(CONF) transport = messaging.get_transport(CONF)
target = messaging.Target(topic=topic, server=server, fanout=False) target = messaging.Target(topic=topic, server=server, fanout=False)
endpoints = [engine.DefaultEngine(orchestrator)] endpoints = [engine.DefaultEngine(orchestrator)]
access_policy = dispatcher.DefaultRPCAccessPolicy
self.server = messaging.get_rpc_server( self.server = messaging.get_rpc_server(
transport, transport,
target, target,
endpoints, endpoints,
executor='eventlet', executor='eventlet',
access_policy=access_policy,
serializer=rpc.ContextSerializer( serializer=rpc.ContextSerializer(
messaging.serializer.JsonPayloadSerializer()) messaging.serializer.JsonPayloadSerializer())
) )
db_api.setup_db()
LOG.info('Starting engine...') LOG.info('Starting engine...')
self.server.start() self.server.start()
super(EngineService, self).start() super(EngineService, self).start()
def stop(self, graceful=False): def stop(self, graceful=False):
periodics.stop()
if self.server: if self.server:
LOG.info('Stopping engine...') LOG.info('Stopping engine...')
self.server.stop() self.server.stop()
if graceful: if graceful:
LOG.info( LOG.info(
'Consumer successfully stopped. Waiting for final ' 'Consumer successfully stopped. Waiting for final '
'messages to be processed...' 'messages to be processed...'
) )
self.server.wait() self.server.wait()

View File

@ -43,7 +43,7 @@ class OrchestratorBase(object):
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def delete_function(self, labels): def delete_function(self, function_id, **kwargs):
raise NotImplementedError raise NotImplementedError

View File

@ -157,7 +157,7 @@ class KubernetesManager(base.OrchestratorBase):
return pod return pod
def _prepare_pod(self, pod, deployment_name, function_id, service_labels): def _prepare_pod(self, pod, deployment_name, function_id, labels):
"""Pod preparation. """Pod preparation.
1. Update pod labels. 1. Update pod labels.
@ -186,10 +186,11 @@ class KubernetesManager(base.OrchestratorBase):
# Create service for the choosen pod. # Create service for the choosen pod.
service_name = "service-%s" % function_id service_name = "service-%s" % function_id
labels.update({'function_id': function_id})
service_body = self.service_template.render( service_body = self.service_template.render(
{ {
"service_name": service_name, "service_name": service_name,
"labels": service_labels, "labels": labels,
"selector": pod_labels "selector": pod_labels
} }
) )
@ -322,10 +323,24 @@ class KubernetesManager(base.OrchestratorBase):
return {'result': output} return {'result': output}
def delete_function(self, labels): def delete_function(self, function_id, labels=[]):
selector = common.convert_dict_to_string(labels) selector = common.convert_dict_to_string(labels)
ret = self.v1.list_namespaced_service(
self.conf.kubernetes.namespace, label_selector=selector
)
names = [i.metadata.name for i in ret.items]
for svc_name in names:
self.v1.delete_namespaced_service(
svc_name,
self.conf.kubernetes.namespace,
)
LOG.info("Services for function %s deleted.", function_id)
self.v1.delete_collection_namespaced_pod( self.v1.delete_collection_namespaced_pod(
self.conf.kubernetes.namespace, self.conf.kubernetes.namespace,
label_selector=selector label_selector=selector
) )
LOG.info("Pod for function %s deleted.", function_id)

View File

@ -170,10 +170,9 @@ class EngineClient(object):
) )
@wrap_messaging_exception @wrap_messaging_exception
def delete_function(self, id, name): def delete_function(self, id):
return self._client.prepare(topic=self.topic, server=None).cast( return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(), ctx.get_ctx(),
'delete_function', 'delete_function',
function_id=id, function_id=id
function_name=name
) )

View File

View File

@ -0,0 +1,82 @@
# Copyright 2017 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from datetime import timedelta
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import threadgroup
from qinling import context
from qinling.db import api as db_api
from qinling import rpc
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
_THREAD_GROUP = None
def handle_function_service_expiration(ctx, engine_client, orchestrator):
context.set_ctx(ctx)
delta = timedelta(seconds=CONF.engine.function_service_expiration)
expiry_time = datetime.utcnow() - delta
results = db_api.get_functions(
fields=['id'],
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time}
)
expiry_ids = [ret.id for ret in results]
if not expiry_ids:
return
mappings = db_api.get_function_service_mappings(
function_id={'in': expiry_ids}
)
LOG.info('Found total expiry function mapping numbers: %s', len(mappings))
with db_api.transaction():
for m in mappings:
LOG.info('Deleting service mapping for function %s', m.function_id)
engine_client.delete_function(m.function_id)
db_api.delete_function_service_mapping(m.function_id)
def start(orchestrator):
global _THREAD_GROUP
_THREAD_GROUP = threadgroup.ThreadGroup()
engine_client = rpc.get_engine_client()
_THREAD_GROUP.add_timer(
300,
handle_function_service_expiration,
ctx=context.Context(),
engine_client=engine_client,
orchestrator=orchestrator
)
def stop():
global _THREAD_GROUP
if _THREAD_GROUP:
_THREAD_GROUP.stop()