use Cotyledon lib

This change replaces oslo.service with Cotyledon

Change-Id: Ic724be73660727b123a3be280b0891f9dd014957
This commit is contained in:
Mehdi Abaakouk 2016-07-06 09:46:55 +02:00
parent 6f186567bb
commit ddedc4b49d
9 changed files with 141 additions and 201 deletions

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_service import service as os_service
import cotyledon
from aodh import evaluator as evaluator_svc
from aodh import event as event_svc
@ -25,17 +25,23 @@ from aodh import service
def notifier():
conf = service.prepare_service()
os_service.launch(conf, notifier_svc.AlarmNotifierService(conf),
workers=conf.notifier.workers).wait()
sm = cotyledon.ServiceManager()
sm.add(notifier_svc.AlarmNotifierService,
workers=conf.notifier.workers, args=(conf,))
sm.run()
def evaluator():
conf = service.prepare_service()
os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf),
workers=conf.evaluator.workers).wait()
sm = cotyledon.ServiceManager()
sm.add(evaluator_svc.AlarmEvaluationService,
workers=conf.evaluator.workers, args=(conf,))
sm.run()
def listener():
conf = service.prepare_service()
os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf),
workers=conf.listener.workers).wait()
sm = cotyledon.ServiceManager()
sm.add(event_svc.EventAlarmEvaluationService,
workers=conf.listener.workers, args=(conf,))
sm.run()

View File

