From 885ed282343dba799a1ce5e3fb20d2aa2f10c408 Mon Sep 17 00:00:00 2001 From: Lingxian Kong Date: Thu, 19 Apr 2018 16:29:08 +1200 Subject: [PATCH] Support version for execution creation Now, all the underlying resources(pod, service) in k8s cluster have version number in their labels. Different versions of the same function will have different services exposed in k8s cluster. Change-Id: Ic0b3045404105175073844b908fa0f6c2ef2ab8a Story: #2001829 Task: #14350 --- qinling/api/controllers/v1/execution.py | 3 + qinling/api/controllers/v1/function.py | 4 +- qinling/api/controllers/v1/resources.py | 29 +- qinling/db/api.py | 9 + qinling/db/sqlalchemy/api.py | 14 + .../002_add_function_version_support.py | 1 + qinling/db/sqlalchemy/models.py | 1 + qinling/engine/default_engine.py | 97 ++++--- qinling/engine/utils.py | 24 +- qinling/orchestrator/base.py | 9 +- qinling/orchestrator/kubernetes/manager.py | 84 ++++-- .../kubernetes/templates/service.j2 | 2 +- qinling/rpc.py | 15 +- qinling/services/periodics.py | 33 ++- .../unit/api/controllers/v1/test_execution.py | 21 +- .../tests/unit/engine/test_default_engine.py | 252 +++++++++--------- .../orchestrator/kubernetes/test_manager.py | 180 ++++++++----- qinling/tests/unit/services/test_periodics.py | 44 ++- qinling/utils/etcd_util.py | 31 +-- qinling/utils/executions.py | 73 +++-- 20 files changed, 580 insertions(+), 346 deletions(-) diff --git a/qinling/api/controllers/v1/execution.py b/qinling/api/controllers/v1/execution.py index b91d9cd2..df64ba60 100644 --- a/qinling/api/controllers/v1/execution.py +++ b/qinling/api/controllers/v1/execution.py @@ -56,6 +56,9 @@ class ExecutionsController(rest.RestController): status_code=201 ) def post(self, body): + ctx = context.get_ctx() + acl.enforce('execution:create', ctx) + params = body.to_dict() LOG.info("Creating %s. [params=%s]", self.type, params) diff --git a/qinling/api/controllers/v1/function.py b/qinling/api/controllers/v1/function.py index 396e09b9..cb9c3859 100644 --- a/qinling/api/controllers/v1/function.py +++ b/qinling/api/controllers/v1/function.py @@ -55,12 +55,12 @@ class FunctionWorkerController(rest.RestController): acl.enforce('function_worker:get_all', context.get_ctx()) LOG.info("Get workers for function %s.", function_id) - workers = etcd_util.get_workers(function_id, CONF) + workers = etcd_util.get_workers(function_id) workers = [ resources.FunctionWorker.from_dict( {'function_id': function_id, 'worker_name': w} ) for w in workers - ] + ] return resources.FunctionWorkers(workers=workers) diff --git a/qinling/api/controllers/v1/resources.py b/qinling/api/controllers/v1/resources.py index 37c5b11f..b9c95504 100644 --- a/qinling/api/controllers/v1/resources.py +++ b/qinling/api/controllers/v1/resources.py @@ -278,6 +278,7 @@ class Runtimes(ResourceList): class Execution(Resource): id = types.uuid function_id = wsme.wsattr(types.uuid, mandatory=True) + function_version = wsme.wsattr(int, default=0) description = wtypes.text status = wsme.wsattr(wtypes.text, readonly=True) sync = bool @@ -303,21 +304,6 @@ class Execution(Resource): return obj - @classmethod - def sample(cls): - return cls( - id='123e4567-e89b-12d3-a456-426655440000', - function_id='123e4567-e89b-12d3-a456-426655440000', - description='this is the first execution.', - status='success', - sync=True, - input={'data': 'hello, world'}, - result={'result': 'hello, world'}, - project_id='default', - created_at='1970-01-01T00:00:00.000000', - updated_at='1970-01-01T00:00:00.000000' - ) - class Executions(ResourceList): executions = [Execution] @@ -327,18 +313,6 @@ class Executions(ResourceList): super(Executions, self).__init__(**kwargs) - @classmethod - def sample(cls): - sample = cls() - sample.executions = [Execution.sample()] - sample.next = ( - "http://localhost:7070/v1/executions?" - "sort_keys=id,name&sort_dirs=asc,desc&limit=10&" - "marker=123e4567-e89b-12d3-a456-426655440000" - ) - - return sample - class Job(Resource): id = types.uuid @@ -420,6 +394,7 @@ class FunctionVersion(Resource): id = types.uuid description = wtypes.text version_number = wsme.wsattr(int, readonly=True) + count = wsme.wsattr(int, readonly=True) project_id = wsme.wsattr(wtypes.text, readonly=True) created_at = wsme.wsattr(wtypes.text, readonly=True) updated_at = wsme.wsattr(wtypes.text, readonly=True) diff --git a/qinling/db/api.py b/qinling/db/api.py index 4fc2100b..a1021ae1 100644 --- a/qinling/db/api.py +++ b/qinling/db/api.py @@ -212,5 +212,14 @@ def get_function_version(function_id, version): return IMPL.get_function_version(function_id, version) +# This function is only used in unit test. +def update_function_version(function_id, version, **kwargs): + return IMPL.update_function_version(function_id, version, **kwargs) + + def delete_function_version(function_id, version): return IMPL.delete_function_version(function_id, version) + + +def get_function_versions(**kwargs): + return IMPL.get_function_versions(**kwargs) diff --git a/qinling/db/sqlalchemy/api.py b/qinling/db/sqlalchemy/api.py index f7ce3dc2..650509ea 100644 --- a/qinling/db/sqlalchemy/api.py +++ b/qinling/db/sqlalchemy/api.py @@ -529,7 +529,21 @@ def get_function_version(function_id, version, session=None): return version_db +# This function is only used in unit test. +@db_base.session_aware() +def update_function_version(function_id, version, session=None, **kwargs): + version_db = get_function_version(function_id, version, session=session) + version_db.update(kwargs.copy()) + + return version_db + + @db_base.session_aware() def delete_function_version(function_id, version, session=None): version_db = get_function_version(function_id, version) session.delete(version_db) + + +@db_base.session_aware() +def get_function_versions(session=None, **kwargs): + return _get_collection_sorted_by_time(models.FunctionVersion, **kwargs) diff --git a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py index 13f6d982..571aedc6 100644 --- a/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py +++ b/qinling/db/sqlalchemy/migration/alembic_migrations/versions/002_add_function_version_support.py @@ -38,6 +38,7 @@ def upgrade(): sa.Column('function_id', sa.String(length=36), nullable=False), sa.Column('description', sa.String(length=255), nullable=True), sa.Column('version_number', sa.Integer, nullable=False), + sa.Column('count', sa.Integer, nullable=False), sa.PrimaryKeyConstraint('id'), sa.ForeignKeyConstraint(['function_id'], [u'functions.id']), sa.UniqueConstraint('function_id', 'version_number', 'project_id'), diff --git a/qinling/db/sqlalchemy/models.py b/qinling/db/sqlalchemy/models.py index 26217afa..f018ba83 100644 --- a/qinling/db/sqlalchemy/models.py +++ b/qinling/db/sqlalchemy/models.py @@ -119,6 +119,7 @@ class FunctionVersion(model_base.QinlingSecureModelBase): function = relationship('Function', back_populates="versions") description = sa.Column(sa.String(255), nullable=True) version_number = sa.Column(sa.Integer, default=0) + count = sa.Column(sa.Integer, default=0) Runtime.functions = relationship("Function", back_populates="runtime") diff --git a/qinling/engine/default_engine.py b/qinling/engine/default_engine.py index 90f196fc..3b4d1a29 100644 --- a/qinling/engine/default_engine.py +++ b/qinling/engine/default_engine.py @@ -89,33 +89,41 @@ class DefaultEngine(object): stop=tenacity.stop_after_attempt(30), retry=(tenacity.retry_if_result(lambda result: result is False)) ) - def function_load_check(self, function_id, runtime_id): - with etcd_util.get_worker_lock() as lock: + def function_load_check(self, function_id, version, runtime_id): + """Check function load and scale the workers if needed. + + :return: None if no need to scale up otherwise return the service url + """ + with etcd_util.get_worker_lock(function_id, version) as lock: if not lock.is_acquired(): return False - workers = etcd_util.get_workers(function_id) + workers = etcd_util.get_workers(function_id, version) running_execs = db_api.get_executions( - function_id=function_id, status=status.RUNNING + function_id=function_id, + function_version=version, + status=status.RUNNING ) concurrency = (len(running_execs) or 1) / (len(workers) or 1) if (len(workers) == 0 or concurrency > CONF.engine.function_concurrency): LOG.info( - 'Scale up function %s. Current concurrency: %s, execution ' - 'number %s, worker number %s', - function_id, concurrency, len(running_execs), len(workers) + 'Scale up function %s(version %s). Current concurrency: ' + '%s, execution number %s, worker number %s', + function_id, version, concurrency, len(running_execs), + len(workers) ) # NOTE(kong): The increase step could be configurable - return self.scaleup_function(None, function_id, runtime_id, 1) + return self.scaleup_function(None, function_id, version, + runtime_id, 1) - def create_execution(self, ctx, execution_id, function_id, runtime_id, - input=None): + def create_execution(self, ctx, execution_id, function_id, + function_version, runtime_id, input=None): LOG.info( 'Creating execution. execution_id=%s, function_id=%s, ' - 'runtime_id=%s, input=%s', - execution_id, function_id, runtime_id, input + 'function_version=%s, runtime_id=%s, input=%s', + execution_id, function_id, function_version, runtime_id, input ) function = db_api.get_function(function_id) @@ -129,22 +137,25 @@ class DefaultEngine(object): # Auto scale workers if needed if not is_image_source: try: - svc_url = self.function_load_check(function_id, runtime_id) + svc_url = self.function_load_check(function_id, + function_version, + runtime_id) except exc.OrchestratorException as e: utils.handle_execution_exception(execution_id, str(e)) return - temp_url = etcd_util.get_service_url(function_id) + temp_url = etcd_util.get_service_url(function_id, function_version) svc_url = svc_url or temp_url if svc_url: func_url = '%s/execute' % svc_url LOG.debug( - 'Found service url for function: %s, execution: %s, url: %s', - function_id, execution_id, func_url + 'Found service url for function: %s(version %s), execution: ' + '%s, url: %s', + function_id, function_version, execution_id, func_url ) data = utils.get_request_data( - CONF, function_id, execution_id, + CONF, function_id, function_version, execution_id, input, function.entry, function.trust_id, self.qinling_endpoint ) @@ -152,12 +163,13 @@ class DefaultEngine(object): self.session, func_url, body=data ) - utils.finish_execution( - execution_id, success, res, is_image_source=is_image_source) + utils.finish_execution(execution_id, success, res, + is_image_source=is_image_source) return if source == constants.IMAGE_FUNCTION: image = function.code['image'] + # Be consistent with k8s naming convention identifier = ('%s-%s' % (common.generate_unicode_uuid(dashed=False), function_id) @@ -167,8 +179,13 @@ class DefaultEngine(object): labels = {'runtime_id': runtime_id} try: + # For image function, it will be executed inside this method; for + # package type function it only sets up underlying resources and + # get a service url. If the service url is already created + # beforehand, nothing happens. _, svc_url = self.orchestrator.prepare_execution( function_id, + function_version, image=image, identifier=identifier, labels=labels, @@ -178,9 +195,12 @@ class DefaultEngine(object): utils.handle_execution_exception(execution_id, str(e)) return + # For image type function, read the worker log; For package type + # function, invoke and get log success, res = self.orchestrator.run_execution( execution_id, function_id, + function_version, input=input, identifier=identifier, service_url=svc_url, @@ -188,34 +208,43 @@ class DefaultEngine(object): trust_id=function.trust_id ) - utils.finish_execution( - execution_id, success, res, is_image_source=is_image_source) + utils.finish_execution(execution_id, success, res, + is_image_source=is_image_source) - def delete_function(self, ctx, function_id): + def delete_function(self, ctx, function_id, function_version=0): """Deletes underlying resources allocated for function.""" - LOG.info('Start to delete function %s.', function_id) + LOG.info('Start to delete function %s(version %s).', function_id, + function_version) - self.orchestrator.delete_function(function_id) + self.orchestrator.delete_function(function_id, function_version) - LOG.info('Deleted function %s.', function_id) + LOG.info('Deleted function %s(version %s).', function_id, + function_version) - def scaleup_function(self, ctx, function_id, runtime_id, count=1): + def scaleup_function(self, ctx, function_id, function_version, runtime_id, + count=1): worker_names, service_url = self.orchestrator.scaleup_function( function_id, + function_version, identifier=runtime_id, count=count ) for name in worker_names: - etcd_util.create_worker(function_id, name) + etcd_util.create_worker(function_id, name, + version=function_version) - etcd_util.create_service_url(function_id, service_url) + etcd_util.create_service_url(function_id, service_url, + version=function_version) + + LOG.info('Finished scaling up function %s(version %s).', function_id, + function_version) - LOG.info('Finished scaling up function %s.', function_id) return service_url - def scaledown_function(self, ctx, function_id, count=1): - workers = etcd_util.get_workers(function_id) + def scaledown_function(self, ctx, function_id, function_version=0, + count=1): + workers = etcd_util.get_workers(function_id, function_version) worker_deleted_num = ( count if len(workers) > count else len(workers) - 1 ) @@ -224,6 +253,8 @@ class DefaultEngine(object): for worker in workers: LOG.debug('Removing worker %s', worker) self.orchestrator.delete_worker(worker) - etcd_util.delete_worker(function_id, worker) + etcd_util.delete_worker(function_id, worker, + version=function_version) - LOG.info('Finished scaling down function %s.', function_id) + LOG.info('Finished scaling down function %s(version %s).', function_id, + function_version) diff --git a/qinling/engine/utils.py b/qinling/engine/utils.py index a80a7385..e61e8c46 100644 --- a/qinling/engine/utils.py +++ b/qinling/engine/utils.py @@ -74,17 +74,29 @@ def url_request(request_session, url, body=None): return False, {'error': 'Internal service error.'} -def get_request_data(conf, function_id, execution_id, input, entry, trust_id, - qinling_endpoint): +def get_request_data(conf, function_id, version, execution_id, input, entry, + trust_id, qinling_endpoint): + """Prepare the request body should send to the worker.""" ctx = context.get_ctx() - download_url = ( - '%s/%s/functions/%s?download=true' % - (qinling_endpoint.strip('/'), constants.CURRENT_VERSION, function_id) - ) + + if version == 0: + download_url = ( + '%s/%s/functions/%s?download=true' % + (qinling_endpoint.strip('/'), constants.CURRENT_VERSION, + function_id) + ) + else: + download_url = ( + '%s/%s/functions/%s/versions/%s?download=true' % + (qinling_endpoint.strip('/'), constants.CURRENT_VERSION, + function_id, version) + ) + data = { 'execution_id': execution_id, 'input': input, 'function_id': function_id, + 'function_version': version, 'entry': entry, 'download_url': download_url, 'request_id': ctx.request_id, diff --git a/qinling/orchestrator/base.py b/qinling/orchestrator/base.py index 6f858ec8..b9f80858 100644 --- a/qinling/orchestrator/base.py +++ b/qinling/orchestrator/base.py @@ -39,19 +39,20 @@ class OrchestratorBase(object): raise NotImplementedError @abc.abstractmethod - def prepare_execution(self, function_id, **kwargs): + def prepare_execution(self, function_id, function_version, **kwargs): raise NotImplementedError @abc.abstractmethod - def run_execution(self, execution_id, function_id, **kwargs): + def run_execution(self, execution_id, function_id, function_version, + **kwargs): raise NotImplementedError @abc.abstractmethod - def delete_function(self, function_id, **kwargs): + def delete_function(self, function_id, function_version, **kwargs): raise NotImplementedError @abc.abstractmethod - def scaleup_function(self, function_id, **kwargs): + def scaleup_function(self, function_id, function_version, **kwargs): raise NotImplementedError @abc.abstractmethod diff --git a/qinling/orchestrator/kubernetes/manager.py b/qinling/orchestrator/kubernetes/manager.py index 2e27d4c6..a236c646 100644 --- a/qinling/orchestrator/kubernetes/manager.py +++ b/qinling/orchestrator/kubernetes/manager.py @@ -29,6 +29,7 @@ from qinling.orchestrator import base from qinling.orchestrator.kubernetes import utils as k8s_util from qinling.utils import common + LOG = logging.getLogger(__name__) TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) + '/templates/') @@ -218,17 +219,20 @@ class KubernetesManager(base.OrchestratorBase): return True - def _choose_available_pods(self, labels, count=1, function_id=None): + def _choose_available_pods(self, labels, count=1, function_id=None, + function_version=0): # If there is already a pod for function, reuse it. if function_id: ret = self.v1.list_namespaced_pod( self.conf.kubernetes.namespace, - label_selector='function_id=%s' % function_id + label_selector='function_id=%s,function_version=%s' % + (function_id, function_version) ) if len(ret.items) >= count: LOG.debug( - "Function %s already associates to a pod with at least " - "%d worker(s). ", function_id, count + "Function %s(version %s) already associates to a pod with " + "at least %d worker(s). ", + function_id, function_version, count ) return ret.items[:count] @@ -243,7 +247,8 @@ class KubernetesManager(base.OrchestratorBase): return ret.items[-count:] - def _prepare_pod(self, pod, deployment_name, function_id, labels=None): + def _prepare_pod(self, pod, deployment_name, function_id, version, + labels=None): """Pod preparation. 1. Update pod labels. @@ -253,16 +258,22 @@ class KubernetesManager(base.OrchestratorBase): labels = labels or {} LOG.info( - 'Prepare pod %s in deployment %s for function %s', - pod_name, deployment_name, function_id + 'Prepare pod %s in deployment %s for function %s(version %s)', + pod_name, deployment_name, function_id, version ) # Update pod label. - pod_labels = self._update_pod_label(pod, {'function_id': function_id}) + pod_labels = self._update_pod_label( + pod, + # pod label value should be string + {'function_id': function_id, 'function_version': str(version)} + ) # Create service for the chosen pod. - service_name = "service-%s" % function_id - labels.update({'function_id': function_id}) + service_name = "service-%s-%s" % (function_id, version) + labels.update( + {'function_id': function_id, 'function_version': str(version)} + ) # TODO(kong): Make the service type configurable. service_body = self.service_template.render( @@ -314,6 +325,7 @@ class KubernetesManager(base.OrchestratorBase): return pod_name, pod_service_url def _create_pod(self, image, pod_name, labels, input): + """Create pod for image type function.""" if not input: input_list = [] elif isinstance(input, dict) and input.get('__function_input'): @@ -357,15 +369,17 @@ class KubernetesManager(base.OrchestratorBase): return pod_labels - def prepare_execution(self, function_id, image=None, identifier=None, - labels=None, input=None): - """Prepare service URL for function. + def prepare_execution(self, function_id, version, image=None, + identifier=None, labels=None, input=None): + """Prepare service URL for function version. For image function, create a single pod with input, so the function will be executed. For normal function, choose a pod from the pool and expose a service, return the service URL. + + return a tuple includes pod name and the servise url. """ pods = None @@ -375,7 +389,8 @@ class KubernetesManager(base.OrchestratorBase): self._create_pod(image, identifier, labels, input) return identifier, None else: - pods = self._choose_available_pods(labels, function_id=function_id) + pods = self._choose_available_pods(labels, function_id=function_id, + function_version=version) if not pods: LOG.critical('No worker available.') @@ -383,31 +398,35 @@ class KubernetesManager(base.OrchestratorBase): try: pod_name, url = self._prepare_pod( - pods[0], identifier, function_id, labels + pods[0], identifier, function_id, version, labels ) return pod_name, url except Exception: LOG.exception('Pod preparation failed.') - self.delete_function(function_id, labels) + self.delete_function(function_id, version, labels) raise exc.OrchestratorException('Execution preparation failed.') - def run_execution(self, execution_id, function_id, input=None, + def run_execution(self, execution_id, function_id, version, input=None, identifier=None, service_url=None, entry='main.main', trust_id=None): - """Run execution and get output.""" + """Run execution. + + Return a tuple including the result and the output. + """ if service_url: func_url = '%s/execute' % service_url data = utils.get_request_data( - self.conf, function_id, execution_id, input, entry, trust_id, - self.qinling_endpoint + self.conf, function_id, version, execution_id, input, entry, + trust_id, self.qinling_endpoint ) LOG.debug( - 'Invoke function %s, url: %s, data: %s', - function_id, func_url, data + 'Invoke function %s(version %s), url: %s, data: %s', + function_id, version, func_url, data ) return utils.url_request(self.session, func_url, body=data) else: + # Wait for image type function execution to be finished def _wait_complete(): pod = self.v1.read_namespaced_pod( identifier, @@ -437,8 +456,17 @@ class KubernetesManager(base.OrchestratorBase): ) return True, output - def delete_function(self, function_id, labels=None): - labels = labels or {'function_id': function_id} + def delete_function(self, function_id, version, labels=None): + """Delete related resources for function. + + - Delete service + - Delete pods + """ + pre_label = { + 'function_id': function_id, + 'function_version': str(version) + } + labels = labels or pre_label selector = common.convert_dict_to_string(labels) ret = self.v1.list_namespaced_service( @@ -456,7 +484,7 @@ class KubernetesManager(base.OrchestratorBase): label_selector=selector ) - def scaleup_function(self, function_id, identifier=None, count=1): + def scaleup_function(self, function_id, version, identifier=None, count=1): pod_names = [] labels = {'runtime_id': identifier} pods = self._choose_available_pods(labels, count=count) @@ -466,11 +494,13 @@ class KubernetesManager(base.OrchestratorBase): for pod in pods: pod_name, service_url = self._prepare_pod( - pod, identifier, function_id, labels + pod, identifier, function_id, version, labels ) pod_names.append(pod_name) - LOG.info('Pods scaled up for function %s: %s', function_id, pod_names) + LOG.info('Pods scaled up for function %s(version %s): %s', function_id, + version, pod_names) + return pod_names, service_url def delete_worker(self, pod_name, **kwargs): diff --git a/qinling/orchestrator/kubernetes/templates/service.j2 b/qinling/orchestrator/kubernetes/templates/service.j2 index 84be4094..e2452fec 100644 --- a/qinling/orchestrator/kubernetes/templates/service.j2 +++ b/qinling/orchestrator/kubernetes/templates/service.j2 @@ -4,7 +4,7 @@ metadata: name: {{ service_name }} labels: {% for key, value in labels.items() %} - {{ key }}: {{ value }} + {{ key }}: "{{ value }}" {% endfor %} spec: type: NodePort diff --git a/qinling/rpc.py b/qinling/rpc.py index afbf5fc4..86fd2fc5 100644 --- a/qinling/rpc.py +++ b/qinling/rpc.py @@ -153,7 +153,7 @@ class EngineClient(object): ) @wrap_messaging_exception - def create_execution(self, execution_id, function_id, runtime_id, + def create_execution(self, execution_id, function_id, version, runtime_id, input=None, is_sync=True): method_client = self._client.prepare(topic=self.topic, server=None) @@ -163,6 +163,7 @@ class EngineClient(object): 'create_execution', execution_id=execution_id, function_id=function_id, + function_version=version, runtime_id=runtime_id, input=input ) @@ -172,33 +173,37 @@ class EngineClient(object): 'create_execution', execution_id=execution_id, function_id=function_id, + function_version=version, runtime_id=runtime_id, input=input ) @wrap_messaging_exception - def delete_function(self, id): + def delete_function(self, id, version=0): return self._client.prepare(topic=self.topic, server=None).cast( ctx.get_ctx(), 'delete_function', - function_id=id + function_id=id, + function_version=version ) @wrap_messaging_exception - def scaleup_function(self, id, runtime_id, count=1): + def scaleup_function(self, id, runtime_id, version=0, count=1): return self._client.prepare(topic=self.topic, server=None).cast( ctx.get_ctx(), 'scaleup_function', function_id=id, runtime_id=runtime_id, + function_version=version, count=count ) @wrap_messaging_exception - def scaledown_function(self, id, count=1): + def scaledown_function(self, id, version=0, count=1): return self._client.prepare(topic=self.topic, server=None).cast( ctx.get_ctx(), 'scaledown_function', function_id=id, + function_version=version, count=count ) diff --git a/qinling/services/periodics.py b/qinling/services/periodics.py index f352e282..fb582a88 100644 --- a/qinling/services/periodics.py +++ b/qinling/services/periodics.py @@ -51,24 +51,43 @@ def handle_function_service_expiration(ctx, engine): results = db_api.get_functions( sort_keys=['updated_at'], insecure=True, - updated_at={'lte': expiry_time} + updated_at={'lte': expiry_time}, + latest_version=0 ) - if len(results) == 0: - return for func_db in results: - if not etcd_util.get_service_url(func_db.id): + if not etcd_util.get_service_url(func_db.id, 0): continue LOG.info( - 'Deleting service mapping and workers for function %s', + 'Deleting service mapping and workers for function %s(version 0)', func_db.id ) # Delete resources related to the function - engine.delete_function(ctx, func_db.id) + engine.delete_function(ctx, func_db.id, 0) # Delete etcd keys - etcd_util.delete_function(func_db.id) + etcd_util.delete_function(func_db.id, 0) + + versions = db_api.get_function_versions( + sort_keys=['updated_at'], + insecure=True, + updated_at={'lte': expiry_time}, + ) + + for v in versions: + if not etcd_util.get_service_url(v.function_id, v.version_number): + continue + + LOG.info( + 'Deleting service mapping and workers for function %s(version %s)', + v.function_id, v.version_number + ) + + # Delete resources related to the function + engine.delete_function(ctx, v.function_id, v.version_number) + # Delete etcd keys + etcd_util.delete_function(v.function_id, v.version_number) @periodics.periodic(3) diff --git a/qinling/tests/unit/api/controllers/v1/test_execution.py b/qinling/tests/unit/api/controllers/v1/test_execution.py index 48ee34df..8369182d 100644 --- a/qinling/tests/unit/api/controllers/v1/test_execution.py +++ b/qinling/tests/unit/api/controllers/v1/test_execution.py @@ -14,6 +14,7 @@ import mock +from qinling.db import api as db_api from qinling import exceptions as exc from qinling import status from qinling.tests.unit.api import base @@ -40,7 +41,25 @@ class TestExecutionController(base.APITest): self.assertEqual(1, resp.json.get('count')) @mock.patch('qinling.rpc.EngineClient.create_execution') - def test_create_rpc_error(self, mock_create_execution): + def test_post_with_version(self, mock_rpc): + db_api.increase_function_version(self.func_id, 0, + description="version 1") + body = { + 'function_id': self.func_id, + 'function_version': 1 + } + + resp = self.app.post_json('/v1/executions', body) + self.assertEqual(201, resp.status_int) + + resp = self.app.get('/v1/functions/%s' % self.func_id) + self.assertEqual(0, resp.json.get('count')) + + resp = self.app.get('/v1/functions/%s/versions/1' % self.func_id) + self.assertEqual(1, resp.json.get('count')) + + @mock.patch('qinling.rpc.EngineClient.create_execution') + def test_post_rpc_error(self, mock_create_execution): mock_create_execution.side_effect = exc.QinlingException body = { 'function_id': self.func_id, diff --git a/qinling/tests/unit/engine/test_default_engine.py b/qinling/tests/unit/engine/test_default_engine.py index f5a7e781..79b9cef0 100644 --- a/qinling/tests/unit/engine/test_default_engine.py +++ b/qinling/tests/unit/engine/test_default_engine.py @@ -29,10 +29,11 @@ class TestDefaultEngine(base.DbTestCase): self.orchestrator = mock.Mock() self.qinling_endpoint = 'http://127.0.0.1:7070' self.default_engine = default_engine.DefaultEngine( - self.orchestrator, self.qinling_endpoint) + self.orchestrator, self.qinling_endpoint + ) def _create_running_executions(self, function_id, num): - for _i in range(num): + for _ in range(num): self.create_execution(function_id=function_id) def test_create_runtime(self): @@ -110,113 +111,91 @@ class TestDefaultEngine(base.DbTestCase): self.assertEqual(runtime.image, pre_image) self.assertEqual(runtime.status, status.AVAILABLE) + @mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function') @mock.patch('qinling.utils.etcd_util.get_workers') @mock.patch('qinling.utils.etcd_util.get_worker_lock') - def test_function_load_check_no_worker_scaleup( - self, - etcd_util_get_worker_lock_mock, - etcd_util_get_workers_mock - ): + def test_function_load_check_no_worker(self, mock_getlock, mock_getworkers, + mock_scaleup): function_id = common.generate_unicode_uuid() runtime_id = common.generate_unicode_uuid() lock = mock.Mock() - ( - etcd_util_get_worker_lock_mock.return_value.__enter__.return_value - ) = lock lock.is_acquired.return_value = True - etcd_util_get_workers_mock.return_value = [] # len(workers) = 0 - self.default_engine.scaleup_function = mock.Mock() + mock_getlock.return_value.__enter__.return_value = lock + mock_getworkers.return_value = [] - self.default_engine.function_load_check(function_id, runtime_id) + self.default_engine.function_load_check(function_id, 0, runtime_id) - etcd_util_get_workers_mock.assert_called_once_with(function_id) - self.default_engine.scaleup_function.assert_called_once_with( - None, function_id, runtime_id, 1) + mock_getworkers.assert_called_once_with(function_id, 0) + mock_scaleup.assert_called_once_with(None, function_id, 0, runtime_id, + 1) + @mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function') @mock.patch('qinling.utils.etcd_util.get_workers') @mock.patch('qinling.utils.etcd_util.get_worker_lock') - def test_function_load_check_concurrency_scaleup( - self, - etcd_util_get_worker_lock_mock, - etcd_util_get_workers_mock - ): + def test_function_load_check_scaleup(self, mock_getlock, mock_getworkers, + mock_scaleup): function = self.create_function() function_id = function.id runtime_id = function.runtime_id lock = mock.Mock() - ( - etcd_util_get_worker_lock_mock.return_value.__enter__.return_value - ) = lock lock.is_acquired.return_value = True + mock_getlock.return_value.__enter__.return_value = lock + # The default concurrency is 3, we use 4 running executions against # 1 worker so that there will be a scaling up. - etcd_util_get_workers_mock.return_value = range(1) + mock_getworkers.return_value = ['worker1'] self._create_running_executions(function_id, 4) - self.default_engine.scaleup_function = mock.Mock() - self.default_engine.function_load_check(function_id, runtime_id) + self.default_engine.function_load_check(function_id, 0, runtime_id) - etcd_util_get_workers_mock.assert_called_once_with(function_id) - self.default_engine.scaleup_function.assert_called_once_with( - None, function_id, runtime_id, 1) + mock_getworkers.assert_called_once_with(function_id, 0) + mock_scaleup.assert_called_once_with(None, function_id, 0, runtime_id, + 1) + @mock.patch('qinling.engine.default_engine.DefaultEngine.scaleup_function') @mock.patch('qinling.utils.etcd_util.get_workers') @mock.patch('qinling.utils.etcd_util.get_worker_lock') - def test_function_load_check_not_scaleup( - self, - etcd_util_get_worker_lock_mock, - etcd_util_get_workers_mock - ): + def test_function_load_check_not_scaleup(self, mock_getlock, + mock_getworkers, mock_scaleup): function = self.create_function() function_id = function.id runtime_id = function.runtime_id lock = mock.Mock() - ( - etcd_util_get_worker_lock_mock.return_value.__enter__.return_value - ) = lock lock.is_acquired.return_value = True + mock_getlock.return_value.__enter__.return_value = lock + # The default concurrency is 3, we use 3 running executions against # 1 worker so that there won't be a scaling up. - etcd_util_get_workers_mock.return_value = range(1) + mock_getworkers.return_value = ['worker1'] self._create_running_executions(function_id, 3) - self.default_engine.scaleup_function = mock.Mock() - self.default_engine.function_load_check(function_id, runtime_id) + self.default_engine.function_load_check(function_id, 0, runtime_id) - etcd_util_get_workers_mock.assert_called_once_with(function_id) - self.default_engine.scaleup_function.assert_not_called() + mock_getworkers.assert_called_once_with(function_id, 0) + mock_scaleup.assert_not_called() @mock.patch('qinling.utils.etcd_util.get_workers') @mock.patch('qinling.utils.etcd_util.get_worker_lock') - def test_function_load_check_lock_wait( - self, - etcd_util_get_worker_lock_mock, - etcd_util_get_workers_mock - ): + def test_function_load_check_lock_wait(self, mock_getlock, + mock_getworkers): function = self.create_function() function_id = function.id runtime_id = function.runtime_id lock = mock.Mock() - ( - etcd_util_get_worker_lock_mock.return_value.__enter__.return_value - ) = lock + mock_getlock.return_value.__enter__.return_value = lock # Lock is acquired upon the third try. lock.is_acquired.side_effect = [False, False, True] - etcd_util_get_workers_mock.return_value = range(1) + mock_getworkers.return_value = ['worker1'] self._create_running_executions(function_id, 3) - self.default_engine.scaleup_function = mock.Mock() - self.default_engine.function_load_check(function_id, runtime_id) + self.default_engine.function_load_check(function_id, 0, runtime_id) self.assertEqual(3, lock.is_acquired.call_count) - etcd_util_get_workers_mock.assert_called_once_with(function_id) - self.default_engine.scaleup_function.assert_not_called() + mock_getworkers.assert_called_once_with(function_id, 0) @mock.patch('qinling.utils.etcd_util.get_service_url') - def test_create_execution( - self, - etcd_util_get_service_url_mock - ): + def test_create_execution_image_type_function(self, mock_svc_url): + """Create 2 executions for an image type function.""" function = self.create_function() function_id = function.id runtime_id = function.runtime_id @@ -234,42 +213,47 @@ class TestDefaultEngine(base.DbTestCase): execution_1_id = execution_1.id execution_2 = self.create_execution(function_id=function_id) execution_2_id = execution_2.id - self.default_engine.function_load_check = mock.Mock() - etcd_util_get_service_url_mock.return_value = None + mock_svc_url.return_value = None self.orchestrator.prepare_execution.return_value = ( mock.Mock(), None) self.orchestrator.run_execution.side_effect = [ (True, 'success result'), (False, 'failed result')] - # Try create two executions, with different results + # Create two executions, with different results self.default_engine.create_execution( - mock.Mock(), execution_1_id, function_id, runtime_id) + mock.Mock(), execution_1_id, function_id, 0, runtime_id + ) self.default_engine.create_execution( - mock.Mock(), execution_2_id, function_id, runtime_id, - input='input') + mock.Mock(), execution_2_id, function_id, 0, runtime_id, + input='input' + ) - self.default_engine.function_load_check.assert_not_called() get_service_url_calls = [ - mock.call(function_id), mock.call(function_id)] - etcd_util_get_service_url_mock.assert_has_calls(get_service_url_calls) - self.assertEqual(2, etcd_util_get_service_url_mock.call_count) + mock.call(function_id, 0), mock.call(function_id, 0) + ] + mock_svc_url.assert_has_calls(get_service_url_calls) + prepare_calls = [ mock.call(function_id, + 0, image=function.code['image'], identifier=mock.ANY, labels=None, input=None), mock.call(function_id, + 0, image=function.code['image'], identifier=mock.ANY, labels=None, - input='input')] + input='input') + ] self.orchestrator.prepare_execution.assert_has_calls(prepare_calls) - self.assertEqual(2, self.orchestrator.prepare_execution.call_count) + run_calls = [ mock.call(execution_1_id, function_id, + 0, input=None, identifier=mock.ANY, service_url=None, @@ -277,15 +261,18 @@ class TestDefaultEngine(base.DbTestCase): trust_id=function.trust_id), mock.call(execution_2_id, function_id, + 0, input='input', identifier=mock.ANY, service_url=None, entry=function.entry, - trust_id=function.trust_id)] + trust_id=function.trust_id) + ] self.orchestrator.run_execution.assert_has_calls(run_calls) - self.assertEqual(2, self.orchestrator.run_execution.call_count) + execution_1 = db_api.get_execution(execution_1_id) execution_2 = db_api.get_execution(execution_2_id) + self.assertEqual(execution_1.status, status.SUCCESS) self.assertEqual(execution_1.logs, '') self.assertEqual(execution_1.result, {'output': 'success result'}) @@ -298,6 +285,11 @@ class TestDefaultEngine(base.DbTestCase): self, etcd_util_get_service_url_mock ): + """test_create_execution_prepare_execution_exception + + Create execution for image type function, prepare_execution method + raises exception. + """ function = self.create_function() function_id = function.id runtime_id = function.runtime_id @@ -320,7 +312,7 @@ class TestDefaultEngine(base.DbTestCase): etcd_util_get_service_url_mock.return_value = None self.default_engine.create_execution( - mock.Mock(), execution_id, function_id, runtime_id) + mock.Mock(), execution_id, function_id, 0, runtime_id) execution = db_api.get_execution(execution_id) self.assertEqual(execution.status, status.ERROR) @@ -328,9 +320,9 @@ class TestDefaultEngine(base.DbTestCase): self.assertEqual(execution.result, {}) @mock.patch('qinling.utils.etcd_util.get_service_url') - def test_create_execution_not_image_source( - self, - etcd_util_get_service_url_mock + def test_create_execution_package_type_function( + self, + etcd_util_get_service_url_mock ): function = self.create_function() function_id = function.id @@ -347,24 +339,26 @@ class TestDefaultEngine(base.DbTestCase): 'output': 'success output'}) self.default_engine.create_execution( - mock.Mock(), execution_id, function_id, runtime_id) + mock.Mock(), execution_id, function_id, 0, runtime_id) self.default_engine.function_load_check.assert_called_once_with( - function_id, runtime_id) - etcd_util_get_service_url_mock.assert_called_once_with(function_id) + function_id, 0, runtime_id) + etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0) self.orchestrator.prepare_execution.assert_called_once_with( - function_id, image=None, identifier=runtime_id, + function_id, 0, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}, input=None) self.orchestrator.run_execution.assert_called_once_with( - execution_id, function_id, input=None, identifier=runtime_id, + execution_id, function_id, 0, input=None, identifier=runtime_id, service_url='svc_url', entry=function.entry, trust_id=function.trust_id) + execution = db_api.get_execution(execution_id) + self.assertEqual(execution.status, status.SUCCESS) self.assertEqual(execution.logs, 'execution log') self.assertEqual(execution.result, {'output': 'success output'}) - def test_create_execution_not_image_source_scaleup_exception(self): + def test_create_execution_loadcheck_exception(self): function = self.create_function() function_id = function.id runtime_id = function.runtime_id @@ -377,9 +371,10 @@ class TestDefaultEngine(base.DbTestCase): ) self.default_engine.create_execution( - mock.Mock(), execution_id, function_id, runtime_id) + mock.Mock(), execution_id, function_id, 0, runtime_id) execution = db_api.get_execution(execution_id) + self.assertEqual(execution.status, status.ERROR) self.assertEqual(execution.logs, '') self.assertEqual(execution.result, {}) @@ -388,10 +383,10 @@ class TestDefaultEngine(base.DbTestCase): @mock.patch('qinling.engine.utils.url_request') @mock.patch('qinling.utils.etcd_util.get_service_url') def test_create_execution_found_service_url( - self, - etcd_util_get_service_url_mock, - engine_utils_url_request_mock, - engine_utils_get_request_data_mock + self, + etcd_util_get_service_url_mock, + engine_utils_url_request_mock, + engine_utils_get_request_data_mock ): function = self.create_function() function_id = function.id @@ -407,18 +402,21 @@ class TestDefaultEngine(base.DbTestCase): 'output': 'failed output'}) self.default_engine.create_execution( - mock.Mock(), execution_id, function_id, runtime_id, input='input') + mock.Mock(), execution_id, function_id, 0, runtime_id, + input='input') self.default_engine.function_load_check.assert_called_once_with( - function_id, runtime_id) - etcd_util_get_service_url_mock.assert_called_once_with(function_id) + function_id, 0, runtime_id) + etcd_util_get_service_url_mock.assert_called_once_with(function_id, 0) engine_utils_get_request_data_mock.assert_called_once_with( - mock.ANY, function_id, execution_id, + mock.ANY, function_id, 0, execution_id, 'input', function.entry, function.trust_id, self.qinling_endpoint) engine_utils_url_request_mock.assert_called_once_with( self.default_engine.session, 'svc_url/execute', body='data') + execution = db_api.get_execution(execution_id) + self.assertEqual(execution.status, status.FAILED) self.assertEqual(execution.logs, 'execution log') self.assertEqual(execution.result, @@ -430,35 +428,36 @@ class TestDefaultEngine(base.DbTestCase): self.default_engine.delete_function(mock.Mock(), function_id) self.orchestrator.delete_function.assert_called_once_with( - function_id) + function_id, 0 + ) @mock.patch('qinling.utils.etcd_util.create_service_url') @mock.patch('qinling.utils.etcd_util.create_worker') def test_scaleup_function( - self, - etcd_util_create_worker_mock, - etcd_util_create_service_url_mock + self, + etcd_util_create_worker_mock, + etcd_util_create_service_url_mock ): function_id = common.generate_unicode_uuid() runtime_id = common.generate_unicode_uuid() self.orchestrator.scaleup_function.return_value = (['worker'], 'url') self.default_engine.scaleup_function( - mock.Mock(), function_id, runtime_id) + mock.Mock(), function_id, 0, runtime_id) self.orchestrator.scaleup_function.assert_called_once_with( - function_id, identifier=runtime_id, count=1) + function_id, 0, identifier=runtime_id, count=1) etcd_util_create_worker_mock.assert_called_once_with( - function_id, 'worker') + function_id, 'worker', version=0) etcd_util_create_service_url_mock.assert_called_once_with( - function_id, 'url') + function_id, 'url', version=0) @mock.patch('qinling.utils.etcd_util.create_service_url') @mock.patch('qinling.utils.etcd_util.create_worker') def test_scaleup_function_multiple_workers( - self, - etcd_util_create_worker_mock, - etcd_util_create_service_url_mock + self, + etcd_util_create_worker_mock, + etcd_util_create_service_url_mock ): function_id = common.generate_unicode_uuid() runtime_id = common.generate_unicode_uuid() @@ -466,22 +465,24 @@ class TestDefaultEngine(base.DbTestCase): ['worker0', 'worker1'], 'url') self.default_engine.scaleup_function( - mock.Mock(), function_id, runtime_id, count=2) + mock.Mock(), function_id, 0, runtime_id, count=2 + ) self.orchestrator.scaleup_function.assert_called_once_with( - function_id, identifier=runtime_id, count=2) + function_id, 0, identifier=runtime_id, count=2 + ) # Two new workers are created. - expected = [mock.call(function_id, 'worker0'), - mock.call(function_id, 'worker1')] + expected = [mock.call(function_id, 'worker0', version=0), + mock.call(function_id, 'worker1', version=0)] etcd_util_create_worker_mock.assert_has_calls(expected) - self.assertEqual(2, etcd_util_create_worker_mock.call_count) etcd_util_create_service_url_mock.assert_called_once_with( - function_id, 'url') + function_id, 'url', version=0 + ) @mock.patch('qinling.utils.etcd_util.delete_worker') @mock.patch('qinling.utils.etcd_util.get_workers') def test_scaledown_function( - self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock + self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock ): function_id = common.generate_unicode_uuid() etcd_util_get_workers_mock.return_value = [ @@ -491,33 +492,33 @@ class TestDefaultEngine(base.DbTestCase): self.default_engine.scaledown_function(mock.Mock(), function_id) etcd_util_get_workers_mock.assert_called_once_with( - function_id) + function_id, 0) self.orchestrator.delete_worker.assert_called_once_with('worker_0') etcd_util_delete_workers_mock.assert_called_once_with( - function_id, 'worker_0') + function_id, 'worker_0', version=0 + ) @mock.patch('qinling.utils.etcd_util.delete_worker') @mock.patch('qinling.utils.etcd_util.get_workers') def test_scaledown_function_multiple_workers( - self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock + self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock ): function_id = common.generate_unicode_uuid() etcd_util_get_workers_mock.return_value = [ 'worker_%d' % i for i in range(4) ] - self.default_engine.scaledown_function( - mock.Mock(), function_id, count=2) + self.default_engine.scaledown_function(mock.Mock(), function_id, + count=2) - etcd_util_get_workers_mock.assert_called_once_with( - function_id) + etcd_util_get_workers_mock.assert_called_once_with(function_id, 0) # First two workers will be deleted. expected = [mock.call('worker_0'), mock.call('worker_1')] self.orchestrator.delete_worker.assert_has_calls(expected) self.assertEqual(2, self.orchestrator.delete_worker.call_count) expected = [ - mock.call(function_id, 'worker_0'), - mock.call(function_id, 'worker_1') + mock.call(function_id, 'worker_0', version=0), + mock.call(function_id, 'worker_1', version=0) ] etcd_util_delete_workers_mock.assert_has_calls(expected) self.assertEqual(2, etcd_util_delete_workers_mock.call_count) @@ -525,7 +526,7 @@ class TestDefaultEngine(base.DbTestCase): @mock.patch('qinling.utils.etcd_util.delete_worker') @mock.patch('qinling.utils.etcd_util.get_workers') def test_scaledown_function_leaving_one_worker( - self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock + self, etcd_util_get_workers_mock, etcd_util_delete_workers_mock ): function_id = common.generate_unicode_uuid() etcd_util_get_workers_mock.return_value = [ @@ -535,8 +536,7 @@ class TestDefaultEngine(base.DbTestCase): self.default_engine.scaledown_function( mock.Mock(), function_id, count=5) # count > len(workers) - etcd_util_get_workers_mock.assert_called_once_with( - function_id) + etcd_util_get_workers_mock.assert_called_once_with(function_id, 0) # Only the first three workers will be deleted expected = [ mock.call('worker_0'), mock.call('worker_1'), mock.call('worker_2') @@ -544,9 +544,9 @@ class TestDefaultEngine(base.DbTestCase): self.orchestrator.delete_worker.assert_has_calls(expected) self.assertEqual(3, self.orchestrator.delete_worker.call_count) expected = [ - mock.call(function_id, 'worker_0'), - mock.call(function_id, 'worker_1'), - mock.call(function_id, 'worker_2') + mock.call(function_id, 'worker_0', version=0), + mock.call(function_id, 'worker_1', version=0), + mock.call(function_id, 'worker_2', version=0) ] etcd_util_delete_workers_mock.assert_has_calls(expected) self.assertEqual(3, etcd_util_delete_workers_mock.call_count) diff --git a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py index 666221fa..62b72355 100644 --- a/qinling/tests/unit/orchestrator/kubernetes/test_manager.py +++ b/qinling/tests/unit/orchestrator/kubernetes/test_manager.py @@ -30,10 +30,10 @@ SERVICE_ADDRESS_EXTERNAL = '1.2.3.4' SERVICE_ADDRESS_INTERNAL = '127.0.0.1' -class TestKubernetesManager(base.BaseTest): +class TestKubernetesManager(base.DbTestCase): def setUp(self): super(TestKubernetesManager, self).setUp() - CONF.register_opts(config.kubernetes_opts, config.KUBERNETES_GROUP) + self.conf = CONF self.qinling_endpoint = 'http://127.0.0.1:7070' self.k8s_v1_api = mock.Mock() @@ -48,13 +48,17 @@ class TestKubernetesManager(base.BaseTest): prefix='TestKubernetesManager') self.override_config('namespace', self.fake_namespace, config.KUBERNETES_GROUP) + + self.override_config('auth_enable', False, group='pecan') + namespace = mock.Mock() namespace.metadata.name = self.fake_namespace namespaces = mock.Mock() namespaces.items = [namespace] self.k8s_v1_api.list_namespace.return_value = namespaces - self.manager = k8s_manager.KubernetesManager( - self.conf, self.qinling_endpoint) + + self.manager = k8s_manager.KubernetesManager(self.conf, + self.qinling_endpoint) def _create_service(self): port = mock.Mock() @@ -305,7 +309,7 @@ class TestKubernetesManager(base.BaseTest): rollback.assert_called_once_with( fake_deployment_name, self.fake_namespace, rollback_body) - def test_prepare_execution(self): + def test_prepare_execution_no_image(self): pod = mock.Mock() pod.metadata.name = self.rand_name('pod', prefix='TestKubernetesManager') @@ -323,7 +327,7 @@ class TestKubernetesManager(base.BaseTest): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.prepare_execution( - function_id, image=None, identifier=runtime_id, + function_id, 0, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}) self.assertEqual(pod.metadata.name, pod_names) @@ -334,10 +338,15 @@ class TestKubernetesManager(base.BaseTest): # in _choose_available_pods self.k8s_v1_api.list_namespaced_pod.assert_called_once_with( self.fake_namespace, - label_selector='function_id=%s' % function_id) + label_selector='function_id=%s,function_version=0' % (function_id) + ) # in _prepare_pod -> _update_pod_label - pod_labels = {'pod1_key1': 'pod1_value1', 'function_id': function_id} + pod_labels = { + 'pod1_key1': 'pod1_value1', + 'function_id': function_id, + 'function_version': '0' + } body = {'metadata': {'labels': pod_labels}} self.k8s_v1_api.patch_namespaced_pod.assert_called_once_with( pod.metadata.name, self.fake_namespace, body) @@ -345,8 +354,9 @@ class TestKubernetesManager(base.BaseTest): # in _prepare_pod service_body = self.manager.service_template.render( { - 'service_name': 'service-%s' % function_id, + 'service_name': 'service-%s-0' % function_id, 'labels': {'function_id': function_id, + 'function_version': '0', 'runtime_id': runtime_id}, 'selector': pod_labels } @@ -357,13 +367,12 @@ class TestKubernetesManager(base.BaseTest): def test_prepare_execution_with_image(self): function_id = common.generate_unicode_uuid() image = self.rand_name('image', prefix='TestKubernetesManager') - identifier = ('%s-%s' % ( - common.generate_unicode_uuid(dashed=False), - function_id) + identifier = ('%s-%s' % + (common.generate_unicode_uuid(dashed=False), function_id) )[:63] pod_name, url = self.manager.prepare_execution( - function_id, image=image, identifier=identifier) + function_id, 0, image=image, identifier=identifier) self.assertEqual(identifier, pod_name) self.assertIsNone(url) @@ -390,7 +399,7 @@ class TestKubernetesManager(base.BaseTest): fake_input = {'__function_input': 'input_item1 input_item2'} pod_name, url = self.manager.prepare_execution( - function_id, image=image, identifier=identifier, + function_id, 0, image=image, identifier=identifier, input=fake_input) # in _create_pod @@ -415,7 +424,7 @@ class TestKubernetesManager(base.BaseTest): fake_input = '["input_item3", "input_item4"]' pod_name, url = self.manager.prepare_execution( - function_id, image=image, identifier=identifier, + function_id, 0, image=image, identifier=identifier, input=fake_input) # in _create_pod @@ -430,7 +439,7 @@ class TestKubernetesManager(base.BaseTest): self.k8s_v1_api.create_namespaced_pod.assert_called_once_with( self.fake_namespace, body=yaml.safe_load(pod_body)) - def test_prepare_execution_no_worker_available(self): + def test_prepare_execution_not_image_no_worker_available(self): ret_pods = mock.Mock() ret_pods.items = [] self.k8s_v1_api.list_namespaced_pod.return_value = ret_pods @@ -442,14 +451,19 @@ class TestKubernetesManager(base.BaseTest): exc.OrchestratorException, "^Execution preparation failed\.$", self.manager.prepare_execution, - function_id, image=None, identifier=runtime_id, labels=labels) + function_id, 0, image=None, identifier=runtime_id, labels=labels) # in _choose_available_pods list_calls = [ - mock.call(self.fake_namespace, - label_selector='function_id=%s' % function_id), - mock.call(self.fake_namespace, - label_selector='!function_id,runtime_id=%s' % runtime_id) + mock.call( + self.fake_namespace, + label_selector=('function_id=%s,function_version=0' % + function_id) + ), + mock.call( + self.fake_namespace, + label_selector='!function_id,runtime_id=%s' % runtime_id + ) ] self.k8s_v1_api.list_namespaced_pod.assert_has_calls(list_calls) self.assertEqual(2, self.k8s_v1_api.list_namespaced_pod.call_count) @@ -475,14 +489,14 @@ class TestKubernetesManager(base.BaseTest): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.prepare_execution( - function_id, image=None, identifier=runtime_id, + function_id, 0, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}) # in _prepare_pod self.k8s_v1_api.read_namespaced_service.assert_called_once_with( - 'service-%s' % function_id, self.fake_namespace) + 'service-%s-0' % function_id, self.fake_namespace) - def test_prepare_execution_pod_preparation_failed(self): + def test_prepare_execution_create_service_failed(self): pod = mock.Mock() pod.metadata.name = self.rand_name('pod', prefix='TestKubernetesManager') @@ -503,12 +517,18 @@ class TestKubernetesManager(base.BaseTest): exc.OrchestratorException, '^Execution preparation failed\.$', self.manager.prepare_execution, - function_id, image=None, identifier=runtime_id, + function_id, 0, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}) delete_function_mock.assert_called_once_with( function_id, - {'runtime_id': runtime_id, 'function_id': function_id}) + 0, + { + 'runtime_id': runtime_id, + 'function_id': function_id, + 'function_version': '0' + } + ) def test_prepare_execution_service_internal_ip(self): pod = mock.Mock() @@ -528,7 +548,7 @@ class TestKubernetesManager(base.BaseTest): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.prepare_execution( - function_id, image=None, identifier=runtime_id, + function_id, 0, image=None, identifier=runtime_id, labels={'runtime_id': runtime_id}) self.assertEqual(pod.metadata.name, pod_names) @@ -536,7 +556,7 @@ class TestKubernetesManager(base.BaseTest): 'http://%s:%s' % (SERVICE_ADDRESS_INTERNAL, SERVICE_PORT), service_url) - def test_run_execution(self): + def test_run_execution_image_type_function(self): pod = mock.Mock() pod.status.phase = 'Succeeded' self.k8s_v1_api.read_namespaced_pod.return_value = pod @@ -545,7 +565,8 @@ class TestKubernetesManager(base.BaseTest): execution_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid() - result, output = self.manager.run_execution(execution_id, function_id) + result, output = self.manager.run_execution(execution_id, function_id, + 0) self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( None, self.fake_namespace) @@ -554,29 +575,33 @@ class TestKubernetesManager(base.BaseTest): self.assertTrue(result) self.assertEqual(fake_output, output) - @mock.patch('qinling.engine.utils.get_request_data') @mock.patch('qinling.engine.utils.url_request') - def test_run_execution_with_service_url(self, url_request_mock, - get_request_data_mock): - fake_output = 'fake output' - url_request_mock.return_value = (True, 'fake output') - fake_data = 'some data' - get_request_data_mock.return_value = fake_data + def test_run_execution_version_0(self, mock_request): + mock_request.return_value = (True, 'fake output') execution_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid() result, output = self.manager.run_execution( - execution_id, function_id, service_url='FAKE_URL') + execution_id, function_id, 0, service_url='FAKE_URL' + ) - get_request_data_mock.assert_called_once_with( - self.conf, function_id, execution_id, None, 'main.main', None, - self.qinling_endpoint) - url_request_mock.assert_called_once_with( - self.manager.session, 'FAKE_URL/execute', body=fake_data) - self.assertTrue(result) - self.assertEqual(fake_output, output) + download_url = ('http://127.0.0.1:7070/v1/functions/%s?download=true' + % function_id) + data = { + 'execution_id': execution_id, + 'input': None, + 'function_id': function_id, + 'function_version': 0, + 'entry': 'main.main', + 'download_url': download_url, + 'request_id': self.ctx.request_id, + } - def test_run_execution_retry(self): + mock_request.assert_called_once_with( + self.manager.session, 'FAKE_URL/execute', body=data + ) + + def test_run_execution_no_service_url_retry(self): pod1 = mock.Mock() pod1.status.phase = '' pod2 = mock.Mock() @@ -587,7 +612,8 @@ class TestKubernetesManager(base.BaseTest): execution_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid() - result, output = self.manager.run_execution(execution_id, function_id) + result, output = self.manager.run_execution(execution_id, function_id, + 0) self.assertEqual(2, self.k8s_v1_api.read_namespaced_pod.call_count) self.k8s_v1_api.read_namespaced_pod_log.assert_called_once_with( @@ -595,12 +621,13 @@ class TestKubernetesManager(base.BaseTest): self.assertTrue(result) self.assertEqual(fake_output, output) - def test_run_execution_failed(self): + def test_run_execution_no_service_url_read_pod_exception(self): self.k8s_v1_api.read_namespaced_pod.side_effect = RuntimeError execution_id = common.generate_unicode_uuid() function_id = common.generate_unicode_uuid() - result, output = self.manager.run_execution(execution_id, function_id) + result, output = self.manager.run_execution(execution_id, function_id, + 0) self.k8s_v1_api.read_namespaced_pod.assert_called_once_with( None, self.fake_namespace) @@ -621,10 +648,19 @@ class TestKubernetesManager(base.BaseTest): self.k8s_v1_api.list_namespaced_service.return_value = services function_id = common.generate_unicode_uuid() - self.manager.delete_function(function_id) + self.manager.delete_function(function_id, 0) + + args, kwargs = self.k8s_v1_api.list_namespaced_service.call_args + self.assertIn(self.fake_namespace, args) + self.assertIn( + "function_id=%s" % function_id, + kwargs.get("label_selector") + ) + self.assertIn( + "function_version=0", + kwargs.get("label_selector") + ) - self.k8s_v1_api.list_namespaced_service.assert_called_once_with( - self.fake_namespace, label_selector='function_id=%s' % function_id) delete_service_calls = [ mock.call(svc1_name, self.fake_namespace), mock.call(svc2_name, self.fake_namespace) @@ -632,10 +668,20 @@ class TestKubernetesManager(base.BaseTest): self.k8s_v1_api.delete_namespaced_service.assert_has_calls( delete_service_calls) self.assertEqual( - 2, self.k8s_v1_api.delete_namespaced_service.call_count) - delete_pod = self.k8s_v1_api.delete_collection_namespaced_pod - delete_pod.assert_called_once_with( - self.fake_namespace, label_selector='function_id=%s' % function_id) + 2, self.k8s_v1_api.delete_namespaced_service.call_count + ) + + args, kwargs = self.k8s_v1_api.delete_collection_namespaced_pod. \ + call_args + self.assertIn(self.fake_namespace, args) + self.assertIn( + "function_id=%s" % function_id, + kwargs.get("label_selector") + ) + self.assertIn( + "function_version=0", + kwargs.get("label_selector") + ) def test_delete_function_with_labels(self): services = mock.Mock() @@ -645,7 +691,7 @@ class TestKubernetesManager(base.BaseTest): self.k8s_v1_api.list_namespaced_service.return_value = services function_id = common.generate_unicode_uuid() - self.manager.delete_function(function_id, labels=labels) + self.manager.delete_function(function_id, 0, labels=labels) self.k8s_v1_api.list_namespaced_service.assert_called_once_with( self.fake_namespace, label_selector=selector) @@ -672,7 +718,8 @@ class TestKubernetesManager(base.BaseTest): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.scaleup_function( - function_id, identifier=runtime_id) + function_id, 0, identifier=runtime_id + ) self.assertEqual([pod.metadata.name], pod_names) self.assertEqual( @@ -685,7 +732,11 @@ class TestKubernetesManager(base.BaseTest): label_selector='!function_id,runtime_id=%s' % runtime_id) # in _prepare_pod -> _update_pod_label - pod_labels = {'pod1_key1': 'pod1_value1', 'function_id': function_id} + pod_labels = { + 'pod1_key1': 'pod1_value1', + 'function_id': function_id, + 'function_version': '0' + } body = {'metadata': {'labels': pod_labels}} self.k8s_v1_api.patch_namespaced_pod.assert_called_once_with( pod.metadata.name, self.fake_namespace, body) @@ -693,8 +744,9 @@ class TestKubernetesManager(base.BaseTest): # in _prepare_pod service_body = self.manager.service_template.render( { - 'service_name': 'service-%s' % function_id, + 'service_name': 'service-%s-0' % function_id, 'labels': {'function_id': function_id, + 'function_version': 0, 'runtime_id': runtime_id}, 'selector': pod_labels } @@ -713,7 +765,7 @@ class TestKubernetesManager(base.BaseTest): exc.OrchestratorException, "^Not enough workers available\.$", self.manager.scaleup_function, - function_id, identifier=runtime_id, count=2) + function_id, 0, identifier=runtime_id, count=2) def test_scaleup_function_service_already_exists(self): pod = mock.Mock() @@ -736,11 +788,11 @@ class TestKubernetesManager(base.BaseTest): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.scaleup_function( - function_id, identifier=runtime_id) + function_id, 0, identifier=runtime_id) # in _prepare_pod self.k8s_v1_api.read_namespaced_service.assert_called_once_with( - 'service-%s' % function_id, self.fake_namespace) + 'service-%s-0' % function_id, self.fake_namespace) def test_scaleup_function_service_create_failed(self): pod = mock.Mock() @@ -759,7 +811,7 @@ class TestKubernetesManager(base.BaseTest): self.assertRaises( RuntimeError, self.manager.scaleup_function, - function_id, identifier=runtime_id) + function_id, 0, identifier=runtime_id) def test_scaleup_function_service_internal_ip(self): pod = mock.Mock() @@ -779,7 +831,7 @@ class TestKubernetesManager(base.BaseTest): function_id = common.generate_unicode_uuid() pod_names, service_url = self.manager.scaleup_function( - function_id, identifier=runtime_id) + function_id, 0, identifier=runtime_id) self.assertEqual([pod.metadata.name], pod_names) self.assertEqual( diff --git a/qinling/tests/unit/services/test_periodics.py b/qinling/tests/unit/services/test_periodics.py index ee1893ba..bda10ada 100644 --- a/qinling/tests/unit/services/test_periodics.py +++ b/qinling/tests/unit/services/test_periodics.py @@ -20,7 +20,6 @@ from oslo_config import cfg from qinling import context from qinling.db import api as db_api -from qinling.engine import default_engine from qinling.services import periodics from qinling import status from qinling.tests.unit import base @@ -35,24 +34,50 @@ class TestPeriodics(base.DbTestCase): @mock.patch('qinling.utils.etcd_util.delete_function') @mock.patch('qinling.utils.etcd_util.get_service_url') - def test_function_service_expiration_handler(self, mock_etcd_url, - mock_etcd_delete): + def test_handle_function_service_no_function_version(self, mock_etcd_url, + mock_etcd_delete): db_func = self.create_function() function_id = db_func.id # Update function to simulate function execution db_api.update_function(function_id, {'count': 1}) time.sleep(1.5) - mock_k8s = mock.Mock() mock_etcd_url.return_value = 'http://localhost:37718' self.override_config('function_service_expiration', 1, 'engine') - engine = default_engine.DefaultEngine(mock_k8s, CONF.qinling_endpoint) - periodics.handle_function_service_expiration(self.ctx, engine) + mock_engine = mock.Mock() - self.assertEqual(1, mock_k8s.delete_function.call_count) - args, kwargs = mock_k8s.delete_function.call_args + periodics.handle_function_service_expiration(self.ctx, mock_engine) + + self.assertEqual(1, mock_engine.delete_function.call_count) + args, _ = mock_engine.delete_function.call_args self.assertIn(function_id, args) - mock_etcd_delete.assert_called_once_with(function_id) + self.assertIn(0, args) + + mock_etcd_delete.assert_called_once_with(function_id, 0) + + @mock.patch('qinling.utils.etcd_util.delete_function') + @mock.patch('qinling.utils.etcd_util.get_service_url') + def test_handle_function_service_with_function_versions(self, mock_srv_url, + mock_etcd_delete): + db_func = self.create_function() + function_id = db_func.id + db_api.increase_function_version(function_id, 0, + description="new version") + db_api.update_function_version(function_id, 1, count=1) + time.sleep(1.5) + + self.override_config('function_service_expiration', 1, 'engine') + mock_srv_url.return_value = 'http://localhost:37718' + mock_engine = mock.Mock() + + periodics.handle_function_service_expiration(self.ctx, mock_engine) + + self.assertEqual(1, mock_engine.delete_function.call_count) + args, _ = mock_engine.delete_function.call_args + self.assertIn(function_id, args) + self.assertIn(1, args) + + mock_etcd_delete.assert_called_once_with(function_id, 1) @mock.patch('qinling.utils.jobs.get_next_execution_time') def test_job_handler(self, mock_get_next): @@ -72,6 +97,7 @@ class TestPeriodics(base.DbTestCase): e_client = mock.Mock() mock_get_next.return_value = now + timedelta(seconds=1) + periodics.handle_job(e_client) context.set_ctx(self.ctx) diff --git a/qinling/utils/etcd_util.py b/qinling/utils/etcd_util.py index bfa4aa4d..866df650 100644 --- a/qinling/utils/etcd_util.py +++ b/qinling/utils/etcd_util.py @@ -29,9 +29,10 @@ def get_client(conf=None): return CLIENT -def get_worker_lock(): +def get_worker_lock(function_id, version=0): client = get_client() - return client.lock(id='function_worker') + lock_id = "function_worker_%s_%s" % (function_id, version) + return client.lock(id=lock_id) def get_function_version_lock(function_id): @@ -40,7 +41,7 @@ def get_function_version_lock(function_id): return client.lock(id=lock_id) -def create_worker(function_id, worker): +def create_worker(function_id, worker, version=0): """Create the worker info in etcd. The worker parameter is assumed to be unique. @@ -50,34 +51,34 @@ def create_worker(function_id, worker): # is the name of the pod so it is unique. client = get_client() client.create( - '%s/worker_%s' % (function_id, worker), + '%s_%s/worker_%s' % (function_id, version, worker), worker ) -def delete_worker(function_id, worker): +def delete_worker(function_id, worker, version=0): client = get_client() - client.delete('%s/worker_%s' % (function_id, worker)) + client.delete('%s_%s/worker_%s' % (function_id, version, worker)) -def get_workers(function_id, conf=None): - client = get_client(conf) - values = client.get_prefix('%s/worker' % function_id) +def get_workers(function_id, version=0): + client = get_client() + values = client.get_prefix("%s_%s/worker" % (function_id, version)) workers = [w[0] for w in values] return workers -def delete_function(function_id): +def delete_function(function_id, version=0): client = get_client() - client.delete_prefix(function_id) + client.delete_prefix("%s_%s" % (function_id, version)) -def create_service_url(function_id, url): +def create_service_url(function_id, url, version=0): client = get_client() - client.create('%s/service_url' % function_id, url) + client.create('%s_%s/service_url' % (function_id, version), url) -def get_service_url(function_id): +def get_service_url(function_id, version=0): client = get_client() - values = client.get('%s/service_url' % function_id) + values = client.get('%s_%s/service_url' % (function_id, version)) return None if not values else values[0] diff --git a/qinling/utils/executions.py b/qinling/utils/executions.py index d0d33270..11be6c77 100644 --- a/qinling/utils/executions.py +++ b/qinling/utils/executions.py @@ -19,49 +19,86 @@ from qinling.db import api as db_api from qinling.db.sqlalchemy import models from qinling import exceptions as exc from qinling import status +from qinling.utils import constants LOG = logging.getLogger(__name__) -def _update_function_db(function_id): - with db_api.transaction(): - # NOTE(kong): Store function info in cache? - func_db = db_api.get_function(function_id) - runtime_db = func_db.runtime - if runtime_db and runtime_db.status != status.AVAILABLE: - raise exc.RuntimeNotAvailableException( - 'Runtime %s is not available.' % func_db.runtime_id - ) - +def _update_function_db(function_id, pre_count): # Function update is done using UPDATE ... FROM ... WHERE # non-locking clause. - while func_db: - count = func_db.count + while True: modified = db_api.conditional_update( models.Function, { - 'count': count + 1, + 'count': pre_count + 1, }, { 'id': function_id, - 'count': count + 'count': pre_count }, insecure=True, ) if not modified: LOG.warning("Retrying to update function count.") - func_db = db_api.get_function(function_id) + pre_count += 1 continue else: break - return func_db.runtime_id + +def _update_function_version_db(version_id, pre_count): + # Update is done using UPDATE ... FROM ... WHERE non-locking clause. + while True: + modified = db_api.conditional_update( + models.FunctionVersion, + { + 'count': pre_count + 1, + }, + { + 'id': version_id, + 'count': pre_count + }, + insecure=True, + ) + if not modified: + LOG.warning("Retrying to update function version count.") + pre_count += 1 + continue + else: + break def create_execution(engine_client, params): function_id = params['function_id'] is_sync = params.get('sync', True) input = params.get('input') + version = params.get('function_version', 0) + + with db_api.transaction(): + func_db = db_api.get_function(function_id) + runtime_id = func_db.runtime_id + + runtime_db = func_db.runtime + if runtime_db and runtime_db.status != status.AVAILABLE: + raise exc.RuntimeNotAvailableException( + 'Runtime %s is not available.' % func_db.runtime_id + ) + + if version > 0: + if func_db.code['source'] != constants.PACKAGE_FUNCTION: + raise exc.InputException( + "Can not specify version for %s type function." % + constants.PACKAGE_FUNCTION + ) + + # update version count + version_db = db_api.get_function_version(function_id, version) + pre_version_count = version_db.count + _update_function_version_db(version_db.id, pre_version_count) + else: + pre_count = func_db.count + _update_function_db(function_id, pre_count) # input in params should be a string. if input: @@ -70,14 +107,12 @@ def create_execution(engine_client, params): except ValueError: params['input'] = {'__function_input': input} - runtime_id = _update_function_db(function_id) - params.update({'status': status.RUNNING}) db_model = db_api.create_execution(params) try: engine_client.create_execution( - db_model.id, function_id, runtime_id, + db_model.id, function_id, version, runtime_id, input=params.get('input'), is_sync=is_sync ) except exc.QinlingException: