Browse Source

Support version for execution creation

Now, all the underlying resources(pod, service) in k8s cluster have
version number in their labels. Different versions of the same function
will have different services exposed in k8s cluster.

Change-Id: Ic0b3045404105175073844b908fa0f6c2ef2ab8a
Story: #2001829
Task: #14350
changes/83/562883/9
Lingxian Kong 3 years ago
parent
commit
885ed28234
  1. 3
      qinling/api/controllers/v1/execution.py
  2. 4
      qinling/api/controllers/v1/function.py
  3. 29
      qinling/api/controllers/v1/resources.py
  4. 9
      qinling/db/api.py
  5. 14
      qinling/db/sqlalchemy/api.py
  6. 1
      qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py
  7. 1
      qinling/db/sqlalchemy/models.py
  8. 97
      qinling/engine/default_engine.py
  9. 24
      qinling/engine/utils.py
  10. 9
      qinling/orchestrator/base.py
  11. 84
      qinling/orchestrator/kubernetes/manager.py
  12. 2
      qinling/orchestrator/kubernetes/templates/service.j2
  13. 15
      qinling/rpc.py
  14. 33
      qinling/services/periodics.py
  15. 21
      qinling/tests/unit/api/controllers/v1/test_execution.py
  16. 252
      qinling/tests/unit/engine/test_default_engine.py
  17. 180
      qinling/tests/unit/orchestrator/kubernetes/test_manager.py
  18. 44
      qinling/tests/unit/services/test_periodics.py
  19. 31
      qinling/utils/etcd_util.py
  20. 73
      qinling/utils/executions.py

3
qinling/api/controllers/v1/execution.py

@ -56,6 +56,9 @@ class ExecutionsController(rest.RestController):
status_code=201
)
def post(self, body):
ctx = context.get_ctx()
acl.enforce('execution:create', ctx)
params = body.to_dict()
LOG.info("Creating %s. [params=%s]", self.type, params)

4
qinling/api/controllers/v1/function.py

@ -55,12 +55,12 @@ class FunctionWorkerController(rest.RestController):
acl.enforce('function_worker:get_all', context.get_ctx())
LOG.info("Get workers for function %s.", function_id)
workers = etcd_util.get_workers(function_id, CONF)
workers = etcd_util.get_workers(function_id)
workers = [
resources.FunctionWorker.from_dict(
{'function_id': function_id, 'worker_name': w}
) for w in workers
]
]
return resources.FunctionWorkers(workers=workers)

29
qinling/api/controllers/v1/resources.py

@ -278,6 +278,7 @@ class Runtimes(ResourceList):
class Execution(Resource):
id = types.uuid
function_id = wsme.wsattr(types.uuid, mandatory=True)
function_version = wsme.wsattr(int, default=0)
description = wtypes.text
status = wsme.wsattr(wtypes.text, readonly=True)
sync = bool
@ -303,21 +304,6 @@ class Execution(Resource):
return obj
@classmethod
def sample(cls):
return cls(
id='123e4567-e89b-12d3-a456-426655440000',
function_id='123e4567-e89b-12d3-a456-426655440000',
description='this is the first execution.',
status='success',
sync=True,
input={'data': 'hello, world'},
result={'result': 'hello, world'},
project_id='default',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
)
class Executions(ResourceList):
executions = [Execution]
@ -327,18 +313,6 @@ class Executions(ResourceList):
super(Executions, self).__init__(**kwargs)
@classmethod
def sample(cls):
sample = cls()
sample.executions = [Execution.sample()]
sample.next = (
"http://localhost:7070/v1/executions?"
"sort_keys=id,name&sort_dirs=asc,desc&limit=10&"
"marker=123e4567-e89b-12d3-a456-426655440000"
)
return sample
class Job(Resource):
id = types.uuid
@ -420,6 +394,7 @@ class FunctionVersion(Resource):
id = types.uuid
description = wtypes.text
version_number = wsme.wsattr(int, readonly=True)
count = wsme.wsattr(int, readonly=True)
project_id = wsme.wsattr(wtypes.text, readonly=True)
created_at = wsme.wsattr(wtypes.text, readonly=True)
updated_at = wsme.wsattr(wtypes.text, readonly=True)

9
qinling/db/api.py

@ -212,5 +212,14 @@ def get_function_version(function_id, version):
return IMPL.get_function_version(function_id, version)
# This function is only used in unit test.
def update_function_version(function_id, version, **kwargs):
return IMPL.update_function_version(function_id, version, **kwargs)
def delete_function_version(function_id, version):
return IMPL.delete_function_version(function_id, version)
def get_function_versions(**kwargs):
return IMPL.get_function_versions(**kwargs)

14
qinling/db/sqlalchemy/api.py

@ -529,7 +529,21 @@ def get_function_version(function_id, version, session=None):
return version_db
# This function is only used in unit test.
@db_base.session_aware()
def update_function_version(function_id, version, session=None, **kwargs):
version_db = get_function_version(function_id, version, session=session)
version_db.update(kwargs.copy())
return version_db
@db_base.session_aware()
def delete_function_version(function_id, version, session=None):
version_db = get_function_version(function_id, version)
session.delete(version_db)
@db_base.session_aware()
def get_function_versions(session=None, **kwargs):
return _get_collection_sorted_by_time(models.FunctionVersion, **kwargs)

1
qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py

@ -38,6 +38,7 @@ def upgrade():
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('version_number', sa.Integer, nullable=False),
sa.Column('count', sa.Integer, nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['function_id'], [u'functions.id']),
sa.UniqueConstraint('function_id', 'version_number', 'project_id'),

1
qinling/db/sqlalchemy/models.py

