Strategy requirements

This patch set adds /state resource to strategy API
which allows to retrieve strategy requirements.

Partially-Implements: blueprint check-strategy-requirements
Change-Id: I177b443648301eb50da0da63271ecbfd9408bd4f
This commit is contained in:
Alexander Chadin 2017-10-02 17:32:02 +03:00
parent 5ec8932182
commit 0c66fe2e65
26 changed files with 360 additions and 9 deletions

View File

@ -0,0 +1,6 @@
---
features:
- Added a way to check state of strategy before audit's execution.
Administrator can use "watcher strategy state <strategy_name>" command
to get information about metrics' availability, datasource's availability
and CDM's availability.

View File

@ -41,6 +41,7 @@ from watcher.api.controllers.v1 import utils as api_utils
from watcher.common import exception
from watcher.common import policy
from watcher.common import utils as common_utils
from watcher.decision_engine import rpcapi
from watcher import objects
@ -205,6 +206,7 @@ class StrategiesController(rest.RestController):
_custom_actions = {
'detail': ['GET'],
'state': ['GET'],
}
def _get_strategies_collection(self, filters, marker, limit, sort_key,
@ -288,6 +290,26 @@ class StrategiesController(rest.RestController):
return self._get_strategies_collection(
filters, marker, limit, sort_key, sort_dir, expand, resource_url)
@wsme_pecan.wsexpose(wtypes.text, wtypes.text)
def state(self, strategy):
"""Retrieve a inforamation about strategy requirements.
:param strategy: name of the strategy.
"""
context = pecan.request.context
policy.enforce(context, 'strategy:state', action='strategy:state')
parents = pecan.request.path.split('/')[:-1]
if parents[-2] != "strategies":
raise exception.HTTPNotFound
rpc_strategy = api_utils.get_resource('Strategy', strategy)
de_client = rpcapi.DecisionEngineAPI()
strategy_state = de_client.get_strategy_info(context,
rpc_strategy.name)
strategy_state.extend([{
'type': 'Name', 'state': rpc_strategy.name,
'mandatory': '', 'comment': ''}])
return strategy_state
@wsme_pecan.wsexpose(Strategy, wtypes.text)
def get_one(self, strategy):
"""Retrieve information about the given strategy.

View File

@ -409,6 +409,10 @@ class UnsupportedDataSource(UnsupportedError):
"by strategy %(strategy)s")
class DataSourceNotAvailable(WatcherException):
msg_fmt = _("Datasource %(datasource)s is not available.")
class NoSuchMetricForHost(WatcherException):
msg_fmt = _("No %(metric)s metric for %(host)s found.")

View File

@ -49,6 +49,17 @@ rules = [
'method': 'GET'
}
]
),
policy.DocumentedRuleDefault(
name=STRATEGY % 'state',
check_str=base.RULE_ADMIN_API,
description='Get state of strategy.',
operations=[
{
'path': '/v1/strategies{strategy_uuid}/state',
'method': 'GET'
}
]
)
]

View File

@ -57,6 +57,14 @@ class DataSourceBase(object):
),
)
@abc.abstractmethod
def list_metrics(self):
pass
@abc.abstractmethod
def check_availability(self):
pass
@abc.abstractmethod
def get_host_cpu_usage(self, resource_id, period, aggregate,
granularity=None):

View File

@ -115,6 +115,13 @@ class CeilometerHelper(base.DataSourceBase):
except Exception:
raise
def check_availability(self):
try:
self.query_retry(self.ceilometer.resources.list)
except Exception:
return 'not available'
return 'available'
def query_sample(self, meter_name, query, limit=1):
return self.query_retry(f=self.ceilometer.samples.list,
meter_name=meter_name,
@ -129,11 +136,14 @@ class CeilometerHelper(base.DataSourceBase):
period=period)
return statistics
def meter_list(self, query=None):
def list_metrics(self):
"""List the user's meters."""
meters = self.query_retry(f=self.ceilometer.meters.list,
query=query)
return meters
try:
meters = self.query_retry(f=self.ceilometer.meters.list)
except Exception:
return set()
else:
return meters
def statistic_aggregation(self,
resource_id,

View File

@ -49,7 +49,14 @@ class GnocchiHelper(base.DataSourceBase):
except Exception as e:
LOG.exception(e)
time.sleep(CONF.gnocchi_client.query_timeout)
raise
raise exception.DataSourceNotAvailable(datasource='gnocchi')
def check_availability(self):
try:
self.query_retry(self.gnocchi.status.get)
except Exception:
return 'not available'
return 'available'
def _statistic_aggregation(self,
resource_id,
@ -108,8 +115,17 @@ class GnocchiHelper(base.DataSourceBase):
# measure has structure [time, granularity, value]
return statistics[-1][2]
def statistic_aggregation(self, resource_id, metric, period, aggregation,
granularity=300):
def list_metrics(self):
"""List the user's meters."""
try:
response = self.query_retry(f=self.gnocchi.metric.list)
except Exception:
return set()
else:
return set([metric['name'] for metric in response])
def statistic_aggregation(self, resource_id, metric, period, granularity,
aggregation='mean'):
stop_time = datetime.utcnow()
start_time = stop_time - timedelta(seconds=(int(period)))
return self._statistic_aggregation(

View File

@ -65,6 +65,18 @@ class MonascaHelper(base.DataSourceBase):
return start_timestamp, end_timestamp, period
def check_availability(self):
try:
self.query_retry(self.monasca.metrics.list)
except Exception:
return 'not available'
return 'available'
def list_metrics(self):
# TODO(alexchadin): this method should be implemented in accordance to
# monasca API.
pass
def statistics_list(self, meter_name, dimensions, start_time=None,
end_time=None, period=None,):
"""List of statistics."""

View File

@ -40,6 +40,8 @@ See :doc:`../architecture` for more details on this component.
from watcher.common import service_manager
from watcher.decision_engine.messaging import audit_endpoint
from watcher.decision_engine.model.collector import manager
from watcher.decision_engine.strategy.strategies import base \
as strategy_endpoint
from watcher import conf
@ -70,7 +72,8 @@ class DecisionEngineManager(service_manager.ServiceManager):
@property
def conductor_endpoints(self):
return [audit_endpoint.AuditEndpoint]
return [audit_endpoint.AuditEndpoint,
strategy_endpoint.StrategyEndpoint]
@property
def notification_endpoints(self):

View File

@ -40,6 +40,10 @@ class DecisionEngineAPI(service.Service):
self.conductor_client.cast(
context, 'trigger_audit', audit_uuid=audit_uuid)
def get_strategy_info(self, context, strategy_name):
return self.conductor_client.call(
context, 'get_strategy_info', strategy_name=strategy_name)
class DecisionEngineAPIManager(service_manager.ServiceManager):

View File

@ -157,6 +157,9 @@ class ComputeScope(base.BaseScope):
compute_scope = []
model_hosts = list(cluster_model.get_all_compute_nodes().keys())
if not self.scope:
return cluster_model
for scope in self.scope:
compute_scope = scope.get('compute')

View File

@ -53,6 +53,69 @@ from watcher.decision_engine.solution import default
from watcher.decision_engine.strategy.common import level
class StrategyEndpoint(object):
def __init__(self, messaging):
self._messaging = messaging
def _collect_metrics(self, strategy, datasource):
metrics = []
if not datasource:
return {'type': 'Metrics', 'state': metrics,
'mandatory': False, 'comment': ''}
else:
ds_metrics = datasource.list_metrics()
if ds_metrics is None:
raise exception.DataSourceNotAvailable(
datasource=strategy.config.datasource)
else:
for metric in strategy.DATASOURCE_METRICS:
original_metric_name = datasource.METRIC_MAP.get(metric)
if original_metric_name in ds_metrics:
metrics.append({original_metric_name: 'available'})
else:
metrics.append({original_metric_name: 'not available'})
return {'type': 'Metrics', 'state': metrics,
'mandatory': False, 'comment': ''}
def _get_datasource_status(self, strategy, datasource):
if not datasource:
state = "Datasource is not presented for this strategy"
else:
state = "%s: %s" % (strategy.config.datasource,
datasource.check_availability())
return {'type': 'Datasource',
'state': state,
'mandatory': True, 'comment': ''}
def _get_cdm(self, strategy):
models = []
for model in ['compute_model', 'storage_model']:
try:
getattr(strategy, model)
except Exception:
models.append({model: 'not available'})
else:
models.append({model: 'available'})
return {'type': 'CDM', 'state': models,
'mandatory': True, 'comment': ''}
def get_strategy_info(self, context, strategy_name):
strategy = loading.DefaultStrategyLoader().load(strategy_name)
try:
is_datasources = getattr(strategy.config, 'datasources', None)
if is_datasources:
datasource = is_datasources[0]
else:
datasource = getattr(strategy, strategy.config.datasource)
except (AttributeError, IndexError):
datasource = []
available_datasource = self._get_datasource_status(strategy,
datasource)
available_metrics = self._collect_metrics(strategy, datasource)
available_cdm = self._get_cdm(strategy)
return [available_datasource, available_metrics, available_cdm]
@six.add_metaclass(abc.ABCMeta)
class BaseStrategy(loadable.Loadable):
"""A base class for all the strategies

View File

@ -52,6 +52,8 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
HOST_CPU_USAGE_METRIC_NAME = 'compute.node.cpu.percent'
INSTANCE_CPU_USAGE_METRIC_NAME = 'cpu_util'
DATASOURCE_METRICS = ['host_cpu_usage', 'instance_cpu_usage']
METRIC_NAMES = dict(
ceilometer=dict(
host_cpu_usage='compute.node.cpu.percent',

View File

@ -30,6 +30,9 @@ CONF = cfg.CONF
class NoisyNeighbor(base.NoisyNeighborBaseStrategy):
MIGRATION = "migrate"
DATASOURCE_METRICS = ['instance_l3_cache_usage']
# The meter to report L3 cache in ceilometer
METER_NAME_L3 = "cpu_l3_cache"
DEFAULT_WATCHER_PRIORITY = 5

View File

@ -77,6 +77,8 @@ class OutletTempControl(base.ThermalOptimizationBaseStrategy):
# The meter to report outlet temperature in ceilometer
MIGRATION = "migrate"
DATASOURCE_METRICS = ['host_outlet_temp']
METRIC_NAMES = dict(
ceilometer=dict(
host_outlet_temp='hardware.ipmi.node.outlet_temperature'),

View File

@ -86,6 +86,8 @@ class UniformAirflow(base.BaseStrategy):
# choose 300 seconds as the default duration of meter aggregation
PERIOD = 300
DATASOURCE_METRICS = ['host_airflow', 'host_inlet_temp', 'host_power']
METRIC_NAMES = dict(
ceilometer=dict(
# The meter to report Airflow of physical server in ceilometer

View File

@ -74,6 +74,9 @@ class VMWorkloadConsolidation(base.ServerConsolidationBaseStrategy):
HOST_CPU_USAGE_METRIC_NAME = 'compute.node.cpu.percent'
INSTANCE_CPU_USAGE_METRIC_NAME = 'cpu_util'
DATASOURCE_METRICS = ['instance_ram_allocated', 'instance_cpu_usage',
'instance_ram_usage', 'instance_root_disk_size']
METRIC_NAMES = dict(
ceilometer=dict(
cpu_util_metric='cpu_util',

View File

@ -95,6 +95,8 @@ class WorkloadBalance(base.WorkloadStabilizationBaseStrategy):
# Unit: MB
MEM_METER_NAME = "memory.resident"
DATASOURCE_METRICS = ['instance_cpu_usage', 'instance_ram_usage']
MIGRATION = "migrate"
def __init__(self, config, osc=None):

View File

@ -62,6 +62,9 @@ class WorkloadStabilization(base.WorkloadStabilizationBaseStrategy):
MIGRATION = "migrate"
MEMOIZE = _set_memoize(CONF)
DATASOURCE_METRICS = ['host_cpu_usage', 'instance_cpu_usage',
'instance_ram_usage', 'host_memory_usage']
def __init__(self, config, osc=None):
"""Workload Stabilization control using live migration

View File

@ -10,11 +10,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from oslo_serialization import jsonutils
from six.moves.urllib import parse as urlparse
from watcher.common import utils
from watcher.decision_engine import rpcapi as deapi
from watcher.tests.api import base as api_base
from watcher.tests.objects import utils as obj_utils
@ -31,6 +34,28 @@ class TestListStrategy(api_base.FunctionalTest):
for field in strategy_fields:
self.assertIn(field, strategy)
@mock.patch.object(deapi.DecisionEngineAPI, 'get_strategy_info')
def test_state(self, mock_strategy_info):
strategy = obj_utils.create_test_strategy(self.context)
mock_state = [
{"type": "Datasource", "mandatory": True, "comment": "",
"state": "gnocchi: True"},
{"type": "Metrics", "mandatory": False, "comment": "",
"state": [{"compute.node.cpu.percent": "available"},
{"cpu_util": "available"}]},
{"type": "CDM", "mandatory": True, "comment": "",
"state": [{"compute_model": "available"},
{"storage_model": "not available"}]},
{"type": "Name", "mandatory": "", "comment": "",
"state": strategy.name}
]
mock_strategy_info.return_value = mock_state
response = self.get_json('/strategies/%s/state' % strategy.uuid)
strategy_name = [requirement["state"] for requirement in response
if requirement["type"] == "Name"][0]
self.assertEqual(strategy.name, strategy_name)
def test_one(self):
strategy = obj_utils.create_test_strategy(self.context)
response = self.get_json('/strategies')
@ -234,6 +259,13 @@ class TestStrategyPolicyEnforcement(api_base.FunctionalTest):
'/strategies/detail',
expect_errors=True)
def test_policy_disallow_state(self):
strategy = obj_utils.create_test_strategy(self.context)
self._common_policy_check(
"strategy:get", self.get_json,
'/strategies/%s/state' % strategy.uuid,
expect_errors=True)
class TestStrategyEnforcementWithAdminContext(
TestListStrategy, api_base.AdminRoleTest):
@ -245,4 +277,5 @@ class TestStrategyEnforcementWithAdminContext(
"default": "rule:admin_api",
"strategy:detail": "rule:default",
"strategy:get": "rule:default",
"strategy:get_all": "rule:default"})
"strategy:get_all": "rule:default",
"strategy:state": "rule:default"})

