Adding changes for the database lock for the cpulse HA

Change-Id: Iaa3f6a68a497e0f14f14a8cf3b67f0c338000550
This commit is contained in:
Anand Shanmugam 2015-08-26 23:32:23 -07:00
parent 4b8dd7df37
commit c24ad6d386
9 changed files with 186 additions and 43 deletions

View File

@ -12,15 +12,18 @@
from cloudpulse.common import context as cloudpulse_context
from cloudpulse.common.plugin import discover
from cloudpulse.common import utils
from cloudpulse.conductor import cpulse_lock
from cloudpulse.db.sqlalchemy import api as dbapi
from cloudpulse import objects
from cloudpulse.openstack.common import service as os_service
from cloudpulse.scenario import base
import datetime
import logging
from oslo_config import cfg
from oslo_utils import importutils
import pytz
import textwrap
import threading
cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
group='keystone_authtoken')
@ -28,35 +31,40 @@ cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token',
CONF = cfg.CONF
dblock = threading.RLock()
def acquireLock():
dblock.acquire()
def releaseLock():
dblock.release()
LOG = logging.getLogger(__name__)
class Periodic_Task(object):
def __init__(self, task):
self.task = task
self.conductor_id = '1'
def create_task_entry(self, context):
test = {}
test['state'] = 'created'
test['testtype'] = 'periodic'
test['name'] = self.task
test['uuid'] = utils.generate_uuid()
new_test = objects.Cpulse(context, **test)
acquireLock()
new_test.create()
releaseLock()
with cpulse_lock.thread_lock(new_test, self.conductor_id):
new_test.create()
return new_test
def check_if_already_run(self, context):
tasks = CONF.periodic_tests
task_interval = int(tasks[self.task])
filters = {}
filters['name'] = self.task
lasttest = objects.Cpulse.list(context, filters=filters)[-1]
lastime = lasttest['created_at']
timenow = datetime.datetime.now(pytz.utc)
timesincelast = (timenow - lastime).seconds
if timesincelast >= task_interval:
return True
else:
return False
def run_task(self):
importutils.import_module('keystonemiddleware.auth_token')
username = cfg.CONF.keystone_authtoken.username
@ -68,12 +76,13 @@ class Periodic_Task(object):
auth_url=auth_url,
user=username,
project=tenant_name)
new_test = self.create_task_entry(context)
test_manager.run(test=new_test)
if self.check_if_already_run(context):
new_test = self.create_task_entry(context)
test_manager.run(test=new_test)
class Periodic_TestManager(os_service.Service):
def __init__(self):
super(Periodic_TestManager, self).__init__()
@ -92,8 +101,10 @@ class Periodic_TestManager(os_service.Service):
class TestManager(object):
def __init__(self):
self.command_ref = {}
self.conductor_id = '1'
discover.import_modules_from_package("cloudpulse.scenario.plugins")
for scenario_group in discover.itersubclasses(base.Scenario):
for method in dir(scenario_group):
@ -105,9 +116,8 @@ class TestManager(object):
Test = kwargs['test']
func = self.command_ref[Test['name']]
Test['state'] = 'running'
acquireLock()
self.update_test(Test['uuid'], Test)
releaseLock()
with cpulse_lock.thread_lock(Test, self.conductor_id):
self.update_test(Test['uuid'], Test)
result = func()
if result[0] == 200:
Test['state'] = 'success'
@ -115,9 +125,8 @@ class TestManager(object):
else:
Test['state'] = 'failed'
Test['result'] = textwrap.fill(str(result[1]), 40)
acquireLock()
self.update_test(Test['uuid'], Test)
releaseLock()
with cpulse_lock.thread_lock(Test, self.conductor_id):
self.update_test(Test['uuid'], Test)
def run_periodic(self, **kwargs):
Test = kwargs['test']
@ -128,9 +137,8 @@ class TestManager(object):
else:
Test['state'] = 'failed'
Test['result'] = textwrap.fill(str(result[1]), 40)
acquireLock()
self.update_test(Test['uuid'], Test)
releaseLock()
with cpulse_lock.thread_lock(Test, self.conductor_id):
self.update_test(Test['uuid'], Test)
def update_test(self, tuuid, patch):
npatch = {}

View File

@ -170,6 +170,7 @@ def wrap_keystone_exception(func):
class CloudpulseException(Exception):
"""Base Cloudpulse Exception
To correctly use this class, inherit from it and define
@ -338,6 +339,10 @@ class RequiredParameterNotProvided(CloudpulseException):
message = _("Required parameter %(heat_param)s not provided.")
class TestLocked(CloudpulseException):
message = _("A process has already locked the database for update")
class Urllib2InvalidScheme(CloudpulseException):
message = _("The urllib2 URL %(url) has an invalid scheme.")

View File

@ -0,0 +1,57 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudpulse.common import exception
from cloudpulse import objects
import contextlib
import logging
from oslo_utils import excutils
LOG = logging.getLogger(__name__)
class CpulseLock(object):
def __init__(self, cpulse_test, conductor_id):
self.cpulse_test = cpulse_test
self.conductor_id = conductor_id
def acquire(self, retry=True):
lock_conductor_id = objects.CpulseLock.create(self.cpulse_test.name,
self.conductor_id)
if lock_conductor_id is None:
return
else:
raise exception.TestLocked(uuid=self.cpulse_test.name)
def release(self, test_name):
result = objects.CpulseLock.release(test_name, self.conductor_id)
if result is True:
LOG.debug("Lock was already released on test %s!" % test_name)
else:
LOG.debug("Lock has been released")
@contextlib.contextmanager
def thread_lock(cpulse_test, conductor_id):
cpulselock = CpulseLock(cpulse_test, conductor_id)
try:
cpulselock.acquire()
yield
except exception.TestLocked:
raise
except: # noqa
with excutils.save_and_reraise_exception():
cpulselock.release(cpulse_test.name)
finally:
cpulselock.release(cpulse_test.name)

View File

@ -125,31 +125,31 @@ class Connection(object):
"""
@abc.abstractmethod
def create_test_lock(self, test_uuid):
def create_test_lock(self, test_name):
"""Create a new testlock.
This method will fail if the test has already been locked.
:param test_uuid: The uuid of a test.
:param test_name: The name of a test.
:returns: None if success.
Otherwise, the id of the the test.
"""
@abc.abstractmethod
def steal_test_lock(self, test_uuid):
def steal_test_lock(self, test_name):
"""Steal lock of a test.
Lock the test with test id if the test is currently locked.
:param test_uuid: The uuid of a test.
:param test_name: The name of a test.
:returns: None if success. True if the test is not locked.
Otherwise, the id of the test.
"""
@abc.abstractmethod
def release_test_lock(self, test_uuid):
def release_test_lock(self, test_name):
"""Release lock of a test.
:param test_uuid: The uuid of a test.
:param test_name: The name of a test.
:returns: None if success. True otherwise.
"""