@ -119,6 +119,7 @@ class FunctionVersion(model_base.QinlingSecureModelBase):
function = relationship('Function', back_populates="versions")
description = sa.Column(sa.String(255), nullable=True)
version_number = sa.Column(sa.Integer, default=0)
count = sa.Column(sa.Integer, default=0)
Runtime.functions = relationship("Function", back_populates="runtime")

97
qinling/engine/default_engine.py

@ -89,33 +89,41 @@ class DefaultEngine(object):
stop=tenacity.stop_after_attempt(30),
retry=(tenacity.retry_if_result(lambda result: result is False))
)
def function_load_check(self, function_id, runtime_id):
with etcd_util.get_worker_lock() as lock:
def function_load_check(self, function_id, version, runtime_id):
"""Check function load and scale the workers if needed.
:return: None if no need to scale up otherwise return the service url
"""
with etcd_util.get_worker_lock(function_id, version) as lock:
if not lock.is_acquired():
return False
workers = etcd_util.get_workers(function_id)
workers = etcd_util.get_workers(function_id, version)
running_execs = db_api.get_executions(
function_id=function_id, status=status.RUNNING
function_id=function_id,
function_version=version,
status=status.RUNNING
)
concurrency = (len(running_execs) or 1) / (len(workers) or 1)
if (len(workers) == 0 or
concurrency > CONF.engine.function_concurrency):
LOG.info(
'Scale up function %s. Current concurrency: %s, execution '
'number %s, worker number %s',
function_id, concurrency, len(running_execs), len(workers)
'Scale up function %s(version %s). Current concurrency: '
'%s, execution number %s, worker number %s',
function_id, version, concurrency, len(running_execs),
len(workers)
)
# NOTE(kong): The increase step could be configurable
return self.scaleup_function(None, function_id, runtime_id, 1)
return self.scaleup_function(None, function_id, version,
runtime_id, 1)
def create_execution(self, ctx, execution_id, function_id, runtime_id,
input=None):
def create_execution(self, ctx, execution_id, function_id,
function_version, runtime_id, input=None):
LOG.info(
'Creating execution. execution_id=%s, function_id=%s, '
'runtime_id=%s, input=%s',
execution_id, function_id, runtime_id, input
'function_version=%s, runtime_id=%s, input=%s',
execution_id, function_id, function_version, runtime_id, input
)
function = db_api.get_function(function_id)
@ -129,22 +137,25 @@ class DefaultEngine(object):
# Auto scale workers if needed
if not is_image_source:
try:
svc_url = self.function_load_check(function_id, runtime_id)
svc_url = self.function_load_check(function_id,
function_version,
runtime_id)
except exc.OrchestratorException as e:
utils.handle_execution_exception(execution_id, str(e))
return
temp_url = etcd_util.get_service_url(function_id)
temp_url = etcd_util.get_service_url(function_id, function_version)
svc_url = svc_url or temp_url
if svc_url:
func_url = '%s/execute' % svc_url
LOG.debug(
'Found service url for function: %s, execution: %s, url: %s',
function_id, execution_id, func_url
'Found service url for function: %s(version %s), execution: '
'%s, url: %s',
function_id, function_version, execution_id, func_url
)
data = utils.get_request_data(
CONF, function_id, execution_id,
CONF, function_id, function_version, execution_id,
input, function.entry, function.trust_id,
self.qinling_endpoint
)
@ -152,12 +163,13 @@ class DefaultEngine(object):
self.session, func_url, body=data
)
utils.finish_execution(
execution_id, success, res, is_image_source=is_image_source)
utils.finish_execution(execution_id, success, res,
is_image_source=is_image_source)
return
if source == constants.IMAGE_FUNCTION:
image = function.code['image']
# Be consistent with k8s naming convention
identifier = ('%s-%s' %
(common.generate_unicode_uuid(dashed=False),
function_id)
@ -167,8 +179,13 @@ class DefaultEngine(object):
labels = {'runtime_id': runtime_id}
try:
# For image function, it will be executed inside this method; for
# package type function it only sets up underlying resources and
# get a service url. If the service url is already created
# beforehand, nothing happens.
_, svc_url = self.orchestrator.prepare_execution(
function_id,
function_version,
image=image,
identifier=identifier,
labels=labels,
@ -178,9 +195,12 @@ class DefaultEngine(object):
utils.handle_execution_exception(execution_id, str(e))
return
# For image type function, read the worker log; For package type
# function, invoke and get log
success, res = self.orchestrator.run_execution(
execution_id,
function_id,
function_version,
input=input,
identifier=identifier,
service_url=svc_url,
@ -188,34 +208,43 @@ class DefaultEngine(object):
trust_id=function.trust_id
)
utils.finish_execution(
execution_id, success, res, is_image_source=is_image_source)
utils.finish_execution(execution_id, success, res,
is_image_source=is_image_source)
def delete_function(self, ctx, function_id):
def delete_function(self, ctx, function_id, function_version=0):
"""Deletes underlying resources allocated for function."""
LOG.info('Start to delete function %s.', function_id)
LOG.info('Start to delete function %s(version %s).', function_id,
function_version)
self.orchestrator.delete_function(function_id)
self.orchestrator.delete_function(function_id, function_version)
LOG.info('Deleted function %s.', function_id)
LOG.info('Deleted function %s(version %s).', function_id,
function_version)
def scaleup_function(self, ctx, function_id, runtime_id, count=1):
def scaleup_function(self, ctx, function_id, function_version, runtime_id,
count=1):
worker_names, service_url = self.orchestrator.scaleup_function(
function_id,
function_version,
identifier=runtime_id,
count=count
)
for name in worker_names:
etcd_util.create_worker(function_id, name)
etcd_util.create_worker(function_id, name,
version=function_version)
etcd_util.create_service_url(function_id, service_url,
version=function_version)
etcd_util.create_service_url(function_id, service_url)
LOG.info('Finished scaling up function %s(version %s).', function_id,
function_version)
LOG.info('Finished scaling up function %s.', function_id)
return service_url
def scaledown_function(self, ctx, function_id, count=1):
workers = etcd_util.get_workers(function_id)
def scaledown_function(self, ctx, function_id, function_version=0,
count=1):
workers = etcd_util.get_workers(function_id, function_version)
worker_deleted_num = (
count if len(workers) > count else len(workers) - 1
)
@ -224,6 +253,8 @@ class DefaultEngine(object):
for worker in workers:
LOG.debug('Removing worker %s', worker)
self.orchestrator.delete_worker(worker)
etcd_util.delete_worker(function_id, worker)
etcd_util.delete_worker(function_id, worker,
version=function_version)
LOG.info('Finished scaling down function %s.', function_id)
LOG.info('Finished scaling down function %s(version %s).', function_id,
function_version)

24
qinling/engine/utils.py

@ -74,17 +74,29 @@ def url_request(request_session, url, body=None):
return False, {'error': 'Internal service error.'}
def get_request_data(conf, function_id, execution_id, input, entry, trust_id,
qinling_endpoint):
def get_request_data(conf, function_id, version, execution_id, input, entry,
trust_id, qinling_endpoint):
"""Prepare the request body should send to the worker."""
ctx = context.get_ctx()
download_url = (
'%s/%s/functions/%s?download=true' %
(qinling_endpoint.strip('/'), constants.CURRENT_VERSION, function_id)
)
if version == 0:
download_url = (
'%s/%s/functions/%s?download=true' %
(qinling_endpoint.strip('/'), constants.CURRENT_VERSION,
function_id)
)
else:
download_url = (
'%s/%s/functions/%s/versions/%s?download=true' %
(qinling_endpoint.strip('/'), constants.CURRENT_VERSION,
function_id, version)
)
data = {
'execution_id': execution_id,
'input': input,
'function_id': function_id,
'function_version': version,
'entry': entry,
'download_url': download_url,
'request_id': ctx.request_id,

9
qinling/orchestrator/base.py

@ -39,19 +39,20 @@ class OrchestratorBase(object):
raise NotImplementedError
@abc.abstractmethod
def prepare_execution(self, function_id, **kwargs):
def prepare_execution(self, function_id, function_version, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def run_execution(self, execution_id, function_id, **kwargs):
def run_execution(self, execution_id, function_id, function_version,
**kwargs):
raise NotImplementedError
@abc.abstractmethod
def delete_function(self, function_id, **kwargs):
def delete_function(self, function_id, function_version, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def scaleup_function(self, function_id, **kwargs):
def scaleup_function(self, function_id, function_version, **kwargs):
raise NotImplementedError
@abc.abstractmethod

84
qinling/orchestrator/kubernetes/manager.py

@ -29,6 +29,7 @@ from qinling.orchestrator import base
from qinling.orchestrator.kubernetes import utils as k8s_util
from qinling.utils import common
LOG = logging.getLogger(__name__)
TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) + '/templates/')
@ -218,17 +219,20 @@ class KubernetesManager(base.OrchestratorBase):
return True
def _choose_available_pods(self, labels, count=1, function_id=None):
def _choose_available_pods(self, labels, count=1, function_id=None,
function_version=0):
# If there is already a pod for function, reuse it.
if function_id:
ret = self.v1.list_namespaced_pod(
self.conf.kubernetes.namespace,
label_selector='function_id=%s' % function_id
label_selector='function_id=%s,function_version=%s' %
(function_id, function_version)
)
if len(ret.items) >= count:
LOG.debug(
"Function %s already associates to a pod with at least "
"%d worker(s). ", function_id, count
"Function %s(version %s) already associates to a pod with "
"at least %d worker(s). ",
function_id, function_version, count
)
return ret.items[:count]
@ -243,7 +247,8 @@ class KubernetesManager(base.OrchestratorBase):
return ret.items[-count:]
def _prepare_pod(self, pod, deployment_name, function_id, labels=None):
def _prepare_pod(self, pod, deployment_name, function_id, version,
labels=None):
"""Pod preparation.
1. Update pod labels.
@ -253,16 +258,22 @@ class KubernetesManager(base.OrchestratorBase):
labels = labels or {}
LOG.info(
'Prepare pod %s in deployment %s for function %s',
pod_name, deployment_name, function_id
'Prepare pod %s in deployment %s for function %s(version %s)',
pod_name, deployment_name, function_id, version
)
# Update pod label.
pod_labels = self._update_pod_label(pod, {'function_id': function_id})
pod_labels = self._update_pod_label(
pod,
# pod label value should be string
{'function_id': function_id, 'function_version': str(version)}
)
# Create service for the chosen pod.
service_name = "service-%s" % function_id
labels.update({'function_id': function_id})
service_name = "service-%s-%s" % (function_id, version)
labels.update(
{'function_id': function_id, 'function_version': str(version)}
)
# TODO(kong): Make the service type configurable.
service_body = self.service_template.render(
@ -314,6 +325,7 @@ class KubernetesManager(base.OrchestratorBase):
return pod_name, pod_service_url
def _create_pod(self, image, pod_name, labels, input):
"""Create pod for image type function."""
if not input:
input_list = []
elif isinstance(input, dict) and input.get('__function_input'):
@ -357,15 +369,17 @@ class KubernetesManager(base.OrchestratorBase):
return pod_labels
def prepare_execution(self, function_id, image=None, identifier=None,
labels=None, input=None):
"""Prepare service URL for function.
def prepare_execution(self, function_id, version, image=None,
identifier=None, labels=None, input=None):
"""Prepare service URL for function version.
For image function, create a single pod with input, so the function
will be executed.
For normal function, choose a pod from the pool and expose a service,
return the service URL.
return a tuple includes pod name and the servise url.
"""
pods = None
@ -375,7 +389,8 @@ class KubernetesManager(base.OrchestratorBase):
self._create_pod(image, identifier, labels, input)
return identifier, None
else:
pods = self._choose_available_pods(labels, function_id=function_id)
pods = self._choose_available_pods(labels, function_id=function_id,
function_version=version)
if not pods:
LOG.critical('No worker available.')
@ -383,31 +398,35 @@ class KubernetesManager(base.OrchestratorBase):
try:
pod_name, url = self._prepare_pod(
pods[0], identifier, function_id, labels
pods[0], identifier, function_id, version, labels
)
return pod_name, url
except Exception:
LOG.exception('Pod preparation failed.')
self.delete_function(function_id, labels)
self.delete_function(function_id, version, labels)
raise exc.OrchestratorException('Execution preparation failed.')
def run_execution(self, execution_id, function_id, input=None,
def run_execution(self, execution_id, function_id, version, input=None,
identifier=None, service_url=None, entry='main.main',
trust_id=None):
"""Run execution and get output."""
"""Run execution.
Return a tuple including the result and the output.
"""
if service_url:
func_url = '%s/execute' % service_url
data = utils.get_request_data(
self.conf, function_id, execution_id, input, entry, trust_id,
self.qinling_endpoint
self.conf, function_id, version, execution_id, input, entry,
trust_id, self.qinling_endpoint
)
LOG.debug(
'Invoke function %s, url: %s, data: %s',
function_id, func_url, data
'Invoke function %s(version %s), url: %s, data: %s',
function_id, version, func_url, data
)
return utils.url_request(self.session, func_url, body=data)
else:
# Wait for image type function execution to be finished
def _wait_complete():
pod = self.v1.read_namespaced_pod(
identifier,
@ -437,8 +456,17 @@ class KubernetesManager(base.OrchestratorBase):
)
return True, output
def delete_function(self, function_id, labels=None):
labels = labels or {'function_id': function_id}
def delete_function(self, function_id, version, labels=None):
"""Delete related resources for function.
- Delete service
- Delete pods
"""
pre_label = {
'function_id': function_id,
'function_version': str(version)
}
labels = labels or pre_label
selector = common.convert_dict_to_string(labels)
ret = self.v1.list_namespaced_service(
@ -456,7 +484,7 @@ class KubernetesManager(base.OrchestratorBase):
label_selector=selector
)
def scaleup_function(self, function_id, identifier=None, count=1):
def scaleup_function(self, function_id, version, identifier=None, count=1):
pod_names = []
labels = {'runtime_id': identifier}
pods = self._choose_available_pods(labels, count=count)
@ -466,11 +494,13 @@ class KubernetesManager(base.OrchestratorBase):
for pod in pods:
pod_name, service_url = self._prepare_pod(
pod, identifier, function_id, labels
pod, identifier, function_id, version, labels
)
pod_names.append(pod_name)
LOG.info('Pods scaled up for function %s: %s', function_id, pod_names)
LOG.info('Pods scaled up for function %s(version %s): %s', function_id,
version, pod_names)
return pod_names, service_url
def delete_worker(self, pod_name, **kwargs):

2
qinling/orchestrator/kubernetes/templates/service.j2

@ -4,7 +4,7 @@ metadata:
name: {{ service_name }}
labels:
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{{ key }}: "{{ value }}"
{% endfor %}
spec:
type: NodePort

15
qinling/rpc.py

@ -153,7 +153,7 @@ class EngineClient(object):
)
@wrap_messaging_exception
def create_execution(self, execution_id, function_id, runtime_id,
def create_execution(self, execution_id, function_id, version, runtime_id,
input=None, is_sync=True):
method_client = self._client.prepare(topic=self.topic, server=None)
@ -163,6 +163,7 @@ class EngineClient(object):
'create_execution',
execution_id=execution_id,
function_id=function_id,
function_version=version,
runtime_id=runtime_id,
input=input
)
@ -172,33 +173,37 @@ class EngineClient(object):
'create_execution',
execution_id=execution_id,
function_id=function_id,
function_version=version,
runtime_id=runtime_id,
input=input
)
@wrap_messaging_exception
def delete_function(self, id):
def delete_function(self, id, version=0):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'delete_function',
function_id=id
function_id=id,
function_version=version
)
@wrap_messaging_exception
def scaleup_function(self, id, runtime_id, count=1):
def scaleup_function(self, id, runtime_id, version=0, count=1):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'scaleup_function',
function_id=id,
runtime_id=runtime_id,
function_version=version,
count=count
)
@wrap_messaging_exception
def scaledown_function(self, id, count=1):
def scaledown_function(self, id, version=0, count=1):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'scaledown_function',
function_id=id,
function_version=version,
count=count
)

