Implement ZooKeeper driver for ServiceGroup API.

The ZooKeeper driver uses ephemeral nodes in ZooKeeper to keep
track of node liveness in a service group. The Implementation is
based on the evzookeeper library to combine zookeeper
and eventlet.

Part of blueprint zk-service-heartbeat

DocImpact: new driver

Change-Id: Ia20519de2b4964007f8b91ea5d56d1875510d40f
This commit is contained in:
Yun Mao
2013-02-07 16:46:35 -05:00
parent 90c02e2cf9
commit 8880aadb97
4 changed files with 229 additions and 1 deletions

View File

@@ -1110,3 +1110,8 @@ class CryptoCRLFileNotFound(FileNotFound):
class InstanceRecreateNotSupported(Invalid):
message = _('Instance recreate is not implemented by this virt driver.')
class ServiceGroupUnavailable(NovaException):
message = _("The service from servicegroup driver %(driver) is "
"temporarily unavailable.")

View File

@@ -40,7 +40,8 @@ class API(object):
_driver = None
_driver_name_class_mapping = {
'db': 'nova.servicegroup.drivers.db.DbDriver'
'db': 'nova.servicegroup.drivers.db.DbDriver',
'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver'
}
@lockutils.synchronized('nova.servicegroup.api.new', 'nova-')

View File

@@ -0,0 +1,157 @@
# Copyright (c) AT&T 2012-2013 Yun Mao <yunmao@gmail.com>
#
# Copyright (c) IBM 2012 Pavel Kravchenco <kpavel at il dot ibm dot com>
# Alexey Roytman <roytman at il dot ibm dot com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import eventlet
import evzookeeper
from evzookeeper import membership
import zookeeper
from nova import exception
from nova.openstack.common import cfg
from nova.openstack.common import log as logging
from nova.servicegroup import api
from nova import utils
zk_driver_opts = [
cfg.StrOpt('address',
default=None,
help='The ZooKeeper addresses for servicegroup service in the '
'format of host1:port,host2:port,host3:port'),
cfg.IntOpt('recv_timeout',
default=4000,
help='recv_timeout parameter for the zk session'),
cfg.StrOpt('sg_prefix',
default="/servicegroups",
help='The prefix used in ZooKeeper to store ephemeral nodes'),
cfg.IntOpt('sg_retry_interval',
default=5,
help='Number of seconds to wait until retrying to join the '
'session'),
]
CONF = cfg.CONF
CONF.register_opts(zk_driver_opts, group="zk")
LOG = logging.getLogger(__name__)
class ZooKeeperDriver(api.ServiceGroupDriver):
"""ZooKeeper driver for the service group API."""
def __init__(self, *args, **kwargs):
"""Create the zk session object."""
null = open(os.devnull, "w")
self._session = evzookeeper.ZKSession(CONF.zk.address,
recv_timeout=
CONF.zk.recv_timeout,
zklog_fd=null)
self._memberships = {}
self._monitors = {}
# Make sure the prefix exists
try:
self._session.create(CONF.zk.sg_prefix, "",
acl=[evzookeeper.ZOO_OPEN_ACL_UNSAFE])
except zookeeper.NodeExistsException:
pass
super(ZooKeeperDriver, self).__init__()
def join(self, member_id, group, service=None):
"""Join the given service with its group."""
LOG.debug(_('ZooKeeperDriver: join new member %(id)s to the '
'%(gr)s group, service=%(sr)s'),
{'id': member_id, 'gr': group, 'sr': service})
member = self._memberships.get((group, member_id), None)
if member is None:
# the first time to join. Generate a new object
path = "%s/%s" % (CONF.zk.sg_prefix, group)
try:
member = membership.Membership(self._session, path, member_id)
except RuntimeError:
LOG.exception(_("Unable to join. It is possible that either "
"another node exists with the same name, or "
"this node just restarted. We will try "
"again in a short while to make sure."))
eventlet.sleep(CONF.zk.sg_retry_interval)
member = membership.Membership(self._session, path, member_id)
self._memberships[(group, member_id)] = member
return FakeLoopingCall(self, member_id, group)
def leave(self, member_id, group):
"""Remove the given member from the service group."""
LOG.debug(_('ZooKeeperDriver.leave: %(member)s from group %(group)s'),
{'member': member_id, 'group': group})
try:
key = (group, member_id)
member = self._memberships[key]
member.leave()
del self._memberships[key]
except KeyError:
LOG.error(_('ZooKeeperDriver.leave: %(id)s has not joined to the '
'%(gr)s group'), {'id': member_id, 'gr': group})
def is_up(self, service_ref):
group_id = service_ref['topic']
member_id = service_ref['host']
all_members = self.get_all(group_id)
return member_id in all_members
def get_all(self, group_id):
"""Return all members in a list, or a ServiceGroupUnavailable
exception.
"""
monitor = self._monitors.get(group_id, None)
if monitor is None:
path = "%s/%s" % (CONF.zk.sg_prefix, group_id)
monitor = membership.MembershipMonitor(self._session, path)
self._monitors[group_id] = monitor
# Note(maoy): When initialized for the first time, it takes a
# while to retrieve all members from zookeeper. To prevent
# None to be returned, we sleep 5 sec max to wait for data to
# be ready.
for _retry in range(50):
eventlet.sleep(0.1)
all_members = monitor.get_all()
if all_members is not None:
return all_members
all_members = monitor.get_all()
if all_members is None:
raise exception.ServiceGroupUnavailable(driver="ZooKeeperDriver")
return all_members
class FakeLoopingCall(utils.LoopingCallBase):
"""The fake Looping Call implementation, created for backward
compatibility with a membership based on DB.
"""
def __init__(self, driver, host, group):
self._driver = driver
self._group = group
self._host = host
def stop(self):
self._driver.leave(self._host, self._group)
def start(self, interval, initial_delay=None):
pass
def wait(self):
pass

