Add service supervisor

This patch set adds supervisor mechanism for Watcher services
to get ability to track states.

Partially-Implements: blueprint watcher-service-list
Change-Id: Iab1cefb971c79ed27b22b6a5d1bed8698e35f9a4
This commit is contained in:
Alexander Chadin 2016-09-16 18:36:49 +03:00
parent 6cf796ca87
commit e7a1e148ca
30 changed files with 1714 additions and 77 deletions

View File

@ -37,5 +37,9 @@
"strategy:detail": "rule:default",
"strategy:get": "rule:default",
"strategy:get_all": "rule:default"
"strategy:get_all": "rule:default",
"service:detail": "rule:default",
"service:get": "rule:default",
"service:get_all": "rule:default"
}

View File

@ -35,6 +35,7 @@ from watcher.api.controllers.v1 import audit
from watcher.api.controllers.v1 import audit_template
from watcher.api.controllers.v1 import goal
from watcher.api.controllers.v1 import scoring_engine
from watcher.api.controllers.v1 import service
from watcher.api.controllers.v1 import strategy
@ -105,6 +106,9 @@ class V1(APIBase):
scoring_engines = [link.Link]
"""Links to the Scoring Engines resource"""
services = [link.Link]
"""Links to the services resource"""
links = [link.Link]
"""Links that point to a specific URL for this version and documentation"""
@ -159,6 +163,14 @@ class V1(APIBase):
'scoring_engines', '',
bookmark=True)
]
v1.services = [link.Link.make_link(
'self', pecan.request.host_url, 'services', ''),
link.Link.make_link('bookmark',
pecan.request.host_url,
'services', '',
bookmark=True)
]
return v1
@ -171,6 +183,7 @@ class Controller(rest.RestController):
action_plans = action_plan.ActionPlansController()
goals = goal.GoalsController()
scoring_engines = scoring_engine.ScoringEngineController()
services = service.ServicesController()
strategies = strategy.StrategiesController()
@wsme_pecan.wsexpose(V1)

View File

@ -0,0 +1,263 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Servionica
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Service mechanism provides ability to monitor Watcher services state.
"""
import datetime
import six
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import pecan
from pecan import rest
import wsme
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from watcher._i18n import _LW
from watcher.api.controllers import base
from watcher.api.controllers import link
from watcher.api.controllers.v1 import collection
from watcher.api.controllers.v1 import utils as api_utils
from watcher.common import exception
from watcher.common import policy
from watcher import objects
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class Service(base.APIBase):
"""API representation of a service.
This class enforces type checking and value constraints, and converts
between the internal object model and the API representation of a service.
"""
_status = None
def _get_status(self):
return self._status
def _set_status(self, name):
service = objects.Service.get_by_name(pecan.request.context, name)
last_heartbeat = (service.last_seen_up or service.updated_at
or service.created_at)
if isinstance(last_heartbeat, six.string_types):
# NOTE(russellb) If this service came in over rpc via
# conductor, then the timestamp will be a string and needs to be
# converted back to a datetime.
last_heartbeat = timeutils.parse_strtime(last_heartbeat)
else:
# Objects have proper UTC timezones, but the timeutils comparison
# below does not (and will fail)
last_heartbeat = last_heartbeat.replace(tzinfo=None)
elapsed = timeutils.delta_seconds(last_heartbeat, timeutils.utcnow())
is_up = abs(elapsed) <= CONF.service_down_time
if not is_up:
LOG.warning(_LW('Seems service %(name)s on host %(host)s is down. '
'Last heartbeat was %(lhb)s.'
'Elapsed time is %(el)s'),
{'name': service.name,
'host': service.host,
'lhb': str(last_heartbeat), 'el': str(elapsed)})
self._status = objects.service.ServiceStatus.FAILED
else:
self._status = objects.service.ServiceStatus.ACTIVE
id = wsme.wsattr(int, readonly=True)
"""ID for this service."""
name = wtypes.text
"""Name of the service."""
host = wtypes.text
"""Host where service is placed on."""
last_seen_up = wsme.wsattr(datetime.datetime, readonly=True)
"""Time when Watcher service sent latest heartbeat."""
status = wsme.wsproperty(wtypes.text, _get_status, _set_status,
mandatory=True)
links = wsme.wsattr([link.Link], readonly=True)
"""A list containing a self link."""
def __init__(self, **kwargs):
super(Service, self).__init__()
fields = list(objects.Service.fields.keys()) + ['status']
self.fields = []
for field in fields:
self.fields.append(field)
setattr(self, field, kwargs.get(
field if field != 'status' else 'name', wtypes.Unset))
@staticmethod
def _convert_with_links(service, url, expand=True):
if not expand:
service.unset_fields_except(
['id', 'name', 'host', 'status'])
service.links = [
link.Link.make_link('self', url, 'services', str(service.id)),
link.Link.make_link('bookmark', url, 'services', str(service.id),
bookmark=True)]
return service
@classmethod
def convert_with_links(cls, service, expand=True):
service = Service(**service.as_dict())
return cls._convert_with_links(
service, pecan.request.host_url, expand)
@classmethod
def sample(cls, expand=True):
sample = cls(id=1,
name='watcher-applier',
host='Controller',
last_seen_up=datetime.datetime(2016, 1, 1))
return cls._convert_with_links(sample, 'http://localhost:9322', expand)
class ServiceCollection(collection.Collection):
"""API representation of a collection of services."""
services = [Service]
"""A list containing services objects"""
def __init__(self, **kwargs):
super(ServiceCollection, self).__init__()
self._type = 'services'
@staticmethod
def convert_with_links(services, limit, url=None, expand=False,
**kwargs):
service_collection = ServiceCollection()
service_collection.services = [
Service.convert_with_links(g, expand) for g in services]
if 'sort_key' in kwargs:
reverse = False
if kwargs['sort_key'] == 'service':
if 'sort_dir' in kwargs:
reverse = True if kwargs['sort_dir'] == 'desc' else False
service_collection.services = sorted(
service_collection.services,
key=lambda service: service.id,
reverse=reverse)
service_collection.next = service_collection.get_next(
limit, url=url, marker_field='id', **kwargs)
return service_collection
@classmethod
def sample(cls):
sample = cls()
sample.services = [Service.sample(expand=False)]
return sample
class ServicesController(rest.RestController):
"""REST controller for Services."""
def __init__(self):
super(ServicesController, self).__init__()
from_services = False
"""A flag to indicate if the requests to this controller are coming
from the top-level resource Services."""
_custom_actions = {
'detail': ['GET'],
}
def _get_services_collection(self, marker, limit, sort_key, sort_dir,
expand=False, resource_url=None):
limit = api_utils.validate_limit(limit)
api_utils.validate_sort_dir(sort_dir)
sort_db_key = (sort_key if sort_key in objects.Service.fields.keys()
else None)
marker_obj = None
if marker:
marker_obj = objects.Service.get(
pecan.request.context, marker)
services = objects.Service.list(
pecan.request.context, limit, marker_obj,
sort_key=sort_db_key, sort_dir=sort_dir)
return ServiceCollection.convert_with_links(
services, limit, url=resource_url, expand=expand,
sort_key=sort_key, sort_dir=sort_dir)
@wsme_pecan.wsexpose(ServiceCollection, int, int, wtypes.text, wtypes.text)
def get_all(self, marker=None, limit=None, sort_key='id', sort_dir='asc'):
"""Retrieve a list of services.
:param marker: pagination marker for large data sets.
:param limit: maximum number of resources to return in a single result.
:param sort_key: column to sort results by. Default: id.
:param sort_dir: direction to sort. "asc" or "desc". Default: asc.
"""
context = pecan.request.context
policy.enforce(context, 'service:get_all',
action='service:get_all')
return self._get_services_collection(marker, limit, sort_key, sort_dir)
@wsme_pecan.wsexpose(ServiceCollection, int, int, wtypes.text, wtypes.text)
def detail(self, marker=None, limit=None, sort_key='id', sort_dir='asc'):
"""Retrieve a list of services with detail.
:param marker: pagination marker for large data sets.
:param limit: maximum number of resources to return in a single result.
:param sort_key: column to sort results by. Default: id.
:param sort_dir: direction to sort. "asc" or "desc". Default: asc.
"""
context = pecan.request.context
policy.enforce(context, 'service:detail',
action='service:detail')
# NOTE(lucasagomes): /detail should only work agaist collections
parent = pecan.request.path.split('/')[:-1][-1]
if parent != "services":
raise exception.HTTPNotFound
expand = True
resource_url = '/'.join(['services', 'detail'])
return self._get_services_collection(
marker, limit, sort_key, sort_dir, expand, resource_url)
@wsme_pecan.wsexpose(Service, wtypes.text)
def get_one(self, service):
"""Retrieve information about the given service.
:param service: ID or name of the service.
"""
if self.from_services:
raise exception.OperationNotPermitted
context = pecan.request.context
rpc_service = api_utils.get_resource('Service', service)
policy.enforce(context, 'service:get', rpc_service,
action='service:get')
return Service.convert_with_links(rpc_service)

View File

@ -20,6 +20,7 @@ import pecan
import wsme
from watcher._i18n import _
from watcher.common import utils
from watcher import objects
CONF = cfg.CONF
@ -80,17 +81,20 @@ def as_filters_dict(**filters):
return filters_dict
def get_resource(resource, resource_ident):
"""Get the resource from the uuid or logical name.
def get_resource(resource, resource_id):
"""Get the resource from the uuid, id or logical name.
:param resource: the resource type.
:param resource_ident: the UUID or logical name of the resource.
:param resource_id: the UUID, ID or logical name of the resource.
:returns: The resource.
"""
resource = getattr(objects, resource)
if uuidutils.is_uuid_like(resource_ident):
return resource.get_by_uuid(pecan.request.context, resource_ident)
if utils.is_int_like(resource_id):
return resource.get(pecan.request.context, int(resource_id))
return resource.get_by_name(pecan.request.context, resource_ident)
if uuidutils.is_uuid_like(resource_id):
return resource.get_by_uuid(pecan.request.context, resource_id)
return resource.get_by_name(pecan.request.context, resource_id)

