Browse Source

Merge "Support version for execution creation"

changes/05/563805/1
Zuul 3 years ago
committed by Gerrit Code Review
parent
commit
a510f1c938
20 changed files with 580 additions and 346 deletions
  1. +3
    -0
      qinling/api/controllers/v1/execution.py
  2. +2
    -2
      qinling/api/controllers/v1/function.py
  3. +2
    -27
      qinling/api/controllers/v1/resources.py
  4. +9
    -0
      qinling/db/api.py
  5. +14
    -0
      qinling/db/sqlalchemy/api.py
  6. +1
    -0
      qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py
  7. +1
    -0
      qinling/db/sqlalchemy/models.py
  8. +64
    -33
      qinling/engine/default_engine.py
  9. +18
    -6
      qinling/engine/utils.py
  10. +5
    -4
      qinling/orchestrator/base.py
  11. +57
    -27
      qinling/orchestrator/kubernetes/manager.py
  12. +1
    -1
      qinling/orchestrator/kubernetes/templates/service.j2
  13. +10
    -5
      qinling/rpc.py
  14. +26
    -7
      qinling/services/periodics.py
  15. +20
    -1
      qinling/tests/unit/api/controllers/v1/test_execution.py
  16. +126
    -126
      qinling/tests/unit/engine/test_default_engine.py
  17. +116
    -64
      qinling/tests/unit/orchestrator/kubernetes/test_manager.py
  18. +35
    -9
      qinling/tests/unit/services/test_periodics.py
  19. +16
    -15
      qinling/utils/etcd_util.py
  20. +54
    -19
      qinling/utils/executions.py

+ 3
- 0
qinling/api/controllers/v1/execution.py View File

@ -56,6 +56,9 @@ class ExecutionsController(rest.RestController):
status_code=201
)
def post(self, body):
ctx = context.get_ctx()
acl.enforce('execution:create', ctx)
params = body.to_dict()
LOG.info("Creating %s. [params=%s]", self.type, params)


+ 2
- 2
qinling/api/controllers/v1/function.py View File

@ -55,12 +55,12 @@ class FunctionWorkerController(rest.RestController):
acl.enforce('function_worker:get_all', context.get_ctx())
LOG.info("Get workers for function %s.", function_id)
workers = etcd_util.get_workers(function_id, CONF)
workers = etcd_util.get_workers(function_id)
workers = [
resources.FunctionWorker.from_dict(
{'function_id': function_id, 'worker_name': w}
) for w in workers
]
]
return resources.FunctionWorkers(workers=workers)


+ 2
- 27
qinling/api/controllers/v1/resources.py View File

@ -278,6 +278,7 @@ class Runtimes(ResourceList):
class Execution(Resource):
id = types.uuid
function_id = wsme.wsattr(types.uuid, mandatory=True)
function_version = wsme.wsattr(int, default=0)
description = wtypes.text
status = wsme.wsattr(wtypes.text, readonly=True)
sync = bool
@ -303,21 +304,6 @@ class Execution(Resource):
return obj
@classmethod
def sample(cls):
return cls(
id='123e4567-e89b-12d3-a456-426655440000',
function_id='123e4567-e89b-12d3-a456-426655440000',
description='this is the first execution.',
status='success',
sync=True,
input={'data': 'hello, world'},
result={'result': 'hello, world'},
project_id='default',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000'
)
class Executions(ResourceList):
executions = [Execution]
@ -327,18 +313,6 @@ class Executions(ResourceList):
super(Executions, self).__init__(**kwargs)
@classmethod
def sample(cls):
sample = cls()
sample.executions = [Execution.sample()]
sample.next = (
"http://localhost:7070/v1/executions?"
"sort_keys=id,name&sort_dirs=asc,desc&limit=10&"
"marker=123e4567-e89b-12d3-a456-426655440000"
)
return sample
class Job(Resource):
id = types.uuid
@ -420,6 +394,7 @@ class FunctionVersion(Resource):
id = types.uuid
description = wtypes.text
version_number = wsme.wsattr(int, readonly=True)
count = wsme.wsattr(int, readonly=True)
project_id = wsme.wsattr(wtypes.text, readonly=True)
created_at = wsme.wsattr(wtypes.text, readonly=True)
updated_at = wsme.wsattr(wtypes.text, readonly=True)


+ 9
- 0
qinling/db/api.py View File

@ -212,5 +212,14 @@ def get_function_version(function_id, version):
return IMPL.get_function_version(function_id, version)
# This function is only used in unit test.
def update_function_version(function_id, version, **kwargs):
return IMPL.update_function_version(function_id, version, **kwargs)
def delete_function_version(function_id, version):
return IMPL.delete_function_version(function_id, version)
def get_function_versions(**kwargs):
return IMPL.get_function_versions(**kwargs)

+ 14
- 0
qinling/db/sqlalchemy/api.py View File

@ -529,7 +529,21 @@ def get_function_version(function_id, version, session=None):
return version_db
# This function is only used in unit test.
@db_base.session_aware()
def update_function_version(function_id, version, session=None, **kwargs):
version_db = get_function_version(function_id, version, session=session)
version_db.update(kwargs.copy())
return version_db
@db_base.session_aware()
def delete_function_version(function_id, version, session=None):
version_db = get_function_version(function_id, version)
session.delete(version_db)
@db_base.session_aware()
def get_function_versions(session=None, **kwargs):
return _get_collection_sorted_by_time(models.FunctionVersion, **kwargs)

+ 1
- 0
qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py View File