@ -20,11 +20,11 @@ import json
import threading
from concurrent import futures
import cotyledon
import croniter
from futurist import periodics
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as os_service
from oslo_utils import timeutils
import pytz
import six
@ -181,68 +181,34 @@ class Evaluator(object):
"""
class AlarmEvaluationService(os_service.Service):
class AlarmEvaluationService(cotyledon.Service):
PARTITIONING_GROUP_NAME = "alarm_evaluator"
EVALUATOR_EXTENSIONS_NAMESPACE = "aodh.evaluator"
def __init__(self, conf):
super(AlarmEvaluationService, self).__init__()
def __init__(self, worker_id, conf):
super(AlarmEvaluationService, self).__init__(worker_id)
self.conf = conf
@property
def _storage_conn(self):
if not self.storage_conn:
self.storage_conn = storage.get_connection_from_config(self.conf)
return self.storage_conn
ef = lambda: futures.ThreadPoolExecutor(max_workers=10)
self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=ef)
def _load_evaluators(self):
self.evaluators = extension.ExtensionManager(
namespace=self.EVALUATOR_EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(self.conf,)
)
self.storage_conn = storage.get_connection_from_config(self.conf)
def _evaluate_assigned_alarms(self):
try:
alarms = self._assigned_alarms()
LOG.info(_('initiating evaluation cycle on %d alarms') %
len(alarms))
for alarm in alarms:
self._evaluate_alarm(alarm)
except Exception:
LOG.exception(_('alarm evaluation cycle failed'))
def _evaluate_alarm(self, alarm):
"""Evaluate the alarms assigned to this evaluator."""
if alarm.type not in self.evaluators:
LOG.debug('skipping alarm %s: type unsupported', alarm.alarm_id)
return
LOG.debug('evaluating alarm %s', alarm.alarm_id)
try:
self.evaluators[alarm.type].obj.evaluate(alarm)
except Exception:
LOG.exception(_('Failed to evaluate alarm %s'), alarm.alarm_id)
def start(self):
super(AlarmEvaluationService, self).start()
self.storage_conn = None
self._load_evaluators()
self.partition_coordinator = coordination.PartitionCoordinator(
self.conf)
self.partition_coordinator.start()
self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME)
# allow time for coordination if necessary
delay_start = self.partition_coordinator.is_active()
ef = lambda: futures.ThreadPoolExecutor(max_workers=10)
self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=ef)
if self.evaluators:
@periodics.periodic(spacing=self.conf.evaluation_interval,
run_immediately=not delay_start)
@ -266,25 +232,39 @@ class AlarmEvaluationService(os_service.Service):
t.daemon = True
t.start()
# NOTE(sileht): We have to drop eventlet to drop this last eventlet
# thread
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def terminate(self):
self.periodic.stop()
self.partition_coordinator.stop()
self.periodic.wait()
def stop(self):
if getattr(self, 'periodic', None):
self.periodic.stop()
self.periodic.wait()
if getattr(self, 'partition_coordinator', None):
self.partition_coordinator.stop()
super(AlarmEvaluationService, self).stop()
def _evaluate_assigned_alarms(self):
try:
alarms = self._assigned_alarms()
LOG.info(_('initiating evaluation cycle on %d alarms') %
len(alarms))
for alarm in alarms:
self._evaluate_alarm(alarm)
except Exception:
LOG.exception(_('alarm evaluation cycle failed'))
def _evaluate_alarm(self, alarm):
"""Evaluate the alarms assigned to this evaluator."""
if alarm.type not in self.evaluators:
LOG.debug('skipping alarm %s: type unsupported', alarm.alarm_id)
return
LOG.debug('evaluating alarm %s', alarm.alarm_id)
try:
self.evaluators[alarm.type].obj.evaluate(alarm)
except Exception:
LOG.exception(_('Failed to evaluate alarm %s'), alarm.alarm_id)
def _assigned_alarms(self):
# NOTE(r-mibu): The 'event' type alarms will be evaluated by the
# event-driven alarm evaluator, so this periodical evaluator skips
# those alarms.
all_alarms = self._storage_conn.get_alarms(enabled=True,
exclude=dict(type='event'))
all_alarms = self.storage_conn.get_alarms(enabled=True,
exclude=dict(type='event'))
all_alarms = list(all_alarms)
all_alarm_ids = [a.alarm_id for a in all_alarms]
selected = self.partition_coordinator.extract_my_subset(

View File

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import cotyledon
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_service import service
from aodh.evaluator import event
from aodh import messaging
@ -51,14 +51,10 @@ class EventAlarmEndpoint(object):
self.evaluator.evaluate_events(notification['payload'])
class EventAlarmEvaluationService(service.Service):
def __init__(self, conf):
super(EventAlarmEvaluationService, self).__init__()
class EventAlarmEvaluationService(cotyledon.Service):
def __init__(self, worker_id, conf):
super(EventAlarmEvaluationService, self).__init__(worker_id)
self.conf = conf
def start(self):
super(EventAlarmEvaluationService, self).start()
self.storage_conn = storage.get_connection_from_config(self.conf)
self.evaluator = event.EventAlarmEvaluator(self.conf)
self.listener = messaging.get_batch_notification_listener(
@ -69,11 +65,7 @@ class EventAlarmEvaluationService(service.Service):
self.conf.listener.batch_size,
self.conf.listener.batch_timeout)
self.listener.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def stop(self):
if getattr(self, 'listener', None):
self.listener.stop()
self.listener.wait()
super(EventAlarmEvaluationService, self).stop()
def terminate(self):
self.listener.stop()
self.listener.wait()

View File

@ -15,10 +15,10 @@
import abc
import cotyledon
from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_service import service as os_service
from oslo_utils import netutils
import six
from stevedore import extension
@ -66,15 +66,11 @@ class AlarmNotifier(object):
"""
class AlarmNotifierService(os_service.Service):
class AlarmNotifierService(cotyledon.Service):
NOTIFIER_EXTENSIONS_NAMESPACE = "aodh.notifier"
def __init__(self, conf):
super(AlarmNotifierService, self).__init__()
def __init__(self, worker_id, conf):
self.conf = conf
def start(self):
super(AlarmNotifierService, self).start()
transport = messaging.get_transport(self.conf)
self.notifiers = extension.ExtensionManager(
self.NOTIFIER_EXTENSIONS_NAMESPACE,
@ -86,14 +82,10 @@ class AlarmNotifierService(os_service.Service):
transport, [target], [AlarmEndpoint(self.notifiers)], False,
self.conf.notifier.batch_size, self.conf.notifier.batch_timeout)
self.listener.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def stop(self):
if getattr(self, 'listener', None):
self.listener.stop()
self.listener.wait()
super(AlarmNotifierService, self).stop()
def terminate(self):
self.listener.stop()
self.listener.wait()
class AlarmEndpoint(object):

