Change compute pollster API to work on one instance at a time

We will eventually need the compute agent to accept instructions to
poll the status of an instance before nova deletes it (for details
see bug 1005944). This change prepares for that fix by making
the compute agent pollsters take an instance as an argument
provided by the caller, instead of accessing the database directly
to look up the instances known to be running on the current
host.

Change-Id: I6a16405fd65bfb3c190d02f3d70b2bde7fc0fc83
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2012-07-30 14:42:00 -04:00
parent 0e8f2359d9
commit dbccbb5cb9
5 changed files with 87 additions and 66 deletions

View File

@ -83,20 +83,18 @@ class DiskIOPollster(plugin.ComputePollster):
for target in tree.findall('devices/disk/target')
])
def get_counters(self, manager, context):
def get_counters(self, manager, instance):
if FLAGS.compute_driver == 'libvirt.LibvirtDriver':
conn = get_libvirt_connection()
for instance in manager.db.instance_get_all_by_host(context,
manager.host):
# TODO(jd) This does not work see bug#998089
# for disk in conn.get_disks(instance.name):
try:
disks = self._get_disks(conn, instance.name)
except Exception as err:
self.LOG.warning('Ignoring instance %s: %s',
instance.name, err)
self.LOG.exception(err)
continue
# TODO(jd) This does not work see bug#998089
# for disk in conn.get_disks(instance.name):
try:
disks = self._get_disks(conn, instance.name)
except Exception as err:
self.LOG.warning('Ignoring instance %s: %s',
instance.name, err)
self.LOG.exception(err)
else:
bytes = 0
for disk in disks:
stats = conn.block_stats(instance.name, disk)
@ -115,28 +113,24 @@ class CPUPollster(plugin.ComputePollster):
LOG = log.getLogger(__name__ + '.cpu')
def get_counters(self, manager, context):
def get_counters(self, manager, instance):
conn = get_libvirt_connection()
# FIXME(dhellmann): How do we get a list of instances without
# talking directly to the database?
for instance in manager.db.instance_get_all_by_host(context,
manager.host):
self.LOG.info('checking instance %s', instance.uuid)
try:
cpu_info = conn.get_info(instance)
self.LOG.info("CPUTIME USAGE: %s %d",
instance, cpu_info['cpu_time'])
yield make_counter_from_instance(instance,
name='cpu',
type='cumulative',
volume=cpu_info['cpu_time'],
)
yield make_counter_from_instance(instance,
name='instance',
type='cumulative',
volume=1,
)
except Exception as err:
self.LOG.error('could not get CPU time for %s: %s',
instance.uuid, err)
self.LOG.exception(err)
self.LOG.info('checking instance %s', instance.uuid)
try:
cpu_info = conn.get_info(instance)
self.LOG.info("CPUTIME USAGE: %s %d",
instance, cpu_info['cpu_time'])
yield make_counter_from_instance(instance,
name='cpu',
type='cumulative',
volume=cpu_info['cpu_time'],
)
yield make_counter_from_instance(instance,
name='instance',
type='cumulative',
volume=1,
)
except Exception as err:
self.LOG.error('could not get CPU time for %s: %s',
instance.uuid, err)
self.LOG.exception(err)

View File

@ -26,7 +26,7 @@ from ceilometer import publish
LOG = log.getLogger(__name__)
COMPUTE_PLUGIN_NAMESPACE = 'ceilometer.poll.compute'
PLUGIN_NAMESPACE = 'ceilometer.poll.compute'
class AgentManager(manager.Manager):
@ -37,7 +37,7 @@ class AgentManager(manager.Manager):
def _load_plugins(self):
self.pollsters = []
for ep in pkg_resources.iter_entry_points(COMPUTE_PLUGIN_NAMESPACE):
for ep in pkg_resources.iter_entry_points(PLUGIN_NAMESPACE):
try:
plugin_class = ep.load()
plugin = plugin_class()
@ -48,24 +48,28 @@ class AgentManager(manager.Manager):
# it should be enabled.
self.pollsters.append((ep.name, plugin))
LOG.info('loaded pollster %s:%s',
COMPUTE_PLUGIN_NAMESPACE, ep.name)
PLUGIN_NAMESPACE, ep.name)
except Exception as err:
LOG.warning('Failed to load pollster %s:%s',
ep.name, err)
LOG.exception(err)
if not self.pollsters:
LOG.warning('Failed to load any pollsters for %s',
COMPUTE_PLUGIN_NAMESPACE)
PLUGIN_NAMESPACE)
return
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
for name, pollster in self.pollsters:
try:
LOG.info('polling %s', name)
for c in pollster.get_counters(self, context):
LOG.info('COUNTER: %s', c)
publish.publish_counter(context, c)
except Exception as err:
LOG.warning('Continuing after error from %s: %s', name, err)
LOG.exception(err)
# FIXME(dhellmann): How do we get a list of instances without
# talking directly to the database?
for instance in self.db.instance_get_all_by_host(context, self.host):
for name, pollster in self.pollsters:
try:
LOG.info('polling %s', name)
for c in pollster.get_counters(self, instance):
LOG.info('COUNTER: %s', c)
publish.publish_counter(context, c)
except Exception as err:
LOG.warning('Continuing after error from %s: %s',
name, err)
LOG.exception(err)