@ -38,6 +38,7 @@ def upgrade():
sa.Column('function_id', sa.String(length=36), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('version_number', sa.Integer, nullable=False),
sa.Column('count', sa.Integer, nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.ForeignKeyConstraint(['function_id'], [u'functions.id']),
sa.UniqueConstraint('function_id', 'version_number', 'project_id'),


+ 1
- 0
qinling/db/sqlalchemy/models.py View File

@ -120,6 +120,7 @@ class FunctionVersion(model_base.QinlingSecureModelBase):
function = relationship('Function', back_populates="versions")
description = sa.Column(sa.String(255), nullable=True)
version_number = sa.Column(sa.Integer, default=0)
count = sa.Column(sa.Integer, default=0)
Runtime.functions = relationship("Function", back_populates="runtime")


+ 64
- 33
qinling/engine/default_engine.py View File

@ -89,33 +89,41 @@ class DefaultEngine(object):
stop=tenacity.stop_after_attempt(30),
retry=(tenacity.retry_if_result(lambda result: result is False))
)
def function_load_check(self, function_id, runtime_id):
with etcd_util.get_worker_lock() as lock:
def function_load_check(self, function_id, version, runtime_id):
"""Check function load and scale the workers if needed.
:return: None if no need to scale up otherwise return the service url
"""
with etcd_util.get_worker_lock(function_id, version) as lock:
if not lock.is_acquired():
return False
workers = etcd_util.get_workers(function_id)
workers = etcd_util.get_workers(function_id, version)
running_execs = db_api.get_executions(
function_id=function_id, status=status.RUNNING
function_id=function_id,
function_version=version,
status=status.RUNNING
)
concurrency = (len(running_execs) or 1) / (len(workers) or 1)
if (len(workers) == 0 or
concurrency > CONF.engine.function_concurrency):
LOG.info(
'Scale up function %s. Current concurrency: %s, execution '
'number %s, worker number %s',
function_id, concurrency, len(running_execs), len(workers)
'Scale up function %s(version %s). Current concurrency: '
'%s, execution number %s, worker number %s',
function_id, version, concurrency, len(running_execs),
len(workers)
)
# NOTE(kong): The increase step could be configurable
return self.scaleup_function(None, function_id, runtime_id, 1)
return self.scaleup_function(None, function_id, version,
runtime_id, 1)
def create_execution(self, ctx, execution_id, function_id, runtime_id,
input=None):
def create_execution(self, ctx, execution_id, function_id,
function_version, runtime_id, input=None):
LOG.info(
'Creating execution. execution_id=%s, function_id=%s, '
'runtime_id=%s, input=%s',
execution_id, function_id, runtime_id, input
'function_version=%s, runtime_id=%s, input=%s',
execution_id, function_id, function_version, runtime_id, input
)
function = db_api.get_function(function_id)
@ -129,22 +137,25 @@ class DefaultEngine(object):
# Auto scale workers if needed
if not is_image_source:
try:
svc_url = self.function_load_check(function_id, runtime_id)
svc_url = self.function_load_check(function_id,
function_version,
runtime_id)
except exc.OrchestratorException as e:
utils.handle_execution_exception(execution_id, str(e))
return
temp_url = etcd_util.get_service_url(function_id)
temp_url = etcd_util.get_service_url(function_id, function_version)
svc_url = svc_url or temp_url
if svc_url:
func_url = '%s/execute' % svc_url
LOG.debug(
'Found service url for function: %s, execution: %s, url: %s',
function_id, execution_id, func_url
'Found service url for function: %s(version %s), execution: '
'%s, url: %s',
function_id, function_version, execution_id, func_url
)
data = utils.get_request_data(
CONF, function_id, execution_id,
CONF, function_id, function_version, execution_id,
input, function.entry, function.trust_id,
self.qinling_endpoint
)
@ -152,12 +163,13 @@ class DefaultEngine(object):
self.session, func_url, body=data
)
utils.finish_execution(
execution_id, success, res, is_image_source=is_image_source)
utils.finish_execution(execution_id, success, res,
is_image_source=is_image_source)
return
if source == constants.IMAGE_FUNCTION:
image = function.code['image']
# Be consistent with k8s naming convention
identifier = ('%s-%s' %
(common.generate_unicode_uuid(dashed=False),
function_id)
@ -167,8 +179,13 @@ class DefaultEngine(object):
labels = {'runtime_id': runtime_id}
try:
# For image function, it will be executed inside this method; for
# package type function it only sets up underlying resources and
# get a service url. If the service url is already created
# beforehand, nothing happens.
_, svc_url = self.orchestrator.prepare_execution(
function_id,
function_version,
image=image,
identifier=identifier,
labels=labels,
@ -178,9 +195,12 @@ class DefaultEngine(object):
utils.handle_execution_exception(execution_id, str(e))
return
# For image type function, read the worker log; For package type
# function, invoke and get log
success, res = self.orchestrator.run_execution(
execution_id,
function_id,
function_version,
input=input,
identifier=identifier,
service_url=svc_url,
@ -188,34 +208,43 @@ class DefaultEngine(object):
trust_id=function.trust_id
)
utils.finish_execution(
execution_id, success, res, is_image_source=is_image_source)
utils.finish_execution(execution_id, success, res,
is_image_source=is_image_source)
def delete_function(self, ctx, function_id):
def delete_function(self, ctx, function_id, function_version=0):
"""Deletes underlying resources allocated for function."""
LOG.info('Start to delete function %s.', function_id)
LOG.info('Start to delete function %s(version %s).', function_id,
function_version)
self.orchestrator.delete_function(function_id)
self.orchestrator.delete_function(function_id, function_version)
LOG.info('Deleted function %s.', function_id)
LOG.info('Deleted function %s(version %s).', function_id,
function_version)
def scaleup_function(self, ctx, function_id, runtime_id, count=1):
def scaleup_function(self, ctx, function_id, function_version, runtime_id,
count=1):
worker_names, service_url = self.orchestrator.scaleup_function(
function_id,
function_version,
identifier=runtime_id,
count=count
)
for name in worker_names:
etcd_util.create_worker(function_id, name)
etcd_util.create_worker(function_id, name,
version=function_version)
etcd_util.create_service_url(function_id, service_url,
version=function_version)
etcd_util.create_service_url(function_id, service_url)
LOG.info('Finished scaling up function %s(version %s).', function_id,
function_version)
LOG.info('Finished scaling up function %s.', function_id)
return service_url
def scaledown_function(self, ctx, function_id, count=1):
workers = etcd_util.get_workers(function_id)
def scaledown_function(self, ctx, function_id, function_version=0,
count=1):
workers = etcd_util.get_workers(function_id, function_version)
worker_deleted_num = (
count if len(workers) > count else len(workers) - 1
)
@ -224,6 +253,8 @@ class DefaultEngine(object):
for worker in workers:
LOG.debug('Removing worker %s', worker)
self.orchestrator.delete_worker(worker)
etcd_util.delete_worker(function_id, worker)
etcd_util.delete_worker(function_id, worker,
version=function_version)
LOG.info('Finished scaling down function %s.', function_id)
LOG.info('Finished scaling down function %s(version %s).', function_id,
function_version)

+ 18
- 6
qinling/engine/utils.py View File

@ -74,17 +74,29 @@ def url_request(request_session, url, body=None):
return False, {'error': 'Internal service error.'}
def get_request_data(conf, function_id, execution_id, input, entry, trust_id,
qinling_endpoint):
def get_request_data(conf, function_id, version, execution_id, input, entry,
trust_id, qinling_endpoint):
"""Prepare the request body should send to the worker."""
ctx = context.get_ctx()
download_url = (
'%s/%s/functions/%s?download=true' %
(qinling_endpoint.strip('/'), constants.CURRENT_VERSION, function_id)
)
if version == 0:
download_url = (
'%s/%s/functions/%s?download=true' %
(qinling_endpoint.strip('/'), constants.CURRENT_VERSION,
function_id)
)
else:
download_url = (
'%s/%s/functions/%s/versions/%s?download=true' %
(qinling_endpoint.strip('/'), constants.CURRENT_VERSION,
function_id, version)
)
data = {
'execution_id': execution_id,
'input': input,
'function_id': function_id,
'function_version': version,
'entry': entry,
'download_url': download_url,
'request_id': ctx.request_id,


+ 5
- 4
qinling/orchestrator/base.py View File

@ -39,19 +39,20 @@ class OrchestratorBase(object):
raise NotImplementedError
@abc.abstractmethod
def prepare_execution(self, function_id, **kwargs):
def prepare_execution(self, function_id, function_version, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def run_execution(self, execution_id, function_id, **kwargs):
def run_execution(self, execution_id, function_id, function_version,
**kwargs):
raise NotImplementedError
@abc.abstractmethod
def delete_function(self, function_id, **kwargs):
def delete_function(self, function_id, function_version, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def scaleup_function(self, function_id, **kwargs):
def scaleup_function(self, function_id, function_version, **kwargs):
raise NotImplementedError
@abc.abstractmethod


+ 57
- 27
qinling/orchestrator/kubernetes/manager.py View File

@ -29,6 +29,7 @@ from qinling.orchestrator import base
from qinling.orchestrator.kubernetes import utils as k8s_util
from qinling.utils import common
LOG = logging.getLogger(__name__)
TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) + '/templates/')
@ -218,17 +219,20 @@ class KubernetesManager(base.OrchestratorBase):
return True
def _choose_available_pods(self, labels, count=1, function_id=None):
def _choose_available_pods(self, labels, count=1, function_id=None,
function_version=0):
# If there is already a pod for function, reuse it.
if function_id:
ret = self.v1.list_namespaced_pod(
self.conf.kubernetes.namespace,
label_selector='function_id=%s' % function_id
label_selector='function_id=%s,function_version=%s' %
(function_id, function_version)
)
if len(ret.items) >= count:
LOG.debug(
"Function %s already associates to a pod with at least "
"%d worker(s). ", function_id, count
"Function %s(version %s) already associates to a pod with "
"at least %d worker(s). ",
function_id, function_version, count
)
return ret.items[:count]
@ -243,7 +247,8 @@ class KubernetesManager(base.OrchestratorBase):
return ret.items[-count:]
def _prepare_pod(self, pod, deployment_name, function_id, labels=None):
def _prepare_pod(self, pod, deployment_name, function_id, version,
labels=None):
"""Pod preparation.
1. Update pod labels.
@ -253,16 +258,22 @@ class KubernetesManager(base.OrchestratorBase):
labels = labels or {}
LOG.info(
'Prepare pod %s in deployment %s for function %s',
pod_name, deployment_name, function_id
'Prepare pod %s in deployment %s for function %s(version %s)',
pod_name, deployment_name, function_id, version
)
# Update pod label.
pod_labels = self._update_pod_label(pod, {'function_id': function_id})
pod_labels = self._update_pod_label(
pod,
# pod label value should be string
{'function_id': function_id, 'function_version': str(version)}
)
# Create service for the chosen pod.
service_name = "service-%s" % function_id
labels.update({'function_id': function_id})
service_name = "service-%s-%s" % (function_id, version)
labels.update(
{'function_id': function_id, 'function_version': str(version)}
)
# TODO(kong): Make the service type configurable.
service_body = self.service_template.render(
@ -314,6 +325,7 @@ class KubernetesManager(base.OrchestratorBase):
return pod_name, pod_service_url
def _create_pod(self, image, pod_name, labels, input):
"""Create pod for image type function."""
if not input:
input_list = []
elif isinstance(input, dict) and input.get('__function_input'):
@ -357,15 +369,17 @@ class KubernetesManager(base.OrchestratorBase):
return pod_labels
def prepare_execution(self, function_id, image=None, identifier=None,
labels=None, input=None):
"""Prepare service URL for function.
def prepare_execution(self, function_id, version, image=None,
identifier=None, labels=None, input=None):
"""Prepare service URL for function version.
For image function, create a single pod with input, so the function
will be executed.
For normal function, choose a pod from the pool and expose a service,
return the service URL.
return a tuple includes pod name and the servise url.
"""
pods = None
@ -375,7 +389,8 @@ class KubernetesManager(base.OrchestratorBase):
self._create_pod(image, identifier, labels, input)
return identifier, None
else:
pods = self._choose_available_pods(labels, function_id=function_id)
pods = self._choose_available_pods(labels, function_id=function_id,
function_version=version)
if not pods:
LOG.critical('No worker available.')
@ -383,31 +398,35 @@ class KubernetesManager(base.OrchestratorBase):
try:
pod_name, url = self._prepare_pod(
pods[0], identifier, function_id, labels
pods[0], identifier, function_id, version, labels
)
return pod_name, url
except Exception:
LOG.exception('Pod preparation failed.')
self.delete_function(function_id, labels)
self.delete_function(function_id, version, labels)
raise exc.OrchestratorException('Execution preparation failed.')
def run_execution(self, execution_id, function_id, input=None,
def run_execution(self, execution_id, function_id, version, input=None,
identifier=None, service_url=None, entry='main.main',
trust_id=None):
"""Run execution and get output."""
"""Run execution.
Return a tuple including the result and the output.
"""
if service_url:
func_url = '%s/execute' % service_url
data = utils.get_request_data(
self.conf, function_id, execution_id, input, entry, trust_id,
self.qinling_endpoint
self.conf, function_id, version, execution_id, input, entry,
trust_id, self.qinling_endpoint
)
LOG.debug(
'Invoke function %s, url: %s, data: %s',
function_id, func_url, data
'Invoke function %s(version %s), url: %s, data: %s',
function_id, version, func_url, data
)
return utils.url_request(self.session, func_url, body=data)
else:
# Wait for image type function execution to be finished
def _wait_complete():
pod = self.v1.read_namespaced_pod(
identifier,
@ -437,8 +456,17 @@ class KubernetesManager(base.OrchestratorBase):
)
return True, output
def delete_function(self, function_id, labels=None):
labels = labels or {'function_id': function_id}
def delete_function(self, function_id, version, labels=None):
"""Delete related resources for function.
- Delete service
- Delete pods
"""
pre_label = {
'function_id': function_id,
'function_version': str(version)
}
labels = labels or pre_label
selector = common.convert_dict_to_string(labels)
ret = self.v1.list_namespaced_service(
@ -456,7 +484,7 @@ class KubernetesManager(base.OrchestratorBase):
label_selector=selector
)
def scaleup_function(self, function_id, identifier=None, count=1):
def scaleup_function(self, function_id, version, identifier=None, count=1):
pod_names = []
labels = {'runtime_id': identifier}
pods = self._choose_available_pods(labels, count=count)
@ -466,11 +494,13 @@ class KubernetesManager(base.OrchestratorBase):
for pod in pods:
pod_name, service_url = self._prepare_pod(
pod, identifier, function_id, labels
pod, identifier, function_id, version, labels
)
pod_names.append(pod_name)
LOG.info('Pods scaled up for function %s: %s', function_id, pod_names)
LOG.info('Pods scaled up for function %s(version %s): %s', function_id,
version, pod_names)
return pod_names, service_url
def delete_worker(self, pod_name, **kwargs):


+ 1
- 1
qinling/orchestrator/kubernetes/templates/service.j2 View File

@ -4,7 +4,7 @@ metadata:
name: {{ service_name }}
labels:
{% for key, value in labels.items() %}
{{ key }}: {{ value }}
{{ key }}: "{{ value }}"
{% endfor %}
spec:
type: NodePort


+ 10
- 5
qinling/rpc.py View File

@ -153,7 +153,7 @@ class EngineClient(object):
)
@wrap_messaging_exception
def create_execution(self, execution_id, function_id, runtime_id,
def create_execution(self, execution_id, function_id, version, runtime_id,
input=None, is_sync=True):
method_client = self._client.prepare(topic=self.topic, server=None)
@ -163,6 +163,7 @@ class EngineClient(object):
'create_execution',
execution_id=execution_id,
function_id=function_id,
function_version=version,
runtime_id=runtime_id,
input=input
)
@ -172,33 +173,37 @@ class EngineClient(object):
'create_execution',
execution_id=execution_id,
function_id=function_id,
function_version=version,
runtime_id=runtime_id,
input=input
)
@wrap_messaging_exception
def delete_function(self, id):
def delete_function(self, id, version=0):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'delete_function',
function_id=id
function_id=id,
function_version=version
)
@wrap_messaging_exception
def scaleup_function(self, id, runtime_id, count=1):
def scaleup_function(self, id, runtime_id, version=0, count=1):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'scaleup_function',
function_id=id,
runtime_id=runtime_id,
function_version=version,
count=count
)
@wrap_messaging_exception
def scaledown_function(self, id, count=1):
def scaledown_function(self, id, version=0, count=1):
return self._client.prepare(topic=self.topic, server=None).cast(
ctx.get_ctx(),
'scaledown_function',
function_id=id,
function_version=version,
count=count
)

