move out oslo.service

This change replaces oslo.service by cotyledon.

Change-Id: I6eea5fcd26ade07fbcb5718b4285b7fa039a3b08
This commit is contained in:
Mehdi Abaakouk 2016-05-03 08:23:50 +02:00
parent 432f411603
commit 80bc124511
12 changed files with 119 additions and 133 deletions

View File

@ -233,7 +233,7 @@ class PollingTask(object):
class AgentManager(service_base.PipelineBasedService):
def __init__(self, namespaces=None, pollster_list=None):
def __init__(self, namespaces=None, pollster_list=None, worker_id=0):
namespaces = namespaces or ['compute', 'central']
pollster_list = pollster_list or []
group_prefix = cfg.CONF.polling.partitioning_group_prefix
@ -244,7 +244,7 @@ class AgentManager(service_base.PipelineBasedService):
if pollster_list and cfg.CONF.coordination.backend_url:
raise PollsterListForbidden()
super(AgentManager, self).__init__()
super(AgentManager, self).__init__(worker_id)
def _match(pollster):
"""Find out if pollster name matches to one of the list."""
@ -412,19 +412,18 @@ class AgentManager(service_base.PipelineBasedService):
# Don't start useless threads if no task will run
utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
def start(self):
super(AgentManager, self).start()
def run(self):
super(AgentManager, self).run()
self.polling_manager = pipeline.setup_polling()
self.join_partitioning_groups()
self.start_polling_tasks()
self.init_pipeline_refresh()
def stop(self):
if self.started:
self.stop_pollsters_tasks()
self.heartbeat_timer.stop()
self.partition_coordinator.stop()
super(AgentManager, self).stop()
def terminate(self):
self.stop_pollsters_tasks()
self.heartbeat_timer.stop()
self.partition_coordinator.stop()
super(AgentManager, self).terminate()
def interval_task(self, task):
# NOTE(sileht): remove the previous keystone client

View File

@ -14,8 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import cotyledon
from oslo_config import cfg
from oslo_service import service as os_service
from ceilometer import notification
from ceilometer import service
@ -25,5 +25,8 @@ CONF = cfg.CONF
def main():
service.prepare_service()
os_service.launch(CONF, notification.NotificationService(),
workers=CONF.notification.workers).wait()
sm = cotyledon.ServiceManager()
sm.add(notification.NotificationService,
workers=CONF.notification.workers)
sm.run()

View File

@ -14,8 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import cotyledon
from oslo_config import cfg
from oslo_service import service as os_service
from ceilometer import collector
from ceilometer import service
@ -25,5 +25,6 @@ CONF = cfg.CONF
def main():
service.prepare_service()
os_service.launch(CONF, collector.CollectorService(),
workers=CONF.collector.workers).wait()
sm = cotyledon.ServiceManager()
sm.add(collector.CollectorService, workers=CONF.collector.workers)
sm.run()

View File

