Merge "Added a service heartbeat driver using Memcached."
This commit is contained in:
commit
c8fc2fba5f
@ -1689,7 +1689,8 @@
|
||||
# Options defined in nova.servicegroup.api
|
||||
#
|
||||
|
||||
# The driver for servicegroup service. (string value)
|
||||
# The driver for servicegroup service (valid options are: db,
|
||||
# zk, mc) (string value)
|
||||
#servicegroup_driver=db
|
||||
|
||||
|
||||
|
@ -29,7 +29,9 @@ LOG = logging.getLogger(__name__)
|
||||
_default_driver = 'db'
|
||||
servicegroup_driver_opt = cfg.StrOpt('servicegroup_driver',
|
||||
default=_default_driver,
|
||||
help='The driver for servicegroup service.')
|
||||
help='The driver for servicegroup '
|
||||
'service (valid options are: '
|
||||
'db, zk, mc)')
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opt(servicegroup_driver_opt)
|
||||
@ -40,7 +42,8 @@ class API(object):
|
||||
_driver = None
|
||||
_driver_name_class_mapping = {
|
||||
'db': 'nova.servicegroup.drivers.db.DbDriver',
|
||||
'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver'
|
||||
'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver',
|
||||
'mc': 'nova.servicegroup.drivers.mc.MemcachedDriver'
|
||||
}
|
||||
|
||||
def __new__(cls, *args, **kwargs):
|
||||
|
108
nova/servicegroup/drivers/mc.py
Normal file
108
nova/servicegroup/drivers/mc.py
Normal file
@ -0,0 +1,108 @@
|
||||
# Service heartbeat driver using Memcached
|
||||
# Copyright (c) 2013 Akira Yoshiyama <akirayoshiyama at gmail dot com>
|
||||
#
|
||||
# This is derived from nova/servicegroup/drivers/db.py.
|
||||
# Copyright (c) IBM 2012 Pavel Kravchenco <kpavel at il dot ibm dot com>
|
||||
# Alexey Roytman <roytman at il dot ibm dot com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from nova.common import memorycache
|
||||
from nova import conductor
|
||||
from nova import context
|
||||
from nova.openstack.common import cfg
|
||||
from nova.openstack.common import log as logging
|
||||
from nova.openstack.common import timeutils
|
||||
from nova.servicegroup import api
|
||||
from nova import utils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('service_down_time', 'nova.service')
|
||||
CONF.import_opt('memcached_servers', 'nova.common.memorycache')
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MemcachedDriver(api.ServiceGroupDriver):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
test = kwargs.get('test')
|
||||
if not CONF.memcached_servers and not test:
|
||||
raise RuntimeError(_('memcached_servers not defined'))
|
||||
self.mc = memorycache.get_client()
|
||||
self.db_allowed = kwargs.get('db_allowed', True)
|
||||
self.conductor_api = conductor.API(use_local=self.db_allowed)
|
||||
|
||||
def join(self, member_id, group_id, service=None):
|
||||
"""Join the given service with its group."""
|
||||
|
||||
msg = _('Memcached_Driver: join new ServiceGroup member '
|
||||
'%(member_id)s to the %(group_id)s group, '
|
||||
'service = %(service)s')
|
||||
LOG.debug(msg, locals())
|
||||
if service is None:
|
||||
raise RuntimeError(_('service is a mandatory argument for '
|
||||
'Memcached based ServiceGroup driver'))
|
||||
report_interval = service.report_interval
|
||||
if report_interval:
|
||||
pulse = utils.FixedIntervalLoopingCall(self._report_state, service)
|
||||
pulse.start(interval=report_interval,
|
||||
initial_delay=report_interval)
|
||||
return pulse
|
||||
|
||||
def is_up(self, service_ref):
|
||||
"""Moved from nova.utils
|
||||
Check whether a service is up based on last heartbeat.
|
||||
"""
|
||||
key = "%(topic)s:%(host)s" % service_ref
|
||||
return self.mc.get(str(key)) is not None
|
||||
|
||||
def get_all(self, group_id):
|
||||
"""
|
||||
Returns ALL members of the given group
|
||||
"""
|
||||
LOG.debug(_('Memcached_Driver: get_all members of the %s group') %
|
||||
group_id)
|
||||
rs = []
|
||||
ctxt = context.get_admin_context()
|
||||
services = self.conductor_api.service_get_all_by_topic(ctxt, group_id)
|
||||
for service in services:
|
||||
if self.is_up(service):
|
||||
rs.append(service['host'])
|
||||
return rs
|
||||
|
||||
def _report_state(self, service):
|
||||
"""Update the state of this service in the datastore."""
|
||||
ctxt = context.get_admin_context()
|
||||
try:
|
||||
key = "%(topic)s:%(host)s" % service.service_ref
|
||||
# memcached has data expiration time capability.
|
||||
# set(..., time=CONF.service_down_time) uses it and
|
||||
# reduces key-deleting code.
|
||||
self.mc.set(str(key),
|
||||
timeutils.utcnow(),
|
||||
time=CONF.service_down_time)
|
||||
|
||||
# TODO(termie): make this pattern be more elegant.
|
||||
if getattr(service, 'model_disconnected', False):
|
||||
service.model_disconnected = False
|
||||
LOG.error(_('Recovered model server connection!'))
|
||||
|
||||
# TODO(vish): this should probably only catch connection errors
|
||||
except Exception: # pylint: disable=W0702
|
||||
if not getattr(service, 'model_disconnected', False):
|
||||
service.model_disconnected = True
|
||||
LOG.exception(_('model server went away'))
|
220
nova/tests/servicegroup/test_mc_servicegroup.py
Normal file
220
nova/tests/servicegroup/test_mc_servicegroup.py
Normal file
@ -0,0 +1,220 @@
|
||||
# Copyright (c) 2013 Akira Yoshiyama <akirayoshiyama at gmail dot com>
|
||||
#
|
||||
# This is derived from test_db_servicegroup.py.
|
||||
# Copyright (c) IBM 2012 Alexey Roytman <roytman at il dot ibm dot com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import fixtures
|
||||
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova.openstack.common import timeutils
|
||||
from nova import service
|
||||
from nova import servicegroup
|
||||
from nova import test
|
||||
|
||||
|
||||
class ServiceFixture(fixtures.Fixture):
|
||||
|
||||
def __init__(self, host, binary, topic):
|
||||
super(ServiceFixture, self).__init__()
|
||||
self.host = host
|
||||
self.binary = binary
|
||||
self.topic = topic
|
||||
self.serv = None
|
||||
|
||||
def setUp(self):
|
||||
super(ServiceFixture, self).setUp()
|
||||
self.serv = service.Service(self.host,
|
||||
self.binary,
|
||||
self.topic,
|
||||
'nova.tests.test_service.FakeManager',
|
||||
1, 1)
|
||||
self.addCleanup(self.serv.kill)
|
||||
|
||||
|
||||
class MemcachedServiceGroupTestCase(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(MemcachedServiceGroupTestCase, self).setUp()
|
||||
servicegroup.API._driver = None
|
||||
self.flags(servicegroup_driver='mc')
|
||||
self.down_time = 3
|
||||
self.flags(enable_new_services=True)
|
||||
self.flags(service_down_time=self.down_time)
|
||||
self.servicegroup_api = servicegroup.API(test=True)
|
||||
self._host = 'foo'
|
||||
self._binary = 'nova-fake'
|
||||
self._topic = 'unittest'
|
||||
self._ctx = context.get_admin_context()
|
||||
|
||||
def test_memcached_driver(self):
|
||||
serv = self.useFixture(
|
||||
ServiceFixture(self._host, self._binary, self._topic)).serv
|
||||
serv.start()
|
||||
service_ref = db.service_get_by_args(self._ctx,
|
||||
self._host,
|
||||
self._binary)
|
||||
hostkey = str("%s:%s" % (self._topic, self._host))
|
||||
self.servicegroup_api._driver.mc.set(hostkey,
|
||||
timeutils.utcnow(),
|
||||
time=self.down_time)
|
||||
|
||||
self.assertTrue(self.servicegroup_api.service_is_up(service_ref))
|
||||
eventlet.sleep(self.down_time + 1)
|
||||
service_ref = db.service_get_by_args(self._ctx,
|
||||
self._host,
|
||||
self._binary)
|
||||
|
||||
self.assertTrue(self.servicegroup_api.service_is_up(service_ref))
|
||||
serv.stop()
|
||||
eventlet.sleep(self.down_time + 1)
|
||||
service_ref = db.service_get_by_args(self._ctx,
|
||||
self._host,
|
||||
self._binary)
|
||||
self.assertFalse(self.servicegroup_api.service_is_up(service_ref))
|
||||
|
||||
def test_get_all(self):
|
||||
host1 = self._host + '_1'
|
||||
host2 = self._host + '_2'
|
||||
host3 = self._host + '_3'
|
||||
|
||||
serv1 = self.useFixture(
|
||||
ServiceFixture(host1, self._binary, self._topic)).serv
|
||||
serv1.start()
|
||||
|
||||
serv2 = self.useFixture(
|
||||
ServiceFixture(host2, self._binary, self._topic)).serv
|
||||
serv2.start()
|
||||
|
||||
serv3 = self.useFixture(
|
||||
ServiceFixture(host3, self._binary, self._topic)).serv
|
||||
serv3.start()
|
||||
|
||||
service_ref1 = db.service_get_by_args(self._ctx,
|
||||
host1,
|
||||
self._binary)
|
||||
service_ref2 = db.service_get_by_args(self._ctx,
|
||||
host2,
|
||||
self._binary)
|
||||
service_ref3 = db.service_get_by_args(self._ctx,
|
||||
host3,
|
||||
self._binary)
|
||||
|
||||
host1key = str("%s:%s" % (self._topic, host1))
|
||||
host2key = str("%s:%s" % (self._topic, host2))
|
||||
host3key = str("%s:%s" % (self._topic, host3))
|
||||
self.servicegroup_api._driver.mc.set(host1key,
|
||||
timeutils.utcnow(),
|
||||
time=self.down_time)
|
||||
self.servicegroup_api._driver.mc.set(host2key,
|
||||
timeutils.utcnow(),
|
||||
time=self.down_time)
|
||||
self.servicegroup_api._driver.mc.set(host3key,
|
||||
timeutils.utcnow(),
|
||||
time=-1)
|
||||
|
||||
services = self.servicegroup_api.get_all(self._topic)
|
||||
|
||||
self.assertTrue(host1 in services)
|
||||
self.assertTrue(host2 in services)
|
||||
self.assertFalse(host3 in services)
|
||||
|
||||
service_id = self.servicegroup_api.get_one(self._topic)
|
||||
self.assertTrue(service_id in services)
|
||||
|
||||
def test_service_is_up(self):
|
||||
serv = self.useFixture(
|
||||
ServiceFixture(self._host, self._binary, self._topic)).serv
|
||||
serv.start()
|
||||
service_ref = db.service_get_by_args(self._ctx,
|
||||
self._host,
|
||||
self._binary)
|
||||
fake_now = 1000
|
||||
down_time = 5
|
||||
self.flags(service_down_time=down_time)
|
||||
self.mox.StubOutWithMock(timeutils, 'utcnow_ts')
|
||||
self.servicegroup_api = servicegroup.API()
|
||||
hostkey = str("%s:%s" % (self._topic, self._host))
|
||||
|
||||
# Up (equal)
|
||||
timeutils.utcnow_ts().AndReturn(fake_now)
|
||||
timeutils.utcnow_ts().AndReturn(fake_now + down_time - 1)
|
||||
self.mox.ReplayAll()
|
||||
self.servicegroup_api._driver.mc.set(hostkey,
|
||||
timeutils.utcnow(),
|
||||
time=down_time)
|
||||
result = self.servicegroup_api.service_is_up(service_ref)
|
||||
self.assertTrue(result)
|
||||
|
||||
self.mox.ResetAll()
|
||||
# Up
|
||||
timeutils.utcnow_ts().AndReturn(fake_now)
|
||||
timeutils.utcnow_ts().AndReturn(fake_now + down_time - 2)
|
||||
self.mox.ReplayAll()
|
||||
self.servicegroup_api._driver.mc.set(hostkey,
|
||||
timeutils.utcnow(),
|
||||
time=down_time)
|
||||
result = self.servicegroup_api.service_is_up(service_ref)
|
||||
self.assertTrue(result)
|
||||
|
||||
self.mox.ResetAll()
|
||||
# Down
|
||||
timeutils.utcnow_ts().AndReturn(fake_now)
|
||||
timeutils.utcnow_ts().AndReturn(fake_now + down_time)
|
||||
self.mox.ReplayAll()
|
||||
self.servicegroup_api._driver.mc.set(hostkey,
|
||||
timeutils.utcnow(),
|
||||
time=down_time)
|
||||
result = self.servicegroup_api.service_is_up(service_ref)
|
||||
self.assertFalse(result)
|
||||
|
||||
self.mox.ResetAll()
|
||||
# Down
|
||||
timeutils.utcnow_ts().AndReturn(fake_now)
|
||||
timeutils.utcnow_ts().AndReturn(fake_now + down_time + 1)
|
||||
self.mox.ReplayAll()
|
||||
self.servicegroup_api._driver.mc.set(hostkey,
|
||||
timeutils.utcnow(),
|
||||
time=down_time)
|
||||
result = self.servicegroup_api.service_is_up(service_ref)
|
||||
self.assertFalse(result)
|
||||
|
||||
self.mox.ResetAll()
|
||||
|
||||
def test_report_state(self):
|
||||
serv = self.useFixture(
|
||||
ServiceFixture(self._host, self._binary, self._topic)).serv
|
||||
serv.start()
|
||||
service_ref = db.service_get_by_args(self._ctx,
|
||||
self._host,
|
||||
self._binary)
|
||||
self.servicegroup_api = servicegroup.API()
|
||||
|
||||
# updating model_disconnected
|
||||
serv.model_disconnected = True
|
||||
self.servicegroup_api._driver._report_state(serv)
|
||||
self.assertFalse(serv.model_disconnected)
|
||||
|
||||
# handling exception
|
||||
serv.model_disconnected = True
|
||||
self.servicegroup_api._driver.mc = None
|
||||
self.servicegroup_api._driver._report_state(serv)
|
||||
self.assertTrue(serv.model_disconnected)
|
||||
|
||||
delattr(serv, 'model_disconnected')
|
||||
self.servicegroup_api._driver.mc = None
|
||||
self.servicegroup_api._driver._report_state(serv)
|
||||
self.assertTrue(serv.model_disconnected)
|
Loading…
Reference in New Issue
Block a user