Implement bay monitoring and notifications

* Implement a bay monitoring framework that provides APIs to pull data from
  bay, and compute metrics based on the data.
* Add support for computing memory utilization of a swarm bay.
* Add a periodic task that sends metrics to ceilometer.

Change-Id: I32730e116c59a4e5f9d1ccb466d464c2fc1f6193
Partially-Implements: blueprint autoscale-bay
Partially-Implements: blueprint magnum-notifications
This commit is contained in:
Hongbin Lu 2015-08-20 17:04:11 -04:00
parent 413764cbe2
commit b88085c93c
4 changed files with 441 additions and 0 deletions

View File

@ -0,0 +1,137 @@
# Copyright 2015 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from oslo_config import cfg
from oslo_log import log
import six
from magnum.conductor.handlers.common import docker_client
from magnum.i18n import _LW
from magnum import objects
from magnum.objects.fields import BayType as bay_type
LOG = log.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('docker_remote_api_version',
'magnum.conductor.handlers.docker_conductor',
group='docker')
CONF.import_opt('default_timeout',
'magnum.conductor.handlers.docker_conductor',
group='docker')
@six.add_metaclass(abc.ABCMeta)
class MonitorBase(object):
def __init__(self, bay):
self.bay = bay
@abc.abstractproperty
def metrics_spec(self):
"""Metric specification."""
@abc.abstractmethod
def pull_data(self):
"""Pull data from bay."""
def get_metric_names(self):
return self.metrics_spec.keys()
def get_metric_unit(self, metric_name):
return self.metrics_spec[metric_name]['unit']
def compute_metric_value(self, metric_name):
func_name = self.metrics_spec[metric_name]['func']
func = getattr(self, func_name)
return func()
class SwarmMonitor(MonitorBase):
def __init__(self, bay):
super(SwarmMonitor, self).__init__(bay)
self.data = {}
self.data['nodes'] = []
self.data['containers'] = []
@property
def metrics_spec(self):
return {
'memory_util': {
'unit': '%',
'func': 'compute_memory_util',
},
}
def pull_data(self):
# pull data from each bay node
nodes = []
for node_addr in (self.bay.node_addresses + [self.bay.api_address]):
docker = self._docker_client(node_addr)
node_info = docker.info()
nodes.append(node_info)
self.data['nodes'] = nodes
# pull data from each container
containers = []
docker = self._docker_swarm_client(self.bay)
for container in docker.containers(all=True):
try:
container = docker.inspect_container(container['Id'])
except Exception as e:
LOG.warn(_LW("Ignore error [%(e)s] when inspecting container "
"%(container_id)s."),
{'e': e, 'container_id': container['Id']},
exc_info=True)
containers.append(container)
self.data['containers'] = containers
def compute_memory_util(self):
mem_total = 0
for node in self.data['nodes']:
mem_total += node['MemTotal']
mem_reserved = 0
for container in self.data['containers']:
mem_reserved += container['Config']['Memory']
if mem_total == 0:
return 0
else:
return mem_reserved * 100 / mem_total
def _docker_client(self, api_address, port=2375):
tcp_url = 'tcp://%s:%s' % (api_address, port)
return docker_client.DockerHTTPClient(
tcp_url,
CONF.docker.docker_remote_api_version,
CONF.docker.default_timeout
)
def _docker_swarm_client(self, bay):
return self._docker_client(bay.api_address, port=2376)
def create_monitor(context, bay):
baymodel = objects.BayModel.get_by_uuid(context, bay.baymodel_id)
if baymodel.coe == bay_type.SWARM:
return SwarmMonitor(bay)
# TODO(hongbin): add support for other bay types
LOG.debug("Cannot create monitor with bay type '%s'" % baymodel.coe)
return None

View File