View File

@ -100,6 +100,7 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None,
class Connection(api.Connection):
"""SqlAlchemy connection."""
def __init__(self):
@ -126,6 +127,7 @@ class Connection(api.Connection):
sort_key=None, sort_dir=None):
# query = model_query(models.cpulse)
query = model_query(models.cpulse)
query = self._add_tests_filters(query, filters)
return _paginate_query(models.cpulse, limit, marker,
sort_key, sort_dir, query)
@ -211,14 +213,37 @@ class Connection(api.Connection):
ref.update(values)
return ref
def create_test_lock(self, test_uuid, conductor_id):
pass
def create_test_lock(self, test_name, conductor_id):
session = get_session()
with session.begin():
query = model_query(models.CpulseLock, session=session)
lock = query.filter_by(test_name=test_name).first()
if lock is not None:
return lock.conductor_id
session.add(models.CpulseLock(test_name=test_name,
conductor_id=conductor_id))
def steal_test_lock(self, test_uuid, old_conductor_id, new_conductor_id):
pass
def steal_test_lock(self, test_name, old_conductor_id, new_conductor_id):
session = get_session()
with session.begin():
query = model_query(models.CpulseLock, session=session)
lock = query.filter_by(test_name=test_name).first()
if lock is None:
return True
elif lock.conductor_id != old_conductor_id:
return lock.conductor_id
else:
lock.update({'conductor_id': new_conductor_id})
def release_test_lock(self, test_uuid, conductor_id):
pass
def release_test_lock(self, test_name, conductor_id):
session = get_session()
with session.begin():
query = model_query(models.CpulseLock, session=session)
query = query.filter_by(test_name=test_name,
conductor_id=conductor_id)
count = query.delete()
if count == 0:
return True
def delete_old_tests(self, num_range,
num_tests=cfg.CONF.database.max_db_entries):

View File

@ -135,9 +135,9 @@ class CpulseLock(Base):
__tablename__ = 'cpulselock'
__table_args__ = (
schema.UniqueConstraint('test_uuid', name='uniq_testlock0test_uuid'),
schema.UniqueConstraint('test_name', name='uniq_testlock0test_name'),
table_args()
)
id = Column(Integer, primary_key=True)
test_uuid = Column(String(36))
test_name = Column(String(64))
conductor_id = Column(String(64))

View File

@ -13,7 +13,10 @@
# under the License.
from cloudpulse.objects import cpulse
from cloudpulse.objects import cpulselock
CpulseLock = cpulselock.CpulseLock
Cpulse = cpulse.Cpulse
__all__ = (Cpulse)

View File

@ -118,7 +118,7 @@ class Cpulse(base.CloudpulsePersistentObject, base.CloudpulseObject,
@base.remotable_classmethod
def list(cls, context, limit=None, marker=None,
sort_key=None, sort_dir=None):
sort_key=None, sort_dir=None, filters=None):
"""Return a list of Cpulse objects.
:param context: Security context.
@ -132,7 +132,8 @@ class Cpulse(base.CloudpulsePersistentObject, base.CloudpulseObject,
db = cls.dbapi.get_test_list(context, limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir)
sort_dir=sort_dir,
filters=filters)
return Cpulse._from_db_object_list(db, cls, context)
@base.remotable

View File

@ -0,0 +1,44 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_versionedobjects import fields
from cloudpulse.db import api as dbapi
from cloudpulse.objects import base
@base.CloudpulseObjectRegistry.register
class CpulseLock(base.CloudpulsePersistentObject, base.CloudpulseObject,
base.CloudpulseObjectDictCompat):
# Version 1.0: Initial version
VERSION = '1.0'
dbapi = dbapi.get_instance()
fields = {
'test_name': fields.StringField(nullable=True),
'conductor_id': fields.StringField(nullable=True),
}
@base.remotable_classmethod
def create(cls, test_name, conductor_id):
return cls.dbapi.create_test_lock(test_name, conductor_id)
@base.remotable_classmethod
def steal(cls, test_name, old_conductor_id, new_conductor_id):
return cls.dbapi.steal_test_lock(test_name, old_conductor_id,
new_conductor_id)
@base.remotable_classmethod
def release(cls, test_name, conductor_id):
return cls.dbapi.release_test_lock(test_name, conductor_id)