basic OperationEngine service: Service class for binaries running on hosts

Create the basic Service class for binaries running
on hosts.
A service takes a manager and enables RPC by listening
to queues based on topic. It also periodically runs
tasks on the manager (optional) and reports its state
to the database services table.
Also add some unit tests for the Service class.

Partial-Bug: #1527097
Change-Id: I241c4757d0b2d1880d1a1a59cce007ca9d1037c7
This commit is contained in:
chenying 2015-12-21 17:10:30 +08:00
parent 135ecafba2
commit 71478b89ac
7 changed files with 590 additions and 2 deletions

View File

@ -114,7 +114,7 @@ class ServiceCommands(object):
ctxt = context.get_admin_context()
services = db.service_get_all(ctxt)
print_format = "%-16s %-36s %-16s %-10s %-5s %-10s"
print_format = "%-16s %-36s %-10s %-5s %-10s"
print(print_format % (_('Binary'),
_('Host'),
_('Status'),

View File

@ -173,3 +173,7 @@ class PasteAppNotFound(NotFound):
class ServiceNotFound(NotFound):
    """NotFound error whose message names the missing service_id."""

    message = _("Service %(service_id)s could not be found.")
class HostBinaryNotFound(NotFound):
    """NotFound error whose message names the missing binary and its host."""

    message = _("Could not find binary %(binary)s on host %(host)s.")

112
smaug/manager.py Normal file
View File

@ -0,0 +1,112 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base Manager class.
Managers are responsible for a certain aspect of the system. It is a logical
grouping of code relating to a portion of the system. In general other
components should be using the manager to make changes to the components that
it is responsible for.
We have adopted a basic strategy of Smart managers and dumb data, which means
rather than attaching methods to data objects, components should call manager
methods that act on the data.
Methods on managers that can be executed locally should be called directly. If
a particular method must execute on a remote host, this should be done via rpc
to the service that wraps the manager
Managers should be responsible for most of the db access, and
non-implementation specific data. Anything implementation specific that can't
be generalized should be done by the Driver.
Managers will often provide methods for initial setup of a host or periodic
tasks to a wrapping service.
This module provides Manager, a base class for managers.
"""
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import periodic_task
from smaug.db import base
from smaug import version
# Global oslo.config object; read below for the default host name and passed
# to the periodic task machinery.
CONF = cfg.CONF
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
class PeriodicTasks(periodic_task.PeriodicTasks):
    """oslo.service PeriodicTasks that is always bound to the global CONF.

    The initializer takes no arguments and supplies CONF itself, so
    subclasses can be constructed without passing a config object.
    """

    def __init__(self):
        super(PeriodicTasks, self).__init__(CONF)
class Manager(base.Base, PeriodicTasks):
    """Base class for managers wrapped by a service.

    Stores the host name and a list of extra RPC endpoints, exposes an RPC
    target carrying the API version, and provides the hook methods a
    wrapping service calls (init_host, init_host_with_rpc, periodic_tasks,
    is_working).
    """

    # Set RPC API version to 1.0 by default.
    RPC_API_VERSION = '1.0'

    target = messaging.Target(version=RPC_API_VERSION)

    def __init__(self, host=None, db_driver=None):
        """Record the host and initialise the base classes.

        :param host: host name; any falsy value falls back to CONF.host
        :param db_driver: passed through to the base class initializer
        """
        self.host = host if host else CONF.host
        self.additional_endpoints = []
        super(Manager, self).__init__(db_driver)

    def periodic_tasks(self, context, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        outcome = self.run_periodic_tasks(context,
                                          raise_on_error=raise_on_error)
        return outcome

    def init_host(self):
        """Handle initialization if this is a standalone service.

        A hook point for services to execute tasks before the services are
        made available (i.e. showing up on RPC and starting to accept RPC
        calls) to other components. Child classes should override this
        method.
        """

    def init_host_with_rpc(self):
        """A hook for service to do jobs after RPC is ready.

        Like init_host(), this method is a hook where services get a chance
        to execute tasks that *need* RPC. Child classes should override
        this method.
        """

    def service_version(self):
        """Return the version string reported by the version module."""
        return version.version_string()

    def service_config(self):
        """Return a dict mapping every key in CONF to its current value."""
        return {name: CONF.get(name, None) for name in CONF}

    def is_working(self):
        """Method indicating if service is working correctly.

        This method is supposed to be overridden by subclasses and return
        if manager is working correctly.
        """
        return True

View File

@ -44,6 +44,9 @@ TRANSPORT_ALIASES = {}
def init(conf):
if initialized():
return
global TRANSPORT, NOTIFIER
exmods = get_allowed_exmods()
TRANSPORT = messaging.get_transport(conf,

View File

@ -13,23 +13,43 @@
"""Generic Node base class for all workers that run on hosts."""
import inspect
import os
import random
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import loopingcall
from oslo_service import service
from oslo_utils import importutils
from smaug import context
from smaug import db
from smaug import exception
from smaug.i18n import _
from smaug.i18n import _, _LE, _LI, _LW
from smaug import rpc
from smaug import version
from smaug.wsgi import common as wsgi_common
from smaug.wsgi import eventlet_server as wsgi
LOG = logging.getLogger(__name__)
service_opts = [
cfg.IntOpt('report_interval',
default=10,
help='Interval, in seconds, between nodes reporting state '
'to datastore'),
cfg.IntOpt('periodic_interval',
default=60,
help='Interval, in seconds, between running periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=60,
help='Range, in seconds, to randomly delay when starting the'
' periodic task scheduler to reduce stampeding.'
' (Disable by setting to 0)'),
cfg.StrOpt('osapi_smaug_listen',
default="0.0.0.0",
help='IP address on which OpenStack Smaug API listens'),
@ -44,6 +64,235 @@ CONF = cfg.CONF
CONF.register_opts(service_opts)
class Service(service.Service):
    """Service object for binaries running on hosts.

    A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager and reports
    its state to the database services table.
    """

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 service_name=None, *args, **kwargs):
        """Import and instantiate the manager and store the settings.

        :param host: host name this service runs on
        :param binary: name of the binary, stored in the service record
        :param topic: RPC topic the service listens on
        :param manager: dotted path of the manager class to import
        :param report_interval: seconds between state reports; falsy
                                disables reporting
        :param periodic_interval: seconds between periodic task runs;
                                  falsy disables them
        :param periodic_fuzzy_delay: upper bound, in seconds, of the random
                                     delay before the first periodic run
        :param service_name: passed to the manager constructor
        :param args: extra positional arguments for the manager constructor
        :param kwargs: extra keyword arguments for the manager constructor
        """
        super(Service, self).__init__()
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)
        self.manager = manager_class(host=self.host,
                                     service_name=service_name,
                                     *args, **kwargs)
        self.report_interval = report_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.basic_config_check()
        self.saved_args, self.saved_kwargs = args, kwargs
        self.timers = []
        self.rpcserver = None

    def start(self):
        """Register the service, start the RPC server and the timers."""
        version_string = version.version_string()
        LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'),
                 {'topic': self.topic, 'version_string': version_string})
        self.model_disconnected = False
        self.manager.init_host()
        ctxt = context.get_admin_context()
        # Reuse the existing service record for this host/binary pair, or
        # create one when none exists yet.
        try:
            service_ref = db.service_get_by_args(ctxt,
                                                 self.host,
                                                 self.binary)
            self.service_id = service_ref['id']
        except exception.NotFound:
            self._create_service_ref(ctxt)

        LOG.debug("Creating RPC server for service %s", self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)
        endpoints = [self.manager]
        endpoints.extend(self.manager.additional_endpoints)
        self.rpcserver = rpc.get_server(target, endpoints)
        self.rpcserver.start()

        self.manager.init_host_with_rpc()

        if self.report_interval:
            pulse = loopingcall.FixedIntervalLoopingCall(
                self.report_state)
            pulse.start(interval=self.report_interval,
                        initial_delay=self.report_interval)
            self.timers.append(pulse)

        if self.periodic_interval:
            # A random initial delay spreads out the first periodic run.
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = loopingcall.FixedIntervalLoopingCall(
                self.periodic_tasks)
            periodic.start(interval=self.periodic_interval,
                           initial_delay=initial_delay)
            self.timers.append(periodic)

    def basic_config_check(self):
        """Perform basic config checks before starting service."""
        # Make sure report interval is less than service down time
        if self.report_interval:
            if CONF.service_down_time <= self.report_interval:
                new_down_time = int(self.report_interval * 2.5)
                LOG.warning(
                    _LW("Report interval must be less than service down "
                        "time. Current config service_down_time: "
                        "%(service_down_time)s, report_interval for this "
                        "service is: %(report_interval)s. Setting global "
                        "service_down_time to: %(new_down_time)s"),
                    {'service_down_time': CONF.service_down_time,
                     'report_interval': self.report_interval,
                     'new_down_time': new_down_time})
                CONF.set_override('service_down_time', new_down_time)

    def _create_service_ref(self, context):
        """Create the service record and remember its id."""
        service_ref = db.service_create(context,
                                        {'host': self.host,
                                         'binary': self.binary,
                                         'topic': self.topic,
                                         'report_count': 0})
        self.service_id = service_ref['id']

    def __getattr__(self, key):
        """Look up attributes missing on the service on its manager."""
        # Read 'manager' from __dict__ directly so that a missing manager
        # does not re-enter __getattr__.
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None, service_name=None):
        """Instantiates class and passes back application object.

        :param host: defaults to CONF.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'smaug-' part
        :param manager: defaults to CONF.<topic>_manager
        :param report_interval: defaults to CONF.report_interval
        :param periodic_interval: defaults to CONF.periodic_interval
        :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
        :param service_name: passed through to the manager constructor
        :returns: the new service object
        """
        if not host:
            host = CONF.host
        if not binary:
            # The outermost stack frame belongs to the launching script.
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary
        if not manager:
            subtopic = topic.rpartition('smaug-')[2]
            manager = CONF.get('%s_manager' % subtopic, None)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_interval is None:
            periodic_interval = CONF.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          service_name=service_name)

        return service_obj

    def kill(self):
        """Destroy the service object in the datastore."""
        self.stop()
        try:
            db.service_destroy(context.get_admin_context(), self.service_id)
        except exception.NotFound:
            LOG.warning(_LW('Service killed that has no database entry'))

    def stop(self):
        """Stop the RPC server and all timers, ignoring failures."""
        # Shutdown is best effort: errors are logged at debug level and
        # otherwise ignored, since we are shutting down anyway.
        if self.rpcserver is not None:
            try:
                self.rpcserver.stop()
            except Exception:
                LOG.debug("Ignoring error while stopping RPC server for "
                          "service %s", self.topic, exc_info=True)
        for x in self.timers:
            try:
                x.stop()
            except Exception:
                LOG.debug("Ignoring error while stopping a timer for "
                          "service %s", self.topic, exc_info=True)
        self.timers = []
        super(Service, self).stop()

    def wait(self):
        """Wait for the remaining timers and the RPC server to finish."""
        for x in self.timers:
            try:
                x.wait()
            except Exception:
                LOG.debug("Ignoring error while waiting for a timer of "
                          "service %s", self.topic, exc_info=True)
        if self.rpcserver:
            self.rpcserver.wait()

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        ctxt = context.get_admin_context()
        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

    def report_state(self):
        """Update the state of this service in the datastore."""
        if not self.manager.is_working():
            # NOTE(dulek): If manager reports a problem we're not sending
            # heartbeats - to indicate that service is actually down.
            LOG.error(_LE('Manager for service %(binary)s %(host)s is '
                          'reporting problems, not sending heartbeat. '
                          'Service will appear "down".'),
                      {'binary': self.binary,
                       'host': self.host})
            return

        ctxt = context.get_admin_context()
        state_catalog = {}
        try:
            try:
                service_ref = db.service_get(ctxt, self.service_id)
            except exception.NotFound:
                LOG.debug('The service database object disappeared, '
                          'recreating it.')
                self._create_service_ref(ctxt)
                service_ref = db.service_get(ctxt, self.service_id)

            state_catalog['report_count'] = service_ref['report_count'] + 1

            db.service_update(ctxt,
                              self.service_id, state_catalog)

            # TODO(termie): make this pattern be more elegant.
            if getattr(self, 'model_disconnected', False):
                self.model_disconnected = False
                LOG.error(_LE('Recovered model server connection!'))

        except db_exc.DBConnectionError:
            # Only the first failure is logged; later ones are silent until
            # a report succeeds and resets the flag.
            if not getattr(self, 'model_disconnected', False):
                self.model_disconnected = True
                LOG.exception(_LE('model server went away'))

        # NOTE(jsbryant) Other DB errors can happen in HA configurations.
        # such errors shouldn't kill this thread, so we handle them here.
        except db_exc.DBError:
            if not getattr(self, 'model_disconnected', False):
                self.model_disconnected = True
                LOG.exception(_LE('DBError encountered: '))

        except Exception:
            if not getattr(self, 'model_disconnected', False):
                self.model_disconnected = True
                LOG.exception(_LE('Exception encountered: '))
class WSGIService(service.ServiceBase):
"""Provides ability to launch API from a 'paste' configuration."""

View File

@ -16,11 +16,14 @@ import shutil
import fixtures
from oslo_config import cfg
from oslo_log import log
from oslo_messaging import conffixture as messaging_conffixture
from oslo_utils import timeutils
from oslotest import base
from smaug.common import config # noqa Need to register global_opts
from smaug.db import migration
from smaug.db.sqlalchemy import api as sqla_api
from smaug import rpc
from smaug.tests.unit import conf_fixture
test_opts = [
@ -82,9 +85,24 @@ class TestCase(base.BaseTestCase):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
rpc.add_extra_exmods("smaug.tests.unit")
self.addCleanup(rpc.clear_extra_exmods)
self.addCleanup(rpc.cleanup)
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 15
self.useFixture(self.messaging_conf)
rpc.init(CONF)
conf_fixture.set_defaults(CONF)
CONF([], default_config_files=[])
# NOTE(vish): We need a better method for creating fixtures for tests
# now that we have some required db setup for the system
# to work properly.
self.start = timeutils.utcnow()
CONF.set_default('connection', 'sqlite://', 'database')
CONF.set_default('sqlite_synchronous', False, 'database')

View File

@ -17,14 +17,216 @@ Unit Tests for remote procedure calls using queue
import mock
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_db import exception as db_exc
from smaug import context
from smaug import db
from smaug import exception
from smaug import manager
from smaug import rpc
from smaug import service
from smaug.tests import base
from smaug.wsgi import common as wsgi
# Extra option registered for the tests: the dotted path of the manager
# class to use, defaulting to the FakeManager defined in this module.
test_service_opts = [
cfg.StrOpt("fake_manager",
default="smaug.tests.unit.test_service.FakeManager",
help="Manager for testing"), ]
CONF = cfg.CONF
# Registered on the global CONF at import time.
CONF.register_opts(test_service_opts)
class FakeManager(manager.Manager):
    """Fake manager for tests."""

    def __init__(self, host=None, db_driver=None, service_name=None):
        # service_name is accepted but not forwarded to the base class.
        base_kwargs = {'host': host, 'db_driver': db_driver}
        super(FakeManager, self).__init__(**base_kwargs)

    def test_method(self):
        """Return a marker showing the manager handled the call."""
        return 'manager'
class ExtendedService(service.Service):
    """Service subclass that defines test_method itself.

    Used to check that a method defined on the service takes precedence
    over the manager method of the same name.
    """

    def test_method(self):
        return 'service'
class ServiceManagerTestCase(base.TestCase):
    """Test cases for Services."""

    def test_message_gets_to_manager(self):
        """A method only the manager defines is reachable via the service."""
        manager_path = 'smaug.tests.unit.test_service.FakeManager'
        serv = service.Service('test', 'test', 'test', manager_path)
        serv.start()
        result = serv.test_method()
        self.assertEqual('manager', result)

    def test_override_manager_method(self):
        """A method defined on the service wins over the manager's."""
        manager_path = 'smaug.tests.unit.test_service.FakeManager'
        serv = ExtendedService('test', 'test', 'test', manager_path)
        serv.start()
        result = serv.test_method()
        self.assertEqual('service', result)
class ServiceFlagsTestCase(base.TestCase):
    """Check the 'disabled' field of newly created service records."""

    def _disabled_value_for(self, enable_new_services):
        """Create, start and stop a service; return its 'disabled' value.

        The service record is fetched and then destroyed before the value
        is read from the fetched record.
        """
        self.flags(enable_new_services=enable_new_services)
        app = service.Service.create(host='foo', binary='smaug-fake')
        app.start()
        app.stop()
        ref = db.service_get(context.get_admin_context(), app.service_id)
        db.service_destroy(context.get_admin_context(), app.service_id)
        return ref['disabled']

    def test_service_enabled_on_create_based_on_flag(self):
        self.assertFalse(self._disabled_value_for(True))

    def test_service_disabled_on_create_based_on_flag(self):
        self.assertTrue(self._disabled_value_for(False))
class ServiceTestCase(base.TestCase):
    """Test cases for Services."""

    # Dotted path of the manager class every test service is built with.
    FAKE_MANAGER = 'smaug.tests.unit.test_service.FakeManager'

    def setUp(self):
        super(ServiceTestCase, self).setUp()
        self.host = 'foo'
        self.binary = 'smaug-fake'
        self.topic = 'fake'

    def _service_ref(self):
        """Return a fresh dict shaped like a service database record."""
        return {'host': self.host,
                'binary': self.binary,
                'topic': self.topic,
                'report_count': 0,
                'id': 1}

    def _new_service(self):
        """Build a Service for the test host/binary/topic."""
        return service.Service(self.host,
                               self.binary,
                               self.topic,
                               self.FAKE_MANAGER)

    def test_create(self):
        app = service.Service.create(host=self.host,
                                     binary=self.binary,
                                     topic=self.topic)

        self.assertTrue(app)

    def test_report_state_newly_disconnected(self):
        service_ref = self._service_ref()
        with mock.patch.object(service, 'db') as mock_db:
            mock_db.service_get_by_args.side_effect = exception.NotFound()
            mock_db.service_create.return_value = service_ref
            mock_db.service_get.side_effect = db_exc.DBConnectionError()

            serv = self._new_service()
            serv.start()
            serv.report_state()
            self.assertTrue(serv.model_disconnected)
            self.assertFalse(mock_db.service_update.called)

    def test_report_state_disconnected_DBError(self):
        service_ref = self._service_ref()
        with mock.patch.object(service, 'db') as mock_db:
            mock_db.service_get_by_args.side_effect = exception.NotFound()
            mock_db.service_create.return_value = service_ref
            mock_db.service_get.side_effect = db_exc.DBError()

            serv = self._new_service()
            serv.start()
            serv.report_state()
            self.assertTrue(serv.model_disconnected)
            self.assertFalse(mock_db.service_update.called)

    def test_report_state_newly_connected(self):
        service_ref = self._service_ref()
        with mock.patch.object(service, 'db') as mock_db:
            mock_db.service_get_by_args.side_effect = exception.NotFound()
            mock_db.service_create.return_value = service_ref
            mock_db.service_get.return_value = service_ref

            serv = self._new_service()
            serv.start()
            serv.model_disconnected = True
            serv.report_state()

            self.assertFalse(serv.model_disconnected)
            self.assertTrue(mock_db.service_update.called)

    def test_report_state_manager_not_working(self):
        service_ref = self._service_ref()
        # Patch the name the service module actually uses. Patching
        # 'smaug.db' leaves service.db untouched, so the final assertion
        # could never fail.
        with mock.patch.object(service, 'db') as mock_db:
            mock_db.service_get.return_value = service_ref

            serv = self._new_service()
            serv.manager.is_working = mock.Mock(return_value=False)
            serv.start()
            serv.report_state()

            serv.manager.is_working.assert_called_once_with()
            self.assertFalse(mock_db.service_update.called)

    def test_service_with_long_report_interval(self):
        self.override_config('service_down_time', 10)
        self.override_config('report_interval', 10)
        service.Service.create(
            binary="test_service",
            manager=self.FAKE_MANAGER)
        # 10 * 2.5: the down time is raised when it does not exceed the
        # report interval.
        self.assertEqual(25, CONF.service_down_time)

    @mock.patch.object(rpc, 'get_server')
    @mock.patch.object(service, 'db')
    def test_service_stop_waits_for_rpcserver(self, mock_db, mock_rpc):
        serv = self._new_service()
        serv.start()
        serv.stop()
        serv.wait()
        serv.rpcserver.start.assert_called_once_with()
        serv.rpcserver.stop.assert_called_once_with()
        serv.rpcserver.wait.assert_called_once_with()
class TestWSGIService(base.TestCase):