View File

@ -18,6 +18,7 @@ import time
import mock
from oslo_config import fixture as fixture_config
from oslotest import mockpatch
from stevedore import extension
from aodh import evaluator
@ -34,7 +35,9 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
self.setup_messaging(self.CONF)
self.threshold_eval = mock.MagicMock()
self.evaluators = extension.ExtensionManager.make_test_instance(
self._fake_conn = mock.Mock()
self._fake_pc = mock.Mock()
self._fake_em = extension.ExtensionManager.make_test_instance(
[
extension.Extension(
'threshold',
@ -44,14 +47,23 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
]
)
self.svc = evaluator.AlarmEvaluationService(self.CONF)
self.svc.tg = mock.Mock()
self.useFixture(mockpatch.Patch(
'stevedore.extension.ExtensionManager',
return_value=self._fake_em
))
self.useFixture(mockpatch.Patch(
'aodh.coordination.PartitionCoordinator',
return_value=self._fake_pc
))
self.useFixture(mockpatch.Patch(
'aodh.storage.get_connection_from_config',
return_value=self._fake_conn
))
@mock.patch('aodh.storage.get_connection_from_config',
mock.MagicMock())
def _do_test_start(self, test_interval=120,
coordination_heartbeat=1.0,
coordination_active=False):
self.CONF.set_override('evaluation_interval',
test_interval)
self.CONF.set_override('heartbeat',
@ -59,18 +71,14 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
group='coordination',
enforce_type=True)
with mock.patch('aodh.coordination.PartitionCoordinator') as m_pc:
m_pc.return_value.is_active.return_value = coordination_active
self._fake_pc.is_active.return_value = coordination_active
self.svc.start()
self.svc.stop()
self.svc.partition_coordinator.start.assert_called_once_with()
self.svc.partition_coordinator.join_group.assert_called_once_with(
self.svc.PARTITIONING_GROUP_NAME)
actual = self.svc.tg.add_timer.call_args_list
self.assertEqual([mock.call(604800, mock.ANY)], actual)
svc = evaluator.AlarmEvaluationService(0, self.CONF)
self.addCleanup(svc.terminate)
svc.terminate()
svc.partition_coordinator.start.assert_called_once_with()
svc.partition_coordinator.join_group.assert_called_once_with(
svc.PARTITIONING_GROUP_NAME)
def test_start_singleton(self):
self._do_test_start(coordination_active=False)
@ -82,78 +90,62 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
self._do_test_start(coordination_active=True, test_interval=10,
coordination_heartbeat=5)
@mock.patch('stevedore.extension.ExtensionManager')
@mock.patch('aodh.storage.get_connection_from_config')
@mock.patch('aodh.coordination.PartitionCoordinator')
def test_evaluation_cycle(self, m_pc, m_conn, m_em):
def test_evaluation_cycle(self):
alarm = mock.Mock(type='threshold', alarm_id="alarm_id1")
m_pc.return_value.extract_my_subset.return_value = ["alarm_id1"]
m_pc.return_value.is_active.return_value = False
m_conn.return_value.get_alarms.return_value = [alarm]
m_em.return_value = self.evaluators
self._fake_pc.extract_my_subset.return_value = ["alarm_id1"]
self._fake_pc.is_active.return_value = False
self._fake_conn.get_alarms.return_value = [alarm]
self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
self.addCleanup(self.svc.stop)
self.svc.start()
svc = evaluator.AlarmEvaluationService(0, self.CONF)
self.addCleanup(svc.terminate)
time.sleep(1)
target = self.svc.partition_coordinator.extract_my_subset
target.assert_called_once_with(self.svc.PARTITIONING_GROUP_NAME,
target = svc.partition_coordinator.extract_my_subset
target.assert_called_once_with(svc.PARTITIONING_GROUP_NAME,
["alarm_id1"])
self.threshold_eval.evaluate.assert_called_once_with(alarm)
@mock.patch('stevedore.extension.ExtensionManager')
@mock.patch('aodh.coordination.PartitionCoordinator')
def test_evaluation_cycle_with_bad_alarm(self, m_pc, m_em):
m_pc.return_value.is_active.return_value = False
m_em.return_value = self.evaluators
def test_evaluation_cycle_with_bad_alarm(self):
alarms = [
mock.Mock(type='threshold', name='bad'),
mock.Mock(type='threshold', name='good'),
mock.Mock(type='threshold', name='bad', alarm_id='a'),
mock.Mock(type='threshold', name='good', alarm_id='b'),
]
self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
with mock.patch.object(self.svc, '_assigned_alarms',
return_value=alarms):
self.addCleanup(self.svc.stop)
self.svc.start()
time.sleep(1)
self._fake_pc.is_active.return_value = False
self._fake_pc.extract_my_subset.return_value = ['a', 'b']
self._fake_conn.get_alarms.return_value = alarms
svc = evaluator.AlarmEvaluationService(0, self.CONF)
self.addCleanup(svc.terminate)
time.sleep(1)
self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])],
self.threshold_eval.evaluate.call_args_list)
@mock.patch('stevedore.extension.ExtensionManager')
def test_unknown_extension_skipped(self, m_em):
m_em.return_value = self.evaluators
def test_unknown_extension_skipped(self):
alarms = [
mock.Mock(type='not_existing_type'),
mock.Mock(type='threshold')
mock.Mock(type='not_existing_type', alarm_id='a'),
mock.Mock(type='threshold', alarm_id='b')
]
with mock.patch.object(self.svc, '_assigned_alarms',
return_value=alarms):
self.addCleanup(self.svc.stop)
self.svc.start()
time.sleep(1)
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
@mock.patch('stevedore.extension.ExtensionManager')
@mock.patch('aodh.coordination.PartitionCoordinator')
@mock.patch('aodh.storage.get_connection_from_config')
def test_check_alarm_query_constraints(self, m_conn, m_pc, m_em):
m_conn.return_value.get_alarms.return_value = []
m_pc.return_value.extract_my_subset.return_value = []
m_pc.return_value.is_active.return_value = False
m_em.return_value = self.evaluators
self.addCleanup(self.svc.stop)
self.svc.start()
self._fake_pc.is_active.return_value = False
self._fake_pc.extract_my_subset.return_value = ['a', 'b']
self._fake_conn.get_alarms.return_value = alarms
svc = evaluator.AlarmEvaluationService(0, self.CONF)
self.addCleanup(svc.terminate)
time.sleep(1)
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
def test_check_alarm_query_constraints(self):
self._fake_conn.get_alarms.return_value = []
self._fake_pc.extract_my_subset.return_value = []
self._fake_pc.is_active.return_value = False
svc = evaluator.AlarmEvaluationService(0, self.CONF)
self.addCleanup(svc.terminate)
time.sleep(1)
expected = [({'enabled': True, 'exclude': {'type': 'event'}},)]
self.assertEqual(expected,
self.svc.storage_conn.get_alarms.call_args_list)
svc.storage_conn.get_alarms.call_args_list)

