heat-manage service list

Adds required REST API, Db model and engine service
changes for reporting the heat engine service status.

Change-Id: I3ef29c1efd2015d62eec1033ed3a8c9f42e7a6e2
Implements: blueprint heat-manage-service-list
"DocImpact"
This commit is contained in:
Kanagaraj Manickam 2015-01-09 15:18:34 +05:30
parent f299d0260e
commit 65134b8d99
18 changed files with 725 additions and 2 deletions

View File

@ -67,5 +67,7 @@
"software_deployments:show": "rule:deny_stack_user",
"software_deployments:update": "rule:deny_stack_user",
"software_deployments:delete": "rule:deny_stack_user",
"software_deployments:metadata": ""
"software_deployments:metadata": "",
"service:index": "rule:context_is_admin"
}

View File

@ -18,6 +18,7 @@ from heat.api.openstack.v1 import actions
from heat.api.openstack.v1 import build_info
from heat.api.openstack.v1 import events
from heat.api.openstack.v1 import resources
from heat.api.openstack.v1 import services
from heat.api.openstack.v1 import software_configs
from heat.api.openstack.v1 import software_deployments
from heat.api.openstack.v1 import stacks
@ -396,5 +397,17 @@ class API(wsgi.Router):
}
])
# Services
service_resource = services.create_resource(conf)
with mapper.submapper(
controller=service_resource,
path_prefix='/{tenant_id}/services'
) as sa_mapper:
sa_mapper.connect("service_index",
"",
action="index",
conditions={'method': 'GET'})
# now that all the routes are defined, add a handler for
super(API, self).__init__(mapper)

View File

@ -0,0 +1,50 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.messaging import exceptions
from webob import exc
from heat.api.openstack.v1 import util
from heat.common.i18n import _
from heat.common import serializers
from heat.common import wsgi
from heat.rpc import client as rpc_client
class ServiceController(object):
"""
WSGI controller for reporting the heat engine status in Heat v1 API
"""
# Define request scope (must match what is in policy.json)
REQUEST_SCOPE = 'service'
def __init__(self, options):
self.options = options
self.rpc_client = rpc_client.EngineClient()
@util.policy_enforce
def index(self, req):
try:
services = self.rpc_client.list_services(req.context)
return {'services': services}
except exceptions.MessagingTimeout:
msg = _('All heat engines are down.')
raise exc.HTTPServiceUnavailable(msg)
def create_resource(options):
deserializer = wsgi.JSONRequestDeserializer()
serializer = serializers.JSONResponseSerializer()
return wsgi.Resource(ServiceController(options), deserializer, serializer)

View File

