From 5fe9c5b444ace8989b6b53c0dc451f2938f5bd22 Mon Sep 17 00:00:00 2001 From: southeast02 Date: Wed, 15 Mar 2017 10:21:12 +0800 Subject: [PATCH] Implement asynchronous job Admin API 1. What is the problem When XJob receives a job message from service, it will register the job in database and handle it asynchronously. Tricircle needs to provide API for admin to query the job status and trigger failed job if something happens unexpectedly. The detailed work for XJob Admin APIs is covered in the document[1]. 2. What is the solution for the problem We implement XJob management APIs, they are listed as following: *(1) create a job *(2) list single job info *(3) list all jobs *(4) list jobs with filters *(5) list all jobs' schemas *(6) delete a job *(7) redo a job 3. What the features need to be implemented to the Tricircle to realize the solution Implement above job operations. [1] https://review.openstack.org/#/c/438304/ Change-Id: Ibd90e539c9360a0ad7a01eeef185c0dbbee9bb4e --- specs/pike/async_job_management.rst | 75 +- tricircle/api/controllers/job.py | 371 +++++++++ tricircle/api/controllers/root.py | 4 +- tricircle/common/constants.py | 57 +- tricircle/common/policy.py | 23 + tricircle/common/xrpcapi.py | 56 +- tricircle/db/api.py | 65 +- .../006_add_project_id_to_async_jobs.py | 30 + .../007_add_project_id_to_async_job_logs.py | 30 + tricircle/db/models.py | 8 +- tricircle/network/central_plugin.py | 45 +- .../functional/api/controllers/test_job.py | 767 ++++++++++++++++++ .../tests/unit/api/controllers/test_job.py | 616 ++++++++++++++ .../tests/unit/network/test_central_plugin.py | 48 +- tricircle/tests/unit/xjob/test_xmanager.py | 93 ++- tricircle/xjob/xmanager.py | 92 ++- 16 files changed, 2227 insertions(+), 153 deletions(-) create mode 100644 tricircle/api/controllers/job.py create mode 100644 tricircle/db/migrate_repo/versions/006_add_project_id_to_async_jobs.py create mode 100644 tricircle/db/migrate_repo/versions/007_add_project_id_to_async_job_logs.py create mode 100644 tricircle/tests/functional/api/controllers/test_job.py create mode 100644 tricircle/tests/unit/api/controllers/test_job.py diff --git a/specs/pike/async_job_management.rst b/specs/pike/async_job_management.rst index 0db5c026..896211ed 100644 --- a/specs/pike/async_job_management.rst +++ b/specs/pike/async_job_management.rst @@ -59,10 +59,77 @@ listed as following: Normal Response Code: 202 -* Get job(s) +* Get a job + + Retrieve a job from the Tricircle database. + + The detailed information of the job will be shown. Otherwise + it will return "Resource not found" exception. + + List Request:: + + GET /v1.0/jobs/3f4ecf30-0213-4f1f-9cb0-0233bcedb767 + + Response: + { + "job": { + "id": "3f4ecf30-0213-4f1f-9cb0-0233bcedb767", + "project_id": "d01246bc5792477d9062a76332b7514a", + "type": "port_delete", + "timestamp": "2017-03-03 11:05:36", + "status": "NEW", + "resource": { + "pod_id": "0eb59465-5132-4f57-af01-a9e306158b86", + "port_id": "8498b903-9e18-4265-8d62-3c12e0ce4314" + } + } + } + + Normal Response Code: 200 + +* Get all jobs + + Retrieve all of the jobs from the Tricircle database. + + List Request:: + + GET /v1.0/jobs/detail + + Response: + { + "jobs": + [ + { + "id": "3f4ecf30-0213-4f1f-9cb0-0233bcedb767", + "project_id": "d01246bc5792477d9062a76332b7514a", + "type": "port_delete", + "timestamp": "2017-03-03 11:05:36", + "status": "NEW", + "resource": { + "pod_id": "0eb59465-5132-4f57-af01-a9e306158b86", + "port_id": "8498b903-9e18-4265-8d62-3c12e0ce4314" + } + }, + { + "id": "b01fe514-5211-4758-bbd1-9f32141a7ac2", + "project_id": "d01246bc5792477d9062a76332b7514a", + "type": "seg_rule_setup", + "timestamp": "2017-03-01 17:14:44", + "status": "FAIL", + "resource": { + "project_id": "d01246bc5792477d9062a76332b7514a" + } + } + ] + } + + Normal Response Code: 200 + +* Get all jobs with filter(s) Retrieve job(s) from the Tricircle database. We can filter them by - project ID, job type and job status. + project ID, job type and job status. If no filter is provided, + GET /v1.0/jobs will return all jobs. The response contains a list of jobs. Using filters, a subset of jobs will be returned. @@ -117,7 +184,7 @@ listed as following: "schemas": [ { - "type": "router", + "type": "configure_route", "resource": ["router_id"] }, { @@ -153,7 +220,7 @@ listed as following: * Delete a job Delete a failed or duplicated job from the Tricircle database. - Nothing will be returned for this request if succeeds, otherwise an + A pair of curly braces will be returned if succeeds, otherwise an exception will be thrown. What's more, we can list all jobs to verify whether it is deleted successfully or not. diff --git a/tricircle/api/controllers/job.py b/tricircle/api/controllers/job.py new file mode 100644 index 00000000..a2263195 --- /dev/null +++ b/tricircle/api/controllers/job.py @@ -0,0 +1,371 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pecan +from pecan import expose +from pecan import rest +import six + +from oslo_log import log as logging +from oslo_utils import timeutils + +from tricircle.common import constants +import tricircle.common.context as t_context +import tricircle.common.exceptions as t_exc +from tricircle.common.i18n import _ +from tricircle.common import policy +from tricircle.common import utils +from tricircle.common import xrpcapi +from tricircle.db import api as db_api + +LOG = logging.getLogger(__name__) + + +class AsyncJobController(rest.RestController): + # with AsyncJobController, admin can create, show, delete and + # redo asynchronous jobs + + def __init__(self): + self.xjob_handler = xrpcapi.XJobAPI() + + @expose(generic=True, template='json') + def post(self, **kw): + context = t_context.extract_context_from_environ() + job_resource_map = constants.job_resource_map + + if not policy.enforce(context, policy.ADMIN_API_JOB_CREATE): + return utils.format_api_error( + 403, _("Unauthorized to create a job")) + + if 'job' not in kw: + return utils.format_api_error( + 400, _("Request body not found")) + + job = kw['job'] + + for field in ('type', 'project_id'): + value = job.get(field) + if value is None: + return utils.format_api_error( + 400, _("%(field)s isn't provided in request body") % { + 'field': field}) + elif len(value.strip()) == 0: + return utils.format_api_error( + 400, _("%(field)s can't be empty") % {'field': field}) + + if job['type'] not in job_resource_map.keys(): + return utils.format_api_error( + 400, _('There is no such job type: %(job_type)s') % { + 'job_type': job['type']}) + + job_type = job['type'] + project_id = job['project_id'] + + if 'resource' not in job: + return utils.format_api_error( + 400, _('Failed to create job, because the resource is not' + ' specified')) + + # verify that all given resources are exactly needed + request_fields = set(job['resource'].keys()) + require_fields = set([resource_id + for resource_type, resource_id in + job_resource_map[job_type]]) + missing_fields = require_fields - request_fields + redundant_fields = request_fields - require_fields + + if missing_fields: + return utils.format_api_error( + 400, _('Some required fields are not specified:' + ' %(field)s') % {'field': missing_fields}) + if redundant_fields: + return utils.format_api_error( + 400, _('Some fields are redundant: %(field)s') % { + 'field': redundant_fields}) + + # validate whether the project id is legal + resource_type_1, resource_id_1 = ( + constants.job_primary_resource_map[job_type]) + if resource_type_1 is not None: + filter = [{'key': 'project_id', 'comparator': 'eq', + 'value': project_id}, + {'key': 'resource_type', 'comparator': 'eq', + 'value': resource_type_1}, + {'key': 'top_id', 'comparator': 'eq', + 'value': job['resource'][resource_id_1]}] + + routings = db_api.list_resource_routings(context, filter) + if not routings: + msg = (_("%(resource)s %(resource_id)s doesn't belong to the" + " project %(project_id)s") % + {'resource': resource_type_1, + 'resource_id': job['resource'][resource_id_1], + 'project_id': project_id}) + return utils.format_api_error(400, msg) + + # if job_type = seg_rule_setup, we should ensure the project id + # is equal to the one from resource. + if job_type == constants.JT_SEG_RULE_SETUP: + if job['project_id'] != job['resource']['project_id']: + msg = (_("Specified project_id %(project_id_1)s and resource's" + " project_id %(project_id_2)s are different") % + {'project_id_1': job['project_id'], + 'project_id_2': job['resource']['project_id']}) + return utils.format_api_error(400, msg) + + # combine uuid into target resource id + resource_id = '#'.join([job['resource'][resource_id] + for resource_type, resource_id + in job_resource_map[job_type]]) + + try: + # create a job and put it into execution immediately + self.xjob_handler.invoke_method(context, project_id, + constants.job_handles[job_type], + job_type, resource_id) + except Exception as e: + LOG.exception('Failed to create job: ' + '%(exception)s ', {'exception': e}) + return utils.format_api_error( + 500, _('Failed to create a job')) + + new_job = db_api.get_latest_job(context, constants.JS_New, job_type, + resource_id) + return {'job': self._get_more_readable_job(new_job)} + + @expose(generic=True, template='json') + def get_one(self, id, **kwargs): + """the return value may vary according to the value of id + + :param id: 1) if id = 'schemas', return job schemas + 2) if id = 'detail', return all jobs + 3) if id = $job_id, return detailed single job info + :return: return value is decided by id parameter + """ + context = t_context.extract_context_from_environ() + job_resource_map = constants.job_resource_map + + if not policy.enforce(context, policy.ADMIN_API_JOB_SCHEMA_LIST): + return utils.format_api_error( + 403, _('Unauthorized to show job information')) + + if id == 'schemas': + job_schemas = [] + for job_type in job_resource_map.keys(): + job = {} + resource = [] + for resource_type, resource_id in job_resource_map[job_type]: + resource.append(resource_id) + + job['resource'] = resource + job['type'] = job_type + job_schemas.append(job) + + return {'schemas': job_schemas} + + if id == 'detail': + return self.get_all(**kwargs) + + try: + job = db_api.get_job(context, id) + return {'job': self._get_more_readable_job(job)} + except Exception: + try: + job = db_api.get_job_from_log(context, id) + return {'job': self._get_more_readable_job(job)} + except t_exc.ResourceNotFound: + return utils.format_api_error( + 404, _('Resource not found')) + + @expose(generic=True, template='json') + def get_all(self, **kwargs): + """Get all the jobs. Using filters, only get a subset of jobs. + + :param kwargs: job filters + :return: a list of jobs + """ + context = t_context.extract_context_from_environ() + + if not policy.enforce(context, policy.ADMIN_API_JOB_LIST): + return utils.format_api_error( + 403, _('Unauthorized to show all jobs')) + + is_valid_filter, filters = self._get_filters(kwargs) + + if not is_valid_filter: + msg = (_('Unsupported filter type: %(filter)s') % + {'filter': [filter_name for filter_name in filters]}) + return utils.format_api_error(400, msg) + + filters = [{'key': key, + 'comparator': 'eq', + 'value': value} for key, value in six.iteritems(filters)] + + try: + jobs_in_job_table = db_api.list_jobs(context, filters) + jobs_in_job_log_table = db_api.list_jobs_from_log(context, filters) + jobs = jobs_in_job_table + jobs_in_job_log_table + return {'jobs': [self._get_more_readable_job(job) for job in jobs]} + except Exception as e: + LOG.exception('Failed to show all asynchronous jobs: ' + '%(exception)s ', {'exception': e}) + return utils.format_api_error( + 500, _('Failed to show all asynchronous jobs')) + + # make the job status and resource id more human readable. Split + # resource id into several member uuid(s) to provide more detailed resource + # information. If job entry is from job table, then remove resource id + # and extra id from job attributes. If job entry is from job log table, + # only remove resource id from job attributes. + def _get_more_readable_job(self, job): + job_resource_map = constants.job_resource_map + + if 'status' in job: + job['status'] = constants.job_status_map[job['status']] + else: + job['status'] = constants.job_status_map[constants.JS_Success] + + job['resource'] = dict(zip([resource_id + for resource_type, resource_id + in job_resource_map[job['type']]], + job['resource_id'].split('#'))) + job.pop('resource_id') + + if "extra_id" in job: + job.pop('extra_id') + + return job + + def _get_filters(self, params): + """Return a dictionary of query param filters from the request. + + :param params: the URI params coming from the wsgi layer + :return (flag, filters), flag indicates whether the filters are valid, + and the filters denote a list of key-value pairs. + """ + filters = {} + unsupported_filters = {} + for filter_name in params: + if filter_name in constants.JOB_LIST_SUPPORTED_FILTERS: + # map filter name + if filter_name == 'status': + job_status_in_db = self._get_job_status_in_db( + params.get(filter_name)) + filters[filter_name] = job_status_in_db + continue + filters[filter_name] = params.get(filter_name) + else: + unsupported_filters[filter_name] = params.get(filter_name) + + if unsupported_filters: + return False, unsupported_filters + return True, filters + + # map user input job status to job status stored in database + def _get_job_status_in_db(self, job_status): + job_status_map = { + 'fail': constants.JS_Fail, + 'success': constants.JS_Success, + 'running': constants.JS_Running, + 'new': constants.JS_New + } + job_status_lower = job_status.lower() + if job_status_lower in job_status_map: + return job_status_map[job_status_lower] + return job_status + + @expose(generic=True, template='json') + def delete(self, job_id): + # delete a job from the database. If the job is running, the delete + # operation will fail. In other cases, job will be deleted directly. + context = t_context.extract_context_from_environ() + + if not policy.enforce(context, policy.ADMIN_API_JOB_DELETE): + return utils.format_api_error( + 403, _('Unauthorized to delete a job')) + + try: + db_api.get_job_from_log(context, job_id) + return utils.format_api_error( + 400, _('Job %(job_id)s is from job log') % {'job_id': job_id}) + except Exception: + try: + job = db_api.get_job(context, job_id) + except t_exc.ResourceNotFound: + return utils.format_api_error( + 404, _('Job %(job_id)s not found') % {'job_id': job_id}) + try: + # if job status = RUNNING, notify user this new one, delete + # operation fails. + if job['status'] == constants.JS_Running: + return utils.format_api_error( + 400, (_('Failed to delete the running job %(job_id)s') % + {"job_id": job_id})) + # if job status = SUCCESS, move the job entry to job log table, + # then delete it from job table. + elif job['status'] == constants.JS_Success: + db_api.finish_job(context, job_id, True, timeutils.utcnow()) + pecan.response.status = 200 + return {} + + db_api.delete_job(context, job_id) + pecan.response.status = 200 + return {} + except Exception as e: + LOG.exception('Failed to delete the job: ' + '%(exception)s ', {'exception': e}) + return utils.format_api_error( + 500, _('Failed to delete the job')) + + @expose(generic=True, template='json') + def put(self, job_id): + # we use HTTP/HTTPS PUT method to redo a job. Regularly PUT method + # requires a request body, but considering the job redo operation + # doesn't need more information other than job id, we will issue + # this request without a request body. + context = t_context.extract_context_from_environ() + + if not policy.enforce(context, policy.ADMIN_API_JOB_REDO): + return utils.format_api_error( + 403, _('Unauthorized to redo a job')) + + try: + db_api.get_job_from_log(context, job_id) + return utils.format_api_error( + 400, _('Job %(job_id)s is from job log') % {'job_id': job_id}) + except Exception: + try: + job = db_api.get_job(context, job_id) + except t_exc.ResourceNotFound: + return utils.format_api_error( + 404, _('Job %(job_id)s not found') % {'job_id': job_id}) + + try: + # if status = RUNNING, notify user this new one and then exit + if job['status'] == constants.JS_Running: + return utils.format_api_error( + 400, (_("Can't redo job %(job_id)s which is running") % + {'job_id': job['id']})) + # if status = SUCCESS, notify user this new one and then exit + elif job['status'] == constants.JS_Success: + msg = (_("Can't redo job %(job_id)s which had run successfully" + ) % {'job_id': job['id']}) + return utils.format_api_error(400, msg) + # if job status = FAIL or job status = NEW, redo it immediately + self.xjob_handler.invoke_method(context, job['project_id'], + constants.job_handles[job['type']], + job['type'], job['resource_id']) + except Exception as e: + LOG.exception('Failed to redo the job: ' + '%(exception)s ', {'exception': e}) + return utils.format_api_error( + 500, _('Failed to redo the job')) diff --git a/tricircle/api/controllers/root.py b/tricircle/api/controllers/root.py index 9aae91cf..7a3382bf 100644 --- a/tricircle/api/controllers/root.py +++ b/tricircle/api/controllers/root.py @@ -17,6 +17,7 @@ import pecan from pecan import request +from tricircle.api.controllers import job from tricircle.api.controllers import pod from tricircle.api.controllers import routing import tricircle.common.context as t_context @@ -74,7 +75,8 @@ class V1Controller(object): self.sub_controllers = { "pods": pod.PodsController(), - "routings": routing.RoutingController() + "routings": routing.RoutingController(), + "jobs": job.AsyncJobController() } for name, ctrl in self.sub_controllers.items(): diff --git a/tricircle/common/constants.py b/tricircle/common/constants.py index effc6b97..89e1e955 100644 --- a/tricircle/common/constants.py +++ b/tricircle/common/constants.py @@ -90,7 +90,7 @@ PROFILE_FORCE_UP = 'force_up' DEVICE_OWNER_SHADOW = 'compute:shadow' # job type -JT_ROUTER = 'router' +JT_CONFIGURE_ROUTE = 'configure_route' JT_ROUTER_SETUP = 'router_setup' JT_PORT_DELETE = 'port_delete' JT_SEG_RULE_SETUP = 'seg_rule_setup' @@ -108,3 +108,58 @@ NT_FLAT = 'flat' NM_P2P = 'p2p' NM_L2GW = 'l2gw' NM_NOOP = 'noop' + +# map job type to its resource, each resource is denoted by +# (resource_type, resource_id), for the field necessary +# to run the job but resides outside of job resource, we +# denote its type by "None" +job_resource_map = { + JT_CONFIGURE_ROUTE: [(RT_ROUTER, "router_id")], + JT_ROUTER_SETUP: [(None, "pod_id"), + (RT_ROUTER, "router_id"), + (RT_NETWORK, "network_id")], + JT_PORT_DELETE: [(None, "pod_id"), + (RT_PORT, "port_id")], + JT_SEG_RULE_SETUP: [(None, "project_id")], + JT_NETWORK_UPDATE: [(None, "pod_id"), + (RT_NETWORK, "network_id")], + JT_SUBNET_UPDATE: [(None, "pod_id"), + (RT_SUBNET, "subnet_id")], + JT_SHADOW_PORT_SETUP: [(None, "pod_id"), + (RT_NETWORK, "network_id")] +} + +# map raw job status to more human readable job status +job_status_map = { + JS_Fail: 'FAIL', + JS_Success: 'SUCCESS', + JS_Running: 'RUNNING', + JS_New: 'NEW' +} + +# filter jobs according to the job's attributes +JOB_LIST_SUPPORTED_FILTERS = ['project_id', 'type', 'status'] + +# map job type to corresponding job handler +job_handles = { + JT_CONFIGURE_ROUTE: "configure_route", + JT_ROUTER_SETUP: "setup_bottom_router", + JT_PORT_DELETE: "delete_server_port", + JT_SEG_RULE_SETUP: "configure_security_group_rules", + JT_NETWORK_UPDATE: "update_network", + JT_SUBNET_UPDATE: "update_subnet", + JT_SHADOW_PORT_SETUP: "setup_shadow_ports" +} + +# map job type to its primary resource and then we only validate the project_id +# of that resource. For JT_SEG_RULE_SETUP, as it has only one project_id +# parameter, there is no need to validate it. +job_primary_resource_map = { + JT_CONFIGURE_ROUTE: (RT_ROUTER, "router_id"), + JT_ROUTER_SETUP: (RT_ROUTER, "router_id"), + JT_PORT_DELETE: (RT_PORT, "port_id"), + JT_SEG_RULE_SETUP: (None, "project_id"), + JT_NETWORK_UPDATE: (RT_NETWORK, "network_id"), + JT_SUBNET_UPDATE: (RT_SUBNET, "subnet_id"), + JT_SHADOW_PORT_SETUP: (RT_NETWORK, "network_id") +} diff --git a/tricircle/common/policy.py b/tricircle/common/policy.py index 34de2409..0d2319b2 100644 --- a/tricircle/common/policy.py +++ b/tricircle/common/policy.py @@ -57,6 +57,13 @@ ADMIN_API_ROUTINGS_PUT = 'admin_api:routings:put' ADMIN_API_ROUTINGS_SHOW = 'admin_api:routings:show' ADMIN_API_ROUTINGS_LIST = 'admin_api:routings:list' +ADMIN_API_JOB_CREATE = 'admin_api:jobs:create' +ADMIN_API_JOB_LIST = 'admin_api:jobs:list' +ADMIN_API_JOB_SCHEMA_LIST = 'admin_api:jobs:schema_list' +ADMIN_API_JOB_REDO = 'admin_api:jobs:redo' +ADMIN_API_JOB_DELETE = 'admin_api:jobs:delete' + + tricircle_admin_api_policies = [ policy.RuleDefault(ADMIN_API_PODS_CREATE, 'rule:admin_api', @@ -86,6 +93,22 @@ tricircle_admin_api_policies = [ policy.RuleDefault(ADMIN_API_ROUTINGS_LIST, 'rule:admin_api', description='List resource routings'), + + policy.RuleDefault(ADMIN_API_JOB_CREATE, + 'rule:admin_api', + description='Create job'), + policy.RuleDefault(ADMIN_API_JOB_LIST, + 'rule:admin_api', + description='List jobs'), + policy.RuleDefault(ADMIN_API_JOB_SCHEMA_LIST, + 'rule:admin_api', + description='List job schemas'), + policy.RuleDefault(ADMIN_API_JOB_REDO, + 'rule:admin_api', + description='Redo job'), + policy.RuleDefault(ADMIN_API_JOB_DELETE, + 'rule:admin_api', + description='Delete job') ] diff --git a/tricircle/common/xrpcapi.py b/tricircle/common/xrpcapi.py index 6eca7081..381225ac 100644 --- a/tricircle/common/xrpcapi.py +++ b/tricircle/common/xrpcapi.py @@ -69,45 +69,55 @@ class XJobAPI(object): version_cap = 1.0 return version_cap - def _invoke_method(self, ctxt, method, _type, id): - db_api.new_job(ctxt, _type, id) + def invoke_method(self, ctxt, project_id, method, _type, id): + db_api.new_job(ctxt, project_id, _type, id) self.client.prepare(exchange='openstack').cast( ctxt, method, payload={_type: id}) - def setup_bottom_router(self, ctxt, net_id, router_id, pod_id): - self._invoke_method( - ctxt, 'setup_bottom_router', constants.JT_ROUTER_SETUP, + def setup_bottom_router(self, ctxt, project_id, net_id, router_id, pod_id): + self.invoke_method( + ctxt, project_id, constants.job_handles[constants.JT_ROUTER_SETUP], + constants.JT_ROUTER_SETUP, '%s#%s#%s' % (pod_id, router_id, net_id)) - def configure_extra_routes(self, ctxt, router_id): + def configure_route(self, ctxt, project_id, router_id): # NOTE(zhiyuan) this RPC is called by plugin in Neutron server, whose # control exchange is "neutron", however, we starts xjob without # specifying its control exchange, so the default value "openstack" is # used, thus we need to pass exchange as "openstack" here. - self._invoke_method( - ctxt, 'configure_extra_routes', constants.JT_ROUTER, router_id) + self.invoke_method( + ctxt, project_id, + constants.job_handles[constants.JT_CONFIGURE_ROUTE], + constants.JT_CONFIGURE_ROUTE, router_id) - def delete_server_port(self, ctxt, port_id, pod_id): - self._invoke_method( - ctxt, 'delete_server_port', constants.JT_PORT_DELETE, + def delete_server_port(self, ctxt, project_id, port_id, pod_id): + self.invoke_method( + ctxt, project_id, constants.job_handles[constants.JT_PORT_DELETE], + constants.JT_PORT_DELETE, '%s#%s' % (pod_id, port_id)) def configure_security_group_rules(self, ctxt, project_id): - self._invoke_method( - ctxt, 'configure_security_group_rules', + self.invoke_method( + ctxt, project_id, + constants.job_handles[constants.JT_SEG_RULE_SETUP], constants.JT_SEG_RULE_SETUP, project_id) - def update_network(self, ctxt, network_id, pod_id): - self._invoke_method( - ctxt, 'update_network', constants.JT_NETWORK_UPDATE, + def update_network(self, ctxt, project_id, network_id, pod_id): + self.invoke_method( + ctxt, project_id, + constants.job_handles[constants.JT_NETWORK_UPDATE], + constants.JT_NETWORK_UPDATE, '%s#%s' % (pod_id, network_id)) - def update_subnet(self, ctxt, subnet_id, pod_id): - self._invoke_method( - ctxt, 'update_subnet', constants.JT_SUBNET_UPDATE, + def update_subnet(self, ctxt, project_id, subnet_id, pod_id): + self.invoke_method( + ctxt, project_id, + constants.job_handles[constants.JT_SUBNET_UPDATE], + constants.JT_SUBNET_UPDATE, '%s#%s' % (pod_id, subnet_id)) - def setup_shadow_ports(self, ctxt, pod_id, net_id): - self._invoke_method( - ctxt, 'setup_shadow_ports', constants.JT_SHADOW_PORT_SETUP, - '%s#%s' % (pod_id, net_id)) + def setup_shadow_ports(self, ctxt, project_id, pod_id, net_id): + self.invoke_method( + ctxt, project_id, + constants.job_handles[constants.JT_SHADOW_PORT_SETUP], + constants.JT_SHADOW_PORT_SETUP, '%s#%s' % (pod_id, net_id)) diff --git a/tricircle/db/api.py b/tricircle/db/api.py index b99b521a..fb3f96c0 100644 --- a/tricircle/db/api.py +++ b/tricircle/db/api.py @@ -337,11 +337,12 @@ def find_pod_by_az_or_region(context, az_or_region): reason='Multiple pods with the same az_name are found') -def new_job(context, _type, resource_id): +def new_job(context, project_id, _type, resource_id): with context.session.begin(): job_dict = {'id': uuidutils.generate_uuid(), 'type': _type, 'status': constants.JS_New, + 'project_id': project_id, 'resource_id': resource_id, 'extra_id': uuidutils.generate_uuid()} job = core.create_resource(context, @@ -349,12 +350,13 @@ def new_job(context, _type, resource_id): return job -def register_job(context, _type, resource_id): +def register_job(context, project_id, _type, resource_id): try: context.session.begin() job_dict = {'id': uuidutils.generate_uuid(), 'type': _type, 'status': constants.JS_Running, + 'project_id': project_id, 'resource_id': resource_id, 'extra_id': constants.SP_EXTRA_ID} job = core.create_resource(context, @@ -392,22 +394,68 @@ def get_latest_failed_or_new_jobs(context): # sort sequence is "0_Fail", "1_Success", "2_Running", "3_New" query = context.session.query(models.AsyncJob.type, models.AsyncJob.resource_id, + models.AsyncJob.project_id, sql.func.min(models.AsyncJob.status)).join( stmt, sql.and_(models.AsyncJob.type == stmt.c.type, models.AsyncJob.resource_id == stmt.c.resource_id, models.AsyncJob.timestamp == stmt.c.timestamp)) - query = query.group_by(models.AsyncJob.type, + query = query.group_by(models.AsyncJob.project_id, + models.AsyncJob.type, models.AsyncJob.resource_id) - for job_type, resource_id, status in query: + for job_type, resource_id, project_id, status in query: if status == constants.JS_Fail: - failed_jobs.append({'type': job_type, 'resource_id': resource_id}) + failed_jobs.append({'type': job_type, 'resource_id': resource_id, + 'project_id': project_id}) elif status == constants.JS_New: - new_jobs.append({'type': job_type, 'resource_id': resource_id}) + new_jobs.append({'type': job_type, 'resource_id': resource_id, + 'project_id': project_id}) return failed_jobs, new_jobs -def get_latest_timestamp(context, status, _type, resource_id): +def list_jobs(context, filters=None, sorts=None): + with context.session.begin(): + # get all jobs from job table + jobs = core.query_resource(context, models.AsyncJob, + filters or [], sorts or []) + return jobs + + +def list_jobs_from_log(context, filters=None, sorts=None): + with context.session.begin(): + # get all jobs from job log table, because the job log table only + # stores successful jobs, so this method merely returns successful jobs + if filters is not None: + for filter in filters: + if filter.get('key') == 'status': + job_status = filter['value'] + # job entry in job log table has no status attribute. + if job_status == constants.JS_Success: + filters.remove(filter) + else: + return [] + + jobs_in_log = core.query_resource( + context, models.AsyncJobLog, filters or [], sorts or []) + return jobs_in_log + + +def get_job(context, job_id): + with context.session.begin(): + return core.get_resource(context, models.AsyncJob, job_id) + + +def get_job_from_log(context, job_id): + with context.session.begin(): + return core.get_resource(context, models.AsyncJobLog, job_id) + + +def delete_job(context, job_id): + with context.session.begin(): + return core.delete_resource(context, models.AsyncJob, job_id) + + +def get_latest_job(context, status, _type, resource_id): jobs = core.query_resource( context, models.AsyncJob, [{'key': 'status', 'comparator': 'eq', 'value': status}, @@ -415,7 +463,7 @@ def get_latest_timestamp(context, status, _type, resource_id): {'key': 'resource_id', 'comparator': 'eq', 'value': resource_id}], [('timestamp', False)]) if jobs: - return jobs[0]['timestamp'] + return jobs[0] else: return None @@ -442,6 +490,7 @@ def finish_job(context, job_id, successful, timestamp): if status == constants.JS_Success: log_dict = {'id': uuidutils.generate_uuid(), 'type': job['type'], + 'project_id': job['project_id'], 'timestamp': timestamp, 'resource_id': job['resource_id']} context.session.query(models.AsyncJob).filter( diff --git a/tricircle/db/migrate_repo/versions/006_add_project_id_to_async_jobs.py b/tricircle/db/migrate_repo/versions/006_add_project_id_to_async_jobs.py new file mode 100644 index 00000000..e1bae0fb --- /dev/null +++ b/tricircle/db/migrate_repo/versions/006_add_project_id_to_async_jobs.py @@ -0,0 +1,30 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import Column +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table + + +def upgrade(migrate_engine): + """Function adds project_id field.""" + meta = MetaData(bind=migrate_engine) + + # Add a new column project_id for async_jobs + async_jobs = Table('async_jobs', meta, autoload=True) + project_id = Column('project_id', String(36), nullable=True) + + if not hasattr(async_jobs.c, 'project_id'): + async_jobs.create_column(project_id) diff --git a/tricircle/db/migrate_repo/versions/007_add_project_id_to_async_job_logs.py b/tricircle/db/migrate_repo/versions/007_add_project_id_to_async_job_logs.py new file mode 100644 index 00000000..f9be88a1 --- /dev/null +++ b/tricircle/db/migrate_repo/versions/007_add_project_id_to_async_job_logs.py @@ -0,0 +1,30 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import Column +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table + + +def upgrade(migrate_engine): + """Function adds project_id field.""" + meta = MetaData(bind=migrate_engine) + + # Add a new column project_id for async_job_logs + async_job_logs = Table('async_job_logs', meta, autoload=True) + project_id = Column('project_id', String(36), nullable=True) + + if not hasattr(async_job_logs.c, 'project_id'): + async_job_logs.create_column(project_id) diff --git a/tricircle/db/models.py b/tricircle/db/models.py index c36a17cb..38d1ea11 100644 --- a/tricircle/db/models.py +++ b/tricircle/db/models.py @@ -88,10 +88,11 @@ class AsyncJob(core.ModelBase, core.DictBase): name='async_jobs0type0status0resource_id0extra_id'), ) - attributes = ['id', 'type', 'timestamp', 'status', 'resource_id', - 'extra_id'] + attributes = ['id', 'project_id', 'type', 'timestamp', 'status', + 'resource_id', 'extra_id'] id = sql.Column('id', sql.String(length=36), primary_key=True) + project_id = sql.Column('project_id', sql.String(length=36)) type = sql.Column('type', sql.String(length=36)) timestamp = sql.Column('timestamp', sql.TIMESTAMP, server_default=sql.text('CURRENT_TIMESTAMP'), @@ -104,9 +105,10 @@ class AsyncJob(core.ModelBase, core.DictBase): class AsyncJobLog(core.ModelBase, core.DictBase): __tablename__ = 'async_job_logs' - attributes = ['id', 'resource_id', 'type', 'timestamp'] + attributes = ['id', 'project_id', 'resource_id', 'type', 'timestamp'] id = sql.Column('id', sql.String(length=36), primary_key=True) + project_id = sql.Column('project_id', sql.String(length=36)) resource_id = sql.Column('resource_id', sql.String(length=127)) type = sql.Column('type', sql.String(length=36)) timestamp = sql.Column('timestamp', sql.TIMESTAMP, diff --git a/tricircle/network/central_plugin.py b/tricircle/network/central_plugin.py index 891b65ce..4fa784d8 100644 --- a/tricircle/network/central_plugin.py +++ b/tricircle/network/central_plugin.py @@ -393,7 +393,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, t_ctx, network_id, t_constants.RT_NETWORK) if mappings: self.xjob_handler.update_network( - t_ctx, network_id, t_constants.POD_NOT_SPECIFIED) + t_ctx, net['tenant_id'], network_id, + t_constants.POD_NOT_SPECIFIED) self.type_manager.extend_network_dict_provider(context, net) return net @@ -530,7 +531,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, mappings = db_api.get_bottom_mappings_by_top_id( t_ctx, subnet_id, t_constants.RT_SUBNET) if mappings: - self.xjob_handler.update_subnet(t_ctx, subnet_id, + self.xjob_handler.update_subnet(t_ctx, result['tenant_id'], + subnet_id, t_constants.POD_NOT_SPECIFIED) return result @@ -645,8 +647,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, # for local router, job will be triggered after router # interface attachment. self.xjob_handler.setup_bottom_router( - admin_context, port_body['network_id'], - router_id, pod['pod_id']) + admin_context, router['tenant_id'], + port_body['network_id'], router_id, pod['pod_id']) # network will be attached to only one non-local router, # so we break here break @@ -695,7 +697,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, if is_vxlan_network and ( cfg.CONF.client.cross_pod_vxlan_mode in ( t_constants.NM_P2P, t_constants.NM_L2GW)): - self.xjob_handler.setup_shadow_ports(t_ctx, pod['pod_id'], + self.xjob_handler.setup_shadow_ports(t_ctx, res['tenant_id'], + pod['pod_id'], res['network_id']) # for vm port or port with empty device_owner, update top port and # bottom port @@ -768,8 +771,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, # delete ports for pod, _id in self.helper.get_real_shadow_resource_iterator( t_ctx, t_constants.RT_NETWORK, port['network_id']): - self.xjob_handler.delete_server_port(t_ctx, port_id, - pod['pod_id']) + self.xjob_handler.delete_server_port( + t_ctx, port['tenant_id'], port_id, pod['pod_id']) except Exception: raise with t_ctx.session.begin(): @@ -1554,7 +1557,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, t_ctx = t_context.get_context_from_neutron_context(context) is_local_router = self.helper.is_local_router(t_ctx, router) if not is_local_router: - self.xjob_handler.configure_extra_routes(t_ctx, router_id) + self.xjob_handler.configure_route( + t_ctx, ret['tenant_id'], router_id) return ret def validate_router_net_location_match(self, t_ctx, router, net): @@ -1674,10 +1678,11 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, try: if len(b_pods) == 1: self.xjob_handler.setup_bottom_router( - t_ctx, net_id, router_id, b_pods[0]['pod_id']) + t_ctx, project_id, net_id, router_id, b_pods[0]['pod_id']) else: self.xjob_handler.setup_bottom_router( - t_ctx, net_id, router_id, t_constants.POD_NOT_SPECIFIED) + t_ctx, project_id, net_id, router_id, + t_constants.POD_NOT_SPECIFIED) except Exception: # NOTE(zhiyuan) we fail to submit the job, so bottom router # operations are not started, it's safe for us to remove the top @@ -1703,15 +1708,20 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, return_info = super(TricirclePlugin, self).remove_router_interface( context, router_id, interface_info) + + router = self._get_router(context, router_id) + if not b_pods: return return_info try: if len(b_pods) == 1: self.xjob_handler.setup_bottom_router( - t_ctx, net_id, router_id, b_pods[0]['pod_id']) + t_ctx, router['tenant_id'], net_id, + router_id, b_pods[0]['pod_id']) else: self.xjob_handler.setup_bottom_router( - t_ctx, net_id, router_id, t_constants.POD_NOT_SPECIFIED) + t_ctx, router['tenant_id'], net_id, + router_id, t_constants.POD_NOT_SPECIFIED) except Exception: # NOTE(zhiyuan) we fail to submit the job, so if bottom router # interface exists, it would not be deleted, then after we add @@ -1832,8 +1842,11 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, int_net_pod, b_int_port_id = mappings[0] int_port = self.get_port(context, int_port_id) net_id = int_port['network_id'] + router_id = floatingip_db['router_id'] + router = self._get_router(context, router_id) self.xjob_handler.setup_bottom_router( - t_ctx, net_id, floatingip_db['router_id'], int_net_pod['pod_id']) + t_ctx, router['tenant_id'], net_id, + floatingip_db['router_id'], int_net_pod['pod_id']) def _disassociate_floatingip(self, context, ori_floatingip_db): if not ori_floatingip_db['port_id']: @@ -1858,9 +1871,11 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2, b_int_net_pod, b_int_port_id = mappings[0] int_port = self.get_port(context, t_int_port_id) net_id = int_port['network_id'] + router_id = ori_floatingip_db['router_id'] + router = self._get_router(context, router_id) self.xjob_handler.setup_bottom_router( - t_ctx, net_id, ori_floatingip_db['router_id'], - b_int_net_pod['pod_id']) + t_ctx, router['tenant_id'], net_id, + ori_floatingip_db['router_id'], b_int_net_pod['pod_id']) def delete_floatingip(self, context, _id): """Disassociate floating ip if needed then delete it diff --git a/tricircle/tests/functional/api/controllers/test_job.py b/tricircle/tests/functional/api/controllers/test_job.py new file mode 100644 index 00000000..35e8c251 --- /dev/null +++ b/tricircle/tests/functional/api/controllers/test_job.py @@ -0,0 +1,767 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +from mock import patch +from oslo_config import cfg +from oslo_config import fixture as fixture_config +from oslo_utils import timeutils +from oslo_utils import uuidutils +from six.moves import xrange + +import pecan +from pecan.configuration import set_config +from pecan.testing import load_test_app + +from tricircle.api import app +from tricircle.common import constants +from tricircle.common import context +from tricircle.common import policy +from tricircle.common import xrpcapi +from tricircle.db import api as db_api +from tricircle.db import core +from tricircle.db import models +from tricircle.tests import base + + +OPT_GROUP_NAME = 'keystone_authtoken' +cfg.CONF.import_group(OPT_GROUP_NAME, "keystonemiddleware.auth_token") + + +def fake_admin_context(): + context_paras = {'is_admin': True} + return context.Context(**context_paras) + + +def fake_non_admin_context(): + context_paras = {} + return context.Context(**context_paras) + + +class API_FunctionalTest(base.TestCase): + + def setUp(self): + super(API_FunctionalTest, self).setUp() + + self.addCleanup(set_config, {}, overwrite=True) + + cfg.CONF.clear() + cfg.CONF.register_opts(app.common_opts) + + self.CONF = self.useFixture(fixture_config.Config()).conf + + self.CONF.set_override('auth_strategy', 'noauth') + self.CONF.set_override('tricircle_db_connection', 'sqlite:///:memory:') + + core.initialize() + core.ModelBase.metadata.create_all(core.get_engine()) + + self.context = context.get_admin_context() + + policy.populate_default_rules() + + self.app = self._make_app() + + def _make_app(self, enable_acl=False): + self.config = { + 'app': { + 'root': 'tricircle.api.controllers.root.RootController', + 'modules': ['tricircle.api'], + 'enable_acl': enable_acl, + }, + } + + return load_test_app(self.config) + + def tearDown(self): + super(API_FunctionalTest, self).tearDown() + cfg.CONF.unregister_opts(app.common_opts) + pecan.set_config({}, overwrite=True) + core.ModelBase.metadata.drop_all(core.get_engine()) + policy.reset() + + +class TestAsyncJobController(API_FunctionalTest): + """Test version listing on root URI.""" + + def setUp(self): + super(TestAsyncJobController, self).setUp() + self.job_resource_map = constants.job_resource_map + self.all_job_types = list(self.job_resource_map.keys()) + + def fake_new_job(context, project_id, type, resource_id): + raise Exception + + def fake_invoke_method(self, context, project_id, method, type, id): + db_api.new_job(context, project_id, type, id) + + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + def test_post_no_input(self): + job = self._prepare_job_element(constants.JT_CONFIGURE_ROUTE) + + jobs = [ + # missing job + { + "job_xxx": job, + "expected_error": 400 + }, + ] + + for test_job in jobs: + response = self.app.post_json( + '/v1.0/jobs', + dict(job_xxx=test_job['job_xxx']), + expect_errors=True) + + self.assertEqual(response.status_int, + test_job['expected_error']) + + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + @patch.object(db_api, 'new_job', + new=fake_new_job) + def test_post_exception(self): + job = self._prepare_job_element(constants.JT_CONFIGURE_ROUTE) + + jobs = [ + { + "job": job, + "expected_error": 500 + }, + ] + self._test_and_check(jobs) + + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + def test_post_invalid_input(self): + for job_type in self.all_job_types: + job = self._prepare_job_element(job_type) + + # wrong job type parameter: no job type is provided + job_1 = copy.deepcopy(job) + job_1.pop('type') + + # wrong job type parameter: job type is empty + job_2 = copy.deepcopy(job) + job_2['type'] = '' + + # wrong job type parameter: job type is wrong + job_3 = copy.deepcopy(job) + job_3['type'] = job['type'] + '_1' + + # wrong resource parameter: no resource is provided + job_4 = copy.deepcopy(job) + job_4.pop('resource') + + # wrong resource parameter: lack of necessary resource + job_5 = copy.deepcopy(job) + job_5['resource'].popitem() + + # wrong resource parameter: redundant resource + job_6 = copy.deepcopy(job) + job_6['resource']['fake_resource'] = 'fake_resource' + + # wrong project id parameter: no project id is provided + job_7 = copy.deepcopy(job) + job_7.pop('project_id') + + # wrong project id parameter: project id is empty + job_8 = copy.deepcopy(job) + job_8['project_id'] = '' + + # wrong project id parameter: project is not the + # owner of resource + job_9 = copy.deepcopy(job) + job_9['project_id'] = uuidutils.generate_uuid() + + jobs = [ + { + "job": job_1, + "expected_error": 400 + }, + { + "job": job_2, + "expected_error": 400 + }, + { + "job": job_3, + "expected_error": 400 + }, + { + "job": job_4, + "expected_error": 400 + }, + { + "job": job_5, + "expected_error": 400 + }, + { + "job": job_6, + "expected_error": 400 + }, + { + "job": job_7, + "expected_error": 400 + }, + { + "job": job_8, + "expected_error": 400 + }, + { + "job": job_9, + "expected_error": 400 + }, + ] + + self._test_and_check(jobs) + + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + @patch.object(xrpcapi.XJobAPI, 'invoke_method', + new=fake_invoke_method) + def test_post_job(self): + for job_type in self.all_job_types: + job = self._prepare_job_element(job_type) + + jobs = [ + # create an entirely new job + { + "job": job, + "expected_error": 200 + }, + # target job already exists in the job table and its status + # is NEW, then this newer job will be picked by job handler. + { + "job": job, + "expected_error": 200 + }, + ] + + self._test_and_check(jobs) + + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + @patch.object(xrpcapi.XJobAPI, 'invoke_method', + new=fake_invoke_method) + def test_get_one_and_get_all(self): + all_job_ids = {} + all_job_project_ids = {} + + index = 0 + for job_type in self.all_job_types: + job = self._prepare_job_element(job_type) + + jobs = [ + { + "job": job, + "expected_error": 200 + }, + ] + + self._test_and_check(jobs) + + response = self.app.get('/v1.0/jobs') + return_job = response.json + + all_job_ids[index] = return_job['jobs'][index]['id'] + all_job_project_ids[job_type] = ( + return_job['jobs'][index]['project_id']) + + index = index + 1 + + service_uris = ['jobs', 'jobs/detail'] + amount_of_all_jobs = len(self.all_job_types) + # with no filters all jobs are returned + for service_uri in service_uris: + response_1 = self.app.get('/v1.0/%(service_uri)s' % { + 'service_uri': service_uri}) + return_jobs_1 = response_1.json + + self.assertEqual(amount_of_all_jobs, len(return_jobs_1['jobs'])) + self.assertIn('status', response_1) + self.assertIn('resource', response_1) + self.assertIn('project_id', response_1) + self.assertIn('id', response_1) + self.assertIn('timestamp', response_1) + self.assertIn('type', response_1) + + self.assertNotIn('extra_id', response_1) + self.assertNotIn('resource_id', response_1) + + # use job status filter + response_2 = self.app.get('/v1.0/jobs?status=new') + return_jobs_2 = response_2.json + + self.assertEqual(amount_of_all_jobs, len(return_jobs_2['jobs'])) + + response = self.app.get('/v1.0/jobs?status=fail') + return_jobs_3 = response.json + + self.assertEqual(0, len(return_jobs_3['jobs'])) + + amount_of_fail_jobs = int(amount_of_all_jobs / 3) + for i in xrange(amount_of_fail_jobs): + db_api.finish_job(self.context, + all_job_ids[i], False, + timeutils.utcnow()) + + amount_of_succ_jobs = int(amount_of_all_jobs / 3) + for i in xrange(amount_of_succ_jobs): + db_api.finish_job(self.context, + all_job_ids[amount_of_fail_jobs + i], True, + timeutils.utcnow()) + + for service_uri in service_uris: + response = self.app.get('/v1.0/%(service_uri)s?status=fail' % { + 'service_uri': service_uri}) + return_jobs = response.json + + self.assertEqual(amount_of_fail_jobs, len(return_jobs['jobs'])) + + response = self.app.get('/v1.0/%(service_uri)s?status=success' + '' % {'service_uri': service_uri}) + return_jobs = response.json + + self.assertEqual(amount_of_succ_jobs, len(return_jobs['jobs'])) + + # use job type filter or project id filter + for job_type in self.all_job_types: + response = self.app.get('/v1.0/%(service_uri)s?type=%(type)s' + '' % {'service_uri': service_uri, + 'type': job_type}) + return_job = response.json + + self.assertEqual(1, len(return_job['jobs'])) + + response = self.app.get( + '/v1.0/%(service_uri)s?project_id=%(project_id)s' % { + 'service_uri': service_uri, + 'project_id': all_job_project_ids[job_type]}) + return_job = response.json + + self.assertEqual(1, len(return_job['jobs'])) + + # combine job type filter and project id filter + response = self.app.get( + '/v1.0/%(service_uri)s?project_id=%(project_id)s&' + 'type=%(type)s' % { + 'service_uri': service_uri, + 'project_id': all_job_project_ids[job_type], + 'type': job_type}) + return_job = response.json + + self.assertEqual(1, len(return_job['jobs'])) + + # combine job type filter, project id filter and job status filter + for i in xrange(amount_of_all_jobs): + if i < amount_of_fail_jobs: + # this aims to test service "/v1.0/jobs/{id}" + response_1 = self.app.get('/v1.0/jobs/%(id)s' % { + 'id': all_job_ids[i]}) + return_job_1 = response_1.json + + response_2 = self.app.get( + '/v1.0/%(service_uri)s?' + 'project_id=%(project_id)s&' + 'type=%(type)s&' + 'status=%(status)s' % { + 'service_uri': service_uri, + 'project_id': return_job_1['job']['project_id'], + 'type': return_job_1['job']['type'], + 'status': 'fail'}) + + return_job_2 = response_2.json + + self.assertEqual(1, len(return_job_2['jobs'])) + + elif ((i >= amount_of_fail_jobs + ) and (i < amount_of_fail_jobs + amount_of_succ_jobs)): + # those jobs are set to 'success' and they are moved to + # job log. their job ids are not stored in all_job_ids + job_type = self.all_job_types[i] + response = self.app.get( + '/v1.0/%(service_uri)s?project_id=%(project_id)s&' + 'type=%(type)s&status=%(status)s' % { + 'service_uri': service_uri, + 'project_id': all_job_project_ids[job_type], + 'type': job_type, + 'status': 'success'}) + + return_job = response.json + + self.assertEqual(1, len(return_job['jobs'])) + + response_2 = self.app.get( + '/v1.0/%(service_uri)s?status=%(status)s' + '&type=%(type)s' % { + 'service_uri': service_uri, + 'status': "success-x", + 'type': job_type}) + return_job_2 = response_2.json + self.assertEqual(0, len(return_job_2['jobs'])) + + else: + response_1 = self.app.get('/v1.0/jobs/%(id)s' % { + 'id': all_job_ids[i]}) + return_job_1 = response_1.json + + response_2 = self.app.get( + '/v1.0/%(service_uri)s?project_id=%(project_id)s&' + 'type=%(type)s&status=%(status)s' % { + 'service_uri': service_uri, + 'project_id': return_job_1['job']['project_id'], + 'type': return_job_1['job']['type'], + 'status': 'new'}) + + return_job_2 = response_2.json + + self.assertEqual(1, len(return_job_2['jobs'])) + + response_3 = self.app.get( + '/v1.0/%(service_uri)s?status=%(status)s' + '&type=%(type)s' % { + 'service_uri': service_uri, + 'status': "new-x", + 'type': return_job_1['job']['type']}) + return_job_3 = response_3.json + self.assertEqual(0, len(return_job_3['jobs'])) + + # use unsupported filter, it will raise 400 error + response = self.app.get('/v1.0/%(service_uri)s?' + 'fake_filter=%(fake_filter)s' + '' % {'service_uri': service_uri, + 'fake_filter': "fake_filter"}, + expect_errors=True) + + self.assertEqual(response.status_int, 400) + + # use invalid filter, it will return empty set + response = self.app.get('/v1.0/%(service_uri)s?status=%(status)s' + '' % {'service_uri': service_uri, + 'status': "new-x"}) + return_job = response.json + self.assertEqual(0, len(return_job['jobs'])) + + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + def test_get_job_schemas(self): + response = self.app.get('/v1.0/jobs/schemas') + return_job_schemas = response.json + + job_schemas = [] + for job_type in self.all_job_types: + job = {} + resource = [] + for resource_type, resource_id in ( + self.job_resource_map[job_type]): + resource.append(resource_id) + job['resource'] = resource + job['type'] = job_type + job_schemas.append(job) + + self.assertEqual(job_schemas, return_job_schemas['schemas']) + + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + @patch.object(xrpcapi.XJobAPI, 'invoke_method', + new=fake_invoke_method) + def test_delete_job(self): + + for job_type in self.all_job_types: + job = self._prepare_job_element(job_type) + + jobs = [ + { + "job": job, + "expected_error": 200 + }, + ] + + self._test_and_check(jobs) + + response = self.app.get('/v1.0/jobs') + return_job = response.json + + jobs = return_job['jobs'] + + # delete a new job + for job in jobs: + response_1 = self.app.delete( + '/v1.0/jobs/%(id)s' % {'id': job['id']}, + expect_errors=True) + return_value_1 = response_1.json + + self.assertEqual(response_1.status_int, 200) + self.assertEqual(return_value_1, {}) + + response_2 = self.app.get('/v1.0/jobs') + return_job_2 = response_2.json + self.assertEqual(0, len(return_job_2['jobs'])) + + response_3 = self.app.delete('/v1.0/jobs/123', expect_errors=True) + self.assertEqual(response_3.status_int, 404) + + # delete a running job + job_type_4 = constants.JT_NETWORK_UPDATE + job_4 = self._prepare_job_element(job_type_4) + resource_id_4 = '#'.join([job_4['resource'][resource_id] + for resource_type, resource_id + in self.job_resource_map[job_type_4]]) + job_running_4 = db_api.register_job(self.context, + job_4['project_id'], + job_type_4, + resource_id_4) + + self.assertEqual(constants.JS_Running, job_running_4['status']) + response_4 = self.app.delete('/v1.0/jobs/%(id)s' % { + 'id': job_running_4['id']}, expect_errors=True) + + self.assertEqual(response_4.status_int, 400) + + # delete a failed job + job_type_5 = constants.JT_NETWORK_UPDATE + job_5 = self._prepare_job_element(job_type_5) + + job_dict_5 = { + "job": job_5, + "expected_error": 200 + } + + response_5 = self.app.post_json('/v1.0/jobs', + dict(job=job_dict_5['job']), + expect_errors=True) + return_job_5 = response_5.json + + self.assertEqual(response_5.status_int, 200) + + db_api.finish_job(self.context, + return_job_5['job']['id'], + False, timeutils.utcnow()) + + job_fail_5 = db_api.get_job(self.context, return_job_5['job']['id']) + self.assertEqual(constants.JS_Fail, job_fail_5['status']) + response_6 = self.app.delete('/v1.0/jobs/%(id)s' % { + 'id': return_job_5['job']['id']}, expect_errors=True) + + self.assertEqual(response_6.status_int, 200) + + # delete a successful job + job_type_6 = constants.JT_NETWORK_UPDATE + job_6 = self._prepare_job_element(job_type_6) + + job_dict_6 = { + "job": job_6, + "expected_error": 200 + } + + response_6 = self.app.post_json('/v1.0/jobs', + dict(job=job_dict_6['job']), + expect_errors=True) + return_job_6 = response_6.json + + with self.context.session.begin(): + job_dict = {'status': constants.JS_Success, + 'timestamp': timeutils.utcnow(), + 'extra_id': uuidutils.generate_uuid()} + core.update_resource(self.context, models.AsyncJob, + return_job_6['job']['id'], job_dict) + + job_succ_6 = db_api.get_job(self.context, return_job_6['job']['id']) + self.assertEqual(constants.JS_Success, job_succ_6['status']) + response_7 = self.app.delete('/v1.0/jobs/%(id)s' % { + 'id': return_job_6['job']['id']}, expect_errors=True) + + self.assertEqual(response_7.status_int, 200) + + @patch.object(context, 'extract_context_from_environ', + new=fake_admin_context) + @patch.object(xrpcapi.XJobAPI, 'invoke_method', + new=fake_invoke_method) + def test_redo_job(self): + + for job_type in self.all_job_types: + job = self._prepare_job_element(job_type) + + jobs = [ + # create an entirely new job + { + "job": job, + "expected_error": 200 + }, + ] + + self._test_and_check(jobs) + + response = self.app.get('/v1.0/jobs') + return_job = response.json + + jobs = return_job['jobs'] + + # redo a new job + for job in jobs: + response_1 = self.app.put('/v1.0/jobs/%(id)s' % {'id': job['id']}, + expect_errors=True) + + self.assertEqual(response_1.status_int, 200) + + response_2 = self.app.put('/v1.0/jobs/123', expect_errors=True) + self.assertEqual(response_2.status_int, 404) + + # redo a running job + job_type_3 = constants.JT_NETWORK_UPDATE + job_3 = self._prepare_job_element(job_type_3) + resource_id_3 = '#'.join([job_3['resource'][resource_id] + for resource_type, resource_id + in self.job_resource_map[job_type_3]]) + job_running_3 = db_api.register_job(self.context, + job_3['project_id'], + job_type_3, + resource_id_3) + + self.assertEqual(constants.JS_Running, job_running_3['status']) + response_3 = self.app.put('/v1.0/jobs/%(id)s' % { + 'id': job_running_3['id']}, expect_errors=True) + + self.assertEqual(response_3.status_int, 400) + + # redo a failed job + job_type_4 = constants.JT_NETWORK_UPDATE + job_4 = self._prepare_job_element(job_type_4) + + job_dict_4 = { + "job": job_4, + "expected_error": 200 + } + + response_4 = self.app.post_json('/v1.0/jobs', + dict(job=job_dict_4['job']), + expect_errors=True) + return_job_4 = response_4.json + + self.assertEqual(response_4.status_int, 200) + + db_api.finish_job(self.context, + return_job_4['job']['id'], + False, timeutils.utcnow()) + + job_fail_4 = db_api.get_job(self.context, return_job_4['job']['id']) + self.assertEqual(constants.JS_Fail, job_fail_4['status']) + response_5 = self.app.put('/v1.0/jobs/%(id)s' % { + 'id': return_job_4['job']['id']}, expect_errors=True) + + self.assertEqual(response_5.status_int, 200) + + # redo a successful job + job_type_6 = constants.JT_NETWORK_UPDATE + job_6 = self._prepare_job_element(job_type_6) + + job_dict_6 = { + "job": job_6, + "expected_error": 200 + } + + response_6 = self.app.post_json('/v1.0/jobs', + dict(job=job_dict_6['job']), + expect_errors=True) + return_job_6 = response_6.json + + with self.context.session.begin(): + job_dict = {'status': constants.JS_Success, + 'timestamp': timeutils.utcnow(), + 'extra_id': uuidutils.generate_uuid()} + core.update_resource(self.context, models.AsyncJob, + return_job_6['job']['id'], job_dict) + + job_succ_6 = db_api.get_job(self.context, return_job_6['job']['id']) + self.assertEqual(constants.JS_Success, job_succ_6['status']) + response_7 = self.app.put('/v1.0/jobs/%(id)s' % { + 'id': return_job_6['job']['id']}, expect_errors=True) + + self.assertEqual(response_7.status_int, 400) + + @patch.object(context, 'extract_context_from_environ', + new=fake_non_admin_context) + def test_non_admin_action(self): + job_type = constants.JT_NETWORK_UPDATE + job = self._prepare_job_element(job_type) + + jobs = [ + { + "job": job, + "expected_error": 403 + }, + ] + self._test_and_check(jobs) + + response_1 = self.app.get('/v1.0/jobs/1234567890', + expect_errors=True) + self.assertEqual(response_1.status_int, 403) + + response_2 = self.app.get('/v1.0/jobs', + expect_errors=True) + self.assertEqual(response_2.status_int, 403) + + response_3 = self.app.delete('/v1.0/jobs/1234567890', + expect_errors=True) + self.assertEqual(response_3.status_int, 403) + + response_4 = self.app.put('/v1.0/jobs/1234567890', + expect_errors=True) + self.assertEqual(response_4.status_int, 403) + + def _test_and_check(self, jobs): + + for test_job in jobs: + response = self.app.post_json( + '/v1.0/jobs', dict(job=test_job['job']), + expect_errors=True) + self.assertEqual(response.status_int, test_job['expected_error']) + + def _prepare_job_element(self, job_type): + # in order to create a job, we need three elements: job type, + # job resource and project id. + job = {} + job['resource'] = {} + job['type'] = job_type + + for resource_type, resource_id in self.job_resource_map[job_type]: + job['resource'][resource_id] = uuidutils.generate_uuid() + + job['project_id'] = self._prepare_project_id_for_job(job) + + return job + + def _prepare_project_id_for_job(self, job): + # prepare the project id for job creation, currently job parameter + # contains job type and job resource information. + job_type = job['type'] + if job_type == constants.JT_SEG_RULE_SETUP: + project_id = job['resource']['project_id'] + else: + project_id = uuidutils.generate_uuid() + pod_id = uuidutils.generate_uuid() + + resource_type, resource_id = ( + constants.job_primary_resource_map[job_type]) + routing = db_api.create_resource_mapping( + self.context, job['resource'][resource_id], + job['resource'][resource_id], pod_id, project_id, + resource_type) + self.assertIsNotNone(routing) + + return project_id + + def _validate_error_code(self, res, code): + self.assertEqual(res[list(res.keys())[0]]['code'], code) diff --git a/tricircle/tests/unit/api/controllers/test_job.py b/tricircle/tests/unit/api/controllers/test_job.py new file mode 100644 index 00000000..e7f8c849 --- /dev/null +++ b/tricircle/tests/unit/api/controllers/test_job.py @@ -0,0 +1,616 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import mock +from mock import patch +from oslo_utils import timeutils +from oslo_utils import uuidutils +from six.moves import xrange +import unittest + +import pecan + +from tricircle.api.controllers import job +from tricircle.common import constants +from tricircle.common import context +from tricircle.common import policy +from tricircle.common import xrpcapi +from tricircle.db import api as db_api +from tricircle.db import core +from tricircle.db import models + + +class FakeRPCAPI(xrpcapi.XJobAPI): + def invoke_method(self, ctxt, project_id, method, _type, id): + db_api.new_job(ctxt, project_id, _type, id) + + +class FakeAsyncJobController(job.AsyncJobController): + def __init__(self): + self.xjob_handler = FakeRPCAPI() + + +class FakeResponse(object): + def __new__(cls, code=500): + cls.status = code + cls.status_code = code + return super(FakeResponse, cls).__new__(cls) + + +class AsyncJobControllerTest(unittest.TestCase): + def setUp(self): + core.initialize() + core.ModelBase.metadata.create_all(core.get_engine()) + self.controller = FakeAsyncJobController() + self.context = context.get_admin_context() + self.job_resource_map = constants.job_resource_map + policy.populate_default_rules() + + @patch.object(pecan, 'response', new=FakeResponse) + @patch.object(context, 'extract_context_from_environ') + def test_post(self, mock_context): + mock_context.return_value = self.context + + # cover all job types + for job_type in self.job_resource_map.keys(): + job = self._prepare_job_element(job_type) + + kw_job = {'job': job} + + # failure case, only admin can create the job + self.context.is_admin = False + res = self.controller.post(**kw_job) + self._validate_error_code(res, 403) + + self.context.is_admin = True + + # failure case, request body not found + kw_job_1 = {'job_1': job} + res = self.controller.post(**kw_job_1) + self._validate_error_code(res, 400) + + # failure case, wrong job type parameter + job_type_backup = job.pop('type') + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + job['type'] = '' + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + job['type'] = job_type_backup + '_1' + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + job['type'] = job_type_backup + + # failure case, wrong resource parameter + job_resource_backup = job.pop('resource') + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + job['resource'] = copy.deepcopy(job_resource_backup) + job['resource'].popitem() + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + fake_resource = 'fake_resource' + job['resource'][fake_resource] = fake_resource + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + job['resource'] = job_resource_backup + + # failure case, wrong project id parameter + project_id_backup = job.pop('project_id') + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + job['project_id'] = '' + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + job['project_id'] = uuidutils.generate_uuid() + res = self.controller.post(**kw_job) + self._validate_error_code(res, 400) + + job['project_id'] = project_id_backup + + # successful case, create an entirely new job. Because the job + # status returned from controller has been formatted, so we not + # only validate the database records, but also validate the return + # value of the controller. + job_1 = self.controller.post(**kw_job)['job'] + job_in_db_1 = db_api.get_job(self.context, job_1['id']) + self.assertEqual(job_type, job_in_db_1['type']) + self.assertEqual(job['project_id'], job_in_db_1['project_id']) + self.assertEqual(constants.JS_New, job_in_db_1['status']) + + self.assertEqual('NEW', job_1['status']) + self.assertEqual(len(constants.job_resource_map[job['type']]), + len(job_1['resource'])) + self.assertFalse('resource_id' in job_1) + self.assertFalse('extra_id' in job_1) + db_api.delete_job(self.context, job_1['id']) + + # successful case, target job already exists in the job table + # and its status is NEW, then this newer job will be picked by + # job handler. + job_2 = self.controller.post(**kw_job)['job'] + job_in_db_2 = db_api.get_job(self.context, job_2['id']) + job_3 = self.controller.post(**kw_job)['job'] + job_in_db_3 = db_api.get_job(self.context, job_3['id']) + + self.assertEqual(job_type, job_in_db_2['type']) + self.assertEqual(job['project_id'], job_in_db_2['project_id']) + self.assertEqual(constants.JS_New, job_in_db_2['status']) + + self.assertEqual('NEW', job_2['status']) + self.assertEqual(len(constants.job_resource_map[job['type']]), + len(job_2['resource'])) + self.assertFalse('resource_id' in job_2) + self.assertFalse('extra_id' in job_2) + + self.assertEqual(job_type, job_in_db_3['type']) + self.assertEqual(job['project_id'], job_in_db_3['project_id']) + self.assertEqual(constants.JS_New, job_in_db_3['status']) + + self.assertEqual('NEW', job_3['status']) + self.assertEqual(len(constants.job_resource_map[job['type']]), + len(job_3['resource'])) + self.assertFalse('resource_id' in job_3) + self.assertFalse('extra_id' in job_3) + + db_api.finish_job(self.context, job_3['id'], False, + timeutils.utcnow()) + db_api.delete_job(self.context, job_3['id']) + + @patch.object(pecan, 'response', new=FakeResponse) + @patch.object(context, 'extract_context_from_environ') + def test_get_one(self, mock_context): + mock_context.return_value = self.context + + # failure case, only admin can list the job's info + self.context.is_admin = False + res = self.controller.get_one("schemas") + self._validate_error_code(res, 403) + res = self.controller.get_one("detail") + self._validate_error_code(res, 403) + res = self.controller.get_one(uuidutils.generate_uuid()) + self._validate_error_code(res, 403) + + self.context.is_admin = True + + # failure case, parameter error + res = self.controller.get_one("schemas_1") + self._validate_error_code(res, 404) + + res = self.controller.get_one(uuidutils.generate_uuid()) + self._validate_error_code(res, 404) + + # successful case, set id="schemas" to get job schemas + job_schemas_2 = self.controller.get_one("schemas") + job_schemas_3 = [] + for job_type in self.job_resource_map.keys(): + job = {} + resource = [] + for resource_type, resource_id in self.job_resource_map[job_type]: + resource.append(resource_id) + job['resource'] = resource + job['type'] = job_type + job_schemas_3.append(job) + + self.assertEqual(job_schemas_3, job_schemas_2['schemas']) + + # successful case, set id="detail" to get all jobs. + # first, we need to create jobs in job table. + amount_of_all_jobs = len(self.job_resource_map.keys()) + all_job_ids = {} + index = 0 + for job_type in self.job_resource_map.keys(): + job = self._prepare_job_element(job_type) + + resource_id = '#'.join([job['resource'][resource_id] + for resource_type, resource_id + in self.job_resource_map[job_type]]) + job_1 = db_api.new_job(self.context, + job['project_id'], job_type, + resource_id) + all_job_ids[index] = job_1['id'] + index = index + 1 + + # validate if the id=job_id, get_one(id=job_id) can take effective + job_2 = self.controller.get_one(job_1['id'])['job'] + self.assertEqual(job_1['type'], job_2['type']) + self.assertEqual(job_1['project_id'], job_2['project_id']) + self.assertEqual("NEW", job_2['status']) + + jobs_1 = self.controller.get_one("detail") + self.assertEqual(amount_of_all_jobs, len(jobs_1['jobs'])) + + # create jobs in job log table, in order to validate + # get_one(id=detail) can also get the jobs from job log + amount_of_succ_jobs = int(len(all_job_ids) / 2) + for i in xrange(amount_of_succ_jobs): + db_api.finish_job(self.context, all_job_ids[i], True, + timeutils.utcnow()) + + jobs_2 = self.controller.get_one("detail") + self.assertEqual(amount_of_all_jobs, len(jobs_2['jobs'])) + + job_status_filter_1 = {'status': 'success'} + jobs_3 = self.controller.get_one("detail", **job_status_filter_1) + self.assertEqual(amount_of_succ_jobs, len(jobs_3['jobs'])) + + job_status_filter_2 = {'status': 'new'} + jobs_4 = self.controller.get_one("detail", **job_status_filter_2) + self.assertEqual(amount_of_all_jobs - amount_of_succ_jobs, + len(jobs_4['jobs'])) + + @patch.object(pecan, 'response', new=FakeResponse) + @patch.object(context, 'extract_context_from_environ') + def test_get_all_jobs(self, mock_context): + mock_context.return_value = self.context + + # map job type to project id for later project id filter validation. + job_project_id_map = {} + amount_of_all_jobs = len(self.job_resource_map.keys()) + amount_of_running_jobs = 3 + count = 1 + + # cover all job types. + for job_type in self.job_resource_map.keys(): + job = self._prepare_job_element(job_type) + + job_project_id_map[job_type] = job['project_id'] + + resource_id = '#'.join([job['resource'][resource_id] + for resource_type, resource_id + in self.job_resource_map[job_type]]) + if count <= amount_of_running_jobs: + db_api.register_job(self.context, + job['project_id'], job_type, + resource_id) + else: + db_api.new_job(self.context, + job['project_id'], job_type, + resource_id) + count = count + 1 + + # query the jobs with several kinds of filters. + # supported filters: project id, job status, job type. + job_status_filter_1 = {'status': 'new'} + job_status_filter_2 = {'status': 'fail'} + job_status_filter_3 = {'status': 'running'} + invalid_filter = {'status': "new-x"} + unsupported_filter = {'fake_filter': "fake_filter"} + count = 1 + for job_type in self.job_resource_map.keys(): + project_id_filter_1 = {'project_id': job_project_id_map[job_type]} + project_id_filter_2 = {'project_id': uuidutils.generate_uuid()} + + job_type_filter_1 = {'type': job_type} + job_type_filter_2 = {'type': job_type + '_1'} + + # failure case, only admin can list the jobs + self.context.is_admin = False + res = self.controller.get_all() + self._validate_error_code(res, 403) + + self.context.is_admin = True + + # successful case, filter by project id + jobs_project_id_filter_1 = self.controller.get_all( + **project_id_filter_1) + self.assertEqual(1, len(jobs_project_id_filter_1['jobs'])) + + jobs_project_id_filter_2 = self.controller.get_all( + **project_id_filter_2) + self.assertEqual(0, len(jobs_project_id_filter_2['jobs'])) + + # successful case, filter by job type + jobs_job_type_filter_1 = self.controller.get_all( + **job_type_filter_1) + self.assertEqual(1, len(jobs_job_type_filter_1['jobs'])) + + jobs_job_type_filter_2 = self.controller.get_all( + **job_type_filter_2) + self.assertEqual(0, len(jobs_job_type_filter_2['jobs'])) + + # successful case, filter by project id, job status and job type + if count <= amount_of_running_jobs: + all_filters = dict(list(project_id_filter_1.items()) + + list(job_status_filter_3.items()) + + list(job_type_filter_1.items())) + jobs_all_filters = self.controller.get_all(**all_filters) + self.assertEqual(1, len(jobs_all_filters['jobs'])) + else: + all_filters = dict(list(project_id_filter_1.items()) + + list(job_status_filter_1.items()) + + list(job_type_filter_1.items())) + jobs_all_filters = self.controller.get_all(**all_filters) + self.assertEqual(1, len(jobs_all_filters['jobs'])) + + # successful case, contradictory filter + contradict_filters = dict(list(project_id_filter_1.items()) + + list(job_status_filter_2.items()) + + list((job_type_filter_2.items()))) + jobs_contradict_filters = self.controller.get_all( + **contradict_filters) + self.assertEqual(0, len(jobs_contradict_filters['jobs'])) + count = count + 1 + + # failure case, unsupported filter + res = self.controller.get_all(**unsupported_filter) + self._validate_error_code(res, 400) + + # successful case, invalid filter + jobs_invalid_filter = self.controller.get_all(**invalid_filter) + self.assertEqual(0, len(jobs_invalid_filter['jobs'])) + + # successful case, list jobs without filters + jobs_empty_filters = self.controller.get_all() + self.assertEqual(amount_of_all_jobs, len(jobs_empty_filters['jobs'])) + + # successful case, filter by job status + jobs_job_status_filter_1 = self.controller.get_all( + **job_status_filter_1) + self.assertEqual(amount_of_all_jobs - amount_of_running_jobs, + len(jobs_job_status_filter_1['jobs'])) + + jobs_job_status_filter_2 = self.controller.get_all( + **job_status_filter_2) + self.assertEqual(0, len(jobs_job_status_filter_2['jobs'])) + + jobs_job_status_filter_3 = self.controller.get_all( + **job_status_filter_3) + self.assertEqual(amount_of_running_jobs, + len(jobs_job_status_filter_3['jobs'])) + + @patch.object(pecan, 'response', new=FakeResponse) + @patch.object(pecan, 'response', new=mock.Mock) + @patch.object(context, 'extract_context_from_environ') + def test_delete(self, mock_context): + mock_context.return_value = self.context + + # cover all job types. + # each 'for' loop adds one item in job log table, we set count variable + # to record dynamic total job entries in job log table. + count = 1 + for job_type in self.job_resource_map.keys(): + job = self._prepare_job_element(job_type) + + resource_id = '#'.join([job['resource'][resource_id] + for resource_type, resource_id + in self.job_resource_map[job_type]]) + + # failure case, only admin can delete the job + job_1 = db_api.new_job(self.context, job['project_id'], + job_type, + resource_id) + self.context.is_admin = False + res = self.controller.delete(job_1['id']) + self._validate_error_code(res, 403) + + self.context.is_admin = True + db_api.delete_job(self.context, job_1['id']) + + # failure case, job not found + res = self.controller.delete(-123) + self._validate_error_code(res, 404) + + # failure case, delete a running job + job_2 = db_api.register_job(self.context, + job['project_id'], + job_type, resource_id) + job = db_api.get_job(self.context, job_2['id']) + res = self.controller.delete(job_2['id']) + self._validate_error_code(res, 400) + + # finish the job and delete it + db_api.finish_job(self.context, job_2['id'], False, + timeutils.utcnow()) + db_api.delete_job(self.context, job_2['id']) + + # successful case, delete a successful job. successful job from + # job log can't be deleted, here this successful job is from + # job table. + job_3 = self._prepare_job_element(job_type) + resource_id_3 = '#'.join([job_3['resource'][resource_id_3] + for resource_type_3, resource_id_3 + in self.job_resource_map[job_type]]) + + job_4 = db_api.new_job(self.context, + job_3['project_id'], + job_type, resource_id_3) + + with self.context.session.begin(): + job_dict = {'status': constants.JS_Success, + 'timestamp': timeutils.utcnow(), + 'extra_id': uuidutils.generate_uuid()} + core.update_resource(self.context, models.AsyncJob, + job_4['id'], job_dict) + + job_4_succ = db_api.get_job(self.context, job_4['id']) + self.controller.delete(job_4['id']) + + filters_job_4 = [ + {'key': 'type', 'comparator': 'eq', + 'value': job_4_succ['type']}, + {'key': 'status', 'comparator': 'eq', + 'value': job_4_succ['status']}, + {'key': 'resource_id', 'comparator': 'eq', + 'value': job_4_succ['resource_id']}, + {'key': 'extra_id', 'comparator': 'eq', + 'value': job_4_succ['extra_id']}] + self.assertEqual(0, len(db_api.list_jobs(self.context, + filters_job_4))) + self.assertEqual(count, + len(db_api.list_jobs_from_log(self.context))) + count = count + 1 + + # successful case, delete a new job + job_5 = db_api.new_job(self.context, + job['project_id'], job_type, + resource_id) + self.controller.delete(job_5['id']) + + filters_job_5 = [ + {'key': 'type', 'comparator': 'eq', 'value': job_5['type']}, + {'key': 'status', 'comparator': 'eq', + 'value': job_5['status']}, + {'key': 'resource_id', 'comparator': 'eq', + 'value': job_5['resource_id']}, + {'key': 'extra_id', 'comparator': 'eq', + 'value': job_5['extra_id']}] + self.assertEqual(0, len(db_api.list_jobs(self.context, + filters_job_5))) + + # successful case, delete a failed job + job_6 = db_api.new_job(self.context, + job['project_id'], job_type, + resource_id) + db_api.finish_job(self.context, job_6['id'], False, + timeutils.utcnow()) + job_6_failed = db_api.get_job(self.context, job_6['id']) + self.controller.delete(job_6['id']) + filters_job_6 = [ + {'key': 'type', 'comparator': 'eq', + 'value': job_6_failed['type']}, + {'key': 'status', 'comparator': 'eq', + 'value': job_6_failed['status']}, + {'key': 'resource_id', 'comparator': 'eq', + 'value': job_6_failed['resource_id']}, + {'key': 'extra_id', 'comparator': 'eq', + 'value': job_6_failed['extra_id']}] + self.assertEqual(0, len(db_api.list_jobs(self.context, + filters_job_6))) + + @patch.object(pecan, 'response', new=FakeResponse) + @patch.object(pecan, 'response', new=mock.Mock) + @patch.object(context, 'extract_context_from_environ') + def test_put(self, mock_context): + mock_context.return_value = self.context + + # cover all job types + for job_type in self.job_resource_map.keys(): + job = self._prepare_job_element(job_type) + + resource_id = '#'.join([job['resource'][resource_id] + for resource_type, resource_id + in self.job_resource_map[job_type]]) + + # failure case, only admin can redo the job + job_1 = db_api.new_job(self.context, + job['project_id'], + job_type, resource_id) + self.context.is_admin = False + res = self.controller.put(job_1['id']) + self._validate_error_code(res, 403) + + self.context.is_admin = True + db_api.delete_job(self.context, job_1['id']) + + # failure case, job not found + res = self.controller.put(-123) + self._validate_error_code(res, 404) + + # failure case, redo a running job + job_2 = db_api.register_job(self.context, + job['project_id'], + job_type, resource_id) + res = self.controller.put(job_2['id']) + self._validate_error_code(res, 400) + db_api.finish_job(self.context, job_2['id'], False, + timeutils.utcnow()) + db_api.delete_job(self.context, job_2['id']) + + # failure case, redo a successful job + job_3 = self._prepare_job_element(job_type) + + resource_id_3 = '#'.join([job_3['resource'][resource_id_3] + for resource_type_3, resource_id_3 + in self.job_resource_map[job_type]]) + + job_4 = db_api.new_job(self.context, + job_3['project_id'], + job_type, resource_id_3) + with self.context.session.begin(): + job_dict = {'status': constants.JS_Success, + 'timestamp': timeutils.utcnow(), + 'extra_id': uuidutils.generate_uuid()} + core.update_resource(self.context, models.AsyncJob, + job_4['id'], job_dict) + + res = self.controller.put(job_4['id']) + self._validate_error_code(res, 400) + db_api.finish_job(self.context, job_4['id'], True, + timeutils.utcnow()) + + # successful case, redo a failed job + job_5 = db_api.new_job(self.context, + job['project_id'], + job_type, resource_id) + db_api.finish_job(self.context, job_5['id'], False, + timeutils.utcnow()) + self.controller.put(job_5['id']) + + db_api.delete_job(self.context, job_5['id']) + + # successful case, redo a new job + job_6 = db_api.new_job(self.context, + job['project_id'], + job_type, resource_id) + self.controller.put(job_6['id']) + + db_api.delete_job(self.context, job_6['id']) + + def _prepare_job_element(self, job_type): + # in order to create a job, we need three elements: job type, + # job resource and project id. + job = {} + job['resource'] = {} + job['type'] = job_type + + for resource_type, resource_id in self.job_resource_map[job_type]: + job['resource'][resource_id] = uuidutils.generate_uuid() + + job['project_id'] = self._prepare_project_id_for_job(job) + + return job + + def _prepare_project_id_for_job(self, job): + # prepare the project id for job creation, currently job parameter + # contains job type and job resource information. + job_type = job['type'] + if job_type == constants.JT_SEG_RULE_SETUP: + project_id = job['resource']['project_id'] + else: + project_id = uuidutils.generate_uuid() + pod_id = uuidutils.generate_uuid() + + resource_type, resource_id = ( + constants.job_primary_resource_map[job_type]) + routing = db_api.create_resource_mapping( + self.context, job['resource'][resource_id], + job['resource'][resource_id], pod_id, project_id, + resource_type) + self.assertIsNotNone(routing) + + return project_id + + def _validate_error_code(self, res, code): + self.assertEqual(res[list(res.keys())[0]]['code'], code) + + def tearDown(self): + core.ModelBase.metadata.drop_all(core.get_engine()) diff --git a/tricircle/tests/unit/network/test_central_plugin.py b/tricircle/tests/unit/network/test_central_plugin.py index 3031ab69..ed1336c2 100644 --- a/tricircle/tests/unit/network/test_central_plugin.py +++ b/tricircle/tests/unit/network/test_central_plugin.py @@ -1034,7 +1034,7 @@ class FakeBaseXManager(xmanager.XManager): def __init__(self, fake_plugin): self.clients = {constants.TOP: client.Client()} self.job_handles = { - constants.JT_ROUTER: self.configure_extra_routes, + constants.JT_CONFIGURE_ROUTE: self.configure_route, constants.JT_ROUTER_SETUP: self.setup_bottom_router, constants.JT_PORT_DELETE: self.delete_server_port} self.helper = FakeHelper(fake_plugin) @@ -1070,15 +1070,15 @@ class FakeBaseRPCAPI(object): def __init__(self, fake_plugin): self.xmanager = FakeBaseXManager(fake_plugin) - def configure_extra_routes(self, ctxt, router_id): + def configure_route(self, ctxt, project_id, router_id): pass - def update_network(self, ctxt, network_id, pod_id): + def update_network(self, ctxt, project_id, network_id, pod_id): combine_id = '%s#%s' % (pod_id, network_id) self.xmanager.update_network( ctxt, payload={constants.JT_NETWORK_UPDATE: combine_id}) - def update_subnet(self, ctxt, subnet_id, pod_id): + def update_subnet(self, ctxt, project_id, subnet_id, pod_id): combine_id = '%s#%s' % (pod_id, subnet_id) self.xmanager.update_subnet( ctxt, payload={constants.JT_SUBNET_UPDATE: combine_id}) @@ -1086,7 +1086,7 @@ class FakeBaseRPCAPI(object): def configure_security_group_rules(self, ctxt, project_id): pass - def setup_shadow_ports(self, ctxt, pod_id, net_id): + def setup_shadow_ports(self, ctxt, project_id, pod_id, net_id): pass @@ -1094,18 +1094,18 @@ class FakeRPCAPI(FakeBaseRPCAPI): def __init__(self, fake_plugin): self.xmanager = FakeXManager(fake_plugin) - def setup_bottom_router(self, ctxt, net_id, router_id, pod_id): + def setup_bottom_router(self, ctxt, project_id, net_id, router_id, pod_id): combine_id = '%s#%s#%s' % (pod_id, router_id, net_id) self.xmanager.setup_bottom_router( ctxt, payload={constants.JT_ROUTER_SETUP: combine_id}) - def delete_server_port(self, ctxt, port_id, pod_id): + def delete_server_port(self, ctxt, project_id, port_id, pod_id): pass def configure_security_group_rules(self, ctxt, project_id): pass - def setup_shadow_ports(self, ctxt, pod_id, net_id): + def setup_shadow_ports(self, ctxt, project_id, pod_id, net_id): combine_id = '%s#%s' % (pod_id, net_id) self.xmanager.setup_shadow_ports( ctxt, payload={constants.JT_SHADOW_PORT_SETUP: combine_id}) @@ -1381,7 +1381,8 @@ class PluginTest(unittest.TestCase, port_id='top_id_0', ip_address='10.0.0.1', subnet_id='top_subnet_id', network_id='top_net_id')]}, - {'id': 'top_id_1', 'name': 'top'}, + {'id': 'top_id_1', 'name': 'top', + 'tenant_id': 'project_id'}, {'id': 'top_id_2', 'name': 'top'}, {'id': 'top_id_3', 'name': 'top'}]) BOTTOM1_PORTS.append({'id': 'bottom_id_1', 'name': 'bottom'}) @@ -1518,8 +1519,8 @@ class PluginTest(unittest.TestCase, plugin_calls = [mock.call(neutron_context, t_port_id1), mock.call(neutron_context, t_port_id2)] client_calls = [ - mock.call(tricircle_context, t_port_id1, 'pod_id_1'), - mock.call(tricircle_context, t_port_id2, 'pod_id_1')] + mock.call(tricircle_context, project_id, t_port_id1, 'pod_id_1'), + mock.call(tricircle_context, project_id, t_port_id2, 'pod_id_1')] mock_plugin_method.assert_has_calls(plugin_calls) mock_client_method.assert_has_calls(client_calls) @@ -1880,7 +1881,7 @@ class PluginTest(unittest.TestCase, 'tenant_id': tenant_id, 'mac_address': 'fa:16:3e:cd:76:40', 'binding:vif_type': vif_type, - 'project_id': 'tenant_id', + 'project_id': 'project_id', 'binding:host_id': 'zhiyuan-5', 'status': 'ACTIVE' } @@ -2488,7 +2489,7 @@ class PluginTest(unittest.TestCase, @patch.object(db_base_plugin_common.DbBasePluginCommon, '_make_subnet_dict', new=fake_make_subnet_dict) @patch.object(FakeClient, 'add_gateway_routers') - @patch.object(FakeBaseRPCAPI, 'configure_extra_routes') + @patch.object(FakeBaseRPCAPI, 'configure_route') @patch.object(context, 'get_context_from_neutron_context') def test_add_interface(self, mock_context, mock_rpc, mock_action): self._basic_pod_route_setup() @@ -2509,7 +2510,7 @@ class PluginTest(unittest.TestCase, _, b_router_id = db_api.get_bottom_mappings_by_top_id( t_ctx, t_router_id, constants.RT_ROUTER)[0] - mock_rpc.assert_called_once_with(t_ctx, t_router_id) + mock_rpc.assert_called_once_with(t_ctx, tenant_id, t_router_id) for b_net in BOTTOM1_NETS: if 'provider:segmentation_id' in b_net: self.assertIn(b_net['provider:segmentation_id'], (2000, 2001)) @@ -2715,7 +2716,7 @@ class PluginTest(unittest.TestCase, '_make_subnet_dict', new=fake_make_subnet_dict) @patch.object(FakePlugin, '_get_bridge_network_subnet') @patch.object(FakeClient, 'add_gateway_routers') - @patch.object(FakeBaseRPCAPI, 'configure_extra_routes') + @patch.object(FakeBaseRPCAPI, 'configure_route') @patch.object(context, 'get_context_from_neutron_context') def test_add_interface_for_local_router( self, mock_context, mock_rpc, mock_action, mock_get_bridge_net): @@ -2861,7 +2862,7 @@ class PluginTest(unittest.TestCase, '_allocate_ips_for_port', new=fake_allocate_ips_for_port) @patch.object(db_base_plugin_common.DbBasePluginCommon, '_make_subnet_dict', new=fake_make_subnet_dict) - @patch.object(FakeBaseRPCAPI, 'configure_extra_routes') + @patch.object(FakeBaseRPCAPI, 'configure_route') @patch.object(FakeClient, 'remove_interface_routers') @patch.object(context, 'get_context_from_neutron_context') def test_remove_interface(self, mock_context, mock_remove, mock_rpc): @@ -2891,7 +2892,7 @@ class PluginTest(unittest.TestCase, mock_remove.assert_called_with( t_ctx, b_router_id, {'port_id': b_interface_id}) - mock_rpc.assert_called_with(t_ctx, t_router_id) + mock_rpc.assert_called_with(t_ctx, tenant_id, t_router_id) def _prepare_interface_port(self, t_ctx, t_subnet_id, ip_suffix): t_client = FakeClient() @@ -2923,7 +2924,7 @@ class PluginTest(unittest.TestCase, @patch.object(l3_db.L3_NAT_dbonly_mixin, '_make_router_dict', new=fake_make_router_dict) @patch.object(FakeClient, 'add_gateway_routers') - @patch.object(FakeBaseRPCAPI, 'configure_extra_routes') + @patch.object(FakeBaseRPCAPI, 'configure_route') @patch.object(context, 'get_context_from_neutron_context') def test_east_west_gw_router(self, mock_context, mock_rpc, mock_action): self._basic_pod_route_setup() @@ -3844,7 +3845,8 @@ class PluginTest(unittest.TestCase, t_ctx, b_sd_port1['id'], {'port': { 'binding:profile': {constants.PROFILE_FORCE_UP: 'True'}}}) # asynchronous job in pod_1 is registered - mock_setup.assert_called_once_with(t_ctx, 'pod_id_1', t_net_id) + mock_setup.assert_called_once_with(t_ctx, TEST_TENANT_ID, + 'pod_id_1', t_net_id) @patch.object(directory, 'get_plugin', new=fake_get_plugin) @patch.object(driver.Pool, 'get_instance', new=fake_get_instance) @@ -3852,7 +3854,7 @@ class PluginTest(unittest.TestCase, '_allocate_ips_for_port', new=fake_allocate_ips_for_port) @patch.object(db_base_plugin_common.DbBasePluginCommon, '_make_subnet_dict', new=fake_make_subnet_dict) - @patch.object(FakeBaseRPCAPI, 'configure_extra_routes', new=mock.Mock) + @patch.object(FakeBaseRPCAPI, 'configure_route', new=mock.Mock) @patch.object(FakeBaseRPCAPI, 'setup_shadow_ports') @patch.object(context, 'get_context_from_neutron_context') def test_add_interface_trigger_l2pop(self, mock_context, mock_setup): @@ -3919,8 +3921,10 @@ class PluginTest(unittest.TestCase, self.assertIn(constants.PROFILE_FORCE_UP, shadow_ports[0]['binding:profile']) # asynchronous jobs are registered - calls = [mock.call(t_ctx, 'pod_id_2', shadow_ports[0]['network_id']), - mock.call(t_ctx, 'pod_id_1', shadow_ports[0]['network_id'])] + calls = [mock.call(t_ctx, tenant_id, 'pod_id_2', + shadow_ports[0]['network_id']), + mock.call(t_ctx, tenant_id, 'pod_id_1', + shadow_ports[0]['network_id'])] mock_setup.assert_has_calls(calls) def tearDown(self): diff --git a/tricircle/tests/unit/xjob/test_xmanager.py b/tricircle/tests/unit/xjob/test_xmanager.py index 8f72a8e2..661ebaac 100644 --- a/tricircle/tests/unit/xjob/test_xmanager.py +++ b/tricircle/tests/unit/xjob/test_xmanager.py @@ -367,12 +367,15 @@ class XManagerTest(unittest.TestCase): @patch.object(FakeClient, 'update_routers') def test_configure_extra_routes_with_floating_ips(self, mock_update): top_router_id = 'router_id' + project_id = uuidutils.generate_uuid() bridge_infos = self._prepare_east_west_network_test(top_router_id) ns_bridge_ip, ns_router_id = self._prepare_snat_test(top_router_id) self._prepare_dnat_test() - db_api.new_job(self.context, constants.JT_ROUTER, top_router_id) - self.xmanager.configure_extra_routes( - self.context, payload={constants.JT_ROUTER: top_router_id}) + db_api.new_job(self.context, project_id, constants.JT_CONFIGURE_ROUTE, + top_router_id) + self.xmanager.configure_route( + self.context, + payload={constants.JT_CONFIGURE_ROUTE: top_router_id}) calls = [] ns_routes = [] for i in range(2): @@ -394,11 +397,14 @@ class XManagerTest(unittest.TestCase): @patch.object(FakeClient, 'update_routers') def test_configure_extra_routes_with_external_network(self, mock_update): top_router_id = 'router_id' + project_id = uuidutils.generate_uuid() bridge_infos = self._prepare_east_west_network_test(top_router_id) ns_bridge_ip, ns_router_id = self._prepare_snat_test(top_router_id) - db_api.new_job(self.context, constants.JT_ROUTER, top_router_id) - self.xmanager.configure_extra_routes( - self.context, payload={constants.JT_ROUTER: top_router_id}) + db_api.new_job(self.context, project_id, constants.JT_CONFIGURE_ROUTE, + top_router_id) + self.xmanager.configure_route( + self.context, + payload={constants.JT_CONFIGURE_ROUTE: top_router_id}) calls = [] ns_routes = [] for i in range(2): @@ -418,12 +424,15 @@ class XManagerTest(unittest.TestCase): self._check_extra_routes_calls(calls, mock_update.call_args_list) @patch.object(FakeClient, 'update_routers') - def test_configure_extra_routes(self, mock_update): + def test_configure_route(self, mock_update): top_router_id = 'router_id' + project_id = uuidutils.generate_uuid() bridge_infos = self._prepare_east_west_network_test(top_router_id) - db_api.new_job(self.context, constants.JT_ROUTER, top_router_id) - self.xmanager.configure_extra_routes( - self.context, payload={constants.JT_ROUTER: top_router_id}) + db_api.new_job(self.context, project_id, constants.JT_CONFIGURE_ROUTE, + top_router_id) + self.xmanager.configure_route( + self.context, + payload={constants.JT_CONFIGURE_ROUTE: top_router_id}) calls = [] for i in range(2): routes = [] @@ -538,9 +547,12 @@ class XManagerTest(unittest.TestCase): # net3 is attached to R3 target_router_id = 'top_router_3_id' - db_api.new_job(self.context, constants.JT_ROUTER, target_router_id) - self.xmanager.configure_extra_routes( - self.context, payload={constants.JT_ROUTER: target_router_id}) + project_id = uuidutils.generate_uuid() + db_api.new_job(self.context, project_id, + constants.JT_CONFIGURE_ROUTE, target_router_id) + self.xmanager.configure_route( + self.context, + payload={constants.JT_CONFIGURE_ROUTE: target_router_id}) # for the following paths, packets will go to R3 via the interface # which is attached to R3 @@ -653,7 +665,8 @@ class XManagerTest(unittest.TestCase): RES_MAP['top']['subnet'].append(subnet_ipv6) RES_MAP['pod_1']['security_group'].append(sg) - db_api.new_job(self.context, constants.JT_SEG_RULE_SETUP, project_id) + db_api.new_job(self.context, project_id, constants.JT_SEG_RULE_SETUP, + project_id) self.xmanager.configure_security_group_rules( self.context, payload={constants.JT_SEG_RULE_SETUP: project_id}) @@ -727,8 +740,8 @@ class XManagerTest(unittest.TestCase): '192.168.1.102') resource_id = 'pod_id_1#' + net1_id - db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP, - resource_id) + db_api.new_job(self.context, project_id, + constants.JT_SHADOW_PORT_SETUP, resource_id) self.xmanager.setup_shadow_ports( self.context, payload={constants.JT_SHADOW_PORT_SETUP: resource_id}) @@ -745,7 +758,8 @@ class XManagerTest(unittest.TestCase): sd_ports[0]['binding:profile']) # check job to setup shadow ports for pod2 is registered - mock_setup.assert_called_once_with(self.context, 'pod_id_2', net1_id) + mock_setup.assert_called_once_with(self.context, project_id, + 'pod_id_2', net1_id) # update shadow port to down and test again, this is possible when we # succeed to create shadow port but fail to update it to active @@ -755,8 +769,8 @@ class XManagerTest(unittest.TestCase): {'port': {'status': q_constants.PORT_STATUS_DOWN, 'binding:profile': profile}}) - db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP, - resource_id) + db_api.new_job(self.context, project_id, + constants.JT_SHADOW_PORT_SETUP, resource_id) self.xmanager.setup_shadow_ports( self.context, payload={constants.JT_SHADOW_PORT_SETUP: resource_id}) @@ -767,8 +781,8 @@ class XManagerTest(unittest.TestCase): # manually trigger shadow ports setup in pod2 resource_id = 'pod_id_2#' + net1_id - db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP, - resource_id) + db_api.new_job(self.context, project_id, + constants.JT_SHADOW_PORT_SETUP, resource_id) self.xmanager.setup_shadow_ports( self.context, payload={constants.JT_SHADOW_PORT_SETUP: resource_id}) @@ -789,8 +803,9 @@ class XManagerTest(unittest.TestCase): pass fake_id = 'fake_id' + fake_project_id = uuidutils.generate_uuid() payload = {job_type: fake_id} - db_api.new_job(self.context, job_type, fake_id) + db_api.new_job(self.context, fake_project_id, job_type, fake_id) fake_handle(None, self.context, payload=payload) logs = core.query_resource(self.context, models.AsyncJobLog, [], []) @@ -806,8 +821,9 @@ class XManagerTest(unittest.TestCase): raise Exception() fake_id = 'fake_id' + fake_project_id = uuidutils.generate_uuid() payload = {job_type: fake_id} - db_api.new_job(self.context, job_type, fake_id) + db_api.new_job(self.context, fake_project_id, job_type, fake_id) fake_handle(None, self.context, payload=payload) jobs = core.query_resource(self.context, models.AsyncJob, [], []) @@ -828,8 +844,9 @@ class XManagerTest(unittest.TestCase): pass fake_id = uuidutils.generate_uuid() + fake_project_id = uuidutils.generate_uuid() payload = {job_type: fake_id} - db_api.new_job(self.context, job_type, fake_id) + db_api.new_job(self.context, fake_project_id, job_type, fake_id) expired_job = { 'id': uuidutils.generate_uuid(), 'type': job_type, @@ -860,8 +877,9 @@ class XManagerTest(unittest.TestCase): mock_get.return_value = None fake_id = uuidutils.generate_uuid() + fake_project_id = uuidutils.generate_uuid() payload = {job_type: fake_id} - db_api.new_job(self.context, job_type, fake_id) + db_api.new_job(self.context, fake_project_id, job_type, fake_id) fake_handle(None, self.context, payload=payload) # nothing to assert, what we test is that fake_handle can exit when @@ -872,28 +890,28 @@ class XManagerTest(unittest.TestCase): mock_now.return_value = datetime.datetime(2000, 1, 2, 12, 0, 0) job_dict_list = [ {'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0), - 'resource_id': 'uuid1', 'type': 'res1', + 'resource_id': 'uuid1', 'type': 'res1', 'project_id': "uuid1", 'status': constants.JS_Fail}, # job_uuid1 {'timestamp': datetime.datetime(2000, 1, 1, 12, 5, 0), - 'resource_id': 'uuid1', 'type': 'res1', + 'resource_id': 'uuid1', 'type': 'res1', 'project_id': "uuid1", 'status': constants.JS_Fail}, # job_uuid3 {'timestamp': datetime.datetime(2000, 1, 1, 12, 20, 0), - 'resource_id': 'uuid2', 'type': 'res2', + 'resource_id': 'uuid2', 'type': 'res2', 'project_id': "uuid1", 'status': constants.JS_Fail}, # job_uuid5 {'timestamp': datetime.datetime(2000, 1, 1, 12, 15, 0), - 'resource_id': 'uuid2', 'type': 'res2', + 'resource_id': 'uuid2', 'type': 'res2', 'project_id': "uuid1", 'status': constants.JS_Fail}, # job_uuid7 {'timestamp': datetime.datetime(2000, 1, 1, 12, 25, 0), - 'resource_id': 'uuid3', 'type': 'res3', + 'resource_id': 'uuid3', 'type': 'res3', 'project_id': "uuid1", 'status': constants.JS_Success}, # job_uuid9 {'timestamp': datetime.datetime(2000, 1, 1, 12, 30, 0), - 'resource_id': 'uuid4', 'type': 'res4', + 'resource_id': 'uuid4', 'type': 'res4', 'project_id': "uuid1", 'status': constants.JS_New}, # job_uuid11 {'timestamp': datetime.datetime(1999, 12, 31, 12, 0, 0), - 'resource_id': 'uuid5', 'type': 'res5', + 'resource_id': 'uuid5', 'type': 'res5', 'project_id': "uuid1", 'status': constants.JS_Fail}, # job_uuid13 {'timestamp': datetime.datetime(1999, 12, 31, 11, 59, 59), - 'resource_id': 'uuid6', 'type': 'res6', + 'resource_id': 'uuid6', 'type': 'res6', 'project_id': "uuid1", 'status': constants.JS_Fail}] # job_uuid15 for i, job_dict in enumerate(job_dict_list, 1): job_dict['id'] = 'job_uuid%d' % (2 * i - 1) @@ -907,10 +925,11 @@ class XManagerTest(unittest.TestCase): # for res3 + uuid3, the latest job's status is "Success", not returned # for res6 + uuid6, the latest job is out of the redo time span expected_failed_jobs = [ - {'resource_id': 'uuid1', 'type': 'res1'}, - {'resource_id': 'uuid2', 'type': 'res2'}, - {'resource_id': 'uuid5', 'type': 'res5'}] - expected_new_jobs = [{'resource_id': 'uuid4', 'type': 'res4'}] + {'resource_id': 'uuid1', 'type': 'res1', 'project_id': "uuid1"}, + {'resource_id': 'uuid2', 'type': 'res2', 'project_id': "uuid1"}, + {'resource_id': 'uuid5', 'type': 'res5', 'project_id': "uuid1"}] + expected_new_jobs = [{'resource_id': 'uuid4', 'type': 'res4', + 'project_id': "uuid1"}] (failed_jobs, new_jobs) = db_api.get_latest_failed_or_new_jobs(self.context) six.assertCountEqual(self, expected_failed_jobs, failed_jobs) diff --git a/tricircle/xjob/xmanager.py b/tricircle/xjob/xmanager.py index 841e4d10..62c06e1d 100644 --- a/tricircle/xjob/xmanager.py +++ b/tricircle/xjob/xmanager.py @@ -67,15 +67,16 @@ def _job_handle(job_type): if delta.seconds >= CONF.worker_handle_timeout: # quit when this handle is running for a long time break - time_new = db_api.get_latest_timestamp(ctx, constants.JS_New, - job_type, resource_id) - if not time_new: + job_new = db_api.get_latest_job( + ctx, constants.JS_New, job_type, resource_id) + if not job_new: break - time_success = db_api.get_latest_timestamp( + job_succ = db_api.get_latest_job( ctx, constants.JS_Success, job_type, resource_id) - if time_success and time_success >= time_new: + if job_succ and job_succ['timestamp'] >= job_new['timestamp']: break - job = db_api.register_job(ctx, job_type, resource_id) + job = db_api.register_job(ctx, job_new['project_id'], job_type, + resource_id) if not job: # fail to obtain the lock, let other worker handle the job running_job = db_api.get_running_job(ctx, job_type, @@ -94,7 +95,7 @@ def _job_handle(job_type): # previous running job expires, we set its status to # fail and try again to obtain the lock db_api.finish_job(ctx, running_job['id'], False, - time_new) + job_new['timestamp']) LOG.warning('Job %(job)s of type %(job_type)s for ' 'resource %(resource)s expires, set ' 'its state to Fail', @@ -111,14 +112,15 @@ def _job_handle(job_type): try: func(*args, **kwargs) except Exception: - db_api.finish_job(ctx, job['id'], False, time_new) + db_api.finish_job(ctx, job['id'], False, + job_new['timestamp']) LOG.error('Job %(job)s of type %(job_type)s for ' 'resource %(resource)s fails', {'job': job['id'], 'job_type': job_type, 'resource': resource_id}) break - db_api.finish_job(ctx, job['id'], True, time_new) + db_api.finish_job(ctx, job['id'], True, job_new['timestamp']) eventlet.sleep(CONF.worker_sleep_time) return handle_args return handle_func @@ -145,7 +147,7 @@ class XManager(PeriodicTasks): self.additional_endpoints = [] self.clients = {constants.TOP: client.Client()} self.job_handles = { - constants.JT_ROUTER: self.configure_extra_routes, + constants.JT_CONFIGURE_ROUTE: self.configure_route, constants.JT_ROUTER_SETUP: self.setup_bottom_router, constants.JT_PORT_DELETE: self.delete_server_port, constants.JT_SEG_RULE_SETUP: self.configure_security_group_rules, @@ -251,13 +253,14 @@ class XManager(PeriodicTasks): job_index = random.randint(0, len(jobs) - 1) job_type = jobs[job_index]['type'] resource_id = jobs[job_index]['resource_id'] + project_id = jobs[job_index]['project_id'] payload = {job_type: resource_id} LOG.debug('Redo %(status)s job for %(resource_id)s of type ' '%(job_type)s', {'status': 'new' if is_new_job else 'failed', 'resource_id': resource_id, 'job_type': job_type}) if not is_new_job: - db_api.new_job(ctx, job_type, resource_id) + db_api.new_job(ctx, project_id, job_type, resource_id) self.job_handles[job_type](ctx, payload=payload) @staticmethod @@ -512,6 +515,15 @@ class XManager(PeriodicTasks): (b_pod_id, t_router_id, t_net_id) = payload[constants.JT_ROUTER_SETUP].split('#') + t_client = self._get_client() + t_pod = db_api.get_top_pod(ctx) + t_router = t_client.get_routers(ctx, t_router_id) + + if not t_router: + # we just end this job if top router no longer exists + return + + project_id = t_router['tenant_id'] if b_pod_id == constants.POD_NOT_SPECIFIED: mappings = db_api.get_bottom_mappings_by_top_id( ctx, t_net_id, constants.RT_NETWORK) @@ -520,15 +532,9 @@ class XManager(PeriodicTasks): # NOTE(zhiyuan) we create one job for each pod to avoid # conflict caused by different workers operating the same pod self.xjob_handler.setup_bottom_router( - ctx, t_net_id, t_router_id, b_pod['pod_id']) + ctx, project_id, t_net_id, t_router_id, b_pod['pod_id']) return - t_client = self._get_client() - t_pod = db_api.get_top_pod(ctx) - t_router = t_client.get_routers(ctx, t_router_id) - if not t_router: - # we just end this job if top router no longer exists - return t_net = t_client.get_networks(ctx, t_net_id) if not t_net: # we just end this job if top network no longer exists @@ -566,11 +572,12 @@ class XManager(PeriodicTasks): ctx, t_pod, b_pod, t_client, t_net, t_router, t_bridge_net, t_bridge_subnet, is_ext_net_pod) if not is_local_router: - self.xjob_handler.configure_extra_routes(ctx, t_router_id) + self.xjob_handler.configure_route(ctx, project_id, + t_router_id) - @_job_handle(constants.JT_ROUTER) - def configure_extra_routes(self, ctx, payload): - t_router_id = payload[constants.JT_ROUTER] + @_job_handle(constants.JT_CONFIGURE_ROUTE) + def configure_route(self, ctx, payload): + t_router_id = payload[constants.JT_CONFIGURE_ROUTE] t_client = self._get_client() t_router = t_client.get_routers(ctx, t_router_id) if not t_router: @@ -867,19 +874,22 @@ class XManager(PeriodicTasks): """ (b_pod_id, t_network_id) = payload[ constants.JT_NETWORK_UPDATE].split('#') - if b_pod_id == constants.POD_NOT_SPECIFIED: - mappings = db_api.get_bottom_mappings_by_top_id( - ctx, t_network_id, constants.RT_NETWORK) - b_pods = [mapping[0] for mapping in mappings] - for b_pod in b_pods: - self.xjob_handler.update_network(ctx, t_network_id, - b_pod['pod_id']) - return t_client = self._get_client() t_network = t_client.get_networks(ctx, t_network_id) if not t_network: return + + project_id = t_network['tenant_id'] + if b_pod_id == constants.POD_NOT_SPECIFIED: + mappings = db_api.get_bottom_mappings_by_top_id( + ctx, t_network_id, constants.RT_NETWORK) + b_pods = [mapping[0] for mapping in mappings] + for b_pod in b_pods: + self.xjob_handler.update_network(ctx, project_id, + t_network_id, b_pod['pod_id']) + return + b_pod = db_api.get_pod(ctx, b_pod_id) b_region_name = b_pod['region_name'] b_client = self._get_client(region_name=b_region_name) @@ -918,19 +928,22 @@ class XManager(PeriodicTasks): """ (b_pod_id, t_subnet_id) = payload[ constants.JT_SUBNET_UPDATE].split('#') - if b_pod_id == constants.POD_NOT_SPECIFIED: - mappings = db_api.get_bottom_mappings_by_top_id( - ctx, t_subnet_id, constants.RT_SUBNET) - b_pods = [mapping[0] for mapping in mappings] - for b_pod in b_pods: - self.xjob_handler.update_subnet(ctx, t_subnet_id, - b_pod['pod_id']) - return t_client = self._get_client() t_subnet = t_client.get_subnets(ctx, t_subnet_id) if not t_subnet: return + + project_id = t_subnet['tenant_id'] + if b_pod_id == constants.POD_NOT_SPECIFIED: + mappings = db_api.get_bottom_mappings_by_top_id( + ctx, t_subnet_id, constants.RT_SUBNET) + b_pods = [mapping[0] for mapping in mappings] + for b_pod in b_pods: + self.xjob_handler.update_subnet(ctx, project_id, + t_subnet_id, b_pod['pod_id']) + return + b_pod = db_api.get_pod(ctx, b_pod_id) b_region_name = b_pod['region_name'] b_subnet_id = db_api.get_bottom_id_by_top_id_region_name( @@ -1097,4 +1110,5 @@ class XManager(PeriodicTasks): ctx, sw_port_id, update_body) for pod_id in sync_pod_list: - self.xjob_handler.setup_shadow_ports(ctx, pod_id, t_net_id) + self.xjob_handler.setup_shadow_ports(ctx, project_id, + pod_id, t_net_id)