From ea1c28d61d35e7720119524b3c1a1b95c300f86b Mon Sep 17 00:00:00 2001
From: zhiyuan_cai
Date: Wed, 6 Apr 2016 16:36:07 +0800
Subject: [PATCH] Asynchronous job management (part 2)

Implement asynchronous job management to ensure that jobs can be
successfully completed even if they temporarily fail for some reason.
The detailed design can be found in section 9 of the design document.

This patch focuses on enabling workers to rerun failed jobs. Workers
started with the configuration option 'periodic_enable' set to 'True'
are responsible for this work. Purging old job records will be covered
in later patches.

Change-Id: I2631a98af67e663f929f293bdfb7e7779fe8018e
---
 tricircle/common/constants.py              |  4 +++
 tricircle/common/xrpcapi.py                |  6 +++-
 tricircle/db/api.py                        | 19 ++++++++++++
 tricircle/tests/unit/xjob/test_xmanager.py | 35 ++++++++++++++++++++++
 tricircle/xjob/xmanager.py                 | 28 ++++++++++++++---
 5 files changed, 87 insertions(+), 5 deletions(-)

diff --git a/tricircle/common/constants.py b/tricircle/common/constants.py
index 8eaf6c59..601939f1 100644
--- a/tricircle/common/constants.py
+++ b/tricircle/common/constants.py
@@ -66,3 +66,7 @@ JS_Success = 'Success'
 JS_Fail = 'Fail'
 
 SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'
+TOP = 'top'
+
+# job type
+JT_ROUTER = 'router'
diff --git a/tricircle/common/xrpcapi.py b/tricircle/common/xrpcapi.py
index c4b64f06..32ce085b 100755
--- a/tricircle/common/xrpcapi.py
+++ b/tricircle/common/xrpcapi.py
@@ -24,6 +24,9 @@ import rpc
 from serializer import TricircleSerializer as Serializer
 import topics
 
+from tricircle.common import constants
+
+
 CONF = cfg.CONF
 
 rpcapi_cap_opt = cfg.StrOpt('xjobapi',
@@ -78,4 +81,5 @@ class XJobAPI(object):
         # specifying its control exchange, so the default value "openstack" is
         # used, thus we need to pass exchange as "openstack" here.
         self.client.prepare(exchange='openstack').cast(
-            ctxt, 'configure_extra_routes', payload={'router': router_id})
+            ctxt, 'configure_extra_routes',
+            payload={constants.JT_ROUTER: router_id})
diff --git a/tricircle/db/api.py b/tricircle/db/api.py
index 5ddd40ae..27e4ccdf 100644
--- a/tricircle/db/api.py
+++ b/tricircle/db/api.py
@@ -14,6 +14,7 @@
 # under the License.
 
 import functools
+import sqlalchemy as sql
 import time
 import uuid
 
@@ -236,6 +237,24 @@ def register_job(context, _type, resource_id):
         context.session.close()
 
 
+def get_latest_failed_jobs(context):
+    jobs = []
+    query = context.session.query(models.Job.type, models.Job.resource_id,
+                                  sql.func.count(models.Job.id))
+    query = query.group_by(models.Job.type, models.Job.resource_id)
+    for job_type, resource_id, count in query:
+        _query = context.session.query(models.Job)
+        _query = _query.filter_by(type=job_type, resource_id=resource_id)
+        _query = _query.order_by(sql.desc('timestamp'))
+        # when timestamps of job entries are the same, sort entries by status
+        # so "Fail" job is placed before "New" and "Success" jobs
+        _query = _query.order_by(sql.asc('status'))
+        latest_job = _query[0].to_dict()
+        if latest_job['status'] == constants.JS_Fail:
+            jobs.append(latest_job)
+    return jobs
+
+
 def get_latest_timestamp(context, status, _type, resource_id):
     jobs = core.query_resource(
         context, models.Job,
diff --git a/tricircle/tests/unit/xjob/test_xmanager.py b/tricircle/tests/unit/xjob/test_xmanager.py
index a1667426..4e0d506c 100644
--- a/tricircle/tests/unit/xjob/test_xmanager.py
+++ b/tricircle/tests/unit/xjob/test_xmanager.py
@@ -267,6 +267,41 @@ class XManagerTest(unittest.TestCase):
         # nothing to assert, what we test is that fake_handle can exit when
         # timeout
 
+    def test_get_failed_jobs(self):
+        job_dict_list = [
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0),
+             'resource_id': 'uuid1', 'type': 'res1',
+             'status': constants.JS_Fail},  # job_uuid1
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 5, 0),
+             'resource_id': 'uuid1', 'type': 'res1',
+             'status': constants.JS_Fail},  # job_uuid3
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 20, 0),
+             'resource_id': 'uuid2', 'type': 'res2',
+             'status': constants.JS_Fail},  # job_uuid5
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 15, 0),
+             'resource_id': 'uuid2', 'type': 'res2',
+             'status': constants.JS_Fail},  # job_uuid7
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 25, 0),
+             'resource_id': 'uuid3', 'type': 'res3',
+             'status': constants.JS_Fail},  # job_uuid9
+            {'timestamp': datetime.datetime(2000, 1, 1, 12, 30, 0),
+             'resource_id': 'uuid3', 'type': 'res3',
+             'status': constants.JS_Success}]
+        for i, job_dict in enumerate(job_dict_list, 1):
+            job_dict['id'] = 'job_uuid%d' % (2 * i - 1)
+            job_dict['extra_id'] = 'extra_uuid%d' % (2 * i - 1)
+            core.create_resource(self.context, models.Job, job_dict)
+            job_dict['id'] = 'job_uuid%d' % (2 * i)
+            job_dict['extra_id'] = 'extra_uuid%d' % (2 * i)
+            job_dict['status'] = constants.JS_New
+            core.create_resource(self.context, models.Job, job_dict)
+
+        # for res3 + uuid3, the latest job's status is "Success", not returned
+        expected_ids = ['job_uuid3', 'job_uuid5']
+        returned_jobs = db_api.get_latest_failed_jobs(self.context)
+        actual_ids = [job['id'] for job in returned_jobs]
+        self.assertItemsEqual(expected_ids, actual_ids)
+
     def tearDown(self):
         core.ModelBase.metadata.drop_all(core.get_engine())
         for res in RES_LIST:
diff --git a/tricircle/xjob/xmanager.py b/tricircle/xjob/xmanager.py
index 566b74e3..649ad25e 100755
--- a/tricircle/xjob/xmanager.py
+++ b/tricircle/xjob/xmanager.py
@@ -16,6 +16,7 @@
 import datetime
 import eventlet
 import netaddr
+import random
 import six
 
 from oslo_config import cfg
@@ -127,12 +128,13 @@ class XManager(PeriodicTasks):
         self.service_name = service_name
         # self.notifier = rpc.get_notifier(self.service_name, self.host)
         self.additional_endpoints = []
-        self.clients = {'top': client.Client()}
+        self.clients = {constants.TOP: client.Client()}
+        self.job_handles = {constants.JT_ROUTER: self.configure_extra_routes}
         super(XManager, self).__init__()
 
     def _get_client(self, pod_name=None):
         if not pod_name:
-            return self.clients['top']
+            return self.clients[constants.TOP]
         if pod_name not in self.clients:
             self.clients[pod_name] = client.Client(pod_name)
         return self.clients[pod_name]
@@ -205,11 +207,29 @@ class XManager(PeriodicTasks):
 
         return info_text
 
-    @_job_handle('router')
+    @periodic_task.periodic_task
+    def redo_failed_job(self, ctx):
+        failed_jobs = db_api.get_latest_failed_jobs(ctx)
+        failed_jobs = [
+            job for job in failed_jobs if job['type'] in self.job_handles]
+        if not failed_jobs:
+            return
+        # in one run we only pick one job to handle
+        job_index = random.randint(0, len(failed_jobs) - 1)
+        failed_job = failed_jobs[job_index]
+        job_type = failed_job['type']
+        payload = {job_type: failed_job['resource_id']}
+        LOG.debug(_('Redo failed job for %(resource_id)s of type '
+                    '%(job_type)s'),
+                  {'resource_id': failed_job['resource_id'],
+                   'job_type': job_type})
+        self.job_handles[job_type](ctx, payload=payload)
+
+    @_job_handle(constants.JT_ROUTER)
     def configure_extra_routes(self, ctx, payload):
         # TODO(zhiyuan) performance and reliability issue
         # better have a job tracking mechanism
-        t_router_id = payload['router']
+        t_router_id = payload[constants.JT_ROUTER]
         b_pods, b_router_ids = zip(*db_api.get_bottom_mappings_by_top_id(
             ctx, t_router_id, constants.RT_ROUTER))
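
As an aside for reviewers, the selection rule that get_latest_failed_jobs() expresses in
SQLAlchemy above can be sketched with plain dicts, which makes the timestamp/status
ordering trick easier to see. This is a minimal illustration only, not part of the patch;
the helper name latest_failed_jobs and the sample records below are hypothetical.

import datetime

JS_NEW, JS_SUCCESS, JS_FAIL = 'New', 'Success', 'Fail'


def latest_failed_jobs(job_records):
    """Return the newest record of each (type, resource_id) pair, but only
    when that newest record is in the 'Fail' state.
    """
    grouped = {}
    for job in job_records:
        grouped.setdefault((job['type'], job['resource_id']), []).append(job)
    failed = []
    for records in grouped.values():
        # mirror ORDER BY timestamp DESC, status ASC: sort by the secondary
        # key first, then by the primary key with a stable sort, so a 'Fail'
        # entry wins over 'New'/'Success' entries sharing the same timestamp
        records.sort(key=lambda job: job['status'])
        records.sort(key=lambda job: job['timestamp'], reverse=True)
        if records[0]['status'] == JS_FAIL:
            failed.append(records[0])
    return failed


# hypothetical sample data: uuid1's newest record failed, uuid2's succeeded
records = [
    {'type': 'router', 'resource_id': 'uuid1', 'status': JS_FAIL,
     'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0)},
    {'type': 'router', 'resource_id': 'uuid1', 'status': JS_NEW,
     'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0)},
    {'type': 'router', 'resource_id': 'uuid2', 'status': JS_SUCCESS,
     'timestamp': datetime.datetime(2000, 1, 1, 12, 5, 0)},
]
print(latest_failed_jobs(records))  # only uuid1's 'Fail' record is returned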