diff --git a/setup.cfg b/setup.cfg
index d39ce79d4..4d3d513c0 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -44,6 +44,7 @@ watcher.database.migration_backend =
 watcher_strategies =
     dummy = watcher.decision_engine.strategy.strategies.dummy_strategy:DummyStrategy
     basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation
+    outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
 
 [build_sphinx]
 source-dir = doc/source
diff --git a/watcher/common/keystone.py b/watcher/common/keystone.py
index e75f2a233..c58ebbd44 100644
--- a/watcher/common/keystone.py
+++ b/watcher/common/keystone.py
@@ -53,7 +53,7 @@ class KeystoneClient(object):
         self._token = None
 
     def get_endpoint(self, **kwargs):
-        kc = self._get_ksclient()
+        kc = self.get_ksclient()
         if not kc.has_service_catalog():
             raise exception.KeystoneFailure(
                 _('No Keystone service catalog loaded')
@@ -63,7 +63,7 @@
         if kwargs.get('region_name'):
             attr = 'region'
             filter_value = kwargs.get('region_name')
-        return self._get_ksclient().service_catalog.url_for(
+        return kc.service_catalog.url_for(
             service_type=kwargs.get('service_type') or 'metering',
             attr=attr,
             filter_value=filter_value,
@@ -81,15 +81,17 @@
         # fails to override the version in the URL
         return urljoin(auth_url.rstrip('/'), api_version)
 
-    def _get_ksclient(self):
+    def get_ksclient(self, creds=None):
         """Get an endpoint and auth token from Keystone."""
-
-        ks_args = self.get_credentials()
         auth_version = CONF.keystone_authtoken.auth_version
         auth_url = CONF.keystone_authtoken.auth_uri
-        api_version = self._is_apiv3(auth_url, auth_version)
+        api_v3 = self._is_apiv3(auth_url, auth_version)
+        if creds is None:
+            ks_args = self._get_credentials(api_v3)
+        else:
+            ks_args = creds
 
-        if api_version:
+        if api_v3:
             from keystoneclient.v3 import client
         else:
             from keystoneclient.v2_0 import client
@@ -99,17 +101,29 @@
 
         return client.Client(**ks_args)
 
-    def get_credentials(self):
-        creds = \
-            {'auth_url': CONF.keystone_authtoken.auth_uri,
-             'username': CONF.keystone_authtoken.admin_user,
-             'password': CONF.keystone_authtoken.admin_password,
-             'project_name': CONF.keystone_authtoken.admin_tenant_name,
-             'user_domain_name': "default",
-             'project_domain_name': "default"}
+    def _get_credentials(self, api_v3):
+        if api_v3:
+            creds = \
+                {'auth_url': CONF.keystone_authtoken.auth_uri,
+                 'username': CONF.keystone_authtoken.admin_user,
+                 'password': CONF.keystone_authtoken.admin_password,
+                 'project_name': CONF.keystone_authtoken.admin_tenant_name,
+                 'user_domain_name': "default",
+                 'project_domain_name': "default"}
+        else:
+            creds = \
+                {'auth_url': CONF.keystone_authtoken.auth_uri,
+                 'username': CONF.keystone_authtoken.admin_user,
+                 'password': CONF.keystone_authtoken.admin_password,
+                 'tenant_name': CONF.keystone_authtoken.admin_tenant_name}
         LOG.debug(creds)
         return creds
 
+    def get_credentials(self):
+        api_v3 = self._is_apiv3(CONF.keystone_authtoken.auth_uri,
+                                CONF.keystone_authtoken.auth_version)
+        return self._get_credentials(api_v3)
+
     def get_session(self):
         creds = self.get_credentials()
         self._auth = generic.Password(**creds)
diff --git a/watcher/common/nova.py b/watcher/common/nova.py
index b5b2c76df..636f00223 100644
--- a/watcher/common/nova.py
+++ b/watcher/common/nova.py
@@ -25,11 +25,12 @@
 from oslo_log import log
 
 import cinderclient.exceptions as ciexceptions
 import cinderclient.v2.client as ciclient
 import glanceclient.v2.client as glclient
-import keystoneclient.v3.client as ksclient
 import neutronclient.neutron.client as netclient
 import novaclient.client as nvclient
 import novaclient.exceptions as nvexceptions
 
+from watcher.common import keystone
+
 LOG = log.getLogger(__name__)
@@ -43,7 +44,7 @@ class NovaClient(object):
         self.cinder = None
         self.nova = nvclient.Client(self.NOVA_CLIENT_API_VERSION,
                                     session=session)
-        self.keystone = ksclient.Client(**creds)
+        self.keystone = keystone.KeystoneClient().get_ksclient(creds)
         self.glance = None
 
     def get_hypervisors_list(self):
diff --git a/watcher/decision_engine/strategy/strategies/outlet_temp_control.py b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py
new file mode 100644
index 000000000..059d7896a
--- /dev/null
+++ b/watcher/decision_engine/strategy/strategies/outlet_temp_control.py
@@ -0,0 +1,251 @@
+# -*- encoding: utf-8 -*-
+# Copyright (c) 2015 Intel Corp
+#
+# Authors: Junjie-Huang
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from oslo_log import log
+
+from watcher._i18n import _LE
+from watcher.common import exception as wexc
+from watcher.decision_engine.actions.migration import Migrate
+from watcher.decision_engine.actions.migration import MigrationType
+from watcher.decision_engine.model.resource import ResourceType
+from watcher.decision_engine.model.vm_state import VMState
+from watcher.decision_engine.strategy.common.level import StrategyLevel
+from watcher.decision_engine.strategy.strategies.base import BaseStrategy
+from watcher.metrics_engine.cluster_history.ceilometer import \
+    CeilometerClusterHistory
+
+LOG = log.getLogger(__name__)
+
+
+class OutletTempControl(BaseStrategy):
+
+    DEFAULT_NAME = "outlet_temp_control"
+    DEFAULT_DESCRIPTION = "outlet temperature based migration strategy"
+    # The meter used to report the outlet temperature in Ceilometer
+    METER_NAME = "hardware.ipmi.node.outlet_temperature"
+    # Unit: degrees Celsius
+    THRESHOLD = 35.0
+
+    def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION):
+        """[PoC] Outlet temperature control using live migration
+
+        It is a migration strategy based on the outlet temperature of
+        physical servers. It generates solutions that move a workload
+        whenever a server's outlet temperature is higher than the specified
+        threshold. As of now, we cannot forecast how many instances should
+        be migrated, so we simply plan a single virtual machine migration
+        per run. It is therefore better to use this algorithm with
+        CONTINUOUS audits.
+
+        Requirements:
+        * Hardware: compute nodes must support IPMI and the PTAS technology.
+        * Software: the Ceilometer component ceilometer-agent-ipmi must run
+          on each compute node, and the Ceilometer API must be able to
+          report the "hardware.ipmi.node.outlet_temperature" meter for
+          those nodes.
+        * You must have at least 2 physical compute nodes to run this strategy.
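+
+        Algorithm outline (see execute() below):
+        1. Group hosts into those whose outlet temperature is at or above
+           the threshold (hosts to release) and the remaining target hosts.
+        2. Pick the first active VM on the hottest host to release.
+        3. Live-migrate it to the coolest target host that still has enough
+           free vCPUs, memory and disk for that VM.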
+
+        Good Strategy:
+        As infrastructure becomes software defined, power and thermal
+        intelligence can be used to optimize workload placement, which helps
+        improve efficiency, reduce power consumption, improve datacenter PUE
+        and lower operational cost.
+        Outlet (exhaust air) temperature is one of the important thermal
+        telemetries for measuring the thermal/workload status of a server.
+
+        :param name: the name of the strategy
+        :param description: a description of the strategy
+        """
+        super(OutletTempControl, self).__init__(name, description)
+        # the migration plan will be triggered when the outlet temperature
+        # reaches the threshold
+        # TODO(zhenzanz): Threshold should be configurable for each audit
+        self.threshold = self.THRESHOLD
+        self._meter = self.METER_NAME
+        self._ceilometer = None
+
+    @property
+    def ceilometer(self):
+        if self._ceilometer is None:
+            self._ceilometer = CeilometerClusterHistory()
+        return self._ceilometer
+
+    @ceilometer.setter
+    def ceilometer(self, c):
+        self._ceilometer = c
+
+    def calc_used_res(self, model, hypervisor, cap_cores, cap_mem, cap_disk):
+        '''calculate the used vcpus, memory and disk based on VM flavors'''
+        vms = model.get_mapping().get_node_vms(hypervisor)
+        vcpus_used = 0
+        memory_mb_used = 0
+        disk_gb_used = 0
+        if len(vms) > 0:
+            for vm_id in vms:
+                vm = model.get_vm_from_id(vm_id)
+                vcpus_used += cap_cores.get_capacity(vm)
+                memory_mb_used += cap_mem.get_capacity(vm)
+                disk_gb_used += cap_disk.get_capacity(vm)
+
+        return vcpus_used, memory_mb_used, disk_gb_used
+
+    def group_hosts_by_outlet_temp(self, model):
+        '''Group hosts based on outlet temp meters'''
+
+        hypervisors = model.get_all_hypervisors()
+        size_cluster = len(hypervisors)
+        if size_cluster == 0:
+            raise wexc.ClusterEmpty()
+
+        hosts_need_release = []
+        hosts_target = []
+        for hypervisor_id in hypervisors:
+            hypervisor = model.get_hypervisor_from_id(hypervisor_id)
+            resource_id = hypervisor.uuid
+
+            outlet_temp = self.ceilometer.statistic_aggregation(
+                resource_id=resource_id,
+                meter_name=self._meter,
+                period="30",
+                aggregate='avg')
+            # some hosts may not have outlet temp meters, remove from target
+            if outlet_temp is None:
+                LOG.warning(_LE("%s: no outlet temp data"), resource_id)
+                continue
+
+            LOG.debug("%s: outlet temperature %f" % (resource_id, outlet_temp))
+            hvmap = {'hv': hypervisor, 'outlet_temp': outlet_temp}
+            if outlet_temp >= self.threshold:
+                # mark the hypervisor to release resources
+                hosts_need_release.append(hvmap)
+            else:
+                hosts_target.append(hvmap)
+        return hosts_need_release, hosts_target
+
+    def choose_vm_to_migrate(self, model, hosts):
+        '''pick up an active vm instance to migrate from provided hosts'''
+
+        for hvmap in hosts:
+            mig_src_hypervisor = hvmap['hv']
+            vms_of_src = model.get_mapping().get_node_vms(mig_src_hypervisor)
+            if len(vms_of_src) > 0:
+                for vm_id in vms_of_src:
+                    try:
+                        # select the first active VM to migrate
+                        vm = model.get_vm_from_id(vm_id)
+                        if vm.state != VMState.ACTIVE.value:
+                            LOG.info(_LE("VM not active, skipped: %s"),
+                                     vm.uuid)
+                            continue
+                        return (mig_src_hypervisor, vm)
+                    except wexc.VMNotFound:
+                        LOG.info(_LE("VM not found, skipped: %s"), vm_id)
+
+        return None
+
+    def filter_dest_servers(self, model, hosts, vm_to_migrate):
+        '''Only return hosts with sufficient available resources'''
+
+        cap_cores = model.get_resource_from_id(ResourceType.cpu_cores)
+        cap_disk = model.get_resource_from_id(ResourceType.disk)
+        cap_mem = model.get_resource_from_id(ResourceType.memory)
+
+        required_cores = cap_cores.get_capacity(vm_to_migrate)
+        required_disk = cap_disk.get_capacity(vm_to_migrate)
+        required_mem = cap_mem.get_capacity(vm_to_migrate)
+
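+        # A destination must cover all three requirements on top of what its
+        # current VMs already consume, as computed by calc_used_res() below.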
+        # filter out hypervisors without enough resources
+        dest_servers = []
+        for hvmap in hosts:
+            host = hvmap['hv']
+            # free resources on this candidate host
+            cores_used, mem_used, disk_used = self.calc_used_res(model,
+                                                                 host,
+                                                                 cap_cores,
+                                                                 cap_mem,
+                                                                 cap_disk)
+            cores_available = cap_cores.get_capacity(host) - cores_used
+            disk_available = cap_disk.get_capacity(host) - disk_used
+            mem_available = cap_mem.get_capacity(host) - mem_used
+            if cores_available >= required_cores and \
+                    disk_available >= required_disk and \
+                    mem_available >= required_mem:
+                dest_servers.append(hvmap)
+
+        return dest_servers
+
+    def execute(self, origin_model):
+        LOG.debug("Initializing Outlet temperature strategy")
+
+        if origin_model is None:
+            raise wexc.ClusterStateNotDefined()
+
+        current_model = origin_model
+        hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp(
+            current_model)
+
+        if len(hosts_need_release) == 0:
+            # TODO(zhenzanz): return something meaningful if there are no
+            # hot servers
+            LOG.debug("No hosts require optimization")
+            return self.solution
+
+        if len(hosts_target) == 0:
+            LOG.warning(_LE("No hosts under outlet temp threshold found"))
+            return self.solution
+
+        # choose the server with the highest outlet temperature
+        hosts_need_release = sorted(hosts_need_release,
+                                    reverse=True,
+                                    key=lambda x: (x["outlet_temp"]))
+
+        vm_to_migrate = self.choose_vm_to_migrate(current_model,
+                                                  hosts_need_release)
+        if vm_to_migrate is None:
+            return self.solution
+
+        mig_src_hypervisor, vm_src = vm_to_migrate
+        # filter destination hosts against the vm's cpu, memory and disk needs
+        dest_servers = self.filter_dest_servers(current_model,
+                                                hosts_target,
+                                                vm_src)
+        if len(dest_servers) == 0:
+            # TODO(zhenzanz): maybe warn that there is no suitable host
+            # for the instance
+            LOG.info(_LE("No proper target host could be found"))
+            return self.solution
+
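+        # hosts_need_release was sorted hottest first, so vm_src comes from
+        # the hottest host; the coolest destination is picked below and only
+        # one live migration is planned per strategy run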
+        # sort the filtered result by outlet temp
+        # pick the lowest one as the destination server
+        dest_servers = sorted(dest_servers,
+                              reverse=False,
+                              key=lambda x: (x["outlet_temp"]))
+        # always use the host with the lowest outlet temperature
+        mig_dst_hypervisor = dest_servers[0]['hv']
+        # generate a solution to migrate the vm to the dest server
+        if current_model.get_mapping().migrate_vm(vm_src,
+                                                  mig_src_hypervisor,
+                                                  mig_dst_hypervisor):
+            live_migrate = Migrate(vm_src,
+                                   mig_src_hypervisor,
+                                   mig_dst_hypervisor)
+            live_migrate.migration_type = MigrationType.pre_copy
+            live_migrate.level = StrategyLevel.conservative
+            self.solution.add_change_request(live_migrate)
+
+        self.solution.model = current_model
+
+        return self.solution
diff --git a/watcher/tests/common/test_keystone.py b/watcher/tests/common/test_keystone.py
index e1a93c279..409d00a9b 100644
--- a/watcher/tests/common/test_keystone.py
+++ b/watcher/tests/common/test_keystone.py
@@ -46,13 +46,17 @@
 
         self.assertEqual(ep, expected_endpoint)
 
-    def test_get_session(self):
+    @mock.patch('watcher.common.keystone.KeystoneClient._is_apiv3')
+    def test_get_session(self, mock_apiv3):
+        mock_apiv3.return_value = True
         k = KeystoneClient()
         session = k.get_session()
         self.assertIsInstance(session.auth, Password)
         self.assertIsInstance(session, Session)
 
-    def test_get_credentials(self):
+    @mock.patch('watcher.common.keystone.KeystoneClient._is_apiv3')
+    def test_get_credentials(self, mock_apiv3):
+        mock_apiv3.return_value = True
         expected_creds = {'auth_url': None,
                           'password': None,
                           'project_domain_name': 'default',
diff --git a/watcher/tests/common/test_nova_client.py b/watcher/tests/common/test_nova_client.py
index d6e53a404..fe22263a5 100644
--- a/watcher/tests/common/test_nova_client.py
+++ b/watcher/tests/common/test_nova_client.py
@@ -20,10 +20,10 @@
 import time
 
 import glanceclient.v2.client as glclient
-import keystoneclient.v3.client as ksclient
 import mock
 import novaclient.client as nvclient
 
+from watcher.common import keystone
 from watcher.common.nova import NovaClient
 from watcher.common import utils
 from watcher.tests import base
@@ -40,7 +40,7 @@ class TestNovaClient(base.TestCase):
         self.creds = mock.MagicMock()
         self.session = mock.MagicMock()
 
-    @mock.patch.object(ksclient, "Client", mock.Mock())
+    @mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
     @mock.patch.object(nvclient, "Client", mock.Mock())
     def test_stop_instance(self):
         nova_client = NovaClient(creds=self.creds, session=self.session)
@@ -55,7 +55,7 @@
         result = nova_client.stop_instance(instance_id)
         self.assertEqual(result, True)
 
-    @mock.patch.object(ksclient, "Client", mock.Mock())
+    @mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
     @mock.patch.object(nvclient, "Client", mock.Mock())
     def test_set_host_offline(self):
         nova_client = NovaClient(creds=self.creds, session=self.session)
         result = nova_client.set_host_offline("rennes")
@@ -66,7 +66,7 @@
         self.assertEqual(result, True)
 
     @mock.patch.object(time, 'sleep', mock.Mock())
-    @mock.patch.object(ksclient, "Client", mock.Mock())
+    @mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
     @mock.patch.object(nvclient, "Client", mock.Mock())
     def test_live_migrate_instance(self):
         nova_client = NovaClient(creds=self.creds, session=self.session)
         instance_id = utils.generate_uuid()
         instance = nova_client.live_migrate_instance(
             instance_id=instance_id, dest_hostname="nova"
@@ -79,7 +79,7 @@
         )
         self.assertIsNotNone(instance)
 
-    @mock.patch.object(ksclient, "Client", mock.Mock())
+    @mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
     @mock.patch.object(nvclient, "Client", mock.Mock())
     def test_watcher_non_live_migrate_instance_not_found(self):
         nova_client = NovaClient(creds=self.creds, session=self.session)
@@ -93,7 +93,7 @@
         self.assertEqual(is_success, False)
 
     @mock.patch.object(time, 'sleep', mock.Mock())
-    @mock.patch.object(ksclient, "Client", mock.Mock())
+    @mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
     @mock.patch.object(nvclient, "Client", mock.Mock())
     def test_watcher_non_live_migrate_instance_volume(self):
         nova_client = NovaClient(creds=self.creds, session=self.session)
@@ -107,7 +107,7 @@
         self.assertIsNotNone(instance)
 
     @mock.patch.object(time, 'sleep', mock.Mock())
-    @mock.patch.object(ksclient, "Client", mock.Mock())
+    @mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
     @mock.patch.object(nvclient, "Client", mock.Mock())
     def test_watcher_non_live_migrate_keep_image(self):
         nova_client = NovaClient(creds=self.creds, session=self.session)
@@ -130,7 +130,7 @@
         self.assertIsNotNone(instance)
 
     @mock.patch.object(time, 'sleep', mock.Mock())
-    @mock.patch.object(ksclient, "Client", mock.Mock())
+    @mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
     @mock.patch.object(nvclient, "Client", mock.Mock())
     @mock.patch.object(glclient, "Client")
     def test_create_image_from_instance(self, m_glance_cls):
diff --git a/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py b/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py
index 6323cbf41..331400592 100644
--- a/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py
+++ b/watcher/tests/decision_engine/strategy/strategies/faker_metrics_collector.py
@@ -34,8 +34,21 @@ class FakerMetricsCollector(object):
             result = self.get_usage_node_cpu(resource_id)
         elif meter_name == "cpu_util":
             result = self.get_average_usage_vm_cpu(resource_id)
+        elif meter_name == "hardware.ipmi.node.outlet_temperature":
+            result = self.get_average_outlet_temperature(resource_id)
         return result
 
+    def get_average_outlet_temperature(self, uuid):
+        """The average outlet temperature for host"""
+        mock = {}
+        mock['Node_0'] = 30
+        # use a big value to make sure it exceeds threshold
+        mock['Node_1'] = 100
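+        # only Node_0 stays below the strategy's default 35.0 threshold;
+        # any host not listed here is also treated as hot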
+        if uuid not in mock.keys():
+            mock[uuid] = 100
+
+        return mock[str(uuid)]
+
     def get_usage_node_cpu(self, uuid):
         """The last VM CPU usage values to average
diff --git a/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py
new file mode 100644
index 000000000..775bdb580
--- /dev/null
+++ b/watcher/tests/decision_engine/strategy/strategies/test_outlet_temp_control.py
@@ -0,0 +1,129 @@
+# -*- encoding: utf-8 -*-
+# Copyright (c) 2015 Intel Corp
+#
+# Authors: Zhenzan Zhou
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from collections import Counter
+
+from mock import MagicMock
+
+from watcher.common import exception
+
+from watcher.decision_engine.actions.migration import Migrate
+from watcher.decision_engine.model.model_root import ModelRoot
+from watcher.decision_engine.model.resource import ResourceType
+from watcher.decision_engine.strategy.strategies.outlet_temp_control import \
+    OutletTempControl
+from watcher.tests import base
+from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state \
+    import FakerModelCollector
+from watcher.tests.decision_engine.strategy.strategies.faker_metrics_collector\
+    import FakerMetricsCollector
+
+
+class TestOutletTempControl(base.BaseTestCase):
+    # fake metrics
+    fake_metrics = FakerMetricsCollector()
+
+    # fake cluster
+    fake_cluster = FakerModelCollector()
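+
+    # The faked meter reports Node_0 at 30 and Node_1 at 100; with the
+    # default 35.0 threshold Node_1 is the host to release and Node_0 is
+    # the expected migration target in the scenarios below.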
+
+    def test_calc_used_res(self):
+        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
+        strategy = OutletTempControl()
+        hypervisor = model.get_hypervisor_from_id('Node_0')
+        cap_cores = model.get_resource_from_id(ResourceType.cpu_cores)
+        cap_mem = model.get_resource_from_id(ResourceType.memory)
+        cap_disk = model.get_resource_from_id(ResourceType.disk)
+        cores_used, mem_used, disk_used = strategy.calc_used_res(model,
+                                                                 hypervisor,
+                                                                 cap_cores,
+                                                                 cap_mem,
+                                                                 cap_disk)
+
+        self.assertEqual((cores_used, mem_used, disk_used), (10, 2, 20))
+
+    def test_group_hosts_by_outlet_temp(self):
+        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
+        strategy = OutletTempControl()
+        strategy.ceilometer = MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics)
+        h1, h2 = strategy.group_hosts_by_outlet_temp(model)
+        self.assertEqual(h1[0]['hv'].uuid, 'Node_1')
+        self.assertEqual(h2[0]['hv'].uuid, 'Node_0')
+
+    def test_choose_vm_to_migrate(self):
+        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
+        strategy = OutletTempControl()
+        strategy.ceilometer = MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics)
+        h1, h2 = strategy.group_hosts_by_outlet_temp(model)
+        vm_to_mig = strategy.choose_vm_to_migrate(model, h1)
+        self.assertEqual(vm_to_mig[0].uuid, 'Node_1')
+        self.assertEqual(vm_to_mig[1].uuid, 'VM_1')
+
+    def test_filter_dest_servers(self):
+        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
+        strategy = OutletTempControl()
+        strategy.ceilometer = MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics)
+        h1, h2 = strategy.group_hosts_by_outlet_temp(model)
+        vm_to_mig = strategy.choose_vm_to_migrate(model, h1)
+        dest_hosts = strategy.filter_dest_servers(model, h2, vm_to_mig[1])
+        self.assertEqual(len(dest_hosts), 1)
+        self.assertEqual(dest_hosts[0]['hv'].uuid, 'Node_0')
+
+    def test_exception_model(self):
+        strategy = OutletTempControl()
+        self.assertRaises(exception.ClusterStateNotDefined, strategy.execute,
+                          None)
+
+    def test_exception_cluster_empty(self):
+        strategy = OutletTempControl()
+        model = ModelRoot()
+        self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
+
+    def test_execute_cluster_empty(self):
+        current_state_cluster = FakerModelCollector()
+        strategy = OutletTempControl()
+        strategy.ceilometer = MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics)
+        model = current_state_cluster.generate_random(0, 0)
+        self.assertRaises(exception.ClusterEmpty, strategy.execute, model)
+
+    def test_execute_no_workload(self):
+        strategy = OutletTempControl()
+        strategy.ceilometer = MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics)
+
+        current_state_cluster = FakerModelCollector()
+        model = current_state_cluster. \
+            generate_scenario_4_with_1_hypervisor_no_vm()
+
+        solution = strategy.execute(model)
+        self.assertEqual(solution.actions, [])
+
+    def test_execute(self):
+        strategy = OutletTempControl()
+        strategy.ceilometer = MagicMock(
+            statistic_aggregation=self.fake_metrics.mock_get_statistics)
+        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
+        solution = strategy.execute(model)
+        actions_counter = Counter(
+            [type(action) for action in solution.actions])
+
+        num_migrations = actions_counter.get(Migrate, 0)
+        self.assertEqual(num_migrations, 1)