View File

@ -18,7 +18,6 @@ import time
from oslo_config import fixture as fixture_config
import oslo_messaging
from oslo_messaging import server
from aodh import event
from aodh import service
@ -29,41 +28,21 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase):
def setUp(self):
super(TestEventAlarmEvaluationService, self).setUp()
conf = service.prepare_service(argv=[], config_files=[])
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
self.CONF.set_override("batch_size", 2, 'listener')
self.setup_messaging(self.CONF)
self.service = event.EventAlarmEvaluationService(self.CONF)
self._msg_notifier = oslo_messaging.Notifier(
@mock.patch('aodh.storage.get_connection_from_config',
mock.MagicMock())
@mock.patch('aodh.event.EventAlarmEndpoint.sample')
def test_batch_event_listener(self, mocked):
msg_notifier = oslo_messaging.Notifier(
self.transport, topics=['alarm.all'], driver='messaging',
publisher_id='test-publisher')
@mock.patch('aodh.storage.get_connection_from_config',
mock.MagicMock())
def test_start_and_stop_service(self):
self.addCleanup(self.service.stop)
self.service.start()
self.assertIsInstance(self.service.listener,
server.MessageHandlingServer)
@mock.patch('aodh.storage.get_connection_from_config',
mock.MagicMock())
def test_listener_start_called(self):
listener = mock.Mock()
with mock.patch('aodh.messaging.get_batch_notification_listener',
return_value=listener):
self.addCleanup(self.service.stop)
self.service.start()
self.assertTrue(listener.start.called)
@mock.patch('aodh.event.EventAlarmEndpoint.sample')
def test_batch_event_listener(self, mocked):
received_events = []
mocked.side_effect = lambda msg: received_events.append(msg)
self.CONF.set_override("batch_size", 2, 'listener')
with mock.patch('aodh.storage.get_connection_from_config'):
self.svc = event.EventAlarmEvaluationService(self.CONF)
self.svc.start()
event1 = {'event_type': 'compute.instance.update',
'traits': ['foo', 'bar'],
'message_id': '20d03d17-4aba-4900-a179-dba1281a3451',
@ -72,9 +51,12 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase):
'traits': ['foo', 'bar'],
'message_id': '20d03d17-4aba-4900-a179-dba1281a3452',
'generated': '2016-04-23T06:50:23.622739'}
self._msg_notifier.sample({}, 'event', event1)
self._msg_notifier.sample({}, 'event', event2)
msg_notifier.sample({}, 'event', event1)
msg_notifier.sample({}, 'event', event2)
svc = event.EventAlarmEvaluationService(0, self.CONF)
self.addCleanup(svc.terminate)
time.sleep(1)
self.assertEqual(1, len(received_events))
self.assertEqual(2, len(received_events[0]))
self.svc.stop()