View File

@@ -0,0 +1,65 @@
# Copyright (c) AT&T 2012-2013 Yun Mao <yunmao@gmail.com>
# Copyright (c) IBM 2012 Alexey Roytman <roytman at il dot ibm dot com>.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test the ZooKeeper driver for servicegroup.
You need to install ZooKeeper locally and related dependencies
to run the test. It's unclear how to install python-zookeeper lib
in venv so you might have to run the test without it.
To set up in Ubuntu 12.04:
$ sudo apt-get install zookeeper zookeeperd python-zookeeper
$ sudo pip install evzookeeper
$ nosetests nova.tests.servicegroup.test_zk_driver
"""
import eventlet
from nova import servicegroup
from nova import test
class ZKServiceGroupTestCase(test.TestCase):
def setUp(self):
super(ZKServiceGroupTestCase, self).setUp()
servicegroup.API._driver = None
try:
from nova.servicegroup.drivers import zk
_unused = zk
except ImportError:
self.skipTest("Unable to test due to lack of ZooKeeper")
self.flags(servicegroup_driver='zk')
self.flags(address='localhost:2181', group="zk")
def test_join_leave(self):
self.servicegroup_api = servicegroup.API()
service_id = {'topic': 'unittest', 'host': 'serviceA'}
self.servicegroup_api.join(service_id['host'], service_id['topic'])
self.assertTrue(self.servicegroup_api.service_is_up(service_id))
self.servicegroup_api.leave(service_id['host'], service_id['topic'])
# make sure zookeeper is updated and watcher is triggered
eventlet.sleep(1)
self.assertFalse(self.servicegroup_api.service_is_up(service_id))
def test_stop(self):
self.servicegroup_api = servicegroup.API()
service_id = {'topic': 'unittest', 'host': 'serviceA'}
pulse = self.servicegroup_api.join(service_id['host'],
service_id['topic'], None)
self.assertTrue(self.servicegroup_api.service_is_up(service_id))
pulse.stop()
eventlet.sleep(1)
self.assertFalse(self.servicegroup_api.service_is_up(service_id))