33
qinling/services/periodics.py

@ -51,24 +51,43 @@ def handle_function_service_expiration(ctx, engine):
results = db_api.get_functions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time}
updated_at={'lte': expiry_time},
latest_version=0
)
if len(results) == 0:
return
for func_db in results:
if not etcd_util.get_service_url(func_db.id):
if not etcd_util.get_service_url(func_db.id, 0):
continue
LOG.info(
'Deleting service mapping and workers for function %s',
'Deleting service mapping and workers for function %s(version 0)',
func_db.id
)
# Delete resources related to the function
engine.delete_function(ctx, func_db.id)
engine.delete_function(ctx, func_db.id, 0)
# Delete etcd keys
etcd_util.delete_function(func_db.id)
etcd_util.delete_function(func_db.id, 0)
versions = db_api.get_function_versions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
)
for v in versions:
if not etcd_util.get_service_url(v.function_id, v.version_number):
continue
LOG.info(
'Deleting service mapping and workers for function %s(version %s)',
v.function_id, v.version_number
)
# Delete resources related to the function
engine.delete_function(ctx, v.function_id, v.version_number)
# Delete etcd keys
etcd_util.delete_function(v.function_id, v.version_number)
@periodics.periodic(3)

21
qinling/tests/unit/api/controllers/v1/test_execution.py

