XJob reliability improvement

1. What is the problem?
Currently we use the cast method to trigger the XJob daemon, so jobs
can be lost if the message broker fails.

2. What is the solution to the problem?
As proposed in the specification [1], we create a database record
for each new job before invoking the cast method, so a job will not
be lost even if something goes wrong with the message broker (see
the sketch below).

3. What features need to be implemented in the Tricircle to realize
the solution?
XJob reliability is enhanced.

[1] https://review.openstack.org/#/c/412762/

Change-Id: I86f37093aa1843f1daaf61b99541d3f921ef28ef
Author: zhiyuan_cai
Date: 2017-01-05 09:22:29 +08:00
parent ac26a377f1
commit 13f3f31bf1
9 changed files with 193 additions and 101 deletions
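
In short, every cast is now preceded by a persistent job record. A minimal
sketch of the pattern, using the names that appear in the diff below (the
class and RPC client wiring around them are assumed):

    # Sketch only: the record-then-cast pattern this change introduces.
    # db_api.new_job, constants.JT_ROUTER and self.client are the names
    # used in the diff below; everything around them is assumed wiring.
    def configure_extra_routes(self, ctxt, router_id):
        # persist the job first, so a broker failure cannot lose it
        db_api.new_job(ctxt, constants.JT_ROUTER, router_id)
        # only then fire the asynchronous RPC cast
        self.client.prepare(exchange='openstack').cast(
            ctxt, 'configure_extra_routes',
            payload={constants.JT_ROUTER: router_id})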

@@ -118,6 +118,10 @@ xjob.conf.
- (Integer) Timeout for worker's one turn of processing, in seconds
* - ``worker_sleep_time`` = ``60``
- (Float) Seconds a worker sleeps after one run in a loop
* - ``redo_time_span`` = ``172800``
- (Integer) Time span in seconds; the latest job timestamp is calculated by
subtracting this time span from the current timestamp, and jobs created
between these two timestamps will be redone
Networking Setting for Tricircle
================================

@@ -66,10 +66,10 @@ DEFAULT_DESTINATION = '0.0.0.0/0'
expire_time = datetime.datetime(2000, 1, 1)
# job status
JS_New = 'New'
JS_Running = 'Running'
JS_Success = 'Success'
JS_Fail = 'Fail'
JS_New = '3_New'
JS_Running = '2_Running'
JS_Success = '1_Success'
JS_Fail = '0_Fail'
SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'
TOP = 'top'
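
The numeric prefixes are deliberate: plain string comparison now orders the
statuses from "worst" to "best", so SQL's MIN() over a group of job records
surfaces a failure whenever one exists. A quick self-contained check:

    # '0_Fail' < '1_Success' < '2_Running' < '3_New' as strings, so
    # MIN(status) within a group picks the "worst" status.
    statuses = ['3_New', '1_Success', '2_Running', '0_Fail']
    assert min(statuses) == '0_Fail'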

@@ -23,6 +23,7 @@ from tricircle.common import constants
from tricircle.common import rpc
from tricircle.common import serializer as t_serializer
from tricircle.common import topics
import tricircle.db.api as db_api
CONF = cfg.CONF
@@ -73,6 +74,7 @@ class XJobAPI(object):
def setup_bottom_router(self, ctxt, net_id, router_id, pod_id):
combine_id = '%s#%s#%s' % (pod_id, router_id, net_id)
db_api.new_job(ctxt, constants.JT_ROUTER_SETUP, combine_id)
self.client.prepare(exchange='openstack').cast(
ctxt, 'setup_bottom_router',
payload={constants.JT_ROUTER_SETUP: combine_id})
@@ -82,23 +84,27 @@ class XJobAPI(object):
# control exchange is "neutron"; however, we start xjob without
# specifying its control exchange, so the default value "openstack" is
# used, thus we need to pass exchange as "openstack" here.
db_api.new_job(ctxt, constants.JT_ROUTER, router_id)
self.client.prepare(exchange='openstack').cast(
ctxt, 'configure_extra_routes',
payload={constants.JT_ROUTER: router_id})
def delete_server_port(self, ctxt, port_id, pod_id):
combine_id = '%s#%s' % (pod_id, port_id)
db_api.new_job(ctxt, constants.JT_PORT_DELETE, combine_id)
self.client.prepare(exchange='openstack').cast(
ctxt, 'delete_server_port',
payload={constants.JT_PORT_DELETE: combine_id})
def configure_security_group_rules(self, ctxt, project_id):
db_api.new_job(ctxt, constants.JT_SEG_RULE_SETUP, project_id)
self.client.prepare(exchange='openstack').cast(
ctxt, 'configure_security_group_rules',
payload={constants.JT_SEG_RULE_SETUP: project_id})
def update_network(self, ctxt, network_id, pod_id):
combine_id = '%s#%s' % (pod_id, network_id)
db_api.new_job(ctxt, constants.JT_NETWORK_UPDATE, combine_id)
self.client.prepare(exchange='openstack').cast(
ctxt, 'update_network',
payload={constants.JT_NETWORK_UPDATE: combine_id})
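
All the multi-field resource identifiers above follow the same '#'-joined
convention. The helpers below are hypothetical, shown only to make the
convention explicit; the actual code formats the string inline:

    # Hypothetical helpers illustrating the '#'-joined combine_id
    # convention; they are not part of the Tricircle API.
    def make_combine_id(*parts):
        return '#'.join(parts)

    def split_combine_id(combine_id):
        return combine_id.split('#')

    combine_id = make_combine_id('pod-1', 'router-1', 'net-1')
    assert split_combine_id(combine_id) == ['pod-1', 'router-1', 'net-1']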

@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import functools
import sqlalchemy as sql
import time
@@ -20,6 +21,7 @@ import time
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_utils import timeutils
from oslo_utils import uuidutils
from tricircle.common import constants
@@ -346,24 +348,40 @@ def register_job(context, _type, resource_id):
context.session.close()
def get_latest_failed_jobs(context):
jobs = []
def get_latest_failed_or_new_jobs(context):
current_timestamp = timeutils.utcnow()
time_span = datetime.timedelta(seconds=CONF.redo_time_span)
latest_timestamp = current_timestamp - time_span
failed_jobs = []
new_jobs = []
# first we group the jobs by type and resource id, and in each group we
# pick the latest timestamp
stmt = context.session.query(
models.AsyncJob.type, models.AsyncJob.resource_id,
sql.func.max(models.AsyncJob.timestamp).label('timestamp'))
stmt = stmt.filter(models.AsyncJob.timestamp >= latest_timestamp)
stmt = stmt.group_by(models.AsyncJob.type,
models.AsyncJob.resource_id).subquery()
# then we join the result with the original table and group again; in
# each group we pick the "minimum" status, for which the ascending sort
# sequence is "0_Fail", "1_Success", "2_Running", "3_New"
query = context.session.query(models.AsyncJob.type,
models.AsyncJob.resource_id,
sql.func.count(models.AsyncJob.id))
query = query.group_by(models.AsyncJob.type, models.AsyncJob.resource_id)
for job_type, resource_id, count in query:
_query = context.session.query(models.AsyncJob)
_query = _query.filter_by(type=job_type, resource_id=resource_id)
_query = _query.order_by(sql.desc('timestamp'))
# when timestamps of async job entries are the same, sort entries by
# status so "Fail" async job is placed before "New" and "Success"
# async jobs
_query = _query.order_by(sql.asc('status'))
latest_job = _query[0].to_dict()
if latest_job['status'] == constants.JS_Fail:
jobs.append(latest_job)
return jobs
sql.func.min(models.AsyncJob.status)).join(
stmt, sql.and_(models.AsyncJob.type == stmt.c.type,
models.AsyncJob.resource_id == stmt.c.resource_id,
models.AsyncJob.timestamp == stmt.c.timestamp))
query = query.group_by(models.AsyncJob.type,
models.AsyncJob.resource_id)
for job_type, resource_id, status in query:
if status == constants.JS_Fail:
failed_jobs.append({'type': job_type, 'resource_id': resource_id})
elif status == constants.JS_New:
new_jobs.append({'type': job_type, 'resource_id': resource_id})
return failed_jobs, new_jobs
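
The two-step query reduces to a simple rule: within each (type, resource_id)
group, look only at the rows carrying the group's latest timestamp and take
the minimum status among them. A pure-Python restatement with made-up sample
rows, for illustration only:

    from collections import defaultdict

    rows = [  # made-up sample: (type, resource_id, timestamp, status)
        ('res1', 'uuid1', 1, '3_New'), ('res1', 'uuid1', 2, '0_Fail'),
        ('res2', 'uuid2', 2, '1_Success'), ('res2', 'uuid2', 2, '0_Fail'),
    ]
    groups = defaultdict(list)
    for job_type, resource_id, ts, status in rows:
        groups[(job_type, resource_id)].append((ts, status))
    failed_jobs, new_jobs = [], []
    for (job_type, resource_id), entries in groups.items():
        latest = max(ts for ts, _ in entries)
        status = min(s for ts, s in entries if ts == latest)
        if status == '0_Fail':
            failed_jobs.append({'type': job_type, 'resource_id': resource_id})
        elif status == '3_New':
            new_jobs.append({'type': job_type, 'resource_id': resource_id})
    # res2 shows the tie-break: at the same latest timestamp, "Fail"
    # outranks "Success", matching the ascending status sort above.
    assert failed_jobs == [{'type': 'res1', 'resource_id': 'uuid1'},
                           {'type': 'res2', 'resource_id': 'uuid2'}]
    assert new_jobs == []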
def get_latest_timestamp(context, status, _type, resource_id):
@@ -397,8 +415,27 @@ def finish_job(context, job_id, successful, timestamp):
job_dict = {'status': status,
'timestamp': timestamp,
'extra_id': uuidutils.generate_uuid()}
core.update_resource(context, models.AsyncJob,
job_id, job_dict)
job = core.update_resource(context, models.AsyncJob, job_id, job_dict)
if status == constants.JS_Success:
log_dict = {'id': uuidutils.generate_uuid(),
'type': job['type'],
'timestamp': timestamp,
'resource_id': job['resource_id']}
context.session.query(models.AsyncJob).filter(
sql.and_(models.AsyncJob.type == job['type'],
models.AsyncJob.resource_id == job['resource_id'],
models.AsyncJob.timestamp <= timestamp)).delete(
synchronize_session=False)
core.create_resource(context, models.AsyncJobLog, log_dict)
else:
# sqlite has a problem handling the "<" operator on timestamps, so we
# slide the timestamp back a bit and use "<="
timestamp = timestamp - datetime.timedelta(microseconds=1)
context.session.query(models.AsyncJob).filter(
sql.and_(models.AsyncJob.type == job['type'],
models.AsyncJob.resource_id == job['resource_id'],
models.AsyncJob.timestamp <= timestamp)).delete(
synchronize_session=False)
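
The microsecond slide in the failure branch is easy to verify in isolation:
moving the cutoff back by one microsecond makes "<=" act like the "<" that
sqlite mishandles, so the just-finished job's own record survives while
strictly older records are purged:

    import datetime

    timestamp = datetime.datetime(2000, 1, 1, 12, 0, 0)
    cutoff = timestamp - datetime.timedelta(microseconds=1)
    assert not timestamp <= cutoff  # the current job's record is kept
    older = timestamp - datetime.timedelta(seconds=1)
    assert older <= cutoff          # strictly older records are deleted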
def _is_user_context(context):

@@ -49,7 +49,7 @@ def upgrade(migrate_engine):
sql.Column('id', sql.String(length=36), primary_key=True),
sql.Column('type', sql.String(length=36)),
sql.Column('timestamp', sql.TIMESTAMP,
server_default=sql.text('CURRENT_TIMESTAMP')),
server_default=sql.text('CURRENT_TIMESTAMP'), index=True),
sql.Column('status', sql.String(length=36)),
sql.Column('resource_id', sql.String(length=127)),
sql.Column('extra_id', sql.String(length=36)),
@@ -59,7 +59,17 @@ def upgrade(migrate_engine):
mysql_engine='InnoDB',
mysql_charset='utf8')
tables = [async_jobs, resource_routings]
async_job_logs = sql.Table(
'async_job_logs', meta,
sql.Column('id', sql.String(length=36), primary_key=True),
sql.Column('resource_id', sql.String(length=127)),
sql.Column('type', sql.String(length=36)),
sql.Column('timestamp', sql.TIMESTAMP,
server_default=sql.text('CURRENT_TIMESTAMP'), index=True),
mysql_engine='InnoDB',
mysql_charset='utf8')
tables = [async_jobs, resource_routings, async_job_logs]
for table in tables:
table.create()

@@ -94,7 +94,21 @@ class AsyncJob(core.ModelBase, core.DictBase):
id = sql.Column('id', sql.String(length=36), primary_key=True)
type = sql.Column('type', sql.String(length=36))
timestamp = sql.Column('timestamp', sql.TIMESTAMP,
server_default=sql.text('CURRENT_TIMESTAMP'))
server_default=sql.text('CURRENT_TIMESTAMP'),
index=True)
status = sql.Column('status', sql.String(length=36))
resource_id = sql.Column('resource_id', sql.String(length=127))
extra_id = sql.Column('extra_id', sql.String(length=36))
class AsyncJobLog(core.ModelBase, core.DictBase):
__tablename__ = 'async_job_logs'
attributes = ['id', 'resource_id', 'type', 'timestamp']
id = sql.Column('id', sql.String(length=36), primary_key=True)
resource_id = sql.Column('resource_id', sql.String(length=127))
type = sql.Column('type', sql.String(length=36))
timestamp = sql.Column('timestamp', sql.TIMESTAMP,
server_default=sql.text('CURRENT_TIMESTAMP'),
index=True)

@@ -147,7 +147,7 @@ class XManagerTest(unittest.TestCase):
core.get_engine().execute('pragma foreign_keys=on')
for opt in xservice.common_opts:
if opt.name in ('worker_handle_timeout', 'job_run_expire',
'worker_sleep_time'):
'worker_sleep_time', 'redo_time_span'):
cfg.CONF.register_opt(opt)
self.context = context.Context()
self.xmanager = FakeXManager()
@@ -317,8 +317,9 @@ class XManagerTest(unittest.TestCase):
bridge_infos = self._prepare_east_west_network_test(top_router_id)
ns_bridge_ip, ns_router_id = self._prepare_snat_test(top_router_id)
self._prepare_dnat_test()
self.xmanager.configure_extra_routes(self.context,
payload={'router': top_router_id})
db_api.new_job(self.context, constants.JT_ROUTER, top_router_id)
self.xmanager.configure_extra_routes(
self.context, payload={constants.JT_ROUTER: top_router_id})
calls = []
ns_routes = []
for i in range(2):
@@ -342,8 +343,9 @@ class XManagerTest(unittest.TestCase):
top_router_id = 'router_id'
bridge_infos = self._prepare_east_west_network_test(top_router_id)
ns_bridge_ip, ns_router_id = self._prepare_snat_test(top_router_id)
self.xmanager.configure_extra_routes(self.context,
payload={'router': top_router_id})
db_api.new_job(self.context, constants.JT_ROUTER, top_router_id)
self.xmanager.configure_extra_routes(
self.context, payload={constants.JT_ROUTER: top_router_id})
calls = []
ns_routes = []
for i in range(2):
@@ -366,8 +368,9 @@ class XManagerTest(unittest.TestCase):
def test_configure_extra_routes(self, mock_update):
top_router_id = 'router_id'
bridge_infos = self._prepare_east_west_network_test(top_router_id)
self.xmanager.configure_extra_routes(self.context,
payload={'router': top_router_id})
db_api.new_job(self.context, constants.JT_ROUTER, top_router_id)
self.xmanager.configure_extra_routes(
self.context, payload={constants.JT_ROUTER: top_router_id})
calls = []
for i in range(2):
routes = []
@@ -435,8 +438,9 @@ class XManagerTest(unittest.TestCase):
core.create_resource(self.context, models.ResourceRouting,
route)
db_api.new_job(self.context, constants.JT_SEG_RULE_SETUP, project_id)
self.xmanager.configure_security_group_rules(
self.context, payload={'seg_rule_setup': project_id})
self.context, payload={constants.JT_SEG_RULE_SETUP: project_id})
calls = [mock.call(self.context, sg_rule_id_1)]
mock_delete.assert_has_calls(calls)
@@ -467,31 +471,32 @@ class XManagerTest(unittest.TestCase):
mock_create.assert_has_calls(calls)
def test_job_handle(self):
@xmanager._job_handle('fake_resource')
job_type = 'fake_resource'
@xmanager._job_handle(job_type)
def fake_handle(self, ctx, payload):
pass
fake_id = 'fake_id'
payload = {'fake_resource': fake_id}
payload = {job_type: fake_id}
db_api.new_job(self.context, job_type, fake_id)
fake_handle(None, self.context, payload=payload)
jobs = core.query_resource(self.context, models.AsyncJob, [], [])
expected_status = [constants.JS_New, constants.JS_Success]
job_status = [job['status'] for job in jobs]
six.assertCountEqual(self, expected_status, job_status)
logs = core.query_resource(self.context, models.AsyncJobLog, [], [])
self.assertEqual(fake_id, jobs[0]['resource_id'])
self.assertEqual(fake_id, jobs[1]['resource_id'])
self.assertEqual('fake_resource', jobs[0]['type'])
self.assertEqual('fake_resource', jobs[1]['type'])
self.assertEqual(fake_id, logs[0]['resource_id'])
self.assertEqual(job_type, logs[0]['type'])
def test_job_handle_exception(self):
@xmanager._job_handle('fake_resource')
job_type = 'fake_resource'
@xmanager._job_handle(job_type)
def fake_handle(self, ctx, payload):
raise Exception()
fake_id = 'fake_id'
payload = {'fake_resource': fake_id}
payload = {job_type: fake_id}
db_api.new_job(self.context, job_type, fake_id)
fake_handle(None, self.context, payload=payload)
jobs = core.query_resource(self.context, models.AsyncJob, [], [])
@@ -501,19 +506,22 @@ class XManagerTest(unittest.TestCase):
self.assertEqual(fake_id, jobs[0]['resource_id'])
self.assertEqual(fake_id, jobs[1]['resource_id'])
self.assertEqual('fake_resource', jobs[0]['type'])
self.assertEqual('fake_resource', jobs[1]['type'])
self.assertEqual(job_type, jobs[0]['type'])
self.assertEqual(job_type, jobs[1]['type'])
def test_job_run_expire(self):
@xmanager._job_handle('fake_resource')
job_type = 'fake_resource'
@xmanager._job_handle(job_type)
def fake_handle(self, ctx, payload):
pass
fake_id = uuidutils.generate_uuid()
payload = {'fake_resource': fake_id}
payload = {job_type: fake_id}
db_api.new_job(self.context, job_type, fake_id)
expired_job = {
'id': uuidutils.generate_uuid(),
'type': 'fake_resource',
'type': job_type,
'timestamp': datetime.datetime.now() - datetime.timedelta(0, 200),
'status': constants.JS_Running,
'resource_id': fake_id,
@@ -522,19 +530,17 @@ class XManagerTest(unittest.TestCase):
core.create_resource(self.context, models.AsyncJob, expired_job)
fake_handle(None, self.context, payload=payload)
jobs = core.query_resource(self.context, models.AsyncJob, [], [])
expected_status = ['New', 'Fail', 'Success']
job_status = [job['status'] for job in jobs]
six.assertCountEqual(self, expected_status, job_status)
logs = core.query_resource(self.context, models.AsyncJobLog, [], [])
for i in range(3):
self.assertEqual(fake_id, jobs[i]['resource_id'])
self.assertEqual('fake_resource', jobs[i]['type'])
self.assertEqual(fake_id, logs[0]['resource_id'])
self.assertEqual(job_type, logs[0]['type'])
@patch.object(db_api, 'get_running_job')
@patch.object(db_api, 'register_job')
def test_worker_handle_timeout(self, mock_register, mock_get):
@xmanager._job_handle('fake_resource')
job_type = 'fake_resource'
@xmanager._job_handle(job_type)
def fake_handle(self, ctx, payload):
pass
@@ -543,13 +549,16 @@ class XManagerTest(unittest.TestCase):
mock_get.return_value = None
fake_id = uuidutils.generate_uuid()
payload = {'fake_resource': fake_id}
payload = {job_type: fake_id}
db_api.new_job(self.context, job_type, fake_id)
fake_handle(None, self.context, payload=payload)
# nothing to assert; what we test is that fake_handle can exit when
# it times out
def test_get_failed_jobs(self):
@patch('oslo_utils.timeutils.utcnow')
def test_get_failed_or_new_jobs(self, mock_now):
mock_now.return_value = datetime.datetime(2000, 1, 2, 12, 0, 0)
job_dict_list = [
{'timestamp': datetime.datetime(2000, 1, 1, 12, 0, 0),
'resource_id': 'uuid1', 'type': 'res1',
@@ -565,10 +574,16 @@ class XManagerTest(unittest.TestCase):
'status': constants.JS_Fail}, # job_uuid7
{'timestamp': datetime.datetime(2000, 1, 1, 12, 25, 0),
'resource_id': 'uuid3', 'type': 'res3',
'status': constants.JS_Fail}, # job_uuid9
'status': constants.JS_Success}, # job_uuid9
{'timestamp': datetime.datetime(2000, 1, 1, 12, 30, 0),
'resource_id': 'uuid3', 'type': 'res3',
'status': constants.JS_Success}]
'resource_id': 'uuid4', 'type': 'res4',
'status': constants.JS_New}, # job_uuid11
{'timestamp': datetime.datetime(1999, 12, 31, 12, 0, 0),
'resource_id': 'uuid5', 'type': 'res5',
'status': constants.JS_Fail}, # job_uuid13
{'timestamp': datetime.datetime(1999, 12, 31, 11, 59, 59),
'resource_id': 'uuid6', 'type': 'res6',
'status': constants.JS_Fail}] # job_uuid15
for i, job_dict in enumerate(job_dict_list, 1):
job_dict['id'] = 'job_uuid%d' % (2 * i - 1)
job_dict['extra_id'] = 'extra_uuid%d' % (2 * i - 1)
@@ -579,10 +594,16 @@ class XManagerTest(unittest.TestCase):
core.create_resource(self.context, models.AsyncJob, job_dict)
# for res3 + uuid3, the latest job's status is "Success", so not returned
expected_ids = ['job_uuid3', 'job_uuid5']
returned_jobs = db_api.get_latest_failed_jobs(self.context)
actual_ids = [job['id'] for job in returned_jobs]
six.assertCountEqual(self, expected_ids, actual_ids)
# for res6 + uuid6, the latest job is out of the redo time span
expected_failed_jobs = [
{'resource_id': 'uuid1', 'type': 'res1'},
{'resource_id': 'uuid2', 'type': 'res2'},
{'resource_id': 'uuid5', 'type': 'res5'}]
expected_new_jobs = [{'resource_id': 'uuid4', 'type': 'res4'}]
(failed_jobs,
new_jobs) = db_api.get_latest_failed_or_new_jobs(self.context)
six.assertCountEqual(self, expected_failed_jobs, failed_jobs)
six.assertCountEqual(self, expected_new_jobs, new_jobs)
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())

@@ -30,7 +30,7 @@ import neutronclient.common.exceptions as q_cli_exceptions
from tricircle.common import client
from tricircle.common import constants
from tricircle.common.i18n import _, _LE, _LI, _LW
from tricircle.common.i18n import _LE, _LI, _LW
from tricircle.common import xrpcapi
import tricircle.db.api as db_api
from tricircle.db import core
@@ -61,7 +61,6 @@ def _job_handle(job_type):
payload = kwargs['payload']
resource_id = payload[job_type]
db_api.new_job(ctx, job_type, resource_id)
start_time = datetime.datetime.now()
while True:
@@ -72,6 +71,8 @@ def _job_handle(job_type):
break
time_new = db_api.get_latest_timestamp(ctx, constants.JS_New,
job_type, resource_id)
if not time_new:
break
time_success = db_api.get_latest_timestamp(
ctx, constants.JS_Success, job_type, resource_id)
if time_success and time_success >= time_new:
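
The truncated hunk above adds an early exit: now that jobs are registered by
the RPC caller before the cast, a worker may wake up with no "New" record
left to handle. Restated as a standalone predicate (the function name is
hypothetical, for illustration only):

    # A run is needed only if a "New" record exists that no later (or
    # equally timestamped) "Success" record has already covered.
    def needs_run(time_new, time_success):
        if not time_new:
            return False
        if time_success and time_success >= time_new:
            return False
        return True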
@@ -136,7 +137,7 @@ class XManager(PeriodicTasks):
def __init__(self, host=None, service_name='xjob'):
LOG.debug(_('XManager initialization...'))
LOG.debug('XManager initialization...')
if not host:
host = CONF.host
@@ -167,7 +168,6 @@ class XManager(PeriodicTasks):
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
"""init_host
Hook to do additional manager initialization when one requests
@@ -175,25 +175,17 @@ class XManager(PeriodicTasks):
is created.
Child classes should override this method.
"""
LOG.debug(_('XManager init_host...'))
pass
LOG.debug('XManager init_host...')
def cleanup_host(self):
"""cleanup_host
Hook to do cleanup work when the service shuts down.
Child classes should override this method.
"""
LOG.debug(_('XManager cleanup_host...'))
pass
LOG.debug('XManager cleanup_host...')
def pre_start_hook(self):
"""pre_start_hook
Hook to provide the manager the ability to do additional
@@ -202,13 +194,9 @@ class XManager(PeriodicTasks):
record is created.
Child classes should override this method.
"""
LOG.debug(_('XManager pre_start_hook...'))
pass
LOG.debug('XManager pre_start_hook...')
def post_start_hook(self):
"""post_start_hook
Hook to provide the manager the ability to do additional
@@ -216,10 +204,7 @@ class XManager(PeriodicTasks):
and starts 'running'.
Child classes should override this method.
"""
LOG.debug(_('XManager post_start_hook...'))
pass
LOG.debug('XManager post_start_hook...')
# rpc message endpoint handling
def test_rpc(self, ctx, payload):
@@ -245,21 +230,31 @@ class XManager(PeriodicTasks):
'value': router_id}])
@periodic_task.periodic_task
def redo_failed_job(self, ctx):
failed_jobs = db_api.get_latest_failed_jobs(ctx)
def redo_failed_or_new_job(self, ctx):
failed_jobs, new_jobs = db_api.get_latest_failed_or_new_jobs(ctx)
failed_jobs = [
job for job in failed_jobs if job['type'] in self.job_handles]
if not failed_jobs:
new_jobs = [
job for job in new_jobs if job['type'] in self.job_handles]
if not failed_jobs and not new_jobs:
return
if new_jobs:
jobs = new_jobs
is_new_job = True
else:
jobs = failed_jobs
is_new_job = False
# in one run we only pick one job to handle
job_index = random.randint(0, len(failed_jobs) - 1)
failed_job = failed_jobs[job_index]
job_type = failed_job['type']
payload = {job_type: failed_job['resource_id']}
LOG.debug(_('Redo failed job for %(resource_id)s of type '
'%(job_type)s'),
{'resource_id': failed_job['resource_id'],
'job_type': job_type})
job_index = random.randint(0, len(jobs) - 1)
job_type = jobs[job_index]['type']
resource_id = jobs[job_index]['resource_id']
payload = {job_type: resource_id}
LOG.debug('Redo %(status)s job for %(resource_id)s of type '
'%(job_type)s',
{'status': 'new' if is_new_job else 'failed',
'resource_id': resource_id, 'job_type': job_type})
if not is_new_job:
db_api.new_job(ctx, job_type, resource_id)
self.job_handles[job_type](ctx, payload=payload)
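
The selection policy in isolation: new jobs take priority over failed ones,
one job is picked at random per periodic run, and only failed jobs need a
fresh "New" record before being re-handled, since new jobs already have one.
A minimal sketch (the helper name is hypothetical):

    import random

    def pick_job(failed_jobs, new_jobs):
        # prefer new jobs; fall back to failed ones
        jobs, is_new = (new_jobs, True) if new_jobs else (failed_jobs, False)
        return random.choice(jobs), is_new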
@staticmethod

@@ -54,6 +54,11 @@ common_opts = [
" seconds")),
cfg.FloatOpt('worker_sleep_time', default=0.1,
help=_("Seconds a worker sleeps after one run in a loop")),
cfg.IntOpt('redo_time_span', default=172800,
help=_("Time span in seconds, we calculate the latest job "
"timestamp by subtracting this time span from the "
"current timestamp, jobs created between these two "
"timestamps will be redone")),
cfg.BoolOpt('enable_api_gateway',
default=False,
help=_('Whether the Nova API gateway is enabled'))