View File

@ -20,6 +20,7 @@
from oslo_config import cfg
from watcher.applier.messaging import trigger
from watcher.common import service_manager
CONF = cfg.CONF
@ -60,17 +61,40 @@ CONF.register_group(opt_group)
CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group)
class ApplierManager(object):
class ApplierManager(service_manager.ServiceManagerBase):
API_VERSION = '1.0'
@property
def service_name(self):
return 'watcher-applier'
conductor_endpoints = [trigger.TriggerActionPlan]
status_endpoints = []
notification_endpoints = []
notification_topics = []
@property
def api_version(self):
return '1.0'
def __init__(self):
self.publisher_id = CONF.watcher_applier.publisher_id
self.conductor_topic = CONF.watcher_applier.conductor_topic
self.status_topic = CONF.watcher_applier.status_topic
self.api_version = self.API_VERSION
@property
def publisher_id(self):
return CONF.watcher_applier.publisher_id
@property
def conductor_topic(self):
return CONF.watcher_applier.conductor_topic
@property
def status_topic(self):
return CONF.watcher_applier.status_topic
@property
def notification_topics(self):
return []
@property
def conductor_endpoints(self):
return [trigger.TriggerActionPlan]
@property
def status_endpoints(self):
return []
@property
def notification_endpoints(self):
return []

View File

@ -45,15 +45,38 @@ class ApplierAPI(service.Service):
class ApplierAPIManager(object):
API_VERSION = '1.0'
@property
def service_name(self):
return None
conductor_endpoints = []
status_endpoints = []
notification_endpoints = []
notification_topics = []
@property
def api_version(self):
return '1.0'
def __init__(self):
self.publisher_id = CONF.watcher_applier.publisher_id
self.conductor_topic = CONF.watcher_applier.conductor_topic
self.status_topic = CONF.watcher_applier.status_topic
self.api_version = self.API_VERSION
@property
def publisher_id(self):
return CONF.watcher_applier.publisher_id
@property
def conductor_topic(self):
return CONF.watcher_applier.conductor_topic
@property
def status_topic(self):
return CONF.watcher_applier.status_topic
@property
def notification_topics(self):
return []
@property
def conductor_endpoints(self):
return []
@property
def status_endpoints(self):
return []
@property
def notification_endpoints(self):
return []

View File