+ 26
- 7
qinling/services/periodics.py View File

@ -51,24 +51,43 @@ def handle_function_service_expiration(ctx, engine):
results = db_api.get_functions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time}
updated_at={'lte': expiry_time},
latest_version=0
)
if len(results) == 0:
return
for func_db in results:
if not etcd_util.get_service_url(func_db.id):
if not etcd_util.get_service_url(func_db.id, 0):
continue
LOG.info(
'Deleting service mapping and workers for function %s',
'Deleting service mapping and workers for function %s(version 0)',
func_db.id
)
# Delete resources related to the function
engine.delete_function(ctx, func_db.id)
engine.delete_function(ctx, func_db.id, 0)
# Delete etcd keys
etcd_util.delete_function(func_db.id)
etcd_util.delete_function(func_db.id, 0)
versions = db_api.get_function_versions(
sort_keys=['updated_at'],
insecure=True,
updated_at={'lte': expiry_time},
)
for v in versions:
if not etcd_util.get_service_url(v.function_id, v.version_number):
continue
LOG.info(
'Deleting service mapping and workers for function %s(version %s)',
v.function_id, v.version_number
)
# Delete resources related to the function
engine.delete_function(ctx, v.function_id, v.version_number)
# Delete etcd keys
etcd_util.delete_function(v.function_id, v.version_number)
@periodics.periodic(3)