@ -14,9 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import cotyledon
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as os_service
from ceilometer.agent import manager
from ceilometer.i18n import _LW
@ -78,7 +78,14 @@ CLI_OPTS = [
CONF.register_cli_opts(CLI_OPTS)
def create_polling_service(worker_id):
return manager.AgentManager(CONF.polling_namespaces,
CONF.pollster_list,
worker_id)
def main():
service.prepare_service()
os_service.launch(CONF, manager.AgentManager(CONF.polling_namespaces,
CONF.pollster_list)).wait()
sm = cotyledon.ServiceManager()
sm.add(create_polling_service)
sm.run()

View File

@ -16,6 +16,7 @@
from itertools import chain
import socket
import cotyledon
import msgpack
from oslo_config import cfg
from oslo_log import log
@ -27,7 +28,6 @@ from ceilometer import dispatcher
from ceilometer.i18n import _, _LE, _LW
from ceilometer import messaging
from ceilometer.publisher import utils as publisher_utils
from ceilometer import service_base
from ceilometer import utils
OPTS = [
@ -59,17 +59,17 @@ cfg.CONF.import_opt('store_events', 'ceilometer.notification',
LOG = log.getLogger(__name__)
class CollectorService(service_base.ServiceBase):
class CollectorService(cotyledon.Service):
"""Listener for the collector service."""
def start(self):
def run(self):
"""Bind the UDP socket and handle incoming data."""
super(CollectorService, self).run()
# ensure dispatcher is configured before starting other services
dispatcher_managers = dispatcher.load_dispatcher_manager()
(self.meter_manager, self.event_manager) = dispatcher_managers
self.sample_listener = None
self.event_listener = None
self.udp_thread = None
super(CollectorService, self).start()
if cfg.CONF.collector.udp_address:
self.udp_thread = utils.spawn_thread(self.start_udp)
@ -133,16 +133,15 @@ class CollectorService(service_base.ServiceBase):
LOG.warning(_LW('sample signature invalid, '
'discarding: %s'), sample)
def stop(self):
if self.started:
if self.sample_listener:
utils.kill_listeners([self.sample_listener])
if self.event_listener:
utils.kill_listeners([self.event_listener])
if self.udp_thread:
self.udp_run = False
self.udp_thread.join()
super(CollectorService, self).stop()
def terminate(self):
if self.sample_listener:
utils.kill_listeners([self.sample_listener])
if self.event_listener:
utils.kill_listeners([self.event_listener])
if self.udp_thread:
self.udp_run = False
self.udp_thread.join()
super(CollectorService, self).terminate()
class CollectorEndpoint(object):

View File

@ -146,8 +146,8 @@ class NotificationService(service_base.PipelineBasedService):
return event_pipe_manager
def start(self):
super(NotificationService, self).start()
def run(self):
super(NotificationService, self).run()
self.shutdown = False
self.periodic = None
self.partition_coordinator = None
@ -311,19 +311,18 @@ class NotificationService(service_base.PipelineBasedService):
batch_timeout=cfg.CONF.notification.batch_timeout)
self.pipeline_listener.start()
def stop(self):
if self.started:
self.shutdown = True
if self.periodic:
self.periodic.stop()
self.periodic.wait()
if self.partition_coordinator:
self.partition_coordinator.stop()
with self.coord_lock:
if self.pipeline_listener:
utils.kill_listeners([self.pipeline_listener])
utils.kill_listeners(self.listeners)
super(NotificationService, self).stop()
def terminate(self):
self.shutdown = True
if self.periodic:
self.periodic.stop()
self.periodic.wait()
if self.partition_coordinator:
self.partition_coordinator.stop()
with self.coord_lock:
if self.pipeline_listener:
utils.kill_listeners([self.pipeline_listener])
utils.kill_listeners(self.listeners)
super(NotificationService, self).terminate()
def reload_pipeline(self):
LOG.info(_LI("Reloading notification agent and listeners."))

View File

@ -15,9 +15,9 @@
import abc
import cotyledon
from oslo_config import cfg
from oslo_log import log
from oslo_service import service as os_service
import six
from ceilometer.i18n import _LE, _LI
@ -27,18 +27,8 @@ from ceilometer import utils
LOG = log.getLogger(__name__)
class ServiceBase(os_service.Service):
def __init__(self):
self.started = False
super(ServiceBase, self).__init__()
def start(self):
self.started = True
super(ServiceBase, self).start()
@six.add_metaclass(abc.ABCMeta)
class PipelineBasedService(ServiceBase):
class PipelineBasedService(cotyledon.Service):
def clear_pipeline_validation_status(self):
"""Clears pipeline validation status flags."""
self.pipeline_validated = False
@ -65,11 +55,10 @@ class PipelineBasedService(ServiceBase):
spacing=cfg.CONF.pipeline_polling_interval)
utils.spawn_thread(self.refresh_pipeline_periodic.start)
def stop(self):
if self.started and self.refresh_pipeline_periodic:
def terminate(self):
if self.refresh_pipeline_periodic:
self.refresh_pipeline_periodic.stop()
self.refresh_pipeline_periodic.wait()
super(PipelineBasedService, self).stop()
def get_pipeline_mtime(self, p_type=pipeline.SAMPLE_TYPE):
return (self.event_pipeline_mtime if p_type == pipeline.EVENT_TYPE else

View File

@ -76,7 +76,7 @@ class TestCollector(tests_base.BaseTestCase):
),
'not-so-secret')
self.srv = collector.CollectorService()
self.srv = collector.CollectorService(0)
def _setup_messaging(self, enabled=True):
if enabled:
@ -121,8 +121,8 @@ class TestCollector(tests_base.BaseTestCase):
with mock.patch('socket.socket') as mock_socket:
mock_socket.return_value = udp_socket
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM)
@ -139,8 +139,8 @@ class TestCollector(tests_base.BaseTestCase):
with mock.patch.object(socket, 'socket') as mock_socket:
mock_socket.return_value = sock
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
mock_socket.assert_called_with(socket.AF_INET6, socket.SOCK_DGRAM)
@ -153,8 +153,8 @@ class TestCollector(tests_base.BaseTestCase):
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
@ -172,8 +172,8 @@ class TestCollector(tests_base.BaseTestCase):
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket', return_value=udp_socket):
with mock.patch('msgpack.loads', self._raise_error):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
@ -189,8 +189,8 @@ class TestCollector(tests_base.BaseTestCase):
with mock.patch.object(oslo_messaging.MessageHandlingServer,
'start', side_effect=real_start) as rpc_start:
with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
self.assertEqual(0, rpc_start.call_count)
@ -202,8 +202,8 @@ class TestCollector(tests_base.BaseTestCase):
self.data_sent = []
with mock.patch('socket.socket',
return_value=self._make_fake_socket(self.utf8_msg)):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.srv.udp_thread.join(5)
self.assertFalse(self.srv.udp_thread.is_alive())
self.assertTrue(utils.verify_signature(
@ -218,8 +218,8 @@ class TestCollector(tests_base.BaseTestCase):
mock_record.side_effect = Exception('boom')
mock_dispatcher.record_events.side_effect = Exception('boom')
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
ret = endp.sample([{'ctxt': {}, 'publisher_id': 'pub_id',
'event_type': 'event', 'payload': {},

View File

@ -19,7 +19,6 @@ import shutil
import mock
from oslo_config import fixture as fixture_config
import oslo_messaging
import oslo_service.service
from oslo_utils import fileutils
from oslo_utils import timeutils
import six
@ -95,7 +94,7 @@ class TestNotification(tests_base.BaseTestCase):
self.CONF.set_override("disable_non_metric_meters", False,
group="notification")
self.setup_messaging(self.CONF)
self.srv = notification.NotificationService()
self.srv = notification.NotificationService(0)
def fake_get_notifications_manager(self, pm):
self.plugin = instance.Instance(pm)
@ -115,8 +114,8 @@ class TestNotification(tests_base.BaseTestCase):
with mock.patch.object(self.srv,
'_get_notifications_manager') as get_nm:
get_nm.side_effect = self.fake_get_notifications_manager
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.fake_event_endpoint = fake_event_endpoint_class.return_value
def test_start_multiple_listeners(self):
@ -165,12 +164,13 @@ class TestNotification(tests_base.BaseTestCase):
with mock.patch.object(self.srv,
'_get_notifications_manager') as get_nm:
get_nm.side_effect = fake_get_notifications_manager_dup_targets
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.assertEqual(1, len(mock_listener.call_args_list))
args, kwargs = mock_listener.call_args
self.assertEqual(1, len(args[1]))
self.assertIsInstance(args[1][0], oslo_messaging.Target)
self.assertEqual(1, len(self.srv.listeners))
class BaseRealNotification(tests_base.BaseTestCase):
@ -245,8 +245,8 @@ class BaseRealNotification(tests_base.BaseTestCase):
self.publisher = test_publisher.TestPublisher("")
def _check_notification_service(self):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")
@ -271,21 +271,21 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('refresh_event_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 1)
self.srv = notification.NotificationService()
self.srv = notification.NotificationService(0)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_pipeline_poller(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
self.assertIsNotNone(self.srv.refresh_pipeline_periodic)
def test_notification_reloaded_pipeline(self):
pipeline_cfg_file = self.setup_pipeline(['instance'])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
pipeline = self.srv.pipe_manager
@ -306,8 +306,8 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
self.CONF.set_override("store_events", True, group="notification")
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
pipeline = self.srv.event_pipe_manager
@ -327,7 +327,7 @@ class TestRealNotification(BaseRealNotification):
def setUp(self):
super(TestRealNotification, self).setUp()
self.srv = notification.NotificationService()
self.srv = notification.NotificationService(0)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service(self, fake_publisher_cls):
@ -337,8 +337,8 @@ class TestRealNotification(BaseRealNotification):
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service_error_topic(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
notifier = messaging.get_notifier(self.transport,
'compute.vagrant-precise')
notifier.error({}, 'compute.instance.error',
@ -359,14 +359,6 @@ class TestRealNotification(BaseRealNotification):
self._check_notification_service()
self.assertEqual('memory', self.publisher.samples[0].name)
@mock.patch.object(oslo_service.service.Service, 'stop')
def test_notification_service_start_abnormal(self, mocked):
try:
self.srv.stop()
except Exception:
pass
self.assertEqual(1, mocked.call_count)
class TestRealNotificationHA(BaseRealNotification):
@ -374,7 +366,7 @@ class TestRealNotificationHA(BaseRealNotification):
super(TestRealNotificationHA, self).setUp()
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.srv = notification.NotificationService()
self.srv = notification.NotificationService(0)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service(self, fake_publisher_cls):
@ -389,8 +381,8 @@ class TestRealNotificationHA(BaseRealNotification):
mock.MagicMock(), # refresh pipeline listener
]
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
def _check_listener_targets():
args, kwargs = mock_listener.call_args
@ -409,8 +401,8 @@ class TestRealNotificationHA(BaseRealNotification):
def test_retain_common_targets_on_refresh(self, mock_listener):
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'.extract_my_subset', return_value=[1, 2]):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
listened_before = [target.topic for target in
mock_listener.call_args[0][1]]
self.assertEqual(4, len(listened_before))
@ -426,8 +418,8 @@ class TestRealNotificationHA(BaseRealNotification):
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_notify_to_relevant_endpoint(self, mock_listener):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
targets = mock_listener.call_args[0][1]
self.assertIsNotEmpty(targets)
@ -449,8 +441,8 @@ class TestRealNotificationHA(BaseRealNotification):
@mock.patch('oslo_messaging.Notifier.sample')
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
@ -531,16 +523,16 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
def _check_notifications(self, fake_publisher_cls):
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
self.srv = notification.NotificationService()
self.srv2 = notification.NotificationService()
self.srv = notification.NotificationService(0)
self.srv2 = notification.NotificationService(0)
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'._get_members', return_value=['harry', 'lloyd']):
with mock.patch('uuid.uuid4', return_value='harry'):
self.srv.start()
self.addCleanup(self.srv.stop)
self.srv.run()
self.addCleanup(self.srv.terminate)
with mock.patch('uuid.uuid4', return_value='lloyd'):
self.srv2.start()
self.addCleanup(self.srv2.stop)
self.srv2.run()
self.addCleanup(self.srv2.terminate)
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")

View File

@ -310,7 +310,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
mpc.is_active.return_value = False
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.mgr.partition_coordinator.heartbeat = mock.MagicMock()
self.mgr.start()
self.mgr.run()
setup_polling.assert_called_once_with()
mpc.start.assert_called_once_with()
self.assertEqual(2, mpc.join_group.call_count)
@ -325,7 +325,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
time.sleep(0.5)
self.assertGreaterEqual(1, runs)
self.mgr.stop()
self.mgr.terminate()
mpc.stop.assert_called_once_with()
@mock.patch('ceilometer.pipeline.setup_polling')
@ -338,9 +338,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 5)
self.addCleanup(self.mgr.stop)
self.mgr.start()
self.addCleanup(self.mgr.stop)
self.mgr.run()
self.addCleanup(self.mgr.terminate)
setup_polling.assert_called_once_with()
mpc.start.assert_called_once_with()
self.assertEqual(2, mpc.join_group.call_count)
@ -432,8 +431,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
mgr = self.create_manager()
mgr.extensions = self.mgr.extensions
mgr.create_polling_task = mock.MagicMock()
mgr.start()
self.addCleanup(mgr.stop)
mgr.run()
self.addCleanup(mgr.terminate)
mgr.create_polling_task.assert_called_once_with()
def test_manager_exception_persistency(self):

View File

@ -20,7 +20,6 @@ from keystoneclient import exceptions as ks_exceptions
import mock
from novaclient import client as novaclient
from oslo_config import fixture as fixture_config
from oslo_service import service as os_service
from oslo_utils import fileutils
from oslotest import base
from oslotest import mockpatch
@ -431,8 +430,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.mgr.start()
self.addCleanup(self.mgr.stop)
self.mgr.run()
self.addCleanup(self.mgr.terminate)
# Manually executes callbacks
for cb, __, args, kwargs in self.mgr.polling_periodics._callables:
cb(*args, **kwargs)
@ -463,9 +462,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
pipeline_cfg_file = self.setup_pipeline_file(pipeline)
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
self.mgr.start()
self.addCleanup(self.mgr.stop)
self.mgr.run()
self.addCleanup(self.mgr.terminate)
# we only got the old name of meters
for sample in self.notified_samples:

View File

@ -2,6 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
cotyledon
futures>=3.0;python_version=='2.7' or python_version=='2.6' # BSD
futurist>=0.11.0 # Apache-2.0
debtcollector>=1.2.0 # Apache-2.0
@ -20,7 +21,6 @@ oslo.log>=1.14.0 # Apache-2.0
oslo.policy>=0.5.0 # Apache-2.0
oslo.reports>=0.6.0 # Apache-2.0
oslo.rootwrap>=2.0.0 # Apache-2.0
oslo.service>=1.0.0 # Apache-2.0
PasteDeploy>=1.5.0 # MIT
pbr>=1.6 # Apache-2.0
pecan>=1.0.0 # BSD