From 625e074033bc4d4b42f2ef641a69dd425965ac8f Mon Sep 17 00:00:00 2001 From: Akira Yoshiyama Date: Sat, 9 Feb 2013 14:26:04 +0000 Subject: [PATCH] Added a service heartbeat driver using Memcached. Today the heartbeat information of Nova services/nodes is maintained in the DB, while each service updates the corresponding record in the Service table periodically (by default -- every 10 seconds), specifying the timestamp of the last update. This mechanism is highly inefficient and does not scale. E.g., maintaining the heartbeat information for 1,000 nodes/services would require 100 DB updates per second (just for the heartbeat). This patch adds nova.servicegroup.drivers.memcached, a service heartbeat driver using Memcached. You can reduce DB updates with it. blueprint memcached-service-heartbeat Change-Id: I60bdb1cfbce1fea051f276ebfd6ccc4ad8fe6d2b --- etc/nova/nova.conf.sample | 3 +- nova/servicegroup/api.py | 9 +- nova/servicegroup/drivers/mc.py | 108 +++++++++ .../servicegroup/test_mc_servicegroup.py | 220 ++++++++++++++++++ 4 files changed, 336 insertions(+), 4 deletions(-) create mode 100644 nova/servicegroup/drivers/mc.py create mode 100644 nova/tests/servicegroup/test_mc_servicegroup.py diff --git a/etc/nova/nova.conf.sample b/etc/nova/nova.conf.sample index 61350b183312..9cbb8c1a5adf 100644 --- a/etc/nova/nova.conf.sample +++ b/etc/nova/nova.conf.sample @@ -1689,7 +1689,8 @@ # Options defined in nova.servicegroup.api # -# The driver for servicegroup service. (string value) +# The driver for servicegroup service (valid options are: db, +# zk, mc) (string value) #servicegroup_driver=db diff --git a/nova/servicegroup/api.py b/nova/servicegroup/api.py index 6dc1aa6d1f7f..057a44103ab9 100644 --- a/nova/servicegroup/api.py +++ b/nova/servicegroup/api.py @@ -28,8 +28,10 @@ import random LOG = logging.getLogger(__name__) _default_driver = 'db' servicegroup_driver_opt = cfg.StrOpt('servicegroup_driver', - default=_default_driver, - help='The driver for servicegroup service.') + default=_default_driver, + help='The driver for servicegroup ' + 'service (valid options are: ' + 'db, zk, mc)') CONF = cfg.CONF CONF.register_opt(servicegroup_driver_opt) @@ -40,7 +42,8 @@ class API(object): _driver = None _driver_name_class_mapping = { 'db': 'nova.servicegroup.drivers.db.DbDriver', - 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver' + 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver', + 'mc': 'nova.servicegroup.drivers.mc.MemcachedDriver' } def __new__(cls, *args, **kwargs): diff --git a/nova/servicegroup/drivers/mc.py b/nova/servicegroup/drivers/mc.py new file mode 100644 index 000000000000..c5048a04cfe0 --- /dev/null +++ b/nova/servicegroup/drivers/mc.py @@ -0,0 +1,108 @@ +# Service heartbeat driver using Memcached +# Copyright (c) 2013 Akira Yoshiyama +# +# This is derived from nova/servicegroup/drivers/db.py. +# Copyright (c) IBM 2012 Pavel Kravchenco +# Alexey Roytman +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nova.common import memorycache +from nova import conductor +from nova import context +from nova.openstack.common import cfg +from nova.openstack.common import log as logging +from nova.openstack.common import timeutils +from nova.servicegroup import api +from nova import utils + + +CONF = cfg.CONF +CONF.import_opt('service_down_time', 'nova.service') +CONF.import_opt('memcached_servers', 'nova.common.memorycache') + + +LOG = logging.getLogger(__name__) + + +class MemcachedDriver(api.ServiceGroupDriver): + + def __init__(self, *args, **kwargs): + test = kwargs.get('test') + if not CONF.memcached_servers and not test: + raise RuntimeError(_('memcached_servers not defined')) + self.mc = memorycache.get_client() + self.db_allowed = kwargs.get('db_allowed', True) + self.conductor_api = conductor.API(use_local=self.db_allowed) + + def join(self, member_id, group_id, service=None): + """Join the given service with its group.""" + + msg = _('Memcached_Driver: join new ServiceGroup member ' + '%(member_id)s to the %(group_id)s group, ' + 'service = %(service)s') + LOG.debug(msg, locals()) + if service is None: + raise RuntimeError(_('service is a mandatory argument for ' + 'Memcached based ServiceGroup driver')) + report_interval = service.report_interval + if report_interval: + pulse = utils.FixedIntervalLoopingCall(self._report_state, service) + pulse.start(interval=report_interval, + initial_delay=report_interval) + return pulse + + def is_up(self, service_ref): + """Moved from nova.utils + Check whether a service is up based on last heartbeat. + """ + key = "%(topic)s:%(host)s" % service_ref + return self.mc.get(str(key)) is not None + + def get_all(self, group_id): + """ + Returns ALL members of the given group + """ + LOG.debug(_('Memcached_Driver: get_all members of the %s group') % + group_id) + rs = [] + ctxt = context.get_admin_context() + services = self.conductor_api.service_get_all_by_topic(ctxt, group_id) + for service in services: + if self.is_up(service): + rs.append(service['host']) + return rs + + def _report_state(self, service): + """Update the state of this service in the datastore.""" + ctxt = context.get_admin_context() + try: + key = "%(topic)s:%(host)s" % service.service_ref + # memcached has data expiration time capability. + # set(..., time=CONF.service_down_time) uses it and + # reduces key-deleting code. + self.mc.set(str(key), + timeutils.utcnow(), + time=CONF.service_down_time) + + # TODO(termie): make this pattern be more elegant. + if getattr(service, 'model_disconnected', False): + service.model_disconnected = False + LOG.error(_('Recovered model server connection!')) + + # TODO(vish): this should probably only catch connection errors + except Exception: # pylint: disable=W0702 + if not getattr(service, 'model_disconnected', False): + service.model_disconnected = True + LOG.exception(_('model server went away')) diff --git a/nova/tests/servicegroup/test_mc_servicegroup.py b/nova/tests/servicegroup/test_mc_servicegroup.py new file mode 100644 index 000000000000..2551842193e0 --- /dev/null +++ b/nova/tests/servicegroup/test_mc_servicegroup.py @@ -0,0 +1,220 @@ +# Copyright (c) 2013 Akira Yoshiyama +# +# This is derived from test_db_servicegroup.py. +# Copyright (c) IBM 2012 Alexey Roytman +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +import fixtures + +from nova import context +from nova import db +from nova.openstack.common import timeutils +from nova import service +from nova import servicegroup +from nova import test + + +class ServiceFixture(fixtures.Fixture): + + def __init__(self, host, binary, topic): + super(ServiceFixture, self).__init__() + self.host = host + self.binary = binary + self.topic = topic + self.serv = None + + def setUp(self): + super(ServiceFixture, self).setUp() + self.serv = service.Service(self.host, + self.binary, + self.topic, + 'nova.tests.test_service.FakeManager', + 1, 1) + self.addCleanup(self.serv.kill) + + +class MemcachedServiceGroupTestCase(test.TestCase): + + def setUp(self): + super(MemcachedServiceGroupTestCase, self).setUp() + servicegroup.API._driver = None + self.flags(servicegroup_driver='mc') + self.down_time = 3 + self.flags(enable_new_services=True) + self.flags(service_down_time=self.down_time) + self.servicegroup_api = servicegroup.API(test=True) + self._host = 'foo' + self._binary = 'nova-fake' + self._topic = 'unittest' + self._ctx = context.get_admin_context() + + def test_memcached_driver(self): + serv = self.useFixture( + ServiceFixture(self._host, self._binary, self._topic)).serv + serv.start() + service_ref = db.service_get_by_args(self._ctx, + self._host, + self._binary) + hostkey = str("%s:%s" % (self._topic, self._host)) + self.servicegroup_api._driver.mc.set(hostkey, + timeutils.utcnow(), + time=self.down_time) + + self.assertTrue(self.servicegroup_api.service_is_up(service_ref)) + eventlet.sleep(self.down_time + 1) + service_ref = db.service_get_by_args(self._ctx, + self._host, + self._binary) + + self.assertTrue(self.servicegroup_api.service_is_up(service_ref)) + serv.stop() + eventlet.sleep(self.down_time + 1) + service_ref = db.service_get_by_args(self._ctx, + self._host, + self._binary) + self.assertFalse(self.servicegroup_api.service_is_up(service_ref)) + + def test_get_all(self): + host1 = self._host + '_1' + host2 = self._host + '_2' + host3 = self._host + '_3' + + serv1 = self.useFixture( + ServiceFixture(host1, self._binary, self._topic)).serv + serv1.start() + + serv2 = self.useFixture( + ServiceFixture(host2, self._binary, self._topic)).serv + serv2.start() + + serv3 = self.useFixture( + ServiceFixture(host3, self._binary, self._topic)).serv + serv3.start() + + service_ref1 = db.service_get_by_args(self._ctx, + host1, + self._binary) + service_ref2 = db.service_get_by_args(self._ctx, + host2, + self._binary) + service_ref3 = db.service_get_by_args(self._ctx, + host3, + self._binary) + + host1key = str("%s:%s" % (self._topic, host1)) + host2key = str("%s:%s" % (self._topic, host2)) + host3key = str("%s:%s" % (self._topic, host3)) + self.servicegroup_api._driver.mc.set(host1key, + timeutils.utcnow(), + time=self.down_time) + self.servicegroup_api._driver.mc.set(host2key, + timeutils.utcnow(), + time=self.down_time) + self.servicegroup_api._driver.mc.set(host3key, + timeutils.utcnow(), + time=-1) + + services = self.servicegroup_api.get_all(self._topic) + + self.assertTrue(host1 in services) + self.assertTrue(host2 in services) + self.assertFalse(host3 in services) + + service_id = self.servicegroup_api.get_one(self._topic) + self.assertTrue(service_id in services) + + def test_service_is_up(self): + serv = self.useFixture( + ServiceFixture(self._host, self._binary, self._topic)).serv + serv.start() + service_ref = db.service_get_by_args(self._ctx, + self._host, + self._binary) + fake_now = 1000 + down_time = 5 + self.flags(service_down_time=down_time) + self.mox.StubOutWithMock(timeutils, 'utcnow_ts') + self.servicegroup_api = servicegroup.API() + hostkey = str("%s:%s" % (self._topic, self._host)) + + # Up (equal) + timeutils.utcnow_ts().AndReturn(fake_now) + timeutils.utcnow_ts().AndReturn(fake_now + down_time - 1) + self.mox.ReplayAll() + self.servicegroup_api._driver.mc.set(hostkey, + timeutils.utcnow(), + time=down_time) + result = self.servicegroup_api.service_is_up(service_ref) + self.assertTrue(result) + + self.mox.ResetAll() + # Up + timeutils.utcnow_ts().AndReturn(fake_now) + timeutils.utcnow_ts().AndReturn(fake_now + down_time - 2) + self.mox.ReplayAll() + self.servicegroup_api._driver.mc.set(hostkey, + timeutils.utcnow(), + time=down_time) + result = self.servicegroup_api.service_is_up(service_ref) + self.assertTrue(result) + + self.mox.ResetAll() + # Down + timeutils.utcnow_ts().AndReturn(fake_now) + timeutils.utcnow_ts().AndReturn(fake_now + down_time) + self.mox.ReplayAll() + self.servicegroup_api._driver.mc.set(hostkey, + timeutils.utcnow(), + time=down_time) + result = self.servicegroup_api.service_is_up(service_ref) + self.assertFalse(result) + + self.mox.ResetAll() + # Down + timeutils.utcnow_ts().AndReturn(fake_now) + timeutils.utcnow_ts().AndReturn(fake_now + down_time + 1) + self.mox.ReplayAll() + self.servicegroup_api._driver.mc.set(hostkey, + timeutils.utcnow(), + time=down_time) + result = self.servicegroup_api.service_is_up(service_ref) + self.assertFalse(result) + + self.mox.ResetAll() + + def test_report_state(self): + serv = self.useFixture( + ServiceFixture(self._host, self._binary, self._topic)).serv + serv.start() + service_ref = db.service_get_by_args(self._ctx, + self._host, + self._binary) + self.servicegroup_api = servicegroup.API() + + # updating model_disconnected + serv.model_disconnected = True + self.servicegroup_api._driver._report_state(serv) + self.assertFalse(serv.model_disconnected) + + # handling exception + serv.model_disconnected = True + self.servicegroup_api._driver.mc = None + self.servicegroup_api._driver._report_state(serv) + self.assertTrue(serv.model_disconnected) + + delattr(serv, 'model_disconnected') + self.servicegroup_api._driver.mc = None + self.servicegroup_api._driver._report_state(serv) + self.assertTrue(serv.model_disconnected)