View File

@ -51,9 +51,8 @@ class TestAlarmNotifierService(tests_base.BaseTestCase):
self.setup_messaging(self.CONF)
def test_init_host_queue(self):
self.service = notifier.AlarmNotifierService(self.CONF)
self.service.start()
self.service.stop()
self.service = notifier.AlarmNotifierService(0, self.CONF)
self.service.terminate()
class TestAlarmNotifier(tests_base.BaseTestCase):
@ -69,9 +68,8 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
self.useFixture(mockpatch.Patch(
'aodh.notifier.zaqar.ZaqarAlarmNotifier.get_zaqar_client',
return_value=self.zaqar))
self.service = notifier.AlarmNotifierService(self.CONF)
self.service.start()
self.addCleanup(self.service.stop)
self.service = notifier.AlarmNotifierService(0, self.CONF)
self.addCleanup(self.service.terminate)
def test_notify_alarm(self):
data = {
@ -120,11 +118,11 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
'reason': 'Everything is fine',
'reason_data': {'fine': 'fine'}
}
self.service.stop()
self.service.terminate()
self.CONF.set_override("batch_size", 2, 'notifier')
# Init a new service with new configuration
self.svc = notifier.AlarmNotifierService(self.CONF)
self.svc.start()
self.svc = notifier.AlarmNotifierService(0, self.CONF)
self.addCleanup(self.svc.terminate)
self._msg_notifier.sample({}, 'alarm.update', data1)
self._msg_notifier.sample({}, 'alarm.update', data2)
time.sleep(1)
@ -150,7 +148,6 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
notifications[1])
self.assertEqual(mock.call('Received %s messages in batch.', 2),
logger.call_args_list[0])
self.svc.stop()
@staticmethod
def _notification(action):

View File

@ -8,5 +8,4 @@ namespace = oslo.log
namespace = oslo.messaging
namespace = oslo.middleware.cors
namespace = oslo.policy
namespace = oslo.service.service
namespace = keystonemiddleware.auth_token

View File

@ -15,7 +15,6 @@ oslo.config>=2.6.0 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0
oslo.log>=1.2.0 # Apache-2.0
oslo.policy>=0.5.0 # Apache-2.0
oslo.service>=0.1.0 # Apache-2.0
PasteDeploy>=1.5.0
pbr<2.0,>=0.11
pecan>=0.8.0
@ -33,3 +32,4 @@ tooz>=1.28.0 # Apache-2.0
Werkzeug>=0.7 # BSD License
WebOb>=1.2.3
WSME>=0.8
cotyledon