Fixed service manage not cleaning properly
When spliting the engine up into multiple services the cleanup job was left on the conductor, but in reality each service needs to handle their own clean up, as the service startup order is never guaranteed. This patch refactors the service workflow and moves all cleanup logic to the base class. Closes-Bug: 1975440 Change-Id: I718910a124780804ec8e14c2e99697025e4c4df0
This commit is contained in:
parent
d8070bcf77
commit
ee2959af3c
|
@ -87,7 +87,7 @@ class ServiceManageCommand(object):
|
|||
|
||||
status = 'up'
|
||||
CONF.import_opt('periodic_interval', 'senlin.conf')
|
||||
max_interval = 2 * CONF.periodic_interval
|
||||
max_interval = 2.2 * CONF.periodic_interval
|
||||
if timeutils.is_older_than(service.updated_at, max_interval):
|
||||
status = 'down'
|
||||
|
||||
|
|
|
@ -11,15 +11,17 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import service
|
||||
|
||||
from oslo_service import sslutils
|
||||
from oslo_service import wsgi
|
||||
from oslo_utils import netutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from senlin.common import context as senlin_context
|
||||
import senlin.conf
|
||||
from senlin.objects import service as service_obj
|
||||
from senlin import version
|
||||
|
||||
CONF = senlin.conf.CONF
|
||||
|
@ -28,24 +30,122 @@ LOG = logging.getLogger(__name__)
|
|||
|
||||
class Service(service.Service):
|
||||
def __init__(self, name, host, topic, threads=None):
|
||||
threads = threads or 1000
|
||||
super(Service, self).__init__(threads)
|
||||
self.tg = None
|
||||
super(Service, self).__init__(threads or 1000)
|
||||
self.name = name
|
||||
self.host = host
|
||||
self.topic = topic
|
||||
|
||||
self.server = None
|
||||
self.service_id = None
|
||||
self.cleanup_timer = None
|
||||
self.cleanup_count = 0
|
||||
self.service_report_timer = None
|
||||
|
||||
# Start the service cleanup process. This is only going to be
|
||||
# running on the main process.
|
||||
if self.tg:
|
||||
self.cleanup_timer = self.tg.add_timer(
|
||||
CONF.periodic_interval, self.service_manage_cleanup
|
||||
)
|
||||
|
||||
def start(self):
|
||||
LOG.info('Starting %(name)s service (version: %(version)s)',
|
||||
{
|
||||
'name': self.name,
|
||||
'version': version.version_info.version_string()
|
||||
})
|
||||
super(Service, self).start()
|
||||
self.service_id = uuidutils.generate_uuid()
|
||||
LOG.info(
|
||||
'Starting %(name)s service (version: %(version)s '
|
||||
'id: %(service_id)s)',
|
||||
{
|
||||
'name': self.name,
|
||||
'version': version.version_info.version_string(),
|
||||
'service_id': self.service_id,
|
||||
}
|
||||
)
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.create(
|
||||
ctx, self.service_id, self.host, self.name, self.topic
|
||||
)
|
||||
self.service_report_timer = self.tg.add_timer(
|
||||
CONF.periodic_interval, self.service_manage_report
|
||||
)
|
||||
|
||||
def stop(self, graceful=True):
|
||||
LOG.info('Stopping %(name)s service', {'name': self.name})
|
||||
LOG.info(
|
||||
'Stopping %(name)s service (id: %(service_id)s)',
|
||||
{
|
||||
'name': self.name,
|
||||
'service_id': self.service_id or 'main',
|
||||
}
|
||||
)
|
||||
if self.service_report_timer:
|
||||
self.service_report_timer.stop()
|
||||
self.service_report_timer = None
|
||||
if self.cleanup_timer:
|
||||
self.cleanup_timer.stop()
|
||||
self.cleanup_timer = None
|
||||
if self.service_id:
|
||||
service_obj.Service.delete(self.service_id)
|
||||
super(Service, self).stop(graceful)
|
||||
|
||||
def service_manage_cleanup(self):
|
||||
self.cleanup_count += 1
|
||||
try:
|
||||
ctx = senlin_context.get_admin_context()
|
||||
services = service_obj.Service.get_all_expired(
|
||||
ctx, self.name
|
||||
)
|
||||
for svc in services:
|
||||
LOG.info(
|
||||
'Breaking locks for dead service %(name)s '
|
||||
'(id: %(service_id)s)',
|
||||
{
|
||||
'name': self.name,
|
||||
'service_id': svc['id'],
|
||||
}
|
||||
)
|
||||
service_obj.Service.gc_by_engine(svc['id'])
|
||||
LOG.info(
|
||||
'Done breaking locks for service %(name)s '
|
||||
'(id: %(service_id)s)',
|
||||
{
|
||||
'name': self.name,
|
||||
'service_id': svc['id'],
|
||||
}
|
||||
)
|
||||
service_obj.Service.delete(svc['id'])
|
||||
except Exception as ex:
|
||||
LOG.error(
|
||||
'Error while cleaning up service %(name)s: %(ex)s',
|
||||
{
|
||||
'name': self.name,
|
||||
'ex': ex,
|
||||
}
|
||||
)
|
||||
|
||||
# The clean-up process runs during service startup and will over
|
||||
# multiple attempts check to see if any services have reach the
|
||||
# deadline and if so remove them. This is only done on startup, or
|
||||
# after a service recovers from a crash.
|
||||
if self.cleanup_count >= 5:
|
||||
self.cleanup_timer.stop()
|
||||
self.cleanup_timer = None
|
||||
LOG.info('Finished cleaning up dead services.')
|
||||
else:
|
||||
LOG.info('Service clean-up attempt count: %s', self.cleanup_count)
|
||||
|
||||
def service_manage_report(self):
|
||||
try:
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.update(ctx, self.service_id)
|
||||
except Exception as ex:
|
||||
LOG.error(
|
||||
'Error while updating service %(name)s: %(ex)s',
|
||||
{
|
||||
'name': self.name,
|
||||
'ex': ex,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class WSGIService(service.Service):
|
||||
def __init__(self, app, name, listen, max_url_len=None):
|
||||
|
|
|
@ -221,22 +221,22 @@ def get_path_parser(path):
|
|||
return expr
|
||||
|
||||
|
||||
def is_engine_dead(ctx, engine_id, duration=None):
|
||||
"""Check if an engine is dead.
|
||||
def is_service_dead(ctx, service_id, duration=None):
|
||||
"""Check if a service is dead.
|
||||
|
||||
If engine hasn't reported its status for the given duration, it is treated
|
||||
as a dead engine.
|
||||
If the service hasn't reported its status for the given duration, it is
|
||||
treated as a dead service.
|
||||
|
||||
:param ctx: A request context.
|
||||
:param engine_id: The ID of the engine to test.
|
||||
:param service_id: The ID of the service to test.
|
||||
:param duration: The time duration in seconds.
|
||||
"""
|
||||
if not duration:
|
||||
duration = 2 * cfg.CONF.periodic_interval
|
||||
duration = 2.2 * cfg.CONF.periodic_interval
|
||||
|
||||
eng = service_obj.Service.get(ctx, engine_id)
|
||||
if not eng:
|
||||
service = service_obj.Service.get(ctx, service_id)
|
||||
if not service:
|
||||
return True
|
||||
if timeutils.is_older_than(eng.updated_at, duration):
|
||||
if timeutils.is_older_than(service.updated_at, duration):
|
||||
return True
|
||||
return False
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import functools
|
||||
|
||||
|
@ -17,7 +16,6 @@ from oslo_config import cfg
|
|||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
from osprofiler import profiler
|
||||
|
||||
from senlin.common import consts
|
||||
|
@ -46,7 +44,6 @@ from senlin.objects import node as node_obj
|
|||
from senlin.objects import policy as policy_obj
|
||||
from senlin.objects import profile as profile_obj
|
||||
from senlin.objects import receiver as receiver_obj
|
||||
from senlin.objects import service as service_obj
|
||||
from senlin.policies import base as policy_base
|
||||
from senlin.profiles import base as profile_base
|
||||
|
||||
|
@ -91,10 +88,7 @@ class ConductorService(service.Service):
|
|||
|
||||
# The following are initialized here and will be assigned in start()
|
||||
# which happens after the fork when spawning multiple worker processes
|
||||
self.server = None
|
||||
self.service_id = None
|
||||
self.cleanup_timer = None
|
||||
self.cleanup_count = 0
|
||||
self.target = None
|
||||
|
||||
# Initialize the global environment
|
||||
environment.initialize()
|
||||
|
@ -105,69 +99,21 @@ class ConductorService(service.Service):
|
|||
|
||||
def start(self):
|
||||
super(ConductorService, self).start()
|
||||
self.service_id = uuidutils.generate_uuid()
|
||||
|
||||
target = oslo_messaging.Target(version=consts.RPC_API_VERSION,
|
||||
server=self.host,
|
||||
topic=self.topic)
|
||||
self.target = oslo_messaging.Target(version=consts.RPC_API_VERSION,
|
||||
server=self.host,
|
||||
topic=self.topic)
|
||||
serializer = obj_base.VersionedObjectSerializer()
|
||||
self.server = rpc_messaging.get_rpc_server(
|
||||
target, self, serializer=serializer)
|
||||
self.server = rpc_messaging.get_rpc_server(self.target, self,
|
||||
serializer=serializer)
|
||||
self.server.start()
|
||||
|
||||
# create service record
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.create(ctx, self.service_id, self.host,
|
||||
self.service_name, self.topic)
|
||||
|
||||
# we may want to make the clean-up attempts configurable.
|
||||
self.cleanup_timer = self.tg.add_timer(2 * CONF.periodic_interval,
|
||||
self.service_manage_cleanup)
|
||||
|
||||
self.tg.add_timer(CONF.periodic_interval, self.service_manage_report)
|
||||
|
||||
def stop(self, graceful=True):
|
||||
def stop(self, graceful=False):
|
||||
if self.server:
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
|
||||
service_obj.Service.delete(self.service_id)
|
||||
LOG.info('Conductor %s is deleted', self.service_id)
|
||||
|
||||
super(ConductorService, self).stop(graceful)
|
||||
|
||||
def service_manage_report(self):
|
||||
try:
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.update(ctx, self.service_id)
|
||||
except Exception as ex:
|
||||
LOG.error('Error while updating engine service: %s', ex)
|
||||
|
||||
def _service_manage_cleanup(self):
|
||||
try:
|
||||
ctx = senlin_context.get_admin_context()
|
||||
time_window = (2 * CONF.periodic_interval)
|
||||
svcs = service_obj.Service.get_all(ctx)
|
||||
for svc in svcs:
|
||||
if svc['id'] == self.service_id:
|
||||
continue
|
||||
if timeutils.is_older_than(svc['updated_at'], time_window):
|
||||
LOG.info('Service %s was aborted', svc['id'])
|
||||
LOG.info('Breaking locks for dead engine %s', svc['id'])
|
||||
service_obj.Service.gc_by_engine(svc['id'])
|
||||
LOG.info('Done breaking locks for engine %s', svc['id'])
|
||||
service_obj.Service.delete(svc['id'])
|
||||
except Exception as ex:
|
||||
LOG.error('Error while cleaning up engine service: %s', ex)
|
||||
|
||||
def service_manage_cleanup(self):
|
||||
self._service_manage_cleanup()
|
||||
self.cleanup_count += 1
|
||||
LOG.info('Service clean-up attempt count: %s', self.cleanup_count)
|
||||
if self.cleanup_count >= 2:
|
||||
self.cleanup_timer.stop()
|
||||
LOG.info("Finished cleaning up dead services.")
|
||||
|
||||
@request_context
|
||||
def credential_create(self, ctx, req):
|
||||
"""Create the credential based on the context.
|
||||
|
|
|
@ -486,6 +486,10 @@ def service_get_all():
|
|||
return IMPL.service_get_all()
|
||||
|
||||
|
||||
def service_get_all_expired(binary):
|
||||
return IMPL.service_get_all_expired(binary)
|
||||
|
||||
|
||||
def gc_by_engine(engine_id):
|
||||
return IMPL.gc_by_engine(engine_id)
|
||||
|
||||
|
|
|
@ -78,6 +78,11 @@ def _get_main_context_manager():
|
|||
return _MAIN_CONTEXT_MANAGER
|
||||
|
||||
|
||||
def service_expired_time():
|
||||
return (timeutils.utcnow() -
|
||||
datetime.timedelta(seconds=2.2 * CONF.periodic_interval))
|
||||
|
||||
|
||||
def get_engine():
|
||||
return _get_main_context_manager().writer.get_engine()
|
||||
|
||||
|
@ -1605,6 +1610,15 @@ def service_get_all():
|
|||
return session.query(models.Service).all()
|
||||
|
||||
|
||||
def service_get_all_expired(binary):
|
||||
with session_for_read() as session:
|
||||
date_limit = service_expired_time()
|
||||
svc = models.Service
|
||||
return session.query(models.Service).filter(
|
||||
and_(svc.binary == binary, svc.updated_at <= date_limit)
|
||||
)
|
||||
|
||||
|
||||
@retry_on_deadlock
|
||||
def _mark_engine_failed(session, action_id, timestamp, reason=None):
|
||||
query = session.query(models.ActionDependency)
|
||||
|
|
|
@ -76,7 +76,7 @@ def cluster_lock_acquire(context, cluster_id, action_id, engine=None,
|
|||
# Will reach here only because scope == CLUSTER_SCOPE
|
||||
action = ao.Action.get(context, owners[0])
|
||||
if (action and action.owner and action.owner != engine and
|
||||
utils.is_engine_dead(context, action.owner)):
|
||||
utils.is_service_dead(context, action.owner)):
|
||||
LOG.info('The cluster %(c)s is locked by dead action %(a)s, '
|
||||
'try to steal the lock.',
|
||||
{'c': cluster_id, 'a': owners[0]})
|
||||
|
@ -133,7 +133,7 @@ def node_lock_acquire(context, node_id, action_id, engine=None,
|
|||
# if this node lock by dead engine
|
||||
action = ao.Action.get(context, owner)
|
||||
if (action and action.owner and action.owner != engine and
|
||||
utils.is_engine_dead(context, action.owner)):
|
||||
utils.is_service_dead(context, action.owner)):
|
||||
LOG.info('The node %(n)s is locked by dead action %(a)s, '
|
||||
'try to steal the lock.',
|
||||
{'n': node_id, 'a': owner})
|
||||
|
|
|
@ -16,18 +16,15 @@ from oslo_config import cfg
|
|||
from oslo_context import context as oslo_context
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_utils import uuidutils
|
||||
from osprofiler import profiler
|
||||
|
||||
from senlin.common import consts
|
||||
from senlin.common import context
|
||||
from senlin.common import context as senlin_context
|
||||
from senlin.common import messaging
|
||||
from senlin.common import service
|
||||
from senlin.engine.actions import base as action_mod
|
||||
from senlin.engine import event as EVENT
|
||||
from senlin.objects import action as ao
|
||||
from senlin.objects import service as service_obj
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
@ -49,8 +46,6 @@ class EngineService(service.Service):
|
|||
)
|
||||
self.version = consts.RPC_API_VERSION
|
||||
|
||||
self.server = None
|
||||
self.service_id = None
|
||||
self.target = None
|
||||
|
||||
# TODO(Yanyan Hu): Build a DB session with full privilege
|
||||
|
@ -71,7 +66,6 @@ class EngineService(service.Service):
|
|||
versioned object for parameter passing.
|
||||
"""
|
||||
super(EngineService, self).start()
|
||||
self.service_id = uuidutils.generate_uuid()
|
||||
|
||||
self.target = oslo_messaging.Target(server=self.service_id,
|
||||
topic=self.topic,
|
||||
|
@ -80,30 +74,12 @@ class EngineService(service.Service):
|
|||
self.server = messaging.get_rpc_server(self.target, self)
|
||||
self.server.start()
|
||||
|
||||
# create service record
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.create(ctx, self.service_id, self.host,
|
||||
self.service_name, self.topic)
|
||||
|
||||
self.tg.add_timer(CONF.periodic_interval, self.service_manage_report)
|
||||
|
||||
def stop(self, graceful=True):
|
||||
def stop(self, graceful=False):
|
||||
if self.server:
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
|
||||
service_obj.Service.delete(self.service_id)
|
||||
LOG.info('Engine %s deleted', self.service_id)
|
||||
|
||||
super(EngineService, self).stop(graceful)
|
||||
|
||||
def service_manage_report(self):
|
||||
try:
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.update(ctx, self.service_id)
|
||||
except Exception as ex:
|
||||
LOG.error('Error while updating dispatcher service: %s', ex)
|
||||
|
||||
def execute(self, func, *args, **kwargs):
|
||||
"""Run the given method in a thread."""
|
||||
req_cnxt = oslo_context.get_current()
|
||||
|
|
|
@ -11,18 +11,15 @@
|
|||
# under the License.
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
import oslo_messaging
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
from osprofiler import profiler
|
||||
|
||||
from senlin.common import consts
|
||||
from senlin.common import context
|
||||
from senlin.common import context as senlin_context
|
||||
from senlin.common import messaging as rpc
|
||||
from senlin.common import service
|
||||
from senlin.engine import health_manager
|
||||
from senlin.objects import service as service_obj
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
@ -42,8 +39,6 @@ class HealthManagerService(service.Service):
|
|||
# The following are initialized here and will be assigned in start()
|
||||
# which happens after the fork when spawning multiple worker processes
|
||||
self.health_registry = None
|
||||
self.server = None
|
||||
self.service_id = None
|
||||
self.target = None
|
||||
|
||||
@property
|
||||
|
@ -52,45 +47,25 @@ class HealthManagerService(service.Service):
|
|||
|
||||
def start(self):
|
||||
super(HealthManagerService, self).start()
|
||||
self.service_id = uuidutils.generate_uuid()
|
||||
|
||||
self.health_registry = health_manager.RuntimeHealthRegistry(
|
||||
ctx=self.ctx, engine_id=self.service_id,
|
||||
thread_group=self.tg
|
||||
)
|
||||
|
||||
# create service record
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.create(ctx, self.service_id, self.host,
|
||||
self.service_name,
|
||||
self.topic)
|
||||
self.tg.add_timer(CONF.periodic_interval, self.service_manage_report)
|
||||
|
||||
self.target = messaging.Target(server=self.service_id,
|
||||
topic=self.topic,
|
||||
version=self.version)
|
||||
self.target = oslo_messaging.Target(server=self.service_id,
|
||||
topic=self.topic,
|
||||
version=self.version)
|
||||
self.server = rpc.get_rpc_server(self.target, self)
|
||||
self.server.start()
|
||||
|
||||
self.tg.add_dynamic_timer(self.task, None, cfg.CONF.periodic_interval)
|
||||
|
||||
def stop(self, graceful=True):
|
||||
def stop(self, graceful=False):
|
||||
if self.server:
|
||||
self.server.stop()
|
||||
self.server.wait()
|
||||
|
||||
service_obj.Service.delete(self.service_id)
|
||||
LOG.info('Health-manager %s deleted', self.service_id)
|
||||
|
||||
super(HealthManagerService, self).stop(graceful)
|
||||
|
||||
def service_manage_report(self):
|
||||
try:
|
||||
ctx = senlin_context.get_admin_context()
|
||||
service_obj.Service.update(ctx, self.service_id)
|
||||
except Exception as ex:
|
||||
LOG.error('Error while updating health-manager service: %s', ex)
|
||||
|
||||
def task(self):
|
||||
"""Task that is queued on the health manager thread group.
|
||||
|
||||
|
|
|
@ -48,6 +48,11 @@ class Service(base.SenlinObject, base.VersionedObjectDictCompat):
|
|||
objs = db_api.service_get_all()
|
||||
return [cls._from_db_object(context, cls(), obj) for obj in objs]
|
||||
|
||||
@classmethod
|
||||
def get_all_expired(cls, context, binary):
|
||||
objs = db_api.service_get_all_expired(binary)
|
||||
return [cls._from_db_object(context, cls(), obj) for obj in objs]
|
||||
|
||||
@classmethod
|
||||
def update(cls, context, obj_id, values=None):
|
||||
obj = db_api.service_update(obj_id, values=values)
|
||||
|
|
|
@ -123,13 +123,13 @@ class ConductorCleanupTest(base.SenlinTestCase):
|
|||
self.service_id = '4db0a14c-dc10-4131-8ed6-7573987ce9b0'
|
||||
self.topic = consts.HEALTH_MANAGER_TOPIC
|
||||
|
||||
self.svc = service.ConductorService('HOST', self.topic)
|
||||
self.svc.service_id = self.service_id
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'update')
|
||||
def test_conductor_manage_report(self, mock_update):
|
||||
cfg.CONF.set_override('periodic_interval', 0.1)
|
||||
|
||||
self.svc = service.ConductorService('HOST', self.topic)
|
||||
self.svc.service_id = self.service_id
|
||||
|
||||
# start engine and verify that update is being called more than once
|
||||
self.svc.start()
|
||||
eventlet.sleep(0.6)
|
||||
|
@ -140,6 +140,9 @@ class ConductorCleanupTest(base.SenlinTestCase):
|
|||
def test_conductor_manage_report_with_exception(self, mock_update):
|
||||
cfg.CONF.set_override('periodic_interval', 0.1)
|
||||
|
||||
self.svc = service.ConductorService('HOST', self.topic)
|
||||
self.svc.service_id = self.service_id
|
||||
|
||||
# start engine and verify that update is being called more than once
|
||||
# even with the exception being thrown
|
||||
mock_update.side_effect = Exception('blah')
|
||||
|
@ -149,34 +152,46 @@ class ConductorCleanupTest(base.SenlinTestCase):
|
|||
self.svc.stop()
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'gc_by_engine')
|
||||
@mock.patch.object(service_obj.Service, 'get_all')
|
||||
@mock.patch.object(service_obj.Service, 'get_all_expired')
|
||||
@mock.patch.object(service_obj.Service, 'delete')
|
||||
def test_service_manage_cleanup(self, mock_delete, mock_get_all, mock_gc):
|
||||
delta = datetime.timedelta(seconds=2 * cfg.CONF.periodic_interval)
|
||||
def test_service_manage_cleanup(self, mock_delete, mock_get_all_expired,
|
||||
mock_gc):
|
||||
self.svc = service.ConductorService('HOST', self.topic)
|
||||
self.svc.service_id = self.service_id
|
||||
delta = datetime.timedelta(seconds=2.2 * cfg.CONF.periodic_interval)
|
||||
ages_a_go = timeutils.utcnow(True) - delta
|
||||
mock_get_all.return_value = [{'id': 'foo', 'updated_at': ages_a_go}]
|
||||
self.svc._service_manage_cleanup()
|
||||
mock_get_all_expired.return_value = [
|
||||
{'id': 'foo', 'updated_at': ages_a_go}
|
||||
]
|
||||
self.svc.service_manage_cleanup()
|
||||
mock_delete.assert_called_once_with('foo')
|
||||
mock_gc.assert_called_once_with('foo')
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'get_all')
|
||||
def test_service_manage_cleanup_without_exception(self, mock_get_all):
|
||||
@mock.patch.object(service_obj.Service, 'get_all_expired')
|
||||
def test_service_manage_cleanup_without_exception(self,
|
||||
mock_get_all_expired):
|
||||
cfg.CONF.set_override('periodic_interval', 0.1)
|
||||
|
||||
self.svc = service.ConductorService('HOST', self.topic)
|
||||
self.svc.service_id = self.service_id
|
||||
|
||||
# start engine and verify that get_all is being called more than once
|
||||
self.svc.start()
|
||||
eventlet.sleep(0.6)
|
||||
self.assertGreater(mock_get_all.call_count, 1)
|
||||
self.assertGreater(mock_get_all_expired.call_count, 1)
|
||||
self.svc.stop()
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'get_all')
|
||||
def test_service_manage_cleanup_with_exception(self, mock_get_all):
|
||||
@mock.patch.object(service_obj.Service, 'get_all_expired')
|
||||
def test_service_manage_cleanup_with_exception(self, mock_get_all_expired):
|
||||
cfg.CONF.set_override('periodic_interval', 0.1)
|
||||
|
||||
self.svc = service.ConductorService('HOST', self.topic)
|
||||
self.svc.service_id = self.service_id
|
||||
|
||||
# start engine and verify that get_all is being called more than once
|
||||
# even with the exception being thrown
|
||||
mock_get_all.side_effect = Exception('blah')
|
||||
mock_get_all_expired.side_effect = Exception('blah')
|
||||
self.svc.start()
|
||||
eventlet.sleep(0.6)
|
||||
self.assertGreater(mock_get_all.call_count, 1)
|
||||
self.assertGreater(mock_get_all_expired.call_count, 1)
|
||||
self.svc.stop()
|
||||
|
|
|
@ -9,10 +9,13 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from senlin.db.sqlalchemy import api as db_api
|
||||
from senlin.db.sqlalchemy import models
|
||||
from senlin.tests.unit.common import base
|
||||
from senlin.tests.unit.common import utils
|
||||
|
||||
|
@ -29,9 +32,21 @@ class DBAPIServiceTest(base.SenlinTestCase):
|
|||
'binary': 'senlin-engine',
|
||||
'topic': 'engine',
|
||||
}
|
||||
|
||||
values.update(kwargs)
|
||||
return db_api.service_create(service_id, **values)
|
||||
|
||||
with db_api.session_for_write() as session:
|
||||
time_now = timeutils.utcnow(True)
|
||||
svc = models.Service(
|
||||
id=service_id,
|
||||
host=values.get('host'),
|
||||
binary=values.get('binary'),
|
||||
topic=values.get('topic'),
|
||||
created_at=values.get('created_at') or time_now,
|
||||
updated_at=values.get('updated_at') or time_now,
|
||||
)
|
||||
session.add(svc)
|
||||
|
||||
return svc
|
||||
|
||||
def test_service_create_get(self):
|
||||
service = self._create_service()
|
||||
|
@ -58,6 +73,31 @@ class DBAPIServiceTest(base.SenlinTestCase):
|
|||
|
||||
self.assertEqual(4, len(services))
|
||||
|
||||
def test_service_get_all_expired(self):
|
||||
for index in range(6):
|
||||
dt = timeutils.utcnow() - datetime.timedelta(seconds=60 * index)
|
||||
values = {
|
||||
'binary': 'senlin-health-manager',
|
||||
'host': 'host-%s' % index,
|
||||
'updated_at': dt
|
||||
}
|
||||
self._create_service(uuidutils.generate_uuid(), **values)
|
||||
|
||||
for index in range(8):
|
||||
dt = timeutils.utcnow() - datetime.timedelta(seconds=60 * index)
|
||||
values = {
|
||||
'binary': 'senlin-engine',
|
||||
'host': 'host-%s' % index,
|
||||
'updated_at': dt
|
||||
}
|
||||
self._create_service(uuidutils.generate_uuid(), **values)
|
||||
|
||||
services = db_api.service_get_all_expired('senlin-health-manager')
|
||||
self.assertEqual(3, len(services.all()))
|
||||
|
||||
services = db_api.service_get_all_expired('senlin-engine')
|
||||
self.assertEqual(5, len(services.all()))
|
||||
|
||||
def test_service_update(self):
|
||||
old_service = self._create_service()
|
||||
old_updated_time = old_service.updated_at
|
||||
|
|
|
@ -42,7 +42,7 @@ class SenlinLockTest(base.SenlinTestCase):
|
|||
mock_acquire.assert_called_once_with('CLUSTER_A', 'ACTION_XYZ',
|
||||
lockm.CLUSTER_SCOPE)
|
||||
|
||||
@mock.patch.object(common_utils, 'is_engine_dead')
|
||||
@mock.patch.object(common_utils, 'is_service_dead')
|
||||
@mock.patch.object(svco.Service, 'gc_by_engine')
|
||||
@mock.patch.object(clo.ClusterLock, "acquire")
|
||||
@mock.patch.object(clo.ClusterLock, "steal")
|
||||
|
@ -62,7 +62,7 @@ class SenlinLockTest(base.SenlinTestCase):
|
|||
mock_steal.assert_called_once_with('CLUSTER_A', 'ACTION_XYZ')
|
||||
mock_gc.assert_called_once_with(mock.ANY)
|
||||
|
||||
@mock.patch.object(common_utils, 'is_engine_dead')
|
||||
@mock.patch.object(common_utils, 'is_service_dead')
|
||||
@mock.patch.object(clo.ClusterLock, "acquire")
|
||||
def test_cluster_lock_acquire_failed(self, mock_acquire, mock_dead):
|
||||
mock_dead.return_value = False
|
||||
|
@ -90,7 +90,7 @@ class SenlinLockTest(base.SenlinTestCase):
|
|||
self.assertEqual(3, mock_acquire.call_count)
|
||||
mock_steal.assert_called_once_with('CLUSTER_A', 'ACTION_XY')
|
||||
|
||||
@mock.patch.object(common_utils, 'is_engine_dead')
|
||||
@mock.patch.object(common_utils, 'is_service_dead')
|
||||
@mock.patch.object(clo.ClusterLock, "acquire")
|
||||
@mock.patch.object(clo.ClusterLock, "steal")
|
||||
def test_cluster_lock_acquire_steal_failed(self, mock_steal, mock_acquire,
|
||||
|
@ -124,7 +124,7 @@ class SenlinLockTest(base.SenlinTestCase):
|
|||
self.assertTrue(res)
|
||||
mock_acquire.assert_called_once_with('NODE_A', 'ACTION_XYZ')
|
||||
|
||||
@mock.patch.object(common_utils, 'is_engine_dead')
|
||||
@mock.patch.object(common_utils, 'is_service_dead')
|
||||
@mock.patch.object(ao.Action, 'mark_failed')
|
||||
@mock.patch.object(nlo.NodeLock, "acquire")
|
||||
@mock.patch.object(nlo.NodeLock, "steal")
|
||||
|
@ -144,7 +144,7 @@ class SenlinLockTest(base.SenlinTestCase):
|
|||
self.ctx, 'ACTION_ABC', mock.ANY,
|
||||
'Engine died when executing this action.')
|
||||
|
||||
@mock.patch.object(common_utils, 'is_engine_dead')
|
||||
@mock.patch.object(common_utils, 'is_service_dead')
|
||||
@mock.patch.object(nlo.NodeLock, "acquire")
|
||||
def test_node_lock_acquire_failed(self, mock_acquire, mock_dead):
|
||||
mock_dead.return_value = False
|
||||
|
|
|
@ -266,7 +266,7 @@ class EngineDeathTest(base.SenlinTestCase):
|
|||
@mock.patch.object(service_obj.Service, 'get')
|
||||
def test_engine_is_none(self, mock_service):
|
||||
mock_service.return_value = None
|
||||
self.assertTrue(utils.is_engine_dead(self.ctx, 'fake_engine_id'))
|
||||
self.assertTrue(utils.is_service_dead(self.ctx, 'fake_engine_id'))
|
||||
mock_service.assert_called_once_with(self.ctx, 'fake_engine_id')
|
||||
|
||||
@mock.patch.object(service_obj.Service, 'get')
|
||||
|
@ -275,7 +275,7 @@ class EngineDeathTest(base.SenlinTestCase):
|
|||
update_time = timeutils.utcnow(True) - delta
|
||||
mock_service.return_value = mock.Mock(updated_at=update_time)
|
||||
|
||||
res = utils.is_engine_dead(self.ctx, 'fake_engine_id')
|
||||
res = utils.is_service_dead(self.ctx, 'fake_engine_id')
|
||||
|
||||
self.assertTrue(res)
|
||||
mock_service.assert_called_once_with(self.ctx, 'fake_engine_id')
|
||||
|
@ -284,7 +284,7 @@ class EngineDeathTest(base.SenlinTestCase):
|
|||
def test_engine_is_alive(self, mock_svc):
|
||||
mock_svc.return_value = mock.Mock(updated_at=timeutils.utcnow(True))
|
||||
|
||||
res = utils.is_engine_dead(self.ctx, 'fake_engine_id')
|
||||
res = utils.is_service_dead(self.ctx, 'fake_engine_id')
|
||||
|
||||
self.assertFalse(res)
|
||||
mock_svc.assert_called_once_with(self.ctx, 'fake_engine_id')
|
||||
|
@ -293,7 +293,7 @@ class EngineDeathTest(base.SenlinTestCase):
|
|||
def test_use_specified_duration(self, mock_svc):
|
||||
mock_svc.return_value = mock.Mock(updated_at=timeutils.utcnow(True))
|
||||
|
||||
res = utils.is_engine_dead(self.ctx, 'fake_engine_id', 10000)
|
||||
res = utils.is_service_dead(self.ctx, 'fake_engine_id', 10000)
|
||||
|
||||
self.assertFalse(res)
|
||||
mock_svc.assert_called_once_with(self.ctx, 'fake_engine_id')
|
||||
|
|
Loading…
Reference in New Issue