View File

@ -42,6 +42,6 @@ class PollsterBase(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def get_counters(self, manager, context):
def get_counters(self, manager, instance):
"""Return a sequence of Counter instances from polling the
resources."""

View File

@ -42,22 +42,29 @@ class TestDiskIOPollster(test.TestCase):
self.manager = manager.AgentManager()
self.pollster = libvirt.DiskIOPollster()
super(TestDiskIOPollster, self).setUp()
self.instance = db.instance_create(self.context, {})
flags.FLAGS.compute_driver = 'libvirt.LibvirtDriver'
@test.skip_if(libvirt_missing, 'Test requires libvirt')
def test_fetch_diskio(self):
list(self.pollster.get_counters(self.manager, self.context))
counters = list(self.pollster.get_counters(self.manager,
self.instance))
#assert counters
# FIXME(dhellmann): The CI environment doesn't produce
# a response when the fake driver asks for the disks, so
# we do not get any counters in response.
@test.skip_if(libvirt_missing, 'Test requires libvirt')
def test_fetch_diskio_not_libvirt(self):
flags.FLAGS.compute_driver = 'fake.FakeDriver'
counters = list(self.pollster.get_counters(self.manager,
self.instance))
assert not counters
@test.skip_if(libvirt_missing, 'Test requires libvirt')
def test_fetch_diskio_with_libvirt_non_existent_instance(self):
flags.FLAGS.compute_driver = 'libvirt.LibvirtDriver'
instance = db.instance_create(self.context, {})
self.mox.StubOutWithMock(self.manager.db, 'instance_get_all_by_host')
self.manager.db.instance_get_all_by_host(self.context,
self.manager.host,
).AndReturn([instance])
self.mox.ReplayAll()
list(self.pollster.get_counters(self.manager, self.context))
print 'ID:', self.instance.id
inst = db.instance_get(self.context, self.instance.id)
inst.id = 999 # change the id so the driver cannot find the instance
counters = list(self.pollster.get_counters(self.manager, inst))
assert not counters

View File

@ -53,8 +53,8 @@ class TestRunTasks(test.TestCase):
},
)
def get_counters(self, manager, context):
self.counters.append((manager, context))
def get_counters(self, manager, instance):
self.counters.append((manager, instance))
return [self.test_data]
def faux_notify(self, context, msg):
@ -67,7 +67,23 @@ class TestRunTasks(test.TestCase):
self.mgr = manager.AgentManager()
self.mgr.pollsters = [('test', self.Pollster())]
self.ctx = context.RequestContext("user", "project")
# Set up a fake instance value to be returned by
# instance_get_all_by_host() so when the manager gets the list
# of instances to poll we can control the results.
self.instance = 'faux instance'
self.mox.StubOutWithMock(self.mgr.db, 'instance_get_all_by_host')
self.mgr.db.instance_get_all_by_host(
self.ctx,
self.mgr.host,
).AndReturn([self.instance])
self.mox.ReplayAll()
# Invoke the periodic tasks to call the pollsters.
self.mgr.periodic_tasks(self.ctx)
def test_message(self):
assert self.Pollster.counters[0][1] is self.ctx
assert self.Pollster.counters[0][1] is self.instance
def test_notifications(self):
assert self.notifications[0] is self.Pollster.test_data
assert len(self.notifications) == 1