diff --git a/smaug/cmd/manage.py b/smaug/cmd/manage.py index 3168c1d0..17086b63 100644 --- a/smaug/cmd/manage.py +++ b/smaug/cmd/manage.py @@ -114,7 +114,7 @@ class ServiceCommands(object): ctxt = context.get_admin_context() services = db.service_get_all(ctxt) - print_format = "%-16s %-36s %-16s %-10s %-5s %-10s" + print_format = "%-16s %-36s %-10s %-5s %-10s" print(print_format % (_('Binary'), _('Host'), _('Status'), diff --git a/smaug/exception.py b/smaug/exception.py index a56d96d2..4936aa1f 100644 --- a/smaug/exception.py +++ b/smaug/exception.py @@ -173,3 +173,7 @@ class PasteAppNotFound(NotFound): class ServiceNotFound(NotFound): message = _("Service %(service_id)s could not be found.") + + +class HostBinaryNotFound(NotFound): + message = _("Could not find binary %(binary)s on host %(host)s.") diff --git a/smaug/manager.py b/smaug/manager.py new file mode 100644 index 00000000..599aa568 --- /dev/null +++ b/smaug/manager.py @@ -0,0 +1,112 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Base Manager class. + +Managers are responsible for a certain aspect of the system. It is a logical +grouping of code relating to a portion of the system. In general other +components should be using the manager to make changes to the components that +it is responsible for. + +We have adopted a basic strategy of Smart managers and dumb data, which means +rather than attaching methods to data objects, components should call manager +methods that act on the data. + +Methods on managers that can be executed locally should be called directly. If +a particular method must execute on a remote host, this should be done via rpc +to the service that wraps the manager + +Managers should be responsible for most of the db access, and +non-implementation specific data. Anything implementation specific that can't +be generalized should be done by the Driver. + +Managers will often provide methods for initial setup of a host or periodic +tasks to a wrapping service. + +This module provides Manager, a base class for managers. + +""" + + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_service import periodic_task + +from smaug.db import base +from smaug import version + + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + + +class PeriodicTasks(periodic_task.PeriodicTasks): + def __init__(self): + super(PeriodicTasks, self).__init__(CONF) + + +class Manager(base.Base, PeriodicTasks): + # Set RPC API version to 1.0 by default. + RPC_API_VERSION = '1.0' + + target = messaging.Target(version=RPC_API_VERSION) + + def __init__(self, host=None, db_driver=None): + if not host: + host = CONF.host + self.host = host + self.additional_endpoints = [] + super(Manager, self).__init__(db_driver) + + def periodic_tasks(self, context, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + return self.run_periodic_tasks(context, raise_on_error=raise_on_error) + + def init_host(self): + """Handle initialization if this is a standalone service. + + A hook point for services to execute tasks before the services are made + available (i.e. showing up on RPC and starting to accept RPC calls) to + other components. Child classes should override this method. + + """ + pass + + def init_host_with_rpc(self): + """A hook for service to do jobs after RPC is ready. + + Like init_host(), this method is a hook where services get a chance + to execute tasks that *need* RPC. Child classes should override + this method. + + """ + pass + + def service_version(self): + return version.version_string() + + def service_config(self): + config = {} + for key in CONF: + config[key] = CONF.get(key, None) + return config + + def is_working(self): + """Method indicating if service is working correctly. + + This method is supposed to be overriden by subclasses and return if + manager is working correctly. + """ + return True diff --git a/smaug/rpc.py b/smaug/rpc.py index 28622dd1..5a3bb9df 100644 --- a/smaug/rpc.py +++ b/smaug/rpc.py @@ -44,6 +44,9 @@ TRANSPORT_ALIASES = {} def init(conf): + if initialized(): + return + global TRANSPORT, NOTIFIER exmods = get_allowed_exmods() TRANSPORT = messaging.get_transport(conf, diff --git a/smaug/service.py b/smaug/service.py index beadbea7..9f6e6dae 100644 --- a/smaug/service.py +++ b/smaug/service.py @@ -13,23 +13,43 @@ """Generic Node base class for all workers that run on hosts.""" +import inspect import os +import random from oslo_concurrency import processutils from oslo_config import cfg +from oslo_db import exception as db_exc from oslo_log import log as logging +import oslo_messaging as messaging +from oslo_service import loopingcall from oslo_service import service from oslo_utils import importutils +from smaug import context +from smaug import db from smaug import exception -from smaug.i18n import _ +from smaug.i18n import _, _LE, _LI, _LW from smaug import rpc +from smaug import version from smaug.wsgi import common as wsgi_common from smaug.wsgi import eventlet_server as wsgi LOG = logging.getLogger(__name__) service_opts = [ + cfg.IntOpt('report_interval', + default=10, + help='Interval, in seconds, between nodes reporting state ' + 'to datastore'), + cfg.IntOpt('periodic_interval', + default=60, + help='Interval, in seconds, between running periodic tasks'), + cfg.IntOpt('periodic_fuzzy_delay', + default=60, + help='Range, in seconds, to randomly delay when starting the' + ' periodic task scheduler to reduce stampeding.' + ' (Disable by setting to 0)'), cfg.StrOpt('osapi_smaug_listen', default="0.0.0.0", help='IP address on which OpenStack Smaug API listens'), @@ -44,6 +64,235 @@ CONF = cfg.CONF CONF.register_opts(service_opts) +class Service(service.Service): + """Service object for binaries running on hosts. + + A service takes a manager and enables rpc by listening to queues based + on topic. It also periodically runs tasks on the manager and reports + it state to the database services table. + """ + + def __init__(self, host, binary, topic, manager, report_interval=None, + periodic_interval=None, periodic_fuzzy_delay=None, + service_name=None, *args, **kwargs): + super(Service, self).__init__() + + self.host = host + self.binary = binary + self.topic = topic + self.manager_class_name = manager + manager_class = importutils.import_class(self.manager_class_name) + self.manager = manager_class(host=self.host, + service_name=service_name, + *args, **kwargs) + self.report_interval = report_interval + self.periodic_interval = periodic_interval + self.periodic_fuzzy_delay = periodic_fuzzy_delay + self.basic_config_check() + self.saved_args, self.saved_kwargs = args, kwargs + self.timers = [] + + self.rpcserver = None + + def start(self): + version_string = version.version_string() + LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'), + {'topic': self.topic, 'version_string': version_string}) + self.model_disconnected = False + self.manager.init_host() + ctxt = context.get_admin_context() + try: + service_ref = db.service_get_by_args(ctxt, + self.host, + self.binary) + self.service_id = service_ref['id'] + except exception.NotFound: + self._create_service_ref(ctxt) + + LOG.debug("Creating RPC server for service %s", self.topic) + + target = messaging.Target(topic=self.topic, server=self.host) + endpoints = [self.manager] + endpoints.extend(self.manager.additional_endpoints) + self.rpcserver = rpc.get_server(target, endpoints) + self.rpcserver.start() + + self.manager.init_host_with_rpc() + + if self.report_interval: + pulse = loopingcall.FixedIntervalLoopingCall( + self.report_state) + pulse.start(interval=self.report_interval, + initial_delay=self.report_interval) + self.timers.append(pulse) + + if self.periodic_interval: + if self.periodic_fuzzy_delay: + initial_delay = random.randint(0, self.periodic_fuzzy_delay) + else: + initial_delay = None + + periodic = loopingcall.FixedIntervalLoopingCall( + self.periodic_tasks) + periodic.start(interval=self.periodic_interval, + initial_delay=initial_delay) + self.timers.append(periodic) + + def basic_config_check(self): + """Perform basic config checks before starting service.""" + # Make sure report interval is less than service down time + if self.report_interval: + if CONF.service_down_time <= self.report_interval: + new_down_time = int(self.report_interval * 2.5) + LOG.warning( + _LW("Report interval must be less than service down " + "time. Current config service_down_time: " + "%(service_down_time)s, report_interval for this: " + "service is: %(report_interval)s. Setting global " + "service_down_time to: %(new_down_time)s"), + {'service_down_time': CONF.service_down_time, + 'report_interval': self.report_interval, + 'new_down_time': new_down_time}) + CONF.set_override('service_down_time', new_down_time) + + def _create_service_ref(self, context): + service_ref = db.service_create(context, + {'host': self.host, + 'binary': self.binary, + 'topic': self.topic, + 'report_count': 0}) + self.service_id = service_ref['id'] + + def __getattr__(self, key): + manager = self.__dict__.get('manager', None) + return getattr(manager, key) + + @classmethod + def create(cls, host=None, binary=None, topic=None, manager=None, + report_interval=None, periodic_interval=None, + periodic_fuzzy_delay=None, service_name=None): + """Instantiates class and passes back application object. + + :param host: defaults to CONF.host + :param binary: defaults to basename of executable + :param topic: defaults to bin_name - 'smaug-' part + :param manager: defaults to CONF._manager + :param report_interval: defaults to CONF.report_interval + :param periodic_interval: defaults to CONF.periodic_interval + :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay + + """ + if not host: + host = CONF.host + if not binary: + binary = os.path.basename(inspect.stack()[-1][1]) + if not topic: + topic = binary + if not manager: + subtopic = topic.rpartition('smaug-')[2] + manager = CONF.get('%s_manager' % subtopic, None) + if report_interval is None: + report_interval = CONF.report_interval + if periodic_interval is None: + periodic_interval = CONF.periodic_interval + if periodic_fuzzy_delay is None: + periodic_fuzzy_delay = CONF.periodic_fuzzy_delay + service_obj = cls(host, binary, topic, manager, + report_interval=report_interval, + periodic_interval=periodic_interval, + periodic_fuzzy_delay=periodic_fuzzy_delay, + service_name=service_name) + + return service_obj + + def kill(self): + """Destroy the service object in the datastore.""" + self.stop() + try: + db.service_destroy(context.get_admin_context(), self.service_id) + except exception.NotFound: + LOG.warning(_LW('Service killed that has no database entry')) + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.rpcserver.stop() + except Exception: + pass + for x in self.timers: + try: + x.stop() + except Exception: + pass + self.timers = [] + super(Service, self).stop() + + def wait(self): + for x in self.timers: + try: + x.wait() + except Exception: + pass + if self.rpcserver: + self.rpcserver.wait() + + def periodic_tasks(self, raise_on_error=False): + """Tasks to be run at a periodic interval.""" + ctxt = context.get_admin_context() + self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error) + + def report_state(self): + """Update the state of this service in the datastore.""" + if not self.manager.is_working(): + # NOTE(dulek): If manager reports a problem we're not sending + # heartbeats - to indicate that service is actually down. + LOG.error(_LE('Manager for service %(binary)s %(host)s is ' + 'reporting problems, not sending heartbeat. ' + 'Service will appear "down".'), + {'binary': self.binary, + 'host': self.host}) + return + + ctxt = context.get_admin_context() + state_catalog = {} + try: + try: + service_ref = db.service_get(ctxt, self.service_id) + except exception.NotFound: + LOG.debug('The service database object disappeared, ' + 'recreating it.') + self._create_service_ref(ctxt) + service_ref = db.service_get(ctxt, self.service_id) + + state_catalog['report_count'] = service_ref['report_count'] + 1 + + db.service_update(ctxt, + self.service_id, state_catalog) + + # TODO(termie): make this pattern be more elegant. + if getattr(self, 'model_disconnected', False): + self.model_disconnected = False + LOG.error(_LE('Recovered model server connection!')) + + except db_exc.DBConnectionError: + if not getattr(self, 'model_disconnected', False): + self.model_disconnected = True + LOG.exception(_LE('model server went away')) + + # NOTE(jsbryant) Other DB errors can happen in HA configurations. + # such errors shouldn't kill this thread, so we handle them here. + except db_exc.DBError: + if not getattr(self, 'model_disconnected', False): + self.model_disconnected = True + LOG.exception(_LE('DBError encountered: ')) + + except Exception: + if not getattr(self, 'model_disconnected', False): + self.model_disconnected = True + LOG.exception(_LE('Exception encountered: ')) + + class WSGIService(service.ServiceBase): """Provides ability to launch API from a 'paste' configuration.""" diff --git a/smaug/tests/base.py b/smaug/tests/base.py index 4920a8e1..245b91f8 100644 --- a/smaug/tests/base.py +++ b/smaug/tests/base.py @@ -16,11 +16,14 @@ import shutil import fixtures from oslo_config import cfg from oslo_log import log +from oslo_messaging import conffixture as messaging_conffixture +from oslo_utils import timeutils from oslotest import base from smaug.common import config # noqa Need to register global_opts from smaug.db import migration from smaug.db.sqlalchemy import api as sqla_api +from smaug import rpc from smaug.tests.unit import conf_fixture test_opts = [ @@ -82,9 +85,24 @@ class TestCase(base.BaseTestCase): """Run before each test method to initialize test environment.""" super(TestCase, self).setUp() + rpc.add_extra_exmods("smaug.tests.unit") + self.addCleanup(rpc.clear_extra_exmods) + self.addCleanup(rpc.cleanup) + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_driver = 'fake' + self.messaging_conf.response_timeout = 15 + self.useFixture(self.messaging_conf) + rpc.init(CONF) + conf_fixture.set_defaults(CONF) CONF([], default_config_files=[]) + # NOTE(vish): We need a better method for creating fixtures for tests + # now that we have some required db setup for the system + # to work properly. + self.start = timeutils.utcnow() + CONF.set_default('connection', 'sqlite://', 'database') CONF.set_default('sqlite_synchronous', False, 'database') diff --git a/smaug/tests/unit/test_service.py b/smaug/tests/unit/test_service.py index 090b88a3..53f8de30 100644 --- a/smaug/tests/unit/test_service.py +++ b/smaug/tests/unit/test_service.py @@ -17,14 +17,216 @@ Unit Tests for remote procedure calls using queue import mock from oslo_concurrency import processutils from oslo_config import cfg +from oslo_db import exception as db_exc +from smaug import context +from smaug import db from smaug import exception +from smaug import manager +from smaug import rpc from smaug import service from smaug.tests import base from smaug.wsgi import common as wsgi +test_service_opts = [ + cfg.StrOpt("fake_manager", + default="smaug.tests.unit.test_service.FakeManager", + help="Manager for testing"), ] + CONF = cfg.CONF +CONF.register_opts(test_service_opts) + + +class FakeManager(manager.Manager): + """Fake manager for tests.""" + def __init__(self, host=None, + db_driver=None, service_name=None): + super(FakeManager, self).__init__(host=host, + db_driver=db_driver) + + def test_method(self): + return 'manager' + + +class ExtendedService(service.Service): + def test_method(self): + return 'service' + + +class ServiceManagerTestCase(base.TestCase): + """Test cases for Services.""" + + def test_message_gets_to_manager(self): + serv = service.Service('test', + 'test', + 'test', + 'smaug.tests.unit.test_service.FakeManager') + serv.start() + self.assertEqual('manager', serv.test_method()) + + def test_override_manager_method(self): + serv = ExtendedService('test', + 'test', + 'test', + 'smaug.tests.unit.test_service.FakeManager') + serv.start() + self.assertEqual('service', serv.test_method()) + + +class ServiceFlagsTestCase(base.TestCase): + def test_service_enabled_on_create_based_on_flag(self): + self.flags(enable_new_services=True) + host = 'foo' + binary = 'smaug-fake' + app = service.Service.create(host=host, binary=binary) + app.start() + app.stop() + ref = db.service_get(context.get_admin_context(), app.service_id) + db.service_destroy(context.get_admin_context(), app.service_id) + self.assertFalse(ref['disabled']) + + def test_service_disabled_on_create_based_on_flag(self): + self.flags(enable_new_services=False) + host = 'foo' + binary = 'smaug-fake' + app = service.Service.create(host=host, binary=binary) + app.start() + app.stop() + ref = db.service_get(context.get_admin_context(), app.service_id) + db.service_destroy(context.get_admin_context(), app.service_id) + self.assertTrue(ref['disabled']) + + +class ServiceTestCase(base.TestCase): + """Test cases for Services.""" + + def setUp(self): + super(ServiceTestCase, self).setUp() + self.host = 'foo' + self.binary = 'smaug-fake' + self.topic = 'fake' + + def test_create(self): + app = service.Service.create(host=self.host, + binary=self.binary, + topic=self.topic) + + self.assertTrue(app) + + def test_report_state_newly_disconnected(self): + service_ref = {'host': self.host, + 'binary': self.binary, + 'topic': self.topic, + 'report_count': 0, + 'id': 1} + with mock.patch.object(service, 'db') as mock_db: + mock_db.service_get_by_args.side_effect = exception.NotFound() + mock_db.service_create.return_value = service_ref + mock_db.service_get.side_effect = db_exc.DBConnectionError() + + serv = service.Service( + self.host, + self.binary, + self.topic, + 'smaug.tests.unit.test_service.FakeManager' + ) + serv.start() + serv.report_state() + self.assertTrue(serv.model_disconnected) + self.assertFalse(mock_db.service_update.called) + + def test_report_state_disconnected_DBError(self): + service_ref = {'host': self.host, + 'binary': self.binary, + 'topic': self.topic, + 'report_count': 0, + 'id': 1} + with mock.patch.object(service, 'db') as mock_db: + mock_db.service_get_by_args.side_effect = exception.NotFound() + mock_db.service_create.return_value = service_ref + mock_db.service_get.side_effect = db_exc.DBError() + + serv = service.Service( + self.host, + self.binary, + self.topic, + 'smaug.tests.unit.test_service.FakeManager' + ) + serv.start() + serv.report_state() + self.assertTrue(serv.model_disconnected) + self.assertFalse(mock_db.service_update.called) + + def test_report_state_newly_connected(self): + service_ref = {'host': self.host, + 'binary': self.binary, + 'topic': self.topic, + 'report_count': 0, + 'id': 1} + with mock.patch.object(service, 'db') as mock_db: + mock_db.service_get_by_args.side_effect = exception.NotFound() + mock_db.service_create.return_value = service_ref + mock_db.service_get.return_value = service_ref + + serv = service.Service( + self.host, + self.binary, + self.topic, + 'smaug.tests.unit.test_service.FakeManager' + ) + serv.start() + serv.model_disconnected = True + serv.report_state() + + self.assertFalse(serv.model_disconnected) + self.assertTrue(mock_db.service_update.called) + + def test_report_state_manager_not_working(self): + service_ref = {'host': self.host, + 'binary': self.binary, + 'topic': self.topic, + 'report_count': 0, + 'id': 1} + with mock.patch('smaug.db') as mock_db: + mock_db.service_get.return_value = service_ref + + serv = service.Service( + self.host, + self.binary, + self.topic, + 'smaug.tests.unit.test_service.FakeManager' + ) + serv.manager.is_working = mock.Mock(return_value=False) + serv.start() + serv.report_state() + + serv.manager.is_working.assert_called_once_with() + self.assertFalse(mock_db.service_update.called) + + def test_service_with_long_report_interval(self): + self.override_config('service_down_time', 10) + self.override_config('report_interval', 10) + service.Service.create( + binary="test_service", + manager="smaug.tests.unit.test_service.FakeManager") + self.assertEqual(25, CONF.service_down_time) + + @mock.patch.object(rpc, 'get_server') + @mock.patch('smaug.db') + def test_service_stop_waits_for_rpcserver(self, mock_db, mock_rpc): + serv = service.Service( + self.host, + self.binary, + self.topic, + 'smaug.tests.unit.test_service.FakeManager' + ) + serv.start() + serv.stop() + serv.wait() + serv.rpcserver.start.assert_called_once_with() + serv.rpcserver.stop.assert_called_once_with() + serv.rpcserver.wait.assert_called_once_with() class TestWSGIService(base.TestCase):