@ -23,6 +23,8 @@ from oslo_service import threadgroup
from magnum.common import clients from magnum.common import clients
from magnum.common import context from magnum.common import context
from magnum.common import exception from magnum.common import exception
from magnum.common import rpc
from magnum.conductor import monitors
from magnum.i18n import _ from magnum.i18n import _
from magnum.i18n import _LI from magnum.i18n import _LI
from magnum.i18n import _LW from magnum.i18n import _LW
@ -54,6 +56,7 @@ class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
self.host = conf.host self.host = conf.host
self.binary = binary self.binary = binary
super(MagnumPeriodicTasks, self).__init__(conf) super(MagnumPeriodicTasks, self).__init__(conf)
self.notifier = rpc.get_notifier()
@periodic_task.periodic_task(run_immediately=True) @periodic_task.periodic_task(run_immediately=True)
@set_context @set_context
@ -147,6 +150,49 @@ class MagnumPeriodicTasks(periodic_task.PeriodicTasks):
LOG.warn(_LW("Ignore error [%s] when syncing up bay status."), e, LOG.warn(_LW("Ignore error [%s] when syncing up bay status."), e,
exc_info=True) exc_info=True)
@periodic_task.periodic_task(run_immediately=True)
@set_context
def _send_bay_metrics(self, ctx):
LOG.debug('Starting to send bay metrics')
for bay in objects.Bay.list(ctx):
if bay.status not in [bay_status.CREATE_COMPLETE,
bay_status.UPDATE_COMPLETE]:
continue
monitor = monitors.create_monitor(ctx, bay)
if monitor is None:
continue
try:
monitor.pull_data()
except Exception as e:
LOG.warn(_LW("Skip pulling data from bay %(bay)s due to "
"error: %(e)s"),
{'e': e, 'bay': bay.uuid}, exc_info=True)
continue
metrics = list()
for name in monitor.get_metric_names():
try:
metric = {
'name': name,
'value': monitor.compute_metric_value(name),
'unit': monitor.get_metric_unit(name),
}
metrics.append(metric)
except Exception as e:
LOG.warn(_LW("Skip adding metric %(name)s due to "
"error: %(e)s"),
{'e': e, 'name': name}, exc_info=True)
message = dict(metrics=metrics,
user_id=bay.user_id,
project_id=bay.project_id,
resource_id=bay.uuid)
LOG.debug("About to send notification: '%s'" % message)
self.notifier.info(ctx, "magnum.bay.metrics.update",
message)
def setup(conf, binary): def setup(conf, binary):
tg = threadgroup.ThreadGroup() tg = threadgroup.ThreadGroup()

View File

