diff --git a/cloudpulse/TestManager/TestManager.py b/cloudpulse/TestManager/TestManager.py index b8ae2d1..5342ebd 100644 --- a/cloudpulse/TestManager/TestManager.py +++ b/cloudpulse/TestManager/TestManager.py @@ -12,15 +12,18 @@ from cloudpulse.common import context as cloudpulse_context from cloudpulse.common.plugin import discover +from cloudpulse.common import utils +from cloudpulse.conductor import cpulse_lock from cloudpulse.db.sqlalchemy import api as dbapi from cloudpulse import objects from cloudpulse.openstack.common import service as os_service from cloudpulse.scenario import base +import datetime import logging from oslo_config import cfg from oslo_utils import importutils +import pytz import textwrap -import threading cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token', group='keystone_authtoken') @@ -28,35 +31,40 @@ cfg.CONF.import_opt('auth_uri', 'keystonemiddleware.auth_token', CONF = cfg.CONF -dblock = threading.RLock() - - -def acquireLock(): - dblock.acquire() - - -def releaseLock(): - dblock.release() - - LOG = logging.getLogger(__name__) class Periodic_Task(object): + def __init__(self, task): self.task = task + self.conductor_id = '1' def create_task_entry(self, context): test = {} test['state'] = 'created' test['testtype'] = 'periodic' test['name'] = self.task + test['uuid'] = utils.generate_uuid() new_test = objects.Cpulse(context, **test) - acquireLock() - new_test.create() - releaseLock() + with cpulse_lock.thread_lock(new_test, self.conductor_id): + new_test.create() return new_test + def check_if_already_run(self, context): + tasks = CONF.periodic_tests + task_interval = int(tasks[self.task]) + filters = {} + filters['name'] = self.task + lasttest = objects.Cpulse.list(context, filters=filters)[-1] + lastime = lasttest['created_at'] + timenow = datetime.datetime.now(pytz.utc) + timesincelast = (timenow - lastime).seconds + if timesincelast >= task_interval: + return True + else: + return False + def run_task(self): importutils.import_module('keystonemiddleware.auth_token') username = cfg.CONF.keystone_authtoken.username @@ -68,12 +76,13 @@ class Periodic_Task(object): auth_url=auth_url, user=username, project=tenant_name) - - new_test = self.create_task_entry(context) - test_manager.run(test=new_test) + if self.check_if_already_run(context): + new_test = self.create_task_entry(context) + test_manager.run(test=new_test) class Periodic_TestManager(os_service.Service): + def __init__(self): super(Periodic_TestManager, self).__init__() @@ -92,8 +101,10 @@ class Periodic_TestManager(os_service.Service): class TestManager(object): + def __init__(self): self.command_ref = {} + self.conductor_id = '1' discover.import_modules_from_package("cloudpulse.scenario.plugins") for scenario_group in discover.itersubclasses(base.Scenario): for method in dir(scenario_group): @@ -105,9 +116,8 @@ class TestManager(object): Test = kwargs['test'] func = self.command_ref[Test['name']] Test['state'] = 'running' - acquireLock() - self.update_test(Test['uuid'], Test) - releaseLock() + with cpulse_lock.thread_lock(Test, self.conductor_id): + self.update_test(Test['uuid'], Test) result = func() if result[0] == 200: Test['state'] = 'success' @@ -115,9 +125,8 @@ class TestManager(object): else: Test['state'] = 'failed' Test['result'] = textwrap.fill(str(result[1]), 40) - acquireLock() - self.update_test(Test['uuid'], Test) - releaseLock() + with cpulse_lock.thread_lock(Test, self.conductor_id): + self.update_test(Test['uuid'], Test) def run_periodic(self, **kwargs): Test = kwargs['test'] @@ -128,9 +137,8 @@ class TestManager(object): else: Test['state'] = 'failed' Test['result'] = textwrap.fill(str(result[1]), 40) - acquireLock() - self.update_test(Test['uuid'], Test) - releaseLock() + with cpulse_lock.thread_lock(Test, self.conductor_id): + self.update_test(Test['uuid'], Test) def update_test(self, tuuid, patch): npatch = {} diff --git a/cloudpulse/common/exception.py b/cloudpulse/common/exception.py index 15097d3..8d9ab09 100644 --- a/cloudpulse/common/exception.py +++ b/cloudpulse/common/exception.py @@ -170,6 +170,7 @@ def wrap_keystone_exception(func): class CloudpulseException(Exception): + """Base Cloudpulse Exception To correctly use this class, inherit from it and define @@ -338,6 +339,10 @@ class RequiredParameterNotProvided(CloudpulseException): message = _("Required parameter %(heat_param)s not provided.") +class TestLocked(CloudpulseException): + message = _("A process has already locked the database for update") + + class Urllib2InvalidScheme(CloudpulseException): message = _("The urllib2 URL %(url) has an invalid scheme.") diff --git a/cloudpulse/conductor/cpulse_lock.py b/cloudpulse/conductor/cpulse_lock.py new file mode 100644 index 0000000..a499d80 --- /dev/null +++ b/cloudpulse/conductor/cpulse_lock.py @@ -0,0 +1,57 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cloudpulse.common import exception +from cloudpulse import objects +import contextlib +import logging +from oslo_utils import excutils + +LOG = logging.getLogger(__name__) + + +class CpulseLock(object): + + def __init__(self, cpulse_test, conductor_id): + self.cpulse_test = cpulse_test + self.conductor_id = conductor_id + + def acquire(self, retry=True): + lock_conductor_id = objects.CpulseLock.create(self.cpulse_test.name, + self.conductor_id) + if lock_conductor_id is None: + return + else: + raise exception.TestLocked(uuid=self.cpulse_test.name) + + def release(self, test_name): + result = objects.CpulseLock.release(test_name, self.conductor_id) + if result is True: + LOG.debug("Lock was already released on test %s!" % test_name) + else: + LOG.debug("Lock has been released") + + +@contextlib.contextmanager +def thread_lock(cpulse_test, conductor_id): + cpulselock = CpulseLock(cpulse_test, conductor_id) + try: + cpulselock.acquire() + yield + except exception.TestLocked: + raise + except: # noqa + with excutils.save_and_reraise_exception(): + cpulselock.release(cpulse_test.name) + finally: + cpulselock.release(cpulse_test.name) diff --git a/cloudpulse/db/api.py b/cloudpulse/db/api.py index a7eb576..d4206d4 100644 --- a/cloudpulse/db/api.py +++ b/cloudpulse/db/api.py @@ -125,31 +125,31 @@ class Connection(object): """ @abc.abstractmethod - def create_test_lock(self, test_uuid): + def create_test_lock(self, test_name): """Create a new testlock. This method will fail if the test has already been locked. - :param test_uuid: The uuid of a test. + :param test_name: The name of a test. :returns: None if success. Otherwise, the id of the the test. """ @abc.abstractmethod - def steal_test_lock(self, test_uuid): + def steal_test_lock(self, test_name): """Steal lock of a test. Lock the test with test id if the test is currently locked. - :param test_uuid: The uuid of a test. + :param test_name: The name of a test. :returns: None if success. True if the test is not locked. Otherwise, the id of the test. """ @abc.abstractmethod - def release_test_lock(self, test_uuid): + def release_test_lock(self, test_name): """Release lock of a test. - :param test_uuid: The uuid of a test. + :param test_name: The name of a test. :returns: None if success. True otherwise. """ diff --git a/cloudpulse/db/sqlalchemy/api.py b/cloudpulse/db/sqlalchemy/api.py index b20ee35..5628947 100644 --- a/cloudpulse/db/sqlalchemy/api.py +++ b/cloudpulse/db/sqlalchemy/api.py @@ -100,6 +100,7 @@ def _paginate_query(model, limit=None, marker=None, sort_key=None, class Connection(api.Connection): + """SqlAlchemy connection.""" def __init__(self): @@ -126,6 +127,7 @@ class Connection(api.Connection): sort_key=None, sort_dir=None): # query = model_query(models.cpulse) query = model_query(models.cpulse) + query = self._add_tests_filters(query, filters) return _paginate_query(models.cpulse, limit, marker, sort_key, sort_dir, query) @@ -211,14 +213,37 @@ class Connection(api.Connection): ref.update(values) return ref - def create_test_lock(self, test_uuid, conductor_id): - pass + def create_test_lock(self, test_name, conductor_id): + session = get_session() + with session.begin(): + query = model_query(models.CpulseLock, session=session) + lock = query.filter_by(test_name=test_name).first() + if lock is not None: + return lock.conductor_id + session.add(models.CpulseLock(test_name=test_name, + conductor_id=conductor_id)) - def steal_test_lock(self, test_uuid, old_conductor_id, new_conductor_id): - pass + def steal_test_lock(self, test_name, old_conductor_id, new_conductor_id): + session = get_session() + with session.begin(): + query = model_query(models.CpulseLock, session=session) + lock = query.filter_by(test_name=test_name).first() + if lock is None: + return True + elif lock.conductor_id != old_conductor_id: + return lock.conductor_id + else: + lock.update({'conductor_id': new_conductor_id}) - def release_test_lock(self, test_uuid, conductor_id): - pass + def release_test_lock(self, test_name, conductor_id): + session = get_session() + with session.begin(): + query = model_query(models.CpulseLock, session=session) + query = query.filter_by(test_name=test_name, + conductor_id=conductor_id) + count = query.delete() + if count == 0: + return True def delete_old_tests(self, num_range, num_tests=cfg.CONF.database.max_db_entries): diff --git a/cloudpulse/db/sqlalchemy/models.py b/cloudpulse/db/sqlalchemy/models.py index 529b2f8..9afa2f6 100644 --- a/cloudpulse/db/sqlalchemy/models.py +++ b/cloudpulse/db/sqlalchemy/models.py @@ -135,9 +135,9 @@ class CpulseLock(Base): __tablename__ = 'cpulselock' __table_args__ = ( - schema.UniqueConstraint('test_uuid', name='uniq_testlock0test_uuid'), + schema.UniqueConstraint('test_name', name='uniq_testlock0test_name'), table_args() ) id = Column(Integer, primary_key=True) - test_uuid = Column(String(36)) + test_name = Column(String(64)) conductor_id = Column(String(64)) diff --git a/cloudpulse/objects/__init__.py b/cloudpulse/objects/__init__.py index 78a0850..655974f 100644 --- a/cloudpulse/objects/__init__.py +++ b/cloudpulse/objects/__init__.py @@ -13,7 +13,10 @@ # under the License. from cloudpulse.objects import cpulse +from cloudpulse.objects import cpulselock + +CpulseLock = cpulselock.CpulseLock Cpulse = cpulse.Cpulse __all__ = (Cpulse) diff --git a/cloudpulse/objects/cpulse.py b/cloudpulse/objects/cpulse.py index 812469e..87a64c8 100644 --- a/cloudpulse/objects/cpulse.py +++ b/cloudpulse/objects/cpulse.py @@ -118,7 +118,7 @@ class Cpulse(base.CloudpulsePersistentObject, base.CloudpulseObject, @base.remotable_classmethod def list(cls, context, limit=None, marker=None, - sort_key=None, sort_dir=None): + sort_key=None, sort_dir=None, filters=None): """Return a list of Cpulse objects. :param context: Security context. @@ -132,7 +132,8 @@ class Cpulse(base.CloudpulsePersistentObject, base.CloudpulseObject, db = cls.dbapi.get_test_list(context, limit=limit, marker=marker, sort_key=sort_key, - sort_dir=sort_dir) + sort_dir=sort_dir, + filters=filters) return Cpulse._from_db_object_list(db, cls, context) @base.remotable diff --git a/cloudpulse/objects/cpulselock.py b/cloudpulse/objects/cpulselock.py new file mode 100644 index 0000000..63e075c --- /dev/null +++ b/cloudpulse/objects/cpulselock.py @@ -0,0 +1,44 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_versionedobjects import fields + +from cloudpulse.db import api as dbapi +from cloudpulse.objects import base + + +@base.CloudpulseObjectRegistry.register +class CpulseLock(base.CloudpulsePersistentObject, base.CloudpulseObject, + base.CloudpulseObjectDictCompat): + # Version 1.0: Initial version + VERSION = '1.0' + + dbapi = dbapi.get_instance() + + fields = { + 'test_name': fields.StringField(nullable=True), + 'conductor_id': fields.StringField(nullable=True), + } + + @base.remotable_classmethod + def create(cls, test_name, conductor_id): + return cls.dbapi.create_test_lock(test_name, conductor_id) + + @base.remotable_classmethod + def steal(cls, test_name, old_conductor_id, new_conductor_id): + return cls.dbapi.steal_test_lock(test_name, old_conductor_id, + new_conductor_id) + + @base.remotable_classmethod + def release(cls, test_name, conductor_id): + return cls.dbapi.release_test_lock(test_name, conductor_id)