Implement asynchronous job Admin API

1. What is the problem
When XJob receives a job message from service, it will register
the job in database and handle it asynchronously. Tricircle
needs to provide API for admin to query the job status and trigger
failed job if something happens unexpectedly. The detailed work
for XJob Admin APIs is covered in the document[1].

2. What is the solution for the problem
We implement the XJob management APIs listed as follows:
 *(1) create a job
 *(2) list single job info
 *(3) list all jobs
 *(4) list jobs with filters
 *(5) list all jobs' schemas
 *(6) delete a job
 *(7) redo a job

3. What features need to be implemented in Tricircle to
realize the solution
Implement above job operations.

[1] https://review.openstack.org/#/c/438304/

Change-Id: Ibd90e539c9360a0ad7a01eeef185c0dbbee9bb4e
This commit is contained in:
southeast02 2017-03-15 10:21:12 +08:00 committed by Dongfeng Huang
parent 0d83bdca0a
commit 5fe9c5b444
16 changed files with 2227 additions and 153 deletions

View File

@ -59,10 +59,77 @@ listed as following:
Normal Response Code: 202
* Get job(s)
* Get a job
Retrieve a job from the Tricircle database.
The detailed information of the job will be shown. Otherwise
a "Resource not found" exception will be returned.
List Request::
GET /v1.0/jobs/3f4ecf30-0213-4f1f-9cb0-0233bcedb767
Response:
{
"job": {
"id": "3f4ecf30-0213-4f1f-9cb0-0233bcedb767",
"project_id": "d01246bc5792477d9062a76332b7514a",
"type": "port_delete",
"timestamp": "2017-03-03 11:05:36",
"status": "NEW",
"resource": {
"pod_id": "0eb59465-5132-4f57-af01-a9e306158b86",
"port_id": "8498b903-9e18-4265-8d62-3c12e0ce4314"
}
}
}
Normal Response Code: 200
* Get all jobs
Retrieve all of the jobs from the Tricircle database.
List Request::
GET /v1.0/jobs/detail
Response:
{
"jobs":
[
{
"id": "3f4ecf30-0213-4f1f-9cb0-0233bcedb767",
"project_id": "d01246bc5792477d9062a76332b7514a",
"type": "port_delete",
"timestamp": "2017-03-03 11:05:36",
"status": "NEW",
"resource": {
"pod_id": "0eb59465-5132-4f57-af01-a9e306158b86",
"port_id": "8498b903-9e18-4265-8d62-3c12e0ce4314"
}
},
{
"id": "b01fe514-5211-4758-bbd1-9f32141a7ac2",
"project_id": "d01246bc5792477d9062a76332b7514a",
"type": "seg_rule_setup",
"timestamp": "2017-03-01 17:14:44",
"status": "FAIL",
"resource": {
"project_id": "d01246bc5792477d9062a76332b7514a"
}
}
]
}
Normal Response Code: 200
* Get all jobs with filter(s)
Retrieve job(s) from the Tricircle database. We can filter them by
project ID, job type and job status.
project ID, job type and job status. If no filter is provided,
GET /v1.0/jobs will return all jobs.
The response contains a list of jobs. Using filters, a subset of jobs
will be returned.
@ -117,7 +184,7 @@ listed as following:
"schemas":
[
{
"type": "router",
"type": "configure_route",
"resource": ["router_id"]
},
{
@ -153,7 +220,7 @@ listed as following:
* Delete a job
Delete a failed or duplicated job from the Tricircle database.
Nothing will be returned for this request if succeeds, otherwise an
A pair of curly braces will be returned if it succeeds, otherwise an
exception will be thrown. What's more, we can list all jobs to verify
whether it is deleted successfully or not.

View File

@ -0,0 +1,371 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pecan
from pecan import expose
from pecan import rest
import six
from oslo_log import log as logging
from oslo_utils import timeutils
from tricircle.common import constants
import tricircle.common.context as t_context
import tricircle.common.exceptions as t_exc
from tricircle.common.i18n import _
from tricircle.common import policy
from tricircle.common import utils
from tricircle.common import xrpcapi
from tricircle.db import api as db_api
LOG = logging.getLogger(__name__)
class AsyncJobController(rest.RestController):
    # with AsyncJobController, admin can create, show, delete and
    # redo asynchronous jobs

    def __init__(self):
        # RPC client used to register jobs in the database and push them
        # to the XJob daemon for asynchronous execution.
        self.xjob_handler = xrpcapi.XJobAPI()

    @expose(generic=True, template='json')
    def post(self, **kw):
        """Create an asynchronous job and put it into execution at once.

        :param kw: request body; must contain a 'job' dict with 'type',
            'project_id' and 'resource' keys
        :return: {'job': <readable job dict>} on success, otherwise a
            formatted API error (400/403/500)
        """
        context = t_context.extract_context_from_environ()
        job_resource_map = constants.job_resource_map

        if not policy.enforce(context, policy.ADMIN_API_JOB_CREATE):
            return utils.format_api_error(
                403, _("Unauthorized to create a job"))

        if 'job' not in kw:
            return utils.format_api_error(
                400, _("Request body not found"))

        job = kw['job']

        # mandatory fields must be present and non-blank
        for field in ('type', 'project_id'):
            value = job.get(field)
            if value is None:
                return utils.format_api_error(
                    400, _("%(field)s isn't provided in request body") % {
                        'field': field})
            elif len(value.strip()) == 0:
                return utils.format_api_error(
                    400, _("%(field)s can't be empty") % {'field': field})

        if job['type'] not in job_resource_map.keys():
            return utils.format_api_error(
                400, _('There is no such job type: %(job_type)s') % {
                    'job_type': job['type']})

        job_type = job['type']
        project_id = job['project_id']

        if 'resource' not in job:
            return utils.format_api_error(
                400, _('Failed to create job, because the resource is not'
                       ' specified'))

        # verify that all given resources are exactly needed
        request_fields = set(job['resource'].keys())
        require_fields = set([resource_id
                              for resource_type, resource_id in
                              job_resource_map[job_type]])
        missing_fields = require_fields - request_fields
        redundant_fields = request_fields - require_fields

        if missing_fields:
            return utils.format_api_error(
                400, _('Some required fields are not specified:'
                       ' %(field)s') % {'field': missing_fields})

        if redundant_fields:
            return utils.format_api_error(
                400, _('Some fields are redundant: %(field)s') % {
                    'field': redundant_fields})

        # validate whether the project id is legal by checking that the
        # job's primary resource is routed under that project
        resource_type_1, resource_id_1 = (
            constants.job_primary_resource_map[job_type])
        if resource_type_1 is not None:
            filter = [{'key': 'project_id', 'comparator': 'eq',
                       'value': project_id},
                      {'key': 'resource_type', 'comparator': 'eq',
                       'value': resource_type_1},
                      {'key': 'top_id', 'comparator': 'eq',
                       'value': job['resource'][resource_id_1]}]
            routings = db_api.list_resource_routings(context, filter)
            if not routings:
                msg = (_("%(resource)s %(resource_id)s doesn't belong to the"
                         " project %(project_id)s") %
                       {'resource': resource_type_1,
                        'resource_id': job['resource'][resource_id_1],
                        'project_id': project_id})
                return utils.format_api_error(400, msg)

        # if job_type = seg_rule_setup, we should ensure the project id
        # is equal to the one from resource.
        if job_type == constants.JT_SEG_RULE_SETUP:
            if job['project_id'] != job['resource']['project_id']:
                msg = (_("Specified project_id %(project_id_1)s and resource's"
                         " project_id %(project_id_2)s are different") %
                       {'project_id_1': job['project_id'],
                        'project_id_2': job['resource']['project_id']})
                return utils.format_api_error(400, msg)

        # combine uuid into target resource id; the order follows
        # job_resource_map[job_type], '#' is the separator
        resource_id = '#'.join([job['resource'][resource_id]
                                for resource_type, resource_id
                                in job_resource_map[job_type]])

        try:
            # create a job and put it into execution immediately
            self.xjob_handler.invoke_method(context, project_id,
                                            constants.job_handles[job_type],
                                            job_type, resource_id)
        except Exception as e:
            LOG.exception('Failed to create job: '
                          '%(exception)s ', {'exception': e})
            return utils.format_api_error(
                500, _('Failed to create a job'))

        new_job = db_api.get_latest_job(context, constants.JS_New, job_type,
                                        resource_id)
        return {'job': self._get_more_readable_job(new_job)}

    @expose(generic=True, template='json')
    def get_one(self, id, **kwargs):
        """the return value may vary according to the value of id

        :param id: 1) if id = 'schemas', return job schemas
                   2) if id = 'detail', return all jobs
                   3) if id = $job_id, return detailed single job info
        :return: return value is decided by id parameter
        """
        context = t_context.extract_context_from_environ()
        job_resource_map = constants.job_resource_map

        # NOTE(review): the schema-list policy target guards every branch
        # here, including single-job show -- confirm this is intended.
        if not policy.enforce(context, policy.ADMIN_API_JOB_SCHEMA_LIST):
            return utils.format_api_error(
                403, _('Unauthorized to show job information'))

        if id == 'schemas':
            # build one {'type', 'resource'} entry per known job type
            job_schemas = []
            for job_type in job_resource_map.keys():
                job = {}
                resource = []
                for resource_type, resource_id in job_resource_map[job_type]:
                    resource.append(resource_id)
                job['resource'] = resource
                job['type'] = job_type
                job_schemas.append(job)
            return {'schemas': job_schemas}

        if id == 'detail':
            return self.get_all(**kwargs)

        # look the job up in the active job table first, then fall back to
        # the job log table (which holds finished successful jobs)
        try:
            job = db_api.get_job(context, id)
            return {'job': self._get_more_readable_job(job)}
        except Exception:
            try:
                job = db_api.get_job_from_log(context, id)
                return {'job': self._get_more_readable_job(job)}
            except t_exc.ResourceNotFound:
                return utils.format_api_error(
                    404, _('Resource not found'))

    @expose(generic=True, template='json')
    def get_all(self, **kwargs):
        """Get all the jobs. Using filters, only get a subset of jobs.

        :param kwargs: job filters
        :return: a list of jobs
        """
        context = t_context.extract_context_from_environ()

        if not policy.enforce(context, policy.ADMIN_API_JOB_LIST):
            return utils.format_api_error(
                403, _('Unauthorized to show all jobs'))

        is_valid_filter, filters = self._get_filters(kwargs)

        if not is_valid_filter:
            msg = (_('Unsupported filter type: %(filter)s') %
                   {'filter': [filter_name for filter_name in filters]})
            return utils.format_api_error(400, msg)

        # translate the {name: value} filter dict into the
        # [{'key', 'comparator', 'value'}] form the db layer expects
        filters = [{'key': key,
                    'comparator': 'eq',
                    'value': value} for key, value in six.iteritems(filters)]

        try:
            # merge active jobs with successfully finished jobs from the log
            jobs_in_job_table = db_api.list_jobs(context, filters)
            jobs_in_job_log_table = db_api.list_jobs_from_log(context, filters)
            jobs = jobs_in_job_table + jobs_in_job_log_table
            return {'jobs': [self._get_more_readable_job(job) for job in jobs]}
        except Exception as e:
            LOG.exception('Failed to show all asynchronous jobs: '
                          '%(exception)s ', {'exception': e})
            return utils.format_api_error(
                500, _('Failed to show all asynchronous jobs'))

    # make the job status and resource id more human readable. Split
    # resource id into several member uuid(s) to provide more detailed resource
    # information. If job entry is from job table, then remove resource id
    # and extra id from job attributes. If job entry is from job log table,
    # only remove resource id from job attributes.
    def _get_more_readable_job(self, job):
        job_resource_map = constants.job_resource_map

        # job log entries carry no 'status' attribute; they are all
        # successful by definition
        if 'status' in job:
            job['status'] = constants.job_status_map[job['status']]
        else:
            job['status'] = constants.job_status_map[constants.JS_Success]

        # split the '#'-joined resource_id back into named members, in the
        # same order job_resource_map defines them
        job['resource'] = dict(zip([resource_id
                                    for resource_type, resource_id
                                    in job_resource_map[job['type']]],
                                   job['resource_id'].split('#')))
        job.pop('resource_id')

        if "extra_id" in job:
            job.pop('extra_id')

        return job

    def _get_filters(self, params):
        """Return a dictionary of query param filters from the request.

        :param params: the URI params coming from the wsgi layer
        :return (flag, filters), flag indicates whether the filters are valid,
        and the filters denote a list of key-value pairs.
        """
        filters = {}
        unsupported_filters = {}
        for filter_name in params:
            if filter_name in constants.JOB_LIST_SUPPORTED_FILTERS:
                # map filter name
                if filter_name == 'status':
                    job_status_in_db = self._get_job_status_in_db(
                        params.get(filter_name))
                    filters[filter_name] = job_status_in_db
                    continue
                filters[filter_name] = params.get(filter_name)
            else:
                unsupported_filters[filter_name] = params.get(filter_name)
        if unsupported_filters:
            return False, unsupported_filters
        return True, filters

    # map user input job status to job status stored in database
    def _get_job_status_in_db(self, job_status):
        job_status_map = {
            'fail': constants.JS_Fail,
            'success': constants.JS_Success,
            'running': constants.JS_Running,
            'new': constants.JS_New
        }
        # unrecognized values are passed through unchanged; the db query
        # will simply match nothing for them
        job_status_lower = job_status.lower()
        if job_status_lower in job_status_map:
            return job_status_map[job_status_lower]
        return job_status

    @expose(generic=True, template='json')
    def delete(self, job_id):
        # delete a job from the database. If the job is running, the delete
        # operation will fail. In other cases, job will be deleted directly.
        context = t_context.extract_context_from_environ()

        if not policy.enforce(context, policy.ADMIN_API_JOB_DELETE):
            return utils.format_api_error(
                403, _('Unauthorized to delete a job'))

        # a job already moved to the log table cannot be deleted through
        # this API
        try:
            db_api.get_job_from_log(context, job_id)
            return utils.format_api_error(
                400, _('Job %(job_id)s is from job log') % {'job_id': job_id})
        except Exception:
            try:
                job = db_api.get_job(context, job_id)
            except t_exc.ResourceNotFound:
                return utils.format_api_error(
                    404, _('Job %(job_id)s not found') % {'job_id': job_id})

        try:
            # if job status = RUNNING, notify user this new one, delete
            # operation fails.
            if job['status'] == constants.JS_Running:
                return utils.format_api_error(
                    400, (_('Failed to delete the running job %(job_id)s') %
                          {"job_id": job_id}))
            # if job status = SUCCESS, move the job entry to job log table,
            # then delete it from job table.
            elif job['status'] == constants.JS_Success:
                db_api.finish_job(context, job_id, True, timeutils.utcnow())
                pecan.response.status = 200
                return {}

            db_api.delete_job(context, job_id)
            pecan.response.status = 200
            return {}
        except Exception as e:
            LOG.exception('Failed to delete the job: '
                          '%(exception)s ', {'exception': e})
            return utils.format_api_error(
                500, _('Failed to delete the job'))

    @expose(generic=True, template='json')
    def put(self, job_id):
        # we use HTTP/HTTPS PUT method to redo a job. Regularly PUT method
        # requires a request body, but considering the job redo operation
        # doesn't need more information other than job id, we will issue
        # this request without a request body.
        context = t_context.extract_context_from_environ()

        if not policy.enforce(context, policy.ADMIN_API_JOB_REDO):
            return utils.format_api_error(
                403, _('Unauthorized to redo a job'))

        # a job already moved to the log table cannot be redone
        try:
            db_api.get_job_from_log(context, job_id)
            return utils.format_api_error(
                400, _('Job %(job_id)s is from job log') % {'job_id': job_id})
        except Exception:
            try:
                job = db_api.get_job(context, job_id)
            except t_exc.ResourceNotFound:
                return utils.format_api_error(
                    404, _('Job %(job_id)s not found') % {'job_id': job_id})

        try:
            # if status = RUNNING, notify user this new one and then exit
            if job['status'] == constants.JS_Running:
                return utils.format_api_error(
                    400, (_("Can't redo job %(job_id)s which is running") %
                          {'job_id': job['id']}))
            # if status = SUCCESS, notify user this new one and then exit
            elif job['status'] == constants.JS_Success:
                msg = (_("Can't redo job %(job_id)s which had run successfully"
                         ) % {'job_id': job['id']})
                return utils.format_api_error(400, msg)
            # if job status = FAIL or job status = NEW, redo it immediately
            # NOTE(review): no value is returned on the success path, so the
            # response body will be empty -- confirm this is intended.
            self.xjob_handler.invoke_method(context, job['project_id'],
                                            constants.job_handles[job['type']],
                                            job['type'], job['resource_id'])
        except Exception as e:
            LOG.exception('Failed to redo the job: '
                          '%(exception)s ', {'exception': e})
            return utils.format_api_error(
                500, _('Failed to redo the job'))

View File

@ -17,6 +17,7 @@
import pecan
from pecan import request
from tricircle.api.controllers import job
from tricircle.api.controllers import pod
from tricircle.api.controllers import routing
import tricircle.common.context as t_context
@ -74,7 +75,8 @@ class V1Controller(object):
self.sub_controllers = {
"pods": pod.PodsController(),
"routings": routing.RoutingController()
"routings": routing.RoutingController(),
"jobs": job.AsyncJobController()
}
for name, ctrl in self.sub_controllers.items():

View File

@ -90,7 +90,7 @@ PROFILE_FORCE_UP = 'force_up'
DEVICE_OWNER_SHADOW = 'compute:shadow'
# job type
JT_ROUTER = 'router'
JT_CONFIGURE_ROUTE = 'configure_route'
JT_ROUTER_SETUP = 'router_setup'
JT_PORT_DELETE = 'port_delete'
JT_SEG_RULE_SETUP = 'seg_rule_setup'
@ -108,3 +108,58 @@ NT_FLAT = 'flat'
NM_P2P = 'p2p'
NM_L2GW = 'l2gw'
NM_NOOP = 'noop'
# map job type to its resource, each resource is denoted by
# (resource_type, resource_id), for the field necessary
# to run the job but resides outside of job resource, we
# denote its type by "None"
# NOTE: the order of the (resource_type, resource_id) tuples matters --
# the admin API joins the resource uuids with '#' in this exact order to
# build the job's combined resource_id.
job_resource_map = {
    JT_CONFIGURE_ROUTE: [(RT_ROUTER, "router_id")],
    JT_ROUTER_SETUP: [(None, "pod_id"),
                      (RT_ROUTER, "router_id"),
                      (RT_NETWORK, "network_id")],
    JT_PORT_DELETE: [(None, "pod_id"),
                     (RT_PORT, "port_id")],
    JT_SEG_RULE_SETUP: [(None, "project_id")],
    JT_NETWORK_UPDATE: [(None, "pod_id"),
                        (RT_NETWORK, "network_id")],
    JT_SUBNET_UPDATE: [(None, "pod_id"),
                       (RT_SUBNET, "subnet_id")],
    JT_SHADOW_PORT_SETUP: [(None, "pod_id"),
                           (RT_NETWORK, "network_id")]
}

# map raw job status to more human readable job status
job_status_map = {
    JS_Fail: 'FAIL',
    JS_Success: 'SUCCESS',
    JS_Running: 'RUNNING',
    JS_New: 'NEW'
}

# filter jobs according to the job's attributes
JOB_LIST_SUPPORTED_FILTERS = ['project_id', 'type', 'status']

# map job type to corresponding job handler (the XJob RPC method name)
job_handles = {
    JT_CONFIGURE_ROUTE: "configure_route",
    JT_ROUTER_SETUP: "setup_bottom_router",
    JT_PORT_DELETE: "delete_server_port",
    JT_SEG_RULE_SETUP: "configure_security_group_rules",
    JT_NETWORK_UPDATE: "update_network",
    JT_SUBNET_UPDATE: "update_subnet",
    JT_SHADOW_PORT_SETUP: "setup_shadow_ports"
}

# map job type to its primary resource and then we only validate the project_id
# of that resource. For JT_SEG_RULE_SETUP, as it has only one project_id
# parameter, there is no need to validate it.
job_primary_resource_map = {
    JT_CONFIGURE_ROUTE: (RT_ROUTER, "router_id"),
    JT_ROUTER_SETUP: (RT_ROUTER, "router_id"),
    JT_PORT_DELETE: (RT_PORT, "port_id"),
    JT_SEG_RULE_SETUP: (None, "project_id"),
    JT_NETWORK_UPDATE: (RT_NETWORK, "network_id"),
    JT_SUBNET_UPDATE: (RT_SUBNET, "subnet_id"),
    JT_SHADOW_PORT_SETUP: (RT_NETWORK, "network_id")
}

View File

@ -57,6 +57,13 @@ ADMIN_API_ROUTINGS_PUT = 'admin_api:routings:put'
ADMIN_API_ROUTINGS_SHOW = 'admin_api:routings:show'
ADMIN_API_ROUTINGS_LIST = 'admin_api:routings:list'
# policy targets guarding the asynchronous job admin API endpoints
ADMIN_API_JOB_CREATE = 'admin_api:jobs:create'
ADMIN_API_JOB_LIST = 'admin_api:jobs:list'
ADMIN_API_JOB_SCHEMA_LIST = 'admin_api:jobs:schema_list'
ADMIN_API_JOB_REDO = 'admin_api:jobs:redo'
ADMIN_API_JOB_DELETE = 'admin_api:jobs:delete'
tricircle_admin_api_policies = [
policy.RuleDefault(ADMIN_API_PODS_CREATE,
'rule:admin_api',
@ -86,6 +93,22 @@ tricircle_admin_api_policies = [
policy.RuleDefault(ADMIN_API_ROUTINGS_LIST,
'rule:admin_api',
description='List resource routings'),
policy.RuleDefault(ADMIN_API_JOB_CREATE,
'rule:admin_api',
description='Create job'),
policy.RuleDefault(ADMIN_API_JOB_LIST,
'rule:admin_api',
description='List jobs'),
policy.RuleDefault(ADMIN_API_JOB_SCHEMA_LIST,
'rule:admin_api',
description='List job schemas'),
policy.RuleDefault(ADMIN_API_JOB_REDO,
'rule:admin_api',
description='Redo job'),
policy.RuleDefault(ADMIN_API_JOB_DELETE,
'rule:admin_api',
description='Delete job')
]

View File

@ -69,45 +69,55 @@ class XJobAPI(object):
version_cap = 1.0
return version_cap
def _invoke_method(self, ctxt, method, _type, id):
db_api.new_job(ctxt, _type, id)
def invoke_method(self, ctxt, project_id, method, _type, id):
db_api.new_job(ctxt, project_id, _type, id)
self.client.prepare(exchange='openstack').cast(
ctxt, method, payload={_type: id})
def setup_bottom_router(self, ctxt, net_id, router_id, pod_id):
self._invoke_method(
ctxt, 'setup_bottom_router', constants.JT_ROUTER_SETUP,
def setup_bottom_router(self, ctxt, project_id, net_id, router_id, pod_id):
self.invoke_method(
ctxt, project_id, constants.job_handles[constants.JT_ROUTER_SETUP],
constants.JT_ROUTER_SETUP,
'%s#%s#%s' % (pod_id, router_id, net_id))
def configure_extra_routes(self, ctxt, router_id):
def configure_route(self, ctxt, project_id, router_id):
# NOTE(zhiyuan) this RPC is called by plugin in Neutron server, whose
# control exchange is "neutron", however, we starts xjob without
# specifying its control exchange, so the default value "openstack" is
# used, thus we need to pass exchange as "openstack" here.
self._invoke_method(
ctxt, 'configure_extra_routes', constants.JT_ROUTER, router_id)
self.invoke_method(
ctxt, project_id,
constants.job_handles[constants.JT_CONFIGURE_ROUTE],
constants.JT_CONFIGURE_ROUTE, router_id)
def delete_server_port(self, ctxt, port_id, pod_id):
self._invoke_method(
ctxt, 'delete_server_port', constants.JT_PORT_DELETE,
def delete_server_port(self, ctxt, project_id, port_id, pod_id):
self.invoke_method(
ctxt, project_id, constants.job_handles[constants.JT_PORT_DELETE],
constants.JT_PORT_DELETE,
'%s#%s' % (pod_id, port_id))
def configure_security_group_rules(self, ctxt, project_id):
self._invoke_method(
ctxt, 'configure_security_group_rules',
self.invoke_method(
ctxt, project_id,
constants.job_handles[constants.JT_SEG_RULE_SETUP],
constants.JT_SEG_RULE_SETUP, project_id)
def update_network(self, ctxt, network_id, pod_id):
self._invoke_method(
ctxt, 'update_network', constants.JT_NETWORK_UPDATE,
def update_network(self, ctxt, project_id, network_id, pod_id):
self.invoke_method(
ctxt, project_id,
constants.job_handles[constants.JT_NETWORK_UPDATE],
constants.JT_NETWORK_UPDATE,
'%s#%s' % (pod_id, network_id))
def update_subnet(self, ctxt, subnet_id, pod_id):
self._invoke_method(
ctxt, 'update_subnet', constants.JT_SUBNET_UPDATE,
def update_subnet(self, ctxt, project_id, subnet_id, pod_id):
self.invoke_method(
ctxt, project_id,
constants.job_handles[constants.JT_SUBNET_UPDATE],
constants.JT_SUBNET_UPDATE,
'%s#%s' % (pod_id, subnet_id))
def setup_shadow_ports(self, ctxt, pod_id, net_id):
self._invoke_method(
ctxt, 'setup_shadow_ports', constants.JT_SHADOW_PORT_SETUP,
'%s#%s' % (pod_id, net_id))
def setup_shadow_ports(self, ctxt, project_id, pod_id, net_id):
self.invoke_method(
ctxt, project_id,
constants.job_handles[constants.JT_SHADOW_PORT_SETUP],
constants.JT_SHADOW_PORT_SETUP, '%s#%s' % (pod_id, net_id))

View File

@ -337,11 +337,12 @@ def find_pod_by_az_or_region(context, az_or_region):
reason='Multiple pods with the same az_name are found')
def new_job(context, _type, resource_id):
def new_job(context, project_id, _type, resource_id):
with context.session.begin():
job_dict = {'id': uuidutils.generate_uuid(),
'type': _type,
'status': constants.JS_New,
'project_id': project_id,
'resource_id': resource_id,
'extra_id': uuidutils.generate_uuid()}
job = core.create_resource(context,
@ -349,12 +350,13 @@ def new_job(context, _type, resource_id):
return job
def register_job(context, _type, resource_id):
def register_job(context, project_id, _type, resource_id):
try:
context.session.begin()
job_dict = {'id': uuidutils.generate_uuid(),
'type': _type,
'status': constants.JS_Running,
'project_id': project_id,
'resource_id': resource_id,
'extra_id': constants.SP_EXTRA_ID}
job = core.create_resource(context,
@ -392,22 +394,68 @@ def get_latest_failed_or_new_jobs(context):
# sort sequence is "0_Fail", "1_Success", "2_Running", "3_New"
query = context.session.query(models.AsyncJob.type,
models.AsyncJob.resource_id,
models.AsyncJob.project_id,
sql.func.min(models.AsyncJob.status)).join(
stmt, sql.and_(models.AsyncJob.type == stmt.c.type,
models.AsyncJob.resource_id == stmt.c.resource_id,
models.AsyncJob.timestamp == stmt.c.timestamp))
query = query.group_by(models.AsyncJob.type,
query = query.group_by(models.AsyncJob.project_id,
models.AsyncJob.type,
models.AsyncJob.resource_id)
for job_type, resource_id, status in query:
for job_type, resource_id, project_id, status in query:
if status == constants.JS_Fail:
failed_jobs.append({'type': job_type, 'resource_id': resource_id})
failed_jobs.append({'type': job_type, 'resource_id': resource_id,
'project_id': project_id})
elif status == constants.JS_New:
new_jobs.append({'type': job_type, 'resource_id': resource_id})
new_jobs.append({'type': job_type, 'resource_id': resource_id,
'project_id': project_id})
return failed_jobs, new_jobs
def get_latest_timestamp(context, status, _type, resource_id):
def list_jobs(context, filters=None, sorts=None):
    """Query the job table, optionally constrained by filters and sorts."""
    query_filters = filters or []
    query_sorts = sorts or []
    with context.session.begin():
        return core.query_resource(context, models.AsyncJob,
                                   query_filters, query_sorts)
def list_jobs_from_log(context, filters=None, sorts=None):
    """List successful jobs recorded in the job log table.

    The job log table only stores successful jobs, so a 'status' filter
    asking for anything other than success short-circuits to an empty
    result, and a success 'status' filter is dropped before querying
    (job log entries carry no status attribute).

    :param context: request context holding the db session
    :param filters: optional list of {'key', 'comparator', 'value'} dicts
    :param sorts: optional list of (sort key, ascending) pairs
    :return: list of matching job log entries
    """
    with context.session.begin():
        if filters is not None:
            # Build a pruned copy instead of calling filters.remove() while
            # iterating: mutating the list mid-iteration can skip elements
            # and also clobbers the caller's filter list.
            remaining_filters = []
            for job_filter in filters:
                if job_filter.get('key') == 'status':
                    # job entry in job log table has no status attribute.
                    if job_filter['value'] != constants.JS_Success:
                        return []
                    continue
                remaining_filters.append(job_filter)
            filters = remaining_filters
        return core.query_resource(
            context, models.AsyncJobLog, filters or [], sorts or [])
def get_job(context, job_id):
    """Look up a single entry in the job table by its id."""
    with context.session.begin():
        found = core.get_resource(context, models.AsyncJob, job_id)
        return found
def get_job_from_log(context, job_id):
    """Look up a single entry in the job log table by its id."""
    with context.session.begin():
        found = core.get_resource(context, models.AsyncJobLog, job_id)
        return found
def delete_job(context, job_id):
    """Remove the given entry from the job table."""
    with context.session.begin():
        result = core.delete_resource(context, models.AsyncJob, job_id)
        return result
def get_latest_job(context, status, _type, resource_id):
jobs = core.query_resource(
context, models.AsyncJob,
[{'key': 'status', 'comparator': 'eq', 'value': status},
@ -415,7 +463,7 @@ def get_latest_timestamp(context, status, _type, resource_id):
{'key': 'resource_id', 'comparator': 'eq', 'value': resource_id}],
[('timestamp', False)])
if jobs:
return jobs[0]['timestamp']
return jobs[0]
else:
return None
@ -442,6 +490,7 @@ def finish_job(context, job_id, successful, timestamp):
if status == constants.JS_Success:
log_dict = {'id': uuidutils.generate_uuid(),
'type': job['type'],
'project_id': job['project_id'],
'timestamp': timestamp,
'resource_id': job['resource_id']}
context.session.query(models.AsyncJob).filter(

View File

@ -0,0 +1,30 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
def upgrade(migrate_engine):
    """Add a nullable project_id column to the async_jobs table."""
    meta = MetaData(bind=migrate_engine)
    jobs_table = Table('async_jobs', meta, autoload=True)
    new_column = Column('project_id', String(36), nullable=True)
    # Skip creation when the column is already present (idempotent upgrade).
    if not hasattr(jobs_table.c, 'project_id'):
        jobs_table.create_column(new_column)

View File

@ -0,0 +1,30 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import Column
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
def upgrade(migrate_engine):
    """Add a nullable project_id column to the async_job_logs table."""
    meta = MetaData(bind=migrate_engine)
    logs_table = Table('async_job_logs', meta, autoload=True)
    new_column = Column('project_id', String(36), nullable=True)
    # Skip creation when the column is already present (idempotent upgrade).
    if not hasattr(logs_table.c, 'project_id'):
        logs_table.create_column(new_column)

View File

@ -88,10 +88,11 @@ class AsyncJob(core.ModelBase, core.DictBase):
name='async_jobs0type0status0resource_id0extra_id'),
)
attributes = ['id', 'type', 'timestamp', 'status', 'resource_id',
'extra_id']
attributes = ['id', 'project_id', 'type', 'timestamp', 'status',
'resource_id', 'extra_id']
id = sql.Column('id', sql.String(length=36), primary_key=True)
project_id = sql.Column('project_id', sql.String(length=36))
type = sql.Column('type', sql.String(length=36))
timestamp = sql.Column('timestamp', sql.TIMESTAMP,
server_default=sql.text('CURRENT_TIMESTAMP'),
@ -104,9 +105,10 @@ class AsyncJob(core.ModelBase, core.DictBase):
class AsyncJobLog(core.ModelBase, core.DictBase):
__tablename__ = 'async_job_logs'
attributes = ['id', 'resource_id', 'type', 'timestamp']
attributes = ['id', 'project_id', 'resource_id', 'type', 'timestamp']
id = sql.Column('id', sql.String(length=36), primary_key=True)
project_id = sql.Column('project_id', sql.String(length=36))
resource_id = sql.Column('resource_id', sql.String(length=127))
type = sql.Column('type', sql.String(length=36))
timestamp = sql.Column('timestamp', sql.TIMESTAMP,

View File

@ -393,7 +393,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
t_ctx, network_id, t_constants.RT_NETWORK)
if mappings:
self.xjob_handler.update_network(
t_ctx, network_id, t_constants.POD_NOT_SPECIFIED)
t_ctx, net['tenant_id'], network_id,
t_constants.POD_NOT_SPECIFIED)
self.type_manager.extend_network_dict_provider(context, net)
return net
@ -530,7 +531,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
mappings = db_api.get_bottom_mappings_by_top_id(
t_ctx, subnet_id, t_constants.RT_SUBNET)
if mappings:
self.xjob_handler.update_subnet(t_ctx, subnet_id,
self.xjob_handler.update_subnet(t_ctx, result['tenant_id'],
subnet_id,
t_constants.POD_NOT_SPECIFIED)
return result
@ -645,8 +647,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
# for local router, job will be triggered after router
# interface attachment.
self.xjob_handler.setup_bottom_router(
admin_context, port_body['network_id'],
router_id, pod['pod_id'])
admin_context, router['tenant_id'],
port_body['network_id'], router_id, pod['pod_id'])
# network will be attached to only one non-local router,
# so we break here
break
@ -695,7 +697,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
if is_vxlan_network and (
cfg.CONF.client.cross_pod_vxlan_mode in (
t_constants.NM_P2P, t_constants.NM_L2GW)):
self.xjob_handler.setup_shadow_ports(t_ctx, pod['pod_id'],
self.xjob_handler.setup_shadow_ports(t_ctx, res['tenant_id'],
pod['pod_id'],
res['network_id'])
# for vm port or port with empty device_owner, update top port and
# bottom port
@ -768,8 +771,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
# delete ports
for pod, _id in self.helper.get_real_shadow_resource_iterator(
t_ctx, t_constants.RT_NETWORK, port['network_id']):
self.xjob_handler.delete_server_port(t_ctx, port_id,
pod['pod_id'])
self.xjob_handler.delete_server_port(
t_ctx, port['tenant_id'], port_id, pod['pod_id'])
except Exception:
raise
with t_ctx.session.begin():
@ -1554,7 +1557,8 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
t_ctx = t_context.get_context_from_neutron_context(context)
is_local_router = self.helper.is_local_router(t_ctx, router)
if not is_local_router:
self.xjob_handler.configure_extra_routes(t_ctx, router_id)
self.xjob_handler.configure_route(
t_ctx, ret['tenant_id'], router_id)
return ret
def validate_router_net_location_match(self, t_ctx, router, net):
@ -1674,10 +1678,11 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
try:
if len(b_pods) == 1:
self.xjob_handler.setup_bottom_router(
t_ctx, net_id, router_id, b_pods[0]['pod_id'])
t_ctx, project_id, net_id, router_id, b_pods[0]['pod_id'])
else:
self.xjob_handler.setup_bottom_router(
t_ctx, net_id, router_id, t_constants.POD_NOT_SPECIFIED)
t_ctx, project_id, net_id, router_id,
t_constants.POD_NOT_SPECIFIED)
except Exception:
# NOTE(zhiyuan) we fail to submit the job, so bottom router
# operations are not started, it's safe for us to remove the top
@ -1703,15 +1708,20 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
return_info = super(TricirclePlugin, self).remove_router_interface(
context, router_id, interface_info)
router = self._get_router(context, router_id)
if not b_pods:
return return_info
try:
if len(b_pods) == 1:
self.xjob_handler.setup_bottom_router(
t_ctx, net_id, router_id, b_pods[0]['pod_id'])
t_ctx, router['tenant_id'], net_id,
router_id, b_pods[0]['pod_id'])
else:
self.xjob_handler.setup_bottom_router(
t_ctx, net_id, router_id, t_constants.POD_NOT_SPECIFIED)
t_ctx, router['tenant_id'], net_id,
router_id, t_constants.POD_NOT_SPECIFIED)
except Exception:
# NOTE(zhiyuan) we fail to submit the job, so if bottom router
# interface exists, it would not be deleted, then after we add
@ -1832,8 +1842,11 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
int_net_pod, b_int_port_id = mappings[0]
int_port = self.get_port(context, int_port_id)
net_id = int_port['network_id']
router_id = floatingip_db['router_id']
router = self._get_router(context, router_id)
self.xjob_handler.setup_bottom_router(
t_ctx, net_id, floatingip_db['router_id'], int_net_pod['pod_id'])
t_ctx, router['tenant_id'], net_id,
floatingip_db['router_id'], int_net_pod['pod_id'])
def _disassociate_floatingip(self, context, ori_floatingip_db):
if not ori_floatingip_db['port_id']:
@ -1858,9 +1871,11 @@ class TricirclePlugin(db_base_plugin_v2.NeutronDbPluginV2,
b_int_net_pod, b_int_port_id = mappings[0]
int_port = self.get_port(context, t_int_port_id)
net_id = int_port['network_id']
router_id = ori_floatingip_db['router_id']
router = self._get_router(context, router_id)
self.xjob_handler.setup_bottom_router(
t_ctx, net_id, ori_floatingip_db['router_id'],
b_int_net_pod['pod_id'])
t_ctx, router['tenant_id'], net_id,
ori_floatingip_db['router_id'], b_int_net_pod['pod_id'])
def delete_floatingip(self, context, _id):
"""Disassociate floating ip if needed then delete it

View File

@ -0,0 +1,767 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from mock import patch
from oslo_config import cfg
from oslo_config import fixture as fixture_config
from oslo_utils import timeutils
from oslo_utils import uuidutils
from six.moves import xrange
import pecan
from pecan.configuration import set_config
from pecan.testing import load_test_app
from tricircle.api import app
from tricircle.common import constants
from tricircle.common import context
from tricircle.common import policy
from tricircle.common import xrpcapi
from tricircle.db import api as db_api
from tricircle.db import core
from tricircle.db import models
from tricircle.tests import base
OPT_GROUP_NAME = 'keystone_authtoken'
cfg.CONF.import_group(OPT_GROUP_NAME, "keystonemiddleware.auth_token")
def fake_admin_context():
    """Return a Tricircle context flagged as admin, for patching tests."""
    return context.Context(is_admin=True)
def fake_non_admin_context():
    """Return a plain (non-admin) Tricircle context, for patching tests."""
    return context.Context()
class API_FunctionalTest(base.TestCase):
    """Base class for job API functional tests.

    Boots a real pecan WSGI test app backed by an in-memory SQLite
    database; subclasses exercise the app through webtest requests.
    """
    def setUp(self):
        # NOTE: the order below matters — config options must be
        # registered and overridden before the pecan app is loaded.
        super(API_FunctionalTest, self).setUp()
        self.addCleanup(set_config, {}, overwrite=True)
        cfg.CONF.clear()
        cfg.CONF.register_opts(app.common_opts)
        self.CONF = self.useFixture(fixture_config.Config()).conf
        # no keystone in functional tests
        self.CONF.set_override('auth_strategy', 'noauth')
        self.CONF.set_override('tricircle_db_connection', 'sqlite:///:memory:')
        core.initialize()
        core.ModelBase.metadata.create_all(core.get_engine())
        self.context = context.get_admin_context()
        policy.populate_default_rules()
        self.app = self._make_app()
    def _make_app(self, enable_acl=False):
        """Load the pecan test application (ACL disabled by default)."""
        self.config = {
            'app': {
                'root': 'tricircle.api.controllers.root.RootController',
                'modules': ['tricircle.api'],
                'enable_acl': enable_acl,
            },
        }
        return load_test_app(self.config)
    def tearDown(self):
        # undo everything setUp registered so tests stay isolated
        super(API_FunctionalTest, self).tearDown()
        cfg.CONF.unregister_opts(app.common_opts)
        pecan.set_config({}, overwrite=True)
        core.ModelBase.metadata.drop_all(core.get_engine())
        policy.reset()
class TestAsyncJobController(API_FunctionalTest):
"""Test version listing on root URI."""
def setUp(self):
super(TestAsyncJobController, self).setUp()
self.job_resource_map = constants.job_resource_map
self.all_job_types = list(self.job_resource_map.keys())
def fake_new_job(context, project_id, type, resource_id):
raise Exception
def fake_invoke_method(self, context, project_id, method, type, id):
db_api.new_job(context, project_id, type, id)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_no_input(self):
job = self._prepare_job_element(constants.JT_CONFIGURE_ROUTE)
jobs = [
# missing job
{
"job_xxx": job,
"expected_error": 400
},
]
for test_job in jobs:
response = self.app.post_json(
'/v1.0/jobs',
dict(job_xxx=test_job['job_xxx']),
expect_errors=True)
self.assertEqual(response.status_int,
test_job['expected_error'])
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
@patch.object(db_api, 'new_job',
new=fake_new_job)
def test_post_exception(self):
job = self._prepare_job_element(constants.JT_CONFIGURE_ROUTE)
jobs = [
{
"job": job,
"expected_error": 500
},
]
self._test_and_check(jobs)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_invalid_input(self):
for job_type in self.all_job_types:
job = self._prepare_job_element(job_type)
# wrong job type parameter: no job type is provided
job_1 = copy.deepcopy(job)
job_1.pop('type')
# wrong job type parameter: job type is empty
job_2 = copy.deepcopy(job)
job_2['type'] = ''
# wrong job type parameter: job type is wrong
job_3 = copy.deepcopy(job)
job_3['type'] = job['type'] + '_1'
# wrong resource parameter: no resource is provided
job_4 = copy.deepcopy(job)
job_4.pop('resource')
# wrong resource parameter: lack of necessary resource
job_5 = copy.deepcopy(job)
job_5['resource'].popitem()
# wrong resource parameter: redundant resource
job_6 = copy.deepcopy(job)
job_6['resource']['fake_resource'] = 'fake_resource'
# wrong project id parameter: no project id is provided
job_7 = copy.deepcopy(job)
job_7.pop('project_id')
# wrong project id parameter: project id is empty
job_8 = copy.deepcopy(job)
job_8['project_id'] = ''
# wrong project id parameter: project is not the
# owner of resource
job_9 = copy.deepcopy(job)
job_9['project_id'] = uuidutils.generate_uuid()
jobs = [
{
"job": job_1,
"expected_error": 400
},
{
"job": job_2,
"expected_error": 400
},
{
"job": job_3,
"expected_error": 400
},
{
"job": job_4,
"expected_error": 400
},
{
"job": job_5,
"expected_error": 400
},
{
"job": job_6,
"expected_error": 400
},
{
"job": job_7,
"expected_error": 400
},
{
"job": job_8,
"expected_error": 400
},
{
"job": job_9,
"expected_error": 400
},
]
self._test_and_check(jobs)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
@patch.object(xrpcapi.XJobAPI, 'invoke_method',
new=fake_invoke_method)
def test_post_job(self):
for job_type in self.all_job_types:
job = self._prepare_job_element(job_type)
jobs = [
# create an entirely new job
{
"job": job,
"expected_error": 200
},
# target job already exists in the job table and its status
# is NEW, then this newer job will be picked by job handler.
{
"job": job,
"expected_error": 200
},
]
self._test_and_check(jobs)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
@patch.object(xrpcapi.XJobAPI, 'invoke_method',
new=fake_invoke_method)
def test_get_one_and_get_all(self):
all_job_ids = {}
all_job_project_ids = {}
index = 0
for job_type in self.all_job_types:
job = self._prepare_job_element(job_type)
jobs = [
{
"job": job,
"expected_error": 200
},
]
self._test_and_check(jobs)
response = self.app.get('/v1.0/jobs')
return_job = response.json
all_job_ids[index] = return_job['jobs'][index]['id']
all_job_project_ids[job_type] = (
return_job['jobs'][index]['project_id'])
index = index + 1
service_uris = ['jobs', 'jobs/detail']
amount_of_all_jobs = len(self.all_job_types)
# with no filters all jobs are returned
for service_uri in service_uris:
response_1 = self.app.get('/v1.0/%(service_uri)s' % {
'service_uri': service_uri})
return_jobs_1 = response_1.json
self.assertEqual(amount_of_all_jobs, len(return_jobs_1['jobs']))
self.assertIn('status', response_1)
self.assertIn('resource', response_1)
self.assertIn('project_id', response_1)
self.assertIn('id', response_1)
self.assertIn('timestamp', response_1)
self.assertIn('type', response_1)
self.assertNotIn('extra_id', response_1)
self.assertNotIn('resource_id', response_1)
# use job status filter
response_2 = self.app.get('/v1.0/jobs?status=new')
return_jobs_2 = response_2.json
self.assertEqual(amount_of_all_jobs, len(return_jobs_2['jobs']))
response = self.app.get('/v1.0/jobs?status=fail')
return_jobs_3 = response.json
self.assertEqual(0, len(return_jobs_3['jobs']))
amount_of_fail_jobs = int(amount_of_all_jobs / 3)
for i in xrange(amount_of_fail_jobs):
db_api.finish_job(self.context,
all_job_ids[i], False,
timeutils.utcnow())
amount_of_succ_jobs = int(amount_of_all_jobs / 3)
for i in xrange(amount_of_succ_jobs):
db_api.finish_job(self.context,
all_job_ids[amount_of_fail_jobs + i], True,
timeutils.utcnow())
for service_uri in service_uris:
response = self.app.get('/v1.0/%(service_uri)s?status=fail' % {
'service_uri': service_uri})
return_jobs = response.json
self.assertEqual(amount_of_fail_jobs, len(return_jobs['jobs']))
response = self.app.get('/v1.0/%(service_uri)s?status=success'
'' % {'service_uri': service_uri})
return_jobs = response.json
self.assertEqual(amount_of_succ_jobs, len(return_jobs['jobs']))
# use job type filter or project id filter
for job_type in self.all_job_types:
response = self.app.get('/v1.0/%(service_uri)s?type=%(type)s'
'' % {'service_uri': service_uri,
'type': job_type})
return_job = response.json
self.assertEqual(1, len(return_job['jobs']))
response = self.app.get(
'/v1.0/%(service_uri)s?project_id=%(project_id)s' % {
'service_uri': service_uri,
'project_id': all_job_project_ids[job_type]})
return_job = response.json
self.assertEqual(1, len(return_job['jobs']))
# combine job type filter and project id filter
response = self.app.get(
'/v1.0/%(service_uri)s?project_id=%(project_id)s&'
'type=%(type)s' % {
'service_uri': service_uri,
'project_id': all_job_project_ids[job_type],
'type': job_type})
return_job = response.json
self.assertEqual(1, len(return_job['jobs']))
# combine job type filter, project id filter and job status filter
for i in xrange(amount_of_all_jobs):
if i < amount_of_fail_jobs:
# this aims to test service "/v1.0/jobs/{id}"
response_1 = self.app.get('/v1.0/jobs/%(id)s' % {
'id': all_job_ids[i]})
return_job_1 = response_1.json
response_2 = self.app.get(
'/v1.0/%(service_uri)s?'
'project_id=%(project_id)s&'
'type=%(type)s&'
'status=%(status)s' % {
'service_uri': service_uri,
'project_id': return_job_1['job']['project_id'],
'type': return_job_1['job']['type'],
'status': 'fail'})
return_job_2 = response_2.json
self.assertEqual(1, len(return_job_2['jobs']))
elif ((i >= amount_of_fail_jobs
) and (i < amount_of_fail_jobs + amount_of_succ_jobs)):
# those jobs are set to 'success' and they are moved to
# job log. their job ids are not stored in all_job_ids
job_type = self.all_job_types[i]
response = self.app.get(
'/v1.0/%(service_uri)s?project_id=%(project_id)s&'
'type=%(type)s&status=%(status)s' % {
'service_uri': service_uri,
'project_id': all_job_project_ids[job_type],
'type': job_type,
'status': 'success'})
return_job = response.json
self.assertEqual(1, len(return_job['jobs']))
response_2 = self.app.get(
'/v1.0/%(service_uri)s?status=%(status)s'
'&type=%(type)s' % {
'service_uri': service_uri,
'status': "success-x",
'type': job_type})
return_job_2 = response_2.json
self.assertEqual(0, len(return_job_2['jobs']))
else:
response_1 = self.app.get('/v1.0/jobs/%(id)s' % {
'id': all_job_ids[i]})
return_job_1 = response_1.json
response_2 = self.app.get(
'/v1.0/%(service_uri)s?project_id=%(project_id)s&'
'type=%(type)s&status=%(status)s' % {
'service_uri': service_uri,
'project_id': return_job_1['job']['project_id'],
'type': return_job_1['job']['type'],
'status': 'new'})
return_job_2 = response_2.json
self.assertEqual(1, len(return_job_2['jobs']))
response_3 = self.app.get(
'/v1.0/%(service_uri)s?status=%(status)s'
'&type=%(type)s' % {
'service_uri': service_uri,
'status': "new-x",
'type': return_job_1['job']['type']})
return_job_3 = response_3.json
self.assertEqual(0, len(return_job_3['jobs']))
# use unsupported filter, it will raise 400 error
response = self.app.get('/v1.0/%(service_uri)s?'
'fake_filter=%(fake_filter)s'
'' % {'service_uri': service_uri,
'fake_filter': "fake_filter"},
expect_errors=True)
self.assertEqual(response.status_int, 400)
# use invalid filter, it will return empty set
response = self.app.get('/v1.0/%(service_uri)s?status=%(status)s'
'' % {'service_uri': service_uri,
'status': "new-x"})
return_job = response.json
self.assertEqual(0, len(return_job['jobs']))
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_get_job_schemas(self):
response = self.app.get('/v1.0/jobs/schemas')
return_job_schemas = response.json
job_schemas = []
for job_type in self.all_job_types:
job = {}
resource = []
for resource_type, resource_id in (
self.job_resource_map[job_type]):
resource.append(resource_id)
job['resource'] = resource
job['type'] = job_type
job_schemas.append(job)
self.assertEqual(job_schemas, return_job_schemas['schemas'])
    @patch.object(context, 'extract_context_from_environ',
                  new=fake_admin_context)
    @patch.object(xrpcapi.XJobAPI, 'invoke_method',
                  new=fake_invoke_method)
    def test_delete_job(self):
        """DELETE works for NEW/FAIL/SUCCESS jobs, 400 for a running one."""
        # create one job of every type
        for job_type in self.all_job_types:
            job = self._prepare_job_element(job_type)
            jobs = [
                {
                    "job": job,
                    "expected_error": 200
                },
            ]
            self._test_and_check(jobs)
        response = self.app.get('/v1.0/jobs')
        return_job = response.json
        jobs = return_job['jobs']
        # delete a new job
        for job in jobs:
            response_1 = self.app.delete(
                '/v1.0/jobs/%(id)s' % {'id': job['id']},
                expect_errors=True)
            return_value_1 = response_1.json
            self.assertEqual(response_1.status_int, 200)
            self.assertEqual(return_value_1, {})
        response_2 = self.app.get('/v1.0/jobs')
        return_job_2 = response_2.json
        self.assertEqual(0, len(return_job_2['jobs']))
        # deleting an unknown job id yields 404
        response_3 = self.app.delete('/v1.0/jobs/123', expect_errors=True)
        self.assertEqual(response_3.status_int, 404)
        # delete a running job
        job_type_4 = constants.JT_NETWORK_UPDATE
        job_4 = self._prepare_job_element(job_type_4)
        # resource ids are joined with '#' to form the composite resource id
        resource_id_4 = '#'.join([job_4['resource'][resource_id]
                                  for resource_type, resource_id
                                  in self.job_resource_map[job_type_4]])
        # register_job marks the job as picked up by a worker (RUNNING)
        job_running_4 = db_api.register_job(self.context,
                                            job_4['project_id'],
                                            job_type_4,
                                            resource_id_4)
        self.assertEqual(constants.JS_Running, job_running_4['status'])
        response_4 = self.app.delete('/v1.0/jobs/%(id)s' % {
            'id': job_running_4['id']}, expect_errors=True)
        self.assertEqual(response_4.status_int, 400)
        # delete a failed job
        job_type_5 = constants.JT_NETWORK_UPDATE
        job_5 = self._prepare_job_element(job_type_5)
        job_dict_5 = {
            "job": job_5,
            "expected_error": 200
        }
        response_5 = self.app.post_json('/v1.0/jobs',
                                        dict(job=job_dict_5['job']),
                                        expect_errors=True)
        return_job_5 = response_5.json
        self.assertEqual(response_5.status_int, 200)
        # finish_job with False flips the job to FAIL state
        db_api.finish_job(self.context,
                          return_job_5['job']['id'],
                          False, timeutils.utcnow())
        job_fail_5 = db_api.get_job(self.context, return_job_5['job']['id'])
        self.assertEqual(constants.JS_Fail, job_fail_5['status'])
        response_6 = self.app.delete('/v1.0/jobs/%(id)s' % {
            'id': return_job_5['job']['id']}, expect_errors=True)
        self.assertEqual(response_6.status_int, 200)
        # delete a successful job
        job_type_6 = constants.JT_NETWORK_UPDATE
        job_6 = self._prepare_job_element(job_type_6)
        job_dict_6 = {
            "job": job_6,
            "expected_error": 200
        }
        response_6 = self.app.post_json('/v1.0/jobs',
                                        dict(job=job_dict_6['job']),
                                        expect_errors=True)
        return_job_6 = response_6.json
        # flip the job to SUCCESS directly in the job table (finish_job
        # would move it to the job log instead)
        with self.context.session.begin():
            job_dict = {'status': constants.JS_Success,
                        'timestamp': timeutils.utcnow(),
                        'extra_id': uuidutils.generate_uuid()}
            core.update_resource(self.context, models.AsyncJob,
                                 return_job_6['job']['id'], job_dict)
        job_succ_6 = db_api.get_job(self.context, return_job_6['job']['id'])
        self.assertEqual(constants.JS_Success, job_succ_6['status'])
        response_7 = self.app.delete('/v1.0/jobs/%(id)s' % {
            'id': return_job_6['job']['id']}, expect_errors=True)
        self.assertEqual(response_7.status_int, 200)
    @patch.object(context, 'extract_context_from_environ',
                  new=fake_admin_context)
    @patch.object(xrpcapi.XJobAPI, 'invoke_method',
                  new=fake_invoke_method)
    def test_redo_job(self):
        """PUT redoes NEW/FAIL jobs; RUNNING and SUCCESS jobs return 400/404."""
        # create one job of every type
        for job_type in self.all_job_types:
            job = self._prepare_job_element(job_type)
            jobs = [
                # create an entirely new job
                {
                    "job": job,
                    "expected_error": 200
                },
            ]
            self._test_and_check(jobs)
        response = self.app.get('/v1.0/jobs')
        return_job = response.json
        jobs = return_job['jobs']
        # redo a new job
        for job in jobs:
            response_1 = self.app.put('/v1.0/jobs/%(id)s' % {'id': job['id']},
                                      expect_errors=True)
            self.assertEqual(response_1.status_int, 200)
        # redoing an unknown job id yields 404
        response_2 = self.app.put('/v1.0/jobs/123', expect_errors=True)
        self.assertEqual(response_2.status_int, 404)
        # redo a running job
        job_type_3 = constants.JT_NETWORK_UPDATE
        job_3 = self._prepare_job_element(job_type_3)
        # resource ids are joined with '#' to form the composite resource id
        resource_id_3 = '#'.join([job_3['resource'][resource_id]
                                  for resource_type, resource_id
                                  in self.job_resource_map[job_type_3]])
        # register_job marks the job as picked up by a worker (RUNNING)
        job_running_3 = db_api.register_job(self.context,
                                            job_3['project_id'],
                                            job_type_3,
                                            resource_id_3)
        self.assertEqual(constants.JS_Running, job_running_3['status'])
        response_3 = self.app.put('/v1.0/jobs/%(id)s' % {
            'id': job_running_3['id']}, expect_errors=True)
        self.assertEqual(response_3.status_int, 400)
        # redo a failed job
        job_type_4 = constants.JT_NETWORK_UPDATE
        job_4 = self._prepare_job_element(job_type_4)
        job_dict_4 = {
            "job": job_4,
            "expected_error": 200
        }
        response_4 = self.app.post_json('/v1.0/jobs',
                                        dict(job=job_dict_4['job']),
                                        expect_errors=True)
        return_job_4 = response_4.json
        self.assertEqual(response_4.status_int, 200)
        # finish_job with False flips the job to FAIL state
        db_api.finish_job(self.context,
                          return_job_4['job']['id'],
                          False, timeutils.utcnow())
        job_fail_4 = db_api.get_job(self.context, return_job_4['job']['id'])
        self.assertEqual(constants.JS_Fail, job_fail_4['status'])
        response_5 = self.app.put('/v1.0/jobs/%(id)s' % {
            'id': return_job_4['job']['id']}, expect_errors=True)
        self.assertEqual(response_5.status_int, 200)
        # redo a successful job
        job_type_6 = constants.JT_NETWORK_UPDATE
        job_6 = self._prepare_job_element(job_type_6)
        job_dict_6 = {
            "job": job_6,
            "expected_error": 200
        }
        response_6 = self.app.post_json('/v1.0/jobs',
                                        dict(job=job_dict_6['job']),
                                        expect_errors=True)
        return_job_6 = response_6.json
        # flip the job to SUCCESS directly in the job table
        with self.context.session.begin():
            job_dict = {'status': constants.JS_Success,
                        'timestamp': timeutils.utcnow(),
                        'extra_id': uuidutils.generate_uuid()}
            core.update_resource(self.context, models.AsyncJob,
                                 return_job_6['job']['id'], job_dict)
        job_succ_6 = db_api.get_job(self.context, return_job_6['job']['id'])
        self.assertEqual(constants.JS_Success, job_succ_6['status'])
        # a successful job cannot be redone
        response_7 = self.app.put('/v1.0/jobs/%(id)s' % {
            'id': return_job_6['job']['id']}, expect_errors=True)
        self.assertEqual(response_7.status_int, 400)
@patch.object(context, 'extract_context_from_environ',
new=fake_non_admin_context)
def test_non_admin_action(self):
job_type = constants.JT_NETWORK_UPDATE
job = self._prepare_job_element(job_type)
jobs = [
{
"job": job,
"expected_error": 403
},
]
self._test_and_check(jobs)
response_1 = self.app.get('/v1.0/jobs/1234567890',
expect_errors=True)
self.assertEqual(response_1.status_int, 403)
response_2 = self.app.get('/v1.0/jobs',
expect_errors=True)
self.assertEqual(response_2.status_int, 403)
response_3 = self.app.delete('/v1.0/jobs/1234567890',
expect_errors=True)
self.assertEqual(response_3.status_int, 403)
response_4 = self.app.put('/v1.0/jobs/1234567890',
expect_errors=True)
self.assertEqual(response_4.status_int, 403)
def _test_and_check(self, jobs):
for test_job in jobs:
response = self.app.post_json(
'/v1.0/jobs', dict(job=test_job['job']),
expect_errors=True)
self.assertEqual(response.status_int, test_job['expected_error'])
def _prepare_job_element(self, job_type):
# in order to create a job, we need three elements: job type,
# job resource and project id.
job = {}
job['resource'] = {}
job['type'] = job_type
for resource_type, resource_id in self.job_resource_map[job_type]:
job['resource'][resource_id] = uuidutils.generate_uuid()
job['project_id'] = self._prepare_project_id_for_job(job)
return job
def _prepare_project_id_for_job(self, job):
# prepare the project id for job creation, currently job parameter
# contains job type and job resource information.
job_type = job['type']
if job_type == constants.JT_SEG_RULE_SETUP:
project_id = job['resource']['project_id']
else:
project_id = uuidutils.generate_uuid()
pod_id = uuidutils.generate_uuid()
resource_type, resource_id = (
constants.job_primary_resource_map[job_type])
routing = db_api.create_resource_mapping(
self.context, job['resource'][resource_id],
job['resource'][resource_id], pod_id, project_id,
resource_type)
self.assertIsNotNone(routing)
return project_id
def _validate_error_code(self, res, code):
self.assertEqual(res[list(res.keys())[0]]['code'], code)

View File

@ -0,0 +1,616 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from mock import patch
from oslo_utils import timeutils
from oslo_utils import uuidutils
from six.moves import xrange
import unittest
import pecan
from tricircle.api.controllers import job
from tricircle.common import constants
from tricircle.common import context
from tricircle.common import policy
from tricircle.common import xrpcapi
from tricircle.db import api as db_api
from tricircle.db import core
from tricircle.db import models
class FakeRPCAPI(xrpcapi.XJobAPI):
    """RPC stub that records the job in the DB instead of casting it."""
    def invoke_method(self, ctxt, project_id, method, _type, id):
        # the real implementation would RPC-cast to an XJob worker; the
        # unit tests only need the job row to appear in the job table
        db_api.new_job(ctxt, project_id, _type, id)
class FakeAsyncJobController(job.AsyncJobController):
    """Job controller wired to the fake RPC API (no real messaging)."""
    def __init__(self):
        # deliberately skip the parent __init__: the tests only need the
        # xjob handler, not the full controller wiring
        self.xjob_handler = FakeRPCAPI()
class FakeResponse(object):
    """Stand-in for pecan.response.

    NOTE: __new__ intentionally stores the status on the *class*, not the
    instance — the tests patch pecan.response with this class itself, so
    the controller reads and writes status attributes on the class.
    """
    def __new__(cls, code=500):
        for attr in ('status', 'status_code'):
            setattr(cls, attr, code)
        return super(FakeResponse, cls).__new__(cls)
class AsyncJobControllerTest(unittest.TestCase):
    def setUp(self):
        """Create an in-memory DB, a fake controller and an admin context."""
        core.initialize()
        core.ModelBase.metadata.create_all(core.get_engine())
        # controller backed by FakeRPCAPI, so no messaging layer is needed
        self.controller = FakeAsyncJobController()
        self.context = context.get_admin_context()
        self.job_resource_map = constants.job_resource_map
        policy.populate_default_rules()
    @patch.object(pecan, 'response', new=FakeResponse)
    @patch.object(context, 'extract_context_from_environ')
    def test_post(self, mock_context):
        """Job creation: auth, body validation and the success paths."""
        mock_context.return_value = self.context
        # cover all job types
        for job_type in self.job_resource_map.keys():
            job = self._prepare_job_element(job_type)
            kw_job = {'job': job}
            # failure case, only admin can create the job
            self.context.is_admin = False
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 403)
            self.context.is_admin = True
            # failure case, request body not found
            kw_job_1 = {'job_1': job}
            res = self.controller.post(**kw_job_1)
            self._validate_error_code(res, 400)
            # failure case, wrong job type parameter
            job_type_backup = job.pop('type')
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            job['type'] = ''
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            job['type'] = job_type_backup + '_1'
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            job['type'] = job_type_backup
            # failure case, wrong resource parameter
            job_resource_backup = job.pop('resource')
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            job['resource'] = copy.deepcopy(job_resource_backup)
            job['resource'].popitem()
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            fake_resource = 'fake_resource'
            job['resource'][fake_resource] = fake_resource
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            job['resource'] = job_resource_backup
            # failure case, wrong project id parameter
            project_id_backup = job.pop('project_id')
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            job['project_id'] = ''
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            job['project_id'] = uuidutils.generate_uuid()
            res = self.controller.post(**kw_job)
            self._validate_error_code(res, 400)
            job['project_id'] = project_id_backup
            # successful case, create an entirely new job. Because the job
            # status returned from controller has been formatted, so we not
            # only validate the database records, but also validate the return
            # value of the controller.
            job_1 = self.controller.post(**kw_job)['job']
            job_in_db_1 = db_api.get_job(self.context, job_1['id'])
            self.assertEqual(job_type, job_in_db_1['type'])
            self.assertEqual(job['project_id'], job_in_db_1['project_id'])
            self.assertEqual(constants.JS_New, job_in_db_1['status'])
            self.assertEqual('NEW', job_1['status'])
            self.assertEqual(len(constants.job_resource_map[job['type']]),
                             len(job_1['resource']))
            # internal bookkeeping fields must not leak into the API result
            self.assertFalse('resource_id' in job_1)
            self.assertFalse('extra_id' in job_1)
            db_api.delete_job(self.context, job_1['id'])
            # successful case, target job already exists in the job table
            # and its status is NEW, then this newer job will be picked by
            # job handler.
            job_2 = self.controller.post(**kw_job)['job']
            job_in_db_2 = db_api.get_job(self.context, job_2['id'])
            job_3 = self.controller.post(**kw_job)['job']
            job_in_db_3 = db_api.get_job(self.context, job_3['id'])
            self.assertEqual(job_type, job_in_db_2['type'])
            self.assertEqual(job['project_id'], job_in_db_2['project_id'])
            self.assertEqual(constants.JS_New, job_in_db_2['status'])
            self.assertEqual('NEW', job_2['status'])
            self.assertEqual(len(constants.job_resource_map[job['type']]),
                             len(job_2['resource']))
            self.assertFalse('resource_id' in job_2)
            self.assertFalse('extra_id' in job_2)
            self.assertEqual(job_type, job_in_db_3['type'])
            self.assertEqual(job['project_id'], job_in_db_3['project_id'])
            self.assertEqual(constants.JS_New, job_in_db_3['status'])
            self.assertEqual('NEW', job_3['status'])
            self.assertEqual(len(constants.job_resource_map[job['type']]),
                             len(job_3['resource']))
            self.assertFalse('resource_id' in job_3)
            self.assertFalse('extra_id' in job_3)
            # clean up so the next job type starts from an empty table
            db_api.finish_job(self.context, job_3['id'], False,
                              timeutils.utcnow())
            db_api.delete_job(self.context, job_3['id'])
    @patch.object(pecan, 'response', new=FakeResponse)
    @patch.object(context, 'extract_context_from_environ')
    def test_get_one(self, mock_context):
        """get_one handles 'schemas', 'detail' and a concrete job id."""
        mock_context.return_value = self.context
        # failure case, only admin can list the job's info
        self.context.is_admin = False
        res = self.controller.get_one("schemas")
        self._validate_error_code(res, 403)
        res = self.controller.get_one("detail")
        self._validate_error_code(res, 403)
        res = self.controller.get_one(uuidutils.generate_uuid())
        self._validate_error_code(res, 403)
        self.context.is_admin = True
        # failure case, parameter error
        res = self.controller.get_one("schemas_1")
        self._validate_error_code(res, 404)
        res = self.controller.get_one(uuidutils.generate_uuid())
        self._validate_error_code(res, 404)
        # successful case, set id="schemas" to get job schemas
        job_schemas_2 = self.controller.get_one("schemas")
        job_schemas_3 = []
        for job_type in self.job_resource_map.keys():
            job = {}
            resource = []
            for resource_type, resource_id in self.job_resource_map[job_type]:
                resource.append(resource_id)
            job['resource'] = resource
            job['type'] = job_type
            job_schemas_3.append(job)
        self.assertEqual(job_schemas_3, job_schemas_2['schemas'])
        # successful case, set id="detail" to get all jobs.
        # first, we need to create jobs in job table.
        amount_of_all_jobs = len(self.job_resource_map.keys())
        all_job_ids = {}
        index = 0
        for job_type in self.job_resource_map.keys():
            job = self._prepare_job_element(job_type)
            # resource ids are joined with '#' into the composite id
            resource_id = '#'.join([job['resource'][resource_id]
                                    for resource_type, resource_id
                                    in self.job_resource_map[job_type]])
            job_1 = db_api.new_job(self.context,
                                   job['project_id'], job_type,
                                   resource_id)
            all_job_ids[index] = job_1['id']
            index = index + 1
            # validate if the id=job_id, get_one(id=job_id) can take effective
            job_2 = self.controller.get_one(job_1['id'])['job']
            self.assertEqual(job_1['type'], job_2['type'])
            self.assertEqual(job_1['project_id'], job_2['project_id'])
            self.assertEqual("NEW", job_2['status'])
        jobs_1 = self.controller.get_one("detail")
        self.assertEqual(amount_of_all_jobs, len(jobs_1['jobs']))
        # create jobs in job log table, in order to validate
        # get_one(id=detail) can also get the jobs from job log
        amount_of_succ_jobs = int(len(all_job_ids) / 2)
        for i in xrange(amount_of_succ_jobs):
            # finishing with True moves the job into the job log table
            db_api.finish_job(self.context, all_job_ids[i], True,
                              timeutils.utcnow())
        jobs_2 = self.controller.get_one("detail")
        self.assertEqual(amount_of_all_jobs, len(jobs_2['jobs']))
        job_status_filter_1 = {'status': 'success'}
        jobs_3 = self.controller.get_one("detail", **job_status_filter_1)
        self.assertEqual(amount_of_succ_jobs, len(jobs_3['jobs']))
        job_status_filter_2 = {'status': 'new'}
        jobs_4 = self.controller.get_one("detail", **job_status_filter_2)
        self.assertEqual(amount_of_all_jobs - amount_of_succ_jobs,
                         len(jobs_4['jobs']))
    @patch.object(pecan, 'response', new=FakeResponse)
    @patch.object(context, 'extract_context_from_environ')
    def test_get_all_jobs(self, mock_context):
        """get_all supports project-id, status and type filters."""
        mock_context.return_value = self.context
        # map job type to project id for later project id filter validation.
        job_project_id_map = {}
        amount_of_all_jobs = len(self.job_resource_map.keys())
        amount_of_running_jobs = 3
        count = 1
        # cover all job types.
        for job_type in self.job_resource_map.keys():
            job = self._prepare_job_element(job_type)
            job_project_id_map[job_type] = job['project_id']
            resource_id = '#'.join([job['resource'][resource_id]
                                    for resource_type, resource_id
                                    in self.job_resource_map[job_type]])
            # the first few jobs are registered (RUNNING), the rest are NEW
            if count <= amount_of_running_jobs:
                db_api.register_job(self.context,
                                    job['project_id'], job_type,
                                    resource_id)
            else:
                db_api.new_job(self.context,
                               job['project_id'], job_type,
                               resource_id)
            count = count + 1
        # query the jobs with several kinds of filters.
        # supported filters: project id, job status, job type.
        job_status_filter_1 = {'status': 'new'}
        job_status_filter_2 = {'status': 'fail'}
        job_status_filter_3 = {'status': 'running'}
        invalid_filter = {'status': "new-x"}
        unsupported_filter = {'fake_filter': "fake_filter"}
        count = 1
        for job_type in self.job_resource_map.keys():
            project_id_filter_1 = {'project_id': job_project_id_map[job_type]}
            project_id_filter_2 = {'project_id': uuidutils.generate_uuid()}
            job_type_filter_1 = {'type': job_type}
            job_type_filter_2 = {'type': job_type + '_1'}
            # failure case, only admin can list the jobs
            self.context.is_admin = False
            res = self.controller.get_all()
            self._validate_error_code(res, 403)
            self.context.is_admin = True
            # successful case, filter by project id
            jobs_project_id_filter_1 = self.controller.get_all(
                **project_id_filter_1)
            self.assertEqual(1, len(jobs_project_id_filter_1['jobs']))
            jobs_project_id_filter_2 = self.controller.get_all(
                **project_id_filter_2)
            self.assertEqual(0, len(jobs_project_id_filter_2['jobs']))
            # successful case, filter by job type
            jobs_job_type_filter_1 = self.controller.get_all(
                **job_type_filter_1)
            self.assertEqual(1, len(jobs_job_type_filter_1['jobs']))
            jobs_job_type_filter_2 = self.controller.get_all(
                **job_type_filter_2)
            self.assertEqual(0, len(jobs_job_type_filter_2['jobs']))
            # successful case, filter by project id, job status and job type
            if count <= amount_of_running_jobs:
                all_filters = dict(list(project_id_filter_1.items()) +
                                   list(job_status_filter_3.items()) +
                                   list(job_type_filter_1.items()))
                jobs_all_filters = self.controller.get_all(**all_filters)
                self.assertEqual(1, len(jobs_all_filters['jobs']))
            else:
                all_filters = dict(list(project_id_filter_1.items()) +
                                   list(job_status_filter_1.items()) +
                                   list(job_type_filter_1.items()))
                jobs_all_filters = self.controller.get_all(**all_filters)
                self.assertEqual(1, len(jobs_all_filters['jobs']))
            # successful case, contradictory filter
            contradict_filters = dict(list(project_id_filter_1.items()) +
                                      list(job_status_filter_2.items()) +
                                      list((job_type_filter_2.items())))
            jobs_contradict_filters = self.controller.get_all(
                **contradict_filters)
            self.assertEqual(0, len(jobs_contradict_filters['jobs']))
            count = count + 1
        # failure case, unsupported filter
        res = self.controller.get_all(**unsupported_filter)
        self._validate_error_code(res, 400)
        # successful case, invalid filter
        jobs_invalid_filter = self.controller.get_all(**invalid_filter)
        self.assertEqual(0, len(jobs_invalid_filter['jobs']))
        # successful case, list jobs without filters
        jobs_empty_filters = self.controller.get_all()
        self.assertEqual(amount_of_all_jobs, len(jobs_empty_filters['jobs']))
        # successful case, filter by job status
        jobs_job_status_filter_1 = self.controller.get_all(
            **job_status_filter_1)
        self.assertEqual(amount_of_all_jobs - amount_of_running_jobs,
                         len(jobs_job_status_filter_1['jobs']))
        jobs_job_status_filter_2 = self.controller.get_all(
            **job_status_filter_2)
        self.assertEqual(0, len(jobs_job_status_filter_2['jobs']))
        jobs_job_status_filter_3 = self.controller.get_all(
            **job_status_filter_3)
        self.assertEqual(amount_of_running_jobs,
                         len(jobs_job_status_filter_3['jobs']))
@patch.object(pecan, 'response', new=FakeResponse)
@patch.object(pecan, 'response', new=mock.Mock)
@patch.object(context, 'extract_context_from_environ')
def test_delete(self, mock_context):
mock_context.return_value = self.context
# cover all job types.
# each 'for' loop adds one item in job log table, we set count variable
# to record dynamic total job entries in job log table.
count = 1
for job_type in self.job_resource_map.keys():
job = self._prepare_job_element(job_type)
resource_id = '#'.join([job['resource'][resource_id]
for resource_type, resource_id
in self.job_resource_map[job_type]])
# failure case, only admin can delete the job
job_1 = db_api.new_job(self.context, job['project_id'],
job_type,
resource_id)
self.context.is_admin = False
res = self.controller.delete(job_1['id'])
self._validate_error_code(res, 403)
self.context.is_admin = True
db_api.delete_job(self.context, job_1['id'])
# failure case, job not found
res = self.controller.delete(-123)
self._validate_error_code(res, 404)
# failure case, delete a running job
job_2 = db_api.register_job(self.context,
job['project_id'],
job_type, resource_id)
job = db_api.get_job(self.context, job_2['id'])
res = self.controller.delete(job_2['id'])
self._validate_error_code(res, 400)
# finish the job and delete it
db_api.finish_job(self.context, job_2['id'], False,
timeutils.utcnow())
db_api.delete_job(self.context, job_2['id'])
# successful case, delete a successful job. successful job from
# job log can't be deleted, here this successful job is from
# job table.
job_3 = self._prepare_job_element(job_type)
resource_id_3 = '#'.join([job_3['resource'][resource_id_3]
for resource_type_3, resource_id_3
in self.job_resource_map[job_type]])
job_4 = db_api.new_job(self.context,
job_3['project_id'],
job_type, resource_id_3)
with self.context.session.begin():
job_dict = {'status': constants.JS_Success,
'timestamp': timeutils.utcnow(),
'extra_id': uuidutils.generate_uuid()}
core.update_resource(self.context, models.AsyncJob,
job_4['id'], job_dict)
job_4_succ = db_api.get_job(self.context, job_4['id'])
self.controller.delete(job_4['id'])
filters_job_4 = [
{'key': 'type', 'comparator': 'eq',
'value': job_4_succ['type']},
{'key': 'status', 'comparator': 'eq',
'value': job_4_succ['status']},
{'key': 'resource_id', 'comparator': 'eq',
'value': job_4_succ['resource_id']},
{'key': 'extra_id', 'comparator': 'eq',
'value': job_4_succ['extra_id']}]
self.assertEqual(0, len(db_api.list_jobs(self.context,
filters_job_4)))
self.assertEqual(count,
len(db_api.list_jobs_from_log(self.context)))
count = count + 1
# successful case, delete a new job
job_5 = db_api.new_job(self.context,
job['project_id'], job_type,
resource_id)
self.controller.delete(job_5['id'])
filters_job_5 = [
{'key': 'type', 'comparator': 'eq', 'value': job_5['type']},
{'key': 'status', 'comparator': 'eq',
'value': job_5['status']},
{'key': 'resource_id', 'comparator': 'eq',
'value': job_5['resource_id']},
{'key': 'extra_id', 'comparator': 'eq',
'value': job_5['extra_id']}]
self.assertEqual(0, len(db_api.list_jobs(self.context,
filters_job_5)))
# successful case, delete a failed job
job_6 = db_api.new_job(self.context,
job['project_id'], job_type,
resource_id)
db_api.finish_job(self.context, job_6['id'], False,
timeutils.utcnow())
job_6_failed = db_api.get_job(self.context, job_6['id'])
self.controller.delete(job_6['id'])
filters_job_6 = [
{'key': 'type', 'comparator': 'eq',
'value': job_6_failed['type']},
{'key': 'status', 'comparator': 'eq',
'value': job_6_failed['status']},
{'key': 'resource_id', 'comparator': 'eq',
'value': job_6_failed['resource_id']},
{'key': 'extra_id', 'comparator': 'eq',
'value': job_6_failed['extra_id']}]
self.assertEqual(0, len(db_api.list_jobs(self.context,
filters_job_6)))
@patch.object(pecan, 'response', new=FakeResponse)
@patch.object(pecan, 'response', new=mock.Mock)
@patch.object(context, 'extract_context_from_environ')
def test_put(self, mock_context):
mock_context.return_value = self.context
# cover all job types
for job_type in self.job_resource_map.keys():
job = self._prepare_job_element(job_type)
resource_id = '#'.join([job['resource'][resource_id]
for resource_type, resource_id
in self.job_resource_map[job_type]])
# failure case, only admin can redo the job
job_1 = db_api.new_job(self.context,
job['project_id'],
job_type, resource_id)
self.context.is_admin = False
res = self.controller.put(job_1['id'])
self._validate_error_code(res, 403)
self.context.is_admin = True
db_api.delete_job(self.context, job_1['id'])
# failure case, job not found
res = self.controller.put(-123)
self._validate_error_code(res, 404)
# failure case, redo a running job
job_2 = db_api.register_job(self.context,
job['project_id'],
job_type, resource_id)
res = self.controller.put(job_2['id'])
self._validate_error_code(res, 400)
db_api.finish_job(self.context, job_2['id'], False,
timeutils.utcnow())
db_api.delete_job(self.context, job_2['id'])
# failure case, redo a successful job
job_3 = self._prepare_job_element(job_type)
resource_id_3 = '#'.join([job_3['resource'][resource_id_3]
for resource_type_3, resource_id_3
in self.job_resource_map[job_type]])
job_4 = db_api.new_job(self.context,
job_3['project_id'],
job_type, resource_id_3)
with self.context.session.begin():
job_dict = {'status': constants.JS_Success,
'timestamp': timeutils.utcnow(),
'extra_id': uuidutils.generate_uuid()}
core.update_resource(self.context, models.AsyncJob,
job_4['id'], job_dict)
res = self.controller.put(job_4['id'])
self._validate_error_code(res, 400)
db_api.finish_job(self.context, job_4['id'], True,
timeutils.utcnow())
# successful case, redo a failed job
job_5 = db_api.new_job(self.context,
job['project_id'],
job_type, resource_id)
db_api.finish_job(self.context, job_5['id'], False,
timeutils.utcnow())
self.controller.put(job_5['id'])
db_api.delete_job(self.context, job_5['id'])
# successful case, redo a new job
job_6 = db_api.new_job(self.context,
job['project_id'],
job_type, resource_id)
self.controller.put(job_6['id'])
db_api.delete_job(self.context, job_6['id'])
def _prepare_job_element(self, job_type):
# in order to create a job, we need three elements: job type,
# job resource and project id.
job = {}
job['resource'] = {}
job['type'] = job_type
for resource_type, resource_id in self.job_resource_map[job_type]:
job['resource'][resource_id] = uuidutils.generate_uuid()
job['project_id'] = self._prepare_project_id_for_job(job)
return job
def _prepare_project_id_for_job(self, job):
# prepare the project id for job creation, currently job parameter
# contains job type and job resource information.
job_type = job['type']
if job_type == constants.JT_SEG_RULE_SETUP:
project_id = job['resource']['project_id']
else:
project_id = uuidutils.generate_uuid()
pod_id = uuidutils.generate_uuid()
resource_type, resource_id = (
constants.job_primary_resource_map[job_type])
routing = db_api.create_resource_mapping(
self.context, job['resource'][resource_id],
job['resource'][resource_id], pod_id, project_id,
resource_type)
self.assertIsNotNone(routing)
return project_id
def _validate_error_code(self, res, code):
self.assertEqual(res[list(res.keys())[0]]['code'], code)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())

View File

@ -1034,7 +1034,7 @@ class FakeBaseXManager(xmanager.XManager):
def __init__(self, fake_plugin):
self.clients = {constants.TOP: client.Client()}
self.job_handles = {
constants.JT_ROUTER: self.configure_extra_routes,
constants.JT_CONFIGURE_ROUTE: self.configure_route,
constants.JT_ROUTER_SETUP: self.setup_bottom_router,
constants.JT_PORT_DELETE: self.delete_server_port}
self.helper = FakeHelper(fake_plugin)
@ -1070,15 +1070,15 @@ class FakeBaseRPCAPI(object):
def __init__(self, fake_plugin):
self.xmanager = FakeBaseXManager(fake_plugin)
def configure_extra_routes(self, ctxt, router_id):
def configure_route(self, ctxt, project_id, router_id):
pass
def update_network(self, ctxt, network_id, pod_id):
def update_network(self, ctxt, project_id, network_id, pod_id):
combine_id = '%s#%s' % (pod_id, network_id)
self.xmanager.update_network(
ctxt, payload={constants.JT_NETWORK_UPDATE: combine_id})
def update_subnet(self, ctxt, subnet_id, pod_id):
def update_subnet(self, ctxt, project_id, subnet_id, pod_id):
combine_id = '%s#%s' % (pod_id, subnet_id)
self.xmanager.update_subnet(
ctxt, payload={constants.JT_SUBNET_UPDATE: combine_id})
@ -1086,7 +1086,7 @@ class FakeBaseRPCAPI(object):
def configure_security_group_rules(self, ctxt, project_id):
pass
def setup_shadow_ports(self, ctxt, pod_id, net_id):
def setup_shadow_ports(self, ctxt, project_id, pod_id, net_id):
pass
@ -1094,18 +1094,18 @@ class FakeRPCAPI(FakeBaseRPCAPI):
def __init__(self, fake_plugin):
self.xmanager = FakeXManager(fake_plugin)
def setup_bottom_router(self, ctxt, net_id, router_id, pod_id):
def setup_bottom_router(self, ctxt, project_id, net_id, router_id, pod_id):
combine_id = '%s#%s#%s' % (pod_id, router_id, net_id)
self.xmanager.setup_bottom_router(
ctxt, payload={constants.JT_ROUTER_SETUP: combine_id})
def delete_server_port(self, ctxt, port_id, pod_id):
def delete_server_port(self, ctxt, project_id, port_id, pod_id):
pass
def configure_security_group_rules(self, ctxt, project_id):
pass
def setup_shadow_ports(self, ctxt, pod_id, net_id):
def setup_shadow_ports(self, ctxt, project_id, pod_id, net_id):
combine_id = '%s#%s' % (pod_id, net_id)
self.xmanager.setup_shadow_ports(
ctxt, payload={constants.JT_SHADOW_PORT_SETUP: combine_id})
@ -1381,7 +1381,8 @@ class PluginTest(unittest.TestCase,
port_id='top_id_0', ip_address='10.0.0.1',
subnet_id='top_subnet_id',
network_id='top_net_id')]},
{'id': 'top_id_1', 'name': 'top'},
{'id': 'top_id_1', 'name': 'top',
'tenant_id': 'project_id'},
{'id': 'top_id_2', 'name': 'top'},
{'id': 'top_id_3', 'name': 'top'}])
BOTTOM1_PORTS.append({'id': 'bottom_id_1', 'name': 'bottom'})
@ -1518,8 +1519,8 @@ class PluginTest(unittest.TestCase,
plugin_calls = [mock.call(neutron_context, t_port_id1),
mock.call(neutron_context, t_port_id2)]
client_calls = [
mock.call(tricircle_context, t_port_id1, 'pod_id_1'),
mock.call(tricircle_context, t_port_id2, 'pod_id_1')]
mock.call(tricircle_context, project_id, t_port_id1, 'pod_id_1'),
mock.call(tricircle_context, project_id, t_port_id2, 'pod_id_1')]
mock_plugin_method.assert_has_calls(plugin_calls)
mock_client_method.assert_has_calls(client_calls)
@ -1880,7 +1881,7 @@ class PluginTest(unittest.TestCase,
'tenant_id': tenant_id,
'mac_address': 'fa:16:3e:cd:76:40',
'binding:vif_type': vif_type,
'project_id': 'tenant_id',
'project_id': 'project_id',
'binding:host_id': 'zhiyuan-5',
'status': 'ACTIVE'
}
@ -2488,7 +2489,7 @@ class PluginTest(unittest.TestCase,
@patch.object(db_base_plugin_common.DbBasePluginCommon,
'_make_subnet_dict', new=fake_make_subnet_dict)
@patch.object(FakeClient, 'add_gateway_routers')
@patch.object(FakeBaseRPCAPI, 'configure_extra_routes')
@patch.object(FakeBaseRPCAPI, 'configure_route')
@patch.object(context, 'get_context_from_neutron_context')
def test_add_interface(self, mock_context, mock_rpc, mock_action):
self._basic_pod_route_setup()
@ -2509,7 +2510,7 @@ class PluginTest(unittest.TestCase,
_, b_router_id = db_api.get_bottom_mappings_by_top_id(
t_ctx, t_router_id, constants.RT_ROUTER)[0]
mock_rpc.assert_called_once_with(t_ctx, t_router_id)
mock_rpc.assert_called_once_with(t_ctx, tenant_id, t_router_id)
for b_net in BOTTOM1_NETS:
if 'provider:segmentation_id' in b_net:
self.assertIn(b_net['provider:segmentation_id'], (2000, 2001))
@ -2715,7 +2716,7 @@ class PluginTest(unittest.TestCase,
'_make_subnet_dict', new=fake_make_subnet_dict)
@patch.object(FakePlugin, '_get_bridge_network_subnet')
@patch.object(FakeClient, 'add_gateway_routers')
@patch.object(FakeBaseRPCAPI, 'configure_extra_routes')
@patch.object(FakeBaseRPCAPI, 'configure_route')
@patch.object(context, 'get_context_from_neutron_context')
def test_add_interface_for_local_router(
self, mock_context, mock_rpc, mock_action, mock_get_bridge_net):
@ -2861,7 +2862,7 @@ class PluginTest(unittest.TestCase,
'_allocate_ips_for_port', new=fake_allocate_ips_for_port)
@patch.object(db_base_plugin_common.DbBasePluginCommon,
'_make_subnet_dict', new=fake_make_subnet_dict)
@patch.object(FakeBaseRPCAPI, 'configure_extra_routes')
@patch.object(FakeBaseRPCAPI, 'configure_route')
@patch.object(FakeClient, 'remove_interface_routers')
@patch.object(context, 'get_context_from_neutron_context')
def test_remove_interface(self, mock_context, mock_remove, mock_rpc):
@ -2891,7 +2892,7 @@ class PluginTest(unittest.TestCase,
mock_remove.assert_called_with(
t_ctx, b_router_id, {'port_id': b_interface_id})
mock_rpc.assert_called_with(t_ctx, t_router_id)
mock_rpc.assert_called_with(t_ctx, tenant_id, t_router_id)
def _prepare_interface_port(self, t_ctx, t_subnet_id, ip_suffix):
t_client = FakeClient()
@ -2923,7 +2924,7 @@ class PluginTest(unittest.TestCase,
@patch.object(l3_db.L3_NAT_dbonly_mixin, '_make_router_dict',
new=fake_make_router_dict)
@patch.object(FakeClient, 'add_gateway_routers')
@patch.object(FakeBaseRPCAPI, 'configure_extra_routes')
@patch.object(FakeBaseRPCAPI, 'configure_route')
@patch.object(context, 'get_context_from_neutron_context')
def test_east_west_gw_router(self, mock_context, mock_rpc, mock_action):
self._basic_pod_route_setup()
@ -3844,7 +3845,8 @@ class PluginTest(unittest.TestCase,
t_ctx, b_sd_port1['id'], {'port': {
'binding:profile': {constants.PROFILE_FORCE_UP: 'True'}}})
# asynchronous job in pod_1 is registered
mock_setup.assert_called_once_with(t_ctx, 'pod_id_1', t_net_id)
mock_setup.assert_called_once_with(t_ctx, TEST_TENANT_ID,
'pod_id_1', t_net_id)
@patch.object(directory, 'get_plugin', new=fake_get_plugin)
@patch.object(driver.Pool, 'get_instance', new=fake_get_instance)
@ -3852,7 +3854,7 @@ class PluginTest(unittest.TestCase,
'_allocate_ips_for_port', new=fake_allocate_ips_for_port)
@patch.object(db_base_plugin_common.DbBasePluginCommon,
'_make_subnet_dict', new=fake_make_subnet_dict)
@patch.object(FakeBaseRPCAPI, 'configure_extra_routes', new=mock.Mock)
@patch.object(FakeBaseRPCAPI, 'configure_route', new=mock.Mock)
@patch.object(FakeBaseRPCAPI, 'setup_shadow_ports')
@patch.object(context, 'get_context_from_neutron_context')
def test_add_interface_trigger_l2pop(self, mock_context, mock_setup):
@ -3919,8 +3921,10 @@ class PluginTest(unittest.TestCase,
self.assertIn(constants.PROFILE_FORCE_UP,
shadow_ports[0]['binding:profile'])
# asynchronous jobs are registered
calls = [mock.call(t_ctx, 'pod_id_2', shadow_ports[0]['network_id']),
mock.call(t_ctx, 'pod_id_1', shadow_ports[0]['network_id'])]
calls = [mock.call(t_ctx, tenant_id, 'pod_id_2',
shadow_ports[0]['network_id']),
mock.call(t_ctx, tenant_id, 'pod_id_1',
shadow_ports[0]['network_id'])]
mock_setup.assert_has_calls(calls)
def tearDown(self):

View File

@ -367,12 +367,15 @@ class XManagerTest(unittest.TestCase):
@patch.object(FakeClient, 'update_routers')
def test_configure_extra_routes_with_floating_ips(self, mock_update):
top_router_id = 'router_id'
project_id = uuidutils.generate_uuid()
bridge_infos = self._prepare_east_west_network_test(top_router_id)
ns_bridge_ip, ns_router_id = self._prepare_snat_test(top_router_id)
self._prepare_dnat_test()
db_api.new_job(self.context, constants.JT_ROUTER, top_router_id)
self.xmanager.configure_extra_routes(
self.context, payload={constants.JT_ROUTER: top_router_id})
db_api.new_job(self.context, project_id, constants.JT_CONFIGURE_ROUTE,
top_router_id)
self.xmanager.configure_route(
self.context,
payload={constants.JT_CONFIGURE_ROUTE: top_router_id})
calls = []
ns_routes = []
for i in range(2):
@ -394,11 +397,14 @@ class XManagerTest(unittest.TestCase):
@patch.object(FakeClient, 'update_routers')
def test_configure_extra_routes_with_external_network(self, mock_update):
top_router_id = 'router_id'
project_id = uuidutils.generate_uuid()
bridge_infos = self._prepare_east_west_network_test(top_router_id)
ns_bridge_ip, ns_router_id = self._prepare_snat_test(top_router_id)
db_api.new_job(self.context, constants.JT_ROUTER, top_router_id)
self.xmanager.configure_extra_routes(
self.context, payload={constants.JT_ROUTER: top_router_id})
db_api.new_job(self.context, project_id, constants.JT_CONFIGURE_ROUTE,
top_router_id)
self.xmanager.configure_route(
self.context,
payload={constants.JT_CONFIGURE_ROUTE: top_router_id})
calls = []
ns_routes = []
for i in range(2):
@ -418,12 +424,15 @@ class XManagerTest(unittest.TestCase):
self._check_extra_routes_calls(calls, mock_update.call_args_list)
@patch.object(FakeClient, 'update_routers')
def test_configure_extra_routes(self, mock_update):
def test_configure_route(self, mock_update):
top_router_id = 'router_id'
project_id = uuidutils.generate_uuid()
bridge_infos = self._prepare_east_west_network_test(top_router_id)
db_api.new_job(self.context, constants.JT_ROUTER, top_router_id)
self.xmanager.configure_extra_routes(
self.context, payload={constants.JT_ROUTER: top_router_id})
db_api.new_job(self.context, project_id, constants.JT_CONFIGURE_ROUTE,
top_router_id)
self.xmanager.configure_route(
self.context,
payload={constants.JT_CONFIGURE_ROUTE: top_router_id})
calls = []
for i in range(2):
routes = []
@ -538,9 +547,12 @@ class XManagerTest(unittest.TestCase):
# net3 is attached to R3
target_router_id = 'top_router_3_id'
db_api.new_job(self.context, constants.JT_ROUTER, target_router_id)
self.xmanager.configure_extra_routes(
self.context, payload={constants.JT_ROUTER: target_router_id})
project_id = uuidutils.generate_uuid()
db_api.new_job(self.context, project_id,
constants.JT_CONFIGURE_ROUTE, target_router_id)
self.xmanager.configure_route(
self.context,
payload={constants.JT_CONFIGURE_ROUTE: target_router_id})
# for the following paths, packets will go to R3 via the interface
# which is attached to R3
@ -653,7 +665,8 @@ class XManagerTest(unittest.TestCase):
RES_MAP['top']['subnet'].append(subnet_ipv6)
RES_MAP['pod_1']['security_group'].append(sg)
db_api.new_job(self.context, constants.JT_SEG_RULE_SETUP, project_id)
db_api.new_job(self.context, project_id, constants.JT_SEG_RULE_SETUP,
project_id)
self.xmanager.configure_security_group_rules(
self.context, payload={constants.JT_SEG_RULE_SETUP: project_id})
@ -727,8 +740,8 @@ class XManagerTest(unittest.TestCase):
'192.168.1.102')
resource_id = 'pod_id_1#' + net1_id
db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP,
resource_id)
db_api.new_job(self.context, project_id,
constants.JT_SHADOW_PORT_SETUP, resource_id)
self.xmanager.setup_shadow_ports(
self.context,
payload={constants.JT_SHADOW_PORT_SETUP: resource_id})
@ -745,7 +758,8 @@ class XManagerTest(unittest.TestCase):
sd_ports[0]['binding:profile'])
# check job to setup shadow ports for pod2 is registered
mock_setup.assert_called_once_with(self.context, 'pod_id_2', net1_id)
mock_setup.assert_called_once_with(self.context, project_id,
'pod_id_2', net1_id)
# update shadow port to down and test again, this is possible when we
# succeed to create shadow port but fail to update it to active
@ -755,8 +769,8 @@ class XManagerTest(unittest.TestCase):
{'port': {'status': q_constants.PORT_STATUS_DOWN,
'binding:profile': profile}})
db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP,
resource_id)
db_api.new_job(self.context, project_id,
constants.JT_SHADOW_PORT_SETUP, resource_id)
self.xmanager.setup_shadow_ports(
self.context,
payload={constants.JT_SHADOW_PORT_SETUP: resource_id})
@ -767,8 +781,8 @@ class XManagerTest(unittest.TestCase):
# manually trigger shadow ports setup in pod2
resource_id = 'pod_id_2#' + net1_id
db_api.new_job(self.context, constants.JT_SHADOW_PORT_SETUP,
resource_id)
db_api.new_job(self.context, project_id,
constants.JT_SHADOW_PORT_SETUP, resource_id)
self.xmanager.setup_shadow_ports(
self.context,
payload={constants.JT_SHADOW_PORT_SETUP: resource_id})
@ -789,8 +803,9 @@ class XManagerTest(unittest.TestCase):
pass
fake_id = 'fake_id'
fake_project_id = uuidutils.generate_uuid()
payload = {job_type: fake_id}
db_api.new_job(self.context, job_type, fake_id)
db_api.new_job(self.context, fake_project_id, job_type, fake_id)
fake_handle(None, self.context, payload=payload)
logs = core.query_resource(self.context, models.AsyncJobLog, [], [])
@ -806,8 +821,9 @@ class XManagerTest(unittest.TestCase):
raise Exception()
fake_id = 'fake_id'
fake_project_id = uuidutils.generate_uuid()
payload = {job_type: fake_id}
db_api.new_job(self.context, job_type, fake_id)
db_api.new_job(self.context, fake_project_id, job_type, fake_id)
fake_handle(None, self.context, payload=payload)
jobs = core.query_resource(self.context, models.AsyncJob, [], [])
@ -828,8 +844,9 @@ class XManagerTest(unittest.TestCase):
pass
fake_id = uuidutils.generate_uuid()
fake_project_id = uuidutils.generate_uuid()
payload = {job_type: fake_id}
db_api.new_job(self.context, job_type, fake_id)
db_api.new_job(self.context, fake_project_id, job_type, fake_id)
expired_job = {
'id': uuidutils.generate_uuid(),
'type': job_type,
@ -860,8 +877,9 @@ class XManagerTest(unittest.TestCase):
mock_get.return_value = None
fake_id = uuidutils.generate_uuid()
fake_project_id = uuidutils.generate_uuid()
payload = {job_type: fake_id}
db_api.new_job(self.context, job_type, fake_id)
db_api.new_job(self.context, fake_project_id, job_type, fake_id)
fake_handle(None, self.context, payload=payload)
# nothing to assert, what we test is that fake_handle can exit when
@ -872,28 +890,28 @@ class XManagerTest(unittest.TestCase):
mock_now.return_value = datetime.datetime(2000, 1, 2, 12, 0, 0)
job_dict_list = [
{'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0),
'resource_id': 'uuid1', 'type': 'res1',
'resource_id': 'uuid1', 'type': 'res1', 'project_id': "uuid1",
'status': constants.JS_Fail}, # job_uuid1
{'timestamp': datetime.datetime(2000, 1, 1, 12, 5, 0),
'resource_id': 'uuid1', 'type': 'res1',
'resource_id': 'uuid1', 'type': 'res1', 'project_id': "uuid1",
'status': constants.JS_Fail}, # job_uuid3
{'timestamp': datetime.datetime(2000, 1, 1, 12, 20, 0),
'resource_id': 'uuid2', 'type': 'res2',
'resource_id': 'uuid2', 'type': 'res2', 'project_id': "uuid1",
'status': constants.JS_Fail}, # job_uuid5
{'timestamp': datetime.datetime(2000, 1, 1, 12, 15, 0),
'resource_id': 'uuid2', 'type': 'res2',
'resource_id': 'uuid2', 'type': 'res2', 'project_id': "uuid1",
'status': constants.JS_Fail}, # job_uuid7
{'timestamp': datetime.datetime(2000, 1, 1, 12, 25, 0),
'resource_id': 'uuid3', 'type': 'res3',
'resource_id': 'uuid3', 'type': 'res3', 'project_id': "uuid1",
'status': constants.JS_Success}, # job_uuid9
{'timestamp': datetime.datetime(2000, 1, 1, 12, 30, 0),
'resource_id': 'uuid4', 'type': 'res4',
'resource_id': 'uuid4', 'type': 'res4', 'project_id': "uuid1",
'status': constants.JS_New}, # job_uuid11
{'timestamp': datetime.datetime(1999, 12, 31, 12, 0, 0),
'resource_id': 'uuid5', 'type': 'res5',
'resource_id': 'uuid5', 'type': 'res5', 'project_id': "uuid1",
'status': constants.JS_Fail}, # job_uuid13
{'timestamp': datetime.datetime(1999, 12, 31, 11, 59, 59),
'resource_id': 'uuid6', 'type': 'res6',
'resource_id': 'uuid6', 'type': 'res6', 'project_id': "uuid1",
'status': constants.JS_Fail}] # job_uuid15
for i, job_dict in enumerate(job_dict_list, 1):
job_dict['id'] = 'job_uuid%d' % (2 * i - 1)
@ -907,10 +925,11 @@ class XManagerTest(unittest.TestCase):
# for res3 + uuid3, the latest job's status is "Success", not returned
# for res6 + uuid6, the latest job is out of the redo time span
expected_failed_jobs = [
{'resource_id': 'uuid1', 'type': 'res1'},
{'resource_id': 'uuid2', 'type': 'res2'},
{'resource_id': 'uuid5', 'type': 'res5'}]
expected_new_jobs = [{'resource_id': 'uuid4', 'type': 'res4'}]
{'resource_id': 'uuid1', 'type': 'res1', 'project_id': "uuid1"},
{'resource_id': 'uuid2', 'type': 'res2', 'project_id': "uuid1"},
{'resource_id': 'uuid5', 'type': 'res5', 'project_id': "uuid1"}]
expected_new_jobs = [{'resource_id': 'uuid4', 'type': 'res4',
'project_id': "uuid1"}]
(failed_jobs,
new_jobs) = db_api.get_latest_failed_or_new_jobs(self.context)
six.assertCountEqual(self, expected_failed_jobs, failed_jobs)

View File

@ -67,15 +67,16 @@ def _job_handle(job_type):
if delta.seconds >= CONF.worker_handle_timeout:
# quit when this handle is running for a long time
break
time_new = db_api.get_latest_timestamp(ctx, constants.JS_New,
job_type, resource_id)
if not time_new:
job_new = db_api.get_latest_job(
ctx, constants.JS_New, job_type, resource_id)
if not job_new:
break
time_success = db_api.get_latest_timestamp(
job_succ = db_api.get_latest_job(
ctx, constants.JS_Success, job_type, resource_id)
if time_success and time_success >= time_new:
if job_succ and job_succ['timestamp'] >= job_new['timestamp']:
break
job = db_api.register_job(ctx, job_type, resource_id)
job = db_api.register_job(ctx, job_new['project_id'], job_type,
resource_id)
if not job:
# fail to obtain the lock, let other worker handle the job
running_job = db_api.get_running_job(ctx, job_type,
@ -94,7 +95,7 @@ def _job_handle(job_type):
# previous running job expires, we set its status to
# fail and try again to obtain the lock
db_api.finish_job(ctx, running_job['id'], False,
time_new)
job_new['timestamp'])
LOG.warning('Job %(job)s of type %(job_type)s for '
'resource %(resource)s expires, set '
'its state to Fail',
@ -111,14 +112,15 @@ def _job_handle(job_type):
try:
func(*args, **kwargs)
except Exception:
db_api.finish_job(ctx, job['id'], False, time_new)
db_api.finish_job(ctx, job['id'], False,
job_new['timestamp'])
LOG.error('Job %(job)s of type %(job_type)s for '
'resource %(resource)s fails',
{'job': job['id'],
'job_type': job_type,
'resource': resource_id})
break
db_api.finish_job(ctx, job['id'], True, time_new)
db_api.finish_job(ctx, job['id'], True, job_new['timestamp'])
eventlet.sleep(CONF.worker_sleep_time)
return handle_args
return handle_func
@ -145,7 +147,7 @@ class XManager(PeriodicTasks):
self.additional_endpoints = []
self.clients = {constants.TOP: client.Client()}
self.job_handles = {
constants.JT_ROUTER: self.configure_extra_routes,
constants.JT_CONFIGURE_ROUTE: self.configure_route,
constants.JT_ROUTER_SETUP: self.setup_bottom_router,
constants.JT_PORT_DELETE: self.delete_server_port,
constants.JT_SEG_RULE_SETUP: self.configure_security_group_rules,
@ -251,13 +253,14 @@ class XManager(PeriodicTasks):
job_index = random.randint(0, len(jobs) - 1)
job_type = jobs[job_index]['type']
resource_id = jobs[job_index]['resource_id']
project_id = jobs[job_index]['project_id']
payload = {job_type: resource_id}
LOG.debug('Redo %(status)s job for %(resource_id)s of type '
'%(job_type)s',
{'status': 'new' if is_new_job else 'failed',
'resource_id': resource_id, 'job_type': job_type})
if not is_new_job:
db_api.new_job(ctx, job_type, resource_id)
db_api.new_job(ctx, project_id, job_type, resource_id)
self.job_handles[job_type](ctx, payload=payload)
@staticmethod
@ -512,6 +515,15 @@ class XManager(PeriodicTasks):
(b_pod_id,
t_router_id, t_net_id) = payload[constants.JT_ROUTER_SETUP].split('#')
t_client = self._get_client()
t_pod = db_api.get_top_pod(ctx)
t_router = t_client.get_routers(ctx, t_router_id)
if not t_router:
# we just end this job if top router no longer exists
return
project_id = t_router['tenant_id']
if b_pod_id == constants.POD_NOT_SPECIFIED:
mappings = db_api.get_bottom_mappings_by_top_id(
ctx, t_net_id, constants.RT_NETWORK)
@ -520,15 +532,9 @@ class XManager(PeriodicTasks):
# NOTE(zhiyuan) we create one job for each pod to avoid
# conflict caused by different workers operating the same pod
self.xjob_handler.setup_bottom_router(
ctx, t_net_id, t_router_id, b_pod['pod_id'])
ctx, project_id, t_net_id, t_router_id, b_pod['pod_id'])
return
t_client = self._get_client()
t_pod = db_api.get_top_pod(ctx)
t_router = t_client.get_routers(ctx, t_router_id)
if not t_router:
# we just end this job if top router no longer exists
return
t_net = t_client.get_networks(ctx, t_net_id)
if not t_net:
# we just end this job if top network no longer exists
@ -566,11 +572,12 @@ class XManager(PeriodicTasks):
ctx, t_pod, b_pod, t_client, t_net, t_router, t_bridge_net,
t_bridge_subnet, is_ext_net_pod)
if not is_local_router:
self.xjob_handler.configure_extra_routes(ctx, t_router_id)
self.xjob_handler.configure_route(ctx, project_id,
t_router_id)
@_job_handle(constants.JT_ROUTER)
def configure_extra_routes(self, ctx, payload):
t_router_id = payload[constants.JT_ROUTER]
@_job_handle(constants.JT_CONFIGURE_ROUTE)
def configure_route(self, ctx, payload):
t_router_id = payload[constants.JT_CONFIGURE_ROUTE]
t_client = self._get_client()
t_router = t_client.get_routers(ctx, t_router_id)
if not t_router:
@ -867,19 +874,22 @@ class XManager(PeriodicTasks):
"""
(b_pod_id, t_network_id) = payload[
constants.JT_NETWORK_UPDATE].split('#')
if b_pod_id == constants.POD_NOT_SPECIFIED:
mappings = db_api.get_bottom_mappings_by_top_id(
ctx, t_network_id, constants.RT_NETWORK)
b_pods = [mapping[0] for mapping in mappings]
for b_pod in b_pods:
self.xjob_handler.update_network(ctx, t_network_id,
b_pod['pod_id'])
return
t_client = self._get_client()
t_network = t_client.get_networks(ctx, t_network_id)
if not t_network:
return
project_id = t_network['tenant_id']
if b_pod_id == constants.POD_NOT_SPECIFIED:
mappings = db_api.get_bottom_mappings_by_top_id(
ctx, t_network_id, constants.RT_NETWORK)
b_pods = [mapping[0] for mapping in mappings]
for b_pod in b_pods:
self.xjob_handler.update_network(ctx, project_id,
t_network_id, b_pod['pod_id'])
return
b_pod = db_api.get_pod(ctx, b_pod_id)
b_region_name = b_pod['region_name']
b_client = self._get_client(region_name=b_region_name)
@ -918,19 +928,22 @@ class XManager(PeriodicTasks):
"""
(b_pod_id, t_subnet_id) = payload[
constants.JT_SUBNET_UPDATE].split('#')
if b_pod_id == constants.POD_NOT_SPECIFIED:
mappings = db_api.get_bottom_mappings_by_top_id(
ctx, t_subnet_id, constants.RT_SUBNET)
b_pods = [mapping[0] for mapping in mappings]
for b_pod in b_pods:
self.xjob_handler.update_subnet(ctx, t_subnet_id,
b_pod['pod_id'])
return
t_client = self._get_client()
t_subnet = t_client.get_subnets(ctx, t_subnet_id)
if not t_subnet:
return
project_id = t_subnet['tenant_id']
if b_pod_id == constants.POD_NOT_SPECIFIED:
mappings = db_api.get_bottom_mappings_by_top_id(
ctx, t_subnet_id, constants.RT_SUBNET)
b_pods = [mapping[0] for mapping in mappings]
for b_pod in b_pods:
self.xjob_handler.update_subnet(ctx, project_id,
t_subnet_id, b_pod['pod_id'])
return
b_pod = db_api.get_pod(ctx, b_pod_id)
b_region_name = b_pod['region_name']
b_subnet_id = db_api.get_bottom_id_by_top_id_region_name(
@ -1097,4 +1110,5 @@ class XManager(PeriodicTasks):
ctx, sw_port_id, update_body)
for pod_id in sync_pod_list:
self.xjob_handler.setup_shadow_ports(ctx, pod_id, t_net_id)
self.xjob_handler.setup_shadow_ports(ctx, project_id,
pod_id, t_net_id)