@ -0,0 +1,139 @@
# Copyright 2015 Huawei Technologies Co.,LTD.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from magnum.conductor import monitors
from magnum import objects
from magnum.tests import base
from magnum.tests.unit.db import utils
class MonitorsTestCase(base.TestCase):
test_metrics_spec = {
'metric1': {
'unit': 'metric1_unit',
'func': 'metric1_func',
},
'metric2': {
'unit': 'metric2_unit',
'func': 'metric2_func',
},
}
def setUp(self):
super(MonitorsTestCase, self).setUp()
bay = utils.get_test_bay(node_addresses=['1.2.3.4'],
api_address='5.6.7.8')
self.bay = objects.Bay(self.context, **bay)
self.monitor = monitors.SwarmMonitor(self.bay)
p = mock.patch('magnum.conductor.monitors.SwarmMonitor.metrics_spec',
new_callable=mock.PropertyMock)
self.mock_metrics_spec = p.start()
self.mock_metrics_spec.return_value = self.test_metrics_spec
self.addCleanup(p.stop)
@mock.patch('magnum.objects.BayModel.get_by_uuid')
def test_create_monitor_success(self, mock_baymodel_get_by_uuid):
baymodel = mock.MagicMock()
baymodel.coe = 'swarm'
mock_baymodel_get_by_uuid.return_value = baymodel
monitor = monitors.create_monitor(self.context, self.bay)
self.assertTrue(isinstance(monitor, monitors.SwarmMonitor))
@mock.patch('magnum.objects.BayModel.get_by_uuid')
def test_create_monitor_unsupported_coe(self, mock_baymodel_get_by_uuid):
baymodel = mock.MagicMock()
baymodel.coe = 'unsupported'
mock_baymodel_get_by_uuid.return_value = baymodel
monitor = monitors.create_monitor(self.context, self.bay)
self.assertIsNone(monitor)
@mock.patch('magnum.conductor.handlers.common.docker_client.'
'DockerHTTPClient')
def test_swarm_monitor_pull_data_success(self, mock_docker_http_client):
mock_docker = mock.MagicMock()
mock_docker.info.return_value = 'test_node'
mock_docker.containers.return_value = [mock.MagicMock()]
mock_docker.inspect_container.return_value = 'test_container'
mock_docker_http_client.return_value = mock_docker
self.monitor.pull_data()
self.assertEqual(self.monitor.data['nodes'],
['test_node', 'test_node'])
self.assertEqual(self.monitor.data['containers'], ['test_container'])
@mock.patch('magnum.conductor.handlers.common.docker_client.'
'DockerHTTPClient')
def test_swarm_monitor_pull_data_raise(self, mock_docker_http_client):
mock_container = mock.MagicMock()
mock_docker = mock.MagicMock()
mock_docker.info.return_value = 'test_node'
mock_docker.containers.return_value = [mock_container]
mock_docker.inspect_container.side_effect = Exception("inspect error")
mock_docker_http_client.return_value = mock_docker
self.monitor.pull_data()
self.assertEqual(self.monitor.data['nodes'],
['test_node', 'test_node'])
self.assertEqual(self.monitor.data['containers'], [mock_container])
def test_swarm_monitor_get_metric_names(self):
names = self.monitor.get_metric_names()
self.assertEqual(sorted(names), sorted(['metric1', 'metric2']))
def test_swarm_monitor_get_metric_unit(self):
unit = self.monitor.get_metric_unit('metric1')
self.assertEqual(unit, 'metric1_unit')
def test_swarm_monitor_compute_metric_value(self):
mock_func = mock.MagicMock()
mock_func.return_value = 'metric1_value'
self.monitor.metric1_func = mock_func
value = self.monitor.compute_metric_value('metric1')
self.assertEqual(value, 'metric1_value')
def test_swarm_monitor_compute_memory_util(self):
test_data = {
'nodes': [
{
'Name': 'node',
'MemTotal': 20,
},
],
'containers': [
{
'Name': 'container',
'Config': {
'Memory': 10,
},
},
],
}
self.monitor.data = test_data
mem_util = self.monitor.compute_memory_util()
self.assertEqual(mem_util, 50)
test_data = {
'nodes': [],
'containers': [],
}
self.monitor.data = test_data
mem_util = self.monitor.compute_memory_util()
self.assertEqual(mem_util, 0)

View File