@ -362,6 +362,14 @@ class NoSuchMetricForHost(WatcherException):
msg_fmt = _("No %(metric)s metric for %(host)s found.")
class ServiceAlreadyExists(Conflict):
msg_fmt = _("A service with name %(name)s is already working on %(host)s.")
class ServiceNotFound(ResourceNotFound):
msg_fmt = _("The service %(service)s cannot be found.")
# Model
class InstanceNotFound(WatcherException):

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import logging
import socket
@ -30,10 +31,13 @@ from oslo_service import wsgi
from watcher._i18n import _, _LI
from watcher.api import app
from watcher.common import config
from watcher.common import context
from watcher.common.messaging.events import event_dispatcher as dispatcher
from watcher.common.messaging import messaging_handler
from watcher.common import rpc
from watcher.common import scheduling
from watcher.objects import base
from watcher.objects import service as service_object
from watcher import opts
from watcher import version
@ -48,6 +52,9 @@ service_opts = [
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address.')),
cfg.IntOpt('service_down_time',
default=90,
help=_('Maximum time since last check-in for up service.'))
]
cfg.CONF.register_opts(service_opts)
@ -101,6 +108,52 @@ class WSGIService(service.ServiceBase):
self.server.reset()
class ServiceHeartbeat(scheduling.BackgroundSchedulerService):
def __init__(self, gconfig=None, service_name=None, **kwargs):
gconfig = None or {}
super(ServiceHeartbeat, self).__init__(gconfig, **kwargs)
self.service_name = service_name
self.context = context.make_context()
def send_beat(self):
host = CONF.host
watcher_list = service_object.Service.list(
self.context, filters={'name': self.service_name,
'host': host})
if watcher_list:
watcher_service = watcher_list[0]
watcher_service.last_seen_up = datetime.datetime.utcnow()
watcher_service.save()
else:
watcher_service = service_object.Service(self.context)
watcher_service.name = self.service_name
watcher_service.host = host
watcher_service.create()
def add_heartbeat_job(self):
self.add_job(self.send_beat, 'interval', seconds=60,
next_run_time=datetime.datetime.now())
def start(self):
"""Start service."""
self.add_heartbeat_job()
super(ServiceHeartbeat, self).start()
def stop(self):
"""Stop service."""
self.shutdown()
def wait(self):
"""Wait for service to complete."""
def reset(self):
"""Reset service.
Called in case service running in daemon mode receives SIGHUP.
"""
class Service(service.ServiceBase, dispatcher.EventDispatcher):
API_VERSION = '1.0'
@ -110,7 +163,7 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher):
self.manager = manager_class()
self.publisher_id = self.manager.publisher_id
self.api_version = self.manager.API_VERSION
self.api_version = self.manager.api_version
self.conductor_topic = self.manager.conductor_topic
self.status_topic = self.manager.status_topic
@ -136,6 +189,8 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher):
self.status_topic_handler = None
self.notification_handler = None
self.heartbeat = None
if self.conductor_topic and self.conductor_endpoints:
self.conductor_topic_handler = self.build_topic_handler(
self.conductor_topic, self.conductor_endpoints)
@ -146,6 +201,10 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher):
self.notification_handler = self.build_notification_handler(
self.notification_topics, self.notification_endpoints
)
self.service_name = self.manager.service_name
if self.service_name:
self.heartbeat = ServiceHeartbeat(
service_name=self.manager.service_name)
@property
def transport(self):
@ -211,6 +270,8 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher):
self.status_topic_handler.start()
if self.notification_handler:
self.notification_handler.start()
if self.heartbeat:
self.heartbeat.start()
def stop(self):
LOG.debug("Disconnecting from '%s' (%s)",
@ -221,6 +282,8 @@ class Service(service.ServiceBase, dispatcher.EventDispatcher):
self.status_topic_handler.stop()
if self.notification_handler:
self.notification_handler.stop()
if self.heartbeat:
self.heartbeat.stop()
def reset(self):
"""Reset a service in case it received a SIGHUP."""

View File

@ -0,0 +1,56 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2016 Servionica
##
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
class ServiceManagerBase(object):
@abc.abstractproperty
def service_name(self):
raise NotImplementedError()
@abc.abstractproperty
def api_version(self):
raise NotImplementedError()
@abc.abstractproperty
def publisher_id(self):
raise NotImplementedError()
@abc.abstractproperty
def conductor_topic(self):
raise NotImplementedError()
@abc.abstractproperty
def status_topic(self):
raise NotImplementedError()
@abc.abstractproperty
def notification_topics(self):
raise NotImplementedError()
@abc.abstractproperty
def conductor_endpoints(self):
raise NotImplementedError()
@abc.abstractproperty
def status_endpoints(self):
raise NotImplementedError()
@abc.abstractproperty
def notification_endpoints(self):
raise NotImplementedError()

View File

@ -715,3 +715,89 @@ class BaseConnection(object):
:raises: :py:class:`~.ScoringEngineNotFound`
:raises: :py:class:`~.Invalid`
"""
@abc.abstractmethod
def get_service_list(self, context, filters=None, limit=None,
marker=None, sort_key=None, sort_dir=None):
"""Get specific columns for matching services.
Return a list of the specified columns for all services that
match the specified filters.
:param context: The security context
:param filters: Filters to apply. Defaults to None.
:param limit: Maximum number of services to return.
:param marker: The last item of the previous page; we return the next
result set.
:param sort_key: Attribute by which results should be sorted.
:param sort_dir: Direction in which results should be sorted.
(asc, desc)
:returns: A list of tuples of the specified columns.
"""
@abc.abstractmethod
def create_service(self, values):
"""Create a new service.
:param values: A dict containing items used to identify
and track the service. For example:
::
{
'id': 1,
'name': 'watcher-api',
'status': 'ACTIVE',
'host': 'controller'
}
:returns: A service
:raises: :py:class:`~.ServiceAlreadyExists`
"""
@abc.abstractmethod
def get_service_by_id(self, context, service_id):
"""Return a service given its ID.
:param context: The security context
:param service_id: The ID of a service
:returns: A service
:raises: :py:class:`~.ServiceNotFound`
"""
@abc.abstractmethod
def get_service_by_name(self, context, service_name):
"""Return a service given its name.
:param context: The security context
:param service_name: The name of a service
:returns: A service
:raises: :py:class:`~.ServiceNotFound`
"""
@abc.abstractmethod
def destroy_service(self, service_id):
"""Destroy a service.
:param service_id: The ID of a service
:raises: :py:class:`~.ServiceNotFound`
"""
@abc.abstractmethod
def update_service(self, service_id, values):
"""Update properties of a service.
:param service_id: The ID of a service
:returns: A service
:raises: :py:class:`~.ServiceyNotFound`
:raises: :py:class:`~.Invalid`
"""
@abc.abstractmethod
def soft_delete_service(self, service_id):
"""Soft delete a service.
:param service_id: The id of a service.
:returns: A service.
:raises: :py:class:`~.ServiceNotFound`
"""

View File

@ -1065,3 +1065,65 @@ class Connection(api.BaseConnection):
except exception.ResourceNotFound:
raise exception.ScoringEngineNotFound(
scoring_engine=scoring_engine_id)
# ### SERVICES ### #
def _add_services_filters(self, query, filters):
if not filters:
filters = {}
plain_fields = ['id', 'name', 'host']
return self._add_filters(
query=query, model=models.Service, filters=filters,
plain_fields=plain_fields)
def get_service_list(self, context, filters=None, limit=None,
marker=None, sort_key=None, sort_dir=None):
query = model_query(models.Service)
query = self._add_services_filters(query, filters)
if not context.show_deleted:
query = query.filter_by(deleted_at=None)
return _paginate_query(models.Service, limit, marker,
sort_key, sort_dir, query)
def create_service(self, values):
service = models.Service()
service.update(values)
try:
service.save()
except db_exc.DBDuplicateEntry:
raise exception.ServiceAlreadyExists(name=values['name'],
host=values['host'])
return service
def _get_service(self, context, fieldname, value):
try:
return self._get(context, model=models.Service,
fieldname=fieldname, value=value)
except exception.ResourceNotFound:
raise exception.ServiceNotFound(service=value)
def get_service_by_id(self, context, service_id):
return self._get_service(context, fieldname="id", value=service_id)
def get_service_by_name(self, context, service_name):
return self._get_service(context, fieldname="name", value=service_name)
def destroy_service(self, service_id):
try:
return self._destroy(models.Service, service_id)
except exception.ResourceNotFound:
raise exception.ServiceNotFound(service=service_id)
def update_service(self, service_id, values):
try:
return self._update(models.Service, service_id, values)
except exception.ResourceNotFound:
raise exception.ServiceNotFound(service=service_id)
def soft_delete_service(self, service_id):
try:
self._soft_delete(models.Service, service_id)
except exception.ResourceNotFound:
raise exception.ServiceNotFound(service=service_id)

View File

@ -255,3 +255,18 @@ class ScoringEngine(Base):
# The format might vary between different models (e.g. be JSON, XML or
# even some custom format), the blob type should cover all scenarios.
metainfo = Column(Text, nullable=True)
class Service(Base):
"""Represents a service entity"""
__tablename__ = 'services'
__table_args__ = (
UniqueConstraint('host', 'name', 'deleted',
name="uniq_services0host0name0deleted"),
table_args()
)
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False)
host = Column(String(255), nullable=False)
last_seen_up = Column(DateTime, nullable=True)

View File

@ -38,6 +38,7 @@ See :doc:`../architecture` for more details on this component.
from oslo_config import cfg
from watcher.common import service_manager
from watcher.decision_engine.messaging import audit_endpoint
from watcher.decision_engine.model.collector import manager
@ -78,23 +79,44 @@ CONF.register_group(decision_engine_opt_group)
CONF.register_opts(WATCHER_DECISION_ENGINE_OPTS, decision_engine_opt_group)
class DecisionEngineManager(object):
class DecisionEngineManager(service_manager.ServiceManagerBase):
API_VERSION = '1.0'
@property
def service_name(self):
return 'watcher-decision-engine'
def __init__(self):
self.api_version = self.API_VERSION
@property
def api_version(self):
return '1.0'
self.publisher_id = CONF.watcher_decision_engine.publisher_id
self.conductor_topic = CONF.watcher_decision_engine.conductor_topic
self.status_topic = CONF.watcher_decision_engine.status_topic
self.notification_topics = (
CONF.watcher_decision_engine.notification_topics)
@property
def publisher_id(self):
return CONF.watcher_decision_engine.publisher_id
self.conductor_endpoints = [audit_endpoint.AuditEndpoint]
@property
def conductor_topic(self):
return CONF.watcher_decision_engine.conductor_topic
self.status_endpoints = []
@property
def status_topic(self):
return CONF.watcher_decision_engine.status_topic
self.collector_manager = manager.CollectorManager()
self.notification_endpoints = (
self.collector_manager.get_notification_endpoints())
@property
def notification_topics(self):
return CONF.watcher_decision_engine.notification_topics
@property
def conductor_endpoints(self):
return [audit_endpoint.AuditEndpoint]
@property
def status_endpoints(self):
return []
@property
def notification_endpoints(self):
return self.collector_manager.get_notification_endpoints()
@property
def collector_manager(self):
return manager.CollectorManager()

View File

@ -48,15 +48,38 @@ class DecisionEngineAPI(service.Service):
class DecisionEngineAPIManager(object):
API_VERSION = '1.0'
@property
def service_name(self):
return None
conductor_endpoints = []
status_endpoints = [notification_handler.NotificationHandler]
notification_endpoints = []
notification_topics = []
@property
def api_version(self):
return '1.0'
def __init__(self):
self.publisher_id = CONF.watcher_decision_engine.publisher_id
self.conductor_topic = CONF.watcher_decision_engine.conductor_topic
self.status_topic = CONF.watcher_decision_engine.status_topic
self.api_version = self.API_VERSION
@property
def publisher_id(self):
return CONF.watcher_decision_engine.publisher_id
@property
def conductor_topic(self):
return CONF.watcher_decision_engine.conductor_topic
@property
def status_topic(self):
return CONF.watcher_decision_engine.status_topic
@property
def notification_topics(self):
return []
@property
def conductor_endpoints(self):
return []
@property
def status_endpoints(self):
return [notification_handler.NotificationHandler]
@property
def notification_endpoints(self):
return []

View File

@ -20,6 +20,7 @@ from watcher.objects import audit_template
from watcher.objects import efficacy_indicator
from watcher.objects import goal
from watcher.objects import scoring_engine
from watcher.objects import service
from watcher.objects import strategy
Audit = audit.Audit
@ -30,6 +31,7 @@ Goal = goal.Goal
ScoringEngine = scoring_engine.ScoringEngine
Strategy = strategy.Strategy
EfficacyIndicator = efficacy_indicator.EfficacyIndicator
Service = service.Service
__all__ = ("Audit", "AuditTemplate", "Action", "ActionPlan",
"Goal", "ScoringEngine", "Strategy", "EfficacyIndicator")
__all__ = ("Audit", "AuditTemplate", "Action", "ActionPlan", "Goal",
"ScoringEngine", "Strategy", "EfficacyIndicator", "Service")

View File

@ -311,7 +311,7 @@ class WatcherObject(object):
"""Returns a dict of changed fields and their new values."""
changes = {}
for key in self.obj_what_changed():
changes[key] = self[key]
changes[key] = self._attr_to_primitive(key)
return changes
def obj_what_changed(self):

178
watcher/objects/service.py Normal file
View File

@ -0,0 +1,178 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Servionica
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.common import exception
from watcher.common import utils
from watcher.db import api as dbapi
from watcher.objects import base
from watcher.objects import utils as obj_utils
class ServiceStatus(object):
ACTIVE = 'ACTIVE'
FAILED = 'FAILED'
class Service(base.WatcherObject):
dbapi = dbapi.get_instance()
fields = {
'id': int,
'name': obj_utils.str_or_none,
'host': obj_utils.str_or_none,
'last_seen_up': obj_utils.datetime_or_str_or_none
}
@staticmethod
def _from_db_object(service, db_service):
"""Converts a database entity to a formal object."""
for field in service.fields:
service[field] = db_service[field]
service.obj_reset_changes()
return service
@staticmethod
def _from_db_object_list(db_objects, cls, context):
"""Converts a list of database entities to a list of formal objects."""
return [Service._from_db_object(cls(context), obj)
for obj in db_objects]
@classmethod
def get(cls, context, service_id):
"""Find a service based on its id
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Service(context)
:param service_id: the id of a service.
:returns: a :class:`Service` object.
"""
if utils.is_int_like(service_id):
db_service = cls.dbapi.get_service_by_id(context, service_id)
service = Service._from_db_object(cls(context), db_service)
return service
else:
raise exception.InvalidIdentity(identity=service_id)
@classmethod
def get_by_name(cls, context, name):
"""Find a service based on name
:param name: the name of a service.
:param context: Security context
:returns: a :class:`Service` object.
"""
db_service = cls.dbapi.get_service_by_name(context, name)
service = cls._from_db_object(cls(context), db_service)
return service
@classmethod
def list(cls, context, limit=None, marker=None, filters=None,
sort_key=None, sort_dir=None):
"""Return a list of :class:`Service` objects.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Service(context)
:param filters: dict mapping the filter key to a value.
:param limit: maximum number of resources to return in a single result.
:param marker: pagination marker for large data sets.
:param sort_key: column to sort results by.
:param sort_dir: direction to sort. "asc" or "desc".
:returns: a list of :class:`Service` object.
"""
db_services = cls.dbapi.get_service_list(
context,
filters=filters,
limit=limit,
marker=marker,
sort_key=sort_key,
sort_dir=sort_dir)
return Service._from_db_object_list(db_services, cls, context)
def create(self, context=None):
"""Create a :class:`Service` record in the DB.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Service(context)
"""
values = self.obj_get_changes()
db_service = self.dbapi.create_service(values)
self._from_db_object(self, db_service)
def save(self, context=None):
"""Save updates to this :class:`Service`.
Updates will be made column by column based on the result
of self.what_changed().
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Service(context)
"""
updates = self.obj_get_changes()
self.dbapi.update_service(self.id, updates)
self.obj_reset_changes()
def refresh(self, context=None):
"""Loads updates for this :class:`Service`.
Loads a service with the same id from the database and
checks for updated attributes. Updates are applied from
the loaded service column by column, if there are any updates.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Service(context)
"""
current = self.__class__.get(self._context, service_id=self.id)
for field in self.fields:
if (hasattr(self, base.get_attrname(field)) and
self[field] != current[field]):
self[field] = current[field]
def soft_delete(self, context=None):
"""Soft Delete the :class:`Service` from the DB.
:param context: Security context. NOTE: This should only
be used internally by the indirection_api.
Unfortunately, RPC requires context as the first
argument, even though we don't use it.
A context should be set when instantiating the
object, e.g.: Service(context)
"""
self.dbapi.soft_delete_service(self.id)

View File

@ -25,25 +25,33 @@ import six
from watcher._i18n import _
def datetime_or_none(dt):
def datetime_or_none(value, tzinfo_aware=False):
"""Validate a datetime or None value."""
if dt is None:
if value is None:
return None
elif isinstance(dt, datetime.datetime):
if dt.utcoffset() is None:
# NOTE(danms): Legacy objects from sqlalchemy are stored in UTC,
# but are returned without a timezone attached.
# As a transitional aid, assume a tz-naive object is in UTC.
return dt.replace(tzinfo=iso8601.iso8601.Utc())
else:
return dt
raise ValueError(_("A datetime.datetime is required here"))
if isinstance(value, six.string_types):
# NOTE(danms): Being tolerant of isotime strings here will help us
# during our objects transition
value = timeutils.parse_isotime(value)
elif not isinstance(value, datetime.datetime):
raise ValueError(
_("A datetime.datetime is required here. Got %s"), value)
if value.utcoffset() is None and tzinfo_aware:
# NOTE(danms): Legacy objects from sqlalchemy are stored in UTC,
# but are returned without a timezone attached.
# As a transitional aid, assume a tz-naive object is in UTC.
value = value.replace(tzinfo=iso8601.iso8601.Utc())
elif not tzinfo_aware:
value = value.replace(tzinfo=None)
return value
def datetime_or_str_or_none(val):
def datetime_or_str_or_none(val, tzinfo_aware=False):
if isinstance(val, six.string_types):
return timeutils.parse_isotime(val)
return datetime_or_none(val)
return datetime_or_none(val, tzinfo_aware=tzinfo_aware)
def numeric_or_none(val):

View File

@ -37,7 +37,8 @@ class TestV1Root(base.FunctionalTest):
not_resources = ('id', 'links', 'media_types')
actual_resources = tuple(set(data.keys()) - set(not_resources))
expected_resources = ('audit_templates', 'audits', 'actions',
'action_plans', 'scoring_engines')
'action_plans', 'scoring_engines',
'services')
self.assertEqual(sorted(expected_resources), sorted(actual_resources))
self.assertIn({'type': 'application/vnd.openstack.watcher.v1+json',

View File

@ -0,0 +1,173 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_serialization import jsonutils
from six.moves.urllib import parse as urlparse
from watcher.tests.api import base as api_base
from watcher.tests.objects import utils as obj_utils
class TestListService(api_base.FunctionalTest):
def _assert_service_fields(self, service):
service_fields = ['id', 'name', 'host', 'status']
for field in service_fields:
self.assertIn(field, service)
def test_one(self):
service = obj_utils.create_test_service(self.context)
response = self.get_json('/services')
self.assertEqual(service.id, response['services'][0]["id"])
self._assert_service_fields(response['services'][0])
def test_get_one_by_id(self):
service = obj_utils.create_test_service(self.context)
response = self.get_json('/services/%s' % service.id)
self.assertEqual(service.id, response["id"])
self.assertEqual(service.name, response["name"])
self._assert_service_fields(response)
def test_get_one_by_name(self):
service = obj_utils.create_test_service(self.context)
response = self.get_json(urlparse.quote(
'/services/%s' % service['name']))
self.assertEqual(service.id, response['id'])
self._assert_service_fields(response)
def test_get_one_soft_deleted(self):
service = obj_utils.create_test_service(self.context)
service.soft_delete()
response = self.get_json(
'/services/%s' % service['id'],
headers={'X-Show-Deleted': 'True'})
self.assertEqual(service.id, response['id'])
self._assert_service_fields(response)
response = self.get_json(
'/services/%s' % service['id'],
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_detail(self):
service = obj_utils.create_test_service(self.context)
response = self.get_json('/services/detail')
self.assertEqual(service.id, response['services'][0]["id"])
self._assert_service_fields(response['services'][0])
for service in response['services']:
self.assertTrue(
all(val is not None for key, val in service.items()
if key in ['id', 'name', 'host', 'status'])
)
def test_detail_against_single(self):
service = obj_utils.create_test_service(self.context)
response = self.get_json('/services/%s/detail' % service.id,
expect_errors=True)
self.assertEqual(404, response.status_int)
def test_many(self):
service_list = []
for idx in range(1, 6):
service = obj_utils.create_test_service(
self.context, id=idx, host='CONTROLLER',
name='SERVICE_{0}'.format(idx))
service_list.append(service.id)
response = self.get_json('/services')
self.assertEqual(5, len(response['services']))
for service in response['services']:
self.assertTrue(
all(val is not None for key, val in service.items()
if key in ['id', 'name', 'host', 'status']))
def test_many_without_soft_deleted(self):
service_list = []
for id_ in [1, 2, 3]:
service = obj_utils.create_test_service(
self.context, id=id_, host='CONTROLLER',
name='SERVICE_{0}'.format(id_))
service_list.append(service.id)
for id_ in [4, 5]:
service = obj_utils.create_test_service(
self.context, id=id_, host='CONTROLLER',
name='SERVICE_{0}'.format(id_))
service.soft_delete()
response = self.get_json('/services')
self.assertEqual(3, len(response['services']))
ids = [s['id'] for s in response['services']]
self.assertEqual(sorted(service_list), sorted(ids))
def test_services_collection_links(self):
for idx in range(1, 6):
obj_utils.create_test_service(
self.context, id=idx,
host='CONTROLLER',
name='SERVICE_{0}'.format(idx))
response = self.get_json('/services/?limit=2')
self.assertEqual(2, len(response['services']))
def test_services_collection_links_default_limit(self):
for idx in range(1, 6):
obj_utils.create_test_service(
self.context, id=idx,
host='CONTROLLER',
name='SERVICE_{0}'.format(idx))
cfg.CONF.set_override('max_limit', 3, 'api', enforce_type=True)
response = self.get_json('/services')
self.assertEqual(3, len(response['services']))
class TestServicePolicyEnforcement(api_base.FunctionalTest):
def _common_policy_check(self, rule, func, *arg, **kwarg):
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
rule: "rule:default"})
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
jsonutils.loads(response.json['error_message'])['faultstring'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
"service:get_all", self.get_json, '/services',
expect_errors=True)
def test_policy_disallow_get_one(self):
service = obj_utils.create_test_service(self.context)
self._common_policy_check(
"service:get", self.get_json,
'/services/%s' % service.id,
expect_errors=True)
def test_policy_disallow_detail(self):
self._common_policy_check(
"service:detail", self.get_json,
'/services/detail',
expect_errors=True)
class TestServiceEnforcementWithAdminContext(TestListService,
api_base.AdminRoleTest):
def setUp(self):
super(TestServiceEnforcementWithAdminContext, self).setUp()
self.policy.set_rules({
"admin_api": "(role:admin or role:administrator)",
"default": "rule:admin_api",
"service:detail": "rule:default",
"service:get": "rule:default",
"service:get_all": "rule:default"})

View File

@ -17,11 +17,16 @@
import mock
from oslo_config import cfg
from watcher.common.messaging import messaging_handler
from watcher.common import rpc
from watcher.common import service
from watcher import objects
from watcher.tests import base
CONF = cfg.CONF
class DummyManager(object):
@ -37,6 +42,38 @@ class DummyManager(object):
self.status_topic = "status_topic"
self.notification_topics = []
self.api_version = self.API_VERSION
self.service_name = None
class TestServiceHeartbeat(base.TestCase):
def setUp(self):
super(TestServiceHeartbeat, self).setUp()
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(objects.Service, 'create')
def test_send_beat_with_creating_service(self, mock_create,
mock_list):
CONF.set_default('host', 'fake-fqdn')
service_heartbeat = service.ServiceHeartbeat(
service_name='watcher-service')
mock_list.return_value = []
service_heartbeat.send_beat()
mock_list.assert_called_once_with(mock.ANY,
filters={'name': 'watcher-service',
'host': 'fake-fqdn'})
self.assertEqual(1, mock_create.call_count)
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(objects.Service, 'save')
def test_send_beat_without_creating_service(self, mock_save, mock_list):
service_heartbeat = service.ServiceHeartbeat(
service_name='watcher-service')
mock_list.return_value = [objects.Service(mock.Mock(),
name='watcher-service',
host='controller')]
service_heartbeat.send_beat()
self.assertEqual(1, mock_save.call_count)
class TestService(base.TestCase):

View File

@ -0,0 +1,303 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Servionica
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for manipulating Service via the DB API"""
import freezegun
import six
from oslo_utils import timeutils
from watcher.common import exception
from watcher.tests.db import base
from watcher.tests.db import utils
class TestDbServiceFilters(base.DbTestCase):
FAKE_OLDER_DATE = '2014-01-01T09:52:05.219414'
FAKE_OLD_DATE = '2015-01-01T09:52:05.219414'
FAKE_TODAY = '2016-02-24T09:52:05.219414'
def setUp(self):
super(TestDbServiceFilters, self).setUp()
self.context.show_deleted = True
self._data_setup()
def _data_setup(self):
service1_name = "SERVICE_ID_1"
service2_name = "SERVICE_ID_2"
service3_name = "SERVICE_ID_3"
with freezegun.freeze_time(self.FAKE_TODAY):
self.service1 = utils.create_test_service(
id=1, name=service1_name, host="controller",
last_seen_up=timeutils.parse_isotime("2016-09-22T08:32:05"))
with freezegun.freeze_time(self.FAKE_OLD_DATE):
self.service2 = utils.create_test_service(
id=2, name=service2_name, host="controller",
last_seen_up=timeutils.parse_isotime("2016-09-22T08:32:05"))
with freezegun.freeze_time(self.FAKE_OLDER_DATE):
self.service3 = utils.create_test_service(
id=3, name=service3_name, host="controller",
last_seen_up=timeutils.parse_isotime("2016-09-22T08:32:05"))
def _soft_delete_services(self):
with freezegun.freeze_time(self.FAKE_TODAY):
self.dbapi.soft_delete_service(self.service1.id)
with freezegun.freeze_time(self.FAKE_OLD_DATE):
self.dbapi.soft_delete_service(self.service2.id)
with freezegun.freeze_time(self.FAKE_OLDER_DATE):
self.dbapi.soft_delete_service(self.service3.id)
def _update_services(self):
with freezegun.freeze_time(self.FAKE_TODAY):
self.dbapi.update_service(
self.service1.id, values={"host": "controller1"})
with freezegun.freeze_time(self.FAKE_OLD_DATE):
self.dbapi.update_service(
self.service2.id, values={"host": "controller2"})
with freezegun.freeze_time(self.FAKE_OLDER_DATE):
self.dbapi.update_service(
self.service3.id, values={"host": "controller3"})
def test_get_service_list_filter_deleted_true(self):
with freezegun.freeze_time(self.FAKE_TODAY):
self.dbapi.soft_delete_service(self.service1.id)
res = self.dbapi.get_service_list(
self.context, filters={'deleted': True})
self.assertEqual([self.service1['name']], [r.name for r in res])
def test_get_service_list_filter_deleted_false(self):
with freezegun.freeze_time(self.FAKE_TODAY):
self.dbapi.soft_delete_service(self.service1.id)
res = self.dbapi.get_service_list(
self.context, filters={'deleted': False})
self.assertEqual(
set([self.service2['name'], self.service3['name']]),
set([r.name for r in res]))
def test_get_service_list_filter_deleted_at_eq(self):
self._soft_delete_services()
res = self.dbapi.get_service_list(
self.context, filters={'deleted_at__eq': self.FAKE_TODAY})
self.assertEqual([self.service1['id']], [r.id for r in res])
def test_get_service_list_filter_deleted_at_lt(self):
self._soft_delete_services()
res = self.dbapi.get_service_list(
self.context, filters={'deleted_at__lt': self.FAKE_TODAY})
self.assertEqual(
set([self.service2['id'], self.service3['id']]),
set([r.id for r in res]))
def test_get_service_list_filter_deleted_at_lte(self):
self._soft_delete_services()
res = self.dbapi.get_service_list(
self.context, filters={'deleted_at__lte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.service2['id'], self.service3['id']]),
set([r.id for r in res]))
def test_get_service_list_filter_deleted_at_gt(self):
self._soft_delete_services()
res = self.dbapi.get_service_list(
self.context, filters={'deleted_at__gt': self.FAKE_OLD_DATE})
self.assertEqual([self.service1['id']], [r.id for r in res])
def test_get_service_list_filter_deleted_at_gte(self):
self._soft_delete_services()
res = self.dbapi.get_service_list(
self.context, filters={'deleted_at__gte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.service1['id'], self.service2['id']]),
set([r.id for r in res]))
# created_at #
def test_get_service_list_filter_created_at_eq(self):
res = self.dbapi.get_service_list(
self.context, filters={'created_at__eq': self.FAKE_TODAY})
self.assertEqual([self.service1['id']], [r.id for r in res])
def test_get_service_list_filter_created_at_lt(self):
res = self.dbapi.get_service_list(
self.context, filters={'created_at__lt': self.FAKE_TODAY})
self.assertEqual(
set([self.service2['id'], self.service3['id']]),
set([r.id for r in res]))
def test_get_service_list_filter_created_at_lte(self):
res = self.dbapi.get_service_list(
self.context, filters={'created_at__lte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.service2['id'], self.service3['id']]),
set([r.id for r in res]))
def test_get_service_list_filter_created_at_gt(self):
res = self.dbapi.get_service_list(
self.context, filters={'created_at__gt': self.FAKE_OLD_DATE})
self.assertEqual([self.service1['id']], [r.id for r in res])
def test_get_service_list_filter_created_at_gte(self):
res = self.dbapi.get_service_list(
self.context, filters={'created_at__gte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.service1['id'], self.service2['id']]),
set([r.id for r in res]))
# updated_at #
def test_get_service_list_filter_updated_at_eq(self):
self._update_services()
res = self.dbapi.get_service_list(
self.context, filters={'updated_at__eq': self.FAKE_TODAY})
self.assertEqual([self.service1['id']], [r.id for r in res])
def test_get_service_list_filter_updated_at_lt(self):
self._update_services()
res = self.dbapi.get_service_list(
self.context, filters={'updated_at__lt': self.FAKE_TODAY})
self.assertEqual(
set([self.service2['id'], self.service3['id']]),
set([r.id for r in res]))
def test_get_service_list_filter_updated_at_lte(self):
self._update_services()
res = self.dbapi.get_service_list(
self.context, filters={'updated_at__lte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.service2['id'], self.service3['id']]),
set([r.id for r in res]))
def test_get_service_list_filter_updated_at_gt(self):
self._update_services()
res = self.dbapi.get_service_list(
self.context, filters={'updated_at__gt': self.FAKE_OLD_DATE})
self.assertEqual([self.service1['id']], [r.id for r in res])
def test_get_service_list_filter_updated_at_gte(self):
self._update_services()
res = self.dbapi.get_service_list(
self.context, filters={'updated_at__gte': self.FAKE_OLD_DATE})
self.assertEqual(
set([self.service1['id'], self.service2['id']]),
set([r.id for r in res]))
class DbServiceTestCase(base.DbTestCase):
def _create_test_service(self, **kwargs):
service = utils.get_test_service(**kwargs)
self.dbapi.create_service(service)
return service
def test_get_service_list(self):
ids = []
for i in range(1, 6):
service = utils.create_test_service(
id=i,
name="SERVICE_ID_%s" % i,
host="controller_{0}".format(i))
ids.append(six.text_type(service['id']))
res = self.dbapi.get_service_list(self.context)
res_ids = [r.id for r in res]
self.assertEqual(ids.sort(), res_ids.sort())
def test_get_service_list_with_filters(self):
service1 = self._create_test_service(
id=1,
name="SERVICE_ID_1",
host="controller_1",
)
service2 = self._create_test_service(
id=2,
name="SERVICE_ID_2",
host="controller_2",
)
res = self.dbapi.get_service_list(
self.context, filters={'host': 'controller_1'})
self.assertEqual([service1['id']], [r.id for r in res])
res = self.dbapi.get_service_list(
self.context, filters={'host': 'controller_3'})
self.assertEqual([], [r.id for r in res])
res = self.dbapi.get_service_list(
self.context,
filters={'host': 'controller_2'})
self.assertEqual([service2['id']], [r.id for r in res])
def test_get_service_by_name(self):
created_service = self._create_test_service()
service = self.dbapi.get_service_by_name(
self.context, created_service['name'])
self.assertEqual(service.name, created_service['name'])
def test_get_service_that_does_not_exist(self):
self.assertRaises(exception.ServiceNotFound,
self.dbapi.get_service_by_id,
self.context, 404)
def test_update_service(self):
service = self._create_test_service()
res = self.dbapi.update_service(
service['id'], {'host': 'controller_test'})
self.assertEqual('controller_test', res.host)
def test_update_service_that_does_not_exist(self):
self.assertRaises(exception.ServiceNotFound,
self.dbapi.update_service,
405,
{'name': ''})
def test_create_service_already_exists(self):
service_id = "STRATEGY_ID"
self._create_test_service(name=service_id)
self.assertRaises(exception.ServiceAlreadyExists,
self._create_test_service,
name=service_id)

View File

@ -12,7 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Magnum test utilities."""
"""Watcher test utilities."""
from oslo_utils import timeutils
from watcher.db import api as db_api
@ -212,6 +214,33 @@ def get_test_strategy(**kwargs):
}
def get_test_service(**kwargs):
return {
'id': kwargs.get('id', 1),
'name': kwargs.get('name', 'watcher-service'),
'host': kwargs.get('host', 'controller'),
'last_seen_up': kwargs.get(
'last_seen_up',
timeutils.parse_isotime('2016-09-22T08:32:06').replace(tzinfo=None)
),
'created_at': kwargs.get('created_at'),
'updated_at': kwargs.get('updated_at'),
'deleted_at': kwargs.get('deleted_at'),
}
def create_test_service(**kwargs):
"""Create test service entry in DB and return Service DB object.
Function to be used to create test Service objects in the database.
:param kwargs: kwargs with overriding values for service's attributes.
:returns: Test Service DB object.
"""
service = get_test_service(**kwargs)
dbapi = db_api.get_instance()
return dbapi.create_service(service)
def create_test_strategy(**kwargs):
"""Create test strategy entry in DB and return Strategy DB object.

View File

@ -26,6 +26,7 @@ class FakeManager(object):
def __init__(self):
self.api_version = self.API_VERSION
self.service_name = None
# fake cluster instead on Nova CDM
self.fake_cdmc = faker_cluster_state.FakerModelCollector()

View File

@ -53,7 +53,11 @@ policy_data = """
"strategy:detail": "",
"strategy:get": "",
"strategy:get_all": ""
"strategy:get_all": "",
"service:detail": "",
"service:get": "",
"service:get_all": ""
}
"""

View File

@ -118,19 +118,29 @@ class TestUtils(test_base.TestCase):
def test_datetime_or_none(self):
naive_dt = datetime.datetime.now()
dt = timeutils.parse_isotime(timeutils.isotime(naive_dt))
self.assertEqual(dt, utils.datetime_or_none(dt))
self.assertEqual(dt, utils.datetime_or_none(dt, tzinfo_aware=True))
self.assertEqual(naive_dt.replace(tzinfo=iso8601.iso8601.Utc(),
microsecond=0),
utils.datetime_or_none(dt))
utils.datetime_or_none(dt, tzinfo_aware=True))
self.assertIsNone(utils.datetime_or_none(None))
self.assertRaises(ValueError, utils.datetime_or_none, 'foo')
def test_datetime_or_none_tzinfo_naive(self):
naive_dt = datetime.datetime.utcnow()
self.assertEqual(naive_dt, utils.datetime_or_none(naive_dt,
tzinfo_aware=False))
self.assertIsNone(utils.datetime_or_none(None))
self.assertRaises(ValueError, utils.datetime_or_none, 'foo')
def test_datetime_or_str_or_none(self):
dts = timeutils.isotime()
dt = timeutils.parse_isotime(dts)
self.assertEqual(dt, utils.datetime_or_str_or_none(dt))
self.assertIsNone(utils.datetime_or_str_or_none(None))
self.assertEqual(dt, utils.datetime_or_str_or_none(dts))
self.assertEqual(dt, utils.datetime_or_str_or_none(dt,
tzinfo_aware=True))
self.assertIsNone(utils.datetime_or_str_or_none(None,
tzinfo_aware=True))
self.assertEqual(dt, utils.datetime_or_str_or_none(dts,
tzinfo_aware=True))
self.assertRaises(ValueError, utils.datetime_or_str_or_none, 'foo')
def test_int_or_none(self):

View File

@ -0,0 +1,105 @@
# Copyright 2015 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from testtools import matchers
from watcher import objects
from watcher.tests.db import base
from watcher.tests.db import utils
class TestServiceObject(base.DbTestCase):
def setUp(self):
super(TestServiceObject, self).setUp()
self.fake_service = utils.get_test_service()
def test_get_by_id(self):
service_id = self.fake_service['id']
with mock.patch.object(self.dbapi, 'get_service_by_id',
autospec=True) as mock_get_service:
mock_get_service.return_value = self.fake_service
service = objects.Service.get(self.context, service_id)
mock_get_service.assert_called_once_with(self.context,
service_id)
self.assertEqual(self.context, service._context)
def test_list(self):
with mock.patch.object(self.dbapi, 'get_service_list',
autospec=True) as mock_get_list:
mock_get_list.return_value = [self.fake_service]
services = objects.Service.list(self.context)
self.assertEqual(1, mock_get_list.call_count, 1)
self.assertThat(services, matchers.HasLength(1))
self.assertIsInstance(services[0], objects.Service)
self.assertEqual(self.context, services[0]._context)
def test_create(self):
with mock.patch.object(self.dbapi, 'create_service',
autospec=True) as mock_create_service:
mock_create_service.return_value = self.fake_service
service = objects.Service(self.context, **self.fake_service)
fake_service = utils.get_test_service()
service.create()
mock_create_service.assert_called_once_with(fake_service)
self.assertEqual(self.context, service._context)
def test_save(self):
_id = self.fake_service['id']
with mock.patch.object(self.dbapi, 'get_service_by_id',
autospec=True) as mock_get_service:
mock_get_service.return_value = self.fake_service
with mock.patch.object(self.dbapi, 'update_service',
autospec=True) as mock_update_service:
service = objects.Service.get(self.context, _id)
service.name = 'UPDATED NAME'
service.save()
mock_get_service.assert_called_once_with(self.context, _id)
mock_update_service.assert_called_once_with(
_id, {'name': 'UPDATED NAME'})
self.assertEqual(self.context, service._context)
def test_refresh(self):
_id = self.fake_service['id']
returns = [dict(self.fake_service, name="first name"),
dict(self.fake_service, name="second name")]
expected = [mock.call(self.context, _id),
mock.call(self.context, _id)]
with mock.patch.object(self.dbapi, 'get_service_by_id',
side_effect=returns,
autospec=True) as mock_get_service:
service = objects.Service.get(self.context, _id)
self.assertEqual("first name", service.name)
service.refresh()
self.assertEqual("second name", service.name)
self.assertEqual(expected, mock_get_service.call_args_list)
self.assertEqual(self.context, service._context)
def test_soft_delete(self):
_id = self.fake_service['id']
with mock.patch.object(self.dbapi, 'get_service_by_id',
autospec=True) as mock_get_service:
mock_get_service.return_value = self.fake_service
with mock.patch.object(self.dbapi, 'soft_delete_service',
autospec=True) as mock_soft_delete:
service = objects.Service.get(self.context, _id)
service.soft_delete()
mock_get_service.assert_called_once_with(self.context, _id)
mock_soft_delete.assert_called_once_with(_id)
self.assertEqual(self.context, service._context)

View File

@ -178,6 +178,32 @@ def create_test_scoring_engine(context, **kw):
return scoring_engine
def get_test_service(context, **kw):
"""Return a Service object with appropriate attributes.
NOTE: The object leaves the attributes marked as changed, such
that a create() could be used to commit it to the DB.
"""
db_service = db_utils.get_test_service(**kw)
service = objects.Service(context)
for key in db_service:
if key == 'last_seen_up':
db_service[key] = None
setattr(service, key, db_service[key])
return service
def create_test_service(context, **kw):
"""Create and return a test service object.
Create a service in the DB and return a Service object with
appropriate attributes.
"""
service = get_test_service(context, **kw)
service.create()
return service
def get_test_strategy(context, **kw):
"""Return a Strategy object with appropriate attributes.

View File

@ -279,3 +279,24 @@ class InfraOptimClientJSON(base.BaseInfraOptimClient):
:return: Serialized strategy as a dictionary
"""
return self._show_request('/strategies', strategy)
# ### SERVICES ### #
@base.handle_errors
def list_services(self, **kwargs):
"""List all existing services"""
return self._list_request('/services', **kwargs)
@base.handle_errors
def list_services_detail(self, **kwargs):
"""Lists details of all existing services"""
return self._list_request('/services/detail', **kwargs)
@base.handle_errors
def show_service(self, service):
"""Gets a specific service
:param service: Name of the strategy
:return: Serialized strategy as a dictionary
"""
return self._show_request('/services', service)

View File

@ -0,0 +1,73 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 Servionica
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals
from tempest import test
from watcher_tempest_plugin.tests.api.admin import base
class TestShowListService(base.BaseInfraOptimTest):
"""Tests for services"""
DECISION_ENGINE = "watcher-decision-engine"
APPLIER = "watcher-applier"
@classmethod
def resource_setup(cls):
super(TestShowListService, cls).resource_setup()
def assert_expected(self, expected, actual,
keys=('created_at', 'updated_at', 'deleted_at')):
super(TestShowListService, self).assert_expected(
expected, actual, keys)
@test.attr(type='smoke')
def test_show_service(self):
_, service = self.client.show_service(self.DECISION_ENGINE)
self.assertEqual(self.DECISION_ENGINE, service['name'])
self.assertIn("host", service.keys())
self.assertIn("last_seen_up", service.keys())
self.assertIn("status", service.keys())
@test.attr(type='smoke')
def test_show_service_with_links(self):
_, service = self.client.show_service(self.DECISION_ENGINE)
self.assertIn('links', service.keys())
self.assertEqual(2, len(service['links']))
self.assertIn(str(service['id']),
service['links'][0]['href'])
@test.attr(type="smoke")
def test_list_services(self):
_, body = self.client.list_services()
self.assertIn('services', body)
services = body['services']
self.assertIn(self.DECISION_ENGINE,
[i['name'] for i in body['services']])
for service in services:
self.assertTrue(
all(val is not None for key, val in service.items()
if key in ['id', 'name', 'host', 'status',
'last_seen_up']))
# Verify self links.
for service in body['services']:
self.validate_self_link('services', service['id'],
service['links'][0]['href'])