Merge "Asynchronous job management(part 2)"

Jenkins 2016-04-12 06:57:11 +00:00 committed by Gerrit Code Review
commit 6015586f8f
5 changed files with 87 additions and 5 deletions

View File

@@ -66,3 +66,7 @@ JS_Success = 'Success'
 JS_Fail = 'Fail'

 SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'
+TOP = 'top'
+
+# job type
+JT_ROUTER = 'router'
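
Note: the new JT_ROUTER constant replaces the bare 'router' string that the RPC client and the job manager used before this change. As the later hunks show, the same job-type string keys both XManager.job_handles and the RPC payload. A minimal sketch of that convention, with stand-in names rather than tricircle code:

    # Sketch only: one job-type constant keys both the handler registry
    # and the payload carried by the RPC message.
    JT_ROUTER = 'router'

    def configure_extra_routes(ctx, payload):
        router_id = payload[JT_ROUTER]  # payload is keyed by the job type
        return 'configuring extra routes for %s' % router_id

    job_handles = {JT_ROUTER: configure_extra_routes}
    print(job_handles[JT_ROUTER](None, payload={JT_ROUTER: 'fake-uuid'}))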

View File

@@ -24,6 +24,9 @@ import rpc
 from serializer import TricircleSerializer as Serializer
 import topics
+from tricircle.common import constants

 CONF = cfg.CONF

 rpcapi_cap_opt = cfg.StrOpt('xjobapi',
@@ -78,4 +81,5 @@ class XJobAPI(object):
         # specifying its control exchange, so the default value "openstack" is
         # used, thus we need to pass exchange as "openstack" here.
         self.client.prepare(exchange='openstack').cast(
-            ctxt, 'configure_extra_routes', payload={'router': router_id})
+            ctxt, 'configure_extra_routes',
+            payload={constants.JT_ROUTER: router_id})
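
The cast above is fire-and-forget: oslo.messaging returns as soon as the message is queued, and the xjob daemon consumes it later. A self-contained sketch of the same pattern (the transport URL, topic, and IDs here are placeholders, not values from this commit):

    from oslo_config import cfg
    import oslo_messaging as messaging

    transport = messaging.get_transport(cfg.CONF)  # e.g. rabbit://...
    target = messaging.Target(exchange='openstack', topic='xjob')
    client = messaging.RPCClient(transport, target)

    # cast() does not wait for a result; the worker handles the job
    # asynchronously, which is the point of the xjob service.
    client.prepare(exchange='openstack').cast(
        {}, 'configure_extra_routes', payload={'router': 'fake-router-uuid'})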

View File

@@ -14,6 +14,7 @@
 # under the License.

 import functools
+import sqlalchemy as sql
 import time
 import uuid
@@ -236,6 +237,24 @@ def register_job(context, _type, resource_id):
         context.session.close()


+def get_latest_failed_jobs(context):
+    jobs = []
+    query = context.session.query(models.Job.type, models.Job.resource_id,
+                                  sql.func.count(models.Job.id))
+    query = query.group_by(models.Job.type, models.Job.resource_id)
+    for job_type, resource_id, count in query:
+        _query = context.session.query(models.Job)
+        _query = _query.filter_by(type=job_type, resource_id=resource_id)
+        _query = _query.order_by(sql.desc('timestamp'))
+        # when timestamps of job entries are the same, sort entries by status
+        # so "Fail" job is placed before "New" and "Success" jobs
+        _query = _query.order_by(sql.asc('status'))
+        latest_job = _query[0].to_dict()
+        if latest_job['status'] == constants.JS_Fail:
+            jobs.append(latest_job)
+    return jobs
+
+
 def get_latest_timestamp(context, status, _type, resource_id):
     jobs = core.query_resource(
         context, models.Job,
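
A note on the query logic: the two chained order_by calls compile to ORDER BY timestamp DESC, status ASC, and the status tie-break works because the state strings happen to sort alphabetically as 'Fail' < 'New' < 'Success'. A pure-Python sketch of the same per-resource selection (illustrative only; tuples stand in for ORM rows):

    from itertools import groupby

    def latest_failed_jobs(jobs):
        # jobs: iterable of (type, resource_id, timestamp, status) tuples
        result = []
        key = lambda j: (j[0], j[1])
        for _, group in groupby(sorted(jobs, key=key), key=key):
            # newest first; on a timestamp tie 'Fail' beats 'New'/'Success'
            latest = min(group, key=lambda j: (-j[2], j[3]))
            if latest[3] == 'Fail':
                result.append(latest)
        return result

    jobs = [('router', 'uuid1', 1, 'Fail'), ('router', 'uuid1', 1, 'New'),
            ('router', 'uuid2', 2, 'Success'), ('router', 'uuid2', 1, 'Fail')]
    print(latest_failed_jobs(jobs))  # -> only the uuid1 'Fail' job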

View File

@@ -267,6 +267,41 @@ class XManagerTest(unittest.TestCase):
         # nothing to assert, what we test is that fake_handle can exit when
         # timeout

+    def test_get_failed_jobs(self):
+        job_dict_list = [
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0),
+             'resource_id': 'uuid1', 'type': 'res1',
+             'status': constants.JS_Fail},  # job_uuid1
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 5, 0),
+             'resource_id': 'uuid1', 'type': 'res1',
+             'status': constants.JS_Fail},  # job_uuid3
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 20, 0),
+             'resource_id': 'uuid2', 'type': 'res2',
+             'status': constants.JS_Fail},  # job_uuid5
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 15, 0),
+             'resource_id': 'uuid2', 'type': 'res2',
+             'status': constants.JS_Fail},  # job_uuid7
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 25, 0),
+             'resource_id': 'uuid3', 'type': 'res3',
+             'status': constants.JS_Fail},  # job_uuid9
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 30, 0),
+             'resource_id': 'uuid3', 'type': 'res3',
+             'status': constants.JS_Success}]
+        for i, job_dict in enumerate(job_dict_list, 1):
+            job_dict['id'] = 'job_uuid%d' % (2 * i - 1)
+            job_dict['extra_id'] = 'extra_uuid%d' % (2 * i - 1)
+            core.create_resource(self.context, models.Job, job_dict)
+            job_dict['id'] = 'job_uuid%d' % (2 * i)
+            job_dict['extra_id'] = 'extra_uuid%d' % (2 * i)
+            job_dict['status'] = constants.JS_New
+            core.create_resource(self.context, models.Job, job_dict)
+
+        # for res3 + uuid3, the latest job's status is "Success", not returned
+        expected_ids = ['job_uuid3', 'job_uuid5']
+        returned_jobs = db_api.get_latest_failed_jobs(self.context)
+        actual_ids = [job['id'] for job in returned_jobs]
+        self.assertItemsEqual(expected_ids, actual_ids)
+
     def tearDown(self):
         core.ModelBase.metadata.drop_all(core.get_engine())
         for res in RES_LIST:
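
A portability note on the assertion above: assertItemsEqual is the Python 2 spelling of unittest's order-insensitive comparison; Python 3 renamed it to assertCountEqual. If this suite ever needs to run on both interpreters, a small shim (hypothetical, not part of the commit) keeps the call sites unchanged:

    import unittest

    import six

    class CompatTestCase(unittest.TestCase):
        if six.PY3:
            # Python 3 renamed assertItemsEqual to assertCountEqual
            assertItemsEqual = unittest.TestCase.assertCountEqual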

View File

@@ -16,6 +16,7 @@
 import datetime
 import eventlet
 import netaddr
+import random
 import six

 from oslo_config import cfg
@@ -127,12 +128,13 @@ class XManager(PeriodicTasks):
         self.service_name = service_name
         # self.notifier = rpc.get_notifier(self.service_name, self.host)
         self.additional_endpoints = []
-        self.clients = {'top': client.Client()}
+        self.clients = {constants.TOP: client.Client()}
+        self.job_handles = {constants.JT_ROUTER: self.configure_extra_routes}
         super(XManager, self).__init__()

     def _get_client(self, pod_name=None):
         if not pod_name:
-            return self.clients['top']
+            return self.clients[constants.TOP]
         if pod_name not in self.clients:
             self.clients[pod_name] = client.Client(pod_name)
         return self.clients[pod_name]
@@ -205,11 +207,29 @@ class XManager(PeriodicTasks):
         return info_text

-    @_job_handle('router')
+    @periodic_task.periodic_task
+    def redo_failed_job(self, ctx):
+        failed_jobs = db_api.get_latest_failed_jobs(ctx)
+        failed_jobs = [
+            job for job in failed_jobs if job['type'] in self.job_handles]
+        if not failed_jobs:
+            return
+        # in one run we only pick one job to handle
+        job_index = random.randint(0, len(failed_jobs) - 1)
+        failed_job = failed_jobs[job_index]
+        job_type = failed_job['type']
+        payload = {job_type: failed_job['resource_id']}
+        LOG.debug(_('Redo failed job for %(resource_id)s of type '
+                    '%(job_type)s'),
+                  {'resource_id': failed_job['resource_id'],
+                   'job_type': job_type})
+        self.job_handles[job_type](ctx, payload=payload)
+
+    @_job_handle(constants.JT_ROUTER)
     def configure_extra_routes(self, ctx, payload):
         # TODO(zhiyuan) performance and reliability issue
         # better have a job tracking mechanism
-        t_router_id = payload['router']
+        t_router_id = payload[constants.JT_ROUTER]
         b_pods, b_router_ids = zip(*db_api.get_bottom_mappings_by_top_id(
             ctx, t_router_id, constants.RT_ROUTER))
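
Design note on redo_failed_job: filtering to job types this worker can handle and then picking a single job at random keeps each periodic run cheap, and when several xjob workers run the same periodic task they are unlikely to all retry the same resource in the same tick. A condensed, self-contained sketch of the selection step (the handler and job list are stand-ins, not tricircle code):

    import random

    def redo_one_failed_job(failed_jobs, job_handles, ctx=None):
        # drop jobs this worker has no handler for
        runnable = [j for j in failed_jobs if j['type'] in job_handles]
        if not runnable:
            return None
        # random.choice is equivalent to indexing with randint(0, len - 1)
        job = random.choice(runnable)
        payload = {job['type']: job['resource_id']}  # keyed by job type
        job_handles[job['type']](ctx, payload=payload)
        return job

    handled = redo_one_failed_job(
        [{'type': 'router', 'resource_id': 'fake-uuid'}],
        {'router': lambda ctx, payload: payload})
    print(handled)  # -> {'type': 'router', 'resource_id': 'fake-uuid'}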