From 13f3f31bf10b50d57830c2d5c0ba798cef34e94c Mon Sep 17 00:00:00 2001 From: zhiyuan_cai Date: Thu, 5 Jan 2017 09:22:29 +0800 Subject: [PATCH] XJob reliability improvement 1. What is the problem? Currently we use cast method to trigger XJob daemon which we have risk to lose jobs. 2. What is the solution to the problem? As proposed in the specification[1], we create a database record for each new job before invoking cast method, so the job will not be lost even if there is something wrong with the message broker. 3. What the features need to be implemented to the Tricircle to realize the solution? XJob reliability is enhanced. [1] https://review.openstack.org/#/c/412762/ Change-Id: I86f37093aa1843f1daaf61b99541d3f921ef28ef --- doc/source/configuration.rst | 4 + tricircle/common/constants.py | 8 +- tricircle/common/xrpcapi.py | 6 + tricircle/db/api.py | 73 +++++++++--- .../db/migrate_repo/versions/002_resource.py | 14 ++- tricircle/db/models.py | 16 ++- tricircle/tests/unit/xjob/test_xmanager.py | 105 +++++++++++------- tricircle/xjob/xmanager.py | 63 +++++------ tricircle/xjob/xservice.py | 5 + 9 files changed, 193 insertions(+), 101 deletions(-) diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 3c0a713b..ae6612e5 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -118,6 +118,10 @@ xjob.conf. - (Integer) Timeout for worker's one turn of processing, in seconds * - ``worker_sleep_time`` = ``60`` - (Float) Seconds a worker sleeps after one run in a loop + * - ``redo_time_span`` = ``172800`` + - (Integer) Time span in seconds, we calculate the latest job timestamp by + subtracting this time span from the current timestamp, jobs created + between these two timestamps will be redone Networking Setting for Tricircle ================================ diff --git a/tricircle/common/constants.py b/tricircle/common/constants.py index a8573753..2aa91683 100644 --- a/tricircle/common/constants.py +++ b/tricircle/common/constants.py @@ -66,10 +66,10 @@ DEFAULT_DESTINATION = '0.0.0.0/0' expire_time = datetime.datetime(2000, 1, 1) # job status -JS_New = 'New' -JS_Running = 'Running' -JS_Success = 'Success' -JS_Fail = 'Fail' +JS_New = '3_New' +JS_Running = '2_Running' +JS_Success = '1_Success' +JS_Fail = '0_Fail' SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000' TOP = 'top' diff --git a/tricircle/common/xrpcapi.py b/tricircle/common/xrpcapi.py index 53615a74..59d7970d 100644 --- a/tricircle/common/xrpcapi.py +++ b/tricircle/common/xrpcapi.py @@ -23,6 +23,7 @@ from tricircle.common import constants from tricircle.common import rpc from tricircle.common import serializer as t_serializer from tricircle.common import topics +import tricircle.db.api as db_api CONF = cfg.CONF @@ -73,6 +74,7 @@ class XJobAPI(object): def setup_bottom_router(self, ctxt, net_id, router_id, pod_id): combine_id = '%s#%s#%s' % (pod_id, router_id, net_id) + db_api.new_job(ctxt, constants.JT_ROUTER_SETUP, combine_id) self.client.prepare(exchange='openstack').cast( ctxt, 'setup_bottom_router', payload={constants.JT_ROUTER_SETUP: combine_id}) @@ -82,23 +84,27 @@ class XJobAPI(object): # control exchange is "neutron", however, we starts xjob without # specifying its control exchange, so the default value "openstack" is # used, thus we need to pass exchange as "openstack" here. + db_api.new_job(ctxt, constants.JT_ROUTER, router_id) self.client.prepare(exchange='openstack').cast( ctxt, 'configure_extra_routes', payload={constants.JT_ROUTER: router_id}) def delete_server_port(self, ctxt, port_id, pod_id): combine_id = '%s#%s' % (pod_id, port_id) + db_api.new_job(ctxt, constants.JT_PORT_DELETE, combine_id) self.client.prepare(exchange='openstack').cast( ctxt, 'delete_server_port', payload={constants.JT_PORT_DELETE: combine_id}) def configure_security_group_rules(self, ctxt, project_id): + db_api.new_job(ctxt, constants.JT_SEG_RULE_SETUP, project_id) self.client.prepare(exchange='openstack').cast( ctxt, 'configure_security_group_rules', payload={constants.JT_SEG_RULE_SETUP: project_id}) def update_network(self, ctxt, network_id, pod_id): combine_id = '%s#%s' % (pod_id, network_id) + db_api.new_job(ctxt, constants.JT_NETWORK_UPDATE, combine_id) self.client.prepare(exchange='openstack').cast( ctxt, 'update_network', payload={constants.JT_NETWORK_UPDATE: combine_id}) diff --git a/tricircle/db/api.py b/tricircle/db/api.py index 31d0cc3e..e3701d8c 100644 --- a/tricircle/db/api.py +++ b/tricircle/db/api.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import functools import sqlalchemy as sql import time @@ -20,6 +21,7 @@ import time from oslo_config import cfg from oslo_db import exception as db_exc from oslo_log import log as logging +from oslo_utils import timeutils from oslo_utils import uuidutils from tricircle.common import constants @@ -346,24 +348,40 @@ def register_job(context, _type, resource_id): context.session.close() -def get_latest_failed_jobs(context): - jobs = [] +def get_latest_failed_or_new_jobs(context): + current_timestamp = timeutils.utcnow() + time_span = datetime.timedelta(seconds=CONF.redo_time_span) + latest_timestamp = current_timestamp - time_span + failed_jobs = [] + new_jobs = [] + + # first we group the jobs by type and resource id, and in each group we + # pick the latest timestamp + stmt = context.session.query( + models.AsyncJob.type, models.AsyncJob.resource_id, + sql.func.max(models.AsyncJob.timestamp).label('timestamp')) + stmt = stmt.filter(models.AsyncJob.timestamp >= latest_timestamp) + stmt = stmt.group_by(models.AsyncJob.type, + models.AsyncJob.resource_id).subquery() + + # then we join the result with the original table and group again, in each + # group, we pick the "minimum" of the status, for status, the ascendant + # sort sequence is "0_Fail", "1_Success", "2_Running", "3_New" query = context.session.query(models.AsyncJob.type, models.AsyncJob.resource_id, - sql.func.count(models.AsyncJob.id)) - query = query.group_by(models.AsyncJob.type, models.AsyncJob.resource_id) - for job_type, resource_id, count in query: - _query = context.session.query(models.AsyncJob) - _query = _query.filter_by(type=job_type, resource_id=resource_id) - _query = _query.order_by(sql.desc('timestamp')) - # when timestamps of async job entries are the same, sort entries by - # status so "Fail" async job is placed before "New" and "Success" - # async jobs - _query = _query.order_by(sql.asc('status')) - latest_job = _query[0].to_dict() - if latest_job['status'] == constants.JS_Fail: - jobs.append(latest_job) - return jobs + sql.func.min(models.AsyncJob.status)).join( + stmt, sql.and_(models.AsyncJob.type == stmt.c.type, + models.AsyncJob.resource_id == stmt.c.resource_id, + models.AsyncJob.timestamp == stmt.c.timestamp)) + query = query.group_by(models.AsyncJob.type, + models.AsyncJob.resource_id) + + for job_type, resource_id, status in query: + if status == constants.JS_Fail: + failed_jobs.append({'type': job_type, 'resource_id': resource_id}) + elif status == constants.JS_New: + new_jobs.append({'type': job_type, 'resource_id': resource_id}) + return failed_jobs, new_jobs def get_latest_timestamp(context, status, _type, resource_id): @@ -397,8 +415,27 @@ def finish_job(context, job_id, successful, timestamp): job_dict = {'status': status, 'timestamp': timestamp, 'extra_id': uuidutils.generate_uuid()} - core.update_resource(context, models.AsyncJob, - job_id, job_dict) + job = core.update_resource(context, models.AsyncJob, job_id, job_dict) + if status == constants.JS_Success: + log_dict = {'id': uuidutils.generate_uuid(), + 'type': job['type'], + 'timestamp': timestamp, + 'resource_id': job['resource_id']} + context.session.query(models.AsyncJob).filter( + sql.and_(models.AsyncJob.type == job['type'], + models.AsyncJob.resource_id == job['resource_id'], + models.AsyncJob.timestamp <= timestamp)).delete( + synchronize_session=False) + core.create_resource(context, models.AsyncJobLog, log_dict) + else: + # sqlite has problem handling "<" operator on timestamp, so we + # slide the timestamp a bit and use "<=" + timestamp = timestamp - datetime.timedelta(microseconds=1) + context.session.query(models.AsyncJob).filter( + sql.and_(models.AsyncJob.type == job['type'], + models.AsyncJob.resource_id == job['resource_id'], + models.AsyncJob.timestamp <= timestamp)).delete( + synchronize_session=False) def _is_user_context(context): diff --git a/tricircle/db/migrate_repo/versions/002_resource.py b/tricircle/db/migrate_repo/versions/002_resource.py index df9e7756..f27eea5a 100644 --- a/tricircle/db/migrate_repo/versions/002_resource.py +++ b/tricircle/db/migrate_repo/versions/002_resource.py @@ -49,7 +49,7 @@ def upgrade(migrate_engine): sql.Column('id', sql.String(length=36), primary_key=True), sql.Column('type', sql.String(length=36)), sql.Column('timestamp', sql.TIMESTAMP, - server_default=sql.text('CURRENT_TIMESTAMP')), + server_default=sql.text('CURRENT_TIMESTAMP'), index=True), sql.Column('status', sql.String(length=36)), sql.Column('resource_id', sql.String(length=127)), sql.Column('extra_id', sql.String(length=36)), @@ -59,7 +59,17 @@ def upgrade(migrate_engine): mysql_engine='InnoDB', mysql_charset='utf8') - tables = [async_jobs, resource_routings] + async_job_logs = sql.Table( + 'async_job_logs', meta, + sql.Column('id', sql.String(length=36), primary_key=True), + sql.Column('resource_id', sql.String(length=127)), + sql.Column('type', sql.String(length=36)), + sql.Column('timestamp', sql.TIMESTAMP, + server_default=sql.text('CURRENT_TIMESTAMP'), index=True), + mysql_engine='InnoDB', + mysql_charset='utf8') + + tables = [async_jobs, resource_routings, async_job_logs] for table in tables: table.create() diff --git a/tricircle/db/models.py b/tricircle/db/models.py index 98556247..61352803 100644 --- a/tricircle/db/models.py +++ b/tricircle/db/models.py @@ -94,7 +94,21 @@ class AsyncJob(core.ModelBase, core.DictBase): id = sql.Column('id', sql.String(length=36), primary_key=True) type = sql.Column('type', sql.String(length=36)) timestamp = sql.Column('timestamp', sql.TIMESTAMP, - server_default=sql.text('CURRENT_TIMESTAMP')) + server_default=sql.text('CURRENT_TIMESTAMP'), + index=True) status = sql.Column('status', sql.String(length=36)) resource_id = sql.Column('resource_id', sql.String(length=127)) extra_id = sql.Column('extra_id', sql.String(length=36)) + + +class AsyncJobLog(core.ModelBase, core.DictBase): + __tablename__ = 'async_job_logs' + + attributes = ['id', 'resource_id', 'type', 'timestamp'] + + id = sql.Column('id', sql.String(length=36), primary_key=True) + resource_id = sql.Column('resource_id', sql.String(length=127)) + type = sql.Column('type', sql.String(length=36)) + timestamp = sql.Column('timestamp', sql.TIMESTAMP, + server_default=sql.text('CURRENT_TIMESTAMP'), + index=True) diff --git a/tricircle/tests/unit/xjob/test_xmanager.py b/tricircle/tests/unit/xjob/test_xmanager.py index b9cb2b8e..f0a200e1 100644 --- a/tricircle/tests/unit/xjob/test_xmanager.py +++ b/tricircle/tests/unit/xjob/test_xmanager.py @@ -147,7 +147,7 @@ class XManagerTest(unittest.TestCase): core.get_engine().execute('pragma foreign_keys=on') for opt in xservice.common_opts: if opt.name in ('worker_handle_timeout', 'job_run_expire', - 'worker_sleep_time'): + 'worker_sleep_time', 'redo_time_span'): cfg.CONF.register_opt(opt) self.context = context.Context() self.xmanager = FakeXManager() @@ -317,8 +317,9 @@ class XManagerTest(unittest.TestCase): bridge_infos = self._prepare_east_west_network_test(top_router_id) ns_bridge_ip, ns_router_id = self._prepare_snat_test(top_router_id) self._prepare_dnat_test() - self.xmanager.configure_extra_routes(self.context, - payload={'router': top_router_id}) + db_api.new_job(self.context, constants.JT_ROUTER, top_router_id) + self.xmanager.configure_extra_routes( + self.context, payload={constants.JT_ROUTER: top_router_id}) calls = [] ns_routes = [] for i in range(2): @@ -342,8 +343,9 @@ class XManagerTest(unittest.TestCase): top_router_id = 'router_id' bridge_infos = self._prepare_east_west_network_test(top_router_id) ns_bridge_ip, ns_router_id = self._prepare_snat_test(top_router_id) - self.xmanager.configure_extra_routes(self.context, - payload={'router': top_router_id}) + db_api.new_job(self.context, constants.JT_ROUTER, top_router_id) + self.xmanager.configure_extra_routes( + self.context, payload={constants.JT_ROUTER: top_router_id}) calls = [] ns_routes = [] for i in range(2): @@ -366,8 +368,9 @@ class XManagerTest(unittest.TestCase): def test_configure_extra_routes(self, mock_update): top_router_id = 'router_id' bridge_infos = self._prepare_east_west_network_test(top_router_id) - self.xmanager.configure_extra_routes(self.context, - payload={'router': top_router_id}) + db_api.new_job(self.context, constants.JT_ROUTER, top_router_id) + self.xmanager.configure_extra_routes( + self.context, payload={constants.JT_ROUTER: top_router_id}) calls = [] for i in range(2): routes = [] @@ -435,8 +438,9 @@ class XManagerTest(unittest.TestCase): core.create_resource(self.context, models.ResourceRouting, route) + db_api.new_job(self.context, constants.JT_SEG_RULE_SETUP, project_id) self.xmanager.configure_security_group_rules( - self.context, payload={'seg_rule_setup': project_id}) + self.context, payload={constants.JT_SEG_RULE_SETUP: project_id}) calls = [mock.call(self.context, sg_rule_id_1)] mock_delete.assert_has_calls(calls) @@ -467,31 +471,32 @@ class XManagerTest(unittest.TestCase): mock_create.assert_has_calls(calls) def test_job_handle(self): - @xmanager._job_handle('fake_resource') + job_type = 'fake_resource' + + @xmanager._job_handle(job_type) def fake_handle(self, ctx, payload): pass fake_id = 'fake_id' - payload = {'fake_resource': fake_id} + payload = {job_type: fake_id} + db_api.new_job(self.context, job_type, fake_id) fake_handle(None, self.context, payload=payload) - jobs = core.query_resource(self.context, models.AsyncJob, [], []) - expected_status = [constants.JS_New, constants.JS_Success] - job_status = [job['status'] for job in jobs] - six.assertCountEqual(self, expected_status, job_status) + logs = core.query_resource(self.context, models.AsyncJobLog, [], []) - self.assertEqual(fake_id, jobs[0]['resource_id']) - self.assertEqual(fake_id, jobs[1]['resource_id']) - self.assertEqual('fake_resource', jobs[0]['type']) - self.assertEqual('fake_resource', jobs[1]['type']) + self.assertEqual(fake_id, logs[0]['resource_id']) + self.assertEqual(job_type, logs[0]['type']) def test_job_handle_exception(self): - @xmanager._job_handle('fake_resource') + job_type = 'fake_resource' + + @xmanager._job_handle(job_type) def fake_handle(self, ctx, payload): raise Exception() fake_id = 'fake_id' - payload = {'fake_resource': fake_id} + payload = {job_type: fake_id} + db_api.new_job(self.context, job_type, fake_id) fake_handle(None, self.context, payload=payload) jobs = core.query_resource(self.context, models.AsyncJob, [], []) @@ -501,19 +506,22 @@ class XManagerTest(unittest.TestCase): self.assertEqual(fake_id, jobs[0]['resource_id']) self.assertEqual(fake_id, jobs[1]['resource_id']) - self.assertEqual('fake_resource', jobs[0]['type']) - self.assertEqual('fake_resource', jobs[1]['type']) + self.assertEqual(job_type, jobs[0]['type']) + self.assertEqual(job_type, jobs[1]['type']) def test_job_run_expire(self): - @xmanager._job_handle('fake_resource') + job_type = 'fake_resource' + + @xmanager._job_handle(job_type) def fake_handle(self, ctx, payload): pass fake_id = uuidutils.generate_uuid() - payload = {'fake_resource': fake_id} + payload = {job_type: fake_id} + db_api.new_job(self.context, job_type, fake_id) expired_job = { 'id': uuidutils.generate_uuid(), - 'type': 'fake_resource', + 'type': job_type, 'timestamp': datetime.datetime.now() - datetime.timedelta(0, 200), 'status': constants.JS_Running, 'resource_id': fake_id, @@ -522,19 +530,17 @@ class XManagerTest(unittest.TestCase): core.create_resource(self.context, models.AsyncJob, expired_job) fake_handle(None, self.context, payload=payload) - jobs = core.query_resource(self.context, models.AsyncJob, [], []) - expected_status = ['New', 'Fail', 'Success'] - job_status = [job['status'] for job in jobs] - six.assertCountEqual(self, expected_status, job_status) + logs = core.query_resource(self.context, models.AsyncJobLog, [], []) - for i in xrange(3): - self.assertEqual(fake_id, jobs[i]['resource_id']) - self.assertEqual('fake_resource', jobs[i]['type']) + self.assertEqual(fake_id, logs[0]['resource_id']) + self.assertEqual(job_type, logs[0]['type']) @patch.object(db_api, 'get_running_job') @patch.object(db_api, 'register_job') def test_worker_handle_timeout(self, mock_register, mock_get): - @xmanager._job_handle('fake_resource') + job_type = 'fake_resource' + + @xmanager._job_handle(job_type) def fake_handle(self, ctx, payload): pass @@ -543,13 +549,16 @@ class XManagerTest(unittest.TestCase): mock_get.return_value = None fake_id = uuidutils.generate_uuid() - payload = {'fake_resource': fake_id} + payload = {job_type: fake_id} + db_api.new_job(self.context, job_type, fake_id) fake_handle(None, self.context, payload=payload) # nothing to assert, what we test is that fake_handle can exit when # timeout - def test_get_failed_jobs(self): + @patch('oslo_utils.timeutils.utcnow') + def test_get_failed_or_new_jobs(self, mock_now): + mock_now.return_value = datetime.datetime(2000, 1, 2, 12, 0, 0) job_dict_list = [ {'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0), 'resource_id': 'uuid1', 'type': 'res1', @@ -565,10 +574,16 @@ class XManagerTest(unittest.TestCase): 'status': constants.JS_Fail}, # job_uuid7 {'timestamp': datetime.datetime(2000, 1, 1, 12, 25, 0), 'resource_id': 'uuid3', 'type': 'res3', - 'status': constants.JS_Fail}, # job_uuid9 + 'status': constants.JS_Success}, # job_uuid9 {'timestamp': datetime.datetime(2000, 1, 1, 12, 30, 0), - 'resource_id': 'uuid3', 'type': 'res3', - 'status': constants.JS_Success}] + 'resource_id': 'uuid4', 'type': 'res4', + 'status': constants.JS_New}, # job_uuid11 + {'timestamp': datetime.datetime(1999, 12, 31, 12, 0, 0), + 'resource_id': 'uuid5', 'type': 'res5', + 'status': constants.JS_Fail}, # job_uuid13 + {'timestamp': datetime.datetime(1999, 12, 31, 11, 59, 59), + 'resource_id': 'uuid6', 'type': 'res6', + 'status': constants.JS_Fail}] # job_uuid15 for i, job_dict in enumerate(job_dict_list, 1): job_dict['id'] = 'job_uuid%d' % (2 * i - 1) job_dict['extra_id'] = 'extra_uuid%d' % (2 * i - 1) @@ -579,10 +594,16 @@ class XManagerTest(unittest.TestCase): core.create_resource(self.context, models.AsyncJob, job_dict) # for res3 + uuid3, the latest job's status is "Success", not returned - expected_ids = ['job_uuid3', 'job_uuid5'] - returned_jobs = db_api.get_latest_failed_jobs(self.context) - actual_ids = [job['id'] for job in returned_jobs] - six.assertCountEqual(self, expected_ids, actual_ids) + # for res6 + uuid6, the latest job is out of the redo time span + expected_failed_jobs = [ + {'resource_id': 'uuid1', 'type': 'res1'}, + {'resource_id': 'uuid2', 'type': 'res2'}, + {'resource_id': 'uuid5', 'type': 'res5'}] + expected_new_jobs = [{'resource_id': 'uuid4', 'type': 'res4'}] + (failed_jobs, + new_jobs) = db_api.get_latest_failed_or_new_jobs(self.context) + six.assertCountEqual(self, expected_failed_jobs, failed_jobs) + six.assertCountEqual(self, expected_new_jobs, new_jobs) def tearDown(self): core.ModelBase.metadata.drop_all(core.get_engine()) diff --git a/tricircle/xjob/xmanager.py b/tricircle/xjob/xmanager.py index 676e6516..f86e5ce6 100644 --- a/tricircle/xjob/xmanager.py +++ b/tricircle/xjob/xmanager.py @@ -30,7 +30,7 @@ import neutronclient.common.exceptions as q_cli_exceptions from tricircle.common import client from tricircle.common import constants -from tricircle.common.i18n import _, _LE, _LI, _LW +from tricircle.common.i18n import _LE, _LI, _LW from tricircle.common import xrpcapi import tricircle.db.api as db_api from tricircle.db import core @@ -61,7 +61,6 @@ def _job_handle(job_type): payload = kwargs['payload'] resource_id = payload[job_type] - db_api.new_job(ctx, job_type, resource_id) start_time = datetime.datetime.now() while True: @@ -72,6 +71,8 @@ def _job_handle(job_type): break time_new = db_api.get_latest_timestamp(ctx, constants.JS_New, job_type, resource_id) + if not time_new: + break time_success = db_api.get_latest_timestamp( ctx, constants.JS_Success, job_type, resource_id) if time_success and time_success >= time_new: @@ -136,7 +137,7 @@ class XManager(PeriodicTasks): def __init__(self, host=None, service_name='xjob'): - LOG.debug(_('XManager initialization...')) + LOG.debug('XManager initialization...') if not host: host = CONF.host @@ -167,7 +168,6 @@ class XManager(PeriodicTasks): return self.run_periodic_tasks(context, raise_on_error=raise_on_error) def init_host(self): - """init_host Hook to do additional manager initialization when one requests @@ -175,25 +175,17 @@ class XManager(PeriodicTasks): is created. Child classes should override this method. """ - - LOG.debug(_('XManager init_host...')) - - pass + LOG.debug('XManager init_host...') def cleanup_host(self): - """cleanup_host Hook to do cleanup work when the service shuts down. Child classes should override this method. """ - - LOG.debug(_('XManager cleanup_host...')) - - pass + LOG.debug('XManager cleanup_host...') def pre_start_hook(self): - """pre_start_hook Hook to provide the manager the ability to do additional @@ -202,13 +194,9 @@ class XManager(PeriodicTasks): record is created. Child classes should override this method. """ - - LOG.debug(_('XManager pre_start_hook...')) - - pass + LOG.debug('XManager pre_start_hook...') def post_start_hook(self): - """post_start_hook Hook to provide the manager the ability to do additional @@ -216,10 +204,7 @@ class XManager(PeriodicTasks): and starts 'running'. Child classes should override this method. """ - - LOG.debug(_('XManager post_start_hook...')) - - pass + LOG.debug('XManager post_start_hook...') # rpc message endpoint handling def test_rpc(self, ctx, payload): @@ -245,21 +230,31 @@ class XManager(PeriodicTasks): 'value': router_id}]) @periodic_task.periodic_task - def redo_failed_job(self, ctx): - failed_jobs = db_api.get_latest_failed_jobs(ctx) + def redo_failed_or_new_job(self, ctx): + failed_jobs, new_jobs = db_api.get_latest_failed_or_new_jobs(ctx) failed_jobs = [ job for job in failed_jobs if job['type'] in self.job_handles] - if not failed_jobs: + new_jobs = [ + job for job in new_jobs if job['type'] in self.job_handles] + if not failed_jobs and not new_jobs: return + if new_jobs: + jobs = new_jobs + is_new_job = True + else: + jobs = failed_jobs + is_new_job = False # in one run we only pick one job to handle - job_index = random.randint(0, len(failed_jobs) - 1) - failed_job = failed_jobs[job_index] - job_type = failed_job['type'] - payload = {job_type: failed_job['resource_id']} - LOG.debug(_('Redo failed job for %(resource_id)s of type ' - '%(job_type)s'), - {'resource_id': failed_job['resource_id'], - 'job_type': job_type}) + job_index = random.randint(0, len(jobs) - 1) + job_type = jobs[job_index]['type'] + resource_id = jobs[job_index]['resource_id'] + payload = {job_type: resource_id} + LOG.debug('Redo %(status)s job for %(resource_id)s of type ' + '%(job_type)s', + {'status': 'new' if is_new_job else 'failed', + 'resource_id': resource_id, 'job_type': job_type}) + if not is_new_job: + db_api.new_job(ctx, job_type, resource_id) self.job_handles[job_type](ctx, payload=payload) @staticmethod diff --git a/tricircle/xjob/xservice.py b/tricircle/xjob/xservice.py index 5d5eb016..cd579ee4 100644 --- a/tricircle/xjob/xservice.py +++ b/tricircle/xjob/xservice.py @@ -54,6 +54,11 @@ common_opts = [ " seconds")), cfg.FloatOpt('worker_sleep_time', default=0.1, help=_("Seconds a worker sleeps after one run in a loop")), + cfg.IntOpt('redo_time_span', default=172800, + help=_("Time span in seconds, we calculate the latest job " + "timestamp by subtracting this time span from the " + "current timestamp, jobs created between these two " + "timestamps will be redone")), cfg.BoolOpt('enable_api_gateway', default=False, help=_('Whether the Nova API gateway is enabled'))