diff --git a/magnum/conductor/monitors.py b/magnum/conductor/monitors.py new file mode 100644 index 0000000000..9736af574a --- /dev/null +++ b/magnum/conductor/monitors.py @@ -0,0 +1,137 @@ +# Copyright 2015 Huawei Technologies Co.,LTD. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + +from oslo_config import cfg +from oslo_log import log +import six + +from magnum.conductor.handlers.common import docker_client +from magnum.i18n import _LW +from magnum import objects +from magnum.objects.fields import BayType as bay_type + + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF +CONF.import_opt('docker_remote_api_version', + 'magnum.conductor.handlers.docker_conductor', + group='docker') +CONF.import_opt('default_timeout', + 'magnum.conductor.handlers.docker_conductor', + group='docker') + + +@six.add_metaclass(abc.ABCMeta) +class MonitorBase(object): + + def __init__(self, bay): + self.bay = bay + + @abc.abstractproperty + def metrics_spec(self): + """Metric specification.""" + + @abc.abstractmethod + def pull_data(self): + """Pull data from bay.""" + + def get_metric_names(self): + return self.metrics_spec.keys() + + def get_metric_unit(self, metric_name): + return self.metrics_spec[metric_name]['unit'] + + def compute_metric_value(self, metric_name): + func_name = self.metrics_spec[metric_name]['func'] + func = getattr(self, func_name) + return func() + + +class SwarmMonitor(MonitorBase): + + def __init__(self, bay): + super(SwarmMonitor, self).__init__(bay) + self.data = {} + self.data['nodes'] = [] + self.data['containers'] = [] + + @property + def metrics_spec(self): + return { + 'memory_util': { + 'unit': '%', + 'func': 'compute_memory_util', + }, + } + + def pull_data(self): + # pull data from each bay node + nodes = [] + for node_addr in (self.bay.node_addresses + [self.bay.api_address]): + docker = self._docker_client(node_addr) + node_info = docker.info() + nodes.append(node_info) + self.data['nodes'] = nodes + + # pull data from each container + containers = [] + docker = self._docker_swarm_client(self.bay) + for container in docker.containers(all=True): + try: + container = docker.inspect_container(container['Id']) + except Exception as e: + LOG.warn(_LW("Ignore error [%(e)s] when inspecting container " + "%(container_id)s."), + {'e': e, 'container_id': container['Id']}, + exc_info=True) + containers.append(container) + self.data['containers'] = containers + + def compute_memory_util(self): + mem_total = 0 + for node in self.data['nodes']: + mem_total += node['MemTotal'] + mem_reserved = 0 + for container in self.data['containers']: + mem_reserved += container['Config']['Memory'] + + if mem_total == 0: + return 0 + else: + return mem_reserved * 100 / mem_total + + def _docker_client(self, api_address, port=2375): + tcp_url = 'tcp://%s:%s' % (api_address, port) + return docker_client.DockerHTTPClient( + tcp_url, + CONF.docker.docker_remote_api_version, + CONF.docker.default_timeout + ) + + def _docker_swarm_client(self, bay): + return self._docker_client(bay.api_address, port=2376) + + +def create_monitor(context, bay): + baymodel = objects.BayModel.get_by_uuid(context, bay.baymodel_id) + if baymodel.coe == bay_type.SWARM: + return SwarmMonitor(bay) + + # TODO(hongbin): add support for other bay types + LOG.debug("Cannot create monitor with bay type '%s'" % baymodel.coe) + return None diff --git a/magnum/service/periodic.py b/magnum/service/periodic.py index c76a665fc5..34b3e1334b 100644 --- a/magnum/service/periodic.py +++ b/magnum/service/periodic.py @@ -23,6 +23,8 @@ from oslo_service import threadgroup from magnum.common import clients from magnum.common import context from magnum.common import exception +from magnum.common import rpc +from magnum.conductor import monitors from magnum.i18n import _ from magnum.i18n import _LI from magnum.i18n import _LW @@ -54,6 +56,7 @@ class MagnumPeriodicTasks(periodic_task.PeriodicTasks): self.host = conf.host self.binary = binary super(MagnumPeriodicTasks, self).__init__(conf) + self.notifier = rpc.get_notifier() @periodic_task.periodic_task(run_immediately=True) @set_context @@ -147,6 +150,49 @@ class MagnumPeriodicTasks(periodic_task.PeriodicTasks): LOG.warn(_LW("Ignore error [%s] when syncing up bay status."), e, exc_info=True) + @periodic_task.periodic_task(run_immediately=True) + @set_context + def _send_bay_metrics(self, ctx): + LOG.debug('Starting to send bay metrics') + for bay in objects.Bay.list(ctx): + if bay.status not in [bay_status.CREATE_COMPLETE, + bay_status.UPDATE_COMPLETE]: + continue + + monitor = monitors.create_monitor(ctx, bay) + if monitor is None: + continue + + try: + monitor.pull_data() + except Exception as e: + LOG.warn(_LW("Skip pulling data from bay %(bay)s due to " + "error: %(e)s"), + {'e': e, 'bay': bay.uuid}, exc_info=True) + continue + + metrics = list() + for name in monitor.get_metric_names(): + try: + metric = { + 'name': name, + 'value': monitor.compute_metric_value(name), + 'unit': monitor.get_metric_unit(name), + } + metrics.append(metric) + except Exception as e: + LOG.warn(_LW("Skip adding metric %(name)s due to " + "error: %(e)s"), + {'e': e, 'name': name}, exc_info=True) + + message = dict(metrics=metrics, + user_id=bay.user_id, + project_id=bay.project_id, + resource_id=bay.uuid) + LOG.debug("About to send notification: '%s'" % message) + self.notifier.info(ctx, "magnum.bay.metrics.update", + message) + def setup(conf, binary): tg = threadgroup.ThreadGroup() diff --git a/magnum/tests/unit/conductor/test_monitors.py b/magnum/tests/unit/conductor/test_monitors.py new file mode 100644 index 0000000000..392a5be428 --- /dev/null +++ b/magnum/tests/unit/conductor/test_monitors.py @@ -0,0 +1,139 @@ +# Copyright 2015 Huawei Technologies Co.,LTD. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from magnum.conductor import monitors +from magnum import objects +from magnum.tests import base +from magnum.tests.unit.db import utils + + +class MonitorsTestCase(base.TestCase): + + test_metrics_spec = { + 'metric1': { + 'unit': 'metric1_unit', + 'func': 'metric1_func', + }, + 'metric2': { + 'unit': 'metric2_unit', + 'func': 'metric2_func', + }, + } + + def setUp(self): + super(MonitorsTestCase, self).setUp() + + bay = utils.get_test_bay(node_addresses=['1.2.3.4'], + api_address='5.6.7.8') + self.bay = objects.Bay(self.context, **bay) + self.monitor = monitors.SwarmMonitor(self.bay) + p = mock.patch('magnum.conductor.monitors.SwarmMonitor.metrics_spec', + new_callable=mock.PropertyMock) + self.mock_metrics_spec = p.start() + self.mock_metrics_spec.return_value = self.test_metrics_spec + self.addCleanup(p.stop) + + @mock.patch('magnum.objects.BayModel.get_by_uuid') + def test_create_monitor_success(self, mock_baymodel_get_by_uuid): + baymodel = mock.MagicMock() + baymodel.coe = 'swarm' + mock_baymodel_get_by_uuid.return_value = baymodel + monitor = monitors.create_monitor(self.context, self.bay) + self.assertTrue(isinstance(monitor, monitors.SwarmMonitor)) + + @mock.patch('magnum.objects.BayModel.get_by_uuid') + def test_create_monitor_unsupported_coe(self, mock_baymodel_get_by_uuid): + baymodel = mock.MagicMock() + baymodel.coe = 'unsupported' + mock_baymodel_get_by_uuid.return_value = baymodel + monitor = monitors.create_monitor(self.context, self.bay) + self.assertIsNone(monitor) + + @mock.patch('magnum.conductor.handlers.common.docker_client.' + 'DockerHTTPClient') + def test_swarm_monitor_pull_data_success(self, mock_docker_http_client): + mock_docker = mock.MagicMock() + mock_docker.info.return_value = 'test_node' + mock_docker.containers.return_value = [mock.MagicMock()] + mock_docker.inspect_container.return_value = 'test_container' + mock_docker_http_client.return_value = mock_docker + + self.monitor.pull_data() + + self.assertEqual(self.monitor.data['nodes'], + ['test_node', 'test_node']) + self.assertEqual(self.monitor.data['containers'], ['test_container']) + + @mock.patch('magnum.conductor.handlers.common.docker_client.' + 'DockerHTTPClient') + def test_swarm_monitor_pull_data_raise(self, mock_docker_http_client): + mock_container = mock.MagicMock() + mock_docker = mock.MagicMock() + mock_docker.info.return_value = 'test_node' + mock_docker.containers.return_value = [mock_container] + mock_docker.inspect_container.side_effect = Exception("inspect error") + mock_docker_http_client.return_value = mock_docker + + self.monitor.pull_data() + + self.assertEqual(self.monitor.data['nodes'], + ['test_node', 'test_node']) + self.assertEqual(self.monitor.data['containers'], [mock_container]) + + def test_swarm_monitor_get_metric_names(self): + names = self.monitor.get_metric_names() + self.assertEqual(sorted(names), sorted(['metric1', 'metric2'])) + + def test_swarm_monitor_get_metric_unit(self): + unit = self.monitor.get_metric_unit('metric1') + self.assertEqual(unit, 'metric1_unit') + + def test_swarm_monitor_compute_metric_value(self): + mock_func = mock.MagicMock() + mock_func.return_value = 'metric1_value' + self.monitor.metric1_func = mock_func + value = self.monitor.compute_metric_value('metric1') + self.assertEqual(value, 'metric1_value') + + def test_swarm_monitor_compute_memory_util(self): + test_data = { + 'nodes': [ + { + 'Name': 'node', + 'MemTotal': 20, + }, + ], + 'containers': [ + { + 'Name': 'container', + 'Config': { + 'Memory': 10, + }, + }, + ], + } + self.monitor.data = test_data + mem_util = self.monitor.compute_memory_util() + self.assertEqual(mem_util, 50) + + test_data = { + 'nodes': [], + 'containers': [], + } + self.monitor.data = test_data + mem_util = self.monitor.compute_memory_util() + self.assertEqual(mem_util, 0) diff --git a/magnum/tests/unit/service/test_periodic.py b/magnum/tests/unit/service/test_periodic.py index e443dd8288..f12a778e7f 100644 --- a/magnum/tests/unit/service/test_periodic.py +++ b/magnum/tests/unit/service/test_periodic.py @@ -55,10 +55,13 @@ class PeriodicTestCase(base.TestCase): status=bay_status.DELETE_IN_PROGRESS) bay3 = utils.get_test_bay(id=3, stack_id='33', status=bay_status.UPDATE_IN_PROGRESS) + bay4 = utils.get_test_bay(id=4, stack_id='44', + status=bay_status.CREATE_COMPLETE) self.bay1 = objects.Bay(ctx, **bay1) self.bay2 = objects.Bay(ctx, **bay2) self.bay3 = objects.Bay(ctx, **bay3) + self.bay4 = objects.Bay(ctx, **bay4) mock_magnum_service_refresh = mock.Mock() @@ -185,3 +188,119 @@ class PeriodicTestCase(base.TestCase): periodic_a.update_magnum_service(None) self.fake_ms_refresh.assert_called_once_with(mock.ANY) + + @mock.patch('magnum.conductor.monitors.create_monitor') + @mock.patch('magnum.objects.Bay.list') + @mock.patch('magnum.common.rpc.get_notifier') + @mock.patch('magnum.common.context.make_admin_context') + def test_send_bay_metrics(self, mock_make_admin_context, mock_get_notifier, + mock_bay_list, mock_create_monitor): + """Test if RPC notifier receives the expected message""" + mock_make_admin_context.return_value = self.context + notifier = mock.MagicMock() + mock_get_notifier.return_value = notifier + mock_bay_list.return_value = [self.bay1, self.bay2, self.bay3, + self.bay4] + monitor = mock.MagicMock() + monitor.get_metric_names.return_value = ['metric1', 'metric2'] + monitor.compute_metric_value.return_value = 30 + monitor.get_metric_unit.return_value = '%' + mock_create_monitor.return_value = monitor + + periodic.MagnumPeriodicTasks( + CONF, 'fake-conductor')._send_bay_metrics(self.context) + + expected_event_type = 'magnum.bay.metrics.update' + expected_metrics = [ + { + 'name': 'metric1', + 'value': 30, + 'unit': '%', + }, + { + 'name': 'metric2', + 'value': 30, + 'unit': '%', + }, + ] + expected_msg = { + 'user_id': self.bay4.user_id, + 'project_id': self.bay4.project_id, + 'resource_id': self.bay4.uuid, + 'metrics': expected_metrics + } + + self.assertEqual(mock_create_monitor.call_count, 1) + notifier.info.assert_called_once_with( + self.context, expected_event_type, expected_msg) + + @mock.patch('magnum.conductor.monitors.create_monitor') + @mock.patch('magnum.objects.Bay.list') + @mock.patch('magnum.common.rpc.get_notifier') + @mock.patch('magnum.common.context.make_admin_context') + def test_send_bay_metrics_compute_metric_raise( + self, mock_make_admin_context, mock_get_notifier, mock_bay_list, + mock_create_monitor): + mock_make_admin_context.return_value = self.context + notifier = mock.MagicMock() + mock_get_notifier.return_value = notifier + mock_bay_list.return_value = [self.bay4] + monitor = mock.MagicMock() + monitor.get_metric_names.return_value = ['metric1', 'metric2'] + monitor.compute_metric_value.side_effect = Exception( + "error on computing metric") + mock_create_monitor.return_value = monitor + + periodic.MagnumPeriodicTasks( + CONF, 'fake-conductor')._send_bay_metrics(self.context) + + expected_event_type = 'magnum.bay.metrics.update' + expected_msg = { + 'user_id': self.bay4.user_id, + 'project_id': self.bay4.project_id, + 'resource_id': self.bay4.uuid, + 'metrics': [] + } + self.assertEqual(mock_create_monitor.call_count, 1) + notifier.info.assert_called_once_with( + self.context, expected_event_type, expected_msg) + + @mock.patch('magnum.conductor.monitors.create_monitor') + @mock.patch('magnum.objects.Bay.list') + @mock.patch('magnum.common.rpc.get_notifier') + @mock.patch('magnum.common.context.make_admin_context') + def test_send_bay_metrics_pull_data_raise( + self, mock_make_admin_context, mock_get_notifier, mock_bay_list, + mock_create_monitor): + mock_make_admin_context.return_value = self.context + notifier = mock.MagicMock() + mock_get_notifier.return_value = notifier + mock_bay_list.return_value = [self.bay4] + monitor = mock.MagicMock() + monitor.pull_data.side_effect = Exception("error on pulling data") + mock_create_monitor.return_value = monitor + + periodic.MagnumPeriodicTasks( + CONF, 'fake-conductor')._send_bay_metrics(self.context) + + self.assertEqual(mock_create_monitor.call_count, 1) + self.assertEqual(notifier.info.call_count, 0) + + @mock.patch('magnum.conductor.monitors.create_monitor') + @mock.patch('magnum.objects.Bay.list') + @mock.patch('magnum.common.rpc.get_notifier') + @mock.patch('magnum.common.context.make_admin_context') + def test_send_bay_metrics_monitor_none( + self, mock_make_admin_context, mock_get_notifier, mock_bay_list, + mock_create_monitor): + mock_make_admin_context.return_value = self.context + notifier = mock.MagicMock() + mock_get_notifier.return_value = notifier + mock_bay_list.return_value = [self.bay4] + mock_create_monitor.return_value = None + + periodic.MagnumPeriodicTasks( + CONF, 'fake-conductor')._send_bay_metrics(self.context) + + self.assertEqual(mock_create_monitor.call_count, 1) + self.assertEqual(notifier.info.call_count, 0)