View File

@ -198,3 +198,19 @@ class TestCeilometerHelper(base.BaseTestCase):
mock_aggregation.assert_called_once_with(
'compute1', helper.METRIC_MAP['host_power'], 600,
aggregate='mean')
def test_check_availability(self, mock_ceilometer):
ceilometer = mock.MagicMock()
ceilometer.resources.list.return_value = True
mock_ceilometer.return_value = ceilometer
helper = ceilometer_helper.CeilometerHelper()
result = helper.check_availability()
self.assertEqual('available', result)
def test_check_availability_with_failure(self, mock_ceilometer):
ceilometer = mock.MagicMock()
ceilometer.resources.list.side_effect = Exception()
mock_ceilometer.return_value = ceilometer
helper = ceilometer_helper.CeilometerHelper()
self.assertEqual('not available', helper.check_availability())

View File

@ -152,3 +152,40 @@ class TestGnocchiHelper(base.BaseTestCase):
mock_aggregation.assert_called_once_with(
'compute1', helper.METRIC_MAP['host_power'], 600, 300,
aggregation='mean')
def test_gnocchi_check_availability(self, mock_gnocchi):
gnocchi = mock.MagicMock()
gnocchi.status.get.return_value = True
mock_gnocchi.return_value = gnocchi
helper = gnocchi_helper.GnocchiHelper()
result = helper.check_availability()
self.assertEqual('available', result)
def test_gnocchi_check_availability_with_failure(self, mock_gnocchi):
cfg.CONF.set_override("query_max_retries", 1,
group='gnocchi_client')
gnocchi = mock.MagicMock()
gnocchi.status.get.side_effect = Exception()
mock_gnocchi.return_value = gnocchi
helper = gnocchi_helper.GnocchiHelper()
self.assertEqual('not available', helper.check_availability())
def test_gnocchi_list_metrics(self, mock_gnocchi):
gnocchi = mock.MagicMock()
metrics = [{"name": "metric1"}, {"name": "metric2"}]
expected_metrics = set(["metric1", "metric2"])
gnocchi.metric.list.return_value = metrics
mock_gnocchi.return_value = gnocchi
helper = gnocchi_helper.GnocchiHelper()
result = helper.list_metrics()
self.assertEqual(expected_metrics, result)
def test_gnocchi_list_metrics_with_failure(self, mock_gnocchi):
cfg.CONF.set_override("query_max_retries", 1,
group='gnocchi_client')
gnocchi = mock.MagicMock()
gnocchi.metric.list.side_effect = Exception()
mock_gnocchi.return_value = gnocchi
helper = gnocchi_helper.GnocchiHelper()
self.assertFalse(helper.list_metrics())

