Fix up join() and leave() methods of servicegroup
The join() method was not documented properly, and the Zookeeper driver's implementation of the join() method returned a FakeLoopingCall object for no reason whatsoever (the other drivers just return None), so it's not necessary. The leave() method wasn't used anywhere at all, therefore this patch removes it entirely. Change-Id: Idbd95636dbebfb4f21a25d23625ee338c1ba863e Closes-bug: #1414513 Closes-bug: #1414515
This commit is contained in:
@@ -75,21 +75,14 @@ class API(object):
|
||||
self._driver = importutils.import_object(driver_class,
|
||||
*args, **kwargs)
|
||||
|
||||
def join(self, member_id, group_id, service=None):
|
||||
"""Add a new member to the ServiceGroup
|
||||
def join(self, member, group, service=None):
|
||||
"""Add a new member to a service group.
|
||||
|
||||
@param member_id: the joined member ID
|
||||
@param group_id: the group name, of the joined member
|
||||
@param service: the parameter can be used for notifications about
|
||||
disconnect mode and update some internals
|
||||
:param member: the joined member ID/name
|
||||
:param group: the group ID/name, of the joined member
|
||||
:param service: a `nova.service.Service` object
|
||||
"""
|
||||
|
||||
LOG.debug('Join new ServiceGroup member %(member_id)s to the '
|
||||
'%(group_id)s group, service = %(service)s',
|
||||
{'member_id': member_id,
|
||||
'group_id': group_id,
|
||||
'service': service})
|
||||
return self._driver.join(member_id, group_id, service)
|
||||
return self._driver.join(member, group, service)
|
||||
|
||||
def service_is_up(self, member):
|
||||
"""Check if the given member is up."""
|
||||
@@ -97,15 +90,6 @@ class API(object):
|
||||
# so this doesn't slow down the scheduler
|
||||
return self._driver.is_up(member)
|
||||
|
||||
def leave(self, member_id, group_id):
|
||||
"""Explicitly remove the given member from the ServiceGroup
|
||||
monitoring.
|
||||
"""
|
||||
LOG.debug('Explicitly remove the given member %(member_id)s from the'
|
||||
'%(group_id)s group monitoring',
|
||||
{'member_id': member_id, 'group_id': group_id})
|
||||
return self._driver.leave(member_id, group_id)
|
||||
|
||||
def get_all(self, group_id):
|
||||
"""Returns ALL members of the given group."""
|
||||
LOG.debug('Returns ALL members of the [%s] '
|
||||
|
||||
@@ -17,18 +17,19 @@ import random
|
||||
class Driver(object):
|
||||
"""Base class for all ServiceGroup drivers."""
|
||||
|
||||
def join(self, member_id, group_id, service=None):
|
||||
"""Join the given service with its group."""
|
||||
def join(self, member, group, service=None):
|
||||
"""Add a new member to a service group.
|
||||
|
||||
:param member: the joined member ID/name
|
||||
:param group: the group ID/name, of the joined member
|
||||
:param service: a `nova.service.Service` object
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def is_up(self, member):
|
||||
"""Check whether the given member is up."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def leave(self, member_id, group_id):
|
||||
"""Remove the given member from the ServiceGroup monitoring."""
|
||||
raise NotImplementedError()
|
||||
|
||||
def get_all(self, group_id):
|
||||
"""Returns ALL members of the given group."""
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -46,12 +46,16 @@ class DbDriver(base.Driver):
|
||||
self.conductor_api = conductor.API(use_local=self.db_allowed)
|
||||
self.service_down_time = CONF.service_down_time
|
||||
|
||||
def join(self, member_id, group_id, service=None):
|
||||
"""Join the given service with its group."""
|
||||
def join(self, member, group, service=None):
|
||||
"""Add a new member to a service group.
|
||||
|
||||
LOG.debug('DB_Driver: join new ServiceGroup member %(member_id)s to '
|
||||
'the %(group_id)s group, service = %(service)s',
|
||||
{'member_id': member_id, 'group_id': group_id,
|
||||
:param member: the joined member ID/name
|
||||
:param group: the group ID/name, of the joined member
|
||||
:param service: a `nova.service.Service` object
|
||||
"""
|
||||
LOG.debug('DB_Driver: join new ServiceGroup member %(member)s to '
|
||||
'the %(group)s group, service = %(service)s',
|
||||
{'member': member, 'group': group,
|
||||
'service': service})
|
||||
if service is None:
|
||||
raise RuntimeError(_('service is a mandatory argument for DB based'
|
||||
|
||||
@@ -23,7 +23,6 @@ from oslo_utils import importutils
|
||||
|
||||
from nova import exception
|
||||
from nova.i18n import _LE, _LW
|
||||
from nova.openstack.common import loopingcall
|
||||
from nova.servicegroup.drivers import base
|
||||
|
||||
evzookeeper = importutils.try_import('evzookeeper')
|
||||
@@ -107,43 +106,33 @@ class ZooKeeperDriver(base.Driver):
|
||||
'of it in production right now may be risky.'))
|
||||
return session
|
||||
|
||||
def join(self, member_id, group, service=None):
|
||||
"""Join the given service with its group."""
|
||||
# process id
|
||||
def join(self, member, group, service=None):
|
||||
"""Add a new member to a service group.
|
||||
|
||||
:param member: the joined member ID/name
|
||||
:param group: the group ID/name, of the joined member
|
||||
:param service: a `nova.service.Service` object
|
||||
"""
|
||||
process_id = str(os.getpid())
|
||||
LOG.debug('ZooKeeperDriver: join new member %(id)s(%(pid)s) to the '
|
||||
'%(gr)s group, service=%(sr)s',
|
||||
{'id': member_id, 'pid': process_id,
|
||||
{'id': member, 'pid': process_id,
|
||||
'gr': group, 'sr': service})
|
||||
member = self._memberships.get((group, member_id), None)
|
||||
member = self._memberships.get((group, member), None)
|
||||
if member is None:
|
||||
# the first time to join. Generate a new object
|
||||
path = "%s/%s/%s" % (CONF.zookeeper.sg_prefix, group, member_id)
|
||||
path = "%s/%s/%s" % (CONF.zookeeper.sg_prefix, group, member)
|
||||
try:
|
||||
member = membership.Membership(self._session, path, process_id)
|
||||
zk_member = membership.Membership(self._session, path,
|
||||
process_id)
|
||||
except RuntimeError:
|
||||
LOG.exception(_LE("Unable to join. It is possible that either"
|
||||
" another node exists with the same name, or"
|
||||
" this node just restarted. We will try "
|
||||
"again in a short while to make sure."))
|
||||
eventlet.sleep(CONF.zookeeper.sg_retry_interval)
|
||||
member = membership.Membership(self._session, path, member_id)
|
||||
self._memberships[(group, member_id)] = member
|
||||
return FakeLoopingCall(self, member_id, group)
|
||||
|
||||
def leave(self, member_id, group):
|
||||
"""Remove the given member from the service group."""
|
||||
LOG.debug('ZooKeeperDriver.leave: %(member)s from group %(group)s',
|
||||
{'member': member_id, 'group': group})
|
||||
try:
|
||||
key = (group, member_id)
|
||||
member = self._memberships[key]
|
||||
member.leave()
|
||||
del self._memberships[key]
|
||||
except KeyError:
|
||||
LOG.error(_LE('ZooKeeperDriver.leave: %(id)s has not joined '
|
||||
'to the %(gr)s group'),
|
||||
{'id': member_id, 'gr': group})
|
||||
zk_member = membership.Membership(self._session, path, member)
|
||||
self._memberships[(group, member)] = zk_member
|
||||
|
||||
def is_up(self, service_ref):
|
||||
group_id = service_ref['topic']
|
||||
@@ -209,22 +198,3 @@ class ZooKeeperDriver(base.Driver):
|
||||
all_members = filter(have_processes, all_members)
|
||||
|
||||
return all_members
|
||||
|
||||
|
||||
class FakeLoopingCall(loopingcall.LoopingCallBase):
|
||||
"""The fake Looping Call implementation, created for backward
|
||||
compatibility with a membership based on DB.
|
||||
"""
|
||||
def __init__(self, driver, host, group):
|
||||
self._driver = driver
|
||||
self._group = group
|
||||
self._host = host
|
||||
|
||||
def stop(self):
|
||||
self._driver.leave(self._host, self._group)
|
||||
|
||||
def start(self, interval, initial_delay=None):
|
||||
pass
|
||||
|
||||
def wait(self):
|
||||
pass
|
||||
|
||||
@@ -26,7 +26,7 @@ $ nosetests nova.tests.unit.servicegroup.test_zk_driver
|
||||
"""
|
||||
import os
|
||||
|
||||
import eventlet
|
||||
import mock
|
||||
|
||||
from nova import servicegroup
|
||||
from nova.servicegroup.drivers import zk
|
||||
@@ -37,33 +37,23 @@ class ZKServiceGroupTestCase(test.NoDBTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ZKServiceGroupTestCase, self).setUp()
|
||||
from nova.servicegroup.drivers import zk
|
||||
self.flags(servicegroup_driver='zk')
|
||||
self.flags(address='localhost:2181', group="zookeeper")
|
||||
try:
|
||||
zk.ZooKeeperDriver()
|
||||
__import__('evzookeeper')
|
||||
__import__('zookeeper')
|
||||
except ImportError:
|
||||
self.skipTest("Unable to test due to lack of ZooKeeper")
|
||||
|
||||
def test_join_leave(self):
|
||||
# Need to do this here, as opposed to the setUp() method, otherwise
|
||||
# the decorate will cause an import error...
|
||||
@mock.patch('evzookeeper.ZKSession')
|
||||
def _setup_sg_api(self, zk_sess_mock):
|
||||
self.zk_sess = mock.MagicMock()
|
||||
zk_sess_mock.return_value = self.zk_sess
|
||||
self.flags(servicegroup_driver='zk')
|
||||
self.flags(address='ignored', group="zookeeper")
|
||||
self.servicegroup_api = servicegroup.API()
|
||||
service_id = {'topic': 'unittest', 'host': 'serviceA'}
|
||||
self.servicegroup_api.join(service_id['host'], service_id['topic'])
|
||||
self.assertTrue(self.servicegroup_api.service_is_up(service_id))
|
||||
self.servicegroup_api.leave(service_id['host'], service_id['topic'])
|
||||
# make sure zookeeper is updated and watcher is triggered
|
||||
eventlet.sleep(1)
|
||||
self.assertFalse(self.servicegroup_api.service_is_up(service_id))
|
||||
|
||||
def test_stop(self):
|
||||
self.servicegroup_api = servicegroup.API()
|
||||
service_id = {'topic': 'unittest', 'host': 'serviceA'}
|
||||
pulse = self.servicegroup_api.join(service_id['host'],
|
||||
service_id['topic'], None)
|
||||
self.assertTrue(self.servicegroup_api.service_is_up(service_id))
|
||||
pulse.stop()
|
||||
eventlet.sleep(1)
|
||||
self.assertFalse(self.servicegroup_api.service_is_up(service_id))
|
||||
|
||||
def test_zookeeper_hierarchy_structure(self):
|
||||
"""Test that hierarchy created by join method contains process id."""
|
||||
@@ -100,3 +90,12 @@ class ZKServiceGroupTestCase(test.NoDBTestCase):
|
||||
# check that internal private session attribute is ready
|
||||
self.assertIsInstance(driver.__dict__['_ZooKeeperDriver__session'],
|
||||
evzookeeper.ZKSession)
|
||||
|
||||
@mock.patch('evzookeeper.membership.Membership')
|
||||
def test_join(self, mem_mock):
|
||||
self._setup_sg_api()
|
||||
mem_mock.return_value = mock.sentinel.zk_mem
|
||||
self.servicegroup_api.join('fake-host', 'fake-topic')
|
||||
mem_mock.assert_called_once_with(self.zk_sess,
|
||||
'/fake-topic',
|
||||
'fake-host')
|
||||
|
||||
Reference in New Issue
Block a user