Merge "Asynchronous job management(part 1)"

Jenkins 2016-03-31 07:34:55 +00:00 committed by Gerrit Code Review
commit d317e3895b
7 changed files with 300 additions and 4 deletions

View File

@@ -58,3 +58,11 @@ ns_bridge_port_name = 'ns_bridge_port_%s_%s_%s'
MAX_INT = 0x7FFFFFFF
expire_time = datetime.datetime(2000, 1, 1)
# job status
JS_New = 'New'
JS_Running = 'Running'
JS_Success = 'Success'
JS_Fail = 'Fail'
SP_EXTRA_ID = '00000000-0000-0000-0000-000000000000'
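Together with SP_EXTRA_ID, these constants drive the job locking scheme this change introduces: each request records a New job with a random extra_id, the worker that wins the lock inserts a Running record keyed by the fixed all-zero SP_EXTRA_ID, and finish_job later flips that record to Success or Fail. A sketch of the key that makes Running exclusive (job_type and resource_id are placeholders, not names from the change):

# Illustration only: the job table is unique over (type, status,
# resource_id, extra_id), so at most one row can carry this key
# per (job_type, resource_id) pair.
lock_key = (job_type, JS_Running, resource_id, SP_EXTRA_ID)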

View File

@@ -21,7 +21,9 @@ from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_utils import timeutils
from oslo_utils import uuidutils
from tricircle.common import constants
from tricircle.common.context import is_admin_context as _is_admin_context
from tricircle.common import exceptions
from tricircle.common.i18n import _
@@ -202,6 +204,72 @@ def get_pod_by_name(context, pod_name):
    return None


def new_job(context, _type, resource_id):
    with context.session.begin():
        job_dict = {'id': uuidutils.generate_uuid(),
                    'type': _type,
                    'status': constants.JS_New,
                    'resource_id': resource_id,
                    'extra_id': uuidutils.generate_uuid()}
        job = core.create_resource(context, models.Job, job_dict)
        return job

def register_job(context, _type, resource_id):
    try:
        context.session.begin()
        job_dict = {'id': uuidutils.generate_uuid(),
                    'type': _type,
                    'status': constants.JS_Running,
                    'resource_id': resource_id,
                    'extra_id': constants.SP_EXTRA_ID}
        job = core.create_resource(context, models.Job, job_dict)
        context.session.commit()
        return job
    except db_exc.DBDuplicateEntry:
        context.session.rollback()
        return None
    except db_exc.DBDeadlock:
        context.session.rollback()
        return None
    finally:
        context.session.close()
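The DBDuplicateEntry branch above is what turns register_job into a cross-worker lock: every Running record uses the fixed SP_EXTRA_ID, so the unique constraint admits only one such row per (type, resource_id). A minimal sketch of the race, assuming two workers contend for the same resource (ctx and router_id are placeholders):

# Worker A inserts the Running row first and wins the lock.
job = register_job(ctx, 'router', router_id)   # returns the job row
# Worker B then hits the unique constraint; DBDuplicateEntry is
# caught inside register_job and surfaces as None.
job = register_job(ctx, 'router', router_id)   # returns None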

def get_latest_timestamp(context, status, _type, resource_id):
    jobs = core.query_resource(
        context, models.Job,
        [{'key': 'status', 'comparator': 'eq', 'value': status},
         {'key': 'type', 'comparator': 'eq', 'value': _type},
         {'key': 'resource_id', 'comparator': 'eq', 'value': resource_id}],
        [('timestamp', False)])
    if jobs:
        return jobs[0]['timestamp']
    else:
        return None

def get_running_job(context, _type, resource_id):
    jobs = core.query_resource(
        context, models.Job,
        [{'key': 'resource_id', 'comparator': 'eq', 'value': resource_id},
         {'key': 'status', 'comparator': 'eq', 'value': constants.JS_Running},
         {'key': 'type', 'comparator': 'eq', 'value': _type}], [])
    if jobs:
        return jobs[0]
    else:
        return None

def finish_job(context, job_id, successful, timestamp):
    status = constants.JS_Success if successful else constants.JS_Fail
    with context.session.begin():
        job_dict = {'status': status,
                    'timestamp': timestamp,
                    'extra_id': uuidutils.generate_uuid()}
        core.update_resource(context, models.Job, job_id, job_dict)
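Note that finish_job also releases the lock: rewriting extra_id with a fresh UUID moves the row out of the (Running, SP_EXTRA_ID) slot, so a later register_job for the same resource can succeed again. A hedged sketch of the whole acquire/run/release cycle (run_once and handler are illustrative names; the real decorator in xmanager.py passes the latest New timestamp rather than now()):

import datetime

def run_once(ctx, job_type, resource_id, handler):
    new_job(ctx, job_type, resource_id)            # record the request
    job = register_job(ctx, job_type, resource_id)
    if not job:
        return False                               # another worker holds the lock
    try:
        handler(ctx, resource_id)
    except Exception:
        finish_job(ctx, job['id'], False, datetime.datetime.now())
        raise
    finish_job(ctx, job['id'], True, datetime.datetime.now())
    return True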
_DEFAULT_QUOTA_NAME = 'default'

View File

@@ -217,10 +217,25 @@ def upgrade(migrate_engine):
        mysql_engine='InnoDB',
        mysql_charset='utf8')

    job = sql.Table(
        'job', meta,
        sql.Column('id', sql.String(length=36), primary_key=True),
        sql.Column('type', sql.String(length=36)),
        sql.Column('timestamp', sql.TIMESTAMP,
                   server_default=sql.text('CURRENT_TIMESTAMP')),
        sql.Column('status', sql.String(length=36)),
        sql.Column('resource_id', sql.String(length=36)),
        sql.Column('extra_id', sql.String(length=36)),
        migrate.UniqueConstraint(
            'type', 'status', 'resource_id', 'extra_id',
            name='job0type0status0resource_id0extra_id'),
        mysql_engine='InnoDB',
        mysql_charset='utf8')

    tables = [aggregates, aggregate_metadata, instance_types,
              instance_type_projects, instance_type_extra_specs, key_pairs,
              quotas, quota_classes, quota_usages, reservations,
              volume_types,
              volume_types, job,
              quality_of_service_specs, cascaded_pods_resource_routing]

    for table in tables:
        table.create()

View File

@@ -385,3 +385,23 @@ class ResourceRouting(core.ModelBase, core.DictBase, models.TimestampMixin):
    project_id = sql.Column('project_id', sql.String(length=36))
    resource_type = sql.Column('resource_type', sql.String(length=64),
                               nullable=False)

class Job(core.ModelBase, core.DictBase):
    __tablename__ = 'job'
    __table_args__ = (
        schema.UniqueConstraint(
            'type', 'status', 'resource_id', 'extra_id',
            name='job0type0status0resource_id0extra_id'),
    )

    attributes = ['id', 'type', 'timestamp', 'status', 'resource_id',
                  'extra_id']

    id = sql.Column('id', sql.String(length=36), primary_key=True)
    type = sql.Column('type', sql.String(length=36))
    timestamp = sql.Column('timestamp', sql.TIMESTAMP,
                           server_default=sql.text('CURRENT_TIMESTAMP'))
    status = sql.Column('status', sql.String(length=36))
    resource_id = sql.Column('resource_id', sql.String(length=36))
    extra_id = sql.Column('extra_id', sql.String(length=36))

View File

@@ -13,15 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
from mock import patch
import unittest
from oslo_config import cfg
from oslo_utils import uuidutils
from tricircle.common import constants
from tricircle.common import context
import tricircle.db.api as db_api
from tricircle.db import core
from tricircle.db import models
from tricircle.xjob import xmanager
from tricircle.xjob import xservice
BOTTOM1_NETWORK = []
@@ -32,7 +38,8 @@ BOTTOM1_PORT = []
BOTTOM2_PORT = []
BOTTOM1_ROUTER = []
BOTTOM2_ROUTER = []
RES_LIST = [BOTTOM1_SUBNET, BOTTOM2_SUBNET, BOTTOM1_PORT, BOTTOM2_PORT]
RES_LIST = [BOTTOM1_NETWORK, BOTTOM2_NETWORK, BOTTOM1_SUBNET, BOTTOM2_SUBNET,
            BOTTOM1_PORT, BOTTOM2_PORT, BOTTOM1_ROUTER, BOTTOM2_ROUTER]
RES_MAP = {'pod_1': {'network': BOTTOM1_NETWORK,
                     'subnet': BOTTOM1_SUBNET,
                     'port': BOTTOM1_PORT,
@@ -93,6 +100,10 @@ class XManagerTest(unittest.TestCase):
        core.ModelBase.metadata.create_all(core.get_engine())
        # enforce foreign key constraint for sqlite
        core.get_engine().execute('pragma foreign_keys=on')
        for opt in xservice.common_opts:
            if opt.name in ('worker_handle_timeout', 'job_run_expire',
                            'worker_sleep_time'):
                cfg.CONF.register_opt(opt)
        self.context = context.Context()
        self.xmanager = FakeXManager()
@@ -160,7 +171,7 @@
                            'ip_address': '10.0.3.1'}]})
        self.xmanager.configure_extra_routes(self.context,
                                             {'router': top_router_id})
                                             payload={'router': top_router_id})
        calls = [mock.call(self.context, 'router_1_id',
                           {'router': {
                               'routes': [{'nexthop': '100.0.1.2',
@@ -172,3 +183,91 @@
                                           {'nexthop': '100.0.1.1',
                                            'destination': '10.0.3.0/24'}]}})]
        mock_update.assert_has_calls(calls)

    def test_job_handle(self):
        @xmanager._job_handle('fake_resource')
        def fake_handle(self, ctx, payload):
            pass

        fake_id = 'fake_id'
        payload = {'fake_resource': fake_id}
        fake_handle(None, self.context, payload=payload)

        jobs = core.query_resource(self.context, models.Job, [], [])
        expected_status = [constants.JS_New, constants.JS_Success]
        job_status = [job['status'] for job in jobs]
        self.assertItemsEqual(expected_status, job_status)
        self.assertEqual(fake_id, jobs[0]['resource_id'])
        self.assertEqual(fake_id, jobs[1]['resource_id'])
        self.assertEqual('fake_resource', jobs[0]['type'])
        self.assertEqual('fake_resource', jobs[1]['type'])

    def test_job_handle_exception(self):
        @xmanager._job_handle('fake_resource')
        def fake_handle(self, ctx, payload):
            raise Exception()

        fake_id = 'fake_id'
        payload = {'fake_resource': fake_id}
        fake_handle(None, self.context, payload=payload)

        jobs = core.query_resource(self.context, models.Job, [], [])
        expected_status = [constants.JS_New, constants.JS_Fail]
        job_status = [job['status'] for job in jobs]
        self.assertItemsEqual(expected_status, job_status)
        self.assertEqual(fake_id, jobs[0]['resource_id'])
        self.assertEqual(fake_id, jobs[1]['resource_id'])
        self.assertEqual('fake_resource', jobs[0]['type'])
        self.assertEqual('fake_resource', jobs[1]['type'])

    def test_job_run_expire(self):
        @xmanager._job_handle('fake_resource')
        def fake_handle(self, ctx, payload):
            pass

        fake_id = uuidutils.generate_uuid()
        payload = {'fake_resource': fake_id}
        expired_job = {
            'id': uuidutils.generate_uuid(),
            'type': 'fake_resource',
            'timestamp': datetime.datetime.now() - datetime.timedelta(0, 120),
            'status': constants.JS_Running,
            'resource_id': fake_id,
            'extra_id': constants.SP_EXTRA_ID
        }
        core.create_resource(self.context, models.Job, expired_job)
        fake_handle(None, self.context, payload=payload)

        jobs = core.query_resource(self.context, models.Job, [], [])
        expected_status = ['New', 'Fail', 'Success']
        job_status = [job['status'] for job in jobs]
        self.assertItemsEqual(expected_status, job_status)
        for i in xrange(3):
            self.assertEqual(fake_id, jobs[i]['resource_id'])
            self.assertEqual('fake_resource', jobs[i]['type'])

    @patch.object(db_api, 'get_running_job')
    @patch.object(db_api, 'register_job')
    def test_worker_handle_timeout(self, mock_register, mock_get):
        @xmanager._job_handle('fake_resource')
        def fake_handle(self, ctx, payload):
            pass

        cfg.CONF.set_override('worker_handle_timeout', 1)
        mock_register.return_value = None
        mock_get.return_value = None

        fake_id = uuidutils.generate_uuid()
        payload = {'fake_resource': fake_id}
        fake_handle(None, self.context, payload=payload)

        # nothing to assert; we only check that fake_handle exits once
        # the worker handle timeout is reached

    def tearDown(self):
        core.ModelBase.metadata.drop_all(core.get_engine())
        for res in RES_LIST:
            del res[:]

View File

@@ -13,7 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import eventlet
import netaddr
import six
from oslo_config import cfg
from oslo_log import log as logging
@@ -23,7 +26,9 @@ from oslo_service import periodic_task
from tricircle.common import client
from tricircle.common import constants
from tricircle.common.i18n import _
from tricircle.common.i18n import _LE
from tricircle.common.i18n import _LI
from tricircle.common.i18n import _LW
import tricircle.db.api as db_api
@@ -31,6 +36,78 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)

def _job_handle(job_type):
    def handle_func(func):
        @six.wraps(func)
        def handle_args(*args, **kwargs):
            ctx = args[1]
            payload = kwargs['payload']

            resource_id = payload[job_type]
            db_api.new_job(ctx, job_type, resource_id)
            start_time = datetime.datetime.now()

            while True:
                current_time = datetime.datetime.now()
                delta = current_time - start_time
                if delta.seconds >= CONF.worker_handle_timeout:
                    # quit if this handler has been running too long
                    break
                time_new = db_api.get_latest_timestamp(ctx, constants.JS_New,
                                                       job_type, resource_id)
                time_success = db_api.get_latest_timestamp(
                    ctx, constants.JS_Success, job_type, resource_id)
                if time_success and time_success >= time_new:
                    break
                job = db_api.register_job(ctx, job_type, resource_id)
                if not job:
                    # failed to obtain the lock, let another worker
                    # handle the job
                    running_job = db_api.get_running_job(ctx, job_type,
                                                         resource_id)
                    if not running_job:
                        # running_job can be None for two reasons: either
                        # the running job has just finished, or every
                        # worker failed to register the job because of a
                        # deadlock exception, so we sleep and try again
                        eventlet.sleep(CONF.worker_sleep_time)
                        continue
                    job_time = running_job['timestamp']
                    current_time = datetime.datetime.now()
                    delta = current_time - job_time
                    if delta.seconds > CONF.job_run_expire:
                        # the previous running job has expired, so we set
                        # its status to Fail and try again to obtain the
                        # lock
                        db_api.finish_job(ctx, running_job['id'], False,
                                          time_new)
                        LOG.warning(_LW('Job %(job)s of type %(job_type)s '
                                        'for resource %(resource)s expired, '
                                        'set its state to Fail'),
                                    {'job': running_job['id'],
                                     'job_type': job_type,
                                     'resource': resource_id})
                        eventlet.sleep(CONF.worker_sleep_time)
                        continue
                    else:
                        # the previous running job is still valid, so we
                        # leave the job to the worker that holds the lock
                        break
                # successfully obtained the lock, start executing the
                # handler
                try:
                    func(*args, **kwargs)
                except Exception:
                    db_api.finish_job(ctx, job['id'], False, time_new)
                    LOG.error(_LE('Job %(job)s of type %(job_type)s for '
                                  'resource %(resource)s failed'),
                              {'job': job['id'],
                               'job_type': job_type,
                               'resource': resource_id})
                    break
                db_api.finish_job(ctx, job['id'], True, time_new)
                eventlet.sleep(CONF.worker_sleep_time)
        return handle_args
    return handle_func
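For reference, a decorated handler receives the resource id under the job-type key of the payload, mirroring how the updated unit tests drive it and how configure_extra_routes further below is declared (router_id here is a placeholder for a real top router id):

@_job_handle('router')
def configure_extra_routes(self, ctx, payload):
    ...  # the real work is keyed by payload['router']

# callers always pass payload as a keyword argument:
xmanager.configure_extra_routes(ctx, payload={'router': router_id})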

class PeriodicTasks(periodic_task.PeriodicTasks):
    def __init__(self):
        super(PeriodicTasks, self).__init__(CONF)
@@ -128,6 +205,7 @@ class XManager(PeriodicTasks):
        return info_text

    @_job_handle('router')
    def configure_extra_routes(self, ctx, payload):
        # TODO(zhiyuan) performance and reliability issue
        # better have a job tracking mechanism

View File

@@ -47,7 +47,15 @@ common_opts = [
    cfg.StrOpt('host', default='tricircle.xhost',
               help=_("The host name for RPC server")),
    cfg.IntOpt('workers', default=1,
               help=_("number of workers")),
               help=_("Number of workers")),
    cfg.IntOpt('worker_handle_timeout', default=1800,
               help=_("Timeout for one round of worker processing, in"
                      " seconds")),
    cfg.IntOpt('job_run_expire', default=60,
               help=_("A running job is considered expired after this"
                      " time, in seconds")),
    cfg.FloatOpt('worker_sleep_time', default=0.1,
                 help=_("Seconds a worker sleeps after one run in a loop"))
]
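The three new options cooperate inside _job_handle: worker_handle_timeout caps how long one call spins in the retry loop, job_run_expire decides when a stale Running record may be broken, and worker_sleep_time sets the pause between retries. A sketch of overriding them, following the pattern test_worker_handle_timeout uses (the values are illustrative, not recommendations):

from oslo_config import cfg

cfg.CONF.set_override('worker_handle_timeout', 1)   # give up after 1 second
cfg.CONF.set_override('job_run_expire', 5)          # break locks older than 5 s
cfg.CONF.set_override('worker_sleep_time', 0.01)    # tighter retry loop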
service_opts = [