Merge "Add scale up ability for function execution"

This commit is contained in:
Jenkins 2017-08-28 04:40:25 +00:00 committed by Gerrit Code Review
commit e1a475708a
10 changed files with 229 additions and 72 deletions

View File

@ -43,6 +43,11 @@ UPDATE_ALLOWED = set(['name', 'description', 'entry'])
class FunctionsController(rest.RestController):
_custom_actions = {
'scale_up': ['POST'],
'scale_down': ['POST'],
}
def __init__(self, *args, **kwargs):
self.storage_provider = storage_base.load_storage_provider(CONF)
self.engine_client = rpc.get_engine_client()
@ -216,3 +221,21 @@ class FunctionsController(rest.RestController):
self.engine_client.delete_function(id)
return resources.Function.from_dict(func_db.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
    None,
    types.uuid,
    status_code=202
)
def scale_up(self, id):
    """Scale up the containers for function execution.

    This is an admin-only operation. The number of containers added per
    call is defined in the config file (the kubernetes ``scale_step``
    option).

    :param id: UUID of the function to scale up.
    """
    # Look the function up first so a missing id raises (and is wrapped
    # by the controller-exception decorator) before any RPC is sent.
    func_db = db_api.get_function(id)

    LOG.info('Starting to scale up function %s', id)

    # Asynchronous: the engine does the actual scaling; the API returns
    # 202 Accepted immediately (see status_code above).
    self.engine_client.scaleup_function(id, runtime_id=func_db.runtime_id)

View File

@ -132,6 +132,11 @@ kubernetes_opts = [
'qinling_service_address',
help='Qinling API service ip address.'
),
cfg.IntOpt(
'scale_step',
default=1,
help='Number of pods for function scale up.'
),
]
CONF = cfg.CONF

View File

@ -162,6 +162,10 @@ def delete_function_service_mapping(id):
return IMPL.delete_function_service_mapping(id)
def create_function_worker(values):
    """Create a function-worker record via the configured DB backend."""
    return IMPL.create_function_worker(values)
def create_job(values):
    """Create a job record via the configured DB backend."""
    return IMPL.create_job(values)

View File

@ -395,6 +395,21 @@ def delete_function_service_mapping(id, session=None):
session.delete(mapping)
@db_base.session_aware()
def create_function_worker(values, session=None):
    """Persist a new FunctionWorkers row built from ``values``.

    Raises DBError if an identical entry already exists.
    """
    worker = models.FunctionWorkers()
    worker.update(values.copy())

    try:
        worker.save(session=session)
    except oslo_db_exc.DBDuplicateEntry as e:
        raise exc.DBError(
            "Duplicate entry for FunctionWorkers: %s" % e.columns
        )

    return worker
@db_base.session_aware()
def create_job(values, session=None):
job = models.Job()

View File