View File

@ -57,6 +57,21 @@ class TestMonascaHelper(base.BaseTestCase):
)
self.assertEqual(expected_result, result)
def test_check_availability(self, mock_monasca):
monasca = mock.MagicMock()
monasca.metrics.list.return_value = True
mock_monasca.return_value = monasca
helper = monasca_helper.MonascaHelper()
result = helper.check_availability()
self.assertEqual('available', result)
def test_check_availability_with_failure(self, mock_monasca):
monasca = mock.MagicMock()
monasca.metrics.list.side_effect = Exception()
mock_monasca.return_value = monasca
helper = monasca_helper.MonascaHelper()
self.assertEqual('not available', helper.check_availability())
def test_monasca_statistic_list(self, mock_monasca):
monasca = mock.MagicMock()
expected_result = [{

View File

@ -0,0 +1,64 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2017 Servionica
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from watcher.decision_engine.strategy.strategies import base as strategy_base
from watcher.tests import base
class TestStrategyEndpoint(base.BaseTestCase):
def test_collect_metrics(self):
datasource = mock.MagicMock()
datasource.list_metrics.return_value = ["m1", "m2"]
datasource.METRIC_MAP = {"metric1": "m1", "metric2": "m2",
"metric3": "m3"}
strategy = mock.MagicMock()
strategy.DATASOURCE_METRICS = ["metric1", "metric2", "metric3"]
strategy.config.datasource = "gnocchi"
se = strategy_base.StrategyEndpoint(mock.MagicMock())
result = se._collect_metrics(strategy, datasource)
expected_result = {'type': 'Metrics',
'state': [{"m1": "available"},
{"m2": "available"},
{"m3": "not available"}],
'mandatory': False, 'comment': ''}
self.assertEqual(expected_result, result)
def test_get_datasource_status(self):
strategy = mock.MagicMock()
datasource = mock.MagicMock()
strategy.config.datasource = "gnocchi"
datasource.check_availability.return_value = "available"
se = strategy_base.StrategyEndpoint(mock.MagicMock())
result = se._get_datasource_status(strategy, datasource)
expected_result = {'type': 'Datasource',
'state': "gnocchi: available",
'mandatory': True, 'comment': ''}
self.assertEqual(expected_result, result)
def test_get_cdm(self):
strategy = mock.MagicMock()
strategy.compute_model = mock.MagicMock()
del strategy.storage_model
se = strategy_base.StrategyEndpoint(mock.MagicMock())
result = se._get_cdm(strategy)
expected_result = {'type': 'CDM',
'state': [{"compute_model": "available"},
{"storage_model": "not available"}],
'mandatory': True, 'comment': ''}
self.assertEqual(expected_result, result)

View File

@ -46,3 +46,9 @@ class TestDecisionEngineAPI(base.TestCase):
self.api.trigger_audit(self.context, audit_uuid)
mock_cast.assert_called_once_with(
self.context, 'trigger_audit', audit_uuid=audit_uuid)
def test_get_strategy_info(self):
with mock.patch.object(om.RPCClient, 'call') as mock_call:
self.api.get_strategy_info(self.context, "dummy")
mock_call.assert_called_once_with(
self.context, 'get_strategy_info', strategy_name="dummy")

View File

@ -54,6 +54,7 @@ policy_data = """
"strategy:detail": "",
"strategy:get": "",
"strategy:get_all": "",
"strategy:state": "",
"service:detail": "",
"service:get": "",