@ -21,7 +21,9 @@ import sys
from oslo.config import cfg
from heat.common import context
from heat.common.i18n import _
from heat.common import service_utils
from heat.db import api
from heat.db import utils
from heat.openstack.common import log
@ -44,6 +46,39 @@ def do_db_sync():
api.db_sync(api.get_engine(), CONF.command.version)
class ServiceManageCommand(object):
def service_list(self):
ctxt = context.get_admin_context()
services = [service_utils.format_service(service)
for service in api.service_get_all(ctxt)]
print_format = "%-16s %-16s %-36s %-10s %-10s %-10s %-10s"
print(print_format % (_('Hostname'),
_('Binary'),
_('Engine_Id'),
_('Host'),
_('Topic'),
_('Status'),
_('Updated At')))
for svc in services:
print(print_format % (svc['hostname'],
svc['binary'],
svc['engine_id'],
svc['host'],
svc['topic'],
svc['status'],
svc['updated_at']))
@staticmethod
def add_service_parsers(subparsers):
service_parser = subparsers.add_parser('service')
service_parser.set_defaults(command_object=ServiceManageCommand)
service_subparsers = service_parser.add_subparsers(dest='action')
list_parser = service_subparsers.add_parser('list')
list_parser.set_defaults(func=ServiceManageCommand().service_list)
def purge_deleted():
"""
Remove database records that have been previously soft deleted
@ -69,6 +104,8 @@ def add_command_parsers(subparsers):
choices=['days', 'hours', 'minutes', 'seconds'],
help=_('Granularity to use for age argument, defaults to days.'))
ServiceManageCommand.add_service_parsers(subparsers)
command_opt = cfg.SubCommandOpt('command',
title='Commands',
help='Show available commands.',

View File

@ -390,3 +390,7 @@ class StopActionFailed(HeatException):
class EventSendFailed(HeatException):
msg_fmt = _("Failed to send message to stack (%(stack_name)s) "
"on other engine (%(engine_id)s)")
class ServiceNotFound(HeatException):
msg_fmt = _("Service %(service_id)s does not found")

View File

@ -0,0 +1,72 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.utils import timeutils
SERVICE_KEYS = (
SERVICE_ID,
SERVICE_HOST,
SERVICE_HOSTNAME,
SERVICE_BINARY,
SERVICE_TOPIC,
SERVICE_ENGINE_ID,
SERVICE_REPORT_INTERVAL,
SERVICE_CREATED_AT,
SERVICE_UPDATED_AT,
SERVICE_DELETED_AT,
SERVICE_STATUS
) = (
'id',
'host',
'hostname',
'binary',
'topic',
'engine_id',
'report_interval',
'created_at',
'updated_at',
'deleted_at',
'status'
)
def format_service(service):
if service is None:
return
status = 'down'
if service.updated_at is not None:
if ((timeutils.utcnow() - service.updated_at).total_seconds()
<= service.report_interval):
status = 'up'
else:
if ((timeutils.utcnow() - service.created_at).total_seconds()
<= service.report_interval):
status = 'up'
result = {
SERVICE_ID: service.id,
SERVICE_BINARY: service.binary,
SERVICE_ENGINE_ID: service.engine_id,
SERVICE_HOST: service.host,
SERVICE_HOSTNAME: service.hostname,
SERVICE_TOPIC: service.topic,
SERVICE_REPORT_INTERVAL: service.report_interval,
SERVICE_CREATED_AT: service.created_at,
SERVICE_UPDATED_AT: service.updated_at,
SERVICE_DELETED_AT: service.deleted_at,
SERVICE_STATUS: status
}
return result

View File

@ -302,6 +302,30 @@ def snapshot_get_all(context, stack_id):
return IMPL.snapshot_get_all(context, stack_id)
def service_create(context, values):
return IMPL.service_create(context, values)
def service_update(context, service_id, values):
return IMPL.service_update(context, service_id, values)
def service_delete(context, service_id, soft_delete=True):
return IMPL.service_delete(context, service_id, soft_delete)
def service_get(context, service_id):
return IMPL.service_get(context, service_id)
def service_get_all(context):
return IMPL.service_get_all(context)
def service_get_all_by_args(context, host, binary, hostname):
return IMPL.service_get_all_by_args(context, host, binary, hostname)
def db_sync(engine, version=None):
"""Migrate the database to `version` or the most recent version."""
return IMPL.db_sync(engine, version=version)

View File

@ -18,6 +18,7 @@ import sys
from oslo.config import cfg
from oslo.db.sqlalchemy import session as db_session
from oslo.db.sqlalchemy import utils
from oslo.utils import timeutils
import osprofiler.sqlalchemy
import six
import sqlalchemy
@ -816,6 +817,50 @@ def snapshot_get_all(context, stack_id):
stack_id=stack_id, tenant=context.tenant_id)
def service_create(context, values):
service = models.Service()
service.update(values)
service.save(_session(context))
return service
def service_update(context, service_id, values):
service = service_get(context, service_id)
values.update({'updated_at': timeutils.utcnow()})
service.update(values)
service.save(_session(context))
return service
def service_delete(context, service_id, soft_delete=True):
service = service_get(context, service_id)
session = orm_session.Session.object_session(service)
if soft_delete:
service.soft_delete(session=session)
else:
session.delete(service)
session.flush()
def service_get(context, service_id):
result = model_query(context, models.Service).get(service_id)
if result is None:
raise exception.ServiceNotFound(service_id=service_id)
return result
def service_get_all(context):
return (model_query(context, models.Service).
filter_by(deleted_at=None).all())
def service_get_all_by_args(context, host, binary, hostname):
return (model_query(context, models.Service).
filter_by(host=host).
filter_by(binary=binary).
filter_by(hostname=hostname).all())
def purge_deleted(age, granularity='days'):
try:
age = int(age)
@ -840,6 +885,7 @@ def purge_deleted(age, granularity='days'):
meta = sqlalchemy.MetaData()
meta.bind = engine
# Purge deleted stacks
stack = sqlalchemy.Table('stack', meta, autoload=True)
event = sqlalchemy.Table('event', meta, autoload=True)
raw_template = sqlalchemy.Table('raw_template', meta, autoload=True)
@ -863,6 +909,16 @@ def purge_deleted(age, granularity='days'):
user_creds_del = user_creds.delete().where(user_creds.c.id == s[2])
engine.execute(user_creds_del)
# Purge deleted services
service = sqlalchemy.Table('service', meta, autoload=True)
stmt = (sqlalchemy.select([service.c.id]).
where(service.c.deleted_at < time_line))
deleted_services = engine.execute(stmt)
for s in deleted_services:
stmt = service.delete().where(service.c.id == s[0])
engine.execute(stmt)
def db_sync(engine, version=None):
"""Migrate the database to `version` or the most recent version."""

View File

@ -0,0 +1,51 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import sqlalchemy
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
service = sqlalchemy.Table(
'service', meta,
sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
default=lambda: str(uuid.uuid4())),
sqlalchemy.Column('engine_id', sqlalchemy.String(36), nullable=False),
sqlalchemy.Column('host', sqlalchemy.String(255), nullable=False),
sqlalchemy.Column('hostname', sqlalchemy.String(255), nullable=False),
sqlalchemy.Column('binary', sqlalchemy.String(255), nullable=False),
sqlalchemy.Column('topic', sqlalchemy.String(255), nullable=False),
sqlalchemy.Column('report_interval', sqlalchemy.Integer,
nullable=False),
sqlalchemy.Column('created_at', sqlalchemy.DateTime),
sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
mysql_engine='InnoDB',
mysql_charset='utf8'
)
service.create()
def downgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
service = sqlalchemy.Table(
'service', meta, autoload=True)
service.drop()

View File

@ -349,3 +349,31 @@ class Snapshot(BASE, HeatBase):
status = sqlalchemy.Column('status', sqlalchemy.String(255))
status_reason = sqlalchemy.Column('status_reason', sqlalchemy.String(255))
stack = relationship(Stack, backref=backref('snapshot'))
class Service(BASE, HeatBase, SoftDelete):
__tablename__ = 'service'
id = sqlalchemy.Column('id',
sqlalchemy.String(36),
primary_key=True,
default=lambda: str(uuid.uuid4()))
engine_id = sqlalchemy.Column('engine_id',
sqlalchemy.String(36),
nullable=False)
host = sqlalchemy.Column('host',
sqlalchemy.String(255),
nullable=False)
hostname = sqlalchemy.Column('hostname',
sqlalchemy.String(255),
nullable=False)
binary = sqlalchemy.Column('binary',
sqlalchemy.String(255),
nullable=False)
topic = sqlalchemy.Column('topic',
sqlalchemy.String(255),
nullable=False)
report_interval = sqlalchemy.Column('report_interval',
sqlalchemy.Integer,
nullable=False)

View File

@ -15,6 +15,7 @@ import collections
import functools
import json
import os
import socket
import warnings
import eventlet
@ -35,6 +36,7 @@ from heat.common.i18n import _LI
from heat.common.i18n import _LW
from heat.common import identifier
from heat.common import messaging as rpc_messaging
from heat.common import service_utils
from heat.db import api as db_api
from heat.engine import api
from heat.engine import attributes
@ -267,13 +269,15 @@ class EngineService(service.Service):
by the RPC caller.
"""
RPC_API_VERSION = '1.3'
RPC_API_VERSION = '1.4'
def __init__(self, host, topic, manager=None):
super(EngineService, self).__init__()
resources.initialise()
self.host = host
self.topic = topic
self.binary = 'heat-engine'
self.hostname = socket.gethostname()
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
@ -282,6 +286,8 @@ class EngineService(service.Service):
self.engine_id = None
self.thread_group_mgr = None
self.target = None
self.service_id = None
self.manage_thread_grp = None
if cfg.CONF.instance_user:
warnings.warn('The "instance_user" option in heat.conf is '
@ -327,6 +333,10 @@ class EngineService(service.Service):
self._client = rpc_messaging.get_rpc_client(
version=self.RPC_API_VERSION)
self.manage_thread_grp = threadgroup.ThreadGroup()
self.manage_thread_grp.add_timer(cfg.CONF.periodic_interval,
self.service_manage_report)
super(EngineService, self).start()
def stop(self):
@ -348,6 +358,11 @@ class EngineService(service.Service):
self.thread_group_mgr.stop(stack_id, True)
LOG.info(_LI("Stack %s processing was finished"), stack_id)
self.manage_thread_grp.stop()
ctxt = context.get_admin_context()
db_api.service_delete(ctxt, self.service_id)
LOG.info(_LI('Service %s is deleted'), self.service_id)
# Terminate the engine process
LOG.info(_LI("All threads were gone, terminating engine"))
super(EngineService, self).stop()
@ -1478,3 +1493,53 @@ class EngineService(service.Service):
@request_context
def delete_software_deployment(self, cnxt, deployment_id):
db_api.software_deployment_delete(cnxt, deployment_id)
@request_context
def list_services(self, cnxt):
result = [service_utils.format_service(srv)
for srv in db_api.service_get_all(cnxt)]
return result
def service_manage_report(self):
cnxt = context.get_admin_context()
if self.service_id is not None:
# Service is already running
db_api.service_update(
cnxt,
self.service_id,
dict())
LOG.info(_LI('Service %s is updated'), self.service_id)
else:
service_refs = db_api.service_get_all_by_args(cnxt,
self.host,
self.binary,
self.hostname)
if len(service_refs) == 1:
# Service was aborted or stopped
service_ref = service_refs[0]
if service_ref['deleted_at'] is None:
LOG.info(_LI('Service %s was aborted'), self.service_id)
service_ref = db_api.service_update(
cnxt,
service_ref['id'],
dict(engine_id=self.engine_id,
deleted_at=None,
report_interval=cfg.CONF.periodic_interval))
self.service_id = service_ref['id']
LOG.info(_LI('Service %s is restarted'), self.service_id)
elif len(service_refs) == 0:
# Service is started now
service_ref = db_api.service_create(
cnxt,
dict(host=self.host,
hostname=self.hostname,
binary=self.binary,
engine_id=self.engine_id,
topic=self.topic,
report_interval=cfg.CONF.periodic_interval)
)
self.service_id = service_ref['id']
LOG.info(_LI('Service %s is started'), self.service_id)

View File

@ -28,6 +28,7 @@ class EngineClient(object):
1.0 - Initial version.
1.1 - Add support_status argument to list_resource_types()
1.4 - Add support for service list
'''
BASE_RPC_API_VERSION = '1.0'
@ -564,3 +565,6 @@ class EngineClient(object):
return self.call(cnxt, self.make_msg('stack_restore',
stack_identity=stack_identity,
snapshot_id=snapshot_id))
def list_services(self, cnxt):
return self.call(cnxt, self.make_msg('list_services'), version='1.4')

View File

@ -87,6 +87,11 @@ class HeatMigrationsCheckers(test_migrations.WalkVersionsMixin,
col = getattr(t.c, column)
self.assertTrue(col.nullable)
def assertColumnIsNotNullable(self, engine, table, column_name):
table = utils.get_table(engine, table)
column = getattr(table.c, column_name)
self.assertFalse(column.nullable)
def assertIndexExists(self, engine, table, index):
t = utils.get_table(engine, table)
index_names = [idx.name for idx in t.indexes]
@ -384,6 +389,24 @@ class HeatMigrationsCheckers(test_migrations.WalkVersionsMixin,
def _check_049(self, engine, data):
self.assertColumnExists(engine, 'user_creds', 'region_name')
def _check_051(self, engine, data):
column_list = [('id', False),
('host', False),
('topic', False),
('binary', False),
('hostname', False),
('engine_id', False),
('report_interval', False),
('updated_at', True),
('created_at', True),
('deleted_at', True)]
for column in column_list:
self.assertColumnExists(engine, 'service', column[0])
if not column[1]:
self.assertColumnIsNotNullable(engine, 'service', column[0])
else:
self.assertColumnIsNullable(engine, 'service', column[0])
class TestHeatMigrationsMySQL(HeatMigrationsCheckers,
test_base.MySQLOpportunisticTestCase):

View File

@ -16,6 +16,7 @@ import json
import mock
from oslo.config import cfg
from oslo.messaging._drivers import common as rpc_common
from oslo.messaging import exceptions
import six
import webob.exc
@ -25,6 +26,7 @@ import heat.api.openstack.v1.actions as actions
import heat.api.openstack.v1.build_info as build_info
import heat.api.openstack.v1.events as events
import heat.api.openstack.v1.resources as resources
import heat.api.openstack.v1.services as services
import heat.api.openstack.v1.software_configs as software_configs
import heat.api.openstack.v1.software_deployments as software_deployments
import heat.api.openstack.v1.stacks as stacks
@ -3663,6 +3665,17 @@ class RoutesTest(common.HeatTestCase):
'stack_id': 'stack_id', 'allowed_methods': 'GET,PUT,PATCH,DELETE'}
)
def test_services(self):
self.assertRoute(
self.m,
'/aaaa/services',
'GET',
'index',
'ServiceController',
{
'tenant_id': 'aaaa'
})
@mock.patch.object(policy.Enforcer, 'enforce')
class ActionControllerTest(ControllerTest, common.HeatTestCase):
@ -4268,3 +4281,37 @@ class SoftwareDeploymentControllerTest(ControllerTest, common.HeatTestCase):
req, deployment_id=deployment_id, tenant_id=self.tenant)
self.assertEqual(404, resp.json['code'])
self.assertEqual('NotFound', resp.json['error']['type'])
class ServiceControllerTest(ControllerTest, common.HeatTestCase):
def setUp(self):
super(ServiceControllerTest, self).setUp()
self.controller = services.ServiceController({})
@mock.patch.object(policy.Enforcer, 'enforce')
def test_index(self, mock_enforce):
self._mock_enforce_setup(
mock_enforce, 'index')
req = self._get('/services')
return_value = []
with mock.patch.object(
self.controller.rpc_client,
'list_services',
return_value=return_value):
resp = self.controller.index(req, tenant_id=self.tenant)
self.assertEqual(
{'services': []}, resp)
@mock.patch.object(policy.Enforcer, 'enforce')
def test_index_503(self, mock_enforce):
self._mock_enforce_setup(
mock_enforce, 'index')
req = self._get('/services')
with mock.patch.object(
self.controller.rpc_client,
'list_services',
side_effect=exceptions.MessagingTimeout()):
self.assertRaises(
webob.exc.HTTPServiceUnavailable,
self.controller.index, req, tenant_id=self.tenant)

View File

@ -0,0 +1,73 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import uuid
from heat.common import service_utils
from heat.db.sqlalchemy import models
from heat.tests import common
class TestServiceUtils(common.HeatTestCase):
def test_status_check(self):
service = models.Service()
service.id = str(uuid.uuid4())
service.engine_id = str(uuid.uuid4())
service.binary = 'heat-engine'
service.hostname = 'host.devstack.org'
service.host = 'engine-1'
service.report_interval = 60
service.topic = 'engine'
service.created_at = datetime.datetime.utcnow()
service.deleted_at = None
service.updated_at = None
service_dict = service_utils.format_service(service)
self.assertEqual(service_dict['id'], service.id)
self.assertEqual(service_dict['engine_id'], service.engine_id)
self.assertEqual(service_dict['host'], service.host)
self.assertEqual(service_dict['hostname'], service.hostname)
self.assertEqual(service_dict['binary'], service.binary)
self.assertEqual(service_dict['topic'], service.topic)
self.assertEqual(service_dict['report_interval'],
service.report_interval)
self.assertEqual(service_dict['created_at'], service.created_at)
self.assertEqual(service_dict['updated_at'], service.updated_at)
self.assertEqual(service_dict['deleted_at'], service.deleted_at)
self.assertEqual(service_dict['status'], 'up')
# check again within first report_interval time (60)
service_dict = service_utils.format_service(service)
self.assertEqual(service_dict['status'], 'up')
# check update not happen within report_interval time (60+)
service.created_at = (datetime.datetime.utcnow() -
datetime.timedelta(0, 70))
service_dict = service_utils.format_service(service)
self.assertEqual(service_dict['status'], 'down')
# check update happened after report_interval time (60+)
service.updated_at = (datetime.datetime.utcnow() -
datetime.timedelta(0, 70))
service_dict = service_utils.format_service(service)
self.assertEqual(service_dict['status'], 'down')
# check update happened within report_interval time (60)
service.updated_at = (datetime.datetime.utcnow() -
datetime.timedelta(0, 50))
service_dict = service_utils.format_service(service)
self.assertEqual(service_dict['status'], 'up')

View File

@ -25,8 +25,10 @@ from oslo.messaging.rpc import dispatcher
from oslo.serialization import jsonutils
import six
from heat.common import context
from heat.common import exception
from heat.common import identifier
from heat.common import service_utils
from heat.common import template_format
from heat.db import api as db_api
from heat.engine.clients.os import glance
@ -3084,6 +3086,87 @@ class StackServiceTest(common.HeatTestCase):
self.eng._validate_new_stack,
self.ctx, 'test_existing_stack', parsed_template)
@mock.patch.object(service.db_api, 'service_get_all')
@mock.patch.object(service_utils, 'format_service')
def test_service_get_all(self, mock_format_service, mock_get_all):
mock_get_all.return_value = [mock.Mock()]
mock_format_service.return_value = mock.Mock()
self.assertEqual(1, len(self.eng.list_services(self.ctx)))
self.assertTrue(service.db_api.service_get_all.called)
mock_format_service.assert_called_once_with(mock.ANY)
@mock.patch.object(service.db_api, 'service_get_all_by_args')
@mock.patch.object(service.db_api, 'service_create')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_start(self,
mock_admin_context,
mock_service_create,
mock_get_all):
self.eng.service_id = None
mock_admin_context.return_value = self.ctx
mock_get_all.return_value = []
srv = dict(id='mock_id')
mock_service_create.return_value = srv
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_get_all.assert_called_once_with(self.ctx,
self.eng.host,
self.eng.binary,
self.eng.hostname)
mock_service_create.assert_called_once_with(
self.ctx,
dict(host=self.eng.host,
hostname=self.eng.hostname,
binary=self.eng.binary,
engine_id=self.eng.engine_id,
topic=self.eng.topic,
report_interval=cfg.CONF.periodic_interval))
self.assertEqual(self.eng.service_id, srv['id'])
@mock.patch.object(service.db_api, 'service_get_all_by_args')
@mock.patch.object(service.db_api, 'service_update')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_restart(
self,
mock_admin_context,
mock_service_update,
mock_get_all):
self.eng.service_id = None
srv = dict(id='mock_id', deleted_at=None)
mock_get_all.return_value = [srv]
mock_admin_context.return_value = self.ctx
mock_service_update.return_value = srv
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_get_all.assert_called_once_with(self.ctx,
self.eng.host,
self.eng.binary,
self.eng.hostname)
mock_service_update.assert_called_once_with(
self.ctx,
srv['id'],
dict(engine_id=self.eng.engine_id,
deleted_at=None,
report_interval=cfg.CONF.periodic_interval))
self.assertEqual(self.eng.service_id, srv['id'])
@mock.patch.object(service.db_api, 'service_update')
@mock.patch.object(context, 'get_admin_context')
def test_service_manage_report_update(
self,
mock_admin_context,
mock_service_update):
self.eng.service_id = 'mock_id'
mock_admin_context.return_value = self.ctx
self.eng.service_manage_report()
mock_admin_context.assert_called_once_with()
mock_service_update.assert_called_once_with(
self.ctx,
'mock_id',
dict())
class SoftwareConfigServiceTest(common.HeatTestCase):

View File

@ -338,3 +338,6 @@ class EngineRpcAPITestCase(testtools.TestCase):
self._test_engine_api('delete_snapshot', 'call',
stack_identity=self.identity,
snapshot_id=snapshot_id)
def test_list_services(self):
self._test_engine_api('list_services', 'call', version='1.4')

View File

@ -1187,6 +1187,20 @@ def create_watch_data(ctx, watch_rule, **kwargs):
return db_api.watch_data_create(ctx, values)
def create_service(ctx, **kwargs):
values = {
'id': '7079762f-c863-4954-ba61-9dccb68c57e2',
'engine_id': 'f9aff81e-bc1f-4119-941d-ad1ea7f31d19',
'host': 'engine-1',
'hostname': 'host1.devstack.org',
'binary': 'heat-engine',
'topic': 'engine',
'report_interval': 60}
values.update(kwargs)
return db_api.service_create(ctx, values)
class DBAPIRawTemplateTest(common.HeatTestCase):
def setUp(self):
super(DBAPIRawTemplateTest, self).setUp()
@ -1981,3 +1995,77 @@ class DBAPIWatchDataTest(common.HeatTestCase):
data = [wd.data for wd in watch_data]
[self.assertIn(val['data'], data) for val in values]
class DBAPIServiceTest(common.HeatTestCase):
def setUp(self):
super(DBAPIServiceTest, self).setUp()
self.ctx = utils.dummy_context()
def test_service_create_get(self):
service = create_service(self.ctx)
ret_service = db_api.service_get(self.ctx, service.id)
self.assertIsNotNone(ret_service)
self.assertEqual(service.id, ret_service.id)
self.assertEqual(service.hostname, ret_service.hostname)
self.assertEqual(service.binary, ret_service.binary)
self.assertEqual(service.host, ret_service.host)
self.assertEqual(service.topic, ret_service.topic)
self.assertEqual(service.engine_id, ret_service.engine_id)
self.assertEqual(service.report_interval, ret_service.report_interval)
self.assertIsNotNone(service.created_at)
self.assertIsNone(service.updated_at)
self.assertIsNone(service.deleted_at)
def test_service_get_all_by_args(self):
# Host-1
values = [{'id': str(uuid.uuid4()),
'hostname': 'host-1',
'host': 'engine-1'}]
# Host-2
for i in [0, 1, 2]:
values.append({'id': str(uuid.uuid4()),
'hostname': 'host-2',
'host': 'engine-%s' % i})
[create_service(self.ctx, **val) for val in values]
services = db_api.service_get_all(self.ctx)
self.assertEqual(4, len(services))
services_by_args = db_api.service_get_all_by_args(self.ctx,
hostname='host-2',
binary='heat-engine',
host='engine-0')
self.assertEqual(1, len(services_by_args))
self.assertEqual('host-2', services_by_args[0].hostname)
self.assertEqual('heat-engine', services_by_args[0].binary)
self.assertEqual('engine-0', services_by_args[0].host)
def test_service_update(self):
service = create_service(self.ctx)
values = {'hostname': 'host-updated',
'host': 'engine-updated',
'retry_interval': 120}
service = db_api.service_update(self.ctx, service.id, values)
self.assertEqual('host-updated', service.hostname)
self.assertEqual(120, service.retry_interval)
self.assertEqual('engine-updated', service.host)
# simple update, expected the updated_at is updated
old_updated_date = service.updated_at
service = db_api.service_update(self.ctx, service.id, dict())
self.assertGreater(service.updated_at, old_updated_date)
def test_service_delete_soft_delete(self):
service = create_service(self.ctx)
# Soft delete
db_api.service_delete(self.ctx, service.id)
ret_service = db_api.service_get(self.ctx, service.id)
self.assertEqual(ret_service.id, service.id)
# Delete
db_api.service_delete(self.ctx, service.id, False)
self.assertRaises(exception.ServiceNotFound, db_api.service_get,
self.ctx, service.id)