@ -55,10 +55,13 @@ class PeriodicTestCase(base.TestCase):
status=bay_status.DELETE_IN_PROGRESS) status=bay_status.DELETE_IN_PROGRESS)
bay3 = utils.get_test_bay(id=3, stack_id='33', bay3 = utils.get_test_bay(id=3, stack_id='33',
status=bay_status.UPDATE_IN_PROGRESS) status=bay_status.UPDATE_IN_PROGRESS)
bay4 = utils.get_test_bay(id=4, stack_id='44',
status=bay_status.CREATE_COMPLETE)
self.bay1 = objects.Bay(ctx, **bay1) self.bay1 = objects.Bay(ctx, **bay1)
self.bay2 = objects.Bay(ctx, **bay2) self.bay2 = objects.Bay(ctx, **bay2)
self.bay3 = objects.Bay(ctx, **bay3) self.bay3 = objects.Bay(ctx, **bay3)
self.bay4 = objects.Bay(ctx, **bay4)
mock_magnum_service_refresh = mock.Mock() mock_magnum_service_refresh = mock.Mock()
@ -185,3 +188,119 @@ class PeriodicTestCase(base.TestCase):
periodic_a.update_magnum_service(None) periodic_a.update_magnum_service(None)
self.fake_ms_refresh.assert_called_once_with(mock.ANY) self.fake_ms_refresh.assert_called_once_with(mock.ANY)
@mock.patch('magnum.conductor.monitors.create_monitor')
@mock.patch('magnum.objects.Bay.list')
@mock.patch('magnum.common.rpc.get_notifier')
@mock.patch('magnum.common.context.make_admin_context')
def test_send_bay_metrics(self, mock_make_admin_context, mock_get_notifier,
mock_bay_list, mock_create_monitor):
"""Test if RPC notifier receives the expected message"""
mock_make_admin_context.return_value = self.context
notifier = mock.MagicMock()
mock_get_notifier.return_value = notifier
mock_bay_list.return_value = [self.bay1, self.bay2, self.bay3,
self.bay4]
monitor = mock.MagicMock()
monitor.get_metric_names.return_value = ['metric1', 'metric2']
monitor.compute_metric_value.return_value = 30
monitor.get_metric_unit.return_value = '%'
mock_create_monitor.return_value = monitor
periodic.MagnumPeriodicTasks(
CONF, 'fake-conductor')._send_bay_metrics(self.context)
expected_event_type = 'magnum.bay.metrics.update'
expected_metrics = [
{
'name': 'metric1',
'value': 30,
'unit': '%',
},
{
'name': 'metric2',
'value': 30,
'unit': '%',
},
]
expected_msg = {
'user_id': self.bay4.user_id,
'project_id': self.bay4.project_id,
'resource_id': self.bay4.uuid,
'metrics': expected_metrics
}
self.assertEqual(mock_create_monitor.call_count, 1)
notifier.info.assert_called_once_with(
self.context, expected_event_type, expected_msg)
@mock.patch('magnum.conductor.monitors.create_monitor')
@mock.patch('magnum.objects.Bay.list')
@mock.patch('magnum.common.rpc.get_notifier')
@mock.patch('magnum.common.context.make_admin_context')
def test_send_bay_metrics_compute_metric_raise(
self, mock_make_admin_context, mock_get_notifier, mock_bay_list,
mock_create_monitor):
mock_make_admin_context.return_value = self.context
notifier = mock.MagicMock()
mock_get_notifier.return_value = notifier
mock_bay_list.return_value = [self.bay4]
monitor = mock.MagicMock()
monitor.get_metric_names.return_value = ['metric1', 'metric2']
monitor.compute_metric_value.side_effect = Exception(
"error on computing metric")
mock_create_monitor.return_value = monitor
periodic.MagnumPeriodicTasks(
CONF, 'fake-conductor')._send_bay_metrics(self.context)
expected_event_type = 'magnum.bay.metrics.update'
expected_msg = {
'user_id': self.bay4.user_id,
'project_id': self.bay4.project_id,
'resource_id': self.bay4.uuid,
'metrics': []
}
self.assertEqual(mock_create_monitor.call_count, 1)
notifier.info.assert_called_once_with(
self.context, expected_event_type, expected_msg)
@mock.patch('magnum.conductor.monitors.create_monitor')
@mock.patch('magnum.objects.Bay.list')
@mock.patch('magnum.common.rpc.get_notifier')
@mock.patch('magnum.common.context.make_admin_context')
def test_send_bay_metrics_pull_data_raise(
self, mock_make_admin_context, mock_get_notifier, mock_bay_list,
mock_create_monitor):
mock_make_admin_context.return_value = self.context
notifier = mock.MagicMock()
mock_get_notifier.return_value = notifier
mock_bay_list.return_value = [self.bay4]
monitor = mock.MagicMock()
monitor.pull_data.side_effect = Exception("error on pulling data")
mock_create_monitor.return_value = monitor
periodic.MagnumPeriodicTasks(
CONF, 'fake-conductor')._send_bay_metrics(self.context)
self.assertEqual(mock_create_monitor.call_count, 1)
self.assertEqual(notifier.info.call_count, 0)
@mock.patch('magnum.conductor.monitors.create_monitor')
@mock.patch('magnum.objects.Bay.list')
@mock.patch('magnum.common.rpc.get_notifier')
@mock.patch('magnum.common.context.make_admin_context')
def test_send_bay_metrics_monitor_none(
self, mock_make_admin_context, mock_get_notifier, mock_bay_list,
mock_create_monitor):
mock_make_admin_context.return_value = self.context
notifier = mock.MagicMock()
mock_get_notifier.return_value = notifier
mock_bay_list.return_value = [self.bay4]
mock_create_monitor.return_value = None
periodic.MagnumPeriodicTasks(
CONF, 'fake-conductor')._send_bay_metrics(self.context)
self.assertEqual(mock_create_monitor.call_count, 1)
self.assertEqual(notifier.info.call_count, 0)