@ -14,6 +14,7 @@
import mock
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling import status
from qinling.tests.unit.api import base
@ -40,7 +41,25 @@ class TestExecutionController(base.APITest):
self.assertEqual(1, resp.json.get('count'))
@mock.patch('qinling.rpc.EngineClient.create_execution')
def test_create_rpc_error(self, mock_create_execution):
def test_post_with_version(self, mock_rpc):
db_api.increase_function_version(self.func_id, 0,
description="version 1")
body = {
'function_id': self.func_id,
'function_version': 1
}
resp = self.app.post_json('/v1/executions', body)
self.assertEqual(201, resp.status_int)
resp = self.app.get('/v1/functions/%s' % self.func_id)
self.assertEqual(0, resp.json.get('count'))
resp = self.app.get('/v1/functions/%s/versions/1' % self.func_id)
self.assertEqual(1, resp.json.get('count'))
@mock.patch('qinling.rpc.EngineClient.create_execution')
def test_post_rpc_error(self, mock_create_execution):
mock_create_execution.side_effect = exc.QinlingException
body = {
'function_id': self.func_id,

252
qinling/tests/unit/engine/test_default_engine.py

@ -29,10 +29,11 @@ class TestDefaultEngine(base.DbTestCase):
self.orchestrator = mock.Mock()
self.qinling_endpoint = 'http://127.0.0.1:7070'
self.default_engine = default_engine.DefaultEngine(
self.orchestrator, self.qinling_endpoint)
self.orchestrator, self.qinling_endpoint
)
def _create_running_executions(self, function_id, num):
for _i in range(num):
for _ in range(num):
self.create_execution(function_id=function_id)
def test_create_runtime(self):
@ -110,113 +111,91 @@ class TestDefaultEngine(base.DbTestCase):
self.assertEqual(runtime.image, pre_image)
self.assertEqual(runtime.status, status.AVAILABLE)
@mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function')
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_no_worker_scaleup(
self,
etcd_util_get_worker_lock_mock,
etcd_util_get_workers_mock
):
def test_function_load_check_no_worker(self, mock_getlock, mock_getworkers,
mock_scaleup):
function_id = common.generate_unicode_uuid()
runtime_id = common.generate_unicode_uuid()
lock = mock.Mock()
(
etcd_util_get_worker_lock_mock.return_value.__enter__.return_value
) = lock
lock.is_acquired.return_value = True
etcd_util_get_workers_mock.return_value = [] # len(workers) = 0
self.default_engine.scaleup_function = mock.Mock()
mock_getlock.return_value.__enter__.return_value = lock
mock_getworkers.return_value = []
self.default_engine.function_load_check(function_id, runtime_id)
self.default_engine.function_load_check(function_id, 0, runtime_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id)
self.default_engine.scaleup_function.assert_called_once_with(
None, function_id, runtime_id, 1)
mock_getworkers.assert_called_once_with(function_id, 0)
mock_scaleup.assert_called_once_with(None, function_id, 0, runtime_id,
1)
@mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function')
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_concurrency_scaleup(
self,
etcd_util_get_worker_lock_mock,
etcd_util_get_workers_mock
):
def test_function_load_check_scaleup(self, mock_getlock, mock_getworkers,
mock_scaleup):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
lock = mock.Mock()
(
etcd_util_get_worker_lock_mock.return_value.__enter__.return_value
) = lock
lock.is_acquired.return_value = True
mock_getlock.return_value.__enter__.return_value = lock
# The default concurrency is 3, we use 4 running executions against
# 1 worker so that there will be a scaling up.
etcd_util_get_workers_mock.return_value = range(1)
mock_getworkers.return_value = ['worker1']
self._create_running_executions(function_id, 4)
self.default_engine.scaleup_function = mock.Mock()
self.default_engine.function_load_check(function_id, runtime_id)
self.default_engine.function_load_check(function_id, 0, runtime_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id)
self.default_engine.scaleup_function.assert_called_once_with(
None, function_id, runtime_id, 1)
mock_getworkers.assert_called_once_with(function_id, 0)
mock_scaleup.assert_called_once_with(None, function_id, 0, runtime_id,
1)
@mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function')
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_not_scaleup(
self,
etcd_util_get_worker_lock_mock,
etcd_util_get_workers_mock
):
def test_function_load_check_not_scaleup(self, mock_getlock,
mock_getworkers, mock_scaleup):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
lock = mock.Mock()
(
etcd_util_get_worker_lock_mock.return_value.__enter__.return_value
) = lock
lock.is_acquired.return_value = True
mock_getlock.return_value.__enter__.return_value = lock
# The default concurrency is 3, we use 3 running executions against
# 1 worker so that there won't be a scaling up.
etcd_util_get_workers_mock.return_value = range(1)
mock_getworkers.return_value = ['worker1']
self._create_running_executions(function_id, 3)
self.default_engine.scaleup_function = mock.Mock()
self.default_engine.function_load_check(function_id, runtime_id)
self.default_engine.function_load_check(function_id, 0, runtime_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id)
self.default_engine.scaleup_function.assert_not_called()
mock_getworkers.assert_called_once_with(function_id, 0)
mock_scaleup.assert_not_called()
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_lock_wait(
self,
etcd_util_get_worker_lock_mock,
etcd_util_get_workers_mock
):
def test_function_load_check_lock_wait(self, mock_getlock,
mock_getworkers):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
lock = mock.Mock()
(
etcd_util_get_worker_lock_mock.return_value.__enter__.return_value
) = lock
mock_getlock.return_value.__enter__.return_value = lock
# Lock is acquired upon the third try.
lock.is_acquired.side_effect = [False, False, True]
etcd_util_get_workers_mock.return_value = range(1)
mock_getworkers.return_value = ['worker1']
self._create_running_executions(function_id, 3)
self.default_engine.scaleup_function = mock.Mock()
self.default_engine.function_load_check(function_id, runtime_id)
self.default_engine.function_load_check(function_id, 0, runtime_id)
self.assertEqual(3, lock.is_acquired.call_count)
etcd_util_get_workers_mock.assert_called_once_with(function_id)
self.default_engine.scaleup_function.assert_not_called()
mock_getworkers.assert_called_once_with(function_id, 0)
@mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution(
self,
etcd_util_get_service_url_mock
):
def test_create_execution_image_type_function(self, mock_svc_url):
"""Create 2 executions for an image type function."""
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
@ -234,42 +213,47 @@ class TestDefaultEngine(base.DbTestCase):
execution_1_id = execution_1.id
execution_2 = self.create_execution(function_id=function_id)
execution_2_id = execution_2.id
self.default_engine.function_load_check = mock.Mock()
etcd_util_get_service_url_mock.return_value = None
mock_svc_url.return_value = None
self.orchestrator.prepare_execution.return_value = (
mock.Mock(), None)
self.orchestrator.run_execution.side_effect = [
(True, 'success result'),
(False, 'failed result')]
# Try create two executions, with different results
# Create two executions, with different results
self.default_engine.create_execution(
mock.Mock(), execution_1_id, function_id, runtime_id)
mock.Mock(), execution_1_id, function_id, 0, runtime_id
)
self.default_engine.create_execution(
mock.Mock(), execution_2_id, function_id, runtime_id,
input='input')
mock.Mock(), execution_2_id, function_id, 0, runtime_id,
input='input'
)
self.default_engine.function_load_check.assert_not_called()
get_service_url_calls = [
mock.call(function_id), mock.call(function_id)]
etcd_util_get_service_url_mock.assert_has_calls(get_service_url_calls)
self.assertEqual(2, etcd_util_get_service_url_mock.call_count)
mock.call(function_id, 0), mock.call(function_id, 0)
]
mock_svc_url.assert_has_calls(get_service_url_calls)
prepare_calls = [
mock.call(function_id,
0,
image=function.code['image'],
identifier=mock.ANY,
labels=None,
input=None),
mock.call(function_id,
0,
image=function.code['image'],
identifier=mock.ANY,
labels=None,
input='input')]
input='input')
]
self.orchestrator.prepare_execution.assert_has_calls(prepare_calls)
self.assertEqual(2, self.orchestrator.prepare_execution.call_count)
run_calls = [
mock.call(execution_1_id,
function_id,
0,
input=None,
identifier=mock.ANY,
service_url=None,
@ -277,15 +261,18 @@ class TestDefaultEngine(base.DbTestCase):
trust_id=function.trust_id),
mock.call(execution_2_id,
function_id,
0,
input='input',
identifier=mock.ANY,
service_url=None,
entry=function.entry,
trust_id=function.trust_id)]
trust_id=function.trust_id)
]
self.orchestrator.run_execution.assert_has_calls(run_calls)
self.assertEqual(2, self.orchestrator.run_execution.call_count)
execution_1 = db_api.get_execution(execution_1_id)
execution_2 = db_api.get_execution(execution_2_id)
self.assertEqual(execution_1.status, status.SUCCESS)
self.assertEqual(execution_1.logs, '')
self.assertEqual(execution_1.result, {'output': 'success result'})
@ -298,6 +285,11 @@ class TestDefaultEngine(base.DbTestCase):
self,
etcd_util_get_service_url_mock
):
"""test_create_execution_prepare_execution_exception
Create execution for image type function, prepare_execution method
raises exception.
"""
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
@ -320,7 +312,7 @@ class TestDefaultEngine(base.DbTestCase):
etcd_util_get_service_url_mock.return_value = None
self.default_engine.create_execution(
mock.Mock(), execution_id, function_id, runtime_id)
mock.Mock(), execution_id, function_id, 0, runtime_id)
execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.ERROR)
@ -328,9 +320,9 @@ class TestDefaultEngine(base.DbTestCase):
self.assertEqual(execution.result, {})
@mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution_not_image_source(
self,
etcd_util_get_service_url_mock
def test_create_execution_package_type_function(
self,
etcd_util_get_service_url_mock
):
function = self.create_function()
function_id = function.id
@ -347,24 +339,26 @@ class TestDefaultEngine(base.DbTestCase):
'output': 'success output'})
self.default_engine.create_execution(
mock.Mock(), execution_id, function_id, runtime_id)
mock.Mock(), execution_id, function_id, 0, runtime_id)
self.default_engine.function_load_check.assert_called_once_with(
function_id, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id)
function_id, 0, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0)
self.orchestrator.prepare_execution.assert_called_once_with(
function_id, image=None, identifier=runtime_id,
function_id, 0, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id}, input=None)
self.orchestrator.run_execution.assert_called_once_with(
execution_id, function_id, input=None, identifier=runtime_id,
execution_id, function_id, 0, input=None, identifier=runtime_id,
service_url='svc_url', entry=function.entry,
trust_id=function.trust_id)
execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.SUCCESS)
self.assertEqual(execution.logs, 'execution log')
self.assertEqual(execution.result, {'output': 'success output'})
def test_create_execution_not_image_source_scaleup_exception(self):
def test_create_execution_loadcheck_exception(self):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
@ -377,9 +371,10 @@ class TestDefaultEngine(base.DbTestCase):
)
self.default_engine.create_execution(
mock.Mock(), execution_id, function_id, runtime_id)
mock.Mock(), execution_id, function_id, 0, runtime_id)
execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.ERROR)
self.assertEqual(execution.logs, '')
self.assertEqual(execution.result, {})
@ -388,10 +383,10 @@ class TestDefaultEngine(base.DbTestCase):
@mock.patch('qinling.engine.utils.url_request')
@mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution_found_service_url(
self,
etcd_util_get_service_url_mock,
engine_utils_url_request_mock,
engine_utils_get_request_data_mock
self,
etcd_util_get_service_url_mock,
engine_utils_url_request_mock,
engine_utils_get_request_data_mock
):
function = self.create_function()
function_id = function.id
@ -407,18 +402,21 @@ class TestDefaultEngine(base.DbTestCase):
'output': 'failed output'})
self.default_engine.create_execution(
mock.Mock(), execution_id, function_id, runtime_id, input='input')
mock.Mock(), execution_id, function_id, 0, runtime_id,
input='input')
self.default_engine.function_load_check.assert_called_once_with(
function_id, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id)
function_id, 0, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0)
engine_utils_get_request_data_mock.assert_called_once_with(
mock.ANY, function_id, execution_id,
mock.ANY, function_id, 0, execution_id,
'input', function.entry, function.trust_id,
self.qinling_endpoint)
engine_utils_url_request_mock.assert_called_once_with(
self.default_engine.session, 'svc_url/execute', body='data')
execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.FAILED)
self.assertEqual(execution.logs, 'execution log')
self.assertEqual(execution.result,
@ -430,35 +428,36 @@ class TestDefaultEngine(base.DbTestCase):
self.default_engine.delete_function(mock.Mock(), function_id)
self.orchestrator.delete_function.assert_called_once_with(
function_id)
function_id, 0
)
@mock.patch('qinling.utils.etcd_util.create_service_url')
@mock.patch('qinling.utils.etcd_util.create_worker')
def test_scaleup_function(
self,
etcd_util_create_worker_mock,
etcd_util_create_service_url_mock
self,
etcd_util_create_worker_mock,
etcd_util_create_service_url_mock
):
function_id = common.generate_unicode_uuid()
runtime_id = common.generate_unicode_uuid()
self.orchestrator.scaleup_function.return_value = (['worker'], 'url')
self.default_engine.scaleup_function(
mock.Mock(), function_id, runtime_id)
mock.Mock(), function_id, 0, runtime_id)
self.orchestrator.scaleup_function.assert_called_once_with(
function_id, identifier=runtime_id, count=1)
function_id, 0, identifier=runtime_id, count=1)
etcd_util_create_worker_mock.assert_called_once_with(
function_id, 'worker')
function_id, 'worker', version=0)
etcd_util_create_service_url_mock.assert_called_once_with(
function_id, 'url')
function_id, 'url', version=0)
@mock.patch('qinling.utils.etcd_util.create_service_url')
@mock.patch('qinling.utils.etcd_util.create_worker')
def test_scaleup_function_multiple_workers(
self,
etcd_util_create_worker_mock,
etcd_util_create_service_url_mock
self,
etcd_util_create_worker_mock,
etcd_util_create_service_url_mock
):
function_id = common.generate_unicode_uuid()
runtime_id = common.generate_unicode_uuid()
@ -466,22 +465,24 @@ class TestDefaultEngine(base.DbTestCase):
['worker0', 'worker1'], 'url')
self.default_engine.scaleup_function(
mock.Mock(), function_id, runtime_id, count=2)
mock.Mock(), function_id, 0, runtime_id, count=2
)
self.orchestrator.scaleup_function.assert_called_once_with(
function_id, identifier=runtime_id, count=2)
function_id, 0, identifier=runtime_id, count=2
)
# Two new workers are created.
expected = [mock.call(function_id, 'worker0'),
mock.call(function_id, 'worker1')]
expected = [mock.call(function_id, 'worker0', version=0),
mock.call(function_id, 'worker1', version=0)]
etcd_util_create_worker_mock.assert_has_calls(expected)
self.assertEqual(2, etcd_util_create_worker_mock.call_count)
etcd_util_create_service_url_mock.assert_called_once_with(
function_id, 'url')
function_id, 'url', version=0
)
@mock.patch('qinling.utils.etcd_util.delete_worker')
@mock.patch('qinling.utils.etcd_util.get_workers')
def test_scaledown_function(
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
):
function_id = common.generate_unicode_uuid()
etcd_util_get_workers_mock.return_value = [
@ -491,33 +492,33 @@ class TestDefaultEngine(base.DbTestCase):
self.default_engine.scaledown_function(mock.Mock(), function_id)
etcd_util_get_workers_mock.assert_called_once_with(
function_id)
function_id, 0)
self.orchestrator.delete_worker.assert_called_once_with('worker_0')
etcd_util_delete_workers_mock.assert_called_once_with(
function_id, 'worker_0')
function_id, 'worker_0', version=0
)
@mock.patch('qinling.utils.etcd_util.delete_worker')
@mock.patch('qinling.utils.etcd_util.get_workers')
def test_scaledown_function_multiple_workers(
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
):
function_id = common.generate_unicode_uuid()
etcd_util_get_workers_mock.return_value = [
'worker_%d' % i for i in range(4)
]
self.default_engine.scaledown_function(
mock.Mock(), function_id, count=2)
self.default_engine.scaledown_function(mock.Mock(), function_id,
count=2)
etcd_util_get_workers_mock.assert_called_once_with(
function_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id, 0)
# First two workers will be deleted.
expected = [mock.call('worker_0'), mock.call('worker_1')]
self.orchestrator.delete_worker.assert_has_calls(expected)
self.assertEqual(2, self.orchestrator.delete_worker.call_count)
expected = [
mock.call(function_id, 'worker_0'),
mock.call(function_id, 'worker_1')
mock.call(function_id, 'worker_0', version=0),
mock.call(function_id, 'worker_1', version=0)
]
etcd_util_delete_workers_mock.assert_has_calls(expected)
self.assertEqual(2, etcd_util_delete_workers_mock.call_count)
@ -525,7 +526,7 @@ class TestDefaultEngine(base.DbTestCase):
@mock.patch('qinling.utils.etcd_util.delete_worker')
@mock.patch('qinling.utils.etcd_util.get_workers')
def test_scaledown_function_leaving_one_worker(
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
):
function_id = common.generate_unicode_uuid()
etcd_util_get_workers_mock.return_value = [
@ -535,8 +536,7 @@ class TestDefaultEngine(base.DbTestCase):
self.default_engine.scaledown_function(
mock.Mock(), function_id, count=5) # count > len(workers)
etcd_util_get_workers_mock.assert_called_once_with(
function_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id, 0)
# Only the first three workers will be deleted
expected = [
mock.call('worker_0'), mock.call('worker_1'), mock.call('worker_2')
@ -544,9 +544,9 @@ class TestDefaultEngine(base.DbTestCase):
self.orchestrator.delete_worker.assert_has_calls(expected)
self.assertEqual(3, self.orchestrator.delete_worker.call_count)
expected = [
mock.call(function_id, 'worker_0'),
mock.call(function_id, 'worker_1'),
mock.call(function_id, 'worker_2')
mock.call(function_id, 'worker_0', version=0),
mock.call(function_id, 'worker_1', version=0),
mock.call(function_id, 'worker_2', version=0)
]
etcd_util_delete_workers_mock.assert_has_calls(expected)
self.assertEqual(3, etcd_util_delete_workers_mock.call_count)

180
qinling/tests/unit/orchestrator/kubernetes/test_manager.py

@ -30,10 +30,10 @@ SERVICE_ADDRESS_EXTERNAL = '1.2.3.4'
SERVICE_ADDRESS_INTERNAL = '127.0.0.1'
class TestKubernetesManager(base.BaseTest):
class TestKubernetesManager(base.DbTestCase):
def setUp(self):
super(TestKubernetesManager, self).setUp()
CONF.register_opts(config.kubernetes_opts, config.KUBERNETES_GROUP)
self.conf = CONF
self.qinling_endpoint = 'http://127.0.0.1:7070'
self.k8s_v1_api = mock.Mock()
@ -48,13 +48,17 @@ class TestKubernetesManager(base.BaseTest):
prefix='TestKubernetesManager')
self.override_config('namespace', self.fake_namespace,
config.KUBERNETES_GROUP)
self.override_config('auth_enable', False, group='pecan')
namespace = mock.Mock()
namespace.metadata.name = self.fake_namespace
namespaces = mock.Mock()
namespaces.items = [namespace]
self.k8s_v1_api.list_namespace.return_value = namespaces
self.manager = k8s_manager.KubernetesManager(
self.conf, self.qinling_endpoint)
self.manager = k8s_manager.KubernetesManager(self.conf,
self.qinling_endpoint)
def _create_service(self):
port = mock.Mock()
@ -305,7 +309,7 @@ class TestKubernetesManager(base.BaseTest):
rollback.assert_called_once_with(
fake_deployment_name, self.fake_namespace, rollback_body)
def test_prepare_execution(self):
def test_prepare_execution_no_image(self):
pod = mock.Mock()
pod.metadata.name = self.rand_name('pod',
prefix='TestKubernetesManager')
@ -323,7 +327,7 @@ class TestKubernetesManager(base.BaseTest):
function_id = common.generate_unicode_uuid()
pod_names, service_url = self.manager.prepare_execution(
function_id, image=None, identifier=runtime_id,
function_id, 0, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id})
self.assertEqual(pod.metadata.name, pod_names)
@ -334,10 +338,15 @@ class TestKubernetesManager(base.BaseTest):
# in _choose_available_pods
self.k8s_v1_api.list_namespaced_pod.assert_called_once_with(
self.fake_namespace,
label_selector='function_id=%s' % function_id)
label_selector='function_id=%s,function_version=0' % (function_id)
)
# in _prepare_pod -> _update_pod_label
pod_labels = {'pod1_key1': 'pod1_value1', 'function_id': function_id}
pod_labels = {
'pod1_key1': 'pod1_value1',
'function_id': function_id,
'function_version': '0'
}
body = {'metadata': {'labels': pod_labels}}
self.k8s_v1_api.patch_namespaced_pod.assert_called_once_with(
pod.metadata.name, self.fake_namespace, body)
@ -345,8 +354,9 @@ class TestKubernetesManager(base.BaseTest):
# in _prepare_pod
service_body = self.manager.service_template.render(
{
'service_name': 'service-%s' % function_id,
'service_name': 'service-%s-0' %