From e7a1e148ca93f38c779be0c0d5149adb9a37e8b3 Mon Sep 17 00:00:00 2001 From: Alexander Chadin Date: Fri, 16 Sep 2016 18:36:49 +0300 Subject: [PATCH] Add service supervisor This patch set adds supervisor mechanism for Watcher services to get ability to track states. Partially-Implements: blueprint watcher-service-list Change-Id: Iab1cefb971c79ed27b22b6a5d1bed8698e35f9a4 --- etc/watcher/policy.json | 6 +- watcher/api/controllers/v1/__init__.py | 13 + watcher/api/controllers/v1/service.py | 263 +++++++++++++++ watcher/api/controllers/v1/utils.py | 16 +- watcher/applier/manager.py | 46 ++- watcher/applier/rpcapi.py | 43 ++- watcher/common/exception.py | 8 + watcher/common/service.py | 65 +++- watcher/common/service_manager.py | 56 ++++ watcher/db/api.py | 86 +++++ watcher/db/sqlalchemy/api.py | 62 ++++ watcher/db/sqlalchemy/models.py | 15 + watcher/decision_engine/manager.py | 50 ++- watcher/decision_engine/rpcapi.py | 43 ++- watcher/objects/__init__.py | 6 +- watcher/objects/base.py | 2 +- watcher/objects/service.py | 178 ++++++++++ watcher/objects/utils.py | 34 +- watcher/tests/api/test_root.py | 3 +- watcher/tests/api/v1/test_services.py | 173 ++++++++++ watcher/tests/common/test_service.py | 37 +++ watcher/tests/db/test_service.py | 303 ++++++++++++++++++ watcher/tests/db/utils.py | 31 +- .../model/notification/fake_managers.py | 1 + watcher/tests/fake_policy.py | 6 +- watcher/tests/objects/test_objects.py | 20 +- watcher/tests/objects/test_service.py | 105 ++++++ watcher/tests/objects/utils.py | 26 ++ .../services/infra_optim/v1/json/client.py | 21 ++ .../tests/api/admin/test_service.py | 73 +++++ 30 files changed, 1714 insertions(+), 77 deletions(-) create mode 100644 watcher/api/controllers/v1/service.py create mode 100644 watcher/common/service_manager.py create mode 100644 watcher/objects/service.py create mode 100644 watcher/tests/api/v1/test_services.py create mode 100644 watcher/tests/db/test_service.py create mode 100644 watcher/tests/objects/test_service.py create mode 100644 watcher_tempest_plugin/tests/api/admin/test_service.py diff --git a/etc/watcher/policy.json b/etc/watcher/policy.json index 49139c686..5f9493164 100644 --- a/etc/watcher/policy.json +++ b/etc/watcher/policy.json @@ -37,5 +37,9 @@ "strategy:detail": "rule:default", "strategy:get": "rule:default", - "strategy:get_all": "rule:default" + "strategy:get_all": "rule:default", + + "service:detail": "rule:default", + "service:get": "rule:default", + "service:get_all": "rule:default" } diff --git a/watcher/api/controllers/v1/__init__.py b/watcher/api/controllers/v1/__init__.py index 260ce9b02..16279551e 100644 --- a/watcher/api/controllers/v1/__init__.py +++ b/watcher/api/controllers/v1/__init__.py @@ -35,6 +35,7 @@ from watcher.api.controllers.v1 import audit from watcher.api.controllers.v1 import audit_template from watcher.api.controllers.v1 import goal from watcher.api.controllers.v1 import scoring_engine +from watcher.api.controllers.v1 import service from watcher.api.controllers.v1 import strategy @@ -105,6 +106,9 @@ class V1(APIBase): scoring_engines = [link.Link] """Links to the Scoring Engines resource""" + services = [link.Link] + """Links to the services resource""" + links = [link.Link] """Links that point to a specific URL for this version and documentation""" @@ -159,6 +163,14 @@ class V1(APIBase): 'scoring_engines', '', bookmark=True) ] + + v1.services = [link.Link.make_link( + 'self', pecan.request.host_url, 'services', ''), + link.Link.make_link('bookmark', + pecan.request.host_url, + 'services', '', + bookmark=True) + ] return v1 @@ -171,6 +183,7 @@ class Controller(rest.RestController): action_plans = action_plan.ActionPlansController() goals = goal.GoalsController() scoring_engines = scoring_engine.ScoringEngineController() + services = service.ServicesController() strategies = strategy.StrategiesController() @wsme_pecan.wsexpose(V1) diff --git a/watcher/api/controllers/v1/service.py b/watcher/api/controllers/v1/service.py new file mode 100644 index 000000000..9933704ee --- /dev/null +++ b/watcher/api/controllers/v1/service.py @@ -0,0 +1,263 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Servionica +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Service mechanism provides ability to monitor Watcher services state. +""" + +import datetime +import six + +from oslo_config import cfg +from oslo_log import log +from oslo_utils import timeutils +import pecan +from pecan import rest +import wsme +from wsme import types as wtypes +import wsmeext.pecan as wsme_pecan + +from watcher._i18n import _LW +from watcher.api.controllers import base +from watcher.api.controllers import link +from watcher.api.controllers.v1 import collection +from watcher.api.controllers.v1 import utils as api_utils +from watcher.common import exception +from watcher.common import policy +from watcher import objects + + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +class Service(base.APIBase): + """API representation of a service. + + This class enforces type checking and value constraints, and converts + between the internal object model and the API representation of a service. + """ + + _status = None + + def _get_status(self): + return self._status + + def _set_status(self, name): + service = objects.Service.get_by_name(pecan.request.context, name) + last_heartbeat = (service.last_seen_up or service.updated_at + or service.created_at) + if isinstance(last_heartbeat, six.string_types): + # NOTE(russellb) If this service came in over rpc via + # conductor, then the timestamp will be a string and needs to be + # converted back to a datetime. + last_heartbeat = timeutils.parse_strtime(last_heartbeat) + else: + # Objects have proper UTC timezones, but the timeutils comparison + # below does not (and will fail) + last_heartbeat = last_heartbeat.replace(tzinfo=None) + elapsed = timeutils.delta_seconds(last_heartbeat, timeutils.utcnow()) + is_up = abs(elapsed) <= CONF.service_down_time + if not is_up: + LOG.warning(_LW('Seems service %(name)s on host %(host)s is down. ' + 'Last heartbeat was %(lhb)s.' + 'Elapsed time is %(el)s'), + {'name': service.name, + 'host': service.host, + 'lhb': str(last_heartbeat), 'el': str(elapsed)}) + self._status = objects.service.ServiceStatus.FAILED + else: + self._status = objects.service.ServiceStatus.ACTIVE + + id = wsme.wsattr(int, readonly=True) + """ID for this service.""" + + name = wtypes.text + """Name of the service.""" + + host = wtypes.text + """Host where service is placed on.""" + + last_seen_up = wsme.wsattr(datetime.datetime, readonly=True) + """Time when Watcher service sent latest heartbeat.""" + + status = wsme.wsproperty(wtypes.text, _get_status, _set_status, + mandatory=True) + + links = wsme.wsattr([link.Link], readonly=True) + """A list containing a self link.""" + + def __init__(self, **kwargs): + super(Service, self).__init__() + + fields = list(objects.Service.fields.keys()) + ['status'] + self.fields = [] + for field in fields: + self.fields.append(field) + setattr(self, field, kwargs.get( + field if field != 'status' else 'name', wtypes.Unset)) + + @staticmethod + def _convert_with_links(service, url, expand=True): + if not expand: + service.unset_fields_except( + ['id', 'name', 'host', 'status']) + + service.links = [ + link.Link.make_link('self', url, 'services', str(service.id)), + link.Link.make_link('bookmark', url, 'services', str(service.id), + bookmark=True)] + return service + + @classmethod + def convert_with_links(cls, service, expand=True): + service = Service(**service.as_dict()) + return cls._convert_with_links( + service, pecan.request.host_url, expand) + + @classmethod + def sample(cls, expand=True): + sample = cls(id=1, + name='watcher-applier', + host='Controller', + last_seen_up=datetime.datetime(2016, 1, 1)) + return cls._convert_with_links(sample, 'http://localhost:9322', expand) + + +class ServiceCollection(collection.Collection): + """API representation of a collection of services.""" + + services = [Service] + """A list containing services objects""" + + def __init__(self, **kwargs): + super(ServiceCollection, self).__init__() + self._type = 'services' + + @staticmethod + def convert_with_links(services, limit, url=None, expand=False, + **kwargs): + service_collection = ServiceCollection() + service_collection.services = [ + Service.convert_with_links(g, expand) for g in services] + + if 'sort_key' in kwargs: + reverse = False + if kwargs['sort_key'] == 'service': + if 'sort_dir' in kwargs: + reverse = True if kwargs['sort_dir'] == 'desc' else False + service_collection.services = sorted( + service_collection.services, + key=lambda service: service.id, + reverse=reverse) + + service_collection.next = service_collection.get_next( + limit, url=url, marker_field='id', **kwargs) + return service_collection + + @classmethod + def sample(cls): + sample = cls() + sample.services = [Service.sample(expand=False)] + return sample + + +class ServicesController(rest.RestController): + """REST controller for Services.""" + def __init__(self): + super(ServicesController, self).__init__() + + from_services = False + """A flag to indicate if the requests to this controller are coming + from the top-level resource Services.""" + + _custom_actions = { + 'detail': ['GET'], + } + + def _get_services_collection(self, marker, limit, sort_key, sort_dir, + expand=False, resource_url=None): + limit = api_utils.validate_limit(limit) + api_utils.validate_sort_dir(sort_dir) + + sort_db_key = (sort_key if sort_key in objects.Service.fields.keys() + else None) + + marker_obj = None + if marker: + marker_obj = objects.Service.get( + pecan.request.context, marker) + + services = objects.Service.list( + pecan.request.context, limit, marker_obj, + sort_key=sort_db_key, sort_dir=sort_dir) + + return ServiceCollection.convert_with_links( + services, limit, url=resource_url, expand=expand, + sort_key=sort_key, sort_dir=sort_dir) + + @wsme_pecan.wsexpose(ServiceCollection, int, int, wtypes.text, wtypes.text) + def get_all(self, marker=None, limit=None, sort_key='id', sort_dir='asc'): + """Retrieve a list of services. + + :param marker: pagination marker for large data sets. + :param limit: maximum number of resources to return in a single result. + :param sort_key: column to sort results by. Default: id. + :param sort_dir: direction to sort. "asc" or "desc". Default: asc. + """ + context = pecan.request.context + policy.enforce(context, 'service:get_all', + action='service:get_all') + + return self._get_services_collection(marker, limit, sort_key, sort_dir) + + @wsme_pecan.wsexpose(ServiceCollection, int, int, wtypes.text, wtypes.text) + def detail(self, marker=None, limit=None, sort_key='id', sort_dir='asc'): + """Retrieve a list of services with detail. + + :param marker: pagination marker for large data sets. + :param limit: maximum number of resources to return in a single result. + :param sort_key: column to sort results by. Default: id. + :param sort_dir: direction to sort. "asc" or "desc". Default: asc. + """ + context = pecan.request.context + policy.enforce(context, 'service:detail', + action='service:detail') + # NOTE(lucasagomes): /detail should only work agaist collections + parent = pecan.request.path.split('/')[:-1][-1] + if parent != "services": + raise exception.HTTPNotFound + expand = True + resource_url = '/'.join(['services', 'detail']) + + return self._get_services_collection( + marker, limit, sort_key, sort_dir, expand, resource_url) + + @wsme_pecan.wsexpose(Service, wtypes.text) + def get_one(self, service): + """Retrieve information about the given service. + + :param service: ID or name of the service. + """ + if self.from_services: + raise exception.OperationNotPermitted + + context = pecan.request.context + rpc_service = api_utils.get_resource('Service', service) + policy.enforce(context, 'service:get', rpc_service, + action='service:get') + + return Service.convert_with_links(rpc_service) diff --git a/watcher/api/controllers/v1/utils.py b/watcher/api/controllers/v1/utils.py index 2eaaa4aa7..207f79043 100644 --- a/watcher/api/controllers/v1/utils.py +++ b/watcher/api/controllers/v1/utils.py @@ -20,6 +20,7 @@ import pecan import wsme from watcher._i18n import _ +from watcher.common import utils from watcher import objects CONF = cfg.CONF @@ -80,17 +81,20 @@ def as_filters_dict(**filters): return filters_dict -def get_resource(resource, resource_ident): - """Get the resource from the uuid or logical name. +def get_resource(resource, resource_id): + """Get the resource from the uuid, id or logical name. :param resource: the resource type. - :param resource_ident: the UUID or logical name of the resource. + :param resource_id: the UUID, ID or logical name of the resource. :returns: The resource. """ resource = getattr(objects, resource) - if uuidutils.is_uuid_like(resource_ident): - return resource.get_by_uuid(pecan.request.context, resource_ident) + if utils.is_int_like(resource_id): + return resource.get(pecan.request.context, int(resource_id)) - return resource.get_by_name(pecan.request.context, resource_ident) + if uuidutils.is_uuid_like(resource_id): + return resource.get_by_uuid(pecan.request.context, resource_id) + + return resource.get_by_name(pecan.request.context, resource_id) diff --git a/watcher/applier/manager.py b/watcher/applier/manager.py index f576bc320..bb3933759 100644 --- a/watcher/applier/manager.py +++ b/watcher/applier/manager.py @@ -20,6 +20,7 @@ from oslo_config import cfg from watcher.applier.messaging import trigger +from watcher.common import service_manager CONF = cfg.CONF @@ -60,17 +61,40 @@ CONF.register_group(opt_group) CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group) -class ApplierManager(object): +class ApplierManager(service_manager.ServiceManagerBase): - API_VERSION = '1.0' + @property + def service_name(self): + return 'watcher-applier' - conductor_endpoints = [trigger.TriggerActionPlan] - status_endpoints = [] - notification_endpoints = [] - notification_topics = [] + @property + def api_version(self): + return '1.0' - def __init__(self): - self.publisher_id = CONF.watcher_applier.publisher_id - self.conductor_topic = CONF.watcher_applier.conductor_topic - self.status_topic = CONF.watcher_applier.status_topic - self.api_version = self.API_VERSION + @property + def publisher_id(self): + return CONF.watcher_applier.publisher_id + + @property + def conductor_topic(self): + return CONF.watcher_applier.conductor_topic + + @property + def status_topic(self): + return CONF.watcher_applier.status_topic + + @property + def notification_topics(self): + return [] + + @property + def conductor_endpoints(self): + return [trigger.TriggerActionPlan] + + @property + def status_endpoints(self): + return [] + + @property + def notification_endpoints(self): + return [] diff --git a/watcher/applier/rpcapi.py b/watcher/applier/rpcapi.py index b97e3f85a..8ff247351 100644 --- a/watcher/applier/rpcapi.py +++ b/watcher/applier/rpcapi.py @@ -45,15 +45,38 @@ class ApplierAPI(service.Service): class ApplierAPIManager(object): - API_VERSION = '1.0' + @property + def service_name(self): + return None - conductor_endpoints = [] - status_endpoints = [] - notification_endpoints = [] - notification_topics = [] + @property + def api_version(self): + return '1.0' - def __init__(self): - self.publisher_id = CONF.watcher_applier.publisher_id - self.conductor_topic = CONF.watcher_applier.conductor_topic - self.status_topic = CONF.watcher_applier.status_topic - self.api_version = self.API_VERSION + @property + def publisher_id(self): + return CONF.watcher_applier.publisher_id + + @property + def conductor_topic(self): + return CONF.watcher_applier.conductor_topic + + @property + def status_topic(self): + return CONF.watcher_applier.status_topic + + @property + def notification_topics(self): + return [] + + @property + def conductor_endpoints(self): + return [] + + @property + def status_endpoints(self): + return [] + + @property + def notification_endpoints(self): + return [] diff --git a/watcher/common/exception.py b/watcher/common/exception.py index 54b57afea..8890684be 100644 --- a/watcher/common/exception.py +++ b/watcher/common/exception.py @@ -362,6 +362,14 @@ class NoSuchMetricForHost(WatcherException): msg_fmt = _("No %(metric)s metric for %(host)s found.") +class ServiceAlreadyExists(Conflict): + msg_fmt = _("A service with name %(name)s is already working on %(host)s.") + + +class ServiceNotFound(ResourceNotFound): + msg_fmt = _("The service %(service)s cannot be found.") + + # Model class InstanceNotFound(WatcherException): diff --git a/watcher/common/service.py b/watcher/common/service.py index b5fee73fa..20c44e5b7 100644 --- a/watcher/common/service.py +++ b/watcher/common/service.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime import logging import socket @@ -30,10 +31,13 @@ from oslo_service import wsgi from watcher._i18n import _, _LI from watcher.api import app from watcher.common import config +from watcher.common import context from watcher.common.messaging.events import event_dispatcher as dispatcher from watcher.common.messaging import messaging_handler from watcher.common import rpc +from watcher.common import scheduling from watcher.objects import base +from watcher.objects import service as service_object from watcher import opts from watcher import version @@ -48,6 +52,9 @@ service_opts = [ 'However, the node name must be valid within ' 'an AMQP key, and if using ZeroMQ, a valid ' 'hostname, FQDN, or IP address.')), + cfg.IntOpt('service_down_time', + default=90, + help=_('Maximum time since last check-in for up service.')) ] cfg.CONF.register_opts(service_opts) @@ -101,6 +108,52 @@ class WSGIService(service.ServiceBase): self.server.reset() +class ServiceHeartbeat(scheduling.BackgroundSchedulerService): + + def __init__(self, gconfig=None, service_name=None, **kwargs): + gconfig = None or {} + super(ServiceHeartbeat, self).__init__(gconfig, **kwargs) + self.service_name = service_name + self.context = context.make_context() + + def send_beat(self): + host = CONF.host + watcher_list = service_object.Service.list( + self.context, filters={'name': self.service_name, + 'host': host}) + if watcher_list: + watcher_service = watcher_list[0] + watcher_service.last_seen_up = datetime.datetime.utcnow() + watcher_service.save() + else: + watcher_service = service_object.Service(self.context) + watcher_service.name = self.service_name + watcher_service.host = host + watcher_service.create() + + def add_heartbeat_job(self): + self.add_job(self.send_beat, 'interval', seconds=60, + next_run_time=datetime.datetime.now()) + + def start(self): + """Start service.""" + self.add_heartbeat_job() + super(ServiceHeartbeat, self).start() + + def stop(self): + """Stop service.""" + self.shutdown() + + def wait(self): + """Wait for service to complete.""" + + def reset(self): + """Reset service. + + Called in case service running in daemon mode receives SIGHUP. + """ + + class Service(service.ServiceBase, dispatcher.EventDispatcher): API_VERSION = '1.0' @@ -110,7 +163,7 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.manager = manager_class() self.publisher_id = self.manager.publisher_id - self.api_version = self.manager.API_VERSION + self.api_version = self.manager.api_version self.conductor_topic = self.manager.conductor_topic self.status_topic = self.manager.status_topic @@ -136,6 +189,8 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.status_topic_handler = None self.notification_handler = None + self.heartbeat = None + if self.conductor_topic and self.conductor_endpoints: self.conductor_topic_handler = self.build_topic_handler( self.conductor_topic, self.conductor_endpoints) @@ -146,6 +201,10 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.notification_handler = self.build_notification_handler( self.notification_topics, self.notification_endpoints ) + self.service_name = self.manager.service_name + if self.service_name: + self.heartbeat = ServiceHeartbeat( + service_name=self.manager.service_name) @property def transport(self): @@ -211,6 +270,8 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.status_topic_handler.start() if self.notification_handler: self.notification_handler.start() + if self.heartbeat: + self.heartbeat.start() def stop(self): LOG.debug("Disconnecting from '%s' (%s)", @@ -221,6 +282,8 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher): self.status_topic_handler.stop() if self.notification_handler: self.notification_handler.stop() + if self.heartbeat: + self.heartbeat.stop() def reset(self): """Reset a service in case it received a SIGHUP.""" diff --git a/watcher/common/service_manager.py b/watcher/common/service_manager.py new file mode 100644 index 000000000..0d0a710a9 --- /dev/null +++ b/watcher/common/service_manager.py @@ -0,0 +1,56 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2016 Servionica +## +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + + +class ServiceManagerBase(object): + + @abc.abstractproperty + def service_name(self): + raise NotImplementedError() + + @abc.abstractproperty + def api_version(self): + raise NotImplementedError() + + @abc.abstractproperty + def publisher_id(self): + raise NotImplementedError() + + @abc.abstractproperty + def conductor_topic(self): + raise NotImplementedError() + + @abc.abstractproperty + def status_topic(self): + raise NotImplementedError() + + @abc.abstractproperty + def notification_topics(self): + raise NotImplementedError() + + @abc.abstractproperty + def conductor_endpoints(self): + raise NotImplementedError() + + @abc.abstractproperty + def status_endpoints(self): + raise NotImplementedError() + + @abc.abstractproperty + def notification_endpoints(self): + raise NotImplementedError() diff --git a/watcher/db/api.py b/watcher/db/api.py index 4d8caf4ca..2d7f843c5 100644 --- a/watcher/db/api.py +++ b/watcher/db/api.py @@ -715,3 +715,89 @@ class BaseConnection(object): :raises: :py:class:`~.ScoringEngineNotFound` :raises: :py:class:`~.Invalid` """ + + @abc.abstractmethod + def get_service_list(self, context, filters=None, limit=None, + marker=None, sort_key=None, sort_dir=None): + """Get specific columns for matching services. + + Return a list of the specified columns for all services that + match the specified filters. + + :param context: The security context + :param filters: Filters to apply. Defaults to None. + + :param limit: Maximum number of services to return. + :param marker: The last item of the previous page; we return the next + result set. + :param sort_key: Attribute by which results should be sorted. + :param sort_dir: Direction in which results should be sorted. + (asc, desc) + :returns: A list of tuples of the specified columns. + """ + + @abc.abstractmethod + def create_service(self, values): + """Create a new service. + + :param values: A dict containing items used to identify + and track the service. For example: + + :: + + { + 'id': 1, + 'name': 'watcher-api', + 'status': 'ACTIVE', + 'host': 'controller' + } + :returns: A service + :raises: :py:class:`~.ServiceAlreadyExists` + """ + + @abc.abstractmethod + def get_service_by_id(self, context, service_id): + """Return a service given its ID. + + :param context: The security context + :param service_id: The ID of a service + :returns: A service + :raises: :py:class:`~.ServiceNotFound` + """ + + @abc.abstractmethod + def get_service_by_name(self, context, service_name): + """Return a service given its name. + + :param context: The security context + :param service_name: The name of a service + :returns: A service + :raises: :py:class:`~.ServiceNotFound` + """ + + @abc.abstractmethod + def destroy_service(self, service_id): + """Destroy a service. + + :param service_id: The ID of a service + :raises: :py:class:`~.ServiceNotFound` + """ + + @abc.abstractmethod + def update_service(self, service_id, values): + """Update properties of a service. + + :param service_id: The ID of a service + :returns: A service + :raises: :py:class:`~.ServiceyNotFound` + :raises: :py:class:`~.Invalid` + """ + + @abc.abstractmethod + def soft_delete_service(self, service_id): + """Soft delete a service. + + :param service_id: The id of a service. + :returns: A service. + :raises: :py:class:`~.ServiceNotFound` + """ diff --git a/watcher/db/sqlalchemy/api.py b/watcher/db/sqlalchemy/api.py index 394726bd3..26caef3c6 100644 --- a/watcher/db/sqlalchemy/api.py +++ b/watcher/db/sqlalchemy/api.py @@ -1065,3 +1065,65 @@ class Connection(api.BaseConnection): except exception.ResourceNotFound: raise exception.ScoringEngineNotFound( scoring_engine=scoring_engine_id) + + # ### SERVICES ### # + + def _add_services_filters(self, query, filters): + if not filters: + filters = {} + + plain_fields = ['id', 'name', 'host'] + + return self._add_filters( + query=query, model=models.Service, filters=filters, + plain_fields=plain_fields) + + def get_service_list(self, context, filters=None, limit=None, + marker=None, sort_key=None, sort_dir=None): + query = model_query(models.Service) + query = self._add_services_filters(query, filters) + if not context.show_deleted: + query = query.filter_by(deleted_at=None) + return _paginate_query(models.Service, limit, marker, + sort_key, sort_dir, query) + + def create_service(self, values): + service = models.Service() + service.update(values) + try: + service.save() + except db_exc.DBDuplicateEntry: + raise exception.ServiceAlreadyExists(name=values['name'], + host=values['host']) + return service + + def _get_service(self, context, fieldname, value): + try: + return self._get(context, model=models.Service, + fieldname=fieldname, value=value) + except exception.ResourceNotFound: + raise exception.ServiceNotFound(service=value) + + def get_service_by_id(self, context, service_id): + return self._get_service(context, fieldname="id", value=service_id) + + def get_service_by_name(self, context, service_name): + return self._get_service(context, fieldname="name", value=service_name) + + def destroy_service(self, service_id): + try: + return self._destroy(models.Service, service_id) + except exception.ResourceNotFound: + raise exception.ServiceNotFound(service=service_id) + + def update_service(self, service_id, values): + try: + return self._update(models.Service, service_id, values) + except exception.ResourceNotFound: + raise exception.ServiceNotFound(service=service_id) + + def soft_delete_service(self, service_id): + try: + self._soft_delete(models.Service, service_id) + except exception.ResourceNotFound: + raise exception.ServiceNotFound(service=service_id) diff --git a/watcher/db/sqlalchemy/models.py b/watcher/db/sqlalchemy/models.py index 6752b5206..53dab4ba4 100644 --- a/watcher/db/sqlalchemy/models.py +++ b/watcher/db/sqlalchemy/models.py @@ -255,3 +255,18 @@ class ScoringEngine(Base): # The format might vary between different models (e.g. be JSON, XML or # even some custom format), the blob type should cover all scenarios. metainfo = Column(Text, nullable=True) + + +class Service(Base): + """Represents a service entity""" + + __tablename__ = 'services' + __table_args__ = ( + UniqueConstraint('host', 'name', 'deleted', + name="uniq_services0host0name0deleted"), + table_args() + ) + id = Column(Integer, primary_key=True) + name = Column(String(255), nullable=False) + host = Column(String(255), nullable=False) + last_seen_up = Column(DateTime, nullable=True) diff --git a/watcher/decision_engine/manager.py b/watcher/decision_engine/manager.py index 774bd2b34..4ff871117 100644 --- a/watcher/decision_engine/manager.py +++ b/watcher/decision_engine/manager.py @@ -38,6 +38,7 @@ See :doc:`../architecture` for more details on this component. from oslo_config import cfg +from watcher.common import service_manager from watcher.decision_engine.messaging import audit_endpoint from watcher.decision_engine.model.collector import manager @@ -78,23 +79,44 @@ CONF.register_group(decision_engine_opt_group) CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group) -class DecisionEngineManager(object): +class DecisionEngineManager(service_manager.ServiceManagerBase): - API_VERSION = '1.0' + @property + def service_name(self): + return 'watcher-decision-engine' - def __init__(self): - self.api_version = self.API_VERSION + @property + def api_version(self): + return '1.0' - self.publisher_id = CONF.watcher_decision_engine.publisher_id - self.conductor_topic = CONF.watcher_decision_engine.conductor_topic - self.status_topic = CONF.watcher_decision_engine.status_topic - self.notification_topics = ( - CONF.watcher_decision_engine.notification_topics) + @property + def publisher_id(self): + return CONF.watcher_decision_engine.publisher_id - self.conductor_endpoints = [audit_endpoint.AuditEndpoint] + @property + def conductor_topic(self): + return CONF.watcher_decision_engine.conductor_topic - self.status_endpoints = [] + @property + def status_topic(self): + return CONF.watcher_decision_engine.status_topic - self.collector_manager = manager.CollectorManager() - self.notification_endpoints = ( - self.collector_manager.get_notification_endpoints()) + @property + def notification_topics(self): + return CONF.watcher_decision_engine.notification_topics + + @property + def conductor_endpoints(self): + return [audit_endpoint.AuditEndpoint] + + @property + def status_endpoints(self): + return [] + + @property + def notification_endpoints(self): + return self.collector_manager.get_notification_endpoints() + + @property + def collector_manager(self): + return manager.CollectorManager() diff --git a/watcher/decision_engine/rpcapi.py b/watcher/decision_engine/rpcapi.py index 6da9a03f5..ecab6634f 100644 --- a/watcher/decision_engine/rpcapi.py +++ b/watcher/decision_engine/rpcapi.py @@ -48,15 +48,38 @@ class DecisionEngineAPI(service.Service): class DecisionEngineAPIManager(object): - API_VERSION = '1.0' + @property + def service_name(self): + return None - conductor_endpoints = [] - status_endpoints = [notification_handler.NotificationHandler] - notification_endpoints = [] - notification_topics = [] + @property + def api_version(self): + return '1.0' - def __init__(self): - self.publisher_id = CONF.watcher_decision_engine.publisher_id - self.conductor_topic = CONF.watcher_decision_engine.conductor_topic - self.status_topic = CONF.watcher_decision_engine.status_topic - self.api_version = self.API_VERSION + @property + def publisher_id(self): + return CONF.watcher_decision_engine.publisher_id + + @property + def conductor_topic(self): + return CONF.watcher_decision_engine.conductor_topic + + @property + def status_topic(self): + return CONF.watcher_decision_engine.status_topic + + @property + def notification_topics(self): + return [] + + @property + def conductor_endpoints(self): + return [] + + @property + def status_endpoints(self): + return [notification_handler.NotificationHandler] + + @property + def notification_endpoints(self): + return [] diff --git a/watcher/objects/__init__.py b/watcher/objects/__init__.py index c56e87afd..3c56f8efa 100644 --- a/watcher/objects/__init__.py +++ b/watcher/objects/__init__.py @@ -20,6 +20,7 @@ from watcher.objects import audit_template from watcher.objects import efficacy_indicator from watcher.objects import goal from watcher.objects import scoring_engine +from watcher.objects import service from watcher.objects import strategy Audit = audit.Audit @@ -30,6 +31,7 @@ Goal = goal.Goal ScoringEngine = scoring_engine.ScoringEngine Strategy = strategy.Strategy EfficacyIndicator = efficacy_indicator.EfficacyIndicator +Service = service.Service -__all__ = ("Audit", "AuditTemplate", "Action", "ActionPlan", - "Goal", "ScoringEngine", "Strategy", "EfficacyIndicator") +__all__ = ("Audit", "AuditTemplate", "Action", "ActionPlan", "Goal", + "ScoringEngine", "Strategy", "EfficacyIndicator", "Service") diff --git a/watcher/objects/base.py b/watcher/objects/base.py index 41fd1caf8..67f1c73d8 100644 --- a/watcher/objects/base.py +++ b/watcher/objects/base.py @@ -311,7 +311,7 @@ class WatcherObject(object): """Returns a dict of changed fields and their new values.""" changes = {} for key in self.obj_what_changed(): - changes[key] = self[key] + changes[key] = self._attr_to_primitive(key) return changes def obj_what_changed(self): diff --git a/watcher/objects/service.py b/watcher/objects/service.py new file mode 100644 index 000000000..c5c4ee9d2 --- /dev/null +++ b/watcher/objects/service.py @@ -0,0 +1,178 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Servionica +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from watcher.common import exception +from watcher.common import utils +from watcher.db import api as dbapi +from watcher.objects import base +from watcher.objects import utils as obj_utils + + +class ServiceStatus(object): + ACTIVE = 'ACTIVE' + FAILED = 'FAILED' + + +class Service(base.WatcherObject): + + dbapi = dbapi.get_instance() + + fields = { + 'id': int, + 'name': obj_utils.str_or_none, + 'host': obj_utils.str_or_none, + 'last_seen_up': obj_utils.datetime_or_str_or_none + } + + @staticmethod + def _from_db_object(service, db_service): + """Converts a database entity to a formal object.""" + for field in service.fields: + service[field] = db_service[field] + + service.obj_reset_changes() + return service + + @staticmethod + def _from_db_object_list(db_objects, cls, context): + """Converts a list of database entities to a list of formal objects.""" + return [Service._from_db_object(cls(context), obj) + for obj in db_objects] + + @classmethod + def get(cls, context, service_id): + """Find a service based on its id + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Service(context) + :param service_id: the id of a service. + :returns: a :class:`Service` object. + """ + if utils.is_int_like(service_id): + db_service = cls.dbapi.get_service_by_id(context, service_id) + service = Service._from_db_object(cls(context), db_service) + return service + else: + raise exception.InvalidIdentity(identity=service_id) + + @classmethod + def get_by_name(cls, context, name): + """Find a service based on name + + :param name: the name of a service. + :param context: Security context + :returns: a :class:`Service` object. + """ + + db_service = cls.dbapi.get_service_by_name(context, name) + service = cls._from_db_object(cls(context), db_service) + return service + + @classmethod + def list(cls, context, limit=None, marker=None, filters=None, + sort_key=None, sort_dir=None): + """Return a list of :class:`Service` objects. + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Service(context) + :param filters: dict mapping the filter key to a value. + :param limit: maximum number of resources to return in a single result. + :param marker: pagination marker for large data sets. + :param sort_key: column to sort results by. + :param sort_dir: direction to sort. "asc" or "desc". + :returns: a list of :class:`Service` object. + """ + db_services = cls.dbapi.get_service_list( + context, + filters=filters, + limit=limit, + marker=marker, + sort_key=sort_key, + sort_dir=sort_dir) + return Service._from_db_object_list(db_services, cls, context) + + def create(self, context=None): + """Create a :class:`Service` record in the DB. + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Service(context) + """ + + values = self.obj_get_changes() + db_service = self.dbapi.create_service(values) + self._from_db_object(self, db_service) + + def save(self, context=None): + """Save updates to this :class:`Service`. + + Updates will be made column by column based on the result + of self.what_changed(). + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Service(context) + """ + updates = self.obj_get_changes() + self.dbapi.update_service(self.id, updates) + + self.obj_reset_changes() + + def refresh(self, context=None): + """Loads updates for this :class:`Service`. + + Loads a service with the same id from the database and + checks for updated attributes. Updates are applied from + the loaded service column by column, if there are any updates. + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Service(context) + """ + current = self.__class__.get(self._context, service_id=self.id) + for field in self.fields: + if (hasattr(self, base.get_attrname(field)) and + self[field] != current[field]): + self[field] = current[field] + + def soft_delete(self, context=None): + """Soft Delete the :class:`Service` from the DB. + + :param context: Security context. NOTE: This should only + be used internally by the indirection_api. + Unfortunately, RPC requires context as the first + argument, even though we don't use it. + A context should be set when instantiating the + object, e.g.: Service(context) + """ + self.dbapi.soft_delete_service(self.id) diff --git a/watcher/objects/utils.py b/watcher/objects/utils.py index 5a5dd57ee..06814478b 100644 --- a/watcher/objects/utils.py +++ b/watcher/objects/utils.py @@ -25,25 +25,33 @@ import six from watcher._i18n import _ -def datetime_or_none(dt): +def datetime_or_none(value, tzinfo_aware=False): """Validate a datetime or None value.""" - if dt is None: + if value is None: return None - elif isinstance(dt, datetime.datetime): - if dt.utcoffset() is None: - # NOTE(danms): Legacy objects from sqlalchemy are stored in UTC, - # but are returned without a timezone attached. - # As a transitional aid, assume a tz-naive object is in UTC. - return dt.replace(tzinfo=iso8601.iso8601.Utc()) - else: - return dt - raise ValueError(_("A datetime.datetime is required here")) + if isinstance(value, six.string_types): + # NOTE(danms): Being tolerant of isotime strings here will help us + # during our objects transition + value = timeutils.parse_isotime(value) + elif not isinstance(value, datetime.datetime): + raise ValueError( + _("A datetime.datetime is required here. Got %s"), value) + + if value.utcoffset() is None and tzinfo_aware: + # NOTE(danms): Legacy objects from sqlalchemy are stored in UTC, + # but are returned without a timezone attached. + # As a transitional aid, assume a tz-naive object is in UTC. + value = value.replace(tzinfo=iso8601.iso8601.Utc()) + elif not tzinfo_aware: + value = value.replace(tzinfo=None) + + return value -def datetime_or_str_or_none(val): +def datetime_or_str_or_none(val, tzinfo_aware=False): if isinstance(val, six.string_types): return timeutils.parse_isotime(val) - return datetime_or_none(val) + return datetime_or_none(val, tzinfo_aware=tzinfo_aware) def numeric_or_none(val): diff --git a/watcher/tests/api/test_root.py b/watcher/tests/api/test_root.py index f67f857f1..7d93c30b1 100644 --- a/watcher/tests/api/test_root.py +++ b/watcher/tests/api/test_root.py @@ -37,7 +37,8 @@ class TestV1Root(base.FunctionalTest): not_resources = ('id', 'links', 'media_types') actual_resources = tuple(set(data.keys()) - set(not_resources)) expected_resources = ('audit_templates', 'audits', 'actions', - 'action_plans', 'scoring_engines') + 'action_plans', 'scoring_engines', + 'services') self.assertEqual(sorted(expected_resources), sorted(actual_resources)) self.assertIn({'type': 'application/vnd.openstack.watcher.v1+json', diff --git a/watcher/tests/api/v1/test_services.py b/watcher/tests/api/v1/test_services.py new file mode 100644 index 000000000..fa5fd9159 --- /dev/null +++ b/watcher/tests/api/v1/test_services.py @@ -0,0 +1,173 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_serialization import jsonutils +from six.moves.urllib import parse as urlparse + +from watcher.tests.api import base as api_base +from watcher.tests.objects import utils as obj_utils + + +class TestListService(api_base.FunctionalTest): + + def _assert_service_fields(self, service): + service_fields = ['id', 'name', 'host', 'status'] + for field in service_fields: + self.assertIn(field, service) + + def test_one(self): + service = obj_utils.create_test_service(self.context) + response = self.get_json('/services') + self.assertEqual(service.id, response['services'][0]["id"]) + self._assert_service_fields(response['services'][0]) + + def test_get_one_by_id(self): + service = obj_utils.create_test_service(self.context) + response = self.get_json('/services/%s' % service.id) + self.assertEqual(service.id, response["id"]) + self.assertEqual(service.name, response["name"]) + self._assert_service_fields(response) + + def test_get_one_by_name(self): + service = obj_utils.create_test_service(self.context) + response = self.get_json(urlparse.quote( + '/services/%s' % service['name'])) + self.assertEqual(service.id, response['id']) + self._assert_service_fields(response) + + def test_get_one_soft_deleted(self): + service = obj_utils.create_test_service(self.context) + service.soft_delete() + response = self.get_json( + '/services/%s' % service['id'], + headers={'X-Show-Deleted': 'True'}) + self.assertEqual(service.id, response['id']) + self._assert_service_fields(response) + + response = self.get_json( + '/services/%s' % service['id'], + expect_errors=True) + self.assertEqual(404, response.status_int) + + def test_detail(self): + service = obj_utils.create_test_service(self.context) + response = self.get_json('/services/detail') + self.assertEqual(service.id, response['services'][0]["id"]) + self._assert_service_fields(response['services'][0]) + for service in response['services']: + self.assertTrue( + all(val is not None for key, val in service.items() + if key in ['id', 'name', 'host', 'status']) + ) + + def test_detail_against_single(self): + service = obj_utils.create_test_service(self.context) + response = self.get_json('/services/%s/detail' % service.id, + expect_errors=True) + self.assertEqual(404, response.status_int) + + def test_many(self): + service_list = [] + for idx in range(1, 6): + service = obj_utils.create_test_service( + self.context, id=idx, host='CONTROLLER', + name='SERVICE_{0}'.format(idx)) + service_list.append(service.id) + response = self.get_json('/services') + self.assertEqual(5, len(response['services'])) + for service in response['services']: + self.assertTrue( + all(val is not None for key, val in service.items() + if key in ['id', 'name', 'host', 'status'])) + + def test_many_without_soft_deleted(self): + service_list = [] + for id_ in [1, 2, 3]: + service = obj_utils.create_test_service( + self.context, id=id_, host='CONTROLLER', + name='SERVICE_{0}'.format(id_)) + service_list.append(service.id) + for id_ in [4, 5]: + service = obj_utils.create_test_service( + self.context, id=id_, host='CONTROLLER', + name='SERVICE_{0}'.format(id_)) + service.soft_delete() + response = self.get_json('/services') + self.assertEqual(3, len(response['services'])) + ids = [s['id'] for s in response['services']] + self.assertEqual(sorted(service_list), sorted(ids)) + + def test_services_collection_links(self): + for idx in range(1, 6): + obj_utils.create_test_service( + self.context, id=idx, + host='CONTROLLER', + name='SERVICE_{0}'.format(idx)) + response = self.get_json('/services/?limit=2') + self.assertEqual(2, len(response['services'])) + + def test_services_collection_links_default_limit(self): + for idx in range(1, 6): + obj_utils.create_test_service( + self.context, id=idx, + host='CONTROLLER', + name='SERVICE_{0}'.format(idx)) + cfg.CONF.set_override('max_limit', 3, 'api', enforce_type=True) + response = self.get_json('/services') + self.assertEqual(3, len(response['services'])) + + +class TestServicePolicyEnforcement(api_base.FunctionalTest): + + def _common_policy_check(self, rule, func, *arg, **kwarg): + self.policy.set_rules({ + "admin_api": "(role:admin or role:administrator)", + "default": "rule:admin_api", + rule: "rule:default"}) + response = func(*arg, **kwarg) + self.assertEqual(403, response.status_int) + self.assertEqual('application/json', response.content_type) + self.assertTrue( + "Policy doesn't allow %s to be performed." % rule, + jsonutils.loads(response.json['error_message'])['faultstring']) + + def test_policy_disallow_get_all(self): + self._common_policy_check( + "service:get_all", self.get_json, '/services', + expect_errors=True) + + def test_policy_disallow_get_one(self): + service = obj_utils.create_test_service(self.context) + self._common_policy_check( + "service:get", self.get_json, + '/services/%s' % service.id, + expect_errors=True) + + def test_policy_disallow_detail(self): + self._common_policy_check( + "service:detail", self.get_json, + '/services/detail', + expect_errors=True) + + +class TestServiceEnforcementWithAdminContext(TestListService, + api_base.AdminRoleTest): + + def setUp(self): + super(TestServiceEnforcementWithAdminContext, self).setUp() + self.policy.set_rules({ + "admin_api": "(role:admin or role:administrator)", + "default": "rule:admin_api", + "service:detail": "rule:default", + "service:get": "rule:default", + "service:get_all": "rule:default"}) diff --git a/watcher/tests/common/test_service.py b/watcher/tests/common/test_service.py index 9e6dbf81e..80b9bed74 100644 --- a/watcher/tests/common/test_service.py +++ b/watcher/tests/common/test_service.py @@ -17,11 +17,16 @@ import mock +from oslo_config import cfg + from watcher.common.messaging import messaging_handler from watcher.common import rpc from watcher.common import service +from watcher import objects from watcher.tests import base +CONF = cfg.CONF + class DummyManager(object): @@ -37,6 +42,38 @@ class DummyManager(object): self.status_topic = "status_topic" self.notification_topics = [] self.api_version = self.API_VERSION + self.service_name = None + + +class TestServiceHeartbeat(base.TestCase): + + def setUp(self): + super(TestServiceHeartbeat, self).setUp() + + @mock.patch.object(objects.Service, 'list') + @mock.patch.object(objects.Service, 'create') + def test_send_beat_with_creating_service(self, mock_create, + mock_list): + CONF.set_default('host', 'fake-fqdn') + service_heartbeat = service.ServiceHeartbeat( + service_name='watcher-service') + mock_list.return_value = [] + service_heartbeat.send_beat() + mock_list.assert_called_once_with(mock.ANY, + filters={'name': 'watcher-service', + 'host': 'fake-fqdn'}) + self.assertEqual(1, mock_create.call_count) + + @mock.patch.object(objects.Service, 'list') + @mock.patch.object(objects.Service, 'save') + def test_send_beat_without_creating_service(self, mock_save, mock_list): + service_heartbeat = service.ServiceHeartbeat( + service_name='watcher-service') + mock_list.return_value = [objects.Service(mock.Mock(), + name='watcher-service', + host='controller')] + service_heartbeat.send_beat() + self.assertEqual(1, mock_save.call_count) class TestService(base.TestCase): diff --git a/watcher/tests/db/test_service.py b/watcher/tests/db/test_service.py new file mode 100644 index 000000000..3803a13cb --- /dev/null +++ b/watcher/tests/db/test_service.py @@ -0,0 +1,303 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Servionica +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Tests for manipulating Service via the DB API""" + +import freezegun +import six + +from oslo_utils import timeutils + +from watcher.common import exception +from watcher.tests.db import base +from watcher.tests.db import utils + + +class TestDbServiceFilters(base.DbTestCase): + + FAKE_OLDER_DATE = '2014-01-01T09:52:05.219414' + FAKE_OLD_DATE = '2015-01-01T09:52:05.219414' + FAKE_TODAY = '2016-02-24T09:52:05.219414' + + def setUp(self): + super(TestDbServiceFilters, self).setUp() + self.context.show_deleted = True + self._data_setup() + + def _data_setup(self): + service1_name = "SERVICE_ID_1" + service2_name = "SERVICE_ID_2" + service3_name = "SERVICE_ID_3" + + with freezegun.freeze_time(self.FAKE_TODAY): + self.service1 = utils.create_test_service( + id=1, name=service1_name, host="controller", + last_seen_up=timeutils.parse_isotime("2016-09-22T08:32:05")) + with freezegun.freeze_time(self.FAKE_OLD_DATE): + self.service2 = utils.create_test_service( + id=2, name=service2_name, host="controller", + last_seen_up=timeutils.parse_isotime("2016-09-22T08:32:05")) + with freezegun.freeze_time(self.FAKE_OLDER_DATE): + self.service3 = utils.create_test_service( + id=3, name=service3_name, host="controller", + last_seen_up=timeutils.parse_isotime("2016-09-22T08:32:05")) + + def _soft_delete_services(self): + with freezegun.freeze_time(self.FAKE_TODAY): + self.dbapi.soft_delete_service(self.service1.id) + with freezegun.freeze_time(self.FAKE_OLD_DATE): + self.dbapi.soft_delete_service(self.service2.id) + with freezegun.freeze_time(self.FAKE_OLDER_DATE): + self.dbapi.soft_delete_service(self.service3.id) + + def _update_services(self): + with freezegun.freeze_time(self.FAKE_TODAY): + self.dbapi.update_service( + self.service1.id, values={"host": "controller1"}) + with freezegun.freeze_time(self.FAKE_OLD_DATE): + self.dbapi.update_service( + self.service2.id, values={"host": "controller2"}) + with freezegun.freeze_time(self.FAKE_OLDER_DATE): + self.dbapi.update_service( + self.service3.id, values={"host": "controller3"}) + + def test_get_service_list_filter_deleted_true(self): + with freezegun.freeze_time(self.FAKE_TODAY): + self.dbapi.soft_delete_service(self.service1.id) + + res = self.dbapi.get_service_list( + self.context, filters={'deleted': True}) + + self.assertEqual([self.service1['name']], [r.name for r in res]) + + def test_get_service_list_filter_deleted_false(self): + with freezegun.freeze_time(self.FAKE_TODAY): + self.dbapi.soft_delete_service(self.service1.id) + + res = self.dbapi.get_service_list( + self.context, filters={'deleted': False}) + + self.assertEqual( + set([self.service2['name'], self.service3['name']]), + set([r.name for r in res])) + + def test_get_service_list_filter_deleted_at_eq(self): + self._soft_delete_services() + + res = self.dbapi.get_service_list( + self.context, filters={'deleted_at__eq': self.FAKE_TODAY}) + + self.assertEqual([self.service1['id']], [r.id for r in res]) + + def test_get_service_list_filter_deleted_at_lt(self): + self._soft_delete_services() + + res = self.dbapi.get_service_list( + self.context, filters={'deleted_at__lt': self.FAKE_TODAY}) + + self.assertEqual( + set([self.service2['id'], self.service3['id']]), + set([r.id for r in res])) + + def test_get_service_list_filter_deleted_at_lte(self): + self._soft_delete_services() + + res = self.dbapi.get_service_list( + self.context, filters={'deleted_at__lte': self.FAKE_OLD_DATE}) + + self.assertEqual( + set([self.service2['id'], self.service3['id']]), + set([r.id for r in res])) + + def test_get_service_list_filter_deleted_at_gt(self): + self._soft_delete_services() + + res = self.dbapi.get_service_list( + self.context, filters={'deleted_at__gt': self.FAKE_OLD_DATE}) + + self.assertEqual([self.service1['id']], [r.id for r in res]) + + def test_get_service_list_filter_deleted_at_gte(self): + self._soft_delete_services() + + res = self.dbapi.get_service_list( + self.context, filters={'deleted_at__gte': self.FAKE_OLD_DATE}) + + self.assertEqual( + set([self.service1['id'], self.service2['id']]), + set([r.id for r in res])) + + # created_at # + + def test_get_service_list_filter_created_at_eq(self): + res = self.dbapi.get_service_list( + self.context, filters={'created_at__eq': self.FAKE_TODAY}) + + self.assertEqual([self.service1['id']], [r.id for r in res]) + + def test_get_service_list_filter_created_at_lt(self): + res = self.dbapi.get_service_list( + self.context, filters={'created_at__lt': self.FAKE_TODAY}) + + self.assertEqual( + set([self.service2['id'], self.service3['id']]), + set([r.id for r in res])) + + def test_get_service_list_filter_created_at_lte(self): + res = self.dbapi.get_service_list( + self.context, filters={'created_at__lte': self.FAKE_OLD_DATE}) + + self.assertEqual( + set([self.service2['id'], self.service3['id']]), + set([r.id for r in res])) + + def test_get_service_list_filter_created_at_gt(self): + res = self.dbapi.get_service_list( + self.context, filters={'created_at__gt': self.FAKE_OLD_DATE}) + + self.assertEqual([self.service1['id']], [r.id for r in res]) + + def test_get_service_list_filter_created_at_gte(self): + res = self.dbapi.get_service_list( + self.context, filters={'created_at__gte': self.FAKE_OLD_DATE}) + + self.assertEqual( + set([self.service1['id'], self.service2['id']]), + set([r.id for r in res])) + + # updated_at # + + def test_get_service_list_filter_updated_at_eq(self): + self._update_services() + + res = self.dbapi.get_service_list( + self.context, filters={'updated_at__eq': self.FAKE_TODAY}) + + self.assertEqual([self.service1['id']], [r.id for r in res]) + + def test_get_service_list_filter_updated_at_lt(self): + self._update_services() + + res = self.dbapi.get_service_list( + self.context, filters={'updated_at__lt': self.FAKE_TODAY}) + + self.assertEqual( + set([self.service2['id'], self.service3['id']]), + set([r.id for r in res])) + + def test_get_service_list_filter_updated_at_lte(self): + self._update_services() + + res = self.dbapi.get_service_list( + self.context, filters={'updated_at__lte': self.FAKE_OLD_DATE}) + + self.assertEqual( + set([self.service2['id'], self.service3['id']]), + set([r.id for r in res])) + + def test_get_service_list_filter_updated_at_gt(self): + self._update_services() + + res = self.dbapi.get_service_list( + self.context, filters={'updated_at__gt': self.FAKE_OLD_DATE}) + + self.assertEqual([self.service1['id']], [r.id for r in res]) + + def test_get_service_list_filter_updated_at_gte(self): + self._update_services() + + res = self.dbapi.get_service_list( + self.context, filters={'updated_at__gte': self.FAKE_OLD_DATE}) + + self.assertEqual( + set([self.service1['id'], self.service2['id']]), + set([r.id for r in res])) + + +class DbServiceTestCase(base.DbTestCase): + + def _create_test_service(self, **kwargs): + service = utils.get_test_service(**kwargs) + self.dbapi.create_service(service) + return service + + def test_get_service_list(self): + ids = [] + for i in range(1, 6): + service = utils.create_test_service( + id=i, + name="SERVICE_ID_%s" % i, + host="controller_{0}".format(i)) + ids.append(six.text_type(service['id'])) + res = self.dbapi.get_service_list(self.context) + res_ids = [r.id for r in res] + self.assertEqual(ids.sort(), res_ids.sort()) + + def test_get_service_list_with_filters(self): + service1 = self._create_test_service( + id=1, + name="SERVICE_ID_1", + host="controller_1", + ) + service2 = self._create_test_service( + id=2, + name="SERVICE_ID_2", + host="controller_2", + ) + + res = self.dbapi.get_service_list( + self.context, filters={'host': 'controller_1'}) + self.assertEqual([service1['id']], [r.id for r in res]) + + res = self.dbapi.get_service_list( + self.context, filters={'host': 'controller_3'}) + self.assertEqual([], [r.id for r in res]) + + res = self.dbapi.get_service_list( + self.context, + filters={'host': 'controller_2'}) + self.assertEqual([service2['id']], [r.id for r in res]) + + def test_get_service_by_name(self): + created_service = self._create_test_service() + service = self.dbapi.get_service_by_name( + self.context, created_service['name']) + self.assertEqual(service.name, created_service['name']) + + def test_get_service_that_does_not_exist(self): + self.assertRaises(exception.ServiceNotFound, + self.dbapi.get_service_by_id, + self.context, 404) + + def test_update_service(self): + service = self._create_test_service() + res = self.dbapi.update_service( + service['id'], {'host': 'controller_test'}) + self.assertEqual('controller_test', res.host) + + def test_update_service_that_does_not_exist(self): + self.assertRaises(exception.ServiceNotFound, + self.dbapi.update_service, + 405, + {'name': ''}) + + def test_create_service_already_exists(self): + service_id = "STRATEGY_ID" + self._create_test_service(name=service_id) + self.assertRaises(exception.ServiceAlreadyExists, + self._create_test_service, + name=service_id) diff --git a/watcher/tests/db/utils.py b/watcher/tests/db/utils.py index f78c4cee5..160a5c471 100644 --- a/watcher/tests/db/utils.py +++ b/watcher/tests/db/utils.py @@ -12,7 +12,9 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -"""Magnum test utilities.""" +"""Watcher test utilities.""" + +from oslo_utils import timeutils from watcher.db import api as db_api @@ -212,6 +214,33 @@ def get_test_strategy(**kwargs): } +def get_test_service(**kwargs): + return { + 'id': kwargs.get('id', 1), + 'name': kwargs.get('name', 'watcher-service'), + 'host': kwargs.get('host', 'controller'), + 'last_seen_up': kwargs.get( + 'last_seen_up', + timeutils.parse_isotime('2016-09-22T08:32:06').replace(tzinfo=None) + ), + 'created_at': kwargs.get('created_at'), + 'updated_at': kwargs.get('updated_at'), + 'deleted_at': kwargs.get('deleted_at'), + } + + +def create_test_service(**kwargs): + """Create test service entry in DB and return Service DB object. + + Function to be used to create test Service objects in the database. + :param kwargs: kwargs with overriding values for service's attributes. + :returns: Test Service DB object. + """ + service = get_test_service(**kwargs) + dbapi = db_api.get_instance() + return dbapi.create_service(service) + + def create_test_strategy(**kwargs): """Create test strategy entry in DB and return Strategy DB object. diff --git a/watcher/tests/decision_engine/model/notification/fake_managers.py b/watcher/tests/decision_engine/model/notification/fake_managers.py index 1054ef617..94cce46a2 100644 --- a/watcher/tests/decision_engine/model/notification/fake_managers.py +++ b/watcher/tests/decision_engine/model/notification/fake_managers.py @@ -26,6 +26,7 @@ class FakeManager(object): def __init__(self): self.api_version = self.API_VERSION + self.service_name = None # fake cluster instead on Nova CDM self.fake_cdmc = faker_cluster_state.FakerModelCollector() diff --git a/watcher/tests/fake_policy.py b/watcher/tests/fake_policy.py index 413ffbf31..bed907cd4 100644 --- a/watcher/tests/fake_policy.py +++ b/watcher/tests/fake_policy.py @@ -53,7 +53,11 @@ policy_data = """ "strategy:detail": "", "strategy:get": "", - "strategy:get_all": "" + "strategy:get_all": "", + + "service:detail": "", + "service:get": "", + "service:get_all": "" } """ diff --git a/watcher/tests/objects/test_objects.py b/watcher/tests/objects/test_objects.py index faa449674..4e7be9e76 100644 --- a/watcher/tests/objects/test_objects.py +++ b/watcher/tests/objects/test_objects.py @@ -118,19 +118,29 @@ class TestUtils(test_base.TestCase): def test_datetime_or_none(self): naive_dt = datetime.datetime.now() dt = timeutils.parse_isotime(timeutils.isotime(naive_dt)) - self.assertEqual(dt, utils.datetime_or_none(dt)) + self.assertEqual(dt, utils.datetime_or_none(dt, tzinfo_aware=True)) self.assertEqual(naive_dt.replace(tzinfo=iso8601.iso8601.Utc(), microsecond=0), - utils.datetime_or_none(dt)) + utils.datetime_or_none(dt, tzinfo_aware=True)) + self.assertIsNone(utils.datetime_or_none(None)) + self.assertRaises(ValueError, utils.datetime_or_none, 'foo') + + def test_datetime_or_none_tzinfo_naive(self): + naive_dt = datetime.datetime.utcnow() + self.assertEqual(naive_dt, utils.datetime_or_none(naive_dt, + tzinfo_aware=False)) self.assertIsNone(utils.datetime_or_none(None)) self.assertRaises(ValueError, utils.datetime_or_none, 'foo') def test_datetime_or_str_or_none(self): dts = timeutils.isotime() dt = timeutils.parse_isotime(dts) - self.assertEqual(dt, utils.datetime_or_str_or_none(dt)) - self.assertIsNone(utils.datetime_or_str_or_none(None)) - self.assertEqual(dt, utils.datetime_or_str_or_none(dts)) + self.assertEqual(dt, utils.datetime_or_str_or_none(dt, + tzinfo_aware=True)) + self.assertIsNone(utils.datetime_or_str_or_none(None, + tzinfo_aware=True)) + self.assertEqual(dt, utils.datetime_or_str_or_none(dts, + tzinfo_aware=True)) self.assertRaises(ValueError, utils.datetime_or_str_or_none, 'foo') def test_int_or_none(self): diff --git a/watcher/tests/objects/test_service.py b/watcher/tests/objects/test_service.py new file mode 100644 index 000000000..5e5b6d354 --- /dev/null +++ b/watcher/tests/objects/test_service.py @@ -0,0 +1,105 @@ +# Copyright 2015 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from testtools import matchers + +from watcher import objects +from watcher.tests.db import base +from watcher.tests.db import utils + + +class TestServiceObject(base.DbTestCase): + + def setUp(self): + super(TestServiceObject, self).setUp() + self.fake_service = utils.get_test_service() + + def test_get_by_id(self): + service_id = self.fake_service['id'] + with mock.patch.object(self.dbapi, 'get_service_by_id', + autospec=True) as mock_get_service: + mock_get_service.return_value = self.fake_service + service = objects.Service.get(self.context, service_id) + mock_get_service.assert_called_once_with(self.context, + service_id) + self.assertEqual(self.context, service._context) + + def test_list(self): + with mock.patch.object(self.dbapi, 'get_service_list', + autospec=True) as mock_get_list: + mock_get_list.return_value = [self.fake_service] + services = objects.Service.list(self.context) + self.assertEqual(1, mock_get_list.call_count, 1) + self.assertThat(services, matchers.HasLength(1)) + self.assertIsInstance(services[0], objects.Service) + self.assertEqual(self.context, services[0]._context) + + def test_create(self): + with mock.patch.object(self.dbapi, 'create_service', + autospec=True) as mock_create_service: + mock_create_service.return_value = self.fake_service + service = objects.Service(self.context, **self.fake_service) + + fake_service = utils.get_test_service() + + service.create() + mock_create_service.assert_called_once_with(fake_service) + self.assertEqual(self.context, service._context) + + def test_save(self): + _id = self.fake_service['id'] + with mock.patch.object(self.dbapi, 'get_service_by_id', + autospec=True) as mock_get_service: + mock_get_service.return_value = self.fake_service + with mock.patch.object(self.dbapi, 'update_service', + autospec=True) as mock_update_service: + service = objects.Service.get(self.context, _id) + service.name = 'UPDATED NAME' + service.save() + + mock_get_service.assert_called_once_with(self.context, _id) + mock_update_service.assert_called_once_with( + _id, {'name': 'UPDATED NAME'}) + self.assertEqual(self.context, service._context) + + def test_refresh(self): + _id = self.fake_service['id'] + returns = [dict(self.fake_service, name="first name"), + dict(self.fake_service, name="second name")] + expected = [mock.call(self.context, _id), + mock.call(self.context, _id)] + with mock.patch.object(self.dbapi, 'get_service_by_id', + side_effect=returns, + autospec=True) as mock_get_service: + service = objects.Service.get(self.context, _id) + self.assertEqual("first name", service.name) + service.refresh() + self.assertEqual("second name", service.name) + self.assertEqual(expected, mock_get_service.call_args_list) + self.assertEqual(self.context, service._context) + + def test_soft_delete(self): + _id = self.fake_service['id'] + with mock.patch.object(self.dbapi, 'get_service_by_id', + autospec=True) as mock_get_service: + mock_get_service.return_value = self.fake_service + with mock.patch.object(self.dbapi, 'soft_delete_service', + autospec=True) as mock_soft_delete: + service = objects.Service.get(self.context, _id) + service.soft_delete() + mock_get_service.assert_called_once_with(self.context, _id) + mock_soft_delete.assert_called_once_with(_id) + self.assertEqual(self.context, service._context) diff --git a/watcher/tests/objects/utils.py b/watcher/tests/objects/utils.py index 9143e5e12..873f3745f 100644 --- a/watcher/tests/objects/utils.py +++ b/watcher/tests/objects/utils.py @@ -178,6 +178,32 @@ def create_test_scoring_engine(context, **kw): return scoring_engine +def get_test_service(context, **kw): + """Return a Service object with appropriate attributes. + + NOTE: The object leaves the attributes marked as changed, such + that a create() could be used to commit it to the DB. + """ + db_service = db_utils.get_test_service(**kw) + service = objects.Service(context) + for key in db_service: + if key == 'last_seen_up': + db_service[key] = None + setattr(service, key, db_service[key]) + return service + + +def create_test_service(context, **kw): + """Create and return a test service object. + + Create a service in the DB and return a Service object with + appropriate attributes. + """ + service = get_test_service(context, **kw) + service.create() + return service + + def get_test_strategy(context, **kw): """Return a Strategy object with appropriate attributes. diff --git a/watcher_tempest_plugin/services/infra_optim/v1/json/client.py b/watcher_tempest_plugin/services/infra_optim/v1/json/client.py index 5bc37f17d..be3446839 100644 --- a/watcher_tempest_plugin/services/infra_optim/v1/json/client.py +++ b/watcher_tempest_plugin/services/infra_optim/v1/json/client.py @@ -279,3 +279,24 @@ class InfraOptimClientJSON(base.BaseInfraOptimClient): :return: Serialized strategy as a dictionary """ return self._show_request('/strategies', strategy) + + # ### SERVICES ### # + + @base.handle_errors + def list_services(self, **kwargs): + """List all existing services""" + return self._list_request('/services', **kwargs) + + @base.handle_errors + def list_services_detail(self, **kwargs): + """Lists details of all existing services""" + return self._list_request('/services/detail', **kwargs) + + @base.handle_errors + def show_service(self, service): + """Gets a specific service + + :param service: Name of the strategy + :return: Serialized strategy as a dictionary + """ + return self._show_request('/services', service) diff --git a/watcher_tempest_plugin/tests/api/admin/test_service.py b/watcher_tempest_plugin/tests/api/admin/test_service.py new file mode 100644 index 000000000..4339c60c8 --- /dev/null +++ b/watcher_tempest_plugin/tests/api/admin/test_service.py @@ -0,0 +1,73 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2016 Servionica +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import unicode_literals + +from tempest import test + +from watcher_tempest_plugin.tests.api.admin import base + + +class TestShowListService(base.BaseInfraOptimTest): + """Tests for services""" + + DECISION_ENGINE = "watcher-decision-engine" + APPLIER = "watcher-applier" + + @classmethod + def resource_setup(cls): + super(TestShowListService, cls).resource_setup() + + def assert_expected(self, expected, actual, + keys=('created_at', 'updated_at', 'deleted_at')): + super(TestShowListService, self).assert_expected( + expected, actual, keys) + + @test.attr(type='smoke') + def test_show_service(self): + _, service = self.client.show_service(self.DECISION_ENGINE) + + self.assertEqual(self.DECISION_ENGINE, service['name']) + self.assertIn("host", service.keys()) + self.assertIn("last_seen_up", service.keys()) + self.assertIn("status", service.keys()) + + @test.attr(type='smoke') + def test_show_service_with_links(self): + _, service = self.client.show_service(self.DECISION_ENGINE) + self.assertIn('links', service.keys()) + self.assertEqual(2, len(service['links'])) + self.assertIn(str(service['id']), + service['links'][0]['href']) + + @test.attr(type="smoke") + def test_list_services(self): + _, body = self.client.list_services() + self.assertIn('services', body) + services = body['services'] + self.assertIn(self.DECISION_ENGINE, + [i['name'] for i in body['services']]) + + for service in services: + self.assertTrue( + all(val is not None for key, val in service.items() + if key in ['id', 'name', 'host', 'status', + 'last_seen_up'])) + + # Verify self links. + for service in body['services']: + self.validate_self_link('services', service['id'], + service['links'][0]['href'])