@ -45,43 +45,7 @@ def _add_if_not_exists(element, compiler, **kw):
def upgrade():
op.create_table(
'function',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('runtime_id', sa.String(length=36), nullable=True),
sa.Column('memory_size', sa.Integer, nullable=True),
sa.Column('timeout', sa.Integer, nullable=True),
sa.Column('code', st.JsonLongDictType(), nullable=False),
sa.Column('entry', sa.String(length=80), nullable=False),
sa.Column('count', sa.Integer, nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'project_id'),
sa.ForeignKeyConstraint(['runtime_id'], [u'runtime.id']),
info={"check_ifexists": True}
)
op.create_table(
'function_service_mapping',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('service_url', sa.String(length=255), nullable=False),
sa.Column('worker_name', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('function_id', 'service_url'),
sa.ForeignKeyConstraint(
['function_id'], [u'function.id'], ondelete='CASCADE'
),
info={"check_ifexists": True}
)
op.create_table(
'runtime',
'runtimes',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=False),
@ -96,7 +60,56 @@ def upgrade():
)
op.create_table(
'execution',
'functions',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=False),
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('runtime_id', sa.String(length=36), nullable=True),
sa.Column('memory_size', sa.Integer, nullable=True),
sa.Column('timeout', sa.Integer, nullable=True),
sa.Column('code', st.JsonLongDictType(), nullable=False),
sa.Column('entry', sa.String(length=80), nullable=False),
sa.Column('count', sa.Integer, nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('name', 'project_id'),
sa.ForeignKeyConstraint(['runtime_id'], [u'runtimes.id']),
info={"check_ifexists": True}
)
op.create_table(
'function_service_mappings',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('service_url', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('function_id', 'service_url'),
sa.ForeignKeyConstraint(
['function_id'], [u'functions.id'], ondelete='CASCADE'
),
info={"check_ifexists": True}
)
op.create_table(
'function_workers',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('worker_name', sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(
['function_id'], [u'functions.id'], ondelete='CASCADE'
),
info={"check_ifexists": True}
)
op.create_table(
'executions',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=False),
@ -113,7 +126,7 @@ def upgrade():
)
op.create_table(
'job',
'jobs',
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.Column('project_id', sa.String(length=80), nullable=False),
@ -128,6 +141,6 @@ def upgrade():
sa.Column('count', sa.Integer(), nullable=True),
sa.Column('trust_id', sa.String(length=80), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['function_id'], [u'function.id']),
sa.ForeignKeyConstraint(['function_id'], [u'functions.id']),
info={"check_ifexists": True}
)

View File

@ -21,7 +21,7 @@ from qinling.utils import common
class Runtime(model_base.QinlingSecureModelBase):
__tablename__ = 'runtime'
__tablename__ = 'runtimes'
__table_args__ = (
sa.UniqueConstraint('image', 'project_id'),
@ -34,7 +34,7 @@ class Runtime(model_base.QinlingSecureModelBase):
class Function(model_base.QinlingSecureModelBase):
__tablename__ = 'function'
__tablename__ = 'functions'
__table_args__ = (
sa.UniqueConstraint('name', 'project_id'),
@ -54,7 +54,7 @@ class Function(model_base.QinlingSecureModelBase):
class FunctionServiceMapping(model_base.QinlingModelBase):
__tablename__ = 'function_service_mapping'
__tablename__ = 'function_service_mappings'
__table_args__ = (
sa.UniqueConstraint('function_id', 'service_url'),
@ -66,11 +66,21 @@ class FunctionServiceMapping(model_base.QinlingModelBase):
sa.ForeignKey(Function.id, ondelete='CASCADE'),
)
service_url = sa.Column(sa.String(255), nullable=False)
class FunctionWorkers(model_base.QinlingModelBase):
    """Maps a function to the worker names serving its executions."""

    __tablename__ = 'function_workers'

    id = model_base.id_column()
    # CASCADE: worker rows are removed automatically with their function.
    function_id = sa.Column(
        sa.String(36),
        sa.ForeignKey(Function.id, ondelete='CASCADE'),
    )
    # Name of the worker (presumably the orchestrator pod name — the
    # engine stores the names returned by scaleup_function here).
    worker_name = sa.Column(sa.String(255), nullable=False)
class Execution(model_base.QinlingSecureModelBase):
__tablename__ = 'execution'
__tablename__ = 'executions'
function_id = sa.Column(sa.String(36), nullable=False)
status = sa.Column(sa.String(32), nullable=False)
@ -82,7 +92,7 @@ class Execution(model_base.QinlingSecureModelBase):
class Job(model_base.QinlingSecureModelBase):
__tablename__ = 'job'
__tablename__ = 'jobs'
name = sa.Column(sa.String(255), nullable=True)
pattern = sa.Column(sa.String(32), nullable=True)
@ -111,6 +121,11 @@ Function.service = relationship(
uselist=False,
cascade="all, delete-orphan"
)
# Delete workers automatically when deleting function.
Function.workers = relationship(
"FunctionWorkers",
cascade="all, delete-orphan"
)
Runtime.functions = relationship("Function", back_populates="runtime")

View File

@ -103,16 +103,16 @@ class DefaultEngine(object):
data = {'input': input, 'execution_id': execution_id}
r = requests.post(func_url, json=data)
logs = self.orchestrator.get_execution_log(
execution_id,
worker_name=function.service.worker_name,
)
# logs = self.orchestrator.get_execution_log(
# execution_id,
# worker_name=function.service.worker_name,
# )
LOG.debug('Finished execution %s', execution_id)
execution.status = status.SUCCESS
execution.output = r.json()
execution.logs = logs
# execution.logs = logs
return
source = function.code['source']
@ -169,9 +169,13 @@ class DefaultEngine(object):
mapping = {
'function_id': function_id,
'service_url': service_url,
'worker_name': worker_name
}
db_api.create_function_service_mapping(mapping)
worker = {
'function_id': function_id,
'worker_name': worker_name
}
db_api.create_function_worker(worker)
def delete_function(self, ctx, function_id):
resource = {'type': 'function', 'id': function_id}
@ -181,3 +185,22 @@ class DefaultEngine(object):
self.orchestrator.delete_function(function_id, labels=labels)
LOG.info('Deleted.', resource=resource)
def scaleup_function(self, ctx, function_id, runtime_id):
    """Ask the orchestrator for extra workers and record them in the DB.

    :param ctx: request context (unused directly; RPC signature).
    :param function_id: id of the function being scaled up.
    :param runtime_id: id of the runtime whose pods are used.
    """
    func_db = db_api.get_function(function_id)

    # The orchestrator returns the names of the newly prepared workers.
    names = self.orchestrator.scaleup_function(
        function_id,
        identifier=runtime_id,
        entry=func_db.entry
    )

    # Record every new worker in a single transaction.
    with db_api.transaction():
        for worker_name in names:
            db_api.create_function_worker(
                {'function_id': function_id, 'worker_name': worker_name}
            )

    LOG.info('Finished scaling up function %s.', function_id)

View File

@ -54,6 +54,10 @@ class OrchestratorBase(object):
def delete_function(self, function_id, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def scaleup_function(self, function_id, **kwargs):
    """Add workers for an existing function.

    Concrete orchestrators must implement this; kwargs are
    backend-specific (e.g. runtime identifier, entry point).
    """
    raise NotImplementedError
def load_orchestrator(conf):
global ORCHESTRATOR

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import os
import time
@ -195,7 +196,7 @@ class KubernetesManager(base.OrchestratorBase):
return True
def _choose_available_pod(self, labels):
def _choose_available_pod(self, labels, count=1):
selector = common.convert_dict_to_string(labels)
ret = self.v1.list_namespaced_pod(
@ -206,18 +207,17 @@ class KubernetesManager(base.OrchestratorBase):
if len(ret.items) == 0:
return None
# Choose the last available one by default.
pod = ret.items[-1]
return ret.items[-count:]
return pod
def _prepare_pod(self, pod, deployment_name, function_id, labels, entry):
def _prepare_pod(self, pod, deployment_name, function_id, labels=None,
entry=None, actual_function=None):
"""Pod preparation.
1. Update pod labels.
2. Expose service and trigger package download.
"""
name = pod.metadata.name
actual_function = actual_function or function_id
LOG.info(
'Prepare pod %s in deployment %s for function %s',
@ -225,18 +225,7 @@ class KubernetesManager(base.OrchestratorBase):
)
# Update pod label.
pod_labels = pod.metadata.labels or {}
pod_labels.update({'function_id': function_id})
body = {
'metadata': {
'labels': pod_labels
}
}
self.v1.patch_namespaced_pod(
name, self.conf.kubernetes.namespace, body
)
LOG.debug('Labels updated for pod %s', name)
pod_labels = self._update_pod_label(pod, {'function_id': function_id})
# Create service for the chosen pod.
service_name = "service-%s" % function_id
@ -278,12 +267,12 @@ class KubernetesManager(base.OrchestratorBase):
download_url = (
'http://%s:%s/v1/functions/%s?download=true' %
(self.conf.kubernetes.qinling_service_address,
self.conf.api.port, function_id)
self.conf.api.port, actual_function)
)
data = {
'download_url': download_url,
'function_id': function_id,
'function_id': actual_function,
'entry': entry,
'token': context.get_ctx().auth_token,
}
@ -325,6 +314,24 @@ class KubernetesManager(base.OrchestratorBase):
body=yaml.safe_load(pod_body),
)
def _update_pod_label(self, pod, new_label=None):
    """Merge ``new_label`` into the pod's labels and patch it in Kubernetes.

    :param pod: kubernetes pod object whose labels are updated.
    :param new_label: dict of labels to merge in; None means "no new
        labels" (the existing labels are re-applied unchanged).
    :return: the resulting full label dict.
    """
    name = pod.metadata.name
    # Deep-copy so we never mutate the client's pod object in place.
    pod_labels = copy.deepcopy(pod.metadata.labels) or {}
    # Guard the default: dict.update(None) raises TypeError, so treat
    # None the same as an empty label dict.
    pod_labels.update(new_label or {})

    body = {
        'metadata': {
            'labels': pod_labels
        }
    }
    self.v1.patch_namespaced_pod(
        name, self.conf.kubernetes.namespace, body
    )
    LOG.debug('Labels updated for pod %s', name)

    return pod_labels
def prepare_execution(self, function_id, image=None, identifier=None,
labels=None, input=None, entry='main.main'):
"""Prepare service URL for function.
@ -346,7 +353,8 @@ class KubernetesManager(base.OrchestratorBase):
if not pod:
raise exc.OrchestratorException('No pod available.')
return self._prepare_pod(pod, identifier, function_id, labels, entry)
return self._prepare_pod(pod[0], identifier, function_id, labels,
entry)
def run_execution(self, execution_id, function_id, input=None,
identifier=None, service_url=None):
@ -415,3 +423,41 @@ class KubernetesManager(base.OrchestratorBase):
)
LOG.info("Pod for function %s deleted.", function_id)
def scaleup_function(self, function_id, identifier=None,
                     entry='main.main'):
    """Prepare additional pods from the runtime deployment for a function.

    Picks ``scale_step`` (config option) idle pods labelled with the
    runtime id, prepares each one for the function, and returns the pod
    names so the caller can record them as workers.

    :param function_id: function the new pods will serve.
    :param identifier: runtime id used to select candidate pods.
    :param entry: function entry point passed to pod preparation.
    :return: list of prepared pod names.
    :raises OrchestratorException: if no candidate pods are available.
    """
    pod_names = []
    labels = {'runtime_id': identifier}
    pods = self._choose_available_pod(
        labels, count=self.conf.kubernetes.scale_step
    )

    if not pods:
        raise exc.OrchestratorException('Not enough pods available.')

    temp_function = '%s-temp' % function_id
    for pod in pods:
        # Prepare under a temporary function id; actual_function makes
        # the package download URL still point at the real function.
        self._prepare_pod(pod, identifier, temp_function, labels, entry,
                          actual_function=function_id)

        # Delete temporary service
        selector = common.convert_dict_to_string(
            {'function_id': temp_function}
        )
        ret = self.v1.list_namespaced_service(
            self.conf.kubernetes.namespace, label_selector=selector
        )
        svc_names = [i.metadata.name for i in ret.items]
        for svc_name in svc_names:
            self.v1.delete_namespaced_service(
                svc_name,
                self.conf.kubernetes.namespace,
            )

        # Modify pod labels to fit into correct service
        self._update_pod_label(pod, {'function_id': function_id})

        pod_names.append(pod.metadata.name)

    LOG.info('Pods scaled up for function %s: %s', function_id, pod_names)
    return pod_names

View File

@ -186,3 +186,12 @@ class EngineClient(object):
'delete_function',
function_id=id
)
@wrap_messaging_exception
def scaleup_function(self, id, runtime_id):
    """Ask the engine to scale up a function (asynchronous cast)."""
    prepared = self._client.prepare(topic=self.topic, server=None)
    return prepared.cast(
        ctx.get_ctx(),
        'scaleup_function',
        function_id=id,
        runtime_id=runtime_id,
    )