Support different publisher interval

Changes the agent manager to poll based on pipeline configuration, to
support different interval requirement from different publishers.

This patch clean up the test case for agent managers. It makes the changes
a bit bigger, however, it's not so easy to seprate this cleanup patch
with different interval part.

For bp publisher-counters-frequency

Change-Id: I3c1163e37de6a17261d2c570843845696ebff58f
Signed-off-by: Yunhong, Jiang <yunhong.jiang@intel.com>
This commit is contained in:
Yunhong, Jiang 2013-02-17 14:16:39 +08:00
parent d767d2c5f5
commit 9335d81316
13 changed files with 418 additions and 149 deletions

View File

@ -22,18 +22,17 @@ eventlet.monkey_patch()
import sys
from oslo.config import cfg
from ceilometer import service as ceilo_service
from ceilometer.central import manager
from ceilometer.service import prepare_service
from ceilometer.openstack.common import service
from ceilometer.openstack.common.rpc import service as rpc_service
if __name__ == '__main__':
prepare_service(sys.argv)
mgr = manager.AgentManager()
topic = 'ceilometer.agent.central'
ceilo = ceilo_service.PeriodicService(cfg.CONF.host,
ceilo = rpc_service.Service(cfg.CONF.host,
topic, mgr)
launcher = service.launch(ceilo)
launcher.wait()

View File

@ -22,11 +22,10 @@ eventlet.monkey_patch()
import sys
from oslo.config import cfg
from ceilometer import service as ceilo_service
from ceilometer.compute import manager
from ceilometer.service import prepare_service
from ceilometer.openstack.common import service
from ceilometer.openstack.common.rpc import service as rpc_service
if __name__ == '__main__':
@ -34,7 +33,7 @@ if __name__ == '__main__':
prepare_service(sys.argv)
mgr = manager.AgentManager()
topic = 'ceilometer.agent.compute'
ceilo = ceilo_service.PeriodicService(cfg.CONF.host,
ceilo = rpc_service.Service(cfg.CONF.host,
topic, mgr)
launcher = service.launch(ceilo)
launcher.wait()

View File

@ -16,15 +16,38 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
from oslo.config import cfg
from stevedore import dispatch
from ceilometer.openstack.common import context
from ceilometer.openstack.common import log
from ceilometer import pipeline
LOG = log.getLogger(__name__)
class PollingTask(object):
"""Polling task for polling counters and inject into pipeline
A polling task can be invoked periodically or only once"""
def __init__(self, agent_manager):
self.manager = agent_manager
self.pollsters = set()
self.publish_context = pipeline.PublishContext(
agent_manager.context,
cfg.CONF.counter_source)
def add(self, pollster, pipelines):
self.publish_context.add_pipelines(pipelines)
self.pollsters.update([pollster])
@abc.abstractmethod
def poll_and_publish(self):
"""Polling counter and publish into pipeline."""
class AgentManager(object):
def __init__(self, extension_manager):
@ -38,19 +61,34 @@ class AgentManager(object):
self.pollster_manager = extension_manager
def publish_counters_from_one_pollster(self, ext, manager, context,
*args, **kwargs):
"""Used to invoke the plugins loaded by the ExtensionManager.
"""
try:
publisher = manager.pipeline_manager.publisher(
context,
cfg.CONF.counter_source,
)
with publisher as p:
LOG.debug('Polling and publishing %s', ext.name)
p(ext.obj.get_counters(manager, *args, **kwargs))
except Exception as err:
LOG.warning('Continuing after error from %s: %s',
ext.name, err)
LOG.exception(err)
self.context = context.RequestContext('admin', 'admin', is_admin=True)
@abc.abstractmethod
def create_polling_task(self):
"""Create an empty polling task"""
def setup_polling_tasks(self):
polling_tasks = {}
for pipeline, pollster in itertools.product(
self.pipeline_manager.pipelines,
self.pollster_manager.extensions):
for counter in pollster.obj.get_counter_names():
if pipeline.support_counter(counter):
polling_task = polling_tasks.get(pipeline.interval, None)
if not polling_task:
polling_task = self.create_polling_task()
polling_tasks[pipeline.interval] = polling_task
polling_task.add(pollster, [pipeline])
break
return polling_tasks
def initialize_service_hook(self, service):
self.service = service
for interval, task in self.setup_polling_tasks().iteritems():
self.service.tg.add_timer(interval,
self.interval_task,
task=task)
def interval_task(self, task):
task.poll_and_publish()

View File

@ -21,6 +21,7 @@ from oslo.config import cfg
from ceilometer import agent
from ceilometer import extension_manager
from ceilometer.openstack.common import log
from ceilometer import service # For cfg.CONF.os_*
OPTS = [
@ -33,6 +34,26 @@ OPTS = [
cfg.CONF.register_opts(OPTS)
LOG = log.getLogger(__name__)
class PollingTask(agent.PollingTask):
def poll_and_publish(self):
"""Tasks to be run at a periodic interval."""
with self.publish_context as publisher:
# TODO(yjiang5) passing counters into get_counters to avoid
# polling all counters one by one
for pollster in self.pollsters:
try:
LOG.info("Polling pollster %s", pollster.name)
publisher(list(pollster.obj.get_counters(
self.manager)))
except Exception as err:
LOG.warning('Continue after error from %s: %s',
pollster.name, err)
LOG.exception(err)
class AgentManager(agent.AgentManager):
def __init__(self):
@ -43,15 +64,15 @@ class AgentManager(agent.AgentManager):
),
)
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
self.keystone = ksclient.Client(username=cfg.CONF.os_username,
def create_polling_task(self):
return PollingTask(self)
def interval_task(self, task):
self.keystone = ksclient.Client(
username=cfg.CONF.os_username,
password=cfg.CONF.os_password,
tenant_id=cfg.CONF.os_tenant_id,
tenant_name=cfg.CONF.os_tenant_name,
auth_url=cfg.CONF.os_auth_url)
self.pollster_manager.map(self.publish_counters_from_one_pollster,
manager=self,
context=context,
)
super(AgentManager, self).interval_task(task)

View File

@ -41,6 +41,29 @@ cfg.CONF.register_opts(OPTS)
LOG = log.getLogger(__name__)
class PollingTask(agent.PollingTask):
def poll_and_publish_instances(self, instances):
with self.publish_context as publisher:
for instance in instances:
if getattr(instance, 'OS-EXT-STS:vm_state', None) != 'error':
# TODO(yjiang5) passing counters to get_counters to avoid
# polling all counters one by one
for pollster in self.pollsters:
try:
LOG.info("Polling pollster %s", pollster.name)
publisher(list(pollster.obj.get_counters(
self.manager,
instance)))
except Exception as err:
LOG.warning('Continue after error from %s: %s',
pollster.name, err)
LOG.exception(err)
def poll_and_publish(self):
self.poll_and_publish_instances(
self.manager.nv.instance_get_all_by_host(cfg.CONF.host))
def get_hypervisor_inspector():
try:
namespace = 'ceilometer.compute.virt'
@ -63,20 +86,23 @@ class AgentManager(agent.AgentManager):
),
)
self._inspector = get_hypervisor_inspector()
self.nv = nova_client.Client()
def create_polling_task(self):
return PollingTask(self)
def setup_notifier_task(self):
"""For nova notifier usage"""
task = PollingTask(self)
for pollster in self.pollster_manager.extensions:
task.add(
pollster,
self.pipeline_manager.pipelines)
self.notifier_task = task
def poll_instance(self, context, instance):
"""Poll one instance."""
self.pollster_manager.map(self.publish_counters_from_one_pollster,
manager=self,
context=context,
instance=instance)
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
nv = nova_client.Client()
for instance in nv.instance_get_all_by_host(cfg.CONF.host):
if getattr(instance, 'OS-EXT-STS:vm_state', None) != 'error':
self.poll_instance(context, instance)
self.notifier_task.poll_and_publish_instances([instance])
@property
def inspector(self):

View File

@ -46,6 +46,7 @@ def initialize_manager(agent_manager=None):
_agent_manager = AgentManager()
else:
_agent_manager = agent_manager
_agent_manager.setup_notifier_task()
def notify(context, message):

View File

@ -95,10 +95,10 @@ class CeilometerMiddleware(object):
req = Request(env)
version, account, container, obj = split_path(req.path, 1, 4, True)
now = timeutils.utcnow().isoformat()
with self.pipeline_manager.publisher(
with pipeline.PublishContext(
context.get_admin_context(),
cfg.CONF.counter_source
cfg.CONF.counter_source,
self.pipeline_manager.pipelines,
) as publisher:
if bytes_received:
publisher([counter.Counter(

View File

@ -64,13 +64,16 @@ class TransformerExtensionManager(extension.ExtensionManager):
return self.by_name[name]
class Publisher(object):
class PublishContext(object):
def __init__(self, pipelines, context, source):
self.pipelines = pipelines
def __init__(self, context, source, pipelines=[]):
self.pipelines = set(pipelines)
self.context = context
self.source = source
def add_pipelines(self, pipelines):
self.pipelines.update(pipelines)
def __enter__(self):
def p(counters):
for p in self.pipelines:
@ -360,7 +363,7 @@ class PipelineManager(object):
:param context: The context.
:param source: Counter source.
"""
return Publisher(self.pipelines, context, source)
return PublishContext(context, source, self.pipelines)
def setup_pipeline(publisher_manager):

242
tests/agentbase.py Normal file
View File

@ -0,0 +1,242 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
# Copyright © 2013 Intel corp.
#
# Author: Yunhong Jiang <yunhong.jiang@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import datetime
import mock
from stevedore import extension
from stevedore import dispatch
from stevedore.tests import manager as extension_tests
from ceilometer import counter
from ceilometer import pipeline
from ceilometer.tests import base
default_test_data = counter.Counter(
name='test',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'Pollster'},
)
class TestPollster:
test_data = default_test_data
@classmethod
def get_counter_names(self):
return [self.test_data.name]
def get_counters(self, manager, instance=None):
self.counters.append((manager, instance))
return [self.test_data]
class TestPollsterException(TestPollster):
def get_counters(self, manager, instance=None):
# Put an instance parameter here so that it can be used
# by both central manager and compute manager
# In future, we possibly don't need such hack if we
# combin the get_counters() function again
self.counters.append((manager, instance))
raise Exception()
class BaseAgentManagerTestCase(base.TestCase):
class PublisherClass():
def __init__(self):
self.counters = []
def publish_counters(self, ctxt, counter, source):
self.counters.extend(counter)
class Pollster(TestPollster):
counters = []
test_data = default_test_data
class PollsterAnother(TestPollster):
counters = []
test_data = default_test_data._replace(name='testanother')
class PollsterException(TestPollsterException):
counters = []
test_data = default_test_data._replace(name='testexception')
class PollsterExceptionAnother(TestPollsterException):
counters = []
test_data = default_test_data._replace(name='testexceptionanother')
def setup_pipeline(self):
self.publisher = self.PublisherClass()
self.publisher_manager = dispatch.NameDispatchExtensionManager(
'fake',
check_func=lambda x: True,
invoke_on_load=False,
)
self.publisher_manager.extensions = [
extension.Extension(
'test_pub',
None,
None,
self.publisher,
), ]
self.publisher_manager.by_name = dict(
(e.name, e)
for e
in self.publisher_manager.extensions)
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.publisher_manager)
def create_extension_manager(self):
return extension_tests.TestExtensionManager(
[
extension.Extension(
'test',
None,
None,
self.Pollster(), ),
extension.Extension(
'testanother',
None,
None,
self.PollsterAnother(), ),
extension.Extension(
'testexception',
None,
None,
self.PollsterException(), ),
extension.Extension(
'testexceptionanother',
None,
None,
self.PollsterExceptionAnother(), ),
],
'fake',
invoke_on_load=False,
)
@abc.abstractmethod
def setup_manager(self):
"""Setup subclass specific managers"""
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(BaseAgentManagerTestCase, self).setUp()
self.setup_manager()
self.mgr.pollster_manager = self.create_extension_manager()
self.pipeline_cfg = [{
'name': "test_pipeline",
'interval': 60,
'counters': ['test'],
'transformers': [],
'publishers': ["test_pub"],
}, ]
self.setup_pipeline()
def tearDown(self):
self.Pollster.counters = []
self.PollsterAnother.counters = []
self.PollsterException.counters = []
self.PollsterExceptionAnother.counters = []
super(BaseAgentManagerTestCase, self).tearDown()
def test_setup_polling_tasks(self):
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks), 1)
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.values()[0])
self.assertEqual(self.publisher.counters[0], self.Pollster.test_data)
def test_setup_polling_tasks_multiple_interval(self):
self.pipeline_cfg.append({
'name': "test_pipeline",
'interval': 10,
'counters': ['test'],
'transformers': [],
'publishers': ["test_pub"],
})
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks), 2)
self.assertTrue(60 in polling_tasks.keys())
self.assertTrue(10 in polling_tasks.keys())
def test_setup_polling_tasks_mismatch_counter(self):
self.pipeline_cfg.append(
{
'name': "test_pipeline_1",
'interval': 10,
'counters': ['test_invalid'],
'transformers': [],
'publishers': ["test_pub"],
})
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks), 1)
self.assertTrue(60 in polling_tasks.keys())
def test_setup_polling_task_same_interval(self):
self.pipeline_cfg.append({
'name': "test_pipeline",
'interval': 60,
'counters': ['testanother'],
'transformers': [],
'publishers': ["test_pub"],
})
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks), 1)
pollsters = polling_tasks.get(60).pollsters
self.assertEqual(len(pollsters), 2)
def test_interval_exception_isolation(self):
self.pipeline_cfg = [
{
'name': "test_pipeline_1",
'interval': 10,
'counters': ['testexceptionanother'],
'transformers': [],
'publishers': ["test_pub"],
},
{
'name': "test_pipeline_2",
'interval': 10,
'counters': ['testexception'],
'transformers': [],
'publishers': ["test_pub"],
},
]
self.mgr.pipeline_manager = pipeline.PipelineManager(
self.pipeline_cfg,
self.publisher_manager)
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(len(polling_tasks.keys()), 1)
task = polling_tasks.get(10)
self.mgr.interval_task(polling_tasks.get(10))
self.assertEqual(len(self.publisher.counters), 0)

View File

@ -22,12 +22,14 @@ import datetime
import mock
from oslo.config import cfg
from keystoneclient.v2_0 import client as ksclient
from stevedore import extension
from ceilometer.central import manager
from ceilometer import counter
from ceilometer.tests import base
from keystoneclient.v2_0 import client as ksclient
from tests import agentbase
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@ -37,55 +39,14 @@ def test_load_plugins():
return
class TestRunTasks(base.TestCase):
class TestRunTasks(agentbase.BaseAgentManagerTestCase):
class Pollster:
counters = []
test_data = counter.Counter(
name='test',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'Pollster'},
)
def setup_manager(self):
self.mgr = manager.AgentManager()
def get_counters(self, manager):
self.counters.append((manager, self.test_data))
return [self.test_data]
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestRunTasks, self).setUp()
self.stubs.Set(ksclient, 'Client', lambda *args, **kwargs: None)
self.mgr = manager.AgentManager()
self.mgr.pollster_manager = extension.ExtensionManager(
'fake',
invoke_on_load=False,
)
self.mgr.pollster_manager.extensions = [
extension.Extension('test',
None,
None,
self.Pollster(), ),
]
# Invoke the periodic tasks to call the pollsters.
self.mgr.periodic_tasks(None)
def tearDown(self):
self.Pollster.counters = []
super(TestRunTasks, self).tearDown()
def test_message(self):
self.assertEqual(len(self.Pollster.counters), 1)
self.assertTrue(self.Pollster.counters[0][1] is
self.Pollster.test_data)
def test_notifications(self):
self.assertTrue(self.mgr.pipeline_manager.publisher.called)
args, _ = self.mgr.pipeline_manager.publisher.call_args
self.assertEqual(args[1], cfg.CONF.counter_source)

View File

@ -23,12 +23,17 @@ import datetime
import mock
from oslo.config import cfg
from stevedore import extension
from stevedore.tests import manager as extension_tests
from stevedore import dispatch
from ceilometer import nova_client
from ceilometer.compute import manager
from ceilometer import counter
from ceilometer import pipeline
from ceilometer.tests import base
from tests import agentbase
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def test_load_plugins():
@ -37,25 +42,7 @@ def test_load_plugins():
return
class TestRunTasks(base.TestCase):
class Pollster:
counters = []
test_data = counter.Counter(
name='test',
type=counter.TYPE_CUMULATIVE,
unit='',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
resource_metadata={'name': 'Pollster'},
)
def get_counters(self, manager, instance):
self.counters.append((manager, instance))
return [self.test_data]
class TestRunTasks(agentbase.BaseAgentManagerTestCase):
def _fake_instance(self, name, state):
instance = mock.MagicMock()
@ -63,20 +50,12 @@ class TestRunTasks(base.TestCase):
setattr(instance, 'OS-EXT-STS:vm_state', state)
return instance
def setup_manager(self):
self.mgr = manager.AgentManager()
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
def setUp(self):
super(TestRunTasks, self).setUp()
self.mgr = manager.AgentManager()
self.mgr.pollster_manager = extension.ExtensionManager(
'fake',
invoke_on_load=False,
)
self.mgr.pollster_manager.extensions = [
extension.Extension('test',
None,
None,
self.Pollster(), ),
]
# Set up a fake instance value to be returned by
# instance_get_all_by_host() so when the manager gets the list
@ -85,19 +64,18 @@ class TestRunTasks(base.TestCase):
stillborn_instance = self._fake_instance('stillborn', 'error')
self.stubs.Set(nova_client.Client, 'instance_get_all_by_host',
lambda *x: [self.instance, stillborn_instance])
self.mox.ReplayAll()
# Invoke the periodic tasks to call the pollsters.
self.mgr.periodic_tasks(None)
def tearDown(self):
self.Pollster.counters = []
super(TestRunTasks, self).tearDown()
def test_message(self):
def test_notifier_task(self):
self.mgr.setup_notifier_task()
self.mgr.poll_instance(None, self.instance)
self.assertEqual(len(self.Pollster.counters), 1)
assert self.publisher.counters[0] == self.Pollster.test_data
def test_setup_polling_tasks(self):
super(TestRunTasks, self).test_setup_polling_tasks()
self.assertTrue(self.Pollster.counters[0][1] is self.instance)
def test_notifications(self):
self.assertTrue(self.mgr.pipeline_manager.publisher.called)
args, _ = self.mgr.pipeline_manager.publisher.call_args
self.assertEqual(args[1], cfg.CONF.counter_source)
def test_interval_exception_isolation(self):
super(TestRunTasks, self).test_interval_exception_isolation()
self.assertEqual(len(self.PollsterException.counters), 1)
self.assertEqual(len(self.PollsterExceptionAnother.counters), 1)

View File

@ -78,6 +78,9 @@ class TestNovaNotifier(base.TestCase):
self.counters.append((manager, instance))
return [self.test_data]
def get_counter_names(self):
return ['test']
def fake_db_instance_get(self, context, id_):
if self.instance['uuid'] == id_:
return mock.MagicMock(name=self.instance['name'],

View File

@ -45,22 +45,20 @@ class TestSwiftMiddleware(base.TestCase):
class _faux_pipeline_manager(object):
class _faux_pipeline(object):
def __init__(self):
def __init__(self, pipeline_manager):
self.pipeline_manager = pipeline_manager
self.counters = []
def publish_counters(self, ctxt, counters, source):
self.counters.extend(counters)
def flush(self, ctx, source):
def flush(self, context, source):
pass
def __init__(self):
self.pipelines = [self._faux_pipeline()]
self.pipelines = [self._faux_pipeline(self)]
def publisher(self, context, source):
return pipeline.Publisher(self.pipelines, context, source)
def flush(self, context, source):
def flush(self, ctx, source):
pass
def _faux_setup_pipeline(self, publisher_manager):