+ 20
- 1
qinling/tests/unit/api/controllers/v1/test_execution.py View File

@ -14,6 +14,7 @@
import mock
from qinling.db import api as db_api
from qinling import exceptions as exc
from qinling import status
from qinling.tests.unit.api import base
@ -40,7 +41,25 @@ class TestExecutionController(base.APITest):
self.assertEqual(1, resp.json.get('count'))
@mock.patch('qinling.rpc.EngineClient.create_execution')
def test_create_rpc_error(self, mock_create_execution):
def test_post_with_version(self, mock_rpc):
db_api.increase_function_version(self.func_id, 0,
description="version 1")
body = {
'function_id': self.func_id,
'function_version': 1
}
resp = self.app.post_json('/v1/executions', body)
self.assertEqual(201, resp.status_int)
resp = self.app.get('/v1/functions/%s' % self.func_id)
self.assertEqual(0, resp.json.get('count'))
resp = self.app.get('/v1/functions/%s/versions/1' % self.func_id)
self.assertEqual(1, resp.json.get('count'))
@mock.patch('qinling.rpc.EngineClient.create_execution')
def test_post_rpc_error(self, mock_create_execution):
mock_create_execution.side_effect = exc.QinlingException
body = {
'function_id': self.func_id,


+ 126
- 126
qinling/tests/unit/engine/test_default_engine.py View File

@ -29,10 +29,11 @@ class TestDefaultEngine(base.DbTestCase):
self.orchestrator = mock.Mock()
self.qinling_endpoint = 'http://127.0.0.1:7070'
self.default_engine = default_engine.DefaultEngine(
self.orchestrator, self.qinling_endpoint)
self.orchestrator, self.qinling_endpoint
)
def _create_running_executions(self, function_id, num):
for _i in range(num):
for _ in range(num):
self.create_execution(function_id=function_id)
def test_create_runtime(self):
@ -110,113 +111,91 @@ class TestDefaultEngine(base.DbTestCase):
self.assertEqual(runtime.image, pre_image)
self.assertEqual(runtime.status, status.AVAILABLE)
@mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function')
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_no_worker_scaleup(
self,
etcd_util_get_worker_lock_mock,
etcd_util_get_workers_mock
):
def test_function_load_check_no_worker(self, mock_getlock, mock_getworkers,
mock_scaleup):
function_id = common.generate_unicode_uuid()
runtime_id = common.generate_unicode_uuid()
lock = mock.Mock()
(
etcd_util_get_worker_lock_mock.return_value.__enter__.return_value
) = lock
lock.is_acquired.return_value = True
etcd_util_get_workers_mock.return_value = [] # len(workers) = 0
self.default_engine.scaleup_function = mock.Mock()
mock_getlock.return_value.__enter__.return_value = lock
mock_getworkers.return_value = []
self.default_engine.function_load_check(function_id, runtime_id)
self.default_engine.function_load_check(function_id, 0, runtime_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id)
self.default_engine.scaleup_function.assert_called_once_with(
None, function_id, runtime_id, 1)
mock_getworkers.assert_called_once_with(function_id, 0)
mock_scaleup.assert_called_once_with(None, function_id, 0, runtime_id,
1)
@mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function')
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_concurrency_scaleup(
self,
etcd_util_get_worker_lock_mock,
etcd_util_get_workers_mock
):
def test_function_load_check_scaleup(self, mock_getlock, mock_getworkers,
mock_scaleup):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
lock = mock.Mock()
(
etcd_util_get_worker_lock_mock.return_value.__enter__.return_value
) = lock
lock.is_acquired.return_value = True
mock_getlock.return_value.__enter__.return_value = lock
# The default concurrency is 3, we use 4 running executions against
# 1 worker so that there will be a scaling up.
etcd_util_get_workers_mock.return_value = range(1)
mock_getworkers.return_value = ['worker1']
self._create_running_executions(function_id, 4)
self.default_engine.scaleup_function = mock.Mock()
self.default_engine.function_load_check(function_id, runtime_id)
self.default_engine.function_load_check(function_id, 0, runtime_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id)
self.default_engine.scaleup_function.assert_called_once_with(
None, function_id, runtime_id, 1)
mock_getworkers.assert_called_once_with(function_id, 0)
mock_scaleup.assert_called_once_with(None, function_id, 0, runtime_id,
1)
@mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function')
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_not_scaleup(
self,
etcd_util_get_worker_lock_mock,
etcd_util_get_workers_mock
):
def test_function_load_check_not_scaleup(self, mock_getlock,
mock_getworkers, mock_scaleup):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
lock = mock.Mock()
(
etcd_util_get_worker_lock_mock.return_value.__enter__.return_value
) = lock
lock.is_acquired.return_value = True
mock_getlock.return_value.__enter__.return_value = lock
# The default concurrency is 3, we use 3 running executions against
# 1 worker so that there won't be a scaling up.
etcd_util_get_workers_mock.return_value = range(1)
mock_getworkers.return_value = ['worker1']
self._create_running_executions(function_id, 3)
self.default_engine.scaleup_function = mock.Mock()
self.default_engine.function_load_check(function_id, runtime_id)
self.default_engine.function_load_check(function_id, 0, runtime_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id)
self.default_engine.scaleup_function.assert_not_called()
mock_getworkers.assert_called_once_with(function_id, 0)
mock_scaleup.assert_not_called()
@mock.patch('qinling.utils.etcd_util.get_workers')
@mock.patch('qinling.utils.etcd_util.get_worker_lock')
def test_function_load_check_lock_wait(
self,
etcd_util_get_worker_lock_mock,
etcd_util_get_workers_mock
):
def test_function_load_check_lock_wait(self, mock_getlock,
mock_getworkers):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
lock = mock.Mock()
(
etcd_util_get_worker_lock_mock.return_value.__enter__.return_value
) = lock
mock_getlock.return_value.__enter__.return_value = lock
# Lock is acquired upon the third try.
lock.is_acquired.side_effect = [False, False, True]
etcd_util_get_workers_mock.return_value = range(1)
mock_getworkers.return_value = ['worker1']
self._create_running_executions(function_id, 3)
self.default_engine.scaleup_function = mock.Mock()
self.default_engine.function_load_check(function_id, runtime_id)
self.default_engine.function_load_check(function_id, 0, runtime_id)
self.assertEqual(3, lock.is_acquired.call_count)
etcd_util_get_workers_mock.assert_called_once_with(function_id)
self.default_engine.scaleup_function.assert_not_called()
mock_getworkers.assert_called_once_with(function_id, 0)
@mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution(
self,
etcd_util_get_service_url_mock
):
def test_create_execution_image_type_function(self, mock_svc_url):
"""Create 2 executions for an image type function."""
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
@ -234,42 +213,47 @@ class TestDefaultEngine(base.DbTestCase):
execution_1_id = execution_1.id
execution_2 = self.create_execution(function_id=function_id)
execution_2_id = execution_2.id
self.default_engine.function_load_check = mock.Mock()
etcd_util_get_service_url_mock.return_value = None
mock_svc_url.return_value = None
self.orchestrator.prepare_execution.return_value = (
mock.Mock(), None)
self.orchestrator.run_execution.side_effect = [
(True, 'success result'),
(False, 'failed result')]
# Try create two executions, with different results
# Create two executions, with different results
self.default_engine.create_execution(
mock.Mock(), execution_1_id, function_id, runtime_id)
mock.Mock(), execution_1_id, function_id, 0, runtime_id
)
self.default_engine.create_execution(
mock.Mock(), execution_2_id, function_id, runtime_id,
input='input')
mock.Mock(), execution_2_id, function_id, 0, runtime_id,
input='input'
)
self.default_engine.function_load_check.assert_not_called()
get_service_url_calls = [
mock.call(function_id), mock.call(function_id)]
etcd_util_get_service_url_mock.assert_has_calls(get_service_url_calls)
self.assertEqual(2, etcd_util_get_service_url_mock.call_count)
mock.call(function_id, 0), mock.call(function_id, 0)
]
mock_svc_url.assert_has_calls(get_service_url_calls)
prepare_calls = [
mock.call(function_id,
0,
image=function.code['image'],
identifier=mock.ANY,
labels=None,
input=None),
mock.call(function_id,
0,
image=function.code['image'],
identifier=mock.ANY,
labels=None,
input='input')]
input='input')
]
self.orchestrator.prepare_execution.assert_has_calls(prepare_calls)
self.assertEqual(2, self.orchestrator.prepare_execution.call_count)
run_calls = [
mock.call(execution_1_id,
function_id,
0,
input=None,
identifier=mock.ANY,
service_url=None,
@ -277,15 +261,18 @@ class TestDefaultEngine(base.DbTestCase):
trust_id=function.trust_id),
mock.call(execution_2_id,
function_id,
0,
input='input',
identifier=mock.ANY,
service_url=None,
entry=function.entry,
trust_id=function.trust_id)]
trust_id=function.trust_id)
]
self.orchestrator.run_execution.assert_has_calls(run_calls)
self.assertEqual(2, self.orchestrator.run_execution.call_count)
execution_1 = db_api.get_execution(execution_1_id)
execution_2 = db_api.get_execution(execution_2_id)
self.assertEqual(execution_1.status, status.SUCCESS)
self.assertEqual(execution_1.logs, '')
self.assertEqual(execution_1.result, {'output': 'success result'})
@ -298,6 +285,11 @@ class TestDefaultEngine(base.DbTestCase):
self,
etcd_util_get_service_url_mock
):
"""test_create_execution_prepare_execution_exception
Create execution for image type function, prepare_execution method
raises exception.
"""
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
@ -320,7 +312,7 @@ class TestDefaultEngine(base.DbTestCase):
etcd_util_get_service_url_mock.return_value = None
self.default_engine.create_execution(
mock.Mock(), execution_id, function_id, runtime_id)
mock.Mock(), execution_id, function_id, 0, runtime_id)
execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.ERROR)
@ -328,9 +320,9 @@ class TestDefaultEngine(base.DbTestCase):
self.assertEqual(execution.result, {})
@mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution_not_image_source(
self,
etcd_util_get_service_url_mock
def test_create_execution_package_type_function(
self,
etcd_util_get_service_url_mock
):
function = self.create_function()
function_id = function.id
@ -347,24 +339,26 @@ class TestDefaultEngine(base.DbTestCase):
'output': 'success output'})
self.default_engine.create_execution(
mock.Mock(), execution_id, function_id, runtime_id)
mock.Mock(), execution_id, function_id, 0, runtime_id)
self.default_engine.function_load_check.assert_called_once_with(
function_id, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id)
function_id, 0, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0)
self.orchestrator.prepare_execution.assert_called_once_with(
function_id, image=None, identifier=runtime_id,
function_id, 0, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id}, input=None)
self.orchestrator.run_execution.assert_called_once_with(
execution_id, function_id, input=None, identifier=runtime_id,
execution_id, function_id, 0, input=None, identifier=runtime_id,
service_url='svc_url', entry=function.entry,
trust_id=function.trust_id)
execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.SUCCESS)
self.assertEqual(execution.logs, 'execution log')
self.assertEqual(execution.result, {'output': 'success output'})
def test_create_execution_not_image_source_scaleup_exception(self):
def test_create_execution_loadcheck_exception(self):
function = self.create_function()
function_id = function.id
runtime_id = function.runtime_id
@ -377,9 +371,10 @@ class TestDefaultEngine(base.DbTestCase):
)
self.default_engine.create_execution(
mock.Mock(), execution_id, function_id, runtime_id)
mock.Mock(), execution_id, function_id, 0, runtime_id)
execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.ERROR)
self.assertEqual(execution.logs, '')
self.assertEqual(execution.result, {})
@ -388,10 +383,10 @@ class TestDefaultEngine(base.DbTestCase):
@mock.patch('qinling.engine.utils.url_request')
@mock.patch('qinling.utils.etcd_util.get_service_url')
def test_create_execution_found_service_url(
self,
etcd_util_get_service_url_mock,
engine_utils_url_request_mock,
engine_utils_get_request_data_mock
self,
etcd_util_get_service_url_mock,
engine_utils_url_request_mock,
engine_utils_get_request_data_mock
):
function = self.create_function()
function_id = function.id
@ -407,18 +402,21 @@ class TestDefaultEngine(base.DbTestCase):
'output': 'failed output'})
self.default_engine.create_execution(
mock.Mock(), execution_id, function_id, runtime_id, input='input')
mock.Mock(), execution_id, function_id, 0, runtime_id,
input='input')
self.default_engine.function_load_check.assert_called_once_with(
function_id, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id)
function_id, 0, runtime_id)
etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0)
engine_utils_get_request_data_mock.assert_called_once_with(
mock.ANY, function_id, execution_id,
mock.ANY, function_id, 0, execution_id,
'input', function.entry, function.trust_id,
self.qinling_endpoint)
engine_utils_url_request_mock.assert_called_once_with(
self.default_engine.session, 'svc_url/execute', body='data')
execution = db_api.get_execution(execution_id)
self.assertEqual(execution.status, status.FAILED)
self.assertEqual(execution.logs, 'execution log')
self.assertEqual(execution.result,
@ -430,35 +428,36 @@ class TestDefaultEngine(base.DbTestCase):
self.default_engine.delete_function(mock.Mock(), function_id)
self.orchestrator.delete_function.assert_called_once_with(
function_id)
function_id, 0
)
@mock.patch('qinling.utils.etcd_util.create_service_url')
@mock.patch('qinling.utils.etcd_util.create_worker')
def test_scaleup_function(
self,
etcd_util_create_worker_mock,
etcd_util_create_service_url_mock
self,
etcd_util_create_worker_mock,
etcd_util_create_service_url_mock
):
function_id = common.generate_unicode_uuid()
runtime_id = common.generate_unicode_uuid()
self.orchestrator.scaleup_function.return_value = (['worker'], 'url')
self.default_engine.scaleup_function(
mock.Mock(), function_id, runtime_id)
mock.Mock(), function_id, 0, runtime_id)
self.orchestrator.scaleup_function.assert_called_once_with(
function_id, identifier=runtime_id, count=1)
function_id, 0, identifier=runtime_id, count=1)
etcd_util_create_worker_mock.assert_called_once_with(
function_id, 'worker')
function_id, 'worker', version=0)
etcd_util_create_service_url_mock.assert_called_once_with(
function_id, 'url')
function_id, 'url', version=0)
@mock.patch('qinling.utils.etcd_util.create_service_url')
@mock.patch('qinling.utils.etcd_util.create_worker')
def test_scaleup_function_multiple_workers(
self,
etcd_util_create_worker_mock,
etcd_util_create_service_url_mock
self,
etcd_util_create_worker_mock,
etcd_util_create_service_url_mock
):
function_id = common.generate_unicode_uuid()
runtime_id = common.generate_unicode_uuid()
@ -466,22 +465,24 @@ class TestDefaultEngine(base.DbTestCase):
['worker0', 'worker1'], 'url')
self.default_engine.scaleup_function(
mock.Mock(), function_id, runtime_id, count=2)
mock.Mock(), function_id, 0, runtime_id, count=2
)
self.orchestrator.scaleup_function.assert_called_once_with(
function_id, identifier=runtime_id, count=2)
function_id, 0, identifier=runtime_id, count=2
)
# Two new workers are created.
expected = [mock.call(function_id, 'worker0'),
mock.call(function_id, 'worker1')]
expected = [mock.call(function_id, 'worker0', version=0),
mock.call(function_id, 'worker1', version=0)]
etcd_util_create_worker_mock.assert_has_calls(expected)
self.assertEqual(2, etcd_util_create_worker_mock.call_count)
etcd_util_create_service_url_mock.assert_called_once_with(
function_id, 'url')
function_id, 'url', version=0
)
@mock.patch('qinling.utils.etcd_util.delete_worker')
@mock.patch('qinling.utils.etcd_util.get_workers')
def test_scaledown_function(
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
):
function_id = common.generate_unicode_uuid()
etcd_util_get_workers_mock.return_value = [
@ -491,33 +492,33 @@ class TestDefaultEngine(base.DbTestCase):
self.default_engine.scaledown_function(mock.Mock(), function_id)
etcd_util_get_workers_mock.assert_called_once_with(
function_id)
function_id, 0)
self.orchestrator.delete_worker.assert_called_once_with('worker_0')
etcd_util_delete_workers_mock.assert_called_once_with(
function_id, 'worker_0')
function_id, 'worker_0', version=0
)
@mock.patch('qinling.utils.etcd_util.delete_worker')
@mock.patch('qinling.utils.etcd_util.get_workers')
def test_scaledown_function_multiple_workers(
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
):
function_id = common.generate_unicode_uuid()
etcd_util_get_workers_mock.return_value = [
'worker_%d' % i for i in range(4)
]
self.default_engine.scaledown_function(
mock.Mock(), function_id, count=2)
self.default_engine.scaledown_function(mock.Mock(), function_id,
count=2)
etcd_util_get_workers_mock.assert_called_once_with(
function_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id, 0)
# First two workers will be deleted.
expected = [mock.call('worker_0'), mock.call('worker_1')]
self.orchestrator.delete_worker.assert_has_calls(expected)
self.assertEqual(2, self.orchestrator.delete_worker.call_count)
expected = [
mock.call(function_id, 'worker_0'),
mock.call(function_id, 'worker_1')
mock.call(function_id, 'worker_0', version=0),
mock.call(function_id, 'worker_1', version=0)
]
etcd_util_delete_workers_mock.assert_has_calls(expected)
self.assertEqual(2, etcd_util_delete_workers_mock.call_count)
@ -525,7 +526,7 @@ class TestDefaultEngine(base.DbTestCase):
@mock.patch('qinling.utils.etcd_util.delete_worker')
@mock.patch('qinling.utils.etcd_util.get_workers')
def test_scaledown_function_leaving_one_worker(
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock
):
function_id = common.generate_unicode_uuid()
etcd_util_get_workers_mock.return_value = [
@ -535,8 +536,7 @@ class TestDefaultEngine(base.DbTestCase):
self.default_engine.scaledown_function(
mock.Mock(), function_id, count=5) # count > len(workers)
etcd_util_get_workers_mock.assert_called_once_with(
function_id)
etcd_util_get_workers_mock.assert_called_once_with(function_id, 0)
# Only the first three workers will be deleted
expected = [
mock.call('worker_0'), mock.call('worker_1'), mock.call('worker_2')
@ -544,9 +544,9 @@ class TestDefaultEngine(base.DbTestCase):
self.orchestrator.delete_worker.assert_has_calls(expected)
self.assertEqual(3, self.orchestrator.delete_worker.call_count)
expected = [
mock.call(function_id, 'worker_0'),
mock.call(function_id, 'worker_1'),
mock.call(function_id, 'worker_2')
mock.call(function_id, 'worker_0', version=0),
mock.call(function_id, 'worker_1', version=0),
mock.call(function_id, 'worker_2', version=0)
]
etcd_util_delete_workers_mock.assert_has_calls(expected)
self.assertEqual(3, etcd_util_delete_workers_mock.call_count)

+ 116
- 64
qinling/tests/unit/orchestrator/kubernetes/test_manager.py View File

@ -30,10 +30,10 @@ SERVICE_ADDRESS_EXTERNAL = '1.2.3.4'
SERVICE_ADDRESS_INTERNAL = '127.0.0.1'
class TestKubernetesManager(base.BaseTest):
class TestKubernetesManager(base.DbTestCase):
def setUp(self):
super(TestKubernetesManager, self).setUp()
CONF.register_opts(config.kubernetes_opts, config.KUBERNETES_GROUP)
self.conf = CONF
self.qinling_endpoint = 'http://127.0.0.1:7070'
self.k8s_v1_api = mock.Mock()
@ -48,13 +48,17 @@ class TestKubernetesManager(base.BaseTest):
prefix='TestKubernetesManager')
self.override_config('namespace', self.fake_namespace,
config.KUBERNETES_GROUP)
self.override_config('auth_enable', False, group='pecan')
namespace = mock.Mock()
namespace.metadata.name = self.fake_namespace
namespaces = mock.Mock()
namespaces.items = [namespace]
self.k8s_v1_api.list_namespace.return_value = namespaces
self.manager = k8s_manager.KubernetesManager(
self.conf, self.qinling_endpoint)
self.manager = k8s_manager.KubernetesManager(self.conf,
self.qinling_endpoint)
def _create_service(self):
port = mock.Mock()
@ -305,7 +309,7 @@ class TestKubernetesManager(base.BaseTest):
rollback.assert_called_once_with(
fake_deployment_name, self.fake_namespace, rollback_body)
def test_prepare_execution(self):
def test_prepare_execution_no_image(self):
pod = mock.Mock()
pod.metadata.name = self.rand_name('pod',
prefix='TestKubernetesManager')
@ -323,7 +327,7 @@ class TestKubernetesManager(base.BaseTest):
function_id = common.generate_unicode_uuid()
pod_names, service_url = self.manager.prepare_execution(
function_id, image=None, identifier=runtime_id,
function_id, 0, image=None, identifier=runtime_id,
labels={'runtime_id': runtime_id})
self.assertEqual(pod.metadata.name, pod_names)
@ -334,10 +338,15 @@ class TestKubernetesManager(base.BaseTest):
# in _choose_available_pods
self.k8s_v1_api.list_namespaced_pod.assert_called_once_with(
self.fake_namespace,
label_selector='function_id=%s' % function_id)
label_selector='function_id=%s,function_version=0' % (function_id)
)
# in _prepare_pod -> _update_pod_label
pod_labels = {'pod1_key1': 'pod1_value1', 'function_id': function_id}
pod_labels = {
'pod1_key1': 'pod1_value1',
'function_id': function_id,
'function_version': '0'
}
body = {'metadata': {'labels': pod_labels}}
self.k8s_v1_api.patch_namespaced_pod.assert_called_once_with(
pod.metadata.name, self.fake_namespace, body)
@ -345,8 +354,9 @@ class TestKubernetesManager(base.BaseTest):
# in _prepare_pod
service_body = self.manager.service_template.render(
{
'service_name': 'service-%s' % function_id,
'service_name': 'service-%s-0' % function_id,
'labels': {'function_id': function_id,
'function_version': '0',
'runtime_id': runtime_id},
'selector': pod_labels
}
@ -357,13 +367,12 @@ class TestKubernetesManager(base.BaseTest):
def test_prepare_execution_with_image(self):
function_id = common.generate_unicode_uuid()
image = self.rand_name('image', prefix='TestKubernetesManager')
identifier = ('%s-%s' % (
common.generate_unicode_uuid(dashed=False),
function_id)
identifier = ('%s-%s' %
(common.generate_unicode_uuid(dashed=False), function_id)
)[:63]
pod_name, url = self.manager.prepare_execution(
function_id, image=image, identifier=identifier)
function_id, 0, image=image, identifier=identifier)
self.assertEqual(identifier, pod_name)
self.assertIsNone(url)
@ -390,7 +399,7 @@ class TestKubernetesManager(base.BaseTest):
fake_input = {'__function_input': 'input_item1 input_item2'}
pod_name, url = self.manager.prepare_execution(
function_id, image=image, identifier=identifier,
function_id, 0, image=image, identifier=identifier,
input=fake_input)
# in _create_pod
@ -415,7 +424,7 @@ class TestKubernetesManager(base.BaseTest):
fake_input = '["input_item3", "input_item4"]'
pod_name, url = self.manager.prepare_execution(
function_id, image=image, identifier=identifier,
function_id, 0, image=image, identifier=identifier,
input=fake_input)
# in _create_pod